imetting/backend/app/api/endpoints/client_downloads.py

566 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from fastapi import APIRouter, HTTPException, Depends, UploadFile, File, Form
from app.models.models import (
ClientDownload,
CreateClientDownloadRequest,
UpdateClientDownloadRequest,
ClientDownloadListResponse
)
from app.core.database import get_db_connection
from app.core.auth import get_current_user, get_current_admin_user
from app.core.response import create_api_response
from app.core.config import CLIENT_DIR, ALLOWED_CLIENT_EXTENSIONS, MAX_CLIENT_SIZE, APP_CONFIG
from app.utils.apk_parser import parse_apk_with_androguard
from typing import Optional
from pathlib import Path
import os
import shutil
# Router on which every endpoint in this module registers itself via the
# @router.get / @router.post / @router.put / @router.delete decorators below.
router = APIRouter()
@router.get("/clients", response_model=dict)
async def get_client_downloads(
    platform_code: Optional[str] = None,
    is_active: Optional[bool] = None,
    page: int = 1,
    size: int = 50
):
    """
    List client download records with optional filtering and pagination.

    NOTE(review): the original description calls this an admin-console
    endpoint, but no authentication dependency is declared on it here.

    Args:
        platform_code: Platform code to filter on (e.g. WIN, MAC, ANDROID).
            Ignored when empty or omitted.
        is_active: When not None, only rows with this ``is_active`` value
            are returned.
        page: 1-based page number; values below 1 are treated as 1.
        size: Page size; negative values are treated as 0.

    Returns:
        The value of ``create_api_response``: code "200" with ``clients``,
        ``total``, ``page`` and ``size`` on success, or code "500" with the
        error text when anything raises.
    """
    # A page below 1 or a negative size would produce a negative OFFSET or
    # LIMIT, which is not valid pagination (MySQL rejects it outright) and
    # used to surface as a 500. Clamp instead of failing.
    page = max(page, 1)
    size = max(size, 0)
    try:
        with get_db_connection() as conn:
            cursor = conn.cursor(dictionary=True)
            try:
                # Only fixed SQL fragments are concatenated; every
                # user-supplied value travels as a bound parameter.
                where_clauses = []
                params = []
                if platform_code:
                    where_clauses.append("platform_code = %s")
                    params.append(platform_code)
                if is_active is not None:
                    where_clauses.append("is_active = %s")
                    params.append(is_active)
                where_clause = " AND ".join(where_clauses) if where_clauses else "1=1"

                # Total number of matching rows, independent of the page.
                count_query = f"SELECT COUNT(*) as total FROM client_downloads WHERE {where_clause}"
                cursor.execute(count_query, params)
                total = cursor.fetchone()['total']

                # Requested page, ordered by platform then newest version.
                offset = (page - 1) * size
                list_query = f"""
                    SELECT * FROM client_downloads
                    WHERE {where_clause}
                    ORDER BY platform_code, version_code DESC
                    LIMIT %s OFFSET %s
                """
                cursor.execute(list_query, params + [size, offset])
                clients = cursor.fetchall()
            finally:
                # Close the cursor even when one of the queries raised.
                cursor.close()
            return create_api_response(
                code="200",
                message="获取成功",
                data={
                    "clients": clients,
                    "total": total,
                    "page": page,
                    "size": size
                }
            )
    except Exception as e:
        return create_api_response(
            code="500",
            message=f"获取客户端下载列表失败: {str(e)}"
        )
@router.get("/clients/latest", response_model=dict)
async def get_latest_clients():
    """
    Return the latest active client of every platform, grouped by category.

    Public endpoint (no auth dependency is declared). Each row from
    ``client_downloads`` is joined with its ``dict_data`` entry of type
    ``client_platform`` so the Chinese/English labels, ``parent_code`` and
    ``extension_attr`` come along with it.

    Returns:
        The value of ``create_api_response``: code "200" with ``mobile``,
        ``desktop`` and ``terminal`` lists on success, or code "500" with the
        error text when anything raises. Rows whose ``parent_code`` is
        missing or not one of MOBILE / DESKTOP / TERMINAL are omitted.
    """
    import json  # kept function-local, as the original block did

    try:
        with get_db_connection() as conn:
            cursor = conn.cursor(dictionary=True)
            try:
                query = """
                    SELECT cd.*, dd.label_cn, dd.label_en, dd.parent_code, dd.extension_attr
                    FROM client_downloads cd
                    LEFT JOIN dict_data dd ON cd.platform_code = dd.dict_code
                    AND dd.dict_type = 'client_platform'
                    WHERE cd.is_active = TRUE AND cd.is_latest = TRUE
                    ORDER BY dd.parent_code, dd.sort_order, cd.platform_code
                """
                cursor.execute(query)
                clients = cursor.fetchall()
            finally:
                # Close the cursor even when the query raised.
                cursor.close()

            grouped = {'MOBILE': [], 'DESKTOP': [], 'TERMINAL': []}
            for client in clients:
                # Decode extension_attr only when it is still serialized
                # text. A value that is already decoded is left untouched
                # rather than being replaced by {} as a failed json.loads
                # on a non-string used to do.
                raw_attr = client.get('extension_attr')
                if raw_attr and isinstance(raw_attr, (str, bytes, bytearray)):
                    try:
                        client['extension_attr'] = json.loads(raw_attr)
                    except ValueError:
                        # Malformed JSON (JSONDecodeError is a ValueError):
                        # fall back to an empty mapping.
                        client['extension_attr'] = {}

                # The LEFT JOIN yields parent_code = None for platforms with
                # no dict_data row. dict.get's default does not apply to a
                # present-but-None key, so guard explicitly.
                parent_code = (client.get('parent_code') or '').upper()
                bucket = grouped.get(parent_code)
                if bucket is not None:
                    bucket.append(client)

            return create_api_response(
                code="200",
                message="获取成功",
                data={
                    "mobile": grouped['MOBILE'],
                    "desktop": grouped['DESKTOP'],
                    "terminal": grouped['TERMINAL']
                }
            )
    except Exception as e:
        return create_api_response(
            code="500",
            message=f"获取最新客户端失败: {str(e)}"
        )
@router.get("/clients/latest/by-platform", response_model=dict)
async def get_latest_version_by_code(
    platform_type: Optional[str] = None,
    platform_name: Optional[str] = None,
    platform_code: Optional[str] = None
):
    """
    Return the latest active client for one platform (version-check endpoint).

    Public endpoint (no auth dependency is declared). Two calling styles are
    supported:

    1. Legacy: pass ``platform_type`` and ``platform_name`` (kept for
       terminals that are already released).
    2. Current, recommended: pass ``platform_code``. It takes precedence
       when both styles are supplied.

    Args:
        platform_type: Platform category (mobile, desktop, terminal) - legacy.
        platform_name: Concrete platform (ios, android, windows, ...) - legacy.
        platform_code: Platform code (WIN, MAC, LINUX, IOS, ANDROID, ...).

    Returns:
        The value of ``create_api_response``: code "200" with the matching
        row, "404" when no latest active row exists, "400" when neither
        calling style is satisfied, or "500" with the error text when
        anything raises.
    """
    try:
        # Choose the lookup before touching the database so that a request
        # with missing parameters gets its 400 without opening a connection.
        # ORDER BY makes the choice deterministic should more than one row be
        # flagged as latest for the same platform.
        if platform_code:
            query = """
                SELECT * FROM client_downloads
                WHERE platform_code = %s
                AND is_active = TRUE
                AND is_latest = TRUE
                ORDER BY version_code DESC, id DESC
                LIMIT 1
            """
            params = (platform_code,)
        elif platform_type and platform_name:
            query = """
                SELECT * FROM client_downloads
                WHERE platform_type = %s
                AND platform_name = %s
                AND is_active = TRUE
                AND is_latest = TRUE
                ORDER BY version_code DESC, id DESC
                LIMIT 1
            """
            params = (platform_type, platform_name)
        else:
            return create_api_response(
                code="400",
                message="请提供 platform_code 参数"
            )

        with get_db_connection() as conn:
            cursor = conn.cursor(dictionary=True)
            try:
                cursor.execute(query, params)
                client = cursor.fetchone()
            finally:
                # Close the cursor even when the query raised.
                cursor.close()

        if not client:
            return create_api_response(
                code="404",
                message="暂无最新客户端"
            )
        return create_api_response(
            code="200",
            message="获取成功",
            data=client
        )
    except Exception as e:
        return create_api_response(
            code="500",
            message=f"获取客户端版本失败: {str(e)}"
        )
@router.get("/clients/{id}", response_model=dict)
async def get_client_download_by_id(id: int):
    """
    Return the details of a single client download record.

    Public endpoint (no auth dependency is declared).

    Args:
        id: Primary key of the ``client_downloads`` row. The name shadows the
            builtin but is bound to the ``{id}`` path parameter, so it is
            kept as is.

    Returns:
        The value of ``create_api_response``: code "200" with the row, "404"
        when no row has that id, or "500" with the error text when anything
        raises.
    """
    try:
        with get_db_connection() as conn:
            cursor = conn.cursor(dictionary=True)
            try:
                cursor.execute("SELECT * FROM client_downloads WHERE id = %s", (id,))
                client = cursor.fetchone()
            finally:
                # Close the cursor even when the query raised.
                cursor.close()

        if not client:
            return create_api_response(
                code="404",
                message="客户端不存在"
            )
        return create_api_response(
            code="200",
            message="获取成功",
            data=client
        )
    except Exception as e:
        return create_api_response(
            code="500",
            message=f"获取客户端详情失败: {str(e)}"
        )
@router.post("/clients", response_model=dict)
async def create_client_download(
    request: CreateClientDownloadRequest,
    current_user: dict = Depends(get_current_admin_user)
):
    """
    Create a new client version record (admin only).

    When the new version is flagged as latest, every existing row of the same
    ``platform_code`` is first un-flagged. Both statements are committed
    together and rolled back together on failure.

    Args:
        request: Payload describing the version. ``platform_type`` and
            ``platform_name`` are legacy compatibility fields and may be
            None; ``platform_code`` is the field used to find sibling rows.
        current_user: Admin user resolved by ``get_current_admin_user``; its
            ``user_id`` is stored in ``created_by``.

    Returns:
        The value of ``create_api_response``: code "200" with the new row
        ``id`` on success, or code "500" with the error text when anything
        raises.
    """
    try:
        with get_db_connection() as conn:
            cursor = conn.cursor()
            try:
                # Keep at most one "latest" row per platform: demote the
                # siblings before inserting the new latest version.
                if request.is_latest:
                    demote_query = """
                        UPDATE client_downloads
                        SET is_latest = FALSE
                        WHERE platform_code = %s
                    """
                    cursor.execute(demote_query, (request.platform_code,))

                insert_query = """
                    INSERT INTO client_downloads (
                        platform_type, platform_name, platform_code,
                        version, version_code, download_url, file_size,
                        release_notes, is_active, is_latest, min_system_version,
                        created_by
                    ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                """
                cursor.execute(insert_query, (
                    request.platform_type,   # may be None
                    request.platform_name,   # may be None
                    request.platform_code,   # required
                    request.version,
                    request.version_code,
                    request.download_url,
                    request.file_size,
                    request.release_notes,
                    request.is_active,
                    request.is_latest,
                    request.min_system_version,
                    current_user['user_id']
                ))
                new_id = cursor.lastrowid
                conn.commit()
            except Exception:
                # Undo the demotion if the insert (or commit) failed, so the
                # platform is not left without any latest version.
                conn.rollback()
                raise
            finally:
                cursor.close()
            return create_api_response(
                code="200",
                message="客户端版本创建成功",
                data={"id": new_id}
            )
    except Exception as e:
        return create_api_response(
            code="500",
            message=f"创建客户端版本失败: {str(e)}"
        )
@router.put("/clients/{id}", response_model=dict)
async def update_client_download(
    id: int,
    request: UpdateClientDownloadRequest,
    current_user: dict = Depends(get_current_admin_user)
):
    """
    Update an existing client version record (admin only).

    Only the request fields that are not None are written. When the request
    flags the row as latest, every other row of the target platform (the new
    ``platform_code`` if one is supplied, otherwise the existing one) is
    un-flagged in the same transaction.

    Args:
        id: Primary key of the row to update (bound to the ``{id}`` path
            parameter, hence the builtin-shadowing name is kept).
        request: Partial update payload; None means "leave unchanged".
        current_user: Admin user resolved by ``get_current_admin_user``
            (used for authorization only).

    Returns:
        The value of ``create_api_response``: code "200" on success, "404"
        when the row does not exist, "400" when the payload carries no field
        to update, or "500" with the error text when anything raises.
    """
    # Fixed whitelist of updatable columns, in the order they appear in the
    # SET clause. Column names come only from this tuple, never from the
    # request, so interpolating them into the statement is safe.
    updatable_columns = (
        "platform_type",
        "platform_name",
        "platform_code",
        "version",
        "version_code",
        "download_url",
        "file_size",
        "release_notes",
        "is_active",
        "is_latest",
        "min_system_version",
    )
    try:
        with get_db_connection() as conn:
            cursor = conn.cursor(dictionary=True)
            try:
                # Make sure the row exists before changing anything.
                cursor.execute("SELECT * FROM client_downloads WHERE id = %s", (id,))
                existing = cursor.fetchone()
                if not existing:
                    return create_api_response(
                        code="404",
                        message="客户端不存在"
                    )

                # Collect "column = %s" fragments for every supplied field.
                update_fields = []
                params = []
                for column in updatable_columns:
                    value = getattr(request, column)
                    if value is not None:
                        update_fields.append(f"{column} = %s")
                        params.append(value)
                if not update_fields:
                    return create_api_response(
                        code="400",
                        message="没有要更新的字段"
                    )

                # Keep at most one "latest" row per platform: demote the
                # siblings of the platform this row will belong to.
                if request.is_latest:
                    target_platform = request.platform_code or existing['platform_code']
                    demote_query = """
                        UPDATE client_downloads
                        SET is_latest = FALSE
                        WHERE platform_code = %s AND id != %s
                    """
                    cursor.execute(demote_query, (target_platform, id))

                update_query = f"""
                    UPDATE client_downloads
                    SET {', '.join(update_fields)}
                    WHERE id = %s
                """
                cursor.execute(update_query, params + [id])
                conn.commit()
            except Exception:
                # Undo the demotion if the main update (or commit) failed.
                conn.rollback()
                raise
            finally:
                cursor.close()
            return create_api_response(
                code="200",
                message="客户端版本更新成功"
            )
    except Exception as e:
        return create_api_response(
            code="500",
            message=f"更新客户端版本失败: {str(e)}"
        )
@router.delete("/clients/{id}", response_model=dict)
async def delete_client_download(
    id: int,
    current_user: dict = Depends(get_current_admin_user)
):
    """
    Delete a client version record (admin only).

    Only the database row is removed; this function does not touch any
    installer file on disk.

    Args:
        id: Primary key of the row to delete (bound to the ``{id}`` path
            parameter, hence the builtin-shadowing name is kept).
        current_user: Admin user resolved by ``get_current_admin_user``
            (used for authorization only).

    Returns:
        The value of ``create_api_response``: code "200" on success, "404"
        when the row does not exist, or "500" with the error text when
        anything raises.
    """
    try:
        with get_db_connection() as conn:
            cursor = conn.cursor()
            try:
                # Existence probe: only presence matters, so fetch just the
                # key instead of the whole row.
                cursor.execute("SELECT id FROM client_downloads WHERE id = %s", (id,))
                if not cursor.fetchone():
                    return create_api_response(
                        code="404",
                        message="客户端不存在"
                    )

                cursor.execute("DELETE FROM client_downloads WHERE id = %s", (id,))
                conn.commit()
            except Exception:
                # Leave no half-finished transaction on the connection.
                conn.rollback()
                raise
            finally:
                cursor.close()
            return create_api_response(
                code="200",
                message="客户端版本删除成功"
            )
    except Exception as e:
        return create_api_response(
            code="500",
            message=f"删除客户端版本失败: {str(e)}"
        )
@router.post("/clients/upload", response_model=dict)
async def upload_client_installer(
    platform_code: str = Form(...),
    file: UploadFile = File(...),
    current_user: dict = Depends(get_current_admin_user)
):
    """
    Upload a client installer package (admin only).

    The file is stored as ``CLIENT_DIR/<platform_code>/<file name>``. Both
    path components come from the request, so they are reduced to / checked
    as single path components before touching the filesystem; otherwise a
    name such as ``../../x.apk`` would write outside ``CLIENT_DIR``.

    Args:
        platform_code: Platform code (e.g. WIN, MAC, ANDROID); used as the
            sub-directory name and must be a single path component.
        file: The installer file. Its extension must be listed in
            ``ALLOWED_CLIENT_EXTENSIONS`` and its size must not exceed
            ``MAX_CLIENT_SIZE``.
        current_user: Admin user resolved by ``get_current_admin_user``
            (used for authorization only).

    Returns:
        The value of ``create_api_response``: code "200" with ``file_name``,
        ``file_size``, ``download_url`` and ``platform_code`` (plus
        ``version_code`` / ``version_name`` / ``note`` for APK files), "400"
        for an invalid name, platform code, extension or size, or "500" with
        the error text when anything raises.
    """
    try:
        # Clients may send a full path, possibly with Windows separators;
        # keep only the final component so the name cannot leave the
        # platform directory.
        raw_name = (file.filename or "").replace("\\", "/")
        safe_name = Path(raw_name).name
        if not safe_name or safe_name in (".", ".."):
            return create_api_response(
                code="400",
                message="文件名无效"
            )

        # platform_code becomes a directory name: it must be exactly one
        # path component.
        if (
            not platform_code
            or platform_code in (".", "..")
            or "/" in platform_code
            or "\\" in platform_code
        ):
            return create_api_response(
                code="400",
                message="平台编码无效"
            )

        # Validate the file extension.
        file_ext = Path(safe_name).suffix.lower()
        if file_ext not in ALLOWED_CLIENT_EXTENSIONS:
            return create_api_response(
                code="400",
                message=f"不支持的文件类型: {file_ext}。支持的类型: {', '.join(ALLOWED_CLIENT_EXTENSIONS)}"
            )

        # Measure the upload by seeking to its end, then rewind for copying.
        file.file.seek(0, 2)
        file_size = file.file.tell()
        file.file.seek(0)
        if file_size > MAX_CLIENT_SIZE:
            return create_api_response(
                code="400",
                message=f"文件过大,最大允许 {MAX_CLIENT_SIZE / 1024 / 1024} MB"
            )

        # Create the platform directory only once validation has passed.
        platform_dir = CLIENT_DIR / platform_code
        platform_dir.mkdir(parents=True, exist_ok=True)
        file_path = platform_dir / safe_name

        # Persist the upload; a file with the same name is overwritten.
        with open(file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        # Build the public download URL.
        base_url = APP_CONFIG['base_url'].rstrip('/')
        download_url = f"{base_url}/uploads/clients/{platform_code}/{safe_name}"

        result = {
            "file_name": safe_name,
            "file_size": file_size,
            "download_url": download_url,
            "platform_code": platform_code
        }

        # For APK files, try to extract version information.
        if file_ext == '.apk':
            apk_info = parse_apk_with_androguard(str(file_path))
            if apk_info:
                result['version_code'] = apk_info.get('version_code')
                result['version_name'] = apk_info.get('version_name')
                result['note'] = apk_info.get('note', '')
            else:
                # Parsing failed: tell the admin to enter the version by hand.
                result['note'] = 'APK解析失败请检查后台日志。您可以手动输入版本信息。'

        return create_api_response(
            code="200",
            message="文件上传成功",
            data=result
        )
    except Exception as e:
        return create_api_response(
            code="500",
            message=f"文件上传失败: {str(e)}"
        )