feat(agent): tool profiles and LLM router (rebased on docker/merge base)

- tools.toolProfiles / tools.toolRouting in config; filter tools per turn
- Router picks profile; expandOnMissingTool widens to full registry once
- Wire gateway and CLI AgentLoop; ToolRegistry.get_definitions_subset
- Ruff: fix tool_routing exception handling and format touched files

Made-with: Cursor
This commit is contained in:
tanyar09 2026-03-27 14:30:15 -04:00
parent a2ae3f0cea
commit d50183c3d7
6 changed files with 375 additions and 108 deletions

View File

@ -22,7 +22,7 @@ from nanobot.agent.tools.spawn import SpawnTool
from nanobot.agent.tools.web import WebFetchTool, WebSearchTool
from nanobot.bus.events import InboundMessage, OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.config.schema import ExecToolConfig
from nanobot.config.schema import ExecToolConfig, ToolRoutingConfig
from nanobot.cron.service import CronService
from nanobot.providers.base import LLMProvider
from nanobot.session.manager import Session, SessionManager
@ -56,6 +56,9 @@ class AgentLoop:
restrict_to_workspace: bool = False,
session_manager: SessionManager | None = None,
mcp_servers: dict | None = None,
tool_profiles: dict | None = None,
default_tool_profile: str = "default",
tool_routing: ToolRoutingConfig | None = None,
):
self.bus = bus
self.provider = provider
@ -89,6 +92,9 @@ class AgentLoop:
self._mcp_servers = mcp_servers or {}
self._mcp_stack: AsyncExitStack | None = None
self._mcp_connected = False
self._tool_profiles: dict = tool_profiles or {}
self._default_tool_profile = default_tool_profile
self._tool_routing = tool_routing or ToolRoutingConfig()
self._register_default_tools()
def _register_default_tools(self) -> None:
@ -194,6 +200,41 @@ class AgentLoop:
return f'{tc.name}("{val[:40]}")' if len(val) > 40 else f'{tc.name}("{val}")'
return ", ".join(_fmt(tc) for tc in tool_calls)
@staticmethod
def _extract_routing_text(messages: list[dict]) -> str:
"""Last user message text (string or multimodal) for the tool-profile router."""
for m in reversed(messages):
if m.get("role") != "user":
continue
c = m.get("content")
if isinstance(c, str):
return c.strip()
if isinstance(c, list):
parts: list[str] = []
for block in c:
if isinstance(block, dict) and block.get("type") == "text":
parts.append(str(block.get("text") or ""))
return "\n".join(parts).strip()
return ""
async def _pick_tool_profile(self, user_text: str) -> str:
"""Resolve profile key when tools.toolProfiles is configured."""
if not self._tool_profiles:
return self._default_tool_profile
if self._tool_routing.enabled:
from nanobot.agent.tool_routing import route_tool_profile
return await route_tool_profile(
self.provider,
model=self.model,
user_message=user_text,
profiles=self._tool_profiles,
default_profile=self._default_tool_profile,
temperature=self._tool_routing.router_temperature,
max_tokens=self._tool_routing.router_max_tokens,
)
return self._default_tool_profile
async def _run_agent_loop(
self,
initial_messages: list[dict],
@ -214,15 +255,41 @@ class AgentLoop:
final_content = None
tools_used: list[str] = []
from nanobot.agent.tool_profiles import compute_allowed_tool_names
from nanobot.agent.tool_routing import is_tool_not_found_error
tools_full = self.tools.get_definitions()
tools_expanded = False
allowed_names: set[str] | None = None
if self._tool_profiles:
routing_text = self._extract_routing_text(initial_messages)
profile_key = await self._pick_tool_profile(routing_text)
prof = self._tool_profiles[profile_key]
always = set(self._tool_routing.always_include_tools)
allowed_names = compute_allowed_tool_names(
self.tools,
prof,
list(self._mcp_servers.keys()),
always,
)
logger.info(
f"Tool profile '{profile_key}': {len(allowed_names)}/{len(self.tools)} tools exposed"
)
while iteration < self.max_iterations:
iteration += 1
logger.debug(f"Agent loop iteration {iteration}/{self.max_iterations}, calling LLM provider...")
if allowed_names is not None and not tools_expanded:
tool_defs = self.tools.get_definitions_subset(allowed_names)
else:
tool_defs = tools_full
try:
response = await asyncio.wait_for(
self.provider.chat(
messages=messages,
tools=self.tools.get_definitions(),
tools=tool_defs,
model=self.model,
temperature=self.temperature,
max_tokens=self.max_tokens,
@ -264,6 +331,16 @@ class AgentLoop:
logger.info(f"Tool call: {tool_call.name}({args_str[:200]})")
result = await self.tools.execute(tool_call.name, tool_call.arguments)
logger.info(f"Tool result length: {len(result) if result else 0}, preview: {result[:200] if result else 'None'}")
if (
allowed_names is not None
and self._tool_routing.expand_on_missing_tool
and not tools_expanded
and is_tool_not_found_error(result)
):
tools_expanded = True
logger.info(
"Expanded tool set to full registry (missing tool after profile filter)"
)
messages = self.context.add_tool_result(
messages, tool_call.id, tool_call.name, result
)

View File

@ -0,0 +1,59 @@
"""Tool profile: compute which tools are visible to the LLM for a given config profile."""
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from nanobot.agent.tools.registry import ToolRegistry
from nanobot.config.schema import ToolProfileConfig
def mcp_server_for_tool(tool_name: str, mcp_server_keys: list[str]) -> str | None:
"""
Infer MCP server config key from nanobot's tool name pattern mcp_<serverKey>_<mcpToolName>.
Server keys are matched longest-first so names with underscores resolve unambiguously.
"""
prefix = "mcp_"
if not tool_name.startswith(prefix):
return None
rest = tool_name[len(prefix) :]
for key in sorted(mcp_server_keys, key=len, reverse=True):
sep = f"{key}_"
if rest.startswith(sep):
return key
return None
def compute_allowed_tool_names(
registry: ToolRegistry,
profile: ToolProfileConfig,
mcp_server_keys: list[str],
always_include: set[str],
) -> set[str]:
"""Union of profile-filtered builtins + MCP tools + always-include (intersected with registered names)."""
all_names = set(registry.tool_names)
mcp_keys = list(mcp_server_keys)
builtins = {n for n in all_names if mcp_server_for_tool(n, mcp_keys) is None}
if profile.builtin_tools is None:
allowed_builtins = set(builtins)
else:
allowed_builtins = set(profile.builtin_tools) & builtins
if profile.mcp_servers is None:
allowed_mcp = {
n for n in all_names if mcp_server_for_tool(n, mcp_keys) is not None
}
else:
allow_srv = set(profile.mcp_servers)
allowed_mcp = {
n
for n in all_names
if (srv := mcp_server_for_tool(n, mcp_keys)) is not None and srv in allow_srv
}
extras = always_include & all_names
return allowed_builtins | allowed_mcp | extras

View File

@ -0,0 +1,82 @@
"""LLM-based router: choose a tools.toolProfiles key from the user message."""
from __future__ import annotations
import json_repair
from loguru import logger
from nanobot.config.schema import ToolProfileConfig
from nanobot.providers.base import LLMProvider
async def route_tool_profile(
provider: LLMProvider,
*,
model: str,
user_message: str,
profiles: dict[str, ToolProfileConfig],
default_profile: str,
temperature: float = 0.2,
max_tokens: int = 128,
) -> str:
"""
Ask a small LLM call to return JSON {"profile": "<key>"}.
Falls back to default_profile on any failure or unknown key.
"""
if not profiles:
return default_profile
lines = []
for name, p in profiles.items():
desc = (p.description or "").strip() or "(no description)"
lines.append(f"- {name}: {desc}")
catalog = "\n".join(lines)
allowed = ", ".join(f'"{k}"' for k in profiles)
system = (
"You are a tool-profile router. Pick exactly one profile key for the assistant's next turn. "
"Respond with JSON only: {\"profile\": \"<key>\"} where <key> is one of: "
f"{allowed}. "
"Prefer narrower profiles when the request is clearly scoped (e.g. only read files). "
"Use the broadest profile only when multiple unrelated capabilities are needed."
)
user = f"Available profiles:\n{catalog}\n\nUser message:\n{user_message.strip()[:8000]}"
try:
response = await provider.chat(
messages=[
{"role": "system", "content": system},
{"role": "user", "content": user},
],
tools=None,
model=model,
temperature=temperature,
max_tokens=max_tokens,
)
text = (response.content or "").strip()
if not text:
return default_profile
if text.startswith("```"):
text = text.split("\n", 1)[-1].rsplit("```", 1)[0].strip()
data = json_repair.loads(text)
if not isinstance(data, dict):
return default_profile
name = data.get("profile")
if isinstance(name, str) and name in profiles:
logger.info(f"Tool router selected profile '{name}'")
return name
logger.warning(f"Tool router returned invalid profile {name!r}, using default")
except (TypeError, ValueError) as e:
logger.warning(f"Tool router JSON parse failed: {e}")
except Exception as e:
logger.warning(f"Tool router failed: {e}")
return default_profile
def is_tool_not_found_error(result: str) -> bool:
"""Detect registry execute() message for missing tools."""
if not result:
return False
return result.startswith("Error: Tool '") and "' not found" in result

View File

@ -8,44 +8,52 @@ from nanobot.agent.tools.base import Tool
class ToolRegistry:
"""
Registry for agent tools.
Allows dynamic registration and execution of tools.
"""
def __init__(self):
self._tools: dict[str, Tool] = {}
def register(self, tool: Tool) -> None:
"""Register a tool."""
self._tools[tool.name] = tool
def unregister(self, name: str) -> None:
"""Unregister a tool by name."""
self._tools.pop(name, None)
def get(self, name: str) -> Tool | None:
"""Get a tool by name."""
return self._tools.get(name)
def has(self, name: str) -> bool:
"""Check if a tool is registered."""
return name in self._tools
def get_definitions(self) -> list[dict[str, Any]]:
"""Get all tool definitions in OpenAI format."""
return [tool.to_schema() for tool in self._tools.values()]
def get_definitions_subset(self, names: set[str]) -> list[dict[str, Any]]:
"""Tool definitions for the given names only (preserves registration order)."""
out: list[dict[str, Any]] = []
for key, tool in self._tools.items():
if key in names:
out.append(tool.to_schema())
return out
async def execute(self, name: str, params: dict[str, Any]) -> str:
"""
Execute a tool by name with given parameters.
Args:
name: Tool name.
params: Tool parameters.
Returns:
Tool execution result as string.
Raises:
KeyError: If tool not found.
"""
@ -62,14 +70,14 @@ class ToolRegistry:
return await tool.execute(**coerced_params)
except Exception as e:
return f"Error executing {name}: {str(e)}"
@property
def tool_names(self) -> list[str]:
"""Get list of registered tool names."""
return list(self._tools.keys())
def __len__(self) -> int:
return len(self._tools)
def __contains__(self, name: str) -> bool:
return name in self._tools

View File

@ -2,23 +2,22 @@
import asyncio
import os
import signal
from pathlib import Path
import select
import signal
import sys
from pathlib import Path
import typer
from prompt_toolkit import PromptSession
from prompt_toolkit.formatted_text import HTML
from prompt_toolkit.history import FileHistory
from prompt_toolkit.patch_stdout import patch_stdout
from rich.console import Console
from rich.markdown import Markdown
from rich.table import Table
from rich.text import Text
from prompt_toolkit import PromptSession
from prompt_toolkit.formatted_text import HTML
from prompt_toolkit.history import FileHistory
from prompt_toolkit.patch_stdout import patch_stdout
from nanobot import __version__, __logo__
from nanobot import __logo__, __version__
from nanobot.config.schema import Config
app = typer.Typer(
@ -159,9 +158,9 @@ def onboard():
from nanobot.config.loader import get_config_path, load_config, save_config
from nanobot.config.schema import Config
from nanobot.utils.helpers import get_workspace_path
config_path = get_config_path()
if config_path.exists():
console.print(f"[yellow]Config already exists at {config_path}[/yellow]")
console.print(" [bold]y[/bold] = overwrite with defaults (existing values will be lost)")
@ -177,17 +176,17 @@ def onboard():
else:
save_config(Config())
console.print(f"[green]✓[/green] Created config at {config_path}")
# Create workspace
workspace = get_workspace_path()
if not workspace.exists():
workspace.mkdir(parents=True, exist_ok=True)
console.print(f"[green]✓[/green] Created workspace at {workspace}")
# Create default bootstrap files
_create_workspace_templates(workspace)
console.print(f"\n{__logo__} nanobot is ready!")
console.print("\nNext steps:")
console.print(" 1. Add your API key to [cyan]~/.nanobot/config.json[/cyan]")
@ -239,13 +238,13 @@ Information about the user goes here.
- Language: (your preferred language)
""",
}
for filename, content in templates.items():
file_path = workspace / filename
if not file_path.exists():
file_path.write_text(content)
console.print(f" [dim]Created {filename}[/dim]")
# Create memory directory and MEMORY.md
memory_dir = workspace / "memory"
memory_dir.mkdir(exist_ok=True)
@ -268,7 +267,7 @@ This file stores important information that should persist across sessions.
(Things to remember)
""")
console.print(" [dim]Created memory/MEMORY.md[/dim]")
history_file = memory_dir / "HISTORY.md"
if not history_file.exists():
history_file.write_text("")
@ -281,9 +280,9 @@ This file stores important information that should persist across sessions.
def _make_provider(config: Config):
"""Create the appropriate LLM provider from config."""
from nanobot.providers.custom_provider import CustomProvider
from nanobot.providers.litellm_provider import LiteLLMProvider
from nanobot.providers.openai_codex_provider import OpenAICodexProvider
from nanobot.providers.custom_provider import CustomProvider
model = config.agents.defaults.model
provider_name = config.get_provider_name(model)
@ -310,7 +309,7 @@ def _make_provider(config: Config):
airllm_config = getattr(config.providers, "airllm", None)
model_path = None
compression = None
# Try to get model from airllm config's api_key field (repurposed as model path)
# or from the default model
if airllm_config and airllm_config.api_key:
@ -325,7 +324,7 @@ def _make_provider(config: Config):
else:
model_path = model
hf_token = None
# Check for compression setting in extra_headers or api_base
if airllm_config:
if airllm_config.api_base:
@ -335,7 +334,7 @@ def _make_provider(config: Config):
# Check for HF token in extra_headers
if not hf_token and airllm_config.extra_headers and "hf_token" in airllm_config.extra_headers:
hf_token = airllm_config.extra_headers["hf_token"]
return AirLLMProvider(
api_key=airllm_config.api_key if airllm_config else None,
api_base=compression if compression else None,
@ -375,30 +374,30 @@ def gateway(
verbose: bool = typer.Option(False, "--verbose", "-v", help="Verbose output"),
):
"""Start the nanobot gateway."""
from nanobot.config.loader import load_config, get_data_dir
from nanobot.bus.queue import MessageBus
from nanobot.agent.loop import AgentLoop
from nanobot.bus.queue import MessageBus
from nanobot.channels.manager import ChannelManager
from nanobot.session.manager import SessionManager
from nanobot.config.loader import get_data_dir, load_config
from nanobot.cron.service import CronService
from nanobot.cron.types import CronJob
from nanobot.heartbeat.service import HeartbeatService
from nanobot.session.manager import SessionManager
if verbose:
import logging
logging.basicConfig(level=logging.DEBUG)
console.print(f"{__logo__} Starting nanobot gateway on port {port}...")
config = load_config()
bus = MessageBus()
provider = _make_provider(config)
session_manager = SessionManager(config.workspace_path)
# Create cron service first (callback set after agent creation)
cron_store_path = get_data_dir() / "cron" / "jobs.json"
cron = CronService(cron_store_path)
# Create agent with cron service
agent = AgentLoop(
bus=bus,
@ -415,8 +414,11 @@ def gateway(
restrict_to_workspace=config.tools.restrict_to_workspace,
session_manager=session_manager,
mcp_servers=config.tools.mcp_servers,
tool_profiles=config.tools.tool_profiles,
default_tool_profile=config.tools.default_tool_profile,
tool_routing=config.tools.tool_routing,
)
# Set cron callback (needs agent)
async def on_cron_job(job: CronJob) -> str | None:
"""Execute a cron job through the agent."""
@ -449,33 +451,33 @@ def gateway(
))
return response
cron.on_job = on_cron_job
# Create heartbeat service
async def on_heartbeat(prompt: str) -> str:
"""Execute heartbeat through the agent."""
return await agent.process_direct(prompt, session_key="heartbeat")
heartbeat = HeartbeatService(
workspace=config.workspace_path,
on_heartbeat=on_heartbeat,
interval_s=30 * 60, # 30 minutes
enabled=True
)
# Create channel manager
channels = ChannelManager(config, bus)
if channels.enabled_channels:
console.print(f"[green]✓[/green] Channels enabled: {', '.join(channels.enabled_channels)}")
else:
console.print("[yellow]Warning: No channels enabled[/yellow]")
cron_status = cron.status()
if cron_status["jobs"] > 0:
console.print(f"[green]✓[/green] Cron: {cron_status['jobs']} scheduled jobs")
console.print(f"[green]✓[/green] Heartbeat: every 30m")
console.print("[green]✓[/green] Heartbeat: every 30m")
async def run():
try:
await cron.start()
@ -492,7 +494,7 @@ def gateway(
cron.stop()
agent.stop()
await channels.stop_all()
asyncio.run(run())
@ -511,15 +513,16 @@ def agent(
logs: bool = typer.Option(False, "--logs/--no-logs", help="Show nanobot runtime logs during chat"),
):
"""Interact with the agent directly."""
from nanobot.config.loader import load_config, get_data_dir
from nanobot.bus.queue import MessageBus
from nanobot.agent.loop import AgentLoop
from nanobot.cron.service import CronService
from loguru import logger
from nanobot.agent.loop import AgentLoop
from nanobot.bus.queue import MessageBus
from nanobot.config.loader import get_data_dir, load_config
from nanobot.cron.service import CronService
# Load config (this also loads .env file into environment)
config = load_config()
bus = MessageBus()
provider = _make_provider(config)
@ -531,7 +534,7 @@ def agent(
logger.enable("nanobot")
else:
logger.disable("nanobot")
agent_loop = AgentLoop(
bus=bus,
provider=provider,
@ -546,8 +549,11 @@ def agent(
cron_service=cron,
restrict_to_workspace=config.tools.restrict_to_workspace,
mcp_servers=config.tools.mcp_servers,
tool_profiles=config.tools.tool_profiles,
default_tool_profile=config.tools.default_tool_profile,
tool_routing=config.tools.tool_routing,
)
# Show spinner when logs are off (no output to miss); skip when logs are on
def _thinking_ctx():
if logs:
@ -573,7 +579,7 @@ def agent(
console.print(f"[red]Error: {e}[/red]")
console.print(f"[dim]{traceback.format_exc()}[/dim]")
raise
asyncio.run(run_once())
else:
# Interactive mode
@ -586,7 +592,7 @@ def agent(
os._exit(0)
signal.signal(signal.SIGINT, _exit_on_sigint)
async def run_interactive():
try:
while True:
@ -601,7 +607,7 @@ def agent(
_restore_terminal()
console.print("\nGoodbye!")
break
with _thinking_ctx():
response = await agent_loop.process_direct(user_input, session_id, on_progress=_cli_progress)
_print_agent_response(response, render_markdown=markdown)
@ -615,7 +621,7 @@ def agent(
break
finally:
await agent_loop.close_mcp()
asyncio.run(run_interactive())
@ -672,7 +678,7 @@ def channels_status():
"" if mc.enabled else "",
mc_base
)
# Telegram
tg = config.channels.telegram
tg_config = f"token: {tg.token[:10]}..." if tg.token else "[dim]not configured[/dim]"
@ -698,57 +704,57 @@ def _get_bridge_dir() -> Path:
"""Get the bridge directory, setting it up if needed."""
import shutil
import subprocess
# User's bridge location
user_bridge = Path.home() / ".nanobot" / "bridge"
# Check if already built
if (user_bridge / "dist" / "index.js").exists():
return user_bridge
# Check for npm
if not shutil.which("npm"):
console.print("[red]npm not found. Please install Node.js >= 18.[/red]")
raise typer.Exit(1)
# Find source bridge: first check package data, then source dir
pkg_bridge = Path(__file__).parent.parent / "bridge" # nanobot/bridge (installed)
src_bridge = Path(__file__).parent.parent.parent / "bridge" # repo root/bridge (dev)
source = None
if (pkg_bridge / "package.json").exists():
source = pkg_bridge
elif (src_bridge / "package.json").exists():
source = src_bridge
if not source:
console.print("[red]Bridge source not found.[/red]")
console.print("Try reinstalling: pip install --force-reinstall nanobot")
raise typer.Exit(1)
console.print(f"{__logo__} Setting up bridge...")
# Copy to user directory
user_bridge.parent.mkdir(parents=True, exist_ok=True)
if user_bridge.exists():
shutil.rmtree(user_bridge)
shutil.copytree(source, user_bridge, ignore=shutil.ignore_patterns("node_modules", "dist"))
# Install and build
try:
console.print(" Installing dependencies...")
subprocess.run(["npm", "install"], cwd=user_bridge, check=True, capture_output=True)
console.print(" Building...")
subprocess.run(["npm", "run", "build"], cwd=user_bridge, check=True, capture_output=True)
console.print("[green]✓[/green] Bridge ready\n")
except subprocess.CalledProcessError as e:
console.print(f"[red]Build failed: {e}[/red]")
if e.stderr:
console.print(f"[dim]{e.stderr.decode()[:500]}[/dim]")
raise typer.Exit(1)
return user_bridge
@ -756,18 +762,19 @@ def _get_bridge_dir() -> Path:
def channels_login():
"""Link device via QR code."""
import subprocess
from nanobot.config.loader import load_config
config = load_config()
bridge_dir = _get_bridge_dir()
console.print(f"{__logo__} Starting bridge...")
console.print("Scan the QR code to connect.\n")
env = {**os.environ}
if config.channels.whatsapp.bridge_token:
env["BRIDGE_TOKEN"] = config.channels.whatsapp.bridge_token
try:
subprocess.run(["npm", "start"], cwd=bridge_dir, check=True, env=env)
except subprocess.CalledProcessError as e:
@ -791,23 +798,23 @@ def cron_list(
"""List scheduled jobs."""
from nanobot.config.loader import get_data_dir
from nanobot.cron.service import CronService
store_path = get_data_dir() / "cron" / "jobs.json"
service = CronService(store_path)
jobs = service.list_jobs(include_disabled=all)
if not jobs:
console.print("No scheduled jobs.")
return
table = Table(title="Scheduled Jobs")
table.add_column("ID", style="cyan")
table.add_column("Name")
table.add_column("Schedule")
table.add_column("Status")
table.add_column("Next Run")
import time
from datetime import datetime as _dt
from zoneinfo import ZoneInfo
@ -819,7 +826,7 @@ def cron_list(
sched = f"{job.schedule.expr or ''} ({job.schedule.tz})" if job.schedule.tz else (job.schedule.expr or "")
else:
sched = "one-time"
# Format next run
next_run = ""
if job.state.next_run_at_ms:
@ -829,11 +836,11 @@ def cron_list(
next_run = _dt.fromtimestamp(ts, tz).strftime("%Y-%m-%d %H:%M")
except Exception:
next_run = time.strftime("%Y-%m-%d %H:%M", time.localtime(ts))
status = "[green]enabled[/green]" if job.enabled else "[dim]disabled[/dim]"
table.add_row(job.id, job.name, sched, status, next_run)
console.print(table)
@ -853,7 +860,7 @@ def cron_add(
from nanobot.config.loader import get_data_dir
from nanobot.cron.service import CronService
from nanobot.cron.types import CronSchedule
if tz and not cron_expr:
console.print("[red]Error: --tz can only be used with --cron[/red]")
raise typer.Exit(1)
@ -870,10 +877,10 @@ def cron_add(
else:
console.print("[red]Error: Must specify --every, --cron, or --at[/red]")
raise typer.Exit(1)
store_path = get_data_dir() / "cron" / "jobs.json"
service = CronService(store_path)
job = service.add_job(
name=name,
schedule=schedule,
@ -882,7 +889,7 @@ def cron_add(
to=to,
channel=channel,
)
console.print(f"[green]✓[/green] Added job '{job.name}' ({job.id})")
@ -893,10 +900,10 @@ def cron_remove(
"""Remove a scheduled job."""
from nanobot.config.loader import get_data_dir
from nanobot.cron.service import CronService
store_path = get_data_dir() / "cron" / "jobs.json"
service = CronService(store_path)
if service.remove_job(job_id):
console.print(f"[green]✓[/green] Removed job {job_id}")
else:
@ -911,10 +918,10 @@ def cron_enable(
"""Enable or disable a job."""
from nanobot.config.loader import get_data_dir
from nanobot.cron.service import CronService
store_path = get_data_dir() / "cron" / "jobs.json"
service = CronService(store_path)
job = service.enable_job(job_id, enabled=not disable)
if job:
status = "disabled" if disable else "enabled"
@ -931,15 +938,15 @@ def cron_run(
"""Manually run a job."""
from nanobot.config.loader import get_data_dir
from nanobot.cron.service import CronService
store_path = get_data_dir() / "cron" / "jobs.json"
service = CronService(store_path)
async def run():
return await service.run_job(job_id, force=force)
if asyncio.run(run()):
console.print(f"[green]✓[/green] Job executed")
console.print("[green]✓[/green] Job executed")
else:
console.print(f"[red]Failed to run job {job_id}[/red]")
@ -952,7 +959,7 @@ def cron_run(
@app.command()
def status():
"""Show nanobot status."""
from nanobot.config.loader import load_config, get_config_path
from nanobot.config.loader import get_config_path, load_config
config_path = get_config_path()
config = load_config()
@ -967,7 +974,7 @@ def status():
from nanobot.providers.registry import PROVIDERS
console.print(f"Model: {config.agents.defaults.model}")
# Check API keys from registry
for spec in PROVIDERS:
p = getattr(config.providers, spec.name, None)

View File

@ -1,7 +1,8 @@
"""Configuration schema using Pydantic."""
from pathlib import Path
from pydantic import BaseModel, Field, ConfigDict
from pydantic import BaseModel, ConfigDict, Field, model_validator
from pydantic.alias_generators import to_camel
from pydantic_settings import BaseSettings
@ -270,6 +271,26 @@ class MCPServerConfig(Base):
url: str = "" # HTTP: streamable HTTP endpoint URL
class ToolProfileConfig(Base):
"""Subset of tools exposed to the LLM when this profile is active."""
description: str = "" # Shown to the router model when toolRouting is enabled
builtin_tools: list[str] | None = None # None = all non-MCP tools; [] = none (except always-include)
mcp_servers: list[str] | None = None # None = all configured MCP servers; [] = no MCP tools
class ToolRoutingConfig(Base):
"""Optional LLM router that picks a tool profile from the user message (phase 2)."""
enabled: bool = False
router_temperature: float = 0.2
router_max_tokens: int = 128
# Always merged into the allowed set (if registered), e.g. channel reply + subagent spawn
always_include_tools: list[str] = Field(default_factory=lambda: ["message", "spawn"])
# If the model calls a missing tool, retry the loop once with all tools registered
expand_on_missing_tool: bool = True
class ToolsConfig(Base):
"""Tools configuration."""
@ -278,6 +299,19 @@ class ToolsConfig(Base):
calendar: CalendarConfig = Field(default_factory=CalendarConfig)
restrict_to_workspace: bool = True # If true, restrict all tool access to workspace directory
mcp_servers: dict[str, MCPServerConfig] = Field(default_factory=dict)
tool_profiles: dict[str, ToolProfileConfig] = Field(default_factory=dict)
default_tool_profile: str = "default"
tool_routing: ToolRoutingConfig = Field(default_factory=ToolRoutingConfig)
@model_validator(mode="after")
def _tool_profiles_consistent(self) -> "ToolsConfig":
if self.tool_profiles and self.default_tool_profile not in self.tool_profiles:
raise ValueError(
f"defaultToolProfile '{self.default_tool_profile}' is missing from tools.toolProfiles"
)
if self.tool_routing.enabled and not self.tool_profiles:
raise ValueError("toolRouting.enabled requires a non-empty tools.toolProfiles map")
return self
class Config(BaseSettings):