"""Tests for the email channel: IMAP fetching, body extraction and SMTP sending.

All network access is replaced with in-memory fakes via ``monkeypatch``.
"""

from email.message import EmailMessage
from datetime import date

import pytest

from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.channels.email import EmailChannel
from nanobot.config.schema import EmailConfig


def _make_config() -> EmailConfig:
    """Return an enabled, consent-granted config with dummy IMAP/SMTP settings."""
    return EmailConfig(
        enabled=True,
        consent_granted=True,
        imap_host="imap.example.com",
        imap_port=993,
        imap_username="bot@example.com",
        imap_password="secret",
        smtp_host="smtp.example.com",
        smtp_port=587,
        smtp_username="bot@example.com",
        smtp_password="secret",
        mark_seen=True,
    )


def _make_raw_email(
    from_addr: str = "alice@example.com",
    subject: str = "Hello",
    body: str = "This is the body.",
) -> bytes:
    """Serialize a plain-text message addressed to the bot.

    Args:
        from_addr: Value of the ``From`` header.
        subject: Value of the ``Subject`` header.
        body: Plain-text content of the message.

    Returns:
        The message encoded as bytes, as an IMAP FETCH would deliver it.
    """
    msg = EmailMessage()
    msg["From"] = from_addr
    msg["To"] = "bot@example.com"
    msg["Subject"] = subject
    # NOTE(review): a well-formed Message-ID is angle-bracketed; the exact
    # fixture value is reconstructed here - confirm against project history.
    msg["Message-ID"] = "<m1@example.com>"
    msg.set_content(body)
    return msg.as_bytes()


def test_fetch_new_messages_parses_unseen_and_marks_seen(monkeypatch) -> None:
    """A fetched message is parsed, flagged ``\\Seen`` and not returned twice."""
    raw = _make_raw_email(subject="Invoice", body="Please pay")

    class FakeIMAP:
        """Minimal IMAP stand-in that serves one message and records STOREs."""

        def __init__(self) -> None:
            self.store_calls: list[tuple[bytes, str, str]] = []

        def login(self, _user: str, _pw: str):
            return "OK", [b"logged in"]

        def select(self, _mailbox: str):
            return "OK", [b"1"]

        def search(self, *_args):
            return "OK", [b"1"]

        def fetch(self, _imap_id: bytes, _parts: str):
            return "OK", [(b"1 (UID 123 BODY[] {200})", raw), b")"]

        def store(self, imap_id: bytes, op: str, flags: str):
            self.store_calls.append((imap_id, op, flags))
            return "OK", [b""]

        def logout(self):
            return "BYE", [b""]

    fake = FakeIMAP()
    monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: fake)

    channel = EmailChannel(_make_config(), MessageBus())
    items = channel._fetch_new_messages()

    assert len(items) == 1
    assert items[0]["sender"] == "alice@example.com"
    assert items[0]["subject"] == "Invoice"
    assert "Please pay" in items[0]["content"]
    assert fake.store_calls == [(b"1", "+FLAGS", "\\Seen")]

    # Same UID should be deduped in-process.
    items_again = channel._fetch_new_messages()
    assert items_again == []


def test_extract_text_body_falls_back_to_html() -> None:
    """With only an HTML part present, the visible text is still extracted."""
    msg = EmailMessage()
    msg["From"] = "alice@example.com"
    msg["To"] = "bot@example.com"
    msg["Subject"] = "HTML only"
    # NOTE(review): HTML markup reconstructed so the payload really is HTML;
    # confirm the exact tags against project history.
    msg.add_alternative("<p>Hello<br>world</p>", subtype="html")

    text = EmailChannel._extract_text_body(msg)

    assert "Hello" in text
    assert "world" in text


@pytest.mark.asyncio
async def test_start_returns_immediately_without_consent(monkeypatch) -> None:
    """``start()`` neither runs nor fetches when consent has not been granted."""
    cfg = _make_config()
    cfg.consent_granted = False
    channel = EmailChannel(cfg, MessageBus())

    called = {"fetch": False}

    def _fake_fetch():
        called["fetch"] = True
        return []

    monkeypatch.setattr(channel, "_fetch_new_messages", _fake_fetch)

    await channel.start()

    assert channel.is_running is False
    assert called["fetch"] is False


@pytest.mark.asyncio
async def test_send_uses_smtp_and_reply_subject(monkeypatch) -> None:
    """A reply goes out over STARTTLS + login with ``Re:`` subject and threading."""

    class FakeSMTP:
        """SMTP stand-in that records TLS/login state and sent messages."""

        def __init__(self, _host: str, _port: int, timeout: int = 30) -> None:
            self.timeout = timeout
            self.started_tls = False
            self.logged_in = False
            self.sent_messages: list[EmailMessage] = []

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc, tb):
            return False

        def starttls(self, context=None):
            self.started_tls = True

        def login(self, _user: str, _pw: str):
            self.logged_in = True

        def send_message(self, msg: EmailMessage):
            self.sent_messages.append(msg)

    fake_instances: list[FakeSMTP] = []

    def _smtp_factory(host: str, port: int, timeout: int = 30):
        instance = FakeSMTP(host, port, timeout=timeout)
        fake_instances.append(instance)
        return instance

    monkeypatch.setattr("nanobot.channels.email.smtplib.SMTP", _smtp_factory)

    channel = EmailChannel(_make_config(), MessageBus())
    channel._last_subject_by_chat["alice@example.com"] = "Invoice #42"
    # NOTE(review): a non-empty id is needed for the In-Reply-To assertion to
    # verify anything; the exact value is reconstructed - confirm.
    channel._last_message_id_by_chat["alice@example.com"] = "<orig@example.com>"

    await channel.send(
        OutboundMessage(
            channel="email",
            chat_id="alice@example.com",
            content="Acknowledged.",
        )
    )

    assert len(fake_instances) == 1
    smtp = fake_instances[0]
    assert smtp.started_tls is True
    assert smtp.logged_in is True
    assert len(smtp.sent_messages) == 1

    sent = smtp.sent_messages[0]
    assert sent["Subject"] == "Re: Invoice #42"
    assert sent["To"] == "alice@example.com"
    assert sent["In-Reply-To"] == "<orig@example.com>"


@pytest.mark.asyncio
async def test_send_skips_when_auto_reply_disabled(monkeypatch) -> None:
    """With auto-reply off nothing is sent unless ``force_send`` is set."""

    class FakeSMTP:
        """SMTP stand-in that only records sent messages."""

        def __init__(self, _host: str, _port: int, timeout: int = 30) -> None:
            self.sent_messages: list[EmailMessage] = []

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc, tb):
            return False

        def starttls(self, context=None):
            return None

        def login(self, _user: str, _pw: str):
            return None

        def send_message(self, msg: EmailMessage):
            self.sent_messages.append(msg)

    fake_instances: list[FakeSMTP] = []

    def _smtp_factory(host: str, port: int, timeout: int = 30):
        instance = FakeSMTP(host, port, timeout=timeout)
        fake_instances.append(instance)
        return instance

    monkeypatch.setattr("nanobot.channels.email.smtplib.SMTP", _smtp_factory)

    cfg = _make_config()
    cfg.auto_reply_enabled = False
    channel = EmailChannel(cfg, MessageBus())

    await channel.send(
        OutboundMessage(
            channel="email",
            chat_id="alice@example.com",
            content="Should not send.",
        )
    )
    # No SMTP connection may even be opened for a suppressed auto-reply.
    assert fake_instances == []

    await channel.send(
        OutboundMessage(
            channel="email",
            chat_id="alice@example.com",
            content="Force send.",
            metadata={"force_send": True},
        )
    )
    assert len(fake_instances) == 1
    assert len(fake_instances[0].sent_messages) == 1


@pytest.mark.asyncio
async def test_send_skips_when_consent_not_granted(monkeypatch) -> None:
    """Without consent SMTP is never touched, even when ``force_send`` is set."""

    class FakeSMTP:
        """SMTP stand-in that only records sent messages."""

        def __init__(self, _host: str, _port: int, timeout: int = 30) -> None:
            self.sent_messages: list[EmailMessage] = []

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc, tb):
            return False

        def starttls(self, context=None):
            return None

        def login(self, _user: str, _pw: str):
            return None

        def send_message(self, msg: EmailMessage):
            self.sent_messages.append(msg)

    called = {"smtp": False}

    def _smtp_factory(host: str, port: int, timeout: int = 30):
        called["smtp"] = True
        return FakeSMTP(host, port, timeout=timeout)

    monkeypatch.setattr("nanobot.channels.email.smtplib.SMTP", _smtp_factory)

    cfg = _make_config()
    cfg.consent_granted = False
    channel = EmailChannel(cfg, MessageBus())

    await channel.send(
        OutboundMessage(
            channel="email",
            chat_id="alice@example.com",
            content="Should not send.",
            metadata={"force_send": True},
        )
    )

    assert called["smtp"] is False


def test_fetch_messages_between_dates_uses_imap_since_before_without_mark_seen(monkeypatch) -> None:
    """Date-range fetch searches with SINCE/BEFORE and never issues a STORE."""
    raw = _make_raw_email(subject="Status", body="Yesterday update")

    class FakeIMAP:
        """IMAP stand-in that captures SEARCH arguments and STORE calls."""

        def __init__(self) -> None:
            self.search_args = None
            self.store_calls: list[tuple[bytes, str, str]] = []

        def login(self, _user: str, _pw: str):
            return "OK", [b"logged in"]

        def select(self, _mailbox: str):
            return "OK", [b"1"]

        def search(self, *_args):
            self.search_args = _args
            return "OK", [b"5"]

        def fetch(self, _imap_id: bytes, _parts: str):
            return "OK", [(b"5 (UID 999 BODY[] {200})", raw), b")"]

        def store(self, imap_id: bytes, op: str, flags: str):
            self.store_calls.append((imap_id, op, flags))
            return "OK", [b""]

        def logout(self):
            return "BYE", [b""]

    fake = FakeIMAP()
    monkeypatch.setattr("nanobot.channels.email.imaplib.IMAP4_SSL", lambda _h, _p: fake)

    channel = EmailChannel(_make_config(), MessageBus())
    items = channel.fetch_messages_between_dates(
        start_date=date(2026, 2, 6),
        end_date=date(2026, 2, 7),
        limit=10,
    )

    assert len(items) == 1
    assert items[0]["subject"] == "Status"
    # search(None, "SINCE", "06-Feb-2026", "BEFORE", "07-Feb-2026")
    assert fake.search_args is not None
    assert fake.search_args[1:] == ("SINCE", "06-Feb-2026", "BEFORE", "07-Feb-2026")
    assert fake.store_calls == []