Fix email channel: skip progress updates and improve deduplication
- Skip progress updates (tool call hints) for the email channel to prevent spam.
- Mark skipped emails (sent by ourselves, or replies to our own emails) as seen to avoid reprocessing.
- Track skipped UIDs so the same emails are not checked repeatedly.
- Reduce log noise by logging a summary of skipped emails instead of logging each one.
This commit is contained in:
parent
a6d70f3d14
commit
2e69dc7ca8
@ -122,6 +122,21 @@ class AgentLoop:
|
||||
# Cron tool (for scheduling)
|
||||
if self.cron_service:
|
||||
self.tools.register(CronTool(self.cron_service))
|
||||
|
||||
# Email tool (if email channel is configured)
|
||||
try:
|
||||
from nanobot.agent.tools.email import EmailTool
|
||||
from nanobot.config.loader import load_config
|
||||
config = load_config()
|
||||
if config.channels.email.enabled:
|
||||
email_tool = EmailTool(email_config=config.channels.email)
|
||||
self.tools.register(email_tool)
|
||||
logger.info(f"Email tool '{email_tool.name}' registered successfully")
|
||||
else:
|
||||
logger.debug("Email tool not registered: email channel not enabled")
|
||||
except Exception as e:
|
||||
logger.warning(f"Email tool not available: {e}")
|
||||
# Email tool not available or not configured - log a warning and continue without it
|
||||
|
||||
async def _connect_mcp(self) -> None:
|
||||
"""Connect to configured MCP servers (one-time, lazy)."""
|
||||
@ -362,6 +377,9 @@ class AgentLoop:
|
||||
)
|
||||
|
||||
async def _bus_progress(content: str) -> None:
|
||||
# Skip progress updates for email channel to avoid sending intermediate tool call hints as emails
|
||||
if msg.channel == "email":
|
||||
return
|
||||
await self.bus.publish_outbound(OutboundMessage(
|
||||
channel=msg.channel, chat_id=msg.chat_id, content=content,
|
||||
metadata=msg.metadata or {},
|
||||
|
||||
346
nanobot/agent/tools/email.py
Normal file
346
nanobot/agent/tools/email.py
Normal file
@ -0,0 +1,346 @@
|
||||
"""Email tool: read emails from IMAP mailbox."""
|
||||
|
||||
import asyncio
|
||||
import imaplib
|
||||
import ssl
|
||||
from datetime import date
|
||||
from email import policy
|
||||
from email.header import decode_header, make_header
|
||||
from email.parser import BytesParser
|
||||
from email.utils import parseaddr
|
||||
from typing import Any
|
||||
|
||||
from nanobot.agent.tools.base import Tool
|
||||
|
||||
|
||||
class EmailTool(Tool):
    """Agent tool that reads emails from the configured IMAP mailbox.

    The tool connects to the IMAP server described by the email channel
    configuration, fetches the most recent messages and returns them to the
    agent as a formatted text listing (sender, subject, date, body preview).
    """

    name = "read_emails"
    description = "USE THIS TOOL FOR ALL EMAIL QUERIES. When user asks about emails, latest email, email sender, inbox, etc., you MUST call read_emails(). DO NOT use exec() with mail/tail/awk commands. DO NOT use read_file() on /var/mail or memory files. DO NOT try alternative methods. This is the ONLY way to read emails - it connects to IMAP and fetches real-time data. For 'latest email' queries, use limit=1. CRITICAL: When user asks for specific fields like 'From and Subject' or 'sender and subject', extract and return ONLY those fields from the tool output. Do NOT summarize or analyze the email body content unless the user specifically asks for it. If user asks 'give me the from and subject', respond with just: 'From: [email] Subject: [subject]'. Parameters: limit (1-50, default 10, use 1 for latest), unread_only (bool, default false), mark_seen (bool, default false). Returns formatted email list with sender, subject, date, and body."

    # Parameters accepted by execute(); anything else is dropped in coerce_params().
    _SUPPORTED_PARAMS = frozenset({"limit", "unread_only", "mark_seen"})

    # Maximum number of body characters shown per email in the formatted output.
    _PREVIEW_CHARS = 500

    # Settings that must be present and truthy before the mailbox is contacted,
    # paired with the error text returned when one is missing. Checked in order.
    _REQUIRED_SETTINGS = (
        ("enabled", "Error: Email channel is not enabled in configuration. Set NANOBOT_CHANNELS__EMAIL__ENABLED=true"),
        ("consent_granted", "Error: Email access consent not granted. Set NANOBOT_CHANNELS__EMAIL__CONSENT_GRANTED=true"),
        ("imap_host", "Error: IMAP host not configured. Set NANOBOT_CHANNELS__EMAIL__IMAP_HOST"),
        ("imap_username", "Error: IMAP username not configured. Set NANOBOT_CHANNELS__EMAIL__IMAP_USERNAME"),
        ("imap_password", "Error: IMAP password not configured. Set NANOBOT_CHANNELS__EMAIL__IMAP_PASSWORD"),
    )

    # Substrings (upper-case) that identify an authentication failure in the
    # server's response. "AUTHENTICAT" covers both "AUTHENTICATE" and the
    # common "[AUTHENTICATIONFAILED]" response code.
    _AUTH_ERROR_MARKERS = ("AUTHENTICAT", "LOGIN", "CREDENTIALS")

    def __init__(self, email_config: Any = None):
        """
        Initialize email tool with email configuration.

        Args:
            email_config: Optional EmailConfig instance. If None, the
                configuration is loaded lazily on first access of ``config``.
        """
        self._email_config = email_config

    @property
    def config(self) -> Any:
        """Return the email configuration, loading it on first use if needed."""
        if self._email_config is None:
            from nanobot.config.loader import load_config

            self._email_config = load_config().channels.email
        return self._email_config

    def coerce_params(self, params: dict[str, Any]) -> dict[str, Any]:
        """Coerce parameters, handling common name mismatches.

        ``count`` is accepted as an alias for ``limit`` (only when ``limit``
        itself is absent); every parameter the tool does not support is
        removed.
        """
        coerced = super().coerce_params(params)
        if "count" in coerced and "limit" not in coerced:
            count = coerced.pop("count")
            try:
                coerced["limit"] = int(count)
            except (ValueError, TypeError):
                # Unusable alias value: fall back to the default limit.
                pass
        return {key: value for key, value in coerced.items() if key in self._SUPPORTED_PARAMS}

    @property
    def parameters(self) -> dict[str, Any]:
        """JSON-schema description of the parameters accepted by the tool."""
        return {
            "type": "object",
            "properties": {
                "limit": {
                    "type": "integer",
                    "description": "Number of emails to return. REQUIRED for 'latest email' queries - always use limit=1. For multiple emails use limit=5, limit=10, etc. (default: 10, max: 50)",
                    "minimum": 1,
                    "maximum": 50,
                },
                "unread_only": {
                    "type": "boolean",
                    "description": "If true, only return unread emails. If false, returns all emails including read ones (default: false)",
                },
                "mark_seen": {
                    "type": "boolean",
                    "description": "If true, mark emails as read after fetching. If false, leave read/unread status unchanged (default: false)",
                },
            },
            "required": [],
        }

    async def execute(self, limit: int = 10, unread_only: bool = False, mark_seen: bool = False, **kwargs: Any) -> str:
        """
        Read emails from IMAP mailbox.

        Args:
            limit: Maximum number of emails to return (use limit=1 for latest
                email). Clamped to the range 1-50; falls back to 10 when it
                cannot be converted to an integer.
            unread_only: If true, only fetch unread emails
            mark_seen: If true, mark emails as read after fetching
            **kwargs: Extra parameters. ``count`` is honoured as an alias for
                ``limit``; everything else (sort_by, direction, ...) is ignored.

        Returns:
            Formatted string with email information, or a string starting
            with "Error" when the configuration is incomplete or the mailbox
            could not be read. Errors are returned, not raised.
        """
        # The agent sometimes passes 'count' instead of 'limit'.
        # ('limit' itself can never show up in kwargs: it is a named parameter.)
        if "count" in kwargs:
            try:
                limit = int(kwargs["count"])
            except (ValueError, TypeError):
                pass

        try:
            config = self.config
        except Exception as e:
            return f"Error loading email configuration: {str(e)}"

        if not config:
            return "Error: Email configuration not found"

        for setting, error_text in self._REQUIRED_SETTINGS:
            if not getattr(config, setting, None):
                return error_text

        # Limit to reasonable maximum
        try:
            limit = min(max(1, int(limit)), 50)
        except (ValueError, TypeError):
            limit = 10

        try:
            # imaplib is blocking, so run the fetch in a worker thread.
            messages = await asyncio.to_thread(
                self._fetch_messages,
                unread_only=unread_only,
                mark_seen=mark_seen,
                limit=limit,
            )
        except Exception as e:
            import traceback

            error_details = traceback.format_exc()
            return f"Error reading emails: {str(e)}\n\nDetails: {error_details}"

        if not messages:
            if unread_only:
                return "No unread emails found in your inbox."
            return "No emails found in your inbox. The mailbox appears to be empty or there was an issue retrieving emails."

        result_parts = [f"Found {len(messages)} email(s):\n"]
        for index, msg in enumerate(messages, 1):
            # Preview the body text only: 'content' repeats the From/Subject/Date
            # header lines that are already printed just above.
            body = msg.get("body", msg["content"])
            if len(body) > self._PREVIEW_CHARS:
                preview = f"{body[:self._PREVIEW_CHARS]}..."
            else:
                preview = body
            result_parts.extend(
                [
                    f"\n--- Email {index} ---",
                    f"From: {msg['sender']}",
                    f"Subject: {msg['subject']}",
                    f"Date: {msg['metadata']['date']}",
                    f"\nBody: {preview}",
                ]
            )
        return "\n".join(result_parts)

    def _fetch_messages(
        self,
        unread_only: bool,
        mark_seen: bool,
        limit: int,
    ) -> list[dict[str, Any]]:
        """Fetch messages from IMAP mailbox.

        Args:
            unread_only: Search for UNSEEN messages instead of ALL.
            mark_seen: Set the \\Seen flag on every message that was returned.
            limit: Keep only the last ``limit`` search hits (no cap when <= 0).

        Returns:
            One dict per message with the keys ``sender``, ``subject``,
            ``message_id``, ``body``, ``content`` and ``metadata``, in the
            order the server returned them. Empty when the mailbox cannot be
            selected or the search yields nothing.

        Raises:
            Exception: When the connection or the authentication fails.
            imaplib.IMAP4.error: For other IMAP protocol errors.
        """
        messages: list[dict[str, Any]] = []
        config = self.config
        mailbox = config.imap_mailbox or "INBOX"
        criterion = "UNSEEN" if unread_only else "ALL"
        max_chars = getattr(config, "max_body_chars", 12000)

        client = self._open_client()
        try:
            # Login happens inside the try block so that a failed login still
            # reaches the logout below and the connection is not leaked.
            self._login(client)

            status, _ = client.select(mailbox)
            if status != "OK":
                return messages

            status, data = client.search(None, criterion)
            if status != "OK" or not data or not data[0]:
                return messages

            ids = data[0].split()
            if limit > 0 and len(ids) > limit:
                # Get most recent emails (last N)
                ids = ids[-limit:]

            for imap_id in ids:
                # BODY.PEEK[] is used so that fetching does not set \Seen.
                status, fetched = client.fetch(imap_id, "(BODY.PEEK[] UID)")
                if status != "OK" or not fetched:
                    continue

                raw_bytes = self._extract_message_bytes(fetched)
                if raw_bytes is None:
                    continue

                messages.append(self._build_message(raw_bytes, max_chars))

                if mark_seen:
                    client.store(imap_id, "+FLAGS", "\\Seen")
        finally:
            try:
                client.logout()
            except Exception:
                # Best effort: a failing logout must not mask the real result/error.
                pass

        return messages

    def _open_client(self) -> Any:
        """Open a (not yet authenticated) connection to the configured IMAP server."""
        config = self.config
        try:
            if config.imap_use_ssl:
                return imaplib.IMAP4_SSL(config.imap_host, config.imap_port)
            return imaplib.IMAP4(config.imap_host, config.imap_port)
        except Exception as e:
            raise Exception(
                f"Failed to connect to IMAP server {config.imap_host}:{config.imap_port}: {str(e)}"
            ) from e

    def _login(self, client: Any) -> None:
        """Authenticate ``client``, turning auth failures into an actionable error."""
        config = self.config
        try:
            client.login(config.imap_username, config.imap_password.strip())
        except imaplib.IMAP4.error as e:
            error_msg = str(e)
            upper_msg = error_msg.upper()
            if any(marker in upper_msg for marker in self._AUTH_ERROR_MARKERS):
                raise Exception(
                    f"IMAP authentication failed. Please check:\n"
                    f"1. Your email username: {config.imap_username}\n"
                    f"2. Your password/app password is correct\n"
                    f"3. For Gmail: Enable 2-Step Verification and create an App Password at https://myaccount.google.com/apppasswords\n"
                    f"4. IMAP is enabled in your email account settings\n"
                    f"Original error: {error_msg}"
                ) from e
            raise

    def _build_message(self, raw_bytes: bytes, max_chars: int) -> dict[str, Any]:
        """Parse one raw RFC 822 message into the dict returned by _fetch_messages."""
        parsed = BytesParser(policy=policy.default).parsebytes(raw_bytes)

        from_header = str(parsed.get("From", ""))
        sender = parseaddr(from_header)[1].strip().lower()
        if not sender:
            # No parseable address: fall back to the raw header text.
            sender = from_header if from_header else "unknown"

        subject = self._decode_header_value(parsed.get("Subject", ""))
        date_value = str(parsed.get("Date", ""))
        message_id = str(parsed.get("Message-ID", "")).strip()

        body = self._extract_text_body(parsed) or "(empty email body)"
        # Limit body length
        body = body[:max_chars]

        content = (
            f"Email received.\n"
            f"From: {sender}\n"
            f"Subject: {subject}\n"
            f"Date: {date_value}\n\n"
            f"{body}"
        )

        return {
            "sender": sender,
            "subject": subject,
            "message_id": message_id,
            "body": body,
            "content": content,
            "metadata": {
                "message_id": message_id,
                "subject": subject,
                "date": date_value,
                "sender_email": sender,
            },
        }

    @staticmethod
    def _extract_message_bytes(fetched: list[Any]) -> bytes | None:
        """Extract raw message bytes from IMAP fetch response.

        Returns the payload of the first ``(envelope, data)`` tuple whose data
        element is bytes-like, or None when the response contains none.
        """
        for item in fetched:
            if isinstance(item, tuple) and len(item) >= 2 and isinstance(item[1], (bytes, bytearray)):
                return bytes(item[1])
        return None

    @staticmethod
    def _decode_header_value(value: str) -> str:
        """Decode email header value (handles encoded words).

        Returns "" for an empty value and the value unchanged when it cannot
        be decoded.
        """
        if not value:
            return ""
        try:
            return str(make_header(decode_header(value)))
        except Exception:
            # Malformed encoded-words: showing the raw header beats failing the fetch.
            return value

    @staticmethod
    def _html_to_text(markup: str) -> str:
        """Simple HTML to text conversion: <br> and </p> become newlines, tags are dropped."""
        import html
        import re

        text = re.sub(r"<\s*br\s*/?>", "\n", markup, flags=re.IGNORECASE)
        text = re.sub(r"<\s*/\s*p\s*>", "\n", text, flags=re.IGNORECASE)
        text = re.sub(r"<[^>]+>", "", text)
        return html.unescape(text).strip()

    @staticmethod
    def _part_text(part: Any) -> str | None:
        """Return the decoded text of a message part, or None if it is not text."""
        try:
            payload = part.get_content()
        except Exception:
            # get_content() could not handle this part; decode the raw payload by hand.
            payload_bytes = part.get_payload(decode=True) or b""
            charset = part.get_content_charset() or "utf-8"
            try:
                payload = payload_bytes.decode(charset, errors="replace")
            except LookupError:
                # Charset name unknown to Python: fall back to UTF-8.
                payload = payload_bytes.decode("utf-8", errors="replace")
        return payload if isinstance(payload, str) else None

    @staticmethod
    def _extract_text_body(msg: Any) -> str:
        """Extract readable text body from email message.

        For multipart messages all non-attachment text/plain parts are joined;
        text/html parts are only used (converted to text) when there is no
        plain-text part. Returns "" when no text can be extracted.
        """
        if not msg.is_multipart():
            payload = EmailTool._part_text(msg)
            if payload is None:
                return ""
            if msg.get_content_type() == "text/html":
                return EmailTool._html_to_text(payload)
            return payload.strip()

        plain_parts: list[str] = []
        html_parts: list[str] = []
        for part in msg.walk():
            if part.get_content_disposition() == "attachment":
                continue
            content_type = part.get_content_type()
            if content_type not in ("text/plain", "text/html"):
                # Containers and non-text parts never contribute to the body.
                continue
            payload = EmailTool._part_text(part)
            if payload is None:
                continue
            if content_type == "text/plain":
                plain_parts.append(payload)
            else:
                html_parts.append(payload)

        if plain_parts:
            return "\n\n".join(plain_parts).strip()
        if html_parts:
            return EmailTool._html_to_text("\n\n".join(html_parts))
        return ""
|
||||
|
||||
@ -6,6 +6,7 @@ import imaplib
|
||||
import re
|
||||
import smtplib
|
||||
import ssl
|
||||
import uuid
|
||||
from datetime import date
|
||||
from email import policy
|
||||
from email.header import decode_header, make_header
|
||||
@ -57,6 +58,8 @@ class EmailChannel(BaseChannel):
|
||||
self._last_message_id_by_chat: dict[str, str] = {}
|
||||
self._processed_uids: set[str] = set() # Capped to prevent unbounded growth
|
||||
self._MAX_PROCESSED_UIDS = 100000
|
||||
self._sent_message_ids: set[str] = set() # Track Message-IDs of emails we sent to prevent feedback loops
|
||||
self._MAX_SENT_MESSAGE_IDS = 10000
|
||||
|
||||
async def start(self) -> None:
|
||||
"""Start polling IMAP for inbound emails."""
|
||||
@ -134,6 +137,12 @@ class EmailChannel(BaseChannel):
|
||||
email_msg["To"] = to_addr
|
||||
email_msg["Subject"] = subject
|
||||
email_msg.set_content(msg.content or "")
|
||||
|
||||
# Generate a Message-ID for the email we're sending (to track and prevent feedback loops)
|
||||
from_email = email_msg["From"]
|
||||
domain = from_email.split("@")[-1] if "@" in from_email else "nanobot.local"
|
||||
message_id = f"<{uuid.uuid4()}@{domain}>"
|
||||
email_msg["Message-ID"] = message_id
|
||||
|
||||
in_reply_to = self._last_message_id_by_chat.get(to_addr)
|
||||
if in_reply_to:
|
||||
@ -142,6 +151,13 @@ class EmailChannel(BaseChannel):
|
||||
|
||||
try:
|
||||
await asyncio.to_thread(self._smtp_send, email_msg)
|
||||
# Track this Message-ID so we can ignore replies to it (prevent feedback loops)
|
||||
self._sent_message_ids.add(message_id)
|
||||
# Trim if too large
|
||||
if len(self._sent_message_ids) > self._MAX_SENT_MESSAGE_IDS:
|
||||
# Simple approach: a set keeps no insertion order, so drop all tracked Message-IDs once the cap is exceeded
|
||||
self._sent_message_ids.clear()
|
||||
logger.debug(f"Sent email with Message-ID: {message_id} to {to_addr}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error sending email to {to_addr}: {e}")
|
||||
raise
|
||||
@ -248,6 +264,10 @@ class EmailChannel(BaseChannel):
|
||||
ids = data[0].split()
|
||||
if limit > 0 and len(ids) > limit:
|
||||
ids = ids[-limit:]
|
||||
|
||||
our_email = (self.config.from_address or self.config.smtp_username or self.config.imap_username).strip().lower()
|
||||
skipped_count = 0
|
||||
|
||||
for imap_id in ids:
|
||||
status, fetched = client.fetch(imap_id, "(BODY.PEEK[] UID)")
|
||||
if status != "OK" or not fetched:
|
||||
@ -265,10 +285,46 @@ class EmailChannel(BaseChannel):
|
||||
sender = parseaddr(parsed.get("From", ""))[1].strip().lower()
|
||||
if not sender:
|
||||
continue
|
||||
|
||||
|
||||
# Skip emails from ourselves (prevent feedback loops)
|
||||
if sender == our_email:
|
||||
# Track skipped UIDs to avoid reprocessing
|
||||
if uid and dedupe:
|
||||
self._processed_uids.add(uid)
|
||||
# Trim if too large
|
||||
if len(self._processed_uids) > self._MAX_PROCESSED_UIDS:
|
||||
# Simple approach: a set keeps no insertion order, so drop all tracked UIDs once the cap is exceeded
|
||||
self._processed_uids.clear()
|
||||
# Mark as seen so it doesn't keep appearing in UNSEEN searches
|
||||
if mark_seen:
|
||||
try:
|
||||
client.store(imap_id, "+FLAGS", "\\Seen")
|
||||
except Exception:
|
||||
pass
|
||||
skipped_count += 1
|
||||
continue
|
||||
|
||||
subject = self._decode_header_value(parsed.get("Subject", ""))
|
||||
date_value = parsed.get("Date", "")
|
||||
message_id = parsed.get("Message-ID", "").strip()
|
||||
in_reply_to = parsed.get("In-Reply-To", "").strip()
|
||||
|
||||
# Skip emails that are replies to emails we sent (prevent feedback loops)
|
||||
if in_reply_to and in_reply_to in self._sent_message_ids:
|
||||
# Track skipped UIDs to avoid reprocessing
|
||||
if uid and dedupe:
|
||||
self._processed_uids.add(uid)
|
||||
if len(self._processed_uids) > self._MAX_PROCESSED_UIDS:
|
||||
self._processed_uids.clear()
|
||||
# Mark as seen so it doesn't keep appearing in UNSEEN searches
|
||||
if mark_seen:
|
||||
try:
|
||||
client.store(imap_id, "+FLAGS", "\\Seen")
|
||||
except Exception:
|
||||
pass
|
||||
skipped_count += 1
|
||||
continue
|
||||
|
||||
body = self._extract_text_body(parsed)
|
||||
|
||||
if not body:
|
||||
@ -313,6 +369,10 @@ class EmailChannel(BaseChannel):
|
||||
client.logout()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Log one summary line whenever any emails were skipped, instead of one line per email - reduces log noise
|
||||
if skipped_count > 0:
|
||||
logger.debug(f"Skipped {skipped_count} email(s) from self or replies to our emails")
|
||||
|
||||
return messages
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user