This commit introduces a new `CANCELLED` status to the job management system, allowing users to cancel ongoing jobs. The frontend is updated to handle job cancellation requests, providing user feedback during the cancellation process. Additionally, the backend is enhanced to manage job statuses more effectively, ensuring that jobs can be marked as cancelled and that appropriate messages are displayed to users. This improvement enhances the overall user experience by providing better control over job processing.
264 lines
11 KiB
Python
264 lines
11 KiB
Python
"""Job management endpoints."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from datetime import datetime
|
|
|
|
from fastapi import APIRouter, HTTPException, status
|
|
from fastapi.responses import StreamingResponse
|
|
from rq import Queue
|
|
from rq.job import Job
|
|
from redis import Redis
|
|
import json
|
|
import time
|
|
|
|
from backend.schemas.jobs import JobResponse, JobStatus
|
|
|
|
# Router that groups every endpoint of this module under the "/jobs" prefix.
router = APIRouter(
    prefix="/jobs",
    tags=["jobs"],
)

# Redis client shared by all job endpoints; RQ reads and writes job state
# through it. Responses are left as raw bytes (decode_responses=False).
redis_conn = Redis(
    host="localhost",
    port=6379,
    db=0,
    decode_responses=False,
)

# RQ queue bound to the shared Redis connection.
queue = Queue(connection=redis_conn)
|
|
|
|
|
|
def _last_traceback_line(exc_info: str) -> str:
    """Return the final non-blank line of a formatted traceback string."""
    lines = [line.strip() for line in exc_info.splitlines() if line.strip()]
    return lines[-1] if lines else ""


@router.get("/{job_id}", response_model=JobResponse)
def get_job(job_id: str) -> JobResponse:
    """Return the current status, progress and message of a job.

    The RQ status is translated to the API ``JobStatus`` and refined with the
    job metadata (``progress``, ``message``, ``cancelled``):

    * a running job is ``STARTED`` until it reports progress > 0, then
      ``PROGRESS``; this holds even when cancellation was requested, until the
      job actually stops;
    * a job that is no longer running and was either flagged ``cancelled`` in
      its metadata or cancelled/stopped in RQ is ``CANCELLED`` and keeps the
      last progress it reported;
    * a finished job is ``SUCCESS`` with progress 100;
    * a failed job carries the last line of its traceback in the message.

    Args:
        job_id: Identifier of the RQ job to look up.

    Returns:
        The job state as a ``JobResponse``.

    Raises:
        HTTPException: 404 when the job cannot be fetched or its state cannot
            be converted into a response.
    """
    try:
        job = Job.fetch(job_id, connection=redis_conn)
        rq_status = job.get_status()
        meta = job.meta or {}

        status_map = {
            "queued": JobStatus.PENDING,
            "started": JobStatus.STARTED,  # Job is actively running
            "finished": JobStatus.SUCCESS,
            "failed": JobStatus.FAILURE,
            # Jobs cancelled while still queued never get the "cancelled"
            # metadata flag, so RQ's own statuses must be mapped as well.
            "canceled": JobStatus.CANCELLED,
            "stopped": JobStatus.CANCELLED,
        }
        job_status = status_map.get(rq_status, JobStatus.PENDING)
        cancel_requested = bool(meta.get("cancelled", False))

        if rq_status == "started":
            # Still running (possibly winding down after a cancel request):
            # keep reporting live progress until the worker really stops.
            progress = meta.get("progress", 0)
            if progress > 0:
                job_status = JobStatus.PROGRESS
        elif cancel_requested or job_status == JobStatus.CANCELLED:
            # Not running any more and cancelled: report the progress reached
            # rather than pretending the job completed.
            job_status = JobStatus.CANCELLED
            progress = meta.get("progress", 0)
        elif job_status == JobStatus.SUCCESS:
            progress = 100
        else:
            progress = 0

        # Pick the fallback text used when the job stored no message itself.
        if cancel_requested or job_status == JobStatus.CANCELLED:
            default_message = "Cancelled by user"
        elif rq_status == "started":
            default_message = "Processing..."
        else:
            default_message = ""
        message = meta.get("message", default_message)

        # If the job failed, surface the actual error. The first traceback
        # line is only the generic header, so the last line is used.
        if rq_status == "failed" and job.exc_info:
            error_summary = _last_traceback_line(job.exc_info)
            if error_summary:
                message = f"Failed: {error_summary}"

        return JobResponse(
            id=job.id,
            status=job_status,
            progress=progress,
            message=message,
            created_at=datetime.fromisoformat(str(job.created_at)),
            updated_at=datetime.fromisoformat(
                str(job.ended_at or job.started_at or job.created_at)
            ),
        )
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Job {job_id} not found: {str(e)}",
        ) from e
|
|
|
|
|
|
@router.get("/stream/{job_id}")
def stream_job_progress(job_id: str):
    """Stream job progress via Server-Sent Events (SSE).

    The job is polled every 500 ms. An event is emitted whenever its status,
    progress or message differs from the last event sent, so the terminal
    state always reaches the client before the stream closes. Each event is a
    JSON object with ``id``, ``status``, ``progress``, ``message``,
    ``processed``, ``total``, ``faces_detected`` and ``faces_stored``.

    The stream ends once the job is ``SUCCESS``, ``FAILURE`` or ``CANCELLED``.
    If polling raises, a single ``{"error": ...}`` event is sent and the
    stream ends.

    Args:
        job_id: Identifier of the RQ job to follow.

    Returns:
        A ``StreamingResponse`` with media type ``text/event-stream``.
    """
    # Built once: the mapping does not change between polls.
    status_map = {
        "queued": JobStatus.PENDING,
        "started": JobStatus.STARTED,
        "finished": JobStatus.SUCCESS,
        "failed": JobStatus.FAILURE,
        # Without these, a job cancelled while queued would look PENDING
        # forever and the stream would never terminate.
        "canceled": JobStatus.CANCELLED,
        "stopped": JobStatus.CANCELLED,
    }
    terminal_statuses = (
        JobStatus.SUCCESS,
        JobStatus.FAILURE,
        JobStatus.CANCELLED,
    )

    def event_generator():
        """Generate SSE events for job progress."""
        last_sent = None  # (status, progress, message) of the last event

        while True:
            try:
                job = Job.fetch(job_id, connection=redis_conn)
                rq_status = job.get_status()
                meta = job.meta or {}
                job_status = status_map.get(rq_status, JobStatus.PENDING)
                cancel_requested = bool(meta.get("cancelled", False))

                if rq_status == "started":
                    # Still running, even if cancellation was requested:
                    # keep reporting progress until the job actually stops.
                    progress = meta.get("progress", 0)
                    job_status = (
                        JobStatus.PROGRESS if progress > 0 else JobStatus.STARTED
                    )
                elif cancel_requested or job_status == JobStatus.CANCELLED:
                    # Cancellation takes priority once the job is not running.
                    job_status = JobStatus.CANCELLED
                    progress = meta.get("progress", 0)
                elif job_status == JobStatus.SUCCESS:
                    progress = 100
                else:
                    progress = 0

                if cancel_requested or job_status == JobStatus.CANCELLED:
                    message = meta.get("message", "Cancelled by user")
                else:
                    message = meta.get("message", "")

                # Send an event when anything the client displays changed.
                # Status is included so a transition with unchanged progress
                # and message (e.g. a failure at 0%) is not silently dropped.
                snapshot = (job_status, progress, message)
                if snapshot != last_sent:
                    event_data = {
                        "id": job.id,
                        "status": job_status.value,
                        "progress": progress,
                        "message": message,
                        "processed": meta.get("processed", 0),
                        "total": meta.get("total", 0),
                        "faces_detected": meta.get("faces_detected", 0),
                        "faces_stored": meta.get("faces_stored", 0),
                    }

                    yield f"data: {json.dumps(event_data)}\n\n"
                    last_sent = snapshot

                # Stop streaming if job is complete, failed, or cancelled
                if job_status in terminal_statuses:
                    break

                time.sleep(0.5)  # Poll every 500ms

            except Exception as e:
                error_data = {"error": str(e)}
                yield f"data: {json.dumps(error_data)}\n\n"
                break

    return StreamingResponse(
        event_generator(), media_type="text/event-stream"
    )
|
|
|
|
|
|
@router.delete("/{job_id}")
def cancel_job(job_id: str) -> dict:
    """Cancel a queued job, or request cancellation of a running one.

    Outcome by RQ status:

    * ``finished`` / ``failed``: nothing is changed; the state is reported.
    * ``queued``: the job is cancelled through RQ.
    * ``started``: a ``cancelled`` flag and a message are written to the job
      metadata so the task can notice it and exit gracefully, then RQ's
      ``cancel()`` is attempted as well.
    * anything else: the status is reported back unchanged.

    Args:
        job_id: Identifier of the RQ job to cancel.

    Returns:
        A dict with a human-readable ``message`` and a ``status`` string.

    Raises:
        HTTPException: 404 when the job cannot be fetched or any step fails.
    """
    try:
        print(f"[Jobs API] Cancel request for job_id={job_id}")
        job = Job.fetch(job_id, connection=redis_conn)
        rq_status = job.get_status()
        print(f"[Jobs API] Job {job_id} current status: {rq_status}")

        # Terminal states: there is nothing left to cancel.
        if rq_status == "finished":
            return {
                "message": f"Job {job_id} is already finished",
                "status": "finished",
            }
        if rq_status == "failed":
            return {
                "message": f"Job {job_id} already failed",
                "status": "failed",
            }

        # Not started yet: cancelling removes it from the queue.
        if rq_status == "queued":
            job.cancel()
            print(f"[Jobs API] ✓ Cancelled queued job {job_id}")
            return {
                "message": f"Job {job_id} cancelled (was queued)",
                "status": "cancelled",
            }

        # Any status other than "started" is only reported back.
        if rq_status != "started":
            print(f"[Jobs API] Job {job_id} in unexpected status: {rq_status}")
            return {
                "message": f"Job {job_id} status: {rq_status}",
                "status": rq_status,
            }

        # Running job: leave a flag in the metadata for the task to pick up.
        print(f"[Jobs API] Setting cancellation flag for running job {job_id}")
        if job.meta is None:
            job.meta = {}
        job.meta["cancelled"] = True
        job.meta["message"] = "Cancellation requested..."
        # Persist right away so the flag is visible outside this process.
        job.save_meta()
        print(f"[Jobs API] Saved metadata for job {job_id}")

        # Re-read the job to confirm the flag really landed in Redis. This is
        # diagnostic only; a failure here must not abort the request.
        try:
            reloaded = Job.fetch(job_id, connection=redis_conn)
            if reloaded.meta and reloaded.meta.get("cancelled", False):
                print(f"[Jobs API] ✓ Verified: Cancellation flag is set for job {job_id}")
            else:
                print(f"[Jobs API] ❌ WARNING: Cancellation flag NOT found after save for job {job_id}")
                print(f"[Jobs API] Fresh job meta: {reloaded.meta}")
        except Exception as verify_error:
            print(f"[Jobs API] ⚠️ Could not verify cancellation flag: {verify_error}")

        # Best effort: also ask RQ to cancel the job. An exception here is
        # tolerated because the metadata flag remains the primary mechanism.
        # NOTE(review): RQ documents send_stop_job_command() as the way to
        # interrupt an executing job - confirm Job.cancel() has the intended
        # effect on a job that is already running.
        try:
            job.cancel()
        except Exception as cancel_error:
            print(f"[Jobs API] Note: RQ cancel() raised exception (may be expected): {cancel_error}")
        else:
            print(f"[Jobs API] ✓ RQ cancel() called for job {job_id}")

        return {
            "message": f"Job {job_id} cancellation requested",
            "status": "cancelling",
        }

    except Exception as exc:
        print(f"[Jobs API] ❌ Error cancelling job {job_id}: {exc}")
        import traceback

        traceback.print_exc()
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Job {job_id} not found: {str(exc)}",
        )
|
|
|