diff --git a/.DS_Store b/.DS_Store index 0145aae..350e4a8 100644 Binary files a/.DS_Store and b/.DS_Store differ diff --git a/app.zip b/app.zip index 6af80b6..a857980 100644 Binary files a/app.zip and b/app.zip differ diff --git a/app/core/config.py b/app/core/config.py index d8c4e3a..fd594a1 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -46,7 +46,7 @@ API_CONFIG = { # 七牛云配置 QINIU_ACCESS_KEY = os.getenv('QINIU_ACCESS_KEY', 'A0tp96HCtg-wZCughTgi5vc2pJnw3btClwxRE_e8') QINIU_SECRET_KEY = os.getenv('QINIU_SECRET_KEY', 'Lj-MSHpaVbmzpS86kMIjmwikvYOT9iPBjCk9hm6k') -QINIU_BUCKET = os.getenv('QINIU_BUCKET', 'imeeting') +QINIU_BUCKET = os.getenv('QINIU_BUCKET', 'imeeting_dev') QINIU_DOMAIN = os.getenv('QINIU_DOMAIN', 't0vogyxkz.hn-bkt.clouddn.com') # 应用配置 diff --git a/app/services/async_meeting_service.py b/app/services/async_meeting_service.py index 9ec1f9b..363bf7c 100644 --- a/app/services/async_meeting_service.py +++ b/app/services/async_meeting_service.py @@ -159,17 +159,22 @@ class AsyncMeetingService: if current_status == 'completed': print(f"[Monitor] Transcription completed successfully for meeting {meeting_id}") - # 启动总结任务 - try: - summary_task_id = self.start_summary_generation(meeting_id, user_prompt="", prompt_id=prompt_id) - print(f"[Monitor] Summary task {summary_task_id} started for meeting {meeting_id}") + # 防止并发:检查是否已经有总结任务存在 + existing_task = self._get_existing_summary_task(meeting_id) + if existing_task: + print(f"[Monitor] Summary task already exists for meeting {meeting_id}, task_id: {existing_task}, skipping duplicate task creation") + else: + # 启动总结任务 + try: + summary_task_id = self.start_summary_generation(meeting_id, user_prompt="", prompt_id=prompt_id) + print(f"[Monitor] Summary task {summary_task_id} started for meeting {meeting_id}") - # 在后台执行总结任务 - self._process_task(summary_task_id) + # 在后台执行总结任务 + self._process_task(summary_task_id) - except Exception as e: - error_msg = f"Failed to start summary generation: {e}" - print(f"[Monitor] {error_msg}") + except Exception as e: + error_msg = f"Failed to start summary generation: {e}" + print(f"[Monitor] {error_msg}") # 监控任务完成,退出循环 break @@ -457,5 +462,31 @@ class AsyncMeetingService: print(f"Error getting task from database: {e}") return None + def _get_existing_summary_task(self, meeting_id: int) -> Optional[str]: + """ + 检查会议是否已经有总结任务(用于并发控制) + 返回最新的pending或processing状态的任务ID,如果没有则返回None + """ + try: + with get_db_connection() as connection: + cursor = connection.cursor(dictionary=True) + query = """ + SELECT task_id FROM llm_tasks + WHERE meeting_id = %s AND status IN ('pending', 'processing') + ORDER BY created_at DESC + LIMIT 1 + """ + cursor.execute(query, (meeting_id,)) + result = cursor.fetchone() + cursor.close() + + if result: + return result['task_id'] + return None + + except Exception as e: + print(f"Error checking existing summary task: {e}") + return None + # 创建全局实例 async_meeting_service = AsyncMeetingService() diff --git a/app/services/async_transcription_service.py b/app/services/async_transcription_service.py index 5c54da7..4d3b85c 100644 --- a/app/services/async_transcription_service.py +++ b/app/services/async_transcription_service.py @@ -144,17 +144,26 @@ class AsyncTranscriptionService: # 3. 如果任务完成,处理结果 if current_status == 'completed' and paraformer_response.output.get('results'): - try: - self._process_transcription_result( - business_task_id, - int(task_data['meeting_id']), - paraformer_response.output - ) - except Exception as e: - current_status = 'failed' - progress = 100 # 进度为100,但状态是失败 - error_message = f"Error processing transcription result: {e}" - print(error_message) + # 防止并发处理:先检查数据库中的状态 + db_task_status = self._get_task_status_from_db(business_task_id) + if db_task_status != 'completed': + # 只有当数据库中状态不是completed时才处理 + # 先将状态更新为completed,作为分布式锁 + self._update_task_status_in_db(business_task_id, 'completed', 100, None) + + try: + self._process_transcription_result( + business_task_id, + int(task_data['meeting_id']), + paraformer_response.output + ) + except Exception as e: + current_status = 'failed' + progress = 100 # 进度为100,但状态是失败 + error_message = f"Error processing transcription result: {e}" + print(error_message) + else: + print(f"Task {business_task_id} already processed, skipping duplicate processing") except Exception as e: error_message = f"Error getting task status: {e}" @@ -332,7 +341,26 @@ class AsyncTranscriptionService: except Exception as e: print(f"Error updating task status in database: {e}") - + + def _get_task_status_from_db(self, business_task_id: str) -> Optional[str]: + """从数据库获取任务状态(用于并发控制)""" + try: + with get_db_connection() as connection: + cursor = connection.cursor(dictionary=True) + + query = "SELECT status FROM transcript_tasks WHERE task_id = %s" + cursor.execute(query, (business_task_id,)) + result = cursor.fetchone() + cursor.close() + + if result: + return result['status'] + return None + + except Exception as e: + print(f"Error getting task status from database: {e}") + return None + def _get_task_from_db(self, business_task_id: str) -> Optional[Dict[str, str]]: """从数据库获取任务信息""" try: