imetting_backend/app/services/llm_service.py

239 lines
8.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import json
import requests
from typing import Optional, Dict, List
from app.core.config import LLM_CONFIG, QWEN_API_KEY
from app.core.database import get_db_connection
class LLMService:
    """Generate meeting summaries with the Qwen LLM and store them.

    The service reads transcript rows from ``transcript_segments``, sends a
    prompt to the HTTP endpoint configured in ``LLM_CONFIG`` and writes the
    returned text into the ``summary`` column of the ``meetings`` table.
    """

    # Seconds to wait for the LLM HTTP endpoint before giving up.
    REQUEST_TIMEOUT_SECONDS = 60

    def __init__(self):
        """Copy the API key and generation settings from the app config."""
        self.api_key = QWEN_API_KEY
        self.model_name = LLM_CONFIG["model_name"]
        self.api_url = LLM_CONFIG["api_url"]
        self.system_prompt = LLM_CONFIG["system_prompt"]
        self.max_tokens = LLM_CONFIG["max_tokens"]
        self.temperature = LLM_CONFIG["temperature"]
        self.top_p = LLM_CONFIG["top_p"]

    def generate_meeting_summary(self, meeting_id: int, user_prompt: str = "") -> Optional[Dict]:
        """Generate a summary for a meeting and persist it.

        Args:
            meeting_id: ID of the meeting to summarise.
            user_prompt: Optional extra instructions supplied by the user.

        Returns:
            On success, a dict with the keys ``summary_id``, ``content`` and
            ``meeting_id``. ``summary_id`` is whatever ``_save_summary_to_db``
            returned, so it is ``None`` when the summary was generated but
            could not be saved. On failure, a dict with a single ``error``
            key holding the message.
        """
        try:
            # Fetch the transcript; an empty string means "nothing to summarise".
            transcript_text = self._get_meeting_transcript(meeting_id)
            if not transcript_text:
                return {"error": "无法获取会议转录内容"}

            full_prompt = self._build_prompt(transcript_text, user_prompt)

            response = self._call_llm_api(full_prompt)
            if not response:
                return {"error": "大模型API调用失败"}

            # Persist the summary and hand the generated text back to the caller.
            summary_id = self._save_summary_to_db(meeting_id, response, user_prompt)
            return {
                "summary_id": summary_id,
                "content": response,
                "meeting_id": meeting_id
            }
        except Exception as e:
            print(f"生成会议总结错误: {e}")
            return {"error": str(e)}

    @staticmethod
    def _format_transcript(segments) -> str:
        """Render transcript rows as one ``[MM:SS] 说话人<tag>: <text>`` line each.

        Args:
            segments: Iterable of ``(speaker_tag, start_time_ms, end_time_ms,
                text_content)`` rows, already in display order.

        Returns:
            The lines joined with newlines, or ``""`` when there are no rows.
        """
        if not segments:
            return ""
        lines = []
        for speaker_tag, start_time, _end_time, text in segments:
            # Coerce to int so a NULL / Decimal / float timestamp cannot break
            # the ``:02d`` format and discard the whole transcript.
            start_ms = int(start_time or 0)
            minutes, seconds = divmod(start_ms // 1000, 60)
            lines.append(f"[{minutes:02d}:{seconds:02d}] 说话人{speaker_tag}: {text}")
        return "\n".join(lines)

    def _get_meeting_transcript(self, meeting_id: int) -> str:
        """Load the meeting transcript from the database as formatted text.

        Args:
            meeting_id: ID of the meeting whose segments are read.

        Returns:
            The formatted transcript, or ``""`` when the meeting has no
            segments or any error occurs (the error is printed).
        """
        query = """
        SELECT speaker_tag, start_time_ms, end_time_ms, text_content
        FROM transcript_segments
        WHERE meeting_id = %s
        ORDER BY start_time_ms
        """
        try:
            with get_db_connection() as connection:
                cursor = connection.cursor()
                try:
                    cursor.execute(query, (meeting_id,))
                    segments = cursor.fetchall()
                finally:
                    # Release the cursor even when the query fails.
                    cursor.close()
            return self._format_transcript(segments)
        except Exception as e:
            print(f"获取会议转录内容错误: {e}")
            return ""

    def _build_prompt(self, transcript_text: str, user_prompt: str) -> str:
        """Assemble the full prompt sent to the model.

        The prompt is the configured system prompt, then the user's extra
        requirements (only when non-empty), then the transcript followed by
        the closing instruction.
        """
        parts = [f"{self.system_prompt}\n\n"]
        if user_prompt:
            parts.append(f"用户额外要求:{user_prompt}\n\n")
        parts.append(f"会议转录内容:\n{transcript_text}\n\n请根据以上内容生成会议总结:")
        return "".join(parts)

    @staticmethod
    def _extract_response_text(result: Dict) -> Optional[str]:
        """Pull the generated text out of a decoded API response.

        Two response shapes are accepted: ``output.text`` and
        ``output.choices[0].message.content``. Any other shape is printed as
        a format error and yields ``None``.
        """
        output = result.get("output")
        if output and output.get("text"):
            return output["text"]
        if output and output.get("choices"):
            return output["choices"][0]["message"]["content"]
        print(f"API响应格式错误: {result}")
        return None

    def _call_llm_api(self, prompt: str) -> Optional[str]:
        """Send the prompt to the configured Qwen endpoint.

        Args:
            prompt: Complete prompt text, sent as a single ``user`` message.

        Returns:
            The generated text, or ``None`` on any HTTP, decoding or
            response-format problem (the problem is printed).
        """
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        data = {
            "model": self.model_name,
            "input": {
                "messages": [
                    {
                        "role": "user",
                        "content": prompt
                    }
                ]
            },
            "parameters": {
                "max_tokens": self.max_tokens,
                "temperature": self.temperature,
                "top_p": self.top_p,
                "incremental_output": False
            }
        }
        try:
            response = requests.post(
                self.api_url,
                headers=headers,
                json=data,
                timeout=self.REQUEST_TIMEOUT_SECONDS,
            )
            response.raise_for_status()
            return self._extract_response_text(response.json())
        except requests.exceptions.RequestException as e:
            print(f"API请求错误: {e}")
            return None
        except json.JSONDecodeError as e:
            print(f"JSON解析错误: {e}")
            return None
        except Exception as e:
            print(f"调用大模型API错误: {e}")
            return None

    def _save_summary_to_db(self, meeting_id: int, summary_content: str, user_prompt: str) -> Optional[int]:
        """Store the summary in the ``summary`` column of ``meetings``.

        Args:
            meeting_id: ID of the meeting row to update.
            summary_content: Generated summary text.
            user_prompt: Accepted for interface compatibility; it is not
                written anywhere by this method.

        Returns:
            ``meeting_id`` once the update has been committed, or ``None``
            when an error occurs (the error is printed).
        """
        update_query = """
        UPDATE meetings
        SET summary = %s
        WHERE meeting_id = %s
        """
        try:
            with get_db_connection() as connection:
                cursor = connection.cursor()
                try:
                    cursor.execute(update_query, (summary_content, meeting_id))
                    connection.commit()
                finally:
                    cursor.close()
            print(f"成功保存会议总结到meetings表meeting_id: {meeting_id}")
            return meeting_id
        except Exception as e:
            print(f"保存总结到数据库错误: {e}")
            return None

    def _fetch_summary(self, meeting_id: int) -> Optional[str]:
        """Read ``meetings.summary`` for one meeting.

        Returns the stored text, or ``None`` when the row is missing or the
        summary is empty. Database errors propagate so each public caller can
        report them with its own message.
        """
        query = """
        SELECT summary
        FROM meetings
        WHERE meeting_id = %s
        """
        with get_db_connection() as connection:
            cursor = connection.cursor()
            try:
                cursor.execute(query, (meeting_id,))
                row = cursor.fetchone()
            finally:
                cursor.close()
        return row[0] if row and row[0] else None

    def get_meeting_summaries(self, meeting_id: int) -> List[Dict]:
        """Return the meeting's current summary wrapped in a list.

        The list shape is kept for API consistency: it holds at most one
        entry, built from the ``summary`` column of ``meetings``.

        Returns:
            A one-element list of dicts with ``id``, ``content``,
            ``user_prompt`` and ``created_at``; or ``[]`` when there is no
            summary or an error occurs (the error is printed).
        """
        try:
            summary = self._fetch_summary(meeting_id)
        except Exception as e:
            print(f"获取会议总结错误: {e}")
            return []
        if not summary:
            return []
        return [{
            "id": meeting_id,
            "content": summary,
            "user_prompt": "",  # the meetings table has no user_prompt column
            "created_at": None  # the meetings table has no separate summary timestamp
        }]

    def get_current_meeting_summary(self, meeting_id: int) -> Optional[str]:
        """Return the meeting's current summary text.

        Returns:
            The stored summary, or ``None`` when there is none or an error
            occurs (the error is printed).
        """
        try:
            return self._fetch_summary(meeting_id)
        except Exception as e:
            print(f"获取会议当前总结错误: {e}")
            return None
# Manual smoke test: run this module directly to exercise the service.
if __name__ == '__main__':
    # Hard-coded sample inputs for the LLM service test.
    test_meeting_id = 38
    test_user_prompt = "请重点关注决策事项和待办任务"
    print("--- 运行LLM服务测试 ---")
    llm_service = LLMService()
    # Generate a summary and report the outcome.
    result = llm_service.generate_meeting_summary(test_meeting_id, test_user_prompt)
    if result.get("error"):
        print(f"生成总结失败: {result['error']}")
    else:
        print(f"总结生成成功ID: {result.get('summary_id')}")
        # Only the first 200 characters are shown to keep the output short.
        print(f"总结内容: {result.get('content')[:200]}...")
    # Read the stored summary back through the list-style accessor.
    summaries = llm_service.get_meeting_summaries(test_meeting_id)
    print(f"获取到 {len(summaries)} 个历史总结")
    print("--- LLM服务测试完成 ---")