imetting_backend/app/api/endpoints/auth.py

135 lines
4.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import hashlib
import hmac
from typing import Union

from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import JSONResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

from app.core.auth import get_current_user
from app.core.database import get_db_connection
from app.models.models import LoginRequest, LoginResponse
from app.services.jwt_service import jwt_service
from app.core.response import create_api_response
# Bearer-token security scheme. Endpoints that need the raw token from the
# Authorization header receive it through ``Depends(security)``.
security = HTTPBearer()
# Router on which this module's /auth/* endpoints are registered.
router = APIRouter()
def hash_password(password: str) -> str:
    """Return the hex-encoded SHA-256 digest of *password*.

    The password is encoded with ``str.encode()`` defaults before hashing.

    NOTE(review): this is a single unsalted SHA-256 pass. Moving to a
    dedicated password-hashing scheme would require migrating the stored
    hashes, so it is not changed here.
    """
    hasher = hashlib.sha256()
    hasher.update(password.encode())
    return hasher.hexdigest()
@router.post("/auth/login")
def login(request: LoginRequest):
    """Authenticate a user by username and password and issue a JWT.

    The user row is looked up by ``request.username``; ``request.password``
    is hashed with ``hash_password`` and compared with the stored
    ``password_hash``.

    Args:
        request: Login payload; its ``username`` and ``password`` are read.

    Returns:
        The result of ``create_api_response``: code "401" (with the same
        message for an unknown user and for a wrong password), or code
        "200" carrying the serialized ``LoginResponse`` with the new token.
    """
    query = "SELECT user_id, username, caption, email, password_hash, role_id FROM users WHERE username = %s"
    with get_db_connection() as connection:
        cursor = connection.cursor(dictionary=True)
        try:
            cursor.execute(query, (request.username,))
            user = cursor.fetchone()
        finally:
            # Always release the cursor, even if the query raises.
            cursor.close()

    if not user:
        return create_api_response(code="401", message="用户名或密码错误")

    hashed_input = hash_password(request.password)
    stored_hash = user['password_hash']
    # A missing or non-text stored hash can never match; reject it up front so
    # compare_digest only sees comparable values. The comparison itself is
    # constant-time to avoid leaking how many leading characters matched.
    if not isinstance(stored_hash, str) or not hmac.compare_digest(
        stored_hash.encode("utf-8"), hashed_input.encode("utf-8")
    ):
        return create_api_response(code="401", message="用户名或密码错误")

    # Claims embedded in the JWT.
    token_data = {
        "user_id": user['user_id'],
        "username": user['username'],
        "caption": user['caption'],
        "role_id": user['role_id'],
    }
    token = jwt_service.create_access_token(token_data)

    login_response_data = LoginResponse(
        user_id=user['user_id'],
        username=user['username'],
        caption=user['caption'],
        email=user['email'],
        token=token,
        role_id=user['role_id'],
    )
    return create_api_response(
        code="200",
        message="登录成功",
        data=login_response_data.dict(),
    )
@router.post("/auth/logout")
def logout(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Log out by revoking the bearer token sent with the request.

    Returns the result of ``create_api_response``: code "401" when the token
    fails verification or its payload has no ``user_id``, "200" when the
    token was revoked, and "400" when ``revoke_token`` reports nothing was
    revoked.
    """
    raw_token = credentials.credentials

    claims = jwt_service.verify_token(raw_token)
    if not claims:
        return create_api_response(code="401", message="无效或过期的token")

    owner_id = claims.get("user_id")
    if not owner_id:
        return create_api_response(code="401", message="无效的token payload")

    if jwt_service.revoke_token(raw_token, owner_id):
        return create_api_response(code="200", message="登出成功")
    return create_api_response(code="400", message="已经登出或token未找到")
@router.post("/auth/logout-all")
def logout_all(current_user: dict = Depends(get_current_user)):
    """Log the current user out of all devices by revoking all their tokens.

    The response message reports the count returned by
    ``jwt_service.revoke_all_user_tokens``.
    """
    count = jwt_service.revoke_all_user_tokens(current_user["user_id"])
    summary = f"{count} 个设备登出"
    return create_api_response(code="200", message=summary)
@router.post("/auth/admin/revoke-user-tokens/{user_id}")
def admin_revoke_user_tokens(
    user_id: int, credentials: HTTPAuthorizationCredentials = Depends(security)
):
    """Admin operation: revoke every token belonging to ``user_id``.

    The caller's bearer token must verify and carry a ``user_id``; otherwise
    a code "401" response is returned. On success the response message
    reports how many tokens were revoked.

    NOTE(review): no role check is performed yet, so any caller with a valid
    token can revoke another user's tokens.
    """
    caller_claims = jwt_service.verify_token(credentials.credentials)
    if not caller_claims:
        return create_api_response(code="401", message="无效或过期的token")

    if not caller_claims.get("user_id"):
        return create_api_response(code="401", message="无效的token payload")

    # TODO: enforce admin permission here (compare the token's role_id with
    # the admin role and answer code "403" otherwise); for now any logged-in
    # user is allowed through.
    total = jwt_service.revoke_all_user_tokens(user_id)
    summary = f"为用户 {user_id} 撤销了 {total} 个token"
    return create_api_response(code="200", message=summary)
@router.get("/auth/me")
def get_me(current_user: dict = Depends(get_current_user)):
    """Return the current user's information as resolved by ``get_current_user``."""
    response_fields = {
        "code": "200",
        "message": "获取用户信息成功",
        "data": current_user,
    }
    return create_api_response(**response_fields)
@router.post("/auth/refresh")
def refresh_token(current_user: dict = Depends(get_current_user)):
    """Issue a fresh access token for the current user.

    The new token carries the ``user_id``, ``username``, ``caption`` and
    ``role_id`` values taken from ``current_user`` and is returned under the
    ``token`` key of the response data.
    """
    claim_keys = ("user_id", "username", "caption", "role_id")
    claims = {key: current_user[key] for key in claim_keys}
    fresh_token = jwt_service.create_access_token(claims)
    payload = {"token": fresh_token}
    return create_api_response(code="200", message="Token刷新成功", data=payload)