punimtag/backend/api/auth.py
tanyar09 7a981b069a
All checks were successful
CI / skip-ci-check (pull_request) Successful in 8s
CI / lint-and-type-check (pull_request) Successful in 1m12s
CI / python-lint (pull_request) Successful in 36s
CI / test-backend (pull_request) Successful in 3m47s
CI / build (pull_request) Successful in 3m28s
CI / secret-scanning (pull_request) Successful in 14s
CI / dependency-scan (pull_request) Successful in 13s
CI / sast-scan (pull_request) Successful in 1m33s
CI / workflow-summary (pull_request) Successful in 5s
feat: Enhance logging and error handling for job streaming and photo uploads
- Added new logging scripts for quick access to service logs and troubleshooting.
- Updated job streaming API to support authentication via query parameters for EventSource.
- Improved photo upload process to capture and validate EXIF dates and original modification times.
- Enhanced error handling for file uploads and EXIF extraction failures.
- Introduced new configuration options in ecosystem.config.js to prevent infinite crash loops.
2026-02-04 19:30:05 +00:00

430 lines
15 KiB
Python

"""Authentication endpoints."""
from __future__ import annotations
import os
import uuid
from datetime import datetime, timedelta
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, Request, status
from fastapi.security import HTTPAuthorizationCredentials
from jose import JWTError, jwt
from sqlalchemy.orm import Session
from backend.constants.roles import (
DEFAULT_ADMIN_ROLE,
DEFAULT_USER_ROLE,
ROLE_VALUES,
)
from backend.db.session import get_db
from backend.db.models import User
from backend.utils.password import verify_password, hash_password
from backend.schemas.auth import (
LoginRequest,
RefreshRequest,
TokenResponse,
UserResponse,
PasswordChangeRequest,
PasswordChangeResponse,
)
from backend.services.role_permissions import fetch_role_permissions_map
router = APIRouter(prefix="/auth", tags=["auth"])
def get_bearer_token(request: Request) -> HTTPAuthorizationCredentials:
"""Custom security dependency that returns 401 for missing tokens (not 403).
This replaces HTTPBearer() to follow HTTP standards where missing authentication
should return 401 Unauthorized, not 403 Forbidden.
"""
authorization = request.headers.get("Authorization")
if not authorization:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated",
headers={"WWW-Authenticate": "Bearer"},
)
# Parse Authorization header: "Bearer <token>"
parts = authorization.split(" ", 1)
if len(parts) != 2:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication scheme",
headers={"WWW-Authenticate": "Bearer"},
)
scheme, credentials = parts
if scheme.lower() != "bearer":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication scheme",
headers={"WWW-Authenticate": "Bearer"},
)
if not credentials:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return HTTPAuthorizationCredentials(scheme=scheme, credentials=credentials)
# Read secrets from environment variables
SECRET_KEY = os.getenv("SECRET_KEY", "dev-secret-key-change-in-production")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 360
REFRESH_TOKEN_EXPIRE_DAYS = 7
# Single user mode placeholder - read from environment or use defaults
SINGLE_USER_USERNAME = os.getenv("ADMIN_USERNAME", "admin")
SINGLE_USER_PASSWORD = os.getenv("ADMIN_PASSWORD", "admin") # Change in production
def create_access_token(data: dict, expires_delta: timedelta) -> str:
"""Create JWT access token."""
to_encode = data.copy()
expire = datetime.utcnow() + expires_delta
to_encode.update({"exp": expire, "jti": str(uuid.uuid4())})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def create_refresh_token(data: dict) -> str:
"""Create JWT refresh token."""
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
to_encode.update({"exp": expire, "type": "refresh", "jti": str(uuid.uuid4())})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def get_current_user_from_token(token: str) -> dict:
"""Get current user from JWT token string (for query parameter auth).
Used for endpoints that need authentication but can't use headers
(e.g., EventSource/SSE endpoints).
"""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
)
return {"username": username}
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
)
def get_current_user(
credentials: Annotated[HTTPAuthorizationCredentials, Depends(get_bearer_token)]
) -> dict:
"""Get current user from JWT token."""
try:
payload = jwt.decode(
credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM]
)
username: str = payload.get("sub")
if username is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
)
return {"username": username}
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
)
def get_current_user_with_id(
current_user: Annotated[dict, Depends(get_current_user)],
db: Session = Depends(get_db),
) -> dict:
"""Get current user with ID from main database.
Looks up the user in the main database and returns username and user_id.
If user doesn't exist, creates them (for bootstrap scenarios).
"""
username = current_user["username"]
# Check if user exists in main database
user = db.query(User).filter(User.username == username).first()
# If user doesn't exist, create them (for bootstrap scenarios)
if not user:
from backend.utils.password import hash_password
# Generate unique email to avoid conflicts
base_email = f"{username}@example.com"
email = base_email
counter = 1
# Ensure email is unique
while db.query(User).filter(User.email == email).first():
email = f"{username}+{counter}@example.com"
counter += 1
# Create user (they should change password)
default_password_hash = hash_password("changeme")
user = User(
username=username,
password_hash=default_password_hash,
email=email,
full_name=username,
is_active=True,
is_admin=False,
role=DEFAULT_USER_ROLE,
)
db.add(user)
db.commit()
db.refresh(user)
return {"username": username, "user_id": user.id}
def _resolve_user_role(user: User | None, is_admin_flag: bool) -> str:
"""Determine the role value for a user, ensuring it is valid."""
if user and user.role in ROLE_VALUES:
return user.role
return DEFAULT_ADMIN_ROLE if is_admin_flag else DEFAULT_USER_ROLE
@router.post("/login", response_model=TokenResponse)
def login(credentials: LoginRequest, db: Session = Depends(get_db)) -> TokenResponse:
"""Authenticate user and return tokens.
First checks main database for users, falls back to hardcoded admin/admin
for backward compatibility.
"""
# First, try to find user in main database
user = db.query(User).filter(User.username == credentials.username).first()
if user:
# User exists in main database - verify password
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Account is inactive",
)
# Check if password_hash exists (migration might not have run)
if not user.password_hash:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Password not set. Please contact administrator to set your password.",
)
if not verify_password(credentials.password, user.password_hash):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
)
# Update last login
user.last_login = datetime.utcnow()
db.add(user)
db.commit()
# Generate tokens
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": credentials.username},
expires_delta=access_token_expires,
)
refresh_token = create_refresh_token(data={"sub": credentials.username})
return TokenResponse(
access_token=access_token,
refresh_token=refresh_token,
password_change_required=user.password_change_required,
)
# Fallback to hardcoded admin/admin for backward compatibility
if (
credentials.username == SINGLE_USER_USERNAME
and credentials.password == SINGLE_USER_PASSWORD
):
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": credentials.username},
expires_delta=access_token_expires,
)
refresh_token = create_refresh_token(data={"sub": credentials.username})
return TokenResponse(
access_token=access_token,
refresh_token=refresh_token,
password_change_required=False, # Hardcoded admin doesn't require password change
)
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
)
@router.post("/refresh", response_model=TokenResponse)
def refresh_token(request: RefreshRequest) -> TokenResponse:
"""Refresh access token using refresh token."""
try:
payload = jwt.decode(
request.refresh_token, SECRET_KEY, algorithms=[ALGORITHM]
)
if payload.get("type") != "refresh":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token type",
)
username: str = payload.get("sub")
if username is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token",
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": username}, expires_delta=access_token_expires
)
new_refresh_token = create_refresh_token(data={"sub": username})
return TokenResponse(
access_token=access_token, refresh_token=new_refresh_token
)
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid refresh token",
)
@router.get("/me", response_model=UserResponse)
def get_current_user_info(
current_user: Annotated[dict, Depends(get_current_user)],
db: Session = Depends(get_db),
) -> UserResponse:
"""Get current user information including admin status."""
username = current_user["username"]
# Check if user exists in main database to get admin status
user = db.query(User).filter(User.username == username).first()
# If user doesn't exist in main database, check if we should bootstrap them
if not user:
# Check if any admin users exist
admin_count = db.query(User).filter(User.is_admin == True).count()
# If no admins exist, bootstrap current user as admin
if admin_count == 0:
from backend.utils.password import hash_password
# Generate unique email to avoid conflicts
base_email = f"{username}@example.com"
email = base_email
counter = 1
# Ensure email is unique
while db.query(User).filter(User.email == email).first():
email = f"{username}+{counter}@example.com"
counter += 1
# Create user as admin for bootstrap (they should change password)
default_password_hash = hash_password("changeme")
try:
user = User(
username=username,
password_hash=default_password_hash,
email=email,
full_name=username,
is_active=True,
is_admin=True,
role=DEFAULT_ADMIN_ROLE,
)
db.add(user)
db.commit()
db.refresh(user)
is_admin = True
except Exception:
# If creation fails (e.g., race condition), try to get existing user
db.rollback()
user = db.query(User).filter(User.username == username).first()
if user:
# Update existing user to be admin if no admins exist
if not user.is_admin:
user.is_admin = True
user.role = DEFAULT_ADMIN_ROLE
db.commit()
db.refresh(user)
is_admin = user.is_admin
else:
is_admin = False
else:
is_admin = False
else:
is_admin = user.is_admin if user else False
role_value = _resolve_user_role(user, is_admin)
# Fetch permissions - if it fails, return empty permissions to avoid blocking login
try:
permissions_map = fetch_role_permissions_map(db)
permissions = permissions_map.get(role_value, {})
except Exception as e:
# If permissions fetch fails, return empty permissions to avoid blocking login
# Log the error but don't fail the request
import traceback
print(f"⚠️ Failed to fetch permissions for /me endpoint: {e}")
print(f" Traceback: {traceback.format_exc()}")
permissions = {}
return UserResponse(
username=username,
is_admin=is_admin,
role=role_value,
permissions=permissions,
)
@router.post("/change-password", response_model=PasswordChangeResponse)
def change_password(
request: PasswordChangeRequest,
current_user: Annotated[dict, Depends(get_current_user)],
db: Session = Depends(get_db),
) -> PasswordChangeResponse:
"""Change user password.
Requires current password verification.
After successful change, clears password_change_required flag.
"""
username = current_user["username"]
# Find user in main database
user = db.query(User).filter(User.username == username).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found",
)
# Verify current password
if not verify_password(request.current_password, user.password_hash):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Current password is incorrect",
)
# Update password
user.password_hash = hash_password(request.new_password)
user.password_change_required = False # Clear the flag after password change
db.add(user)
db.commit()
return PasswordChangeResponse(
success=True,
message="Password changed successfully",
)