from __future__ import annotations import os import subprocess import sys from contextlib import asynccontextmanager from pathlib import Path from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from sqlalchemy import inspect, text from src.web.api.auth import router as auth_router from src.web.api.faces import router as faces_router from src.web.api.health import router as health_router from src.web.api.jobs import router as jobs_router from src.web.api.metrics import router as metrics_router from src.web.api.people import router as people_router from src.web.api.pending_identifications import router as pending_identifications_router from src.web.api.pending_linkages import router as pending_linkages_router from src.web.api.photos import router as photos_router from src.web.api.reported_photos import router as reported_photos_router from src.web.api.pending_photos import router as pending_photos_router from src.web.api.tags import router as tags_router from src.web.api.users import router as users_router from src.web.api.auth_users import router as auth_users_router from src.web.api.role_permissions import router as role_permissions_router from src.web.api.videos import router as videos_router from src.web.api.version import router as version_router from src.web.settings import APP_TITLE, APP_VERSION from src.web.constants.roles import DEFAULT_ADMIN_ROLE, DEFAULT_USER_ROLE, ROLE_VALUES from src.web.db.base import Base, engine from src.web.db.session import database_url # Import models to ensure they're registered with Base.metadata from src.web.db import models # noqa: F401 from src.web.db.models import RolePermission from src.web.utils.password import hash_password # Global worker process (will be set in lifespan) _worker_process: subprocess.Popen | None = None def start_worker() -> None: """Start RQ worker in background subprocess.""" global _worker_process try: from redis import Redis # Check Redis connection first redis_conn = Redis(host="localhost", port=6379, db=0, decode_responses=False) redis_conn.ping() # Start worker as a subprocess (avoids signal handler issues) project_root = Path(__file__).parent.parent.parent python_executable = sys.executable _worker_process = subprocess.Popen( [ python_executable, "-m", "src.web.worker", ], cwd=str(project_root), stdout=None, # Don't capture - let output go to console stderr=None, # Don't capture - let errors go to console env={ **{k: v for k, v in os.environ.items()}, "PYTHONPATH": str(project_root), } ) # Give it a moment to start, then check if it's still running import time time.sleep(0.5) if _worker_process.poll() is not None: # Process already exited - there was an error print(f"❌ Worker process exited immediately with code {_worker_process.returncode}") print(" Check worker errors above") else: print(f"✅ RQ worker started in background subprocess (PID: {_worker_process.pid})") except Exception as e: print(f"⚠️ Failed to start RQ worker: {e}") print(" Background jobs will not be processed. Ensure Redis is running.") def stop_worker() -> None: """Stop RQ worker gracefully.""" global _worker_process if _worker_process: try: _worker_process.terminate() try: _worker_process.wait(timeout=5) except subprocess.TimeoutExpired: _worker_process.kill() print("✅ RQ worker stopped") except Exception: pass def ensure_user_password_hash_column(inspector) -> None: """Ensure users table contains password_hash column.""" if "users" not in inspector.get_table_names(): print("ℹ️ Users table does not exist yet - will be created with password_hash column") return columns = {column["name"] for column in inspector.get_columns("users")} if "password_hash" in columns: print("ℹ️ password_hash column already exists in users table") return print("🔄 Adding password_hash column to users table...") default_hash = hash_password("changeme") dialect = engine.dialect.name with engine.connect() as connection: with connection.begin(): if dialect == "postgresql": # PostgreSQL: Add column as nullable first, then update, then set NOT NULL connection.execute( text("ALTER TABLE users ADD COLUMN IF NOT EXISTS password_hash TEXT") ) connection.execute( text( "UPDATE users SET password_hash = :default_hash " "WHERE password_hash IS NULL OR password_hash = ''" ), {"default_hash": default_hash}, ) # Set NOT NULL constraint connection.execute( text("ALTER TABLE users ALTER COLUMN password_hash SET NOT NULL") ) else: # SQLite connection.execute( text("ALTER TABLE users ADD COLUMN password_hash TEXT") ) connection.execute( text( "UPDATE users SET password_hash = :default_hash " "WHERE password_hash IS NULL OR password_hash = ''" ), {"default_hash": default_hash}, ) print("✅ Added password_hash column to users table (default password: changeme)") def ensure_user_password_change_required_column(inspector) -> None: """Ensure users table contains password_change_required column.""" if "users" not in inspector.get_table_names(): return columns = {column["name"] for column in inspector.get_columns("users")} if "password_change_required" in columns: print("ℹ️ password_change_required column already exists in users table") return print("🔄 Adding password_change_required column to users table...") dialect = engine.dialect.name with engine.connect() as connection: with connection.begin(): if dialect == "postgresql": connection.execute( text("ALTER TABLE users ADD COLUMN IF NOT EXISTS password_change_required BOOLEAN NOT NULL DEFAULT true") ) else: # SQLite connection.execute( text("ALTER TABLE users ADD COLUMN password_change_required BOOLEAN DEFAULT 1") ) connection.execute( text("UPDATE users SET password_change_required = 1 WHERE password_change_required IS NULL") ) print("✅ Added password_change_required column to users table") def ensure_user_email_unique_constraint(inspector) -> None: """Ensure users table email column has a unique constraint.""" if "users" not in inspector.get_table_names(): return # Check if email column exists columns = {col["name"] for col in inspector.get_columns("users")} if "email" not in columns: print("ℹ️ email column does not exist in users table yet") return # Check if unique constraint already exists on email dialect = engine.dialect.name with engine.connect() as connection: if dialect == "postgresql": # Check if unique constraint exists result = connection.execute(text(""" SELECT constraint_name FROM information_schema.table_constraints WHERE table_name = 'users' AND constraint_type = 'UNIQUE' AND constraint_name LIKE '%email%' """)) if result.first(): print("ℹ️ Unique constraint on email column already exists") return # Try to add unique constraint (will fail if duplicates exist) try: print("🔄 Adding unique constraint to email column...") connection.execute(text("ALTER TABLE users ADD CONSTRAINT uq_users_email UNIQUE (email)")) connection.commit() print("✅ Added unique constraint to email column") except Exception as e: # If constraint already exists or duplicates exist, that's okay # API validation will prevent new duplicates if "already exists" in str(e).lower() or "duplicate" in str(e).lower(): print(f"ℹ️ Could not add unique constraint (may have duplicates): {e}") else: print(f"⚠️ Could not add unique constraint: {e}") else: # SQLite - unique constraint is handled at column level # Check if column already has unique constraint # SQLite doesn't easily support adding unique constraints to existing columns # The model definition will handle it for new tables print("ℹ️ SQLite: Unique constraint on email will be enforced by model definition for new tables") def ensure_face_identified_by_user_id_column(inspector) -> None: """Ensure faces table contains identified_by_user_id column.""" if "faces" not in inspector.get_table_names(): return columns = {column["name"] for column in inspector.get_columns("faces")} if "identified_by_user_id" in columns: print("ℹ️ identified_by_user_id column already exists in faces table") return print("🔄 Adding identified_by_user_id column to faces table...") dialect = engine.dialect.name with engine.connect() as connection: with connection.begin(): if dialect == "postgresql": connection.execute( text("ALTER TABLE faces ADD COLUMN IF NOT EXISTS identified_by_user_id INTEGER REFERENCES users(id)") ) # Add index try: connection.execute( text("CREATE INDEX IF NOT EXISTS idx_faces_identified_by ON faces(identified_by_user_id)") ) except Exception: pass # Index might already exist else: # SQLite connection.execute( text("ALTER TABLE faces ADD COLUMN identified_by_user_id INTEGER REFERENCES users(id)") ) # SQLite doesn't support IF NOT EXISTS for indexes, so we'll try to create it try: connection.execute( text("CREATE INDEX idx_faces_identified_by ON faces(identified_by_user_id)") ) except Exception: pass # Index might already exist print("✅ Added identified_by_user_id column to faces table") def ensure_user_role_column(inspector) -> None: """Ensure users table has a role column with valid values.""" if "users" not in inspector.get_table_names(): return columns = {column["name"] for column in inspector.get_columns("users")} dialect = engine.dialect.name role_values = sorted(ROLE_VALUES) placeholder_parts = ", ".join( f":role_value_{index}" for index, _ in enumerate(role_values) ) where_clause = ( "role IS NULL OR role = ''" if not placeholder_parts else f"role IS NULL OR role = '' OR role NOT IN ({placeholder_parts})" ) params = { f"role_value_{index}": value for index, value in enumerate(role_values) } params["admin_role"] = DEFAULT_ADMIN_ROLE params["default_role"] = DEFAULT_USER_ROLE with engine.connect() as connection: with connection.begin(): if "role" not in columns: if dialect == "postgresql": connection.execute( text( f"ALTER TABLE users ADD COLUMN IF NOT EXISTS role TEXT " f"NOT NULL DEFAULT '{DEFAULT_USER_ROLE}'" ) ) else: connection.execute( text( f"ALTER TABLE users ADD COLUMN role TEXT " f"DEFAULT '{DEFAULT_USER_ROLE}'" ) ) connection.execute( text( f""" UPDATE users SET role = CASE WHEN is_admin THEN :admin_role ELSE :default_role END WHERE {where_clause} """ ), params, ) connection.execute( text("CREATE INDEX IF NOT EXISTS idx_users_role ON users(role)") ) print("✅ Ensured users.role column exists and is populated") def ensure_photo_media_type_column(inspector) -> None: """Ensure photos table contains media_type column.""" if "photos" not in inspector.get_table_names(): return columns = {column["name"] for column in inspector.get_columns("photos")} if "media_type" in columns: print("ℹ️ media_type column already exists in photos table") return print("🔄 Adding media_type column to photos table...") dialect = engine.dialect.name with engine.connect() as connection: with connection.begin(): if dialect == "postgresql": connection.execute( text("ALTER TABLE photos ADD COLUMN IF NOT EXISTS media_type TEXT NOT NULL DEFAULT 'image'") ) # Add index try: connection.execute( text("CREATE INDEX IF NOT EXISTS idx_photos_media_type ON photos(media_type)") ) except Exception: pass # Index might already exist else: # SQLite connection.execute( text("ALTER TABLE photos ADD COLUMN media_type TEXT DEFAULT 'image'") ) # Update existing rows to have 'image' as default connection.execute( text("UPDATE photos SET media_type = 'image' WHERE media_type IS NULL") ) # SQLite doesn't support IF NOT EXISTS for indexes, so we'll try to create it try: connection.execute( text("CREATE INDEX idx_photos_media_type ON photos(media_type)") ) except Exception: pass # Index might already exist print("✅ Added media_type column to photos table") def ensure_face_excluded_column(inspector) -> None: """Ensure faces table contains excluded column.""" if "faces" not in inspector.get_table_names(): print("ℹ️ Faces table does not exist yet - will be created with excluded column") return columns = {column["name"] for column in inspector.get_columns("faces")} if "excluded" in columns: print("ℹ️ excluded column already exists in faces table") return print("🔄 Adding excluded column to faces table...") dialect = engine.dialect.name with engine.connect() as connection: with connection.begin(): if dialect == "postgresql": # PostgreSQL: Add column with default value connection.execute( text("ALTER TABLE faces ADD COLUMN IF NOT EXISTS excluded BOOLEAN DEFAULT FALSE NOT NULL") ) # Create index try: connection.execute( text("CREATE INDEX IF NOT EXISTS idx_faces_excluded ON faces(excluded)") ) except Exception: pass # Index might already exist else: # SQLite connection.execute( text("ALTER TABLE faces ADD COLUMN excluded BOOLEAN DEFAULT 0 NOT NULL") ) # Create index try: connection.execute( text("CREATE INDEX idx_faces_excluded ON faces(excluded)") ) except Exception: pass # Index might already exist print("✅ Added excluded column to faces table") def ensure_photo_person_linkage_table(inspector) -> None: """Ensure photo_person_linkage table exists for direct video-person associations.""" if "photo_person_linkage" in inspector.get_table_names(): print("ℹ️ photo_person_linkage table already exists") return print("🔄 Creating photo_person_linkage table...") dialect = engine.dialect.name with engine.connect() as connection: with connection.begin(): if dialect == "postgresql": connection.execute(text(""" CREATE TABLE IF NOT EXISTS photo_person_linkage ( id SERIAL PRIMARY KEY, photo_id INTEGER NOT NULL REFERENCES photos(id) ON DELETE CASCADE, person_id INTEGER NOT NULL REFERENCES people(id) ON DELETE CASCADE, identified_by_user_id INTEGER REFERENCES users(id), created_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, UNIQUE(photo_id, person_id) ) """)) # Create indexes for idx_name, idx_col in [ ("idx_photo_person_photo", "photo_id"), ("idx_photo_person_person", "person_id"), ("idx_photo_person_user", "identified_by_user_id"), ]: try: connection.execute( text(f"CREATE INDEX IF NOT EXISTS {idx_name} ON photo_person_linkage({idx_col})") ) except Exception: pass # Index might already exist else: # SQLite connection.execute(text(""" CREATE TABLE IF NOT EXISTS photo_person_linkage ( id INTEGER PRIMARY KEY AUTOINCREMENT, photo_id INTEGER NOT NULL REFERENCES photos(id) ON DELETE CASCADE, person_id INTEGER NOT NULL REFERENCES people(id) ON DELETE CASCADE, identified_by_user_id INTEGER REFERENCES users(id), created_date DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, UNIQUE(photo_id, person_id) ) """)) # Create indexes for idx_name, idx_col in [ ("idx_photo_person_photo", "photo_id"), ("idx_photo_person_person", "person_id"), ("idx_photo_person_user", "identified_by_user_id"), ]: try: connection.execute( text(f"CREATE INDEX {idx_name} ON photo_person_linkage({idx_col})") ) except Exception: pass # Index might already exist print("✅ Created photo_person_linkage table") def ensure_role_permissions_table(inspector) -> None: """Ensure the role_permissions table exists for permission matrix.""" if "role_permissions" in inspector.get_table_names(): return try: print("🔄 Creating role_permissions table...") RolePermission.__table__.create(bind=engine, checkfirst=True) print("✅ Created role_permissions table") except Exception as exc: print(f"⚠️ Failed to create role_permissions table: {exc}") @asynccontextmanager async def lifespan(app: FastAPI): """Lifespan context manager for startup and shutdown events.""" # Ensure database exists and tables are created on first run try: if database_url.startswith("sqlite"): db_path = database_url.replace("sqlite:///", "") db_file = Path(db_path) db_file.parent.mkdir(parents=True, exist_ok=True) # Only create tables if they don't already exist (safety check) inspector = inspect(engine) existing_tables = set(inspector.get_table_names()) # Check if required application tables exist (not just alembic_version) required_tables = {"photos", "people", "faces", "tags", "phototaglinkage", "person_encodings", "photo_favorites", "users", "photo_person_linkage"} missing_tables = required_tables - existing_tables if missing_tables: # Some required tables are missing - create all tables # create_all() only creates missing tables, won't drop existing ones Base.metadata.create_all(bind=engine) if len(missing_tables) == len(required_tables): print("✅ Database initialized (first run - tables created)") else: print(f"✅ Database tables created (missing tables: {', '.join(missing_tables)})") else: # All required tables exist - don't recreate (prevents data loss) print(f"✅ Database already initialized ({len(existing_tables)} tables exist)") # Ensure new columns exist (backward compatibility without migrations) ensure_user_password_hash_column(inspector) ensure_user_password_change_required_column(inspector) ensure_user_email_unique_constraint(inspector) ensure_face_identified_by_user_id_column(inspector) ensure_user_role_column(inspector) ensure_photo_media_type_column(inspector) ensure_photo_person_linkage_table(inspector) ensure_face_excluded_column(inspector) ensure_role_permissions_table(inspector) except Exception as exc: print(f"❌ Database initialization failed: {exc}") raise # Startup start_worker() yield # Shutdown stop_worker() def create_app() -> FastAPI: """Create and configure the FastAPI application instance.""" app = FastAPI( title=APP_TITLE, version=APP_VERSION, lifespan=lifespan, ) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) app.include_router(health_router, tags=["health"]) app.include_router(version_router, tags=["meta"]) app.include_router(metrics_router, tags=["metrics"]) app.include_router(auth_router, prefix="/api/v1") app.include_router(jobs_router, prefix="/api/v1") app.include_router(photos_router, prefix="/api/v1") app.include_router(faces_router, prefix="/api/v1") app.include_router(people_router, prefix="/api/v1") app.include_router(videos_router, prefix="/api/v1") app.include_router(pending_identifications_router, prefix="/api/v1") app.include_router(pending_linkages_router, prefix="/api/v1") app.include_router(reported_photos_router, prefix="/api/v1") app.include_router(pending_photos_router, prefix="/api/v1") app.include_router(tags_router, prefix="/api/v1") app.include_router(users_router, prefix="/api/v1") app.include_router(auth_users_router, prefix="/api/v1") app.include_router(role_permissions_router, prefix="/api/v1") return app app = create_app()