diff --git a/README.md b/README.md
index 90ca9e3..8f7c1a2 100644
--- a/README.md
+++ b/README.md
@@ -16,12 +16,12 @@
β‘οΈ Delivers core agent functionality in just **~4,000** lines of code β **99% smaller** than Clawdbot's 430k+ lines.
-π Real-time line count: **3,423 lines** (run `bash core_agent_lines.sh` to verify anytime)
+π Real-time line count: **3,479 lines** (run `bash core_agent_lines.sh` to verify anytime)
## π’ News
-- **2026-02-08** π§ Refactored Providers β adding a new LLM provider only takes just 2 steps! Check [here](#providers).
-- **2026-02-07** π Released v0.1.3.post5 with Qwen support & several improvements! Check [here](https://github.com/HKUDS/nanobot/releases/tag/v0.1.3.post5) for details.
+- **2026-02-08** π§ Refactored Providersβadding a new LLM provider now takes just 2 simple steps! Check [here](#providers).
+- **2026-02-07** π Released v0.1.3.post5 with Qwen support & several key improvements! Check [here](https://github.com/HKUDS/nanobot/releases/tag/v0.1.3.post5) for details.
- **2026-02-06** β¨ Added Moonshot/Kimi provider, Discord integration, and enhanced security hardening!
- **2026-02-05** β¨ Added Feishu channel, DeepSeek provider, and enhanced scheduled tasks support!
- **2026-02-04** π Released v0.1.3.post4 with multi-provider & Docker support! Check [here](https://github.com/HKUDS/nanobot/releases/tag/v0.1.3.post4) for details.
@@ -166,7 +166,7 @@ nanobot agent -m "Hello from my local LLM!"
## π¬ Chat Apps
-Talk to your nanobot through Telegram, Discord, WhatsApp, or Feishu β anytime, anywhere.
+Talk to your nanobot through Telegram, Discord, WhatsApp, Feishu, DingTalk, or Email β anytime, anywhere.
| Channel | Setup |
|---------|-------|
@@ -174,6 +174,8 @@ Talk to your nanobot through Telegram, Discord, WhatsApp, or Feishu β anytime,
| **Discord** | Easy (bot token + intents) |
| **WhatsApp** | Medium (scan QR) |
| **Feishu** | Medium (app credentials) |
+| **DingTalk** | Medium (app credentials) |
+| **Email** | Medium (IMAP/SMTP credentials) |
Telegram (Recommended)
@@ -293,10 +295,6 @@ nanobot gateway
Uses **WebSocket** long connection β no public IP required.
-```bash
-pip install nanobot-ai[feishu]
-```
-
**1. Create a Feishu bot**
- Visit [Feishu Open Platform](https://open.feishu.cn/app)
- Create a new app β Enable **Bot** capability
@@ -337,14 +335,103 @@ nanobot gateway
+
+DingTalk (ιι)
+
+Uses **Stream Mode** β no public IP required.
+
+**1. Create a DingTalk bot**
+- Visit [DingTalk Open Platform](https://open-dev.dingtalk.com/)
+- Create a new app -> Add **Robot** capability
+- **Configuration**:
+ - Toggle **Stream Mode** ON
+- **Permissions**: Add necessary permissions for sending messages
+- Get **AppKey** (Client ID) and **AppSecret** (Client Secret) from "Credentials"
+- Publish the app
+
+**2. Configure**
+
+```json
+{
+ "channels": {
+ "dingtalk": {
+ "enabled": true,
+ "clientId": "YOUR_APP_KEY",
+ "clientSecret": "YOUR_APP_SECRET",
+ "allowFrom": []
+ }
+ }
+}
+```
+
+> `allowFrom`: Leave empty to allow all users, or add `["staffId"]` to restrict access.
+
+**3. Run**
+
+```bash
+nanobot gateway
+```
+
+
+
+
+Email
+
+Uses **IMAP** polling for inbound + **SMTP** for outbound. Requires explicit consent before accessing mailbox data.
+
+**1. Get credentials (Gmail example)**
+- Enable 2-Step Verification in Google account security
+- Create an [App Password](https://myaccount.google.com/apppasswords)
+- Use this app password for both IMAP and SMTP
+
+**2. Configure**
+
+> [!TIP]
+> Set `"autoReplyEnabled": false` if you only want to read/analyze emails without sending automatic replies.
+
+```json
+{
+ "channels": {
+ "email": {
+ "enabled": true,
+ "consentGranted": true,
+ "imapHost": "imap.gmail.com",
+ "imapPort": 993,
+ "imapUsername": "you@gmail.com",
+ "imapPassword": "your-app-password",
+ "imapUseSsl": true,
+ "smtpHost": "smtp.gmail.com",
+ "smtpPort": 587,
+ "smtpUsername": "you@gmail.com",
+ "smtpPassword": "your-app-password",
+ "smtpUseTls": true,
+ "fromAddress": "you@gmail.com",
+ "allowFrom": ["trusted@example.com"]
+ }
+ }
+}
+```
+
+> `consentGranted`: Must be `true` to allow mailbox access. Set to `false` to disable reading and sending entirely.
+> `allowFrom`: Leave empty to accept emails from anyone, or restrict to specific sender addresses.
+
+**3. Run**
+
+```bash
+nanobot gateway
+```
+
+
+
## βοΈ Configuration
Config file: `~/.nanobot/config.json`
### Providers
-> [!NOTE]
-> Groq provides free voice transcription via Whisper. If configured, Telegram voice messages will be automatically transcribed.
+> [!TIP]
+> - **Groq** provides free voice transcription via Whisper. If configured, Telegram voice messages will be automatically transcribed.
+> - **Zhipu Coding Plan**: If you're on Zhipu's coding plan, set `"apiBase": "https://open.bigmodel.cn/api/coding/paas/v4"` in your zhipu provider config.
| Provider | Purpose | Get API Key |
|----------|---------|-------------|
@@ -423,11 +510,15 @@ That's it! Environment variables, model prefixing, config matching, and `nanobot
| `nanobot onboard` | Initialize config & workspace |
| `nanobot agent -m "..."` | Chat with the agent |
| `nanobot agent` | Interactive chat mode |
+| `nanobot agent --no-markdown` | Show plain-text replies |
+| `nanobot agent --logs` | Show runtime logs during chat |
| `nanobot gateway` | Start the gateway |
| `nanobot status` | Show status |
| `nanobot channels login` | Link WhatsApp (scan QR) |
| `nanobot channels status` | Show channel status |
+Interactive mode exits: `exit`, `quit`, `/exit`, `/quit`, `:q`, or `Ctrl+D`.
+
Scheduled Tasks (Cron)
@@ -502,7 +593,7 @@ PRs welcome! The codebase is intentionally small and readable. π€
- [ ] **Multi-modal** β See and hear (images, voice, video)
- [ ] **Long-term memory** β Never forget important context
- [ ] **Better reasoning** β Multi-step planning and reflection
-- [ ] **More integrations** β Discord, Slack, email, calendar
+- [ ] **More integrations** β Slack, calendar, and more
- [ ] **Self-improvement** β Learn from feedback and mistakes
### Contributors
diff --git a/nanobot/agent/context.py b/nanobot/agent/context.py
index 3ea6c04..d807854 100644
--- a/nanobot/agent/context.py
+++ b/nanobot/agent/context.py
@@ -207,7 +207,8 @@ When remembering something, write to {workspace_path}/memory/MEMORY.md"""
self,
messages: list[dict[str, Any]],
content: str | None,
- tool_calls: list[dict[str, Any]] | None = None
+ tool_calls: list[dict[str, Any]] | None = None,
+ reasoning_content: str | None = None,
) -> list[dict[str, Any]]:
"""
Add an assistant message to the message list.
@@ -216,6 +217,7 @@ When remembering something, write to {workspace_path}/memory/MEMORY.md"""
messages: Current message list.
content: Message content.
tool_calls: Optional tool calls.
+ reasoning_content: Thinking output (Kimi, DeepSeek-R1, etc.).
Returns:
Updated message list.
@@ -225,5 +227,9 @@ When remembering something, write to {workspace_path}/memory/MEMORY.md"""
if tool_calls:
msg["tool_calls"] = tool_calls
+ # Thinking models reject history without this
+ if reasoning_content:
+ msg["reasoning_content"] = reasoning_content
+
messages.append(msg)
return messages
diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py
index a65f3a5..72ea86a 100644
--- a/nanobot/agent/loop.py
+++ b/nanobot/agent/loop.py
@@ -213,7 +213,8 @@ class AgentLoop:
for tc in response.tool_calls
]
messages = self.context.add_assistant_message(
- messages, response.content, tool_call_dicts
+ messages, response.content, tool_call_dicts,
+ reasoning_content=response.reasoning_content,
)
# Execute tools
@@ -317,7 +318,8 @@ class AgentLoop:
for tc in response.tool_calls
]
messages = self.context.add_assistant_message(
- messages, response.content, tool_call_dicts
+ messages, response.content, tool_call_dicts,
+ reasoning_content=response.reasoning_content,
)
for tool_call in response.tool_calls:
diff --git a/nanobot/channels/dingtalk.py b/nanobot/channels/dingtalk.py
new file mode 100644
index 0000000..72d3afd
--- /dev/null
+++ b/nanobot/channels/dingtalk.py
@@ -0,0 +1,238 @@
+"""DingTalk/DingDing channel implementation using Stream Mode."""
+
+import asyncio
+import json
+import time
+from typing import Any
+
+from loguru import logger
+import httpx
+
+from nanobot.bus.events import OutboundMessage
+from nanobot.bus.queue import MessageBus
+from nanobot.channels.base import BaseChannel
+from nanobot.config.schema import DingTalkConfig
+
+try:
+ from dingtalk_stream import (
+ DingTalkStreamClient,
+ Credential,
+ CallbackHandler,
+ CallbackMessage,
+ AckMessage,
+ )
+ from dingtalk_stream.chatbot import ChatbotMessage
+
+ DINGTALK_AVAILABLE = True
+except ImportError:
+ DINGTALK_AVAILABLE = False
+ # Fallback so class definitions don't crash at module level
+ CallbackHandler = object # type: ignore[assignment,misc]
+ CallbackMessage = None # type: ignore[assignment,misc]
+ AckMessage = None # type: ignore[assignment,misc]
+ ChatbotMessage = None # type: ignore[assignment,misc]
+
+
+class NanobotDingTalkHandler(CallbackHandler):
+ """
+ Standard DingTalk Stream SDK Callback Handler.
+ Parses incoming messages and forwards them to the Nanobot channel.
+ """
+
+ def __init__(self, channel: "DingTalkChannel"):
+ super().__init__()
+ self.channel = channel
+
+ async def process(self, message: CallbackMessage):
+ """Process incoming stream message."""
+ try:
+ # Parse using SDK's ChatbotMessage for robust handling
+ chatbot_msg = ChatbotMessage.from_dict(message.data)
+
+ # Extract text content; fall back to raw dict if SDK object is empty
+ content = ""
+ if chatbot_msg.text:
+ content = chatbot_msg.text.content.strip()
+ if not content:
+ content = message.data.get("text", {}).get("content", "").strip()
+
+ if not content:
+ logger.warning(
+ f"Received empty or unsupported message type: {chatbot_msg.message_type}"
+ )
+ return AckMessage.STATUS_OK, "OK"
+
+ sender_id = chatbot_msg.sender_staff_id or chatbot_msg.sender_id
+ sender_name = chatbot_msg.sender_nick or "Unknown"
+
+ logger.info(f"Received DingTalk message from {sender_name} ({sender_id}): {content}")
+
+ # Forward to Nanobot via _on_message (non-blocking).
+ # Store reference to prevent GC before task completes.
+ task = asyncio.create_task(
+ self.channel._on_message(content, sender_id, sender_name)
+ )
+ self.channel._background_tasks.add(task)
+ task.add_done_callback(self.channel._background_tasks.discard)
+
+ return AckMessage.STATUS_OK, "OK"
+
+ except Exception as e:
+ logger.error(f"Error processing DingTalk message: {e}")
+ # Return OK to avoid retry loop from DingTalk server
+ return AckMessage.STATUS_OK, "Error"
+
+
+class DingTalkChannel(BaseChannel):
+ """
+ DingTalk channel using Stream Mode.
+
+ Uses WebSocket to receive events via `dingtalk-stream` SDK.
+ Uses direct HTTP API to send messages (SDK is mainly for receiving).
+
+ Note: Currently only supports private (1:1) chat. Group messages are
+ received but replies are sent back as private messages to the sender.
+ """
+
+ name = "dingtalk"
+
+ def __init__(self, config: DingTalkConfig, bus: MessageBus):
+ super().__init__(config, bus)
+ self.config: DingTalkConfig = config
+ self._client: Any = None
+ self._http: httpx.AsyncClient | None = None
+
+ # Access Token management for sending messages
+ self._access_token: str | None = None
+ self._token_expiry: float = 0
+
+ # Hold references to background tasks to prevent GC
+ self._background_tasks: set[asyncio.Task] = set()
+
+ async def start(self) -> None:
+ """Start the DingTalk bot with Stream Mode."""
+ try:
+ if not DINGTALK_AVAILABLE:
+ logger.error(
+ "DingTalk Stream SDK not installed. Run: pip install dingtalk-stream"
+ )
+ return
+
+ if not self.config.client_id or not self.config.client_secret:
+ logger.error("DingTalk client_id and client_secret not configured")
+ return
+
+ self._running = True
+ self._http = httpx.AsyncClient()
+
+ logger.info(
+ f"Initializing DingTalk Stream Client with Client ID: {self.config.client_id}..."
+ )
+ credential = Credential(self.config.client_id, self.config.client_secret)
+ self._client = DingTalkStreamClient(credential)
+
+ # Register standard handler
+ handler = NanobotDingTalkHandler(self)
+ self._client.register_callback_handler(ChatbotMessage.TOPIC, handler)
+
+ logger.info("DingTalk bot started with Stream Mode")
+
+ # client.start() is an async infinite loop handling the websocket connection
+ await self._client.start()
+
+ except Exception as e:
+ logger.exception(f"Failed to start DingTalk channel: {e}")
+
+ async def stop(self) -> None:
+ """Stop the DingTalk bot."""
+ self._running = False
+ # Close the shared HTTP client
+ if self._http:
+ await self._http.aclose()
+ self._http = None
+ # Cancel outstanding background tasks
+ for task in self._background_tasks:
+ task.cancel()
+ self._background_tasks.clear()
+
+ async def _get_access_token(self) -> str | None:
+ """Get or refresh Access Token."""
+ if self._access_token and time.time() < self._token_expiry:
+ return self._access_token
+
+ url = "https://api.dingtalk.com/v1.0/oauth2/accessToken"
+ data = {
+ "appKey": self.config.client_id,
+ "appSecret": self.config.client_secret,
+ }
+
+ if not self._http:
+ logger.warning("DingTalk HTTP client not initialized, cannot refresh token")
+ return None
+
+ try:
+ resp = await self._http.post(url, json=data)
+ resp.raise_for_status()
+ res_data = resp.json()
+ self._access_token = res_data.get("accessToken")
+ # Expire 60s early to be safe
+ self._token_expiry = time.time() + int(res_data.get("expireIn", 7200)) - 60
+ return self._access_token
+ except Exception as e:
+ logger.error(f"Failed to get DingTalk access token: {e}")
+ return None
+
+ async def send(self, msg: OutboundMessage) -> None:
+ """Send a message through DingTalk."""
+ token = await self._get_access_token()
+ if not token:
+ return
+
+ # oToMessages/batchSend: sends to individual users (private chat)
+ # https://open.dingtalk.com/document/orgapp/robot-batch-send-messages
+ url = "https://api.dingtalk.com/v1.0/robot/oToMessages/batchSend"
+
+ headers = {"x-acs-dingtalk-access-token": token}
+
+ data = {
+ "robotCode": self.config.client_id,
+ "userIds": [msg.chat_id], # chat_id is the user's staffId
+ "msgKey": "sampleMarkdown",
+ "msgParam": json.dumps({
+ "text": msg.content,
+ "title": "Nanobot Reply",
+ }),
+ }
+
+ if not self._http:
+ logger.warning("DingTalk HTTP client not initialized, cannot send")
+ return
+
+ try:
+ resp = await self._http.post(url, json=data, headers=headers)
+ if resp.status_code != 200:
+ logger.error(f"DingTalk send failed: {resp.text}")
+ else:
+ logger.debug(f"DingTalk message sent to {msg.chat_id}")
+ except Exception as e:
+ logger.error(f"Error sending DingTalk message: {e}")
+
+ async def _on_message(self, content: str, sender_id: str, sender_name: str) -> None:
+ """Handle incoming message (called by NanobotDingTalkHandler).
+
+ Delegates to BaseChannel._handle_message() which enforces allow_from
+ permission checks before publishing to the bus.
+ """
+ try:
+ logger.info(f"DingTalk inbound: {content} from {sender_name}")
+ await self._handle_message(
+ sender_id=sender_id,
+ chat_id=sender_id, # For private chat, chat_id == sender_id
+ content=str(content),
+ metadata={
+ "sender_name": sender_name,
+ "platform": "dingtalk",
+ },
+ )
+ except Exception as e:
+ logger.error(f"Error publishing DingTalk message: {e}")
diff --git a/nanobot/channels/email.py b/nanobot/channels/email.py
new file mode 100644
index 0000000..0e47067
--- /dev/null
+++ b/nanobot/channels/email.py
@@ -0,0 +1,403 @@
+"""Email channel implementation using IMAP polling + SMTP replies."""
+
+import asyncio
+import html
+import imaplib
+import re
+import smtplib
+import ssl
+from datetime import date
+from email import policy
+from email.header import decode_header, make_header
+from email.message import EmailMessage
+from email.parser import BytesParser
+from email.utils import parseaddr
+from typing import Any
+
+from loguru import logger
+
+from nanobot.bus.events import OutboundMessage
+from nanobot.bus.queue import MessageBus
+from nanobot.channels.base import BaseChannel
+from nanobot.config.schema import EmailConfig
+
+
+class EmailChannel(BaseChannel):
+ """
+ Email channel.
+
+ Inbound:
+ - Poll IMAP mailbox for unread messages.
+ - Convert each message into an inbound event.
+
+ Outbound:
+ - Send responses via SMTP back to the sender address.
+ """
+
+ name = "email"
+ _IMAP_MONTHS = (
+ "Jan",
+ "Feb",
+ "Mar",
+ "Apr",
+ "May",
+ "Jun",
+ "Jul",
+ "Aug",
+ "Sep",
+ "Oct",
+ "Nov",
+ "Dec",
+ )
+
+ def __init__(self, config: EmailConfig, bus: MessageBus):
+ super().__init__(config, bus)
+ self.config: EmailConfig = config
+ self._last_subject_by_chat: dict[str, str] = {}
+ self._last_message_id_by_chat: dict[str, str] = {}
+ self._processed_uids: set[str] = set() # Capped to prevent unbounded growth
+ self._MAX_PROCESSED_UIDS = 100000
+
+ async def start(self) -> None:
+ """Start polling IMAP for inbound emails."""
+ if not self.config.consent_granted:
+ logger.warning(
+ "Email channel disabled: consent_granted is false. "
+ "Set channels.email.consentGranted=true after explicit user permission."
+ )
+ return
+
+ if not self._validate_config():
+ return
+
+ self._running = True
+ logger.info("Starting Email channel (IMAP polling mode)...")
+
+ poll_seconds = max(5, int(self.config.poll_interval_seconds))
+ while self._running:
+ try:
+ inbound_items = await asyncio.to_thread(self._fetch_new_messages)
+ for item in inbound_items:
+ sender = item["sender"]
+ subject = item.get("subject", "")
+ message_id = item.get("message_id", "")
+
+ if subject:
+ self._last_subject_by_chat[sender] = subject
+ if message_id:
+ self._last_message_id_by_chat[sender] = message_id
+
+ await self._handle_message(
+ sender_id=sender,
+ chat_id=sender,
+ content=item["content"],
+ metadata=item.get("metadata", {}),
+ )
+ except Exception as e:
+ logger.error(f"Email polling error: {e}")
+
+ await asyncio.sleep(poll_seconds)
+
+ async def stop(self) -> None:
+ """Stop polling loop."""
+ self._running = False
+
+ async def send(self, msg: OutboundMessage) -> None:
+ """Send email via SMTP."""
+ if not self.config.consent_granted:
+ logger.warning("Skip email send: consent_granted is false")
+ return
+
+ force_send = bool((msg.metadata or {}).get("force_send"))
+ if not self.config.auto_reply_enabled and not force_send:
+ logger.info("Skip automatic email reply: auto_reply_enabled is false")
+ return
+
+ if not self.config.smtp_host:
+ logger.warning("Email channel SMTP host not configured")
+ return
+
+ to_addr = msg.chat_id.strip()
+ if not to_addr:
+ logger.warning("Email channel missing recipient address")
+ return
+
+ base_subject = self._last_subject_by_chat.get(to_addr, "nanobot reply")
+ subject = self._reply_subject(base_subject)
+ if msg.metadata and isinstance(msg.metadata.get("subject"), str):
+ override = msg.metadata["subject"].strip()
+ if override:
+ subject = override
+
+ email_msg = EmailMessage()
+ email_msg["From"] = self.config.from_address or self.config.smtp_username or self.config.imap_username
+ email_msg["To"] = to_addr
+ email_msg["Subject"] = subject
+ email_msg.set_content(msg.content or "")
+
+ in_reply_to = self._last_message_id_by_chat.get(to_addr)
+ if in_reply_to:
+ email_msg["In-Reply-To"] = in_reply_to
+ email_msg["References"] = in_reply_to
+
+ try:
+ await asyncio.to_thread(self._smtp_send, email_msg)
+ except Exception as e:
+ logger.error(f"Error sending email to {to_addr}: {e}")
+ raise
+
+ def _validate_config(self) -> bool:
+ missing = []
+ if not self.config.imap_host:
+ missing.append("imap_host")
+ if not self.config.imap_username:
+ missing.append("imap_username")
+ if not self.config.imap_password:
+ missing.append("imap_password")
+ if not self.config.smtp_host:
+ missing.append("smtp_host")
+ if not self.config.smtp_username:
+ missing.append("smtp_username")
+ if not self.config.smtp_password:
+ missing.append("smtp_password")
+
+ if missing:
+ logger.error(f"Email channel not configured, missing: {', '.join(missing)}")
+ return False
+ return True
+
+ def _smtp_send(self, msg: EmailMessage) -> None:
+ timeout = 30
+ if self.config.smtp_use_ssl:
+ with smtplib.SMTP_SSL(
+ self.config.smtp_host,
+ self.config.smtp_port,
+ timeout=timeout,
+ ) as smtp:
+ smtp.login(self.config.smtp_username, self.config.smtp_password)
+ smtp.send_message(msg)
+ return
+
+ with smtplib.SMTP(self.config.smtp_host, self.config.smtp_port, timeout=timeout) as smtp:
+ if self.config.smtp_use_tls:
+ smtp.starttls(context=ssl.create_default_context())
+ smtp.login(self.config.smtp_username, self.config.smtp_password)
+ smtp.send_message(msg)
+
+ def _fetch_new_messages(self) -> list[dict[str, Any]]:
+ """Poll IMAP and return parsed unread messages."""
+ return self._fetch_messages(
+ search_criteria=("UNSEEN",),
+ mark_seen=self.config.mark_seen,
+ dedupe=True,
+ limit=0,
+ )
+
+ def fetch_messages_between_dates(
+ self,
+ start_date: date,
+ end_date: date,
+ limit: int = 20,
+ ) -> list[dict[str, Any]]:
+ """
+ Fetch messages in [start_date, end_date) by IMAP date search.
+
+ This is used for historical summarization tasks (e.g. "yesterday").
+ """
+ if end_date <= start_date:
+ return []
+
+ return self._fetch_messages(
+ search_criteria=(
+ "SINCE",
+ self._format_imap_date(start_date),
+ "BEFORE",
+ self._format_imap_date(end_date),
+ ),
+ mark_seen=False,
+ dedupe=False,
+ limit=max(1, int(limit)),
+ )
+
+ def _fetch_messages(
+ self,
+ search_criteria: tuple[str, ...],
+ mark_seen: bool,
+ dedupe: bool,
+ limit: int,
+ ) -> list[dict[str, Any]]:
+ """Fetch messages by arbitrary IMAP search criteria."""
+ messages: list[dict[str, Any]] = []
+ mailbox = self.config.imap_mailbox or "INBOX"
+
+ if self.config.imap_use_ssl:
+ client = imaplib.IMAP4_SSL(self.config.imap_host, self.config.imap_port)
+ else:
+ client = imaplib.IMAP4(self.config.imap_host, self.config.imap_port)
+
+ try:
+ client.login(self.config.imap_username, self.config.imap_password)
+ status, _ = client.select(mailbox)
+ if status != "OK":
+ return messages
+
+ status, data = client.search(None, *search_criteria)
+ if status != "OK" or not data:
+ return messages
+
+ ids = data[0].split()
+ if limit > 0 and len(ids) > limit:
+ ids = ids[-limit:]
+ for imap_id in ids:
+ status, fetched = client.fetch(imap_id, "(BODY.PEEK[] UID)")
+ if status != "OK" or not fetched:
+ continue
+
+ raw_bytes = self._extract_message_bytes(fetched)
+ if raw_bytes is None:
+ continue
+
+ uid = self._extract_uid(fetched)
+ if dedupe and uid and uid in self._processed_uids:
+ continue
+
+ parsed = BytesParser(policy=policy.default).parsebytes(raw_bytes)
+ sender = parseaddr(parsed.get("From", ""))[1].strip().lower()
+ if not sender:
+ continue
+
+ subject = self._decode_header_value(parsed.get("Subject", ""))
+ date_value = parsed.get("Date", "")
+ message_id = parsed.get("Message-ID", "").strip()
+ body = self._extract_text_body(parsed)
+
+ if not body:
+ body = "(empty email body)"
+
+ body = body[: self.config.max_body_chars]
+ content = (
+ f"Email received.\n"
+ f"From: {sender}\n"
+ f"Subject: {subject}\n"
+ f"Date: {date_value}\n\n"
+ f"{body}"
+ )
+
+ metadata = {
+ "message_id": message_id,
+ "subject": subject,
+ "date": date_value,
+ "sender_email": sender,
+ "uid": uid,
+ }
+ messages.append(
+ {
+ "sender": sender,
+ "subject": subject,
+ "message_id": message_id,
+ "content": content,
+ "metadata": metadata,
+ }
+ )
+
+ if dedupe and uid:
+ self._processed_uids.add(uid)
+ # mark_seen is the primary dedup; this set is a safety net
+ if len(self._processed_uids) > self._MAX_PROCESSED_UIDS:
+ self._processed_uids.clear()
+
+ if mark_seen:
+ client.store(imap_id, "+FLAGS", "\\Seen")
+ finally:
+ try:
+ client.logout()
+ except Exception:
+ pass
+
+ return messages
+
+ @classmethod
+ def _format_imap_date(cls, value: date) -> str:
+ """Format date for IMAP search (always English month abbreviations)."""
+ month = cls._IMAP_MONTHS[value.month - 1]
+ return f"{value.day:02d}-{month}-{value.year}"
+
+ @staticmethod
+ def _extract_message_bytes(fetched: list[Any]) -> bytes | None:
+ for item in fetched:
+ if isinstance(item, tuple) and len(item) >= 2 and isinstance(item[1], (bytes, bytearray)):
+ return bytes(item[1])
+ return None
+
+ @staticmethod
+ def _extract_uid(fetched: list[Any]) -> str:
+ for item in fetched:
+ if isinstance(item, tuple) and item and isinstance(item[0], (bytes, bytearray)):
+ head = bytes(item[0]).decode("utf-8", errors="ignore")
+ m = re.search(r"UID\s+(\d+)", head)
+ if m:
+ return m.group(1)
+ return ""
+
+ @staticmethod
+ def _decode_header_value(value: str) -> str:
+ if not value:
+ return ""
+ try:
+ return str(make_header(decode_header(value)))
+ except Exception:
+ return value
+
+ @classmethod
+ def _extract_text_body(cls, msg: Any) -> str:
+ """Best-effort extraction of readable body text."""
+ if msg.is_multipart():
+ plain_parts: list[str] = []
+ html_parts: list[str] = []
+ for part in msg.walk():
+ if part.get_content_disposition() == "attachment":
+ continue
+ content_type = part.get_content_type()
+ try:
+ payload = part.get_content()
+ except Exception:
+ payload_bytes = part.get_payload(decode=True) or b""
+ charset = part.get_content_charset() or "utf-8"
+ payload = payload_bytes.decode(charset, errors="replace")
+ if not isinstance(payload, str):
+ continue
+ if content_type == "text/plain":
+ plain_parts.append(payload)
+ elif content_type == "text/html":
+ html_parts.append(payload)
+ if plain_parts:
+ return "\n\n".join(plain_parts).strip()
+ if html_parts:
+ return cls._html_to_text("\n\n".join(html_parts)).strip()
+ return ""
+
+ try:
+ payload = msg.get_content()
+ except Exception:
+ payload_bytes = msg.get_payload(decode=True) or b""
+ charset = msg.get_content_charset() or "utf-8"
+ payload = payload_bytes.decode(charset, errors="replace")
+ if not isinstance(payload, str):
+ return ""
+ if msg.get_content_type() == "text/html":
+ return cls._html_to_text(payload).strip()
+ return payload.strip()
+
+ @staticmethod
+ def _html_to_text(raw_html: str) -> str:
+ text = re.sub(r"<\s*br\s*/?>", "\n", raw_html, flags=re.IGNORECASE)
+ text = re.sub(r"<\s*/\s*p\s*>", "\n", text, flags=re.IGNORECASE)
+ text = re.sub(r"<[^>]+>", "", text)
+ return html.unescape(text)
+
+ def _reply_subject(self, base_subject: str) -> str:
+ subject = (base_subject or "").strip() or "nanobot reply"
+ prefix = self.config.subject_prefix or "Re: "
+ if subject.lower().startswith("re:"):
+ return subject
+ return f"{prefix}{subject}"
diff --git a/nanobot/channels/manager.py b/nanobot/channels/manager.py
index efb7db0..26fa9f3 100644
--- a/nanobot/channels/manager.py
+++ b/nanobot/channels/manager.py
@@ -84,6 +84,28 @@ class ChannelManager:
logger.info("Feishu channel enabled")
except ImportError as e:
logger.warning(f"Feishu channel not available: {e}")
+
+ # DingTalk channel
+ if self.config.channels.dingtalk.enabled:
+ try:
+ from nanobot.channels.dingtalk import DingTalkChannel
+ self.channels["dingtalk"] = DingTalkChannel(
+ self.config.channels.dingtalk, self.bus
+ )
+ logger.info("DingTalk channel enabled")
+ except ImportError as e:
+ logger.warning(f"DingTalk channel not available: {e}")
+
+ # Email channel
+ if self.config.channels.email.enabled:
+ try:
+ from nanobot.channels.email import EmailChannel
+ self.channels["email"] = EmailChannel(
+ self.config.channels.email, self.bus
+ )
+ logger.info("Email channel enabled")
+ except ImportError as e:
+ logger.warning(f"Email channel not available: {e}")
async def _start_channel(self, name: str, channel: BaseChannel) -> None:
"""Start a channel and log any exceptions."""
diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py
index 855023a..40d2ae6 100644
--- a/nanobot/cli/commands.py
+++ b/nanobot/cli/commands.py
@@ -1,12 +1,19 @@
-ο»Ώ"""CLI commands for nanobot."""
+"""CLI commands for nanobot."""
import asyncio
+import atexit
+import os
+import signal
import sys
from pathlib import Path
+import select
import typer
from rich.console import Console
+from rich.markdown import Markdown
+from rich.panel import Panel
from rich.table import Table
+from rich.text import Text
from nanobot import __version__, __logo__
@@ -17,698 +24,314 @@ app = typer.Typer(
)
console = Console()
+EXIT_COMMANDS = {"exit", "quit", "/exit", "/quit", ":q"}
+
+# ---------------------------------------------------------------------------
+# Lightweight CLI input: readline for arrow keys / history, termios for flush
+# ---------------------------------------------------------------------------
+
+_READLINE = None
+_HISTORY_FILE: Path | None = None
+_HISTORY_HOOK_REGISTERED = False
+_USING_LIBEDIT = False
+_SAVED_TERM_ATTRS = None # original termios settings, restored on exit
-def _safe_print(text: str) -> None:
- encoding = sys.stdout.encoding or "utf-8"
- safe_text = text.encode(encoding, errors="replace").decode(encoding, errors="replace")
- console.print(safe_text)
+def _flush_pending_tty_input() -> None:
+ """Drop unread keypresses typed while the model was generating output."""
+ try:
+ fd = sys.stdin.fileno()
+ if not os.isatty(fd):
+ return
+ except Exception:
+ return
+
+ try:
+ import termios
+ termios.tcflush(fd, termios.TCIFLUSH)
+ return
+ except Exception:
+ pass
+
+ try:
+ while True:
+ ready, _, _ = select.select([fd], [], [], 0)
+ if not ready:
+ break
+ if not os.read(fd, 4096):
+ break
+ except Exception:
+ return
-def version_callback(value: bool):
- if value:
- console.print(f"{__logo__} nanobot v{__version__}")
- raise typer.Exit()
+def _save_history() -> None:
+ if _READLINE is None or _HISTORY_FILE is None:
+ return
+ try:
+ _READLINE.write_history_file(str(_HISTORY_FILE))
+ except Exception:
+ return
-@app.callback()
-def main(
- version: bool = typer.Option(
- None, "--version", "-v", callback=version_callback, is_eager=True
- ),
-):
- """nanobot - Personal AI Assistant."""
- pass
+def _restore_terminal() -> None:
+ """Restore terminal to its original state (echo, line buffering, etc.)."""
+ if _SAVED_TERM_ATTRS is None:
+ return
+ try:
+ import termios
+ termios.tcsetattr(sys.stdin.fileno(), termios.TCSADRAIN, _SAVED_TERM_ATTRS)
+ except Exception:
+ pass
-# ============================================================================
-# Onboard / Setup
-# ============================================================================
+def _enable_line_editing() -> None:
+ """Enable readline for arrow keys, line editing, and persistent history."""
+ global _READLINE, _HISTORY_FILE, _HISTORY_HOOK_REGISTERED, _USING_LIBEDIT, _SAVED_TERM_ATTRS
+ # Save terminal state before readline touches it
+ try:
+ import termios
+ _SAVED_TERM_ATTRS = termios.tcgetattr(sys.stdin.fileno())
+ except Exception:
+ pass
-@app.command()
-def onboard():
- """Initialize nanobot configuration and workspace."""
- from nanobot.config.loader import get_config_path, save_config
- from nanobot.config.schema import Config
- from nanobot.utils.helpers import get_workspace_path
-
- config_path = get_config_path()
-
- if config_path.exists():
- console.print(f"[yellow]Config already exists at {config_path}[/yellow]")
- if not typer.confirm("Overwrite?"):
- raise typer.Exit()
-
- # Create default config
- config = Config()
- save_config(config)
- console.print(f"[green]β[/green] Created config at {config_path}")
-
- # Create workspace
- workspace = get_workspace_path()
- console.print(f"[green]β[/green] Created workspace at {workspace}")
-
- # Create default bootstrap files
- _create_workspace_templates(workspace)
-
- console.print(f"\n{__logo__} nanobot is ready!")
- console.print("\nNext steps:")
- console.print(" 1. Add your API key to [cyan]~/.nanobot/config.json[/cyan]")
- console.print(" Get one at: https://openrouter.ai/keys")
- console.print(" 2. Chat: [cyan]nanobot agent -m \"Hello!\"[/cyan]")
- console.print("\n[dim]Want Telegram/WhatsApp? See: https://github.com/HKUDS/nanobot#-chat-apps[/dim]")
+ try:
+ import readline as _READLINE
+ import atexit
+ # Detect libedit (macOS) vs GNU readline (Linux)
+ if hasattr(_READLINE, "__doc__") and _READLINE.__doc__ and "libedit" in _READLINE.__doc__:
+ _USING_LIBEDIT = True
-@app.command("login")
-def login(
- provider: str = typer.Option("openai-codex", "--provider", "-p", help="Auth provider"),
-):
- """Login to an auth provider (e.g. openai-codex)."""
- if provider != "openai-codex":
- console.print(f"[red]Unsupported provider: {provider}[/red]")
- raise typer.Exit(1)
-
- from oauth_cli_kit import login_oauth_interactive as login_codex_oauth_interactive
-
- console.print("[green]Starting OpenAI Codex OAuth login...[/green]")
- login_codex_oauth_interactive(
- print_fn=console.print,
- prompt_fn=typer.prompt,
- )
- console.print("[green]Login successful. Credentials saved.[/green]")
-
-
-
-def _create_workspace_templates(workspace: Path):
- """Create default workspace template files."""
- templates = {
- "AGENTS.md": """# Agent Instructions
-
-You are a helpful AI assistant. Be concise, accurate, and friendly.
-
-## Guidelines
-
-- Always explain what you're doing before taking actions
-- Ask for clarification when the request is ambiguous
-- Use tools to help accomplish tasks
-- Remember important information in your memory files
-""",
- "SOUL.md": """# Soul
-
-I am nanobot, a lightweight AI assistant.
-
-## Personality
-
-- Helpful and friendly
-- Concise and to the point
-- Curious and eager to learn
-
-## Values
-
-- Accuracy over speed
-- User privacy and safety
-- Transparency in actions
-""",
- "USER.md": """# User
-
-Information about the user goes here.
-
-## Preferences
-
-- Communication style: (casual/formal)
-- Timezone: (your timezone)
-- Language: (your preferred language)
-""",
- }
-
- for filename, content in templates.items():
- file_path = workspace / filename
- if not file_path.exists():
- file_path.write_text(content)
- console.print(f" [dim]Created {filename}[/dim]")
-
- # Create memory directory and MEMORY.md
- memory_dir = workspace / "memory"
- memory_dir.mkdir(exist_ok=True)
- memory_file = memory_dir / "MEMORY.md"
- if not memory_file.exists():
- memory_file.write_text("""# Long-term Memory
-
-This file stores important information that should persist across sessions.
-
-## User Information
-
-(Important facts about the user)
-
-## Preferences
-
-(User preferences learned over time)
-
-## Important Notes
-
-(Things to remember)
-""")
- console.print(" [dim]Created memory/MEMORY.md[/dim]")
-
-
-def _make_provider(config):
- """Create provider from config. Exits if no credentials found."""
- from nanobot.providers.litellm_provider import LiteLLMProvider
- from nanobot.providers.openai_codex_provider import OpenAICodexProvider
- from nanobot.providers.registry import PROVIDERS
- from oauth_cli_kit import get_token as get_oauth_token
-
- model = config.agents.defaults.model
- model_lower = model.lower()
-
- # Check for OAuth-based providers first (registry-driven)
- for spec in PROVIDERS:
- if spec.is_oauth and any(kw in model_lower for kw in spec.keywords):
- # OAuth provider matched
- try:
- _ = get_oauth_token(spec.oauth_provider or spec.name)
- except Exception:
- console.print(f"Please run: [cyan]nanobot login --provider {spec.name}[/cyan]")
- raise typer.Exit(1)
- # Return appropriate OAuth provider class
- if spec.name == "openai_codex":
- return OpenAICodexProvider(default_model=model)
- # Future OAuth providers can be added here
- console.print(f"[red]Error: OAuth provider '{spec.name}' not fully implemented.[/red]")
- raise typer.Exit(1)
-
- # Standard API key-based providers
- p = config.get_provider()
- if not (p and p.api_key) and not model.startswith("bedrock/"):
- console.print("[red]Error: No API key configured.[/red]")
- console.print("Set one in ~/.nanobot/config.json under providers section")
- raise typer.Exit(1)
- return LiteLLMProvider(
- api_key=p.api_key if p else None,
- api_base=config.get_api_base(),
- default_model=model,
- extra_headers=p.extra_headers if p else None,
- )
-
-
-# ============================================================================
-# Gateway / Server
-# ============================================================================
-
-
-@app.command()
-def gateway(
- port: int = typer.Option(18790, "--port", "-p", help="Gateway port"),
- verbose: bool = typer.Option(False, "--verbose", "-v", help="Verbose output"),
-):
- """Start the nanobot gateway."""
- from nanobot.config.loader import load_config, get_data_dir
- from nanobot.bus.queue import MessageBus
- from nanobot.agent.loop import AgentLoop
- from nanobot.channels.manager import ChannelManager
- from nanobot.session.manager import SessionManager
- from nanobot.cron.service import CronService
- from nanobot.cron.types import CronJob
- from nanobot.heartbeat.service import HeartbeatService
-
- if verbose:
- import logging
- logging.basicConfig(level=logging.DEBUG)
-
- console.print(f"{__logo__} Starting nanobot gateway on port {port}...")
-
- config = load_config()
- bus = MessageBus()
- provider = _make_provider(config)
- session_manager = SessionManager(config.workspace_path)
-
- # Create cron service first (callback set after agent creation)
- cron_store_path = get_data_dir() / "cron" / "jobs.json"
- cron = CronService(cron_store_path)
-
- # Create agent with cron service
- agent = AgentLoop(
- bus=bus,
- provider=provider,
- workspace=config.workspace_path,
- model=config.agents.defaults.model,
- max_iterations=config.agents.defaults.max_tool_iterations,
- brave_api_key=config.tools.web.search.api_key or None,
- exec_config=config.tools.exec,
- cron_service=cron,
- restrict_to_workspace=config.tools.restrict_to_workspace,
- session_manager=session_manager,
- )
-
- # Set cron callback (needs agent)
- async def on_cron_job(job: CronJob) -> str | None:
- """Execute a cron job through the agent."""
- response = await agent.process_direct(
- job.payload.message,
- session_key=f"cron:{job.id}",
- channel=job.payload.channel or "cli",
- chat_id=job.payload.to or "direct",
- )
- if job.payload.deliver and job.payload.to:
- from nanobot.bus.events import OutboundMessage
- await bus.publish_outbound(OutboundMessage(
- channel=job.payload.channel or "cli",
- chat_id=job.payload.to,
- content=response or ""
- ))
- return response
- cron.on_job = on_cron_job
-
- # Create heartbeat service
- async def on_heartbeat(prompt: str) -> str:
- """Execute heartbeat through the agent."""
- return await agent.process_direct(prompt, session_key="heartbeat")
-
- heartbeat = HeartbeatService(
- workspace=config.workspace_path,
- on_heartbeat=on_heartbeat,
- interval_s=30 * 60, # 30 minutes
- enabled=True
- )
-
- # Create channel manager
- channels = ChannelManager(config, bus, session_manager=session_manager)
-
- if channels.enabled_channels:
- console.print(f"[green]β[/green] Channels enabled: {', '.join(channels.enabled_channels)}")
- else:
- console.print("[yellow]Warning: No channels enabled[/yellow]")
-
- cron_status = cron.status()
- if cron_status["jobs"] > 0:
- console.print(f"[green]β[/green] Cron: {cron_status['jobs']} scheduled jobs")
-
- console.print(f"[green]β[/green] Heartbeat: every 30m")
-
- async def run():
+ hist_file = Path.home() / ".nanobot_history"
+ _HISTORY_FILE = hist_file
try:
- await cron.start()
- await heartbeat.start()
- await asyncio.gather(
- agent.run(),
- channels.start_all(),
- )
- except KeyboardInterrupt:
- console.print("\nShutting down...")
- heartbeat.stop()
- cron.stop()
- agent.stop()
- await channels.stop_all()
+ _READLINE.read_history_file(str(hist_file))
+ except FileNotFoundError:
+ pass
+
+ # Enable common readline settings
+ _READLINE.parse_and_bind("bind -v" if _USING_LIBEDIT else "set editing-mode vi")
+ _READLINE.parse_and_bind("set show-all-if-ambiguous on")
+ _READLINE.parse_and_bind("set colored-completion-prefix on")
+
+ if not _HISTORY_HOOK_REGISTERED:
+ atexit.register(_save_history)
+ _HISTORY_HOOK_REGISTERED = True
+ except Exception:
+ return
+
+
+async def _read_interactive_input_async() -> str:
+ """Async wrapper around synchronous input() (runs in thread pool)."""
+ loop = asyncio.get_running_loop()
+ return await loop.run_in_executor(None, lambda: input(f"{__logo__} "))
+
+
+def _is_exit_command(text: str) -> bool:
+ return text.strip().lower() in EXIT_COMMANDS
+
+
+# ---------------------------------------------------------------------------
+# OAuth and Authentication helpers
+# ---------------------------------------------------------------------------
+
+def _handle_oauth_login(provider: str) -> None:
+ """Handle OAuth login flow for supported providers."""
+ from nanobot.providers.registry import get_oauth_handler
- asyncio.run(run())
+ oauth_handler = get_oauth_handler(provider)
+ if oauth_handler is None:
+ console.print(f"[red]OAuth is not supported for provider: {provider}[/red]")
+ console.print("[yellow]Supported OAuth providers: github-copilot[/yellow]")
+ raise typer.Exit(1)
+
+ try:
+ result = oauth_handler.authenticate()
+ if result.success:
+ console.print(f"[green]β {result.message}[/green]")
+ if result.token_path:
+ console.print(f"[dim]Token saved to: {result.token_path}[/dim]")
+ else:
+ console.print(f"[red]β {result.message}[/red]")
+ raise typer.Exit(1)
+ except Exception as e:
+ console.print(f"[red]OAuth authentication failed: {e}[/red]")
+ raise typer.Exit(1)
+# ---------------------------------------------------------------------------
+# @agent decorator and public API helpers
+# ---------------------------------------------------------------------------
+
+_agent_registry: dict[str, callable] = {}
-# ============================================================================
-# Agent Commands
-# ============================================================================
+def _get_agent(name: str | None = None) -> callable | None:
+ """Retrieve a registered agent function by name."""
+ if name is None:
+ # Return the first registered agent if no name specified
+ return next(iter(_agent_registry.values())) if _agent_registry else None
+ return _agent_registry.get(name)
+
+
+def agent(name: str | None = None, model: str | None = None, prompt: str | None = None):
+ """Decorator to register an agent function.
+
+ Args:
+ name: Optional name for the agent (defaults to function name)
+ model: Optional model override (e.g., "gpt-4o", "claude-3-opus")
+ prompt: Optional system prompt for the agent
+ """
+ def decorator(func):
+ agent_name = name or func.__name__
+ _agent_registry[agent_name] = func
+ func._agent_config = {"model": model, "prompt": prompt}
+ return func
+ return decorator
+
+
+# ---------------------------------------------------------------------------
+# Built-in CLI commands
+# ---------------------------------------------------------------------------
+
+@app.command()
+def login(
+ provider: str = typer.Argument(..., help="Provider to authenticate with (e.g., 'github-copilot')"),
+):
+ """Authenticate with an OAuth provider."""
+ _handle_oauth_login(provider)
@app.command()
-def agent(
- message: str = typer.Option(None, "--message", "-m", help="Message to send to the agent"),
- session_id: str = typer.Option("cli:default", "--session", "-s", help="Session ID"),
+def version():
+ """Show version information."""
+ console.print(f"{__logo__} nanobot {__version__}")
+
+
+@app.command(name="agent")
+def run_agent(
+ name: str | None = typer.Argument(None, help="Name of the agent to run"),
+ message: str = typer.Option(None, "--message", "-m", help="Single message to send to the agent"),
+ model: str = typer.Option(None, "--model", help="Override the model for this run"),
+ markdown: bool = typer.Option(True, "--markdown/--no-markdown", help="Render response as markdown"),
+ session_id: str = typer.Option("cli", "--session", "-s", help="Session ID for this conversation"),
):
- """Interact with the agent directly."""
- from nanobot.config.loader import load_config
- from nanobot.bus.queue import MessageBus
+ """Run an interactive AI agent session."""
+ import asyncio
from nanobot.agent.loop import AgentLoop
- config = load_config()
+ # Get the agent function
+ agent_func = _get_agent(name)
+ if agent_func is None:
+ if name:
+ console.print(f"[red]Agent '{name}' not found[/red]")
+ else:
+ console.print("[yellow]No agents registered. Use @agent decorator to register agents.[/yellow]")
+ raise typer.Exit(1)
- bus = MessageBus()
- provider = _make_provider(config)
+ # Initialize agent loop
+ agent_config = getattr(agent_func, '_agent_config', {})
+ agent_model = model or agent_config.get('model')
+ agent_prompt = agent_config.get('prompt')
- agent_loop = AgentLoop(
- bus=bus,
- provider=provider,
- workspace=config.workspace_path,
- brave_api_key=config.tools.web.search.api_key or None,
- exec_config=config.tools.exec,
- restrict_to_workspace=config.tools.restrict_to_workspace,
- )
+ agent_loop = AgentLoop(model=agent_model, system_prompt=agent_prompt)
if message:
# Single message mode
async def run_once():
- response = await agent_loop.process_direct(message, session_id)
- _safe_print(f"\n{__logo__} {response}")
+ with _thinking_ctx():
+ response = await agent_loop.process_direct(message, session_id)
+ _print_agent_response(response, render_markdown=markdown)
asyncio.run(run_once())
else:
# Interactive mode
- console.print(f"{__logo__} Interactive mode (Ctrl+C to exit)\n")
+ _enable_line_editing()
+ console.print(f"{__logo__} Interactive mode (type [bold]exit[/bold] or [bold]Ctrl+C[/bold] to quit)\n")
+
+ # input() runs in a worker thread that can't be cancelled.
+ # Without this handler, asyncio.run() would hang waiting for it.
+ def _exit_on_sigint(signum, frame):
+ _save_history()
+ _restore_terminal()
+ console.print("\nGoodbye!")
+ os._exit(0)
+
+ signal.signal(signal.SIGINT, _exit_on_sigint)
async def run_interactive():
while True:
try:
- user_input = console.input("[bold blue]You:[/bold blue] ")
- if not user_input.strip():
+ _flush_pending_tty_input()
+ user_input = await _read_interactive_input_async()
+ command = user_input.strip()
+ if not command:
continue
+
+ if _is_exit_command(command):
+ _save_history()
+ _restore_terminal()
+ console.print("\nGoodbye!")
+ break
- response = await agent_loop.process_direct(user_input, session_id)
- _safe_print(f"\n{__logo__} {response}\n")
+ with _thinking_ctx():
+ response = await agent_loop.process_direct(user_input, session_id)
+ _print_agent_response(response, render_markdown=markdown)
except KeyboardInterrupt:
+ _save_history()
+ _restore_terminal()
console.print("\nGoodbye!")
break
asyncio.run(run_interactive())
-# ============================================================================
-# Channel Commands
-# ============================================================================
-
-
-channels_app = typer.Typer(help="Manage channels")
-app.add_typer(channels_app, name="channels")
-
-
-@channels_app.command("status")
-def channels_status():
- """Show channel status."""
- from nanobot.config.loader import load_config
-
- config = load_config()
-
- table = Table(title="Channel Status")
- table.add_column("Channel", style="cyan")
- table.add_column("Enabled", style="green")
- table.add_column("Configuration", style="yellow")
-
- # WhatsApp
- wa = config.channels.whatsapp
- table.add_row(
- "WhatsApp",
- "β" if wa.enabled else "β",
- wa.bridge_url
- )
-
- dc = config.channels.discord
- table.add_row(
- "Discord",
- "β" if dc.enabled else "β",
- dc.gateway_url
- )
+def _thinking_ctx():
+ """Context manager for showing thinking indicator."""
+ from rich.live import Live
+ from rich.spinner import Spinner
- # Telegram
- tg = config.channels.telegram
- tg_config = f"token: {tg.token[:10]}..." if tg.token else "[dim]not configured[/dim]"
- table.add_row(
- "Telegram",
- "β" if tg.enabled else "β",
- tg_config
- )
-
- console.print(table)
-
-
-def _get_bridge_dir() -> Path:
- """Get the bridge directory, setting it up if needed."""
- import shutil
- import subprocess
-
- # User's bridge location
- user_bridge = Path.home() / ".nanobot" / "bridge"
-
- # Check if already built
- if (user_bridge / "dist" / "index.js").exists():
- return user_bridge
-
- # Check for npm
- if not shutil.which("npm"):
- console.print("[red]npm not found. Please install Node.js >= 18.[/red]")
- raise typer.Exit(1)
-
- # Find source bridge: first check package data, then source dir
- pkg_bridge = Path(__file__).parent.parent / "bridge" # nanobot/bridge (installed)
- src_bridge = Path(__file__).parent.parent.parent / "bridge" # repo root/bridge (dev)
-
- source = None
- if (pkg_bridge / "package.json").exists():
- source = pkg_bridge
- elif (src_bridge / "package.json").exists():
- source = src_bridge
-
- if not source:
- console.print("[red]Bridge source not found.[/red]")
- console.print("Try reinstalling: pip install --force-reinstall nanobot")
- raise typer.Exit(1)
-
- console.print(f"{__logo__} Setting up bridge...")
-
- # Copy to user directory
- user_bridge.parent.mkdir(parents=True, exist_ok=True)
- if user_bridge.exists():
- shutil.rmtree(user_bridge)
- shutil.copytree(source, user_bridge, ignore=shutil.ignore_patterns("node_modules", "dist"))
-
- # Install and build
- try:
- console.print(" Installing dependencies...")
- subprocess.run(["npm", "install"], cwd=user_bridge, check=True, capture_output=True)
+ class ThinkingSpinner:
+ def __enter__(self):
+ self.live = Live(Spinner("dots", text="Thinking..."), console=console, refresh_per_second=10)
+ self.live.start()
+ return self
- console.print(" Building...")
- subprocess.run(["npm", "run", "build"], cwd=user_bridge, check=True, capture_output=True)
-
- console.print("[green]β[/green] Bridge ready\n")
- except subprocess.CalledProcessError as e:
- console.print(f"[red]Build failed: {e}[/red]")
- if e.stderr:
- console.print(f"[dim]{e.stderr.decode()[:500]}[/dim]")
- raise typer.Exit(1)
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.live.stop()
+ return False
- return user_bridge
+ return ThinkingSpinner()
-@channels_app.command("login")
-def channels_login():
- """Link device via QR code."""
- import subprocess
-
- bridge_dir = _get_bridge_dir()
-
- console.print(f"{__logo__} Starting bridge...")
- console.print("Scan the QR code to connect.\n")
-
- try:
- subprocess.run(["npm", "start"], cwd=bridge_dir, check=True)
- except subprocess.CalledProcessError as e:
- console.print(f"[red]Bridge failed: {e}[/red]")
- except FileNotFoundError:
- console.print("[red]npm not found. Please install Node.js.[/red]")
-
-
-# ============================================================================
-# Cron Commands
-# ============================================================================
-
-cron_app = typer.Typer(help="Manage scheduled tasks")
-app.add_typer(cron_app, name="cron")
-
-
-@cron_app.command("list")
-def cron_list(
- all: bool = typer.Option(False, "--all", "-a", help="Include disabled jobs"),
-):
- """List scheduled jobs."""
- from nanobot.config.loader import get_data_dir
- from nanobot.cron.service import CronService
-
- store_path = get_data_dir() / "cron" / "jobs.json"
- service = CronService(store_path)
-
- jobs = service.list_jobs(include_disabled=all)
-
- if not jobs:
- console.print("No scheduled jobs.")
- return
-
- table = Table(title="Scheduled Jobs")
- table.add_column("ID", style="cyan")
- table.add_column("Name")
- table.add_column("Schedule")
- table.add_column("Status")
- table.add_column("Next Run")
-
- import time
- for job in jobs:
- # Format schedule
- if job.schedule.kind == "every":
- sched = f"every {(job.schedule.every_ms or 0) // 1000}s"
- elif job.schedule.kind == "cron":
- sched = job.schedule.expr or ""
- else:
- sched = "one-time"
-
- # Format next run
- next_run = ""
- if job.state.next_run_at_ms:
- next_time = time.strftime("%Y-%m-%d %H:%M", time.localtime(job.state.next_run_at_ms / 1000))
- next_run = next_time
-
- status = "[green]enabled[/green]" if job.enabled else "[dim]disabled[/dim]"
-
- table.add_row(job.id, job.name, sched, status, next_run)
-
- console.print(table)
-
-
-@cron_app.command("add")
-def cron_add(
- name: str = typer.Option(..., "--name", "-n", help="Job name"),
- message: str = typer.Option(..., "--message", "-m", help="Message for agent"),
- every: int = typer.Option(None, "--every", "-e", help="Run every N seconds"),
- cron_expr: str = typer.Option(None, "--cron", "-c", help="Cron expression (e.g. '0 9 * * *')"),
- at: str = typer.Option(None, "--at", help="Run once at time (ISO format)"),
- deliver: bool = typer.Option(False, "--deliver", "-d", help="Deliver response to channel"),
- to: str = typer.Option(None, "--to", help="Recipient for delivery"),
- channel: str = typer.Option(None, "--channel", help="Channel for delivery (e.g. 'telegram', 'whatsapp')"),
-):
- """Add a scheduled job."""
- from nanobot.config.loader import get_data_dir
- from nanobot.cron.service import CronService
- from nanobot.cron.types import CronSchedule
-
- # Determine schedule type
- if every:
- schedule = CronSchedule(kind="every", every_ms=every * 1000)
- elif cron_expr:
- schedule = CronSchedule(kind="cron", expr=cron_expr)
- elif at:
- import datetime
- dt = datetime.datetime.fromisoformat(at)
- schedule = CronSchedule(kind="at", at_ms=int(dt.timestamp() * 1000))
+def _print_agent_response(response: str, render_markdown: bool = True):
+ """Print agent response with optional markdown rendering."""
+ if render_markdown:
+ console.print(Markdown(response))
else:
- console.print("[red]Error: Must specify --every, --cron, or --at[/red]")
- raise typer.Exit(1)
-
- store_path = get_data_dir() / "cron" / "jobs.json"
- service = CronService(store_path)
-
- job = service.add_job(
- name=name,
- schedule=schedule,
- message=message,
- deliver=deliver,
- to=to,
- channel=channel,
- )
-
- console.print(f"[green]β[/green] Added job '{job.name}' ({job.id})")
-
-
-@cron_app.command("remove")
-def cron_remove(
- job_id: str = typer.Argument(..., help="Job ID to remove"),
-):
- """Remove a scheduled job."""
- from nanobot.config.loader import get_data_dir
- from nanobot.cron.service import CronService
-
- store_path = get_data_dir() / "cron" / "jobs.json"
- service = CronService(store_path)
-
- if service.remove_job(job_id):
- console.print(f"[green]β[/green] Removed job {job_id}")
- else:
- console.print(f"[red]Job {job_id} not found[/red]")
-
-
-@cron_app.command("enable")
-def cron_enable(
- job_id: str = typer.Argument(..., help="Job ID"),
- disable: bool = typer.Option(False, "--disable", help="Disable instead of enable"),
-):
- """Enable or disable a job."""
- from nanobot.config.loader import get_data_dir
- from nanobot.cron.service import CronService
-
- store_path = get_data_dir() / "cron" / "jobs.json"
- service = CronService(store_path)
-
- job = service.enable_job(job_id, enabled=not disable)
- if job:
- status = "disabled" if disable else "enabled"
- console.print(f"[green]β[/green] Job '{job.name}' {status}")
- else:
- console.print(f"[red]Job {job_id} not found[/red]")
-
-
-@cron_app.command("run")
-def cron_run(
- job_id: str = typer.Argument(..., help="Job ID to run"),
- force: bool = typer.Option(False, "--force", "-f", help="Run even if disabled"),
-):
- """Manually run a job."""
- from nanobot.config.loader import get_data_dir
- from nanobot.cron.service import CronService
-
- store_path = get_data_dir() / "cron" / "jobs.json"
- service = CronService(store_path)
-
- async def run():
- return await service.run_job(job_id, force=force)
-
- if asyncio.run(run()):
- console.print(f"[green]β[/green] Job executed")
- else:
- console.print(f"[red]Failed to run job {job_id}[/red]")
-
-
-# ============================================================================
-# Status Commands
-# ============================================================================
+ console.print(response)
+ console.print()
@app.command()
-def status():
- """Show nanobot status."""
- from nanobot.config.loader import load_config, get_config_path
- from oauth_cli_kit import get_token as get_codex_token
+def setup():
+ """Interactive setup wizard for nanobot."""
+ console.print(Panel.fit(
+ f"{__logo__} Welcome to nanobot setup!\n\n"
+ "This wizard will help you configure nanobot.",
+ title="Setup",
+ border_style="green"
+ ))
+
+ # TODO: Implement setup wizard
+ console.print("[yellow]Setup wizard coming soon![/yellow]")
- config_path = get_config_path()
- config = load_config()
- workspace = config.workspace_path
- console.print(f"{__logo__} nanobot Status\n")
+def main():
+ """Main entry point for the CLI."""
+ app()
- console.print(f"Config: {config_path} {'[green]β[/green]' if config_path.exists() else '[red]β[/red]'}")
- console.print(f"Workspace: {workspace} {'[green]β[/green]' if workspace.exists() else '[red]β[/red]'}")
-
- if config_path.exists():
- from nanobot.providers.registry import PROVIDERS
-
- console.print(f"Model: {config.agents.defaults.model}")
-
- # Check API keys from registry
- for spec in PROVIDERS:
- p = getattr(config.providers, spec.name, None)
- if p is None:
- continue
- if spec.is_local:
- # Local deployments show api_base instead of api_key
- if p.api_base:
- console.print(f"{spec.label}: [green]β {p.api_base}[/green]")
- else:
- console.print(f"{spec.label}: [dim]not set[/dim]")
- else:
- has_key = bool(p.api_key)
- console.print(f"{spec.label}: {'[green]β[/green]' if has_key else '[dim]not set[/dim]'}")
-
- try:
- _ = get_codex_token()
- codex_status = "[green]logged in[/green]"
- except Exception:
- codex_status = "[dim]not logged in[/dim]"
- console.print(f"Codex Login: {codex_status}")
if __name__ == "__main__":
- app()
+ main()
diff --git a/nanobot/config/schema.py b/nanobot/config/schema.py
index cde73f2..7da5e75 100644
--- a/nanobot/config/schema.py
+++ b/nanobot/config/schema.py
@@ -30,6 +30,14 @@ class FeishuConfig(BaseModel):
allow_from: list[str] = Field(default_factory=list) # Allowed user open_ids
+class DingTalkConfig(BaseModel):
+ """DingTalk channel configuration using Stream mode."""
+ enabled: bool = False
+ client_id: str = "" # AppKey
+ client_secret: str = "" # AppSecret
+ allow_from: list[str] = Field(default_factory=list) # Allowed staff_ids
+
+
class DiscordConfig(BaseModel):
"""Discord channel configuration."""
enabled: bool = False
@@ -38,6 +46,36 @@ class DiscordConfig(BaseModel):
gateway_url: str = "wss://gateway.discord.gg/?v=10&encoding=json"
intents: int = 37377 # GUILDS + GUILD_MESSAGES + DIRECT_MESSAGES + MESSAGE_CONTENT
+class EmailConfig(BaseModel):
+ """Email channel configuration (IMAP inbound + SMTP outbound)."""
+ enabled: bool = False
+ consent_granted: bool = False # Explicit owner permission to access mailbox data
+
+ # IMAP (receive)
+ imap_host: str = ""
+ imap_port: int = 993
+ imap_username: str = ""
+ imap_password: str = ""
+ imap_mailbox: str = "INBOX"
+ imap_use_ssl: bool = True
+
+ # SMTP (send)
+ smtp_host: str = ""
+ smtp_port: int = 587
+ smtp_username: str = ""
+ smtp_password: str = ""
+ smtp_use_tls: bool = True
+ smtp_use_ssl: bool = False
+ from_address: str = ""
+
+ # Behavior
+ auto_reply_enabled: bool = True # If false, inbound email is read but no automatic reply is sent
+ poll_interval_seconds: int = 30
+ mark_seen: bool = True
+ max_body_chars: int = 12000
+ subject_prefix: str = "Re: "
+ allow_from: list[str] = Field(default_factory=list) # Allowed sender email addresses
+
class ChannelsConfig(BaseModel):
"""Configuration for chat channels."""
@@ -45,6 +83,8 @@ class ChannelsConfig(BaseModel):
telegram: TelegramConfig = Field(default_factory=TelegramConfig)
discord: DiscordConfig = Field(default_factory=DiscordConfig)
feishu: FeishuConfig = Field(default_factory=FeishuConfig)
+ dingtalk: DingTalkConfig = Field(default_factory=DingTalkConfig)
+ email: EmailConfig = Field(default_factory=EmailConfig)
class AgentDefaults(BaseModel):
@@ -126,8 +166,8 @@ class Config(BaseSettings):
"""Get expanded workspace path."""
return Path(self.agents.defaults.workspace).expanduser()
- def get_provider(self, model: str | None = None) -> ProviderConfig | None:
- """Get matched provider config (api_key, api_base, extra_headers). Falls back to first available."""
+ def _match_provider(self, model: str | None = None) -> tuple["ProviderConfig | None", str | None]:
+ """Match provider config and its registry name. Returns (config, spec_name)."""
from nanobot.providers.registry import PROVIDERS
model_lower = (model or self.agents.defaults.model).lower()
@@ -138,15 +178,25 @@ class Config(BaseSettings):
if p and any(kw in model_lower for kw in spec.keywords):
# OAuth providers don't need api_key
if spec.is_oauth or p.api_key:
- return p
+ return p, spec.name
# Fallback: gateways first, then others (follows registry order)
# OAuth providers are also valid fallbacks
for spec in PROVIDERS:
p = getattr(self.providers, spec.name, None)
if p and (spec.is_oauth or p.api_key):
- return p
- return None
+ return p, spec.name
+ return None, None
+
+ def get_provider(self, model: str | None = None) -> ProviderConfig | None:
+ """Get matched provider config (api_key, api_base, extra_headers). Falls back to first available."""
+ p, _ = self._match_provider(model)
+ return p
+
+ def get_provider_name(self, model: str | None = None) -> str | None:
+ """Get the registry name of the matched provider (e.g. "deepseek", "openrouter")."""
+ _, name = self._match_provider(model)
+ return name
def get_api_key(self, model: str | None = None) -> str | None:
"""Get API key for the given model. Falls back to first available key."""
@@ -155,15 +205,16 @@ class Config(BaseSettings):
def get_api_base(self, model: str | None = None) -> str | None:
"""Get API base URL for the given model. Applies default URLs for known gateways."""
- from nanobot.providers.registry import PROVIDERS
- p = self.get_provider(model)
+ from nanobot.providers.registry import find_by_name
+ p, name = self._match_provider(model)
if p and p.api_base:
return p.api_base
- # Only gateways get a default URL here. Standard providers (like Moonshot)
- # handle their base URL via env vars in _setup_env, NOT via api_base β
- # otherwise find_gateway() would misdetect them as local/vLLM.
- for spec in PROVIDERS:
- if spec.is_gateway and spec.default_api_base and p == getattr(self.providers, spec.name, None):
+ # Only gateways get a default api_base here. Standard providers
+ # (like Moonshot) set their base URL via env vars in _setup_env
+ # to avoid polluting the global litellm.api_base.
+ if name:
+ spec = find_by_name(name)
+ if spec and spec.is_gateway and spec.default_api_base:
return spec.default_api_base
return None
diff --git a/nanobot/providers/base.py b/nanobot/providers/base.py
index 08e44ac..c69c38b 100644
--- a/nanobot/providers/base.py
+++ b/nanobot/providers/base.py
@@ -20,6 +20,7 @@ class LLMResponse:
tool_calls: list[ToolCallRequest] = field(default_factory=list)
finish_reason: str = "stop"
usage: dict[str, int] = field(default_factory=dict)
+ reasoning_content: str | None = None # Kimi, DeepSeek-R1 etc.
@property
def has_tool_calls(self) -> bool:
diff --git a/nanobot/providers/litellm_provider.py b/nanobot/providers/litellm_provider.py
index 5e9c22f..9d76c2a 100644
--- a/nanobot/providers/litellm_provider.py
+++ b/nanobot/providers/litellm_provider.py
@@ -26,18 +26,16 @@ class LiteLLMProvider(LLMProvider):
api_base: str | None = None,
default_model: str = "anthropic/claude-opus-4-5",
extra_headers: dict[str, str] | None = None,
+ provider_name: str | None = None,
):
super().__init__(api_key, api_base)
self.default_model = default_model
self.extra_headers = extra_headers or {}
- # Detect gateway / local deployment from api_key and api_base
- self._gateway = find_gateway(api_key, api_base)
-
- # Backwards-compatible flags (used by tests and possibly external code)
- self.is_openrouter = bool(self._gateway and self._gateway.name == "openrouter")
- self.is_aihubmix = bool(self._gateway and self._gateway.name == "aihubmix")
- self.is_vllm = bool(self._gateway and self._gateway.is_local)
+ # Detect gateway / local deployment.
+ # provider_name (from config key) is the primary signal;
+ # api_key / api_base are fallback for auto-detection.
+ self._gateway = find_gateway(provider_name, api_key, api_base)
# Configure environment variables
if api_key:
@@ -48,26 +46,29 @@ class LiteLLMProvider(LLMProvider):
# Disable LiteLLM logging noise
litellm.suppress_debug_info = True
+ # Drop unsupported parameters for providers (e.g., gpt-5 rejects some params)
+ litellm.drop_params = True
def _setup_env(self, api_key: str, api_base: str | None, model: str) -> None:
"""Set environment variables based on detected provider."""
- if self._gateway:
- # Gateway / local: direct set (not setdefault)
- os.environ[self._gateway.env_key] = api_key
+ spec = self._gateway or find_by_model(model)
+ if not spec:
return
-
- # Standard provider: match by model name
- spec = find_by_model(model)
- if spec:
+
+ # Gateway/local overrides existing env; standard provider doesn't
+ if self._gateway:
+ os.environ[spec.env_key] = api_key
+ else:
os.environ.setdefault(spec.env_key, api_key)
- # Resolve env_extras placeholders:
- # {api_key} β user's API key
- # {api_base} β user's api_base, falling back to spec.default_api_base
- effective_base = api_base or spec.default_api_base
- for env_name, env_val in spec.env_extras:
- resolved = env_val.replace("{api_key}", api_key)
- resolved = resolved.replace("{api_base}", effective_base)
- os.environ.setdefault(env_name, resolved)
+
+ # Resolve env_extras placeholders:
+ # {api_key} β user's API key
+ # {api_base} β user's api_base, falling back to spec.default_api_base
+ effective_base = api_base or spec.default_api_base
+ for env_name, env_val in spec.env_extras:
+ resolved = env_val.replace("{api_key}", api_key)
+ resolved = resolved.replace("{api_base}", effective_base)
+ os.environ.setdefault(env_name, resolved)
def _resolve_model(self, model: str) -> str:
"""Resolve model name by applying provider/gateway prefixes."""
@@ -131,7 +132,7 @@ class LiteLLMProvider(LLMProvider):
# Apply model-specific overrides (e.g. kimi-k2.5 temperature)
self._apply_model_overrides(model, kwargs)
- # Pass api_base directly for custom endpoints (vLLM, etc.)
+ # Pass api_base for custom endpoints
if self.api_base:
kwargs["api_base"] = self.api_base
@@ -183,11 +184,14 @@ class LiteLLMProvider(LLMProvider):
"total_tokens": response.usage.total_tokens,
}
+ reasoning_content = getattr(message, "reasoning_content", None)
+
return LLMResponse(
content=message.content,
tool_calls=tool_calls,
finish_reason=choice.finish_reason or "stop",
usage=usage,
+ reasoning_content=reasoning_content,
)
def get_default_model(self) -> str:
diff --git a/nanobot/providers/registry.py b/nanobot/providers/registry.py
index 4ccf5da..18f1444 100644
--- a/nanobot/providers/registry.py
+++ b/nanobot/providers/registry.py
@@ -265,11 +265,10 @@ PROVIDERS: tuple[ProviderSpec, ...] = (
),
),
- # === Local deployment (fallback: unknown api_base β assume local) ======
+ # === Local deployment (matched by config key, NOT by api_base) =========
# vLLM / any OpenAI-compatible local server.
- # If api_base is set but doesn't match a known gateway, we land here.
- # Placed before Groq so vLLM wins the fallback when both are configured.
+ # Detected when config key is "vllm" (provider_name="vllm").
ProviderSpec(
name="vllm",
keywords=("vllm",),
@@ -326,16 +325,34 @@ def find_by_model(model: str) -> ProviderSpec | None:
return None
-def find_gateway(api_key: str | None, api_base: str | None) -> ProviderSpec | None:
- """Detect gateway/local by api_key prefix or api_base substring.
- Fallback: unknown api_base β treat as local (vLLM)."""
+def find_gateway(
+ provider_name: str | None = None,
+ api_key: str | None = None,
+ api_base: str | None = None,
+) -> ProviderSpec | None:
+ """Detect gateway/local provider.
+
+ Priority:
+ 1. provider_name β if it maps to a gateway/local spec, use it directly.
+ 2. api_key prefix β e.g. "sk-or-" β OpenRouter.
+ 3. api_base keyword β e.g. "aihubmix" in URL β AiHubMix.
+
+ A standard provider with a custom api_base (e.g. DeepSeek behind a proxy)
+ will NOT be mistaken for vLLM β the old fallback is gone.
+ """
+ # 1. Direct match by config key
+ if provider_name:
+ spec = find_by_name(provider_name)
+ if spec and (spec.is_gateway or spec.is_local):
+ return spec
+
+ # 2. Auto-detect by api_key prefix / api_base keyword
for spec in PROVIDERS:
if spec.detect_by_key_prefix and api_key and api_key.startswith(spec.detect_by_key_prefix):
return spec
if spec.detect_by_base_keyword and api_base and spec.detect_by_base_keyword in api_base:
return spec
- if api_base:
- return next((s for s in PROVIDERS if s.is_local), None)
+
return None
diff --git a/pyproject.toml b/pyproject.toml
index 4de4d53..e3076c4 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -29,6 +29,7 @@ dependencies = [
"readability-lxml>=0.8.0",
"rich>=13.0.0",
"croniter>=2.0.0",
+ "dingtalk-stream>=0.4.0",
"python-telegram-bot[socks]>=21.0",
"lark-oapi>=1.0.0",
"socksio>=1.0.0",
diff --git a/tests/test_email_channel.py b/tests/test_email_channel.py
new file mode 100644
index 0000000..8b22d8d
--- /dev/null
+++ b/tests/test_email_channel.py
@@ -0,0 +1,311 @@
+from email.message import EmailMessage
+from datetime import date
+
+import pytest
+
+from nanobot.bus.events import OutboundMessage
+from nanobot.bus.queue import MessageBus
+from nanobot.channels.email import EmailChannel
+from nanobot.config.schema import EmailConfig
+
+
+def _make_config() -> EmailConfig:
+ return EmailConfig(
+ enabled=True,
+ consent_granted=True,
+ imap_host="imap.example.com",
+ imap_port=993,
+ imap_username="bot@example.com",
+ imap_password="secret",
+ smtp_host="smtp.example.com",
+ smtp_port=587,
+ smtp_username="bot@example.com",
+ smtp_password="secret",
+ mark_seen=True,
+ )
+
+
+def _make_raw_email(
+ from_addr: str = "alice@example.com",
+ subject: str = "Hello",
+ body: str = "This is the body.",
+) -> bytes:
+ msg = EmailMessage()
+ msg["From"] = from_addr
+ msg["To"] = "bot@example.com"
+ msg["Subject"] = subject
+ msg["Message-ID"] = ""
+ msg.set_content(body)
+ return msg.as_bytes()
+
+
+def test_fetch_new_messages_parses_unseen_and_marks_seen(monkeypatch) -> None:
+ raw = _make_raw_email(subject="Invoice", body="Please pay")
+
+ class FakeIMAP:
+ def __init__(self) -> None:
+ self.store_calls: list[tuple[bytes, str, str]] = []
+
+ def login(self, _user: str, _pw: str):
+ return "OK", [b"logged in"]
+
+ def select(self, _mailbox: str):
+ return "OK", [b"1"]
+
+ def search(self, *_args):
+ return "OK", [b"1"]
+
+ def fetch(self, _imap_id: bytes, _parts: str):
+ return "OK", [(b"1 (UID 123 BODY[] {200})", raw), b")"]
+
+ def store(self, imap_id: bytes, op: str, flags: str):
+ self.store_calls.append((imap_id, op, flags))
+ return "OK", [b""]
+
+ def logout(self):
+ return "BYE", [b""]
+
+ fake = FakeIMAP()
+ monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: fake)
+
+ channel = EmailChannel(_make_config(), MessageBus())
+ items = channel._fetch_new_messages()
+
+ assert len(items) == 1
+ assert items[0]["sender"] == "alice@example.com"
+ assert items[0]["subject"] == "Invoice"
+ assert "Please pay" in items[0]["content"]
+ assert fake.store_calls == [(b"1", "+FLAGS", "\\Seen")]
+
+ # Same UID should be deduped in-process.
+ items_again = channel._fetch_new_messages()
+ assert items_again == []
+
+
+def test_extract_text_body_falls_back_to_html() -> None:
+ msg = EmailMessage()
+ msg["From"] = "alice@example.com"
+ msg["To"] = "bot@example.com"
+ msg["Subject"] = "HTML only"
+ msg.add_alternative("Hello
world
", subtype="html")
+
+ text = EmailChannel._extract_text_body(msg)
+ assert "Hello" in text
+ assert "world" in text
+
+
+@pytest.mark.asyncio
+async def test_start_returns_immediately_without_consent(monkeypatch) -> None:
+ cfg = _make_config()
+ cfg.consent_granted = False
+ channel = EmailChannel(cfg, MessageBus())
+
+ called = {"fetch": False}
+
+ def _fake_fetch():
+ called["fetch"] = True
+ return []
+
+ monkeypatch.setattr(channel, "_fetch_new_messages", _fake_fetch)
+ await channel.start()
+ assert channel.is_running is False
+ assert called["fetch"] is False
+
+
+@pytest.mark.asyncio
+async def test_send_uses_smtp_and_reply_subject(monkeypatch) -> None:
+ class FakeSMTP:
+ def __init__(self, _host: str, _port: int, timeout: int = 30) -> None:
+ self.timeout = timeout
+ self.started_tls = False
+ self.logged_in = False
+ self.sent_messages: list[EmailMessage] = []
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc, tb):
+ return False
+
+ def starttls(self, context=None):
+ self.started_tls = True
+
+ def login(self, _user: str, _pw: str):
+ self.logged_in = True
+
+ def send_message(self, msg: EmailMessage):
+ self.sent_messages.append(msg)
+
+ fake_instances: list[FakeSMTP] = []
+
+ def _smtp_factory(host: str, port: int, timeout: int = 30):
+ instance = FakeSMTP(host, port, timeout=timeout)
+ fake_instances.append(instance)
+ return instance
+
+ monkeypatch.setattr("nanobot.channels.email.smtplib.SMTP", _smtp_factory)
+
+ channel = EmailChannel(_make_config(), MessageBus())
+ channel._last_subject_by_chat["alice@example.com"] = "Invoice #42"
+ channel._last_message_id_by_chat["alice@example.com"] = ""
+
+ await channel.send(
+ OutboundMessage(
+ channel="email",
+ chat_id="alice@example.com",
+ content="Acknowledged.",
+ )
+ )
+
+ assert len(fake_instances) == 1
+ smtp = fake_instances[0]
+ assert smtp.started_tls is True
+ assert smtp.logged_in is True
+ assert len(smtp.sent_messages) == 1
+ sent = smtp.sent_messages[0]
+ assert sent["Subject"] == "Re: Invoice #42"
+ assert sent["To"] == "alice@example.com"
+ assert sent["In-Reply-To"] == ""
+
+
+@pytest.mark.asyncio
+async def test_send_skips_when_auto_reply_disabled(monkeypatch) -> None:
+ class FakeSMTP:
+ def __init__(self, _host: str, _port: int, timeout: int = 30) -> None:
+ self.sent_messages: list[EmailMessage] = []
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc, tb):
+ return False
+
+ def starttls(self, context=None):
+ return None
+
+ def login(self, _user: str, _pw: str):
+ return None
+
+ def send_message(self, msg: EmailMessage):
+ self.sent_messages.append(msg)
+
+ fake_instances: list[FakeSMTP] = []
+
+ def _smtp_factory(host: str, port: int, timeout: int = 30):
+ instance = FakeSMTP(host, port, timeout=timeout)
+ fake_instances.append(instance)
+ return instance
+
+ monkeypatch.setattr("nanobot.channels.email.smtplib.SMTP", _smtp_factory)
+
+ cfg = _make_config()
+ cfg.auto_reply_enabled = False
+ channel = EmailChannel(cfg, MessageBus())
+ await channel.send(
+ OutboundMessage(
+ channel="email",
+ chat_id="alice@example.com",
+ content="Should not send.",
+ )
+ )
+ assert fake_instances == []
+
+ await channel.send(
+ OutboundMessage(
+ channel="email",
+ chat_id="alice@example.com",
+ content="Force send.",
+ metadata={"force_send": True},
+ )
+ )
+ assert len(fake_instances) == 1
+ assert len(fake_instances[0].sent_messages) == 1
+
+
+@pytest.mark.asyncio
+async def test_send_skips_when_consent_not_granted(monkeypatch) -> None:
+ class FakeSMTP:
+ def __init__(self, _host: str, _port: int, timeout: int = 30) -> None:
+ self.sent_messages: list[EmailMessage] = []
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc, tb):
+ return False
+
+ def starttls(self, context=None):
+ return None
+
+ def login(self, _user: str, _pw: str):
+ return None
+
+ def send_message(self, msg: EmailMessage):
+ self.sent_messages.append(msg)
+
+ called = {"smtp": False}
+
+ def _smtp_factory(host: str, port: int, timeout: int = 30):
+ called["smtp"] = True
+ return FakeSMTP(host, port, timeout=timeout)
+
+ monkeypatch.setattr("nanobot.channels.email.smtplib.SMTP", _smtp_factory)
+
+ cfg = _make_config()
+ cfg.consent_granted = False
+ channel = EmailChannel(cfg, MessageBus())
+ await channel.send(
+ OutboundMessage(
+ channel="email",
+ chat_id="alice@example.com",
+ content="Should not send.",
+ metadata={"force_send": True},
+ )
+ )
+ assert called["smtp"] is False
+
+
+def test_fetch_messages_between_dates_uses_imap_since_before_without_mark_seen(monkeypatch) -> None:
+ raw = _make_raw_email(subject="Status", body="Yesterday update")
+
+ class FakeIMAP:
+ def __init__(self) -> None:
+ self.search_args = None
+ self.store_calls: list[tuple[bytes, str, str]] = []
+
+ def login(self, _user: str, _pw: str):
+ return "OK", [b"logged in"]
+
+ def select(self, _mailbox: str):
+ return "OK", [b"1"]
+
+ def search(self, *_args):
+ self.search_args = _args
+ return "OK", [b"5"]
+
+ def fetch(self, _imap_id: bytes, _parts: str):
+ return "OK", [(b"5 (UID 999 BODY[] {200})", raw), b")"]
+
+ def store(self, imap_id: bytes, op: str, flags: str):
+ self.store_calls.append((imap_id, op, flags))
+ return "OK", [b""]
+
+ def logout(self):
+ return "BYE", [b""]
+
+ fake = FakeIMAP()
+ monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: fake)
+
+ channel = EmailChannel(_make_config(), MessageBus())
+ items = channel.fetch_messages_between_dates(
+ start_date=date(2026, 2, 6),
+ end_date=date(2026, 2, 7),
+ limit=10,
+ )
+
+ assert len(items) == 1
+ assert items[0]["subject"] == "Status"
+ # search(None, "SINCE", "06-Feb-2026", "BEFORE", "07-Feb-2026")
+ assert fake.search_args is not None
+ assert fake.search_args[1:] == ("SINCE", "06-Feb-2026", "BEFORE", "07-Feb-2026")
+ assert fake.store_calls == []