"""Service layer for terminal devices stored in the ``terminals`` table."""

import datetime
import logging
from contextlib import closing
from typing import List, Optional, Tuple, Dict, Any

from app.core.database import get_db_connection
from app.models.models import Terminal, CreateTerminalRequest, UpdateTerminalRequest


class TerminalService:
    """Data-access service for rows of the ``terminals`` table.

    Every method obtains a connection from ``get_db_connection()``, runs
    parameterized SQL on a cursor, and closes that cursor even when a
    statement raises.
    """

    # Columns that ``update_terminal`` may modify. The names double as the
    # attribute names read from the update request, and they are the only
    # identifiers ever interpolated into the UPDATE statement.
    _UPDATABLE_FIELDS = (
        "terminal_name",
        "terminal_type",
        "description",
        "status",
        "firmware_version",
        "mac_address",
    )

    @staticmethod
    def _build_filters(keyword: Optional[str],
                       terminal_type: Optional[str],
                       status: Optional[int]) -> Tuple[str, List[Any]]:
        """Return the WHERE clause and bound parameters for the list filters.

        The clause is assembled only from fixed SQL fragments; user-supplied
        values travel separately in the returned parameter list. With no
        active filter the clause is ``"1=1"`` so it can always follow WHERE.
        """
        clauses: List[str] = []
        params: List[Any] = []

        if keyword:
            clauses.append("(t.imei LIKE %s OR t.terminal_name LIKE %s)")
            pattern = f"%{keyword}%"
            params.extend([pattern, pattern])

        if terminal_type:
            clauses.append("t.terminal_type = %s")
            params.append(terminal_type)

        # 0 is a meaningful status value, so test against None explicitly.
        if status is not None:
            clauses.append("t.status = %s")
            params.append(status)

        return (" AND ".join(clauses) if clauses else "1=1"), params

    def get_terminals(self,
                      page: int = 1,
                      size: int = 20,
                      keyword: Optional[str] = None,
                      terminal_type: Optional[str] = None,
                      status: Optional[int] = None) -> Tuple[List[Dict[str, Any]], int]:
        """Return one page of terminals plus the total number of matches.

        Args:
            page: 1-based page number; values below 1 are treated as 1.
            size: Page size; negative values are treated as 0.
            keyword: Substring matched against ``imei`` and ``terminal_name``.
            terminal_type: Exact ``terminal_type`` to filter on.
            status: Exact ``status`` to filter on (``None`` disables it).

        Returns:
            ``(rows, total)`` where ``rows`` are dict rows ordered by
            ``created_at`` descending and ``total`` counts all matches
            regardless of paging.
        """
        where_clause, params = self._build_filters(keyword, terminal_type, status)

        # A negative LIMIT/OFFSET is rejected by the database, so clamp
        # out-of-range paging arguments instead of sending them through.
        size = max(size, 0)
        offset = max(page - 1, 0) * size

        count_query = f"SELECT COUNT(*) as total FROM terminals t WHERE {where_clause}"
        list_query = f"""
            SELECT
                t.*,
                u.username as creator_username,
                cu.username as current_username,
                cu.caption as current_user_caption,
                dd.label_cn as terminal_type_name
            FROM terminals t
            LEFT JOIN users u ON t.created_by = u.user_id
            LEFT JOIN users cu ON t.current_user_id = cu.user_id
            LEFT JOIN dict_data dd ON t.terminal_type = dd.dict_code AND dd.dict_type = 'terminal_type'
            WHERE {where_clause}
            ORDER BY t.created_at DESC
            LIMIT %s OFFSET %s
        """

        with get_db_connection() as conn, closing(conn.cursor(dictionary=True)) as cursor:
            cursor.execute(count_query, params)
            total = cursor.fetchone()['total']

            cursor.execute(list_query, params + [size, offset])
            terminals = cursor.fetchall()

        return terminals, total

    def get_terminal_by_id(self, terminal_id: int) -> Optional[Dict[str, Any]]:
        """Return the terminal with the given id, or ``None`` if there is none.

        The row is enriched with the creator's username and the display name
        of the terminal type.
        """
        query = """
            SELECT
                t.*,
                u.username as creator_username,
                dd.label_cn as terminal_type_name
            FROM terminals t
            LEFT JOIN users u ON t.created_by = u.user_id
            LEFT JOIN dict_data dd ON t.terminal_type = dd.dict_code AND dd.dict_type = 'terminal_type'
            WHERE t.id = %s
        """
        with get_db_connection() as conn, closing(conn.cursor(dictionary=True)) as cursor:
            cursor.execute(query, (terminal_id,))
            return cursor.fetchone()

    def get_terminal_by_imei(self, imei: str) -> Optional[Dict[str, Any]]:
        """Return the raw ``terminals`` row for an IMEI, or ``None``."""
        with get_db_connection() as conn, closing(conn.cursor(dictionary=True)) as cursor:
            cursor.execute("SELECT * FROM terminals WHERE imei = %s", (imei,))
            return cursor.fetchone()

    def create_terminal(self, terminal_data: "CreateTerminalRequest", user_id: int) -> int:
        """Insert a new terminal and return its generated id.

        Args:
            terminal_data: Request carrying imei, terminal_name,
                terminal_type, description and status.
            user_id: Stored in ``created_by``.
        """
        query = """
            INSERT INTO terminals (
                imei, terminal_name, terminal_type, description, status, created_by
            ) VALUES (%s, %s, %s, %s, %s, %s)
        """
        values = (
            terminal_data.imei,
            terminal_data.terminal_name,
            terminal_data.terminal_type,
            terminal_data.description,
            terminal_data.status,
            user_id,
        )
        with get_db_connection() as conn, closing(conn.cursor()) as cursor:
            cursor.execute(query, values)
            new_id = cursor.lastrowid
            conn.commit()
        return new_id

    def update_terminal(self, terminal_id: int, terminal_data: "UpdateTerminalRequest") -> bool:
        """Update the non-``None`` fields of ``terminal_data`` on one terminal.

        Returns:
            ``False`` when the request carries nothing to update (no database
            connection is opened in that case); ``True`` once the UPDATE has
            been executed and committed. ``True`` does not indicate whether a
            row with ``terminal_id`` actually existed.
        """
        changes = [
            (column, getattr(terminal_data, column))
            for column in self._UPDATABLE_FIELDS
        ]
        changes = [(column, value) for column, value in changes if value is not None]
        if not changes:
            return False

        set_clause = ", ".join(f"{column} = %s" for column, _ in changes)
        query = f"UPDATE terminals SET {set_clause} WHERE id = %s"
        params = [value for _, value in changes] + [terminal_id]

        with get_db_connection() as conn, closing(conn.cursor()) as cursor:
            cursor.execute(query, params)
            conn.commit()
        return True

    def delete_terminal(self, terminal_id: int) -> bool:
        """Delete a terminal; return ``True`` if a row was affected."""
        with get_db_connection() as conn, closing(conn.cursor()) as cursor:
            cursor.execute("DELETE FROM terminals WHERE id = %s", (terminal_id,))
            deleted = cursor.rowcount > 0
            conn.commit()
        return deleted

    def set_terminal_status(self, terminal_id: int, status: int) -> bool:
        """Set a terminal's enabled/disabled status.

        Returns:
            ``True`` if the UPDATE reported at least one affected row.
        """
        with get_db_connection() as conn, closing(conn.cursor()) as cursor:
            cursor.execute(
                "UPDATE terminals SET status = %s WHERE id = %s",
                (status, terminal_id),
            )
            updated = cursor.rowcount > 0
            conn.commit()
        return updated

    def check_and_update_terminal(self, imei: str, terminal_type: str, terminal_name: str,
                                  ip_address: str, user_id: Optional[int] = None) -> dict:
        """Check whether a terminal may proceed and refresh its record.

        Called by middleware. A known terminal whose ``status`` is 0 is
        rejected. Any other known terminal has its last-online time, IP
        address and current user refreshed, and its name/type filled in only
        if they were empty. An unknown IMEI is registered and activated
        automatically.

        Returns:
            A dict with keys ``allowed`` (bool), ``reason`` (str) and
            ``terminal_id`` (int, or ``None`` when a database error occurred).
            Database errors are logged and the request is allowed.
        """
        try:
            with get_db_connection() as conn, closing(conn.cursor(dictionary=True)) as cursor:
                # LIMIT 1 guarantees the single fetchone() drains the result
                # set, so closing the cursor cannot fail on unread rows.
                cursor.execute(
                    "SELECT id, status, is_activated FROM terminals WHERE imei = %s LIMIT 1",
                    (imei,),
                )
                existing = cursor.fetchone()

                current_time = datetime.datetime.now()

                if existing:
                    # A disabled terminal is rejected without touching its row.
                    if existing['status'] == 0:
                        return {"allowed": False, "reason": "设备已被停用", "terminal_id": existing['id']}

                    # Refresh online time, IP and current user; name and type
                    # are only filled in when the stored value is empty.
                    update_query = """
                        UPDATE terminals
                        SET last_online_at = %s,
                            ip_address = %s,
                            current_user_id = %s,
                            # 如果设备没名字,尝试用上报的名字填充
                            terminal_name = IF(terminal_name IS NULL OR terminal_name = '', %s, terminal_name),
                            # 如果设备没类型,尝试用上报的类型填充
                            terminal_type = IF(terminal_type IS NULL OR terminal_type = '', %s, terminal_type)
                        WHERE id = %s
                    """
                    cursor.execute(
                        update_query,
                        (current_time, ip_address, user_id, terminal_name, terminal_type, existing['id']),
                    )
                    conn.commit()

                    return {"allowed": True, "reason": "", "terminal_id": existing['id']}

                # Unknown device: register it as enabled and activated.
                insert_query = """
                    INSERT INTO terminals (
                        imei, terminal_name, terminal_type, status,
                        is_activated, activated_at, last_online_at, ip_address, created_at, current_user_id
                    ) VALUES (%s, %s, %s, 1, 1, %s, %s, %s, %s, %s)
                """
                cursor.execute(insert_query, (
                    imei, terminal_name, terminal_type,
                    current_time, current_time, ip_address, current_time, user_id,
                ))
                new_id = cursor.lastrowid
                conn.commit()

                return {"allowed": True, "reason": "", "terminal_id": new_id}

        except Exception as e:
            # Deliberate fail-open: a database problem currently lets the
            # request through. Revisit if the security policy requires
            # rejecting instead.
            logging.getLogger(__name__).exception("Error in check_and_update_terminal")
            return {"allowed": True, "reason": f"DB Error: {str(e)}", "terminal_id": None}


# Module-level TerminalService instance, created when this module is imported.
terminal_service = TerminalService()
|