refactor: improve feishu channel implementation

This commit is contained in:
Re-bin 2026-02-05 06:01:02 +00:00
parent 1e0f87b356
commit 50a4c4ca1a

View File

@ -3,6 +3,7 @@
import asyncio import asyncio
import json import json
import threading import threading
from collections import OrderedDict
from typing import Any from typing import Any
from loguru import logger from loguru import logger
@ -19,12 +20,22 @@ try:
CreateMessageRequestBody, CreateMessageRequestBody,
CreateMessageReactionRequest, CreateMessageReactionRequest,
CreateMessageReactionRequestBody, CreateMessageReactionRequestBody,
Emoji,
P2ImMessageReceiveV1, P2ImMessageReceiveV1,
) )
FEISHU_AVAILABLE = True FEISHU_AVAILABLE = True
except ImportError: except ImportError:
FEISHU_AVAILABLE = False FEISHU_AVAILABLE = False
lark = None lark = None
Emoji = None
# Message type display mapping
MSG_TYPE_MAP = {
"image": "[image]",
"audio": "[audio]",
"file": "[file]",
"sticker": "[sticker]",
}
class FeishuChannel(BaseChannel): class FeishuChannel(BaseChannel):
@ -47,7 +58,7 @@ class FeishuChannel(BaseChannel):
self._client: Any = None self._client: Any = None
self._ws_client: Any = None self._ws_client: Any = None
self._ws_thread: threading.Thread | None = None self._ws_thread: threading.Thread | None = None
self._processed_message_ids: set[str] = set() # Dedup message IDs self._processed_message_ids: OrderedDict[str, None] = OrderedDict() # Ordered dedup cache
self._loop: asyncio.AbstractEventLoop | None = None self._loop: asyncio.AbstractEventLoop | None = None
async def start(self) -> None: async def start(self) -> None:
@ -61,7 +72,7 @@ class FeishuChannel(BaseChannel):
return return
self._running = True self._running = True
self._loop = asyncio.get_event_loop() self._loop = asyncio.get_running_loop()
# Create Lark client for sending messages # Create Lark client for sending messages
self._client = lark.Client.builder() \ self._client = lark.Client.builder() \
@ -106,21 +117,16 @@ class FeishuChannel(BaseChannel):
async def stop(self) -> None: async def stop(self) -> None:
"""Stop the Feishu bot.""" """Stop the Feishu bot."""
self._running = False self._running = False
if self._ws_client:
try:
self._ws_client.stop()
except Exception as e:
logger.warning(f"Error stopping WebSocket client: {e}")
logger.info("Feishu bot stopped") logger.info("Feishu bot stopped")
def _add_reaction(self, message_id: str, emoji_type: str = "SMILE") -> None: def _add_reaction_sync(self, message_id: str, emoji_type: str) -> None:
""" """Sync helper for adding reaction (runs in thread pool)."""
Add a reaction emoji to a message.
Common emoji types: THUMBSUP, OK, EYES, DONE, OnIt, HEART
"""
if not self._client:
logger.warning("Cannot add reaction: client not initialized")
return
try: try:
from lark_oapi.api.im.v1 import Emoji
request = CreateMessageReactionRequest.builder() \ request = CreateMessageReactionRequest.builder() \
.message_id(message_id) \ .message_id(message_id) \
.request_body( .request_body(
@ -134,10 +140,22 @@ class FeishuChannel(BaseChannel):
if not response.success(): if not response.success():
logger.warning(f"Failed to add reaction: code={response.code}, msg={response.msg}") logger.warning(f"Failed to add reaction: code={response.code}, msg={response.msg}")
else: else:
logger.info(f"Added {emoji_type} reaction to message {message_id}") logger.debug(f"Added {emoji_type} reaction to message {message_id}")
except Exception as e: except Exception as e:
logger.warning(f"Error adding reaction: {e}") logger.warning(f"Error adding reaction: {e}")
async def _add_reaction(self, message_id: str, emoji_type: str = "THUMBSUP") -> None:
"""
Add a reaction emoji to a message (non-blocking).
Common emoji types: THUMBSUP, OK, EYES, DONE, OnIt, HEART
"""
if not self._client or not Emoji:
return
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, self._add_reaction_sync, message_id, emoji_type)
async def send(self, msg: OutboundMessage) -> None: async def send(self, msg: OutboundMessage) -> None:
"""Send a message through Feishu.""" """Send a message through Feishu."""
if not self._client: if not self._client:
@ -183,23 +201,8 @@ class FeishuChannel(BaseChannel):
Sync handler for incoming messages (called from WebSocket thread). Sync handler for incoming messages (called from WebSocket thread).
Schedules async handling in the main event loop. Schedules async handling in the main event loop.
""" """
try:
if self._loop and self._loop.is_running(): if self._loop and self._loop.is_running():
# Schedule the async handler in the main event loop asyncio.run_coroutine_threadsafe(self._on_message(data), self._loop)
asyncio.run_coroutine_threadsafe(
self._on_message(data),
self._loop
)
else:
# Fallback: run in new event loop
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(self._on_message(data))
finally:
loop.close()
except Exception as e:
logger.error(f"Error handling Feishu message: {e}")
async def _on_message(self, data: "P2ImMessageReceiveV1") -> None: async def _on_message(self, data: "P2ImMessageReceiveV1") -> None:
"""Handle incoming message from Feishu.""" """Handle incoming message from Feishu."""
@ -208,63 +211,43 @@ class FeishuChannel(BaseChannel):
message = event.message message = event.message
sender = event.sender sender = event.sender
# Get message ID for deduplication # Deduplication check
message_id = message.message_id message_id = message.message_id
if message_id in self._processed_message_ids: if message_id in self._processed_message_ids:
logger.debug(f"Skipping duplicate message: {message_id}")
return return
self._processed_message_ids.add(message_id) self._processed_message_ids[message_id] = None
# Limit dedup cache size # Trim cache: keep most recent 500 when exceeds 1000
if len(self._processed_message_ids) > 1000: while len(self._processed_message_ids) > 1000:
self._processed_message_ids = set(list(self._processed_message_ids)[-500:]) self._processed_message_ids.popitem(last=False)
# Extract sender info
sender_id = sender.sender_id.open_id if sender.sender_id else "unknown"
sender_type = sender.sender_type # "user" or "bot"
# Skip bot messages # Skip bot messages
sender_type = sender.sender_type
if sender_type == "bot": if sender_type == "bot":
return return
# Add reaction to user's message to indicate "seen" (👍 THUMBSUP) sender_id = sender.sender_id.open_id if sender.sender_id else "unknown"
self._add_reaction(message_id, "THUMBSUP")
# Get chat_id for replies
chat_id = message.chat_id chat_id = message.chat_id
chat_type = message.chat_type # "p2p" or "group" chat_type = message.chat_type # "p2p" or "group"
# Parse message content
content = ""
msg_type = message.message_type msg_type = message.message_type
# Add reaction to indicate "seen"
await self._add_reaction(message_id, "THUMBSUP")
# Parse message content
if msg_type == "text": if msg_type == "text":
# Text message: {"text": "hello"}
try: try:
content_obj = json.loads(message.content) content = json.loads(message.content).get("text", "")
content = content_obj.get("text", "")
except json.JSONDecodeError: except json.JSONDecodeError:
content = message.content or "" content = message.content or ""
elif msg_type == "image":
content = "[image]"
elif msg_type == "audio":
content = "[audio]"
elif msg_type == "file":
content = "[file]"
elif msg_type == "sticker":
content = "[sticker]"
else: else:
content = f"[{msg_type}]" content = MSG_TYPE_MAP.get(msg_type, f"[{msg_type}]")
if not content: if not content:
return return
logger.debug(f"Feishu message from {sender_id} in {chat_id}: {content[:50]}...")
# Forward to message bus # Forward to message bus
# Use chat_id for group chats, sender's open_id for p2p
reply_to = chat_id if chat_type == "group" else sender_id reply_to = chat_id if chat_type == "group" else sender_id
await self._handle_message( await self._handle_message(
sender_id=sender_id, sender_id=sender_id,
chat_id=reply_to, chat_id=reply_to,
@ -273,7 +256,6 @@ class FeishuChannel(BaseChannel):
"message_id": message_id, "message_id": message_id,
"chat_type": chat_type, "chat_type": chat_type,
"msg_type": msg_type, "msg_type": msg_type,
"sender_type": sender_type,
} }
) )