diff --git a/app/api/endpoints/meetings.py b/app/api/endpoints/meetings.py index 0ac71c0..1a9d692 100644 --- a/app/api/endpoints/meetings.py +++ b/app/api/endpoints/meetings.py @@ -1,9 +1,10 @@ from fastapi import APIRouter, HTTPException, UploadFile, File, Form -from app.models.models import Meeting, TranscriptSegment, CreateMeetingRequest, UpdateMeetingRequest, SpeakerTagUpdateRequest, BatchSpeakerTagUpdateRequest, TranscriptUpdateRequest, BatchTranscriptUpdateRequest +from app.models.models import Meeting, TranscriptSegment, TranscriptionTaskStatus, CreateMeetingRequest, UpdateMeetingRequest, SpeakerTagUpdateRequest, BatchSpeakerTagUpdateRequest, TranscriptUpdateRequest, BatchTranscriptUpdateRequest from app.core.database import get_db_connection from app.core.config import BASE_DIR, UPLOAD_DIR, AUDIO_DIR, MARKDOWN_DIR, ALLOWED_EXTENSIONS, ALLOWED_IMAGE_EXTENSIONS, MAX_FILE_SIZE, MAX_IMAGE_SIZE from app.services.qiniu_service import qiniu_service from app.services.llm_service import LLMService +from app.services.async_transcription_service import AsyncTranscriptionService from typing import Optional from pydantic import BaseModel import os @@ -12,8 +13,9 @@ import shutil router = APIRouter() -# 实例化LLM服务 +# 实例化服务 llm_service = LLMService() +transcription_service = AsyncTranscriptionService() # 请求模型 class GenerateSummaryRequest(BaseModel): @@ -91,6 +93,7 @@ def get_meeting_details(meeting_id: int): meeting = cursor.fetchone() if not meeting: + cursor.close() # 明确关闭游标 raise HTTPException(status_code=404, detail="Meeting not found") attendees_query = ''' @@ -102,6 +105,9 @@ def get_meeting_details(meeting_id: int): cursor.execute(attendees_query, (meeting['meeting_id'],)) attendees_data = cursor.fetchall() attendees = [{'user_id': row['user_id'], 'caption': row['caption']} for row in attendees_data] + + # 关闭游标,避免与转录服务的数据库连接冲突 + cursor.close() meeting_data = Meeting( meeting_id=meeting['meeting_id'], @@ -117,8 +123,17 @@ def get_meeting_details(meeting_id: int): # Add audio file path if exists if meeting['audio_file_path']: meeting_data.audio_file_path = meeting['audio_file_path'] - - return meeting_data + + # 在连接外部获取转录状态,避免游标冲突 + try: + transcription_status_data = transcription_service.get_meeting_transcription_status(meeting_id) + if transcription_status_data: + meeting_data.transcription_status = TranscriptionTaskStatus(**transcription_status_data) + except Exception as e: + print(f"Warning: Failed to get transcription status for meeting {meeting_id}: {e}") + # Don't fail the entire request if transcription status can't be fetched + + return meeting_data @router.get("/meetings/{meeting_id}/transcript", response_model=list[TranscriptSegment]) def get_meeting_transcript(meeting_id: int): @@ -300,6 +315,7 @@ def get_meeting_for_edit(meeting_id: int): meeting = cursor.fetchone() if not meeting: + cursor.close() raise HTTPException(status_code=404, detail="Meeting not found") # Get attendees @@ -312,6 +328,9 @@ def get_meeting_for_edit(meeting_id: int): cursor.execute(attendees_query, (meeting_id,)) attendees_data = cursor.fetchall() attendees = [{'user_id': row['user_id'], 'caption': row['caption']} for row in attendees_data] + + # 关闭游标,避免与转录服务的数据库连接冲突 + cursor.close() meeting_data = Meeting( meeting_id=meeting['meeting_id'], @@ -327,14 +346,26 @@ def get_meeting_for_edit(meeting_id: int): # Add audio file path if exists if meeting.get('audio_file_path'): meeting_data.audio_file_path = meeting['audio_file_path'] - - return meeting_data + + # 在连接外部获取转录状态,避免游标冲突 + try: + transcription_status_data = transcription_service.get_meeting_transcription_status(meeting_id) + if transcription_status_data: + meeting_data.transcription_status = TranscriptionTaskStatus(**transcription_status_data) + except Exception as e: + print(f"Warning: Failed to get transcription status for meeting {meeting_id}: {e}") + + return meeting_data @router.post("/meetings/upload-audio") async def upload_audio( audio_file: UploadFile = File(...), - meeting_id: int = Form(...) + meeting_id: int = Form(...), + force_replace: str = Form("false") # 接收字符串,然后手动转换 ): + # Convert string to boolean + force_replace_bool = force_replace.lower() in ("true", "1", "yes") + # Validate file extension file_extension = os.path.splitext(audio_file.filename)[1].lower() if file_extension not in ALLOWED_EXTENSIONS: @@ -350,6 +381,55 @@ async def upload_audio( detail="File size exceeds 100MB limit" ) + # 检查是否已有音频文件和转录记录 + existing_info = None + has_transcription = False + + try: + with get_db_connection() as connection: + cursor = connection.cursor(dictionary=True) + + # Check if meeting exists + cursor.execute("SELECT meeting_id FROM meetings WHERE meeting_id = %s", (meeting_id,)) + if not cursor.fetchone(): + cursor.close() + raise HTTPException(status_code=404, detail="Meeting not found") + + # Check existing audio file + cursor.execute(""" + SELECT file_name, file_path, upload_time + FROM audio_files + WHERE meeting_id = %s + """, (meeting_id,)) + existing_info = cursor.fetchone() + + # Check existing transcription segments + if existing_info: + cursor.execute(""" + SELECT COUNT(*) as segment_count + FROM transcript_segments + WHERE meeting_id = %s + """, (meeting_id,)) + result = cursor.fetchone() + has_transcription = result['segment_count'] > 0 + + cursor.close() + except Exception as e: + raise HTTPException(status_code=500, detail=f"Failed to check existing files: {str(e)}") + + # 如果存在音频文件且有转录记录,需要用户确认 + if existing_info and has_transcription and not force_replace_bool: + return { + "requires_confirmation": True, + "message": "该会议已有音频文件和转录记录,重新上传将删除现有的转录内容", + "existing_file": { + "file_name": existing_info['file_name'], + "upload_time": existing_info['upload_time'].isoformat() if existing_info['upload_time'] else None + }, + "transcription_segments": has_transcription, + "suggestion": "请确认是否要替换现有音频文件并重新进行转录" + } + # Create meeting-specific directory meeting_dir = AUDIO_DIR / str(meeting_id) meeting_dir.mkdir(exist_ok=True) @@ -366,34 +446,71 @@ async def upload_audio( except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to save file: {str(e)}") - # Save file info to database - with get_db_connection() as connection: - cursor = connection.cursor(dictionary=True) + # Save file info to database and start transcription + task_id = None + replaced_existing = existing_info is not None + + try: + with get_db_connection() as connection: + cursor = connection.cursor(dictionary=True) + + # 如果是替换操作,清理旧的转录数据 + if replaced_existing and force_replace_bool: + # Delete existing transcription segments + cursor.execute("DELETE FROM transcript_segments WHERE meeting_id = %s", (meeting_id,)) + + # Delete existing transcription tasks + cursor.execute("DELETE FROM transcript_tasks WHERE meeting_id = %s", (meeting_id,)) + + # Clean up old audio file + if existing_info and existing_info['file_path']: + old_file_path = BASE_DIR / existing_info['file_path'].lstrip('/') + if old_file_path.exists(): + try: + os.remove(old_file_path) + except Exception as e: + print(f"Warning: Failed to delete old file {old_file_path}: {e}") + + # Insert or update audio file record + if replaced_existing: + update_query = ''' + UPDATE audio_files + SET file_name = %s, file_path = %s, file_size = %s, upload_time = NOW(), task_id = NULL + WHERE meeting_id = %s + ''' + cursor.execute(update_query, (audio_file.filename, '/'+str(relative_path), audio_file.size, meeting_id)) + else: + insert_query = ''' + INSERT INTO audio_files (meeting_id, file_name, file_path, file_size, upload_time) + VALUES (%s, %s, %s, %s, NOW()) + ''' + cursor.execute(insert_query, (meeting_id, audio_file.filename, '/'+str(relative_path), audio_file.size)) + + connection.commit() + cursor.close() - # Check if meeting exists - cursor.execute("SELECT meeting_id FROM meetings WHERE meeting_id = %s", (meeting_id,)) - if not cursor.fetchone(): - # Clean up uploaded file if meeting doesn't exist + # Start transcription task + try: + task_id = transcription_service.start_transcription(meeting_id, '/'+str(relative_path)) + print(f"Transcription task started with ID: {task_id}") + except Exception as e: + print(f"Failed to start transcription task: {e}") + # 不抛出异常,允许文件上传成功但转录失败 + + except Exception as e: + # Clean up uploaded file on database error + if os.path.exists(absolute_path): os.remove(absolute_path) - raise HTTPException(status_code=404, detail="Meeting not found") - - # Insert audio file record - insert_query = ''' - INSERT INTO audio_files (meeting_id, file_name, file_path, file_size, upload_time) - VALUES (%s, %s, %s, %s, NOW()) - ON DUPLICATE KEY UPDATE - file_name = VALUES(file_name), - file_path = VALUES(file_path), - file_size = VALUES(file_size), - upload_time = VALUES(upload_time) - ''' - cursor.execute(insert_query, (meeting_id, audio_file.filename, '/'+str(relative_path), audio_file.size)) - connection.commit() + raise HTTPException(status_code=500, detail=f"Failed to save file info: {str(e)}") return { - "message": "Audio file uploaded successfully", + "message": "Audio file uploaded successfully" + (" and replaced existing file" if replaced_existing else ""), "file_name": audio_file.filename, - "file_path": '/'+str(relative_path) + "file_path": '/'+str(relative_path), + "task_id": task_id, + "transcription_started": task_id is not None, + "replaced_existing": replaced_existing, + "previous_transcription_cleared": replaced_existing and has_transcription } @router.get("/meetings/{meeting_id}/audio") @@ -419,6 +536,73 @@ def get_audio_file(meeting_id: int): "upload_time": audio_file['upload_time'] } +# 转录任务相关接口 +@router.get("/transcription/tasks/{task_id}/status") +def get_transcription_task_status(task_id: str): + """获取转录任务状态""" + try: + status_info = transcription_service.get_task_status(task_id) + return status_info + except Exception as e: + if "Task not found" in str(e): + raise HTTPException(status_code=404, detail="Transcription task not found") + else: + raise HTTPException(status_code=500, detail=f"Failed to get task status: {str(e)}") + +@router.get("/meetings/{meeting_id}/transcription/status") +def get_meeting_transcription_status(meeting_id: int): + """获取会议的转录任务状态""" + try: + status_info = transcription_service.get_meeting_transcription_status(meeting_id) + if not status_info: + return {"message": "No transcription task found for this meeting"} + return status_info + except Exception as e: + raise HTTPException(status_code=500, detail=f"Failed to get meeting transcription status: {str(e)}") + +@router.post("/meetings/{meeting_id}/transcription/start") +def start_meeting_transcription(meeting_id: int): + """手动启动会议转录任务(如果有音频文件的话)""" + try: + with get_db_connection() as connection: + cursor = connection.cursor(dictionary=True) + + # 检查会议是否存在 + cursor.execute("SELECT meeting_id FROM meetings WHERE meeting_id = %s", (meeting_id,)) + if not cursor.fetchone(): + raise HTTPException(status_code=404, detail="Meeting not found") + + # 查询音频文件 + audio_query = "SELECT file_path FROM audio_files WHERE meeting_id = %s LIMIT 1" + cursor.execute(audio_query, (meeting_id,)) + audio_file = cursor.fetchone() + + if not audio_file: + raise HTTPException(status_code=400, detail="No audio file found for this meeting") + + # 检查是否已有进行中的任务 + existing_status = transcription_service.get_meeting_transcription_status(meeting_id) + if existing_status and existing_status['status'] in ['pending', 'processing']: + return { + "message": "Transcription task already exists", + "task_id": existing_status['task_id'], + "status": existing_status['status'] + } + + # 启动转录任务 + task_id = transcription_service.start_transcription(meeting_id, audio_file['file_path']) + + return { + "message": "Transcription task started successfully", + "task_id": task_id, + "meeting_id": meeting_id + } + + except HTTPException: + raise + except Exception as e: + raise HTTPException(status_code=500, detail=f"Failed to start transcription: {str(e)}") + @router.post("/meetings/{meeting_id}/upload-image") async def upload_image( meeting_id: int, diff --git a/app/core/config.py b/app/core/config.py index 531f184..7056185 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -41,6 +41,20 @@ QINIU_SECRET_KEY = os.getenv('QINIU_SECRET_KEY', 'Lj-MSHpaVbmzpS86kMIjmwikvYOT9i QINIU_BUCKET = os.getenv('QINIU_BUCKET', 'imeeting') QINIU_DOMAIN = os.getenv('QINIU_DOMAIN', 't0vogyxkz.hn-bkt.clouddn.com') +# 应用配置 +APP_CONFIG = { + 'base_url': os.getenv('BASE_URL', 'http://imeeting.unisspace.com') +} + +# Redis配置 +REDIS_CONFIG = { + 'host': os.getenv('REDIS_HOST', 'localhost'), + 'port': int(os.getenv('REDIS_PORT', '6379')), + 'db': int(os.getenv('REDIS_DB', '0')), + 'password': os.getenv('REDIS_PASSWORD', None), + 'decode_responses': True +} + # Dashscope (Tongyi Qwen) API Key QWEN_API_KEY = os.getenv('QWEN_API_KEY', 'sk-c2bf06ea56b4491ea3d1e37fdb472b8f') diff --git a/app/core/database.py b/app/core/database.py index dc42fbf..dc438d5 100644 --- a/app/core/database.py +++ b/app/core/database.py @@ -10,7 +10,9 @@ DB_CONFIG = { 'user': 'root', 'password': 'sagacity', 'port': 3306, - 'charset': 'utf8mb4' + 'charset': 'utf8mb4', + 'autocommit': False, # 禁用自动提交 + 'consume_results': True # 自动消费未读结果 } @contextmanager @@ -24,4 +26,10 @@ def get_db_connection(): raise HTTPException(status_code=500, detail="数据库连接失败") finally: if connection and connection.is_connected(): - connection.close() + try: + # 确保清理任何未读结果 + if connection.unread_result: + connection.consume_results() + connection.close() + except Exception as e: + print(f"关闭数据库连接时出错: {e}") diff --git a/app/models/models.py b/app/models/models.py index 6a84e7a..995380f 100644 --- a/app/models/models.py +++ b/app/models/models.py @@ -27,6 +27,16 @@ class AttendeeInfo(BaseModel): user_id: int caption: str +class TranscriptionTaskStatus(BaseModel): + task_id: str + status: str # 'pending', 'processing', 'completed', 'failed' + progress: int # 0-100 + meeting_id: int + created_at: Optional[str] = None + updated_at: Optional[str] = None + completed_at: Optional[str] = None + error_message: Optional[str] = None + class Meeting(BaseModel): meeting_id: int title: str @@ -37,6 +47,7 @@ class Meeting(BaseModel): creator_id: int creator_username: str audio_file_path: Optional[str] = None + transcription_status: Optional[TranscriptionTaskStatus] = None class TranscriptSegment(BaseModel): segment_id: int diff --git a/app/services/async_transcription_service.py b/app/services/async_transcription_service.py new file mode 100644 index 0000000..c3cdafa --- /dev/null +++ b/app/services/async_transcription_service.py @@ -0,0 +1,375 @@ +import uuid +import json +import redis +import requests +from datetime import datetime +from typing import Optional, Dict, Any +from http import HTTPStatus + +import dashscope +from dashscope.audio.asr import Transcription + +from app.core.config import QWEN_API_KEY, REDIS_CONFIG, APP_CONFIG +from app.core.database import get_db_connection + + +class AsyncTranscriptionService: + """异步转录服务类""" + + def __init__(self): + dashscope.api_key = QWEN_API_KEY + self.redis_client = redis.Redis(**REDIS_CONFIG) + self.base_url = APP_CONFIG['base_url'] + + def start_transcription(self, meeting_id: int, audio_file_path: str) -> str: + """ + 启动异步转录任务 + + Args: + meeting_id: 会议ID + audio_file_path: 音频文件相对路径 + + Returns: + str: 业务任务ID + """ + try: + # 构造完整的文件URL + file_url = f"{self.base_url}{audio_file_path}" + + print(f"Starting transcription for meeting_id: {meeting_id}, file_url: {file_url}") + + # 调用Paraformer异步API + task_response = Transcription.async_call( + model='paraformer-v2', + file_urls=[file_url], + language_hints=['zh', 'en'], + disfluency_removal_enabled=True, + diarization_enabled=True, + speaker_count=10 + ) + + if task_response.status_code != HTTPStatus.OK: + print(f"Failed to start transcription: {task_response.status_code}, {task_response.message}") + raise Exception(f"Transcription API error: {task_response.message}") + + paraformer_task_id = task_response.output.task_id + business_task_id = str(uuid.uuid4()) + + # 在Redis中存储任务映射 + task_data = { + 'business_task_id': business_task_id, + 'paraformer_task_id': paraformer_task_id, + 'meeting_id': str(meeting_id), + 'file_url': file_url, + 'status': 'pending', + 'progress': '0', + 'created_at': datetime.now().isoformat(), + 'updated_at': datetime.now().isoformat() + } + + # 存储到Redis,过期时间24小时 + self.redis_client.hset(f"task:{business_task_id}", mapping=task_data) + self.redis_client.expire(f"task:{business_task_id}", 86400) + + # 在数据库中创建任务记录 + self._save_task_to_db(business_task_id, meeting_id, audio_file_path) + + print(f"Transcription task created: {business_task_id}") + return business_task_id + + except Exception as e: + print(f"Error starting transcription: {e}") + raise e + + def get_task_status(self, business_task_id: str) -> Dict[str, Any]: + """ + 获取任务状态 + + Args: + business_task_id: 业务任务ID + + Returns: + Dict: 任务状态信息 + """ + try: + # 从Redis获取任务信息 + task_data = self.redis_client.hgetall(f"task:{business_task_id}") + if not task_data: + # 尝试从数据库获取 + task_data = self._get_task_from_db(business_task_id) + if not task_data: + raise Exception("Task not found") + + paraformer_task_id = task_data['paraformer_task_id'] + + # 查询Paraformer任务状态 + paraformer_response = Transcription.fetch(task=paraformer_task_id) + + if paraformer_response.status_code != HTTPStatus.OK: + print(f"Failed to fetch task status: {paraformer_response.message}") + current_status = 'failed' + progress = 0 + error_message = paraformer_response.message + else: + # 映射Paraformer状态到业务状态 + paraformer_status = paraformer_response.output.task_status + current_status = self._map_paraformer_status(paraformer_status) + progress = self._calculate_progress(paraformer_status) + error_message = None + + # 如果任务完成,处理结果 + if current_status == 'completed' and paraformer_response.output.get('results'): + self._process_transcription_result( + business_task_id, + int(task_data['meeting_id']), + paraformer_response.output + ) + + # 更新Redis中的状态 + update_data = { + 'status': current_status, + 'progress': str(progress), + 'updated_at': datetime.now().isoformat() + } + if error_message: + update_data['error_message'] = error_message + + self.redis_client.hset(f"task:{business_task_id}", mapping=update_data) + + # 更新数据库中的状态 + self._update_task_status_in_db(business_task_id, current_status, progress, error_message) + + return { + 'task_id': business_task_id, + 'status': current_status, + 'progress': progress, + 'meeting_id': int(task_data['meeting_id']), + 'created_at': task_data.get('created_at'), + 'updated_at': update_data['updated_at'], + 'error_message': error_message + } + + except Exception as e: + print(f"Error getting task status: {e}") + raise e + + def get_meeting_transcription_status(self, meeting_id: int) -> Optional[Dict[str, Any]]: + """ + 获取会议的转录任务状态 + + Args: + meeting_id: 会议ID + + Returns: + Optional[Dict]: 任务状态信息,如果没有任务返回None + """ + try: + with get_db_connection() as connection: + cursor = connection.cursor(dictionary=True) + + # 查询最新的转录任务 + query = """ + SELECT task_id, status, progress, created_at, completed_at, error_message + FROM transcript_tasks + WHERE meeting_id = %s + ORDER BY created_at DESC + LIMIT 1 + """ + cursor.execute(query, (meeting_id,)) + task_record = cursor.fetchone() + + # 关闭游标 + cursor.close() + + if not task_record: + return None + + # 如果任务还在进行中,获取最新状态 + if task_record['status'] in ['pending', 'processing']: + try: + latest_status = self.get_task_status(task_record['task_id']) + return latest_status + except Exception: + # 如果获取最新状态失败,返回数据库中的状态 + pass + + return { + 'task_id': task_record['task_id'], + 'status': task_record['status'], + 'progress': task_record['progress'] or 0, + 'meeting_id': meeting_id, + 'created_at': task_record['created_at'].isoformat() if task_record['created_at'] else None, + 'completed_at': task_record['completed_at'].isoformat() if task_record['completed_at'] else None, + 'error_message': task_record['error_message'] + } + + except Exception as e: + print(f"Error getting meeting transcription status: {e}") + return None + + def _map_paraformer_status(self, paraformer_status: str) -> str: + """映射Paraformer状态到业务状态""" + status_mapping = { + 'PENDING': 'pending', + 'RUNNING': 'processing', + 'SUCCEEDED': 'completed', + 'FAILED': 'failed' + } + return status_mapping.get(paraformer_status, 'unknown') + + def _calculate_progress(self, paraformer_status: str) -> int: + """根据Paraformer状态计算进度""" + progress_mapping = { + 'PENDING': 0, + 'RUNNING': 50, + 'SUCCEEDED': 100, + 'FAILED': 0 + } + return progress_mapping.get(paraformer_status, 0) + + def _save_task_to_db(self, business_task_id: str, meeting_id: int, audio_file_path: str): + """保存任务记录到数据库""" + try: + with get_db_connection() as connection: + cursor = connection.cursor() + + # 更新audio_files表关联task_id + update_audio_query = """ + UPDATE audio_files + SET task_id = %s + WHERE meeting_id = %s AND file_path = %s + """ + cursor.execute(update_audio_query, (business_task_id, meeting_id, audio_file_path)) + + # 插入转录任务记录 + insert_task_query = """ + INSERT INTO transcript_tasks (task_id, meeting_id, status, progress, created_at) + VALUES (%s, %s, 'pending', 0, NOW()) + """ + cursor.execute(insert_task_query, (business_task_id, meeting_id)) + + connection.commit() + cursor.close() # 明确关闭游标 + print(f"Task record saved to database: {business_task_id}") + + except Exception as e: + print(f"Error saving task to database: {e}") + raise e + + def _update_task_status_in_db(self, business_task_id: str, status: str, progress: int, error_message: Optional[str] = None): + """更新数据库中的任务状态""" + try: + with get_db_connection() as connection: + cursor = connection.cursor() + + if status == 'completed': + update_query = """ + UPDATE transcript_tasks + SET status = %s, progress = %s, completed_at = NOW(), error_message = %s + WHERE task_id = %s + """ + else: + update_query = """ + UPDATE transcript_tasks + SET status = %s, progress = %s, error_message = %s + WHERE task_id = %s + """ + + cursor.execute(update_query, (status, progress, error_message, business_task_id)) + connection.commit() + cursor.close() # 明确关闭游标 + + except Exception as e: + print(f"Error updating task status in database: {e}") + + def _get_task_from_db(self, business_task_id: str) -> Optional[Dict[str, str]]: + """从数据库获取任务信息""" + try: + with get_db_connection() as connection: + cursor = connection.cursor(dictionary=True) + + query = """ + SELECT tt.task_id as business_task_id, tt.meeting_id, tt.status + FROM transcript_tasks tt + WHERE tt.task_id = %s + """ + cursor.execute(query, (business_task_id,)) + result = cursor.fetchone() + cursor.close() # 明确关闭游标 + + if result: + # 转换为字符串格式以保持一致性 + return { + 'business_task_id': result['business_task_id'], + 'paraformer_task_id': '', # 数据库中没有存储,需要从Redis获取 + 'meeting_id': str(result['meeting_id']), + 'status': result['status'] + } + return None + + except Exception as e: + print(f"Error getting task from database: {e}") + return None + + def _process_transcription_result(self, business_task_id: str, meeting_id: int, paraformer_output: Any): + """处理转录结果""" + try: + if not paraformer_output.get('results'): + print("No transcription results found in the response.") + return + + transcription_url = paraformer_output['results'][0]['transcription_url'] + print(f"Fetching transcription from URL: {transcription_url}") + + response = requests.get(transcription_url) + response.raise_for_status() + transcription_data = response.json() + + # 保存转录内容到数据库 + self._save_segments_to_db(transcription_data, meeting_id) + + print(f"Transcription result processed for task: {business_task_id}") + + except Exception as e: + print(f"Error processing transcription result: {e}") + + def _save_segments_to_db(self, data: dict, meeting_id: int): + """保存转录分段到数据库""" + segments_to_insert = [] + for transcript in data.get('transcripts', []): + for sentence in transcript.get('sentences', []): + speaker_id = sentence.get('speaker_id', -1) + segments_to_insert.append(( + meeting_id, + speaker_id, + f"发言人 {speaker_id}", # 默认speaker_tag + sentence.get('begin_time'), + sentence.get('end_time'), + sentence.get('text') + )) + + if not segments_to_insert: + print("No segments to save.") + return + + try: + with get_db_connection() as connection: + cursor = connection.cursor() + + # 清除该会议的现有转录分段 + delete_query = "DELETE FROM transcript_segments WHERE meeting_id = %s" + cursor.execute(delete_query, (meeting_id,)) + print(f"Deleted existing segments for meeting_id: {meeting_id}") + + # 插入新的转录分段 + insert_query = ''' + INSERT INTO transcript_segments (meeting_id, speaker_id, speaker_tag, start_time_ms, end_time_ms, text_content) + VALUES (%s, %s, %s, %s, %s, %s) + ''' + cursor.executemany(insert_query, segments_to_insert) + connection.commit() + cursor.close() # 明确关闭游标 + print(f"Successfully saved {len(segments_to_insert)} segments to the database for meeting_id: {meeting_id}") + + except Exception as e: + print(f"Database error when saving segments: {e}") \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index a552ffd..8e763a0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,4 +4,6 @@ uvicorn[standard] python-multipart pydantic[email] passlib[bcrypt] -qiniu \ No newline at end of file +qiniu +redis>=5.0.0 +dashscope \ No newline at end of file diff --git a/uploads/audio/41/61a1fca4-91a8-4f89-b08b-76705847609a.m4a b/uploads/audio/41/61a1fca4-91a8-4f89-b08b-76705847609a.m4a new file mode 100644 index 0000000..1b8c736 Binary files /dev/null and b/uploads/audio/41/61a1fca4-91a8-4f89-b08b-76705847609a.m4a differ diff --git a/uploads/audio/41/ab94de20-8ad6-45be-b6f7-cf87b425e773.m4a b/uploads/audio/41/ab94de20-8ad6-45be-b6f7-cf87b425e773.m4a new file mode 100644 index 0000000..1b8c736 Binary files /dev/null and b/uploads/audio/41/ab94de20-8ad6-45be-b6f7-cf87b425e773.m4a differ diff --git a/uploads/audio/41/d4a4c7ee-78f9-4721-84ef-aba6ce4d9197.m4a b/uploads/audio/41/d4a4c7ee-78f9-4721-84ef-aba6ce4d9197.m4a new file mode 100644 index 0000000..1b8c736 Binary files /dev/null and b/uploads/audio/41/d4a4c7ee-78f9-4721-84ef-aba6ce4d9197.m4a differ diff --git a/uploads/audio/42/5e4da95d-4019-435d-ab52-86564329a195.m4a b/uploads/audio/42/5e4da95d-4019-435d-ab52-86564329a195.m4a new file mode 100644 index 0000000..1b8c736 Binary files /dev/null and b/uploads/audio/42/5e4da95d-4019-435d-ab52-86564329a195.m4a differ diff --git a/uploads/audio/43/f956e9b0-e7e2-4a09-8ee6-44601abd2533.m4a b/uploads/audio/43/f956e9b0-e7e2-4a09-8ee6-44601abd2533.m4a new file mode 100644 index 0000000..1b8c736 Binary files /dev/null and b/uploads/audio/43/f956e9b0-e7e2-4a09-8ee6-44601abd2533.m4a differ diff --git a/uploads/audio/44/d50d769d-a65d-4f9f-830a-e97e8c56e04b.m4a b/uploads/audio/44/d50d769d-a65d-4f9f-830a-e97e8c56e04b.m4a new file mode 100644 index 0000000..1b8c736 Binary files /dev/null and b/uploads/audio/44/d50d769d-a65d-4f9f-830a-e97e8c56e04b.m4a differ diff --git a/uploads/audio/6/03637f67-96ae-4167-820a-fb87b3df2646.mp3 b/uploads/audio/6/03637f67-96ae-4167-820a-fb87b3df2646.mp3 new file mode 100644 index 0000000..0fd96c9 Binary files /dev/null and b/uploads/audio/6/03637f67-96ae-4167-820a-fb87b3df2646.mp3 differ diff --git a/uploads/audio/6/a8eb6acd-52e6-4895-b608-d73aba66393c.mp3 b/uploads/audio/6/a8eb6acd-52e6-4895-b608-d73aba66393c.mp3 new file mode 100644 index 0000000..0fd96c9 Binary files /dev/null and b/uploads/audio/6/a8eb6acd-52e6-4895-b608-d73aba66393c.mp3 differ