resolve merge conflicts: keep all channels and add slack

Re-bin 2026-02-09 11:17:07 +00:00
commit 74e3c411a1
29 changed files with 3346 additions and 257 deletions

7
.gitignore vendored
View File

@ -12,4 +12,9 @@ docs/
*.pyw *.pyw
*.pyz *.pyz
*.pywz *.pywz
*.pyzz *.pyzz
.venv/
__pycache__/
poetry.lock
.pytest_cache/
tests/

312
README.md
View File

@ -16,19 +16,27 @@
⚡️ Delivers core agent functionality in just **~4,000** lines of code — **99% smaller** than Clawdbot's 430k+ lines. ⚡️ Delivers core agent functionality in just **~4,000** lines of code — **99% smaller** than Clawdbot's 430k+ lines.
📏 Real-time line count: **3,479 lines** (run `bash core_agent_lines.sh` to verify anytime)
## 📢 News ## 📢 News
- **2026-02-01** 🎉 nanobot launched! Welcome to try 🐈 nanobot! - **2026-02-08** 🔧 Refactored Providers—adding a new LLM provider now takes just 2 simple steps! Check [here](#providers).
- **2026-02-07** 🚀 Released v0.1.3.post5 with Qwen support & several key improvements! Check [here](https://github.com/HKUDS/nanobot/releases/tag/v0.1.3.post5) for details.
- **2026-02-06** ✨ Added Moonshot/Kimi provider, Discord integration, and enhanced security hardening!
- **2026-02-05** ✨ Added Feishu channel, DeepSeek provider, and enhanced scheduled tasks support!
- **2026-02-04** 🚀 Released v0.1.3.post4 with multi-provider & Docker support! Check [here](https://github.com/HKUDS/nanobot/releases/tag/v0.1.3.post4) for details.
- **2026-02-03** ⚡ Integrated vLLM for local LLM support and improved natural language task scheduling!
- **2026-02-02** 🎉 nanobot officially launched! Welcome to try 🐈 nanobot!
## Key Features of nanobot: ## Key Features of nanobot:
🪶 **Ultra-Lightweight**: Just ~4,000 lines of code — 99% smaller than Clawdbot - core functionality. 🪶 **Ultra-Lightweight**: Just ~4,000 lines of core agent code — 99% smaller than Clawdbot.
🔬 **Research-Ready**: Clean, readable code that's easy to understand, modify, and extend for research. 🔬 **Research-Ready**: Clean, readable code that's easy to understand, modify, and extend for research.
⚡️ **Lightning Fast**: Minimal footprint means faster startup, lower resource usage, and quicker iterations. ⚡️ **Lightning Fast**: Minimal footprint means faster startup, lower resource usage, and quicker iterations.
💎 **Easy-to-Use**: One-click to depoly and you're ready to go. 💎 **Easy-to-Use**: One-click to deploy and you're ready to go.
## 🏗️ Architecture ## 🏗️ Architecture
@ -85,8 +93,7 @@ pip install nanobot-ai
> [!TIP] > [!TIP]
> Set your API key in `~/.nanobot/config.json`. > Set your API key in `~/.nanobot/config.json`.
> Get API keys: [OpenRouter](https://openrouter.ai/keys) (LLM) · [Brave Search](https://brave.com/search/api/) (optional, for web search) > Get API keys: [OpenRouter](https://openrouter.ai/keys) (Global) · [DashScope](https://dashscope.console.aliyun.com) (Qwen) · [Brave Search](https://brave.com/search/api/) (optional, for web search)
> You can also change the model to `minimax/minimax-m2` for lower cost.
**1. Initialize** **1. Initialize**
@ -96,6 +103,7 @@ nanobot onboard
**2. Configure** (`~/.nanobot/config.json`) **2. Configure** (`~/.nanobot/config.json`)
For OpenRouter - recommended for global users:
```json ```json
{ {
"providers": { "providers": {
@ -107,18 +115,10 @@ nanobot onboard
"defaults": { "defaults": {
"model": "anthropic/claude-opus-4-5" "model": "anthropic/claude-opus-4-5"
} }
},
"tools": {
"web": {
"search": {
"apiKey": "BSA-xxx"
}
}
} }
} }
``` ```
**3. Chat** **3. Chat**
```bash ```bash
@ -166,12 +166,16 @@ nanobot agent -m "Hello from my local LLM!"
## 💬 Chat Apps ## 💬 Chat Apps
Talk to your nanobot through Telegram or WhatsApp — anytime, anywhere. Talk to your nanobot through Telegram, Discord, WhatsApp, Feishu, DingTalk, or Email — anytime, anywhere.
| Channel | Setup | | Channel | Setup |
|---------|-------| |---------|-------|
| **Telegram** | Easy (just a token) | | **Telegram** | Easy (just a token) |
| **Discord** | Easy (bot token + intents) |
| **WhatsApp** | Medium (scan QR) | | **WhatsApp** | Medium (scan QR) |
| **Feishu** | Medium (app credentials) |
| **DingTalk** | Medium (app credentials) |
| **Email** | Medium (IMAP/SMTP credentials) |
<details> <details>
<summary><b>Telegram</b> (Recommended)</summary> <summary><b>Telegram</b> (Recommended)</summary>
@ -205,6 +209,50 @@ nanobot gateway
</details> </details>
<details>
<summary><b>Discord</b></summary>
**1. Create a bot**
- Go to https://discord.com/developers/applications
- Create an application → Bot → Add Bot
- Copy the bot token
**2. Enable intents**
- In the Bot settings, enable **MESSAGE CONTENT INTENT**
- (Optional) Enable **SERVER MEMBERS INTENT** if you plan to use allow lists based on member data
**3. Get your User ID**
- Discord Settings → Advanced → enable **Developer Mode**
- Right-click your avatar → **Copy User ID**
**4. Configure**
```json
{
"channels": {
"discord": {
"enabled": true,
"token": "YOUR_BOT_TOKEN",
"allowFrom": ["YOUR_USER_ID"]
}
}
}
```
**5. Invite the bot**
- OAuth2 → URL Generator
- Scopes: `bot`
- Bot Permissions: `Send Messages`, `Read Message History`
- Open the generated invite URL and add the bot to your server
**6. Run**
```bash
nanobot gateway
```
</details>
<details> <details>
<summary><b>WhatsApp</b></summary> <summary><b>WhatsApp</b></summary>
@ -242,64 +290,218 @@ nanobot gateway
</details> </details>
<details>
<summary><b>Feishu (飞书)</b></summary>
Uses **WebSocket** long connection — no public IP required.
**1. Create a Feishu bot**
- Visit [Feishu Open Platform](https://open.feishu.cn/app)
- Create a new app → Enable **Bot** capability
- **Permissions**: Add `im:message` (send messages)
- **Events**: Add `im.message.receive_v1` (receive messages)
- Select **Long Connection** mode (requires running nanobot first to establish connection)
- Get **App ID** and **App Secret** from "Credentials & Basic Info"
- Publish the app
**2. Configure**
```json
{
"channels": {
"feishu": {
"enabled": true,
"appId": "cli_xxx",
"appSecret": "xxx",
"encryptKey": "",
"verificationToken": "",
"allowFrom": []
}
}
}
```
> `encryptKey` and `verificationToken` are optional for Long Connection mode.
> `allowFrom`: Leave empty to allow all users, or add `["ou_xxx"]` to restrict access.
**3. Run**
```bash
nanobot gateway
```
> [!TIP]
> Feishu uses WebSocket to receive messages — no webhook or public IP needed!
</details>
<details>
<summary><b>DingTalk (钉钉)</b></summary>
Uses **Stream Mode** — no public IP required.
**1. Create a DingTalk bot**
- Visit [DingTalk Open Platform](https://open-dev.dingtalk.com/)
- Create a new app → Add **Robot** capability
- **Configuration**:
- Toggle **Stream Mode** ON
- **Permissions**: Add necessary permissions for sending messages
- Get **AppKey** (Client ID) and **AppSecret** (Client Secret) from "Credentials"
- Publish the app
**2. Configure**
```json
{
"channels": {
"dingtalk": {
"enabled": true,
"clientId": "YOUR_APP_KEY",
"clientSecret": "YOUR_APP_SECRET",
"allowFrom": []
}
}
}
```
> `allowFrom`: Leave empty to allow all users, or add `["staffId"]` to restrict access.
**3. Run**
```bash
nanobot gateway
```
</details>
<details>
<summary><b>Email</b></summary>
Uses **IMAP** polling for inbound + **SMTP** for outbound. Requires explicit consent before accessing mailbox data.
**1. Get credentials (Gmail example)**
- Enable 2-Step Verification in Google account security
- Create an [App Password](https://myaccount.google.com/apppasswords)
- Use this app password for both IMAP and SMTP
**2. Configure**
> [!TIP]
> Set `"autoReplyEnabled": false` if you only want to read/analyze emails without sending automatic replies.
```json
{
"channels": {
"email": {
"enabled": true,
"consentGranted": true,
"imapHost": "imap.gmail.com",
"imapPort": 993,
"imapUsername": "you@gmail.com",
"imapPassword": "your-app-password",
"imapUseSsl": true,
"smtpHost": "smtp.gmail.com",
"smtpPort": 587,
"smtpUsername": "you@gmail.com",
"smtpPassword": "your-app-password",
"smtpUseTls": true,
"fromAddress": "you@gmail.com",
"allowFrom": ["trusted@example.com"]
}
}
}
```
> `consentGranted`: Must be `true` to allow mailbox access. Set to `false` to disable reading and sending entirely.
> `allowFrom`: Leave empty to accept emails from anyone, or restrict to specific sender addresses.
**3. Run**
```bash
nanobot gateway
```
</details>
## ⚙️ Configuration ## ⚙️ Configuration
Config file: `~/.nanobot/config.json` Config file: `~/.nanobot/config.json`
### Providers ### Providers
> [!NOTE] > [!TIP]
> Groq provides free voice transcription via Whisper. If configured, Telegram voice messages will be automatically transcribed. > - **Groq** provides free voice transcription via Whisper. If configured, Telegram voice messages will be automatically transcribed.
> - **Zhipu Coding Plan**: If you're on Zhipu's coding plan, set `"apiBase": "https://open.bigmodel.cn/api/coding/paas/v4"` in your zhipu provider config.
| Provider | Purpose | Get API Key | | Provider | Purpose | Get API Key |
|----------|---------|-------------| |----------|---------|-------------|
| `openrouter` | LLM (recommended, access to all models) | [openrouter.ai](https://openrouter.ai) | | `openrouter` | LLM (recommended, access to all models) | [openrouter.ai](https://openrouter.ai) |
| `anthropic` | LLM (Claude direct) | [console.anthropic.com](https://console.anthropic.com) | | `anthropic` | LLM (Claude direct) | [console.anthropic.com](https://console.anthropic.com) |
| `openai` | LLM (GPT direct) | [platform.openai.com](https://platform.openai.com) | | `openai` | LLM (GPT direct) | [platform.openai.com](https://platform.openai.com) |
| `deepseek` | LLM (DeepSeek direct) | [platform.deepseek.com](https://platform.deepseek.com) |
| `groq` | LLM + **Voice transcription** (Whisper) | [console.groq.com](https://console.groq.com) | | `groq` | LLM + **Voice transcription** (Whisper) | [console.groq.com](https://console.groq.com) |
| `gemini` | LLM (Gemini direct) | [aistudio.google.com](https://aistudio.google.com) | | `gemini` | LLM (Gemini direct) | [aistudio.google.com](https://aistudio.google.com) |
| `aihubmix` | LLM (API gateway, access to all models) | [aihubmix.com](https://aihubmix.com) |
| `dashscope` | LLM (Qwen) | [dashscope.console.aliyun.com](https://dashscope.console.aliyun.com) |
| `moonshot` | LLM (Moonshot/Kimi) | [platform.moonshot.cn](https://platform.moonshot.cn) |
| `zhipu` | LLM (Zhipu GLM) | [open.bigmodel.cn](https://open.bigmodel.cn) |
| `vllm` | LLM (local, any OpenAI-compatible server) | — |
<details> <details>
<summary><b>Full config example</b></summary> <summary><b>Adding a New Provider (Developer Guide)</b></summary>
```json nanobot uses a **Provider Registry** (`nanobot/providers/registry.py`) as the single source of truth.
{ Adding a new provider only takes **2 steps** — no if-elif chains to touch.
"agents": {
"defaults": { **Step 1.** Add a `ProviderSpec` entry to `PROVIDERS` in `nanobot/providers/registry.py`:
"model": "anthropic/claude-opus-4-5"
} ```python
}, ProviderSpec(
"providers": { name="myprovider", # config field name
"openrouter": { keywords=("myprovider", "mymodel"), # model-name keywords for auto-matching
"apiKey": "sk-or-v1-xxx" env_key="MYPROVIDER_API_KEY", # env var for LiteLLM
}, display_name="My Provider", # shown in `nanobot status`
"groq": { litellm_prefix="myprovider", # auto-prefix: model → myprovider/model
"apiKey": "gsk_xxx" skip_prefixes=("myprovider/",), # don't double-prefix
} )
},
"channels": {
"telegram": {
"enabled": true,
"token": "123456:ABC...",
"allowFrom": ["123456789"]
},
"whatsapp": {
"enabled": false
}
},
"tools": {
"web": {
"search": {
"apiKey": "BSA..."
}
}
}
}
``` ```
**Step 2.** Add a field to `ProvidersConfig` in `nanobot/config/schema.py`:
```python
class ProvidersConfig(BaseModel):
...
myprovider: ProviderConfig = ProviderConfig()
```
That's it! Environment variables, model prefixing, config matching, and `nanobot status` display will all work automatically.
**Common `ProviderSpec` options** (combined in the sketch after this table):
| Field | Description | Example |
|-------|-------------|---------|
| `litellm_prefix` | Auto-prefix model names for LiteLLM | `"dashscope"` → `dashscope/qwen-max` |
| `skip_prefixes` | Don't prefix if model already starts with these | `("dashscope/", "openrouter/")` |
| `env_extras` | Additional env vars to set | `(("ZHIPUAI_API_KEY", "{api_key}"),)` |
| `model_overrides` | Per-model parameter overrides | `(("kimi-k2.5", {"temperature": 1.0}),)` |
| `is_gateway` | Can route any model (like OpenRouter) | `True` |
| `detect_by_key_prefix` | Detect gateway by API key prefix | `"sk-or-"` |
| `detect_by_base_keyword` | Detect gateway by API base URL | `"openrouter"` |
| `strip_model_prefix` | Strip existing prefix before re-prefixing | `True` (for AiHubMix) |
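Several of these options can be combined in a single entry. A gateway-style spec might look roughly like this (a sketch with made-up values, not an actual registry entry):

```python
# Hypothetical gateway-style ProviderSpec; name, key prefix, and URL keyword are illustrative.
ProviderSpec(
    name="mygateway",
    keywords=("mygateway",),
    env_key="MYGATEWAY_API_KEY",
    display_name="My Gateway",
    is_gateway=True,                      # can route any model, like OpenRouter
    detect_by_key_prefix="sk-mg-",        # detect this gateway by API key prefix (hypothetical)
    detect_by_base_keyword="mygateway",   # detect this gateway by API base URL (hypothetical)
)
```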
</details> </details>
### Security
> For production deployments, set `"restrictToWorkspace": true` in your config to sandbox the agent. A sketch of how this check works follows the table below.
| Option | Default | Description |
|--------|---------|-------------|
| `tools.restrictToWorkspace` | `false` | When `true`, restricts **all** agent tools (shell, file read/write/edit, list) to the workspace directory. Prevents path traversal and out-of-scope access. |
| `channels.*.allowFrom` | `[]` (allow all) | Whitelist of user IDs. Empty = allow everyone; non-empty = only listed users can interact. |
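Under the hood, the file tools resolve every requested path and refuse anything outside the workspace. A minimal sketch of that kind of check (this commit adds a `_resolve_path` helper for the file tools, shown later in this diff; the version here is only illustrative):

```python
from pathlib import Path

def resolve_in_workspace(path: str, workspace: Path) -> Path:
    """Resolve a user-supplied path and refuse anything outside the workspace (sketch)."""
    resolved = Path(path).expanduser().resolve()
    if not resolved.is_relative_to(workspace.resolve()):
        raise PermissionError(f"{path} is outside the workspace {workspace}")
    return resolved
```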
## CLI Reference ## CLI Reference
| Command | Description | | Command | Description |
@ -307,11 +509,15 @@ Config file: `~/.nanobot/config.json`
| `nanobot onboard` | Initialize config & workspace | | `nanobot onboard` | Initialize config & workspace |
| `nanobot agent -m "..."` | Chat with the agent | | `nanobot agent -m "..."` | Chat with the agent |
| `nanobot agent` | Interactive chat mode | | `nanobot agent` | Interactive chat mode |
| `nanobot agent --no-markdown` | Show plain-text replies |
| `nanobot agent --logs` | Show runtime logs during chat |
| `nanobot gateway` | Start the gateway | | `nanobot gateway` | Start the gateway |
| `nanobot status` | Show status | | `nanobot status` | Show status |
| `nanobot channels login` | Link WhatsApp (scan QR) | | `nanobot channels login` | Link WhatsApp (scan QR) |
| `nanobot channels status` | Show channel status | | `nanobot channels status` | Show channel status |
Interactive mode exits: `exit`, `quit`, `/exit`, `/quit`, `:q`, or `Ctrl+D`.
<details> <details>
<summary><b>Scheduled Tasks (Cron)</b></summary> <summary><b>Scheduled Tasks (Cron)</b></summary>
@ -386,13 +592,13 @@ PRs welcome! The codebase is intentionally small and readable. 🤗
- [ ] **Multi-modal** — See and hear (images, voice, video) - [ ] **Multi-modal** — See and hear (images, voice, video)
- [ ] **Long-term memory** — Never forget important context - [ ] **Long-term memory** — Never forget important context
- [ ] **Better reasoning** — Multi-step planning and reflection - [ ] **Better reasoning** — Multi-step planning and reflection
- [ ] **More integrations**Discord, Slack, email, calendar - [ ] **More integrations** — Slack, calendar, and more
- [ ] **Self-improvement** — Learn from feedback and mistakes - [ ] **Self-improvement** — Learn from feedback and mistakes
### Contributors ### Contributors
<a href="https://github.com/HKUDS/nanobot/graphs/contributors"> <a href="https://github.com/HKUDS/nanobot/graphs/contributors">
<img src="https://contrib.rocks/image?repo=HKUDS/nanobot" /> <img src="https://contrib.rocks/image?repo=HKUDS/nanobot&max=100&columns=12" />
</a> </a>

264
SECURITY.md Normal file
View File

@ -0,0 +1,264 @@
# Security Policy
## Reporting a Vulnerability
If you discover a security vulnerability in nanobot, please report it by:
1. **DO NOT** open a public GitHub issue
2. Create a private security advisory on GitHub or contact the repository maintainers
3. Include:
- Description of the vulnerability
- Steps to reproduce
- Potential impact
- Suggested fix (if any)
We aim to respond to security reports within 48 hours.
## Security Best Practices
### 1. API Key Management
**CRITICAL**: Never commit API keys to version control.
```bash
# ✅ Good: Store in config file with restricted permissions
chmod 600 ~/.nanobot/config.json
# ❌ Bad: Hardcoding keys in code or committing them
```
**Recommendations:**
- Store API keys in `~/.nanobot/config.json` with file permissions set to `0600` (a permission check is sketched after this list)
- Consider using environment variables for sensitive keys
- Use OS keyring/credential manager for production deployments
- Rotate API keys regularly
- Use separate API keys for development and production
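A quick way to verify the `0600` recommendation on Unix-like systems (a standalone sketch, not part of nanobot):

```python
import stat
from pathlib import Path

CONFIG_PATH = Path.home() / ".nanobot" / "config.json"

def config_permissions_ok(path: Path = CONFIG_PATH) -> bool:
    """Warn if group or other users can access the config file (sketch)."""
    mode = stat.S_IMODE(path.stat().st_mode)
    if mode & (stat.S_IRWXG | stat.S_IRWXO):
        print(f"WARNING: {path} has mode {oct(mode)}; tighten it with: chmod 600 {path}")
        return False
    return True
```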
### 2. Channel Access Control
**IMPORTANT**: Always configure `allowFrom` lists for production use.
```json
{
"channels": {
"telegram": {
"enabled": true,
"token": "YOUR_BOT_TOKEN",
"allowFrom": ["123456789", "987654321"]
},
"whatsapp": {
"enabled": true,
"allowFrom": ["+1234567890"]
}
}
}
```
**Security Notes:**
- Empty `allowFrom` list will **ALLOW ALL** users (open by default for personal use)
- Get your Telegram user ID from `@userinfobot`
- Use full phone numbers with country code for WhatsApp
- Review access logs regularly for unauthorized access attempts
### 3. Shell Command Execution
The `exec` tool can execute shell commands. While dangerous command patterns are blocked, you should:
- ✅ Review all tool usage in agent logs
- ✅ Understand what commands the agent is running
- ✅ Use a dedicated user account with limited privileges
- ✅ Never run nanobot as root
- ❌ Don't disable security checks
- ❌ Don't run on systems with sensitive data without careful review
**Blocked patterns** (a detection sketch follows this list):
- `rm -rf /` - Root filesystem deletion
- Fork bombs
- Filesystem formatting (`mkfs.*`)
- Raw disk writes
- Other destructive operations
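A minimal sketch of this kind of pattern check (the real blocklist lives in the exec tool and may differ):

```python
import re

# Illustrative patterns only -- not nanobot's actual blocklist.
BLOCKED_PATTERNS = [
    r"rm\s+-rf\s+/(\s|$)",          # root filesystem deletion
    r":\(\)\s*\{\s*:\|:&\s*\};:",   # classic bash fork bomb
    r"\bmkfs\.\w+",                 # filesystem formatting
    r"\bdd\s+.*\bof=/dev/sd\w",     # raw disk writes
]

def is_blocked(command: str) -> bool:
    """Return True if the command matches a known destructive pattern."""
    return any(re.search(p, command) for p in BLOCKED_PATTERNS)
```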
### 4. File System Access
File operations have path traversal protection, but:
- ✅ Run nanobot with a dedicated user account
- ✅ Use filesystem permissions to protect sensitive directories
- ✅ Regularly audit file operations in logs
- ❌ Don't give unrestricted access to sensitive files
### 5. Network Security
**API Calls:**
- All external API calls use HTTPS by default
- Timeouts are configured to prevent hanging requests
- Consider using a firewall to restrict outbound connections if needed
**WhatsApp Bridge:**
- The bridge runs on `localhost:3001` by default
- If exposing it to the network, use proper authentication and TLS
- Keep authentication data in `~/.nanobot/whatsapp-auth` secure (mode 0700)
### 6. Dependency Security
**Critical**: Keep dependencies updated!
```bash
# Check for vulnerable dependencies
pip install pip-audit
pip-audit
# Update to latest secure versions
pip install --upgrade nanobot-ai
```
For Node.js dependencies (WhatsApp bridge):
```bash
cd bridge
npm audit
npm audit fix
```
**Important Notes:**
- Keep `litellm` updated to the latest version for security fixes
- We've updated `ws` to `>=8.17.1` to fix a DoS vulnerability
- Run `pip-audit` or `npm audit` regularly
- Subscribe to security advisories for nanobot and its dependencies
### 7. Production Deployment
For production use:
1. **Isolate the Environment**
```bash
# Run in a container or VM
docker run --rm -it python:3.11 bash
pip install nanobot-ai
```
2. **Use a Dedicated User**
```bash
sudo useradd -m -s /bin/bash nanobot
sudo -u nanobot nanobot gateway
```
3. **Set Proper Permissions**
```bash
chmod 700 ~/.nanobot
chmod 600 ~/.nanobot/config.json
chmod 700 ~/.nanobot/whatsapp-auth
```
4. **Enable Logging**
```bash
# Configure log monitoring
tail -f ~/.nanobot/logs/nanobot.log
```
5. **Use Rate Limiting**
- Configure rate limits on your API providers
- Monitor usage for anomalies
- Set spending limits on LLM APIs
6. **Regular Updates**
```bash
# Check for updates weekly
pip install --upgrade nanobot-ai
```
### 8. Development vs Production
**Development:**
- Use separate API keys
- Test with non-sensitive data
- Enable verbose logging
- Use a test Telegram bot
**Production:**
- Use dedicated API keys with spending limits
- Restrict file system access
- Enable audit logging
- Regular security reviews
- Monitor for unusual activity
### 9. Data Privacy
- **Logs may contain sensitive information** - secure log files appropriately
- **LLM providers see your prompts** - review their privacy policies
- **Chat history is stored locally** - protect the `~/.nanobot` directory
- **API keys are in plain text** - use OS keyring for production
### 10. Incident Response
If you suspect a security breach:
1. **Immediately revoke compromised API keys**
2. **Review logs for unauthorized access**
```bash
grep "Access denied" ~/.nanobot/logs/nanobot.log
```
3. **Check for unexpected file modifications**
4. **Rotate all credentials**
5. **Update to latest version**
6. **Report the incident** to maintainers
## Security Features
### Built-in Security Controls
✅ **Input Validation**
- Path traversal protection on file operations
- Dangerous command pattern detection
- Input length limits on HTTP requests
✅ **Authentication**
- Allow-list based access control
- Failed authentication attempt logging
- Open by default (configure allowFrom for production use)
✅ **Resource Protection**
- Command execution timeouts (60s default; sketched at the end of this section)
- Output truncation (10KB limit)
- HTTP request timeouts (10-30s)
✅ **Secure Communication**
- HTTPS for all external API calls
- TLS for Telegram API
- WebSocket security for WhatsApp bridge
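The exec limits above can be pictured with a short sketch (illustrative only; the real logic lives in the exec tool):

```python
import asyncio

# Defaults mirroring the limits listed above.
EXEC_TIMEOUT_S = 60
MAX_OUTPUT_BYTES = 10 * 1024  # 10KB

async def run_with_limits(cmd: str) -> str:
    """Run a shell command with a timeout and truncate oversized output (sketch)."""
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT,
    )
    try:
        out, _ = await asyncio.wait_for(proc.communicate(), timeout=EXEC_TIMEOUT_S)
    except asyncio.TimeoutError:
        proc.kill()
        return "Error: command timed out"
    if len(out) > MAX_OUTPUT_BYTES:
        out = out[:MAX_OUTPUT_BYTES] + b"\n... (output truncated)"
    return out.decode(errors="replace")
```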
## Known Limitations
⚠️ **Current Security Limitations:**
1. **No Rate Limiting** - Users can send unlimited messages (add your own if needed)
2. **Plain Text Config** - API keys stored in plain text (use keyring for production)
3. **No Session Management** - No automatic session expiry
4. **Limited Command Filtering** - Only blocks obviously dangerous patterns
5. **No Audit Trail** - Limited security event logging (enhance as needed)
## Security Checklist
Before deploying nanobot:
- [ ] API keys stored securely (not in code)
- [ ] Config file permissions set to 0600
- [ ] `allowFrom` lists configured for all channels
- [ ] Running as non-root user
- [ ] File system permissions properly restricted
- [ ] Dependencies updated to latest secure versions
- [ ] Logs monitored for security events
- [ ] Rate limits configured on API providers
- [ ] Backup and disaster recovery plan in place
- [ ] Security review of custom skills/tools
## Updates
**Last Updated**: 2026-02-03
For the latest security updates and announcements, check:
- GitHub Security Advisories: https://github.com/HKUDS/nanobot/security/advisories
- Release Notes: https://github.com/HKUDS/nanobot/releases
## License
See LICENSE file for details.

View File

@ -11,7 +11,7 @@
}, },
"dependencies": { "dependencies": {
"@whiskeysockets/baileys": "7.0.0-rc.9", "@whiskeysockets/baileys": "7.0.0-rc.9",
"ws": "^8.17.0", "ws": "^8.17.1",
"qrcode-terminal": "^0.12.0", "qrcode-terminal": "^0.12.0",
"pino": "^9.0.0" "pino": "^9.0.0"
}, },

View File

@ -20,6 +20,7 @@ const VERSION = '0.1.0';
export interface InboundMessage { export interface InboundMessage {
id: string; id: string;
sender: string; sender: string;
pn: string;
content: string; content: string;
timestamp: number; timestamp: number;
isGroup: boolean; isGroup: boolean;
@ -123,6 +124,7 @@ export class WhatsAppClient {
this.options.onMessage({ this.options.onMessage({
id: msg.key.id || '', id: msg.key.id || '',
sender: msg.key.remoteJid || '', sender: msg.key.remoteJid || '',
pn: msg.key.remoteJidAlt || '',
content, content,
timestamp: msg.messageTimestamp as number, timestamp: msg.messageTimestamp as number,
isGroup, isGroup,

21
core_agent_lines.sh Executable file
View File

@ -0,0 +1,21 @@
#!/bin/bash
# Count core agent lines (excluding channels/, cli/, providers/ adapters)
cd "$(dirname "$0")" || exit 1
echo "nanobot core agent line count"
echo "================================"
echo ""
for dir in agent agent/tools bus config cron heartbeat session utils; do
count=$(find "nanobot/$dir" -maxdepth 1 -name "*.py" -exec cat {} + | wc -l)
printf " %-16s %5s lines\n" "$dir/" "$count"
done
root=$(cat nanobot/__init__.py nanobot/__main__.py | wc -l)
printf " %-16s %5s lines\n" "(root)" "$root"
echo ""
total=$(find nanobot -name "*.py" ! -path "*/channels/*" ! -path "*/cli/*" ! -path "*/providers/*" | xargs cat | wc -l)
echo " Core total: $total lines"
echo ""
echo " (excludes: channels/, cli/, providers/)"

View File

@ -2,6 +2,7 @@
import base64 import base64
import mimetypes import mimetypes
import platform
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
@ -74,6 +75,8 @@ Skills with available="false" need dependencies installed first - you can try in
from datetime import datetime from datetime import datetime
now = datetime.now().strftime("%Y-%m-%d %H:%M (%A)") now = datetime.now().strftime("%Y-%m-%d %H:%M (%A)")
workspace_path = str(self.workspace.expanduser().resolve()) workspace_path = str(self.workspace.expanduser().resolve())
system = platform.system()
runtime = f"{'macOS' if system == 'Darwin' else system} {platform.machine()}, Python {platform.python_version()}"
return f"""# nanobot 🐈 return f"""# nanobot 🐈
@ -87,6 +90,9 @@ You are nanobot, a helpful AI assistant. You have access to tools that allow you
## Current Time ## Current Time
{now} {now}
## Runtime
{runtime}
## Workspace ## Workspace
Your workspace is at: {workspace_path} Your workspace is at: {workspace_path}
- Memory files: {workspace_path}/memory/MEMORY.md - Memory files: {workspace_path}/memory/MEMORY.md
@ -118,6 +124,8 @@ When remembering something, write to {workspace_path}/memory/MEMORY.md"""
current_message: str, current_message: str,
skill_names: list[str] | None = None, skill_names: list[str] | None = None,
media: list[str] | None = None, media: list[str] | None = None,
channel: str | None = None,
chat_id: str | None = None,
) -> list[dict[str, Any]]: ) -> list[dict[str, Any]]:
""" """
Build the complete message list for an LLM call. Build the complete message list for an LLM call.
@ -127,6 +135,8 @@ When remembering something, write to {workspace_path}/memory/MEMORY.md"""
current_message: The new user message. current_message: The new user message.
skill_names: Optional skills to include. skill_names: Optional skills to include.
media: Optional list of local file paths for images/media. media: Optional list of local file paths for images/media.
channel: Current channel (telegram, feishu, etc.).
chat_id: Current chat/user ID.
Returns: Returns:
List of messages including system prompt. List of messages including system prompt.
@ -135,6 +145,8 @@ When remembering something, write to {workspace_path}/memory/MEMORY.md"""
# System prompt # System prompt
system_prompt = self.build_system_prompt(skill_names) system_prompt = self.build_system_prompt(skill_names)
if channel and chat_id:
system_prompt += f"\n\n## Current Session\nChannel: {channel}\nChat ID: {chat_id}"
messages.append({"role": "system", "content": system_prompt}) messages.append({"role": "system", "content": system_prompt})
# History # History
@ -195,7 +207,8 @@ When remembering something, write to {workspace_path}/memory/MEMORY.md"""
self, self,
messages: list[dict[str, Any]], messages: list[dict[str, Any]],
content: str | None, content: str | None,
tool_calls: list[dict[str, Any]] | None = None tool_calls: list[dict[str, Any]] | None = None,
reasoning_content: str | None = None,
) -> list[dict[str, Any]]: ) -> list[dict[str, Any]]:
""" """
Add an assistant message to the message list. Add an assistant message to the message list.
@ -204,6 +217,7 @@ When remembering something, write to {workspace_path}/memory/MEMORY.md"""
messages: Current message list. messages: Current message list.
content: Message content. content: Message content.
tool_calls: Optional tool calls. tool_calls: Optional tool calls.
reasoning_content: Thinking output (Kimi, DeepSeek-R1, etc.).
Returns: Returns:
Updated message list. Updated message list.
@ -213,5 +227,9 @@ When remembering something, write to {workspace_path}/memory/MEMORY.md"""
if tool_calls: if tool_calls:
msg["tool_calls"] = tool_calls msg["tool_calls"] = tool_calls
# Thinking models reject history without this
if reasoning_content:
msg["reasoning_content"] = reasoning_content
messages.append(msg) messages.append(msg)
return messages return messages

View File

@ -17,6 +17,7 @@ from nanobot.agent.tools.shell import ExecTool
from nanobot.agent.tools.web import WebSearchTool, WebFetchTool from nanobot.agent.tools.web import WebSearchTool, WebFetchTool
from nanobot.agent.tools.message import MessageTool from nanobot.agent.tools.message import MessageTool
from nanobot.agent.tools.spawn import SpawnTool from nanobot.agent.tools.spawn import SpawnTool
from nanobot.agent.tools.cron import CronTool
from nanobot.agent.subagent import SubagentManager from nanobot.agent.subagent import SubagentManager
from nanobot.session.manager import SessionManager from nanobot.session.manager import SessionManager
@ -42,8 +43,12 @@ class AgentLoop:
max_iterations: int = 20, max_iterations: int = 20,
brave_api_key: str | None = None, brave_api_key: str | None = None,
exec_config: "ExecToolConfig | None" = None, exec_config: "ExecToolConfig | None" = None,
cron_service: "CronService | None" = None,
restrict_to_workspace: bool = False,
session_manager: SessionManager | None = None,
): ):
from nanobot.config.schema import ExecToolConfig from nanobot.config.schema import ExecToolConfig
from nanobot.cron.service import CronService
self.bus = bus self.bus = bus
self.provider = provider self.provider = provider
self.workspace = workspace self.workspace = workspace
@ -51,9 +56,11 @@ class AgentLoop:
self.max_iterations = max_iterations self.max_iterations = max_iterations
self.brave_api_key = brave_api_key self.brave_api_key = brave_api_key
self.exec_config = exec_config or ExecToolConfig() self.exec_config = exec_config or ExecToolConfig()
self.cron_service = cron_service
self.restrict_to_workspace = restrict_to_workspace
self.context = ContextBuilder(workspace) self.context = ContextBuilder(workspace)
self.sessions = SessionManager(workspace) self.sessions = session_manager or SessionManager(workspace)
self.tools = ToolRegistry() self.tools = ToolRegistry()
self.subagents = SubagentManager( self.subagents = SubagentManager(
provider=provider, provider=provider,
@ -62,6 +69,7 @@ class AgentLoop:
model=self.model, model=self.model,
brave_api_key=brave_api_key, brave_api_key=brave_api_key,
exec_config=self.exec_config, exec_config=self.exec_config,
restrict_to_workspace=restrict_to_workspace,
) )
self._running = False self._running = False
@ -69,17 +77,18 @@ class AgentLoop:
def _register_default_tools(self) -> None: def _register_default_tools(self) -> None:
"""Register the default set of tools.""" """Register the default set of tools."""
# File tools # File tools (restrict to workspace if configured)
self.tools.register(ReadFileTool()) allowed_dir = self.workspace if self.restrict_to_workspace else None
self.tools.register(WriteFileTool()) self.tools.register(ReadFileTool(allowed_dir=allowed_dir))
self.tools.register(EditFileTool()) self.tools.register(WriteFileTool(allowed_dir=allowed_dir))
self.tools.register(ListDirTool()) self.tools.register(EditFileTool(allowed_dir=allowed_dir))
self.tools.register(ListDirTool(allowed_dir=allowed_dir))
# Shell tool # Shell tool
self.tools.register(ExecTool( self.tools.register(ExecTool(
working_dir=str(self.workspace), working_dir=str(self.workspace),
timeout=self.exec_config.timeout, timeout=self.exec_config.timeout,
restrict_to_workspace=self.exec_config.restrict_to_workspace, restrict_to_workspace=self.restrict_to_workspace,
)) ))
# Web tools # Web tools
@ -93,6 +102,10 @@ class AgentLoop:
# Spawn tool (for subagents) # Spawn tool (for subagents)
spawn_tool = SpawnTool(manager=self.subagents) spawn_tool = SpawnTool(manager=self.subagents)
self.tools.register(spawn_tool) self.tools.register(spawn_tool)
# Cron tool (for scheduling)
if self.cron_service:
self.tools.register(CronTool(self.cron_service))
async def run(self) -> None: async def run(self) -> None:
"""Run the agent loop, processing messages from the bus.""" """Run the agent loop, processing messages from the bus."""
@ -143,7 +156,8 @@ class AgentLoop:
if msg.channel == "system": if msg.channel == "system":
return await self._process_system_message(msg) return await self._process_system_message(msg)
logger.info(f"Processing message from {msg.channel}:{msg.sender_id}") preview = msg.content[:80] + "..." if len(msg.content) > 80 else msg.content
logger.info(f"Processing message from {msg.channel}:{msg.sender_id}: {preview}")
# Get or create session # Get or create session
session = self.sessions.get_or_create(msg.session_key) session = self.sessions.get_or_create(msg.session_key)
@ -157,11 +171,17 @@ class AgentLoop:
if isinstance(spawn_tool, SpawnTool): if isinstance(spawn_tool, SpawnTool):
spawn_tool.set_context(msg.channel, msg.chat_id) spawn_tool.set_context(msg.channel, msg.chat_id)
cron_tool = self.tools.get("cron")
if isinstance(cron_tool, CronTool):
cron_tool.set_context(msg.channel, msg.chat_id)
# Build initial messages (use get_history for LLM-formatted messages) # Build initial messages (use get_history for LLM-formatted messages)
messages = self.context.build_messages( messages = self.context.build_messages(
history=session.get_history(), history=session.get_history(),
current_message=msg.content, current_message=msg.content,
media=msg.media if msg.media else None, media=msg.media if msg.media else None,
channel=msg.channel,
chat_id=msg.chat_id,
) )
# Agent loop # Agent loop
@ -193,13 +213,14 @@ class AgentLoop:
for tc in response.tool_calls for tc in response.tool_calls
] ]
messages = self.context.add_assistant_message( messages = self.context.add_assistant_message(
messages, response.content, tool_call_dicts messages, response.content, tool_call_dicts,
reasoning_content=response.reasoning_content,
) )
# Execute tools # Execute tools
for tool_call in response.tool_calls: for tool_call in response.tool_calls:
args_str = json.dumps(tool_call.arguments) args_str = json.dumps(tool_call.arguments, ensure_ascii=False)
logger.debug(f"Executing tool: {tool_call.name} with arguments: {args_str}") logger.info(f"Tool call: {tool_call.name}({args_str[:200]})")
result = await self.tools.execute(tool_call.name, tool_call.arguments) result = await self.tools.execute(tool_call.name, tool_call.arguments)
messages = self.context.add_tool_result( messages = self.context.add_tool_result(
messages, tool_call.id, tool_call.name, result messages, tool_call.id, tool_call.name, result
@ -212,6 +233,10 @@ class AgentLoop:
if final_content is None: if final_content is None:
final_content = "I've completed processing but have no response to give." final_content = "I've completed processing but have no response to give."
# Log response preview
preview = final_content[:120] + "..." if len(final_content) > 120 else final_content
logger.info(f"Response to {msg.channel}:{msg.sender_id}: {preview}")
# Save to session # Save to session
session.add_message("user", msg.content) session.add_message("user", msg.content)
session.add_message("assistant", final_content) session.add_message("assistant", final_content)
@ -256,10 +281,16 @@ class AgentLoop:
if isinstance(spawn_tool, SpawnTool): if isinstance(spawn_tool, SpawnTool):
spawn_tool.set_context(origin_channel, origin_chat_id) spawn_tool.set_context(origin_channel, origin_chat_id)
cron_tool = self.tools.get("cron")
if isinstance(cron_tool, CronTool):
cron_tool.set_context(origin_channel, origin_chat_id)
# Build messages with the announce content # Build messages with the announce content
messages = self.context.build_messages( messages = self.context.build_messages(
history=session.get_history(), history=session.get_history(),
current_message=msg.content current_message=msg.content,
channel=origin_channel,
chat_id=origin_chat_id,
) )
# Agent loop (limited for announce handling) # Agent loop (limited for announce handling)
@ -288,12 +319,13 @@ class AgentLoop:
for tc in response.tool_calls for tc in response.tool_calls
] ]
messages = self.context.add_assistant_message( messages = self.context.add_assistant_message(
messages, response.content, tool_call_dicts messages, response.content, tool_call_dicts,
reasoning_content=response.reasoning_content,
) )
for tool_call in response.tool_calls: for tool_call in response.tool_calls:
args_str = json.dumps(tool_call.arguments) args_str = json.dumps(tool_call.arguments, ensure_ascii=False)
logger.debug(f"Executing tool: {tool_call.name} with arguments: {args_str}") logger.info(f"Tool call: {tool_call.name}({args_str[:200]})")
result = await self.tools.execute(tool_call.name, tool_call.arguments) result = await self.tools.execute(tool_call.name, tool_call.arguments)
messages = self.context.add_tool_result( messages = self.context.add_tool_result(
messages, tool_call.id, tool_call.name, result messages, tool_call.id, tool_call.name, result
@ -316,21 +348,29 @@ class AgentLoop:
content=final_content content=final_content
) )
async def process_direct(self, content: str, session_key: str = "cli:direct") -> str: async def process_direct(
self,
content: str,
session_key: str = "cli:direct",
channel: str = "cli",
chat_id: str = "direct",
) -> str:
""" """
Process a message directly (for CLI usage). Process a message directly (for CLI or cron usage).
Args: Args:
content: The message content. content: The message content.
session_key: Session identifier. session_key: Session identifier.
channel: Source channel (for context).
chat_id: Source chat ID (for context).
Returns: Returns:
The agent's response. The agent's response.
""" """
msg = InboundMessage( msg = InboundMessage(
channel="cli", channel=channel,
sender_id="user", sender_id="user",
chat_id="direct", chat_id=chat_id,
content=content content=content
) )

View File

@ -34,6 +34,7 @@ class SubagentManager:
model: str | None = None, model: str | None = None,
brave_api_key: str | None = None, brave_api_key: str | None = None,
exec_config: "ExecToolConfig | None" = None, exec_config: "ExecToolConfig | None" = None,
restrict_to_workspace: bool = False,
): ):
from nanobot.config.schema import ExecToolConfig from nanobot.config.schema import ExecToolConfig
self.provider = provider self.provider = provider
@ -42,6 +43,7 @@ class SubagentManager:
self.model = model or provider.get_default_model() self.model = model or provider.get_default_model()
self.brave_api_key = brave_api_key self.brave_api_key = brave_api_key
self.exec_config = exec_config or ExecToolConfig() self.exec_config = exec_config or ExecToolConfig()
self.restrict_to_workspace = restrict_to_workspace
self._running_tasks: dict[str, asyncio.Task[None]] = {} self._running_tasks: dict[str, asyncio.Task[None]] = {}
async def spawn( async def spawn(
@ -96,13 +98,14 @@ class SubagentManager:
try: try:
# Build subagent tools (no message tool, no spawn tool) # Build subagent tools (no message tool, no spawn tool)
tools = ToolRegistry() tools = ToolRegistry()
tools.register(ReadFileTool()) allowed_dir = self.workspace if self.restrict_to_workspace else None
tools.register(WriteFileTool()) tools.register(ReadFileTool(allowed_dir=allowed_dir))
tools.register(ListDirTool()) tools.register(WriteFileTool(allowed_dir=allowed_dir))
tools.register(ListDirTool(allowed_dir=allowed_dir))
tools.register(ExecTool( tools.register(ExecTool(
working_dir=str(self.workspace), working_dir=str(self.workspace),
timeout=self.exec_config.timeout, timeout=self.exec_config.timeout,
restrict_to_workspace=self.exec_config.restrict_to_workspace, restrict_to_workspace=self.restrict_to_workspace,
)) ))
tools.register(WebSearchTool(api_key=self.brave_api_key)) tools.register(WebSearchTool(api_key=self.brave_api_key))
tools.register(WebFetchTool()) tools.register(WebFetchTool())
@ -149,7 +152,8 @@ class SubagentManager:
# Execute tools # Execute tools
for tool_call in response.tool_calls: for tool_call in response.tool_calls:
logger.debug(f"Subagent [{task_id}] executing: {tool_call.name}") args_str = json.dumps(tool_call.arguments)
logger.debug(f"Subagent [{task_id}] executing: {tool_call.name} with arguments: {args_str}")
result = await tools.execute(tool_call.name, tool_call.arguments) result = await tools.execute(tool_call.name, tool_call.arguments)
messages.append({ messages.append({
"role": "tool", "role": "tool",

114
nanobot/agent/tools/cron.py Normal file
View File

@ -0,0 +1,114 @@
"""Cron tool for scheduling reminders and tasks."""
from typing import Any
from nanobot.agent.tools.base import Tool
from nanobot.cron.service import CronService
from nanobot.cron.types import CronSchedule
class CronTool(Tool):
"""Tool to schedule reminders and recurring tasks."""
def __init__(self, cron_service: CronService):
self._cron = cron_service
self._channel = ""
self._chat_id = ""
def set_context(self, channel: str, chat_id: str) -> None:
"""Set the current session context for delivery."""
self._channel = channel
self._chat_id = chat_id
@property
def name(self) -> str:
return "cron"
@property
def description(self) -> str:
return "Schedule reminders and recurring tasks. Actions: add, list, remove."
@property
def parameters(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["add", "list", "remove"],
"description": "Action to perform"
},
"message": {
"type": "string",
"description": "Reminder message (for add)"
},
"every_seconds": {
"type": "integer",
"description": "Interval in seconds (for recurring tasks)"
},
"cron_expr": {
"type": "string",
"description": "Cron expression like '0 9 * * *' (for scheduled tasks)"
},
"job_id": {
"type": "string",
"description": "Job ID (for remove)"
}
},
"required": ["action"]
}
async def execute(
self,
action: str,
message: str = "",
every_seconds: int | None = None,
cron_expr: str | None = None,
job_id: str | None = None,
**kwargs: Any
) -> str:
if action == "add":
return self._add_job(message, every_seconds, cron_expr)
elif action == "list":
return self._list_jobs()
elif action == "remove":
return self._remove_job(job_id)
return f"Unknown action: {action}"
def _add_job(self, message: str, every_seconds: int | None, cron_expr: str | None) -> str:
if not message:
return "Error: message is required for add"
if not self._channel or not self._chat_id:
return "Error: no session context (channel/chat_id)"
# Build schedule
if every_seconds:
schedule = CronSchedule(kind="every", every_ms=every_seconds * 1000)
elif cron_expr:
schedule = CronSchedule(kind="cron", expr=cron_expr)
else:
return "Error: either every_seconds or cron_expr is required"
job = self._cron.add_job(
name=message[:30],
schedule=schedule,
message=message,
deliver=True,
channel=self._channel,
to=self._chat_id,
)
return f"Created job '{job.name}' (id: {job.id})"
def _list_jobs(self) -> str:
jobs = self._cron.list_jobs()
if not jobs:
return "No scheduled jobs."
lines = [f"- {j.name} (id: {j.id}, {j.schedule.kind})" for j in jobs]
return "Scheduled jobs:\n" + "\n".join(lines)
def _remove_job(self, job_id: str | None) -> str:
if not job_id:
return "Error: job_id is required for remove"
if self._cron.remove_job(job_id):
return f"Removed job {job_id}"
return f"Job {job_id} not found"

View File

@ -6,9 +6,20 @@ from typing import Any
from nanobot.agent.tools.base import Tool from nanobot.agent.tools.base import Tool
def _resolve_path(path: str, allowed_dir: Path | None = None) -> Path:
"""Resolve path and optionally enforce directory restriction."""
resolved = Path(path).expanduser().resolve()
if allowed_dir and not str(resolved).startswith(str(allowed_dir.resolve())):
raise PermissionError(f"Path {path} is outside allowed directory {allowed_dir}")
return resolved
class ReadFileTool(Tool): class ReadFileTool(Tool):
"""Tool to read file contents.""" """Tool to read file contents."""
def __init__(self, allowed_dir: Path | None = None):
self._allowed_dir = allowed_dir
@property @property
def name(self) -> str: def name(self) -> str:
return "read_file" return "read_file"
@ -32,7 +43,7 @@ class ReadFileTool(Tool):
async def execute(self, path: str, **kwargs: Any) -> str: async def execute(self, path: str, **kwargs: Any) -> str:
try: try:
file_path = Path(path).expanduser() file_path = _resolve_path(path, self._allowed_dir)
if not file_path.exists(): if not file_path.exists():
return f"Error: File not found: {path}" return f"Error: File not found: {path}"
if not file_path.is_file(): if not file_path.is_file():
@ -40,8 +51,8 @@ class ReadFileTool(Tool):
content = file_path.read_text(encoding="utf-8") content = file_path.read_text(encoding="utf-8")
return content return content
except PermissionError: except PermissionError as e:
return f"Error: Permission denied: {path}" return f"Error: {e}"
except Exception as e: except Exception as e:
return f"Error reading file: {str(e)}" return f"Error reading file: {str(e)}"
@ -49,6 +60,9 @@ class ReadFileTool(Tool):
class WriteFileTool(Tool): class WriteFileTool(Tool):
"""Tool to write content to a file.""" """Tool to write content to a file."""
def __init__(self, allowed_dir: Path | None = None):
self._allowed_dir = allowed_dir
@property @property
def name(self) -> str: def name(self) -> str:
return "write_file" return "write_file"
@ -76,12 +90,12 @@ class WriteFileTool(Tool):
async def execute(self, path: str, content: str, **kwargs: Any) -> str: async def execute(self, path: str, content: str, **kwargs: Any) -> str:
try: try:
file_path = Path(path).expanduser() file_path = _resolve_path(path, self._allowed_dir)
file_path.parent.mkdir(parents=True, exist_ok=True) file_path.parent.mkdir(parents=True, exist_ok=True)
file_path.write_text(content, encoding="utf-8") file_path.write_text(content, encoding="utf-8")
return f"Successfully wrote {len(content)} bytes to {path}" return f"Successfully wrote {len(content)} bytes to {path}"
except PermissionError: except PermissionError as e:
return f"Error: Permission denied: {path}" return f"Error: {e}"
except Exception as e: except Exception as e:
return f"Error writing file: {str(e)}" return f"Error writing file: {str(e)}"
@ -89,6 +103,9 @@ class WriteFileTool(Tool):
class EditFileTool(Tool): class EditFileTool(Tool):
"""Tool to edit a file by replacing text.""" """Tool to edit a file by replacing text."""
def __init__(self, allowed_dir: Path | None = None):
self._allowed_dir = allowed_dir
@property @property
def name(self) -> str: def name(self) -> str:
return "edit_file" return "edit_file"
@ -120,7 +137,7 @@ class EditFileTool(Tool):
async def execute(self, path: str, old_text: str, new_text: str, **kwargs: Any) -> str: async def execute(self, path: str, old_text: str, new_text: str, **kwargs: Any) -> str:
try: try:
file_path = Path(path).expanduser() file_path = _resolve_path(path, self._allowed_dir)
if not file_path.exists(): if not file_path.exists():
return f"Error: File not found: {path}" return f"Error: File not found: {path}"
@ -138,8 +155,8 @@ class EditFileTool(Tool):
file_path.write_text(new_content, encoding="utf-8") file_path.write_text(new_content, encoding="utf-8")
return f"Successfully edited {path}" return f"Successfully edited {path}"
except PermissionError: except PermissionError as e:
return f"Error: Permission denied: {path}" return f"Error: {e}"
except Exception as e: except Exception as e:
return f"Error editing file: {str(e)}" return f"Error editing file: {str(e)}"
@ -147,6 +164,9 @@ class EditFileTool(Tool):
class ListDirTool(Tool): class ListDirTool(Tool):
"""Tool to list directory contents.""" """Tool to list directory contents."""
def __init__(self, allowed_dir: Path | None = None):
self._allowed_dir = allowed_dir
@property @property
def name(self) -> str: def name(self) -> str:
return "list_dir" return "list_dir"
@ -170,7 +190,7 @@ class ListDirTool(Tool):
async def execute(self, path: str, **kwargs: Any) -> str: async def execute(self, path: str, **kwargs: Any) -> str:
try: try:
dir_path = Path(path).expanduser() dir_path = _resolve_path(path, self._allowed_dir)
if not dir_path.exists(): if not dir_path.exists():
return f"Error: Directory not found: {path}" return f"Error: Directory not found: {path}"
if not dir_path.is_dir(): if not dir_path.is_dir():
@ -185,7 +205,7 @@ class ListDirTool(Tool):
return f"Directory {path} is empty" return f"Directory {path} is empty"
return "\n".join(items) return "\n".join(items)
except PermissionError: except PermissionError as e:
return f"Error: Permission denied: {path}" return f"Error: {e}"
except Exception as e: except Exception as e:
return f"Error listing directory: {str(e)}" return f"Error listing directory: {str(e)}"

View File

@ -3,6 +3,8 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import Any from typing import Any
from loguru import logger
from nanobot.bus.events import InboundMessage, OutboundMessage from nanobot.bus.events import InboundMessage, OutboundMessage
from nanobot.bus.queue import MessageBus from nanobot.bus.queue import MessageBus
@ -102,6 +104,10 @@ class BaseChannel(ABC):
metadata: Optional channel-specific metadata. metadata: Optional channel-specific metadata.
""" """
if not self.is_allowed(sender_id): if not self.is_allowed(sender_id):
logger.warning(
f"Access denied for sender {sender_id} on channel {self.name}. "
f"Add them to allowFrom list in config to grant access."
)
return return
msg = InboundMessage( msg = InboundMessage(

View File

@ -0,0 +1,238 @@
"""DingTalk/DingDing channel implementation using Stream Mode."""
import asyncio
import json
import time
from typing import Any
from loguru import logger
import httpx
from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.channels.base import BaseChannel
from nanobot.config.schema import DingTalkConfig
try:
from dingtalk_stream import (
DingTalkStreamClient,
Credential,
CallbackHandler,
CallbackMessage,
AckMessage,
)
from dingtalk_stream.chatbot import ChatbotMessage
DINGTALK_AVAILABLE = True
except ImportError:
DINGTALK_AVAILABLE = False
# Fallback so class definitions don't crash at module level
CallbackHandler = object # type: ignore[assignment,misc]
CallbackMessage = None # type: ignore[assignment,misc]
AckMessage = None # type: ignore[assignment,misc]
ChatbotMessage = None # type: ignore[assignment,misc]
class NanobotDingTalkHandler(CallbackHandler):
"""
Standard DingTalk Stream SDK Callback Handler.
Parses incoming messages and forwards them to the Nanobot channel.
"""
def __init__(self, channel: "DingTalkChannel"):
super().__init__()
self.channel = channel
async def process(self, message: CallbackMessage):
"""Process incoming stream message."""
try:
# Parse using SDK's ChatbotMessage for robust handling
chatbot_msg = ChatbotMessage.from_dict(message.data)
# Extract text content; fall back to raw dict if SDK object is empty
content = ""
if chatbot_msg.text:
content = chatbot_msg.text.content.strip()
if not content:
content = message.data.get("text", {}).get("content", "").strip()
if not content:
logger.warning(
f"Received empty or unsupported message type: {chatbot_msg.message_type}"
)
return AckMessage.STATUS_OK, "OK"
sender_id = chatbot_msg.sender_staff_id or chatbot_msg.sender_id
sender_name = chatbot_msg.sender_nick or "Unknown"
logger.info(f"Received DingTalk message from {sender_name} ({sender_id}): {content}")
# Forward to Nanobot via _on_message (non-blocking).
# Store reference to prevent GC before task completes.
task = asyncio.create_task(
self.channel._on_message(content, sender_id, sender_name)
)
self.channel._background_tasks.add(task)
task.add_done_callback(self.channel._background_tasks.discard)
return AckMessage.STATUS_OK, "OK"
except Exception as e:
logger.error(f"Error processing DingTalk message: {e}")
# Return OK to avoid retry loop from DingTalk server
return AckMessage.STATUS_OK, "Error"
class DingTalkChannel(BaseChannel):
"""
DingTalk channel using Stream Mode.
Uses WebSocket to receive events via `dingtalk-stream` SDK.
Uses direct HTTP API to send messages (SDK is mainly for receiving).
Note: Currently only supports private (1:1) chat. Group messages are
received but replies are sent back as private messages to the sender.
"""
name = "dingtalk"
def __init__(self, config: DingTalkConfig, bus: MessageBus):
super().__init__(config, bus)
self.config: DingTalkConfig = config
self._client: Any = None
self._http: httpx.AsyncClient | None = None
# Access Token management for sending messages
self._access_token: str | None = None
self._token_expiry: float = 0
# Hold references to background tasks to prevent GC
self._background_tasks: set[asyncio.Task] = set()
async def start(self) -> None:
"""Start the DingTalk bot with Stream Mode."""
try:
if not DINGTALK_AVAILABLE:
logger.error(
"DingTalk Stream SDK not installed. Run: pip install dingtalk-stream"
)
return
if not self.config.client_id or not self.config.client_secret:
logger.error("DingTalk client_id and client_secret not configured")
return
self._running = True
self._http = httpx.AsyncClient()
logger.info(
f"Initializing DingTalk Stream Client with Client ID: {self.config.client_id}..."
)
credential = Credential(self.config.client_id, self.config.client_secret)
self._client = DingTalkStreamClient(credential)
# Register standard handler
handler = NanobotDingTalkHandler(self)
self._client.register_callback_handler(ChatbotMessage.TOPIC, handler)
logger.info("DingTalk bot started with Stream Mode")
# client.start() is an async infinite loop handling the websocket connection
await self._client.start()
except Exception as e:
logger.exception(f"Failed to start DingTalk channel: {e}")
async def stop(self) -> None:
"""Stop the DingTalk bot."""
self._running = False
# Close the shared HTTP client
if self._http:
await self._http.aclose()
self._http = None
# Cancel outstanding background tasks
for task in self._background_tasks:
task.cancel()
self._background_tasks.clear()
async def _get_access_token(self) -> str | None:
"""Get or refresh Access Token."""
if self._access_token and time.time() < self._token_expiry:
return self._access_token
url = "https://api.dingtalk.com/v1.0/oauth2/accessToken"
data = {
"appKey": self.config.client_id,
"appSecret": self.config.client_secret,
}
if not self._http:
logger.warning("DingTalk HTTP client not initialized, cannot refresh token")
return None
try:
resp = await self._http.post(url, json=data)
resp.raise_for_status()
res_data = resp.json()
self._access_token = res_data.get("accessToken")
# Expire 60s early to be safe
self._token_expiry = time.time() + int(res_data.get("expireIn", 7200)) - 60
return self._access_token
except Exception as e:
logger.error(f"Failed to get DingTalk access token: {e}")
return None
async def send(self, msg: OutboundMessage) -> None:
"""Send a message through DingTalk."""
token = await self._get_access_token()
if not token:
return
# oToMessages/batchSend: sends to individual users (private chat)
# https://open.dingtalk.com/document/orgapp/robot-batch-send-messages
url = "https://api.dingtalk.com/v1.0/robot/oToMessages/batchSend"
headers = {"x-acs-dingtalk-access-token": token}
data = {
"robotCode": self.config.client_id,
"userIds": [msg.chat_id], # chat_id is the user's staffId
"msgKey": "sampleMarkdown",
"msgParam": json.dumps({
"text": msg.content,
"title": "Nanobot Reply",
}),
}
if not self._http:
logger.warning("DingTalk HTTP client not initialized, cannot send")
return
try:
resp = await self._http.post(url, json=data, headers=headers)
if resp.status_code != 200:
logger.error(f"DingTalk send failed: {resp.text}")
else:
logger.debug(f"DingTalk message sent to {msg.chat_id}")
except Exception as e:
logger.error(f"Error sending DingTalk message: {e}")
async def _on_message(self, content: str, sender_id: str, sender_name: str) -> None:
"""Handle incoming message (called by NanobotDingTalkHandler).
Delegates to BaseChannel._handle_message() which enforces allow_from
permission checks before publishing to the bus.
"""
try:
logger.info(f"DingTalk inbound: {content} from {sender_name}")
await self._handle_message(
sender_id=sender_id,
chat_id=sender_id, # For private chat, chat_id == sender_id
content=str(content),
metadata={
"sender_name": sender_name,
"platform": "dingtalk",
},
)
except Exception as e:
logger.error(f"Error publishing DingTalk message: {e}")

261
nanobot/channels/discord.py Normal file
View File

@ -0,0 +1,261 @@
"""Discord channel implementation using Discord Gateway websocket."""
import asyncio
import json
from pathlib import Path
from typing import Any
import httpx
import websockets
from loguru import logger
from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.channels.base import BaseChannel
from nanobot.config.schema import DiscordConfig
DISCORD_API_BASE = "https://discord.com/api/v10"
MAX_ATTACHMENT_BYTES = 20 * 1024 * 1024 # 20MB
class DiscordChannel(BaseChannel):
"""Discord channel using Gateway websocket."""
name = "discord"
def __init__(self, config: DiscordConfig, bus: MessageBus):
super().__init__(config, bus)
self.config: DiscordConfig = config
self._ws: websockets.WebSocketClientProtocol | None = None
self._seq: int | None = None
self._heartbeat_task: asyncio.Task | None = None
self._typing_tasks: dict[str, asyncio.Task] = {}
self._http: httpx.AsyncClient | None = None
async def start(self) -> None:
"""Start the Discord gateway connection."""
if not self.config.token:
logger.error("Discord bot token not configured")
return
self._running = True
self._http = httpx.AsyncClient(timeout=30.0)
while self._running:
try:
logger.info("Connecting to Discord gateway...")
async with websockets.connect(self.config.gateway_url) as ws:
self._ws = ws
await self._gateway_loop()
except asyncio.CancelledError:
break
except Exception as e:
logger.warning(f"Discord gateway error: {e}")
if self._running:
logger.info("Reconnecting to Discord gateway in 5 seconds...")
await asyncio.sleep(5)
async def stop(self) -> None:
"""Stop the Discord channel."""
self._running = False
if self._heartbeat_task:
self._heartbeat_task.cancel()
self._heartbeat_task = None
for task in self._typing_tasks.values():
task.cancel()
self._typing_tasks.clear()
if self._ws:
await self._ws.close()
self._ws = None
if self._http:
await self._http.aclose()
self._http = None
async def send(self, msg: OutboundMessage) -> None:
"""Send a message through Discord REST API."""
if not self._http:
logger.warning("Discord HTTP client not initialized")
return
url = f"{DISCORD_API_BASE}/channels/{msg.chat_id}/messages"
payload: dict[str, Any] = {"content": msg.content}
if msg.reply_to:
payload["message_reference"] = {"message_id": msg.reply_to}
payload["allowed_mentions"] = {"replied_user": False}
headers = {"Authorization": f"Bot {self.config.token}"}
try:
for attempt in range(3):
try:
response = await self._http.post(url, headers=headers, json=payload)
if response.status_code == 429:
data = response.json()
retry_after = float(data.get("retry_after", 1.0))
logger.warning(f"Discord rate limited, retrying in {retry_after}s")
await asyncio.sleep(retry_after)
continue
response.raise_for_status()
return
except Exception as e:
if attempt == 2:
logger.error(f"Error sending Discord message: {e}")
else:
await asyncio.sleep(1)
finally:
await self._stop_typing(msg.chat_id)
async def _gateway_loop(self) -> None:
"""Main gateway loop: identify, heartbeat, dispatch events."""
if not self._ws:
return
async for raw in self._ws:
try:
data = json.loads(raw)
except json.JSONDecodeError:
logger.warning(f"Invalid JSON from Discord gateway: {raw[:100]}")
continue
op = data.get("op")
event_type = data.get("t")
seq = data.get("s")
payload = data.get("d")
if seq is not None:
self._seq = seq
if op == 10:
# HELLO: start heartbeat and identify
interval_ms = payload.get("heartbeat_interval", 45000)
await self._start_heartbeat(interval_ms / 1000)
await self._identify()
elif op == 0 and event_type == "READY":
logger.info("Discord gateway READY")
elif op == 0 and event_type == "MESSAGE_CREATE":
await self._handle_message_create(payload)
elif op == 7:
# RECONNECT: exit loop to reconnect
logger.info("Discord gateway requested reconnect")
break
elif op == 9:
# INVALID_SESSION: reconnect
logger.warning("Discord gateway invalid session")
break
async def _identify(self) -> None:
"""Send IDENTIFY payload."""
if not self._ws:
return
identify = {
"op": 2,
"d": {
"token": self.config.token,
"intents": self.config.intents,
"properties": {
"os": "nanobot",
"browser": "nanobot",
"device": "nanobot",
},
},
}
await self._ws.send(json.dumps(identify))
async def _start_heartbeat(self, interval_s: float) -> None:
"""Start or restart the heartbeat loop."""
if self._heartbeat_task:
self._heartbeat_task.cancel()
async def heartbeat_loop() -> None:
while self._running and self._ws:
payload = {"op": 1, "d": self._seq}
try:
await self._ws.send(json.dumps(payload))
except Exception as e:
logger.warning(f"Discord heartbeat failed: {e}")
break
await asyncio.sleep(interval_s)
self._heartbeat_task = asyncio.create_task(heartbeat_loop())
async def _handle_message_create(self, payload: dict[str, Any]) -> None:
"""Handle incoming Discord messages."""
author = payload.get("author") or {}
if author.get("bot"):
return
sender_id = str(author.get("id", ""))
channel_id = str(payload.get("channel_id", ""))
content = payload.get("content") or ""
if not sender_id or not channel_id:
return
if not self.is_allowed(sender_id):
return
content_parts = [content] if content else []
media_paths: list[str] = []
media_dir = Path.home() / ".nanobot" / "media"
for attachment in payload.get("attachments") or []:
url = attachment.get("url")
filename = attachment.get("filename") or "attachment"
size = attachment.get("size") or 0
if not url or not self._http:
continue
if size and size > MAX_ATTACHMENT_BYTES:
content_parts.append(f"[attachment: {filename} - too large]")
continue
try:
media_dir.mkdir(parents=True, exist_ok=True)
file_path = media_dir / f"{attachment.get('id', 'file')}_{filename.replace('/', '_')}"
resp = await self._http.get(url)
resp.raise_for_status()
file_path.write_bytes(resp.content)
media_paths.append(str(file_path))
content_parts.append(f"[attachment: {file_path}]")
except Exception as e:
logger.warning(f"Failed to download Discord attachment: {e}")
content_parts.append(f"[attachment: {filename} - download failed]")
reply_to = (payload.get("referenced_message") or {}).get("id")
await self._start_typing(channel_id)
await self._handle_message(
sender_id=sender_id,
chat_id=channel_id,
content="\n".join(p for p in content_parts if p) or "[empty message]",
media=media_paths,
metadata={
"message_id": str(payload.get("id", "")),
"guild_id": payload.get("guild_id"),
"reply_to": reply_to,
},
)
async def _start_typing(self, channel_id: str) -> None:
"""Start periodic typing indicator for a channel."""
await self._stop_typing(channel_id)
async def typing_loop() -> None:
url = f"{DISCORD_API_BASE}/channels/{channel_id}/typing"
headers = {"Authorization": f"Bot {self.config.token}"}
while self._running:
try:
await self._http.post(url, headers=headers)
except Exception:
pass
await asyncio.sleep(8)
self._typing_tasks[channel_id] = asyncio.create_task(typing_loop())
async def _stop_typing(self, channel_id: str) -> None:
"""Stop typing indicator for a channel."""
task = self._typing_tasks.pop(channel_id, None)
if task:
task.cancel()
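
For reference, the IDENTIFY payload above sends `self.config.intents`; its default (`37377`, added to `DiscordConfig` later in this commit) is simply the four gateway intents this channel relies on OR-ed together. A quick sketch of that arithmetic, with bit positions taken from Discord's gateway documentation:

```python
# Gateway intent bit positions, per Discord's gateway documentation.
GUILDS = 1 << 0            # 1
GUILD_MESSAGES = 1 << 9    # 512
DIRECT_MESSAGES = 1 << 12  # 4096
MESSAGE_CONTENT = 1 << 15  # 32768

intents = GUILDS | GUILD_MESSAGES | DIRECT_MESSAGES | MESSAGE_CONTENT
assert intents == 37377    # matches the DiscordConfig default added in this commit
```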

403
nanobot/channels/email.py Normal file
View File

@ -0,0 +1,403 @@
"""Email channel implementation using IMAP polling + SMTP replies."""
import asyncio
import html
import imaplib
import re
import smtplib
import ssl
from datetime import date
from email import policy
from email.header import decode_header, make_header
from email.message import EmailMessage
from email.parser import BytesParser
from email.utils import parseaddr
from typing import Any
from loguru import logger
from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.channels.base import BaseChannel
from nanobot.config.schema import EmailConfig
class EmailChannel(BaseChannel):
"""
Email channel.
Inbound:
- Poll IMAP mailbox for unread messages.
- Convert each message into an inbound event.
Outbound:
- Send responses via SMTP back to the sender address.
"""
name = "email"
_IMAP_MONTHS = (
"Jan",
"Feb",
"Mar",
"Apr",
"May",
"Jun",
"Jul",
"Aug",
"Sep",
"Oct",
"Nov",
"Dec",
)
def __init__(self, config: EmailConfig, bus: MessageBus):
super().__init__(config, bus)
self.config: EmailConfig = config
self._last_subject_by_chat: dict[str, str] = {}
self._last_message_id_by_chat: dict[str, str] = {}
self._processed_uids: set[str] = set() # Capped to prevent unbounded growth
self._MAX_PROCESSED_UIDS = 100000
async def start(self) -> None:
"""Start polling IMAP for inbound emails."""
if not self.config.consent_granted:
logger.warning(
"Email channel disabled: consent_granted is false. "
"Set channels.email.consentGranted=true after explicit user permission."
)
return
if not self._validate_config():
return
self._running = True
logger.info("Starting Email channel (IMAP polling mode)...")
poll_seconds = max(5, int(self.config.poll_interval_seconds))
while self._running:
try:
inbound_items = await asyncio.to_thread(self._fetch_new_messages)
for item in inbound_items:
sender = item["sender"]
subject = item.get("subject", "")
message_id = item.get("message_id", "")
if subject:
self._last_subject_by_chat[sender] = subject
if message_id:
self._last_message_id_by_chat[sender] = message_id
await self._handle_message(
sender_id=sender,
chat_id=sender,
content=item["content"],
metadata=item.get("metadata", {}),
)
except Exception as e:
logger.error(f"Email polling error: {e}")
await asyncio.sleep(poll_seconds)
async def stop(self) -> None:
"""Stop polling loop."""
self._running = False
async def send(self, msg: OutboundMessage) -> None:
"""Send email via SMTP."""
if not self.config.consent_granted:
logger.warning("Skip email send: consent_granted is false")
return
force_send = bool((msg.metadata or {}).get("force_send"))
if not self.config.auto_reply_enabled and not force_send:
logger.info("Skip automatic email reply: auto_reply_enabled is false")
return
if not self.config.smtp_host:
logger.warning("Email channel SMTP host not configured")
return
to_addr = msg.chat_id.strip()
if not to_addr:
logger.warning("Email channel missing recipient address")
return
base_subject = self._last_subject_by_chat.get(to_addr, "nanobot reply")
subject = self._reply_subject(base_subject)
if msg.metadata and isinstance(msg.metadata.get("subject"), str):
override = msg.metadata["subject"].strip()
if override:
subject = override
email_msg = EmailMessage()
email_msg["From"] = self.config.from_address or self.config.smtp_username or self.config.imap_username
email_msg["To"] = to_addr
email_msg["Subject"] = subject
email_msg.set_content(msg.content or "")
in_reply_to = self._last_message_id_by_chat.get(to_addr)
if in_reply_to:
email_msg["In-Reply-To"] = in_reply_to
email_msg["References"] = in_reply_to
try:
await asyncio.to_thread(self._smtp_send, email_msg)
except Exception as e:
logger.error(f"Error sending email to {to_addr}: {e}")
raise
def _validate_config(self) -> bool:
missing = []
if not self.config.imap_host:
missing.append("imap_host")
if not self.config.imap_username:
missing.append("imap_username")
if not self.config.imap_password:
missing.append("imap_password")
if not self.config.smtp_host:
missing.append("smtp_host")
if not self.config.smtp_username:
missing.append("smtp_username")
if not self.config.smtp_password:
missing.append("smtp_password")
if missing:
logger.error(f"Email channel not configured, missing: {', '.join(missing)}")
return False
return True
def _smtp_send(self, msg: EmailMessage) -> None:
timeout = 30
if self.config.smtp_use_ssl:
with smtplib.SMTP_SSL(
self.config.smtp_host,
self.config.smtp_port,
timeout=timeout,
) as smtp:
smtp.login(self.config.smtp_username, self.config.smtp_password)
smtp.send_message(msg)
return
with smtplib.SMTP(self.config.smtp_host, self.config.smtp_port, timeout=timeout) as smtp:
if self.config.smtp_use_tls:
smtp.starttls(context=ssl.create_default_context())
smtp.login(self.config.smtp_username, self.config.smtp_password)
smtp.send_message(msg)
def _fetch_new_messages(self) -> list[dict[str, Any]]:
"""Poll IMAP and return parsed unread messages."""
return self._fetch_messages(
search_criteria=("UNSEEN",),
mark_seen=self.config.mark_seen,
dedupe=True,
limit=0,
)
def fetch_messages_between_dates(
self,
start_date: date,
end_date: date,
limit: int = 20,
) -> list[dict[str, Any]]:
"""
Fetch messages in [start_date, end_date) by IMAP date search.
This is used for historical summarization tasks (e.g. "yesterday").
"""
if end_date <= start_date:
return []
return self._fetch_messages(
search_criteria=(
"SINCE",
self._format_imap_date(start_date),
"BEFORE",
self._format_imap_date(end_date),
),
mark_seen=False,
dedupe=False,
limit=max(1, int(limit)),
)
def _fetch_messages(
self,
search_criteria: tuple[str, ...],
mark_seen: bool,
dedupe: bool,
limit: int,
) -> list[dict[str, Any]]:
"""Fetch messages by arbitrary IMAP search criteria."""
messages: list[dict[str, Any]] = []
mailbox = self.config.imap_mailbox or "INBOX"
if self.config.imap_use_ssl:
client = imaplib.IMAP4_SSL(self.config.imap_host, self.config.imap_port)
else:
client = imaplib.IMAP4(self.config.imap_host, self.config.imap_port)
try:
client.login(self.config.imap_username, self.config.imap_password)
status, _ = client.select(mailbox)
if status != "OK":
return messages
status, data = client.search(None, *search_criteria)
if status != "OK" or not data:
return messages
ids = data[0].split()
if limit > 0 and len(ids) > limit:
ids = ids[-limit:]
for imap_id in ids:
status, fetched = client.fetch(imap_id, "(BODY.PEEK[] UID)")
if status != "OK" or not fetched:
continue
raw_bytes = self._extract_message_bytes(fetched)
if raw_bytes is None:
continue
uid = self._extract_uid(fetched)
if dedupe and uid and uid in self._processed_uids:
continue
parsed = BytesParser(policy=policy.default).parsebytes(raw_bytes)
sender = parseaddr(parsed.get("From", ""))[1].strip().lower()
if not sender:
continue
subject = self._decode_header_value(parsed.get("Subject", ""))
date_value = parsed.get("Date", "")
message_id = parsed.get("Message-ID", "").strip()
body = self._extract_text_body(parsed)
if not body:
body = "(empty email body)"
body = body[: self.config.max_body_chars]
content = (
f"Email received.\n"
f"From: {sender}\n"
f"Subject: {subject}\n"
f"Date: {date_value}\n\n"
f"{body}"
)
metadata = {
"message_id": message_id,
"subject": subject,
"date": date_value,
"sender_email": sender,
"uid": uid,
}
messages.append(
{
"sender": sender,
"subject": subject,
"message_id": message_id,
"content": content,
"metadata": metadata,
}
)
if dedupe and uid:
self._processed_uids.add(uid)
# mark_seen is the primary dedup; this set is a safety net
if len(self._processed_uids) > self._MAX_PROCESSED_UIDS:
self._processed_uids.clear()
if mark_seen:
client.store(imap_id, "+FLAGS", "\\Seen")
finally:
try:
client.logout()
except Exception:
pass
return messages
@classmethod
def _format_imap_date(cls, value: date) -> str:
"""Format date for IMAP search (always English month abbreviations)."""
month = cls._IMAP_MONTHS[value.month - 1]
return f"{value.day:02d}-{month}-{value.year}"
@staticmethod
def _extract_message_bytes(fetched: list[Any]) -> bytes | None:
for item in fetched:
if isinstance(item, tuple) and len(item) >= 2 and isinstance(item[1], (bytes, bytearray)):
return bytes(item[1])
return None
@staticmethod
def _extract_uid(fetched: list[Any]) -> str:
for item in fetched:
if isinstance(item, tuple) and item and isinstance(item[0], (bytes, bytearray)):
head = bytes(item[0]).decode("utf-8", errors="ignore")
m = re.search(r"UID\s+(\d+)", head)
if m:
return m.group(1)
return ""
@staticmethod
def _decode_header_value(value: str) -> str:
if not value:
return ""
try:
return str(make_header(decode_header(value)))
except Exception:
return value
@classmethod
def _extract_text_body(cls, msg: Any) -> str:
"""Best-effort extraction of readable body text."""
if msg.is_multipart():
plain_parts: list[str] = []
html_parts: list[str] = []
for part in msg.walk():
if part.get_content_disposition() == "attachment":
continue
content_type = part.get_content_type()
try:
payload = part.get_content()
except Exception:
payload_bytes = part.get_payload(decode=True) or b""
charset = part.get_content_charset() or "utf-8"
payload = payload_bytes.decode(charset, errors="replace")
if not isinstance(payload, str):
continue
if content_type == "text/plain":
plain_parts.append(payload)
elif content_type == "text/html":
html_parts.append(payload)
if plain_parts:
return "\n\n".join(plain_parts).strip()
if html_parts:
return cls._html_to_text("\n\n".join(html_parts)).strip()
return ""
try:
payload = msg.get_content()
except Exception:
payload_bytes = msg.get_payload(decode=True) or b""
charset = msg.get_content_charset() or "utf-8"
payload = payload_bytes.decode(charset, errors="replace")
if not isinstance(payload, str):
return ""
if msg.get_content_type() == "text/html":
return cls._html_to_text(payload).strip()
return payload.strip()
@staticmethod
def _html_to_text(raw_html: str) -> str:
text = re.sub(r"<\s*br\s*/?>", "\n", raw_html, flags=re.IGNORECASE)
text = re.sub(r"<\s*/\s*p\s*>", "\n", text, flags=re.IGNORECASE)
text = re.sub(r"<[^>]+>", "", text)
return html.unescape(text)
def _reply_subject(self, base_subject: str) -> str:
subject = (base_subject or "").strip() or "nanobot reply"
prefix = self.config.subject_prefix or "Re: "
if subject.lower().startswith("re:"):
return subject
return f"{prefix}{subject}"
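
The date-ranged fetch uses an end-exclusive window and always formats search dates with English month abbreviations regardless of locale (the reason `_IMAP_MONTHS` exists). A hypothetical "summarize yesterday" call might look like the sketch below; the mailbox credentials are placeholders:

```python
from datetime import date, timedelta

from nanobot.bus.queue import MessageBus
from nanobot.channels.email import EmailChannel
from nanobot.config.schema import EmailConfig

# Placeholder credentials; a real mailbox is needed for this to return anything.
cfg = EmailConfig(enabled=True, consent_granted=True,
                  imap_host="imap.example.com", imap_username="me@example.com",
                  imap_password="app-password",
                  smtp_host="smtp.example.com", smtp_username="me@example.com",
                  smtp_password="app-password")
channel = EmailChannel(cfg, MessageBus())

today = date(2026, 2, 9)
yesterday = today - timedelta(days=1)

# End-exclusive window [yesterday, today); internally this becomes the IMAP
# criteria: SINCE 08-Feb-2026 BEFORE 09-Feb-2026 (no UNSEEN filter, no \Seen flagging).
for item in channel.fetch_messages_between_dates(yesterday, today, limit=20):
    print(item["sender"], item["subject"])
```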

307
nanobot/channels/feishu.py Normal file
View File

@ -0,0 +1,307 @@
"""Feishu/Lark channel implementation using lark-oapi SDK with WebSocket long connection."""
import asyncio
import json
import re
import threading
from collections import OrderedDict
from typing import Any
from loguru import logger
from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.channels.base import BaseChannel
from nanobot.config.schema import FeishuConfig
try:
import lark_oapi as lark
from lark_oapi.api.im.v1 import (
CreateMessageRequest,
CreateMessageRequestBody,
CreateMessageReactionRequest,
CreateMessageReactionRequestBody,
Emoji,
P2ImMessageReceiveV1,
)
FEISHU_AVAILABLE = True
except ImportError:
FEISHU_AVAILABLE = False
lark = None
Emoji = None
# Message type display mapping
MSG_TYPE_MAP = {
"image": "[image]",
"audio": "[audio]",
"file": "[file]",
"sticker": "[sticker]",
}
class FeishuChannel(BaseChannel):
"""
Feishu/Lark channel using WebSocket long connection.
Uses WebSocket to receive events - no public IP or webhook required.
Requires:
- App ID and App Secret from Feishu Open Platform
- Bot capability enabled
- Event subscription enabled (im.message.receive_v1)
"""
name = "feishu"
def __init__(self, config: FeishuConfig, bus: MessageBus):
super().__init__(config, bus)
self.config: FeishuConfig = config
self._client: Any = None
self._ws_client: Any = None
self._ws_thread: threading.Thread | None = None
self._processed_message_ids: OrderedDict[str, None] = OrderedDict() # Ordered dedup cache
self._loop: asyncio.AbstractEventLoop | None = None
async def start(self) -> None:
"""Start the Feishu bot with WebSocket long connection."""
if not FEISHU_AVAILABLE:
logger.error("Feishu SDK not installed. Run: pip install lark-oapi")
return
if not self.config.app_id or not self.config.app_secret:
logger.error("Feishu app_id and app_secret not configured")
return
self._running = True
self._loop = asyncio.get_running_loop()
# Create Lark client for sending messages
self._client = lark.Client.builder() \
.app_id(self.config.app_id) \
.app_secret(self.config.app_secret) \
.log_level(lark.LogLevel.INFO) \
.build()
# Create event handler (only register message receive, ignore other events)
event_handler = lark.EventDispatcherHandler.builder(
self.config.encrypt_key or "",
self.config.verification_token or "",
).register_p2_im_message_receive_v1(
self._on_message_sync
).build()
# Create WebSocket client for long connection
self._ws_client = lark.ws.Client(
self.config.app_id,
self.config.app_secret,
event_handler=event_handler,
log_level=lark.LogLevel.INFO
)
# Start WebSocket client in a separate thread
def run_ws():
try:
self._ws_client.start()
except Exception as e:
logger.error(f"Feishu WebSocket error: {e}")
self._ws_thread = threading.Thread(target=run_ws, daemon=True)
self._ws_thread.start()
logger.info("Feishu bot started with WebSocket long connection")
logger.info("No public IP required - using WebSocket to receive events")
# Keep running until stopped
while self._running:
await asyncio.sleep(1)
async def stop(self) -> None:
"""Stop the Feishu bot."""
self._running = False
if self._ws_client:
try:
self._ws_client.stop()
except Exception as e:
logger.warning(f"Error stopping WebSocket client: {e}")
logger.info("Feishu bot stopped")
def _add_reaction_sync(self, message_id: str, emoji_type: str) -> None:
"""Sync helper for adding reaction (runs in thread pool)."""
try:
request = CreateMessageReactionRequest.builder() \
.message_id(message_id) \
.request_body(
CreateMessageReactionRequestBody.builder()
.reaction_type(Emoji.builder().emoji_type(emoji_type).build())
.build()
).build()
response = self._client.im.v1.message_reaction.create(request)
if not response.success():
logger.warning(f"Failed to add reaction: code={response.code}, msg={response.msg}")
else:
logger.debug(f"Added {emoji_type} reaction to message {message_id}")
except Exception as e:
logger.warning(f"Error adding reaction: {e}")
async def _add_reaction(self, message_id: str, emoji_type: str = "THUMBSUP") -> None:
"""
Add a reaction emoji to a message (non-blocking).
Common emoji types: THUMBSUP, OK, EYES, DONE, OnIt, HEART
"""
if not self._client or not Emoji:
return
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, self._add_reaction_sync, message_id, emoji_type)
# Regex to match markdown tables (header + separator + data rows)
_TABLE_RE = re.compile(
r"((?:^[ \t]*\|.+\|[ \t]*\n)(?:^[ \t]*\|[-:\s|]+\|[ \t]*\n)(?:^[ \t]*\|.+\|[ \t]*\n?)+)",
re.MULTILINE,
)
@staticmethod
def _parse_md_table(table_text: str) -> dict | None:
"""Parse a markdown table into a Feishu table element."""
lines = [l.strip() for l in table_text.strip().split("\n") if l.strip()]
if len(lines) < 3:
return None
split = lambda l: [c.strip() for c in l.strip("|").split("|")]
headers = split(lines[0])
rows = [split(l) for l in lines[2:]]
columns = [{"tag": "column", "name": f"c{i}", "display_name": h, "width": "auto"}
for i, h in enumerate(headers)]
return {
"tag": "table",
"page_size": len(rows) + 1,
"columns": columns,
"rows": [{f"c{i}": r[i] if i < len(r) else "" for i in range(len(headers))} for r in rows],
}
def _build_card_elements(self, content: str) -> list[dict]:
"""Split content into markdown + table elements for Feishu card."""
elements, last_end = [], 0
for m in self._TABLE_RE.finditer(content):
before = content[last_end:m.start()].strip()
if before:
elements.append({"tag": "markdown", "content": before})
elements.append(self._parse_md_table(m.group(1)) or {"tag": "markdown", "content": m.group(1)})
last_end = m.end()
remaining = content[last_end:].strip()
if remaining:
elements.append({"tag": "markdown", "content": remaining})
return elements or [{"tag": "markdown", "content": content}]
async def send(self, msg: OutboundMessage) -> None:
"""Send a message through Feishu."""
if not self._client:
logger.warning("Feishu client not initialized")
return
try:
# Determine receive_id_type based on chat_id format
# open_id starts with "ou_", chat_id starts with "oc_"
if msg.chat_id.startswith("oc_"):
receive_id_type = "chat_id"
else:
receive_id_type = "open_id"
# Build card with markdown + table support
elements = self._build_card_elements(msg.content)
card = {
"config": {"wide_screen_mode": True},
"elements": elements,
}
content = json.dumps(card, ensure_ascii=False)
request = CreateMessageRequest.builder() \
.receive_id_type(receive_id_type) \
.request_body(
CreateMessageRequestBody.builder()
.receive_id(msg.chat_id)
.msg_type("interactive")
.content(content)
.build()
).build()
response = self._client.im.v1.message.create(request)
if not response.success():
logger.error(
f"Failed to send Feishu message: code={response.code}, "
f"msg={response.msg}, log_id={response.get_log_id()}"
)
else:
logger.debug(f"Feishu message sent to {msg.chat_id}")
except Exception as e:
logger.error(f"Error sending Feishu message: {e}")
def _on_message_sync(self, data: "P2ImMessageReceiveV1") -> None:
"""
Sync handler for incoming messages (called from WebSocket thread).
Schedules async handling in the main event loop.
"""
if self._loop and self._loop.is_running():
asyncio.run_coroutine_threadsafe(self._on_message(data), self._loop)
async def _on_message(self, data: "P2ImMessageReceiveV1") -> None:
"""Handle incoming message from Feishu."""
try:
event = data.event
message = event.message
sender = event.sender
# Deduplication check
message_id = message.message_id
if message_id in self._processed_message_ids:
return
self._processed_message_ids[message_id] = None
# Trim dedup cache: evict oldest entries once it exceeds 1000
while len(self._processed_message_ids) > 1000:
self._processed_message_ids.popitem(last=False)
# Skip bot messages
sender_type = sender.sender_type
if sender_type == "bot":
return
sender_id = sender.sender_id.open_id if sender.sender_id else "unknown"
chat_id = message.chat_id
chat_type = message.chat_type # "p2p" or "group"
msg_type = message.message_type
# Add reaction to indicate "seen"
await self._add_reaction(message_id, "THUMBSUP")
# Parse message content
if msg_type == "text":
try:
content = json.loads(message.content).get("text", "")
except json.JSONDecodeError:
content = message.content or ""
else:
content = MSG_TYPE_MAP.get(msg_type, f"[{msg_type}]")
if not content:
return
# Forward to message bus
reply_to = chat_id if chat_type == "group" else sender_id
await self._handle_message(
sender_id=sender_id,
chat_id=reply_to,
content=content,
metadata={
"message_id": message_id,
"chat_type": chat_type,
"msg_type": msg_type,
}
)
except Exception as e:
logger.error(f"Error processing Feishu message: {e}")
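
The least obvious part of this channel is the markdown-table handling in `_parse_md_table` / `_build_card_elements`. Below is a small sketch of what a table turns into before it is wrapped in the interactive card; the expected output is traced by hand from the code above, not captured from a live Feishu response:

```python
from nanobot.channels.feishu import FeishuChannel

table = (
    "| Name | Score |\n"
    "| --- | --- |\n"
    "| Alice | 9 |\n"
    "| Bob | 7 |\n"
)
element = FeishuChannel._parse_md_table(table)
# element == {
#     "tag": "table",
#     "page_size": 3,
#     "columns": [
#         {"tag": "column", "name": "c0", "display_name": "Name", "width": "auto"},
#         {"tag": "column", "name": "c1", "display_name": "Score", "width": "auto"},
#     ],
#     "rows": [{"c0": "Alice", "c1": "9"}, {"c0": "Bob", "c1": "7"}],
# }
```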

View File

@ -1,7 +1,9 @@
"""Channel manager for coordinating chat channels.""" """Channel manager for coordinating chat channels."""
from __future__ import annotations
import asyncio import asyncio
from typing import Any from typing import Any, TYPE_CHECKING
from loguru import logger from loguru import logger
@ -10,6 +12,9 @@ from nanobot.bus.queue import MessageBus
from nanobot.channels.base import BaseChannel from nanobot.channels.base import BaseChannel
from nanobot.config.schema import Config from nanobot.config.schema import Config
if TYPE_CHECKING:
from nanobot.session.manager import SessionManager
class ChannelManager: class ChannelManager:
""" """
@ -21,9 +26,10 @@ class ChannelManager:
- Route outbound messages - Route outbound messages
""" """
def __init__(self, config: Config, bus: MessageBus): def __init__(self, config: Config, bus: MessageBus, session_manager: "SessionManager | None" = None):
self.config = config self.config = config
self.bus = bus self.bus = bus
self.session_manager = session_manager
self.channels: dict[str, BaseChannel] = {} self.channels: dict[str, BaseChannel] = {}
self._dispatch_task: asyncio.Task | None = None self._dispatch_task: asyncio.Task | None = None
@ -40,6 +46,7 @@ class ChannelManager:
self.config.channels.telegram, self.config.channels.telegram,
self.bus, self.bus,
groq_api_key=self.config.providers.groq.api_key, groq_api_key=self.config.providers.groq.api_key,
session_manager=self.session_manager,
) )
logger.info("Telegram channel enabled") logger.info("Telegram channel enabled")
except ImportError as e: except ImportError as e:
@ -56,6 +63,50 @@ class ChannelManager:
except ImportError as e: except ImportError as e:
logger.warning(f"WhatsApp channel not available: {e}") logger.warning(f"WhatsApp channel not available: {e}")
# Discord channel
if self.config.channels.discord.enabled:
try:
from nanobot.channels.discord import DiscordChannel
self.channels["discord"] = DiscordChannel(
self.config.channels.discord, self.bus
)
logger.info("Discord channel enabled")
except ImportError as e:
logger.warning(f"Discord channel not available: {e}")
# Feishu channel
if self.config.channels.feishu.enabled:
try:
from nanobot.channels.feishu import FeishuChannel
self.channels["feishu"] = FeishuChannel(
self.config.channels.feishu, self.bus
)
logger.info("Feishu channel enabled")
except ImportError as e:
logger.warning(f"Feishu channel not available: {e}")
# DingTalk channel
if self.config.channels.dingtalk.enabled:
try:
from nanobot.channels.dingtalk import DingTalkChannel
self.channels["dingtalk"] = DingTalkChannel(
self.config.channels.dingtalk, self.bus
)
logger.info("DingTalk channel enabled")
except ImportError as e:
logger.warning(f"DingTalk channel not available: {e}")
# Email channel
if self.config.channels.email.enabled:
try:
from nanobot.channels.email import EmailChannel
self.channels["email"] = EmailChannel(
self.config.channels.email, self.bus
)
logger.info("Email channel enabled")
except ImportError as e:
logger.warning(f"Email channel not available: {e}")
# Slack channel # Slack channel
if self.config.channels.slack.enabled: if self.config.channels.slack.enabled:
try: try:
@ -67,8 +118,15 @@ class ChannelManager:
except ImportError as e: except ImportError as e:
logger.warning(f"Slack channel not available: {e}") logger.warning(f"Slack channel not available: {e}")
async def _start_channel(self, name: str, channel: BaseChannel) -> None:
"""Start a channel and log any exceptions."""
try:
await channel.start()
except Exception as e:
logger.error(f"Failed to start channel {name}: {e}")
async def start_all(self) -> None: async def start_all(self) -> None:
"""Start WhatsApp channel and the outbound dispatcher.""" """Start all channels and the outbound dispatcher."""
if not self.channels: if not self.channels:
logger.warning("No channels enabled") logger.warning("No channels enabled")
return return
@ -76,11 +134,11 @@ class ChannelManager:
# Start outbound dispatcher # Start outbound dispatcher
self._dispatch_task = asyncio.create_task(self._dispatch_outbound()) self._dispatch_task = asyncio.create_task(self._dispatch_outbound())
# Start WhatsApp channel # Start channels
tasks = [] tasks = []
for name, channel in self.channels.items(): for name, channel in self.channels.items():
logger.info(f"Starting {name} channel...") logger.info(f"Starting {name} channel...")
tasks.append(asyncio.create_task(channel.start())) tasks.append(asyncio.create_task(self._start_channel(name, channel)))
# Wait for all to complete (they should run forever) # Wait for all to complete (they should run forever)
await asyncio.gather(*tasks, return_exceptions=True) await asyncio.gather(*tasks, return_exceptions=True)
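
The manager only instantiates a channel when its `enabled` flag is set in the config. A minimal sketch of enabling two of the new channels and letting the loader normalize the camelCase keys; field names follow the `schema.py` changes further down in this commit, and the credential values are placeholders:

```python
from nanobot.config.loader import convert_keys
from nanobot.config.schema import ChannelsConfig

raw = {
    "discord": {"enabled": True, "token": "BOT_TOKEN"},
    "dingtalk": {"enabled": True, "clientId": "APP_KEY", "clientSecret": "APP_SECRET"},
}
channels = ChannelsConfig.model_validate(convert_keys(raw))

assert channels.discord.enabled
assert channels.dingtalk.client_id == "APP_KEY"   # camelCase keys are normalized
```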

View File

@ -1,17 +1,23 @@
"""Telegram channel implementation using python-telegram-bot.""" """Telegram channel implementation using python-telegram-bot."""
from __future__ import annotations
import asyncio import asyncio
import re import re
from typing import TYPE_CHECKING
from loguru import logger from loguru import logger
from telegram import Update from telegram import BotCommand, Update
from telegram.ext import Application, MessageHandler, filters, ContextTypes from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes
from nanobot.bus.events import OutboundMessage from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus from nanobot.bus.queue import MessageBus
from nanobot.channels.base import BaseChannel from nanobot.channels.base import BaseChannel
from nanobot.config.schema import TelegramConfig from nanobot.config.schema import TelegramConfig
if TYPE_CHECKING:
from nanobot.session.manager import SessionManager
def _markdown_to_telegram_html(text: str) -> str: def _markdown_to_telegram_html(text: str) -> str:
""" """
@ -85,12 +91,27 @@ class TelegramChannel(BaseChannel):
name = "telegram" name = "telegram"
def __init__(self, config: TelegramConfig, bus: MessageBus, groq_api_key: str = ""): # Commands registered with Telegram's command menu
BOT_COMMANDS = [
BotCommand("start", "Start the bot"),
BotCommand("reset", "Reset conversation history"),
BotCommand("help", "Show available commands"),
]
def __init__(
self,
config: TelegramConfig,
bus: MessageBus,
groq_api_key: str = "",
session_manager: SessionManager | None = None,
):
super().__init__(config, bus) super().__init__(config, bus)
self.config: TelegramConfig = config self.config: TelegramConfig = config
self.groq_api_key = groq_api_key self.groq_api_key = groq_api_key
self.session_manager = session_manager
self._app: Application | None = None self._app: Application | None = None
self._chat_ids: dict[str, int] = {} # Map sender_id to chat_id for replies self._chat_ids: dict[str, int] = {} # Map sender_id to chat_id for replies
self._typing_tasks: dict[str, asyncio.Task] = {} # chat_id -> typing loop task
async def start(self) -> None: async def start(self) -> None:
"""Start the Telegram bot with long polling.""" """Start the Telegram bot with long polling."""
@ -101,11 +122,15 @@ class TelegramChannel(BaseChannel):
self._running = True self._running = True
# Build the application # Build the application
self._app = ( builder = Application.builder().token(self.config.token)
Application.builder() if self.config.proxy:
.token(self.config.token) builder = builder.proxy(self.config.proxy).get_updates_proxy(self.config.proxy)
.build() self._app = builder.build()
)
# Add command handlers
self._app.add_handler(CommandHandler("start", self._on_start))
self._app.add_handler(CommandHandler("reset", self._on_reset))
self._app.add_handler(CommandHandler("help", self._on_help))
# Add message handler for text, photos, voice, documents # Add message handler for text, photos, voice, documents
self._app.add_handler( self._app.add_handler(
@ -116,20 +141,22 @@ class TelegramChannel(BaseChannel):
) )
) )
# Add /start command handler
from telegram.ext import CommandHandler
self._app.add_handler(CommandHandler("start", self._on_start))
logger.info("Starting Telegram bot (polling mode)...") logger.info("Starting Telegram bot (polling mode)...")
# Initialize and start polling # Initialize and start polling
await self._app.initialize() await self._app.initialize()
await self._app.start() await self._app.start()
# Get bot info # Get bot info and register command menu
bot_info = await self._app.bot.get_me() bot_info = await self._app.bot.get_me()
logger.info(f"Telegram bot @{bot_info.username} connected") logger.info(f"Telegram bot @{bot_info.username} connected")
try:
await self._app.bot.set_my_commands(self.BOT_COMMANDS)
logger.debug("Telegram bot commands registered")
except Exception as e:
logger.warning(f"Failed to register bot commands: {e}")
# Start polling (this runs until stopped) # Start polling (this runs until stopped)
await self._app.updater.start_polling( await self._app.updater.start_polling(
allowed_updates=["message"], allowed_updates=["message"],
@ -144,6 +171,10 @@ class TelegramChannel(BaseChannel):
"""Stop the Telegram bot.""" """Stop the Telegram bot."""
self._running = False self._running = False
# Cancel all typing indicators
for chat_id in list(self._typing_tasks):
self._stop_typing(chat_id)
if self._app: if self._app:
logger.info("Stopping Telegram bot...") logger.info("Stopping Telegram bot...")
await self._app.updater.stop() await self._app.updater.stop()
@ -157,6 +188,9 @@ class TelegramChannel(BaseChannel):
logger.warning("Telegram bot not running") logger.warning("Telegram bot not running")
return return
# Stop typing indicator for this chat
self._stop_typing(msg.chat_id)
try: try:
# chat_id should be the Telegram chat ID (integer) # chat_id should be the Telegram chat ID (integer)
chat_id = int(msg.chat_id) chat_id = int(msg.chat_id)
@ -188,9 +222,45 @@ class TelegramChannel(BaseChannel):
user = update.effective_user user = update.effective_user
await update.message.reply_text( await update.message.reply_text(
f"👋 Hi {user.first_name}! I'm nanobot.\n\n" f"👋 Hi {user.first_name}! I'm nanobot.\n\n"
"Send me a message and I'll respond!" "Send me a message and I'll respond!\n"
"Type /help to see available commands."
) )
async def _on_reset(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle /reset command — clear conversation history."""
if not update.message or not update.effective_user:
return
chat_id = str(update.message.chat_id)
session_key = f"{self.name}:{chat_id}"
if self.session_manager is None:
logger.warning("/reset called but session_manager is not available")
await update.message.reply_text("⚠️ Session management is not available.")
return
session = self.session_manager.get_or_create(session_key)
msg_count = len(session.messages)
session.clear()
self.session_manager.save(session)
logger.info(f"Session reset for {session_key} (cleared {msg_count} messages)")
await update.message.reply_text("🔄 Conversation history cleared. Let's start fresh!")
async def _on_help(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle /help command — show available commands."""
if not update.message:
return
help_text = (
"🐈 <b>nanobot commands</b>\n\n"
"/start — Start the bot\n"
"/reset — Reset conversation history\n"
"/help — Show this help message\n\n"
"Just send me a text message to chat!"
)
await update.message.reply_text(help_text, parse_mode="HTML")
async def _on_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: async def _on_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle incoming messages (text, photos, voice, documents).""" """Handle incoming messages (text, photos, voice, documents)."""
if not update.message or not update.effective_user: if not update.message or not update.effective_user:
@ -273,10 +343,15 @@ class TelegramChannel(BaseChannel):
logger.debug(f"Telegram message from {sender_id}: {content[:50]}...") logger.debug(f"Telegram message from {sender_id}: {content[:50]}...")
str_chat_id = str(chat_id)
# Start typing indicator before processing
self._start_typing(str_chat_id)
# Forward to the message bus # Forward to the message bus
await self._handle_message( await self._handle_message(
sender_id=sender_id, sender_id=sender_id,
chat_id=str(chat_id), chat_id=str_chat_id,
content=content, content=content,
media=media_paths, media=media_paths,
metadata={ metadata={
@ -288,6 +363,29 @@ class TelegramChannel(BaseChannel):
} }
) )
def _start_typing(self, chat_id: str) -> None:
"""Start sending 'typing...' indicator for a chat."""
# Cancel any existing typing task for this chat
self._stop_typing(chat_id)
self._typing_tasks[chat_id] = asyncio.create_task(self._typing_loop(chat_id))
def _stop_typing(self, chat_id: str) -> None:
"""Stop the typing indicator for a chat."""
task = self._typing_tasks.pop(chat_id, None)
if task and not task.done():
task.cancel()
async def _typing_loop(self, chat_id: str) -> None:
"""Repeatedly send 'typing' action until cancelled."""
try:
while self._app:
await self._app.bot.send_chat_action(chat_id=int(chat_id), action="typing")
await asyncio.sleep(4)
except asyncio.CancelledError:
pass
except Exception as e:
logger.debug(f"Typing indicator stopped for {chat_id}: {e}")
def _get_extension(self, media_type: str, mime_type: str | None) -> str: def _get_extension(self, media_type: str, mime_type: str | None) -> str:
"""Get file extension based on media type.""" """Get file extension based on media type."""
if mime_type: if mime_type:
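
The `/reset` handler keys the session as `f"{self.name}:{chat_id}"`, the same `channel:chat_id` shape used elsewhere (e.g. `cli:default`). A rough, hypothetical equivalent of what the handler does for chat `123456`, assuming a `SessionManager` built on the same workspace as the gateway:

```python
from nanobot.config.loader import load_config
from nanobot.session.manager import SessionManager

config = load_config()
sessions = SessionManager(config.workspace_path)

session = sessions.get_or_create("telegram:123456")  # "<channel>:<chat_id>"
session.clear()
sessions.save(session)
```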

View File

@ -100,21 +100,25 @@ class WhatsAppChannel(BaseChannel):
if msg_type == "message": if msg_type == "message":
# Incoming message from WhatsApp # Incoming message from WhatsApp
# Deprecated by WhatsApp: old phone-number style JID, typically <phone>@s.whatsapp.net
pn = data.get("pn", "")
# New LID-style identifier:
sender = data.get("sender", "") sender = data.get("sender", "")
content = data.get("content", "") content = data.get("content", "")
# sender is typically: <phone>@s.whatsapp.net # Extract just the phone number or lid as chat_id
# Extract just the phone number as chat_id user_id = pn if pn else sender
chat_id = sender.split("@")[0] if "@" in sender else sender sender_id = user_id.split("@")[0] if "@" in user_id else user_id
logger.info(f"Sender {sender}")
# Handle voice transcription if it's a voice message # Handle voice transcription if it's a voice message
if content == "[Voice Message]": if content == "[Voice Message]":
logger.info(f"Voice message received from {chat_id}, but direct download from bridge is not yet supported.") logger.info(f"Voice message received from {sender_id}, but direct download from bridge is not yet supported.")
content = "[Voice Message: Transcription not available for WhatsApp yet]" content = "[Voice Message: Transcription not available for WhatsApp yet]"
await self._handle_message( await self._handle_message(
sender_id=chat_id, sender_id=sender_id,
chat_id=sender, # Use full JID for replies chat_id=sender, # Use full LID for replies
content=content, content=content,
metadata={ metadata={
"message_id": data.get("id"), "message_id": data.get("id"),

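A worked example of the new `pn`/`sender` handling; every identifier below is made up, and the exact LID suffix is an assumption since the diff does not show it:

```python
# Hypothetical bridge payload after the LID migration (identifiers are made up,
# including the "@lid" suffix, which this diff does not spell out).
data = {
    "pn": "8613800138000@s.whatsapp.net",  # legacy phone-number JID, may be empty
    "sender": "98765432101234@lid",        # new LID-style identifier
    "content": "hello",
}

user_id = data.get("pn", "") or data.get("sender", "")
sender_id = user_id.split("@")[0] if "@" in user_id else user_id
chat_id = data.get("sender", "")

print(sender_id)  # "8613800138000" -> used for allow_from checks
print(chat_id)    # "98765432101234@lid" -> full identifier used for replies
```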
View File

@ -1,11 +1,19 @@
"""CLI commands for nanobot.""" """CLI commands for nanobot."""
import asyncio import asyncio
import atexit
import os
import signal
from pathlib import Path from pathlib import Path
import select
import sys
import typer import typer
from rich.console import Console from rich.console import Console
from rich.markdown import Markdown
from rich.panel import Panel
from rich.table import Table from rich.table import Table
from rich.text import Text
from nanobot import __version__, __logo__ from nanobot import __version__, __logo__
@ -16,6 +24,146 @@ app = typer.Typer(
) )
console = Console() console = Console()
EXIT_COMMANDS = {"exit", "quit", "/exit", "/quit", ":q"}
# ---------------------------------------------------------------------------
# Lightweight CLI input: readline for arrow keys / history, termios for flush
# ---------------------------------------------------------------------------
_READLINE = None
_HISTORY_FILE: Path | None = None
_HISTORY_HOOK_REGISTERED = False
_USING_LIBEDIT = False
_SAVED_TERM_ATTRS = None # original termios settings, restored on exit
def _flush_pending_tty_input() -> None:
"""Drop unread keypresses typed while the model was generating output."""
try:
fd = sys.stdin.fileno()
if not os.isatty(fd):
return
except Exception:
return
try:
import termios
termios.tcflush(fd, termios.TCIFLUSH)
return
except Exception:
pass
try:
while True:
ready, _, _ = select.select([fd], [], [], 0)
if not ready:
break
if not os.read(fd, 4096):
break
except Exception:
return
def _save_history() -> None:
if _READLINE is None or _HISTORY_FILE is None:
return
try:
_READLINE.write_history_file(str(_HISTORY_FILE))
except Exception:
return
def _restore_terminal() -> None:
"""Restore terminal to its original state (echo, line buffering, etc.)."""
if _SAVED_TERM_ATTRS is None:
return
try:
import termios
termios.tcsetattr(sys.stdin.fileno(), termios.TCSADRAIN, _SAVED_TERM_ATTRS)
except Exception:
pass
def _enable_line_editing() -> None:
"""Enable readline for arrow keys, line editing, and persistent history."""
global _READLINE, _HISTORY_FILE, _HISTORY_HOOK_REGISTERED, _USING_LIBEDIT, _SAVED_TERM_ATTRS
# Save terminal state before readline touches it
try:
import termios
_SAVED_TERM_ATTRS = termios.tcgetattr(sys.stdin.fileno())
except Exception:
pass
history_file = Path.home() / ".nanobot" / "history" / "cli_history"
history_file.parent.mkdir(parents=True, exist_ok=True)
_HISTORY_FILE = history_file
try:
import readline
except ImportError:
return
_READLINE = readline
_USING_LIBEDIT = "libedit" in (readline.__doc__ or "").lower()
try:
if _USING_LIBEDIT:
readline.parse_and_bind("bind ^I rl_complete")
else:
readline.parse_and_bind("tab: complete")
readline.parse_and_bind("set editing-mode emacs")
except Exception:
pass
try:
readline.read_history_file(str(history_file))
except Exception:
pass
if not _HISTORY_HOOK_REGISTERED:
atexit.register(_save_history)
_HISTORY_HOOK_REGISTERED = True
def _prompt_text() -> str:
"""Build a readline-friendly colored prompt."""
if _READLINE is None:
return "You: "
# libedit on macOS does not honor GNU readline non-printing markers.
if _USING_LIBEDIT:
return "\033[1;34mYou:\033[0m "
return "\001\033[1;34m\002You:\001\033[0m\002 "
def _print_agent_response(response: str, render_markdown: bool) -> None:
"""Render assistant response with consistent terminal styling."""
content = response or ""
body = Markdown(content) if render_markdown else Text(content)
console.print()
console.print(
Panel(
body,
title=f"{__logo__} nanobot",
title_align="left",
border_style="cyan",
padding=(0, 1),
)
)
console.print()
def _is_exit_command(command: str) -> bool:
"""Return True when input should end interactive chat."""
return command.lower() in EXIT_COMMANDS
async def _read_interactive_input_async() -> str:
"""Read user input with arrow keys and history (runs input() in a thread)."""
try:
return await asyncio.to_thread(input, _prompt_text())
except EOFError as exc:
raise KeyboardInterrupt from exc
def version_callback(value: bool): def version_callback(value: bool):
@ -147,6 +295,24 @@ This file stores important information that should persist across sessions.
console.print(" [dim]Created memory/MEMORY.md[/dim]") console.print(" [dim]Created memory/MEMORY.md[/dim]")
def _make_provider(config):
"""Create LiteLLMProvider from config. Exits if no API key found."""
from nanobot.providers.litellm_provider import LiteLLMProvider
p = config.get_provider()
model = config.agents.defaults.model
if not (p and p.api_key) and not model.startswith("bedrock/"):
console.print("[red]Error: No API key configured.[/red]")
console.print("Set one in ~/.nanobot/config.json under providers section")
raise typer.Exit(1)
return LiteLLMProvider(
api_key=p.api_key if p else None,
api_base=config.get_api_base(),
default_model=model,
extra_headers=p.extra_headers if p else None,
provider_name=config.get_provider_name(),
)
# ============================================================================ # ============================================================================
# Gateway / Server # Gateway / Server
# ============================================================================ # ============================================================================
@ -160,9 +326,9 @@ def gateway(
"""Start the nanobot gateway.""" """Start the nanobot gateway."""
from nanobot.config.loader import load_config, get_data_dir from nanobot.config.loader import load_config, get_data_dir
from nanobot.bus.queue import MessageBus from nanobot.bus.queue import MessageBus
from nanobot.providers.litellm_provider import LiteLLMProvider
from nanobot.agent.loop import AgentLoop from nanobot.agent.loop import AgentLoop
from nanobot.channels.manager import ChannelManager from nanobot.channels.manager import ChannelManager
from nanobot.session.manager import SessionManager
from nanobot.cron.service import CronService from nanobot.cron.service import CronService
from nanobot.cron.types import CronJob from nanobot.cron.types import CronJob
from nanobot.heartbeat.service import HeartbeatService from nanobot.heartbeat.service import HeartbeatService
@ -174,28 +340,15 @@ def gateway(
console.print(f"{__logo__} Starting nanobot gateway on port {port}...") console.print(f"{__logo__} Starting nanobot gateway on port {port}...")
config = load_config() config = load_config()
# Create components
bus = MessageBus() bus = MessageBus()
provider = _make_provider(config)
session_manager = SessionManager(config.workspace_path)
# Create provider (supports OpenRouter, Anthropic, OpenAI, Bedrock) # Create cron service first (callback set after agent creation)
api_key = config.get_api_key() cron_store_path = get_data_dir() / "cron" / "jobs.json"
api_base = config.get_api_base() cron = CronService(cron_store_path)
model = config.agents.defaults.model
is_bedrock = model.startswith("bedrock/")
if not api_key and not is_bedrock:
console.print("[red]Error: No API key configured.[/red]")
console.print("Set one in ~/.nanobot/config.json under providers.openrouter.apiKey")
raise typer.Exit(1)
provider = LiteLLMProvider( # Create agent with cron service
api_key=api_key,
api_base=api_base,
default_model=config.agents.defaults.model
)
# Create agent
agent = AgentLoop( agent = AgentLoop(
bus=bus, bus=bus,
provider=provider, provider=provider,
@ -204,27 +357,29 @@ def gateway(
max_iterations=config.agents.defaults.max_tool_iterations, max_iterations=config.agents.defaults.max_tool_iterations,
brave_api_key=config.tools.web.search.api_key or None, brave_api_key=config.tools.web.search.api_key or None,
exec_config=config.tools.exec, exec_config=config.tools.exec,
cron_service=cron,
restrict_to_workspace=config.tools.restrict_to_workspace,
session_manager=session_manager,
) )
# Create cron service # Set cron callback (needs agent)
async def on_cron_job(job: CronJob) -> str | None: async def on_cron_job(job: CronJob) -> str | None:
"""Execute a cron job through the agent.""" """Execute a cron job through the agent."""
response = await agent.process_direct( response = await agent.process_direct(
job.payload.message, job.payload.message,
session_key=f"cron:{job.id}" session_key=f"cron:{job.id}",
channel=job.payload.channel or "cli",
chat_id=job.payload.to or "direct",
) )
# Optionally deliver to channel
if job.payload.deliver and job.payload.to: if job.payload.deliver and job.payload.to:
from nanobot.bus.events import OutboundMessage from nanobot.bus.events import OutboundMessage
await bus.publish_outbound(OutboundMessage( await bus.publish_outbound(OutboundMessage(
channel=job.payload.channel or "whatsapp", channel=job.payload.channel or "cli",
chat_id=job.payload.to, chat_id=job.payload.to,
content=response or "" content=response or ""
)) ))
return response return response
cron.on_job = on_cron_job
cron_store_path = get_data_dir() / "cron" / "jobs.json"
cron = CronService(cron_store_path, on_job=on_cron_job)
# Create heartbeat service # Create heartbeat service
async def on_heartbeat(prompt: str) -> str: async def on_heartbeat(prompt: str) -> str:
@ -239,7 +394,7 @@ def gateway(
) )
# Create channel manager # Create channel manager
channels = ChannelManager(config, bus) channels = ChannelManager(config, bus, session_manager=session_manager)
if channels.enabled_channels: if channels.enabled_channels:
console.print(f"[green]✓[/green] Channels enabled: {', '.join(channels.enabled_channels)}") console.print(f"[green]✓[/green] Channels enabled: {', '.join(channels.enabled_channels)}")
@ -281,30 +436,24 @@ def gateway(
def agent( def agent(
message: str = typer.Option(None, "--message", "-m", help="Message to send to the agent"), message: str = typer.Option(None, "--message", "-m", help="Message to send to the agent"),
session_id: str = typer.Option("cli:default", "--session", "-s", help="Session ID"), session_id: str = typer.Option("cli:default", "--session", "-s", help="Session ID"),
markdown: bool = typer.Option(True, "--markdown/--no-markdown", help="Render assistant output as Markdown"),
logs: bool = typer.Option(False, "--logs/--no-logs", help="Show nanobot runtime logs during chat"),
): ):
"""Interact with the agent directly.""" """Interact with the agent directly."""
from nanobot.config.loader import load_config from nanobot.config.loader import load_config
from nanobot.bus.queue import MessageBus from nanobot.bus.queue import MessageBus
from nanobot.providers.litellm_provider import LiteLLMProvider
from nanobot.agent.loop import AgentLoop from nanobot.agent.loop import AgentLoop
from loguru import logger
config = load_config() config = load_config()
api_key = config.get_api_key()
api_base = config.get_api_base()
model = config.agents.defaults.model
is_bedrock = model.startswith("bedrock/")
if not api_key and not is_bedrock:
console.print("[red]Error: No API key configured.[/red]")
raise typer.Exit(1)
bus = MessageBus() bus = MessageBus()
provider = LiteLLMProvider( provider = _make_provider(config)
api_key=api_key,
api_base=api_base, if logs:
default_model=config.agents.defaults.model logger.enable("nanobot")
) else:
logger.disable("nanobot")
agent_loop = AgentLoop( agent_loop = AgentLoop(
bus=bus, bus=bus,
@ -312,29 +461,65 @@ def agent(
workspace=config.workspace_path, workspace=config.workspace_path,
brave_api_key=config.tools.web.search.api_key or None, brave_api_key=config.tools.web.search.api_key or None,
exec_config=config.tools.exec, exec_config=config.tools.exec,
restrict_to_workspace=config.tools.restrict_to_workspace,
) )
# Show spinner when logs are off (no output to miss); skip when logs are on
def _thinking_ctx():
if logs:
from contextlib import nullcontext
return nullcontext()
return console.status("[dim]nanobot is thinking...[/dim]", spinner="dots")
if message: if message:
# Single message mode # Single message mode
async def run_once(): async def run_once():
response = await agent_loop.process_direct(message, session_id) with _thinking_ctx():
console.print(f"\n{__logo__} {response}") response = await agent_loop.process_direct(message, session_id)
_print_agent_response(response, render_markdown=markdown)
asyncio.run(run_once()) asyncio.run(run_once())
else: else:
# Interactive mode # Interactive mode
console.print(f"{__logo__} Interactive mode (Ctrl+C to exit)\n") _enable_line_editing()
console.print(f"{__logo__} Interactive mode (type [bold]exit[/bold] or [bold]Ctrl+C[/bold] to quit)\n")
# input() runs in a worker thread that can't be cancelled.
# Without this handler, asyncio.run() would hang waiting for it.
def _exit_on_sigint(signum, frame):
_save_history()
_restore_terminal()
console.print("\nGoodbye!")
os._exit(0)
signal.signal(signal.SIGINT, _exit_on_sigint)
async def run_interactive(): async def run_interactive():
while True: while True:
try: try:
user_input = console.input("[bold blue]You:[/bold blue] ") _flush_pending_tty_input()
if not user_input.strip(): user_input = await _read_interactive_input_async()
command = user_input.strip()
if not command:
continue continue
if _is_exit_command(command):
_save_history()
_restore_terminal()
console.print("\nGoodbye!")
break
response = await agent_loop.process_direct(user_input, session_id) with _thinking_ctx():
console.print(f"\n{__logo__} {response}\n") response = await agent_loop.process_direct(user_input, session_id)
_print_agent_response(response, render_markdown=markdown)
except KeyboardInterrupt: except KeyboardInterrupt:
_save_history()
_restore_terminal()
console.print("\nGoodbye!")
break
except EOFError:
_save_history()
_restore_terminal()
console.print("\nGoodbye!") console.print("\nGoodbye!")
break break
@ -370,6 +555,13 @@ def channels_status():
wa.bridge_url wa.bridge_url
) )
dc = config.channels.discord
table.add_row(
"Discord",
"✓" if dc.enabled else "✗",
dc.gateway_url
)
# Telegram # Telegram
tg = config.channels.telegram tg = config.channels.telegram
tg_config = f"token: {tg.token[:10]}..." if tg.token else "[dim]not configured[/dim]" tg_config = f"token: {tg.token[:10]}..." if tg.token else "[dim]not configured[/dim]"
@ -644,21 +836,24 @@ def status():
console.print(f"Workspace: {workspace} {'[green]✓[/green]' if workspace.exists() else '[red]✗[/red]'}") console.print(f"Workspace: {workspace} {'[green]✓[/green]' if workspace.exists() else '[red]✗[/red]'}")
if config_path.exists(): if config_path.exists():
from nanobot.providers.registry import PROVIDERS
console.print(f"Model: {config.agents.defaults.model}") console.print(f"Model: {config.agents.defaults.model}")
# Check API keys # Check API keys from registry
has_openrouter = bool(config.providers.openrouter.api_key) for spec in PROVIDERS:
has_anthropic = bool(config.providers.anthropic.api_key) p = getattr(config.providers, spec.name, None)
has_openai = bool(config.providers.openai.api_key) if p is None:
has_gemini = bool(config.providers.gemini.api_key) continue
has_vllm = bool(config.providers.vllm.api_base) if spec.is_local:
# Local deployments show api_base instead of api_key
console.print(f"OpenRouter API: {'[green]✓[/green]' if has_openrouter else '[dim]not set[/dim]'}") if p.api_base:
console.print(f"Anthropic API: {'[green]✓[/green]' if has_anthropic else '[dim]not set[/dim]'}") console.print(f"{spec.label}: [green]✓ {p.api_base}[/green]")
console.print(f"OpenAI API: {'[green]✓[/green]' if has_openai else '[dim]not set[/dim]'}") else:
console.print(f"Gemini API: {'[green]✓[/green]' if has_gemini else '[dim]not set[/dim]'}") console.print(f"{spec.label}: [dim]not set[/dim]")
vllm_status = f"[green]✓ {config.providers.vllm.api_base}[/green]" if has_vllm else "[dim]not set[/dim]" else:
console.print(f"vLLM/Local: {vllm_status}") has_key = bool(p.api_key)
console.print(f"{spec.label}: {'[green]✓[/green]' if has_key else '[dim]not set[/dim]'}")
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -34,6 +34,7 @@ def load_config(config_path: Path | None = None) -> Config:
try: try:
with open(path) as f: with open(path) as f:
data = json.load(f) data = json.load(f)
data = _migrate_config(data)
return Config.model_validate(convert_keys(data)) return Config.model_validate(convert_keys(data))
except (json.JSONDecodeError, ValueError) as e: except (json.JSONDecodeError, ValueError) as e:
print(f"Warning: Failed to load config from {path}: {e}") print(f"Warning: Failed to load config from {path}: {e}")
@ -61,6 +62,16 @@ def save_config(config: Config, config_path: Path | None = None) -> None:
json.dump(data, f, indent=2) json.dump(data, f, indent=2)
def _migrate_config(data: dict) -> dict:
"""Migrate old config formats to current."""
# Move tools.exec.restrictToWorkspace → tools.restrictToWorkspace
tools = data.get("tools", {})
exec_cfg = tools.get("exec", {})
if "restrictToWorkspace" in exec_cfg and "restrictToWorkspace" not in tools:
tools["restrictToWorkspace"] = exec_cfg.pop("restrictToWorkspace")
return data
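For illustration, a minimal sketch of what this migration does to an old-style dict (the nested camelCase shape follows the comment above; this snippet is not part of the diff):

```python
# Hypothetical pre-migration config (camelCase, before convert_keys runs)
old_data = {"tools": {"exec": {"timeout": 60, "restrictToWorkspace": True}}}

new_data = _migrate_config(old_data)

# The flag moves from tools.exec up to tools; everything else is untouched.
assert new_data["tools"]["restrictToWorkspace"] is True
assert "restrictToWorkspace" not in new_data["tools"]["exec"]
```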
def convert_keys(data: Any) -> Any: def convert_keys(data: Any) -> Any:
"""Convert camelCase keys to snake_case for Pydantic.""" """Convert camelCase keys to snake_case for Pydantic."""
if isinstance(data, dict): if isinstance(data, dict):
@ -17,6 +17,64 @@ class TelegramConfig(BaseModel):
enabled: bool = False enabled: bool = False
token: str = "" # Bot token from @BotFather token: str = "" # Bot token from @BotFather
allow_from: list[str] = Field(default_factory=list) # Allowed user IDs or usernames allow_from: list[str] = Field(default_factory=list) # Allowed user IDs or usernames
proxy: str | None = None # HTTP/SOCKS5 proxy URL, e.g. "http://127.0.0.1:7890" or "socks5://127.0.0.1:1080"
class FeishuConfig(BaseModel):
"""Feishu/Lark channel configuration using WebSocket long connection."""
enabled: bool = False
app_id: str = "" # App ID from Feishu Open Platform
app_secret: str = "" # App Secret from Feishu Open Platform
encrypt_key: str = "" # Encrypt Key for event subscription (optional)
verification_token: str = "" # Verification Token for event subscription (optional)
allow_from: list[str] = Field(default_factory=list) # Allowed user open_ids
class DingTalkConfig(BaseModel):
"""DingTalk channel configuration using Stream mode."""
enabled: bool = False
client_id: str = "" # AppKey
client_secret: str = "" # AppSecret
allow_from: list[str] = Field(default_factory=list) # Allowed staff_ids
class DiscordConfig(BaseModel):
"""Discord channel configuration."""
enabled: bool = False
token: str = "" # Bot token from Discord Developer Portal
allow_from: list[str] = Field(default_factory=list) # Allowed user IDs
gateway_url: str = "wss://gateway.discord.gg/?v=10&encoding=json"
intents: int = 37377 # GUILDS + GUILD_MESSAGES + DIRECT_MESSAGES + MESSAGE_CONTENT
class EmailConfig(BaseModel):
"""Email channel configuration (IMAP inbound + SMTP outbound)."""
enabled: bool = False
consent_granted: bool = False # Explicit owner permission to access mailbox data
# IMAP (receive)
imap_host: str = ""
imap_port: int = 993
imap_username: str = ""
imap_password: str = ""
imap_mailbox: str = "INBOX"
imap_use_ssl: bool = True
# SMTP (send)
smtp_host: str = ""
smtp_port: int = 587
smtp_username: str = ""
smtp_password: str = ""
smtp_use_tls: bool = True
smtp_use_ssl: bool = False
from_address: str = ""
# Behavior
auto_reply_enabled: bool = True # If false, inbound email is read but no automatic reply is sent
poll_interval_seconds: int = 30
mark_seen: bool = True
max_body_chars: int = 12000
subject_prefix: str = "Re: "
allow_from: list[str] = Field(default_factory=list) # Allowed sender email addresses
class SlackDMConfig(BaseModel): class SlackDMConfig(BaseModel):
@ -43,6 +101,10 @@ class ChannelsConfig(BaseModel):
"""Configuration for chat channels.""" """Configuration for chat channels."""
whatsapp: WhatsAppConfig = Field(default_factory=WhatsAppConfig) whatsapp: WhatsAppConfig = Field(default_factory=WhatsAppConfig)
telegram: TelegramConfig = Field(default_factory=TelegramConfig) telegram: TelegramConfig = Field(default_factory=TelegramConfig)
discord: DiscordConfig = Field(default_factory=DiscordConfig)
feishu: FeishuConfig = Field(default_factory=FeishuConfig)
dingtalk: DingTalkConfig = Field(default_factory=DingTalkConfig)
email: EmailConfig = Field(default_factory=EmailConfig)
slack: SlackConfig = Field(default_factory=SlackConfig) slack: SlackConfig = Field(default_factory=SlackConfig)
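As a rough sketch of how the new channel sections can be populated programmatically (the tokens, hosts, and IDs below are placeholders, not values from this commit):

```python
from nanobot.config.schema import ChannelsConfig, DiscordConfig, EmailConfig

channels = ChannelsConfig(
    discord=DiscordConfig(
        enabled=True,
        token="discord-bot-token",          # placeholder
        allow_from=["123456789012345678"],  # Discord user IDs
    ),
    email=EmailConfig(
        enabled=True,
        consent_granted=True,               # required before the channel reads the mailbox
        imap_host="imap.example.com",
        imap_username="bot@example.com",
        imap_password="app-password",
        smtp_host="smtp.example.com",
        smtp_username="bot@example.com",
        smtp_password="app-password",
        from_address="bot@example.com",
    ),
)
# Channels left unset (telegram, feishu, dingtalk, slack, whatsapp) keep their defaults.
```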
@ -64,6 +126,7 @@ class ProviderConfig(BaseModel):
"""LLM provider configuration.""" """LLM provider configuration."""
api_key: str = "" api_key: str = ""
api_base: str | None = None api_base: str | None = None
extra_headers: dict[str, str] | None = None # Custom headers (e.g. APP-Code for AiHubMix)
class ProvidersConfig(BaseModel): class ProvidersConfig(BaseModel):
@ -71,10 +134,14 @@ class ProvidersConfig(BaseModel):
anthropic: ProviderConfig = Field(default_factory=ProviderConfig) anthropic: ProviderConfig = Field(default_factory=ProviderConfig)
openai: ProviderConfig = Field(default_factory=ProviderConfig) openai: ProviderConfig = Field(default_factory=ProviderConfig)
openrouter: ProviderConfig = Field(default_factory=ProviderConfig) openrouter: ProviderConfig = Field(default_factory=ProviderConfig)
deepseek: ProviderConfig = Field(default_factory=ProviderConfig)
groq: ProviderConfig = Field(default_factory=ProviderConfig) groq: ProviderConfig = Field(default_factory=ProviderConfig)
zhipu: ProviderConfig = Field(default_factory=ProviderConfig) zhipu: ProviderConfig = Field(default_factory=ProviderConfig)
dashscope: ProviderConfig = Field(default_factory=ProviderConfig) # Alibaba Cloud Qwen (Tongyi Qianwen)
vllm: ProviderConfig = Field(default_factory=ProviderConfig) vllm: ProviderConfig = Field(default_factory=ProviderConfig)
gemini: ProviderConfig = Field(default_factory=ProviderConfig) gemini: ProviderConfig = Field(default_factory=ProviderConfig)
moonshot: ProviderConfig = Field(default_factory=ProviderConfig)
aihubmix: ProviderConfig = Field(default_factory=ProviderConfig) # AiHubMix API gateway
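And a similar sketch for the extended providers section; the keys and APP-Code value are placeholders, and the notes in the comments follow the registry described further down:

```python
from nanobot.config.schema import ProvidersConfig, ProviderConfig

providers = ProvidersConfig(
    dashscope=ProviderConfig(api_key="sk-dashscope-xxx"),   # Qwen models
    moonshot=ProviderConfig(api_key="sk-moonshot-xxx"),     # endpoint falls back to the registry default
    aihubmix=ProviderConfig(
        api_key="sk-aihubmix-xxx",
        api_base="https://aihubmix.com/v1",
        extra_headers={"APP-Code": "your-app-code"},        # forwarded to LiteLLM as extra_headers
    ),
)
```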
class GatewayConfig(BaseModel): class GatewayConfig(BaseModel):
@ -97,13 +164,13 @@ class WebToolsConfig(BaseModel):
class ExecToolConfig(BaseModel): class ExecToolConfig(BaseModel):
"""Shell exec tool configuration.""" """Shell exec tool configuration."""
timeout: int = 60 timeout: int = 60
restrict_to_workspace: bool = False # If true, block commands accessing paths outside workspace
class ToolsConfig(BaseModel): class ToolsConfig(BaseModel):
"""Tools configuration.""" """Tools configuration."""
web: WebToolsConfig = Field(default_factory=WebToolsConfig) web: WebToolsConfig = Field(default_factory=WebToolsConfig)
exec: ExecToolConfig = Field(default_factory=ExecToolConfig) exec: ExecToolConfig = Field(default_factory=ExecToolConfig)
restrict_to_workspace: bool = False # If true, restrict all tool access to workspace directory
class Config(BaseSettings): class Config(BaseSettings):
@ -119,27 +186,52 @@ class Config(BaseSettings):
"""Get expanded workspace path.""" """Get expanded workspace path."""
return Path(self.agents.defaults.workspace).expanduser() return Path(self.agents.defaults.workspace).expanduser()
def get_api_key(self) -> str | None: def _match_provider(self, model: str | None = None) -> tuple["ProviderConfig | None", str | None]:
"""Get API key in priority order: OpenRouter > Anthropic > OpenAI > Gemini > Zhipu > Groq > vLLM.""" """Match provider config and its registry name. Returns (config, spec_name)."""
return ( from nanobot.providers.registry import PROVIDERS
self.providers.openrouter.api_key or model_lower = (model or self.agents.defaults.model).lower()
self.providers.anthropic.api_key or
self.providers.openai.api_key or # Match by keyword (order follows PROVIDERS registry)
self.providers.gemini.api_key or for spec in PROVIDERS:
self.providers.zhipu.api_key or p = getattr(self.providers, spec.name, None)
self.providers.groq.api_key or if p and any(kw in model_lower for kw in spec.keywords) and p.api_key:
self.providers.vllm.api_key or return p, spec.name
None
) # Fallback: gateways first, then others (follows registry order)
for spec in PROVIDERS:
p = getattr(self.providers, spec.name, None)
if p and p.api_key:
return p, spec.name
return None, None
def get_provider(self, model: str | None = None) -> ProviderConfig | None:
"""Get matched provider config (api_key, api_base, extra_headers). Falls back to first available."""
p, _ = self._match_provider(model)
return p
def get_provider_name(self, model: str | None = None) -> str | None:
"""Get the registry name of the matched provider (e.g. "deepseek", "openrouter")."""
_, name = self._match_provider(model)
return name
def get_api_key(self, model: str | None = None) -> str | None:
"""Get API key for the given model. Falls back to first available key."""
p = self.get_provider(model)
return p.api_key if p else None
def get_api_base(self) -> str | None: def get_api_base(self, model: str | None = None) -> str | None:
"""Get API base URL if using OpenRouter, Zhipu or vLLM.""" """Get API base URL for the given model. Applies default URLs for known gateways."""
if self.providers.openrouter.api_key: from nanobot.providers.registry import find_by_name
return self.providers.openrouter.api_base or "https://openrouter.ai/api/v1" p, name = self._match_provider(model)
if self.providers.zhipu.api_key: if p and p.api_base:
return self.providers.zhipu.api_base return p.api_base
if self.providers.vllm.api_base: # Only gateways get a default api_base here. Standard providers
return self.providers.vllm.api_base # (like Moonshot) set their base URL via env vars in _setup_env
# to avoid polluting the global litellm.api_base.
if name:
spec = find_by_name(name)
if spec and spec.is_gateway and spec.default_api_base:
return spec.default_api_base
return None return None
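A hedged illustration of the new model-based lookup, assuming `load_config` from the loader patched above is in scope; the keys are placeholders and the fallback depends on which providers are actually configured:

```python
config = load_config()  # ~/.nanobot/config.json

# Suppose only providers.deepseek.api_key is set:
config.get_provider_name("deepseek-chat")   # -> "deepseek" (keyword match)
config.get_api_key("deepseek-chat")         # -> the DeepSeek key
config.get_api_base("deepseek-chat")        # -> None (standard providers get no default base here)

# With an OpenRouter key configured as well, a model that matches no keyword
# falls back to the first provider in registry order that has a key, so the
# gateway wins:
config.get_provider_name("some/unknown-model")  # -> "openrouter"
```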
class Config: class Config:
@ -20,6 +20,7 @@ class LLMResponse:
tool_calls: list[ToolCallRequest] = field(default_factory=list) tool_calls: list[ToolCallRequest] = field(default_factory=list)
finish_reason: str = "stop" finish_reason: str = "stop"
usage: dict[str, int] = field(default_factory=dict) usage: dict[str, int] = field(default_factory=dict)
reasoning_content: str | None = None # Kimi, DeepSeek-R1 etc.
@property @property
def has_tool_calls(self) -> bool: def has_tool_calls(self) -> bool:
@ -1,5 +1,6 @@
"""LiteLLM provider implementation for multi-provider support.""" """LiteLLM provider implementation for multi-provider support."""
import json
import os import os
from typing import Any from typing import Any
@ -7,6 +8,7 @@ import litellm
from litellm import acompletion from litellm import acompletion
from nanobot.providers.base import LLMProvider, LLMResponse, ToolCallRequest from nanobot.providers.base import LLMProvider, LLMResponse, ToolCallRequest
from nanobot.providers.registry import find_by_model, find_gateway
class LiteLLMProvider(LLMProvider): class LiteLLMProvider(LLMProvider):
@ -14,51 +16,88 @@ class LiteLLMProvider(LLMProvider):
LLM provider using LiteLLM for multi-provider support. LLM provider using LiteLLM for multi-provider support.
Supports OpenRouter, Anthropic, OpenAI, Gemini, and many other providers through Supports OpenRouter, Anthropic, OpenAI, Gemini, and many other providers through
a unified interface. a unified interface. Provider-specific logic is driven by the registry
(see providers/registry.py); no if-elif chains needed here.
""" """
def __init__( def __init__(
self, self,
api_key: str | None = None, api_key: str | None = None,
api_base: str | None = None, api_base: str | None = None,
default_model: str = "anthropic/claude-opus-4-5" default_model: str = "anthropic/claude-opus-4-5",
extra_headers: dict[str, str] | None = None,
provider_name: str | None = None,
): ):
super().__init__(api_key, api_base) super().__init__(api_key, api_base)
self.default_model = default_model self.default_model = default_model
self.extra_headers = extra_headers or {}
# Detect OpenRouter by api_key prefix or explicit api_base # Detect gateway / local deployment.
self.is_openrouter = ( # provider_name (from config key) is the primary signal;
(api_key and api_key.startswith("sk-or-")) or # api_key / api_base are fallback for auto-detection.
(api_base and "openrouter" in api_base) self._gateway = find_gateway(provider_name, api_key, api_base)
)
# Track if using custom endpoint (vLLM, etc.) # Configure environment variables
self.is_vllm = bool(api_base) and not self.is_openrouter
# Configure LiteLLM based on provider
if api_key: if api_key:
if self.is_openrouter: self._setup_env(api_key, api_base, default_model)
# OpenRouter mode - set key
os.environ["OPENROUTER_API_KEY"] = api_key
elif self.is_vllm:
# vLLM/custom endpoint - uses OpenAI-compatible API
os.environ["OPENAI_API_KEY"] = api_key
elif "anthropic" in default_model:
os.environ.setdefault("ANTHROPIC_API_KEY", api_key)
elif "openai" in default_model or "gpt" in default_model:
os.environ.setdefault("OPENAI_API_KEY", api_key)
elif "gemini" in default_model.lower():
os.environ.setdefault("GEMINI_API_KEY", api_key)
elif "zhipu" in default_model or "glm" in default_model or "zai" in default_model:
os.environ.setdefault("ZHIPUAI_API_KEY", api_key)
elif "groq" in default_model:
os.environ.setdefault("GROQ_API_KEY", api_key)
if api_base: if api_base:
litellm.api_base = api_base litellm.api_base = api_base
# Disable LiteLLM logging noise # Disable LiteLLM logging noise
litellm.suppress_debug_info = True litellm.suppress_debug_info = True
# Drop unsupported parameters for providers (e.g., gpt-5 rejects some params)
litellm.drop_params = True
def _setup_env(self, api_key: str, api_base: str | None, model: str) -> None:
"""Set environment variables based on detected provider."""
spec = self._gateway or find_by_model(model)
if not spec:
return
# Gateway/local overrides existing env; standard provider doesn't
if self._gateway:
os.environ[spec.env_key] = api_key
else:
os.environ.setdefault(spec.env_key, api_key)
# Resolve env_extras placeholders:
# {api_key} → user's API key
# {api_base} → user's api_base, falling back to spec.default_api_base
effective_base = api_base or spec.default_api_base
for env_name, env_val in spec.env_extras:
resolved = env_val.replace("{api_key}", api_key)
resolved = resolved.replace("{api_base}", effective_base)
os.environ.setdefault(env_name, resolved)
def _resolve_model(self, model: str) -> str:
"""Resolve model name by applying provider/gateway prefixes."""
if self._gateway:
# Gateway mode: apply gateway prefix, skip provider-specific prefixes
prefix = self._gateway.litellm_prefix
if self._gateway.strip_model_prefix:
model = model.split("/")[-1]
if prefix and not model.startswith(f"{prefix}/"):
model = f"{prefix}/{model}"
return model
# Standard mode: auto-prefix for known providers
spec = find_by_model(model)
if spec and spec.litellm_prefix:
if not any(model.startswith(s) for s in spec.skip_prefixes):
model = f"{spec.litellm_prefix}/{model}"
return model
def _apply_model_overrides(self, model: str, kwargs: dict[str, Any]) -> None:
"""Apply model-specific parameter overrides from the registry."""
model_lower = model.lower()
spec = find_by_model(model)
if spec:
for pattern, overrides in spec.model_overrides:
if pattern in model_lower:
kwargs.update(overrides)
return
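To make the prefixing rules concrete, a sketch of the expected resolutions; the import path and key strings are assumptions, and the outcomes follow the registry entries:

```python
from nanobot.providers.litellm import LiteLLMProvider  # module path assumed for illustration

# Standard provider: the DeepSeek spec adds its litellm_prefix.
LiteLLMProvider(api_key="sk-xxx", default_model="deepseek-chat")._resolve_model("deepseek-chat")
# -> "deepseek/deepseek-chat"

# An OpenRouter key ("sk-or-" prefix) puts the provider in gateway mode.
LiteLLMProvider(api_key="sk-or-xxx")._resolve_model("anthropic/claude-opus-4-5")
# -> "openrouter/anthropic/claude-opus-4-5"

# AiHubMix (detected via api_base) strips the vendor prefix, then re-prefixes as openai/.
LiteLLMProvider(api_key="sk-xxx", api_base="https://aihubmix.com/v1")._resolve_model("anthropic/claude-opus-4-5")
# -> "openai/claude-opus-4-5"
```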
async def chat( async def chat(
self, self,
@ -81,29 +120,7 @@ class LiteLLMProvider(LLMProvider):
Returns: Returns:
LLMResponse with content and/or tool calls. LLMResponse with content and/or tool calls.
""" """
model = model or self.default_model model = self._resolve_model(model or self.default_model)
# For OpenRouter, prefix model name if not already prefixed
if self.is_openrouter and not model.startswith("openrouter/"):
model = f"openrouter/{model}"
# For Zhipu/Z.ai, ensure prefix is present
# Handle cases like "glm-4.7-flash" -> "zai/glm-4.7-flash"
if ("glm" in model.lower() or "zhipu" in model.lower()) and not (
model.startswith("zhipu/") or
model.startswith("zai/") or
model.startswith("openrouter/")
):
model = f"zai/{model}"
# For vLLM, use hosted_vllm/ prefix per LiteLLM docs
# Convert openai/ prefix to hosted_vllm/ if user specified it
if self.is_vllm:
model = f"hosted_vllm/{model}"
# For Gemini, ensure gemini/ prefix if not already present
if "gemini" in model.lower() and not model.startswith("gemini/"):
model = f"gemini/{model}"
kwargs: dict[str, Any] = { kwargs: dict[str, Any] = {
"model": model, "model": model,
@ -112,10 +129,17 @@ class LiteLLMProvider(LLMProvider):
"temperature": temperature, "temperature": temperature,
} }
# Pass api_base directly for custom endpoints (vLLM, etc.) # Apply model-specific overrides (e.g. kimi-k2.5 temperature)
self._apply_model_overrides(model, kwargs)
# Pass api_base for custom endpoints
if self.api_base: if self.api_base:
kwargs["api_base"] = self.api_base kwargs["api_base"] = self.api_base
# Pass extra headers (e.g. APP-Code for AiHubMix)
if self.extra_headers:
kwargs["extra_headers"] = self.extra_headers
if tools: if tools:
kwargs["tools"] = tools kwargs["tools"] = tools
kwargs["tool_choice"] = "auto" kwargs["tool_choice"] = "auto"
@ -141,7 +165,6 @@ class LiteLLMProvider(LLMProvider):
# Parse arguments from JSON string if needed # Parse arguments from JSON string if needed
args = tc.function.arguments args = tc.function.arguments
if isinstance(args, str): if isinstance(args, str):
import json
try: try:
args = json.loads(args) args = json.loads(args)
except json.JSONDecodeError: except json.JSONDecodeError:
@ -161,11 +184,14 @@ class LiteLLMProvider(LLMProvider):
"total_tokens": response.usage.total_tokens, "total_tokens": response.usage.total_tokens,
} }
reasoning_content = getattr(message, "reasoning_content", None)
return LLMResponse( return LLMResponse(
content=message.content, content=message.content,
tool_calls=tool_calls, tool_calls=tool_calls,
finish_reason=choice.finish_reason or "stop", finish_reason=choice.finish_reason or "stop",
usage=usage, usage=usage,
reasoning_content=reasoning_content,
) )
def get_default_model(self) -> str: def get_default_model(self) -> str:
@ -0,0 +1,340 @@
"""
Provider Registry: single source of truth for LLM provider metadata.
Adding a new provider:
1. Add a ProviderSpec to PROVIDERS below.
2. Add a field to ProvidersConfig in config/schema.py.
Done. Env vars, prefixing, config matching, and status display all derive from here.
Order matters: it controls match priority and fallback. Gateways come first.
Every entry writes out all fields so you can copy-paste as a template.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
@dataclass(frozen=True)
class ProviderSpec:
"""One LLM provider's metadata. See PROVIDERS below for real examples.
Placeholders in env_extras values:
{api_key}: the user's API key
{api_base}: api_base from config, or this spec's default_api_base
"""
# identity
name: str # config field name, e.g. "dashscope"
keywords: tuple[str, ...] # model-name keywords for matching (lowercase)
env_key: str # LiteLLM env var, e.g. "DASHSCOPE_API_KEY"
display_name: str = "" # shown in `nanobot status`
# model prefixing
litellm_prefix: str = "" # "dashscope" → model becomes "dashscope/{model}"
skip_prefixes: tuple[str, ...] = () # don't prefix if model already starts with these
# extra env vars, e.g. (("ZHIPUAI_API_KEY", "{api_key}"),)
env_extras: tuple[tuple[str, str], ...] = ()
# gateway / local detection
is_gateway: bool = False # routes any model (OpenRouter, AiHubMix)
is_local: bool = False # local deployment (vLLM, Ollama)
detect_by_key_prefix: str = "" # match api_key prefix, e.g. "sk-or-"
detect_by_base_keyword: str = "" # match substring in api_base URL
default_api_base: str = "" # fallback base URL
# gateway behavior
strip_model_prefix: bool = False # strip "provider/" before re-prefixing
# per-model param overrides, e.g. (("kimi-k2.5", {"temperature": 1.0}),)
model_overrides: tuple[tuple[str, dict[str, Any]], ...] = ()
@property
def label(self) -> str:
return self.display_name or self.name.title()
# ---------------------------------------------------------------------------
# PROVIDERS — the registry. Order = priority. Copy any entry as template.
# ---------------------------------------------------------------------------
PROVIDERS: tuple[ProviderSpec, ...] = (
# === Gateways (detected by api_key / api_base, not model name) =========
# Gateways can route any model, so they win in fallback.
# OpenRouter: global gateway, keys start with "sk-or-"
ProviderSpec(
name="openrouter",
keywords=("openrouter",),
env_key="OPENROUTER_API_KEY",
display_name="OpenRouter",
litellm_prefix="openrouter", # claude-3 → openrouter/claude-3
skip_prefixes=(),
env_extras=(),
is_gateway=True,
is_local=False,
detect_by_key_prefix="sk-or-",
detect_by_base_keyword="openrouter",
default_api_base="https://openrouter.ai/api/v1",
strip_model_prefix=False,
model_overrides=(),
),
# AiHubMix: global gateway, OpenAI-compatible interface.
# strip_model_prefix=True: it doesn't understand "anthropic/claude-3",
# so we strip to bare "claude-3" then re-prefix as "openai/claude-3".
ProviderSpec(
name="aihubmix",
keywords=("aihubmix",),
env_key="OPENAI_API_KEY", # OpenAI-compatible
display_name="AiHubMix",
litellm_prefix="openai", # → openai/{model}
skip_prefixes=(),
env_extras=(),
is_gateway=True,
is_local=False,
detect_by_key_prefix="",
detect_by_base_keyword="aihubmix",
default_api_base="https://aihubmix.com/v1",
strip_model_prefix=True, # anthropic/claude-3 → claude-3 → openai/claude-3
model_overrides=(),
),
# === Standard providers (matched by model-name keywords) ===============
# Anthropic: LiteLLM recognizes "claude-*" natively, no prefix needed.
ProviderSpec(
name="anthropic",
keywords=("anthropic", "claude"),
env_key="ANTHROPIC_API_KEY",
display_name="Anthropic",
litellm_prefix="",
skip_prefixes=(),
env_extras=(),
is_gateway=False,
is_local=False,
detect_by_key_prefix="",
detect_by_base_keyword="",
default_api_base="",
strip_model_prefix=False,
model_overrides=(),
),
# OpenAI: LiteLLM recognizes "gpt-*" natively, no prefix needed.
ProviderSpec(
name="openai",
keywords=("openai", "gpt"),
env_key="OPENAI_API_KEY",
display_name="OpenAI",
litellm_prefix="",
skip_prefixes=(),
env_extras=(),
is_gateway=False,
is_local=False,
detect_by_key_prefix="",
detect_by_base_keyword="",
default_api_base="",
strip_model_prefix=False,
model_overrides=(),
),
# DeepSeek: needs "deepseek/" prefix for LiteLLM routing.
ProviderSpec(
name="deepseek",
keywords=("deepseek",),
env_key="DEEPSEEK_API_KEY",
display_name="DeepSeek",
litellm_prefix="deepseek", # deepseek-chat → deepseek/deepseek-chat
skip_prefixes=("deepseek/",), # avoid double-prefix
env_extras=(),
is_gateway=False,
is_local=False,
detect_by_key_prefix="",
detect_by_base_keyword="",
default_api_base="",
strip_model_prefix=False,
model_overrides=(),
),
# Gemini: needs "gemini/" prefix for LiteLLM.
ProviderSpec(
name="gemini",
keywords=("gemini",),
env_key="GEMINI_API_KEY",
display_name="Gemini",
litellm_prefix="gemini", # gemini-pro → gemini/gemini-pro
skip_prefixes=("gemini/",), # avoid double-prefix
env_extras=(),
is_gateway=False,
is_local=False,
detect_by_key_prefix="",
detect_by_base_keyword="",
default_api_base="",
strip_model_prefix=False,
model_overrides=(),
),
# Zhipu: LiteLLM uses "zai/" prefix.
# Also mirrors key to ZHIPUAI_API_KEY (some LiteLLM paths check that).
# skip_prefixes: don't add "zai/" when already routed via gateway.
ProviderSpec(
name="zhipu",
keywords=("zhipu", "glm", "zai"),
env_key="ZAI_API_KEY",
display_name="Zhipu AI",
litellm_prefix="zai", # glm-4 → zai/glm-4
skip_prefixes=("zhipu/", "zai/", "openrouter/", "hosted_vllm/"),
env_extras=(
("ZHIPUAI_API_KEY", "{api_key}"),
),
is_gateway=False,
is_local=False,
detect_by_key_prefix="",
detect_by_base_keyword="",
default_api_base="",
strip_model_prefix=False,
model_overrides=(),
),
# DashScope: Qwen models, needs "dashscope/" prefix.
ProviderSpec(
name="dashscope",
keywords=("qwen", "dashscope"),
env_key="DASHSCOPE_API_KEY",
display_name="DashScope",
litellm_prefix="dashscope", # qwen-max → dashscope/qwen-max
skip_prefixes=("dashscope/", "openrouter/"),
env_extras=(),
is_gateway=False,
is_local=False,
detect_by_key_prefix="",
detect_by_base_keyword="",
default_api_base="",
strip_model_prefix=False,
model_overrides=(),
),
# Moonshot: Kimi models, needs "moonshot/" prefix.
# LiteLLM requires MOONSHOT_API_BASE env var to find the endpoint.
# Kimi K2.5 API enforces temperature >= 1.0.
ProviderSpec(
name="moonshot",
keywords=("moonshot", "kimi"),
env_key="MOONSHOT_API_KEY",
display_name="Moonshot",
litellm_prefix="moonshot", # kimi-k2.5 → moonshot/kimi-k2.5
skip_prefixes=("moonshot/", "openrouter/"),
env_extras=(
("MOONSHOT_API_BASE", "{api_base}"),
),
is_gateway=False,
is_local=False,
detect_by_key_prefix="",
detect_by_base_keyword="",
default_api_base="https://api.moonshot.ai/v1", # intl; use api.moonshot.cn for China
strip_model_prefix=False,
model_overrides=(
("kimi-k2.5", {"temperature": 1.0}),
),
),
# === Local deployment (matched by config key, NOT by api_base) =========
# vLLM / any OpenAI-compatible local server.
# Detected when config key is "vllm" (provider_name="vllm").
ProviderSpec(
name="vllm",
keywords=("vllm",),
env_key="HOSTED_VLLM_API_KEY",
display_name="vLLM/Local",
litellm_prefix="hosted_vllm", # Llama-3-8B → hosted_vllm/Llama-3-8B
skip_prefixes=(),
env_extras=(),
is_gateway=False,
is_local=True,
detect_by_key_prefix="",
detect_by_base_keyword="",
default_api_base="", # user must provide in config
strip_model_prefix=False,
model_overrides=(),
),
# === Auxiliary (not a primary LLM provider) ============================
# Groq: mainly used for Whisper voice transcription, also usable for LLM.
# Needs "groq/" prefix for LiteLLM routing. Placed last — it rarely wins fallback.
ProviderSpec(
name="groq",
keywords=("groq",),
env_key="GROQ_API_KEY",
display_name="Groq",
litellm_prefix="groq", # llama3-8b-8192 → groq/llama3-8b-8192
skip_prefixes=("groq/",), # avoid double-prefix
env_extras=(),
is_gateway=False,
is_local=False,
detect_by_key_prefix="",
detect_by_base_keyword="",
default_api_base="",
strip_model_prefix=False,
model_overrides=(),
),
)
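Following the two steps in the module docstring, a hypothetical new provider would be wired up roughly like this (the "acme" name, env var, and prefix are invented for illustration):

```python
# Step 1: add a spec like this to the PROVIDERS tuple above.
ProviderSpec(
    name="acme",                 # must match the field name added in step 2
    keywords=("acme",),          # model names containing "acme" route here
    env_key="ACME_API_KEY",      # hypothetical LiteLLM env var
    display_name="Acme AI",
    litellm_prefix="acme",       # acme-large -> acme/acme-large
    skip_prefixes=("acme/",),    # avoid double-prefixing
)

# Step 2: add the matching field in config/schema.py:
#
# class ProvidersConfig(BaseModel):
#     ...
#     acme: ProviderConfig = Field(default_factory=ProviderConfig)
```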
# ---------------------------------------------------------------------------
# Lookup helpers
# ---------------------------------------------------------------------------
def find_by_model(model: str) -> ProviderSpec | None:
"""Match a standard provider by model-name keyword (case-insensitive).
Skips gateways/local; those are matched by api_key/api_base instead."""
model_lower = model.lower()
for spec in PROVIDERS:
if spec.is_gateway or spec.is_local:
continue
if any(kw in model_lower for kw in spec.keywords):
return spec
return None
def find_gateway(
provider_name: str | None = None,
api_key: str | None = None,
api_base: str | None = None,
) -> ProviderSpec | None:
"""Detect gateway/local provider.
Priority:
1. provider_name: if it maps to a gateway/local spec, use it directly.
2. api_key prefix: e.g. "sk-or-" → OpenRouter.
3. api_base keyword: e.g. "aihubmix" in the URL → AiHubMix.
A standard provider with a custom api_base (e.g. DeepSeek behind a proxy)
will NOT be mistaken for vLLM; the old fallback is gone.
"""
# 1. Direct match by config key
if provider_name:
spec = find_by_name(provider_name)
if spec and (spec.is_gateway or spec.is_local):
return spec
# 2. Auto-detect by api_key prefix / api_base keyword
for spec in PROVIDERS:
if spec.detect_by_key_prefix and api_key and api_key.startswith(spec.detect_by_key_prefix):
return spec
if spec.detect_by_base_keyword and api_base and spec.detect_by_base_keyword in api_base:
return spec
return None
def find_by_name(name: str) -> ProviderSpec | None:
"""Find a provider spec by config field name, e.g. "dashscope"."""
for spec in PROVIDERS:
if spec.name == name:
return spec
return None
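A few illustrative lookups; the key and URL strings are placeholders, and the results follow from the registry order above:

```python
from nanobot.providers.registry import find_by_model, find_gateway

find_by_model("kimi-k2.5-turbo").name                    # "moonshot" (keyword "kimi")
find_by_model("claude-opus-4-5").name                    # "anthropic"
find_gateway(api_key="sk-or-v1-abc").name                # "openrouter" (key prefix)
find_gateway(api_base="https://aihubmix.com/v1").name    # "aihubmix" (base keyword)
find_gateway(provider_name="vllm").name                  # "vllm" (local, matched by config key)
find_gateway(provider_name="deepseek")                   # None (standard provider, not a gateway)
```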
@ -0,0 +1,40 @@
---
name: cron
description: Schedule reminders and recurring tasks.
---
# Cron
Use the `cron` tool to schedule reminders or recurring tasks.
## Two Modes
1. **Reminder** - message is sent directly to user
2. **Task** - message is a task description, agent executes and sends result
## Examples
Fixed reminder:
```
cron(action="add", message="Time to take a break!", every_seconds=1200)
```
Dynamic task (agent executes each time):
```
cron(action="add", message="Check HKUDS/nanobot GitHub stars and report", every_seconds=600)
```
List/remove:
```
cron(action="list")
cron(action="remove", job_id="abc123")
```
## Time Expressions
| User says | Parameters |
|-----------|------------|
| every 20 minutes | every_seconds: 1200 |
| every hour | every_seconds: 3600 |
| every day at 8am | cron_expr: "0 8 * * *" |
| weekdays at 5pm | cron_expr: "0 17 * * 1-5" |
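A `cron_expr` task, for example (the message text is illustrative):
```
cron(action="add", message="Summarize unread GitHub notifications", cron_expr="0 8 * * *")
```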
@ -1,6 +1,6 @@
[project] [project]
name = "nanobot-ai" name = "nanobot-ai"
version = "0.1.3.post4" version = "0.1.3.post5"
description = "A lightweight personal AI assistant framework" description = "A lightweight personal AI assistant framework"
requires-python = ">=3.11" requires-python = ">=3.11"
license = {text = "MIT"} license = {text = "MIT"}
@ -23,12 +23,15 @@ dependencies = [
"pydantic-settings>=2.0.0", "pydantic-settings>=2.0.0",
"websockets>=12.0", "websockets>=12.0",
"websocket-client>=1.6.0", "websocket-client>=1.6.0",
"httpx>=0.25.0", "httpx[socks]>=0.25.0",
"loguru>=0.7.0", "loguru>=0.7.0",
"readability-lxml>=0.8.0", "readability-lxml>=0.8.0",
"rich>=13.0.0", "rich>=13.0.0",
"croniter>=2.0.0", "croniter>=2.0.0",
"python-telegram-bot>=21.0", "dingtalk-stream>=0.4.0",
"python-telegram-bot[socks]>=21.0",
"lark-oapi>=1.0.0",
"socksio>=1.0.0",
"slack-sdk>=3.26.0", "slack-sdk>=3.26.0",
] ]
test_docker.sh → tests/test_docker.sh Executable file → Normal file
@ -1,5 +1,6 @@
#!/usr/bin/env bash #!/usr/bin/env bash
set -euo pipefail set -euo pipefail
cd "$(dirname "$0")/.." || exit 1
IMAGE_NAME="nanobot-test" IMAGE_NAME="nanobot-test"
tests/test_email_channel.py Normal file
@ -0,0 +1,311 @@
from email.message import EmailMessage
from datetime import date
import pytest
from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.channels.email import EmailChannel
from nanobot.config.schema import EmailConfig
def _make_config() -> EmailConfig:
return EmailConfig(
enabled=True,
consent_granted=True,
imap_host="imap.example.com",
imap_port=993,
imap_username="bot@example.com",
imap_password="secret",
smtp_host="smtp.example.com",
smtp_port=587,
smtp_username="bot@example.com",
smtp_password="secret",
mark_seen=True,
)
def _make_raw_email(
from_addr: str = "alice@example.com",
subject: str = "Hello",
body: str = "This is the body.",
) -> bytes:
msg = EmailMessage()
msg["From"] = from_addr
msg["To"] = "bot@example.com"
msg["Subject"] = subject
msg["Message-ID"] = "<m1@example.com>"
msg.set_content(body)
return msg.as_bytes()
def test_fetch_new_messages_parses_unseen_and_marks_seen(monkeypatch) -> None:
raw = _make_raw_email(subject="Invoice", body="Please pay")
class FakeIMAP:
def __init__(self) -> None:
self.store_calls: list[tuple[bytes, str, str]] = []
def login(self, _user: str, _pw: str):
return "OK", [b"logged in"]
def select(self, _mailbox: str):
return "OK", [b"1"]
def search(self, *_args):
return "OK", [b"1"]
def fetch(self, _imap_id: bytes, _parts: str):
return "OK", [(b"1 (UID 123 BODY[] {200})", raw), b")"]
def store(self, imap_id: bytes, op: str, flags: str):
self.store_calls.append((imap_id, op, flags))
return "OK", [b""]
def logout(self):
return "BYE", [b""]
fake = FakeIMAP()
monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: fake)
channel = EmailChannel(_make_config(), MessageBus())
items = channel._fetch_new_messages()
assert len(items) == 1
assert items[0]["sender"] == "alice@example.com"
assert items[0]["subject"] == "Invoice"
assert "Please pay" in items[0]["content"]
assert fake.store_calls == [(b"1", "+FLAGS", "\\Seen")]
# Same UID should be deduped in-process.
items_again = channel._fetch_new_messages()
assert items_again == []
def test_extract_text_body_falls_back_to_html() -> None:
msg = EmailMessage()
msg["From"] = "alice@example.com"
msg["To"] = "bot@example.com"
msg["Subject"] = "HTML only"
msg.add_alternative("<p>Hello<br>world</p>", subtype="html")
text = EmailChannel._extract_text_body(msg)
assert "Hello" in text
assert "world" in text
@pytest.mark.asyncio
async def test_start_returns_immediately_without_consent(monkeypatch) -> None:
cfg = _make_config()
cfg.consent_granted = False
channel = EmailChannel(cfg, MessageBus())
called = {"fetch": False}
def _fake_fetch():
called["fetch"] = True
return []
monkeypatch.setattr(channel, "_fetch_new_messages", _fake_fetch)
await channel.start()
assert channel.is_running is False
assert called["fetch"] is False
@pytest.mark.asyncio
async def test_send_uses_smtp_and_reply_subject(monkeypatch) -> None:
class FakeSMTP:
def __init__(self, _host: str, _port: int, timeout: int = 30) -> None:
self.timeout = timeout
self.started_tls = False
self.logged_in = False
self.sent_messages: list[EmailMessage] = []
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
def starttls(self, context=None):
self.started_tls = True
def login(self, _user: str, _pw: str):
self.logged_in = True
def send_message(self, msg: EmailMessage):
self.sent_messages.append(msg)
fake_instances: list[FakeSMTP] = []
def _smtp_factory(host: str, port: int, timeout: int = 30):
instance = FakeSMTP(host, port, timeout=timeout)
fake_instances.append(instance)
return instance
monkeypatch.setattr("nanobot.channels.email.smtplib.SMTP", _smtp_factory)
channel = EmailChannel(_make_config(), MessageBus())
channel._last_subject_by_chat["alice@example.com"] = "Invoice #42"
channel._last_message_id_by_chat["alice@example.com"] = "<m1@example.com>"
await channel.send(
OutboundMessage(
channel="email",
chat_id="alice@example.com",
content="Acknowledged.",
)
)
assert len(fake_instances) == 1
smtp = fake_instances[0]
assert smtp.started_tls is True
assert smtp.logged_in is True
assert len(smtp.sent_messages) == 1
sent = smtp.sent_messages[0]
assert sent["Subject"] == "Re: Invoice #42"
assert sent["To"] == "alice@example.com"
assert sent["In-Reply-To"] == "<m1@example.com>"
@pytest.mark.asyncio
async def test_send_skips_when_auto_reply_disabled(monkeypatch) -> None:
class FakeSMTP:
def __init__(self, _host: str, _port: int, timeout: int = 30) -> None:
self.sent_messages: list[EmailMessage] = []
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
def starttls(self, context=None):
return None
def login(self, _user: str, _pw: str):
return None
def send_message(self, msg: EmailMessage):
self.sent_messages.append(msg)
fake_instances: list[FakeSMTP] = []
def _smtp_factory(host: str, port: int, timeout: int = 30):
instance = FakeSMTP(host, port, timeout=timeout)
fake_instances.append(instance)
return instance
monkeypatch.setattr("nanobot.channels.email.smtplib.SMTP", _smtp_factory)
cfg = _make_config()
cfg.auto_reply_enabled = False
channel = EmailChannel(cfg, MessageBus())
await channel.send(
OutboundMessage(
channel="email",
chat_id="alice@example.com",
content="Should not send.",
)
)
assert fake_instances == []
await channel.send(
OutboundMessage(
channel="email",
chat_id="alice@example.com",
content="Force send.",
metadata={"force_send": True},
)
)
assert len(fake_instances) == 1
assert len(fake_instances[0].sent_messages) == 1
@pytest.mark.asyncio
async def test_send_skips_when_consent_not_granted(monkeypatch) -> None:
class FakeSMTP:
def __init__(self, _host: str, _port: int, timeout: int = 30) -> None:
self.sent_messages: list[EmailMessage] = []
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
def starttls(self, context=None):
return None
def login(self, _user: str, _pw: str):
return None
def send_message(self, msg: EmailMessage):
self.sent_messages.append(msg)
called = {"smtp": False}
def _smtp_factory(host: str, port: int, timeout: int = 30):
called["smtp"] = True
return FakeSMTP(host, port, timeout=timeout)
monkeypatch.setattr("nanobot.channels.email.smtplib.SMTP", _smtp_factory)
cfg = _make_config()
cfg.consent_granted = False
channel = EmailChannel(cfg, MessageBus())
await channel.send(
OutboundMessage(
channel="email",
chat_id="alice@example.com",
content="Should not send.",
metadata={"force_send": True},
)
)
assert called["smtp"] is False
def test_fetch_messages_between_dates_uses_imap_since_before_without_mark_seen(monkeypatch) -> None:
raw = _make_raw_email(subject="Status", body="Yesterday update")
class FakeIMAP:
def __init__(self) -> None:
self.search_args = None
self.store_calls: list[tuple[bytes, str, str]] = []
def login(self, _user: str, _pw: str):
return "OK", [b"logged in"]
def select(self, _mailbox: str):
return "OK", [b"1"]
def search(self, *_args):
self.search_args = _args
return "OK", [b"5"]
def fetch(self, _imap_id: bytes, _parts: str):
return "OK", [(b"5 (UID 999 BODY[] {200})", raw), b")"]
def store(self, imap_id: bytes, op: str, flags: str):
self.store_calls.append((imap_id, op, flags))
return "OK", [b""]
def logout(self):
return "BYE", [b""]
fake = FakeIMAP()
monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: fake)
channel = EmailChannel(_make_config(), MessageBus())
items = channel.fetch_messages_between_dates(
start_date=date(2026, 2, 6),
end_date=date(2026, 2, 7),
limit=10,
)
assert len(items) == 1
assert items[0]["subject"] == "Status"
# search(None, "SINCE", "06-Feb-2026", "BEFORE", "07-Feb-2026")
assert fake.search_args is not None
assert fake.search_args[1:] == ("SINCE", "06-Feb-2026", "BEFORE", "07-Feb-2026")
assert fake.store_calls == []