punimtag/backend/api/auth.py
Tanya 68d280e8f5 feat: Add new analysis documents and update installation scripts for backend integration
This commit introduces several new analysis documents, including Auto-Match Load Performance Analysis, Folder Picker Analysis, Monorepo Migration Summary, and various performance analysis documents. Additionally, the installation scripts are updated to reflect changes in backend service paths, ensuring proper integration with the new backend structure. These enhancements provide better documentation and streamline the setup process for users.
2025-12-30 15:04:32 -05:00

358 lines
12 KiB
Python

"""Authentication endpoints."""
from __future__ import annotations
import os
from datetime import datetime, timedelta
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
from sqlalchemy.orm import Session
from backend.constants.roles import (
DEFAULT_ADMIN_ROLE,
DEFAULT_USER_ROLE,
ROLE_VALUES,
)
from backend.db.session import get_db
from backend.db.models import User
from backend.utils.password import verify_password, hash_password
from backend.schemas.auth import (
LoginRequest,
RefreshRequest,
TokenResponse,
UserResponse,
PasswordChangeRequest,
PasswordChangeResponse,
)
from backend.services.role_permissions import fetch_role_permissions_map
router = APIRouter(prefix="/auth", tags=["auth"])
security = HTTPBearer()
# Placeholder secrets - replace with env vars in production
SECRET_KEY = "dev-secret-key-change-in-production"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 360
REFRESH_TOKEN_EXPIRE_DAYS = 7
# Single user mode placeholder - read from environment or use defaults
SINGLE_USER_USERNAME = os.getenv("ADMIN_USERNAME", "admin")
SINGLE_USER_PASSWORD = os.getenv("ADMIN_PASSWORD", "admin") # Change in production
def create_access_token(data: dict, expires_delta: timedelta) -> str:
"""Create JWT access token."""
to_encode = data.copy()
expire = datetime.utcnow() + expires_delta
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def create_refresh_token(data: dict) -> str:
"""Create JWT refresh token."""
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
to_encode.update({"exp": expire, "type": "refresh"})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def get_current_user(
credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)]
) -> dict:
"""Get current user from JWT token."""
try:
payload = jwt.decode(
credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM]
)
username: str = payload.get("sub")
if username is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
)
return {"username": username}
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
)
def get_current_user_with_id(
current_user: Annotated[dict, Depends(get_current_user)],
db: Session = Depends(get_db),
) -> dict:
"""Get current user with ID from main database.
Looks up the user in the main database and returns username and user_id.
If user doesn't exist, creates them (for bootstrap scenarios).
"""
username = current_user["username"]
# Check if user exists in main database
user = db.query(User).filter(User.username == username).first()
# If user doesn't exist, create them (for bootstrap scenarios)
if not user:
from backend.utils.password import hash_password
# Generate unique email to avoid conflicts
base_email = f"{username}@example.com"
email = base_email
counter = 1
# Ensure email is unique
while db.query(User).filter(User.email == email).first():
email = f"{username}+{counter}@example.com"
counter += 1
# Create user (they should change password)
default_password_hash = hash_password("changeme")
user = User(
username=username,
password_hash=default_password_hash,
email=email,
full_name=username,
is_active=True,
is_admin=False,
role=DEFAULT_USER_ROLE,
)
db.add(user)
db.commit()
db.refresh(user)
return {"username": username, "user_id": user.id}
def _resolve_user_role(user: User | None, is_admin_flag: bool) -> str:
"""Determine the role value for a user, ensuring it is valid."""
if user and user.role in ROLE_VALUES:
return user.role
return DEFAULT_ADMIN_ROLE if is_admin_flag else DEFAULT_USER_ROLE
@router.post("/login", response_model=TokenResponse)
def login(credentials: LoginRequest, db: Session = Depends(get_db)) -> TokenResponse:
"""Authenticate user and return tokens.
First checks main database for users, falls back to hardcoded admin/admin
for backward compatibility.
"""
# First, try to find user in main database
user = db.query(User).filter(User.username == credentials.username).first()
if user:
# User exists in main database - verify password
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Account is inactive",
)
# Check if password_hash exists (migration might not have run)
if not user.password_hash:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Password not set. Please contact administrator to set your password.",
)
if not verify_password(credentials.password, user.password_hash):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
)
# Update last login
user.last_login = datetime.utcnow()
db.add(user)
db.commit()
# Generate tokens
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": credentials.username},
expires_delta=access_token_expires,
)
refresh_token = create_refresh_token(data={"sub": credentials.username})
return TokenResponse(
access_token=access_token,
refresh_token=refresh_token,
password_change_required=user.password_change_required,
)
# Fallback to hardcoded admin/admin for backward compatibility
if (
credentials.username == SINGLE_USER_USERNAME
and credentials.password == SINGLE_USER_PASSWORD
):
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": credentials.username},
expires_delta=access_token_expires,
)
refresh_token = create_refresh_token(data={"sub": credentials.username})
return TokenResponse(
access_token=access_token,
refresh_token=refresh_token,
password_change_required=False, # Hardcoded admin doesn't require password change
)
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
)
@router.post("/refresh", response_model=TokenResponse)
def refresh_token(request: RefreshRequest) -> TokenResponse:
"""Refresh access token using refresh token."""
try:
payload = jwt.decode(
request.refresh_token, SECRET_KEY, algorithms=[ALGORITHM]
)
if payload.get("type") != "refresh":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token type",
)
username: str = payload.get("sub")
if username is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token",
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": username}, expires_delta=access_token_expires
)
new_refresh_token = create_refresh_token(data={"sub": username})
return TokenResponse(
access_token=access_token, refresh_token=new_refresh_token
)
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid refresh token",
)
@router.get("/me", response_model=UserResponse)
def get_current_user_info(
current_user: Annotated[dict, Depends(get_current_user)],
db: Session = Depends(get_db),
) -> UserResponse:
"""Get current user information including admin status."""
username = current_user["username"]
# Check if user exists in main database to get admin status
user = db.query(User).filter(User.username == username).first()
# If user doesn't exist in main database, check if we should bootstrap them
if not user:
# Check if any admin users exist
admin_count = db.query(User).filter(User.is_admin == True).count()
# If no admins exist, bootstrap current user as admin
if admin_count == 0:
from backend.utils.password import hash_password
# Generate unique email to avoid conflicts
base_email = f"{username}@example.com"
email = base_email
counter = 1
# Ensure email is unique
while db.query(User).filter(User.email == email).first():
email = f"{username}+{counter}@example.com"
counter += 1
# Create user as admin for bootstrap (they should change password)
default_password_hash = hash_password("changeme")
try:
user = User(
username=username,
password_hash=default_password_hash,
email=email,
full_name=username,
is_active=True,
is_admin=True,
role=DEFAULT_ADMIN_ROLE,
)
db.add(user)
db.commit()
db.refresh(user)
is_admin = True
except Exception:
# If creation fails (e.g., race condition), try to get existing user
db.rollback()
user = db.query(User).filter(User.username == username).first()
if user:
# Update existing user to be admin if no admins exist
if not user.is_admin:
user.is_admin = True
user.role = DEFAULT_ADMIN_ROLE
db.commit()
db.refresh(user)
is_admin = user.is_admin
else:
is_admin = False
else:
is_admin = False
else:
is_admin = user.is_admin if user else False
role_value = _resolve_user_role(user, is_admin)
permissions_map = fetch_role_permissions_map(db)
permissions = permissions_map.get(role_value, {})
return UserResponse(
username=username,
is_admin=is_admin,
role=role_value,
permissions=permissions,
)
@router.post("/change-password", response_model=PasswordChangeResponse)
def change_password(
request: PasswordChangeRequest,
current_user: Annotated[dict, Depends(get_current_user)],
db: Session = Depends(get_db),
) -> PasswordChangeResponse:
"""Change user password.
Requires current password verification.
After successful change, clears password_change_required flag.
"""
username = current_user["username"]
# Find user in main database
user = db.query(User).filter(User.username == username).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found",
)
# Verify current password
if not verify_password(request.current_password, user.password_hash):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Current password is incorrect",
)
# Update password
user.password_hash = hash_password(request.new_password)
user.password_change_required = False # Clear the flag after password change
db.add(user)
db.commit()
return PasswordChangeResponse(
success=True,
message="Password changed successfully",
)