punimtag/backend/api/photos.py
tanyar09 70923e0ecf
Some checks failed
CI / skip-ci-check (pull_request) Successful in 7s
CI / python-lint (pull_request) Has been cancelled
CI / test-backend (pull_request) Has been cancelled
CI / build (pull_request) Has been cancelled
CI / secret-scanning (pull_request) Has been cancelled
CI / dependency-scan (pull_request) Has been cancelled
CI / sast-scan (pull_request) Has been cancelled
CI / workflow-summary (pull_request) Has been cancelled
CI / lint-and-type-check (pull_request) Has been cancelled
feat: Enhance photo and video handling in admin frontend
- Added media_type to PhotoSearchResult interface to distinguish between images and videos.
- Updated PhotoViewer component to support video playback, including URL handling and preloading logic.
- Modified openPhoto function in Search page to open videos correctly.
- Enhanced backend API to serve video files with range request support for better streaming experience.

These changes improve the user experience by allowing seamless viewing of both images and videos in the application.
2026-01-28 17:45:45 +00:00

1102 lines
42 KiB
Python

"""Photo management endpoints."""
from __future__ import annotations
from datetime import date, datetime
from typing import List, Optional
from fastapi import APIRouter, Depends, File, HTTPException, Query, Request, UploadFile, status
from fastapi.responses import JSONResponse, FileResponse, Response
from typing import Annotated
from rq import Queue
from redis import Redis
from sqlalchemy.orm import Session
from backend.db.session import get_db
from backend.api.auth import get_current_user
from backend.api.users import get_current_admin_user
# Redis connection for RQ
redis_conn = Redis(host="localhost", port=6379, db=0, decode_responses=False)
queue = Queue(connection=redis_conn)
from backend.schemas.photos import (
PhotoImportRequest,
PhotoImportResponse,
PhotoResponse,
BulkAddFavoritesRequest,
BulkAddFavoritesResponse,
BulkDeletePhotosRequest,
BulkDeletePhotosResponse,
BulkRemoveFavoritesRequest,
BulkRemoveFavoritesResponse,
)
from backend.schemas.search import (
PhotoSearchResult,
SearchPhotosResponse,
)
from backend.services.photo_service import (
find_photos_in_folder,
import_photo_from_path,
)
from backend.services.search_service import (
get_favorite_photos,
get_photo_face_count,
get_photo_person,
get_photo_tags,
get_photos_without_faces,
get_photos_without_tags,
get_processed_photos,
get_unprocessed_photos,
search_photos_by_date,
search_photos_by_name,
search_photos_by_tags,
)
# Note: Function passed as string path to avoid RQ serialization issues
router = APIRouter(prefix="/photos", tags=["photos"])
@router.get("", response_model=SearchPhotosResponse)
def search_photos(
current_user: Annotated[dict, Depends(get_current_user)],
search_type: str = Query("name", description="Search type: name, date, tags, no_faces, no_tags, processed, unprocessed, favorites"),
person_name: Optional[str] = Query(None, description="Person name for name search"),
tag_names: Optional[str] = Query(None, description="Comma-separated tag names for tag search"),
match_all: bool = Query(False, description="Match all tags (for tag search)"),
date_from: Optional[str] = Query(None, description="Date from (YYYY-MM-DD)"),
date_to: Optional[str] = Query(None, description="Date to (YYYY-MM-DD)"),
folder_path: Optional[str] = Query(None, description="Filter by folder path"),
media_type: Optional[str] = Query(None, description="Filter by media type: 'all', 'image', or 'video'"),
page: int = Query(1, ge=1),
page_size: int = Query(50, ge=1, le=200),
db: Session = Depends(get_db),
) -> SearchPhotosResponse:
"""Search photos with filters.
Matches desktop search functionality exactly:
- Search by name: person_name required
- Search by date: date_from or date_to required
- Search by tags: tag_names required (comma-separated)
- Search no faces: returns photos without faces
- Search no tags: returns photos without tags
- Search processed: returns photos that have been processed for face detection
- Search unprocessed: returns photos that have not been processed for face detection
- Search favorites: returns photos favorited by current user
"""
from backend.db.models import PhotoFavorite
items: List[PhotoSearchResult] = []
total = 0
username = current_user["username"] if current_user else None
# Helper function to check if photo is favorite
def check_is_favorite(photo_id: int) -> bool:
if not username:
return False
favorite = db.query(PhotoFavorite).filter(
PhotoFavorite.username == username,
PhotoFavorite.photo_id == photo_id
).first()
return favorite is not None
# Parse date filters for use as additional filters (when not using date search type)
df = date.fromisoformat(date_from) if date_from else None
dt = date.fromisoformat(date_to) if date_to else None
# Parse tag filters for use as additional filters
tag_list = None
if tag_names:
tag_list = [t.strip() for t in tag_names.split(",") if t.strip()]
if search_type == "name":
if not person_name:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="person_name is required for name search",
)
results, total = search_photos_by_name(
db, person_name, folder_path, media_type, df, dt, tag_list, match_all, page, page_size
)
for photo, full_name in results:
tags = get_photo_tags(db, photo.id)
face_count = get_photo_face_count(db, photo.id)
# Convert datetime to date for date_added
date_added = photo.date_added.date() if isinstance(photo.date_added, datetime) else photo.date_added
items.append(
PhotoSearchResult(
id=photo.id,
path=photo.path,
filename=photo.filename,
date_taken=photo.date_taken,
date_added=date_added,
processed=photo.processed,
media_type=photo.media_type or "image",
person_name=full_name,
tags=tags,
has_faces=face_count > 0,
face_count=face_count,
is_favorite=check_is_favorite(photo.id),
)
)
elif search_type == "date":
if not date_from and not date_to:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="At least one of date_from or date_to is required",
)
results, total = search_photos_by_date(db, df, dt, folder_path, media_type, tag_list, match_all, page, page_size)
for photo in results:
tags = get_photo_tags(db, photo.id)
face_count = get_photo_face_count(db, photo.id)
person_name_val = get_photo_person(db, photo.id)
# Convert datetime to date for date_added
date_added = photo.date_added.date() if isinstance(photo.date_added, datetime) else photo.date_added
items.append(
PhotoSearchResult(
id=photo.id,
path=photo.path,
filename=photo.filename,
date_taken=photo.date_taken,
date_added=date_added,
processed=photo.processed,
media_type=photo.media_type or "image",
person_name=person_name_val,
tags=tags,
has_faces=face_count > 0,
face_count=face_count,
is_favorite=check_is_favorite(photo.id),
)
)
elif search_type == "tags":
if not tag_names:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="tag_names is required for tag search",
)
if not tag_list or len(tag_list) == 0:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="At least one tag name is required",
)
results, total = search_photos_by_tags(
db, tag_list, match_all, folder_path, media_type, df, dt, page, page_size
)
for photo in results:
tags = get_photo_tags(db, photo.id)
face_count = get_photo_face_count(db, photo.id)
person_name_val = get_photo_person(db, photo.id)
# Convert datetime to date for date_added
date_added = photo.date_added.date() if isinstance(photo.date_added, datetime) else photo.date_added
items.append(
PhotoSearchResult(
id=photo.id,
path=photo.path,
filename=photo.filename,
date_taken=photo.date_taken,
date_added=date_added,
processed=photo.processed,
media_type=photo.media_type or "image",
person_name=person_name_val,
tags=tags,
has_faces=face_count > 0,
face_count=face_count,
is_favorite=check_is_favorite(photo.id),
)
)
elif search_type == "no_faces":
results, total = get_photos_without_faces(db, folder_path, media_type, df, dt, page, page_size)
for photo in results:
tags = get_photo_tags(db, photo.id)
# Convert datetime to date for date_added
date_added = photo.date_added.date() if isinstance(photo.date_added, datetime) else photo.date_added
items.append(
PhotoSearchResult(
id=photo.id,
path=photo.path,
filename=photo.filename,
date_taken=photo.date_taken,
date_added=date_added,
processed=photo.processed,
media_type=photo.media_type or "image",
person_name=None,
tags=tags,
has_faces=False,
face_count=0,
is_favorite=check_is_favorite(photo.id),
)
)
elif search_type == "no_tags":
results, total = get_photos_without_tags(db, folder_path, media_type, df, dt, page, page_size)
for photo in results:
face_count = get_photo_face_count(db, photo.id)
person_name_val = get_photo_person(db, photo.id)
# Convert datetime to date for date_added
date_added = photo.date_added.date() if isinstance(photo.date_added, datetime) else photo.date_added
items.append(
PhotoSearchResult(
id=photo.id,
path=photo.path,
filename=photo.filename,
date_taken=photo.date_taken,
date_added=date_added,
processed=photo.processed,
media_type=photo.media_type or "image",
person_name=person_name_val,
tags=[],
has_faces=face_count > 0,
face_count=face_count,
is_favorite=check_is_favorite(photo.id),
)
)
elif search_type == "processed":
results, total = get_processed_photos(db, folder_path, media_type, df, dt, page, page_size)
for photo in results:
tags = get_photo_tags(db, photo.id)
face_count = get_photo_face_count(db, photo.id)
person_name_val = get_photo_person(db, photo.id)
# Convert datetime to date for date_added
date_added = photo.date_added.date() if isinstance(photo.date_added, datetime) else photo.date_added
items.append(
PhotoSearchResult(
id=photo.id,
path=photo.path,
filename=photo.filename,
date_taken=photo.date_taken,
date_added=date_added,
processed=photo.processed,
media_type=photo.media_type or "image",
person_name=person_name_val,
tags=tags,
has_faces=face_count > 0,
face_count=face_count,
is_favorite=check_is_favorite(photo.id),
)
)
elif search_type == "unprocessed":
results, total = get_unprocessed_photos(db, folder_path, media_type, df, dt, page, page_size)
for photo in results:
tags = get_photo_tags(db, photo.id)
face_count = get_photo_face_count(db, photo.id)
person_name_val = get_photo_person(db, photo.id)
# Convert datetime to date for date_added
date_added = photo.date_added.date() if isinstance(photo.date_added, datetime) else photo.date_added
items.append(
PhotoSearchResult(
id=photo.id,
path=photo.path,
filename=photo.filename,
date_taken=photo.date_taken,
date_added=date_added,
processed=photo.processed,
media_type=photo.media_type or "image",
person_name=person_name_val,
tags=tags,
has_faces=face_count > 0,
face_count=face_count,
is_favorite=check_is_favorite(photo.id),
)
)
elif search_type == "favorites":
if not username:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Authentication required for favorites search",
)
results, total = get_favorite_photos(db, username, folder_path, media_type, df, dt, page, page_size)
for photo in results:
tags = get_photo_tags(db, photo.id)
face_count = get_photo_face_count(db, photo.id)
person_name_val = get_photo_person(db, photo.id)
# Convert datetime to date for date_added
date_added = photo.date_added.date() if isinstance(photo.date_added, datetime) else photo.date_added
items.append(
PhotoSearchResult(
id=photo.id,
path=photo.path,
filename=photo.filename,
date_taken=photo.date_taken,
date_added=date_added,
processed=photo.processed,
media_type=photo.media_type or "image",
person_name=person_name_val,
tags=tags,
has_faces=face_count > 0,
face_count=face_count,
is_favorite=True, # All results are favorites
)
)
else:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid search_type: {search_type}",
)
return SearchPhotosResponse(items=items, page=page, page_size=page_size, total=total)
@router.post("/import", response_model=PhotoImportResponse)
def import_photos(
request: PhotoImportRequest,
) -> PhotoImportResponse:
"""Import photos from a folder path.
This endpoint enqueues a background job to scan and import photos.
"""
if not request.folder_path:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="folder_path is required",
)
# Validate folder exists
import os
if not os.path.isdir(request.folder_path):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Folder not found: {request.folder_path}",
)
# Estimate number of photos (quick scan)
estimated_photos = len(find_photos_in_folder(request.folder_path, request.recursive))
# Enqueue job
# Pass function as string path to avoid serialization issues
job = queue.enqueue(
"backend.services.tasks.import_photos_task",
request.folder_path,
request.recursive,
job_timeout="1h", # Allow up to 1 hour for large imports
)
return PhotoImportResponse(
job_id=job.id,
message=f"Photo import job queued for {request.folder_path}",
folder_path=request.folder_path,
estimated_photos=estimated_photos,
)
@router.post("/import/upload")
async def upload_photos(
files: list[UploadFile] = File(...),
db: Session = Depends(get_db),
) -> dict:
"""Upload photo files directly.
This endpoint accepts file uploads and imports them immediately.
Files are saved to PHOTO_STORAGE_DIR before import.
For large batches, prefer the /import endpoint with folder_path.
"""
import os
import shutil
from pathlib import Path
from backend.settings import PHOTO_STORAGE_DIR
# Ensure storage directory exists
storage_dir = Path(PHOTO_STORAGE_DIR)
storage_dir.mkdir(parents=True, exist_ok=True)
added_count = 0
existing_count = 0
errors = []
for file in files:
try:
# Generate unique filename to avoid conflicts
import uuid
file_ext = Path(file.filename).suffix
unique_filename = f"{uuid.uuid4()}{file_ext}"
stored_path = storage_dir / unique_filename
# Save uploaded file to storage
content = await file.read()
with open(stored_path, "wb") as f:
f.write(content)
# Import photo from stored location
photo, is_new = import_photo_from_path(db, str(stored_path))
if is_new:
added_count += 1
else:
existing_count += 1
# If photo already exists, delete duplicate upload
if os.path.exists(stored_path):
os.remove(stored_path)
except Exception as e:
errors.append(f"Error uploading {file.filename}: {str(e)}")
return {
"message": f"Uploaded {len(files)} files",
"added": added_count,
"existing": existing_count,
"errors": errors,
}
@router.post("/browse-folder")
def browse_folder() -> dict:
"""Open native folder picker dialog and return selected folder path.
Uses tkinter to show a native OS folder picker dialog.
Returns the full absolute path of the selected folder.
Returns:
dict with 'path' (str) and 'success' (bool) keys
"""
import os
import sys
try:
import tkinter as tk
from tkinter import filedialog
except ImportError:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="tkinter is not available. Cannot show folder picker.",
)
try:
# Create root window (hidden)
root = tk.Tk()
root.withdraw() # Hide main window
root.attributes('-topmost', True) # Bring to front
# Show folder picker dialog
folder_path = filedialog.askdirectory(
title="Select folder to scan",
mustexist=True
)
# Clean up
root.destroy()
if folder_path:
# Normalize path to absolute - use multiple methods to ensure full path
# Handle network paths (UNC paths on Windows, mounted shares on Linux)
# Check if it's a Windows UNC path (\\server\share or //server/share)
# UNC paths are already absolute, but realpath may not work correctly with them
is_unc_path = folder_path.startswith('\\\\') or folder_path.startswith('//')
if is_unc_path:
# For UNC paths, normalize separators but don't use realpath
# (realpath may not work correctly with UNC paths on Windows)
# os.path.normpath() handles UNC paths correctly on Windows
normalized_path = os.path.normpath(folder_path)
else:
# For regular paths (local or mounted network shares on Linux)
# First convert to absolute, then resolve any symlinks, then normalize
abs_path = os.path.abspath(folder_path)
try:
# Resolve any symlinks to get the real path
# This may fail for some network paths, so wrap in try/except
real_path = os.path.realpath(abs_path)
except (OSError, ValueError):
# If realpath fails (e.g., for some network paths), use abspath result
real_path = abs_path
# Normalize the path (remove redundant separators, etc.)
normalized_path = os.path.normpath(real_path)
# Ensure we have a full absolute path (not just folder name)
if not os.path.isabs(normalized_path):
# If somehow still not absolute, try again with current working directory
normalized_path = os.path.abspath(normalized_path)
# Verify the path exists and is a directory
# Note: For network paths, this check might fail if network is temporarily down,
# but if the user just selected it via the folder picker, it should be accessible
if not os.path.exists(normalized_path):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Selected path does not exist: {normalized_path}",
)
if not os.path.isdir(normalized_path):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Selected path is not a directory: {normalized_path}",
)
return {
"path": normalized_path,
"success": True,
"message": f"Selected folder: {normalized_path}"
}
else:
return {
"path": "",
"success": False,
"message": "No folder selected"
}
except Exception as e:
# Handle errors gracefully
error_msg = str(e)
# Check for common issues (display/headless server)
if "display" in error_msg.lower() or "DISPLAY" not in os.environ:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="No display available. Cannot show folder picker. "
"If running on a remote server, ensure X11 forwarding is enabled.",
)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error showing folder picker: {error_msg}",
)
@router.get("/{photo_id}", response_model=PhotoResponse)
def get_photo(photo_id: int, db: Session = Depends(get_db)) -> PhotoResponse:
"""Get photo by ID."""
from backend.db.models import Photo
photo = db.query(Photo).filter(Photo.id == photo_id).first()
if not photo:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Photo {photo_id} not found",
)
return PhotoResponse.model_validate(photo)
@router.get("/{photo_id}/image")
def get_photo_image(
photo_id: int,
request: Request,
db: Session = Depends(get_db)
):
"""Serve photo image or video file for display (not download)."""
import os
import mimetypes
from backend.db.models import Photo
from starlette.responses import FileResponse
photo = db.query(Photo).filter(Photo.id == photo_id).first()
if not photo:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Photo {photo_id} not found",
)
if not os.path.exists(photo.path):
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Photo file not found: {photo.path}",
)
# If it's a video, handle range requests for video streaming
if photo.media_type == "video":
media_type, _ = mimetypes.guess_type(photo.path)
if not media_type or not media_type.startswith('video/'):
media_type = "video/mp4"
file_size = os.path.getsize(photo.path)
# Get range header - Starlette uses lowercase
range_header = request.headers.get("range")
# Debug: log what we're getting (remove after debugging)
if photo_id == 737: # Only for this specific video
import json
debug_info = {
"range_header": range_header,
"all_headers": dict(request.headers),
"header_keys": list(request.headers.keys())
}
print(f"DEBUG photo 737: {json.dumps(debug_info, indent=2)}")
if range_header:
try:
# Parse range header: "bytes=start-end" or "bytes=start-" or "bytes=-suffix"
range_match = range_header.replace("bytes=", "").split("-")
start_str = range_match[0].strip()
end_str = range_match[1].strip() if len(range_match) > 1 and range_match[1] else ""
start = int(start_str) if start_str else 0
end = int(end_str) if end_str else file_size - 1
# Validate range
if start < 0:
start = 0
if end >= file_size:
end = file_size - 1
if start > end:
return Response(
status_code=416,
headers={"Content-Range": f"bytes */{file_size}"}
)
# Read the requested chunk
chunk_size = end - start + 1
with open(photo.path, "rb") as f:
f.seek(start)
chunk = f.read(chunk_size)
return Response(
content=chunk,
status_code=206,
headers={
"Content-Range": f"bytes {start}-{end}/{file_size}",
"Accept-Ranges": "bytes",
"Content-Length": str(chunk_size),
"Content-Type": media_type,
"Content-Disposition": "inline",
"Cache-Control": "public, max-age=3600",
},
media_type=media_type,
)
except (ValueError, IndexError) as e:
# If range parsing fails, fall through to serve full file
pass
# No range request or parsing failed - serve full file with range support headers
response = FileResponse(
photo.path,
media_type=media_type,
)
response.headers["Content-Disposition"] = "inline"
response.headers["Accept-Ranges"] = "bytes"
response.headers["Cache-Control"] = "public, max-age=3600"
return response
# Determine media type from file extension for images
media_type, _ = mimetypes.guess_type(photo.path)
if not media_type or not media_type.startswith('image/'):
media_type = "image/jpeg"
# Use FileResponse but set headers to display inline (not download)
response = FileResponse(
photo.path,
media_type=media_type,
)
# Set Content-Disposition to inline so browser displays instead of downloads
response.headers["Content-Disposition"] = "inline"
response.headers["Cache-Control"] = "public, max-age=3600"
return response
@router.post("/{photo_id}/toggle-favorite")
def toggle_favorite(
photo_id: int,
current_user: Annotated[dict, Depends(get_current_user)],
db: Session = Depends(get_db),
) -> dict:
"""Toggle favorite status of a photo for current user."""
from backend.db.models import Photo, PhotoFavorite
username = current_user["username"]
# Verify photo exists
photo = db.query(Photo).filter(Photo.id == photo_id).first()
if not photo:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Photo {photo_id} not found",
)
# Check if already favorited
existing = db.query(PhotoFavorite).filter(
PhotoFavorite.username == username,
PhotoFavorite.photo_id == photo_id
).first()
if existing:
# Remove favorite
db.delete(existing)
is_favorite = False
else:
# Add favorite
favorite = PhotoFavorite(
username=username,
photo_id=photo_id
)
db.add(favorite)
is_favorite = True
db.commit()
return {
"photo_id": photo_id,
"is_favorite": is_favorite,
"message": "Favorite status updated"
}
@router.get("/{photo_id}/is-favorite")
def check_favorite(
photo_id: int,
current_user: Annotated[dict, Depends(get_current_user)],
db: Session = Depends(get_db),
) -> dict:
"""Check if photo is favorited by current user."""
from backend.db.models import PhotoFavorite
username = current_user["username"]
favorite = db.query(PhotoFavorite).filter(
PhotoFavorite.username == username,
PhotoFavorite.photo_id == photo_id
).first()
return {
"photo_id": photo_id,
"is_favorite": favorite is not None
}
@router.post("/bulk-add-favorites", response_model=BulkAddFavoritesResponse)
def bulk_add_favorites(
request: BulkAddFavoritesRequest,
current_user: Annotated[dict, Depends(get_current_user)],
db: Session = Depends(get_db),
) -> BulkAddFavoritesResponse:
"""Add multiple photos to favorites for current user.
Only adds favorites for photos that aren't already favorites.
Uses a single database transaction for better performance.
"""
from backend.db.models import Photo, PhotoFavorite
photo_ids = request.photo_ids
if not photo_ids:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="photo_ids list cannot be empty",
)
username = current_user["username"]
# Verify all photos exist
photos = db.query(Photo).filter(Photo.id.in_(photo_ids)).all()
found_ids = {photo.id for photo in photos}
missing_ids = set(photo_ids) - found_ids
if missing_ids:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Photos not found: {sorted(missing_ids)}",
)
# Get existing favorites in a single query
existing_favorites = db.query(PhotoFavorite).filter(
PhotoFavorite.username == username,
PhotoFavorite.photo_id.in_(photo_ids)
).all()
existing_ids = {fav.photo_id for fav in existing_favorites}
# Only add favorites for photos that aren't already favorites
photos_to_add = [photo_id for photo_id in photo_ids if photo_id not in existing_ids]
added_count = 0
for photo_id in photos_to_add:
favorite = PhotoFavorite(
username=username,
photo_id=photo_id
)
db.add(favorite)
added_count += 1
db.commit()
return BulkAddFavoritesResponse(
message=f"Added {added_count} photo(s) to favorites",
added_count=added_count,
already_favorite_count=len(existing_ids),
total_requested=len(photo_ids),
)
@router.post("/bulk-remove-favorites", response_model=BulkRemoveFavoritesResponse)
def bulk_remove_favorites(
request: BulkRemoveFavoritesRequest,
current_user: Annotated[dict, Depends(get_current_user)],
db: Session = Depends(get_db),
) -> BulkRemoveFavoritesResponse:
"""Remove multiple photos from favorites for current user.
Only removes favorites for photos that are currently favorites.
Uses a single database transaction for better performance.
"""
from backend.db.models import Photo, PhotoFavorite
photo_ids = request.photo_ids
if not photo_ids:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="photo_ids list cannot be empty",
)
username = current_user["username"]
# Verify all photos exist
photos = db.query(Photo).filter(Photo.id.in_(photo_ids)).all()
found_ids = {photo.id for photo in photos}
missing_ids = set(photo_ids) - found_ids
if missing_ids:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Photos not found: {sorted(missing_ids)}",
)
# Get existing favorites in a single query
existing_favorites = db.query(PhotoFavorite).filter(
PhotoFavorite.username == username,
PhotoFavorite.photo_id.in_(photo_ids)
).all()
existing_ids = {fav.photo_id for fav in existing_favorites}
# Only remove favorites for photos that are currently favorites
photos_to_remove = [photo_id for photo_id in photo_ids if photo_id in existing_ids]
removed_count = 0
for favorite in existing_favorites:
if favorite.photo_id in photos_to_remove:
db.delete(favorite)
removed_count += 1
db.commit()
return BulkRemoveFavoritesResponse(
message=f"Removed {removed_count} photo(s) from favorites",
removed_count=removed_count,
not_favorite_count=len(photo_ids) - len(existing_ids),
total_requested=len(photo_ids),
)
@router.post("/bulk-delete", response_model=BulkDeletePhotosResponse)
def bulk_delete_photos(
request: BulkDeletePhotosRequest,
current_admin: Annotated[dict, Depends(get_current_admin_user)],
db: Session = Depends(get_db),
) -> BulkDeletePhotosResponse:
"""Delete multiple photos and all related data (faces, encodings, tags, favorites)."""
from backend.db.models import Photo, PhotoTagLinkage
photo_ids = list(dict.fromkeys(request.photo_ids))
if not photo_ids:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="photo_ids list cannot be empty",
)
try:
photos = db.query(Photo).filter(Photo.id.in_(photo_ids)).all()
found_ids = {photo.id for photo in photos}
missing_ids = sorted(set(photo_ids) - found_ids)
deleted_count = 0
for photo in photos:
# Remove tag linkages explicitly (in addition to cascade) to keep counts accurate
db.query(PhotoTagLinkage).filter(
PhotoTagLinkage.photo_id == photo.id
).delete(synchronize_session=False)
db.delete(photo)
deleted_count += 1
db.commit()
except HTTPException:
db.rollback()
raise
except Exception as exc: # pragma: no cover - safety net
db.rollback()
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to delete photos: {exc}",
)
admin_username = current_admin.get("username", "unknown")
message_parts = [f"Deleted {deleted_count} photo(s)"]
if missing_ids:
message_parts.append(f"{len(missing_ids)} photo(s) not found")
message_parts.append(f"Request by admin: {admin_username}")
return BulkDeletePhotosResponse(
message="; ".join(message_parts),
deleted_count=deleted_count,
missing_photo_ids=missing_ids,
)
@router.post("/{photo_id}/open-folder")
def open_photo_folder(photo_id: int, db: Session = Depends(get_db)) -> dict:
"""Open the folder containing the photo in the system file manager and select the file.
Matches desktop behavior and enhances it by selecting the specific file:
- Windows: uses explorer /select,"file_path"
- macOS: uses 'open -R' to reveal and select the file
- Linux: tries file manager-specific commands (nautilus, dolphin, etc.) or opens folder
"""
import os
import sys
import subprocess
from backend.db.models import Photo
photo = db.query(Photo).filter(Photo.id == photo_id).first()
if not photo:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Photo {photo_id} not found",
)
# Ensure we have absolute path
file_path = os.path.abspath(photo.path)
folder = os.path.dirname(file_path)
if not os.path.exists(file_path):
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Photo file not found: {file_path}",
)
if not os.path.exists(folder):
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Folder not found: {folder}",
)
try:
# Try using showinfm package first (better cross-platform support)
try:
from showinfm import show_in_file_manager
show_in_file_manager(file_path)
return {
"message": f"Opened folder and selected file: {os.path.basename(file_path)}",
"folder": folder,
"file": file_path
}
except ImportError:
# showinfm not installed, fall back to manual commands
pass
except Exception as e:
# showinfm failed, fall back to manual commands
pass
# Fallback: Open folder and select the file using platform-specific commands
if os.name == "nt": # Windows
# Windows: explorer /select,"file_path" opens folder and selects the file
subprocess.run(["explorer", "/select,", file_path], check=False)
elif sys.platform == "darwin": # macOS
# macOS: open -R reveals the file in Finder and selects it
subprocess.run(["open", "-R", file_path], check=False)
else: # Linux and others
# Linux: Try file manager-specific commands to select the file
# Try different file managers based on desktop environment
opened = False
# Detect desktop environment first
desktop_env = os.environ.get("XDG_CURRENT_DESKTOP", "").lower()
# Try Nautilus (GNOME) - supports --select option
if "gnome" in desktop_env or not desktop_env:
try:
result = subprocess.run(
["nautilus", "--select", file_path],
check=False,
timeout=5
)
if result.returncode == 0:
opened = True
except (FileNotFoundError, subprocess.TimeoutExpired):
pass
# Try Nemo (Cinnamon/MATE) - doesn't support --select, but we can try folder with navigation
if not opened and ("cinnamon" in desktop_env or "mate" in desktop_env):
try:
# For Nemo, we can try opening the folder first, then using a script or
# just opening the folder (Nemo may focus on the file if we pass it)
# Try opening with the file path - Nemo will open the folder
file_uri = f"file://{file_path}"
result = subprocess.Popen(
["nemo", file_uri],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
# Nemo will open the folder - selection may not work perfectly
# This is a limitation of Nemo
opened = True
except (FileNotFoundError, subprocess.TimeoutExpired):
pass
# Try Dolphin (KDE) - supports --select option
if not opened and "kde" in desktop_env:
try:
result = subprocess.run(
["dolphin", "--select", file_path],
check=False,
timeout=5
)
if result.returncode == 0:
opened = True
except (FileNotFoundError, subprocess.TimeoutExpired):
pass
# Try PCManFM (LXDE/LXQt) - supports --select option
if not opened and ("lxde" in desktop_env or "lxqt" in desktop_env):
try:
result = subprocess.run(
["pcmanfm", "--select", file_path],
check=False,
timeout=5
)
if result.returncode == 0:
opened = True
except (FileNotFoundError, subprocess.TimeoutExpired):
pass
# Try Thunar (XFCE) - supports --select option
if not opened and "xfce" in desktop_env:
try:
result = subprocess.run(
["thunar", "--select", file_path],
check=False,
timeout=5
)
if result.returncode == 0:
opened = True
except (FileNotFoundError, subprocess.TimeoutExpired):
pass
# If desktop-specific didn't work, try all file managers in order
if not opened:
file_managers = [
("nautilus", ["--select", file_path]),
("nemo", [f"file://{file_path}"]), # Nemo uses file:// URI
("dolphin", ["--select", file_path]),
("thunar", ["--select", file_path]),
("pcmanfm", ["--select", file_path]),
]
for fm_name, fm_args in file_managers:
try:
result = subprocess.run(
[fm_name] + fm_args,
check=False,
timeout=5
)
if result.returncode == 0:
opened = True
break
except (FileNotFoundError, subprocess.TimeoutExpired):
continue
# Fallback: try xdg-open with the folder (will open folder but won't select file)
if not opened:
subprocess.run(["xdg-open", folder], check=False)
return {
"message": f"Opened folder and selected file: {os.path.basename(file_path)}",
"folder": folder,
"file": file_path
}
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to open folder: {str(e)}",
)