imetting/backend/app/services/terminal_service.py

252 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from typing import List, Optional, Tuple, Dict, Any
from app.core.database import get_db_connection
from app.models.models import Terminal, CreateTerminalRequest, UpdateTerminalRequest
import datetime
class TerminalService:
def get_terminals(self,
page: int = 1,
size: int = 20,
keyword: Optional[str] = None,
terminal_type: Optional[str] = None,
status: Optional[int] = None) -> Tuple[List[Dict[str, Any]], int]:
"""
获取终端列表,支持分页和筛选
"""
with get_db_connection() as conn:
cursor = conn.cursor(dictionary=True)
where_clauses = []
params = []
if keyword:
where_clauses.append("(t.imei LIKE %s OR t.terminal_name LIKE %s)")
keyword_param = f"%{keyword}%"
params.extend([keyword_param, keyword_param])
if terminal_type:
where_clauses.append("t.terminal_type = %s")
params.append(terminal_type)
if status is not None:
where_clauses.append("t.status = %s")
params.append(status)
where_clause = " AND ".join(where_clauses) if where_clauses else "1=1"
# 计算总数
count_query = f"SELECT COUNT(*) as total FROM terminals t WHERE {where_clause}"
cursor.execute(count_query, params)
total = cursor.fetchone()['total']
# 查询列表
offset = (page - 1) * size
list_query = f"""
SELECT
t.*,
u.username as creator_username,
cu.username as current_username,
cu.caption as current_user_caption,
dd.label_cn as terminal_type_name
FROM terminals t
LEFT JOIN users u ON t.created_by = u.user_id
LEFT JOIN users cu ON t.current_user_id = cu.user_id
LEFT JOIN dict_data dd ON t.terminal_type = dd.dict_code AND dd.dict_type = 'terminal_type'
WHERE {where_clause}
ORDER BY t.created_at DESC
LIMIT %s OFFSET %s
"""
cursor.execute(list_query, params + [size, offset])
terminals = cursor.fetchall()
cursor.close()
return terminals, total
def get_terminal_by_id(self, terminal_id: int) -> Optional[Dict[str, Any]]:
"""
根据ID获取终端详情
"""
with get_db_connection() as conn:
cursor = conn.cursor(dictionary=True)
query = """
SELECT
t.*,
u.username as creator_username,
dd.label_cn as terminal_type_name
FROM terminals t
LEFT JOIN users u ON t.created_by = u.user_id
LEFT JOIN dict_data dd ON t.terminal_type = dd.dict_code AND dd.dict_type = 'terminal_type'
WHERE t.id = %s
"""
cursor.execute(query, (terminal_id,))
terminal = cursor.fetchone()
cursor.close()
return terminal
def get_terminal_by_imei(self, imei: str) -> Optional[Dict[str, Any]]:
"""
根据IMEI获取终端详情
"""
with get_db_connection() as conn:
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM terminals WHERE imei = %s", (imei,))
terminal = cursor.fetchone()
cursor.close()
return terminal
def create_terminal(self, terminal_data: CreateTerminalRequest, user_id: int) -> int:
"""
创建新终端
"""
with get_db_connection() as conn:
cursor = conn.cursor()
query = """
INSERT INTO terminals (
imei, terminal_name, terminal_type, description, status, created_by
) VALUES (%s, %s, %s, %s, %s, %s)
"""
cursor.execute(query, (
terminal_data.imei,
terminal_data.terminal_name,
terminal_data.terminal_type,
terminal_data.description,
terminal_data.status,
user_id
))
new_id = cursor.lastrowid
conn.commit()
cursor.close()
return new_id
def update_terminal(self, terminal_id: int, terminal_data: UpdateTerminalRequest) -> bool:
"""
更新终端信息
"""
with get_db_connection() as conn:
cursor = conn.cursor()
update_fields = []
params = []
if terminal_data.terminal_name is not None:
update_fields.append("terminal_name = %s")
params.append(terminal_data.terminal_name)
if terminal_data.terminal_type is not None:
update_fields.append("terminal_type = %s")
params.append(terminal_data.terminal_type)
if terminal_data.description is not None:
update_fields.append("description = %s")
params.append(terminal_data.description)
if terminal_data.status is not None:
update_fields.append("status = %s")
params.append(terminal_data.status)
if terminal_data.firmware_version is not None:
update_fields.append("firmware_version = %s")
params.append(terminal_data.firmware_version)
if terminal_data.mac_address is not None:
update_fields.append("mac_address = %s")
params.append(terminal_data.mac_address)
if not update_fields:
return False
query = f"UPDATE terminals SET {', '.join(update_fields)} WHERE id = %s"
params.append(terminal_id)
cursor.execute(query, params)
conn.commit()
cursor.close()
return True
def delete_terminal(self, terminal_id: int) -> bool:
"""
删除终端
"""
with get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute("DELETE FROM terminals WHERE id = %s", (terminal_id,))
deleted = cursor.rowcount > 0
conn.commit()
cursor.close()
return deleted
def set_terminal_status(self, terminal_id: int, status: int) -> bool:
"""
设置终端启用/停用状态
"""
with get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute("UPDATE terminals SET status = %s WHERE id = %s", (status, terminal_id))
updated = cursor.rowcount > 0
conn.commit()
cursor.close()
return updated
def check_and_update_terminal(self, imei: str, terminal_type: str, terminal_name: str, ip_address: str, user_id: Optional[int] = None) -> dict:
"""
检查并更新终端状态(中间件调用)
"""
try:
with get_db_connection() as conn:
cursor = conn.cursor(dictionary=True)
# 检查是否存在
cursor.execute("SELECT id, status, is_activated FROM terminals WHERE imei = %s", (imei,))
existing = cursor.fetchone()
current_time = datetime.datetime.now()
if existing:
# 检查是否被停用
if existing['status'] == 0:
return {"allowed": False, "reason": "设备已被停用", "terminal_id": existing['id']}
# 更新在线时间、IP、名称如果变了、当前用户
update_query = """
UPDATE terminals
SET last_online_at = %s,
ip_address = %s,
current_user_id = %s,
# 如果设备没名字,尝试用上报的名字填充
terminal_name = IF(terminal_name IS NULL OR terminal_name = '', %s, terminal_name),
# 如果设备没类型,尝试用上报的类型填充
terminal_type = IF(terminal_type IS NULL OR terminal_type = '', %s, terminal_type)
WHERE id = %s
"""
cursor.execute(update_query, (current_time, ip_address, user_id, terminal_name, terminal_type, existing['id']))
conn.commit()
return {"allowed": True, "reason": "", "terminal_id": existing['id']}
else:
# 新设备自动注册并激活
insert_query = """
INSERT INTO terminals (
imei, terminal_name, terminal_type, status,
is_activated, activated_at, last_online_at, ip_address, created_at, current_user_id
) VALUES (%s, %s, %s, 1, 1, %s, %s, %s, %s, %s)
"""
cursor.execute(insert_query, (
imei, terminal_name, terminal_type,
current_time, current_time, ip_address, current_time, user_id
))
new_id = cursor.lastrowid
conn.commit()
return {"allowed": True, "reason": "", "terminal_id": new_id}
except Exception as e:
print(f"Error in check_and_update_terminal: {e}")
# 数据库错误暂时放行,或者根据安全策略决定
return {"allowed": True, "reason": f"DB Error: {str(e)}", "terminal_id": None}
terminal_service = TerminalService()