tanyar09 2e69dc7ca8 Fix email channel: skip progress updates and improve deduplication
- Skip progress updates (tool call hints) for email channel to prevent spam
- Mark skipped emails (from self/replies) as seen to avoid reprocessing
- Track skipped UIDs to prevent checking same emails repeatedly
- Reduce log noise by summarizing skipped emails instead of logging each one
2026-03-05 15:14:56 -05:00

347 lines
15 KiB
Python

"""Email tool: read emails from IMAP mailbox."""
import asyncio
import html
import imaplib
import re
import ssl
import traceback
from datetime import date
from email import policy
from email.header import decode_header, make_header
from email.parser import BytesParser
from email.utils import parseaddr
from typing import Any

from nanobot.agent.tools.base import Tool
class EmailTool(Tool):
"""Read emails from configured IMAP mailbox."""
name = "read_emails"
description = "USE THIS TOOL FOR ALL EMAIL QUERIES. When user asks about emails, latest email, email sender, inbox, etc., you MUST call read_emails(). DO NOT use exec() with mail/tail/awk commands. DO NOT use read_file() on /var/mail or memory files. DO NOT try alternative methods. This is the ONLY way to read emails - it connects to IMAP and fetches real-time data. For 'latest email' queries, use limit=1. CRITICAL: When user asks for specific fields like 'From and Subject' or 'sender and subject', extract and return ONLY those fields from the tool output. Do NOT summarize or analyze the email body content unless the user specifically asks for it. If user asks 'give me the from and subject', respond with just: 'From: [email] Subject: [subject]'. Parameters: limit (1-50, default 10, use 1 for latest), unread_only (bool, default false), mark_seen (bool, default false). Returns formatted email list with sender, subject, date, and body."
def __init__(self, email_config: Any = None):
"""
Initialize email tool with email configuration.
Args:
email_config: Optional EmailConfig instance. If None, loads from config.
"""
self._email_config = email_config
@property
def config(self) -> Any:
"""Lazy load email config if not provided."""
if self._email_config is None:
from nanobot.config.loader import load_config
config = load_config()
self._email_config = config.channels.email
return self._email_config
def coerce_params(self, params: dict[str, Any]) -> dict[str, Any]:
"""Coerce parameters, handling common name mismatches."""
coerced = super().coerce_params(params)
# Map 'count' to 'limit' if limit not present
if 'count' in coerced and 'limit' not in coerced:
try:
coerced['limit'] = int(coerced.pop('count'))
except (ValueError, TypeError):
pass
# Remove unsupported parameters
supported = {'limit', 'unread_only', 'mark_seen'}
coerced = {k: v for k, v in coerced.items() if k in supported}
return coerced
@property
def parameters(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"limit": {
"type": "integer",
"description": "Number of emails to return. REQUIRED for 'latest email' queries - always use limit=1. For multiple emails use limit=5, limit=10, etc. (default: 10, max: 50)",
"minimum": 1,
"maximum": 50,
},
"unread_only": {
"type": "boolean",
"description": "If true, only return unread emails. If false, returns all emails including read ones (default: false)",
},
"mark_seen": {
"type": "boolean",
"description": "If true, mark emails as read after fetching. If false, leave read/unread status unchanged (default: false)",
},
},
"required": [],
}
async def execute(self, limit: int = 10, unread_only: bool = False, mark_seen: bool = False, **kwargs: Any) -> str:
"""
Read emails from IMAP mailbox.
Args:
limit: Maximum number of emails to return (use limit=1 for latest email)
unread_only: If true, only fetch unread emails
mark_seen: If true, mark emails as read after fetching
**kwargs: Ignore any extra parameters (like count, sort_by, direction)
Returns:
Formatted string with email information
"""
# Handle common parameter name mismatches (agent sometimes uses 'count' instead of 'limit')
# Also handle if count is passed as a positional argument via kwargs
if 'count' in kwargs:
try:
limit = int(kwargs['count'])
except (ValueError, TypeError):
pass
# Also check if limit was passed in kwargs (in case it wasn't a named parameter)
if 'limit' in kwargs:
try:
limit = int(kwargs['limit'])
except (ValueError, TypeError):
pass
# Ignore unsupported parameters like sort_by, direction, reverse, etc.
try:
config = self.config
except Exception as e:
return f"Error loading email configuration: {str(e)}"
if not config:
return "Error: Email configuration not found"
if not hasattr(config, 'enabled') or not config.enabled:
return "Error: Email channel is not enabled in configuration. Set NANOBOT_CHANNELS__EMAIL__ENABLED=true"
if not hasattr(config, 'consent_granted') or not config.consent_granted:
return "Error: Email access consent not granted. Set NANOBOT_CHANNELS__EMAIL__CONSENT_GRANTED=true"
if not hasattr(config, 'imap_host') or not config.imap_host:
return "Error: IMAP host not configured. Set NANOBOT_CHANNELS__EMAIL__IMAP_HOST"
if not hasattr(config, 'imap_username') or not config.imap_username:
return "Error: IMAP username not configured. Set NANOBOT_CHANNELS__EMAIL__IMAP_USERNAME"
if not hasattr(config, 'imap_password') or not config.imap_password:
return "Error: IMAP password not configured. Set NANOBOT_CHANNELS__EMAIL__IMAP_PASSWORD"
# Limit to reasonable maximum
try:
limit = min(max(1, int(limit)), 50)
except (ValueError, TypeError):
limit = 10
try:
messages = await asyncio.to_thread(
self._fetch_messages,
unread_only=unread_only,
mark_seen=mark_seen,
limit=limit,
)
if not messages:
if unread_only:
return "No unread emails found in your inbox."
else:
return f"No emails found in your inbox. The mailbox appears to be empty or there was an issue retrieving emails."
result_parts = [f"Found {len(messages)} email(s):\n"]
for i, msg in enumerate(messages, 1):
result_parts.append(f"\n--- Email {i} ---")
result_parts.append(f"From: {msg['sender']}")
result_parts.append(f"Subject: {msg['subject']}")
result_parts.append(f"Date: {msg['metadata']['date']}")
# Only include body content if specifically requested, otherwise keep it brief
result_parts.append(f"\nBody: {msg['content'][:500]}..." if len(msg['content']) > 500 else f"\nBody: {msg['content']}")
return "\n".join(result_parts)
except Exception as e:
import traceback
error_details = traceback.format_exc()
return f"Error reading emails: {str(e)}\n\nDetails: {error_details}"
def _fetch_messages(
self,
unread_only: bool,
mark_seen: bool,
limit: int,
) -> list[dict[str, Any]]:
"""Fetch messages from IMAP mailbox."""
messages: list[dict[str, Any]] = []
mailbox = self.config.imap_mailbox or "INBOX"
# Build search criteria
if unread_only:
search_criteria = ("UNSEEN",)
else:
search_criteria = ("ALL",)
# Connect to IMAP server
try:
if self.config.imap_use_ssl:
client = imaplib.IMAP4_SSL(self.config.imap_host, self.config.imap_port)
else:
client = imaplib.IMAP4(self.config.imap_host, self.config.imap_port)
except Exception as e:
raise Exception(f"Failed to connect to IMAP server {self.config.imap_host}:{self.config.imap_port}: {str(e)}")
try:
client.login(self.config.imap_username, self.config.imap_password.strip())
except imaplib.IMAP4.error as e:
error_msg = str(e)
if "AUTHENTICATE" in error_msg.upper() or "LOGIN" in error_msg.upper():
raise Exception(
f"IMAP authentication failed. Please check:\n"
f"1. Your email username: {self.config.imap_username}\n"
f"2. Your password/app password is correct\n"
f"3. For Gmail: Enable 2-Step Verification and create an App Password at https://myaccount.google.com/apppasswords\n"
f"4. IMAP is enabled in your email account settings\n"
f"Original error: {error_msg}"
)
raise
try:
status, _ = client.select(mailbox)
if status != "OK":
return messages
status, data = client.search(None, *search_criteria)
if status != "OK" or not data:
return messages
ids = data[0].split()
if limit > 0 and len(ids) > limit:
# Get most recent emails (last N)
ids = ids[-limit:]
for imap_id in ids:
status, fetched = client.fetch(imap_id, "(BODY.PEEK[] UID)")
if status != "OK" or not fetched:
continue
raw_bytes = self._extract_message_bytes(fetched)
if raw_bytes is None:
continue
parsed = BytesParser(policy=policy.default).parsebytes(raw_bytes)
sender = parseaddr(parsed.get("From", ""))[1].strip().lower()
if not sender:
# Try to get display name if email not found
from_addr = parsed.get("From", "")
sender = from_addr if from_addr else "unknown"
subject = self._decode_header_value(parsed.get("Subject", ""))
date_value = parsed.get("Date", "")
message_id = parsed.get("Message-ID", "").strip()
body = self._extract_text_body(parsed)
if not body:
body = "(empty email body)"
# Limit body length
max_chars = getattr(self.config, 'max_body_chars', 12000)
body = body[:max_chars]
content = (
f"Email received.\n"
f"From: {sender}\n"
f"Subject: {subject}\n"
f"Date: {date_value}\n\n"
f"{body}"
)
metadata = {
"message_id": message_id,
"subject": subject,
"date": date_value,
"sender_email": sender,
}
messages.append({
"sender": sender,
"subject": subject,
"message_id": message_id,
"content": content,
"metadata": metadata,
})
if mark_seen:
client.store(imap_id, "+FLAGS", "\\Seen")
finally:
try:
client.logout()
except Exception:
pass
return messages
@staticmethod
def _extract_message_bytes(fetched: list[Any]) -> bytes | None:
"""Extract raw message bytes from IMAP fetch response."""
for item in fetched:
if isinstance(item, tuple) and len(item) >= 2 and isinstance(item[1], (bytes, bytearray)):
return bytes(item[1])
return None
@staticmethod
def _decode_header_value(value: str) -> str:
"""Decode email header value (handles encoded words)."""
if not value:
return ""
try:
return str(make_header(decode_header(value)))
except Exception:
return value
@staticmethod
def _extract_text_body(msg: Any) -> str:
"""Extract readable text body from email message."""
if msg.is_multipart():
plain_parts: list[str] = []
html_parts: list[str] = []
for part in msg.walk():
if part.get_content_disposition() == "attachment":
continue
content_type = part.get_content_type()
try:
payload = part.get_content()
except Exception:
payload_bytes = part.get_payload(decode=True) or b""
charset = part.get_content_charset() or "utf-8"
payload = payload_bytes.decode(charset, errors="replace")
if not isinstance(payload, str):
continue
if content_type == "text/plain":
plain_parts.append(payload)
elif content_type == "text/html":
html_parts.append(payload)
if plain_parts:
return "\n\n".join(plain_parts).strip()
if html_parts:
# Simple HTML to text conversion
import re
import html
text = re.sub(r"<\s*br\s*/?>", "\n", "\n\n".join(html_parts), flags=re.IGNORECASE)
text = re.sub(r"<\s*/\s*p\s*>", "\n", text, flags=re.IGNORECASE)
text = re.sub(r"<[^>]+>", "", text)
return html.unescape(text).strip()
return ""
try:
payload = msg.get_content()
except Exception:
payload_bytes = msg.get_payload(decode=True) or b""
charset = msg.get_content_charset() or "utf-8"
payload = payload_bytes.decode(charset, errors="replace")
if not isinstance(payload, str):
return ""
if msg.get_content_type() == "text/html":
import re
import html
text = re.sub(r"<\s*br\s*/?>", "\n", payload, flags=re.IGNORECASE)
text = re.sub(r"<\s*/\s*p\s*>", "\n", text, flags=re.IGNORECASE)
text = re.sub(r"<[^>]+>", "", text)
return html.unescape(text).strip()
return payload.strip()