Some checks failed
CI / skip-ci-check (pull_request) Successful in 10s
CI / python-lint (pull_request) Has been cancelled
CI / test-backend (pull_request) Has been cancelled
CI / build (pull_request) Has been cancelled
CI / secret-scanning (pull_request) Has been cancelled
CI / dependency-scan (pull_request) Has been cancelled
CI / sast-scan (pull_request) Has been cancelled
CI / workflow-summary (pull_request) Has been cancelled
CI / lint-and-type-check (pull_request) Has been cancelled
- Add `browseDirectory` API endpoint to list directory contents. - Create `FolderBrowser` component for user interface to navigate directories. - Update `Scan` page to integrate folder browsing feature. - Define `DirectoryItem` and `BrowseDirectoryResponse` schemas for API responses.
1210 lines
45 KiB
Python
1210 lines
45 KiB
Python
"""Photo management endpoints."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from datetime import date, datetime
|
|
from typing import List, Optional
|
|
|
|
from fastapi import APIRouter, Depends, File, HTTPException, Query, Request, UploadFile, status
|
|
from fastapi.responses import JSONResponse, FileResponse, Response
|
|
from typing import Annotated
|
|
from rq import Queue
|
|
from redis import Redis
|
|
from sqlalchemy.orm import Session
|
|
|
|
from backend.db.session import get_db
|
|
from backend.api.auth import get_current_user
|
|
from backend.api.users import get_current_admin_user
|
|
|
|
# Redis connection backing the RQ job queue (local instance, database 0).
redis_conn = Redis(
    host="localhost",
    port=6379,
    db=0,
    decode_responses=False,
)
# Default RQ queue; the import endpoint in this module enqueues jobs onto it.
queue = Queue(connection=redis_conn)
|
|
from backend.schemas.photos import (
|
|
PhotoImportRequest,
|
|
PhotoImportResponse,
|
|
PhotoResponse,
|
|
BulkAddFavoritesRequest,
|
|
BulkAddFavoritesResponse,
|
|
BulkDeletePhotosRequest,
|
|
BulkDeletePhotosResponse,
|
|
BulkRemoveFavoritesRequest,
|
|
BulkRemoveFavoritesResponse,
|
|
BrowseDirectoryResponse,
|
|
DirectoryItem,
|
|
)
|
|
from backend.schemas.search import (
|
|
PhotoSearchResult,
|
|
SearchPhotosResponse,
|
|
)
|
|
from backend.services.photo_service import (
|
|
find_photos_in_folder,
|
|
import_photo_from_path,
|
|
)
|
|
from backend.services.search_service import (
|
|
get_favorite_photos,
|
|
get_photo_face_count,
|
|
get_photo_person,
|
|
get_photo_tags,
|
|
get_photos_without_faces,
|
|
get_photos_without_tags,
|
|
get_processed_photos,
|
|
get_unprocessed_photos,
|
|
search_photos_by_date,
|
|
search_photos_by_name,
|
|
search_photos_by_tags,
|
|
)
|
|
# Note: Function passed as string path to avoid RQ serialization issues
|
|
|
|
# Router collecting the photo endpoints defined below; its routes use the /photos prefix.
router = APIRouter(prefix="/photos", tags=["photos"])
|
|
|
|
|
|
@router.get("", response_model=SearchPhotosResponse)
def search_photos(
    current_user: Annotated[dict, Depends(get_current_user)],
    search_type: str = Query("name", description="Search type: name, date, tags, no_faces, no_tags, processed, unprocessed, favorites"),
    person_name: Optional[str] = Query(None, description="Person name for name search"),
    tag_names: Optional[str] = Query(None, description="Comma-separated tag names for tag search"),
    match_all: bool = Query(False, description="Match all tags (for tag search)"),
    date_from: Optional[str] = Query(None, description="Date from (YYYY-MM-DD)"),
    date_to: Optional[str] = Query(None, description="Date to (YYYY-MM-DD)"),
    folder_path: Optional[str] = Query(None, description="Filter by folder path"),
    media_type: Optional[str] = Query(None, description="Filter by media type: 'all', 'image', or 'video'"),
    page: int = Query(1, ge=1),
    page_size: int = Query(50, ge=1, le=200),
    db: Session = Depends(get_db),
) -> SearchPhotosResponse:
    """Search photos with filters.

    Supported ``search_type`` values:

    - ``name``: ``person_name`` required
    - ``date``: ``date_from`` or ``date_to`` required
    - ``tags``: ``tag_names`` required (comma-separated)
    - ``no_faces``: photos without faces
    - ``no_tags``: photos without tags
    - ``processed`` / ``unprocessed``: by face-detection processing state
    - ``favorites``: photos favorited by the current user

    ``folder_path``, ``media_type``, the date range and (where the service
    accepts them) the tag list are forwarded to every search as additional
    filters.

    Returns:
        SearchPhotosResponse with the requested page of results and the total
        count reported by the search service.

    Raises:
        HTTPException: 400 for a malformed date, a missing required parameter
            or an unknown ``search_type``; 401 for a favorites search without
            a username.
    """
    from backend.db.models import PhotoFavorite

    username = current_user["username"] if current_user else None

    def parse_iso_date(raw: Optional[str], field: str) -> Optional[date]:
        """Parse an optional ISO date string, answering 400 when it is malformed."""
        if not raw:
            return None
        try:
            return date.fromisoformat(raw)
        except ValueError:
            # Without this, a bad date surfaced as an unhandled error (HTTP 500).
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Invalid {field}: expected a date in YYYY-MM-DD format",
            ) from None

    def check_is_favorite(photo_id: int) -> bool:
        """Return True when the current user has favorited ``photo_id``."""
        if not username:
            return False
        favorite = db.query(PhotoFavorite).filter(
            PhotoFavorite.username == username,
            PhotoFavorite.photo_id == photo_id,
        ).first()
        return favorite is not None

    # Sentinel: distinguishes "look the person up" from an explicit ``None``.
    lookup = object()

    def to_result(photo, *, person=lookup, tags=None, face_count=None, is_favorite=None) -> PhotoSearchResult:
        """Build one result row, querying only the fields the caller did not supply."""
        if tags is None:
            tags = get_photo_tags(db, photo.id)
        if face_count is None:
            face_count = get_photo_face_count(db, photo.id)
        if person is lookup:
            person = get_photo_person(db, photo.id)
        # date_added may be stored as a datetime; the response carries a date.
        date_added = photo.date_added.date() if isinstance(photo.date_added, datetime) else photo.date_added
        if is_favorite is None:
            is_favorite = check_is_favorite(photo.id)
        return PhotoSearchResult(
            id=photo.id,
            path=photo.path,
            filename=photo.filename,
            date_taken=photo.date_taken,
            date_added=date_added,
            processed=photo.processed,
            media_type=photo.media_type or "image",
            person_name=person,
            tags=tags,
            has_faces=face_count > 0,
            face_count=face_count,
            is_favorite=is_favorite,
        )

    # Date and tag filters apply to every search type, not only "date"/"tags".
    df = parse_iso_date(date_from, "date_from")
    dt = parse_iso_date(date_to, "date_to")
    tag_list = None
    if tag_names:
        tag_list = [t.strip() for t in tag_names.split(",") if t.strip()]

    items: List[PhotoSearchResult] = []
    total = 0

    if search_type == "name":
        if not person_name:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="person_name is required for name search",
            )
        results, total = search_photos_by_name(
            db, person_name, folder_path, media_type, df, dt, tag_list, match_all, page, page_size
        )
        # The name search already returns the matched person's full name.
        items = [to_result(photo, person=full_name) for photo, full_name in results]
    elif search_type == "date":
        if not date_from and not date_to:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="At least one of date_from or date_to is required",
            )
        results, total = search_photos_by_date(db, df, dt, folder_path, media_type, tag_list, match_all, page, page_size)
        items = [to_result(photo) for photo in results]
    elif search_type == "tags":
        if not tag_names:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="tag_names is required for tag search",
            )
        # tag_names may contain only separators/whitespace, leaving no usable tag.
        if not tag_list:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="At least one tag name is required",
            )
        results, total = search_photos_by_tags(
            db, tag_list, match_all, folder_path, media_type, df, dt, page, page_size
        )
        items = [to_result(photo) for photo in results]
    elif search_type == "no_faces":
        results, total = get_photos_without_faces(db, folder_path, media_type, df, dt, page, page_size)
        # Results have no faces by definition, so skip the face/person lookups.
        items = [to_result(photo, person=None, face_count=0) for photo in results]
    elif search_type == "no_tags":
        results, total = get_photos_without_tags(db, folder_path, media_type, df, dt, page, page_size)
        # Results have no tags by definition, so skip the tag lookup.
        items = [to_result(photo, tags=[]) for photo in results]
    elif search_type == "processed":
        results, total = get_processed_photos(db, folder_path, media_type, df, dt, page, page_size)
        items = [to_result(photo) for photo in results]
    elif search_type == "unprocessed":
        results, total = get_unprocessed_photos(db, folder_path, media_type, df, dt, page, page_size)
        items = [to_result(photo) for photo in results]
    elif search_type == "favorites":
        if not username:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Authentication required for favorites search",
            )
        results, total = get_favorite_photos(db, username, folder_path, media_type, df, dt, page, page_size)
        # All results are favorites, so no per-photo favorite query is needed.
        items = [to_result(photo, is_favorite=True) for photo in results]
    else:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid search_type: {search_type}",
        )

    return SearchPhotosResponse(items=items, page=page, page_size=page_size, total=total)
|
|
|
|
|
|
@router.post("/import", response_model=PhotoImportResponse)
def import_photos(
    request: PhotoImportRequest,
) -> PhotoImportResponse:
    """Queue a background import of the photos found under a folder.

    The folder is validated and scanned once up front to estimate how many
    photos it holds; the import itself is handed to the RQ queue.

    Raises:
        HTTPException: 400 when ``folder_path`` is empty or is not an
            existing directory.
    """
    import os

    folder = request.folder_path

    if not folder:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="folder_path is required",
        )

    # Reject paths that do not point at a directory on the server.
    if not os.path.isdir(folder):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Folder not found: {folder}",
        )

    # Quick scan, used only for the estimate reported back to the client.
    photo_estimate = len(find_photos_in_folder(folder, request.recursive))

    # The task is referenced by its dotted path to avoid serialization issues.
    queued_job = queue.enqueue(
        "backend.services.tasks.import_photos_task",
        folder,
        request.recursive,
        job_timeout="1h",  # large imports may run for up to an hour
    )

    return PhotoImportResponse(
        job_id=queued_job.id,
        message=f"Photo import job queued for {folder}",
        folder_path=folder,
        estimated_photos=photo_estimate,
    )
|
|
|
|
|
|
@router.post("/import/upload")
async def upload_photos(
    files: list[UploadFile] = File(...),
    db: Session = Depends(get_db),
) -> dict:
    """Upload photo files directly and import them immediately.

    Each upload is streamed to PHOTO_STORAGE_DIR under a fresh UUID-based
    filename (keeping the original extension) and then imported from there.
    Uploads that turn out to be already known, or that fail, do not leave a
    file behind in the storage directory.

    For large batches, prefer the /import endpoint with folder_path.

    Returns:
        dict with ``message``, ``added`` and ``existing`` counters, and
        ``errors`` (one message per file that could not be processed).
    """
    import os
    import uuid
    from pathlib import Path

    from backend.settings import PHOTO_STORAGE_DIR

    # Ensure storage directory exists
    storage_dir = Path(PHOTO_STORAGE_DIR)
    storage_dir.mkdir(parents=True, exist_ok=True)

    # Read uploads in bounded chunks instead of loading each file into memory.
    chunk_bytes = 1024 * 1024

    added_count = 0
    existing_count = 0
    errors = []

    for file in files:
        stored_path = None
        try:
            # Unique filename avoids clashes between uploads sharing a name.
            file_ext = Path(file.filename).suffix
            stored_path = storage_dir / f"{uuid.uuid4()}{file_ext}"

            with open(stored_path, "wb") as f:
                while chunk := await file.read(chunk_bytes):
                    f.write(chunk)

            _photo, is_new = import_photo_from_path(db, str(stored_path))
            if is_new:
                added_count += 1
            else:
                existing_count += 1
                # Photo already exists: drop the duplicate copy just written.
                if os.path.exists(stored_path):
                    os.remove(stored_path)
        except Exception as e:
            # Per-file failures are reported to the client, not raised, so the
            # remaining files in the batch are still processed.
            errors.append(f"Error uploading {file.filename}: {str(e)}")
            # Remove the partial/unimported file so failures do not pile up
            # on disk. NOTE(review): assumes a failed import leaves no DB row
            # pointing at this path - confirm against import_photo_from_path.
            if stored_path is not None:
                try:
                    stored_path.unlink(missing_ok=True)
                except OSError as cleanup_error:
                    errors.append(
                        f"Could not remove temporary file for {file.filename}: {cleanup_error}"
                    )

    return {
        "message": f"Uploaded {len(files)} files",
        "added": added_count,
        "existing": existing_count,
        "errors": errors,
    }
|
|
|
|
|
|
@router.get("/browse-directory", response_model=BrowseDirectoryResponse)
def browse_directory(
    current_user: Annotated[dict, Depends(get_current_user)],
    path: str = Query("/", description="Directory path to list"),
) -> BrowseDirectoryResponse:
    """Return the contents of a server-side directory for the web folder browser.

    The listing is read with ``os.listdir`` (no GUI involved). Entries are
    ordered with directories first, then by case-insensitive name.

    Args:
        path: Directory to list; relative paths are made absolute first.

    Returns:
        BrowseDirectoryResponse carrying the normalized current path, its
        parent (``None`` at the filesystem root) and the entries found.

    Raises:
        HTTPException: 404 if the path does not exist, 400 if it is not a
            directory, 403 if the directory cannot be read, 500 for any
            other failure.
    """
    import os

    try:
        target = os.path.normpath(os.path.abspath(path))

        # NOTE(review): no base-path restriction is applied here - any
        # directory the server process can read may be listed. Consider an
        # allow-list of base directories before exposing this in production.

        if not os.path.exists(target):
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Path does not exist: {target}",
            )

        if not os.path.isdir(target):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Path is not a directory: {target}",
            )

        entries = []
        try:
            for name in os.listdir(target):
                entry_path = os.path.abspath(os.path.join(target, name))

                # Entries whose type cannot be determined are left out.
                try:
                    entry_is_dir = os.path.isdir(entry_path)
                    entry_is_file = os.path.isfile(entry_path)
                except OSError:
                    continue

                entries.append(
                    DirectoryItem(
                        name=name,
                        path=entry_path,
                        is_directory=entry_is_dir,
                        is_file=entry_is_file,
                    )
                )
        except PermissionError:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Permission denied reading directory: {target}",
            )

        # Directories sort ahead of files; ties are broken by lowercase name.
        entries = sorted(entries, key=lambda entry: (not entry.is_directory, entry.name.lower()))

        # The root directory is its own dirname, so it has no parent to offer.
        containing_dir = os.path.dirname(target)
        if target in ("/", containing_dir):
            parent_path = None
        else:
            parent_path = os.path.normpath(containing_dir)

        return BrowseDirectoryResponse(
            current_path=target,
            parent_path=parent_path,
            items=entries,
        )

    except HTTPException:
        # Deliberate HTTP errors pass through untouched.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error reading directory: {str(e)}",
        )
|
|
|
|
|
|
@router.post("/browse-folder")
def browse_folder() -> dict:
    """Open a native folder picker dialog and return the selected folder path.

    Uses tkinter to show the OS folder picker on the machine running the
    server, then normalizes the selection to a full absolute path.

    Returns:
        dict with ``path`` (str), ``success`` (bool) and ``message`` (str).
        ``success`` is False and ``path`` is empty when the dialog is
        cancelled.

    Raises:
        HTTPException: 503 when tkinter or a display is unavailable, 400 when
            the selected path does not exist or is not a directory, 500 for
            any other failure while showing the dialog.
    """
    import os
    import sys

    try:
        import tkinter as tk
        from tkinter import filedialog
    except ImportError:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="tkinter is not available. Cannot show folder picker.",
        )

    try:
        # Hidden root window that owns the dialog.
        root = tk.Tk()
        try:
            root.withdraw()  # Hide main window
            root.attributes('-topmost', True)  # Bring to front

            folder_path = filedialog.askdirectory(
                title="Select folder to scan",
                mustexist=True
            )
        finally:
            # Always release the Tk root, even when the dialog itself fails.
            root.destroy()

        if not folder_path:
            return {
                "path": "",
                "success": False,
                "message": "No folder selected"
            }

        # UNC paths (\\server\share or //server/share) are already absolute,
        # and realpath may not handle them correctly, so only normalize them.
        is_unc_path = folder_path.startswith('\\\\') or folder_path.startswith('//')

        if is_unc_path:
            normalized_path = os.path.normpath(folder_path)
        else:
            # Local paths and mounted shares: absolute -> resolve symlinks -> normalize.
            abs_path = os.path.abspath(folder_path)
            try:
                real_path = os.path.realpath(abs_path)
            except (OSError, ValueError):
                # realpath can fail for some network paths; keep the abspath result.
                real_path = abs_path
            normalized_path = os.path.normpath(real_path)

        # Guarantee a full absolute path rather than a bare folder name.
        if not os.path.isabs(normalized_path):
            normalized_path = os.path.abspath(normalized_path)

        # A network path may have become unreachable since it was picked.
        if not os.path.exists(normalized_path):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Selected path does not exist: {normalized_path}",
            )
        if not os.path.isdir(normalized_path):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Selected path is not a directory: {normalized_path}",
            )

        return {
            "path": normalized_path,
            "success": True,
            "message": f"Selected folder: {normalized_path}"
        }
    except HTTPException:
        # The 400s raised above must reach the client unchanged instead of
        # being re-wrapped as 500/503 by the generic handler below.
        raise
    except Exception as e:
        error_msg = str(e)

        # A missing DISPLAY only signals a headless server where Tk talks to
        # X11; Windows and macOS builds of Tk do not use that variable.
        needs_x11_display = sys.platform not in ("win32", "darwin")
        if "display" in error_msg.lower() or (needs_x11_display and "DISPLAY" not in os.environ):
            raise HTTPException(
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                detail="No display available. Cannot show folder picker. "
                "If running on a remote server, ensure X11 forwarding is enabled.",
            ) from e

        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error showing folder picker: {error_msg}",
        ) from e
|
|
|
|
|
|
@router.get("/{photo_id}", response_model=PhotoResponse)
def get_photo(photo_id: int, db: Session = Depends(get_db)) -> PhotoResponse:
    """Return the photo record with the given ID.

    Raises:
        HTTPException: 404 when no photo has that ID.
    """
    from backend.db.models import Photo

    record = db.query(Photo).filter(Photo.id == photo_id).first()
    if record:
        return PhotoResponse.model_validate(record)

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"Photo {photo_id} not found",
    )
|
|
|
|
|
|
def _resolve_byte_range(range_header: str, file_size: int) -> tuple[int, int]:
    """Translate a single-range ``Range`` header into inclusive byte offsets.

    Handles ``bytes=start-end``, ``bytes=start-`` and the suffix form
    ``bytes=-N`` (the last N bytes). The result is clamped to the file; the
    caller must treat ``start > end`` as an unsatisfiable range.

    Raises:
        ValueError: when the offsets are not integers (this includes
            multi-range headers, which are not supported).
    """
    parts = range_header.replace("bytes=", "").split("-")
    start_str = parts[0].strip()
    end_str = parts[1].strip() if len(parts) > 1 else ""

    if not start_str and end_str:
        # Suffix form: the number is a length counted back from the file end.
        suffix_length = int(end_str)
        return max(file_size - suffix_length, 0), file_size - 1

    start = int(start_str) if start_str else 0
    end = int(end_str) if end_str else file_size - 1
    return max(start, 0), min(end, file_size - 1)


@router.get("/{photo_id}/image")
def get_photo_image(
    photo_id: int,
    request: Request,
    db: Session = Depends(get_db)
):
    """Serve a photo's image or video file for inline display (not download).

    Videos honour a single-range ``Range`` request header with a 206 partial
    response so players can seek; without a usable range the whole file is
    returned. Images are always returned whole.

    Raises:
        HTTPException: 404 when the photo record or its file is missing.
    """
    import os
    import mimetypes
    from backend.db.models import Photo
    from starlette.responses import FileResponse

    photo = db.query(Photo).filter(Photo.id == photo_id).first()
    if not photo:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Photo {photo_id} not found",
        )

    if not os.path.exists(photo.path):
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Photo file not found: {photo.path}",
        )

    if photo.media_type == "video":
        media_type, _ = mimetypes.guess_type(photo.path)
        if not media_type or not media_type.startswith('video/'):
            # Unknown or non-video guess: fall back to a common video type.
            media_type = "video/mp4"

        file_size = os.path.getsize(photo.path)
        range_header = request.headers.get("range")

        if range_header:
            try:
                start, end = _resolve_byte_range(range_header, file_size)
            except ValueError:
                # Unparseable Range header: ignore it and serve the full file.
                pass
            else:
                if start > end:
                    # Requested range lies outside the file.
                    return Response(
                        status_code=416,
                        headers={"Content-Range": f"bytes */{file_size}"}
                    )

                chunk_size = end - start + 1
                with open(photo.path, "rb") as f:
                    f.seek(start)
                    chunk = f.read(chunk_size)

                return Response(
                    content=chunk,
                    status_code=206,
                    headers={
                        "Content-Range": f"bytes {start}-{end}/{file_size}",
                        "Accept-Ranges": "bytes",
                        "Content-Length": str(chunk_size),
                        "Content-Type": media_type,
                        "Content-Disposition": "inline",
                        "Cache-Control": "public, max-age=3600",
                    },
                    media_type=media_type,
                )

        # No (usable) range: serve the full file and advertise range support.
        response = FileResponse(
            photo.path,
            media_type=media_type,
        )
        response.headers["Content-Disposition"] = "inline"
        response.headers["Accept-Ranges"] = "bytes"
        response.headers["Cache-Control"] = "public, max-age=3600"
        return response

    # Images: derive the content type from the extension, defaulting to JPEG.
    media_type, _ = mimetypes.guess_type(photo.path)
    if not media_type or not media_type.startswith('image/'):
        media_type = "image/jpeg"

    response = FileResponse(
        photo.path,
        media_type=media_type,
    )
    # "inline" makes the browser display the file instead of downloading it.
    response.headers["Content-Disposition"] = "inline"
    response.headers["Cache-Control"] = "public, max-age=3600"
    return response
|
|
|
|
|
|
@router.post("/{photo_id}/toggle-favorite")
def toggle_favorite(
    photo_id: int,
    current_user: Annotated[dict, Depends(get_current_user)],
    db: Session = Depends(get_db),
) -> dict:
    """Flip the current user's favorite flag for one photo.

    Returns:
        dict with ``photo_id``, the resulting ``is_favorite`` state and a
        ``message``.

    Raises:
        HTTPException: 404 when the photo does not exist.
    """
    from backend.db.models import Photo, PhotoFavorite

    username = current_user["username"]

    # The photo must exist before its favorite state can change.
    if not db.query(Photo).filter(Photo.id == photo_id).first():
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Photo {photo_id} not found",
        )

    current_favorite = db.query(PhotoFavorite).filter(
        PhotoFavorite.username == username,
        PhotoFavorite.photo_id == photo_id
    ).first()

    # No existing row means the toggle turns the favorite on.
    is_favorite = not current_favorite
    if is_favorite:
        db.add(PhotoFavorite(username=username, photo_id=photo_id))
    else:
        db.delete(current_favorite)

    db.commit()

    return {
        "photo_id": photo_id,
        "is_favorite": is_favorite,
        "message": "Favorite status updated"
    }
|
|
|
|
|
|
@router.get("/{photo_id}/is-favorite")
def check_favorite(
    photo_id: int,
    current_user: Annotated[dict, Depends(get_current_user)],
    db: Session = Depends(get_db),
) -> dict:
    """Report whether the current user has favorited the given photo.

    Returns:
        dict with ``photo_id`` and a boolean ``is_favorite``.
    """
    from backend.db.models import PhotoFavorite

    owner = current_user["username"]

    favorite_query = db.query(PhotoFavorite).filter(
        PhotoFavorite.username == owner,
        PhotoFavorite.photo_id == photo_id
    )
    matching_row = favorite_query.first()

    return {
        "photo_id": photo_id,
        "is_favorite": matching_row is not None
    }
|
|
|
|
|
|
@router.post("/bulk-add-favorites", response_model=BulkAddFavoritesResponse)
def bulk_add_favorites(
    request: BulkAddFavoritesRequest,
    current_user: Annotated[dict, Depends(get_current_user)],
    db: Session = Depends(get_db),
) -> BulkAddFavoritesResponse:
    """Add multiple photos to the current user's favorites.

    Photos that are already favorites are left untouched, and an ID repeated
    in the request is handled once. All inserts are committed together.

    Returns:
        BulkAddFavoritesResponse with the number added, the number that were
        already favorites, and the number of IDs received in the request.

    Raises:
        HTTPException: 400 when ``photo_ids`` is empty, 404 when any
            requested photo does not exist.
    """
    from backend.db.models import Photo, PhotoFavorite

    requested_ids = request.photo_ids or []
    # Drop repeated IDs (keeping first-seen order) so one photo cannot be
    # inserted as a favorite twice within the same request.
    photo_ids = list(dict.fromkeys(requested_ids))
    if not photo_ids:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="photo_ids list cannot be empty",
        )

    username = current_user["username"]

    # Verify all photos exist
    photos = db.query(Photo).filter(Photo.id.in_(photo_ids)).all()
    found_ids = {photo.id for photo in photos}
    missing_ids = set(photo_ids) - found_ids

    if missing_ids:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Photos not found: {sorted(missing_ids)}",
        )

    # Get existing favorites in a single query
    existing_favorites = db.query(PhotoFavorite).filter(
        PhotoFavorite.username == username,
        PhotoFavorite.photo_id.in_(photo_ids)
    ).all()
    existing_ids = {fav.photo_id for fav in existing_favorites}

    # Only add favorites for photos that aren't already favorites
    photos_to_add = [photo_id for photo_id in photo_ids if photo_id not in existing_ids]
    db.add_all(
        [PhotoFavorite(username=username, photo_id=photo_id) for photo_id in photos_to_add]
    )
    added_count = len(photos_to_add)

    db.commit()

    return BulkAddFavoritesResponse(
        message=f"Added {added_count} photo(s) to favorites",
        added_count=added_count,
        already_favorite_count=len(existing_ids),
        # Reported as received, duplicates included.
        total_requested=len(requested_ids),
    )
|
|
|
|
|
|
@router.post("/bulk-remove-favorites", response_model=BulkRemoveFavoritesResponse)
def bulk_remove_favorites(
    request: BulkRemoveFavoritesRequest,
    current_user: Annotated[dict, Depends(get_current_user)],
    db: Session = Depends(get_db),
) -> BulkRemoveFavoritesResponse:
    """Remove multiple photos from the current user's favorites.

    Photos that are not currently favorites are ignored, and an ID repeated
    in the request is handled once. All deletions are committed together.

    Returns:
        BulkRemoveFavoritesResponse with the number of favorite rows removed,
        the number of distinct requested photos that were not favorites, and
        the number of IDs received in the request.

    Raises:
        HTTPException: 400 when ``photo_ids`` is empty, 404 when any
            requested photo does not exist.
    """
    from backend.db.models import Photo, PhotoFavorite

    requested_ids = request.photo_ids or []
    # Drop repeated IDs so the "not a favorite" count below is per photo,
    # not per occurrence in the request.
    photo_ids = list(dict.fromkeys(requested_ids))
    if not photo_ids:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="photo_ids list cannot be empty",
        )

    username = current_user["username"]

    # Verify all photos exist
    photos = db.query(Photo).filter(Photo.id.in_(photo_ids)).all()
    found_ids = {photo.id for photo in photos}
    missing_ids = set(photo_ids) - found_ids

    if missing_ids:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Photos not found: {sorted(missing_ids)}",
        )

    # Get existing favorites in a single query
    existing_favorites = db.query(PhotoFavorite).filter(
        PhotoFavorite.username == username,
        PhotoFavorite.photo_id.in_(photo_ids)
    ).all()
    existing_ids = {fav.photo_id for fav in existing_favorites}

    # The query is already restricted to this user and the requested photos,
    # so every row it returned is one to delete.
    removed_count = 0
    for favorite in existing_favorites:
        db.delete(favorite)
        removed_count += 1

    db.commit()

    return BulkRemoveFavoritesResponse(
        message=f"Removed {removed_count} photo(s) from favorites",
        removed_count=removed_count,
        not_favorite_count=len(photo_ids) - len(existing_ids),
        # Reported as received, duplicates included.
        total_requested=len(requested_ids),
    )
|
|
|
|
|
|
@router.post("/bulk-delete", response_model=BulkDeletePhotosResponse)
def bulk_delete_photos(
    request: BulkDeletePhotosRequest,
    current_admin: Annotated[dict, Depends(get_current_admin_user)],
    db: Session = Depends(get_db),
) -> BulkDeletePhotosResponse:
    """Delete multiple photos and all related data (faces, encodings, tags, favorites).

    Tag linkages are removed explicitly by this function. Removal of the
    other related rows is presumably handled by ORM/database cascades on
    ``Photo`` (NOTE(review): confirm against the model definitions).
    Requested IDs that do not exist are not an error; they are reported back
    in ``missing_photo_ids``.

    Args:
        request: Payload carrying ``photo_ids``. Duplicates are collapsed,
            keeping first-seen order.
        current_admin: Authenticated admin mapping; ``"username"`` is echoed
            in the response message (``"unknown"`` if absent).
        db: SQLAlchemy session for this request.

    Returns:
        A summary with the number of photos deleted and the sorted list of
        requested IDs that were not found.

    Raises:
        HTTPException: 400 if ``photo_ids`` is empty; 500 if the deletion
            fails, after the transaction has been rolled back.
    """
    from backend.db.models import Photo, PhotoTagLinkage

    photo_ids = list(dict.fromkeys(request.photo_ids))
    if not photo_ids:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="photo_ids list cannot be empty",
        )

    try:
        photos = db.query(Photo).filter(Photo.id.in_(photo_ids)).all()
        found_ids = {photo.id for photo in photos}
        missing_ids = sorted(set(photo_ids) - found_ids)

        # Remove tag linkages explicitly (in addition to cascade) to keep
        # counts accurate. One bulk DELETE covers every photo instead of
        # issuing a separate statement per photo.
        if found_ids:
            db.query(PhotoTagLinkage).filter(
                PhotoTagLinkage.photo_id.in_(sorted(found_ids))
            ).delete(synchronize_session=False)

        # Delete through the session (not a bulk DELETE) so that any
        # ORM-level cascades configured on Photo still run.
        for photo in photos:
            db.delete(photo)
        deleted_count = len(photos)

        db.commit()
    except HTTPException:
        # Defensive: let HTTP errors pass through unchanged after rollback.
        db.rollback()
        raise
    except Exception as exc:  # pragma: no cover - safety net
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to delete photos: {exc}",
        ) from exc

    admin_username = current_admin.get("username", "unknown")
    message_parts = [f"Deleted {deleted_count} photo(s)"]
    if missing_ids:
        message_parts.append(f"{len(missing_ids)} photo(s) not found")
    message_parts.append(f"Request by admin: {admin_username}")

    return BulkDeletePhotosResponse(
        message="; ".join(message_parts),
        deleted_count=deleted_count,
        missing_photo_ids=missing_ids,
    )
|
|
|
|
|
|
def _show_with_showinfm(file_path: str) -> bool:
    """Reveal ``file_path`` via the optional ``showinfm`` package; return True on success."""
    import logging

    try:
        from showinfm import show_in_file_manager
    except ImportError:
        # showinfm not installed, caller falls back to manual commands
        return False

    try:
        show_in_file_manager(file_path)
    except Exception:
        # Best effort: showinfm failed, caller falls back to manual commands.
        logging.getLogger(__name__).debug(
            "showinfm could not reveal %s; using platform fallback",
            file_path,
            exc_info=True,
        )
        return False
    return True


def _launch_file_manager(command: list[str], *, detach: bool = False) -> bool:
    """Run one file-manager command and report whether it counts as opened.

    With ``detach=True`` the process is started without waiting and with its
    output discarded; a successful spawn counts as opened. Otherwise the
    command is given 5 seconds and counts as opened only on exit status 0.
    A missing executable or a timeout yields False.
    """
    import subprocess

    try:
        if detach:
            subprocess.Popen(
                command,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
            return True
        completed = subprocess.run(command, check=False, timeout=5)
    except (FileNotFoundError, subprocess.TimeoutExpired):
        return False
    return completed.returncode == 0


def _reveal_on_linux(file_path: str, folder: str) -> None:
    """Reveal ``file_path`` using a Linux file manager, else open ``folder``.

    The manager matching ``XDG_CURRENT_DESKTOP`` is tried first, then every
    remaining known manager in a fixed order, and finally ``xdg-open`` on the
    containing folder (which opens the folder without selecting the file).
    """
    import os
    import subprocess
    from pathlib import Path

    desktop_env = os.environ.get("XDG_CURRENT_DESKTOP", "").lower()

    # Nemo is handed a file:// URI instead of --select. as_uri() percent-encodes
    # spaces, '#', '%' and non-ASCII characters, which plain string
    # concatenation would leave as an invalid URI.
    file_uri = Path(file_path).as_uri()

    # Insertion order is the order of the generic fallback pass below.
    commands = {
        "nautilus": ["nautilus", "--select", file_path],
        "nemo": ["nemo", file_uri],
        "dolphin": ["dolphin", "--select", file_path],
        "thunar": ["thunar", "--select", file_path],
        "pcmanfm": ["pcmanfm", "--select", file_path],
    }

    # (substrings of XDG_CURRENT_DESKTOP, manager to try), in priority order.
    desktop_preferences = (
        (("gnome",), "nautilus"),
        (("cinnamon", "mate"), "nemo"),
        (("kde",), "dolphin"),
        (("lxde", "lxqt"), "pcmanfm"),
        (("xfce",), "thunar"),
    )

    attempted = set()
    for markers, manager in desktop_preferences:
        matches = any(marker in desktop_env for marker in markers)
        # Nautilus is also the first guess when no desktop is advertised.
        if manager == "nautilus" and not desktop_env:
            matches = True
        if not matches:
            continue
        attempted.add(manager)
        # Nemo is started detached in the desktop-specific pass; a successful
        # spawn is treated as success without waiting for an exit status.
        if _launch_file_manager(commands[manager], detach=(manager == "nemo")):
            return

    # If desktop-specific didn't work, try the remaining file managers in
    # order. Managers that already failed above are skipped: re-running the
    # identical command would only repeat the failure (and its timeout).
    for manager, command in commands.items():
        if manager in attempted:
            continue
        if _launch_file_manager(command):
            return

    # Fallback: try xdg-open with the folder (will open folder but won't select file)
    subprocess.run(["xdg-open", folder], check=False)


@router.post("/{photo_id}/open-folder")
def open_photo_folder(photo_id: int, db: Session = Depends(get_db)) -> dict:
    """Open the folder containing the photo in the system file manager and select the file.

    The file manager is launched on the machine running this API process.

    Matches desktop behavior and enhances it by selecting the specific file:
    - Windows: uses explorer /select,"file_path"
    - macOS: uses 'open -R' to reveal and select the file
    - Linux: tries file manager-specific commands (nautilus, dolphin, etc.) or opens folder

    The optional ``showinfm`` package is tried first on every platform; the
    commands above are the fallback when it is missing or fails.

    NOTE(review): unlike the bulk endpoints in this module, this route takes
    no authentication dependency in its signature - confirm that is intended.

    Args:
        photo_id: Primary key of the photo to reveal.
        db: SQLAlchemy session for this request.

    Returns:
        A dict with a ``message``, the containing ``folder`` and the absolute
        ``file`` path. The platform fallback commands run with
        ``check=False``, so the message does not guarantee that a file
        manager window actually appeared.

    Raises:
        HTTPException: 404 if the photo row, the file on disk, or its folder
            does not exist; 500 if launching the file manager raises.
    """
    import os
    import sys
    import subprocess
    from backend.db.models import Photo

    photo = db.query(Photo).filter(Photo.id == photo_id).first()
    if not photo:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Photo {photo_id} not found",
        )

    # Ensure we have absolute path
    file_path = os.path.abspath(photo.path)
    folder = os.path.dirname(file_path)

    if not os.path.exists(file_path):
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Photo file not found: {file_path}",
        )

    if not os.path.exists(folder):
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Folder not found: {folder}",
        )

    try:
        # Try using showinfm package first (better cross-platform support)
        if not _show_with_showinfm(file_path):
            # Fallback: open folder and select the file using platform-specific commands
            if os.name == "nt":  # Windows
                # Windows: explorer /select,"file_path" opens folder and selects the file
                subprocess.run(["explorer", "/select,", file_path], check=False)
            elif sys.platform == "darwin":  # macOS
                # macOS: open -R reveals the file in Finder and selects it
                subprocess.run(["open", "-R", file_path], check=False)
            else:  # Linux and others
                _reveal_on_linux(file_path, folder)
    except Exception as exc:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to open folder: {exc}",
        ) from exc

    return {
        "message": f"Opened folder and selected file: {os.path.basename(file_path)}",
        "folder": folder,
        "file": file_path,
    }
|
|
|