# imetting_backend/app/api/endpoints/dict_data.py
#
# 414 lines
# 12 KiB
# Python

from fastapi import APIRouter, HTTPException, Depends
from app.core.database import get_db_connection
from app.core.auth import get_current_user, get_current_admin_user
from app.core.response import create_api_response
from pydantic import BaseModel
from typing import Optional, List
import json
router = APIRouter()
class DictDataItem(BaseModel):
    """One entry of the dictionary (code table), field-for-field the columns
    that the read endpoints of this module select from ``dict_data``."""

    id: int
    dict_type: str
    dict_code: str
    # Code of the parent entry; the tree builder in this module treats the
    # literal "ROOT" as "no parent".
    parent_code: str
    tree_path: Optional[str] = None
    label_cn: str
    label_en: Optional[str] = None
    sort_order: int
    # Free-form extra attributes; stored as JSON text and decoded on read.
    extension_attr: Optional[dict] = None
    is_default: int
    # The public listing endpoints only return rows with status = 1.
    status: int
    create_time: str
class CreateDictDataRequest(BaseModel):
    """Request body for creating a dictionary entry.

    Only ``dict_code`` and ``label_cn`` are mandatory; every other field
    falls back to the default declared below.
    """

    dict_type: str = "client_platform"
    dict_code: str
    # "ROOT" marks a top-level entry (no parent).
    parent_code: str = "ROOT"
    label_cn: str
    label_en: Optional[str] = None
    sort_order: int = 0
    # Serialized to JSON text before being written to the database.
    extension_attr: Optional[dict] = None
    is_default: int = 0
    status: int = 1
class UpdateDictDataRequest(BaseModel):
    """Request body for a partial update of a dictionary entry.

    Every field is optional. The update endpoint skips fields whose value is
    ``None``, so a field cannot be reset to NULL through this model.
    """

    parent_code: Optional[str] = None
    label_cn: Optional[str] = None
    label_en: Optional[str] = None
    sort_order: Optional[int] = None
    # Serialized to JSON text before being written to the database.
    extension_attr: Optional[dict] = None
    is_default: Optional[int] = None
    status: Optional[int] = None
@router.get("/dict/types", response_model=dict)
async def get_dict_types():
    """
    List the distinct ``dict_type`` values of rows whose status is 1
    (public endpoint, no authentication dependency).

    Returns:
        The envelope produced by ``create_api_response``: code "200" with
        ``data={"types": [...]}`` ordered by ``dict_type``, or code "500"
        carrying the exception text when anything raises.
    """
    try:
        with get_db_connection() as conn:
            cursor = conn.cursor(dictionary=True)
            try:
                cursor.execute(
                    """
                    SELECT DISTINCT dict_type
                    FROM dict_data
                    WHERE status = 1
                    ORDER BY dict_type
                    """
                )
                rows = cursor.fetchall()
            finally:
                # Release the cursor even when execute/fetch raises.
                cursor.close()

        return create_api_response(
            code="200",
            message="获取成功",
            data={"types": [row['dict_type'] for row in rows]}
        )
    except Exception as e:
        # Endpoint boundary: report the failure in the response envelope
        # instead of letting the exception propagate.
        return create_api_response(
            code="500",
            message=f"获取字典类型失败: {str(e)}"
        )
@router.get("/dict/{dict_type}", response_model=dict)
async def get_dict_data_by_type(dict_type: str):
    """
    Return all rows of one dictionary type with status = 1, both as a flat
    list and as a tree (public endpoint).

    Args:
        dict_type: Dictionary type to load, e.g. 'client_platform'.

    Returns:
        The envelope produced by ``create_api_response``: code "200" with
        ``data={"items": [...], "tree": [...]}``, or code "500" carrying the
        exception text when anything raises. Tree nodes are copies of the
        flat rows with an extra ``children`` list.
    """
    try:
        with get_db_connection() as conn:
            cursor = conn.cursor(dictionary=True)
            try:
                cursor.execute(
                    """
                    SELECT id, dict_type, dict_code, parent_code, tree_path,
                           label_cn, label_en, sort_order, extension_attr,
                           is_default, status, create_time
                    FROM dict_data
                    WHERE dict_type = %s AND status = 1
                    ORDER BY parent_code, sort_order, dict_code
                    """,
                    (dict_type,)
                )
                items = cursor.fetchall()
            finally:
                # Release the cursor even when execute/fetch raises.
                cursor.close()

        # Decode the JSON column; unparsable content degrades to an empty
        # dict instead of failing the whole request.
        for item in items:
            raw_attr = item.get('extension_attr')
            if raw_attr:
                try:
                    item['extension_attr'] = json.loads(raw_attr)
                except (TypeError, ValueError):
                    item['extension_attr'] = {}

        # First pass: one tree node per row, keyed by its code.
        nodes_map = {
            item['dict_code']: {**item, 'children': []}
            for item in items
        }

        # Second pass: attach every node to its parent. Nodes whose parent is
        # neither 'ROOT' nor among the loaded rows are left out of the tree
        # (they still appear in the flat list).
        tree_data = []
        for item in items:
            node = nodes_map[item['dict_code']]
            parent_code = item['parent_code']
            if parent_code == 'ROOT':
                tree_data.append(node)
            elif parent_code in nodes_map:
                nodes_map[parent_code]['children'].append(node)

        return create_api_response(
            code="200",
            message="获取成功",
            data={
                "items": items,     # flat rows
                "tree": tree_data   # nested rows
            }
        )
    except Exception as e:
        # Endpoint boundary: report the failure in the response envelope.
        return create_api_response(
            code="500",
            message=f"获取码表数据失败: {str(e)}"
        )
@router.get("/dict/{dict_type}/{dict_code}", response_model=dict)
async def get_dict_data_by_code(dict_type: str, dict_code: str):
    """
    Return the single dictionary row identified by type and code
    (public endpoint).

    The lookup does not filter on ``status``, so rows with any status are
    returned.

    Args:
        dict_type: Dictionary type.
        dict_code: Dictionary code within that type.

    Returns:
        The envelope produced by ``create_api_response``: code "200" with the
        row as ``data``, code "404" when no row matches, or code "500"
        carrying the exception text when anything raises.
    """
    try:
        with get_db_connection() as conn:
            cursor = conn.cursor(dictionary=True)
            try:
                cursor.execute(
                    """
                    SELECT id, dict_type, dict_code, parent_code, tree_path,
                           label_cn, label_en, sort_order, extension_attr,
                           is_default, status, create_time
                    FROM dict_data
                    WHERE dict_type = %s AND dict_code = %s
                    LIMIT 1
                    """,
                    (dict_type, dict_code)
                )
                item = cursor.fetchone()
            finally:
                # Release the cursor even when execute/fetch raises.
                cursor.close()

        if not item:
            return create_api_response(
                code="404",
                message=f"未找到编码 {dict_code} 的数据"
            )

        # Decode the JSON column; unparsable content degrades to an empty
        # dict instead of failing the request.
        raw_attr = item.get('extension_attr')
        if raw_attr:
            try:
                item['extension_attr'] = json.loads(raw_attr)
            except (TypeError, ValueError):
                item['extension_attr'] = {}

        return create_api_response(
            code="200",
            message="获取成功",
            data=item
        )
    except Exception as e:
        # Endpoint boundary: report the failure in the response envelope.
        return create_api_response(
            code="500",
            message=f"获取码表数据失败: {str(e)}"
        )
@router.post("/dict", response_model=dict)
async def create_dict_data(
    request: CreateDictDataRequest,
    current_user: dict = Depends(get_current_admin_user)
):
    """
    Create a dictionary entry (admin only).

    Args:
        request: Fields of the new entry.
        current_user: Resolved by the ``get_current_admin_user`` dependency;
            not read here, it only gates access.

    Returns:
        The envelope produced by ``create_api_response``: code "200" with
        ``data={"id": <new row id>}``, code "400" when the
        (dict_type, dict_code) pair already exists, or code "500" carrying
        the exception text when anything raises.
    """
    try:
        with get_db_connection() as conn:
            cursor = conn.cursor()
            try:
                # Reject duplicates of (dict_type, dict_code).
                cursor.execute(
                    "SELECT id FROM dict_data WHERE dict_type = %s AND dict_code = %s",
                    (request.dict_type, request.dict_code)
                )
                if cursor.fetchone():
                    return create_api_response(
                        code="400",
                        message=f"编码 {request.dict_code} 已存在"
                    )

                # NOTE(review): an empty dict is stored as NULL here, whereas
                # the update endpoint stores it as "{}" - confirm which one
                # is intended before aligning them.
                extension_json = (
                    json.dumps(request.extension_attr)
                    if request.extension_attr
                    else None
                )

                cursor.execute(
                    """
                    INSERT INTO dict_data (
                        dict_type, dict_code, parent_code, label_cn, label_en,
                        sort_order, extension_attr, is_default, status
                    ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
                    """,
                    (
                        request.dict_type,
                        request.dict_code,
                        request.parent_code,
                        request.label_cn,
                        request.label_en,
                        request.sort_order,
                        extension_json,
                        request.is_default,
                        request.status,
                    )
                )
                new_id = cursor.lastrowid
                conn.commit()
            finally:
                # Release the cursor on every path, including errors and the
                # early duplicate return.
                cursor.close()

        return create_api_response(
            code="200",
            message="创建成功",
            data={"id": new_id}
        )
    except Exception as e:
        # Endpoint boundary: report the failure in the response envelope.
        return create_api_response(
            code="500",
            message=f"创建码表数据失败: {str(e)}"
        )
@router.put("/dict/{id}", response_model=dict)
async def update_dict_data(
    id: int,
    request: UpdateDictDataRequest,
    current_user: dict = Depends(get_current_admin_user)
):
    """
    Partially update a dictionary entry (admin only).

    Only request fields that are not ``None`` are written, so this endpoint
    cannot reset a column to NULL.

    Args:
        id: Primary key of the row to update (name fixed by the route path).
        request: Fields to change.
        current_user: Resolved by the ``get_current_admin_user`` dependency;
            not read here, it only gates access.

    Returns:
        The envelope produced by ``create_api_response``: code "200" on
        success, code "404" when the row does not exist, code "400" when the
        request carries no field to update, or code "500" carrying the
        exception text when anything raises.
    """
    # Fixed whitelist of column names: these are the only strings ever
    # interpolated into the SQL text, values always go through placeholders.
    updatable_columns = (
        "parent_code",
        "label_cn",
        "label_en",
        "sort_order",
        "extension_attr",
        "is_default",
        "status",
    )
    try:
        with get_db_connection() as conn:
            cursor = conn.cursor(dictionary=True)
            try:
                # Existence check; only the presence of a row matters.
                cursor.execute("SELECT id FROM dict_data WHERE id = %s", (id,))
                if not cursor.fetchone():
                    return create_api_response(
                        code="404",
                        message="码表数据不存在"
                    )

                # Collect "column = %s" fragments and their values in step.
                update_fields = []
                params = []
                for column in updatable_columns:
                    value = getattr(request, column)
                    if value is None:
                        continue
                    if column == "extension_attr":
                        # The column holds JSON text.
                        value = json.dumps(value)
                    update_fields.append(f"{column} = %s")
                    params.append(value)

                if not update_fields:
                    return create_api_response(
                        code="400",
                        message="没有要更新的字段"
                    )

                update_query = f"""
                    UPDATE dict_data
                    SET {', '.join(update_fields)}
                    WHERE id = %s
                """
                params.append(id)
                cursor.execute(update_query, params)
                conn.commit()
            finally:
                # Release the cursor on every path, including errors and the
                # early 404/400 returns.
                cursor.close()

        return create_api_response(
            code="200",
            message="更新成功"
        )
    except Exception as e:
        # Endpoint boundary: report the failure in the response envelope.
        return create_api_response(
            code="500",
            message=f"更新码表数据失败: {str(e)}"
        )
@router.delete("/dict/{id}", response_model=dict)
async def delete_dict_data(
    id: int,
    current_user: dict = Depends(get_current_admin_user)
):
    """
    Delete a dictionary entry (admin only).

    Deletion is refused when the entry still has child entries or when its
    code is referenced from ``client_downloads.platform_code``.

    Args:
        id: Primary key of the row to delete (name fixed by the route path).
        current_user: Resolved by the ``get_current_admin_user`` dependency;
            not read here, it only gates access.

    Returns:
        The envelope produced by ``create_api_response``: code "200" on
        success, code "404" when the row does not exist, code "400" when a
        child or a reference blocks the deletion, or code "500" carrying the
        exception text when anything raises.
    """
    try:
        with get_db_connection() as conn:
            cursor = conn.cursor(dictionary=True)
            try:
                cursor.execute(
                    "SELECT dict_type, dict_code FROM dict_data WHERE id = %s",
                    (id,)
                )
                existing = cursor.fetchone()
                if not existing:
                    return create_api_response(
                        code="404",
                        message="码表数据不存在"
                    )

                # Children live in the same dict_type as their parent (codes
                # are only unique per type), so scope the check to that type;
                # otherwise an unrelated type reusing this code as a
                # parent_code would block the deletion.
                cursor.execute(
                    "SELECT COUNT(*) as count FROM dict_data "
                    "WHERE dict_type = %s AND parent_code = %s",
                    (existing['dict_type'], existing['dict_code'])
                )
                if cursor.fetchone()['count'] > 0:
                    return create_api_response(
                        code="400",
                        message="该节点存在子节点,无法删除"
                    )

                # NOTE(review): this reference check runs for every
                # dict_type; presumably platform_code only refers to
                # 'client_platform' entries - confirm before narrowing it.
                cursor.execute(
                    "SELECT COUNT(*) as count FROM client_downloads WHERE platform_code = %s",
                    (existing['dict_code'],)
                )
                if cursor.fetchone()['count'] > 0:
                    return create_api_response(
                        code="400",
                        message="该平台编码已被客户端下载记录引用,无法删除"
                    )

                cursor.execute("DELETE FROM dict_data WHERE id = %s", (id,))
                conn.commit()
            finally:
                # Release the cursor on every path, including errors and the
                # early 404/400 returns.
                cursor.close()

        return create_api_response(
            code="200",
            message="删除成功"
        )
    except Exception as e:
        # Endpoint boundary: report the failure in the response envelope.
        return create_api_response(
            code="500",
            message=f"删除码表数据失败: {str(e)}"
        )