""" 音频处理服务 处理已保存的完整音频文件:数据库更新、转录、自动总结 """ from fastapi import BackgroundTasks from app.core.database import get_db_connection from app.core.response import create_api_response from app.core.config import BASE_DIR from app.services.async_transcription_service import AsyncTranscriptionService from app.services.async_meeting_service import async_meeting_service from pathlib import Path import os transcription_service = AsyncTranscriptionService() def handle_audio_upload( file_path: str, file_name: str, file_size: int, meeting_id: int, current_user: dict, auto_summarize: bool = True, background_tasks: BackgroundTasks = None, prompt_id: int = None ) -> dict: """ 处理已保存的完整音频文件 职责: 1. 权限检查 2. 检查已有文件和转录记录 3. 更新数据库(audio_files 表) 4. 启动转录任务 5. 可选启动自动总结监控 Args: file_path: 已保存的文件路径(相对于 BASE_DIR 的路径,如 /uploads/audio/123/xxx.webm) file_name: 原始文件名 file_size: 文件大小(字节) meeting_id: 会议ID current_user: 当前用户信息 auto_summarize: 是否自动生成总结(默认True) background_tasks: FastAPI 后台任务对象 prompt_id: 提示词模版ID(可选,如果不指定则使用默认模版) Returns: dict: { "success": bool, # 是否成功 "response": dict, # 如果需要返回,这里是响应数据 "file_info": dict, # 文件信息 (成功时) "transcription_task_id": str, # 转录任务ID (成功时) "replaced_existing": bool, # 是否替换了现有文件 (成功时) "has_transcription": bool # 原来是否有转录记录 (成功时) } """ print(f"[Audio Service] handle_audio_upload called - Meeting ID: {meeting_id}, Auto-summarize: {auto_summarize}, Received prompt_id: {prompt_id}, Type: {type(prompt_id)}") # 1. 权限和已有文件检查 try: with get_db_connection() as connection: cursor = connection.cursor(dictionary=True) # 检查会议是否存在及权限 cursor.execute("SELECT user_id FROM meetings WHERE meeting_id = %s", (meeting_id,)) meeting = cursor.fetchone() if not meeting: return { "success": False, "response": create_api_response(code="404", message="会议不存在") } if meeting['user_id'] != current_user['user_id']: return { "success": False, "response": create_api_response(code="403", message="无权限操作此会议") } # 检查已有音频文件 cursor.execute( "SELECT file_name, file_path, upload_time FROM audio_files WHERE meeting_id = %s", (meeting_id,) ) existing_info = cursor.fetchone() # 检查是否有转录记录 has_transcription = False if existing_info: cursor.execute( "SELECT COUNT(*) as segment_count FROM transcript_segments WHERE meeting_id = %s", (meeting_id,) ) has_transcription = cursor.fetchone()['segment_count'] > 0 cursor.close() except Exception as e: return { "success": False, "response": create_api_response(code="500", message=f"检查已有文件失败: {str(e)}") } # 2. 删除旧的音频文件(如果存在) replaced_existing = existing_info is not None if replaced_existing and existing_info['file_path']: old_file_path = BASE_DIR / existing_info['file_path'].lstrip('/') if old_file_path.exists(): try: os.remove(old_file_path) print(f"Deleted old audio file: {old_file_path}") except Exception as e: print(f"Warning: Failed to delete old file {old_file_path}: {e}") transcription_task_id = None try: # 3. 更新数据库记录 with get_db_connection() as connection: cursor = connection.cursor(dictionary=True) if replaced_existing: cursor.execute( 'UPDATE audio_files SET file_name = %s, file_path = %s, file_size = %s, upload_time = NOW(), task_id = NULL WHERE meeting_id = %s', (file_name, file_path, file_size, meeting_id) ) else: cursor.execute( 'INSERT INTO audio_files (meeting_id, file_name, file_path, file_size, upload_time) VALUES (%s, %s, %s, %s, NOW())', (meeting_id, file_name, file_path, file_size) ) connection.commit() cursor.close() # 4. 启动转录任务 try: transcription_task_id = transcription_service.start_transcription(meeting_id, file_path) print(f"Transcription task {transcription_task_id} started for meeting {meeting_id}") # 5. 如果启用自动总结且提供了 background_tasks,添加监控任务 if auto_summarize and transcription_task_id and background_tasks: background_tasks.add_task( async_meeting_service.monitor_and_auto_summarize, meeting_id, transcription_task_id, prompt_id # 传递 prompt_id 给自动总结监控任务 ) print(f"[audio_service] Auto-summarize enabled, monitor task added for meeting {meeting_id}, prompt_id: {prompt_id}") except Exception as e: print(f"Failed to start transcription: {e}") raise except Exception as e: # 出错时的处理(文件已保存,不删除) return { "success": False, "response": create_api_response(code="500", message=f"处理失败: {str(e)}") } # 6. 返回成功结果 return { "success": True, "file_info": { "file_name": file_name, "file_path": file_path, "file_size": file_size }, "transcription_task_id": transcription_task_id, "replaced_existing": replaced_existing, "has_transcription": has_transcription }