diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py index e70fd32..bd7a408 100644 --- a/nanobot/cli/commands.py +++ b/nanobot/cli/commands.py @@ -1,10 +1,12 @@ """CLI commands for nanobot.""" import asyncio +import atexit import os from pathlib import Path import select import sys +from typing import Any import typer from rich.console import Console @@ -19,6 +21,12 @@ app = typer.Typer( ) console = Console() +_READLINE: Any | None = None +_HISTORY_FILE: Path | None = None +_HISTORY_HOOK_REGISTERED = False +_USING_LIBEDIT = False +_PROMPT_SESSION: Any | None = None +_PROMPT_SESSION_LABEL: Any = None def _flush_pending_tty_input() -> None: @@ -49,10 +57,248 @@ def _flush_pending_tty_input() -> None: return +def _save_history() -> None: + if _READLINE is None or _HISTORY_FILE is None: + return + try: + _READLINE.write_history_file(str(_HISTORY_FILE)) + except Exception: + return + + +def _enable_line_editing() -> None: + """Best-effort enable readline/libedit line editing for arrow keys/history.""" + global _READLINE, _HISTORY_FILE, _HISTORY_HOOK_REGISTERED, _USING_LIBEDIT + global _PROMPT_SESSION, _PROMPT_SESSION_LABEL + + history_file = Path.home() / ".nanobot" / "history" / "cli_history" + history_file.parent.mkdir(parents=True, exist_ok=True) + _HISTORY_FILE = history_file + + # Preferred path: prompt_toolkit handles wrapped wide-char rendering better. + try: + from prompt_toolkit import PromptSession + from prompt_toolkit.formatted_text import ANSI + from prompt_toolkit.history import FileHistory + from prompt_toolkit.key_binding import KeyBindings + + key_bindings = KeyBindings() + + @key_bindings.add("enter") + def _accept_input(event) -> None: + _clear_visual_nav_state(event.current_buffer) + event.current_buffer.validate_and_handle() + + @key_bindings.add("up") + def _handle_up(event) -> None: + count = event.arg if event.arg and event.arg > 0 else 1 + moved = _move_buffer_cursor_visual_from_render( + buffer=event.current_buffer, + event=event, + delta=-1, + count=count, + ) + if not moved: + event.current_buffer.history_backward(count=count) + _clear_visual_nav_state(event.current_buffer) + + @key_bindings.add("down") + def _handle_down(event) -> None: + count = event.arg if event.arg and event.arg > 0 else 1 + moved = _move_buffer_cursor_visual_from_render( + buffer=event.current_buffer, + event=event, + delta=1, + count=count, + ) + if not moved: + event.current_buffer.history_forward(count=count) + _clear_visual_nav_state(event.current_buffer) + + _PROMPT_SESSION = PromptSession( + history=FileHistory(str(history_file)), + multiline=True, + wrap_lines=True, + complete_while_typing=False, + key_bindings=key_bindings, + ) + _PROMPT_SESSION.default_buffer.on_text_changed += ( + lambda _event: _clear_visual_nav_state(_PROMPT_SESSION.default_buffer) + ) + _PROMPT_SESSION_LABEL = ANSI("\x1b[1;34mYou:\x1b[0m ") + _READLINE = None + _USING_LIBEDIT = False + return + except Exception: + _PROMPT_SESSION = None + _PROMPT_SESSION_LABEL = None + + try: + import readline + except Exception: + return + + _READLINE = readline + _USING_LIBEDIT = "libedit" in (readline.__doc__ or "").lower() + + try: + if _USING_LIBEDIT: + readline.parse_and_bind("bind ^I rl_complete") + else: + readline.parse_and_bind("tab: complete") + readline.parse_and_bind("set editing-mode emacs") + except Exception: + pass + + try: + readline.read_history_file(str(history_file)) + except Exception: + pass + + if not _HISTORY_HOOK_REGISTERED: + atexit.register(_save_history) + _HISTORY_HOOK_REGISTERED = True + + +def _prompt_text() -> str: + """Build a readline-friendly colored prompt.""" + if _READLINE is None: + return "You: " + # libedit on macOS does not honor GNU readline non-printing markers. + if _USING_LIBEDIT: + return "\033[1;34mYou:\033[0m " + return "\001\033[1;34m\002You:\001\033[0m\002 " + + def _read_interactive_input() -> str: - """Read user input with a stable prompt for terminal line editing.""" - console.print("[bold blue]You:[/bold blue] ", end="") - return input() + """Read user input with stable prompt rendering (sync fallback).""" + return input(_prompt_text()) + + +async def _read_interactive_input_async() -> str: + """Read user input safely inside the interactive asyncio loop.""" + if _PROMPT_SESSION is not None: + try: + return await _PROMPT_SESSION.prompt_async(_PROMPT_SESSION_LABEL) + except EOFError as exc: + raise KeyboardInterrupt from exc + try: + return await asyncio.to_thread(_read_interactive_input) + except EOFError as exc: + raise KeyboardInterrupt from exc + + +def _choose_visual_rowcol( + rowcol_to_yx: dict[tuple[int, int], tuple[int, int]], + current_rowcol: tuple[int, int], + delta: int, + preferred_x: int | None = None, +) -> tuple[tuple[int, int] | None, int | None]: + """Choose next logical row/col by rendered screen coordinates.""" + if delta not in (-1, 1): + return None, preferred_x + + current_yx = rowcol_to_yx.get(current_rowcol) + if current_yx is None: + same_row = [ + (rowcol, yx) + for rowcol, yx in rowcol_to_yx.items() + if rowcol[0] == current_rowcol[0] + ] + if not same_row: + return None, preferred_x + _, current_yx = min(same_row, key=lambda item: abs(item[0][1] - current_rowcol[1])) + + target_x = current_yx[1] if preferred_x is None else preferred_x + target_y = current_yx[0] + delta + candidates = [(rowcol, yx) for rowcol, yx in rowcol_to_yx.items() if yx[0] == target_y] + if not candidates: + return None, preferred_x + + best_rowcol, _ = min( + candidates, + key=lambda item: (abs(item[1][1] - target_x), item[1][1] < target_x, item[1][1]), + ) + return best_rowcol, target_x + + +def _clear_visual_nav_state(buffer: Any) -> None: + """Reset cached vertical-navigation anchor state.""" + setattr(buffer, "_nanobot_visual_pref_x", None) + setattr(buffer, "_nanobot_visual_last_dir", None) + setattr(buffer, "_nanobot_visual_last_cursor", None) + setattr(buffer, "_nanobot_visual_last_text", None) + + +def _can_reuse_visual_anchor(buffer: Any, delta: int) -> bool: + """Reuse anchor only for uninterrupted vertical navigation.""" + return ( + getattr(buffer, "_nanobot_visual_last_dir", None) == delta + and getattr(buffer, "_nanobot_visual_last_cursor", None) == buffer.cursor_position + and getattr(buffer, "_nanobot_visual_last_text", None) == buffer.text + ) + + +def _remember_visual_anchor(buffer: Any, delta: int) -> None: + """Remember current state as anchor baseline for repeated up/down.""" + setattr(buffer, "_nanobot_visual_last_dir", delta) + setattr(buffer, "_nanobot_visual_last_cursor", buffer.cursor_position) + setattr(buffer, "_nanobot_visual_last_text", buffer.text) + + +def _move_buffer_cursor_visual_from_render( + buffer: Any, + event: Any, + delta: int, + count: int, +) -> bool: + """Move cursor across rendered screen rows (soft-wrap/CJK aware).""" + try: + window = event.app.layout.current_window + render_info = getattr(window, "render_info", None) + rowcol_to_yx = getattr(render_info, "_rowcol_to_yx", None) + if not isinstance(rowcol_to_yx, dict) or not rowcol_to_yx: + return False + except Exception: + return False + + moved_any = False + preferred_x = ( + getattr(buffer, "_nanobot_visual_pref_x", None) + if _can_reuse_visual_anchor(buffer, delta) + else None + ) + steps = max(1, count) + + for _ in range(steps): + doc = buffer.document + current_rowcol = (doc.cursor_position_row, doc.cursor_position_col) + next_rowcol, preferred_x = _choose_visual_rowcol( + rowcol_to_yx=rowcol_to_yx, + current_rowcol=current_rowcol, + delta=delta, + preferred_x=preferred_x, + ) + if next_rowcol is None: + break + + try: + new_position = doc.translate_row_col_to_index(*next_rowcol) + except Exception: + break + if new_position == buffer.cursor_position: + break + + buffer.cursor_position = new_position + moved_any = True + + if moved_any: + setattr(buffer, "_nanobot_visual_pref_x", preferred_x) + _remember_visual_anchor(buffer, delta) + else: + _clear_visual_nav_state(buffer) + + return moved_any def version_callback(value: bool): @@ -350,13 +596,14 @@ def agent( asyncio.run(run_once()) else: # Interactive mode + _enable_line_editing() console.print(f"{__logo__} Interactive mode (Ctrl+C to exit)\n") async def run_interactive(): while True: try: _flush_pending_tty_input() - user_input = _read_interactive_input() + user_input = await _read_interactive_input_async() if not user_input.strip(): continue diff --git a/pyproject.toml b/pyproject.toml index 4093474..b1bc3de 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,6 +31,7 @@ dependencies = [ "python-telegram-bot[socks]>=21.0", "lark-oapi>=1.0.0", "socksio>=1.0.0", + "prompt-toolkit>=3.0.47", ] [project.optional-dependencies] diff --git a/tests/test_cli_input_minimal.py b/tests/test_cli_input_minimal.py index 49d9d4f..4726ea3 100644 --- a/tests/test_cli_input_minimal.py +++ b/tests/test_cli_input_minimal.py @@ -4,25 +4,45 @@ import nanobot.cli.commands as commands def test_read_interactive_input_uses_plain_input(monkeypatch) -> None: - captured: dict[str, object] = {} - - def fake_print(*args, **kwargs): - captured["printed"] = args - captured["print_kwargs"] = kwargs - + captured: dict[str, str] = {} def fake_input(prompt: str = "") -> str: captured["prompt"] = prompt return "hello" - monkeypatch.setattr(commands.console, "print", fake_print) monkeypatch.setattr(builtins, "input", fake_input) + monkeypatch.setattr(commands, "_PROMPT_SESSION", None) + monkeypatch.setattr(commands, "_READLINE", None) value = commands._read_interactive_input() assert value == "hello" - assert captured["prompt"] == "" - assert captured["print_kwargs"] == {"end": ""} - assert captured["printed"] == ("[bold blue]You:[/bold blue] ",) + assert captured["prompt"] == "You: " + + +def test_read_interactive_input_prefers_prompt_session(monkeypatch) -> None: + captured: dict[str, object] = {} + + class FakePromptSession: + async def prompt_async(self, label: object) -> str: + captured["label"] = label + return "hello" + + monkeypatch.setattr(commands, "_PROMPT_SESSION", FakePromptSession()) + monkeypatch.setattr(commands, "_PROMPT_SESSION_LABEL", "LBL") + + value = __import__("asyncio").run(commands._read_interactive_input_async()) + + assert value == "hello" + assert captured["label"] == "LBL" + + +def test_prompt_text_for_readline_modes(monkeypatch) -> None: + monkeypatch.setattr(commands, "_READLINE", object()) + monkeypatch.setattr(commands, "_USING_LIBEDIT", True) + assert commands._prompt_text() == "\033[1;34mYou:\033[0m " + + monkeypatch.setattr(commands, "_USING_LIBEDIT", False) + assert "\001" in commands._prompt_text() def test_flush_pending_tty_input_skips_non_tty(monkeypatch) -> None: @@ -34,4 +54,3 @@ def test_flush_pending_tty_input_skips_non_tty(monkeypatch) -> None: monkeypatch.setattr(commands.os, "isatty", lambda _fd: False) commands._flush_pending_tty_input() -