imetting_backend/app/services/async_transcription_service.py

375 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import uuid
import json
from contextlib import closing
from datetime import datetime
from typing import Optional, Dict, Any
from http import HTTPStatus

import redis
import requests
import dashscope
from dashscope.audio.asr import Transcription

from app.core.config import QWEN_API_KEY, REDIS_CONFIG, APP_CONFIG
from app.core.database import get_db_connection
class AsyncTranscriptionService:
    """Asynchronous transcription service backed by DashScope Paraformer.

    Task metadata is kept in two places:

    * a Redis hash ``task:<business_task_id>`` (expires after
      ``TASK_TTL_SECONDS``) that maps the business task ID to the Paraformer
      task ID and caches the latest status, and
    * the ``transcript_tasks`` table, which holds the durable status but does
      NOT store the Paraformer task ID.
    """

    # Lifetime of the Redis task hash, in seconds (24 hours).
    TASK_TTL_SECONDS = 86400
    # Timeout, in seconds, for downloading the transcription result JSON.
    TRANSCRIPTION_FETCH_TIMEOUT = 30

    # Paraformer task_status -> business status.
    _STATUS_MAP = {
        'PENDING': 'pending',
        'RUNNING': 'processing',
        'SUCCEEDED': 'completed',
        'FAILED': 'failed',
    }
    # Paraformer task_status -> coarse progress percentage.
    _PROGRESS_MAP = {
        'PENDING': 0,
        'RUNNING': 50,
        'SUCCEEDED': 100,
        'FAILED': 0,
    }

    def __init__(self):
        """Configure the DashScope API key and create the Redis client."""
        # dashscope.api_key is a module-level setting, so it applies process-wide.
        dashscope.api_key = QWEN_API_KEY
        self.redis_client = redis.Redis(**REDIS_CONFIG)
        # Prefix joined with relative audio paths to build the URL sent to Paraformer.
        self.base_url = APP_CONFIG['base_url']

    @staticmethod
    def _task_key(business_task_id: str) -> str:
        """Return the Redis key of the hash that stores a task's metadata."""
        return f"task:{business_task_id}"

    @staticmethod
    def _decode_redis_hash(raw: Optional[Dict[Any, Any]]) -> Dict[str, Any]:
        """Return ``raw`` with any ``bytes`` keys/values decoded as UTF-8.

        redis-py returns bytes unless the client was created with
        ``decode_responses=True``; REDIS_CONFIG is defined elsewhere, so both
        forms are accepted here.
        """
        def _as_str(value):
            return value.decode('utf-8') if isinstance(value, bytes) else value

        return {_as_str(key): _as_str(value) for key, value in (raw or {}).items()}

    @staticmethod
    def _stored_task_status(business_task_id: str, task_data: Dict[str, Any]) -> Dict[str, Any]:
        """Build a status response from already-persisted task data, without polling."""
        meeting_id = task_data.get('meeting_id')
        return {
            'task_id': business_task_id,
            'status': task_data.get('status', 'unknown'),
            'progress': int(task_data.get('progress') or 0),
            'meeting_id': int(meeting_id) if meeting_id is not None else None,
            'created_at': task_data.get('created_at'),
            'updated_at': task_data.get('updated_at'),
            'error_message': task_data.get('error_message'),
        }

    def start_transcription(self, meeting_id: int, audio_file_path: str) -> str:
        """
        Start an asynchronous Paraformer transcription task.

        Args:
            meeting_id: Meeting ID.
            audio_file_path: Audio file path relative to ``base_url``; it is
                appended to ``base_url`` verbatim to form the file URL.

        Returns:
            str: The newly generated business task ID (a UUID4 string).

        Raises:
            Exception: If the Paraformer API rejects the request, or if the
                Redis / database writes fail.
        """
        try:
            file_url = f"{self.base_url}{audio_file_path}"
            print(f"Starting transcription for meeting_id: {meeting_id}, file_url: {file_url}")

            task_response = Transcription.async_call(
                model='paraformer-v2',
                file_urls=[file_url],
                language_hints=['zh', 'en'],
                disfluency_removal_enabled=True,
                diarization_enabled=True,
                speaker_count=10
            )
            if task_response.status_code != HTTPStatus.OK:
                print(f"Failed to start transcription: {task_response.status_code}, {task_response.message}")
                raise Exception(f"Transcription API error: {task_response.message}")

            paraformer_task_id = task_response.output.task_id
            business_task_id = str(uuid.uuid4())
            now = datetime.now().isoformat()

            task_data = {
                'business_task_id': business_task_id,
                'paraformer_task_id': paraformer_task_id,
                'meeting_id': str(meeting_id),
                'file_url': file_url,
                'status': 'pending',
                'progress': '0',
                'created_at': now,
                'updated_at': now
            }

            # Write the hash and its TTL in one transactional pipeline so the
            # key is never left without an expiry.
            task_key = self._task_key(business_task_id)
            pipe = self.redis_client.pipeline()
            pipe.hset(task_key, mapping=task_data)
            pipe.expire(task_key, self.TASK_TTL_SECONDS)
            pipe.execute()

            self._save_task_to_db(business_task_id, meeting_id, audio_file_path)

            print(f"Transcription task created: {business_task_id}")
            return business_task_id
        except Exception as e:
            print(f"Error starting transcription: {e}")
            raise

    def get_task_status(self, business_task_id: str) -> Dict[str, Any]:
        """
        Poll the Paraformer task and return the current business status.

        Task metadata is read from Redis. If the Redis hash is missing or lacks
        the Paraformer task ID (e.g. after the TTL elapsed), the database row
        is used instead. The database does not store the Paraformer task ID,
        so in that case the last persisted status is returned without polling.

        When the Paraformer task has just succeeded, its transcript is
        downloaded and saved once. Later polls of a task already marked
        ``completed`` do not re-import it, because re-importing would replace
        the saved segments. If the import fails, the task is reported as
        ``failed`` and the next poll retries it.

        Args:
            business_task_id: Business task ID returned by ``start_transcription``.

        Returns:
            Dict with ``task_id``, ``status``, ``progress``, ``meeting_id``,
            ``created_at``, ``updated_at`` and ``error_message``.

        Raises:
            Exception: If the task is unknown, or if Redis / the API client raises.
        """
        try:
            task_key = self._task_key(business_task_id)
            task_data = self._decode_redis_hash(self.redis_client.hgetall(task_key))
            if not task_data.get('paraformer_task_id'):
                # Redis entry expired or is incomplete: fall back to the durable DB row.
                task_data = self._get_task_from_db(business_task_id) or task_data
            if not task_data:
                raise Exception("Task not found")

            paraformer_task_id = task_data.get('paraformer_task_id')
            if not paraformer_task_id:
                # Polling with an empty ID would fail and wrongly mark the task failed.
                return self._stored_task_status(business_task_id, task_data)

            meeting_id = int(task_data['meeting_id'])
            paraformer_response = Transcription.fetch(task=paraformer_task_id)

            if paraformer_response.status_code != HTTPStatus.OK:
                print(f"Failed to fetch task status: {paraformer_response.message}")
                current_status = 'failed'
                progress = 0
                error_message = paraformer_response.message
            else:
                paraformer_status = paraformer_response.output.task_status
                current_status = self._map_paraformer_status(paraformer_status)
                progress = self._calculate_progress(paraformer_status)
                error_message = None

                # Import the transcript only on the transition into 'completed'.
                newly_completed = (
                    current_status == 'completed'
                    and task_data.get('status') != 'completed'
                    and paraformer_response.output.get('results')
                )
                if newly_completed and not self._process_transcription_result(
                    business_task_id, meeting_id, paraformer_response.output
                ):
                    current_status = 'failed'
                    progress = 0
                    error_message = 'Failed to process transcription result'

            update_data = {
                'status': current_status,
                'progress': str(progress),
                'updated_at': datetime.now().isoformat()
            }
            if error_message:
                update_data['error_message'] = error_message
            self.redis_client.hset(task_key, mapping=update_data)

            self._update_task_status_in_db(business_task_id, current_status, progress, error_message)

            return {
                'task_id': business_task_id,
                'status': current_status,
                'progress': progress,
                'meeting_id': meeting_id,
                'created_at': task_data.get('created_at'),
                'updated_at': update_data['updated_at'],
                'error_message': error_message
            }
        except Exception as e:
            print(f"Error getting task status: {e}")
            raise

    def get_meeting_transcription_status(self, meeting_id: int) -> Optional[Dict[str, Any]]:
        """
        Return the status of the most recent transcription task for a meeting.

        If the latest task is still ``pending``/``processing``, a live status is
        fetched via ``get_task_status``; that result has ``updated_at`` instead
        of ``completed_at``. If fetching it fails, the stored database status
        is returned.

        Args:
            meeting_id: Meeting ID.

        Returns:
            Optional[Dict]: Task status information, or None if the meeting has
            no task or an error occurred.
        """
        try:
            query = """
            SELECT task_id, status, progress, created_at, completed_at, error_message
            FROM transcript_tasks
            WHERE meeting_id = %s
            ORDER BY created_at DESC
            LIMIT 1
            """
            # Finish the query first, so that no connection is held while
            # get_task_status opens connections of its own.
            with get_db_connection() as connection:
                with closing(connection.cursor(dictionary=True)) as cursor:
                    cursor.execute(query, (meeting_id,))
                    task_record = cursor.fetchone()

            if not task_record:
                return None

            if task_record['status'] in ('pending', 'processing'):
                try:
                    return self.get_task_status(task_record['task_id'])
                except Exception as e:
                    # Best effort: fall back to the persisted status below.
                    print(f"Error refreshing task status, using stored status: {e}")

            return {
                'task_id': task_record['task_id'],
                'status': task_record['status'],
                'progress': task_record['progress'] or 0,
                'meeting_id': meeting_id,
                'created_at': task_record['created_at'].isoformat() if task_record['created_at'] else None,
                'completed_at': task_record['completed_at'].isoformat() if task_record['completed_at'] else None,
                'error_message': task_record['error_message']
            }
        except Exception as e:
            print(f"Error getting meeting transcription status: {e}")
            return None

    def _map_paraformer_status(self, paraformer_status: str) -> str:
        """Map a Paraformer task_status to a business status (``'unknown'`` if unmapped)."""
        return self._STATUS_MAP.get(paraformer_status, 'unknown')

    def _calculate_progress(self, paraformer_status: str) -> int:
        """Map a Paraformer task_status to a progress percentage (0 if unmapped)."""
        return self._PROGRESS_MAP.get(paraformer_status, 0)

    def _save_task_to_db(self, business_task_id: str, meeting_id: int, audio_file_path: str):
        """Link the audio file to the task and insert a ``pending`` task row.

        Raises:
            Exception: Re-raised database errors.
        """
        try:
            update_audio_query = """
            UPDATE audio_files
            SET task_id = %s
            WHERE meeting_id = %s AND file_path = %s
            """
            insert_task_query = """
            INSERT INTO transcript_tasks (task_id, meeting_id, status, progress, created_at)
            VALUES (%s, %s, 'pending', 0, NOW())
            """
            with get_db_connection() as connection:
                with closing(connection.cursor()) as cursor:
                    cursor.execute(update_audio_query, (business_task_id, meeting_id, audio_file_path))
                    cursor.execute(insert_task_query, (business_task_id, meeting_id))
                connection.commit()
            print(f"Task record saved to database: {business_task_id}")
        except Exception as e:
            print(f"Error saving task to database: {e}")
            raise

    def _update_task_status_in_db(self, business_task_id: str, status: str, progress: int, error_message: Optional[str] = None):
        """Persist status/progress/error for a task (best effort: errors are only printed).

        ``completed_at`` is set the first time the task is stored as
        ``completed`` and is left unchanged on later polls.
        """
        try:
            if status == 'completed':
                update_query = """
                UPDATE transcript_tasks
                SET status = %s, progress = %s, completed_at = COALESCE(completed_at, NOW()), error_message = %s
                WHERE task_id = %s
                """
            else:
                update_query = """
                UPDATE transcript_tasks
                SET status = %s, progress = %s, error_message = %s
                WHERE task_id = %s
                """
            with get_db_connection() as connection:
                with closing(connection.cursor()) as cursor:
                    cursor.execute(update_query, (status, progress, error_message, business_task_id))
                connection.commit()
        except Exception as e:
            print(f"Error updating task status in database: {e}")

    def _get_task_from_db(self, business_task_id: str) -> Optional[Dict[str, Any]]:
        """Load a task row from the database, shaped like the Redis task hash.

        ``paraformer_task_id`` is always ``''`` because the table does not store
        it. Returns None if the row is missing or the query fails.
        """
        try:
            query = """
            SELECT tt.task_id AS business_task_id, tt.meeting_id, tt.status,
                   tt.progress, tt.created_at, tt.error_message
            FROM transcript_tasks tt
            WHERE tt.task_id = %s
            """
            with get_db_connection() as connection:
                with closing(connection.cursor(dictionary=True)) as cursor:
                    cursor.execute(query, (business_task_id,))
                    result = cursor.fetchone()

            if not result:
                return None

            created_at = result.get('created_at')
            # Stringify values to mirror the Redis hash representation.
            return {
                'business_task_id': result['business_task_id'],
                'paraformer_task_id': '',
                'meeting_id': str(result['meeting_id']),
                'status': result['status'],
                'progress': str(result.get('progress') or 0),
                'created_at': created_at.isoformat() if created_at else None,
                'error_message': result.get('error_message'),
            }
        except Exception as e:
            print(f"Error getting task from database: {e}")
            return None

    def _process_transcription_result(self, business_task_id: str, meeting_id: int, paraformer_output: Any) -> bool:
        """Download the first result's transcription JSON and save its segments.

        Returns:
            bool: True if the segments were saved; False if there were no
            results or any step (download, parsing, DB write) failed.
        """
        try:
            results = paraformer_output.get('results')
            if not results:
                print("No transcription results found in the response.")
                return False

            transcription_url = results[0]['transcription_url']
            print(f"Fetching transcription from URL: {transcription_url}")
            # An explicit timeout keeps a stalled download from blocking this thread forever.
            response = requests.get(transcription_url, timeout=self.TRANSCRIPTION_FETCH_TIMEOUT)
            response.raise_for_status()
            transcription_data = response.json()

            self._save_segments_to_db(transcription_data, meeting_id)
            print(f"Transcription result processed for task: {business_task_id}")
            return True
        except Exception as e:
            print(f"Error processing transcription result: {e}")
            return False

    @staticmethod
    def _build_segment_rows(data: dict, meeting_id: int) -> list:
        """Flatten ``transcripts[].sentences[]`` into ``transcript_segments`` insert tuples.

        Each tuple is ``(meeting_id, speaker_id, speaker_tag, begin_time,
        end_time, text)``. A missing ``speaker_id`` becomes ``-1``.
        """
        rows = []
        for transcript in data.get('transcripts', []):
            for sentence in transcript.get('sentences', []):
                speaker_id = sentence.get('speaker_id', -1)
                rows.append((
                    meeting_id,
                    speaker_id,
                    f"发言人 {speaker_id}",  # default speaker_tag
                    sentence.get('begin_time'),
                    sentence.get('end_time'),
                    sentence.get('text')
                ))
        return rows

    def _save_segments_to_db(self, data: dict, meeting_id: int):
        """Replace all transcript segments of a meeting with those in ``data``.

        The DELETE and INSERT are committed together and rolled back on error.
        If ``data`` yields no segments, existing rows are left untouched.

        Raises:
            Exception: Re-raised database errors.
        """
        segments_to_insert = self._build_segment_rows(data, meeting_id)
        if not segments_to_insert:
            print("No segments to save.")
            return

        delete_query = "DELETE FROM transcript_segments WHERE meeting_id = %s"
        insert_query = '''
        INSERT INTO transcript_segments (meeting_id, speaker_id, speaker_tag, start_time_ms, end_time_ms, text_content)
        VALUES (%s, %s, %s, %s, %s, %s)
        '''
        try:
            with get_db_connection() as connection:
                try:
                    with closing(connection.cursor()) as cursor:
                        cursor.execute(delete_query, (meeting_id,))
                        print(f"Deleted existing segments for meeting_id: {meeting_id}")
                        cursor.executemany(insert_query, segments_to_insert)
                    connection.commit()
                except Exception:
                    # Do not leave the meeting with its old segments deleted and no new ones.
                    connection.rollback()
                    raise
            print(f"Successfully saved {len(segments_to_insert)} segments to the database for meeting_id: {meeting_id}")
        except Exception as e:
            print(f"Database error when saving segments: {e}")
            raise