""" 权限菜单相关 API """ from fastapi import APIRouter, Depends from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select from typing import List, Dict, Any from app.core.database import get_db from app.core.deps import get_current_user from app.models.user import User from app.models.menu import SystemMenu, RoleMenu from app.models.role import UserRole from app.schemas.response import success_response router = APIRouter() def build_menu_tree(menus: List[SystemMenu], parent_id: int = 0) -> List[Dict[str, Any]]: """构建菜单树""" result = [] for menu in menus: if menu.parent_id == parent_id: menu_dict = { "id": menu.id, "menu_name": menu.menu_name, "menu_code": menu.menu_code, "menu_type": menu.menu_type, "path": menu.path, "component": menu.component, "icon": menu.icon, "sort_order": menu.sort_order, "visible": menu.visible, "permission": menu.permission, } # 递归构建子菜单 children = build_menu_tree(menus, menu.id) if children: menu_dict["children"] = children result.append(menu_dict) # 按 sort_order 排序 result.sort(key=lambda x: x.get("sort_order", 0)) return result @router.get("/user-menus", response_model=dict) async def get_user_menus( current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db) ): """获取当前用户的权限菜单""" # 获取用户的角色 user_roles_result = await db.execute( select(UserRole.role_id).where(UserRole.user_id == current_user.id) ) role_ids = [row[0] for row in user_roles_result.all()] if not role_ids: return success_response(data=[]) # 获取角色的菜单权限 role_menus_result = await db.execute( select(RoleMenu.menu_id).where(RoleMenu.role_id.in_(role_ids)) ) menu_ids = list(set([row[0] for row in role_menus_result.all()])) if not menu_ids: return success_response(data=[]) # 获取菜单详情 menus_result = await db.execute( select(SystemMenu) .where(SystemMenu.id.in_(menu_ids)) .where(SystemMenu.status == 1) .where(SystemMenu.visible == 1) .order_by(SystemMenu.sort_order) ) user_menus = menus_result.scalars().all() # 构建菜单树 menu_tree = build_menu_tree(user_menus) return success_response(data=menu_tree) @router.get("/user-permissions", response_model=dict) async def get_user_permissions( current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db) ): """获取当前用户的权限列表""" # 获取用户的角色 user_roles_result = await db.execute( select(UserRole.role_id).where(UserRole.user_id == current_user.id) ) role_ids = [row[0] for row in user_roles_result.all()] if not role_ids: return success_response(data=[]) # 获取角色的菜单权限 role_menus_result = await db.execute( select(RoleMenu.menu_id).where(RoleMenu.role_id.in_(role_ids)) ) menu_ids = list(set([row[0] for row in role_menus_result.all()])) if not menu_ids: return success_response(data=[]) # 获取权限字符串 permissions_result = await db.execute( select(SystemMenu.permission) .where(SystemMenu.id.in_(menu_ids)) .where(SystemMenu.status == 1) .where(SystemMenu.permission.isnot(None)) ) permissions = [row[0] for row in permissions_result.all()] return success_response(data=permissions)