nex_docus/backend/app/api/v1/menu.py

125 lines
3.7 KiB
Python

"""
权限菜单相关 API
"""
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from typing import List, Dict, Any
from app.core.database import get_db
from app.core.deps import get_current_user
from app.models.user import User
from app.models.menu import SystemMenu, RoleMenu
from app.models.role import UserRole
from app.schemas.response import success_response
# Router that the endpoints in this module register themselves on
# (/user-menus and /user-permissions).
router = APIRouter()
def build_menu_tree(menus: List[SystemMenu], parent_id: int = 0) -> List[Dict[str, Any]]:
    """Build a nested menu tree from a flat list of menu records.

    Args:
        menus: Flat list of menu records. Each record is read for ``id``,
            ``parent_id`` and the display attributes copied into its node.
        parent_id: Id whose direct children form the top level of the
            returned tree. Defaults to 0 (presumably the root marker used
            by the menu table; confirm against the schema).

    Returns:
        A list of node dicts ordered by ``sort_order``. A ``None``
        ``sort_order`` is ordered as 0 and ties keep their input order.
        A node carries a ``"children"`` key only when it has at least one
        child. Records that are not reachable from ``parent_id`` are left
        out of the result.
    """
    # Group the records by parent once, so that every level of the tree is a
    # single dict lookup instead of a rescan of the whole list per node.
    children_by_parent: Dict[Any, list] = {}
    for menu in menus:
        children_by_parent.setdefault(menu.parent_id, []).append(menu)

    def _order(menu) -> Any:
        # A None sort_order cannot be compared with ints and would make the
        # sort raise TypeError, so it is ordered as 0.
        return 0 if menu.sort_order is None else menu.sort_order

    def _subtree(pid) -> List[Dict[str, Any]]:
        nodes = []
        # sorted() is stable: siblings with equal sort_order keep input order.
        for menu in sorted(children_by_parent.get(pid, ()), key=_order):
            node = {
                "id": menu.id,
                "menu_name": menu.menu_name,
                "menu_code": menu.menu_code,
                "menu_type": menu.menu_type,
                "path": menu.path,
                "component": menu.component,
                "icon": menu.icon,
                "sort_order": menu.sort_order,
                "visible": menu.visible,
                "permission": menu.permission,
            }
            # Recurse into this record's own children; leaf nodes get no
            # "children" key at all.
            children = _subtree(menu.id)
            if children:
                node["children"] = children
            nodes.append(node)
        return nodes

    return _subtree(parent_id)
@router.get("/user-menus", response_model=dict)
async def get_user_menus(
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """Return the menu tree the current user is permitted to see."""
    # Step 1: collect the ids of the roles bound to the authenticated user.
    role_stmt = select(UserRole.role_id).where(UserRole.user_id == current_user.id)
    role_ids = (await db.execute(role_stmt)).scalars().all()
    if not role_ids:
        # No roles means no menus; answer with an empty list.
        return success_response(data=[])

    # Step 2: collect the menu ids granted to any of those roles. The same
    # menu can be granted through several roles, so duplicates are dropped.
    grant_stmt = select(RoleMenu.menu_id).where(RoleMenu.role_id.in_(role_ids))
    menu_ids = list(set((await db.execute(grant_stmt)).scalars().all()))
    if not menu_ids:
        return success_response(data=[])

    # Step 3: load the granted menus, keeping only rows with status == 1 and
    # visible == 1, ordered by sort_order.
    menu_stmt = (
        select(SystemMenu)
        .where(
            SystemMenu.id.in_(menu_ids),
            SystemMenu.status == 1,
            SystemMenu.visible == 1,
        )
        .order_by(SystemMenu.sort_order)
    )
    granted_menus = (await db.execute(menu_stmt)).scalars().all()

    # Step 4: nest the flat rows into a tree and wrap it in the standard
    # success payload.
    return success_response(data=build_menu_tree(granted_menus))
@router.get("/user-permissions", response_model=dict)
async def get_user_permissions(
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """Return the list of permission strings held by the current user."""
    # Step 1: collect the ids of the roles bound to the authenticated user.
    role_stmt = select(UserRole.role_id).where(UserRole.user_id == current_user.id)
    role_ids = (await db.execute(role_stmt)).scalars().all()
    if not role_ids:
        # No roles means no permissions; answer with an empty list.
        return success_response(data=[])

    # Step 2: collect the menu ids granted to any of those roles, dropping
    # duplicates that come from overlapping roles.
    grant_stmt = select(RoleMenu.menu_id).where(RoleMenu.role_id.in_(role_ids))
    menu_ids = list(set((await db.execute(grant_stmt)).scalars().all()))
    if not menu_ids:
        return success_response(data=[])

    # Step 3: read the permission column of the granted menus, keeping only
    # rows with status == 1 and a non-NULL permission value.
    permission_stmt = select(SystemMenu.permission).where(
        SystemMenu.id.in_(menu_ids),
        SystemMenu.status == 1,
        SystemMenu.permission.isnot(None),
    )
    granted_permissions = list((await db.execute(permission_stmt)).scalars().all())
    return success_response(data=granted_permissions)