diff --git a/nanobot/channels/feishu.py b/nanobot/channels/feishu.py index 4326cf0..01b808e 100644 --- a/nanobot/channels/feishu.py +++ b/nanobot/channels/feishu.py @@ -3,6 +3,7 @@ import asyncio import json import threading +from collections import OrderedDict from typing import Any from loguru import logger @@ -19,12 +20,22 @@ try: CreateMessageRequestBody, CreateMessageReactionRequest, CreateMessageReactionRequestBody, + Emoji, P2ImMessageReceiveV1, ) FEISHU_AVAILABLE = True except ImportError: FEISHU_AVAILABLE = False lark = None + Emoji = None + +# Message type display mapping +MSG_TYPE_MAP = { + "image": "[image]", + "audio": "[audio]", + "file": "[file]", + "sticker": "[sticker]", +} class FeishuChannel(BaseChannel): @@ -47,7 +58,7 @@ class FeishuChannel(BaseChannel): self._client: Any = None self._ws_client: Any = None self._ws_thread: threading.Thread | None = None - self._processed_message_ids: set[str] = set() # Dedup message IDs + self._processed_message_ids: OrderedDict[str, None] = OrderedDict() # Ordered dedup cache self._loop: asyncio.AbstractEventLoop | None = None async def start(self) -> None: @@ -61,7 +72,7 @@ class FeishuChannel(BaseChannel): return self._running = True - self._loop = asyncio.get_event_loop() + self._loop = asyncio.get_running_loop() # Create Lark client for sending messages self._client = lark.Client.builder() \ @@ -106,21 +117,16 @@ class FeishuChannel(BaseChannel): async def stop(self) -> None: """Stop the Feishu bot.""" self._running = False + if self._ws_client: + try: + self._ws_client.stop() + except Exception as e: + logger.warning(f"Error stopping WebSocket client: {e}") logger.info("Feishu bot stopped") - def _add_reaction(self, message_id: str, emoji_type: str = "SMILE") -> None: - """ - Add a reaction emoji to a message. - - Common emoji types: THUMBSUP, OK, EYES, DONE, OnIt, HEART - """ - if not self._client: - logger.warning("Cannot add reaction: client not initialized") - return - + def _add_reaction_sync(self, message_id: str, emoji_type: str) -> None: + """Sync helper for adding reaction (runs in thread pool).""" try: - from lark_oapi.api.im.v1 import Emoji - request = CreateMessageReactionRequest.builder() \ .message_id(message_id) \ .request_body( @@ -134,9 +140,21 @@ class FeishuChannel(BaseChannel): if not response.success(): logger.warning(f"Failed to add reaction: code={response.code}, msg={response.msg}") else: - logger.info(f"Added {emoji_type} reaction to message {message_id}") + logger.debug(f"Added {emoji_type} reaction to message {message_id}") except Exception as e: logger.warning(f"Error adding reaction: {e}") + + async def _add_reaction(self, message_id: str, emoji_type: str = "THUMBSUP") -> None: + """ + Add a reaction emoji to a message (non-blocking). + + Common emoji types: THUMBSUP, OK, EYES, DONE, OnIt, HEART + """ + if not self._client or not Emoji: + return + + loop = asyncio.get_running_loop() + await loop.run_in_executor(None, self._add_reaction_sync, message_id, emoji_type) async def send(self, msg: OutboundMessage) -> None: """Send a message through Feishu.""" @@ -183,23 +201,8 @@ class FeishuChannel(BaseChannel): Sync handler for incoming messages (called from WebSocket thread). Schedules async handling in the main event loop. """ - try: - if self._loop and self._loop.is_running(): - # Schedule the async handler in the main event loop - asyncio.run_coroutine_threadsafe( - self._on_message(data), - self._loop - ) - else: - # Fallback: run in new event loop - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - try: - loop.run_until_complete(self._on_message(data)) - finally: - loop.close() - except Exception as e: - logger.error(f"Error handling Feishu message: {e}") + if self._loop and self._loop.is_running(): + asyncio.run_coroutine_threadsafe(self._on_message(data), self._loop) async def _on_message(self, data: "P2ImMessageReceiveV1") -> None: """Handle incoming message from Feishu.""" @@ -208,63 +211,43 @@ class FeishuChannel(BaseChannel): message = event.message sender = event.sender - # Get message ID for deduplication + # Deduplication check message_id = message.message_id if message_id in self._processed_message_ids: - logger.debug(f"Skipping duplicate message: {message_id}") return - self._processed_message_ids.add(message_id) + self._processed_message_ids[message_id] = None - # Limit dedup cache size - if len(self._processed_message_ids) > 1000: - self._processed_message_ids = set(list(self._processed_message_ids)[-500:]) - - # Extract sender info - sender_id = sender.sender_id.open_id if sender.sender_id else "unknown" - sender_type = sender.sender_type # "user" or "bot" + # Trim cache: keep most recent 500 when exceeds 1000 + while len(self._processed_message_ids) > 1000: + self._processed_message_ids.popitem(last=False) # Skip bot messages + sender_type = sender.sender_type if sender_type == "bot": return - # Add reaction to user's message to indicate "seen" (👍 THUMBSUP) - self._add_reaction(message_id, "THUMBSUP") - - # Get chat_id for replies + sender_id = sender.sender_id.open_id if sender.sender_id else "unknown" chat_id = message.chat_id chat_type = message.chat_type # "p2p" or "group" - - # Parse message content - content = "" msg_type = message.message_type + # Add reaction to indicate "seen" + await self._add_reaction(message_id, "THUMBSUP") + + # Parse message content if msg_type == "text": - # Text message: {"text": "hello"} try: - content_obj = json.loads(message.content) - content = content_obj.get("text", "") + content = json.loads(message.content).get("text", "") except json.JSONDecodeError: content = message.content or "" - elif msg_type == "image": - content = "[image]" - elif msg_type == "audio": - content = "[audio]" - elif msg_type == "file": - content = "[file]" - elif msg_type == "sticker": - content = "[sticker]" else: - content = f"[{msg_type}]" + content = MSG_TYPE_MAP.get(msg_type, f"[{msg_type}]") if not content: return - logger.debug(f"Feishu message from {sender_id} in {chat_id}: {content[:50]}...") - # Forward to message bus - # Use chat_id for group chats, sender's open_id for p2p reply_to = chat_id if chat_type == "group" else sender_id - await self._handle_message( sender_id=sender_id, chat_id=reply_to, @@ -273,7 +256,6 @@ class FeishuChannel(BaseChannel): "message_id": message_id, "chat_type": chat_type, "msg_type": msg_type, - "sender_type": sender_type, } )