"""Video person identification endpoints.""" from __future__ import annotations from datetime import date from typing import Annotated, Optional from fastapi import APIRouter, Depends, HTTPException, Query, status from fastapi.responses import FileResponse from sqlalchemy.orm import Session from backend.db.session import get_db from backend.db.models import Photo, User from backend.api.auth import get_current_user_with_id from backend.schemas.videos import ( ListVideosResponse, VideoListItem, PersonInfo, VideoPeopleResponse, VideoPersonInfo, IdentifyVideoRequest, IdentifyVideoResponse, RemoveVideoPersonResponse, ) from backend.services.video_service import ( list_videos_for_identification, get_video_people, identify_person_in_video, remove_person_from_video, get_video_people_count, ) from backend.services.thumbnail_service import get_video_thumbnail_path router = APIRouter(prefix="/videos", tags=["videos"]) @router.get("", response_model=ListVideosResponse) def list_videos( current_user: Annotated[dict, Depends(get_current_user_with_id)], folder_path: Optional[str] = Query(None, description="Filter by folder path"), date_from: Optional[str] = Query(None, description="Filter by date taken (from, YYYY-MM-DD)"), date_to: Optional[str] = Query(None, description="Filter by date taken (to, YYYY-MM-DD)"), has_people: Optional[bool] = Query(None, description="Filter videos with/without identified people"), person_name: Optional[str] = Query(None, description="Filter videos containing person with this name"), sort_by: str = Query("filename", description="Sort field: filename, date_taken, date_added"), sort_dir: str = Query("asc", description="Sort direction: asc or desc"), page: int = Query(1, ge=1), page_size: int = Query(50, ge=1, le=200), db: Session = Depends(get_db), ) -> ListVideosResponse: """List videos for person identification.""" # Parse date filters date_from_parsed = None date_to_parsed = None if date_from: try: date_from_parsed = date.fromisoformat(date_from) except ValueError: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid date_from format: {date_from}. Use YYYY-MM-DD", ) if date_to: try: date_to_parsed = date.fromisoformat(date_to) except ValueError: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid date_to format: {date_to}. Use YYYY-MM-DD", ) # Validate sort parameters if sort_by not in ["filename", "date_taken", "date_added"]: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid sort_by: {sort_by}. Must be filename, date_taken, or date_added", ) if sort_dir not in ["asc", "desc"]: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid sort_dir: {sort_dir}. Must be asc or desc", ) # Get videos videos, total = list_videos_for_identification( db=db, folder_path=folder_path, date_from=date_from_parsed, date_to=date_to_parsed, has_people=has_people, person_name=person_name, sort_by=sort_by, sort_dir=sort_dir, page=page, page_size=page_size, ) # Build response items items = [] for video in videos: # Get people for this video people_data = get_video_people(db, video.id) identified_people = [] for person, linkage in people_data: identified_people.append( PersonInfo( id=person.id, first_name=person.first_name, last_name=person.last_name, middle_name=person.middle_name, maiden_name=person.maiden_name, date_of_birth=person.date_of_birth, ) ) # Convert date_added to date if it's datetime date_added = video.date_added if hasattr(date_added, "date"): date_added = date_added.date() items.append( VideoListItem( id=video.id, filename=video.filename, path=video.path, date_taken=video.date_taken, date_added=date_added, identified_people=identified_people, identified_people_count=len(identified_people), ) ) return ListVideosResponse(items=items, page=page, page_size=page_size, total=total) @router.get("/{video_id}/people", response_model=VideoPeopleResponse) def get_video_people_endpoint( video_id: int, db: Session = Depends(get_db), ) -> VideoPeopleResponse: """Get all people identified in a video.""" # Verify video exists video = db.query(Photo).filter( Photo.id == video_id, Photo.media_type == "video" ).first() if not video: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Video {video_id} not found", ) # Get people people_data = get_video_people(db, video_id) people = [] for person, linkage in people_data: # Get username if identified_by_user_id exists username = None if linkage.identified_by_user_id: user = db.query(User).filter(User.id == linkage.identified_by_user_id).first() if user: username = user.username people.append( VideoPersonInfo( person_id=person.id, first_name=person.first_name, last_name=person.last_name, middle_name=person.middle_name, maiden_name=person.maiden_name, date_of_birth=person.date_of_birth, identified_by=username, identified_date=linkage.created_date, ) ) return VideoPeopleResponse(video_id=video_id, people=people) @router.post("/{video_id}/identify", response_model=IdentifyVideoResponse) def identify_person_in_video_endpoint( video_id: int, request: IdentifyVideoRequest, current_user: Annotated[dict, Depends(get_current_user_with_id)], db: Session = Depends(get_db), ) -> IdentifyVideoResponse: """Identify a person in a video.""" user_id = current_user.get("id") try: person, created_person = identify_person_in_video( db=db, video_id=video_id, person_id=request.person_id, first_name=request.first_name, last_name=request.last_name, middle_name=request.middle_name, maiden_name=request.maiden_name, date_of_birth=request.date_of_birth, user_id=user_id, ) message = ( f"Person '{person.first_name} {person.last_name}' identified in video" if not created_person else f"Created new person '{person.first_name} {person.last_name}' and identified in video" ) return IdentifyVideoResponse( video_id=video_id, person_id=person.id, created_person=created_person, message=message, ) except ValueError as e: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=str(e), ) @router.delete("/{video_id}/people/{person_id}", response_model=RemoveVideoPersonResponse) def remove_person_from_video_endpoint( video_id: int, person_id: int, current_user: Annotated[dict, Depends(get_current_user_with_id)], db: Session = Depends(get_db), ) -> RemoveVideoPersonResponse: """Remove person identification from video.""" try: removed = remove_person_from_video( db=db, video_id=video_id, person_id=person_id, ) if removed: return RemoveVideoPersonResponse( video_id=video_id, person_id=person_id, removed=True, message=f"Person {person_id} removed from video {video_id}", ) else: return RemoveVideoPersonResponse( video_id=video_id, person_id=person_id, removed=False, message=f"Person {person_id} not found in video {video_id}", ) except ValueError as e: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=str(e), ) @router.get("/{video_id}/thumbnail") def get_video_thumbnail( video_id: int, db: Session = Depends(get_db), ) -> FileResponse: """Get video thumbnail (generated on-demand and cached).""" # Verify video exists video = db.query(Photo).filter( Photo.id == video_id, Photo.media_type == "video" ).first() if not video: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Video {video_id} not found", ) # Generate or get cached thumbnail thumbnail_path = get_video_thumbnail_path(video.path) if not thumbnail_path or not thumbnail_path.exists(): raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to generate video thumbnail", ) # Return thumbnail with caching headers response = FileResponse( str(thumbnail_path), media_type="image/jpeg", ) response.headers["Cache-Control"] = "public, max-age=86400" # Cache for 1 day return response @router.get("/{video_id}/video") def get_video_file( video_id: int, db: Session = Depends(get_db), ) -> FileResponse: """Serve video file for playback.""" import os import mimetypes # Verify video exists video = db.query(Photo).filter( Photo.id == video_id, Photo.media_type == "video" ).first() if not video: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Video {video_id} not found", ) if not os.path.exists(video.path): raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Video file not found: {video.path}", ) # Determine media type from file extension media_type, _ = mimetypes.guess_type(video.path) if not media_type or not media_type.startswith('video/'): media_type = "video/mp4" # Use FileResponse with range request support for video streaming response = FileResponse( video.path, media_type=media_type, ) response.headers["Content-Disposition"] = "inline" response.headers["Accept-Ranges"] = "bytes" response.headers["Cache-Control"] = "public, max-age=3600" return response