From 20b8a2fc58dd4550c487d9b6f88e38b57a55b4bf Mon Sep 17 00:00:00 2001 From: tjb-tech Date: Mon, 9 Feb 2026 08:46:47 +0000 Subject: [PATCH 1/4] feat(channels): add Moltchat websocket channel with polling fallback --- README.md | 49 +- nanobot/channels/__init__.py | 3 +- nanobot/channels/manager.py | 12 + nanobot/channels/moltchat.py | 1227 ++++++++++++++++++++++++++++++++ nanobot/cli/commands.py | 18 + nanobot/config/schema.py | 37 + pyproject.toml | 2 + tests/test_moltchat_channel.py | 115 +++ 8 files changed, 1459 insertions(+), 4 deletions(-) create mode 100644 nanobot/channels/moltchat.py create mode 100644 tests/test_moltchat_channel.py diff --git a/README.md b/README.md index 8a15892..74c24d9 100644 --- a/README.md +++ b/README.md @@ -164,7 +164,7 @@ nanobot agent -m "Hello from my local LLM!" ## 💬 Chat Apps -Talk to your nanobot through Telegram, Discord, WhatsApp, or Feishu — anytime, anywhere. +Talk to your nanobot through Telegram, Discord, WhatsApp, Feishu, or Moltchat — anytime, anywhere. | Channel | Setup | |---------|-------| @@ -172,6 +172,7 @@ Talk to your nanobot through Telegram, Discord, WhatsApp, or Feishu — anytime, | **Discord** | Easy (bot token + intents) | | **WhatsApp** | Medium (scan QR) | | **Feishu** | Medium (app credentials) | +| **Moltchat** | Medium (claw token + websocket) |
Telegram (Recommended) @@ -205,6 +206,48 @@ nanobot gateway
+
+Moltchat (Claw IM) + +Uses **Socket.IO WebSocket** by default, with HTTP polling fallback. + +**1. Prepare credentials** +- `clawToken`: Claw API token +- `agentUserId`: your bot user id +- Optional: `sessions`/`panels` with `["*"]` for auto-discovery + +**2. Configure** + +```json +{ + "channels": { + "moltchat": { + "enabled": true, + "baseUrl": "https://mochat.io", + "socketUrl": "https://mochat.io", + "socketPath": "/socket.io", + "clawToken": "claw_xxx", + "agentUserId": "69820107a785110aea8b1069", + "sessions": ["*"], + "panels": ["*"], + "replyDelayMode": "non-mention", + "replyDelayMs": 120000 + } + } +} +``` + +**3. Run** + +```bash +nanobot gateway +``` + +> [!TIP] +> Keep `clawToken` private. It should only be sent in `X-Claw-Token` header to your Moltchat API endpoint. + +
+
Discord @@ -413,7 +456,7 @@ docker run -v ~/.nanobot:/root/.nanobot --rm nanobot onboard # Edit config on host to add API keys vim ~/.nanobot/config.json -# Run gateway (connects to Telegram/WhatsApp) +# Run gateway (connects to enabled channels, e.g. Telegram/Discord/Moltchat) docker run -v ~/.nanobot:/root/.nanobot -p 18790:18790 nanobot gateway # Or run a single command @@ -433,7 +476,7 @@ nanobot/ │ ├── subagent.py # Background task execution │ └── tools/ # Built-in tools (incl. spawn) ├── skills/ # 🎯 Bundled skills (github, weather, tmux...) -├── channels/ # 📱 WhatsApp integration +├── channels/ # 📱 Chat channel integrations ├── bus/ # 🚌 Message routing ├── cron/ # ⏰ Scheduled tasks ├── heartbeat/ # 💓 Proactive wake-up diff --git a/nanobot/channels/__init__.py b/nanobot/channels/__init__.py index 588169d..4d77063 100644 --- a/nanobot/channels/__init__.py +++ b/nanobot/channels/__init__.py @@ -2,5 +2,6 @@ from nanobot.channels.base import BaseChannel from nanobot.channels.manager import ChannelManager +from nanobot.channels.moltchat import MoltchatChannel -__all__ = ["BaseChannel", "ChannelManager"] +__all__ = ["BaseChannel", "ChannelManager", "MoltchatChannel"] diff --git a/nanobot/channels/manager.py b/nanobot/channels/manager.py index 64ced48..11690ef 100644 --- a/nanobot/channels/manager.py +++ b/nanobot/channels/manager.py @@ -77,6 +77,18 @@ class ChannelManager: logger.info("Feishu channel enabled") except ImportError as e: logger.warning(f"Feishu channel not available: {e}") + + # Moltchat channel + if self.config.channels.moltchat.enabled: + try: + from nanobot.channels.moltchat import MoltchatChannel + + self.channels["moltchat"] = MoltchatChannel( + self.config.channels.moltchat, self.bus + ) + logger.info("Moltchat channel enabled") + except ImportError as e: + logger.warning(f"Moltchat channel not available: {e}") async def start_all(self) -> None: """Start WhatsApp channel and the outbound dispatcher.""" diff --git a/nanobot/channels/moltchat.py b/nanobot/channels/moltchat.py new file mode 100644 index 0000000..cc590d4 --- /dev/null +++ b/nanobot/channels/moltchat.py @@ -0,0 +1,1227 @@ +"""Moltchat channel implementation using Socket.IO with HTTP polling fallback.""" + +from __future__ import annotations + +import asyncio +import json +from collections import deque +from dataclasses import dataclass, field +from datetime import datetime +from typing import Any + +import httpx +from loguru import logger + +from nanobot.bus.events import OutboundMessage +from nanobot.bus.queue import MessageBus +from nanobot.channels.base import BaseChannel +from nanobot.config.schema import MoltchatConfig +from nanobot.utils.helpers import get_data_path + +try: + import socketio + + SOCKETIO_AVAILABLE = True +except ImportError: + socketio = None + SOCKETIO_AVAILABLE = False + +try: + import msgpack # noqa: F401 + + MSGPACK_AVAILABLE = True +except ImportError: + MSGPACK_AVAILABLE = False + + +MAX_SEEN_MESSAGE_IDS = 2000 +CURSOR_SAVE_DEBOUNCE_S = 0.5 + + +@dataclass +class MoltchatBufferedEntry: + """Buffered inbound entry for delayed dispatch.""" + + raw_body: str + author: str + sender_name: str = "" + sender_username: str = "" + timestamp: int | None = None + message_id: str = "" + group_id: str = "" + + +@dataclass +class DelayState: + """Per-target delayed message state.""" + + entries: list[MoltchatBufferedEntry] = field(default_factory=list) + lock: asyncio.Lock = field(default_factory=asyncio.Lock) + timer: asyncio.Task | None = None + + +@dataclass +class MoltchatTarget: + """Outbound target resolution result.""" + + id: str + is_panel: bool + + +def normalize_moltchat_content(content: Any) -> str: + """Normalize content payload to text.""" + if isinstance(content, str): + return content.strip() + if content is None: + return "" + try: + return json.dumps(content, ensure_ascii=False) + except TypeError: + return str(content) + + +def resolve_moltchat_target(raw: str) -> MoltchatTarget: + """Resolve id and target kind from user-provided target string.""" + trimmed = (raw or "").strip() + if not trimmed: + return MoltchatTarget(id="", is_panel=False) + + lowered = trimmed.lower() + cleaned = trimmed + forced_panel = False + + prefixes = ["moltchat:", "mochat:", "group:", "channel:", "panel:"] + for prefix in prefixes: + if lowered.startswith(prefix): + cleaned = trimmed[len(prefix) :].strip() + if prefix in {"group:", "channel:", "panel:"}: + forced_panel = True + break + + if not cleaned: + return MoltchatTarget(id="", is_panel=False) + + return MoltchatTarget(id=cleaned, is_panel=forced_panel or not cleaned.startswith("session_")) + + +def extract_mention_ids(value: Any) -> list[str]: + """Extract mention ids from heterogeneous mention payload.""" + if not isinstance(value, list): + return [] + + ids: list[str] = [] + for item in value: + if isinstance(item, str): + text = item.strip() + if text: + ids.append(text) + continue + + if not isinstance(item, dict): + continue + + for key in ("id", "userId", "_id"): + candidate = item.get(key) + if isinstance(candidate, str) and candidate.strip(): + ids.append(candidate.strip()) + break + + return ids + + +def resolve_was_mentioned(payload: dict[str, Any], agent_user_id: str) -> bool: + """Resolve mention state from payload metadata and text fallback.""" + meta = payload.get("meta") + if isinstance(meta, dict): + if meta.get("mentioned") is True or meta.get("wasMentioned") is True: + return True + + for field in ("mentions", "mentionIds", "mentionedUserIds", "mentionedUsers"): + ids = extract_mention_ids(meta.get(field)) + if agent_user_id and agent_user_id in ids: + return True + + if not agent_user_id: + return False + + content = payload.get("content") + if not isinstance(content, str) or not content: + return False + + return f"<@{agent_user_id}>" in content or f"@{agent_user_id}" in content + + +def resolve_require_mention( + config: MoltchatConfig, + session_id: str, + group_id: str, +) -> bool: + """Resolve mention requirement for group/panel conversations.""" + groups = config.groups or {} + if group_id and group_id in groups: + return bool(groups[group_id].require_mention) + if session_id in groups: + return bool(groups[session_id].require_mention) + if "*" in groups: + return bool(groups["*"].require_mention) + return bool(config.mention.require_in_groups) + + +def build_buffered_body(entries: list[MoltchatBufferedEntry], is_group: bool) -> str: + """Build text body from one or more buffered entries.""" + if not entries: + return "" + + if len(entries) == 1: + return entries[0].raw_body + + lines: list[str] = [] + for entry in entries: + body = entry.raw_body + if not body: + continue + if is_group: + label = entry.sender_name.strip() or entry.sender_username.strip() or entry.author + if label: + lines.append(f"{label}: {body}") + continue + lines.append(body) + + return "\n".join(lines).strip() + + +def parse_timestamp(value: Any) -> int | None: + """Parse event timestamp to epoch milliseconds.""" + if not isinstance(value, str) or not value.strip(): + return None + try: + return int(datetime.fromisoformat(value.replace("Z", "+00:00")).timestamp() * 1000) + except ValueError: + return None + + +class MoltchatChannel(BaseChannel): + """Moltchat channel using socket.io with fallback polling workers.""" + + name = "moltchat" + + def __init__(self, config: MoltchatConfig, bus: MessageBus): + super().__init__(config, bus) + self.config: MoltchatConfig = config + self._http: httpx.AsyncClient | None = None + self._socket: Any = None + self._ws_connected = False + self._ws_ready = False + + self._state_dir = get_data_path() / "moltchat" + self._cursor_path = self._state_dir / "session_cursors.json" + self._session_cursor: dict[str, int] = {} + self._cursor_save_task: asyncio.Task | None = None + + self._session_set: set[str] = set() + self._panel_set: set[str] = set() + self._auto_discover_sessions = False + self._auto_discover_panels = False + + self._cold_sessions: set[str] = set() + self._session_by_converse: dict[str, str] = {} + + self._seen_set: dict[str, set[str]] = {} + self._seen_queue: dict[str, deque[str]] = {} + + self._delay_states: dict[str, DelayState] = {} + + self._fallback_mode = False + self._session_fallback_tasks: dict[str, asyncio.Task] = {} + self._panel_fallback_tasks: dict[str, asyncio.Task] = {} + self._refresh_task: asyncio.Task | None = None + + self._target_locks: dict[str, asyncio.Lock] = {} + + async def start(self) -> None: + """Start Moltchat channel workers and websocket connection.""" + if not self.config.claw_token: + logger.error("Moltchat claw_token not configured") + return + + self._running = True + self._http = httpx.AsyncClient(timeout=30.0) + + self._state_dir.mkdir(parents=True, exist_ok=True) + await self._load_session_cursors() + self._seed_targets_from_config() + + await self._refresh_targets(subscribe_new=False) + + websocket_started = await self._start_socket_client() + if not websocket_started: + await self._ensure_fallback_workers() + + self._refresh_task = asyncio.create_task(self._refresh_loop()) + + while self._running: + await asyncio.sleep(1) + + async def stop(self) -> None: + """Stop all workers and clean up resources.""" + self._running = False + + if self._refresh_task: + self._refresh_task.cancel() + self._refresh_task = None + + await self._stop_fallback_workers() + await self._cancel_delay_timers() + + if self._socket: + try: + await self._socket.disconnect() + except Exception: + pass + self._socket = None + + if self._cursor_save_task: + self._cursor_save_task.cancel() + self._cursor_save_task = None + + await self._save_session_cursors() + + if self._http: + await self._http.aclose() + self._http = None + + self._ws_connected = False + self._ws_ready = False + + async def send(self, msg: OutboundMessage) -> None: + """Send outbound message to session or panel.""" + if not self.config.claw_token: + logger.warning("Moltchat claw_token missing, skip send") + return + + content_parts = [msg.content.strip()] if msg.content and msg.content.strip() else [] + if msg.media: + content_parts.extend([m for m in msg.media if isinstance(m, str) and m.strip()]) + content = "\n".join(content_parts).strip() + if not content: + return + + target = resolve_moltchat_target(msg.chat_id) + if not target.id: + logger.warning("Moltchat outbound target is empty") + return + + is_panel = target.is_panel or target.id in self._panel_set + if target.id.startswith("session_"): + is_panel = False + + try: + if is_panel: + await self._send_panel_message( + panel_id=target.id, + content=content, + reply_to=msg.reply_to, + group_id=self._read_group_id(msg.metadata), + ) + else: + await self._send_session_message( + session_id=target.id, + content=content, + reply_to=msg.reply_to, + ) + except Exception as e: + logger.error(f"Failed to send Moltchat message: {e}") + + def _seed_targets_from_config(self) -> None: + sessions, self._auto_discover_sessions = self._normalize_id_list(self.config.sessions) + panels, self._auto_discover_panels = self._normalize_id_list(self.config.panels) + + self._session_set.update(sessions) + self._panel_set.update(panels) + + for session_id in sessions: + if session_id not in self._session_cursor: + self._cold_sessions.add(session_id) + + def _normalize_id_list(self, values: list[str]) -> tuple[list[str], bool]: + cleaned = [str(v).strip() for v in values if str(v).strip()] + has_wildcard = "*" in cleaned + ids = sorted({v for v in cleaned if v != "*"}) + return ids, has_wildcard + + async def _start_socket_client(self) -> bool: + if not SOCKETIO_AVAILABLE: + logger.warning("python-socketio not installed, Moltchat using polling fallback") + return False + + serializer = "default" + if not self.config.socket_disable_msgpack: + if MSGPACK_AVAILABLE: + serializer = "msgpack" + else: + logger.warning( + "msgpack is not installed but socket_disable_msgpack=false; " + "trying JSON serializer" + ) + + reconnect_attempts = None + if self.config.max_retry_attempts > 0: + reconnect_attempts = self.config.max_retry_attempts + + client = socketio.AsyncClient( + reconnection=True, + reconnection_attempts=reconnect_attempts, + reconnection_delay=max(0.1, self.config.socket_reconnect_delay_ms / 1000.0), + reconnection_delay_max=max( + 0.1, + self.config.socket_max_reconnect_delay_ms / 1000.0, + ), + logger=False, + engineio_logger=False, + serializer=serializer, + ) + + @client.event + async def connect() -> None: + self._ws_connected = True + self._ws_ready = False + logger.info("Moltchat websocket connected") + + subscribed = await self._subscribe_all() + self._ws_ready = subscribed + if subscribed: + await self._stop_fallback_workers() + else: + await self._ensure_fallback_workers() + + @client.event + async def disconnect() -> None: + if not self._running: + return + self._ws_connected = False + self._ws_ready = False + logger.warning("Moltchat websocket disconnected") + await self._ensure_fallback_workers() + + @client.event + async def connect_error(data: Any) -> None: + message = str(data) + logger.error(f"Moltchat websocket connect error: {message}") + + @client.on("claw.session.events") + async def on_session_events(payload: dict[str, Any]) -> None: + await self._handle_watch_payload(payload, target_kind="session") + + @client.on("claw.panel.events") + async def on_panel_events(payload: dict[str, Any]) -> None: + await self._handle_watch_payload(payload, target_kind="panel") + + for event_name in ( + "notify:chat.inbox.append", + "notify:chat.message.add", + "notify:chat.message.update", + "notify:chat.message.recall", + "notify:chat.message.delete", + ): + client.on(event_name, self._build_notify_handler(event_name)) + + socket_url = (self.config.socket_url or self.config.base_url).strip().rstrip("/") + socket_path = (self.config.socket_path or "/socket.io").strip() + if socket_path.startswith("/"): + socket_path = socket_path[1:] + + try: + self._socket = client + await client.connect( + socket_url, + transports=["websocket"], + socketio_path=socket_path, + auth={"token": self.config.claw_token}, + wait_timeout=max(1.0, self.config.socket_connect_timeout_ms / 1000.0), + ) + return True + except Exception as e: + logger.error(f"Failed to connect Moltchat websocket: {e}") + try: + await client.disconnect() + except Exception: + pass + self._socket = None + return False + + def _build_notify_handler(self, event_name: str): + async def handler(payload: Any) -> None: + if event_name == "notify:chat.inbox.append": + await self._handle_notify_inbox_append(payload) + return + + if event_name.startswith("notify:chat.message."): + await self._handle_notify_chat_message(payload) + + return handler + + async def _subscribe_all(self) -> bool: + sessions_ok = await self._subscribe_sessions(sorted(self._session_set)) + panels_ok = await self._subscribe_panels(sorted(self._panel_set)) + + if self._auto_discover_sessions or self._auto_discover_panels: + await self._refresh_targets(subscribe_new=True) + + return sessions_ok and panels_ok + + async def _subscribe_sessions(self, session_ids: list[str]) -> bool: + if not session_ids: + return True + + for session_id in session_ids: + if session_id not in self._session_cursor: + self._cold_sessions.add(session_id) + + ack = await self._socket_call( + "com.claw.im.subscribeSessions", + { + "sessionIds": session_ids, + "cursors": self._session_cursor, + "limit": self.config.watch_limit, + }, + ) + if not ack.get("result"): + logger.error(f"Moltchat subscribeSessions failed: {ack.get('message', 'unknown error')}") + return False + + data = ack.get("data") + items: list[dict[str, Any]] = [] + if isinstance(data, list): + items = [item for item in data if isinstance(item, dict)] + elif isinstance(data, dict): + sessions = data.get("sessions") + if isinstance(sessions, list): + items = [item for item in sessions if isinstance(item, dict)] + elif "sessionId" in data: + items = [data] + + for payload in items: + await self._handle_watch_payload(payload, target_kind="session") + + return True + + async def _subscribe_panels(self, panel_ids: list[str]) -> bool: + if not self._auto_discover_panels and not panel_ids: + return True + + ack = await self._socket_call( + "com.claw.im.subscribePanels", + { + "panelIds": panel_ids, + }, + ) + if not ack.get("result"): + logger.error(f"Moltchat subscribePanels failed: {ack.get('message', 'unknown error')}") + return False + + return True + + async def _socket_call(self, event_name: str, payload: dict[str, Any]) -> dict[str, Any]: + if not self._socket: + return {"result": False, "message": "socket not connected"} + + try: + raw = await self._socket.call(event_name, payload, timeout=10) + except Exception as e: + return {"result": False, "message": str(e)} + + if isinstance(raw, dict): + return raw + + return {"result": True, "data": raw} + + async def _refresh_loop(self) -> None: + interval_s = max(1.0, self.config.refresh_interval_ms / 1000.0) + + while self._running: + await asyncio.sleep(interval_s) + + try: + await self._refresh_targets(subscribe_new=self._ws_ready) + except Exception as e: + logger.warning(f"Moltchat refresh failed: {e}") + + if self._fallback_mode: + await self._ensure_fallback_workers() + + async def _refresh_targets(self, subscribe_new: bool) -> None: + if self._auto_discover_sessions: + await self._refresh_sessions_directory(subscribe_new=subscribe_new) + + if self._auto_discover_panels: + await self._refresh_panels(subscribe_new=subscribe_new) + + async def _refresh_sessions_directory(self, subscribe_new: bool) -> None: + try: + response = await self._list_sessions() + except Exception as e: + logger.warning(f"Moltchat listSessions failed: {e}") + return + + sessions = response.get("sessions") + if not isinstance(sessions, list): + return + + new_sessions: list[str] = [] + for session in sessions: + if not isinstance(session, dict): + continue + + session_id = str(session.get("sessionId") or "").strip() + if not session_id: + continue + + if session_id not in self._session_set: + self._session_set.add(session_id) + new_sessions.append(session_id) + if session_id not in self._session_cursor: + self._cold_sessions.add(session_id) + + converse_id = str(session.get("converseId") or "").strip() + if converse_id: + self._session_by_converse[converse_id] = session_id + + if not new_sessions: + return + + if self._ws_ready and subscribe_new: + await self._subscribe_sessions(new_sessions) + + if self._fallback_mode: + await self._ensure_fallback_workers() + + async def _refresh_panels(self, subscribe_new: bool) -> None: + try: + response = await self._get_workspace_group() + except Exception as e: + logger.warning(f"Moltchat getWorkspaceGroup failed: {e}") + return + + raw_panels = response.get("panels") + if not isinstance(raw_panels, list): + return + + new_panels: list[str] = [] + for panel in raw_panels: + if not isinstance(panel, dict): + continue + + panel_type = panel.get("type") + if isinstance(panel_type, int) and panel_type != 0: + continue + + panel_id = str(panel.get("id") or panel.get("_id") or "").strip() + if not panel_id: + continue + + if panel_id not in self._panel_set: + self._panel_set.add(panel_id) + new_panels.append(panel_id) + + if not new_panels: + return + + if self._ws_ready and subscribe_new: + await self._subscribe_panels(new_panels) + + if self._fallback_mode: + await self._ensure_fallback_workers() + + async def _ensure_fallback_workers(self) -> None: + if not self._running: + return + + self._fallback_mode = True + + for session_id in sorted(self._session_set): + task = self._session_fallback_tasks.get(session_id) + if task and not task.done(): + continue + self._session_fallback_tasks[session_id] = asyncio.create_task( + self._session_watch_worker(session_id) + ) + + for panel_id in sorted(self._panel_set): + task = self._panel_fallback_tasks.get(panel_id) + if task and not task.done(): + continue + self._panel_fallback_tasks[panel_id] = asyncio.create_task( + self._panel_poll_worker(panel_id) + ) + + async def _stop_fallback_workers(self) -> None: + self._fallback_mode = False + + tasks = [ + *self._session_fallback_tasks.values(), + *self._panel_fallback_tasks.values(), + ] + for task in tasks: + task.cancel() + + if tasks: + await asyncio.gather(*tasks, return_exceptions=True) + + self._session_fallback_tasks.clear() + self._panel_fallback_tasks.clear() + + async def _session_watch_worker(self, session_id: str) -> None: + while self._running and self._fallback_mode: + try: + payload = await self._watch_session( + session_id=session_id, + cursor=self._session_cursor.get(session_id, 0), + timeout_ms=self.config.watch_timeout_ms, + limit=self.config.watch_limit, + ) + await self._handle_watch_payload(payload, target_kind="session") + except asyncio.CancelledError: + break + except Exception as e: + logger.warning(f"Moltchat watch fallback error ({session_id}): {e}") + await asyncio.sleep(max(0.1, self.config.retry_delay_ms / 1000.0)) + + async def _panel_poll_worker(self, panel_id: str) -> None: + sleep_s = max(1.0, self.config.refresh_interval_ms / 1000.0) + + while self._running and self._fallback_mode: + try: + response = await self._list_panel_messages( + panel_id=panel_id, + limit=min(100, max(1, self.config.watch_limit)), + ) + + raw_messages = response.get("messages") + if isinstance(raw_messages, list): + for message in reversed(raw_messages): + if not isinstance(message, dict): + continue + + synthetic_event = { + "type": "message.add", + "timestamp": message.get("createdAt") or datetime.utcnow().isoformat(), + "payload": { + "messageId": str(message.get("messageId") or ""), + "author": str(message.get("author") or ""), + "authorInfo": message.get("authorInfo") if isinstance(message.get("authorInfo"), dict) else {}, + "content": message.get("content"), + "meta": message.get("meta") if isinstance(message.get("meta"), dict) else {}, + "groupId": str(response.get("groupId") or ""), + "converseId": panel_id, + }, + } + await self._process_inbound_event( + target_id=panel_id, + event=synthetic_event, + target_kind="panel", + ) + except asyncio.CancelledError: + break + except Exception as e: + logger.warning(f"Moltchat panel polling error ({panel_id}): {e}") + + await asyncio.sleep(sleep_s) + + async def _handle_watch_payload( + self, + payload: dict[str, Any], + target_kind: str, + ) -> None: + if not isinstance(payload, dict): + return + + target_id = str(payload.get("sessionId") or "").strip() + if not target_id: + return + + lock = self._target_locks.setdefault(f"{target_kind}:{target_id}", asyncio.Lock()) + async with lock: + previous_cursor = self._session_cursor.get(target_id, 0) if target_kind == "session" else 0 + payload_cursor = payload.get("cursor") + if ( + target_kind == "session" + and isinstance(payload_cursor, int) + and payload_cursor >= 0 + ): + self._mark_session_cursor(target_id, payload_cursor) + + raw_events = payload.get("events") + if not isinstance(raw_events, list): + return + + if target_kind == "session" and target_id in self._cold_sessions: + self._cold_sessions.discard(target_id) + return + + for event in raw_events: + if not isinstance(event, dict): + continue + seq = event.get("seq") + if ( + target_kind == "session" + and isinstance(seq, int) + and seq > self._session_cursor.get(target_id, previous_cursor) + ): + self._mark_session_cursor(target_id, seq) + + if event.get("type") != "message.add": + continue + + await self._process_inbound_event( + target_id=target_id, + event=event, + target_kind=target_kind, + ) + + async def _process_inbound_event( + self, + target_id: str, + event: dict[str, Any], + target_kind: str, + ) -> None: + payload = event.get("payload") + if not isinstance(payload, dict): + return + + author = str(payload.get("author") or "").strip() + if not author: + return + + if self.config.agent_user_id and author == self.config.agent_user_id: + return + + if not self.is_allowed(author): + return + + message_id = str(payload.get("messageId") or "").strip() + seen_key = f"{target_kind}:{target_id}" + if message_id and self._remember_message_id(seen_key, message_id): + return + + raw_body = normalize_moltchat_content(payload.get("content")) + if not raw_body: + raw_body = "[empty message]" + + author_info = payload.get("authorInfo") if isinstance(payload.get("authorInfo"), dict) else {} + sender_name = str(author_info.get("nickname") or author_info.get("email") or "").strip() + sender_username = str(author_info.get("agentId") or "").strip() + + group_id = str(payload.get("groupId") or "").strip() + is_group = bool(group_id) + was_mentioned = resolve_was_mentioned(payload, self.config.agent_user_id) + + require_mention = ( + target_kind == "panel" + and is_group + and resolve_require_mention(self.config, target_id, group_id) + ) + + use_delay = target_kind == "panel" and self.config.reply_delay_mode == "non-mention" + + if require_mention and not was_mentioned and not use_delay: + return + + entry = MoltchatBufferedEntry( + raw_body=raw_body, + author=author, + sender_name=sender_name, + sender_username=sender_username, + timestamp=parse_timestamp(event.get("timestamp")), + message_id=message_id, + group_id=group_id, + ) + + if use_delay: + delay_key = f"{target_kind}:{target_id}" + if was_mentioned: + await self._flush_delayed_entries( + key=delay_key, + target_id=target_id, + target_kind=target_kind, + reason="mention", + entry=entry, + ) + else: + await self._enqueue_delayed_entry( + key=delay_key, + target_id=target_id, + target_kind=target_kind, + entry=entry, + ) + return + + await self._dispatch_entries( + target_id=target_id, + target_kind=target_kind, + entries=[entry], + was_mentioned=was_mentioned, + ) + + def _remember_message_id(self, key: str, message_id: str) -> bool: + seen_set = self._seen_set.setdefault(key, set()) + seen_queue = self._seen_queue.setdefault(key, deque()) + + if message_id in seen_set: + return True + + seen_set.add(message_id) + seen_queue.append(message_id) + + while len(seen_queue) > MAX_SEEN_MESSAGE_IDS: + removed = seen_queue.popleft() + seen_set.discard(removed) + + return False + + async def _enqueue_delayed_entry( + self, + key: str, + target_id: str, + target_kind: str, + entry: MoltchatBufferedEntry, + ) -> None: + state = self._delay_states.setdefault(key, DelayState()) + + async with state.lock: + state.entries.append(entry) + if state.timer: + state.timer.cancel() + + state.timer = asyncio.create_task( + self._delay_flush_after(key, target_id, target_kind) + ) + + async def _delay_flush_after(self, key: str, target_id: str, target_kind: str) -> None: + await asyncio.sleep(max(0, self.config.reply_delay_ms) / 1000.0) + await self._flush_delayed_entries( + key=key, + target_id=target_id, + target_kind=target_kind, + reason="timer", + entry=None, + ) + + async def _flush_delayed_entries( + self, + key: str, + target_id: str, + target_kind: str, + reason: str, + entry: MoltchatBufferedEntry | None, + ) -> None: + state = self._delay_states.setdefault(key, DelayState()) + + async with state.lock: + if entry: + state.entries.append(entry) + + current = asyncio.current_task() + if state.timer and state.timer is not current: + state.timer.cancel() + state.timer = None + elif state.timer is current: + state.timer = None + + entries = state.entries[:] + state.entries.clear() + + if not entries: + return + + await self._dispatch_entries( + target_id=target_id, + target_kind=target_kind, + entries=entries, + was_mentioned=(reason == "mention"), + ) + + async def _dispatch_entries( + self, + target_id: str, + target_kind: str, + entries: list[MoltchatBufferedEntry], + was_mentioned: bool, + ) -> None: + if not entries: + return + + is_group = bool(entries[-1].group_id) + body = build_buffered_body(entries, is_group) + if not body: + body = "[empty message]" + + last = entries[-1] + metadata = { + "message_id": last.message_id, + "timestamp": last.timestamp, + "is_group": is_group, + "group_id": last.group_id, + "sender_name": last.sender_name, + "sender_username": last.sender_username, + "target_kind": target_kind, + "was_mentioned": was_mentioned, + "buffered_count": len(entries), + } + + await self._handle_message( + sender_id=last.author, + chat_id=target_id, + content=body, + metadata=metadata, + ) + + async def _cancel_delay_timers(self) -> None: + for state in self._delay_states.values(): + if state.timer: + state.timer.cancel() + state.timer = None + self._delay_states.clear() + + async def _handle_notify_chat_message(self, payload: Any) -> None: + if not isinstance(payload, dict): + return + + group_id = str(payload.get("groupId") or "").strip() + panel_id = str(payload.get("converseId") or payload.get("panelId") or "").strip() + if not group_id or not panel_id: + return + + if self._panel_set and panel_id not in self._panel_set: + return + + synthetic_event = { + "type": "message.add", + "timestamp": payload.get("createdAt") or datetime.utcnow().isoformat(), + "payload": { + "messageId": str(payload.get("_id") or payload.get("messageId") or ""), + "author": str(payload.get("author") or ""), + "authorInfo": payload.get("authorInfo") if isinstance(payload.get("authorInfo"), dict) else {}, + "content": payload.get("content"), + "meta": payload.get("meta") if isinstance(payload.get("meta"), dict) else {}, + "groupId": group_id, + "converseId": panel_id, + }, + } + await self._process_inbound_event( + target_id=panel_id, + event=synthetic_event, + target_kind="panel", + ) + + async def _handle_notify_inbox_append(self, payload: Any) -> None: + if not isinstance(payload, dict): + return + + if payload.get("type") != "message": + return + + detail = payload.get("payload") + if not isinstance(detail, dict): + return + + group_id = str(detail.get("groupId") or "").strip() + if group_id: + return + + converse_id = str(detail.get("converseId") or "").strip() + if not converse_id: + return + + session_id = self._session_by_converse.get(converse_id) + if not session_id: + await self._refresh_sessions_directory(subscribe_new=self._ws_ready) + session_id = self._session_by_converse.get(converse_id) + if not session_id: + return + + message_id = str(detail.get("messageId") or payload.get("_id") or "").strip() + author = str(detail.get("messageAuthor") or "").strip() + content = str(detail.get("messagePlainContent") or detail.get("messageSnippet") or "").strip() + + synthetic_event = { + "type": "message.add", + "timestamp": payload.get("createdAt") or datetime.utcnow().isoformat(), + "payload": { + "messageId": message_id, + "author": author, + "content": content, + "meta": { + "source": "notify:chat.inbox.append", + "converseId": converse_id, + }, + "converseId": converse_id, + }, + } + + await self._process_inbound_event( + target_id=session_id, + event=synthetic_event, + target_kind="session", + ) + + def _mark_session_cursor(self, session_id: str, cursor: int) -> None: + if cursor < 0: + return + + previous = self._session_cursor.get(session_id, 0) + if cursor < previous: + return + + self._session_cursor[session_id] = cursor + self._schedule_cursor_save() + + def _schedule_cursor_save(self) -> None: + if self._cursor_save_task and not self._cursor_save_task.done(): + return + + self._cursor_save_task = asyncio.create_task(self._save_cursor_debounced()) + + async def _save_cursor_debounced(self) -> None: + await asyncio.sleep(CURSOR_SAVE_DEBOUNCE_S) + await self._save_session_cursors() + + async def _load_session_cursors(self) -> None: + if not self._cursor_path.exists(): + return + + try: + data = json.loads(self._cursor_path.read_text("utf-8")) + except Exception as e: + logger.warning(f"Failed to read Moltchat cursor file: {e}") + return + + cursors = data.get("cursors") if isinstance(data, dict) else None + if not isinstance(cursors, dict): + return + + for session_id, cursor in cursors.items(): + if isinstance(session_id, str) and isinstance(cursor, int) and cursor >= 0: + self._session_cursor[session_id] = cursor + + async def _save_session_cursors(self) -> None: + payload = { + "schemaVersion": 1, + "updatedAt": datetime.utcnow().isoformat(), + "cursors": self._session_cursor, + } + + try: + self._state_dir.mkdir(parents=True, exist_ok=True) + self._cursor_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2) + "\n", "utf-8") + except Exception as e: + logger.warning(f"Failed to save Moltchat cursor file: {e}") + + def _base_url(self) -> str: + return self.config.base_url.strip().rstrip("/") + + async def _post_json(self, path: str, payload: dict[str, Any]) -> dict[str, Any]: + if not self._http: + raise RuntimeError("Moltchat HTTP client not initialized") + + url = f"{self._base_url()}{path}" + response = await self._http.post( + url, + headers={ + "Content-Type": "application/json", + "X-Claw-Token": self.config.claw_token, + }, + json=payload, + ) + + text = response.text + if not response.is_success: + raise RuntimeError(f"Moltchat HTTP {response.status_code}: {text[:200]}") + + parsed: Any + try: + parsed = response.json() + except Exception: + parsed = text + + if isinstance(parsed, dict) and isinstance(parsed.get("code"), int): + if parsed["code"] != 200: + message = str(parsed.get("message") or parsed.get("name") or "request failed") + raise RuntimeError(f"Moltchat API error: {message} (code={parsed['code']})") + data = parsed.get("data") + return data if isinstance(data, dict) else {} + + if isinstance(parsed, dict): + return parsed + + return {} + + async def _watch_session( + self, + session_id: str, + cursor: int, + timeout_ms: int, + limit: int, + ) -> dict[str, Any]: + return await self._post_json( + "/api/claw/sessions/watch", + { + "sessionId": session_id, + "cursor": cursor, + "timeoutMs": timeout_ms, + "limit": limit, + }, + ) + + async def _send_session_message( + self, + session_id: str, + content: str, + reply_to: str | None, + ) -> dict[str, Any]: + payload = { + "sessionId": session_id, + "content": content, + } + if reply_to: + payload["replyTo"] = reply_to + return await self._post_json("/api/claw/sessions/send", payload) + + async def _send_panel_message( + self, + panel_id: str, + content: str, + reply_to: str | None, + group_id: str | None, + ) -> dict[str, Any]: + payload = { + "panelId": panel_id, + "content": content, + } + if reply_to: + payload["replyTo"] = reply_to + if group_id: + payload["groupId"] = group_id + return await self._post_json("/api/claw/groups/panels/send", payload) + + async def _list_sessions(self) -> dict[str, Any]: + return await self._post_json("/api/claw/sessions/list", {}) + + async def _get_workspace_group(self) -> dict[str, Any]: + return await self._post_json("/api/claw/groups/get", {}) + + async def _list_panel_messages(self, panel_id: str, limit: int) -> dict[str, Any]: + return await self._post_json( + "/api/claw/groups/panels/messages", + { + "panelId": panel_id, + "limit": limit, + }, + ) + + def _read_group_id(self, metadata: dict[str, Any]) -> str | None: + if not isinstance(metadata, dict): + return None + value = metadata.get("group_id") or metadata.get("groupId") + if isinstance(value, str) and value.strip(): + return value.strip() + return None diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py index 19e62e9..2039f82 100644 --- a/nanobot/cli/commands.py +++ b/nanobot/cli/commands.py @@ -366,6 +366,24 @@ def channels_status(): "✓" if dc.enabled else "✗", dc.gateway_url ) + + # Feishu + fs = config.channels.feishu + fs_config = f"app_id: {fs.app_id[:10]}..." if fs.app_id else "[dim]not configured[/dim]" + table.add_row( + "Feishu", + "✓" if fs.enabled else "✗", + fs_config + ) + + # Moltchat + mc = config.channels.moltchat + mc_base = mc.base_url or "[dim]not configured[/dim]" + table.add_row( + "Moltchat", + "✓" if mc.enabled else "✗", + mc_base + ) # Telegram tg = config.channels.telegram diff --git a/nanobot/config/schema.py b/nanobot/config/schema.py index 7724288..4df4251 100644 --- a/nanobot/config/schema.py +++ b/nanobot/config/schema.py @@ -39,12 +39,49 @@ class DiscordConfig(BaseModel): intents: int = 37377 # GUILDS + GUILD_MESSAGES + DIRECT_MESSAGES + MESSAGE_CONTENT +class MoltchatMentionConfig(BaseModel): + """Moltchat mention behavior configuration.""" + require_in_groups: bool = False + + +class MoltchatGroupRule(BaseModel): + """Moltchat per-group mention requirement.""" + require_mention: bool = False + + +class MoltchatConfig(BaseModel): + """Moltchat channel configuration.""" + enabled: bool = False + base_url: str = "http://localhost:11000" + socket_url: str = "" + socket_path: str = "/socket.io" + socket_disable_msgpack: bool = False + socket_reconnect_delay_ms: int = 1000 + socket_max_reconnect_delay_ms: int = 10000 + socket_connect_timeout_ms: int = 10000 + refresh_interval_ms: int = 30000 + watch_timeout_ms: int = 25000 + watch_limit: int = 100 + retry_delay_ms: int = 500 + max_retry_attempts: int = 0 # 0 means unlimited retries + claw_token: str = "" + agent_user_id: str = "" + sessions: list[str] = Field(default_factory=list) + panels: list[str] = Field(default_factory=list) + allow_from: list[str] = Field(default_factory=list) + mention: MoltchatMentionConfig = Field(default_factory=MoltchatMentionConfig) + groups: dict[str, MoltchatGroupRule] = Field(default_factory=dict) + reply_delay_mode: str = "non-mention" # off | non-mention + reply_delay_ms: int = 120000 + + class ChannelsConfig(BaseModel): """Configuration for chat channels.""" whatsapp: WhatsAppConfig = Field(default_factory=WhatsAppConfig) telegram: TelegramConfig = Field(default_factory=TelegramConfig) discord: DiscordConfig = Field(default_factory=DiscordConfig) feishu: FeishuConfig = Field(default_factory=FeishuConfig) + moltchat: MoltchatConfig = Field(default_factory=MoltchatConfig) class AgentDefaults(BaseModel): diff --git a/pyproject.toml b/pyproject.toml index 2a952a1..81d38b7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,6 +30,8 @@ dependencies = [ "croniter>=2.0.0", "python-telegram-bot>=21.0", "lark-oapi>=1.0.0", + "python-socketio>=5.11.0", + "msgpack>=1.0.8", ] [project.optional-dependencies] diff --git a/tests/test_moltchat_channel.py b/tests/test_moltchat_channel.py new file mode 100644 index 0000000..1f65a68 --- /dev/null +++ b/tests/test_moltchat_channel.py @@ -0,0 +1,115 @@ +import pytest + +from nanobot.bus.queue import MessageBus +from nanobot.channels.moltchat import ( + MoltchatBufferedEntry, + MoltchatChannel, + build_buffered_body, + resolve_moltchat_target, + resolve_require_mention, + resolve_was_mentioned, +) +from nanobot.config.schema import MoltchatConfig, MoltchatGroupRule, MoltchatMentionConfig + + +def test_resolve_moltchat_target_prefixes() -> None: + t = resolve_moltchat_target("panel:abc") + assert t.id == "abc" + assert t.is_panel is True + + t = resolve_moltchat_target("session_123") + assert t.id == "session_123" + assert t.is_panel is False + + t = resolve_moltchat_target("mochat:session_456") + assert t.id == "session_456" + assert t.is_panel is False + + +def test_resolve_was_mentioned_from_meta_and_text() -> None: + payload = { + "content": "hello", + "meta": { + "mentionIds": ["bot-1"], + }, + } + assert resolve_was_mentioned(payload, "bot-1") is True + + payload = {"content": "ping <@bot-2>", "meta": {}} + assert resolve_was_mentioned(payload, "bot-2") is True + + +def test_resolve_require_mention_priority() -> None: + cfg = MoltchatConfig( + groups={ + "*": MoltchatGroupRule(require_mention=False), + "group-a": MoltchatGroupRule(require_mention=True), + }, + mention=MoltchatMentionConfig(require_in_groups=False), + ) + + assert resolve_require_mention(cfg, session_id="panel-x", group_id="group-a") is True + assert resolve_require_mention(cfg, session_id="panel-x", group_id="group-b") is False + + +@pytest.mark.asyncio +async def test_delay_buffer_flushes_on_mention() -> None: + bus = MessageBus() + cfg = MoltchatConfig( + enabled=True, + claw_token="token", + agent_user_id="bot", + reply_delay_mode="non-mention", + reply_delay_ms=60_000, + ) + channel = MoltchatChannel(cfg, bus) + + first = { + "type": "message.add", + "timestamp": "2026-02-07T10:00:00Z", + "payload": { + "messageId": "m1", + "author": "user1", + "content": "first", + "groupId": "group-1", + "meta": {}, + }, + } + second = { + "type": "message.add", + "timestamp": "2026-02-07T10:00:01Z", + "payload": { + "messageId": "m2", + "author": "user2", + "content": "hello <@bot>", + "groupId": "group-1", + "meta": {}, + }, + } + + await channel._process_inbound_event(target_id="panel-1", event=first, target_kind="panel") + assert bus.inbound_size == 0 + + await channel._process_inbound_event(target_id="panel-1", event=second, target_kind="panel") + assert bus.inbound_size == 1 + + msg = await bus.consume_inbound() + assert msg.channel == "moltchat" + assert msg.chat_id == "panel-1" + assert "user1: first" in msg.content + assert "user2: hello <@bot>" in msg.content + assert msg.metadata.get("buffered_count") == 2 + + await channel._cancel_delay_timers() + + +def test_build_buffered_body_group_labels() -> None: + body = build_buffered_body( + entries=[ + MoltchatBufferedEntry(raw_body="a", author="u1", sender_name="Alice"), + MoltchatBufferedEntry(raw_body="b", author="u2", sender_username="bot"), + ], + is_group=True, + ) + assert "Alice: a" in body + assert "bot: b" in body From 377922591788ae7c1fb84e83b9ba1a3e29fd893f Mon Sep 17 00:00:00 2001 From: tjb-tech Date: Mon, 9 Feb 2026 08:50:17 +0000 Subject: [PATCH 2/4] refactor(channels): rename moltchat integration to mochat --- README.md | 12 +-- nanobot/channels/__init__.py | 4 +- nanobot/channels/manager.py | 14 +-- nanobot/channels/{moltchat.py => mochat.py} | 94 +++++++++---------- nanobot/cli/commands.py | 6 +- nanobot/config/schema.py | 18 ++-- ...chat_channel.py => test_mochat_channel.py} | 36 +++---- 7 files changed, 92 insertions(+), 92 deletions(-) rename nanobot/channels/{moltchat.py => mochat.py} (92%) rename tests/{test_moltchat_channel.py => test_mochat_channel.py} (73%) diff --git a/README.md b/README.md index 74c24d9..55dc7fa 100644 --- a/README.md +++ b/README.md @@ -164,7 +164,7 @@ nanobot agent -m "Hello from my local LLM!" ## 💬 Chat Apps -Talk to your nanobot through Telegram, Discord, WhatsApp, Feishu, or Moltchat — anytime, anywhere. +Talk to your nanobot through Telegram, Discord, WhatsApp, Feishu, or Mochat — anytime, anywhere. | Channel | Setup | |---------|-------| @@ -172,7 +172,7 @@ Talk to your nanobot through Telegram, Discord, WhatsApp, Feishu, or Moltchat | **Discord** | Easy (bot token + intents) | | **WhatsApp** | Medium (scan QR) | | **Feishu** | Medium (app credentials) | -| **Moltchat** | Medium (claw token + websocket) | +| **Mochat** | Medium (claw token + websocket) |
Telegram (Recommended) @@ -207,7 +207,7 @@ nanobot gateway
-Moltchat (Claw IM) +Mochat (Claw IM) Uses **Socket.IO WebSocket** by default, with HTTP polling fallback. @@ -221,7 +221,7 @@ Uses **Socket.IO WebSocket** by default, with HTTP polling fallback. ```json { "channels": { - "moltchat": { + "mochat": { "enabled": true, "baseUrl": "https://mochat.io", "socketUrl": "https://mochat.io", @@ -244,7 +244,7 @@ nanobot gateway ``` > [!TIP] -> Keep `clawToken` private. It should only be sent in `X-Claw-Token` header to your Moltchat API endpoint. +> Keep `clawToken` private. It should only be sent in `X-Claw-Token` header to your Mochat API endpoint.
@@ -456,7 +456,7 @@ docker run -v ~/.nanobot:/root/.nanobot --rm nanobot onboard # Edit config on host to add API keys vim ~/.nanobot/config.json -# Run gateway (connects to enabled channels, e.g. Telegram/Discord/Moltchat) +# Run gateway (connects to enabled channels, e.g. Telegram/Discord/Mochat) docker run -v ~/.nanobot:/root/.nanobot -p 18790:18790 nanobot gateway # Or run a single command diff --git a/nanobot/channels/__init__.py b/nanobot/channels/__init__.py index 4d77063..034d401 100644 --- a/nanobot/channels/__init__.py +++ b/nanobot/channels/__init__.py @@ -2,6 +2,6 @@ from nanobot.channels.base import BaseChannel from nanobot.channels.manager import ChannelManager -from nanobot.channels.moltchat import MoltchatChannel +from nanobot.channels.mochat import MochatChannel -__all__ = ["BaseChannel", "ChannelManager", "MoltchatChannel"] +__all__ = ["BaseChannel", "ChannelManager", "MochatChannel"] diff --git a/nanobot/channels/manager.py b/nanobot/channels/manager.py index 11690ef..64214ce 100644 --- a/nanobot/channels/manager.py +++ b/nanobot/channels/manager.py @@ -78,17 +78,17 @@ class ChannelManager: except ImportError as e: logger.warning(f"Feishu channel not available: {e}") - # Moltchat channel - if self.config.channels.moltchat.enabled: + # Mochat channel + if self.config.channels.mochat.enabled: try: - from nanobot.channels.moltchat import MoltchatChannel + from nanobot.channels.mochat import MochatChannel - self.channels["moltchat"] = MoltchatChannel( - self.config.channels.moltchat, self.bus + self.channels["mochat"] = MochatChannel( + self.config.channels.mochat, self.bus ) - logger.info("Moltchat channel enabled") + logger.info("Mochat channel enabled") except ImportError as e: - logger.warning(f"Moltchat channel not available: {e}") + logger.warning(f"Mochat channel not available: {e}") async def start_all(self) -> None: """Start WhatsApp channel and the outbound dispatcher.""" diff --git a/nanobot/channels/moltchat.py b/nanobot/channels/mochat.py similarity index 92% rename from nanobot/channels/moltchat.py rename to nanobot/channels/mochat.py index cc590d4..6569cdd 100644 --- a/nanobot/channels/moltchat.py +++ b/nanobot/channels/mochat.py @@ -1,4 +1,4 @@ -"""Moltchat channel implementation using Socket.IO with HTTP polling fallback.""" +"""Mochat channel implementation using Socket.IO with HTTP polling fallback.""" from __future__ import annotations @@ -15,7 +15,7 @@ from loguru import logger from nanobot.bus.events import OutboundMessage from nanobot.bus.queue import MessageBus from nanobot.channels.base import BaseChannel -from nanobot.config.schema import MoltchatConfig +from nanobot.config.schema import MochatConfig from nanobot.utils.helpers import get_data_path try: @@ -39,7 +39,7 @@ CURSOR_SAVE_DEBOUNCE_S = 0.5 @dataclass -class MoltchatBufferedEntry: +class MochatBufferedEntry: """Buffered inbound entry for delayed dispatch.""" raw_body: str @@ -55,20 +55,20 @@ class MoltchatBufferedEntry: class DelayState: """Per-target delayed message state.""" - entries: list[MoltchatBufferedEntry] = field(default_factory=list) + entries: list[MochatBufferedEntry] = field(default_factory=list) lock: asyncio.Lock = field(default_factory=asyncio.Lock) timer: asyncio.Task | None = None @dataclass -class MoltchatTarget: +class MochatTarget: """Outbound target resolution result.""" id: str is_panel: bool -def normalize_moltchat_content(content: Any) -> str: +def normalize_mochat_content(content: Any) -> str: """Normalize content payload to text.""" if isinstance(content, str): return content.strip() @@ -80,17 +80,17 @@ def normalize_moltchat_content(content: Any) -> str: return str(content) -def resolve_moltchat_target(raw: str) -> MoltchatTarget: +def resolve_mochat_target(raw: str) -> MochatTarget: """Resolve id and target kind from user-provided target string.""" trimmed = (raw or "").strip() if not trimmed: - return MoltchatTarget(id="", is_panel=False) + return MochatTarget(id="", is_panel=False) lowered = trimmed.lower() cleaned = trimmed forced_panel = False - prefixes = ["moltchat:", "mochat:", "group:", "channel:", "panel:"] + prefixes = ["mochat:", "group:", "channel:", "panel:"] for prefix in prefixes: if lowered.startswith(prefix): cleaned = trimmed[len(prefix) :].strip() @@ -99,9 +99,9 @@ def resolve_moltchat_target(raw: str) -> MoltchatTarget: break if not cleaned: - return MoltchatTarget(id="", is_panel=False) + return MochatTarget(id="", is_panel=False) - return MoltchatTarget(id=cleaned, is_panel=forced_panel or not cleaned.startswith("session_")) + return MochatTarget(id=cleaned, is_panel=forced_panel or not cleaned.startswith("session_")) def extract_mention_ids(value: Any) -> list[str]: @@ -152,7 +152,7 @@ def resolve_was_mentioned(payload: dict[str, Any], agent_user_id: str) -> bool: def resolve_require_mention( - config: MoltchatConfig, + config: MochatConfig, session_id: str, group_id: str, ) -> bool: @@ -167,7 +167,7 @@ def resolve_require_mention( return bool(config.mention.require_in_groups) -def build_buffered_body(entries: list[MoltchatBufferedEntry], is_group: bool) -> str: +def build_buffered_body(entries: list[MochatBufferedEntry], is_group: bool) -> str: """Build text body from one or more buffered entries.""" if not entries: return "" @@ -200,20 +200,20 @@ def parse_timestamp(value: Any) -> int | None: return None -class MoltchatChannel(BaseChannel): - """Moltchat channel using socket.io with fallback polling workers.""" +class MochatChannel(BaseChannel): + """Mochat channel using socket.io with fallback polling workers.""" - name = "moltchat" + name = "mochat" - def __init__(self, config: MoltchatConfig, bus: MessageBus): + def __init__(self, config: MochatConfig, bus: MessageBus): super().__init__(config, bus) - self.config: MoltchatConfig = config + self.config: MochatConfig = config self._http: httpx.AsyncClient | None = None self._socket: Any = None self._ws_connected = False self._ws_ready = False - self._state_dir = get_data_path() / "moltchat" + self._state_dir = get_data_path() / "mochat" self._cursor_path = self._state_dir / "session_cursors.json" self._session_cursor: dict[str, int] = {} self._cursor_save_task: asyncio.Task | None = None @@ -239,9 +239,9 @@ class MoltchatChannel(BaseChannel): self._target_locks: dict[str, asyncio.Lock] = {} async def start(self) -> None: - """Start Moltchat channel workers and websocket connection.""" + """Start Mochat channel workers and websocket connection.""" if not self.config.claw_token: - logger.error("Moltchat claw_token not configured") + logger.error("Mochat claw_token not configured") return self._running = True @@ -296,7 +296,7 @@ class MoltchatChannel(BaseChannel): async def send(self, msg: OutboundMessage) -> None: """Send outbound message to session or panel.""" if not self.config.claw_token: - logger.warning("Moltchat claw_token missing, skip send") + logger.warning("Mochat claw_token missing, skip send") return content_parts = [msg.content.strip()] if msg.content and msg.content.strip() else [] @@ -306,9 +306,9 @@ class MoltchatChannel(BaseChannel): if not content: return - target = resolve_moltchat_target(msg.chat_id) + target = resolve_mochat_target(msg.chat_id) if not target.id: - logger.warning("Moltchat outbound target is empty") + logger.warning("Mochat outbound target is empty") return is_panel = target.is_panel or target.id in self._panel_set @@ -330,7 +330,7 @@ class MoltchatChannel(BaseChannel): reply_to=msg.reply_to, ) except Exception as e: - logger.error(f"Failed to send Moltchat message: {e}") + logger.error(f"Failed to send Mochat message: {e}") def _seed_targets_from_config(self) -> None: sessions, self._auto_discover_sessions = self._normalize_id_list(self.config.sessions) @@ -351,7 +351,7 @@ class MoltchatChannel(BaseChannel): async def _start_socket_client(self) -> bool: if not SOCKETIO_AVAILABLE: - logger.warning("python-socketio not installed, Moltchat using polling fallback") + logger.warning("python-socketio not installed, Mochat using polling fallback") return False serializer = "default" @@ -385,7 +385,7 @@ class MoltchatChannel(BaseChannel): async def connect() -> None: self._ws_connected = True self._ws_ready = False - logger.info("Moltchat websocket connected") + logger.info("Mochat websocket connected") subscribed = await self._subscribe_all() self._ws_ready = subscribed @@ -400,13 +400,13 @@ class MoltchatChannel(BaseChannel): return self._ws_connected = False self._ws_ready = False - logger.warning("Moltchat websocket disconnected") + logger.warning("Mochat websocket disconnected") await self._ensure_fallback_workers() @client.event async def connect_error(data: Any) -> None: message = str(data) - logger.error(f"Moltchat websocket connect error: {message}") + logger.error(f"Mochat websocket connect error: {message}") @client.on("claw.session.events") async def on_session_events(payload: dict[str, Any]) -> None: @@ -441,7 +441,7 @@ class MoltchatChannel(BaseChannel): ) return True except Exception as e: - logger.error(f"Failed to connect Moltchat websocket: {e}") + logger.error(f"Failed to connect Mochat websocket: {e}") try: await client.disconnect() except Exception: @@ -486,7 +486,7 @@ class MoltchatChannel(BaseChannel): }, ) if not ack.get("result"): - logger.error(f"Moltchat subscribeSessions failed: {ack.get('message', 'unknown error')}") + logger.error(f"Mochat subscribeSessions failed: {ack.get('message', 'unknown error')}") return False data = ack.get("data") @@ -516,7 +516,7 @@ class MoltchatChannel(BaseChannel): }, ) if not ack.get("result"): - logger.error(f"Moltchat subscribePanels failed: {ack.get('message', 'unknown error')}") + logger.error(f"Mochat subscribePanels failed: {ack.get('message', 'unknown error')}") return False return True @@ -544,7 +544,7 @@ class MoltchatChannel(BaseChannel): try: await self._refresh_targets(subscribe_new=self._ws_ready) except Exception as e: - logger.warning(f"Moltchat refresh failed: {e}") + logger.warning(f"Mochat refresh failed: {e}") if self._fallback_mode: await self._ensure_fallback_workers() @@ -560,7 +560,7 @@ class MoltchatChannel(BaseChannel): try: response = await self._list_sessions() except Exception as e: - logger.warning(f"Moltchat listSessions failed: {e}") + logger.warning(f"Mochat listSessions failed: {e}") return sessions = response.get("sessions") @@ -599,7 +599,7 @@ class MoltchatChannel(BaseChannel): try: response = await self._get_workspace_group() except Exception as e: - logger.warning(f"Moltchat getWorkspaceGroup failed: {e}") + logger.warning(f"Mochat getWorkspaceGroup failed: {e}") return raw_panels = response.get("panels") @@ -683,7 +683,7 @@ class MoltchatChannel(BaseChannel): except asyncio.CancelledError: break except Exception as e: - logger.warning(f"Moltchat watch fallback error ({session_id}): {e}") + logger.warning(f"Mochat watch fallback error ({session_id}): {e}") await asyncio.sleep(max(0.1, self.config.retry_delay_ms / 1000.0)) async def _panel_poll_worker(self, panel_id: str) -> None: @@ -723,7 +723,7 @@ class MoltchatChannel(BaseChannel): except asyncio.CancelledError: break except Exception as e: - logger.warning(f"Moltchat panel polling error ({panel_id}): {e}") + logger.warning(f"Mochat panel polling error ({panel_id}): {e}") await asyncio.sleep(sleep_s) @@ -803,7 +803,7 @@ class MoltchatChannel(BaseChannel): if message_id and self._remember_message_id(seen_key, message_id): return - raw_body = normalize_moltchat_content(payload.get("content")) + raw_body = normalize_mochat_content(payload.get("content")) if not raw_body: raw_body = "[empty message]" @@ -826,7 +826,7 @@ class MoltchatChannel(BaseChannel): if require_mention and not was_mentioned and not use_delay: return - entry = MoltchatBufferedEntry( + entry = MochatBufferedEntry( raw_body=raw_body, author=author, sender_name=sender_name, @@ -883,7 +883,7 @@ class MoltchatChannel(BaseChannel): key: str, target_id: str, target_kind: str, - entry: MoltchatBufferedEntry, + entry: MochatBufferedEntry, ) -> None: state = self._delay_states.setdefault(key, DelayState()) @@ -912,7 +912,7 @@ class MoltchatChannel(BaseChannel): target_id: str, target_kind: str, reason: str, - entry: MoltchatBufferedEntry | None, + entry: MochatBufferedEntry | None, ) -> None: state = self._delay_states.setdefault(key, DelayState()) @@ -944,7 +944,7 @@ class MoltchatChannel(BaseChannel): self, target_id: str, target_kind: str, - entries: list[MoltchatBufferedEntry], + entries: list[MochatBufferedEntry], was_mentioned: bool, ) -> None: if not entries: @@ -1092,7 +1092,7 @@ class MoltchatChannel(BaseChannel): try: data = json.loads(self._cursor_path.read_text("utf-8")) except Exception as e: - logger.warning(f"Failed to read Moltchat cursor file: {e}") + logger.warning(f"Failed to read Mochat cursor file: {e}") return cursors = data.get("cursors") if isinstance(data, dict) else None @@ -1114,14 +1114,14 @@ class MoltchatChannel(BaseChannel): self._state_dir.mkdir(parents=True, exist_ok=True) self._cursor_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2) + "\n", "utf-8") except Exception as e: - logger.warning(f"Failed to save Moltchat cursor file: {e}") + logger.warning(f"Failed to save Mochat cursor file: {e}") def _base_url(self) -> str: return self.config.base_url.strip().rstrip("/") async def _post_json(self, path: str, payload: dict[str, Any]) -> dict[str, Any]: if not self._http: - raise RuntimeError("Moltchat HTTP client not initialized") + raise RuntimeError("Mochat HTTP client not initialized") url = f"{self._base_url()}{path}" response = await self._http.post( @@ -1135,7 +1135,7 @@ class MoltchatChannel(BaseChannel): text = response.text if not response.is_success: - raise RuntimeError(f"Moltchat HTTP {response.status_code}: {text[:200]}") + raise RuntimeError(f"Mochat HTTP {response.status_code}: {text[:200]}") parsed: Any try: @@ -1146,7 +1146,7 @@ class MoltchatChannel(BaseChannel): if isinstance(parsed, dict) and isinstance(parsed.get("code"), int): if parsed["code"] != 200: message = str(parsed.get("message") or parsed.get("name") or "request failed") - raise RuntimeError(f"Moltchat API error: {message} (code={parsed['code']})") + raise RuntimeError(f"Mochat API error: {message} (code={parsed['code']})") data = parsed.get("data") return data if isinstance(data, dict) else {} diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py index 2039f82..3094aa1 100644 --- a/nanobot/cli/commands.py +++ b/nanobot/cli/commands.py @@ -376,11 +376,11 @@ def channels_status(): fs_config ) - # Moltchat - mc = config.channels.moltchat + # Mochat + mc = config.channels.mochat mc_base = mc.base_url or "[dim]not configured[/dim]" table.add_row( - "Moltchat", + "Mochat", "✓" if mc.enabled else "✗", mc_base ) diff --git a/nanobot/config/schema.py b/nanobot/config/schema.py index 4df4251..1d6ca9e 100644 --- a/nanobot/config/schema.py +++ b/nanobot/config/schema.py @@ -39,18 +39,18 @@ class DiscordConfig(BaseModel): intents: int = 37377 # GUILDS + GUILD_MESSAGES + DIRECT_MESSAGES + MESSAGE_CONTENT -class MoltchatMentionConfig(BaseModel): - """Moltchat mention behavior configuration.""" +class MochatMentionConfig(BaseModel): + """Mochat mention behavior configuration.""" require_in_groups: bool = False -class MoltchatGroupRule(BaseModel): - """Moltchat per-group mention requirement.""" +class MochatGroupRule(BaseModel): + """Mochat per-group mention requirement.""" require_mention: bool = False -class MoltchatConfig(BaseModel): - """Moltchat channel configuration.""" +class MochatConfig(BaseModel): + """Mochat channel configuration.""" enabled: bool = False base_url: str = "http://localhost:11000" socket_url: str = "" @@ -69,8 +69,8 @@ class MoltchatConfig(BaseModel): sessions: list[str] = Field(default_factory=list) panels: list[str] = Field(default_factory=list) allow_from: list[str] = Field(default_factory=list) - mention: MoltchatMentionConfig = Field(default_factory=MoltchatMentionConfig) - groups: dict[str, MoltchatGroupRule] = Field(default_factory=dict) + mention: MochatMentionConfig = Field(default_factory=MochatMentionConfig) + groups: dict[str, MochatGroupRule] = Field(default_factory=dict) reply_delay_mode: str = "non-mention" # off | non-mention reply_delay_ms: int = 120000 @@ -81,7 +81,7 @@ class ChannelsConfig(BaseModel): telegram: TelegramConfig = Field(default_factory=TelegramConfig) discord: DiscordConfig = Field(default_factory=DiscordConfig) feishu: FeishuConfig = Field(default_factory=FeishuConfig) - moltchat: MoltchatConfig = Field(default_factory=MoltchatConfig) + mochat: MochatConfig = Field(default_factory=MochatConfig) class AgentDefaults(BaseModel): diff --git a/tests/test_moltchat_channel.py b/tests/test_mochat_channel.py similarity index 73% rename from tests/test_moltchat_channel.py rename to tests/test_mochat_channel.py index 1f65a68..4d73840 100644 --- a/tests/test_moltchat_channel.py +++ b/tests/test_mochat_channel.py @@ -1,27 +1,27 @@ import pytest from nanobot.bus.queue import MessageBus -from nanobot.channels.moltchat import ( - MoltchatBufferedEntry, - MoltchatChannel, +from nanobot.channels.mochat import ( + MochatBufferedEntry, + MochatChannel, build_buffered_body, - resolve_moltchat_target, + resolve_mochat_target, resolve_require_mention, resolve_was_mentioned, ) -from nanobot.config.schema import MoltchatConfig, MoltchatGroupRule, MoltchatMentionConfig +from nanobot.config.schema import MochatConfig, MochatGroupRule, MochatMentionConfig -def test_resolve_moltchat_target_prefixes() -> None: - t = resolve_moltchat_target("panel:abc") +def test_resolve_mochat_target_prefixes() -> None: + t = resolve_mochat_target("panel:abc") assert t.id == "abc" assert t.is_panel is True - t = resolve_moltchat_target("session_123") + t = resolve_mochat_target("session_123") assert t.id == "session_123" assert t.is_panel is False - t = resolve_moltchat_target("mochat:session_456") + t = resolve_mochat_target("mochat:session_456") assert t.id == "session_456" assert t.is_panel is False @@ -40,12 +40,12 @@ def test_resolve_was_mentioned_from_meta_and_text() -> None: def test_resolve_require_mention_priority() -> None: - cfg = MoltchatConfig( + cfg = MochatConfig( groups={ - "*": MoltchatGroupRule(require_mention=False), - "group-a": MoltchatGroupRule(require_mention=True), + "*": MochatGroupRule(require_mention=False), + "group-a": MochatGroupRule(require_mention=True), }, - mention=MoltchatMentionConfig(require_in_groups=False), + mention=MochatMentionConfig(require_in_groups=False), ) assert resolve_require_mention(cfg, session_id="panel-x", group_id="group-a") is True @@ -55,14 +55,14 @@ def test_resolve_require_mention_priority() -> None: @pytest.mark.asyncio async def test_delay_buffer_flushes_on_mention() -> None: bus = MessageBus() - cfg = MoltchatConfig( + cfg = MochatConfig( enabled=True, claw_token="token", agent_user_id="bot", reply_delay_mode="non-mention", reply_delay_ms=60_000, ) - channel = MoltchatChannel(cfg, bus) + channel = MochatChannel(cfg, bus) first = { "type": "message.add", @@ -94,7 +94,7 @@ async def test_delay_buffer_flushes_on_mention() -> None: assert bus.inbound_size == 1 msg = await bus.consume_inbound() - assert msg.channel == "moltchat" + assert msg.channel == "mochat" assert msg.chat_id == "panel-1" assert "user1: first" in msg.content assert "user2: hello <@bot>" in msg.content @@ -106,8 +106,8 @@ async def test_delay_buffer_flushes_on_mention() -> None: def test_build_buffered_body_group_labels() -> None: body = build_buffered_body( entries=[ - MoltchatBufferedEntry(raw_body="a", author="u1", sender_name="Alice"), - MoltchatBufferedEntry(raw_body="b", author="u2", sender_username="bot"), + MochatBufferedEntry(raw_body="a", author="u1", sender_name="Alice"), + MochatBufferedEntry(raw_body="b", author="u2", sender_username="bot"), ], is_group=True, ) From 866942eedd02a3fc85ae4e0393c450a2394b8922 Mon Sep 17 00:00:00 2001 From: tjb-tech Date: Mon, 9 Feb 2026 09:12:53 +0000 Subject: [PATCH 3/4] fix: update agentUserId in README and change base_url to HTTPS in configuration --- README.md | 2 +- nanobot/config/schema.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index d15fd2f..7bf98fd 100644 --- a/README.md +++ b/README.md @@ -231,7 +231,7 @@ Uses **Socket.IO WebSocket** by default, with HTTP polling fallback. "socketUrl": "https://mochat.io", "socketPath": "/socket.io", "clawToken": "claw_xxx", - "agentUserId": "69820107a785110aea8b1069", + "agentUserId": "6982abcdef", "sessions": ["*"], "panels": ["*"], "replyDelayMode": "non-mention", diff --git a/nanobot/config/schema.py b/nanobot/config/schema.py index a3d8aa5..26abcd7 100644 --- a/nanobot/config/schema.py +++ b/nanobot/config/schema.py @@ -90,7 +90,7 @@ class MochatGroupRule(BaseModel): class MochatConfig(BaseModel): """Mochat channel configuration.""" enabled: bool = False - base_url: str = "http://localhost:11000" + base_url: str = "https://mochat.io" socket_url: str = "" socket_path: str = "/socket.io" socket_disable_msgpack: bool = False From ba2bdb080de75079b5457fb98bd39aef3797b13b Mon Sep 17 00:00:00 2001 From: Re-bin Date: Tue, 10 Feb 2026 07:06:04 +0000 Subject: [PATCH 4/4] refactor: streamline mochat channel --- nanobot/channels/__init__.py | 3 +- nanobot/channels/mochat.py | 920 +++++++++++------------------------ 2 files changed, 295 insertions(+), 628 deletions(-) diff --git a/nanobot/channels/__init__.py b/nanobot/channels/__init__.py index 034d401..588169d 100644 --- a/nanobot/channels/__init__.py +++ b/nanobot/channels/__init__.py @@ -2,6 +2,5 @@ from nanobot.channels.base import BaseChannel from nanobot.channels.manager import ChannelManager -from nanobot.channels.mochat import MochatChannel -__all__ = ["BaseChannel", "ChannelManager", "MochatChannel"] +__all__ = ["BaseChannel", "ChannelManager"] diff --git a/nanobot/channels/mochat.py b/nanobot/channels/mochat.py index 6569cdd..30c3dbf 100644 --- a/nanobot/channels/mochat.py +++ b/nanobot/channels/mochat.py @@ -20,7 +20,6 @@ from nanobot.utils.helpers import get_data_path try: import socketio - SOCKETIO_AVAILABLE = True except ImportError: socketio = None @@ -28,20 +27,21 @@ except ImportError: try: import msgpack # noqa: F401 - MSGPACK_AVAILABLE = True except ImportError: MSGPACK_AVAILABLE = False - MAX_SEEN_MESSAGE_IDS = 2000 CURSOR_SAVE_DEBOUNCE_S = 0.5 +# --------------------------------------------------------------------------- +# Data classes +# --------------------------------------------------------------------------- + @dataclass class MochatBufferedEntry: """Buffered inbound entry for delayed dispatch.""" - raw_body: str author: str sender_name: str = "" @@ -54,7 +54,6 @@ class MochatBufferedEntry: @dataclass class DelayState: """Per-target delayed message state.""" - entries: list[MochatBufferedEntry] = field(default_factory=list) lock: asyncio.Lock = field(default_factory=asyncio.Lock) timer: asyncio.Task | None = None @@ -63,11 +62,48 @@ class DelayState: @dataclass class MochatTarget: """Outbound target resolution result.""" - id: str is_panel: bool +# --------------------------------------------------------------------------- +# Pure helpers +# --------------------------------------------------------------------------- + +def _safe_dict(value: Any) -> dict: + """Return *value* if it's a dict, else empty dict.""" + return value if isinstance(value, dict) else {} + + +def _str_field(src: dict, *keys: str) -> str: + """Return the first non-empty str value found for *keys*, stripped.""" + for k in keys: + v = src.get(k) + if isinstance(v, str) and v.strip(): + return v.strip() + return "" + + +def _make_synthetic_event( + message_id: str, author: str, content: Any, + meta: Any, group_id: str, converse_id: str, + timestamp: Any = None, *, author_info: Any = None, +) -> dict[str, Any]: + """Build a synthetic ``message.add`` event dict.""" + payload: dict[str, Any] = { + "messageId": message_id, "author": author, + "content": content, "meta": _safe_dict(meta), + "groupId": group_id, "converseId": converse_id, + } + if author_info is not None: + payload["authorInfo"] = _safe_dict(author_info) + return { + "type": "message.add", + "timestamp": timestamp or datetime.utcnow().isoformat(), + "payload": payload, + } + + def normalize_mochat_content(content: Any) -> str: """Normalize content payload to text.""" if isinstance(content, str): @@ -87,20 +123,15 @@ def resolve_mochat_target(raw: str) -> MochatTarget: return MochatTarget(id="", is_panel=False) lowered = trimmed.lower() - cleaned = trimmed - forced_panel = False - - prefixes = ["mochat:", "group:", "channel:", "panel:"] - for prefix in prefixes: + cleaned, forced_panel = trimmed, False + for prefix in ("mochat:", "group:", "channel:", "panel:"): if lowered.startswith(prefix): - cleaned = trimmed[len(prefix) :].strip() - if prefix in {"group:", "channel:", "panel:"}: - forced_panel = True + cleaned = trimmed[len(prefix):].strip() + forced_panel = prefix in {"group:", "channel:", "panel:"} break if not cleaned: return MochatTarget(id="", is_panel=False) - return MochatTarget(id=cleaned, is_panel=forced_panel or not cleaned.startswith("session_")) @@ -108,24 +139,17 @@ def extract_mention_ids(value: Any) -> list[str]: """Extract mention ids from heterogeneous mention payload.""" if not isinstance(value, list): return [] - ids: list[str] = [] for item in value: if isinstance(item, str): - text = item.strip() - if text: - ids.append(text) - continue - - if not isinstance(item, dict): - continue - - for key in ("id", "userId", "_id"): - candidate = item.get(key) - if isinstance(candidate, str) and candidate.strip(): - ids.append(candidate.strip()) - break - + if item.strip(): + ids.append(item.strip()) + elif isinstance(item, dict): + for key in ("id", "userId", "_id"): + candidate = item.get(key) + if isinstance(candidate, str) and candidate.strip(): + ids.append(candidate.strip()) + break return ids @@ -135,35 +159,23 @@ def resolve_was_mentioned(payload: dict[str, Any], agent_user_id: str) -> bool: if isinstance(meta, dict): if meta.get("mentioned") is True or meta.get("wasMentioned") is True: return True - - for field in ("mentions", "mentionIds", "mentionedUserIds", "mentionedUsers"): - ids = extract_mention_ids(meta.get(field)) - if agent_user_id and agent_user_id in ids: + for f in ("mentions", "mentionIds", "mentionedUserIds", "mentionedUsers"): + if agent_user_id and agent_user_id in extract_mention_ids(meta.get(f)): return True - if not agent_user_id: return False - content = payload.get("content") if not isinstance(content, str) or not content: return False - return f"<@{agent_user_id}>" in content or f"@{agent_user_id}" in content -def resolve_require_mention( - config: MochatConfig, - session_id: str, - group_id: str, -) -> bool: +def resolve_require_mention(config: MochatConfig, session_id: str, group_id: str) -> bool: """Resolve mention requirement for group/panel conversations.""" groups = config.groups or {} - if group_id and group_id in groups: - return bool(groups[group_id].require_mention) - if session_id in groups: - return bool(groups[session_id].require_mention) - if "*" in groups: - return bool(groups["*"].require_mention) + for key in (group_id, session_id, "*"): + if key and key in groups: + return bool(groups[key].require_mention) return bool(config.mention.require_in_groups) @@ -171,22 +183,18 @@ def build_buffered_body(entries: list[MochatBufferedEntry], is_group: bool) -> s """Build text body from one or more buffered entries.""" if not entries: return "" - if len(entries) == 1: return entries[0].raw_body - lines: list[str] = [] for entry in entries: - body = entry.raw_body - if not body: + if not entry.raw_body: continue if is_group: label = entry.sender_name.strip() or entry.sender_username.strip() or entry.author if label: - lines.append(f"{label}: {body}") + lines.append(f"{label}: {entry.raw_body}") continue - lines.append(body) - + lines.append(entry.raw_body) return "\n".join(lines).strip() @@ -200,6 +208,10 @@ def parse_timestamp(value: Any) -> int | None: return None +# --------------------------------------------------------------------------- +# Channel +# --------------------------------------------------------------------------- + class MochatChannel(BaseChannel): """Mochat channel using socket.io with fallback polling workers.""" @@ -210,8 +222,7 @@ class MochatChannel(BaseChannel): self.config: MochatConfig = config self._http: httpx.AsyncClient | None = None self._socket: Any = None - self._ws_connected = False - self._ws_ready = False + self._ws_connected = self._ws_ready = False self._state_dir = get_data_path() / "mochat" self._cursor_path = self._state_dir / "session_cursors.json" @@ -220,24 +231,23 @@ class MochatChannel(BaseChannel): self._session_set: set[str] = set() self._panel_set: set[str] = set() - self._auto_discover_sessions = False - self._auto_discover_panels = False + self._auto_discover_sessions = self._auto_discover_panels = False self._cold_sessions: set[str] = set() self._session_by_converse: dict[str, str] = {} self._seen_set: dict[str, set[str]] = {} self._seen_queue: dict[str, deque[str]] = {} - self._delay_states: dict[str, DelayState] = {} self._fallback_mode = False self._session_fallback_tasks: dict[str, asyncio.Task] = {} self._panel_fallback_tasks: dict[str, asyncio.Task] = {} self._refresh_task: asyncio.Task | None = None - self._target_locks: dict[str, asyncio.Lock] = {} + # ---- lifecycle --------------------------------------------------------- + async def start(self) -> None: """Start Mochat channel workers and websocket connection.""" if not self.config.claw_token: @@ -246,26 +256,21 @@ class MochatChannel(BaseChannel): self._running = True self._http = httpx.AsyncClient(timeout=30.0) - self._state_dir.mkdir(parents=True, exist_ok=True) await self._load_session_cursors() self._seed_targets_from_config() - await self._refresh_targets(subscribe_new=False) - websocket_started = await self._start_socket_client() - if not websocket_started: + if not await self._start_socket_client(): await self._ensure_fallback_workers() self._refresh_task = asyncio.create_task(self._refresh_loop()) - while self._running: await asyncio.sleep(1) async def stop(self) -> None: """Stop all workers and clean up resources.""" self._running = False - if self._refresh_task: self._refresh_task.cancel() self._refresh_task = None @@ -283,15 +288,12 @@ class MochatChannel(BaseChannel): if self._cursor_save_task: self._cursor_save_task.cancel() self._cursor_save_task = None - await self._save_session_cursors() if self._http: await self._http.aclose() self._http = None - - self._ws_connected = False - self._ws_ready = False + self._ws_connected = self._ws_ready = False async def send(self, msg: OutboundMessage) -> None: """Send outbound message to session or panel.""" @@ -299,10 +301,10 @@ class MochatChannel(BaseChannel): logger.warning("Mochat claw_token missing, skip send") return - content_parts = [msg.content.strip()] if msg.content and msg.content.strip() else [] + parts = ([msg.content.strip()] if msg.content and msg.content.strip() else []) if msg.media: - content_parts.extend([m for m in msg.media if isinstance(m, str) and m.strip()]) - content = "\n".join(content_parts).strip() + parts.extend(m for m in msg.media if isinstance(m, str) and m.strip()) + content = "\n".join(parts).strip() if not content: return @@ -311,43 +313,34 @@ class MochatChannel(BaseChannel): logger.warning("Mochat outbound target is empty") return - is_panel = target.is_panel or target.id in self._panel_set - if target.id.startswith("session_"): - is_panel = False - + is_panel = (target.is_panel or target.id in self._panel_set) and not target.id.startswith("session_") try: if is_panel: - await self._send_panel_message( - panel_id=target.id, - content=content, - reply_to=msg.reply_to, - group_id=self._read_group_id(msg.metadata), - ) + await self._api_send("/api/claw/groups/panels/send", "panelId", target.id, + content, msg.reply_to, self._read_group_id(msg.metadata)) else: - await self._send_session_message( - session_id=target.id, - content=content, - reply_to=msg.reply_to, - ) + await self._api_send("/api/claw/sessions/send", "sessionId", target.id, + content, msg.reply_to) except Exception as e: logger.error(f"Failed to send Mochat message: {e}") + # ---- config / init helpers --------------------------------------------- + def _seed_targets_from_config(self) -> None: sessions, self._auto_discover_sessions = self._normalize_id_list(self.config.sessions) panels, self._auto_discover_panels = self._normalize_id_list(self.config.panels) - self._session_set.update(sessions) self._panel_set.update(panels) + for sid in sessions: + if sid not in self._session_cursor: + self._cold_sessions.add(sid) - for session_id in sessions: - if session_id not in self._session_cursor: - self._cold_sessions.add(session_id) - - def _normalize_id_list(self, values: list[str]) -> tuple[list[str], bool]: + @staticmethod + def _normalize_id_list(values: list[str]) -> tuple[list[str], bool]: cleaned = [str(v).strip() for v in values if str(v).strip()] - has_wildcard = "*" in cleaned - ids = sorted({v for v in cleaned if v != "*"}) - return ids, has_wildcard + return sorted({v for v in cleaned if v != "*"}), "*" in cleaned + + # ---- websocket --------------------------------------------------------- async def _start_socket_client(self) -> bool: if not SOCKETIO_AVAILABLE: @@ -359,83 +352,56 @@ class MochatChannel(BaseChannel): if MSGPACK_AVAILABLE: serializer = "msgpack" else: - logger.warning( - "msgpack is not installed but socket_disable_msgpack=false; " - "trying JSON serializer" - ) - - reconnect_attempts = None - if self.config.max_retry_attempts > 0: - reconnect_attempts = self.config.max_retry_attempts + logger.warning("msgpack not installed but socket_disable_msgpack=false; using JSON") client = socketio.AsyncClient( reconnection=True, - reconnection_attempts=reconnect_attempts, + reconnection_attempts=self.config.max_retry_attempts or None, reconnection_delay=max(0.1, self.config.socket_reconnect_delay_ms / 1000.0), - reconnection_delay_max=max( - 0.1, - self.config.socket_max_reconnect_delay_ms / 1000.0, - ), - logger=False, - engineio_logger=False, - serializer=serializer, + reconnection_delay_max=max(0.1, self.config.socket_max_reconnect_delay_ms / 1000.0), + logger=False, engineio_logger=False, serializer=serializer, ) @client.event async def connect() -> None: - self._ws_connected = True - self._ws_ready = False + self._ws_connected, self._ws_ready = True, False logger.info("Mochat websocket connected") - subscribed = await self._subscribe_all() self._ws_ready = subscribed - if subscribed: - await self._stop_fallback_workers() - else: - await self._ensure_fallback_workers() + await (self._stop_fallback_workers() if subscribed else self._ensure_fallback_workers()) @client.event async def disconnect() -> None: if not self._running: return - self._ws_connected = False - self._ws_ready = False + self._ws_connected = self._ws_ready = False logger.warning("Mochat websocket disconnected") await self._ensure_fallback_workers() @client.event async def connect_error(data: Any) -> None: - message = str(data) - logger.error(f"Mochat websocket connect error: {message}") + logger.error(f"Mochat websocket connect error: {data}") @client.on("claw.session.events") async def on_session_events(payload: dict[str, Any]) -> None: - await self._handle_watch_payload(payload, target_kind="session") + await self._handle_watch_payload(payload, "session") @client.on("claw.panel.events") async def on_panel_events(payload: dict[str, Any]) -> None: - await self._handle_watch_payload(payload, target_kind="panel") + await self._handle_watch_payload(payload, "panel") - for event_name in ( - "notify:chat.inbox.append", - "notify:chat.message.add", - "notify:chat.message.update", - "notify:chat.message.recall", - "notify:chat.message.delete", - ): - client.on(event_name, self._build_notify_handler(event_name)) + for ev in ("notify:chat.inbox.append", "notify:chat.message.add", + "notify:chat.message.update", "notify:chat.message.recall", + "notify:chat.message.delete"): + client.on(ev, self._build_notify_handler(ev)) socket_url = (self.config.socket_url or self.config.base_url).strip().rstrip("/") - socket_path = (self.config.socket_path or "/socket.io").strip() - if socket_path.startswith("/"): - socket_path = socket_path[1:] + socket_path = (self.config.socket_path or "/socket.io").strip().lstrip("/") try: self._socket = client await client.connect( - socket_url, - transports=["websocket"], - socketio_path=socket_path, + socket_url, transports=["websocket"], socketio_path=socket_path, auth={"token": self.config.claw_token}, wait_timeout=max(1.0, self.config.socket_connect_timeout_ms / 1000.0), ) @@ -453,38 +419,30 @@ class MochatChannel(BaseChannel): async def handler(payload: Any) -> None: if event_name == "notify:chat.inbox.append": await self._handle_notify_inbox_append(payload) - return - - if event_name.startswith("notify:chat.message."): + elif event_name.startswith("notify:chat.message."): await self._handle_notify_chat_message(payload) - return handler - async def _subscribe_all(self) -> bool: - sessions_ok = await self._subscribe_sessions(sorted(self._session_set)) - panels_ok = await self._subscribe_panels(sorted(self._panel_set)) + # ---- subscribe --------------------------------------------------------- + async def _subscribe_all(self) -> bool: + ok = await self._subscribe_sessions(sorted(self._session_set)) + ok = await self._subscribe_panels(sorted(self._panel_set)) and ok if self._auto_discover_sessions or self._auto_discover_panels: await self._refresh_targets(subscribe_new=True) - - return sessions_ok and panels_ok + return ok async def _subscribe_sessions(self, session_ids: list[str]) -> bool: if not session_ids: return True + for sid in session_ids: + if sid not in self._session_cursor: + self._cold_sessions.add(sid) - for session_id in session_ids: - if session_id not in self._session_cursor: - self._cold_sessions.add(session_id) - - ack = await self._socket_call( - "com.claw.im.subscribeSessions", - { - "sessionIds": session_ids, - "cursors": self._session_cursor, - "limit": self.config.watch_limit, - }, - ) + ack = await self._socket_call("com.claw.im.subscribeSessions", { + "sessionIds": session_ids, "cursors": self._session_cursor, + "limit": self.config.watch_limit, + }) if not ack.get("result"): logger.error(f"Mochat subscribeSessions failed: {ack.get('message', 'unknown error')}") return False @@ -492,73 +450,57 @@ class MochatChannel(BaseChannel): data = ack.get("data") items: list[dict[str, Any]] = [] if isinstance(data, list): - items = [item for item in data if isinstance(item, dict)] + items = [i for i in data if isinstance(i, dict)] elif isinstance(data, dict): sessions = data.get("sessions") if isinstance(sessions, list): - items = [item for item in sessions if isinstance(item, dict)] + items = [i for i in sessions if isinstance(i, dict)] elif "sessionId" in data: items = [data] - - for payload in items: - await self._handle_watch_payload(payload, target_kind="session") - + for p in items: + await self._handle_watch_payload(p, "session") return True async def _subscribe_panels(self, panel_ids: list[str]) -> bool: if not self._auto_discover_panels and not panel_ids: return True - - ack = await self._socket_call( - "com.claw.im.subscribePanels", - { - "panelIds": panel_ids, - }, - ) + ack = await self._socket_call("com.claw.im.subscribePanels", {"panelIds": panel_ids}) if not ack.get("result"): logger.error(f"Mochat subscribePanels failed: {ack.get('message', 'unknown error')}") return False - return True async def _socket_call(self, event_name: str, payload: dict[str, Any]) -> dict[str, Any]: if not self._socket: return {"result": False, "message": "socket not connected"} - try: raw = await self._socket.call(event_name, payload, timeout=10) except Exception as e: return {"result": False, "message": str(e)} + return raw if isinstance(raw, dict) else {"result": True, "data": raw} - if isinstance(raw, dict): - return raw - - return {"result": True, "data": raw} + # ---- refresh / discovery ----------------------------------------------- async def _refresh_loop(self) -> None: interval_s = max(1.0, self.config.refresh_interval_ms / 1000.0) - while self._running: await asyncio.sleep(interval_s) - try: await self._refresh_targets(subscribe_new=self._ws_ready) except Exception as e: logger.warning(f"Mochat refresh failed: {e}") - if self._fallback_mode: await self._ensure_fallback_workers() async def _refresh_targets(self, subscribe_new: bool) -> None: if self._auto_discover_sessions: - await self._refresh_sessions_directory(subscribe_new=subscribe_new) - + await self._refresh_sessions_directory(subscribe_new) if self._auto_discover_panels: - await self._refresh_panels(subscribe_new=subscribe_new) + await self._refresh_panels(subscribe_new) async def _refresh_sessions_directory(self, subscribe_new: bool) -> None: try: - response = await self._list_sessions() + response = await self._post_json("/api/claw/sessions/list", {}) except Exception as e: logger.warning(f"Mochat listSessions failed: {e}") return @@ -567,37 +509,32 @@ class MochatChannel(BaseChannel): if not isinstance(sessions, list): return - new_sessions: list[str] = [] - for session in sessions: - if not isinstance(session, dict): + new_ids: list[str] = [] + for s in sessions: + if not isinstance(s, dict): continue - - session_id = str(session.get("sessionId") or "").strip() - if not session_id: + sid = _str_field(s, "sessionId") + if not sid: continue + if sid not in self._session_set: + self._session_set.add(sid) + new_ids.append(sid) + if sid not in self._session_cursor: + self._cold_sessions.add(sid) + cid = _str_field(s, "converseId") + if cid: + self._session_by_converse[cid] = sid - if session_id not in self._session_set: - self._session_set.add(session_id) - new_sessions.append(session_id) - if session_id not in self._session_cursor: - self._cold_sessions.add(session_id) - - converse_id = str(session.get("converseId") or "").strip() - if converse_id: - self._session_by_converse[converse_id] = session_id - - if not new_sessions: + if not new_ids: return - if self._ws_ready and subscribe_new: - await self._subscribe_sessions(new_sessions) - + await self._subscribe_sessions(new_ids) if self._fallback_mode: await self._ensure_fallback_workers() async def _refresh_panels(self, subscribe_new: bool) -> None: try: - response = await self._get_workspace_group() + response = await self._post_json("/api/claw/groups/get", {}) except Exception as e: logger.warning(f"Mochat getWorkspaceGroup failed: {e}") return @@ -606,80 +543,58 @@ class MochatChannel(BaseChannel): if not isinstance(raw_panels, list): return - new_panels: list[str] = [] - for panel in raw_panels: - if not isinstance(panel, dict): + new_ids: list[str] = [] + for p in raw_panels: + if not isinstance(p, dict): continue - - panel_type = panel.get("type") - if isinstance(panel_type, int) and panel_type != 0: + pt = p.get("type") + if isinstance(pt, int) and pt != 0: continue + pid = _str_field(p, "id", "_id") + if pid and pid not in self._panel_set: + self._panel_set.add(pid) + new_ids.append(pid) - panel_id = str(panel.get("id") or panel.get("_id") or "").strip() - if not panel_id: - continue - - if panel_id not in self._panel_set: - self._panel_set.add(panel_id) - new_panels.append(panel_id) - - if not new_panels: + if not new_ids: return - if self._ws_ready and subscribe_new: - await self._subscribe_panels(new_panels) - + await self._subscribe_panels(new_ids) if self._fallback_mode: await self._ensure_fallback_workers() + # ---- fallback workers -------------------------------------------------- + async def _ensure_fallback_workers(self) -> None: if not self._running: return - self._fallback_mode = True - - for session_id in sorted(self._session_set): - task = self._session_fallback_tasks.get(session_id) - if task and not task.done(): - continue - self._session_fallback_tasks[session_id] = asyncio.create_task( - self._session_watch_worker(session_id) - ) - - for panel_id in sorted(self._panel_set): - task = self._panel_fallback_tasks.get(panel_id) - if task and not task.done(): - continue - self._panel_fallback_tasks[panel_id] = asyncio.create_task( - self._panel_poll_worker(panel_id) - ) + for sid in sorted(self._session_set): + t = self._session_fallback_tasks.get(sid) + if not t or t.done(): + self._session_fallback_tasks[sid] = asyncio.create_task(self._session_watch_worker(sid)) + for pid in sorted(self._panel_set): + t = self._panel_fallback_tasks.get(pid) + if not t or t.done(): + self._panel_fallback_tasks[pid] = asyncio.create_task(self._panel_poll_worker(pid)) async def _stop_fallback_workers(self) -> None: self._fallback_mode = False - - tasks = [ - *self._session_fallback_tasks.values(), - *self._panel_fallback_tasks.values(), - ] - for task in tasks: - task.cancel() - + tasks = [*self._session_fallback_tasks.values(), *self._panel_fallback_tasks.values()] + for t in tasks: + t.cancel() if tasks: await asyncio.gather(*tasks, return_exceptions=True) - self._session_fallback_tasks.clear() self._panel_fallback_tasks.clear() async def _session_watch_worker(self, session_id: str) -> None: while self._running and self._fallback_mode: try: - payload = await self._watch_session( - session_id=session_id, - cursor=self._session_cursor.get(session_id, 0), - timeout_ms=self.config.watch_timeout_ms, - limit=self.config.watch_limit, - ) - await self._handle_watch_payload(payload, target_kind="session") + payload = await self._post_json("/api/claw/sessions/watch", { + "sessionId": session_id, "cursor": self._session_cursor.get(session_id, 0), + "timeoutMs": self.config.watch_timeout_ms, "limit": self.config.watch_limit, + }) + await self._handle_watch_payload(payload, "session") except asyncio.CancelledError: break except Exception as e: @@ -688,72 +603,50 @@ class MochatChannel(BaseChannel): async def _panel_poll_worker(self, panel_id: str) -> None: sleep_s = max(1.0, self.config.refresh_interval_ms / 1000.0) - while self._running and self._fallback_mode: try: - response = await self._list_panel_messages( - panel_id=panel_id, - limit=min(100, max(1, self.config.watch_limit)), - ) - - raw_messages = response.get("messages") - if isinstance(raw_messages, list): - for message in reversed(raw_messages): - if not isinstance(message, dict): + resp = await self._post_json("/api/claw/groups/panels/messages", { + "panelId": panel_id, "limit": min(100, max(1, self.config.watch_limit)), + }) + msgs = resp.get("messages") + if isinstance(msgs, list): + for m in reversed(msgs): + if not isinstance(m, dict): continue - - synthetic_event = { - "type": "message.add", - "timestamp": message.get("createdAt") or datetime.utcnow().isoformat(), - "payload": { - "messageId": str(message.get("messageId") or ""), - "author": str(message.get("author") or ""), - "authorInfo": message.get("authorInfo") if isinstance(message.get("authorInfo"), dict) else {}, - "content": message.get("content"), - "meta": message.get("meta") if isinstance(message.get("meta"), dict) else {}, - "groupId": str(response.get("groupId") or ""), - "converseId": panel_id, - }, - } - await self._process_inbound_event( - target_id=panel_id, - event=synthetic_event, - target_kind="panel", + evt = _make_synthetic_event( + message_id=str(m.get("messageId") or ""), + author=str(m.get("author") or ""), + content=m.get("content"), + meta=m.get("meta"), group_id=str(resp.get("groupId") or ""), + converse_id=panel_id, timestamp=m.get("createdAt"), + author_info=m.get("authorInfo"), ) + await self._process_inbound_event(panel_id, evt, "panel") except asyncio.CancelledError: break except Exception as e: logger.warning(f"Mochat panel polling error ({panel_id}): {e}") - await asyncio.sleep(sleep_s) - async def _handle_watch_payload( - self, - payload: dict[str, Any], - target_kind: str, - ) -> None: + # ---- inbound event processing ------------------------------------------ + + async def _handle_watch_payload(self, payload: dict[str, Any], target_kind: str) -> None: if not isinstance(payload, dict): return - - target_id = str(payload.get("sessionId") or "").strip() + target_id = _str_field(payload, "sessionId") if not target_id: return lock = self._target_locks.setdefault(f"{target_kind}:{target_id}", asyncio.Lock()) async with lock: - previous_cursor = self._session_cursor.get(target_id, 0) if target_kind == "session" else 0 - payload_cursor = payload.get("cursor") - if ( - target_kind == "session" - and isinstance(payload_cursor, int) - and payload_cursor >= 0 - ): - self._mark_session_cursor(target_id, payload_cursor) + prev = self._session_cursor.get(target_id, 0) if target_kind == "session" else 0 + pc = payload.get("cursor") + if target_kind == "session" and isinstance(pc, int) and pc >= 0: + self._mark_session_cursor(target_id, pc) raw_events = payload.get("events") if not isinstance(raw_events, list): return - if target_kind == "session" and target_id in self._cold_sessions: self._cold_sessions.discard(target_id) return @@ -762,324 +655,176 @@ class MochatChannel(BaseChannel): if not isinstance(event, dict): continue seq = event.get("seq") - if ( - target_kind == "session" - and isinstance(seq, int) - and seq > self._session_cursor.get(target_id, previous_cursor) - ): + if target_kind == "session" and isinstance(seq, int) and seq > self._session_cursor.get(target_id, prev): self._mark_session_cursor(target_id, seq) + if event.get("type") == "message.add": + await self._process_inbound_event(target_id, event, target_kind) - if event.get("type") != "message.add": - continue - - await self._process_inbound_event( - target_id=target_id, - event=event, - target_kind=target_kind, - ) - - async def _process_inbound_event( - self, - target_id: str, - event: dict[str, Any], - target_kind: str, - ) -> None: + async def _process_inbound_event(self, target_id: str, event: dict[str, Any], target_kind: str) -> None: payload = event.get("payload") if not isinstance(payload, dict): return - author = str(payload.get("author") or "").strip() - if not author: + author = _str_field(payload, "author") + if not author or (self.config.agent_user_id and author == self.config.agent_user_id): return - - if self.config.agent_user_id and author == self.config.agent_user_id: - return - if not self.is_allowed(author): return - message_id = str(payload.get("messageId") or "").strip() + message_id = _str_field(payload, "messageId") seen_key = f"{target_kind}:{target_id}" if message_id and self._remember_message_id(seen_key, message_id): return - raw_body = normalize_mochat_content(payload.get("content")) - if not raw_body: - raw_body = "[empty message]" + raw_body = normalize_mochat_content(payload.get("content")) or "[empty message]" + ai = _safe_dict(payload.get("authorInfo")) + sender_name = _str_field(ai, "nickname", "email") + sender_username = _str_field(ai, "agentId") - author_info = payload.get("authorInfo") if isinstance(payload.get("authorInfo"), dict) else {} - sender_name = str(author_info.get("nickname") or author_info.get("email") or "").strip() - sender_username = str(author_info.get("agentId") or "").strip() - - group_id = str(payload.get("groupId") or "").strip() + group_id = _str_field(payload, "groupId") is_group = bool(group_id) was_mentioned = resolve_was_mentioned(payload, self.config.agent_user_id) - - require_mention = ( - target_kind == "panel" - and is_group - and resolve_require_mention(self.config, target_id, group_id) - ) - + require_mention = target_kind == "panel" and is_group and resolve_require_mention(self.config, target_id, group_id) use_delay = target_kind == "panel" and self.config.reply_delay_mode == "non-mention" if require_mention and not was_mentioned and not use_delay: return entry = MochatBufferedEntry( - raw_body=raw_body, - author=author, - sender_name=sender_name, - sender_username=sender_username, - timestamp=parse_timestamp(event.get("timestamp")), - message_id=message_id, - group_id=group_id, + raw_body=raw_body, author=author, sender_name=sender_name, + sender_username=sender_username, timestamp=parse_timestamp(event.get("timestamp")), + message_id=message_id, group_id=group_id, ) if use_delay: - delay_key = f"{target_kind}:{target_id}" + delay_key = seen_key if was_mentioned: - await self._flush_delayed_entries( - key=delay_key, - target_id=target_id, - target_kind=target_kind, - reason="mention", - entry=entry, - ) + await self._flush_delayed_entries(delay_key, target_id, target_kind, "mention", entry) else: - await self._enqueue_delayed_entry( - key=delay_key, - target_id=target_id, - target_kind=target_kind, - entry=entry, - ) + await self._enqueue_delayed_entry(delay_key, target_id, target_kind, entry) return - await self._dispatch_entries( - target_id=target_id, - target_kind=target_kind, - entries=[entry], - was_mentioned=was_mentioned, - ) + await self._dispatch_entries(target_id, target_kind, [entry], was_mentioned) + + # ---- dedup / buffering ------------------------------------------------- def _remember_message_id(self, key: str, message_id: str) -> bool: seen_set = self._seen_set.setdefault(key, set()) seen_queue = self._seen_queue.setdefault(key, deque()) - if message_id in seen_set: return True - seen_set.add(message_id) seen_queue.append(message_id) - while len(seen_queue) > MAX_SEEN_MESSAGE_IDS: - removed = seen_queue.popleft() - seen_set.discard(removed) - + seen_set.discard(seen_queue.popleft()) return False - async def _enqueue_delayed_entry( - self, - key: str, - target_id: str, - target_kind: str, - entry: MochatBufferedEntry, - ) -> None: + async def _enqueue_delayed_entry(self, key: str, target_id: str, target_kind: str, entry: MochatBufferedEntry) -> None: state = self._delay_states.setdefault(key, DelayState()) - async with state.lock: state.entries.append(entry) if state.timer: state.timer.cancel() - - state.timer = asyncio.create_task( - self._delay_flush_after(key, target_id, target_kind) - ) + state.timer = asyncio.create_task(self._delay_flush_after(key, target_id, target_kind)) async def _delay_flush_after(self, key: str, target_id: str, target_kind: str) -> None: await asyncio.sleep(max(0, self.config.reply_delay_ms) / 1000.0) - await self._flush_delayed_entries( - key=key, - target_id=target_id, - target_kind=target_kind, - reason="timer", - entry=None, - ) + await self._flush_delayed_entries(key, target_id, target_kind, "timer", None) - async def _flush_delayed_entries( - self, - key: str, - target_id: str, - target_kind: str, - reason: str, - entry: MochatBufferedEntry | None, - ) -> None: + async def _flush_delayed_entries(self, key: str, target_id: str, target_kind: str, reason: str, entry: MochatBufferedEntry | None) -> None: state = self._delay_states.setdefault(key, DelayState()) - async with state.lock: if entry: state.entries.append(entry) - current = asyncio.current_task() if state.timer and state.timer is not current: state.timer.cancel() - state.timer = None - elif state.timer is current: - state.timer = None - + state.timer = None entries = state.entries[:] state.entries.clear() + if entries: + await self._dispatch_entries(target_id, target_kind, entries, reason == "mention") + async def _dispatch_entries(self, target_id: str, target_kind: str, entries: list[MochatBufferedEntry], was_mentioned: bool) -> None: if not entries: return - - await self._dispatch_entries( - target_id=target_id, - target_kind=target_kind, - entries=entries, - was_mentioned=(reason == "mention"), - ) - - async def _dispatch_entries( - self, - target_id: str, - target_kind: str, - entries: list[MochatBufferedEntry], - was_mentioned: bool, - ) -> None: - if not entries: - return - - is_group = bool(entries[-1].group_id) - body = build_buffered_body(entries, is_group) - if not body: - body = "[empty message]" - last = entries[-1] - metadata = { - "message_id": last.message_id, - "timestamp": last.timestamp, - "is_group": is_group, - "group_id": last.group_id, - "sender_name": last.sender_name, - "sender_username": last.sender_username, - "target_kind": target_kind, - "was_mentioned": was_mentioned, - "buffered_count": len(entries), - } - + is_group = bool(last.group_id) + body = build_buffered_body(entries, is_group) or "[empty message]" await self._handle_message( - sender_id=last.author, - chat_id=target_id, - content=body, - metadata=metadata, + sender_id=last.author, chat_id=target_id, content=body, + metadata={ + "message_id": last.message_id, "timestamp": last.timestamp, + "is_group": is_group, "group_id": last.group_id, + "sender_name": last.sender_name, "sender_username": last.sender_username, + "target_kind": target_kind, "was_mentioned": was_mentioned, + "buffered_count": len(entries), + }, ) async def _cancel_delay_timers(self) -> None: for state in self._delay_states.values(): if state.timer: state.timer.cancel() - state.timer = None self._delay_states.clear() + # ---- notify handlers --------------------------------------------------- + async def _handle_notify_chat_message(self, payload: Any) -> None: if not isinstance(payload, dict): return - - group_id = str(payload.get("groupId") or "").strip() - panel_id = str(payload.get("converseId") or payload.get("panelId") or "").strip() + group_id = _str_field(payload, "groupId") + panel_id = _str_field(payload, "converseId", "panelId") if not group_id or not panel_id: return - if self._panel_set and panel_id not in self._panel_set: return - synthetic_event = { - "type": "message.add", - "timestamp": payload.get("createdAt") or datetime.utcnow().isoformat(), - "payload": { - "messageId": str(payload.get("_id") or payload.get("messageId") or ""), - "author": str(payload.get("author") or ""), - "authorInfo": payload.get("authorInfo") if isinstance(payload.get("authorInfo"), dict) else {}, - "content": payload.get("content"), - "meta": payload.get("meta") if isinstance(payload.get("meta"), dict) else {}, - "groupId": group_id, - "converseId": panel_id, - }, - } - await self._process_inbound_event( - target_id=panel_id, - event=synthetic_event, - target_kind="panel", + evt = _make_synthetic_event( + message_id=str(payload.get("_id") or payload.get("messageId") or ""), + author=str(payload.get("author") or ""), + content=payload.get("content"), meta=payload.get("meta"), + group_id=group_id, converse_id=panel_id, + timestamp=payload.get("createdAt"), author_info=payload.get("authorInfo"), ) + await self._process_inbound_event(panel_id, evt, "panel") async def _handle_notify_inbox_append(self, payload: Any) -> None: - if not isinstance(payload, dict): + if not isinstance(payload, dict) or payload.get("type") != "message": return - - if payload.get("type") != "message": - return - detail = payload.get("payload") if not isinstance(detail, dict): return - - group_id = str(detail.get("groupId") or "").strip() - if group_id: + if _str_field(detail, "groupId"): return - - converse_id = str(detail.get("converseId") or "").strip() + converse_id = _str_field(detail, "converseId") if not converse_id: return session_id = self._session_by_converse.get(converse_id) if not session_id: - await self._refresh_sessions_directory(subscribe_new=self._ws_ready) + await self._refresh_sessions_directory(self._ws_ready) session_id = self._session_by_converse.get(converse_id) if not session_id: return - message_id = str(detail.get("messageId") or payload.get("_id") or "").strip() - author = str(detail.get("messageAuthor") or "").strip() - content = str(detail.get("messagePlainContent") or detail.get("messageSnippet") or "").strip() - - synthetic_event = { - "type": "message.add", - "timestamp": payload.get("createdAt") or datetime.utcnow().isoformat(), - "payload": { - "messageId": message_id, - "author": author, - "content": content, - "meta": { - "source": "notify:chat.inbox.append", - "converseId": converse_id, - }, - "converseId": converse_id, - }, - } - - await self._process_inbound_event( - target_id=session_id, - event=synthetic_event, - target_kind="session", + evt = _make_synthetic_event( + message_id=str(detail.get("messageId") or payload.get("_id") or ""), + author=str(detail.get("messageAuthor") or ""), + content=str(detail.get("messagePlainContent") or detail.get("messageSnippet") or ""), + meta={"source": "notify:chat.inbox.append", "converseId": converse_id}, + group_id="", converse_id=converse_id, timestamp=payload.get("createdAt"), ) + await self._process_inbound_event(session_id, evt, "session") + + # ---- cursor persistence ------------------------------------------------ def _mark_session_cursor(self, session_id: str, cursor: int) -> None: - if cursor < 0: + if cursor < 0 or cursor < self._session_cursor.get(session_id, 0): return - - previous = self._session_cursor.get(session_id, 0) - if cursor < previous: - return - self._session_cursor[session_id] = cursor - self._schedule_cursor_save() - - def _schedule_cursor_save(self) -> None: - if self._cursor_save_task and not self._cursor_save_task.done(): - return - - self._cursor_save_task = asyncio.create_task(self._save_cursor_debounced()) + if not self._cursor_save_task or self._cursor_save_task.done(): + self._cursor_save_task = asyncio.create_task(self._save_cursor_debounced()) async def _save_cursor_debounced(self) -> None: await asyncio.sleep(CURSOR_SAVE_DEBOUNCE_S) @@ -1088,140 +833,63 @@ class MochatChannel(BaseChannel): async def _load_session_cursors(self) -> None: if not self._cursor_path.exists(): return - try: data = json.loads(self._cursor_path.read_text("utf-8")) except Exception as e: logger.warning(f"Failed to read Mochat cursor file: {e}") return - cursors = data.get("cursors") if isinstance(data, dict) else None - if not isinstance(cursors, dict): - return - - for session_id, cursor in cursors.items(): - if isinstance(session_id, str) and isinstance(cursor, int) and cursor >= 0: - self._session_cursor[session_id] = cursor + if isinstance(cursors, dict): + for sid, cur in cursors.items(): + if isinstance(sid, str) and isinstance(cur, int) and cur >= 0: + self._session_cursor[sid] = cur async def _save_session_cursors(self) -> None: - payload = { - "schemaVersion": 1, - "updatedAt": datetime.utcnow().isoformat(), - "cursors": self._session_cursor, - } - try: self._state_dir.mkdir(parents=True, exist_ok=True) - self._cursor_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2) + "\n", "utf-8") + self._cursor_path.write_text(json.dumps({ + "schemaVersion": 1, "updatedAt": datetime.utcnow().isoformat(), + "cursors": self._session_cursor, + }, ensure_ascii=False, indent=2) + "\n", "utf-8") except Exception as e: logger.warning(f"Failed to save Mochat cursor file: {e}") - def _base_url(self) -> str: - return self.config.base_url.strip().rstrip("/") + # ---- HTTP helpers ------------------------------------------------------ async def _post_json(self, path: str, payload: dict[str, Any]) -> dict[str, Any]: if not self._http: raise RuntimeError("Mochat HTTP client not initialized") - - url = f"{self._base_url()}{path}" - response = await self._http.post( - url, - headers={ - "Content-Type": "application/json", - "X-Claw-Token": self.config.claw_token, - }, - json=payload, - ) - - text = response.text + url = f"{self.config.base_url.strip().rstrip('/')}{path}" + response = await self._http.post(url, headers={ + "Content-Type": "application/json", "X-Claw-Token": self.config.claw_token, + }, json=payload) if not response.is_success: - raise RuntimeError(f"Mochat HTTP {response.status_code}: {text[:200]}") - - parsed: Any + raise RuntimeError(f"Mochat HTTP {response.status_code}: {response.text[:200]}") try: parsed = response.json() except Exception: - parsed = text - + parsed = response.text if isinstance(parsed, dict) and isinstance(parsed.get("code"), int): if parsed["code"] != 200: - message = str(parsed.get("message") or parsed.get("name") or "request failed") - raise RuntimeError(f"Mochat API error: {message} (code={parsed['code']})") + msg = str(parsed.get("message") or parsed.get("name") or "request failed") + raise RuntimeError(f"Mochat API error: {msg} (code={parsed['code']})") data = parsed.get("data") return data if isinstance(data, dict) else {} + return parsed if isinstance(parsed, dict) else {} - if isinstance(parsed, dict): - return parsed - - return {} - - async def _watch_session( - self, - session_id: str, - cursor: int, - timeout_ms: int, - limit: int, - ) -> dict[str, Any]: - return await self._post_json( - "/api/claw/sessions/watch", - { - "sessionId": session_id, - "cursor": cursor, - "timeoutMs": timeout_ms, - "limit": limit, - }, - ) - - async def _send_session_message( - self, - session_id: str, - content: str, - reply_to: str | None, - ) -> dict[str, Any]: - payload = { - "sessionId": session_id, - "content": content, - } + async def _api_send(self, path: str, id_key: str, id_val: str, + content: str, reply_to: str | None, group_id: str | None = None) -> dict[str, Any]: + """Unified send helper for session and panel messages.""" + body: dict[str, Any] = {id_key: id_val, "content": content} if reply_to: - payload["replyTo"] = reply_to - return await self._post_json("/api/claw/sessions/send", payload) - - async def _send_panel_message( - self, - panel_id: str, - content: str, - reply_to: str | None, - group_id: str | None, - ) -> dict[str, Any]: - payload = { - "panelId": panel_id, - "content": content, - } - if reply_to: - payload["replyTo"] = reply_to + body["replyTo"] = reply_to if group_id: - payload["groupId"] = group_id - return await self._post_json("/api/claw/groups/panels/send", payload) + body["groupId"] = group_id + return await self._post_json(path, body) - async def _list_sessions(self) -> dict[str, Any]: - return await self._post_json("/api/claw/sessions/list", {}) - - async def _get_workspace_group(self) -> dict[str, Any]: - return await self._post_json("/api/claw/groups/get", {}) - - async def _list_panel_messages(self, panel_id: str, limit: int) -> dict[str, Any]: - return await self._post_json( - "/api/claw/groups/panels/messages", - { - "panelId": panel_id, - "limit": limit, - }, - ) - - def _read_group_id(self, metadata: dict[str, Any]) -> str | None: + @staticmethod + def _read_group_id(metadata: dict[str, Any]) -> str | None: if not isinstance(metadata, dict): return None value = metadata.get("group_id") or metadata.get("groupId") - if isinstance(value, str) and value.strip(): - return value.strip() - return None + return value.strip() if isinstance(value, str) and value.strip() else None