From b6ec6a8a7686b8d3239bd9f363fa55490f9f9217 Mon Sep 17 00:00:00 2001 From: Re-bin Date: Sun, 8 Feb 2026 18:06:07 +0000 Subject: [PATCH] fix(dingtalk): security and resource fixes for DingTalk channel --- README.md | 10 +- nanobot/channels/dingtalk.py | 195 +++++++++++++++++++---------------- 2 files changed, 108 insertions(+), 97 deletions(-) diff --git a/README.md b/README.md index 8c5c387..326f253 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ ⚡️ Delivers core agent functionality in just **~4,000** lines of code — **99% smaller** than Clawdbot's 430k+ lines. -📏 Real-time line count: **3,423 lines** (run `bash core_agent_lines.sh` to verify anytime) +📏 Real-time line count: **3,429 lines** (run `bash core_agent_lines.sh` to verify anytime) ## 📢 News @@ -293,10 +293,6 @@ nanobot gateway Uses **WebSocket** long connection — no public IP required. -```bash -pip install nanobot-ai[feishu] -``` - **1. Create a Feishu bot** - Visit [Feishu Open Platform](https://open.feishu.cn/app) - Create a new app → Enable **Bot** capability @@ -342,10 +338,6 @@ nanobot gateway Uses **Stream Mode** — no public IP required. -```bash -pip install nanobot-ai[dingtalk] -``` - **1. Create a DingTalk bot** - Visit [DingTalk Open Platform](https://open-dev.dingtalk.com/) - Create a new app -> Add **Robot** capability diff --git a/nanobot/channels/dingtalk.py b/nanobot/channels/dingtalk.py index 897e5be..72d3afd 100644 --- a/nanobot/channels/dingtalk.py +++ b/nanobot/channels/dingtalk.py @@ -2,30 +2,35 @@ import asyncio import json -import threading import time from typing import Any from loguru import logger import httpx -from nanobot.bus.events import OutboundMessage, InboundMessage +from nanobot.bus.events import OutboundMessage from nanobot.bus.queue import MessageBus from nanobot.channels.base import BaseChannel from nanobot.config.schema import DingTalkConfig try: from dingtalk_stream import ( - DingTalkStreamClient, + DingTalkStreamClient, Credential, CallbackHandler, CallbackMessage, - AckMessage + AckMessage, ) from dingtalk_stream.chatbot import ChatbotMessage + DINGTALK_AVAILABLE = True except ImportError: DINGTALK_AVAILABLE = False + # Fallback so class definitions don't crash at module level + CallbackHandler = object # type: ignore[assignment,misc] + CallbackMessage = None # type: ignore[assignment,misc] + AckMessage = None # type: ignore[assignment,misc] + ChatbotMessage = None # type: ignore[assignment,misc] class NanobotDingTalkHandler(CallbackHandler): @@ -33,127 +38,146 @@ class NanobotDingTalkHandler(CallbackHandler): Standard DingTalk Stream SDK Callback Handler. Parses incoming messages and forwards them to the Nanobot channel. """ + def __init__(self, channel: "DingTalkChannel"): super().__init__() self.channel = channel - + async def process(self, message: CallbackMessage): """Process incoming stream message.""" try: # Parse using SDK's ChatbotMessage for robust handling chatbot_msg = ChatbotMessage.from_dict(message.data) - - # Extract content based on message type + + # Extract text content; fall back to raw dict if SDK object is empty content = "" if chatbot_msg.text: content = chatbot_msg.text.content.strip() - elif chatbot_msg.message_type == "text": - # Fallback manual extraction if object not populated - content = message.data.get("text", {}).get("content", "").strip() - if not content: - logger.warning(f"Received empty or unsupported message type: {chatbot_msg.message_type}") + content = message.data.get("text", {}).get("content", "").strip() + + if not content: + logger.warning( + f"Received empty or unsupported message type: {chatbot_msg.message_type}" + ) return AckMessage.STATUS_OK, "OK" sender_id = chatbot_msg.sender_staff_id or chatbot_msg.sender_id sender_name = chatbot_msg.sender_nick or "Unknown" - + logger.info(f"Received DingTalk message from {sender_name} ({sender_id}): {content}") - # Forward to Nanobot - # We use asyncio.create_task to avoid blocking the ACK return - asyncio.create_task( + # Forward to Nanobot via _on_message (non-blocking). + # Store reference to prevent GC before task completes. + task = asyncio.create_task( self.channel._on_message(content, sender_id, sender_name) ) + self.channel._background_tasks.add(task) + task.add_done_callback(self.channel._background_tasks.discard) return AckMessage.STATUS_OK, "OK" - + except Exception as e: logger.error(f"Error processing DingTalk message: {e}") - # Return OK to avoid retry loop from DingTalk server if it's a parsing error + # Return OK to avoid retry loop from DingTalk server return AckMessage.STATUS_OK, "Error" + class DingTalkChannel(BaseChannel): """ DingTalk channel using Stream Mode. - + Uses WebSocket to receive events via `dingtalk-stream` SDK. - Uses direct HTTP API to send messages (since SDK is mainly for receiving). + Uses direct HTTP API to send messages (SDK is mainly for receiving). + + Note: Currently only supports private (1:1) chat. Group messages are + received but replies are sent back as private messages to the sender. """ - + name = "dingtalk" - + def __init__(self, config: DingTalkConfig, bus: MessageBus): super().__init__(config, bus) self.config: DingTalkConfig = config self._client: Any = None - self._loop: asyncio.AbstractEventLoop | None = None - + self._http: httpx.AsyncClient | None = None + # Access Token management for sending messages self._access_token: str | None = None self._token_expiry: float = 0 - + + # Hold references to background tasks to prevent GC + self._background_tasks: set[asyncio.Task] = set() + async def start(self) -> None: """Start the DingTalk bot with Stream Mode.""" try: if not DINGTALK_AVAILABLE: - logger.error("DingTalk Stream SDK not installed. Run: pip install dingtalk-stream") + logger.error( + "DingTalk Stream SDK not installed. Run: pip install dingtalk-stream" + ) return - + if not self.config.client_id or not self.config.client_secret: logger.error("DingTalk client_id and client_secret not configured") return - + self._running = True - self._loop = asyncio.get_running_loop() - - logger.info(f"Initializing DingTalk Stream Client with Client ID: {self.config.client_id}...") + self._http = httpx.AsyncClient() + + logger.info( + f"Initializing DingTalk Stream Client with Client ID: {self.config.client_id}..." + ) credential = Credential(self.config.client_id, self.config.client_secret) self._client = DingTalkStreamClient(credential) - + # Register standard handler handler = NanobotDingTalkHandler(self) - - # Register using the chatbot topic standard for bots - self._client.register_callback_handler( - ChatbotMessage.TOPIC, - handler - ) - + self._client.register_callback_handler(ChatbotMessage.TOPIC, handler) + logger.info("DingTalk bot started with Stream Mode") - - # The client.start() method is an async infinite loop that handles the websocket connection + + # client.start() is an async infinite loop handling the websocket connection await self._client.start() except Exception as e: logger.exception(f"Failed to start DingTalk channel: {e}") - + async def stop(self) -> None: """Stop the DingTalk bot.""" self._running = False - # SDK doesn't expose a clean stop method that cancels loop immediately without private access - pass + # Close the shared HTTP client + if self._http: + await self._http.aclose() + self._http = None + # Cancel outstanding background tasks + for task in self._background_tasks: + task.cancel() + self._background_tasks.clear() async def _get_access_token(self) -> str | None: """Get or refresh Access Token.""" if self._access_token and time.time() < self._token_expiry: return self._access_token - + url = "https://api.dingtalk.com/v1.0/oauth2/accessToken" data = { "appKey": self.config.client_id, - "appSecret": self.config.client_secret + "appSecret": self.config.client_secret, } - + + if not self._http: + logger.warning("DingTalk HTTP client not initialized, cannot refresh token") + return None + try: - async with httpx.AsyncClient() as client: - resp = await client.post(url, json=data) - resp.raise_for_status() - res_data = resp.json() - self._access_token = res_data.get("accessToken") - # Expire 60s early to be safe - self._token_expiry = time.time() + int(res_data.get("expireIn", 7200)) - 60 - return self._access_token + resp = await self._http.post(url, json=data) + resp.raise_for_status() + res_data = resp.json() + self._access_token = res_data.get("accessToken") + # Expire 60s early to be safe + self._token_expiry = time.time() + int(res_data.get("expireIn", 7200)) - 60 + return self._access_token except Exception as e: logger.error(f"Failed to get DingTalk access token: {e}") return None @@ -163,57 +187,52 @@ class DingTalkChannel(BaseChannel): token = await self._get_access_token() if not token: return - - # This endpoint is for sending to a single user in a bot chat + + # oToMessages/batchSend: sends to individual users (private chat) # https://open.dingtalk.com/document/orgapp/robot-batch-send-messages url = "https://api.dingtalk.com/v1.0/robot/oToMessages/batchSend" - - headers = { - "x-acs-dingtalk-access-token": token - } - - # Convert markdown code blocks for basic compatibility if needed, - # but DingTalk supports markdown loosely. - + + headers = {"x-acs-dingtalk-access-token": token} + data = { "robotCode": self.config.client_id, - "userIds": [msg.chat_id], # chat_id is the user's staffId/unionId - "msgKey": "sampleMarkdown", # Using markdown template + "userIds": [msg.chat_id], # chat_id is the user's staffId + "msgKey": "sampleMarkdown", "msgParam": json.dumps({ "text": msg.content, - "title": "Nanobot Reply" - }) + "title": "Nanobot Reply", + }), } - + + if not self._http: + logger.warning("DingTalk HTTP client not initialized, cannot send") + return + try: - async with httpx.AsyncClient() as client: - resp = await client.post(url, json=data, headers=headers) - # Check 200 OK but also API error codes if any - if resp.status_code != 200: - logger.error(f"DingTalk send failed: {resp.text}") - else: - logger.debug(f"DingTalk message sent to {msg.chat_id}") + resp = await self._http.post(url, json=data, headers=headers) + if resp.status_code != 200: + logger.error(f"DingTalk send failed: {resp.text}") + else: + logger.debug(f"DingTalk message sent to {msg.chat_id}") except Exception as e: logger.error(f"Error sending DingTalk message: {e}") async def _on_message(self, content: str, sender_id: str, sender_name: str) -> None: - """Handle incoming message (called by NanobotDingTalkHandler).""" + """Handle incoming message (called by NanobotDingTalkHandler). + + Delegates to BaseChannel._handle_message() which enforces allow_from + permission checks before publishing to the bus. + """ try: logger.info(f"DingTalk inbound: {content} from {sender_name}") - - # Correct InboundMessage usage based on events.py definition - # @dataclass class InboundMessage: - # channel: str, sender_id: str, chat_id: str, content: str, ... - msg = InboundMessage( - channel=self.name, + await self._handle_message( sender_id=sender_id, - chat_id=sender_id, # For private stats, chat_id is sender_id + chat_id=sender_id, # For private chat, chat_id == sender_id content=str(content), metadata={ "sender_name": sender_name, - "platform": "dingtalk" - } + "platform": "dingtalk", + }, ) - await self.bus.publish_inbound(msg) except Exception as e: logger.error(f"Error publishing DingTalk message: {e}")