punimtag/backend/api/faces.py
Tanya 6b6b1449b2 Modified files:
backend/config.py - Added MIN_AUTO_MATCH_FACE_SIZE_RATIO = 0.005
backend/services/face_service.py - Multiple changes:
Added load_face_encoding() function (supports float32 and float64)
Added _calculate_face_size_ratio() function
Updated find_similar_faces() to filter small faces
Updated find_auto_match_matches() to exclude small reference faces
Fixed reference face quality calculation (use actual quality, not hardcoded 0.5)
Fixed duplicate detection (exclude faces from same photo)
Updated confidence threshold from 40% to 50%
Updated confidence calibration (moderate version)
backend/api/faces.py - Updated default tolerance to 0.5 for auto-match endpoints
backend/schemas/faces.py - Updated default tolerance to 0.5
admin-frontend/src/pages/AutoMatch.tsx - Updated default tolerance to 0.5
admin-frontend/src/api/faces.ts - Added tolerance parameter support
2026-02-06 14:16:11 -05:00

1030 lines
38 KiB
Python

"""Face management endpoints."""
from __future__ import annotations
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, Query, status
from fastapi.responses import FileResponse, Response
from rq import Queue
from redis import Redis
from sqlalchemy import func
from sqlalchemy.orm import Session
from typing import Annotated
from backend.db.session import get_db
from backend.api.auth import get_current_user_with_id
from backend.schemas.faces import (
ProcessFacesRequest,
ProcessFacesResponse,
UnidentifiedFacesQuery,
UnidentifiedFacesResponse,
FaceItem,
SimilarFacesResponse,
SimilarFaceItem,
BatchSimilarityRequest,
BatchSimilarityResponse,
FaceSimilarityPair,
IdentifyFaceRequest,
IdentifyFaceResponse,
FaceUnmatchResponse,
BatchUnmatchRequest,
BatchUnmatchResponse,
AutoMatchRequest,
AutoMatchResponse,
AutoMatchPersonItem,
AutoMatchFaceItem,
AutoMatchPeopleResponse,
AutoMatchPersonSummary,
AutoMatchPersonMatchesResponse,
AcceptMatchesRequest,
MaintenanceFacesResponse,
MaintenanceFaceItem,
DeleteFacesRequest,
DeleteFacesResponse,
)
from backend.schemas.people import PersonCreateRequest, PersonResponse
from backend.db.models import Face, Person, PersonEncoding, Photo
from backend.services.face_service import (
list_unidentified_faces,
find_similar_faces,
calculate_batch_similarities,
find_auto_match_matches,
accept_auto_match_matches,
get_auto_match_people_list,
get_auto_match_person_matches as get_person_matches_service,
)
# Note: Function passed as string path to avoid RQ serialization issues
router = APIRouter(prefix="/faces", tags=["faces"])
# Redis connection for RQ job queueing (decode_responses=False: RQ needs raw bytes).
# NOTE(review): host/port/db are hardcoded — presumably fine for a single-host
# deploy, but consider moving to config. TODO confirm.
redis_conn = Redis(host="localhost", port=6379, db=0, decode_responses=False)
# Default RQ queue; the /process endpoint enqueues face-processing jobs here.
queue = Queue(connection=redis_conn)
@router.post("/process", response_model=ProcessFacesResponse)
def process_faces(request: ProcessFacesRequest) -> ProcessFacesResponse:
    """Start face processing job.

    This enqueues a background job to process faces in unprocessed photos
    using DeepFace with the specified detector and model.

    Raises:
        HTTPException: 503 if Redis is unreachable, 500 if enqueueing fails.
    """
    # Use module logging (consistent with the other endpoints in this file)
    # instead of bare print() so output lands in the configured log handlers.
    import logging
    logger = logging.getLogger(__name__)
    try:
        # Check if worker infrastructure is available (basic connectivity check)
        try:
            redis_conn.ping()
        except Exception as e:
            raise HTTPException(
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                detail=f"Redis connection failed: {str(e)}",
            )
        # Enqueue face processing job.
        # Pass function as string path to avoid serialization issues.
        job = queue.enqueue(
            "backend.services.tasks.process_faces_task",
            batch_size=request.batch_size,
            detector_backend=request.detector_backend,
            model_name=request.model_name,
            job_timeout="1h",  # Long timeout for face processing
        )
        logger.info("[Faces API] Enqueued face processing job: %s", job.id)
        logger.info("[Faces API] Job status: %s", job.get_status())
        logger.info("[Faces API] Queue length: %d", len(queue))
        return ProcessFacesResponse(
            job_id=job.id,
            message="Face processing job started",
            batch_size=request.batch_size,
            detector_backend=request.detector_backend,
            model_name=request.model_name,
        )
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to start face processing job: {str(e)}",
        )
def _parse_optional_iso_date(value: str | None):
    """Parse an ISO-8601 date string, returning None for blank or invalid input.

    Centralizes the three identical try/except stanzas the endpoint used for
    its date filters. The raw (unstripped) value is parsed, so a value with
    surrounding whitespace fails parsing and yields None — matching the
    historical behavior.
    """
    from datetime import date as _date
    if not (value and value.strip()):
        return None
    try:
        return _date.fromisoformat(value)
    except (ValueError, TypeError):
        return None


@router.get("/unidentified", response_model=UnidentifiedFacesResponse)
def get_unidentified_faces(
    page: int = Query(1, ge=1),
    page_size: int = Query(50, ge=1, le=2000),
    min_quality: float = Query(0.0, ge=0.0, le=1.0),
    date_taken_from: str | None = Query(None, description="Filter by date taken (from)"),
    date_taken_to: str | None = Query(None, description="Filter by date taken (to)"),
    date_processed: str | None = Query(None, description="Filter by date processed (exact date)"),
    sort_by: str = Query("quality"),
    sort_dir: str = Query("desc"),
    tag_names: str | None = Query(None, description="Comma-separated tag names for filtering"),
    match_all: bool = Query(False, description="Match all tags (for tag filtering)"),
    photo_ids: str | None = Query(None, description="Comma-separated photo IDs for filtering"),
    include_excluded: bool = Query(False, description="Include excluded faces in results"),
    db: Session = Depends(get_db),
) -> UnidentifiedFacesResponse:
    """Get unidentified faces with filters and pagination.

    Invalid/blank date strings are silently ignored (filter not applied);
    a malformed photo_ids list is a 400 error.
    """
    dtf = _parse_optional_iso_date(date_taken_from)
    dtt = _parse_optional_iso_date(date_taken_to)
    dp = _parse_optional_iso_date(date_processed)
    # Parse comma-separated tag names
    tag_names_list = None
    if tag_names:
        tag_names_list = [t.strip() for t in tag_names.split(',') if t.strip()]
    # Parse comma-separated photo IDs
    photo_ids_list = None
    if photo_ids:
        try:
            photo_ids_list = [int(pid.strip()) for pid in photo_ids.split(',') if pid.strip()]
        except ValueError:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid photo_ids format")
    # A single date_processed means an exact-date match: from == to == dp.
    faces, total = list_unidentified_faces(
        db,
        page=page,
        page_size=page_size,
        min_quality=min_quality,
        date_taken_from=dtf,
        date_taken_to=dtt,
        date_processed_from=dp,
        date_processed_to=dp,
        sort_by=sort_by,
        sort_dir=sort_dir,
        tag_names=tag_names_list,
        match_all=match_all,
        photo_ids=photo_ids_list,
        include_excluded=include_excluded,
    )
    items = [
        FaceItem(
            id=f.id,
            photo_id=f.photo_id,
            quality_score=float(f.quality_score),
            face_confidence=float(getattr(f, "face_confidence", 0.0)),
            location=f.location,
            pose_mode=getattr(f, "pose_mode", None) or "frontal",
            excluded=getattr(f, "excluded", False),
        )
        for f in faces
    ]
    return UnidentifiedFacesResponse(items=items, page=page, page_size=page_size, total=total)
@router.get("/{face_id}/similar", response_model=SimilarFacesResponse)
def get_similar_faces(
    face_id: int,
    include_excluded: bool = Query(False, description="Include excluded faces in results"),
    db: Session = Depends(get_db)
) -> SimilarFacesResponse:
    """Return similar unidentified faces for a given face."""
    import logging
    logger = logging.getLogger(__name__)
    logger.info(f"API: get_similar_faces called for face_id={face_id}, include_excluded={include_excluded}")
    # Ensure the anchor face exists before doing any similarity work.
    anchor = db.query(Face).filter(Face.id == face_id).first()
    if anchor is None:
        logger.warning(f"API: Face {face_id} not found")
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Face {face_id} not found")
    logger.info(f"API: Calling find_similar_faces for face_id={face_id}, include_excluded={include_excluded}")
    # Tolerance 0.6 is deliberately lenient for the Identify People manual-review flow.
    matches = find_similar_faces(db, face_id, tolerance=0.6, include_excluded=include_excluded)
    logger.info(f"API: find_similar_faces returned {len(matches)} results")
    items: list[SimilarFaceItem] = []
    for candidate, _distance, confidence_pct in matches:
        items.append(
            SimilarFaceItem(
                id=candidate.id,
                photo_id=candidate.photo_id,
                # API contract expects similarity in [0, 1]; the service returns a percentage.
                similarity=confidence_pct / 100.0,
                location=candidate.location,
                quality_score=float(candidate.quality_score),
                filename=candidate.photo.filename if candidate.photo else "unknown",
                pose_mode=getattr(candidate, "pose_mode", None) or "frontal",
            )
        )
    logger.info(f"API: Returning {len(items)} items for face_id={face_id}")
    return SimilarFacesResponse(base_face_id=face_id, items=items)
@router.post("/batch-similarity", response_model=BatchSimilarityResponse)
def get_batch_similarities(
    request: BatchSimilarityRequest,
    db: Session = Depends(get_db),
) -> BatchSimilarityResponse:
    """Calculate similarities between all pairs of faces in the provided list.

    Loads all faces once from database and calculates similarities between all
    pairs — much more efficient than calling /similar for each face individually.
    """
    import logging
    logger = logging.getLogger(__name__)
    logger.info(f"API: batch_similarity called for {len(request.face_ids)} faces")
    # Tolerance 0.6 matches the manual-review leniency used by /similar.
    pair_rows = calculate_batch_similarities(
        db,
        request.face_ids,
        min_confidence=request.min_confidence,
        tolerance=0.6,
    )
    # Map raw service tuples into the API schema.
    pairs: list[FaceSimilarityPair] = []
    for first_id, second_id, similarity, confidence_pct in pair_rows:
        pairs.append(
            FaceSimilarityPair(
                face_id_1=first_id,
                face_id_2=second_id,
                similarity=similarity,
                confidence_pct=confidence_pct,
            )
        )
    logger.info(f"API: batch_similarity returning {len(pairs)} pairs")
    return BatchSimilarityResponse(pairs=pairs)
@router.post("/{face_id}/identify", response_model=IdentifyFaceResponse)
def identify_face(
    face_id: int,
    request: IdentifyFaceRequest,
    current_user: Annotated[dict, Depends(get_current_user_with_id)],
    db: Session = Depends(get_db),
) -> IdentifyFaceResponse:
    """Assign a face (and optional batch) to a person, creating if needed.

    Also inserts into person_encodings for each identified face as desktop does.
    Tracks which user identified the face. Unknown ids in
    ``additional_face_ids`` are silently skipped.

    Raises:
        HTTPException: 404 if the target face is missing; 400 if person_id is
            unknown, or name fields are missing when creating a person.
    """
    user_id = current_user["user_id"]
    # Validate target face
    face = db.query(Face).filter(Face.id == face_id).first()
    if not face:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Face {face_id} not found")
    target_face_ids = [face_id]
    if request.additional_face_ids:
        target_face_ids.extend([fid for fid in request.additional_face_ids if fid != face_id])
    # Get or create person
    created_person = False
    person: Person | None = None
    if request.person_id:
        person = db.query(Person).filter(Person.id == request.person_id).first()
        if not person:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="person_id not found")
    else:
        # Validate required fields for creation
        first_name = (request.first_name or "").strip()
        last_name = (request.last_name or "").strip()
        middle_name = request.middle_name.strip() if request.middle_name else None
        maiden_name = request.maiden_name.strip() if request.maiden_name else None
        if not (first_name and last_name):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="first_name and last_name are required to create a person",
            )
        # Explicitly set created_date to ensure it's a valid datetime object.
        # NOTE(review): utcnow() is naive UTC; kept as-is for schema compatibility.
        person = Person(
            first_name=first_name,
            last_name=last_name,
            middle_name=middle_name,
            maiden_name=maiden_name,
            date_of_birth=request.date_of_birth,
            created_date=datetime.utcnow(),
        )
        db.add(person)
        db.flush()  # assigns person.id before we link faces to it
        created_person = True
    # Fetch all target faces in a single IN query (avoids one query per id),
    # then link them in caller-provided order, skipping ids that don't exist.
    faces_by_id = {
        f.id: f
        for f in db.query(Face).filter(Face.id.in_(target_face_ids)).all()
    }
    identified_ids: list[int] = []
    for fid in target_face_ids:
        f = faces_by_id.get(fid)
        if not f:
            continue
        f.person_id = person.id
        f.identified_by_user_id = user_id
        db.add(f)
        # Mirror the face's encoding into person_encodings (desktop parity).
        pe = PersonEncoding(
            person_id=person.id,
            face_id=f.id,
            encoding=f.encoding,
            quality_score=f.quality_score,
            detector_backend=f.detector_backend,
            model_name=f.model_name,
        )
        db.add(pe)
        identified_ids.append(f.id)
    db.commit()
    return IdentifyFaceResponse(identified_face_ids=identified_ids, person_id=person.id, created_person=created_person)
@router.get("/{face_id}/crop")
def get_face_crop(face_id: int, db: Session = Depends(get_db)) -> Response:
    """Serve face crop image extracted from photo using face location.

    The stored location ({x, y, w, h}, DeepFace format) is cropped out of the
    EXIF-orientation-corrected photo with 20% padding, upscaled to a minimum
    width of 200px, and returned as an inline JPEG.

    Raises:
        HTTPException: 404 if the face or photo file is missing, 422 for an
            invalid face box or crop bounds, 500 on any other failure.
    """
    # Unused `tempfile` and redundant local model imports removed; Face and
    # Photo are already imported at module level.
    import os
    import json
    import ast
    from io import BytesIO
    from PIL import Image
    from src.utils.exif_utils import EXIFOrientationHandler
    face = db.query(Face).filter(Face.id == face_id).first()
    if not face:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Face {face_id} not found")
    photo = db.query(Photo).filter(Photo.id == face.photo_id).first()
    if not photo or not os.path.exists(photo.path):
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Photo file not found")
    try:
        # Parse location (stored as text); support JSON or Python-literal formats
        if isinstance(face.location, str):
            try:
                location = json.loads(face.location)
            except Exception:
                location = ast.literal_eval(face.location)
        else:
            location = face.location
        # DeepFace format: {x, y, w, h}
        x = int(location.get('x', 0) or 0)
        y = int(location.get('y', 0) or 0)
        w = int(location.get('w', 0) or 0)
        h = int(location.get('h', 0) or 0)
        # If invalid dimensions, return client error similar to desktop behavior
        if w <= 0 or h <= 0:
            raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Invalid face box")
        # Load image with EXIF correction (same as desktop).
        # Desktop logic: use corrected image only if it's not None AND orientation != 1
        corrected_image, original_orientation = EXIFOrientationHandler.correct_image_orientation_from_path(photo.path)
        if corrected_image is not None and original_orientation and original_orientation != 1:
            # Copy the image to ensure it's not tied to a closed file handle
            image = corrected_image.copy()
        else:
            # Use original image if no correction needed or correction fails
            image = Image.open(photo.path)
        # Calculate crop bounds with padding (20% like desktop)
        padding_x = max(0, int(w * 0.2))
        padding_y = max(0, int(h * 0.2))
        crop_left = max(0, int(x - padding_x))
        crop_top = max(0, int(y - padding_y))
        crop_right = min(int(image.width), int(x + w + padding_x))
        crop_bottom = min(int(image.height), int(y + h + padding_y))
        # Ensure bounds make a valid box
        if crop_right <= crop_left or crop_bottom <= crop_top:
            raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Invalid crop bounds")
        face_crop = image.crop((crop_left, crop_top, crop_right, crop_bottom))
        # Resize if too small (minimum 200px width, like desktop)
        if face_crop.width > 0 and face_crop.width < 200:
            ratio = 200 / face_crop.width
            new_height = int(face_crop.height * ratio)
            face_crop = face_crop.resize((200, new_height), Image.Resampling.LANCZOS)
        # Encode in memory instead of a temp file to avoid Content-Length issues
        output = BytesIO()
        face_crop.save(output, format="JPEG", quality=95)
        image_bytes = output.getvalue()
        output.close()
        return Response(
            content=image_bytes,
            media_type="image/jpeg",
            headers={
                "Content-Disposition": "inline",
                "Cache-Control": "public, max-age=3600",
            },
        )
    except HTTPException:
        raise
    except Exception as e:
        print(f"[Faces API] get_face_crop error for face {face_id}: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to extract face crop: {str(e)}",
        )
@router.put("/{face_id}/excluded", response_model=dict)
def toggle_face_excluded(
    face_id: int,
    excluded: bool = Query(..., description="Set excluded status"),
    db: Session = Depends(get_db),
) -> dict:
    """Toggle excluded status for a face."""
    target = db.query(Face).filter(Face.id == face_id).first()
    if target is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Face {face_id} not found")
    # Persist the new flag immediately.
    target.excluded = excluded
    db.commit()
    state = "excluded" if excluded else "included"
    return {"face_id": face_id, "excluded": excluded, "message": f"Face {state} successfully"}
@router.post("/{face_id}/unmatch", response_model=FaceUnmatchResponse)
def unmatch_face(face_id: int, db: Session = Depends(get_db)) -> FaceUnmatchResponse:
    """Unmatch a face from its person (set person_id to NULL)."""
    target = db.query(Face).filter(Face.id == face_id).first()
    if target is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Face {face_id} not found")
    # Remember the old link for the response message before clearing it.
    previous_person_id = target.person_id
    if previous_person_id is None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Face {face_id} is not currently matched to any person",
        )
    target.person_id = None
    # The person_encodings rows mirror face->person links, so drop them too.
    db.query(PersonEncoding).filter(PersonEncoding.face_id == face_id).delete()
    try:
        db.commit()
    except Exception as e:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to unmatch face: {str(e)}",
        )
    return FaceUnmatchResponse(
        face_id=face_id,
        message=f"Face {face_id} unlinked from person {previous_person_id}",
    )
@router.post("/batch-unmatch", response_model=BatchUnmatchResponse)
def batch_unmatch_faces(request: BatchUnmatchRequest, db: Session = Depends(get_db)) -> BatchUnmatchResponse:
    """Batch unmatch multiple faces from their people.

    Each matched face has its person_id set to NULL and its person_encodings
    rows deleted in a single transaction. People are intentionally kept even
    if they end up with no identified faces (auto-deletion is disabled).

    Raises:
        HTTPException: 400 for an empty list or when no face is matched,
            404 when any id is unknown, 500 on commit failure.
    """
    if not request.face_ids:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="face_ids list cannot be empty",
        )
    # Validate all faces exist before mutating anything.
    faces = db.query(Face).filter(Face.id.in_(request.face_ids)).all()
    found_ids = {f.id for f in faces}
    missing_ids = set(request.face_ids) - found_ids
    if missing_ids:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Faces not found: {sorted(missing_ids)}",
        )
    # Only faces currently linked to a person can be unmatched.
    matched_faces = [f for f in faces if f.person_id is not None]
    if not matched_faces:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="None of the specified faces are currently matched to any person",
        )
    face_ids_to_unmatch = [f.id for f in matched_faces]
    for face in matched_faces:
        face.person_id = None
    # Remove the encodings that mirrored these face->person links.
    db.query(PersonEncoding).filter(PersonEncoding.face_id.in_(face_ids_to_unmatch)).delete(synchronize_session=False)
    try:
        db.commit()
    except Exception as e:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to batch unmatch faces: {str(e)}",
        )
    message = f"Successfully unlinked {len(face_ids_to_unmatch)} face(s)"
    return BatchUnmatchResponse(
        unmatched_face_ids=face_ids_to_unmatch,
        count=len(face_ids_to_unmatch),
        message=message,
    )
@router.post("/auto-match", response_model=AutoMatchResponse)
def auto_match_faces(
    request: AutoMatchRequest,
    db: Session = Depends(get_db),
) -> AutoMatchResponse:
    """Start auto-match process with tolerance threshold and optional auto-acceptance.

    Matches desktop auto-match workflow exactly:
    1. Gets all identified people (one face per person, best quality >= 0.3)
    2. For each person, finds similar unidentified faces (confidence >= 40%)
    3. Returns matches grouped by person, sorted by person name

    If auto_accept=True:
    - Only processes persons with frontal or tilted reference faces (not profile)
    - Only processes persons with reference face quality > 50% (quality_score > 0.5)
    - Only matches with frontal or tilted unidentified faces (not profile)
    - Only auto-accepts matches with similarity >= threshold
    - Only auto-accepts faces with quality > 50% (quality_score > 0.5)
    """
    from backend.db.models import Person, Photo
    from sqlalchemy import func
    # Statistics reported back to the caller.
    # NOTE(review): skipped_persons is never incremented in this function —
    # person-level filtering happens inside find_auto_match_matches, so the
    # response always reports 0 here. Confirm whether the service should
    # report that count instead.
    auto_accepted_faces = 0
    skipped_persons = 0
    skipped_matches = 0
    # Find matches for all identified people.
    # When auto_accept is on, restrict reference faces to frontal/tilted poses.
    matches_data = find_auto_match_matches(
        db,
        tolerance=request.tolerance,
        filter_frontal_only=request.auto_accept
    )
    # If auto_accept enabled, process matches automatically.
    if request.auto_accept and matches_data:
        for person_id, reference_face_id, reference_face, similar_faces in matches_data:
            # Filter matches by criteria:
            # 1. Match face must be frontal (already filtered by find_similar_faces)
            # 2. Similarity must be >= threshold
            # 3. Quality must be > 50% (quality_score > 0.5)
            qualifying_faces = []
            for face, distance, confidence_pct in similar_faces:
                # Check similarity threshold (confidence is a 0-100 percentage).
                if confidence_pct < request.auto_accept_threshold:
                    skipped_matches += 1
                    continue
                # Check quality threshold (only accept faces with quality > 50%).
                face_quality = float(face.quality_score) if face.quality_score is not None else 0.0
                if face_quality <= 0.5:
                    skipped_matches += 1
                    continue
                qualifying_faces.append(face.id)
            # Auto-accept qualifying faces; a failure for one person is
            # logged and must not abort the whole batch.
            if qualifying_faces:
                try:
                    identified_count, updated_count = accept_auto_match_matches(
                        db, person_id, qualifying_faces
                    )
                    auto_accepted_faces += identified_count
                except Exception as e:
                    print(f"Error auto-accepting matches for person {person_id}: {e}")
    if not matches_data:
        return AutoMatchResponse(
            people=[],
            total_people=0,
            total_matches=0,
            auto_accepted=request.auto_accept,
            auto_accepted_faces=auto_accepted_faces,
            skipped_persons=skipped_persons,
            skipped_matches=skipped_matches,
        )
    # Build response matching desktop format.
    # NOTE(review): the response is built from matches_data computed BEFORE
    # auto-acceptance ran, so auto-accepted faces are still listed as
    # pending matches in the payload — confirm whether that is intended.
    people_items = []
    total_matches = 0
    for person_id, reference_face_id, reference_face, similar_faces in matches_data:
        # Get person details; skip rows whose person vanished mid-request.
        person = db.query(Person).filter(Person.id == person_id).first()
        if not person:
            continue
        # Build person name (matching desktop): "First Middle Last (Maiden)".
        name_parts = []
        if person.first_name:
            name_parts.append(person.first_name)
        if person.middle_name:
            name_parts.append(person.middle_name)
        if person.last_name:
            name_parts.append(person.last_name)
        if person.maiden_name:
            name_parts.append(f"({person.maiden_name})")
        person_name = ' '.join(name_parts) if name_parts else "Unknown"
        # Get face count for this person (matching desktop).
        face_count = (
            db.query(func.count(Face.id))
            .filter(Face.person_id == person_id)
            .scalar() or 0
        )
        # Get reference face photo info; skip if the photo row is gone.
        reference_photo = db.query(Photo).filter(Photo.id == reference_face.photo_id).first()
        if not reference_photo:
            continue
        # Get reference face pose_mode (default to frontal when unset).
        reference_pose_mode = reference_face.pose_mode or 'frontal'
        # Build matches list for this person.
        match_items = []
        for face, distance, confidence_pct in similar_faces:
            # Get photo info for this match; skip orphaned faces.
            match_photo = db.query(Photo).filter(Photo.id == face.photo_id).first()
            if not match_photo:
                continue
            match_items.append(
                AutoMatchFaceItem(
                    id=face.id,
                    photo_id=face.photo_id,
                    photo_filename=match_photo.filename,
                    location=face.location,
                    quality_score=float(face.quality_score),
                    similarity=confidence_pct,  # Confidence percentage (0-100)
                    distance=distance,
                    pose_mode=face.pose_mode or 'frontal',
                )
            )
        # Only include people that still have at least one displayable match.
        if match_items:
            people_items.append(
                AutoMatchPersonItem(
                    person_id=person_id,
                    person_name=person_name,
                    reference_face_id=reference_face_id,
                    reference_photo_id=reference_face.photo_id,
                    reference_photo_filename=reference_photo.filename,
                    reference_location=reference_face.location,
                    reference_pose_mode=reference_pose_mode,
                    face_count=face_count,
                    matches=match_items,
                    total_matches=len(match_items),
                )
            )
            total_matches += len(match_items)
    return AutoMatchResponse(
        people=people_items,
        total_people=len(people_items),
        total_matches=total_matches,
        auto_accepted=request.auto_accept,
        auto_accepted_faces=auto_accepted_faces,
        skipped_persons=skipped_persons,
        skipped_matches=skipped_matches,
    )
@router.get("/auto-match/people", response_model=AutoMatchPeopleResponse)
def get_auto_match_people(
    filter_frontal_only: bool = Query(False, description="Only include frontal/tilted reference faces"),
    tolerance: float = Query(0.5, ge=0.0, le=1.0, description="Tolerance threshold"),
    db: Session = Depends(get_db),
) -> AutoMatchPeopleResponse:
    """Get list of people for auto-match (without matches) - fast initial load.

    Returns just the people list with reference faces, without calculating
    matches, so the page can render quickly; matches are then fetched
    on-demand via /auto-match/people/{person_id}/matches.

    Note: Only returns people if there are unidentified faces in the database
    (people can't have matches when there are no unidentified faces).
    """
    from backend.db.models import Person, Photo
    # Fast path: no match calculations, but the service does verify
    # unidentified faces exist.
    people_data = get_auto_match_people_list(
        db,
        filter_frontal_only=filter_frontal_only,
        tolerance=tolerance
    )
    if not people_data:
        return AutoMatchPeopleResponse(people=[], total_people=0)
    summaries: list[AutoMatchPersonSummary] = []
    for person_id, reference_face, person_name, face_count in people_data:
        # Skip entries whose person row or reference photo no longer exists.
        if db.query(Person).filter(Person.id == person_id).first() is None:
            continue
        ref_photo = db.query(Photo).filter(Photo.id == reference_face.photo_id).first()
        if ref_photo is None:
            continue
        summaries.append(
            AutoMatchPersonSummary(
                person_id=person_id,
                person_name=person_name,
                reference_face_id=reference_face.id,
                reference_photo_id=reference_face.photo_id,
                reference_photo_filename=ref_photo.filename,
                reference_location=reference_face.location,
                reference_pose_mode=reference_face.pose_mode or 'frontal',
                face_count=face_count,
                total_matches=0,  # matches are loaded lazily per person
            )
        )
    return AutoMatchPeopleResponse(
        people=summaries,
        total_people=len(summaries),
    )
@router.get("/auto-match/people/{person_id}/matches", response_model=AutoMatchPersonMatchesResponse)
def get_auto_match_person_matches(
    person_id: int,
    tolerance: float = Query(0.5, ge=0.0, le=1.0, description="Tolerance threshold"),
    filter_frontal_only: bool = Query(False, description="Only return frontal/tilted faces"),
    db: Session = Depends(get_db),
) -> AutoMatchPersonMatchesResponse:
    """Get matches for a specific person - for lazy loading.

    This endpoint is called on-demand when user navigates to a person.
    """
    from backend.db.models import Photo
    # Fetch similarity candidates for this person from the service layer.
    candidates = get_person_matches_service(
        db,
        person_id=person_id,
        tolerance=tolerance,
        filter_frontal_only=filter_frontal_only,
    )
    matches: list[AutoMatchFaceItem] = []
    for candidate, distance, confidence_pct in candidates:
        photo = db.query(Photo).filter(Photo.id == candidate.photo_id).first()
        if photo is None:
            # Orphaned face row; nothing to display without its photo.
            continue
        matches.append(
            AutoMatchFaceItem(
                id=candidate.id,
                photo_id=candidate.photo_id,
                photo_filename=photo.filename,
                location=candidate.location,
                quality_score=float(candidate.quality_score),
                similarity=confidence_pct,  # confidence percentage (0-100)
                distance=distance,
                pose_mode=candidate.pose_mode or 'frontal',
            )
        )
    # An empty candidate list naturally yields an empty matches payload.
    return AutoMatchPersonMatchesResponse(
        person_id=person_id,
        matches=matches,
        total_matches=len(matches),
    )
def _person_display_name(person: Person) -> str | None:
    """Build a "First Middle Last (Maiden)" display name; None if no parts set."""
    parts = []
    if person.first_name:
        parts.append(person.first_name)
    if person.middle_name:
        parts.append(person.middle_name)
    if person.last_name:
        parts.append(person.last_name)
    if person.maiden_name:
        parts.append(f"({person.maiden_name})")
    return " ".join(parts) if parts else None


@router.get("/maintenance", response_model=MaintenanceFacesResponse)
def list_all_faces(
    page: int = Query(1, ge=1),
    page_size: int = Query(50, ge=1, le=2000),
    min_quality: float = Query(0.0, ge=0.0, le=1.0),
    max_quality: float = Query(1.0, ge=0.0, le=1.0),
    excluded_filter: str = Query("all", description="Filter by excluded status: all, excluded, included"),
    identified_filter: str = Query("all", description="Filter by identified status: all, identified, unidentified"),
    db: Session = Depends(get_db),
) -> MaintenanceFacesResponse:
    """List all faces with person info and file path for maintenance.

    Returns all faces (both identified and unidentified) with their associated
    person information (if identified) and photo file path.
    """
    # Base query: face + photo, person joined optionally, quality range applied.
    query = (
        db.query(Face, Photo, Person)
        .join(Photo, Face.photo_id == Photo.id)
        .outerjoin(Person, Face.person_id == Person.id)
        .filter(Face.quality_score >= min_quality)
        .filter(Face.quality_score <= max_quality)
    )
    # Filter by excluded status ("all" applies no extra filter).
    if excluded_filter == "excluded":
        query = query.filter(Face.excluded.is_(True))
    elif excluded_filter == "included":
        query = query.filter(Face.excluded.is_(False))
    # Filter by identified status ("all" applies no extra filter).
    if identified_filter == "identified":
        query = query.filter(Face.person_id.isnot(None))
    elif identified_filter == "unidentified":
        query = query.filter(Face.person_id.is_(None))
    # Total before pagination so the client can page correctly.
    total = query.count()
    offset = (page - 1) * page_size
    results = query.order_by(Face.id.desc()).offset(offset).limit(page_size).all()
    items = []
    for face, photo, person in results:
        items.append(
            MaintenanceFaceItem(
                id=face.id,
                photo_id=face.photo_id,
                photo_path=photo.path,
                photo_filename=photo.filename,
                quality_score=float(face.quality_score),
                person_id=face.person_id,
                person_name=_person_display_name(person) if person else None,
                excluded=face.excluded,
            )
        )
    return MaintenanceFacesResponse(items=items, total=total)
@router.post("/delete", response_model=DeleteFacesResponse)
def delete_faces(
    request: DeleteFacesRequest,
    db: Session = Depends(get_db),
) -> DeleteFacesResponse:
    """Delete multiple faces from the database.

    Permanently removes the faces and their person_encodings rows in one
    transaction. People are intentionally kept even if they have no
    identified faces remaining afterwards (auto-deletion is disabled).

    Raises:
        HTTPException: 400 for an empty list, 404 when any id is unknown,
            500 on commit failure.
    """
    if not request.face_ids:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="face_ids list cannot be empty",
        )
    # Validate all faces exist before deleting anything.
    faces = db.query(Face).filter(Face.id.in_(request.face_ids)).all()
    found_ids = {f.id for f in faces}
    missing_ids = set(request.face_ids) - found_ids
    if missing_ids:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Faces not found: {sorted(missing_ids)}",
        )
    # Delete dependent person_encodings first, then the faces themselves.
    db.query(PersonEncoding).filter(PersonEncoding.face_id.in_(request.face_ids)).delete(synchronize_session=False)
    db.query(Face).filter(Face.id.in_(request.face_ids)).delete(synchronize_session=False)
    try:
        db.commit()
    except Exception as e:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to delete faces: {str(e)}",
        )
    message = f"Successfully deleted {len(request.face_ids)} face(s)"
    return DeleteFacesResponse(
        deleted_face_ids=request.face_ids,
        count=len(request.face_ids),
        message=message,
    )