399 lines
15 KiB
Python
399 lines
15 KiB
Python
from fastapi import APIRouter, Depends, Query
|
||
from app.core.auth import get_current_admin_user
|
||
from app.core.response import create_api_response
|
||
from app.core.database import get_db_connection
|
||
from app.services.jwt_service import jwt_service
|
||
from app.core.config import AUDIO_DIR, REDIS_CONFIG
|
||
from datetime import datetime
|
||
from typing import Dict, List
|
||
import os
|
||
import redis
|
||
|
||
router = APIRouter()
|
||
|
||
# Redis 客户端
|
||
redis_client = redis.Redis(**REDIS_CONFIG)
|
||
|
||
# 常量定义
|
||
AUDIO_FILE_EXTENSIONS = ('.wav', '.mp3', '.m4a', '.aac', '.flac', '.ogg')
|
||
BYTES_TO_GB = 1024 ** 3
|
||
|
||
|
||
def _build_status_condition(status: str) -> str:
|
||
"""构建任务状态查询条件"""
|
||
if status == 'running':
|
||
return "AND (t.status = 'pending' OR t.status = 'processing')"
|
||
elif status == 'completed':
|
||
return "AND t.status = 'completed'"
|
||
elif status == 'failed':
|
||
return "AND t.status = 'failed'"
|
||
return ""
|
||
|
||
|
||
def _get_task_stats_query() -> str:
|
||
"""获取任务统计的 SQL 查询"""
|
||
return """
|
||
SELECT
|
||
COUNT(*) as total,
|
||
SUM(CASE WHEN status = 'pending' OR status = 'processing' THEN 1 ELSE 0 END) as running,
|
||
SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed,
|
||
SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed
|
||
"""
|
||
|
||
|
||
def _get_online_user_count(redis_client) -> int:
|
||
"""从 Redis 获取在线用户数"""
|
||
try:
|
||
token_keys = redis_client.keys("token:*")
|
||
user_ids = set()
|
||
for key in token_keys:
|
||
parts = key.split(':')
|
||
if len(parts) >= 2:
|
||
user_ids.add(parts[1])
|
||
return len(user_ids)
|
||
except Exception as e:
|
||
print(f"获取在线用户数失败: {e}")
|
||
return 0
|
||
|
||
|
||
def _calculate_audio_storage() -> Dict[str, float]:
|
||
"""计算音频文件存储统计"""
|
||
audio_files_count = 0
|
||
audio_total_size = 0
|
||
|
||
try:
|
||
if os.path.exists(AUDIO_DIR):
|
||
for root, _, files in os.walk(AUDIO_DIR):
|
||
for file in files:
|
||
if file.endswith(AUDIO_FILE_EXTENSIONS):
|
||
audio_files_count += 1
|
||
file_path = os.path.join(root, file)
|
||
try:
|
||
audio_total_size += os.path.getsize(file_path)
|
||
except OSError:
|
||
continue
|
||
except Exception as e:
|
||
print(f"统计音频文件失败: {e}")
|
||
|
||
return {
|
||
"audio_files_count": audio_files_count,
|
||
"audio_total_size_gb": round(audio_total_size / BYTES_TO_GB, 2)
|
||
}
|
||
|
||
|
||
@router.get("/admin/dashboard/stats")
|
||
async def get_dashboard_stats(current_user=Depends(get_current_admin_user)):
|
||
"""获取管理员 Dashboard 统计数据"""
|
||
try:
|
||
with get_db_connection() as connection:
|
||
cursor = connection.cursor(dictionary=True)
|
||
|
||
# 1. 用户统计
|
||
today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
|
||
|
||
cursor.execute("SELECT COUNT(*) as total FROM users")
|
||
total_users = cursor.fetchone()['total']
|
||
|
||
cursor.execute(
|
||
"SELECT COUNT(*) as count FROM users WHERE created_at >= %s",
|
||
(today_start,)
|
||
)
|
||
today_new_users = cursor.fetchone()['count']
|
||
|
||
online_users = _get_online_user_count(redis_client)
|
||
|
||
# 2. 会议统计
|
||
cursor.execute("SELECT COUNT(*) as total FROM meetings")
|
||
total_meetings = cursor.fetchone()['total']
|
||
|
||
cursor.execute(
|
||
"SELECT COUNT(*) as count FROM meetings WHERE created_at >= %s",
|
||
(today_start,)
|
||
)
|
||
today_new_meetings = cursor.fetchone()['count']
|
||
|
||
# 3. 任务统计
|
||
task_stats_query = _get_task_stats_query()
|
||
|
||
# 转录任务
|
||
cursor.execute(f"{task_stats_query} FROM transcript_tasks")
|
||
transcription_stats = cursor.fetchone() or {'total': 0, 'running': 0, 'completed': 0, 'failed': 0}
|
||
|
||
# 总结任务
|
||
cursor.execute(f"{task_stats_query} FROM llm_tasks")
|
||
summary_stats = cursor.fetchone() or {'total': 0, 'running': 0, 'completed': 0, 'failed': 0}
|
||
|
||
# 知识库任务
|
||
cursor.execute(f"{task_stats_query} FROM knowledge_base_tasks")
|
||
kb_stats = cursor.fetchone() or {'total': 0, 'running': 0, 'completed': 0, 'failed': 0}
|
||
|
||
# 4. 音频存储统计
|
||
storage_stats = _calculate_audio_storage()
|
||
|
||
# 组装返回数据
|
||
stats = {
|
||
"users": {
|
||
"total": total_users,
|
||
"today_new": today_new_users,
|
||
"online": online_users
|
||
},
|
||
"meetings": {
|
||
"total": total_meetings,
|
||
"today_new": today_new_meetings
|
||
},
|
||
"tasks": {
|
||
"transcription": {
|
||
"total": transcription_stats['total'] or 0,
|
||
"running": transcription_stats['running'] or 0,
|
||
"completed": transcription_stats['completed'] or 0,
|
||
"failed": transcription_stats['failed'] or 0
|
||
},
|
||
"summary": {
|
||
"total": summary_stats['total'] or 0,
|
||
"running": summary_stats['running'] or 0,
|
||
"completed": summary_stats['completed'] or 0,
|
||
"failed": summary_stats['failed'] or 0
|
||
},
|
||
"knowledge_base": {
|
||
"total": kb_stats['total'] or 0,
|
||
"running": kb_stats['running'] or 0,
|
||
"completed": kb_stats['completed'] or 0,
|
||
"failed": kb_stats['failed'] or 0
|
||
}
|
||
},
|
||
"storage": storage_stats
|
||
}
|
||
|
||
return create_api_response(code="200", message="获取统计数据成功", data=stats)
|
||
|
||
except Exception as e:
|
||
print(f"获取Dashboard统计数据失败: {e}")
|
||
return create_api_response(code="500", message=f"获取统计数据失败: {str(e)}")
|
||
|
||
|
||
@router.get("/admin/online-users")
|
||
async def get_online_users(current_user=Depends(get_current_admin_user)):
|
||
"""获取在线用户列表"""
|
||
try:
|
||
token_keys = redis_client.keys("token:*")
|
||
|
||
# 提取用户ID并去重
|
||
user_tokens = {}
|
||
for key in token_keys:
|
||
parts = key.split(':')
|
||
if len(parts) >= 3:
|
||
user_id = int(parts[1])
|
||
token = parts[2]
|
||
if user_id not in user_tokens:
|
||
user_tokens[user_id] = []
|
||
user_tokens[user_id].append({'token': token, 'key': key})
|
||
|
||
# 查询用户信息
|
||
with get_db_connection() as connection:
|
||
cursor = connection.cursor(dictionary=True)
|
||
|
||
online_users_list = []
|
||
for user_id, tokens in user_tokens.items():
|
||
cursor.execute(
|
||
"SELECT user_id, username, caption, email, role_id FROM users WHERE user_id = %s",
|
||
(user_id,)
|
||
)
|
||
user = cursor.fetchone()
|
||
if user:
|
||
ttl_seconds = redis_client.ttl(tokens[0]['key'])
|
||
online_users_list.append({
|
||
**user,
|
||
'token_count': len(tokens),
|
||
'ttl_seconds': ttl_seconds,
|
||
'ttl_hours': round(ttl_seconds / 3600, 1) if ttl_seconds > 0 else 0
|
||
})
|
||
|
||
# 按用户ID排序
|
||
online_users_list.sort(key=lambda x: x['user_id'])
|
||
|
||
return create_api_response(
|
||
code="200",
|
||
message="获取在线用户列表成功",
|
||
data={"users": online_users_list, "total": len(online_users_list)}
|
||
)
|
||
|
||
except Exception as e:
|
||
print(f"获取在线用户列表失败: {e}")
|
||
return create_api_response(code="500", message=f"获取在线用户列表失败: {str(e)}")
|
||
|
||
|
||
@router.post("/admin/kick-user/{user_id}")
|
||
async def kick_user(user_id: int, current_user=Depends(get_current_admin_user)):
|
||
"""踢出用户(撤销该用户的所有 token)"""
|
||
try:
|
||
revoked_count = jwt_service.revoke_all_user_tokens(user_id)
|
||
|
||
if revoked_count > 0:
|
||
return create_api_response(
|
||
code="200",
|
||
message=f"已踢出用户,撤销了 {revoked_count} 个 token",
|
||
data={"user_id": user_id, "revoked_count": revoked_count}
|
||
)
|
||
else:
|
||
return create_api_response(
|
||
code="404",
|
||
message="该用户当前不在线或未找到 token"
|
||
)
|
||
|
||
except Exception as e:
|
||
print(f"踢出用户失败: {e}")
|
||
return create_api_response(code="500", message=f"踢出用户失败: {str(e)}")
|
||
|
||
|
||
@router.get("/admin/tasks/monitor")
|
||
async def monitor_tasks(
|
||
task_type: str = Query('all', description="任务类型: all, transcription, summary, knowledge_base"),
|
||
status: str = Query('all', description="任务状态: all, running, completed, failed"),
|
||
limit: int = Query(20, ge=1, le=100, description="返回数量限制"),
|
||
current_user=Depends(get_current_admin_user)
|
||
):
|
||
"""监控任务进度"""
|
||
try:
|
||
with get_db_connection() as connection:
|
||
cursor = connection.cursor(dictionary=True)
|
||
tasks = []
|
||
status_condition = _build_status_condition(status)
|
||
|
||
# 转录任务
|
||
if task_type in ['all', 'transcription']:
|
||
query = f"""
|
||
SELECT
|
||
t.task_id,
|
||
'transcription' as task_type,
|
||
t.meeting_id,
|
||
m.title as meeting_title,
|
||
t.status,
|
||
t.progress,
|
||
t.error_message,
|
||
t.created_at,
|
||
t.completed_at,
|
||
u.username as creator_name
|
||
FROM transcript_tasks t
|
||
LEFT JOIN meetings m ON t.meeting_id = m.meeting_id
|
||
LEFT JOIN users u ON m.user_id = u.user_id
|
||
WHERE 1=1 {status_condition}
|
||
ORDER BY t.created_at DESC
|
||
LIMIT %s
|
||
"""
|
||
cursor.execute(query, (limit,))
|
||
tasks.extend(cursor.fetchall())
|
||
|
||
# 总结任务
|
||
if task_type in ['all', 'summary']:
|
||
query = f"""
|
||
SELECT
|
||
t.task_id,
|
||
'summary' as task_type,
|
||
t.meeting_id,
|
||
m.title as meeting_title,
|
||
t.status,
|
||
NULL as progress,
|
||
t.error_message,
|
||
t.created_at,
|
||
t.completed_at,
|
||
u.username as creator_name
|
||
FROM llm_tasks t
|
||
LEFT JOIN meetings m ON t.meeting_id = m.meeting_id
|
||
LEFT JOIN users u ON m.user_id = u.user_id
|
||
WHERE 1=1 {status_condition}
|
||
ORDER BY t.created_at DESC
|
||
LIMIT %s
|
||
"""
|
||
cursor.execute(query, (limit,))
|
||
tasks.extend(cursor.fetchall())
|
||
|
||
# 知识库任务
|
||
if task_type in ['all', 'knowledge_base']:
|
||
query = f"""
|
||
SELECT
|
||
t.task_id,
|
||
'knowledge_base' as task_type,
|
||
t.kb_id as meeting_id,
|
||
k.title as meeting_title,
|
||
t.status,
|
||
t.progress,
|
||
t.error_message,
|
||
t.created_at,
|
||
t.updated_at,
|
||
u.username as creator_name
|
||
FROM knowledge_base_tasks t
|
||
LEFT JOIN knowledge_bases k ON t.kb_id = k.kb_id
|
||
LEFT JOIN users u ON k.creator_id = u.user_id
|
||
WHERE 1=1 {status_condition}
|
||
ORDER BY t.created_at DESC
|
||
LIMIT %s
|
||
"""
|
||
cursor.execute(query, (limit,))
|
||
tasks.extend(cursor.fetchall())
|
||
|
||
# 按创建时间排序并限制返回数量
|
||
tasks.sort(key=lambda x: x['created_at'], reverse=True)
|
||
tasks = tasks[:limit]
|
||
|
||
return create_api_response(
|
||
code="200",
|
||
message="获取任务监控数据成功",
|
||
data={"tasks": tasks, "total": len(tasks)}
|
||
)
|
||
|
||
except Exception as e:
|
||
print(f"获取任务监控数据失败: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return create_api_response(code="500", message=f"获取任务监控数据失败: {str(e)}")
|
||
|
||
|
||
@router.get("/admin/system/resources")
|
||
async def get_system_resources(current_user=Depends(get_current_admin_user)):
|
||
"""获取服务器资源使用情况"""
|
||
try:
|
||
import psutil
|
||
|
||
# CPU 使用率
|
||
cpu_percent = psutil.cpu_percent(interval=1)
|
||
cpu_count = psutil.cpu_count()
|
||
|
||
# 内存使用情况
|
||
memory = psutil.virtual_memory()
|
||
memory_total_gb = round(memory.total / BYTES_TO_GB, 2)
|
||
memory_used_gb = round(memory.used / BYTES_TO_GB, 2)
|
||
|
||
# 磁盘使用情况
|
||
disk = psutil.disk_usage('/')
|
||
disk_total_gb = round(disk.total / BYTES_TO_GB, 2)
|
||
disk_used_gb = round(disk.used / BYTES_TO_GB, 2)
|
||
|
||
resources = {
|
||
"cpu": {
|
||
"percent": cpu_percent,
|
||
"count": cpu_count
|
||
},
|
||
"memory": {
|
||
"total_gb": memory_total_gb,
|
||
"used_gb": memory_used_gb,
|
||
"percent": memory.percent
|
||
},
|
||
"disk": {
|
||
"total_gb": disk_total_gb,
|
||
"used_gb": disk_used_gb,
|
||
"percent": disk.percent
|
||
},
|
||
"timestamp": datetime.now().isoformat()
|
||
}
|
||
|
||
return create_api_response(code="200", message="获取系统资源成功", data=resources)
|
||
|
||
except ImportError:
|
||
return create_api_response(
|
||
code="500",
|
||
message="psutil 库未安装,请运行: pip install psutil"
|
||
)
|
||
except Exception as e:
|
||
print(f"获取系统资源失败: {e}")
|
||
return create_api_response(code="500", message=f"获取系统资源失败: {str(e)}")
|