Agent: tool profiles and profile-scoped MCP connections

- Extend tool profile helpers for MCP server key resolution and filtering
- Lazily connect/disconnect MCP servers per active profile in AgentLoop
- Harden MCP client (timeouts, tool naming, connect_mcp_server entry)
- Adjust context and tool modules to align with profile-aware tooling
- docker-compose: minor gateway/workspace notes

Made-with: Cursor
This commit is contained in:
tanyar09 2026-03-30 13:27:46 -04:00
parent 7901f090f9
commit a6bd3e0e9b
7 changed files with 278 additions and 91 deletions

View File

@ -4,6 +4,8 @@ x-common-config: &common-config
dockerfile: Dockerfile
volumes:
- ~/.nanobot:/root/.nanobot
# Host repo ./workspace → /workspace in container. Set agents.defaults.workspace to /workspace.
- ./workspace:/workspace
services:
nanobot-gateway:

View File

@ -101,6 +101,12 @@ Your workspace is at: {workspace_path}
- History log: {workspace_path}/memory/HISTORY.md (grep-searchable)
- Custom skills: {workspace_path}/skills/{{skill-name}}/SKILL.md
**Filesystem tools (read_file, write_file, edit_file, list_dir):** Use paths **under this workspace root only** (`{workspace_path}`). Do not invent other roots (e.g. `/mnt/data/...` on a host) unless you know they are valid on this runtime. **`list_dir` takes one directory path**no wildcards (never pass `*.pdf` in the path). To find PDFs, `list_dir("{workspace_path}")` (or a subfolder) and filter for `.pdf` names, or use `exec` with `find` under that directory.
**Answering after tools:** When a tool already returned what the user needs, base your reply **only on that tool output**same topic as the users question, no hijacking.
- After **`list_dir`:** If they asked for PDFs (or another extension), list **only** matching names (paths under `{workspace_path}` if useful). If none, say so briefly. No essays, no calling the folder "code" unless they asked for analysis.
- After **`read_emails`:** Answer **only** from the email text the tool returned (From, Subject, Date, attachments, downloaded paths, body as needed). Do **not** switch to unrelated topics (Git, Gitea, this repo, workspace docs, coding help, general chit-chat). Do **not** apologize at length or describe "what an email is". Match the question: e.g. latest email sender + subject (+ date) in a few lines unless they asked for the full body.
## Gitea API (This Repository)
**CRITICAL**: This repository uses Gitea at `http://10.0.30.169:3000/api/v1`, NOT GitHub.
- Repository: `ilia/nanobot`
@ -120,7 +126,7 @@ Always be helpful, accurate, and concise. Before calling tools, briefly tell the
When remembering something important, write to {workspace_path}/memory/MEMORY.md
To recall past events, grep {workspace_path}/memory/HISTORY.md
IMPORTANT: For email queries (latest email, email sender, inbox, etc.), ALWAYS use the read_emails tool. NEVER use exec() with mail/tail/awk commands or read_file() on /var/mail - those will not work. The read_emails tool is the only way to access emails."""
IMPORTANT: For email queries (latest email, email sender, inbox, etc.), ALWAYS use the read_emails tool. NEVER use exec() with mail/tail/awk commands or read_file() on /var/mail - those will not work. The read_emails tool is the only way to access emails. Once read_emails returns, your assistant reply must **only** satisfy that email question from the tool resultignore Gitea/workspace/bootstrap content unless the user tied their question to it."""
def _load_bootstrap_files(self) -> str:
"""Load all bootstrap files from workspace."""

View File

@ -90,8 +90,8 @@ class AgentLoop:
self._running = False
self._mcp_servers = mcp_servers or {}
self._mcp_stack: AsyncExitStack | None = None
self._mcp_connected = False
self._mcp_stacks: dict[str, AsyncExitStack] = {}
self._mcp_connected_servers: set[str] = set()
self._tool_profiles: dict = tool_profiles or {}
self._default_tool_profile = default_tool_profile
self._tool_routing = tool_routing or ToolRoutingConfig()
@ -135,7 +135,10 @@ class AgentLoop:
from nanobot.config.loader import load_config
config = load_config()
if config.channels.email.enabled:
email_tool = EmailTool(email_config=config.channels.email)
email_tool = EmailTool(
email_config=config.channels.email,
workspace=self.workspace,
)
self.tools.register(email_tool)
logger.info(f"Email tool '{email_tool.name}' registered successfully")
else:
@ -159,15 +162,69 @@ class AgentLoop:
logger.warning(f"Calendar tool not available: {e}")
# Calendar tool not available or not configured - silently skip
async def _connect_mcp(self) -> None:
"""Connect to configured MCP servers (one-time, lazy)."""
if self._mcp_connected or not self._mcp_servers:
def _unregister_mcp_tools_for_server(self, server_key: str) -> None:
"""Remove tools registered from one MCP server (prefix mcp_<key>_)."""
prefix = f"mcp_{server_key}_"
for name in list(self.tools.tool_names):
if name.startswith(prefix):
self.tools.unregister(name)
async def _disconnect_mcp_server(self, server_key: str) -> None:
"""Close one MCP server and remove its tools (used when switching tool profiles)."""
stack = self._mcp_stacks.pop(server_key, None)
if stack is not None:
try:
await stack.aclose()
except (RuntimeError, BaseExceptionGroup):
pass
self._unregister_mcp_tools_for_server(server_key)
self._mcp_connected_servers.discard(server_key)
logger.info(f"MCP server '{server_key}': disconnected")
async def _sync_mcp_to_profile_needs(self, needed_keys: list[str]) -> None:
"""
Ensure only MCP servers in needed_keys are connected: tear down extras, connect missing.
When tools.toolProfiles is empty, pass the full configured key list so all servers stay up.
"""
if not self._mcp_servers:
return
self._mcp_connected = True
from nanobot.agent.tools.mcp import connect_mcp_servers
self._mcp_stack = AsyncExitStack()
await self._mcp_stack.__aenter__()
await connect_mcp_servers(self._mcp_servers, self.tools, self._mcp_stack)
needed = set(needed_keys)
for key in list(self._mcp_connected_servers):
if key not in needed:
await self._disconnect_mcp_server(key)
connect_order = [k for k in self._mcp_servers.keys() if k in needed]
await self._ensure_mcp_servers_connected(connect_order)
async def _ensure_mcp_servers_connected(self, server_keys: list[str]) -> None:
"""Lazily connect MCP servers (each gets its own AsyncExitStack for per-server teardown)."""
if not self._mcp_servers or not server_keys:
return
pending = [
k
for k in server_keys
if k in self._mcp_servers and k not in self._mcp_connected_servers
]
if not pending:
return
from nanobot.agent.tools.mcp import connect_mcp_server
for key in pending:
stack = AsyncExitStack()
await stack.__aenter__()
try:
await connect_mcp_server(
key, self._mcp_servers[key], self.tools, stack
)
self._mcp_stacks[key] = stack
self._mcp_connected_servers.add(key)
except Exception as e:
logger.error(f"MCP server '{key}': failed to connect: {e}")
try:
await stack.aclose()
except (RuntimeError, BaseExceptionGroup):
pass
def _set_tool_context(self, channel: str, chat_id: str) -> None:
"""Update context for all tools that need routing info."""
@ -255,26 +312,37 @@ class AgentLoop:
final_content = None
tools_used: list[str] = []
from nanobot.agent.tool_profiles import compute_allowed_tool_names
from nanobot.agent.tool_profiles import (
compute_allowed_tool_names,
mcp_keys_to_connect,
)
from nanobot.agent.tool_routing import is_tool_not_found_error
tools_full = self.tools.get_definitions()
configured_mcp = list(self._mcp_servers.keys())
tools_expanded = False
allowed_names: set[str] | None = None
if self._tool_profiles:
routing_text = self._extract_routing_text(initial_messages)
profile_key = await self._pick_tool_profile(routing_text)
prof = self._tool_profiles[profile_key]
await self._sync_mcp_to_profile_needs(
mcp_keys_to_connect(prof, configured_mcp)
)
always = set(self._tool_routing.always_include_tools)
allowed_names = compute_allowed_tool_names(
self.tools,
prof,
list(self._mcp_servers.keys()),
configured_mcp,
always,
)
logger.info(
f"Tool profile '{profile_key}': {len(allowed_names)}/{len(self.tools)} tools exposed"
)
else:
await self._sync_mcp_to_profile_needs(configured_mcp)
tools_full = self.tools.get_definitions()
while iteration < self.max_iterations:
iteration += 1
@ -338,6 +406,8 @@ class AgentLoop:
and is_tool_not_found_error(result)
):
tools_expanded = True
await self._sync_mcp_to_profile_needs(configured_mcp)
tools_full = self.tools.get_definitions()
logger.info(
"Expanded tool set to full registry (missing tool after profile filter)"
)
@ -358,7 +428,6 @@ class AgentLoop:
async def run(self) -> None:
"""Run the agent loop, processing messages from the bus."""
self._running = True
await self._connect_mcp()
logger.info("Agent loop started")
while self._running:
@ -382,13 +451,13 @@ class AgentLoop:
continue
async def close_mcp(self) -> None:
"""Close MCP connections."""
if self._mcp_stack:
try:
await self._mcp_stack.aclose()
except (RuntimeError, BaseExceptionGroup):
pass # MCP SDK cancel scope cleanup is noisy but harmless
self._mcp_stack = None
"""Close all MCP connections and drop MCP tools from the registry."""
for key in list(
set(self._mcp_stacks.keys()) | self._mcp_connected_servers
):
await self._disconnect_mcp_server(key)
self._mcp_stacks.clear()
self._mcp_connected_servers.clear()
def stop(self) -> None:
"""Stop the agent loop."""
@ -658,7 +727,6 @@ Respond with ONLY valid JSON, no markdown fences."""
Returns:
The agent's response.
"""
await self._connect_mcp()
msg = InboundMessage(
channel=channel,
sender_id="user",

View File

@ -4,9 +4,12 @@ from __future__ import annotations
from typing import TYPE_CHECKING
from loguru import logger
from nanobot.config.schema import ToolProfileConfig
if TYPE_CHECKING:
from nanobot.agent.tools.registry import ToolRegistry
from nanobot.config.schema import ToolProfileConfig
def mcp_server_for_tool(tool_name: str, mcp_server_keys: list[str]) -> str | None:
@ -26,6 +29,32 @@ def mcp_server_for_tool(tool_name: str, mcp_server_keys: list[str]) -> str | Non
return None
def mcp_keys_to_connect(
profile: ToolProfileConfig, configured_mcp_keys: list[str]
) -> list[str]:
"""
Config keys for MCP servers to connect for this profile, in config order.
None on profile.mcp_servers means all configured servers; [] means none.
Unknown keys in the profile list are logged and skipped.
"""
if not configured_mcp_keys:
return []
configured_set = set(configured_mcp_keys)
if profile.mcp_servers is None:
return list(configured_mcp_keys)
out: list[str] = []
for k in profile.mcp_servers:
if k in configured_set:
out.append(k)
else:
logger.warning(
f"tools.toolProfiles entry references unknown MCP server {k!r}; "
"not in tools.mcpServers keys"
)
return out
def compute_allowed_tool_names(
registry: ToolRegistry,
profile: ToolProfileConfig,

View File

@ -3,6 +3,7 @@
import asyncio
import imaplib
import ssl
from pathlib import Path
from datetime import date
from email import policy
from email.header import decode_header, make_header
@ -36,17 +37,20 @@ class EmailTool(Tool):
"unread_only (bool, default false), mark_seen (bool, default false), download_attachments (bool, default false "
"- set to true to download all attachments to workspace), attachment_name (string, optional - filter emails by "
"attachment filename, case-insensitive partial match). Returns formatted email list with sender, subject, date, "
"attachments (if any), downloaded file paths (if downloaded), and body."
"attachments (if any), downloaded file paths (if downloaded), and body. After you receive this output, your "
"reply to the user must address their email question using only this data—no unrelated topics."
)
def __init__(self, email_config: Any = None):
def __init__(self, email_config: Any = None, workspace: Path | None = None):
"""
Initialize email tool with email configuration.
Args:
email_config: Optional EmailConfig instance. If None, loads from config.
workspace: Directory for downloaded attachments (defaults to config workspace_path).
"""
self._email_config = email_config
self._workspace = workspace
@property
def config(self) -> Any:
@ -315,8 +319,12 @@ class EmailTool(Tool):
if download_attachments and attachments:
import logging
logger = logging.getLogger(__name__)
from pathlib import Path
workspace = Path("/mnt/data/nanobot/workspace")
if self._workspace is not None:
workspace = Path(self._workspace).expanduser().resolve()
else:
from nanobot.config.loader import load_config
workspace = load_config().workspace_path.expanduser().resolve()
workspace.mkdir(parents=True, exist_ok=True)
# Build a map of attachment parts by decoded filename for efficient lookup

View File

@ -30,6 +30,8 @@ class ReadFileTool(Tool):
def description(self) -> str:
return """Read the contents of a file at the given path.
`path` must be a single file path under the configured workspace (no `*` globs).
ALWAYS use this tool to read files - it supports:
- Text files (plain text, code, markdown, etc.)
- PDF files (automatically extracts text using pdftotext)
@ -44,7 +46,7 @@ For reading files, use read_file FIRST. Only use exec for complex data processin
"properties": {
"path": {
"type": "string",
"description": "The file path to read"
"description": "Absolute or workspace-relative path to one file (no wildcards)",
}
},
"required": ["path"]
@ -115,7 +117,7 @@ class WriteFileTool(Tool):
@property
def description(self) -> str:
return "Write content to a file at the given path. Creates parent directories if needed. IMPORTANT: Always provide both 'path' and 'content' parameters. If no full path is specified, use the workspace directory (/mnt/data/nanobot/workspace/)."
return "Write content to a file at the given path. Creates parent directories if needed. IMPORTANT: Always provide both 'path' and 'content' parameters. Paths must be under the workspace root from the system prompt (no globs)."
@property
def parameters(self) -> dict[str, Any]:
@ -219,7 +221,11 @@ class ListDirTool(Tool):
@property
def description(self) -> str:
return "List the contents of a directory."
return (
"List files and subfolders in one directory. "
"`path` must be a directory that exists under the workspace root—no `*` or `*.pdf` wildcards. "
"To list PDFs, list the directory and read names ending in .pdf, or use exec with find."
)
@property
def parameters(self) -> dict[str, Any]:
@ -228,7 +234,7 @@ class ListDirTool(Tool):
"properties": {
"path": {
"type": "string",
"description": "The directory path to list"
"description": "Path to an existing directory under the workspace (no wildcards)",
}
},
"required": ["path"]

View File

@ -1,5 +1,8 @@
"""MCP client: connects to MCP servers and wraps their tools as native nanobot tools."""
import asyncio
import json
import re
from contextlib import AsyncExitStack
from typing import Any
@ -9,15 +12,65 @@ from nanobot.agent.tools.base import Tool
from nanobot.agent.tools.registry import ToolRegistry
_SAFE_TOOL_NAME_RE = re.compile(r"[^A-Za-z0-9_]+")
def _normalize_tool_segment(segment: str) -> str:
"""
Normalize MCP server/tool names into a safe function name segment.
- Replace non [A-Za-z0-9_] with underscore
- Collapse repeated underscores
- Trim leading/trailing underscores
- Ensure non-empty
"""
s = _SAFE_TOOL_NAME_RE.sub("_", (segment or "").strip())
s = re.sub(r"_+", "_", s).strip("_")
return s or "tool"
def _render_mcp_content_blocks(blocks: list[Any]) -> str:
"""Render MCP content blocks into a stable, readable string."""
from mcp import types
parts: list[str] = []
for block in blocks or []:
if isinstance(block, types.TextContent):
parts.append(block.text)
continue
# Prefer structured JSON for non-text blocks when possible.
dump = getattr(block, "model_dump", None)
if callable(dump):
try:
parts.append(json.dumps(dump(), ensure_ascii=False, indent=2))
continue
except Exception:
pass
parts.append(str(block))
return "\n".join([p for p in parts if p is not None]).strip()
class MCPToolWrapper(Tool):
"""Wraps a single MCP server tool as a nanobot Tool."""
def __init__(self, session, server_name: str, tool_def):
def __init__(
self,
session,
*,
server_key: str,
tool_def,
registered_name: str,
call_timeout_s: float = 30.0,
):
self._session = session
self._original_name = tool_def.name
self._name = f"mcp_{server_name}_{tool_def.name}"
self._server_key = server_key
self._name = registered_name
self._description = tool_def.description or tool_def.name
self._parameters = tool_def.inputSchema or {"type": "object", "properties": {}}
self._call_timeout_s = call_timeout_s
@property
def name(self) -> str:
@ -32,48 +85,39 @@ class MCPToolWrapper(Tool):
return self._parameters
async def execute(self, **kwargs: Any) -> str:
from mcp import types
import json
result = await self._session.call_tool(self._original_name, arguments=kwargs)
parts = []
for block in result.content:
if isinstance(block, types.TextContent):
parts.append(block.text)
else:
parts.append(str(block))
output = "\n".join(parts)
try:
result = await asyncio.wait_for(
self._session.call_tool(self._original_name, arguments=kwargs),
timeout=self._call_timeout_s,
)
except asyncio.TimeoutError:
return (
f"Error: MCP tool timed out after {self._call_timeout_s:.0f}s "
f"({self._server_key}:{self._original_name})"
)
# For empty results from search/list operations, provide clearer feedback
if not output or output.strip() == "":
# Check if this is a search/list operation (common patterns)
if "search" in self._original_name.lower() or "list" in self._original_name.lower():
if "unread" in str(kwargs).lower() or "is:unread" in str(kwargs).lower():
return "No unread emails found."
return "No results found."
output = _render_mcp_content_blocks(getattr(result, "content", []))
if not output:
return "(no output)"
# Try to parse JSON to check for empty arrays/lists
# If the tool returned JSON, normalize empty collections to a clearer message.
try:
parsed = json.loads(output)
if isinstance(parsed, list) and len(parsed) == 0:
if "search" in self._original_name.lower() or "list" in self._original_name.lower():
if "unread" in str(kwargs).lower() or "is:unread" in str(kwargs).lower():
return "No unread emails found."
if parsed == [] or parsed == {}:
return "No results found."
except (json.JSONDecodeError, ValueError):
pass # Not JSON, continue with original output
return output or "(no output)"
return output
async def connect_mcp_servers(
mcp_servers: dict, registry: ToolRegistry, stack: AsyncExitStack
async def connect_mcp_server(
name: str, cfg: Any, registry: ToolRegistry, stack: AsyncExitStack
) -> None:
"""Connect to configured MCP servers and register their tools."""
"""Connect one MCP server and register its tools (used for lazy profile-scoped connections)."""
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
for name, cfg in mcp_servers.items():
try:
if cfg.command:
params = StdioServerParameters(
command=cfg.command, args=cfg.args, env=cfg.env or None
@ -81,22 +125,46 @@ async def connect_mcp_servers(
read, write = await stack.enter_async_context(stdio_client(params))
elif cfg.url:
from mcp.client.streamable_http import streamable_http_client
read, write, _ = await stack.enter_async_context(
streamable_http_client(cfg.url)
)
else:
logger.warning(f"MCP server '{name}': no command or url configured, skipping")
continue
return
session = await stack.enter_async_context(ClientSession(read, write))
await session.initialize()
tools = await session.list_tools()
for tool_def in tools.tools:
wrapper = MCPToolWrapper(session, name, tool_def)
safe_server = _normalize_tool_segment(name)
safe_tool = _normalize_tool_segment(tool_def.name)
base = f"mcp_{safe_server}_{safe_tool}"
registered_name = base
i = 2
while registry.has(registered_name):
registered_name = f"{base}_{i}"
i += 1
wrapper = MCPToolWrapper(
session,
server_key=name,
tool_def=tool_def,
registered_name=registered_name,
)
registry.register(wrapper)
logger.debug(f"MCP: registered tool '{wrapper.name}' from server '{name}'")
logger.info(f"MCP server '{name}': connected, {len(tools.tools)} tools registered")
async def connect_mcp_servers(
mcp_servers: dict, registry: ToolRegistry, stack: AsyncExitStack
) -> None:
"""Connect to every configured MCP server and register their tools."""
for name, cfg in mcp_servers.items():
try:
await connect_mcp_server(name, cfg, registry, stack)
except Exception as e:
logger.error(f"MCP server '{name}': failed to connect: {e}")