feat: Integrate DeepFace for face processing with configurable options

This commit introduces the DeepFace integration for face processing, allowing users to configure detector backends and models through the new Process tab in the GUI. Key features include batch processing, job cancellation support, and real-time progress tracking. The README has been updated to reflect these enhancements, including notes on automatic model downloads and CPU-intensive processing. The API has also been expanded with job management endpoints for face processing tasks.
tanyar09 2025-10-31 14:06:40 -04:00
parent 2f039a1d48
commit dd92d1ec14
12 changed files with 1325 additions and 45 deletions

View File

@@ -125,6 +125,8 @@ Then open your browser to **http://localhost:3000**
- Make sure Redis is running first, or the worker won't start
- Worker names are unique to avoid conflicts when restarting
- Photo uploads are stored in `data/uploads` (configurable via `PHOTO_STORAGE_DIR` env var)
- **DeepFace models download automatically on first use** (can take 5-10 minutes)
- **If port 8000 is in use**, kill the process: `lsof -i :8000` then `kill <PID>` or `pkill -f "uvicorn.*app"`
---
@@ -141,6 +143,11 @@ Then open your browser to **http://localhost:3000**
- Real-time job status updates (SSE)
- Duplicate detection by checksum
- EXIF metadata extraction
- DeepFace face detection and recognition pipeline
- Configurable detectors (RetinaFace, MTCNN, OpenCV, SSD)
- Configurable models (ArcFace, Facenet, Facenet512, VGG-Face)
- Process tab UI for face processing
- Job cancellation support
---
@@ -199,7 +206,7 @@ punimtag/
- ✅ Indices configured for performance
- ✅ SQLite database at `data/punimtag.db`
### Phase 2: Image Ingestion & Scan Tab ✅ **COMPLETE**
### Phase 2: Image Ingestion & Processing ✅ **COMPLETE**
**Backend:**
- ✅ Photo import service with checksum computation
@@ -210,6 +217,14 @@ punimtag/
- ✅ Real-time job progress via SSE (Server-Sent Events)
- ✅ Duplicate detection (by path and checksum)
- ✅ Photo storage configuration (`PHOTO_STORAGE_DIR`)
- ✅ **DeepFace pipeline integration**
- ✅ **Face detection (RetinaFace, MTCNN, OpenCV, SSD)**
- ✅ **Face embeddings computation (ArcFace, Facenet, Facenet512, VGG-Face)**
- ✅ **Face processing service with configurable detectors/models**
- ✅ **EXIF orientation handling**
- ✅ **Face quality scoring and validation**
- ✅ **Batch processing with progress tracking**
- ✅ **Job cancellation support**
**Frontend:**
- ✅ Scan tab UI with folder selection
@@ -219,20 +234,27 @@ punimtag/
- ✅ Job status monitoring (SSE integration)
- ✅ Results display (added/existing counts)
- ✅ Error handling and user feedback
- ✅ **Process tab UI with configuration controls**
- ✅ **Detector/model selection dropdowns**
- ✅ **Batch size configuration**
- ✅ **Start/Stop processing controls**
- ✅ **Processing progress display with photo count**
- ✅ **Results summary (faces detected, faces stored)**
- ✅ **Job cancellation support**
**Worker:**
- ✅ RQ worker auto-starts with API server
- ✅ Unique worker names to avoid conflicts
- ✅ Graceful shutdown handling
- ✅ **String-based function paths for reliable serialization**
### Next: Phase 3 - Face Processing & Identify
### Next: Phase 3 - Identify Workflow & Auto-Match
- DeepFace pipeline integration
- Face detection (RetinaFace, MTCNN, OpenCV, SSD)
- Face embeddings computation (ArcFace, Facenet, etc.)
- Identify workflow UI
- Auto-match engine
- Process tab implementation
- Auto-match engine with similarity thresholds
- Unidentified faces management
- Person creation and linking
- Batch identification support
---
@@ -295,18 +317,23 @@ npm test
- Database setup
- Basic API endpoints
### ✅ Phase 2: Image Ingestion & Scan Tab (Complete)
### ✅ Phase 2: Image Ingestion & Processing (Complete)
- ✅ Photo import (folder scan and file upload)
- ✅ Background job processing with RQ
- ✅ Real-time progress tracking via SSE
- ✅ Scan tab UI implementation
- ✅ Duplicate detection and metadata extraction
- ✅ DeepFace face detection and processing pipeline
- ✅ Process tab UI with configuration controls
- ✅ Configurable detectors and models
- ✅ Face processing with progress tracking
- ✅ Job cancellation support
### 🔄 Phase 3: Processing & Identify (In Progress)
- Face detection and processing pipeline (DeepFace)
### 🔄 Phase 3: Identify Workflow & Auto-Match (In Progress)
- Identify workflow UI
- Auto-match engine
- Process tab implementation
- Auto-match engine with similarity thresholds
- Unidentified faces management
- Person creation and linking
### 📋 Phase 4: Search & Tags
- Search endpoints with filters
@@ -387,6 +414,8 @@ npm test
- No password hashing yet (plain text comparison - fix before production)
- GPU acceleration not yet implemented
- Large databases (>50K photos) may require optimization
- DeepFace model downloads on first use (can take 5-10 minutes)
- Face processing is CPU-intensive (GPU support planned for future)
---

View File

@@ -144,7 +144,7 @@ Visual design: neutral palette, clear hierarchy, low-chrome UI, subtle motion (<
- Auth flow (login page, token storage), API client with interceptors
- Routes: Dashboard (placeholder), Scan (placeholder), Process (placeholder), Search (placeholder), Identify (placeholder), Auto-Match (placeholder), Tags (placeholder), Settings (placeholder)
### Phase 2: Processing & Identify (2–3 weeks)
### Phase 2: Processing & Identify (2–3 weeks) ✅ **IN PROGRESS**
- Image ingestion:
- Backend: `/photos/import` supports folder ingest and file upload
- Compute checksums; store originals on disk; create DB rows
@@ -156,18 +156,22 @@ Visual design: neutral palette, clear hierarchy, low-chrome UI, subtle motion (<
- Display scan results (count of photos added)
- Trigger background job for photo import
- Real-time job status with SSE progress updates
- DeepFace pipeline:
- Worker task: detect faces (RetinaFace), compute embeddings (ArcFace); store embeddings as binary
- Persist face bounding boxes, confidence, quality; link to photos
- Configurable batch size and thresholds via settings
- **Process tab UI:**
- Start/stop face processing controls
- Batch size configuration
- Detector/model selection dropdowns (RetinaFace, MTCNN, OpenCV, SSD / ArcFace, Facenet, etc.)
- Processing progress bar with current photo count
- Job status display (pending, running, completed, failed)
- Results summary (faces detected, processing time)
- Error handling and retry mechanism
- ✅ **DeepFace pipeline:**
- ✅ Worker task: detect faces (RetinaFace), compute embeddings (ArcFace); store embeddings as binary
- ✅ Persist face bounding boxes, confidence, quality; link to photos
- ✅ Configurable batch size and thresholds via settings
- ✅ EXIF orientation handling
- ✅ Face quality scoring and validation
- ✅ **Process tab UI:**
- ✅ Start/stop face processing controls
- ✅ Batch size configuration
- ✅ Detector/model selection dropdowns (RetinaFace, MTCNN, OpenCV, SSD / ArcFace, Facenet, etc.)
- ✅ Processing progress bar with current photo count
- ✅ Job status display (pending, running, completed, failed)
- ✅ Results summary (faces detected, processing time)
- ✅ Error handling and retry mechanism
- ✅ Job cancellation support
- ✅ Real-time progress updates via SSE
- Identify workflow:
- API: `/faces/unidentified`, `/faces/{id}/identify` (assign existing/new person)
- Implement person creation and linking, plus `person_embeddings` insertions

frontend/src/api/faces.ts Normal file
View File

@@ -0,0 +1,29 @@
import apiClient from './client'
import { JobResponse } from './jobs'
export interface ProcessFacesRequest {
batch_size?: number
detector_backend: string
model_name: string
}
export interface ProcessFacesResponse {
job_id: string
message: string
batch_size?: number
detector_backend: string
model_name: string
}
export const facesApi = {
/**
* Start face processing job
*/
processFaces: async (request: ProcessFacesRequest): Promise<ProcessFacesResponse> => {
const response = await apiClient.post<ProcessFacesResponse>('/api/v1/faces/process', request)
return response.data
},
}
export default facesApi

View File

@@ -24,5 +24,17 @@ export const jobsApi = {
)
return data
},
streamJobProgress: (jobId: string): EventSource => {
const baseURL = import.meta.env.VITE_API_URL || 'http://127.0.0.1:8000'
return new EventSource(`${baseURL}/api/v1/jobs/stream/${jobId}`)
},
cancelJob: async (jobId: string): Promise<{ message: string; status: string }> => {
const { data } = await apiClient.delete<{ message: string; status: string }>(
`/api/v1/jobs/${jobId}`
)
return data
},
}

View File

@@ -1,12 +1,427 @@
import { useState, useRef, useCallback, useEffect } from 'react'
import { facesApi, ProcessFacesRequest } from '../api/faces'
import { jobsApi, JobResponse, JobStatus } from '../api/jobs'
interface JobProgress {
id: string
status: string
progress: number
message: string
processed?: number
total?: number
faces_detected?: number
faces_stored?: number
}
const DETECTOR_OPTIONS = ['retinaface', 'mtcnn', 'opencv', 'ssd']
const MODEL_OPTIONS = ['ArcFace', 'Facenet', 'Facenet512', 'VGG-Face']
export default function Process() {
const [batchSize, setBatchSize] = useState<number | undefined>(undefined)
const [detectorBackend, setDetectorBackend] = useState('retinaface')
const [modelName, setModelName] = useState('ArcFace')
const [isProcessing, setIsProcessing] = useState(false)
const [currentJob, setCurrentJob] = useState<JobResponse | null>(null)
const [jobProgress, setJobProgress] = useState<JobProgress | null>(null)
const [processingResult, setProcessingResult] = useState<{
photos_processed?: number
faces_detected?: number
faces_stored?: number
} | null>(null)
const [error, setError] = useState<string | null>(null)
const eventSourceRef = useRef<EventSource | null>(null)
// Cleanup event source on unmount
useEffect(() => {
return () => {
if (eventSourceRef.current) {
eventSourceRef.current.close()
}
}
}, [])
const handleStartProcessing = async () => {
setIsProcessing(true)
setError(null)
setProcessingResult(null)
setCurrentJob(null)
setJobProgress(null)
try {
const request: ProcessFacesRequest = {
batch_size: batchSize || undefined,
detector_backend: detectorBackend,
model_name: modelName,
}
const response = await facesApi.processFaces(request)
// Set processing state immediately
setIsProcessing(true)
setCurrentJob({
id: response.job_id,
status: JobStatus.PENDING,
progress: 0,
message: response.message,
created_at: new Date().toISOString(),
updated_at: new Date().toISOString(),
})
// Start SSE stream for job progress
startJobProgressStream(response.job_id)
} catch (err: any) {
setError(err.response?.data?.detail || err.message || 'Processing failed')
setIsProcessing(false)
}
}
const handleStopProcessing = async () => {
if (!currentJob) {
return
}
try {
// Call API to cancel the job
const result = await jobsApi.cancelJob(currentJob.id)
console.log('Job cancellation:', result)
// Close SSE stream
if (eventSourceRef.current) {
eventSourceRef.current.close()
eventSourceRef.current = null
}
// Update UI state
setIsProcessing(false)
setError(`Job cancelled: ${result.message}`)
// Update job status
if (currentJob) {
setCurrentJob({
...currentJob,
status: JobStatus.FAILURE,
message: 'Cancelled by user',
})
}
} catch (err: any) {
console.error('Error cancelling job:', err)
setError(err.response?.data?.detail || err.message || 'Failed to cancel job')
}
}
const startJobProgressStream = (jobId: string) => {
// Close existing stream if any
if (eventSourceRef.current) {
eventSourceRef.current.close()
}
const eventSource = jobsApi.streamJobProgress(jobId)
eventSourceRef.current = eventSource
eventSource.onmessage = (event) => {
try {
const data: JobProgress = JSON.parse(event.data)
setJobProgress(data)
// Update job status
const statusMap: Record<string, JobStatus> = {
pending: JobStatus.PENDING,
started: JobStatus.STARTED,
progress: JobStatus.PROGRESS,
success: JobStatus.SUCCESS,
failure: JobStatus.FAILURE,
}
const jobStatus = statusMap[data.status] || JobStatus.PENDING
setCurrentJob({
id: data.id,
status: jobStatus,
progress: data.progress,
message: data.message,
created_at: new Date().toISOString(),
updated_at: new Date().toISOString(),
})
// Keep processing state true while job is running
if (jobStatus === JobStatus.STARTED || jobStatus === JobStatus.PROGRESS) {
setIsProcessing(true)
}
// Check if job is complete
if (jobStatus === JobStatus.SUCCESS || jobStatus === JobStatus.FAILURE) {
setIsProcessing(false)
eventSource.close()
eventSourceRef.current = null
// Fetch final job result to get processing stats
if (jobStatus === JobStatus.SUCCESS) {
fetchJobResult(jobId)
}
}
} catch (err) {
console.error('Error parsing SSE event:', err)
}
}
eventSource.onerror = (err) => {
console.error('SSE error:', err)
// Don't automatically set isProcessing to false on error
// Job might still be running even if SSE connection failed
// Check job status directly instead
if (currentJob) {
// Try to fetch job status directly
jobsApi.getJob(currentJob.id).then((job) => {
const stillRunning = job.status === JobStatus.STARTED || job.status === JobStatus.PROGRESS
setIsProcessing(stillRunning)
setCurrentJob(job)
}).catch(() => {
// If we can't get status, assume job might still be running
console.warn('Could not fetch job status after SSE error')
})
}
}
}
const fetchJobResult = async (jobId: string) => {
try {
const job = await jobsApi.getJob(jobId)
setCurrentJob(job)
// Extract result data from job progress
if (jobProgress) {
setProcessingResult({
photos_processed: jobProgress.processed,
faces_detected: jobProgress.faces_detected,
faces_stored: jobProgress.faces_stored,
})
}
} catch (err) {
console.error('Error fetching job result:', err)
}
}
const getStatusColor = (status: JobStatus) => {
switch (status) {
case JobStatus.SUCCESS:
return 'text-green-600'
case JobStatus.FAILURE:
return 'text-red-600'
case JobStatus.STARTED:
case JobStatus.PROGRESS:
return 'text-blue-600'
default:
return 'text-gray-600'
}
}
return (
<div>
<h1 className="text-2xl font-bold text-gray-900 mb-4">Process</h1>
<div className="bg-white rounded-lg shadow p-6">
<p className="text-gray-600">Face processing controls coming in Phase 2.</p>
<div className="p-6">
<h1 className="text-2xl font-bold text-gray-900 mb-6">Process Faces</h1>
<div className="space-y-6">
{/* Configuration Section */}
<div className="bg-white rounded-lg shadow p-6">
<h2 className="text-lg font-semibold text-gray-900 mb-4">
Processing Configuration
</h2>
<div className="space-y-4">
{/* Batch Size */}
<div>
<label
htmlFor="batch-size"
className="block text-sm font-medium text-gray-700 mb-2"
>
Batch Size
</label>
<input
id="batch-size"
type="number"
min="1"
value={batchSize || ''}
onChange={(e) =>
setBatchSize(
e.target.value ? parseInt(e.target.value, 10) : undefined
)
}
placeholder="All unprocessed photos"
className="w-full px-3 py-2 border border-gray-300 rounded-md shadow-sm focus:outline-none focus:ring-blue-500 focus:border-blue-500"
disabled={isProcessing}
/>
<p className="mt-1 text-sm text-gray-500">
Leave empty to process all unprocessed photos
</p>
</div>
{/* Detector Backend */}
<div>
<label
htmlFor="detector-backend"
className="block text-sm font-medium text-gray-700 mb-2"
>
Face Detector
</label>
<select
id="detector-backend"
value={detectorBackend}
onChange={(e) => setDetectorBackend(e.target.value)}
className="w-full px-3 py-2 border border-gray-300 rounded-md shadow-sm focus:outline-none focus:ring-blue-500 focus:border-blue-500"
disabled={isProcessing}
>
{DETECTOR_OPTIONS.map((option) => (
<option key={option} value={option}>
{option.charAt(0).toUpperCase() + option.slice(1)}
</option>
))}
</select>
<p className="mt-1 text-sm text-gray-500">
RetinaFace recommended for best accuracy
</p>
</div>
{/* Model Name */}
<div>
<label
htmlFor="model-name"
className="block text-sm font-medium text-gray-700 mb-2"
>
Recognition Model
</label>
<select
id="model-name"
value={modelName}
onChange={(e) => setModelName(e.target.value)}
className="w-full px-3 py-2 border border-gray-300 rounded-md shadow-sm focus:outline-none focus:ring-blue-500 focus:border-blue-500"
disabled={isProcessing}
>
{MODEL_OPTIONS.map((option) => (
<option key={option} value={option}>
{option}
</option>
))}
</select>
<p className="mt-1 text-sm text-gray-500">
ArcFace recommended for best accuracy
</p>
</div>
{/* Control Buttons */}
<div className="flex gap-2 pt-4">
<button
type="button"
onClick={handleStartProcessing}
disabled={isProcessing}
className="flex-1 px-4 py-2 bg-blue-600 text-white rounded-md hover:bg-blue-700 focus:outline-none focus:ring-2 focus:ring-offset-2 focus:ring-blue-500 disabled:opacity-50 disabled:cursor-not-allowed"
>
{isProcessing ? 'Processing...' : 'Start Processing'}
</button>
{isProcessing && (
<button
type="button"
onClick={handleStopProcessing}
className="px-4 py-2 bg-red-600 text-white rounded-md hover:bg-red-700 focus:outline-none focus:ring-2 focus:ring-offset-2 focus:ring-red-500"
>
Stop
</button>
)}
</div>
</div>
</div>
{/* Progress Section */}
{(currentJob || jobProgress) && (
<div className="bg-white rounded-lg shadow p-6">
<h2 className="text-lg font-semibold text-gray-900 mb-4">
Processing Progress
</h2>
{currentJob && (
<div className="space-y-4">
<div>
<div className="flex justify-between items-center mb-2">
<span
className={`text-sm font-medium ${getStatusColor(
currentJob.status
)}`}
>
{currentJob.status === JobStatus.SUCCESS && '✓ '}
{currentJob.status === JobStatus.FAILURE && '✗ '}
{currentJob.status.charAt(0).toUpperCase() +
currentJob.status.slice(1)}
</span>
<span className="text-sm text-gray-600">
{currentJob.progress}%
</span>
</div>
<div className="w-full bg-gray-200 rounded-full h-2">
<div
className="bg-blue-600 h-2 rounded-full transition-all duration-300"
style={{ width: `${currentJob.progress}%` }}
/>
</div>
</div>
{jobProgress && (
<div className="space-y-2 text-sm text-gray-600">
{jobProgress.processed !== undefined &&
jobProgress.total !== undefined && (
<p>
Photos processed: {jobProgress.processed} /{' '}
{jobProgress.total}
</p>
)}
{jobProgress.faces_detected !== undefined && (
<p>Faces detected: {jobProgress.faces_detected}</p>
)}
{jobProgress.faces_stored !== undefined && (
<p>Faces stored: {jobProgress.faces_stored}</p>
)}
{jobProgress.message && (
<p className="mt-1 font-medium">{jobProgress.message}</p>
)}
</div>
)}
</div>
)}
</div>
)}
{/* Results Section */}
{processingResult && (
<div className="bg-white rounded-lg shadow p-6">
<h2 className="text-lg font-semibold text-gray-900 mb-4">
Processing Results
</h2>
<div className="space-y-2 text-sm">
{processingResult.photos_processed !== undefined && (
<p className="text-green-600">
{processingResult.photos_processed} photos processed
</p>
)}
{processingResult.faces_detected !== undefined && (
<p className="text-gray-600">
{processingResult.faces_detected} faces detected
</p>
)}
{processingResult.faces_stored !== undefined && (
<p className="text-gray-700 font-medium">
{processingResult.faces_stored} faces stored in database
</p>
)}
</div>
</div>
)}
{/* Error Section */}
{error && (
<div className="bg-red-50 border border-red-200 rounded-lg p-4">
<p className="text-sm text-red-800">{error}</p>
</div>
)}
</div>
</div>
)
}

View File

@@ -2,15 +2,65 @@
from __future__ import annotations
from fastapi import APIRouter
from fastapi import APIRouter, HTTPException, status
from rq import Queue
from redis import Redis
from src.web.schemas.faces import ProcessFacesRequest, ProcessFacesResponse
# Note: Function passed as string path to avoid RQ serialization issues
router = APIRouter(prefix="/faces", tags=["faces"])
# Redis connection for RQ
redis_conn = Redis(host="localhost", port=6379, db=0, decode_responses=False)
queue = Queue(connection=redis_conn)
@router.post("/process")
def process_faces() -> dict:
"""Process faces - placeholder for Phase 2."""
return {"message": "Process faces endpoint - to be implemented in Phase 2"}
@router.post("/process", response_model=ProcessFacesResponse)
def process_faces(request: ProcessFacesRequest) -> ProcessFacesResponse:
"""Start face processing job.
This enqueues a background job to process faces in unprocessed photos
using DeepFace with the specified detector and model.
"""
try:
# Check if worker is available (basic check)
try:
redis_conn.ping()
except Exception as e:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail=f"Redis connection failed: {str(e)}",
)
# Enqueue face processing job
# Pass function as string path to avoid serialization issues
job = queue.enqueue(
"src.web.services.tasks.process_faces_task",
batch_size=request.batch_size,
detector_backend=request.detector_backend,
model_name=request.model_name,
job_timeout="1h", # Long timeout for face processing
)
print(f"[Faces API] Enqueued face processing job: {job.id}")
print(f"[Faces API] Job status: {job.get_status()}")
print(f"[Faces API] Queue length: {len(queue)}")
return ProcessFacesResponse(
job_id=job.id,
message="Face processing job started",
batch_size=request.batch_size,
detector_backend=request.detector_backend,
model_name=request.model_name,
)
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to start face processing job: {str(e)}",
)
@router.get("/unidentified")

View File

@@ -26,19 +26,39 @@ def get_job(job_id: str) -> JobResponse:
"""Get job status by ID."""
try:
job = Job.fetch(job_id, connection=redis_conn)
rq_status = job.get_status()
status_map = {
"queued": JobStatus.PENDING,
"started": JobStatus.STARTED,
"started": JobStatus.STARTED, # Job is actively running
"finished": JobStatus.SUCCESS,
"failed": JobStatus.FAILURE,
}
job_status = status_map.get(job.get_status(), JobStatus.PENDING)
progress = 0
if job_status == JobStatus.STARTED or job_status == JobStatus.PROGRESS:
job_status = status_map.get(rq_status, JobStatus.PENDING)
# If job is started, check if it has progress
if rq_status == "started":
# Job is running - show progress if available
progress = job.meta.get("progress", 0) if job.meta else 0
message = job.meta.get("message", "Processing...") if job.meta else "Processing..."
# Map to PROGRESS status if we have actual progress
if progress > 0:
job_status = JobStatus.PROGRESS
elif job_status == JobStatus.STARTED or job_status == JobStatus.PROGRESS:
progress = job.meta.get("progress", 0) if job.meta else 0
elif job_status == JobStatus.SUCCESS:
progress = 100
else:
progress = 0
message = job.meta.get("message", "") if job.meta else ""
# If job failed, include error message
if rq_status == "failed" and job.exc_info:
# Extract the final exception line from the traceback
error_lines = [ln for ln in job.exc_info.strip().split("\n") if ln]
if error_lines:
message = f"Failed: {error_lines[-1]}"
return JobResponse(
id=job.id,
status=job_status,
@@ -49,10 +69,10 @@ def get_job(job_id: str) -> JobResponse:
str(job.ended_at or job.started_at or job.created_at)
),
)
except Exception:
except Exception as e:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Job {job_id} not found",
detail=f"Job {job_id} not found: {str(e)}",
)
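The status-mapping logic above blends RQ's native status with the task's own `meta` progress. The same decision can be sketched as a side-effect-free function (names hypothetical, not the repository's actual helper):

```python
from typing import Optional, Tuple

def derive_job_status(rq_status: str, meta: Optional[dict]) -> Tuple[str, int, str]:
    """Map an RQ status plus task metadata to (status, progress, message).

    Mirrors the endpoint logic: a 'started' job with nonzero progress
    is reported as 'progress'; finished jobs are pinned to 100%.
    """
    meta = meta or {}
    status_map = {
        "queued": "pending",
        "started": "started",
        "finished": "success",
        "failed": "failure",
    }
    status = status_map.get(rq_status, "pending")
    progress = 0
    message = meta.get("message", "")
    if rq_status == "started":
        progress = meta.get("progress", 0)
        message = meta.get("message", "Processing...")
        if progress > 0:
            status = "progress"
    elif status == "success":
        progress = 100
    return status, progress, message
```

Keeping the mapping pure like this makes the pending/progress/success transitions unit-testable without a Redis connection.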
@@ -95,6 +115,8 @@ def stream_job_progress(job_id: str):
"message": message,
"processed": job.meta.get("processed", 0) if job.meta else 0,
"total": job.meta.get("total", 0) if job.meta else 0,
"faces_detected": job.meta.get("faces_detected", 0) if job.meta else 0,
"faces_stored": job.meta.get("faces_stored", 0) if job.meta else 0,
}
yield f"data: {json.dumps(event_data)}\n\n"
@@ -116,3 +138,67 @@ def stream_job_progress(job_id: str):
event_generator(), media_type="text/event-stream"
)
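Each event the generator above yields is a single `data: <json>` line followed by a blank line, per the SSE wire format. The payload can be recovered with a tiny parser like this (a sketch; the actual frontend uses the browser's `EventSource` instead):

```python
import json
from typing import Optional

def parse_sse_data(line: str) -> Optional[dict]:
    """Return the JSON payload of a `data:` SSE line, or None for other lines."""
    line = line.strip()
    if not line.startswith("data:"):
        return None
    return json.loads(line[len("data:"):].strip())
```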
@router.delete("/{job_id}")
def cancel_job(job_id: str) -> dict:
"""Cancel a job (if queued) or stop a running job.
Note: For running jobs, this sets a cancellation flag.
The job will check this flag and exit gracefully.
"""
try:
job = Job.fetch(job_id, connection=redis_conn)
rq_status = job.get_status()
if rq_status == "finished":
return {
"message": f"Job {job_id} is already finished",
"status": "finished",
}
if rq_status == "failed":
return {
"message": f"Job {job_id} already failed",
"status": "failed",
}
if rq_status == "queued":
# Cancel queued job - remove from queue
job.cancel()
return {
"message": f"Job {job_id} cancelled (was queued)",
"status": "cancelled",
}
if rq_status == "started":
# For running jobs, set cancellation flag in metadata
# The task will check this and exit gracefully
if job.meta is None:
job.meta = {}
job.meta["cancelled"] = True
job.meta["message"] = "Cancellation requested..."
job.save_meta()
# Also try to cancel the job (which will interrupt it if possible)
try:
job.cancel()
except Exception:
# Job might already be running, that's OK
pass
return {
"message": f"Job {job_id} cancellation requested",
"status": "cancelling",
}
return {
"message": f"Job {job_id} status: {rq_status}",
"status": rq_status,
}
except Exception as e:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Job {job_id} not found: {str(e)}",
)

View File

@@ -22,7 +22,7 @@ from src.web.services.photo_service import (
find_photos_in_folder,
import_photo_from_path,
)
from src.web.services.tasks import import_photos_task
# Note: Function passed as string path to avoid RQ serialization issues
router = APIRouter(prefix="/photos", tags=["photos"])
@@ -60,8 +60,9 @@ def import_photos(
estimated_photos = len(find_photos_in_folder(request.folder_path, request.recursive))
# Enqueue job
# Pass function as string path to avoid serialization issues
job = queue.enqueue(
import_photos_task,
"src.web.services.tasks.import_photos_task",
request.folder_path,
request.recursive,
job_timeout="1h", # Allow up to 1 hour for large imports

src/web/schemas/faces.py Normal file
View File

@@ -0,0 +1,40 @@
"""Face processing schemas."""
from __future__ import annotations
from typing import Optional
from pydantic import BaseModel, Field, ConfigDict
class ProcessFacesRequest(BaseModel):
"""Request to process faces in photos."""
model_config = ConfigDict(protected_namespaces=())
batch_size: Optional[int] = Field(
None,
ge=1,
description="Maximum number of photos to process (None = all unprocessed)",
)
detector_backend: str = Field(
"retinaface",
description="DeepFace detector backend (retinaface, mtcnn, opencv, ssd)",
)
model_name: str = Field(
"ArcFace",
description="DeepFace model name (ArcFace, Facenet, Facenet512, VGG-Face)",
)
class ProcessFacesResponse(BaseModel):
"""Response after initiating face processing."""
model_config = ConfigDict(protected_namespaces=())
job_id: str
message: str
batch_size: Optional[int] = None
detector_backend: str
model_name: str
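As a quick illustration of the validation these schemas give the endpoint, the `ge=1` constraint on `batch_size` rejects zero or negative values while `None` (meaning "process everything") passes. A standalone sketch of the same constraint:

```python
from typing import Optional
from pydantic import BaseModel, Field, ValidationError

class ProcessFacesRequest(BaseModel):
    """Mirror of the request schema, trimmed for illustration."""
    batch_size: Optional[int] = Field(None, ge=1)
    detector_backend: str = "retinaface"
    model_name: str = "ArcFace"

# batch_size=0 violates ge=1, so construction raises ValidationError
try:
    ProcessFacesRequest(batch_size=0)
    rejected = False
except ValidationError:
    rejected = True
```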

View File

@@ -0,0 +1,420 @@
"""Face detection and processing services for PunimTag Web."""
from __future__ import annotations
import os
import tempfile
import time
from typing import Callable, Optional, Tuple
import numpy as np
from PIL import Image
from sqlalchemy.orm import Session
try:
from deepface import DeepFace
DEEPFACE_AVAILABLE = True
except ImportError:
DEEPFACE_AVAILABLE = False
from src.core.config import (
DEEPFACE_ENFORCE_DETECTION,
DEEPFACE_ALIGN_FACES,
MIN_FACE_CONFIDENCE,
MIN_FACE_SIZE,
MAX_FACE_SIZE,
)
from src.utils.exif_utils import EXIFOrientationHandler
from src.web.db.models import Face, Photo
def calculate_face_quality_score(
image_np: np.ndarray,
face_location: dict,
image_width: int,
image_height: int,
) -> int:
"""Calculate face quality score (0-100).
Simplified quality calculation based on face size and position.
Args:
image_np: Image as numpy array
face_location: Face location dict with x, y, w, h
image_width: Image width
image_height: Image height
Returns:
Quality score from 0-100
"""
x = face_location.get('x', 0)
y = face_location.get('y', 0)
w = face_location.get('w', 0)
h = face_location.get('h', 0)
if w == 0 or h == 0:
return 0
# Face size as percentage of image
face_area = w * h
image_area = image_width * image_height
size_ratio = face_area / image_area if image_area > 0 else 0
# Position score (center is better)
center_x = image_width / 2
center_y = image_height / 2
face_center_x = x + w / 2
face_center_y = y + h / 2
distance_from_center = np.sqrt(
(face_center_x - center_x) ** 2 + (face_center_y - center_y) ** 2
)
max_distance = np.sqrt(center_x ** 2 + center_y ** 2)
position_score = 1.0 - (distance_from_center / max_distance) if max_distance > 0 else 0.5
# Combine size and position (size weighted 70%, position 30%)
quality = (size_ratio * 70) + (position_score * 30)
# Clamp to 0-100
return int(np.clip(quality * 100, 0, 100))
def is_valid_face_detection(
confidence: float,
face_location: dict,
image_width: int,
image_height: int,
) -> bool:
"""Check if face detection meets minimum criteria.
Args:
confidence: Face detection confidence score
face_location: Face location dict with x, y, w, h
image_width: Image width
image_height: Image height
Returns:
True if face is valid, False otherwise
"""
x = face_location.get('x', 0)
y = face_location.get('y', 0)
w = face_location.get('w', 0)
h = face_location.get('h', 0)
# Check minimum confidence
if confidence < MIN_FACE_CONFIDENCE:
return False
# Check minimum size
if w < MIN_FACE_SIZE or h < MIN_FACE_SIZE:
return False
# Check maximum size (to avoid false positives that span entire image)
if w > MAX_FACE_SIZE or h > MAX_FACE_SIZE:
return False
# Check bounds
if x < 0 or y < 0 or (x + w) > image_width or (y + h) > image_height:
return False
return True
def process_photo_faces(
db: Session,
photo: Photo,
detector_backend: str = "retinaface",
model_name: str = "ArcFace",
update_progress: Optional[Callable[[int, int, str], None]] = None,
) -> Tuple[int, int]:
"""Process faces in a single photo using DeepFace.
Args:
db: Database session
photo: Photo model instance
detector_backend: DeepFace detector backend (retinaface, mtcnn, opencv, ssd)
model_name: DeepFace model name (ArcFace, Facenet, Facenet512, VGG-Face)
update_progress: Optional progress callback (processed, total, message)
Returns:
Tuple of (faces_detected, faces_stored)
"""
if not DEEPFACE_AVAILABLE:
raise RuntimeError("DeepFace not available")
photo_path = photo.path
if not os.path.exists(photo_path):
return 0, 0
# Check if photo already has faces processed with same detector/model
existing_faces = db.query(Face).filter(
Face.photo_id == photo.id,
Face.detector == detector_backend,
Face.model == model_name,
).count()
if existing_faces > 0:
# Already processed with this configuration
return existing_faces, existing_faces
try:
# Get EXIF orientation
exif_orientation = EXIFOrientationHandler.get_exif_orientation(photo_path)
# Apply EXIF orientation correction
corrected_image, original_orientation = (
EXIFOrientationHandler.correct_image_orientation_from_path(photo_path)
)
temp_path = None
if corrected_image is not None and original_orientation and original_orientation != 1:
# Save corrected image temporarily
temp_dir = tempfile.gettempdir()
temp_filename = f"corrected_{photo.id}_{os.path.basename(photo_path)}"
temp_path = os.path.join(temp_dir, temp_filename)
corrected_image.save(temp_path, "JPEG", quality=95)
face_detection_path = temp_path
else:
face_detection_path = photo_path
try:
# Use DeepFace to detect faces and compute embeddings
# Note: First call may take time to download/initialize models
print(f"[DeepFace] Processing {photo.filename} with {detector_backend}/{model_name}...")
results = DeepFace.represent(
img_path=face_detection_path,
model_name=model_name,
detector_backend=detector_backend,
enforce_detection=DEEPFACE_ENFORCE_DETECTION,
align=DEEPFACE_ALIGN_FACES,
)
print(f"[DeepFace] Completed {photo.filename}")
except Exception as e:
print(f"[DeepFace] Error processing {photo.filename}: {e}")
raise
finally:
# Clean up temporary file if created
if temp_path and os.path.exists(temp_path):
try:
os.remove(temp_path)
except Exception:
pass
if not results:
return 0, 0
# Load image for quality calculation
image = Image.open(photo_path)
image_np = np.array(image)
image_width, image_height = image.size
faces_detected = len(results)
faces_stored = 0
for result in results:
facial_area = result.get('facial_area', {})
face_confidence = result.get('face_confidence', 0.0)
embedding = np.array(result['embedding'])
# Convert to location format
location = {
'x': facial_area.get('x', 0),
'y': facial_area.get('y', 0),
'w': facial_area.get('w', 0),
'h': facial_area.get('h', 0),
}
# Validate face detection
if not is_valid_face_detection(face_confidence, location, image_width, image_height):
continue
# Calculate quality score
quality_score = calculate_face_quality_score(
image_np, location, image_width, image_height
)
# Store face in database
face = Face(
photo_id=photo.id,
person_id=None,
bbox_x=location['x'],
bbox_y=location['y'],
bbox_w=location['w'],
bbox_h=location['h'],
embedding=embedding.tobytes(),
confidence=int(face_confidence * 100) if face_confidence <= 1.0 else int(face_confidence),
quality=quality_score,
model=model_name,
detector=detector_backend,
)
db.add(face)
faces_stored += 1
db.commit()
return faces_detected, faces_stored
except Exception as e:
db.rollback()
raise Exception(f"Error processing faces in {photo.filename}: {e}") from e
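The confidence conversion above handles the fact that `face_confidence` may arrive either as a 0-1 fraction or as a value already on a percentage-like scale, and stores an integer percent in both cases. A minimal standalone sketch of that normalization (the `normalize_confidence` name is illustrative, not part of the codebase):

```python
def normalize_confidence(face_confidence: float) -> int:
    """Store confidence as an integer percent, whichever scale it arrives in."""
    if face_confidence <= 1.0:
        # Fractional score (e.g. 0.97) -> percent
        return int(face_confidence * 100)
    # Already on a percentage-like scale
    return int(face_confidence)
```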
def process_unprocessed_photos(
db: Session,
batch_size: Optional[int] = None,
detector_backend: str = "retinaface",
model_name: str = "ArcFace",
update_progress: Optional[Callable[[int, int, str, int, int], None]] = None,
) -> Tuple[int, int, int]:
"""Process faces in all unprocessed photos.
Args:
db: Database session
batch_size: Maximum number of photos to process (None = all)
detector_backend: DeepFace detector backend
model_name: DeepFace model name
update_progress: Optional callback (processed, total, current_file, faces_detected, faces_stored)
Returns:
Tuple of (photos_processed, total_faces_detected, total_faces_stored)
"""
print(f"[FaceService] Starting face processing: detector={detector_backend}, model={model_name}, batch_size={batch_size}")
# Update progress - querying photos
if update_progress:
update_progress(0, 0, "Querying photos from database...", 0, 0)
# Get all photos
all_photos = db.query(Photo).all()
print(f"[FaceService] Found {len(all_photos)} total photos in database")
# Update progress - filtering photos
if update_progress:
update_progress(0, len(all_photos), "Checking which photos need processing...", 0, 0)
# Filter for photos that need processing (no faces with current detector/model)
unprocessed_photos = []
for idx, photo in enumerate(all_photos, 1):
# Check if photo has faces with current detector/model
existing_face = db.query(Face).filter(
Face.photo_id == photo.id,
Face.detector == detector_backend,
Face.model == model_name,
).first()
if existing_face is None:
unprocessed_photos.append(photo)
# Update progress every 10 photos while filtering
if update_progress and idx % 10 == 0:
update_progress(0, len(all_photos), f"Checking photos... ({idx}/{len(all_photos)})", 0, 0)
if batch_size:
unprocessed_photos = unprocessed_photos[:batch_size]
total = len(unprocessed_photos)
print(f"[FaceService] Found {total} unprocessed photos")
if total == 0:
print("[FaceService] No photos to process")
if update_progress:
update_progress(0, 0, "No photos to process", 0, 0)
return 0, 0, 0
# Update progress - preparing to process
if update_progress:
update_progress(0, total, f"Preparing to process {total} photos...", 0, 0)
photos_processed = 0
total_faces_detected = 0
total_faces_stored = 0
print(f"[FaceService] Starting processing of {total} photos...")
# Helper to check if job was cancelled
def check_cancelled() -> bool:
"""Check if job has been cancelled."""
if update_progress:
# Try to check job metadata for cancellation flag
try:
from rq import get_current_job
job = get_current_job()
if job and job.meta and job.meta.get("cancelled", False):
return True
except Exception:
pass
return False
# Update progress - initializing DeepFace (this may take time on first run)
if update_progress:
update_progress(0, total, "Initializing DeepFace models (this may take a moment on first run)...", 0, 0)
# Check cancellation before starting
if check_cancelled():
print("[FaceService] Job cancelled before processing started")
return photos_processed, total_faces_detected, total_faces_stored
# Process first photo - this will trigger DeepFace initialization
# Update progress before starting actual processing
if update_progress and total > 0:
update_progress(0, total, f"Starting face detection on {total} photos...", 0, 0)
for idx, photo in enumerate(unprocessed_photos, 1):
# Check for cancellation
if check_cancelled():
print(f"[FaceService] Job cancelled at photo {idx}/{total}")
if update_progress:
update_progress(
idx - 1,
total,
"Cancelled by user",
total_faces_detected,
total_faces_stored,
)
break
try:
# Update progress before processing each photo
if update_progress:
update_progress(
idx - 1,
total,
f"Processing {photo.filename}... ({idx}/{total})",
total_faces_detected,
total_faces_stored,
)
faces_detected, faces_stored = process_photo_faces(
db,
photo,
detector_backend=detector_backend,
model_name=model_name,
)
total_faces_detected += faces_detected
total_faces_stored += faces_stored
photos_processed += 1
if update_progress:
update_progress(
idx,
total,
f"Completed {photo.filename} ({idx}/{total})",
total_faces_detected,
total_faces_stored,
)
except Exception as e:
# Log error but continue processing other photos
print(f"[FaceService] Error processing photo {photo.filename}: {e}")
import traceback
traceback.print_exc()
if update_progress:
update_progress(
idx,
total,
f"Error: {photo.filename}",
total_faces_detected,
total_faces_stored,
)
return photos_processed, total_faces_detected, total_faces_stored
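Any caller can pass `process_unprocessed_photos` a plain logging callback matching the five-argument signature documented above. A sketch of such a callback, with the rendering isolated so it can be reused (the `format_progress` name is illustrative; the database session setup is omitted):

```python
def format_progress(processed, total, current_file, faces_detected, faces_stored):
    """Render one progress line from the callback arguments."""
    pct = int(processed / total * 100) if total else 0
    return f"[{pct:3d}%] {current_file} | faces detected={faces_detected} stored={faces_stored}"

# Passing it straight through as the callback:
# process_unprocessed_photos(db, update_progress=lambda *a: print(format_progress(*a)))
```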


@@ -9,6 +9,7 @@ from sqlalchemy.orm import Session
from src.web.db.session import SessionLocal
from src.web.services.photo_service import import_photos_from_folder
from src.web.services.face_service import process_unprocessed_photos
def import_photos_task(folder_path: str, recursive: bool = True) -> dict:
@@ -72,3 +73,183 @@ def import_photos_task(folder_path: str, recursive: bool = True) -> dict:
finally:
db.close()
def process_faces_task(
batch_size: Optional[int] = None,
detector_backend: str = "retinaface",
model_name: str = "ArcFace",
) -> dict:
"""RQ task to process faces in unprocessed photos.
Updates job metadata with progress:
- progress: 0-100
- message: status message
- processed: number of photos processed
- total: total photos to process
- faces_detected: total faces detected
- faces_stored: total faces stored
"""
import traceback
job = get_current_job()
if not job:
raise RuntimeError("Not running in RQ job context")
print(f"[Task] Starting face processing task: job_id={job.id}, batch_size={batch_size}, detector={detector_backend}, model={model_name}")
# Update progress immediately - job started
try:
if job:
job.meta = {
"progress": 0,
"message": "Initializing face processing...",
"processed": 0,
"total": 0,
"faces_detected": 0,
"faces_stored": 0,
}
job.save_meta()
except Exception as e:
print(f"[Task] Error setting initial job metadata: {e}")
db: Session = SessionLocal()
# Initialize result variables
photos_processed = 0
total_faces_detected = 0
total_faces_stored = 0
try:
def update_progress(
processed: int,
total: int,
current_file: str,
faces_detected: int,
faces_stored: int,
) -> None:
"""Update job progress and check for cancellation."""
if job:
# Check if job was cancelled
if job.meta and job.meta.get("cancelled", False):
return # Don't update if cancelled
# Calculate progress: 10% for setup, 90% for processing
if total == 0:
# Setup phase
progress = min(10, processed * 2) # 0-10% during setup
else:
# Processing phase
progress = 10 + int((processed / total) * 90)  # total > 0 in this branch
job.meta = {
"progress": progress,
"message": f"Processing {current_file}... ({processed}/{total})" if total > 0 else current_file,
"processed": processed,
"total": total,
"faces_detected": faces_detected,
"faces_stored": faces_stored,
}
job.save_meta()
# Check for cancellation after updating
if job.meta and job.meta.get("cancelled", False):
print(f"[Task] Job {job.id} cancellation detected")
raise KeyboardInterrupt("Job cancelled by user")
# Update progress - finding photos
if job:
job.meta = {
"progress": 5,
"message": "Finding photos to process...",
"processed": 0,
"total": 0,
"faces_detected": 0,
"faces_stored": 0,
}
job.save_meta()
# Process faces
photos_processed, total_faces_detected, total_faces_stored = (
process_unprocessed_photos(
db,
batch_size=batch_size,
detector_backend=detector_backend,
model_name=model_name,
update_progress=update_progress,
)
)
# Final update
result = {
"photos_processed": photos_processed,
"faces_detected": total_faces_detected,
"faces_stored": total_faces_stored,
"detector_backend": detector_backend,
"model_name": model_name,
}
if job:
job.meta = {
"progress": 100,
"message": (
f"Completed: {photos_processed} photos, "
f"{total_faces_stored} faces stored"
),
"processed": photos_processed,
"total": photos_processed,
"faces_detected": total_faces_detected,
"faces_stored": total_faces_stored,
}
job.save_meta()
return result
except KeyboardInterrupt as e:
# Job was cancelled - exit gracefully
print(f"[Task] Job {job.id if job else 'unknown'} cancelled by user")
if job:
try:
job.meta = job.meta or {}
job.meta.update({
"message": "Cancelled by user",
"cancelled": True,
})
job.save_meta()
except Exception:
pass
# Don't re-raise - job cancellation is not a failure
return {
"photos_processed": photos_processed,
"faces_detected": total_faces_detected,
"faces_stored": total_faces_stored,
"detector_backend": detector_backend,
"model_name": model_name,
"cancelled": True,
}
except Exception as e:
# Log error and update job metadata
error_msg = f"Task failed: {str(e)}"
print(f"[Task] ❌ {error_msg}")
traceback.print_exc()
if job:
try:
job.meta = {
"progress": 0,
"message": error_msg,
"processed": 0,
"total": 0,
"faces_detected": 0,
"faces_stored": 0,
}
job.save_meta()
except Exception:
pass
# Re-raise so RQ marks job as failed
raise
finally:
db.close()
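The progress split in `update_progress` reserves the first 10% for setup and maps photo completion onto the remaining 90%, so the bar moves even before the first photo finishes. The same mapping isolated as a pure function (the `job_progress` name is illustrative):

```python
def job_progress(processed: int, total: int) -> int:
    """Map (processed, total) to 0-100: first 10% is setup, the rest is processing."""
    if total == 0:
        # Setup phase: creep toward 10% as preliminary steps complete
        return min(10, processed * 2)
    return 10 + int((processed / total) * 90)
```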


@@ -11,7 +11,7 @@ import uuid
from rq import Worker
from redis import Redis
from src.web.services.tasks import import_photos_task
from src.web.services.tasks import import_photos_task, process_faces_task
# Redis connection for RQ
redis_conn = Redis(host="localhost", port=6379, db=0, decode_responses=False)
@@ -28,6 +28,17 @@ def main() -> NoReturn:
# Generate unique worker name to avoid conflicts
worker_name = f"punimtag-worker-{uuid.uuid4().hex[:8]}"
print(f"[Worker] Starting worker: {worker_name}")
print("[Worker] Listening on queue: default")
# Check if Redis is accessible
try:
redis_conn.ping()
print("[Worker] Redis connection successful")
except Exception as e:
print(f"[Worker] ❌ Redis connection failed: {e}")
sys.exit(1)
# Register tasks with worker
# Tasks are imported from services.tasks
worker = Worker(
@@ -35,6 +46,8 @@ def main() -> NoReturn:
connection=redis_conn,
name=worker_name,
)
print("[Worker] ✅ Worker ready, waiting for jobs...")
# Start worker
worker.work()