punimtag/backend/api/people.py
Tanya c69604573d feat: Improve face identification process with validation and error handling
This commit enhances the face identification process by adding validation checks for person ID and names, ensuring that users provide necessary information before proceeding. It also introduces detailed logging for better debugging and user feedback during the identification process. Additionally, error handling is improved to provide user-friendly messages in case of failures, enhancing the overall user experience.
2026-01-05 13:37:45 -05:00

321 lines
11 KiB
Python

"""People management endpoints."""
from __future__ import annotations
from datetime import datetime
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, Query, Response, status
from sqlalchemy import func
from sqlalchemy.orm import Session
from backend.db.session import get_db
from backend.db.models import Person, Face, PersonEncoding, PhotoPersonLinkage, Photo
from backend.api.auth import get_current_user_with_id
from backend.schemas.people import (
PeopleListResponse,
PersonCreateRequest,
PersonResponse,
PersonUpdateRequest,
PersonWithFacesResponse,
PeopleWithFacesListResponse,
)
from backend.schemas.faces import PersonFacesResponse, PersonFaceItem, AcceptMatchesRequest, IdentifyFaceResponse
from backend.services.face_service import accept_auto_match_matches
router = APIRouter(prefix="/people", tags=["people"])
@router.get("", response_model=PeopleListResponse)
def list_people(
last_name: str | None = Query(None, description="Filter by last name (case-insensitive)"),
db: Session = Depends(get_db),
) -> PeopleListResponse:
"""List all people sorted by last_name, first_name.
Optionally filter by last_name if provided (case-insensitive search).
"""
query = db.query(Person)
if last_name:
# Case-insensitive search on last_name
query = query.filter(func.lower(Person.last_name).contains(func.lower(last_name)))
people = query.order_by(Person.last_name.asc(), Person.first_name.asc()).all()
items = [PersonResponse.model_validate(p) for p in people]
return PeopleListResponse(items=items, total=len(items))
@router.get("/with-faces", response_model=PeopleWithFacesListResponse)
def list_people_with_faces(
last_name: str | None = Query(None, description="Filter by last name or maiden name (case-insensitive)"),
db: Session = Depends(get_db),
) -> PeopleWithFacesListResponse:
"""List all people with face counts and video counts, sorted by last_name, first_name.
Optionally filter by last_name or maiden_name if provided (case-insensitive search).
Returns all people, including those with zero faces or videos.
"""
# Query people with face counts using LEFT OUTER JOIN to include people with no faces
query = (
db.query(
Person,
func.count(Face.id.distinct()).label('face_count')
)
.outerjoin(Face, Person.id == Face.person_id)
.group_by(Person.id)
)
if last_name:
# Case-insensitive search on both last_name and maiden_name
search_term = last_name.lower()
query = query.filter(
(func.lower(Person.last_name).contains(search_term)) |
((Person.maiden_name.isnot(None)) & (func.lower(Person.maiden_name).contains(search_term)))
)
results = query.order_by(Person.last_name.asc(), Person.first_name.asc()).all()
# Get video counts separately for each person
person_ids = [person.id for person, _ in results]
video_counts = {}
if person_ids:
video_count_query = (
db.query(
PhotoPersonLinkage.person_id,
func.count(PhotoPersonLinkage.id).label('video_count')
)
.join(Photo, PhotoPersonLinkage.photo_id == Photo.id)
.filter(
PhotoPersonLinkage.person_id.in_(person_ids),
Photo.media_type == "video"
)
.group_by(PhotoPersonLinkage.person_id)
)
for person_id, video_count in video_count_query.all():
video_counts[person_id] = video_count
items = [
PersonWithFacesResponse(
id=person.id,
first_name=person.first_name,
last_name=person.last_name,
middle_name=person.middle_name,
maiden_name=person.maiden_name,
date_of_birth=person.date_of_birth,
face_count=face_count or 0, # Convert None to 0 for people with no faces
video_count=video_counts.get(person.id, 0), # Get video count or default to 0
)
for person, face_count in results
]
return PeopleWithFacesListResponse(items=items, total=len(items))
@router.post("", response_model=PersonResponse, status_code=status.HTTP_201_CREATED)
def create_person(request: PersonCreateRequest, db: Session = Depends(get_db)) -> PersonResponse:
"""Create a new person."""
first_name = request.first_name.strip()
last_name = request.last_name.strip()
middle_name = request.middle_name.strip() if request.middle_name else None
maiden_name = request.maiden_name.strip() if request.maiden_name else None
# Explicitly set created_date to ensure it's a valid datetime object
person = Person(
first_name=first_name,
last_name=last_name,
middle_name=middle_name,
maiden_name=maiden_name,
date_of_birth=request.date_of_birth,
created_date=datetime.utcnow(),
)
db.add(person)
try:
db.commit()
except Exception as e:
db.rollback()
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
db.refresh(person)
return PersonResponse.model_validate(person)
@router.get("/{person_id}", response_model=PersonResponse)
def get_person(person_id: int, db: Session = Depends(get_db)) -> PersonResponse:
"""Get person by ID."""
person = db.query(Person).filter(Person.id == person_id).first()
if not person:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Person {person_id} not found")
return PersonResponse.model_validate(person)
@router.put("/{person_id}", response_model=PersonResponse)
def update_person(
person_id: int,
request: PersonUpdateRequest,
db: Session = Depends(get_db),
) -> PersonResponse:
"""Update person information."""
person = db.query(Person).filter(Person.id == person_id).first()
if not person:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Person {person_id} not found")
# Update fields
person.first_name = request.first_name.strip()
person.last_name = request.last_name.strip()
person.middle_name = request.middle_name.strip() if request.middle_name else None
person.maiden_name = request.maiden_name.strip() if request.maiden_name else None
person.date_of_birth = request.date_of_birth
try:
db.commit()
db.refresh(person)
except Exception as e:
db.rollback()
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
return PersonResponse.model_validate(person)
@router.get("/{person_id}/faces", response_model=PersonFacesResponse)
def get_person_faces(person_id: int, db: Session = Depends(get_db)) -> PersonFacesResponse:
"""Get all faces for a specific person."""
person = db.query(Person).filter(Person.id == person_id).first()
if not person:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Person {person_id} not found")
from backend.db.models import Photo
faces = (
db.query(Face)
.join(Photo, Face.photo_id == Photo.id)
.filter(Face.person_id == person_id)
.order_by(Photo.filename)
.all()
)
items = [
PersonFaceItem(
id=face.id,
photo_id=face.photo_id,
photo_path=face.photo.path,
photo_filename=face.photo.filename,
location=face.location,
face_confidence=float(face.face_confidence),
quality_score=float(face.quality_score),
detector_backend=face.detector_backend,
model_name=face.model_name,
)
for face in faces
]
return PersonFacesResponse(person_id=person_id, items=items, total=len(items))
@router.get("/{person_id}/videos")
def get_person_videos(person_id: int, db: Session = Depends(get_db)) -> dict:
"""Get all videos linked to a specific person."""
person = db.query(Person).filter(Person.id == person_id).first()
if not person:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Person {person_id} not found")
# Get all video linkages for this person
linkages = (
db.query(PhotoPersonLinkage, Photo)
.join(Photo, PhotoPersonLinkage.photo_id == Photo.id)
.filter(
PhotoPersonLinkage.person_id == person_id,
Photo.media_type == "video"
)
.order_by(Photo.filename)
.all()
)
items = [
{
"id": photo.id,
"filename": photo.filename,
"path": photo.path,
"date_taken": photo.date_taken.isoformat() if photo.date_taken else None,
"date_added": photo.date_added.isoformat() if photo.date_added else None,
"linkage_id": linkage.id,
}
for linkage, photo in linkages
]
return {
"person_id": person_id,
"items": items,
"total": len(items),
}
@router.post("/{person_id}/accept-matches", response_model=IdentifyFaceResponse)
def accept_matches(
person_id: int,
request: AcceptMatchesRequest,
current_user: Annotated[dict, Depends(get_current_user_with_id)],
db: Session = Depends(get_db),
) -> IdentifyFaceResponse:
"""Accept auto-match matches for a person.
Matches desktop auto-match save workflow exactly:
1. Identifies selected faces with this person
2. Inserts person_encodings for each identified face
3. Updates person encodings (removes old, adds current)
Tracks which user identified the faces.
"""
from backend.api.auth import get_current_user_with_id
user_id = current_user["user_id"]
identified_count, updated_count = accept_auto_match_matches(
db, person_id, request.face_ids, user_id=user_id
)
return IdentifyFaceResponse(
identified_face_ids=request.face_ids,
person_id=person_id,
created_person=False,
)
@router.delete("/{person_id}")
def delete_person(person_id: int, db: Session = Depends(get_db)) -> Response:
"""Delete a person and all their linkages.
This will:
1. Delete all person_encodings for this person
2. Unlink all faces (set person_id to NULL)
3. Delete all video linkages (PhotoPersonLinkage records)
4. Delete the person record
"""
person = db.query(Person).filter(Person.id == person_id).first()
if not person:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Person {person_id} not found",
)
try:
# Delete all person_encodings for this person
db.query(PersonEncoding).filter(PersonEncoding.person_id == person_id).delete(synchronize_session=False)
# Unlink all faces (set person_id to NULL)
db.query(Face).filter(Face.person_id == person_id).update(
{"person_id": None}, synchronize_session=False
)
# Delete all video linkages (PhotoPersonLinkage records)
db.query(PhotoPersonLinkage).filter(PhotoPersonLinkage.person_id == person_id).delete(synchronize_session=False)
# Delete the person record
db.delete(person)
db.commit()
return Response(status_code=status.HTTP_204_NO_CONTENT)
except Exception as e:
db.rollback()
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to delete person: {str(e)}",
)