diff --git a/nanobot/agent/tools/web.py b/nanobot/agent/tools/web.py
index c9d989c..ad72604 100644
--- a/nanobot/agent/tools/web.py
+++ b/nanobot/agent/tools/web.py
@@ -5,6 +5,7 @@ import json
 import os
 import re
 from typing import Any
+from urllib.parse import urlparse
 
 import httpx
 
@@ -12,6 +13,7 @@ from nanobot.agent.tools.base import Tool
 
 # Shared constants
 USER_AGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_7_2) AppleWebKit/537.36"
+MAX_REDIRECTS = 5  # Limit redirects to prevent DoS attacks
 
 
 def _strip_tags(text: str) -> str:
@@ -28,6 +30,33 @@ def _normalize(text: str) -> str:
     return re.sub(r'\n{3,}', '\n\n', text).strip()
 
 
+def _validate_url(url: str) -> tuple[bool, str]:
+    """
+    Validate URL for security.
+
+    Returns:
+        (is_valid, error_message): Tuple of validation result and error message if invalid.
+    """
+    try:
+        parsed = urlparse(url)
+
+        # Check if scheme exists
+        if not parsed.scheme:
+            return False, "URL must include a scheme (http:// or https://)"
+
+        # Only allow http and https schemes
+        if parsed.scheme.lower() not in ('http', 'https'):
+            return False, f"Invalid URL scheme '{parsed.scheme}'. Only http:// and https:// are allowed"
+
+        # Check if netloc (domain) exists
+        if not parsed.netloc:
+            return False, "URL must include a valid domain"
+
+        return True, ""
+    except Exception as e:
+        return False, f"Invalid URL format: {str(e)}"
+
+
 class WebSearchTool(Tool):
     """Search the web using Brave Search API."""
 
@@ -95,12 +124,21 @@ class WebFetchTool(Tool):
     async def execute(self, url: str, extractMode: str = "markdown", maxChars: int | None = None, **kwargs: Any) -> str:
         from readability import Document
-
+
         max_chars = maxChars or self.max_chars
-
+
+        # Validate URL before fetching
+        is_valid, error_msg = _validate_url(url)
+        if not is_valid:
+            return json.dumps({"error": f"URL validation failed: {error_msg}", "url": url})
+
         try:
-            async with httpx.AsyncClient() as client:
-                r = await client.get(url, headers={"User-Agent": USER_AGENT}, follow_redirects=True, timeout=30.0)
+            async with httpx.AsyncClient(
+                follow_redirects=True,
+                max_redirects=MAX_REDIRECTS,
+                timeout=30.0
+            ) as client:
+                r = await client.get(url, headers={"User-Agent": USER_AGENT})
                 r.raise_for_status()
                 ctype = r.headers.get("content-type", "")