""" Boundary enforcement policy. Enforces separation between work and family agents through: - Path whitelisting - Config separation - Network-level checks - Static policy validation """ import logging from pathlib import Path from typing import List, Optional, Set from dataclasses import dataclass logger = logging.getLogger(__name__) @dataclass class BoundaryPolicy: """Policy for agent boundaries.""" agent_type: str # "work" or "family" allowed_paths: Set[Path] forbidden_paths: Set[Path] allowed_networks: Set[str] # IP ranges or network names forbidden_networks: Set[str] allowed_tools: Set[str] forbidden_tools: Set[str] class BoundaryEnforcer: """Enforces boundaries between work and family agents.""" def __init__(self): """Initialize boundary enforcer with policies.""" # Base paths self.base_dir = Path(__file__).parent.parent.parent # Family agent policy self.family_policy = BoundaryPolicy( agent_type="family", allowed_paths={ self.base_dir / "data" / "tasks" / "home", self.base_dir / "data" / "notes" / "home", self.base_dir / "data" / "conversations.db", self.base_dir / "data" / "timers.db", }, forbidden_paths={ # Work-related paths (if they exist) self.base_dir.parent / "work-repos", # Example self.base_dir / "data" / "work", # If work data exists }, allowed_networks={ "localhost", "127.0.0.1", "10.0.0.0/8", # Local network (can be restricted further) }, forbidden_networks={ # Work-specific networks (if configured) }, allowed_tools={ "get_current_time", "get_date", "get_timezone_info", "convert_timezone", "get_weather", "create_timer", "create_reminder", "list_timers", "cancel_timer", "add_task", "update_task_status", "list_tasks", "create_note", "read_note", "append_to_note", "search_notes", "list_notes", }, forbidden_tools={ # Work-specific tools (if any) } ) # Work agent policy self.work_policy = BoundaryPolicy( agent_type="work", allowed_paths={ # Work agent can access more paths self.base_dir / "data" / "tasks" / "home", # Can read home tasks self.base_dir / "data" / "work", # Work-specific data }, forbidden_paths={ # Work agent should not modify family data self.base_dir / "data" / "notes" / "home", # Family notes }, allowed_networks={ "localhost", "127.0.0.1", "10.0.0.0/8", "10.0.30.63", # GPU VM }, forbidden_networks=set(), allowed_tools={ # Work agent can use all tools }, forbidden_tools=set() ) def check_path_access(self, agent_type: str, path: Path) -> tuple[bool, str]: """ Check if agent can access a path. Args: agent_type: "work" or "family" path: Path to check Returns: (allowed, reason) """ policy = self.family_policy if agent_type == "family" else self.work_policy # Resolve path try: resolved_path = path.resolve() except Exception as e: return False, f"Invalid path: {e}" # Check forbidden paths first for forbidden in policy.forbidden_paths: try: if resolved_path.is_relative_to(forbidden.resolve()): return False, f"Path is in forbidden area: {forbidden}" except Exception: continue # Check allowed paths for allowed in policy.allowed_paths: try: if resolved_path.is_relative_to(allowed.resolve()): return True, f"Path is in allowed area: {allowed}" except Exception: continue # Default: deny for family agent, allow for work agent if agent_type == "family": return False, "Path not in allowed whitelist for family agent" else: return True, "Work agent has broader access" def check_tool_access(self, agent_type: str, tool_name: str) -> tuple[bool, str]: """ Check if agent can use a tool. Args: agent_type: "work" or "family" tool_name: Name of tool Returns: (allowed, reason) """ policy = self.family_policy if agent_type == "family" else self.work_policy # Check forbidden tools if tool_name in policy.forbidden_tools: return False, f"Tool '{tool_name}' is forbidden for {agent_type} agent" # Check allowed tools (if specified) if policy.allowed_tools and tool_name not in policy.allowed_tools: return False, f"Tool '{tool_name}' is not in allowed list for {agent_type} agent" return True, f"Tool '{tool_name}' is allowed for {agent_type} agent" def check_network_access(self, agent_type: str, target: str) -> tuple[bool, str]: """ Check if agent can access a network target. Args: agent_type: "work" or "family" target: Network target (IP, hostname, or network range) Returns: (allowed, reason) """ policy = self.family_policy if agent_type == "family" else self.work_policy # Check forbidden networks for forbidden in policy.forbidden_networks: if self._matches_network(target, forbidden): return False, f"Network '{target}' is forbidden for {agent_type} agent" # Check allowed networks for allowed in policy.allowed_networks: if self._matches_network(target, allowed): return True, f"Network '{target}' is allowed for {agent_type} agent" # Default: deny for family agent, allow for work agent if agent_type == "family": return False, f"Network '{target}' is not in allowed whitelist for family agent" else: return True, "Work agent has broader network access" def _matches_network(self, target: str, network: str) -> bool: """Check if target matches network pattern.""" # Simple matching - can be enhanced with proper CIDR matching if network == target: return True if network.endswith("/8") and target.startswith(network.split("/")[0].rsplit(".", 1)[0]): return True return False def validate_config_separation(self, agent_type: str, config_path: Path) -> tuple[bool, str]: """ Validate that config is properly separated. Args: agent_type: "work" or "family" config_path: Path to config file Returns: (valid, reason) """ # Family agent config should not contain work-related paths if agent_type == "family": config_content = config_path.read_text() work_indicators = ["work-repos", "work/", "work_"] for indicator in work_indicators: if indicator in config_content: return False, f"Family config contains work indicator: {indicator}" return True, "Config separation validated" # Global enforcer instance _enforcer = BoundaryEnforcer() def get_enforcer() -> BoundaryEnforcer: """Get the global boundary enforcer instance.""" return _enforcer