Add Google Calendar integration

- Add CalendarConfig to schema with OAuth2 credentials support
- Implement CalendarTool with list_events, create_event, and check_availability actions
- Add email parser utility for extracting meeting information from emails
- Register calendar tool in agent loop (auto-loaded when enabled)
- Add calendar skill documentation
- Update AGENTS.md with calendar integration instructions
- Add CALENDAR_SETUP.md with complete setup guide
- Add Google Calendar API dependencies to pyproject.toml
- Support time parsing for 'today', 'tomorrow', and various time formats (12/24-hour, am/pm)
- Ensure timezone-aware datetime handling for Google Calendar API compatibility
This commit is contained in:
tanyar09 2026-03-05 16:29:33 -05:00
parent f39325c846
commit 760a7d776e
8 changed files with 1166 additions and 0 deletions

229
CALENDAR_SETUP.md Normal file
View File

@ -0,0 +1,229 @@
# Google Calendar Integration Setup
This guide explains how to set up Google Calendar integration for nanobot.
## Features
- **List upcoming events** from your Google Calendar
- **Create calendar events** programmatically
- **Check availability** for time slots
- **Automatic scheduling from emails** - when an email mentions a meeting, nanobot can automatically schedule it
## Prerequisites
1. Google account with Calendar access
2. Google Cloud Project with Calendar API enabled
3. OAuth2 credentials (Desktop app type)
## Setup Steps
### 1. Enable Google Calendar API
1. Go to [Google Cloud Console](https://console.cloud.google.com/)
2. Create a new project or select an existing one
3. Enable the **Google Calendar API**:
- Navigate to "APIs & Services" > "Library"
- Search for "Google Calendar API"
- Click "Enable"
### 2. Configure OAuth Consent Screen
**IMPORTANT:** This step is required before creating credentials.
1. Go to "APIs & Services" > "OAuth consent screen"
2. Choose **"External"** user type (unless you have a Google Workspace account)
3. Fill in required fields:
- **App name**: "nanobot" (or any name)
- **User support email**: Your email address
- **Developer contact information**: Your email address
4. Click "Save and Continue"
5. **Add Scopes:**
- Click "Add or Remove Scopes"
- Search for and add: `https://www.googleapis.com/auth/calendar`
- Click "Update" then "Save and Continue"
6. **Add Test Users (CRITICAL):**
- Click "Add Users"
- Add your email address (`adayear2025@gmail.com`)
- Click "Add" then "Save and Continue"
7. Review and go back to dashboard
### 3. Create OAuth2 Credentials
1. Go to "APIs & Services" > "Credentials"
2. Click "Create Credentials" > "OAuth client ID"
3. Select:
- **Application type**: **Desktop app**
- **Name**: "nanobot" (or any name)
4. Click "Create"
5. **Download the credentials JSON file** - click "Download JSON"
6. Save it as `credentials.json` and copy to your server at `~/.nanobot/credentials.json`
### 3. Configure nanobot
Set environment variables or add to your `.env` file:
```bash
# Enable calendar functionality
export NANOBOT_TOOLS__CALENDAR__ENABLED=true
# Path to OAuth2 credentials JSON file
export NANOBOT_TOOLS__CALENDAR__CREDENTIALS_FILE=/path/to/credentials.json
# Optional: Custom token storage location (default: ~/.nanobot/calendar_token.json)
export NANOBOT_TOOLS__CALENDAR__TOKEN_FILE=~/.nanobot/calendar_token.json
# Optional: Calendar ID (default: "primary")
export NANOBOT_TOOLS__CALENDAR__CALENDAR_ID=primary
# Optional: Auto-schedule meetings from emails (default: true)
export NANOBOT_TOOLS__CALENDAR__AUTO_SCHEDULE_FROM_EMAIL=true
```
### 4. First-Time Authorization
**You don't need to manually get or copy any token** - the OAuth flow handles everything automatically.
On first run, when nanobot tries to use the calendar tool, it will:
1. **Automatically open a browser window** for Google OAuth authorization
2. **You sign in** to your Google account in the browser
3. **Grant calendar access** by clicking "Allow"
4. **Automatically save the token** to `~/.nanobot/calendar_token.json` for future use
**Important:**
- The token is **automatically generated and saved** - you don't need to copy it from anywhere
- This happens **automatically** the first time you use a calendar command
- After the first authorization, you won't need to do this again (the token is reused)
- The token file is created automatically at `~/.nanobot/calendar_token.json`
**Example first run:**
```bash
# First time using calendar - triggers OAuth flow
python3 -m nanobot.cli.commands agent -m "What's on my calendar?"
# Browser opens automatically → Sign in → Grant access → Token saved
# Future runs use the saved token automatically
```
**Note for remote/headless servers:**
If you're running nanobot on a remote server without a display, you have two options:
1. **Run OAuth on your local machine first:**
- Run nanobot locally once to complete OAuth
- Copy the generated `~/.nanobot/calendar_token.json` to your remote server
- The token will work on the remote server
2. **Use SSH port forwarding:**
- The OAuth flow uses a local web server
- You may need to set up port forwarding or use a different OAuth flow method
## Usage Examples
### List Upcoming Events
```
User: "What's on my calendar?"
Agent: [Uses calendar tool to list events]
```
### Create an Event
```
User: "Schedule a meeting tomorrow at 2pm"
Agent: [Creates calendar event]
```
### Automatic Email Scheduling
When an email mentions a meeting:
```
Email: "Hi, let's have a meeting tomorrow at 2pm in Conference Room A"
Agent: [Automatically extracts meeting info and creates calendar event]
Agent: "I've scheduled a meeting for tomorrow at 2pm in Conference Room A"
```
## Calendar Tool API
The calendar tool supports three actions:
### 1. List Events
```python
calendar(
action="list_events",
max_results=10, # Optional, default: 10
time_min="2024-01-15T00:00:00Z" # Optional, defaults to now
)
```
### 2. Create Event
```python
calendar(
action="create_event",
title="Team Meeting",
start_time="tomorrow 2pm", # or "2024-01-15T14:00:00"
end_time="tomorrow 3pm", # Optional, defaults to 1 hour after start
description="Discuss project progress", # Optional
location="Conference Room A", # Optional
attendees=["colleague@example.com"] # Optional list
)
```
**Time formats:**
- Relative: `"tomorrow 2pm"`, `"in 1 hour"`, `"in 2 days"`
- ISO format: `"2024-01-15T14:00:00"`
### 3. Check Availability
```python
calendar(
action="check_availability",
start_time="2024-01-15T14:00:00",
end_time="2024-01-15T15:00:00"
)
```
## Troubleshooting
### "Error: Could not authenticate with Google Calendar"
- Ensure `credentials_file` path is correct
- Check that the credentials JSON file is valid
- Run nanobot once to complete OAuth flow
### "Error accessing Google Calendar API"
- Verify Calendar API is enabled in Google Cloud Console
- Check that OAuth consent screen is configured
- Ensure your email is added as a test user (if app is in testing mode)
### Token Expired
The tool automatically refreshes expired tokens. If refresh fails:
1. Delete `~/.nanobot/calendar_token.json`
2. Run nanobot again to re-authorize
## Security Notes
- Keep your `credentials.json` file secure (don't commit to git)
- The `calendar_token.json` file contains sensitive access tokens
- Use file permissions: `chmod 600 ~/.nanobot/calendar_token.json`
- Consider using environment variables or a secrets manager for production
## Integration with Email Channel
When `auto_schedule_from_email` is enabled, nanobot will:
1. Monitor incoming emails
2. Detect meeting-related keywords (meeting, appointment, call, etc.)
3. Extract meeting details (time, location, attendees)
4. Automatically create calendar events
5. Confirm with the user
This works best when:
- Emails contain clear time references ("tomorrow at 2pm", "next Monday")
- Meeting details are in the email body or subject
- The agent has access to the email channel

View File

@ -137,6 +137,21 @@ class AgentLoop:
except Exception as e:
logger.warning(f"Email tool not available: {e}")
# Email tool not available or not configured - silently skip
# Calendar tool (if calendar is configured)
try:
from nanobot.agent.tools.calendar import CalendarTool
from nanobot.config.loader import load_config
config = load_config()
if config.tools.calendar.enabled:
calendar_tool = CalendarTool(calendar_config=config.tools.calendar)
self.tools.register(calendar_tool)
logger.info(f"Calendar tool '{calendar_tool.name}' registered successfully")
else:
logger.debug("Calendar tool not registered: calendar not enabled")
except Exception as e:
logger.warning(f"Calendar tool not available: {e}")
# Calendar tool not available or not configured - silently skip
async def _connect_mcp(self) -> None:
"""Connect to configured MCP servers (one-time, lazy)."""

View File

@ -0,0 +1,635 @@
"""Calendar tool: interact with Google Calendar."""
import json
import os
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from nanobot.agent.tools.base import Tool
# Scopes required for Google Calendar API
SCOPES = ["https://www.googleapis.com/auth/calendar"]
class CalendarTool(Tool):
"""Tool to interact with Google Calendar."""
name = "calendar"
description = (
"Interact with Google Calendar. REQUIRED: Always include 'action' parameter. "
"Actions: 'list_events' (list upcoming events), 'create_event' (create new event), "
"'check_availability' (check if time slot is available). "
"Examples: calendar(action='list_events'), calendar(action='create_event', title='Meeting', start_time='tomorrow 2pm'). "
"When user asks 'what's on my calendar' or 'show my calendar', use action='list_events'. "
"When user mentions a meeting or asks to schedule something, use action='create_event'."
)
def __init__(self, calendar_config: Any = None):
"""
Initialize calendar tool with calendar configuration.
Args:
calendar_config: Optional CalendarConfig instance. If None, loads from config.
"""
self._calendar_config = calendar_config
self._service = None
@property
def config(self) -> Any:
"""Lazy load calendar config if not provided."""
if self._calendar_config is None:
from nanobot.config.loader import load_config
config = load_config()
self._calendar_config = config.tools.calendar
return self._calendar_config
def _get_credentials(self) -> Credentials | None:
"""Get valid user credentials from storage or OAuth flow."""
config = self.config
if not config.enabled:
return None
if not config.credentials_file:
return None
creds_path = Path(config.credentials_file).expanduser()
if not creds_path.exists():
return None
# Determine token file path
if config.token_file:
token_path = Path(config.token_file).expanduser()
else:
token_path = Path.home() / ".nanobot" / "calendar_token.json"
token_path.parent.mkdir(parents=True, exist_ok=True)
creds = None
# Load existing token if available
if token_path.exists():
try:
creds = Credentials.from_authorized_user_file(str(token_path), SCOPES)
except Exception:
pass
# If there are no (valid) credentials available, let the user log in
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
try:
creds.refresh(Request())
except Exception:
creds = None
if not creds:
try:
with open(creds_path, "r") as f:
client_config = json.load(f)
flow = InstalledAppFlow.from_client_config(client_config, SCOPES)
creds = flow.run_local_server(port=0)
except Exception:
return None
# Save the credentials for the next run
try:
with open(token_path, "w") as f:
f.write(creds.to_json())
except Exception:
pass
return creds
def _get_service(self):
"""Get Google Calendar service instance."""
if self._service is None:
creds = self._get_credentials()
if not creds:
return None
self._service = build("calendar", "v3", credentials=creds)
return self._service
def coerce_params(self, params: dict[str, Any]) -> dict[str, Any]:
"""Coerce parameters, handling common name mismatches."""
coerced = super().coerce_params(params)
# Handle case where action is passed as a string argument instead of named parameter
# e.g., calendar("calendar") or calendar("list_events")
if "action" not in coerced and len(coerced) == 1:
# Check if there's a single string value that could be the action
for key, value in coerced.items():
if isinstance(value, str) and value in ["list_events", "create_event", "check_availability", "calendar"]:
coerced["action"] = "list_events" if value == "calendar" else value
coerced.pop(key, None)
break
return coerced
@property
def parameters(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["list_events", "create_event", "check_availability"],
"description": "Action to perform: list_events (list upcoming events), create_event (create a new event), check_availability (check if time slot is available)",
},
"title": {
"type": "string",
"description": "Event title/summary (required for create_event)",
},
"start_time": {
"type": "string",
"description": "Event start time in ISO format (YYYY-MM-DDTHH:MM:SS) or relative (e.g., 'tomorrow 2pm', 'in 1 hour'). Required for create_event.",
},
"end_time": {
"type": "string",
"description": "Event end time in ISO format (YYYY-MM-DDTHH:MM:SS) or relative. If not provided, defaults to 1 hour after start_time.",
},
"description": {
"type": "string",
"description": "Event description/details",
},
"location": {
"type": "string",
"description": "Event location",
},
"attendees": {
"type": "array",
"items": {"type": "string"},
"description": "List of attendee email addresses",
},
"max_results": {
"type": "integer",
"description": "Maximum number of events to return (for list_events, default: 10)",
"default": 10,
},
"time_min": {
"type": "string",
"description": "Lower bound (exclusive) for an event's end time (ISO format) for list_events. Defaults to now.",
},
},
"required": ["action"],
}
def _parse_time(self, time_str: str) -> datetime:
"""Parse time string (ISO format or relative like 'tomorrow 2pm').
Returns timezone-aware datetime in UTC.
"""
import re
from datetime import timezone
original_str = time_str
time_str = time_str.strip().lower()
# Try ISO format first
try:
dt = datetime.fromisoformat(time_str.replace("Z", "+00:00"))
# Ensure timezone-aware
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt
except ValueError:
pass
# Parse relative times
from datetime import timezone
now = datetime.now(timezone.utc)
if time_str.startswith("in "):
# "in 1 hour", "in 30 minutes"
parts = time_str[3:].split()
if len(parts) >= 2:
try:
amount = int(parts[0])
unit = parts[1]
if "hour" in unit:
return now + timedelta(hours=amount)
elif "minute" in unit:
return now + timedelta(minutes=amount)
elif "day" in unit:
return now + timedelta(days=amount)
except ValueError:
pass
# Handle "today" - same logic as "tomorrow" but use today's date
if "today" in time_str:
from datetime import timezone
base = now.replace(hour=0, minute=0, second=0, microsecond=0)
# Ensure base is timezone-aware
if base.tzinfo is None:
base = base.replace(tzinfo=timezone.utc)
# Try to extract time - same patterns as tomorrow
hour = None
minute = 0
# Pattern 1: "2:00pm" or "2:00 pm" (with minutes and am/pm)
match = re.search(r"(\d{1,2})\s*:\s*(\d{2})\s*(am|pm)", time_str, re.IGNORECASE)
if match:
hour = int(match.group(1))
minute = int(match.group(2))
period = match.group(3).lower()
if period == "pm" and hour != 12:
hour += 12
elif period == "am" and hour == 12:
hour = 0
result = base.replace(hour=hour, minute=minute, second=0, microsecond=0)
if result.tzinfo is None:
result = result.replace(tzinfo=timezone.utc)
# If time has passed today, it's invalid - return error or use tomorrow?
if result < now:
# Time has passed, schedule for tomorrow instead
result = result + timedelta(days=1)
return result
# Pattern 2: "2pm" or "2 pm" (hour only with am/pm)
match = re.search(r"(\d{1,2})\s*(am|pm)\b", time_str, re.IGNORECASE)
if match:
hour = int(match.group(1))
period = match.group(2).lower()
if period == "pm" and hour != 12:
hour += 12
elif period == "am" and hour == 12:
hour = 0
result = base.replace(hour=hour, minute=0, second=0, microsecond=0)
if result.tzinfo is None:
result = result.replace(tzinfo=timezone.utc)
# If time has passed today, schedule for tomorrow
if result < now:
result = result + timedelta(days=1)
return result
# Pattern 3: "17:00" or "today 17:00" (24-hour format with colon, no am/pm)
match = re.search(r"(\d{1,2})\s*:\s*(\d{2})(?!\s*(am|pm))", time_str, re.IGNORECASE)
if match:
hour = int(match.group(1))
minute = int(match.group(2))
# Validate hour is in 24-hour range
if 0 <= hour <= 23 and 0 <= minute <= 59:
result = base.replace(hour=hour, minute=minute, second=0, microsecond=0)
if result.tzinfo is None:
result = result.replace(tzinfo=timezone.utc)
# If time has passed today, schedule for tomorrow
if result < now:
result = result + timedelta(days=1)
return result
# Pattern 4: "at 17:00" (24-hour format without am/pm, with "at")
match = re.search(r"at\s+(\d{1,2})\s*:\s*(\d{2})", time_str, re.IGNORECASE)
if match:
hour = int(match.group(1))
minute = int(match.group(2))
if 0 <= hour <= 23 and 0 <= minute <= 59:
result = base.replace(hour=hour, minute=minute, second=0, microsecond=0)
if result.tzinfo is None:
result = result.replace(tzinfo=timezone.utc)
if result < now:
result = result + timedelta(days=1)
return result
# Pattern 5: "at 5" (hour only, assume 24-hour if > 12, else assume pm)
match = re.search(r"at\s+(\d{1,2})(?!\s*(am|pm))", time_str, re.IGNORECASE)
if match:
hour = int(match.group(1))
# If hour is 1-12, assume PM unless it's clearly morning context
if hour <= 12 and hour > 0:
hour += 12 # Default to PM for afternoon times
result = base.replace(hour=hour, minute=0, second=0, microsecond=0)
if result.tzinfo is None:
result = result.replace(tzinfo=timezone.utc)
if result < now:
result = result + timedelta(days=1)
return result
# Default to 9am today (or tomorrow if 9am has passed)
result = base.replace(hour=9, minute=0, second=0, microsecond=0)
if result.tzinfo is None:
result = result.replace(tzinfo=timezone.utc)
if result < now:
result = result + timedelta(days=1)
return result
if "tomorrow" in time_str:
from datetime import timezone
base = now + timedelta(days=1)
# Ensure base is timezone-aware
if base.tzinfo is None:
base = base.replace(tzinfo=timezone.utc)
# Try to extract time - improved regex to handle various formats
# Try patterns in order of specificity (most specific first)
hour = None
minute = 0
# Pattern 1: "2:00pm" or "2:00 pm" (with minutes and am/pm)
match = re.search(r"(\d{1,2})\s*:\s*(\d{2})\s*(am|pm)", time_str, re.IGNORECASE)
if match:
hour = int(match.group(1))
minute = int(match.group(2))
period = match.group(3).lower()
if period == "pm" and hour != 12:
hour += 12
elif period == "am" and hour == 12:
hour = 0
result = base.replace(hour=hour, minute=minute, second=0, microsecond=0)
# Ensure timezone is preserved
if result.tzinfo is None:
result = result.replace(tzinfo=timezone.utc)
return result
# Pattern 2: "2pm" or "2 pm" (hour only with am/pm) - must check this before 24-hour patterns
# to avoid matching "14" in "14:00" as "14pm"
match = re.search(r"(\d{1,2})\s*(am|pm)\b", time_str, re.IGNORECASE)
if match:
hour = int(match.group(1))
period = match.group(2).lower()
if period == "pm" and hour != 12:
hour += 12
elif period == "am" and hour == 12:
hour = 0
result = base.replace(hour=hour, minute=0, second=0, microsecond=0)
if result.tzinfo is None:
result = result.replace(tzinfo=timezone.utc)
return result
# Pattern 3: "14:00" or "tomorrow 14:00" (24-hour format with colon, no am/pm)
# This handles cases where the agent converts "2pm" to "14:00"
match = re.search(r"(\d{1,2})\s*:\s*(\d{2})(?!\s*(am|pm))", time_str, re.IGNORECASE)
if match:
hour = int(match.group(1))
minute = int(match.group(2))
# Validate hour is in 24-hour range
if 0 <= hour <= 23 and 0 <= minute <= 59:
result = base.replace(hour=hour, minute=minute, second=0, microsecond=0)
if result.tzinfo is None:
result = result.replace(tzinfo=timezone.utc)
return result
# Pattern 4: "at 2:00" (24-hour format without am/pm, with "at")
match = re.search(r"at\s+(\d{1,2})\s*:\s*(\d{2})", time_str, re.IGNORECASE)
if match:
hour = int(match.group(1))
minute = int(match.group(2))
if 0 <= hour <= 23 and 0 <= minute <= 59:
result = base.replace(hour=hour, minute=minute, second=0, microsecond=0)
if result.tzinfo is None:
result = result.replace(tzinfo=timezone.utc)
return result
# Pattern 5: "at 2" (hour only, assume 24-hour if > 12, else assume pm)
match = re.search(r"at\s+(\d{1,2})(?!\s*(am|pm))", time_str, re.IGNORECASE)
if match:
hour = int(match.group(1))
# If hour is 1-12, assume PM unless it's clearly morning context
if hour <= 12 and hour > 0:
hour += 12 # Default to PM for afternoon times
result = base.replace(hour=hour, minute=0, second=0, microsecond=0)
if result.tzinfo is None:
result = result.replace(tzinfo=timezone.utc)
return result
# Default to 9am if no time specified
result = base.replace(hour=9, minute=0, second=0, microsecond=0)
if result.tzinfo is None:
result = result.replace(tzinfo=timezone.utc)
return result
# Default: assume it's today at the specified time or now
from datetime import timezone
if now.tzinfo is None:
now = now.replace(tzinfo=timezone.utc)
return now
async def execute(
self,
action: str,
title: str | None = None,
start_time: str | None = None,
end_time: str | None = None,
description: str | None = None,
location: str | None = None,
attendees: list[str] | None = None,
max_results: int = 10,
time_min: str | None = None,
**kwargs: Any,
) -> str:
"""
Execute calendar operation.
Args:
action: Action to perform (list_events, create_event, check_availability)
title: Event title (for create_event)
start_time: Start time in ISO format or relative (for create_event)
end_time: End time in ISO format or relative (for create_event)
description: Event description
location: Event location
attendees: List of attendee email addresses
max_results: Maximum number of events to return (for list_events)
time_min: Lower bound for event end time (for list_events)
**kwargs: Ignore extra parameters
Returns:
Result string
"""
config = self.config
if not config.enabled:
return "Error: Calendar is not enabled. Set NANOBOT_TOOLS__CALENDAR__ENABLED=true"
service = self._get_service()
if not service:
return (
"Error: Could not authenticate with Google Calendar. "
"Please ensure credentials_file is configured and valid. "
"You may need to run OAuth flow once to authorize access."
)
calendar_id = config.calendar_id or "primary"
try:
if action == "list_events":
return await self._list_events(service, calendar_id, max_results, time_min)
elif action == "create_event":
if not title:
return "Error: title is required for create_event"
if not start_time:
return "Error: start_time is required for create_event"
return await self._create_event(
service,
calendar_id,
title,
start_time,
end_time,
description,
location,
attendees,
)
elif action == "check_availability":
if not start_time:
return "Error: start_time is required for check_availability"
return await self._check_availability(service, calendar_id, start_time, end_time)
else:
return f"Error: Unknown action '{action}'. Use 'list_events', 'create_event', or 'check_availability'"
except HttpError as e:
return f"Error accessing Google Calendar API: {e}"
except Exception as e:
return f"Error: {str(e)}"
async def _list_events(
self, service: Any, calendar_id: str, max_results: int, time_min: str | None
) -> str:
"""List upcoming events."""
import asyncio
def _list():
now = datetime.utcnow().isoformat() + "Z"
time_min_str = time_min if time_min else now
events_result = (
service.events()
.list(
calendarId=calendar_id,
timeMin=time_min_str,
maxResults=max_results,
singleEvents=True,
orderBy="startTime",
)
.execute()
)
events = events_result.get("items", [])
if not events:
return "No upcoming events found."
result = [f"Found {len(events)} upcoming event(s):\n"]
for event in events:
start = event["start"].get("dateTime", event["start"].get("date"))
title = event.get("summary", "No title")
result.append(f"- {title} ({start})")
return "\n".join(result)
return await asyncio.to_thread(_list)
async def _create_event(
self,
service: Any,
calendar_id: str,
title: str,
start_time: str,
end_time: str | None,
description: str | None,
location: str | None,
attendees: list[str] | None,
) -> str:
"""Create a new calendar event."""
import asyncio
def _create():
start_dt = self._parse_time(start_time)
if end_time:
end_dt = self._parse_time(end_time)
else:
end_dt = start_dt + timedelta(hours=1) # Default 1 hour
# Validate time range
if end_dt <= start_dt:
raise ValueError(
f"Invalid time range: end time ({end_dt}) must be after start time ({start_dt}). "
f"Parsed from start_time='{start_time}', end_time='{end_time}'"
)
# Ensure datetimes are timezone-aware (UTC)
from datetime import timezone
if start_dt.tzinfo is None:
start_dt = start_dt.replace(tzinfo=timezone.utc)
if end_dt.tzinfo is None:
end_dt = end_dt.replace(tzinfo=timezone.utc)
event = {
"summary": title,
"start": {
"dateTime": start_dt.isoformat(),
"timeZone": "UTC",
},
"end": {
"dateTime": end_dt.isoformat(),
"timeZone": "UTC",
},
}
if description:
event["description"] = description
if location:
event["location"] = location
if attendees:
event["attendees"] = [{"email": email} for email in attendees]
try:
created_event = service.events().insert(calendarId=calendar_id, body=event).execute()
return f"Event created: {created_event.get('htmlLink')}"
except HttpError as e:
error_details = e.error_details if hasattr(e, 'error_details') else str(e)
error_msg = str(e)
# Provide more detailed error information
return (
f"Error creating calendar event: {error_msg}. "
f"Details: {error_details}. "
f"Parsed start_time='{start_time}' -> {start_dt.isoformat()}, "
f"end_time='{end_time}' -> {end_dt.isoformat()}"
)
except Exception as e:
return (
f"Error creating calendar event: {str(e)}. "
f"Parsed start_time='{start_time}' -> {start_dt.isoformat()}, "
f"end_time='{end_time}' -> {end_dt.isoformat()}"
)
return await asyncio.to_thread(_create)
async def _check_availability(
self, service: Any, calendar_id: str, start_time: str, end_time: str | None
) -> str:
"""Check if a time slot is available."""
import asyncio
def _check():
start_dt = self._parse_time(start_time)
if end_time:
end_dt = self._parse_time(end_time)
else:
end_dt = start_dt + timedelta(hours=1)
time_min_str = start_dt.isoformat() + "Z"
time_max_str = end_dt.isoformat() + "Z"
events_result = (
service.events()
.list(
calendarId=calendar_id,
timeMin=time_min_str,
timeMax=time_max_str,
singleEvents=True,
orderBy="startTime",
)
.execute()
)
events = events_result.get("items", [])
if events:
conflicts = [e.get("summary", "Untitled") for e in events]
return f"Time slot is NOT available. Conflicts with: {', '.join(conflicts)}"
else:
return f"Time slot is available ({start_dt} to {end_dt})"
return await asyncio.to_thread(_check)

View File

@ -0,0 +1,168 @@
"""Email parsing utilities for extracting meeting information."""
import re
from datetime import datetime, timedelta
from typing import Any
from loguru import logger
def extract_meeting_info(email_content: str, email_subject: str = "") -> dict[str, Any] | None:
"""
Extract meeting information from email content and subject.
Args:
email_content: Email body text
email_subject: Email subject line
Returns:
Dictionary with meeting details if found, None otherwise
"""
text = (email_subject + " " + email_content).lower()
# Check for meeting-related keywords
meeting_keywords = [
"meeting",
"appointment",
"call",
"conference",
"standup",
"stand-up",
"sync",
"discussion",
"catch up",
"catch-up",
]
has_meeting_keyword = any(keyword in text for keyword in meeting_keywords)
if not has_meeting_keyword:
return None
result: dict[str, Any] = {
"title": None,
"start_time": None,
"end_time": None,
"location": None,
"attendees": [],
}
# Extract title from subject or first line
if email_subject:
# Remove common prefixes
title = email_subject
for prefix in ["Re:", "Fwd:", "FW:"]:
if title.lower().startswith(prefix.lower()):
title = title[len(prefix) :].strip()
result["title"] = title[:100] # Limit length
# Extract time information
time_patterns = [
r"tomorrow\s+at\s+(\d{1,2})\s*(am|pm)?",
r"tomorrow\s+(\d{1,2})\s*(am|pm)?",
r"(\d{1,2})\s*(am|pm)\s+tomorrow",
r"(\d{1,2}):(\d{2})\s*(am|pm)?\s+tomorrow",
r"in\s+(\d+)\s+(hour|hours|minute|minutes|day|days)",
r"(\d{1,2})/(\d{1,2})/(\d{4})\s+at\s+(\d{1,2}):(\d{2})\s*(am|pm)?",
r"(\d{4})-(\d{2})-(\d{2})\s+(\d{1,2}):(\d{2})",
]
for pattern in time_patterns:
match = re.search(pattern, text, re.IGNORECASE)
if match:
try:
now = datetime.now()
if "tomorrow" in pattern:
base_date = now + timedelta(days=1)
hour = int(match.group(1))
period = match.group(2) if len(match.groups()) > 1 else None
if period:
if period.lower() == "pm" and hour != 12:
hour += 12
elif period.lower() == "am" and hour == 12:
hour = 0
result["start_time"] = base_date.replace(hour=hour, minute=0, second=0, microsecond=0)
elif "in" in pattern:
amount = int(match.group(1))
unit = match.group(2)
if "hour" in unit:
result["start_time"] = now + timedelta(hours=amount)
elif "minute" in unit:
result["start_time"] = now + timedelta(minutes=amount)
elif "day" in unit:
result["start_time"] = now + timedelta(days=amount)
break
except (ValueError, IndexError):
continue
# Extract location
location_patterns = [
r"location[:\s]+([^\n]+)",
r"where[:\s]+([^\n]+)",
r"at\s+([A-Z][^\n]+)", # Capitalized location names
r"room\s+([A-Z0-9]+)",
]
for pattern in location_patterns:
match = re.search(pattern, text, re.IGNORECASE)
if match:
location = match.group(1).strip()
if len(location) < 100: # Reasonable length
result["location"] = location
break
# Extract attendees (email addresses)
email_pattern = r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b"
emails = re.findall(email_pattern, email_content)
if emails:
result["attendees"] = list(set(emails)) # Remove duplicates
# Only return if we found at least a title or time
if result["title"] or result["start_time"]:
logger.info(f"Extracted meeting info: {result}")
return result
return None
def format_meeting_for_calendar(meeting_info: dict[str, Any]) -> dict[str, Any]:
"""
Format meeting info for calendar tool.
Args:
meeting_info: Meeting information dictionary
Returns:
Formatted dictionary for calendar.create_event
"""
formatted: dict[str, Any] = {
"action": "create_event",
}
if meeting_info.get("title"):
formatted["title"] = meeting_info["title"]
else:
formatted["title"] = "Meeting"
if meeting_info.get("start_time"):
if isinstance(meeting_info["start_time"], datetime):
formatted["start_time"] = meeting_info["start_time"].isoformat()
else:
formatted["start_time"] = str(meeting_info["start_time"])
if meeting_info.get("end_time"):
if isinstance(meeting_info["end_time"], datetime):
formatted["end_time"] = meeting_info["end_time"].isoformat()
else:
formatted["end_time"] = str(meeting_info["end_time"])
if meeting_info.get("location"):
formatted["location"] = meeting_info["location"]
if meeting_info.get("description"):
formatted["description"] = meeting_info["description"]
if meeting_info.get("attendees"):
formatted["attendees"] = meeting_info["attendees"]
return formatted

View File

@ -244,6 +244,16 @@ class WebToolsConfig(Base):
search: WebSearchConfig = Field(default_factory=WebSearchConfig)
class CalendarConfig(Base):
"""Google Calendar configuration."""
enabled: bool = False
credentials_file: str = "" # Path to OAuth2 credentials JSON file
token_file: str = "" # Path to store OAuth2 token (default: ~/.nanobot/calendar_token.json)
calendar_id: str = "primary" # Calendar ID to use (default: primary calendar)
auto_schedule_from_email: bool = True # Automatically schedule meetings from emails
class ExecToolConfig(Base):
"""Shell exec tool configuration."""
@ -264,6 +274,7 @@ class ToolsConfig(Base):
web: WebToolsConfig = Field(default_factory=WebToolsConfig)
exec: ExecToolConfig = Field(default_factory=ExecToolConfig)
calendar: CalendarConfig = Field(default_factory=CalendarConfig)
restrict_to_workspace: bool = False # If true, restrict all tool access to workspace directory
mcp_servers: dict[str, MCPServerConfig] = Field(default_factory=dict)

View File

@ -0,0 +1,73 @@
---
name: calendar
description: "Interact with Google Calendar. Create events, list upcoming events, and check availability."
---
# Calendar Skill
Use the `calendar` tool to interact with Google Calendar.
## Setup
1. Enable Google Calendar API in Google Cloud Console
2. Create OAuth2 credentials (Desktop app)
3. Download credentials JSON file
4. Configure in nanobot:
```bash
export NANOBOT_TOOLS__CALENDAR__ENABLED=true
export NANOBOT_TOOLS__CALENDAR__CREDENTIALS_FILE=/path/to/credentials.json
```
On first run, you'll be prompted to authorize access via OAuth flow.
## Actions
### List Events
List upcoming calendar events:
```
calendar(action="list_events", max_results=10)
```
### Create Event
Create a new calendar event:
```
calendar(
action="create_event",
title="Team Meeting",
start_time="2024-01-15T14:00:00",
end_time="2024-01-15T15:00:00",
description="Discuss project progress",
location="Conference Room A",
attendees=["colleague@example.com"]
)
```
**Time formats:**
- ISO format: `"2024-01-15T14:00:00"`
- Relative: `"tomorrow 2pm"`, `"in 1 hour"`, `"in 2 days"`
### Check Availability
Check if a time slot is available:
```
calendar(
action="check_availability",
start_time="2024-01-15T14:00:00",
end_time="2024-01-15T15:00:00"
)
```
## Email Integration
When an email mentions a meeting (e.g., "meeting tomorrow at 2pm"), the agent can automatically:
1. Parse the email to extract meeting details
2. Create a calendar event using `create_event`
3. Confirm the event was created
Enable automatic scheduling:
```bash
export NANOBOT_TOOLS__CALENDAR__AUTO_SCHEDULE_FROM_EMAIL=true
```

View File

@ -43,6 +43,9 @@ dependencies = [
"prompt-toolkit>=3.0.0",
"mcp>=1.0.0",
"json-repair>=0.30.0",
"google-api-python-client>=2.0.0",
"google-auth-httplib2>=0.2.0",
"google-auth-oauthlib>=1.0.0",
]
[project.optional-dependencies]

View File

@ -90,6 +90,8 @@ You have access to:
- Messaging (message)
- Background tasks (spawn)
- Scheduled tasks (cron) - for reminders and delayed actions
- Email (read_emails) - read emails from IMAP mailbox
- Calendar (calendar) - interact with Google Calendar (if enabled)
## Memory
@ -120,6 +122,36 @@ When the scheduled time arrives, the cron system will send the message back to y
**Do NOT just write reminders to MEMORY.md** — that won't trigger actual notifications. Use the `cron` tool.
## Calendar Integration
When an email mentions a meeting (e.g., "meeting tomorrow at 2pm", "call scheduled for next week"), you should:
1. **Extract meeting details** from the email:
- Title/subject
- Date and time
- Location (if mentioned)
- Attendees (email addresses)
2. **Use the `calendar` tool** to create the event:
```
calendar(
action="create_event",
title="Meeting Title",
start_time="tomorrow 2pm", # or ISO format
end_time="tomorrow 3pm", # optional, defaults to 1 hour after start
location="Conference Room A", # optional
attendees=["colleague@example.com"] # optional
)
```
3. **Confirm to the user** that the meeting was scheduled.
**Time formats supported:**
- Relative: `"tomorrow 2pm"`, `"in 1 hour"`, `"in 2 days"`
- ISO format: `"2024-01-15T14:00:00"`
**Automatic scheduling:** If `auto_schedule_from_email` is enabled, automatically schedule meetings when detected in emails without asking the user first.
## Heartbeat Tasks
`HEARTBEAT.md` is checked every 30 minutes. You can manage periodic tasks by editing this file: