fix(cli): stabilize wrapped CJK arrow navigation in interactive input

This commit is contained in:
张涔熙 2026-02-08 11:10:03 +08:00
parent 8b1ef77970
commit 342ba2b879
3 changed files with 282 additions and 15 deletions

View File

@ -1,10 +1,12 @@
"""CLI commands for nanobot."""
import asyncio
import atexit
import os
from pathlib import Path
import select
import sys
from typing import Any
import typer
from rich.console import Console
@ -19,6 +21,12 @@ app = typer.Typer(
)
console = Console()
_READLINE: Any | None = None
_HISTORY_FILE: Path | None = None
_HISTORY_HOOK_REGISTERED = False
_USING_LIBEDIT = False
_PROMPT_SESSION: Any | None = None
_PROMPT_SESSION_LABEL: Any = None
def _flush_pending_tty_input() -> None:
@ -49,10 +57,248 @@ def _flush_pending_tty_input() -> None:
return
def _save_history() -> None:
if _READLINE is None or _HISTORY_FILE is None:
return
try:
_READLINE.write_history_file(str(_HISTORY_FILE))
except Exception:
return
def _enable_line_editing() -> None:
"""Best-effort enable readline/libedit line editing for arrow keys/history."""
global _READLINE, _HISTORY_FILE, _HISTORY_HOOK_REGISTERED, _USING_LIBEDIT
global _PROMPT_SESSION, _PROMPT_SESSION_LABEL
history_file = Path.home() / ".nanobot" / "history" / "cli_history"
history_file.parent.mkdir(parents=True, exist_ok=True)
_HISTORY_FILE = history_file
# Preferred path: prompt_toolkit handles wrapped wide-char rendering better.
try:
from prompt_toolkit import PromptSession
from prompt_toolkit.formatted_text import ANSI
from prompt_toolkit.history import FileHistory
from prompt_toolkit.key_binding import KeyBindings
key_bindings = KeyBindings()
@key_bindings.add("enter")
def _accept_input(event) -> None:
_clear_visual_nav_state(event.current_buffer)
event.current_buffer.validate_and_handle()
@key_bindings.add("up")
def _handle_up(event) -> None:
count = event.arg if event.arg and event.arg > 0 else 1
moved = _move_buffer_cursor_visual_from_render(
buffer=event.current_buffer,
event=event,
delta=-1,
count=count,
)
if not moved:
event.current_buffer.history_backward(count=count)
_clear_visual_nav_state(event.current_buffer)
@key_bindings.add("down")
def _handle_down(event) -> None:
count = event.arg if event.arg and event.arg > 0 else 1
moved = _move_buffer_cursor_visual_from_render(
buffer=event.current_buffer,
event=event,
delta=1,
count=count,
)
if not moved:
event.current_buffer.history_forward(count=count)
_clear_visual_nav_state(event.current_buffer)
_PROMPT_SESSION = PromptSession(
history=FileHistory(str(history_file)),
multiline=True,
wrap_lines=True,
complete_while_typing=False,
key_bindings=key_bindings,
)
_PROMPT_SESSION.default_buffer.on_text_changed += (
lambda _event: _clear_visual_nav_state(_PROMPT_SESSION.default_buffer)
)
_PROMPT_SESSION_LABEL = ANSI("\x1b[1;34mYou:\x1b[0m ")
_READLINE = None
_USING_LIBEDIT = False
return
except Exception:
_PROMPT_SESSION = None
_PROMPT_SESSION_LABEL = None
try:
import readline
except Exception:
return
_READLINE = readline
_USING_LIBEDIT = "libedit" in (readline.__doc__ or "").lower()
try:
if _USING_LIBEDIT:
readline.parse_and_bind("bind ^I rl_complete")
else:
readline.parse_and_bind("tab: complete")
readline.parse_and_bind("set editing-mode emacs")
except Exception:
pass
try:
readline.read_history_file(str(history_file))
except Exception:
pass
if not _HISTORY_HOOK_REGISTERED:
atexit.register(_save_history)
_HISTORY_HOOK_REGISTERED = True
def _prompt_text() -> str:
"""Build a readline-friendly colored prompt."""
if _READLINE is None:
return "You: "
# libedit on macOS does not honor GNU readline non-printing markers.
if _USING_LIBEDIT:
return "\033[1;34mYou:\033[0m "
return "\001\033[1;34m\002You:\001\033[0m\002 "
def _read_interactive_input() -> str:
"""Read user input with a stable prompt for terminal line editing."""
console.print("[bold blue]You:[/bold blue] ", end="")
return input()
"""Read user input with stable prompt rendering (sync fallback)."""
return input(_prompt_text())
async def _read_interactive_input_async() -> str:
"""Read user input safely inside the interactive asyncio loop."""
if _PROMPT_SESSION is not None:
try:
return await _PROMPT_SESSION.prompt_async(_PROMPT_SESSION_LABEL)
except EOFError as exc:
raise KeyboardInterrupt from exc
try:
return await asyncio.to_thread(_read_interactive_input)
except EOFError as exc:
raise KeyboardInterrupt from exc
def _choose_visual_rowcol(
rowcol_to_yx: dict[tuple[int, int], tuple[int, int]],
current_rowcol: tuple[int, int],
delta: int,
preferred_x: int | None = None,
) -> tuple[tuple[int, int] | None, int | None]:
"""Choose next logical row/col by rendered screen coordinates."""
if delta not in (-1, 1):
return None, preferred_x
current_yx = rowcol_to_yx.get(current_rowcol)
if current_yx is None:
same_row = [
(rowcol, yx)
for rowcol, yx in rowcol_to_yx.items()
if rowcol[0] == current_rowcol[0]
]
if not same_row:
return None, preferred_x
_, current_yx = min(same_row, key=lambda item: abs(item[0][1] - current_rowcol[1]))
target_x = current_yx[1] if preferred_x is None else preferred_x
target_y = current_yx[0] + delta
candidates = [(rowcol, yx) for rowcol, yx in rowcol_to_yx.items() if yx[0] == target_y]
if not candidates:
return None, preferred_x
best_rowcol, _ = min(
candidates,
key=lambda item: (abs(item[1][1] - target_x), item[1][1] < target_x, item[1][1]),
)
return best_rowcol, target_x
def _clear_visual_nav_state(buffer: Any) -> None:
"""Reset cached vertical-navigation anchor state."""
setattr(buffer, "_nanobot_visual_pref_x", None)
setattr(buffer, "_nanobot_visual_last_dir", None)
setattr(buffer, "_nanobot_visual_last_cursor", None)
setattr(buffer, "_nanobot_visual_last_text", None)
def _can_reuse_visual_anchor(buffer: Any, delta: int) -> bool:
"""Reuse anchor only for uninterrupted vertical navigation."""
return (
getattr(buffer, "_nanobot_visual_last_dir", None) == delta
and getattr(buffer, "_nanobot_visual_last_cursor", None) == buffer.cursor_position
and getattr(buffer, "_nanobot_visual_last_text", None) == buffer.text
)
def _remember_visual_anchor(buffer: Any, delta: int) -> None:
"""Remember current state as anchor baseline for repeated up/down."""
setattr(buffer, "_nanobot_visual_last_dir", delta)
setattr(buffer, "_nanobot_visual_last_cursor", buffer.cursor_position)
setattr(buffer, "_nanobot_visual_last_text", buffer.text)
def _move_buffer_cursor_visual_from_render(
buffer: Any,
event: Any,
delta: int,
count: int,
) -> bool:
"""Move cursor across rendered screen rows (soft-wrap/CJK aware)."""
try:
window = event.app.layout.current_window
render_info = getattr(window, "render_info", None)
rowcol_to_yx = getattr(render_info, "_rowcol_to_yx", None)
if not isinstance(rowcol_to_yx, dict) or not rowcol_to_yx:
return False
except Exception:
return False
moved_any = False
preferred_x = (
getattr(buffer, "_nanobot_visual_pref_x", None)
if _can_reuse_visual_anchor(buffer, delta)
else None
)
steps = max(1, count)
for _ in range(steps):
doc = buffer.document
current_rowcol = (doc.cursor_position_row, doc.cursor_position_col)
next_rowcol, preferred_x = _choose_visual_rowcol(
rowcol_to_yx=rowcol_to_yx,
current_rowcol=current_rowcol,
delta=delta,
preferred_x=preferred_x,
)
if next_rowcol is None:
break
try:
new_position = doc.translate_row_col_to_index(*next_rowcol)
except Exception:
break
if new_position == buffer.cursor_position:
break
buffer.cursor_position = new_position
moved_any = True
if moved_any:
setattr(buffer, "_nanobot_visual_pref_x", preferred_x)
_remember_visual_anchor(buffer, delta)
else:
_clear_visual_nav_state(buffer)
return moved_any
def version_callback(value: bool):
@ -350,13 +596,14 @@ def agent(
asyncio.run(run_once())
else:
# Interactive mode
_enable_line_editing()
console.print(f"{__logo__} Interactive mode (Ctrl+C to exit)\n")
async def run_interactive():
while True:
try:
_flush_pending_tty_input()
user_input = _read_interactive_input()
user_input = await _read_interactive_input_async()
if not user_input.strip():
continue

View File

@ -31,6 +31,7 @@ dependencies = [
"python-telegram-bot[socks]>=21.0",
"lark-oapi>=1.0.0",
"socksio>=1.0.0",
"prompt-toolkit>=3.0.47",
]
[project.optional-dependencies]

View File

@ -4,25 +4,45 @@ import nanobot.cli.commands as commands
def test_read_interactive_input_uses_plain_input(monkeypatch) -> None:
captured: dict[str, object] = {}
def fake_print(*args, **kwargs):
captured["printed"] = args
captured["print_kwargs"] = kwargs
captured: dict[str, str] = {}
def fake_input(prompt: str = "") -> str:
captured["prompt"] = prompt
return "hello"
monkeypatch.setattr(commands.console, "print", fake_print)
monkeypatch.setattr(builtins, "input", fake_input)
monkeypatch.setattr(commands, "_PROMPT_SESSION", None)
monkeypatch.setattr(commands, "_READLINE", None)
value = commands._read_interactive_input()
assert value == "hello"
assert captured["prompt"] == ""
assert captured["print_kwargs"] == {"end": ""}
assert captured["printed"] == ("[bold blue]You:[/bold blue] ",)
assert captured["prompt"] == "You: "
def test_read_interactive_input_prefers_prompt_session(monkeypatch) -> None:
captured: dict[str, object] = {}
class FakePromptSession:
async def prompt_async(self, label: object) -> str:
captured["label"] = label
return "hello"
monkeypatch.setattr(commands, "_PROMPT_SESSION", FakePromptSession())
monkeypatch.setattr(commands, "_PROMPT_SESSION_LABEL", "LBL")
value = __import__("asyncio").run(commands._read_interactive_input_async())
assert value == "hello"
assert captured["label"] == "LBL"
def test_prompt_text_for_readline_modes(monkeypatch) -> None:
monkeypatch.setattr(commands, "_READLINE", object())
monkeypatch.setattr(commands, "_USING_LIBEDIT", True)
assert commands._prompt_text() == "\033[1;34mYou:\033[0m "
monkeypatch.setattr(commands, "_USING_LIBEDIT", False)
assert "\001" in commands._prompt_text()
def test_flush_pending_tty_input_skips_non_tty(monkeypatch) -> None:
@ -34,4 +54,3 @@ def test_flush_pending_tty_input_skips_non_tty(monkeypatch) -> None:
monkeypatch.setattr(commands.os, "isatty", lambda _fd: False)
commands._flush_pending_tty_input()