imetting_backend/app/api/endpoints/admin.py

304 lines
12 KiB
Python

from fastapi import APIRouter, Depends
from app.core.auth import get_current_admin_user, get_current_user
from app.core.config import LLM_CONFIG, DEFAULT_RESET_PASSWORD, MAX_FILE_SIZE, VOICEPRINT_CONFIG, TIMELINE_PAGESIZE
from app.core.response import create_api_response
from app.core.database import get_db_connection
from app.models.models import MenuInfo, MenuListResponse, RolePermissionInfo, UpdateRolePermissionsRequest, RoleInfo
from pydantic import BaseModel
from typing import List
import json
from pathlib import Path
router = APIRouter()
# Location of the persisted system configuration: "config/system_config.json"
# under the directory four levels above this module's own directory entry
# (i.e. four successive `.parent` steps from this file's path).
CONFIG_FILE = Path(__file__).parent.parent.parent.parent / "config" / "system_config.json"
# Request body accepted by PUT /admin/system-config. The field names are also
# the keys used in the persisted JSON configuration file.
# NOTE(review): on pydantic v2 a field called `model_name` may trigger the
# "protected namespace" warning -- confirm against the pydantic version in use.
class SystemConfigModel(BaseModel):
    # Name stored into LLM_CONFIG['model_name']
    model_name: str
    # Text stored into VOICEPRINT_CONFIG['template_text']
    template_text: str
    # Value stored as the DEFAULT_RESET_PASSWORD setting
    DEFAULT_RESET_PASSWORD: str
    # Maximum file size, expressed in bytes
    MAX_FILE_SIZE: int
    # Number of items per page
    TIMELINE_PAGESIZE: int
def load_config_from_file():
    """Load the system configuration from ``CONFIG_FILE``.

    Returns:
        dict: The JSON object stored in ``CONFIG_FILE`` when the file exists,
        can be read, and contains a JSON object. In every other case (file
        missing, unreadable, malformed, or holding a non-object JSON value)
        the built-in defaults imported from ``app.core.config`` are returned.
    """
    try:
        if CONFIG_FILE.exists():
            with open(CONFIG_FILE, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
            # Callers use dict methods (``.get``) on the result, so anything
            # other than a JSON object is treated as an unusable file.
            if isinstance(loaded, dict):
                return loaded
            print(f"配置文件格式无效(应为JSON对象),使用默认配置: {CONFIG_FILE}")
    except (OSError, ValueError) as e:
        # OSError: the file could not be opened/read.
        # ValueError: covers json.JSONDecodeError and UnicodeDecodeError.
        # Report the problem instead of hiding it, then fall back to defaults.
        print(f"加载配置文件失败,使用默认配置: {e}")
    # Fall back to the default configuration
    return {
        'model_name': LLM_CONFIG['model_name'],
        'template_text': VOICEPRINT_CONFIG['template_text'],
        'DEFAULT_RESET_PASSWORD': DEFAULT_RESET_PASSWORD,
        'MAX_FILE_SIZE': MAX_FILE_SIZE,
        'TIMELINE_PAGESIZE': TIMELINE_PAGESIZE,
    }
def save_config_to_file(config_data):
    """Write ``config_data`` to ``CONFIG_FILE`` as pretty-printed UTF-8 JSON.

    The data is first written to a sibling temporary file and then moved over
    ``CONFIG_FILE``, so a failure part-way through serialization never leaves
    a truncated configuration file behind.

    Args:
        config_data: JSON-serializable object to persist.

    Returns:
        bool: ``True`` when the file was written, ``False`` on any error (the
        error is printed, not raised).
    """
    tmp_path = CONFIG_FILE.with_name(CONFIG_FILE.name + ".tmp")
    try:
        CONFIG_FILE.parent.mkdir(parents=True, exist_ok=True)
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(config_data, f, ensure_ascii=False, indent=2)
        # Swap the fully written file into place in a single rename step.
        tmp_path.replace(CONFIG_FILE)
        return True
    except Exception as e:
        print(f"保存配置文件失败: {e}")
        # Best-effort cleanup of the partial temporary file; it may not exist
        # if the failure happened before it was created.
        try:
            tmp_path.unlink()
        except OSError:
            pass
        return False
@router.get("/admin/system-config")
async def get_system_config(current_user=Depends(get_current_user)):
    """
    Return the current system configuration.

    Available to any authenticated user, not only administrators. Each key
    missing from the loaded configuration is filled in with the default
    imported from app.core.config.

    NOTE(review): the response includes DEFAULT_RESET_PASSWORD while the
    endpoint is guarded only by get_current_user -- confirm that exposing this
    value to non-admin users is intended.
    """
    try:
        stored = load_config_from_file()
        lookup = stored.get
        response_data = {
            'model_name': lookup('model_name', LLM_CONFIG['model_name']),
            'template_text': lookup('template_text', VOICEPRINT_CONFIG['template_text']),
            'DEFAULT_RESET_PASSWORD': lookup('DEFAULT_RESET_PASSWORD', DEFAULT_RESET_PASSWORD),
            'MAX_FILE_SIZE': lookup('MAX_FILE_SIZE', MAX_FILE_SIZE),
            'TIMELINE_PAGESIZE': lookup('TIMELINE_PAGESIZE', TIMELINE_PAGESIZE),
        }
        return create_api_response(code="200", message="配置获取成功", data=response_data)
    except Exception as exc:
        failure_message = f"获取配置失败: {str(exc)}"
        return create_api_response(code="500", message=failure_message)
@router.put("/admin/system-config")
async def update_system_config(
    config: SystemConfigModel,
    current_user=Depends(get_current_admin_user)
):
    """
    Persist a new system configuration and apply it to the running process.

    Administrator-only (guarded by get_current_admin_user).

    Args:
        config: The new configuration values.
        current_user: Injected by the admin-auth dependency; not used directly.

    Returns:
        The result of create_api_response with code "200" and the saved values
        on success, "400" when a value is out of range, or "500" when saving
        fails or an unexpected error occurs.
    """
    try:
        # Reject values that cannot work as a size limit, a page size or a
        # password before anything is written to disk or applied at runtime.
        # NOTE(review): assumes 0 is not used as an "unlimited" sentinel for
        # either numeric setting -- confirm against the code that reads them.
        if config.MAX_FILE_SIZE <= 0:
            return create_api_response(code="400", message="MAX_FILE_SIZE 必须大于 0")
        if config.TIMELINE_PAGESIZE <= 0:
            return create_api_response(code="400", message="TIMELINE_PAGESIZE 必须大于 0")
        if not config.DEFAULT_RESET_PASSWORD.strip():
            return create_api_response(code="400", message="DEFAULT_RESET_PASSWORD 不能为空")

        config_data = {
            'model_name': config.model_name,
            'template_text': config.template_text,
            'DEFAULT_RESET_PASSWORD': config.DEFAULT_RESET_PASSWORD,
            'MAX_FILE_SIZE': config.MAX_FILE_SIZE,
            'TIMELINE_PAGESIZE': config.TIMELINE_PAGESIZE
        }
        if not save_config_to_file(config_data):
            return create_api_response(code="500", message="配置保存到文件失败")

        # Update the runtime configuration. The two dicts are mutated in
        # place, so every holder of the same dict object sees the new values.
        LLM_CONFIG['model_name'] = config.model_name
        VOICEPRINT_CONFIG['template_text'] = config.template_text
        # The scalar settings are rebound on the config module itself. Code
        # that copied them with `from app.core.config import ...` keeps its
        # old binding, which is why the success message mentions a restart.
        import app.core.config as config_module
        config_module.DEFAULT_RESET_PASSWORD = config.DEFAULT_RESET_PASSWORD
        config_module.MAX_FILE_SIZE = config.MAX_FILE_SIZE
        config_module.TIMELINE_PAGESIZE = config.TIMELINE_PAGESIZE
        return create_api_response(
            code="200",
            message="配置更新成功,重启服务后完全生效",
            data=config_data
        )
    except Exception as e:
        return create_api_response(code="500", message=f"更新配置失败: {str(e)}")
# Apply the saved configuration when the application starts
def load_system_config():
    """Load the saved configuration and apply it to the runtime settings.

    Intended to be called at application start-up. Any error is printed and
    otherwise ignored, leaving whatever settings were already in place.
    """
    try:
        config = load_config_from_file()

        # Dict-based settings are updated in place.
        LLM_CONFIG['model_name'] = config.get('model_name', LLM_CONFIG['model_name'])
        VOICEPRINT_CONFIG['template_text'] = config.get('template_text', VOICEPRINT_CONFIG['template_text'])

        # Scalar settings are rebound as attributes of the config module.
        import app.core.config as config_module
        scalar_defaults = (
            ('DEFAULT_RESET_PASSWORD', DEFAULT_RESET_PASSWORD),
            ('MAX_FILE_SIZE', MAX_FILE_SIZE),
            ('TIMELINE_PAGESIZE', TIMELINE_PAGESIZE),
        )
        for setting_name, fallback in scalar_defaults:
            setattr(config_module, setting_name, config.get(setting_name, fallback))

        print(f"系统配置加载成功: model={config.get('model_name')}, pagesize={config.get('TIMELINE_PAGESIZE')}")
    except Exception as e:
        print(f"加载系统配置失败,使用默认配置: {e}")
# ========== 菜单权限管理接口 ==========
@router.get("/admin/menus")
async def get_all_menus(current_user=Depends(get_current_admin_user)):
    """
    Return every row of the menus table.

    Administrator-only (guarded by get_current_admin_user). Rows are ordered
    by sort_order and then menu_id, and each is converted to a MenuInfo.

    Returns:
        The result of create_api_response with code "200" and a
        MenuListResponse on success, or code "500" with the error text.
    """
    try:
        with get_db_connection() as connection:
            cursor = connection.cursor(dictionary=True)
            try:
                query = """
                    SELECT menu_id, menu_code, menu_name, menu_icon, menu_url, menu_type,
                           parent_id, sort_order, is_active, description, created_at, updated_at
                    FROM menus
                    ORDER BY sort_order ASC, menu_id ASC
                """
                cursor.execute(query)
                menus = cursor.fetchall()
            finally:
                # Always release the cursor, even when the query fails.
                cursor.close()
            menu_list = [MenuInfo(**menu) for menu in menus]
            return create_api_response(
                code="200",
                message="获取菜单列表成功",
                data=MenuListResponse(menus=menu_list, total=len(menu_list))
            )
    except Exception as e:
        return create_api_response(code="500", message=f"获取菜单列表失败: {str(e)}")
@router.get("/admin/roles")
async def get_all_roles(current_user=Depends(get_current_admin_user)):
    """
    Return every role together with the number of menus it is granted.

    Administrator-only (guarded by get_current_admin_user). The count comes
    from a LEFT JOIN on role_menu_permissions, so a role without any
    permission rows is still listed, with menu_count 0.

    Returns:
        The result of create_api_response with code "200" and
        {"roles": [...], "total": n} on success, or code "500" with the
        error text.
    """
    try:
        with get_db_connection() as connection:
            cursor = connection.cursor(dictionary=True)
            try:
                # Fetch all roles and their permission counts
                query = """
                    SELECT r.role_id, r.role_name, r.created_at,
                           COUNT(rmp.menu_id) as menu_count
                    FROM roles r
                    LEFT JOIN role_menu_permissions rmp ON r.role_id = rmp.role_id
                    GROUP BY r.role_id
                    ORDER BY r.role_id ASC
                """
                cursor.execute(query)
                roles = cursor.fetchall()
            finally:
                # Always release the cursor, even when the query fails.
                cursor.close()
            return create_api_response(
                code="200",
                message="获取角色列表成功",
                data={"roles": roles, "total": len(roles)}
            )
    except Exception as e:
        return create_api_response(code="500", message=f"获取角色列表失败: {str(e)}")
@router.get("/admin/roles/{role_id}/permissions")
async def get_role_permissions(role_id: int, current_user=Depends(get_current_admin_user)):
    """
    Return the menu ids granted to one role.

    Administrator-only (guarded by get_current_admin_user).

    Args:
        role_id: Id of the role to inspect (path parameter).
        current_user: Injected by the admin-auth dependency; not used directly.

    Returns:
        The result of create_api_response with code "200" and a
        RolePermissionInfo on success, "404" when the role does not exist, or
        "500" with the error text.
    """
    try:
        with get_db_connection() as connection:
            cursor = connection.cursor(dictionary=True)
            try:
                # Make sure the role exists
                cursor.execute("SELECT role_id, role_name FROM roles WHERE role_id = %s", (role_id,))
                role = cursor.fetchone()
                if not role:
                    return create_api_response(code="404", message="角色不存在")
                # Fetch every menu permission of this role
                query = """
                    SELECT menu_id
                    FROM role_menu_permissions
                    WHERE role_id = %s
                """
                cursor.execute(query, (role_id,))
                permissions = cursor.fetchall()
            finally:
                # Runs on the early 404 return and on errors as well, so the
                # cursor is never left open.
                cursor.close()
            menu_ids = [p['menu_id'] for p in permissions]
            return create_api_response(
                code="200",
                message="获取角色权限成功",
                data=RolePermissionInfo(
                    role_id=role['role_id'],
                    role_name=role['role_name'],
                    menu_ids=menu_ids
                )
            )
    except Exception as e:
        return create_api_response(code="500", message=f"获取角色权限失败: {str(e)}")
@router.put("/admin/roles/{role_id}/permissions")
async def update_role_permissions(
    role_id: int,
    request: UpdateRolePermissionsRequest,
    current_user=Depends(get_current_admin_user)
):
    """
    Replace the full set of menu permissions of one role.

    Administrator-only (guarded by get_current_admin_user). Existing rows for
    the role are deleted and the requested ones inserted, then committed
    together; on failure the changes are rolled back.

    Args:
        role_id: Id of the role to update (path parameter).
        request: Carries ``menu_ids``, the menus the role should have.
            Duplicate ids are ignored; an empty list clears all permissions.
        current_user: Injected by the admin-auth dependency; not used directly.

    Returns:
        The result of create_api_response with code "200" and
        {"role_id", "menu_count"} on success, "404" when the role does not
        exist, "400" when any menu id is unknown, or "500" with the error text.
    """
    try:
        # De-duplicate while keeping first-seen order. Without this, a
        # repeated id makes COUNT(*) smaller than the list length, so valid
        # input would be rejected as containing an invalid menu id.
        menu_ids = list(dict.fromkeys(request.menu_ids or []))
        with get_db_connection() as connection:
            cursor = connection.cursor(dictionary=True)
            try:
                # Make sure the role exists
                cursor.execute("SELECT role_id FROM roles WHERE role_id = %s", (role_id,))
                if not cursor.fetchone():
                    return create_api_response(code="404", message="角色不存在")
                # Verify that every requested menu_id exists. Only the
                # placeholder list is interpolated; the values are bound.
                if menu_ids:
                    format_strings = ','.join(['%s'] * len(menu_ids))
                    cursor.execute(
                        f"SELECT COUNT(*) as count FROM menus WHERE menu_id IN ({format_strings})",
                        tuple(menu_ids)
                    )
                    valid_count = cursor.fetchone()['count']
                    if valid_count != len(menu_ids):
                        return create_api_response(code="400", message="包含无效的菜单ID")
                try:
                    # Remove all existing permissions of this role
                    cursor.execute("DELETE FROM role_menu_permissions WHERE role_id = %s", (role_id,))
                    # Insert the new permissions
                    if menu_ids:
                        insert_values = [(role_id, menu_id) for menu_id in menu_ids]
                        cursor.executemany(
                            "INSERT INTO role_menu_permissions (role_id, menu_id) VALUES (%s, %s)",
                            insert_values
                        )
                    connection.commit()
                except Exception:
                    # Do not leave the role with a half-applied permission set.
                    connection.rollback()
                    raise
            finally:
                # Runs on early returns and on errors as well.
                cursor.close()
            return create_api_response(
                code="200",
                message="更新角色权限成功",
                data={"role_id": role_id, "menu_count": len(menu_ids)}
            )
    except Exception as e:
        return create_api_response(code="500", message=f"更新角色权限失败: {str(e)}")
@router.get("/menus/user")
async def get_user_menus(current_user=Depends(get_current_user)):
    """
    Return the active menus the current user's role may access.

    Available to any authenticated user. The docstring of the original
    implementation states the result is used to render the dropdown menu.

    Args:
        current_user: Injected by the auth dependency; its ``role_id`` entry
            selects which permissions apply.

    Returns:
        The result of create_api_response with code "200" and
        {"menus": [...]} ordered by sort_order on success, or code "500" with
        the error text.
    """
    try:
        with get_db_connection() as connection:
            cursor = connection.cursor(dictionary=True)
            try:
                # Look up the accessible menus by the user's role_id
                query = """
                    SELECT DISTINCT m.menu_id, m.menu_code, m.menu_name, m.menu_icon,
                           m.menu_url, m.menu_type, m.sort_order
                    FROM menus m
                    JOIN role_menu_permissions rmp ON m.menu_id = rmp.menu_id
                    WHERE rmp.role_id = %s AND m.is_active = 1
                    ORDER BY m.sort_order ASC
                """
                cursor.execute(query, (current_user['role_id'],))
                menus = cursor.fetchall()
            finally:
                # Always release the cursor, even when the query fails.
                cursor.close()
            return create_api_response(
                code="200",
                message="获取用户菜单成功",
                data={"menus": menus}
            )
    except Exception as e:
        return create_api_response(code="500", message=f"获取用户菜单失败: {str(e)}")