feat: add Telegram typing indicator

This commit is contained in:
Re-bin 2026-02-08 05:44:06 +00:00
parent f7f812a177
commit 00185f2bee
2 changed files with 38 additions and 1 deletions

1
.gitignore vendored
View File

@ -17,3 +17,4 @@ docs/
__pycache__/
poetry.lock
.pytest_cache/
tests/

View File

@ -111,6 +111,7 @@ class TelegramChannel(BaseChannel):
self.session_manager = session_manager
self._app: Application | None = None
self._chat_ids: dict[str, int] = {} # Map sender_id to chat_id for replies
self._typing_tasks: dict[str, asyncio.Task] = {} # chat_id -> typing loop task
async def start(self) -> None:
"""Start the Telegram bot with long polling."""
@ -170,6 +171,10 @@ class TelegramChannel(BaseChannel):
"""Stop the Telegram bot."""
self._running = False
# Cancel all typing indicators
for chat_id in list(self._typing_tasks):
self._stop_typing(chat_id)
if self._app:
logger.info("Stopping Telegram bot...")
await self._app.updater.stop()
@ -183,6 +188,9 @@ class TelegramChannel(BaseChannel):
logger.warning("Telegram bot not running")
return
# Stop typing indicator for this chat
self._stop_typing(msg.chat_id)
try:
# chat_id should be the Telegram chat ID (integer)
chat_id = int(msg.chat_id)
@ -335,10 +343,15 @@ class TelegramChannel(BaseChannel):
logger.debug(f"Telegram message from {sender_id}: {content[:50]}...")
str_chat_id = str(chat_id)
# Start typing indicator before processing
self._start_typing(str_chat_id)
# Forward to the message bus
await self._handle_message(
sender_id=sender_id,
chat_id=str(chat_id),
chat_id=str_chat_id,
content=content,
media=media_paths,
metadata={
@ -350,6 +363,29 @@ class TelegramChannel(BaseChannel):
}
)
def _start_typing(self, chat_id: str) -> None:
"""Start sending 'typing...' indicator for a chat."""
# Cancel any existing typing task for this chat
self._stop_typing(chat_id)
self._typing_tasks[chat_id] = asyncio.create_task(self._typing_loop(chat_id))
def _stop_typing(self, chat_id: str) -> None:
"""Stop the typing indicator for a chat."""
task = self._typing_tasks.pop(chat_id, None)
if task and not task.done():
task.cancel()
async def _typing_loop(self, chat_id: str) -> None:
"""Repeatedly send 'typing' action until cancelled."""
try:
while self._app:
await self._app.bot.send_chat_action(chat_id=int(chat_id), action="typing")
await asyncio.sleep(4)
except asyncio.CancelledError:
pass
except Exception as e:
logger.debug(f"Typing indicator stopped for {chat_id}: {e}")
def _get_extension(self, media_type: str, mime_type: str | None) -> str:
"""Get file extension based on media type."""
if mime_type: