All checks were successful
CI / skip-ci-check (pull_request) Successful in 1m29s
CI / lint-and-type-check (pull_request) Successful in 2m6s
CI / python-lint (pull_request) Successful in 1m47s
CI / test-backend (pull_request) Successful in 3m8s
CI / build (pull_request) Successful in 2m26s
CI / secret-scanning (pull_request) Successful in 1m43s
CI / dependency-scan (pull_request) Successful in 1m35s
CI / sast-scan (pull_request) Successful in 2m46s
CI / workflow-summary (pull_request) Successful in 1m27s
This commit updates the CI workflow summary to provide a clearer overview of job results and their purposes. It also modifies the JWT token generation in the authentication API to include a unique identifier (`jti`) for both access and refresh tokens, improving token management. Additionally, the test for the token refresh endpoint is adjusted to ensure it verifies the new access token correctly.
399 lines
14 KiB
Python
399 lines
14 KiB
Python
"""Authentication endpoints."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import uuid
|
|
from datetime import datetime, timedelta
|
|
from typing import Annotated
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Request, status
|
|
from fastapi.security import HTTPAuthorizationCredentials
|
|
from jose import JWTError, jwt
|
|
from sqlalchemy.orm import Session
|
|
|
|
from backend.constants.roles import (
|
|
DEFAULT_ADMIN_ROLE,
|
|
DEFAULT_USER_ROLE,
|
|
ROLE_VALUES,
|
|
)
|
|
from backend.db.session import get_db
|
|
from backend.db.models import User
|
|
from backend.utils.password import verify_password, hash_password
|
|
from backend.schemas.auth import (
|
|
LoginRequest,
|
|
RefreshRequest,
|
|
TokenResponse,
|
|
UserResponse,
|
|
PasswordChangeRequest,
|
|
PasswordChangeResponse,
|
|
)
|
|
from backend.services.role_permissions import fetch_role_permissions_map
|
|
|
|
router = APIRouter(prefix="/auth", tags=["auth"])
|
|
|
|
|
|
def get_bearer_token(request: Request) -> HTTPAuthorizationCredentials:
|
|
"""Custom security dependency that returns 401 for missing tokens (not 403).
|
|
|
|
This replaces HTTPBearer() to follow HTTP standards where missing authentication
|
|
should return 401 Unauthorized, not 403 Forbidden.
|
|
"""
|
|
authorization = request.headers.get("Authorization")
|
|
if not authorization:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Not authenticated",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
|
|
# Parse Authorization header: "Bearer <token>"
|
|
parts = authorization.split(" ", 1)
|
|
if len(parts) != 2:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Invalid authentication scheme",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
|
|
scheme, credentials = parts
|
|
if scheme.lower() != "bearer":
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Invalid authentication scheme",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
|
|
if not credentials:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Invalid authentication credentials",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
|
|
return HTTPAuthorizationCredentials(scheme=scheme, credentials=credentials)
|
|
|
|
# Read secrets from environment variables
|
|
SECRET_KEY = os.getenv("SECRET_KEY", "dev-secret-key-change-in-production")
|
|
ALGORITHM = "HS256"
|
|
ACCESS_TOKEN_EXPIRE_MINUTES = 360
|
|
REFRESH_TOKEN_EXPIRE_DAYS = 7
|
|
|
|
# Single user mode placeholder - read from environment or use defaults
|
|
SINGLE_USER_USERNAME = os.getenv("ADMIN_USERNAME", "admin")
|
|
SINGLE_USER_PASSWORD = os.getenv("ADMIN_PASSWORD", "admin") # Change in production
|
|
|
|
|
|
def create_access_token(data: dict, expires_delta: timedelta) -> str:
|
|
"""Create JWT access token."""
|
|
to_encode = data.copy()
|
|
expire = datetime.utcnow() + expires_delta
|
|
to_encode.update({"exp": expire, "jti": str(uuid.uuid4())})
|
|
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
|
|
|
|
|
|
def create_refresh_token(data: dict) -> str:
|
|
"""Create JWT refresh token."""
|
|
to_encode = data.copy()
|
|
expire = datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
|
|
to_encode.update({"exp": expire, "type": "refresh", "jti": str(uuid.uuid4())})
|
|
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
|
|
|
|
|
|
def get_current_user(
|
|
credentials: Annotated[HTTPAuthorizationCredentials, Depends(get_bearer_token)]
|
|
) -> dict:
|
|
"""Get current user from JWT token."""
|
|
try:
|
|
payload = jwt.decode(
|
|
credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM]
|
|
)
|
|
username: str = payload.get("sub")
|
|
if username is None:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Invalid authentication credentials",
|
|
)
|
|
return {"username": username}
|
|
except JWTError:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Invalid authentication credentials",
|
|
)
|
|
|
|
|
|
def get_current_user_with_id(
|
|
current_user: Annotated[dict, Depends(get_current_user)],
|
|
db: Session = Depends(get_db),
|
|
) -> dict:
|
|
"""Get current user with ID from main database.
|
|
|
|
Looks up the user in the main database and returns username and user_id.
|
|
If user doesn't exist, creates them (for bootstrap scenarios).
|
|
"""
|
|
username = current_user["username"]
|
|
|
|
# Check if user exists in main database
|
|
user = db.query(User).filter(User.username == username).first()
|
|
|
|
# If user doesn't exist, create them (for bootstrap scenarios)
|
|
if not user:
|
|
from backend.utils.password import hash_password
|
|
|
|
# Generate unique email to avoid conflicts
|
|
base_email = f"{username}@example.com"
|
|
email = base_email
|
|
counter = 1
|
|
# Ensure email is unique
|
|
while db.query(User).filter(User.email == email).first():
|
|
email = f"{username}+{counter}@example.com"
|
|
counter += 1
|
|
|
|
# Create user (they should change password)
|
|
default_password_hash = hash_password("changeme")
|
|
user = User(
|
|
username=username,
|
|
password_hash=default_password_hash,
|
|
email=email,
|
|
full_name=username,
|
|
is_active=True,
|
|
is_admin=False,
|
|
role=DEFAULT_USER_ROLE,
|
|
)
|
|
db.add(user)
|
|
db.commit()
|
|
db.refresh(user)
|
|
|
|
return {"username": username, "user_id": user.id}
|
|
|
|
|
|
def _resolve_user_role(user: User | None, is_admin_flag: bool) -> str:
|
|
"""Determine the role value for a user, ensuring it is valid."""
|
|
if user and user.role in ROLE_VALUES:
|
|
return user.role
|
|
return DEFAULT_ADMIN_ROLE if is_admin_flag else DEFAULT_USER_ROLE
|
|
|
|
|
|
@router.post("/login", response_model=TokenResponse)
|
|
def login(credentials: LoginRequest, db: Session = Depends(get_db)) -> TokenResponse:
|
|
"""Authenticate user and return tokens.
|
|
|
|
First checks main database for users, falls back to hardcoded admin/admin
|
|
for backward compatibility.
|
|
"""
|
|
# First, try to find user in main database
|
|
user = db.query(User).filter(User.username == credentials.username).first()
|
|
|
|
if user:
|
|
# User exists in main database - verify password
|
|
if not user.is_active:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Account is inactive",
|
|
)
|
|
|
|
# Check if password_hash exists (migration might not have run)
|
|
if not user.password_hash:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Password not set. Please contact administrator to set your password.",
|
|
)
|
|
|
|
if not verify_password(credentials.password, user.password_hash):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Incorrect username or password",
|
|
)
|
|
|
|
# Update last login
|
|
user.last_login = datetime.utcnow()
|
|
db.add(user)
|
|
db.commit()
|
|
|
|
# Generate tokens
|
|
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
|
|
access_token = create_access_token(
|
|
data={"sub": credentials.username},
|
|
expires_delta=access_token_expires,
|
|
)
|
|
refresh_token = create_refresh_token(data={"sub": credentials.username})
|
|
return TokenResponse(
|
|
access_token=access_token,
|
|
refresh_token=refresh_token,
|
|
password_change_required=user.password_change_required,
|
|
)
|
|
|
|
# Fallback to hardcoded admin/admin for backward compatibility
|
|
if (
|
|
credentials.username == SINGLE_USER_USERNAME
|
|
and credentials.password == SINGLE_USER_PASSWORD
|
|
):
|
|
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
|
|
access_token = create_access_token(
|
|
data={"sub": credentials.username},
|
|
expires_delta=access_token_expires,
|
|
)
|
|
refresh_token = create_refresh_token(data={"sub": credentials.username})
|
|
return TokenResponse(
|
|
access_token=access_token,
|
|
refresh_token=refresh_token,
|
|
password_change_required=False, # Hardcoded admin doesn't require password change
|
|
)
|
|
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Incorrect username or password",
|
|
)
|
|
|
|
|
|
@router.post("/refresh", response_model=TokenResponse)
|
|
def refresh_token(request: RefreshRequest) -> TokenResponse:
|
|
"""Refresh access token using refresh token."""
|
|
try:
|
|
payload = jwt.decode(
|
|
request.refresh_token, SECRET_KEY, algorithms=[ALGORITHM]
|
|
)
|
|
if payload.get("type") != "refresh":
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Invalid token type",
|
|
)
|
|
username: str = payload.get("sub")
|
|
if username is None:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Invalid token",
|
|
)
|
|
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
|
|
access_token = create_access_token(
|
|
data={"sub": username}, expires_delta=access_token_expires
|
|
)
|
|
new_refresh_token = create_refresh_token(data={"sub": username})
|
|
return TokenResponse(
|
|
access_token=access_token, refresh_token=new_refresh_token
|
|
)
|
|
except JWTError:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Invalid refresh token",
|
|
)
|
|
|
|
|
|
@router.get("/me", response_model=UserResponse)
|
|
def get_current_user_info(
|
|
current_user: Annotated[dict, Depends(get_current_user)],
|
|
db: Session = Depends(get_db),
|
|
) -> UserResponse:
|
|
"""Get current user information including admin status."""
|
|
username = current_user["username"]
|
|
|
|
# Check if user exists in main database to get admin status
|
|
user = db.query(User).filter(User.username == username).first()
|
|
|
|
# If user doesn't exist in main database, check if we should bootstrap them
|
|
if not user:
|
|
# Check if any admin users exist
|
|
admin_count = db.query(User).filter(User.is_admin == True).count()
|
|
|
|
# If no admins exist, bootstrap current user as admin
|
|
if admin_count == 0:
|
|
from backend.utils.password import hash_password
|
|
|
|
# Generate unique email to avoid conflicts
|
|
base_email = f"{username}@example.com"
|
|
email = base_email
|
|
counter = 1
|
|
# Ensure email is unique
|
|
while db.query(User).filter(User.email == email).first():
|
|
email = f"{username}+{counter}@example.com"
|
|
counter += 1
|
|
|
|
# Create user as admin for bootstrap (they should change password)
|
|
default_password_hash = hash_password("changeme")
|
|
try:
|
|
user = User(
|
|
username=username,
|
|
password_hash=default_password_hash,
|
|
email=email,
|
|
full_name=username,
|
|
is_active=True,
|
|
is_admin=True,
|
|
role=DEFAULT_ADMIN_ROLE,
|
|
)
|
|
db.add(user)
|
|
db.commit()
|
|
db.refresh(user)
|
|
is_admin = True
|
|
except Exception:
|
|
# If creation fails (e.g., race condition), try to get existing user
|
|
db.rollback()
|
|
user = db.query(User).filter(User.username == username).first()
|
|
if user:
|
|
# Update existing user to be admin if no admins exist
|
|
if not user.is_admin:
|
|
user.is_admin = True
|
|
user.role = DEFAULT_ADMIN_ROLE
|
|
db.commit()
|
|
db.refresh(user)
|
|
is_admin = user.is_admin
|
|
else:
|
|
is_admin = False
|
|
else:
|
|
is_admin = False
|
|
else:
|
|
is_admin = user.is_admin if user else False
|
|
|
|
role_value = _resolve_user_role(user, is_admin)
|
|
permissions_map = fetch_role_permissions_map(db)
|
|
permissions = permissions_map.get(role_value, {})
|
|
|
|
return UserResponse(
|
|
username=username,
|
|
is_admin=is_admin,
|
|
role=role_value,
|
|
permissions=permissions,
|
|
)
|
|
|
|
|
|
@router.post("/change-password", response_model=PasswordChangeResponse)
|
|
def change_password(
|
|
request: PasswordChangeRequest,
|
|
current_user: Annotated[dict, Depends(get_current_user)],
|
|
db: Session = Depends(get_db),
|
|
) -> PasswordChangeResponse:
|
|
"""Change user password.
|
|
|
|
Requires current password verification.
|
|
After successful change, clears password_change_required flag.
|
|
"""
|
|
username = current_user["username"]
|
|
|
|
# Find user in main database
|
|
user = db.query(User).filter(User.username == username).first()
|
|
if not user:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="User not found",
|
|
)
|
|
|
|
# Verify current password
|
|
if not verify_password(request.current_password, user.password_hash):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Current password is incorrect",
|
|
)
|
|
|
|
# Update password
|
|
user.password_hash = hash_password(request.new_password)
|
|
user.password_change_required = False # Clear the flag after password change
|
|
db.add(user)
|
|
db.commit()
|
|
|
|
return PasswordChangeResponse(
|
|
success=True,
|
|
message="Password changed successfully",
|
|
)
|
|
|
|
|
|
|