"""RQ worker entrypoint for PunimTag.""" from __future__ import annotations import signal import sys from pathlib import Path from typing import NoReturn import uuid from dotenv import load_dotenv # Load environment variables from .env file before importing anything that needs them # Path calculation: backend/worker.py -> backend/ -> punimtag/ -> .env env_path = Path(__file__).parent.parent / ".env" env_path = env_path.resolve() # Make absolute path if env_path.exists(): load_dotenv(dotenv_path=env_path, override=True) from rq import Worker from redis import Redis from backend.services.tasks import import_photos_task, process_faces_task from backend.db.session import auth_engine from sqlalchemy import text # Redis connection for RQ redis_conn = Redis(host="localhost", port=6379, db=0, decode_responses=False) def setup_auth_database_tables() -> None: """Create all necessary tables in the auth database for both frontends.""" if auth_engine is None: print("[Worker] ⚠️ Auth database not configured (DATABASE_URL_AUTH not set), skipping table creation") return try: print("[Worker] 🗃️ Setting up auth database tables...") with auth_engine.connect() as conn: # Create users table conn.execute(text(""" CREATE TABLE IF NOT EXISTS users ( id SERIAL PRIMARY KEY, email VARCHAR(255) UNIQUE NOT NULL, name VARCHAR(255) NOT NULL, password_hash VARCHAR(255) NOT NULL, is_admin BOOLEAN DEFAULT FALSE, has_write_access BOOLEAN DEFAULT FALSE, email_verified BOOLEAN DEFAULT FALSE, email_confirmation_token VARCHAR(255) UNIQUE, email_confirmation_token_expiry TIMESTAMP, password_reset_token VARCHAR(255) UNIQUE, password_reset_token_expiry TIMESTAMP, is_active BOOLEAN DEFAULT TRUE, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); """)) # Add missing columns if table already exists for col_def in [ "ALTER TABLE users ADD COLUMN IF NOT EXISTS is_admin BOOLEAN DEFAULT FALSE", "ALTER TABLE users ADD COLUMN IF NOT EXISTS has_write_access BOOLEAN DEFAULT FALSE", "ALTER TABLE users ADD COLUMN IF NOT EXISTS email_verified BOOLEAN DEFAULT FALSE", "ALTER TABLE users ADD COLUMN IF NOT EXISTS email_confirmation_token VARCHAR(255)", "ALTER TABLE users ADD COLUMN IF NOT EXISTS email_confirmation_token_expiry TIMESTAMP", "ALTER TABLE users ADD COLUMN IF NOT EXISTS password_reset_token VARCHAR(255)", "ALTER TABLE users ADD COLUMN IF NOT EXISTS password_reset_token_expiry TIMESTAMP", "ALTER TABLE users ADD COLUMN IF NOT EXISTS is_active BOOLEAN DEFAULT TRUE", ]: try: conn.execute(text(col_def)) except Exception: pass # Column might already exist or error is expected # Create unique indexes for nullable columns conn.execute(text(""" CREATE UNIQUE INDEX IF NOT EXISTS users_email_confirmation_token_key ON users(email_confirmation_token) WHERE email_confirmation_token IS NOT NULL; """)) conn.execute(text(""" CREATE UNIQUE INDEX IF NOT EXISTS users_password_reset_token_key ON users(password_reset_token) WHERE password_reset_token IS NOT NULL; """)) # Create pending_identifications table conn.execute(text(""" CREATE TABLE IF NOT EXISTS pending_identifications ( id SERIAL PRIMARY KEY, face_id INTEGER NOT NULL, user_id INTEGER NOT NULL, first_name VARCHAR(255) NOT NULL, last_name VARCHAR(255) NOT NULL, middle_name VARCHAR(255), maiden_name VARCHAR(255), date_of_birth DATE, status VARCHAR(50) DEFAULT 'pending', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE ); """)) # Create indexes for pending_identifications conn.execute(text("CREATE INDEX IF NOT EXISTS idx_pending_identifications_face_id ON pending_identifications(face_id);")) conn.execute(text("CREATE INDEX IF NOT EXISTS idx_pending_identifications_user_id ON pending_identifications(user_id);")) conn.execute(text("CREATE INDEX IF NOT EXISTS idx_pending_identifications_status ON pending_identifications(status);")) # Create pending_photos table conn.execute(text(""" CREATE TABLE IF NOT EXISTS pending_photos ( id SERIAL PRIMARY KEY, user_id INTEGER NOT NULL, filename VARCHAR(255) NOT NULL, original_filename VARCHAR(255) NOT NULL, file_path VARCHAR(512) NOT NULL, file_size INTEGER NOT NULL, mime_type VARCHAR(100) NOT NULL, status VARCHAR(50) DEFAULT 'pending', submitted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, reviewed_at TIMESTAMP, reviewed_by INTEGER, rejection_reason TEXT, FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE ); """)) # Create indexes for pending_photos conn.execute(text("CREATE INDEX IF NOT EXISTS idx_pending_photos_user_id ON pending_photos(user_id);")) conn.execute(text("CREATE INDEX IF NOT EXISTS idx_pending_photos_status ON pending_photos(status);")) conn.execute(text("CREATE INDEX IF NOT EXISTS idx_pending_photos_submitted_at ON pending_photos(submitted_at);")) # Create inappropriate_photo_reports table conn.execute(text(""" CREATE TABLE IF NOT EXISTS inappropriate_photo_reports ( id SERIAL PRIMARY KEY, photo_id INTEGER NOT NULL, user_id INTEGER NOT NULL, status VARCHAR(50) DEFAULT 'pending', reported_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, reviewed_at TIMESTAMP, reviewed_by INTEGER, review_notes TEXT, report_comment TEXT, FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE, CONSTRAINT uq_photo_user_report UNIQUE (photo_id, user_id) ); """)) # Create indexes for inappropriate_photo_reports conn.execute(text("CREATE INDEX IF NOT EXISTS idx_inappropriate_photo_reports_photo_id ON inappropriate_photo_reports(photo_id);")) conn.execute(text("CREATE INDEX IF NOT EXISTS idx_inappropriate_photo_reports_user_id ON inappropriate_photo_reports(user_id);")) conn.execute(text("CREATE INDEX IF NOT EXISTS idx_inappropriate_photo_reports_status ON inappropriate_photo_reports(status);")) conn.execute(text("CREATE INDEX IF NOT EXISTS idx_inappropriate_photo_reports_reported_at ON inappropriate_photo_reports(reported_at);")) # Create pending_linkages table conn.execute(text(""" CREATE TABLE IF NOT EXISTS pending_linkages ( id SERIAL PRIMARY KEY, photo_id INTEGER NOT NULL, tag_id INTEGER, tag_name VARCHAR(255), user_id INTEGER NOT NULL, status VARCHAR(50) DEFAULT 'pending', notes TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE ); """)) # Create indexes for pending_linkages conn.execute(text("CREATE INDEX IF NOT EXISTS idx_pending_linkages_photo_id ON pending_linkages(photo_id);")) conn.execute(text("CREATE INDEX IF NOT EXISTS idx_pending_linkages_tag_id ON pending_linkages(tag_id);")) conn.execute(text("CREATE INDEX IF NOT EXISTS idx_pending_linkages_user_id ON pending_linkages(user_id);")) conn.execute(text("CREATE INDEX IF NOT EXISTS idx_pending_linkages_status ON pending_linkages(status);")) # Create photo_favorites table conn.execute(text(""" CREATE TABLE IF NOT EXISTS photo_favorites ( id SERIAL PRIMARY KEY, photo_id INTEGER NOT NULL, user_id INTEGER NOT NULL, favorited_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE, CONSTRAINT uq_photo_user_favorite UNIQUE (photo_id, user_id) ); """)) # Create indexes for photo_favorites conn.execute(text("CREATE INDEX IF NOT EXISTS idx_photo_favorites_photo_id ON photo_favorites(photo_id);")) conn.execute(text("CREATE INDEX IF NOT EXISTS idx_photo_favorites_user_id ON photo_favorites(user_id);")) conn.execute(text("CREATE INDEX IF NOT EXISTS idx_photo_favorites_favorited_at ON photo_favorites(favorited_at);")) conn.commit() print("[Worker] ✅ Auth database tables created/verified successfully") except Exception as e: print(f"[Worker] ⚠️ Failed to create auth database tables: {e}") print("[Worker] Tables may already exist or database may not be accessible") # Don't exit - worker can still function without auth tables def main() -> NoReturn: """Worker entrypoint - starts RQ worker to process background jobs.""" def _handle_sigterm(_signum, _frame): sys.exit(0) signal.signal(signal.SIGTERM, _handle_sigterm) signal.signal(signal.SIGINT, _handle_sigterm) # Generate unique worker name to avoid conflicts worker_name = f"punimtag-worker-{uuid.uuid4().hex[:8]}" print(f"[Worker] Starting worker: {worker_name}") print(f"[Worker] Listening on queue: default") # Setup auth database tables for both frontends setup_auth_database_tables() # Check if Redis is accessible try: redis_conn.ping() print(f"[Worker] Redis connection successful") except Exception as e: print(f"[Worker] ❌ Redis connection failed: {e}") sys.exit(1) # Register tasks with worker # Tasks are imported from services.tasks worker = Worker( ["default"], connection=redis_conn, name=worker_name, ) print(f"[Worker] ✅ Worker ready, waiting for jobs...") # Start worker worker.work() if __name__ == "__main__": main()