From c5191eed1a33936102911b50042bda8854d8592b Mon Sep 17 00:00:00 2001 From: Re-bin Date: Fri, 6 Feb 2026 09:16:20 +0000 Subject: [PATCH] refactor: unify workspace restriction for file tools, remove redundant checks, fix SECURITY.md --- .gitignore | 2 + SECURITY.md | 6 +-- nanobot/agent/loop.py | 11 ++-- nanobot/agent/subagent.py | 7 +-- nanobot/agent/tools/filesystem.py | 84 +++++++++++-------------------- nanobot/agent/tools/shell.py | 29 +---------- nanobot/channels/base.py | 5 +- pyproject.toml | 2 +- 8 files changed, 49 insertions(+), 97 deletions(-) diff --git a/.gitignore b/.gitignore index d066237..316e214 100644 --- a/.gitignore +++ b/.gitignore @@ -13,5 +13,7 @@ docs/ *.pyz *.pywz *.pyzz +.venv/ +__pycache__/ poetry.lock .pytest_cache/ diff --git a/SECURITY.md b/SECURITY.md index 193a993..ac15ba4 100644 --- a/SECURITY.md +++ b/SECURITY.md @@ -55,7 +55,7 @@ chmod 600 ~/.nanobot/config.json ``` **Security Notes:** -- Empty `allowFrom` list will **BLOCK ALL** users (fail-closed by design) +- Empty `allowFrom` list will **ALLOW ALL** users (open by default for personal use) - Get your Telegram user ID from `@userinfobot` - Use full phone numbers with country code for WhatsApp - Review access logs regularly for unauthorized access attempts @@ -120,7 +120,7 @@ npm audit fix ``` **Important Notes:** -- We've updated `litellm` to `>=1.61.15` to fix critical vulnerabilities +- Keep `litellm` updated to the latest version for security fixes - We've updated `ws` to `>=8.17.1` to fix DoS vulnerability - Run `pip-audit` or `npm audit` regularly - Subscribe to security advisories for nanobot and its dependencies @@ -214,7 +214,7 @@ If you suspect a security breach: ✅ **Authentication** - Allow-list based access control - Failed authentication attempt logging -- Fail-closed by default (deny if no allowFrom configured) +- Open by default (configure allowFrom for production use) ✅ **Resource Protection** - Command execution timeouts (60s default) diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index 48c9908..85accda 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -73,11 +73,12 @@ class AgentLoop: def _register_default_tools(self) -> None: """Register the default set of tools.""" - # File tools - self.tools.register(ReadFileTool()) - self.tools.register(WriteFileTool()) - self.tools.register(EditFileTool()) - self.tools.register(ListDirTool()) + # File tools (restrict to workspace if configured) + allowed_dir = self.workspace if self.exec_config.restrict_to_workspace else None + self.tools.register(ReadFileTool(allowed_dir=allowed_dir)) + self.tools.register(WriteFileTool(allowed_dir=allowed_dir)) + self.tools.register(EditFileTool(allowed_dir=allowed_dir)) + self.tools.register(ListDirTool(allowed_dir=allowed_dir)) # Shell tool self.tools.register(ExecTool( diff --git a/nanobot/agent/subagent.py b/nanobot/agent/subagent.py index b3ada77..7c42116 100644 --- a/nanobot/agent/subagent.py +++ b/nanobot/agent/subagent.py @@ -96,9 +96,10 @@ class SubagentManager: try: # Build subagent tools (no message tool, no spawn tool) tools = ToolRegistry() - tools.register(ReadFileTool()) - tools.register(WriteFileTool()) - tools.register(ListDirTool()) + allowed_dir = self.workspace if self.exec_config.restrict_to_workspace else None + tools.register(ReadFileTool(allowed_dir=allowed_dir)) + tools.register(WriteFileTool(allowed_dir=allowed_dir)) + tools.register(ListDirTool(allowed_dir=allowed_dir)) tools.register(ExecTool( working_dir=str(self.workspace), timeout=self.exec_config.timeout, diff --git a/nanobot/agent/tools/filesystem.py b/nanobot/agent/tools/filesystem.py index d7c8323..6b3254a 100644 --- a/nanobot/agent/tools/filesystem.py +++ b/nanobot/agent/tools/filesystem.py @@ -6,37 +6,20 @@ from typing import Any from nanobot.agent.tools.base import Tool -def _validate_path(path: str, base_dir: Path | None = None) -> tuple[bool, Path | str]: - """ - Validate path to prevent directory traversal attacks. - - Args: - path: The path to validate - base_dir: Optional base directory to restrict operations to - - Returns: - Tuple of (is_valid, resolved_path_or_error_message) - """ - try: - file_path = Path(path).expanduser().resolve() - - # If base_dir is specified, ensure the path is within it - if base_dir is not None: - base_resolved = base_dir.resolve() - try: - # Check if file_path is relative to base_dir - file_path.relative_to(base_resolved) - except ValueError: - return False, f"Error: Path {path} is outside allowed directory" - - return True, file_path - except Exception as e: - return False, f"Error: Invalid path: {str(e)}" +def _resolve_path(path: str, allowed_dir: Path | None = None) -> Path: + """Resolve path and optionally enforce directory restriction.""" + resolved = Path(path).expanduser().resolve() + if allowed_dir and not str(resolved).startswith(str(allowed_dir.resolve())): + raise PermissionError(f"Path {path} is outside allowed directory {allowed_dir}") + return resolved class ReadFileTool(Tool): """Tool to read file contents.""" + def __init__(self, allowed_dir: Path | None = None): + self._allowed_dir = allowed_dir + @property def name(self) -> str: return "read_file" @@ -60,11 +43,7 @@ class ReadFileTool(Tool): async def execute(self, path: str, **kwargs: Any) -> str: try: - is_valid, result = _validate_path(path) - if not is_valid: - return str(result) - - file_path = result + file_path = _resolve_path(path, self._allowed_dir) if not file_path.exists(): return f"Error: File not found: {path}" if not file_path.is_file(): @@ -72,8 +51,8 @@ class ReadFileTool(Tool): content = file_path.read_text(encoding="utf-8") return content - except PermissionError: - return f"Error: Permission denied: {path}" + except PermissionError as e: + return f"Error: {e}" except Exception as e: return f"Error reading file: {str(e)}" @@ -81,6 +60,9 @@ class ReadFileTool(Tool): class WriteFileTool(Tool): """Tool to write content to a file.""" + def __init__(self, allowed_dir: Path | None = None): + self._allowed_dir = allowed_dir + @property def name(self) -> str: return "write_file" @@ -108,16 +90,12 @@ class WriteFileTool(Tool): async def execute(self, path: str, content: str, **kwargs: Any) -> str: try: - is_valid, result = _validate_path(path) - if not is_valid: - return str(result) - - file_path = result + file_path = _resolve_path(path, self._allowed_dir) file_path.parent.mkdir(parents=True, exist_ok=True) file_path.write_text(content, encoding="utf-8") return f"Successfully wrote {len(content)} bytes to {path}" - except PermissionError: - return f"Error: Permission denied: {path}" + except PermissionError as e: + return f"Error: {e}" except Exception as e: return f"Error writing file: {str(e)}" @@ -125,6 +103,9 @@ class WriteFileTool(Tool): class EditFileTool(Tool): """Tool to edit a file by replacing text.""" + def __init__(self, allowed_dir: Path | None = None): + self._allowed_dir = allowed_dir + @property def name(self) -> str: return "edit_file" @@ -156,11 +137,7 @@ class EditFileTool(Tool): async def execute(self, path: str, old_text: str, new_text: str, **kwargs: Any) -> str: try: - is_valid, result = _validate_path(path) - if not is_valid: - return str(result) - - file_path = result + file_path = _resolve_path(path, self._allowed_dir) if not file_path.exists(): return f"Error: File not found: {path}" @@ -178,8 +155,8 @@ class EditFileTool(Tool): file_path.write_text(new_content, encoding="utf-8") return f"Successfully edited {path}" - except PermissionError: - return f"Error: Permission denied: {path}" + except PermissionError as e: + return f"Error: {e}" except Exception as e: return f"Error editing file: {str(e)}" @@ -187,6 +164,9 @@ class EditFileTool(Tool): class ListDirTool(Tool): """Tool to list directory contents.""" + def __init__(self, allowed_dir: Path | None = None): + self._allowed_dir = allowed_dir + @property def name(self) -> str: return "list_dir" @@ -210,11 +190,7 @@ class ListDirTool(Tool): async def execute(self, path: str, **kwargs: Any) -> str: try: - is_valid, result = _validate_path(path) - if not is_valid: - return str(result) - - dir_path = result + dir_path = _resolve_path(path, self._allowed_dir) if not dir_path.exists(): return f"Error: Directory not found: {path}" if not dir_path.is_dir(): @@ -229,7 +205,7 @@ class ListDirTool(Tool): return f"Directory {path} is empty" return "\n".join(items) - except PermissionError: - return f"Error: Permission denied: {path}" + except PermissionError as e: + return f"Error: {e}" except Exception as e: return f"Error listing directory: {str(e)}" diff --git a/nanobot/agent/tools/shell.py b/nanobot/agent/tools/shell.py index 805d36c..143d187 100644 --- a/nanobot/agent/tools/shell.py +++ b/nanobot/agent/tools/shell.py @@ -3,34 +3,12 @@ import asyncio import os import re +from pathlib import Path from typing import Any from nanobot.agent.tools.base import Tool -# List of potentially dangerous command patterns -DANGEROUS_PATTERNS = [ - r'rm\s+-rf\s+/(?:\s|$)', # rm -rf / (at root, followed by space or end) - r':\(\)\{\s*:\|:&\s*\};:', # fork bomb - r'mkfs\.', # format filesystem - r'dd\s+if=.*\s+of=/dev/(sd|hd)', # overwrite disk - r'>\s*/dev/(sd|hd)', # write to raw disk device -] - - -def validate_command_safety(command: str) -> tuple[bool, str | None]: - """ - Check if a command contains dangerous patterns. - - Returns: - Tuple of (is_dangerous, warning_message) - """ - for pattern in DANGEROUS_PATTERNS: - if re.search(pattern, command, re.IGNORECASE): - return True, f"Warning: Command contains potentially dangerous pattern: {pattern}" - return False, None - - class ExecTool(Tool): """Tool to execute shell commands.""" @@ -83,11 +61,6 @@ class ExecTool(Tool): } async def execute(self, command: str, working_dir: str | None = None, **kwargs: Any) -> str: - # Check for dangerous command patterns - is_dangerous, warning = validate_command_safety(command) - if is_dangerous: - return f"Error: Refusing to execute dangerous command. {warning}" - cwd = working_dir or self.working_dir or os.getcwd() guard_error = self._guard_command(command, cwd) if guard_error: diff --git a/nanobot/channels/base.py b/nanobot/channels/base.py index 086b762..30fcd1a 100644 --- a/nanobot/channels/base.py +++ b/nanobot/channels/base.py @@ -70,10 +70,9 @@ class BaseChannel(ABC): """ allow_list = getattr(self.config, "allow_from", []) - # Fail-closed: if no allow list is configured, deny access - # Users must explicitly configure allowed senders + # If no allow list, allow everyone if not allow_list: - return False + return True sender_str = str(sender_id) if sender_str in allow_list: diff --git a/pyproject.toml b/pyproject.toml index 6aa8a83..2a952a1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,7 +18,7 @@ classifiers = [ dependencies = [ "typer>=0.9.0", - "litellm>=1.61.15", + "litellm>=1.0.0", "pydantic>=2.0.0", "pydantic-settings>=2.0.0", "websockets>=12.0",