Merge PR #326: fix cli input arrow keys

This commit is contained in:
Re-bin 2026-02-08 18:24:29 +00:00
commit 119f94c57a
2 changed files with 161 additions and 1 deletions

View File

@ -1,7 +1,11 @@
"""CLI commands for nanobot.""" """CLI commands for nanobot."""
import asyncio import asyncio
import atexit
import os
from pathlib import Path from pathlib import Path
import select
import sys
import typer import typer
from rich.console import Console from rich.console import Console
@ -17,6 +21,104 @@ app = typer.Typer(
console = Console() console = Console()
# ---------------------------------------------------------------------------
# Lightweight CLI input: readline for arrow keys / history, termios for flush
# ---------------------------------------------------------------------------
_READLINE = None
_HISTORY_FILE: Path | None = None
_HISTORY_HOOK_REGISTERED = False
_USING_LIBEDIT = False
def _flush_pending_tty_input() -> None:
"""Drop unread keypresses typed while the model was generating output."""
try:
fd = sys.stdin.fileno()
if not os.isatty(fd):
return
except Exception:
return
try:
import termios
termios.tcflush(fd, termios.TCIFLUSH)
return
except Exception:
pass
try:
while True:
ready, _, _ = select.select([fd], [], [], 0)
if not ready:
break
if not os.read(fd, 4096):
break
except Exception:
return
def _save_history() -> None:
if _READLINE is None or _HISTORY_FILE is None:
return
try:
_READLINE.write_history_file(str(_HISTORY_FILE))
except Exception:
return
def _enable_line_editing() -> None:
"""Enable readline for arrow keys, line editing, and persistent history."""
global _READLINE, _HISTORY_FILE, _HISTORY_HOOK_REGISTERED, _USING_LIBEDIT
history_file = Path.home() / ".nanobot" / "history" / "cli_history"
history_file.parent.mkdir(parents=True, exist_ok=True)
_HISTORY_FILE = history_file
try:
import readline
except ImportError:
return
_READLINE = readline
_USING_LIBEDIT = "libedit" in (readline.__doc__ or "").lower()
try:
if _USING_LIBEDIT:
readline.parse_and_bind("bind ^I rl_complete")
else:
readline.parse_and_bind("tab: complete")
readline.parse_and_bind("set editing-mode emacs")
except Exception:
pass
try:
readline.read_history_file(str(history_file))
except Exception:
pass
if not _HISTORY_HOOK_REGISTERED:
atexit.register(_save_history)
_HISTORY_HOOK_REGISTERED = True
def _prompt_text() -> str:
"""Build a readline-friendly colored prompt."""
if _READLINE is None:
return "You: "
# libedit on macOS does not honor GNU readline non-printing markers.
if _USING_LIBEDIT:
return "\033[1;34mYou:\033[0m "
return "\001\033[1;34m\002You:\001\033[0m\002 "
async def _read_interactive_input_async() -> str:
"""Read user input with arrow keys and history (runs input() in a thread)."""
try:
return await asyncio.to_thread(input, _prompt_text())
except EOFError as exc:
raise KeyboardInterrupt from exc
def version_callback(value: bool): def version_callback(value: bool):
if value: if value:
@ -316,12 +418,14 @@ def agent(
asyncio.run(run_once()) asyncio.run(run_once())
else: else:
# Interactive mode # Interactive mode
_enable_line_editing()
console.print(f"{__logo__} Interactive mode (Ctrl+C to exit)\n") console.print(f"{__logo__} Interactive mode (Ctrl+C to exit)\n")
async def run_interactive(): async def run_interactive():
while True: while True:
try: try:
user_input = console.input("[bold blue]You:[/bold blue] ") _flush_pending_tty_input()
user_input = await _read_interactive_input_async()
if not user_input.strip(): if not user_input.strip():
continue continue

View File

@ -0,0 +1,56 @@
import builtins
import nanobot.cli.commands as commands
def test_read_interactive_input_uses_plain_input(monkeypatch) -> None:
captured: dict[str, str] = {}
def fake_input(prompt: str = "") -> str:
captured["prompt"] = prompt
return "hello"
monkeypatch.setattr(builtins, "input", fake_input)
monkeypatch.setattr(commands, "_PROMPT_SESSION", None)
monkeypatch.setattr(commands, "_READLINE", None)
value = commands._read_interactive_input()
assert value == "hello"
assert captured["prompt"] == "You: "
def test_read_interactive_input_prefers_prompt_session(monkeypatch) -> None:
captured: dict[str, object] = {}
class FakePromptSession:
async def prompt_async(self, label: object) -> str:
captured["label"] = label
return "hello"
monkeypatch.setattr(commands, "_PROMPT_SESSION", FakePromptSession())
monkeypatch.setattr(commands, "_PROMPT_SESSION_LABEL", "LBL")
value = __import__("asyncio").run(commands._read_interactive_input_async())
assert value == "hello"
assert captured["label"] == "LBL"
def test_prompt_text_for_readline_modes(monkeypatch) -> None:
monkeypatch.setattr(commands, "_READLINE", object())
monkeypatch.setattr(commands, "_USING_LIBEDIT", True)
assert commands._prompt_text() == "\033[1;34mYou:\033[0m "
monkeypatch.setattr(commands, "_USING_LIBEDIT", False)
assert "\001" in commands._prompt_text()
def test_flush_pending_tty_input_skips_non_tty(monkeypatch) -> None:
class FakeStdin:
def fileno(self) -> int:
return 0
monkeypatch.setattr(commands.sys, "stdin", FakeStdin())
monkeypatch.setattr(commands.os, "isatty", lambda _fd: False)
commands._flush_pending_tty_input()