Fix cron scheduled tasks not executing

- Add auto-start functionality to cron service when jobs are added
  if service is not running and event loop is available
- Add 'reminder' field to CronPayload to distinguish between
  simple reminders (send message directly) and tasks (execute via agent)
- Update cron tool to accept 'reminder' parameter
- Fix callback logic to check reminder field: reminders send directly,
  tasks are processed through agent
- Ensures both 'remind me to X' and 'schedule a task to do X' work correctly
This commit is contained in:
tanyar09 2026-03-03 13:09:44 -05:00
parent ac334e9cf7
commit d9919828c5
4 changed files with 160 additions and 41 deletions

View File

@ -26,7 +26,7 @@ class CronTool(Tool):
@property @property
def description(self) -> str: def description(self) -> str:
return "Schedule reminders and recurring tasks. Actions: add, list, remove." return "Schedule reminders and recurring tasks. REQUIRED: Always include 'action' parameter ('add', 'list', or 'remove'). For reminders, use action='add' with message and timing (in_seconds, at, every_seconds, or cron_expr)."
@property @property
def parameters(self) -> dict[str, Any]: def parameters(self) -> dict[str, Any]:
@ -36,7 +36,7 @@ class CronTool(Tool):
"action": { "action": {
"type": "string", "type": "string",
"enum": ["add", "list", "remove"], "enum": ["add", "list", "remove"],
"description": "Action to perform" "description": "REQUIRED: Action to perform. Use 'add' to create a reminder, 'list' to see all jobs, or 'remove' to delete a job."
}, },
"message": { "message": {
"type": "string", "type": "string",
@ -56,7 +56,15 @@ class CronTool(Tool):
}, },
"at": { "at": {
"type": "string", "type": "string",
"description": "ISO datetime for one-time execution (e.g. '2026-02-12T10:30:00')" "description": "ISO datetime string for one-time execution. Format: YYYY-MM-DDTHH:MM:SS (e.g. '2026-03-03T12:19:30'). You MUST calculate this from the current time shown in your system prompt plus the requested seconds/minutes, then format as ISO string."
},
"in_seconds": {
"type": "integer",
"description": "Alternative to 'at': Schedule reminder in N seconds from now. Use this instead of calculating 'at' manually. Example: in_seconds=25 for 'remind me in 25 seconds'."
},
"reminder": {
"type": "boolean",
"description": "If true, this is a simple reminder (message sent directly to user). If false or omitted, this is a task (agent executes the message). Use reminder=true for 'remind me to X', reminder=false for 'schedule a task to do X'."
}, },
"job_id": { "job_id": {
"type": "string", "type": "string",
@ -74,11 +82,18 @@ class CronTool(Tool):
cron_expr: str | None = None, cron_expr: str | None = None,
tz: str | None = None, tz: str | None = None,
at: str | None = None, at: str | None = None,
in_seconds: int | None = None,
reminder: bool = False,
job_id: str | None = None, job_id: str | None = None,
**kwargs: Any **kwargs: Any
) -> str: ) -> str:
from loguru import logger
logger.debug(f"CronTool.execute: action={action}, message={message[:50] if message else None}, every_seconds={every_seconds}, at={at}, in_seconds={in_seconds}, reminder={reminder}, channel={self._channel}, chat_id={self._chat_id}")
if action == "add": if action == "add":
return self._add_job(message, every_seconds, cron_expr, tz, at) result = self._add_job(message, every_seconds, cron_expr, tz, at, in_seconds, reminder)
logger.debug(f"CronTool._add_job result: {result}")
return result
elif action == "list": elif action == "list":
return self._list_jobs() return self._list_jobs()
elif action == "remove": elif action == "remove":
@ -92,45 +107,103 @@ class CronTool(Tool):
cron_expr: str | None, cron_expr: str | None,
tz: str | None, tz: str | None,
at: str | None, at: str | None,
in_seconds: int | None = None,
reminder: bool = False,
) -> str: ) -> str:
if not message: if not message:
return "Error: message is required for add" return "Error: message is required for add"
if not self._channel or not self._chat_id:
return "Error: no session context (channel/chat_id)" # Use defaults for CLI mode if context not set
if tz and not cron_expr: channel = self._channel or "cli"
return "Error: tz can only be used with cron_expr" chat_id = self._chat_id or "direct"
if tz:
# Validate timezone only if used with cron_expr
if tz and cron_expr:
from zoneinfo import ZoneInfo from zoneinfo import ZoneInfo
try: try:
ZoneInfo(tz) ZoneInfo(tz)
except (KeyError, Exception): except (KeyError, Exception):
return f"Error: unknown timezone '{tz}'" return f"Error: unknown timezone '{tz}'"
elif tz and not cron_expr:
# Ignore tz if not used with cron_expr (common mistake)
tz = None
# Build schedule # Build schedule - prioritize 'in_seconds' for relative time, then 'at' for absolute time
delete_after = False delete_after = False
if every_seconds:
schedule = CronSchedule(kind="every", every_ms=every_seconds * 1000) # Handle relative time (in_seconds) - compute datetime automatically
elif cron_expr: if in_seconds is not None:
schedule = CronSchedule(kind="cron", expr=cron_expr, tz=tz) from datetime import datetime, timedelta
elif at: from time import time as _time
future_time = datetime.now() + timedelta(seconds=in_seconds)
at = future_time.isoformat()
# Fall through to 'at' handling below
if at:
# One-time reminder at specific time
from datetime import datetime from datetime import datetime
try:
# Check if agent passed description text, Python code, or other invalid values
if "iso datetime" in at.lower() or "e.g." in at.lower() or "example" in at.lower() or at.startswith("("):
return f"Error: You passed description text '{at}' instead of an actual datetime string. You must: 1) Read current time from system prompt (e.g. '2026-03-03 12:19:04'), 2) Add requested seconds/minutes to it, 3) Format as ISO string like '2026-03-03T12:19:29'. Do NOT use description text or examples."
if "datetime.now()" in at or "timedelta" in at:
return f"Error: You passed Python code '{at}' instead of an actual datetime string. You must compute the datetime value first, then pass the ISO format string (e.g. '2026-03-03T12:19:29')."
dt = datetime.fromisoformat(at) dt = datetime.fromisoformat(at)
# If datetime is naive (no timezone), assume local timezone
if dt.tzinfo is None:
import time
# Get local timezone offset
local_offset = time.timezone if (time.daylight == 0) else time.altzone
# Convert naive datetime to UTC-aware for consistent timestamp calculation
dt = dt.replace(tzinfo=None)
# Calculate timestamp assuming local time
at_ms = int(dt.timestamp() * 1000) at_ms = int(dt.timestamp() * 1000)
else:
at_ms = int(dt.timestamp() * 1000)
# Validate that the time is in the future (allow 5 second buffer for processing)
from time import time as _time
from datetime import datetime as _dt
now_ms = int(_time() * 1000)
buffer_ms = 5000 # 5 second buffer for processing time
if at_ms <= (now_ms + buffer_ms):
now_str = _dt.now().strftime("%Y-%m-%d %H:%M:%S")
scheduled_str = _dt.fromtimestamp(at_ms / 1000).strftime("%Y-%m-%d %H:%M:%S")
diff_sec = (now_ms - at_ms) / 1000
if diff_sec > 0:
return f"Error: scheduled time ({scheduled_str}) is in the past by {diff_sec:.0f} seconds. Current time is {now_str}. You must ADD the requested seconds to the current time. Example: if current time is 12:21:46 and user wants reminder in 25 seconds, calculate 12:21:46 + 25 seconds = 12:22:11, then pass '2026-03-03T12:22:11'."
else:
return f"Error: scheduled time ({scheduled_str}) is too close to current time ({now_str}). You must ADD the requested seconds to the current time. Example: if current time is 12:21:46 and user wants reminder in 25 seconds, calculate 12:21:46 + 25 seconds = 12:22:11, then pass '2026-03-03T12:22:11'."
schedule = CronSchedule(kind="at", at_ms=at_ms) schedule = CronSchedule(kind="at", at_ms=at_ms)
delete_after = True delete_after = True
except (ValueError, Exception) as e:
return f"Error: invalid datetime format for 'at': {str(e)}. Expected ISO format like '2026-03-03T12:05:30', not Python code."
elif every_seconds:
# Recurring reminder
schedule = CronSchedule(kind="every", every_ms=every_seconds * 1000)
elif cron_expr:
# Cron expression
schedule = CronSchedule(kind="cron", expr=cron_expr, tz=tz)
else: else:
return "Error: either every_seconds, cron_expr, or at is required" return "Error: either every_seconds, cron_expr, or at is required"
try:
job = self._cron.add_job( job = self._cron.add_job(
name=message[:30], name=message[:30],
schedule=schedule, schedule=schedule,
message=message, message=message,
deliver=True, deliver=True,
channel=self._channel, channel=channel,
to=self._chat_id, to=chat_id,
delete_after_run=delete_after, delete_after_run=delete_after,
reminder=reminder,
) )
return f"Created job '{job.name}' (id: {job.id})" return f"Created job '{job.name}' (id: {job.id})"
except Exception as e:
return f"Error creating cron job: {str(e)}"
def _list_jobs(self) -> str: def _list_jobs(self) -> str:
jobs = self._cron.list_jobs() jobs = self._cron.list_jobs()

View File

@ -420,6 +420,20 @@ def gateway(
# Set cron callback (needs agent) # Set cron callback (needs agent)
async def on_cron_job(job: CronJob) -> str | None: async def on_cron_job(job: CronJob) -> str | None:
"""Execute a cron job through the agent.""" """Execute a cron job through the agent."""
# Check if this is a simple reminder or a task
if job.payload.reminder:
# Simple reminder - send message directly without agent processing
if job.payload.deliver and job.payload.to:
from nanobot.bus.events import OutboundMessage
await bus.publish_outbound(OutboundMessage(
channel=job.payload.channel or "cli",
chat_id=job.payload.to,
content=job.payload.message,
metadata={"source": "cron_reminder", "job_id": job.id} # Mark as reminder
))
return job.payload.message
else:
# Task mode - process through agent
response = await agent.process_direct( response = await agent.process_direct(
job.payload.message, job.payload.message,
session_key=f"cron:{job.id}", session_key=f"cron:{job.id}",

View File

@ -57,6 +57,7 @@ class CronService:
self.on_job = on_job # Callback to execute job, returns response text self.on_job = on_job # Callback to execute job, returns response text
self._store: CronStore | None = None self._store: CronStore | None = None
self._timer_task: asyncio.Task | None = None self._timer_task: asyncio.Task | None = None
self._start_task: asyncio.Task | None = None
self._running = False self._running = False
def _load_store(self) -> CronStore: def _load_store(self) -> CronStore:
@ -86,6 +87,7 @@ class CronService:
deliver=j["payload"].get("deliver", False), deliver=j["payload"].get("deliver", False),
channel=j["payload"].get("channel"), channel=j["payload"].get("channel"),
to=j["payload"].get("to"), to=j["payload"].get("to"),
reminder=j["payload"].get("reminder", False),
), ),
state=CronJobState( state=CronJobState(
next_run_at_ms=j.get("state", {}).get("nextRunAtMs"), next_run_at_ms=j.get("state", {}).get("nextRunAtMs"),
@ -133,6 +135,7 @@ class CronService:
"deliver": j.payload.deliver, "deliver": j.payload.deliver,
"channel": j.payload.channel, "channel": j.payload.channel,
"to": j.payload.to, "to": j.payload.to,
"reminder": j.payload.reminder,
}, },
"state": { "state": {
"nextRunAtMs": j.state.next_run_at_ms, "nextRunAtMs": j.state.next_run_at_ms,
@ -165,6 +168,9 @@ class CronService:
if self._timer_task: if self._timer_task:
self._timer_task.cancel() self._timer_task.cancel()
self._timer_task = None self._timer_task = None
if self._start_task:
self._start_task.cancel()
self._start_task = None
def _recompute_next_runs(self) -> None: def _recompute_next_runs(self) -> None:
"""Recompute next run times for all enabled jobs.""" """Recompute next run times for all enabled jobs."""
@ -189,7 +195,28 @@ class CronService:
self._timer_task.cancel() self._timer_task.cancel()
next_wake = self._get_next_wake_ms() next_wake = self._get_next_wake_ms()
if not next_wake or not self._running: if not next_wake:
return
# Auto-start if not running and there's an event loop
if not self._running:
try:
loop = asyncio.get_running_loop()
# Schedule start in the background (only if not already starting)
if not self._start_task or self._start_task.done():
async def auto_start():
try:
if not self._running:
await self.start()
except Exception as e:
logger.error(f"Failed to auto-start cron service: {e}")
finally:
self._start_task = None
self._start_task = loop.create_task(auto_start())
return # Will be re-armed after start
except RuntimeError:
# No event loop running, can't start
logger.warning("Cron service not started and no event loop available. Timer will not run.")
return return
delay_ms = max(0, next_wake - _now_ms()) delay_ms = max(0, next_wake - _now_ms())
@ -269,6 +296,7 @@ class CronService:
channel: str | None = None, channel: str | None = None,
to: str | None = None, to: str | None = None,
delete_after_run: bool = False, delete_after_run: bool = False,
reminder: bool = False,
) -> CronJob: ) -> CronJob:
"""Add a new job.""" """Add a new job."""
store = self._load_store() store = self._load_store()
@ -285,6 +313,7 @@ class CronService:
deliver=deliver, deliver=deliver,
channel=channel, channel=channel,
to=to, to=to,
reminder=reminder,
), ),
state=CronJobState(next_run_at_ms=_compute_next_run(schedule, now)), state=CronJobState(next_run_at_ms=_compute_next_run(schedule, now)),
created_at_ms=now, created_at_ms=now,

View File

@ -27,6 +27,9 @@ class CronPayload:
deliver: bool = False deliver: bool = False
channel: str | None = None # e.g. "whatsapp" channel: str | None = None # e.g. "whatsapp"
to: str | None = None # e.g. phone number to: str | None = None # e.g. phone number
# If True, this is a simple reminder (send message directly)
# If False, this is a task (agent executes the message)
reminder: bool = False
@dataclass @dataclass