nex_basse/backend/app/api/v1/endpoints/auth.py

110 lines
3.6 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, status, Request
from sqlalchemy.orm import Session
from redis import Redis
from app.core.db import get_db
from app.core.redis import get_redis
from app.schemas.auth import LoginRequest, TokenPair, RefreshRequest, LogoutRequest
from app.services.auth_service import (
authenticate_user,
create_token_pair,
refresh_access_token,
logout_refresh_token,
)
from app.services.log_service import create_log
from app.core.security import decode_token
from app.models import User
router = APIRouter(prefix="/auth", tags=["auth"])
@router.post("/login", response_model=TokenPair)
def login(
payload: LoginRequest,
request: Request,
db: Session = Depends(get_db),
redis: Redis = Depends(get_redis)
):
user = authenticate_user(db, payload.username, payload.password)
if not user:
# Optionally log failed login attempt here
create_log(
db=db,
user_id=None,
username=payload.username,
operation_type="LOGIN",
resource_type="auth",
detail=f"Failed login attempt for user: {payload.username}",
ip_address=request.client.host if request.client else None,
user_agent=request.headers.get("user-agent"),
status=0,
error_message="Invalid credentials"
)
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials")
access_token, refresh_token = create_token_pair(db, redis, user)
create_log(
db=db,
user_id=user.user_id,
username=user.username,
operation_type="LOGIN",
resource_type="auth",
detail="User logged in successfully",
ip_address=request.client.host if request.client else None,
user_agent=request.headers.get("user-agent"),
status=1
)
return TokenPair(access_token=access_token, refresh_token=refresh_token)
@router.post("/refresh", response_model=TokenPair)
def refresh(payload: RefreshRequest, db: Session = Depends(get_db), redis: Redis = Depends(get_redis)):
try:
access_token, refresh_token = refresh_access_token(db, redis, payload.refresh_token)
except ValueError as exc:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=str(exc)) from exc
return TokenPair(access_token=access_token, refresh_token=refresh_token)
@router.post("/logout")
def logout(
payload: LogoutRequest,
request: Request,
redis: Redis = Depends(get_redis),
db: Session = Depends(get_db)
):
user_id = None
username = None
# Try to extract user info for logging before invalidating token
try:
token_payload = decode_token(payload.refresh_token)
if token_payload and "sub" in token_payload:
user_id = int(token_payload["sub"])
user = db.query(User).filter(User.user_id == user_id).first()
if user:
username = user.username
except:
pass
try:
logout_refresh_token(redis, payload.refresh_token)
except ValueError as exc:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=str(exc)) from exc
if user_id:
create_log(
db=db,
user_id=user_id,
username=username,
operation_type="LOGOUT",
resource_type="auth",
detail="User logged out",
ip_address=request.client.host if request.client else None,
user_agent=request.headers.get("user-agent"),
status=1
)
return {"status": "ok"}