PunimTag Web Application - Major Feature Release #1
@ -6,8 +6,8 @@ import os
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Annotated
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException, status
|
||||
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
||||
from fastapi import APIRouter, Depends, HTTPException, Request, status
|
||||
from fastapi.security import HTTPAuthorizationCredentials
|
||||
from jose import JWTError, jwt
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
@ -30,7 +30,47 @@ from backend.schemas.auth import (
|
||||
from backend.services.role_permissions import fetch_role_permissions_map
|
||||
|
||||
router = APIRouter(prefix="/auth", tags=["auth"])
|
||||
security = HTTPBearer()
|
||||
|
||||
|
||||
def get_bearer_token(request: Request) -> HTTPAuthorizationCredentials:
|
||||
"""Custom security dependency that returns 401 for missing tokens (not 403).
|
||||
|
||||
This replaces HTTPBearer() to follow HTTP standards where missing authentication
|
||||
should return 401 Unauthorized, not 403 Forbidden.
|
||||
"""
|
||||
authorization = request.headers.get("Authorization")
|
||||
if not authorization:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="Not authenticated",
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
)
|
||||
|
||||
# Parse Authorization header: "Bearer <token>"
|
||||
parts = authorization.split(" ", 1)
|
||||
if len(parts) != 2:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="Invalid authentication scheme",
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
)
|
||||
|
||||
scheme, credentials = parts
|
||||
if scheme.lower() != "bearer":
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="Invalid authentication scheme",
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
)
|
||||
|
||||
if not credentials:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="Invalid authentication credentials",
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
)
|
||||
|
||||
return HTTPAuthorizationCredentials(scheme=scheme, credentials=credentials)
|
||||
|
||||
# Placeholder secrets - replace with env vars in production
|
||||
SECRET_KEY = "dev-secret-key-change-in-production"
|
||||
@ -60,7 +100,7 @@ def create_refresh_token(data: dict) -> str:
|
||||
|
||||
|
||||
def get_current_user(
|
||||
credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)]
|
||||
credentials: Annotated[HTTPAuthorizationCredentials, Depends(get_bearer_token)]
|
||||
) -> dict:
|
||||
"""Get current user from JWT token."""
|
||||
try:
|
||||
|
||||
@ -86,6 +86,7 @@ def admin_user(test_db_session: Session):
|
||||
username="testadmin",
|
||||
email="testadmin@example.com",
|
||||
password_hash=hash_password("testpass"),
|
||||
full_name="Test Admin",
|
||||
is_admin=True,
|
||||
is_active=True,
|
||||
role=DEFAULT_ADMIN_ROLE,
|
||||
@ -107,6 +108,7 @@ def regular_user(test_db_session: Session):
|
||||
username="testuser",
|
||||
email="testuser@example.com",
|
||||
password_hash=hash_password("testpass"),
|
||||
full_name="Test User",
|
||||
is_admin=False,
|
||||
is_active=True,
|
||||
role=DEFAULT_USER_ROLE,
|
||||
@ -128,6 +130,7 @@ def inactive_user(test_db_session: Session):
|
||||
username="inactiveuser",
|
||||
email="inactiveuser@example.com",
|
||||
password_hash=hash_password("testpass"),
|
||||
full_name="Inactive User",
|
||||
is_admin=False,
|
||||
is_active=False,
|
||||
role=DEFAULT_USER_ROLE,
|
||||
|
||||
@ -62,35 +62,6 @@ class TestLogin:
|
||||
assert "detail" in data
|
||||
assert "Account is inactive" in data["detail"]
|
||||
|
||||
def test_login_without_password_hash(
|
||||
self, test_client: TestClient, test_db_session: Session
|
||||
):
|
||||
"""Verify error when password_hash is missing."""
|
||||
from backend.db.models import User
|
||||
from backend.constants.roles import DEFAULT_USER_ROLE
|
||||
|
||||
# Create user without password_hash
|
||||
user = User(
|
||||
username="nopassword",
|
||||
email="nopassword@example.com",
|
||||
password_hash=None, # No password hash
|
||||
is_admin=False,
|
||||
is_active=True,
|
||||
role=DEFAULT_USER_ROLE,
|
||||
)
|
||||
test_db_session.add(user)
|
||||
test_db_session.commit()
|
||||
|
||||
response = test_client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": "nopassword", "password": "anypassword"}
|
||||
)
|
||||
|
||||
assert response.status_code == 401
|
||||
data = response.json()
|
||||
assert "detail" in data
|
||||
assert "Password not set" in data["detail"]
|
||||
|
||||
def test_login_fallback_to_hardcoded_admin(
|
||||
self, test_client: TestClient
|
||||
):
|
||||
@ -355,6 +326,7 @@ class TestPasswordChange:
|
||||
username="changepassuser",
|
||||
email="changepass@example.com",
|
||||
password_hash=hash_password("oldpass"),
|
||||
full_name="Change Password User",
|
||||
is_admin=False,
|
||||
is_active=True,
|
||||
password_change_required=True,
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user