diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py index a200e67..aa99d55 100644 --- a/nanobot/cli/commands.py +++ b/nanobot/cli/commands.py @@ -1,7 +1,6 @@ """CLI commands for nanobot.""" import asyncio -import atexit import os import signal from pathlib import Path @@ -11,10 +10,14 @@ import sys import typer from rich.console import Console from rich.markdown import Markdown -from rich.panel import Panel from rich.table import Table from rich.text import Text +from prompt_toolkit import PromptSession +from prompt_toolkit.formatted_text import HTML +from prompt_toolkit.history import FileHistory +from prompt_toolkit.patch_stdout import patch_stdout + from nanobot import __version__, __logo__ app = typer.Typer( @@ -27,13 +30,10 @@ console = Console() EXIT_COMMANDS = {"exit", "quit", "/exit", "/quit", ":q"} # --------------------------------------------------------------------------- -# Lightweight CLI input: readline for arrow keys / history, termios for flush +# CLI input: prompt_toolkit for editing, paste, history, and display # --------------------------------------------------------------------------- -_READLINE = None -_HISTORY_FILE: Path | None = None -_HISTORY_HOOK_REGISTERED = False -_USING_LIBEDIT = False +_PROMPT_SESSION: PromptSession | None = None _SAVED_TERM_ATTRS = None # original termios settings, restored on exit @@ -64,15 +64,6 @@ def _flush_pending_tty_input() -> None: return -def _save_history() -> None: - if _READLINE is None or _HISTORY_FILE is None: - return - try: - _READLINE.write_history_file(str(_HISTORY_FILE)) - except Exception: - return - - def _restore_terminal() -> None: """Restore terminal to its original state (echo, line buffering, etc.).""" if _SAVED_TERM_ATTRS is None: @@ -84,11 +75,11 @@ def _restore_terminal() -> None: pass -def _enable_line_editing() -> None: - """Enable readline for arrow keys, line editing, and persistent history.""" - global _READLINE, _HISTORY_FILE, _HISTORY_HOOK_REGISTERED, _USING_LIBEDIT, _SAVED_TERM_ATTRS +def _init_prompt_session() -> None: + """Create the prompt_toolkit session with persistent file history.""" + global _PROMPT_SESSION, _SAVED_TERM_ATTRS - # Save terminal state before readline touches it + # Save terminal state so we can restore it on exit try: import termios _SAVED_TERM_ATTRS = termios.tcgetattr(sys.stdin.fileno()) @@ -97,43 +88,12 @@ def _enable_line_editing() -> None: history_file = Path.home() / ".nanobot" / "history" / "cli_history" history_file.parent.mkdir(parents=True, exist_ok=True) - _HISTORY_FILE = history_file - try: - import readline - except ImportError: - return - - _READLINE = readline - _USING_LIBEDIT = "libedit" in (readline.__doc__ or "").lower() - - try: - if _USING_LIBEDIT: - readline.parse_and_bind("bind ^I rl_complete") - else: - readline.parse_and_bind("tab: complete") - readline.parse_and_bind("set editing-mode emacs") - except Exception: - pass - - try: - readline.read_history_file(str(history_file)) - except Exception: - pass - - if not _HISTORY_HOOK_REGISTERED: - atexit.register(_save_history) - _HISTORY_HOOK_REGISTERED = True - - -def _prompt_text() -> str: - """Build a readline-friendly colored prompt.""" - if _READLINE is None: - return "You: " - # libedit on macOS does not honor GNU readline non-printing markers. - if _USING_LIBEDIT: - return "\033[1;34mYou:\033[0m " - return "\001\033[1;34m\002You:\001\033[0m\002 " + _PROMPT_SESSION = PromptSession( + history=FileHistory(str(history_file)), + enable_open_in_editor=False, + multiline=False, # Enter submits (single line mode) + ) def _print_agent_response(response: str, render_markdown: bool) -> None: @@ -141,15 +101,8 @@ def _print_agent_response(response: str, render_markdown: bool) -> None: content = response or "" body = Markdown(content) if render_markdown else Text(content) console.print() - console.print( - Panel( - body, - title=f"{__logo__} nanobot", - title_align="left", - border_style="cyan", - padding=(0, 1), - ) - ) + console.print(f"[cyan]{__logo__} nanobot[/cyan]") + console.print(body) console.print() @@ -159,13 +112,25 @@ def _is_exit_command(command: str) -> bool: async def _read_interactive_input_async() -> str: - """Read user input with arrow keys and history (runs input() in a thread).""" + """Read user input using prompt_toolkit (handles paste, history, display). + + prompt_toolkit natively handles: + - Multiline paste (bracketed paste mode) + - History navigation (up/down arrows) + - Clean display (no ghost characters or artifacts) + """ + if _PROMPT_SESSION is None: + raise RuntimeError("Call _init_prompt_session() first") try: - return await asyncio.to_thread(input, _prompt_text()) + with patch_stdout(): + return await _PROMPT_SESSION.prompt_async( + HTML("You: "), + ) except EOFError as exc: raise KeyboardInterrupt from exc + def version_callback(value: bool): if value: console.print(f"{__logo__} nanobot v{__version__}") @@ -473,6 +438,7 @@ def agent( if logs: from contextlib import nullcontext return nullcontext() + # Animated spinner is safe to use with prompt_toolkit input handling return console.status("[dim]nanobot is thinking...[/dim]", spinner="dots") if message: @@ -485,13 +451,10 @@ def agent( asyncio.run(run_once()) else: # Interactive mode - _enable_line_editing() + _init_prompt_session() console.print(f"{__logo__} Interactive mode (type [bold]exit[/bold] or [bold]Ctrl+C[/bold] to quit)\n") - # input() runs in a worker thread that can't be cancelled. - # Without this handler, asyncio.run() would hang waiting for it. def _exit_on_sigint(signum, frame): - _save_history() _restore_terminal() console.print("\nGoodbye!") os._exit(0) @@ -508,7 +471,6 @@ def agent( continue if _is_exit_command(command): - _save_history() _restore_terminal() console.print("\nGoodbye!") break @@ -517,12 +479,10 @@ def agent( response = await agent_loop.process_direct(user_input, session_id) _print_agent_response(response, render_markdown=markdown) except KeyboardInterrupt: - _save_history() _restore_terminal() console.print("\nGoodbye!") break except EOFError: - _save_history() _restore_terminal() console.print("\nGoodbye!") break diff --git a/pyproject.toml b/pyproject.toml index 3c2fec9..b1b3c81 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,6 +37,7 @@ dependencies = [ "slack-sdk>=3.26.0", "qq-botpy>=1.0.0", "python-socks[asyncio]>=2.4.0", + "prompt-toolkit>=3.0.0", ] [project.optional-dependencies] diff --git a/tests/test_cli_input.py b/tests/test_cli_input.py new file mode 100644 index 0000000..6f9c257 --- /dev/null +++ b/tests/test_cli_input.py @@ -0,0 +1,58 @@ +import asyncio +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from prompt_toolkit.formatted_text import HTML + +from nanobot.cli import commands + + +@pytest.fixture +def mock_prompt_session(): + """Mock the global prompt session.""" + mock_session = MagicMock() + mock_session.prompt_async = AsyncMock() + with patch("nanobot.cli.commands._PROMPT_SESSION", mock_session): + yield mock_session + + +@pytest.mark.asyncio +async def test_read_interactive_input_async_returns_input(mock_prompt_session): + """Test that _read_interactive_input_async returns the user input from prompt_session.""" + mock_prompt_session.prompt_async.return_value = "hello world" + + result = await commands._read_interactive_input_async() + + assert result == "hello world" + mock_prompt_session.prompt_async.assert_called_once() + args, _ = mock_prompt_session.prompt_async.call_args + assert isinstance(args[0], HTML) # Verify HTML prompt is used + + +@pytest.mark.asyncio +async def test_read_interactive_input_async_handles_eof(mock_prompt_session): + """Test that EOFError converts to KeyboardInterrupt.""" + mock_prompt_session.prompt_async.side_effect = EOFError() + + with pytest.raises(KeyboardInterrupt): + await commands._read_interactive_input_async() + + +def test_init_prompt_session_creates_session(): + """Test that _init_prompt_session initializes the global session.""" + # Ensure global is None before test + commands._PROMPT_SESSION = None + + with patch("nanobot.cli.commands.PromptSession") as MockSession, \ + patch("nanobot.cli.commands.FileHistory") as MockHistory, \ + patch("pathlib.Path.home") as mock_home: + + mock_home.return_value = MagicMock() + + commands._init_prompt_session() + + assert commands._PROMPT_SESSION is not None + MockSession.assert_called_once() + _, kwargs = MockSession.call_args + assert kwargs["multiline"] is False + assert kwargs["enable_open_in_editor"] is False