"""Photo management endpoints.""" from __future__ import annotations from datetime import date, datetime from typing import List, Optional from fastapi import APIRouter, Depends, File, HTTPException, Query, Request, UploadFile, status from fastapi.responses import JSONResponse, FileResponse, Response from typing import Annotated from rq import Queue from redis import Redis from sqlalchemy.orm import Session from backend.db.session import get_db from backend.api.auth import get_current_user from backend.api.users import get_current_admin_user # Redis connection for RQ redis_conn = Redis(host="localhost", port=6379, db=0, decode_responses=False) queue = Queue(connection=redis_conn) from backend.schemas.photos import ( PhotoImportRequest, PhotoImportResponse, PhotoResponse, BulkAddFavoritesRequest, BulkAddFavoritesResponse, BulkDeletePhotosRequest, BulkDeletePhotosResponse, BulkRemoveFavoritesRequest, BulkRemoveFavoritesResponse, BrowseDirectoryResponse, DirectoryItem, ) from backend.schemas.search import ( PhotoSearchResult, SearchPhotosResponse, ) from backend.services.photo_service import ( find_photos_in_folder, import_photo_from_path, ) from backend.services.search_service import ( get_favorite_photos, get_photo_face_count, get_photo_person, get_photo_tags, get_photos_without_faces, get_photos_without_tags, get_processed_photos, get_unprocessed_photos, search_photos_by_date, search_photos_by_name, search_photos_by_tags, ) # Note: Function passed as string path to avoid RQ serialization issues router = APIRouter(prefix="/photos", tags=["photos"]) @router.get("", response_model=SearchPhotosResponse) def search_photos( current_user: Annotated[dict, Depends(get_current_user)], search_type: str = Query("name", description="Search type: name, date, tags, no_faces, no_tags, processed, unprocessed, favorites"), person_name: Optional[str] = Query(None, description="Person name for name search"), tag_names: Optional[str] = Query(None, description="Comma-separated tag names for tag search"), match_all: bool = Query(False, description="Match all tags (for tag search)"), date_from: Optional[str] = Query(None, description="Date from (YYYY-MM-DD)"), date_to: Optional[str] = Query(None, description="Date to (YYYY-MM-DD)"), folder_path: Optional[str] = Query(None, description="Filter by folder path"), media_type: Optional[str] = Query(None, description="Filter by media type: 'all', 'image', or 'video'"), page: int = Query(1, ge=1), page_size: int = Query(50, ge=1, le=200), db: Session = Depends(get_db), ) -> SearchPhotosResponse: """Search photos with filters. Matches desktop search functionality exactly: - Search by name: person_name required - Search by date: date_from or date_to required - Search by tags: tag_names required (comma-separated) - Search no faces: returns photos without faces - Search no tags: returns photos without tags - Search processed: returns photos that have been processed for face detection - Search unprocessed: returns photos that have not been processed for face detection - Search favorites: returns photos favorited by current user """ from backend.db.models import PhotoFavorite items: List[PhotoSearchResult] = [] total = 0 username = current_user["username"] if current_user else None # Helper function to check if photo is favorite def check_is_favorite(photo_id: int) -> bool: if not username: return False favorite = db.query(PhotoFavorite).filter( PhotoFavorite.username == username, PhotoFavorite.photo_id == photo_id ).first() return favorite is not None # Parse date filters for use as additional filters (when not using date search type) df = date.fromisoformat(date_from) if date_from else None dt = date.fromisoformat(date_to) if date_to else None # Parse tag filters for use as additional filters tag_list = None if tag_names: tag_list = [t.strip() for t in tag_names.split(",") if t.strip()] if search_type == "name": if not person_name: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="person_name is required for name search", ) results, total = search_photos_by_name( db, person_name, folder_path, media_type, df, dt, tag_list, match_all, page, page_size ) for photo, full_name in results: tags = get_photo_tags(db, photo.id) face_count = get_photo_face_count(db, photo.id) # Convert datetime to date for date_added date_added = photo.date_added.date() if isinstance(photo.date_added, datetime) else photo.date_added items.append( PhotoSearchResult( id=photo.id, path=photo.path, filename=photo.filename, date_taken=photo.date_taken, date_added=date_added, processed=photo.processed, media_type=photo.media_type or "image", person_name=full_name, tags=tags, has_faces=face_count > 0, face_count=face_count, is_favorite=check_is_favorite(photo.id), ) ) elif search_type == "date": if not date_from and not date_to: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="At least one of date_from or date_to is required", ) results, total = search_photos_by_date(db, df, dt, folder_path, media_type, tag_list, match_all, page, page_size) for photo in results: tags = get_photo_tags(db, photo.id) face_count = get_photo_face_count(db, photo.id) person_name_val = get_photo_person(db, photo.id) # Convert datetime to date for date_added date_added = photo.date_added.date() if isinstance(photo.date_added, datetime) else photo.date_added items.append( PhotoSearchResult( id=photo.id, path=photo.path, filename=photo.filename, date_taken=photo.date_taken, date_added=date_added, processed=photo.processed, media_type=photo.media_type or "image", person_name=person_name_val, tags=tags, has_faces=face_count > 0, face_count=face_count, is_favorite=check_is_favorite(photo.id), ) ) elif search_type == "tags": if not tag_names: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="tag_names is required for tag search", ) if not tag_list or len(tag_list) == 0: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="At least one tag name is required", ) results, total = search_photos_by_tags( db, tag_list, match_all, folder_path, media_type, df, dt, page, page_size ) for photo in results: tags = get_photo_tags(db, photo.id) face_count = get_photo_face_count(db, photo.id) person_name_val = get_photo_person(db, photo.id) # Convert datetime to date for date_added date_added = photo.date_added.date() if isinstance(photo.date_added, datetime) else photo.date_added items.append( PhotoSearchResult( id=photo.id, path=photo.path, filename=photo.filename, date_taken=photo.date_taken, date_added=date_added, processed=photo.processed, media_type=photo.media_type or "image", person_name=person_name_val, tags=tags, has_faces=face_count > 0, face_count=face_count, is_favorite=check_is_favorite(photo.id), ) ) elif search_type == "no_faces": results, total = get_photos_without_faces(db, folder_path, media_type, df, dt, page, page_size) for photo in results: tags = get_photo_tags(db, photo.id) # Convert datetime to date for date_added date_added = photo.date_added.date() if isinstance(photo.date_added, datetime) else photo.date_added items.append( PhotoSearchResult( id=photo.id, path=photo.path, filename=photo.filename, date_taken=photo.date_taken, date_added=date_added, processed=photo.processed, media_type=photo.media_type or "image", person_name=None, tags=tags, has_faces=False, face_count=0, is_favorite=check_is_favorite(photo.id), ) ) elif search_type == "no_tags": results, total = get_photos_without_tags(db, folder_path, media_type, df, dt, page, page_size) for photo in results: face_count = get_photo_face_count(db, photo.id) person_name_val = get_photo_person(db, photo.id) # Convert datetime to date for date_added date_added = photo.date_added.date() if isinstance(photo.date_added, datetime) else photo.date_added items.append( PhotoSearchResult( id=photo.id, path=photo.path, filename=photo.filename, date_taken=photo.date_taken, date_added=date_added, processed=photo.processed, media_type=photo.media_type or "image", person_name=person_name_val, tags=[], has_faces=face_count > 0, face_count=face_count, is_favorite=check_is_favorite(photo.id), ) ) elif search_type == "processed": results, total = get_processed_photos(db, folder_path, media_type, df, dt, page, page_size) for photo in results: tags = get_photo_tags(db, photo.id) face_count = get_photo_face_count(db, photo.id) person_name_val = get_photo_person(db, photo.id) # Convert datetime to date for date_added date_added = photo.date_added.date() if isinstance(photo.date_added, datetime) else photo.date_added items.append( PhotoSearchResult( id=photo.id, path=photo.path, filename=photo.filename, date_taken=photo.date_taken, date_added=date_added, processed=photo.processed, media_type=photo.media_type or "image", person_name=person_name_val, tags=tags, has_faces=face_count > 0, face_count=face_count, is_favorite=check_is_favorite(photo.id), ) ) elif search_type == "unprocessed": results, total = get_unprocessed_photos(db, folder_path, media_type, df, dt, page, page_size) for photo in results: tags = get_photo_tags(db, photo.id) face_count = get_photo_face_count(db, photo.id) person_name_val = get_photo_person(db, photo.id) # Convert datetime to date for date_added date_added = photo.date_added.date() if isinstance(photo.date_added, datetime) else photo.date_added items.append( PhotoSearchResult( id=photo.id, path=photo.path, filename=photo.filename, date_taken=photo.date_taken, date_added=date_added, processed=photo.processed, media_type=photo.media_type or "image", person_name=person_name_val, tags=tags, has_faces=face_count > 0, face_count=face_count, is_favorite=check_is_favorite(photo.id), ) ) elif search_type == "favorites": if not username: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Authentication required for favorites search", ) results, total = get_favorite_photos(db, username, folder_path, media_type, df, dt, page, page_size) for photo in results: tags = get_photo_tags(db, photo.id) face_count = get_photo_face_count(db, photo.id) person_name_val = get_photo_person(db, photo.id) # Convert datetime to date for date_added date_added = photo.date_added.date() if isinstance(photo.date_added, datetime) else photo.date_added items.append( PhotoSearchResult( id=photo.id, path=photo.path, filename=photo.filename, date_taken=photo.date_taken, date_added=date_added, processed=photo.processed, media_type=photo.media_type or "image", person_name=person_name_val, tags=tags, has_faces=face_count > 0, face_count=face_count, is_favorite=True, # All results are favorites ) ) else: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid search_type: {search_type}", ) return SearchPhotosResponse(items=items, page=page, page_size=page_size, total=total) @router.post("/import", response_model=PhotoImportResponse) def import_photos( request: PhotoImportRequest, ) -> PhotoImportResponse: """Import photos from a folder path. This endpoint enqueues a background job to scan and import photos. """ if not request.folder_path: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="folder_path is required", ) # Validate folder exists import os if not os.path.isdir(request.folder_path): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Folder not found: {request.folder_path}", ) # Estimate number of photos (quick scan) estimated_photos = len(find_photos_in_folder(request.folder_path, request.recursive)) # Enqueue job # Pass function as string path to avoid serialization issues job = queue.enqueue( "backend.services.tasks.import_photos_task", request.folder_path, request.recursive, job_timeout="1h", # Allow up to 1 hour for large imports ) return PhotoImportResponse( job_id=job.id, message=f"Photo import job queued for {request.folder_path}", folder_path=request.folder_path, estimated_photos=estimated_photos, ) @router.post("/import/upload") async def upload_photos( files: list[UploadFile] = File(...), db: Session = Depends(get_db), ) -> dict: """Upload photo files directly. This endpoint accepts file uploads and imports them immediately. Files are saved to PHOTO_STORAGE_DIR before import. For large batches, prefer the /import endpoint with folder_path. """ import os import shutil from pathlib import Path from backend.settings import PHOTO_STORAGE_DIR # Ensure storage directory exists storage_dir = Path(PHOTO_STORAGE_DIR) storage_dir.mkdir(parents=True, exist_ok=True) added_count = 0 existing_count = 0 errors = [] for file in files: try: # Generate unique filename to avoid conflicts import uuid file_ext = Path(file.filename).suffix unique_filename = f"{uuid.uuid4()}{file_ext}" stored_path = storage_dir / unique_filename # Save uploaded file to storage content = await file.read() with open(stored_path, "wb") as f: f.write(content) # Import photo from stored location photo, is_new = import_photo_from_path(db, str(stored_path)) if is_new: added_count += 1 else: existing_count += 1 # If photo already exists, delete duplicate upload if os.path.exists(stored_path): os.remove(stored_path) except Exception as e: errors.append(f"Error uploading {file.filename}: {str(e)}") return { "message": f"Uploaded {len(files)} files", "added": added_count, "existing": existing_count, "errors": errors, } @router.get("/browse-directory", response_model=BrowseDirectoryResponse) def browse_directory( current_user: Annotated[dict, Depends(get_current_user)], path: str = Query("/", description="Directory path to list"), ) -> BrowseDirectoryResponse: """List directories and files in a given path. No GUI required - uses os.listdir() to read filesystem. Returns JSON with directory structure for web-based folder browser. Args: path: Directory path to list (can be relative or absolute) Returns: BrowseDirectoryResponse with current path, parent path, and items list Raises: HTTPException: If path doesn't exist, is not a directory, or access is denied """ import os from pathlib import Path try: # Convert to absolute path abs_path = os.path.abspath(path) # Normalize path separators abs_path = os.path.normpath(abs_path) # Security: Optional - restrict to certain base paths # For now, allow any path (server admin should configure file permissions) # You can uncomment and customize this for production: # allowed_bases = ["/home", "/mnt", "/opt/punimtag", "/media"] # if not any(abs_path.startswith(base) for base in allowed_bases): # raise HTTPException( # status_code=status.HTTP_403_FORBIDDEN, # detail=f"Path not allowed: {abs_path}" # ) if not os.path.exists(abs_path): raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Path does not exist: {abs_path}", ) if not os.path.isdir(abs_path): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Path is not a directory: {abs_path}", ) # Read directory contents items = [] try: for item in os.listdir(abs_path): item_path = os.path.join(abs_path, item) full_path = os.path.abspath(item_path) # Skip if we can't access it (permission denied) try: is_dir = os.path.isdir(full_path) is_file = os.path.isfile(full_path) except (OSError, PermissionError): # Skip items we can't access continue items.append( DirectoryItem( name=item, path=full_path, is_directory=is_dir, is_file=is_file, ) ) except PermissionError: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Permission denied reading directory: {abs_path}", ) # Sort: directories first, then files, both alphabetically items.sort(key=lambda x: (not x.is_directory, x.name.lower())) # Get parent path (None if at root) parent_path = None if abs_path != "/" and abs_path != os.path.dirname(abs_path): parent_path = os.path.dirname(abs_path) # Normalize parent path parent_path = os.path.normpath(parent_path) return BrowseDirectoryResponse( current_path=abs_path, parent_path=parent_path, items=items, ) except HTTPException: # Re-raise HTTP exceptions as-is raise except Exception as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error reading directory: {str(e)}", ) @router.post("/browse-folder") def browse_folder() -> dict: """Open native folder picker dialog and return selected folder path. Uses tkinter to show a native OS folder picker dialog. Returns the full absolute path of the selected folder. Returns: dict with 'path' (str) and 'success' (bool) keys """ import os import sys try: import tkinter as tk from tkinter import filedialog except ImportError: raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="tkinter is not available. Cannot show folder picker.", ) try: # Create root window (hidden) root = tk.Tk() root.withdraw() # Hide main window root.attributes('-topmost', True) # Bring to front # Show folder picker dialog folder_path = filedialog.askdirectory( title="Select folder to scan", mustexist=True ) # Clean up root.destroy() if folder_path: # Normalize path to absolute - use multiple methods to ensure full path # Handle network paths (UNC paths on Windows, mounted shares on Linux) # Check if it's a Windows UNC path (\\server\share or //server/share) # UNC paths are already absolute, but realpath may not work correctly with them is_unc_path = folder_path.startswith('\\\\') or folder_path.startswith('//') if is_unc_path: # For UNC paths, normalize separators but don't use realpath # (realpath may not work correctly with UNC paths on Windows) # os.path.normpath() handles UNC paths correctly on Windows normalized_path = os.path.normpath(folder_path) else: # For regular paths (local or mounted network shares on Linux) # First convert to absolute, then resolve any symlinks, then normalize abs_path = os.path.abspath(folder_path) try: # Resolve any symlinks to get the real path # This may fail for some network paths, so wrap in try/except real_path = os.path.realpath(abs_path) except (OSError, ValueError): # If realpath fails (e.g., for some network paths), use abspath result real_path = abs_path # Normalize the path (remove redundant separators, etc.) normalized_path = os.path.normpath(real_path) # Ensure we have a full absolute path (not just folder name) if not os.path.isabs(normalized_path): # If somehow still not absolute, try again with current working directory normalized_path = os.path.abspath(normalized_path) # Verify the path exists and is a directory # Note: For network paths, this check might fail if network is temporarily down, # but if the user just selected it via the folder picker, it should be accessible if not os.path.exists(normalized_path): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Selected path does not exist: {normalized_path}", ) if not os.path.isdir(normalized_path): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Selected path is not a directory: {normalized_path}", ) return { "path": normalized_path, "success": True, "message": f"Selected folder: {normalized_path}" } else: return { "path": "", "success": False, "message": "No folder selected" } except Exception as e: # Handle errors gracefully error_msg = str(e) # Check for common issues (display/headless server) if "display" in error_msg.lower() or "DISPLAY" not in os.environ: raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="No display available. Cannot show folder picker. " "If running on a remote server, ensure X11 forwarding is enabled.", ) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error showing folder picker: {error_msg}", ) @router.get("/{photo_id}", response_model=PhotoResponse) def get_photo(photo_id: int, db: Session = Depends(get_db)) -> PhotoResponse: """Get photo by ID.""" from backend.db.models import Photo photo = db.query(Photo).filter(Photo.id == photo_id).first() if not photo: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Photo {photo_id} not found", ) return PhotoResponse.model_validate(photo) @router.get("/{photo_id}/image") def get_photo_image( photo_id: int, request: Request, db: Session = Depends(get_db) ): """Serve photo image or video file for display (not download).""" import os import mimetypes from backend.db.models import Photo from starlette.responses import FileResponse photo = db.query(Photo).filter(Photo.id == photo_id).first() if not photo: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Photo {photo_id} not found", ) if not os.path.exists(photo.path): raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Photo file not found: {photo.path}", ) # If it's a video, handle range requests for video streaming if photo.media_type == "video": media_type, _ = mimetypes.guess_type(photo.path) if not media_type or not media_type.startswith('video/'): media_type = "video/mp4" file_size = os.path.getsize(photo.path) # Get range header - Starlette uses lowercase range_header = request.headers.get("range") # Debug: log what we're getting (remove after debugging) if photo_id == 737: # Only for this specific video import json debug_info = { "range_header": range_header, "all_headers": dict(request.headers), "header_keys": list(request.headers.keys()) } print(f"DEBUG photo 737: {json.dumps(debug_info, indent=2)}") if range_header: try: # Parse range header: "bytes=start-end" or "bytes=start-" or "bytes=-suffix" range_match = range_header.replace("bytes=", "").split("-") start_str = range_match[0].strip() end_str = range_match[1].strip() if len(range_match) > 1 and range_match[1] else "" start = int(start_str) if start_str else 0 end = int(end_str) if end_str else file_size - 1 # Validate range if start < 0: start = 0 if end >= file_size: end = file_size - 1 if start > end: return Response( status_code=416, headers={"Content-Range": f"bytes */{file_size}"} ) # Read the requested chunk chunk_size = end - start + 1 with open(photo.path, "rb") as f: f.seek(start) chunk = f.read(chunk_size) return Response( content=chunk, status_code=206, headers={ "Content-Range": f"bytes {start}-{end}/{file_size}", "Accept-Ranges": "bytes", "Content-Length": str(chunk_size), "Content-Type": media_type, "Content-Disposition": "inline", "Cache-Control": "public, max-age=3600", }, media_type=media_type, ) except (ValueError, IndexError) as e: # If range parsing fails, fall through to serve full file pass # No range request or parsing failed - serve full file with range support headers response = FileResponse( photo.path, media_type=media_type, ) response.headers["Content-Disposition"] = "inline" response.headers["Accept-Ranges"] = "bytes" response.headers["Cache-Control"] = "public, max-age=3600" return response # Determine media type from file extension for images media_type, _ = mimetypes.guess_type(photo.path) if not media_type or not media_type.startswith('image/'): media_type = "image/jpeg" # Use FileResponse but set headers to display inline (not download) response = FileResponse( photo.path, media_type=media_type, ) # Set Content-Disposition to inline so browser displays instead of downloads response.headers["Content-Disposition"] = "inline" response.headers["Cache-Control"] = "public, max-age=3600" return response @router.post("/{photo_id}/toggle-favorite") def toggle_favorite( photo_id: int, current_user: Annotated[dict, Depends(get_current_user)], db: Session = Depends(get_db), ) -> dict: """Toggle favorite status of a photo for current user.""" from backend.db.models import Photo, PhotoFavorite username = current_user["username"] # Verify photo exists photo = db.query(Photo).filter(Photo.id == photo_id).first() if not photo: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Photo {photo_id} not found", ) # Check if already favorited existing = db.query(PhotoFavorite).filter( PhotoFavorite.username == username, PhotoFavorite.photo_id == photo_id ).first() if existing: # Remove favorite db.delete(existing) is_favorite = False else: # Add favorite favorite = PhotoFavorite( username=username, photo_id=photo_id ) db.add(favorite) is_favorite = True db.commit() return { "photo_id": photo_id, "is_favorite": is_favorite, "message": "Favorite status updated" } @router.get("/{photo_id}/is-favorite") def check_favorite( photo_id: int, current_user: Annotated[dict, Depends(get_current_user)], db: Session = Depends(get_db), ) -> dict: """Check if photo is favorited by current user.""" from backend.db.models import PhotoFavorite username = current_user["username"] favorite = db.query(PhotoFavorite).filter( PhotoFavorite.username == username, PhotoFavorite.photo_id == photo_id ).first() return { "photo_id": photo_id, "is_favorite": favorite is not None } @router.post("/bulk-add-favorites", response_model=BulkAddFavoritesResponse) def bulk_add_favorites( request: BulkAddFavoritesRequest, current_user: Annotated[dict, Depends(get_current_user)], db: Session = Depends(get_db), ) -> BulkAddFavoritesResponse: """Add multiple photos to favorites for current user. Only adds favorites for photos that aren't already favorites. Uses a single database transaction for better performance. """ from backend.db.models import Photo, PhotoFavorite photo_ids = request.photo_ids if not photo_ids: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="photo_ids list cannot be empty", ) username = current_user["username"] # Verify all photos exist photos = db.query(Photo).filter(Photo.id.in_(photo_ids)).all() found_ids = {photo.id for photo in photos} missing_ids = set(photo_ids) - found_ids if missing_ids: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Photos not found: {sorted(missing_ids)}", ) # Get existing favorites in a single query existing_favorites = db.query(PhotoFavorite).filter( PhotoFavorite.username == username, PhotoFavorite.photo_id.in_(photo_ids) ).all() existing_ids = {fav.photo_id for fav in existing_favorites} # Only add favorites for photos that aren't already favorites photos_to_add = [photo_id for photo_id in photo_ids if photo_id not in existing_ids] added_count = 0 for photo_id in photos_to_add: favorite = PhotoFavorite( username=username, photo_id=photo_id ) db.add(favorite) added_count += 1 db.commit() return BulkAddFavoritesResponse( message=f"Added {added_count} photo(s) to favorites", added_count=added_count, already_favorite_count=len(existing_ids), total_requested=len(photo_ids), ) @router.post("/bulk-remove-favorites", response_model=BulkRemoveFavoritesResponse) def bulk_remove_favorites( request: BulkRemoveFavoritesRequest, current_user: Annotated[dict, Depends(get_current_user)], db: Session = Depends(get_db), ) -> BulkRemoveFavoritesResponse: """Remove multiple photos from favorites for current user. Only removes favorites for photos that are currently favorites. Uses a single database transaction for better performance. """ from backend.db.models import Photo, PhotoFavorite photo_ids = request.photo_ids if not photo_ids: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="photo_ids list cannot be empty", ) username = current_user["username"] # Verify all photos exist photos = db.query(Photo).filter(Photo.id.in_(photo_ids)).all() found_ids = {photo.id for photo in photos} missing_ids = set(photo_ids) - found_ids if missing_ids: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Photos not found: {sorted(missing_ids)}", ) # Get existing favorites in a single query existing_favorites = db.query(PhotoFavorite).filter( PhotoFavorite.username == username, PhotoFavorite.photo_id.in_(photo_ids) ).all() existing_ids = {fav.photo_id for fav in existing_favorites} # Only remove favorites for photos that are currently favorites photos_to_remove = [photo_id for photo_id in photo_ids if photo_id in existing_ids] removed_count = 0 for favorite in existing_favorites: if favorite.photo_id in photos_to_remove: db.delete(favorite) removed_count += 1 db.commit() return BulkRemoveFavoritesResponse( message=f"Removed {removed_count} photo(s) from favorites", removed_count=removed_count, not_favorite_count=len(photo_ids) - len(existing_ids), total_requested=len(photo_ids), ) @router.post("/bulk-delete", response_model=BulkDeletePhotosResponse) def bulk_delete_photos( request: BulkDeletePhotosRequest, current_admin: Annotated[dict, Depends(get_current_admin_user)], db: Session = Depends(get_db), ) -> BulkDeletePhotosResponse: """Delete multiple photos and all related data (faces, encodings, tags, favorites).""" from backend.db.models import Photo, PhotoTagLinkage photo_ids = list(dict.fromkeys(request.photo_ids)) if not photo_ids: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="photo_ids list cannot be empty", ) try: photos = db.query(Photo).filter(Photo.id.in_(photo_ids)).all() found_ids = {photo.id for photo in photos} missing_ids = sorted(set(photo_ids) - found_ids) deleted_count = 0 for photo in photos: # Remove tag linkages explicitly (in addition to cascade) to keep counts accurate db.query(PhotoTagLinkage).filter( PhotoTagLinkage.photo_id == photo.id ).delete(synchronize_session=False) db.delete(photo) deleted_count += 1 db.commit() except HTTPException: db.rollback() raise except Exception as exc: # pragma: no cover - safety net db.rollback() raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to delete photos: {exc}", ) admin_username = current_admin.get("username", "unknown") message_parts = [f"Deleted {deleted_count} photo(s)"] if missing_ids: message_parts.append(f"{len(missing_ids)} photo(s) not found") message_parts.append(f"Request by admin: {admin_username}") return BulkDeletePhotosResponse( message="; ".join(message_parts), deleted_count=deleted_count, missing_photo_ids=missing_ids, ) @router.post("/{photo_id}/open-folder") def open_photo_folder(photo_id: int, db: Session = Depends(get_db)) -> dict: """Open the folder containing the photo in the system file manager and select the file. Matches desktop behavior and enhances it by selecting the specific file: - Windows: uses explorer /select,"file_path" - macOS: uses 'open -R' to reveal and select the file - Linux: tries file manager-specific commands (nautilus, dolphin, etc.) or opens folder """ import os import sys import subprocess from backend.db.models import Photo photo = db.query(Photo).filter(Photo.id == photo_id).first() if not photo: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Photo {photo_id} not found", ) # Ensure we have absolute path file_path = os.path.abspath(photo.path) folder = os.path.dirname(file_path) if not os.path.exists(file_path): raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Photo file not found: {file_path}", ) if not os.path.exists(folder): raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Folder not found: {folder}", ) try: # Try using showinfm package first (better cross-platform support) try: from showinfm import show_in_file_manager show_in_file_manager(file_path) return { "message": f"Opened folder and selected file: {os.path.basename(file_path)}", "folder": folder, "file": file_path } except ImportError: # showinfm not installed, fall back to manual commands pass except Exception as e: # showinfm failed, fall back to manual commands pass # Fallback: Open folder and select the file using platform-specific commands if os.name == "nt": # Windows # Windows: explorer /select,"file_path" opens folder and selects the file subprocess.run(["explorer", "/select,", file_path], check=False) elif sys.platform == "darwin": # macOS # macOS: open -R reveals the file in Finder and selects it subprocess.run(["open", "-R", file_path], check=False) else: # Linux and others # Linux: Try file manager-specific commands to select the file # Try different file managers based on desktop environment opened = False # Detect desktop environment first desktop_env = os.environ.get("XDG_CURRENT_DESKTOP", "").lower() # Try Nautilus (GNOME) - supports --select option if "gnome" in desktop_env or not desktop_env: try: result = subprocess.run( ["nautilus", "--select", file_path], check=False, timeout=5 ) if result.returncode == 0: opened = True except (FileNotFoundError, subprocess.TimeoutExpired): pass # Try Nemo (Cinnamon/MATE) - doesn't support --select, but we can try folder with navigation if not opened and ("cinnamon" in desktop_env or "mate" in desktop_env): try: # For Nemo, we can try opening the folder first, then using a script or # just opening the folder (Nemo may focus on the file if we pass it) # Try opening with the file path - Nemo will open the folder file_uri = f"file://{file_path}" result = subprocess.Popen( ["nemo", file_uri], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL ) # Nemo will open the folder - selection may not work perfectly # This is a limitation of Nemo opened = True except (FileNotFoundError, subprocess.TimeoutExpired): pass # Try Dolphin (KDE) - supports --select option if not opened and "kde" in desktop_env: try: result = subprocess.run( ["dolphin", "--select", file_path], check=False, timeout=5 ) if result.returncode == 0: opened = True except (FileNotFoundError, subprocess.TimeoutExpired): pass # Try PCManFM (LXDE/LXQt) - supports --select option if not opened and ("lxde" in desktop_env or "lxqt" in desktop_env): try: result = subprocess.run( ["pcmanfm", "--select", file_path], check=False, timeout=5 ) if result.returncode == 0: opened = True except (FileNotFoundError, subprocess.TimeoutExpired): pass # Try Thunar (XFCE) - supports --select option if not opened and "xfce" in desktop_env: try: result = subprocess.run( ["thunar", "--select", file_path], check=False, timeout=5 ) if result.returncode == 0: opened = True except (FileNotFoundError, subprocess.TimeoutExpired): pass # If desktop-specific didn't work, try all file managers in order if not opened: file_managers = [ ("nautilus", ["--select", file_path]), ("nemo", [f"file://{file_path}"]), # Nemo uses file:// URI ("dolphin", ["--select", file_path]), ("thunar", ["--select", file_path]), ("pcmanfm", ["--select", file_path]), ] for fm_name, fm_args in file_managers: try: result = subprocess.run( [fm_name] + fm_args, check=False, timeout=5 ) if result.returncode == 0: opened = True break except (FileNotFoundError, subprocess.TimeoutExpired): continue # Fallback: try xdg-open with the folder (will open folder but won't select file) if not opened: subprocess.run(["xdg-open", folder], check=False) return { "message": f"Opened folder and selected file: {os.path.basename(file_path)}", "folder": folder, "file": file_path } except Exception as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to open folder: {str(e)}", )