This commit improves the setup of the authentication database by adding a new function to create necessary tables for both frontends. It also ensures that environment variables are loaded from a `.env` file before any database operations, enhancing configuration management. Additionally, minor updates are made to related scripts for better clarity and functionality.
250 lines
11 KiB
Python
250 lines
11 KiB
Python
"""RQ worker entrypoint for PunimTag."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import signal
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import NoReturn
|
|
|
|
import uuid
|
|
from dotenv import load_dotenv
|
|
|
|
# Load environment variables from .env file before importing anything that needs them
|
|
# Path calculation: backend/worker.py -> backend/ -> punimtag/ -> .env
|
|
env_path = Path(__file__).parent.parent / ".env"
|
|
env_path = env_path.resolve() # Make absolute path
|
|
if env_path.exists():
|
|
load_dotenv(dotenv_path=env_path, override=True)
|
|
|
|
from rq import Worker
|
|
from redis import Redis
|
|
|
|
from backend.services.tasks import import_photos_task, process_faces_task
|
|
from backend.db.session import auth_engine
|
|
from sqlalchemy import text
|
|
|
|
# Redis connection for RQ
|
|
redis_conn = Redis(host="localhost", port=6379, db=0, decode_responses=False)
|
|
|
|
|
|
def setup_auth_database_tables() -> None:
|
|
"""Create all necessary tables in the auth database for both frontends."""
|
|
if auth_engine is None:
|
|
print("[Worker] ⚠️ Auth database not configured (DATABASE_URL_AUTH not set), skipping table creation")
|
|
return
|
|
|
|
try:
|
|
print("[Worker] 🗃️ Setting up auth database tables...")
|
|
|
|
with auth_engine.connect() as conn:
|
|
# Create users table
|
|
conn.execute(text("""
|
|
CREATE TABLE IF NOT EXISTS users (
|
|
id SERIAL PRIMARY KEY,
|
|
email VARCHAR(255) UNIQUE NOT NULL,
|
|
name VARCHAR(255) NOT NULL,
|
|
password_hash VARCHAR(255) NOT NULL,
|
|
is_admin BOOLEAN DEFAULT FALSE,
|
|
has_write_access BOOLEAN DEFAULT FALSE,
|
|
email_verified BOOLEAN DEFAULT FALSE,
|
|
email_confirmation_token VARCHAR(255) UNIQUE,
|
|
email_confirmation_token_expiry TIMESTAMP,
|
|
password_reset_token VARCHAR(255) UNIQUE,
|
|
password_reset_token_expiry TIMESTAMP,
|
|
is_active BOOLEAN DEFAULT TRUE,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
);
|
|
"""))
|
|
|
|
# Add missing columns if table already exists
|
|
for col_def in [
|
|
"ALTER TABLE users ADD COLUMN IF NOT EXISTS is_admin BOOLEAN DEFAULT FALSE",
|
|
"ALTER TABLE users ADD COLUMN IF NOT EXISTS has_write_access BOOLEAN DEFAULT FALSE",
|
|
"ALTER TABLE users ADD COLUMN IF NOT EXISTS email_verified BOOLEAN DEFAULT FALSE",
|
|
"ALTER TABLE users ADD COLUMN IF NOT EXISTS email_confirmation_token VARCHAR(255)",
|
|
"ALTER TABLE users ADD COLUMN IF NOT EXISTS email_confirmation_token_expiry TIMESTAMP",
|
|
"ALTER TABLE users ADD COLUMN IF NOT EXISTS password_reset_token VARCHAR(255)",
|
|
"ALTER TABLE users ADD COLUMN IF NOT EXISTS password_reset_token_expiry TIMESTAMP",
|
|
"ALTER TABLE users ADD COLUMN IF NOT EXISTS is_active BOOLEAN DEFAULT TRUE",
|
|
]:
|
|
try:
|
|
conn.execute(text(col_def))
|
|
except Exception:
|
|
pass # Column might already exist or error is expected
|
|
|
|
# Create unique indexes for nullable columns
|
|
conn.execute(text("""
|
|
CREATE UNIQUE INDEX IF NOT EXISTS users_email_confirmation_token_key
|
|
ON users(email_confirmation_token)
|
|
WHERE email_confirmation_token IS NOT NULL;
|
|
"""))
|
|
conn.execute(text("""
|
|
CREATE UNIQUE INDEX IF NOT EXISTS users_password_reset_token_key
|
|
ON users(password_reset_token)
|
|
WHERE password_reset_token IS NOT NULL;
|
|
"""))
|
|
|
|
# Create pending_identifications table
|
|
conn.execute(text("""
|
|
CREATE TABLE IF NOT EXISTS pending_identifications (
|
|
id SERIAL PRIMARY KEY,
|
|
face_id INTEGER NOT NULL,
|
|
user_id INTEGER NOT NULL,
|
|
first_name VARCHAR(255) NOT NULL,
|
|
last_name VARCHAR(255) NOT NULL,
|
|
middle_name VARCHAR(255),
|
|
maiden_name VARCHAR(255),
|
|
date_of_birth DATE,
|
|
status VARCHAR(50) DEFAULT 'pending',
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
|
|
);
|
|
"""))
|
|
|
|
# Create indexes for pending_identifications
|
|
conn.execute(text("CREATE INDEX IF NOT EXISTS idx_pending_identifications_face_id ON pending_identifications(face_id);"))
|
|
conn.execute(text("CREATE INDEX IF NOT EXISTS idx_pending_identifications_user_id ON pending_identifications(user_id);"))
|
|
conn.execute(text("CREATE INDEX IF NOT EXISTS idx_pending_identifications_status ON pending_identifications(status);"))
|
|
|
|
# Create pending_photos table
|
|
conn.execute(text("""
|
|
CREATE TABLE IF NOT EXISTS pending_photos (
|
|
id SERIAL PRIMARY KEY,
|
|
user_id INTEGER NOT NULL,
|
|
filename VARCHAR(255) NOT NULL,
|
|
original_filename VARCHAR(255) NOT NULL,
|
|
file_path VARCHAR(512) NOT NULL,
|
|
file_size INTEGER NOT NULL,
|
|
mime_type VARCHAR(100) NOT NULL,
|
|
status VARCHAR(50) DEFAULT 'pending',
|
|
submitted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
reviewed_at TIMESTAMP,
|
|
reviewed_by INTEGER,
|
|
rejection_reason TEXT,
|
|
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
|
|
);
|
|
"""))
|
|
|
|
# Create indexes for pending_photos
|
|
conn.execute(text("CREATE INDEX IF NOT EXISTS idx_pending_photos_user_id ON pending_photos(user_id);"))
|
|
conn.execute(text("CREATE INDEX IF NOT EXISTS idx_pending_photos_status ON pending_photos(status);"))
|
|
conn.execute(text("CREATE INDEX IF NOT EXISTS idx_pending_photos_submitted_at ON pending_photos(submitted_at);"))
|
|
|
|
# Create inappropriate_photo_reports table
|
|
conn.execute(text("""
|
|
CREATE TABLE IF NOT EXISTS inappropriate_photo_reports (
|
|
id SERIAL PRIMARY KEY,
|
|
photo_id INTEGER NOT NULL,
|
|
user_id INTEGER NOT NULL,
|
|
status VARCHAR(50) DEFAULT 'pending',
|
|
reported_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
reviewed_at TIMESTAMP,
|
|
reviewed_by INTEGER,
|
|
review_notes TEXT,
|
|
report_comment TEXT,
|
|
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE,
|
|
CONSTRAINT uq_photo_user_report UNIQUE (photo_id, user_id)
|
|
);
|
|
"""))
|
|
|
|
# Create indexes for inappropriate_photo_reports
|
|
conn.execute(text("CREATE INDEX IF NOT EXISTS idx_inappropriate_photo_reports_photo_id ON inappropriate_photo_reports(photo_id);"))
|
|
conn.execute(text("CREATE INDEX IF NOT EXISTS idx_inappropriate_photo_reports_user_id ON inappropriate_photo_reports(user_id);"))
|
|
conn.execute(text("CREATE INDEX IF NOT EXISTS idx_inappropriate_photo_reports_status ON inappropriate_photo_reports(status);"))
|
|
conn.execute(text("CREATE INDEX IF NOT EXISTS idx_inappropriate_photo_reports_reported_at ON inappropriate_photo_reports(reported_at);"))
|
|
|
|
# Create pending_linkages table
|
|
conn.execute(text("""
|
|
CREATE TABLE IF NOT EXISTS pending_linkages (
|
|
id SERIAL PRIMARY KEY,
|
|
photo_id INTEGER NOT NULL,
|
|
tag_id INTEGER,
|
|
tag_name VARCHAR(255),
|
|
user_id INTEGER NOT NULL,
|
|
status VARCHAR(50) DEFAULT 'pending',
|
|
notes TEXT,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
|
|
);
|
|
"""))
|
|
|
|
# Create indexes for pending_linkages
|
|
conn.execute(text("CREATE INDEX IF NOT EXISTS idx_pending_linkages_photo_id ON pending_linkages(photo_id);"))
|
|
conn.execute(text("CREATE INDEX IF NOT EXISTS idx_pending_linkages_tag_id ON pending_linkages(tag_id);"))
|
|
conn.execute(text("CREATE INDEX IF NOT EXISTS idx_pending_linkages_user_id ON pending_linkages(user_id);"))
|
|
conn.execute(text("CREATE INDEX IF NOT EXISTS idx_pending_linkages_status ON pending_linkages(status);"))
|
|
|
|
# Create photo_favorites table
|
|
conn.execute(text("""
|
|
CREATE TABLE IF NOT EXISTS photo_favorites (
|
|
id SERIAL PRIMARY KEY,
|
|
photo_id INTEGER NOT NULL,
|
|
user_id INTEGER NOT NULL,
|
|
favorited_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE,
|
|
CONSTRAINT uq_photo_user_favorite UNIQUE (photo_id, user_id)
|
|
);
|
|
"""))
|
|
|
|
# Create indexes for photo_favorites
|
|
conn.execute(text("CREATE INDEX IF NOT EXISTS idx_photo_favorites_photo_id ON photo_favorites(photo_id);"))
|
|
conn.execute(text("CREATE INDEX IF NOT EXISTS idx_photo_favorites_user_id ON photo_favorites(user_id);"))
|
|
conn.execute(text("CREATE INDEX IF NOT EXISTS idx_photo_favorites_favorited_at ON photo_favorites(favorited_at);"))
|
|
|
|
conn.commit()
|
|
|
|
print("[Worker] ✅ Auth database tables created/verified successfully")
|
|
except Exception as e:
|
|
print(f"[Worker] ⚠️ Failed to create auth database tables: {e}")
|
|
print("[Worker] Tables may already exist or database may not be accessible")
|
|
# Don't exit - worker can still function without auth tables
|
|
|
|
|
|
def main() -> NoReturn:
|
|
"""Worker entrypoint - starts RQ worker to process background jobs."""
|
|
def _handle_sigterm(_signum, _frame):
|
|
sys.exit(0)
|
|
|
|
signal.signal(signal.SIGTERM, _handle_sigterm)
|
|
signal.signal(signal.SIGINT, _handle_sigterm)
|
|
|
|
# Generate unique worker name to avoid conflicts
|
|
worker_name = f"punimtag-worker-{uuid.uuid4().hex[:8]}"
|
|
|
|
print(f"[Worker] Starting worker: {worker_name}")
|
|
print(f"[Worker] Listening on queue: default")
|
|
|
|
# Setup auth database tables for both frontends
|
|
setup_auth_database_tables()
|
|
|
|
# Check if Redis is accessible
|
|
try:
|
|
redis_conn.ping()
|
|
print(f"[Worker] Redis connection successful")
|
|
except Exception as e:
|
|
print(f"[Worker] ❌ Redis connection failed: {e}")
|
|
sys.exit(1)
|
|
|
|
# Register tasks with worker
|
|
# Tasks are imported from services.tasks
|
|
worker = Worker(
|
|
["default"],
|
|
connection=redis_conn,
|
|
name=worker_name,
|
|
)
|
|
|
|
print(f"[Worker] ✅ Worker ready, waiting for jobs...")
|
|
|
|
# Start worker
|
|
worker.work()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|
|
|
|
|