diff --git a/nanobot/channels/__init__.py b/nanobot/channels/__init__.py index 034d401..588169d 100644 --- a/nanobot/channels/__init__.py +++ b/nanobot/channels/__init__.py @@ -2,6 +2,5 @@ from nanobot.channels.base import BaseChannel from nanobot.channels.manager import ChannelManager -from nanobot.channels.mochat import MochatChannel -__all__ = ["BaseChannel", "ChannelManager", "MochatChannel"] +__all__ = ["BaseChannel", "ChannelManager"] diff --git a/nanobot/channels/mochat.py b/nanobot/channels/mochat.py index 6569cdd..30c3dbf 100644 --- a/nanobot/channels/mochat.py +++ b/nanobot/channels/mochat.py @@ -20,7 +20,6 @@ from nanobot.utils.helpers import get_data_path try: import socketio - SOCKETIO_AVAILABLE = True except ImportError: socketio = None @@ -28,20 +27,21 @@ except ImportError: try: import msgpack # noqa: F401 - MSGPACK_AVAILABLE = True except ImportError: MSGPACK_AVAILABLE = False - MAX_SEEN_MESSAGE_IDS = 2000 CURSOR_SAVE_DEBOUNCE_S = 0.5 +# --------------------------------------------------------------------------- +# Data classes +# --------------------------------------------------------------------------- + @dataclass class MochatBufferedEntry: """Buffered inbound entry for delayed dispatch.""" - raw_body: str author: str sender_name: str = "" @@ -54,7 +54,6 @@ class MochatBufferedEntry: @dataclass class DelayState: """Per-target delayed message state.""" - entries: list[MochatBufferedEntry] = field(default_factory=list) lock: asyncio.Lock = field(default_factory=asyncio.Lock) timer: asyncio.Task | None = None @@ -63,11 +62,48 @@ class DelayState: @dataclass class MochatTarget: """Outbound target resolution result.""" - id: str is_panel: bool +# --------------------------------------------------------------------------- +# Pure helpers +# --------------------------------------------------------------------------- + +def _safe_dict(value: Any) -> dict: + """Return *value* if it's a dict, else empty dict.""" + return value if isinstance(value, dict) else {} + + +def _str_field(src: dict, *keys: str) -> str: + """Return the first non-empty str value found for *keys*, stripped.""" + for k in keys: + v = src.get(k) + if isinstance(v, str) and v.strip(): + return v.strip() + return "" + + +def _make_synthetic_event( + message_id: str, author: str, content: Any, + meta: Any, group_id: str, converse_id: str, + timestamp: Any = None, *, author_info: Any = None, +) -> dict[str, Any]: + """Build a synthetic ``message.add`` event dict.""" + payload: dict[str, Any] = { + "messageId": message_id, "author": author, + "content": content, "meta": _safe_dict(meta), + "groupId": group_id, "converseId": converse_id, + } + if author_info is not None: + payload["authorInfo"] = _safe_dict(author_info) + return { + "type": "message.add", + "timestamp": timestamp or datetime.utcnow().isoformat(), + "payload": payload, + } + + def normalize_mochat_content(content: Any) -> str: """Normalize content payload to text.""" if isinstance(content, str): @@ -87,20 +123,15 @@ def resolve_mochat_target(raw: str) -> MochatTarget: return MochatTarget(id="", is_panel=False) lowered = trimmed.lower() - cleaned = trimmed - forced_panel = False - - prefixes = ["mochat:", "group:", "channel:", "panel:"] - for prefix in prefixes: + cleaned, forced_panel = trimmed, False + for prefix in ("mochat:", "group:", "channel:", "panel:"): if lowered.startswith(prefix): - cleaned = trimmed[len(prefix) :].strip() - if prefix in {"group:", "channel:", "panel:"}: - forced_panel = True + cleaned = trimmed[len(prefix):].strip() + forced_panel = prefix in {"group:", "channel:", "panel:"} break if not cleaned: return MochatTarget(id="", is_panel=False) - return MochatTarget(id=cleaned, is_panel=forced_panel or not cleaned.startswith("session_")) @@ -108,24 +139,17 @@ def extract_mention_ids(value: Any) -> list[str]: """Extract mention ids from heterogeneous mention payload.""" if not isinstance(value, list): return [] - ids: list[str] = [] for item in value: if isinstance(item, str): - text = item.strip() - if text: - ids.append(text) - continue - - if not isinstance(item, dict): - continue - - for key in ("id", "userId", "_id"): - candidate = item.get(key) - if isinstance(candidate, str) and candidate.strip(): - ids.append(candidate.strip()) - break - + if item.strip(): + ids.append(item.strip()) + elif isinstance(item, dict): + for key in ("id", "userId", "_id"): + candidate = item.get(key) + if isinstance(candidate, str) and candidate.strip(): + ids.append(candidate.strip()) + break return ids @@ -135,35 +159,23 @@ def resolve_was_mentioned(payload: dict[str, Any], agent_user_id: str) -> bool: if isinstance(meta, dict): if meta.get("mentioned") is True or meta.get("wasMentioned") is True: return True - - for field in ("mentions", "mentionIds", "mentionedUserIds", "mentionedUsers"): - ids = extract_mention_ids(meta.get(field)) - if agent_user_id and agent_user_id in ids: + for f in ("mentions", "mentionIds", "mentionedUserIds", "mentionedUsers"): + if agent_user_id and agent_user_id in extract_mention_ids(meta.get(f)): return True - if not agent_user_id: return False - content = payload.get("content") if not isinstance(content, str) or not content: return False - return f"<@{agent_user_id}>" in content or f"@{agent_user_id}" in content -def resolve_require_mention( - config: MochatConfig, - session_id: str, - group_id: str, -) -> bool: +def resolve_require_mention(config: MochatConfig, session_id: str, group_id: str) -> bool: """Resolve mention requirement for group/panel conversations.""" groups = config.groups or {} - if group_id and group_id in groups: - return bool(groups[group_id].require_mention) - if session_id in groups: - return bool(groups[session_id].require_mention) - if "*" in groups: - return bool(groups["*"].require_mention) + for key in (group_id, session_id, "*"): + if key and key in groups: + return bool(groups[key].require_mention) return bool(config.mention.require_in_groups) @@ -171,22 +183,18 @@ def build_buffered_body(entries: list[MochatBufferedEntry], is_group: bool) -> s """Build text body from one or more buffered entries.""" if not entries: return "" - if len(entries) == 1: return entries[0].raw_body - lines: list[str] = [] for entry in entries: - body = entry.raw_body - if not body: + if not entry.raw_body: continue if is_group: label = entry.sender_name.strip() or entry.sender_username.strip() or entry.author if label: - lines.append(f"{label}: {body}") + lines.append(f"{label}: {entry.raw_body}") continue - lines.append(body) - + lines.append(entry.raw_body) return "\n".join(lines).strip() @@ -200,6 +208,10 @@ def parse_timestamp(value: Any) -> int | None: return None +# --------------------------------------------------------------------------- +# Channel +# --------------------------------------------------------------------------- + class MochatChannel(BaseChannel): """Mochat channel using socket.io with fallback polling workers.""" @@ -210,8 +222,7 @@ class MochatChannel(BaseChannel): self.config: MochatConfig = config self._http: httpx.AsyncClient | None = None self._socket: Any = None - self._ws_connected = False - self._ws_ready = False + self._ws_connected = self._ws_ready = False self._state_dir = get_data_path() / "mochat" self._cursor_path = self._state_dir / "session_cursors.json" @@ -220,24 +231,23 @@ class MochatChannel(BaseChannel): self._session_set: set[str] = set() self._panel_set: set[str] = set() - self._auto_discover_sessions = False - self._auto_discover_panels = False + self._auto_discover_sessions = self._auto_discover_panels = False self._cold_sessions: set[str] = set() self._session_by_converse: dict[str, str] = {} self._seen_set: dict[str, set[str]] = {} self._seen_queue: dict[str, deque[str]] = {} - self._delay_states: dict[str, DelayState] = {} self._fallback_mode = False self._session_fallback_tasks: dict[str, asyncio.Task] = {} self._panel_fallback_tasks: dict[str, asyncio.Task] = {} self._refresh_task: asyncio.Task | None = None - self._target_locks: dict[str, asyncio.Lock] = {} + # ---- lifecycle --------------------------------------------------------- + async def start(self) -> None: """Start Mochat channel workers and websocket connection.""" if not self.config.claw_token: @@ -246,26 +256,21 @@ class MochatChannel(BaseChannel): self._running = True self._http = httpx.AsyncClient(timeout=30.0) - self._state_dir.mkdir(parents=True, exist_ok=True) await self._load_session_cursors() self._seed_targets_from_config() - await self._refresh_targets(subscribe_new=False) - websocket_started = await self._start_socket_client() - if not websocket_started: + if not await self._start_socket_client(): await self._ensure_fallback_workers() self._refresh_task = asyncio.create_task(self._refresh_loop()) - while self._running: await asyncio.sleep(1) async def stop(self) -> None: """Stop all workers and clean up resources.""" self._running = False - if self._refresh_task: self._refresh_task.cancel() self._refresh_task = None @@ -283,15 +288,12 @@ class MochatChannel(BaseChannel): if self._cursor_save_task: self._cursor_save_task.cancel() self._cursor_save_task = None - await self._save_session_cursors() if self._http: await self._http.aclose() self._http = None - - self._ws_connected = False - self._ws_ready = False + self._ws_connected = self._ws_ready = False async def send(self, msg: OutboundMessage) -> None: """Send outbound message to session or panel.""" @@ -299,10 +301,10 @@ class MochatChannel(BaseChannel): logger.warning("Mochat claw_token missing, skip send") return - content_parts = [msg.content.strip()] if msg.content and msg.content.strip() else [] + parts = ([msg.content.strip()] if msg.content and msg.content.strip() else []) if msg.media: - content_parts.extend([m for m in msg.media if isinstance(m, str) and m.strip()]) - content = "\n".join(content_parts).strip() + parts.extend(m for m in msg.media if isinstance(m, str) and m.strip()) + content = "\n".join(parts).strip() if not content: return @@ -311,43 +313,34 @@ class MochatChannel(BaseChannel): logger.warning("Mochat outbound target is empty") return - is_panel = target.is_panel or target.id in self._panel_set - if target.id.startswith("session_"): - is_panel = False - + is_panel = (target.is_panel or target.id in self._panel_set) and not target.id.startswith("session_") try: if is_panel: - await self._send_panel_message( - panel_id=target.id, - content=content, - reply_to=msg.reply_to, - group_id=self._read_group_id(msg.metadata), - ) + await self._api_send("/api/claw/groups/panels/send", "panelId", target.id, + content, msg.reply_to, self._read_group_id(msg.metadata)) else: - await self._send_session_message( - session_id=target.id, - content=content, - reply_to=msg.reply_to, - ) + await self._api_send("/api/claw/sessions/send", "sessionId", target.id, + content, msg.reply_to) except Exception as e: logger.error(f"Failed to send Mochat message: {e}") + # ---- config / init helpers --------------------------------------------- + def _seed_targets_from_config(self) -> None: sessions, self._auto_discover_sessions = self._normalize_id_list(self.config.sessions) panels, self._auto_discover_panels = self._normalize_id_list(self.config.panels) - self._session_set.update(sessions) self._panel_set.update(panels) + for sid in sessions: + if sid not in self._session_cursor: + self._cold_sessions.add(sid) - for session_id in sessions: - if session_id not in self._session_cursor: - self._cold_sessions.add(session_id) - - def _normalize_id_list(self, values: list[str]) -> tuple[list[str], bool]: + @staticmethod + def _normalize_id_list(values: list[str]) -> tuple[list[str], bool]: cleaned = [str(v).strip() for v in values if str(v).strip()] - has_wildcard = "*" in cleaned - ids = sorted({v for v in cleaned if v != "*"}) - return ids, has_wildcard + return sorted({v for v in cleaned if v != "*"}), "*" in cleaned + + # ---- websocket --------------------------------------------------------- async def _start_socket_client(self) -> bool: if not SOCKETIO_AVAILABLE: @@ -359,83 +352,56 @@ class MochatChannel(BaseChannel): if MSGPACK_AVAILABLE: serializer = "msgpack" else: - logger.warning( - "msgpack is not installed but socket_disable_msgpack=false; " - "trying JSON serializer" - ) - - reconnect_attempts = None - if self.config.max_retry_attempts > 0: - reconnect_attempts = self.config.max_retry_attempts + logger.warning("msgpack not installed but socket_disable_msgpack=false; using JSON") client = socketio.AsyncClient( reconnection=True, - reconnection_attempts=reconnect_attempts, + reconnection_attempts=self.config.max_retry_attempts or None, reconnection_delay=max(0.1, self.config.socket_reconnect_delay_ms / 1000.0), - reconnection_delay_max=max( - 0.1, - self.config.socket_max_reconnect_delay_ms / 1000.0, - ), - logger=False, - engineio_logger=False, - serializer=serializer, + reconnection_delay_max=max(0.1, self.config.socket_max_reconnect_delay_ms / 1000.0), + logger=False, engineio_logger=False, serializer=serializer, ) @client.event async def connect() -> None: - self._ws_connected = True - self._ws_ready = False + self._ws_connected, self._ws_ready = True, False logger.info("Mochat websocket connected") - subscribed = await self._subscribe_all() self._ws_ready = subscribed - if subscribed: - await self._stop_fallback_workers() - else: - await self._ensure_fallback_workers() + await (self._stop_fallback_workers() if subscribed else self._ensure_fallback_workers()) @client.event async def disconnect() -> None: if not self._running: return - self._ws_connected = False - self._ws_ready = False + self._ws_connected = self._ws_ready = False logger.warning("Mochat websocket disconnected") await self._ensure_fallback_workers() @client.event async def connect_error(data: Any) -> None: - message = str(data) - logger.error(f"Mochat websocket connect error: {message}") + logger.error(f"Mochat websocket connect error: {data}") @client.on("claw.session.events") async def on_session_events(payload: dict[str, Any]) -> None: - await self._handle_watch_payload(payload, target_kind="session") + await self._handle_watch_payload(payload, "session") @client.on("claw.panel.events") async def on_panel_events(payload: dict[str, Any]) -> None: - await self._handle_watch_payload(payload, target_kind="panel") + await self._handle_watch_payload(payload, "panel") - for event_name in ( - "notify:chat.inbox.append", - "notify:chat.message.add", - "notify:chat.message.update", - "notify:chat.message.recall", - "notify:chat.message.delete", - ): - client.on(event_name, self._build_notify_handler(event_name)) + for ev in ("notify:chat.inbox.append", "notify:chat.message.add", + "notify:chat.message.update", "notify:chat.message.recall", + "notify:chat.message.delete"): + client.on(ev, self._build_notify_handler(ev)) socket_url = (self.config.socket_url or self.config.base_url).strip().rstrip("/") - socket_path = (self.config.socket_path or "/socket.io").strip() - if socket_path.startswith("/"): - socket_path = socket_path[1:] + socket_path = (self.config.socket_path or "/socket.io").strip().lstrip("/") try: self._socket = client await client.connect( - socket_url, - transports=["websocket"], - socketio_path=socket_path, + socket_url, transports=["websocket"], socketio_path=socket_path, auth={"token": self.config.claw_token}, wait_timeout=max(1.0, self.config.socket_connect_timeout_ms / 1000.0), ) @@ -453,38 +419,30 @@ class MochatChannel(BaseChannel): async def handler(payload: Any) -> None: if event_name == "notify:chat.inbox.append": await self._handle_notify_inbox_append(payload) - return - - if event_name.startswith("notify:chat.message."): + elif event_name.startswith("notify:chat.message."): await self._handle_notify_chat_message(payload) - return handler - async def _subscribe_all(self) -> bool: - sessions_ok = await self._subscribe_sessions(sorted(self._session_set)) - panels_ok = await self._subscribe_panels(sorted(self._panel_set)) + # ---- subscribe --------------------------------------------------------- + async def _subscribe_all(self) -> bool: + ok = await self._subscribe_sessions(sorted(self._session_set)) + ok = await self._subscribe_panels(sorted(self._panel_set)) and ok if self._auto_discover_sessions or self._auto_discover_panels: await self._refresh_targets(subscribe_new=True) - - return sessions_ok and panels_ok + return ok async def _subscribe_sessions(self, session_ids: list[str]) -> bool: if not session_ids: return True + for sid in session_ids: + if sid not in self._session_cursor: + self._cold_sessions.add(sid) - for session_id in session_ids: - if session_id not in self._session_cursor: - self._cold_sessions.add(session_id) - - ack = await self._socket_call( - "com.claw.im.subscribeSessions", - { - "sessionIds": session_ids, - "cursors": self._session_cursor, - "limit": self.config.watch_limit, - }, - ) + ack = await self._socket_call("com.claw.im.subscribeSessions", { + "sessionIds": session_ids, "cursors": self._session_cursor, + "limit": self.config.watch_limit, + }) if not ack.get("result"): logger.error(f"Mochat subscribeSessions failed: {ack.get('message', 'unknown error')}") return False @@ -492,73 +450,57 @@ class MochatChannel(BaseChannel): data = ack.get("data") items: list[dict[str, Any]] = [] if isinstance(data, list): - items = [item for item in data if isinstance(item, dict)] + items = [i for i in data if isinstance(i, dict)] elif isinstance(data, dict): sessions = data.get("sessions") if isinstance(sessions, list): - items = [item for item in sessions if isinstance(item, dict)] + items = [i for i in sessions if isinstance(i, dict)] elif "sessionId" in data: items = [data] - - for payload in items: - await self._handle_watch_payload(payload, target_kind="session") - + for p in items: + await self._handle_watch_payload(p, "session") return True async def _subscribe_panels(self, panel_ids: list[str]) -> bool: if not self._auto_discover_panels and not panel_ids: return True - - ack = await self._socket_call( - "com.claw.im.subscribePanels", - { - "panelIds": panel_ids, - }, - ) + ack = await self._socket_call("com.claw.im.subscribePanels", {"panelIds": panel_ids}) if not ack.get("result"): logger.error(f"Mochat subscribePanels failed: {ack.get('message', 'unknown error')}") return False - return True async def _socket_call(self, event_name: str, payload: dict[str, Any]) -> dict[str, Any]: if not self._socket: return {"result": False, "message": "socket not connected"} - try: raw = await self._socket.call(event_name, payload, timeout=10) except Exception as e: return {"result": False, "message": str(e)} + return raw if isinstance(raw, dict) else {"result": True, "data": raw} - if isinstance(raw, dict): - return raw - - return {"result": True, "data": raw} + # ---- refresh / discovery ----------------------------------------------- async def _refresh_loop(self) -> None: interval_s = max(1.0, self.config.refresh_interval_ms / 1000.0) - while self._running: await asyncio.sleep(interval_s) - try: await self._refresh_targets(subscribe_new=self._ws_ready) except Exception as e: logger.warning(f"Mochat refresh failed: {e}") - if self._fallback_mode: await self._ensure_fallback_workers() async def _refresh_targets(self, subscribe_new: bool) -> None: if self._auto_discover_sessions: - await self._refresh_sessions_directory(subscribe_new=subscribe_new) - + await self._refresh_sessions_directory(subscribe_new) if self._auto_discover_panels: - await self._refresh_panels(subscribe_new=subscribe_new) + await self._refresh_panels(subscribe_new) async def _refresh_sessions_directory(self, subscribe_new: bool) -> None: try: - response = await self._list_sessions() + response = await self._post_json("/api/claw/sessions/list", {}) except Exception as e: logger.warning(f"Mochat listSessions failed: {e}") return @@ -567,37 +509,32 @@ class MochatChannel(BaseChannel): if not isinstance(sessions, list): return - new_sessions: list[str] = [] - for session in sessions: - if not isinstance(session, dict): + new_ids: list[str] = [] + for s in sessions: + if not isinstance(s, dict): continue - - session_id = str(session.get("sessionId") or "").strip() - if not session_id: + sid = _str_field(s, "sessionId") + if not sid: continue + if sid not in self._session_set: + self._session_set.add(sid) + new_ids.append(sid) + if sid not in self._session_cursor: + self._cold_sessions.add(sid) + cid = _str_field(s, "converseId") + if cid: + self._session_by_converse[cid] = sid - if session_id not in self._session_set: - self._session_set.add(session_id) - new_sessions.append(session_id) - if session_id not in self._session_cursor: - self._cold_sessions.add(session_id) - - converse_id = str(session.get("converseId") or "").strip() - if converse_id: - self._session_by_converse[converse_id] = session_id - - if not new_sessions: + if not new_ids: return - if self._ws_ready and subscribe_new: - await self._subscribe_sessions(new_sessions) - + await self._subscribe_sessions(new_ids) if self._fallback_mode: await self._ensure_fallback_workers() async def _refresh_panels(self, subscribe_new: bool) -> None: try: - response = await self._get_workspace_group() + response = await self._post_json("/api/claw/groups/get", {}) except Exception as e: logger.warning(f"Mochat getWorkspaceGroup failed: {e}") return @@ -606,80 +543,58 @@ class MochatChannel(BaseChannel): if not isinstance(raw_panels, list): return - new_panels: list[str] = [] - for panel in raw_panels: - if not isinstance(panel, dict): + new_ids: list[str] = [] + for p in raw_panels: + if not isinstance(p, dict): continue - - panel_type = panel.get("type") - if isinstance(panel_type, int) and panel_type != 0: + pt = p.get("type") + if isinstance(pt, int) and pt != 0: continue + pid = _str_field(p, "id", "_id") + if pid and pid not in self._panel_set: + self._panel_set.add(pid) + new_ids.append(pid) - panel_id = str(panel.get("id") or panel.get("_id") or "").strip() - if not panel_id: - continue - - if panel_id not in self._panel_set: - self._panel_set.add(panel_id) - new_panels.append(panel_id) - - if not new_panels: + if not new_ids: return - if self._ws_ready and subscribe_new: - await self._subscribe_panels(new_panels) - + await self._subscribe_panels(new_ids) if self._fallback_mode: await self._ensure_fallback_workers() + # ---- fallback workers -------------------------------------------------- + async def _ensure_fallback_workers(self) -> None: if not self._running: return - self._fallback_mode = True - - for session_id in sorted(self._session_set): - task = self._session_fallback_tasks.get(session_id) - if task and not task.done(): - continue - self._session_fallback_tasks[session_id] = asyncio.create_task( - self._session_watch_worker(session_id) - ) - - for panel_id in sorted(self._panel_set): - task = self._panel_fallback_tasks.get(panel_id) - if task and not task.done(): - continue - self._panel_fallback_tasks[panel_id] = asyncio.create_task( - self._panel_poll_worker(panel_id) - ) + for sid in sorted(self._session_set): + t = self._session_fallback_tasks.get(sid) + if not t or t.done(): + self._session_fallback_tasks[sid] = asyncio.create_task(self._session_watch_worker(sid)) + for pid in sorted(self._panel_set): + t = self._panel_fallback_tasks.get(pid) + if not t or t.done(): + self._panel_fallback_tasks[pid] = asyncio.create_task(self._panel_poll_worker(pid)) async def _stop_fallback_workers(self) -> None: self._fallback_mode = False - - tasks = [ - *self._session_fallback_tasks.values(), - *self._panel_fallback_tasks.values(), - ] - for task in tasks: - task.cancel() - + tasks = [*self._session_fallback_tasks.values(), *self._panel_fallback_tasks.values()] + for t in tasks: + t.cancel() if tasks: await asyncio.gather(*tasks, return_exceptions=True) - self._session_fallback_tasks.clear() self._panel_fallback_tasks.clear() async def _session_watch_worker(self, session_id: str) -> None: while self._running and self._fallback_mode: try: - payload = await self._watch_session( - session_id=session_id, - cursor=self._session_cursor.get(session_id, 0), - timeout_ms=self.config.watch_timeout_ms, - limit=self.config.watch_limit, - ) - await self._handle_watch_payload(payload, target_kind="session") + payload = await self._post_json("/api/claw/sessions/watch", { + "sessionId": session_id, "cursor": self._session_cursor.get(session_id, 0), + "timeoutMs": self.config.watch_timeout_ms, "limit": self.config.watch_limit, + }) + await self._handle_watch_payload(payload, "session") except asyncio.CancelledError: break except Exception as e: @@ -688,72 +603,50 @@ class MochatChannel(BaseChannel): async def _panel_poll_worker(self, panel_id: str) -> None: sleep_s = max(1.0, self.config.refresh_interval_ms / 1000.0) - while self._running and self._fallback_mode: try: - response = await self._list_panel_messages( - panel_id=panel_id, - limit=min(100, max(1, self.config.watch_limit)), - ) - - raw_messages = response.get("messages") - if isinstance(raw_messages, list): - for message in reversed(raw_messages): - if not isinstance(message, dict): + resp = await self._post_json("/api/claw/groups/panels/messages", { + "panelId": panel_id, "limit": min(100, max(1, self.config.watch_limit)), + }) + msgs = resp.get("messages") + if isinstance(msgs, list): + for m in reversed(msgs): + if not isinstance(m, dict): continue - - synthetic_event = { - "type": "message.add", - "timestamp": message.get("createdAt") or datetime.utcnow().isoformat(), - "payload": { - "messageId": str(message.get("messageId") or ""), - "author": str(message.get("author") or ""), - "authorInfo": message.get("authorInfo") if isinstance(message.get("authorInfo"), dict) else {}, - "content": message.get("content"), - "meta": message.get("meta") if isinstance(message.get("meta"), dict) else {}, - "groupId": str(response.get("groupId") or ""), - "converseId": panel_id, - }, - } - await self._process_inbound_event( - target_id=panel_id, - event=synthetic_event, - target_kind="panel", + evt = _make_synthetic_event( + message_id=str(m.get("messageId") or ""), + author=str(m.get("author") or ""), + content=m.get("content"), + meta=m.get("meta"), group_id=str(resp.get("groupId") or ""), + converse_id=panel_id, timestamp=m.get("createdAt"), + author_info=m.get("authorInfo"), ) + await self._process_inbound_event(panel_id, evt, "panel") except asyncio.CancelledError: break except Exception as e: logger.warning(f"Mochat panel polling error ({panel_id}): {e}") - await asyncio.sleep(sleep_s) - async def _handle_watch_payload( - self, - payload: dict[str, Any], - target_kind: str, - ) -> None: + # ---- inbound event processing ------------------------------------------ + + async def _handle_watch_payload(self, payload: dict[str, Any], target_kind: str) -> None: if not isinstance(payload, dict): return - - target_id = str(payload.get("sessionId") or "").strip() + target_id = _str_field(payload, "sessionId") if not target_id: return lock = self._target_locks.setdefault(f"{target_kind}:{target_id}", asyncio.Lock()) async with lock: - previous_cursor = self._session_cursor.get(target_id, 0) if target_kind == "session" else 0 - payload_cursor = payload.get("cursor") - if ( - target_kind == "session" - and isinstance(payload_cursor, int) - and payload_cursor >= 0 - ): - self._mark_session_cursor(target_id, payload_cursor) + prev = self._session_cursor.get(target_id, 0) if target_kind == "session" else 0 + pc = payload.get("cursor") + if target_kind == "session" and isinstance(pc, int) and pc >= 0: + self._mark_session_cursor(target_id, pc) raw_events = payload.get("events") if not isinstance(raw_events, list): return - if target_kind == "session" and target_id in self._cold_sessions: self._cold_sessions.discard(target_id) return @@ -762,324 +655,176 @@ class MochatChannel(BaseChannel): if not isinstance(event, dict): continue seq = event.get("seq") - if ( - target_kind == "session" - and isinstance(seq, int) - and seq > self._session_cursor.get(target_id, previous_cursor) - ): + if target_kind == "session" and isinstance(seq, int) and seq > self._session_cursor.get(target_id, prev): self._mark_session_cursor(target_id, seq) + if event.get("type") == "message.add": + await self._process_inbound_event(target_id, event, target_kind) - if event.get("type") != "message.add": - continue - - await self._process_inbound_event( - target_id=target_id, - event=event, - target_kind=target_kind, - ) - - async def _process_inbound_event( - self, - target_id: str, - event: dict[str, Any], - target_kind: str, - ) -> None: + async def _process_inbound_event(self, target_id: str, event: dict[str, Any], target_kind: str) -> None: payload = event.get("payload") if not isinstance(payload, dict): return - author = str(payload.get("author") or "").strip() - if not author: + author = _str_field(payload, "author") + if not author or (self.config.agent_user_id and author == self.config.agent_user_id): return - - if self.config.agent_user_id and author == self.config.agent_user_id: - return - if not self.is_allowed(author): return - message_id = str(payload.get("messageId") or "").strip() + message_id = _str_field(payload, "messageId") seen_key = f"{target_kind}:{target_id}" if message_id and self._remember_message_id(seen_key, message_id): return - raw_body = normalize_mochat_content(payload.get("content")) - if not raw_body: - raw_body = "[empty message]" + raw_body = normalize_mochat_content(payload.get("content")) or "[empty message]" + ai = _safe_dict(payload.get("authorInfo")) + sender_name = _str_field(ai, "nickname", "email") + sender_username = _str_field(ai, "agentId") - author_info = payload.get("authorInfo") if isinstance(payload.get("authorInfo"), dict) else {} - sender_name = str(author_info.get("nickname") or author_info.get("email") or "").strip() - sender_username = str(author_info.get("agentId") or "").strip() - - group_id = str(payload.get("groupId") or "").strip() + group_id = _str_field(payload, "groupId") is_group = bool(group_id) was_mentioned = resolve_was_mentioned(payload, self.config.agent_user_id) - - require_mention = ( - target_kind == "panel" - and is_group - and resolve_require_mention(self.config, target_id, group_id) - ) - + require_mention = target_kind == "panel" and is_group and resolve_require_mention(self.config, target_id, group_id) use_delay = target_kind == "panel" and self.config.reply_delay_mode == "non-mention" if require_mention and not was_mentioned and not use_delay: return entry = MochatBufferedEntry( - raw_body=raw_body, - author=author, - sender_name=sender_name, - sender_username=sender_username, - timestamp=parse_timestamp(event.get("timestamp")), - message_id=message_id, - group_id=group_id, + raw_body=raw_body, author=author, sender_name=sender_name, + sender_username=sender_username, timestamp=parse_timestamp(event.get("timestamp")), + message_id=message_id, group_id=group_id, ) if use_delay: - delay_key = f"{target_kind}:{target_id}" + delay_key = seen_key if was_mentioned: - await self._flush_delayed_entries( - key=delay_key, - target_id=target_id, - target_kind=target_kind, - reason="mention", - entry=entry, - ) + await self._flush_delayed_entries(delay_key, target_id, target_kind, "mention", entry) else: - await self._enqueue_delayed_entry( - key=delay_key, - target_id=target_id, - target_kind=target_kind, - entry=entry, - ) + await self._enqueue_delayed_entry(delay_key, target_id, target_kind, entry) return - await self._dispatch_entries( - target_id=target_id, - target_kind=target_kind, - entries=[entry], - was_mentioned=was_mentioned, - ) + await self._dispatch_entries(target_id, target_kind, [entry], was_mentioned) + + # ---- dedup / buffering ------------------------------------------------- def _remember_message_id(self, key: str, message_id: str) -> bool: seen_set = self._seen_set.setdefault(key, set()) seen_queue = self._seen_queue.setdefault(key, deque()) - if message_id in seen_set: return True - seen_set.add(message_id) seen_queue.append(message_id) - while len(seen_queue) > MAX_SEEN_MESSAGE_IDS: - removed = seen_queue.popleft() - seen_set.discard(removed) - + seen_set.discard(seen_queue.popleft()) return False - async def _enqueue_delayed_entry( - self, - key: str, - target_id: str, - target_kind: str, - entry: MochatBufferedEntry, - ) -> None: + async def _enqueue_delayed_entry(self, key: str, target_id: str, target_kind: str, entry: MochatBufferedEntry) -> None: state = self._delay_states.setdefault(key, DelayState()) - async with state.lock: state.entries.append(entry) if state.timer: state.timer.cancel() - - state.timer = asyncio.create_task( - self._delay_flush_after(key, target_id, target_kind) - ) + state.timer = asyncio.create_task(self._delay_flush_after(key, target_id, target_kind)) async def _delay_flush_after(self, key: str, target_id: str, target_kind: str) -> None: await asyncio.sleep(max(0, self.config.reply_delay_ms) / 1000.0) - await self._flush_delayed_entries( - key=key, - target_id=target_id, - target_kind=target_kind, - reason="timer", - entry=None, - ) + await self._flush_delayed_entries(key, target_id, target_kind, "timer", None) - async def _flush_delayed_entries( - self, - key: str, - target_id: str, - target_kind: str, - reason: str, - entry: MochatBufferedEntry | None, - ) -> None: + async def _flush_delayed_entries(self, key: str, target_id: str, target_kind: str, reason: str, entry: MochatBufferedEntry | None) -> None: state = self._delay_states.setdefault(key, DelayState()) - async with state.lock: if entry: state.entries.append(entry) - current = asyncio.current_task() if state.timer and state.timer is not current: state.timer.cancel() - state.timer = None - elif state.timer is current: - state.timer = None - + state.timer = None entries = state.entries[:] state.entries.clear() + if entries: + await self._dispatch_entries(target_id, target_kind, entries, reason == "mention") + async def _dispatch_entries(self, target_id: str, target_kind: str, entries: list[MochatBufferedEntry], was_mentioned: bool) -> None: if not entries: return - - await self._dispatch_entries( - target_id=target_id, - target_kind=target_kind, - entries=entries, - was_mentioned=(reason == "mention"), - ) - - async def _dispatch_entries( - self, - target_id: str, - target_kind: str, - entries: list[MochatBufferedEntry], - was_mentioned: bool, - ) -> None: - if not entries: - return - - is_group = bool(entries[-1].group_id) - body = build_buffered_body(entries, is_group) - if not body: - body = "[empty message]" - last = entries[-1] - metadata = { - "message_id": last.message_id, - "timestamp": last.timestamp, - "is_group": is_group, - "group_id": last.group_id, - "sender_name": last.sender_name, - "sender_username": last.sender_username, - "target_kind": target_kind, - "was_mentioned": was_mentioned, - "buffered_count": len(entries), - } - + is_group = bool(last.group_id) + body = build_buffered_body(entries, is_group) or "[empty message]" await self._handle_message( - sender_id=last.author, - chat_id=target_id, - content=body, - metadata=metadata, + sender_id=last.author, chat_id=target_id, content=body, + metadata={ + "message_id": last.message_id, "timestamp": last.timestamp, + "is_group": is_group, "group_id": last.group_id, + "sender_name": last.sender_name, "sender_username": last.sender_username, + "target_kind": target_kind, "was_mentioned": was_mentioned, + "buffered_count": len(entries), + }, ) async def _cancel_delay_timers(self) -> None: for state in self._delay_states.values(): if state.timer: state.timer.cancel() - state.timer = None self._delay_states.clear() + # ---- notify handlers --------------------------------------------------- + async def _handle_notify_chat_message(self, payload: Any) -> None: if not isinstance(payload, dict): return - - group_id = str(payload.get("groupId") or "").strip() - panel_id = str(payload.get("converseId") or payload.get("panelId") or "").strip() + group_id = _str_field(payload, "groupId") + panel_id = _str_field(payload, "converseId", "panelId") if not group_id or not panel_id: return - if self._panel_set and panel_id not in self._panel_set: return - synthetic_event = { - "type": "message.add", - "timestamp": payload.get("createdAt") or datetime.utcnow().isoformat(), - "payload": { - "messageId": str(payload.get("_id") or payload.get("messageId") or ""), - "author": str(payload.get("author") or ""), - "authorInfo": payload.get("authorInfo") if isinstance(payload.get("authorInfo"), dict) else {}, - "content": payload.get("content"), - "meta": payload.get("meta") if isinstance(payload.get("meta"), dict) else {}, - "groupId": group_id, - "converseId": panel_id, - }, - } - await self._process_inbound_event( - target_id=panel_id, - event=synthetic_event, - target_kind="panel", + evt = _make_synthetic_event( + message_id=str(payload.get("_id") or payload.get("messageId") or ""), + author=str(payload.get("author") or ""), + content=payload.get("content"), meta=payload.get("meta"), + group_id=group_id, converse_id=panel_id, + timestamp=payload.get("createdAt"), author_info=payload.get("authorInfo"), ) + await self._process_inbound_event(panel_id, evt, "panel") async def _handle_notify_inbox_append(self, payload: Any) -> None: - if not isinstance(payload, dict): + if not isinstance(payload, dict) or payload.get("type") != "message": return - - if payload.get("type") != "message": - return - detail = payload.get("payload") if not isinstance(detail, dict): return - - group_id = str(detail.get("groupId") or "").strip() - if group_id: + if _str_field(detail, "groupId"): return - - converse_id = str(detail.get("converseId") or "").strip() + converse_id = _str_field(detail, "converseId") if not converse_id: return session_id = self._session_by_converse.get(converse_id) if not session_id: - await self._refresh_sessions_directory(subscribe_new=self._ws_ready) + await self._refresh_sessions_directory(self._ws_ready) session_id = self._session_by_converse.get(converse_id) if not session_id: return - message_id = str(detail.get("messageId") or payload.get("_id") or "").strip() - author = str(detail.get("messageAuthor") or "").strip() - content = str(detail.get("messagePlainContent") or detail.get("messageSnippet") or "").strip() - - synthetic_event = { - "type": "message.add", - "timestamp": payload.get("createdAt") or datetime.utcnow().isoformat(), - "payload": { - "messageId": message_id, - "author": author, - "content": content, - "meta": { - "source": "notify:chat.inbox.append", - "converseId": converse_id, - }, - "converseId": converse_id, - }, - } - - await self._process_inbound_event( - target_id=session_id, - event=synthetic_event, - target_kind="session", + evt = _make_synthetic_event( + message_id=str(detail.get("messageId") or payload.get("_id") or ""), + author=str(detail.get("messageAuthor") or ""), + content=str(detail.get("messagePlainContent") or detail.get("messageSnippet") or ""), + meta={"source": "notify:chat.inbox.append", "converseId": converse_id}, + group_id="", converse_id=converse_id, timestamp=payload.get("createdAt"), ) + await self._process_inbound_event(session_id, evt, "session") + + # ---- cursor persistence ------------------------------------------------ def _mark_session_cursor(self, session_id: str, cursor: int) -> None: - if cursor < 0: + if cursor < 0 or cursor < self._session_cursor.get(session_id, 0): return - - previous = self._session_cursor.get(session_id, 0) - if cursor < previous: - return - self._session_cursor[session_id] = cursor - self._schedule_cursor_save() - - def _schedule_cursor_save(self) -> None: - if self._cursor_save_task and not self._cursor_save_task.done(): - return - - self._cursor_save_task = asyncio.create_task(self._save_cursor_debounced()) + if not self._cursor_save_task or self._cursor_save_task.done(): + self._cursor_save_task = asyncio.create_task(self._save_cursor_debounced()) async def _save_cursor_debounced(self) -> None: await asyncio.sleep(CURSOR_SAVE_DEBOUNCE_S) @@ -1088,140 +833,63 @@ class MochatChannel(BaseChannel): async def _load_session_cursors(self) -> None: if not self._cursor_path.exists(): return - try: data = json.loads(self._cursor_path.read_text("utf-8")) except Exception as e: logger.warning(f"Failed to read Mochat cursor file: {e}") return - cursors = data.get("cursors") if isinstance(data, dict) else None - if not isinstance(cursors, dict): - return - - for session_id, cursor in cursors.items(): - if isinstance(session_id, str) and isinstance(cursor, int) and cursor >= 0: - self._session_cursor[session_id] = cursor + if isinstance(cursors, dict): + for sid, cur in cursors.items(): + if isinstance(sid, str) and isinstance(cur, int) and cur >= 0: + self._session_cursor[sid] = cur async def _save_session_cursors(self) -> None: - payload = { - "schemaVersion": 1, - "updatedAt": datetime.utcnow().isoformat(), - "cursors": self._session_cursor, - } - try: self._state_dir.mkdir(parents=True, exist_ok=True) - self._cursor_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2) + "\n", "utf-8") + self._cursor_path.write_text(json.dumps({ + "schemaVersion": 1, "updatedAt": datetime.utcnow().isoformat(), + "cursors": self._session_cursor, + }, ensure_ascii=False, indent=2) + "\n", "utf-8") except Exception as e: logger.warning(f"Failed to save Mochat cursor file: {e}") - def _base_url(self) -> str: - return self.config.base_url.strip().rstrip("/") + # ---- HTTP helpers ------------------------------------------------------ async def _post_json(self, path: str, payload: dict[str, Any]) -> dict[str, Any]: if not self._http: raise RuntimeError("Mochat HTTP client not initialized") - - url = f"{self._base_url()}{path}" - response = await self._http.post( - url, - headers={ - "Content-Type": "application/json", - "X-Claw-Token": self.config.claw_token, - }, - json=payload, - ) - - text = response.text + url = f"{self.config.base_url.strip().rstrip('/')}{path}" + response = await self._http.post(url, headers={ + "Content-Type": "application/json", "X-Claw-Token": self.config.claw_token, + }, json=payload) if not response.is_success: - raise RuntimeError(f"Mochat HTTP {response.status_code}: {text[:200]}") - - parsed: Any + raise RuntimeError(f"Mochat HTTP {response.status_code}: {response.text[:200]}") try: parsed = response.json() except Exception: - parsed = text - + parsed = response.text if isinstance(parsed, dict) and isinstance(parsed.get("code"), int): if parsed["code"] != 200: - message = str(parsed.get("message") or parsed.get("name") or "request failed") - raise RuntimeError(f"Mochat API error: {message} (code={parsed['code']})") + msg = str(parsed.get("message") or parsed.get("name") or "request failed") + raise RuntimeError(f"Mochat API error: {msg} (code={parsed['code']})") data = parsed.get("data") return data if isinstance(data, dict) else {} + return parsed if isinstance(parsed, dict) else {} - if isinstance(parsed, dict): - return parsed - - return {} - - async def _watch_session( - self, - session_id: str, - cursor: int, - timeout_ms: int, - limit: int, - ) -> dict[str, Any]: - return await self._post_json( - "/api/claw/sessions/watch", - { - "sessionId": session_id, - "cursor": cursor, - "timeoutMs": timeout_ms, - "limit": limit, - }, - ) - - async def _send_session_message( - self, - session_id: str, - content: str, - reply_to: str | None, - ) -> dict[str, Any]: - payload = { - "sessionId": session_id, - "content": content, - } + async def _api_send(self, path: str, id_key: str, id_val: str, + content: str, reply_to: str | None, group_id: str | None = None) -> dict[str, Any]: + """Unified send helper for session and panel messages.""" + body: dict[str, Any] = {id_key: id_val, "content": content} if reply_to: - payload["replyTo"] = reply_to - return await self._post_json("/api/claw/sessions/send", payload) - - async def _send_panel_message( - self, - panel_id: str, - content: str, - reply_to: str | None, - group_id: str | None, - ) -> dict[str, Any]: - payload = { - "panelId": panel_id, - "content": content, - } - if reply_to: - payload["replyTo"] = reply_to + body["replyTo"] = reply_to if group_id: - payload["groupId"] = group_id - return await self._post_json("/api/claw/groups/panels/send", payload) + body["groupId"] = group_id + return await self._post_json(path, body) - async def _list_sessions(self) -> dict[str, Any]: - return await self._post_json("/api/claw/sessions/list", {}) - - async def _get_workspace_group(self) -> dict[str, Any]: - return await self._post_json("/api/claw/groups/get", {}) - - async def _list_panel_messages(self, panel_id: str, limit: int) -> dict[str, Any]: - return await self._post_json( - "/api/claw/groups/panels/messages", - { - "panelId": panel_id, - "limit": limit, - }, - ) - - def _read_group_id(self, metadata: dict[str, Any]) -> str | None: + @staticmethod + def _read_group_id(metadata: dict[str, Any]) -> str | None: if not isinstance(metadata, dict): return None value = metadata.get("group_id") or metadata.get("groupId") - if isinstance(value, str) and value.strip(): - return value.strip() - return None + return value.strip() if isinstance(value, str) and value.strip() else None