""" 角色权限管理 API """ from fastapi import APIRouter, Depends, HTTPException from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, delete from typing import List, Dict, Any from pydantic import BaseModel from app.core.database import get_db from app.core.deps import get_current_user from app.models.user import User from app.models.role import Role from app.models.menu import SystemMenu, RoleMenu from app.schemas.response import success_response, error_response router = APIRouter() class RolePermissionUpdate(BaseModel): """角色权限更新请求""" menu_ids: List[int] def build_menu_tree(menus: List[SystemMenu], parent_id: int = 0) -> List[Dict[str, Any]]: """构建菜单树(包含所有菜单,用于权限分配)""" result = [] for menu in menus: if menu.parent_id == parent_id: menu_dict = { "id": menu.id, "parent_id": menu.parent_id, "menu_name": menu.menu_name, "menu_code": menu.menu_code, "menu_type": menu.menu_type, "path": menu.path, "component": menu.component, "icon": menu.icon, "sort_order": menu.sort_order, "visible": menu.visible, "status": menu.status, "permission": menu.permission, } # 递归构建子菜单 children = build_menu_tree(menus, menu.id) if children: menu_dict["children"] = children result.append(menu_dict) # 按 sort_order 排序 result.sort(key=lambda x: x.get("sort_order", 0)) return result @router.get("/roles", response_model=dict) async def get_all_roles( current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db) ): """获取所有角色列表""" # 查询所有角色 result = await db.execute( select(Role).order_by(Role.created_at.desc()) ) roles = result.scalars().all() # 构建角色数据 roles_data = [] for role in roles: roles_data.append({ "id": role.id, "role_name": role.role_name, "role_code": role.role_code, "description": role.description, "status": role.status, "is_system": role.is_system, "created_at": role.created_at.isoformat() if role.created_at else None, "updated_at": role.updated_at.isoformat() if role.updated_at else None, }) return success_response(data=roles_data) @router.get("/menu-tree", response_model=dict) async def get_menu_tree( current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db) ): """获取完整的菜单权限树(用于权限分配)""" # 获取所有菜单(包括禁用的,管理员需要看到所有菜单) result = await db.execute( select(SystemMenu).order_by(SystemMenu.sort_order) ) all_menus = result.scalars().all() # 构建菜单树 menu_tree = build_menu_tree(all_menus) return success_response(data=menu_tree) @router.get("/roles/{role_id}/permissions", response_model=dict) async def get_role_permissions( role_id: int, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db) ): """获取指定角色的权限菜单ID列表""" # 检查角色是否存在 role_result = await db.execute( select(Role).where(Role.id == role_id) ) role = role_result.scalar_one_or_none() if not role: raise HTTPException(status_code=404, detail="角色不存在") # 获取角色的菜单权限 permissions_result = await db.execute( select(RoleMenu.menu_id).where(RoleMenu.role_id == role_id) ) menu_ids = [row[0] for row in permissions_result.all()] return success_response(data={ "role_id": role_id, "role_name": role.role_name, "role_code": role.role_code, "menu_ids": menu_ids }) @router.put("/roles/{role_id}/permissions", response_model=dict) async def update_role_permissions( role_id: int, permission_update: RolePermissionUpdate, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db) ): """更新角色的权限菜单""" # 检查角色是否存在 role_result = await db.execute( select(Role).where(Role.id == role_id) ) role = role_result.scalar_one_or_none() if not role: raise HTTPException(status_code=404, detail="角色不存在") # 检查是否是系统角色 if role.is_system == 1: raise HTTPException(status_code=400, detail="系统角色不允许修改权限") # 验证菜单ID是否都存在 if permission_update.menu_ids: menus_result = await db.execute( select(SystemMenu.id).where(SystemMenu.id.in_(permission_update.menu_ids)) ) valid_menu_ids = [row[0] for row in menus_result.all()] invalid_ids = set(permission_update.menu_ids) - set(valid_menu_ids) if invalid_ids: raise HTTPException( status_code=400, detail=f"以下菜单ID不存在: {', '.join(map(str, invalid_ids))}" ) # 删除原有权限 await db.execute( delete(RoleMenu).where(RoleMenu.role_id == role_id) ) # 添加新权限 if permission_update.menu_ids: for menu_id in permission_update.menu_ids: role_menu = RoleMenu( role_id=role_id, menu_id=menu_id ) db.add(role_menu) await db.commit() return success_response( data={ "role_id": role_id, "menu_ids": permission_update.menu_ids }, message="角色权限更新成功" )