resolve conflicts with main and adapt MiniMax

This commit is contained in:
Re-bin 2026-02-10 16:27:10 +00:00
commit 39dd7feb28
21 changed files with 3548 additions and 180 deletions

2
.gitignore vendored
View File

@ -17,3 +17,5 @@ docs/
__pycache__/ __pycache__/
poetry.lock poetry.lock
.pytest_cache/ .pytest_cache/
tests/
botpy.log

323
README.md
View File

@ -12,17 +12,21 @@
</p> </p>
</div> </div>
🐈 **nanobot** is an **ultra-lightweight** personal AI assistant inspired by [Clawdbot](https://github.com/openclaw/openclaw) 🐈 **nanobot** is an **ultra-lightweight** personal AI assistant inspired by [OpenClaw](https://github.com/openclaw/openclaw)
⚡️ Delivers core agent functionality in just **~4,000** lines of code — **99% smaller** than Clawdbot's 430k+ lines. ⚡️ Delivers core agent functionality in just **~4,000** lines of code — **99% smaller** than Clawdbot's 430k+ lines.
📏 Real-time line count: **3,422 lines** (run `bash core_agent_lines.sh` to verify anytime) 📏 Real-time line count: **3,510 lines** (run `bash core_agent_lines.sh` to verify anytime)
## 📢 News ## 📢 News
- **2026-02-10** 🎉 Released v0.1.3.post6 with improvements! Check the updates [notes](https://github.com/HKUDS/nanobot/releases/tag/v0.1.3.post6) and our [roadmap](https://github.com/HKUDS/nanobot/discussions/431).
- **2026-02-09** 💬 Added Slack, Email, and QQ support — nanobot now supports multiple chat platforms!
- **2026-02-08** 🔧 Refactored Providers—adding a new LLM provider now takes just 2 simple steps! Check [here](#providers).
- **2026-02-07** 🚀 Released v0.1.3.post5 with Qwen support & several key improvements! Check [here](https://github.com/HKUDS/nanobot/releases/tag/v0.1.3.post5) for details.
- **2026-02-06** ✨ Added Moonshot/Kimi provider, Discord integration, and enhanced security hardening! - **2026-02-06** ✨ Added Moonshot/Kimi provider, Discord integration, and enhanced security hardening!
- **2026-02-05** ✨ Added Feishu channel, DeepSeek provider, and enhanced scheduled tasks support! - **2026-02-05** ✨ Added Feishu channel, DeepSeek provider, and enhanced scheduled tasks support!
- **2026-02-04** 🚀 Released v0.1.3.post4 with multi-provider & Docker support! Check [release notes](https://github.com/HKUDS/nanobot/releases/tag/v0.1.3.post4) for details. - **2026-02-04** 🚀 Released v0.1.3.post4 with multi-provider & Docker support! Check [here](https://github.com/HKUDS/nanobot/releases/tag/v0.1.3.post4) for details.
- **2026-02-03** ⚡ Integrated vLLM for local LLM support and improved natural language task scheduling! - **2026-02-03** ⚡ Integrated vLLM for local LLM support and improved natural language task scheduling!
- **2026-02-02** 🎉 nanobot officially launched! Welcome to try 🐈 nanobot! - **2026-02-02** 🎉 nanobot officially launched! Welcome to try 🐈 nanobot!
@ -91,7 +95,7 @@ pip install nanobot-ai
> [!TIP] > [!TIP]
> Set your API key in `~/.nanobot/config.json`. > Set your API key in `~/.nanobot/config.json`.
> Get API keys: [OpenRouter](https://openrouter.ai/keys) (Global) · [DashScope](https://dashscope.console.aliyun.com) (Qwen) · [Brave Search](https://brave.com/search/api/) (optional, for web search) > Get API keys: [OpenRouter](https://openrouter.ai/keys) (Global) · [Brave Search](https://brave.com/search/api/) (optional, for web search)
**1. Initialize** **1. Initialize**
@ -164,7 +168,7 @@ nanobot agent -m "Hello from my local LLM!"
## 💬 Chat Apps ## 💬 Chat Apps
Talk to your nanobot through Telegram, Discord, WhatsApp, or Feishu — anytime, anywhere. Talk to your nanobot through Telegram, Discord, WhatsApp, Feishu, Mochat, DingTalk, Slack, Email, or QQ — anytime, anywhere.
| Channel | Setup | | Channel | Setup |
|---------|-------| |---------|-------|
@ -172,6 +176,11 @@ Talk to your nanobot through Telegram, Discord, WhatsApp, or Feishu — anytime,
| **Discord** | Easy (bot token + intents) | | **Discord** | Easy (bot token + intents) |
| **WhatsApp** | Medium (scan QR) | | **WhatsApp** | Medium (scan QR) |
| **Feishu** | Medium (app credentials) | | **Feishu** | Medium (app credentials) |
| **Mochat** | Medium (claw token + websocket) |
| **DingTalk** | Medium (app credentials) |
| **Slack** | Medium (bot + app tokens) |
| **Email** | Medium (IMAP/SMTP credentials) |
| **QQ** | Easy (app credentials) |
<details> <details>
<summary><b>Telegram</b> (Recommended)</summary> <summary><b>Telegram</b> (Recommended)</summary>
@ -195,7 +204,9 @@ Talk to your nanobot through Telegram, Discord, WhatsApp, or Feishu — anytime,
} }
``` ```
> Get your user ID from `@userinfobot` on Telegram. > You can find your **User ID** in Telegram settings. It is shown as `@yourUserId`.
> Copy this value **without the `@` symbol** and paste it into the config file.
**3. Run** **3. Run**
@ -205,6 +216,63 @@ nanobot gateway
</details> </details>
<details>
<summary><b>Mochat (Claw IM)</b></summary>
Uses **Socket.IO WebSocket** by default, with HTTP polling fallback.
**1. Ask nanobot to set up Mochat for you**
Simply send this message to nanobot (replace `xxx@xxx` with your real email):
```
Read https://raw.githubusercontent.com/HKUDS/MoChat/refs/heads/main/skills/nanobot/skill.md and register on MoChat. My Email account is xxx@xxx Bind me as your owner and DM me on MoChat.
```
nanobot will automatically register, configure `~/.nanobot/config.json`, and connect to Mochat.
**2. Restart gateway**
```bash
nanobot gateway
```
That's it — nanobot handles the rest!
<br>
<details>
<summary>Manual configuration (advanced)</summary>
If you prefer to configure manually, add the following to `~/.nanobot/config.json`:
> Keep `claw_token` private. It should only be sent in `X-Claw-Token` header to your Mochat API endpoint.
```json
{
"channels": {
"mochat": {
"enabled": true,
"base_url": "https://mochat.io",
"socket_url": "https://mochat.io",
"socket_path": "/socket.io",
"claw_token": "claw_xxx",
"agent_user_id": "6982abcdef",
"sessions": ["*"],
"panels": ["*"],
"reply_delay_mode": "non-mention",
"reply_delay_ms": 120000
}
}
}
```
</details>
</details>
<details> <details>
<summary><b>Discord</b></summary> <summary><b>Discord</b></summary>
@ -291,10 +359,6 @@ nanobot gateway
Uses **WebSocket** long connection — no public IP required. Uses **WebSocket** long connection — no public IP required.
```bash
pip install nanobot-ai[feishu]
```
**1. Create a Feishu bot** **1. Create a Feishu bot**
- Visit [Feishu Open Platform](https://open.feishu.cn/app) - Visit [Feishu Open Platform](https://open.feishu.cn/app)
- Create a new app → Enable **Bot** capability - Create a new app → Enable **Bot** capability
@ -335,14 +399,189 @@ nanobot gateway
</details> </details>
<details>
<summary><b>QQ (QQ单聊)</b></summary>
Uses **botpy SDK** with WebSocket — no public IP required. Currently supports **private messages only**.
**1. Register & create bot**
- Visit [QQ Open Platform](https://q.qq.com) → Register as a developer (personal or enterprise)
- Create a new bot application
- Go to **开发设置 (Developer Settings)** → copy **AppID** and **AppSecret**
**2. Set up sandbox for testing**
- In the bot management console, find **沙箱配置 (Sandbox Config)**
- Under **在消息列表配置**, click **添加成员** and add your own QQ number
- Once added, scan the bot's QR code with mobile QQ → open the bot profile → tap "发消息" to start chatting
**3. Configure**
> - `allowFrom`: Leave empty for public access, or add user openids to restrict. You can find openids in the nanobot logs when a user messages the bot.
> - For production: submit a review in the bot console and publish. See [QQ Bot Docs](https://bot.q.qq.com/wiki/) for the full publishing flow.
```json
{
"channels": {
"qq": {
"enabled": true,
"appId": "YOUR_APP_ID",
"secret": "YOUR_APP_SECRET",
"allowFrom": []
}
}
}
```
**4. Run**
```bash
nanobot gateway
```
Now send a message to the bot from QQ — it should respond!
</details>
<details>
<summary><b>DingTalk (钉钉)</b></summary>
Uses **Stream Mode** — no public IP required.
**1. Create a DingTalk bot**
- Visit [DingTalk Open Platform](https://open-dev.dingtalk.com/)
- Create a new app -> Add **Robot** capability
- **Configuration**:
- Toggle **Stream Mode** ON
- **Permissions**: Add necessary permissions for sending messages
- Get **AppKey** (Client ID) and **AppSecret** (Client Secret) from "Credentials"
- Publish the app
**2. Configure**
```json
{
"channels": {
"dingtalk": {
"enabled": true,
"clientId": "YOUR_APP_KEY",
"clientSecret": "YOUR_APP_SECRET",
"allowFrom": []
}
}
}
```
> `allowFrom`: Leave empty to allow all users, or add `["staffId"]` to restrict access.
**3. Run**
```bash
nanobot gateway
```
</details>
<details>
<summary><b>Slack</b></summary>
Uses **Socket Mode** — no public URL required.
**1. Create a Slack app**
- Go to [Slack API](https://api.slack.com/apps) → **Create New App** → "From scratch"
- Pick a name and select your workspace
**2. Configure the app**
- **Socket Mode**: Toggle ON → Generate an **App-Level Token** with `connections:write` scope → copy it (`xapp-...`)
- **OAuth & Permissions**: Add bot scopes: `chat:write`, `reactions:write`, `app_mentions:read`
- **Event Subscriptions**: Toggle ON → Subscribe to bot events: `message.im`, `message.channels`, `app_mention` → Save Changes
- **App Home**: Scroll to **Show Tabs** → Enable **Messages Tab** → Check **"Allow users to send Slash commands and messages from the messages tab"**
- **Install App**: Click **Install to Workspace** → Authorize → copy the **Bot Token** (`xoxb-...`)
**3. Configure nanobot**
```json
{
"channels": {
"slack": {
"enabled": true,
"botToken": "xoxb-...",
"appToken": "xapp-...",
"groupPolicy": "mention"
}
}
}
```
**4. Run**
```bash
nanobot gateway
```
DM the bot directly or @mention it in a channel — it should respond!
> [!TIP]
> - `groupPolicy`: `"mention"` (default — respond only when @mentioned), `"open"` (respond to all channel messages), or `"allowlist"` (restrict to specific channels).
> - DM policy defaults to open. Set `"dm": {"enabled": false}` to disable DMs.
</details>
<details>
<summary><b>Email</b></summary>
Give nanobot its own email account. It polls **IMAP** for incoming mail and replies via **SMTP** — like a personal email assistant.
**1. Get credentials (Gmail example)**
- Create a dedicated Gmail account for your bot (e.g. `my-nanobot@gmail.com`)
- Enable 2-Step Verification → Create an [App Password](https://myaccount.google.com/apppasswords)
- Use this app password for both IMAP and SMTP
**2. Configure**
> - `consentGranted` must be `true` to allow mailbox access. This is a safety gate — set `false` to fully disable.
> - `allowFrom`: Leave empty to accept emails from anyone, or restrict to specific senders.
> - `smtpUseTls` and `smtpUseSsl` default to `true` / `false` respectively, which is correct for Gmail (port 587 + STARTTLS). No need to set them explicitly.
> - Set `"autoReplyEnabled": false` if you only want to read/analyze emails without sending automatic replies.
```json
{
"channels": {
"email": {
"enabled": true,
"consentGranted": true,
"imapHost": "imap.gmail.com",
"imapPort": 993,
"imapUsername": "my-nanobot@gmail.com",
"imapPassword": "your-app-password",
"smtpHost": "smtp.gmail.com",
"smtpPort": 587,
"smtpUsername": "my-nanobot@gmail.com",
"smtpPassword": "your-app-password",
"fromAddress": "my-nanobot@gmail.com",
"allowFrom": ["your-real-email@gmail.com"]
}
}
}
```
**3. Run**
```bash
nanobot gateway
```
</details>
## ⚙️ Configuration ## ⚙️ Configuration
Config file: `~/.nanobot/config.json` Config file: `~/.nanobot/config.json`
### Providers ### Providers
> [!NOTE] > [!TIP]
> Groq provides free voice transcription via Whisper. If configured, Telegram voice messages will be automatically transcribed. > - **Groq** provides free voice transcription via Whisper. If configured, Telegram voice messages will be automatically transcribed.
> - **Zhipu Coding Plan**: If you're on Zhipu's coding plan, set `"apiBase": "https://open.bigmodel.cn/api/coding/paas/v4"` in your zhipu provider config.
| Provider | Purpose | Get API Key | | Provider | Purpose | Get API Key |
|----------|---------|-------------| |----------|---------|-------------|
@ -355,11 +594,57 @@ Config file: `~/.nanobot/config.json`
| `minimax` | LLM (MiniMax direct) | [platform.minimax.io](https://platform.minimax.io) | | `minimax` | LLM (MiniMax direct) | [platform.minimax.io](https://platform.minimax.io) |
| `aihubmix` | LLM (API gateway, access to all models) | [aihubmix.com](https://aihubmix.com) | | `aihubmix` | LLM (API gateway, access to all models) | [aihubmix.com](https://aihubmix.com) |
| `dashscope` | LLM (Qwen) | [dashscope.console.aliyun.com](https://dashscope.console.aliyun.com) | | `dashscope` | LLM (Qwen) | [dashscope.console.aliyun.com](https://dashscope.console.aliyun.com) |
| `moonshot` | LLM (Moonshot/Kimi) | [platform.moonshot.cn](https://platform.moonshot.cn) |
| `zhipu` | LLM (Zhipu GLM) | [open.bigmodel.cn](https://open.bigmodel.cn) |
| `vllm` | LLM (local, any OpenAI-compatible server) | — |
<details>
<summary><b>Adding a New Provider (Developer Guide)</b></summary>
nanobot uses a **Provider Registry** (`nanobot/providers/registry.py`) as the single source of truth.
Adding a new provider only takes **2 steps** — no if-elif chains to touch.
**Step 1.** Add a `ProviderSpec` entry to `PROVIDERS` in `nanobot/providers/registry.py`:
```python
ProviderSpec(
name="myprovider", # config field name
keywords=("myprovider", "mymodel"), # model-name keywords for auto-matching
env_key="MYPROVIDER_API_KEY", # env var for LiteLLM
display_name="My Provider", # shown in `nanobot status`
litellm_prefix="myprovider", # auto-prefix: model → myprovider/model
skip_prefixes=("myprovider/",), # don't double-prefix
)
```
**Step 2.** Add a field to `ProvidersConfig` in `nanobot/config/schema.py`:
```python
class ProvidersConfig(BaseModel):
...
myprovider: ProviderConfig = ProviderConfig()
```
That's it! Environment variables, model prefixing, config matching, and `nanobot status` display will all work automatically.
**Common `ProviderSpec` options:**
| Field | Description | Example |
|-------|-------------|---------|
| `litellm_prefix` | Auto-prefix model names for LiteLLM | `"dashscope"``dashscope/qwen-max` |
| `skip_prefixes` | Don't prefix if model already starts with these | `("dashscope/", "openrouter/")` |
| `env_extras` | Additional env vars to set | `(("ZHIPUAI_API_KEY", "{api_key}"),)` |
| `model_overrides` | Per-model parameter overrides | `(("kimi-k2.5", {"temperature": 1.0}),)` |
| `is_gateway` | Can route any model (like OpenRouter) | `True` |
| `detect_by_key_prefix` | Detect gateway by API key prefix | `"sk-or-"` |
| `detect_by_base_keyword` | Detect gateway by API base URL | `"openrouter"` |
| `strip_model_prefix` | Strip existing prefix before re-prefixing | `True` (for AiHubMix) |
</details>
### Security ### Security
> [!TIP]
> For production deployments, set `"restrictToWorkspace": true` in your config to sandbox the agent. > For production deployments, set `"restrictToWorkspace": true` in your config to sandbox the agent.
| Option | Default | Description | | Option | Default | Description |
@ -375,11 +660,15 @@ Config file: `~/.nanobot/config.json`
| `nanobot onboard` | Initialize config & workspace | | `nanobot onboard` | Initialize config & workspace |
| `nanobot agent -m "..."` | Chat with the agent | | `nanobot agent -m "..."` | Chat with the agent |
| `nanobot agent` | Interactive chat mode | | `nanobot agent` | Interactive chat mode |
| `nanobot agent --no-markdown` | Show plain-text replies |
| `nanobot agent --logs` | Show runtime logs during chat |
| `nanobot gateway` | Start the gateway | | `nanobot gateway` | Start the gateway |
| `nanobot status` | Show status | | `nanobot status` | Show status |
| `nanobot channels login` | Link WhatsApp (scan QR) | | `nanobot channels login` | Link WhatsApp (scan QR) |
| `nanobot channels status` | Show channel status | | `nanobot channels status` | Show channel status |
Interactive mode exits: `exit`, `quit`, `/exit`, `/quit`, `:q`, or `Ctrl+D`.
<details> <details>
<summary><b>Scheduled Tasks (Cron)</b></summary> <summary><b>Scheduled Tasks (Cron)</b></summary>
@ -414,7 +703,7 @@ docker run -v ~/.nanobot:/root/.nanobot --rm nanobot onboard
# Edit config on host to add API keys # Edit config on host to add API keys
vim ~/.nanobot/config.json vim ~/.nanobot/config.json
# Run gateway (connects to Telegram/WhatsApp) # Run gateway (connects to enabled channels, e.g. Telegram/Discord/Mochat)
docker run -v ~/.nanobot:/root/.nanobot -p 18790:18790 nanobot gateway docker run -v ~/.nanobot:/root/.nanobot -p 18790:18790 nanobot gateway
# Or run a single command # Or run a single command
@ -434,7 +723,7 @@ nanobot/
│ ├── subagent.py # Background task execution │ ├── subagent.py # Background task execution
│ └── tools/ # Built-in tools (incl. spawn) │ └── tools/ # Built-in tools (incl. spawn)
├── skills/ # 🎯 Bundled skills (github, weather, tmux...) ├── skills/ # 🎯 Bundled skills (github, weather, tmux...)
├── channels/ # 📱 WhatsApp integration ├── channels/ # 📱 Chat channel integrations
├── bus/ # 🚌 Message routing ├── bus/ # 🚌 Message routing
├── cron/ # ⏰ Scheduled tasks ├── cron/ # ⏰ Scheduled tasks
├── heartbeat/ # 💓 Proactive wake-up ├── heartbeat/ # 💓 Proactive wake-up
@ -454,13 +743,13 @@ PRs welcome! The codebase is intentionally small and readable. 🤗
- [ ] **Multi-modal** — See and hear (images, voice, video) - [ ] **Multi-modal** — See and hear (images, voice, video)
- [ ] **Long-term memory** — Never forget important context - [ ] **Long-term memory** — Never forget important context
- [ ] **Better reasoning** — Multi-step planning and reflection - [ ] **Better reasoning** — Multi-step planning and reflection
- [ ] **More integrations**Discord, Slack, email, calendar - [ ] **More integrations**Calendar and more
- [ ] **Self-improvement** — Learn from feedback and mistakes - [ ] **Self-improvement** — Learn from feedback and mistakes
### Contributors ### Contributors
<a href="https://github.com/HKUDS/nanobot/graphs/contributors"> <a href="https://github.com/HKUDS/nanobot/graphs/contributors">
<img src="https://contrib.rocks/image?repo=HKUDS/nanobot&max=100&columns=12" /> <img src="https://contrib.rocks/image?repo=HKUDS/nanobot&max=100&columns=12&updated=20260210" alt="Contributors" />
</a> </a>

View File

@ -20,6 +20,7 @@ const VERSION = '0.1.0';
export interface InboundMessage { export interface InboundMessage {
id: string; id: string;
sender: string; sender: string;
pn: string;
content: string; content: string;
timestamp: number; timestamp: number;
isGroup: boolean; isGroup: boolean;
@ -123,6 +124,7 @@ export class WhatsAppClient {
this.options.onMessage({ this.options.onMessage({
id: msg.key.id || '', id: msg.key.id || '',
sender: msg.key.remoteJid || '', sender: msg.key.remoteJid || '',
pn: msg.key.remoteJidAlt || '',
content, content,
timestamp: msg.messageTimestamp as number, timestamp: msg.messageTimestamp as number,
isGroup, isGroup,

View File

@ -207,7 +207,8 @@ When remembering something, write to {workspace_path}/memory/MEMORY.md"""
self, self,
messages: list[dict[str, Any]], messages: list[dict[str, Any]],
content: str | None, content: str | None,
tool_calls: list[dict[str, Any]] | None = None tool_calls: list[dict[str, Any]] | None = None,
reasoning_content: str | None = None,
) -> list[dict[str, Any]]: ) -> list[dict[str, Any]]:
""" """
Add an assistant message to the message list. Add an assistant message to the message list.
@ -216,6 +217,7 @@ When remembering something, write to {workspace_path}/memory/MEMORY.md"""
messages: Current message list. messages: Current message list.
content: Message content. content: Message content.
tool_calls: Optional tool calls. tool_calls: Optional tool calls.
reasoning_content: Thinking output (Kimi, DeepSeek-R1, etc.).
Returns: Returns:
Updated message list. Updated message list.
@ -225,5 +227,9 @@ When remembering something, write to {workspace_path}/memory/MEMORY.md"""
if tool_calls: if tool_calls:
msg["tool_calls"] = tool_calls msg["tool_calls"] = tool_calls
# Thinking models reject history without this
if reasoning_content:
msg["reasoning_content"] = reasoning_content
messages.append(msg) messages.append(msg)
return messages return messages

View File

@ -45,6 +45,7 @@ class AgentLoop:
exec_config: "ExecToolConfig | None" = None, exec_config: "ExecToolConfig | None" = None,
cron_service: "CronService | None" = None, cron_service: "CronService | None" = None,
restrict_to_workspace: bool = False, restrict_to_workspace: bool = False,
session_manager: SessionManager | None = None,
): ):
from nanobot.config.schema import ExecToolConfig from nanobot.config.schema import ExecToolConfig
from nanobot.cron.service import CronService from nanobot.cron.service import CronService
@ -59,7 +60,7 @@ class AgentLoop:
self.restrict_to_workspace = restrict_to_workspace self.restrict_to_workspace = restrict_to_workspace
self.context = ContextBuilder(workspace) self.context = ContextBuilder(workspace)
self.sessions = SessionManager(workspace) self.sessions = session_manager or SessionManager(workspace)
self.tools = ToolRegistry() self.tools = ToolRegistry()
self.subagents = SubagentManager( self.subagents = SubagentManager(
provider=provider, provider=provider,
@ -212,7 +213,8 @@ class AgentLoop:
for tc in response.tool_calls for tc in response.tool_calls
] ]
messages = self.context.add_assistant_message( messages = self.context.add_assistant_message(
messages, response.content, tool_call_dicts messages, response.content, tool_call_dicts,
reasoning_content=response.reasoning_content,
) )
# Execute tools # Execute tools
@ -243,7 +245,8 @@ class AgentLoop:
return OutboundMessage( return OutboundMessage(
channel=msg.channel, channel=msg.channel,
chat_id=msg.chat_id, chat_id=msg.chat_id,
content=final_content content=final_content,
metadata=msg.metadata or {}, # Pass through for channel-specific needs (e.g. Slack thread_ts)
) )
async def _process_system_message(self, msg: InboundMessage) -> OutboundMessage | None: async def _process_system_message(self, msg: InboundMessage) -> OutboundMessage | None:
@ -316,7 +319,8 @@ class AgentLoop:
for tc in response.tool_calls for tc in response.tool_calls
] ]
messages = self.context.add_assistant_message( messages = self.context.add_assistant_message(
messages, response.content, tool_call_dicts messages, response.content, tool_call_dicts,
reasoning_content=response.reasoning_content,
) )
for tool_call in response.tool_calls: for tool_call in response.tool_calls:

View File

@ -128,14 +128,17 @@ class ExecTool(Tool):
cwd_path = Path(cwd).resolve() cwd_path = Path(cwd).resolve()
win_paths = re.findall(r"[A-Za-z]:\\[^\\\"']+", cmd) win_paths = re.findall(r"[A-Za-z]:\\[^\\\"']+", cmd)
posix_paths = re.findall(r"/[^\s\"']+", cmd) # Only match absolute paths — avoid false positives on relative
# paths like ".venv/bin/python" where "/bin/python" would be
# incorrectly extracted by the old pattern.
posix_paths = re.findall(r"(?:^|[\s|>])(/[^\s\"'>]+)", cmd)
for raw in win_paths + posix_paths: for raw in win_paths + posix_paths:
try: try:
p = Path(raw).resolve() p = Path(raw.strip()).resolve()
except Exception: except Exception:
continue continue
if cwd_path not in p.parents and p != cwd_path: if p.is_absolute() and cwd_path not in p.parents and p != cwd_path:
return "Error: Command blocked by safety guard (path outside working dir)" return "Error: Command blocked by safety guard (path outside working dir)"
return None return None

View File

@ -0,0 +1,238 @@
"""DingTalk/DingDing channel implementation using Stream Mode."""
import asyncio
import json
import time
from typing import Any
from loguru import logger
import httpx
from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.channels.base import BaseChannel
from nanobot.config.schema import DingTalkConfig
try:
from dingtalk_stream import (
DingTalkStreamClient,
Credential,
CallbackHandler,
CallbackMessage,
AckMessage,
)
from dingtalk_stream.chatbot import ChatbotMessage
DINGTALK_AVAILABLE = True
except ImportError:
DINGTALK_AVAILABLE = False
# Fallback so class definitions don't crash at module level
CallbackHandler = object # type: ignore[assignment,misc]
CallbackMessage = None # type: ignore[assignment,misc]
AckMessage = None # type: ignore[assignment,misc]
ChatbotMessage = None # type: ignore[assignment,misc]
class NanobotDingTalkHandler(CallbackHandler):
"""
Standard DingTalk Stream SDK Callback Handler.
Parses incoming messages and forwards them to the Nanobot channel.
"""
def __init__(self, channel: "DingTalkChannel"):
super().__init__()
self.channel = channel
async def process(self, message: CallbackMessage):
"""Process incoming stream message."""
try:
# Parse using SDK's ChatbotMessage for robust handling
chatbot_msg = ChatbotMessage.from_dict(message.data)
# Extract text content; fall back to raw dict if SDK object is empty
content = ""
if chatbot_msg.text:
content = chatbot_msg.text.content.strip()
if not content:
content = message.data.get("text", {}).get("content", "").strip()
if not content:
logger.warning(
f"Received empty or unsupported message type: {chatbot_msg.message_type}"
)
return AckMessage.STATUS_OK, "OK"
sender_id = chatbot_msg.sender_staff_id or chatbot_msg.sender_id
sender_name = chatbot_msg.sender_nick or "Unknown"
logger.info(f"Received DingTalk message from {sender_name} ({sender_id}): {content}")
# Forward to Nanobot via _on_message (non-blocking).
# Store reference to prevent GC before task completes.
task = asyncio.create_task(
self.channel._on_message(content, sender_id, sender_name)
)
self.channel._background_tasks.add(task)
task.add_done_callback(self.channel._background_tasks.discard)
return AckMessage.STATUS_OK, "OK"
except Exception as e:
logger.error(f"Error processing DingTalk message: {e}")
# Return OK to avoid retry loop from DingTalk server
return AckMessage.STATUS_OK, "Error"
class DingTalkChannel(BaseChannel):
"""
DingTalk channel using Stream Mode.
Uses WebSocket to receive events via `dingtalk-stream` SDK.
Uses direct HTTP API to send messages (SDK is mainly for receiving).
Note: Currently only supports private (1:1) chat. Group messages are
received but replies are sent back as private messages to the sender.
"""
name = "dingtalk"
def __init__(self, config: DingTalkConfig, bus: MessageBus):
super().__init__(config, bus)
self.config: DingTalkConfig = config
self._client: Any = None
self._http: httpx.AsyncClient | None = None
# Access Token management for sending messages
self._access_token: str | None = None
self._token_expiry: float = 0
# Hold references to background tasks to prevent GC
self._background_tasks: set[asyncio.Task] = set()
async def start(self) -> None:
"""Start the DingTalk bot with Stream Mode."""
try:
if not DINGTALK_AVAILABLE:
logger.error(
"DingTalk Stream SDK not installed. Run: pip install dingtalk-stream"
)
return
if not self.config.client_id or not self.config.client_secret:
logger.error("DingTalk client_id and client_secret not configured")
return
self._running = True
self._http = httpx.AsyncClient()
logger.info(
f"Initializing DingTalk Stream Client with Client ID: {self.config.client_id}..."
)
credential = Credential(self.config.client_id, self.config.client_secret)
self._client = DingTalkStreamClient(credential)
# Register standard handler
handler = NanobotDingTalkHandler(self)
self._client.register_callback_handler(ChatbotMessage.TOPIC, handler)
logger.info("DingTalk bot started with Stream Mode")
# client.start() is an async infinite loop handling the websocket connection
await self._client.start()
except Exception as e:
logger.exception(f"Failed to start DingTalk channel: {e}")
async def stop(self) -> None:
"""Stop the DingTalk bot."""
self._running = False
# Close the shared HTTP client
if self._http:
await self._http.aclose()
self._http = None
# Cancel outstanding background tasks
for task in self._background_tasks:
task.cancel()
self._background_tasks.clear()
async def _get_access_token(self) -> str | None:
"""Get or refresh Access Token."""
if self._access_token and time.time() < self._token_expiry:
return self._access_token
url = "https://api.dingtalk.com/v1.0/oauth2/accessToken"
data = {
"appKey": self.config.client_id,
"appSecret": self.config.client_secret,
}
if not self._http:
logger.warning("DingTalk HTTP client not initialized, cannot refresh token")
return None
try:
resp = await self._http.post(url, json=data)
resp.raise_for_status()
res_data = resp.json()
self._access_token = res_data.get("accessToken")
# Expire 60s early to be safe
self._token_expiry = time.time() + int(res_data.get("expireIn", 7200)) - 60
return self._access_token
except Exception as e:
logger.error(f"Failed to get DingTalk access token: {e}")
return None
async def send(self, msg: OutboundMessage) -> None:
"""Send a message through DingTalk."""
token = await self._get_access_token()
if not token:
return
# oToMessages/batchSend: sends to individual users (private chat)
# https://open.dingtalk.com/document/orgapp/robot-batch-send-messages
url = "https://api.dingtalk.com/v1.0/robot/oToMessages/batchSend"
headers = {"x-acs-dingtalk-access-token": token}
data = {
"robotCode": self.config.client_id,
"userIds": [msg.chat_id], # chat_id is the user's staffId
"msgKey": "sampleMarkdown",
"msgParam": json.dumps({
"text": msg.content,
"title": "Nanobot Reply",
}),
}
if not self._http:
logger.warning("DingTalk HTTP client not initialized, cannot send")
return
try:
resp = await self._http.post(url, json=data, headers=headers)
if resp.status_code != 200:
logger.error(f"DingTalk send failed: {resp.text}")
else:
logger.debug(f"DingTalk message sent to {msg.chat_id}")
except Exception as e:
logger.error(f"Error sending DingTalk message: {e}")
async def _on_message(self, content: str, sender_id: str, sender_name: str) -> None:
"""Handle incoming message (called by NanobotDingTalkHandler).
Delegates to BaseChannel._handle_message() which enforces allow_from
permission checks before publishing to the bus.
"""
try:
logger.info(f"DingTalk inbound: {content} from {sender_name}")
await self._handle_message(
sender_id=sender_id,
chat_id=sender_id, # For private chat, chat_id == sender_id
content=str(content),
metadata={
"sender_name": sender_name,
"platform": "dingtalk",
},
)
except Exception as e:
logger.error(f"Error publishing DingTalk message: {e}")

403
nanobot/channels/email.py Normal file
View File

@ -0,0 +1,403 @@
"""Email channel implementation using IMAP polling + SMTP replies."""
import asyncio
import html
import imaplib
import re
import smtplib
import ssl
from datetime import date
from email import policy
from email.header import decode_header, make_header
from email.message import EmailMessage
from email.parser import BytesParser
from email.utils import parseaddr
from typing import Any
from loguru import logger
from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.channels.base import BaseChannel
from nanobot.config.schema import EmailConfig
class EmailChannel(BaseChannel):
"""
Email channel.
Inbound:
- Poll IMAP mailbox for unread messages.
- Convert each message into an inbound event.
Outbound:
- Send responses via SMTP back to the sender address.
"""
name = "email"
_IMAP_MONTHS = (
"Jan",
"Feb",
"Mar",
"Apr",
"May",
"Jun",
"Jul",
"Aug",
"Sep",
"Oct",
"Nov",
"Dec",
)
def __init__(self, config: EmailConfig, bus: MessageBus):
super().__init__(config, bus)
self.config: EmailConfig = config
self._last_subject_by_chat: dict[str, str] = {}
self._last_message_id_by_chat: dict[str, str] = {}
self._processed_uids: set[str] = set() # Capped to prevent unbounded growth
self._MAX_PROCESSED_UIDS = 100000
async def start(self) -> None:
"""Start polling IMAP for inbound emails."""
if not self.config.consent_granted:
logger.warning(
"Email channel disabled: consent_granted is false. "
"Set channels.email.consentGranted=true after explicit user permission."
)
return
if not self._validate_config():
return
self._running = True
logger.info("Starting Email channel (IMAP polling mode)...")
poll_seconds = max(5, int(self.config.poll_interval_seconds))
while self._running:
try:
inbound_items = await asyncio.to_thread(self._fetch_new_messages)
for item in inbound_items:
sender = item["sender"]
subject = item.get("subject", "")
message_id = item.get("message_id", "")
if subject:
self._last_subject_by_chat[sender] = subject
if message_id:
self._last_message_id_by_chat[sender] = message_id
await self._handle_message(
sender_id=sender,
chat_id=sender,
content=item["content"],
metadata=item.get("metadata", {}),
)
except Exception as e:
logger.error(f"Email polling error: {e}")
await asyncio.sleep(poll_seconds)
async def stop(self) -> None:
"""Stop polling loop."""
self._running = False
async def send(self, msg: OutboundMessage) -> None:
"""Send email via SMTP."""
if not self.config.consent_granted:
logger.warning("Skip email send: consent_granted is false")
return
force_send = bool((msg.metadata or {}).get("force_send"))
if not self.config.auto_reply_enabled and not force_send:
logger.info("Skip automatic email reply: auto_reply_enabled is false")
return
if not self.config.smtp_host:
logger.warning("Email channel SMTP host not configured")
return
to_addr = msg.chat_id.strip()
if not to_addr:
logger.warning("Email channel missing recipient address")
return
base_subject = self._last_subject_by_chat.get(to_addr, "nanobot reply")
subject = self._reply_subject(base_subject)
if msg.metadata and isinstance(msg.metadata.get("subject"), str):
override = msg.metadata["subject"].strip()
if override:
subject = override
email_msg = EmailMessage()
email_msg["From"] = self.config.from_address or self.config.smtp_username or self.config.imap_username
email_msg["To"] = to_addr
email_msg["Subject"] = subject
email_msg.set_content(msg.content or "")
in_reply_to = self._last_message_id_by_chat.get(to_addr)
if in_reply_to:
email_msg["In-Reply-To"] = in_reply_to
email_msg["References"] = in_reply_to
try:
await asyncio.to_thread(self._smtp_send, email_msg)
except Exception as e:
logger.error(f"Error sending email to {to_addr}: {e}")
raise
def _validate_config(self) -> bool:
missing = []
if not self.config.imap_host:
missing.append("imap_host")
if not self.config.imap_username:
missing.append("imap_username")
if not self.config.imap_password:
missing.append("imap_password")
if not self.config.smtp_host:
missing.append("smtp_host")
if not self.config.smtp_username:
missing.append("smtp_username")
if not self.config.smtp_password:
missing.append("smtp_password")
if missing:
logger.error(f"Email channel not configured, missing: {', '.join(missing)}")
return False
return True
def _smtp_send(self, msg: EmailMessage) -> None:
timeout = 30
if self.config.smtp_use_ssl:
with smtplib.SMTP_SSL(
self.config.smtp_host,
self.config.smtp_port,
timeout=timeout,
) as smtp:
smtp.login(self.config.smtp_username, self.config.smtp_password)
smtp.send_message(msg)
return
with smtplib.SMTP(self.config.smtp_host, self.config.smtp_port, timeout=timeout) as smtp:
if self.config.smtp_use_tls:
smtp.starttls(context=ssl.create_default_context())
smtp.login(self.config.smtp_username, self.config.smtp_password)
smtp.send_message(msg)
def _fetch_new_messages(self) -> list[dict[str, Any]]:
"""Poll IMAP and return parsed unread messages."""
return self._fetch_messages(
search_criteria=("UNSEEN",),
mark_seen=self.config.mark_seen,
dedupe=True,
limit=0,
)
def fetch_messages_between_dates(
self,
start_date: date,
end_date: date,
limit: int = 20,
) -> list[dict[str, Any]]:
"""
Fetch messages in [start_date, end_date) by IMAP date search.
This is used for historical summarization tasks (e.g. "yesterday").
"""
if end_date <= start_date:
return []
return self._fetch_messages(
search_criteria=(
"SINCE",
self._format_imap_date(start_date),
"BEFORE",
self._format_imap_date(end_date),
),
mark_seen=False,
dedupe=False,
limit=max(1, int(limit)),
)
def _fetch_messages(
self,
search_criteria: tuple[str, ...],
mark_seen: bool,
dedupe: bool,
limit: int,
) -> list[dict[str, Any]]:
"""Fetch messages by arbitrary IMAP search criteria."""
messages: list[dict[str, Any]] = []
mailbox = self.config.imap_mailbox or "INBOX"
if self.config.imap_use_ssl:
client = imaplib.IMAP4_SSL(self.config.imap_host, self.config.imap_port)
else:
client = imaplib.IMAP4(self.config.imap_host, self.config.imap_port)
try:
client.login(self.config.imap_username, self.config.imap_password)
status, _ = client.select(mailbox)
if status != "OK":
return messages
status, data = client.search(None, *search_criteria)
if status != "OK" or not data:
return messages
ids = data[0].split()
if limit > 0 and len(ids) > limit:
ids = ids[-limit:]
for imap_id in ids:
status, fetched = client.fetch(imap_id, "(BODY.PEEK[] UID)")
if status != "OK" or not fetched:
continue
raw_bytes = self._extract_message_bytes(fetched)
if raw_bytes is None:
continue
uid = self._extract_uid(fetched)
if dedupe and uid and uid in self._processed_uids:
continue
parsed = BytesParser(policy=policy.default).parsebytes(raw_bytes)
sender = parseaddr(parsed.get("From", ""))[1].strip().lower()
if not sender:
continue
subject = self._decode_header_value(parsed.get("Subject", ""))
date_value = parsed.get("Date", "")
message_id = parsed.get("Message-ID", "").strip()
body = self._extract_text_body(parsed)
if not body:
body = "(empty email body)"
body = body[: self.config.max_body_chars]
content = (
f"Email received.\n"
f"From: {sender}\n"
f"Subject: {subject}\n"
f"Date: {date_value}\n\n"
f"{body}"
)
metadata = {
"message_id": message_id,
"subject": subject,
"date": date_value,
"sender_email": sender,
"uid": uid,
}
messages.append(
{
"sender": sender,
"subject": subject,
"message_id": message_id,
"content": content,
"metadata": metadata,
}
)
if dedupe and uid:
self._processed_uids.add(uid)
# mark_seen is the primary dedup; this set is a safety net
if len(self._processed_uids) > self._MAX_PROCESSED_UIDS:
self._processed_uids.clear()
if mark_seen:
client.store(imap_id, "+FLAGS", "\\Seen")
finally:
try:
client.logout()
except Exception:
pass
return messages
@classmethod
def _format_imap_date(cls, value: date) -> str:
"""Format date for IMAP search (always English month abbreviations)."""
month = cls._IMAP_MONTHS[value.month - 1]
return f"{value.day:02d}-{month}-{value.year}"
@staticmethod
def _extract_message_bytes(fetched: list[Any]) -> bytes | None:
for item in fetched:
if isinstance(item, tuple) and len(item) >= 2 and isinstance(item[1], (bytes, bytearray)):
return bytes(item[1])
return None
@staticmethod
def _extract_uid(fetched: list[Any]) -> str:
for item in fetched:
if isinstance(item, tuple) and item and isinstance(item[0], (bytes, bytearray)):
head = bytes(item[0]).decode("utf-8", errors="ignore")
m = re.search(r"UID\s+(\d+)", head)
if m:
return m.group(1)
return ""
@staticmethod
def _decode_header_value(value: str) -> str:
if not value:
return ""
try:
return str(make_header(decode_header(value)))
except Exception:
return value
@classmethod
def _extract_text_body(cls, msg: Any) -> str:
"""Best-effort extraction of readable body text."""
if msg.is_multipart():
plain_parts: list[str] = []
html_parts: list[str] = []
for part in msg.walk():
if part.get_content_disposition() == "attachment":
continue
content_type = part.get_content_type()
try:
payload = part.get_content()
except Exception:
payload_bytes = part.get_payload(decode=True) or b""
charset = part.get_content_charset() or "utf-8"
payload = payload_bytes.decode(charset, errors="replace")
if not isinstance(payload, str):
continue
if content_type == "text/plain":
plain_parts.append(payload)
elif content_type == "text/html":
html_parts.append(payload)
if plain_parts:
return "\n\n".join(plain_parts).strip()
if html_parts:
return cls._html_to_text("\n\n".join(html_parts)).strip()
return ""
try:
payload = msg.get_content()
except Exception:
payload_bytes = msg.get_payload(decode=True) or b""
charset = msg.get_content_charset() or "utf-8"
payload = payload_bytes.decode(charset, errors="replace")
if not isinstance(payload, str):
return ""
if msg.get_content_type() == "text/html":
return cls._html_to_text(payload).strip()
return payload.strip()
@staticmethod
def _html_to_text(raw_html: str) -> str:
text = re.sub(r"<\s*br\s*/?>", "\n", raw_html, flags=re.IGNORECASE)
text = re.sub(r"<\s*/\s*p\s*>", "\n", text, flags=re.IGNORECASE)
text = re.sub(r"<[^>]+>", "", text)
return html.unescape(text)
def _reply_subject(self, base_subject: str) -> str:
subject = (base_subject or "").strip() or "nanobot reply"
prefix = self.config.subject_prefix or "Re: "
if subject.lower().startswith("re:"):
return subject
return f"{prefix}{subject}"

View File

@ -1,7 +1,9 @@
"""Channel manager for coordinating chat channels.""" """Channel manager for coordinating chat channels."""
from __future__ import annotations
import asyncio import asyncio
from typing import Any from typing import Any, TYPE_CHECKING
from loguru import logger from loguru import logger
@ -10,6 +12,9 @@ from nanobot.bus.queue import MessageBus
from nanobot.channels.base import BaseChannel from nanobot.channels.base import BaseChannel
from nanobot.config.schema import Config from nanobot.config.schema import Config
if TYPE_CHECKING:
from nanobot.session.manager import SessionManager
class ChannelManager: class ChannelManager:
""" """
@ -21,9 +26,10 @@ class ChannelManager:
- Route outbound messages - Route outbound messages
""" """
def __init__(self, config: Config, bus: MessageBus): def __init__(self, config: Config, bus: MessageBus, session_manager: "SessionManager | None" = None):
self.config = config self.config = config
self.bus = bus self.bus = bus
self.session_manager = session_manager
self.channels: dict[str, BaseChannel] = {} self.channels: dict[str, BaseChannel] = {}
self._dispatch_task: asyncio.Task | None = None self._dispatch_task: asyncio.Task | None = None
@ -40,6 +46,7 @@ class ChannelManager:
self.config.channels.telegram, self.config.channels.telegram,
self.bus, self.bus,
groq_api_key=self.config.providers.groq.api_key, groq_api_key=self.config.providers.groq.api_key,
session_manager=self.session_manager,
) )
logger.info("Telegram channel enabled") logger.info("Telegram channel enabled")
except ImportError as e: except ImportError as e:
@ -78,8 +85,72 @@ class ChannelManager:
except ImportError as e: except ImportError as e:
logger.warning(f"Feishu channel not available: {e}") logger.warning(f"Feishu channel not available: {e}")
# Mochat channel
if self.config.channels.mochat.enabled:
try:
from nanobot.channels.mochat import MochatChannel
self.channels["mochat"] = MochatChannel(
self.config.channels.mochat, self.bus
)
logger.info("Mochat channel enabled")
except ImportError as e:
logger.warning(f"Mochat channel not available: {e}")
# DingTalk channel
if self.config.channels.dingtalk.enabled:
try:
from nanobot.channels.dingtalk import DingTalkChannel
self.channels["dingtalk"] = DingTalkChannel(
self.config.channels.dingtalk, self.bus
)
logger.info("DingTalk channel enabled")
except ImportError as e:
logger.warning(f"DingTalk channel not available: {e}")
# Email channel
if self.config.channels.email.enabled:
try:
from nanobot.channels.email import EmailChannel
self.channels["email"] = EmailChannel(
self.config.channels.email, self.bus
)
logger.info("Email channel enabled")
except ImportError as e:
logger.warning(f"Email channel not available: {e}")
# Slack channel
if self.config.channels.slack.enabled:
try:
from nanobot.channels.slack import SlackChannel
self.channels["slack"] = SlackChannel(
self.config.channels.slack, self.bus
)
logger.info("Slack channel enabled")
except ImportError as e:
logger.warning(f"Slack channel not available: {e}")
# QQ channel
if self.config.channels.qq.enabled:
try:
from nanobot.channels.qq import QQChannel
self.channels["qq"] = QQChannel(
self.config.channels.qq,
self.bus,
)
logger.info("QQ channel enabled")
except ImportError as e:
logger.warning(f"QQ channel not available: {e}")
async def _start_channel(self, name: str, channel: BaseChannel) -> None:
"""Start a channel and log any exceptions."""
try:
await channel.start()
except Exception as e:
logger.error(f"Failed to start channel {name}: {e}")
async def start_all(self) -> None: async def start_all(self) -> None:
"""Start WhatsApp channel and the outbound dispatcher.""" """Start all channels and the outbound dispatcher."""
if not self.channels: if not self.channels:
logger.warning("No channels enabled") logger.warning("No channels enabled")
return return
@ -87,11 +158,11 @@ class ChannelManager:
# Start outbound dispatcher # Start outbound dispatcher
self._dispatch_task = asyncio.create_task(self._dispatch_outbound()) self._dispatch_task = asyncio.create_task(self._dispatch_outbound())
# Start WhatsApp channel # Start channels
tasks = [] tasks = []
for name, channel in self.channels.items(): for name, channel in self.channels.items():
logger.info(f"Starting {name} channel...") logger.info(f"Starting {name} channel...")
tasks.append(asyncio.create_task(channel.start())) tasks.append(asyncio.create_task(self._start_channel(name, channel)))
# Wait for all to complete (they should run forever) # Wait for all to complete (they should run forever)
await asyncio.gather(*tasks, return_exceptions=True) await asyncio.gather(*tasks, return_exceptions=True)

895
nanobot/channels/mochat.py Normal file
View File

@ -0,0 +1,895 @@
"""Mochat channel implementation using Socket.IO with HTTP polling fallback."""
from __future__ import annotations
import asyncio
import json
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
import httpx
from loguru import logger
from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.channels.base import BaseChannel
from nanobot.config.schema import MochatConfig
from nanobot.utils.helpers import get_data_path
try:
import socketio
SOCKETIO_AVAILABLE = True
except ImportError:
socketio = None
SOCKETIO_AVAILABLE = False
try:
import msgpack # noqa: F401
MSGPACK_AVAILABLE = True
except ImportError:
MSGPACK_AVAILABLE = False
MAX_SEEN_MESSAGE_IDS = 2000
CURSOR_SAVE_DEBOUNCE_S = 0.5
# ---------------------------------------------------------------------------
# Data classes
# ---------------------------------------------------------------------------
@dataclass
class MochatBufferedEntry:
"""Buffered inbound entry for delayed dispatch."""
raw_body: str
author: str
sender_name: str = ""
sender_username: str = ""
timestamp: int | None = None
message_id: str = ""
group_id: str = ""
@dataclass
class DelayState:
"""Per-target delayed message state."""
entries: list[MochatBufferedEntry] = field(default_factory=list)
lock: asyncio.Lock = field(default_factory=asyncio.Lock)
timer: asyncio.Task | None = None
@dataclass
class MochatTarget:
"""Outbound target resolution result."""
id: str
is_panel: bool
# ---------------------------------------------------------------------------
# Pure helpers
# ---------------------------------------------------------------------------
def _safe_dict(value: Any) -> dict:
"""Return *value* if it's a dict, else empty dict."""
return value if isinstance(value, dict) else {}
def _str_field(src: dict, *keys: str) -> str:
"""Return the first non-empty str value found for *keys*, stripped."""
for k in keys:
v = src.get(k)
if isinstance(v, str) and v.strip():
return v.strip()
return ""
def _make_synthetic_event(
message_id: str, author: str, content: Any,
meta: Any, group_id: str, converse_id: str,
timestamp: Any = None, *, author_info: Any = None,
) -> dict[str, Any]:
"""Build a synthetic ``message.add`` event dict."""
payload: dict[str, Any] = {
"messageId": message_id, "author": author,
"content": content, "meta": _safe_dict(meta),
"groupId": group_id, "converseId": converse_id,
}
if author_info is not None:
payload["authorInfo"] = _safe_dict(author_info)
return {
"type": "message.add",
"timestamp": timestamp or datetime.utcnow().isoformat(),
"payload": payload,
}
def normalize_mochat_content(content: Any) -> str:
"""Normalize content payload to text."""
if isinstance(content, str):
return content.strip()
if content is None:
return ""
try:
return json.dumps(content, ensure_ascii=False)
except TypeError:
return str(content)
def resolve_mochat_target(raw: str) -> MochatTarget:
"""Resolve id and target kind from user-provided target string."""
trimmed = (raw or "").strip()
if not trimmed:
return MochatTarget(id="", is_panel=False)
lowered = trimmed.lower()
cleaned, forced_panel = trimmed, False
for prefix in ("mochat:", "group:", "channel:", "panel:"):
if lowered.startswith(prefix):
cleaned = trimmed[len(prefix):].strip()
forced_panel = prefix in {"group:", "channel:", "panel:"}
break
if not cleaned:
return MochatTarget(id="", is_panel=False)
return MochatTarget(id=cleaned, is_panel=forced_panel or not cleaned.startswith("session_"))
def extract_mention_ids(value: Any) -> list[str]:
"""Extract mention ids from heterogeneous mention payload."""
if not isinstance(value, list):
return []
ids: list[str] = []
for item in value:
if isinstance(item, str):
if item.strip():
ids.append(item.strip())
elif isinstance(item, dict):
for key in ("id", "userId", "_id"):
candidate = item.get(key)
if isinstance(candidate, str) and candidate.strip():
ids.append(candidate.strip())
break
return ids
def resolve_was_mentioned(payload: dict[str, Any], agent_user_id: str) -> bool:
"""Resolve mention state from payload metadata and text fallback."""
meta = payload.get("meta")
if isinstance(meta, dict):
if meta.get("mentioned") is True or meta.get("wasMentioned") is True:
return True
for f in ("mentions", "mentionIds", "mentionedUserIds", "mentionedUsers"):
if agent_user_id and agent_user_id in extract_mention_ids(meta.get(f)):
return True
if not agent_user_id:
return False
content = payload.get("content")
if not isinstance(content, str) or not content:
return False
return f"<@{agent_user_id}>" in content or f"@{agent_user_id}" in content
def resolve_require_mention(config: MochatConfig, session_id: str, group_id: str) -> bool:
"""Resolve mention requirement for group/panel conversations."""
groups = config.groups or {}
for key in (group_id, session_id, "*"):
if key and key in groups:
return bool(groups[key].require_mention)
return bool(config.mention.require_in_groups)
def build_buffered_body(entries: list[MochatBufferedEntry], is_group: bool) -> str:
"""Build text body from one or more buffered entries."""
if not entries:
return ""
if len(entries) == 1:
return entries[0].raw_body
lines: list[str] = []
for entry in entries:
if not entry.raw_body:
continue
if is_group:
label = entry.sender_name.strip() or entry.sender_username.strip() or entry.author
if label:
lines.append(f"{label}: {entry.raw_body}")
continue
lines.append(entry.raw_body)
return "\n".join(lines).strip()
def parse_timestamp(value: Any) -> int | None:
"""Parse event timestamp to epoch milliseconds."""
if not isinstance(value, str) or not value.strip():
return None
try:
return int(datetime.fromisoformat(value.replace("Z", "+00:00")).timestamp() * 1000)
except ValueError:
return None
# ---------------------------------------------------------------------------
# Channel
# ---------------------------------------------------------------------------
class MochatChannel(BaseChannel):
"""Mochat channel using socket.io with fallback polling workers."""
name = "mochat"
def __init__(self, config: MochatConfig, bus: MessageBus):
super().__init__(config, bus)
self.config: MochatConfig = config
self._http: httpx.AsyncClient | None = None
self._socket: Any = None
self._ws_connected = self._ws_ready = False
self._state_dir = get_data_path() / "mochat"
self._cursor_path = self._state_dir / "session_cursors.json"
self._session_cursor: dict[str, int] = {}
self._cursor_save_task: asyncio.Task | None = None
self._session_set: set[str] = set()
self._panel_set: set[str] = set()
self._auto_discover_sessions = self._auto_discover_panels = False
self._cold_sessions: set[str] = set()
self._session_by_converse: dict[str, str] = {}
self._seen_set: dict[str, set[str]] = {}
self._seen_queue: dict[str, deque[str]] = {}
self._delay_states: dict[str, DelayState] = {}
self._fallback_mode = False
self._session_fallback_tasks: dict[str, asyncio.Task] = {}
self._panel_fallback_tasks: dict[str, asyncio.Task] = {}
self._refresh_task: asyncio.Task | None = None
self._target_locks: dict[str, asyncio.Lock] = {}
# ---- lifecycle ---------------------------------------------------------
async def start(self) -> None:
"""Start Mochat channel workers and websocket connection."""
if not self.config.claw_token:
logger.error("Mochat claw_token not configured")
return
self._running = True
self._http = httpx.AsyncClient(timeout=30.0)
self._state_dir.mkdir(parents=True, exist_ok=True)
await self._load_session_cursors()
self._seed_targets_from_config()
await self._refresh_targets(subscribe_new=False)
if not await self._start_socket_client():
await self._ensure_fallback_workers()
self._refresh_task = asyncio.create_task(self._refresh_loop())
while self._running:
await asyncio.sleep(1)
async def stop(self) -> None:
"""Stop all workers and clean up resources."""
self._running = False
if self._refresh_task:
self._refresh_task.cancel()
self._refresh_task = None
await self._stop_fallback_workers()
await self._cancel_delay_timers()
if self._socket:
try:
await self._socket.disconnect()
except Exception:
pass
self._socket = None
if self._cursor_save_task:
self._cursor_save_task.cancel()
self._cursor_save_task = None
await self._save_session_cursors()
if self._http:
await self._http.aclose()
self._http = None
self._ws_connected = self._ws_ready = False
async def send(self, msg: OutboundMessage) -> None:
"""Send outbound message to session or panel."""
if not self.config.claw_token:
logger.warning("Mochat claw_token missing, skip send")
return
parts = ([msg.content.strip()] if msg.content and msg.content.strip() else [])
if msg.media:
parts.extend(m for m in msg.media if isinstance(m, str) and m.strip())
content = "\n".join(parts).strip()
if not content:
return
target = resolve_mochat_target(msg.chat_id)
if not target.id:
logger.warning("Mochat outbound target is empty")
return
is_panel = (target.is_panel or target.id in self._panel_set) and not target.id.startswith("session_")
try:
if is_panel:
await self._api_send("/api/claw/groups/panels/send", "panelId", target.id,
content, msg.reply_to, self._read_group_id(msg.metadata))
else:
await self._api_send("/api/claw/sessions/send", "sessionId", target.id,
content, msg.reply_to)
except Exception as e:
logger.error(f"Failed to send Mochat message: {e}")
# ---- config / init helpers ---------------------------------------------
def _seed_targets_from_config(self) -> None:
sessions, self._auto_discover_sessions = self._normalize_id_list(self.config.sessions)
panels, self._auto_discover_panels = self._normalize_id_list(self.config.panels)
self._session_set.update(sessions)
self._panel_set.update(panels)
for sid in sessions:
if sid not in self._session_cursor:
self._cold_sessions.add(sid)
@staticmethod
def _normalize_id_list(values: list[str]) -> tuple[list[str], bool]:
cleaned = [str(v).strip() for v in values if str(v).strip()]
return sorted({v for v in cleaned if v != "*"}), "*" in cleaned
# ---- websocket ---------------------------------------------------------
async def _start_socket_client(self) -> bool:
if not SOCKETIO_AVAILABLE:
logger.warning("python-socketio not installed, Mochat using polling fallback")
return False
serializer = "default"
if not self.config.socket_disable_msgpack:
if MSGPACK_AVAILABLE:
serializer = "msgpack"
else:
logger.warning("msgpack not installed but socket_disable_msgpack=false; using JSON")
client = socketio.AsyncClient(
reconnection=True,
reconnection_attempts=self.config.max_retry_attempts or None,
reconnection_delay=max(0.1, self.config.socket_reconnect_delay_ms / 1000.0),
reconnection_delay_max=max(0.1, self.config.socket_max_reconnect_delay_ms / 1000.0),
logger=False, engineio_logger=False, serializer=serializer,
)
@client.event
async def connect() -> None:
self._ws_connected, self._ws_ready = True, False
logger.info("Mochat websocket connected")
subscribed = await self._subscribe_all()
self._ws_ready = subscribed
await (self._stop_fallback_workers() if subscribed else self._ensure_fallback_workers())
@client.event
async def disconnect() -> None:
if not self._running:
return
self._ws_connected = self._ws_ready = False
logger.warning("Mochat websocket disconnected")
await self._ensure_fallback_workers()
@client.event
async def connect_error(data: Any) -> None:
logger.error(f"Mochat websocket connect error: {data}")
@client.on("claw.session.events")
async def on_session_events(payload: dict[str, Any]) -> None:
await self._handle_watch_payload(payload, "session")
@client.on("claw.panel.events")
async def on_panel_events(payload: dict[str, Any]) -> None:
await self._handle_watch_payload(payload, "panel")
for ev in ("notify:chat.inbox.append", "notify:chat.message.add",
"notify:chat.message.update", "notify:chat.message.recall",
"notify:chat.message.delete"):
client.on(ev, self._build_notify_handler(ev))
socket_url = (self.config.socket_url or self.config.base_url).strip().rstrip("/")
socket_path = (self.config.socket_path or "/socket.io").strip().lstrip("/")
try:
self._socket = client
await client.connect(
socket_url, transports=["websocket"], socketio_path=socket_path,
auth={"token": self.config.claw_token},
wait_timeout=max(1.0, self.config.socket_connect_timeout_ms / 1000.0),
)
return True
except Exception as e:
logger.error(f"Failed to connect Mochat websocket: {e}")
try:
await client.disconnect()
except Exception:
pass
self._socket = None
return False
def _build_notify_handler(self, event_name: str):
async def handler(payload: Any) -> None:
if event_name == "notify:chat.inbox.append":
await self._handle_notify_inbox_append(payload)
elif event_name.startswith("notify:chat.message."):
await self._handle_notify_chat_message(payload)
return handler
# ---- subscribe ---------------------------------------------------------
async def _subscribe_all(self) -> bool:
ok = await self._subscribe_sessions(sorted(self._session_set))
ok = await self._subscribe_panels(sorted(self._panel_set)) and ok
if self._auto_discover_sessions or self._auto_discover_panels:
await self._refresh_targets(subscribe_new=True)
return ok
async def _subscribe_sessions(self, session_ids: list[str]) -> bool:
if not session_ids:
return True
for sid in session_ids:
if sid not in self._session_cursor:
self._cold_sessions.add(sid)
ack = await self._socket_call("com.claw.im.subscribeSessions", {
"sessionIds": session_ids, "cursors": self._session_cursor,
"limit": self.config.watch_limit,
})
if not ack.get("result"):
logger.error(f"Mochat subscribeSessions failed: {ack.get('message', 'unknown error')}")
return False
data = ack.get("data")
items: list[dict[str, Any]] = []
if isinstance(data, list):
items = [i for i in data if isinstance(i, dict)]
elif isinstance(data, dict):
sessions = data.get("sessions")
if isinstance(sessions, list):
items = [i for i in sessions if isinstance(i, dict)]
elif "sessionId" in data:
items = [data]
for p in items:
await self._handle_watch_payload(p, "session")
return True
async def _subscribe_panels(self, panel_ids: list[str]) -> bool:
if not self._auto_discover_panels and not panel_ids:
return True
ack = await self._socket_call("com.claw.im.subscribePanels", {"panelIds": panel_ids})
if not ack.get("result"):
logger.error(f"Mochat subscribePanels failed: {ack.get('message', 'unknown error')}")
return False
return True
async def _socket_call(self, event_name: str, payload: dict[str, Any]) -> dict[str, Any]:
if not self._socket:
return {"result": False, "message": "socket not connected"}
try:
raw = await self._socket.call(event_name, payload, timeout=10)
except Exception as e:
return {"result": False, "message": str(e)}
return raw if isinstance(raw, dict) else {"result": True, "data": raw}
# ---- refresh / discovery -----------------------------------------------
async def _refresh_loop(self) -> None:
interval_s = max(1.0, self.config.refresh_interval_ms / 1000.0)
while self._running:
await asyncio.sleep(interval_s)
try:
await self._refresh_targets(subscribe_new=self._ws_ready)
except Exception as e:
logger.warning(f"Mochat refresh failed: {e}")
if self._fallback_mode:
await self._ensure_fallback_workers()
async def _refresh_targets(self, subscribe_new: bool) -> None:
if self._auto_discover_sessions:
await self._refresh_sessions_directory(subscribe_new)
if self._auto_discover_panels:
await self._refresh_panels(subscribe_new)
async def _refresh_sessions_directory(self, subscribe_new: bool) -> None:
try:
response = await self._post_json("/api/claw/sessions/list", {})
except Exception as e:
logger.warning(f"Mochat listSessions failed: {e}")
return
sessions = response.get("sessions")
if not isinstance(sessions, list):
return
new_ids: list[str] = []
for s in sessions:
if not isinstance(s, dict):
continue
sid = _str_field(s, "sessionId")
if not sid:
continue
if sid not in self._session_set:
self._session_set.add(sid)
new_ids.append(sid)
if sid not in self._session_cursor:
self._cold_sessions.add(sid)
cid = _str_field(s, "converseId")
if cid:
self._session_by_converse[cid] = sid
if not new_ids:
return
if self._ws_ready and subscribe_new:
await self._subscribe_sessions(new_ids)
if self._fallback_mode:
await self._ensure_fallback_workers()
async def _refresh_panels(self, subscribe_new: bool) -> None:
try:
response = await self._post_json("/api/claw/groups/get", {})
except Exception as e:
logger.warning(f"Mochat getWorkspaceGroup failed: {e}")
return
raw_panels = response.get("panels")
if not isinstance(raw_panels, list):
return
new_ids: list[str] = []
for p in raw_panels:
if not isinstance(p, dict):
continue
pt = p.get("type")
if isinstance(pt, int) and pt != 0:
continue
pid = _str_field(p, "id", "_id")
if pid and pid not in self._panel_set:
self._panel_set.add(pid)
new_ids.append(pid)
if not new_ids:
return
if self._ws_ready and subscribe_new:
await self._subscribe_panels(new_ids)
if self._fallback_mode:
await self._ensure_fallback_workers()
# ---- fallback workers --------------------------------------------------
async def _ensure_fallback_workers(self) -> None:
if not self._running:
return
self._fallback_mode = True
for sid in sorted(self._session_set):
t = self._session_fallback_tasks.get(sid)
if not t or t.done():
self._session_fallback_tasks[sid] = asyncio.create_task(self._session_watch_worker(sid))
for pid in sorted(self._panel_set):
t = self._panel_fallback_tasks.get(pid)
if not t or t.done():
self._panel_fallback_tasks[pid] = asyncio.create_task(self._panel_poll_worker(pid))
async def _stop_fallback_workers(self) -> None:
self._fallback_mode = False
tasks = [*self._session_fallback_tasks.values(), *self._panel_fallback_tasks.values()]
for t in tasks:
t.cancel()
if tasks:
await asyncio.gather(*tasks, return_exceptions=True)
self._session_fallback_tasks.clear()
self._panel_fallback_tasks.clear()
async def _session_watch_worker(self, session_id: str) -> None:
while self._running and self._fallback_mode:
try:
payload = await self._post_json("/api/claw/sessions/watch", {
"sessionId": session_id, "cursor": self._session_cursor.get(session_id, 0),
"timeoutMs": self.config.watch_timeout_ms, "limit": self.config.watch_limit,
})
await self._handle_watch_payload(payload, "session")
except asyncio.CancelledError:
break
except Exception as e:
logger.warning(f"Mochat watch fallback error ({session_id}): {e}")
await asyncio.sleep(max(0.1, self.config.retry_delay_ms / 1000.0))
async def _panel_poll_worker(self, panel_id: str) -> None:
sleep_s = max(1.0, self.config.refresh_interval_ms / 1000.0)
while self._running and self._fallback_mode:
try:
resp = await self._post_json("/api/claw/groups/panels/messages", {
"panelId": panel_id, "limit": min(100, max(1, self.config.watch_limit)),
})
msgs = resp.get("messages")
if isinstance(msgs, list):
for m in reversed(msgs):
if not isinstance(m, dict):
continue
evt = _make_synthetic_event(
message_id=str(m.get("messageId") or ""),
author=str(m.get("author") or ""),
content=m.get("content"),
meta=m.get("meta"), group_id=str(resp.get("groupId") or ""),
converse_id=panel_id, timestamp=m.get("createdAt"),
author_info=m.get("authorInfo"),
)
await self._process_inbound_event(panel_id, evt, "panel")
except asyncio.CancelledError:
break
except Exception as e:
logger.warning(f"Mochat panel polling error ({panel_id}): {e}")
await asyncio.sleep(sleep_s)
# ---- inbound event processing ------------------------------------------
async def _handle_watch_payload(self, payload: dict[str, Any], target_kind: str) -> None:
if not isinstance(payload, dict):
return
target_id = _str_field(payload, "sessionId")
if not target_id:
return
lock = self._target_locks.setdefault(f"{target_kind}:{target_id}", asyncio.Lock())
async with lock:
prev = self._session_cursor.get(target_id, 0) if target_kind == "session" else 0
pc = payload.get("cursor")
if target_kind == "session" and isinstance(pc, int) and pc >= 0:
self._mark_session_cursor(target_id, pc)
raw_events = payload.get("events")
if not isinstance(raw_events, list):
return
if target_kind == "session" and target_id in self._cold_sessions:
self._cold_sessions.discard(target_id)
return
for event in raw_events:
if not isinstance(event, dict):
continue
seq = event.get("seq")
if target_kind == "session" and isinstance(seq, int) and seq > self._session_cursor.get(target_id, prev):
self._mark_session_cursor(target_id, seq)
if event.get("type") == "message.add":
await self._process_inbound_event(target_id, event, target_kind)
async def _process_inbound_event(self, target_id: str, event: dict[str, Any], target_kind: str) -> None:
payload = event.get("payload")
if not isinstance(payload, dict):
return
author = _str_field(payload, "author")
if not author or (self.config.agent_user_id and author == self.config.agent_user_id):
return
if not self.is_allowed(author):
return
message_id = _str_field(payload, "messageId")
seen_key = f"{target_kind}:{target_id}"
if message_id and self._remember_message_id(seen_key, message_id):
return
raw_body = normalize_mochat_content(payload.get("content")) or "[empty message]"
ai = _safe_dict(payload.get("authorInfo"))
sender_name = _str_field(ai, "nickname", "email")
sender_username = _str_field(ai, "agentId")
group_id = _str_field(payload, "groupId")
is_group = bool(group_id)
was_mentioned = resolve_was_mentioned(payload, self.config.agent_user_id)
require_mention = target_kind == "panel" and is_group and resolve_require_mention(self.config, target_id, group_id)
use_delay = target_kind == "panel" and self.config.reply_delay_mode == "non-mention"
if require_mention and not was_mentioned and not use_delay:
return
entry = MochatBufferedEntry(
raw_body=raw_body, author=author, sender_name=sender_name,
sender_username=sender_username, timestamp=parse_timestamp(event.get("timestamp")),
message_id=message_id, group_id=group_id,
)
if use_delay:
delay_key = seen_key
if was_mentioned:
await self._flush_delayed_entries(delay_key, target_id, target_kind, "mention", entry)
else:
await self._enqueue_delayed_entry(delay_key, target_id, target_kind, entry)
return
await self._dispatch_entries(target_id, target_kind, [entry], was_mentioned)
# ---- dedup / buffering -------------------------------------------------
def _remember_message_id(self, key: str, message_id: str) -> bool:
seen_set = self._seen_set.setdefault(key, set())
seen_queue = self._seen_queue.setdefault(key, deque())
if message_id in seen_set:
return True
seen_set.add(message_id)
seen_queue.append(message_id)
while len(seen_queue) > MAX_SEEN_MESSAGE_IDS:
seen_set.discard(seen_queue.popleft())
return False
async def _enqueue_delayed_entry(self, key: str, target_id: str, target_kind: str, entry: MochatBufferedEntry) -> None:
state = self._delay_states.setdefault(key, DelayState())
async with state.lock:
state.entries.append(entry)
if state.timer:
state.timer.cancel()
state.timer = asyncio.create_task(self._delay_flush_after(key, target_id, target_kind))
async def _delay_flush_after(self, key: str, target_id: str, target_kind: str) -> None:
await asyncio.sleep(max(0, self.config.reply_delay_ms) / 1000.0)
await self._flush_delayed_entries(key, target_id, target_kind, "timer", None)
async def _flush_delayed_entries(self, key: str, target_id: str, target_kind: str, reason: str, entry: MochatBufferedEntry | None) -> None:
state = self._delay_states.setdefault(key, DelayState())
async with state.lock:
if entry:
state.entries.append(entry)
current = asyncio.current_task()
if state.timer and state.timer is not current:
state.timer.cancel()
state.timer = None
entries = state.entries[:]
state.entries.clear()
if entries:
await self._dispatch_entries(target_id, target_kind, entries, reason == "mention")
async def _dispatch_entries(self, target_id: str, target_kind: str, entries: list[MochatBufferedEntry], was_mentioned: bool) -> None:
if not entries:
return
last = entries[-1]
is_group = bool(last.group_id)
body = build_buffered_body(entries, is_group) or "[empty message]"
await self._handle_message(
sender_id=last.author, chat_id=target_id, content=body,
metadata={
"message_id": last.message_id, "timestamp": last.timestamp,
"is_group": is_group, "group_id": last.group_id,
"sender_name": last.sender_name, "sender_username": last.sender_username,
"target_kind": target_kind, "was_mentioned": was_mentioned,
"buffered_count": len(entries),
},
)
async def _cancel_delay_timers(self) -> None:
for state in self._delay_states.values():
if state.timer:
state.timer.cancel()
self._delay_states.clear()
# ---- notify handlers ---------------------------------------------------
async def _handle_notify_chat_message(self, payload: Any) -> None:
if not isinstance(payload, dict):
return
group_id = _str_field(payload, "groupId")
panel_id = _str_field(payload, "converseId", "panelId")
if not group_id or not panel_id:
return
if self._panel_set and panel_id not in self._panel_set:
return
evt = _make_synthetic_event(
message_id=str(payload.get("_id") or payload.get("messageId") or ""),
author=str(payload.get("author") or ""),
content=payload.get("content"), meta=payload.get("meta"),
group_id=group_id, converse_id=panel_id,
timestamp=payload.get("createdAt"), author_info=payload.get("authorInfo"),
)
await self._process_inbound_event(panel_id, evt, "panel")
async def _handle_notify_inbox_append(self, payload: Any) -> None:
if not isinstance(payload, dict) or payload.get("type") != "message":
return
detail = payload.get("payload")
if not isinstance(detail, dict):
return
if _str_field(detail, "groupId"):
return
converse_id = _str_field(detail, "converseId")
if not converse_id:
return
session_id = self._session_by_converse.get(converse_id)
if not session_id:
await self._refresh_sessions_directory(self._ws_ready)
session_id = self._session_by_converse.get(converse_id)
if not session_id:
return
evt = _make_synthetic_event(
message_id=str(detail.get("messageId") or payload.get("_id") or ""),
author=str(detail.get("messageAuthor") or ""),
content=str(detail.get("messagePlainContent") or detail.get("messageSnippet") or ""),
meta={"source": "notify:chat.inbox.append", "converseId": converse_id},
group_id="", converse_id=converse_id, timestamp=payload.get("createdAt"),
)
await self._process_inbound_event(session_id, evt, "session")
# ---- cursor persistence ------------------------------------------------
def _mark_session_cursor(self, session_id: str, cursor: int) -> None:
if cursor < 0 or cursor < self._session_cursor.get(session_id, 0):
return
self._session_cursor[session_id] = cursor
if not self._cursor_save_task or self._cursor_save_task.done():
self._cursor_save_task = asyncio.create_task(self._save_cursor_debounced())
async def _save_cursor_debounced(self) -> None:
await asyncio.sleep(CURSOR_SAVE_DEBOUNCE_S)
await self._save_session_cursors()
async def _load_session_cursors(self) -> None:
if not self._cursor_path.exists():
return
try:
data = json.loads(self._cursor_path.read_text("utf-8"))
except Exception as e:
logger.warning(f"Failed to read Mochat cursor file: {e}")
return
cursors = data.get("cursors") if isinstance(data, dict) else None
if isinstance(cursors, dict):
for sid, cur in cursors.items():
if isinstance(sid, str) and isinstance(cur, int) and cur >= 0:
self._session_cursor[sid] = cur
async def _save_session_cursors(self) -> None:
try:
self._state_dir.mkdir(parents=True, exist_ok=True)
self._cursor_path.write_text(json.dumps({
"schemaVersion": 1, "updatedAt": datetime.utcnow().isoformat(),
"cursors": self._session_cursor,
}, ensure_ascii=False, indent=2) + "\n", "utf-8")
except Exception as e:
logger.warning(f"Failed to save Mochat cursor file: {e}")
# ---- HTTP helpers ------------------------------------------------------
async def _post_json(self, path: str, payload: dict[str, Any]) -> dict[str, Any]:
if not self._http:
raise RuntimeError("Mochat HTTP client not initialized")
url = f"{self.config.base_url.strip().rstrip('/')}{path}"
response = await self._http.post(url, headers={
"Content-Type": "application/json", "X-Claw-Token": self.config.claw_token,
}, json=payload)
if not response.is_success:
raise RuntimeError(f"Mochat HTTP {response.status_code}: {response.text[:200]}")
try:
parsed = response.json()
except Exception:
parsed = response.text
if isinstance(parsed, dict) and isinstance(parsed.get("code"), int):
if parsed["code"] != 200:
msg = str(parsed.get("message") or parsed.get("name") or "request failed")
raise RuntimeError(f"Mochat API error: {msg} (code={parsed['code']})")
data = parsed.get("data")
return data if isinstance(data, dict) else {}
return parsed if isinstance(parsed, dict) else {}
async def _api_send(self, path: str, id_key: str, id_val: str,
content: str, reply_to: str | None, group_id: str | None = None) -> dict[str, Any]:
"""Unified send helper for session and panel messages."""
body: dict[str, Any] = {id_key: id_val, "content": content}
if reply_to:
body["replyTo"] = reply_to
if group_id:
body["groupId"] = group_id
return await self._post_json(path, body)
@staticmethod
def _read_group_id(metadata: dict[str, Any]) -> str | None:
if not isinstance(metadata, dict):
return None
value = metadata.get("group_id") or metadata.get("groupId")
return value.strip() if isinstance(value, str) and value.strip() else None

131
nanobot/channels/qq.py Normal file
View File

@ -0,0 +1,131 @@
"""QQ channel implementation using botpy SDK."""
import asyncio
from collections import deque
from typing import TYPE_CHECKING
from loguru import logger
from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.channels.base import BaseChannel
from nanobot.config.schema import QQConfig
try:
import botpy
from botpy.message import C2CMessage
QQ_AVAILABLE = True
except ImportError:
QQ_AVAILABLE = False
botpy = None
C2CMessage = None
if TYPE_CHECKING:
from botpy.message import C2CMessage
def _make_bot_class(channel: "QQChannel") -> "type[botpy.Client]":
"""Create a botpy Client subclass bound to the given channel."""
intents = botpy.Intents(public_messages=True, direct_message=True)
class _Bot(botpy.Client):
def __init__(self):
super().__init__(intents=intents)
async def on_ready(self):
logger.info(f"QQ bot ready: {self.robot.name}")
async def on_c2c_message_create(self, message: "C2CMessage"):
await channel._on_message(message)
async def on_direct_message_create(self, message):
await channel._on_message(message)
return _Bot
class QQChannel(BaseChannel):
"""QQ channel using botpy SDK with WebSocket connection."""
name = "qq"
def __init__(self, config: QQConfig, bus: MessageBus):
super().__init__(config, bus)
self.config: QQConfig = config
self._client: "botpy.Client | None" = None
self._processed_ids: deque = deque(maxlen=1000)
self._bot_task: asyncio.Task | None = None
async def start(self) -> None:
"""Start the QQ bot."""
if not QQ_AVAILABLE:
logger.error("QQ SDK not installed. Run: pip install qq-botpy")
return
if not self.config.app_id or not self.config.secret:
logger.error("QQ app_id and secret not configured")
return
self._running = True
BotClass = _make_bot_class(self)
self._client = BotClass()
self._bot_task = asyncio.create_task(self._run_bot())
logger.info("QQ bot started (C2C private message)")
async def _run_bot(self) -> None:
"""Run the bot connection."""
try:
await self._client.start(appid=self.config.app_id, secret=self.config.secret)
except Exception as e:
logger.error(f"QQ auth failed, check AppID/Secret at q.qq.com: {e}")
self._running = False
async def stop(self) -> None:
"""Stop the QQ bot."""
self._running = False
if self._bot_task:
self._bot_task.cancel()
try:
await self._bot_task
except asyncio.CancelledError:
pass
logger.info("QQ bot stopped")
async def send(self, msg: OutboundMessage) -> None:
"""Send a message through QQ."""
if not self._client:
logger.warning("QQ client not initialized")
return
try:
await self._client.api.post_c2c_message(
openid=msg.chat_id,
msg_type=0,
content=msg.content,
)
except Exception as e:
logger.error(f"Error sending QQ message: {e}")
async def _on_message(self, data: "C2CMessage") -> None:
"""Handle incoming message from QQ."""
try:
# Dedup by message ID
if data.id in self._processed_ids:
return
self._processed_ids.append(data.id)
author = data.author
user_id = str(getattr(author, 'id', None) or getattr(author, 'user_openid', 'unknown'))
content = (data.content or "").strip()
if not content:
return
await self._handle_message(
sender_id=user_id,
chat_id=user_id,
content=content,
metadata={"message_id": data.id},
)
except Exception as e:
logger.error(f"Error handling QQ message: {e}")

205
nanobot/channels/slack.py Normal file
View File

@ -0,0 +1,205 @@
"""Slack channel implementation using Socket Mode."""
import asyncio
import re
from typing import Any
from loguru import logger
from slack_sdk.socket_mode.websockets import SocketModeClient
from slack_sdk.socket_mode.request import SocketModeRequest
from slack_sdk.socket_mode.response import SocketModeResponse
from slack_sdk.web.async_client import AsyncWebClient
from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.channels.base import BaseChannel
from nanobot.config.schema import SlackConfig
class SlackChannel(BaseChannel):
"""Slack channel using Socket Mode."""
name = "slack"
def __init__(self, config: SlackConfig, bus: MessageBus):
super().__init__(config, bus)
self.config: SlackConfig = config
self._web_client: AsyncWebClient | None = None
self._socket_client: SocketModeClient | None = None
self._bot_user_id: str | None = None
async def start(self) -> None:
"""Start the Slack Socket Mode client."""
if not self.config.bot_token or not self.config.app_token:
logger.error("Slack bot/app token not configured")
return
if self.config.mode != "socket":
logger.error(f"Unsupported Slack mode: {self.config.mode}")
return
self._running = True
self._web_client = AsyncWebClient(token=self.config.bot_token)
self._socket_client = SocketModeClient(
app_token=self.config.app_token,
web_client=self._web_client,
)
self._socket_client.socket_mode_request_listeners.append(self._on_socket_request)
# Resolve bot user ID for mention handling
try:
auth = await self._web_client.auth_test()
self._bot_user_id = auth.get("user_id")
logger.info(f"Slack bot connected as {self._bot_user_id}")
except Exception as e:
logger.warning(f"Slack auth_test failed: {e}")
logger.info("Starting Slack Socket Mode client...")
await self._socket_client.connect()
while self._running:
await asyncio.sleep(1)
async def stop(self) -> None:
"""Stop the Slack client."""
self._running = False
if self._socket_client:
try:
await self._socket_client.close()
except Exception as e:
logger.warning(f"Slack socket close failed: {e}")
self._socket_client = None
async def send(self, msg: OutboundMessage) -> None:
"""Send a message through Slack."""
if not self._web_client:
logger.warning("Slack client not running")
return
try:
slack_meta = msg.metadata.get("slack", {}) if msg.metadata else {}
thread_ts = slack_meta.get("thread_ts")
channel_type = slack_meta.get("channel_type")
# Only reply in thread for channel/group messages; DMs don't use threads
use_thread = thread_ts and channel_type != "im"
await self._web_client.chat_postMessage(
channel=msg.chat_id,
text=msg.content or "",
thread_ts=thread_ts if use_thread else None,
)
except Exception as e:
logger.error(f"Error sending Slack message: {e}")
async def _on_socket_request(
self,
client: SocketModeClient,
req: SocketModeRequest,
) -> None:
"""Handle incoming Socket Mode requests."""
if req.type != "events_api":
return
# Acknowledge right away
await client.send_socket_mode_response(
SocketModeResponse(envelope_id=req.envelope_id)
)
payload = req.payload or {}
event = payload.get("event") or {}
event_type = event.get("type")
# Handle app mentions or plain messages
if event_type not in ("message", "app_mention"):
return
sender_id = event.get("user")
chat_id = event.get("channel")
# Ignore bot/system messages (any subtype = not a normal user message)
if event.get("subtype"):
return
if self._bot_user_id and sender_id == self._bot_user_id:
return
# Avoid double-processing: Slack sends both `message` and `app_mention`
# for mentions in channels. Prefer `app_mention`.
text = event.get("text") or ""
if event_type == "message" and self._bot_user_id and f"<@{self._bot_user_id}>" in text:
return
# Debug: log basic event shape
logger.debug(
"Slack event: type={} subtype={} user={} channel={} channel_type={} text={}",
event_type,
event.get("subtype"),
sender_id,
chat_id,
event.get("channel_type"),
text[:80],
)
if not sender_id or not chat_id:
return
channel_type = event.get("channel_type") or ""
if not self._is_allowed(sender_id, chat_id, channel_type):
return
if channel_type != "im" and not self._should_respond_in_channel(event_type, text, chat_id):
return
text = self._strip_bot_mention(text)
thread_ts = event.get("thread_ts") or event.get("ts")
# Add :eyes: reaction to the triggering message (best-effort)
try:
if self._web_client and event.get("ts"):
await self._web_client.reactions_add(
channel=chat_id,
name="eyes",
timestamp=event.get("ts"),
)
except Exception as e:
logger.debug(f"Slack reactions_add failed: {e}")
await self._handle_message(
sender_id=sender_id,
chat_id=chat_id,
content=text,
metadata={
"slack": {
"event": event,
"thread_ts": thread_ts,
"channel_type": channel_type,
}
},
)
def _is_allowed(self, sender_id: str, chat_id: str, channel_type: str) -> bool:
if channel_type == "im":
if not self.config.dm.enabled:
return False
if self.config.dm.policy == "allowlist":
return sender_id in self.config.dm.allow_from
return True
# Group / channel messages
if self.config.group_policy == "allowlist":
return chat_id in self.config.group_allow_from
return True
def _should_respond_in_channel(self, event_type: str, text: str, chat_id: str) -> bool:
if self.config.group_policy == "open":
return True
if self.config.group_policy == "mention":
if event_type == "app_mention":
return True
return self._bot_user_id is not None and f"<@{self._bot_user_id}>" in text
if self.config.group_policy == "allowlist":
return chat_id in self.config.group_allow_from
return False
def _strip_bot_mention(self, text: str) -> str:
if not text or not self._bot_user_id:
return text
return re.sub(rf"<@{re.escape(self._bot_user_id)}>\s*", "", text).strip()

View File

@ -1,17 +1,23 @@
"""Telegram channel implementation using python-telegram-bot.""" """Telegram channel implementation using python-telegram-bot."""
from __future__ import annotations
import asyncio import asyncio
import re import re
from typing import TYPE_CHECKING
from loguru import logger from loguru import logger
from telegram import Update from telegram import BotCommand, Update
from telegram.ext import Application, MessageHandler, filters, ContextTypes from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes
from nanobot.bus.events import OutboundMessage from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus from nanobot.bus.queue import MessageBus
from nanobot.channels.base import BaseChannel from nanobot.channels.base import BaseChannel
from nanobot.config.schema import TelegramConfig from nanobot.config.schema import TelegramConfig
if TYPE_CHECKING:
from nanobot.session.manager import SessionManager
def _markdown_to_telegram_html(text: str) -> str: def _markdown_to_telegram_html(text: str) -> str:
""" """
@ -85,12 +91,27 @@ class TelegramChannel(BaseChannel):
name = "telegram" name = "telegram"
def __init__(self, config: TelegramConfig, bus: MessageBus, groq_api_key: str = ""): # Commands registered with Telegram's command menu
BOT_COMMANDS = [
BotCommand("start", "Start the bot"),
BotCommand("reset", "Reset conversation history"),
BotCommand("help", "Show available commands"),
]
def __init__(
self,
config: TelegramConfig,
bus: MessageBus,
groq_api_key: str = "",
session_manager: SessionManager | None = None,
):
super().__init__(config, bus) super().__init__(config, bus)
self.config: TelegramConfig = config self.config: TelegramConfig = config
self.groq_api_key = groq_api_key self.groq_api_key = groq_api_key
self.session_manager = session_manager
self._app: Application | None = None self._app: Application | None = None
self._chat_ids: dict[str, int] = {} # Map sender_id to chat_id for replies self._chat_ids: dict[str, int] = {} # Map sender_id to chat_id for replies
self._typing_tasks: dict[str, asyncio.Task] = {} # chat_id -> typing loop task
async def start(self) -> None: async def start(self) -> None:
"""Start the Telegram bot with long polling.""" """Start the Telegram bot with long polling."""
@ -101,11 +122,15 @@ class TelegramChannel(BaseChannel):
self._running = True self._running = True
# Build the application # Build the application
self._app = ( builder = Application.builder().token(self.config.token)
Application.builder() if self.config.proxy:
.token(self.config.token) builder = builder.proxy(self.config.proxy).get_updates_proxy(self.config.proxy)
.build() self._app = builder.build()
)
# Add command handlers
self._app.add_handler(CommandHandler("start", self._on_start))
self._app.add_handler(CommandHandler("reset", self._on_reset))
self._app.add_handler(CommandHandler("help", self._on_help))
# Add message handler for text, photos, voice, documents # Add message handler for text, photos, voice, documents
self._app.add_handler( self._app.add_handler(
@ -116,20 +141,22 @@ class TelegramChannel(BaseChannel):
) )
) )
# Add /start command handler
from telegram.ext import CommandHandler
self._app.add_handler(CommandHandler("start", self._on_start))
logger.info("Starting Telegram bot (polling mode)...") logger.info("Starting Telegram bot (polling mode)...")
# Initialize and start polling # Initialize and start polling
await self._app.initialize() await self._app.initialize()
await self._app.start() await self._app.start()
# Get bot info # Get bot info and register command menu
bot_info = await self._app.bot.get_me() bot_info = await self._app.bot.get_me()
logger.info(f"Telegram bot @{bot_info.username} connected") logger.info(f"Telegram bot @{bot_info.username} connected")
try:
await self._app.bot.set_my_commands(self.BOT_COMMANDS)
logger.debug("Telegram bot commands registered")
except Exception as e:
logger.warning(f"Failed to register bot commands: {e}")
# Start polling (this runs until stopped) # Start polling (this runs until stopped)
await self._app.updater.start_polling( await self._app.updater.start_polling(
allowed_updates=["message"], allowed_updates=["message"],
@ -144,6 +171,10 @@ class TelegramChannel(BaseChannel):
"""Stop the Telegram bot.""" """Stop the Telegram bot."""
self._running = False self._running = False
# Cancel all typing indicators
for chat_id in list(self._typing_tasks):
self._stop_typing(chat_id)
if self._app: if self._app:
logger.info("Stopping Telegram bot...") logger.info("Stopping Telegram bot...")
await self._app.updater.stop() await self._app.updater.stop()
@ -157,6 +188,9 @@ class TelegramChannel(BaseChannel):
logger.warning("Telegram bot not running") logger.warning("Telegram bot not running")
return return
# Stop typing indicator for this chat
self._stop_typing(msg.chat_id)
try: try:
# chat_id should be the Telegram chat ID (integer) # chat_id should be the Telegram chat ID (integer)
chat_id = int(msg.chat_id) chat_id = int(msg.chat_id)
@ -188,9 +222,45 @@ class TelegramChannel(BaseChannel):
user = update.effective_user user = update.effective_user
await update.message.reply_text( await update.message.reply_text(
f"👋 Hi {user.first_name}! I'm nanobot.\n\n" f"👋 Hi {user.first_name}! I'm nanobot.\n\n"
"Send me a message and I'll respond!" "Send me a message and I'll respond!\n"
"Type /help to see available commands."
) )
async def _on_reset(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle /reset command — clear conversation history."""
if not update.message or not update.effective_user:
return
chat_id = str(update.message.chat_id)
session_key = f"{self.name}:{chat_id}"
if self.session_manager is None:
logger.warning("/reset called but session_manager is not available")
await update.message.reply_text("⚠️ Session management is not available.")
return
session = self.session_manager.get_or_create(session_key)
msg_count = len(session.messages)
session.clear()
self.session_manager.save(session)
logger.info(f"Session reset for {session_key} (cleared {msg_count} messages)")
await update.message.reply_text("🔄 Conversation history cleared. Let's start fresh!")
async def _on_help(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle /help command — show available commands."""
if not update.message:
return
help_text = (
"🐈 <b>nanobot commands</b>\n\n"
"/start — Start the bot\n"
"/reset — Reset conversation history\n"
"/help — Show this help message\n\n"
"Just send me a text message to chat!"
)
await update.message.reply_text(help_text, parse_mode="HTML")
async def _on_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: async def _on_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle incoming messages (text, photos, voice, documents).""" """Handle incoming messages (text, photos, voice, documents)."""
if not update.message or not update.effective_user: if not update.message or not update.effective_user:
@ -273,10 +343,15 @@ class TelegramChannel(BaseChannel):
logger.debug(f"Telegram message from {sender_id}: {content[:50]}...") logger.debug(f"Telegram message from {sender_id}: {content[:50]}...")
str_chat_id = str(chat_id)
# Start typing indicator before processing
self._start_typing(str_chat_id)
# Forward to the message bus # Forward to the message bus
await self._handle_message( await self._handle_message(
sender_id=sender_id, sender_id=sender_id,
chat_id=str(chat_id), chat_id=str_chat_id,
content=content, content=content,
media=media_paths, media=media_paths,
metadata={ metadata={
@ -288,6 +363,29 @@ class TelegramChannel(BaseChannel):
} }
) )
def _start_typing(self, chat_id: str) -> None:
"""Start sending 'typing...' indicator for a chat."""
# Cancel any existing typing task for this chat
self._stop_typing(chat_id)
self._typing_tasks[chat_id] = asyncio.create_task(self._typing_loop(chat_id))
def _stop_typing(self, chat_id: str) -> None:
"""Stop the typing indicator for a chat."""
task = self._typing_tasks.pop(chat_id, None)
if task and not task.done():
task.cancel()
async def _typing_loop(self, chat_id: str) -> None:
"""Repeatedly send 'typing' action until cancelled."""
try:
while self._app:
await self._app.bot.send_chat_action(chat_id=int(chat_id), action="typing")
await asyncio.sleep(4)
except asyncio.CancelledError:
pass
except Exception as e:
logger.debug(f"Typing indicator stopped for {chat_id}: {e}")
def _get_extension(self, media_type: str, mime_type: str | None) -> str: def _get_extension(self, media_type: str, mime_type: str | None) -> str:
"""Get file extension based on media type.""" """Get file extension based on media type."""
if mime_type: if mime_type:

View File

@ -100,21 +100,25 @@ class WhatsAppChannel(BaseChannel):
if msg_type == "message": if msg_type == "message":
# Incoming message from WhatsApp # Incoming message from WhatsApp
# Deprecated by whatsapp: old phone number style typically: <phone>@s.whatspp.net
pn = data.get("pn", "")
# New LID sytle typically:
sender = data.get("sender", "") sender = data.get("sender", "")
content = data.get("content", "") content = data.get("content", "")
# sender is typically: <phone>@s.whatsapp.net # Extract just the phone number or lid as chat_id
# Extract just the phone number as chat_id user_id = pn if pn else sender
chat_id = sender.split("@")[0] if "@" in sender else sender sender_id = user_id.split("@")[0] if "@" in user_id else user_id
logger.info(f"Sender {sender}")
# Handle voice transcription if it's a voice message # Handle voice transcription if it's a voice message
if content == "[Voice Message]": if content == "[Voice Message]":
logger.info(f"Voice message received from {chat_id}, but direct download from bridge is not yet supported.") logger.info(f"Voice message received from {sender_id}, but direct download from bridge is not yet supported.")
content = "[Voice Message: Transcription not available for WhatsApp yet]" content = "[Voice Message: Transcription not available for WhatsApp yet]"
await self._handle_message( await self._handle_message(
sender_id=chat_id, sender_id=sender_id,
chat_id=sender, # Use full JID for replies chat_id=sender, # Use full LID for replies
content=content, content=content,
metadata={ metadata={
"message_id": data.get("id"), "message_id": data.get("id"),

View File

@ -1,11 +1,19 @@
"""CLI commands for nanobot.""" """CLI commands for nanobot."""
import asyncio import asyncio
import atexit
import os
import signal
from pathlib import Path from pathlib import Path
import select
import sys
import typer import typer
from rich.console import Console from rich.console import Console
from rich.markdown import Markdown
from rich.panel import Panel
from rich.table import Table from rich.table import Table
from rich.text import Text
from nanobot import __version__, __logo__ from nanobot import __version__, __logo__
@ -16,6 +24,146 @@ app = typer.Typer(
) )
console = Console() console = Console()
EXIT_COMMANDS = {"exit", "quit", "/exit", "/quit", ":q"}
# ---------------------------------------------------------------------------
# Lightweight CLI input: readline for arrow keys / history, termios for flush
# ---------------------------------------------------------------------------
_READLINE = None
_HISTORY_FILE: Path | None = None
_HISTORY_HOOK_REGISTERED = False
_USING_LIBEDIT = False
_SAVED_TERM_ATTRS = None # original termios settings, restored on exit
def _flush_pending_tty_input() -> None:
"""Drop unread keypresses typed while the model was generating output."""
try:
fd = sys.stdin.fileno()
if not os.isatty(fd):
return
except Exception:
return
try:
import termios
termios.tcflush(fd, termios.TCIFLUSH)
return
except Exception:
pass
try:
while True:
ready, _, _ = select.select([fd], [], [], 0)
if not ready:
break
if not os.read(fd, 4096):
break
except Exception:
return
def _save_history() -> None:
if _READLINE is None or _HISTORY_FILE is None:
return
try:
_READLINE.write_history_file(str(_HISTORY_FILE))
except Exception:
return
def _restore_terminal() -> None:
"""Restore terminal to its original state (echo, line buffering, etc.)."""
if _SAVED_TERM_ATTRS is None:
return
try:
import termios
termios.tcsetattr(sys.stdin.fileno(), termios.TCSADRAIN, _SAVED_TERM_ATTRS)
except Exception:
pass
def _enable_line_editing() -> None:
"""Enable readline for arrow keys, line editing, and persistent history."""
global _READLINE, _HISTORY_FILE, _HISTORY_HOOK_REGISTERED, _USING_LIBEDIT, _SAVED_TERM_ATTRS
# Save terminal state before readline touches it
try:
import termios
_SAVED_TERM_ATTRS = termios.tcgetattr(sys.stdin.fileno())
except Exception:
pass
history_file = Path.home() / ".nanobot" / "history" / "cli_history"
history_file.parent.mkdir(parents=True, exist_ok=True)
_HISTORY_FILE = history_file
try:
import readline
except ImportError:
return
_READLINE = readline
_USING_LIBEDIT = "libedit" in (readline.__doc__ or "").lower()
try:
if _USING_LIBEDIT:
readline.parse_and_bind("bind ^I rl_complete")
else:
readline.parse_and_bind("tab: complete")
readline.parse_and_bind("set editing-mode emacs")
except Exception:
pass
try:
readline.read_history_file(str(history_file))
except Exception:
pass
if not _HISTORY_HOOK_REGISTERED:
atexit.register(_save_history)
_HISTORY_HOOK_REGISTERED = True
def _prompt_text() -> str:
"""Build a readline-friendly colored prompt."""
if _READLINE is None:
return "You: "
# libedit on macOS does not honor GNU readline non-printing markers.
if _USING_LIBEDIT:
return "\033[1;34mYou:\033[0m "
return "\001\033[1;34m\002You:\001\033[0m\002 "
def _print_agent_response(response: str, render_markdown: bool) -> None:
"""Render assistant response with consistent terminal styling."""
content = response or ""
body = Markdown(content) if render_markdown else Text(content)
console.print()
console.print(
Panel(
body,
title=f"{__logo__} nanobot",
title_align="left",
border_style="cyan",
padding=(0, 1),
)
)
console.print()
def _is_exit_command(command: str) -> bool:
"""Return True when input should end interactive chat."""
return command.lower() in EXIT_COMMANDS
async def _read_interactive_input_async() -> str:
"""Read user input with arrow keys and history (runs input() in a thread)."""
try:
return await asyncio.to_thread(input, _prompt_text())
except EOFError as exc:
raise KeyboardInterrupt from exc
def version_callback(value: bool): def version_callback(value: bool):
@ -146,6 +294,10 @@ This file stores important information that should persist across sessions.
""") """)
console.print(" [dim]Created memory/MEMORY.md[/dim]") console.print(" [dim]Created memory/MEMORY.md[/dim]")
# Create skills directory for custom user skills
skills_dir = workspace / "skills"
skills_dir.mkdir(exist_ok=True)
def _make_provider(config): def _make_provider(config):
"""Create LiteLLMProvider from config. Exits if no API key found.""" """Create LiteLLMProvider from config. Exits if no API key found."""
@ -161,6 +313,7 @@ def _make_provider(config):
api_base=config.get_api_base(), api_base=config.get_api_base(),
default_model=model, default_model=model,
extra_headers=p.extra_headers if p else None, extra_headers=p.extra_headers if p else None,
provider_name=config.get_provider_name(),
) )
@ -179,6 +332,7 @@ def gateway(
from nanobot.bus.queue import MessageBus from nanobot.bus.queue import MessageBus
from nanobot.agent.loop import AgentLoop from nanobot.agent.loop import AgentLoop
from nanobot.channels.manager import ChannelManager from nanobot.channels.manager import ChannelManager
from nanobot.session.manager import SessionManager
from nanobot.cron.service import CronService from nanobot.cron.service import CronService
from nanobot.cron.types import CronJob from nanobot.cron.types import CronJob
from nanobot.heartbeat.service import HeartbeatService from nanobot.heartbeat.service import HeartbeatService
@ -192,6 +346,7 @@ def gateway(
config = load_config() config = load_config()
bus = MessageBus() bus = MessageBus()
provider = _make_provider(config) provider = _make_provider(config)
session_manager = SessionManager(config.workspace_path)
# Create cron service first (callback set after agent creation) # Create cron service first (callback set after agent creation)
cron_store_path = get_data_dir() / "cron" / "jobs.json" cron_store_path = get_data_dir() / "cron" / "jobs.json"
@ -208,6 +363,7 @@ def gateway(
exec_config=config.tools.exec, exec_config=config.tools.exec,
cron_service=cron, cron_service=cron,
restrict_to_workspace=config.tools.restrict_to_workspace, restrict_to_workspace=config.tools.restrict_to_workspace,
session_manager=session_manager,
) )
# Set cron callback (needs agent) # Set cron callback (needs agent)
@ -242,7 +398,7 @@ def gateway(
) )
# Create channel manager # Create channel manager
channels = ChannelManager(config, bus) channels = ChannelManager(config, bus, session_manager=session_manager)
if channels.enabled_channels: if channels.enabled_channels:
console.print(f"[green]✓[/green] Channels enabled: {', '.join(channels.enabled_channels)}") console.print(f"[green]✓[/green] Channels enabled: {', '.join(channels.enabled_channels)}")
@ -284,17 +440,25 @@ def gateway(
def agent( def agent(
message: str = typer.Option(None, "--message", "-m", help="Message to send to the agent"), message: str = typer.Option(None, "--message", "-m", help="Message to send to the agent"),
session_id: str = typer.Option("cli:default", "--session", "-s", help="Session ID"), session_id: str = typer.Option("cli:default", "--session", "-s", help="Session ID"),
markdown: bool = typer.Option(True, "--markdown/--no-markdown", help="Render assistant output as Markdown"),
logs: bool = typer.Option(False, "--logs/--no-logs", help="Show nanobot runtime logs during chat"),
): ):
"""Interact with the agent directly.""" """Interact with the agent directly."""
from nanobot.config.loader import load_config from nanobot.config.loader import load_config
from nanobot.bus.queue import MessageBus from nanobot.bus.queue import MessageBus
from nanobot.agent.loop import AgentLoop from nanobot.agent.loop import AgentLoop
from loguru import logger
config = load_config() config = load_config()
bus = MessageBus() bus = MessageBus()
provider = _make_provider(config) provider = _make_provider(config)
if logs:
logger.enable("nanobot")
else:
logger.disable("nanobot")
agent_loop = AgentLoop( agent_loop = AgentLoop(
bus=bus, bus=bus,
provider=provider, provider=provider,
@ -304,27 +468,62 @@ def agent(
restrict_to_workspace=config.tools.restrict_to_workspace, restrict_to_workspace=config.tools.restrict_to_workspace,
) )
# Show spinner when logs are off (no output to miss); skip when logs are on
def _thinking_ctx():
if logs:
from contextlib import nullcontext
return nullcontext()
return console.status("[dim]nanobot is thinking...[/dim]", spinner="dots")
if message: if message:
# Single message mode # Single message mode
async def run_once(): async def run_once():
response = await agent_loop.process_direct(message, session_id) with _thinking_ctx():
console.print(f"\n{__logo__} {response}") response = await agent_loop.process_direct(message, session_id)
_print_agent_response(response, render_markdown=markdown)
asyncio.run(run_once()) asyncio.run(run_once())
else: else:
# Interactive mode # Interactive mode
console.print(f"{__logo__} Interactive mode (Ctrl+C to exit)\n") _enable_line_editing()
console.print(f"{__logo__} Interactive mode (type [bold]exit[/bold] or [bold]Ctrl+C[/bold] to quit)\n")
# input() runs in a worker thread that can't be cancelled.
# Without this handler, asyncio.run() would hang waiting for it.
def _exit_on_sigint(signum, frame):
_save_history()
_restore_terminal()
console.print("\nGoodbye!")
os._exit(0)
signal.signal(signal.SIGINT, _exit_on_sigint)
async def run_interactive(): async def run_interactive():
while True: while True:
try: try:
user_input = console.input("[bold blue]You:[/bold blue] ") _flush_pending_tty_input()
if not user_input.strip(): user_input = await _read_interactive_input_async()
command = user_input.strip()
if not command:
continue continue
response = await agent_loop.process_direct(user_input, session_id) if _is_exit_command(command):
console.print(f"\n{__logo__} {response}\n") _save_history()
_restore_terminal()
console.print("\nGoodbye!")
break
with _thinking_ctx():
response = await agent_loop.process_direct(user_input, session_id)
_print_agent_response(response, render_markdown=markdown)
except KeyboardInterrupt: except KeyboardInterrupt:
_save_history()
_restore_terminal()
console.print("\nGoodbye!")
break
except EOFError:
_save_history()
_restore_terminal()
console.print("\nGoodbye!") console.print("\nGoodbye!")
break break
@ -367,6 +566,24 @@ def channels_status():
dc.gateway_url dc.gateway_url
) )
# Feishu
fs = config.channels.feishu
fs_config = f"app_id: {fs.app_id[:10]}..." if fs.app_id else "[dim]not configured[/dim]"
table.add_row(
"Feishu",
"" if fs.enabled else "",
fs_config
)
# Mochat
mc = config.channels.mochat
mc_base = mc.base_url or "[dim]not configured[/dim]"
table.add_row(
"Mochat",
"" if mc.enabled else "",
mc_base
)
# Telegram # Telegram
tg = config.channels.telegram tg = config.channels.telegram
tg_config = f"token: {tg.token[:10]}..." if tg.token else "[dim]not configured[/dim]" tg_config = f"token: {tg.token[:10]}..." if tg.token else "[dim]not configured[/dim]"
@ -376,6 +593,15 @@ def channels_status():
tg_config tg_config
) )
# Slack
slack = config.channels.slack
slack_config = "socket" if slack.app_token and slack.bot_token else "[dim]not configured[/dim]"
table.add_row(
"Slack",
"" if slack.enabled else "",
slack_config
)
console.print(table) console.print(table)
@ -632,27 +858,24 @@ def status():
console.print(f"Workspace: {workspace} {'[green]✓[/green]' if workspace.exists() else '[red]✗[/red]'}") console.print(f"Workspace: {workspace} {'[green]✓[/green]' if workspace.exists() else '[red]✗[/red]'}")
if config_path.exists(): if config_path.exists():
from nanobot.providers.registry import PROVIDERS
console.print(f"Model: {config.agents.defaults.model}") console.print(f"Model: {config.agents.defaults.model}")
# Check API keys # Check API keys from registry
has_openrouter = bool(config.providers.openrouter.api_key) for spec in PROVIDERS:
has_anthropic = bool(config.providers.anthropic.api_key) p = getattr(config.providers, spec.name, None)
has_openai = bool(config.providers.openai.api_key) if p is None:
has_gemini = bool(config.providers.gemini.api_key) continue
has_zhipu = bool(config.providers.zhipu.api_key) if spec.is_local:
has_minimax = bool(config.providers.minimax.api_key) # Local deployments show api_base instead of api_key
has_vllm = bool(config.providers.vllm.api_base) if p.api_base:
has_aihubmix = bool(config.providers.aihubmix.api_key) console.print(f"{spec.label}: [green]✓ {p.api_base}[/green]")
else:
console.print(f"OpenRouter API: {'[green]✓[/green]' if has_openrouter else '[dim]not set[/dim]'}") console.print(f"{spec.label}: [dim]not set[/dim]")
console.print(f"Anthropic API: {'[green]✓[/green]' if has_anthropic else '[dim]not set[/dim]'}") else:
console.print(f"OpenAI API: {'[green]✓[/green]' if has_openai else '[dim]not set[/dim]'}") has_key = bool(p.api_key)
console.print(f"Gemini API: {'[green]✓[/green]' if has_gemini else '[dim]not set[/dim]'}") console.print(f"{spec.label}: {'[green]✓[/green]' if has_key else '[dim]not set[/dim]'}")
console.print(f"Zhipu AI API: {'[green]✓[/green]' if has_zhipu else '[dim]not set[/dim]'}")
console.print(f"MiniMax API: {'[green]✓[/green]' if has_minimax else '[dim]not set[/dim]'}")
console.print(f"AiHubMix API: {'[green]✓[/green]' if has_aihubmix else '[dim]not set[/dim]'}")
vllm_status = f"[green]✓ {config.providers.vllm.api_base}[/green]" if has_vllm else "[dim]not set[/dim]"
console.print(f"vLLM/Local: {vllm_status}")
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -30,6 +30,14 @@ class FeishuConfig(BaseModel):
allow_from: list[str] = Field(default_factory=list) # Allowed user open_ids allow_from: list[str] = Field(default_factory=list) # Allowed user open_ids
class DingTalkConfig(BaseModel):
"""DingTalk channel configuration using Stream mode."""
enabled: bool = False
client_id: str = "" # AppKey
client_secret: str = "" # AppSecret
allow_from: list[str] = Field(default_factory=list) # Allowed staff_ids
class DiscordConfig(BaseModel): class DiscordConfig(BaseModel):
"""Discord channel configuration.""" """Discord channel configuration."""
enabled: bool = False enabled: bool = False
@ -38,6 +46,100 @@ class DiscordConfig(BaseModel):
gateway_url: str = "wss://gateway.discord.gg/?v=10&encoding=json" gateway_url: str = "wss://gateway.discord.gg/?v=10&encoding=json"
intents: int = 37377 # GUILDS + GUILD_MESSAGES + DIRECT_MESSAGES + MESSAGE_CONTENT intents: int = 37377 # GUILDS + GUILD_MESSAGES + DIRECT_MESSAGES + MESSAGE_CONTENT
class EmailConfig(BaseModel):
"""Email channel configuration (IMAP inbound + SMTP outbound)."""
enabled: bool = False
consent_granted: bool = False # Explicit owner permission to access mailbox data
# IMAP (receive)
imap_host: str = ""
imap_port: int = 993
imap_username: str = ""
imap_password: str = ""
imap_mailbox: str = "INBOX"
imap_use_ssl: bool = True
# SMTP (send)
smtp_host: str = ""
smtp_port: int = 587
smtp_username: str = ""
smtp_password: str = ""
smtp_use_tls: bool = True
smtp_use_ssl: bool = False
from_address: str = ""
# Behavior
auto_reply_enabled: bool = True # If false, inbound email is read but no automatic reply is sent
poll_interval_seconds: int = 30
mark_seen: bool = True
max_body_chars: int = 12000
subject_prefix: str = "Re: "
allow_from: list[str] = Field(default_factory=list) # Allowed sender email addresses
class MochatMentionConfig(BaseModel):
"""Mochat mention behavior configuration."""
require_in_groups: bool = False
class MochatGroupRule(BaseModel):
"""Mochat per-group mention requirement."""
require_mention: bool = False
class MochatConfig(BaseModel):
"""Mochat channel configuration."""
enabled: bool = False
base_url: str = "https://mochat.io"
socket_url: str = ""
socket_path: str = "/socket.io"
socket_disable_msgpack: bool = False
socket_reconnect_delay_ms: int = 1000
socket_max_reconnect_delay_ms: int = 10000
socket_connect_timeout_ms: int = 10000
refresh_interval_ms: int = 30000
watch_timeout_ms: int = 25000
watch_limit: int = 100
retry_delay_ms: int = 500
max_retry_attempts: int = 0 # 0 means unlimited retries
claw_token: str = ""
agent_user_id: str = ""
sessions: list[str] = Field(default_factory=list)
panels: list[str] = Field(default_factory=list)
allow_from: list[str] = Field(default_factory=list)
mention: MochatMentionConfig = Field(default_factory=MochatMentionConfig)
groups: dict[str, MochatGroupRule] = Field(default_factory=dict)
reply_delay_mode: str = "non-mention" # off | non-mention
reply_delay_ms: int = 120000
class SlackDMConfig(BaseModel):
"""Slack DM policy configuration."""
enabled: bool = True
policy: str = "open" # "open" or "allowlist"
allow_from: list[str] = Field(default_factory=list) # Allowed Slack user IDs
class SlackConfig(BaseModel):
"""Slack channel configuration."""
enabled: bool = False
mode: str = "socket" # "socket" supported
webhook_path: str = "/slack/events"
bot_token: str = "" # xoxb-...
app_token: str = "" # xapp-...
user_token_read_only: bool = True
group_policy: str = "mention" # "mention", "open", "allowlist"
group_allow_from: list[str] = Field(default_factory=list) # Allowed channel IDs if allowlist
dm: SlackDMConfig = Field(default_factory=SlackDMConfig)
class QQConfig(BaseModel):
"""QQ channel configuration using botpy SDK."""
enabled: bool = False
app_id: str = "" # 机器人 ID (AppID) from q.qq.com
secret: str = "" # 机器人密钥 (AppSecret) from q.qq.com
allow_from: list[str] = Field(default_factory=list) # Allowed user openids (empty = public access)
class ChannelsConfig(BaseModel): class ChannelsConfig(BaseModel):
"""Configuration for chat channels.""" """Configuration for chat channels."""
@ -45,6 +147,11 @@ class ChannelsConfig(BaseModel):
telegram: TelegramConfig = Field(default_factory=TelegramConfig) telegram: TelegramConfig = Field(default_factory=TelegramConfig)
discord: DiscordConfig = Field(default_factory=DiscordConfig) discord: DiscordConfig = Field(default_factory=DiscordConfig)
feishu: FeishuConfig = Field(default_factory=FeishuConfig) feishu: FeishuConfig = Field(default_factory=FeishuConfig)
mochat: MochatConfig = Field(default_factory=MochatConfig)
dingtalk: DingTalkConfig = Field(default_factory=DingTalkConfig)
email: EmailConfig = Field(default_factory=EmailConfig)
slack: SlackConfig = Field(default_factory=SlackConfig)
qq: QQConfig = Field(default_factory=QQConfig)
class AgentDefaults(BaseModel): class AgentDefaults(BaseModel):
@ -126,30 +233,33 @@ class Config(BaseSettings):
"""Get expanded workspace path.""" """Get expanded workspace path."""
return Path(self.agents.defaults.workspace).expanduser() return Path(self.agents.defaults.workspace).expanduser()
# Default base URLs for API gateways def _match_provider(self, model: str | None = None) -> tuple["ProviderConfig | None", str | None]:
_GATEWAY_DEFAULTS = {"openrouter": "https://openrouter.ai/api/v1", "aihubmix": "https://aihubmix.com/v1"} """Match provider config and its registry name. Returns (config, spec_name)."""
from nanobot.providers.registry import PROVIDERS
model_lower = (model or self.agents.defaults.model).lower()
# Match by keyword (order follows PROVIDERS registry)
for spec in PROVIDERS:
p = getattr(self.providers, spec.name, None)
if p and any(kw in model_lower for kw in spec.keywords) and p.api_key:
return p, spec.name
# Fallback: gateways first, then others (follows registry order)
for spec in PROVIDERS:
p = getattr(self.providers, spec.name, None)
if p and p.api_key:
return p, spec.name
return None, None
def get_provider(self, model: str | None = None) -> ProviderConfig | None: def get_provider(self, model: str | None = None) -> ProviderConfig | None:
"""Get matched provider config (api_key, api_base, extra_headers). Falls back to first available.""" """Get matched provider config (api_key, api_base, extra_headers). Falls back to first available."""
model = (model or self.agents.defaults.model).lower() p, _ = self._match_provider(model)
p = self.providers return p
# Keyword → provider mapping (order matters: gateways first)
keyword_map = { def get_provider_name(self, model: str | None = None) -> str | None:
"aihubmix": p.aihubmix, "openrouter": p.openrouter, """Get the registry name of the matched provider (e.g. "deepseek", "openrouter")."""
"deepseek": p.deepseek, "anthropic": p.anthropic, "claude": p.anthropic, _, name = self._match_provider(model)
"openai": p.openai, "gpt": p.openai, "gemini": p.gemini, return name
"zhipu": p.zhipu, "glm": p.zhipu, "zai": p.zhipu,
"dashscope": p.dashscope, "qwen": p.dashscope,
"groq": p.groq, "moonshot": p.moonshot, "kimi": p.moonshot,
"minimax": p.minimax, "vllm": p.vllm,
}
for kw, provider in keyword_map.items():
if kw in model and provider.api_key:
return provider
# Fallback: gateways first (can serve any model), then specific providers
all_providers = [p.openrouter, p.aihubmix, p.anthropic, p.openai, p.deepseek,
p.gemini, p.zhipu, p.dashscope, p.moonshot, p.minimax, p.vllm, p.groq]
return next((pr for pr in all_providers if pr.api_key), None)
def get_api_key(self, model: str | None = None) -> str | None: def get_api_key(self, model: str | None = None) -> str | None:
"""Get API key for the given model. Falls back to first available key.""" """Get API key for the given model. Falls back to first available key."""
@ -158,13 +268,17 @@ class Config(BaseSettings):
def get_api_base(self, model: str | None = None) -> str | None: def get_api_base(self, model: str | None = None) -> str | None:
"""Get API base URL for the given model. Applies default URLs for known gateways.""" """Get API base URL for the given model. Applies default URLs for known gateways."""
p = self.get_provider(model) from nanobot.providers.registry import find_by_name
p, name = self._match_provider(model)
if p and p.api_base: if p and p.api_base:
return p.api_base return p.api_base
# Default URLs for known gateways (openrouter, aihubmix) # Only gateways get a default api_base here. Standard providers
for name, url in self._GATEWAY_DEFAULTS.items(): # (like Moonshot) set their base URL via env vars in _setup_env
if p == getattr(self.providers, name): # to avoid polluting the global litellm.api_base.
return url if name:
spec = find_by_name(name)
if spec and spec.is_gateway and spec.default_api_base:
return spec.default_api_base
return None return None
class Config: class Config:

View File

@ -20,6 +20,7 @@ class LLMResponse:
tool_calls: list[ToolCallRequest] = field(default_factory=list) tool_calls: list[ToolCallRequest] = field(default_factory=list)
finish_reason: str = "stop" finish_reason: str = "stop"
usage: dict[str, int] = field(default_factory=dict) usage: dict[str, int] = field(default_factory=dict)
reasoning_content: str | None = None # Kimi, DeepSeek-R1 etc.
@property @property
def has_tool_calls(self) -> bool: def has_tool_calls(self) -> bool:

View File

@ -1,5 +1,6 @@
"""LiteLLM provider implementation for multi-provider support.""" """LiteLLM provider implementation for multi-provider support."""
import json
import os import os
from typing import Any from typing import Any
@ -7,6 +8,7 @@ import litellm
from litellm import acompletion from litellm import acompletion
from nanobot.providers.base import LLMProvider, LLMResponse, ToolCallRequest from nanobot.providers.base import LLMProvider, LLMResponse, ToolCallRequest
from nanobot.providers.registry import find_by_model, find_gateway
class LiteLLMProvider(LLMProvider): class LiteLLMProvider(LLMProvider):
@ -14,7 +16,8 @@ class LiteLLMProvider(LLMProvider):
LLM provider using LiteLLM for multi-provider support. LLM provider using LiteLLM for multi-provider support.
Supports OpenRouter, Anthropic, OpenAI, Gemini, MiniMax, and many other providers through Supports OpenRouter, Anthropic, OpenAI, Gemini, MiniMax, and many other providers through
a unified interface. a unified interface. Provider-specific logic is driven by the registry
(see providers/registry.py) no if-elif chains needed here.
""" """
def __init__( def __init__(
@ -23,61 +26,78 @@ class LiteLLMProvider(LLMProvider):
api_base: str | None = None, api_base: str | None = None,
default_model: str = "anthropic/claude-opus-4-5", default_model: str = "anthropic/claude-opus-4-5",
extra_headers: dict[str, str] | None = None, extra_headers: dict[str, str] | None = None,
provider_name: str | None = None,
): ):
super().__init__(api_key, api_base) super().__init__(api_key, api_base)
self.default_model = default_model self.default_model = default_model
self.extra_headers = extra_headers or {} self.extra_headers = extra_headers or {}
# Detect OpenRouter by api_key prefix or explicit api_base # Detect gateway / local deployment.
self.is_openrouter = ( # provider_name (from config key) is the primary signal;
(api_key and api_key.startswith("sk-or-")) or # api_key / api_base are fallback for auto-detection.
(api_base and "openrouter" in api_base) self._gateway = find_gateway(provider_name, api_key, api_base)
)
# Detect AiHubMix by api_base # Configure environment variables
self.is_aihubmix = bool(api_base and "aihubmix" in api_base)
# Track if using custom endpoint (vLLM, etc.)
self.is_vllm = bool(api_base) and not self.is_openrouter and not self.is_aihubmix
# Configure LiteLLM based on provider
if api_key: if api_key:
if self.is_openrouter: self._setup_env(api_key, api_base, default_model)
# OpenRouter mode - set key
os.environ["OPENROUTER_API_KEY"] = api_key
elif self.is_aihubmix:
# AiHubMix gateway - OpenAI-compatible
os.environ["OPENAI_API_KEY"] = api_key
elif self.is_vllm:
# vLLM/custom endpoint - uses OpenAI-compatible API
os.environ["HOSTED_VLLM_API_KEY"] = api_key
elif "deepseek" in default_model:
os.environ.setdefault("DEEPSEEK_API_KEY", api_key)
elif "anthropic" in default_model:
os.environ.setdefault("ANTHROPIC_API_KEY", api_key)
elif "openai" in default_model or "gpt" in default_model:
os.environ.setdefault("OPENAI_API_KEY", api_key)
elif "gemini" in default_model.lower():
os.environ.setdefault("GEMINI_API_KEY", api_key)
elif "zhipu" in default_model or "glm" in default_model or "zai" in default_model:
os.environ.setdefault("ZAI_API_KEY", api_key)
os.environ.setdefault("ZHIPUAI_API_KEY", api_key)
elif "dashscope" in default_model or "qwen" in default_model.lower():
os.environ.setdefault("DASHSCOPE_API_KEY", api_key)
elif "groq" in default_model:
os.environ.setdefault("GROQ_API_KEY", api_key)
elif "moonshot" in default_model or "kimi" in default_model:
os.environ.setdefault("MOONSHOT_API_KEY", api_key)
os.environ.setdefault("MOONSHOT_API_BASE", api_base or "https://api.moonshot.cn/v1")
elif "minimax" in default_model.lower():
os.environ.setdefault("MINIMAX_API_KEY", api_key)
os.environ.setdefault("MINIMAX_API_BASE", api_base or "https://api.minimax.io/v1")
if api_base: if api_base:
litellm.api_base = api_base litellm.api_base = api_base
# Disable LiteLLM logging noise # Disable LiteLLM logging noise
litellm.suppress_debug_info = True litellm.suppress_debug_info = True
# Drop unsupported parameters for providers (e.g., gpt-5 rejects some params)
litellm.drop_params = True
def _setup_env(self, api_key: str, api_base: str | None, model: str) -> None:
"""Set environment variables based on detected provider."""
spec = self._gateway or find_by_model(model)
if not spec:
return
# Gateway/local overrides existing env; standard provider doesn't
if self._gateway:
os.environ[spec.env_key] = api_key
else:
os.environ.setdefault(spec.env_key, api_key)
# Resolve env_extras placeholders:
# {api_key} → user's API key
# {api_base} → user's api_base, falling back to spec.default_api_base
effective_base = api_base or spec.default_api_base
for env_name, env_val in spec.env_extras:
resolved = env_val.replace("{api_key}", api_key)
resolved = resolved.replace("{api_base}", effective_base)
os.environ.setdefault(env_name, resolved)
def _resolve_model(self, model: str) -> str:
"""Resolve model name by applying provider/gateway prefixes."""
if self._gateway:
# Gateway mode: apply gateway prefix, skip provider-specific prefixes
prefix = self._gateway.litellm_prefix
if self._gateway.strip_model_prefix:
model = model.split("/")[-1]
if prefix and not model.startswith(f"{prefix}/"):
model = f"{prefix}/{model}"
return model
# Standard mode: auto-prefix for known providers
spec = find_by_model(model)
if spec and spec.litellm_prefix:
if not any(model.startswith(s) for s in spec.skip_prefixes):
model = f"{spec.litellm_prefix}/{model}"
return model
def _apply_model_overrides(self, model: str, kwargs: dict[str, Any]) -> None:
"""Apply model-specific parameter overrides from the registry."""
model_lower = model.lower()
spec = find_by_model(model)
if spec:
for pattern, overrides in spec.model_overrides:
if pattern in model_lower:
kwargs.update(overrides)
return
async def chat( async def chat(
self, self,
@ -100,34 +120,7 @@ class LiteLLMProvider(LLMProvider):
Returns: Returns:
LLMResponse with content and/or tool calls. LLMResponse with content and/or tool calls.
""" """
model = model or self.default_model model = self._resolve_model(model or self.default_model)
# Auto-prefix model names for known providers
# (keywords, target_prefix, skip_if_starts_with)
_prefix_rules = [
(("glm", "zhipu"), "zai", ("zhipu/", "zai/", "openrouter/", "hosted_vllm/")),
(("qwen", "dashscope"), "dashscope", ("dashscope/", "openrouter/")),
(("moonshot", "kimi"), "moonshot", ("moonshot/", "openrouter/")),
(("minimax",), "minimax", ("minimax/", "openrouter/")),
(("gemini",), "gemini", ("gemini/",)),
]
model_lower = model.lower()
for keywords, prefix, skip in _prefix_rules:
if any(kw in model_lower for kw in keywords) and not any(model.startswith(s) for s in skip):
model = f"{prefix}/{model}"
break
# Gateway/endpoint-specific prefixes (detected by api_base/api_key, not model name)
if self.is_openrouter and not model.startswith("openrouter/"):
model = f"openrouter/{model}"
elif self.is_aihubmix:
model = f"openai/{model.split('/')[-1]}"
elif self.is_vllm:
model = f"hosted_vllm/{model}"
# kimi-k2.5 only supports temperature=1.0
if "kimi-k2.5" in model.lower():
temperature = 1.0
kwargs: dict[str, Any] = { kwargs: dict[str, Any] = {
"model": model, "model": model,
@ -136,7 +129,14 @@ class LiteLLMProvider(LLMProvider):
"temperature": temperature, "temperature": temperature,
} }
# Pass api_base directly for custom endpoints (vLLM, etc.) # Apply model-specific overrides (e.g. kimi-k2.5 temperature)
self._apply_model_overrides(model, kwargs)
# Pass api_key directly — more reliable than env vars alone
if self.api_key:
kwargs["api_key"] = self.api_key
# Pass api_base for custom endpoints
if self.api_base: if self.api_base:
kwargs["api_base"] = self.api_base kwargs["api_base"] = self.api_base
@ -169,7 +169,6 @@ class LiteLLMProvider(LLMProvider):
# Parse arguments from JSON string if needed # Parse arguments from JSON string if needed
args = tc.function.arguments args = tc.function.arguments
if isinstance(args, str): if isinstance(args, str):
import json
try: try:
args = json.loads(args) args = json.loads(args)
except json.JSONDecodeError: except json.JSONDecodeError:
@ -189,11 +188,14 @@ class LiteLLMProvider(LLMProvider):
"total_tokens": response.usage.total_tokens, "total_tokens": response.usage.total_tokens,
} }
reasoning_content = getattr(message, "reasoning_content", None)
return LLMResponse( return LLMResponse(
content=message.content, content=message.content,
tool_calls=tool_calls, tool_calls=tool_calls,
finish_reason=choice.finish_reason or "stop", finish_reason=choice.finish_reason or "stop",
usage=usage, usage=usage,
reasoning_content=reasoning_content,
) )
def get_default_model(self) -> str: def get_default_model(self) -> str:

View File

@ -0,0 +1,359 @@
"""
Provider Registry single source of truth for LLM provider metadata.
Adding a new provider:
1. Add a ProviderSpec to PROVIDERS below.
2. Add a field to ProvidersConfig in config/schema.py.
Done. Env vars, prefixing, config matching, status display all derive from here.
Order matters it controls match priority and fallback. Gateways first.
Every entry writes out all fields so you can copy-paste as a template.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
@dataclass(frozen=True)
class ProviderSpec:
"""One LLM provider's metadata. See PROVIDERS below for real examples.
Placeholders in env_extras values:
{api_key} the user's API key
{api_base} api_base from config, or this spec's default_api_base
"""
# identity
name: str # config field name, e.g. "dashscope"
keywords: tuple[str, ...] # model-name keywords for matching (lowercase)
env_key: str # LiteLLM env var, e.g. "DASHSCOPE_API_KEY"
display_name: str = "" # shown in `nanobot status`
# model prefixing
litellm_prefix: str = "" # "dashscope" → model becomes "dashscope/{model}"
skip_prefixes: tuple[str, ...] = () # don't prefix if model already starts with these
# extra env vars, e.g. (("ZHIPUAI_API_KEY", "{api_key}"),)
env_extras: tuple[tuple[str, str], ...] = ()
# gateway / local detection
is_gateway: bool = False # routes any model (OpenRouter, AiHubMix)
is_local: bool = False # local deployment (vLLM, Ollama)
detect_by_key_prefix: str = "" # match api_key prefix, e.g. "sk-or-"
detect_by_base_keyword: str = "" # match substring in api_base URL
default_api_base: str = "" # fallback base URL
# gateway behavior
strip_model_prefix: bool = False # strip "provider/" before re-prefixing
# per-model param overrides, e.g. (("kimi-k2.5", {"temperature": 1.0}),)
model_overrides: tuple[tuple[str, dict[str, Any]], ...] = ()
@property
def label(self) -> str:
return self.display_name or self.name.title()
# ---------------------------------------------------------------------------
# PROVIDERS — the registry. Order = priority. Copy any entry as template.
# ---------------------------------------------------------------------------
PROVIDERS: tuple[ProviderSpec, ...] = (
# === Gateways (detected by api_key / api_base, not model name) =========
# Gateways can route any model, so they win in fallback.
# OpenRouter: global gateway, keys start with "sk-or-"
ProviderSpec(
name="openrouter",
keywords=("openrouter",),
env_key="OPENROUTER_API_KEY",
display_name="OpenRouter",
litellm_prefix="openrouter", # claude-3 → openrouter/claude-3
skip_prefixes=(),
env_extras=(),
is_gateway=True,
is_local=False,
detect_by_key_prefix="sk-or-",
detect_by_base_keyword="openrouter",
default_api_base="https://openrouter.ai/api/v1",
strip_model_prefix=False,
model_overrides=(),
),
# AiHubMix: global gateway, OpenAI-compatible interface.
# strip_model_prefix=True: it doesn't understand "anthropic/claude-3",
# so we strip to bare "claude-3" then re-prefix as "openai/claude-3".
ProviderSpec(
name="aihubmix",
keywords=("aihubmix",),
env_key="OPENAI_API_KEY", # OpenAI-compatible
display_name="AiHubMix",
litellm_prefix="openai", # → openai/{model}
skip_prefixes=(),
env_extras=(),
is_gateway=True,
is_local=False,
detect_by_key_prefix="",
detect_by_base_keyword="aihubmix",
default_api_base="https://aihubmix.com/v1",
strip_model_prefix=True, # anthropic/claude-3 → claude-3 → openai/claude-3
model_overrides=(),
),
# === Standard providers (matched by model-name keywords) ===============
# Anthropic: LiteLLM recognizes "claude-*" natively, no prefix needed.
ProviderSpec(
name="anthropic",
keywords=("anthropic", "claude"),
env_key="ANTHROPIC_API_KEY",
display_name="Anthropic",
litellm_prefix="",
skip_prefixes=(),
env_extras=(),
is_gateway=False,
is_local=False,
detect_by_key_prefix="",
detect_by_base_keyword="",
default_api_base="",
strip_model_prefix=False,
model_overrides=(),
),
# OpenAI: LiteLLM recognizes "gpt-*" natively, no prefix needed.
ProviderSpec(
name="openai",
keywords=("openai", "gpt"),
env_key="OPENAI_API_KEY",
display_name="OpenAI",
litellm_prefix="",
skip_prefixes=(),
env_extras=(),
is_gateway=False,
is_local=False,
detect_by_key_prefix="",
detect_by_base_keyword="",
default_api_base="",
strip_model_prefix=False,
model_overrides=(),
),
# DeepSeek: needs "deepseek/" prefix for LiteLLM routing.
ProviderSpec(
name="deepseek",
keywords=("deepseek",),
env_key="DEEPSEEK_API_KEY",
display_name="DeepSeek",
litellm_prefix="deepseek", # deepseek-chat → deepseek/deepseek-chat
skip_prefixes=("deepseek/",), # avoid double-prefix
env_extras=(),
is_gateway=False,
is_local=False,
detect_by_key_prefix="",
detect_by_base_keyword="",
default_api_base="",
strip_model_prefix=False,
model_overrides=(),
),
# Gemini: needs "gemini/" prefix for LiteLLM.
ProviderSpec(
name="gemini",
keywords=("gemini",),
env_key="GEMINI_API_KEY",
display_name="Gemini",
litellm_prefix="gemini", # gemini-pro → gemini/gemini-pro
skip_prefixes=("gemini/",), # avoid double-prefix
env_extras=(),
is_gateway=False,
is_local=False,
detect_by_key_prefix="",
detect_by_base_keyword="",
default_api_base="",
strip_model_prefix=False,
model_overrides=(),
),
# Zhipu: LiteLLM uses "zai/" prefix.
# Also mirrors key to ZHIPUAI_API_KEY (some LiteLLM paths check that).
# skip_prefixes: don't add "zai/" when already routed via gateway.
ProviderSpec(
name="zhipu",
keywords=("zhipu", "glm", "zai"),
env_key="ZAI_API_KEY",
display_name="Zhipu AI",
litellm_prefix="zai", # glm-4 → zai/glm-4
skip_prefixes=("zhipu/", "zai/", "openrouter/", "hosted_vllm/"),
env_extras=(
("ZHIPUAI_API_KEY", "{api_key}"),
),
is_gateway=False,
is_local=False,
detect_by_key_prefix="",
detect_by_base_keyword="",
default_api_base="",
strip_model_prefix=False,
model_overrides=(),
),
# DashScope: Qwen models, needs "dashscope/" prefix.
ProviderSpec(
name="dashscope",
keywords=("qwen", "dashscope"),
env_key="DASHSCOPE_API_KEY",
display_name="DashScope",
litellm_prefix="dashscope", # qwen-max → dashscope/qwen-max
skip_prefixes=("dashscope/", "openrouter/"),
env_extras=(),
is_gateway=False,
is_local=False,
detect_by_key_prefix="",
detect_by_base_keyword="",
default_api_base="",
strip_model_prefix=False,
model_overrides=(),
),
# Moonshot: Kimi models, needs "moonshot/" prefix.
# LiteLLM requires MOONSHOT_API_BASE env var to find the endpoint.
# Kimi K2.5 API enforces temperature >= 1.0.
ProviderSpec(
name="moonshot",
keywords=("moonshot", "kimi"),
env_key="MOONSHOT_API_KEY",
display_name="Moonshot",
litellm_prefix="moonshot", # kimi-k2.5 → moonshot/kimi-k2.5
skip_prefixes=("moonshot/", "openrouter/"),
env_extras=(
("MOONSHOT_API_BASE", "{api_base}"),
),
is_gateway=False,
is_local=False,
detect_by_key_prefix="",
detect_by_base_keyword="",
default_api_base="https://api.moonshot.ai/v1", # intl; use api.moonshot.cn for China
strip_model_prefix=False,
model_overrides=(
("kimi-k2.5", {"temperature": 1.0}),
),
),
# MiniMax: needs "minimax/" prefix for LiteLLM routing.
# Uses OpenAI-compatible API at api.minimax.io/v1.
ProviderSpec(
name="minimax",
keywords=("minimax",),
env_key="MINIMAX_API_KEY",
display_name="MiniMax",
litellm_prefix="minimax", # MiniMax-M2.1 → minimax/MiniMax-M2.1
skip_prefixes=("minimax/", "openrouter/"),
env_extras=(),
is_gateway=False,
is_local=False,
detect_by_key_prefix="",
detect_by_base_keyword="",
default_api_base="https://api.minimax.io/v1",
strip_model_prefix=False,
model_overrides=(),
),
# === Local deployment (matched by config key, NOT by api_base) =========
# vLLM / any OpenAI-compatible local server.
# Detected when config key is "vllm" (provider_name="vllm").
ProviderSpec(
name="vllm",
keywords=("vllm",),
env_key="HOSTED_VLLM_API_KEY",
display_name="vLLM/Local",
litellm_prefix="hosted_vllm", # Llama-3-8B → hosted_vllm/Llama-3-8B
skip_prefixes=(),
env_extras=(),
is_gateway=False,
is_local=True,
detect_by_key_prefix="",
detect_by_base_keyword="",
default_api_base="", # user must provide in config
strip_model_prefix=False,
model_overrides=(),
),
# === Auxiliary (not a primary LLM provider) ============================
# Groq: mainly used for Whisper voice transcription, also usable for LLM.
# Needs "groq/" prefix for LiteLLM routing. Placed last — it rarely wins fallback.
ProviderSpec(
name="groq",
keywords=("groq",),
env_key="GROQ_API_KEY",
display_name="Groq",
litellm_prefix="groq", # llama3-8b-8192 → groq/llama3-8b-8192
skip_prefixes=("groq/",), # avoid double-prefix
env_extras=(),
is_gateway=False,
is_local=False,
detect_by_key_prefix="",
detect_by_base_keyword="",
default_api_base="",
strip_model_prefix=False,
model_overrides=(),
),
)
# ---------------------------------------------------------------------------
# Lookup helpers
# ---------------------------------------------------------------------------
def find_by_model(model: str) -> ProviderSpec | None:
"""Match a standard provider by model-name keyword (case-insensitive).
Skips gateways/local those are matched by api_key/api_base instead."""
model_lower = model.lower()
for spec in PROVIDERS:
if spec.is_gateway or spec.is_local:
continue
if any(kw in model_lower for kw in spec.keywords):
return spec
return None
def find_gateway(
provider_name: str | None = None,
api_key: str | None = None,
api_base: str | None = None,
) -> ProviderSpec | None:
"""Detect gateway/local provider.
Priority:
1. provider_name if it maps to a gateway/local spec, use it directly.
2. api_key prefix e.g. "sk-or-" OpenRouter.
3. api_base keyword e.g. "aihubmix" in URL AiHubMix.
A standard provider with a custom api_base (e.g. DeepSeek behind a proxy)
will NOT be mistaken for vLLM the old fallback is gone.
"""
# 1. Direct match by config key
if provider_name:
spec = find_by_name(provider_name)
if spec and (spec.is_gateway or spec.is_local):
return spec
# 2. Auto-detect by api_key prefix / api_base keyword
for spec in PROVIDERS:
if spec.detect_by_key_prefix and api_key and api_key.startswith(spec.detect_by_key_prefix):
return spec
if spec.detect_by_base_keyword and api_base and spec.detect_by_base_keyword in api_base:
return spec
return None
def find_by_name(name: str) -> ProviderSpec | None:
"""Find a provider spec by config field name, e.g. "dashscope"."""
for spec in PROVIDERS:
if spec.name == name:
return spec
return None

View File

@ -1,6 +1,6 @@
[project] [project]
name = "nanobot-ai" name = "nanobot-ai"
version = "0.1.3.post4" version = "0.1.3.post6"
description = "A lightweight personal AI assistant framework" description = "A lightweight personal AI assistant framework"
requires-python = ">=3.11" requires-python = ">=3.11"
license = {text = "MIT"} license = {text = "MIT"}
@ -23,13 +23,20 @@ dependencies = [
"pydantic-settings>=2.0.0", "pydantic-settings>=2.0.0",
"websockets>=12.0", "websockets>=12.0",
"websocket-client>=1.6.0", "websocket-client>=1.6.0",
"httpx>=0.25.0", "httpx[socks]>=0.25.0",
"loguru>=0.7.0", "loguru>=0.7.0",
"readability-lxml>=0.8.0", "readability-lxml>=0.8.0",
"rich>=13.0.0", "rich>=13.0.0",
"croniter>=2.0.0", "croniter>=2.0.0",
"python-telegram-bot>=21.0", "dingtalk-stream>=0.4.0",
"python-telegram-bot[socks]>=21.0",
"lark-oapi>=1.0.0", "lark-oapi>=1.0.0",
"socksio>=1.0.0",
"python-socketio>=5.11.0",
"msgpack>=1.0.8",
"slack-sdk>=3.26.0",
"qq-botpy>=1.0.0",
"python-socks[asyncio]>=2.4.0",
] ]
[project.optional-dependencies] [project.optional-dependencies]

311
tests/test_email_channel.py Normal file
View File

@ -0,0 +1,311 @@
from email.message import EmailMessage
from datetime import date
import pytest
from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.channels.email import EmailChannel
from nanobot.config.schema import EmailConfig
def _make_config() -> EmailConfig:
return EmailConfig(
enabled=True,
consent_granted=True,
imap_host="imap.example.com",
imap_port=993,
imap_username="bot@example.com",
imap_password="secret",
smtp_host="smtp.example.com",
smtp_port=587,
smtp_username="bot@example.com",
smtp_password="secret",
mark_seen=True,
)
def _make_raw_email(
from_addr: str = "alice@example.com",
subject: str = "Hello",
body: str = "This is the body.",
) -> bytes:
msg = EmailMessage()
msg["From"] = from_addr
msg["To"] = "bot@example.com"
msg["Subject"] = subject
msg["Message-ID"] = "<m1@example.com>"
msg.set_content(body)
return msg.as_bytes()
def test_fetch_new_messages_parses_unseen_and_marks_seen(monkeypatch) -> None:
raw = _make_raw_email(subject="Invoice", body="Please pay")
class FakeIMAP:
def __init__(self) -> None:
self.store_calls: list[tuple[bytes, str, str]] = []
def login(self, _user: str, _pw: str):
return "OK", [b"logged in"]
def select(self, _mailbox: str):
return "OK", [b"1"]
def search(self, *_args):
return "OK", [b"1"]
def fetch(self, _imap_id: bytes, _parts: str):
return "OK", [(b"1 (UID 123 BODY[] {200})", raw), b")"]
def store(self, imap_id: bytes, op: str, flags: str):
self.store_calls.append((imap_id, op, flags))
return "OK", [b""]
def logout(self):
return "BYE", [b""]
fake = FakeIMAP()
monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: fake)
channel = EmailChannel(_make_config(), MessageBus())
items = channel._fetch_new_messages()
assert len(items) == 1
assert items[0]["sender"] == "alice@example.com"
assert items[0]["subject"] == "Invoice"
assert "Please pay" in items[0]["content"]
assert fake.store_calls == [(b"1", "+FLAGS", "\\Seen")]
# Same UID should be deduped in-process.
items_again = channel._fetch_new_messages()
assert items_again == []
def test_extract_text_body_falls_back_to_html() -> None:
msg = EmailMessage()
msg["From"] = "alice@example.com"
msg["To"] = "bot@example.com"
msg["Subject"] = "HTML only"
msg.add_alternative("<p>Hello<br>world</p>", subtype="html")
text = EmailChannel._extract_text_body(msg)
assert "Hello" in text
assert "world" in text
@pytest.mark.asyncio
async def test_start_returns_immediately_without_consent(monkeypatch) -> None:
cfg = _make_config()
cfg.consent_granted = False
channel = EmailChannel(cfg, MessageBus())
called = {"fetch": False}
def _fake_fetch():
called["fetch"] = True
return []
monkeypatch.setattr(channel, "_fetch_new_messages", _fake_fetch)
await channel.start()
assert channel.is_running is False
assert called["fetch"] is False
@pytest.mark.asyncio
async def test_send_uses_smtp_and_reply_subject(monkeypatch) -> None:
class FakeSMTP:
def __init__(self, _host: str, _port: int, timeout: int = 30) -> None:
self.timeout = timeout
self.started_tls = False
self.logged_in = False
self.sent_messages: list[EmailMessage] = []
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
def starttls(self, context=None):
self.started_tls = True
def login(self, _user: str, _pw: str):
self.logged_in = True
def send_message(self, msg: EmailMessage):
self.sent_messages.append(msg)
fake_instances: list[FakeSMTP] = []
def _smtp_factory(host: str, port: int, timeout: int = 30):
instance = FakeSMTP(host, port, timeout=timeout)
fake_instances.append(instance)
return instance
monkeypatch.setattr("nanobot.channels.email.smtplib.SMTP", _smtp_factory)
channel = EmailChannel(_make_config(), MessageBus())
channel._last_subject_by_chat["alice@example.com"] = "Invoice #42"
channel._last_message_id_by_chat["alice@example.com"] = "<m1@example.com>"
await channel.send(
OutboundMessage(
channel="email",
chat_id="alice@example.com",
content="Acknowledged.",
)
)
assert len(fake_instances) == 1
smtp = fake_instances[0]
assert smtp.started_tls is True
assert smtp.logged_in is True
assert len(smtp.sent_messages) == 1
sent = smtp.sent_messages[0]
assert sent["Subject"] == "Re: Invoice #42"
assert sent["To"] == "alice@example.com"
assert sent["In-Reply-To"] == "<m1@example.com>"
@pytest.mark.asyncio
async def test_send_skips_when_auto_reply_disabled(monkeypatch) -> None:
class FakeSMTP:
def __init__(self, _host: str, _port: int, timeout: int = 30) -> None:
self.sent_messages: list[EmailMessage] = []
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
def starttls(self, context=None):
return None
def login(self, _user: str, _pw: str):
return None
def send_message(self, msg: EmailMessage):
self.sent_messages.append(msg)
fake_instances: list[FakeSMTP] = []
def _smtp_factory(host: str, port: int, timeout: int = 30):
instance = FakeSMTP(host, port, timeout=timeout)
fake_instances.append(instance)
return instance
monkeypatch.setattr("nanobot.channels.email.smtplib.SMTP", _smtp_factory)
cfg = _make_config()
cfg.auto_reply_enabled = False
channel = EmailChannel(cfg, MessageBus())
await channel.send(
OutboundMessage(
channel="email",
chat_id="alice@example.com",
content="Should not send.",
)
)
assert fake_instances == []
await channel.send(
OutboundMessage(
channel="email",
chat_id="alice@example.com",
content="Force send.",
metadata={"force_send": True},
)
)
assert len(fake_instances) == 1
assert len(fake_instances[0].sent_messages) == 1
@pytest.mark.asyncio
async def test_send_skips_when_consent_not_granted(monkeypatch) -> None:
class FakeSMTP:
def __init__(self, _host: str, _port: int, timeout: int = 30) -> None:
self.sent_messages: list[EmailMessage] = []
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
def starttls(self, context=None):
return None
def login(self, _user: str, _pw: str):
return None
def send_message(self, msg: EmailMessage):
self.sent_messages.append(msg)
called = {"smtp": False}
def _smtp_factory(host: str, port: int, timeout: int = 30):
called["smtp"] = True
return FakeSMTP(host, port, timeout=timeout)
monkeypatch.setattr("nanobot.channels.email.smtplib.SMTP", _smtp_factory)
cfg = _make_config()
cfg.consent_granted = False
channel = EmailChannel(cfg, MessageBus())
await channel.send(
OutboundMessage(
channel="email",
chat_id="alice@example.com",
content="Should not send.",
metadata={"force_send": True},
)
)
assert called["smtp"] is False
def test_fetch_messages_between_dates_uses_imap_since_before_without_mark_seen(monkeypatch) -> None:
raw = _make_raw_email(subject="Status", body="Yesterday update")
class FakeIMAP:
def __init__(self) -> None:
self.search_args = None
self.store_calls: list[tuple[bytes, str, str]] = []
def login(self, _user: str, _pw: str):
return "OK", [b"logged in"]
def select(self, _mailbox: str):
return "OK", [b"1"]
def search(self, *_args):
self.search_args = _args
return "OK", [b"5"]
def fetch(self, _imap_id: bytes, _parts: str):
return "OK", [(b"5 (UID 999 BODY[] {200})", raw), b")"]
def store(self, imap_id: bytes, op: str, flags: str):
self.store_calls.append((imap_id, op, flags))
return "OK", [b""]
def logout(self):
return "BYE", [b""]
fake = FakeIMAP()
monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: fake)
channel = EmailChannel(_make_config(), MessageBus())
items = channel.fetch_messages_between_dates(
start_date=date(2026, 2, 6),
end_date=date(2026, 2, 7),
limit=10,
)
assert len(items) == 1
assert items[0]["subject"] == "Status"
# search(None, "SINCE", "06-Feb-2026", "BEFORE", "07-Feb-2026")
assert fake.search_args is not None
assert fake.search_args[1:] == ("SINCE", "06-Feb-2026", "BEFORE", "07-Feb-2026")
assert fake.store_calls == []