diff --git a/README.md b/README.md index c83dbf9..b72830a 100644 --- a/README.md +++ b/README.md @@ -64,7 +64,23 @@ This creates the SQLite database at `data/punimtag.db` (default). For PostgreSQL ### Running the Application -**Terminal 1 - Backend API:** +**Prerequisites:** +- Redis must be running (for background jobs) + ```bash + # Check if Redis is running + redis-cli ping + # If not running, start Redis: + # On Linux: sudo systemctl start redis + # On macOS with Homebrew: brew services start redis + # Or run directly: redis-server + ``` + +**Terminal 1 - Redis (if not running as service):** +```bash +redis-server +``` + +**Terminal 2 - Backend API:** ```bash cd /home/ladmin/Code/punimtag source venv/bin/activate @@ -72,7 +88,15 @@ export PYTHONPATH=/home/ladmin/Code/punimtag uvicorn src.web.app:app --host 127.0.0.1 --port 8000 ``` -**Terminal 2 - Frontend:** +**Terminal 3 - RQ Worker (required for photo import):** +```bash +cd /home/ladmin/Code/punimtag +source venv/bin/activate +export PYTHONPATH=/home/ladmin/Code/punimtag +python -m src.web.worker +``` + +**Terminal 4 - Frontend:** ```bash cd /home/ladmin/Code/punimtag/frontend npm run dev @@ -84,6 +108,8 @@ Then open your browser to **http://localhost:3000** - Username: `admin` - Password: `admin` +**Note:** The RQ worker (Terminal 3) is required for background photo import jobs. Without it, jobs will remain in "Pending" status. 
+ --- ## 📖 Documentation diff --git a/frontend/src/api/jobs.ts b/frontend/src/api/jobs.ts new file mode 100644 index 0000000..7e80773 --- /dev/null +++ b/frontend/src/api/jobs.ts @@ -0,0 +1,28 @@ +import apiClient from './client' + +export enum JobStatus { + PENDING = 'pending', + STARTED = 'started', + PROGRESS = 'progress', + SUCCESS = 'success', + FAILURE = 'failure', +} + +export interface JobResponse { + id: string + status: JobStatus + progress: number + message: string + created_at: string + updated_at: string +} + +export const jobsApi = { + getJob: async (jobId: string): Promise<JobResponse> => { + const { data } = await apiClient.get<JobResponse>( + `/api/v1/jobs/${jobId}` + ) + return data + }, +} + diff --git a/frontend/src/api/photos.ts b/frontend/src/api/photos.ts new file mode 100644 index 0000000..48fe8e6 --- /dev/null +++ b/frontend/src/api/photos.ts @@ -0,0 +1,76 @@ +import apiClient from './client' +import { JobResponse } from './jobs' + +export interface PhotoImportRequest { + folder_path: string + recursive?: boolean +} + +export interface PhotoImportResponse { + job_id: string + message: string + folder_path?: string + estimated_photos?: number +} + +export interface PhotoResponse { + id: number + path: string + filename: string + checksum?: string + date_added: string + date_taken?: string + width?: number + height?: number + mime_type?: string +} + +export interface UploadResponse { + message: string + added: number + existing: number + errors: string[] +} + +export const photosApi = { + importPhotos: async ( + request: PhotoImportRequest + ): Promise<PhotoImportResponse> => { + const { data } = await apiClient.post<PhotoImportResponse>( + '/api/v1/photos/import', + request + ) + return data + }, + + uploadPhotos: async (files: File[]): Promise<UploadResponse> => { + const formData = new FormData() + files.forEach((file) => { + formData.append('files', file) + }) + + const { data } = await apiClient.post<UploadResponse>( + '/api/v1/photos/import/upload', + formData, + { + headers: { + 'Content-Type': 'multipart/form-data', + }, + } + ) + 
return data + }, + + getPhoto: async (photoId: number): Promise<PhotoResponse> => { + const { data } = await apiClient.get<PhotoResponse>( + `/api/v1/photos/${photoId}` + ) + return data + }, + + streamJobProgress: (jobId: string): EventSource => { + const baseURL = import.meta.env.VITE_API_URL || 'http://127.0.0.1:8000' + return new EventSource(`${baseURL}/api/v1/jobs/stream/${jobId}`) + }, +} + diff --git a/frontend/src/pages/Scan.tsx b/frontend/src/pages/Scan.tsx index 2ede5a5..6b518d6 100644 --- a/frontend/src/pages/Scan.tsx +++ b/frontend/src/pages/Scan.tsx @@ -1,12 +1,419 @@ +import { useState, useRef, useCallback, useEffect } from 'react' +import { photosApi, PhotoImportRequest } from '../api/photos' +import { jobsApi, JobResponse, JobStatus } from '../api/jobs' + +interface JobProgress { + id: string + status: string + progress: number + message: string + processed?: number + total?: number +} + export default function Scan() { + const [folderPath, setFolderPath] = useState('') + const [recursive, setRecursive] = useState(true) + const [isImporting, setIsImporting] = useState(false) + const [currentJob, setCurrentJob] = useState<JobResponse | null>(null) + const [jobProgress, setJobProgress] = useState<JobProgress | null>(null) + const [importResult, setImportResult] = useState<{ + added?: number + existing?: number + total?: number + } | null>(null) + const [error, setError] = useState<string | null>(null) + const fileInputRef = useRef<HTMLInputElement | null>(null) + const eventSourceRef = useRef<EventSource | null>(null) + + // Cleanup event source on unmount + useEffect(() => { + return () => { + if (eventSourceRef.current) { + eventSourceRef.current.close() + } + } + }, []) + + const handleFolderBrowse = () => { + // Note: Browser security prevents direct folder selection + // This is a workaround - user must type/paste path + // In production, consider using Electron or a file picker library + const path = prompt('Enter folder path to scan:') + if (path) { + setFolderPath(path) + } + } + + const handleDragOver = useCallback((e: React.DragEvent) => { + e.preventDefault() + 
e.stopPropagation() + }, []) + + const handleDrop = useCallback((e: React.DragEvent) => { + e.preventDefault() + e.stopPropagation() + + const files = Array.from(e.dataTransfer.files) + const imageFiles = files.filter((file) => + /\.(jpg|jpeg|png|bmp|tiff|tif)$/i.test(file.name) + ) + + if (imageFiles.length > 0) { + handleUploadFiles(imageFiles) + } + }, []) + + const handleFileSelect = (e: React.ChangeEvent<HTMLInputElement>) => { + const files = e.target.files + if (files && files.length > 0) { + handleUploadFiles(Array.from(files)) + } + } + + const handleUploadFiles = async (files: File[]) => { + setIsImporting(true) + setError(null) + setImportResult(null) + + try { + const result = await photosApi.uploadPhotos(files) + setImportResult({ + added: result.added, + existing: result.existing, + total: result.added + result.existing, + }) + setIsImporting(false) + } catch (err: any) { + setError(err.response?.data?.detail || err.message || 'Upload failed') + setIsImporting(false) + } + } + + const handleScanFolder = async () => { + if (!folderPath.trim()) { + setError('Please enter a folder path') + return + } + + setIsImporting(true) + setError(null) + setImportResult(null) + setCurrentJob(null) + setJobProgress(null) + + try { + const request: PhotoImportRequest = { + folder_path: folderPath.trim(), + recursive, + } + + const response = await photosApi.importPhotos(request) + setCurrentJob({ + id: response.job_id, + status: JobStatus.PENDING, + progress: 0, + message: response.message, + created_at: new Date().toISOString(), + updated_at: new Date().toISOString(), + }) + + // Start SSE stream for job progress + startJobProgressStream(response.job_id) + } catch (err: any) { + setError(err.response?.data?.detail || err.message || 'Import failed') + setIsImporting(false) + } + } + + const startJobProgressStream = (jobId: string) => { + // Close existing stream if any + if (eventSourceRef.current) { + eventSourceRef.current.close() + } + + const eventSource = 
photosApi.streamJobProgress(jobId) + eventSourceRef.current = eventSource + + eventSource.onmessage = (event) => { + try { + const data: JobProgress = JSON.parse(event.data) + setJobProgress(data) + + // Update job status + const statusMap: Record<string, JobStatus> = { + pending: JobStatus.PENDING, + started: JobStatus.STARTED, + progress: JobStatus.PROGRESS, + success: JobStatus.SUCCESS, + failure: JobStatus.FAILURE, + } + + setCurrentJob({ + id: data.id, + status: statusMap[data.status] || JobStatus.PENDING, + progress: data.progress, + message: data.message, + created_at: new Date().toISOString(), + updated_at: new Date().toISOString(), + }) + + // Check if job is complete + if (data.status === 'success' || data.status === 'failure') { + setIsImporting(false) + eventSource.close() + eventSourceRef.current = null + + // Fetch final job result to get added/existing counts + if (data.status === 'success') { + fetchJobResult(jobId) + } + } + } catch (err) { + console.error('Error parsing SSE event:', err) + } + } + + eventSource.onerror = (err) => { + console.error('SSE error:', err) + eventSource.close() + eventSourceRef.current = null + } + } + + const fetchJobResult = async (jobId: string) => { + try { + const job = await jobsApi.getJob(jobId) + // Job result may contain added/existing counts in metadata + // For now, we'll just update the job status + setCurrentJob(job) + } catch (err) { + console.error('Error fetching job result:', err) + } + } + + const getStatusColor = (status: JobStatus) => { + switch (status) { + case JobStatus.SUCCESS: + return 'text-green-600' + case JobStatus.FAILURE: + return 'text-red-600' + case JobStatus.STARTED: + case JobStatus.PROGRESS: + return 'text-blue-600' + default: + return 'text-gray-600' + } + } + return (
-

Scan

-
-

Folder scanning UI coming in Phase 2.

+
+

Scan Photos

+ +
+ {/* Folder Scan Section */} +
+

+ Scan Folder +

+ +
+
+ +
+ setFolderPath(e.target.value)} + placeholder="/path/to/photos" + className="flex-1 px-3 py-2 border border-gray-300 rounded-md shadow-sm focus:outline-none focus:ring-blue-500 focus:border-blue-500" + disabled={isImporting} + /> + +
+

+ Enter the full path to the folder containing photos +

+
+ +
+ setRecursive(e.target.checked)} + disabled={isImporting} + className="h-4 w-4 text-blue-600 focus:ring-blue-500 border-gray-300 rounded" + /> + +
+ + +
+
+ + {/* File Upload Section */} +
+

+ Upload Photos +

+ +
+ + +
+ + + +

+ Drag and drop photos here, or{' '} + +

+

+ Supports: JPG, PNG, BMP, TIFF +

+
+
+
+ + {/* Progress Section */} + {(currentJob || jobProgress) && ( +
+

+ Import Progress +

+ + {currentJob && ( +
+
+
+ + {currentJob.status === JobStatus.SUCCESS && '✓ '} + {currentJob.status === JobStatus.FAILURE && '✗ '} + {currentJob.status.charAt(0).toUpperCase() + + currentJob.status.slice(1)} + + + {currentJob.progress}% + +
+
+
+
+
+ + {jobProgress && ( +
+ {jobProgress.processed !== undefined && + jobProgress.total !== undefined && ( +

+ Processed: {jobProgress.processed} /{' '} + {jobProgress.total} +

+ )} + {jobProgress.message && ( +

{jobProgress.message}

+ )} +
+ )} +
+ )} +
+ )} + + {/* Results Section */} + {importResult && ( +
+

+ Import Results +

+ +
+ {importResult.added !== undefined && ( +

+ ✓ {importResult.added} new photos added +

+ )} + {importResult.existing !== undefined && ( +

+ {importResult.existing} photos already in database +

+ )} + {importResult.total !== undefined && ( +

+ Total: {importResult.total} photos +

+ )} +
+
+ )} + + {/* Error Section */} + {error && ( +
+

{error}

+
+ )}
) } - - diff --git a/run_worker.sh b/run_worker.sh new file mode 100755 index 0000000..50c6581 --- /dev/null +++ b/run_worker.sh @@ -0,0 +1,16 @@ +#!/bin/bash +# Start RQ worker for PunimTag background jobs + +cd "$(dirname "$0")" + +# Activate virtual environment if it exists +if [ -d "venv" ]; then + source venv/bin/activate +fi + +# Set Python path +export PYTHONPATH="$(pwd)" + +# Start worker +python -m src.web.worker + diff --git a/src/web/api/jobs.py b/src/web/api/jobs.py index 8eb6941..25f5268 100644 --- a/src/web/api/jobs.py +++ b/src/web/api/jobs.py @@ -3,12 +3,14 @@ from __future__ import annotations from datetime import datetime -from typing import Annotated -from fastapi import APIRouter, Depends, HTTPException, status +from fastapi import APIRouter, HTTPException, status +from fastapi.responses import StreamingResponse from rq import Queue from rq.job import Job from redis import Redis +import json +import time from src.web.schemas.jobs import JobResponse, JobStatus @@ -32,7 +34,7 @@ def get_job(job_id: str) -> JobResponse: } job_status = status_map.get(job.get_status(), JobStatus.PENDING) progress = 0 - if job_status == JobStatus.STARTED: + if job_status == JobStatus.STARTED or job_status == JobStatus.PROGRESS: progress = job.meta.get("progress", 0) if job.meta else 0 elif job_status == JobStatus.SUCCESS: progress = 100 @@ -43,7 +45,9 @@ def get_job(job_id: str) -> JobResponse: progress=progress, message=message, created_at=datetime.fromisoformat(str(job.created_at)), - updated_at=datetime.fromisoformat(str(job.ended_at or job.started_at or job.created_at)), + updated_at=datetime.fromisoformat( + str(job.ended_at or job.started_at or job.created_at) + ), ) except Exception: raise HTTPException( @@ -51,3 +55,64 @@ def get_job(job_id: str) -> JobResponse: detail=f"Job {job_id} not found", ) + +@router.get("/stream/{job_id}") +def stream_job_progress(job_id: str): + """Stream job progress via Server-Sent Events (SSE).""" + + def event_generator(): + 
"""Generate SSE events for job progress.""" + last_progress = -1 + last_message = "" + + while True: + try: + job = Job.fetch(job_id, connection=redis_conn) + status_map = { + "queued": JobStatus.PENDING, + "started": JobStatus.STARTED, + "finished": JobStatus.SUCCESS, + "failed": JobStatus.FAILURE, + } + job_status = status_map.get(job.get_status(), JobStatus.PENDING) + + progress = 0 + if job_status == JobStatus.STARTED or job_status == JobStatus.PROGRESS: + progress = job.meta.get("progress", 0) if job.meta else 0 + elif job_status == JobStatus.SUCCESS: + progress = 100 + elif job_status == JobStatus.FAILURE: + progress = 0 + + message = job.meta.get("message", "") if job.meta else "" + + # Only send event if progress or message changed + if progress != last_progress or message != last_message: + event_data = { + "id": job.id, + "status": job_status.value, + "progress": progress, + "message": message, + "processed": job.meta.get("processed", 0) if job.meta else 0, + "total": job.meta.get("total", 0) if job.meta else 0, + } + + yield f"data: {json.dumps(event_data)}\n\n" + last_progress = progress + last_message = message + + # Stop streaming if job is complete or failed + if job_status in (JobStatus.SUCCESS, JobStatus.FAILURE): + break + + time.sleep(0.5) # Poll every 500ms + + except Exception as e: + error_data = {"error": str(e)} + yield f"data: {json.dumps(error_data)}\n\n" + break + + return StreamingResponse( + event_generator(), media_type="text/event-stream" + ) + diff --git a/src/web/api/photos.py b/src/web/api/photos.py index ea0f49e..a9d23c5 100644 --- a/src/web/api/photos.py +++ b/src/web/api/photos.py @@ -2,7 +2,27 @@ from __future__ import annotations -from fastapi import APIRouter +from fastapi import APIRouter, Depends, File, HTTPException, UploadFile, status +from fastapi.responses import JSONResponse +from rq import Queue +from redis import Redis +from sqlalchemy.orm import Session + +from src.web.db.session import get_db + +# Redis connection 
for RQ +redis_conn = Redis(host="localhost", port=6379, db=0, decode_responses=False) +queue = Queue(connection=redis_conn) +from src.web.schemas.photos import ( + PhotoImportRequest, + PhotoImportResponse, + PhotoResponse, +) +from src.web.services.photo_service import ( + find_photos_in_folder, + import_photo_from_path, +) +from src.web.services.tasks import import_photos_task router = APIRouter(prefix="/photos", tags=["photos"]) @@ -13,17 +33,118 @@ def list_photos() -> dict: return {"message": "Photos endpoint - to be implemented in Phase 2"} -@router.post("/import") -def import_photos() -> dict: - """Import photos - placeholder for Phase 2.""" - return {"message": "Import endpoint - to be implemented in Phase 2"} +@router.post("/import", response_model=PhotoImportResponse) +def import_photos( + request: PhotoImportRequest, +) -> PhotoImportResponse: + """Import photos from a folder path. + + This endpoint enqueues a background job to scan and import photos. + """ + if not request.folder_path: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="folder_path is required", + ) + + # Validate folder exists + import os + + if not os.path.isdir(request.folder_path): + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"Folder not found: {request.folder_path}", + ) + + # Estimate number of photos (quick scan) + estimated_photos = len(find_photos_in_folder(request.folder_path, request.recursive)) + + # Enqueue job + job = queue.enqueue( + import_photos_task, + request.folder_path, + request.recursive, + job_timeout="1h", # Allow up to 1 hour for large imports + ) + + return PhotoImportResponse( + job_id=job.id, + message=f"Photo import job queued for {request.folder_path}", + folder_path=request.folder_path, + estimated_photos=estimated_photos, + ) -@router.get("/{photo_id}") -def get_photo(photo_id: int) -> dict: - """Get photo by ID - placeholder for Phase 2.""" +@router.post("/import/upload") +async def upload_photos( + 
files: list[UploadFile] = File(...), + db: Session = Depends(get_db), +) -> dict: + """Upload photo files directly. + + This endpoint accepts file uploads and imports them immediately. + Files are saved to PHOTO_STORAGE_DIR before import. + For large batches, prefer the /import endpoint with folder_path. + """ + import os + import shutil + from pathlib import Path + + from src.web.settings import PHOTO_STORAGE_DIR + + # Ensure storage directory exists + storage_dir = Path(PHOTO_STORAGE_DIR) + storage_dir.mkdir(parents=True, exist_ok=True) + + added_count = 0 + existing_count = 0 + errors = [] + + for file in files: + try: + # Generate unique filename to avoid conflicts + import uuid + + file_ext = Path(file.filename).suffix + unique_filename = f"{uuid.uuid4()}{file_ext}" + stored_path = storage_dir / unique_filename + + # Save uploaded file to storage + content = await file.read() + with open(stored_path, "wb") as f: + f.write(content) + + # Import photo from stored location + photo, is_new = import_photo_from_path(db, str(stored_path)) + if is_new: + added_count += 1 + else: + existing_count += 1 + # If photo already exists, delete duplicate upload + if os.path.exists(stored_path): + os.remove(stored_path) + except Exception as e: + errors.append(f"Error uploading {file.filename}: {str(e)}") + return { - "message": f"Get photo {photo_id} - to be implemented in Phase 2", - "id": photo_id, + "message": f"Uploaded {len(files)} files", + "added": added_count, + "existing": existing_count, + "errors": errors, } + +@router.get("/{photo_id}", response_model=PhotoResponse) +def get_photo(photo_id: int, db: Session = Depends(get_db)) -> PhotoResponse: + """Get photo by ID.""" + from src.web.db.models import Photo + + photo = db.query(Photo).filter(Photo.id == photo_id).first() + if not photo: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Photo {photo_id} not found", + ) + + return PhotoResponse.model_validate(photo) + diff --git 
a/src/web/schemas/photos.py b/src/web/schemas/photos.py new file mode 100644 index 0000000..9101ffe --- /dev/null +++ b/src/web/schemas/photos.py @@ -0,0 +1,46 @@ +"""Photo schemas.""" + +from __future__ import annotations + +from datetime import datetime +from typing import Optional + +from pydantic import BaseModel, Field + + +class PhotoImportRequest(BaseModel): + """Request to import photos from a folder or upload files.""" + + folder_path: Optional[str] = Field( + None, description="Path to folder to scan for photos" + ) + recursive: bool = Field( + True, description="Whether to scan subdirectories recursively" + ) + + +class PhotoResponse(BaseModel): + """Photo response schema.""" + + id: int + path: str + filename: str + checksum: Optional[str] = None + date_added: datetime + date_taken: Optional[datetime] = None + width: Optional[int] = None + height: Optional[int] = None + mime_type: Optional[str] = None + + class Config: + from_attributes = True + + +class PhotoImportResponse(BaseModel): + """Response after initiating photo import.""" + + job_id: str + message: str + folder_path: Optional[str] = None + estimated_photos: Optional[int] = None + diff --git a/src/web/services/photo_service.py b/src/web/services/photo_service.py new file mode 100644 index 0000000..178c47b --- /dev/null +++ b/src/web/services/photo_service.py @@ -0,0 +1,193 @@ +"""Photo import and management services.""" + +from __future__ import annotations + +import hashlib +import mimetypes +import os +from pathlib import Path +from datetime import datetime +from typing import Callable, Optional, Tuple + +from PIL import Image +from sqlalchemy.orm import Session + +from src.core.config import SUPPORTED_IMAGE_FORMATS +from src.web.db.models import Photo + + +def compute_checksum(file_path: str) -> str: + """Compute SHA256 checksum of a file.""" + sha256_hash = hashlib.sha256() + with open(file_path, "rb") as f: + for byte_block in iter(lambda: f.read(4096), b""): + 
sha256_hash.update(byte_block) + return sha256_hash.hexdigest() + + +def extract_exif_date(image_path: str) -> Optional[datetime]: + """Extract date taken from photo EXIF data.""" + try: + with Image.open(image_path) as image: + exifdata = image.getexif() + + # Look for date taken in EXIF tags + date_tags = [ + 306, # DateTime + 36867, # DateTimeOriginal + 36868, # DateTimeDigitized + ] + + for tag_id in date_tags: + if tag_id in exifdata: + date_str = exifdata[tag_id] + if date_str: + # Parse EXIF date format (YYYY:MM:DD HH:MM:SS) + try: + return datetime.strptime(date_str, "%Y:%m:%d %H:%M:%S") + except ValueError: + # Try alternative format + try: + return datetime.strptime( + date_str, "%Y-%m-%d %H:%M:%S" + ) + except ValueError: + continue + except Exception: + pass + + return None + + +def get_image_metadata(image_path: str) -> Tuple[Optional[int], Optional[int], Optional[str]]: + """Get image dimensions and MIME type.""" + try: + with Image.open(image_path) as image: + width, height = image.size + mime_type = mimetypes.guess_type(image_path)[0] or f"image/{image.format.lower() if image.format else 'unknown'}" + return width, height, mime_type + except Exception: + return None, None, None + + +def find_photos_in_folder(folder_path: str, recursive: bool = True) -> list[str]: + """Find all photo files in a folder.""" + folder_path = os.path.abspath(folder_path) + if not os.path.isdir(folder_path): + return [] + + found_photos = [] + + if recursive: + for root, _dirs, files in os.walk(folder_path): + for file in files: + file_ext = Path(file).suffix.lower() + if file_ext in SUPPORTED_IMAGE_FORMATS: + photo_path = os.path.join(root, file) + found_photos.append(photo_path) + else: + for file in os.listdir(folder_path): + file_ext = Path(file).suffix.lower() + if file_ext in SUPPORTED_IMAGE_FORMATS: + photo_path = os.path.join(folder_path, file) + if os.path.isfile(photo_path): + found_photos.append(photo_path) + + return found_photos + + +def 
import_photo_from_path( + db: Session, photo_path: str, update_progress: Optional[Callable[[int, int, str], None]] = None +) -> Tuple[Optional[Photo], bool]: + """Import a single photo from file path into database. + + Returns: + Tuple of (Photo instance or None, is_new: bool) + """ + photo_path = os.path.abspath(photo_path) + filename = os.path.basename(photo_path) + + # Check if photo already exists by path + existing = db.query(Photo).filter(Photo.path == photo_path).first() + if existing: + return existing, False + + # Compute checksum + try: + checksum = compute_checksum(photo_path) + # Check if photo with same checksum exists + existing_by_checksum = ( + db.query(Photo).filter(Photo.checksum == checksum).first() + if checksum + else None + ) + if existing_by_checksum: + return existing_by_checksum, False + except Exception: + checksum = None + + # Extract metadata + date_taken = extract_exif_date(photo_path) + width, height, mime_type = get_image_metadata(photo_path) + + # Create new photo record + photo = Photo( + path=photo_path, + filename=filename, + checksum=checksum, + date_taken=date_taken, + width=width, + height=height, + mime_type=mime_type, + ) + + db.add(photo) + db.commit() + db.refresh(photo) + + return photo, True + + +def import_photos_from_folder( + db: Session, + folder_path: str, + recursive: bool = True, + update_progress: Optional[Callable[[int, int, str], None]] = None, +) -> Tuple[int, int]: + """Import all photos from a folder. 
+ + Args: + db: Database session + folder_path: Path to folder to scan + recursive: Whether to scan subdirectories + update_progress: Optional callback(processed, total, current_file) + + Returns: + Tuple of (added_count, existing_count) + """ + found_photos = find_photos_in_folder(folder_path, recursive) + total = len(found_photos) + + if total == 0: + return 0, 0 + + added_count = 0 + existing_count = 0 + + for idx, photo_path in enumerate(found_photos, 1): + try: + photo, is_new = import_photo_from_path(db, photo_path) + if is_new: + added_count += 1 + else: + existing_count += 1 + + if update_progress: + update_progress(idx, total, os.path.basename(photo_path)) + except Exception: + # Log error but continue + if update_progress: + update_progress(idx, total, f"Error: {os.path.basename(photo_path)}") + + return added_count, existing_count + diff --git a/src/web/services/tasks.py b/src/web/services/tasks.py new file mode 100644 index 0000000..d192d34 --- /dev/null +++ b/src/web/services/tasks.py @@ -0,0 +1,74 @@ +"""RQ worker tasks for PunimTag.""" + +from __future__ import annotations + +from typing import Optional + +from rq import get_current_job +from sqlalchemy.orm import Session + +from src.web.db.session import SessionLocal +from src.web.services.photo_service import import_photos_from_folder + + +def import_photos_task(folder_path: str, recursive: bool = True) -> dict: + """RQ task to import photos from a folder. 
+ + Updates job metadata with progress: + - progress: 0-100 + - message: status message + - processed: number of photos processed + - total: total photos found + - added: number of new photos added + - existing: number of photos that already existed + """ + job = get_current_job() + if not job: + raise RuntimeError("Not running in RQ job context") + + db: Session = SessionLocal() + + try: + def update_progress(processed: int, total: int, current_file: str) -> None: + """Update job progress.""" + if job: + progress = int((processed / total) * 100) if total > 0 else 0 + job.meta = { + "progress": progress, + "message": f"Processing {current_file}... ({processed}/{total})", + "processed": processed, + "total": total, + } + job.save_meta() + + # Import photos + added, existing = import_photos_from_folder( + db, folder_path, recursive, update_progress + ) + + # Final update + total_processed = added + existing + result = { + "folder_path": folder_path, + "recursive": recursive, + "added": added, + "existing": existing, + "total": total_processed, + } + + if job: + job.meta = { + "progress": 100, + "message": f"Completed: {added} new, {existing} existing", + "processed": total_processed, + "total": total_processed, + "added": added, + "existing": existing, + } + job.save_meta() + + return result + + finally: + db.close() + diff --git a/src/web/settings.py b/src/web/settings.py index a79b577..f38f653 100644 --- a/src/web/settings.py +++ b/src/web/settings.py @@ -1,6 +1,13 @@ +"""Application settings for PunimTag Web.""" + from __future__ import annotations +import os + APP_TITLE = "PunimTag Web API" APP_VERSION = "0.1.0" +# Photo storage settings +PHOTO_STORAGE_DIR = os.getenv("PHOTO_STORAGE_DIR", "data/uploads") + diff --git a/src/web/worker.py b/src/web/worker.py index 015f13c..791e8a5 100644 --- a/src/web/worker.py +++ b/src/web/worker.py @@ -1,20 +1,39 @@ +"""RQ worker entrypoint for PunimTag.""" + from __future__ import annotations import signal import sys from 
typing import NoReturn +from rq import Worker + +from redis import Redis + +from src.web.services.tasks import import_photos_task + +# Redis connection for RQ +redis_conn = Redis(host="localhost", port=6379, db=0, decode_responses=False) + def main() -> NoReturn: - """Worker entrypoint placeholder (RQ/Celery to be wired).""" + """Worker entrypoint - starts RQ worker to process background jobs.""" def _handle_sigterm(_signum, _frame): sys.exit(0) signal.signal(signal.SIGTERM, _handle_sigterm) signal.signal(signal.SIGINT, _handle_sigterm) - # Placeholder: actual worker loop will be implemented in Phase 2. - signal.pause() + # Register tasks with worker + # Tasks are imported from services.tasks + worker = Worker( + ["default"], + connection=redis_conn, + name="punimtag-worker", + ) + + # Start worker + worker.work() if __name__ == "__main__":