"""RQ worker tasks for PunimTag.""" from __future__ import annotations from typing import Optional from rq import get_current_job from sqlalchemy.orm import Session from backend.db.session import SessionLocal from backend.services.photo_service import import_photos_from_folder from backend.services.face_service import process_unprocessed_photos def import_photos_task(folder_path: str, recursive: bool = True) -> dict: """RQ task to import photos from a folder. Updates job metadata with progress: - progress: 0-100 - message: status message - processed: number of photos processed - total: total photos found - added: number of new photos added - existing: number of photos that already existed """ job = get_current_job() if not job: raise RuntimeError("Not running in RQ job context") db: Session = SessionLocal() try: def update_progress(processed: int, total: int, current_file: str) -> None: """Update job progress.""" if job: progress = int((processed / total) * 100) if total > 0 else 0 job.meta = { "progress": progress, "message": f"Processing {current_file}... ({processed}/{total})", "processed": processed, "total": total, } job.save_meta() # Import photos added, existing = import_photos_from_folder( db, folder_path, recursive, update_progress ) # Final update total_processed = added + existing result = { "folder_path": folder_path, "recursive": recursive, "added": added, "existing": existing, "total": total_processed, } if job: job.meta = { "progress": 100, "message": f"Completed: {added} new, {existing} existing", "processed": total_processed, "total": total_processed, "added": added, "existing": existing, } job.save_meta() return result finally: db.close() def process_faces_task( batch_size: Optional[int] = None, detector_backend: str = "retinaface", model_name: str = "ArcFace", ) -> dict: """RQ task to process faces in unprocessed photos. Updates job metadata with progress: - progress: 0-100 - message: status message - processed: number of photos processed - total: total photos to process - faces_detected: total faces detected - faces_stored: total faces stored """ import traceback job = get_current_job() if not job: raise RuntimeError("Not running in RQ job context") print(f"[Task] Starting face processing task: job_id={job.id}, batch_size={batch_size}, detector={detector_backend}, model={model_name}") # Update progress immediately - job started try: if job: job.meta = { "progress": 0, "message": "Initializing face processing...", "processed": 0, "total": 0, "faces_detected": 0, "faces_stored": 0, } job.save_meta() except Exception as e: print(f"[Task] Error setting initial job metadata: {e}") db: Session = SessionLocal() # Initialize result variables photos_processed = 0 total_faces_detected = 0 total_faces_stored = 0 def refresh_db_session(): """Refresh database session if it becomes stale or disconnected. This prevents crashes when the database connection is lost during long-running processing tasks. Closes the old session and creates a new one. """ nonlocal db try: # Test if the session is still alive by executing a simple query from sqlalchemy import text db.execute(text("SELECT 1")) db.commit() # Ensure transaction is clean except Exception as e: # Session is stale or disconnected - create a new one try: print(f"[Task] Database session disconnected, refreshing... Error: {e}") except (BrokenPipeError, OSError): pass try: db.close() except Exception: pass db = SessionLocal() try: print(f"[Task] Database session refreshed") except (BrokenPipeError, OSError): pass try: def update_progress( processed: int, total: int, current_file: str, faces_detected: int, faces_stored: int, ) -> None: """Update job progress. NOTE: This function does NOT check for cancellation to avoid interrupting photo processing. Cancellation is checked in process_unprocessed_photos BEFORE starting each photo and AFTER completing it, ensuring the current photo always finishes fully. """ try: if job: # Calculate progress: 10% for setup, 90% for processing if total == 0: # Setup phase progress = min(10, processed * 2) # 0-10% during setup else: # Processing phase progress = 10 + int((processed / total) * 90) if total > 0 else 10 # Avoid duplicating "Processing" if current_file already contains it if current_file.startswith("Processing "): message = f"{current_file} ({processed}/{total})" if total > 0 else current_file else: message = f"Processing {current_file}... ({processed}/{total})" if total > 0 else current_file job.meta = { "progress": progress, "message": message, "processed": processed, "total": total, "faces_detected": faces_detected, "faces_stored": faces_stored, } job.save_meta() except Exception as e: # Log but don't fail the batch if progress update fails try: print(f"[Task] Warning: Failed to update progress: {e}") except (BrokenPipeError, OSError): pass # Update progress - finding photos if job: job.meta = { "progress": 5, "message": "Finding photos to process...", "processed": 0, "total": 0, "faces_detected": 0, "faces_stored": 0, } job.save_meta() # Process faces # Wrap in try-except to ensure we preserve progress even if process_unprocessed_photos fails try: # Refresh session before starting processing to ensure it's healthy refresh_db_session() photos_processed, total_faces_detected, total_faces_stored = ( process_unprocessed_photos( db, batch_size=batch_size, detector_backend=detector_backend, model_name=model_name, update_progress=update_progress, ) ) except Exception as e: # Check if it's a database connection error error_str = str(e).lower() is_db_error = any(keyword in error_str for keyword in [ 'connection', 'disconnect', 'timeout', 'closed', 'lost', 'operationalerror', 'database', 'server closed', 'connection reset', 'connection pool', 'connection refused', 'session needs refresh' ]) if is_db_error: # Try to refresh the session - this helps if the error is recoverable # but we don't retry the entire batch to avoid reprocessing photos try: print(f"[Task] Database error detected, attempting to refresh session: {e}") refresh_db_session() print(f"[Task] Session refreshed - job will fail gracefully. Restart job to continue processing remaining photos.") except Exception as refresh_error: try: print(f"[Task] Failed to refresh database session: {refresh_error}") except (BrokenPipeError, OSError): pass # If process_unprocessed_photos fails, preserve any progress made # and re-raise so the outer handler can log it properly try: print(f"[Task] Error in process_unprocessed_photos: {e}") except (BrokenPipeError, OSError): pass # Re-raise to be handled by outer exception handler raise # Final update result = { "photos_processed": photos_processed, "faces_detected": total_faces_detected, "faces_stored": total_faces_stored, "detector_backend": detector_backend, "model_name": model_name, } if job: job.meta = { "progress": 100, "message": ( f"Completed: {photos_processed} photos, " f"{total_faces_stored} faces stored" ), "processed": photos_processed, "total": photos_processed, "faces_detected": total_faces_detected, "faces_stored": total_faces_stored, } job.save_meta() return result except KeyboardInterrupt as e: # Job was cancelled - exit gracefully print(f"[Task] Job {job.id if job else 'unknown'} cancelled by user") if job: try: job.meta = job.meta or {} job.meta.update({ "progress": job.meta.get("progress", 0), "message": "Cancelled by user - finished current photo", "cancelled": True, "processed": job.meta.get("processed", photos_processed), "total": job.meta.get("total", 0), "faces_detected": job.meta.get("faces_detected", total_faces_detected), "faces_stored": job.meta.get("faces_stored", total_faces_stored), }) job.save_meta() except Exception: pass # Don't re-raise - job cancellation is not a failure return { "photos_processed": photos_processed, "faces_detected": total_faces_detected, "faces_stored": total_faces_stored, "detector_backend": detector_backend, "model_name": model_name, "cancelled": True, } except Exception as e: # Log error and update job metadata error_msg = f"Task failed: {str(e)}" try: print(f"[Task] ❌ {error_msg}") except (BrokenPipeError, OSError): pass # Ignore broken pipe errors when printing # Try to print traceback, but don't fail if stdout is closed try: traceback.print_exc() except (BrokenPipeError, OSError): # If printing fails, at least log the error type try: print(f"[Task] Error type: {type(e).__name__}: {str(e)}") except (BrokenPipeError, OSError): pass if job: try: # Preserve progress information if available existing_meta = job.meta or {} job.meta = { "progress": existing_meta.get("progress", 0), "message": error_msg, "processed": existing_meta.get("processed", photos_processed), "total": existing_meta.get("total", 0), "faces_detected": existing_meta.get("faces_detected", total_faces_detected), "faces_stored": existing_meta.get("faces_stored", total_faces_stored), } job.save_meta() except Exception: pass # Re-raise so RQ marks job as failed raise finally: db.close()