diff --git a/.DS_Store b/.DS_Store index 96a084f..0bd8332 100644 Binary files a/.DS_Store and b/.DS_Store differ diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..c5bd5d0 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,19 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Python Debugger: FastAPI", + "type": "debugpy", + "request": "launch", + "module": "uvicorn", + "args": [ + "main:app", + "--reload" + ], + "jinja": true + } + ] +} \ No newline at end of file diff --git a/app/api/endpoints/meetings.py b/app/api/endpoints/meetings.py index e7769dd..d325eef 100644 --- a/app/api/endpoints/meetings.py +++ b/app/api/endpoints/meetings.py @@ -1,12 +1,14 @@ -from fastapi import APIRouter, HTTPException, UploadFile, File, Form, Depends +from fastapi import APIRouter, HTTPException, UploadFile, File, Form, Depends, BackgroundTasks from app.models.models import Meeting, TranscriptSegment, TranscriptionTaskStatus, CreateMeetingRequest, UpdateMeetingRequest, SpeakerTagUpdateRequest, BatchSpeakerTagUpdateRequest, TranscriptUpdateRequest, BatchTranscriptUpdateRequest from app.core.database import get_db_connection from app.core.config import BASE_DIR, UPLOAD_DIR, AUDIO_DIR, MARKDOWN_DIR, ALLOWED_EXTENSIONS, ALLOWED_IMAGE_EXTENSIONS, MAX_FILE_SIZE, MAX_IMAGE_SIZE from app.services.qiniu_service import qiniu_service from app.services.llm_service import LLMService from app.services.async_transcription_service import AsyncTranscriptionService +from app.services.async_llm_service import async_llm_service from app.core.auth import get_current_user, get_optional_current_user from typing import Optional +from datetime import datetime from pydantic import BaseModel import os import uuid @@ -18,6 +20,9 @@ router = APIRouter() llm_service = LLMService() transcription_service = AsyncTranscriptionService() +# 注意:异步LLM服务需要单独启动worker进程 +# 运行命令:python llm_worker.py + # 请求模型 class GenerateSummaryRequest(BaseModel): user_prompt: Optional[str] = "" @@ -179,14 +184,15 @@ def create_meeting(meeting_request: CreateMeetingRequest, current_user: dict = D # Create meeting meeting_query = ''' - INSERT INTO meetings (user_id, title, meeting_time, summary) - VALUES (%s, %s, %s, %s) + INSERT INTO meetings (user_id, title, meeting_time, summary,created_at) + VALUES (%s, %s, %s, %s, %s) ''' cursor.execute(meeting_query, ( meeting_request.user_id, meeting_request.title, meeting_request.meeting_time, - None # summary starts as None + None, + datetime.now().isoformat() )) meeting_id = cursor.lastrowid @@ -268,42 +274,6 @@ def delete_meeting(meeting_id: int, current_user: dict = Depends(get_current_use connection.commit() return {"message": "Meeting deleted successfully"} -@router.post("/meetings/{meeting_id}/regenerate-summary") -def regenerate_summary(meeting_id: int, current_user: dict = Depends(get_current_user)): - with get_db_connection() as connection: - cursor = connection.cursor(dictionary=True) - - # Check if meeting exists - cursor.execute("SELECT meeting_id FROM meetings WHERE meeting_id = %s", (meeting_id,)) - if not cursor.fetchone(): - raise HTTPException(status_code=404, detail="Meeting not found") - - # For now, return a mock summary - # In a real implementation, this would call an AI service - mock_summary = """# AI 生成摘要 - -## 主要议题 -- 项目进度回顾 -- 技术方案讨论 -- 下阶段规划 - -## 关键决策 -- 采用新的技术架构 -- 调整项目时间节点 -- 分配任务责任 - -## 后续行动 -- [ ] 完成技术方案文档 -- [ ] 安排下次会议时间 -- [ ] 跟进项目进度""" - - # Update meeting summary - update_query = "UPDATE meetings SET summary = %s WHERE meeting_id = %s" - cursor.execute(update_query, (mock_summary, meeting_id)) - connection.commit() - - return {"message": "Summary regenerated successfully", "summary": mock_summary} - @router.get("/meetings/{meeting_id}/edit", response_model=Meeting) def get_meeting_for_edit(meeting_id: int, current_user: dict = Depends(get_current_user)): """获取会议信息用于编辑""" @@ -834,4 +804,105 @@ def get_summary_detail(meeting_id: int, summary_id: int, current_user: dict = De except HTTPException: raise except Exception as e: - raise HTTPException(status_code=500, detail=f"Failed to get summary detail: {str(e)}") \ No newline at end of file + raise HTTPException(status_code=500, detail=f"Failed to get summary detail: {str(e)}") + +# ==================== 异步LLM总结相关接口 ==================== + +@router.post("/meetings/{meeting_id}/generate-summary-async") +def generate_meeting_summary_async(meeting_id: int, request: GenerateSummaryRequest, background_tasks: BackgroundTasks, current_user: dict = Depends(get_current_user)): + """生成会议AI总结(异步版本)""" + try: + # 检查会议是否存在 + with get_db_connection() as connection: + cursor = connection.cursor(dictionary=True) + cursor.execute("SELECT meeting_id FROM meetings WHERE meeting_id = %s", (meeting_id,)) + if not cursor.fetchone(): + raise HTTPException(status_code=404, detail="Meeting not found") + + # 启动异步任务 + task_id = async_llm_service.start_summary_generation(meeting_id, request.user_prompt) + + # 将真正的处理函数作为后台任务添加 + background_tasks.add_task(async_llm_service._process_task, task_id) + + return { + "message": "Summary generation task has been accepted.", + "task_id": task_id, + "status": "pending", + "meeting_id": meeting_id + } + + except HTTPException: + raise + except Exception as e: + raise HTTPException(status_code=500, detail=f"Failed to start summary generation: {str(e)}") + +@router.get("/llm-tasks/{task_id}/status") +def get_llm_task_status(task_id: str, current_user: dict = Depends(get_current_user)): + """获取LLM任务状态(包括进度)""" + try: + status = async_llm_service.get_task_status(task_id) + + if status.get('status') == 'not_found': + raise HTTPException(status_code=404, detail="Task not found") + + return status + + except HTTPException: + raise + except Exception as e: + raise HTTPException(status_code=500, detail=f"Failed to get task status: {str(e)}") + +@router.get("/meetings/{meeting_id}/llm-tasks") +def get_meeting_llm_tasks(meeting_id: int, current_user: dict = Depends(get_current_user)): + """获取会议的所有LLM任务历史""" + try: + # 检查会议是否存在 + with get_db_connection() as connection: + cursor = connection.cursor(dictionary=True) + cursor.execute("SELECT meeting_id FROM meetings WHERE meeting_id = %s", (meeting_id,)) + if not cursor.fetchone(): + raise HTTPException(status_code=404, detail="Meeting not found") + + # 获取任务列表 + tasks = async_llm_service.get_meeting_llm_tasks(meeting_id) + + return { + "meeting_id": meeting_id, + "tasks": tasks, + "total": len(tasks) + } + + except HTTPException: + raise + except Exception as e: + raise HTTPException(status_code=500, detail=f"Failed to get LLM tasks: {str(e)}") + +@router.get("/meetings/{meeting_id}/latest-llm-task") +def get_latest_llm_task(meeting_id: int, current_user: dict = Depends(get_current_user)): + """获取会议最新的LLM任务状态""" + try: + tasks = async_llm_service.get_meeting_llm_tasks(meeting_id) + + if not tasks: + return { + "meeting_id": meeting_id, + "task": None, + "message": "No LLM tasks found for this meeting" + } + + # 返回最新的任务 + latest_task = tasks[0] + + # 如果任务还在进行中,获取实时状态 + if latest_task['status'] in ['pending', 'processing']: + latest_status = async_llm_service.get_task_status(latest_task['task_id']) + latest_task.update(latest_status) + + return { + "meeting_id": meeting_id, + "task": latest_task + } + + except Exception as e: + raise HTTPException(status_code=500, detail=f"Failed to get latest LLM task: {str(e)}") \ No newline at end of file diff --git a/app/services/async_llm_service.py b/app/services/async_llm_service.py new file mode 100644 index 0000000..f26964e --- /dev/null +++ b/app/services/async_llm_service.py @@ -0,0 +1,215 @@ +""" +异步LLM服务 - 处理会议总结生成的异步任务 +采用FastAPI BackgroundTasks模式 +""" +import uuid +import time +from datetime import datetime +from typing import Optional, Dict, Any, List + +import redis +from app.core.config import REDIS_CONFIG +from app.core.database import get_db_connection +from app.services.llm_service import LLMService + +class AsyncLLMService: + """异步LLM服务类 - 采用FastAPI BackgroundTasks模式""" + + def __init__(self): + # 确保redis客户端自动解码响应,代码更简洁 + if 'decode_responses' not in REDIS_CONFIG: + REDIS_CONFIG['decode_responses'] = True + self.redis_client = redis.Redis(**REDIS_CONFIG) + self.llm_service = LLMService() # 复用现有的同步LLM服务 + + def start_summary_generation(self, meeting_id: int, user_prompt: str = "") -> str: + """ + 创建异步总结任务,任务的执行将由外部(如API层的BackgroundTasks)触发。 + + Args: + meeting_id: 会议ID + user_prompt: 用户额外提示词 + + Returns: + str: 任务ID + """ + try: + task_id = str(uuid.uuid4()) + + # 在数据库中创建任务记录 + self._save_task_to_db(task_id, meeting_id, user_prompt) + + # 将任务详情存入Redis,用于快速查询状态 + current_time = datetime.now().isoformat() + task_data = { + 'task_id': task_id, + 'meeting_id': str(meeting_id), + 'user_prompt': user_prompt, + 'status': 'pending', + 'progress': '0', + 'created_at': current_time, + 'updated_at': current_time + } + self.redis_client.hset(f"llm_task:{task_id}", mapping=task_data) + self.redis_client.expire(f"llm_task:{task_id}", 86400) + + print(f"LLM summary task created: {task_id} for meeting: {meeting_id}") + return task_id + + except Exception as e: + print(f"Error starting summary generation: {e}") + raise e + + def _process_task(self, task_id: str): + """ + 处理单个异步任务的函数,设计为由BackgroundTasks调用。 + """ + print(f"Background task started for LLM task: {task_id}") + try: + # 从Redis获取任务数据 + task_data = self.redis_client.hgetall(f"llm_task:{task_id}") + if not task_data: + print(f"Error: Task {task_id} not found in Redis for processing.") + return + + meeting_id = int(task_data['meeting_id']) + user_prompt = task_data.get('user_prompt', '') + + # 1. 更新状态为processing + self._update_task_status_in_redis(task_id, 'processing', 10, message="任务已开始...") + + # 2. 获取会议转录内容 + self._update_task_status_in_redis(task_id, 'processing', 30, message="获取会议转录内容...") + transcript_text = self.llm_service._get_meeting_transcript(meeting_id) + if not transcript_text: + raise Exception("无法获取会议转录内容") + + # 3. 构建提示词 + self._update_task_status_in_redis(task_id, 'processing', 40, message="准备AI提示词...") + full_prompt = self.llm_service._build_prompt(transcript_text, user_prompt) + + # 4. 调用LLM API + self._update_task_status_in_redis(task_id, 'processing', 50, message="AI正在分析会议内容...") + summary_content = self.llm_service._call_llm_api(full_prompt) + if not summary_content: + raise Exception("LLM API调用失败或返回空内容") + + # 5. 保存结果到主表 + self._update_task_status_in_redis(task_id, 'processing', 95, message="保存总结结果...") + self.llm_service._save_summary_to_db(meeting_id, summary_content, user_prompt) + + # 6. 任务完成 + self._update_task_in_db(task_id, 'completed', 100, result=summary_content) + self._update_task_status_in_redis(task_id, 'completed', 100, result=summary_content) + print(f"Task {task_id} completed successfully") + + except Exception as e: + error_msg = str(e) + print(f"Task {task_id} failed: {error_msg}") + # 更新失败状态 + self._update_task_in_db(task_id, 'failed', 0, error_message=error_msg) + self._update_task_status_in_redis(task_id, 'failed', 0, error_message=error_msg) + + # --- 状态查询和数据库操作方法 --- + + def get_task_status(self, task_id: str) -> Dict[str, Any]: + """获取任务状态""" + try: + task_data = self.redis_client.hgetall(f"llm_task:{task_id}") + if not task_data: + task_data = self._get_task_from_db(task_id) + if not task_data: + return {'task_id': task_id, 'status': 'not_found', 'error_message': 'Task not found'} + + return { + 'task_id': task_id, + 'status': task_data.get('status', 'unknown'), + 'progress': int(task_data.get('progress', 0)), + 'meeting_id': int(task_data.get('meeting_id', 0)), + 'created_at': task_data.get('created_at'), + 'updated_at': task_data.get('updated_at'), + 'result': task_data.get('result'), + 'error_message': task_data.get('error_message') + } + except Exception as e: + print(f"Error getting task status: {e}") + return {'task_id': task_id, 'status': 'error', 'error_message': str(e)} + + def get_meeting_llm_tasks(self, meeting_id: int) -> List[Dict[str, Any]]: + """获取会议的所有LLM任务""" + try: + with get_db_connection() as connection: + cursor = connection.cursor(dictionary=True) + query = "SELECT task_id, status, progress, user_prompt, created_at, completed_at, error_message FROM llm_tasks WHERE meeting_id = %s ORDER BY created_at DESC" + cursor.execute(query, (meeting_id,)) + tasks = cursor.fetchall() + for task in tasks: + if task.get('created_at'): task['created_at'] = task['created_at'].isoformat() + if task.get('completed_at'): task['completed_at'] = task['completed_at'].isoformat() + return tasks + except Exception as e: + print(f"Error getting meeting LLM tasks: {e}") + return [] + + def _update_task_status_in_redis(self, task_id: str, status: str, progress: int, message: str = None, result: str = None, error_message: str = None): + """更新Redis中的任务状态""" + try: + update_data = { + 'status': status, + 'progress': str(progress), + 'updated_at': datetime.now().isoformat() + } + if message: update_data['message'] = message + if result: update_data['result'] = result + if error_message: update_data['error_message'] = error_message + self.redis_client.hset(f"llm_task:{task_id}", mapping=update_data) + except Exception as e: + print(f"Error updating task status in Redis: {e}") + + def _save_task_to_db(self, task_id: str, meeting_id: int, user_prompt: str): + """保存任务到数据库""" + try: + with get_db_connection() as connection: + cursor = connection.cursor() + insert_query = "INSERT INTO llm_tasks (task_id, meeting_id, user_prompt, status, progress, created_at) VALUES (%s, %s, %s, 'pending', 0, NOW())" + cursor.execute(insert_query, (task_id, meeting_id, user_prompt)) + connection.commit() + except Exception as e: + print(f"Error saving task to database: {e}") + raise + + def _update_task_in_db(self, task_id: str, status: str, progress: int, result: str = None, error_message: str = None): + """更新数据库中的任务状态""" + try: + with get_db_connection() as connection: + cursor = connection.cursor() + params = [status, progress, error_message, task_id] + if status == 'completed': + query = "UPDATE llm_tasks SET status = %s, progress = %s, error_message = %s, result = %s, completed_at = NOW() WHERE task_id = %s" + params.insert(2, result) + else: + query = "UPDATE llm_tasks SET status = %s, progress = %s, error_message = %s WHERE task_id = %s" + + cursor.execute(query, tuple(params)) + connection.commit() + except Exception as e: + print(f"Error updating task in database: {e}") + + def _get_task_from_db(self, task_id: str) -> Optional[Dict[str, str]]: + """从数据库获取任务信息""" + try: + with get_db_connection() as connection: + cursor = connection.cursor(dictionary=True) + query = "SELECT * FROM llm_tasks WHERE task_id = %s" + cursor.execute(query, (task_id,)) + task = cursor.fetchone() + if task: + # 确保所有字段都是字符串,以匹配Redis的行为 + return {k: v.isoformat() if isinstance(v, datetime) else str(v) for k, v in task.items()} + return None + except Exception as e: + print(f"Error getting task from database: {e}") + return None + +# 创建全局实例 +async_llm_service = AsyncLLMService() diff --git a/app/services/async_transcription_service.py b/app/services/async_transcription_service.py index c3cdafa..1db34f5 100644 --- a/app/services/async_transcription_service.py +++ b/app/services/async_transcription_service.py @@ -56,6 +56,7 @@ class AsyncTranscriptionService: business_task_id = str(uuid.uuid4()) # 在Redis中存储任务映射 + current_time = datetime.now().isoformat() task_data = { 'business_task_id': business_task_id, 'paraformer_task_id': paraformer_task_id, @@ -63,8 +64,8 @@ class AsyncTranscriptionService: 'file_url': file_url, 'status': 'pending', 'progress': '0', - 'created_at': datetime.now().isoformat(), - 'updated_at': datetime.now().isoformat() + 'created_at': current_time, + 'updated_at': current_time } # 存储到Redis,过期时间24小时 @@ -72,7 +73,7 @@ class AsyncTranscriptionService: self.redis_client.expire(f"task:{business_task_id}", 86400) # 在数据库中创建任务记录 - self._save_task_to_db(business_task_id, meeting_id, audio_file_path) + self._save_task_to_db(business_task_id, paraformer_task_id, meeting_id, audio_file_path) print(f"Transcription task created: {business_task_id}") return business_task_id @@ -91,68 +92,106 @@ class AsyncTranscriptionService: Returns: Dict: 任务状态信息 """ + task_data = None + current_status = 'failed' + progress = 0 + error_message = "An unknown error occurred." + try: - # 从Redis获取任务信息 - task_data = self.redis_client.hgetall(f"task:{business_task_id}") - if not task_data: - # 尝试从数据库获取 - task_data = self._get_task_from_db(business_task_id) - if not task_data: - raise Exception("Task not found") - + # 1. 获取任务数据(优先Redis,回源DB) + task_data = self._get_task_data(business_task_id) paraformer_task_id = task_data['paraformer_task_id'] - - # 查询Paraformer任务状态 - paraformer_response = Transcription.fetch(task=paraformer_task_id) - - if paraformer_response.status_code != HTTPStatus.OK: - print(f"Failed to fetch task status: {paraformer_response.message}") - current_status = 'failed' - progress = 0 - error_message = paraformer_response.message - else: - # 映射Paraformer状态到业务状态 + + # 2. 查询外部API获取状态 + try: + paraformer_response = Transcription.fetch(task=paraformer_task_id) + if paraformer_response.status_code != HTTPStatus.OK: + raise Exception(f"Failed to fetch task status from provider: {paraformer_response.message}") + paraformer_status = paraformer_response.output.task_status current_status = self._map_paraformer_status(paraformer_status) progress = self._calculate_progress(paraformer_status) - error_message = None - - # 如果任务完成,处理结果 - if current_status == 'completed' and paraformer_response.output.get('results'): + error_message = None #执行成功,清除初始状态 + + except Exception as e: + current_status = 'failed' + progress = 0 + error_message = f"Error fetching status from provider: {e}" + # 直接进入finally块更新状态后返回 + return + + # 3. 如果任务完成,处理结果 + if current_status == 'completed' and paraformer_response.output.get('results'): + try: self._process_transcription_result( business_task_id, int(task_data['meeting_id']), paraformer_response.output ) + except Exception as e: + current_status = 'failed' + progress = 100 # 进度为100,但状态是失败 + error_message = f"Error processing transcription result: {e}" + print(error_message) + + except Exception as e: + error_message = f"Error getting task status: {e}" + print(error_message) + current_status = 'failed' + progress = 0 + + finally: + # 4. 更新Redis和数据库状态 + updated_at = datetime.now().isoformat() - # 更新Redis中的状态 + # 更新Redis update_data = { 'status': current_status, 'progress': str(progress), - 'updated_at': datetime.now().isoformat() + 'updated_at': updated_at } if error_message: update_data['error_message'] = error_message - self.redis_client.hset(f"task:{business_task_id}", mapping=update_data) - - # 更新数据库中的状态 + + # 更新数据库 self._update_task_status_in_db(business_task_id, current_status, progress, error_message) - - return { + + # 5. 构造并返回最终结果 + result = { 'task_id': business_task_id, 'status': current_status, 'progress': progress, - 'meeting_id': int(task_data['meeting_id']), - 'created_at': task_data.get('created_at'), - 'updated_at': update_data['updated_at'], - 'error_message': error_message + 'error_message': error_message, + 'updated_at': updated_at, + 'meeting_id': None, + 'created_at': None, } + if task_data: + result['meeting_id'] = int(task_data['meeting_id']) + result['created_at'] = task_data.get('created_at') - except Exception as e: - print(f"Error getting task status: {e}") - raise e - + return result + + def _get_task_data(self, business_task_id: str) -> Dict[str, Any]: + """从Redis或数据库获取任务数据""" + # 尝试从Redis获取 + task_data_bytes = self.redis_client.hgetall(f"task:{business_task_id}") + if task_data_bytes and task_data_bytes.get(b'paraformer_task_id'): + # Redis返回的是bytes,需要解码 + return {k.decode('utf-8'): v.decode('utf-8') for k, v in task_data_bytes.items()} + + # 如果Redis没有,从数据库回源 + task_data_from_db = self._get_task_from_db(business_task_id) + if not task_data_from_db or not task_data_from_db.get('paraformer_task_id'): + raise Exception("Task not found in DB or paraformer_task_id is missing") + + # 将从DB获取的数据缓存回Redis + self.redis_client.hset(f"task:{business_task_id}", mapping=task_data_from_db) + self.redis_client.expire(f"task:{business_task_id}", 86400) + + return task_data_from_db + def get_meeting_transcription_status(self, meeting_id: int) -> Optional[Dict[str, Any]]: """ 获取会议的转录任务状态 @@ -187,11 +226,9 @@ class AsyncTranscriptionService: # 如果任务还在进行中,获取最新状态 if task_record['status'] in ['pending', 'processing']: try: - latest_status = self.get_task_status(task_record['task_id']) - return latest_status - except Exception: - # 如果获取最新状态失败,返回数据库中的状态 - pass + return self.get_task_status(task_record['task_id']) + except Exception as e: + print(f"Failed to get latest task status for meeting {meeting_id}, returning DB status. Error: {e}") return { 'task_id': task_record['task_id'], @@ -220,37 +257,28 @@ class AsyncTranscriptionService: def _calculate_progress(self, paraformer_status: str) -> int: """根据Paraformer状态计算进度""" progress_mapping = { - 'PENDING': 0, + 'PENDING': 10, 'RUNNING': 50, 'SUCCEEDED': 100, 'FAILED': 0 } return progress_mapping.get(paraformer_status, 0) - def _save_task_to_db(self, business_task_id: str, meeting_id: int, audio_file_path: str): + def _save_task_to_db(self, business_task_id: str, paraformer_task_id: str, meeting_id: int, audio_file_path: str): """保存任务记录到数据库""" try: with get_db_connection() as connection: cursor = connection.cursor() - # 更新audio_files表关联task_id - update_audio_query = """ - UPDATE audio_files - SET task_id = %s - WHERE meeting_id = %s AND file_path = %s - """ - cursor.execute(update_audio_query, (business_task_id, meeting_id, audio_file_path)) - # 插入转录任务记录 insert_task_query = """ - INSERT INTO transcript_tasks (task_id, meeting_id, status, progress, created_at) - VALUES (%s, %s, 'pending', 0, NOW()) + INSERT INTO transcript_tasks (task_id, paraformer_task_id, meeting_id, status, progress, created_at) + VALUES (%s, %s, %s, 'pending', 0, NOW()) """ - cursor.execute(insert_task_query, (business_task_id, meeting_id)) + cursor.execute(insert_task_query, (business_task_id, paraformer_task_id, meeting_id)) connection.commit() - cursor.close() # 明确关闭游标 - print(f"Task record saved to database: {business_task_id}") + cursor.close() except Exception as e: print(f"Error saving task to database: {e}") @@ -262,6 +290,7 @@ class AsyncTranscriptionService: with get_db_connection() as connection: cursor = connection.cursor() + params = [status, progress, error_message, business_task_id] if status == 'completed': update_query = """ UPDATE transcript_tasks @@ -275,9 +304,9 @@ class AsyncTranscriptionService: WHERE task_id = %s """ - cursor.execute(update_query, (status, progress, error_message, business_task_id)) + cursor.execute(update_query, tuple(params)) connection.commit() - cursor.close() # 明确关闭游标 + cursor.close() except Exception as e: print(f"Error updating task status in database: {e}") @@ -289,21 +318,22 @@ class AsyncTranscriptionService: cursor = connection.cursor(dictionary=True) query = """ - SELECT tt.task_id as business_task_id, tt.meeting_id, tt.status + SELECT tt.task_id as business_task_id, tt.paraformer_task_id, tt.meeting_id, tt.status, tt.created_at FROM transcript_tasks tt WHERE tt.task_id = %s """ cursor.execute(query, (business_task_id,)) result = cursor.fetchone() - cursor.close() # 明确关闭游标 + cursor.close() if result: - # 转换为字符串格式以保持一致性 + # 转换为与Redis一致的字符串格式 return { 'business_task_id': result['business_task_id'], - 'paraformer_task_id': '', # 数据库中没有存储,需要从Redis获取 + 'paraformer_task_id': result['paraformer_task_id'], 'meeting_id': str(result['meeting_id']), - 'status': result['status'] + 'status': result['status'], + 'created_at': result['created_at'].isoformat() if result['created_at'] else None } return None @@ -312,11 +342,13 @@ class AsyncTranscriptionService: return None def _process_transcription_result(self, business_task_id: str, meeting_id: int, paraformer_output: Any): - """处理转录结果""" + """ + 处理转录结果. + 如果处理失败,此函数会抛出异常. + """ try: if not paraformer_output.get('results'): - print("No transcription results found in the response.") - return + raise Exception("No transcription results found in the provider response.") transcription_url = paraformer_output['results'][0]['transcription_url'] print(f"Fetching transcription from URL: {transcription_url}") @@ -331,7 +363,9 @@ class AsyncTranscriptionService: print(f"Transcription result processed for task: {business_task_id}") except Exception as e: - print(f"Error processing transcription result: {e}") + # 记录具体错误并重新抛出,以便上层捕获 + print(f"Error processing transcription result for task {business_task_id}: {e}") + raise def _save_segments_to_db(self, data: dict, meeting_id: int): """保存转录分段到数据库""" @@ -368,8 +402,9 @@ class AsyncTranscriptionService: ''' cursor.executemany(insert_query, segments_to_insert) connection.commit() - cursor.close() # 明确关闭游标 + cursor.close() print(f"Successfully saved {len(segments_to_insert)} segments to the database for meeting_id: {meeting_id}") except Exception as e: - print(f"Database error when saving segments: {e}") \ No newline at end of file + print(f"Database error when saving segments: {e}") + raise diff --git a/main.py b/main.py index 6b02c71..8d48397 100644 --- a/main.py +++ b/main.py @@ -4,27 +4,15 @@ from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles from app.api.endpoints import auth, users, meetings from app.core.config import UPLOAD_DIR, API_CONFIG, MAX_FILE_SIZE +from app.services.async_llm_service import async_llm_service import os app = FastAPI( title="iMeeting API", description="智慧会议系统API", - version="1.0.0" + version="1.0.2" ) -# 添加请求体大小检查中间件 -@app.middleware("http") -async def check_content_size(request: Request, call_next): - # 检查Content-Length头 - content_length = request.headers.get("content-length") - if content_length: - content_length = int(content_length) - if content_length > MAX_FILE_SIZE: - raise HTTPException(status_code=413, detail=f"Request entity too large. Maximum size is {MAX_FILE_SIZE//1024//1024}MB") - - response = await call_next(request) - return response - # 添加CORS中间件 app.add_middleware( CORSMiddleware, @@ -65,8 +53,4 @@ if __name__ == "__main__": limit_max_requests=1000, timeout_keep_alive=30, reload=True, - # 设置更大的请求体限制以支持音频文件上传 - h11_max_incomplete_event_size=104857600, # 100MB - - ) \ No newline at end of file diff --git a/requirements-prod.txt b/requirements-prod.txt index e8230bc..0cbca48 100644 --- a/requirements-prod.txt +++ b/requirements-prod.txt @@ -1,58 +1,17 @@ -aiohappyeyeballs==2.6.1 -aiohttp==3.12.15 -aiosignal==1.4.0 -annotated-types==0.7.0 -anyio==3.7.1 -async-timeout==5.0.1 -attrs==25.3.0 -bcrypt==4.3.0 -certifi==2025.8.3 -cffi==1.17.1 -charset-normalizer==3.4.3 -click==8.1.8 -cryptography==45.0.6 -dashscope==1.24.1 -distro==1.9.0 -dnspython==2.7.0 -ecdsa==0.19.1 -email_validator==2.2.0 -exceptiongroup==1.3.0 -fastapi==0.104.1 -frozenlist==1.7.0 -h11==0.16.0 -httpcore==1.0.9 -httptools==0.6.4 -httpx==0.28.1 -idna==3.10 -jiter==0.10.0 -multidict==6.6.4 -mysql-connector-python==8.2.0 -openai==1.99.9 -passlib==1.7.4 -propcache==0.3.2 -protobuf==4.21.12 -pyasn1==0.6.1 -pycparser==2.22 -pydantic==2.5.0 -pydantic_core==2.14.1 -PyJWT==2.10.1 -python-dotenv==1.1.1 -python-jose==3.5.0 -python-multipart==0.0.6 -PyYAML==6.0.2 -qiniu==7.17.0 -redis==6.4.0 -requests==2.32.4 -rsa==4.9.1 -six==1.17.0 -sniffio==1.3.1 -starlette==0.27.0 -tqdm==4.67.1 -typing_extensions==4.14.1 -urllib3==2.5.0 -uvicorn==0.24.0 -uvloop==0.21.0 -watchfiles==1.1.0 -websocket-client==1.8.0 -websockets==15.0.1 -yarl==1.20.1 +# Core Application Framework +fastapi +uvicorn + +# Database & Cache +mysql-connector-python +redis + +# Services & External APIs +requests +dashscope +PyJWT +qiniu + +# Validation & Forms +email-validator +python-multipart \ No newline at end of file diff --git a/test/test_create_task.py b/test/test_create_task.py new file mode 100644 index 0000000..87228de --- /dev/null +++ b/test/test_create_task.py @@ -0,0 +1,24 @@ +#!/usr/bin/env python3 +""" +创建一个新的测试任务 +""" +import sys +import os +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from app.services.async_llm_service import AsyncLLMService + +# 创建服务实例 +service = AsyncLLMService() + +# 创建测试任务 +meeting_id = 38 +user_prompt = "请重点关注决策事项和待办任务" + +print("创建新任务...") +task_id = service.start_summary_generation(meeting_id, user_prompt) +print(f"✅ 任务创建成功: {task_id}") + +# 获取任务状态 +status = service.get_task_status(task_id) +print(f"任务状态: {status}") \ No newline at end of file diff --git a/test/test_redis_llm.py b/test/test_redis_llm.py new file mode 100644 index 0000000..3ce9ef8 --- /dev/null +++ b/test/test_redis_llm.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python3 +""" +测试Redis连接和LLM任务队列 +""" +import sys +import os +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +import redis +from app.core.config import REDIS_CONFIG + +# 连接Redis +redis_client = redis.Redis(**REDIS_CONFIG) + +try: + # 测试连接 + redis_client.ping() + print("✅ Redis连接成功") + + # 检查任务队列 + queue_length = redis_client.llen("llm_task_queue") + print(f"📋 当前任务队列长度: {queue_length}") + + # 检查所有LLM任务 + keys = redis_client.keys("llm_task:*") + print(f"📊 当前存在的LLM任务: {len(keys)} 个") + + for key in keys: + task_data = redis_client.hgetall(key) + # key可能是bytes或str + if isinstance(key, bytes): + task_id = key.decode('utf-8').replace('llm_task:', '') + else: + task_id = key.replace('llm_task:', '') + + # 获取状态和进度 + status = task_data.get(b'status', task_data.get('status', 'unknown')) + if isinstance(status, bytes): + status = status.decode('utf-8') + + progress = task_data.get(b'progress', task_data.get('progress', '0')) + if isinstance(progress, bytes): + progress = progress.decode('utf-8') + + print(f" - 任务 {task_id[:8]}... 状态: {status}, 进度: {progress}%") + + # 如果任务是pending,重新推送到队列 + if status == 'pending': + print(f" 🔄 发现pending任务,重新推送到队列...") + redis_client.lpush("llm_task_queue", task_id) + print(f" ✅ 任务 {task_id[:8]}... 已重新推送到队列") + +except redis.ConnectionError as e: + print(f"❌ Redis连接失败: {e}") +except Exception as e: + print(f"❌ 错误: {e}") + import traceback + traceback.print_exc() \ No newline at end of file diff --git a/test/test_worker_thread.py b/test/test_worker_thread.py new file mode 100644 index 0000000..3c0c8c2 --- /dev/null +++ b/test/test_worker_thread.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python3 +""" +测试worker线程是否正常工作 +""" +import sys +import os +import time +import threading +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from app.services.async_llm_service import AsyncLLMService + +# 创建服务实例 +service = AsyncLLMService() + +# 直接调用处理任务方法测试 +print("测试直接调用_process_tasks方法...") + +# 设置worker_running为True +service.worker_running = True + +# 创建线程并启动 +thread = threading.Thread(target=service._process_tasks) +thread.daemon = False # 不设置为daemon,确保能看到输出 +thread.start() + +print(f"线程是否活动: {thread.is_alive()}") +print("等待5秒...") + +# 等待一段时间 +time.sleep(5) + +# 停止worker +service.worker_running = False +thread.join(timeout=10) + +print("测试完成") \ No newline at end of file diff --git a/uploads/.DS_Store b/uploads/.DS_Store index 2396b3c..e245fc0 100644 Binary files a/uploads/.DS_Store and b/uploads/.DS_Store differ diff --git a/uploads/audio/.DS_Store b/uploads/audio/.DS_Store index da691ba..06da376 100644 Binary files a/uploads/audio/.DS_Store and b/uploads/audio/.DS_Store differ diff --git a/uploads/audio/38/c08b25f9-6029-4495-ad5d-19512bbc10a2.mp3 b/uploads/audio/38/c08b25f9-6029-4495-ad5d-19512bbc10a2.mp3 deleted file mode 100644 index 0fd96c9..0000000 Binary files a/uploads/audio/38/c08b25f9-6029-4495-ad5d-19512bbc10a2.mp3 and /dev/null differ