nanobot/nanobot/agent/tools/calendar.py
tanyar09 bc5f169bc8 Fix calendar tool execution: add calendar to CustomProvider valid_tools list
- Added calendar and other missing tools to valid_tools whitelist in CustomProvider
- This fixes issue where calendar tool calls were shown in response instead of being executed
- Also added edit_file, cron, email to the whitelist for completeness
2026-03-05 16:52:34 -05:00

714 lines
30 KiB
Python

"""Calendar tool: interact with Google Calendar."""
import json
import os
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from nanobot.agent.tools.base import Tool
# Scopes required for Google Calendar API.
# The same list is passed both when loading a stored token and when running
# the OAuth flow (see _get_credentials).
# NOTE(review): this looks like the broad read/write calendar scope, which the
# tool needs because create_event inserts events - confirm against Google's
# published scope list before narrowing it.
SCOPES = ["https://www.googleapis.com/auth/calendar"]
class CalendarTool(Tool):
"""Tool to interact with Google Calendar."""
# Tool name.
# NOTE(review): presumably the identifier the agent/provider uses to route tool
# calls to this class - confirm against the Tool base class.
name = "calendar"
# Usage guidance for the tool, written as instructions to the caller: which
# action to pick for which request, and which time formats to pass.
# The three actions named here match the "enum" in `parameters` and the
# dispatch in `execute`; keep all three in sync when adding an action.
description = (
"Interact with Google Calendar. REQUIRED: Always include 'action' parameter. "
"Actions: 'list_events' (list upcoming events), 'create_event' (create new event), "
"'check_availability' (check if time slot is available). "
"When user asks 'what's on my calendar' or 'show my calendar', use action='list_events'. "
"When user mentions a meeting or asks to schedule something, use action='create_event' with title and start_time. "
"CRITICAL: When scheduling from emails, you MUST call this tool - do not just acknowledge. "
"Use natural language time formats like 'March 6 19:00' or 'tomorrow 2pm' - DO NOT use ISO format with wrong years."
)
def __init__(self, calendar_config: Any = None):
    """
    Set up the tool without touching the network or the config loader.

    Args:
        calendar_config: Optional calendar configuration object. When left as
            None, the ``config`` property loads it on first access.
    """
    # The Google API client is created on demand by _get_service().
    self._service = None
    self._calendar_config = calendar_config
@property
def config(self) -> Any:
    """Return the calendar configuration, loading it on first use if needed."""
    if self._calendar_config is not None:
        return self._calendar_config

    # Imported here rather than at module level, as in the original code.
    from nanobot.config.loader import load_config

    self._calendar_config = load_config().tools.calendar
    return self._calendar_config
def _get_credentials(self) -> Credentials | None:
    """Return usable Google OAuth credentials, or None when unavailable.

    Resolution order:
    1. Load the stored token (``config.token_file`` or
       ``~/.nanobot/calendar_token.json``) and use it if it is valid.
    2. If the stored token is expired and has a refresh token, refresh it.
    3. Otherwise run the installed-app OAuth flow using the client secrets
       in ``config.credentials_file``.

    Newly obtained or refreshed credentials are written back to the token
    file on a best-effort basis.

    Returns:
        Credentials ready for use, or None when the calendar is disabled, the
        client-secrets file is missing, or authorization fails.
    """
    config = self.config
    if not config.enabled or not config.credentials_file:
        return None

    creds_path = Path(config.credentials_file).expanduser()
    if not creds_path.exists():
        return None

    if config.token_file:
        token_path = Path(config.token_file).expanduser()
    else:
        token_path = Path.home() / ".nanobot" / "calendar_token.json"

    creds = None
    if token_path.exists():
        try:
            creds = Credentials.from_authorized_user_file(str(token_path), SCOPES)
        except Exception:
            # A corrupt or incompatible token file is not fatal: fall through
            # to a fresh authorization instead.
            creds = None

    if creds and creds.valid:
        return creds

    if creds and creds.expired and creds.refresh_token:
        try:
            creds.refresh(Request())
        except Exception:
            creds = None

    # Re-check validity, not just presence: stored credentials that are
    # invalid but could not be refreshed (no refresh token, or not flagged
    # as expired) must not be returned as if they were usable.
    if not creds or not creds.valid:
        try:
            with open(creds_path, "r", encoding="utf-8") as f:
                client_config = json.load(f)
            flow = InstalledAppFlow.from_client_config(client_config, SCOPES)
            creds = flow.run_local_server(port=0)
        except Exception:
            return None

    # Persist for the next run. Best effort only: failing to cache the token
    # must not prevent the current call from using the credentials.
    try:
        # Create the directory here so a configured token_file in a
        # not-yet-existing directory is handled too.
        token_path.parent.mkdir(parents=True, exist_ok=True)
        with open(token_path, "w", encoding="utf-8") as f:
            f.write(creds.to_json())
    except Exception:
        pass

    return creds
def _get_service(self):
    """Return the cached Calendar API client, building it on first use.

    Returns None (and caches nothing) when no credentials can be obtained.
    """
    if self._service is not None:
        return self._service

    credentials = self._get_credentials()
    if not credentials:
        return None

    self._service = build("calendar", "v3", credentials=credentials)
    return self._service
def coerce_params(self, params: dict[str, Any]) -> dict[str, Any]:
    """Coerce parameters, recovering an action passed without its name.

    After the base-class coercion, a call that carries exactly one argument
    and no "action" key (e.g. calendar("list_events") or calendar("calendar"))
    is rewritten so that the lone value becomes the action. The literal
    "calendar" is treated as a request to list events.
    """
    result = super().coerce_params(params)
    if "action" in result or len(result) != 1:
        return result

    recognised = ["list_events", "create_event", "check_availability", "calendar"]
    # Exactly one entry is present at this point.
    ((stray_key, stray_value),) = result.items()
    if isinstance(stray_value, str) and stray_value in recognised:
        if stray_value == "calendar":
            result["action"] = "list_events"
        else:
            result["action"] = stray_value
        result.pop(stray_key, None)
    return result
@property
def parameters(self) -> dict[str, Any]:
    """Describe the arguments accepted by ``execute`` as a JSON-schema object.

    Only "action" is required by the schema; the per-action requirements
    (title/start_time) are enforced in ``execute``.
    """
    actions = ["list_events", "create_event", "check_availability"]

    def text_field(description: str) -> dict[str, Any]:
        # Plain string argument that carries nothing but a description.
        return {"type": "string", "description": description}

    start_time_help = "".join(
        [
            "Event start time. REQUIRED for create_event. ",
            "Use natural language formats like 'March 7 15:00', 'tomorrow 2pm', 'today at 5pm', ",
            "or relative formats like 'in 1 hour'. ",
            "DO NOT use ISO format - the tool will parse natural language and handle the current year automatically. ",
            "Examples: 'March 7 15:00', 'tomorrow at 2pm', 'today 18:00'",
        ]
    )

    # Insertion order is kept the same as the original literal.
    properties: dict[str, Any] = {}
    properties["action"] = {
        "type": "string",
        "enum": actions,
        "description": "Action to perform: list_events (list upcoming events), create_event (create a new event), check_availability (check if time slot is available)",
    }
    properties["title"] = text_field("Event title/summary (required for create_event)")
    properties["start_time"] = text_field(start_time_help)
    properties["end_time"] = text_field(
        "Event end time in ISO format (YYYY-MM-DDTHH:MM:SS) or relative. If not provided, defaults to 1 hour after start_time."
    )
    properties["description"] = text_field("Event description/details")
    properties["location"] = text_field("Event location")
    properties["attendees"] = {
        "type": "array",
        "items": {"type": "string"},
        "description": "List of attendee email addresses",
    }
    properties["max_results"] = {
        "type": "integer",
        "description": "Maximum number of events to return (for list_events, default: 10)",
        "default": 10,
    }
    properties["time_min"] = text_field(
        "Lower bound (exclusive) for an event's end time (ISO format) for list_events. Defaults to now."
    )

    return {"type": "object", "properties": properties, "required": ["action"]}
def _parse_time(self, time_str: str) -> datetime:
    """Parse an ISO-8601 or natural-language time expression.

    Formats are tried in this order:

    1. ISO-8601 (``2026-03-07T15:00:00``, optional ``Z`` or UTC offset).
       Naive values are taken as UTC. A year earlier than the current one is
       moved to the current year, or to next year if that is still in the past.
    2. Month-name dates (``March 7 15:00``, ``Mar 7th at 3:30pm``, ``March 7``).
       The current year is used unless that date has already passed, in which
       case next year is used. An explicit year in the text is ignored.
    3. Offsets from now (``in 30 minutes``, ``in 2 hours``, ``in 3 days``,
       ``in 1 week``).
    4. ``today ...`` - rolls over to tomorrow when the time has already passed.
    5. ``tomorrow ...``.

    Where a date is recognised but no clock time is given, 09:00 is used.
    A bare ``at N`` with N in 1-11 is read as afternoon (N + 12).
    Anything unrecognised falls back to the current time.

    Args:
        time_str: The time expression to parse.

    Returns:
        Timezone-aware datetime in UTC.

    Raises:
        ValueError: If a recognised date or clock time is out of range
            (for example ``March 45`` or ``today 25:00``).
    """
    import re
    from datetime import timezone

    utc = timezone.utc
    # Captured once, before any branch reads it, so that every comparison in
    # this call is made against the same instant.
    now = datetime.now(utc)
    raw = time_str.strip()
    text = raw.lower()

    def with_year(value: datetime, year: int) -> datetime:
        # Feb 29 does not exist in every year; fall back to Feb 28 instead of
        # letting the ValueError derail the whole parse.
        try:
            return value.replace(year=year)
        except ValueError:
            return value.replace(year=year, day=28)

    def to_24h(hour: int, period: str) -> int:
        if period == "pm" and hour != 12:
            return hour + 12
        if period == "am" and hour == 12:
            return 0
        return hour

    def clock_time(fragment: str):
        """Return (hour, minute) found in *fragment*, or None if there is none."""
        hour = minute = None

        # Most specific first: "2:30pm" / "2:30 pm".
        found = re.search(r"(\d{1,2})\s*:\s*(\d{2})\s*(am|pm)", fragment)
        if found:
            hour = to_24h(int(found.group(1)), found.group(3))
            minute = int(found.group(2))

        # "2pm" / "2 pm".
        if hour is None:
            found = re.search(r"(\d{1,2})\s*(am|pm)\b", fragment)
            if found:
                hour, minute = to_24h(int(found.group(1)), found.group(2)), 0

        # 24-hour "14:00".
        if hour is None:
            found = re.search(r"(\d{1,2})\s*:\s*(\d{2})", fragment)
            if found:
                hour, minute = int(found.group(1)), int(found.group(2))

        # Bare "at 5": assume afternoon for 1-11; 12 stays noon (adding 12
        # would produce the invalid hour 24).
        if hour is None:
            found = re.search(r"\bat\s+(\d{1,2})\b", fragment)
            if found:
                hour, minute = int(found.group(1)), 0
                if 1 <= hour <= 11:
                    hour += 12

        if hour is None:
            return None
        if hour > 23 or minute > 59:
            # An explicit but impossible time must not silently become 09:00.
            raise ValueError(f"Invalid time of day in '{time_str}'")
        return hour, minute

    # --- 1. ISO-8601 -----------------------------------------------------
    # Parse the original-case text: the "Z" suffix has to be handled before
    # lower-casing, and older fromisoformat() implementations reject it.
    iso_text = raw[:-1] + "+00:00" if raw[-1:] in ("Z", "z") else raw
    try:
        parsed = datetime.fromisoformat(iso_text)
    except ValueError:
        parsed = None
    if parsed is not None:
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=utc)
        else:
            parsed = parsed.astimezone(utc)
        if parsed.year < now.year:
            # Callers sometimes send a stale year; assume the current one,
            # or next year if that moment has already gone by.
            parsed = with_year(parsed, now.year)
            if parsed < now:
                parsed = with_year(parsed, now.year + 1)
        return parsed

    # --- 2. Month-name dates ----------------------------------------------
    month_numbers = {
        "january": 1, "jan": 1, "february": 2, "feb": 2, "march": 3, "mar": 3,
        "april": 4, "apr": 4, "may": 5, "june": 6, "jun": 6, "july": 7, "jul": 7,
        "august": 8, "aug": 8, "september": 9, "sep": 9, "sept": 9,
        "october": 10, "oct": 10, "november": 11, "nov": 11, "december": 12, "dec": 12,
    }
    # Longest names first so "september" wins over "sept" over "sep".
    month_alternatives = "|".join(sorted(month_numbers, key=len, reverse=True))
    date_match = re.search(
        rf"\b({month_alternatives})\s+(\d{{1,2}})(?:st|nd|rd|th)?\b", text
    )
    if date_match:
        month = month_numbers[date_match.group(1)]
        day = int(date_match.group(2))
        # Look for the clock time in what is left once the date is removed,
        # so the day number cannot be mistaken for an hour.
        remainder = text[: date_match.start()] + " " + text[date_match.end():]
        hour, minute = clock_time(remainder) or (9, 0)
        year = now.year
        start_of_today = now.replace(hour=0, minute=0, second=0, microsecond=0)
        if datetime(year, month, day, tzinfo=utc) < start_of_today:
            year += 1
        return datetime(year, month, day, hour, minute, tzinfo=utc)

    # --- 3. "in N <unit>" -------------------------------------------------
    if text.startswith("in "):
        parts = text[3:].split()
        if len(parts) >= 2:
            try:
                amount = int(parts[0])
            except ValueError:
                amount = None
            if amount is not None:
                unit = parts[1]
                if "hour" in unit or unit.startswith("hr"):
                    return now + timedelta(hours=amount)
                if "min" in unit:
                    return now + timedelta(minutes=amount)
                if "day" in unit:
                    return now + timedelta(days=amount)
                if "week" in unit:
                    return now + timedelta(weeks=amount)

    # --- 4. "today ..." ---------------------------------------------------
    if "today" in text:
        hour, minute = clock_time(text) or (9, 0)
        result = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
        if result < now:
            # That time has already passed today; use the next occurrence.
            result += timedelta(days=1)
        return result

    # --- 5. "tomorrow ..." ------------------------------------------------
    if "tomorrow" in text:
        hour, minute = clock_time(text) or (9, 0)
        return (now + timedelta(days=1)).replace(
            hour=hour, minute=minute, second=0, microsecond=0
        )

    # Unrecognised input: fall back to the current time.
    return now
async def execute(
self,
action: str,
title: str | None = None,
start_time: str | None = None,
end_time: str | None = None,
description: str | None = None,
location: str | None = None,
attendees: list[str] | None = None,
max_results: int = 10,
time_min: str | None = None,
**kwargs: Any,
) -> str:
"""
Execute calendar operation.
Args:
action: Action to perform (list_events, create_event, check_availability)
title: Event title (for create_event)
start_time: Start time in ISO format or relative (for create_event)
end_time: End time in ISO format or relative (for create_event)
description: Event description
location: Event location
attendees: List of attendee email addresses
max_results: Maximum number of events to return (for list_events)
time_min: Lower bound for event end time (for list_events)
**kwargs: Ignore extra parameters
Returns:
Result string
"""
config = self.config
if not config.enabled:
return "Error: Calendar is not enabled. Set NANOBOT_TOOLS__CALENDAR__ENABLED=true"
service = self._get_service()
if not service:
return (
"Error: Could not authenticate with Google Calendar. "
"Please ensure credentials_file is configured and valid. "
"You may need to run OAuth flow once to authorize access."
)
calendar_id = config.calendar_id or "primary"
try:
if action == "list_events":
return await self._list_events(service, calendar_id, max_results, time_min)
elif action == "create_event":
if not title:
return "Error: title is required for create_event"
if not start_time:
return "Error: start_time is required for create_event"
return await self._create_event(
service,
calendar_id,
title,
start_time,
end_time,
description,
location,
attendees,
)
elif action == "check_availability":
if not start_time:
return "Error: start_time is required for check_availability"
return await self._check_availability(service, calendar_id, start_time, end_time)
else:
return f"Error: Unknown action '{action}'. Use 'list_events', 'create_event', or 'check_availability'"
except HttpError as e:
return f"Error accessing Google Calendar API: {e}"
except Exception as e:
return f"Error: {str(e)}"
async def _list_events(
self, service: Any, calendar_id: str, max_results: int, time_min: str | None
) -> str:
"""List upcoming events."""
import asyncio
def _list():
now = datetime.utcnow().isoformat() + "Z"
time_min_str = time_min if time_min else now
events_result = (
service.events()
.list(
calendarId=calendar_id,
timeMin=time_min_str,
maxResults=max_results,
singleEvents=True,
orderBy="startTime",
)
.execute()
)
events = events_result.get("items", [])
if not events:
return "No upcoming events found."
result = [f"Found {len(events)} upcoming event(s):\n"]
for event in events:
start = event["start"].get("dateTime", event["start"].get("date"))
title = event.get("summary", "No title")
result.append(f"- {title} ({start})")
return "\n".join(result)
return await asyncio.to_thread(_list)
async def _create_event(
self,
service: Any,
calendar_id: str,
title: str,
start_time: str,
end_time: str | None,
description: str | None,
location: str | None,
attendees: list[str] | None,
) -> str:
"""Create a new calendar event."""
import asyncio
def _create():
start_dt = self._parse_time(start_time)
if end_time:
end_dt = self._parse_time(end_time)
else:
end_dt = start_dt + timedelta(hours=1) # Default 1 hour
# Validate that start time is not in the past
from datetime import timezone
now = datetime.now(timezone.utc)
if start_dt < now:
raise ValueError(
f"Cannot schedule event in the past. Start time ({start_dt}) is before current time ({now}). "
f"Parsed from start_time='{start_time}'. "
f"Tip: Use natural language formats like 'March 7 15:00' or 'tomorrow 2pm' instead of ISO format."
)
# Validate time range
if end_dt <= start_dt:
raise ValueError(
f"Invalid time range: end time ({end_dt}) must be after start time ({start_dt}). "
f"Parsed from start_time='{start_time}', end_time='{end_time}'"
)
# Ensure datetimes are timezone-aware (UTC)
from datetime import timezone
if start_dt.tzinfo is None:
start_dt = start_dt.replace(tzinfo=timezone.utc)
if end_dt.tzinfo is None:
end_dt = end_dt.replace(tzinfo=timezone.utc)
event = {
"summary": title,
"start": {
"dateTime": start_dt.isoformat(),
"timeZone": "UTC",
},
"end": {
"dateTime": end_dt.isoformat(),
"timeZone": "UTC",
},
}
if description:
event["description"] = description
if location:
event["location"] = location
if attendees:
event["attendees"] = [{"email": email} for email in attendees]
try:
created_event = service.events().insert(calendarId=calendar_id, body=event).execute()
return f"Event created: {created_event.get('htmlLink')}"
except HttpError as e:
error_details = e.error_details if hasattr(e, 'error_details') else str(e)
error_msg = str(e)
# Provide more detailed error information
return (
f"Error creating calendar event: {error_msg}. "
f"Details: {error_details}. "
f"Parsed start_time='{start_time}' -> {start_dt.isoformat()}, "
f"end_time='{end_time}' -> {end_dt.isoformat()}"
)
except Exception as e:
return (
f"Error creating calendar event: {str(e)}. "
f"Parsed start_time='{start_time}' -> {start_dt.isoformat()}, "
f"end_time='{end_time}' -> {end_dt.isoformat()}"
)
return await asyncio.to_thread(_create)
async def _check_availability(
self, service: Any, calendar_id: str, start_time: str, end_time: str | None
) -> str:
"""Check if a time slot is available."""
import asyncio
def _check():
start_dt = self._parse_time(start_time)
if end_time:
end_dt = self._parse_time(end_time)
else:
end_dt = start_dt + timedelta(hours=1)
time_min_str = start_dt.isoformat() + "Z"
time_max_str = end_dt.isoformat() + "Z"
events_result = (
service.events()
.list(
calendarId=calendar_id,
timeMin=time_min_str,
timeMax=time_max_str,
singleEvents=True,
orderBy="startTime",
)
.execute()
)
events = events_result.get("items", [])
if events:
conflicts = [e.get("summary", "Untitled") for e in events]
return f"Time slot is NOT available. Conflicts with: {', '.join(conflicts)}"
else:
return f"Time slot is available ({start_dt} to {end_dt})"
return await asyncio.to_thread(_check)