import logging
import os
import shutil
import uuid
from datetime import datetime
from typing import List, Optional

from fastapi import APIRouter, HTTPException, UploadFile, File, Form, Depends, BackgroundTasks
from pydantic import BaseModel

# Project-local imports are kept in their original relative order.
from app.models.models import (
    Meeting,
    TranscriptSegment,
    TranscriptionTaskStatus,
    CreateMeetingRequest,
    UpdateMeetingRequest,
    SpeakerTagUpdateRequest,
    BatchSpeakerTagUpdateRequest,
    TranscriptUpdateRequest,
    BatchTranscriptUpdateRequest,
    Tag,
)
from app.core.database import get_db_connection
from app.core.config import (
    BASE_DIR,
    UPLOAD_DIR,
    AUDIO_DIR,
    MARKDOWN_DIR,
    ALLOWED_EXTENSIONS,
    ALLOWED_IMAGE_EXTENSIONS,
    MAX_FILE_SIZE,
    MAX_IMAGE_SIZE,
)
from app.services.qiniu_service import qiniu_service
from app.services.llm_service import LLMService
from app.services.async_transcription_service import AsyncTranscriptionService
from app.services.async_llm_service import async_llm_service
from app.core.auth import get_current_user, get_optional_current_user

logger = logging.getLogger(__name__)

router = APIRouter()

# Instantiate services
llm_service = LLMService()
transcription_service = AsyncTranscriptionService()

# NOTE: the async LLM service needs a separately started worker process.
# Run: python llm_worker.py


# Request models
class GenerateSummaryRequest(BaseModel):
    """Request body for the summary-generation endpoints."""

    # Optional extra instructions passed through to the LLM service.
    user_prompt: Optional[str] = ""


# ---------------------------------------------------------------------------
# Private helpers shared by the route handlers below
# ---------------------------------------------------------------------------

_ATTENDEES_QUERY = '''
    SELECT u.user_id, u.caption
    FROM attendees a
    JOIN users u ON a.user_id = u.user_id
    WHERE a.meeting_id = %s
'''

# LIMIT 1 keeps the result to a single row even if a meeting has several
# audio_files rows, so fetchone() never leaves unread rows on the cursor.
_MEETING_DETAIL_QUERY = '''
    SELECT
        m.meeting_id, m.title, m.meeting_time, m.summary, m.created_at, m.tags,
        m.user_id as creator_id, u.caption as creator_username,
        af.file_path as audio_file_path
    FROM meetings m
    JOIN users u ON m.user_id = u.user_id
    LEFT JOIN audio_files af ON m.meeting_id = af.meeting_id
    WHERE m.meeting_id = %s
    LIMIT 1
'''


def _parse_tag_names(tag_string: Optional[str]) -> List[str]:
    """Split a comma-separated tag string into stripped, non-empty, de-duplicated names."""
    if not tag_string:
        return []
    stripped = (name.strip() for name in tag_string.split(','))
    # dict.fromkeys de-duplicates while preserving first-seen order.
    return list(dict.fromkeys(name for name in stripped if name))


def _ensure_tags_exist(cursor, tag_string: Optional[str]) -> List[str]:
    """Insert any tag names missing from the ``tags`` table and return the parsed names.

    Nothing is committed here; the caller owns the transaction.
    """
    tag_names = _parse_tag_names(tag_string)
    if tag_names:
        cursor.executemany(
            "INSERT IGNORE INTO tags (name) VALUES (%s)",
            [(name,) for name in tag_names],
        )
    return tag_names


def _process_tags(cursor, tag_string: Optional[str]) -> List[Tag]:
    """Resolve a comma-separated tag string into ``Tag`` objects.

    Missing tag names are inserted first (uncommitted), then the id, name and
    color of every named tag are read back.

    Args:
        cursor: Open DB cursor. Rows are unpacked with ``Tag(**row)``, so it is
            expected to return dict rows (all callers here use
            ``dictionary=True``).
        tag_string: Comma-separated tag names, or ``None``/empty.

    Returns:
        The matching tags; an empty list when no usable names are given.
    """
    tag_names = _ensure_tags_exist(cursor, tag_string)
    if not tag_names:
        return []

    placeholders = ', '.join(['%s'] * len(tag_names))
    cursor.execute(
        f"SELECT id, name, color FROM tags WHERE name IN ({placeholders})",
        tuple(tag_names),
    )
    return [Tag(**row) for row in cursor.fetchall()]


def _fetch_attendees(cursor, meeting_id: int) -> List[dict]:
    """Return ``[{'user_id': ..., 'caption': ...}, ...]`` for a meeting's attendees."""
    cursor.execute(_ATTENDEES_QUERY, (meeting_id,))
    return [
        {'user_id': row['user_id'], 'caption': row['caption']}
        for row in cursor.fetchall()
    ]


def _build_meeting(row: dict, attendees: List[dict], tags: List[Tag]) -> Meeting:
    """Build a ``Meeting`` model from a meetings/users join row plus attendees and tags."""
    return Meeting(
        meeting_id=row['meeting_id'],
        title=row['title'],
        meeting_time=row['meeting_time'],
        summary=row['summary'],
        created_at=row['created_at'],
        attendees=attendees,
        creator_id=row['creator_id'],
        creator_username=row['creator_username'],
        tags=tags,
    )


def _require_meeting_owner(cursor, meeting_id: int, current_user: dict) -> None:
    """Raise 404 if the meeting does not exist, 403 if ``current_user`` is not its creator."""
    cursor.execute("SELECT user_id FROM meetings WHERE meeting_id = %s", (meeting_id,))
    meeting = cursor.fetchone()
    if not meeting:
        raise HTTPException(status_code=404, detail="Meeting not found")
    if meeting['user_id'] != current_user['user_id']:
        raise HTTPException(status_code=403, detail="Permission denied")


def _remove_file_quietly(path) -> None:
    """Best-effort file removal: a missing file is fine, other OS errors are only logged."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass
    except OSError as e:
        logger.warning("Failed to delete file %s: %s", path, e)


def _load_meeting_detail(meeting_id: int) -> Meeting:
    """Load one meeting with attendees, tags, audio path and transcription status.

    Raises:
        HTTPException: 404 when the meeting does not exist.
    """
    with get_db_connection() as connection:
        cursor = connection.cursor(dictionary=True)
        try:
            cursor.execute(_MEETING_DETAIL_QUERY, (meeting_id,))
            meeting = cursor.fetchone()
            if not meeting:
                raise HTTPException(status_code=404, detail="Meeting not found")

            attendees = _fetch_attendees(cursor, meeting['meeting_id'])
            tags = _process_tags(cursor, meeting.get('tags'))
        finally:
            # Close the cursor explicitly to avoid clashing with the
            # transcription service's own database access below.
            cursor.close()

    meeting_data = _build_meeting(meeting, attendees, tags)

    # Add audio file path if one exists
    if meeting.get('audio_file_path'):
        meeting_data.audio_file_path = meeting['audio_file_path']

    # Fetch the transcription status outside the connection block to avoid
    # cursor conflicts. A failure here must not fail the whole request.
    try:
        transcription_status_data = transcription_service.get_meeting_transcription_status(meeting_id)
        if transcription_status_data:
            meeting_data.transcription_status = TranscriptionTaskStatus(**transcription_status_data)
    except Exception as e:
        logger.warning("Failed to get transcription status for meeting %s: %s", meeting_id, e)

    return meeting_data


# ---------------------------------------------------------------------------
# Meeting CRUD
# ---------------------------------------------------------------------------

@router.get("/meetings", response_model=list[Meeting])
def get_meetings(current_user: dict = Depends(get_current_user), user_id: Optional[int] = None):
    """List meetings, newest first.

    When ``user_id`` is given (and truthy), only meetings that user created or
    attends are returned; otherwise all meetings are returned.
    """
    with get_db_connection() as connection:
        cursor = connection.cursor(dictionary=True)

        base_query = '''
            SELECT
                m.meeting_id, m.title, m.meeting_time, m.summary, m.created_at, m.tags,
                m.user_id as creator_id, u.caption as creator_username
            FROM meetings m
            JOIN users u ON m.user_id = u.user_id
        '''

        if user_id:
            query = f'''
                {base_query}
                LEFT JOIN attendees a ON m.meeting_id = a.meeting_id
                WHERE m.user_id = %s OR a.user_id = %s
                GROUP BY m.meeting_id
                ORDER BY m.meeting_time DESC, m.created_at DESC
            '''
            cursor.execute(query, (user_id, user_id))
        else:
            query = f" {base_query} ORDER BY m.meeting_time DESC, m.created_at DESC"
            cursor.execute(query)

        meetings = cursor.fetchall()

        meeting_list = []
        for meeting in meetings:
            # NOTE(review): one attendee query and one tag query per meeting
            # (N+1); fine for small lists, worth batching if this grows.
            attendees = _fetch_attendees(cursor, meeting['meeting_id'])
            tags = _process_tags(cursor, meeting.get('tags'))
            meeting_list.append(_build_meeting(meeting, attendees, tags))

        return meeting_list


@router.get("/meetings/{meeting_id}", response_model=Meeting)
def get_meeting_details(meeting_id: int, current_user: dict = Depends(get_current_user)):
    """Return one meeting with attendees, tags, audio path and transcription status.

    Raises:
        HTTPException: 404 when the meeting does not exist.
    """
    return _load_meeting_detail(meeting_id)


@router.get("/meetings/{meeting_id}/transcript", response_model=list[TranscriptSegment])
def get_meeting_transcript(meeting_id: int, current_user: dict = Depends(get_current_user)):
    """Return the meeting's transcript segments ordered by start time.

    Segments without a ``speaker_tag`` get a default tag derived from their
    ``speaker_id``.

    Raises:
        HTTPException: 404 when the meeting does not exist.
    """
    with get_db_connection() as connection:
        cursor = connection.cursor(dictionary=True)

        # First check that the meeting exists
        cursor.execute("SELECT meeting_id FROM meetings WHERE meeting_id = %s", (meeting_id,))
        if not cursor.fetchone():
            raise HTTPException(status_code=404, detail="Meeting not found")

        # Get transcript segments
        transcript_query = '''
            SELECT segment_id, meeting_id, speaker_id, speaker_tag, start_time_ms, end_time_ms, text_content
            FROM transcript_segments
            WHERE meeting_id = %s
            ORDER BY start_time_ms ASC
        '''
        cursor.execute(transcript_query, (meeting_id,))

        return [
            TranscriptSegment(
                segment_id=segment['segment_id'],
                meeting_id=segment['meeting_id'],
                speaker_id=segment['speaker_id'],
                speaker_tag=segment['speaker_tag'] or f"发言人 {segment['speaker_id']}",
                start_time_ms=segment['start_time_ms'],
                end_time_ms=segment['end_time_ms'],
                text_content=segment['text_content'],
            )
            for segment in cursor.fetchall()
        ]


@router.post("/meetings")
def create_meeting(meeting_request: CreateMeetingRequest, current_user: dict = Depends(get_current_user)):
    """Create a meeting, register its tags and attendees, and return the new id."""
    with get_db_connection() as connection:
        cursor = connection.cursor(dictionary=True)

        # Make sure every referenced tag exists
        _ensure_tags_exist(cursor, meeting_request.tags)

        # Create meeting
        # NOTE(review): the creator is taken from the request body
        # (meeting_request.user_id), not from current_user — confirm intended.
        meeting_query = '''
            INSERT INTO meetings (user_id, title, meeting_time, summary, tags, created_at)
            VALUES (%s, %s, %s, %s, %s, %s)
        '''
        cursor.execute(meeting_query, (
            meeting_request.user_id,
            meeting_request.title,
            meeting_request.meeting_time,
            None,
            meeting_request.tags,
            datetime.now().isoformat(),
        ))
        meeting_id = cursor.lastrowid

        # Add attendees
        attendee_query = '''
            INSERT IGNORE INTO attendees (meeting_id, user_id)
            VALUES (%s, %s)
        '''
        for attendee_id in meeting_request.attendee_ids:
            cursor.execute(attendee_query, (meeting_id, attendee_id))

        connection.commit()
        return {"message": "Meeting created successfully", "meeting_id": meeting_id}


@router.put("/meetings/{meeting_id}")
def update_meeting(meeting_id: int, meeting_request: UpdateMeetingRequest, current_user: dict = Depends(get_current_user)):
    """Update a meeting's fields and replace its attendee list.

    Raises:
        HTTPException: 404 when the meeting does not exist, 403 when the
            caller is not its creator.
    """
    with get_db_connection() as connection:
        cursor = connection.cursor(dictionary=True)

        # Check that the meeting exists and the user has permission
        _require_meeting_owner(cursor, meeting_id, current_user)

        # Make sure every referenced tag exists
        _ensure_tags_exist(cursor, meeting_request.tags)

        # Update meeting
        update_query = '''
            UPDATE meetings
            SET title = %s, meeting_time = %s, summary = %s, tags = %s
            WHERE meeting_id = %s
        '''
        cursor.execute(update_query, (
            meeting_request.title,
            meeting_request.meeting_time,
            meeting_request.summary,
            meeting_request.tags,
            meeting_id,
        ))

        # Update attendees - remove existing and add new ones.
        # INSERT IGNORE (as in create_meeting) so a duplicated id in the
        # request cannot abort the update.
        cursor.execute("DELETE FROM attendees WHERE meeting_id = %s", (meeting_id,))
        attendee_query = '''
            INSERT IGNORE INTO attendees (meeting_id, user_id)
            VALUES (%s, %s)
        '''
        for attendee_id in meeting_request.attendee_ids:
            cursor.execute(attendee_query, (meeting_id, attendee_id))

        connection.commit()
        return {"message": "Meeting updated successfully"}


@router.delete("/meetings/{meeting_id}")
def delete_meeting(meeting_id: int, current_user: dict = Depends(get_current_user)):
    """Delete a meeting together with its dependent rows.

    Raises:
        HTTPException: 404 when the meeting does not exist, 403 when the
            caller is not its creator.
    """
    with get_db_connection() as connection:
        cursor = connection.cursor(dictionary=True)

        # Check that the meeting exists and the user has permission
        _require_meeting_owner(cursor, meeting_id, current_user)

        # Delete related records first (foreign key constraints)
        cursor.execute("DELETE FROM transcript_segments WHERE meeting_id = %s", (meeting_id,))
        cursor.execute("DELETE FROM audio_files WHERE meeting_id = %s", (meeting_id,))
        cursor.execute("DELETE FROM attachments WHERE meeting_id = %s", (meeting_id,))
        cursor.execute("DELETE FROM attendees WHERE meeting_id = %s", (meeting_id,))

        # Delete meeting
        cursor.execute("DELETE FROM meetings WHERE meeting_id = %s", (meeting_id,))

        connection.commit()
        return {"message": "Meeting deleted successfully"}


@router.get("/meetings/{meeting_id}/edit", response_model=Meeting)
def get_meeting_for_edit(meeting_id: int, current_user: dict = Depends(get_current_user)):
    """Return the meeting data used to populate the edit form.

    Raises:
        HTTPException: 404 when the meeting does not exist.
    """
    return _load_meeting_detail(meeting_id)


# ---------------------------------------------------------------------------
# Audio upload
# ---------------------------------------------------------------------------

@router.post("/meetings/upload-audio")
async def upload_audio(
    audio_file: UploadFile = File(...),
    meeting_id: int = Form(...),
    force_replace: str = Form("false"),  # received as a string, converted manually
    current_user: dict = Depends(get_current_user)
):
    """Upload (or replace) a meeting's audio file and start transcription.

    If the meeting already has audio *and* transcript segments, and
    ``force_replace`` is not truthy, nothing is written and a
    ``requires_confirmation`` payload is returned instead.

    Raises:
        HTTPException: 400 for an unsupported extension or oversized file,
            404/403 for a missing meeting or a non-owner caller, 500 for
            storage or database failures.
    """
    # Convert string to boolean
    force_replace_bool = force_replace.lower() in ("true", "1", "yes")

    # Validate file extension (filename may be None)
    file_extension = os.path.splitext(audio_file.filename or "")[1].lower()
    if file_extension not in ALLOWED_EXTENSIONS:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported file type. Allowed types: {', '.join(ALLOWED_EXTENSIONS)}"
        )

    # Check file size (size may be None when the client did not provide it)
    if audio_file.size is not None and audio_file.size > MAX_FILE_SIZE:
        raise HTTPException(
            status_code=400,
            detail="File size exceeds 100MB limit"
        )

    # Check for an existing audio file and transcription records
    existing_info = None
    has_transcription = False

    try:
        with get_db_connection() as connection:
            cursor = connection.cursor(dictionary=True)
            try:
                # Check that the meeting exists and the user has permission
                _require_meeting_owner(cursor, meeting_id, current_user)

                # Check existing audio file
                cursor.execute("""
                    SELECT file_name, file_path, upload_time
                    FROM audio_files
                    WHERE meeting_id = %s
                    LIMIT 1
                """, (meeting_id,))
                existing_info = cursor.fetchone()

                # Check existing transcription segments
                if existing_info:
                    cursor.execute("""
                        SELECT COUNT(*) as segment_count
                        FROM transcript_segments
                        WHERE meeting_id = %s
                    """, (meeting_id,))
                    result = cursor.fetchone()
                    has_transcription = result['segment_count'] > 0
            finally:
                cursor.close()
    except HTTPException:
        # Keep the 404/403 from the permission check instead of masking it as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to check existing files: {str(e)}") from e

    # An existing file with transcript data needs explicit user confirmation
    if existing_info and has_transcription and not force_replace_bool:
        return {
            "requires_confirmation": True,
            "message": "该会议已有音频文件和转录记录,重新上传将删除现有的转录内容",
            "existing_file": {
                "file_name": existing_info['file_name'],
                "upload_time": existing_info['upload_time'].isoformat() if existing_info['upload_time'] else None
            },
            "transcription_segments": has_transcription,
            "suggestion": "请确认是否要替换现有音频文件并重新进行转录"
        }

    # Create meeting-specific directory (and AUDIO_DIR itself if missing)
    meeting_dir = AUDIO_DIR / str(meeting_id)
    meeting_dir.mkdir(parents=True, exist_ok=True)

    # Generate unique filename
    unique_filename = f"{uuid.uuid4()}{file_extension}"
    absolute_path = meeting_dir / unique_filename
    relative_path = absolute_path.relative_to(BASE_DIR)
    stored_path = '/' + str(relative_path)

    # Save file
    try:
        with open(absolute_path, "wb") as buffer:
            shutil.copyfileobj(audio_file.file, buffer)
    except Exception as e:
        # Do not leave a partially written file behind.
        _remove_file_quietly(absolute_path)
        raise HTTPException(status_code=500, detail=f"Failed to save file: {str(e)}") from e

    # Save file info to database
    task_id = None
    replaced_existing = existing_info is not None
    clear_previous = replaced_existing and force_replace_bool

    try:
        with get_db_connection() as connection:
            cursor = connection.cursor(dictionary=True)

            # On a forced replace, clear the old transcription data
            if clear_previous:
                cursor.execute("DELETE FROM transcript_segments WHERE meeting_id = %s", (meeting_id,))
                cursor.execute("DELETE FROM transcript_tasks WHERE meeting_id = %s", (meeting_id,))

            # Insert or update audio file record
            if replaced_existing:
                update_query = '''
                    UPDATE audio_files
                    SET file_name = %s, file_path = %s, file_size = %s, upload_time = NOW(), task_id = NULL
                    WHERE meeting_id = %s
                '''
                cursor.execute(update_query, (audio_file.filename, stored_path, audio_file.size, meeting_id))
            else:
                insert_query = '''
                    INSERT INTO audio_files (meeting_id, file_name, file_path, file_size, upload_time)
                    VALUES (%s, %s, %s, %s, NOW())
                '''
                cursor.execute(insert_query, (meeting_id, audio_file.filename, stored_path, audio_file.size))

            connection.commit()
            cursor.close()
    except Exception as e:
        # Clean up the uploaded file on database error
        _remove_file_quietly(absolute_path)
        raise HTTPException(status_code=500, detail=f"Failed to save file info: {str(e)}") from e

    # Remove the old audio file only after the new record is committed, so a
    # failed DB write never leaves the row pointing at a deleted file.
    if clear_previous and existing_info['file_path']:
        old_file_path = BASE_DIR / existing_info['file_path'].lstrip('/')
        _remove_file_quietly(old_file_path)

    # Start transcription task. A failure here is logged but not raised: the
    # upload itself succeeded, and transcription can be started manually.
    try:
        task_id = transcription_service.start_transcription(meeting_id, stored_path)
        logger.info("Transcription task started with ID: %s", task_id)
    except Exception as e:
        logger.warning("Failed to start transcription task: %s", e)

    return {
        "message": "Audio file uploaded successfully" + (" and replaced existing file" if replaced_existing else ""),
        "file_name": audio_file.filename,
        "file_path": stored_path,
        "task_id": task_id,
        "transcription_started": task_id is not None,
        "replaced_existing": replaced_existing,
        "previous_transcription_cleared": replaced_existing and has_transcription
    }


@router.get("/meetings/{meeting_id}/audio")
def get_audio_file(meeting_id: int, current_user: dict = Depends(get_current_user)):
    """Return the stored audio file's metadata for a meeting.

    Raises:
        HTTPException: 404 when the meeting has no audio file record.
    """
    with get_db_connection() as connection:
        cursor = connection.cursor(dictionary=True)

        query = '''
            SELECT file_name, file_path, file_size, upload_time
            FROM audio_files
            WHERE meeting_id = %s
            LIMIT 1
        '''
        cursor.execute(query, (meeting_id,))
        audio_file = cursor.fetchone()

        if not audio_file:
            raise HTTPException(status_code=404, detail="Audio file not found for this meeting")

        return {
            "file_name": audio_file['file_name'],
            "file_path": audio_file['file_path'],
            "file_size": audio_file['file_size'],
            "upload_time": audio_file['upload_time']
        }


# ---------------------------------------------------------------------------
# Transcription task endpoints
# ---------------------------------------------------------------------------

@router.get("/transcription/tasks/{task_id}/status")
def get_transcription_task_status(task_id: str, current_user: dict = Depends(get_current_user)):
    """Return the status of a transcription task.

    Raises:
        HTTPException: 404 when the service error mentions "Task not found",
            500 for any other failure.
    """
    try:
        return transcription_service.get_task_status(task_id)
    except Exception as e:
        # The service signals a missing task through its error message.
        if "Task not found" in str(e):
            raise HTTPException(status_code=404, detail="Transcription task not found") from e
        raise HTTPException(status_code=500, detail=f"Failed to get task status: {str(e)}") from e


@router.get("/meetings/{meeting_id}/transcription/status")
def get_meeting_transcription_status(meeting_id: int, current_user: dict = Depends(get_current_user)):
    """Return the meeting's transcription task status, or a message when there is none."""
    try:
        status_info = transcription_service.get_meeting_transcription_status(meeting_id)
        if not status_info:
            return {"message": "No transcription task found for this meeting"}
        return status_info
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to get meeting transcription status: {str(e)}") from e


@router.post("/meetings/{meeting_id}/transcription/start")
def start_meeting_transcription(meeting_id: int, current_user: dict = Depends(get_current_user)):
    """Manually start transcription for a meeting that already has an audio file.

    If a task is already pending or processing, that task is reported instead
    of starting a new one.

    Raises:
        HTTPException: 404 when the meeting does not exist, 400 when it has
            no audio file, 500 for any other failure.
    """
    try:
        with get_db_connection() as connection:
            cursor = connection.cursor(dictionary=True)

            # Check that the meeting exists
            cursor.execute("SELECT meeting_id FROM meetings WHERE meeting_id = %s", (meeting_id,))
            if not cursor.fetchone():
                raise HTTPException(status_code=404, detail="Meeting not found")

            # Look up the audio file
            audio_query = "SELECT file_path FROM audio_files WHERE meeting_id = %s LIMIT 1"
            cursor.execute(audio_query, (meeting_id,))
            audio_file = cursor.fetchone()
            if not audio_file:
                raise HTTPException(status_code=400, detail="No audio file found for this meeting")

        # Check whether a task is already in progress
        existing_status = transcription_service.get_meeting_transcription_status(meeting_id)
        if existing_status and existing_status['status'] in ['pending', 'processing']:
            return {
                "message": "Transcription task already exists",
                "task_id": existing_status['task_id'],
                "status": existing_status['status']
            }

        # Start the transcription task
        task_id = transcription_service.start_transcription(meeting_id, audio_file['file_path'])

        return {
            "message": "Transcription task started successfully",
            "task_id": task_id,
            "meeting_id": meeting_id
        }
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to start transcription: {str(e)}") from e


# ---------------------------------------------------------------------------
# Image upload
# ---------------------------------------------------------------------------

@router.post("/meetings/{meeting_id}/upload-image")
async def upload_image(
    meeting_id: int,
    image_file: UploadFile = File(...),
    current_user: dict = Depends(get_current_user)
):
    """Store an image under the meeting's markdown directory and return its path.

    Raises:
        HTTPException: 400 for an unsupported extension or oversized image,
            404/403 for a missing meeting or a non-owner caller, 500 when the
            file cannot be written.
    """
    # Validate file extension (filename may be None)
    file_extension = os.path.splitext(image_file.filename or "")[1].lower()
    if file_extension not in ALLOWED_IMAGE_EXTENSIONS:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported image type. Allowed types: {', '.join(ALLOWED_IMAGE_EXTENSIONS)}"
        )

    # Check file size (size may be None when the client did not provide it)
    if image_file.size is not None and image_file.size > MAX_IMAGE_SIZE:
        raise HTTPException(
            status_code=400,
            detail="Image size exceeds 10MB limit"
        )

    # Check that the meeting exists and the user has permission
    with get_db_connection() as connection:
        cursor = connection.cursor(dictionary=True)
        _require_meeting_owner(cursor, meeting_id, current_user)

    # Create meeting-specific directory (and MARKDOWN_DIR itself if missing)
    meeting_dir = MARKDOWN_DIR / str(meeting_id)
    meeting_dir.mkdir(parents=True, exist_ok=True)

    # Generate unique filename
    unique_filename = f"{uuid.uuid4()}{file_extension}"
    absolute_path = meeting_dir / unique_filename
    relative_path = absolute_path.relative_to(BASE_DIR)

    # Save file
    try:
        with open(absolute_path, "wb") as buffer:
            shutil.copyfileobj(image_file.file, buffer)
    except Exception as e:
        # Do not leave a partially written file behind.
        _remove_file_quietly(absolute_path)
        raise HTTPException(status_code=500, detail=f"Failed to save image: {str(e)}") from e

    return {
        "message": "Image uploaded successfully",
        "file_name": image_file.filename,
        "file_path": '/' + str(relative_path)
    }


# ---------------------------------------------------------------------------
# Speaker tag update endpoints
# ---------------------------------------------------------------------------

_UPDATE_SPEAKER_TAG_QUERY = """
    UPDATE transcript_segments
    SET speaker_tag = %s
    WHERE meeting_id = %s AND speaker_id = %s
"""


@router.put("/meetings/{meeting_id}/speaker-tags")
def update_speaker_tag(meeting_id: int, request: SpeakerTagUpdateRequest, current_user: dict = Depends(get_current_user)):
    """Update one speaker's tag, keyed by the original ``speaker_id`` value.

    Only ``speaker_tag`` is modified; ``speaker_id`` keeps its original value.

    Raises:
        HTTPException: 404 when no segment row was affected, 500 for any
            other failure.
    """
    try:
        with get_db_connection() as connection:
            cursor = connection.cursor()

            cursor.execute(_UPDATE_SPEAKER_TAG_QUERY, (request.new_tag, meeting_id, request.speaker_id))
            updated_count = cursor.rowcount

            if updated_count == 0:
                raise HTTPException(status_code=404, detail="No segments found for this speaker")

            connection.commit()
            return {'message': 'Speaker tag updated successfully', 'updated_count': updated_count}
    except HTTPException:
        # Let the 404 through; previously it was caught below and became a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to update speaker tag: {str(e)}") from e


@router.put("/meetings/{meeting_id}/speaker-tags/batch")
def batch_update_speaker_tags(meeting_id: int, request: BatchSpeakerTagUpdateRequest, current_user: dict = Depends(get_current_user)):
    """Update several speakers' tags, keyed by the original ``speaker_id`` values.

    Returns the total number of affected rows across all updates.
    """
    try:
        with get_db_connection() as connection:
            cursor = connection.cursor()

            total_updated = 0
            for update_item in request.updates:
                # Only speaker_tag is modified; speaker_id keeps its original value
                cursor.execute(
                    _UPDATE_SPEAKER_TAG_QUERY,
                    (update_item.new_tag, meeting_id, update_item.speaker_id),
                )
                total_updated += cursor.rowcount

            connection.commit()
            return {'message': 'Speaker tags updated successfully', 'total_updated': total_updated}
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to batch update speaker tags: {str(e)}") from e


# ---------------------------------------------------------------------------
# Transcript content update endpoints
# ---------------------------------------------------------------------------

@router.put("/meetings/{meeting_id}/transcript/batch")
def batch_update_transcript(meeting_id: int, request: BatchTranscriptUpdateRequest, current_user: dict = Depends(get_current_user)):
    """Update the text of several transcript segments in one request.

    Segments that do not belong to ``meeting_id`` are skipped. Returns the
    total number of affected rows.
    """
    verify_query = "SELECT segment_id FROM transcript_segments WHERE segment_id = %s AND meeting_id = %s"
    update_query = """
        UPDATE transcript_segments
        SET text_content = %s
        WHERE segment_id = %s AND meeting_id = %s
    """
    try:
        with get_db_connection() as connection:
            cursor = connection.cursor()

            total_updated = 0
            for update_item in request.updates:
                # Verify that the segment belongs to the given meeting
                cursor.execute(verify_query, (update_item.segment_id, meeting_id))
                if not cursor.fetchone():
                    continue  # skip segments that are not part of this meeting

                # Update the transcript text
                cursor.execute(update_query, (update_item.text_content, update_item.segment_id, meeting_id))
                total_updated += cursor.rowcount

            connection.commit()
            return {'message': 'Transcript updated successfully', 'total_updated': total_updated}
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to update transcript: {str(e)}") from e

# ==================== AI summary endpoints ====================

def _assert_meeting_exists(meeting_id: int) -> None:
    """Raise a 404 ``HTTPException`` unless a ``meetings`` row with this id exists."""
    with get_db_connection() as connection:
        cursor = connection.cursor(dictionary=True)
        cursor.execute("SELECT meeting_id FROM meetings WHERE meeting_id = %s", (meeting_id,))
        row = cursor.fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="Meeting not found")


@router.post("/meetings/{meeting_id}/generate-summary")
def generate_meeting_summary(
    meeting_id: int,
    request: GenerateSummaryRequest,
    current_user: dict = Depends(get_current_user),
):
    """Generate an AI summary for a meeting and return its content.

    Raises:
        HTTPException: 404 when the meeting does not exist, 500 when the LLM
            service reports an error or anything else fails.
    """
    try:
        # Make sure the meeting exists before calling the LLM service
        _assert_meeting_exists(meeting_id)

        # Ask the LLM service to produce the summary
        outcome = llm_service.generate_meeting_summary(meeting_id, request.user_prompt)
        failure = outcome.get("error")
        if failure:
            raise HTTPException(status_code=500, detail=failure)

        payload = {
            "message": "Summary generated successfully",
            "summary_id": outcome["summary_id"],
            "content": outcome["content"],
            "meeting_id": meeting_id,
        }
        return payload
    except HTTPException:
        raise
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Failed to generate summary: {exc!s}")


@router.get("/meetings/{meeting_id}/summaries")
def get_meeting_summaries(
    meeting_id: int,
    current_user: dict = Depends(get_current_user),
):
    """Return the full history of AI summaries for a meeting.

    Raises:
        HTTPException: 404 when the meeting does not exist, 500 otherwise.
    """
    try:
        # Make sure the meeting exists
        _assert_meeting_exists(meeting_id)

        # Fetch the list of summaries
        history = llm_service.get_meeting_summaries(meeting_id)
        return {"meeting_id": meeting_id, "summaries": history}
    except HTTPException:
        raise
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Failed to get summaries: {exc!s}")


@router.get("/meetings/{meeting_id}/summaries/{summary_id}")
def get_summary_detail(
    meeting_id: int,
    summary_id: int,
    current_user: dict = Depends(get_current_user),
):
    """Return the full content of one specific summary of a meeting.

    Raises:
        HTTPException: 404 when no summary matches both ids, 500 otherwise.
    """
    detail_query = """
        SELECT id, summary_content, user_prompt, created_at
        FROM meeting_summaries
        WHERE id = %s AND meeting_id = %s
    """
    try:
        with get_db_connection() as connection:
            cursor = connection.cursor(dictionary=True)
            cursor.execute(detail_query, (summary_id, meeting_id))
            record = cursor.fetchone()

            if record is None or not record:
                raise HTTPException(status_code=404, detail="Summary not found")

            created = record["created_at"]
            if created:
                created_text = created.isoformat()
            else:
                created_text = None

            return {
                "id": record["id"],
                "meeting_id": meeting_id,
                "content": record["summary_content"],
                "user_prompt": record["user_prompt"],
                "created_at": created_text,
            }
    except HTTPException:
        raise
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Failed to get summary detail: {exc!s}")


# ==================== Async LLM summary endpoints ====================

@router.post("/meetings/{meeting_id}/generate-summary-async")
def generate_meeting_summary_async(
    meeting_id: int,
    request: GenerateSummaryRequest,
    background_tasks: BackgroundTasks,
    current_user: dict = Depends(get_current_user),
):
    """Accept a summary-generation request and process it as a background task.

    Returns the new task id with status ``"pending"``.

    Raises:
        HTTPException: 404 when the meeting does not exist, 500 otherwise.
    """
    try:
        # 1. Make sure the meeting exists
        _assert_meeting_exists(meeting_id)

        # 2. Register the async task
        new_task_id = async_llm_service.start_summary_generation(meeting_id, request.user_prompt)

        # 3. Schedule the actual processing function as a background task
        background_tasks.add_task(async_llm_service._process_task, new_task_id)

        accepted = {
            "message": "Summary generation task has been accepted.",
            "task_id": new_task_id,
            "status": "pending",
            "meeting_id": meeting_id,
        }
        return accepted
    except HTTPException:
        raise
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Failed to start summary generation: {exc!s}")


@router.get("/llm-tasks/{task_id}/status")
def get_llm_task_status(
    task_id: str,
    current_user: dict = Depends(get_current_user),
):
    """Return the status (including progress) of an LLM task.

    Raises:
        HTTPException: 404 when the service reports ``'not_found'``, 500
            otherwise.
    """
    try:
        task_state = async_llm_service.get_task_status(task_id)
        if task_state.get('status') != 'not_found':
            return task_state
        raise HTTPException(status_code=404, detail="Task not found")
    except HTTPException:
        raise
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Failed to get task status: {exc!s}")


@router.get("/meetings/{meeting_id}/llm-tasks")
def get_meeting_llm_tasks(
    meeting_id: int,
    current_user: dict = Depends(get_current_user),
):
    """Return the history of all LLM tasks for a meeting, with their count.

    Raises:
        HTTPException: 404 when the meeting does not exist, 500 otherwise.
    """
    try:
        # Make sure the meeting exists
        _assert_meeting_exists(meeting_id)

        # Fetch the task list
        task_history = async_llm_service.get_meeting_llm_tasks(meeting_id)
        return {
            "meeting_id": meeting_id,
            "tasks": task_history,
            "total": len(task_history),
        }
    except HTTPException:
        raise
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Failed to get LLM tasks: {exc!s}")


# @router.get("/meetings/{meeting_id}/latest-llm-task")  # this endpoint has been superseded
# def get_latest_llm_task(meeting_id: int, current_user: dict = Depends(get_current_user)):
#     """Get the status of the meeting's most recent LLM task"""
#     try:
#         tasks = async_llm_service.get_meeting_llm_tasks(meeting_id)
#         if not tasks:
#             return {
#                 "meeting_id": meeting_id,
#                 "task": None,
#                 "message": "No LLM tasks found for this meeting"
#             }
#         # Return the most recent task
#         latest_task = tasks[0]
#         # If the task is still running, fetch its live status
#         if latest_task['status'] in ['pending', 'processing']:
#             latest_status = async_llm_service.get_task_status(latest_task['task_id'])
#             latest_task.update(latest_status)
#         return {
#             "meeting_id": meeting_id,
#             "task": latest_task
#         }
#     except Exception as e:
#         raise HTTPException(status_code=500, detail=f"Failed to get latest LLM task: {str(e)}")