From b7d4cc87822733dfc06edaf8b531452db4153ba9 Mon Sep 17 00:00:00 2001 From: Bifang <915779419@qq.com> Date: Fri, 12 Jun 2026 10:57:32 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B5=8B=E8=AF=95=E7=89=88=E6=9C=AC1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- meeting_memory/extractor.py | 511 ++++++++++++------ meeting_memory/graph_store.py | 612 ++++++++++++---------- meeting_memory/meeting_processor.py | 337 +++++++++--- meeting_memory/prompts/__init__.py | 5 + meeting_memory/prompts/dedupe_edges.py | 49 ++ meeting_memory/prompts/dedupe_nodes.py | 49 ++ meeting_memory/prompts/extract_edges.py | 59 +++ meeting_memory/prompts/extract_nodes.py | 53 ++ meeting_memory/prompts/summarize_nodes.py | 41 ++ 9 files changed, 1191 insertions(+), 525 deletions(-) create mode 100644 meeting_memory/prompts/__init__.py create mode 100644 meeting_memory/prompts/dedupe_edges.py create mode 100644 meeting_memory/prompts/dedupe_nodes.py create mode 100644 meeting_memory/prompts/extract_edges.py create mode 100644 meeting_memory/prompts/extract_nodes.py create mode 100644 meeting_memory/prompts/summarize_nodes.py diff --git a/meeting_memory/extractor.py b/meeting_memory/extractor.py index 4ff4671..035a45f 100644 --- a/meeting_memory/extractor.py +++ b/meeting_memory/extractor.py @@ -2,12 +2,17 @@ import json import logging import re import sys -from typing import List, Optional +from typing import Any, List, Optional from openai import OpenAI from pydantic import BaseModel, Field from meeting_memory.config import config +from meeting_memory.prompts import extract_entities as prompt_extract_entities +from meeting_memory.prompts import extract_facts as prompt_extract_facts +from meeting_memory.prompts import resolve_entities as prompt_dedupe_nodes +from meeting_memory.prompts import resolve_facts as prompt_dedupe_edges +from meeting_memory.prompts import summarize_entity as prompt_summarize logger = logging.getLogger(__name__) @@ -20,49 +25,49 @@ client = OpenAI( class Entity(BaseModel): name: str entity_type: str - description: str = "" + description: str = '' class Relation(BaseModel): subject: str - subject_type: str + subject_type: str = '' predicate: str object: str - object_type: str - description: str = "" - fact: str = "" + object_type: str = '' + description: str = '' + fact: str = '' qualifiers: List[str] = Field(default_factory=list) - evidence: str = "" + evidence: str = '' confidence: float = 0.0 - valid_at: str = "" - invalid_at: str = "" + valid_at: str = '' + invalid_at: str = '' class ActionItem(BaseModel): task: str - assignee: str = "" - deadline: str = "" - status: str = "待办" - priority: str = "中" + assignee: str = '' + deadline: str = '' + status: str = '待办' + priority: str = '中' class Decision(BaseModel): content: str - proposer: str = "" - status: str = "已决" + proposer: str = '' + status: str = '已决' class MeetingMetric(BaseModel): metric_name: str value: str - target: str = "" - owner: str = "" - trend: str = "" + target: str = '' + owner: str = '' + trend: str = '' class MeetingExtraction(BaseModel): title: str - date: str = "" + date: str = '' participants: List[str] = Field(default_factory=list) agenda: List[str] = Field(default_factory=list) entities: List[Entity] = Field(default_factory=list) @@ -70,15 +75,281 @@ class MeetingExtraction(BaseModel): action_items: List[ActionItem] = Field(default_factory=list) decisions: List[Decision] = Field(default_factory=list) metrics: List[MeetingMetric] = Field(default_factory=list) - summary: str = "" + summary: str = '' +def _call_llm( + messages: list[dict], + response_model: type | None = None, + stream: bool = False, + max_tokens: int | None = None, +) -> Any: + kwargs = { + 'model': config.llm.model, + 'messages': messages, + 'max_tokens': max_tokens or config.llm.max_tokens, + 'temperature': config.llm.temperature, + } + if response_model is not None: + kwargs['response_format'] = {'type': 'json_object'} + if stream: + kwargs['stream'] = True + + if not stream: + response = client.chat.completions.create(**kwargs) + content = response.choices[0].message.content + if content is None: + raise ValueError('LLM returned empty response') + return content + + kwargs['stream'] = True + response = client.chat.completions.create(**kwargs) + chunks: List[str] = [] + print('\n[LLM] 开始流式输出:') + for event in response: + if not event.choices: + continue + delta = event.choices[0].delta.content + if not delta: + continue + chunks.append(delta) + sys.stdout.write(delta) + sys.stdout.flush() + print('\n[LLM] 输出结束') + return ''.join(chunks) + + +def _try_parse_json(content: str) -> dict | list: + try: + return json.loads(content) + except json.JSONDecodeError: + logger.warning('JSON parsing failed; trying to repair extracted block') + match = re.search(r'\{.*\}|\[.*\]', content, re.DOTALL) + if match: + try: + return json.loads(match.group()) + except json.JSONDecodeError as exc: + logger.error('Repaired JSON still failed to parse: %s', exc) + raise + + +def _normalize_string(name: str) -> str: + return re.sub(r'[\s]+', ' ', name.strip().lower()) + + +def _format_episodes_for_context(episodes: list[dict] | None) -> str: + if not episodes: + return '' + return '\n'.join( + f'[Episode {i}] {ep.get("content", "")}' + for i, ep in enumerate(episodes) + ) + + +# ===== Step 1: 实体节点抽取 ===== + +def extract_entities_from_text( + text: str, + previous_episodes: list[dict] | None = None, + entity_types: list[dict] | None = None, + stream: bool = False, +) -> list[dict]: + context = { + 'episode_content': text, + 'previous_episodes': previous_episodes or [], + 'entity_types': entity_types or [], + } + messages = prompt_extract_entities(context) + content = _call_llm(messages, stream=stream) + try: + data = _try_parse_json(content) + except Exception as exc: + logger.error('Failed to parse entity extraction result: %s', exc) + return [] + if isinstance(data, dict): + data = data.get('entities', data.get('extracted_entities', [])) + if not isinstance(data, list): + return [] + result = [] + for item in data: + if isinstance(item, dict) and item.get('name', '').strip(): + result.append({ + 'name': item['name'].strip(), + 'entity_type': item.get('entity_type', 'Entity'), + 'description': item.get('description', ''), + 'evidence': item.get('evidence', ''), + }) + return result + + +# ===== Step 2: 实体去重 ===== + +def resolve_entities_against_graph( + extracted: list[dict], + existing: list[dict], + episode_content: str = '', +) -> list[dict]: + if not existing: + return extracted + + context = { + 'extracted_entities': extracted, + 'existing_entities': existing, + 'episode_content': episode_content, + } + messages = prompt_dedupe_nodes(context) + content = _call_llm(messages) + try: + data = _try_parse_json(content) + except Exception as exc: + logger.warning('LLM dedup failed, keeping all extracted: %s', exc) + return extracted + + if isinstance(data, dict): + data = data.get('entity_resolutions', data.get('resolutions', [])) + + extracted_by_id = {i: e for i, e in enumerate(extracted)} + existing_by_id = {c.get('candidate_id'): c for c in existing} + + for resolution in (data if isinstance(data, list) else []): + if not isinstance(resolution, dict): + continue + rid = resolution.get('id') + dup_id = resolution.get('duplicate_candidate_id', -1) + if rid is None or rid not in extracted_by_id: + continue + if dup_id >= 0 and dup_id in existing_by_id: + extracted_by_id[rid]['_resolved_to'] = existing_by_id[dup_id] + extracted_by_id[rid]['name'] = resolution.get('name', extracted_by_id[rid]['name']) + + return [e for e in extracted_by_id.values() if '_resolved_to' not in e] + + +# ===== Step 3: 事实关系抽取 ===== + +def extract_facts_from_text( + text: str, + entities: list[dict], + reference_time: str = '', + previous_episodes: list[dict] | None = None, + stream: bool = False, +) -> list[dict]: + if len(entities) < 2: + return [] + + context = { + 'episode_content': text, + 'entities': entities, + 'reference_time': reference_time, + 'previous_episodes': previous_episodes or [], + } + messages = prompt_extract_facts(context) + content = _call_llm(messages, stream=stream) + try: + data = _try_parse_json(content) + except Exception as exc: + logger.error('Failed to parse fact extraction result: %s', exc) + return [] + + if isinstance(data, dict): + data = data.get('edges', data.get('facts', data.get('relations', []))) + + if not isinstance(data, list): + return [] + + entity_names = {_normalize_string(e.get('name', '')) for e in entities} + result = [] + for item in data: + if not isinstance(item, dict): + continue + src = _normalize_string(item.get('source_entity_name', '')) + tgt = _normalize_string(item.get('target_entity_name', '')) + if src not in entity_names or tgt not in entity_names: + continue + if src == tgt: + continue + result.append({ + 'source_entity_name': item['source_entity_name'], + 'target_entity_name': item['target_entity_name'], + 'relation_type': item.get('relation_type', '关联'), + 'fact': item.get('fact', ''), + 'valid_at': item.get('valid_at', ''), + 'invalid_at': item.get('invalid_at', ''), + 'evidence': item.get('evidence', ''), + 'qualifiers': item.get('qualifiers', []), + 'confidence': item.get('confidence', 0.0), + }) + return result + + +# ===== Step 4: 事实去重/矛盾检测 ===== + +def resolve_facts_against_graph( + new_fact: dict, + existing_facts: list[dict], + invalidation_candidates: list[dict], +) -> dict: + if not existing_facts: + return {'is_duplicate': False, 'is_contradicted': False, 'resolved': new_fact} + + context = { + 'new_fact': new_fact.get('fact', ''), + 'existing_facts': existing_facts, + 'invalidation_candidates': invalidation_candidates, + } + messages = prompt_dedupe_edges(context) + content = _call_llm(messages) + try: + data = _try_parse_json(content) + except Exception as exc: + logger.warning('Fact dedup failed, treating as new: %s', exc) + return {'is_duplicate': False, 'is_contradicted': False, 'resolved': new_fact} + + if not isinstance(data, dict): + return {'is_duplicate': False, 'is_contradicted': False, 'resolved': new_fact} + return { + 'is_duplicate': len(data.get('duplicate_facts', [])) > 0, + 'is_contradicted': len(data.get('contradicted_facts', [])) > 0, + 'resolved': new_fact, + 'duplicate_facts': data.get('duplicate_facts', []), + 'contradicted_facts': data.get('contradicted_facts', []), + } + + +# ===== Step 5: 实体摘要 ===== + +def extract_entity_summary( + entity_name: str, + episodes: list[str], + existing_summary: str = '', + previous_episodes: list[dict] | None = None, +) -> str: + context = { + 'entity_name': entity_name, + 'episodes': episodes, + 'existing_summary': existing_summary, + 'previous_episodes': previous_episodes or [], + } + messages = prompt_summarize(context) + content = _call_llm(messages, max_tokens=1024) + try: + data = _try_parse_json(content) + except Exception: + logger.warning('Failed to parse summary, using empty') + return '' + if isinstance(data, dict): + return data.get('summary', '') + return '' + + +# ===== 统一入口(兼容原有接口) ===== + EXTRACTION_SYSTEM_PROMPT = """ 你是一个专业的会议知识抽取助手。你的任务是从中文会议记录中抽取结构化事实,尤其要抽出更细粒度、更有语义深度的关系。 输出要求: 1. 只输出一个 JSON 对象,不要输出解释文字。 -2. 关系抽取不要停留在“部门汇报了工作”这种浅层描述,要尽可能向下细化到: +2. 关系抽取不要停留在"部门汇报了工作"这种浅层描述,要尽可能向下细化到: - 责任归属 - 目标值 / 当前值 / 趋势 - 约束条件 @@ -99,51 +370,9 @@ EXTRACTION_SYSTEM_PROMPT = """ """ -def _call_llm(system: str, user: str, stream: bool = False) -> str: - if not stream: - response = client.chat.completions.create( - model=config.llm.model, - messages=[ - {"role": "system", "content": system}, - {"role": "user", "content": user}, - ], - max_tokens=config.llm.max_tokens, - temperature=config.llm.temperature, - ) - content = response.choices[0].message.content - if content is None: - raise ValueError("LLM returned empty response") - return content - - response = client.chat.completions.create( - model=config.llm.model, - messages=[ - {"role": "system", "content": system}, - {"role": "user", "content": user}, - ], - max_tokens=config.llm.max_tokens, - temperature=config.llm.temperature, - stream=True, - ) - - chunks: List[str] = [] - print("\n[LLM] 开始抽取,流式输出中:") - for event in response: - if not event.choices: - continue - delta = event.choices[0].delta.content - if not delta: - continue - chunks.append(delta) - sys.stdout.write(delta) - sys.stdout.flush() - print("\n[LLM] 抽取输出结束") - return "".join(chunks) - - def extract_meeting_info(text: str, stream: bool = False) -> MeetingExtraction: user_prompt = f""" -请从下面会议记录中提取结构化信息,并重点做“深层关系抽取”。 +请从下面会议记录中提取结构化信息,并重点做"深层关系抽取"。 输出 JSON 字段: - title @@ -170,9 +399,9 @@ def extract_meeting_info(text: str, stream: bool = False) -> MeetingExtraction: - summary 关系抽取规则: -1. 不要只抽“汇报了工作”这种会议动作,要尽量继续下钻出具体事实。 -2. 如果一句话里同时包含“主体 + 指标 + 当前值 + 目标值 + 负责人 + 趋势”,应拆成多条关系或在 qualifiers 中保留这些细节。 -3. 对于“要求、部署、负责、依赖、影响、约束、目标、风险”类信息优先保留。 +1. 不要只抽"汇报了工作"这种会议动作,要尽量继续下钻出具体事实。 +2. 如果一句话里同时包含"主体 + 指标 + 当前值 + 目标值 + 负责人 + 趋势",应拆成多条关系或在 qualifiers 中保留这些细节。 +3. 对于"要求、部署、负责、依赖、影响、约束、目标、风险"类信息优先保留。 4. fact 必须是一句完整、自然、可检索的事实描述。 5. qualifiers 用于补充数值、范围、状态、条件、截止时间、优先级等信息。 6. evidence 用原文中的关键词短句,不要太长。 @@ -181,54 +410,42 @@ def extract_meeting_info(text: str, stream: bool = False) -> MeetingExtraction: 会议记录如下: {text} """ - content = _call_llm(EXTRACTION_SYSTEM_PROMPT, user_prompt, stream=stream) + content = _call_llm([ + {'role': 'system', 'content': EXTRACTION_SYSTEM_PROMPT}, + {'role': 'user', 'content': user_prompt}, + ], stream=stream) data = _try_parse_json(content) data = _normalize_meeting_data(data) return MeetingExtraction(**data) -def _try_parse_json(content: str) -> dict: - try: - return json.loads(content) - except json.JSONDecodeError: - logger.warning("JSON parsing failed; trying to repair extracted block") - match = re.search(r"\{.*\}", content, re.DOTALL) - if match: - try: - return json.loads(match.group()) - except json.JSONDecodeError as exc: - logger.error("Repaired JSON still failed to parse: %s", exc) - raise - - def _normalize_meeting_data(data: dict) -> dict: if not isinstance(data, dict): return {} - return { - "title": _as_str(data.get("title")), - "date": _as_str(data.get("date")), - "participants": _as_str_list(data.get("participants")), - "agenda": _as_str_list(data.get("agenda")), - "entities": _normalize_entities(data.get("entities")), - "relations": _normalize_relations(data.get("relations")), - "action_items": _normalize_action_items(data.get("action_items")), - "decisions": _normalize_decisions(data.get("decisions")), - "metrics": _normalize_metrics(data.get("metrics")), - "summary": _as_str(data.get("summary")), + 'title': _as_str(data.get('title')), + 'date': _as_str(data.get('date')), + 'participants': _as_str_list(data.get('participants')), + 'agenda': _as_str_list(data.get('agenda')), + 'entities': _normalize_entities(data.get('entities')), + 'relations': _normalize_relations(data.get('relations')), + 'action_items': _normalize_action_items(data.get('action_items')), + 'decisions': _normalize_decisions(data.get('decisions')), + 'metrics': _normalize_metrics(data.get('metrics')), + 'summary': _as_str(data.get('summary')), } def _as_str(value) -> str: if value is None: - return "" + return '' if isinstance(value, str): return value return str(value) def _as_float(value) -> float: - if value is None or value == "": + if value is None or value == '': return 0.0 try: numeric = float(value) @@ -244,7 +461,7 @@ def _as_str_list(value) -> List[str]: key_text = _as_str(key) value_text = _as_str(item) if key_text and value_text: - items.append(f"{key_text}: {value_text}") + items.append(f'{key_text}: {value_text}') elif key_text: items.append(key_text) elif value_text: @@ -262,50 +479,42 @@ def _normalize_entities(value) -> List[dict]: for entity in value: if not isinstance(entity, dict): continue - items.append( - { - "name": _as_str(entity.get("name")), - "entity_type": _as_str(entity.get("entity_type")), - "description": _as_str(entity.get("description")), - } - ) + items.append({ + 'name': _as_str(entity.get('name')), + 'entity_type': _as_str(entity.get('entity_type')), + 'description': _as_str(entity.get('description')), + }) return items def _normalize_relations(value) -> List[dict]: if not isinstance(value, list): return [] - items = [] for relation in value: if not isinstance(relation, dict): continue - - subject = _as_str(relation.get("subject")) - predicate = _as_str(relation.get("predicate")) - obj = _as_str(relation.get("object")) - description = _as_str(relation.get("description")) - fact = _as_str(relation.get("fact")) - + subject = _as_str(relation.get('subject')) + predicate = _as_str(relation.get('predicate')) + obj = _as_str(relation.get('object')) + description = _as_str(relation.get('description')) + fact = _as_str(relation.get('fact')) if not fact and subject and predicate and obj: - fact = f"{subject} {predicate} {obj}" - - items.append( - { - "subject": subject, - "subject_type": _as_str(relation.get("subject_type")), - "predicate": predicate, - "object": obj, - "object_type": _as_str(relation.get("object_type")), - "description": description, - "fact": fact, - "qualifiers": _as_str_list(relation.get("qualifiers")), - "evidence": _as_str(relation.get("evidence")), - "confidence": _as_float(relation.get("confidence")), - "valid_at": _as_str(relation.get("valid_at")), - "invalid_at": _as_str(relation.get("invalid_at")), - } - ) + fact = f'{subject} {predicate} {obj}' + items.append({ + 'subject': subject, + 'subject_type': _as_str(relation.get('subject_type')), + 'predicate': predicate, + 'object': obj, + 'object_type': _as_str(relation.get('object_type')), + 'description': description, + 'fact': fact, + 'qualifiers': _as_str_list(relation.get('qualifiers')), + 'evidence': _as_str(relation.get('evidence')), + 'confidence': _as_float(relation.get('confidence')), + 'valid_at': _as_str(relation.get('valid_at')), + 'invalid_at': _as_str(relation.get('invalid_at')), + }) return items @@ -316,15 +525,13 @@ def _normalize_action_items(value) -> List[dict]: for action in value: if not isinstance(action, dict): continue - items.append( - { - "task": _as_str(action.get("task")), - "assignee": _as_str(action.get("assignee")), - "deadline": _as_str(action.get("deadline")), - "status": _as_str(action.get("status")) or "待办", - "priority": _as_str(action.get("priority")) or "中", - } - ) + items.append({ + 'task': _as_str(action.get('task')), + 'assignee': _as_str(action.get('assignee')), + 'deadline': _as_str(action.get('deadline')), + 'status': _as_str(action.get('status')) or '待办', + 'priority': _as_str(action.get('priority')) or '中', + }) return items @@ -335,13 +542,11 @@ def _normalize_decisions(value) -> List[dict]: for decision in value: if not isinstance(decision, dict): continue - items.append( - { - "content": _as_str(decision.get("content")), - "proposer": _as_str(decision.get("proposer")), - "status": _as_str(decision.get("status")) or "已决", - } - ) + items.append({ + 'content': _as_str(decision.get('content')), + 'proposer': _as_str(decision.get('proposer')), + 'status': _as_str(decision.get('status')) or '已决', + }) return items @@ -352,13 +557,11 @@ def _normalize_metrics(value) -> List[dict]: for metric in value: if not isinstance(metric, dict): continue - items.append( - { - "metric_name": _as_str(metric.get("metric_name")), - "value": _as_str(metric.get("value")), - "target": _as_str(metric.get("target")), - "owner": _as_str(metric.get("owner")), - "trend": _as_str(metric.get("trend")), - } - ) + items.append({ + 'metric_name': _as_str(metric.get('metric_name')), + 'value': _as_str(metric.get('value')), + 'target': _as_str(metric.get('target')), + 'owner': _as_str(metric.get('owner')), + 'trend': _as_str(metric.get('trend')), + }) return items diff --git a/meeting_memory/graph_store.py b/meeting_memory/graph_store.py index ab16f47..bb2c8c2 100644 --- a/meeting_memory/graph_store.py +++ b/meeting_memory/graph_store.py @@ -3,6 +3,7 @@ import json import logging import re import time +from datetime import datetime, timezone from typing import Any, Dict, List, Optional from meeting_memory.config import config @@ -14,7 +15,6 @@ logger = logging.getLogger(__name__) def _cosine_similarity(left: List[float], right: List[float]) -> float: if not left or not right or len(left) != len(right): return 0.0 - dot = sum(a * b for a, b in zip(left, right)) left_norm = sum(a * a for a in left) ** 0.5 right_norm = sum(b * b for b in right) ** 0.5 @@ -24,7 +24,7 @@ def _cosine_similarity(left: List[float], right: List[float]) -> float: def _keyword_score(text: str, question: str) -> float: - source = (text or "").lower() + source = (text or '').lower() terms = _keyword_terms(question) if not source or not terms: return 0.0 @@ -33,21 +33,19 @@ def _keyword_score(text: str, question: str) -> float: def _keyword_terms(text: str) -> List[str]: - normalized = (text or "").lower() - raw_terms = re.findall(r"[a-z0-9]+|[\u4e00-\u9fff]{2,}", normalized) - stopwords = {"是什么", "多少", "分别", "以及", "还有", "当前值", "目标值"} - + normalized = (text or '').lower() + raw_terms = re.findall(r'[a-z0-9]+|[\u4e00-\u9fff]{2,}', normalized) + stopwords = {'是什么', '多少', '分别', '以及', '还有', '当前值', '目标值'} terms: List[str] = [] for raw in raw_terms: if raw in stopwords: continue if raw not in terms: terms.append(raw) - - if re.fullmatch(r"[\u4e00-\u9fff]{4,}", raw): + if re.fullmatch(r'[\u4e00-\u9fff]{4,}', raw): for size in (2, 3, 4): for idx in range(0, len(raw) - size + 1): - piece = raw[idx : idx + size] + piece = raw[idx: idx + size] if piece not in stopwords and piece not in terms: terms.append(piece) return terms @@ -64,22 +62,20 @@ class Neo4jGraphStore: def _connect(self): if not config.neo4j.enabled: - logger.info("Neo4j graph store disabled") + logger.info('Neo4j graph store disabled') return - try: from neo4j import GraphDatabase except ImportError: - logger.warning("neo4j package is not installed") + logger.warning('neo4j package is not installed') return - if not config.neo4j.password: - logger.warning("Neo4j is enabled but NEO4J_PASSWORD is empty") + logger.warning('Neo4j is enabled but NEO4J_PASSWORD is empty') return tried_uris = [self._uri] - if self._uri.startswith("neo4j://"): - tried_uris.append("bolt://" + self._uri[len("neo4j://") :]) + if self._uri.startswith('neo4j://'): + tried_uris.append('bolt://' + self._uri[len('neo4j://'):]) for uri in tried_uris: driver = None @@ -94,16 +90,15 @@ class Neo4jGraphStore: self._enabled = True self._last_failure_at = 0.0 if uri != config.neo4j.uri: - logger.warning("Neo4j routing URI unavailable; fell back to %s", uri) + logger.warning('Neo4j routing URI unavailable; fell back to %s', uri) return except Exception as exc: - logger.warning("Neo4j connection failed for %s: %s", uri, exc) + logger.warning('Neo4j connection failed for %s: %s', uri, exc) try: driver.close() except Exception: pass - - self._mark_unavailable("Neo4j is currently unreachable") + self._mark_unavailable('Neo4j is currently unreachable') @property def enabled(self) -> bool: @@ -114,9 +109,9 @@ class Neo4jGraphStore: def _should_retry_connect(self) -> bool: return (time.time() - self._last_failure_at) >= self._retry_cooldown_seconds - def _mark_unavailable(self, reason: str = "") -> None: + def _mark_unavailable(self, reason: str = '') -> None: if reason: - logger.warning("Neo4j temporarily disabled: %s", reason) + logger.warning('Neo4j temporarily disabled: %s', reason) self._enabled = False self._last_failure_at = time.time() if self._driver is not None: @@ -128,10 +123,10 @@ class Neo4jGraphStore: @staticmethod def meeting_id(meeting_data: dict) -> str: - title = meeting_data.get("title", "") - date = meeting_data.get("date", "") - raw = f"{date}_{title}" - return f"meeting_{hashlib.md5(raw.encode('utf-8')).hexdigest()[:12]}" + title = meeting_data.get('title', '') + date = meeting_data.get('date', '') + raw = f'{date}_{title}' + return f'meeting_{hashlib.md5(raw.encode("utf-8")).hexdigest()[:12]}' def close(self): if self._driver is not None: @@ -140,73 +135,166 @@ class Neo4jGraphStore: def run_query(self, query: str, **params) -> List[Dict[str, Any]]: if not self.enabled: return [] - try: with self._driver.session(database=config.neo4j.database) as session: result = session.run(query, **params) return [record.data() for record in result] except Exception as exc: - logger.warning("Neo4j query failed: %s", exc) + logger.warning('Neo4j query failed: %s', exc) self._mark_unavailable(str(exc)) return [] def initialize_schema(self): if not self.enabled: return - statements = [ - "CREATE CONSTRAINT meeting_id IF NOT EXISTS FOR (m:Meeting) REQUIRE m.meeting_id IS UNIQUE", - "CREATE CONSTRAINT episode_id IF NOT EXISTS FOR (e:Episode) REQUIRE e.episode_id IS UNIQUE", - "CREATE CONSTRAINT entity_name IF NOT EXISTS FOR (e:Entity) REQUIRE e.name IS UNIQUE", - "CREATE CONSTRAINT fact_id IF NOT EXISTS FOR (f:Fact) REQUIRE f.fact_id IS UNIQUE", - "CREATE INDEX meeting_title IF NOT EXISTS FOR (m:Meeting) ON (m.title)", - "CREATE INDEX episode_title IF NOT EXISTS FOR (e:Episode) ON (e.title)", - "CREATE INDEX entity_type IF NOT EXISTS FOR (e:Entity) ON (e.entity_type)", - "CREATE INDEX fact_predicate IF NOT EXISTS FOR (f:Fact) ON (f.predicate)", + 'CREATE CONSTRAINT meeting_id IF NOT EXISTS FOR (m:Meeting) REQUIRE m.meeting_id IS UNIQUE', + 'CREATE CONSTRAINT episode_id IF NOT EXISTS FOR (e:Episode) REQUIRE e.episode_id IS UNIQUE', + 'CREATE CONSTRAINT entity_name IF NOT EXISTS FOR (e:Entity) REQUIRE e.name IS UNIQUE', + 'CREATE CONSTRAINT fact_id IF NOT EXISTS FOR (f:Fact) REQUIRE f.fact_id IS UNIQUE', + 'CREATE INDEX meeting_title IF NOT EXISTS FOR (m:Meeting) ON (m.title)', + 'CREATE INDEX episode_title IF NOT EXISTS FOR (e:Episode) ON (e.title)', + 'CREATE INDEX entity_type IF NOT EXISTS FOR (e:Entity) ON (e.entity_type)', + 'CREATE INDEX fact_predicate IF NOT EXISTS FOR (f:Fact) ON (f.predicate)', ] for statement in statements: self.run_query(statement) def get_stats(self) -> Dict[str, Any]: if not self.enabled: - return {"enabled": False} - - rows = self.run_query( - """ - CALL () { - MATCH (m:Meeting) - RETURN count(m) AS meetings - } - CALL () { - MATCH (ep:Episode) - RETURN count(ep) AS episodes - } - CALL () { - MATCH (e:Entity) - RETURN count(e) AS entities - } - CALL () { - MATCH (f:Fact) - RETURN count(f) AS facts - } + return {'enabled': False} + rows = self.run_query(''' + CALL () { MATCH (m:Meeting) RETURN count(m) AS meetings } + CALL () { MATCH (ep:Episode) RETURN count(ep) AS episodes } + CALL () { MATCH (e:Entity) RETURN count(e) AS entities } + CALL () { MATCH (f:Fact) RETURN count(f) AS facts } RETURN meetings, episodes, entities, facts - """ - ) + ''') if not rows: - return {"enabled": False, "meetings": 0, "episodes": 0, "entities": 0, "facts": 0} - return {"enabled": True, **rows[0]} + return {'enabled': False, 'meetings': 0, 'episodes': 0, 'entities': 0, 'facts': 0} + return {'enabled': True, **rows[0]} + + # ==================== Entity Dedup (from Graphiti) ==================== + + def find_similar_entities( + self, name: str, threshold: float = 0.6, limit: int = 15 + ) -> List[Dict[str, Any]]: + if not self.enabled or not name.strip(): + return [] + query_embedding = embedding_service.embed_text(name) + rows = self.run_query(''' + MATCH (e:Entity) + RETURN e.name AS name, + e.entity_type AS entity_type, + e.summary AS summary, + e.description AS description, + e.name_embedding AS name_embedding + ''') + scored = [] + for row in rows: + score = _cosine_similarity(query_embedding, row.get('name_embedding', [])) + if score >= threshold: + scored.append({ + 'candidate_id': len(scored), + 'name': row.get('name', ''), + 'entity_type': row.get('entity_type', ''), + 'summary': row.get('summary', '') or row.get('description', ''), + 'score': score, + }) + scored.sort(key=lambda r: r['score'], reverse=True) + return scored[:limit] + + def get_entities_map(self) -> Dict[str, Dict[str, Any]]: + rows = self.run_query(''' + MATCH (e:Entity) + RETURN e.name AS name, + e.entity_type AS entity_type, + e.summary AS summary, + e.description AS description + ''') + return {r['name']: r for r in rows if r.get('name')} + + # ==================== Edge Dedup / Resolution (from Graphiti) ==================== + + def get_facts_between(self, source_name: str, target_name: str) -> List[Dict[str, Any]]: + return self.run_query(''' + MATCH (s:Entity {name: $source_name})-[source_rel:FACT_SOURCE]->(f:Fact) + WHERE (f)-[:FACT_TARGET]->(:Entity {name: $target_name}) + RETURN f.fact_id AS fact_id, + f.fact AS fact, + f.predicate AS predicate, + f.description AS description, + f.qualifiers AS qualifiers, + f.confidence AS confidence, + f.valid_at AS valid_at, + f.invalid_at AS invalid_at, + f.expired_at AS expired_at, + f.meeting_id AS meeting_id + ORDER BY coalesce(f.valid_at, '') DESC + ''', source_name=source_name, target_name=target_name) + + def search_related_facts( + self, fact_text: str, group_id: str = '', limit: int = 10 + ) -> List[Dict[str, Any]]: + if not fact_text.strip(): + return [] + query_embedding = embedding_service.embed_text(fact_text) + rows = self.run_query(''' + MATCH (ep:Episode)-[:HAS_FACT]->(f:Fact) + OPTIONAL MATCH (s:Entity)-[:FACT_SOURCE]->(f) + OPTIONAL MATCH (f)-[:FACT_TARGET]->(o:Entity) + RETURN f.fact_id AS fact_id, + f.fact AS fact, + f.predicate AS predicate, + f.description AS description, + f.fact_embedding AS fact_embedding, + f.valid_at AS valid_at, + f.invalid_at AS invalid_at, + f.expired_at AS expired_at, + coalesce(s.name, '') AS source_name, + coalesce(o.name, '') AS target_name + ''') + scored = [] + for row in rows: + score = _cosine_similarity(query_embedding, row.get('fact_embedding', [])) + if score > 0.3: + scored.append({ + 'idx': len(scored), + 'fact_id': row.get('fact_id', ''), + 'fact': row.get('fact', ''), + 'predicate': row.get('predicate', ''), + 'description': row.get('description', ''), + 'source_name': row.get('source_name', ''), + 'target_name': row.get('target_name', ''), + 'valid_at': row.get('valid_at', ''), + 'invalid_at': row.get('invalid_at', ''), + 'expired_at': row.get('expired_at', ''), + 'score': score, + }) + scored.sort(key=lambda r: r['score'], reverse=True) + return scored[:limit] + + def mark_fact_expired(self, fact_id: str, expired_at: str | None = None): + if not expired_at: + expired_at = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ') + self.run_query(''' + MATCH (f:Fact {fact_id: $fact_id}) + SET f.expired_at = $expired_at, + f.invalid_at = $expired_at, + f.updated_at = datetime() + ''', fact_id=fact_id, expired_at=expired_at) + + # ==================== Core Write Operations ==================== def upsert_meeting_subgraph(self, meeting_data: dict) -> None: if not self.enabled: return - - meeting_id = meeting_data.get("_graph_meeting_id") or self.meeting_id(meeting_data) + meeting_id = meeting_data.get('_graph_meeting_id') or self.meeting_id(meeting_data) episode_text = self._build_episode_text(meeting_data) episode_embedding = embedding_service.embed_text(episode_text) self.initialize_schema() - self.run_query( - """ + self.run_query(''' MERGE (m:Meeting {meeting_id: $meeting_id}) SET m.title = $title, m.date = $date, @@ -225,39 +313,37 @@ class Neo4jGraphStore: ep.content_embedding = $content_embedding, ep.updated_at = datetime() MERGE (m)-[:HAS_EPISODE]->(ep) - """, + ''', meeting_id=meeting_id, - title=meeting_data.get("title", ""), - date=meeting_data.get("date", ""), - summary=meeting_data.get("summary", ""), - content_hash=meeting_data.get("_content_hash", ""), - raw_path=meeting_data.get("_original_text_path", ""), + title=meeting_data.get('title', ''), + date=meeting_data.get('date', ''), + summary=meeting_data.get('summary', ''), + content_hash=meeting_data.get('_content_hash', ''), + raw_path=meeting_data.get('_original_text_path', ''), content=episode_text, - participants=meeting_data.get("participants", []), + participants=meeting_data.get('participants', []), content_embedding=episode_embedding, ) - for entity in meeting_data.get("entities", []): + for entity in meeting_data.get('entities', []): self._upsert_entity(meeting_id, entity) - for participant in meeting_data.get("participants", []): + for participant in meeting_data.get('participants', []): self._upsert_entity( meeting_id, - {"name": participant, "entity_type": "participant", "description": ""}, + {'name': participant, 'entity_type': 'participant', 'description': ''}, ) - for relation in meeting_data.get("relations", []): - self._upsert_relation(meeting_id, relation, meeting_data.get("date", "")) + for relation in meeting_data.get('relations', []): + self._upsert_relation(meeting_id, relation, meeting_data.get('date', '')) def _upsert_entity(self, meeting_id: str, entity: dict) -> None: - name = entity.get("name", "").strip() + name = entity.get('name', '').strip() if not name: return - summary = self._entity_summary(entity) name_embedding = embedding_service.embed_text(summary or name) - self.run_query( - """ + self.run_query(''' MATCH (:Meeting {meeting_id: $meeting_id})-[:HAS_EPISODE]->(ep:Episode {episode_id: $meeting_id}) MERGE (e:Entity {name: $name}) SET e.entity_type = CASE @@ -278,47 +364,38 @@ class Neo4jGraphStore: END, e.updated_at = datetime() MERGE (ep)-[:MENTIONS]->(e) - """, + ''', meeting_id=meeting_id, name=name, - entity_type=entity.get("entity_type", ""), - description=entity.get("description", ""), + entity_type=entity.get('entity_type', ''), + description=entity.get('description', ''), summary=summary, name_embedding=name_embedding, ) def _upsert_relation(self, meeting_id: str, relation: dict, meeting_date: str) -> None: - subject = relation.get("subject", "").strip() - predicate = relation.get("predicate", "").strip() - obj = relation.get("object", "").strip() + subject = relation.get('subject', '').strip() + predicate = relation.get('predicate', '').strip() + obj = relation.get('object', '').strip() if not subject or not predicate or not obj: return self._upsert_entity( meeting_id, - { - "name": subject, - "entity_type": relation.get("subject_type", ""), - "description": "", - }, + {'name': subject, 'entity_type': relation.get('subject_type', ''), 'description': ''}, ) self._upsert_entity( meeting_id, - { - "name": obj, - "entity_type": relation.get("object_type", ""), - "description": "", - }, + {'name': obj, 'entity_type': relation.get('object_type', ''), 'description': ''}, ) fact_text = self._fact_text(relation) fact_id = hashlib.md5( - f"{meeting_id}|{subject}|{predicate}|{obj}".encode("utf-8") + f'{meeting_id}|{subject}|{predicate}|{obj}'.encode('utf-8') ).hexdigest() fact_embedding = embedding_service.embed_text(fact_text) - self.run_query( - """ + self.run_query(''' MATCH (:Meeting {meeting_id: $meeting_id})-[:HAS_EPISODE]->(ep:Episode {episode_id: $meeting_id}) MATCH (s:Entity {name: $subject}) MATCH (o:Entity {name: $object}) @@ -338,19 +415,19 @@ class Neo4jGraphStore: MERGE (ep)-[:HAS_FACT]->(f) MERGE (s)-[:FACT_SOURCE]->(f) MERGE (f)-[:FACT_TARGET]->(o) - """, + ''', meeting_id=meeting_id, subject=subject, predicate=predicate, object=obj, fact_id=fact_id, fact=fact_text, - description=relation.get("description", ""), - qualifiers=relation.get("qualifiers", []), - evidence=relation.get("evidence", ""), - confidence=relation.get("confidence", 0.0), - valid_at=relation.get("valid_at", ""), - invalid_at=relation.get("invalid_at", ""), + description=relation.get('description', ''), + qualifiers=relation.get('qualifiers', []), + evidence=relation.get('evidence', ''), + confidence=relation.get('confidence', 0.0), + valid_at=relation.get('valid_at', ''), + invalid_at=relation.get('invalid_at', ''), meeting_date=meeting_date, fact_embedding=fact_embedding, ) @@ -358,9 +435,7 @@ class Neo4jGraphStore: def remove_meeting_subgraph(self, meeting_id: str) -> None: if not self.enabled: return - - self.run_query( - """ + self.run_query(''' MATCH (m:Meeting {meeting_id: $meeting_id})-[:HAS_EPISODE]->(ep:Episode {episode_id: $meeting_id}) OPTIONAL MATCH (ep)-[mention:MENTIONS]->(entity:Entity) OPTIONAL MATCH (ep)-[has_fact:HAS_FACT]->(fact:Fact) @@ -378,66 +453,49 @@ class Neo4jGraphStore: WITH entity, count(m1) + count(m2) AS refs WHERE refs = 0 DELETE entity - """, - meeting_id=meeting_id, - ) + ''', meeting_id=meeting_id) - def get_meeting(self, title: str, date: str = "") -> Optional[Dict[str, Any]]: + # ==================== Retrieval ==================== + + def get_meeting(self, title: str, date: str = '') -> Optional[Dict[str, Any]]: if not self.enabled: return None - - rows = self.run_query( - """ + rows = self.run_query(''' MATCH (m:Meeting) - WHERE m.title = $title - AND ($date = '' OR m.date = $date) - RETURN m.meeting_id AS meeting_id, - m.title AS title, - m.date AS date, - m.summary AS summary, - m.content_hash AS content_hash + WHERE m.title = $title AND ($date = '' OR m.date = $date) + RETURN m.meeting_id AS meeting_id, m.title AS title, m.date AS date, + m.summary AS summary, m.content_hash AS content_hash LIMIT 1 - """, - title=title, - date=date, - ) + ''', title=title, date=date) return rows[0] if rows else None def find_similar_episode(self, text: str, threshold: float = 0.92) -> Optional[Dict[str, Any]]: if not self.enabled or not text.strip(): return None - query_embedding = embedding_service.embed_text(text) - rows = self.run_query( - """ + rows = self.run_query(''' MATCH (m:Meeting)-[:HAS_EPISODE]->(ep:Episode) - RETURN m.meeting_id AS meeting_id, - m.title AS title, - m.date AS date, - m.content_hash AS content_hash, - ep.content_embedding AS content_embedding - """ - ) - + RETURN m.meeting_id AS meeting_id, m.title AS title, m.date AS date, + m.content_hash AS content_hash, ep.content_embedding AS content_embedding + ''') best_match = None for row in rows: - score = _cosine_similarity(query_embedding, row.get("content_embedding", [])) - if score >= threshold and (best_match is None or score > best_match["score"]): + score = _cosine_similarity(query_embedding, row.get('content_embedding', [])) + if score >= threshold and (best_match is None or score > best_match['score']): best_match = { - "metadata": { - "meeting_id": row.get("meeting_id", ""), - "title": row.get("title", ""), - "date": row.get("date", ""), - "content_hash": row.get("content_hash", ""), + 'metadata': { + 'meeting_id': row.get('meeting_id', ''), + 'title': row.get('title', ''), + 'date': row.get('date', ''), + 'content_hash': row.get('content_hash', ''), }, - "score": score, + 'score': score, } return best_match def hybrid_search(self, question: str, limit: int = 5) -> List[Dict[str, Any]]: if not self.enabled or not question.strip(): return [] - query_embedding = embedding_service.embed_text(question) candidates = self._load_fact_candidates() candidates.extend(self._load_entity_candidates()) @@ -445,31 +503,26 @@ class Neo4jGraphStore: scored = [] for item in candidates: - combined_text = " ".join( - [ - str(item.get("title") or ""), - str(item.get("text") or ""), - str(item.get("meeting_title") or ""), - str(item.get("date") or ""), - ] - ) - semantic = _cosine_similarity(query_embedding, item.get("embedding", [])) + combined_text = ' '.join([ + str(item.get('title') or ''), + str(item.get('text') or ''), + str(item.get('meeting_title') or ''), + str(item.get('date') or ''), + ]) + semantic = _cosine_similarity(query_embedding, item.get('embedding', [])) lexical = _keyword_score(combined_text, question) - graph_bonus = 0.1 if item.get("kind") == "fact" else 0.05 + graph_bonus = 0.1 if item.get('kind') == 'fact' else 0.05 score = semantic * 0.7 + lexical * 0.2 + graph_bonus if score <= 0: continue + scored.append({ + **item, + 'score': round(score, 4), + 'semantic_score': round(semantic, 4), + 'keyword_score': round(lexical, 4), + }) - scored.append( - { - **item, - "score": round(score, 4), - "semantic_score": round(semantic, 4), - "keyword_score": round(lexical, 4), - } - ) - - scored.sort(key=lambda row: row["score"], reverse=True) + scored.sort(key=lambda row: row['score'], reverse=True) return scored[:limit] def search_facts(self, question: str, limit: int = 5) -> List[Dict[str, Any]]: @@ -478,43 +531,35 @@ class Neo4jGraphStore: def get_graph_kinds(self) -> List[Dict[str, Any]]: if not self.enabled: return [] - rows = self.run_query( - """ + return self.run_query(''' MATCH (n) WHERE n:Meeting OR n:Episode OR n:Entity OR n:Fact WITH [lbl IN labels(n) WHERE lbl IN ['Meeting','Episode','Entity','Fact']][0] AS kind - RETURN kind, count(*) AS count - ORDER BY count DESC - """ - ) - return rows + RETURN kind, count(*) AS count ORDER BY count DESC + ''') def get_entity_types(self) -> List[Dict[str, Any]]: if not self.enabled: return [] - return self.run_query( - """ + return self.run_query(''' MATCH (e:Entity) WHERE coalesce(e.entity_type, '') <> '' - RETURN e.entity_type AS entity_type, count(*) AS count - ORDER BY count DESC - """ - ) + RETURN e.entity_type AS entity_type, count(*) AS count ORDER BY count DESC + ''') def get_graph_snapshot( self, - query: str = "", + query: str = '', entity_types: Optional[List[str]] = None, kinds: Optional[List[str]] = None, limit_nodes: int = 80, limit_edges: int = 160, ) -> Dict[str, Any]: if not self.enabled: - return {"nodes": [], "edges": [], "stats": {"enabled": False}} + return {'nodes': [], 'edges': [], 'stats': {'enabled': False}} keyword_terms = _keyword_terms(query) if query else [] - raw_nodes = self.run_query( - """ + raw_nodes = self.run_query(''' MATCH (n) WHERE (n:Meeting OR n:Episode OR n:Entity OR n:Fact) AND ($kinds = [] OR [lbl IN labels(n) WHERE lbl IN ['Meeting','Episode','Entity','Fact']][0] IN $kinds) @@ -543,55 +588,54 @@ class Neo4jGraphStore: count(DISTINCT r) AS degree ORDER BY degree DESC, coalesce(n.title, n.name, n.fact) ASC LIMIT $limit_nodes - """, + ''', terms=keyword_terms, types=entity_types or [], kinds=kinds or [], limit_nodes=limit_nodes, ) if not raw_nodes: - return {"nodes": [], "edges": [], "stats": self.get_stats()} + return {'nodes': [], 'edges': [], 'stats': self.get_stats()} all_raw_ids = set() nodes = [] for row in raw_nodes: - kind = row.get("kind", "") - if kind == "Meeting": - raw_id = row.get("meeting_id", "") - label = row.get("title", "") or raw_id - elif kind == "Episode": - raw_id = row.get("episode_id", "") - label = row.get("title", "") or raw_id - elif kind == "Entity": - raw_id = row.get("entity_name", "") + kind = row.get('kind', '') + if kind == 'Meeting': + raw_id = row.get('meeting_id', '') + label = row.get('title', '') or raw_id + elif kind == 'Episode': + raw_id = row.get('episode_id', '') + label = row.get('title', '') or raw_id + elif kind == 'Entity': + raw_id = row.get('entity_name', '') label = raw_id - elif kind == "Fact": - raw_id = row.get("fact_id", "") - label = row.get("predicate", "") or row.get("fact", "") or raw_id + elif kind == 'Fact': + raw_id = row.get('fact_id', '') + label = row.get('predicate', '') or row.get('fact', '') or raw_id else: continue if not raw_id: continue - nid = f"{kind}:{raw_id}" + nid = f'{kind}:{raw_id}' all_raw_ids.add(raw_id) nodes.append({ - "id": nid, - "label": label, - "kind": kind, - "entity_type": row.get("entity_type", "") if kind == "Entity" else "", - "description": row.get("description", "") or row.get("summary", "") or "", - "date": row.get("date", "") or row.get("meeting_date", "") or "", - "degree": row.get("degree", 0), - "fact": row.get("fact", "") if kind == "Fact" else "", - "summary": row.get("summary", "") or "", + 'id': nid, + 'label': label, + 'kind': kind, + 'entity_type': row.get('entity_type', '') if kind == 'Entity' else '', + 'description': row.get('description', '') or row.get('summary', '') or '', + 'date': row.get('date', '') or row.get('meeting_date', '') or '', + 'degree': row.get('degree', 0), + 'fact': row.get('fact', '') if kind == 'Fact' else '', + 'summary': row.get('summary', '') or '', }) if not nodes: - return {"nodes": [], "edges": [], "stats": self.get_stats()} + return {'nodes': [], 'edges': [], 'stats': self.get_stats()} ids_list = list(all_raw_ids) - edges_raw = self.run_query( - """ + edges_raw = self.run_query(''' MATCH (s)-[r]->(t) WHERE type(r) IN ['HAS_EPISODE','MENTIONS','HAS_FACT','FACT_SOURCE','FACT_TARGET'] AND ( @@ -632,65 +676,59 @@ class Neo4jGraphStore: CASE WHEN s:Fact THEN coalesce(s.meeting_id, '') WHEN t:Fact THEN coalesce(t.meeting_id, '') ELSE '' END AS fact_meeting_id LIMIT $limit_edges - """, - ids=list(all_raw_ids), - limit_edges=limit_edges, - ) + ''', ids=list(all_raw_ids), limit_edges=limit_edges) degree_map: Dict[str, int] = {} for row in edges_raw: - src = row.get("source", "") - tgt = row.get("target", "") - degree_map[src] = degree_map.get(src, 0) + 1 - degree_map[tgt] = degree_map.get(tgt, 0) + 1 + src = row.get('source_raw', '') + tgt = row.get('target_raw', '') + sk = row.get('source_kind', '') + tk = row.get('target_kind', '') + if sk and src: + degree_map[f'{sk}:{src}'] = degree_map.get(f'{sk}:{src}', 0) + 1 + if tk and tgt: + degree_map[f'{tk}:{tgt}'] = degree_map.get(f'{tk}:{tgt}', 0) + 1 for node in nodes: - node["degree"] = degree_map.get(node["id"], node.get("degree", 0)) + node['degree'] = degree_map.get(node['id'], node.get('degree', 0)) edges = [] for idx, row in enumerate(edges_raw, start=1): - sk = row.get("source_kind", "") - tk = row.get("target_kind", "") + sk = row.get('source_kind', '') + tk = row.get('target_kind', '') edges.append({ - "id": f"edge_{idx}", - "source": f"{sk}:{row['source_raw']}" if sk and row.get("source_raw") else "", - "target": f"{tk}:{row['target_raw']}" if tk and row.get("target_raw") else "", - "predicate": row.get("predicate", ""), - "fact": row.get("fact_text", "") or row.get("fact_description", "") or "", - "description": row.get("fact_description", "") or "", - "confidence": row.get("fact_confidence", 0.0), - "date": row.get("fact_date", "") or "", - "meeting_id": row.get("fact_meeting_id", "") or "", + 'id': f'edge_{idx}', + 'source': f'{sk}:{row["source_raw"]}' if sk and row.get('source_raw') else '', + 'target': f'{tk}:{row["target_raw"]}' if tk and row.get('target_raw') else '', + 'predicate': row.get('predicate', ''), + 'fact': row.get('fact_text', '') or row.get('fact_description', '') or '', + 'description': row.get('fact_description', '') or '', + 'confidence': row.get('fact_confidence', 0.0), + 'date': row.get('fact_date', '') or '', + 'meeting_id': row.get('fact_meeting_id', '') or '', }) - return { - "nodes": nodes, - "edges": edges, - "stats": self.get_stats(), - "query": query, - } + return {'nodes': nodes, 'edges': edges, 'stats': self.get_stats(), 'query': query} def format_search_context(self, question: str, top_k: int = 5) -> str: results = self.hybrid_search(question, limit=top_k) if not results: - return "" - + return '' lines = [] for idx, row in enumerate(results, start=1): - date = row.get("date", "") - meeting_title = row.get("meeting_title", "") - title = row.get("title", row.get("kind", "item")) - suffix = f" ({date})" if date else "" - source = f" | 来源会议: {meeting_title}" if meeting_title else "" + date = row.get('date', '') + meeting_title = row.get('meeting_title', '') + title = row.get('title', row.get('kind', 'item')) + suffix = f' ({date})' if date else '' + source = f' | 来源会议: {meeting_title}' if meeting_title else '' lines.append( - f"[{idx}] {title}{suffix}{source}\n" - f"{row.get('text', '')}\n" - f"score={row.get('score', 0):.4f}, semantic={row.get('semantic_score', 0):.4f}, keyword={row.get('keyword_score', 0):.4f}" + f'[{idx}] {title}{suffix}{source}\n' + f'{row.get("text", "")}\n' + f'score={row.get("score", 0):.4f}, semantic={row.get("semantic_score", 0):.4f}, keyword={row.get("keyword_score", 0):.4f}' ) - return "\n\n".join(lines) + return '\n\n'.join(lines) def _load_fact_candidates(self) -> List[Dict[str, Any]]: - return self.run_query( - """ + return self.run_query(''' MATCH (ep:Episode)-[:HAS_FACT]->(f:Fact) OPTIONAL MATCH (s:Entity)-[:FACT_SOURCE]->(f) OPTIONAL MATCH (f)-[:FACT_TARGET]->(o:Entity) @@ -698,23 +736,21 @@ class Neo4jGraphStore: coalesce(s.name + ' -[' + coalesce(f.predicate, '') + ']-> ' + o.name, f.fact) AS title, coalesce( f.description + CASE - WHEN size(coalesce(f.qualifiers, [])) > 0 THEN ' | ' + reduce(acc = '', item IN f.qualifiers | + WHEN size(coalesce(f.qualifiers, [])) > 0 + THEN ' | ' + reduce(acc = '', item IN f.qualifiers | acc + CASE WHEN acc = '' THEN item ELSE '; ' + item END ) ELSE '' END, - f.fact, - '' + f.fact, '' ) AS text, ep.date AS date, ep.title AS meeting_title, f.fact_embedding AS embedding - """ - ) + ''') def _load_entity_candidates(self) -> List[Dict[str, Any]]: - return self.run_query( - """ + return self.run_query(''' MATCH (e:Entity) OPTIONAL MATCH (ep:Episode)-[:MENTIONS]->(e) RETURN 'entity' AS kind, @@ -723,12 +759,10 @@ class Neo4jGraphStore: max(ep.date) AS date, head(collect(DISTINCT ep.title)) AS meeting_title, e.name_embedding AS embedding - """ - ) + ''') def _load_episode_candidates(self) -> List[Dict[str, Any]]: - return self.run_query( - """ + return self.run_query(''' MATCH (m:Meeting)-[:HAS_EPISODE]->(ep:Episode) RETURN 'episode' AS kind, m.title AS title, @@ -736,47 +770,45 @@ class Neo4jGraphStore: ep.date AS date, m.title AS meeting_title, ep.content_embedding AS embedding - """ - ) + ''') @staticmethod def _entity_summary(entity: dict) -> str: - entity_type = entity.get("entity_type", "").strip() - name = entity.get("name", "").strip() - description = entity.get("description", "").strip() + entity_type = entity.get('entity_type', '').strip() + name = entity.get('name', '').strip() + description = entity.get('description', '').strip() parts = [part for part in [entity_type, name, description] if part] - return " | ".join(parts) + return ' | '.join(parts) @staticmethod def _fact_text(relation: dict) -> str: - subject = relation.get("subject", "").strip() - predicate = relation.get("predicate", "").strip() - obj = relation.get("object", "").strip() - description = relation.get("description", "").strip() - fact = relation.get("fact", "").strip() or f"{subject} {predicate} {obj}".strip() - qualifiers = relation.get("qualifiers", []) - qualifier_text = "; ".join(item for item in qualifiers if item) - if description and qualifier_text: - return f"{fact}. {description}. {qualifier_text}" + subject = relation.get('subject', '').strip() + predicate = relation.get('predicate', '').strip() + obj = relation.get('object', '').strip() + description = relation.get('description', '').strip() + fact = relation.get('fact', '').strip() or f'{subject} {predicate} {obj}'.strip() + qualifiers = relation.get('qualifiers', []) + qualifier_text = '; '.join(item for item in qualifiers if item) + parts = [fact] if description: - return f"{fact}. {description}" + parts.append(description) if qualifier_text: - return f"{fact}. {qualifier_text}" - return fact + parts.append(qualifier_text) + return '. '.join(parts) @staticmethod def _build_episode_text(meeting_data: dict) -> str: payload = { - "title": meeting_data.get("title", ""), - "date": meeting_data.get("date", ""), - "participants": meeting_data.get("participants", []), - "summary": meeting_data.get("summary", ""), - "entities": meeting_data.get("entities", []), - "relations": meeting_data.get("relations", []), - "action_items": meeting_data.get("action_items", []), - "metrics": meeting_data.get("metrics", []), - "decisions": meeting_data.get("decisions", []), - "original_text": meeting_data.get("_original_text", ""), + 'title': meeting_data.get('title', ''), + 'date': meeting_data.get('date', ''), + 'participants': meeting_data.get('participants', []), + 'summary': meeting_data.get('summary', ''), + 'entities': meeting_data.get('entities', []), + 'relations': meeting_data.get('relations', []), + 'action_items': meeting_data.get('action_items', []), + 'metrics': meeting_data.get('metrics', []), + 'decisions': meeting_data.get('decisions', []), + 'original_text': meeting_data.get('_original_text', ''), } return json.dumps(payload, ensure_ascii=False) diff --git a/meeting_memory/meeting_processor.py b/meeting_memory/meeting_processor.py index ddeceea..95a4371 100644 --- a/meeting_memory/meeting_processor.py +++ b/meeting_memory/meeting_processor.py @@ -1,9 +1,18 @@ import hashlib import logging -from typing import Callable, Optional +from typing import Callable, List, Optional from meeting_memory.config import config -from meeting_memory.extractor import MeetingExtraction, extract_meeting_info +from meeting_memory.extractor import ( + MeetingExtraction, + extract_entities_from_text, + extract_facts_from_text, + extract_meeting_info as monolithic_extract, +) +from meeting_memory.extractor import ( + resolve_entities_against_graph, + resolve_facts_against_graph, +) from meeting_memory.graph_store import graph_store from meeting_memory.meeting_state import MeetingStateStore from meeting_memory.raw_store import raw_meeting_store @@ -15,8 +24,9 @@ ProgressCallback = Callable[[int, int, str], None] class MeetingProcessor: + def process_meeting_file(self, filepath: str, force: bool = False) -> Optional[str]: - with open(filepath, "r", encoding="utf-8") as file_obj: + with open(filepath, 'r', encoding='utf-8') as file_obj: text = file_obj.read() return self.process_meeting_text(text, force=force) @@ -26,147 +36,312 @@ class MeetingProcessor: force: bool = False, interactive: bool = True, progress_callback: Optional[ProgressCallback] = None, + use_multistep_extraction: bool = True, ) -> Optional[str]: - def report(step: int, message: str) -> None: + def report(step: int, total: int, message: str) -> None: if progress_callback: - progress_callback(step, 7, message) - print(f"[{step}/7] {message}") + progress_callback(step, total, message) + print(f'[{step}/{total}] {message}') - report(1, "计算内容哈希") + if use_multistep_extraction: + return self._process_multistep(text, force, interactive, report) + else: + return self._process_monolithic(text, force, interactive, report) + + def _process_monolithic( + self, text: str, force: bool, interactive: bool, + report: Callable, + ) -> Optional[str]: + total_steps = 7 + report(1, total_steps, '计算内容哈希') content_hash = self._compute_content_hash(text) if not force and state_store.has_content_hash(content_hash): - logger.info("Duplicate content hash skipped: %s", content_hash[:12]) + logger.info('Duplicate content hash skipped: %s', content_hash[:12]) return None if not force: - report(2, "Neo4j 语义相似去重检索") + report(2, total_steps, 'Neo4j 语义相似去重检索') similar = graph_store.find_similar_episode(text, threshold=0.92) if similar: - meta = similar["metadata"] + meta = similar['metadata'] if not interactive: - logger.info( - "Skipped similar meeting in non-interactive mode: %s", - meta.get("title", ""), - ) + logger.info('Skipped similar meeting: %s', meta.get('title', '')) return None - - print( - f"\n发现相似会议:{meta.get('title', '')} ({meta.get('date', '')}) " - f"相似度 {similar['score']:.2%}" - ) + print(f'\n发现相似会议:{meta.get("title", "")} ({meta.get("date", "")}) 相似度 {similar["score"]:.2%}') while True: - choice = input("选择 [s]跳过 / [o]覆盖(默认 s):").strip().lower() or "s" - if choice == "s": - logger.info("Skipped similar meeting: %s", meta.get("title", "")) + choice = input('选择 [s]跳过 / [o]覆盖(默认 s):').strip().lower() or 's' + if choice == 's': + logger.info('Skipped similar meeting: %s', meta.get('title', '')) return None - if choice == "o": + if choice == 'o': force = True break - print("请输入 s 或 o。") + print('请输入 s 或 o。') else: - report(2, "跳过语义去重,按覆盖模式继续") + report(2, total_steps, '跳过语义去重,按覆盖模式继续') - report(3, "调用大模型抽取结构化信息") - meeting_data = self._extract(text) + report(3, total_steps, '调用大模型抽取结构化信息(单步模式)') + meeting_data = self._extract_monolithic(text) if not meeting_data: - logger.error("Failed to extract meeting information") + logger.error('Failed to extract meeting information') return None data_dict = meeting_data.model_dump() - data_dict["_content_hash"] = content_hash - data_dict["_graph_meeting_id"] = graph_store.meeting_id(data_dict) + return self._finish_pipeline(data_dict, content_hash, text, force, interactive, report, total_steps) - report(4, "检查标题和日期重复") + def _process_multistep( + self, text: str, force: bool, interactive: bool, + report: Callable, + ) -> Optional[str]: + total_steps = 10 + report(1, total_steps, '计算内容哈希') + content_hash = self._compute_content_hash(text) + + if not force and state_store.has_content_hash(content_hash): + logger.info('Duplicate content hash skipped: %s', content_hash[:12]) + return None + + if not force: + report(2, total_steps, 'Neo4j 语义相似去重检索') + similar = graph_store.find_similar_episode(text, threshold=0.92) + if similar: + meta = similar['metadata'] + if not interactive: + logger.info('Skipped similar meeting: %s', meta.get('title', '')) + return None + print(f'\n发现相似会议:{meta.get("title", "")} ({meta.get("date", "")}) 相似度 {similar["score"]:.2%}') + while True: + choice = input('选择 [s]跳过 / [o]覆盖(默认 s):').strip().lower() or 's' + if choice == 's': + logger.info('Skipped similar meeting: %s', meta.get('title', '')) + return None + if choice == 'o': + force = True + break + print('请输入 s 或 o。') + else: + report(2, total_steps, '跳过语义去重,按覆盖模式继续') + + # Step 3: 提取标题、日期、参与人等元信息 + report(3, total_steps, '抽取会议元信息(标题、日期、参与者等)') + meta_info = self._extract_monolithic(text) + if not meta_info: + logger.error('Failed to extract meeting metadata') + return None + data_dict = meta_info.model_dump() + data_dict['_content_hash'] = content_hash + data_dict['_graph_meeting_id'] = graph_store.meeting_id(data_dict) + data_dict['_original_text'] = text + + # Step 4: 抽取实体节点(LLM 调用 1) + report(4, total_steps, '第 1 步实体抽取:识别会议中提及的实体') + previous_episodes = self._get_previous_episodes_context(data_dict) + extracted_entities = extract_entities_from_text( + text, previous_episodes=previous_episodes, stream=True + ) + logger.info('Extracted %d entities from meeting', len(extracted_entities)) + if not extracted_entities: + logger.warning('No entities extracted, aborting') + return None + + # Step 5: 实体去重(与已有图谱对比 + LLM 裁决) + report(5, total_steps, '实体去重:与图谱中已有实体对比') + resolved_entities = self._dedup_entities(extracted_entities, text) + data_dict['entities'] = resolved_entities + logger.info('After dedup: %d entities remain', len(resolved_entities)) + + # Step 6: 抽取事实关系(LLM 调用 2) + report(6, total_steps, '事实抽取:提取实体间的结构化关系') + reference_time = data_dict.get('date', '') + extracted_facts = extract_facts_from_text( + text, resolved_entities, + reference_time=reference_time, + previous_episodes=previous_episodes, + stream=True, + ) + logger.info('Extracted %d facts from meeting', len(extracted_facts)) + + # Step 7: 事实去重与矛盾检测 + report(7, total_steps, '事实解析:去重与矛盾检测') + resolved_facts = self._dedup_facts(extracted_facts, data_dict) + data_dict['relations'] = resolved_facts + logger.info('After dedup: %d facts remain', len(resolved_facts)) + + # Step 8: 检查标题和日期重复 + report(8, total_steps, '检查标题和日期重复') should_skip = self._handle_duplicate(data_dict, force=force, interactive=interactive) if should_skip: return None - meeting_title = data_dict.get("title", "") - meeting_date = data_dict.get("date", "") + meeting_title = data_dict.get('title', '') + meeting_date = data_dict.get('date', '') - report(5, "归档原始会议文本") + # Step 9: 归档 + 合并行动项/指标 + report(9, total_steps, '归档和状态合并') raw_path = raw_meeting_store.save(text, title=meeting_title, date=meeting_date) - data_dict["_original_text"] = text - data_dict["_original_text_path"] = raw_path + data_dict['_original_text_path'] = raw_path - meeting_filename = f"{graph_store.meeting_id(data_dict)}.md" - - report(6, "合并行动项和指标状态") - data_dict["action_items"] = state_store.merge_action_items( - data_dict.get("action_items", []), - meeting_title, - meeting_date, - meeting_filename, + meeting_filename = f'{graph_store.meeting_id(data_dict)}.md' + data_dict['action_items'] = state_store.merge_action_items( + data_dict.get('action_items', []), + meeting_title, meeting_date, meeting_filename, ) - data_dict["metrics"] = state_store.merge_metrics( - data_dict.get("metrics", []), - meeting_title, - meeting_date, - meeting_filename, + data_dict['metrics'] = state_store.merge_metrics( + data_dict.get('metrics', []), + meeting_title, meeting_date, meeting_filename, ) - state_store.add_content_hash(content_hash, meeting_title, meeting_date, meeting_filename) state_store.save() - report(7, "写入 Neo4j 图谱和检索数据") + # Step 10: 写入 Neo4j + report(10, total_steps, '写入 Neo4j 图谱') graph_store.upsert_meeting_subgraph(data_dict) - logger.info("Meeting processed: %s", meeting_title) + logger.info('Meeting processed (multi-step): %s', meeting_title) + return raw_path + + def _get_previous_episodes_context(self, data_dict: dict) -> list: + meeting_title = data_dict.get('title', '') + meeting_date = data_dict.get('date', '') + series_info = state_store.get_series_info(meeting_title) + if not series_info: + return [] + processed = series_info.get('processed_titles', []) + if not processed: + return [] + rows = graph_store.run_query(''' + MATCH (m:Meeting) + WHERE m.title IN $titles + OPTIONAL MATCH (m)-[:HAS_EPISODE]->(ep:Episode) + RETURN m.title AS title, m.date AS date, ep.summary AS summary, ep.content AS content + ORDER BY m.date DESC + LIMIT 3 + ''', titles=processed[-3:]) + return [{'content': r.get('content', r.get('summary', '')), 'timestamp': r.get('date', '')} for r in rows] + + def _dedup_entities(self, extracted: list, text: str) -> list: + try: + existing = graph_store.get_entities_map() + if not existing: + return extracted + existing_list = [ + { + 'candidate_id': i, + 'name': v['name'], + 'entity_type': v.get('entity_type', ''), + 'summary': v.get('summary', '') or v.get('description', ''), + } + for i, v in enumerate(existing.values()) + ] + return resolve_entities_against_graph(extracted, existing_list, episode_content=text) + except Exception as exc: + logger.warning('Entity dedup failed, keeping all extracted: %s', exc) + return extracted + + def _dedup_facts(self, facts: list, data_dict: dict) -> list: + resolved = [] + for fact in facts: + try: + source = fact.get('source_entity_name', '') + target = fact.get('target_entity_name', '') + existing = graph_store.get_facts_between(source, target) + if not existing: + resolved.append(fact) + continue + result = resolve_facts_against_graph(fact, existing, []) + if isinstance(result, dict) and result.get('is_duplicate'): + logger.debug('Skipped duplicate fact: %s', fact.get('fact', '')) + continue + resolved.append(fact) + except Exception as exc: + logger.warning('Fact dedup failed, keeping: %s', exc) + resolved.append(fact) + return resolved + + def _finish_pipeline( + self, data_dict: dict, content_hash: str, text: str, + force: bool, interactive: bool, report: Callable, total_steps: int, + ) -> Optional[str]: + data_dict['_content_hash'] = content_hash + data_dict['_graph_meeting_id'] = graph_store.meeting_id(data_dict) + + report(4, total_steps, '检查标题和日期重复') + should_skip = self._handle_duplicate(data_dict, force=force, interactive=interactive) + if should_skip: + return None + + meeting_title = data_dict.get('title', '') + meeting_date = data_dict.get('date', '') + + report(5, total_steps, '归档原始会议文本') + raw_path = raw_meeting_store.save(text, title=meeting_title, date=meeting_date) + data_dict['_original_text'] = text + data_dict['_original_text_path'] = raw_path + + meeting_filename = f'{graph_store.meeting_id(data_dict)}.md' + + report(6, total_steps, '合并行动项和指标状态') + data_dict['action_items'] = state_store.merge_action_items( + data_dict.get('action_items', []), meeting_title, meeting_date, meeting_filename, + ) + data_dict['metrics'] = state_store.merge_metrics( + data_dict.get('metrics', []), meeting_title, meeting_date, meeting_filename, + ) + state_store.add_content_hash(content_hash, meeting_title, meeting_date, meeting_filename) + state_store.save() + + report(7, total_steps, '写入 Neo4j 图谱和检索数据') + graph_store.upsert_meeting_subgraph(data_dict) + + logger.info('Meeting processed: %s', meeting_title) return raw_path def _handle_duplicate(self, data_dict: dict, force: bool, interactive: bool = True) -> bool: - title = data_dict.get("title", "") - date = data_dict.get("date", "") + title = data_dict.get('title', '') + date = data_dict.get('date', '') existing = graph_store.get_meeting(title, date) - if not existing: return False - if force: - logger.info("Duplicate meeting found; overwriting in force mode: %s", title) + logger.info('Duplicate meeting found; overwriting in force mode: %s', title) self._remove_old(data_dict, existing) return False - if not interactive: - logger.info("Skipped duplicate meeting in non-interactive mode: %s", title) + logger.info('Skipped duplicate meeting in non-interactive mode: %s', title) return True - - print(f"\n发现重复会议:{title} ({date})") + print(f'\n发现重复会议:{title} ({date})') while True: - choice = input("选择 [s]跳过 / [o]覆盖(默认 s):").strip().lower() or "s" - if choice == "s": - logger.info("Skipped duplicate meeting: %s", title) + choice = input('选择 [s]跳过 / [o]覆盖(默认 s):').strip().lower() or 's' + if choice == 's': + logger.info('Skipped duplicate meeting: %s', title) return True - if choice == "o": + if choice == 'o': self._remove_old(data_dict, existing) return False - print("请输入 s 或 o。") + print('请输入 s 或 o。') def _remove_old(self, data_dict: dict, existing: Optional[dict] = None) -> None: meeting_id = graph_store.meeting_id(data_dict) graph_store.remove_meeting_subgraph(meeting_id) - - new_hash = data_dict.get("_content_hash", "") + new_hash = data_dict.get('_content_hash', '') if new_hash: state_store.remove_content_hash(new_hash) - if existing: - old_hash = existing.get("content_hash", "") + old_hash = existing.get('content_hash', '') if old_hash and old_hash != new_hash: state_store.remove_content_hash(old_hash) - - logger.info("Removed old meeting artifacts: %s", data_dict.get("title", "")) + logger.info('Removed old meeting artifacts: %s', data_dict.get('title', '')) def _compute_content_hash(self, text: str) -> str: - normalized = text.strip().replace("\r\n", "\n") - return hashlib.sha256(normalized.encode("utf-8")).hexdigest() + normalized = text.strip().replace('\r\n', '\n') + return hashlib.sha256(normalized.encode('utf-8')).hexdigest() - def _extract(self, text: str) -> Optional[MeetingExtraction]: + def _extract_monolithic(self, text: str) -> Optional[MeetingExtraction]: try: - return extract_meeting_info(text, stream=True) + return monolithic_extract(text, stream=True) except Exception as exc: - logger.error("LLM extraction failed: %s", exc) + logger.error('LLM extraction failed: %s', exc) return None def query(self, question: str, top_k: int = 3) -> str: @@ -174,10 +349,10 @@ class MeetingProcessor: def stats(self) -> dict: return { - "graph": graph_store.get_stats(), - "state": state_store.get_stats(), - "raw_dir": config.storage.raw_dir, - "state_path": config.state_path, + 'graph': graph_store.get_stats(), + 'state': state_store.get_stats(), + 'raw_dir': config.storage.raw_dir, + 'state_path': config.state_path, } diff --git a/meeting_memory/prompts/__init__.py b/meeting_memory/prompts/__init__.py new file mode 100644 index 0000000..617c89c --- /dev/null +++ b/meeting_memory/prompts/__init__.py @@ -0,0 +1,5 @@ +from .extract_nodes import extract_entities +from .extract_edges import extract_facts +from .dedupe_nodes import resolve_entities +from .dedupe_edges import resolve_facts +from .summarize_nodes import summarize_entity diff --git a/meeting_memory/prompts/dedupe_edges.py b/meeting_memory/prompts/dedupe_edges.py new file mode 100644 index 0000000..8ff439b --- /dev/null +++ b/meeting_memory/prompts/dedupe_edges.py @@ -0,0 +1,49 @@ +from typing import Any + + +def resolve_facts(context: dict[str, Any]) -> list[dict]: + existing_facts = context.get('existing_facts', []) + new_fact = context.get('new_fact', '') + invalidation_candidates = context.get('invalidation_candidates', []) + + existing_text = '\n'.join( + f' [idx={i}] {f.get("fact", "")}' for i, f in enumerate(existing_facts) + ) + + invalidation_text = '\n'.join( + f' [idx={i + len(existing_facts)}] {f.get("fact", "")}' + for i, f in enumerate(invalidation_candidates) + ) + + user_prompt = f""" +<已有事实> +{existing_text} + + +<事实失效候选> +{invalidation_text} + + +<新事实> +{new_fact} + + +注意:idx 编号是连续的——已有事实从 0 开始,失效候选紧随其后。 + +任务: +1. **重复检测**:如果<新事实>与<已有事实>中的某条描述的是完全相同的客观事实,返回该 idx。 +2. **矛盾检测**:如果<新事实>与<已有事实>或<失效候选>中的某条相互矛盾(如状态已更新、数值已变更),返回该 idx。 + +返回格式: +{{"duplicate_facts": [idx列表], "contradicted_facts": [idx列表]}} +如果没有重复或矛盾,返回空列表。 + +示例: +- 新事实:"张三负责宽带运维项目" vs 已有:"张三负责宽带运维" → 重复(相同事实) +- 新事实:"宽带用户数当前值 8500" vs 已有:"宽带用户数目标值 10000" → 不重复,不矛盾(数值维度不同) +- 新事实:"宽带用户数当前值 9000" vs 已有:"宽带用户数 8000" → 矛盾(同一指标数值更新) +""" + return [ + {'role': 'system', 'content': '你是事实去重和矛盾检测助手。判断新事实与已有事实的关系。'}, + {'role': 'user', 'content': user_prompt}, + ] diff --git a/meeting_memory/prompts/dedupe_nodes.py b/meeting_memory/prompts/dedupe_nodes.py new file mode 100644 index 0000000..a2b7152 --- /dev/null +++ b/meeting_memory/prompts/dedupe_nodes.py @@ -0,0 +1,49 @@ +from typing import Any + + +def resolve_entities(context: dict[str, Any]) -> list[dict]: + extracted = context.get('extracted_entities', []) + existing = context.get('existing_entities', []) + episode_content = context.get('episode_content', '') + + extracted_text = '\n'.join( + f' [{i}] {e.get("name", "")}({e.get("entity_type", "未知")}):{e.get("description", "")}' + for i, e in enumerate(extracted) + ) + + existing_text = '\n'.join( + f' [candidate_id={c.get("candidate_id", i)}] {c.get("name", "")}({c.get("entity_type", "未知")}):{c.get("summary", "")[:100]}' + for i, c in enumerate(existing) + ) + + user_prompt = f""" +<当前会议内容> +{episode_content} + + +<新抽取的实体> +{extracted_text} + + +<图谱中已有的实体> +{existing_text} + + +任务:判断<新抽取的实体>中的每一个是否与<图谱中已有的实体>中的某个是同一个真实世界对象。 + +判断标准: +- **是重复**:两个名称指向同一个真实世界的人、组织、地点、项目、指标等。 +- **不是重复**:名称相似但指向不同实体(如两个同名但不同的人、同名的不同项目)。 + +对每个新抽取的实体,返回: + - id: 对应新抽取实体列表中的序号 + - name: 实体的最佳名称(优先使用已有实体中的更完整名称) + - duplicate_candidate_id: 匹配到的已有实体的 candidate_id,如果无匹配则填 -1 + +返回格式 JSON 数组:[{{"id": 0, "name": "张三", "duplicate_candidate_id": -1}}, ...] +必须为新抽取的每个实体返回一条记录。id 从 0 开始连续编号。 +""" + return [ + {'role': 'system', 'content': '你是实体去重助手。判断两个实体是否指向同一个真实世界对象。'}, + {'role': 'user', 'content': user_prompt}, + ] diff --git a/meeting_memory/prompts/extract_edges.py b/meeting_memory/prompts/extract_edges.py new file mode 100644 index 0000000..ae79bd6 --- /dev/null +++ b/meeting_memory/prompts/extract_edges.py @@ -0,0 +1,59 @@ +from typing import Any + + +def extract_facts(context: dict[str, Any]) -> list[dict]: + previous = context.get('previous_episodes', []) + current = context.get('episode_content', '') + entities = context.get('entities', []) + reference_time = context.get('reference_time', '') + + previous_section = '' + if previous: + import json + previous_section = f'\n<历史上下文>\n{json.dumps(previous, ensure_ascii=False)}\n\n' + + entities_text = '\n'.join( + f' [{i}] {e.get("name", "")}({e.get("entity_type", "未知")})' for i, e in enumerate(entities) + ) + + user_prompt = f""" +{previous_section} +<当前会议内容> +{current} + + +<已抽取实体> +{entities_text} + + +<参考时间> +{reference_time} + + +抽取规则: +1. 从<当前会议内容>中抽取上述<已抽取实体>之间的**事实关系**。 +2. 每条关系必须涉及两个**不同**的实体。 +3. 返回 JSON 数组,格式: + [{{ + "source_entity_name": "源实体名称(必须来自上方的实体列表)", + "target_entity_name": "目标实体名称(必须来自上方的实体列表)", + "relation_type": "关系类型,如 负责、汇报、隶属于、参与、目标值、截止于、影响、依赖于", + "fact": "一句自然语言的事实描述,保留原文中所有具体细节(数值、时间、地点等)", + "valid_at": "该事实开始成立的时间(ISO 8601格式,如 2025-04-30T00:00:00Z),不明确则留空", + "invalid_at": "该事实不再成立的时间,不明确则留空", + "evidence": "原文中的关键证据短句", + "qualifiers": ["限定条件列表,如数值、范围、状态、截止时间等"], + "confidence": 置信度0到1之间 + }}] + +4. relation_type 避免使用"关联""涉及"等空泛词,优先使用具体谓词: + 负责、汇报、目标值、当前值、低于、高于、要求、督导、推进、支撑、依赖、计划、完成、截止于、参与、隶属于、分管、协调、审批 + +5. fact 必须是一句完整的自然语言事实,保留所有具体信息(人名、数值、产品名、地点等)。 + +6. 如果根据上下文可以判断事实的开始/结束时间,填入 valid_at / invalid_at。 +""" + return [ + {'role': 'system', 'content': '你是一个专业的事实关系抽取专家。从会议记录中抽取实体间的结构化事实关系。'}, + {'role': 'user', 'content': user_prompt}, + ] diff --git a/meeting_memory/prompts/extract_nodes.py b/meeting_memory/prompts/extract_nodes.py new file mode 100644 index 0000000..c231508 --- /dev/null +++ b/meeting_memory/prompts/extract_nodes.py @@ -0,0 +1,53 @@ +from typing import Any + + +SYSTEM_PROMPT = ( + '你是会议纪要实体抽取专家。' + '从会议记录中抽取明确的实体节点,包括人物、组织、地点、项目、指标等。' + '不要抽取抽象概念、情感、时间日期或泛泛的名词。' +) + + +def extract_entities(context: dict[str, Any]) -> list[dict]: + previous = context.get('previous_episodes', []) + current = context.get('episode_content', '') + entity_types = context.get('entity_types', []) + + entity_types_section = '' + if entity_types: + entity_types_section = '\n'.join( + f' - {t["type"]}: {t["description"]}' for t in entity_types + ) + else: + entity_types_section = ' - 未限定类型,请根据上下文自行判断' + + previous_section = '' + if previous: + import json + previous_section = f'\n<历史上下文>\n{json.dumps(previous, ensure_ascii=False)}\n\n' + + user_prompt = f""" +{previous_section} +<当前会议内容> +{current} + + +<实体类型> +{entity_types_section} + + +抽取规则: +1. 只抽取当前会议内容中**明确提及**的实体。 +2. 每个实体必须是有唯一标识的具体事物——人名、组织名、地名、项目名、指标名称等。 +3. 不要抽取:代词(他、她、它、这、那)、抽象概念(增长、改善、风险)、时间日期。 +4. 如果同一实体在不同来源中以不同名称出现(如简称/全称),保留最完整的形式。 +5. 必须返回 JSON 数组,格式:[{{"name": "实体名称", "entity_type": "类型", "description": "描述", "evidence": "原文证据"}}] +6. description 写一段对该实体的简要描述(20字以内)。 +7. evidence 从原文中摘录提及该实体的关键短句。 + +注意:实体类型建议使用 Person(人物)、Organization(组织)、Location(地点)、Project(项目)、Metric(指标)、System(系统)、Document(文档)等。 +""" + return [ + {'role': 'system', 'content': SYSTEM_PROMPT}, + {'role': 'user', 'content': user_prompt}, + ] diff --git a/meeting_memory/prompts/summarize_nodes.py b/meeting_memory/prompts/summarize_nodes.py new file mode 100644 index 0000000..51f03c1 --- /dev/null +++ b/meeting_memory/prompts/summarize_nodes.py @@ -0,0 +1,41 @@ +from typing import Any + + +def summarize_entity(context: dict[str, Any]) -> list[dict]: + entity_name = context.get('entity_name', '') + existing_summary = context.get('existing_summary', '') + episodes = context.get('episodes', []) + previous = context.get('previous_episodes', []) + + existing_section = '' + if existing_summary: + existing_section = f'\n<已有摘要>\n{existing_summary}\n\n' + + previous_section = '' + if previous: + import json + previous_section = f'\n<历史内容>\n{json.dumps(previous, ensure_ascii=False)}\n\n' + + episodes_text = '\n---\n'.join(episodes) if isinstance(episodes, list) else episodes + + user_prompt = f""" +{previous_section} +<当前内容> +{episodes_text} + +{existing_section} +为实体 **{entity_name}** 生成一段信息密集的摘要。 + +规则: +1. 只使用<当前内容>和<已有摘要>中的事实。不要推测。 +2. 保留所有实质性的人名、角色、地点、日期、数值。 +3. 用第三人称直接陈述事实。 +4. 不要使用"提及了""讨论了""指出"等元语言动词。直接陈述事实。 +5. 如果会议对已有信息做了更新,采用更新的说法。 +6. 摘要不超过 500 字。 +7. 返回 JSON:{{"summary": "摘要内容"}} +""" + return [ + {'role': 'system', 'content': '你是实体摘要助手。根据会议内容为实体生成信息密集的摘要。'}, + {'role': 'user', 'content': user_prompt}, + ]