All checks were successful
CI / skip-ci-check (pull_request) Successful in 1m49s
CI / lint-and-type-check (pull_request) Successful in 2m28s
CI / python-lint (pull_request) Successful in 2m12s
CI / test-backend (pull_request) Successful in 4m5s
CI / build (pull_request) Successful in 4m53s
CI / secret-scanning (pull_request) Successful in 1m56s
CI / dependency-scan (pull_request) Successful in 1m54s
CI / sast-scan (pull_request) Successful in 3m2s
CI / workflow-summary (pull_request) Successful in 1m48s
- Add database connection health checks every 10 photos
- Add session refresh logic to recover from connection errors
- Improve error handling for database disconnections/timeouts
- Add explicit image cleanup to prevent memory leaks
- Add connection error detection throughout processing pipeline
- Gracefully handle database connection failures instead of crashing

Fixes issue where server would crash during long-running photo processing tasks when database connections were lost or timed out.
348 lines
13 KiB
Python
348 lines
13 KiB
Python
"""RQ worker tasks for PunimTag."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from typing import Optional
|
|
|
|
from rq import get_current_job
|
|
from sqlalchemy.orm import Session
|
|
|
|
from backend.db.session import SessionLocal
|
|
from backend.services.photo_service import import_photos_from_folder
|
|
from backend.services.face_service import process_unprocessed_photos
|
|
|
|
|
|
def import_photos_task(folder_path: str, recursive: bool = True) -> dict:
    """RQ task to import photos from a folder.

    Args:
        folder_path: Filesystem path of the folder to scan for photos.
        recursive: If True, also scan subfolders.

    Returns:
        dict with keys: folder_path, recursive, added, existing, total.

    Raises:
        RuntimeError: If not running inside an RQ job context.

    Updates job metadata with progress:
    - progress: 0-100
    - message: status message
    - processed: number of photos processed
    - total: total photos found
    - added: number of new photos added
    - existing: number of photos that already existed
    """
    job = get_current_job()
    if not job:
        raise RuntimeError("Not running in RQ job context")

    db: Session = SessionLocal()

    try:
        # NOTE: `job` is guaranteed non-None past the raise above, so no
        # `if job:` guards are needed below.
        def update_progress(processed: int, total: int, current_file: str) -> None:
            """Update job progress metadata after each file."""
            # Clamp at 100 in case `processed` ever exceeds `total`.
            progress = min(100, int((processed / total) * 100)) if total > 0 else 0
            job.meta = {
                "progress": progress,
                "message": f"Processing {current_file}... ({processed}/{total})",
                "processed": processed,
                "total": total,
            }
            job.save_meta()

        # Import photos, reporting per-file progress via the callback.
        added, existing = import_photos_from_folder(
            db, folder_path, recursive, update_progress
        )

        # Final update
        total_processed = added + existing
        result = {
            "folder_path": folder_path,
            "recursive": recursive,
            "added": added,
            "existing": existing,
            "total": total_processed,
        }

        job.meta = {
            "progress": 100,
            "message": f"Completed: {added} new, {existing} existing",
            "processed": total_processed,
            "total": total_processed,
            "added": added,
            "existing": existing,
        }
        job.save_meta()

        return result

    finally:
        db.close()
|
|
|
|
|
|
def process_faces_task(
    batch_size: Optional[int] = None,
    detector_backend: str = "retinaface",
    model_name: str = "ArcFace",
) -> dict:
    """RQ task to process faces in unprocessed photos.

    Args:
        batch_size: Maximum number of photos to process, or None for all.
        detector_backend: Face detector backend name (e.g. "retinaface").
        model_name: Face embedding model name (e.g. "ArcFace").

    Returns:
        dict with keys: photos_processed, faces_detected, faces_stored,
        detector_backend, model_name (plus "cancelled": True if the job
        was cancelled partway through).

    Raises:
        RuntimeError: If not running inside an RQ job context.

    Updates job metadata with progress:
    - progress: 0-100
    - message: status message
    - processed: number of photos processed
    - total: total photos to process
    - faces_detected: total faces detected
    - faces_stored: total faces stored
    """
    import traceback

    def _safe_print(message: str) -> None:
        """Print, swallowing broken-pipe/OS errors.

        Worker stdout may be closed while the task is still running, so every
        log print in this task must tolerate a dead pipe.
        """
        try:
            print(message)
        except (BrokenPipeError, OSError):
            pass

    job = get_current_job()
    if not job:
        raise RuntimeError("Not running in RQ job context")
    # `job` is guaranteed non-None from here on.

    _safe_print(
        f"[Task] Starting face processing task: job_id={job.id}, "
        f"batch_size={batch_size}, detector={detector_backend}, model={model_name}"
    )

    # Update progress immediately - job started
    try:
        job.meta = {
            "progress": 0,
            "message": "Initializing face processing...",
            "processed": 0,
            "total": 0,
            "faces_detected": 0,
            "faces_stored": 0,
        }
        job.save_meta()
    except Exception as e:
        _safe_print(f"[Task] Error setting initial job metadata: {e}")

    db: Session = SessionLocal()

    # Initialize result variables up front so partial progress survives into
    # the exception handlers below.
    photos_processed = 0
    total_faces_detected = 0
    total_faces_stored = 0

    def refresh_db_session() -> None:
        """Refresh database session if it becomes stale or disconnected.

        This prevents crashes when the database connection is lost during
        long-running processing tasks. Closes the old session and creates a
        new one (rebinding the enclosing ``db``).
        """
        nonlocal db
        try:
            # Test if the session is still alive by executing a simple query.
            from sqlalchemy import text
            db.execute(text("SELECT 1"))
            db.commit()  # Ensure transaction is clean
        except Exception as e:
            # Session is stale or disconnected - create a new one.
            _safe_print(f"[Task] Database session disconnected, refreshing... Error: {e}")
            try:
                db.close()
            except Exception:
                pass
            db = SessionLocal()
            _safe_print("[Task] Database session refreshed")

    try:
        def update_progress(
            processed: int,
            total: int,
            current_file: str,
            faces_detected: int,
            faces_stored: int,
        ) -> None:
            """Update job progress.

            NOTE: This function does NOT check for cancellation to avoid
            interrupting photo processing. Cancellation is checked in
            process_unprocessed_photos BEFORE starting each photo and AFTER
            completing it, ensuring the current photo always finishes fully.
            """
            try:
                # Progress budget: 10% for setup, 90% for processing.
                if total == 0:
                    # Setup phase: creep from 0 to at most 10%.
                    progress = min(10, processed * 2)
                else:
                    progress = 10 + int((processed / total) * 90)

                # During setup (total == 0) the message is used verbatim;
                # otherwise avoid duplicating "Processing" if current_file
                # already starts with it.
                if total == 0:
                    message = current_file
                elif current_file.startswith("Processing "):
                    message = f"{current_file} ({processed}/{total})"
                else:
                    message = f"Processing {current_file}... ({processed}/{total})"

                job.meta = {
                    "progress": progress,
                    "message": message,
                    "processed": processed,
                    "total": total,
                    "faces_detected": faces_detected,
                    "faces_stored": faces_stored,
                }
                job.save_meta()
            except Exception as e:
                # Log but don't fail the batch if progress update fails.
                _safe_print(f"[Task] Warning: Failed to update progress: {e}")

        # Update progress - finding photos
        job.meta = {
            "progress": 5,
            "message": "Finding photos to process...",
            "processed": 0,
            "total": 0,
            "faces_detected": 0,
            "faces_stored": 0,
        }
        job.save_meta()

        # Process faces.
        # Wrapped in try-except so any progress made is preserved even if
        # process_unprocessed_photos fails partway through.
        try:
            # Refresh session before starting processing to ensure it's healthy.
            refresh_db_session()

            photos_processed, total_faces_detected, total_faces_stored = (
                process_unprocessed_photos(
                    db,
                    batch_size=batch_size,
                    detector_backend=detector_backend,
                    model_name=model_name,
                    update_progress=update_progress,
                )
            )
        except Exception as e:
            # Heuristic: classify DB connection failures by message text.
            error_str = str(e).lower()
            is_db_error = any(keyword in error_str for keyword in [
                'connection', 'disconnect', 'timeout', 'closed', 'lost',
                'operationalerror', 'database', 'server closed', 'connection reset',
                'connection pool', 'connection refused', 'session needs refresh'
            ])

            if is_db_error:
                # Try to refresh the session - this helps if the error is
                # recoverable, but we don't retry the entire batch to avoid
                # reprocessing photos.
                try:
                    _safe_print(f"[Task] Database error detected, attempting to refresh session: {e}")
                    refresh_db_session()
                    _safe_print("[Task] Session refreshed - job will fail gracefully. Restart job to continue processing remaining photos.")
                except Exception as refresh_error:
                    _safe_print(f"[Task] Failed to refresh database session: {refresh_error}")

            # Preserve any progress made and re-raise so the outer handler
            # can log it properly.
            _safe_print(f"[Task] Error in process_unprocessed_photos: {e}")
            raise

        # Final update
        result = {
            "photos_processed": photos_processed,
            "faces_detected": total_faces_detected,
            "faces_stored": total_faces_stored,
            "detector_backend": detector_backend,
            "model_name": model_name,
        }

        job.meta = {
            "progress": 100,
            "message": (
                f"Completed: {photos_processed} photos, "
                f"{total_faces_stored} faces stored"
            ),
            "processed": photos_processed,
            "total": photos_processed,
            "faces_detected": total_faces_detected,
            "faces_stored": total_faces_stored,
        }
        job.save_meta()

        return result

    except KeyboardInterrupt:
        # Job was cancelled - exit gracefully. Cancellation is not a failure,
        # so record the partial results and return instead of re-raising.
        _safe_print(f"[Task] Job {job.id} cancelled by user")
        try:
            job.meta = job.meta or {}
            job.meta.update({
                "progress": job.meta.get("progress", 0),
                "message": "Cancelled by user - finished current photo",
                "cancelled": True,
                "processed": job.meta.get("processed", photos_processed),
                "total": job.meta.get("total", 0),
                "faces_detected": job.meta.get("faces_detected", total_faces_detected),
                "faces_stored": job.meta.get("faces_stored", total_faces_stored),
            })
            job.save_meta()
        except Exception:
            pass
        return {
            "photos_processed": photos_processed,
            "faces_detected": total_faces_detected,
            "faces_stored": total_faces_stored,
            "detector_backend": detector_backend,
            "model_name": model_name,
            "cancelled": True,
        }

    except Exception as e:
        # Log the error, preserve whatever progress was recorded in job
        # metadata, then re-raise so RQ marks the job as failed.
        error_msg = f"Task failed: {str(e)}"
        _safe_print(f"[Task] ❌ {error_msg}")

        # Try to print the traceback; fall back to a one-line summary if
        # stdout is closed.
        try:
            traceback.print_exc()
        except (BrokenPipeError, OSError):
            _safe_print(f"[Task] Error type: {type(e).__name__}: {str(e)}")

        try:
            # Preserve progress information if available
            existing_meta = job.meta or {}
            job.meta = {
                "progress": existing_meta.get("progress", 0),
                "message": error_msg,
                "processed": existing_meta.get("processed", photos_processed),
                "total": existing_meta.get("total", 0),
                "faces_detected": existing_meta.get("faces_detected", total_faces_detected),
                "faces_stored": existing_meta.get("faces_stored", total_faces_stored),
            }
            job.save_meta()
        except Exception:
            pass

        # Re-raise so RQ marks job as failed
        raise

    finally:
        db.close()
|
|
|