feat(discord): implement typing indicator functionality

- Add methods to manage typing indicators in Discord channels.
- Introduce periodic typing notifications while sending messages.
- Ensure proper cleanup of typing tasks on channel closure.
This commit is contained in:
Anunay Aatipamula 2026-02-02 19:01:46 +05:30
parent 226cb5b46b
commit bab464df5f

View File

@ -38,6 +38,7 @@ class DiscordChannel(BaseChannel):
self._seq: int | None = None
self._session_id: str | None = None
self._heartbeat_task: asyncio.Task | None = None
self._typing_tasks: dict[str, asyncio.Task] = {}
self._http: httpx.AsyncClient | None = None
self._max_attachment_bytes = DEFAULT_MAX_ATTACHMENT_BYTES
@ -70,6 +71,9 @@ class DiscordChannel(BaseChannel):
if self._heartbeat_task:
self._heartbeat_task.cancel()
self._heartbeat_task = None
for task in self._typing_tasks.values():
task.cancel()
self._typing_tasks.clear()
if self._ws:
await self._ws.close()
self._ws = None
@ -92,22 +96,25 @@ class DiscordChannel(BaseChannel):
headers = {"Authorization": f"Bot {self.config.token}"}
for attempt in range(3):
try:
response = await self._http.post(url, headers=headers, json=payload)
if response.status_code == 429:
data = response.json()
retry_after = float(data.get("retry_after", 1.0))
logger.warning(f"Discord rate limited, retrying in {retry_after}s")
await asyncio.sleep(retry_after)
continue
response.raise_for_status()
return
except Exception as e:
if attempt == 2:
logger.error(f"Error sending Discord message: {e}")
else:
await asyncio.sleep(1)
try:
for attempt in range(3):
try:
response = await self._http.post(url, headers=headers, json=payload)
if response.status_code == 429:
data = response.json()
retry_after = float(data.get("retry_after", 1.0))
logger.warning(f"Discord rate limited, retrying in {retry_after}s")
await asyncio.sleep(retry_after)
continue
response.raise_for_status()
return
except Exception as e:
if attempt == 2:
logger.error(f"Error sending Discord message: {e}")
else:
await asyncio.sleep(1)
finally:
await self._stop_typing(msg.chat_id)
async def _gateway_loop(self) -> None:
"""Main gateway loop: identify, heartbeat, dispatch events."""
@ -232,6 +239,8 @@ class DiscordChannel(BaseChannel):
referenced = payload.get("referenced_message") or {}
reply_to_id = referenced.get("id")
await self._start_typing(channel_id)
await self._handle_message(
sender_id=sender_id,
chat_id=channel_id,
@ -250,3 +259,31 @@ class DiscordChannel(BaseChannel):
"reply_to": reply_to_id,
},
)
async def _send_typing(self, channel_id: str) -> None:
"""Send a typing indicator to Discord."""
if not self._http:
return
url = f"{DISCORD_API_BASE}/channels/{channel_id}/typing"
headers = {"Authorization": f"Bot {self.config.token}"}
try:
await self._http.post(url, headers=headers)
except Exception as e:
logger.debug(f"Discord typing indicator failed: {e}")
async def _start_typing(self, channel_id: str) -> None:
"""Start periodic typing indicator for a channel."""
await self._stop_typing(channel_id)
async def typing_loop() -> None:
while self._running:
await self._send_typing(channel_id)
await asyncio.sleep(8)
self._typing_tasks[channel_id] = asyncio.create_task(typing_loop())
async def _stop_typing(self, channel_id: str) -> None:
"""Stop typing indicator for a channel."""
task = self._typing_tasks.pop(channel_id, None)
if task:
task.cancel()