"""RQ worker tasks for PunimTag.""" from __future__ import annotations from typing import Optional from rq import get_current_job from sqlalchemy.orm import Session from backend.db.session import SessionLocal from backend.services.photo_service import import_photos_from_folder from backend.services.face_service import process_unprocessed_photos def import_photos_task(folder_path: str, recursive: bool = True) -> dict: """RQ task to import photos from a folder. Updates job metadata with progress: - progress: 0-100 - message: status message - processed: number of photos processed - total: total photos found - added: number of new photos added - existing: number of photos that already existed """ job = get_current_job() if not job: raise RuntimeError("Not running in RQ job context") db: Session = SessionLocal() try: def update_progress(processed: int, total: int, current_file: str) -> None: """Update job progress.""" if job: progress = int((processed / total) * 100) if total > 0 else 0 job.meta = { "progress": progress, "message": f"Processing {current_file}... ({processed}/{total})", "processed": processed, "total": total, } job.save_meta() # Import photos added, existing = import_photos_from_folder( db, folder_path, recursive, update_progress ) # Final update total_processed = added + existing result = { "folder_path": folder_path, "recursive": recursive, "added": added, "existing": existing, "total": total_processed, } if job: job.meta = { "progress": 100, "message": f"Completed: {added} new, {existing} existing", "processed": total_processed, "total": total_processed, "added": added, "existing": existing, } job.save_meta() return result finally: db.close() def process_faces_task( batch_size: Optional[int] = None, detector_backend: str = "retinaface", model_name: str = "ArcFace", ) -> dict: """RQ task to process faces in unprocessed photos. Updates job metadata with progress: - progress: 0-100 - message: status message - processed: number of photos processed - total: total photos to process - faces_detected: total faces detected - faces_stored: total faces stored """ import traceback job = get_current_job() if not job: raise RuntimeError("Not running in RQ job context") print(f"[Task] Starting face processing task: job_id={job.id}, batch_size={batch_size}, detector={detector_backend}, model={model_name}") # Update progress immediately - job started try: if job: job.meta = { "progress": 0, "message": "Initializing face processing...", "processed": 0, "total": 0, "faces_detected": 0, "faces_stored": 0, } job.save_meta() except Exception as e: print(f"[Task] Error setting initial job metadata: {e}") db: Session = SessionLocal() # Initialize result variables photos_processed = 0 total_faces_detected = 0 total_faces_stored = 0 try: def update_progress( processed: int, total: int, current_file: str, faces_detected: int, faces_stored: int, ) -> None: """Update job progress and check for cancellation.""" if job: # Check if job was cancelled if job.meta and job.meta.get("cancelled", False): return # Don't update if cancelled # Calculate progress: 10% for setup, 90% for processing if total == 0: # Setup phase progress = min(10, processed * 2) # 0-10% during setup else: # Processing phase progress = 10 + int((processed / total) * 90) if total > 0 else 10 job.meta = { "progress": progress, "message": f"Processing {current_file}... ({processed}/{total})" if total > 0 else current_file, "processed": processed, "total": total, "faces_detected": faces_detected, "faces_stored": faces_stored, } job.save_meta() # Check for cancellation after updating if job.meta and job.meta.get("cancelled", False): print(f"[Task] Job {job.id} cancellation detected") raise KeyboardInterrupt("Job cancelled by user") # Update progress - finding photos if job: job.meta = { "progress": 5, "message": "Finding photos to process...", "processed": 0, "total": 0, "faces_detected": 0, "faces_stored": 0, } job.save_meta() # Process faces photos_processed, total_faces_detected, total_faces_stored = ( process_unprocessed_photos( db, batch_size=batch_size, detector_backend=detector_backend, model_name=model_name, update_progress=update_progress, ) ) # Final update result = { "photos_processed": photos_processed, "faces_detected": total_faces_detected, "faces_stored": total_faces_stored, "detector_backend": detector_backend, "model_name": model_name, } if job: job.meta = { "progress": 100, "message": ( f"Completed: {photos_processed} photos, " f"{total_faces_stored} faces stored" ), "processed": photos_processed, "total": photos_processed, "faces_detected": total_faces_detected, "faces_stored": total_faces_stored, } job.save_meta() return result except KeyboardInterrupt as e: # Job was cancelled - exit gracefully print(f"[Task] Job {job.id if job else 'unknown'} cancelled by user") if job: try: job.meta = job.meta or {} job.meta.update({ "progress": job.meta.get("progress", 0), "message": "Cancelled by user - finished current photo", "cancelled": True, "processed": job.meta.get("processed", photos_processed), "total": job.meta.get("total", 0), "faces_detected": job.meta.get("faces_detected", total_faces_detected), "faces_stored": job.meta.get("faces_stored", total_faces_stored), }) job.save_meta() except Exception: pass # Don't re-raise - job cancellation is not a failure return { "photos_processed": photos_processed, "faces_detected": total_faces_detected, "faces_stored": total_faces_stored, "detector_backend": detector_backend, "model_name": model_name, "cancelled": True, } except Exception as e: # Log error and update job metadata error_msg = f"Task failed: {str(e)}" try: print(f"[Task] ❌ {error_msg}") except (BrokenPipeError, OSError): pass # Ignore broken pipe errors when printing # Try to print traceback, but don't fail if stdout is closed try: traceback.print_exc() except (BrokenPipeError, OSError): # If printing fails, at least log the error type try: print(f"[Task] Error type: {type(e).__name__}: {str(e)}") except (BrokenPipeError, OSError): pass if job: try: job.meta = { "progress": 0, "message": error_msg, "processed": 0, "total": 0, "faces_detected": 0, "faces_stored": 0, } job.save_meta() except Exception: pass # Re-raise so RQ marks job as failed raise finally: db.close()