Tanya 0b95cd2492 feat: Add job cancellation support and update job status handling
This commit introduces a new `CANCELLED` status to the job management system, allowing users to cancel ongoing jobs. The frontend is updated to handle job cancellation requests, providing user feedback during the cancellation process. Additionally, the backend is enhanced to manage job statuses more effectively, ensuring that jobs can be marked as cancelled and that appropriate messages are displayed to users. This improvement enhances the overall user experience by providing better control over job processing.
2026-01-05 13:09:32 -05:00

296 lines
10 KiB
Python

"""RQ worker tasks for PunimTag."""
from __future__ import annotations
from typing import Optional
from rq import get_current_job
from sqlalchemy.orm import Session
from backend.db.session import SessionLocal
from backend.services.photo_service import import_photos_from_folder
from backend.services.face_service import process_unprocessed_photos
def import_photos_task(folder_path: str, recursive: bool = True) -> dict:
"""RQ task to import photos from a folder.
Updates job metadata with progress:
- progress: 0-100
- message: status message
- processed: number of photos processed
- total: total photos found
- added: number of new photos added
- existing: number of photos that already existed
"""
job = get_current_job()
if not job:
raise RuntimeError("Not running in RQ job context")
db: Session = SessionLocal()
try:
def update_progress(processed: int, total: int, current_file: str) -> None:
"""Update job progress."""
if job:
progress = int((processed / total) * 100) if total > 0 else 0
job.meta = {
"progress": progress,
"message": f"Processing {current_file}... ({processed}/{total})",
"processed": processed,
"total": total,
}
job.save_meta()
# Import photos
added, existing = import_photos_from_folder(
db, folder_path, recursive, update_progress
)
# Final update
total_processed = added + existing
result = {
"folder_path": folder_path,
"recursive": recursive,
"added": added,
"existing": existing,
"total": total_processed,
}
if job:
job.meta = {
"progress": 100,
"message": f"Completed: {added} new, {existing} existing",
"processed": total_processed,
"total": total_processed,
"added": added,
"existing": existing,
}
job.save_meta()
return result
finally:
db.close()
def process_faces_task(
batch_size: Optional[int] = None,
detector_backend: str = "retinaface",
model_name: str = "ArcFace",
) -> dict:
"""RQ task to process faces in unprocessed photos.
Updates job metadata with progress:
- progress: 0-100
- message: status message
- processed: number of photos processed
- total: total photos to process
- faces_detected: total faces detected
- faces_stored: total faces stored
"""
import traceback
job = get_current_job()
if not job:
raise RuntimeError("Not running in RQ job context")
print(f"[Task] Starting face processing task: job_id={job.id}, batch_size={batch_size}, detector={detector_backend}, model={model_name}")
# Update progress immediately - job started
try:
if job:
job.meta = {
"progress": 0,
"message": "Initializing face processing...",
"processed": 0,
"total": 0,
"faces_detected": 0,
"faces_stored": 0,
}
job.save_meta()
except Exception as e:
print(f"[Task] Error setting initial job metadata: {e}")
db: Session = SessionLocal()
# Initialize result variables
photos_processed = 0
total_faces_detected = 0
total_faces_stored = 0
try:
def update_progress(
processed: int,
total: int,
current_file: str,
faces_detected: int,
faces_stored: int,
) -> None:
"""Update job progress.
NOTE: This function does NOT check for cancellation to avoid interrupting
photo processing. Cancellation is checked in process_unprocessed_photos
BEFORE starting each photo and AFTER completing it, ensuring the current
photo always finishes fully.
"""
try:
if job:
# Calculate progress: 10% for setup, 90% for processing
if total == 0:
# Setup phase
progress = min(10, processed * 2) # 0-10% during setup
else:
# Processing phase
progress = 10 + int((processed / total) * 90) if total > 0 else 10
# Avoid duplicating "Processing" if current_file already contains it
if current_file.startswith("Processing "):
message = f"{current_file} ({processed}/{total})" if total > 0 else current_file
else:
message = f"Processing {current_file}... ({processed}/{total})" if total > 0 else current_file
job.meta = {
"progress": progress,
"message": message,
"processed": processed,
"total": total,
"faces_detected": faces_detected,
"faces_stored": faces_stored,
}
job.save_meta()
except Exception as e:
# Log but don't fail the batch if progress update fails
try:
print(f"[Task] Warning: Failed to update progress: {e}")
except (BrokenPipeError, OSError):
pass
# Update progress - finding photos
if job:
job.meta = {
"progress": 5,
"message": "Finding photos to process...",
"processed": 0,
"total": 0,
"faces_detected": 0,
"faces_stored": 0,
}
job.save_meta()
# Process faces
# Wrap in try-except to ensure we preserve progress even if process_unprocessed_photos fails
try:
photos_processed, total_faces_detected, total_faces_stored = (
process_unprocessed_photos(
db,
batch_size=batch_size,
detector_backend=detector_backend,
model_name=model_name,
update_progress=update_progress,
)
)
except Exception as e:
# If process_unprocessed_photos fails, preserve any progress made
# and re-raise so the outer handler can log it properly
try:
print(f"[Task] Error in process_unprocessed_photos: {e}")
except (BrokenPipeError, OSError):
pass
# Re-raise to be handled by outer exception handler
raise
# Final update
result = {
"photos_processed": photos_processed,
"faces_detected": total_faces_detected,
"faces_stored": total_faces_stored,
"detector_backend": detector_backend,
"model_name": model_name,
}
if job:
job.meta = {
"progress": 100,
"message": (
f"Completed: {photos_processed} photos, "
f"{total_faces_stored} faces stored"
),
"processed": photos_processed,
"total": photos_processed,
"faces_detected": total_faces_detected,
"faces_stored": total_faces_stored,
}
job.save_meta()
return result
except KeyboardInterrupt as e:
# Job was cancelled - exit gracefully
print(f"[Task] Job {job.id if job else 'unknown'} cancelled by user")
if job:
try:
job.meta = job.meta or {}
job.meta.update({
"progress": job.meta.get("progress", 0),
"message": "Cancelled by user - finished current photo",
"cancelled": True,
"processed": job.meta.get("processed", photos_processed),
"total": job.meta.get("total", 0),
"faces_detected": job.meta.get("faces_detected", total_faces_detected),
"faces_stored": job.meta.get("faces_stored", total_faces_stored),
})
job.save_meta()
except Exception:
pass
# Don't re-raise - job cancellation is not a failure
return {
"photos_processed": photos_processed,
"faces_detected": total_faces_detected,
"faces_stored": total_faces_stored,
"detector_backend": detector_backend,
"model_name": model_name,
"cancelled": True,
}
except Exception as e:
# Log error and update job metadata
error_msg = f"Task failed: {str(e)}"
try:
print(f"[Task] ❌ {error_msg}")
except (BrokenPipeError, OSError):
pass # Ignore broken pipe errors when printing
# Try to print traceback, but don't fail if stdout is closed
try:
traceback.print_exc()
except (BrokenPipeError, OSError):
# If printing fails, at least log the error type
try:
print(f"[Task] Error type: {type(e).__name__}: {str(e)}")
except (BrokenPipeError, OSError):
pass
if job:
try:
# Preserve progress information if available
existing_meta = job.meta or {}
job.meta = {
"progress": existing_meta.get("progress", 0),
"message": error_msg,
"processed": existing_meta.get("processed", photos_processed),
"total": existing_meta.get("total", 0),
"faces_detected": existing_meta.get("faces_detected", total_faces_detected),
"faces_stored": existing_meta.get("faces_stored", total_faces_stored),
}
job.save_meta()
except Exception:
pass
# Re-raise so RQ marks job as failed
raise
finally:
db.close()