import json

import dashscope
from http import HTTPStatus
from typing import Dict, Generator, List, Optional

import app.core.config as config_module
from app.core.database import get_db_connection


class LLMService:
    """Generate and persist meeting summaries via the DashScope Qwen API.

    Transcripts are read from the ``transcript_segments`` table and the
    resulting summary is written back to ``meetings.summary``.
    """

    def __init__(self) -> None:
        # Configure the dashscope SDK with the project-wide API key.
        dashscope.api_key = config_module.QWEN_API_KEY

    @property
    def model_name(self) -> str:
        """Model name, re-read from config on every access so hot config reloads take effect."""
        return config_module.LLM_CONFIG["model_name"]

    @property
    def system_prompt(self) -> str:
        """System prompt, re-read from config on every access."""
        return config_module.LLM_CONFIG["system_prompt"]

    @property
    def time_out(self):
        """Request timeout passed to the DashScope SDK, re-read from config on every access."""
        return config_module.LLM_CONFIG["time_out"]

    @property
    def temperature(self):
        """Sampling temperature, re-read from config on every access."""
        return config_module.LLM_CONFIG["temperature"]

    @property
    def top_p(self):
        """Nucleus-sampling top_p, re-read from config on every access."""
        return config_module.LLM_CONFIG["top_p"]

    def generate_meeting_summary_stream(
        self, meeting_id: int, user_prompt: str = ""
    ) -> Generator[str, None, None]:
        """Stream a meeting summary chunk by chunk.

        Args:
            meeting_id: Meeting ID whose transcript should be summarized.
            user_prompt: Optional extra instructions from the user.

        Yields:
            str: Incremental content fragments. A fragment starting with
                ``"error:"`` signals failure and terminates the stream.
        """
        try:
            transcript_text = self._get_meeting_transcript(meeting_id)
            if not transcript_text:
                yield "error: 无法获取会议转录内容"
                return

            full_prompt = self._build_prompt(transcript_text, user_prompt)

            # Accumulate streamed chunks so the complete summary can be
            # persisted once streaming finishes successfully.
            full_content = ""
            for chunk in self._call_llm_api_stream(full_prompt):
                if chunk.startswith("error:"):
                    yield chunk
                    return
                full_content += chunk
                yield chunk

            if full_content:
                self._save_summary_to_db(meeting_id, full_content, user_prompt)

        except Exception as e:
            print(f"流式生成会议总结错误: {e}")
            yield f"error: {str(e)}"

    def generate_meeting_summary(self, meeting_id: int, user_prompt: str = "") -> Optional[Dict]:
        """Generate a meeting summary (non-streaming, kept for backward compatibility).

        Args:
            meeting_id: Meeting ID whose transcript should be summarized.
            user_prompt: Optional extra instructions from the user.

        Returns:
            On success, a dict with ``summary_id``, ``content`` and
            ``meeting_id``; on failure, a dict with an ``error`` key.
        """
        try:
            transcript_text = self._get_meeting_transcript(meeting_id)
            if not transcript_text:
                return {"error": "无法获取会议转录内容"}

            full_prompt = self._build_prompt(transcript_text, user_prompt)

            response = self._call_llm_api(full_prompt)
            if response:
                summary_id = self._save_summary_to_db(meeting_id, response, user_prompt)
                return {
                    "summary_id": summary_id,
                    "content": response,
                    "meeting_id": meeting_id,
                }
            else:
                return {"error": "大模型API调用失败"}

        except Exception as e:
            print(f"生成会议总结错误: {e}")
            return {"error": str(e)}

    def _get_meeting_transcript(self, meeting_id: int) -> str:
        """Fetch and format the transcript for a meeting from the database.

        Returns one line per segment, ordered by start time, formatted as
        ``[MM:SS] 说话人N: text``. Returns "" when no segments exist or on error.
        """
        try:
            with get_db_connection() as connection:
                cursor = connection.cursor()
                query = """
                    SELECT speaker_tag, start_time_ms, end_time_ms, text_content
                    FROM transcript_segments
                    WHERE meeting_id = %s
                    ORDER BY start_time_ms
                """
                cursor.execute(query, (meeting_id,))
                segments = cursor.fetchall()

                if not segments:
                    return ""

                transcript_lines = []
                # end_time_ms is selected but not used in the rendered line.
                for speaker_tag, start_time, _end_ms, text in segments:
                    # Convert milliseconds to a MM:SS label.
                    start_min = start_time // 60000
                    start_sec = (start_time % 60000) // 1000
                    transcript_lines.append(
                        f"[{start_min:02d}:{start_sec:02d}] 说话人{speaker_tag}: {text}"
                    )

                return "\n".join(transcript_lines)

        except Exception as e:
            print(f"获取会议转录内容错误: {e}")
            return ""

    def _build_prompt(self, transcript_text: str, user_prompt: str) -> str:
        """Assemble the full prompt: system prompt, optional user request, transcript."""
        prompt = f"{self.system_prompt}\n\n"
        if user_prompt:
            prompt += f"用户额外要求:{user_prompt}\n\n"
        prompt += f"会议转录内容:\n{transcript_text}\n\n请根据以上内容生成会议总结:"
        return prompt

    def _call_llm_api_stream(self, prompt: str) -> Generator[str, None, None]:
        """Call the Qwen model with streaming enabled, yielding incremental text.

        Yields a fragment starting with ``"error:"`` and stops on any failure.
        """
        try:
            responses = dashscope.Generation.call(
                model=self.model_name,
                prompt=prompt,
                stream=True,
                timeout=self.time_out,
                temperature=self.temperature,
                top_p=self.top_p,
                incremental_output=True,  # each event carries only the new text
            )

            for response in responses:
                if response.status_code == HTTPStatus.OK:
                    new_content = response.output.get('text', '')
                    if new_content:
                        yield new_content
                else:
                    error_msg = (
                        f"Request failed with status code: {response.status_code}, "
                        f"Error: {response.message}"
                    )
                    print(error_msg)
                    yield f"error: {error_msg}"
                    break

        except Exception as e:
            error_msg = f"流式调用大模型API错误: {e}"
            print(error_msg)
            yield f"error: {error_msg}"

    def _call_llm_api(self, prompt: str) -> Optional[str]:
        """Call the Qwen model without streaming (kept for backward compatibility).

        Returns the generated text, or None on any failure.
        """
        try:
            response = dashscope.Generation.call(
                model=self.model_name,
                prompt=prompt,
                timeout=self.time_out,
                temperature=self.temperature,
                top_p=self.top_p,
            )

            if response.status_code == HTTPStatus.OK:
                return response.output.get('text', '')
            else:
                print(f"API调用失败: {response.status_code}, {response.message}")
                return None

        except Exception as e:
            print(f"调用大模型API错误: {e}")
            return None

    def _save_summary_to_db(
        self, meeting_id: int, summary_content: str, user_prompt: str
    ) -> Optional[int]:
        """Persist the summary by updating ``meetings.summary``.

        Returns the meeting_id on success (there is no separate summary row),
        or None on error. ``user_prompt`` is accepted for interface stability
        but not stored — the meetings table has no column for it.
        """
        try:
            with get_db_connection() as connection:
                cursor = connection.cursor()
                update_query = """
                    UPDATE meetings
                    SET summary = %s
                    WHERE meeting_id = %s
                """
                cursor.execute(update_query, (summary_content, meeting_id))
                connection.commit()

                print(f"成功保存会议总结到meetings表,meeting_id: {meeting_id}")
                return meeting_id

        except Exception as e:
            print(f"保存总结到数据库错误: {e}")
            return None

    def get_meeting_summaries(self, meeting_id: int) -> List[Dict]:
        """Return the meeting's current summary as a one-element list (API-compat shape).

        Returns an empty list when there is no summary or on error.
        """
        try:
            with get_db_connection() as connection:
                cursor = connection.cursor()
                query = """
                    SELECT summary
                    FROM meetings
                    WHERE meeting_id = %s
                """
                cursor.execute(query, (meeting_id,))
                result = cursor.fetchone()

                # Wrap the single summary in a list to keep the historical
                # "list of summaries" response shape.
                if result and result[0]:
                    return [{
                        "id": meeting_id,
                        "content": result[0],
                        "user_prompt": "",   # meetings table has no user_prompt column
                        "created_at": None,  # meetings table has no summary timestamp
                    }]
                else:
                    return []

        except Exception as e:
            print(f"获取会议总结错误: {e}")
            return []

    def get_current_meeting_summary(self, meeting_id: int) -> Optional[str]:
        """Return the raw summary text from ``meetings.summary``, or None."""
        try:
            with get_db_connection() as connection:
                cursor = connection.cursor()
                query = """
                    SELECT summary
                    FROM meetings
                    WHERE meeting_id = %s
                """
                cursor.execute(query, (meeting_id,))
                result = cursor.fetchone()
                return result[0] if result and result[0] else None

        except Exception as e:
            print(f"获取会议当前总结错误: {e}")
            return None


# Manual smoke test — requires a populated database and a valid API key.
if __name__ == '__main__':
    test_meeting_id = 38
    test_user_prompt = "请重点关注决策事项和待办任务"

    print("--- 运行LLM服务测试 ---")
    llm_service = LLMService()

    # Generate a summary (non-streaming path).
    result = llm_service.generate_meeting_summary(test_meeting_id, test_user_prompt)
    if result.get("error"):
        print(f"生成总结失败: {result['error']}")
    else:
        print(f"总结生成成功,ID: {result.get('summary_id')}")
        print(f"总结内容: {result.get('content')[:200]}...")

    # Fetch stored summaries.
    summaries = llm_service.get_meeting_summaries(test_meeting_id)
    print(f"获取到 {len(summaries)} 个历史总结")
    print("--- LLM服务测试完成 ---")