""" Confirmation token system. Generates and validates signed tokens for confirmed actions. """ import hashlib import hmac import json import time from datetime import datetime, timedelta from typing import Optional, Dict, Any from pathlib import Path import secrets class ConfirmationToken: """Confirmation token for high-risk actions.""" def __init__(self, secret_key: Optional[str] = None): """ Initialize token system. Args: secret_key: Secret key for signing tokens. If None, generates one. """ if secret_key is None: # Load or generate secret key key_file = Path(__file__).parent.parent.parent / "data" / ".confirmation_secret" if key_file.exists(): self.secret_key = key_file.read_text().strip() else: self.secret_key = secrets.token_urlsafe(32) key_file.parent.mkdir(parents=True, exist_ok=True) key_file.write_text(self.secret_key) else: self.secret_key = secret_key def generate_token(self, tool_name: str, parameters: Dict[str, Any], session_id: Optional[str] = None, user_id: Optional[str] = None, expires_in: int = 300) -> str: """ Generate a signed confirmation token. Args: tool_name: Name of the tool parameters: Tool parameters session_id: Session ID user_id: User ID expires_in: Token expiration in seconds (default 5 minutes) Returns: Signed token string """ payload = { "tool_name": tool_name, "parameters": parameters, "session_id": session_id, "user_id": user_id, "timestamp": datetime.now().isoformat(), "expires_at": (datetime.now() + timedelta(seconds=expires_in)).isoformat(), } # Create signature payload_str = json.dumps(payload, sort_keys=True) signature = hmac.new( self.secret_key.encode(), payload_str.encode(), hashlib.sha256 ).hexdigest() # Combine payload and signature token_data = { "payload": payload, "signature": signature } # Encode as base64-like string (simplified) import base64 token_str = base64.urlsafe_b64encode( json.dumps(token_data).encode() ).decode() return token_str def validate_token(self, token: str) -> tuple[bool, Optional[Dict[str, Any]], Optional[str]]: """ Validate a confirmation token. Args: token: Token string to validate Returns: (is_valid, payload, error_message) """ try: import base64 # Decode token token_data = json.loads(base64.urlsafe_b64decode(token.encode()).decode()) payload = token_data["payload"] signature = token_data["signature"] # Verify signature payload_str = json.dumps(payload, sort_keys=True) expected_signature = hmac.new( self.secret_key.encode(), payload_str.encode(), hashlib.sha256 ).hexdigest() if not hmac.compare_digest(signature, expected_signature): return False, None, "Invalid token signature" # Check expiration expires_at = datetime.fromisoformat(payload["expires_at"]) if datetime.now() > expires_at: return False, None, "Token expired" return True, payload, None except Exception as e: return False, None, f"Token validation error: {e}" def verify_action(self, token: str, tool_name: str, parameters: Dict[str, Any]) -> tuple[bool, Optional[str]]: """ Verify that token matches the intended action. Args: token: Confirmation token tool_name: Expected tool name parameters: Expected parameters Returns: (is_valid, error_message) """ is_valid, payload, error = self.validate_token(token) if not is_valid: return False, error # Check tool name matches if payload["tool_name"] != tool_name: return False, f"Token tool name mismatch: expected {tool_name}, got {payload['tool_name']}" # Check parameters match (simplified - could be more sophisticated) token_params = payload["parameters"] for key, value in parameters.items(): if key not in token_params or token_params[key] != value: return False, f"Token parameter mismatch for {key}" return True, None # Global token instance _token = ConfirmationToken() def get_token_system() -> ConfirmationToken: """Get the global token system instance.""" return _token