From a20d887f9e8eae6bf34bb73480027a5140710df3 Mon Sep 17 00:00:00 2001 From: Re-bin Date: Wed, 4 Feb 2026 03:45:26 +0000 Subject: [PATCH] feat: add parameter validation and safety guard for exec tool --- nanobot/agent/loop.py | 12 ++++++-- nanobot/agent/subagent.py | 9 +++++- nanobot/agent/tools/base.py | 57 +++++++++++++++--------------------- nanobot/agent/tools/shell.py | 31 ++++++++------------ nanobot/cli/commands.py | 6 ++-- nanobot/config/schema.py | 7 +++++ 6 files changed, 64 insertions(+), 58 deletions(-) diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index 4a96b84..bfe6e89 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -40,14 +40,17 @@ class AgentLoop: workspace: Path, model: str | None = None, max_iterations: int = 20, - brave_api_key: str | None = None + brave_api_key: str | None = None, + exec_config: "ExecToolConfig | None" = None, ): + from nanobot.config.schema import ExecToolConfig self.bus = bus self.provider = provider self.workspace = workspace self.model = model or provider.get_default_model() self.max_iterations = max_iterations self.brave_api_key = brave_api_key + self.exec_config = exec_config or ExecToolConfig() self.context = ContextBuilder(workspace) self.sessions = SessionManager(workspace) @@ -58,6 +61,7 @@ class AgentLoop: bus=bus, model=self.model, brave_api_key=brave_api_key, + exec_config=self.exec_config, ) self._running = False @@ -72,7 +76,11 @@ class AgentLoop: self.tools.register(ListDirTool()) # Shell tool - self.tools.register(ExecTool(working_dir=str(self.workspace))) + self.tools.register(ExecTool( + working_dir=str(self.workspace), + timeout=self.exec_config.timeout, + restrict_to_workspace=self.exec_config.restrict_to_workspace, + )) # Web tools self.tools.register(WebSearchTool(api_key=self.brave_api_key)) diff --git a/nanobot/agent/subagent.py b/nanobot/agent/subagent.py index d3b320c..05ffbb8 100644 --- a/nanobot/agent/subagent.py +++ b/nanobot/agent/subagent.py @@ -33,12 +33,15 @@ class SubagentManager: bus: MessageBus, model: str | None = None, brave_api_key: str | None = None, + exec_config: "ExecToolConfig | None" = None, ): + from nanobot.config.schema import ExecToolConfig self.provider = provider self.workspace = workspace self.bus = bus self.model = model or provider.get_default_model() self.brave_api_key = brave_api_key + self.exec_config = exec_config or ExecToolConfig() self._running_tasks: dict[str, asyncio.Task[None]] = {} async def spawn( @@ -96,7 +99,11 @@ class SubagentManager: tools.register(ReadFileTool()) tools.register(WriteFileTool()) tools.register(ListDirTool()) - tools.register(ExecTool(working_dir=str(self.workspace))) + tools.register(ExecTool( + working_dir=str(self.workspace), + timeout=self.exec_config.timeout, + restrict_to_workspace=self.exec_config.restrict_to_workspace, + )) tools.register(WebSearchTool(api_key=self.brave_api_key)) tools.register(WebFetchTool()) diff --git a/nanobot/agent/tools/base.py b/nanobot/agent/tools/base.py index 5888a77..cbaadbd 100644 --- a/nanobot/agent/tools/base.py +++ b/nanobot/agent/tools/base.py @@ -12,6 +12,15 @@ class Tool(ABC): the environment, such as reading files, executing commands, etc. """ + _TYPE_MAP = { + "string": str, + "integer": int, + "number": (int, float), + "boolean": bool, + "array": list, + "object": dict, + } + @property @abstractmethod def name(self) -> str: @@ -65,60 +74,40 @@ class Tool(ABC): def _validate_schema(self, value: Any, schema: dict[str, Any], path: str) -> list[str]: errors: list[str] = [] expected_type = schema.get("type") + label = path or "parameter" - type_map = { - "string": str, - "integer": int, - "number": (int, float), - "boolean": bool, - "array": list, - "object": dict, - } - - def label(p: str) -> str: - return p or "parameter" - - if expected_type in type_map and not isinstance(value, type_map[expected_type]): - errors.append(f"{label(path)} should be {expected_type}") - return errors + if expected_type in self._TYPE_MAP and not isinstance(value, self._TYPE_MAP[expected_type]): + return [f"{label} should be {expected_type}"] if "enum" in schema and value not in schema["enum"]: - errors.append(f"{label(path)} must be one of {schema['enum']}") + errors.append(f"{label} must be one of {schema['enum']}") if expected_type in ("integer", "number"): if "minimum" in schema and value < schema["minimum"]: - errors.append(f"{label(path)} must be >= {schema['minimum']}") + errors.append(f"{label} must be >= {schema['minimum']}") if "maximum" in schema and value > schema["maximum"]: - errors.append(f"{label(path)} must be <= {schema['maximum']}") + errors.append(f"{label} must be <= {schema['maximum']}") if expected_type == "string": if "minLength" in schema and len(value) < schema["minLength"]: - errors.append(f"{label(path)} must be at least {schema['minLength']} chars") + errors.append(f"{label} must be at least {schema['minLength']} chars") if "maxLength" in schema and len(value) > schema["maxLength"]: - errors.append(f"{label(path)} must be at most {schema['maxLength']} chars") + errors.append(f"{label} must be at most {schema['maxLength']} chars") if expected_type == "object": - properties = schema.get("properties", {}) or {} - required = set(schema.get("required", []) or []) - - for key in required: + properties = schema.get("properties", {}) + for key in schema.get("required", []): if key not in value: - p = f"{path}.{key}" if path else key - errors.append(f"missing required {p}") - + errors.append(f"missing required {path}.{key}" if path else f"missing required {key}") for key, item in value.items(): - prop_schema = properties.get(key) - if not prop_schema: - continue # ignore unknown fields - p = f"{path}.{key}" if path else key - errors.extend(self._validate_schema(item, prop_schema, p)) + if key in properties: + errors.extend(self._validate_schema(item, properties[key], f"{path}.{key}" if path else key)) if expected_type == "array": items_schema = schema.get("items") if items_schema: for idx, item in enumerate(value): - p = f"{path}[{idx}]" if path else f"[{idx}]" - errors.extend(self._validate_schema(item, items_schema, p)) + errors.extend(self._validate_schema(item, items_schema, f"{path}[{idx}]" if path else f"[{idx}]")) return errors diff --git a/nanobot/agent/tools/shell.py b/nanobot/agent/tools/shell.py index ce00bca..143d187 100644 --- a/nanobot/agent/tools/shell.py +++ b/nanobot/agent/tools/shell.py @@ -18,29 +18,22 @@ class ExecTool(Tool): working_dir: str | None = None, deny_patterns: list[str] | None = None, allow_patterns: list[str] | None = None, - restrict_to_working_dir: bool = False, + restrict_to_workspace: bool = False, ): self.timeout = timeout self.working_dir = working_dir self.deny_patterns = deny_patterns or [ - r"\brm\s+-rf\b", - r"\brm\s+-fr\b", - r"\brm\s+-r\b", - r"\bdel\s+/f\b", - r"\bdel\s+/q\b", - r"\brmdir\s+/s\b", - r"\bformat\b", - r"\bmkfs\b", - r"\bdd\s+if=", - r">\s*/dev/sd", - r"\bdiskpart\b", - r"\bshutdown\b", - r"\breboot\b", - r"\bpoweroff\b", - r":\(\)\s*\{\s*:\s*\|\s*:\s*&\s*\};\s*:", + r"\brm\s+-[rf]{1,2}\b", # rm -r, rm -rf, rm -fr + r"\bdel\s+/[fq]\b", # del /f, del /q + r"\brmdir\s+/s\b", # rmdir /s + r"\b(format|mkfs|diskpart)\b", # disk operations + r"\bdd\s+if=", # dd + r">\s*/dev/sd", # write to disk + r"\b(shutdown|reboot|poweroff)\b", # system power + r":\(\)\s*\{.*\};\s*:", # fork bomb ] self.allow_patterns = allow_patterns or [] - self.restrict_to_working_dir = restrict_to_working_dir + self.restrict_to_workspace = restrict_to_workspace @property def name(self) -> str: @@ -128,14 +121,14 @@ class ExecTool(Tool): if not any(re.search(p, lower) for p in self.allow_patterns): return "Error: Command blocked by safety guard (not in allowlist)" - if self.restrict_to_working_dir: + if self.restrict_to_workspace: if "..\\" in cmd or "../" in cmd: return "Error: Command blocked by safety guard (path traversal detected)" cwd_path = Path(cwd).resolve() win_paths = re.findall(r"[A-Za-z]:\\[^\\\"']+", cmd) - posix_paths = re.findall(r"/[^\\s\"']+", cmd) + posix_paths = re.findall(r"/[^\s\"']+", cmd) for raw in win_paths + posix_paths: try: diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py index 5ecc31b..6b95667 100644 --- a/nanobot/cli/commands.py +++ b/nanobot/cli/commands.py @@ -202,7 +202,8 @@ def gateway( workspace=config.workspace_path, model=config.agents.defaults.model, max_iterations=config.agents.defaults.max_tool_iterations, - brave_api_key=config.tools.web.search.api_key or None + brave_api_key=config.tools.web.search.api_key or None, + exec_config=config.tools.exec, ) # Create cron service @@ -309,7 +310,8 @@ def agent( bus=bus, provider=provider, workspace=config.workspace_path, - brave_api_key=config.tools.web.search.api_key or None + brave_api_key=config.tools.web.search.api_key or None, + exec_config=config.tools.exec, ) if message: diff --git a/nanobot/config/schema.py b/nanobot/config/schema.py index 71e3361..4c34834 100644 --- a/nanobot/config/schema.py +++ b/nanobot/config/schema.py @@ -73,9 +73,16 @@ class WebToolsConfig(BaseModel): search: WebSearchConfig = Field(default_factory=WebSearchConfig) +class ExecToolConfig(BaseModel): + """Shell exec tool configuration.""" + timeout: int = 60 + restrict_to_workspace: bool = False # If true, block commands accessing paths outside workspace + + class ToolsConfig(BaseModel): """Tools configuration.""" web: WebToolsConfig = Field(default_factory=WebToolsConfig) + exec: ExecToolConfig = Field(default_factory=ExecToolConfig) class Config(BaseSettings):