refactor(cli): simplify input handling — drop prompt-toolkit, use readline

This commit is contained in:
Re-bin 2026-02-08 18:23:43 +00:00
parent 5a20f3681d
commit dfa173323c
2 changed files with 10 additions and 192 deletions

View File

@ -6,7 +6,6 @@ import os
from pathlib import Path
import select
import sys
from typing import Any
import typer
from rich.console import Console
@ -21,12 +20,15 @@ app = typer.Typer(
)
console = Console()
_READLINE: Any | None = None
# ---------------------------------------------------------------------------
# Lightweight CLI input: readline for arrow keys / history, termios for flush
# ---------------------------------------------------------------------------
_READLINE = None
_HISTORY_FILE: Path | None = None
_HISTORY_HOOK_REGISTERED = False
_USING_LIBEDIT = False
_PROMPT_SESSION: Any | None = None
_PROMPT_SESSION_LABEL: Any = None
def _flush_pending_tty_input() -> None:
@ -40,7 +42,6 @@ def _flush_pending_tty_input() -> None:
try:
import termios
termios.tcflush(fd, termios.TCIFLUSH)
return
except Exception:
@ -67,75 +68,16 @@ def _save_history() -> None:
def _enable_line_editing() -> None:
"""Best-effort enable readline/libedit line editing for arrow keys/history."""
"""Enable readline for arrow keys, line editing, and persistent history."""
global _READLINE, _HISTORY_FILE, _HISTORY_HOOK_REGISTERED, _USING_LIBEDIT
global _PROMPT_SESSION, _PROMPT_SESSION_LABEL
history_file = Path.home() / ".nanobot" / "history" / "cli_history"
history_file.parent.mkdir(parents=True, exist_ok=True)
_HISTORY_FILE = history_file
# Preferred path: prompt_toolkit handles wrapped wide-char rendering better.
try:
from prompt_toolkit import PromptSession
from prompt_toolkit.formatted_text import ANSI
from prompt_toolkit.history import FileHistory
from prompt_toolkit.key_binding import KeyBindings
key_bindings = KeyBindings()
@key_bindings.add("enter")
def _accept_input(event) -> None:
_clear_visual_nav_state(event.current_buffer)
event.current_buffer.validate_and_handle()
@key_bindings.add("up")
def _handle_up(event) -> None:
count = event.arg if event.arg and event.arg > 0 else 1
moved = _move_buffer_cursor_visual_from_render(
buffer=event.current_buffer,
event=event,
delta=-1,
count=count,
)
if not moved:
event.current_buffer.history_backward(count=count)
_clear_visual_nav_state(event.current_buffer)
@key_bindings.add("down")
def _handle_down(event) -> None:
count = event.arg if event.arg and event.arg > 0 else 1
moved = _move_buffer_cursor_visual_from_render(
buffer=event.current_buffer,
event=event,
delta=1,
count=count,
)
if not moved:
event.current_buffer.history_forward(count=count)
_clear_visual_nav_state(event.current_buffer)
_PROMPT_SESSION = PromptSession(
history=FileHistory(str(history_file)),
multiline=True,
wrap_lines=True,
complete_while_typing=False,
key_bindings=key_bindings,
)
_PROMPT_SESSION.default_buffer.on_text_changed += (
lambda _event: _clear_visual_nav_state(_PROMPT_SESSION.default_buffer)
)
_PROMPT_SESSION_LABEL = ANSI("\x1b[1;34mYou:\x1b[0m ")
_READLINE = None
_USING_LIBEDIT = False
return
except Exception:
_PROMPT_SESSION = None
_PROMPT_SESSION_LABEL = None
try:
import readline
except Exception:
except ImportError:
return
_READLINE = readline
@ -170,137 +112,14 @@ def _prompt_text() -> str:
return "\001\033[1;34m\002You:\001\033[0m\002 "
def _read_interactive_input() -> str:
"""Read user input with stable prompt rendering (sync fallback)."""
return input(_prompt_text())
async def _read_interactive_input_async() -> str:
"""Read user input safely inside the interactive asyncio loop."""
if _PROMPT_SESSION is not None:
try:
return await _PROMPT_SESSION.prompt_async(_PROMPT_SESSION_LABEL)
except EOFError as exc:
raise KeyboardInterrupt from exc
"""Read user input with arrow keys and history (runs input() in a thread)."""
try:
return await asyncio.to_thread(_read_interactive_input)
return await asyncio.to_thread(input, _prompt_text())
except EOFError as exc:
raise KeyboardInterrupt from exc
def _choose_visual_rowcol(
rowcol_to_yx: dict[tuple[int, int], tuple[int, int]],
current_rowcol: tuple[int, int],
delta: int,
preferred_x: int | None = None,
) -> tuple[tuple[int, int] | None, int | None]:
"""Choose next logical row/col by rendered screen coordinates."""
if delta not in (-1, 1):
return None, preferred_x
current_yx = rowcol_to_yx.get(current_rowcol)
if current_yx is None:
same_row = [
(rowcol, yx)
for rowcol, yx in rowcol_to_yx.items()
if rowcol[0] == current_rowcol[0]
]
if not same_row:
return None, preferred_x
_, current_yx = min(same_row, key=lambda item: abs(item[0][1] - current_rowcol[1]))
target_x = current_yx[1] if preferred_x is None else preferred_x
target_y = current_yx[0] + delta
candidates = [(rowcol, yx) for rowcol, yx in rowcol_to_yx.items() if yx[0] == target_y]
if not candidates:
return None, preferred_x
best_rowcol, _ = min(
candidates,
key=lambda item: (abs(item[1][1] - target_x), item[1][1] < target_x, item[1][1]),
)
return best_rowcol, target_x
def _clear_visual_nav_state(buffer: Any) -> None:
"""Reset cached vertical-navigation anchor state."""
setattr(buffer, "_nanobot_visual_pref_x", None)
setattr(buffer, "_nanobot_visual_last_dir", None)
setattr(buffer, "_nanobot_visual_last_cursor", None)
setattr(buffer, "_nanobot_visual_last_text", None)
def _can_reuse_visual_anchor(buffer: Any, delta: int) -> bool:
"""Reuse anchor only for uninterrupted vertical navigation."""
return (
getattr(buffer, "_nanobot_visual_last_dir", None) == delta
and getattr(buffer, "_nanobot_visual_last_cursor", None) == buffer.cursor_position
and getattr(buffer, "_nanobot_visual_last_text", None) == buffer.text
)
def _remember_visual_anchor(buffer: Any, delta: int) -> None:
"""Remember current state as anchor baseline for repeated up/down."""
setattr(buffer, "_nanobot_visual_last_dir", delta)
setattr(buffer, "_nanobot_visual_last_cursor", buffer.cursor_position)
setattr(buffer, "_nanobot_visual_last_text", buffer.text)
def _move_buffer_cursor_visual_from_render(
buffer: Any,
event: Any,
delta: int,
count: int,
) -> bool:
"""Move cursor across rendered screen rows (soft-wrap/CJK aware)."""
try:
window = event.app.layout.current_window
render_info = getattr(window, "render_info", None)
rowcol_to_yx = getattr(render_info, "_rowcol_to_yx", None)
if not isinstance(rowcol_to_yx, dict) or not rowcol_to_yx:
return False
except Exception:
return False
moved_any = False
preferred_x = (
getattr(buffer, "_nanobot_visual_pref_x", None)
if _can_reuse_visual_anchor(buffer, delta)
else None
)
steps = max(1, count)
for _ in range(steps):
doc = buffer.document
current_rowcol = (doc.cursor_position_row, doc.cursor_position_col)
next_rowcol, preferred_x = _choose_visual_rowcol(
rowcol_to_yx=rowcol_to_yx,
current_rowcol=current_rowcol,
delta=delta,
preferred_x=preferred_x,
)
if next_rowcol is None:
break
try:
new_position = doc.translate_row_col_to_index(*next_rowcol)
except Exception:
break
if new_position == buffer.cursor_position:
break
buffer.cursor_position = new_position
moved_any = True
if moved_any:
setattr(buffer, "_nanobot_visual_pref_x", preferred_x)
_remember_visual_anchor(buffer, delta)
else:
_clear_visual_nav_state(buffer)
return moved_any
def version_callback(value: bool):
if value:
console.print(f"{__logo__} nanobot v{__version__}")

View File

@ -32,7 +32,6 @@ dependencies = [
"python-telegram-bot[socks]>=21.0",
"lark-oapi>=1.0.0",
"socksio>=1.0.0",
"prompt-toolkit>=3.0.47",
]
[project.optional-dependencies]