Tanya f9fafcbb1a
All checks were successful
CI / skip-ci-check (pull_request) Successful in 1m49s
CI / lint-and-type-check (pull_request) Successful in 2m28s
CI / python-lint (pull_request) Successful in 2m12s
CI / test-backend (pull_request) Successful in 4m5s
CI / build (pull_request) Successful in 4m53s
CI / secret-scanning (pull_request) Successful in 1m56s
CI / dependency-scan (pull_request) Successful in 1m54s
CI / sast-scan (pull_request) Successful in 3m2s
CI / workflow-summary (pull_request) Successful in 1m48s
fix: prevent server crashes during photo processing
- Add database connection health checks every 10 photos
- Add session refresh logic to recover from connection errors
- Improve error handling for database disconnections/timeouts
- Add explicit image cleanup to prevent memory leaks
- Add connection error detection throughout processing pipeline
- Gracefully handle database connection failures instead of crashing

Fixes issue where server would crash during long-running photo processing
tasks when database connections were lost or timed out.
2026-01-21 12:47:37 -05:00

348 lines
13 KiB
Python

"""RQ worker tasks for PunimTag."""
from __future__ import annotations
from typing import Optional
from rq import get_current_job
from sqlalchemy.orm import Session
from backend.db.session import SessionLocal
from backend.services.photo_service import import_photos_from_folder
from backend.services.face_service import process_unprocessed_photos
def import_photos_task(folder_path: str, recursive: bool = True) -> dict:
    """Import photos from *folder_path* as a background RQ job.

    Progress is published through the RQ job's metadata:
    - progress: 0-100
    - message: status message
    - processed: number of photos processed
    - total: total photos found
    - added: number of new photos added
    - existing: number of photos that already existed

    Returns a summary dict (folder, recursion flag, counts).
    Raises RuntimeError when called outside an RQ worker context.
    """
    job = get_current_job()
    if not job:
        raise RuntimeError("Not running in RQ job context")
    session: Session = SessionLocal()
    try:
        def report(done: int, total: int, current_file: str) -> None:
            """Publish a progress snapshot to the job metadata."""
            if not job:
                return
            pct = int((done / total) * 100) if total > 0 else 0
            job.meta = {
                "progress": pct,
                "message": f"Processing {current_file}... ({done}/{total})",
                "processed": done,
                "total": total,
            }
            job.save_meta()

        # Run the actual import; the service invokes `report` per photo.
        added, existing = import_photos_from_folder(
            session, folder_path, recursive, report
        )
        combined = added + existing
        # Final metadata update so pollers see a terminal 100% state.
        if job:
            job.meta = {
                "progress": 100,
                "message": f"Completed: {added} new, {existing} existing",
                "processed": combined,
                "total": combined,
                "added": added,
                "existing": existing,
            }
            job.save_meta()
        return {
            "folder_path": folder_path,
            "recursive": recursive,
            "added": added,
            "existing": existing,
            "total": combined,
        }
    finally:
        # Always release the DB session, even on failure.
        session.close()
def process_faces_task(
    batch_size: Optional[int] = None,
    detector_backend: str = "retinaface",
    model_name: str = "ArcFace",
) -> dict:
    """RQ task to process faces in unprocessed photos.

    Long-running: includes DB-session health checks / refresh logic so a
    dropped or timed-out database connection degrades gracefully instead
    of crashing the worker.

    Args:
        batch_size: Max photos to process this run; None means service default.
        detector_backend: Face detector passed through to the face service.
        model_name: Embedding model passed through to the face service.

    Returns:
        Summary dict with photos_processed, faces_detected, faces_stored,
        detector_backend, model_name (plus "cancelled": True on user cancel).

    Updates job metadata with progress:
    - progress: 0-100
    - message: status message
    - processed: number of photos processed
    - total: total photos to process
    - faces_detected: total faces detected
    - faces_stored: total faces stored

    Raises:
        RuntimeError: if not running inside an RQ worker context.
    """
    import traceback
    job = get_current_job()
    if not job:
        raise RuntimeError("Not running in RQ job context")
    print(f"[Task] Starting face processing task: job_id={job.id}, batch_size={batch_size}, detector={detector_backend}, model={model_name}")
    # Update progress immediately - job started
    try:
        if job:
            job.meta = {
                "progress": 0,
                "message": "Initializing face processing...",
                "processed": 0,
                "total": 0,
                "faces_detected": 0,
                "faces_stored": 0,
            }
            job.save_meta()
    except Exception as e:
        # Metadata is best-effort; never fail the task over a meta write.
        print(f"[Task] Error setting initial job metadata: {e}")
    db: Session = SessionLocal()
    # Initialize result variables
    # (pre-initialized so the exception handlers below can report partial
    # progress even if processing fails before any assignment)
    photos_processed = 0
    total_faces_detected = 0
    total_faces_stored = 0
    def refresh_db_session() -> None:
        """Refresh database session if it becomes stale or disconnected.

        This prevents crashes when the database connection is lost during long-running
        processing tasks. Closes the old session and creates a new one.
        """
        nonlocal db
        try:
            # Test if the session is still alive by executing a simple query
            from sqlalchemy import text
            db.execute(text("SELECT 1"))
            db.commit()  # Ensure transaction is clean
        except Exception as e:
            # Session is stale or disconnected - create a new one
            try:
                print(f"[Task] Database session disconnected, refreshing... Error: {e}")
            except (BrokenPipeError, OSError):
                # stdout may be gone (detached worker); printing is best-effort
                pass
            try:
                db.close()
            except Exception:
                pass
            db = SessionLocal()
            try:
                print(f"[Task] Database session refreshed")
            except (BrokenPipeError, OSError):
                pass
    try:
        def update_progress(
            processed: int,
            total: int,
            current_file: str,
            faces_detected: int,
            faces_stored: int,
        ) -> None:
            """Update job progress.

            NOTE: This function does NOT check for cancellation to avoid interrupting
            photo processing. Cancellation is checked in process_unprocessed_photos
            BEFORE starting each photo and AFTER completing it, ensuring the current
            photo always finishes fully.
            """
            try:
                if job:
                    # Calculate progress: 10% for setup, 90% for processing
                    if total == 0:
                        # Setup phase
                        progress = min(10, processed * 2)  # 0-10% during setup
                    else:
                        # Processing phase
                        progress = 10 + int((processed / total) * 90) if total > 0 else 10
                    # Avoid duplicating "Processing" if current_file already contains it
                    if current_file.startswith("Processing "):
                        message = f"{current_file} ({processed}/{total})" if total > 0 else current_file
                    else:
                        message = f"Processing {current_file}... ({processed}/{total})" if total > 0 else current_file
                    job.meta = {
                        "progress": progress,
                        "message": message,
                        "processed": processed,
                        "total": total,
                        "faces_detected": faces_detected,
                        "faces_stored": faces_stored,
                    }
                    job.save_meta()
            except Exception as e:
                # Log but don't fail the batch if progress update fails
                try:
                    print(f"[Task] Warning: Failed to update progress: {e}")
                except (BrokenPipeError, OSError):
                    pass
        # Update progress - finding photos
        if job:
            job.meta = {
                "progress": 5,
                "message": "Finding photos to process...",
                "processed": 0,
                "total": 0,
                "faces_detected": 0,
                "faces_stored": 0,
            }
            job.save_meta()
        # Process faces
        # Wrap in try-except to ensure we preserve progress even if process_unprocessed_photos fails
        try:
            # Refresh session before starting processing to ensure it's healthy
            refresh_db_session()
            photos_processed, total_faces_detected, total_faces_stored = (
                process_unprocessed_photos(
                    db,
                    batch_size=batch_size,
                    detector_backend=detector_backend,
                    model_name=model_name,
                    update_progress=update_progress,
                )
            )
        except Exception as e:
            # Check if it's a database connection error
            # (heuristic substring match on the error text; may over-match
            # generic errors that merely mention "database")
            error_str = str(e).lower()
            is_db_error = any(keyword in error_str for keyword in [
                'connection', 'disconnect', 'timeout', 'closed', 'lost',
                'operationalerror', 'database', 'server closed', 'connection reset',
                'connection pool', 'connection refused', 'session needs refresh'
            ])
            if is_db_error:
                # Try to refresh the session - this helps if the error is recoverable
                # but we don't retry the entire batch to avoid reprocessing photos
                try:
                    print(f"[Task] Database error detected, attempting to refresh session: {e}")
                    refresh_db_session()
                    print(f"[Task] Session refreshed - job will fail gracefully. Restart job to continue processing remaining photos.")
                except Exception as refresh_error:
                    try:
                        print(f"[Task] Failed to refresh database session: {refresh_error}")
                    except (BrokenPipeError, OSError):
                        pass
            # If process_unprocessed_photos fails, preserve any progress made
            # and re-raise so the outer handler can log it properly
            try:
                print(f"[Task] Error in process_unprocessed_photos: {e}")
            except (BrokenPipeError, OSError):
                pass
            # Re-raise to be handled by outer exception handler
            raise
        # Final update
        result = {
            "photos_processed": photos_processed,
            "faces_detected": total_faces_detected,
            "faces_stored": total_faces_stored,
            "detector_backend": detector_backend,
            "model_name": model_name,
        }
        if job:
            job.meta = {
                "progress": 100,
                "message": (
                    f"Completed: {photos_processed} photos, "
                    f"{total_faces_stored} faces stored"
                ),
                "processed": photos_processed,
                "total": photos_processed,
                "faces_detected": total_faces_detected,
                "faces_stored": total_faces_stored,
            }
            job.save_meta()
        return result
    except KeyboardInterrupt as e:
        # Job was cancelled - exit gracefully
        # (RQ delivers cancellation to the worker as KeyboardInterrupt)
        print(f"[Task] Job {job.id if job else 'unknown'} cancelled by user")
        if job:
            try:
                job.meta = job.meta or {}
                job.meta.update({
                    "progress": job.meta.get("progress", 0),
                    "message": "Cancelled by user - finished current photo",
                    "cancelled": True,
                    "processed": job.meta.get("processed", photos_processed),
                    "total": job.meta.get("total", 0),
                    "faces_detected": job.meta.get("faces_detected", total_faces_detected),
                    "faces_stored": job.meta.get("faces_stored", total_faces_stored),
                })
                job.save_meta()
            except Exception:
                pass
        # Don't re-raise - job cancellation is not a failure
        return {
            "photos_processed": photos_processed,
            "faces_detected": total_faces_detected,
            "faces_stored": total_faces_stored,
            "detector_backend": detector_backend,
            "model_name": model_name,
            "cancelled": True,
        }
    except Exception as e:
        # Log error and update job metadata
        error_msg = f"Task failed: {str(e)}"
        try:
            print(f"[Task] ❌ {error_msg}")
        except (BrokenPipeError, OSError):
            pass  # Ignore broken pipe errors when printing
        # Try to print traceback, but don't fail if stdout is closed
        try:
            traceback.print_exc()
        except (BrokenPipeError, OSError):
            # If printing fails, at least log the error type
            try:
                print(f"[Task] Error type: {type(e).__name__}: {str(e)}")
            except (BrokenPipeError, OSError):
                pass
        if job:
            try:
                # Preserve progress information if available
                existing_meta = job.meta or {}
                job.meta = {
                    "progress": existing_meta.get("progress", 0),
                    "message": error_msg,
                    "processed": existing_meta.get("processed", photos_processed),
                    "total": existing_meta.get("total", 0),
                    "faces_detected": existing_meta.get("faces_detected", total_faces_detected),
                    "faces_stored": existing_meta.get("faces_stored", total_faces_stored),
                }
                job.save_meta()
            except Exception:
                pass
        # Re-raise so RQ marks job as failed
        raise
    finally:
        # Close whichever session is current (refresh_db_session may have
        # swapped in a fresh one via `nonlocal db`)
        db.close()