All checks were successful
CI / skip-ci-check (pull_request) Successful in 8s
CI / lint-and-type-check (pull_request) Successful in 1m12s
CI / python-lint (pull_request) Successful in 36s
CI / test-backend (pull_request) Successful in 3m47s
CI / build (pull_request) Successful in 3m28s
CI / secret-scanning (pull_request) Successful in 14s
CI / dependency-scan (pull_request) Successful in 13s
CI / sast-scan (pull_request) Successful in 1m33s
CI / workflow-summary (pull_request) Successful in 5s
- Added new logging scripts for quick access to service logs and troubleshooting. - Updated job streaming API to support authentication via query parameters for EventSource. - Improved photo upload process to capture and validate EXIF dates and original modification times. - Enhanced error handling for file uploads and EXIF extraction failures. - Introduced new configuration options in ecosystem.config.js to prevent infinite crash loops.
430 lines
15 KiB
Python
430 lines
15 KiB
Python
"""Authentication endpoints."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import uuid
|
|
from datetime import datetime, timedelta
|
|
from typing import Annotated
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Request, status
|
|
from fastapi.security import HTTPAuthorizationCredentials
|
|
from jose import JWTError, jwt
|
|
from sqlalchemy.orm import Session
|
|
|
|
from backend.constants.roles import (
|
|
DEFAULT_ADMIN_ROLE,
|
|
DEFAULT_USER_ROLE,
|
|
ROLE_VALUES,
|
|
)
|
|
from backend.db.session import get_db
|
|
from backend.db.models import User
|
|
from backend.utils.password import verify_password, hash_password
|
|
from backend.schemas.auth import (
|
|
LoginRequest,
|
|
RefreshRequest,
|
|
TokenResponse,
|
|
UserResponse,
|
|
PasswordChangeRequest,
|
|
PasswordChangeResponse,
|
|
)
|
|
from backend.services.role_permissions import fetch_role_permissions_map
|
|
|
|
router = APIRouter(prefix="/auth", tags=["auth"])
|
|
|
|
|
|
def get_bearer_token(request: Request) -> HTTPAuthorizationCredentials:
|
|
"""Custom security dependency that returns 401 for missing tokens (not 403).
|
|
|
|
This replaces HTTPBearer() to follow HTTP standards where missing authentication
|
|
should return 401 Unauthorized, not 403 Forbidden.
|
|
"""
|
|
authorization = request.headers.get("Authorization")
|
|
if not authorization:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Not authenticated",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
|
|
# Parse Authorization header: "Bearer <token>"
|
|
parts = authorization.split(" ", 1)
|
|
if len(parts) != 2:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Invalid authentication scheme",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
|
|
scheme, credentials = parts
|
|
if scheme.lower() != "bearer":
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Invalid authentication scheme",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
|
|
if not credentials:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Invalid authentication credentials",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
|
|
return HTTPAuthorizationCredentials(scheme=scheme, credentials=credentials)
|
|
|
|
# Read secrets from environment variables
|
|
SECRET_KEY = os.getenv("SECRET_KEY", "dev-secret-key-change-in-production")
|
|
ALGORITHM = "HS256"
|
|
ACCESS_TOKEN_EXPIRE_MINUTES = 360
|
|
REFRESH_TOKEN_EXPIRE_DAYS = 7
|
|
|
|
# Single user mode placeholder - read from environment or use defaults
|
|
SINGLE_USER_USERNAME = os.getenv("ADMIN_USERNAME", "admin")
|
|
SINGLE_USER_PASSWORD = os.getenv("ADMIN_PASSWORD", "admin") # Change in production
|
|
|
|
|
|
def create_access_token(data: dict, expires_delta: timedelta) -> str:
|
|
"""Create JWT access token."""
|
|
to_encode = data.copy()
|
|
expire = datetime.utcnow() + expires_delta
|
|
to_encode.update({"exp": expire, "jti": str(uuid.uuid4())})
|
|
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
|
|
|
|
|
|
def create_refresh_token(data: dict) -> str:
|
|
"""Create JWT refresh token."""
|
|
to_encode = data.copy()
|
|
expire = datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
|
|
to_encode.update({"exp": expire, "type": "refresh", "jti": str(uuid.uuid4())})
|
|
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
|
|
|
|
|
|
def get_current_user_from_token(token: str) -> dict:
|
|
"""Get current user from JWT token string (for query parameter auth).
|
|
|
|
Used for endpoints that need authentication but can't use headers
|
|
(e.g., EventSource/SSE endpoints).
|
|
"""
|
|
try:
|
|
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
|
|
username: str = payload.get("sub")
|
|
if username is None:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Invalid authentication credentials",
|
|
)
|
|
return {"username": username}
|
|
except JWTError:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Invalid authentication credentials",
|
|
)
|
|
|
|
|
|
def get_current_user(
|
|
credentials: Annotated[HTTPAuthorizationCredentials, Depends(get_bearer_token)]
|
|
) -> dict:
|
|
"""Get current user from JWT token."""
|
|
try:
|
|
payload = jwt.decode(
|
|
credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM]
|
|
)
|
|
username: str = payload.get("sub")
|
|
if username is None:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Invalid authentication credentials",
|
|
)
|
|
return {"username": username}
|
|
except JWTError:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Invalid authentication credentials",
|
|
)
|
|
|
|
|
|
def get_current_user_with_id(
|
|
current_user: Annotated[dict, Depends(get_current_user)],
|
|
db: Session = Depends(get_db),
|
|
) -> dict:
|
|
"""Get current user with ID from main database.
|
|
|
|
Looks up the user in the main database and returns username and user_id.
|
|
If user doesn't exist, creates them (for bootstrap scenarios).
|
|
"""
|
|
username = current_user["username"]
|
|
|
|
# Check if user exists in main database
|
|
user = db.query(User).filter(User.username == username).first()
|
|
|
|
# If user doesn't exist, create them (for bootstrap scenarios)
|
|
if not user:
|
|
from backend.utils.password import hash_password
|
|
|
|
# Generate unique email to avoid conflicts
|
|
base_email = f"{username}@example.com"
|
|
email = base_email
|
|
counter = 1
|
|
# Ensure email is unique
|
|
while db.query(User).filter(User.email == email).first():
|
|
email = f"{username}+{counter}@example.com"
|
|
counter += 1
|
|
|
|
# Create user (they should change password)
|
|
default_password_hash = hash_password("changeme")
|
|
user = User(
|
|
username=username,
|
|
password_hash=default_password_hash,
|
|
email=email,
|
|
full_name=username,
|
|
is_active=True,
|
|
is_admin=False,
|
|
role=DEFAULT_USER_ROLE,
|
|
)
|
|
db.add(user)
|
|
db.commit()
|
|
db.refresh(user)
|
|
|
|
return {"username": username, "user_id": user.id}
|
|
|
|
|
|
def _resolve_user_role(user: User | None, is_admin_flag: bool) -> str:
|
|
"""Determine the role value for a user, ensuring it is valid."""
|
|
if user and user.role in ROLE_VALUES:
|
|
return user.role
|
|
return DEFAULT_ADMIN_ROLE if is_admin_flag else DEFAULT_USER_ROLE
|
|
|
|
|
|
@router.post("/login", response_model=TokenResponse)
|
|
def login(credentials: LoginRequest, db: Session = Depends(get_db)) -> TokenResponse:
|
|
"""Authenticate user and return tokens.
|
|
|
|
First checks main database for users, falls back to hardcoded admin/admin
|
|
for backward compatibility.
|
|
"""
|
|
# First, try to find user in main database
|
|
user = db.query(User).filter(User.username == credentials.username).first()
|
|
|
|
if user:
|
|
# User exists in main database - verify password
|
|
if not user.is_active:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Account is inactive",
|
|
)
|
|
|
|
# Check if password_hash exists (migration might not have run)
|
|
if not user.password_hash:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Password not set. Please contact administrator to set your password.",
|
|
)
|
|
|
|
if not verify_password(credentials.password, user.password_hash):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Incorrect username or password",
|
|
)
|
|
|
|
# Update last login
|
|
user.last_login = datetime.utcnow()
|
|
db.add(user)
|
|
db.commit()
|
|
|
|
# Generate tokens
|
|
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
|
|
access_token = create_access_token(
|
|
data={"sub": credentials.username},
|
|
expires_delta=access_token_expires,
|
|
)
|
|
refresh_token = create_refresh_token(data={"sub": credentials.username})
|
|
return TokenResponse(
|
|
access_token=access_token,
|
|
refresh_token=refresh_token,
|
|
password_change_required=user.password_change_required,
|
|
)
|
|
|
|
# Fallback to hardcoded admin/admin for backward compatibility
|
|
if (
|
|
credentials.username == SINGLE_USER_USERNAME
|
|
and credentials.password == SINGLE_USER_PASSWORD
|
|
):
|
|
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
|
|
access_token = create_access_token(
|
|
data={"sub": credentials.username},
|
|
expires_delta=access_token_expires,
|
|
)
|
|
refresh_token = create_refresh_token(data={"sub": credentials.username})
|
|
return TokenResponse(
|
|
access_token=access_token,
|
|
refresh_token=refresh_token,
|
|
password_change_required=False, # Hardcoded admin doesn't require password change
|
|
)
|
|
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Incorrect username or password",
|
|
)
|
|
|
|
|
|
@router.post("/refresh", response_model=TokenResponse)
|
|
def refresh_token(request: RefreshRequest) -> TokenResponse:
|
|
"""Refresh access token using refresh token."""
|
|
try:
|
|
payload = jwt.decode(
|
|
request.refresh_token, SECRET_KEY, algorithms=[ALGORITHM]
|
|
)
|
|
if payload.get("type") != "refresh":
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Invalid token type",
|
|
)
|
|
username: str = payload.get("sub")
|
|
if username is None:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Invalid token",
|
|
)
|
|
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
|
|
access_token = create_access_token(
|
|
data={"sub": username}, expires_delta=access_token_expires
|
|
)
|
|
new_refresh_token = create_refresh_token(data={"sub": username})
|
|
return TokenResponse(
|
|
access_token=access_token, refresh_token=new_refresh_token
|
|
)
|
|
except JWTError:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Invalid refresh token",
|
|
)
|
|
|
|
|
|
@router.get("/me", response_model=UserResponse)
|
|
def get_current_user_info(
|
|
current_user: Annotated[dict, Depends(get_current_user)],
|
|
db: Session = Depends(get_db),
|
|
) -> UserResponse:
|
|
"""Get current user information including admin status."""
|
|
username = current_user["username"]
|
|
|
|
# Check if user exists in main database to get admin status
|
|
user = db.query(User).filter(User.username == username).first()
|
|
|
|
# If user doesn't exist in main database, check if we should bootstrap them
|
|
if not user:
|
|
# Check if any admin users exist
|
|
admin_count = db.query(User).filter(User.is_admin == True).count()
|
|
|
|
# If no admins exist, bootstrap current user as admin
|
|
if admin_count == 0:
|
|
from backend.utils.password import hash_password
|
|
|
|
# Generate unique email to avoid conflicts
|
|
base_email = f"{username}@example.com"
|
|
email = base_email
|
|
counter = 1
|
|
# Ensure email is unique
|
|
while db.query(User).filter(User.email == email).first():
|
|
email = f"{username}+{counter}@example.com"
|
|
counter += 1
|
|
|
|
# Create user as admin for bootstrap (they should change password)
|
|
default_password_hash = hash_password("changeme")
|
|
try:
|
|
user = User(
|
|
username=username,
|
|
password_hash=default_password_hash,
|
|
email=email,
|
|
full_name=username,
|
|
is_active=True,
|
|
is_admin=True,
|
|
role=DEFAULT_ADMIN_ROLE,
|
|
)
|
|
db.add(user)
|
|
db.commit()
|
|
db.refresh(user)
|
|
is_admin = True
|
|
except Exception:
|
|
# If creation fails (e.g., race condition), try to get existing user
|
|
db.rollback()
|
|
user = db.query(User).filter(User.username == username).first()
|
|
if user:
|
|
# Update existing user to be admin if no admins exist
|
|
if not user.is_admin:
|
|
user.is_admin = True
|
|
user.role = DEFAULT_ADMIN_ROLE
|
|
db.commit()
|
|
db.refresh(user)
|
|
is_admin = user.is_admin
|
|
else:
|
|
is_admin = False
|
|
else:
|
|
is_admin = False
|
|
else:
|
|
is_admin = user.is_admin if user else False
|
|
|
|
role_value = _resolve_user_role(user, is_admin)
|
|
|
|
# Fetch permissions - if it fails, return empty permissions to avoid blocking login
|
|
try:
|
|
permissions_map = fetch_role_permissions_map(db)
|
|
permissions = permissions_map.get(role_value, {})
|
|
except Exception as e:
|
|
# If permissions fetch fails, return empty permissions to avoid blocking login
|
|
# Log the error but don't fail the request
|
|
import traceback
|
|
print(f"⚠️ Failed to fetch permissions for /me endpoint: {e}")
|
|
print(f" Traceback: {traceback.format_exc()}")
|
|
permissions = {}
|
|
return UserResponse(
|
|
username=username,
|
|
is_admin=is_admin,
|
|
role=role_value,
|
|
permissions=permissions,
|
|
)
|
|
|
|
|
|
@router.post("/change-password", response_model=PasswordChangeResponse)
|
|
def change_password(
|
|
request: PasswordChangeRequest,
|
|
current_user: Annotated[dict, Depends(get_current_user)],
|
|
db: Session = Depends(get_db),
|
|
) -> PasswordChangeResponse:
|
|
"""Change user password.
|
|
|
|
Requires current password verification.
|
|
After successful change, clears password_change_required flag.
|
|
"""
|
|
username = current_user["username"]
|
|
|
|
# Find user in main database
|
|
user = db.query(User).filter(User.username == username).first()
|
|
if not user:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="User not found",
|
|
)
|
|
|
|
# Verify current password
|
|
if not verify_password(request.current_password, user.password_hash):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Current password is incorrect",
|
|
)
|
|
|
|
# Update password
|
|
user.password_hash = hash_password(request.new_password)
|
|
user.password_change_required = False # Clear the flag after password change
|
|
db.add(user)
|
|
db.commit()
|
|
|
|
return PasswordChangeResponse(
|
|
success=True,
|
|
message="Password changed successfully",
|
|
)
|
|
|
|
|
|
|