增加异步任务的保护机制,防止重复

main
mula.liu 2025-12-27 16:57:56 +08:00
parent 6f1ce5ffd9
commit 5a34df3944
5 changed files with 81 additions and 22 deletions

BIN
.DS_Store vendored

Binary file not shown.

BIN
app.zip

Binary file not shown.

View File

@ -46,7 +46,7 @@ API_CONFIG = {
# 七牛云配置 # 七牛云配置
QINIU_ACCESS_KEY = os.getenv('QINIU_ACCESS_KEY', 'A0tp96HCtg-wZCughTgi5vc2pJnw3btClwxRE_e8') QINIU_ACCESS_KEY = os.getenv('QINIU_ACCESS_KEY', 'A0tp96HCtg-wZCughTgi5vc2pJnw3btClwxRE_e8')
QINIU_SECRET_KEY = os.getenv('QINIU_SECRET_KEY', 'Lj-MSHpaVbmzpS86kMIjmwikvYOT9iPBjCk9hm6k') QINIU_SECRET_KEY = os.getenv('QINIU_SECRET_KEY', 'Lj-MSHpaVbmzpS86kMIjmwikvYOT9iPBjCk9hm6k')
QINIU_BUCKET = os.getenv('QINIU_BUCKET', 'imeeting') QINIU_BUCKET = os.getenv('QINIU_BUCKET', 'imeeting_dev')
QINIU_DOMAIN = os.getenv('QINIU_DOMAIN', 't0vogyxkz.hn-bkt.clouddn.com') QINIU_DOMAIN = os.getenv('QINIU_DOMAIN', 't0vogyxkz.hn-bkt.clouddn.com')
# 应用配置 # 应用配置

View File

@ -159,6 +159,11 @@ class AsyncMeetingService:
if current_status == 'completed': if current_status == 'completed':
print(f"[Monitor] Transcription completed successfully for meeting {meeting_id}") print(f"[Monitor] Transcription completed successfully for meeting {meeting_id}")
# 防止并发:检查是否已经有总结任务存在
existing_task = self._get_existing_summary_task(meeting_id)
if existing_task:
print(f"[Monitor] Summary task already exists for meeting {meeting_id}, task_id: {existing_task}, skipping duplicate task creation")
else:
# 启动总结任务 # 启动总结任务
try: try:
summary_task_id = self.start_summary_generation(meeting_id, user_prompt="", prompt_id=prompt_id) summary_task_id = self.start_summary_generation(meeting_id, user_prompt="", prompt_id=prompt_id)
@ -457,5 +462,31 @@ class AsyncMeetingService:
print(f"Error getting task from database: {e}") print(f"Error getting task from database: {e}")
return None return None
def _get_existing_summary_task(self, meeting_id: int) -> Optional[str]:
"""
检查会议是否已经有总结任务用于并发控制
返回最新的pending或processing状态的任务ID如果没有则返回None
"""
try:
with get_db_connection() as connection:
cursor = connection.cursor(dictionary=True)
query = """
SELECT task_id FROM llm_tasks
WHERE meeting_id = %s AND status IN ('pending', 'processing')
ORDER BY created_at DESC
LIMIT 1
"""
cursor.execute(query, (meeting_id,))
result = cursor.fetchone()
cursor.close()
if result:
return result['task_id']
return None
except Exception as e:
print(f"Error checking existing summary task: {e}")
return None
# 创建全局实例 # 创建全局实例
async_meeting_service = AsyncMeetingService() async_meeting_service = AsyncMeetingService()

View File

@ -144,6 +144,13 @@ class AsyncTranscriptionService:
# 3. 如果任务完成,处理结果 # 3. 如果任务完成,处理结果
if current_status == 'completed' and paraformer_response.output.get('results'): if current_status == 'completed' and paraformer_response.output.get('results'):
# 防止并发处理:先检查数据库中的状态
db_task_status = self._get_task_status_from_db(business_task_id)
if db_task_status != 'completed':
# 只有当数据库中状态不是completed时才处理
# 先将状态更新为completed作为分布式锁
self._update_task_status_in_db(business_task_id, 'completed', 100, None)
try: try:
self._process_transcription_result( self._process_transcription_result(
business_task_id, business_task_id,
@ -155,6 +162,8 @@ class AsyncTranscriptionService:
progress = 100 # 进度为100但状态是失败 progress = 100 # 进度为100但状态是失败
error_message = f"Error processing transcription result: {e}" error_message = f"Error processing transcription result: {e}"
print(error_message) print(error_message)
else:
print(f"Task {business_task_id} already processed, skipping duplicate processing")
except Exception as e: except Exception as e:
error_message = f"Error getting task status: {e}" error_message = f"Error getting task status: {e}"
@ -333,6 +342,25 @@ class AsyncTranscriptionService:
except Exception as e: except Exception as e:
print(f"Error updating task status in database: {e}") print(f"Error updating task status in database: {e}")
def _get_task_status_from_db(self, business_task_id: str) -> Optional[str]:
"""从数据库获取任务状态(用于并发控制)"""
try:
with get_db_connection() as connection:
cursor = connection.cursor(dictionary=True)
query = "SELECT status FROM transcript_tasks WHERE task_id = %s"
cursor.execute(query, (business_task_id,))
result = cursor.fetchone()
cursor.close()
if result:
return result['status']
return None
except Exception as e:
print(f"Error getting task status from database: {e}")
return None
def _get_task_from_db(self, business_task_id: str) -> Optional[Dict[str, str]]: def _get_task_from_db(self, business_task_id: str) -> Optional[Dict[str, str]]:
"""从数据库获取任务信息""" """从数据库获取任务信息"""
try: try: