"""User management endpoints - admin only.""" from __future__ import annotations import logging from typing import Annotated from fastapi import APIRouter, Depends, HTTPException, Query, Response, status from fastapi.responses import JSONResponse from sqlalchemy import text from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import Session from backend.api.auth import get_current_user from backend.constants.roles import ( DEFAULT_ADMIN_ROLE, DEFAULT_USER_ROLE, ROLE_VALUES, UserRole, is_admin_role, ) from backend.db.session import get_auth_db, get_db from backend.db.models import Face, PhotoFavorite, PhotoPersonLinkage, User from backend.schemas.users import ( UserCreateRequest, UserResponse, UserUpdateRequest, UsersListResponse, ) from backend.utils.password import hash_password from backend.services.role_permissions import fetch_role_permissions_map router = APIRouter(prefix="/users", tags=["users"]) logger = logging.getLogger(__name__) def _normalize_role_and_admin( role: str | None, is_admin_flag: bool | None, ) -> tuple[str, bool]: """Normalize requested role/is_admin values into a consistent pair.""" selected_role = role or (DEFAULT_ADMIN_ROLE if is_admin_flag else DEFAULT_USER_ROLE) if selected_role not in ROLE_VALUES: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid role '{selected_role}'", ) derived_is_admin = is_admin_role(selected_role) if is_admin_flag is not None and is_admin_flag != derived_is_admin: logger.warning( "Role/is_admin mismatch detected. Using role-derived admin flag.", extra={"role": selected_role, "is_admin_flag": is_admin_flag}, ) return selected_role, derived_is_admin def _ensure_role_set(user: User) -> None: """Guarantee that a User instance has a valid role value.""" if user.role in ROLE_VALUES: return fallback_role = DEFAULT_ADMIN_ROLE if user.is_admin else DEFAULT_USER_ROLE user.role = fallback_role def get_auth_db_optional() -> Session | None: """Get auth database session if available, otherwise return None.""" try: return next(get_auth_db()) except ValueError: # Auth database not configured return None def create_auth_user_if_missing( email: str, full_name: str, password_hash: str, is_admin: bool, ) -> None: """Create matching auth user if one does not already exist.""" if not email: return auth_db = get_auth_db_optional() if auth_db is None: return try: check_result = auth_db.execute( text( """ SELECT id FROM users WHERE email = :email """ ), {"email": email}, ) existing_auth = check_result.first() if existing_auth: return dialect = auth_db.bind.dialect.name if auth_db.bind else "postgresql" supports_returning = dialect == "postgresql" has_write_access = is_admin if supports_returning: auth_db.execute( text( """ INSERT INTO users (email, name, password_hash, is_admin, has_write_access) VALUES (:email, :name, :password_hash, :is_admin, :has_write_access) """ ), { "email": email, "name": full_name, "password_hash": password_hash, "is_admin": is_admin, "has_write_access": has_write_access, }, ) auth_db.commit() else: auth_db.execute( text( """ INSERT INTO users (email, name, password_hash, is_admin, has_write_access) VALUES (:email, :name, :password_hash, :is_admin, :has_write_access) """ ), { "email": email, "name": full_name, "password_hash": password_hash, "is_admin": is_admin, "has_write_access": has_write_access, }, ) auth_db.commit() except Exception as e: # pragma: no cover - logging helper auth_db.rollback() import traceback print( f"Warning: Failed to create auth user: {str(e)}\n{traceback.format_exc()}" ) finally: auth_db.close() def get_current_admin_user( current_user: Annotated[dict, Depends(get_current_user)], db: Session = Depends(get_db), ) -> dict: """Get current user and verify admin status from main database. Raises HTTPException if user is not an admin. If no admin users exist, allows the current user to bootstrap as admin. """ username = current_user["username"] # Check if any admin users exist admin_count = db.query(User).filter(User.is_admin == True).count() # If no admins exist, allow current user to bootstrap as admin if admin_count == 0: # Check if user already exists in main database main_user = db.query(User).filter(User.username == username).first() if not main_user: # Create the user as admin for bootstrap # Use a default password hash (user should change password after first login) # In production, this should be handled differently default_password_hash = hash_password("changeme") main_user = User( username=username, password_hash=default_password_hash, is_active=True, is_admin=True, role=DEFAULT_ADMIN_ROLE, ) db.add(main_user) db.commit() db.refresh(main_user) elif not main_user.is_admin: # User exists but is not admin - make them admin for bootstrap main_user.is_admin = True main_user.role = DEFAULT_ADMIN_ROLE db.add(main_user) db.commit() db.refresh(main_user) return {"username": username, "user_id": main_user.id} # Normal admin check - user must exist and be admin main_user = db.query(User).filter(User.username == username).first() if not main_user or not main_user.is_admin: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required", ) return {"username": username, "user_id": main_user.id} def require_feature_permission(feature_key: str): """Return a dependency that enforces feature-level access via role permissions.""" def dependency( current_user: Annotated[dict, Depends(get_current_user)], db: Session = Depends(get_db), ) -> dict: username = current_user["username"] user = db.query(User).filter(User.username == username).first() if not user: default_password_hash = hash_password("changeme") user = User( username=username, password_hash=default_password_hash, is_active=True, is_admin=False, role=DEFAULT_USER_ROLE, ) db.add(user) db.commit() db.refresh(user) _ensure_role_set(user) has_access = user.is_admin or is_admin_role(user.role) if not has_access: permissions_map = fetch_role_permissions_map(db) role_permissions = permissions_map.get(user.role, {}) has_access = bool(role_permissions.get(feature_key)) if not has_access: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Access denied for this feature", ) return { "username": username, "user_id": user.id, "role": user.role, "is_admin": user.is_admin, } return dependency @router.get("", response_model=UsersListResponse) def list_users( current_admin: Annotated[dict, Depends(get_current_admin_user)], is_active: bool | None = Query(None, description="Filter by active status"), is_admin: bool | None = Query(None, description="Filter by admin status"), db: Session = Depends(get_db), ) -> UsersListResponse: """List all users - admin only. Optionally filter by is_active and/or is_admin status. """ query = db.query(User) if is_active is not None: query = query.filter(User.is_active == is_active) if is_admin is not None: query = query.filter(User.is_admin == is_admin) users = query.order_by(User.username.asc()).all() for user in users: _ensure_role_set(user) items = [UserResponse.model_validate(u) for u in users] return UsersListResponse(items=items, total=len(items)) @router.post("", response_model=UserResponse, status_code=status.HTTP_201_CREATED) def create_user( current_admin: Annotated[dict, Depends(get_current_admin_user)], request: UserCreateRequest, db: Session = Depends(get_db), ) -> UserResponse: """Create a new user - admin only. If give_frontend_permission is True, also creates the user in the auth database for frontend access. """ # Check if username already exists existing_user = db.query(User).filter(User.username == request.username).first() if existing_user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Username '{request.username}' already exists", ) # Check if email already exists existing_email = db.query(User).filter(User.email == request.email).first() if existing_email: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Email address '{request.email}' is already in use", ) # Hash the password before storing password_hash = hash_password(request.password) if request.role is None: requested_role = None elif isinstance(request.role, UserRole): requested_role = request.role.value else: requested_role = str(request.role) normalized_role, normalized_is_admin = _normalize_role_and_admin( requested_role, request.is_admin, ) user = User( username=request.username, password_hash=password_hash, email=request.email, full_name=request.full_name, is_active=request.is_active, is_admin=normalized_is_admin, role=normalized_role, password_change_required=True, # Force password change on first login ) db.add(user) db.commit() db.refresh(user) if request.give_frontend_permission: create_auth_user_if_missing( email=request.email, full_name=request.full_name, password_hash=password_hash, is_admin=normalized_is_admin, ) return UserResponse.model_validate(user) @router.get("/{user_id}", response_model=UserResponse) def get_user( current_admin: Annotated[dict, Depends(get_current_admin_user)], user_id: int, db: Session = Depends(get_db), ) -> UserResponse: """Get a specific user by ID - admin only.""" user = db.query(User).filter(User.id == user_id).first() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"User with ID {user_id} not found", ) _ensure_role_set(user) return UserResponse.model_validate(user) @router.put("/{user_id}", response_model=UserResponse) def update_user( current_admin: Annotated[dict, Depends(get_current_admin_user)], user_id: int, request: UserUpdateRequest, db: Session = Depends(get_db), ) -> UserResponse: """Update a user - admin only.""" user = db.query(User).filter(User.id == user_id).first() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"User with ID {user_id} not found", ) if request.role is None: desired_role = None elif isinstance(request.role, UserRole): desired_role = request.role.value else: desired_role = str(request.role) if desired_role is None: if request.is_admin is not None: desired_role = DEFAULT_ADMIN_ROLE if request.is_admin else DEFAULT_USER_ROLE elif user.role: desired_role = user.role else: desired_role = DEFAULT_ADMIN_ROLE if user.is_admin else DEFAULT_USER_ROLE normalized_role, normalized_is_admin = _normalize_role_and_admin( desired_role, request.is_admin, ) # Prevent admin from removing their own admin status if current_admin["username"] == user.username and not normalized_is_admin: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot remove your own admin status", ) # Check if email is being changed and if the new email already exists if request.email is not None and request.email != user.email: existing_email = db.query(User).filter(User.email == request.email).first() if existing_email: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Email address '{request.email}' is already in use", ) # Update fields if provided if request.password is not None: user.password_hash = hash_password(request.password) if request.email is not None: user.email = request.email if request.full_name is not None: user.full_name = request.full_name if request.is_active is not None: user.is_active = request.is_active user.is_admin = normalized_is_admin user.role = normalized_role db.add(user) db.commit() db.refresh(user) if request.give_frontend_permission: create_auth_user_if_missing( email=user.email, full_name=user.full_name or user.username, password_hash=user.password_hash, is_admin=user.is_admin, ) return UserResponse.model_validate(user) @router.delete("/{user_id}") def delete_user( current_admin: Annotated[dict, Depends(get_current_admin_user)], user_id: int, db: Session = Depends(get_db), ) -> Response: """Delete a user - admin only. If the user has linked data (faces identified, video person linkages), the user will be set to inactive instead of deleted, and favorites will be removed. Admins will be notified via logging. Prevents admin from deleting themselves. """ user = db.query(User).filter(User.id == user_id).first() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"User with ID {user_id} not found", ) # Prevent admin from deleting themselves if current_admin["username"] == user.username: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot delete your own account", ) # Check for linked data (faces or photo_person_linkages identified by this user) faces_count = db.query(Face).filter(Face.identified_by_user_id == user_id).count() linkages_count = db.query(PhotoPersonLinkage).filter( PhotoPersonLinkage.identified_by_user_id == user_id ).count() has_linked_data = faces_count > 0 or linkages_count > 0 # Always delete favorites (they use username, not user_id) favorites_deleted = db.query(PhotoFavorite).filter( PhotoFavorite.username == user.username ).delete() if has_linked_data: # Set user inactive instead of deleting user.is_active = False db.add(user) db.commit() # Notify admins via logging logger.warning( f"User '{user.username}' (ID: {user_id}) was set to inactive instead of deleted " f"because they have linked data: {faces_count} face(s) and {linkages_count} " f"video person linkage(s). {favorites_deleted} favorite(s) were deleted. " f"Action performed by admin: {current_admin['username']}", extra={ "user_id": user_id, "username": user.username, "faces_count": faces_count, "linkages_count": linkages_count, "favorites_deleted": favorites_deleted, "admin_username": current_admin["username"], } ) # Return success but indicate user was deactivated return JSONResponse( status_code=status.HTTP_200_OK, content={ "message": ( f"User '{user.username}' has been set to inactive because they have " f"linked data ({faces_count} face(s), {linkages_count} linkage(s)). " f"{favorites_deleted} favorite(s) were deleted." ), "deactivated": True, "faces_count": faces_count, "linkages_count": linkages_count, "favorites_deleted": favorites_deleted, } ) else: # No linked data - safe to delete db.delete(user) db.commit() logger.info( f"User '{user.username}' (ID: {user_id}) was deleted. " f"{favorites_deleted} favorite(s) were deleted. " f"Action performed by admin: {current_admin['username']}", extra={ "user_id": user_id, "username": user.username, "favorites_deleted": favorites_deleted, "admin_username": current_admin["username"], } ) return Response(status_code=status.HTTP_204_NO_CONTENT)