imetting_backend/app/api/endpoints/dict_data.py

436 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from fastapi import APIRouter, HTTPException, Depends
from app.core.database import get_db_connection
from app.core.auth import get_current_user, get_current_admin_user
from app.core.response import create_api_response
from pydantic import BaseModel
from typing import Optional, List
import json
# Router on which the dictionary (code-table) endpoints in this module are registered.
router = APIRouter()
class DictDataItem(BaseModel):
    """One entry of the code table (``dict_data``)."""

    # Row identifier used by the update/delete endpoints.
    id: int

    # Dictionary type the entry belongs to, e.g. 'client_platform'.
    dict_type: str
    # Code of the entry; the create endpoint rejects duplicates per dict_type.
    dict_code: str
    # Code of the parent entry; 'ROOT' marks a top-level entry.
    parent_code: str
    # Stored tree path; not written by this module (presumably maintained
    # elsewhere -- TODO confirm).
    tree_path: Optional[str] = None

    # Display labels (presumably Chinese / English -- inferred from the names).
    label_cn: str
    label_en: Optional[str] = None

    # Ordering key among entries sharing the same parent.
    sort_order: int
    # Free-form extra attributes, persisted as a JSON document.
    extension_attr: Optional[dict] = None
    # Integer flags; the public listing endpoints only return status = 1.
    is_default: int
    status: int

    create_time: str
class CreateDictDataRequest(BaseModel):
    """Request body for creating a code-table entry."""

    # Dictionary type; defaults to the client platform dictionary.
    dict_type: str = "client_platform"
    # Code of the new entry (required).
    dict_code: str
    # Parent entry code; 'ROOT' places the entry at the top level.
    parent_code: str = "ROOT"

    # Display labels; only label_cn is required.
    label_cn: str
    label_en: Optional[str] = None

    # Ordering key among siblings.
    sort_order: int = 0
    # Optional extra attributes, stored as JSON.
    extension_attr: Optional[dict] = None
    # Integer flags with their defaults (not default / status 1).
    is_default: int = 0
    status: int = 1
class UpdateDictDataRequest(BaseModel):
    """Request body for a partial update of a code-table entry.

    Every field is optional; the update endpoint only writes the fields
    whose value is not ``None``.
    """

    # New parent entry code.
    parent_code: Optional[str] = None

    # New display labels.
    label_cn: Optional[str] = None
    label_en: Optional[str] = None

    # New ordering key among siblings.
    sort_order: Optional[int] = None
    # Replacement extra attributes, stored as JSON.
    extension_attr: Optional[dict] = None
    # New integer flag values.
    is_default: Optional[int] = None
    status: Optional[int] = None
@router.get("/dict/types", response_model=dict)
async def get_dict_types():
    """
    List the distinct dictionary types (public endpoint).

    Only rows with ``status = 1`` are considered and the result is ordered
    by ``dict_type``.

    Returns:
        An API response with code "200" and ``data = {"types": [...]}``.
        Any exception is converted into an API response with code "500"
        carrying the error text; nothing is raised to the caller.
    """
    query = """
        SELECT DISTINCT dict_type
        FROM dict_data
        WHERE status = 1
        ORDER BY dict_type
    """
    try:
        with get_db_connection() as conn:
            cursor = conn.cursor(dictionary=True)
            try:
                cursor.execute(query)
                rows = cursor.fetchall()
            finally:
                # Release the cursor even when the query itself fails.
                cursor.close()

            return create_api_response(
                code="200",
                message="获取成功",
                data={"types": [row['dict_type'] for row in rows]}
            )
    except Exception as e:
        return create_api_response(
            code="500",
            message=f"获取字典类型失败: {str(e)}"
        )
@router.get("/dict/{dict_type}", response_model=dict)
async def get_dict_data_by_type(dict_type: str):
    """
    Return every enabled entry of one dictionary type (public endpoint).

    The entries are returned both as a flat list and as a tree.

    Args:
        dict_type: Dictionary type, e.g. 'client_platform'.

    Returns:
        An API response with code "200" and
        ``data = {"items": [...], "tree": [...]}``. ``items`` is the flat
        row list; ``tree`` holds copies of the rows with a ``children``
        list, rooted at the entries whose ``parent_code`` is 'ROOT'.
        Any exception is converted into an API response with code "500".
    """
    query = """
        SELECT id, dict_type, dict_code, parent_code, tree_path,
               label_cn, label_en, sort_order, extension_attr,
               is_default, status, create_time
        FROM dict_data
        WHERE dict_type = %s AND status = 1
        ORDER BY parent_code, sort_order, dict_code
    """
    try:
        with get_db_connection() as conn:
            cursor = conn.cursor(dictionary=True)
            try:
                cursor.execute(query, (dict_type,))
                items = cursor.fetchall()
            finally:
                # Release the cursor even when the query itself fails.
                cursor.close()

            # Decode the JSON column; an undecodable value degrades to {}.
            for item in items:
                raw_attr = item.get('extension_attr')
                if raw_attr:
                    try:
                        item['extension_attr'] = json.loads(raw_attr)
                    except (TypeError, ValueError):
                        # JSONDecodeError is a ValueError; anything else
                        # (e.g. KeyboardInterrupt) must not be swallowed.
                        item['extension_attr'] = {}

            # First pass: one tree node (a copy of the row) per dict_code.
            nodes_map = {
                item['dict_code']: {**item, 'children': []}
                for item in items
            }

            # Second pass: attach every node to its parent. Nodes whose
            # parent is neither 'ROOT' nor part of the result set are left
            # out of the tree (they still appear in the flat list).
            tree_data = []
            for item in items:
                node = nodes_map[item['dict_code']]
                parent_code = item['parent_code']
                if parent_code == 'ROOT':
                    tree_data.append(node)
                elif parent_code in nodes_map:
                    nodes_map[parent_code]['children'].append(node)

            return create_api_response(
                code="200",
                message="获取成功",
                data={
                    "items": items,    # flat data
                    "tree": tree_data  # tree data
                }
            )
    except Exception as e:
        return create_api_response(
            code="500",
            message=f"获取码表数据失败: {str(e)}"
        )
@router.get("/dict/{dict_type}/{dict_code}", response_model=dict)
async def get_dict_data_by_code(dict_type: str, dict_code: str):
    """
    Return the entry identified by dictionary type and code (public endpoint).

    Unlike the listing endpoint, the lookup does not filter on ``status``.

    Args:
        dict_type: Dictionary type.
        dict_code: Dictionary code.

    Returns:
        An API response with code "200" and the row as ``data``, code "404"
        when no row matches, or code "500" when an exception occurs.
    """
    query = """
        SELECT id, dict_type, dict_code, parent_code, tree_path,
               label_cn, label_en, sort_order, extension_attr,
               is_default, status, create_time, update_time
        FROM dict_data
        WHERE dict_type = %s AND dict_code = %s
        LIMIT 1
    """
    try:
        with get_db_connection() as conn:
            cursor = conn.cursor(dictionary=True)
            try:
                cursor.execute(query, (dict_type, dict_code))
                item = cursor.fetchone()
            finally:
                # Release the cursor even when the query itself fails.
                cursor.close()

            if not item:
                return create_api_response(
                    code="404",
                    message=f"未找到编码 {dict_code} 的数据"
                )

            # Decode the JSON column; an undecodable value degrades to {}.
            raw_attr = item.get('extension_attr')
            if raw_attr:
                try:
                    item['extension_attr'] = json.loads(raw_attr)
                except (TypeError, ValueError):
                    # JSONDecodeError is a ValueError; anything else
                    # (e.g. KeyboardInterrupt) must not be swallowed.
                    item['extension_attr'] = {}

            return create_api_response(
                code="200",
                message="获取成功",
                data=item
            )
    except Exception as e:
        return create_api_response(
            code="500",
            message=f"获取码表数据失败: {str(e)}"
        )
@router.post("/dict", response_model=dict)
async def create_dict_data(
    request: CreateDictDataRequest,
    current_user: dict = Depends(get_current_admin_user)
):
    """
    Create a code-table entry (administrators only).

    Args:
        request: Fields of the new entry.
        current_user: Resolved by the ``get_current_admin_user`` dependency;
            not used in the body.

    Returns:
        An API response with code "200" and ``data = {"id": <new row id>}``;
        code "400" when ``extension_attr`` cannot be serialized to JSON or
        when an entry with the same ``dict_type``/``dict_code`` already
        exists; code "500" when an exception occurs.
    """
    try:
        # Serialize the extension attributes once: this both validates them
        # and yields the value to store. An empty/missing value is stored
        # as NULL.
        extension_json = None
        if request.extension_attr:
            try:
                extension_json = json.dumps(request.extension_attr, ensure_ascii=False)
            except (TypeError, ValueError) as e:
                return create_api_response(
                    code="400",
                    message=f"扩展属性格式错误必须是有效的JSON对象: {str(e)}"
                )

        insert_query = """
            INSERT INTO dict_data (
                dict_type, dict_code, parent_code, label_cn, label_en,
                sort_order, extension_attr, is_default, status
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
        """

        with get_db_connection() as conn:
            cursor = conn.cursor()
            try:
                # Reject duplicates of (dict_type, dict_code).
                cursor.execute(
                    "SELECT id FROM dict_data WHERE dict_type = %s AND dict_code = %s",
                    (request.dict_type, request.dict_code)
                )
                if cursor.fetchone():
                    return create_api_response(
                        code="400",
                        message=f"编码 {request.dict_code} 已存在"
                    )

                cursor.execute(insert_query, (
                    request.dict_type,
                    request.dict_code,
                    request.parent_code,
                    request.label_cn,
                    request.label_en,
                    request.sort_order,
                    extension_json,
                    request.is_default,
                    request.status
                ))
                new_id = cursor.lastrowid
                conn.commit()
            finally:
                # Release the cursor on every path, including failures.
                cursor.close()

            return create_api_response(
                code="200",
                message="创建成功",
                data={"id": new_id}
            )
    except Exception as e:
        return create_api_response(
            code="500",
            message=f"创建码表数据失败: {str(e)}"
        )
@router.put("/dict/{id}", response_model=dict)
async def update_dict_data(
    id: int,
    request: UpdateDictDataRequest,
    current_user: dict = Depends(get_current_admin_user)
):
    """
    Partially update a code-table entry (administrators only).

    Only the request fields that are not ``None`` are written.

    Args:
        id: Row id of the entry to update.
        request: New values; ``None`` leaves the column untouched.
        current_user: Resolved by the ``get_current_admin_user`` dependency;
            not used in the body.

    Returns:
        An API response with code "200" on success; code "400" when
        ``extension_attr`` cannot be serialized to JSON or when no field
        was supplied; code "404" when the row does not exist; code "500"
        when an exception occurs.
    """
    try:
        # Serialize the extension attributes once: this both validates them
        # and yields the value to store.
        extension_json = None
        if request.extension_attr is not None:
            try:
                extension_json = json.dumps(request.extension_attr, ensure_ascii=False)
            except (TypeError, ValueError) as e:
                return create_api_response(
                    code="400",
                    message=f"扩展属性格式错误必须是有效的JSON对象: {str(e)}"
                )

        # Columns a caller may change, in the order they appear in SET.
        # These are fixed identifiers, so interpolating them into the SQL
        # text is safe; the values always go through placeholders.
        updatable_columns = (
            "parent_code",
            "label_cn",
            "label_en",
            "sort_order",
            "extension_attr",
            "is_default",
            "status",
        )

        with get_db_connection() as conn:
            cursor = conn.cursor(dictionary=True)
            try:
                # Existence probe; only the id is needed.
                cursor.execute("SELECT id FROM dict_data WHERE id = %s", (id,))
                if not cursor.fetchone():
                    return create_api_response(
                        code="404",
                        message="码表数据不存在"
                    )

                # Build the SET clause from the supplied fields.
                update_fields = []
                params = []
                for column in updatable_columns:
                    value = getattr(request, column)
                    if value is None:
                        continue
                    if column == "extension_attr":
                        value = extension_json
                    update_fields.append(f"{column} = %s")
                    params.append(value)

                if not update_fields:
                    return create_api_response(
                        code="400",
                        message="没有要更新的字段"
                    )

                update_query = f"""
                    UPDATE dict_data
                    SET {', '.join(update_fields)}
                    WHERE id = %s
                """
                params.append(id)
                cursor.execute(update_query, params)
                conn.commit()
            finally:
                # Release the cursor on every path, including failures.
                cursor.close()

            return create_api_response(
                code="200",
                message="更新成功"
            )
    except Exception as e:
        return create_api_response(
            code="500",
            message=f"更新码表数据失败: {str(e)}"
        )
@router.delete("/dict/{id}", response_model=dict)
async def delete_dict_data(
    id: int,
    current_user: dict = Depends(get_current_admin_user)
):
    """
    Delete a code-table entry (administrators only).

    Deletion is refused when the entry still has child entries or when its
    code is referenced by ``client_downloads.platform_code``.

    Args:
        id: Row id of the entry to delete.
        current_user: Resolved by the ``get_current_admin_user`` dependency;
            not used in the body.

    Returns:
        An API response with code "200" on success; code "404" when the row
        does not exist; code "400" when the entry has children or is
        referenced; code "500" when an exception occurs.
    """
    try:
        with get_db_connection() as conn:
            cursor = conn.cursor(dictionary=True)
            try:
                cursor.execute(
                    "SELECT dict_type, dict_code FROM dict_data WHERE id = %s",
                    (id,)
                )
                existing = cursor.fetchone()
                if not existing:
                    return create_api_response(
                        code="404",
                        message="码表数据不存在"
                    )

                # Child check, scoped to the entry's own dictionary type:
                # codes are only unique per dict_type, so an entry of another
                # type that happens to use this code as parent_code must not
                # block the deletion.
                cursor.execute(
                    "SELECT COUNT(*) as count FROM dict_data "
                    "WHERE dict_type = %s AND parent_code = %s",
                    (existing['dict_type'], existing['dict_code'])
                )
                if cursor.fetchone()['count'] > 0:
                    return create_api_response(
                        code="400",
                        message="该节点存在子节点,无法删除"
                    )

                # Reference check against client_downloads.
                # NOTE(review): this runs for every dict_type; it presumably
                # only matters for 'client_platform' entries -- confirm
                # before narrowing it.
                cursor.execute(
                    "SELECT COUNT(*) as count FROM client_downloads WHERE platform_code = %s",
                    (existing['dict_code'],)
                )
                if cursor.fetchone()['count'] > 0:
                    return create_api_response(
                        code="400",
                        message="该平台编码已被客户端下载记录引用,无法删除"
                    )

                cursor.execute("DELETE FROM dict_data WHERE id = %s", (id,))
                conn.commit()
            finally:
                # Release the cursor on every path, including failures.
                cursor.close()

            return create_api_response(
                code="200",
                message="删除成功"
            )
    except Exception as e:
        return create_api_response(
            code="500",
            message=f"删除码表数据失败: {str(e)}"
        )