Some checks failed
CI / skip-ci-check (pull_request) Successful in 1m4s
CI / lint-and-type-check (pull_request) Has been cancelled
CI / python-lint (pull_request) Has been cancelled
CI / test-backend (pull_request) Has been cancelled
CI / build (pull_request) Has been cancelled
CI / secret-scanning (pull_request) Has been cancelled
CI / dependency-scan (pull_request) Has been cancelled
CI / sast-scan (pull_request) Has been cancelled
CI / workflow-summary (pull_request) Has been cancelled
Add on-demand H.264/AAC web playback (RQ, ffmpeg) with API routes and Next.js proxies; extend admin UI with WebPlaybackVideo and shared hooks. Store transcode cache beside pending-photos (WEB_VIDEO_CACHE_DIR / UPLOAD_DIR) and ignore data/web_videos. Centralize FastAPI URL helpers, optional Vite and Next base paths for subfolder deploy, and fix modal reopen by using router.replace when closing the home photo viewer. Include migration, install scripts, deployment doc updates, and CI admin build env tweak. Made-with: Cursor
423 lines
15 KiB
Python
423 lines
15 KiB
Python
"""RQ worker tasks for PunimTag."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from typing import Optional
|
|
|
|
from rq import get_current_job
|
|
from sqlalchemy.orm import Session
|
|
|
|
from backend.db.session import SessionLocal
|
|
from backend.services.photo_service import import_photos_from_folder
|
|
from backend.services.face_service import process_unprocessed_photos
|
|
|
|
|
|
def import_photos_task(folder_path: str, recursive: bool = True) -> dict:
|
|
"""RQ task to import photos from a folder.
|
|
|
|
Updates job metadata with progress:
|
|
- progress: 0-100
|
|
- message: status message
|
|
- processed: number of photos processed
|
|
- total: total photos found
|
|
- added: number of new photos added
|
|
- existing: number of photos that already existed
|
|
"""
|
|
job = get_current_job()
|
|
if not job:
|
|
raise RuntimeError("Not running in RQ job context")
|
|
|
|
db: Session = SessionLocal()
|
|
|
|
try:
|
|
def update_progress(processed: int, total: int, current_file: str) -> None:
|
|
"""Update job progress."""
|
|
if job:
|
|
progress = int((processed / total) * 100) if total > 0 else 0
|
|
job.meta = {
|
|
"progress": progress,
|
|
"message": f"Processing {current_file}... ({processed}/{total})",
|
|
"processed": processed,
|
|
"total": total,
|
|
}
|
|
job.save_meta()
|
|
|
|
# Import photos
|
|
added, existing = import_photos_from_folder(
|
|
db, folder_path, recursive, update_progress
|
|
)
|
|
|
|
# Final update
|
|
total_processed = added + existing
|
|
result = {
|
|
"folder_path": folder_path,
|
|
"recursive": recursive,
|
|
"added": added,
|
|
"existing": existing,
|
|
"total": total_processed,
|
|
}
|
|
|
|
if job:
|
|
job.meta = {
|
|
"progress": 100,
|
|
"message": f"Completed: {added} new, {existing} existing",
|
|
"processed": total_processed,
|
|
"total": total_processed,
|
|
"added": added,
|
|
"existing": existing,
|
|
}
|
|
job.save_meta()
|
|
|
|
return result
|
|
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
def process_faces_task(
|
|
batch_size: Optional[int] = None,
|
|
detector_backend: str = "retinaface",
|
|
model_name: str = "ArcFace",
|
|
) -> dict:
|
|
"""RQ task to process faces in unprocessed photos.
|
|
|
|
Updates job metadata with progress:
|
|
- progress: 0-100
|
|
- message: status message
|
|
- processed: number of photos processed
|
|
- total: total photos to process
|
|
- faces_detected: total faces detected
|
|
- faces_stored: total faces stored
|
|
"""
|
|
import traceback
|
|
|
|
job = get_current_job()
|
|
if not job:
|
|
raise RuntimeError("Not running in RQ job context")
|
|
|
|
print(f"[Task] Starting face processing task: job_id={job.id}, batch_size={batch_size}, detector={detector_backend}, model={model_name}")
|
|
|
|
# Update progress immediately - job started
|
|
try:
|
|
if job:
|
|
job.meta = {
|
|
"progress": 0,
|
|
"message": "Initializing face processing...",
|
|
"processed": 0,
|
|
"total": 0,
|
|
"faces_detected": 0,
|
|
"faces_stored": 0,
|
|
}
|
|
job.save_meta()
|
|
except Exception as e:
|
|
print(f"[Task] Error setting initial job metadata: {e}")
|
|
|
|
db: Session = SessionLocal()
|
|
|
|
# Initialize result variables
|
|
photos_processed = 0
|
|
total_faces_detected = 0
|
|
total_faces_stored = 0
|
|
|
|
def refresh_db_session():
|
|
"""Refresh database session if it becomes stale or disconnected.
|
|
|
|
This prevents crashes when the database connection is lost during long-running
|
|
processing tasks. Closes the old session and creates a new one.
|
|
"""
|
|
nonlocal db
|
|
try:
|
|
# Test if the session is still alive by executing a simple query
|
|
from sqlalchemy import text
|
|
db.execute(text("SELECT 1"))
|
|
db.commit() # Ensure transaction is clean
|
|
except Exception as e:
|
|
# Session is stale or disconnected - create a new one
|
|
try:
|
|
print(f"[Task] Database session disconnected, refreshing... Error: {e}")
|
|
except (BrokenPipeError, OSError):
|
|
pass
|
|
try:
|
|
db.close()
|
|
except Exception:
|
|
pass
|
|
db = SessionLocal()
|
|
try:
|
|
print(f"[Task] Database session refreshed")
|
|
except (BrokenPipeError, OSError):
|
|
pass
|
|
|
|
try:
|
|
def update_progress(
|
|
processed: int,
|
|
total: int,
|
|
current_file: str,
|
|
faces_detected: int,
|
|
faces_stored: int,
|
|
) -> None:
|
|
"""Update job progress.
|
|
|
|
NOTE: This function does NOT check for cancellation to avoid interrupting
|
|
photo processing. Cancellation is checked in process_unprocessed_photos
|
|
BEFORE starting each photo and AFTER completing it, ensuring the current
|
|
photo always finishes fully.
|
|
"""
|
|
try:
|
|
if job:
|
|
# Calculate progress: 10% for setup, 90% for processing
|
|
if total == 0:
|
|
# Setup phase
|
|
progress = min(10, processed * 2) # 0-10% during setup
|
|
else:
|
|
# Processing phase
|
|
progress = 10 + int((processed / total) * 90) if total > 0 else 10
|
|
|
|
# Avoid duplicating "Processing" if current_file already contains it
|
|
if current_file.startswith("Processing "):
|
|
message = f"{current_file} ({processed}/{total})" if total > 0 else current_file
|
|
else:
|
|
message = f"Processing {current_file}... ({processed}/{total})" if total > 0 else current_file
|
|
|
|
job.meta = {
|
|
"progress": progress,
|
|
"message": message,
|
|
"processed": processed,
|
|
"total": total,
|
|
"faces_detected": faces_detected,
|
|
"faces_stored": faces_stored,
|
|
}
|
|
job.save_meta()
|
|
except Exception as e:
|
|
# Log but don't fail the batch if progress update fails
|
|
try:
|
|
print(f"[Task] Warning: Failed to update progress: {e}")
|
|
except (BrokenPipeError, OSError):
|
|
pass
|
|
|
|
# Update progress - finding photos
|
|
if job:
|
|
job.meta = {
|
|
"progress": 5,
|
|
"message": "Finding photos to process...",
|
|
"processed": 0,
|
|
"total": 0,
|
|
"faces_detected": 0,
|
|
"faces_stored": 0,
|
|
}
|
|
job.save_meta()
|
|
|
|
# Process faces
|
|
# Wrap in try-except to ensure we preserve progress even if process_unprocessed_photos fails
|
|
try:
|
|
# Refresh session before starting processing to ensure it's healthy
|
|
refresh_db_session()
|
|
|
|
photos_processed, total_faces_detected, total_faces_stored = (
|
|
process_unprocessed_photos(
|
|
db,
|
|
batch_size=batch_size,
|
|
detector_backend=detector_backend,
|
|
model_name=model_name,
|
|
update_progress=update_progress,
|
|
)
|
|
)
|
|
except Exception as e:
|
|
# Check if it's a database connection error
|
|
error_str = str(e).lower()
|
|
is_db_error = any(keyword in error_str for keyword in [
|
|
'connection', 'disconnect', 'timeout', 'closed', 'lost',
|
|
'operationalerror', 'database', 'server closed', 'connection reset',
|
|
'connection pool', 'connection refused', 'session needs refresh'
|
|
])
|
|
|
|
if is_db_error:
|
|
# Try to refresh the session - this helps if the error is recoverable
|
|
# but we don't retry the entire batch to avoid reprocessing photos
|
|
try:
|
|
print(f"[Task] Database error detected, attempting to refresh session: {e}")
|
|
refresh_db_session()
|
|
print(f"[Task] Session refreshed - job will fail gracefully. Restart job to continue processing remaining photos.")
|
|
except Exception as refresh_error:
|
|
try:
|
|
print(f"[Task] Failed to refresh database session: {refresh_error}")
|
|
except (BrokenPipeError, OSError):
|
|
pass
|
|
|
|
# If process_unprocessed_photos fails, preserve any progress made
|
|
# and re-raise so the outer handler can log it properly
|
|
try:
|
|
print(f"[Task] Error in process_unprocessed_photos: {e}")
|
|
except (BrokenPipeError, OSError):
|
|
pass
|
|
# Re-raise to be handled by outer exception handler
|
|
raise
|
|
|
|
# Final update
|
|
result = {
|
|
"photos_processed": photos_processed,
|
|
"faces_detected": total_faces_detected,
|
|
"faces_stored": total_faces_stored,
|
|
"detector_backend": detector_backend,
|
|
"model_name": model_name,
|
|
}
|
|
|
|
if job:
|
|
job.meta = {
|
|
"progress": 100,
|
|
"message": (
|
|
f"Completed: {photos_processed} photos, "
|
|
f"{total_faces_stored} faces stored"
|
|
),
|
|
"processed": photos_processed,
|
|
"total": photos_processed,
|
|
"faces_detected": total_faces_detected,
|
|
"faces_stored": total_faces_stored,
|
|
}
|
|
job.save_meta()
|
|
|
|
return result
|
|
|
|
except KeyboardInterrupt as e:
|
|
# Job was cancelled - exit gracefully
|
|
print(f"[Task] Job {job.id if job else 'unknown'} cancelled by user")
|
|
if job:
|
|
try:
|
|
job.meta = job.meta or {}
|
|
job.meta.update({
|
|
"progress": job.meta.get("progress", 0),
|
|
"message": "Cancelled by user - finished current photo",
|
|
"cancelled": True,
|
|
"processed": job.meta.get("processed", photos_processed),
|
|
"total": job.meta.get("total", 0),
|
|
"faces_detected": job.meta.get("faces_detected", total_faces_detected),
|
|
"faces_stored": job.meta.get("faces_stored", total_faces_stored),
|
|
})
|
|
job.save_meta()
|
|
except Exception:
|
|
pass
|
|
# Don't re-raise - job cancellation is not a failure
|
|
return {
|
|
"photos_processed": photos_processed,
|
|
"faces_detected": total_faces_detected,
|
|
"faces_stored": total_faces_stored,
|
|
"detector_backend": detector_backend,
|
|
"model_name": model_name,
|
|
"cancelled": True,
|
|
}
|
|
|
|
except Exception as e:
|
|
# Log error and update job metadata
|
|
error_msg = f"Task failed: {str(e)}"
|
|
try:
|
|
print(f"[Task] ❌ {error_msg}")
|
|
except (BrokenPipeError, OSError):
|
|
pass # Ignore broken pipe errors when printing
|
|
|
|
# Try to print traceback, but don't fail if stdout is closed
|
|
try:
|
|
traceback.print_exc()
|
|
except (BrokenPipeError, OSError):
|
|
# If printing fails, at least log the error type
|
|
try:
|
|
print(f"[Task] Error type: {type(e).__name__}: {str(e)}")
|
|
except (BrokenPipeError, OSError):
|
|
pass
|
|
|
|
if job:
|
|
try:
|
|
# Preserve progress information if available
|
|
existing_meta = job.meta or {}
|
|
job.meta = {
|
|
"progress": existing_meta.get("progress", 0),
|
|
"message": error_msg,
|
|
"processed": existing_meta.get("processed", photos_processed),
|
|
"total": existing_meta.get("total", 0),
|
|
"faces_detected": existing_meta.get("faces_detected", total_faces_detected),
|
|
"faces_stored": existing_meta.get("faces_stored", total_faces_stored),
|
|
}
|
|
job.save_meta()
|
|
except Exception:
|
|
pass
|
|
|
|
# Re-raise so RQ marks job as failed
|
|
raise
|
|
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
def transcode_video_for_web_task(photo_id: int) -> dict:
|
|
"""Background job: build browser-safe MP4 (or mark original as ready if H.264/AAC)."""
|
|
import os
|
|
import traceback
|
|
|
|
from backend.db.models import Photo
|
|
from backend.db.session import SessionLocal
|
|
from backend.services.web_video_service import (
|
|
derived_mp4_path,
|
|
expire_web_playable_if_stale,
|
|
probe_browser_safe_video,
|
|
run_ffmpeg_web_transcode,
|
|
web_playback_is_ready,
|
|
)
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
photo = (
|
|
db.query(Photo)
|
|
.filter(Photo.id == photo_id)
|
|
.with_for_update()
|
|
.first()
|
|
)
|
|
if not photo or photo.media_type != "video":
|
|
return {"ok": False, "error": "not_found"}
|
|
|
|
expire_web_playable_if_stale(photo)
|
|
if web_playback_is_ready(photo):
|
|
db.commit()
|
|
return {"ok": True, "skipped": True}
|
|
|
|
photo.web_transcode_status = "running"
|
|
photo.web_transcode_error = None
|
|
db.commit()
|
|
|
|
if not os.path.isfile(photo.path):
|
|
raise RuntimeError("Source video file not found on disk")
|
|
|
|
if probe_browser_safe_video(photo.path):
|
|
photo.web_playable_path = photo.path
|
|
photo.web_transcode_status = "ready"
|
|
photo.web_transcode_error = None
|
|
db.commit()
|
|
return {"ok": True, "native": True}
|
|
|
|
dst = derived_mp4_path(photo_id)
|
|
run_ffmpeg_web_transcode(photo.path, dst)
|
|
photo.web_playable_path = str(dst)
|
|
photo.web_transcode_status = "ready"
|
|
photo.web_transcode_error = None
|
|
db.commit()
|
|
return {"ok": True, "transcoded": True}
|
|
except Exception as e:
|
|
try:
|
|
db.rollback()
|
|
except Exception:
|
|
pass
|
|
try:
|
|
print(f"[Task] web transcode failed for photo {photo_id}: {e}")
|
|
traceback.print_exc()
|
|
except (BrokenPipeError, OSError):
|
|
pass
|
|
try:
|
|
photo = db.query(Photo).filter(Photo.id == photo_id).first()
|
|
if photo:
|
|
photo.web_transcode_status = "failed"
|
|
photo.web_transcode_error = str(e)[:2000]
|
|
db.commit()
|
|
except Exception:
|
|
pass
|
|
return {"ok": False, "error": str(e)}
|
|
finally:
|
|
db.close()
|
|
|