From ea849650efee5c9df32834c7dd79284c6b147eb7 Mon Sep 17 00:00:00 2001 From: Cheng Wang Date: Mon, 2 Feb 2026 19:34:22 +0800 Subject: [PATCH] feat: improve web_fetch URL validation and security Add URL validation and redirect limits to web_fetch tool to prevent potential security issues: - Add _validate_url() function to validate URLs before fetching - Only allow http:// and https:// schemes (prevent file://, ftp://, etc.) - Verify URL has valid scheme and domain - Return descriptive error messages for invalid URLs - Limit HTTP redirects to 5 (down from default 20) to prevent DoS attacks - Add MAX_REDIRECTS constant for easy configuration - Explicitly configure httpx.AsyncClient with max_redirects parameter - Improve error handling with JSON error responses for validation failures This addresses security concerns identified in code review where web_fetch had no URL validation or redirect limits, potentially allowing: - Unsafe URL schemes (file://, etc.) - Redirect-based DoS attacks - Invalid URL formats causing unclear errors --- nanobot/agent/tools/web.py | 46 ++++++++++++++++++++++++++++++++++---- 1 file changed, 42 insertions(+), 4 deletions(-) diff --git a/nanobot/agent/tools/web.py b/nanobot/agent/tools/web.py index c9d989c..ad72604 100644 --- a/nanobot/agent/tools/web.py +++ b/nanobot/agent/tools/web.py @@ -5,6 +5,7 @@ import json import os import re from typing import Any +from urllib.parse import urlparse import httpx @@ -12,6 +13,7 @@ from nanobot.agent.tools.base import Tool # Shared constants USER_AGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_7_2) AppleWebKit/537.36" +MAX_REDIRECTS = 5 # Limit redirects to prevent DoS attacks def _strip_tags(text: str) -> str: @@ -28,6 +30,33 @@ def _normalize(text: str) -> str: return re.sub(r'\n{3,}', '\n\n', text).strip() +def _validate_url(url: str) -> tuple[bool, str]: + """ + Validate URL for security. + + Returns: + (is_valid, error_message): Tuple of validation result and error message if invalid. + """ + try: + parsed = urlparse(url) + + # Check if scheme exists + if not parsed.scheme: + return False, "URL must include a scheme (http:// or https://)" + + # Only allow http and https schemes + if parsed.scheme.lower() not in ('http', 'https'): + return False, f"Invalid URL scheme '{parsed.scheme}'. Only http:// and https:// are allowed" + + # Check if netloc (domain) exists + if not parsed.netloc: + return False, "URL must include a valid domain" + + return True, "" + except Exception as e: + return False, f"Invalid URL format: {str(e)}" + + class WebSearchTool(Tool): """Search the web using Brave Search API.""" @@ -95,12 +124,21 @@ class WebFetchTool(Tool): async def execute(self, url: str, extractMode: str = "markdown", maxChars: int | None = None, **kwargs: Any) -> str: from readability import Document - + max_chars = maxChars or self.max_chars - + + # Validate URL before fetching + is_valid, error_msg = _validate_url(url) + if not is_valid: + return json.dumps({"error": f"URL validation failed: {error_msg}", "url": url}) + try: - async with httpx.AsyncClient() as client: - r = await client.get(url, headers={"User-Agent": USER_AGENT}, follow_redirects=True, timeout=30.0) + async with httpx.AsyncClient( + follow_redirects=True, + max_redirects=MAX_REDIRECTS, + timeout=30.0 + ) as client: + r = await client.get(url, headers={"User-Agent": USER_AGENT}) r.raise_for_status() ctype = r.headers.get("content-type", "")