From f9fafcbb1ae88d3597760628150ed124b4f90925 Mon Sep 17 00:00:00 2001 From: Tanya Date: Wed, 21 Jan 2026 11:52:07 -0500 Subject: [PATCH] fix: prevent server crashes during photo processing - Add database connection health checks every 10 photos - Add session refresh logic to recover from connection errors - Improve error handling for database disconnections/timeouts - Add explicit image cleanup to prevent memory leaks - Add connection error detection throughout processing pipeline - Gracefully handle database connection failures instead of crashing Fixes issue where server would crash during long-running photo processing tasks when database connections were lost or timed out. --- backend/services/face_service.py | 81 ++++++++++++++++++++++++++++++-- backend/services/tasks.py | 52 ++++++++++++++++++++ 2 files changed, 128 insertions(+), 5 deletions(-) diff --git a/backend/services/face_service.py b/backend/services/face_service.py index 706d348..761fb7b 100644 --- a/backend/services/face_service.py +++ b/backend/services/face_service.py @@ -477,9 +477,14 @@ def process_photo_faces( return 0, 0 # Load image for quality calculation + # Use context manager to ensure image is closed properly to free memory image = Image.open(photo_path) - image_np = np.array(image) - image_width, image_height = image.size + try: + image_np = np.array(image) + image_width, image_height = image.size + finally: + # Explicitly close image to free memory immediately + image.close() # Count total faces from DeepFace faces_detected = len(results) @@ -736,8 +741,19 @@ def process_photo_faces( # If commit fails, rollback and log the error db.rollback() error_msg = str(commit_error) + error_str_lower = error_msg.lower() + + # Check if it's a connection/disconnection error + is_connection_error = any(keyword in error_str_lower for keyword in [ + 'connection', 'disconnect', 'timeout', 'closed', 'lost', + 'operationalerror', 'server closed', 'connection reset', + 'connection pool', 'connection refused' + ]) + try: _print_with_stderr(f"[FaceService] Failed to commit {faces_stored} faces for {photo.filename}: {error_msg}") + if is_connection_error: + _print_with_stderr(f"[FaceService] ⚠️ Database connection error detected - session may need refresh") import traceback traceback.print_exc() except (BrokenPipeError, OSError): @@ -747,8 +763,7 @@ def process_photo_faces( # This ensures the return value accurately reflects what was actually saved faces_stored = 0 - # Re-raise to be caught by outer exception handler in process_unprocessed_photos - # This allows the batch to continue processing other photos + # Re-raise with connection error flag so caller can refresh session raise Exception(f"Database commit failed for {photo.filename}: {error_msg}") # Mark photo as processed after handling faces (desktop parity) @@ -756,7 +771,18 @@ def process_photo_faces( photo.processed = True db.add(photo) db.commit() - except Exception: + except Exception as mark_error: + # Log connection errors for debugging + error_str = str(mark_error).lower() + is_connection_error = any(keyword in error_str for keyword in [ + 'connection', 'disconnect', 'timeout', 'closed', 'lost', + 'operationalerror', 'server closed', 'connection reset' + ]) + if is_connection_error: + try: + _print_with_stderr(f"[FaceService] ⚠️ Database connection error while marking photo as processed: {mark_error}") + except (BrokenPipeError, OSError): + pass db.rollback() # Log summary @@ -1259,6 +1285,26 @@ def process_unprocessed_photos( update_progress(0, total, f"Starting face detection on {total} photos...", 0, 0) for idx, photo in enumerate(unprocessed_photos, 1): + # Periodic database health check every 10 photos to catch connection issues early + if idx > 1 and idx % 10 == 0: + try: + from sqlalchemy import text + db.execute(text("SELECT 1")) + db.commit() + except Exception as health_check_error: + # Database connection is stale - this will be caught and handled below + error_str = str(health_check_error).lower() + is_connection_error = any(keyword in error_str for keyword in [ + 'connection', 'disconnect', 'timeout', 'closed', 'lost', + 'operationalerror', 'server closed', 'connection reset' + ]) + if is_connection_error: + try: + print(f"[FaceService] ⚠️ Database health check failed at photo {idx}/{total}: {health_check_error}") + print(f"[FaceService] Session may need refresh - will be handled by error handler") + except (BrokenPipeError, OSError): + pass + # Check for cancellation BEFORE starting each photo # This is the primary cancellation point - we stop before starting a new photo if check_cancelled(): @@ -1385,6 +1431,14 @@ def process_unprocessed_photos( except (BrokenPipeError, OSError): pass + # Check if it's a database connection error + error_str = str(e).lower() + is_db_connection_error = any(keyword in error_str for keyword in [ + 'connection', 'disconnect', 'timeout', 'closed', 'lost', + 'operationalerror', 'database', 'server closed', 'connection reset', + 'connection pool', 'connection refused' + ]) + # Refresh database session after error to ensure it's in a good state # This prevents session state issues from affecting subsequent photos # Note: process_photo_faces already does db.rollback(), but we ensure @@ -1394,6 +1448,23 @@ def process_unprocessed_photos( db.rollback() # Expire the current photo object to clear any stale state db.expire(photo) + + # If it's a connection error, try to refresh the session + if is_db_connection_error: + try: + # Test if session is still alive + from sqlalchemy import text + db.execute(text("SELECT 1")) + db.commit() + except Exception: + # Session is dead - need to get a new one from the caller + # We can't create a new SessionLocal here, so we'll raise a special exception + try: + print(f"[FaceService] ⚠️ Database session is dead after connection error - caller should refresh session") + except (BrokenPipeError, OSError): + pass + # Re-raise with a flag that indicates session needs refresh + raise Exception(f"Database connection lost - session needs refresh: {str(e)}") except Exception as session_error: # If session refresh fails, log but don't fail the batch try: diff --git a/backend/services/tasks.py b/backend/services/tasks.py index 1776692..d376e20 100644 --- a/backend/services/tasks.py +++ b/backend/services/tasks.py @@ -119,6 +119,34 @@ def process_faces_task( total_faces_detected = 0 total_faces_stored = 0 + def refresh_db_session(): + """Refresh database session if it becomes stale or disconnected. + + This prevents crashes when the database connection is lost during long-running + processing tasks. Closes the old session and creates a new one. + """ + nonlocal db + try: + # Test if the session is still alive by executing a simple query + from sqlalchemy import text + db.execute(text("SELECT 1")) + db.commit() # Ensure transaction is clean + except Exception as e: + # Session is stale or disconnected - create a new one + try: + print(f"[Task] Database session disconnected, refreshing... Error: {e}") + except (BrokenPipeError, OSError): + pass + try: + db.close() + except Exception: + pass + db = SessionLocal() + try: + print(f"[Task] Database session refreshed") + except (BrokenPipeError, OSError): + pass + try: def update_progress( processed: int, @@ -181,6 +209,9 @@ def process_faces_task( # Process faces # Wrap in try-except to ensure we preserve progress even if process_unprocessed_photos fails try: + # Refresh session before starting processing to ensure it's healthy + refresh_db_session() + photos_processed, total_faces_detected, total_faces_stored = ( process_unprocessed_photos( db, @@ -191,6 +222,27 @@ def process_faces_task( ) ) except Exception as e: + # Check if it's a database connection error + error_str = str(e).lower() + is_db_error = any(keyword in error_str for keyword in [ + 'connection', 'disconnect', 'timeout', 'closed', 'lost', + 'operationalerror', 'database', 'server closed', 'connection reset', + 'connection pool', 'connection refused', 'session needs refresh' + ]) + + if is_db_error: + # Try to refresh the session - this helps if the error is recoverable + # but we don't retry the entire batch to avoid reprocessing photos + try: + print(f"[Task] Database error detected, attempting to refresh session: {e}") + refresh_db_session() + print(f"[Task] Session refreshed - job will fail gracefully. Restart job to continue processing remaining photos.") + except Exception as refresh_error: + try: + print(f"[Task] Failed to refresh database session: {refresh_error}") + except (BrokenPipeError, OSError): + pass + # If process_unprocessed_photos fails, preserve any progress made # and re-raise so the outer handler can log it properly try: