Some checks failed
CI / skip-ci-check (pull_request) Successful in 7s
CI / python-lint (pull_request) Has been cancelled
CI / test-backend (pull_request) Has been cancelled
CI / build (pull_request) Has been cancelled
CI / secret-scanning (pull_request) Has been cancelled
CI / dependency-scan (pull_request) Has been cancelled
CI / sast-scan (pull_request) Has been cancelled
CI / workflow-summary (pull_request) Has been cancelled
CI / lint-and-type-check (pull_request) Has been cancelled
- Added media_type to PhotoSearchResult interface to distinguish between images and videos. - Updated PhotoViewer component to support video playback, including URL handling and preloading logic. - Modified openPhoto function in Search page to open videos correctly. - Enhanced backend API to serve video files with range request support for better streaming experience. These changes improve the user experience by allowing seamless viewing of both images and videos in the application.
1102 lines
42 KiB
Python
1102 lines
42 KiB
Python
"""Photo management endpoints."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from datetime import date, datetime
|
|
from typing import List, Optional
|
|
|
|
from fastapi import APIRouter, Depends, File, HTTPException, Query, Request, UploadFile, status
|
|
from fastapi.responses import JSONResponse, FileResponse, Response
|
|
from typing import Annotated
|
|
from rq import Queue
|
|
from redis import Redis
|
|
from sqlalchemy.orm import Session
|
|
|
|
from backend.db.session import get_db
|
|
from backend.api.auth import get_current_user
|
|
from backend.api.users import get_current_admin_user
|
|
|
|
# Redis connection for RQ
|
|
redis_conn = Redis(host="localhost", port=6379, db=0, decode_responses=False)
|
|
queue = Queue(connection=redis_conn)
|
|
from backend.schemas.photos import (
|
|
PhotoImportRequest,
|
|
PhotoImportResponse,
|
|
PhotoResponse,
|
|
BulkAddFavoritesRequest,
|
|
BulkAddFavoritesResponse,
|
|
BulkDeletePhotosRequest,
|
|
BulkDeletePhotosResponse,
|
|
BulkRemoveFavoritesRequest,
|
|
BulkRemoveFavoritesResponse,
|
|
)
|
|
from backend.schemas.search import (
|
|
PhotoSearchResult,
|
|
SearchPhotosResponse,
|
|
)
|
|
from backend.services.photo_service import (
|
|
find_photos_in_folder,
|
|
import_photo_from_path,
|
|
)
|
|
from backend.services.search_service import (
|
|
get_favorite_photos,
|
|
get_photo_face_count,
|
|
get_photo_person,
|
|
get_photo_tags,
|
|
get_photos_without_faces,
|
|
get_photos_without_tags,
|
|
get_processed_photos,
|
|
get_unprocessed_photos,
|
|
search_photos_by_date,
|
|
search_photos_by_name,
|
|
search_photos_by_tags,
|
|
)
|
|
# Note: Function passed as string path to avoid RQ serialization issues
|
|
|
|
router = APIRouter(prefix="/photos", tags=["photos"])
|
|
|
|
|
|
@router.get("", response_model=SearchPhotosResponse)
def search_photos(
    current_user: Annotated[dict, Depends(get_current_user)],
    search_type: str = Query("name", description="Search type: name, date, tags, no_faces, no_tags, processed, unprocessed, favorites"),
    person_name: Optional[str] = Query(None, description="Person name for name search"),
    tag_names: Optional[str] = Query(None, description="Comma-separated tag names for tag search"),
    match_all: bool = Query(False, description="Match all tags (for tag search)"),
    date_from: Optional[str] = Query(None, description="Date from (YYYY-MM-DD)"),
    date_to: Optional[str] = Query(None, description="Date to (YYYY-MM-DD)"),
    folder_path: Optional[str] = Query(None, description="Filter by folder path"),
    media_type: Optional[str] = Query(None, description="Filter by media type: 'all', 'image', or 'video'"),
    page: int = Query(1, ge=1),
    page_size: int = Query(50, ge=1, le=200),
    db: Session = Depends(get_db),
) -> SearchPhotosResponse:
    """Search photos with filters.

    Matches desktop search functionality exactly:
    - Search by name: person_name required
    - Search by date: date_from or date_to required
    - Search by tags: tag_names required (comma-separated)
    - Search no faces: returns photos without faces
    - Search no tags: returns photos without tags
    - Search processed: returns photos that have been processed for face detection
    - Search unprocessed: returns photos that have not been processed for face detection
    - Search favorites: returns photos favorited by current user

    The date, tag, folder and media-type parameters are also forwarded to the
    other search types as additional filters.

    Raises:
        HTTPException: 400 for a malformed date, a missing required parameter
            or an unknown ``search_type``; 401 for a favorites search without
            a username.
    """
    from backend.db.models import PhotoFavorite

    username = current_user["username"] if current_user else None

    def check_is_favorite(photo_id: int) -> bool:
        """Return True when the current user has favorited ``photo_id``."""
        if not username:
            return False
        favorite = db.query(PhotoFavorite).filter(
            PhotoFavorite.username == username,
            PhotoFavorite.photo_id == photo_id,
        ).first()
        return favorite is not None

    def parse_iso_date(param: str, raw: Optional[str]) -> Optional[date]:
        """Parse an optional YYYY-MM-DD string, answering 400 when malformed."""
        if not raw:
            return None
        try:
            return date.fromisoformat(raw)
        except ValueError as exc:
            # Without this the ValueError would surface as an HTTP 500.
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Invalid {param}: expected YYYY-MM-DD",
            ) from exc

    # Sentinel: "look the person up"; None is a legitimate explicit value.
    lookup = object()

    def build_item(photo, *, person=lookup, tags=None, face_count=None, is_favorite=None) -> PhotoSearchResult:
        """Build one result row, querying only the fields the caller did not supply."""
        if tags is None:
            tags = get_photo_tags(db, photo.id)
        if face_count is None:
            face_count = get_photo_face_count(db, photo.id)
        if person is lookup:
            person = get_photo_person(db, photo.id)
        if is_favorite is None:
            is_favorite = check_is_favorite(photo.id)
        # date_added may be stored as a datetime; the response carries a date.
        added = photo.date_added
        if isinstance(added, datetime):
            added = added.date()
        return PhotoSearchResult(
            id=photo.id,
            path=photo.path,
            filename=photo.filename,
            date_taken=photo.date_taken,
            date_added=added,
            processed=photo.processed,
            media_type=photo.media_type or "image",
            person_name=person,
            tags=tags,
            has_faces=face_count > 0,
            face_count=face_count,
            is_favorite=is_favorite,
        )

    # Parsed date filters (used as additional filters by every search type).
    df = parse_iso_date("date_from", date_from)
    dt = parse_iso_date("date_to", date_to)

    # Parsed tag filter; stays None when no tag_names were supplied.
    tag_list = None
    if tag_names:
        tag_list = [t.strip() for t in tag_names.split(",") if t.strip()]

    items: List[PhotoSearchResult] = []
    total = 0

    if search_type == "name":
        if not person_name:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="person_name is required for name search",
            )
        results, total = search_photos_by_name(
            db, person_name, folder_path, media_type, df, dt, tag_list, match_all, page, page_size
        )
        # Name search already yields the matched person's name per photo.
        items = [build_item(photo, person=full_name) for photo, full_name in results]
    elif search_type == "date":
        if not date_from and not date_to:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="At least one of date_from or date_to is required",
            )
        results, total = search_photos_by_date(db, df, dt, folder_path, media_type, tag_list, match_all, page, page_size)
        items = [build_item(photo) for photo in results]
    elif search_type == "tags":
        if not tag_names:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="tag_names is required for tag search",
            )
        if not tag_list:
            # tag_names contained only separators/whitespace.
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="At least one tag name is required",
            )
        results, total = search_photos_by_tags(
            db, tag_list, match_all, folder_path, media_type, df, dt, page, page_size
        )
        items = [build_item(photo) for photo in results]
    elif search_type == "no_faces":
        results, total = get_photos_without_faces(db, folder_path, media_type, df, dt, page, page_size)
        # Face-related fields are fixed for this search type; skip the lookups.
        items = [build_item(photo, person=None, face_count=0) for photo in results]
    elif search_type == "no_tags":
        results, total = get_photos_without_tags(db, folder_path, media_type, df, dt, page, page_size)
        # Tags are fixed to empty for this search type; skip the lookup.
        items = [build_item(photo, tags=[]) for photo in results]
    elif search_type == "processed":
        results, total = get_processed_photos(db, folder_path, media_type, df, dt, page, page_size)
        items = [build_item(photo) for photo in results]
    elif search_type == "unprocessed":
        results, total = get_unprocessed_photos(db, folder_path, media_type, df, dt, page, page_size)
        items = [build_item(photo) for photo in results]
    elif search_type == "favorites":
        if not username:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Authentication required for favorites search",
            )
        results, total = get_favorite_photos(db, username, folder_path, media_type, df, dt, page, page_size)
        # All results are favorites; no per-photo favorite query needed.
        items = [build_item(photo, is_favorite=True) for photo in results]
    else:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid search_type: {search_type}",
        )

    return SearchPhotosResponse(items=items, page=page, page_size=page_size, total=total)
|
|
|
|
|
|
@router.post("/import", response_model=PhotoImportResponse)
def import_photos(
    request: PhotoImportRequest,
) -> PhotoImportResponse:
    """Queue a background job that imports the photos under a folder.

    The folder is validated and scanned once up front to produce an estimate;
    the import itself is enqueued on the module-level RQ queue.

    Raises:
        HTTPException: 400 when ``folder_path`` is empty or is not an
            existing directory.
    """
    import os

    folder = request.folder_path
    if not folder:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="folder_path is required",
        )

    # The folder must exist before a job is queued for it.
    if not os.path.isdir(folder):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Folder not found: {folder}",
        )

    # Quick scan, used only for the estimate reported to the caller.
    discovered = find_photos_in_folder(folder, request.recursive)

    # The task is referenced by dotted path to avoid serialization issues.
    job = queue.enqueue(
        "backend.services.tasks.import_photos_task",
        folder,
        request.recursive,
        job_timeout="1h",  # Allow up to 1 hour for large imports
    )

    return PhotoImportResponse(
        job_id=job.id,
        message=f"Photo import job queued for {folder}",
        folder_path=folder,
        estimated_photos=len(discovered),
    )
|
|
|
|
|
|
@router.post("/import/upload")
async def upload_photos(
    files: list[UploadFile] = File(...),
    db: Session = Depends(get_db),
) -> dict:
    """Upload photo files directly.

    This endpoint accepts file uploads and imports them immediately.
    Files are saved to PHOTO_STORAGE_DIR before import.
    For large batches, prefer the /import endpoint with folder_path.

    Each file is handled independently: a failure is recorded in ``errors``
    and processing continues with the next file.

    Returns:
        dict with ``message``, ``added``, ``existing`` and ``errors`` keys.
    """
    import os
    import uuid
    from pathlib import Path

    from backend.settings import PHOTO_STORAGE_DIR

    # Ensure storage directory exists
    storage_dir = Path(PHOTO_STORAGE_DIR)
    storage_dir.mkdir(parents=True, exist_ok=True)

    # Copy uploads in bounded chunks so a large file is never held in memory whole.
    chunk_bytes = 1024 * 1024

    added_count = 0
    existing_count = 0
    errors = []

    for file in files:
        try:
            # Generate unique filename to avoid conflicts
            file_ext = Path(file.filename).suffix
            stored_path = storage_dir / f"{uuid.uuid4()}{file_ext}"

            # Save uploaded file to storage
            try:
                with open(stored_path, "wb") as f:
                    while True:
                        chunk = await file.read(chunk_bytes)
                        if not chunk:
                            break
                        f.write(chunk)
            except Exception:
                # Do not leave a truncated file behind in the storage directory.
                stored_path.unlink(missing_ok=True)
                raise

            # Import photo from stored location
            # NOTE(review): if this raises, the stored file is kept, because it
            # is not visible here whether a DB row may already reference it.
            _photo, is_new = import_photo_from_path(db, str(stored_path))
            if is_new:
                added_count += 1
            else:
                existing_count += 1
                # If photo already exists, delete duplicate upload
                if os.path.exists(stored_path):
                    os.remove(stored_path)
        except Exception as e:
            errors.append(f"Error uploading {file.filename}: {str(e)}")

    return {
        "message": f"Uploaded {len(files)} files",
        "added": added_count,
        "existing": existing_count,
        "errors": errors,
    }
|
|
|
|
|
|
@router.post("/browse-folder")
def browse_folder() -> dict:
    """Open native folder picker dialog and return selected folder path.

    Uses tkinter to show a native OS folder picker dialog.
    Returns the full absolute path of the selected folder.

    Returns:
        dict with 'path' (str), 'success' (bool) and 'message' (str) keys

    Raises:
        HTTPException: 503 when tkinter or a display is unavailable, 400 when
            the selected path does not exist or is not a directory, 500 for
            any other failure while showing the dialog.
    """
    import os
    import sys

    try:
        import tkinter as tk
        from tkinter import filedialog
    except ImportError as exc:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="tkinter is not available. Cannot show folder picker.",
        ) from exc

    try:
        # Create root window (hidden)
        root = tk.Tk()
        try:
            root.withdraw()  # Hide main window
            root.attributes('-topmost', True)  # Bring to front

            # Show folder picker dialog
            folder_path = filedialog.askdirectory(
                title="Select folder to scan",
                mustexist=True
            )
        finally:
            # Always release the Tk root, even when the dialog raises.
            root.destroy()

        if not folder_path:
            return {
                "path": "",
                "success": False,
                "message": "No folder selected"
            }

        # Check if it's a Windows UNC path (\\server\share or //server/share)
        is_unc_path = folder_path.startswith('\\\\') or folder_path.startswith('//')

        if is_unc_path:
            # UNC paths are already absolute; only normalize separators and
            # skip realpath, which may not work correctly with them.
            normalized_path = os.path.normpath(folder_path)
        else:
            # Regular paths (local or mounted network shares):
            # make absolute, resolve symlinks, then normalize.
            abs_path = os.path.abspath(folder_path)
            try:
                real_path = os.path.realpath(abs_path)
            except (OSError, ValueError):
                # realpath may fail for some network paths; keep the abspath result
                real_path = abs_path
            normalized_path = os.path.normpath(real_path)

        # Ensure we have a full absolute path (not just folder name)
        if not os.path.isabs(normalized_path):
            normalized_path = os.path.abspath(normalized_path)

        # Verify the path exists and is a directory
        if not os.path.exists(normalized_path):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Selected path does not exist: {normalized_path}",
            )
        if not os.path.isdir(normalized_path):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Selected path is not a directory: {normalized_path}",
            )

        return {
            "path": normalized_path,
            "success": True,
            "message": f"Selected folder: {normalized_path}"
        }
    except HTTPException:
        # Validation errors raised above must keep their own status code
        # instead of being rewrapped by the generic handler below.
        raise
    except Exception as e:
        error_msg = str(e)

        # Check for common issues (display/headless server). The DISPLAY
        # variable is only meaningful outside Windows and macOS; there its
        # absence says nothing about whether a GUI is available.
        uses_display_var = sys.platform not in ("win32", "darwin")
        missing_display = uses_display_var and "DISPLAY" not in os.environ
        if "display" in error_msg.lower() or missing_display:
            raise HTTPException(
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                detail="No display available. Cannot show folder picker. "
                "If running on a remote server, ensure X11 forwarding is enabled.",
            ) from e

        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error showing folder picker: {error_msg}",
        ) from e
|
|
|
|
|
|
@router.get("/{photo_id}", response_model=PhotoResponse)
def get_photo(photo_id: int, db: Session = Depends(get_db)) -> PhotoResponse:
    """Return the photo record with the given ID.

    Raises:
        HTTPException: 404 when no photo has this ID.
    """
    from backend.db.models import Photo

    record = db.query(Photo).filter(Photo.id == photo_id).first()
    if record:
        return PhotoResponse.model_validate(record)

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"Photo {photo_id} not found",
    )
|
|
|
|
|
|
def _parse_byte_range(range_header: str, file_size: int) -> Optional[tuple[int, int]]:
    """Parse a single-range ``Range`` header into inclusive ``(start, end)`` offsets.

    Supports ``bytes=start-end``, ``bytes=start-`` and the suffix form
    ``bytes=-N`` (the last N bytes). ``end`` is clamped to the last byte.

    Returns:
        ``(start, end)``; the range is unsatisfiable when ``start > end``.
        ``None`` when the header is not a parsable single byte range, in
        which case the caller should serve the whole file.
    """
    spec = range_header.strip()
    if not spec.lower().startswith("bytes="):
        return None
    first, dash, last = spec[len("bytes="):].partition("-")
    first, last = first.strip(), last.strip()
    if not dash or (not first and not last):
        return None

    last_byte = file_size - 1
    try:
        if not first:
            # Suffix range: the final N bytes of the file.
            suffix_len = int(last)
            if suffix_len <= 0:
                return file_size, last_byte  # start > end: unsatisfiable
            return max(file_size - suffix_len, 0), last_byte
        start = int(first)
        end = int(last) if last else last_byte
    except ValueError:
        # Non-numeric or multi-range spec.
        return None
    return start, min(end, last_byte)


def _iter_file_range(path: str, start: int, length: int, chunk_size: int = 64 * 1024):
    """Yield ``length`` bytes of ``path`` starting at ``start`` in bounded chunks."""
    with open(path, "rb") as f:
        f.seek(start)
        remaining = length
        while remaining > 0:
            data = f.read(min(chunk_size, remaining))
            if not data:
                break  # file is shorter than expected; stop early
            remaining -= len(data)
            yield data


@router.get("/{photo_id}/image")
def get_photo_image(
    photo_id: int,
    request: Request,
    db: Session = Depends(get_db)
):
    """Serve photo image or video file for display (not download).

    Videos honour a single-range ``Range`` header and answer with 206 and the
    requested bytes (or 416 when the range is unsatisfiable); otherwise the
    whole file is returned. All responses use ``Content-Disposition: inline``.

    Raises:
        HTTPException: 404 when the photo record or its file is missing.
    """
    import os
    import mimetypes
    from backend.db.models import Photo
    from starlette.responses import FileResponse, StreamingResponse

    photo = db.query(Photo).filter(Photo.id == photo_id).first()
    if not photo:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Photo {photo_id} not found",
        )

    if not os.path.exists(photo.path):
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Photo file not found: {photo.path}",
        )

    # If it's a video, handle range requests for video streaming
    if photo.media_type == "video":
        media_type, _ = mimetypes.guess_type(photo.path)
        if not media_type or not media_type.startswith('video/'):
            media_type = "video/mp4"

        file_size = os.path.getsize(photo.path)
        range_header = request.headers.get("range")

        byte_range = _parse_byte_range(range_header, file_size) if range_header else None
        if byte_range is not None:
            start, end = byte_range
            if start > end:
                return Response(
                    status_code=416,
                    headers={"Content-Range": f"bytes */{file_size}"}
                )

            # Stream the slice rather than loading it: an open-ended range
            # ("bytes=0-") spans the entire video.
            chunk_size = end - start + 1
            return StreamingResponse(
                _iter_file_range(photo.path, start, chunk_size),
                status_code=206,
                headers={
                    "Content-Range": f"bytes {start}-{end}/{file_size}",
                    "Accept-Ranges": "bytes",
                    "Content-Length": str(chunk_size),
                    "Content-Type": media_type,
                    "Content-Disposition": "inline",
                    "Cache-Control": "public, max-age=3600",
                },
                media_type=media_type,
            )

        # No range request or parsing failed - serve full file with range support headers
        response = FileResponse(
            photo.path,
            media_type=media_type,
        )
        response.headers["Content-Disposition"] = "inline"
        response.headers["Accept-Ranges"] = "bytes"
        response.headers["Cache-Control"] = "public, max-age=3600"
        return response

    # Determine media type from file extension for images
    media_type, _ = mimetypes.guess_type(photo.path)
    if not media_type or not media_type.startswith('image/'):
        media_type = "image/jpeg"

    # Use FileResponse but set headers to display inline (not download)
    response = FileResponse(
        photo.path,
        media_type=media_type,
    )
    # Set Content-Disposition to inline so browser displays instead of downloads
    response.headers["Content-Disposition"] = "inline"
    response.headers["Cache-Control"] = "public, max-age=3600"
    return response
|
|
|
|
|
|
@router.post("/{photo_id}/toggle-favorite")
def toggle_favorite(
    photo_id: int,
    current_user: Annotated[dict, Depends(get_current_user)],
    db: Session = Depends(get_db),
) -> dict:
    """Flip the current user's favorite flag for one photo.

    Returns:
        dict with ``photo_id``, the new ``is_favorite`` state and a ``message``.

    Raises:
        HTTPException: 404 when the photo does not exist.
    """
    from backend.db.models import Photo, PhotoFavorite

    username = current_user["username"]

    # The photo must exist before its favorite state can change.
    if not db.query(Photo).filter(Photo.id == photo_id).first():
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Photo {photo_id} not found",
        )

    current = db.query(PhotoFavorite).filter(
        PhotoFavorite.username == username,
        PhotoFavorite.photo_id == photo_id
    ).first()

    # No existing row means the photo becomes a favorite; otherwise it stops being one.
    is_favorite = not current
    if is_favorite:
        db.add(PhotoFavorite(username=username, photo_id=photo_id))
    else:
        db.delete(current)

    db.commit()

    return {
        "photo_id": photo_id,
        "is_favorite": is_favorite,
        "message": "Favorite status updated"
    }
|
|
|
|
|
|
@router.get("/{photo_id}/is-favorite")
def check_favorite(
    photo_id: int,
    current_user: Annotated[dict, Depends(get_current_user)],
    db: Session = Depends(get_db),
) -> dict:
    """Report whether the current user has favorited the given photo.

    Returns:
        dict with ``photo_id`` and a boolean ``is_favorite``.
    """
    from backend.db.models import PhotoFavorite

    match = (
        db.query(PhotoFavorite)
        .filter(
            PhotoFavorite.username == current_user["username"],
            PhotoFavorite.photo_id == photo_id,
        )
        .first()
    )

    return {"photo_id": photo_id, "is_favorite": match is not None}
|
|
|
|
|
|
@router.post("/bulk-add-favorites", response_model=BulkAddFavoritesResponse)
def bulk_add_favorites(
    request: BulkAddFavoritesRequest,
    current_user: Annotated[dict, Depends(get_current_user)],
    db: Session = Depends(get_db),
) -> BulkAddFavoritesResponse:
    """Add multiple photos to favorites for current user.

    Only adds favorites for photos that aren't already favorites.
    Repeated IDs in the request are processed once.
    Uses a single database transaction for better performance.

    Raises:
        HTTPException: 400 when ``photo_ids`` is empty, 404 when any of the
            requested photos does not exist.
    """
    from backend.db.models import Photo, PhotoFavorite

    # De-duplicate while keeping request order, so a repeated ID cannot
    # produce two favorite rows for the same user and photo.
    photo_ids = list(dict.fromkeys(request.photo_ids))
    if not photo_ids:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="photo_ids list cannot be empty",
        )

    username = current_user["username"]

    # Verify all photos exist
    photos = db.query(Photo).filter(Photo.id.in_(photo_ids)).all()
    found_ids = {photo.id for photo in photos}
    missing_ids = set(photo_ids) - found_ids

    if missing_ids:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Photos not found: {sorted(missing_ids)}",
        )

    # Get existing favorites in a single query
    existing_favorites = db.query(PhotoFavorite).filter(
        PhotoFavorite.username == username,
        PhotoFavorite.photo_id.in_(photo_ids)
    ).all()
    existing_ids = {fav.photo_id for fav in existing_favorites}

    # Only add favorites for photos that aren't already favorites
    photos_to_add = [photo_id for photo_id in photo_ids if photo_id not in existing_ids]
    db.add_all(
        PhotoFavorite(username=username, photo_id=photo_id)
        for photo_id in photos_to_add
    )
    added_count = len(photos_to_add)

    db.commit()

    return BulkAddFavoritesResponse(
        message=f"Added {added_count} photo(s) to favorites",
        added_count=added_count,
        already_favorite_count=len(existing_ids),
        # Reported as received, including any repeated IDs.
        total_requested=len(request.photo_ids),
    )
|
|
|
|
|
|
@router.post("/bulk-remove-favorites", response_model=BulkRemoveFavoritesResponse)
def bulk_remove_favorites(
    request: BulkRemoveFavoritesRequest,
    current_user: Annotated[dict, Depends(get_current_user)],
    db: Session = Depends(get_db),
) -> BulkRemoveFavoritesResponse:
    """Remove multiple photos from favorites for current user.

    Only removes favorites for photos that are currently favorites.
    Repeated IDs in the request are processed once.
    Uses a single database transaction for better performance.

    Raises:
        HTTPException: 400 when ``photo_ids`` is empty, 404 when any of the
            requested photos does not exist.
    """
    from backend.db.models import Photo, PhotoFavorite

    # De-duplicate while keeping request order so repeated IDs do not
    # inflate not_favorite_count.
    photo_ids = list(dict.fromkeys(request.photo_ids))
    if not photo_ids:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="photo_ids list cannot be empty",
        )

    username = current_user["username"]

    # Verify all photos exist
    photos = db.query(Photo).filter(Photo.id.in_(photo_ids)).all()
    found_ids = {photo.id for photo in photos}
    missing_ids = set(photo_ids) - found_ids

    if missing_ids:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Photos not found: {sorted(missing_ids)}",
        )

    # Get existing favorites in a single query
    existing_favorites = db.query(PhotoFavorite).filter(
        PhotoFavorite.username == username,
        PhotoFavorite.photo_id.in_(photo_ids)
    ).all()
    existing_ids = {fav.photo_id for fav in existing_favorites}

    # The query above already restricts rows to the requested photos, so
    # every returned favorite is one to remove.
    for favorite in existing_favorites:
        db.delete(favorite)
    removed_count = len(existing_favorites)

    db.commit()

    return BulkRemoveFavoritesResponse(
        message=f"Removed {removed_count} photo(s) from favorites",
        removed_count=removed_count,
        not_favorite_count=len(photo_ids) - len(existing_ids),
        # Reported as received, including any repeated IDs.
        total_requested=len(request.photo_ids),
    )
|
|
|
|
|
|
@router.post("/bulk-delete", response_model=BulkDeletePhotosResponse)
def bulk_delete_photos(
    request: BulkDeletePhotosRequest,
    current_admin: Annotated[dict, Depends(get_current_admin_user)],
    db: Session = Depends(get_db),
) -> BulkDeletePhotosResponse:
    """Delete several photos in one transaction (admin only).

    Requested IDs are de-duplicated in request order. IDs with no matching
    photo are reported back in ``missing_photo_ids`` rather than treated as
    an error.

    Raises:
        HTTPException: 400 when ``photo_ids`` is empty, 500 when the deletion
            fails (the transaction is rolled back first).
    """
    from backend.db.models import Photo, PhotoTagLinkage

    unique_ids = list(dict.fromkeys(request.photo_ids))
    if not unique_ids:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="photo_ids list cannot be empty",
        )

    try:
        rows = db.query(Photo).filter(Photo.id.in_(unique_ids)).all()
        missing_ids = sorted(set(unique_ids) - {row.id for row in rows})

        for row in rows:
            # Tag linkages are removed explicitly (in addition to cascade)
            # to keep counts accurate.
            linkages = db.query(PhotoTagLinkage).filter(
                PhotoTagLinkage.photo_id == row.id
            )
            linkages.delete(synchronize_session=False)
            db.delete(row)
        deleted_count = len(rows)

        db.commit()
    except HTTPException:
        db.rollback()
        raise
    except Exception as exc:  # pragma: no cover - safety net
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to delete photos: {exc}",
        )

    # Assemble the summary: deleted count, optional missing count, requester.
    summary = [f"Deleted {deleted_count} photo(s)"]
    if missing_ids:
        summary.append(f"{len(missing_ids)} photo(s) not found")
    requester = current_admin.get("username", "unknown")
    summary.append(f"Request by admin: {requester}")

    return BulkDeletePhotosResponse(
        message="; ".join(summary),
        deleted_count=deleted_count,
        missing_photo_ids=missing_ids,
    )
|
|
|
|
|
|
def _run_file_manager(command: list, timeout: float = 5) -> bool:
    """Run a file-manager command and report whether it exited with status 0.

    Returns False when the executable is not installed, when the command does
    not finish within ``timeout`` seconds, or when it exits with a non-zero
    status. Any other error propagates to the caller.
    """
    import subprocess

    try:
        completed = subprocess.run(command, check=False, timeout=timeout)
    except (FileNotFoundError, subprocess.TimeoutExpired):
        return False
    return completed.returncode == 0


def _reveal_in_linux_file_manager(file_path: str, folder: str) -> None:
    """Reveal ``file_path`` in a Linux file manager, selecting it when possible.

    The manager matching ``XDG_CURRENT_DESKTOP`` is tried first, then every
    known manager that has not been attempted yet, and finally ``xdg-open`` on
    ``folder`` (which opens the folder without selecting the file).
    ``file_path`` must be absolute.
    """
    import os
    import subprocess
    from pathlib import Path

    desktop_env = os.environ.get("XDG_CURRENT_DESKTOP", "").lower()
    # as_uri() percent-encodes spaces, '#', '%' and non-ASCII characters; a plain
    # f"file://{file_path}" would hand Nemo an invalid URI for such paths.
    file_uri = Path(file_path).as_uri()
    # Executables already tried, so the generic fallback does not repeat a
    # command that has just failed (and possibly wait for its timeout again).
    attempted = set()

    def select_with(executable: str) -> bool:
        attempted.add(executable)
        return _run_file_manager([executable, "--select", file_path])

    # Nautilus is also the first choice when no desktop environment is reported.
    if ("gnome" in desktop_env or not desktop_env) and select_with("nautilus"):
        return

    if "cinnamon" in desktop_env or "mate" in desktop_env:
        attempted.add("nemo")
        try:
            # Nemo is started detached and assumed to succeed once it launches;
            # it is given the file URI rather than a --select option.
            subprocess.Popen(
                ["nemo", file_uri],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
        except FileNotFoundError:
            pass
        else:
            return

    desktop_managers = (
        (("kde",), "dolphin"),
        (("lxde", "lxqt"), "pcmanfm"),
        (("xfce",), "thunar"),
    )
    for keywords, executable in desktop_managers:
        if any(keyword in desktop_env for keyword in keywords) and select_with(executable):
            return

    # Desktop-specific attempt did not work: try the remaining managers in order.
    for executable in ("nautilus", "nemo", "dolphin", "thunar", "pcmanfm"):
        if executable in attempted:
            continue
        if executable == "nemo":
            command = ["nemo", file_uri]
        else:
            command = [executable, "--select", file_path]
        if _run_file_manager(command):
            return

    # Last resort: open the containing folder without selecting the file.
    subprocess.run(["xdg-open", folder], check=False)


@router.post("/{photo_id}/open-folder")
def open_photo_folder(photo_id: int, db: Session = Depends(get_db)) -> dict:
    """Open the folder containing the photo in the system file manager and select the file.

    The file manager is launched on the machine running this backend process.

    Strategy:
    - Use the optional ``showinfm`` package when it is installed and works.
    - Windows: ``explorer /select,"file_path"``
    - macOS: ``open -R`` to reveal and select the file
    - Linux: file manager-specific commands (nautilus, nemo, dolphin, pcmanfm,
      thunar), falling back to ``xdg-open`` on the folder

    Args:
        photo_id: Primary key of the photo to reveal.
        db: Database session.

    Returns:
        A dict with ``message``, ``folder`` and ``file`` keys.

    Raises:
        HTTPException: 404 if the photo record, its file, or its folder does
            not exist; 500 if launching the file manager raises an error.
    """
    import logging
    import os
    import subprocess
    import sys

    from backend.db.models import Photo

    photo = db.query(Photo).filter(Photo.id == photo_id).first()
    if not photo:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Photo {photo_id} not found",
        )

    # Ensure we have an absolute path
    file_path = os.path.abspath(photo.path)
    folder = os.path.dirname(file_path)

    if not os.path.exists(file_path):
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Photo file not found: {file_path}",
        )

    if not os.path.exists(folder):
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Folder not found: {folder}",
        )

    response = {
        "message": f"Opened folder and selected file: {os.path.basename(file_path)}",
        "folder": folder,
        "file": file_path,
    }

    try:
        # Try the showinfm package first (better cross-platform support)
        try:
            from showinfm import show_in_file_manager

            show_in_file_manager(file_path)
            return response
        except ImportError:
            # showinfm not installed, fall back to manual commands
            pass
        except Exception as exc:
            # showinfm failed: record why, then fall back to manual commands
            logging.getLogger(__name__).warning(
                "showinfm could not reveal %s, falling back to platform commands: %s",
                file_path,
                exc,
            )

        # Fallback: open the folder and select the file with platform commands.
        # The exit status of explorer/open is not checked (check=False).
        if os.name == "nt":  # Windows
            subprocess.run(["explorer", "/select,", file_path], check=False)
        elif sys.platform == "darwin":  # macOS
            subprocess.run(["open", "-R", file_path], check=False)
        else:  # Linux and others
            _reveal_in_linux_file_manager(file_path, folder)

        return response
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to open folder: {str(e)}",
        ) from e
|
|
|