Compare commits
2 Commits
d85f12fb9d
...
a192590c3b
| Author | SHA1 | Date |
|---|---|---|
|
|
a192590c3b | |
|
|
d6602435c4 |
|
|
@ -0,0 +1,19 @@
|
|||
{
|
||||
// Use IntelliSense to learn about possible attributes.
|
||||
// Hover to view descriptions of existing attributes.
|
||||
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
|
||||
"version": "0.2.0",
|
||||
"configurations": [
|
||||
{
|
||||
"name": "Python Debugger: FastAPI",
|
||||
"type": "debugpy",
|
||||
"request": "launch",
|
||||
"module": "uvicorn",
|
||||
"args": [
|
||||
"main:app",
|
||||
"--reload"
|
||||
],
|
||||
"jinja": true
|
||||
}
|
||||
]
|
||||
}
|
||||
|
|
@ -0,0 +1,31 @@
|
|||
# 使用Python 3.9.6基础镜像
|
||||
FROM swr.cn-north-4.myhuaweicloud.com/ddn-k8s/docker.io/python:3.12.9-slim
|
||||
|
||||
# 设置工作目录
|
||||
WORKDIR /app
|
||||
|
||||
# 设置环境变量
|
||||
ENV PYTHONPATH=/app
|
||||
ENV PYTHONUNBUFFERED=1
|
||||
|
||||
# 安装系统依赖
|
||||
RUN apt-get update && apt-get install -y \
|
||||
gcc \
|
||||
default-libmysqlclient-dev \
|
||||
pkg-config \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
# 复制依赖文件
|
||||
COPY requirements-prod.txt .
|
||||
|
||||
# 安装Python依赖
|
||||
RUN pip install --no-cache-dir -r requirements-prod.txt
|
||||
|
||||
# 复制应用代码
|
||||
COPY . .
|
||||
|
||||
# 暴露端口
|
||||
EXPOSE 8001
|
||||
|
||||
# 启动命令
|
||||
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8001"]
|
||||
|
|
@ -1,12 +1,14 @@
|
|||
from fastapi import APIRouter, HTTPException, UploadFile, File, Form, Depends
|
||||
from fastapi import APIRouter, HTTPException, UploadFile, File, Form, Depends, BackgroundTasks
|
||||
from app.models.models import Meeting, TranscriptSegment, TranscriptionTaskStatus, CreateMeetingRequest, UpdateMeetingRequest, SpeakerTagUpdateRequest, BatchSpeakerTagUpdateRequest, TranscriptUpdateRequest, BatchTranscriptUpdateRequest
|
||||
from app.core.database import get_db_connection
|
||||
from app.core.config import BASE_DIR, UPLOAD_DIR, AUDIO_DIR, MARKDOWN_DIR, ALLOWED_EXTENSIONS, ALLOWED_IMAGE_EXTENSIONS, MAX_FILE_SIZE, MAX_IMAGE_SIZE
|
||||
from app.services.qiniu_service import qiniu_service
|
||||
from app.services.llm_service import LLMService
|
||||
from app.services.async_transcription_service import AsyncTranscriptionService
|
||||
from app.services.async_llm_service import async_llm_service
|
||||
from app.core.auth import get_current_user, get_optional_current_user
|
||||
from typing import Optional
|
||||
from datetime import datetime
|
||||
from pydantic import BaseModel
|
||||
import os
|
||||
import uuid
|
||||
|
|
@ -18,6 +20,9 @@ router = APIRouter()
|
|||
llm_service = LLMService()
|
||||
transcription_service = AsyncTranscriptionService()
|
||||
|
||||
# 注意:异步LLM服务需要单独启动worker进程
|
||||
# 运行命令:python llm_worker.py
|
||||
|
||||
# 请求模型
|
||||
class GenerateSummaryRequest(BaseModel):
|
||||
user_prompt: Optional[str] = ""
|
||||
|
|
@ -179,14 +184,15 @@ def create_meeting(meeting_request: CreateMeetingRequest, current_user: dict = D
|
|||
|
||||
# Create meeting
|
||||
meeting_query = '''
|
||||
INSERT INTO meetings (user_id, title, meeting_time, summary)
|
||||
VALUES (%s, %s, %s, %s)
|
||||
INSERT INTO meetings (user_id, title, meeting_time, summary,created_at)
|
||||
VALUES (%s, %s, %s, %s, %s)
|
||||
'''
|
||||
cursor.execute(meeting_query, (
|
||||
meeting_request.user_id,
|
||||
meeting_request.title,
|
||||
meeting_request.meeting_time,
|
||||
None # summary starts as None
|
||||
None,
|
||||
datetime.now().isoformat()
|
||||
))
|
||||
|
||||
meeting_id = cursor.lastrowid
|
||||
|
|
@ -268,42 +274,6 @@ def delete_meeting(meeting_id: int, current_user: dict = Depends(get_current_use
|
|||
connection.commit()
|
||||
return {"message": "Meeting deleted successfully"}
|
||||
|
||||
@router.post("/meetings/{meeting_id}/regenerate-summary")
|
||||
def regenerate_summary(meeting_id: int, current_user: dict = Depends(get_current_user)):
|
||||
with get_db_connection() as connection:
|
||||
cursor = connection.cursor(dictionary=True)
|
||||
|
||||
# Check if meeting exists
|
||||
cursor.execute("SELECT meeting_id FROM meetings WHERE meeting_id = %s", (meeting_id,))
|
||||
if not cursor.fetchone():
|
||||
raise HTTPException(status_code=404, detail="Meeting not found")
|
||||
|
||||
# For now, return a mock summary
|
||||
# In a real implementation, this would call an AI service
|
||||
mock_summary = """# AI 生成摘要
|
||||
|
||||
## 主要议题
|
||||
- 项目进度回顾
|
||||
- 技术方案讨论
|
||||
- 下阶段规划
|
||||
|
||||
## 关键决策
|
||||
- 采用新的技术架构
|
||||
- 调整项目时间节点
|
||||
- 分配任务责任
|
||||
|
||||
## 后续行动
|
||||
- [ ] 完成技术方案文档
|
||||
- [ ] 安排下次会议时间
|
||||
- [ ] 跟进项目进度"""
|
||||
|
||||
# Update meeting summary
|
||||
update_query = "UPDATE meetings SET summary = %s WHERE meeting_id = %s"
|
||||
cursor.execute(update_query, (mock_summary, meeting_id))
|
||||
connection.commit()
|
||||
|
||||
return {"message": "Summary regenerated successfully", "summary": mock_summary}
|
||||
|
||||
@router.get("/meetings/{meeting_id}/edit", response_model=Meeting)
|
||||
def get_meeting_for_edit(meeting_id: int, current_user: dict = Depends(get_current_user)):
|
||||
"""获取会议信息用于编辑"""
|
||||
|
|
@ -834,4 +804,105 @@ def get_summary_detail(meeting_id: int, summary_id: int, current_user: dict = De
|
|||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=f"Failed to get summary detail: {str(e)}")
|
||||
raise HTTPException(status_code=500, detail=f"Failed to get summary detail: {str(e)}")
|
||||
|
||||
# ==================== 异步LLM总结相关接口 ====================
|
||||
|
||||
@router.post("/meetings/{meeting_id}/generate-summary-async")
|
||||
def generate_meeting_summary_async(meeting_id: int, request: GenerateSummaryRequest, background_tasks: BackgroundTasks, current_user: dict = Depends(get_current_user)):
|
||||
"""生成会议AI总结(异步版本)"""
|
||||
try:
|
||||
# 检查会议是否存在
|
||||
with get_db_connection() as connection:
|
||||
cursor = connection.cursor(dictionary=True)
|
||||
cursor.execute("SELECT meeting_id FROM meetings WHERE meeting_id = %s", (meeting_id,))
|
||||
if not cursor.fetchone():
|
||||
raise HTTPException(status_code=404, detail="Meeting not found")
|
||||
|
||||
# 启动异步任务
|
||||
task_id = async_llm_service.start_summary_generation(meeting_id, request.user_prompt)
|
||||
|
||||
# 将真正的处理函数作为后台任务添加
|
||||
background_tasks.add_task(async_llm_service._process_task, task_id)
|
||||
|
||||
return {
|
||||
"message": "Summary generation task has been accepted.",
|
||||
"task_id": task_id,
|
||||
"status": "pending",
|
||||
"meeting_id": meeting_id
|
||||
}
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=f"Failed to start summary generation: {str(e)}")
|
||||
|
||||
@router.get("/llm-tasks/{task_id}/status")
|
||||
def get_llm_task_status(task_id: str, current_user: dict = Depends(get_current_user)):
|
||||
"""获取LLM任务状态(包括进度)"""
|
||||
try:
|
||||
status = async_llm_service.get_task_status(task_id)
|
||||
|
||||
if status.get('status') == 'not_found':
|
||||
raise HTTPException(status_code=404, detail="Task not found")
|
||||
|
||||
return status
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=f"Failed to get task status: {str(e)}")
|
||||
|
||||
@router.get("/meetings/{meeting_id}/llm-tasks")
|
||||
def get_meeting_llm_tasks(meeting_id: int, current_user: dict = Depends(get_current_user)):
|
||||
"""获取会议的所有LLM任务历史"""
|
||||
try:
|
||||
# 检查会议是否存在
|
||||
with get_db_connection() as connection:
|
||||
cursor = connection.cursor(dictionary=True)
|
||||
cursor.execute("SELECT meeting_id FROM meetings WHERE meeting_id = %s", (meeting_id,))
|
||||
if not cursor.fetchone():
|
||||
raise HTTPException(status_code=404, detail="Meeting not found")
|
||||
|
||||
# 获取任务列表
|
||||
tasks = async_llm_service.get_meeting_llm_tasks(meeting_id)
|
||||
|
||||
return {
|
||||
"meeting_id": meeting_id,
|
||||
"tasks": tasks,
|
||||
"total": len(tasks)
|
||||
}
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=f"Failed to get LLM tasks: {str(e)}")
|
||||
|
||||
@router.get("/meetings/{meeting_id}/latest-llm-task")
|
||||
def get_latest_llm_task(meeting_id: int, current_user: dict = Depends(get_current_user)):
|
||||
"""获取会议最新的LLM任务状态"""
|
||||
try:
|
||||
tasks = async_llm_service.get_meeting_llm_tasks(meeting_id)
|
||||
|
||||
if not tasks:
|
||||
return {
|
||||
"meeting_id": meeting_id,
|
||||
"task": None,
|
||||
"message": "No LLM tasks found for this meeting"
|
||||
}
|
||||
|
||||
# 返回最新的任务
|
||||
latest_task = tasks[0]
|
||||
|
||||
# 如果任务还在进行中,获取实时状态
|
||||
if latest_task['status'] in ['pending', 'processing']:
|
||||
latest_status = async_llm_service.get_task_status(latest_task['task_id'])
|
||||
latest_task.update(latest_status)
|
||||
|
||||
return {
|
||||
"meeting_id": meeting_id,
|
||||
"task": latest_task
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=f"Failed to get latest LLM task: {str(e)}")
|
||||
|
|
@ -22,7 +22,7 @@ MARKDOWN_DIR.mkdir(exist_ok=True)
|
|||
DATABASE_CONFIG = {
|
||||
'host': os.getenv('DB_HOST', 'localhost'),
|
||||
'user': os.getenv('DB_USER', 'root'),
|
||||
'password': os.getenv('DB_PASSWORD', ''),
|
||||
'password': os.getenv('DB_PASSWORD', 'sagacity'),
|
||||
'database': os.getenv('DB_NAME', 'imeeting'),
|
||||
'port': int(os.getenv('DB_PORT', '3306')),
|
||||
'charset': 'utf8mb4'
|
||||
|
|
@ -31,8 +31,7 @@ DATABASE_CONFIG = {
|
|||
# API配置
|
||||
API_CONFIG = {
|
||||
'host': os.getenv('API_HOST', '0.0.0.0'),
|
||||
'port': int(os.getenv('API_PORT', '8000')),
|
||||
'cors_origins': os.getenv('CORS_ORIGINS', 'http://localhost:5173').split(',')
|
||||
'port': int(os.getenv('API_PORT', '8000'))
|
||||
}
|
||||
|
||||
# 七牛云配置
|
||||
|
|
|
|||
|
|
@ -2,24 +2,14 @@
|
|||
from fastapi import HTTPException
|
||||
import mysql.connector
|
||||
from mysql.connector import Error
|
||||
from app.core.config import DATABASE_CONFIG
|
||||
from contextlib import contextmanager
|
||||
|
||||
DB_CONFIG = {
|
||||
'host': 'localhost',
|
||||
'database': 'imeeting',
|
||||
'user': 'root',
|
||||
'password': 'sagacity',
|
||||
'port': 3306,
|
||||
'charset': 'utf8mb4',
|
||||
'autocommit': False, # 禁用自动提交
|
||||
'consume_results': True # 自动消费未读结果
|
||||
}
|
||||
|
||||
@contextmanager
|
||||
def get_db_connection():
|
||||
connection = None
|
||||
try:
|
||||
connection = mysql.connector.connect(**DB_CONFIG)
|
||||
connection = mysql.connector.connect(**DATABASE_CONFIG)
|
||||
yield connection
|
||||
except Error as e:
|
||||
print(f"数据库连接错误: {e}")
|
||||
|
|
|
|||
|
|
@ -0,0 +1,215 @@
|
|||
"""
|
||||
异步LLM服务 - 处理会议总结生成的异步任务
|
||||
采用FastAPI BackgroundTasks模式
|
||||
"""
|
||||
import uuid
|
||||
import time
|
||||
from datetime import datetime
|
||||
from typing import Optional, Dict, Any, List
|
||||
|
||||
import redis
|
||||
from app.core.config import REDIS_CONFIG
|
||||
from app.core.database import get_db_connection
|
||||
from app.services.llm_service import LLMService
|
||||
|
||||
class AsyncLLMService:
|
||||
"""异步LLM服务类 - 采用FastAPI BackgroundTasks模式"""
|
||||
|
||||
def __init__(self):
|
||||
# 确保redis客户端自动解码响应,代码更简洁
|
||||
if 'decode_responses' not in REDIS_CONFIG:
|
||||
REDIS_CONFIG['decode_responses'] = True
|
||||
self.redis_client = redis.Redis(**REDIS_CONFIG)
|
||||
self.llm_service = LLMService() # 复用现有的同步LLM服务
|
||||
|
||||
def start_summary_generation(self, meeting_id: int, user_prompt: str = "") -> str:
|
||||
"""
|
||||
创建异步总结任务,任务的执行将由外部(如API层的BackgroundTasks)触发。
|
||||
|
||||
Args:
|
||||
meeting_id: 会议ID
|
||||
user_prompt: 用户额外提示词
|
||||
|
||||
Returns:
|
||||
str: 任务ID
|
||||
"""
|
||||
try:
|
||||
task_id = str(uuid.uuid4())
|
||||
|
||||
# 在数据库中创建任务记录
|
||||
self._save_task_to_db(task_id, meeting_id, user_prompt)
|
||||
|
||||
# 将任务详情存入Redis,用于快速查询状态
|
||||
current_time = datetime.now().isoformat()
|
||||
task_data = {
|
||||
'task_id': task_id,
|
||||
'meeting_id': str(meeting_id),
|
||||
'user_prompt': user_prompt,
|
||||
'status': 'pending',
|
||||
'progress': '0',
|
||||
'created_at': current_time,
|
||||
'updated_at': current_time
|
||||
}
|
||||
self.redis_client.hset(f"llm_task:{task_id}", mapping=task_data)
|
||||
self.redis_client.expire(f"llm_task:{task_id}", 86400)
|
||||
|
||||
print(f"LLM summary task created: {task_id} for meeting: {meeting_id}")
|
||||
return task_id
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error starting summary generation: {e}")
|
||||
raise e
|
||||
|
||||
def _process_task(self, task_id: str):
|
||||
"""
|
||||
处理单个异步任务的函数,设计为由BackgroundTasks调用。
|
||||
"""
|
||||
print(f"Background task started for LLM task: {task_id}")
|
||||
try:
|
||||
# 从Redis获取任务数据
|
||||
task_data = self.redis_client.hgetall(f"llm_task:{task_id}")
|
||||
if not task_data:
|
||||
print(f"Error: Task {task_id} not found in Redis for processing.")
|
||||
return
|
||||
|
||||
meeting_id = int(task_data['meeting_id'])
|
||||
user_prompt = task_data.get('user_prompt', '')
|
||||
|
||||
# 1. 更新状态为processing
|
||||
self._update_task_status_in_redis(task_id, 'processing', 10, message="任务已开始...")
|
||||
|
||||
# 2. 获取会议转录内容
|
||||
self._update_task_status_in_redis(task_id, 'processing', 30, message="获取会议转录内容...")
|
||||
transcript_text = self.llm_service._get_meeting_transcript(meeting_id)
|
||||
if not transcript_text:
|
||||
raise Exception("无法获取会议转录内容")
|
||||
|
||||
# 3. 构建提示词
|
||||
self._update_task_status_in_redis(task_id, 'processing', 40, message="准备AI提示词...")
|
||||
full_prompt = self.llm_service._build_prompt(transcript_text, user_prompt)
|
||||
|
||||
# 4. 调用LLM API
|
||||
self._update_task_status_in_redis(task_id, 'processing', 50, message="AI正在分析会议内容...")
|
||||
summary_content = self.llm_service._call_llm_api(full_prompt)
|
||||
if not summary_content:
|
||||
raise Exception("LLM API调用失败或返回空内容")
|
||||
|
||||
# 5. 保存结果到主表
|
||||
self._update_task_status_in_redis(task_id, 'processing', 95, message="保存总结结果...")
|
||||
self.llm_service._save_summary_to_db(meeting_id, summary_content, user_prompt)
|
||||
|
||||
# 6. 任务完成
|
||||
self._update_task_in_db(task_id, 'completed', 100, result=summary_content)
|
||||
self._update_task_status_in_redis(task_id, 'completed', 100, result=summary_content)
|
||||
print(f"Task {task_id} completed successfully")
|
||||
|
||||
except Exception as e:
|
||||
error_msg = str(e)
|
||||
print(f"Task {task_id} failed: {error_msg}")
|
||||
# 更新失败状态
|
||||
self._update_task_in_db(task_id, 'failed', 0, error_message=error_msg)
|
||||
self._update_task_status_in_redis(task_id, 'failed', 0, error_message=error_msg)
|
||||
|
||||
# --- 状态查询和数据库操作方法 ---
|
||||
|
||||
def get_task_status(self, task_id: str) -> Dict[str, Any]:
|
||||
"""获取任务状态"""
|
||||
try:
|
||||
task_data = self.redis_client.hgetall(f"llm_task:{task_id}")
|
||||
if not task_data:
|
||||
task_data = self._get_task_from_db(task_id)
|
||||
if not task_data:
|
||||
return {'task_id': task_id, 'status': 'not_found', 'error_message': 'Task not found'}
|
||||
|
||||
return {
|
||||
'task_id': task_id,
|
||||
'status': task_data.get('status', 'unknown'),
|
||||
'progress': int(task_data.get('progress', 0)),
|
||||
'meeting_id': int(task_data.get('meeting_id', 0)),
|
||||
'created_at': task_data.get('created_at'),
|
||||
'updated_at': task_data.get('updated_at'),
|
||||
'result': task_data.get('result'),
|
||||
'error_message': task_data.get('error_message')
|
||||
}
|
||||
except Exception as e:
|
||||
print(f"Error getting task status: {e}")
|
||||
return {'task_id': task_id, 'status': 'error', 'error_message': str(e)}
|
||||
|
||||
def get_meeting_llm_tasks(self, meeting_id: int) -> List[Dict[str, Any]]:
|
||||
"""获取会议的所有LLM任务"""
|
||||
try:
|
||||
with get_db_connection() as connection:
|
||||
cursor = connection.cursor(dictionary=True)
|
||||
query = "SELECT task_id, status, progress, user_prompt, created_at, completed_at, error_message FROM llm_tasks WHERE meeting_id = %s ORDER BY created_at DESC"
|
||||
cursor.execute(query, (meeting_id,))
|
||||
tasks = cursor.fetchall()
|
||||
for task in tasks:
|
||||
if task.get('created_at'): task['created_at'] = task['created_at'].isoformat()
|
||||
if task.get('completed_at'): task['completed_at'] = task['completed_at'].isoformat()
|
||||
return tasks
|
||||
except Exception as e:
|
||||
print(f"Error getting meeting LLM tasks: {e}")
|
||||
return []
|
||||
|
||||
def _update_task_status_in_redis(self, task_id: str, status: str, progress: int, message: str = None, result: str = None, error_message: str = None):
|
||||
"""更新Redis中的任务状态"""
|
||||
try:
|
||||
update_data = {
|
||||
'status': status,
|
||||
'progress': str(progress),
|
||||
'updated_at': datetime.now().isoformat()
|
||||
}
|
||||
if message: update_data['message'] = message
|
||||
if result: update_data['result'] = result
|
||||
if error_message: update_data['error_message'] = error_message
|
||||
self.redis_client.hset(f"llm_task:{task_id}", mapping=update_data)
|
||||
except Exception as e:
|
||||
print(f"Error updating task status in Redis: {e}")
|
||||
|
||||
def _save_task_to_db(self, task_id: str, meeting_id: int, user_prompt: str):
|
||||
"""保存任务到数据库"""
|
||||
try:
|
||||
with get_db_connection() as connection:
|
||||
cursor = connection.cursor()
|
||||
insert_query = "INSERT INTO llm_tasks (task_id, meeting_id, user_prompt, status, progress, created_at) VALUES (%s, %s, %s, 'pending', 0, NOW())"
|
||||
cursor.execute(insert_query, (task_id, meeting_id, user_prompt))
|
||||
connection.commit()
|
||||
except Exception as e:
|
||||
print(f"Error saving task to database: {e}")
|
||||
raise
|
||||
|
||||
def _update_task_in_db(self, task_id: str, status: str, progress: int, result: str = None, error_message: str = None):
|
||||
"""更新数据库中的任务状态"""
|
||||
try:
|
||||
with get_db_connection() as connection:
|
||||
cursor = connection.cursor()
|
||||
params = [status, progress, error_message, task_id]
|
||||
if status == 'completed':
|
||||
query = "UPDATE llm_tasks SET status = %s, progress = %s, error_message = %s, result = %s, completed_at = NOW() WHERE task_id = %s"
|
||||
params.insert(2, result)
|
||||
else:
|
||||
query = "UPDATE llm_tasks SET status = %s, progress = %s, error_message = %s WHERE task_id = %s"
|
||||
|
||||
cursor.execute(query, tuple(params))
|
||||
connection.commit()
|
||||
except Exception as e:
|
||||
print(f"Error updating task in database: {e}")
|
||||
|
||||
def _get_task_from_db(self, task_id: str) -> Optional[Dict[str, str]]:
|
||||
"""从数据库获取任务信息"""
|
||||
try:
|
||||
with get_db_connection() as connection:
|
||||
cursor = connection.cursor(dictionary=True)
|
||||
query = "SELECT * FROM llm_tasks WHERE task_id = %s"
|
||||
cursor.execute(query, (task_id,))
|
||||
task = cursor.fetchone()
|
||||
if task:
|
||||
# 确保所有字段都是字符串,以匹配Redis的行为
|
||||
return {k: v.isoformat() if isinstance(v, datetime) else str(v) for k, v in task.items()}
|
||||
return None
|
||||
except Exception as e:
|
||||
print(f"Error getting task from database: {e}")
|
||||
return None
|
||||
|
||||
# 创建全局实例
|
||||
async_llm_service = AsyncLLMService()
|
||||
|
|
@ -56,6 +56,7 @@ class AsyncTranscriptionService:
|
|||
business_task_id = str(uuid.uuid4())
|
||||
|
||||
# 在Redis中存储任务映射
|
||||
current_time = datetime.now().isoformat()
|
||||
task_data = {
|
||||
'business_task_id': business_task_id,
|
||||
'paraformer_task_id': paraformer_task_id,
|
||||
|
|
@ -63,8 +64,8 @@ class AsyncTranscriptionService:
|
|||
'file_url': file_url,
|
||||
'status': 'pending',
|
||||
'progress': '0',
|
||||
'created_at': datetime.now().isoformat(),
|
||||
'updated_at': datetime.now().isoformat()
|
||||
'created_at': current_time,
|
||||
'updated_at': current_time
|
||||
}
|
||||
|
||||
# 存储到Redis,过期时间24小时
|
||||
|
|
@ -72,7 +73,7 @@ class AsyncTranscriptionService:
|
|||
self.redis_client.expire(f"task:{business_task_id}", 86400)
|
||||
|
||||
# 在数据库中创建任务记录
|
||||
self._save_task_to_db(business_task_id, meeting_id, audio_file_path)
|
||||
self._save_task_to_db(business_task_id, paraformer_task_id, meeting_id, audio_file_path)
|
||||
|
||||
print(f"Transcription task created: {business_task_id}")
|
||||
return business_task_id
|
||||
|
|
@ -91,68 +92,106 @@ class AsyncTranscriptionService:
|
|||
Returns:
|
||||
Dict: 任务状态信息
|
||||
"""
|
||||
task_data = None
|
||||
current_status = 'failed'
|
||||
progress = 0
|
||||
error_message = "An unknown error occurred."
|
||||
|
||||
try:
|
||||
# 从Redis获取任务信息
|
||||
task_data = self.redis_client.hgetall(f"task:{business_task_id}")
|
||||
if not task_data:
|
||||
# 尝试从数据库获取
|
||||
task_data = self._get_task_from_db(business_task_id)
|
||||
if not task_data:
|
||||
raise Exception("Task not found")
|
||||
|
||||
# 1. 获取任务数据(优先Redis,回源DB)
|
||||
task_data = self._get_task_data(business_task_id)
|
||||
paraformer_task_id = task_data['paraformer_task_id']
|
||||
|
||||
# 查询Paraformer任务状态
|
||||
paraformer_response = Transcription.fetch(task=paraformer_task_id)
|
||||
|
||||
if paraformer_response.status_code != HTTPStatus.OK:
|
||||
print(f"Failed to fetch task status: {paraformer_response.message}")
|
||||
current_status = 'failed'
|
||||
progress = 0
|
||||
error_message = paraformer_response.message
|
||||
else:
|
||||
# 映射Paraformer状态到业务状态
|
||||
|
||||
# 2. 查询外部API获取状态
|
||||
try:
|
||||
paraformer_response = Transcription.fetch(task=paraformer_task_id)
|
||||
if paraformer_response.status_code != HTTPStatus.OK:
|
||||
raise Exception(f"Failed to fetch task status from provider: {paraformer_response.message}")
|
||||
|
||||
paraformer_status = paraformer_response.output.task_status
|
||||
current_status = self._map_paraformer_status(paraformer_status)
|
||||
progress = self._calculate_progress(paraformer_status)
|
||||
error_message = None
|
||||
|
||||
# 如果任务完成,处理结果
|
||||
if current_status == 'completed' and paraformer_response.output.get('results'):
|
||||
error_message = None #执行成功,清除初始状态
|
||||
|
||||
except Exception as e:
|
||||
current_status = 'failed'
|
||||
progress = 0
|
||||
error_message = f"Error fetching status from provider: {e}"
|
||||
# 直接进入finally块更新状态后返回
|
||||
return
|
||||
|
||||
# 3. 如果任务完成,处理结果
|
||||
if current_status == 'completed' and paraformer_response.output.get('results'):
|
||||
try:
|
||||
self._process_transcription_result(
|
||||
business_task_id,
|
||||
int(task_data['meeting_id']),
|
||||
paraformer_response.output
|
||||
)
|
||||
except Exception as e:
|
||||
current_status = 'failed'
|
||||
progress = 100 # 进度为100,但状态是失败
|
||||
error_message = f"Error processing transcription result: {e}"
|
||||
print(error_message)
|
||||
|
||||
except Exception as e:
|
||||
error_message = f"Error getting task status: {e}"
|
||||
print(error_message)
|
||||
current_status = 'failed'
|
||||
progress = 0
|
||||
|
||||
finally:
|
||||
# 4. 更新Redis和数据库状态
|
||||
updated_at = datetime.now().isoformat()
|
||||
|
||||
# 更新Redis中的状态
|
||||
# 更新Redis
|
||||
update_data = {
|
||||
'status': current_status,
|
||||
'progress': str(progress),
|
||||
'updated_at': datetime.now().isoformat()
|
||||
'updated_at': updated_at
|
||||
}
|
||||
if error_message:
|
||||
update_data['error_message'] = error_message
|
||||
|
||||
self.redis_client.hset(f"task:{business_task_id}", mapping=update_data)
|
||||
|
||||
# 更新数据库中的状态
|
||||
|
||||
# 更新数据库
|
||||
self._update_task_status_in_db(business_task_id, current_status, progress, error_message)
|
||||
|
||||
return {
|
||||
|
||||
# 5. 构造并返回最终结果
|
||||
result = {
|
||||
'task_id': business_task_id,
|
||||
'status': current_status,
|
||||
'progress': progress,
|
||||
'meeting_id': int(task_data['meeting_id']),
|
||||
'created_at': task_data.get('created_at'),
|
||||
'updated_at': update_data['updated_at'],
|
||||
'error_message': error_message
|
||||
'error_message': error_message,
|
||||
'updated_at': updated_at,
|
||||
'meeting_id': None,
|
||||
'created_at': None,
|
||||
}
|
||||
if task_data:
|
||||
result['meeting_id'] = int(task_data['meeting_id'])
|
||||
result['created_at'] = task_data.get('created_at')
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error getting task status: {e}")
|
||||
raise e
|
||||
|
||||
return result
|
||||
|
||||
def _get_task_data(self, business_task_id: str) -> Dict[str, Any]:
|
||||
"""从Redis或数据库获取任务数据"""
|
||||
# 尝试从Redis获取
|
||||
task_data_bytes = self.redis_client.hgetall(f"task:{business_task_id}")
|
||||
if task_data_bytes and task_data_bytes.get(b'paraformer_task_id'):
|
||||
# Redis返回的是bytes,需要解码
|
||||
return {k.decode('utf-8'): v.decode('utf-8') for k, v in task_data_bytes.items()}
|
||||
|
||||
# 如果Redis没有,从数据库回源
|
||||
task_data_from_db = self._get_task_from_db(business_task_id)
|
||||
if not task_data_from_db or not task_data_from_db.get('paraformer_task_id'):
|
||||
raise Exception("Task not found in DB or paraformer_task_id is missing")
|
||||
|
||||
# 将从DB获取的数据缓存回Redis
|
||||
self.redis_client.hset(f"task:{business_task_id}", mapping=task_data_from_db)
|
||||
self.redis_client.expire(f"task:{business_task_id}", 86400)
|
||||
|
||||
return task_data_from_db
|
||||
|
||||
def get_meeting_transcription_status(self, meeting_id: int) -> Optional[Dict[str, Any]]:
|
||||
"""
|
||||
获取会议的转录任务状态
|
||||
|
|
@ -187,11 +226,9 @@ class AsyncTranscriptionService:
|
|||
# 如果任务还在进行中,获取最新状态
|
||||
if task_record['status'] in ['pending', 'processing']:
|
||||
try:
|
||||
latest_status = self.get_task_status(task_record['task_id'])
|
||||
return latest_status
|
||||
except Exception:
|
||||
# 如果获取最新状态失败,返回数据库中的状态
|
||||
pass
|
||||
return self.get_task_status(task_record['task_id'])
|
||||
except Exception as e:
|
||||
print(f"Failed to get latest task status for meeting {meeting_id}, returning DB status. Error: {e}")
|
||||
|
||||
return {
|
||||
'task_id': task_record['task_id'],
|
||||
|
|
@ -220,37 +257,28 @@ class AsyncTranscriptionService:
|
|||
def _calculate_progress(self, paraformer_status: str) -> int:
|
||||
"""根据Paraformer状态计算进度"""
|
||||
progress_mapping = {
|
||||
'PENDING': 0,
|
||||
'PENDING': 10,
|
||||
'RUNNING': 50,
|
||||
'SUCCEEDED': 100,
|
||||
'FAILED': 0
|
||||
}
|
||||
return progress_mapping.get(paraformer_status, 0)
|
||||
|
||||
def _save_task_to_db(self, business_task_id: str, meeting_id: int, audio_file_path: str):
|
||||
def _save_task_to_db(self, business_task_id: str, paraformer_task_id: str, meeting_id: int, audio_file_path: str):
|
||||
"""保存任务记录到数据库"""
|
||||
try:
|
||||
with get_db_connection() as connection:
|
||||
cursor = connection.cursor()
|
||||
|
||||
# 更新audio_files表关联task_id
|
||||
update_audio_query = """
|
||||
UPDATE audio_files
|
||||
SET task_id = %s
|
||||
WHERE meeting_id = %s AND file_path = %s
|
||||
"""
|
||||
cursor.execute(update_audio_query, (business_task_id, meeting_id, audio_file_path))
|
||||
|
||||
# 插入转录任务记录
|
||||
insert_task_query = """
|
||||
INSERT INTO transcript_tasks (task_id, meeting_id, status, progress, created_at)
|
||||
VALUES (%s, %s, 'pending', 0, NOW())
|
||||
INSERT INTO transcript_tasks (task_id, paraformer_task_id, meeting_id, status, progress, created_at)
|
||||
VALUES (%s, %s, %s, 'pending', 0, NOW())
|
||||
"""
|
||||
cursor.execute(insert_task_query, (business_task_id, meeting_id))
|
||||
cursor.execute(insert_task_query, (business_task_id, paraformer_task_id, meeting_id))
|
||||
|
||||
connection.commit()
|
||||
cursor.close() # 明确关闭游标
|
||||
print(f"Task record saved to database: {business_task_id}")
|
||||
cursor.close()
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error saving task to database: {e}")
|
||||
|
|
@ -262,6 +290,7 @@ class AsyncTranscriptionService:
|
|||
with get_db_connection() as connection:
|
||||
cursor = connection.cursor()
|
||||
|
||||
params = [status, progress, error_message, business_task_id]
|
||||
if status == 'completed':
|
||||
update_query = """
|
||||
UPDATE transcript_tasks
|
||||
|
|
@ -275,9 +304,9 @@ class AsyncTranscriptionService:
|
|||
WHERE task_id = %s
|
||||
"""
|
||||
|
||||
cursor.execute(update_query, (status, progress, error_message, business_task_id))
|
||||
cursor.execute(update_query, tuple(params))
|
||||
connection.commit()
|
||||
cursor.close() # 明确关闭游标
|
||||
cursor.close()
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error updating task status in database: {e}")
|
||||
|
|
@ -289,21 +318,22 @@ class AsyncTranscriptionService:
|
|||
cursor = connection.cursor(dictionary=True)
|
||||
|
||||
query = """
|
||||
SELECT tt.task_id as business_task_id, tt.meeting_id, tt.status
|
||||
SELECT tt.task_id as business_task_id, tt.paraformer_task_id, tt.meeting_id, tt.status, tt.created_at
|
||||
FROM transcript_tasks tt
|
||||
WHERE tt.task_id = %s
|
||||
"""
|
||||
cursor.execute(query, (business_task_id,))
|
||||
result = cursor.fetchone()
|
||||
cursor.close() # 明确关闭游标
|
||||
cursor.close()
|
||||
|
||||
if result:
|
||||
# 转换为字符串格式以保持一致性
|
||||
# 转换为与Redis一致的字符串格式
|
||||
return {
|
||||
'business_task_id': result['business_task_id'],
|
||||
'paraformer_task_id': '', # 数据库中没有存储,需要从Redis获取
|
||||
'paraformer_task_id': result['paraformer_task_id'],
|
||||
'meeting_id': str(result['meeting_id']),
|
||||
'status': result['status']
|
||||
'status': result['status'],
|
||||
'created_at': result['created_at'].isoformat() if result['created_at'] else None
|
||||
}
|
||||
return None
|
||||
|
||||
|
|
@ -312,11 +342,13 @@ class AsyncTranscriptionService:
|
|||
return None
|
||||
|
||||
def _process_transcription_result(self, business_task_id: str, meeting_id: int, paraformer_output: Any):
|
||||
"""处理转录结果"""
|
||||
"""
|
||||
处理转录结果.
|
||||
如果处理失败,此函数会抛出异常.
|
||||
"""
|
||||
try:
|
||||
if not paraformer_output.get('results'):
|
||||
print("No transcription results found in the response.")
|
||||
return
|
||||
raise Exception("No transcription results found in the provider response.")
|
||||
|
||||
transcription_url = paraformer_output['results'][0]['transcription_url']
|
||||
print(f"Fetching transcription from URL: {transcription_url}")
|
||||
|
|
@ -331,7 +363,9 @@ class AsyncTranscriptionService:
|
|||
print(f"Transcription result processed for task: {business_task_id}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error processing transcription result: {e}")
|
||||
# 记录具体错误并重新抛出,以便上层捕获
|
||||
print(f"Error processing transcription result for task {business_task_id}: {e}")
|
||||
raise
|
||||
|
||||
def _save_segments_to_db(self, data: dict, meeting_id: int):
|
||||
"""保存转录分段到数据库"""
|
||||
|
|
@ -368,8 +402,9 @@ class AsyncTranscriptionService:
|
|||
'''
|
||||
cursor.executemany(insert_query, segments_to_insert)
|
||||
connection.commit()
|
||||
cursor.close() # 明确关闭游标
|
||||
cursor.close()
|
||||
print(f"Successfully saved {len(segments_to_insert)} segments to the database for meeting_id: {meeting_id}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"Database error when saving segments: {e}")
|
||||
print(f"Database error when saving segments: {e}")
|
||||
raise
|
||||
|
|
|
|||
|
|
@ -0,0 +1,41 @@
|
|||
#!/bin/bash
|
||||
|
||||
echo "🚀 开始部署iMeeting后端服务..."
|
||||
|
||||
# 检查虚拟环境
|
||||
if [ ! -d "venv" ]; then
|
||||
echo "❌ 虚拟环境不存在!请先创建虚拟环境"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
echo "✅ 发现虚拟环境,继续部署..."
|
||||
|
||||
# 停止并删除现有容器
|
||||
echo "📦 停止现有容器..."
|
||||
docker-compose -f docker-compose.prod.yml down
|
||||
|
||||
# 构建新镜像
|
||||
echo "🔨 构建Docker镜像..."
|
||||
docker-compose -f docker-compose.prod.yml build --no-cache
|
||||
|
||||
# 启动服务
|
||||
echo "▶️ 启动服务..."
|
||||
docker-compose -f docker-compose.prod.yml up -d
|
||||
|
||||
# 检查服务状态
|
||||
echo "🔍 检查服务状态..."
|
||||
sleep 10
|
||||
docker-compose -f docker-compose.prod.yml ps
|
||||
|
||||
# 检查健康状态
|
||||
echo "🏥 检查健康状态..."
|
||||
curl -f http://localhost:8001/health && echo "✅ 后端服务健康检查通过" || echo "❌ 后端服务健康检查失败"
|
||||
|
||||
echo ""
|
||||
echo "🎉 部署完成!"
|
||||
echo "🔧 后端服务访问地址: http://localhost:8001"
|
||||
echo "📊 查看日志: docker-compose -f docker-compose.prod.yml logs -f"
|
||||
echo "🛑 停止服务: docker-compose -f docker-compose.prod.yml down"
|
||||
echo ""
|
||||
echo "💡 提示:如需更新后端,请:"
|
||||
echo " 1. 修改代码后运行 ./deploy-prod.sh 重新部署"
|
||||
|
|
@ -0,0 +1,59 @@
|
|||
version: '3.8'
|
||||
|
||||
services:
|
||||
imeeting-backend:
|
||||
build:
|
||||
context: .
|
||||
dockerfile: Dockerfile
|
||||
ports:
|
||||
- "8001:8001"
|
||||
environment:
|
||||
# Python运行环境
|
||||
- PYTHONPATH=/app
|
||||
- PYTHONUNBUFFERED=1
|
||||
|
||||
# 数据库配置
|
||||
- DB_HOST=host.docker.internal
|
||||
- DB_USER=root
|
||||
- DB_PASSWORD=sagacity
|
||||
- DB_NAME=imeeting
|
||||
- DB_PORT=3306
|
||||
|
||||
# Redis配置
|
||||
- REDIS_HOST=host.docker.internal
|
||||
- REDIS_PORT=6379
|
||||
- REDIS_DB=6
|
||||
- REDIS_PASSWORD=
|
||||
|
||||
# API配置
|
||||
- API_HOST=0.0.0.0
|
||||
- API_PORT=8001
|
||||
|
||||
# 应用配置
|
||||
- BASE_URL=http://imeeting.unisspace.com
|
||||
|
||||
# 七牛云配置
|
||||
- QINIU_ACCESS_KEY=A0tp96HCtg-wZCughTgi5vc2pJnw3btClwxRE_e8
|
||||
- QINIU_SECRET_KEY=Lj-MSHpaVbmzpS86kMIjmwikvYOT9iPBjCk9hm6k
|
||||
- QINIU_BUCKET=imeeting
|
||||
- QINIU_DOMAIN=t0vogyxkz.hn-bkt.clouddn.com
|
||||
|
||||
# LLM配置
|
||||
- QWEN_API_KEY=sk-c2bf06ea56b4491ea3d1e37fdb472b8f
|
||||
- LLM_MODEL_NAME=qwen-plus
|
||||
- LLM_API_URL=https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation
|
||||
- LLM_MAX_TOKENS=2000
|
||||
- LLM_TEMPERATURE=0.7
|
||||
- LLM_TOP_P=0.9
|
||||
|
||||
volumes:
|
||||
# 挂载上传目录保持数据持久化
|
||||
- ./uploads:/app/uploads
|
||||
restart: unless-stopped
|
||||
container_name: imeeting-backend
|
||||
healthcheck:
|
||||
test: ["CMD", "curl", "-f", "http://localhost:8001/health"]
|
||||
interval: 30s
|
||||
timeout: 10s
|
||||
retries: 3
|
||||
start_period: 40s
|
||||
28
main.py
28
main.py
|
|
@ -1,21 +1,22 @@
|
|||
import uvicorn
|
||||
from fastapi import FastAPI
|
||||
from fastapi import FastAPI, Request, HTTPException
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from fastapi.staticfiles import StaticFiles
|
||||
from app.api.endpoints import auth, users, meetings
|
||||
from app.core.config import UPLOAD_DIR, API_CONFIG
|
||||
from app.core.config import UPLOAD_DIR, API_CONFIG, MAX_FILE_SIZE
|
||||
from app.services.async_llm_service import async_llm_service
|
||||
import os
|
||||
|
||||
app = FastAPI(
|
||||
title="iMeeting API",
|
||||
description="智慧会议系统API",
|
||||
version="1.0.0"
|
||||
version="1.0.2"
|
||||
)
|
||||
|
||||
# 添加CORS中间件
|
||||
app.add_middleware(
|
||||
CORSMiddleware,
|
||||
allow_origins=API_CONFIG['cors_origins'],
|
||||
# allow_origins=API_CONFIG['cors_origins'],
|
||||
allow_credentials=True,
|
||||
allow_methods=["*"],
|
||||
allow_headers=["*"],
|
||||
|
|
@ -34,5 +35,22 @@ app.include_router(meetings.router, prefix="/api", tags=["Meetings"])
|
|||
def read_root():
|
||||
return {"message": "Welcome to iMeeting API"}
|
||||
|
||||
@app.get("/health")
|
||||
def health_check():
|
||||
"""健康检查端点"""
|
||||
return {
|
||||
"status": "healthy",
|
||||
"service": "iMeeting API",
|
||||
"version": "1.0.0"
|
||||
}
|
||||
|
||||
if __name__ == "__main__":
|
||||
uvicorn.run(app, host=API_CONFIG['host'], port=API_CONFIG['port'])
|
||||
# 简单的uvicorn配置,避免参数冲突
|
||||
uvicorn.run(
|
||||
"main:app",
|
||||
host=API_CONFIG['host'],
|
||||
port=API_CONFIG['port'],
|
||||
limit_max_requests=1000,
|
||||
timeout_keep_alive=30,
|
||||
reload=True,
|
||||
)
|
||||
|
|
@ -0,0 +1,17 @@
|
|||
# Core Application Framework
|
||||
fastapi
|
||||
uvicorn
|
||||
|
||||
# Database & Cache
|
||||
mysql-connector-python
|
||||
redis
|
||||
|
||||
# Services & External APIs
|
||||
requests
|
||||
dashscope
|
||||
PyJWT
|
||||
qiniu
|
||||
|
||||
# Validation & Forms
|
||||
email-validator
|
||||
python-multipart
|
||||
|
|
@ -0,0 +1,24 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
创建一个新的测试任务
|
||||
"""
|
||||
import sys
|
||||
import os
|
||||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||||
|
||||
from app.services.async_llm_service import AsyncLLMService
|
||||
|
||||
# 创建服务实例
|
||||
service = AsyncLLMService()
|
||||
|
||||
# 创建测试任务
|
||||
meeting_id = 38
|
||||
user_prompt = "请重点关注决策事项和待办任务"
|
||||
|
||||
print("创建新任务...")
|
||||
task_id = service.start_summary_generation(meeting_id, user_prompt)
|
||||
print(f"✅ 任务创建成功: {task_id}")
|
||||
|
||||
# 获取任务状态
|
||||
status = service.get_task_status(task_id)
|
||||
print(f"任务状态: {status}")
|
||||
|
|
@ -0,0 +1,58 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
测试Redis连接和LLM任务队列
|
||||
"""
|
||||
import sys
|
||||
import os
|
||||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||||
|
||||
import redis
|
||||
from app.core.config import REDIS_CONFIG
|
||||
|
||||
# 连接Redis
|
||||
redis_client = redis.Redis(**REDIS_CONFIG)
|
||||
|
||||
try:
|
||||
# 测试连接
|
||||
redis_client.ping()
|
||||
print("✅ Redis连接成功")
|
||||
|
||||
# 检查任务队列
|
||||
queue_length = redis_client.llen("llm_task_queue")
|
||||
print(f"📋 当前任务队列长度: {queue_length}")
|
||||
|
||||
# 检查所有LLM任务
|
||||
keys = redis_client.keys("llm_task:*")
|
||||
print(f"📊 当前存在的LLM任务: {len(keys)} 个")
|
||||
|
||||
for key in keys:
|
||||
task_data = redis_client.hgetall(key)
|
||||
# key可能是bytes或str
|
||||
if isinstance(key, bytes):
|
||||
task_id = key.decode('utf-8').replace('llm_task:', '')
|
||||
else:
|
||||
task_id = key.replace('llm_task:', '')
|
||||
|
||||
# 获取状态和进度
|
||||
status = task_data.get(b'status', task_data.get('status', 'unknown'))
|
||||
if isinstance(status, bytes):
|
||||
status = status.decode('utf-8')
|
||||
|
||||
progress = task_data.get(b'progress', task_data.get('progress', '0'))
|
||||
if isinstance(progress, bytes):
|
||||
progress = progress.decode('utf-8')
|
||||
|
||||
print(f" - 任务 {task_id[:8]}... 状态: {status}, 进度: {progress}%")
|
||||
|
||||
# 如果任务是pending,重新推送到队列
|
||||
if status == 'pending':
|
||||
print(f" 🔄 发现pending任务,重新推送到队列...")
|
||||
redis_client.lpush("llm_task_queue", task_id)
|
||||
print(f" ✅ 任务 {task_id[:8]}... 已重新推送到队列")
|
||||
|
||||
except redis.ConnectionError as e:
|
||||
print(f"❌ Redis连接失败: {e}")
|
||||
except Exception as e:
|
||||
print(f"❌ 错误: {e}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
|
|
@ -0,0 +1,37 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
测试worker线程是否正常工作
|
||||
"""
|
||||
import sys
|
||||
import os
|
||||
import time
|
||||
import threading
|
||||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||||
|
||||
from app.services.async_llm_service import AsyncLLMService
|
||||
|
||||
# 创建服务实例
|
||||
service = AsyncLLMService()
|
||||
|
||||
# 直接调用处理任务方法测试
|
||||
print("测试直接调用_process_tasks方法...")
|
||||
|
||||
# 设置worker_running为True
|
||||
service.worker_running = True
|
||||
|
||||
# 创建线程并启动
|
||||
thread = threading.Thread(target=service._process_tasks)
|
||||
thread.daemon = False # 不设置为daemon,确保能看到输出
|
||||
thread.start()
|
||||
|
||||
print(f"线程是否活动: {thread.is_alive()}")
|
||||
print("等待5秒...")
|
||||
|
||||
# 等待一段时间
|
||||
time.sleep(5)
|
||||
|
||||
# 停止worker
|
||||
service.worker_running = False
|
||||
thread.join(timeout=10)
|
||||
|
||||
print("测试完成")
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Loading…
Reference in New Issue