This commit introduces a new API for managing pending tag linkages, allowing admins to review and approve or deny user-suggested tags. The frontend has been updated with a new User Tagged Photos page for displaying pending linkages, including options for filtering and submitting decisions. Additionally, the Layout component has been modified to include navigation to the new page. Documentation has been updated to reflect these changes.
426 lines
17 KiB
Python
426 lines
17 KiB
Python
from __future__ import annotations
|
||
|
||
import os
|
||
import subprocess
|
||
import sys
|
||
from contextlib import asynccontextmanager
|
||
from pathlib import Path
|
||
|
||
from fastapi import FastAPI
|
||
from fastapi.middleware.cors import CORSMiddleware
|
||
from sqlalchemy import inspect, text
|
||
|
||
from src.web.api.auth import router as auth_router
|
||
from src.web.api.faces import router as faces_router
|
||
from src.web.api.health import router as health_router
|
||
from src.web.api.jobs import router as jobs_router
|
||
from src.web.api.metrics import router as metrics_router
|
||
from src.web.api.people import router as people_router
|
||
from src.web.api.pending_identifications import router as pending_identifications_router
|
||
from src.web.api.pending_linkages import router as pending_linkages_router
|
||
from src.web.api.photos import router as photos_router
|
||
from src.web.api.reported_photos import router as reported_photos_router
|
||
from src.web.api.pending_photos import router as pending_photos_router
|
||
from src.web.api.tags import router as tags_router
|
||
from src.web.api.users import router as users_router
|
||
from src.web.api.auth_users import router as auth_users_router
|
||
from src.web.api.role_permissions import router as role_permissions_router
|
||
from src.web.api.version import router as version_router
|
||
from src.web.settings import APP_TITLE, APP_VERSION
|
||
from src.web.constants.roles import DEFAULT_ADMIN_ROLE, DEFAULT_USER_ROLE, ROLE_VALUES
|
||
from src.web.db.base import Base, engine
|
||
from src.web.db.session import database_url
|
||
# Import models to ensure they're registered with Base.metadata
|
||
from src.web.db import models # noqa: F401
|
||
from src.web.db.models import RolePermission
|
||
from src.web.utils.password import hash_password
|
||
|
||
# Global worker process (will be set in lifespan)
|
||
_worker_process: subprocess.Popen | None = None
|
||
|
||
|
||
def start_worker() -> None:
|
||
"""Start RQ worker in background subprocess."""
|
||
global _worker_process
|
||
|
||
try:
|
||
from redis import Redis
|
||
|
||
# Check Redis connection first
|
||
redis_conn = Redis(host="localhost", port=6379, db=0, decode_responses=False)
|
||
redis_conn.ping()
|
||
|
||
# Start worker as a subprocess (avoids signal handler issues)
|
||
project_root = Path(__file__).parent.parent.parent
|
||
python_executable = sys.executable
|
||
|
||
_worker_process = subprocess.Popen(
|
||
[
|
||
python_executable,
|
||
"-m",
|
||
"src.web.worker",
|
||
],
|
||
cwd=str(project_root),
|
||
stdout=None, # Don't capture - let output go to console
|
||
stderr=None, # Don't capture - let errors go to console
|
||
env={
|
||
**{k: v for k, v in os.environ.items()},
|
||
"PYTHONPATH": str(project_root),
|
||
}
|
||
)
|
||
# Give it a moment to start, then check if it's still running
|
||
import time
|
||
time.sleep(0.5)
|
||
if _worker_process.poll() is not None:
|
||
# Process already exited - there was an error
|
||
print(f"❌ Worker process exited immediately with code {_worker_process.returncode}")
|
||
print(" Check worker errors above")
|
||
else:
|
||
print(f"✅ RQ worker started in background subprocess (PID: {_worker_process.pid})")
|
||
except Exception as e:
|
||
print(f"⚠️ Failed to start RQ worker: {e}")
|
||
print(" Background jobs will not be processed. Ensure Redis is running.")
|
||
|
||
|
||
def stop_worker() -> None:
|
||
"""Stop RQ worker gracefully."""
|
||
global _worker_process
|
||
|
||
if _worker_process:
|
||
try:
|
||
_worker_process.terminate()
|
||
try:
|
||
_worker_process.wait(timeout=5)
|
||
except subprocess.TimeoutExpired:
|
||
_worker_process.kill()
|
||
print("✅ RQ worker stopped")
|
||
except Exception:
|
||
pass
|
||
|
||
|
||
def ensure_user_password_hash_column(inspector) -> None:
|
||
"""Ensure users table contains password_hash column."""
|
||
if "users" not in inspector.get_table_names():
|
||
print("ℹ️ Users table does not exist yet - will be created with password_hash column")
|
||
return
|
||
|
||
columns = {column["name"] for column in inspector.get_columns("users")}
|
||
if "password_hash" in columns:
|
||
print("ℹ️ password_hash column already exists in users table")
|
||
return
|
||
|
||
print("🔄 Adding password_hash column to users table...")
|
||
|
||
default_hash = hash_password("changeme")
|
||
dialect = engine.dialect.name
|
||
|
||
with engine.connect() as connection:
|
||
with connection.begin():
|
||
if dialect == "postgresql":
|
||
# PostgreSQL: Add column as nullable first, then update, then set NOT NULL
|
||
connection.execute(
|
||
text("ALTER TABLE users ADD COLUMN IF NOT EXISTS password_hash TEXT")
|
||
)
|
||
connection.execute(
|
||
text(
|
||
"UPDATE users SET password_hash = :default_hash "
|
||
"WHERE password_hash IS NULL OR password_hash = ''"
|
||
),
|
||
{"default_hash": default_hash},
|
||
)
|
||
# Set NOT NULL constraint
|
||
connection.execute(
|
||
text("ALTER TABLE users ALTER COLUMN password_hash SET NOT NULL")
|
||
)
|
||
else:
|
||
# SQLite
|
||
connection.execute(
|
||
text("ALTER TABLE users ADD COLUMN password_hash TEXT")
|
||
)
|
||
connection.execute(
|
||
text(
|
||
"UPDATE users SET password_hash = :default_hash "
|
||
"WHERE password_hash IS NULL OR password_hash = ''"
|
||
),
|
||
{"default_hash": default_hash},
|
||
)
|
||
print("✅ Added password_hash column to users table (default password: changeme)")
|
||
|
||
|
||
def ensure_user_password_change_required_column(inspector) -> None:
|
||
"""Ensure users table contains password_change_required column."""
|
||
if "users" not in inspector.get_table_names():
|
||
return
|
||
|
||
columns = {column["name"] for column in inspector.get_columns("users")}
|
||
if "password_change_required" in columns:
|
||
print("ℹ️ password_change_required column already exists in users table")
|
||
return
|
||
|
||
print("🔄 Adding password_change_required column to users table...")
|
||
dialect = engine.dialect.name
|
||
|
||
with engine.connect() as connection:
|
||
with connection.begin():
|
||
if dialect == "postgresql":
|
||
connection.execute(
|
||
text("ALTER TABLE users ADD COLUMN IF NOT EXISTS password_change_required BOOLEAN NOT NULL DEFAULT true")
|
||
)
|
||
else:
|
||
# SQLite
|
||
connection.execute(
|
||
text("ALTER TABLE users ADD COLUMN password_change_required BOOLEAN DEFAULT 1")
|
||
)
|
||
connection.execute(
|
||
text("UPDATE users SET password_change_required = 1 WHERE password_change_required IS NULL")
|
||
)
|
||
print("✅ Added password_change_required column to users table")
|
||
|
||
|
||
def ensure_user_email_unique_constraint(inspector) -> None:
|
||
"""Ensure users table email column has a unique constraint."""
|
||
if "users" not in inspector.get_table_names():
|
||
return
|
||
|
||
# Check if email column exists
|
||
columns = {col["name"] for col in inspector.get_columns("users")}
|
||
if "email" not in columns:
|
||
print("ℹ️ email column does not exist in users table yet")
|
||
return
|
||
|
||
# Check if unique constraint already exists on email
|
||
dialect = engine.dialect.name
|
||
with engine.connect() as connection:
|
||
if dialect == "postgresql":
|
||
# Check if unique constraint exists
|
||
result = connection.execute(text("""
|
||
SELECT constraint_name
|
||
FROM information_schema.table_constraints
|
||
WHERE table_name = 'users'
|
||
AND constraint_type = 'UNIQUE'
|
||
AND constraint_name LIKE '%email%'
|
||
"""))
|
||
if result.first():
|
||
print("ℹ️ Unique constraint on email column already exists")
|
||
return
|
||
|
||
# Try to add unique constraint (will fail if duplicates exist)
|
||
try:
|
||
print("🔄 Adding unique constraint to email column...")
|
||
connection.execute(text("ALTER TABLE users ADD CONSTRAINT uq_users_email UNIQUE (email)"))
|
||
connection.commit()
|
||
print("✅ Added unique constraint to email column")
|
||
except Exception as e:
|
||
# If constraint already exists or duplicates exist, that's okay
|
||
# API validation will prevent new duplicates
|
||
if "already exists" in str(e).lower() or "duplicate" in str(e).lower():
|
||
print(f"ℹ️ Could not add unique constraint (may have duplicates): {e}")
|
||
else:
|
||
print(f"⚠️ Could not add unique constraint: {e}")
|
||
else:
|
||
# SQLite - unique constraint is handled at column level
|
||
# Check if column already has unique constraint
|
||
# SQLite doesn't easily support adding unique constraints to existing columns
|
||
# The model definition will handle it for new tables
|
||
print("ℹ️ SQLite: Unique constraint on email will be enforced by model definition for new tables")
|
||
|
||
|
||
def ensure_face_identified_by_user_id_column(inspector) -> None:
|
||
"""Ensure faces table contains identified_by_user_id column."""
|
||
if "faces" not in inspector.get_table_names():
|
||
return
|
||
|
||
columns = {column["name"] for column in inspector.get_columns("faces")}
|
||
if "identified_by_user_id" in columns:
|
||
print("ℹ️ identified_by_user_id column already exists in faces table")
|
||
return
|
||
|
||
print("🔄 Adding identified_by_user_id column to faces table...")
|
||
dialect = engine.dialect.name
|
||
|
||
with engine.connect() as connection:
|
||
with connection.begin():
|
||
if dialect == "postgresql":
|
||
connection.execute(
|
||
text("ALTER TABLE faces ADD COLUMN IF NOT EXISTS identified_by_user_id INTEGER REFERENCES users(id)")
|
||
)
|
||
# Add index
|
||
try:
|
||
connection.execute(
|
||
text("CREATE INDEX IF NOT EXISTS idx_faces_identified_by ON faces(identified_by_user_id)")
|
||
)
|
||
except Exception:
|
||
pass # Index might already exist
|
||
else:
|
||
# SQLite
|
||
connection.execute(
|
||
text("ALTER TABLE faces ADD COLUMN identified_by_user_id INTEGER REFERENCES users(id)")
|
||
)
|
||
# SQLite doesn't support IF NOT EXISTS for indexes, so we'll try to create it
|
||
try:
|
||
connection.execute(
|
||
text("CREATE INDEX idx_faces_identified_by ON faces(identified_by_user_id)")
|
||
)
|
||
except Exception:
|
||
pass # Index might already exist
|
||
print("✅ Added identified_by_user_id column to faces table")
|
||
|
||
|
||
def ensure_user_role_column(inspector) -> None:
|
||
"""Ensure users table has a role column with valid values."""
|
||
if "users" not in inspector.get_table_names():
|
||
return
|
||
|
||
columns = {column["name"] for column in inspector.get_columns("users")}
|
||
dialect = engine.dialect.name
|
||
role_values = sorted(ROLE_VALUES)
|
||
placeholder_parts = ", ".join(
|
||
f":role_value_{index}" for index, _ in enumerate(role_values)
|
||
)
|
||
where_clause = (
|
||
"role IS NULL OR role = ''"
|
||
if not placeholder_parts
|
||
else f"role IS NULL OR role = '' OR role NOT IN ({placeholder_parts})"
|
||
)
|
||
params = {
|
||
f"role_value_{index}": value for index, value in enumerate(role_values)
|
||
}
|
||
params["admin_role"] = DEFAULT_ADMIN_ROLE
|
||
params["default_role"] = DEFAULT_USER_ROLE
|
||
|
||
with engine.connect() as connection:
|
||
with connection.begin():
|
||
if "role" not in columns:
|
||
if dialect == "postgresql":
|
||
connection.execute(
|
||
text(
|
||
f"ALTER TABLE users ADD COLUMN IF NOT EXISTS role TEXT "
|
||
f"NOT NULL DEFAULT '{DEFAULT_USER_ROLE}'"
|
||
)
|
||
)
|
||
else:
|
||
connection.execute(
|
||
text(
|
||
f"ALTER TABLE users ADD COLUMN role TEXT "
|
||
f"DEFAULT '{DEFAULT_USER_ROLE}'"
|
||
)
|
||
)
|
||
connection.execute(
|
||
text(
|
||
f"""
|
||
UPDATE users
|
||
SET role = CASE
|
||
WHEN is_admin THEN :admin_role
|
||
ELSE :default_role
|
||
END
|
||
WHERE {where_clause}
|
||
"""
|
||
),
|
||
params,
|
||
)
|
||
connection.execute(
|
||
text("CREATE INDEX IF NOT EXISTS idx_users_role ON users(role)")
|
||
)
|
||
print("✅ Ensured users.role column exists and is populated")
|
||
|
||
|
||
def ensure_role_permissions_table(inspector) -> None:
|
||
"""Ensure the role_permissions table exists for permission matrix."""
|
||
if "role_permissions" in inspector.get_table_names():
|
||
return
|
||
|
||
try:
|
||
print("🔄 Creating role_permissions table...")
|
||
RolePermission.__table__.create(bind=engine, checkfirst=True)
|
||
print("✅ Created role_permissions table")
|
||
except Exception as exc:
|
||
print(f"⚠️ Failed to create role_permissions table: {exc}")
|
||
|
||
|
||
@asynccontextmanager
|
||
async def lifespan(app: FastAPI):
|
||
"""Lifespan context manager for startup and shutdown events."""
|
||
# Ensure database exists and tables are created on first run
|
||
try:
|
||
if database_url.startswith("sqlite"):
|
||
db_path = database_url.replace("sqlite:///", "")
|
||
db_file = Path(db_path)
|
||
db_file.parent.mkdir(parents=True, exist_ok=True)
|
||
|
||
# Only create tables if they don't already exist (safety check)
|
||
inspector = inspect(engine)
|
||
existing_tables = set(inspector.get_table_names())
|
||
|
||
# Check if required application tables exist (not just alembic_version)
|
||
required_tables = {"photos", "people", "faces", "tags", "phototaglinkage", "person_encodings", "photo_favorites", "users"}
|
||
missing_tables = required_tables - existing_tables
|
||
|
||
if missing_tables:
|
||
# Some required tables are missing - create all tables
|
||
# create_all() only creates missing tables, won't drop existing ones
|
||
Base.metadata.create_all(bind=engine)
|
||
if len(missing_tables) == len(required_tables):
|
||
print("✅ Database initialized (first run - tables created)")
|
||
else:
|
||
print(f"✅ Database tables created (missing tables: {', '.join(missing_tables)})")
|
||
else:
|
||
# All required tables exist - don't recreate (prevents data loss)
|
||
print(f"✅ Database already initialized ({len(existing_tables)} tables exist)")
|
||
|
||
# Ensure new columns exist (backward compatibility without migrations)
|
||
ensure_user_password_hash_column(inspector)
|
||
ensure_user_password_change_required_column(inspector)
|
||
ensure_user_email_unique_constraint(inspector)
|
||
ensure_face_identified_by_user_id_column(inspector)
|
||
ensure_user_role_column(inspector)
|
||
ensure_role_permissions_table(inspector)
|
||
except Exception as exc:
|
||
print(f"❌ Database initialization failed: {exc}")
|
||
raise
|
||
# Startup
|
||
start_worker()
|
||
yield
|
||
# Shutdown
|
||
stop_worker()
|
||
|
||
|
||
def create_app() -> FastAPI:
|
||
"""Create and configure the FastAPI application instance."""
|
||
app = FastAPI(
|
||
title=APP_TITLE,
|
||
version=APP_VERSION,
|
||
lifespan=lifespan,
|
||
)
|
||
|
||
app.add_middleware(
|
||
CORSMiddleware,
|
||
allow_origins=["*"],
|
||
allow_credentials=True,
|
||
allow_methods=["*"],
|
||
allow_headers=["*"],
|
||
)
|
||
|
||
app.include_router(health_router, tags=["health"])
|
||
app.include_router(version_router, tags=["meta"])
|
||
app.include_router(metrics_router, tags=["metrics"])
|
||
app.include_router(auth_router, prefix="/api/v1")
|
||
app.include_router(jobs_router, prefix="/api/v1")
|
||
app.include_router(photos_router, prefix="/api/v1")
|
||
app.include_router(faces_router, prefix="/api/v1")
|
||
app.include_router(people_router, prefix="/api/v1")
|
||
app.include_router(pending_identifications_router, prefix="/api/v1")
|
||
app.include_router(pending_linkages_router, prefix="/api/v1")
|
||
app.include_router(reported_photos_router, prefix="/api/v1")
|
||
app.include_router(pending_photos_router, prefix="/api/v1")
|
||
app.include_router(tags_router, prefix="/api/v1")
|
||
app.include_router(users_router, prefix="/api/v1")
|
||
app.include_router(auth_users_router, prefix="/api/v1")
|
||
app.include_router(role_permissions_router, prefix="/api/v1")
|
||
|
||
return app
|
||
|
||
|
||
app = create_app()
|
||
|
||
|