tanyar09 cb7bbaf6e5 Improve MCP tool profiles, routing, and Gitea agent guidance
- Route forge-style messages to forge-related profiles instead of arbitrary *mcp* keys
- Expand agent loop, custom provider, session manager, filesystem/web tools
- Document local MCP server setup; extend setup-mcp-servers.sh and backlog
- System prompt: Gitea list_issues vs search_issues, issues vs PRs, since/date windows

Made-with: Cursor
2026-04-02 12:17:39 -04:00

937 lines
39 KiB
Python

"""Agent loop: the core processing engine."""
import asyncio
import copy
import json
import re
from contextlib import AsyncExitStack
from pathlib import Path
from typing import Awaitable, Callable
import json_repair
from loguru import logger
from nanobot.agent.context import ContextBuilder
from nanobot.agent.memory import MemoryStore
from nanobot.agent.subagent import SubagentManager
from nanobot.agent.tools.cron import CronTool
from nanobot.agent.tools.filesystem import EditFileTool, ListDirTool, ReadFileTool, WriteFileTool
from nanobot.agent.tools.message import MessageTool
from nanobot.agent.tools.registry import ToolRegistry
from nanobot.agent.tools.shell import ExecTool
from nanobot.agent.tools.spawn import SpawnTool
from nanobot.agent.tools.web import WebFetchTool, WebSearchTool
from nanobot.bus.events import InboundMessage, OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.config.schema import ExecToolConfig, ToolRoutingConfig
from nanobot.cron.service import CronService
from nanobot.providers.base import LLMProvider
from nanobot.session.manager import Session, SessionManager
class AgentLoop:
"""
The agent loop is the core processing engine.
It:
1. Receives messages from the bus
2. Builds context with history, memory, skills
3. Calls the LLM
4. Executes tool calls
5. Sends responses back
"""
def __init__(
    self,
    bus: MessageBus,
    provider: LLMProvider,
    workspace: Path,
    model: str | None = None,
    max_iterations: int = 20,
    temperature: float = 0.7,
    max_tokens: int = 4096,
    memory_window: int = 50,
    brave_api_key: str | None = None,
    exec_config: ExecToolConfig | None = None,
    cron_service: CronService | None = None,
    restrict_to_workspace: bool = False,
    session_manager: SessionManager | None = None,
    mcp_servers: dict | None = None,
    tool_profiles: dict | None = None,
    default_tool_profile: str = "default",
    tool_routing: ToolRoutingConfig | None = None,
):
    """Store configuration, build collaborators and register the built-in tools.

    Args:
        bus: Message bus used to consume inbound and publish outbound messages.
        provider: LLM backend; also supplies the default model when ``model`` is falsy.
        workspace: Directory handed to the context builder, session manager,
            subagent manager and exec tool.
        model: Model name override.
        max_iterations: Upper bound on LLM round-trips per turn.
        temperature: Sampling temperature forwarded to the provider.
        max_tokens: Completion token limit forwarded to the provider.
        memory_window: History size requested from the session, and the message
            count above which memory consolidation is scheduled.
        brave_api_key: API key passed to the web search tool.
        exec_config: Exec tool settings; a default ``ExecToolConfig`` when omitted.
        cron_service: When truthy, the cron tool is registered.
        restrict_to_workspace: Passed to the file and exec tools to limit them
            to ``workspace``.
        session_manager: Session store; a new one for ``workspace`` when omitted.
        mcp_servers: MCP server configurations keyed by server key.
        tool_profiles: Tool profiles keyed by profile key.
        default_tool_profile: Profile key used when routing is disabled or no
            profiles are configured.
        tool_routing: Router settings; a default ``ToolRoutingConfig`` when omitted.
    """
    # Plain configuration values.
    self.bus, self.provider, self.workspace = bus, provider, workspace
    self.model = model if model else provider.get_default_model()
    self.max_iterations = max_iterations
    self.temperature = temperature
    self.max_tokens = max_tokens
    self.memory_window = memory_window
    self.brave_api_key = brave_api_key
    self.exec_config = exec_config if exec_config else ExecToolConfig()
    self.cron_service = cron_service
    self.restrict_to_workspace = restrict_to_workspace

    # Collaborators (constructed in the same order as before).
    self.context = ContextBuilder(workspace)
    self.sessions = session_manager if session_manager else SessionManager(workspace)
    self.tools = ToolRegistry()
    self.subagents = SubagentManager(
        provider=provider,
        workspace=workspace,
        bus=bus,
        model=self.model,
        temperature=self.temperature,
        max_tokens=self.max_tokens,
        brave_api_key=brave_api_key,
        exec_config=self.exec_config,
        restrict_to_workspace=restrict_to_workspace,
    )

    # Loop and MCP bookkeeping: one exit stack per connected server.
    self._running = False
    self._mcp_servers = mcp_servers if mcp_servers else {}
    self._mcp_stacks: dict[str, AsyncExitStack] = {}
    self._mcp_connected_servers: set[str] = set()

    # Tool-profile routing.
    self._tool_profiles: dict = tool_profiles if tool_profiles else {}
    self._default_tool_profile = default_tool_profile
    self._tool_routing = tool_routing if tool_routing else ToolRoutingConfig()

    self._register_default_tools()
def _register_default_tools(self) -> None:
    """Populate the tool registry with the built-in tools.

    File, shell, web, message and spawn tools are always registered. The cron
    tool is added only when a cron service was supplied. Email and calendar
    tools are optional: each is attempted independently and any failure is
    logged as a warning instead of being raised.
    """
    register = self.tools.register

    # File tools share one sandbox setting (None means unrestricted).
    sandbox_dir = self.workspace if self.restrict_to_workspace else None
    for file_tool_cls in (ReadFileTool, WriteFileTool, EditFileTool, ListDirTool):
        register(file_tool_cls(allowed_dir=sandbox_dir))

    # Shell tool
    register(
        ExecTool(
            working_dir=str(self.workspace),
            timeout=self.exec_config.timeout,
            restrict_to_workspace=self.restrict_to_workspace,
        )
    )

    # Web tools
    register(WebSearchTool(api_key=self.brave_api_key))
    register(WebFetchTool())

    # Message tool publishes straight to the outbound side of the bus.
    register(MessageTool(send_callback=self.bus.publish_outbound))

    # Spawn tool (for subagents)
    register(SpawnTool(manager=self.subagents))

    # Cron tool (for scheduling)
    if self.cron_service:
        register(CronTool(self.cron_service))

    # Email tool (if email channel is configured)
    try:
        from nanobot.agent.tools.email import EmailTool
        from nanobot.config.loader import load_config

        email_settings = load_config().channels.email
        if email_settings.enabled:
            mail_tool = EmailTool(
                email_config=email_settings,
                workspace=self.workspace,
            )
            register(mail_tool)
            logger.info(f"Email tool '{mail_tool.name}' registered successfully")
        else:
            logger.debug("Email tool not registered: email channel not enabled")
    except Exception as e:
        # Missing module or bad config: warn and carry on without the tool.
        logger.warning(f"Email tool not available: {e}")

    # Calendar tool (if calendar is configured)
    try:
        from nanobot.agent.tools.calendar import CalendarTool
        from nanobot.config.loader import load_config

        calendar_settings = load_config().tools.calendar
        if calendar_settings.enabled:
            cal_tool = CalendarTool(calendar_config=calendar_settings)
            register(cal_tool)
            logger.info(f"Calendar tool '{cal_tool.name}' registered successfully")
        else:
            logger.debug("Calendar tool not registered: calendar not enabled")
    except Exception as e:
        # Missing module or bad config: warn and carry on without the tool.
        logger.warning(f"Calendar tool not available: {e}")
def _unregister_mcp_tools_for_server(self, server_key: str) -> None:
"""Remove tools registered from one MCP server (prefix mcp_<key>_)."""
prefix = f"mcp_{server_key}_"
for name in list(self.tools.tool_names):
if name.startswith(prefix):
self.tools.unregister(name)
async def _disconnect_mcp_server(self, server_key: str) -> None:
    """Close one MCP server and remove its tools (used when switching tool profiles).

    Teardown is best-effort: errors raised while closing the server's exit
    stack are logged at debug level and otherwise ignored, so the tools and
    the connected-server bookkeeping are always cleaned up.

    Args:
        server_key: Key of the server in the configured MCP server mapping.
    """
    stack = self._mcp_stacks.pop(server_key, None)
    if stack is not None:
        try:
            await stack.aclose()
        except (RuntimeError, BaseExceptionGroup) as e:
            # Still best-effort, but leave a trace instead of hiding the error.
            logger.debug(f"MCP server '{server_key}': ignored error while closing: {e!r}")
    self._unregister_mcp_tools_for_server(server_key)
    self._mcp_connected_servers.discard(server_key)
    logger.info(f"MCP server '{server_key}': disconnected")
async def _sync_mcp_to_profile_needs(self, needed_keys: list[str]) -> None:
"""
Ensure only MCP servers in needed_keys are connected: tear down extras, connect missing.
When tools.toolProfiles is empty, pass the full configured key list so all servers stay up.
"""
if not self._mcp_servers:
return
needed = set(needed_keys)
for key in list(self._mcp_connected_servers):
if key not in needed:
await self._disconnect_mcp_server(key)
connect_order = [k for k in self._mcp_servers.keys() if k in needed]
await self._ensure_mcp_servers_connected(connect_order)
async def _ensure_mcp_servers_connected(self, server_keys: list[str]) -> None:
"""Lazily connect MCP servers (each gets its own AsyncExitStack for per-server teardown)."""
if not self._mcp_servers or not server_keys:
return
pending = [
k
for k in server_keys
if k in self._mcp_servers and k not in self._mcp_connected_servers
]
if not pending:
return
from nanobot.agent.tools.mcp import connect_mcp_server
for key in pending:
stack = AsyncExitStack()
await stack.__aenter__()
try:
await connect_mcp_server(
key, self._mcp_servers[key], self.tools, stack
)
self._mcp_stacks[key] = stack
self._mcp_connected_servers.add(key)
except Exception as e:
logger.error(f"MCP server '{key}': failed to connect: {e}")
try:
await stack.aclose()
except (RuntimeError, BaseExceptionGroup):
pass
def _set_tool_context(self, channel: str, chat_id: str) -> None:
    """Push the current channel/chat pair into the message, spawn and cron tools.

    A tool is updated only when it is registered under the expected name and
    is an instance of the expected class.
    """
    routed_tools = (
        ("message", MessageTool),
        ("spawn", SpawnTool),
        ("cron", CronTool),
    )
    for registry_name, expected_cls in routed_tools:
        tool = self.tools.get(registry_name)
        if tool and isinstance(tool, expected_cls):
            tool.set_context(channel, chat_id)
@staticmethod
def _strip_think(text: str | None) -> str | None:
"""Remove <think>…</think> blocks that some models embed in content."""
if not text:
return None
return re.sub(r"<think>[\s\S]*?</think>", "", text).strip() or None
@staticmethod
def _tool_hint(tool_calls: list) -> str:
"""Format tool calls as concise hint, e.g. 'web_search("query")'."""
def _fmt(tc):
val = next(iter(tc.arguments.values()), None) if tc.arguments else None
if not isinstance(val, str):
return tc.name
return f'{tc.name}("{val[:40]}")' if len(val) > 40 else f'{tc.name}("{val}")'
return ", ".join(_fmt(tc) for tc in tool_calls)
@staticmethod
def _extract_routing_text(messages: list[dict]) -> str:
"""Last user message text (string or multimodal) for the tool-profile router."""
for m in reversed(messages):
if m.get("role") != "user":
continue
c = m.get("content")
if isinstance(c, str):
return c.strip()
if isinstance(c, list):
parts: list[str] = []
for block in c:
if isinstance(block, dict) and block.get("type") == "text":
parts.append(str(block.get("text") or ""))
return "\n".join(parts).strip()
return ""
@staticmethod
def _contains_relative_link_reference(text: str) -> bool:
"""
Detect phrases like "first link you gave me" when no URL is provided.
Local models often miss this reference resolution step, so we inject a
lightweight hint with recent URLs from assistant history.
"""
t = (text or "").lower()
if not t:
return False
# If user already supplied a URL, no hint needed.
if "http://" in t or "https://" in t or "www." in t:
return False
markers = [
"first link", "1st link", "second link", "2nd link", "third link", "3rd link",
"last link", "final link", "from your last", "your last link",
"that link", "the link above", "link above", "previous link",
"first result", "second result", "third result",
"last result", "your last result",
"the first one", "the second one", "the third one", "the last one",
]
return any(m in t for m in markers)
@staticmethod
def _ordered_urls_from_latest_assistant(
history: list[dict], max_messages: int = 24, max_urls: int = 40
) -> list[str]:
"""
URLs from the single most recent assistant message only, in document order.
"Last link" / "first link" refer to this message, not older turns or deduped
global lists — local models otherwise pick unrelated URLs (e.g. joke APIs).
"""
url_pattern = re.compile(r"https?://[^\s)>\]\"']+")
for m in reversed(history[-max_messages:]):
if m.get("role") != "assistant":
continue
content = m.get("content")
if not isinstance(content, str) or not content:
continue
found = url_pattern.findall(content)
if not found:
continue
out: list[str] = []
seen: set[str] = set()
for u in found:
if u not in seen:
seen.add(u)
out.append(u)
if len(out) >= max_urls:
break
return out
return []
@staticmethod
def _extract_recent_assistant_urls(history: list[dict], max_messages: int = 12, max_urls: int = 10) -> list[str]:
"""Extract unique URLs from the most recent assistant messages."""
url_pattern = re.compile(r"https?://[^\s)>\]\"']+")
urls: list[str] = []
seen: set[str] = set()
for m in reversed(history[-max_messages:]):
if m.get("role") != "assistant":
continue
content = m.get("content")
if not isinstance(content, str) or not content:
continue
found = url_pattern.findall(content)
if not found:
continue
for u in found:
if u not in seen:
seen.add(u)
urls.append(u)
if len(urls) >= max_urls:
return urls
return urls
def _build_link_reference_hint(self, user_text: str, history: list[dict]) -> str | None:
"""Build an explicit URL mapping hint for relative link references."""
if not self._contains_relative_link_reference(user_text):
return None
# Prefer URLs from the latest assistant reply only ("your last link" = last URL there).
urls = self._ordered_urls_from_latest_assistant(history)
if not urls:
urls = self._extract_recent_assistant_urls(history)
if not urls:
return None
lines = [f"{idx + 1}. {url}" for idx, url in enumerate(urls)]
n = len(urls)
mapping = (
f'- "first link" / "first result" → URL #1.\n'
f'- "last link" / "from your last link" / "final link" → URL #{n}.\n'
f'- "second link" → URL #2 (if {n} >= 2).\n'
)
return (
"The user refers to a link from your **previous assistant message**. "
"Do not guess a URL (e.g. random joke sites). Use `web_fetch` on the URL below.\n"
f"{mapping}\n"
"URLs from that message (order = how they appeared):\n"
+ "\n".join(lines)
)
@staticmethod
def _append_turn_to_session(
session: Session,
user_content: str,
turn_tail: list[dict],
tools_used: list[str],
final_for_fallback: str | None = None,
) -> None:
"""Persist one user turn and the in-loop assistant/tool messages for the next LLM context."""
session.add_message("user", user_content)
if not turn_tail:
session.add_message(
"assistant",
final_for_fallback or "",
tools_used=tools_used if tools_used else None,
)
return
for i, m in enumerate(turn_tail):
role = m.get("role")
if role not in ("assistant", "tool"):
continue
content = m.get("content")
if content is None:
content = ""
kwargs = {
k: v for k, v in m.items() if k not in ("role", "content")
}
if role == "assistant" and i == len(turn_tail) - 1 and tools_used:
kwargs["tools_used"] = tools_used
session.add_message(role, content, **kwargs)
if (
final_for_fallback
and turn_tail
and turn_tail[-1].get("role") == "tool"
):
session.add_message(
"assistant",
final_for_fallback,
tools_used=tools_used if tools_used else None,
)
async def _pick_tool_profile(self, user_text: str) -> str:
"""Resolve profile key when tools.toolProfiles is configured."""
if not self._tool_profiles:
return self._default_tool_profile
if self._tool_routing.enabled:
from nanobot.agent.tool_routing import route_tool_profile
return await route_tool_profile(
self.provider,
model=self.model,
user_message=user_text,
profiles=self._tool_profiles,
default_profile=self._default_tool_profile,
temperature=self._tool_routing.router_temperature,
max_tokens=self._tool_routing.router_max_tokens,
)
return self._default_tool_profile
async def _run_agent_loop(
    self,
    initial_messages: list[dict],
    on_progress: Callable[[str], Awaitable[None]] | None = None,
) -> tuple[str | None, list[str], list[dict]]:
    """
    Run the agent iteration loop.

    Before the first LLM call the MCP connections are synced to the selected
    tool profile (or to all configured servers when no profiles exist, or when
    the selected profile key is not defined). Each iteration calls the LLM,
    executes any requested tools and feeds the results back, until the model
    answers without tool calls or ``max_iterations`` is reached.

    Args:
        initial_messages: Starting messages for the LLM conversation.
        on_progress: Optional callback to push intermediate content to the user.

    Returns:
        Tuple of (final_content, list_of_tools_used, turn_messages).
        ``turn_messages`` are messages appended after ``initial_messages`` (assistant
        tool rounds + tool results + final assistant) for session persistence so
        follow-up turns see tool outputs, not only final chat text. On an LLM
        timeout or provider error the content is an error string and
        ``turn_messages`` is empty.
    """
    n0 = len(initial_messages)
    messages = initial_messages
    iteration = 0
    final_content = None
    tools_used: list[str] = []
    empty_final_retry_used = False
    from nanobot.agent.tool_profiles import (
        compute_allowed_tool_names,
        mcp_keys_to_connect,
    )
    from nanobot.agent.tool_routing import is_tool_not_found_error

    configured_mcp = list(self._mcp_servers.keys())
    tools_expanded = False
    # None means "no profile filter": the full registry is exposed.
    allowed_names: set[str] | None = None
    if self._tool_profiles:
        routing_text = self._extract_routing_text(initial_messages)
        profile_key = await self._pick_tool_profile(routing_text)
        if profile_key in self._tool_profiles:
            prof = self._tool_profiles[profile_key]
            await self._sync_mcp_to_profile_needs(
                mcp_keys_to_connect(prof, configured_mcp)
            )
            always = set(self._tool_routing.always_include_tools)
            allowed_names = compute_allowed_tool_names(
                self.tools,
                prof,
                configured_mcp,
                always,
            )
            logger.info(
                f"Tool profile '{profile_key}': {len(allowed_names)}/{len(self.tools)} tools exposed"
            )
        else:
            # A missing profile (e.g. default key not configured) must not
            # abort the turn with a KeyError; fall back to the unfiltered set.
            logger.warning(
                f"Tool profile '{profile_key}' is not defined; exposing the full tool set"
            )
            await self._sync_mcp_to_profile_needs(configured_mcp)
    else:
        await self._sync_mcp_to_profile_needs(configured_mcp)
    tools_full = self.tools.get_definitions()

    while iteration < self.max_iterations:
        iteration += 1
        logger.debug(f"Agent loop iteration {iteration}/{self.max_iterations}, calling LLM provider...")
        if allowed_names is not None and not tools_expanded:
            tool_defs = self.tools.get_definitions_subset(allowed_names)
        else:
            tool_defs = tools_full
        try:
            response = await asyncio.wait_for(
                self.provider.chat(
                    messages=messages,
                    tools=tool_defs,
                    model=self.model,
                    temperature=self.temperature,
                    max_tokens=self.max_tokens,
                ),
                timeout=120.0  # 2 minute timeout per LLM call
            )
            logger.debug(f"LLM provider returned response, has_tool_calls={response.has_tool_calls}")
        except asyncio.TimeoutError:
            logger.error("LLM provider call timed out after 120 seconds")
            return "Error: Request timed out. The LLM provider may be slow or unresponsive.", tools_used, []
        except Exception as e:
            logger.error(f"LLM provider error: {e}")
            return f"Error calling LLM: {str(e)}", tools_used, []

        if response.has_tool_calls:
            if on_progress:
                clean = self._strip_think(response.content)
                await on_progress(clean or self._tool_hint(response.tool_calls))
            tool_call_dicts = [
                {
                    "id": tc.id,
                    "type": "function",
                    "function": {
                        "name": tc.name,
                        "arguments": json.dumps(tc.arguments)
                    }
                }
                for tc in response.tool_calls
            ]
            messages = self.context.add_assistant_message(
                messages, response.content, tool_call_dicts,
                reasoning_content=response.reasoning_content,
            )
            for tool_call in response.tool_calls:
                tools_used.append(tool_call.name)
                args_str = json.dumps(tool_call.arguments, ensure_ascii=False)
                logger.info(f"Tool call: {tool_call.name}({args_str[:200]})")
                result = await self.tools.execute(tool_call.name, tool_call.arguments)
                logger.info(f"Tool result length: {len(result) if result else 0}, preview: {result[:200] if result else 'None'}")
                # The profile may have hidden the tool the model wanted: widen
                # to the full registry once, for the rest of this turn.
                if (
                    allowed_names is not None
                    and self._tool_routing.expand_on_missing_tool
                    and not tools_expanded
                    and is_tool_not_found_error(result)
                ):
                    tools_expanded = True
                    await self._sync_mcp_to_profile_needs(configured_mcp)
                    tools_full = self.tools.get_definitions()
                    logger.info(
                        "Expanded tool set to full registry (missing tool after profile filter)"
                    )
                messages = self.context.add_tool_result(
                    messages, tool_call.id, tool_call.name, result
                )
                logger.debug(f"Added tool result to messages. Total messages: {len(messages)}")
        else:
            final_content = self._strip_think(response.content)
            logger.info(f"Final response generated. Content length: {len(final_content) if final_content else 0}")
            # Some local OpenAI-compatible backends occasionally return an empty assistant message.
            # Retry once with an explicit nudge to either call a tool or answer in text.
            if (not final_content or not final_content.strip()) and not empty_final_retry_used:
                empty_final_retry_used = True
                logger.warning(
                    "LLM returned empty final content; retrying once with a non-empty response nudge"
                )
                messages = messages + [
                    {
                        "role": "system",
                        "content": (
                            "Your previous reply was empty. You MUST either (a) call an appropriate tool, "
                            "or (b) respond with a short helpful text answer. Do not return an empty message."
                        ),
                    }
                ]
                final_content = None
                continue
            # Record final assistant in-message (was missing); enables session turn_tail extraction.
            messages = self.context.add_assistant_message(
                messages,
                final_content,
                None,
                reasoning_content=response.reasoning_content,
            )
            break

    if final_content is None and iteration >= self.max_iterations:
        logger.warning(f"Max iterations ({self.max_iterations}) reached without final response. Last tool calls: {tools_used[-3:]}")
    # Deep copy so later changes to the working messages cannot alter what gets persisted.
    turn_tail = copy.deepcopy(messages[n0:])
    return final_content, tools_used, turn_tail
async def run(self) -> None:
    """Consume inbound messages from the bus until the running flag is cleared.

    The bus is polled with a one-second timeout so the flag is re-checked
    regularly. A failure while handling a message is logged and reported to
    the originating chat as an apology message; the loop keeps running.
    """
    self._running = True
    logger.info("Agent loop started")
    while self._running:
        try:
            inbound = await asyncio.wait_for(self.bus.consume_inbound(), timeout=1.0)
            try:
                reply = await self._process_message(inbound)
                if reply:
                    await self.bus.publish_outbound(reply)
            except Exception as e:
                logger.error(f"Error processing message: {e}")
                apology = OutboundMessage(
                    channel=inbound.channel,
                    chat_id=inbound.chat_id,
                    content=f"Sorry, I encountered an error: {str(e)}",
                )
                await self.bus.publish_outbound(apology)
        except asyncio.TimeoutError:
            # Nothing arrived within the poll window; re-check the flag.
            continue
async def close_mcp(self) -> None:
    """Disconnect every known MCP server and reset the MCP bookkeeping.

    Covers servers that have an exit stack as well as those only recorded as
    connected, then empties both collections.
    """
    known_keys = set(self._mcp_stacks.keys()) | self._mcp_connected_servers
    for server_key in list(known_keys):
        await self._disconnect_mcp_server(server_key)
    self._mcp_stacks.clear()
    self._mcp_connected_servers.clear()
def stop(self) -> None:
    """Clear the running flag so ``run`` exits after its current poll."""
    self._running = False
    logger.info("Agent loop stopping")
async def _process_message(
    self,
    msg: InboundMessage,
    session_key: str | None = None,
    on_progress: Callable[[str], Awaitable[None]] | None = None,
) -> OutboundMessage | None:
    """
    Process a single inbound message.

    System-channel messages are delegated to ``_process_system_message``.
    ``/new`` clears the session and archives the old messages in the
    background; ``/help`` returns the command list. Everything else runs
    through the agent loop, is persisted to the session and is returned as an
    outbound message.

    Args:
        msg: The inbound message to process.
        session_key: Override session key (used by process_direct).
        on_progress: Optional callback for intermediate output (defaults to bus publish).

    Returns:
        The response message, or None if no response needed.
    """
    # System messages route back via chat_id ("channel:chat_id")
    if msg.channel == "system":
        return await self._process_system_message(msg)
    preview = msg.content[:80] + "..." if len(msg.content) > 80 else msg.content
    logger.info(f"Processing message from {msg.channel}:{msg.sender_id}: {preview}")
    key = session_key or msg.session_key
    session = self.sessions.get_or_create(key)

    # The event loop keeps only weak references to tasks, so a fire-and-forget
    # task can be garbage-collected before it finishes. Hold a strong
    # reference until the task is done.
    background_tasks = getattr(self, "_background_tasks", None)
    if background_tasks is None:
        background_tasks = self._background_tasks = set()

    def _spawn_background(coro) -> None:
        task = asyncio.create_task(coro)
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)

    # Handle slash commands
    cmd = msg.content.strip().lower()
    if cmd == "/new":
        # Capture messages before clearing (avoid race condition with background task)
        messages_to_archive = session.messages.copy()
        session.clear()
        self.sessions.save(session)
        self.sessions.invalidate(session.key)

        async def _consolidate_and_cleanup():
            temp_session = Session(key=session.key)
            temp_session.messages = messages_to_archive
            await self._consolidate_memory(temp_session, archive_all=True)

        _spawn_background(_consolidate_and_cleanup())
        return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id,
                               content="New session started. Memory consolidation in progress.")
    if cmd == "/help":
        return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id,
                               content="🐈 nanobot commands:\n/new — Start a new conversation\n/help — Show available commands")

    # Skip memory consolidation for CLI mode to avoid blocking/hanging
    # Memory consolidation can be slow and CLI users want fast responses
    if len(session.messages) > self.memory_window and msg.channel != "cli":
        # Start memory consolidation in background with timeout protection
        async def _consolidate_with_timeout():
            try:
                await asyncio.wait_for(
                    self._consolidate_memory(session),
                    timeout=120.0  # 2 minute timeout for memory consolidation
                )
            except asyncio.TimeoutError:
                logger.warning(f"Memory consolidation timed out for session {session.key}")
            except Exception as e:
                logger.error(f"Memory consolidation error: {e}")

        _spawn_background(_consolidate_with_timeout())

    self._set_tool_context(msg.channel, msg.chat_id)
    history = session.get_history(max_messages=self.memory_window)
    # The hint is sent to the LLM only; the session stores the raw user text.
    hinted_message = msg.content
    if link_hint := self._build_link_reference_hint(msg.content, history):
        hinted_message = (
            f"{msg.content}\n\n"
            f"[Context hint for reference resolution]\n{link_hint}"
        )
        logger.debug("Injected link reference hint for follow-up resolution")
    initial_messages = self.context.build_messages(
        history=history,
        current_message=hinted_message,
        media=msg.media if msg.media else None,
        channel=msg.channel,
        chat_id=msg.chat_id,
    )

    async def _bus_progress(content: str) -> None:
        # Skip progress updates for email channel to avoid sending intermediate tool call hints as emails
        if msg.channel == "email":
            return
        await self.bus.publish_outbound(OutboundMessage(
            channel=msg.channel, chat_id=msg.chat_id, content=content,
            metadata=msg.metadata or {},
        ))

    final_content, tools_used, turn_tail = await self._run_agent_loop(
        initial_messages, on_progress=on_progress or _bus_progress,
    )
    # Keep the raw result: the placeholder below must not be persisted.
    outbound_fallback = final_content
    if final_content is None:
        final_content = "I've completed processing but have no response to give."
    preview = final_content[:120] + "..." if len(final_content) > 120 else final_content
    logger.info(f"Response to {msg.channel}:{msg.sender_id}: {preview}")
    self._append_turn_to_session(
        session,
        msg.content,
        turn_tail,
        tools_used,
        final_for_fallback=outbound_fallback,
    )
    self.sessions.save(session)
    return OutboundMessage(
        channel=msg.channel,
        chat_id=msg.chat_id,
        content=final_content,
        metadata=msg.metadata or {},  # Pass through for channel-specific needs (e.g. Slack thread_ts)
    )
async def _process_system_message(self, msg: InboundMessage) -> OutboundMessage | None:
    """
    Handle a message from the system channel (e.g. a subagent announcement).

    ``msg.chat_id`` carries the origin as "original_channel:original_chat_id".
    The turn runs against that origin's session and the reply is addressed
    back to it. Without a colon the origin channel falls back to "cli".
    """
    logger.info(f"Processing system message from {msg.sender_id}")

    # Split on the first colon only; the chat id part may contain colons itself.
    channel_part, separator, chat_part = msg.chat_id.partition(":")
    if separator:
        origin_channel, origin_chat_id = channel_part, chat_part
    else:
        origin_channel, origin_chat_id = "cli", msg.chat_id

    session = self.sessions.get_or_create(f"{origin_channel}:{origin_chat_id}")
    self._set_tool_context(origin_channel, origin_chat_id)
    initial_messages = self.context.build_messages(
        history=session.get_history(max_messages=self.memory_window),
        current_message=msg.content,
        channel=origin_channel,
        chat_id=origin_chat_id,
    )
    final_content, tools_used, turn_tail = await self._run_agent_loop(initial_messages)

    # Persist the raw loop result; the placeholder is only for the outbound reply.
    self._append_turn_to_session(
        session,
        f"[System: {msg.sender_id}] {msg.content}",
        turn_tail,
        tools_used,
        final_for_fallback=final_content,
    )
    self.sessions.save(session)

    reply_text = "Background task completed." if final_content is None else final_content
    return OutboundMessage(
        channel=origin_channel,
        chat_id=origin_chat_id,
        content=reply_text
    )
async def _consolidate_memory(self, session, archive_all: bool = False) -> None:
    """Consolidate old messages into MEMORY.md + HISTORY.md.

    The selected messages are summarised by the LLM into a history entry and
    an updated long-term memory, which are written through ``MemoryStore``.
    The session's message list is never modified here; only
    ``session.last_consolidated`` is updated on success. Failures during the
    LLM call or while parsing its reply are logged, not raised.

    Args:
        session: Session whose messages are consolidated.
        archive_all: If True, process every message and reset
            ``last_consolidated`` to 0 (used by the /new command). If False,
            process messages from ``last_consolidated`` up to the newest
            ``memory_window // 2``, which are kept out.
    """
    memory = MemoryStore(self.workspace)
    if archive_all:
        old_messages = session.messages
        keep_count = 0
        consolidated_upto = 0
        logger.info(f"Memory consolidation (archive_all): {len(session.messages)} total messages archived")
    else:
        keep_count = self.memory_window // 2
        if len(session.messages) <= keep_count:
            logger.debug(f"Session {session.key}: No consolidation needed (messages={len(session.messages)}, keep={keep_count})")
            return
        messages_to_process = len(session.messages) - session.last_consolidated
        if messages_to_process <= 0:
            logger.debug(f"Session {session.key}: No new messages to consolidate (last_consolidated={session.last_consolidated}, total={len(session.messages)})")
            return
        # Fix the end of the window now. The session may grow while the LLM
        # call below is awaited; recomputing afterwards would mark messages
        # as consolidated that were never summarised. An explicit end index
        # also avoids the empty ``[:-0]`` slice when keep_count is 0.
        consolidated_upto = len(session.messages) - keep_count
        old_messages = session.messages[session.last_consolidated:consolidated_upto]
        if not old_messages:
            return
        logger.info(f"Memory consolidation started: {len(session.messages)} total, {len(old_messages)} new to consolidate, {keep_count} keep")

    lines = []
    for m in old_messages:
        if not m.get("content"):
            continue
        tools = f" [tools: {', '.join(m['tools_used'])}]" if m.get("tools_used") else ""
        # A missing or None timestamp is rendered as "?".
        stamp = (m.get("timestamp") or "?")[:16]
        lines.append(f"[{stamp}] {m['role'].upper()}{tools}: {m['content']}")
    conversation = "\n".join(lines)
    current_memory = memory.read_long_term()
    prompt = f"""You are a memory consolidation agent. Process this conversation and return a JSON object with exactly two keys:
1. "history_entry": A paragraph (2-5 sentences) summarizing the key events/decisions/topics. Start with a timestamp like [YYYY-MM-DD HH:MM]. Include enough detail to be useful when found by grep search later.
2. "memory_update": The updated long-term memory content. Add any new facts: user location, preferences, personal info, habits, project context, technical decisions, tools/services used. If nothing new, return the existing content unchanged.
## Current Long-term Memory
{current_memory or "(empty)"}
## Conversation to Process
{conversation}
Respond with ONLY valid JSON, no markdown fences."""
    try:
        # Add timeout to memory consolidation LLM call
        response = await asyncio.wait_for(
            self.provider.chat(
                messages=[
                    {"role": "system", "content": "You are a memory consolidation agent. Respond only with valid JSON."},
                    {"role": "user", "content": prompt},
                ],
                model=self.model,
            ),
            timeout=120.0  # 2 minute timeout for consolidation LLM call
        )
        text = (response.content or "").strip()
        if not text:
            logger.warning("Memory consolidation: LLM returned empty response, skipping")
            return
        # Tolerate a reply wrapped in a markdown code fence.
        if text.startswith("```"):
            text = text.split("\n", 1)[-1].rsplit("```", 1)[0].strip()
        result = json_repair.loads(text)
        if not isinstance(result, dict):
            logger.warning(f"Memory consolidation: unexpected response type, skipping. Response: {text[:200]}")
            return
        if entry := result.get("history_entry"):
            # Convert to string if LLM returned a non-string (e.g., dict)
            if not isinstance(entry, str):
                entry = str(entry)
            memory.append_history(entry)
        if update := result.get("memory_update"):
            # Convert to string if LLM returned a non-string (e.g., dict)
            if not isinstance(update, str):
                update = str(update)
            if update != current_memory:
                memory.write_long_term(update)
        session.last_consolidated = consolidated_upto
        logger.info(f"Memory consolidation done: {len(session.messages)} messages, last_consolidated={session.last_consolidated}")
    except Exception as e:
        logger.error(f"Memory consolidation failed: {e}")
async def process_direct(
    self,
    content: str,
    session_key: str = "cli:direct",
    channel: str = "cli",
    chat_id: str = "direct",
    on_progress: Callable[[str], Awaitable[None]] | None = None,
) -> str:
    """
    Run one message through the agent without going via the bus (CLI / cron use).

    Args:
        content: The message content.
        session_key: Session identifier (overrides channel:chat_id for session lookup).
        channel: Source channel (for tool context routing).
        chat_id: Source chat ID (for tool context routing).
        on_progress: Optional callback for intermediate output.

    Returns:
        The agent's response text, or ``""`` when no response was produced.
    """
    inbound = InboundMessage(
        channel=channel,
        sender_id="user",
        chat_id=chat_id,
        content=content,
    )
    outbound = await self._process_message(
        inbound, session_key=session_key, on_progress=on_progress
    )
    if not outbound:
        return ""
    return outbound.content