import json
from contextlib import closing
from typing import Any, Dict, List, Optional, Tuple

from fastapi import APIRouter, HTTPException, Depends
from pydantic import BaseModel

from app.core.database import get_db_connection
from app.core.auth import get_current_user, get_current_admin_user
from app.core.response import create_api_response

router = APIRouter()


class DictDataItem(BaseModel):
    """A single code-table (dictionary) entry."""

    id: int
    dict_type: str
    dict_code: str
    parent_code: str
    tree_path: Optional[str] = None
    label_cn: str
    label_en: Optional[str] = None
    sort_order: int
    extension_attr: Optional[dict] = None
    is_default: int
    status: int
    create_time: str


class CreateDictDataRequest(BaseModel):
    """Request body for creating a code-table entry."""

    dict_type: str = "client_platform"
    dict_code: str
    parent_code: str = "ROOT"
    label_cn: str
    label_en: Optional[str] = None
    sort_order: int = 0
    extension_attr: Optional[dict] = None
    is_default: int = 0
    status: int = 1


class UpdateDictDataRequest(BaseModel):
    """Request body for updating a code-table entry.

    Every field is optional; only fields that are not ``None`` are written.
    """

    parent_code: Optional[str] = None
    label_cn: Optional[str] = None
    label_en: Optional[str] = None
    sort_order: Optional[int] = None
    extension_attr: Optional[dict] = None
    is_default: Optional[int] = None
    status: Optional[int] = None


def _decode_extension_attr(row: Dict[str, Any]) -> None:
    """Replace a truthy ``extension_attr`` value in *row* with its parsed JSON.

    The row is modified in place. A value that cannot be parsed is replaced
    by an empty dict so that one malformed row does not fail the request.
    """
    raw = row.get('extension_attr')
    if not raw:
        return
    try:
        row['extension_attr'] = json.loads(raw)
    except (TypeError, ValueError):
        # json.JSONDecodeError is a ValueError; TypeError covers values that
        # are not str/bytes.
        row['extension_attr'] = {}


def _build_tree(items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Arrange flat rows into a forest keyed on ``dict_code``/``parent_code``.

    Rows whose ``parent_code`` is ``'ROOT'`` become top-level nodes. Rows
    whose parent is not among *items* are left out of the tree (they remain
    available in the flat list returned alongside it).
    """
    nodes = {item['dict_code']: {**item, 'children': []} for item in items}
    roots = []
    for item in items:
        node = nodes[item['dict_code']]
        parent = item['parent_code']
        if parent == 'ROOT':
            roots.append(node)
        elif parent in nodes:
            nodes[parent]['children'].append(node)
    return roots


def _collect_update_assignments(
    request: UpdateDictDataRequest,
    extension_json: Optional[str],
) -> Tuple[List[str], List[Any]]:
    """Return the ``SET`` fragments and bound values for the supplied fields.

    Column names come from the fixed tuple below, never from user input, so
    joining the fragments into the statement is safe; the values themselves
    are always passed as query parameters.
    """
    candidates = (
        ("parent_code", request.parent_code),
        ("label_cn", request.label_cn),
        ("label_en", request.label_en),
        ("sort_order", request.sort_order),
        ("extension_attr", extension_json),
        ("is_default", request.is_default),
        ("status", request.status),
    )
    supplied = [(column, value) for column, value in candidates if value is not None]
    assignments = [f"{column} = %s" for column, _ in supplied]
    params = [value for _, value in supplied]
    return assignments, params


@router.get("/dict/types", response_model=dict)
async def get_dict_types():
    """
    List all distinct dictionary types that have enabled rows (public endpoint).
    """
    try:
        # closing() guarantees the cursor is released even if the query raises.
        with get_db_connection() as conn, closing(conn.cursor(dictionary=True)) as cursor:
            query = """
                SELECT DISTINCT dict_type
                FROM dict_data
                WHERE status = 1
                ORDER BY dict_type
            """
            cursor.execute(query)
            types = cursor.fetchall()

        return create_api_response(
            code="200",
            message="获取成功",
            data={"types": [t['dict_type'] for t in types]}
        )
    except Exception as e:
        return create_api_response(
            code="500",
            message=f"获取字典类型失败: {e}"
        )


@router.get("/dict/{dict_type}", response_model=dict)
async def get_dict_data_by_type(dict_type: str, parent_code: Optional[str] = None):
    """
    Return all enabled code-table rows of one type (public endpoint).

    Without ``parent_code`` the response carries both the flat list
    (``items``) and the assembled hierarchy (``tree``). With ``parent_code``
    only the direct children of that parent are returned as a flat list and
    ``tree`` is empty.

    Args:
        dict_type: Dictionary type, e.g. 'client_platform'.
        parent_code: Optional parent code used to filter direct children.
    """
    try:
        with get_db_connection() as conn, closing(conn.cursor(dictionary=True)) as cursor:
            query = """
                SELECT id, dict_type, dict_code, parent_code, tree_path,
                       label_cn, label_en, sort_order, extension_attr,
                       is_default, status, create_time
                FROM dict_data
                WHERE dict_type = %s AND status = 1
            """
            params = [dict_type]

            if parent_code:
                query += " AND parent_code = %s"
                params.append(parent_code)

            query += " ORDER BY parent_code, sort_order, dict_code"

            cursor.execute(query, params)
            items = cursor.fetchall()

        # extension_attr is stored as JSON text; decode it for the client.
        for item in items:
            _decode_extension_attr(item)

        # A parent filter yields a flat list only; no tree is assembled.
        tree_data = [] if parent_code else _build_tree(items)

        return create_api_response(
            code="200",
            message="获取成功",
            data={
                "items": items,      # flat rows
                "tree": tree_data    # hierarchical view
            }
        )
    except Exception as e:
        return create_api_response(
            code="500",
            message=f"获取码表数据失败: {e}"
        )


@router.get("/dict/{dict_type}/{dict_code}", response_model=dict)
async def get_dict_data_by_code(dict_type: str, dict_code: str):
    """
    Return the code-table row identified by type and code (public endpoint).

    Args:
        dict_type: Dictionary type.
        dict_code: Dictionary code.
    """
    try:
        with get_db_connection() as conn, closing(conn.cursor(dictionary=True)) as cursor:
            query = """
                SELECT id, dict_type, dict_code, parent_code, tree_path,
                       label_cn, label_en, sort_order, extension_attr,
                       is_default, status, create_time, update_time
                FROM dict_data
                WHERE dict_type = %s AND dict_code = %s
                LIMIT 1
            """
            cursor.execute(query, (dict_type, dict_code))
            item = cursor.fetchone()

        if not item:
            return create_api_response(
                code="404",
                message=f"未找到编码 {dict_code} 的数据"
            )

        _decode_extension_attr(item)

        return create_api_response(
            code="200",
            message="获取成功",
            data=item
        )
    except Exception as e:
        return create_api_response(
            code="500",
            message=f"获取码表数据失败: {e}"
        )


@router.post("/dict", response_model=dict)
async def create_dict_data(
    request: CreateDictDataRequest,
    current_user: dict = Depends(get_current_admin_user)
):
    """
    Create a code-table row (administrators only).

    Responds with code "400" when ``extension_attr`` cannot be serialized or
    when the (dict_type, dict_code) pair already exists.
    """
    try:
        # Serialize once: this both validates the payload and produces the
        # value to store. A missing or empty extension_attr is stored as NULL.
        extension_json = None
        if request.extension_attr:
            try:
                extension_json = json.dumps(request.extension_attr, ensure_ascii=False)
            except (TypeError, ValueError) as e:
                return create_api_response(
                    code="400",
                    message=f"扩展属性格式错误,必须是有效的JSON对象: {e}"
                )

        with get_db_connection() as conn, closing(conn.cursor()) as cursor:
            # Reject duplicates within the same dictionary type.
            cursor.execute(
                "SELECT id FROM dict_data WHERE dict_type = %s AND dict_code = %s",
                (request.dict_type, request.dict_code)
            )
            if cursor.fetchone():
                return create_api_response(
                    code="400",
                    message=f"编码 {request.dict_code} 已存在"
                )

            query = """
                INSERT INTO dict_data (
                    dict_type, dict_code, parent_code, label_cn, label_en,
                    sort_order, extension_attr, is_default, status
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
            """
            cursor.execute(query, (
                request.dict_type,
                request.dict_code,
                request.parent_code,
                request.label_cn,
                request.label_en,
                request.sort_order,
                extension_json,
                request.is_default,
                request.status
            ))
            new_id = cursor.lastrowid
            conn.commit()

        return create_api_response(
            code="200",
            message="创建成功",
            data={"id": new_id}
        )
    except Exception as e:
        return create_api_response(
            code="500",
            message=f"创建码表数据失败: {e}"
        )


@router.put("/dict/{id}", response_model=dict)
async def update_dict_data(
    id: int,
    request: UpdateDictDataRequest,
    current_user: dict = Depends(get_current_admin_user)
):
    """
    Update a code-table row by primary key (administrators only).

    Only fields present (not ``None``) in the request are written. Responds
    with code "404" when the row does not exist and "400" when no field was
    supplied or ``extension_attr`` cannot be serialized.
    """
    try:
        # Serialize up front so an invalid payload is rejected before any
        # database work; None means "leave the column untouched".
        extension_json = None
        if request.extension_attr is not None:
            try:
                extension_json = json.dumps(request.extension_attr, ensure_ascii=False)
            except (TypeError, ValueError) as e:
                return create_api_response(
                    code="400",
                    message=f"扩展属性格式错误,必须是有效的JSON对象: {e}"
                )

        with get_db_connection() as conn, closing(conn.cursor(dictionary=True)) as cursor:
            # Existence check only; no column values are needed.
            cursor.execute("SELECT id FROM dict_data WHERE id = %s", (id,))
            if not cursor.fetchone():
                return create_api_response(
                    code="404",
                    message="码表数据不存在"
                )

            update_fields, params = _collect_update_assignments(request, extension_json)
            if not update_fields:
                return create_api_response(
                    code="400",
                    message="没有要更新的字段"
                )

            update_query = f"""
                UPDATE dict_data
                SET {', '.join(update_fields)}
                WHERE id = %s
            """
            params.append(id)

            cursor.execute(update_query, params)
            conn.commit()

        return create_api_response(
            code="200",
            message="更新成功"
        )
    except Exception as e:
        return create_api_response(
            code="500",
            message=f"更新码表数据失败: {e}"
        )


@router.delete("/dict/{id}", response_model=dict)
async def delete_dict_data(
    id: int,
    current_user: dict = Depends(get_current_admin_user)
):
    """
    Delete a code-table row by primary key (administrators only).

    Deletion is refused with code "400" when the row still has child rows in
    the same dictionary type, or when its code is referenced by
    ``client_downloads.platform_code``.
    """
    try:
        with get_db_connection() as conn, closing(conn.cursor(dictionary=True)) as cursor:
            cursor.execute(
                "SELECT dict_type, dict_code FROM dict_data WHERE id = %s",
                (id,)
            )
            existing = cursor.fetchone()
            if not existing:
                return create_api_response(
                    code="404",
                    message="码表数据不存在"
                )

            # dict_code is only unique within a dict_type (see the duplicate
            # check on creation), so children must be looked up in the same
            # type; otherwise an unrelated type could block the delete.
            cursor.execute(
                "SELECT COUNT(*) as count FROM dict_data "
                "WHERE dict_type = %s AND parent_code = %s",
                (existing['dict_type'], existing['dict_code'])
            )
            if cursor.fetchone()['count'] > 0:
                return create_api_response(
                    code="400",
                    message="该节点存在子节点,无法删除"
                )

            # NOTE(review): this reference check runs for every dict_type;
            # presumably it only matters for platform codes - confirm.
            cursor.execute(
                "SELECT COUNT(*) as count FROM client_downloads WHERE platform_code = %s",
                (existing['dict_code'],)
            )
            if cursor.fetchone()['count'] > 0:
                return create_api_response(
                    code="400",
                    message="该平台编码已被客户端下载记录引用,无法删除"
                )

            cursor.execute("DELETE FROM dict_data WHERE id = %s", (id,))
            conn.commit()

        return create_api_response(
            code="200",
            message="删除成功"
        )
    except Exception as e:
        return create_api_response(
            code="500",
            message=f"删除码表数据失败: {e}"
        )