punimtag/backend/api/videos.py
tanyar09 70923e0ecf
Some checks failed
CI / skip-ci-check (pull_request) Successful in 7s
CI / python-lint (pull_request) Has been cancelled
CI / test-backend (pull_request) Has been cancelled
CI / build (pull_request) Has been cancelled
CI / secret-scanning (pull_request) Has been cancelled
CI / dependency-scan (pull_request) Has been cancelled
CI / sast-scan (pull_request) Has been cancelled
CI / workflow-summary (pull_request) Has been cancelled
CI / lint-and-type-check (pull_request) Has been cancelled
feat: Enhance photo and video handling in admin frontend
- Added media_type to PhotoSearchResult interface to distinguish between images and videos.
- Updated PhotoViewer component to support video playback, including URL handling and preloading logic.
- Modified openPhoto function in Search page to open videos correctly.
- Enhanced backend API to serve video files with range request support for better streaming experience.

These changes improve the user experience by allowing seamless viewing of both images and videos in the application.
2026-01-28 17:45:45 +00:00

428 lines
14 KiB
Python

"""Video person identification endpoints."""
from __future__ import annotations
from datetime import date
from typing import Annotated, Optional
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from fastapi.responses import FileResponse, Response, StreamingResponse
from sqlalchemy.orm import Session
from backend.db.session import get_db
from backend.db.models import Photo, User
from backend.api.auth import get_current_user_with_id
from backend.schemas.videos import (
ListVideosResponse,
VideoListItem,
PersonInfo,
VideoPeopleResponse,
VideoPersonInfo,
IdentifyVideoRequest,
IdentifyVideoResponse,
RemoveVideoPersonResponse,
)
from backend.services.video_service import (
list_videos_for_identification,
get_video_people,
identify_person_in_video,
remove_person_from_video,
get_video_people_count,
)
from backend.services.thumbnail_service import get_video_thumbnail_path
# Router that collects the video endpoints defined below; all of its routes
# share the "/videos" URL prefix and the "videos" tag.
router = APIRouter(prefix="/videos", tags=["videos"])
@router.get("", response_model=ListVideosResponse)
def list_videos(
    current_user: Annotated[dict, Depends(get_current_user_with_id)],
    folder_path: Optional[str] = Query(None, description="Filter by folder path"),
    date_from: Optional[str] = Query(None, description="Filter by date taken (from, YYYY-MM-DD)"),
    date_to: Optional[str] = Query(None, description="Filter by date taken (to, YYYY-MM-DD)"),
    has_people: Optional[bool] = Query(None, description="Filter videos with/without identified people"),
    person_name: Optional[str] = Query(None, description="Filter videos containing person with this name"),
    sort_by: str = Query("filename", description="Sort field: filename, date_taken, date_added"),
    sort_dir: str = Query("asc", description="Sort direction: asc or desc"),
    page: int = Query(1, ge=1),
    page_size: int = Query(50, ge=1, le=200),
    db: Session = Depends(get_db),
) -> ListVideosResponse:
    """Return one page of videos together with the people identified in each.

    The date filters arrive as strings and are parsed with
    ``date.fromisoformat``; the sort field and direction are checked against
    the accepted values before the service layer is queried.

    Raises:
        HTTPException: 400 when ``date_from``/``date_to`` is not an ISO date,
            or when ``sort_by``/``sort_dir`` is not an accepted value.
    """
    # --- Date filters: text -> date (left as None when not supplied) ---
    range_start: Optional[date] = None
    if date_from:
        try:
            range_start = date.fromisoformat(date_from)
        except ValueError:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Invalid date_from format: {date_from}. Use YYYY-MM-DD",
            )

    range_end: Optional[date] = None
    if date_to:
        try:
            range_end = date.fromisoformat(date_to)
        except ValueError:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Invalid date_to format: {date_to}. Use YYYY-MM-DD",
            )

    # --- Sort parameters ---
    if sort_by not in ("filename", "date_taken", "date_added"):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid sort_by: {sort_by}. Must be filename, date_taken, or date_added",
        )
    if sort_dir not in ("asc", "desc"):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid sort_dir: {sort_dir}. Must be asc or desc",
        )

    # --- Fetch the requested page ---
    videos, total = list_videos_for_identification(
        db=db,
        folder_path=folder_path,
        date_from=range_start,
        date_to=range_end,
        has_people=has_people,
        person_name=person_name,
        sort_by=sort_by,
        sort_dir=sort_dir,
        page=page,
        page_size=page_size,
    )

    # --- Shape each row into a response item ---
    items = []
    for video in videos:
        # The linkage half of each (person, linkage) pair is not needed here.
        identified_people = [
            PersonInfo(
                id=person.id,
                first_name=person.first_name,
                last_name=person.last_name,
                middle_name=person.middle_name,
                maiden_name=person.maiden_name,
                date_of_birth=person.date_of_birth,
            )
            for person, _linkage in get_video_people(db, video.id)
        ]

        # A datetime-like value exposes .date(); reduce it to its date part.
        added_on = video.date_added
        if hasattr(added_on, "date"):
            added_on = added_on.date()

        items.append(
            VideoListItem(
                id=video.id,
                filename=video.filename,
                path=video.path,
                date_taken=video.date_taken,
                date_added=added_on,
                identified_people=identified_people,
                identified_people_count=len(identified_people),
            )
        )

    return ListVideosResponse(items=items, page=page, page_size=page_size, total=total)
@router.get("/{video_id}/people", response_model=VideoPeopleResponse)
def get_video_people_endpoint(
    video_id: int,
    db: Session = Depends(get_db),
) -> VideoPeopleResponse:
    """Return every person identified in a video, with who identified them.

    Args:
        video_id: Primary key of the ``Photo`` row; it must have
            ``media_type == "video"``.
        db: Database session injected by FastAPI.

    Returns:
        The video id and one ``VideoPersonInfo`` per identified person.
        ``identified_by`` is ``None`` when the linkage has no user id or the
        user row no longer exists.

    Raises:
        HTTPException: 404 when no video with this id exists.
    """
    # NOTE(review): unlike the list/identify/remove routes in this module, this
    # route declares no get_current_user_with_id dependency - confirm that is
    # intentional.
    video = (
        db.query(Photo)
        .filter(Photo.id == video_id, Photo.media_type == "video")
        .first()
    )
    if not video:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Video {video_id} not found",
        )

    # Materialise once: the rows are walked twice below.
    people_data = list(get_video_people(db, video_id))

    # Resolve all identifying users in a single query instead of issuing one
    # query per linkage row.
    identifier_ids = {
        linkage.identified_by_user_id
        for _person, linkage in people_data
        if linkage.identified_by_user_id
    }
    usernames_by_id = {}
    if identifier_ids:
        matching_users = (
            db.query(User).filter(User.id.in_(list(identifier_ids))).all()
        )
        usernames_by_id = {user.id: user.username for user in matching_users}

    people = [
        VideoPersonInfo(
            person_id=person.id,
            first_name=person.first_name,
            last_name=person.last_name,
            middle_name=person.middle_name,
            maiden_name=person.maiden_name,
            date_of_birth=person.date_of_birth,
            # Missing user id or deleted user both resolve to None.
            identified_by=usernames_by_id.get(linkage.identified_by_user_id),
            identified_date=linkage.created_date,
        )
        for person, linkage in people_data
    ]
    return VideoPeopleResponse(video_id=video_id, people=people)
@router.post("/{video_id}/identify", response_model=IdentifyVideoResponse)
def identify_person_in_video_endpoint(
    video_id: int,
    request: IdentifyVideoRequest,
    current_user: Annotated[dict, Depends(get_current_user_with_id)],
    db: Session = Depends(get_db),
) -> IdentifyVideoResponse:
    """Link a person to a video, creating the person when the service does so.

    Args:
        video_id: Id of the video to tag.
        request: Either an existing ``person_id`` or the name/date fields that
            are forwarded to ``identify_person_in_video``.
        current_user: Authenticated user; its ``"id"`` entry is recorded as
            the identifying user.
        db: Database session injected by FastAPI.

    Returns:
        The linked person's id, whether a new person was created, and a
        human-readable message.

    Raises:
        HTTPException: 400 carrying the message of any ``ValueError`` raised
            by the service call.
    """
    user_id = current_user.get("id")

    # Only the service call is guarded: a ValueError raised while building the
    # response (e.g. a schema validation error) is a server fault and must not
    # be reported to the client as a bad request.
    try:
        person, created_person = identify_person_in_video(
            db=db,
            video_id=video_id,
            person_id=request.person_id,
            first_name=request.first_name,
            last_name=request.last_name,
            middle_name=request.middle_name,
            maiden_name=request.maiden_name,
            date_of_birth=request.date_of_birth,
            user_id=user_id,
        )
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(exc),
        ) from exc

    full_name = f"{person.first_name} {person.last_name}"
    if created_person:
        message = f"Created new person '{full_name}' and identified in video"
    else:
        message = f"Person '{full_name}' identified in video"

    return IdentifyVideoResponse(
        video_id=video_id,
        person_id=person.id,
        created_person=created_person,
        message=message,
    )
@router.delete("/{video_id}/people/{person_id}", response_model=RemoveVideoPersonResponse)
def remove_person_from_video_endpoint(
    video_id: int,
    person_id: int,
    current_user: Annotated[dict, Depends(get_current_user_with_id)],
    db: Session = Depends(get_db),
) -> RemoveVideoPersonResponse:
    """Detach a person from a video and report whether a link was removed.

    The response is returned in both outcomes; ``removed`` and ``message``
    distinguish an actual removal from "no such link".

    Raises:
        HTTPException: 400 carrying the message of any ``ValueError`` raised
            inside the guarded section.
    """
    try:
        was_linked = remove_person_from_video(
            db=db,
            video_id=video_id,
            person_id=person_id,
        )

        # Nothing to remove: still a successful response, flagged accordingly.
        if not was_linked:
            return RemoveVideoPersonResponse(
                video_id=video_id,
                person_id=person_id,
                removed=False,
                message=f"Person {person_id} not found in video {video_id}",
            )

        return RemoveVideoPersonResponse(
            video_id=video_id,
            person_id=person_id,
            removed=True,
            message=f"Person {person_id} removed from video {video_id}",
        )
    except ValueError as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(e),
        )
@router.get("/{video_id}/thumbnail")
def get_video_thumbnail(
    video_id: int,
    db: Session = Depends(get_db),
) -> FileResponse:
    """Serve the JPEG thumbnail of a video.

    The thumbnail path is obtained from ``get_video_thumbnail_path``; the
    response carries a one-day public cache header.

    Raises:
        HTTPException: 404 when the video id is unknown; 500 when no
            thumbnail path is returned or the file does not exist.
    """
    video_query = db.query(Photo).filter(
        Photo.id == video_id,
        Photo.media_type == "video",
    )
    video = video_query.first()
    if not video:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Video {video_id} not found",
        )

    thumb = get_video_thumbnail_path(video.path)
    # Usable only if a path came back AND the file is actually on disk.
    thumb_available = bool(thumb) and thumb.exists()
    if not thumb_available:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to generate video thumbnail",
        )

    thumbnail_response = FileResponse(str(thumb), media_type="image/jpeg")
    # 86400 seconds = 1 day.
    thumbnail_response.headers["Cache-Control"] = "public, max-age=86400"
    return thumbnail_response
# Read size used when streaming a partial (206) response: 64 KiB.
_RANGE_CHUNK_SIZE = 64 * 1024


def _parse_byte_range(range_header: str, file_size: int) -> Optional[tuple[int, int]]:
    """Parse a single-range ``Range`` header into inclusive byte offsets.

    Supported forms are ``bytes=START-END``, ``bytes=START-`` and the suffix
    form ``bytes=-N`` (the last ``N`` bytes).

    Returns:
        ``(start, end)`` with ``end`` clamped to ``file_size - 1``, or ``None``
        when the header is malformed, uses another unit, or asks for several
        ranges - in which case the caller should ignore it and serve the whole
        file. A result with ``start > end`` means the range cannot be
        satisfied.
    """
    unit, separator, spec = range_header.partition("=")
    if not separator or unit.strip().lower() != "bytes":
        return None

    # Multi-range requests would need a multipart/byteranges body; ignore them.
    if "," in spec:
        return None

    first, dash, last = spec.strip().partition("-")
    if not dash:
        return None
    first, last = first.strip(), last.strip()

    # Accept plain ASCII digits only (no signs, no underscores).
    for token in (first, last):
        if token and not (token.isascii() and token.isdigit()):
            return None

    last_byte = file_size - 1
    if not first:
        if not last:
            return None
        # Suffix form "bytes=-N": the final N bytes of the file.
        return max(file_size - int(last), 0), last_byte

    start = int(first)
    end = min(int(last), last_byte) if last else last_byte
    return start, end


def _iter_file_range(path: str, start: int, length: int):
    """Yield up to ``length`` bytes of ``path`` starting at offset ``start``.

    Stops early if the file ends before ``length`` bytes have been read.
    """
    with open(path, "rb") as stream:
        stream.seek(start)
        remaining = length
        while remaining > 0:
            chunk = stream.read(min(_RANGE_CHUNK_SIZE, remaining))
            if not chunk:
                break
            yield chunk
            remaining -= len(chunk)


@router.get("/{video_id}/video")
def get_video_file(
    video_id: int,
    request: Request,
    db: Session = Depends(get_db),
):
    """Serve a video file for playback, honouring single-range requests.

    Args:
        video_id: Primary key of the ``Photo`` row; it must have
            ``media_type == "video"``.
        request: Incoming request, inspected for a ``Range`` header.
        db: Database session injected by FastAPI.

    Returns:
        * 206 ``StreamingResponse`` with the requested bytes for a usable
          ``Range`` header;
        * 416 ``Response`` when the range starts beyond the end of the file;
        * otherwise a ``FileResponse`` with the whole file.

    Raises:
        HTTPException: 404 when the video id is unknown or the file is missing
            on disk.
    """
    import mimetypes
    import os

    video = (
        db.query(Photo)
        .filter(Photo.id == video_id, Photo.media_type == "video")
        .first()
    )
    if not video:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Video {video_id} not found",
        )

    # Copy the path into a plain string now: the streaming generator runs after
    # this handler has returned and must not touch the ORM object by then.
    video_path = video.path

    if not os.path.isfile(video_path):
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Video file not found: {video_path}",
        )

    # Guess the MIME type from the extension; default to MP4 for anything that
    # is not recognised as a video type.
    media_type, _ = mimetypes.guess_type(video_path)
    if not media_type or not media_type.startswith("video/"):
        media_type = "video/mp4"

    file_size = os.path.getsize(video_path)

    # Header lookup on request.headers is case-insensitive.
    range_header = request.headers.get("range")
    byte_range = _parse_byte_range(range_header, file_size) if range_header else None

    if byte_range is not None:
        start, end = byte_range
        if start > end:
            # Covers a start offset at or beyond the end of the file.
            return Response(
                status_code=416,
                headers={"Content-Range": f"bytes */{file_size}"},
            )

        content_length = end - start + 1
        return StreamingResponse(
            _iter_file_range(video_path, start, content_length),
            status_code=206,
            headers={
                "Content-Range": f"bytes {start}-{end}/{file_size}",
                "Accept-Ranges": "bytes",
                "Content-Length": str(content_length),
                "Content-Disposition": "inline",
                "Cache-Control": "public, max-age=3600",
            },
            media_type=media_type,
        )

    # No usable Range header: send the whole file, advertising range support.
    response = FileResponse(
        video_path,
        media_type=media_type,
    )
    response.headers["Content-Disposition"] = "inline"
    response.headers["Accept-Ranges"] = "bytes"
    response.headers["Cache-Control"] = "public, max-age=3600"
    return response