punimtag/backend/api/photos.py
Tanya 68d280e8f5 feat: Add new analysis documents and update installation scripts for backend integration
This commit introduces several new analysis documents, including Auto-Match Load Performance Analysis, Folder Picker Analysis, Monorepo Migration Summary, and various performance analysis documents. Additionally, the installation scripts are updated to reflect changes in backend service paths, ensuring proper integration with the new backend structure. These enhancements provide better documentation and streamline the setup process for users.
2025-12-30 15:04:32 -05:00

972 lines
36 KiB
Python

"""Photo management endpoints."""
from __future__ import annotations
from datetime import date, datetime
from typing import List, Optional
from fastapi import APIRouter, Depends, File, HTTPException, Query, UploadFile, status
from fastapi.responses import JSONResponse, FileResponse
from typing import Annotated
from rq import Queue
from redis import Redis
from sqlalchemy.orm import Session
from backend.db.session import get_db
from backend.api.auth import get_current_user
from backend.api.users import get_current_admin_user
# Redis connection for RQ
redis_conn = Redis(host="localhost", port=6379, db=0, decode_responses=False)
queue = Queue(connection=redis_conn)
from backend.schemas.photos import (
PhotoImportRequest,
PhotoImportResponse,
PhotoResponse,
BulkAddFavoritesRequest,
BulkAddFavoritesResponse,
BulkDeletePhotosRequest,
BulkDeletePhotosResponse,
BulkRemoveFavoritesRequest,
BulkRemoveFavoritesResponse,
)
from backend.schemas.search import (
PhotoSearchResult,
SearchPhotosResponse,
)
from backend.services.photo_service import (
find_photos_in_folder,
import_photo_from_path,
)
from backend.services.search_service import (
get_favorite_photos,
get_photo_face_count,
get_photo_person,
get_photo_tags,
get_photos_without_faces,
get_photos_without_tags,
get_processed_photos,
get_unprocessed_photos,
search_photos_by_date,
search_photos_by_name,
search_photos_by_tags,
)
# Note: Function passed as string path to avoid RQ serialization issues
router = APIRouter(prefix="/photos", tags=["photos"])
@router.get("", response_model=SearchPhotosResponse)
def search_photos(
current_user: Annotated[dict, Depends(get_current_user)],
search_type: str = Query("name", description="Search type: name, date, tags, no_faces, no_tags, processed, unprocessed, favorites"),
person_name: Optional[str] = Query(None, description="Person name for name search"),
tag_names: Optional[str] = Query(None, description="Comma-separated tag names for tag search"),
match_all: bool = Query(False, description="Match all tags (for tag search)"),
date_from: Optional[str] = Query(None, description="Date from (YYYY-MM-DD)"),
date_to: Optional[str] = Query(None, description="Date to (YYYY-MM-DD)"),
folder_path: Optional[str] = Query(None, description="Filter by folder path"),
media_type: Optional[str] = Query(None, description="Filter by media type: 'all', 'image', or 'video'"),
page: int = Query(1, ge=1),
page_size: int = Query(50, ge=1, le=200),
db: Session = Depends(get_db),
) -> SearchPhotosResponse:
"""Search photos with filters.
Matches desktop search functionality exactly:
- Search by name: person_name required
- Search by date: date_from or date_to required
- Search by tags: tag_names required (comma-separated)
- Search no faces: returns photos without faces
- Search no tags: returns photos without tags
- Search processed: returns photos that have been processed for face detection
- Search unprocessed: returns photos that have not been processed for face detection
- Search favorites: returns photos favorited by current user
"""
from backend.db.models import PhotoFavorite
items: List[PhotoSearchResult] = []
total = 0
username = current_user["username"] if current_user else None
# Helper function to check if photo is favorite
def check_is_favorite(photo_id: int) -> bool:
if not username:
return False
favorite = db.query(PhotoFavorite).filter(
PhotoFavorite.username == username,
PhotoFavorite.photo_id == photo_id
).first()
return favorite is not None
# Parse date filters for use as additional filters (when not using date search type)
df = date.fromisoformat(date_from) if date_from else None
dt = date.fromisoformat(date_to) if date_to else None
# Parse tag filters for use as additional filters
tag_list = None
if tag_names:
tag_list = [t.strip() for t in tag_names.split(",") if t.strip()]
if search_type == "name":
if not person_name:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="person_name is required for name search",
)
results, total = search_photos_by_name(
db, person_name, folder_path, media_type, df, dt, tag_list, match_all, page, page_size
)
for photo, full_name in results:
tags = get_photo_tags(db, photo.id)
face_count = get_photo_face_count(db, photo.id)
# Convert datetime to date for date_added
date_added = photo.date_added.date() if isinstance(photo.date_added, datetime) else photo.date_added
items.append(
PhotoSearchResult(
id=photo.id,
path=photo.path,
filename=photo.filename,
date_taken=photo.date_taken,
date_added=date_added,
processed=photo.processed,
person_name=full_name,
tags=tags,
has_faces=face_count > 0,
face_count=face_count,
is_favorite=check_is_favorite(photo.id),
)
)
elif search_type == "date":
if not date_from and not date_to:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="At least one of date_from or date_to is required",
)
results, total = search_photos_by_date(db, df, dt, folder_path, media_type, tag_list, match_all, page, page_size)
for photo in results:
tags = get_photo_tags(db, photo.id)
face_count = get_photo_face_count(db, photo.id)
person_name_val = get_photo_person(db, photo.id)
# Convert datetime to date for date_added
date_added = photo.date_added.date() if isinstance(photo.date_added, datetime) else photo.date_added
items.append(
PhotoSearchResult(
id=photo.id,
path=photo.path,
filename=photo.filename,
date_taken=photo.date_taken,
date_added=date_added,
processed=photo.processed,
person_name=person_name_val,
tags=tags,
has_faces=face_count > 0,
face_count=face_count,
is_favorite=check_is_favorite(photo.id),
)
)
elif search_type == "tags":
if not tag_names:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="tag_names is required for tag search",
)
if not tag_list or len(tag_list) == 0:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="At least one tag name is required",
)
results, total = search_photos_by_tags(
db, tag_list, match_all, folder_path, media_type, df, dt, page, page_size
)
for photo in results:
tags = get_photo_tags(db, photo.id)
face_count = get_photo_face_count(db, photo.id)
person_name_val = get_photo_person(db, photo.id)
# Convert datetime to date for date_added
date_added = photo.date_added.date() if isinstance(photo.date_added, datetime) else photo.date_added
items.append(
PhotoSearchResult(
id=photo.id,
path=photo.path,
filename=photo.filename,
date_taken=photo.date_taken,
date_added=date_added,
processed=photo.processed,
person_name=person_name_val,
tags=tags,
has_faces=face_count > 0,
face_count=face_count,
is_favorite=check_is_favorite(photo.id),
)
)
elif search_type == "no_faces":
results, total = get_photos_without_faces(db, folder_path, media_type, df, dt, page, page_size)
for photo in results:
tags = get_photo_tags(db, photo.id)
# Convert datetime to date for date_added
date_added = photo.date_added.date() if isinstance(photo.date_added, datetime) else photo.date_added
items.append(
PhotoSearchResult(
id=photo.id,
path=photo.path,
filename=photo.filename,
date_taken=photo.date_taken,
date_added=date_added,
processed=photo.processed,
person_name=None,
tags=tags,
has_faces=False,
face_count=0,
is_favorite=check_is_favorite(photo.id),
)
)
elif search_type == "no_tags":
results, total = get_photos_without_tags(db, folder_path, media_type, df, dt, page, page_size)
for photo in results:
face_count = get_photo_face_count(db, photo.id)
person_name_val = get_photo_person(db, photo.id)
# Convert datetime to date for date_added
date_added = photo.date_added.date() if isinstance(photo.date_added, datetime) else photo.date_added
items.append(
PhotoSearchResult(
id=photo.id,
path=photo.path,
filename=photo.filename,
date_taken=photo.date_taken,
date_added=date_added,
processed=photo.processed,
person_name=person_name_val,
tags=[],
has_faces=face_count > 0,
face_count=face_count,
is_favorite=check_is_favorite(photo.id),
)
)
elif search_type == "processed":
results, total = get_processed_photos(db, folder_path, media_type, df, dt, page, page_size)
for photo in results:
tags = get_photo_tags(db, photo.id)
face_count = get_photo_face_count(db, photo.id)
person_name_val = get_photo_person(db, photo.id)
# Convert datetime to date for date_added
date_added = photo.date_added.date() if isinstance(photo.date_added, datetime) else photo.date_added
items.append(
PhotoSearchResult(
id=photo.id,
path=photo.path,
filename=photo.filename,
date_taken=photo.date_taken,
date_added=date_added,
processed=photo.processed,
person_name=person_name_val,
tags=tags,
has_faces=face_count > 0,
face_count=face_count,
is_favorite=check_is_favorite(photo.id),
)
)
elif search_type == "unprocessed":
results, total = get_unprocessed_photos(db, folder_path, media_type, df, dt, page, page_size)
for photo in results:
tags = get_photo_tags(db, photo.id)
face_count = get_photo_face_count(db, photo.id)
person_name_val = get_photo_person(db, photo.id)
# Convert datetime to date for date_added
date_added = photo.date_added.date() if isinstance(photo.date_added, datetime) else photo.date_added
items.append(
PhotoSearchResult(
id=photo.id,
path=photo.path,
filename=photo.filename,
date_taken=photo.date_taken,
date_added=date_added,
processed=photo.processed,
person_name=person_name_val,
tags=tags,
has_faces=face_count > 0,
face_count=face_count,
is_favorite=check_is_favorite(photo.id),
)
)
elif search_type == "favorites":
if not username:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Authentication required for favorites search",
)
results, total = get_favorite_photos(db, username, folder_path, media_type, df, dt, page, page_size)
for photo in results:
tags = get_photo_tags(db, photo.id)
face_count = get_photo_face_count(db, photo.id)
person_name_val = get_photo_person(db, photo.id)
# Convert datetime to date for date_added
date_added = photo.date_added.date() if isinstance(photo.date_added, datetime) else photo.date_added
items.append(
PhotoSearchResult(
id=photo.id,
path=photo.path,
filename=photo.filename,
date_taken=photo.date_taken,
date_added=date_added,
processed=photo.processed,
person_name=person_name_val,
tags=tags,
has_faces=face_count > 0,
face_count=face_count,
is_favorite=True, # All results are favorites
)
)
else:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid search_type: {search_type}",
)
return SearchPhotosResponse(items=items, page=page, page_size=page_size, total=total)
@router.post("/import", response_model=PhotoImportResponse)
def import_photos(
request: PhotoImportRequest,
) -> PhotoImportResponse:
"""Import photos from a folder path.
This endpoint enqueues a background job to scan and import photos.
"""
if not request.folder_path:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="folder_path is required",
)
# Validate folder exists
import os
if not os.path.isdir(request.folder_path):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Folder not found: {request.folder_path}",
)
# Estimate number of photos (quick scan)
estimated_photos = len(find_photos_in_folder(request.folder_path, request.recursive))
# Enqueue job
# Pass function as string path to avoid serialization issues
job = queue.enqueue(
"backend.services.tasks.import_photos_task",
request.folder_path,
request.recursive,
job_timeout="1h", # Allow up to 1 hour for large imports
)
return PhotoImportResponse(
job_id=job.id,
message=f"Photo import job queued for {request.folder_path}",
folder_path=request.folder_path,
estimated_photos=estimated_photos,
)
@router.post("/import/upload")
async def upload_photos(
files: list[UploadFile] = File(...),
db: Session = Depends(get_db),
) -> dict:
"""Upload photo files directly.
This endpoint accepts file uploads and imports them immediately.
Files are saved to PHOTO_STORAGE_DIR before import.
For large batches, prefer the /import endpoint with folder_path.
"""
import os
import shutil
from pathlib import Path
from backend.settings import PHOTO_STORAGE_DIR
# Ensure storage directory exists
storage_dir = Path(PHOTO_STORAGE_DIR)
storage_dir.mkdir(parents=True, exist_ok=True)
added_count = 0
existing_count = 0
errors = []
for file in files:
try:
# Generate unique filename to avoid conflicts
import uuid
file_ext = Path(file.filename).suffix
unique_filename = f"{uuid.uuid4()}{file_ext}"
stored_path = storage_dir / unique_filename
# Save uploaded file to storage
content = await file.read()
with open(stored_path, "wb") as f:
f.write(content)
# Import photo from stored location
photo, is_new = import_photo_from_path(db, str(stored_path))
if is_new:
added_count += 1
else:
existing_count += 1
# If photo already exists, delete duplicate upload
if os.path.exists(stored_path):
os.remove(stored_path)
except Exception as e:
errors.append(f"Error uploading {file.filename}: {str(e)}")
return {
"message": f"Uploaded {len(files)} files",
"added": added_count,
"existing": existing_count,
"errors": errors,
}
@router.post("/browse-folder")
def browse_folder() -> dict:
"""Open native folder picker dialog and return selected folder path.
Uses tkinter to show a native OS folder picker dialog.
Returns the full absolute path of the selected folder.
Returns:
dict with 'path' (str) and 'success' (bool) keys
"""
import os
import sys
try:
import tkinter as tk
from tkinter import filedialog
except ImportError:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="tkinter is not available. Cannot show folder picker.",
)
try:
# Create root window (hidden)
root = tk.Tk()
root.withdraw() # Hide main window
root.attributes('-topmost', True) # Bring to front
# Show folder picker dialog
folder_path = filedialog.askdirectory(
title="Select folder to scan",
mustexist=True
)
# Clean up
root.destroy()
if folder_path:
# Normalize path to absolute
abs_path = os.path.abspath(folder_path)
return {
"path": abs_path,
"success": True,
"message": f"Selected folder: {abs_path}"
}
else:
return {
"path": "",
"success": False,
"message": "No folder selected"
}
except Exception as e:
# Handle errors gracefully
error_msg = str(e)
# Check for common issues (display/headless server)
if "display" in error_msg.lower() or "DISPLAY" not in os.environ:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="No display available. Cannot show folder picker. "
"If running on a remote server, ensure X11 forwarding is enabled.",
)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error showing folder picker: {error_msg}",
)
@router.get("/{photo_id}", response_model=PhotoResponse)
def get_photo(photo_id: int, db: Session = Depends(get_db)) -> PhotoResponse:
"""Get photo by ID."""
from backend.db.models import Photo
photo = db.query(Photo).filter(Photo.id == photo_id).first()
if not photo:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Photo {photo_id} not found",
)
return PhotoResponse.model_validate(photo)
@router.get("/{photo_id}/image")
def get_photo_image(photo_id: int, db: Session = Depends(get_db)) -> FileResponse:
"""Serve photo image file for display (not download)."""
import os
import mimetypes
from backend.db.models import Photo
photo = db.query(Photo).filter(Photo.id == photo_id).first()
if not photo:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Photo {photo_id} not found",
)
if not os.path.exists(photo.path):
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Photo file not found: {photo.path}",
)
# Determine media type from file extension
media_type, _ = mimetypes.guess_type(photo.path)
if not media_type or not media_type.startswith('image/'):
media_type = "image/jpeg"
# Use FileResponse but set headers to display inline (not download)
response = FileResponse(
photo.path,
media_type=media_type,
)
# Set Content-Disposition to inline so browser displays instead of downloads
response.headers["Content-Disposition"] = "inline"
response.headers["Cache-Control"] = "public, max-age=3600"
return response
@router.post("/{photo_id}/toggle-favorite")
def toggle_favorite(
photo_id: int,
current_user: Annotated[dict, Depends(get_current_user)],
db: Session = Depends(get_db),
) -> dict:
"""Toggle favorite status of a photo for current user."""
from backend.db.models import Photo, PhotoFavorite
username = current_user["username"]
# Verify photo exists
photo = db.query(Photo).filter(Photo.id == photo_id).first()
if not photo:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Photo {photo_id} not found",
)
# Check if already favorited
existing = db.query(PhotoFavorite).filter(
PhotoFavorite.username == username,
PhotoFavorite.photo_id == photo_id
).first()
if existing:
# Remove favorite
db.delete(existing)
is_favorite = False
else:
# Add favorite
favorite = PhotoFavorite(
username=username,
photo_id=photo_id
)
db.add(favorite)
is_favorite = True
db.commit()
return {
"photo_id": photo_id,
"is_favorite": is_favorite,
"message": "Favorite status updated"
}
@router.get("/{photo_id}/is-favorite")
def check_favorite(
photo_id: int,
current_user: Annotated[dict, Depends(get_current_user)],
db: Session = Depends(get_db),
) -> dict:
"""Check if photo is favorited by current user."""
from backend.db.models import PhotoFavorite
username = current_user["username"]
favorite = db.query(PhotoFavorite).filter(
PhotoFavorite.username == username,
PhotoFavorite.photo_id == photo_id
).first()
return {
"photo_id": photo_id,
"is_favorite": favorite is not None
}
@router.post("/bulk-add-favorites", response_model=BulkAddFavoritesResponse)
def bulk_add_favorites(
request: BulkAddFavoritesRequest,
current_user: Annotated[dict, Depends(get_current_user)],
db: Session = Depends(get_db),
) -> BulkAddFavoritesResponse:
"""Add multiple photos to favorites for current user.
Only adds favorites for photos that aren't already favorites.
Uses a single database transaction for better performance.
"""
from backend.db.models import Photo, PhotoFavorite
photo_ids = request.photo_ids
if not photo_ids:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="photo_ids list cannot be empty",
)
username = current_user["username"]
# Verify all photos exist
photos = db.query(Photo).filter(Photo.id.in_(photo_ids)).all()
found_ids = {photo.id for photo in photos}
missing_ids = set(photo_ids) - found_ids
if missing_ids:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Photos not found: {sorted(missing_ids)}",
)
# Get existing favorites in a single query
existing_favorites = db.query(PhotoFavorite).filter(
PhotoFavorite.username == username,
PhotoFavorite.photo_id.in_(photo_ids)
).all()
existing_ids = {fav.photo_id for fav in existing_favorites}
# Only add favorites for photos that aren't already favorites
photos_to_add = [photo_id for photo_id in photo_ids if photo_id not in existing_ids]
added_count = 0
for photo_id in photos_to_add:
favorite = PhotoFavorite(
username=username,
photo_id=photo_id
)
db.add(favorite)
added_count += 1
db.commit()
return BulkAddFavoritesResponse(
message=f"Added {added_count} photo(s) to favorites",
added_count=added_count,
already_favorite_count=len(existing_ids),
total_requested=len(photo_ids),
)
@router.post("/bulk-remove-favorites", response_model=BulkRemoveFavoritesResponse)
def bulk_remove_favorites(
request: BulkRemoveFavoritesRequest,
current_user: Annotated[dict, Depends(get_current_user)],
db: Session = Depends(get_db),
) -> BulkRemoveFavoritesResponse:
"""Remove multiple photos from favorites for current user.
Only removes favorites for photos that are currently favorites.
Uses a single database transaction for better performance.
"""
from backend.db.models import Photo, PhotoFavorite
photo_ids = request.photo_ids
if not photo_ids:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="photo_ids list cannot be empty",
)
username = current_user["username"]
# Verify all photos exist
photos = db.query(Photo).filter(Photo.id.in_(photo_ids)).all()
found_ids = {photo.id for photo in photos}
missing_ids = set(photo_ids) - found_ids
if missing_ids:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Photos not found: {sorted(missing_ids)}",
)
# Get existing favorites in a single query
existing_favorites = db.query(PhotoFavorite).filter(
PhotoFavorite.username == username,
PhotoFavorite.photo_id.in_(photo_ids)
).all()
existing_ids = {fav.photo_id for fav in existing_favorites}
# Only remove favorites for photos that are currently favorites
photos_to_remove = [photo_id for photo_id in photo_ids if photo_id in existing_ids]
removed_count = 0
for favorite in existing_favorites:
if favorite.photo_id in photos_to_remove:
db.delete(favorite)
removed_count += 1
db.commit()
return BulkRemoveFavoritesResponse(
message=f"Removed {removed_count} photo(s) from favorites",
removed_count=removed_count,
not_favorite_count=len(photo_ids) - len(existing_ids),
total_requested=len(photo_ids),
)
@router.post("/bulk-delete", response_model=BulkDeletePhotosResponse)
def bulk_delete_photos(
request: BulkDeletePhotosRequest,
current_admin: Annotated[dict, Depends(get_current_admin_user)],
db: Session = Depends(get_db),
) -> BulkDeletePhotosResponse:
"""Delete multiple photos and all related data (faces, encodings, tags, favorites)."""
from backend.db.models import Photo, PhotoTagLinkage
photo_ids = list(dict.fromkeys(request.photo_ids))
if not photo_ids:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="photo_ids list cannot be empty",
)
try:
photos = db.query(Photo).filter(Photo.id.in_(photo_ids)).all()
found_ids = {photo.id for photo in photos}
missing_ids = sorted(set(photo_ids) - found_ids)
deleted_count = 0
for photo in photos:
# Remove tag linkages explicitly (in addition to cascade) to keep counts accurate
db.query(PhotoTagLinkage).filter(
PhotoTagLinkage.photo_id == photo.id
).delete(synchronize_session=False)
db.delete(photo)
deleted_count += 1
db.commit()
except HTTPException:
db.rollback()
raise
except Exception as exc: # pragma: no cover - safety net
db.rollback()
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to delete photos: {exc}",
)
admin_username = current_admin.get("username", "unknown")
message_parts = [f"Deleted {deleted_count} photo(s)"]
if missing_ids:
message_parts.append(f"{len(missing_ids)} photo(s) not found")
message_parts.append(f"Request by admin: {admin_username}")
return BulkDeletePhotosResponse(
message="; ".join(message_parts),
deleted_count=deleted_count,
missing_photo_ids=missing_ids,
)
@router.post("/{photo_id}/open-folder")
def open_photo_folder(photo_id: int, db: Session = Depends(get_db)) -> dict:
"""Open the folder containing the photo in the system file manager and select the file.
Matches desktop behavior and enhances it by selecting the specific file:
- Windows: uses explorer /select,"file_path"
- macOS: uses 'open -R' to reveal and select the file
- Linux: tries file manager-specific commands (nautilus, dolphin, etc.) or opens folder
"""
import os
import sys
import subprocess
from backend.db.models import Photo
photo = db.query(Photo).filter(Photo.id == photo_id).first()
if not photo:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Photo {photo_id} not found",
)
# Ensure we have absolute path
file_path = os.path.abspath(photo.path)
folder = os.path.dirname(file_path)
if not os.path.exists(file_path):
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Photo file not found: {file_path}",
)
if not os.path.exists(folder):
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Folder not found: {folder}",
)
try:
# Try using showinfm package first (better cross-platform support)
try:
from showinfm import show_in_file_manager
show_in_file_manager(file_path)
return {
"message": f"Opened folder and selected file: {os.path.basename(file_path)}",
"folder": folder,
"file": file_path
}
except ImportError:
# showinfm not installed, fall back to manual commands
pass
except Exception as e:
# showinfm failed, fall back to manual commands
pass
# Fallback: Open folder and select the file using platform-specific commands
if os.name == "nt": # Windows
# Windows: explorer /select,"file_path" opens folder and selects the file
subprocess.run(["explorer", "/select,", file_path], check=False)
elif sys.platform == "darwin": # macOS
# macOS: open -R reveals the file in Finder and selects it
subprocess.run(["open", "-R", file_path], check=False)
else: # Linux and others
# Linux: Try file manager-specific commands to select the file
# Try different file managers based on desktop environment
opened = False
# Detect desktop environment first
desktop_env = os.environ.get("XDG_CURRENT_DESKTOP", "").lower()
# Try Nautilus (GNOME) - supports --select option
if "gnome" in desktop_env or not desktop_env:
try:
result = subprocess.run(
["nautilus", "--select", file_path],
check=False,
timeout=5
)
if result.returncode == 0:
opened = True
except (FileNotFoundError, subprocess.TimeoutExpired):
pass
# Try Nemo (Cinnamon/MATE) - doesn't support --select, but we can try folder with navigation
if not opened and ("cinnamon" in desktop_env or "mate" in desktop_env):
try:
# For Nemo, we can try opening the folder first, then using a script or
# just opening the folder (Nemo may focus on the file if we pass it)
# Try opening with the file path - Nemo will open the folder
file_uri = f"file://{file_path}"
result = subprocess.Popen(
["nemo", file_uri],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
# Nemo will open the folder - selection may not work perfectly
# This is a limitation of Nemo
opened = True
except (FileNotFoundError, subprocess.TimeoutExpired):
pass
# Try Dolphin (KDE) - supports --select option
if not opened and "kde" in desktop_env:
try:
result = subprocess.run(
["dolphin", "--select", file_path],
check=False,
timeout=5
)
if result.returncode == 0:
opened = True
except (FileNotFoundError, subprocess.TimeoutExpired):
pass
# Try PCManFM (LXDE/LXQt) - supports --select option
if not opened and ("lxde" in desktop_env or "lxqt" in desktop_env):
try:
result = subprocess.run(
["pcmanfm", "--select", file_path],
check=False,
timeout=5
)
if result.returncode == 0:
opened = True
except (FileNotFoundError, subprocess.TimeoutExpired):
pass
# Try Thunar (XFCE) - supports --select option
if not opened and "xfce" in desktop_env:
try:
result = subprocess.run(
["thunar", "--select", file_path],
check=False,
timeout=5
)
if result.returncode == 0:
opened = True
except (FileNotFoundError, subprocess.TimeoutExpired):
pass
# If desktop-specific didn't work, try all file managers in order
if not opened:
file_managers = [
("nautilus", ["--select", file_path]),
("nemo", [f"file://{file_path}"]), # Nemo uses file:// URI
("dolphin", ["--select", file_path]),
("thunar", ["--select", file_path]),
("pcmanfm", ["--select", file_path]),
]
for fm_name, fm_args in file_managers:
try:
result = subprocess.run(
[fm_name] + fm_args,
check=False,
timeout=5
)
if result.returncode == 0:
opened = True
break
except (FileNotFoundError, subprocess.TimeoutExpired):
continue
# Fallback: try xdg-open with the folder (will open folder but won't select file)
if not opened:
subprocess.run(["xdg-open", folder], check=False)
return {
"message": f"Opened folder and selected file: {os.path.basename(file_path)}",
"folder": folder,
"file": file_path
}
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to open folder: {str(e)}",
)