diff --git a/admin-frontend/src/api/jobs.ts b/admin-frontend/src/api/jobs.ts index 86366bd..1a9d9c6 100644 --- a/admin-frontend/src/api/jobs.ts +++ b/admin-frontend/src/api/jobs.ts @@ -6,6 +6,7 @@ export enum JobStatus { PROGRESS = 'progress', SUCCESS = 'success', FAILURE = 'failure', + CANCELLED = 'cancelled', } export interface JobResponse { diff --git a/admin-frontend/src/pages/Process.tsx b/admin-frontend/src/pages/Process.tsx index 7cb70ce..402593a 100644 --- a/admin-frontend/src/pages/Process.tsx +++ b/admin-frontend/src/pages/Process.tsx @@ -25,11 +25,6 @@ export default function Process() { const [isProcessing, setIsProcessing] = useState(false) const [currentJob, setCurrentJob] = useState(null) const [jobProgress, setJobProgress] = useState(null) - const [processingResult, setProcessingResult] = useState<{ - photos_processed?: number - faces_detected?: number - faces_stored?: number - } | null>(null) const [error, setError] = useState(null) const eventSourceRef = useRef(null) @@ -45,7 +40,6 @@ export default function Process() { const handleStartProcessing = async () => { setIsProcessing(true) setError(null) - setProcessingResult(null) setCurrentJob(null) setJobProgress(null) @@ -79,21 +73,38 @@ export default function Process() { } const handleStopProcessing = async () => { - if (!currentJob) { + // Use jobProgress if currentJob is not available (might happen if status is still Pending) + const jobId = currentJob?.id || jobProgress?.id + + if (!jobId) { + console.error('Cannot stop: No job ID available') + setError('Cannot stop: No active job found') return } + console.log(`[Process] STOP button clicked for job ${jobId}`) + try { // Call API to cancel the job - const result = await jobsApi.cancelJob(currentJob.id) - console.log('Job cancellation requested:', result) + console.log(`[Process] Calling cancelJob API for job ${jobId}`) + const result = await jobsApi.cancelJob(jobId) + console.log('[Process] Job cancellation requested:', result) // Update job status to show cancellation is in progress - setCurrentJob({ - ...currentJob, - status: JobStatus.PROGRESS, - message: 'Cancellation requested - finishing current photo...', - }) + if (currentJob) { + setCurrentJob({ + ...currentJob, + status: JobStatus.PROGRESS, + message: 'Cancellation requested - finishing current photo...', + }) + } else if (jobProgress) { + // If currentJob is not set, update jobProgress + setJobProgress({ + ...jobProgress, + status: 'progress', + message: 'Cancellation requested - finishing current photo...', + }) + } // Don't close SSE stream yet - keep it open to wait for job to actually stop // The job will finish the current photo, then stop and send a final status update @@ -103,8 +114,14 @@ export default function Process() { // This will be checked in the SSE handler setError(null) // Clear any previous errors } catch (err: any) { - console.error('Error cancelling job:', err) - setError(err.response?.data?.detail || err.message || 'Failed to cancel job') + console.error('[Process] Error cancelling job:', err) + const errorMessage = err.response?.data?.detail || err.message || 'Failed to cancel job' + setError(errorMessage) + console.error('[Process] Full error details:', { + message: err.message, + response: err.response?.data, + status: err.response?.status, + }) } } @@ -129,6 +146,7 @@ export default function Process() { progress: JobStatus.PROGRESS, success: JobStatus.SUCCESS, failure: JobStatus.FAILURE, + cancelled: JobStatus.CANCELLED, } const jobStatus = statusMap[data.status] || JobStatus.PENDING @@ -148,18 +166,29 @@ export default function Process() { } // Check if job is complete - if (jobStatus === JobStatus.SUCCESS || jobStatus === JobStatus.FAILURE) { + if (jobStatus === JobStatus.SUCCESS || jobStatus === JobStatus.FAILURE || jobStatus === JobStatus.CANCELLED) { setIsProcessing(false) eventSource.close() eventSourceRef.current = null - // Show cancellation message if job was cancelled - if (data.message && (data.message.includes('Cancelled') || data.message.includes('cancelled'))) { - setError(`Job cancelled: ${data.message}`) + // Handle cancelled jobs + if (jobStatus === JobStatus.CANCELLED) { + const progressInfo = data.processed !== undefined && data.total !== undefined + ? ` (processed ${data.processed} of ${data.total} photos)` + : '' + setError(`Processing stopped: ${data.message || 'Cancelled by user'}${progressInfo}`) + } + // Show error message for failures + else if (jobStatus === JobStatus.FAILURE) { + // Show failure message with progress info if available + const progressInfo = data.processed !== undefined && data.total !== undefined + ? ` (processed ${data.processed} of ${data.total} photos)` + : '' + setError(`Processing failed: ${data.message || 'Unknown error'}${progressInfo}`) } - // Fetch final job result to get processing stats - if (jobStatus === JobStatus.SUCCESS) { + // Fetch final job result to get processing stats for successful or cancelled jobs + if (jobStatus === JobStatus.SUCCESS || jobStatus === JobStatus.CANCELLED) { fetchJobResult(jobId) } } @@ -191,15 +220,6 @@ export default function Process() { try { const job = await jobsApi.getJob(jobId) setCurrentJob(job) - - // Extract result data from job progress - if (jobProgress) { - setProcessingResult({ - photos_processed: jobProgress.processed, - faces_detected: jobProgress.faces_detected, - faces_stored: jobProgress.faces_stored, - }) - } } catch (err) { console.error('Error fetching job result:', err) } @@ -347,8 +367,16 @@ export default function Process() { {isProcessing && ( @@ -375,8 +403,11 @@ export default function Process() { > {currentJob.status === JobStatus.SUCCESS && '✓ '} {currentJob.status === JobStatus.FAILURE && '✗ '} - {currentJob.status.charAt(0).toUpperCase() + - currentJob.status.slice(1)} + {currentJob.status === JobStatus.CANCELLED && '⏹ '} + {currentJob.status === JobStatus.CANCELLED + ? 'Stopped' + : currentJob.status.charAt(0).toUpperCase() + + currentJob.status.slice(1)} {currentJob.progress}% @@ -415,32 +446,6 @@ export default function Process() { )} - {/* Results Section */} - {processingResult && ( -
-

- Processing Results -

- -
- {processingResult.photos_processed !== undefined && ( -

- ✓ {processingResult.photos_processed} photos processed -

- )} - {processingResult.faces_detected !== undefined && ( -

- {processingResult.faces_detected} faces detected -

- )} - {processingResult.faces_stored !== undefined && ( -

- {processingResult.faces_stored} faces stored in database -

- )} -
-
- )} {/* Error Section */} {error && ( diff --git a/backend/api/jobs.py b/backend/api/jobs.py index c4a9512..1ee0b2a 100644 --- a/backend/api/jobs.py +++ b/backend/api/jobs.py @@ -54,7 +54,14 @@ def get_job(job_id: str) -> JobResponse: # Check if job was cancelled if job.meta and job.meta.get("cancelled", False): - job_status = JobStatus.FAILURE + # If job finished gracefully after cancellation, mark as CANCELLED + if rq_status == "finished": + job_status = JobStatus.CANCELLED + # If still running, show current status but with cancellation message + elif rq_status == "started": + job_status = JobStatus.PROGRESS if progress > 0 else JobStatus.STARTED + else: + job_status = JobStatus.CANCELLED message = job.meta.get("message", "Cancelled by user") # If job failed, include error message @@ -93,19 +100,31 @@ def stream_job_progress(job_id: str): while True: try: job = Job.fetch(job_id, connection=redis_conn) + rq_status = job.get_status() status_map = { "queued": JobStatus.PENDING, "started": JobStatus.STARTED, "finished": JobStatus.SUCCESS, "failed": JobStatus.FAILURE, } - job_status = status_map.get(job.get_status(), JobStatus.PENDING) + job_status = status_map.get(rq_status, JobStatus.PENDING) - # Check if job was cancelled first + # Check if job was cancelled - this takes priority if job.meta and job.meta.get("cancelled", False): - job_status = JobStatus.FAILURE + # If job is finished and was cancelled, it completed gracefully + if rq_status == "finished": + job_status = JobStatus.CANCELLED + progress = job.meta.get("progress", 0) if job.meta else 0 + # If job is still running but cancellation was requested, keep it as PROGRESS/STARTED + # until it actually stops + elif rq_status == "started": + # Job is still running - let it finish current photo + progress = job.meta.get("progress", 0) if job.meta else 0 + job_status = JobStatus.PROGRESS if progress > 0 else JobStatus.STARTED + else: + job_status = JobStatus.CANCELLED + progress = job.meta.get("progress", 0) if job.meta else 0 message = job.meta.get("message", "Cancelled by user") - progress = job.meta.get("progress", 0) if job.meta else 0 else: progress = 0 if job_status == JobStatus.STARTED: @@ -140,8 +159,8 @@ def stream_job_progress(job_id: str): last_progress = progress last_message = message - # Stop streaming if job is complete or failed - if job_status in (JobStatus.SUCCESS, JobStatus.FAILURE): + # Stop streaming if job is complete, failed, or cancelled + if job_status in (JobStatus.SUCCESS, JobStatus.FAILURE, JobStatus.CANCELLED): break time.sleep(0.5) # Poll every 500ms @@ -164,8 +183,10 @@ def cancel_job(job_id: str) -> dict: The job will check this flag and exit gracefully. """ try: + print(f"[Jobs API] Cancel request for job_id={job_id}") job = Job.fetch(job_id, connection=redis_conn) rq_status = job.get_status() + print(f"[Jobs API] Job {job_id} current status: {rq_status}") if rq_status == "finished": return { @@ -182,6 +203,7 @@ def cancel_job(job_id: str) -> dict: if rq_status == "queued": # Cancel queued job - remove from queue job.cancel() + print(f"[Jobs API] ✓ Cancelled queued job {job_id}") return { "message": f"Job {job_id} cancelled (was queued)", "status": "cancelled", @@ -190,30 +212,50 @@ def cancel_job(job_id: str) -> dict: if rq_status == "started": # For running jobs, set cancellation flag in metadata # The task will check this and exit gracefully + print(f"[Jobs API] Setting cancellation flag for running job {job_id}") if job.meta is None: job.meta = {} job.meta["cancelled"] = True job.meta["message"] = "Cancellation requested..." + # CRITICAL: Save metadata immediately and verify it was saved job.save_meta() + print(f"[Jobs API] Saved metadata for job {job_id}") + + # Verify the flag was saved by fetching fresh + try: + fresh_job = Job.fetch(job_id, connection=redis_conn) + if not fresh_job.meta or not fresh_job.meta.get("cancelled", False): + print(f"[Jobs API] ❌ WARNING: Cancellation flag NOT found after save for job {job_id}") + print(f"[Jobs API] Fresh job meta: {fresh_job.meta}") + else: + print(f"[Jobs API] ✓ Verified: Cancellation flag is set for job {job_id}") + except Exception as verify_error: + print(f"[Jobs API] ⚠️ Could not verify cancellation flag: {verify_error}") # Also try to cancel the job (which will interrupt it if possible) + # This sends a signal to the worker process try: job.cancel() - except Exception: - # Job might already be running, that's OK - pass + print(f"[Jobs API] ✓ RQ cancel() called for job {job_id}") + except Exception as cancel_error: + # Job might already be running, that's OK - metadata flag will be checked + print(f"[Jobs API] Note: RQ cancel() raised exception (may be expected): {cancel_error}") return { "message": f"Job {job_id} cancellation requested", "status": "cancelling", } + print(f"[Jobs API] Job {job_id} in unexpected status: {rq_status}") return { "message": f"Job {job_id} status: {rq_status}", "status": rq_status, } except Exception as e: + print(f"[Jobs API] ❌ Error cancelling job {job_id}: {e}") + import traceback + traceback.print_exc() raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Job {job_id} not found: {str(e)}", diff --git a/backend/db/models.py b/backend/db/models.py index 9376e00..45fe127 100644 --- a/backend/db/models.py +++ b/backend/db/models.py @@ -18,6 +18,7 @@ from sqlalchemy import ( Text, UniqueConstraint, CheckConstraint, + TypeDecorator, ) from sqlalchemy.orm import declarative_base, relationship @@ -29,6 +30,46 @@ if TYPE_CHECKING: Base = declarative_base() +class PrismaCompatibleDateTime(TypeDecorator): + """ + DateTime type that stores in a format compatible with Prisma's SQLite driver. + + Prisma's SQLite driver has issues with microseconds in datetime strings. + This type ensures datetimes are stored in ISO format without microseconds: + 'YYYY-MM-DD HH:MM:SS' instead of 'YYYY-MM-DD HH:MM:SS.ffffff' + """ + impl = DateTime + cache_ok = True + + def process_bind_param(self, value, dialect): + """Convert Python datetime to SQL string format.""" + if value is None: + return None + if isinstance(value, datetime): + # Strip microseconds and format as ISO string without microseconds + # This ensures Prisma can read it correctly + value = value.replace(microsecond=0) + return value.strftime('%Y-%m-%d %H:%M:%S') + return value + + def process_result_value(self, value, dialect): + """Convert SQL string back to Python datetime.""" + if value is None: + return None + if isinstance(value, str): + # Parse ISO format string + try: + # Try parsing with microseconds first (for existing data) + if '.' in value: + return datetime.strptime(value.split('.')[0], '%Y-%m-%d %H:%M:%S') + else: + return datetime.strptime(value, '%Y-%m-%d %H:%M:%S') + except ValueError: + # Fallback to ISO format parser + return datetime.fromisoformat(value.replace('Z', '+00:00')) + return value + + class Photo(Base): """Photo model - matches desktop schema exactly.""" @@ -37,7 +78,7 @@ class Photo(Base): id = Column(Integer, primary_key=True, autoincrement=True, index=True) path = Column(Text, unique=True, nullable=False, index=True) filename = Column(Text, nullable=False) - date_added = Column(DateTime, default=datetime.utcnow, nullable=False) + date_added = Column(PrismaCompatibleDateTime, default=datetime.utcnow, nullable=False) date_taken = Column(Date, nullable=True, index=True) processed = Column(Boolean, default=False, nullable=False, index=True) file_hash = Column(Text, nullable=True, index=True) # Nullable to support existing photos without hashes @@ -71,7 +112,7 @@ class Person(Base): middle_name = Column(Text, nullable=True) maiden_name = Column(Text, nullable=True) date_of_birth = Column(Date, nullable=True) - created_date = Column(DateTime, default=datetime.utcnow, nullable=False) + created_date = Column(PrismaCompatibleDateTime, default=datetime.utcnow, nullable=False) faces = relationship("Face", back_populates="person") person_encodings = relationship( @@ -142,7 +183,7 @@ class PersonEncoding(Base): quality_score = Column(Numeric, default=0.0, nullable=False, index=True) detector_backend = Column(Text, default="retinaface", nullable=False) model_name = Column(Text, default="ArcFace", nullable=False) - created_date = Column(DateTime, default=datetime.utcnow, nullable=False) + created_date = Column(PrismaCompatibleDateTime, default=datetime.utcnow, nullable=False) person = relationship("Person", back_populates="person_encodings") face = relationship("Face", back_populates="person_encodings") @@ -160,7 +201,7 @@ class Tag(Base): id = Column(Integer, primary_key=True, autoincrement=True, index=True) tag_name = Column(Text, unique=True, nullable=False, index=True) - created_date = Column(DateTime, default=datetime.utcnow, nullable=False) + created_date = Column(PrismaCompatibleDateTime, default=datetime.utcnow, nullable=False) photo_tags = relationship( "PhotoTagLinkage", back_populates="tag", cascade="all, delete-orphan" @@ -179,7 +220,7 @@ class PhotoTagLinkage(Base): Integer, default=0, nullable=False, server_default="0" ) - created_date = Column(DateTime, default=datetime.utcnow, nullable=False) + created_date = Column(PrismaCompatibleDateTime, default=datetime.utcnow, nullable=False) photo = relationship("Photo", back_populates="photo_tags") tag = relationship("Tag", back_populates="photo_tags") @@ -200,7 +241,7 @@ class PhotoFavorite(Base): id = Column(Integer, primary_key=True, autoincrement=True) username = Column(Text, nullable=False, index=True) photo_id = Column(Integer, ForeignKey("photos.id"), nullable=False, index=True) - created_date = Column(DateTime, default=datetime.utcnow, nullable=False) + created_date = Column(PrismaCompatibleDateTime, default=datetime.utcnow, nullable=False) photo = relationship("Photo", back_populates="favorites") @@ -231,8 +272,8 @@ class User(Base): index=True, ) password_change_required = Column(Boolean, default=True, nullable=False, index=True) - created_date = Column(DateTime, default=datetime.utcnow, nullable=False) - last_login = Column(DateTime, nullable=True) + created_date = Column(PrismaCompatibleDateTime, default=datetime.utcnow, nullable=False) + last_login = Column(PrismaCompatibleDateTime, nullable=True) __table_args__ = ( Index("idx_users_username", "username"), @@ -256,7 +297,7 @@ class PhotoPersonLinkage(Base): photo_id = Column(Integer, ForeignKey("photos.id"), nullable=False, index=True) person_id = Column(Integer, ForeignKey("people.id"), nullable=False, index=True) identified_by_user_id = Column(Integer, ForeignKey("users.id"), nullable=True, index=True) - created_date = Column(DateTime, default=datetime.utcnow, nullable=False) + created_date = Column(PrismaCompatibleDateTime, default=datetime.utcnow, nullable=False) photo = relationship("Photo", back_populates="video_people") person = relationship("Person", back_populates="video_photos") diff --git a/backend/schemas/jobs.py b/backend/schemas/jobs.py index 3a24e1d..c4623bf 100644 --- a/backend/schemas/jobs.py +++ b/backend/schemas/jobs.py @@ -16,6 +16,7 @@ class JobStatus(str, Enum): PROGRESS = "progress" SUCCESS = "success" FAILURE = "failure" + CANCELLED = "cancelled" class JobResponse(BaseModel): diff --git a/backend/scripts/fix_datetime_format.py b/backend/scripts/fix_datetime_format.py new file mode 100755 index 0000000..e0c26a6 --- /dev/null +++ b/backend/scripts/fix_datetime_format.py @@ -0,0 +1,145 @@ +#!/usr/bin/env python3 +""" +Fix DateTime format in SQLite database to be compatible with Prisma. + +This script updates all DateTime columns to remove microseconds, +ensuring Prisma's SQLite driver can read them correctly. +""" + +import sqlite3 +from pathlib import Path +from datetime import datetime +import sys + +def fix_datetime_in_db(db_path: str) -> None: + """Fix datetime formats in SQLite database.""" + conn = sqlite3.connect(db_path) + cursor = conn.cursor() + + # Tables and their datetime columns + datetime_columns = { + 'people': ['created_date', 'date_of_birth'], + 'photos': ['date_added', 'date_taken'], + 'faces': ['created_date'], + 'tags': ['created_date'], + 'phototaglinkage': ['created_date'], + 'person_encodings': ['created_date'], + 'photo_favorites': ['created_date'], + 'users': ['created_date', 'last_login'], + 'photo_person_linkage': ['created_date'], + } + + fixed_count = 0 + + for table, columns in datetime_columns.items(): + # Check if table exists + cursor.execute(""" + SELECT name FROM sqlite_master + WHERE type='table' AND name=? + """, (table,)) + if not cursor.fetchone(): + print(f"⚠️ Table '{table}' does not exist, skipping...") + continue + + for column in columns: + # Check if column exists + cursor.execute(f"PRAGMA table_info({table})") + columns_info = cursor.fetchall() + column_exists = any(col[1] == column for col in columns_info) + + if not column_exists: + print(f"⚠️ Column '{table}.{column}' does not exist, skipping...") + continue + + # Get primary key column name + cursor.execute(f"PRAGMA table_info({table})") + columns_info = cursor.fetchall() + pk_column = None + for col in columns_info: + if col[5] == 1: # pk flag + pk_column = col[1] + break + + if not pk_column: + print(f"⚠️ Table '{table}' has no primary key, skipping column '{column}'...") + continue + + # Get all rows with this column + cursor.execute(f"SELECT {pk_column}, {column} FROM {table} WHERE {column} IS NOT NULL") + rows = cursor.fetchall() + + for row_id, dt_value in rows: + if dt_value is None: + continue + + try: + # Parse the datetime value + if isinstance(dt_value, str): + # Check if it's a Date (YYYY-MM-DD) or DateTime (YYYY-MM-DD HH:MM:SS) + if ' ' in dt_value or 'T' in dt_value: + # It's a DateTime - try parsing with microseconds + if '.' in dt_value: + dt = datetime.strptime(dt_value.split('.')[0], '%Y-%m-%d %H:%M:%S') + elif 'T' in dt_value: + # ISO format with T + dt = datetime.fromisoformat(dt_value.replace('Z', '+00:00').split('.')[0]) + else: + dt = datetime.strptime(dt_value, '%Y-%m-%d %H:%M:%S') + + # Format without microseconds + new_value = dt.strftime('%Y-%m-%d %H:%M:%S') + + # Update if different + if new_value != dt_value: + cursor.execute( + f"UPDATE {table} SET {column} = ? WHERE {pk_column} = ?", + (new_value, row_id) + ) + fixed_count += 1 + print(f"✅ Fixed {table}.{column} for {pk_column}={row_id}: {dt_value} -> {new_value}") + else: + # It's a Date (YYYY-MM-DD) - this is fine, no need to fix + pass + except (ValueError, TypeError) as e: + print(f"⚠️ Could not parse {table}.{column} for {pk_column}={row_id}: {dt_value} ({e})") + continue + + conn.commit() + conn.close() + + print(f"\n✅ Fixed {fixed_count} datetime values") + print("✅ Database datetime format is now Prisma-compatible") + + +if __name__ == "__main__": + # Get database path from environment or use default + import os + from pathlib import Path + + db_url = os.getenv("DATABASE_URL", "sqlite:///data/punimtag.db") + + if db_url.startswith("sqlite:///"): + db_path = db_url.replace("sqlite:///", "") + elif db_url.startswith("file:"): + db_path = db_url.replace("file:", "") + else: + print("❌ This script only works with SQLite databases") + print(f" DATABASE_URL: {db_url}") + sys.exit(1) + + # Resolve relative path + if not Path(db_path).is_absolute(): + # Assume relative to project root + project_root = Path(__file__).parent.parent.parent + db_path = project_root / db_path + + db_path = str(db_path) + + if not Path(db_path).exists(): + print(f"❌ Database file not found: {db_path}") + sys.exit(1) + + print(f"🔧 Fixing datetime format in: {db_path}\n") + fix_datetime_in_db(db_path) + print("\n✅ Done!") + diff --git a/backend/services/face_service.py b/backend/services/face_service.py index e53cef0..6b6903f 100644 --- a/backend/services/face_service.py +++ b/backend/services/face_service.py @@ -723,7 +723,27 @@ def process_photo_faces( db.add(face) faces_stored += 1 - db.commit() + # Commit all faces at once - wrap in try-except to handle database errors gracefully + try: + db.commit() + except Exception as commit_error: + # If commit fails, rollback and log the error + db.rollback() + error_msg = str(commit_error) + try: + _print_with_stderr(f"[FaceService] Failed to commit {faces_stored} faces for {photo.filename}: {error_msg}") + import traceback + traceback.print_exc() + except (BrokenPipeError, OSError): + pass + + # Reset faces_stored since commit failed - faces weren't actually stored + # This ensures the return value accurately reflects what was actually saved + faces_stored = 0 + + # Re-raise to be caught by outer exception handler in process_unprocessed_photos + # This allows the batch to continue processing other photos + raise Exception(f"Database commit failed for {photo.filename}: {error_msg}") # Mark photo as processed after handling faces (desktop parity) try: @@ -1063,239 +1083,375 @@ def process_unprocessed_photos( Returns: Tuple of (photos_processed, total_faces_detected, total_faces_stored) """ - print(f"[FaceService] Starting face processing: detector={detector_backend}, model={model_name}, batch_size={batch_size}") - overall_start = time.time() - - # Update progress - querying unprocessed photos - if update_progress: - batch_msg = f"Finding up to {batch_size} photos" if batch_size else "Finding photos" - update_progress(0, 0, f"{batch_msg} that need processing...", 0, 0) - - # Desktop parity: find photos that are not yet processed - # Also filter out videos (only process images for face detection) - query_start = time.time() - # Filter for unprocessed photos, excluding videos - unprocessed_query = db.query(Photo).filter( - Photo.processed == False, # noqa: E712 - Photo.media_type != 'video' # Skip videos (videos are marked as processed and not processed for faces) - ) - - # Apply batch size limit BEFORE executing query to avoid loading unnecessary photos - # When batch_size is set, only that many photos are fetched from the database - if batch_size: - unprocessed_query = unprocessed_query.limit(batch_size) - - # Execute query - only loads batch_size photos if limit was set - unprocessed_photos = unprocessed_query.all() - query_time = time.time() - query_start - print(f"[FaceService] Query completed in {query_time:.2f}s") - - total = len(unprocessed_photos) - print(f"[FaceService] Found {total} unprocessed photos") - if total == 0: - print("[FaceService] No photos to process") - if update_progress: - update_progress(0, 0, "No photos to process", 0, 0) - return 0, 0, 0 - - # Update progress - preparing to process - if update_progress: - update_progress(0, total, f"Preparing to process {total} photos...", 0, 0) - + # Initialize result variables at function level so they're available in exception handler photos_processed = 0 total_faces_detected = 0 total_faces_stored = 0 - print(f"[FaceService] Starting processing of {total} photos...") - - # Helper to check if job was cancelled - def check_cancelled() -> bool: - """Check if job has been cancelled.""" + try: + print(f"[FaceService] Starting face processing: detector={detector_backend}, model={model_name}, batch_size={batch_size}") + overall_start = time.time() + + # Update progress - querying unprocessed photos if update_progress: - # Try to check job metadata for cancellation flag + try: + batch_msg = f"Finding up to {batch_size} photos" if batch_size else "Finding photos" + update_progress(0, 0, f"{batch_msg} that need processing...", 0, 0) + except Exception as e: + # If update_progress fails, log but continue + try: + print(f"[FaceService] Warning: update_progress failed: {e}") + except (BrokenPipeError, OSError): + pass + + # Desktop parity: find photos that are not yet processed + # Also filter out videos (only process images for face detection) + query_start = time.time() + # Filter for unprocessed photos, excluding videos + unprocessed_query = db.query(Photo).filter( + Photo.processed == False, # noqa: E712 + Photo.media_type != 'video' # Skip videos (videos are marked as processed and not processed for faces) + ) + + # Apply batch size limit BEFORE executing query to avoid loading unnecessary photos + # When batch_size is set, only that many photos are fetched from the database + if batch_size: + unprocessed_query = unprocessed_query.limit(batch_size) + + # Execute query - only loads batch_size photos if limit was set + unprocessed_photos = unprocessed_query.all() + query_time = time.time() - query_start + print(f"[FaceService] Query completed in {query_time:.2f}s") + + total = len(unprocessed_photos) + print(f"[FaceService] Found {total} unprocessed photos") + if total == 0: + print("[FaceService] No photos to process") + if update_progress: + update_progress(0, 0, "No photos to process", 0, 0) + return 0, 0, 0 + + # Update progress - preparing to process + if update_progress: + update_progress(0, total, f"Preparing to process {total} photos...", 0, 0) + + photos_processed = 0 + total_faces_detected = 0 + total_faces_stored = 0 + + print(f"[FaceService] Starting processing of {total} photos...") + + # Helper to check if job was cancelled + def check_cancelled() -> bool: + """Check if job has been cancelled by fetching fresh job metadata from Redis. + + This function MUST fetch the job fresh from Redis to get the latest cancellation + status, because the cancellation flag is set by the API endpoint in a + different process. The cached job object won't see the updated metadata. + + Also checks RQ's built-in cancellation status. + """ try: from rq import get_current_job + from redis import Redis + from rq.job import Job + job = get_current_job() - if job and job.meta and job.meta.get("cancelled", False): - return True - except Exception: - pass - return False - - # Pre-warm DeepFace models BEFORE processing photos - # This moves the model loading delay to initialization phase (with progress updates) - # instead of causing delay during first photo processing - if total > 0: - print(f"[FaceService] Pre-warming DeepFace models...") - _pre_warm_deepface(detector_backend, model_name, update_progress) - - # Check cancellation after pre-warming - if check_cancelled(): - print("[FaceService] Job cancelled before processing started") - return photos_processed, total_faces_detected, total_faces_stored - - # Initialize PoseDetector ONCE for the entire batch (reuse across all photos) - # This avoids reinitializing RetinaFace for every photo, which is very slow - pose_detector = None - if RETINAFACE_AVAILABLE: - try: - print(f"[FaceService] Initializing RetinaFace pose detector...") - pose_detector = PoseDetector() - print(f"[FaceService] Pose detector initialized successfully") - except Exception as e: - print(f"[FaceService] ⚠️ Pose detection not available: {e}, will skip pose detection") - pose_detector = None - - # Update progress - models are ready, starting photo processing - if update_progress and total > 0: - update_progress(0, total, f"Starting face detection on {total} photos...", 0, 0) - - for idx, photo in enumerate(unprocessed_photos, 1): - # Check for cancellation - if check_cancelled(): - print(f"[FaceService] Job cancelled at photo {idx}/{total}") - if update_progress: + if not job: + return False + + job_id = job.id + + # Method 1: Check RQ's built-in cancellation status + # RQ sets this when job.cancel() is called try: + # Check if job has is_canceled attribute (RQ 1.x) + if hasattr(job, 'is_canceled') and job.is_canceled: + print(f"[FaceService] ⚠️ STOP: RQ is_canceled=True detected for job {job_id}") + return True + # Check if job has is_cancelled attribute (some RQ versions) + if hasattr(job, 'is_cancelled') and job.is_cancelled: + print(f"[FaceService] ⚠️ STOP: RQ is_cancelled=True detected for job {job_id}") + return True + except Exception as cancel_check_error: + try: + print(f"[FaceService] Debug: Error checking RQ cancellation: {cancel_check_error}") + except (BrokenPipeError, OSError): + pass + + # Method 2: Fetch job fresh from Redis to get latest metadata + # The cancellation flag is set by the API endpoint in a different process + try: + # Use the job's connection if available, otherwise create new one + if hasattr(job, 'connection') and job.connection: + connection = job.connection + else: + connection = Redis(host="localhost", port=6379, db=0, decode_responses=False) + + # Fetch fresh job from Redis + fresh_job = Job.fetch(job_id, connection=connection) + + # Check metadata flag + cancelled = fresh_job.meta and fresh_job.meta.get("cancelled", False) + if cancelled: + print(f"[FaceService] ⚠️ STOP: CANCELLATION FLAG DETECTED in job {job_id} metadata - stopping after current photo") + print(f"[FaceService] Fresh job meta: {fresh_job.meta}") + else: + # Debug: log when we check but don't find cancellation (only occasionally to avoid spam) + import random + if random.random() < 0.01: # Log 1% of checks + print(f"[FaceService] Debug: Checked cancellation for job {job_id}, not cancelled (meta: {fresh_job.meta})") + return cancelled + except Exception as fetch_error: + # If fetch fails, try using the cached job object as fallback + try: + cancelled = job.meta and job.meta.get("cancelled", False) + if cancelled: + print(f"[FaceService] ⚠️ STOP: CANCELLATION DETECTED (cached metadata) in job {job_id}") + return cancelled + except Exception: + pass + # Log the fetch error for debugging + try: + print(f"[FaceService] Warning: Could not fetch fresh job for cancellation check: {fetch_error}") + except (BrokenPipeError, OSError): + pass + except Exception as e: + # If we can't check, assume not cancelled (fail-safe) + try: + print(f"[FaceService] Warning: Could not check cancellation status: {e}") + except (BrokenPipeError, OSError): + pass + return False + + # Pre-warm DeepFace models BEFORE processing photos + # This moves the model loading delay to initialization phase (with progress updates) + # instead of causing delay during first photo processing + if total > 0: + print(f"[FaceService] Pre-warming DeepFace models...") + _pre_warm_deepface(detector_backend, model_name, update_progress) + + # Check cancellation after pre-warming + if check_cancelled(): + print("[FaceService] Job cancelled before processing started") + return photos_processed, total_faces_detected, total_faces_stored + + # Initialize PoseDetector ONCE for the entire batch (reuse across all photos) + # This avoids reinitializing RetinaFace for every photo, which is very slow + pose_detector = None + if RETINAFACE_AVAILABLE: + try: + print(f"[FaceService] Initializing RetinaFace pose detector...") + pose_detector = PoseDetector() + print(f"[FaceService] Pose detector initialized successfully") + except Exception as e: + print(f"[FaceService] ⚠️ Pose detection not available: {e}, will skip pose detection") + pose_detector = None + + # Update progress - models are ready, starting photo processing + if update_progress and total > 0: + update_progress(0, total, f"Starting face detection on {total} photos...", 0, 0) + + for idx, photo in enumerate(unprocessed_photos, 1): + # Check for cancellation BEFORE starting each photo + # This is the primary cancellation point - we stop before starting a new photo + if check_cancelled(): + print(f"[FaceService] ⚠️ Job cancellation detected at photo {idx}/{total} - stopping before processing") + if update_progress: + try: + update_progress( + idx - 1, + total, + "Stopped by user", + total_faces_detected, + total_faces_stored, + ) + except Exception: + pass + # Raise KeyboardInterrupt to signal cancellation to the task handler + raise KeyboardInterrupt("Job cancelled by user") + + try: + # Update progress before processing each photo + if update_progress: update_progress( idx - 1, total, - "Cancelled by user", + f"Processing {photo.filename}... ({idx}/{total})", total_faces_detected, total_faces_stored, ) - except KeyboardInterrupt: - # Expected when cancellation is detected - pass - # Raise KeyboardInterrupt to signal cancellation to the task handler - raise KeyboardInterrupt("Job cancelled by user") - + + # Time the first photo to see if there's still delay after pre-warming + if idx == 1: + first_photo_start = time.time() + print(f"[FaceService] Starting first photo processing...") + + # Process photo fully (including pose detection, DeepFace, and database commit) + # CRITICAL: This photo will complete FULLY even if cancellation is requested + # The cancellation check happens ONLY BEFORE starting this photo and AFTER completing it + # This ensures no partial data is saved - the entire photo processing (including DB commit) + # completes before we check for cancellation + print(f"[FaceService] Processing photo {idx}/{total}: {photo.filename} (cancellation will be checked AFTER this photo completes)") + faces_detected, faces_stored = process_photo_faces( + db, + photo, + detector_backend=detector_backend, + model_name=model_name, + pose_detector=pose_detector, # Reuse the same detector for all photos + ) + + # Photo processing is COMPLETE at this point (including DB commit) + # All faces have been stored in the database + total_faces_detected += faces_detected + total_faces_stored += faces_stored + photos_processed += 1 + + # Log timing for first photo + if idx == 1: + first_photo_time = time.time() - first_photo_start + print(f"[FaceService] First photo completed in {first_photo_time:.2f}s") + + # Log completion with explicit confirmation that photo is fully processed + print(f"[FaceService] ✓ Photo {idx}/{total} ({photo.filename}) FULLY COMPLETED: {faces_detected} faces detected, {faces_stored} faces stored in database") + + # Update progress to show completion (including pose detection and DB commit) + # This happens AFTER the entire photo processing is complete + if update_progress: + try: + update_progress( + idx, + total, + f"Completed {photo.filename} ({idx}/{total})", + total_faces_detected, + total_faces_stored, + ) + except KeyboardInterrupt: + # If cancellation was detected during update_progress, check again + if check_cancelled(): + print(f"[FaceService] Job cancelled during progress update after photo {idx}/{total} (photo is already fully processed)") + # Raise KeyboardInterrupt to signal cancellation to the task handler + raise KeyboardInterrupt("Job cancelled by user after completing current photo") + # Re-raise if it wasn't a cancellation + raise + + # NOW check for cancellation - photo is fully complete (including DB commit) + # This ensures the entire photo processing is done (pose detection, DeepFace, validation, DB commit), + # and the progress shows "Completed", then stops before the next one + if check_cancelled(): + print(f"[FaceService] ⚠️ STOPPING: Cancellation detected AFTER photo {idx}/{total} ({photo.filename}) fully completed") + print(f"[FaceService] Photo {idx} is safe: {faces_stored} faces stored in database before stopping") + # Raise KeyboardInterrupt to signal cancellation to the task handler + raise KeyboardInterrupt("Job cancelled by user after completing current photo") + except KeyboardInterrupt: + # Cancellation was requested - stop processing gracefully + print(f"[FaceService] Job cancelled during processing of photo {idx}/{total}") + if check_cancelled(): + if update_progress: + try: + update_progress( + idx, + total, + "Cancelled by user", + total_faces_detected, + total_faces_stored, + ) + except KeyboardInterrupt: + pass + break + # Re-raise if it wasn't a cancellation + raise + except Exception as e: + # Log error but continue processing other photos + try: + print(f"[FaceService] Error processing photo {photo.filename}: {e}") + except (BrokenPipeError, OSError): + pass # Ignore broken pipe errors + + # Try to print traceback, but don't fail if stdout is closed + try: + import traceback + traceback.print_exc() + except (BrokenPipeError, OSError): + # If printing fails, at least log the error type + try: + print(f"[FaceService] Error type: {type(e).__name__}: {str(e)}") + except (BrokenPipeError, OSError): + pass + + # Refresh database session after error to ensure it's in a good state + # This prevents session state issues from affecting subsequent photos + # Note: process_photo_faces already does db.rollback(), but we ensure + # the session is clean for the next photo + try: + # Ensure any partial transaction is rolled back (may be redundant but safe) + db.rollback() + # Expire the current photo object to clear any stale state + db.expire(photo) + except Exception as session_error: + # If session refresh fails, log but don't fail the batch + try: + print(f"[FaceService] Warning: Could not refresh session after error: {session_error}") + except (BrokenPipeError, OSError): + pass + + if update_progress: + try: + update_progress( + idx, + total, + f"Error: {photo.filename}", + total_faces_detected, + total_faces_stored, + ) + except KeyboardInterrupt: + # If cancellation detected during error progress update, stop + if check_cancelled(): + print(f"[FaceService] Job cancelled after error on photo {idx}/{total}") + break + + # Check for cancellation after handling error + if check_cancelled(): + print(f"[FaceService] Job cancelled after error on photo {idx}/{total}") + if update_progress: + try: + update_progress( + idx, + total, + "Cancelled by user - finished current photo", + total_faces_detected, + total_faces_stored, + ) + except KeyboardInterrupt: + pass + break + + except Exception as e: + # Catch any exceptions that occur outside the photo processing loop + # (e.g., during initialization, model loading, database queries, etc.) try: - # Update progress before processing each photo - if update_progress: + print(f"[FaceService] Fatal error in process_unprocessed_photos: {e}") + import traceback + traceback.print_exc() + except (BrokenPipeError, OSError): + pass + + # Try to update progress with error message + if update_progress: + try: update_progress( - idx - 1, - total, - f"Processing {photo.filename}... ({idx}/{total})", + photos_processed, + 0, + f"Fatal error: {str(e)}", total_faces_detected, total_faces_stored, ) - - # Time the first photo to see if there's still delay after pre-warming - if idx == 1: - first_photo_start = time.time() - print(f"[FaceService] Starting first photo processing...") - - # Process photo fully (including pose detection, DeepFace, and database commit) - # This ensures all data is complete before checking for cancellation - faces_detected, faces_stored = process_photo_faces( - db, - photo, - detector_backend=detector_backend, - model_name=model_name, - pose_detector=pose_detector, # Reuse the same detector for all photos - ) - - total_faces_detected += faces_detected - total_faces_stored += faces_stored - photos_processed += 1 - - # Log timing for first photo - if idx == 1: - first_photo_time = time.time() - first_photo_start - print(f"[FaceService] First photo completed in {first_photo_time:.2f}s") - - # Update progress to show completion (including pose detection) - # This happens AFTER the entire photo processing is complete - if update_progress: - try: - update_progress( - idx, - total, - f"Completed {photo.filename} ({idx}/{total})", - total_faces_detected, - total_faces_stored, - ) - except KeyboardInterrupt: - # If cancellation was detected during update_progress, check again - if check_cancelled(): - print(f"[FaceService] Job cancelled during progress update after photo {idx}/{total}") - # Raise KeyboardInterrupt to signal cancellation to the task handler - raise KeyboardInterrupt("Job cancelled by user after completing current photo") - # Re-raise if it wasn't a cancellation - raise - - # Check for cancellation AFTER updating progress (photo is fully complete) - # This ensures the entire photo processing is done (including pose detection and DB commit), - # and the progress shows "Completed", then stops before the next one - if check_cancelled(): - print(f"[FaceService] Job cancelled after completing photo {idx}/{total} (including pose detection)") - # Raise KeyboardInterrupt to signal cancellation to the task handler - raise KeyboardInterrupt("Job cancelled by user after completing current photo") - except KeyboardInterrupt: - # Cancellation was requested - stop processing gracefully - print(f"[FaceService] Job cancelled during processing of photo {idx}/{total}") - if check_cancelled(): - if update_progress: - try: - update_progress( - idx, - total, - "Cancelled by user", - total_faces_detected, - total_faces_stored, - ) - except KeyboardInterrupt: - pass - break - # Re-raise if it wasn't a cancellation - raise - except Exception as e: - # Log error but continue processing other photos - try: - print(f"[FaceService] Error processing photo {photo.filename}: {e}") - except (BrokenPipeError, OSError): - pass # Ignore broken pipe errors - - # Try to print traceback, but don't fail if stdout is closed - try: - import traceback - traceback.print_exc() - except (BrokenPipeError, OSError): - # If printing fails, at least log the error type - try: - print(f"[FaceService] Error type: {type(e).__name__}: {str(e)}") - except (BrokenPipeError, OSError): - pass - if update_progress: - try: - update_progress( - idx, - total, - f"Error: {photo.filename}", - total_faces_detected, - total_faces_stored, - ) - except KeyboardInterrupt: - # If cancellation detected during error progress update, stop - if check_cancelled(): - print(f"[FaceService] Job cancelled after error on photo {idx}/{total}") - break - - # Check for cancellation after handling error - if check_cancelled(): - print(f"[FaceService] Job cancelled after error on photo {idx}/{total}") - if update_progress: - try: - update_progress( - idx, - total, - "Cancelled by user - finished current photo", - total_faces_detected, - total_faces_stored, - ) - except KeyboardInterrupt: - pass - break + except Exception: + pass + + # Re-raise so the task handler can log it properly + # But we've already logged progress, so the batch can show partial results + raise return photos_processed, total_faces_detected, total_faces_stored diff --git a/backend/services/tasks.py b/backend/services/tasks.py index 1064ba3..1776692 100644 --- a/backend/services/tasks.py +++ b/backend/services/tasks.py @@ -127,34 +127,44 @@ def process_faces_task( faces_detected: int, faces_stored: int, ) -> None: - """Update job progress and check for cancellation.""" - if job: - # Check if job was cancelled - if job.meta and job.meta.get("cancelled", False): - return # Don't update if cancelled - - # Calculate progress: 10% for setup, 90% for processing - if total == 0: - # Setup phase - progress = min(10, processed * 2) # 0-10% during setup - else: - # Processing phase - progress = 10 + int((processed / total) * 90) if total > 0 else 10 - - job.meta = { - "progress": progress, - "message": f"Processing {current_file}... ({processed}/{total})" if total > 0 else current_file, - "processed": processed, - "total": total, - "faces_detected": faces_detected, - "faces_stored": faces_stored, - } - job.save_meta() - - # Check for cancellation after updating - if job.meta and job.meta.get("cancelled", False): - print(f"[Task] Job {job.id} cancellation detected") - raise KeyboardInterrupt("Job cancelled by user") + """Update job progress. + + NOTE: This function does NOT check for cancellation to avoid interrupting + photo processing. Cancellation is checked in process_unprocessed_photos + BEFORE starting each photo and AFTER completing it, ensuring the current + photo always finishes fully. + """ + try: + if job: + # Calculate progress: 10% for setup, 90% for processing + if total == 0: + # Setup phase + progress = min(10, processed * 2) # 0-10% during setup + else: + # Processing phase + progress = 10 + int((processed / total) * 90) if total > 0 else 10 + + # Avoid duplicating "Processing" if current_file already contains it + if current_file.startswith("Processing "): + message = f"{current_file} ({processed}/{total})" if total > 0 else current_file + else: + message = f"Processing {current_file}... ({processed}/{total})" if total > 0 else current_file + + job.meta = { + "progress": progress, + "message": message, + "processed": processed, + "total": total, + "faces_detected": faces_detected, + "faces_stored": faces_stored, + } + job.save_meta() + except Exception as e: + # Log but don't fail the batch if progress update fails + try: + print(f"[Task] Warning: Failed to update progress: {e}") + except (BrokenPipeError, OSError): + pass # Update progress - finding photos if job: @@ -169,15 +179,26 @@ def process_faces_task( job.save_meta() # Process faces - photos_processed, total_faces_detected, total_faces_stored = ( - process_unprocessed_photos( - db, - batch_size=batch_size, - detector_backend=detector_backend, - model_name=model_name, - update_progress=update_progress, + # Wrap in try-except to ensure we preserve progress even if process_unprocessed_photos fails + try: + photos_processed, total_faces_detected, total_faces_stored = ( + process_unprocessed_photos( + db, + batch_size=batch_size, + detector_backend=detector_backend, + model_name=model_name, + update_progress=update_progress, + ) ) - ) + except Exception as e: + # If process_unprocessed_photos fails, preserve any progress made + # and re-raise so the outer handler can log it properly + try: + print(f"[Task] Error in process_unprocessed_photos: {e}") + except (BrokenPipeError, OSError): + pass + # Re-raise to be handled by outer exception handler + raise # Final update result = { @@ -252,13 +273,15 @@ def process_faces_task( if job: try: + # Preserve progress information if available + existing_meta = job.meta or {} job.meta = { - "progress": 0, + "progress": existing_meta.get("progress", 0), "message": error_msg, - "processed": 0, - "total": 0, - "faces_detected": 0, - "faces_stored": 0, + "processed": existing_meta.get("processed", photos_processed), + "total": existing_meta.get("total", 0), + "faces_detected": existing_meta.get("faces_detected", total_faces_detected), + "faces_stored": existing_meta.get("faces_stored", total_faces_stored), } job.save_meta() except Exception: