This commit introduces several new analysis documents, including Auto-Match Load Performance Analysis, Folder Picker Analysis, Monorepo Migration Summary, and various performance analysis documents. Additionally, the installation scripts are updated to reflect changes in backend service paths, ensuring proper integration with the new backend structure. These enhancements provide better documentation and streamline the setup process for users.
358 lines
12 KiB
Python
358 lines
12 KiB
Python
"""Authentication endpoints."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
from datetime import datetime, timedelta
|
|
from typing import Annotated
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, status
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
|
from jose import JWTError, jwt
|
|
from sqlalchemy.orm import Session
|
|
|
|
from backend.constants.roles import (
|
|
DEFAULT_ADMIN_ROLE,
|
|
DEFAULT_USER_ROLE,
|
|
ROLE_VALUES,
|
|
)
|
|
from backend.db.session import get_db
|
|
from backend.db.models import User
|
|
from backend.utils.password import verify_password, hash_password
|
|
from backend.schemas.auth import (
|
|
LoginRequest,
|
|
RefreshRequest,
|
|
TokenResponse,
|
|
UserResponse,
|
|
PasswordChangeRequest,
|
|
PasswordChangeResponse,
|
|
)
|
|
from backend.services.role_permissions import fetch_role_permissions_map
|
|
|
|
# Router collecting every endpoint declared in this module; all of them are
# served under the "/auth" prefix and grouped under the "auth" tag.
router = APIRouter(prefix="/auth", tags=["auth"])
# HTTP Bearer scheme; used below via Depends(security) to obtain the
# caller's HTTPAuthorizationCredentials.
security = HTTPBearer()
|
|
|
|
# Placeholder signing configuration - replace with env vars in production.
SECRET_KEY = "dev-secret-key-change-in-production"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 6 * 60  # access tokens live for six hours
REFRESH_TOKEN_EXPIRE_DAYS = 7

# Single-user-mode fallback credentials: taken from the environment when
# ADMIN_USERNAME / ADMIN_PASSWORD are set, otherwise "admin" / "admin".
SINGLE_USER_USERNAME = os.environ.get("ADMIN_USERNAME", "admin")
SINGLE_USER_PASSWORD = os.environ.get("ADMIN_PASSWORD", "admin")  # Change in production
|
|
|
|
|
|
def create_access_token(data: dict, expires_delta: timedelta) -> str:
    """Build a signed JWT access token.

    Args:
        data: Claims to embed in the token. The mapping itself is not
            modified; a new dict is signed instead.
        expires_delta: Lifetime of the token, added to the current naive
            UTC time to produce the ``exp`` claim.

    Returns:
        The token encoded with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    # An "exp" key already present in ``data`` is overridden, exactly as a
    # copy-then-update would do.
    claims = {**data, "exp": datetime.utcnow() + expires_delta}
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
|
|
|
|
|
|
def create_refresh_token(data: dict) -> str:
    """Build a signed JWT refresh token.

    Args:
        data: Claims to embed in the token. The mapping itself is not
            modified; a new dict is signed instead.

    Returns:
        A token encoded with ``SECRET_KEY`` using ``ALGORITHM`` that expires
        ``REFRESH_TOKEN_EXPIRE_DAYS`` days from now and carries
        ``"type": "refresh"`` so it can be told apart from an access token.
    """
    claims = {
        **data,
        "exp": datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS),
        "type": "refresh",
    }
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
|
|
|
|
|
|
def get_current_user(
    credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)]
) -> dict:
    """Resolve the caller's identity from the bearer JWT.

    Args:
        credentials: Bearer credentials supplied by the ``security`` scheme;
            ``credentials.credentials`` holds the raw token string.

    Returns:
        ``{"username": <value of the token's "sub" claim>}``.

    Raises:
        HTTPException: 401 when the token cannot be decoded with
            ``SECRET_KEY``/``ALGORITHM``, when it is a refresh token, or
            when it has no ``sub`` claim.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid authentication credentials",
    )

    # Only the decode call can raise JWTError, so keep the try minimal.
    try:
        payload = jwt.decode(
            credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM]
        )
    except JWTError as exc:
        raise unauthorized from exc

    # Refresh tokens are signed with the same key and algorithm as access
    # tokens but live much longer. They are only meant to be exchanged at
    # the refresh endpoint, so refuse them as API credentials.
    if payload.get("type") == "refresh":
        raise unauthorized

    username = payload.get("sub")
    if username is None:
        raise unauthorized

    return {"username": username}
|
|
|
|
|
|
def get_current_user_with_id(
    current_user: Annotated[dict, Depends(get_current_user)],
    db: Session = Depends(get_db),
) -> dict:
    """Get the current user together with their ID from the main database.

    Looks the authenticated username up in the main database. If no row
    exists yet (bootstrap scenarios), a regular, active, non-admin user is
    created with a placeholder ``<username>@example.com`` address and the
    default password ``"changeme"``.

    Args:
        current_user: Identity dict produced by ``get_current_user``; only
            its ``"username"`` key is read.
        db: Database session.

    Returns:
        ``{"username": <username>, "user_id": <User.id>}``.

    Raises:
        sqlalchemy.exc.IntegrityError: when creating the user fails on a
            constraint and no row for ``username`` exists afterwards.
    """
    # Imported locally so this block does not depend on a new file-level
    # import; sqlalchemy is already a dependency of this module.
    from sqlalchemy.exc import IntegrityError

    username = current_user["username"]

    # Check if user exists in main database
    user = db.query(User).filter(User.username == username).first()

    # If user doesn't exist, create them (for bootstrap scenarios)
    if not user:
        # Generate a unique placeholder email to avoid conflicts with rows
        # that already use "<username>@example.com".
        email = f"{username}@example.com"
        counter = 1
        while db.query(User).filter(User.email == email).first():
            email = f"{username}+{counter}@example.com"
            counter += 1

        # Create user (they should change password)
        user = User(
            username=username,
            password_hash=hash_password("changeme"),
            email=email,
            full_name=username,
            is_active=True,
            is_admin=False,
            role=DEFAULT_USER_ROLE,
        )
        db.add(user)
        try:
            db.commit()
        except IntegrityError:
            # Another request may have inserted the same user between the
            # lookup above and this commit. Recover by using that row
            # instead of failing the request.
            db.rollback()
            user = db.query(User).filter(User.username == username).first()
            if not user:
                # Not a duplicate-username race after all; surface the error.
                raise
        else:
            db.refresh(user)

    return {"username": username, "user_id": user.id}
|
|
|
|
|
|
def _resolve_user_role(user: User | None, is_admin_flag: bool) -> str:
    """Pick a valid role value for ``user``.

    The user's stored role is returned when a user is given and that role
    is one of ``ROLE_VALUES``. Otherwise the result falls back to
    ``DEFAULT_ADMIN_ROLE`` when ``is_admin_flag`` is true and to
    ``DEFAULT_USER_ROLE`` when it is not.
    """
    if not user or user.role not in ROLE_VALUES:
        if is_admin_flag:
            return DEFAULT_ADMIN_ROLE
        return DEFAULT_USER_ROLE
    return user.role
|
|
|
|
|
|
def _issue_token_pair(username: str, password_change_required: bool) -> TokenResponse:
    """Build the login response with a fresh access/refresh token pair for ``username``."""
    access_token = create_access_token(
        data={"sub": username},
        expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
    )
    return TokenResponse(
        access_token=access_token,
        refresh_token=create_refresh_token(data={"sub": username}),
        password_change_required=password_change_required,
    )


@router.post("/login", response_model=TokenResponse)
def login(credentials: LoginRequest, db: Session = Depends(get_db)) -> TokenResponse:
    """Authenticate user and return tokens.

    First checks the main database for the user. When no such user exists,
    falls back to the single-user credentials (``SINGLE_USER_USERNAME`` /
    ``SINGLE_USER_PASSWORD``) for backward compatibility.

    Args:
        credentials: Submitted username and password.
        db: Database session.

    Returns:
        A ``TokenResponse`` with an access token, a refresh token and the
        ``password_change_required`` flag (always ``False`` for the
        single-user fallback).

    Raises:
        HTTPException: 401 when the account is inactive, has no password
            hash, or the credentials do not match.
    """
    # Local import keeps this block self-contained (no file-level import
    # change needed).
    import secrets

    # First, try to find user in main database
    user = db.query(User).filter(User.username == credentials.username).first()

    if user:
        if not user.is_active:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Account is inactive",
            )

        # Check if password_hash exists (migration might not have run)
        if not user.password_hash:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Password not set. Please contact administrator to set your password.",
            )

        if not verify_password(credentials.password, user.password_hash):
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Incorrect username or password",
            )

        # Update last login
        user.last_login = datetime.utcnow()
        db.add(user)
        db.commit()

        return _issue_token_pair(
            credentials.username, user.password_change_required
        )

    # Fallback to the single-user credentials for backward compatibility.
    # Compare in constant time so response timing does not reveal how many
    # leading characters matched. Values are encoded to bytes because
    # compare_digest only accepts ASCII-only str arguments.
    # NOTE(review): assumes LoginRequest.username/password are always str
    # -- confirm against the schema.
    username_matches = secrets.compare_digest(
        credentials.username.encode("utf-8"),
        SINGLE_USER_USERNAME.encode("utf-8"),
    )
    password_matches = secrets.compare_digest(
        credentials.password.encode("utf-8"),
        SINGLE_USER_PASSWORD.encode("utf-8"),
    )
    if username_matches and password_matches:
        # Hardcoded admin doesn't require password change
        return _issue_token_pair(credentials.username, False)

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Incorrect username or password",
    )
|
|
|
|
|
|
@router.post("/refresh", response_model=TokenResponse)
def refresh_token(request: RefreshRequest) -> TokenResponse:
    """Exchange a refresh token for a new access/refresh token pair.

    The submitted token must decode with ``SECRET_KEY``/``ALGORITHM``, carry
    ``"type": "refresh"`` and have a ``sub`` claim; the new tokens are issued
    for that same subject.

    Raises:
        HTTPException: 401 with "Invalid token type" when the token is not a
            refresh token, "Invalid token" when it has no subject, and
            "Invalid refresh token" when a ``JWTError`` is raised.
    """
    try:
        claims = jwt.decode(
            request.refresh_token, SECRET_KEY, algorithms=[ALGORITHM]
        )

        token_kind = claims.get("type")
        if token_kind != "refresh":
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid token type",
            )

        subject = claims.get("sub")
        if subject is None:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid token",
            )

        # Keyword arguments are evaluated left to right, so the access token
        # is still created before the refresh token.
        return TokenResponse(
            access_token=create_access_token(
                data={"sub": subject},
                expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
            ),
            refresh_token=create_refresh_token(data={"sub": subject}),
        )
    except JWTError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid refresh token",
        )
|
|
|
|
|
|
@router.get("/me", response_model=UserResponse)
def get_current_user_info(
    current_user: Annotated[dict, Depends(get_current_user)],
    db: Session = Depends(get_db),
) -> UserResponse:
    """Return the caller's username, admin status, role and permissions.

    When the caller has no row in the main database and no admin user exists
    at all, the caller is bootstrapped as an admin with a placeholder
    ``<username>@example.com`` address and the default password
    ``"changeme"``. When admins already exist, an unknown caller is reported
    as a non-admin and no row is created.
    """
    username = current_user["username"]

    # Look the caller up in the main database to learn their admin status.
    account = db.query(User).filter(User.username == username).first()

    if account:
        is_admin = account.is_admin
    elif db.query(User).filter(User.is_admin == True).count() != 0:  # noqa: E712
        # At least one admin already exists: do not bootstrap this caller.
        is_admin = False
    else:
        # No admins exist, bootstrap the current user as admin.
        from backend.utils.password import hash_password

        # Find an unused placeholder email address.
        email = f"{username}@example.com"
        suffix = 1
        while db.query(User).filter(User.email == email).first():
            email = f"{username}+{suffix}@example.com"
            suffix += 1

        # Default password; the user should change it.
        placeholder_hash = hash_password("changeme")
        try:
            account = User(
                username=username,
                password_hash=placeholder_hash,
                email=email,
                full_name=username,
                is_active=True,
                is_admin=True,
                role=DEFAULT_ADMIN_ROLE,
            )
            db.add(account)
            db.commit()
            db.refresh(account)
            is_admin = True
        except Exception:
            # Creation failed (e.g., race condition): fall back to whatever
            # row exists for this username now.
            db.rollback()
            account = db.query(User).filter(User.username == username).first()
            if not account:
                is_admin = False
            else:
                # Promote the existing row, since no admins existed.
                if not account.is_admin:
                    account.is_admin = True
                    account.role = DEFAULT_ADMIN_ROLE
                    db.commit()
                    db.refresh(account)
                is_admin = account.is_admin

    role_value = _resolve_user_role(account, is_admin)
    permissions = fetch_role_permissions_map(db).get(role_value, {})

    return UserResponse(
        username=username,
        is_admin=is_admin,
        role=role_value,
        permissions=permissions,
    )
|
|
|
|
|
|
@router.post("/change-password", response_model=PasswordChangeResponse)
def change_password(
    request: PasswordChangeRequest,
    current_user: Annotated[dict, Depends(get_current_user)],
    db: Session = Depends(get_db),
) -> PasswordChangeResponse:
    """Change the authenticated user's password.

    Requires verification of the current password. After a successful
    change the ``password_change_required`` flag is cleared.

    Args:
        request: Carries ``current_password`` and ``new_password``.
        current_user: Identity dict produced by ``get_current_user``; only
            its ``"username"`` key is read.
        db: Database session.

    Returns:
        A ``PasswordChangeResponse`` with ``success=True``.

    Raises:
        HTTPException: 404 when the user has no row in the main database;
            401 when the current password does not verify, including when
            the account has no password hash stored.
    """
    username = current_user["username"]

    # Find user in main database
    user = db.query(User).filter(User.username == username).first()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )

    # Verify current password. An account without a stored hash (the login
    # endpoint guards against the same case) cannot match any password, so
    # treat it as a failed verification instead of handing an empty hash to
    # verify_password.
    if not user.password_hash or not verify_password(
        request.current_password, user.password_hash
    ):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Current password is incorrect",
        )

    # Update password
    user.password_hash = hash_password(request.new_password)
    user.password_change_required = False  # Clear the flag after password change
    db.add(user)
    db.commit()

    return PasswordChangeResponse(
        success=True,
        message="Password changed successfully",
    )
|
|
|
|
|
|
|