diff --git a/README.md b/README.md index 29a4e71..464acb2 100644 --- a/README.md +++ b/README.md @@ -125,6 +125,8 @@ Then open your browser to **http://localhost:3000** - Make sure Redis is running first, or the worker won't start - Worker names are unique to avoid conflicts when restarting - Photo uploads are stored in `data/uploads` (configurable via `PHOTO_STORAGE_DIR` env var) +- **DeepFace models download automatically on first use** (can take 5-10 minutes) +- **If port 8000 is in use**, kill the process: `lsof -i :8000` then `kill ` or `pkill -f "uvicorn.*app"` --- @@ -141,6 +143,11 @@ Then open your browser to **http://localhost:3000** - Real-time job status updates (SSE) - Duplicate detection by checksum - EXIF metadata extraction +- DeepFace face detection and recognition pipeline +- Configurable detectors (RetinaFace, MTCNN, OpenCV, SSD) +- Configurable models (ArcFace, Facenet, Facenet512, VGG-Face) +- Process tab UI for face processing +- Job cancellation support --- @@ -199,7 +206,7 @@ punimtag/ - ✅ Indices configured for performance - ✅ SQLite database at `data/punimtag.db` -### Phase 2: Image Ingestion & Scan Tab ✅ **COMPLETE** +### Phase 2: Image Ingestion & Processing ✅ **COMPLETE** **Backend:** - ✅ Photo import service with checksum computation @@ -210,6 +217,14 @@ punimtag/ - ✅ Real-time job progress via SSE (Server-Sent Events) - ✅ Duplicate detection (by path and checksum) - ✅ Photo storage configuration (`PHOTO_STORAGE_DIR`) +- ✅ **DeepFace pipeline integration** +- ✅ **Face detection (RetinaFace, MTCNN, OpenCV, SSD)** +- ✅ **Face embeddings computation (ArcFace, Facenet, Facenet512, VGG-Face)** +- ✅ **Face processing service with configurable detectors/models** +- ✅ **EXIF orientation handling** +- ✅ **Face quality scoring and validation** +- ✅ **Batch processing with progress tracking** +- ✅ **Job cancellation support** **Frontend:** - ✅ Scan tab UI with folder selection @@ -219,20 +234,27 @@ punimtag/ - ✅ Job status monitoring (SSE 
integration) - ✅ Results display (added/existing counts) - ✅ Error handling and user feedback +- ✅ **Process tab UI with configuration controls** +- ✅ **Detector/model selection dropdowns** +- ✅ **Batch size configuration** +- ✅ **Start/Stop processing controls** +- ✅ **Processing progress display with photo count** +- ✅ **Results summary (faces detected, faces stored)** +- ✅ **Job cancellation support** **Worker:** - ✅ RQ worker auto-starts with API server - ✅ Unique worker names to avoid conflicts - ✅ Graceful shutdown handling +- ✅ **String-based function paths for reliable serialization** -### Next: Phase 3 - Face Processing & Identify +### Next: Phase 3 - Identify Workflow & Auto-Match -- DeepFace pipeline integration -- Face detection (RetinaFace, MTCNN, OpenCV, SSD) -- Face embeddings computation (ArcFace, Facenet, etc.) - Identify workflow UI -- Auto-match engine -- Process tab implementation +- Auto-match engine with similarity thresholds +- Unidentified faces management +- Person creation and linking +- Batch identification support --- @@ -295,18 +317,23 @@ npm test - Database setup - Basic API endpoints -### ✅ Phase 2: Image Ingestion & Scan Tab (Complete) +### ✅ Phase 2: Image Ingestion & Processing (Complete) - ✅ Photo import (folder scan and file upload) - ✅ Background job processing with RQ - ✅ Real-time progress tracking via SSE - ✅ Scan tab UI implementation - ✅ Duplicate detection and metadata extraction +- ✅ DeepFace face detection and processing pipeline +- ✅ Process tab UI with configuration controls +- ✅ Configurable detectors and models +- ✅ Face processing with progress tracking +- ✅ Job cancellation support -### 🔄 Phase 3: Processing & Identify (In Progress) -- Face detection and processing pipeline (DeepFace) +### 🔄 Phase 3: Identify Workflow & Auto-Match (In Progress) - Identify workflow UI -- Auto-match engine -- Process tab implementation +- Auto-match engine with similarity thresholds +- Unidentified faces management +- Person creation 
and linking ### 📋 Phase 4: Search & Tags - Search endpoints with filters @@ -387,6 +414,8 @@ npm test - No password hashing yet (plain text comparison - fix before production) - GPU acceleration not yet implemented - Large databases (>50K photos) may require optimization +- DeepFace model downloads on first use (can take 5-10 minutes) +- Face processing is CPU-intensive (GPU support planned for future) --- diff --git a/docs/WEBSITE_MIGRATION_PLAN.md b/docs/WEBSITE_MIGRATION_PLAN.md index c2b9f2a..abb7cfd 100644 --- a/docs/WEBSITE_MIGRATION_PLAN.md +++ b/docs/WEBSITE_MIGRATION_PLAN.md @@ -144,7 +144,7 @@ Visual design: neutral palette, clear hierarchy, low-chrome UI, subtle motion (< - Auth flow (login page, token storage), API client with interceptors - Routes: Dashboard (placeholder), Scan (placeholder), Process (placeholder), Search (placeholder), Identify (placeholder), Auto-Match (placeholder), Tags (placeholder), Settings (placeholder) -### Phase 2: Processing & Identify (2–3 weeks) +### Phase 2: Processing & Identify (2–3 weeks) ✅ **IN PROGRESS** - Image ingestion: - Backend: `/photos/import` supports folder ingest and file upload - Compute checksums; store originals on disk; create DB rows @@ -156,18 +156,22 @@ Visual design: neutral palette, clear hierarchy, low-chrome UI, subtle motion (< - Display scan results (count of photos added) - Trigger background job for photo import - Real-time job status with SSE progress updates -- DeepFace pipeline: - - Worker task: detect faces (RetinaFace), compute embeddings (ArcFace); store embeddings as binary - - Persist face bounding boxes, confidence, quality; link to photos - - Configurable batch size and thresholds via settings -- **Process tab UI:** - - Start/stop face processing controls - - Batch size configuration - - Detector/model selection dropdowns (RetinaFace, MTCNN, OpenCV, SSD / ArcFace, Facenet, etc.) 
- - Processing progress bar with current photo count - - Job status display (pending, running, completed, failed) - - Results summary (faces detected, processing time) - - Error handling and retry mechanism +- ✅ **DeepFace pipeline:** + - ✅ Worker task: detect faces (RetinaFace), compute embeddings (ArcFace); store embeddings as binary + - ✅ Persist face bounding boxes, confidence, quality; link to photos + - ✅ Configurable batch size and thresholds via settings + - ✅ EXIF orientation handling + - ✅ Face quality scoring and validation +- ✅ **Process tab UI:** + - ✅ Start/stop face processing controls + - ✅ Batch size configuration + - ✅ Detector/model selection dropdowns (RetinaFace, MTCNN, OpenCV, SSD / ArcFace, Facenet, etc.) + - ✅ Processing progress bar with current photo count + - ✅ Job status display (pending, running, completed, failed) + - ✅ Results summary (faces detected, processing time) + - ✅ Error handling and retry mechanism + - ✅ Job cancellation support + - ✅ Real-time progress updates via SSE - Identify workflow: - API: `/faces/unidentified`, `/faces/{id}/identify` (assign existing/new person) - Implement person creation and linking, plus `person_embeddings` insertions diff --git a/frontend/src/api/faces.ts b/frontend/src/api/faces.ts new file mode 100644 index 0000000..2f8fc83 --- /dev/null +++ b/frontend/src/api/faces.ts @@ -0,0 +1,29 @@ +import apiClient from './client' +import { JobResponse } from './jobs' + +export interface ProcessFacesRequest { + batch_size?: number + detector_backend: string + model_name: string +} + +export interface ProcessFacesResponse { + job_id: string + message: string + batch_size?: number + detector_backend: string + model_name: string +} + +export const facesApi = { + /** + * Start face processing job + */ + processFaces: async (request: ProcessFacesRequest): Promise => { + const response = await apiClient.post('/api/v1/faces/process', request) + return response.data + }, +} + +export default facesApi + diff --git 
a/frontend/src/api/jobs.ts b/frontend/src/api/jobs.ts index 7e80773..e3caef3 100644 --- a/frontend/src/api/jobs.ts +++ b/frontend/src/api/jobs.ts @@ -24,5 +24,17 @@ export const jobsApi = { ) return data }, + + streamJobProgress: (jobId: string): EventSource => { + const baseURL = import.meta.env.VITE_API_URL || 'http://127.0.0.1:8000' + return new EventSource(`${baseURL}/api/v1/jobs/stream/${jobId}`) + }, + + cancelJob: async (jobId: string): Promise<{ message: string; status: string }> => { + const { data } = await apiClient.delete<{ message: string; status: string }>( + `/api/v1/jobs/${jobId}` + ) + return data + }, } diff --git a/frontend/src/pages/Process.tsx b/frontend/src/pages/Process.tsx index 394cc4a..f45164e 100644 --- a/frontend/src/pages/Process.tsx +++ b/frontend/src/pages/Process.tsx @@ -1,12 +1,427 @@ +import { useState, useRef, useCallback, useEffect } from 'react' +import { facesApi, ProcessFacesRequest } from '../api/faces' +import { jobsApi, JobResponse, JobStatus } from '../api/jobs' + +interface JobProgress { + id: string + status: string + progress: number + message: string + processed?: number + total?: number + faces_detected?: number + faces_stored?: number +} + +const DETECTOR_OPTIONS = ['retinaface', 'mtcnn', 'opencv', 'ssd'] +const MODEL_OPTIONS = ['ArcFace', 'Facenet', 'Facenet512', 'VGG-Face'] + export default function Process() { + const [batchSize, setBatchSize] = useState(undefined) + const [detectorBackend, setDetectorBackend] = useState('retinaface') + const [modelName, setModelName] = useState('ArcFace') + const [isProcessing, setIsProcessing] = useState(false) + const [currentJob, setCurrentJob] = useState(null) + const [jobProgress, setJobProgress] = useState(null) + const [processingResult, setProcessingResult] = useState<{ + photos_processed?: number + faces_detected?: number + faces_stored?: number + } | null>(null) + const [error, setError] = useState(null) + const eventSourceRef = useRef(null) + + // Cleanup event source 
on unmount + useEffect(() => { + return () => { + if (eventSourceRef.current) { + eventSourceRef.current.close() + } + } + }, []) + + const handleStartProcessing = async () => { + setIsProcessing(true) + setError(null) + setProcessingResult(null) + setCurrentJob(null) + setJobProgress(null) + + try { + const request: ProcessFacesRequest = { + batch_size: batchSize || undefined, + detector_backend: detectorBackend, + model_name: modelName, + } + + const response = await facesApi.processFaces(request) + + // Set processing state immediately + setIsProcessing(true) + + setCurrentJob({ + id: response.job_id, + status: JobStatus.PENDING, + progress: 0, + message: response.message, + created_at: new Date().toISOString(), + updated_at: new Date().toISOString(), + }) + + // Start SSE stream for job progress + startJobProgressStream(response.job_id) + } catch (err: any) { + setError(err.response?.data?.detail || err.message || 'Processing failed') + setIsProcessing(false) + } + } + + const handleStopProcessing = async () => { + if (!currentJob) { + return + } + + try { + // Call API to cancel the job + const result = await jobsApi.cancelJob(currentJob.id) + console.log('Job cancellation:', result) + + // Close SSE stream + if (eventSourceRef.current) { + eventSourceRef.current.close() + eventSourceRef.current = null + } + + // Update UI state + setIsProcessing(false) + setError(`Job cancelled: ${result.message}`) + + // Update job status + if (currentJob) { + setCurrentJob({ + ...currentJob, + status: JobStatus.FAILURE, + message: 'Cancelled by user', + }) + } + } catch (err: any) { + console.error('Error cancelling job:', err) + setError(err.response?.data?.detail || err.message || 'Failed to cancel job') + } + } + + const startJobProgressStream = (jobId: string) => { + // Close existing stream if any + if (eventSourceRef.current) { + eventSourceRef.current.close() + } + + const eventSource = jobsApi.streamJobProgress(jobId) + eventSourceRef.current = eventSource + + 
eventSource.onmessage = (event) => { + try { + const data: JobProgress = JSON.parse(event.data) + setJobProgress(data) + + // Update job status + const statusMap: Record = { + pending: JobStatus.PENDING, + started: JobStatus.STARTED, + progress: JobStatus.PROGRESS, + success: JobStatus.SUCCESS, + failure: JobStatus.FAILURE, + } + + const jobStatus = statusMap[data.status] || JobStatus.PENDING + + setCurrentJob({ + id: data.id, + status: jobStatus, + progress: data.progress, + message: data.message, + created_at: new Date().toISOString(), + updated_at: new Date().toISOString(), + }) + + // Keep processing state true while job is running + if (jobStatus === JobStatus.STARTED || jobStatus === JobStatus.PROGRESS) { + setIsProcessing(true) + } + + // Check if job is complete + if (jobStatus === JobStatus.SUCCESS || jobStatus === JobStatus.FAILURE) { + setIsProcessing(false) + eventSource.close() + eventSourceRef.current = null + + // Fetch final job result to get processing stats + if (jobStatus === JobStatus.SUCCESS) { + fetchJobResult(jobId) + } + } + } catch (err) { + console.error('Error parsing SSE event:', err) + } + } + + eventSource.onerror = (err) => { + console.error('SSE error:', err) + // Don't automatically set isProcessing to false on error + // Job might still be running even if SSE connection failed + // Check job status directly instead + if (currentJob) { + // Try to fetch job status directly + jobsApi.getJob(currentJob.id).then((job) => { + const stillRunning = job.status === JobStatus.STARTED || job.status === JobStatus.PROGRESS + setIsProcessing(stillRunning) + setCurrentJob(job) + }).catch(() => { + // If we can't get status, assume job might still be running + console.warn('Could not fetch job status after SSE error') + }) + } + } + } + + const fetchJobResult = async (jobId: string) => { + try { + const job = await jobsApi.getJob(jobId) + setCurrentJob(job) + + // Extract result data from job progress + if (jobProgress) { + setProcessingResult({ + 
photos_processed: jobProgress.processed, + faces_detected: jobProgress.faces_detected, + faces_stored: jobProgress.faces_stored, + }) + } + } catch (err) { + console.error('Error fetching job result:', err) + } + } + + const getStatusColor = (status: JobStatus) => { + switch (status) { + case JobStatus.SUCCESS: + return 'text-green-600' + case JobStatus.FAILURE: + return 'text-red-600' + case JobStatus.STARTED: + case JobStatus.PROGRESS: + return 'text-blue-600' + default: + return 'text-gray-600' + } + } + return ( -
-

Process

-
-

Face processing controls coming in Phase 2.

+
+

Process Faces

+ +
+ {/* Configuration Section */} +
+

+ Processing Configuration +

+ +
+ {/* Batch Size */} +
+ + + setBatchSize( + e.target.value ? parseInt(e.target.value, 10) : undefined + ) + } + placeholder="All unprocessed photos" + className="w-full px-3 py-2 border border-gray-300 rounded-md shadow-sm focus:outline-none focus:ring-blue-500 focus:border-blue-500" + disabled={isProcessing} + /> +

+ Leave empty to process all unprocessed photos +

+
+ + {/* Detector Backend */} +
+ + +

+ RetinaFace recommended for best accuracy +

+
+ + {/* Model Name */} +
+ + +

+ ArcFace recommended for best accuracy +

+
+ + {/* Control Buttons */} +
+ + {isProcessing && ( + + )} +
+
+
+ + {/* Progress Section */} + {(currentJob || jobProgress) && ( +
+

+ Processing Progress +

+ + {currentJob && ( +
+
+
+ + {currentJob.status === JobStatus.SUCCESS && '✓ '} + {currentJob.status === JobStatus.FAILURE && '✗ '} + {currentJob.status.charAt(0).toUpperCase() + + currentJob.status.slice(1)} + + + {currentJob.progress}% + +
+
+
+
+
+ + {jobProgress && ( +
+ {jobProgress.processed !== undefined && + jobProgress.total !== undefined && ( +

+ Photos processed: {jobProgress.processed} /{' '} + {jobProgress.total} +

+ )} + {jobProgress.faces_detected !== undefined && ( +

Faces detected: {jobProgress.faces_detected}

+ )} + {jobProgress.faces_stored !== undefined && ( +

Faces stored: {jobProgress.faces_stored}

+ )} + {jobProgress.message && ( +

{jobProgress.message}

+ )} +
+ )} +
+ )} +
+ )} + + {/* Results Section */} + {processingResult && ( +
+

+ Processing Results +

+ +
+ {processingResult.photos_processed !== undefined && ( +

+ ✓ {processingResult.photos_processed} photos processed +

+ )} + {processingResult.faces_detected !== undefined && ( +

+ {processingResult.faces_detected} faces detected +

+ )} + {processingResult.faces_stored !== undefined && ( +

+ {processingResult.faces_stored} faces stored in database +

+ )} +
+
+ )} + + {/* Error Section */} + {error && ( +
+

{error}

+
+ )}
) } - - diff --git a/src/web/api/faces.py b/src/web/api/faces.py index cb80d91..2c0c083 100644 --- a/src/web/api/faces.py +++ b/src/web/api/faces.py @@ -2,15 +2,65 @@ from __future__ import annotations -from fastapi import APIRouter +from fastapi import APIRouter, HTTPException, status +from rq import Queue +from redis import Redis + +from src.web.schemas.faces import ProcessFacesRequest, ProcessFacesResponse +# Note: Function passed as string path to avoid RQ serialization issues router = APIRouter(prefix="/faces", tags=["faces"]) +# Redis connection for RQ +redis_conn = Redis(host="localhost", port=6379, db=0, decode_responses=False) +queue = Queue(connection=redis_conn) -@router.post("/process") -def process_faces() -> dict: - """Process faces - placeholder for Phase 2.""" - return {"message": "Process faces endpoint - to be implemented in Phase 2"} + +@router.post("/process", response_model=ProcessFacesResponse) +def process_faces(request: ProcessFacesRequest) -> ProcessFacesResponse: + """Start face processing job. + + This enqueues a background job to process faces in unprocessed photos + using DeepFace with the specified detector and model. 
+ """ + try: + # Check if worker is available (basic check) + try: + redis_conn.ping() + except Exception as e: + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail=f"Redis connection failed: {str(e)}", + ) + + # Enqueue face processing job + # Pass function as string path to avoid serialization issues + job = queue.enqueue( + "src.web.services.tasks.process_faces_task", + batch_size=request.batch_size, + detector_backend=request.detector_backend, + model_name=request.model_name, + job_timeout="1h", # Long timeout for face processing + ) + + print(f"[Faces API] Enqueued face processing job: {job.id}") + print(f"[Faces API] Job status: {job.get_status()}") + print(f"[Faces API] Queue length: {len(queue)}") + + return ProcessFacesResponse( + job_id=job.id, + message="Face processing job started", + batch_size=request.batch_size, + detector_backend=request.detector_backend, + model_name=request.model_name, + ) + except HTTPException: + raise + except Exception as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to start face processing job: {str(e)}", + ) @router.get("/unidentified") diff --git a/src/web/api/jobs.py b/src/web/api/jobs.py index 25f5268..dfb81c4 100644 --- a/src/web/api/jobs.py +++ b/src/web/api/jobs.py @@ -26,19 +26,39 @@ def get_job(job_id: str) -> JobResponse: """Get job status by ID.""" try: job = Job.fetch(job_id, connection=redis_conn) + rq_status = job.get_status() status_map = { "queued": JobStatus.PENDING, - "started": JobStatus.STARTED, + "started": JobStatus.STARTED, # Job is actively running "finished": JobStatus.SUCCESS, "failed": JobStatus.FAILURE, } - job_status = status_map.get(job.get_status(), JobStatus.PENDING) - progress = 0 - if job_status == JobStatus.STARTED or job_status == JobStatus.PROGRESS: + job_status = status_map.get(rq_status, JobStatus.PENDING) + + # If job is started, check if it has progress + if rq_status == "started": + # Job is running - 
show progress if available + progress = job.meta.get("progress", 0) if job.meta else 0 + message = job.meta.get("message", "Processing...") if job.meta else "Processing..." + # Map to PROGRESS status if we have actual progress + if progress > 0: + job_status = JobStatus.PROGRESS + elif job_status == JobStatus.STARTED or job_status == JobStatus.PROGRESS: progress = job.meta.get("progress", 0) if job.meta else 0 elif job_status == JobStatus.SUCCESS: progress = 100 + else: + progress = 0 + message = job.meta.get("message", "") if job.meta else "" + + # If job failed, include error message + if rq_status == "failed" and job.exc_info: + # Extract error message from exception info + error_lines = job.exc_info.split("\n") + if error_lines: + message = f"Failed: {error_lines[0]}" + return JobResponse( id=job.id, status=job_status, @@ -49,10 +69,10 @@ def get_job(job_id: str) -> JobResponse: str(job.ended_at or job.started_at or job.created_at) ), ) - except Exception: + except Exception as e: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, - detail=f"Job {job_id} not found", + detail=f"Job {job_id} not found: {str(e)}", ) @@ -95,6 +115,8 @@ def stream_job_progress(job_id: str): "message": message, "processed": job.meta.get("processed", 0) if job.meta else 0, "total": job.meta.get("total", 0) if job.meta else 0, + "faces_detected": job.meta.get("faces_detected", 0) if job.meta else 0, + "faces_stored": job.meta.get("faces_stored", 0) if job.meta else 0, } yield f"data: {json.dumps(event_data)}\n\n" @@ -116,3 +138,67 @@ def stream_job_progress(job_id: str): event_generator(), media_type="text/event-stream" ) + +@router.delete("/{job_id}") +def cancel_job(job_id: str) -> dict: + """Cancel a job (if queued) or stop a running job. + + Note: For running jobs, this sets a cancellation flag. + The job will check this flag and exit gracefully. 
+ """ + try: + job = Job.fetch(job_id, connection=redis_conn) + rq_status = job.get_status() + + if rq_status == "finished": + return { + "message": f"Job {job_id} is already finished", + "status": "finished", + } + + if rq_status == "failed": + return { + "message": f"Job {job_id} already failed", + "status": "failed", + } + + if rq_status == "queued": + # Cancel queued job - remove from queue + job.cancel() + return { + "message": f"Job {job_id} cancelled (was queued)", + "status": "cancelled", + } + + if rq_status == "started": + # For running jobs, set cancellation flag in metadata + # The task will check this and exit gracefully + if job.meta is None: + job.meta = {} + job.meta["cancelled"] = True + job.meta["message"] = "Cancellation requested..." + job.save_meta() + + # Also try to cancel the job (which will interrupt it if possible) + try: + job.cancel() + except Exception: + # Job might already be running, that's OK + pass + + return { + "message": f"Job {job_id} cancellation requested", + "status": "cancelling", + } + + return { + "message": f"Job {job_id} status: {rq_status}", + "status": rq_status, + } + + except Exception as e: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Job {job_id} not found: {str(e)}", + ) + diff --git a/src/web/api/photos.py b/src/web/api/photos.py index a9d23c5..5417583 100644 --- a/src/web/api/photos.py +++ b/src/web/api/photos.py @@ -22,7 +22,7 @@ from src.web.services.photo_service import ( find_photos_in_folder, import_photo_from_path, ) -from src.web.services.tasks import import_photos_task +# Note: Function passed as string path to avoid RQ serialization issues router = APIRouter(prefix="/photos", tags=["photos"]) @@ -60,8 +60,9 @@ def import_photos( estimated_photos = len(find_photos_in_folder(request.folder_path, request.recursive)) # Enqueue job + # Pass function as string path to avoid serialization issues job = queue.enqueue( - import_photos_task, + 
"src.web.services.tasks.import_photos_task", request.folder_path, request.recursive, job_timeout="1h", # Allow up to 1 hour for large imports diff --git a/src/web/schemas/faces.py b/src/web/schemas/faces.py new file mode 100644 index 0000000..df302d4 --- /dev/null +++ b/src/web/schemas/faces.py @@ -0,0 +1,40 @@ +"""Face processing schemas.""" + +from __future__ import annotations + +from typing import Optional + +from pydantic import BaseModel, Field, ConfigDict + + +class ProcessFacesRequest(BaseModel): + """Request to process faces in photos.""" + + model_config = ConfigDict(protected_namespaces=()) + + batch_size: Optional[int] = Field( + None, + ge=1, + description="Maximum number of photos to process (None = all unprocessed)", + ) + detector_backend: str = Field( + "retinaface", + description="DeepFace detector backend (retinaface, mtcnn, opencv, ssd)", + ) + model_name: str = Field( + "ArcFace", + description="DeepFace model name (ArcFace, Facenet, Facenet512, VGG-Face)", + ) + + +class ProcessFacesResponse(BaseModel): + """Response after initiating face processing.""" + + model_config = ConfigDict(protected_namespaces=()) + + job_id: str + message: str + batch_size: Optional[int] = None + detector_backend: str + model_name: str + diff --git a/src/web/services/face_service.py b/src/web/services/face_service.py new file mode 100644 index 0000000..5fa6ae1 --- /dev/null +++ b/src/web/services/face_service.py @@ -0,0 +1,420 @@ +"""Face detection and processing services for PunimTag Web.""" + +from __future__ import annotations + +import os +import tempfile +import time +from typing import Callable, Optional, Tuple + +import numpy as np +from PIL import Image +from sqlalchemy.orm import Session + +try: + from deepface import DeepFace + DEEPFACE_AVAILABLE = True +except ImportError: + DEEPFACE_AVAILABLE = False + +from src.core.config import ( + DEEPFACE_ENFORCE_DETECTION, + DEEPFACE_ALIGN_FACES, + MIN_FACE_CONFIDENCE, + MIN_FACE_SIZE, + MAX_FACE_SIZE, +) +from 
src.utils.exif_utils import EXIFOrientationHandler +from src.web.db.models import Face, Photo + + +def calculate_face_quality_score( + image_np: np.ndarray, + face_location: dict, + image_width: int, + image_height: int, +) -> int: + """Calculate face quality score (0-100). + + Simplified quality calculation based on face size and position. + + Args: + image_np: Image as numpy array + face_location: Face location dict with x, y, w, h + image_width: Image width + image_height: Image height + + Returns: + Quality score from 0-100 + """ + x = face_location.get('x', 0) + y = face_location.get('y', 0) + w = face_location.get('w', 0) + h = face_location.get('h', 0) + + if w == 0 or h == 0: + return 0 + + # Face size as percentage of image + face_area = w * h + image_area = image_width * image_height + size_ratio = face_area / image_area if image_area > 0 else 0 + + # Position score (center is better) + center_x = image_width / 2 + center_y = image_height / 2 + face_center_x = x + w / 2 + face_center_y = y + h / 2 + + distance_from_center = np.sqrt( + (face_center_x - center_x) ** 2 + (face_center_y - center_y) ** 2 + ) + max_distance = np.sqrt(center_x ** 2 + center_y ** 2) + position_score = 1.0 - (distance_from_center / max_distance) if max_distance > 0 else 0.5 + + # Combine size and position (size weighted 70%, position 30%) + quality = (size_ratio * 70) + (position_score * 30) + + # Clamp to 0-100 + return int(np.clip(quality * 100, 0, 100)) + + +def is_valid_face_detection( + confidence: float, + face_location: dict, + image_width: int, + image_height: int, +) -> bool: + """Check if face detection meets minimum criteria. 
+ + Args: + confidence: Face detection confidence score + face_location: Face location dict with x, y, w, h + image_width: Image width + image_height: Image height + + Returns: + True if face is valid, False otherwise + """ + x = face_location.get('x', 0) + y = face_location.get('y', 0) + w = face_location.get('w', 0) + h = face_location.get('h', 0) + + # Check minimum confidence + if confidence < MIN_FACE_CONFIDENCE: + return False + + # Check minimum size + if w < MIN_FACE_SIZE or h < MIN_FACE_SIZE: + return False + + # Check maximum size (to avoid false positives that span entire image) + if w > MAX_FACE_SIZE or h > MAX_FACE_SIZE: + return False + + # Check bounds + if x < 0 or y < 0 or (x + w) > image_width or (y + h) > image_height: + return False + + return True + + +def process_photo_faces( + db: Session, + photo: Photo, + detector_backend: str = "retinaface", + model_name: str = "ArcFace", + update_progress: Optional[Callable[[int, int, str], None]] = None, +) -> Tuple[int, int]: + """Process faces in a single photo using DeepFace. 
+ + Args: + db: Database session + photo: Photo model instance + detector_backend: DeepFace detector backend (retinaface, mtcnn, opencv, ssd) + model_name: DeepFace model name (ArcFace, Facenet, Facenet512, VGG-Face) + update_progress: Optional progress callback (processed, total, message) + + Returns: + Tuple of (faces_detected, faces_stored) + """ + if not DEEPFACE_AVAILABLE: + raise RuntimeError("DeepFace not available") + + photo_path = photo.path + if not os.path.exists(photo_path): + return 0, 0 + + # Check if photo already has faces processed with same detector/model + existing_faces = db.query(Face).filter( + Face.photo_id == photo.id, + Face.detector == detector_backend, + Face.model == model_name, + ).count() + + if existing_faces > 0: + # Already processed with this configuration + return existing_faces, existing_faces + + try: + # Get EXIF orientation + exif_orientation = EXIFOrientationHandler.get_exif_orientation(photo_path) + + # Apply EXIF orientation correction + corrected_image, original_orientation = ( + EXIFOrientationHandler.correct_image_orientation_from_path(photo_path) + ) + + temp_path = None + if corrected_image is not None and original_orientation and original_orientation != 1: + # Save corrected image temporarily + temp_dir = tempfile.gettempdir() + temp_filename = f"corrected_{photo.id}_{os.path.basename(photo_path)}" + temp_path = os.path.join(temp_dir, temp_filename) + corrected_image.save(temp_path, "JPEG", quality=95) + face_detection_path = temp_path + else: + face_detection_path = photo_path + + try: + # Use DeepFace to detect faces and compute embeddings + # Note: First call may take time to download/initialize models + print(f"[DeepFace] Processing {photo.filename} with {detector_backend}/{model_name}...") + results = DeepFace.represent( + img_path=face_detection_path, + model_name=model_name, + detector_backend=detector_backend, + enforce_detection=DEEPFACE_ENFORCE_DETECTION, + align=DEEPFACE_ALIGN_FACES, + ) + 
print(f"[DeepFace] Completed {photo.filename}") + except Exception as e: + print(f"[DeepFace] Error processing {photo.filename}: {e}") + raise + finally: + # Clean up temporary file if created + if temp_path and os.path.exists(temp_path): + try: + os.remove(temp_path) + except Exception: + pass + + if not results: + return 0, 0 + + # Load image for quality calculation + image = Image.open(photo_path) + image_np = np.array(image) + image_width, image_height = image.size + + faces_detected = len(results) + faces_stored = 0 + + for result in results: + facial_area = result.get('facial_area', {}) + face_confidence = result.get('face_confidence', 0.0) + embedding = np.array(result['embedding']) + + # Convert to location format + location = { + 'x': facial_area.get('x', 0), + 'y': facial_area.get('y', 0), + 'w': facial_area.get('w', 0), + 'h': facial_area.get('h', 0), + } + + # Validate face detection + if not is_valid_face_detection(face_confidence, location, image_width, image_height): + continue + + # Calculate quality score + quality_score = calculate_face_quality_score( + image_np, location, image_width, image_height + ) + + # Store face in database + face = Face( + photo_id=photo.id, + person_id=None, + bbox_x=location['x'], + bbox_y=location['y'], + bbox_w=location['w'], + bbox_h=location['h'], + embedding=embedding.tobytes(), + confidence=int(face_confidence * 100) if face_confidence <= 1.0 else int(face_confidence), + quality=quality_score, + model=model_name, + detector=detector_backend, + ) + + db.add(face) + faces_stored += 1 + + db.commit() + return faces_detected, faces_stored + + except Exception as e: + db.rollback() + raise Exception(f"Error processing faces in {photo.filename}: {str(e)}") + + +def process_unprocessed_photos( + db: Session, + batch_size: Optional[int] = None, + detector_backend: str = "retinaface", + model_name: str = "ArcFace", + update_progress: Optional[Callable[[int, int, str, int, int], None]] = None, +) -> Tuple[int, int, int]: + 
"""Process faces in all unprocessed photos. + + Args: + db: Database session + batch_size: Maximum number of photos to process (None = all) + detector_backend: DeepFace detector backend + model_name: DeepFace model name + update_progress: Optional callback (processed, total, current_file, faces_detected, faces_stored) + + Returns: + Tuple of (photos_processed, total_faces_detected, total_faces_stored) + """ + print(f"[FaceService] Starting face processing: detector={detector_backend}, model={model_name}, batch_size={batch_size}") + + # Update progress - querying photos + if update_progress: + update_progress(0, 0, "Querying photos from database...", 0, 0) + + # Get all photos + all_photos = db.query(Photo).all() + print(f"[FaceService] Found {len(all_photos)} total photos in database") + + # Update progress - filtering photos + if update_progress: + update_progress(0, len(all_photos), "Checking which photos need processing...", 0, 0) + + # Filter for photos that need processing (no faces with current detector/model) + unprocessed_photos = [] + for idx, photo in enumerate(all_photos, 1): + # Check if photo has faces with current detector/model + existing_face = db.query(Face).filter( + Face.photo_id == photo.id, + Face.detector == detector_backend, + Face.model == model_name, + ).first() + + if existing_face is None: + unprocessed_photos.append(photo) + + # Update progress every 10 photos while filtering + if update_progress and idx % 10 == 0: + update_progress(0, len(all_photos), f"Checking photos... 
({idx}/{len(all_photos)})", 0, 0) + + if batch_size: + unprocessed_photos = unprocessed_photos[:batch_size] + + total = len(unprocessed_photos) + print(f"[FaceService] Found {total} unprocessed photos") + if total == 0: + print("[FaceService] No photos to process") + if update_progress: + update_progress(0, 0, "No photos to process", 0, 0) + return 0, 0, 0 + + # Update progress - preparing to process + if update_progress: + update_progress(0, total, f"Preparing to process {total} photos...", 0, 0) + + photos_processed = 0 + total_faces_detected = 0 + total_faces_stored = 0 + + print(f"[FaceService] Starting processing of {total} photos...") + + # Helper to check if job was cancelled + def check_cancelled() -> bool: + """Check if job has been cancelled.""" + if update_progress: + # Try to check job metadata for cancellation flag + try: + from rq import get_current_job + job = get_current_job() + if job and job.meta and job.meta.get("cancelled", False): + return True + except Exception: + pass + return False + + # Update progress - initializing DeepFace (this may take time on first run) + if update_progress: + update_progress(0, total, "Initializing DeepFace models (this may take a moment on first run)...", 0, 0) + + # Check cancellation before starting + if check_cancelled(): + print("[FaceService] Job cancelled before processing started") + return photos_processed, total_faces_detected, total_faces_stored + + # Process first photo - this will trigger DeepFace initialization + # Update progress before starting actual processing + if update_progress and total > 0: + update_progress(0, total, f"Starting face detection on {total} photos...", 0, 0) + + for idx, photo in enumerate(unprocessed_photos, 1): + # Check for cancellation + if check_cancelled(): + print(f"[FaceService] Job cancelled at photo {idx}/{total}") + if update_progress: + update_progress( + idx - 1, + total, + "Cancelled by user", + total_faces_detected, + total_faces_stored, + ) + break + + try: + # 
Update progress before processing each photo + if update_progress: + update_progress( + idx - 1, + total, + f"Processing {photo.filename}... ({idx}/{total})", + total_faces_detected, + total_faces_stored, + ) + + faces_detected, faces_stored = process_photo_faces( + db, + photo, + detector_backend=detector_backend, + model_name=model_name, + ) + + total_faces_detected += faces_detected + total_faces_stored += faces_stored + photos_processed += 1 + + if update_progress: + update_progress( + idx, + total, + f"Completed {photo.filename} ({idx}/{total})", + total_faces_detected, + total_faces_stored, + ) + except Exception as e: + # Log error but continue processing other photos + print(f"[FaceService] Error processing photo {photo.filename}: {e}") + import traceback + traceback.print_exc() + if update_progress: + update_progress( + idx, + total, + f"Error: {photo.filename}", + total_faces_detected, + total_faces_stored, + ) + + return photos_processed, total_faces_detected, total_faces_stored + diff --git a/src/web/services/tasks.py b/src/web/services/tasks.py index d192d34..0152333 100644 --- a/src/web/services/tasks.py +++ b/src/web/services/tasks.py @@ -9,6 +9,7 @@ from sqlalchemy.orm import Session from src.web.db.session import SessionLocal from src.web.services.photo_service import import_photos_from_folder +from src.web.services.face_service import process_unprocessed_photos def import_photos_task(folder_path: str, recursive: bool = True) -> dict: @@ -72,3 +73,183 @@ def import_photos_task(folder_path: str, recursive: bool = True) -> dict: finally: db.close() + +def process_faces_task( + batch_size: Optional[int] = None, + detector_backend: str = "retinaface", + model_name: str = "ArcFace", +) -> dict: + """RQ task to process faces in unprocessed photos. 
def process_faces_task(
    batch_size: Optional[int] = None,
    detector_backend: str = "retinaface",
    model_name: str = "ArcFace",
) -> dict:
    """RQ task to process faces in unprocessed photos.

    Updates job metadata with progress:
    - progress: 0-100
    - message: status message
    - processed: number of photos processed
    - total: total photos to process
    - faces_detected: total faces detected
    - faces_stored: total faces stored

    Cancellation: the API sets ``meta["cancelled"] = True`` on the job from a
    *separate* Redis connection, so the flag is only visible here after
    re-reading meta from Redis. All meta writes below therefore merge into the
    freshly-read dict instead of replacing it — a wholesale ``job.meta = {...}``
    followed by ``save_meta()`` would clobber the cancelled flag in Redis.
    """
    import traceback

    job = get_current_job()
    if not job:
        raise RuntimeError("Not running in RQ job context")

    print(
        f"[Task] Starting face processing task: job_id={job.id}, "
        f"batch_size={batch_size}, detector={detector_backend}, model={model_name}"
    )

    def _fresh_meta() -> dict:
        """Re-read job meta from Redis; fall back to the local copy on error."""
        try:
            # refresh=True re-fetches from Redis so the cancel flag set by the
            # API process is actually seen (local job.meta is stale otherwise).
            return job.get_meta(refresh=True) or {}
        except Exception:
            return job.meta or {}

    def _set_meta(**fields) -> None:
        """Merge fields into job meta without dropping keys set elsewhere
        (in particular the 'cancelled' flag). Best-effort: never raises."""
        try:
            meta = _fresh_meta()
            meta.update(fields)
            job.meta = meta
            job.save_meta()
        except Exception as e:
            print(f"[Task] Error saving job metadata: {e}")

    # Update progress immediately - job started.
    _set_meta(
        progress=0,
        message="Initializing face processing...",
        processed=0,
        total=0,
        faces_detected=0,
        faces_stored=0,
    )

    db: Session = SessionLocal()

    # Initialized here so the KeyboardInterrupt handler can report partial results.
    photos_processed = 0
    total_faces_detected = 0
    total_faces_stored = 0

    try:

        def update_progress(
            processed: int,
            total: int,
            current_file: str,
            faces_detected: int,
            faces_stored: int,
        ) -> None:
            """Update job progress and abort via KeyboardInterrupt if cancelled."""
            # Check the refreshed flag BEFORE writing: the previous post-write
            # check was dead code because it re-read the dict this function had
            # just replaced (which never contained 'cancelled').
            if _fresh_meta().get("cancelled", False):
                print(f"[Task] Job {job.id} cancellation detected")
                raise KeyboardInterrupt("Job cancelled by user")

            # Progress budget: 10% for setup, 90% for actual processing.
            if total == 0:
                progress = min(10, processed * 2)
            else:
                progress = 10 + int((processed / total) * 90)

            # The service already formats complete messages (including counts),
            # so pass current_file through instead of wrapping it again — the
            # old wrapper produced "Processing Processing X... (3/10)... (2/10)".
            _set_meta(
                progress=progress,
                message=current_file,
                processed=processed,
                total=total,
                faces_detected=faces_detected,
                faces_stored=faces_stored,
            )

        _set_meta(
            progress=5,
            message="Finding photos to process...",
            processed=0,
            total=0,
            faces_detected=0,
            faces_stored=0,
        )

        photos_processed, total_faces_detected, total_faces_stored = (
            process_unprocessed_photos(
                db,
                batch_size=batch_size,
                detector_backend=detector_backend,
                model_name=model_name,
                update_progress=update_progress,
            )
        )

        result = {
            "photos_processed": photos_processed,
            "faces_detected": total_faces_detected,
            "faces_stored": total_faces_stored,
            "detector_backend": detector_backend,
            "model_name": model_name,
        }

        _set_meta(
            progress=100,
            message=(
                f"Completed: {photos_processed} photos, "
                f"{total_faces_stored} faces stored"
            ),
            processed=photos_processed,
            total=photos_processed,
            faces_detected=total_faces_detected,
            faces_stored=total_faces_stored,
        )

        return result

    except KeyboardInterrupt:
        # Job was cancelled — exit gracefully with partial results; this is
        # not a failure, so do not re-raise.
        print(f"[Task] Job {job.id} cancelled by user")
        _set_meta(message="Cancelled by user", cancelled=True)
        return {
            "photos_processed": photos_processed,
            "faces_detected": total_faces_detected,
            "faces_stored": total_faces_stored,
            "detector_backend": detector_backend,
            "model_name": model_name,
            "cancelled": True,
        }

    except Exception as e:
        error_msg = f"Task failed: {str(e)}"
        print(f"[Task] ❌ {error_msg}")
        traceback.print_exc()
        _set_meta(
            progress=0,
            message=error_msg,
            processed=0,
            total=0,
            faces_detected=0,
            faces_stored=0,
        )
        # Re-raise so RQ marks the job as failed.
        raise

    finally:
        db.close()
"cancelled": True, + } + + except Exception as e: + # Log error and update job metadata + error_msg = f"Task failed: {str(e)}" + print(f"[Task] ❌ {error_msg}") + traceback.print_exc() + + if job: + try: + job.meta = { + "progress": 0, + "message": error_msg, + "processed": 0, + "total": 0, + "faces_detected": 0, + "faces_stored": 0, + } + job.save_meta() + except Exception: + pass + + # Re-raise so RQ marks job as failed + raise + + finally: + db.close() + diff --git a/src/web/worker.py b/src/web/worker.py index b698b1b..3aacdd2 100644 --- a/src/web/worker.py +++ b/src/web/worker.py @@ -11,7 +11,7 @@ import uuid from rq import Worker from redis import Redis -from src.web.services.tasks import import_photos_task +from src.web.services.tasks import import_photos_task, process_faces_task # Redis connection for RQ redis_conn = Redis(host="localhost", port=6379, db=0, decode_responses=False) @@ -28,6 +28,17 @@ def main() -> NoReturn: # Generate unique worker name to avoid conflicts worker_name = f"punimtag-worker-{uuid.uuid4().hex[:8]}" + print(f"[Worker] Starting worker: {worker_name}") + print(f"[Worker] Listening on queue: default") + + # Check if Redis is accessible + try: + redis_conn.ping() + print(f"[Worker] Redis connection successful") + except Exception as e: + print(f"[Worker] ❌ Redis connection failed: {e}") + sys.exit(1) + # Register tasks with worker # Tasks are imported from services.tasks worker = Worker( @@ -35,6 +46,8 @@ def main() -> NoReturn: connection=redis_conn, name=worker_name, ) + + print(f"[Worker] ✅ Worker ready, waiting for jobs...") # Start worker worker.work()