feat: improve web_fetch URL validation and security
Add URL validation and redirect limits to web_fetch tool to prevent potential security issues: - Add _validate_url() function to validate URLs before fetching - Only allow http:// and https:// schemes (prevent file://, ftp://, etc.) - Verify URL has valid scheme and domain - Return descriptive error messages for invalid URLs - Limit HTTP redirects to 5 (down from default 20) to prevent DoS attacks - Add MAX_REDIRECTS constant for easy configuration - Explicitly configure httpx.AsyncClient with max_redirects parameter - Improve error handling with JSON error responses for validation failures This addresses security concerns identified in code review where web_fetch had no URL validation or redirect limits, potentially allowing: - Unsafe URL schemes (file://, etc.) - Redirect-based DoS attacks - Invalid URL formats causing unclear errors
This commit is contained in:
parent
229fde021a
commit
ea849650ef
@ -5,6 +5,7 @@ import json
|
||||
import os
|
||||
import re
|
||||
from typing import Any
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import httpx
|
||||
|
||||
@ -12,6 +13,7 @@ from nanobot.agent.tools.base import Tool
|
||||
|
||||
# Shared constants
|
||||
USER_AGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_7_2) AppleWebKit/537.36"
|
||||
MAX_REDIRECTS = 5 # Limit redirects to prevent DoS attacks
|
||||
|
||||
|
||||
def _strip_tags(text: str) -> str:
|
||||
@ -28,6 +30,33 @@ def _normalize(text: str) -> str:
|
||||
return re.sub(r'\n{3,}', '\n\n', text).strip()
|
||||
|
||||
|
||||
def _validate_url(url: str) -> tuple[bool, str]:
|
||||
"""
|
||||
Validate URL for security.
|
||||
|
||||
Returns:
|
||||
(is_valid, error_message): Tuple of validation result and error message if invalid.
|
||||
"""
|
||||
try:
|
||||
parsed = urlparse(url)
|
||||
|
||||
# Check if scheme exists
|
||||
if not parsed.scheme:
|
||||
return False, "URL must include a scheme (http:// or https://)"
|
||||
|
||||
# Only allow http and https schemes
|
||||
if parsed.scheme.lower() not in ('http', 'https'):
|
||||
return False, f"Invalid URL scheme '{parsed.scheme}'. Only http:// and https:// are allowed"
|
||||
|
||||
# Check if netloc (domain) exists
|
||||
if not parsed.netloc:
|
||||
return False, "URL must include a valid domain"
|
||||
|
||||
return True, ""
|
||||
except Exception as e:
|
||||
return False, f"Invalid URL format: {str(e)}"
|
||||
|
||||
|
||||
class WebSearchTool(Tool):
|
||||
"""Search the web using Brave Search API."""
|
||||
|
||||
@ -95,12 +124,21 @@ class WebFetchTool(Tool):
|
||||
|
||||
async def execute(self, url: str, extractMode: str = "markdown", maxChars: int | None = None, **kwargs: Any) -> str:
|
||||
from readability import Document
|
||||
|
||||
|
||||
max_chars = maxChars or self.max_chars
|
||||
|
||||
|
||||
# Validate URL before fetching
|
||||
is_valid, error_msg = _validate_url(url)
|
||||
if not is_valid:
|
||||
return json.dumps({"error": f"URL validation failed: {error_msg}", "url": url})
|
||||
|
||||
try:
|
||||
async with httpx.AsyncClient() as client:
|
||||
r = await client.get(url, headers={"User-Agent": USER_AGENT}, follow_redirects=True, timeout=30.0)
|
||||
async with httpx.AsyncClient(
|
||||
follow_redirects=True,
|
||||
max_redirects=MAX_REDIRECTS,
|
||||
timeout=30.0
|
||||
) as client:
|
||||
r = await client.get(url, headers={"User-Agent": USER_AGENT})
|
||||
r.raise_for_status()
|
||||
|
||||
ctype = r.headers.get("content-type", "")
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user