imetting/backend/app/services/terminal_service.py

262 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import datetime
import logging
from typing import List, Optional, Tuple, Dict, Any

from app.core.database import get_db_connection
from app.models.models import UpdateTerminalRequest
class TerminalService:
    """Query and maintain rows of the ``terminals`` table.

    Each method obtains its own connection from ``get_db_connection()`` and
    always closes the cursor it opens, including when a statement raises.
    """

    # Columns that ``update_terminal`` may write. Each name is both the column
    # name and the attribute read from the request object. Because the names
    # come from this fixed tuple, interpolating them into SQL is safe.
    _UPDATABLE_COLUMNS = (
        "terminal_name",
        "terminal_type",
        "description",
        "status",
        "firmware_version",
        "mac_address",
    )

    @staticmethod
    def _normalize_imei(imei: str) -> str:
        """Return *imei* as a stripped string; falsy input becomes ``""``."""
        return str(imei or "").strip()

    def get_terminals(self,
                      page: int = 1,
                      size: int = 20,
                      keyword: Optional[str] = None,
                      terminal_type: Optional[str] = None,
                      status: Optional[int] = None) -> Tuple[List[Dict[str, Any]], int]:
        """Return one page of terminals plus the total number of matches.

        Only the newest row (highest ``id``) per trimmed IMEI is considered;
        rows whose IMEI is NULL or blank are excluded.

        Args:
            page: 1-based page number; values below 1 are treated as 1.
            size: Page size; negative values are treated as 0.
            keyword: Optional substring matched against IMEI or terminal name.
            terminal_type: Optional exact terminal type filter.
            status: Optional exact status filter (``None`` means no filter).

        Returns:
            ``(rows, total)`` where ``rows`` are dict rows for the requested
            page and ``total`` counts all rows matching the filters.
        """
        # A page below 1 would produce a negative OFFSET (and a negative size
        # a negative LIMIT), both of which MySQL rejects.
        page = max(page, 1)
        size = max(size, 0)

        where_clauses = []
        params = []
        if keyword:
            where_clauses.append("(t.imei LIKE %s OR t.terminal_name LIKE %s)")
            keyword_param = f"%{keyword}%"
            params.extend([keyword_param, keyword_param])
        if terminal_type:
            where_clauses.append("t.terminal_type = %s")
            params.append(terminal_type)
        if status is not None:
            where_clauses.append("t.status = %s")
            params.append(status)
        where_clause = " AND ".join(where_clauses) if where_clauses else "1=1"

        # Shared FROM clause: keep only the latest row per trimmed IMEI.
        deduped_terminals = """
            FROM terminals t
            INNER JOIN (
                SELECT MAX(id) AS id
                FROM terminals
                WHERE imei IS NOT NULL AND TRIM(imei) <> ''
                GROUP BY TRIM(imei)
            ) latest_terminal ON latest_terminal.id = t.id
        """

        with get_db_connection() as conn:
            cursor = conn.cursor(dictionary=True)
            try:
                # Total number of matching rows.
                count_query = f"SELECT COUNT(*) as total {deduped_terminals} WHERE {where_clause}"
                cursor.execute(count_query, params)
                total = cursor.fetchone()['total']

                # Requested page.
                offset = (page - 1) * size
                list_query = f"""
                    SELECT
                        t.*,
                        u.username as creator_username,
                        cu.username as current_username,
                        cu.caption as current_user_caption,
                        dd.label_cn as terminal_type_name
                    {deduped_terminals}
                    LEFT JOIN sys_users u ON t.created_by = u.user_id
                    LEFT JOIN sys_users cu ON t.current_user_id = cu.user_id
                    LEFT JOIN sys_dict_data dd ON t.terminal_type = dd.dict_code AND dd.dict_type = 'terminal_type'
                    WHERE {where_clause}
                    ORDER BY t.created_at DESC
                    LIMIT %s OFFSET %s
                """
                cursor.execute(list_query, params + [size, offset])
                terminals = cursor.fetchall()
                return terminals, total
            finally:
                cursor.close()

    def get_terminal_by_id(self, terminal_id: int) -> Optional[Dict[str, Any]]:
        """Return the terminal row with the given id, or ``None`` if absent.

        The row is enriched with the creator's username and the display name
        of the terminal type.
        """
        query = """
            SELECT
                t.*,
                u.username as creator_username,
                dd.label_cn as terminal_type_name
            FROM terminals t
            LEFT JOIN sys_users u ON t.created_by = u.user_id
            LEFT JOIN sys_dict_data dd ON t.terminal_type = dd.dict_code AND dd.dict_type = 'terminal_type'
            WHERE t.id = %s
        """
        with get_db_connection() as conn:
            cursor = conn.cursor(dictionary=True)
            try:
                cursor.execute(query, (terminal_id,))
                return cursor.fetchone()
            finally:
                cursor.close()

    def get_terminal_by_imei(self, imei: str) -> Optional[Dict[str, Any]]:
        """Return the newest terminal row whose trimmed IMEI equals *imei*.

        Returns ``None`` when no row matches.
        """
        normalized_imei = self._normalize_imei(imei)
        with get_db_connection() as conn:
            cursor = conn.cursor(dictionary=True)
            try:
                cursor.execute(
                    "SELECT * FROM terminals WHERE TRIM(imei) = %s ORDER BY id DESC LIMIT 1",
                    (normalized_imei,)
                )
                return cursor.fetchone()
            finally:
                cursor.close()

    # The annotation is quoted so it is not evaluated when the class is defined.
    def update_terminal(self, terminal_id: int, terminal_data: "UpdateTerminalRequest") -> bool:
        """Update the fields of a terminal that are set on *terminal_data*.

        Attributes that are ``None`` are left untouched.

        Returns:
            ``False`` when *terminal_data* carries nothing to update (no
            connection is opened in that case), otherwise ``True`` once the
            UPDATE has been committed.
        """
        update_fields = []
        params = []
        for column in self._UPDATABLE_COLUMNS:
            value = getattr(terminal_data, column)
            if value is not None:
                update_fields.append(f"{column} = %s")
                params.append(value)

        if not update_fields:
            return False

        query = f"UPDATE terminals SET {', '.join(update_fields)} WHERE id = %s"
        params.append(terminal_id)

        with get_db_connection() as conn:
            cursor = conn.cursor()
            try:
                cursor.execute(query, params)
                conn.commit()
                return True
            finally:
                cursor.close()

    def delete_terminal(self, terminal_id: int) -> bool:
        """Delete the terminal with the given id.

        Returns:
            ``True`` if a row was deleted, ``False`` otherwise.
        """
        with get_db_connection() as conn:
            cursor = conn.cursor()
            try:
                cursor.execute("DELETE FROM terminals WHERE id = %s", (terminal_id,))
                deleted = cursor.rowcount > 0
                conn.commit()
                return deleted
            finally:
                cursor.close()

    def set_terminal_status(self, terminal_id: int, status: int) -> bool:
        """Set the ``status`` column of a terminal (enable/disable).

        Returns:
            ``True`` if the UPDATE reported at least one affected row.
        """
        with get_db_connection() as conn:
            cursor = conn.cursor()
            try:
                cursor.execute("UPDATE terminals SET status = %s WHERE id = %s", (status, terminal_id))
                updated = cursor.rowcount > 0
                conn.commit()
                return updated
            finally:
                cursor.close()

    def check_and_update_terminal(self, imei: str, terminal_type: str, terminal_name: str, ip_address: str, user_id: Optional[int] = None) -> Dict[str, Any]:
        """Check whether a terminal may proceed and refresh its online state.

        Per the original docstring this is called from middleware. A known
        terminal with ``status == 0`` is refused. Any other known terminal has
        its last-online time, IP and current user refreshed; name and type are
        only filled in when currently empty. An unknown IMEI is registered and
        activated automatically.

        Args:
            imei: Device IMEI; blank values are rejected without touching the DB.
            terminal_type: Reported type; defaults to ``"UNKNOWN"`` when blank.
            terminal_name: Reported name; defaults to ``"Unknown Device"`` when blank.
            ip_address: Client IP stored on the row.
            user_id: Optional id of the user currently on the terminal.

        Returns:
            A dict with keys ``allowed`` (bool), ``reason`` (str) and
            ``terminal_id`` (int or ``None``).
        """
        normalized_imei = self._normalize_imei(imei)
        normalized_type = str(terminal_type or "UNKNOWN").strip() or "UNKNOWN"
        normalized_name = str(terminal_name or "Unknown Device").strip() or "Unknown Device"

        if not normalized_imei:
            return {"allowed": False, "reason": "IMEI 不能为空", "terminal_id": None}

        try:
            with get_db_connection() as conn:
                cursor = conn.cursor(dictionary=True)
                try:
                    cursor.execute(
                        "SELECT id, status, is_activated FROM terminals WHERE TRIM(imei) = %s ORDER BY id DESC LIMIT 1",
                        (normalized_imei,)
                    )
                    existing = cursor.fetchone()
                    current_time = datetime.datetime.now()

                    if existing:
                        # Refuse terminals that have been disabled.
                        if existing['status'] == 0:
                            return {"allowed": False, "reason": "设备已被停用", "terminal_id": existing['id']}

                        # Refresh online time, IP and current user; name and
                        # type are only filled in when they are empty.
                        update_query = """
                            UPDATE terminals
                            SET last_online_at = %s,
                                ip_address = %s,
                                current_user_id = %s,
                                terminal_name = IF(terminal_name IS NULL OR terminal_name = '', %s, terminal_name),
                                terminal_type = IF(terminal_type IS NULL OR terminal_type = '', %s, terminal_type)
                            WHERE id = %s
                        """
                        cursor.execute(update_query, (current_time, ip_address, user_id, normalized_name, normalized_type, existing['id']))
                        conn.commit()
                        return {"allowed": True, "reason": "", "terminal_id": existing['id']}

                    # New device: register and activate it automatically. This
                    # relies on a unique index on imei to handle concurrent
                    # first registrations. ``id = LAST_INSERT_ID(id)`` makes
                    # ``cursor.lastrowid`` report the existing row's id when
                    # the insert hits a duplicate key.
                    insert_query = """
                        INSERT INTO terminals (
                            imei, terminal_name, terminal_type, status,
                            is_activated, activated_at, last_online_at, ip_address, created_at, current_user_id
                        ) VALUES (%s, %s, %s, 1, 1, %s, %s, %s, %s, %s)
                        ON DUPLICATE KEY UPDATE
                            id = LAST_INSERT_ID(id),
                            last_online_at = IF(status = 0, last_online_at, VALUES(last_online_at)),
                            ip_address = IF(status = 0, ip_address, VALUES(ip_address)),
                            current_user_id = IF(status = 0, current_user_id, VALUES(current_user_id)),
                            terminal_name = IF(terminal_name IS NULL OR terminal_name = '', VALUES(terminal_name), terminal_name),
                            terminal_type = IF(terminal_type IS NULL OR terminal_type = '', VALUES(terminal_type), terminal_type)
                    """
                    cursor.execute(insert_query, (
                        normalized_imei, normalized_name, normalized_type,
                        current_time, current_time, ip_address, current_time, user_id
                    ))
                    terminal_id = cursor.lastrowid

                    # Re-read the row: a concurrent registration may have left
                    # it disabled.
                    cursor.execute("SELECT id, status, is_activated FROM terminals WHERE id = %s", (terminal_id,))
                    terminal = cursor.fetchone()
                    conn.commit()

                    if terminal and terminal['status'] == 0:
                        return {"allowed": False, "reason": "设备已被停用", "terminal_id": terminal['id']}
                    return {"allowed": True, "reason": "", "terminal_id": terminal_id}
                finally:
                    cursor.close()
        except Exception as e:
            logging.getLogger(__name__).exception("Error in check_and_update_terminal: %s", e)
            # NOTE(review): database errors deliberately fail open for now;
            # revisit according to the security policy.
            return {"allowed": True, "reason": f"DB Error: {str(e)}", "terminal_id": None}
# Module-level TerminalService instance. The class defines no per-instance
# state, so this one object can be reused by every caller.
terminal_service = TerminalService()