diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py index 1dab818..c90ecde 100644 --- a/nanobot/cli/commands.py +++ b/nanobot/cli/commands.py @@ -1,7 +1,11 @@ """CLI commands for nanobot.""" import asyncio +import atexit +import os from pathlib import Path +import select +import sys import typer from rich.console import Console @@ -17,6 +21,104 @@ app = typer.Typer( console = Console() +# --------------------------------------------------------------------------- +# Lightweight CLI input: readline for arrow keys / history, termios for flush +# --------------------------------------------------------------------------- + +_READLINE = None +_HISTORY_FILE: Path | None = None +_HISTORY_HOOK_REGISTERED = False +_USING_LIBEDIT = False + + +def _flush_pending_tty_input() -> None: + """Drop unread keypresses typed while the model was generating output.""" + try: + fd = sys.stdin.fileno() + if not os.isatty(fd): + return + except Exception: + return + + try: + import termios + termios.tcflush(fd, termios.TCIFLUSH) + return + except Exception: + pass + + try: + while True: + ready, _, _ = select.select([fd], [], [], 0) + if not ready: + break + if not os.read(fd, 4096): + break + except Exception: + return + + +def _save_history() -> None: + if _READLINE is None or _HISTORY_FILE is None: + return + try: + _READLINE.write_history_file(str(_HISTORY_FILE)) + except Exception: + return + + +def _enable_line_editing() -> None: + """Enable readline for arrow keys, line editing, and persistent history.""" + global _READLINE, _HISTORY_FILE, _HISTORY_HOOK_REGISTERED, _USING_LIBEDIT + + history_file = Path.home() / ".nanobot" / "history" / "cli_history" + history_file.parent.mkdir(parents=True, exist_ok=True) + _HISTORY_FILE = history_file + + try: + import readline + except ImportError: + return + + _READLINE = readline + _USING_LIBEDIT = "libedit" in (readline.__doc__ or "").lower() + + try: + if _USING_LIBEDIT: + readline.parse_and_bind("bind ^I rl_complete") + else: + readline.parse_and_bind("tab: complete") + readline.parse_and_bind("set editing-mode emacs") + except Exception: + pass + + try: + readline.read_history_file(str(history_file)) + except Exception: + pass + + if not _HISTORY_HOOK_REGISTERED: + atexit.register(_save_history) + _HISTORY_HOOK_REGISTERED = True + + +def _prompt_text() -> str: + """Build a readline-friendly colored prompt.""" + if _READLINE is None: + return "You: " + # libedit on macOS does not honor GNU readline non-printing markers. + if _USING_LIBEDIT: + return "\033[1;34mYou:\033[0m " + return "\001\033[1;34m\002You:\001\033[0m\002 " + + +async def _read_interactive_input_async() -> str: + """Read user input with arrow keys and history (runs input() in a thread).""" + try: + return await asyncio.to_thread(input, _prompt_text()) + except EOFError as exc: + raise KeyboardInterrupt from exc + def version_callback(value: bool): if value: @@ -316,12 +418,14 @@ def agent( asyncio.run(run_once()) else: # Interactive mode + _enable_line_editing() console.print(f"{__logo__} Interactive mode (Ctrl+C to exit)\n") async def run_interactive(): while True: try: - user_input = console.input("[bold blue]You:[/bold blue] ") + _flush_pending_tty_input() + user_input = await _read_interactive_input_async() if not user_input.strip(): continue diff --git a/tests/test_cli_input_minimal.py b/tests/test_cli_input_minimal.py new file mode 100644 index 0000000..4726ea3 --- /dev/null +++ b/tests/test_cli_input_minimal.py @@ -0,0 +1,56 @@ +import builtins + +import nanobot.cli.commands as commands + + +def test_read_interactive_input_uses_plain_input(monkeypatch) -> None: + captured: dict[str, str] = {} + def fake_input(prompt: str = "") -> str: + captured["prompt"] = prompt + return "hello" + + monkeypatch.setattr(builtins, "input", fake_input) + monkeypatch.setattr(commands, "_PROMPT_SESSION", None) + monkeypatch.setattr(commands, "_READLINE", None) + + value = commands._read_interactive_input() + + assert value == "hello" + assert captured["prompt"] == "You: " + + +def test_read_interactive_input_prefers_prompt_session(monkeypatch) -> None: + captured: dict[str, object] = {} + + class FakePromptSession: + async def prompt_async(self, label: object) -> str: + captured["label"] = label + return "hello" + + monkeypatch.setattr(commands, "_PROMPT_SESSION", FakePromptSession()) + monkeypatch.setattr(commands, "_PROMPT_SESSION_LABEL", "LBL") + + value = __import__("asyncio").run(commands._read_interactive_input_async()) + + assert value == "hello" + assert captured["label"] == "LBL" + + +def test_prompt_text_for_readline_modes(monkeypatch) -> None: + monkeypatch.setattr(commands, "_READLINE", object()) + monkeypatch.setattr(commands, "_USING_LIBEDIT", True) + assert commands._prompt_text() == "\033[1;34mYou:\033[0m " + + monkeypatch.setattr(commands, "_USING_LIBEDIT", False) + assert "\001" in commands._prompt_text() + + +def test_flush_pending_tty_input_skips_non_tty(monkeypatch) -> None: + class FakeStdin: + def fileno(self) -> int: + return 0 + + monkeypatch.setattr(commands.sys, "stdin", FakeStdin()) + monkeypatch.setattr(commands.os, "isatty", lambda _fd: False) + + commands._flush_pending_tty_input()