"""Authentication endpoints.""" from __future__ import annotations import os from datetime import datetime, timedelta from typing import Annotated from fastapi import APIRouter, Depends, HTTPException, Request, status from fastapi.security import HTTPAuthorizationCredentials from jose import JWTError, jwt from sqlalchemy.orm import Session from backend.constants.roles import ( DEFAULT_ADMIN_ROLE, DEFAULT_USER_ROLE, ROLE_VALUES, ) from backend.db.session import get_db from backend.db.models import User from backend.utils.password import verify_password, hash_password from backend.schemas.auth import ( LoginRequest, RefreshRequest, TokenResponse, UserResponse, PasswordChangeRequest, PasswordChangeResponse, ) from backend.services.role_permissions import fetch_role_permissions_map router = APIRouter(prefix="/auth", tags=["auth"]) def get_bearer_token(request: Request) -> HTTPAuthorizationCredentials: """Custom security dependency that returns 401 for missing tokens (not 403). This replaces HTTPBearer() to follow HTTP standards where missing authentication should return 401 Unauthorized, not 403 Forbidden. """ authorization = request.headers.get("Authorization") if not authorization: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated", headers={"WWW-Authenticate": "Bearer"}, ) # Parse Authorization header: "Bearer " parts = authorization.split(" ", 1) if len(parts) != 2: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication scheme", headers={"WWW-Authenticate": "Bearer"}, ) scheme, credentials = parts if scheme.lower() != "bearer": raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication scheme", headers={"WWW-Authenticate": "Bearer"}, ) if not credentials: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication credentials", headers={"WWW-Authenticate": "Bearer"}, ) return HTTPAuthorizationCredentials(scheme=scheme, credentials=credentials) # Read secrets from environment variables SECRET_KEY = os.getenv("SECRET_KEY", "dev-secret-key-change-in-production") ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 360 REFRESH_TOKEN_EXPIRE_DAYS = 7 # Single user mode placeholder - read from environment or use defaults SINGLE_USER_USERNAME = os.getenv("ADMIN_USERNAME", "admin") SINGLE_USER_PASSWORD = os.getenv("ADMIN_PASSWORD", "admin") # Change in production def create_access_token(data: dict, expires_delta: timedelta) -> str: """Create JWT access token.""" to_encode = data.copy() expire = datetime.utcnow() + expires_delta to_encode.update({"exp": expire}) return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) def create_refresh_token(data: dict) -> str: """Create JWT refresh token.""" to_encode = data.copy() expire = datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS) to_encode.update({"exp": expire, "type": "refresh"}) return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) def get_current_user( credentials: Annotated[HTTPAuthorizationCredentials, Depends(get_bearer_token)] ) -> dict: """Get current user from JWT token.""" try: payload = jwt.decode( credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM] ) username: str = payload.get("sub") if username is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication credentials", ) return {"username": username} except JWTError: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication credentials", ) def get_current_user_with_id( current_user: Annotated[dict, Depends(get_current_user)], db: Session = Depends(get_db), ) -> dict: """Get current user with ID from main database. Looks up the user in the main database and returns username and user_id. If user doesn't exist, creates them (for bootstrap scenarios). """ username = current_user["username"] # Check if user exists in main database user = db.query(User).filter(User.username == username).first() # If user doesn't exist, create them (for bootstrap scenarios) if not user: from backend.utils.password import hash_password # Generate unique email to avoid conflicts base_email = f"{username}@example.com" email = base_email counter = 1 # Ensure email is unique while db.query(User).filter(User.email == email).first(): email = f"{username}+{counter}@example.com" counter += 1 # Create user (they should change password) default_password_hash = hash_password("changeme") user = User( username=username, password_hash=default_password_hash, email=email, full_name=username, is_active=True, is_admin=False, role=DEFAULT_USER_ROLE, ) db.add(user) db.commit() db.refresh(user) return {"username": username, "user_id": user.id} def _resolve_user_role(user: User | None, is_admin_flag: bool) -> str: """Determine the role value for a user, ensuring it is valid.""" if user and user.role in ROLE_VALUES: return user.role return DEFAULT_ADMIN_ROLE if is_admin_flag else DEFAULT_USER_ROLE @router.post("/login", response_model=TokenResponse) def login(credentials: LoginRequest, db: Session = Depends(get_db)) -> TokenResponse: """Authenticate user and return tokens. First checks main database for users, falls back to hardcoded admin/admin for backward compatibility. """ # First, try to find user in main database user = db.query(User).filter(User.username == credentials.username).first() if user: # User exists in main database - verify password if not user.is_active: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Account is inactive", ) # Check if password_hash exists (migration might not have run) if not user.password_hash: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Password not set. Please contact administrator to set your password.", ) if not verify_password(credentials.password, user.password_hash): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", ) # Update last login user.last_login = datetime.utcnow() db.add(user) db.commit() # Generate tokens access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": credentials.username}, expires_delta=access_token_expires, ) refresh_token = create_refresh_token(data={"sub": credentials.username}) return TokenResponse( access_token=access_token, refresh_token=refresh_token, password_change_required=user.password_change_required, ) # Fallback to hardcoded admin/admin for backward compatibility if ( credentials.username == SINGLE_USER_USERNAME and credentials.password == SINGLE_USER_PASSWORD ): access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": credentials.username}, expires_delta=access_token_expires, ) refresh_token = create_refresh_token(data={"sub": credentials.username}) return TokenResponse( access_token=access_token, refresh_token=refresh_token, password_change_required=False, # Hardcoded admin doesn't require password change ) raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", ) @router.post("/refresh", response_model=TokenResponse) def refresh_token(request: RefreshRequest) -> TokenResponse: """Refresh access token using refresh token.""" try: payload = jwt.decode( request.refresh_token, SECRET_KEY, algorithms=[ALGORITHM] ) if payload.get("type") != "refresh": raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token type", ) username: str = payload.get("sub") if username is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token", ) access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": username}, expires_delta=access_token_expires ) new_refresh_token = create_refresh_token(data={"sub": username}) return TokenResponse( access_token=access_token, refresh_token=new_refresh_token ) except JWTError: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid refresh token", ) @router.get("/me", response_model=UserResponse) def get_current_user_info( current_user: Annotated[dict, Depends(get_current_user)], db: Session = Depends(get_db), ) -> UserResponse: """Get current user information including admin status.""" username = current_user["username"] # Check if user exists in main database to get admin status user = db.query(User).filter(User.username == username).first() # If user doesn't exist in main database, check if we should bootstrap them if not user: # Check if any admin users exist admin_count = db.query(User).filter(User.is_admin == True).count() # If no admins exist, bootstrap current user as admin if admin_count == 0: from backend.utils.password import hash_password # Generate unique email to avoid conflicts base_email = f"{username}@example.com" email = base_email counter = 1 # Ensure email is unique while db.query(User).filter(User.email == email).first(): email = f"{username}+{counter}@example.com" counter += 1 # Create user as admin for bootstrap (they should change password) default_password_hash = hash_password("changeme") try: user = User( username=username, password_hash=default_password_hash, email=email, full_name=username, is_active=True, is_admin=True, role=DEFAULT_ADMIN_ROLE, ) db.add(user) db.commit() db.refresh(user) is_admin = True except Exception: # If creation fails (e.g., race condition), try to get existing user db.rollback() user = db.query(User).filter(User.username == username).first() if user: # Update existing user to be admin if no admins exist if not user.is_admin: user.is_admin = True user.role = DEFAULT_ADMIN_ROLE db.commit() db.refresh(user) is_admin = user.is_admin else: is_admin = False else: is_admin = False else: is_admin = user.is_admin if user else False role_value = _resolve_user_role(user, is_admin) permissions_map = fetch_role_permissions_map(db) permissions = permissions_map.get(role_value, {}) return UserResponse( username=username, is_admin=is_admin, role=role_value, permissions=permissions, ) @router.post("/change-password", response_model=PasswordChangeResponse) def change_password( request: PasswordChangeRequest, current_user: Annotated[dict, Depends(get_current_user)], db: Session = Depends(get_db), ) -> PasswordChangeResponse: """Change user password. Requires current password verification. After successful change, clears password_change_required flag. """ username = current_user["username"] # Find user in main database user = db.query(User).filter(User.username == username).first() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found", ) # Verify current password if not verify_password(request.current_password, user.password_hash): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Current password is incorrect", ) # Update password user.password_hash = hash_password(request.new_password) user.password_change_required = False # Clear the flag after password change db.add(user) db.commit() return PasswordChangeResponse( success=True, message="Password changed successfully", )