330 lines
12 KiB
Python
330 lines
12 KiB
Python
"""Agent loop: the core processing engine."""
|
|
|
|
import asyncio
|
|
import json
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from loguru import logger
|
|
|
|
from nanobot.bus.events import InboundMessage, OutboundMessage
|
|
from nanobot.bus.queue import MessageBus
|
|
from nanobot.providers.base import LLMProvider
|
|
from nanobot.agent.context import ContextBuilder
|
|
from nanobot.agent.tools.registry import ToolRegistry
|
|
from nanobot.agent.tools.filesystem import ReadFileTool, WriteFileTool, EditFileTool, ListDirTool
|
|
from nanobot.agent.tools.shell import ExecTool
|
|
from nanobot.agent.tools.web import WebSearchTool, WebFetchTool
|
|
from nanobot.agent.tools.message import MessageTool
|
|
from nanobot.agent.tools.spawn import SpawnTool
|
|
from nanobot.agent.subagent import SubagentManager
|
|
from nanobot.session.manager import SessionManager
|
|
|
|
|
|
class AgentLoop:
|
|
"""
|
|
The agent loop is the core processing engine.
|
|
|
|
It:
|
|
1. Receives messages from the bus
|
|
2. Builds context with history, memory, skills
|
|
3. Calls the LLM
|
|
4. Executes tool calls
|
|
5. Sends responses back
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
bus: MessageBus,
|
|
provider: LLMProvider,
|
|
workspace: Path,
|
|
model: str | None = None,
|
|
max_iterations: int = 20,
|
|
brave_api_key: str | None = None
|
|
):
|
|
self.bus = bus
|
|
self.provider = provider
|
|
self.workspace = workspace
|
|
self.model = model or provider.get_default_model()
|
|
self.max_iterations = max_iterations
|
|
self.brave_api_key = brave_api_key
|
|
|
|
self.context = ContextBuilder(workspace)
|
|
self.sessions = SessionManager(workspace)
|
|
self.tools = ToolRegistry()
|
|
self.subagents = SubagentManager(
|
|
provider=provider,
|
|
workspace=workspace,
|
|
bus=bus,
|
|
model=self.model,
|
|
brave_api_key=brave_api_key,
|
|
)
|
|
|
|
self._running = False
|
|
self._register_default_tools()
|
|
|
|
def _register_default_tools(self) -> None:
|
|
"""Register the default set of tools."""
|
|
# File tools
|
|
self.tools.register(ReadFileTool())
|
|
self.tools.register(WriteFileTool())
|
|
self.tools.register(EditFileTool())
|
|
self.tools.register(ListDirTool())
|
|
|
|
# Shell tool
|
|
self.tools.register(ExecTool(working_dir=str(self.workspace)))
|
|
|
|
# Web tools
|
|
self.tools.register(WebSearchTool(api_key=self.brave_api_key))
|
|
self.tools.register(WebFetchTool())
|
|
|
|
# Message tool
|
|
message_tool = MessageTool(send_callback=self.bus.publish_outbound)
|
|
self.tools.register(message_tool)
|
|
|
|
# Spawn tool (for subagents)
|
|
spawn_tool = SpawnTool(manager=self.subagents)
|
|
self.tools.register(spawn_tool)
|
|
|
|
async def run(self) -> None:
|
|
"""Run the agent loop, processing messages from the bus."""
|
|
self._running = True
|
|
logger.info("Agent loop started")
|
|
|
|
while self._running:
|
|
try:
|
|
# Wait for next message
|
|
msg = await asyncio.wait_for(
|
|
self.bus.consume_inbound(),
|
|
timeout=1.0
|
|
)
|
|
|
|
# Process it
|
|
try:
|
|
response = await self._process_message(msg)
|
|
if response:
|
|
await self.bus.publish_outbound(response)
|
|
except Exception as e:
|
|
logger.error(f"Error processing message: {e}")
|
|
# Send error response
|
|
await self.bus.publish_outbound(OutboundMessage(
|
|
channel=msg.channel,
|
|
chat_id=msg.chat_id,
|
|
content=f"Sorry, I encountered an error: {str(e)}"
|
|
))
|
|
except asyncio.TimeoutError:
|
|
continue
|
|
|
|
def stop(self) -> None:
|
|
"""Stop the agent loop."""
|
|
self._running = False
|
|
logger.info("Agent loop stopping")
|
|
|
|
async def _process_message(self, msg: InboundMessage) -> OutboundMessage | None:
|
|
"""
|
|
Process a single inbound message.
|
|
|
|
Args:
|
|
msg: The inbound message to process.
|
|
|
|
Returns:
|
|
The response message, or None if no response needed.
|
|
"""
|
|
# Handle system messages (subagent announces)
|
|
# The chat_id contains the original "channel:chat_id" to route back to
|
|
if msg.channel == "system":
|
|
return await self._process_system_message(msg)
|
|
|
|
logger.info(f"Processing message from {msg.channel}:{msg.sender_id}")
|
|
|
|
# Get or create session
|
|
session = self.sessions.get_or_create(msg.session_key)
|
|
|
|
# Update tool contexts
|
|
message_tool = self.tools.get("message")
|
|
if isinstance(message_tool, MessageTool):
|
|
message_tool.set_context(msg.channel, msg.chat_id)
|
|
|
|
spawn_tool = self.tools.get("spawn")
|
|
if isinstance(spawn_tool, SpawnTool):
|
|
spawn_tool.set_context(msg.channel, msg.chat_id)
|
|
|
|
# Build initial messages (use get_history for LLM-formatted messages)
|
|
messages = self.context.build_messages(
|
|
history=session.get_history(),
|
|
current_message=msg.content,
|
|
media=msg.media if msg.media else None,
|
|
)
|
|
|
|
# Agent loop
|
|
iteration = 0
|
|
final_content = None
|
|
|
|
while iteration < self.max_iterations:
|
|
iteration += 1
|
|
|
|
# Call LLM
|
|
response = await self.provider.chat(
|
|
messages=messages,
|
|
tools=self.tools.get_definitions(),
|
|
model=self.model
|
|
)
|
|
|
|
# Handle tool calls
|
|
if response.has_tool_calls:
|
|
# Add assistant message with tool calls
|
|
tool_call_dicts = [
|
|
{
|
|
"id": tc.id,
|
|
"type": "function",
|
|
"function": {
|
|
"name": tc.name,
|
|
"arguments": json.dumps(tc.arguments) # Must be JSON string
|
|
}
|
|
}
|
|
for tc in response.tool_calls
|
|
]
|
|
messages = self.context.add_assistant_message(
|
|
messages, response.content, tool_call_dicts
|
|
)
|
|
|
|
# Execute tools
|
|
for tool_call in response.tool_calls:
|
|
args_str = json.dumps(tool_call.arguments)
|
|
logger.debug(f"Executing tool: {tool_call.name} with arguments: {args_str}")
|
|
result = await self.tools.execute(tool_call.name, tool_call.arguments)
|
|
messages = self.context.add_tool_result(
|
|
messages, tool_call.id, tool_call.name, result
|
|
)
|
|
else:
|
|
# No tool calls, we're done
|
|
final_content = response.content
|
|
break
|
|
|
|
if final_content is None:
|
|
final_content = "I've completed processing but have no response to give."
|
|
|
|
# Save to session
|
|
session.add_message("user", msg.content)
|
|
session.add_message("assistant", final_content)
|
|
self.sessions.save(session)
|
|
|
|
return OutboundMessage(
|
|
channel=msg.channel,
|
|
chat_id=msg.chat_id,
|
|
content=final_content
|
|
)
|
|
|
|
async def _process_system_message(self, msg: InboundMessage) -> OutboundMessage | None:
|
|
"""
|
|
Process a system message (e.g., subagent announce).
|
|
|
|
The chat_id field contains "original_channel:original_chat_id" to route
|
|
the response back to the correct destination.
|
|
"""
|
|
logger.info(f"Processing system message from {msg.sender_id}")
|
|
|
|
# Parse origin from chat_id (format: "channel:chat_id")
|
|
if ":" in msg.chat_id:
|
|
parts = msg.chat_id.split(":", 1)
|
|
origin_channel = parts[0]
|
|
origin_chat_id = parts[1]
|
|
else:
|
|
# Fallback
|
|
origin_channel = "cli"
|
|
origin_chat_id = msg.chat_id
|
|
|
|
# Use the origin session for context
|
|
session_key = f"{origin_channel}:{origin_chat_id}"
|
|
session = self.sessions.get_or_create(session_key)
|
|
|
|
# Update tool contexts
|
|
message_tool = self.tools.get("message")
|
|
if isinstance(message_tool, MessageTool):
|
|
message_tool.set_context(origin_channel, origin_chat_id)
|
|
|
|
spawn_tool = self.tools.get("spawn")
|
|
if isinstance(spawn_tool, SpawnTool):
|
|
spawn_tool.set_context(origin_channel, origin_chat_id)
|
|
|
|
# Build messages with the announce content
|
|
messages = self.context.build_messages(
|
|
history=session.get_history(),
|
|
current_message=msg.content
|
|
)
|
|
|
|
# Agent loop (limited for announce handling)
|
|
iteration = 0
|
|
final_content = None
|
|
|
|
while iteration < self.max_iterations:
|
|
iteration += 1
|
|
|
|
response = await self.provider.chat(
|
|
messages=messages,
|
|
tools=self.tools.get_definitions(),
|
|
model=self.model
|
|
)
|
|
|
|
if response.has_tool_calls:
|
|
tool_call_dicts = [
|
|
{
|
|
"id": tc.id,
|
|
"type": "function",
|
|
"function": {
|
|
"name": tc.name,
|
|
"arguments": json.dumps(tc.arguments)
|
|
}
|
|
}
|
|
for tc in response.tool_calls
|
|
]
|
|
messages = self.context.add_assistant_message(
|
|
messages, response.content, tool_call_dicts
|
|
)
|
|
|
|
for tool_call in response.tool_calls:
|
|
args_str = json.dumps(tool_call.arguments)
|
|
logger.debug(f"Executing tool: {tool_call.name} with arguments: {args_str}")
|
|
result = await self.tools.execute(tool_call.name, tool_call.arguments)
|
|
messages = self.context.add_tool_result(
|
|
messages, tool_call.id, tool_call.name, result
|
|
)
|
|
else:
|
|
final_content = response.content
|
|
break
|
|
|
|
if final_content is None:
|
|
final_content = "Background task completed."
|
|
|
|
# Save to session (mark as system message in history)
|
|
session.add_message("user", f"[System: {msg.sender_id}] {msg.content}")
|
|
session.add_message("assistant", final_content)
|
|
self.sessions.save(session)
|
|
|
|
return OutboundMessage(
|
|
channel=origin_channel,
|
|
chat_id=origin_chat_id,
|
|
content=final_content
|
|
)
|
|
|
|
async def process_direct(self, content: str, session_key: str = "cli:direct") -> str:
|
|
"""
|
|
Process a message directly (for CLI usage).
|
|
|
|
Args:
|
|
content: The message content.
|
|
session_key: Session identifier.
|
|
|
|
Returns:
|
|
The agent's response.
|
|
"""
|
|
msg = InboundMessage(
|
|
channel="cli",
|
|
sender_id="user",
|
|
chat_id="direct",
|
|
content=content
|
|
)
|
|
|
|
response = await self._process_message(msg)
|
|
return response.content if response else ""
|