imetting_backend/app/api/endpoints/meetings.py

781 lines
40 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from fastapi import APIRouter, UploadFile, File, Form, Depends, BackgroundTasks
from app.models.models import Meeting, TranscriptSegment, TranscriptionTaskStatus, CreateMeetingRequest, UpdateMeetingRequest, SpeakerTagUpdateRequest, BatchSpeakerTagUpdateRequest, BatchTranscriptUpdateRequest, Tag
from app.core.database import get_db_connection
from app.core.config import BASE_DIR, AUDIO_DIR, MARKDOWN_DIR, ALLOWED_EXTENSIONS, ALLOWED_IMAGE_EXTENSIONS
import app.core.config as config_module
from app.services.llm_service import LLMService
from app.services.async_transcription_service import AsyncTranscriptionService
from app.services.async_meeting_service import async_meeting_service
from app.core.auth import get_current_user
from app.core.response import create_api_response
from typing import List, Optional
from datetime import datetime
from pydantic import BaseModel
import os
import uuid
import shutil
# Router on which every endpoint in this module is registered.
router = APIRouter()
# Module-level service instances shared by the request handlers below.
llm_service = LLMService()
transcription_service = AsyncTranscriptionService()
class GenerateSummaryRequest(BaseModel):
    """Request body accepted by the asynchronous summary-generation endpoint."""

    # Optional free-text prompt handed to the summary generation service;
    # defaults to an empty string.
    user_prompt: Optional[str] = ""
def _process_tags(cursor, tag_string: Optional[str], creator_id: Optional[int] = None) -> List[Tag]:
    """Resolve a comma-separated tag string into ``Tag`` objects.

    Names are stripped, blank entries are dropped and duplicates are
    collapsed (first occurrence wins). When ``creator_id`` is truthy, names
    not yet present in the ``tags`` table are created first with
    ``INSERT IGNORE``; this function does not commit.

    Args:
        cursor: Database cursor. Rows are expanded with ``Tag(**row)``, so
            the cursor must return rows as mappings.
        tag_string: Comma-separated tag names, or ``None``/empty.
        creator_id: Creator recorded for newly inserted tags; when falsy no
            tags are created.

    Returns:
        The tags found for the given names, or an empty list when the
        string contains no usable name.
    """
    if not tag_string:
        return []

    # dict.fromkeys removes repeated names while keeping first-seen order,
    # so neither the INSERT batch nor the IN (...) list carries duplicates.
    tag_names = list(dict.fromkeys(
        name.strip() for name in tag_string.split(',') if name.strip()
    ))
    if not tag_names:
        return []

    # Create missing tags only when a creator was supplied.
    if creator_id:
        cursor.executemany(
            "INSERT IGNORE INTO tags (name, creator_id) VALUES (%s, %s)",
            [(name, creator_id) for name in tag_names],
        )

    # One placeholder per name keeps the IN (...) list fully parameterized.
    placeholders = ', '.join(['%s'] * len(tag_names))
    cursor.execute(
        f"SELECT id, name, color FROM tags WHERE name IN ({placeholders})",
        tuple(tag_names),
    )
    return [Tag(**row) for row in cursor.fetchall()]
@router.get("/meetings")
def get_meetings(
    current_user: dict = Depends(get_current_user),
    user_id: Optional[int] = None,
    page: int = 1,
    page_size: Optional[int] = None,
    search: Optional[str] = None,
    tags: Optional[str] = None,
    filter_type: str = "all"
):
    """Return one page of meetings, newest first, with attendees and tags.

    Args:
        current_user: Authenticated user injected by ``get_current_user``;
            not otherwise read here.
        user_id: When given, restricts the list to meetings this user
            created and/or attends, depending on ``filter_type``.
        page: 1-based page number; values below 1 are treated as 1.
        page_size: Rows per page; defaults to ``TIMELINE_PAGESIZE`` and is
            clamped to at least 1.
        search: Optional substring matched against the meeting title or the
            creator's caption.
        tags: Optional comma-separated tag names; a meeting matches when its
            ``tags`` column contains any of them as a substring.
        filter_type: ``"created"``, ``"attended"`` or anything else for
            both; only applied when ``user_id`` is given.

    Returns:
        API response whose data holds ``meetings``, ``total``, ``page``,
        ``page_size``, ``total_pages`` and ``has_more``.
    """
    from app.core.config import TIMELINE_PAGESIZE

    # Fall back to the configured default page size.
    if page_size is None:
        page_size = TIMELINE_PAGESIZE
    # A page below 1 would produce a negative OFFSET and a page size below 1
    # a division by zero / negative LIMIT, so clamp both.
    page = max(page, 1)
    page_size = max(page_size, 1)

    where_conditions = []
    params = []

    # The attendees table is joined whenever a user filter is present.
    has_attendees_join = bool(user_id)
    if user_id:
        if filter_type == "created":
            where_conditions.append("m.user_id = %s")
            params.append(user_id)
        elif filter_type == "attended":
            where_conditions.append("m.user_id != %s AND a.user_id = %s")
            params.extend([user_id, user_id])
        else:  # all: created or attended
            where_conditions.append("(m.user_id = %s OR a.user_id = %s)")
            params.extend([user_id, user_id])

    # Keyword filter on title / creator caption.
    if search and search.strip():
        search_pattern = f"%{search.strip()}%"
        where_conditions.append("(m.title LIKE %s OR u.caption LIKE %s)")
        params.extend([search_pattern, search_pattern])

    # Tag filter: any listed tag may match (substring match on m.tags).
    if tags and tags.strip():
        tag_list = [t.strip() for t in tags.split(',') if t.strip()]
        if tag_list:
            where_conditions.append(
                f"({' OR '.join(['m.tags LIKE %s'] * len(tag_list))})"
            )
            params.extend(f"%{tag}%" for tag in tag_list)

    select_clause = (
        "SELECT m.meeting_id, m.title, m.meeting_time, m.summary, m.created_at, m.tags,"
        " m.user_id as creator_id, u.caption as creator_username, af.file_path as audio_file_path"
    )
    # The FROM/WHERE part is built once and shared by the count query and
    # the page query, so the two can never drift apart.
    from_clause = (
        " FROM meetings m"
        " JOIN users u ON m.user_id = u.user_id"
        " LEFT JOIN audio_files af ON m.meeting_id = af.meeting_id"
    )
    if has_attendees_join:
        from_clause += " LEFT JOIN attendees a ON m.meeting_id = a.meeting_id"
    if where_conditions:
        from_clause += f" WHERE {' AND '.join(where_conditions)}"

    # With the attendees join a meeting can appear once per attendee, so the
    # total must count distinct meetings.
    count_expr = "COUNT(DISTINCT m.meeting_id)" if has_attendees_join else "COUNT(*)"
    count_query = f"SELECT {count_expr} as total{from_clause}"

    list_query = select_clause + from_clause
    if has_attendees_join:
        list_query += " GROUP BY m.meeting_id"
    list_query += " ORDER BY m.meeting_time DESC, m.created_at DESC LIMIT %s OFFSET %s"

    attendees_query = 'SELECT u.user_id, u.caption FROM attendees a JOIN users u ON a.user_id = u.user_id WHERE a.meeting_id = %s'

    with get_db_connection() as connection:
        cursor = connection.cursor(dictionary=True)

        cursor.execute(count_query, params)
        total = cursor.fetchone()['total']

        # Pagination arithmetic (ceiling division for the page count).
        total_pages = (total + page_size - 1) // page_size
        has_more = page < total_pages
        offset = (page - 1) * page_size

        cursor.execute(list_query, params + [page_size, offset])
        meetings = cursor.fetchall()

        meeting_list = []
        for meeting in meetings:
            cursor.execute(attendees_query, (meeting['meeting_id'],))
            attendees = [
                {'user_id': row['user_id'], 'caption': row['caption']}
                for row in cursor.fetchall()
            ]
            # No creator_id is passed, so tags are only looked up here.
            tags_list = _process_tags(cursor, meeting.get('tags'))
            meeting_list.append(Meeting(
                meeting_id=meeting['meeting_id'], title=meeting['title'], meeting_time=meeting['meeting_time'],
                summary=meeting['summary'], created_at=meeting['created_at'], audio_file_path=meeting['audio_file_path'],
                attendees=attendees, creator_id=meeting['creator_id'], creator_username=meeting['creator_username'], tags=tags_list
            ))

        return create_api_response(code="200", message="获取会议列表成功", data={
            "meetings": meeting_list,
            "total": total,
            "page": page,
            "page_size": page_size,
            "total_pages": total_pages,
            "has_more": has_more
        })
@router.get("/meetings/stats")
def get_meetings_stats(
    current_user: dict = Depends(get_current_user),
    user_id: Optional[int] = None
):
    """Return meeting counts for a user: all, created by them, attended by them.

    Args:
        current_user: Authenticated user injected by ``get_current_user``;
            not otherwise read here.
        user_id: User whose statistics are requested; required.

    Returns:
        API response with ``all_meetings``, ``created_meetings`` and
        ``attended_meetings``, or a "400" response when ``user_id`` is
        missing.
    """
    # Validate before touching the database so an invalid request does not
    # acquire a connection.
    if not user_id:
        return create_api_response(code="400", message="user_id is required")

    # Every meeting the user created or attends.
    all_query = '''
        SELECT COUNT(DISTINCT m.meeting_id) as count
        FROM meetings m
        LEFT JOIN attendees a ON m.meeting_id = a.meeting_id
        WHERE m.user_id = %s OR a.user_id = %s
    '''
    # Meetings the user created.
    created_query = '''
        SELECT COUNT(*) as count
        FROM meetings m
        WHERE m.user_id = %s
    '''
    # Meetings the user attends but did not create.
    attended_query = '''
        SELECT COUNT(DISTINCT a.meeting_id) as count
        FROM attendees a
        JOIN meetings m ON a.meeting_id = m.meeting_id
        WHERE a.user_id = %s AND m.user_id != %s
    '''

    with get_db_connection() as connection:
        cursor = connection.cursor(dictionary=True)

        cursor.execute(all_query, (user_id, user_id))
        all_count = cursor.fetchone()['count']

        cursor.execute(created_query, (user_id,))
        created_count = cursor.fetchone()['count']

        cursor.execute(attended_query, (user_id, user_id))
        attended_count = cursor.fetchone()['count']

        return create_api_response(code="200", message="获取会议统计成功", data={
            "all_meetings": all_count,
            "created_meetings": created_count,
            "attended_meetings": attended_count
        })
@router.get("/meetings/{meeting_id}")
def get_meeting_details(meeting_id: int, current_user: dict = Depends(get_current_user)):
    """Return one meeting with its attendees, tags, audio path and transcription status.

    Args:
        meeting_id: Identifier of the meeting to load.
        current_user: Authenticated user injected by ``get_current_user``;
            not otherwise read here.

    Returns:
        API response carrying a ``Meeting`` object, or a "404" response when
        the meeting does not exist.
    """
    with get_db_connection() as connection:
        cursor = connection.cursor(dictionary=True)

        detail_sql = '''
            SELECT m.meeting_id, m.title, m.meeting_time, m.summary, m.created_at, m.tags,
            m.user_id as creator_id, u.caption as creator_username, af.file_path as audio_file_path
            FROM meetings m JOIN users u ON m.user_id = u.user_id LEFT JOIN audio_files af ON m.meeting_id = af.meeting_id
            WHERE m.meeting_id = %s
        '''
        cursor.execute(detail_sql, (meeting_id,))
        row = cursor.fetchone()
        if not row:
            return create_api_response(code="404", message="Meeting not found")

        attendees_query = 'SELECT u.user_id, u.caption FROM attendees a JOIN users u ON a.user_id = u.user_id WHERE a.meeting_id = %s'
        cursor.execute(attendees_query, (row['meeting_id'],))
        attendees = []
        for person in cursor.fetchall():
            attendees.append({'user_id': person['user_id'], 'caption': person['caption']})

        # Lookup only: no creator_id is passed, so no tags are created.
        tags = _process_tags(cursor, row.get('tags'))
        cursor.close()

        meeting_data = Meeting(
            meeting_id=row['meeting_id'],
            title=row['title'],
            meeting_time=row['meeting_time'],
            summary=row['summary'],
            created_at=row['created_at'],
            attendees=attendees,
            creator_id=row['creator_id'],
            creator_username=row['creator_username'],
            tags=tags,
        )

        audio_path = row['audio_file_path']
        if audio_path:
            meeting_data.audio_file_path = audio_path

        # Transcription status is optional: a lookup failure is logged and
        # the meeting is still returned.
        try:
            status_payload = transcription_service.get_meeting_transcription_status(meeting_id)
            if status_payload:
                meeting_data.transcription_status = TranscriptionTaskStatus(**status_payload)
        except Exception as e:
            print(f"Warning: Failed to get transcription status for meeting {meeting_id}: {e}")

        return create_api_response(code="200", message="获取会议详情成功", data=meeting_data)
@router.get("/meetings/{meeting_id}/transcript")
def get_meeting_transcript(meeting_id: int, current_user: dict = Depends(get_current_user)):
    """Return the transcript segments of a meeting ordered by start time.

    Segments without a speaker tag get a default label built from their
    ``speaker_id``.

    Args:
        meeting_id: Identifier of the meeting.
        current_user: Authenticated user injected by ``get_current_user``;
            not otherwise read here.

    Returns:
        API response carrying a list of ``TranscriptSegment`` objects, or a
        "404" response when the meeting does not exist.
    """
    with get_db_connection() as connection:
        cursor = connection.cursor(dictionary=True)

        cursor.execute("SELECT meeting_id FROM meetings WHERE meeting_id = %s", (meeting_id,))
        meeting_row = cursor.fetchone()
        if not meeting_row:
            return create_api_response(code="404", message="Meeting not found")

        transcript_query = '''
            SELECT segment_id, meeting_id, speaker_id, speaker_tag, start_time_ms, end_time_ms, text_content
            FROM transcript_segments WHERE meeting_id = %s ORDER BY start_time_ms ASC
        '''
        cursor.execute(transcript_query, (meeting_id,))

        transcript_segments = []
        for row in cursor.fetchall():
            # Fall back to a generated label when no tag has been assigned.
            label = row['speaker_tag'] or f"发言人 {row['speaker_id']}"
            transcript_segments.append(TranscriptSegment(
                segment_id=row['segment_id'],
                meeting_id=row['meeting_id'],
                speaker_id=row['speaker_id'],
                speaker_tag=label,
                start_time_ms=row['start_time_ms'],
                end_time_ms=row['end_time_ms'],
                text_content=row['text_content'],
            ))

        return create_api_response(code="200", message="获取转录内容成功", data=transcript_segments)
@router.post("/meetings")
def create_meeting(meeting_request: CreateMeetingRequest, current_user: dict = Depends(get_current_user)):
    """Create a meeting together with its attendees.

    Tags named in the request that do not exist yet are created with the
    current user as creator. The meeting row and attendee rows are committed
    together.

    Args:
        meeting_request: Payload providing ``user_id``, ``title``,
            ``meeting_time``, ``tags`` and ``attendee_ids``.
        current_user: Authenticated user injected by ``get_current_user``.

    Returns:
        API response whose data holds the new ``meeting_id``.
    """
    with get_db_connection() as connection:
        cursor = connection.cursor(dictionary=True)

        # Ensure every referenced tag exists (return value not needed here).
        if meeting_request.tags:
            _process_tags(cursor, meeting_request.tags, current_user['user_id'])

        # NOTE(review): the owner is taken from the request body rather than
        # from current_user — confirm this is intended.
        meeting_query = 'INSERT INTO meetings (user_id, title, meeting_time, summary, tags, created_at) VALUES (%s, %s, %s, %s, %s, %s)'
        cursor.execute(meeting_query, (
            meeting_request.user_id,
            meeting_request.title,
            meeting_request.meeting_time,
            None,
            meeting_request.tags,
            datetime.now().isoformat(),
        ))
        meeting_id = cursor.lastrowid

        # Insert all attendees in one batch; a missing/empty list is a no-op.
        attendee_rows = [
            (meeting_id, attendee_id)
            for attendee_id in (meeting_request.attendee_ids or [])
        ]
        if attendee_rows:
            cursor.executemany(
                'INSERT IGNORE INTO attendees (meeting_id, user_id) VALUES (%s, %s)',
                attendee_rows,
            )

        connection.commit()
        return create_api_response(code="200", message="Meeting created successfully", data={"meeting_id": meeting_id})
@router.put("/meetings/{meeting_id}")
def update_meeting(meeting_id: int, meeting_request: UpdateMeetingRequest, current_user: dict = Depends(get_current_user)):
    """Update a meeting's fields and replace its attendee list.

    Only the meeting's creator may update it.

    Args:
        meeting_id: Identifier of the meeting to update.
        meeting_request: Payload providing ``title``, ``meeting_time``,
            ``summary``, ``tags`` and ``attendee_ids``.
        current_user: Authenticated user injected by ``get_current_user``.

    Returns:
        API response with code "200" on success, "404" when the meeting does
        not exist, or "403" when the caller is not its creator.
    """
    with get_db_connection() as connection:
        cursor = connection.cursor(dictionary=True)

        cursor.execute("SELECT user_id FROM meetings WHERE meeting_id = %s", (meeting_id,))
        meeting = cursor.fetchone()
        if not meeting:
            return create_api_response(code="404", message="Meeting not found")
        if meeting['user_id'] != current_user['user_id']:
            return create_api_response(code="403", message="Permission denied")

        # Ensure every referenced tag exists (return value not needed here).
        if meeting_request.tags:
            _process_tags(cursor, meeting_request.tags, current_user['user_id'])

        update_query = 'UPDATE meetings SET title = %s, meeting_time = %s, summary = %s, tags = %s WHERE meeting_id = %s'
        cursor.execute(update_query, (
            meeting_request.title,
            meeting_request.meeting_time,
            meeting_request.summary,
            meeting_request.tags,
            meeting_id,
        ))

        # Replace the attendee list. Ids are de-duplicated (order kept) so a
        # repeated id in the request cannot trip the plain INSERT after the
        # old rows were already deleted.
        cursor.execute("DELETE FROM attendees WHERE meeting_id = %s", (meeting_id,))
        unique_attendee_ids = list(dict.fromkeys(meeting_request.attendee_ids or []))
        for attendee_id in unique_attendee_ids:
            cursor.execute('INSERT INTO attendees (meeting_id, user_id) VALUES (%s, %s)', (meeting_id, attendee_id))

        connection.commit()
        return create_api_response(code="200", message="Meeting updated successfully")
@router.delete("/meetings/{meeting_id}")
def delete_meeting(meeting_id: int, current_user: dict = Depends(get_current_user)):
    """Delete a meeting and its dependent rows.

    Only the meeting's creator may delete it. Rows in the dependent tables
    are removed first, then the meeting row, and the whole change is
    committed once.

    Args:
        meeting_id: Identifier of the meeting to delete.
        current_user: Authenticated user injected by ``get_current_user``.

    Returns:
        API response with code "200" on success, "404" when the meeting does
        not exist, or "403" when the caller is not its creator.
    """
    # Dependent tables first, the meetings row last.
    delete_statements = (
        "DELETE FROM transcript_segments WHERE meeting_id = %s",
        "DELETE FROM audio_files WHERE meeting_id = %s",
        "DELETE FROM attachments WHERE meeting_id = %s",
        "DELETE FROM attendees WHERE meeting_id = %s",
        "DELETE FROM meetings WHERE meeting_id = %s",
    )

    with get_db_connection() as connection:
        cursor = connection.cursor(dictionary=True)

        cursor.execute("SELECT user_id FROM meetings WHERE meeting_id = %s", (meeting_id,))
        owner_row = cursor.fetchone()
        if not owner_row:
            return create_api_response(code="404", message="Meeting not found")
        if owner_row['user_id'] != current_user['user_id']:
            return create_api_response(code="403", message="Permission denied")

        for statement in delete_statements:
            cursor.execute(statement, (meeting_id,))
        connection.commit()

        return create_api_response(code="200", message="Meeting deleted successfully")
@router.get("/meetings/{meeting_id}/edit")
def get_meeting_for_edit(meeting_id: int, current_user: dict = Depends(get_current_user)):
    """Return the meeting data used to populate the edit view.

    Loads the meeting, its attendees and tags, then attaches the audio file
    path and transcription status when available.

    Args:
        meeting_id: Identifier of the meeting to load.
        current_user: Authenticated user injected by ``get_current_user``;
            not otherwise read here.

    Returns:
        API response carrying a ``Meeting`` object, or a "404" response when
        the meeting does not exist.
    """
    with get_db_connection() as connection:
        cursor = connection.cursor(dictionary=True)

        edit_sql = '''
            SELECT m.meeting_id, m.title, m.meeting_time, m.summary, m.created_at, m.tags,
            m.user_id as creator_id, u.caption as creator_username, af.file_path as audio_file_path
            FROM meetings m JOIN users u ON m.user_id = u.user_id LEFT JOIN audio_files af ON m.meeting_id = af.meeting_id
            WHERE m.meeting_id = %s
        '''
        cursor.execute(edit_sql, (meeting_id,))
        record = cursor.fetchone()
        if not record:
            return create_api_response(code="404", message="Meeting not found")

        attendees_query = 'SELECT u.user_id, u.caption FROM attendees a JOIN users u ON a.user_id = u.user_id WHERE a.meeting_id = %s'
        cursor.execute(attendees_query, (record['meeting_id'],))
        attendees = []
        for member in cursor.fetchall():
            attendees.append({'user_id': member['user_id'], 'caption': member['caption']})

        # Lookup only: no creator_id is passed, so no tags are created.
        tags = _process_tags(cursor, record.get('tags'))
        cursor.close()

        meeting_data = Meeting(
            meeting_id=record['meeting_id'],
            title=record['title'],
            meeting_time=record['meeting_time'],
            summary=record['summary'],
            created_at=record['created_at'],
            attendees=attendees,
            creator_id=record['creator_id'],
            creator_username=record['creator_username'],
            tags=tags,
        )

        audio_path = record.get('audio_file_path')
        if audio_path:
            meeting_data.audio_file_path = record['audio_file_path']

        # Transcription status is optional: a lookup failure is logged and
        # the meeting is still returned.
        try:
            status_payload = transcription_service.get_meeting_transcription_status(meeting_id)
            if status_payload:
                meeting_data.transcription_status = TranscriptionTaskStatus(**status_payload)
        except Exception as e:
            print(f"Warning: Failed to get transcription status for meeting {meeting_id}: {e}")

        return create_api_response(code="200", message="获取会议编辑信息成功", data=meeting_data)
@router.post("/meetings/upload-audio")
async def upload_audio(audio_file: UploadFile = File(...), meeting_id: int = Form(...), force_replace: str = Form("false"), current_user: dict = Depends(get_current_user)):
    """Store an uploaded audio file for a meeting and start its transcription.

    Only the meeting's creator may upload. When the meeting already has an
    audio file with transcript segments, the upload is refused with code
    "300" unless ``force_replace`` is truthy ("true", "1" or "yes").

    Args:
        audio_file: Uploaded file; its extension must be in
            ``ALLOWED_EXTENSIONS`` and its size within ``MAX_FILE_SIZE``
            (default 100 MiB when the setting is absent).
        meeting_id: Meeting the audio belongs to.
        force_replace: Form flag confirming replacement of existing audio.
        current_user: Authenticated user injected by ``get_current_user``.

    Returns:
        API response: "200" with file and task info, "300" when confirmation
        is required, "400"/"403"/"404" on validation failures, or "500" when
        a database or file-system step fails.
    """
    force_replace_bool = force_replace.lower() in ("true", "1", "yes")

    # The filename may be missing on the upload; treat that as "no extension".
    file_extension = os.path.splitext(audio_file.filename or "")[1].lower()
    if file_extension not in ALLOWED_EXTENSIONS:
        return create_api_response(code="400", message=f"Unsupported file type. Allowed types: {', '.join(ALLOWED_EXTENSIONS)}")

    # The reported size may be None; measure the spooled file in that case so
    # the size limit is still enforced.
    file_size = audio_file.size
    if file_size is None:
        audio_file.file.seek(0, os.SEEK_END)
        file_size = audio_file.file.tell()
        audio_file.file.seek(0)

    max_file_size = getattr(config_module, 'MAX_FILE_SIZE', 100 * 1024 * 1024)
    if file_size > max_file_size:
        return create_api_response(code="400", message=f"File size exceeds {max_file_size // (1024 * 1024)}MB limit")

    # Step 1: permission check and lookup of any existing audio/transcript.
    try:
        with get_db_connection() as connection:
            cursor = connection.cursor(dictionary=True)
            cursor.execute("SELECT user_id FROM meetings WHERE meeting_id = %s", (meeting_id,))
            meeting = cursor.fetchone()
            if not meeting:
                return create_api_response(code="404", message="Meeting not found")
            if meeting['user_id'] != current_user['user_id']:
                return create_api_response(code="403", message="Permission denied")

            cursor.execute("SELECT file_name, file_path, upload_time FROM audio_files WHERE meeting_id = %s", (meeting_id,))
            existing_info = cursor.fetchone()

            has_transcription = False
            if existing_info:
                cursor.execute("SELECT COUNT(*) as segment_count FROM transcript_segments WHERE meeting_id = %s", (meeting_id,))
                has_transcription = cursor.fetchone()['segment_count'] > 0
            cursor.close()
    except Exception as e:
        return create_api_response(code="500", message=f"Failed to check existing files: {str(e)}")

    # Replacing audio that already has a transcript needs explicit confirmation.
    if existing_info and has_transcription and not force_replace_bool:
        return create_api_response(code="300", message="该会议已有音频文件和转录记录,重新上传将删除现有的转录内容", data={
            "requires_confirmation": True,
            "existing_file": {
                "file_name": existing_info['file_name'],
                "upload_time": existing_info['upload_time'].isoformat() if existing_info['upload_time'] else None
            }
        })

    # Step 2: write the upload to disk under a unique name.
    meeting_dir = AUDIO_DIR / str(meeting_id)
    # parents=True so a missing AUDIO_DIR does not fail the first upload.
    meeting_dir.mkdir(parents=True, exist_ok=True)
    unique_filename = f"{uuid.uuid4()}{file_extension}"
    absolute_path = meeting_dir / unique_filename
    relative_path = absolute_path.relative_to(BASE_DIR)
    stored_path = '/' + str(relative_path)

    try:
        with open(absolute_path, "wb") as buffer:
            shutil.copyfileobj(audio_file.file, buffer)
    except Exception as e:
        # Do not leave a truncated file behind.
        if os.path.exists(absolute_path):
            try:
                os.remove(absolute_path)
            except OSError as cleanup_error:
                print(f"Warning: Failed to remove partial file {absolute_path}: {cleanup_error}")
        return create_api_response(code="500", message=f"Failed to save file: {str(e)}")

    # Step 3: record the file in the database.
    replaced_existing = existing_info is not None
    try:
        with get_db_connection() as connection:
            cursor = connection.cursor(dictionary=True)
            if replaced_existing:
                cursor.execute('UPDATE audio_files SET file_name = %s, file_path = %s, file_size = %s, upload_time = NOW(), task_id = NULL WHERE meeting_id = %s', (audio_file.filename, stored_path, file_size, meeting_id))
            else:
                cursor.execute('INSERT INTO audio_files (meeting_id, file_name, file_path, file_size, upload_time) VALUES (%s, %s, %s, %s, NOW())', (meeting_id, audio_file.filename, stored_path, file_size))
            connection.commit()
            cursor.close()
    except Exception as e:
        # The row was not saved, so the new file would be orphaned.
        if os.path.exists(absolute_path):
            os.remove(absolute_path)
        return create_api_response(code="500", message=f"Failed to save file info: {str(e)}")

    # Step 4: remove the superseded file only after the new record is
    # committed, and for every replacement, so no stale file is left on disk
    # and a failed update never loses the old audio. Best effort only.
    if replaced_existing and existing_info['file_path']:
        try:
            old_file_path = BASE_DIR / existing_info['file_path'].lstrip('/')
            if old_file_path.exists():
                os.remove(old_file_path)
                print(f"Deleted old audio file: {old_file_path}")
        except Exception as e:
            print(f"Warning: Failed to delete old file {existing_info['file_path']}: {e}")

    # Step 5: start transcription. Per the original note, start_transcription
    # clears old transcript data and task records itself. A failure here
    # does not fail the upload.
    task_id = None
    try:
        task_id = transcription_service.start_transcription(meeting_id, stored_path)
        print(f"Transcription task started with ID: {task_id}")
    except Exception as e:
        print(f"Failed to start transcription task: {e}")

    return create_api_response(code="200", message="Audio file uploaded successfully" + (" and replaced existing file" if replaced_existing else ""), data={
        "file_name": audio_file.filename, "file_path": stored_path, "task_id": task_id,
        "transcription_started": task_id is not None, "replaced_existing": replaced_existing,
        "previous_transcription_cleared": replaced_existing and has_transcription
    })
@router.get("/meetings/{meeting_id}/audio")
def get_audio_file(meeting_id: int, current_user: dict = Depends(get_current_user)):
    """Return the stored audio-file record of a meeting.

    Args:
        meeting_id: Identifier of the meeting.
        current_user: Authenticated user injected by ``get_current_user``;
            not otherwise read here.

    Returns:
        API response carrying the ``audio_files`` row (name, path, size,
        upload time), or a "404" response when the meeting has no audio.
    """
    lookup_sql = "SELECT file_name, file_path, file_size, upload_time FROM audio_files WHERE meeting_id = %s"
    with get_db_connection() as connection:
        cursor = connection.cursor(dictionary=True)
        cursor.execute(lookup_sql, (meeting_id,))
        record = cursor.fetchone()
        if record:
            return create_api_response(code="200", message="Audio file found", data=record)
        return create_api_response(code="404", message="Audio file not found for this meeting")
@router.get("/meetings/{meeting_id}/transcription/status")
def get_meeting_transcription_status(meeting_id: int, current_user: dict = Depends(get_current_user)):
    """Return the transcription task status reported for a meeting.

    Args:
        meeting_id: Identifier of the meeting.
        current_user: Authenticated user injected by ``get_current_user``;
            not otherwise read here.

    Returns:
        API response with the status info, "404" when the service reports
        none, or "500" when the lookup raises.
    """
    try:
        status_info = transcription_service.get_meeting_transcription_status(meeting_id)
        if status_info:
            return create_api_response(code="200", message="Transcription status retrieved", data=status_info)
        return create_api_response(code="404", message="No transcription task found for this meeting")
    except Exception as e:
        return create_api_response(code="500", message=f"Failed to get meeting transcription status: {str(e)}")
@router.post("/meetings/{meeting_id}/transcription/start")
def start_meeting_transcription(meeting_id: int, current_user: dict = Depends(get_current_user)):
    """Start a transcription task for the meeting's stored audio file.

    Args:
        meeting_id: Identifier of the meeting.
        current_user: Authenticated user injected by ``get_current_user``;
            not otherwise read here.

    Returns:
        API response: "200" with the new task id, "404" when the meeting
        does not exist, "400" when it has no audio file, "409" when a task
        is already pending or processing, or "500" on any exception.
    """
    # Statuses meaning a task is still in flight.
    active_states = ['pending', 'processing']
    try:
        with get_db_connection() as connection:
            cursor = connection.cursor(dictionary=True)

            cursor.execute("SELECT meeting_id FROM meetings WHERE meeting_id = %s", (meeting_id,))
            meeting_row = cursor.fetchone()
            if not meeting_row:
                return create_api_response(code="404", message="Meeting not found")

            cursor.execute("SELECT file_path FROM audio_files WHERE meeting_id = %s LIMIT 1", (meeting_id,))
            audio_file = cursor.fetchone()
            if not audio_file:
                return create_api_response(code="400", message="No audio file found for this meeting")

            # Refuse to start a second task while one is still running.
            existing_status = transcription_service.get_meeting_transcription_status(meeting_id)
            if existing_status and existing_status['status'] in active_states:
                conflict_payload = {
                    "task_id": existing_status['task_id'], "status": existing_status['status']
                }
                return create_api_response(code="409", message="Transcription task already exists", data=conflict_payload)

            task_id = transcription_service.start_transcription(meeting_id, audio_file['file_path'])
            started_payload = {"task_id": task_id, "meeting_id": meeting_id}
            return create_api_response(code="200", message="Transcription task started successfully", data=started_payload)
    except Exception as e:
        return create_api_response(code="500", message=f"Failed to start transcription: {str(e)}")
@router.post("/meetings/{meeting_id}/upload-image")
async def upload_image(meeting_id: int, image_file: UploadFile = File(...), current_user: dict = Depends(get_current_user)):
    """Store an uploaded image under the meeting's markdown directory.

    Only the meeting's creator may upload.

    Args:
        meeting_id: Meeting the image belongs to.
        image_file: Uploaded file; its extension must be in
            ``ALLOWED_IMAGE_EXTENSIONS`` and its size within
            ``MAX_IMAGE_SIZE`` (default 10 MiB when the setting is absent).
        current_user: Authenticated user injected by ``get_current_user``.

    Returns:
        API response: "200" with the original file name and stored path,
        "400"/"403"/"404" on validation failures, or "500" when writing the
        file fails.
    """
    # The filename may be missing on the upload; treat that as "no extension".
    file_extension = os.path.splitext(image_file.filename or "")[1].lower()
    if file_extension not in ALLOWED_IMAGE_EXTENSIONS:
        return create_api_response(code="400", message=f"Unsupported image type. Allowed types: {', '.join(ALLOWED_IMAGE_EXTENSIONS)}")

    # The reported size may be None; measure the spooled file in that case so
    # the size limit is still enforced.
    image_size = image_file.size
    if image_size is None:
        image_file.file.seek(0, os.SEEK_END)
        image_size = image_file.file.tell()
        image_file.file.seek(0)

    max_image_size = getattr(config_module, 'MAX_IMAGE_SIZE', 10 * 1024 * 1024)
    if image_size > max_image_size:
        return create_api_response(code="400", message=f"Image size exceeds {max_image_size // (1024 * 1024)}MB limit")

    with get_db_connection() as connection:
        cursor = connection.cursor(dictionary=True)
        cursor.execute("SELECT user_id FROM meetings WHERE meeting_id = %s", (meeting_id,))
        meeting = cursor.fetchone()
        if not meeting:
            return create_api_response(code="404", message="Meeting not found")
        if meeting['user_id'] != current_user['user_id']:
            return create_api_response(code="403", message="Permission denied")

    meeting_dir = MARKDOWN_DIR / str(meeting_id)
    # parents=True so a missing MARKDOWN_DIR does not fail the first upload.
    meeting_dir.mkdir(parents=True, exist_ok=True)
    unique_filename = f"{uuid.uuid4()}{file_extension}"
    absolute_path = meeting_dir / unique_filename
    relative_path = absolute_path.relative_to(BASE_DIR)

    try:
        with open(absolute_path, "wb") as buffer:
            shutil.copyfileobj(image_file.file, buffer)
    except Exception as e:
        # Do not leave a truncated file behind.
        if os.path.exists(absolute_path):
            try:
                os.remove(absolute_path)
            except OSError as cleanup_error:
                print(f"Warning: Failed to remove partial file {absolute_path}: {cleanup_error}")
        return create_api_response(code="500", message=f"Failed to save image: {str(e)}")

    return create_api_response(code="200", message="Image uploaded successfully", data={
        "file_name": image_file.filename, "file_path": '/' + str(relative_path)
    })
@router.put("/meetings/{meeting_id}/speaker-tags")
def update_speaker_tag(meeting_id: int, request: SpeakerTagUpdateRequest, current_user: dict = Depends(get_current_user)):
    """Set the speaker tag on every transcript segment of one speaker.

    Args:
        meeting_id: Identifier of the meeting.
        request: Payload providing ``speaker_id`` and ``new_tag``.
        current_user: Authenticated user injected by ``get_current_user``;
            not otherwise read here.

    Returns:
        API response: "200" with ``updated_count``, "404" when the speaker
        has no segments in this meeting, or "500" on any exception.
    """
    try:
        with get_db_connection() as connection:
            cursor = connection.cursor()

            # Check existence explicitly: an UPDATE's rowcount can be 0 when
            # the rows exist but already carry the requested tag, which must
            # not be reported as "not found".
            cursor.execute(
                "SELECT 1 FROM transcript_segments WHERE meeting_id = %s AND speaker_id = %s LIMIT 1",
                (meeting_id, request.speaker_id),
            )
            if cursor.fetchone() is None:
                return create_api_response(code="404", message="No segments found for this speaker")

            update_query = 'UPDATE transcript_segments SET speaker_tag = %s WHERE meeting_id = %s AND speaker_id = %s'
            cursor.execute(update_query, (request.new_tag, meeting_id, request.speaker_id))
            updated_count = cursor.rowcount
            connection.commit()
            return create_api_response(code="200", message="Speaker tag updated successfully", data={'updated_count': updated_count})
    except Exception as e:
        return create_api_response(code="500", message=f"Failed to update speaker tag: {str(e)}")
@router.put("/meetings/{meeting_id}/speaker-tags/batch")
def batch_update_speaker_tags(meeting_id: int, request: BatchSpeakerTagUpdateRequest, current_user: dict = Depends(get_current_user)):
    """Apply several speaker-tag changes to a meeting's transcript at once.

    Args:
        meeting_id: Identifier of the meeting.
        request: Payload whose ``updates`` items each provide ``speaker_id``
            and ``new_tag``.
        current_user: Authenticated user injected by ``get_current_user``;
            not otherwise read here.

    Returns:
        API response: "200" with ``total_updated`` (sum of the per-statement
        row counts), or "500" on any exception.
    """
    tag_update_sql = 'UPDATE transcript_segments SET speaker_tag = %s WHERE meeting_id = %s AND speaker_id = %s'
    try:
        with get_db_connection() as connection:
            cursor = connection.cursor()
            affected_counts = []
            for change in request.updates:
                cursor.execute(tag_update_sql, (change.new_tag, meeting_id, change.speaker_id))
                affected_counts.append(cursor.rowcount)
            total_updated = sum(affected_counts)
            # All changes are committed together.
            connection.commit()
            return create_api_response(code="200", message="Speaker tags updated successfully", data={'total_updated': total_updated})
    except Exception as e:
        return create_api_response(code="500", message=f"Failed to batch update speaker tags: {str(e)}")
@router.put("/meetings/{meeting_id}/transcript/batch")
def batch_update_transcript(meeting_id: int, request: BatchTranscriptUpdateRequest, current_user: dict = Depends(get_current_user)):
    """Update the text of several transcript segments of a meeting.

    Segments that do not belong to the meeting are skipped.

    Args:
        meeting_id: Identifier of the meeting.
        request: Payload whose ``updates`` items each provide ``segment_id``
            and ``text_content``.
        current_user: Authenticated user injected by ``get_current_user``;
            not otherwise read here.

    Returns:
        API response: "200" with ``total_updated`` (sum of the per-statement
        row counts), or "500" on any exception.
    """
    # The WHERE clause already restricts the update to segments of this
    # meeting, so a non-matching segment affects zero rows and no separate
    # existence query is needed.
    update_query = 'UPDATE transcript_segments SET text_content = %s WHERE segment_id = %s AND meeting_id = %s'
    try:
        with get_db_connection() as connection:
            cursor = connection.cursor()
            total_updated = 0
            for update_item in request.updates:
                cursor.execute(update_query, (update_item.text_content, update_item.segment_id, meeting_id))
                total_updated += cursor.rowcount
            # All changes are committed together.
            connection.commit()
            return create_api_response(code="200", message="Transcript updated successfully", data={'total_updated': total_updated})
    except Exception as e:
        return create_api_response(code="500", message=f"Failed to update transcript: {str(e)}")
@router.get("/meetings/{meeting_id}/summaries")
def get_meeting_summaries(meeting_id: int, current_user: dict = Depends(get_current_user)):
    """Return the summaries the LLM service holds for a meeting.

    Args:
        meeting_id: Identifier of the meeting.
        current_user: Authenticated user injected by ``get_current_user``;
            not otherwise read here.

    Returns:
        API response: "200" with ``summaries``, "404" when the meeting does
        not exist, or "500" on any exception.
    """
    try:
        with get_db_connection() as connection:
            cursor = connection.cursor(dictionary=True)
            cursor.execute("SELECT meeting_id FROM meetings WHERE meeting_id = %s", (meeting_id,))
            meeting_row = cursor.fetchone()
            if not meeting_row:
                return create_api_response(code="404", message="Meeting not found")

            payload = {"summaries": llm_service.get_meeting_summaries(meeting_id)}
            return create_api_response(code="200", message="Summaries retrieved successfully", data=payload)
    except Exception as e:
        return create_api_response(code="500", message=f"Failed to get summaries: {str(e)}")
@router.get("/meetings/{meeting_id}/summaries/{summary_id}")
def get_summary_detail(meeting_id: int, summary_id: int, current_user: dict = Depends(get_current_user)):
    """Return one stored summary of a meeting.

    Args:
        meeting_id: Identifier of the meeting the summary must belong to.
        summary_id: Identifier of the summary row.
        current_user: Authenticated user injected by ``get_current_user``;
            not otherwise read here.

    Returns:
        API response: "200" with the summary row, "404" when no row matches
        both ids, or "500" on any exception.
    """
    detail_sql = "SELECT id, summary_content, user_prompt, created_at FROM meeting_summaries WHERE id = %s AND meeting_id = %s"
    try:
        with get_db_connection() as connection:
            cursor = connection.cursor(dictionary=True)
            cursor.execute(detail_sql, (summary_id, meeting_id))
            row = cursor.fetchone()
            if row:
                return create_api_response(code="200", message="Summary detail retrieved", data=row)
            return create_api_response(code="404", message="Summary not found")
    except Exception as e:
        return create_api_response(code="500", message=f"Failed to get summary detail: {str(e)}")
@router.post("/meetings/{meeting_id}/generate-summary-async")
def generate_meeting_summary_async(meeting_id: int, request: GenerateSummaryRequest, background_tasks: BackgroundTasks, current_user: dict = Depends(get_current_user)):
    """Register a summary-generation task and schedule it in the background.

    Args:
        meeting_id: Identifier of the meeting to summarize.
        request: Payload providing the optional ``user_prompt``.
        background_tasks: FastAPI background-task queue the task is added to.
        current_user: Authenticated user injected by ``get_current_user``;
            not otherwise read here.

    Returns:
        API response: "200" with the task id and a "pending" status, "404"
        when the meeting does not exist, or "500" on any exception.
    """
    try:
        with get_db_connection() as connection:
            cursor = connection.cursor(dictionary=True)
            cursor.execute("SELECT meeting_id FROM meetings WHERE meeting_id = %s", (meeting_id,))
            meeting_row = cursor.fetchone()
            if not meeting_row:
                return create_api_response(code="404", message="Meeting not found")

            # Register the task first, then hand its processing to the
            # background-task queue.
            task_id = async_meeting_service.start_summary_generation(meeting_id, request.user_prompt)
            background_tasks.add_task(async_meeting_service._process_task, task_id)

            accepted_payload = {"task_id": task_id, "status": "pending", "meeting_id": meeting_id}
            return create_api_response(code="200", message="Summary generation task has been accepted.", data=accepted_payload)
    except Exception as e:
        return create_api_response(code="500", message=f"Failed to start summary generation: {str(e)}")
@router.get("/meetings/{meeting_id}/llm-tasks")
def get_meeting_llm_tasks(meeting_id: int, current_user: dict = Depends(get_current_user)):
    """Return the LLM tasks recorded for a meeting.

    Args:
        meeting_id: Identifier of the meeting.
        current_user: Authenticated user injected by ``get_current_user``;
            not otherwise read here.

    Returns:
        API response: "200" with ``tasks`` and their ``total`` count, "404"
        when the meeting does not exist, or "500" on any exception.
    """
    try:
        with get_db_connection() as connection:
            cursor = connection.cursor(dictionary=True)
            cursor.execute("SELECT meeting_id FROM meetings WHERE meeting_id = %s", (meeting_id,))
            meeting_row = cursor.fetchone()
            if not meeting_row:
                return create_api_response(code="404", message="Meeting not found")

            tasks = async_meeting_service.get_meeting_llm_tasks(meeting_id)
            payload = {"tasks": tasks, "total": len(tasks)}
            return create_api_response(code="200", message="LLM tasks retrieved successfully", data=payload)
    except Exception as e:
        return create_api_response(code="500", message=f"Failed to get LLM tasks: {str(e)}")
@router.get("/meetings/{meeting_id}/navigation")
def get_meeting_navigation(
    meeting_id: int,
    current_user: dict = Depends(get_current_user),
    user_id: Optional[int] = None,
    filter_type: str = "all",
    search: Optional[str] = None,
    tags: Optional[str] = None
):
    """Return the previous and next meeting ids around the given meeting.

    The meeting list is filtered and ordered like the meeting list endpoint
    (meeting time, then creation time, both descending).

    Query params:
        - user_id: id of the current user
        - filter_type: filter kind ('all', 'created', 'attended')
        - search: search keyword (optional)
        - tags: comma-separated tag list (optional)

    Returns:
        API response with ``prev_meeting_id``, ``next_meeting_id``,
        ``current_index`` and ``total_count``; the first three are ``None``
        when the meeting is not in the filtered list. Code "500" on any
        exception.
    """
    try:
        with get_db_connection() as connection:
            cursor = connection.cursor(dictionary=True)

            where_conditions = []
            params = []
            has_attendees_join = False

            # User filter: created by, attended by, or either.
            if user_id:
                if filter_type == "created":
                    where_conditions.append("m.user_id = %s")
                    params.append(user_id)
                else:
                    if filter_type == "attended":
                        user_clause = "m.user_id != %s AND a.user_id = %s"
                    else:  # all
                        user_clause = "(m.user_id = %s OR a.user_id = %s)"
                    where_conditions.append(user_clause)
                    params.extend([user_id, user_id])
                    has_attendees_join = True

            # Keyword filter on title / creator caption.
            keyword = search.strip() if search else ""
            if keyword:
                like_pattern = f"%{keyword}%"
                where_conditions.append("(m.title LIKE %s OR u.caption LIKE %s)")
                params.extend([like_pattern, like_pattern])

            # Tag filter: any listed tag may match.
            if tags and tags.strip():
                wanted_tags = [t.strip() for t in tags.split(',') if t.strip()]
                if wanted_tags:
                    tag_conditions = ["m.tags LIKE %s" for _ in wanted_tags]
                    params.extend(f"%{tag}%" for tag in wanted_tags)
                    where_conditions.append(f"({' OR '.join(tag_conditions)})")

            # Only the ids are needed, in list order.
            query = '''
                SELECT m.meeting_id
                FROM meetings m
                JOIN users u ON m.user_id = u.user_id
            '''
            if has_attendees_join:
                query += " LEFT JOIN attendees a ON m.meeting_id = a.meeting_id"
            if where_conditions:
                query += f" WHERE {' AND '.join(where_conditions)}"
            if has_attendees_join:
                query += " GROUP BY m.meeting_id"
            query += " ORDER BY m.meeting_time DESC, m.created_at DESC"

            cursor.execute(query, params)
            all_meeting_ids = [row['meeting_id'] for row in cursor.fetchall()]
            total_count = len(all_meeting_ids)

            # The current meeting may be filtered out of the list.
            if meeting_id not in all_meeting_ids:
                return create_api_response(code="200", message="当前会议不在筛选结果中", data={
                    'prev_meeting_id': None,
                    'next_meeting_id': None,
                    'current_index': None,
                    'total_count': total_count
                })
            current_index = all_meeting_ids.index(meeting_id)

            # Neighbours in list order; None at either end.
            prev_meeting_id = None
            if current_index > 0:
                prev_meeting_id = all_meeting_ids[current_index - 1]
            next_meeting_id = None
            if current_index < total_count - 1:
                next_meeting_id = all_meeting_ids[current_index + 1]

            return create_api_response(code="200", message="获取导航信息成功", data={
                'prev_meeting_id': prev_meeting_id,
                'next_meeting_id': next_meeting_id,
                'current_index': current_index,
                'total_count': total_count
            })
    except Exception as e:
        return create_api_response(code="500", message=f"获取导航信息失败: {str(e)}")
@router.get("/meetings/{meeting_id}/preview-data")
def get_meeting_preview_data(meeting_id: int):
    """Return meeting preview data (no login required).

    Used by the preview page shown after scanning a QR code.

    Args:
        meeting_id: Identifier of the meeting to preview.

    Returns:
        API response: "200" with title, time, summary, creator name and
        attendees; "404" when the meeting does not exist; "400" when it has
        no summary or no ``updated_at`` value; "500" on any exception.
    """
    try:
        with get_db_connection() as connection:
            cursor = connection.cursor(dictionary=True)

            # Load the meeting together with its creator's caption.
            query = '''
                SELECT m.meeting_id, m.title, m.meeting_time, m.summary, m.updated_at,
                m.user_id as creator_id, u.caption as creator_username
                FROM meetings m
                JOIN users u ON m.user_id = u.user_id
                WHERE m.meeting_id = %s
            '''
            cursor.execute(query, (meeting_id,))
            meeting = cursor.fetchone()
            if not meeting:
                return create_api_response(code="404", message="会议不存在")

            # A preview is only available once a summary has been generated.
            if not (meeting['summary'] and meeting['updated_at']):
                return create_api_response(code="400", message="该会议总结尚未生成")

            # Attendee list.
            attendees_query = 'SELECT u.user_id, u.caption FROM attendees a JOIN users u ON a.user_id = u.user_id WHERE a.meeting_id = %s'
            cursor.execute(attendees_query, (meeting_id,))
            attendees = []
            for person in cursor.fetchall():
                attendees.append({'user_id': person['user_id'], 'caption': person['caption']})

            # Assemble the response payload.
            preview_data = {
                "meeting_id": meeting['meeting_id'],
                "title": meeting['title'],
                "meeting_time": meeting['meeting_time'],
                "summary": meeting['summary'],
                "creator_username": meeting['creator_username'],
                "attendees": attendees,
                "attendees_count": len(attendees)
            }
            return create_api_response(code="200", message="获取会议预览数据成功", data=preview_data)
    except Exception as e:
        return create_api_response(code="500", message=f"Failed to get meeting preview data: {str(e)}")