nanobot/nanobot/agent/tools/calendar.py
tanyar09 4f50cfac3c Add multi-bot Docker setup and improve MCP/tool reliability
Document and add multi-bot Docker workflows with env layering scripts, and update agent/tool configuration handling to make MCP/email/calendar behavior more robust for day-to-day operations.

Made-with: Cursor
2026-03-27 13:06:24 -04:00

1150 lines
52 KiB
Python

"""Calendar tool: interact with Google Calendar."""
import json
import os
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any
from google.auth.exceptions import RefreshError
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from loguru import logger
from nanobot.agent.tools.base import Tool
# Scopes required for Google Calendar API
SCOPES = ["https://www.googleapis.com/auth/calendar"]
# Appended to failures so the model does not invent meetings when the API/auth did not succeed.
_NO_CALENDAR_DATA_FOR_MODEL = (
"\n\n[NO_CALENDAR_DATA] Google Calendar was not read successfully. "
"You MUST NOT infer, guess, or invent meetings, times, locations, or titles. "
"Tell the user only that calendar could not be accessed and what they should fix "
"(e.g. credentials_file, OAuth / calendar_token.json)."
)
class CalendarTool(Tool):
"""Tool to interact with Google Calendar."""
name = "calendar"
description = (
"Interact with Google Calendar. REQUIRED: Always include 'action' parameter.\n\n"
"Actions:\n"
"- list_events: List upcoming events with details (title, time, ID). ALWAYS call this FIRST before update_event or delete_event.\n"
"- create_event: Create new event. Requires title and start_time.\n"
"- update_event: Update/reschedule an event. Requires event_id (get from list_events) and start_time (new time).\n"
"- delete_event: Delete a single event. Requires event_id (get from list_events).\n"
"- delete_events: Delete multiple events. Requires event_ids array (get from list_events).\n"
"- check_availability: Check if a time slot is available.\n\n"
"CRITICAL WORKFLOW for deletion (cancel/delete meetings):\n"
"DO NOT EXPLAIN - EXECUTE IMMEDIATELY:\n"
"1. IMMEDIATELY call calendar(action='list_events', time_min='today') - DO NOT explain, just execute\n"
"2. Extract ALL event IDs from the response (long strings after '[ID: ' or in 'Event IDs:' line)\n"
"3. IMMEDIATELY call calendar(action='delete_events', event_ids=[...]) with the extracted IDs\n"
"NEVER use placeholder values like 'ID1', '[get event ID...]', or 'list_events'.\n"
"NEVER explain what you will do - just execute the tools immediately.\n"
"When you call this tool, the system will execute it automatically. Do not show JSON in your response - just call the tool.\n\n"
"If the tool result contains [NO_CALENDAR_DATA] or starts with 'Error:' and describes auth/API failure, "
"you have zero reliable calendar information—report the problem only; never make up events."
)
def __init__(self, calendar_config: Any = None):
"""
Initialize calendar tool with calendar configuration.
Args:
calendar_config: Optional CalendarConfig instance. If None, loads from config.
"""
self._calendar_config = calendar_config
self._service = None
self._calendar_refresh_rejected = False
@property
def config(self) -> Any:
"""Lazy load calendar config if not provided."""
if self._calendar_config is None:
from nanobot.config.loader import load_config
config = load_config()
self._calendar_config = config.tools.calendar
return self._calendar_config
def _get_timezone(self):
"""Get configured timezone or default to UTC."""
import pytz
config = self.config
tz_str = getattr(config, 'timezone', 'UTC') or 'UTC'
try:
return pytz.timezone(tz_str)
except Exception:
return pytz.UTC
def _get_credentials(self) -> Credentials | None:
"""Get valid user credentials from storage or OAuth flow."""
self._calendar_refresh_rejected = False
config = self.config
if not config.enabled:
return None
if not config.credentials_file:
return None
creds_path = Path(config.credentials_file).expanduser()
if not creds_path.exists():
return None
# Determine token file path
if config.token_file:
token_path = Path(config.token_file).expanduser()
else:
token_path = Path.home() / ".nanobot" / "calendar_token.json"
token_path.parent.mkdir(parents=True, exist_ok=True)
creds = None
# Load existing token if available
if token_path.exists():
try:
creds = Credentials.from_authorized_user_file(str(token_path), SCOPES)
except Exception:
pass
# If there are no (valid) credentials available, let the user log in
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
try:
creds.refresh(Request())
except RefreshError as e:
self._calendar_refresh_rejected = True
logger.warning("Google Calendar OAuth refresh failed: {}", e)
creds = None
except Exception:
creds = None
if not creds:
try:
with open(creds_path, "r") as f:
client_config = json.load(f)
flow = InstalledAppFlow.from_client_config(client_config, SCOPES)
creds = flow.run_local_server(port=0)
except Exception:
return None
# Save the credentials for the next run
try:
with open(token_path, "w") as f:
f.write(creds.to_json())
except Exception:
pass
return creds
def _get_service(self):
"""Get Google Calendar service instance."""
if self._service is None:
creds = self._get_credentials()
if not creds:
return None
self._service = build("calendar", "v3", credentials=creds)
return self._service
def coerce_params(self, params: dict[str, Any]) -> dict[str, Any]:
"""Coerce parameters, handling common name mismatches."""
# Handle nested "parameters" key (some LLMs wrap params this way)
if "parameters" in params and isinstance(params["parameters"], dict):
# Extract nested parameters and merge with top-level
nested = params.pop("parameters")
params = {**params, **nested}
# Remove common non-parameter keys that LLMs sometimes include
params.pop("function", None)
params.pop("functionName", None)
params.pop("function_name", None)
params.pop("tz", None) # Not a valid parameter
coerced = super().coerce_params(params)
# Handle case where action is passed as a string argument instead of named parameter
# e.g., calendar("calendar") or calendar("list_events")
if "action" not in coerced and len(coerced) == 1:
# Check if there's a single string value that could be the action
for key, value in coerced.items():
if isinstance(value, str) and value in ["list_events", "create_event", "delete_event", "delete_events", "update_event", "check_availability", "calendar"]:
coerced["action"] = "list_events" if value == "calendar" else value
coerced.pop(key, None)
break
return coerced
@property
def parameters(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["list_events", "create_event", "delete_event", "delete_events", "update_event", "check_availability"],
"description": "Action to perform: list_events (list upcoming events with details), create_event (create a new event), delete_event (delete a single event by ID), delete_events (delete multiple events by IDs), update_event (update/reschedule an event - requires event_id), check_availability (check if time slot is available)",
},
"event_id": {
"type": "string",
"description": "Event ID (required for delete_event). Get event IDs by calling list_events first.",
},
"event_ids": {
"type": "array",
"items": {"type": "string"},
"description": "List of event IDs (required for delete_events). Get event IDs by calling list_events first.",
},
"title": {
"type": "string",
"description": "Event title/summary (required for create_event)",
},
"start_time": {
"type": "string",
"description": (
"Event start time. REQUIRED for create_event. "
"Use natural language formats like 'March 7 15:00', 'tomorrow 2pm', 'today at 5pm', "
"or relative formats like 'in 1 hour'. "
"DO NOT use ISO format - the tool will parse natural language and handle the current year automatically. "
"Examples: 'March 7 15:00', 'tomorrow at 2pm', 'today 18:00'"
),
},
"end_time": {
"type": "string",
"description": "Event end time in ISO format (YYYY-MM-DDTHH:MM:SS) or relative. If not provided, defaults to 1 hour after start_time.",
},
"description": {
"type": "string",
"description": "Event description/details",
},
"location": {
"type": "string",
"description": "Event location",
},
"attendees": {
"type": "array",
"items": {"type": "string"},
"description": "List of attendee email addresses",
},
"max_results": {
"type": "integer",
"description": "Maximum number of events to return (for list_events, default: 10)",
"default": 10,
},
"time_min": {
"type": "string",
"description": "Lower bound (exclusive) for an event's end time (ISO format) for list_events. Defaults to now.",
},
},
"required": ["action"],
}
def _parse_time(self, time_str: str) -> datetime:
"""Parse time string (ISO format or relative like 'tomorrow 2pm').
Returns timezone-aware datetime in the configured timezone (or UTC if not configured).
"""
import re
from datetime import timezone
import pytz
# Get configured timezone or default to UTC
config = self.config
tz_str = getattr(config, 'timezone', 'UTC') or 'UTC'
try:
tz = pytz.timezone(tz_str)
except Exception:
# Invalid timezone, fall back to UTC
tz = pytz.UTC
original_str = time_str
time_str = time_str.strip().lower()
# Try ISO format first, but validate year is reasonable
now = datetime.now(tz)
try:
dt = datetime.fromisoformat(time_str.replace("Z", "+00:00"))
# Ensure timezone-aware - if no timezone, assume configured timezone
if dt.tzinfo is None:
dt = tz.localize(dt)
# Validate year is current year or future (prevent scheduling in past years)
current_year = now.year
if dt.year < current_year:
# If year is in the past, assume user meant current year
dt = dt.replace(year=current_year)
# If still in the past, use next year
if dt < now:
dt = dt.replace(year=current_year + 1)
return dt
except ValueError:
pass
# Try parsing month names (e.g., "March 7 15:00", "March 7th at 3pm")
month_names = {
"january": 1, "jan": 1, "february": 2, "feb": 2, "march": 3, "mar": 3,
"april": 4, "apr": 4, "may": 5, "june": 6, "jun": 6, "july": 7, "jul": 7,
"august": 8, "aug": 8, "september": 9, "sep": 9, "sept": 9,
"october": 10, "oct": 10, "november": 11, "nov": 11, "december": 12, "dec": 12,
}
# Pattern: "March 7 15:00" or "March 7th at 3pm" or "on March 7 at 15:00"
month_patterns = [
r"(?:on\s+)?(january|february|march|april|may|june|july|august|september|october|november|december|jan|feb|mar|apr|may|jun|jul|aug|sep|sept|oct|nov|dec)\s+(\d{1,2})(?:st|nd|rd|th)?\s+(?:at\s+)?(\d{1,2}):(\d{2})", # 24-hour
r"(?:on\s+)?(january|february|march|april|may|june|july|august|september|october|november|december|jan|feb|mar|apr|may|jun|jul|aug|sep|sept|oct|nov|dec)\s+(\d{1,2})(?:st|nd|rd|th)?\s+(?:at\s+)?(\d{1,2})\s*(am|pm)", # 12-hour
r"(?:on\s+)?(january|february|march|april|may|june|july|august|september|october|november|december|jan|feb|mar|apr|may|jun|jul|aug|sep|sept|oct|nov|dec)\s+(\d{1,2})(?:st|nd|rd|th)?", # Date only
]
for pattern in month_patterns:
match = re.search(pattern, time_str, re.IGNORECASE)
if match:
groups = match.groups()
month_name = groups[0].lower()
if month_name in month_names:
month = month_names[month_name]
day = int(groups[1])
year = now.year
# Check if date is in the past
test_date = tz.localize(datetime(year, month, day))
if test_date < now.replace(hour=0, minute=0, second=0, microsecond=0):
year += 1
if len(groups) >= 4 and groups[2] and groups[3]:
if groups[3].lower() in ['am', 'pm']:
# 12-hour format
hour = int(groups[2])
period = groups[3].lower()
minute = 0
if period == "pm" and hour != 12:
hour += 12
elif period == "am" and hour == 12:
hour = 0
else:
# 24-hour format
hour = int(groups[2])
minute = int(groups[3])
result = tz.localize(datetime(year, month, day, hour, minute))
return result
else:
# Date only, default to 9am
result = tz.localize(datetime(year, month, day, 9, 0))
return result
# Parse relative times
now = datetime.now(tz)
if time_str.startswith("in "):
# "in 1 hour", "in 30 minutes"
parts = time_str[3:].split()
if len(parts) >= 2:
try:
amount = int(parts[0])
unit = parts[1]
if "hour" in unit:
return now + timedelta(hours=amount)
elif "minute" in unit:
return now + timedelta(minutes=amount)
elif "day" in unit:
return now + timedelta(days=amount)
except ValueError:
pass
# Handle "today" - same logic as "tomorrow" but use today's date
if "today" in time_str:
base = now.replace(hour=0, minute=0, second=0, microsecond=0)
# Try to extract time - same patterns as tomorrow
hour = None
minute = 0
# Pattern 1: "2:00pm" or "2:00 pm" (with minutes and am/pm)
match = re.search(r"(\d{1,2})\s*:\s*(\d{2})\s*(am|pm)", time_str, re.IGNORECASE)
if match:
hour = int(match.group(1))
minute = int(match.group(2))
period = match.group(3).lower()
if period == "pm" and hour != 12:
hour += 12
elif period == "am" and hour == 12:
hour = 0
result = base.replace(hour=hour, minute=minute, second=0, microsecond=0)
# If time has passed today, it's invalid - return error or use tomorrow?
if result < now:
# Time has passed, schedule for tomorrow instead
result = result + timedelta(days=1)
return result
# Pattern 2: "2pm" or "2 pm" (hour only with am/pm)
match = re.search(r"(\d{1,2})\s*(am|pm)\b", time_str, re.IGNORECASE)
if match:
hour = int(match.group(1))
period = match.group(2).lower()
if period == "pm" and hour != 12:
hour += 12
elif period == "am" and hour == 12:
hour = 0
result = base.replace(hour=hour, minute=0, second=0, microsecond=0)
# If time has passed today, schedule for tomorrow
if result < now:
result = result + timedelta(days=1)
return result
# Pattern 3: "17:00" or "today 17:00" (24-hour format with colon, no am/pm)
match = re.search(r"(\d{1,2})\s*:\s*(\d{2})(?!\s*(am|pm))", time_str, re.IGNORECASE)
if match:
hour = int(match.group(1))
minute = int(match.group(2))
# Validate hour is in 24-hour range
if 0 <= hour <= 23 and 0 <= minute <= 59:
result = base.replace(hour=hour, minute=minute, second=0, microsecond=0)
# If time has passed today, schedule for tomorrow
if result < now:
result = result + timedelta(days=1)
return result
# Pattern 4: "at 17:00" (24-hour format without am/pm, with "at")
match = re.search(r"at\s+(\d{1,2})\s*:\s*(\d{2})", time_str, re.IGNORECASE)
if match:
hour = int(match.group(1))
minute = int(match.group(2))
if 0 <= hour <= 23 and 0 <= minute <= 59:
result = base.replace(hour=hour, minute=minute, second=0, microsecond=0)
if result < now:
result = result + timedelta(days=1)
return result
# Pattern 5: "at 5" (hour only, assume 24-hour if > 12, else assume pm)
match = re.search(r"at\s+(\d{1,2})(?!\s*(am|pm))", time_str, re.IGNORECASE)
if match:
hour = int(match.group(1))
# If hour is 1-12, assume PM unless it's clearly morning context
if hour <= 12 and hour > 0:
hour += 12 # Default to PM for afternoon times
result = base.replace(hour=hour, minute=0, second=0, microsecond=0)
if result < now:
result = result + timedelta(days=1)
return result
# Default to 9am today (or tomorrow if 9am has passed)
result = base.replace(hour=9, minute=0, second=0, microsecond=0)
if result < now:
result = result + timedelta(days=1)
return result
if "tomorrow" in time_str:
base = now + timedelta(days=1)
# Try to extract time - improved regex to handle various formats
# Try patterns in order of specificity (most specific first)
hour = None
minute = 0
# Pattern 1: "2:00pm" or "2:00 pm" (with minutes and am/pm)
match = re.search(r"(\d{1,2})\s*:\s*(\d{2})\s*(am|pm)", time_str, re.IGNORECASE)
if match:
hour = int(match.group(1))
minute = int(match.group(2))
period = match.group(3).lower()
if period == "pm" and hour != 12:
hour += 12
elif period == "am" and hour == 12:
hour = 0
result = base.replace(hour=hour, minute=minute, second=0, microsecond=0)
return result
# Pattern 2: "2pm" or "2 pm" (hour only with am/pm) - must check this before 24-hour patterns
# to avoid matching "14" in "14:00" as "14pm"
match = re.search(r"(\d{1,2})\s*(am|pm)\b", time_str, re.IGNORECASE)
if match:
hour = int(match.group(1))
period = match.group(2).lower()
if period == "pm" and hour != 12:
hour += 12
elif period == "am" and hour == 12:
hour = 0
result = base.replace(hour=hour, minute=0, second=0, microsecond=0)
return result
# Pattern 3: "14:00" or "tomorrow 14:00" (24-hour format with colon, no am/pm)
# This handles cases where the agent converts "2pm" to "14:00"
match = re.search(r"(\d{1,2})\s*:\s*(\d{2})(?!\s*(am|pm))", time_str, re.IGNORECASE)
if match:
hour = int(match.group(1))
minute = int(match.group(2))
# Validate hour is in 24-hour range
if 0 <= hour <= 23 and 0 <= minute <= 59:
result = base.replace(hour=hour, minute=minute, second=0, microsecond=0)
return result
# Pattern 4: "at 2:00" (24-hour format without am/pm, with "at")
match = re.search(r"at\s+(\d{1,2})\s*:\s*(\d{2})", time_str, re.IGNORECASE)
if match:
hour = int(match.group(1))
minute = int(match.group(2))
if 0 <= hour <= 23 and 0 <= minute <= 59:
result = base.replace(hour=hour, minute=minute, second=0, microsecond=0)
return result
# Pattern 5: "at 2" (hour only, assume 24-hour if > 12, else assume pm)
match = re.search(r"at\s+(\d{1,2})(?!\s*(am|pm))", time_str, re.IGNORECASE)
if match:
hour = int(match.group(1))
# If hour is 1-12, assume PM unless it's clearly morning context
if hour <= 12 and hour > 0:
hour += 12 # Default to PM for afternoon times
result = base.replace(hour=hour, minute=0, second=0, microsecond=0)
return result
# Default to 9am if no time specified
result = base.replace(hour=9, minute=0, second=0, microsecond=0)
return result
# Default: assume it's today at the specified time or now
return now
def _parse_time_with_date(self, time_str: str, base_date: Any) -> datetime:
"""Parse a time string (like '4pm') in the context of a specific date.
This is used when updating events - if user says 'move to 4pm',
we want to use the original event's date, not 'today' relative to now.
"""
import re
from datetime import date
tz = self._get_timezone()
# Create a datetime for the base date at midnight in the configured timezone
base_dt = tz.localize(datetime.combine(base_date, datetime.min.time()))
time_str_lower = time_str.strip().lower()
# Pattern 1: "4pm" or "4 pm" (hour only with am/pm)
match = re.search(r"(\d{1,2})\s*(am|pm)\b", time_str_lower, re.IGNORECASE)
if match:
hour = int(match.group(1))
period = match.group(2).lower()
if period == "pm" and hour != 12:
hour += 12
elif period == "am" and hour == 12:
hour = 0
return base_dt.replace(hour=hour, minute=0, second=0, microsecond=0)
# Pattern 2: "4:00pm" or "4:00 pm" (with minutes and am/pm)
match = re.search(r"(\d{1,2})\s*:\s*(\d{2})\s*(am|pm)", time_str_lower, re.IGNORECASE)
if match:
hour = int(match.group(1))
minute = int(match.group(2))
period = match.group(3).lower()
if period == "pm" and hour != 12:
hour += 12
elif period == "am" and hour == 12:
hour = 0
return base_dt.replace(hour=hour, minute=minute, second=0, microsecond=0)
# Pattern 3: "16:00" (24-hour format)
match = re.search(r"(\d{1,2})\s*:\s*(\d{2})(?!\s*(am|pm))", time_str_lower, re.IGNORECASE)
if match:
hour = int(match.group(1))
minute = int(match.group(2))
if 0 <= hour <= 23 and 0 <= minute <= 59:
return base_dt.replace(hour=hour, minute=minute, second=0, microsecond=0)
# If no simple time pattern matches, fall back to full _parse_time
# but adjust the date to match base_date
parsed = self._parse_time(time_str)
if parsed.date() != base_date:
# Adjust to use the base_date, keeping the time
return parsed.replace(year=base_date.year, month=base_date.month, day=base_date.day)
return parsed
async def execute(
self,
action: str,
title: str | None = None,
start_time: str | None = None,
end_time: str | None = None,
description: str | None = None,
location: str | None = None,
attendees: list[str] | None = None,
max_results: int = 10,
time_min: str | None = None,
event_id: str | None = None,
event_ids: list[str] | None = None,
**kwargs: Any,
) -> str:
"""
Execute calendar operation.
Args:
action: Action to perform (list_events, create_event, check_availability)
title: Event title (for create_event)
start_time: Start time in ISO format or relative (for create_event)
end_time: End time in ISO format or relative (for create_event)
description: Event description
location: Event location
attendees: List of attendee email addresses
max_results: Maximum number of events to return (for list_events)
time_min: Lower bound for event end time (for list_events)
**kwargs: Ignore extra parameters
Returns:
Result string
"""
config = self.config
if not config.enabled:
return (
"Error: Calendar is not enabled. Set NANOBOT_TOOLS__CALENDAR__ENABLED=true"
+ _NO_CALENDAR_DATA_FOR_MODEL
)
service = self._get_service()
if not service:
msg = (
"Error: Could not authenticate with Google Calendar. "
"Please ensure credentials_file is configured and valid. "
"You may need to run OAuth flow once to authorize access."
)
if self._calendar_refresh_rejected:
msg += (
" Google rejected the saved refresh token (expired or revoked). "
"Re-authorize on a machine with a browser and replace ~/.nanobot/calendar_token.json on the host, "
"then restart the container. If the token file is mounted :ro, refreshes cannot be persisted—"
"use rw or re-auth on the host when that happens."
)
return msg + _NO_CALENDAR_DATA_FOR_MODEL
calendar_id = config.calendar_id or "primary"
try:
if action == "list_events":
return await self._list_events(service, calendar_id, max_results, time_min)
elif action == "create_event":
if not title:
return "Error: title is required for create_event"
if not start_time:
return "Error: start_time is required for create_event"
return await self._create_event(
service,
calendar_id,
title,
start_time,
end_time,
description,
location,
attendees,
)
elif action == "update_event":
if not event_id:
return (
"ERROR: event_id is required for update_event. "
"YOU MUST call calendar(action='list_events', time_min='today') FIRST to get the actual event ID. "
"Do NOT use placeholder values."
)
# Validate event_id is not a placeholder
if any(placeholder in event_id.lower() for placeholder in [
"get event", "from calendar", "list_events", "<id", "placeholder",
"[get", "event id", "id of", "id from", "[id from", "previous call"
]) or len(event_id) < 15:
return (
f"STOP: Invalid event_id '{event_id}' - this is a placeholder, not a real ID. "
"You MUST call calendar(action='list_events', time_min='today') NOW to get the actual event ID. "
"Do NOT explain - just call list_events immediately, then extract the ID from the response and call update_event again."
)
# Check if user wants to cancel (via status parameter in kwargs)
status = kwargs.get("status", "").lower() if kwargs else ""
if status == "cancelled":
# To cancel a meeting, we delete it
return await self._delete_event(service, calendar_id, event_id)
# For update_event, start_time and end_time are the new times
return await self._update_event(
service,
calendar_id,
event_id,
title,
start_time, # New start time
end_time, # New end time
description,
location,
attendees,
)
elif action == "delete_event":
if not event_id:
return (
"STOP: event_id is required for delete_event. "
"DO NOT EXPLAIN. IMMEDIATELY call calendar(action='list_events', time_min='today') NOW. "
"Then extract event IDs from the response and call delete_events (plural) with those IDs."
)
# Validate event_id is not a placeholder
if any(placeholder in event_id.lower() for placeholder in [
"get event", "from calendar", "list_events", "<id", "placeholder",
"[get", "event id", "id of", "id from", "[id from", "previous call", "list events call"
]) or len(event_id) < 15:
return (
f"STOP: Invalid event_id '{event_id}' - this is a placeholder, not a real ID. "
f"DO NOT EXPLAIN. IMMEDIATELY call calendar(action='list_events', time_min='today') NOW. "
f"Then extract event IDs from the response and call delete_events (plural) with those IDs. "
f"Event IDs are long alphanumeric strings (20+ characters, no spaces) from the list_events response."
)
return await self._delete_event(service, calendar_id, event_id)
elif action == "delete_events":
if not event_ids:
return (
"STOP: event_ids (array) is required for delete_events. "
"DO NOT EXPLAIN. IMMEDIATELY call calendar(action='list_events', time_min='today') NOW. "
"Then extract ALL event IDs from the response (they appear after '[ID: ' or in the 'Event IDs:' line) "
"and call delete_events with those IDs."
)
return await self._delete_events(service, calendar_id, event_ids)
elif action == "check_availability":
if not start_time:
return "Error: start_time is required for check_availability"
return await self._check_availability(service, calendar_id, start_time, end_time)
else:
return f"Error: Unknown action '{action}'. Use 'list_events', 'create_event', 'delete_event', 'delete_events', 'update_event', or 'check_availability'"
except HttpError as e:
return f"Error accessing Google Calendar API: {e}" + _NO_CALENDAR_DATA_FOR_MODEL
except Exception as e:
return f"Error: {str(e)}" + _NO_CALENDAR_DATA_FOR_MODEL
async def _list_events(
self, service: Any, calendar_id: str, max_results: int, time_min: str | None
) -> str:
"""List upcoming events."""
import asyncio
def _list():
now = datetime.utcnow().isoformat() + "Z"
# Handle "now", "today", or None - convert to ISO format
if not time_min or time_min.lower() in ["now", "today"]:
time_min_str = now
elif time_min.endswith("Z") or "+" in time_min or "-" in time_min[-6:]:
# Already in ISO format
time_min_str = time_min
else:
# Try to parse as natural language and convert to ISO
try:
parsed_dt = self._parse_time(time_min)
time_min_str = parsed_dt.isoformat()
except Exception:
# If parsing fails, use current time
time_min_str = now
events_result = (
service.events()
.list(
calendarId=calendar_id,
timeMin=time_min_str,
maxResults=max_results,
singleEvents=True,
orderBy="startTime",
# Request additional fields to get attendees and attachments
fields="items(id,summary,start,end,description,location,attendees(email,responseStatus),attachments(fileUrl,title))",
)
.execute()
)
events = events_result.get("items", [])
if not events:
return "No upcoming events found."
result = [f"Found {len(events)} upcoming event(s):\n"]
event_ids = []
for idx, event in enumerate(events, 1):
start = event["start"].get("dateTime", event["start"].get("date"))
end = event["end"].get("dateTime", event["end"].get("date"))
title = event.get("summary", "No title")
event_id = event.get("id", "")
event_ids.append(event_id)
# Extract additional information
description = event.get("description", "")
location = event.get("location", "")
attendees = event.get("attendees", [])
attachments = event.get("attachments", [])
# Add position indicator - "last" means the one with the latest time (usually the last in list)
position_note = ""
if idx == len(events):
position_note = " (LAST - latest time)"
elif idx == 1:
position_note = " (FIRST - earliest time)"
# Format: include numbered list with position indicators
result.append(f"{idx}. {title} ({start} - {end}) [ID: {event_id}]{position_note}")
# Add location if present
if location:
result.append(f" Location: {location}")
# Add attendees if present
if attendees:
attendee_emails = [att.get("email", "") for att in attendees if att.get("email")]
if attendee_emails:
result.append(f" Attendees: {', '.join(attendee_emails)}")
# Add attachments if present
if attachments:
attachment_info = []
for att in attachments:
file_name = att.get("fileUrl", "").split("/")[-1] if att.get("fileUrl") else att.get("title", "Unknown file")
attachment_info.append(file_name)
if attachment_info:
result.append(f" Attachments: {', '.join(attachment_info)}")
# Add description if present (truncate if too long)
if description:
desc_preview = description[:100] + "..." if len(description) > 100 else description
result.append(f" Description: {desc_preview}")
# Also include a summary line with all IDs for easy extraction
result.append(f"\nEvent IDs: {', '.join(event_ids)}")
result.append(f"\nNote: 'last meeting' means the meeting with the latest time (usually #{len(events)} in the list above).")
return "\n".join(result)
return await asyncio.to_thread(_list)
async def _create_event(
self,
service: Any,
calendar_id: str,
title: str,
start_time: str,
end_time: str | None,
description: str | None,
location: str | None,
attendees: list[str] | None,
) -> str:
"""Create a new calendar event."""
import asyncio
def _create():
start_dt = self._parse_time(start_time)
if end_time:
end_dt = self._parse_time(end_time)
else:
end_dt = start_dt + timedelta(hours=1) # Default 1 hour
# Get configured timezone
config = self.config
tz_str = getattr(config, 'timezone', 'UTC') or 'UTC'
# Validate that start time is not in the past
tz = self._get_timezone()
now = datetime.now(tz)
if start_dt < now:
raise ValueError(
f"Cannot schedule event in the past. Start time ({start_dt}) is before current time ({now}). "
f"Parsed from start_time='{start_time}'. "
f"Tip: Use natural language formats like 'March 7 15:00' or 'tomorrow 2pm' instead of ISO format."
)
# Validate time range
if end_dt <= start_dt:
raise ValueError(
f"Invalid time range: end time ({end_dt}) must be after start time ({start_dt}). "
f"Parsed from start_time='{start_time}', end_time='{end_time}'"
)
event = {
"summary": title,
"start": {
"dateTime": start_dt.isoformat(),
"timeZone": tz_str,
},
"end": {
"dateTime": end_dt.isoformat(),
"timeZone": tz_str,
},
}
if description:
event["description"] = description
if location:
event["location"] = location
if attendees:
event["attendees"] = [{"email": email} for email in attendees]
try:
created_event = service.events().insert(calendarId=calendar_id, body=event).execute()
return f"Event created: {created_event.get('htmlLink')}"
except HttpError as e:
error_details = e.error_details if hasattr(e, 'error_details') else str(e)
error_msg = str(e)
# Provide more detailed error information
return (
f"Error creating calendar event: {error_msg}. "
f"Details: {error_details}. "
f"Parsed start_time='{start_time}' -> {start_dt.isoformat()}, "
f"end_time='{end_time}' -> {end_dt.isoformat()}"
)
except Exception as e:
return (
f"Error creating calendar event: {str(e)}. "
f"Parsed start_time='{start_time}' -> {start_dt.isoformat()}, "
f"end_time='{end_time}' -> {end_dt.isoformat()}"
)
return await asyncio.to_thread(_create)
async def _check_availability(
self, service: Any, calendar_id: str, start_time: str, end_time: str | None
) -> str:
"""Check if a time slot is available."""
import asyncio
def _check():
start_dt = self._parse_time(start_time)
if end_time:
end_dt = self._parse_time(end_time)
else:
end_dt = start_dt + timedelta(hours=1)
time_min_str = start_dt.isoformat() + "Z"
time_max_str = end_dt.isoformat() + "Z"
events_result = (
service.events()
.list(
calendarId=calendar_id,
timeMin=time_min_str,
timeMax=time_max_str,
singleEvents=True,
orderBy="startTime",
)
.execute()
)
events = events_result.get("items", [])
if events:
conflicts = [e.get("summary", "Untitled") for e in events]
return f"Time slot is NOT available. Conflicts with: {', '.join(conflicts)}"
else:
return f"Time slot is available ({start_dt} to {end_dt})"
return await asyncio.to_thread(_check)
async def _delete_event(
self, service: Any, calendar_id: str, event_id: str
) -> str:
"""Delete a calendar event by ID."""
import asyncio
def _delete():
try:
service.events().delete(calendarId=calendar_id, eventId=event_id).execute()
return f"Event deleted successfully (ID: {event_id})"
except HttpError as e:
if e.resp.status == 404:
return f"Error: Event not found (ID: {event_id}). It may have already been deleted."
return f"Error deleting event: {e}"
except Exception as e:
return f"Error deleting event: {str(e)}"
return await asyncio.to_thread(_delete)
async def _delete_events(
self, service: Any, calendar_id: str, event_ids: list[str]
) -> str:
"""Delete multiple calendar events by IDs."""
import asyncio
def _delete_all():
# Filter out placeholder/invalid IDs
valid_ids = []
invalid_ids = []
for event_id in event_ids:
if not isinstance(event_id, str):
invalid_ids.append(str(event_id))
continue
# Check for common invalid patterns:
# 1. Placeholders like "ID1", "ID2", etc.
# 2. Error messages or instructions (contain common words)
# 3. Too short (real Google Calendar IDs are typically 20+ characters)
# 4. Contains spaces (real IDs don't have spaces)
invalid_patterns = [
event_id.upper() in ["ID1", "ID2", "ID3", "ID4", "ID5"],
len(event_id) < 15, # Real Google Calendar IDs are longer
" " in event_id, # Real IDs don't have spaces
any(word in event_id.lower() for word in [
"get", "event", "id", "by", "calling", "list", "first",
"extract", "from", "response", "error", "invalid"
]), # Likely an instruction/error message
]
if any(invalid_patterns):
invalid_ids.append(event_id)
else:
valid_ids.append(event_id)
if invalid_ids:
return (
f"ERROR: Invalid event IDs detected: {invalid_ids}. "
f"STOP and call calendar(action='list_events', time_min='today') FIRST to get actual event IDs. "
f"Do NOT call delete_events again until you have called list_events and extracted the real IDs. "
f"Event IDs are long alphanumeric strings (20+ characters, no spaces) from the list_events response."
)
if not valid_ids:
return (
"ERROR: No valid event IDs provided. "
"YOU MUST call calendar(action='list_events', time_min='today') FIRST. "
"Do NOT call delete_events again. Call list_events now, extract the IDs from the response "
"(they appear after '[ID: ' or in the 'Event IDs:' line), then call delete_events with those IDs."
)
deleted = []
failed = []
for event_id in valid_ids:
try:
service.events().delete(calendarId=calendar_id, eventId=event_id).execute()
deleted.append(event_id)
except HttpError as e:
if e.resp.status == 404:
# Already deleted, count as success
deleted.append(event_id)
else:
failed.append((event_id, str(e)))
except Exception as e:
failed.append((event_id, str(e)))
result_parts = []
if deleted:
result_parts.append(f"Successfully deleted {len(deleted)} event(s).")
if failed:
result_parts.append(f"Failed to delete {len(failed)} event(s): {failed}")
return " ".join(result_parts) if result_parts else "No events to delete."
return await asyncio.to_thread(_delete_all)
async def _update_event(
self,
service: Any,
calendar_id: str,
event_id: str,
title: str | None = None,
start_time: str | None = None,
end_time: str | None = None,
description: str | None = None,
location: str | None = None,
attendees: list[str] | None = None,
) -> str:
"""Update/reschedule a calendar event."""
import asyncio
def _update():
try:
# Get existing event
event = service.events().get(calendarId=calendar_id, eventId=event_id).execute()
# Save ORIGINAL start/end times BEFORE we modify them (needed for duration calculation)
original_start_raw = event["start"].get("dateTime", event["start"].get("date"))
original_end_raw = event["end"].get("dateTime", event["end"].get("date"))
# Get original event date for context
try:
if "T" in original_start_raw:
original_start_dt = datetime.fromisoformat(original_start_raw.replace("Z", "+00:00"))
original_date = original_start_dt.date()
old_start_dt = original_start_dt
else:
original_date = datetime.fromisoformat(original_start_raw).date()
tz = self._get_timezone()
old_start_dt = tz.localize(datetime.combine(original_date, datetime.min.time()))
except Exception:
# Fallback to today if parsing fails
tz = self._get_timezone()
original_date = datetime.now(tz).date()
old_start_dt = datetime.now(tz)
try:
if "T" in original_end_raw:
old_end_dt = datetime.fromisoformat(original_end_raw.replace("Z", "+00:00"))
else:
tz = self._get_timezone()
old_end_dt = tz.localize(datetime.combine(original_date, datetime.min.time()))
except Exception:
old_end_dt = old_start_dt + timedelta(hours=1) # Default 1 hour duration
# Update fields if provided
if title:
event["summary"] = title
if start_time:
# Parse start_time in context of the original event's date
# If it's just a time like "4pm", use the original event's date
start_dt = self._parse_time_with_date(start_time, original_date)
config = self.config
tz_str = getattr(config, 'timezone', 'UTC') or 'UTC'
event["start"] = {
"dateTime": start_dt.isoformat(),
"timeZone": tz_str,
}
if end_time:
# Parse end_time in context of the original event's date (or new start date if start_time was provided)
base_date = original_date
if start_time and "start" in event:
try:
new_start_raw = event["start"].get("dateTime", "")
if "T" in new_start_raw:
base_date = datetime.fromisoformat(new_start_raw.replace("Z", "+00:00")).date()
except Exception:
pass
end_dt = self._parse_time_with_date(end_time, base_date)
config = self.config
tz_str = getattr(config, 'timezone', 'UTC') or 'UTC'
event["end"] = {
"dateTime": end_dt.isoformat(),
"timeZone": tz_str,
}
elif start_time and "start" in event:
# If start_time changed but end_time not provided, adjust end_time to maintain duration
# Use the ORIGINAL start/end times we saved above
duration = old_end_dt - old_start_dt
# Get the NEW start time (already updated above)
new_start_dt = datetime.fromisoformat(event["start"].get("dateTime", "").replace("Z", "+00:00"))
new_end_dt = new_start_dt + duration
config = self.config
tz_str = getattr(config, 'timezone', 'UTC') or 'UTC'
event["end"] = {
"dateTime": new_end_dt.isoformat(),
"timeZone": tz_str,
}
if description is not None:
event["description"] = description
if location is not None:
event["location"] = location
if attendees is not None:
event["attendees"] = [{"email": email} for email in attendees]
# Update the event
updated_event = service.events().update(
calendarId=calendar_id,
eventId=event_id,
body=event
).execute()
return f"Event updated successfully: {updated_event.get('htmlLink')}"
except HttpError as e:
if e.resp.status == 404:
return f"Error: Event not found (ID: {event_id}). It may have been deleted."
return f"Error updating event: {e}"
except Exception as e:
return f"Error updating event: {str(e)}"
return await asyncio.to_thread(_update)