feat(cli): rewrite input layer with prompt_toolkit and polish UI
- Replaces fragile input() hacks with robust prompt_toolkit.PromptSession - Native support for multiline paste, history, and clean display - Restores animated spinner in _thinking_ctx (now safe) - Replaces boxed Panel with clean header for easier copying - Adds prompt-toolkit dependency - Adds new unit tests for input layer
This commit is contained in:
parent
ea1d2d763a
commit
3561b6a63d
@ -1,7 +1,6 @@
|
|||||||
"""CLI commands for nanobot."""
|
"""CLI commands for nanobot."""
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import atexit
|
|
||||||
import os
|
import os
|
||||||
import signal
|
import signal
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
@ -15,6 +14,11 @@ from rich.panel import Panel
|
|||||||
from rich.table import Table
|
from rich.table import Table
|
||||||
from rich.text import Text
|
from rich.text import Text
|
||||||
|
|
||||||
|
from prompt_toolkit import PromptSession
|
||||||
|
from prompt_toolkit.formatted_text import HTML
|
||||||
|
from prompt_toolkit.history import FileHistory
|
||||||
|
from prompt_toolkit.patch_stdout import patch_stdout
|
||||||
|
|
||||||
from nanobot import __version__, __logo__
|
from nanobot import __version__, __logo__
|
||||||
|
|
||||||
app = typer.Typer(
|
app = typer.Typer(
|
||||||
@ -27,13 +31,10 @@ console = Console()
|
|||||||
EXIT_COMMANDS = {"exit", "quit", "/exit", "/quit", ":q"}
|
EXIT_COMMANDS = {"exit", "quit", "/exit", "/quit", ":q"}
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Lightweight CLI input: readline for arrow keys / history, termios for flush
|
# CLI input: prompt_toolkit for editing, paste, history, and display
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
_READLINE = None
|
_PROMPT_SESSION: PromptSession | None = None
|
||||||
_HISTORY_FILE: Path | None = None
|
|
||||||
_HISTORY_HOOK_REGISTERED = False
|
|
||||||
_USING_LIBEDIT = False
|
|
||||||
_SAVED_TERM_ATTRS = None # original termios settings, restored on exit
|
_SAVED_TERM_ATTRS = None # original termios settings, restored on exit
|
||||||
|
|
||||||
|
|
||||||
@ -64,15 +65,6 @@ def _flush_pending_tty_input() -> None:
|
|||||||
return
|
return
|
||||||
|
|
||||||
|
|
||||||
def _save_history() -> None:
|
|
||||||
if _READLINE is None or _HISTORY_FILE is None:
|
|
||||||
return
|
|
||||||
try:
|
|
||||||
_READLINE.write_history_file(str(_HISTORY_FILE))
|
|
||||||
except Exception:
|
|
||||||
return
|
|
||||||
|
|
||||||
|
|
||||||
def _restore_terminal() -> None:
|
def _restore_terminal() -> None:
|
||||||
"""Restore terminal to its original state (echo, line buffering, etc.)."""
|
"""Restore terminal to its original state (echo, line buffering, etc.)."""
|
||||||
if _SAVED_TERM_ATTRS is None:
|
if _SAVED_TERM_ATTRS is None:
|
||||||
@ -84,11 +76,11 @@ def _restore_terminal() -> None:
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
def _enable_line_editing() -> None:
|
def _init_prompt_session() -> None:
|
||||||
"""Enable readline for arrow keys, line editing, and persistent history."""
|
"""Create the prompt_toolkit session with persistent file history."""
|
||||||
global _READLINE, _HISTORY_FILE, _HISTORY_HOOK_REGISTERED, _USING_LIBEDIT, _SAVED_TERM_ATTRS
|
global _PROMPT_SESSION, _SAVED_TERM_ATTRS
|
||||||
|
|
||||||
# Save terminal state before readline touches it
|
# Save terminal state so we can restore it on exit
|
||||||
try:
|
try:
|
||||||
import termios
|
import termios
|
||||||
_SAVED_TERM_ATTRS = termios.tcgetattr(sys.stdin.fileno())
|
_SAVED_TERM_ATTRS = termios.tcgetattr(sys.stdin.fileno())
|
||||||
@ -97,59 +89,22 @@ def _enable_line_editing() -> None:
|
|||||||
|
|
||||||
history_file = Path.home() / ".nanobot" / "history" / "cli_history"
|
history_file = Path.home() / ".nanobot" / "history" / "cli_history"
|
||||||
history_file.parent.mkdir(parents=True, exist_ok=True)
|
history_file.parent.mkdir(parents=True, exist_ok=True)
|
||||||
_HISTORY_FILE = history_file
|
|
||||||
|
|
||||||
try:
|
_PROMPT_SESSION = PromptSession(
|
||||||
import readline
|
history=FileHistory(str(history_file)),
|
||||||
except ImportError:
|
enable_open_in_editor=False,
|
||||||
return
|
multiline=False, # Enter submits (single line mode)
|
||||||
|
)
|
||||||
_READLINE = readline
|
|
||||||
_USING_LIBEDIT = "libedit" in (readline.__doc__ or "").lower()
|
|
||||||
|
|
||||||
try:
|
|
||||||
if _USING_LIBEDIT:
|
|
||||||
readline.parse_and_bind("bind ^I rl_complete")
|
|
||||||
else:
|
|
||||||
readline.parse_and_bind("tab: complete")
|
|
||||||
readline.parse_and_bind("set editing-mode emacs")
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
try:
|
|
||||||
readline.read_history_file(str(history_file))
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
if not _HISTORY_HOOK_REGISTERED:
|
|
||||||
atexit.register(_save_history)
|
|
||||||
_HISTORY_HOOK_REGISTERED = True
|
|
||||||
|
|
||||||
|
|
||||||
def _prompt_text() -> str:
|
|
||||||
"""Build a readline-friendly colored prompt."""
|
|
||||||
if _READLINE is None:
|
|
||||||
return "You: "
|
|
||||||
# libedit on macOS does not honor GNU readline non-printing markers.
|
|
||||||
if _USING_LIBEDIT:
|
|
||||||
return "\033[1;34mYou:\033[0m "
|
|
||||||
return "\001\033[1;34m\002You:\001\033[0m\002 "
|
|
||||||
|
|
||||||
|
|
||||||
def _print_agent_response(response: str, render_markdown: bool) -> None:
|
def _print_agent_response(response: str, render_markdown: bool) -> None:
|
||||||
"""Render assistant response with consistent terminal styling."""
|
"""Render assistant response with clean, copy-friendly header."""
|
||||||
content = response or ""
|
content = response or ""
|
||||||
body = Markdown(content) if render_markdown else Text(content)
|
body = Markdown(content) if render_markdown else Text(content)
|
||||||
console.print()
|
console.print()
|
||||||
console.print(
|
# Use a simple header instead of a Panel box, making it easier to copy text
|
||||||
Panel(
|
console.print(f"{__logo__} [bold cyan]nanobot[/bold cyan]")
|
||||||
body,
|
console.print(body)
|
||||||
title=f"{__logo__} nanobot",
|
|
||||||
title_align="left",
|
|
||||||
border_style="cyan",
|
|
||||||
padding=(0, 1),
|
|
||||||
)
|
|
||||||
)
|
|
||||||
console.print()
|
console.print()
|
||||||
|
|
||||||
|
|
||||||
@ -159,13 +114,25 @@ def _is_exit_command(command: str) -> bool:
|
|||||||
|
|
||||||
|
|
||||||
async def _read_interactive_input_async() -> str:
|
async def _read_interactive_input_async() -> str:
|
||||||
"""Read user input with arrow keys and history (runs input() in a thread)."""
|
"""Read user input using prompt_toolkit (handles paste, history, display).
|
||||||
|
|
||||||
|
prompt_toolkit natively handles:
|
||||||
|
- Multiline paste (bracketed paste mode)
|
||||||
|
- History navigation (up/down arrows)
|
||||||
|
- Clean display (no ghost characters or artifacts)
|
||||||
|
"""
|
||||||
|
if _PROMPT_SESSION is None:
|
||||||
|
raise RuntimeError("Call _init_prompt_session() first")
|
||||||
try:
|
try:
|
||||||
return await asyncio.to_thread(input, _prompt_text())
|
with patch_stdout():
|
||||||
|
return await _PROMPT_SESSION.prompt_async(
|
||||||
|
HTML("<b fg='ansiblue'>You:</b> "),
|
||||||
|
)
|
||||||
except EOFError as exc:
|
except EOFError as exc:
|
||||||
raise KeyboardInterrupt from exc
|
raise KeyboardInterrupt from exc
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def version_callback(value: bool):
|
def version_callback(value: bool):
|
||||||
if value:
|
if value:
|
||||||
console.print(f"{__logo__} nanobot v{__version__}")
|
console.print(f"{__logo__} nanobot v{__version__}")
|
||||||
@ -473,6 +440,7 @@ def agent(
|
|||||||
if logs:
|
if logs:
|
||||||
from contextlib import nullcontext
|
from contextlib import nullcontext
|
||||||
return nullcontext()
|
return nullcontext()
|
||||||
|
# Animated spinner is safe to use with prompt_toolkit input handling
|
||||||
return console.status("[dim]nanobot is thinking...[/dim]", spinner="dots")
|
return console.status("[dim]nanobot is thinking...[/dim]", spinner="dots")
|
||||||
|
|
||||||
if message:
|
if message:
|
||||||
@ -485,13 +453,10 @@ def agent(
|
|||||||
asyncio.run(run_once())
|
asyncio.run(run_once())
|
||||||
else:
|
else:
|
||||||
# Interactive mode
|
# Interactive mode
|
||||||
_enable_line_editing()
|
_init_prompt_session()
|
||||||
console.print(f"{__logo__} Interactive mode (type [bold]exit[/bold] or [bold]Ctrl+C[/bold] to quit)\n")
|
console.print(f"{__logo__} Interactive mode (type [bold]exit[/bold] or [bold]Ctrl+C[/bold] to quit)\n")
|
||||||
|
|
||||||
# input() runs in a worker thread that can't be cancelled.
|
|
||||||
# Without this handler, asyncio.run() would hang waiting for it.
|
|
||||||
def _exit_on_sigint(signum, frame):
|
def _exit_on_sigint(signum, frame):
|
||||||
_save_history()
|
|
||||||
_restore_terminal()
|
_restore_terminal()
|
||||||
console.print("\nGoodbye!")
|
console.print("\nGoodbye!")
|
||||||
os._exit(0)
|
os._exit(0)
|
||||||
@ -508,7 +473,6 @@ def agent(
|
|||||||
continue
|
continue
|
||||||
|
|
||||||
if _is_exit_command(command):
|
if _is_exit_command(command):
|
||||||
_save_history()
|
|
||||||
_restore_terminal()
|
_restore_terminal()
|
||||||
console.print("\nGoodbye!")
|
console.print("\nGoodbye!")
|
||||||
break
|
break
|
||||||
@ -517,12 +481,10 @@ def agent(
|
|||||||
response = await agent_loop.process_direct(user_input, session_id)
|
response = await agent_loop.process_direct(user_input, session_id)
|
||||||
_print_agent_response(response, render_markdown=markdown)
|
_print_agent_response(response, render_markdown=markdown)
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
_save_history()
|
|
||||||
_restore_terminal()
|
_restore_terminal()
|
||||||
console.print("\nGoodbye!")
|
console.print("\nGoodbye!")
|
||||||
break
|
break
|
||||||
except EOFError:
|
except EOFError:
|
||||||
_save_history()
|
|
||||||
_restore_terminal()
|
_restore_terminal()
|
||||||
console.print("\nGoodbye!")
|
console.print("\nGoodbye!")
|
||||||
break
|
break
|
||||||
|
|||||||
@ -37,6 +37,7 @@ dependencies = [
|
|||||||
"slack-sdk>=3.26.0",
|
"slack-sdk>=3.26.0",
|
||||||
"qq-botpy>=1.0.0",
|
"qq-botpy>=1.0.0",
|
||||||
"python-socks[asyncio]>=2.4.0",
|
"python-socks[asyncio]>=2.4.0",
|
||||||
|
"prompt-toolkit>=3.0.0",
|
||||||
]
|
]
|
||||||
|
|
||||||
[project.optional-dependencies]
|
[project.optional-dependencies]
|
||||||
|
|||||||
58
tests/test_cli_input.py
Normal file
58
tests/test_cli_input.py
Normal file
@ -0,0 +1,58 @@
|
|||||||
|
import asyncio
|
||||||
|
from unittest.mock import AsyncMock, MagicMock, patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from prompt_toolkit.formatted_text import HTML
|
||||||
|
|
||||||
|
from nanobot.cli import commands
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_prompt_session():
|
||||||
|
"""Mock the global prompt session."""
|
||||||
|
mock_session = MagicMock()
|
||||||
|
mock_session.prompt_async = AsyncMock()
|
||||||
|
with patch("nanobot.cli.commands._PROMPT_SESSION", mock_session):
|
||||||
|
yield mock_session
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_read_interactive_input_async_returns_input(mock_prompt_session):
|
||||||
|
"""Test that _read_interactive_input_async returns the user input from prompt_session."""
|
||||||
|
mock_prompt_session.prompt_async.return_value = "hello world"
|
||||||
|
|
||||||
|
result = await commands._read_interactive_input_async()
|
||||||
|
|
||||||
|
assert result == "hello world"
|
||||||
|
mock_prompt_session.prompt_async.assert_called_once()
|
||||||
|
args, _ = mock_prompt_session.prompt_async.call_args
|
||||||
|
assert isinstance(args[0], HTML) # Verify HTML prompt is used
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_read_interactive_input_async_handles_eof(mock_prompt_session):
|
||||||
|
"""Test that EOFError converts to KeyboardInterrupt."""
|
||||||
|
mock_prompt_session.prompt_async.side_effect = EOFError()
|
||||||
|
|
||||||
|
with pytest.raises(KeyboardInterrupt):
|
||||||
|
await commands._read_interactive_input_async()
|
||||||
|
|
||||||
|
|
||||||
|
def test_init_prompt_session_creates_session():
|
||||||
|
"""Test that _init_prompt_session initializes the global session."""
|
||||||
|
# Ensure global is None before test
|
||||||
|
commands._PROMPT_SESSION = None
|
||||||
|
|
||||||
|
with patch("nanobot.cli.commands.PromptSession") as MockSession, \
|
||||||
|
patch("nanobot.cli.commands.FileHistory") as MockHistory, \
|
||||||
|
patch("pathlib.Path.home") as mock_home:
|
||||||
|
|
||||||
|
mock_home.return_value = MagicMock()
|
||||||
|
|
||||||
|
commands._init_prompt_session()
|
||||||
|
|
||||||
|
assert commands._PROMPT_SESSION is not None
|
||||||
|
MockSession.assert_called_once()
|
||||||
|
_, kwargs = MockSession.call_args
|
||||||
|
assert kwargs["multiline"] is False
|
||||||
|
assert kwargs["enable_open_in_editor"] is False
|
||||||
Loading…
x
Reference in New Issue
Block a user