cosmo_backend/app/services/auth_deps.py

100 lines
2.6 KiB
Python

"""
Authentication dependencies for FastAPI
"""
from typing import Optional
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from app.database import get_db
from app.models.db import User, Role
from app.services.auth import decode_access_token
# HTTP Bearer token scheme
security = HTTPBearer()
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: AsyncSession = Depends(get_db)
) -> User:
"""
Get current authenticated user from JWT token
Raises:
HTTPException: If token is invalid or user not found
"""
token = credentials.credentials
# Decode token
payload = decode_access_token(token)
if payload is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
# Get user ID from token
user_id: Optional[int] = payload.get("sub")
if user_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
# Query user from database with roles
result = await db.execute(
select(User)
.options(selectinload(User.roles))
.where(User.id == int(user_id))
)
user = result.scalar_one_or_none()
if user is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found",
headers={"WWW-Authenticate": "Bearer"},
)
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Inactive user"
)
return user
async def get_current_active_user(
current_user: User = Depends(get_current_user)
) -> User:
"""Get current active user"""
return current_user
async def require_admin(
current_user: User = Depends(get_current_user)
) -> User:
"""
Require user to have admin role
Raises:
HTTPException: If user is not admin
"""
# Check if user has admin role
is_admin = any(role.name == "admin" for role in current_user.roles)
if not is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Admin privileges required"
)
return current_user