""" Authentication dependencies for FastAPI """ from typing import Optional from fastapi import Depends, HTTPException, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select from sqlalchemy.orm import selectinload from app.database import get_db from app.models.db import User, Role from app.services.auth import decode_access_token # HTTP Bearer token scheme security = HTTPBearer() async def get_current_user( credentials: HTTPAuthorizationCredentials = Depends(security), db: AsyncSession = Depends(get_db) ) -> User: """ Get current authenticated user from JWT token Raises: HTTPException: If token is invalid or user not found """ token = credentials.credentials # Decode token payload = decode_access_token(token) if payload is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication credentials", headers={"WWW-Authenticate": "Bearer"}, ) # Get user ID from token user_id: Optional[int] = payload.get("sub") if user_id is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication credentials", headers={"WWW-Authenticate": "Bearer"}, ) # Query user from database with roles result = await db.execute( select(User) .options(selectinload(User.roles)) .where(User.id == int(user_id)) ) user = result.scalar_one_or_none() if user is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found", headers={"WWW-Authenticate": "Bearer"}, ) if not user.is_active: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Inactive user" ) return user async def get_current_active_user( current_user: User = Depends(get_current_user) ) -> User: """Get current active user""" return current_user async def require_admin( current_user: User = Depends(get_current_user) ) -> User: """ Require user to have admin role Raises: HTTPException: If user is not admin """ # Check if user has admin role is_admin = any(role.name == "admin" for role in current_user.roles) if not is_admin: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Admin privileges required" ) return current_user