punimtag/backend/api/photos.py
tanyar09 7a981b069a
All checks were successful
CI / skip-ci-check (pull_request) Successful in 8s
CI / lint-and-type-check (pull_request) Successful in 1m12s
CI / python-lint (pull_request) Successful in 36s
CI / test-backend (pull_request) Successful in 3m47s
CI / build (pull_request) Successful in 3m28s
CI / secret-scanning (pull_request) Successful in 14s
CI / dependency-scan (pull_request) Successful in 13s
CI / sast-scan (pull_request) Successful in 1m33s
CI / workflow-summary (pull_request) Successful in 5s
feat: Enhance logging and error handling for job streaming and photo uploads
- Added new logging scripts for quick access to service logs and troubleshooting.
- Updated job streaming API to support authentication via query parameters for EventSource.
- Improved photo upload process to capture and validate EXIF dates and original modification times.
- Enhanced error handling for file uploads and EXIF extraction failures.
- Introduced new configuration options in ecosystem.config.js to prevent infinite crash loops.
2026-02-04 19:30:05 +00:00

1345 lines
52 KiB
Python

"""Photo management endpoints."""
from __future__ import annotations
from datetime import date, datetime
from typing import List, Optional
from fastapi import APIRouter, Depends, File, HTTPException, Query, Request, UploadFile, status, Request
from fastapi.responses import JSONResponse, FileResponse, Response
from typing import Annotated
from rq import Queue
from redis import Redis
from sqlalchemy.orm import Session
from backend.db.session import get_db
from backend.api.auth import get_current_user
from backend.api.users import get_current_admin_user
# Redis connection for RQ.
# NOTE(review): host/port/db are hard-coded to a local Redis instance and the
# connection object is created at import time - confirm this is intended for
# every deployment environment.
redis_conn = Redis(host="localhost", port=6379, db=0, decode_responses=False)
# RQ queue bound to the connection above; import_photos enqueues its
# background job on it.
queue = Queue(connection=redis_conn)
from backend.schemas.photos import (
PhotoImportRequest,
PhotoImportResponse,
PhotoResponse,
BulkAddFavoritesRequest,
BulkAddFavoritesResponse,
BulkDeletePhotosRequest,
BulkDeletePhotosResponse,
BulkRemoveFavoritesRequest,
BulkRemoveFavoritesResponse,
BrowseDirectoryResponse,
DirectoryItem,
)
from backend.schemas.search import (
PhotoSearchResult,
SearchPhotosResponse,
)
from backend.services.photo_service import (
find_photos_in_folder,
import_photo_from_path,
)
from backend.services.search_service import (
get_favorite_photos,
get_photo_face_count,
get_photo_person,
get_photo_tags,
get_photos_without_faces,
get_photos_without_tags,
get_processed_photos,
get_unprocessed_photos,
search_photos_by_date,
search_photos_by_name,
search_photos_by_tags,
)
# Note: RQ task functions are passed to queue.enqueue() as dotted string paths
# (see import_photos) instead of being imported here, to avoid RQ
# serialization issues.
# Every route declared on this router is served under the "/photos" prefix.
router = APIRouter(prefix="/photos", tags=["photos"])
# Sentinel meaning "look the person name up in the database".  ``None`` cannot
# be used for this because it is a legitimate person_name value.
_LOOKUP_PERSON = object()


def _parse_iso_date(value: Optional[str], field_name: str) -> Optional[date]:
    """Parse an optional ``YYYY-MM-DD`` query value.

    Args:
        value: Raw query-string value, or ``None``/empty when not supplied.
        field_name: Name of the query parameter, used in the error message.

    Returns:
        The parsed date, or ``None`` when no value was supplied.

    Raises:
        HTTPException: 400 when the value is not a valid ISO date (instead of
            letting the ``ValueError`` surface as a 500).
    """
    if not value:
        return None
    try:
        return date.fromisoformat(value)
    except ValueError:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid {field_name}: expected YYYY-MM-DD",
        ) from None


def _build_search_result(
    db: Session,
    photo,
    *,
    is_favorite: bool,
    person_name=_LOOKUP_PERSON,
    with_tags: bool = True,
    with_faces: bool = True,
) -> PhotoSearchResult:
    """Convert one photo row into a ``PhotoSearchResult``.

    Args:
        db: Database session used for the per-photo lookups.
        photo: Photo row returned by one of the search service functions.
        is_favorite: Whether the current user has favorited this photo.
        person_name: Person name to report; when left at the sentinel default
            it is resolved with ``get_photo_person``.
        with_tags: When ``False`` the tag lookup is skipped and ``[]`` is used.
        with_faces: When ``False`` the face-count lookup is skipped and the
            photo is reported as having no faces.
    """
    tags = get_photo_tags(db, photo.id) if with_tags else []
    face_count = get_photo_face_count(db, photo.id) if with_faces else 0
    if person_name is _LOOKUP_PERSON:
        person_name = get_photo_person(db, photo.id)
    # date_added may be stored as a datetime; the response schema gets a date.
    date_added = photo.date_added.date() if isinstance(photo.date_added, datetime) else photo.date_added
    return PhotoSearchResult(
        id=photo.id,
        path=photo.path,
        filename=photo.filename,
        date_taken=photo.date_taken,
        date_added=date_added,
        processed=photo.processed,
        media_type=photo.media_type or "image",
        person_name=person_name,
        tags=tags,
        has_faces=face_count > 0,
        face_count=face_count,
        is_favorite=is_favorite,
    )


@router.get("", response_model=SearchPhotosResponse)
def search_photos(
    current_user: Annotated[dict, Depends(get_current_user)],
    search_type: str = Query("name", description="Search type: name, date, tags, no_faces, no_tags, processed, unprocessed, favorites"),
    person_name: Optional[str] = Query(None, description="Person name for name search"),
    tag_names: Optional[str] = Query(None, description="Comma-separated tag names for tag search"),
    match_all: bool = Query(False, description="Match all tags (for tag search)"),
    date_from: Optional[str] = Query(None, description="Date from (YYYY-MM-DD)"),
    date_to: Optional[str] = Query(None, description="Date to (YYYY-MM-DD)"),
    folder_path: Optional[str] = Query(None, description="Filter by folder path"),
    media_type: Optional[str] = Query(None, description="Filter by media type: 'all', 'image', or 'video'"),
    page: int = Query(1, ge=1),
    page_size: int = Query(50, ge=1, le=200),
    db: Session = Depends(get_db),
) -> SearchPhotosResponse:
    """Search photos with filters.

    Matches desktop search functionality exactly:
    - Search by name: person_name required
    - Search by date: date_from or date_to required
    - Search by tags: tag_names required (comma-separated)
    - Search no faces: returns photos without faces
    - Search no tags: returns photos without tags
    - Search processed: returns photos that have been processed for face detection
    - Search unprocessed: returns photos that have not been processed for face detection
    - Search favorites: returns photos favorited by current user

    Raises:
        HTTPException: 400 for an unknown search_type, a missing required
            parameter or a malformed date; 401 for a favorites search without
            a username.
    """
    from backend.db.models import PhotoFavorite

    username = current_user["username"] if current_user else None

    # Date and tag filters double as additional filters for the other search types.
    df = _parse_iso_date(date_from, "date_from")
    dt = _parse_iso_date(date_to, "date_to")
    tag_list = None
    if tag_names:
        tag_list = [t.strip() for t in tag_names.split(",") if t.strip()]

    # Keyword overrides passed to _build_search_result for every row of the
    # selected search type.
    extra: dict = {}

    if search_type == "name":
        if not person_name:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="person_name is required for name search",
            )
        results, total = search_photos_by_name(
            db, person_name, folder_path, media_type, df, dt, tag_list, match_all, page, page_size
        )
    elif search_type == "date":
        if not date_from and not date_to:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="At least one of date_from or date_to is required",
            )
        results, total = search_photos_by_date(db, df, dt, folder_path, media_type, tag_list, match_all, page, page_size)
    elif search_type == "tags":
        if not tag_names:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="tag_names is required for tag search",
            )
        if not tag_list:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="At least one tag name is required",
            )
        results, total = search_photos_by_tags(
            db, tag_list, match_all, folder_path, media_type, df, dt, page, page_size
        )
    elif search_type == "no_faces":
        results, total = get_photos_without_faces(db, folder_path, media_type, df, dt, page, page_size)
        # By definition these photos have no faces, so skip the face/person lookups.
        extra = {"person_name": None, "with_faces": False}
    elif search_type == "no_tags":
        results, total = get_photos_without_tags(db, folder_path, media_type, df, dt, page, page_size)
        # By definition these photos have no tags, so skip the tag lookup.
        extra = {"with_tags": False}
    elif search_type == "processed":
        results, total = get_processed_photos(db, folder_path, media_type, df, dt, page, page_size)
    elif search_type == "unprocessed":
        results, total = get_unprocessed_photos(db, folder_path, media_type, df, dt, page, page_size)
    elif search_type == "favorites":
        if not username:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Authentication required for favorites search",
            )
        results, total = get_favorite_photos(db, username, folder_path, media_type, df, dt, page, page_size)
    else:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid search_type: {search_type}",
        )

    # Normalise to (photo, overrides) pairs; the name search yields
    # (photo, full_name) tuples, every other search yields bare photos.
    if search_type == "name":
        matches = [(photo, {"person_name": full_name}) for photo, full_name in results]
    else:
        matches = [(photo, extra) for photo in results]

    # Resolve favorite status for the whole page with a single query instead
    # of one query per photo.  A favorites search needs no lookup at all.
    favorite_ids = set()
    all_favorites = search_type == "favorites"
    photo_ids = [photo.id for photo, _ in matches]
    if not all_favorites and username and photo_ids:
        favorite_rows = db.query(PhotoFavorite.photo_id).filter(
            PhotoFavorite.username == username,
            PhotoFavorite.photo_id.in_(photo_ids),
        ).all()
        favorite_ids = {row[0] for row in favorite_rows}

    items: List[PhotoSearchResult] = [
        _build_search_result(
            db,
            photo,
            is_favorite=all_favorites or photo.id in favorite_ids,
            **overrides,
        )
        for photo, overrides in matches
    ]
    return SearchPhotosResponse(items=items, page=page, page_size=page_size, total=total)
@router.post("/import", response_model=PhotoImportResponse)
def import_photos(
    request: PhotoImportRequest,
    current_user: Annotated[dict, Depends(get_current_user)],
) -> PhotoImportResponse:
    """Queue a background import of the photos found under a folder.

    The folder is validated and scanned once up front to produce an estimate;
    the actual import runs later in an RQ job.

    Raises:
        HTTPException: 400 when folder_path is empty or is not an existing
            directory.
    """
    import os

    folder = request.folder_path
    if not folder:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="folder_path is required",
        )
    if not os.path.isdir(folder):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Folder not found: {folder}",
        )

    # Quick scan so the client can show an estimated photo count.
    candidates = find_photos_in_folder(folder, request.recursive)

    # The task is named by its dotted path so RQ does not have to serialize
    # the function object.
    queued_job = queue.enqueue(
        "backend.services.tasks.import_photos_task",
        folder,
        request.recursive,
        job_timeout="1h",  # large imports may run for up to an hour
    )

    return PhotoImportResponse(
        job_id=queued_job.id,
        message=f"Photo import job queued for {folder}",
        folder_path=folder,
        estimated_photos=len(candidates),
    )
def _is_plausible_photo_date(value: date) -> bool:
    """Return True when *value* lies between 1900-01-01 and today (inclusive)."""
    return date(1900, 1, 1) <= value <= date.today()


def _parse_browser_exif_date(exif_date_str: str, filename, logger) -> Optional[date]:
    """Turn the EXIF date string sent by the browser into a validated date.

    The canonical EXIF layout ("YYYY:MM:DD HH:MM:SS") is parsed explicitly;
    anything else (e.g. ISO format) is handed to ``dateutil``.

    Returns:
        The parsed date, or ``None`` when the string cannot be parsed or the
        date is in the future / before 1900.
    """
    try:
        try:
            exif_datetime = datetime.strptime(exif_date_str.strip(), "%Y:%m:%d %H:%M:%S")
        except ValueError:
            from dateutil import parser
            exif_datetime = parser.parse(exif_date_str)
        exif_date = exif_datetime.date()
    except Exception as e:
        logger.warning(f"[UPLOAD] Could not parse browser EXIF date '{exif_date_str}' for {filename}: {e}, trying original mtime")
        return None
    if not _is_plausible_photo_date(exif_date):
        logger.warning(f"[UPLOAD] Browser EXIF date {exif_date} is invalid for {filename}, trying original mtime")
        return None
    logger.info(f"[UPLOAD] Parsed browser EXIF date: {exif_date} for {filename}")
    return exif_date


def _parse_original_mtime(timestamp_ms: int, filename, logger) -> Optional[date]:
    """Turn a millisecond timestamp sent by the browser into a validated date.

    Returns:
        The modification date, or ``None`` when the timestamp is out of range
        or the date is in the future / before 1900.
    """
    try:
        modified = datetime.fromtimestamp(timestamp_ms / 1000.0).date()
    except (ValueError, OSError, OverflowError) as e:
        # OverflowError is raised for timestamps outside the platform's
        # time_t range; the value is client-supplied so it must not abort
        # the upload of this file.
        logger.warning(f"[UPLOAD] Could not parse original mtime timestamp {timestamp_ms} for {filename}: {e}")
        return None
    if not _is_plausible_photo_date(modified):
        logger.warning(f"[UPLOAD] Original file mtime {modified} is invalid for {filename}")
        return None
    logger.info(f"[UPLOAD] Parsed original mtime: {modified} for {filename}")
    return modified


@router.post("/import/upload")
async def upload_photos(
    request: Request,
    db: Session = Depends(get_db),
) -> dict:
    """Upload photo files directly.

    This endpoint accepts file uploads and imports them immediately.
    Files are saved to PHOTO_STORAGE_DIR before import.
    For large batches, prefer the /import endpoint with folder_path.

    The multipart form is expected to contain:
    - ``files``: one or more uploaded files
    - ``file_exif_date_<filename>``: optional EXIF date extracted in the browser
    - ``file_original_mtime_<filename>``: optional original modification time
      in milliseconds, captured in the browser before upload

    Returns:
        dict with ``message``, ``added``, ``existing`` and ``errors`` keys.
        Per-file failures are collected in ``errors`` rather than raised.

    Raises:
        HTTPException: 400 when the form contains no uploaded files.
    """
    import logging
    import os
    import uuid
    from pathlib import Path
    from backend.settings import PHOTO_STORAGE_DIR

    logger = logging.getLogger(__name__)

    # Ensure storage directory exists
    storage_dir = Path(PHOTO_STORAGE_DIR)
    storage_dir.mkdir(parents=True, exist_ok=True)

    added_count = 0
    existing_count = 0
    errors = []

    form_data = await request.form()

    # getlist handles multiple files posted under the same key.  Plain text
    # fields named "files" have no filename and cannot be imported, so they
    # are ignored.  (The parsed form is cached on the request, so re-reading
    # it cannot yield additional files.)
    files = [entry for entry in form_data.getlist('files') if hasattr(entry, 'filename')]

    # Metadata captured from the ORIGINAL file in the browser BEFORE upload,
    # keyed by the client-side filename.
    exif_prefix = 'file_exif_date_'
    mtime_prefix = 'file_original_mtime_'
    file_exif_dates = {}
    file_original_mtime = {}
    for key, value in form_data.items():
        if key.startswith(exif_prefix):
            # Slice off the prefix only; str.replace would also strip the
            # same text if it occurred inside the filename itself.
            file_exif_dates[key[len(exif_prefix):]] = str(value)
        elif key.startswith(mtime_prefix):
            filename = key[len(mtime_prefix):]
            try:
                file_original_mtime[filename] = int(value)
            except (ValueError, TypeError) as e:
                logger.debug(f"Could not parse original mtime for {filename}: {e}")

    if not files:
        raise HTTPException(status_code=400, detail="No files provided")

    for file in files:
        try:
            # Generate unique filename to avoid conflicts
            file_ext = Path(file.filename).suffix
            stored_path = storage_dir / f"{uuid.uuid4()}{file_ext}"

            # Save uploaded file to storage
            content = await file.read()
            with open(stored_path, "wb") as f:
                f.write(content)

            # Date metadata priority:
            # 1) EXIF date extracted in the browser, 2) original file mtime.
            # Both come from the original file, not the server's copy.
            browser_exif_date = None
            file_last_modified = None
            if file.filename in file_exif_dates:
                exif_date_str = file_exif_dates[file.filename]
                logger.info(f"[UPLOAD] Found browser EXIF date for {file.filename}: {exif_date_str}")
                browser_exif_date = _parse_browser_exif_date(exif_date_str, file.filename, logger)
            else:
                logger.debug(f"[UPLOAD] No browser EXIF date found for {file.filename}")

            if file.filename in file_original_mtime:
                timestamp_ms = file_original_mtime[file.filename]
                logger.info(f"[UPLOAD] Found original mtime for {file.filename}: {timestamp_ms}")
                file_last_modified = _parse_original_mtime(timestamp_ms, file.filename, logger)
            else:
                logger.debug(f"[UPLOAD] No original mtime found for {file.filename}")

            logger.info(f"[UPLOAD] Calling import_photo_from_path for {file.filename} with browser_exif_date={browser_exif_date}, file_last_modified={file_last_modified}")

            # Import photo from stored location.
            # Priority: browser_exif_date > server EXIF extraction > file_last_modified
            photo, is_new = import_photo_from_path(
                db,
                str(stored_path),
                is_uploaded_file=True,
                file_last_modified=file_last_modified,
                browser_exif_date=browser_exif_date,
            )
            if is_new:
                added_count += 1
            else:
                existing_count += 1
                # If photo already exists, delete duplicate upload
                if os.path.exists(stored_path):
                    os.remove(stored_path)
        except Exception as e:
            # NOTE(review): when the import itself fails, the stored copy is
            # left on disk - confirm whether it should be cleaned up here.
            errors.append(f"Error uploading {file.filename}: {str(e)}")

    return {
        "message": f"Uploaded {len(files)} files",
        "added": added_count,
        "existing": existing_count,
        "errors": errors,
    }
@router.get("/browse-directory", response_model=BrowseDirectoryResponse)
def browse_directory(
    current_user: Annotated[dict, Depends(get_current_user)],
    path: str = Query("/", description="Directory path to list"),
) -> BrowseDirectoryResponse:
    """Return the contents of a server-side directory for the web folder browser.

    The listing is produced with os.listdir(), so no GUI is needed.

    Args:
        path: Directory to list; relative paths are made absolute.

    Returns:
        BrowseDirectoryResponse holding the normalized current path, its
        parent (None at the filesystem root) and the entries, directories
        first, each group sorted case-insensitively by name.

    Raises:
        HTTPException: 404 if the path does not exist, 400 if it is not a
            directory, 403 if the directory cannot be read, 500 for any other
            error.
    """
    import os

    try:
        target = os.path.normpath(os.path.abspath(path))

        # NOTE: no base-path allow-list is enforced here; any path readable
        # by the server process can be listed.

        if not os.path.exists(target):
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Path does not exist: {target}",
            )
        if not os.path.isdir(target):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Path is not a directory: {target}",
            )

        entries = []
        try:
            for entry_name in os.listdir(target):
                entry_path = os.path.abspath(os.path.join(target, entry_name))
                try:
                    entry_is_dir = os.path.isdir(entry_path)
                    entry_is_file = os.path.isfile(entry_path)
                except (OSError, PermissionError):
                    # Entries that cannot be inspected are left out.
                    continue
                entries.append(
                    DirectoryItem(
                        name=entry_name,
                        path=entry_path,
                        is_directory=entry_is_dir,
                        is_file=entry_is_file,
                    )
                )
        except PermissionError:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Permission denied reading directory: {target}",
            )

        # False sorts before True, so directories come first.
        entries = sorted(entries, key=lambda entry: (not entry.is_directory, entry.name.lower()))

        # The root directory is its own dirname and therefore has no parent.
        containing_dir = os.path.dirname(target)
        if target != "/" and target != containing_dir:
            parent = os.path.normpath(containing_dir)
        else:
            parent = None

        return BrowseDirectoryResponse(
            current_path=target,
            parent_path=parent,
            items=entries,
        )
    except HTTPException:
        # Already a proper HTTP error; pass it through untouched.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error reading directory: {str(e)}",
        )
@router.post("/browse-folder")
def browse_folder() -> dict:
    """Open native folder picker dialog and return selected folder path.

    Uses tkinter to show a native OS folder picker dialog on the machine the
    server runs on, and returns the full absolute path of the selected folder.

    Returns:
        dict with 'path' (str), 'success' (bool) and 'message' (str) keys.
        When the dialog is cancelled, 'path' is "" and 'success' is False.

    Raises:
        HTTPException: 503 if tkinter or a display is unavailable, 400 if the
            selected path does not exist or is not a directory, 500 for any
            other error while showing the dialog.
    """
    import os

    try:
        import tkinter as tk
        from tkinter import filedialog
    except ImportError:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="tkinter is not available. Cannot show folder picker.",
        )

    try:
        # Create root window (hidden)
        root = tk.Tk()
        try:
            root.withdraw()  # Hide main window
            root.attributes('-topmost', True)  # Bring to front
            folder_path = filedialog.askdirectory(
                title="Select folder to scan",
                mustexist=True
            )
        finally:
            # Always release the Tk root, even when the dialog raises.
            root.destroy()

        if not folder_path:
            return {
                "path": "",
                "success": False,
                "message": "No folder selected"
            }

        # UNC paths (\\server\share or //server/share) are already absolute;
        # only their separators are normalized because realpath may not work
        # correctly with them.
        is_unc_path = folder_path.startswith('\\\\') or folder_path.startswith('//')
        if is_unc_path:
            normalized_path = os.path.normpath(folder_path)
        else:
            # Local paths or mounted shares: make absolute, resolve symlinks,
            # then normalize.
            abs_path = os.path.abspath(folder_path)
            try:
                real_path = os.path.realpath(abs_path)
            except (OSError, ValueError):
                # realpath can fail for some network paths; keep abspath result
                real_path = abs_path
            normalized_path = os.path.normpath(real_path)

        # Ensure we have a full absolute path (not just folder name)
        if not os.path.isabs(normalized_path):
            normalized_path = os.path.abspath(normalized_path)

        # Verify the path exists and is a directory.  For network paths this
        # may fail if the network is temporarily down.
        if not os.path.exists(normalized_path):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Selected path does not exist: {normalized_path}",
            )
        if not os.path.isdir(normalized_path):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Selected path is not a directory: {normalized_path}",
            )

        return {
            "path": normalized_path,
            "success": True,
            "message": f"Selected folder: {normalized_path}"
        }
    except HTTPException:
        # HTTPException derives from Exception; without this clause the 400
        # responses above would be rewritten into 500/503 by the handler below.
        raise
    except Exception as e:
        error_msg = str(e)
        # Check for common issues (display/headless server)
        if "display" in error_msg.lower() or "DISPLAY" not in os.environ:
            raise HTTPException(
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                detail="No display available. Cannot show folder picker. "
                "If running on a remote server, ensure X11 forwarding is enabled.",
            )
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error showing folder picker: {error_msg}",
        )
@router.get("/{photo_id}", response_model=PhotoResponse)
def get_photo(photo_id: int, db: Session = Depends(get_db)) -> PhotoResponse:
    """Return the photo with the given ID.

    Raises:
        HTTPException: 404 when no photo has that ID.
    """
    from backend.db.models import Photo

    record = db.query(Photo).filter(Photo.id == photo_id).first()
    if record:
        return PhotoResponse.model_validate(record)
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"Photo {photo_id} not found",
    )
def _parse_byte_range(range_header: str, file_size: int) -> Optional[tuple]:
    """Resolve a single-range HTTP ``Range`` header to inclusive byte offsets.

    Supported forms are "bytes=start-end", "bytes=start-" and the suffix form
    "bytes=-N" (the LAST N bytes of the file).

    Args:
        range_header: Raw value of the ``Range`` request header.
        file_size: Size of the file being served, in bytes.

    Returns:
        ``(start, end)`` clamped to the file, or ``None`` when the range
        cannot be satisfied for a file of this size.

    Raises:
        ValueError: When the header cannot be parsed (including multi-range
            requests); the caller then serves the full file.
    """
    parts = range_header.replace("bytes=", "").split("-")
    start_str = parts[0].strip()
    end_str = parts[1].strip() if len(parts) > 1 else ""

    if not start_str:
        if not end_str:
            raise ValueError("empty byte range")
        # Suffix range: the number counts bytes from the END of the file.
        suffix_length = int(end_str)
        if suffix_length <= 0 or file_size == 0:
            return None
        return max(file_size - suffix_length, 0), file_size - 1

    start = int(start_str)
    end = int(end_str) if end_str else file_size - 1
    end = min(end, file_size - 1)
    if start > end:
        return None
    return start, end


@router.get("/{photo_id}/image")
def get_photo_image(
    photo_id: int,
    request: Request,
    db: Session = Depends(get_db)
):
    """Serve photo image or video file for display (not download).

    Videos honour single-range ``Range`` requests (206 partial content) so
    browsers can seek; images and un-ranged video requests are returned whole.

    Raises:
        HTTPException: 404 when the photo record or its file does not exist.
    """
    import os
    import mimetypes
    from backend.db.models import Photo
    from starlette.responses import FileResponse

    photo = db.query(Photo).filter(Photo.id == photo_id).first()
    if not photo:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Photo {photo_id} not found",
        )
    if not os.path.exists(photo.path):
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Photo file not found: {photo.path}",
        )

    # If it's a video, handle range requests for video streaming
    if photo.media_type == "video":
        media_type, _ = mimetypes.guess_type(photo.path)
        if not media_type or not media_type.startswith('video/'):
            media_type = "video/mp4"
        file_size = os.path.getsize(photo.path)

        # Header lookup is case-insensitive in Starlette.
        range_header = request.headers.get("range")
        if range_header:
            try:
                byte_range = _parse_byte_range(range_header, file_size)
            except ValueError:
                # Unparseable range: fall through and serve the full file.
                pass
            else:
                if byte_range is None:
                    return Response(
                        status_code=416,
                        headers={"Content-Range": f"bytes */{file_size}"}
                    )
                start, end = byte_range
                # Read the requested chunk
                chunk_size = end - start + 1
                with open(photo.path, "rb") as f:
                    f.seek(start)
                    chunk = f.read(chunk_size)
                return Response(
                    content=chunk,
                    status_code=206,
                    headers={
                        "Content-Range": f"bytes {start}-{end}/{file_size}",
                        "Accept-Ranges": "bytes",
                        "Content-Length": str(chunk_size),
                        "Content-Type": media_type,
                        "Content-Disposition": "inline",
                        "Cache-Control": "public, max-age=3600",
                    },
                    media_type=media_type,
                )

        # No range request or parsing failed - serve full file with range support headers
        response = FileResponse(
            photo.path,
            media_type=media_type,
        )
        response.headers["Content-Disposition"] = "inline"
        response.headers["Accept-Ranges"] = "bytes"
        response.headers["Cache-Control"] = "public, max-age=3600"
        return response

    # Determine media type from file extension for images
    media_type, _ = mimetypes.guess_type(photo.path)
    if not media_type or not media_type.startswith('image/'):
        media_type = "image/jpeg"

    response = FileResponse(
        photo.path,
        media_type=media_type,
    )
    # Content-Disposition: inline makes the browser display instead of download
    response.headers["Content-Disposition"] = "inline"
    response.headers["Cache-Control"] = "public, max-age=3600"
    return response
@router.post("/{photo_id}/toggle-favorite")
def toggle_favorite(
    photo_id: int,
    current_user: Annotated[dict, Depends(get_current_user)],
    db: Session = Depends(get_db),
) -> dict:
    """Flip the current user's favorite flag for one photo.

    Returns:
        dict with 'photo_id', the new 'is_favorite' state and a 'message'.

    Raises:
        HTTPException: 404 when the photo does not exist.
    """
    from backend.db.models import Photo, PhotoFavorite

    username = current_user["username"]

    # The photo must exist before its favorite state can change.
    if not db.query(Photo).filter(Photo.id == photo_id).first():
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Photo {photo_id} not found",
        )

    current_favorite = db.query(PhotoFavorite).filter(
        PhotoFavorite.username == username,
        PhotoFavorite.photo_id == photo_id
    ).first()

    now_favorite = not current_favorite
    if now_favorite:
        db.add(PhotoFavorite(username=username, photo_id=photo_id))
    else:
        db.delete(current_favorite)
    db.commit()

    return {
        "photo_id": photo_id,
        "is_favorite": now_favorite,
        "message": "Favorite status updated"
    }
@router.get("/{photo_id}/is-favorite")
def check_favorite(
    photo_id: int,
    current_user: Annotated[dict, Depends(get_current_user)],
    db: Session = Depends(get_db),
) -> dict:
    """Report whether the current user has favorited the given photo.

    Returns:
        dict with 'photo_id' and a boolean 'is_favorite'.
    """
    from backend.db.models import PhotoFavorite

    favorite_query = db.query(PhotoFavorite).filter(
        PhotoFavorite.username == current_user["username"],
        PhotoFavorite.photo_id == photo_id
    )
    is_favorited = favorite_query.first() is not None
    return {
        "photo_id": photo_id,
        "is_favorite": is_favorited
    }
@router.post("/bulk-add-favorites", response_model=BulkAddFavoritesResponse)
def bulk_add_favorites(
    request: BulkAddFavoritesRequest,
    current_user: Annotated[dict, Depends(get_current_user)],
    db: Session = Depends(get_db),
) -> BulkAddFavoritesResponse:
    """Add multiple photos to favorites for current user.

    Only adds favorites for photos that aren't already favorites.
    Repeated IDs in the request are collapsed so a photo is added at most once.
    All rows are added and committed together.

    Raises:
        HTTPException: 400 when photo_ids is empty, 404 when any requested
            photo does not exist.
    """
    from backend.db.models import Photo, PhotoFavorite

    requested_ids = request.photo_ids
    if not requested_ids:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="photo_ids list cannot be empty",
        )
    # De-duplicate while keeping request order (same approach as bulk_delete_photos);
    # otherwise a repeated ID would create two favorite rows for one photo.
    photo_ids = list(dict.fromkeys(requested_ids))
    username = current_user["username"]

    # Verify all photos exist
    photos = db.query(Photo).filter(Photo.id.in_(photo_ids)).all()
    found_ids = {photo.id for photo in photos}
    missing_ids = set(photo_ids) - found_ids
    if missing_ids:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Photos not found: {sorted(missing_ids)}",
        )

    # Get existing favorites in a single query
    existing_favorites = db.query(PhotoFavorite).filter(
        PhotoFavorite.username == username,
        PhotoFavorite.photo_id.in_(photo_ids)
    ).all()
    existing_ids = {fav.photo_id for fav in existing_favorites}

    # Only add favorites for photos that aren't already favorites
    ids_to_add = [photo_id for photo_id in photo_ids if photo_id not in existing_ids]
    for photo_id in ids_to_add:
        db.add(PhotoFavorite(username=username, photo_id=photo_id))
    db.commit()

    added_count = len(ids_to_add)
    return BulkAddFavoritesResponse(
        message=f"Added {added_count} photo(s) to favorites",
        added_count=added_count,
        already_favorite_count=len(existing_ids),
        # Reported as sent by the client, duplicates included.
        total_requested=len(requested_ids),
    )
@router.post("/bulk-remove-favorites", response_model=BulkRemoveFavoritesResponse)
def bulk_remove_favorites(
    request: BulkRemoveFavoritesRequest,
    current_user: Annotated[dict, Depends(get_current_user)],
    db: Session = Depends(get_db),
) -> BulkRemoveFavoritesResponse:
    """Remove multiple photos from favorites for current user.

    Only removes favorites for photos that are currently favorites.
    Repeated IDs in the request are collapsed so the counts refer to distinct
    photos.  All deletions are committed together.

    Raises:
        HTTPException: 400 when photo_ids is empty, 404 when any requested
            photo does not exist.
    """
    from backend.db.models import Photo, PhotoFavorite

    requested_ids = request.photo_ids
    if not requested_ids:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="photo_ids list cannot be empty",
        )
    # De-duplicate while keeping request order (same approach as bulk_delete_photos);
    # otherwise repeated IDs inflate not_favorite_count.
    photo_ids = list(dict.fromkeys(requested_ids))
    username = current_user["username"]

    # Verify all photos exist
    photos = db.query(Photo).filter(Photo.id.in_(photo_ids)).all()
    found_ids = {photo.id for photo in photos}
    missing_ids = set(photo_ids) - found_ids
    if missing_ids:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Photos not found: {sorted(missing_ids)}",
        )

    # Get existing favorites in a single query.  Every row returned already
    # matches one of the requested IDs, so all of them are removed.
    existing_favorites = db.query(PhotoFavorite).filter(
        PhotoFavorite.username == username,
        PhotoFavorite.photo_id.in_(photo_ids)
    ).all()
    existing_ids = {fav.photo_id for fav in existing_favorites}

    for favorite in existing_favorites:
        db.delete(favorite)
    db.commit()

    removed_count = len(existing_favorites)
    return BulkRemoveFavoritesResponse(
        message=f"Removed {removed_count} photo(s) from favorites",
        removed_count=removed_count,
        not_favorite_count=len(photo_ids) - len(existing_ids),
        # Reported as sent by the client, duplicates included.
        total_requested=len(requested_ids),
    )
@router.post("/bulk-delete", response_model=BulkDeletePhotosResponse)
def bulk_delete_photos(
    request: BulkDeletePhotosRequest,
    current_admin: Annotated[dict, Depends(get_current_admin_user)],
    db: Session = Depends(get_db),
) -> BulkDeletePhotosResponse:
    """Delete multiple photos and all related data (faces, encodings, tags, favorites).

    If a photo's file sits directly in the uploads folder, the file is also
    removed from the filesystem to prevent duplicate uploads. Files in any
    other folder are never touched.

    Database rows are deleted and committed first; files are removed only
    after the commit succeeds, so a failed (rolled back) transaction never
    leaves rows pointing at files that were already deleted. A file that
    cannot be removed is logged and does not fail the request.

    Args:
        request: Body carrying ``photo_ids``; duplicates are ignored.
        current_admin: Authenticated admin mapping; ``username`` is echoed in
            the response message.
        db: Database session.

    Returns:
        The number of deleted photos, the requested ids that did not exist,
        and a human-readable summary message.

    Raises:
        HTTPException: 400 if ``photo_ids`` is empty, 500 if the database
            deletion fails (the transaction is rolled back).
    """
    import logging
    from pathlib import Path

    from backend.db.models import Photo, PhotoTagLinkage
    from backend.settings import PHOTO_STORAGE_DIR

    logger = logging.getLogger(__name__)

    # De-duplicate while preserving request order
    photo_ids = list(dict.fromkeys(request.photo_ids))
    if not photo_ids:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="photo_ids list cannot be empty",
        )

    # Get the uploads folder path for comparison
    uploads_dir = Path(PHOTO_STORAGE_DIR).resolve()

    # (photo id, resolved path) pairs to remove from disk once the commit
    # has succeeded. Ids are captured now because ORM attributes of deleted
    # instances are not reliably readable after commit.
    files_to_remove = []

    try:
        photos = db.query(Photo).filter(Photo.id.in_(photo_ids)).all()
        found_ids = {photo.id for photo in photos}
        missing_ids = sorted(set(photo_ids) - found_ids)

        for photo in photos:
            photo_path = Path(photo.path).resolve()
            # Strict check: only files whose parent directory is exactly the
            # uploads folder are eligible for filesystem deletion.
            if photo_path.parent == uploads_dir:
                files_to_remove.append((photo.id, photo_path))
            else:
                logger.info(
                    "Photo %s is not in uploads folder (path: %s, uploads: %s), skipping file deletion",
                    photo.id,
                    photo_path.parent,
                    uploads_dir,
                )

            # Remove tag linkages explicitly (in addition to cascade) to keep counts accurate
            db.query(PhotoTagLinkage).filter(
                PhotoTagLinkage.photo_id == photo.id
            ).delete(synchronize_session=False)
            db.delete(photo)

        deleted_count = len(photos)
        db.commit()
    except HTTPException:
        db.rollback()
        raise
    except Exception as exc:  # pragma: no cover - safety net
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to delete photos: {exc}",
        ) from exc

    # The rows are gone for good at this point; now remove the files.
    files_deleted_count = 0
    for photo_id, photo_path in files_to_remove:
        try:
            photo_path.unlink()
        except FileNotFoundError:
            logger.warning(
                "Photo file not found (already deleted?): %s (Photo ID: %s)",
                photo_path,
                photo_id,
            )
        except OSError as e:
            # Best effort: the database deletion stands even if the file
            # could not be removed.
            logger.error(
                "Failed to delete file %s (Photo ID: %s): %s",
                photo_path,
                photo_id,
                e,
            )
        else:
            files_deleted_count += 1
            logger.warning(
                "DELETED file from uploads folder: %s (Photo ID: %s)",
                photo_path,
                photo_id,
            )

    admin_username = current_admin.get("username", "unknown")
    message_parts = [f"Deleted {deleted_count} photo(s)"]
    if files_deleted_count > 0:
        message_parts.append(f"{files_deleted_count} file(s) removed from uploads folder")
    if missing_ids:
        message_parts.append(f"{len(missing_ids)} photo(s) not found")
    message_parts.append(f"Request by admin: {admin_username}")

    return BulkDeletePhotosResponse(
        message="; ".join(message_parts),
        deleted_count=deleted_count,
        missing_photo_ids=missing_ids,
    )
def _run_file_manager(command: list, timeout: float = 5) -> bool:
    """Run a file-manager command and report whether it exited with status 0.

    Returns False when the executable is not installed or when it does not
    exit within ``timeout`` seconds; any other error propagates to the caller.
    """
    import subprocess

    try:
        completed = subprocess.run(command, check=False, timeout=timeout)
    except (FileNotFoundError, subprocess.TimeoutExpired):
        return False
    return completed.returncode == 0


@router.post("/{photo_id}/open-folder")
def open_photo_folder(photo_id: int, db: Session = Depends(get_db)) -> dict:
    """Open the folder containing the photo in the system file manager and select the file.

    Matches desktop behavior and enhances it by selecting the specific file:
    - Windows: uses explorer /select,"file_path"
    - macOS: uses 'open -R' to reveal and select the file
    - Linux: tries the file manager of the current desktop environment, then
      every known file manager in turn, and finally ``xdg-open`` on the
      folder (which opens it without selecting the file)

    The optional ``showinfm`` package is tried first on every platform; the
    platform-specific commands are only a fallback.

    Args:
        photo_id: Id of the photo whose folder should be opened.
        db: Database session.

    Returns:
        A dict with ``message``, ``folder`` and ``file`` keys.

    Raises:
        HTTPException: 404 if the photo, its file or its folder does not
            exist; 500 if launching the file manager raises unexpectedly.
    """
    # NOTE(review): no auth dependency is declared on this route even though
    # it launches processes on the server host - confirm whether protection
    # is applied at router level or intentionally omitted.
    import logging
    import os
    import subprocess
    import sys
    from pathlib import Path

    from backend.db.models import Photo

    logger = logging.getLogger(__name__)

    photo = db.query(Photo).filter(Photo.id == photo_id).first()
    if not photo:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Photo {photo_id} not found",
        )

    # Ensure we have absolute path
    file_path = os.path.abspath(photo.path)
    folder = os.path.dirname(file_path)

    if not os.path.exists(file_path):
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Photo file not found: {file_path}",
        )
    if not os.path.exists(folder):
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Folder not found: {folder}",
        )

    success_response = {
        "message": f"Opened folder and selected file: {os.path.basename(file_path)}",
        "folder": folder,
        "file": file_path,
    }

    try:
        # Try using showinfm package first (better cross-platform support)
        try:
            from showinfm import show_in_file_manager

            show_in_file_manager(file_path)
            return success_response
        except ImportError:
            # showinfm not installed, fall back to manual commands
            pass
        except Exception as e:
            # showinfm failed, fall back to manual commands; keep a trace of
            # why instead of discarding the error silently.
            logger.debug("showinfm failed, falling back to manual commands: %s", e)

        # Fallback: Open folder and select the file using platform-specific commands
        if os.name == "nt":  # Windows
            # Windows: explorer /select,"file_path" opens folder and selects the file
            subprocess.run(["explorer", "/select,", file_path], check=False)
        elif sys.platform == "darwin":  # macOS
            # macOS: open -R reveals the file in Finder and selects it
            subprocess.run(["open", "-R", file_path], check=False)
        else:  # Linux and others
            # Nemo takes a file:// URI. Build it with as_uri() so characters
            # such as spaces, '#', '%' and '?' are percent-encoded rather
            # than misread as URI syntax.
            file_uri = Path(file_path).as_uri()

            # Detect desktop environment first
            desktop_env = os.environ.get("XDG_CURRENT_DESKTOP", "").lower()
            opened = False

            # Nautilus (GNOME) - also the first guess when the desktop is unknown
            if "gnome" in desktop_env or not desktop_env:
                opened = _run_file_manager(["nautilus", "--select", file_path])

            # Nemo (Cinnamon/MATE) - launched without waiting; it opens the
            # folder but selecting the file may not work (Nemo limitation).
            if not opened and ("cinnamon" in desktop_env or "mate" in desktop_env):
                try:
                    subprocess.Popen(
                        ["nemo", file_uri],
                        stdout=subprocess.DEVNULL,
                        stderr=subprocess.DEVNULL,
                    )
                    opened = True
                except FileNotFoundError:
                    pass

            # Dolphin (KDE)
            if not opened and "kde" in desktop_env:
                opened = _run_file_manager(["dolphin", "--select", file_path])

            # PCManFM (LXDE/LXQt)
            if not opened and ("lxde" in desktop_env or "lxqt" in desktop_env):
                opened = _run_file_manager(["pcmanfm", "--select", file_path])

            # Thunar (XFCE)
            if not opened and "xfce" in desktop_env:
                opened = _run_file_manager(["thunar", "--select", file_path])

            # If desktop-specific didn't work, try all file managers in order;
            # any() stops at the first command that succeeds.
            if not opened:
                fallback_commands = [
                    ["nautilus", "--select", file_path],
                    ["nemo", file_uri],
                    ["dolphin", "--select", file_path],
                    ["thunar", "--select", file_path],
                    ["pcmanfm", "--select", file_path],
                ]
                opened = any(_run_file_manager(cmd) for cmd in fallback_commands)

            # Fallback: try xdg-open with the folder (will open folder but won't select file)
            if not opened:
                subprocess.run(["xdg-open", folder], check=False)

        return success_response
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to open folder: {str(e)}",
        ) from e