Merge PR #488: refactor CLI input with prompt_toolkit

This commit is contained in:
Re-bin 2026-02-11 09:38:11 +00:00
commit c8831a1e1e
3 changed files with 93 additions and 74 deletions

View File

@ -1,7 +1,6 @@
"""CLI commands for nanobot.""" """CLI commands for nanobot."""
import asyncio import asyncio
import atexit
import os import os
import signal import signal
from pathlib import Path from pathlib import Path
@ -11,10 +10,14 @@ import sys
import typer import typer
from rich.console import Console from rich.console import Console
from rich.markdown import Markdown from rich.markdown import Markdown
from rich.panel import Panel
from rich.table import Table from rich.table import Table
from rich.text import Text from rich.text import Text
from prompt_toolkit import PromptSession
from prompt_toolkit.formatted_text import HTML
from prompt_toolkit.history import FileHistory
from prompt_toolkit.patch_stdout import patch_stdout
from nanobot import __version__, __logo__ from nanobot import __version__, __logo__
app = typer.Typer( app = typer.Typer(
@ -27,13 +30,10 @@ console = Console()
EXIT_COMMANDS = {"exit", "quit", "/exit", "/quit", ":q"} EXIT_COMMANDS = {"exit", "quit", "/exit", "/quit", ":q"}
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Lightweight CLI input: readline for arrow keys / history, termios for flush # CLI input: prompt_toolkit for editing, paste, history, and display
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
_READLINE = None _PROMPT_SESSION: PromptSession | None = None
_HISTORY_FILE: Path | None = None
_HISTORY_HOOK_REGISTERED = False
_USING_LIBEDIT = False
_SAVED_TERM_ATTRS = None # original termios settings, restored on exit _SAVED_TERM_ATTRS = None # original termios settings, restored on exit
@ -64,15 +64,6 @@ def _flush_pending_tty_input() -> None:
return return
def _save_history() -> None:
if _READLINE is None or _HISTORY_FILE is None:
return
try:
_READLINE.write_history_file(str(_HISTORY_FILE))
except Exception:
return
def _restore_terminal() -> None: def _restore_terminal() -> None:
"""Restore terminal to its original state (echo, line buffering, etc.).""" """Restore terminal to its original state (echo, line buffering, etc.)."""
if _SAVED_TERM_ATTRS is None: if _SAVED_TERM_ATTRS is None:
@ -84,11 +75,11 @@ def _restore_terminal() -> None:
pass pass
def _enable_line_editing() -> None: def _init_prompt_session() -> None:
"""Enable readline for arrow keys, line editing, and persistent history.""" """Create the prompt_toolkit session with persistent file history."""
global _READLINE, _HISTORY_FILE, _HISTORY_HOOK_REGISTERED, _USING_LIBEDIT, _SAVED_TERM_ATTRS global _PROMPT_SESSION, _SAVED_TERM_ATTRS
# Save terminal state before readline touches it # Save terminal state so we can restore it on exit
try: try:
import termios import termios
_SAVED_TERM_ATTRS = termios.tcgetattr(sys.stdin.fileno()) _SAVED_TERM_ATTRS = termios.tcgetattr(sys.stdin.fileno())
@ -97,43 +88,12 @@ def _enable_line_editing() -> None:
history_file = Path.home() / ".nanobot" / "history" / "cli_history" history_file = Path.home() / ".nanobot" / "history" / "cli_history"
history_file.parent.mkdir(parents=True, exist_ok=True) history_file.parent.mkdir(parents=True, exist_ok=True)
_HISTORY_FILE = history_file
try: _PROMPT_SESSION = PromptSession(
import readline history=FileHistory(str(history_file)),
except ImportError: enable_open_in_editor=False,
return multiline=False, # Enter submits (single line mode)
)
_READLINE = readline
_USING_LIBEDIT = "libedit" in (readline.__doc__ or "").lower()
try:
if _USING_LIBEDIT:
readline.parse_and_bind("bind ^I rl_complete")
else:
readline.parse_and_bind("tab: complete")
readline.parse_and_bind("set editing-mode emacs")
except Exception:
pass
try:
readline.read_history_file(str(history_file))
except Exception:
pass
if not _HISTORY_HOOK_REGISTERED:
atexit.register(_save_history)
_HISTORY_HOOK_REGISTERED = True
def _prompt_text() -> str:
"""Build a readline-friendly colored prompt."""
if _READLINE is None:
return "You: "
# libedit on macOS does not honor GNU readline non-printing markers.
if _USING_LIBEDIT:
return "\033[1;34mYou:\033[0m "
return "\001\033[1;34m\002You:\001\033[0m\002 "
def _print_agent_response(response: str, render_markdown: bool) -> None: def _print_agent_response(response: str, render_markdown: bool) -> None:
@ -141,15 +101,8 @@ def _print_agent_response(response: str, render_markdown: bool) -> None:
content = response or "" content = response or ""
body = Markdown(content) if render_markdown else Text(content) body = Markdown(content) if render_markdown else Text(content)
console.print() console.print()
console.print( console.print(f"[cyan]{__logo__} nanobot[/cyan]")
Panel( console.print(body)
body,
title=f"{__logo__} nanobot",
title_align="left",
border_style="cyan",
padding=(0, 1),
)
)
console.print() console.print()
@ -159,13 +112,25 @@ def _is_exit_command(command: str) -> bool:
async def _read_interactive_input_async() -> str: async def _read_interactive_input_async() -> str:
"""Read user input with arrow keys and history (runs input() in a thread).""" """Read user input using prompt_toolkit (handles paste, history, display).
prompt_toolkit natively handles:
- Multiline paste (bracketed paste mode)
- History navigation (up/down arrows)
- Clean display (no ghost characters or artifacts)
"""
if _PROMPT_SESSION is None:
raise RuntimeError("Call _init_prompt_session() first")
try: try:
return await asyncio.to_thread(input, _prompt_text()) with patch_stdout():
return await _PROMPT_SESSION.prompt_async(
HTML("<b fg='ansiblue'>You:</b> "),
)
except EOFError as exc: except EOFError as exc:
raise KeyboardInterrupt from exc raise KeyboardInterrupt from exc
def version_callback(value: bool): def version_callback(value: bool):
if value: if value:
console.print(f"{__logo__} nanobot v{__version__}") console.print(f"{__logo__} nanobot v{__version__}")
@ -473,6 +438,7 @@ def agent(
if logs: if logs:
from contextlib import nullcontext from contextlib import nullcontext
return nullcontext() return nullcontext()
# Animated spinner is safe to use with prompt_toolkit input handling
return console.status("[dim]nanobot is thinking...[/dim]", spinner="dots") return console.status("[dim]nanobot is thinking...[/dim]", spinner="dots")
if message: if message:
@ -485,13 +451,10 @@ def agent(
asyncio.run(run_once()) asyncio.run(run_once())
else: else:
# Interactive mode # Interactive mode
_enable_line_editing() _init_prompt_session()
console.print(f"{__logo__} Interactive mode (type [bold]exit[/bold] or [bold]Ctrl+C[/bold] to quit)\n") console.print(f"{__logo__} Interactive mode (type [bold]exit[/bold] or [bold]Ctrl+C[/bold] to quit)\n")
# input() runs in a worker thread that can't be cancelled.
# Without this handler, asyncio.run() would hang waiting for it.
def _exit_on_sigint(signum, frame): def _exit_on_sigint(signum, frame):
_save_history()
_restore_terminal() _restore_terminal()
console.print("\nGoodbye!") console.print("\nGoodbye!")
os._exit(0) os._exit(0)
@ -508,7 +471,6 @@ def agent(
continue continue
if _is_exit_command(command): if _is_exit_command(command):
_save_history()
_restore_terminal() _restore_terminal()
console.print("\nGoodbye!") console.print("\nGoodbye!")
break break
@ -517,12 +479,10 @@ def agent(
response = await agent_loop.process_direct(user_input, session_id) response = await agent_loop.process_direct(user_input, session_id)
_print_agent_response(response, render_markdown=markdown) _print_agent_response(response, render_markdown=markdown)
except KeyboardInterrupt: except KeyboardInterrupt:
_save_history()
_restore_terminal() _restore_terminal()
console.print("\nGoodbye!") console.print("\nGoodbye!")
break break
except EOFError: except EOFError:
_save_history()
_restore_terminal() _restore_terminal()
console.print("\nGoodbye!") console.print("\nGoodbye!")
break break

View File

@ -37,6 +37,7 @@ dependencies = [
"slack-sdk>=3.26.0", "slack-sdk>=3.26.0",
"qq-botpy>=1.0.0", "qq-botpy>=1.0.0",
"python-socks[asyncio]>=2.4.0", "python-socks[asyncio]>=2.4.0",
"prompt-toolkit>=3.0.0",
] ]
[project.optional-dependencies] [project.optional-dependencies]

58
tests/test_cli_input.py Normal file
View File

@ -0,0 +1,58 @@
import asyncio
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from prompt_toolkit.formatted_text import HTML
from nanobot.cli import commands
@pytest.fixture
def mock_prompt_session():
"""Mock the global prompt session."""
mock_session = MagicMock()
mock_session.prompt_async = AsyncMock()
with patch("nanobot.cli.commands._PROMPT_SESSION", mock_session):
yield mock_session
@pytest.mark.asyncio
async def test_read_interactive_input_async_returns_input(mock_prompt_session):
"""Test that _read_interactive_input_async returns the user input from prompt_session."""
mock_prompt_session.prompt_async.return_value = "hello world"
result = await commands._read_interactive_input_async()
assert result == "hello world"
mock_prompt_session.prompt_async.assert_called_once()
args, _ = mock_prompt_session.prompt_async.call_args
assert isinstance(args[0], HTML) # Verify HTML prompt is used
@pytest.mark.asyncio
async def test_read_interactive_input_async_handles_eof(mock_prompt_session):
"""Test that EOFError converts to KeyboardInterrupt."""
mock_prompt_session.prompt_async.side_effect = EOFError()
with pytest.raises(KeyboardInterrupt):
await commands._read_interactive_input_async()
def test_init_prompt_session_creates_session():
"""Test that _init_prompt_session initializes the global session."""
# Ensure global is None before test
commands._PROMPT_SESSION = None
with patch("nanobot.cli.commands.PromptSession") as MockSession, \
patch("nanobot.cli.commands.FileHistory") as MockHistory, \
patch("pathlib.Path.home") as mock_home:
mock_home.return_value = MagicMock()
commands._init_prompt_session()
assert commands._PROMPT_SESSION is not None
MockSession.assert_called_once()
_, kwargs = MockSession.call_args
assert kwargs["multiline"] is False
assert kwargs["enable_open_in_editor"] is False