All checks were successful
CI / skip-ci-check (pull_request) Successful in 8s
CI / lint-and-type-check (pull_request) Successful in 1m12s
CI / python-lint (pull_request) Successful in 36s
CI / test-backend (pull_request) Successful in 3m47s
CI / build (pull_request) Successful in 3m28s
CI / secret-scanning (pull_request) Successful in 14s
CI / dependency-scan (pull_request) Successful in 13s
CI / sast-scan (pull_request) Successful in 1m33s
CI / workflow-summary (pull_request) Successful in 5s
- Added new logging scripts for quick access to service logs and troubleshooting. - Updated job streaming API to support authentication via query parameters for EventSource. - Improved photo upload process to capture and validate EXIF dates and original modification times. - Enhanced error handling for file uploads and EXIF extraction failures. - Introduced new configuration options in ecosystem.config.js to prevent infinite crash loops.
284 lines
11 KiB
Python
284 lines
11 KiB
Python
"""Job management endpoints."""
|
|
|
|
from __future__ import annotations

import json
import time
from datetime import datetime
from typing import Optional

from fastapi import APIRouter, HTTPException, Query, status
from fastapi.responses import StreamingResponse
from redis import Redis
from rq import Queue
from rq.exceptions import NoSuchJobError
from rq.job import Job

from backend.api.auth import get_current_user_from_token
from backend.schemas.jobs import JobResponse, JobStatus
|
|
|
|
router = APIRouter(tags=["jobs"], prefix="/jobs")

# Single module-level Redis connection shared by every endpoint below for
# RQ job lookups.
# NOTE(review): decode_responses=False keeps replies as raw bytes; RQ
# presumably relies on that for its serialized job payloads — confirm before
# changing.
redis_conn = Redis(
    decode_responses=False,
    db=0,
    port=6379,
    host="localhost",
)

# Default RQ queue bound to the connection above.
queue = Queue(connection=redis_conn)
|
|
|
|
|
|
@router.get("/{job_id}", response_model=JobResponse)
def get_job(job_id: str) -> JobResponse:
    """Get job status by ID.

    Combines the raw RQ job state with the ``progress``, ``message`` and
    ``cancelled`` keys stored in ``job.meta`` into a ``JobResponse``.

    Args:
        job_id: RQ job identifier.

    Returns:
        The job's mapped status, progress, message and timestamps.

    Raises:
        HTTPException: 404 if no job with ``job_id`` exists. Other errors
            (e.g. Redis unavailable) propagate and surface as a 500 rather
            than being misreported as "not found".
    """
    try:
        job = Job.fetch(job_id, connection=redis_conn)
    except NoSuchJobError as e:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Job {job_id} not found: {str(e)}",
        ) from e

    rq_status = job.get_status()
    meta = job.meta or {}

    status_map = {
        "queued": JobStatus.PENDING,
        "started": JobStatus.STARTED,  # Job is actively running
        "finished": JobStatus.SUCCESS,
        "failed": JobStatus.FAILURE,
        # job.cancel() (used by the DELETE endpoint) leaves the job in RQ's
        # "canceled" state; without these entries such jobs were reported
        # as PENDING forever.
        "canceled": JobStatus.CANCELLED,
        "stopped": JobStatus.CANCELLED,
    }
    job_status = status_map.get(rq_status, JobStatus.PENDING)
    message = meta.get("message", "")

    if rq_status == "started":
        # Job is running - show progress if available.
        progress = meta.get("progress", 0)
        message = meta.get("message", "Processing...")
        # Map to PROGRESS status if we have actual progress.
        if progress > 0:
            job_status = JobStatus.PROGRESS
    elif job_status == JobStatus.SUCCESS:
        progress = 100
    else:
        progress = 0

    # A cancellation request (meta flag) or RQ's own canceled/stopped state
    # takes priority over the raw status.
    if meta.get("cancelled", False) or job_status == JobStatus.CANCELLED:
        if rq_status == "started":
            # Still running: keep showing progress until the task actually exits.
            job_status = JobStatus.PROGRESS if progress > 0 else JobStatus.STARTED
        else:
            job_status = JobStatus.CANCELLED
        message = meta.get("message", "Cancelled by user")

    # If job failed, include the error. exc_info holds a formatted traceback
    # whose first line is the generic "Traceback (most recent call last):"
    # header; the exception type and text are on the last non-empty line.
    if rq_status == "failed" and job.exc_info:
        error_lines = [line.strip() for line in job.exc_info.splitlines() if line.strip()]
        if error_lines:
            message = f"Failed: {error_lines[-1]}"

    return JobResponse(
        id=job.id,
        status=job_status,
        progress=progress,
        message=message,
        created_at=datetime.fromisoformat(str(job.created_at)),
        updated_at=datetime.fromisoformat(
            str(job.ended_at or job.started_at or job.created_at)
        ),
    )
|
|
|
|
|
|
@router.get("/stream/{job_id}")
def stream_job_progress(
    job_id: str,
    token: Optional[str] = Query(None, description="JWT token for authentication"),
):
    """Stream job progress via Server-Sent Events (SSE).

    Note: EventSource cannot send custom headers, so authentication
    is done via query parameter 'token'.

    The job is polled every 0.5 s. An event is emitted whenever any field of
    the payload (including the status) changes, and the stream ends once the
    job reaches SUCCESS, FAILURE or CANCELLED. If the job cannot be read, a
    single ``{"error": ...}`` event is emitted and the stream ends.

    Raises:
        HTTPException: 401 if ``token`` is missing; any HTTPException raised
            by ``get_current_user_from_token`` for an invalid token.
    """
    # Authenticate user via token query parameter (required for EventSource).
    # NOTE(review): tokens in URLs can end up in proxy/access logs; consider
    # short-lived, stream-scoped tokens.
    if not token:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Authentication required. Provide 'token' query parameter.",
        )

    # Any HTTPException from token validation propagates unchanged.
    get_current_user_from_token(token)

    poll_interval = 0.5  # seconds between Redis polls
    status_map = {
        "queued": JobStatus.PENDING,
        "started": JobStatus.STARTED,
        "finished": JobStatus.SUCCESS,
        "failed": JobStatus.FAILURE,
        # Without these, a job cancelled while queued never reached a
        # terminal status and the stream polled forever.
        "canceled": JobStatus.CANCELLED,
        "stopped": JobStatus.CANCELLED,
    }
    terminal_statuses = (JobStatus.SUCCESS, JobStatus.FAILURE, JobStatus.CANCELLED)

    def event_generator():
        """Generate SSE events for job progress."""
        last_event = None

        while True:
            try:
                job = Job.fetch(job_id, connection=redis_conn)
                rq_status = job.get_status()
                meta = job.meta or {}
                job_status = status_map.get(rq_status, JobStatus.PENDING)

                # Cancellation (meta flag or RQ canceled/stopped) takes priority.
                if meta.get("cancelled", False) or job_status == JobStatus.CANCELLED:
                    progress = meta.get("progress", 0)
                    if rq_status == "started":
                        # Still running - let it finish the current item
                        # before reporting CANCELLED.
                        job_status = JobStatus.PROGRESS if progress > 0 else JobStatus.STARTED
                    else:
                        job_status = JobStatus.CANCELLED
                    message = meta.get("message", "Cancelled by user")
                else:
                    if job_status == JobStatus.STARTED:
                        progress = meta.get("progress", 0)
                        if progress > 0:
                            job_status = JobStatus.PROGRESS
                    elif job_status == JobStatus.SUCCESS:
                        progress = 100
                    else:
                        progress = 0
                    message = meta.get("message", "")

                # Same failure message as the GET endpoint: the exception text
                # is the last non-empty line of the formatted traceback.
                if rq_status == "failed" and job.exc_info:
                    error_lines = [
                        line.strip() for line in job.exc_info.splitlines() if line.strip()
                    ]
                    if error_lines:
                        message = f"Failed: {error_lines[-1]}"

                event_data = {
                    "id": job.id,
                    "status": job_status.value,
                    "progress": progress,
                    "message": message,
                    "processed": meta.get("processed", 0),
                    "total": meta.get("total", 0),
                    "faces_detected": meta.get("faces_detected", 0),
                    "faces_stored": meta.get("faces_stored", 0),
                }

                # Compare the whole payload, status included: deduplicating on
                # progress/message alone could swallow the final event (e.g. a
                # job failing with unchanged progress), so clients never saw
                # the job end.
                if event_data != last_event:
                    yield f"data: {json.dumps(event_data)}\n\n"
                    last_event = event_data

                # Stop streaming if job is complete, failed, or cancelled.
                if job_status in terminal_statuses:
                    break

                time.sleep(poll_interval)

            except Exception as e:
                # Streaming boundary: report the problem to the client and end.
                error_data = {"error": str(e)}
                yield f"data: {json.dumps(error_data)}\n\n"
                break

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        # Keep caches and buffering reverse proxies (e.g. nginx) from holding
        # back events.
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
|
|
|
|
|
|
@router.delete("/{job_id}")
def cancel_job(job_id: str) -> dict:
    """Cancel a job (if queued) or stop a running job.

    Note: For running jobs, this sets a ``cancelled`` flag in ``job.meta``.
    The task is expected to check this flag and exit gracefully.

    Returns:
        A dict with ``message`` and ``status``: "finished" / "failed" for jobs
        already done, "cancelled" for a queued job, "cancelling" for a running
        job, or the raw RQ status for any other state.

    Raises:
        HTTPException: 404 if no job with ``job_id`` exists. Other errors
            (e.g. Redis unavailable) propagate and surface as a 500 rather
            than being misreported as "not found".
    """
    print(f"[Jobs API] Cancel request for job_id={job_id}")
    try:
        job = Job.fetch(job_id, connection=redis_conn)
    except NoSuchJobError as e:
        print(f"[Jobs API] ❌ Error cancelling job {job_id}: {e}")
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Job {job_id} not found: {str(e)}",
        ) from e

    rq_status = job.get_status()
    print(f"[Jobs API] Job {job_id} current status: {rq_status}")

    if rq_status == "finished":
        return {
            "message": f"Job {job_id} is already finished",
            "status": "finished",
        }

    if rq_status == "failed":
        return {
            "message": f"Job {job_id} already failed",
            "status": "failed",
        }

    if rq_status == "queued":
        # Not picked up by a worker yet - remove it from the queue.
        job.cancel()
        print(f"[Jobs API] ✓ Cancelled queued job {job_id}")
        return {
            "message": f"Job {job_id} cancelled (was queued)",
            "status": "cancelled",
        }

    if rq_status == "started":
        # For running jobs, set the cancellation flag the task polls.
        print(f"[Jobs API] Setting cancellation flag for running job {job_id}")
        if job.meta is None:
            job.meta = {}
        job.meta["cancelled"] = True
        job.meta["message"] = "Cancellation requested..."
        # NOTE(review): save_meta() writes the whole meta blob. If the worker
        # saves its own (stale) copy of meta around the same time, the flag
        # can be overwritten - hence the read-back check below.
        job.save_meta()
        print(f"[Jobs API] Saved metadata for job {job_id}")

        # Verify the flag was persisted by re-reading the job from Redis.
        try:
            fresh_job = Job.fetch(job_id, connection=redis_conn)
            if not fresh_job.meta or not fresh_job.meta.get("cancelled", False):
                print(f"[Jobs API] ❌ WARNING: Cancellation flag NOT found after save for job {job_id}")
                print(f"[Jobs API] Fresh job meta: {fresh_job.meta}")
            else:
                print(f"[Jobs API] ✓ Verified: Cancellation flag is set for job {job_id}")
        except Exception as verify_error:
            # Best-effort diagnostic only; the flag itself was already saved.
            print(f"[Jobs API] ⚠️ Could not verify cancellation flag: {verify_error}")

        # Also mark the job as canceled in RQ. NOTE(review): RQ's cancel()
        # updates the job's state in Redis; it is not expected to interrupt
        # the worker by itself (the meta flag above is what the task checks)
        # - confirm against the RQ version in use.
        try:
            job.cancel()
            print(f"[Jobs API] ✓ RQ cancel() called for job {job_id}")
        except Exception as cancel_error:
            # Best-effort: the metadata flag is still checked by the task.
            print(f"[Jobs API] Note: RQ cancel() raised exception (may be expected): {cancel_error}")

        return {
            "message": f"Job {job_id} cancellation requested",
            "status": "cancelling",
        }

    print(f"[Jobs API] Job {job_id} in unexpected status: {rq_status}")
    return {
        "message": f"Job {job_id} status: {rq_status}",
        "status": rq_status,
    }
|
|
|