nanobot/nanobot/cron/service.py
tanyar09 d9919828c5 Fix cron scheduled tasks not executing
- Add auto-start functionality to cron service when jobs are added
  if service is not running and event loop is available
- Add 'reminder' field to CronPayload to distinguish between
  simple reminders (send message directly) and tasks (execute via agent)
- Update cron tool to accept 'reminder' parameter
- Fix callback logic to check reminder field: reminders send directly,
  tasks are processed through agent
- Ensures both 'remind me to X' and 'schedule a task to do X' work correctly
2026-03-03 13:09:44 -05:00

382 lines
13 KiB
Python

"""Cron service for scheduling agent tasks."""
import asyncio
import json
import time
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Coroutine
from loguru import logger
from nanobot.cron.types import CronJob, CronJobState, CronPayload, CronSchedule, CronStore
def _now_ms() -> int:
return int(time.time() * 1000)
def _compute_next_run(schedule: CronSchedule, now_ms: int) -> int | None:
"""Compute next run time in ms."""
if schedule.kind == "at":
return schedule.at_ms if schedule.at_ms and schedule.at_ms > now_ms else None
if schedule.kind == "every":
if not schedule.every_ms or schedule.every_ms <= 0:
return None
# Next interval from now
return now_ms + schedule.every_ms
if schedule.kind == "cron" and schedule.expr:
try:
from croniter import croniter
from zoneinfo import ZoneInfo
# Use caller-provided reference time for deterministic scheduling
base_time = now_ms / 1000
tz = ZoneInfo(schedule.tz) if schedule.tz else datetime.now().astimezone().tzinfo
base_dt = datetime.fromtimestamp(base_time, tz=tz)
cron = croniter(schedule.expr, base_dt)
next_dt = cron.get_next(datetime)
return int(next_dt.timestamp() * 1000)
except Exception:
return None
return None
class CronService:
"""Service for managing and executing scheduled jobs."""
def __init__(
self,
store_path: Path,
on_job: Callable[[CronJob], Coroutine[Any, Any, str | None]] | None = None
):
self.store_path = store_path
self.on_job = on_job # Callback to execute job, returns response text
self._store: CronStore | None = None
self._timer_task: asyncio.Task | None = None
self._start_task: asyncio.Task | None = None
self._running = False
def _load_store(self) -> CronStore:
"""Load jobs from disk."""
if self._store:
return self._store
if self.store_path.exists():
try:
data = json.loads(self.store_path.read_text())
jobs = []
for j in data.get("jobs", []):
jobs.append(CronJob(
id=j["id"],
name=j["name"],
enabled=j.get("enabled", True),
schedule=CronSchedule(
kind=j["schedule"]["kind"],
at_ms=j["schedule"].get("atMs"),
every_ms=j["schedule"].get("everyMs"),
expr=j["schedule"].get("expr"),
tz=j["schedule"].get("tz"),
),
payload=CronPayload(
kind=j["payload"].get("kind", "agent_turn"),
message=j["payload"].get("message", ""),
deliver=j["payload"].get("deliver", False),
channel=j["payload"].get("channel"),
to=j["payload"].get("to"),
reminder=j["payload"].get("reminder", False),
),
state=CronJobState(
next_run_at_ms=j.get("state", {}).get("nextRunAtMs"),
last_run_at_ms=j.get("state", {}).get("lastRunAtMs"),
last_status=j.get("state", {}).get("lastStatus"),
last_error=j.get("state", {}).get("lastError"),
),
created_at_ms=j.get("createdAtMs", 0),
updated_at_ms=j.get("updatedAtMs", 0),
delete_after_run=j.get("deleteAfterRun", False),
))
self._store = CronStore(jobs=jobs)
except Exception as e:
logger.warning(f"Failed to load cron store: {e}")
self._store = CronStore()
else:
self._store = CronStore()
return self._store
def _save_store(self) -> None:
"""Save jobs to disk."""
if not self._store:
return
self.store_path.parent.mkdir(parents=True, exist_ok=True)
data = {
"version": self._store.version,
"jobs": [
{
"id": j.id,
"name": j.name,
"enabled": j.enabled,
"schedule": {
"kind": j.schedule.kind,
"atMs": j.schedule.at_ms,
"everyMs": j.schedule.every_ms,
"expr": j.schedule.expr,
"tz": j.schedule.tz,
},
"payload": {
"kind": j.payload.kind,
"message": j.payload.message,
"deliver": j.payload.deliver,
"channel": j.payload.channel,
"to": j.payload.to,
"reminder": j.payload.reminder,
},
"state": {
"nextRunAtMs": j.state.next_run_at_ms,
"lastRunAtMs": j.state.last_run_at_ms,
"lastStatus": j.state.last_status,
"lastError": j.state.last_error,
},
"createdAtMs": j.created_at_ms,
"updatedAtMs": j.updated_at_ms,
"deleteAfterRun": j.delete_after_run,
}
for j in self._store.jobs
]
}
self.store_path.write_text(json.dumps(data, indent=2))
async def start(self) -> None:
"""Start the cron service."""
self._running = True
self._load_store()
self._recompute_next_runs()
self._save_store()
self._arm_timer()
logger.info(f"Cron service started with {len(self._store.jobs if self._store else [])} jobs")
def stop(self) -> None:
"""Stop the cron service."""
self._running = False
if self._timer_task:
self._timer_task.cancel()
self._timer_task = None
if self._start_task:
self._start_task.cancel()
self._start_task = None
def _recompute_next_runs(self) -> None:
"""Recompute next run times for all enabled jobs."""
if not self._store:
return
now = _now_ms()
for job in self._store.jobs:
if job.enabled:
job.state.next_run_at_ms = _compute_next_run(job.schedule, now)
def _get_next_wake_ms(self) -> int | None:
"""Get the earliest next run time across all jobs."""
if not self._store:
return None
times = [j.state.next_run_at_ms for j in self._store.jobs
if j.enabled and j.state.next_run_at_ms]
return min(times) if times else None
def _arm_timer(self) -> None:
"""Schedule the next timer tick."""
if self._timer_task:
self._timer_task.cancel()
next_wake = self._get_next_wake_ms()
if not next_wake:
return
# Auto-start if not running and there's an event loop
if not self._running:
try:
loop = asyncio.get_running_loop()
# Schedule start in the background (only if not already starting)
if not self._start_task or self._start_task.done():
async def auto_start():
try:
if not self._running:
await self.start()
except Exception as e:
logger.error(f"Failed to auto-start cron service: {e}")
finally:
self._start_task = None
self._start_task = loop.create_task(auto_start())
return # Will be re-armed after start
except RuntimeError:
# No event loop running, can't start
logger.warning("Cron service not started and no event loop available. Timer will not run.")
return
delay_ms = max(0, next_wake - _now_ms())
delay_s = delay_ms / 1000
async def tick():
await asyncio.sleep(delay_s)
if self._running:
await self._on_timer()
self._timer_task = asyncio.create_task(tick())
async def _on_timer(self) -> None:
"""Handle timer tick - run due jobs."""
if not self._store:
return
now = _now_ms()
due_jobs = [
j for j in self._store.jobs
if j.enabled and j.state.next_run_at_ms and now >= j.state.next_run_at_ms
]
for job in due_jobs:
await self._execute_job(job)
self._save_store()
self._arm_timer()
async def _execute_job(self, job: CronJob) -> None:
"""Execute a single job."""
start_ms = _now_ms()
logger.info(f"Cron: executing job '{job.name}' ({job.id})")
try:
response = None
if self.on_job:
response = await self.on_job(job)
job.state.last_status = "ok"
job.state.last_error = None
logger.info(f"Cron: job '{job.name}' completed")
except Exception as e:
job.state.last_status = "error"
job.state.last_error = str(e)
logger.error(f"Cron: job '{job.name}' failed: {e}")
job.state.last_run_at_ms = start_ms
job.updated_at_ms = _now_ms()
# Handle one-shot jobs
if job.schedule.kind == "at":
if job.delete_after_run:
self._store.jobs = [j for j in self._store.jobs if j.id != job.id]
else:
job.enabled = False
job.state.next_run_at_ms = None
else:
# Compute next run
job.state.next_run_at_ms = _compute_next_run(job.schedule, _now_ms())
# ========== Public API ==========
def list_jobs(self, include_disabled: bool = False) -> list[CronJob]:
"""List all jobs."""
store = self._load_store()
jobs = store.jobs if include_disabled else [j for j in store.jobs if j.enabled]
return sorted(jobs, key=lambda j: j.state.next_run_at_ms or float('inf'))
def add_job(
self,
name: str,
schedule: CronSchedule,
message: str,
deliver: bool = False,
channel: str | None = None,
to: str | None = None,
delete_after_run: bool = False,
reminder: bool = False,
) -> CronJob:
"""Add a new job."""
store = self._load_store()
now = _now_ms()
job = CronJob(
id=str(uuid.uuid4())[:8],
name=name,
enabled=True,
schedule=schedule,
payload=CronPayload(
kind="agent_turn",
message=message,
deliver=deliver,
channel=channel,
to=to,
reminder=reminder,
),
state=CronJobState(next_run_at_ms=_compute_next_run(schedule, now)),
created_at_ms=now,
updated_at_ms=now,
delete_after_run=delete_after_run,
)
store.jobs.append(job)
self._save_store()
self._arm_timer()
logger.info(f"Cron: added job '{name}' ({job.id})")
return job
def remove_job(self, job_id: str) -> bool:
"""Remove a job by ID."""
store = self._load_store()
before = len(store.jobs)
store.jobs = [j for j in store.jobs if j.id != job_id]
removed = len(store.jobs) < before
if removed:
self._save_store()
self._arm_timer()
logger.info(f"Cron: removed job {job_id}")
return removed
def enable_job(self, job_id: str, enabled: bool = True) -> CronJob | None:
"""Enable or disable a job."""
store = self._load_store()
for job in store.jobs:
if job.id == job_id:
job.enabled = enabled
job.updated_at_ms = _now_ms()
if enabled:
job.state.next_run_at_ms = _compute_next_run(job.schedule, _now_ms())
else:
job.state.next_run_at_ms = None
self._save_store()
self._arm_timer()
return job
return None
async def run_job(self, job_id: str, force: bool = False) -> bool:
"""Manually run a job."""
store = self._load_store()
for job in store.jobs:
if job.id == job_id:
if not force and not job.enabled:
return False
await self._execute_job(job)
self._save_store()
self._arm_timer()
return True
return False
def status(self) -> dict:
"""Get service status."""
store = self._load_store()
return {
"enabled": self._running,
"jobs": len(store.jobs),
"next_wake_at_ms": self._get_next_wake_ms(),
}