diff --git a/README.md b/README.md
index 502a42f..8f7c1a2 100644
--- a/README.md
+++ b/README.md
@@ -16,7 +16,7 @@
⚡️ Delivers core agent functionality in just **~4,000** lines of code — **99% smaller** than Clawdbot's 430k+ lines.
-📏 Real-time line count: **3,448 lines** (run `bash core_agent_lines.sh` to verify anytime)
+📏 Real-time line count: **3,479 lines** (run `bash core_agent_lines.sh` to verify anytime)
## 📢 News
@@ -166,7 +166,7 @@ nanobot agent -m "Hello from my local LLM!"
## 💬 Chat Apps
-Talk to your nanobot through Telegram, Discord, WhatsApp, or Feishu — anytime, anywhere.
+Talk to your nanobot through Telegram, Discord, WhatsApp, Feishu, DingTalk, or Email — anytime, anywhere.
| Channel | Setup |
|---------|-------|
@@ -174,6 +174,8 @@ Talk to your nanobot through Telegram, Discord, WhatsApp, or Feishu — anytime,
| **Discord** | Easy (bot token + intents) |
| **WhatsApp** | Medium (scan QR) |
| **Feishu** | Medium (app credentials) |
+| **DingTalk** | Medium (app credentials) |
+| **Email** | Medium (IMAP/SMTP credentials) |
Telegram (Recommended)
@@ -372,6 +374,55 @@ nanobot gateway
+
+Email
+
+Uses **IMAP** polling for inbound + **SMTP** for outbound. Requires explicit consent before accessing mailbox data.
+
+**1. Get credentials (Gmail example)**
+- Enable 2-Step Verification in Google account security
+- Create an [App Password](https://myaccount.google.com/apppasswords)
+- Use this app password for both IMAP and SMTP
+
+**2. Configure**
+
+> [!TIP]
+> Set `"autoReplyEnabled": false` if you only want to read/analyze emails without sending automatic replies.
+
+```json
+{
+ "channels": {
+ "email": {
+ "enabled": true,
+ "consentGranted": true,
+ "imapHost": "imap.gmail.com",
+ "imapPort": 993,
+ "imapUsername": "you@gmail.com",
+ "imapPassword": "your-app-password",
+ "imapUseSsl": true,
+ "smtpHost": "smtp.gmail.com",
+ "smtpPort": 587,
+ "smtpUsername": "you@gmail.com",
+ "smtpPassword": "your-app-password",
+ "smtpUseTls": true,
+ "fromAddress": "you@gmail.com",
+ "allowFrom": ["trusted@example.com"]
+ }
+ }
+}
+```
+
+> `consentGranted`: Must be `true` to allow mailbox access. Set to `false` to disable reading and sending entirely.
+> `allowFrom`: Leave empty to accept emails from anyone, or restrict to specific sender addresses.
+
+**3. Run**
+
+```bash
+nanobot gateway
+```
+
+
+
## ⚙️ Configuration
Config file: `~/.nanobot/config.json`
@@ -542,7 +593,7 @@ PRs welcome! The codebase is intentionally small and readable. 🤗
- [ ] **Multi-modal** — See and hear (images, voice, video)
- [ ] **Long-term memory** — Never forget important context
- [ ] **Better reasoning** — Multi-step planning and reflection
-- [ ] **More integrations** — Discord, Slack, email, calendar
+- [ ] **More integrations** — Slack, calendar, and more
- [ ] **Self-improvement** — Learn from feedback and mistakes
### Contributors
diff --git a/nanobot/channels/email.py b/nanobot/channels/email.py
new file mode 100644
index 0000000..0e47067
--- /dev/null
+++ b/nanobot/channels/email.py
@@ -0,0 +1,403 @@
+"""Email channel implementation using IMAP polling + SMTP replies."""
+
+import asyncio
+import html
+import imaplib
+import re
+import smtplib
+import ssl
+from datetime import date
+from email import policy
+from email.header import decode_header, make_header
+from email.message import EmailMessage
+from email.parser import BytesParser
+from email.utils import parseaddr
+from typing import Any
+
+from loguru import logger
+
+from nanobot.bus.events import OutboundMessage
+from nanobot.bus.queue import MessageBus
+from nanobot.channels.base import BaseChannel
+from nanobot.config.schema import EmailConfig
+
+
+class EmailChannel(BaseChannel):
+ """
+ Email channel.
+
+ Inbound:
+ - Poll IMAP mailbox for unread messages.
+ - Convert each message into an inbound event.
+
+ Outbound:
+ - Send responses via SMTP back to the sender address.
+ """
+
+ name = "email"
+ _IMAP_MONTHS = (
+ "Jan",
+ "Feb",
+ "Mar",
+ "Apr",
+ "May",
+ "Jun",
+ "Jul",
+ "Aug",
+ "Sep",
+ "Oct",
+ "Nov",
+ "Dec",
+ )
+
+ def __init__(self, config: EmailConfig, bus: MessageBus):
+ super().__init__(config, bus)
+ self.config: EmailConfig = config
+ self._last_subject_by_chat: dict[str, str] = {}
+ self._last_message_id_by_chat: dict[str, str] = {}
+ self._processed_uids: set[str] = set() # Capped to prevent unbounded growth
+ self._MAX_PROCESSED_UIDS = 100000
+
+ async def start(self) -> None:
+ """Start polling IMAP for inbound emails."""
+ if not self.config.consent_granted:
+ logger.warning(
+ "Email channel disabled: consent_granted is false. "
+ "Set channels.email.consentGranted=true after explicit user permission."
+ )
+ return
+
+ if not self._validate_config():
+ return
+
+ self._running = True
+ logger.info("Starting Email channel (IMAP polling mode)...")
+
+ poll_seconds = max(5, int(self.config.poll_interval_seconds))
+ while self._running:
+ try:
+ inbound_items = await asyncio.to_thread(self._fetch_new_messages)
+ for item in inbound_items:
+ sender = item["sender"]
+ subject = item.get("subject", "")
+ message_id = item.get("message_id", "")
+
+ if subject:
+ self._last_subject_by_chat[sender] = subject
+ if message_id:
+ self._last_message_id_by_chat[sender] = message_id
+
+ await self._handle_message(
+ sender_id=sender,
+ chat_id=sender,
+ content=item["content"],
+ metadata=item.get("metadata", {}),
+ )
+ except Exception as e:
+ logger.error(f"Email polling error: {e}")
+
+ await asyncio.sleep(poll_seconds)
+
+ async def stop(self) -> None:
+ """Stop polling loop."""
+ self._running = False
+
+ async def send(self, msg: OutboundMessage) -> None:
+ """Send email via SMTP."""
+ if not self.config.consent_granted:
+ logger.warning("Skip email send: consent_granted is false")
+ return
+
+ force_send = bool((msg.metadata or {}).get("force_send"))
+ if not self.config.auto_reply_enabled and not force_send:
+ logger.info("Skip automatic email reply: auto_reply_enabled is false")
+ return
+
+ if not self.config.smtp_host:
+ logger.warning("Email channel SMTP host not configured")
+ return
+
+ to_addr = msg.chat_id.strip()
+ if not to_addr:
+ logger.warning("Email channel missing recipient address")
+ return
+
+ base_subject = self._last_subject_by_chat.get(to_addr, "nanobot reply")
+ subject = self._reply_subject(base_subject)
+ if msg.metadata and isinstance(msg.metadata.get("subject"), str):
+ override = msg.metadata["subject"].strip()
+ if override:
+ subject = override
+
+ email_msg = EmailMessage()
+ email_msg["From"] = self.config.from_address or self.config.smtp_username or self.config.imap_username
+ email_msg["To"] = to_addr
+ email_msg["Subject"] = subject
+ email_msg.set_content(msg.content or "")
+
+ in_reply_to = self._last_message_id_by_chat.get(to_addr)
+ if in_reply_to:
+ email_msg["In-Reply-To"] = in_reply_to
+ email_msg["References"] = in_reply_to
+
+ try:
+ await asyncio.to_thread(self._smtp_send, email_msg)
+ except Exception as e:
+ logger.error(f"Error sending email to {to_addr}: {e}")
+ raise
+
+ def _validate_config(self) -> bool:
+ missing = []
+ if not self.config.imap_host:
+ missing.append("imap_host")
+ if not self.config.imap_username:
+ missing.append("imap_username")
+ if not self.config.imap_password:
+ missing.append("imap_password")
+ if not self.config.smtp_host:
+ missing.append("smtp_host")
+ if not self.config.smtp_username:
+ missing.append("smtp_username")
+ if not self.config.smtp_password:
+ missing.append("smtp_password")
+
+ if missing:
+ logger.error(f"Email channel not configured, missing: {', '.join(missing)}")
+ return False
+ return True
+
+ def _smtp_send(self, msg: EmailMessage) -> None:
+ timeout = 30
+ if self.config.smtp_use_ssl:
+ with smtplib.SMTP_SSL(
+ self.config.smtp_host,
+ self.config.smtp_port,
+ timeout=timeout,
+ ) as smtp:
+ smtp.login(self.config.smtp_username, self.config.smtp_password)
+ smtp.send_message(msg)
+ return
+
+ with smtplib.SMTP(self.config.smtp_host, self.config.smtp_port, timeout=timeout) as smtp:
+ if self.config.smtp_use_tls:
+ smtp.starttls(context=ssl.create_default_context())
+ smtp.login(self.config.smtp_username, self.config.smtp_password)
+ smtp.send_message(msg)
+
+ def _fetch_new_messages(self) -> list[dict[str, Any]]:
+ """Poll IMAP and return parsed unread messages."""
+ return self._fetch_messages(
+ search_criteria=("UNSEEN",),
+ mark_seen=self.config.mark_seen,
+ dedupe=True,
+ limit=0,
+ )
+
+ def fetch_messages_between_dates(
+ self,
+ start_date: date,
+ end_date: date,
+ limit: int = 20,
+ ) -> list[dict[str, Any]]:
+ """
+ Fetch messages in [start_date, end_date) by IMAP date search.
+
+ This is used for historical summarization tasks (e.g. "yesterday").
+ """
+ if end_date <= start_date:
+ return []
+
+ return self._fetch_messages(
+ search_criteria=(
+ "SINCE",
+ self._format_imap_date(start_date),
+ "BEFORE",
+ self._format_imap_date(end_date),
+ ),
+ mark_seen=False,
+ dedupe=False,
+ limit=max(1, int(limit)),
+ )
+
+ def _fetch_messages(
+ self,
+ search_criteria: tuple[str, ...],
+ mark_seen: bool,
+ dedupe: bool,
+ limit: int,
+ ) -> list[dict[str, Any]]:
+ """Fetch messages by arbitrary IMAP search criteria."""
+ messages: list[dict[str, Any]] = []
+ mailbox = self.config.imap_mailbox or "INBOX"
+
+ if self.config.imap_use_ssl:
+ client = imaplib.IMAP4_SSL(self.config.imap_host, self.config.imap_port)
+ else:
+ client = imaplib.IMAP4(self.config.imap_host, self.config.imap_port)
+
+ try:
+ client.login(self.config.imap_username, self.config.imap_password)
+ status, _ = client.select(mailbox)
+ if status != "OK":
+ return messages
+
+ status, data = client.search(None, *search_criteria)
+ if status != "OK" or not data:
+ return messages
+
+ ids = data[0].split()
+ if limit > 0 and len(ids) > limit:
+ ids = ids[-limit:]
+ for imap_id in ids:
+ status, fetched = client.fetch(imap_id, "(BODY.PEEK[] UID)")
+ if status != "OK" or not fetched:
+ continue
+
+ raw_bytes = self._extract_message_bytes(fetched)
+ if raw_bytes is None:
+ continue
+
+ uid = self._extract_uid(fetched)
+ if dedupe and uid and uid in self._processed_uids:
+ continue
+
+ parsed = BytesParser(policy=policy.default).parsebytes(raw_bytes)
+ sender = parseaddr(parsed.get("From", ""))[1].strip().lower()
+ if not sender:
+ continue
+
+ subject = self._decode_header_value(parsed.get("Subject", ""))
+ date_value = parsed.get("Date", "")
+ message_id = parsed.get("Message-ID", "").strip()
+ body = self._extract_text_body(parsed)
+
+ if not body:
+ body = "(empty email body)"
+
+ body = body[: self.config.max_body_chars]
+ content = (
+ f"Email received.\n"
+ f"From: {sender}\n"
+ f"Subject: {subject}\n"
+ f"Date: {date_value}\n\n"
+ f"{body}"
+ )
+
+ metadata = {
+ "message_id": message_id,
+ "subject": subject,
+ "date": date_value,
+ "sender_email": sender,
+ "uid": uid,
+ }
+ messages.append(
+ {
+ "sender": sender,
+ "subject": subject,
+ "message_id": message_id,
+ "content": content,
+ "metadata": metadata,
+ }
+ )
+
+ if dedupe and uid:
+ self._processed_uids.add(uid)
+ # mark_seen is the primary dedup; this set is a safety net
+ if len(self._processed_uids) > self._MAX_PROCESSED_UIDS:
+ self._processed_uids.clear()
+
+ if mark_seen:
+ client.store(imap_id, "+FLAGS", "\\Seen")
+ finally:
+ try:
+ client.logout()
+ except Exception:
+ pass
+
+ return messages
+
+ @classmethod
+ def _format_imap_date(cls, value: date) -> str:
+ """Format date for IMAP search (always English month abbreviations)."""
+ month = cls._IMAP_MONTHS[value.month - 1]
+ return f"{value.day:02d}-{month}-{value.year}"
+
+ @staticmethod
+ def _extract_message_bytes(fetched: list[Any]) -> bytes | None:
+ for item in fetched:
+ if isinstance(item, tuple) and len(item) >= 2 and isinstance(item[1], (bytes, bytearray)):
+ return bytes(item[1])
+ return None
+
+ @staticmethod
+ def _extract_uid(fetched: list[Any]) -> str:
+ for item in fetched:
+ if isinstance(item, tuple) and item and isinstance(item[0], (bytes, bytearray)):
+ head = bytes(item[0]).decode("utf-8", errors="ignore")
+ m = re.search(r"UID\s+(\d+)", head)
+ if m:
+ return m.group(1)
+ return ""
+
+ @staticmethod
+ def _decode_header_value(value: str) -> str:
+ if not value:
+ return ""
+ try:
+ return str(make_header(decode_header(value)))
+ except Exception:
+ return value
+
+ @classmethod
+ def _extract_text_body(cls, msg: Any) -> str:
+ """Best-effort extraction of readable body text."""
+ if msg.is_multipart():
+ plain_parts: list[str] = []
+ html_parts: list[str] = []
+ for part in msg.walk():
+ if part.get_content_disposition() == "attachment":
+ continue
+ content_type = part.get_content_type()
+ try:
+ payload = part.get_content()
+ except Exception:
+ payload_bytes = part.get_payload(decode=True) or b""
+ charset = part.get_content_charset() or "utf-8"
+ payload = payload_bytes.decode(charset, errors="replace")
+ if not isinstance(payload, str):
+ continue
+ if content_type == "text/plain":
+ plain_parts.append(payload)
+ elif content_type == "text/html":
+ html_parts.append(payload)
+ if plain_parts:
+ return "\n\n".join(plain_parts).strip()
+ if html_parts:
+ return cls._html_to_text("\n\n".join(html_parts)).strip()
+ return ""
+
+ try:
+ payload = msg.get_content()
+ except Exception:
+ payload_bytes = msg.get_payload(decode=True) or b""
+ charset = msg.get_content_charset() or "utf-8"
+ payload = payload_bytes.decode(charset, errors="replace")
+ if not isinstance(payload, str):
+ return ""
+ if msg.get_content_type() == "text/html":
+ return cls._html_to_text(payload).strip()
+ return payload.strip()
+
+ @staticmethod
+ def _html_to_text(raw_html: str) -> str:
+ text = re.sub(r"<\s*br\s*/?>", "\n", raw_html, flags=re.IGNORECASE)
+ text = re.sub(r"<\s*/\s*p\s*>", "\n", text, flags=re.IGNORECASE)
+ text = re.sub(r"<[^>]+>", "", text)
+ return html.unescape(text)
+
+ def _reply_subject(self, base_subject: str) -> str:
+ subject = (base_subject or "").strip() or "nanobot reply"
+ prefix = self.config.subject_prefix or "Re: "
+ if subject.lower().startswith("re:"):
+ return subject
+ return f"{prefix}{subject}"
diff --git a/nanobot/channels/manager.py b/nanobot/channels/manager.py
index 1501a28..26fa9f3 100644
--- a/nanobot/channels/manager.py
+++ b/nanobot/channels/manager.py
@@ -95,6 +95,17 @@ class ChannelManager:
logger.info("DingTalk channel enabled")
except ImportError as e:
logger.warning(f"DingTalk channel not available: {e}")
+
+ # Email channel
+ if self.config.channels.email.enabled:
+ try:
+ from nanobot.channels.email import EmailChannel
+ self.channels["email"] = EmailChannel(
+ self.config.channels.email, self.bus
+ )
+ logger.info("Email channel enabled")
+ except ImportError as e:
+ logger.warning(f"Email channel not available: {e}")
async def _start_channel(self, name: str, channel: BaseChannel) -> None:
"""Start a channel and log any exceptions."""
diff --git a/nanobot/config/schema.py b/nanobot/config/schema.py
index edea307..aa9729b 100644
--- a/nanobot/config/schema.py
+++ b/nanobot/config/schema.py
@@ -46,6 +46,36 @@ class DiscordConfig(BaseModel):
gateway_url: str = "wss://gateway.discord.gg/?v=10&encoding=json"
intents: int = 37377 # GUILDS + GUILD_MESSAGES + DIRECT_MESSAGES + MESSAGE_CONTENT
+class EmailConfig(BaseModel):
+ """Email channel configuration (IMAP inbound + SMTP outbound)."""
+ enabled: bool = False
+ consent_granted: bool = False # Explicit owner permission to access mailbox data
+
+ # IMAP (receive)
+ imap_host: str = ""
+ imap_port: int = 993
+ imap_username: str = ""
+ imap_password: str = ""
+ imap_mailbox: str = "INBOX"
+ imap_use_ssl: bool = True
+
+ # SMTP (send)
+ smtp_host: str = ""
+ smtp_port: int = 587
+ smtp_username: str = ""
+ smtp_password: str = ""
+ smtp_use_tls: bool = True
+ smtp_use_ssl: bool = False
+ from_address: str = ""
+
+ # Behavior
+ auto_reply_enabled: bool = True # If false, inbound email is read but no automatic reply is sent
+ poll_interval_seconds: int = 30
+ mark_seen: bool = True
+ max_body_chars: int = 12000
+ subject_prefix: str = "Re: "
+ allow_from: list[str] = Field(default_factory=list) # Allowed sender email addresses
+
class ChannelsConfig(BaseModel):
"""Configuration for chat channels."""
@@ -54,6 +84,7 @@ class ChannelsConfig(BaseModel):
discord: DiscordConfig = Field(default_factory=DiscordConfig)
feishu: FeishuConfig = Field(default_factory=FeishuConfig)
dingtalk: DingTalkConfig = Field(default_factory=DingTalkConfig)
+ email: EmailConfig = Field(default_factory=EmailConfig)
class AgentDefaults(BaseModel):
diff --git a/tests/test_email_channel.py b/tests/test_email_channel.py
new file mode 100644
index 0000000..8b22d8d
--- /dev/null
+++ b/tests/test_email_channel.py
@@ -0,0 +1,311 @@
+from email.message import EmailMessage
+from datetime import date
+
+import pytest
+
+from nanobot.bus.events import OutboundMessage
+from nanobot.bus.queue import MessageBus
+from nanobot.channels.email import EmailChannel
+from nanobot.config.schema import EmailConfig
+
+
+def _make_config() -> EmailConfig:
+ return EmailConfig(
+ enabled=True,
+ consent_granted=True,
+ imap_host="imap.example.com",
+ imap_port=993,
+ imap_username="bot@example.com",
+ imap_password="secret",
+ smtp_host="smtp.example.com",
+ smtp_port=587,
+ smtp_username="bot@example.com",
+ smtp_password="secret",
+ mark_seen=True,
+ )
+
+
+def _make_raw_email(
+ from_addr: str = "alice@example.com",
+ subject: str = "Hello",
+ body: str = "This is the body.",
+) -> bytes:
+ msg = EmailMessage()
+ msg["From"] = from_addr
+ msg["To"] = "bot@example.com"
+ msg["Subject"] = subject
+ msg["Message-ID"] = ""
+ msg.set_content(body)
+ return msg.as_bytes()
+
+
+def test_fetch_new_messages_parses_unseen_and_marks_seen(monkeypatch) -> None:
+ raw = _make_raw_email(subject="Invoice", body="Please pay")
+
+ class FakeIMAP:
+ def __init__(self) -> None:
+ self.store_calls: list[tuple[bytes, str, str]] = []
+
+ def login(self, _user: str, _pw: str):
+ return "OK", [b"logged in"]
+
+ def select(self, _mailbox: str):
+ return "OK", [b"1"]
+
+ def search(self, *_args):
+ return "OK", [b"1"]
+
+ def fetch(self, _imap_id: bytes, _parts: str):
+ return "OK", [(b"1 (UID 123 BODY[] {200})", raw), b")"]
+
+ def store(self, imap_id: bytes, op: str, flags: str):
+ self.store_calls.append((imap_id, op, flags))
+ return "OK", [b""]
+
+ def logout(self):
+ return "BYE", [b""]
+
+ fake = FakeIMAP()
+ monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: fake)
+
+ channel = EmailChannel(_make_config(), MessageBus())
+ items = channel._fetch_new_messages()
+
+ assert len(items) == 1
+ assert items[0]["sender"] == "alice@example.com"
+ assert items[0]["subject"] == "Invoice"
+ assert "Please pay" in items[0]["content"]
+ assert fake.store_calls == [(b"1", "+FLAGS", "\\Seen")]
+
+ # Same UID should be deduped in-process.
+ items_again = channel._fetch_new_messages()
+ assert items_again == []
+
+
+def test_extract_text_body_falls_back_to_html() -> None:
+ msg = EmailMessage()
+ msg["From"] = "alice@example.com"
+ msg["To"] = "bot@example.com"
+ msg["Subject"] = "HTML only"
+ msg.add_alternative("Hello
world
", subtype="html")
+
+ text = EmailChannel._extract_text_body(msg)
+ assert "Hello" in text
+ assert "world" in text
+
+
+@pytest.mark.asyncio
+async def test_start_returns_immediately_without_consent(monkeypatch) -> None:
+ cfg = _make_config()
+ cfg.consent_granted = False
+ channel = EmailChannel(cfg, MessageBus())
+
+ called = {"fetch": False}
+
+ def _fake_fetch():
+ called["fetch"] = True
+ return []
+
+ monkeypatch.setattr(channel, "_fetch_new_messages", _fake_fetch)
+ await channel.start()
+ assert channel.is_running is False
+ assert called["fetch"] is False
+
+
+@pytest.mark.asyncio
+async def test_send_uses_smtp_and_reply_subject(monkeypatch) -> None:
+ class FakeSMTP:
+ def __init__(self, _host: str, _port: int, timeout: int = 30) -> None:
+ self.timeout = timeout
+ self.started_tls = False
+ self.logged_in = False
+ self.sent_messages: list[EmailMessage] = []
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc, tb):
+ return False
+
+ def starttls(self, context=None):
+ self.started_tls = True
+
+ def login(self, _user: str, _pw: str):
+ self.logged_in = True
+
+ def send_message(self, msg: EmailMessage):
+ self.sent_messages.append(msg)
+
+ fake_instances: list[FakeSMTP] = []
+
+ def _smtp_factory(host: str, port: int, timeout: int = 30):
+ instance = FakeSMTP(host, port, timeout=timeout)
+ fake_instances.append(instance)
+ return instance
+
+ monkeypatch.setattr("nanobot.channels.email.smtplib.SMTP", _smtp_factory)
+
+ channel = EmailChannel(_make_config(), MessageBus())
+ channel._last_subject_by_chat["alice@example.com"] = "Invoice #42"
+ channel._last_message_id_by_chat["alice@example.com"] = ""
+
+ await channel.send(
+ OutboundMessage(
+ channel="email",
+ chat_id="alice@example.com",
+ content="Acknowledged.",
+ )
+ )
+
+ assert len(fake_instances) == 1
+ smtp = fake_instances[0]
+ assert smtp.started_tls is True
+ assert smtp.logged_in is True
+ assert len(smtp.sent_messages) == 1
+ sent = smtp.sent_messages[0]
+ assert sent["Subject"] == "Re: Invoice #42"
+ assert sent["To"] == "alice@example.com"
+ assert sent["In-Reply-To"] == ""
+
+
+@pytest.mark.asyncio
+async def test_send_skips_when_auto_reply_disabled(monkeypatch) -> None:
+ class FakeSMTP:
+ def __init__(self, _host: str, _port: int, timeout: int = 30) -> None:
+ self.sent_messages: list[EmailMessage] = []
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc, tb):
+ return False
+
+ def starttls(self, context=None):
+ return None
+
+ def login(self, _user: str, _pw: str):
+ return None
+
+ def send_message(self, msg: EmailMessage):
+ self.sent_messages.append(msg)
+
+ fake_instances: list[FakeSMTP] = []
+
+ def _smtp_factory(host: str, port: int, timeout: int = 30):
+ instance = FakeSMTP(host, port, timeout=timeout)
+ fake_instances.append(instance)
+ return instance
+
+ monkeypatch.setattr("nanobot.channels.email.smtplib.SMTP", _smtp_factory)
+
+ cfg = _make_config()
+ cfg.auto_reply_enabled = False
+ channel = EmailChannel(cfg, MessageBus())
+ await channel.send(
+ OutboundMessage(
+ channel="email",
+ chat_id="alice@example.com",
+ content="Should not send.",
+ )
+ )
+ assert fake_instances == []
+
+ await channel.send(
+ OutboundMessage(
+ channel="email",
+ chat_id="alice@example.com",
+ content="Force send.",
+ metadata={"force_send": True},
+ )
+ )
+ assert len(fake_instances) == 1
+ assert len(fake_instances[0].sent_messages) == 1
+
+
+@pytest.mark.asyncio
+async def test_send_skips_when_consent_not_granted(monkeypatch) -> None:
+ class FakeSMTP:
+ def __init__(self, _host: str, _port: int, timeout: int = 30) -> None:
+ self.sent_messages: list[EmailMessage] = []
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc, tb):
+ return False
+
+ def starttls(self, context=None):
+ return None
+
+ def login(self, _user: str, _pw: str):
+ return None
+
+ def send_message(self, msg: EmailMessage):
+ self.sent_messages.append(msg)
+
+ called = {"smtp": False}
+
+ def _smtp_factory(host: str, port: int, timeout: int = 30):
+ called["smtp"] = True
+ return FakeSMTP(host, port, timeout=timeout)
+
+ monkeypatch.setattr("nanobot.channels.email.smtplib.SMTP", _smtp_factory)
+
+ cfg = _make_config()
+ cfg.consent_granted = False
+ channel = EmailChannel(cfg, MessageBus())
+ await channel.send(
+ OutboundMessage(
+ channel="email",
+ chat_id="alice@example.com",
+ content="Should not send.",
+ metadata={"force_send": True},
+ )
+ )
+ assert called["smtp"] is False
+
+
+def test_fetch_messages_between_dates_uses_imap_since_before_without_mark_seen(monkeypatch) -> None:
+ raw = _make_raw_email(subject="Status", body="Yesterday update")
+
+ class FakeIMAP:
+ def __init__(self) -> None:
+ self.search_args = None
+ self.store_calls: list[tuple[bytes, str, str]] = []
+
+ def login(self, _user: str, _pw: str):
+ return "OK", [b"logged in"]
+
+ def select(self, _mailbox: str):
+ return "OK", [b"1"]
+
+ def search(self, *_args):
+ self.search_args = _args
+ return "OK", [b"5"]
+
+ def fetch(self, _imap_id: bytes, _parts: str):
+ return "OK", [(b"5 (UID 999 BODY[] {200})", raw), b")"]
+
+ def store(self, imap_id: bytes, op: str, flags: str):
+ self.store_calls.append((imap_id, op, flags))
+ return "OK", [b""]
+
+ def logout(self):
+ return "BYE", [b""]
+
+ fake = FakeIMAP()
+ monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: fake)
+
+ channel = EmailChannel(_make_config(), MessageBus())
+ items = channel.fetch_messages_between_dates(
+ start_date=date(2026, 2, 6),
+ end_date=date(2026, 2, 7),
+ limit=10,
+ )
+
+ assert len(items) == 1
+ assert items[0]["subject"] == "Status"
+ # search(None, "SINCE", "06-Feb-2026", "BEFORE", "07-Feb-2026")
+ assert fake.search_args is not None
+ assert fake.search_args[1:] == ("SINCE", "06-Feb-2026", "BEFORE", "07-Feb-2026")
+ assert fake.store_calls == []