feat: Add media_type column to photos table and enhance video handling

This commit introduces a new column, media_type, to the photos table to differentiate between image and video files. The ensure_photo_media_type_column function has been added to manage the database schema changes. Additionally, the photo and video processing logic has been updated to skip videos during face detection and to extract metadata from videos, including the date taken. The find_photos_in_folder function now supports both image and video formats, improving the overall media management capabilities. Documentation has been updated to reflect these changes.
This commit is contained in:
tanyar09 2025-12-01 12:21:24 -05:00
parent a888968a97
commit 9c6a2ff05e
5 changed files with 174 additions and 11 deletions

View File

@ -324,6 +324,51 @@ def ensure_user_role_column(inspector) -> None:
print("✅ Ensured users.role column exists and is populated")
def ensure_photo_media_type_column(inspector) -> None:
    """Ensure the photos table contains a media_type column.

    Adds the column (defaulting to 'image') plus a supporting index when
    missing. No-op when the photos table does not exist yet or the column
    is already present.

    Args:
        inspector: SQLAlchemy inspector bound to the application engine.
    """
    if "photos" not in inspector.get_table_names():
        return
    columns = {column["name"] for column in inspector.get_columns("photos")}
    if "media_type" in columns:
        print(" media_type column already exists in photos table")
        return
    print("🔄 Adding media_type column to photos table...")
    dialect = engine.dialect.name
    # engine.begin() opens a connection and a transaction in one step and
    # commits on success — equivalent to the connect() + begin() pair.
    with engine.begin() as connection:
        if dialect == "postgresql":
            connection.execute(
                text("ALTER TABLE photos ADD COLUMN IF NOT EXISTS media_type TEXT NOT NULL DEFAULT 'image'")
            )
            # Best-effort index creation; IF NOT EXISTS makes it idempotent.
            try:
                connection.execute(
                    text("CREATE INDEX IF NOT EXISTS idx_photos_media_type ON photos(media_type)")
                )
            except Exception:
                pass  # Index might already exist
        else:
            # SQLite: ALTER TABLE has no IF NOT EXISTS for columns, and a
            # NOT NULL constraint cannot be added to an existing table, so
            # add a plain column with a default instead.
            connection.execute(
                text("ALTER TABLE photos ADD COLUMN media_type TEXT DEFAULT 'image'")
            )
            # Defensive backfill: SQLite applies the constant DEFAULT to
            # existing rows on ADD COLUMN, but this guards against NULLs
            # from any earlier partial migration.
            connection.execute(
                text("UPDATE photos SET media_type = 'image' WHERE media_type IS NULL")
            )
            # SQLite *does* support CREATE INDEX IF NOT EXISTS (since 3.3.0);
            # use it, keeping the best-effort guard for parity with the
            # postgres branch.
            try:
                connection.execute(
                    text("CREATE INDEX IF NOT EXISTS idx_photos_media_type ON photos(media_type)")
                )
            except Exception:
                pass  # Index might already exist
    print("✅ Added media_type column to photos table")
def ensure_role_permissions_table(inspector) -> None:
"""Ensure the role_permissions table exists for permission matrix."""
if "role_permissions" in inspector.get_table_names():
@ -373,6 +418,7 @@ async def lifespan(app: FastAPI):
ensure_user_email_unique_constraint(inspector)
ensure_face_identified_by_user_id_column(inspector)
ensure_user_role_column(inspector)
ensure_photo_media_type_column(inspector)
ensure_role_permissions_table(inspector)
except Exception as exc:
print(f"❌ Database initialization failed: {exc}")

View File

@ -9,6 +9,9 @@ from __future__ import annotations
# Supported image formats for uploads/imports (lowercase, dot-prefixed
# extensions; callers compare against Path(...).suffix.lower()).
SUPPORTED_IMAGE_FORMATS = {".jpg", ".jpeg", ".png", ".bmp", ".tiff", ".tif"}
# Supported video formats for scanning (not processed for faces)
SUPPORTED_VIDEO_FORMATS = {".mp4", ".mov", ".avi", ".mkv", ".webm", ".m4v", ".flv", ".wmv", ".mpg", ".mpeg"}
# DeepFace behavior
# NOTE(review): presumably passed through to DeepFace calls — False means
# "don't raise when no face is detected"; True enables face alignment
# before embedding. Confirm against the face-processing module.
DEEPFACE_ENFORCE_DETECTION = False
DEEPFACE_ALIGN_FACES = True

View File

@ -41,6 +41,7 @@ class Photo(Base):
date_taken = Column(Date, nullable=True, index=True)
processed = Column(Boolean, default=False, nullable=False, index=True)
file_hash = Column(Text, nullable=False, index=True)
media_type = Column(Text, default="image", nullable=False, index=True) # "image" or "video"
faces = relationship("Face", back_populates="photo", cascade="all, delete-orphan")
photo_tags = relationship(

View File

@ -306,6 +306,14 @@ def process_photo_faces(
if not os.path.exists(photo_path):
return 0, 0
# Skip videos (videos are not processed for face detection)
try:
media_type = getattr(photo, 'media_type', 'image')
if media_type == 'video':
return 0, 0
except Exception:
pass
# Skip if already processed (desktop parity)
try:
if getattr(photo, 'processed', False):
@ -983,8 +991,13 @@ def process_unprocessed_photos(
update_progress(0, 0, f"{batch_msg} that need processing...", 0, 0)
# Desktop parity: find photos that are not yet processed
# Also filter out videos (only process images for face detection)
query_start = time.time()
unprocessed_query = db.query(Photo).filter(getattr(Photo, 'processed') == False) # noqa: E712
# Filter for unprocessed photos, excluding videos
unprocessed_query = db.query(Photo).filter(
Photo.processed == False, # noqa: E712
Photo.media_type != 'video' # Skip videos (videos are marked as processed and not processed for faces)
)
# Apply batch size limit BEFORE executing query to avoid loading unnecessary photos
# When batch_size is set, only that many photos are fetched from the database

View File

@ -11,7 +11,7 @@ from typing import Callable, Optional, Tuple
from PIL import Image
from sqlalchemy.orm import Session
from src.web.config import SUPPORTED_IMAGE_FORMATS
from src.web.config import SUPPORTED_IMAGE_FORMATS, SUPPORTED_VIDEO_FORMATS
from src.web.db.models import Photo
@ -127,6 +127,80 @@ def calculate_file_hash(file_path: str) -> str:
raise
def extract_video_date(video_path: str) -> Optional[date]:
    """Extract the date a video was taken.

    Tries, in order:
    1. Container metadata via ffprobe (if installed).
    2. File modification time (fallback).

    Args:
        video_path: Path to the video file.

    Returns:
        Date object, or None if no date can be determined (e.g. the file
        does not exist).
    """
    import json
    import logging
    import subprocess

    logger = logging.getLogger(__name__)

    # --- 1. Container metadata via ffprobe --------------------------------
    try:
        result = subprocess.run(
            [
                "ffprobe",
                "-v", "quiet",
                "-print_format", "json",
                "-show_format",
                video_path,
            ],
            capture_output=True,
            text=True,
            timeout=10,  # don't let a bad file stall the import pipeline
        )
        if result.returncode == 0:
            metadata = json.loads(result.stdout)
            tags = metadata.get("format", {}).get("tags", {})
            # Common creation-date tags, most specific containers first.
            date_tags = (
                "creation_time",                      # Common in MP4/MOV
                "date",                               # Alternative tag
                "com.apple.quicktime.creationdate",   # QuickTime specific
            )
            for tag in date_tags:
                date_str = tags.get(tag)
                if not date_str:
                    continue
                try:
                    if "T" in date_str:
                        # ISO 8601, e.g. 2023-12-25T10:30:00Z
                        dt = datetime.fromisoformat(date_str.replace("Z", "+00:00"))
                    else:
                        # Other common format, e.g. 2023-12-25 10:30:00
                        dt = datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S")
                    return dt.date()
                except (ValueError, AttributeError):
                    continue
    except Exception as exc:
        # Best-effort: ffprobe missing (FileNotFoundError), timed out
        # (TimeoutExpired), or bad JSON (JSONDecodeError) — Exception
        # already covers all of them, so a single catch suffices.
        # Fall through to the mtime fallback.
        logger.debug("Failed to extract video metadata from %s: %s", video_path, exc)

    # --- 2. File modification time -----------------------------------------
    try:
        if os.path.exists(video_path):
            mtime = os.path.getmtime(video_path)
            return datetime.fromtimestamp(mtime).date()
    except Exception as exc:
        # Log for debugging, but never fail the import over a missing date.
        logger.debug("Failed to get file modification time from %s: %s", video_path, exc)
    return None
def extract_photo_date(image_path: str) -> Optional[date]:
"""Extract date taken from photo with fallback to file modification time.
@ -158,24 +232,29 @@ def extract_photo_date(image_path: str) -> Optional[date]:
def find_photos_in_folder(folder_path: str, recursive: bool = True) -> list[str]:
"""Find all photo files in a folder."""
"""Find all photo and video files in a folder.
Returns both image and video files. Videos are scanned but not processed for faces.
"""
folder_path = os.path.abspath(folder_path)
if not os.path.isdir(folder_path):
return []
found_photos = []
# Combine image and video formats
supported_formats = SUPPORTED_IMAGE_FORMATS | SUPPORTED_VIDEO_FORMATS
if recursive:
for root, _dirs, files in os.walk(folder_path):
for file in files:
file_ext = Path(file).suffix.lower()
if file_ext in SUPPORTED_IMAGE_FORMATS:
if file_ext in supported_formats:
photo_path = os.path.join(root, file)
found_photos.append(photo_path)
else:
for file in os.listdir(folder_path):
file_ext = Path(file).suffix.lower()
if file_ext in SUPPORTED_IMAGE_FORMATS:
if file_ext in supported_formats:
photo_path = os.path.join(folder_path, file)
if os.path.isfile(photo_path):
found_photos.append(photo_path)
@ -186,13 +265,20 @@ def find_photos_in_folder(folder_path: str, recursive: bool = True) -> list[str]
def import_photo_from_path(
db: Session, photo_path: str, update_progress: Optional[Callable[[int, int, str], None]] = None
) -> Tuple[Optional[Photo], bool]:
"""Import a single photo from file path into database.
"""Import a single photo or video from file path into database.
Returns:
Tuple of (Photo instance or None, is_new: bool)
"""
photo_path = os.path.abspath(photo_path)
filename = os.path.basename(photo_path)
file_ext = Path(photo_path).suffix.lower()
# Determine media type
if file_ext in SUPPORTED_VIDEO_FORMATS:
media_type = "video"
else:
media_type = "image"
# Calculate file hash for duplicate detection
try:
@ -209,7 +295,10 @@ def import_photo_from_path(
if existing:
# If existing photo doesn't have date_taken, try to update it
if existing.date_taken is None:
date_taken = extract_photo_date(photo_path)
if media_type == "video":
date_taken = extract_video_date(photo_path)
else:
date_taken = extract_photo_date(photo_path)
if date_taken:
existing.date_taken = date_taken
db.commit()
@ -226,7 +315,10 @@ def import_photo_from_path(
db.refresh(existing_by_path)
# If existing photo doesn't have date_taken, try to update it
if existing_by_path.date_taken is None:
date_taken = extract_photo_date(photo_path)
if media_type == "video":
date_taken = extract_video_date(photo_path)
else:
date_taken = extract_photo_date(photo_path)
if date_taken:
existing_by_path.date_taken = date_taken
db.commit()
@ -234,15 +326,23 @@ def import_photo_from_path(
return existing_by_path, False
# Extract date taken with fallback to file modification time
date_taken = extract_photo_date(photo_path)
if media_type == "video":
date_taken = extract_video_date(photo_path)
else:
date_taken = extract_photo_date(photo_path)
# Create new photo record with file_hash
# For videos, mark as processed immediately (we don't process videos for faces)
# For images, start as unprocessed
processed = media_type == "video"
# Create new photo record with file_hash and media_type
photo = Photo(
path=photo_path,
filename=filename,
date_taken=date_taken,
processed=False,
processed=processed,
file_hash=file_hash,
media_type=media_type,
)
db.add(photo)