fix: prevent server crashes during photo processing
Some checks failed
CI / skip-ci-check (pull_request) Successful in 1m48s
CI / lint-and-type-check (pull_request) Failing after 2m26s
CI / python-lint (pull_request) Failing after 2m12s
CI / test-backend (pull_request) Successful in 4m2s
CI / build (pull_request) Successful in 4m53s
CI / secret-scanning (pull_request) Successful in 1m56s
CI / dependency-scan (pull_request) Successful in 1m54s
CI / sast-scan (pull_request) Successful in 3m6s
CI / workflow-summary (pull_request) Failing after 1m47s

- Add database connection health checks every 10 photos
- Add session refresh logic to recover from connection errors
- Improve error handling for database disconnections/timeouts
- Add explicit image cleanup to prevent memory leaks
- Add connection error detection throughout processing pipeline
- Gracefully handle database connection failures instead of crashing

Fixes issue where server would crash during long-running photo processing
tasks when database connections were lost or timed out.
This commit is contained in:
Tanya 2026-01-21 11:52:07 -05:00
parent 51081c1b5d
commit 0d37fe07ca
2 changed files with 128 additions and 5 deletions

View File

@ -477,9 +477,14 @@ def process_photo_faces(
return 0, 0
# Load image for quality calculation
# Close the image in a finally block (below) so its memory is freed even if conversion fails
image = Image.open(photo_path)
image_np = np.array(image)
image_width, image_height = image.size
try:
image_np = np.array(image)
image_width, image_height = image.size
finally:
# Explicitly close image to free memory immediately
image.close()
# Count total faces from DeepFace
faces_detected = len(results)
@ -736,8 +741,19 @@ def process_photo_faces(
# If commit fails, rollback and log the error
db.rollback()
error_msg = str(commit_error)
error_str_lower = error_msg.lower()
# Check if it's a connection/disconnection error
is_connection_error = any(keyword in error_str_lower for keyword in [
'connection', 'disconnect', 'timeout', 'closed', 'lost',
'operationalerror', 'server closed', 'connection reset',
'connection pool', 'connection refused'
])
try:
_print_with_stderr(f"[FaceService] Failed to commit {faces_stored} faces for {photo.filename}: {error_msg}")
if is_connection_error:
_print_with_stderr(f"[FaceService] ⚠️ Database connection error detected - session may need refresh")
import traceback
traceback.print_exc()
except (BrokenPipeError, OSError):
@ -747,8 +763,7 @@ def process_photo_faces(
# This ensures the return value accurately reflects what was actually saved
faces_stored = 0
# Re-raise to be caught by outer exception handler in process_unprocessed_photos
# This allows the batch to continue processing other photos
# Re-raise with the original error text embedded so the caller can detect connection errors by keyword and refresh the session
raise Exception(f"Database commit failed for {photo.filename}: {error_msg}")
# Mark photo as processed after handling faces (desktop parity)
@ -756,7 +771,18 @@ def process_photo_faces(
photo.processed = True
db.add(photo)
db.commit()
except Exception:
except Exception as mark_error:
# Log connection errors for debugging
error_str = str(mark_error).lower()
is_connection_error = any(keyword in error_str for keyword in [
'connection', 'disconnect', 'timeout', 'closed', 'lost',
'operationalerror', 'server closed', 'connection reset'
])
if is_connection_error:
try:
_print_with_stderr(f"[FaceService] ⚠️ Database connection error while marking photo as processed: {mark_error}")
except (BrokenPipeError, OSError):
pass
db.rollback()
# Log summary
@ -1259,6 +1285,26 @@ def process_unprocessed_photos(
update_progress(0, total, f"Starting face detection on {total} photos...", 0, 0)
for idx, photo in enumerate(unprocessed_photos, 1):
# Periodic database health check every 10 photos to catch connection issues early
if idx > 1 and idx % 10 == 0:
try:
from sqlalchemy import text
db.execute(text("SELECT 1"))
db.commit()
except Exception as health_check_error:
# Database connection is stale - this will be caught and handled below
error_str = str(health_check_error).lower()
is_connection_error = any(keyword in error_str for keyword in [
'connection', 'disconnect', 'timeout', 'closed', 'lost',
'operationalerror', 'server closed', 'connection reset'
])
if is_connection_error:
try:
print(f"[FaceService] ⚠️ Database health check failed at photo {idx}/{total}: {health_check_error}")
print(f"[FaceService] Session may need refresh - will be handled by error handler")
except (BrokenPipeError, OSError):
pass
# Check for cancellation BEFORE starting each photo
# This is the primary cancellation point - we stop before starting a new photo
if check_cancelled():
@ -1385,6 +1431,14 @@ def process_unprocessed_photos(
except (BrokenPipeError, OSError):
pass
# Check if it's a database connection error
error_str = str(e).lower()
is_db_connection_error = any(keyword in error_str for keyword in [
'connection', 'disconnect', 'timeout', 'closed', 'lost',
'operationalerror', 'database', 'server closed', 'connection reset',
'connection pool', 'connection refused'
])
# Refresh database session after error to ensure it's in a good state
# This prevents session state issues from affecting subsequent photos
# Note: process_photo_faces already does db.rollback(), but we ensure
@ -1394,6 +1448,23 @@ def process_unprocessed_photos(
db.rollback()
# Expire the current photo object to clear any stale state
db.expire(photo)
# If it's a connection error, try to refresh the session
if is_db_connection_error:
try:
# Test if session is still alive
from sqlalchemy import text
db.execute(text("SELECT 1"))
db.commit()
except Exception:
# Session is dead - need to get a new one from the caller
# We can't create a new SessionLocal here, so we'll raise a special exception
try:
print(f"[FaceService] ⚠️ Database session is dead after connection error - caller should refresh session")
except (BrokenPipeError, OSError):
pass
# Re-raise with a flag that indicates session needs refresh
raise Exception(f"Database connection lost - session needs refresh: {str(e)}")
except Exception as session_error:
# If session refresh fails, log but don't fail the batch
try:

View File

@ -119,6 +119,34 @@ def process_faces_task(
total_faces_detected = 0
total_faces_stored = 0
def refresh_db_session():
"""Refresh database session if it becomes stale or disconnected.
This prevents crashes when the database connection is lost during long-running
processing tasks. Closes the old session and creates a new one.
"""
nonlocal db
try:
# Test if the session is still alive by executing a simple query
from sqlalchemy import text
db.execute(text("SELECT 1"))
db.commit() # Ensure transaction is clean
except Exception as e:
# Session is stale or disconnected - create a new one
try:
print(f"[Task] Database session disconnected, refreshing... Error: {e}")
except (BrokenPipeError, OSError):
pass
try:
db.close()
except Exception:
pass
db = SessionLocal()
try:
print(f"[Task] Database session refreshed")
except (BrokenPipeError, OSError):
pass
try:
def update_progress(
processed: int,
@ -181,6 +209,9 @@ def process_faces_task(
# Process faces
# Wrap in try-except to ensure we preserve progress even if process_unprocessed_photos fails
try:
# Refresh session before starting processing to ensure it's healthy
refresh_db_session()
photos_processed, total_faces_detected, total_faces_stored = (
process_unprocessed_photos(
db,
@ -191,6 +222,27 @@ def process_faces_task(
)
)
except Exception as e:
# Check if it's a database connection error
error_str = str(e).lower()
is_db_error = any(keyword in error_str for keyword in [
'connection', 'disconnect', 'timeout', 'closed', 'lost',
'operationalerror', 'database', 'server closed', 'connection reset',
'connection pool', 'connection refused', 'session needs refresh'
])
if is_db_error:
# Try to refresh the session - this helps if the error is recoverable
# but we don't retry the entire batch to avoid reprocessing photos
try:
print(f"[Task] Database error detected, attempting to refresh session: {e}")
refresh_db_session()
print(f"[Task] Session refreshed - job will fail gracefully. Restart job to continue processing remaining photos.")
except Exception as refresh_error:
try:
print(f"[Task] Failed to refresh database session: {refresh_error}")
except (BrokenPipeError, OSError):
pass
# If process_unprocessed_photos fails, preserve any progress made
# and re-raise so the outer handler can log it properly
try: