"""Job management endpoints.""" from __future__ import annotations from datetime import datetime from fastapi import APIRouter, HTTPException, status from fastapi.responses import StreamingResponse from rq import Queue from rq.job import Job from redis import Redis import json import time from backend.schemas.jobs import JobResponse, JobStatus router = APIRouter(prefix="/jobs", tags=["jobs"]) # Redis connection for RQ redis_conn = Redis(host="localhost", port=6379, db=0, decode_responses=False) queue = Queue(connection=redis_conn) @router.get("/{job_id}", response_model=JobResponse) def get_job(job_id: str) -> JobResponse: """Get job status by ID.""" try: job = Job.fetch(job_id, connection=redis_conn) rq_status = job.get_status() status_map = { "queued": JobStatus.PENDING, "started": JobStatus.STARTED, # Job is actively running "finished": JobStatus.SUCCESS, "failed": JobStatus.FAILURE, } job_status = status_map.get(rq_status, JobStatus.PENDING) # If job is started, check if it has progress if rq_status == "started": # Job is running - show progress if available progress = job.meta.get("progress", 0) if job.meta else 0 message = job.meta.get("message", "Processing...") if job.meta else "Processing..." # Map to PROGRESS status if we have actual progress if progress > 0: job_status = JobStatus.PROGRESS elif job_status == JobStatus.STARTED or job_status == JobStatus.PROGRESS: progress = job.meta.get("progress", 0) if job.meta else 0 elif job_status == JobStatus.SUCCESS: progress = 100 else: progress = 0 message = job.meta.get("message", "") if job.meta else "" # Check if job was cancelled if job.meta and job.meta.get("cancelled", False): job_status = JobStatus.FAILURE message = job.meta.get("message", "Cancelled by user") # If job failed, include error message if rq_status == "failed" and job.exc_info: # Extract error message from exception info error_lines = job.exc_info.split("\n") if error_lines: message = f"Failed: {error_lines[0]}" return JobResponse( id=job.id, status=job_status, progress=progress, message=message, created_at=datetime.fromisoformat(str(job.created_at)), updated_at=datetime.fromisoformat( str(job.ended_at or job.started_at or job.created_at) ), ) except Exception as e: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Job {job_id} not found: {str(e)}", ) @router.get("/stream/{job_id}") def stream_job_progress(job_id: str): """Stream job progress via Server-Sent Events (SSE).""" def event_generator(): """Generate SSE events for job progress.""" last_progress = -1 last_message = "" while True: try: job = Job.fetch(job_id, connection=redis_conn) status_map = { "queued": JobStatus.PENDING, "started": JobStatus.STARTED, "finished": JobStatus.SUCCESS, "failed": JobStatus.FAILURE, } job_status = status_map.get(job.get_status(), JobStatus.PENDING) # Check if job was cancelled first if job.meta and job.meta.get("cancelled", False): job_status = JobStatus.FAILURE message = job.meta.get("message", "Cancelled by user") progress = job.meta.get("progress", 0) if job.meta else 0 else: progress = 0 if job_status == JobStatus.STARTED: # Job is running - show progress if available progress = job.meta.get("progress", 0) if job.meta else 0 # Map to PROGRESS status if we have actual progress if progress > 0: job_status = JobStatus.PROGRESS elif job_status == JobStatus.PROGRESS: progress = job.meta.get("progress", 0) if job.meta else 0 elif job_status == JobStatus.SUCCESS: progress = 100 elif job_status == JobStatus.FAILURE: progress = 0 message = job.meta.get("message", "") if job.meta else "" # Only send event if progress or message changed if progress != last_progress or message != last_message: event_data = { "id": job.id, "status": job_status.value, "progress": progress, "message": message, "processed": job.meta.get("processed", 0) if job.meta else 0, "total": job.meta.get("total", 0) if job.meta else 0, "faces_detected": job.meta.get("faces_detected", 0) if job.meta else 0, "faces_stored": job.meta.get("faces_stored", 0) if job.meta else 0, } yield f"data: {json.dumps(event_data)}\n\n" last_progress = progress last_message = message # Stop streaming if job is complete or failed if job_status in (JobStatus.SUCCESS, JobStatus.FAILURE): break time.sleep(0.5) # Poll every 500ms except Exception as e: error_data = {"error": str(e)} yield f"data: {json.dumps(error_data)}\n\n" break return StreamingResponse( event_generator(), media_type="text/event-stream" ) @router.delete("/{job_id}") def cancel_job(job_id: str) -> dict: """Cancel a job (if queued) or stop a running job. Note: For running jobs, this sets a cancellation flag. The job will check this flag and exit gracefully. """ try: job = Job.fetch(job_id, connection=redis_conn) rq_status = job.get_status() if rq_status == "finished": return { "message": f"Job {job_id} is already finished", "status": "finished", } if rq_status == "failed": return { "message": f"Job {job_id} already failed", "status": "failed", } if rq_status == "queued": # Cancel queued job - remove from queue job.cancel() return { "message": f"Job {job_id} cancelled (was queued)", "status": "cancelled", } if rq_status == "started": # For running jobs, set cancellation flag in metadata # The task will check this and exit gracefully if job.meta is None: job.meta = {} job.meta["cancelled"] = True job.meta["message"] = "Cancellation requested..." job.save_meta() # Also try to cancel the job (which will interrupt it if possible) try: job.cancel() except Exception: # Job might already be running, that's OK pass return { "message": f"Job {job_id} cancellation requested", "status": "cancelling", } return { "message": f"Job {job_id} status: {rq_status}", "status": rq_status, } except Exception as e: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Job {job_id} not found: {str(e)}", )