The HEARTBEAT_OK_TOKEN comparison was broken because the token
itself ("HEARTBEAT_OK" with underscore) was being compared against
a response string that had underscores removed. This made the
condition always fail, preventing the heartbeat service from
recognizing "no tasks" responses.
Now both sides of the comparison remove underscores consistently,
allowing proper matching of the HEARTBEAT_OK token.
131 lines
4.2 KiB
Python
131 lines
4.2 KiB
Python
"""Heartbeat service - periodic agent wake-up to check for tasks."""
|
|
|
|
import asyncio
|
|
from pathlib import Path
|
|
from typing import Any, Callable, Coroutine
|
|
|
|
from loguru import logger
|
|
|
|
# Default interval: 30 minutes
DEFAULT_HEARTBEAT_INTERVAL_S = 30 * 60

# The prompt sent to agent during heartbeat.
# Its last line names the literal reply (HEARTBEAT_OK_TOKEN below) that the
# agent should send when there is nothing to do.
HEARTBEAT_PROMPT = """Read HEARTBEAT.md in your workspace (if it exists).
Follow any instructions or tasks listed there.
If nothing needs attention, reply with just: HEARTBEAT_OK"""

# Token that indicates "nothing to do"
HEARTBEAT_OK_TOKEN = "HEARTBEAT_OK"
|
|
|
|
|
|
def _is_heartbeat_empty(content: str | None) -> bool:
|
|
"""Check if HEARTBEAT.md has no actionable content."""
|
|
if not content:
|
|
return True
|
|
|
|
# Lines to skip: empty, headers, HTML comments, empty checkboxes
|
|
skip_patterns = {"- [ ]", "* [ ]", "- [x]", "* [x]"}
|
|
|
|
for line in content.split("\n"):
|
|
line = line.strip()
|
|
if not line or line.startswith("#") or line.startswith("<!--") or line in skip_patterns:
|
|
continue
|
|
return False # Found actionable content
|
|
|
|
return True
|
|
|
|
|
|
class HeartbeatService:
    """Periodic heartbeat service that wakes the agent to check for tasks.

    The agent reads HEARTBEAT.md from the workspace and executes any
    tasks listed there. If nothing needs attention, it replies HEARTBEAT_OK.

    Args:
        workspace: Directory in which HEARTBEAT.md is looked up.
        on_heartbeat: Async callback that receives the heartbeat prompt and
            returns the agent's reply. When ``None``, ticks only log.
        interval_s: Seconds to sleep before each tick.
        enabled: When ``False``, :meth:`start` logs and does nothing.
    """

    def __init__(
        self,
        workspace: Path,
        on_heartbeat: Callable[[str], Coroutine[Any, Any, str]] | None = None,
        interval_s: int = DEFAULT_HEARTBEAT_INTERVAL_S,
        enabled: bool = True,
    ):
        self.workspace = workspace
        self.on_heartbeat = on_heartbeat
        self.interval_s = interval_s
        self.enabled = enabled
        self._running = False
        self._task: asyncio.Task | None = None

    @property
    def heartbeat_file(self) -> Path:
        """Path of HEARTBEAT.md inside the workspace."""
        return self.workspace / "HEARTBEAT.md"

    def _read_heartbeat_file(self) -> str | None:
        """Read HEARTBEAT.md content.

        Returns:
            The file's text, or ``None`` when the file is missing or cannot
            be read or decoded.
        """
        # Read directly instead of exists()-then-read so a file removed in
        # between cannot raise; decode explicitly rather than relying on
        # the platform's default encoding.
        try:
            return self.heartbeat_file.read_text(encoding="utf-8")
        except FileNotFoundError:
            return None
        except (OSError, ValueError) as e:
            # Best effort: an unreadable file is treated like a missing one,
            # but leave a trace so the failure is not invisible.
            logger.warning(f"Heartbeat: could not read {self.heartbeat_file}: {e}")
            return None

    @staticmethod
    def _is_ok_response(response: str | None) -> bool:
        """Return whether the reply contains the "nothing to do" token.

        The comparison is case-insensitive and ignores underscores on both
        sides. A ``None`` or empty reply never matches.
        """
        if not response:
            return False
        normalized_token = HEARTBEAT_OK_TOKEN.replace("_", "")
        return normalized_token in response.upper().replace("_", "")

    async def start(self) -> None:
        """Start the heartbeat service.

        Does nothing when the service is disabled or its loop task is
        already active.
        """
        if not self.enabled:
            logger.info("Heartbeat disabled")
            return

        # A second start() must not spawn a second loop and orphan the first.
        if self._task is not None and not self._task.done():
            logger.debug("Heartbeat already running")
            return

        self._running = True
        self._task = asyncio.create_task(self._run_loop())
        logger.info(f"Heartbeat started (every {self.interval_s}s)")

    def stop(self) -> None:
        """Stop the heartbeat service by cancelling its loop task, if any."""
        self._running = False
        if self._task:
            self._task.cancel()
            self._task = None

    async def _run_loop(self) -> None:
        """Main heartbeat loop: sleep ``interval_s`` seconds, then tick."""
        while self._running:
            try:
                await asyncio.sleep(self.interval_s)
                # stop() may have been called while sleeping.
                if self._running:
                    await self._tick()
            except asyncio.CancelledError:
                # Raised when stop() cancels the task; leave the loop.
                break
            except Exception as e:
                # One failed tick must not end the loop.
                logger.error(f"Heartbeat error: {e}")

    async def _tick(self) -> None:
        """Execute a single heartbeat tick.

        Skips the agent entirely when HEARTBEAT.md is missing or has no
        actionable content; otherwise sends the heartbeat prompt to
        ``on_heartbeat`` (if set) and logs the outcome.
        """
        content = self._read_heartbeat_file()

        # Skip if HEARTBEAT.md is empty or doesn't exist
        if _is_heartbeat_empty(content):
            logger.debug("Heartbeat: no tasks (HEARTBEAT.md empty)")
            return

        logger.info("Heartbeat: checking for tasks...")

        if not self.on_heartbeat:
            return

        try:
            response = await self.on_heartbeat(HEARTBEAT_PROMPT)
        except Exception as e:
            logger.error(f"Heartbeat execution failed: {e}")
            return

        # Check if agent said "nothing to do"
        if self._is_ok_response(response):
            logger.info("Heartbeat: OK (no action needed)")
        else:
            logger.info("Heartbeat: completed task")

    async def trigger_now(self) -> str | None:
        """Manually trigger a heartbeat.

        Sends the heartbeat prompt without checking HEARTBEAT.md first.

        Returns:
            The callback's reply, or ``None`` when no callback is set.
        """
        if self.on_heartbeat:
            return await self.on_heartbeat(HEARTBEAT_PROMPT)
        return None
|