Tanya 68d280e8f5 feat: Add new analysis documents and update installation scripts for backend integration
This commit introduces several new analysis documents, including Auto-Match Load Performance Analysis, Folder Picker Analysis, Monorepo Migration Summary, and various performance analysis documents. Additionally, the installation scripts are updated to reflect changes in backend service paths, ensuring proper integration with the new backend structure. These enhancements provide better documentation and streamline the setup process for users.
2025-12-30 15:04:32 -05:00

535 lines
18 KiB
Python

"""User management endpoints - admin only."""
from __future__ import annotations
import logging
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, Query, Response, status
from fastapi.responses import JSONResponse
from sqlalchemy import text
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session
from backend.api.auth import get_current_user
from backend.constants.roles import (
DEFAULT_ADMIN_ROLE,
DEFAULT_USER_ROLE,
ROLE_VALUES,
UserRole,
is_admin_role,
)
from backend.db.session import get_auth_db, get_db
from backend.db.models import Face, PhotoFavorite, PhotoPersonLinkage, User
from backend.schemas.users import (
UserCreateRequest,
UserResponse,
UserUpdateRequest,
UsersListResponse,
)
from backend.utils.password import hash_password
from backend.services.role_permissions import fetch_role_permissions_map
router = APIRouter(prefix="/users", tags=["users"])
logger = logging.getLogger(__name__)
def _normalize_role_and_admin(
    role: str | None,
    is_admin_flag: bool | None,
) -> tuple[str, bool]:
    """Resolve a requested role / is_admin pair into one consistent result.

    An explicit, non-empty ``role`` always wins. When it is missing, the role
    defaults to ``DEFAULT_ADMIN_ROLE`` if ``is_admin_flag`` is truthy and to
    ``DEFAULT_USER_ROLE`` otherwise. The returned admin flag is always derived
    from the chosen role via ``is_admin_role``; a conflicting
    ``is_admin_flag`` is logged and otherwise ignored.

    Args:
        role: Requested role value, or None/empty to fall back to a default.
        is_admin_flag: Requested admin flag, or None when not specified.

    Returns:
        A ``(role, is_admin)`` tuple.

    Raises:
        HTTPException: 400 when the resolved role is not in ``ROLE_VALUES``.
    """
    if role:
        selected_role = role
    elif is_admin_flag:
        selected_role = DEFAULT_ADMIN_ROLE
    else:
        selected_role = DEFAULT_USER_ROLE

    if selected_role not in ROLE_VALUES:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid role '{selected_role}'",
        )

    derived_is_admin = is_admin_role(selected_role)

    # Only an explicitly supplied flag can disagree with the role.
    flag_conflicts = is_admin_flag is not None and is_admin_flag != derived_is_admin
    if flag_conflicts:
        logger.warning(
            "Role/is_admin mismatch detected. Using role-derived admin flag.",
            extra={"role": selected_role, "is_admin_flag": is_admin_flag},
        )

    return selected_role, derived_is_admin
def _ensure_role_set(user: User) -> None:
    """Repair ``user.role`` in place when it is not one of ``ROLE_VALUES``.

    A user whose role is already valid is left untouched. Otherwise the role
    is replaced by ``DEFAULT_ADMIN_ROLE`` for admins and ``DEFAULT_USER_ROLE``
    for everyone else. The change is made on the instance only; nothing is
    committed here.
    """
    if user.role not in ROLE_VALUES:
        user.role = DEFAULT_ADMIN_ROLE if user.is_admin else DEFAULT_USER_ROLE
def get_auth_db_optional() -> Session | None:
    """Return an auth database session, or None when one cannot be obtained.

    The session is the first value produced by ``get_auth_db()``. A
    ``ValueError`` raised while obtaining it is treated as "auth database not
    configured" and reported as None; any other exception propagates.
    """
    try:
        auth_session = next(get_auth_db())
    except ValueError:
        # Auth database not configured
        return None
    return auth_session
def create_auth_user_if_missing(
    email: str,
    full_name: str,
    password_hash: str,
    is_admin: bool,
) -> None:
    """Create a matching row in the auth database's ``users`` table if absent.

    This is a best-effort helper. It returns without doing anything when
    ``email`` is empty, when no auth session is available, or when a row with
    the same email already exists. Errors raised while querying or inserting
    are rolled back and logged instead of being propagated. The session is
    always closed before returning.

    Args:
        email: Email address used to look up and create the auth user.
        full_name: Value stored in the auth table's ``name`` column.
        password_hash: Already-hashed password stored as-is.
        is_admin: Admin flag; also used as the ``has_write_access`` value.
    """
    if not email:
        return

    auth_db = get_auth_db_optional()
    if auth_db is None:
        return

    try:
        existing_auth = auth_db.execute(
            text(
                """
                SELECT id FROM users
                WHERE email = :email
                """
            ),
            {"email": email},
        ).first()
        if existing_auth:
            return

        # The previous implementation branched on the SQL dialect but ran the
        # exact same INSERT in both branches, so a single statement suffices.
        auth_db.execute(
            text(
                """
                INSERT INTO users (email, name, password_hash, is_admin, has_write_access)
                VALUES (:email, :name, :password_hash, :is_admin, :has_write_access)
                """
            ),
            {
                "email": email,
                "name": full_name,
                "password_hash": password_hash,
                "is_admin": is_admin,
                # Write access mirrors the admin flag.
                "has_write_access": is_admin,
            },
        )
        auth_db.commit()
    except Exception:  # pragma: no cover - best-effort helper
        auth_db.rollback()
        # Use the module logger (with traceback) rather than print().
        logger.warning("Failed to create auth user", exc_info=True)
    finally:
        auth_db.close()
def get_current_admin_user(
    current_user: Annotated[dict, Depends(get_current_user)],
    db: Session = Depends(get_db),
) -> dict:
    """Resolve the authenticated user and require admin rights in the main DB.

    Bootstrap rule: when the main database contains no admin at all, the
    current user is promoted to admin (and created first if they have no row
    yet) instead of being rejected.

    Returns:
        A dict with the caller's ``username`` and main-database ``user_id``.

    Raises:
        HTTPException: 403 when at least one admin exists and the current
            user is missing from the main database or is not an admin.
    """
    username = current_user["username"]

    # `== True` is intentional: it builds a SQL comparison, not a Python bool.
    admin_count = db.query(User).filter(User.is_admin == True).count()  # noqa: E712
    main_user = db.query(User).filter(User.username == username).first()

    if admin_count != 0:
        # Normal path: the caller must already exist and be an admin.
        if not main_user or not main_user.is_admin:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Admin access required",
            )
        return {"username": username, "user_id": main_user.id}

    # Bootstrap path: no admin exists yet, so the caller becomes one.
    needs_save = False
    if not main_user:
        # Placeholder password; the user is expected to change it after the
        # first login. In production this should be handled differently.
        placeholder_hash = hash_password("changeme")
        main_user = User(
            username=username,
            password_hash=placeholder_hash,
            is_active=True,
            is_admin=True,
            role=DEFAULT_ADMIN_ROLE,
        )
        needs_save = True
    elif not main_user.is_admin:
        main_user.is_admin = True
        main_user.role = DEFAULT_ADMIN_ROLE
        needs_save = True

    if needs_save:
        db.add(main_user)
        db.commit()
        db.refresh(main_user)

    return {"username": username, "user_id": main_user.id}
def require_feature_permission(feature_key: str):
    """Build a FastAPI dependency that gates access to ``feature_key``.

    The returned dependency looks the authenticated user up in the main
    database (creating a non-admin row with the default role when none
    exists), repairs an invalid role, and then grants access when either:

    * the user is an admin (by flag or by role), or
    * the role-permissions map has a truthy entry for ``feature_key`` under
      the user's role.

    On success it returns a dict with ``username``, ``user_id``, ``role`` and
    ``is_admin``; otherwise it raises a 403 ``HTTPException``.
    """

    def dependency(
        current_user: Annotated[dict, Depends(get_current_user)],
        db: Session = Depends(get_db),
    ) -> dict:
        username = current_user["username"]

        user = db.query(User).filter(User.username == username).first()
        if not user:
            # First contact with the main database: create a plain user row
            # with a placeholder password.
            placeholder_hash = hash_password("changeme")
            user = User(
                username=username,
                password_hash=placeholder_hash,
                is_active=True,
                is_admin=False,
                role=DEFAULT_USER_ROLE,
            )
            db.add(user)
            db.commit()
            db.refresh(user)

        _ensure_role_set(user)

        # Admins bypass the per-feature lookup entirely.
        is_privileged = user.is_admin or is_admin_role(user.role)
        if not is_privileged:
            role_permissions = fetch_role_permissions_map(db).get(user.role, {})
            if not role_permissions.get(feature_key):
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail="Access denied for this feature",
                )

        return {
            "username": username,
            "user_id": user.id,
            "role": user.role,
            "is_admin": user.is_admin,
        }

    return dependency
@router.get("", response_model=UsersListResponse)
def list_users(
    current_admin: Annotated[dict, Depends(get_current_admin_user)],
    is_active: bool | None = Query(None, description="Filter by active status"),
    is_admin: bool | None = Query(None, description="Filter by admin status"),
    db: Session = Depends(get_db),
) -> UsersListResponse:
    """Return every user, ordered by username - admin only.

    ``is_active`` and ``is_admin`` are optional filters; each one is applied
    only when a value is supplied. Users with an invalid role get it repaired
    on the returned instances before serialisation.
    """
    query = db.query(User)

    # Apply each optional filter only when the caller provided a value.
    optional_filters = (
        (User.is_active, is_active),
        (User.is_admin, is_admin),
    )
    for column, wanted in optional_filters:
        if wanted is not None:
            query = query.filter(column == wanted)

    users = query.order_by(User.username.asc()).all()

    # Normalise all roles first, then serialise.
    for record in users:
        _ensure_role_set(record)
    items = [UserResponse.model_validate(record) for record in users]

    return UsersListResponse(items=items, total=len(items))
@router.post("", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
def create_user(
    current_admin: Annotated[dict, Depends(get_current_admin_user)],
    request: UserCreateRequest,
    db: Session = Depends(get_db),
) -> UserResponse:
    """Create a new user - admin only.

    The password is hashed before storage and the new account is flagged with
    ``password_change_required=True``. The role / is_admin pair is normalised
    so that the stored admin flag always matches the stored role.

    If ``give_frontend_permission`` is True, a matching user is also created
    in the auth database (best effort) for frontend access.

    Raises:
        HTTPException: 400 when the username or email is already taken, when
            the requested role is invalid, or when the insert violates a
            database constraint.
    """
    # Check if username already exists
    existing_user = db.query(User).filter(User.username == request.username).first()
    if existing_user:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Username '{request.username}' already exists",
        )

    # Check if email already exists
    existing_email = db.query(User).filter(User.email == request.email).first()
    if existing_email:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Email address '{request.email}' is already in use",
        )

    # Hash the password before storing
    password_hash = hash_password(request.password)

    # Accept the role as an enum member, a plain string, or not at all.
    if request.role is None:
        requested_role = None
    elif isinstance(request.role, UserRole):
        requested_role = request.role.value
    else:
        requested_role = str(request.role)

    normalized_role, normalized_is_admin = _normalize_role_and_admin(
        requested_role,
        request.is_admin,
    )

    user = User(
        username=request.username,
        password_hash=password_hash,
        email=request.email,
        full_name=request.full_name,
        is_active=request.is_active,
        is_admin=normalized_is_admin,
        role=normalized_role,
        password_change_required=True,  # Force password change on first login
    )
    db.add(user)
    try:
        db.commit()
    except IntegrityError as exc:
        # The pre-checks above cannot rule out a concurrent insert of the same
        # username/email; report it as a client error instead of a 500.
        db.rollback()
        logger.warning("Integrity error while creating user", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Username or email address is already in use",
        ) from exc
    db.refresh(user)

    if request.give_frontend_permission:
        create_auth_user_if_missing(
            email=request.email,
            # Same fallback as update_user: never hand the helper an empty name.
            full_name=request.full_name or request.username,
            password_hash=password_hash,
            is_admin=normalized_is_admin,
        )

    return UserResponse.model_validate(user)
@router.get("/{user_id}", response_model=UserResponse)
def get_user(
    current_admin: Annotated[dict, Depends(get_current_admin_user)],
    user_id: int,
    db: Session = Depends(get_db),
) -> UserResponse:
    """Fetch a single user by primary key - admin only.

    An invalid role on the loaded instance is repaired before serialisation.

    Raises:
        HTTPException: 404 when no user has the given ID.
    """
    found = db.query(User).filter(User.id == user_id).first()
    if found:
        _ensure_role_set(found)
        return UserResponse.model_validate(found)

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"User with ID {user_id} not found",
    )
@router.put("/{user_id}", response_model=UserResponse)
def update_user(
    current_admin: Annotated[dict, Depends(get_current_admin_user)],
    user_id: int,
    request: UserUpdateRequest,
    db: Session = Depends(get_db),
) -> UserResponse:
    """Update a user - admin only.

    Only fields present on the request (not None) are changed. The role and
    admin flag are always re-normalised and written back, using this order of
    precedence for the target role:

    1. ``request.role`` when supplied;
    2. the default role implied by ``request.is_admin`` when supplied;
    3. the user's stored role when it is a valid ``ROLE_VALUES`` entry;
    4. the default role implied by the user's stored admin flag.

    If ``give_frontend_permission`` is True, a matching auth-database user is
    created when missing (best effort).

    Raises:
        HTTPException: 404 when the user does not exist; 400 when the role is
            invalid, when an admin tries to drop their own admin status, when
            the new email is already in use, or when the update violates a
            database constraint.
    """
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with ID {user_id} not found",
        )

    # Accept the role as an enum member, a plain string, or not at all.
    if request.role is None:
        desired_role = None
    elif isinstance(request.role, UserRole):
        desired_role = request.role.value
    else:
        desired_role = str(request.role)

    if desired_role is None:
        if request.is_admin is not None:
            desired_role = DEFAULT_ADMIN_ROLE if request.is_admin else DEFAULT_USER_ROLE
        elif user.role in ROLE_VALUES:
            desired_role = user.role
        else:
            # Missing or unrecognised stored role: fall back to the default
            # for the stored admin flag (as _ensure_role_set does) instead of
            # rejecting an unrelated edit with "Invalid role".
            desired_role = DEFAULT_ADMIN_ROLE if user.is_admin else DEFAULT_USER_ROLE

    normalized_role, normalized_is_admin = _normalize_role_and_admin(
        desired_role,
        request.is_admin,
    )

    # Prevent admin from removing their own admin status
    if current_admin["username"] == user.username and not normalized_is_admin:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot remove your own admin status",
        )

    # Check if email is being changed and if the new email already exists
    if request.email is not None and request.email != user.email:
        existing_email = db.query(User).filter(User.email == request.email).first()
        if existing_email:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Email address '{request.email}' is already in use",
            )

    # Update fields if provided
    if request.password is not None:
        user.password_hash = hash_password(request.password)
    if request.email is not None:
        user.email = request.email
    if request.full_name is not None:
        user.full_name = request.full_name
    if request.is_active is not None:
        user.is_active = request.is_active
    user.is_admin = normalized_is_admin
    user.role = normalized_role

    db.add(user)
    try:
        db.commit()
    except IntegrityError as exc:
        # The email pre-check cannot rule out a concurrent conflicting write;
        # report it as a client error instead of a 500.
        db.rollback()
        logger.warning("Integrity error while updating user", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Update violates a database constraint (e.g. email address already in use)",
        ) from exc
    db.refresh(user)

    if request.give_frontend_permission:
        create_auth_user_if_missing(
            email=user.email,
            full_name=user.full_name or user.username,
            password_hash=user.password_hash,
            is_admin=user.is_admin,
        )

    return UserResponse.model_validate(user)
@router.delete("/{user_id}")
def delete_user(
    current_admin: Annotated[dict, Depends(get_current_admin_user)],
    user_id: int,
    db: Session = Depends(get_db),
) -> Response:
    """Delete a user, or deactivate them when they own linked data - admin only.

    The user's favorites (matched by username) are always removed. If the
    user is referenced by any ``Face`` or ``PhotoPersonLinkage`` row through
    ``identified_by_user_id``, the account is set inactive rather than
    deleted, a warning is logged, and a 200 JSON summary is returned.
    Otherwise the row is deleted and an empty 204 response is returned.

    Raises:
        HTTPException: 404 when the user does not exist; 400 when an admin
            tries to delete their own account.
    """
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with ID {user_id} not found",
        )

    admin_username = current_admin["username"]

    # Prevent admin from deleting themselves
    if admin_username == user.username:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot delete your own account",
        )

    # Count rows that were identified by this user.
    faces_count = (
        db.query(Face)
        .filter(Face.identified_by_user_id == user_id)
        .count()
    )
    linkages_count = (
        db.query(PhotoPersonLinkage)
        .filter(PhotoPersonLinkage.identified_by_user_id == user_id)
        .count()
    )

    # Favorites are keyed by username, not user_id, and are always removed.
    favorites_deleted = (
        db.query(PhotoFavorite)
        .filter(PhotoFavorite.username == user.username)
        .delete()
    )

    if faces_count <= 0 and linkages_count <= 0:
        # No linked data - safe to delete
        db.delete(user)
        db.commit()
        logger.info(
            f"User '{user.username}' (ID: {user_id}) was deleted. "
            f"{favorites_deleted} favorite(s) were deleted. "
            f"Action performed by admin: {admin_username}",
            extra={
                "user_id": user_id,
                "username": user.username,
                "favorites_deleted": favorites_deleted,
                "admin_username": admin_username,
            },
        )
        return Response(status_code=status.HTTP_204_NO_CONTENT)

    # Linked data exists: keep the row but deactivate the account.
    user.is_active = False
    db.add(user)
    db.commit()

    # Notify admins via logging
    logger.warning(
        f"User '{user.username}' (ID: {user_id}) was set to inactive instead of deleted "
        f"because they have linked data: {faces_count} face(s) and {linkages_count} "
        f"video person linkage(s). {favorites_deleted} favorite(s) were deleted. "
        f"Action performed by admin: {admin_username}",
        extra={
            "user_id": user_id,
            "username": user.username,
            "faces_count": faces_count,
            "linkages_count": linkages_count,
            "favorites_deleted": favorites_deleted,
            "admin_username": admin_username,
        },
    )

    # Report success, but make it clear the user was only deactivated.
    message = (
        f"User '{user.username}' has been set to inactive because they have "
        f"linked data ({faces_count} face(s), {linkages_count} linkage(s)). "
        f"{favorites_deleted} favorite(s) were deleted."
    )
    return JSONResponse(
        status_code=status.HTTP_200_OK,
        content={
            "message": message,
            "deactivated": True,
            "faces_count": faces_count,
            "linkages_count": linkages_count,
            "favorites_deleted": favorites_deleted,
        },
    )