diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py index 5d198a5..c90ecde 100644 --- a/nanobot/cli/commands.py +++ b/nanobot/cli/commands.py @@ -6,7 +6,6 @@ import os from pathlib import Path import select import sys -from typing import Any import typer from rich.console import Console @@ -21,12 +20,15 @@ app = typer.Typer( ) console = Console() -_READLINE: Any | None = None + +# --------------------------------------------------------------------------- +# Lightweight CLI input: readline for arrow keys / history, termios for flush +# --------------------------------------------------------------------------- + +_READLINE = None _HISTORY_FILE: Path | None = None _HISTORY_HOOK_REGISTERED = False _USING_LIBEDIT = False -_PROMPT_SESSION: Any | None = None -_PROMPT_SESSION_LABEL: Any = None def _flush_pending_tty_input() -> None: @@ -40,7 +42,6 @@ def _flush_pending_tty_input() -> None: try: import termios - termios.tcflush(fd, termios.TCIFLUSH) return except Exception: @@ -67,75 +68,16 @@ def _save_history() -> None: def _enable_line_editing() -> None: - """Best-effort enable readline/libedit line editing for arrow keys/history.""" + """Enable readline for arrow keys, line editing, and persistent history.""" global _READLINE, _HISTORY_FILE, _HISTORY_HOOK_REGISTERED, _USING_LIBEDIT - global _PROMPT_SESSION, _PROMPT_SESSION_LABEL history_file = Path.home() / ".nanobot" / "history" / "cli_history" history_file.parent.mkdir(parents=True, exist_ok=True) _HISTORY_FILE = history_file - # Preferred path: prompt_toolkit handles wrapped wide-char rendering better. - try: - from prompt_toolkit import PromptSession - from prompt_toolkit.formatted_text import ANSI - from prompt_toolkit.history import FileHistory - from prompt_toolkit.key_binding import KeyBindings - - key_bindings = KeyBindings() - - @key_bindings.add("enter") - def _accept_input(event) -> None: - _clear_visual_nav_state(event.current_buffer) - event.current_buffer.validate_and_handle() - - @key_bindings.add("up") - def _handle_up(event) -> None: - count = event.arg if event.arg and event.arg > 0 else 1 - moved = _move_buffer_cursor_visual_from_render( - buffer=event.current_buffer, - event=event, - delta=-1, - count=count, - ) - if not moved: - event.current_buffer.history_backward(count=count) - _clear_visual_nav_state(event.current_buffer) - - @key_bindings.add("down") - def _handle_down(event) -> None: - count = event.arg if event.arg and event.arg > 0 else 1 - moved = _move_buffer_cursor_visual_from_render( - buffer=event.current_buffer, - event=event, - delta=1, - count=count, - ) - if not moved: - event.current_buffer.history_forward(count=count) - _clear_visual_nav_state(event.current_buffer) - - _PROMPT_SESSION = PromptSession( - history=FileHistory(str(history_file)), - multiline=True, - wrap_lines=True, - complete_while_typing=False, - key_bindings=key_bindings, - ) - _PROMPT_SESSION.default_buffer.on_text_changed += ( - lambda _event: _clear_visual_nav_state(_PROMPT_SESSION.default_buffer) - ) - _PROMPT_SESSION_LABEL = ANSI("\x1b[1;34mYou:\x1b[0m ") - _READLINE = None - _USING_LIBEDIT = False - return - except Exception: - _PROMPT_SESSION = None - _PROMPT_SESSION_LABEL = None - try: import readline - except Exception: + except ImportError: return _READLINE = readline @@ -170,137 +112,14 @@ def _prompt_text() -> str: return "\001\033[1;34m\002You:\001\033[0m\002 " -def _read_interactive_input() -> str: - """Read user input with stable prompt rendering (sync fallback).""" - return input(_prompt_text()) - - async def _read_interactive_input_async() -> str: - """Read user input safely inside the interactive asyncio loop.""" - if _PROMPT_SESSION is not None: - try: - return await _PROMPT_SESSION.prompt_async(_PROMPT_SESSION_LABEL) - except EOFError as exc: - raise KeyboardInterrupt from exc + """Read user input with arrow keys and history (runs input() in a thread).""" try: - return await asyncio.to_thread(_read_interactive_input) + return await asyncio.to_thread(input, _prompt_text()) except EOFError as exc: raise KeyboardInterrupt from exc -def _choose_visual_rowcol( - rowcol_to_yx: dict[tuple[int, int], tuple[int, int]], - current_rowcol: tuple[int, int], - delta: int, - preferred_x: int | None = None, -) -> tuple[tuple[int, int] | None, int | None]: - """Choose next logical row/col by rendered screen coordinates.""" - if delta not in (-1, 1): - return None, preferred_x - - current_yx = rowcol_to_yx.get(current_rowcol) - if current_yx is None: - same_row = [ - (rowcol, yx) - for rowcol, yx in rowcol_to_yx.items() - if rowcol[0] == current_rowcol[0] - ] - if not same_row: - return None, preferred_x - _, current_yx = min(same_row, key=lambda item: abs(item[0][1] - current_rowcol[1])) - - target_x = current_yx[1] if preferred_x is None else preferred_x - target_y = current_yx[0] + delta - candidates = [(rowcol, yx) for rowcol, yx in rowcol_to_yx.items() if yx[0] == target_y] - if not candidates: - return None, preferred_x - - best_rowcol, _ = min( - candidates, - key=lambda item: (abs(item[1][1] - target_x), item[1][1] < target_x, item[1][1]), - ) - return best_rowcol, target_x - - -def _clear_visual_nav_state(buffer: Any) -> None: - """Reset cached vertical-navigation anchor state.""" - setattr(buffer, "_nanobot_visual_pref_x", None) - setattr(buffer, "_nanobot_visual_last_dir", None) - setattr(buffer, "_nanobot_visual_last_cursor", None) - setattr(buffer, "_nanobot_visual_last_text", None) - - -def _can_reuse_visual_anchor(buffer: Any, delta: int) -> bool: - """Reuse anchor only for uninterrupted vertical navigation.""" - return ( - getattr(buffer, "_nanobot_visual_last_dir", None) == delta - and getattr(buffer, "_nanobot_visual_last_cursor", None) == buffer.cursor_position - and getattr(buffer, "_nanobot_visual_last_text", None) == buffer.text - ) - - -def _remember_visual_anchor(buffer: Any, delta: int) -> None: - """Remember current state as anchor baseline for repeated up/down.""" - setattr(buffer, "_nanobot_visual_last_dir", delta) - setattr(buffer, "_nanobot_visual_last_cursor", buffer.cursor_position) - setattr(buffer, "_nanobot_visual_last_text", buffer.text) - - -def _move_buffer_cursor_visual_from_render( - buffer: Any, - event: Any, - delta: int, - count: int, -) -> bool: - """Move cursor across rendered screen rows (soft-wrap/CJK aware).""" - try: - window = event.app.layout.current_window - render_info = getattr(window, "render_info", None) - rowcol_to_yx = getattr(render_info, "_rowcol_to_yx", None) - if not isinstance(rowcol_to_yx, dict) or not rowcol_to_yx: - return False - except Exception: - return False - - moved_any = False - preferred_x = ( - getattr(buffer, "_nanobot_visual_pref_x", None) - if _can_reuse_visual_anchor(buffer, delta) - else None - ) - steps = max(1, count) - - for _ in range(steps): - doc = buffer.document - current_rowcol = (doc.cursor_position_row, doc.cursor_position_col) - next_rowcol, preferred_x = _choose_visual_rowcol( - rowcol_to_yx=rowcol_to_yx, - current_rowcol=current_rowcol, - delta=delta, - preferred_x=preferred_x, - ) - if next_rowcol is None: - break - - try: - new_position = doc.translate_row_col_to_index(*next_rowcol) - except Exception: - break - if new_position == buffer.cursor_position: - break - - buffer.cursor_position = new_position - moved_any = True - - if moved_any: - setattr(buffer, "_nanobot_visual_pref_x", preferred_x) - _remember_visual_anchor(buffer, delta) - else: - _clear_visual_nav_state(buffer) - - return moved_any - - def version_callback(value: bool): if value: console.print(f"{__logo__} nanobot v{__version__}") diff --git a/pyproject.toml b/pyproject.toml index 3669ee5..6fda084 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,7 +32,6 @@ dependencies = [ "python-telegram-bot[socks]>=21.0", "lark-oapi>=1.0.0", "socksio>=1.0.0", - "prompt-toolkit>=3.0.47", ] [project.optional-dependencies]