punimtag/backend/worker.py
Tanya 32be5c7f23 feat: Enhance auth database setup and environment variable loading
This commit improves the setup of the authentication database by adding a new function to create necessary tables for both frontends. It also ensures that environment variables are loaded from a `.env` file before any database operations, enhancing configuration management. Additionally, minor updates are made to related scripts for better clarity and functionality.
2026-01-02 13:28:07 -05:00

250 lines
11 KiB
Python

"""RQ worker entrypoint for PunimTag."""
from __future__ import annotations
import signal
import sys
from pathlib import Path
from typing import NoReturn
import uuid
from dotenv import load_dotenv
# Load environment variables from the project's .env file before importing
# anything that needs them (the imports that follow are deliberately placed
# after this step).
# Path calculation: backend/worker.py -> backend/ -> punimtag/ -> .env
env_path = Path(__file__).parent.parent / ".env"
env_path = env_path.resolve()  # Make absolute path
# A missing .env file is not an error: loading is simply skipped.
if env_path.exists():
    # override=True: values from .env take precedence over variables that are
    # already set in the process environment.
    load_dotenv(dotenv_path=env_path, override=True)
from rq import Worker
from redis import Redis
from backend.services.tasks import import_photos_task, process_faces_task
from backend.db.session import auth_engine
from sqlalchemy import text
# Redis connection for RQ, created once at import time and shared by main().
# Host, port and database index are hard-coded to a local Redis instance.
# decode_responses=False leaves replies as raw bytes
# (NOTE(review): presumably required because RQ stores binary job payloads — confirm).
redis_conn = Redis(host="localhost", port=6379, db=0, decode_responses=False)
def setup_auth_database_tables() -> None:
    """Create or verify the auth-database schema used by both frontends.

    Runs idempotent DDL (``CREATE TABLE IF NOT EXISTS``, ``ADD COLUMN IF NOT
    EXISTS``, ``CREATE INDEX IF NOT EXISTS``) for the ``users``,
    ``pending_identifications``, ``pending_photos``,
    ``inappropriate_photo_reports``, ``pending_linkages`` and
    ``photo_favorites`` tables, then commits once at the end.

    The function is best-effort: if ``auth_engine`` is ``None`` it returns
    immediately, and any failure is printed rather than raised so the worker
    can keep running without the auth tables.
    """
    # Imported locally so this block stays self-contained; sqlalchemy is
    # already a dependency of this module.
    from sqlalchemy.exc import SQLAlchemyError

    if auth_engine is None:
        print("[Worker] ⚠️ Auth database not configured (DATABASE_URL_AUTH not set), skipping table creation")
        return

    # Columns back-filled onto a ``users`` table that predates them. The
    # CREATE TABLE below is a no-op when the table already exists, so these
    # statements are what actually upgrade an older schema.
    user_column_migrations = [
        "ALTER TABLE users ADD COLUMN IF NOT EXISTS is_admin BOOLEAN DEFAULT FALSE",
        "ALTER TABLE users ADD COLUMN IF NOT EXISTS has_write_access BOOLEAN DEFAULT FALSE",
        "ALTER TABLE users ADD COLUMN IF NOT EXISTS email_verified BOOLEAN DEFAULT FALSE",
        "ALTER TABLE users ADD COLUMN IF NOT EXISTS email_confirmation_token VARCHAR(255)",
        "ALTER TABLE users ADD COLUMN IF NOT EXISTS email_confirmation_token_expiry TIMESTAMP",
        "ALTER TABLE users ADD COLUMN IF NOT EXISTS password_reset_token VARCHAR(255)",
        "ALTER TABLE users ADD COLUMN IF NOT EXISTS password_reset_token_expiry TIMESTAMP",
        "ALTER TABLE users ADD COLUMN IF NOT EXISTS is_active BOOLEAN DEFAULT TRUE",
    ]

    try:
        print("[Worker] 🗃️ Setting up auth database tables...")
        with auth_engine.connect() as conn:
            # Create users table
            conn.execute(text("""
                CREATE TABLE IF NOT EXISTS users (
                    id SERIAL PRIMARY KEY,
                    email VARCHAR(255) UNIQUE NOT NULL,
                    name VARCHAR(255) NOT NULL,
                    password_hash VARCHAR(255) NOT NULL,
                    is_admin BOOLEAN DEFAULT FALSE,
                    has_write_access BOOLEAN DEFAULT FALSE,
                    email_verified BOOLEAN DEFAULT FALSE,
                    email_confirmation_token VARCHAR(255) UNIQUE,
                    email_confirmation_token_expiry TIMESTAMP,
                    password_reset_token VARCHAR(255) UNIQUE,
                    password_reset_token_expiry TIMESTAMP,
                    is_active BOOLEAN DEFAULT TRUE,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );
            """))

            # Add missing columns if table already exists.
            # Each ALTER runs inside its own SAVEPOINT: on PostgreSQL a failed
            # statement aborts the enclosing transaction, so swallowing the
            # error without rolling back would make every later statement fail
            # with an unrelated "transaction is aborted" error.
            for col_def in user_column_migrations:
                try:
                    with conn.begin_nested():
                        conn.execute(text(col_def))
                except SQLAlchemyError as col_err:
                    # Keep going (best-effort), but report what failed instead
                    # of hiding it.
                    print(f"[Worker] ⚠️ Column migration skipped ({col_def}): {col_err}")

            # Create unique indexes for nullable columns
            conn.execute(text("""
                CREATE UNIQUE INDEX IF NOT EXISTS users_email_confirmation_token_key
                ON users(email_confirmation_token)
                WHERE email_confirmation_token IS NOT NULL;
            """))
            conn.execute(text("""
                CREATE UNIQUE INDEX IF NOT EXISTS users_password_reset_token_key
                ON users(password_reset_token)
                WHERE password_reset_token IS NOT NULL;
            """))

            # Create pending_identifications table
            conn.execute(text("""
                CREATE TABLE IF NOT EXISTS pending_identifications (
                    id SERIAL PRIMARY KEY,
                    face_id INTEGER NOT NULL,
                    user_id INTEGER NOT NULL,
                    first_name VARCHAR(255) NOT NULL,
                    last_name VARCHAR(255) NOT NULL,
                    middle_name VARCHAR(255),
                    maiden_name VARCHAR(255),
                    date_of_birth DATE,
                    status VARCHAR(50) DEFAULT 'pending',
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
                );
            """))
            # Create indexes for pending_identifications
            conn.execute(text("CREATE INDEX IF NOT EXISTS idx_pending_identifications_face_id ON pending_identifications(face_id);"))
            conn.execute(text("CREATE INDEX IF NOT EXISTS idx_pending_identifications_user_id ON pending_identifications(user_id);"))
            conn.execute(text("CREATE INDEX IF NOT EXISTS idx_pending_identifications_status ON pending_identifications(status);"))

            # Create pending_photos table
            conn.execute(text("""
                CREATE TABLE IF NOT EXISTS pending_photos (
                    id SERIAL PRIMARY KEY,
                    user_id INTEGER NOT NULL,
                    filename VARCHAR(255) NOT NULL,
                    original_filename VARCHAR(255) NOT NULL,
                    file_path VARCHAR(512) NOT NULL,
                    file_size INTEGER NOT NULL,
                    mime_type VARCHAR(100) NOT NULL,
                    status VARCHAR(50) DEFAULT 'pending',
                    submitted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    reviewed_at TIMESTAMP,
                    reviewed_by INTEGER,
                    rejection_reason TEXT,
                    FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
                );
            """))
            # Create indexes for pending_photos
            conn.execute(text("CREATE INDEX IF NOT EXISTS idx_pending_photos_user_id ON pending_photos(user_id);"))
            conn.execute(text("CREATE INDEX IF NOT EXISTS idx_pending_photos_status ON pending_photos(status);"))
            conn.execute(text("CREATE INDEX IF NOT EXISTS idx_pending_photos_submitted_at ON pending_photos(submitted_at);"))

            # Create inappropriate_photo_reports table
            conn.execute(text("""
                CREATE TABLE IF NOT EXISTS inappropriate_photo_reports (
                    id SERIAL PRIMARY KEY,
                    photo_id INTEGER NOT NULL,
                    user_id INTEGER NOT NULL,
                    status VARCHAR(50) DEFAULT 'pending',
                    reported_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    reviewed_at TIMESTAMP,
                    reviewed_by INTEGER,
                    review_notes TEXT,
                    report_comment TEXT,
                    FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE,
                    CONSTRAINT uq_photo_user_report UNIQUE (photo_id, user_id)
                );
            """))
            # Create indexes for inappropriate_photo_reports
            conn.execute(text("CREATE INDEX IF NOT EXISTS idx_inappropriate_photo_reports_photo_id ON inappropriate_photo_reports(photo_id);"))
            conn.execute(text("CREATE INDEX IF NOT EXISTS idx_inappropriate_photo_reports_user_id ON inappropriate_photo_reports(user_id);"))
            conn.execute(text("CREATE INDEX IF NOT EXISTS idx_inappropriate_photo_reports_status ON inappropriate_photo_reports(status);"))
            conn.execute(text("CREATE INDEX IF NOT EXISTS idx_inappropriate_photo_reports_reported_at ON inappropriate_photo_reports(reported_at);"))

            # Create pending_linkages table
            conn.execute(text("""
                CREATE TABLE IF NOT EXISTS pending_linkages (
                    id SERIAL PRIMARY KEY,
                    photo_id INTEGER NOT NULL,
                    tag_id INTEGER,
                    tag_name VARCHAR(255),
                    user_id INTEGER NOT NULL,
                    status VARCHAR(50) DEFAULT 'pending',
                    notes TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
                );
            """))
            # Create indexes for pending_linkages
            conn.execute(text("CREATE INDEX IF NOT EXISTS idx_pending_linkages_photo_id ON pending_linkages(photo_id);"))
            conn.execute(text("CREATE INDEX IF NOT EXISTS idx_pending_linkages_tag_id ON pending_linkages(tag_id);"))
            conn.execute(text("CREATE INDEX IF NOT EXISTS idx_pending_linkages_user_id ON pending_linkages(user_id);"))
            conn.execute(text("CREATE INDEX IF NOT EXISTS idx_pending_linkages_status ON pending_linkages(status);"))

            # Create photo_favorites table
            conn.execute(text("""
                CREATE TABLE IF NOT EXISTS photo_favorites (
                    id SERIAL PRIMARY KEY,
                    photo_id INTEGER NOT NULL,
                    user_id INTEGER NOT NULL,
                    favorited_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE,
                    CONSTRAINT uq_photo_user_favorite UNIQUE (photo_id, user_id)
                );
            """))
            # Create indexes for photo_favorites
            conn.execute(text("CREATE INDEX IF NOT EXISTS idx_photo_favorites_photo_id ON photo_favorites(photo_id);"))
            conn.execute(text("CREATE INDEX IF NOT EXISTS idx_photo_favorites_user_id ON photo_favorites(user_id);"))
            conn.execute(text("CREATE INDEX IF NOT EXISTS idx_photo_favorites_favorited_at ON photo_favorites(favorited_at);"))

            # Single commit: all DDL above becomes visible together.
            conn.commit()
        print("[Worker] ✅ Auth database tables created/verified successfully")
    except Exception as e:
        print(f"[Worker] ⚠️ Failed to create auth database tables: {e}")
        print("[Worker] Tables may already exist or database may not be accessible")
        # Don't exit - worker can still function without auth tables
def main() -> NoReturn:
    """Worker entrypoint: start an RQ worker that consumes the ``default`` queue.

    Steps, in order:
      1. Install SIGTERM/SIGINT handlers that exit the process with status 0.
      2. Pick a unique worker name and announce it.
      3. Run the auth-database table setup.
      4. Ping Redis; exit with status 1 if the ping raises.
      5. Build the RQ ``Worker`` and enter its work loop.

    NOTE(review): annotated ``NoReturn``, yet this function returns normally
    if ``worker.work()`` ever returns — confirm the annotation is intended.
    """

    def _exit_cleanly(_signum, _frame):
        sys.exit(0)

    # Same handler for both termination signals.
    for stop_signal in (signal.SIGTERM, signal.SIGINT):
        signal.signal(stop_signal, _exit_cleanly)

    # A random suffix keeps the worker name unique to avoid conflicts.
    name_suffix = uuid.uuid4().hex[:8]
    worker_name = f"punimtag-worker-{name_suffix}"
    print(f"[Worker] Starting worker: {worker_name}")
    print("[Worker] Listening on queue: default")

    # Setup auth database tables for both frontends
    setup_auth_database_tables()

    # Fail fast if Redis cannot be reached.
    try:
        redis_conn.ping()
        print("[Worker] Redis connection successful")
    except Exception as e:
        print(f"[Worker] ❌ Redis connection failed: {e}")
        sys.exit(1)

    # Tasks are imported from services.tasks at module level; the worker
    # itself only needs the queue names and the connection.
    queue_names = ["default"]
    worker = Worker(queue_names, connection=redis_conn, name=worker_name)

    print("[Worker] ✅ Worker ready, waiting for jobs...")

    # Start worker
    worker.work()
# Script entry point: start the worker only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()