This commit adds several analysis documents, including the Auto-Match Load Performance Analysis, the Folder Picker Analysis, the Monorepo Migration Summary, and several other performance analyses. It also updates the installation scripts to use the new backend service paths, so that setup works with the restructured backend. Together, these changes improve the documentation and simplify the setup process for users.
261 lines
8.4 KiB
Python
261 lines
8.4 KiB
Python
"""RQ worker tasks for PunimTag."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from typing import Optional
|
|
|
|
from rq import get_current_job
|
|
from sqlalchemy.orm import Session
|
|
|
|
from backend.db.session import SessionLocal
|
|
from backend.services.photo_service import import_photos_from_folder
|
|
from backend.services.face_service import process_unprocessed_photos
|
|
|
|
|
|
def import_photos_task(folder_path: str, recursive: bool = True) -> dict:
    """RQ task that imports the photos found under ``folder_path``.

    Progress is published through the job's metadata (``job.meta``):

    - progress: 0-100
    - message: status message
    - processed: number of photos processed
    - total: total photos found
    - added: number of new photos added (final update only)
    - existing: number of photos that already existed (final update only)

    Args:
        folder_path: Directory handed to ``import_photos_from_folder``.
        recursive: Forwarded to ``import_photos_from_folder``; presumably
            controls whether sub-directories are scanned (verify there).

    Returns:
        Summary dict with ``folder_path``, ``recursive``, ``added``,
        ``existing`` and ``total`` (``added + existing``).

    Raises:
        RuntimeError: If called outside of an RQ job.
    """
    current_job = get_current_job()
    if not current_job:
        raise RuntimeError("Not running in RQ job context")

    def report(processed: int, total: int, current_file: str) -> None:
        """Publish per-file progress to the job's metadata."""
        # Guard against division by zero before any photos are found.
        if total > 0:
            percent = int((processed / total) * 100)
        else:
            percent = 0
        current_job.meta = {
            "progress": percent,
            "message": f"Processing {current_file}... ({processed}/{total})",
            "processed": processed,
            "total": total,
        }
        current_job.save_meta()

    session: Session = SessionLocal()
    try:
        added, existing = import_photos_from_folder(
            session, folder_path, recursive, report
        )
        grand_total = added + existing

        # Final update: replace the per-file meta with the completed summary.
        current_job.meta = {
            "progress": 100,
            "message": f"Completed: {added} new, {existing} existing",
            "processed": grand_total,
            "total": grand_total,
            "added": added,
            "existing": existing,
        }
        current_job.save_meta()

        return {
            "folder_path": folder_path,
            "recursive": recursive,
            "added": added,
            "existing": existing,
            "total": grand_total,
        }
    finally:
        # Always release the DB session, even if the import raised.
        session.close()
|
|
|
|
|
|
def _is_job_cancelled(job) -> bool:
    """Return True if ``job``'s stored metadata carries ``"cancelled": True``.

    The worker's ``job.meta`` is a local copy. A ``"cancelled"`` flag written
    by another process (presumably the API's cancel handler -- verify) is only
    visible after re-reading the metadata from Redis. Re-reading also replaces
    ``job.meta`` with the fresh value.
    """
    get_meta = getattr(job, "get_meta", None)
    if get_meta is not None:
        meta = get_meta(refresh=True)
    else:
        # Older RQ versions without Job.get_meta: reload the whole job.
        job.refresh()
        meta = job.meta
    return bool((meta or {}).get("cancelled", False))


def process_faces_task(
    batch_size: Optional[int] = None,
    detector_backend: str = "retinaface",
    model_name: str = "ArcFace",
) -> dict:
    """RQ task to process faces in unprocessed photos.

    Updates job metadata with progress:
    - progress: 0-100 (0-10 during setup, 10-100 while processing photos)
    - message: status message
    - processed: number of photos processed
    - total: total photos to process
    - faces_detected: total faces detected
    - faces_stored: total faces stored

    Cancellation: if ``"cancelled": True`` is stored in the job's metadata
    while the task runs, the next progress update raises ``KeyboardInterrupt``.
    The task then records ``cancelled=True`` in the metadata and returns a
    result with ``"cancelled": True`` instead of failing the job.

    Args:
        batch_size: Forwarded unchanged to ``process_unprocessed_photos``.
        detector_backend: Forwarded unchanged to ``process_unprocessed_photos``.
        model_name: Forwarded unchanged to ``process_unprocessed_photos``.

    Returns:
        Dict with ``photos_processed``, ``faces_detected``, ``faces_stored``,
        ``detector_backend`` and ``model_name``, plus ``cancelled`` when the
        job was cancelled.

    Raises:
        RuntimeError: If called outside of an RQ job.
        Exception: Any other error is printed, written to the job metadata and
            re-raised so RQ marks the job as failed.
    """
    import traceback

    job = get_current_job()
    if not job:
        raise RuntimeError("Not running in RQ job context")

    print(f"[Task] Starting face processing task: job_id={job.id}, batch_size={batch_size}, detector={detector_backend}, model={model_name}")

    # Update progress immediately - job started (best effort).
    try:
        job.meta = {
            "progress": 0,
            "message": "Initializing face processing...",
            "processed": 0,
            "total": 0,
            "faces_detected": 0,
            "faces_stored": 0,
        }
        job.save_meta()
    except Exception as e:
        print(f"[Task] Error setting initial job metadata: {e}")

    db: Session = SessionLocal()

    # Result counters; they stay 0 unless process_unprocessed_photos returns.
    photos_processed = 0
    total_faces_detected = 0
    total_faces_stored = 0

    try:
        def update_progress(
            processed: int,
            total: int,
            current_file: str,
            faces_detected: int,
            faces_stored: int,
        ) -> None:
            """Publish progress to job.meta; abort if cancellation was requested.

            Raises:
                KeyboardInterrupt: When the stored job metadata has been
                    flagged as cancelled. It is raised (not returned) so that,
                    unless process_unprocessed_photos catches it, it reaches
                    the cancellation handler below and stops processing.
            """
            # Check BEFORE writing: save_meta() overwrites the whole stored
            # meta, so writing first would erase a pending "cancelled" flag.
            if _is_job_cancelled(job):
                print(f"[Task] Job {job.id} cancellation detected")
                raise KeyboardInterrupt("Job cancelled by user")

            # Progress split: 0-10% for setup (total == 0), 10-100% for photos.
            if total == 0:
                progress = min(10, processed * 2)
            elif total > 0:
                progress = 10 + int((processed / total) * 90)
            else:
                progress = 10

            job.meta = {
                "progress": progress,
                "message": f"Processing {current_file}... ({processed}/{total})" if total > 0 else current_file,
                "processed": processed,
                "total": total,
                "faces_detected": faces_detected,
                "faces_stored": faces_stored,
            }
            job.save_meta()

        # Honour a cancel that arrived before processing starts; checked before
        # the write below, which would otherwise overwrite the flag.
        if _is_job_cancelled(job):
            print(f"[Task] Job {job.id} cancellation detected")
            raise KeyboardInterrupt("Job cancelled by user")

        # Update progress - finding photos
        job.meta = {
            "progress": 5,
            "message": "Finding photos to process...",
            "processed": 0,
            "total": 0,
            "faces_detected": 0,
            "faces_stored": 0,
        }
        job.save_meta()

        # Process faces
        photos_processed, total_faces_detected, total_faces_stored = (
            process_unprocessed_photos(
                db,
                batch_size=batch_size,
                detector_backend=detector_backend,
                model_name=model_name,
                update_progress=update_progress,
            )
        )

        result = {
            "photos_processed": photos_processed,
            "faces_detected": total_faces_detected,
            "faces_stored": total_faces_stored,
            "detector_backend": detector_backend,
            "model_name": model_name,
        }

        # Final update
        job.meta = {
            "progress": 100,
            "message": (
                f"Completed: {photos_processed} photos, "
                f"{total_faces_stored} faces stored"
            ),
            "processed": photos_processed,
            "total": photos_processed,
            "faces_detected": total_faces_detected,
            "faces_stored": total_faces_stored,
        }
        job.save_meta()

        return result

    except KeyboardInterrupt:
        # Job was cancelled - exit gracefully. job.meta here is the copy
        # re-read by _is_job_cancelled (when that raised), so the last
        # published progress values are preserved.
        print(f"[Task] Job {job.id} cancelled by user")
        try:
            job.meta = job.meta or {}
            job.meta.update({
                "progress": job.meta.get("progress", 0),
                "message": "Cancelled by user - finished current photo",
                "cancelled": True,
                "processed": job.meta.get("processed", photos_processed),
                "total": job.meta.get("total", 0),
                "faces_detected": job.meta.get("faces_detected", total_faces_detected),
                "faces_stored": job.meta.get("faces_stored", total_faces_stored),
            })
            job.save_meta()
        except Exception as meta_err:
            # Best effort: a metadata failure must not turn a cancel into a crash.
            print(f"[Task] Error saving cancellation metadata: {meta_err}")
        # Don't re-raise - job cancellation is not a failure
        return {
            "photos_processed": photos_processed,
            "faces_detected": total_faces_detected,
            "faces_stored": total_faces_stored,
            "detector_backend": detector_backend,
            "model_name": model_name,
            "cancelled": True,
        }

    except Exception as e:
        # Log error and update job metadata
        error_msg = f"Task failed: {str(e)}"
        print(f"[Task] ❌ {error_msg}")
        traceback.print_exc()

        try:
            job.meta = {
                "progress": 0,
                "message": error_msg,
                "processed": 0,
                "total": 0,
                "faces_detected": 0,
                "faces_stored": 0,
            }
            job.save_meta()
        except Exception as meta_err:
            # Best effort: never let a metadata failure mask the real error.
            print(f"[Task] Error saving failure metadata: {meta_err}")

        # Re-raise so RQ marks job as failed
        raise

    finally:
        db.close()
|
|
|