Merge pull request #802 to enable stream intermediate progress

feat: stream intermediate progress to user during tool execution
This commit is contained in:
Xubin Ren 2026-02-18 22:28:37 +08:00 committed by GitHub
commit 92d279924f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 57 additions and 11 deletions

View File

@ -16,7 +16,7 @@
⚡️ Delivers core agent functionality in just **~4,000** lines of code — **99% smaller** than Clawdbot's 430k+ lines.
📏 Real-time line count: **3,696 lines** (run `bash core_agent_lines.sh` to verify anytime)
📏 Real-time line count: **3,761 lines** (run `bash core_agent_lines.sh` to verify anytime)
## 📢 News

View File

@ -105,7 +105,7 @@ IMPORTANT: When responding to direct questions or conversations, reply directly
Only use the 'message' tool when you need to send a message to a specific chat channel (like WhatsApp).
For normal conversation, just respond with text - do not call the message tool.
Always be helpful, accurate, and concise. When using tools, think step by step: what you know, what you need, and why you chose this tool.
Always be helpful, accurate, and concise. Before calling tools, briefly tell the user what you're about to do (one short sentence in the user's language).
When remembering something important, write to {workspace_path}/memory/MEMORY.md
To recall past events, grep {workspace_path}/memory/HISTORY.md"""

View File

@ -5,7 +5,8 @@ from contextlib import AsyncExitStack
import json
import json_repair
from pathlib import Path
from typing import Any
import re
from typing import Any, Awaitable, Callable
from loguru import logger
@ -146,12 +147,34 @@ class AgentLoop:
if isinstance(cron_tool, CronTool):
cron_tool.set_context(channel, chat_id)
async def _run_agent_loop(self, initial_messages: list[dict]) -> tuple[str | None, list[str]]:
@staticmethod
def _strip_think(text: str | None) -> str | None:
"""Remove <think>…</think> blocks that some models embed in content."""
if not text:
return None
return re.sub(r"<think>[\s\S]*?</think>", "", text).strip() or None
@staticmethod
def _tool_hint(tool_calls: list) -> str:
"""Format tool calls as concise hint, e.g. 'web_search("query")'."""
def _fmt(tc):
val = next(iter(tc.arguments.values()), None) if tc.arguments else None
if not isinstance(val, str):
return tc.name
return f'{tc.name}("{val[:40]}")' if len(val) > 40 else f'{tc.name}("{val}")'
return ", ".join(_fmt(tc) for tc in tool_calls)
async def _run_agent_loop(
self,
initial_messages: list[dict],
on_progress: Callable[[str], Awaitable[None]] | None = None,
) -> tuple[str | None, list[str]]:
"""
Run the agent iteration loop.
Args:
initial_messages: Starting messages for the LLM conversation.
on_progress: Optional callback to push intermediate content to the user.
Returns:
Tuple of (final_content, list_of_tools_used).
@ -173,6 +196,10 @@ class AgentLoop:
)
if response.has_tool_calls:
if on_progress:
clean = self._strip_think(response.content)
await on_progress(clean or self._tool_hint(response.tool_calls))
tool_call_dicts = [
{
"id": tc.id,
@ -197,9 +224,8 @@ class AgentLoop:
messages = self.context.add_tool_result(
messages, tool_call.id, tool_call.name, result
)
messages.append({"role": "user", "content": "Reflect on the results and decide next steps."})
else:
final_content = response.content
final_content = self._strip_think(response.content)
break
return final_content, tools_used
@ -244,13 +270,19 @@ class AgentLoop:
self._running = False
logger.info("Agent loop stopping")
async def _process_message(self, msg: InboundMessage, session_key: str | None = None) -> OutboundMessage | None:
async def _process_message(
self,
msg: InboundMessage,
session_key: str | None = None,
on_progress: Callable[[str], Awaitable[None]] | None = None,
) -> OutboundMessage | None:
"""
Process a single inbound message.
Args:
msg: The inbound message to process.
session_key: Override session key (used by process_direct).
on_progress: Optional callback for intermediate output (defaults to bus publish).
Returns:
The response message, or None if no response needed.
@ -297,7 +329,16 @@ class AgentLoop:
channel=msg.channel,
chat_id=msg.chat_id,
)
final_content, tools_used = await self._run_agent_loop(initial_messages)
async def _bus_progress(content: str) -> None:
await self.bus.publish_outbound(OutboundMessage(
channel=msg.channel, chat_id=msg.chat_id, content=content,
metadata=msg.metadata or {},
))
final_content, tools_used = await self._run_agent_loop(
initial_messages, on_progress=on_progress or _bus_progress,
)
if final_content is None:
final_content = "I've completed processing but have no response to give."
@ -451,6 +492,7 @@ Respond with ONLY valid JSON, no markdown fences."""
session_key: str = "cli:direct",
channel: str = "cli",
chat_id: str = "direct",
on_progress: Callable[[str], Awaitable[None]] | None = None,
) -> str:
"""
Process a message directly (for CLI or cron usage).
@ -460,6 +502,7 @@ Respond with ONLY valid JSON, no markdown fences."""
session_key: Session identifier (overrides channel:chat_id for session lookup).
channel: Source channel (for tool context routing).
chat_id: Source chat ID (for tool context routing).
on_progress: Optional callback for intermediate output.
Returns:
The agent's response.
@ -472,5 +515,5 @@ Respond with ONLY valid JSON, no markdown fences."""
content=content
)
response = await self._process_message(msg, session_key=session_key)
response = await self._process_message(msg, session_key=session_key, on_progress=on_progress)
return response.content if response else ""

View File

@ -494,11 +494,14 @@ def agent(
# Animated spinner is safe to use with prompt_toolkit input handling
return console.status("[dim]nanobot is thinking...[/dim]", spinner="dots")
async def _cli_progress(content: str) -> None:
console.print(f" [dim]↳ {content}[/dim]")
if message:
# Single message mode
async def run_once():
with _thinking_ctx():
response = await agent_loop.process_direct(message, session_id)
response = await agent_loop.process_direct(message, session_id, on_progress=_cli_progress)
_print_agent_response(response, render_markdown=markdown)
await agent_loop.close_mcp()
@ -531,7 +534,7 @@ def agent(
break
with _thinking_ctx():
response = await agent_loop.process_direct(user_input, session_id)
response = await agent_loop.process_direct(user_input, session_id, on_progress=_cli_progress)
_print_agent_response(response, render_markdown=markdown)
except KeyboardInterrupt:
_restore_terminal()