diff --git a/README.md b/README.md index 8503b6c..0c74e17 100644 --- a/README.md +++ b/README.md @@ -168,7 +168,7 @@ nanobot agent -m "Hello from my local LLM!" ## 💬 Chat Apps -Talk to your nanobot through Telegram, Discord, WhatsApp, Feishu, DingTalk, Slack, Email, or QQ — anytime, anywhere. +Talk to your nanobot through Telegram, Discord, WhatsApp, Feishu, Mochat, DingTalk, Slack, Email, or QQ — anytime, anywhere. | Channel | Setup | |---------|-------| @@ -176,6 +176,7 @@ Talk to your nanobot through Telegram, Discord, WhatsApp, Feishu, DingTalk, Slac | **Discord** | Easy (bot token + intents) | | **WhatsApp** | Medium (scan QR) | | **Feishu** | Medium (app credentials) | +| **Mochat** | Medium (claw token + websocket) | | **DingTalk** | Medium (app credentials) | | **Slack** | Medium (bot + app tokens) | | **Email** | Medium (IMAP/SMTP credentials) | @@ -215,6 +216,48 @@ nanobot gateway +
+Mochat (Claw IM) + +Uses **Socket.IO WebSocket** by default, with HTTP polling fallback. + +**1. Prepare credentials** +- `clawToken`: Claw API token +- `agentUserId`: your bot user id +- Optional: `sessions`/`panels` with `["*"]` for auto-discovery + +**2. Configure** + +```json +{ + "channels": { + "mochat": { + "enabled": true, + "baseUrl": "https://mochat.io", + "socketUrl": "https://mochat.io", + "socketPath": "/socket.io", + "clawToken": "claw_xxx", + "agentUserId": "6982abcdef", + "sessions": ["*"], + "panels": ["*"], + "replyDelayMode": "non-mention", + "replyDelayMs": 120000 + } + } +} +``` + +**3. Run** + +```bash +nanobot gateway +``` + +> [!TIP] +> Keep `clawToken` private. It should only be sent in `X-Claw-Token` header to your Mochat API endpoint. + +
+
Discord @@ -644,7 +687,7 @@ docker run -v ~/.nanobot:/root/.nanobot --rm nanobot onboard # Edit config on host to add API keys vim ~/.nanobot/config.json -# Run gateway (connects to Telegram/WhatsApp) +# Run gateway (connects to enabled channels, e.g. Telegram/Discord/Mochat) docker run -v ~/.nanobot:/root/.nanobot -p 18790:18790 nanobot gateway # Or run a single command @@ -664,7 +707,7 @@ nanobot/ │ ├── subagent.py # Background task execution │ └── tools/ # Built-in tools (incl. spawn) ├── skills/ # 🎯 Bundled skills (github, weather, tmux...) -├── channels/ # 📱 WhatsApp integration +├── channels/ # 📱 Chat channel integrations ├── bus/ # 🚌 Message routing ├── cron/ # ⏰ Scheduled tasks ├── heartbeat/ # 💓 Proactive wake-up diff --git a/nanobot/channels/manager.py b/nanobot/channels/manager.py index c63df5e..464fa97 100644 --- a/nanobot/channels/manager.py +++ b/nanobot/channels/manager.py @@ -85,6 +85,18 @@ class ChannelManager: except ImportError as e: logger.warning(f"Feishu channel not available: {e}") + # Mochat channel + if self.config.channels.mochat.enabled: + try: + from nanobot.channels.mochat import MochatChannel + + self.channels["mochat"] = MochatChannel( + self.config.channels.mochat, self.bus + ) + logger.info("Mochat channel enabled") + except ImportError as e: + logger.warning(f"Mochat channel not available: {e}") + # DingTalk channel if self.config.channels.dingtalk.enabled: try: diff --git a/nanobot/channels/mochat.py b/nanobot/channels/mochat.py new file mode 100644 index 0000000..30c3dbf --- /dev/null +++ b/nanobot/channels/mochat.py @@ -0,0 +1,895 @@ +"""Mochat channel implementation using Socket.IO with HTTP polling fallback.""" + +from __future__ import annotations + +import asyncio +import json +from collections import deque +from dataclasses import dataclass, field +from datetime import datetime +from typing import Any + +import httpx +from loguru import logger + +from nanobot.bus.events import OutboundMessage +from nanobot.bus.queue import MessageBus +from nanobot.channels.base import BaseChannel +from nanobot.config.schema import MochatConfig +from nanobot.utils.helpers import get_data_path + +try: + import socketio + SOCKETIO_AVAILABLE = True +except ImportError: + socketio = None + SOCKETIO_AVAILABLE = False + +try: + import msgpack # noqa: F401 + MSGPACK_AVAILABLE = True +except ImportError: + MSGPACK_AVAILABLE = False + +MAX_SEEN_MESSAGE_IDS = 2000 +CURSOR_SAVE_DEBOUNCE_S = 0.5 + + +# --------------------------------------------------------------------------- +# Data classes +# --------------------------------------------------------------------------- + +@dataclass +class MochatBufferedEntry: + """Buffered inbound entry for delayed dispatch.""" + raw_body: str + author: str + sender_name: str = "" + sender_username: str = "" + timestamp: int | None = None + message_id: str = "" + group_id: str = "" + + +@dataclass +class DelayState: + """Per-target delayed message state.""" + entries: list[MochatBufferedEntry] = field(default_factory=list) + lock: asyncio.Lock = field(default_factory=asyncio.Lock) + timer: asyncio.Task | None = None + + +@dataclass +class MochatTarget: + """Outbound target resolution result.""" + id: str + is_panel: bool + + +# --------------------------------------------------------------------------- +# Pure helpers +# --------------------------------------------------------------------------- + +def _safe_dict(value: Any) -> dict: + """Return *value* if it's a dict, else empty dict.""" + return value if isinstance(value, dict) else {} + + +def _str_field(src: dict, *keys: str) -> str: + """Return the first non-empty str value found for *keys*, stripped.""" + for k in keys: + v = src.get(k) + if isinstance(v, str) and v.strip(): + return v.strip() + return "" + + +def _make_synthetic_event( + message_id: str, author: str, content: Any, + meta: Any, group_id: str, converse_id: str, + timestamp: Any = None, *, author_info: Any = None, +) -> dict[str, Any]: + """Build a synthetic ``message.add`` event dict.""" + payload: dict[str, Any] = { + "messageId": message_id, "author": author, + "content": content, "meta": _safe_dict(meta), + "groupId": group_id, "converseId": converse_id, + } + if author_info is not None: + payload["authorInfo"] = _safe_dict(author_info) + return { + "type": "message.add", + "timestamp": timestamp or datetime.utcnow().isoformat(), + "payload": payload, + } + + +def normalize_mochat_content(content: Any) -> str: + """Normalize content payload to text.""" + if isinstance(content, str): + return content.strip() + if content is None: + return "" + try: + return json.dumps(content, ensure_ascii=False) + except TypeError: + return str(content) + + +def resolve_mochat_target(raw: str) -> MochatTarget: + """Resolve id and target kind from user-provided target string.""" + trimmed = (raw or "").strip() + if not trimmed: + return MochatTarget(id="", is_panel=False) + + lowered = trimmed.lower() + cleaned, forced_panel = trimmed, False + for prefix in ("mochat:", "group:", "channel:", "panel:"): + if lowered.startswith(prefix): + cleaned = trimmed[len(prefix):].strip() + forced_panel = prefix in {"group:", "channel:", "panel:"} + break + + if not cleaned: + return MochatTarget(id="", is_panel=False) + return MochatTarget(id=cleaned, is_panel=forced_panel or not cleaned.startswith("session_")) + + +def extract_mention_ids(value: Any) -> list[str]: + """Extract mention ids from heterogeneous mention payload.""" + if not isinstance(value, list): + return [] + ids: list[str] = [] + for item in value: + if isinstance(item, str): + if item.strip(): + ids.append(item.strip()) + elif isinstance(item, dict): + for key in ("id", "userId", "_id"): + candidate = item.get(key) + if isinstance(candidate, str) and candidate.strip(): + ids.append(candidate.strip()) + break + return ids + + +def resolve_was_mentioned(payload: dict[str, Any], agent_user_id: str) -> bool: + """Resolve mention state from payload metadata and text fallback.""" + meta = payload.get("meta") + if isinstance(meta, dict): + if meta.get("mentioned") is True or meta.get("wasMentioned") is True: + return True + for f in ("mentions", "mentionIds", "mentionedUserIds", "mentionedUsers"): + if agent_user_id and agent_user_id in extract_mention_ids(meta.get(f)): + return True + if not agent_user_id: + return False + content = payload.get("content") + if not isinstance(content, str) or not content: + return False + return f"<@{agent_user_id}>" in content or f"@{agent_user_id}" in content + + +def resolve_require_mention(config: MochatConfig, session_id: str, group_id: str) -> bool: + """Resolve mention requirement for group/panel conversations.""" + groups = config.groups or {} + for key in (group_id, session_id, "*"): + if key and key in groups: + return bool(groups[key].require_mention) + return bool(config.mention.require_in_groups) + + +def build_buffered_body(entries: list[MochatBufferedEntry], is_group: bool) -> str: + """Build text body from one or more buffered entries.""" + if not entries: + return "" + if len(entries) == 1: + return entries[0].raw_body + lines: list[str] = [] + for entry in entries: + if not entry.raw_body: + continue + if is_group: + label = entry.sender_name.strip() or entry.sender_username.strip() or entry.author + if label: + lines.append(f"{label}: {entry.raw_body}") + continue + lines.append(entry.raw_body) + return "\n".join(lines).strip() + + +def parse_timestamp(value: Any) -> int | None: + """Parse event timestamp to epoch milliseconds.""" + if not isinstance(value, str) or not value.strip(): + return None + try: + return int(datetime.fromisoformat(value.replace("Z", "+00:00")).timestamp() * 1000) + except ValueError: + return None + + +# --------------------------------------------------------------------------- +# Channel +# --------------------------------------------------------------------------- + +class MochatChannel(BaseChannel): + """Mochat channel using socket.io with fallback polling workers.""" + + name = "mochat" + + def __init__(self, config: MochatConfig, bus: MessageBus): + super().__init__(config, bus) + self.config: MochatConfig = config + self._http: httpx.AsyncClient | None = None + self._socket: Any = None + self._ws_connected = self._ws_ready = False + + self._state_dir = get_data_path() / "mochat" + self._cursor_path = self._state_dir / "session_cursors.json" + self._session_cursor: dict[str, int] = {} + self._cursor_save_task: asyncio.Task | None = None + + self._session_set: set[str] = set() + self._panel_set: set[str] = set() + self._auto_discover_sessions = self._auto_discover_panels = False + + self._cold_sessions: set[str] = set() + self._session_by_converse: dict[str, str] = {} + + self._seen_set: dict[str, set[str]] = {} + self._seen_queue: dict[str, deque[str]] = {} + self._delay_states: dict[str, DelayState] = {} + + self._fallback_mode = False + self._session_fallback_tasks: dict[str, asyncio.Task] = {} + self._panel_fallback_tasks: dict[str, asyncio.Task] = {} + self._refresh_task: asyncio.Task | None = None + self._target_locks: dict[str, asyncio.Lock] = {} + + # ---- lifecycle --------------------------------------------------------- + + async def start(self) -> None: + """Start Mochat channel workers and websocket connection.""" + if not self.config.claw_token: + logger.error("Mochat claw_token not configured") + return + + self._running = True + self._http = httpx.AsyncClient(timeout=30.0) + self._state_dir.mkdir(parents=True, exist_ok=True) + await self._load_session_cursors() + self._seed_targets_from_config() + await self._refresh_targets(subscribe_new=False) + + if not await self._start_socket_client(): + await self._ensure_fallback_workers() + + self._refresh_task = asyncio.create_task(self._refresh_loop()) + while self._running: + await asyncio.sleep(1) + + async def stop(self) -> None: + """Stop all workers and clean up resources.""" + self._running = False + if self._refresh_task: + self._refresh_task.cancel() + self._refresh_task = None + + await self._stop_fallback_workers() + await self._cancel_delay_timers() + + if self._socket: + try: + await self._socket.disconnect() + except Exception: + pass + self._socket = None + + if self._cursor_save_task: + self._cursor_save_task.cancel() + self._cursor_save_task = None + await self._save_session_cursors() + + if self._http: + await self._http.aclose() + self._http = None + self._ws_connected = self._ws_ready = False + + async def send(self, msg: OutboundMessage) -> None: + """Send outbound message to session or panel.""" + if not self.config.claw_token: + logger.warning("Mochat claw_token missing, skip send") + return + + parts = ([msg.content.strip()] if msg.content and msg.content.strip() else []) + if msg.media: + parts.extend(m for m in msg.media if isinstance(m, str) and m.strip()) + content = "\n".join(parts).strip() + if not content: + return + + target = resolve_mochat_target(msg.chat_id) + if not target.id: + logger.warning("Mochat outbound target is empty") + return + + is_panel = (target.is_panel or target.id in self._panel_set) and not target.id.startswith("session_") + try: + if is_panel: + await self._api_send("/api/claw/groups/panels/send", "panelId", target.id, + content, msg.reply_to, self._read_group_id(msg.metadata)) + else: + await self._api_send("/api/claw/sessions/send", "sessionId", target.id, + content, msg.reply_to) + except Exception as e: + logger.error(f"Failed to send Mochat message: {e}") + + # ---- config / init helpers --------------------------------------------- + + def _seed_targets_from_config(self) -> None: + sessions, self._auto_discover_sessions = self._normalize_id_list(self.config.sessions) + panels, self._auto_discover_panels = self._normalize_id_list(self.config.panels) + self._session_set.update(sessions) + self._panel_set.update(panels) + for sid in sessions: + if sid not in self._session_cursor: + self._cold_sessions.add(sid) + + @staticmethod + def _normalize_id_list(values: list[str]) -> tuple[list[str], bool]: + cleaned = [str(v).strip() for v in values if str(v).strip()] + return sorted({v for v in cleaned if v != "*"}), "*" in cleaned + + # ---- websocket --------------------------------------------------------- + + async def _start_socket_client(self) -> bool: + if not SOCKETIO_AVAILABLE: + logger.warning("python-socketio not installed, Mochat using polling fallback") + return False + + serializer = "default" + if not self.config.socket_disable_msgpack: + if MSGPACK_AVAILABLE: + serializer = "msgpack" + else: + logger.warning("msgpack not installed but socket_disable_msgpack=false; using JSON") + + client = socketio.AsyncClient( + reconnection=True, + reconnection_attempts=self.config.max_retry_attempts or None, + reconnection_delay=max(0.1, self.config.socket_reconnect_delay_ms / 1000.0), + reconnection_delay_max=max(0.1, self.config.socket_max_reconnect_delay_ms / 1000.0), + logger=False, engineio_logger=False, serializer=serializer, + ) + + @client.event + async def connect() -> None: + self._ws_connected, self._ws_ready = True, False + logger.info("Mochat websocket connected") + subscribed = await self._subscribe_all() + self._ws_ready = subscribed + await (self._stop_fallback_workers() if subscribed else self._ensure_fallback_workers()) + + @client.event + async def disconnect() -> None: + if not self._running: + return + self._ws_connected = self._ws_ready = False + logger.warning("Mochat websocket disconnected") + await self._ensure_fallback_workers() + + @client.event + async def connect_error(data: Any) -> None: + logger.error(f"Mochat websocket connect error: {data}") + + @client.on("claw.session.events") + async def on_session_events(payload: dict[str, Any]) -> None: + await self._handle_watch_payload(payload, "session") + + @client.on("claw.panel.events") + async def on_panel_events(payload: dict[str, Any]) -> None: + await self._handle_watch_payload(payload, "panel") + + for ev in ("notify:chat.inbox.append", "notify:chat.message.add", + "notify:chat.message.update", "notify:chat.message.recall", + "notify:chat.message.delete"): + client.on(ev, self._build_notify_handler(ev)) + + socket_url = (self.config.socket_url or self.config.base_url).strip().rstrip("/") + socket_path = (self.config.socket_path or "/socket.io").strip().lstrip("/") + + try: + self._socket = client + await client.connect( + socket_url, transports=["websocket"], socketio_path=socket_path, + auth={"token": self.config.claw_token}, + wait_timeout=max(1.0, self.config.socket_connect_timeout_ms / 1000.0), + ) + return True + except Exception as e: + logger.error(f"Failed to connect Mochat websocket: {e}") + try: + await client.disconnect() + except Exception: + pass + self._socket = None + return False + + def _build_notify_handler(self, event_name: str): + async def handler(payload: Any) -> None: + if event_name == "notify:chat.inbox.append": + await self._handle_notify_inbox_append(payload) + elif event_name.startswith("notify:chat.message."): + await self._handle_notify_chat_message(payload) + return handler + + # ---- subscribe --------------------------------------------------------- + + async def _subscribe_all(self) -> bool: + ok = await self._subscribe_sessions(sorted(self._session_set)) + ok = await self._subscribe_panels(sorted(self._panel_set)) and ok + if self._auto_discover_sessions or self._auto_discover_panels: + await self._refresh_targets(subscribe_new=True) + return ok + + async def _subscribe_sessions(self, session_ids: list[str]) -> bool: + if not session_ids: + return True + for sid in session_ids: + if sid not in self._session_cursor: + self._cold_sessions.add(sid) + + ack = await self._socket_call("com.claw.im.subscribeSessions", { + "sessionIds": session_ids, "cursors": self._session_cursor, + "limit": self.config.watch_limit, + }) + if not ack.get("result"): + logger.error(f"Mochat subscribeSessions failed: {ack.get('message', 'unknown error')}") + return False + + data = ack.get("data") + items: list[dict[str, Any]] = [] + if isinstance(data, list): + items = [i for i in data if isinstance(i, dict)] + elif isinstance(data, dict): + sessions = data.get("sessions") + if isinstance(sessions, list): + items = [i for i in sessions if isinstance(i, dict)] + elif "sessionId" in data: + items = [data] + for p in items: + await self._handle_watch_payload(p, "session") + return True + + async def _subscribe_panels(self, panel_ids: list[str]) -> bool: + if not self._auto_discover_panels and not panel_ids: + return True + ack = await self._socket_call("com.claw.im.subscribePanels", {"panelIds": panel_ids}) + if not ack.get("result"): + logger.error(f"Mochat subscribePanels failed: {ack.get('message', 'unknown error')}") + return False + return True + + async def _socket_call(self, event_name: str, payload: dict[str, Any]) -> dict[str, Any]: + if not self._socket: + return {"result": False, "message": "socket not connected"} + try: + raw = await self._socket.call(event_name, payload, timeout=10) + except Exception as e: + return {"result": False, "message": str(e)} + return raw if isinstance(raw, dict) else {"result": True, "data": raw} + + # ---- refresh / discovery ----------------------------------------------- + + async def _refresh_loop(self) -> None: + interval_s = max(1.0, self.config.refresh_interval_ms / 1000.0) + while self._running: + await asyncio.sleep(interval_s) + try: + await self._refresh_targets(subscribe_new=self._ws_ready) + except Exception as e: + logger.warning(f"Mochat refresh failed: {e}") + if self._fallback_mode: + await self._ensure_fallback_workers() + + async def _refresh_targets(self, subscribe_new: bool) -> None: + if self._auto_discover_sessions: + await self._refresh_sessions_directory(subscribe_new) + if self._auto_discover_panels: + await self._refresh_panels(subscribe_new) + + async def _refresh_sessions_directory(self, subscribe_new: bool) -> None: + try: + response = await self._post_json("/api/claw/sessions/list", {}) + except Exception as e: + logger.warning(f"Mochat listSessions failed: {e}") + return + + sessions = response.get("sessions") + if not isinstance(sessions, list): + return + + new_ids: list[str] = [] + for s in sessions: + if not isinstance(s, dict): + continue + sid = _str_field(s, "sessionId") + if not sid: + continue + if sid not in self._session_set: + self._session_set.add(sid) + new_ids.append(sid) + if sid not in self._session_cursor: + self._cold_sessions.add(sid) + cid = _str_field(s, "converseId") + if cid: + self._session_by_converse[cid] = sid + + if not new_ids: + return + if self._ws_ready and subscribe_new: + await self._subscribe_sessions(new_ids) + if self._fallback_mode: + await self._ensure_fallback_workers() + + async def _refresh_panels(self, subscribe_new: bool) -> None: + try: + response = await self._post_json("/api/claw/groups/get", {}) + except Exception as e: + logger.warning(f"Mochat getWorkspaceGroup failed: {e}") + return + + raw_panels = response.get("panels") + if not isinstance(raw_panels, list): + return + + new_ids: list[str] = [] + for p in raw_panels: + if not isinstance(p, dict): + continue + pt = p.get("type") + if isinstance(pt, int) and pt != 0: + continue + pid = _str_field(p, "id", "_id") + if pid and pid not in self._panel_set: + self._panel_set.add(pid) + new_ids.append(pid) + + if not new_ids: + return + if self._ws_ready and subscribe_new: + await self._subscribe_panels(new_ids) + if self._fallback_mode: + await self._ensure_fallback_workers() + + # ---- fallback workers -------------------------------------------------- + + async def _ensure_fallback_workers(self) -> None: + if not self._running: + return + self._fallback_mode = True + for sid in sorted(self._session_set): + t = self._session_fallback_tasks.get(sid) + if not t or t.done(): + self._session_fallback_tasks[sid] = asyncio.create_task(self._session_watch_worker(sid)) + for pid in sorted(self._panel_set): + t = self._panel_fallback_tasks.get(pid) + if not t or t.done(): + self._panel_fallback_tasks[pid] = asyncio.create_task(self._panel_poll_worker(pid)) + + async def _stop_fallback_workers(self) -> None: + self._fallback_mode = False + tasks = [*self._session_fallback_tasks.values(), *self._panel_fallback_tasks.values()] + for t in tasks: + t.cancel() + if tasks: + await asyncio.gather(*tasks, return_exceptions=True) + self._session_fallback_tasks.clear() + self._panel_fallback_tasks.clear() + + async def _session_watch_worker(self, session_id: str) -> None: + while self._running and self._fallback_mode: + try: + payload = await self._post_json("/api/claw/sessions/watch", { + "sessionId": session_id, "cursor": self._session_cursor.get(session_id, 0), + "timeoutMs": self.config.watch_timeout_ms, "limit": self.config.watch_limit, + }) + await self._handle_watch_payload(payload, "session") + except asyncio.CancelledError: + break + except Exception as e: + logger.warning(f"Mochat watch fallback error ({session_id}): {e}") + await asyncio.sleep(max(0.1, self.config.retry_delay_ms / 1000.0)) + + async def _panel_poll_worker(self, panel_id: str) -> None: + sleep_s = max(1.0, self.config.refresh_interval_ms / 1000.0) + while self._running and self._fallback_mode: + try: + resp = await self._post_json("/api/claw/groups/panels/messages", { + "panelId": panel_id, "limit": min(100, max(1, self.config.watch_limit)), + }) + msgs = resp.get("messages") + if isinstance(msgs, list): + for m in reversed(msgs): + if not isinstance(m, dict): + continue + evt = _make_synthetic_event( + message_id=str(m.get("messageId") or ""), + author=str(m.get("author") or ""), + content=m.get("content"), + meta=m.get("meta"), group_id=str(resp.get("groupId") or ""), + converse_id=panel_id, timestamp=m.get("createdAt"), + author_info=m.get("authorInfo"), + ) + await self._process_inbound_event(panel_id, evt, "panel") + except asyncio.CancelledError: + break + except Exception as e: + logger.warning(f"Mochat panel polling error ({panel_id}): {e}") + await asyncio.sleep(sleep_s) + + # ---- inbound event processing ------------------------------------------ + + async def _handle_watch_payload(self, payload: dict[str, Any], target_kind: str) -> None: + if not isinstance(payload, dict): + return + target_id = _str_field(payload, "sessionId") + if not target_id: + return + + lock = self._target_locks.setdefault(f"{target_kind}:{target_id}", asyncio.Lock()) + async with lock: + prev = self._session_cursor.get(target_id, 0) if target_kind == "session" else 0 + pc = payload.get("cursor") + if target_kind == "session" and isinstance(pc, int) and pc >= 0: + self._mark_session_cursor(target_id, pc) + + raw_events = payload.get("events") + if not isinstance(raw_events, list): + return + if target_kind == "session" and target_id in self._cold_sessions: + self._cold_sessions.discard(target_id) + return + + for event in raw_events: + if not isinstance(event, dict): + continue + seq = event.get("seq") + if target_kind == "session" and isinstance(seq, int) and seq > self._session_cursor.get(target_id, prev): + self._mark_session_cursor(target_id, seq) + if event.get("type") == "message.add": + await self._process_inbound_event(target_id, event, target_kind) + + async def _process_inbound_event(self, target_id: str, event: dict[str, Any], target_kind: str) -> None: + payload = event.get("payload") + if not isinstance(payload, dict): + return + + author = _str_field(payload, "author") + if not author or (self.config.agent_user_id and author == self.config.agent_user_id): + return + if not self.is_allowed(author): + return + + message_id = _str_field(payload, "messageId") + seen_key = f"{target_kind}:{target_id}" + if message_id and self._remember_message_id(seen_key, message_id): + return + + raw_body = normalize_mochat_content(payload.get("content")) or "[empty message]" + ai = _safe_dict(payload.get("authorInfo")) + sender_name = _str_field(ai, "nickname", "email") + sender_username = _str_field(ai, "agentId") + + group_id = _str_field(payload, "groupId") + is_group = bool(group_id) + was_mentioned = resolve_was_mentioned(payload, self.config.agent_user_id) + require_mention = target_kind == "panel" and is_group and resolve_require_mention(self.config, target_id, group_id) + use_delay = target_kind == "panel" and self.config.reply_delay_mode == "non-mention" + + if require_mention and not was_mentioned and not use_delay: + return + + entry = MochatBufferedEntry( + raw_body=raw_body, author=author, sender_name=sender_name, + sender_username=sender_username, timestamp=parse_timestamp(event.get("timestamp")), + message_id=message_id, group_id=group_id, + ) + + if use_delay: + delay_key = seen_key + if was_mentioned: + await self._flush_delayed_entries(delay_key, target_id, target_kind, "mention", entry) + else: + await self._enqueue_delayed_entry(delay_key, target_id, target_kind, entry) + return + + await self._dispatch_entries(target_id, target_kind, [entry], was_mentioned) + + # ---- dedup / buffering ------------------------------------------------- + + def _remember_message_id(self, key: str, message_id: str) -> bool: + seen_set = self._seen_set.setdefault(key, set()) + seen_queue = self._seen_queue.setdefault(key, deque()) + if message_id in seen_set: + return True + seen_set.add(message_id) + seen_queue.append(message_id) + while len(seen_queue) > MAX_SEEN_MESSAGE_IDS: + seen_set.discard(seen_queue.popleft()) + return False + + async def _enqueue_delayed_entry(self, key: str, target_id: str, target_kind: str, entry: MochatBufferedEntry) -> None: + state = self._delay_states.setdefault(key, DelayState()) + async with state.lock: + state.entries.append(entry) + if state.timer: + state.timer.cancel() + state.timer = asyncio.create_task(self._delay_flush_after(key, target_id, target_kind)) + + async def _delay_flush_after(self, key: str, target_id: str, target_kind: str) -> None: + await asyncio.sleep(max(0, self.config.reply_delay_ms) / 1000.0) + await self._flush_delayed_entries(key, target_id, target_kind, "timer", None) + + async def _flush_delayed_entries(self, key: str, target_id: str, target_kind: str, reason: str, entry: MochatBufferedEntry | None) -> None: + state = self._delay_states.setdefault(key, DelayState()) + async with state.lock: + if entry: + state.entries.append(entry) + current = asyncio.current_task() + if state.timer and state.timer is not current: + state.timer.cancel() + state.timer = None + entries = state.entries[:] + state.entries.clear() + if entries: + await self._dispatch_entries(target_id, target_kind, entries, reason == "mention") + + async def _dispatch_entries(self, target_id: str, target_kind: str, entries: list[MochatBufferedEntry], was_mentioned: bool) -> None: + if not entries: + return + last = entries[-1] + is_group = bool(last.group_id) + body = build_buffered_body(entries, is_group) or "[empty message]" + await self._handle_message( + sender_id=last.author, chat_id=target_id, content=body, + metadata={ + "message_id": last.message_id, "timestamp": last.timestamp, + "is_group": is_group, "group_id": last.group_id, + "sender_name": last.sender_name, "sender_username": last.sender_username, + "target_kind": target_kind, "was_mentioned": was_mentioned, + "buffered_count": len(entries), + }, + ) + + async def _cancel_delay_timers(self) -> None: + for state in self._delay_states.values(): + if state.timer: + state.timer.cancel() + self._delay_states.clear() + + # ---- notify handlers --------------------------------------------------- + + async def _handle_notify_chat_message(self, payload: Any) -> None: + if not isinstance(payload, dict): + return + group_id = _str_field(payload, "groupId") + panel_id = _str_field(payload, "converseId", "panelId") + if not group_id or not panel_id: + return + if self._panel_set and panel_id not in self._panel_set: + return + + evt = _make_synthetic_event( + message_id=str(payload.get("_id") or payload.get("messageId") or ""), + author=str(payload.get("author") or ""), + content=payload.get("content"), meta=payload.get("meta"), + group_id=group_id, converse_id=panel_id, + timestamp=payload.get("createdAt"), author_info=payload.get("authorInfo"), + ) + await self._process_inbound_event(panel_id, evt, "panel") + + async def _handle_notify_inbox_append(self, payload: Any) -> None: + if not isinstance(payload, dict) or payload.get("type") != "message": + return + detail = payload.get("payload") + if not isinstance(detail, dict): + return + if _str_field(detail, "groupId"): + return + converse_id = _str_field(detail, "converseId") + if not converse_id: + return + + session_id = self._session_by_converse.get(converse_id) + if not session_id: + await self._refresh_sessions_directory(self._ws_ready) + session_id = self._session_by_converse.get(converse_id) + if not session_id: + return + + evt = _make_synthetic_event( + message_id=str(detail.get("messageId") or payload.get("_id") or ""), + author=str(detail.get("messageAuthor") or ""), + content=str(detail.get("messagePlainContent") or detail.get("messageSnippet") or ""), + meta={"source": "notify:chat.inbox.append", "converseId": converse_id}, + group_id="", converse_id=converse_id, timestamp=payload.get("createdAt"), + ) + await self._process_inbound_event(session_id, evt, "session") + + # ---- cursor persistence ------------------------------------------------ + + def _mark_session_cursor(self, session_id: str, cursor: int) -> None: + if cursor < 0 or cursor < self._session_cursor.get(session_id, 0): + return + self._session_cursor[session_id] = cursor + if not self._cursor_save_task or self._cursor_save_task.done(): + self._cursor_save_task = asyncio.create_task(self._save_cursor_debounced()) + + async def _save_cursor_debounced(self) -> None: + await asyncio.sleep(CURSOR_SAVE_DEBOUNCE_S) + await self._save_session_cursors() + + async def _load_session_cursors(self) -> None: + if not self._cursor_path.exists(): + return + try: + data = json.loads(self._cursor_path.read_text("utf-8")) + except Exception as e: + logger.warning(f"Failed to read Mochat cursor file: {e}") + return + cursors = data.get("cursors") if isinstance(data, dict) else None + if isinstance(cursors, dict): + for sid, cur in cursors.items(): + if isinstance(sid, str) and isinstance(cur, int) and cur >= 0: + self._session_cursor[sid] = cur + + async def _save_session_cursors(self) -> None: + try: + self._state_dir.mkdir(parents=True, exist_ok=True) + self._cursor_path.write_text(json.dumps({ + "schemaVersion": 1, "updatedAt": datetime.utcnow().isoformat(), + "cursors": self._session_cursor, + }, ensure_ascii=False, indent=2) + "\n", "utf-8") + except Exception as e: + logger.warning(f"Failed to save Mochat cursor file: {e}") + + # ---- HTTP helpers ------------------------------------------------------ + + async def _post_json(self, path: str, payload: dict[str, Any]) -> dict[str, Any]: + if not self._http: + raise RuntimeError("Mochat HTTP client not initialized") + url = f"{self.config.base_url.strip().rstrip('/')}{path}" + response = await self._http.post(url, headers={ + "Content-Type": "application/json", "X-Claw-Token": self.config.claw_token, + }, json=payload) + if not response.is_success: + raise RuntimeError(f"Mochat HTTP {response.status_code}: {response.text[:200]}") + try: + parsed = response.json() + except Exception: + parsed = response.text + if isinstance(parsed, dict) and isinstance(parsed.get("code"), int): + if parsed["code"] != 200: + msg = str(parsed.get("message") or parsed.get("name") or "request failed") + raise RuntimeError(f"Mochat API error: {msg} (code={parsed['code']})") + data = parsed.get("data") + return data if isinstance(data, dict) else {} + return parsed if isinstance(parsed, dict) else {} + + async def _api_send(self, path: str, id_key: str, id_val: str, + content: str, reply_to: str | None, group_id: str | None = None) -> dict[str, Any]: + """Unified send helper for session and panel messages.""" + body: dict[str, Any] = {id_key: id_val, "content": content} + if reply_to: + body["replyTo"] = reply_to + if group_id: + body["groupId"] = group_id + return await self._post_json(path, body) + + @staticmethod + def _read_group_id(metadata: dict[str, Any]) -> str | None: + if not isinstance(metadata, dict): + return None + value = metadata.get("group_id") or metadata.get("groupId") + return value.strip() if isinstance(value, str) and value.strip() else None diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py index 78046e0..bcadba9 100644 --- a/nanobot/cli/commands.py +++ b/nanobot/cli/commands.py @@ -561,6 +561,24 @@ def channels_status(): "✓" if dc.enabled else "✗", dc.gateway_url ) + + # Feishu + fs = config.channels.feishu + fs_config = f"app_id: {fs.app_id[:10]}..." if fs.app_id else "[dim]not configured[/dim]" + table.add_row( + "Feishu", + "✓" if fs.enabled else "✗", + fs_config + ) + + # Mochat + mc = config.channels.mochat + mc_base = mc.base_url or "[dim]not configured[/dim]" + table.add_row( + "Mochat", + "✓" if mc.enabled else "✗", + mc_base + ) # Telegram tg = config.channels.telegram diff --git a/nanobot/config/schema.py b/nanobot/config/schema.py index fe0259e..c9bdb02 100644 --- a/nanobot/config/schema.py +++ b/nanobot/config/schema.py @@ -77,6 +77,42 @@ class EmailConfig(BaseModel): allow_from: list[str] = Field(default_factory=list) # Allowed sender email addresses +class MochatMentionConfig(BaseModel): + """Mochat mention behavior configuration.""" + require_in_groups: bool = False + + +class MochatGroupRule(BaseModel): + """Mochat per-group mention requirement.""" + require_mention: bool = False + + +class MochatConfig(BaseModel): + """Mochat channel configuration.""" + enabled: bool = False + base_url: str = "https://mochat.io" + socket_url: str = "" + socket_path: str = "/socket.io" + socket_disable_msgpack: bool = False + socket_reconnect_delay_ms: int = 1000 + socket_max_reconnect_delay_ms: int = 10000 + socket_connect_timeout_ms: int = 10000 + refresh_interval_ms: int = 30000 + watch_timeout_ms: int = 25000 + watch_limit: int = 100 + retry_delay_ms: int = 500 + max_retry_attempts: int = 0 # 0 means unlimited retries + claw_token: str = "" + agent_user_id: str = "" + sessions: list[str] = Field(default_factory=list) + panels: list[str] = Field(default_factory=list) + allow_from: list[str] = Field(default_factory=list) + mention: MochatMentionConfig = Field(default_factory=MochatMentionConfig) + groups: dict[str, MochatGroupRule] = Field(default_factory=dict) + reply_delay_mode: str = "non-mention" # off | non-mention + reply_delay_ms: int = 120000 + + class SlackDMConfig(BaseModel): """Slack DM policy configuration.""" enabled: bool = True @@ -111,6 +147,7 @@ class ChannelsConfig(BaseModel): telegram: TelegramConfig = Field(default_factory=TelegramConfig) discord: DiscordConfig = Field(default_factory=DiscordConfig) feishu: FeishuConfig = Field(default_factory=FeishuConfig) + mochat: MochatConfig = Field(default_factory=MochatConfig) dingtalk: DingTalkConfig = Field(default_factory=DingTalkConfig) email: EmailConfig = Field(default_factory=EmailConfig) slack: SlackConfig = Field(default_factory=SlackConfig) diff --git a/pyproject.toml b/pyproject.toml index 63e148d..4c10d49 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,6 +32,8 @@ dependencies = [ "python-telegram-bot[socks]>=21.0", "lark-oapi>=1.0.0", "socksio>=1.0.0", + "python-socketio>=5.11.0", + "msgpack>=1.0.8", "slack-sdk>=3.26.0", "qq-botpy>=1.0.0", ]