# imetting/backend/app/api/endpoints/terminals.py
#
# 167 lines
# 5.2 KiB
# Python

from fastapi import APIRouter, Depends, HTTPException, Query
from typing import Optional
import traceback
from app.core.auth import get_current_admin_user
from app.core.response import create_api_response
from app.models.models import CreateTerminalRequest, UpdateTerminalRequest
from app.services.terminal_service import terminal_service
# Router on which the terminal-device endpoints defined below are registered.
router = APIRouter()
@router.get("/terminals", response_model=dict)
async def get_terminals(
    page: int = Query(1, ge=1),
    size: int = Query(20, ge=1, le=10000),
    keyword: Optional[str] = None,
    terminal_type: Optional[str] = None,
    status: Optional[int] = None,
    current_user: dict = Depends(get_current_admin_user)
):
    """
    List terminal devices, one page at a time.

    Args:
        page: Page number, validated as >= 1 (default 1).
        size: Page size, validated as 1..10000 (default 20).
        keyword: Optional filter forwarded to the terminal service.
        terminal_type: Optional filter forwarded to the terminal service.
        status: Optional filter forwarded to the terminal service.
        current_user: Value resolved by the ``get_current_admin_user``
            dependency; not otherwise used in this handler.

    Returns:
        The result of ``create_api_response``: code "200" with ``items``,
        ``total``, ``page`` and ``size`` on success, or code "500" carrying
        the exception text if anything raised.
    """
    try:
        # Forward paging and filter arguments to the service unchanged.
        query_args = {
            "page": page,
            "size": size,
            "keyword": keyword,
            "terminal_type": terminal_type,
            "status": status,
        }
        rows, row_count = terminal_service.get_terminals(**query_args)

        payload = {
            "items": rows,
            "total": row_count,
            "page": page,
            "size": size,
        }
        return create_api_response(code="200", message="获取成功", data=payload)
    except Exception as e:
        # Dump the stack trace before converting the failure to a payload.
        traceback.print_exc()
        failure_text = f"获取终端列表失败: {str(e)}"
        return create_api_response(code="500", message=failure_text)
@router.post("/terminals", response_model=dict)
async def create_terminal(
    request: CreateTerminalRequest,
    current_user: dict = Depends(get_current_admin_user)
):
    """
    Create a new terminal device.

    Args:
        request: Creation payload; its ``imei`` field is used for the
            duplicate check and the whole object is passed to the service.
        current_user: Value resolved by the ``get_current_admin_user``
            dependency; its ``user_id`` entry is passed to the service.

    Returns:
        The result of ``create_api_response``: code "400" if a terminal with
        the same IMEI already exists, code "200" with ``{"id": ...}`` on
        success, or code "500" carrying the exception text if anything raised.
    """
    try:
        # Refuse to create a second terminal with an IMEI that is already known.
        existing = terminal_service.get_terminal_by_imei(request.imei)
        if existing:
            return create_api_response(code="400", message=f"IMEI {request.imei} 已存在")

        terminal_id = terminal_service.create_terminal(request, current_user['user_id'])
        return create_api_response(
            code="200",
            message="创建成功",
            data={"id": terminal_id}
        )
    except Exception as e:
        # Print the stack trace so the failure is diagnosable server-side
        # instead of surviving only as a message string in the response.
        traceback.print_exc()
        return create_api_response(
            code="500",
            message=f"创建终端失败: {str(e)}"
        )
@router.get("/terminals/{terminal_id}", response_model=dict)
async def get_terminal_detail(
    terminal_id: int,
    current_user: dict = Depends(get_current_admin_user)
):
    """
    Fetch the details of a single terminal device.

    Args:
        terminal_id: Path parameter identifying the terminal.
        current_user: Value resolved by the ``get_current_admin_user``
            dependency; not otherwise used in this handler.

    Returns:
        The result of ``create_api_response``: code "404" if the service
        returns a falsy value, code "200" with the terminal as ``data`` on
        success, or code "500" carrying the exception text if anything raised.
    """
    try:
        terminal = terminal_service.get_terminal_by_id(terminal_id)
        if not terminal:
            return create_api_response(code="404", message="终端不存在")

        return create_api_response(
            code="200",
            message="获取成功",
            data=terminal
        )
    except Exception as e:
        # Print the stack trace so the failure is diagnosable server-side
        # instead of surviving only as a message string in the response.
        traceback.print_exc()
        return create_api_response(
            code="500",
            message=f"获取终端详情失败: {str(e)}"
        )
@router.put("/terminals/{terminal_id}", response_model=dict)
async def update_terminal(
    terminal_id: int,
    request: UpdateTerminalRequest,
    current_user: dict = Depends(get_current_admin_user)
):
    """
    Update an existing terminal device.

    Args:
        terminal_id: Path parameter identifying the terminal.
        request: Update payload, passed to the service as-is.
        current_user: Value resolved by the ``get_current_admin_user``
            dependency; not otherwise used in this handler.

    Returns:
        The result of ``create_api_response``: code "404" if the terminal is
        not found, code "200" if the service reports success, code "400" if
        the service returns a falsy result (the message states that there
        were no fields to update), or code "500" carrying the exception text
        if anything raised.
    """
    try:
        # Make sure the terminal exists before attempting the update.
        existing = terminal_service.get_terminal_by_id(terminal_id)
        if not existing:
            return create_api_response(code="404", message="终端不存在")

        success = terminal_service.update_terminal(terminal_id, request)
        if not success:
            return create_api_response(code="400", message="没有需要更新的字段")
        return create_api_response(code="200", message="更新成功")
    except Exception as e:
        # Print the stack trace so the failure is diagnosable server-side
        # instead of surviving only as a message string in the response.
        traceback.print_exc()
        return create_api_response(
            code="500",
            message=f"更新终端失败: {str(e)}"
        )
@router.delete("/terminals/{terminal_id}", response_model=dict)
async def delete_terminal(
    terminal_id: int,
    current_user: dict = Depends(get_current_admin_user)
):
    """
    Delete a terminal device.

    Args:
        terminal_id: Path parameter identifying the terminal.
        current_user: Value resolved by the ``get_current_admin_user``
            dependency; not otherwise used in this handler.

    Returns:
        The result of ``create_api_response``: code "200" if the service
        reports success, code "404" if it returns a falsy result, or code
        "500" carrying the exception text if anything raised.
    """
    try:
        success = terminal_service.delete_terminal(terminal_id)
        if not success:
            return create_api_response(code="404", message="终端不存在")
        return create_api_response(code="200", message="删除成功")
    except Exception as e:
        # Print the stack trace so the failure is diagnosable server-side
        # instead of surviving only as a message string in the response.
        traceback.print_exc()
        return create_api_response(
            code="500",
            message=f"删除终端失败: {str(e)}"
        )
@router.post("/terminals/{terminal_id}/status", response_model=dict)
async def toggle_terminal_status(
    terminal_id: int,
    status: int = Query(..., description="1:启用, 0:停用"),
    current_user: dict = Depends(get_current_admin_user)
):
    """
    Set a terminal's enabled/disabled status.

    Args:
        terminal_id: Path parameter identifying the terminal.
        status: Required query parameter; only 0 and 1 are accepted
            (described in the parameter metadata as 1 = enable, 0 = disable).
        current_user: Value resolved by the ``get_current_admin_user``
            dependency; not otherwise used in this handler.

    Returns:
        The result of ``create_api_response``: code "400" if ``status`` is
        neither 0 nor 1, code "200" if the service reports success, code
        "404" if it returns a falsy result, or code "500" carrying the
        exception text if anything raised.
    """
    try:
        # Validate here rather than via Query constraints so that callers
        # keep receiving the API's own "400" payload for bad values.
        if status not in (0, 1):
            return create_api_response(code="400", message="状态值无效")

        success = terminal_service.set_terminal_status(terminal_id, status)
        if not success:
            return create_api_response(code="404", message="终端不存在")
        return create_api_response(code="200", message="状态更新成功")
    except Exception as e:
        # Print the stack trace so the failure is diagnosable server-side
        # instead of surviving only as a message string in the response.
        traceback.print_exc()
        return create_api_response(
            code="500",
            message=f"状态更新失败: {str(e)}"
        )