测试版本1

main
Bifang 2026-06-12 10:57:32 +08:00
parent 0ec08185f9
commit b7d4cc8782
9 changed files with 1191 additions and 525 deletions

View File

@ -2,12 +2,17 @@ import json
import logging
import re
import sys
from typing import List, Optional
from typing import Any, List, Optional
from openai import OpenAI
from pydantic import BaseModel, Field
from meeting_memory.config import config
from meeting_memory.prompts import extract_entities as prompt_extract_entities
from meeting_memory.prompts import extract_facts as prompt_extract_facts
from meeting_memory.prompts import resolve_entities as prompt_dedupe_nodes
from meeting_memory.prompts import resolve_facts as prompt_dedupe_edges
from meeting_memory.prompts import summarize_entity as prompt_summarize
logger = logging.getLogger(__name__)
@ -20,49 +25,49 @@ client = OpenAI(
class Entity(BaseModel):
name: str
entity_type: str
description: str = ""
description: str = ''
class Relation(BaseModel):
subject: str
subject_type: str
subject_type: str = ''
predicate: str
object: str
object_type: str
description: str = ""
fact: str = ""
object_type: str = ''
description: str = ''
fact: str = ''
qualifiers: List[str] = Field(default_factory=list)
evidence: str = ""
evidence: str = ''
confidence: float = 0.0
valid_at: str = ""
invalid_at: str = ""
valid_at: str = ''
invalid_at: str = ''
class ActionItem(BaseModel):
task: str
assignee: str = ""
deadline: str = ""
status: str = "待办"
priority: str = ""
assignee: str = ''
deadline: str = ''
status: str = '待办'
priority: str = ''
class Decision(BaseModel):
content: str
proposer: str = ""
status: str = "已决"
proposer: str = ''
status: str = '已决'
class MeetingMetric(BaseModel):
metric_name: str
value: str
target: str = ""
owner: str = ""
trend: str = ""
target: str = ''
owner: str = ''
trend: str = ''
class MeetingExtraction(BaseModel):
title: str
date: str = ""
date: str = ''
participants: List[str] = Field(default_factory=list)
agenda: List[str] = Field(default_factory=list)
entities: List[Entity] = Field(default_factory=list)
@ -70,15 +75,281 @@ class MeetingExtraction(BaseModel):
action_items: List[ActionItem] = Field(default_factory=list)
decisions: List[Decision] = Field(default_factory=list)
metrics: List[MeetingMetric] = Field(default_factory=list)
summary: str = ""
summary: str = ''
def _call_llm(
messages: list[dict],
response_model: type | None = None,
stream: bool = False,
max_tokens: int | None = None,
) -> Any:
kwargs = {
'model': config.llm.model,
'messages': messages,
'max_tokens': max_tokens or config.llm.max_tokens,
'temperature': config.llm.temperature,
}
if response_model is not None:
kwargs['response_format'] = {'type': 'json_object'}
if stream:
kwargs['stream'] = True
if not stream:
response = client.chat.completions.create(**kwargs)
content = response.choices[0].message.content
if content is None:
raise ValueError('LLM returned empty response')
return content
kwargs['stream'] = True
response = client.chat.completions.create(**kwargs)
chunks: List[str] = []
print('\n[LLM] 开始流式输出:')
for event in response:
if not event.choices:
continue
delta = event.choices[0].delta.content
if not delta:
continue
chunks.append(delta)
sys.stdout.write(delta)
sys.stdout.flush()
print('\n[LLM] 输出结束')
return ''.join(chunks)
def _try_parse_json(content: str) -> dict | list:
try:
return json.loads(content)
except json.JSONDecodeError:
logger.warning('JSON parsing failed; trying to repair extracted block')
match = re.search(r'\{.*\}|\[.*\]', content, re.DOTALL)
if match:
try:
return json.loads(match.group())
except json.JSONDecodeError as exc:
logger.error('Repaired JSON still failed to parse: %s', exc)
raise
def _normalize_string(name: str) -> str:
return re.sub(r'[\s]+', ' ', name.strip().lower())
def _format_episodes_for_context(episodes: list[dict] | None) -> str:
if not episodes:
return ''
return '\n'.join(
f'[Episode {i}] {ep.get("content", "")}'
for i, ep in enumerate(episodes)
)
# ===== Step 1: 实体节点抽取 =====
def extract_entities_from_text(
text: str,
previous_episodes: list[dict] | None = None,
entity_types: list[dict] | None = None,
stream: bool = False,
) -> list[dict]:
context = {
'episode_content': text,
'previous_episodes': previous_episodes or [],
'entity_types': entity_types or [],
}
messages = prompt_extract_entities(context)
content = _call_llm(messages, stream=stream)
try:
data = _try_parse_json(content)
except Exception as exc:
logger.error('Failed to parse entity extraction result: %s', exc)
return []
if isinstance(data, dict):
data = data.get('entities', data.get('extracted_entities', []))
if not isinstance(data, list):
return []
result = []
for item in data:
if isinstance(item, dict) and item.get('name', '').strip():
result.append({
'name': item['name'].strip(),
'entity_type': item.get('entity_type', 'Entity'),
'description': item.get('description', ''),
'evidence': item.get('evidence', ''),
})
return result
# ===== Step 2: 实体去重 =====
def resolve_entities_against_graph(
extracted: list[dict],
existing: list[dict],
episode_content: str = '',
) -> list[dict]:
if not existing:
return extracted
context = {
'extracted_entities': extracted,
'existing_entities': existing,
'episode_content': episode_content,
}
messages = prompt_dedupe_nodes(context)
content = _call_llm(messages)
try:
data = _try_parse_json(content)
except Exception as exc:
logger.warning('LLM dedup failed, keeping all extracted: %s', exc)
return extracted
if isinstance(data, dict):
data = data.get('entity_resolutions', data.get('resolutions', []))
extracted_by_id = {i: e for i, e in enumerate(extracted)}
existing_by_id = {c.get('candidate_id'): c for c in existing}
for resolution in (data if isinstance(data, list) else []):
if not isinstance(resolution, dict):
continue
rid = resolution.get('id')
dup_id = resolution.get('duplicate_candidate_id', -1)
if rid is None or rid not in extracted_by_id:
continue
if dup_id >= 0 and dup_id in existing_by_id:
extracted_by_id[rid]['_resolved_to'] = existing_by_id[dup_id]
extracted_by_id[rid]['name'] = resolution.get('name', extracted_by_id[rid]['name'])
return [e for e in extracted_by_id.values() if '_resolved_to' not in e]
# ===== Step 3: 事实关系抽取 =====
def extract_facts_from_text(
text: str,
entities: list[dict],
reference_time: str = '',
previous_episodes: list[dict] | None = None,
stream: bool = False,
) -> list[dict]:
if len(entities) < 2:
return []
context = {
'episode_content': text,
'entities': entities,
'reference_time': reference_time,
'previous_episodes': previous_episodes or [],
}
messages = prompt_extract_facts(context)
content = _call_llm(messages, stream=stream)
try:
data = _try_parse_json(content)
except Exception as exc:
logger.error('Failed to parse fact extraction result: %s', exc)
return []
if isinstance(data, dict):
data = data.get('edges', data.get('facts', data.get('relations', [])))
if not isinstance(data, list):
return []
entity_names = {_normalize_string(e.get('name', '')) for e in entities}
result = []
for item in data:
if not isinstance(item, dict):
continue
src = _normalize_string(item.get('source_entity_name', ''))
tgt = _normalize_string(item.get('target_entity_name', ''))
if src not in entity_names or tgt not in entity_names:
continue
if src == tgt:
continue
result.append({
'source_entity_name': item['source_entity_name'],
'target_entity_name': item['target_entity_name'],
'relation_type': item.get('relation_type', '关联'),
'fact': item.get('fact', ''),
'valid_at': item.get('valid_at', ''),
'invalid_at': item.get('invalid_at', ''),
'evidence': item.get('evidence', ''),
'qualifiers': item.get('qualifiers', []),
'confidence': item.get('confidence', 0.0),
})
return result
# ===== Step 4: 事实去重/矛盾检测 =====
def resolve_facts_against_graph(
new_fact: dict,
existing_facts: list[dict],
invalidation_candidates: list[dict],
) -> dict:
if not existing_facts:
return {'is_duplicate': False, 'is_contradicted': False, 'resolved': new_fact}
context = {
'new_fact': new_fact.get('fact', ''),
'existing_facts': existing_facts,
'invalidation_candidates': invalidation_candidates,
}
messages = prompt_dedupe_edges(context)
content = _call_llm(messages)
try:
data = _try_parse_json(content)
except Exception as exc:
logger.warning('Fact dedup failed, treating as new: %s', exc)
return {'is_duplicate': False, 'is_contradicted': False, 'resolved': new_fact}
if not isinstance(data, dict):
return {'is_duplicate': False, 'is_contradicted': False, 'resolved': new_fact}
return {
'is_duplicate': len(data.get('duplicate_facts', [])) > 0,
'is_contradicted': len(data.get('contradicted_facts', [])) > 0,
'resolved': new_fact,
'duplicate_facts': data.get('duplicate_facts', []),
'contradicted_facts': data.get('contradicted_facts', []),
}
# ===== Step 5: 实体摘要 =====
def extract_entity_summary(
entity_name: str,
episodes: list[str],
existing_summary: str = '',
previous_episodes: list[dict] | None = None,
) -> str:
context = {
'entity_name': entity_name,
'episodes': episodes,
'existing_summary': existing_summary,
'previous_episodes': previous_episodes or [],
}
messages = prompt_summarize(context)
content = _call_llm(messages, max_tokens=1024)
try:
data = _try_parse_json(content)
except Exception:
logger.warning('Failed to parse summary, using empty')
return ''
if isinstance(data, dict):
return data.get('summary', '')
return ''
# ===== 统一入口(兼容原有接口) =====
EXTRACTION_SYSTEM_PROMPT = """
你是一个专业的会议知识抽取助手你的任务是从中文会议记录中抽取结构化事实尤其要抽出更细粒度更有语义深度的关系
输出要求
1. 只输出一个 JSON 对象不要输出解释文字
2. 关系抽取不要停留在部门汇报了工作这种浅层描述要尽可能向下细化到
2. 关系抽取不要停留在"部门汇报了工作"这种浅层描述要尽可能向下细化到
- 责任归属
- 目标值 / 当前值 / 趋势
- 约束条件
@ -99,51 +370,9 @@ EXTRACTION_SYSTEM_PROMPT = """
"""
def _call_llm(system: str, user: str, stream: bool = False) -> str:
if not stream:
response = client.chat.completions.create(
model=config.llm.model,
messages=[
{"role": "system", "content": system},
{"role": "user", "content": user},
],
max_tokens=config.llm.max_tokens,
temperature=config.llm.temperature,
)
content = response.choices[0].message.content
if content is None:
raise ValueError("LLM returned empty response")
return content
response = client.chat.completions.create(
model=config.llm.model,
messages=[
{"role": "system", "content": system},
{"role": "user", "content": user},
],
max_tokens=config.llm.max_tokens,
temperature=config.llm.temperature,
stream=True,
)
chunks: List[str] = []
print("\n[LLM] 开始抽取,流式输出中:")
for event in response:
if not event.choices:
continue
delta = event.choices[0].delta.content
if not delta:
continue
chunks.append(delta)
sys.stdout.write(delta)
sys.stdout.flush()
print("\n[LLM] 抽取输出结束")
return "".join(chunks)
def extract_meeting_info(text: str, stream: bool = False) -> MeetingExtraction:
user_prompt = f"""
请从下面会议记录中提取结构化信息并重点做深层关系抽取
请从下面会议记录中提取结构化信息并重点做"深层关系抽取"
输出 JSON 字段
- title
@ -170,9 +399,9 @@ def extract_meeting_info(text: str, stream: bool = False) -> MeetingExtraction:
- summary
关系抽取规则
1. 不要只抽汇报了工作这种会议动作要尽量继续下钻出具体事实
2. 如果一句话里同时包含主体 + 指标 + 当前值 + 目标值 + 负责人 + 趋势应拆成多条关系或在 qualifiers 中保留这些细节
3. 对于要求部署负责依赖影响约束目标风险类信息优先保留
1. 不要只抽"汇报了工作"这种会议动作要尽量继续下钻出具体事实
2. 如果一句话里同时包含"主体 + 指标 + 当前值 + 目标值 + 负责人 + 趋势"应拆成多条关系或在 qualifiers 中保留这些细节
3. 对于"要求、部署、负责、依赖、影响、约束、目标、风险"类信息优先保留
4. fact 必须是一句完整自然可检索的事实描述
5. qualifiers 用于补充数值范围状态条件截止时间优先级等信息
6. evidence 用原文中的关键词短句不要太长
@ -181,54 +410,42 @@ def extract_meeting_info(text: str, stream: bool = False) -> MeetingExtraction:
会议记录如下
{text}
"""
content = _call_llm(EXTRACTION_SYSTEM_PROMPT, user_prompt, stream=stream)
content = _call_llm([
{'role': 'system', 'content': EXTRACTION_SYSTEM_PROMPT},
{'role': 'user', 'content': user_prompt},
], stream=stream)
data = _try_parse_json(content)
data = _normalize_meeting_data(data)
return MeetingExtraction(**data)
def _try_parse_json(content: str) -> dict:
try:
return json.loads(content)
except json.JSONDecodeError:
logger.warning("JSON parsing failed; trying to repair extracted block")
match = re.search(r"\{.*\}", content, re.DOTALL)
if match:
try:
return json.loads(match.group())
except json.JSONDecodeError as exc:
logger.error("Repaired JSON still failed to parse: %s", exc)
raise
def _normalize_meeting_data(data: dict) -> dict:
if not isinstance(data, dict):
return {}
return {
"title": _as_str(data.get("title")),
"date": _as_str(data.get("date")),
"participants": _as_str_list(data.get("participants")),
"agenda": _as_str_list(data.get("agenda")),
"entities": _normalize_entities(data.get("entities")),
"relations": _normalize_relations(data.get("relations")),
"action_items": _normalize_action_items(data.get("action_items")),
"decisions": _normalize_decisions(data.get("decisions")),
"metrics": _normalize_metrics(data.get("metrics")),
"summary": _as_str(data.get("summary")),
'title': _as_str(data.get('title')),
'date': _as_str(data.get('date')),
'participants': _as_str_list(data.get('participants')),
'agenda': _as_str_list(data.get('agenda')),
'entities': _normalize_entities(data.get('entities')),
'relations': _normalize_relations(data.get('relations')),
'action_items': _normalize_action_items(data.get('action_items')),
'decisions': _normalize_decisions(data.get('decisions')),
'metrics': _normalize_metrics(data.get('metrics')),
'summary': _as_str(data.get('summary')),
}
def _as_str(value) -> str:
if value is None:
return ""
return ''
if isinstance(value, str):
return value
return str(value)
def _as_float(value) -> float:
if value is None or value == "":
if value is None or value == '':
return 0.0
try:
numeric = float(value)
@ -244,7 +461,7 @@ def _as_str_list(value) -> List[str]:
key_text = _as_str(key)
value_text = _as_str(item)
if key_text and value_text:
items.append(f"{key_text}: {value_text}")
items.append(f'{key_text}: {value_text}')
elif key_text:
items.append(key_text)
elif value_text:
@ -262,50 +479,42 @@ def _normalize_entities(value) -> List[dict]:
for entity in value:
if not isinstance(entity, dict):
continue
items.append(
{
"name": _as_str(entity.get("name")),
"entity_type": _as_str(entity.get("entity_type")),
"description": _as_str(entity.get("description")),
}
)
items.append({
'name': _as_str(entity.get('name')),
'entity_type': _as_str(entity.get('entity_type')),
'description': _as_str(entity.get('description')),
})
return items
def _normalize_relations(value) -> List[dict]:
if not isinstance(value, list):
return []
items = []
for relation in value:
if not isinstance(relation, dict):
continue
subject = _as_str(relation.get("subject"))
predicate = _as_str(relation.get("predicate"))
obj = _as_str(relation.get("object"))
description = _as_str(relation.get("description"))
fact = _as_str(relation.get("fact"))
subject = _as_str(relation.get('subject'))
predicate = _as_str(relation.get('predicate'))
obj = _as_str(relation.get('object'))
description = _as_str(relation.get('description'))
fact = _as_str(relation.get('fact'))
if not fact and subject and predicate and obj:
fact = f"{subject} {predicate} {obj}"
items.append(
{
"subject": subject,
"subject_type": _as_str(relation.get("subject_type")),
"predicate": predicate,
"object": obj,
"object_type": _as_str(relation.get("object_type")),
"description": description,
"fact": fact,
"qualifiers": _as_str_list(relation.get("qualifiers")),
"evidence": _as_str(relation.get("evidence")),
"confidence": _as_float(relation.get("confidence")),
"valid_at": _as_str(relation.get("valid_at")),
"invalid_at": _as_str(relation.get("invalid_at")),
}
)
fact = f'{subject} {predicate} {obj}'
items.append({
'subject': subject,
'subject_type': _as_str(relation.get('subject_type')),
'predicate': predicate,
'object': obj,
'object_type': _as_str(relation.get('object_type')),
'description': description,
'fact': fact,
'qualifiers': _as_str_list(relation.get('qualifiers')),
'evidence': _as_str(relation.get('evidence')),
'confidence': _as_float(relation.get('confidence')),
'valid_at': _as_str(relation.get('valid_at')),
'invalid_at': _as_str(relation.get('invalid_at')),
})
return items
@ -316,15 +525,13 @@ def _normalize_action_items(value) -> List[dict]:
for action in value:
if not isinstance(action, dict):
continue
items.append(
{
"task": _as_str(action.get("task")),
"assignee": _as_str(action.get("assignee")),
"deadline": _as_str(action.get("deadline")),
"status": _as_str(action.get("status")) or "待办",
"priority": _as_str(action.get("priority")) or "",
}
)
items.append({
'task': _as_str(action.get('task')),
'assignee': _as_str(action.get('assignee')),
'deadline': _as_str(action.get('deadline')),
'status': _as_str(action.get('status')) or '待办',
'priority': _as_str(action.get('priority')) or '',
})
return items
@ -335,13 +542,11 @@ def _normalize_decisions(value) -> List[dict]:
for decision in value:
if not isinstance(decision, dict):
continue
items.append(
{
"content": _as_str(decision.get("content")),
"proposer": _as_str(decision.get("proposer")),
"status": _as_str(decision.get("status")) or "已决",
}
)
items.append({
'content': _as_str(decision.get('content')),
'proposer': _as_str(decision.get('proposer')),
'status': _as_str(decision.get('status')) or '已决',
})
return items
@ -352,13 +557,11 @@ def _normalize_metrics(value) -> List[dict]:
for metric in value:
if not isinstance(metric, dict):
continue
items.append(
{
"metric_name": _as_str(metric.get("metric_name")),
"value": _as_str(metric.get("value")),
"target": _as_str(metric.get("target")),
"owner": _as_str(metric.get("owner")),
"trend": _as_str(metric.get("trend")),
}
)
items.append({
'metric_name': _as_str(metric.get('metric_name')),
'value': _as_str(metric.get('value')),
'target': _as_str(metric.get('target')),
'owner': _as_str(metric.get('owner')),
'trend': _as_str(metric.get('trend')),
})
return items

View File

@ -3,6 +3,7 @@ import json
import logging
import re
import time
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
from meeting_memory.config import config
@ -14,7 +15,6 @@ logger = logging.getLogger(__name__)
def _cosine_similarity(left: List[float], right: List[float]) -> float:
if not left or not right or len(left) != len(right):
return 0.0
dot = sum(a * b for a, b in zip(left, right))
left_norm = sum(a * a for a in left) ** 0.5
right_norm = sum(b * b for b in right) ** 0.5
@ -24,7 +24,7 @@ def _cosine_similarity(left: List[float], right: List[float]) -> float:
def _keyword_score(text: str, question: str) -> float:
source = (text or "").lower()
source = (text or '').lower()
terms = _keyword_terms(question)
if not source or not terms:
return 0.0
@ -33,21 +33,19 @@ def _keyword_score(text: str, question: str) -> float:
def _keyword_terms(text: str) -> List[str]:
normalized = (text or "").lower()
raw_terms = re.findall(r"[a-z0-9]+|[\u4e00-\u9fff]{2,}", normalized)
stopwords = {"是什么", "多少", "分别", "以及", "还有", "当前值", "目标值"}
normalized = (text or '').lower()
raw_terms = re.findall(r'[a-z0-9]+|[\u4e00-\u9fff]{2,}', normalized)
stopwords = {'是什么', '多少', '分别', '以及', '还有', '当前值', '目标值'}
terms: List[str] = []
for raw in raw_terms:
if raw in stopwords:
continue
if raw not in terms:
terms.append(raw)
if re.fullmatch(r"[\u4e00-\u9fff]{4,}", raw):
if re.fullmatch(r'[\u4e00-\u9fff]{4,}', raw):
for size in (2, 3, 4):
for idx in range(0, len(raw) - size + 1):
piece = raw[idx : idx + size]
piece = raw[idx: idx + size]
if piece not in stopwords and piece not in terms:
terms.append(piece)
return terms
@ -64,22 +62,20 @@ class Neo4jGraphStore:
def _connect(self):
if not config.neo4j.enabled:
logger.info("Neo4j graph store disabled")
logger.info('Neo4j graph store disabled')
return
try:
from neo4j import GraphDatabase
except ImportError:
logger.warning("neo4j package is not installed")
logger.warning('neo4j package is not installed')
return
if not config.neo4j.password:
logger.warning("Neo4j is enabled but NEO4J_PASSWORD is empty")
logger.warning('Neo4j is enabled but NEO4J_PASSWORD is empty')
return
tried_uris = [self._uri]
if self._uri.startswith("neo4j://"):
tried_uris.append("bolt://" + self._uri[len("neo4j://") :])
if self._uri.startswith('neo4j://'):
tried_uris.append('bolt://' + self._uri[len('neo4j://'):])
for uri in tried_uris:
driver = None
@ -94,16 +90,15 @@ class Neo4jGraphStore:
self._enabled = True
self._last_failure_at = 0.0
if uri != config.neo4j.uri:
logger.warning("Neo4j routing URI unavailable; fell back to %s", uri)
logger.warning('Neo4j routing URI unavailable; fell back to %s', uri)
return
except Exception as exc:
logger.warning("Neo4j connection failed for %s: %s", uri, exc)
logger.warning('Neo4j connection failed for %s: %s', uri, exc)
try:
driver.close()
except Exception:
pass
self._mark_unavailable("Neo4j is currently unreachable")
self._mark_unavailable('Neo4j is currently unreachable')
@property
def enabled(self) -> bool:
@ -114,9 +109,9 @@ class Neo4jGraphStore:
def _should_retry_connect(self) -> bool:
return (time.time() - self._last_failure_at) >= self._retry_cooldown_seconds
def _mark_unavailable(self, reason: str = "") -> None:
def _mark_unavailable(self, reason: str = '') -> None:
if reason:
logger.warning("Neo4j temporarily disabled: %s", reason)
logger.warning('Neo4j temporarily disabled: %s', reason)
self._enabled = False
self._last_failure_at = time.time()
if self._driver is not None:
@ -128,10 +123,10 @@ class Neo4jGraphStore:
@staticmethod
def meeting_id(meeting_data: dict) -> str:
title = meeting_data.get("title", "")
date = meeting_data.get("date", "")
raw = f"{date}_{title}"
return f"meeting_{hashlib.md5(raw.encode('utf-8')).hexdigest()[:12]}"
title = meeting_data.get('title', '')
date = meeting_data.get('date', '')
raw = f'{date}_{title}'
return f'meeting_{hashlib.md5(raw.encode("utf-8")).hexdigest()[:12]}'
def close(self):
if self._driver is not None:
@ -140,73 +135,166 @@ class Neo4jGraphStore:
def run_query(self, query: str, **params) -> List[Dict[str, Any]]:
if not self.enabled:
return []
try:
with self._driver.session(database=config.neo4j.database) as session:
result = session.run(query, **params)
return [record.data() for record in result]
except Exception as exc:
logger.warning("Neo4j query failed: %s", exc)
logger.warning('Neo4j query failed: %s', exc)
self._mark_unavailable(str(exc))
return []
def initialize_schema(self):
if not self.enabled:
return
statements = [
"CREATE CONSTRAINT meeting_id IF NOT EXISTS FOR (m:Meeting) REQUIRE m.meeting_id IS UNIQUE",
"CREATE CONSTRAINT episode_id IF NOT EXISTS FOR (e:Episode) REQUIRE e.episode_id IS UNIQUE",
"CREATE CONSTRAINT entity_name IF NOT EXISTS FOR (e:Entity) REQUIRE e.name IS UNIQUE",
"CREATE CONSTRAINT fact_id IF NOT EXISTS FOR (f:Fact) REQUIRE f.fact_id IS UNIQUE",
"CREATE INDEX meeting_title IF NOT EXISTS FOR (m:Meeting) ON (m.title)",
"CREATE INDEX episode_title IF NOT EXISTS FOR (e:Episode) ON (e.title)",
"CREATE INDEX entity_type IF NOT EXISTS FOR (e:Entity) ON (e.entity_type)",
"CREATE INDEX fact_predicate IF NOT EXISTS FOR (f:Fact) ON (f.predicate)",
'CREATE CONSTRAINT meeting_id IF NOT EXISTS FOR (m:Meeting) REQUIRE m.meeting_id IS UNIQUE',
'CREATE CONSTRAINT episode_id IF NOT EXISTS FOR (e:Episode) REQUIRE e.episode_id IS UNIQUE',
'CREATE CONSTRAINT entity_name IF NOT EXISTS FOR (e:Entity) REQUIRE e.name IS UNIQUE',
'CREATE CONSTRAINT fact_id IF NOT EXISTS FOR (f:Fact) REQUIRE f.fact_id IS UNIQUE',
'CREATE INDEX meeting_title IF NOT EXISTS FOR (m:Meeting) ON (m.title)',
'CREATE INDEX episode_title IF NOT EXISTS FOR (e:Episode) ON (e.title)',
'CREATE INDEX entity_type IF NOT EXISTS FOR (e:Entity) ON (e.entity_type)',
'CREATE INDEX fact_predicate IF NOT EXISTS FOR (f:Fact) ON (f.predicate)',
]
for statement in statements:
self.run_query(statement)
def get_stats(self) -> Dict[str, Any]:
if not self.enabled:
return {"enabled": False}
rows = self.run_query(
"""
CALL () {
MATCH (m:Meeting)
RETURN count(m) AS meetings
}
CALL () {
MATCH (ep:Episode)
RETURN count(ep) AS episodes
}
CALL () {
MATCH (e:Entity)
RETURN count(e) AS entities
}
CALL () {
MATCH (f:Fact)
RETURN count(f) AS facts
}
return {'enabled': False}
rows = self.run_query('''
CALL () { MATCH (m:Meeting) RETURN count(m) AS meetings }
CALL () { MATCH (ep:Episode) RETURN count(ep) AS episodes }
CALL () { MATCH (e:Entity) RETURN count(e) AS entities }
CALL () { MATCH (f:Fact) RETURN count(f) AS facts }
RETURN meetings, episodes, entities, facts
"""
)
''')
if not rows:
return {"enabled": False, "meetings": 0, "episodes": 0, "entities": 0, "facts": 0}
return {"enabled": True, **rows[0]}
return {'enabled': False, 'meetings': 0, 'episodes': 0, 'entities': 0, 'facts': 0}
return {'enabled': True, **rows[0]}
# ==================== Entity Dedup (from Graphiti) ====================
def find_similar_entities(
self, name: str, threshold: float = 0.6, limit: int = 15
) -> List[Dict[str, Any]]:
if not self.enabled or not name.strip():
return []
query_embedding = embedding_service.embed_text(name)
rows = self.run_query('''
MATCH (e:Entity)
RETURN e.name AS name,
e.entity_type AS entity_type,
e.summary AS summary,
e.description AS description,
e.name_embedding AS name_embedding
''')
scored = []
for row in rows:
score = _cosine_similarity(query_embedding, row.get('name_embedding', []))
if score >= threshold:
scored.append({
'candidate_id': len(scored),
'name': row.get('name', ''),
'entity_type': row.get('entity_type', ''),
'summary': row.get('summary', '') or row.get('description', ''),
'score': score,
})
scored.sort(key=lambda r: r['score'], reverse=True)
return scored[:limit]
def get_entities_map(self) -> Dict[str, Dict[str, Any]]:
rows = self.run_query('''
MATCH (e:Entity)
RETURN e.name AS name,
e.entity_type AS entity_type,
e.summary AS summary,
e.description AS description
''')
return {r['name']: r for r in rows if r.get('name')}
# ==================== Edge Dedup / Resolution (from Graphiti) ====================
def get_facts_between(self, source_name: str, target_name: str) -> List[Dict[str, Any]]:
return self.run_query('''
MATCH (s:Entity {name: $source_name})-[source_rel:FACT_SOURCE]->(f:Fact)
WHERE (f)-[:FACT_TARGET]->(:Entity {name: $target_name})
RETURN f.fact_id AS fact_id,
f.fact AS fact,
f.predicate AS predicate,
f.description AS description,
f.qualifiers AS qualifiers,
f.confidence AS confidence,
f.valid_at AS valid_at,
f.invalid_at AS invalid_at,
f.expired_at AS expired_at,
f.meeting_id AS meeting_id
ORDER BY coalesce(f.valid_at, '') DESC
''', source_name=source_name, target_name=target_name)
def search_related_facts(
self, fact_text: str, group_id: str = '', limit: int = 10
) -> List[Dict[str, Any]]:
if not fact_text.strip():
return []
query_embedding = embedding_service.embed_text(fact_text)
rows = self.run_query('''
MATCH (ep:Episode)-[:HAS_FACT]->(f:Fact)
OPTIONAL MATCH (s:Entity)-[:FACT_SOURCE]->(f)
OPTIONAL MATCH (f)-[:FACT_TARGET]->(o:Entity)
RETURN f.fact_id AS fact_id,
f.fact AS fact,
f.predicate AS predicate,
f.description AS description,
f.fact_embedding AS fact_embedding,
f.valid_at AS valid_at,
f.invalid_at AS invalid_at,
f.expired_at AS expired_at,
coalesce(s.name, '') AS source_name,
coalesce(o.name, '') AS target_name
''')
scored = []
for row in rows:
score = _cosine_similarity(query_embedding, row.get('fact_embedding', []))
if score > 0.3:
scored.append({
'idx': len(scored),
'fact_id': row.get('fact_id', ''),
'fact': row.get('fact', ''),
'predicate': row.get('predicate', ''),
'description': row.get('description', ''),
'source_name': row.get('source_name', ''),
'target_name': row.get('target_name', ''),
'valid_at': row.get('valid_at', ''),
'invalid_at': row.get('invalid_at', ''),
'expired_at': row.get('expired_at', ''),
'score': score,
})
scored.sort(key=lambda r: r['score'], reverse=True)
return scored[:limit]
def mark_fact_expired(self, fact_id: str, expired_at: str | None = None):
if not expired_at:
expired_at = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
self.run_query('''
MATCH (f:Fact {fact_id: $fact_id})
SET f.expired_at = $expired_at,
f.invalid_at = $expired_at,
f.updated_at = datetime()
''', fact_id=fact_id, expired_at=expired_at)
# ==================== Core Write Operations ====================
def upsert_meeting_subgraph(self, meeting_data: dict) -> None:
if not self.enabled:
return
meeting_id = meeting_data.get("_graph_meeting_id") or self.meeting_id(meeting_data)
meeting_id = meeting_data.get('_graph_meeting_id') or self.meeting_id(meeting_data)
episode_text = self._build_episode_text(meeting_data)
episode_embedding = embedding_service.embed_text(episode_text)
self.initialize_schema()
self.run_query(
"""
self.run_query('''
MERGE (m:Meeting {meeting_id: $meeting_id})
SET m.title = $title,
m.date = $date,
@ -225,39 +313,37 @@ class Neo4jGraphStore:
ep.content_embedding = $content_embedding,
ep.updated_at = datetime()
MERGE (m)-[:HAS_EPISODE]->(ep)
""",
''',
meeting_id=meeting_id,
title=meeting_data.get("title", ""),
date=meeting_data.get("date", ""),
summary=meeting_data.get("summary", ""),
content_hash=meeting_data.get("_content_hash", ""),
raw_path=meeting_data.get("_original_text_path", ""),
title=meeting_data.get('title', ''),
date=meeting_data.get('date', ''),
summary=meeting_data.get('summary', ''),
content_hash=meeting_data.get('_content_hash', ''),
raw_path=meeting_data.get('_original_text_path', ''),
content=episode_text,
participants=meeting_data.get("participants", []),
participants=meeting_data.get('participants', []),
content_embedding=episode_embedding,
)
for entity in meeting_data.get("entities", []):
for entity in meeting_data.get('entities', []):
self._upsert_entity(meeting_id, entity)
for participant in meeting_data.get("participants", []):
for participant in meeting_data.get('participants', []):
self._upsert_entity(
meeting_id,
{"name": participant, "entity_type": "participant", "description": ""},
{'name': participant, 'entity_type': 'participant', 'description': ''},
)
for relation in meeting_data.get("relations", []):
self._upsert_relation(meeting_id, relation, meeting_data.get("date", ""))
for relation in meeting_data.get('relations', []):
self._upsert_relation(meeting_id, relation, meeting_data.get('date', ''))
def _upsert_entity(self, meeting_id: str, entity: dict) -> None:
name = entity.get("name", "").strip()
name = entity.get('name', '').strip()
if not name:
return
summary = self._entity_summary(entity)
name_embedding = embedding_service.embed_text(summary or name)
self.run_query(
"""
self.run_query('''
MATCH (:Meeting {meeting_id: $meeting_id})-[:HAS_EPISODE]->(ep:Episode {episode_id: $meeting_id})
MERGE (e:Entity {name: $name})
SET e.entity_type = CASE
@ -278,47 +364,38 @@ class Neo4jGraphStore:
END,
e.updated_at = datetime()
MERGE (ep)-[:MENTIONS]->(e)
""",
''',
meeting_id=meeting_id,
name=name,
entity_type=entity.get("entity_type", ""),
description=entity.get("description", ""),
entity_type=entity.get('entity_type', ''),
description=entity.get('description', ''),
summary=summary,
name_embedding=name_embedding,
)
def _upsert_relation(self, meeting_id: str, relation: dict, meeting_date: str) -> None:
subject = relation.get("subject", "").strip()
predicate = relation.get("predicate", "").strip()
obj = relation.get("object", "").strip()
subject = relation.get('subject', '').strip()
predicate = relation.get('predicate', '').strip()
obj = relation.get('object', '').strip()
if not subject or not predicate or not obj:
return
self._upsert_entity(
meeting_id,
{
"name": subject,
"entity_type": relation.get("subject_type", ""),
"description": "",
},
{'name': subject, 'entity_type': relation.get('subject_type', ''), 'description': ''},
)
self._upsert_entity(
meeting_id,
{
"name": obj,
"entity_type": relation.get("object_type", ""),
"description": "",
},
{'name': obj, 'entity_type': relation.get('object_type', ''), 'description': ''},
)
fact_text = self._fact_text(relation)
fact_id = hashlib.md5(
f"{meeting_id}|{subject}|{predicate}|{obj}".encode("utf-8")
f'{meeting_id}|{subject}|{predicate}|{obj}'.encode('utf-8')
).hexdigest()
fact_embedding = embedding_service.embed_text(fact_text)
self.run_query(
"""
self.run_query('''
MATCH (:Meeting {meeting_id: $meeting_id})-[:HAS_EPISODE]->(ep:Episode {episode_id: $meeting_id})
MATCH (s:Entity {name: $subject})
MATCH (o:Entity {name: $object})
@ -338,19 +415,19 @@ class Neo4jGraphStore:
MERGE (ep)-[:HAS_FACT]->(f)
MERGE (s)-[:FACT_SOURCE]->(f)
MERGE (f)-[:FACT_TARGET]->(o)
""",
''',
meeting_id=meeting_id,
subject=subject,
predicate=predicate,
object=obj,
fact_id=fact_id,
fact=fact_text,
description=relation.get("description", ""),
qualifiers=relation.get("qualifiers", []),
evidence=relation.get("evidence", ""),
confidence=relation.get("confidence", 0.0),
valid_at=relation.get("valid_at", ""),
invalid_at=relation.get("invalid_at", ""),
description=relation.get('description', ''),
qualifiers=relation.get('qualifiers', []),
evidence=relation.get('evidence', ''),
confidence=relation.get('confidence', 0.0),
valid_at=relation.get('valid_at', ''),
invalid_at=relation.get('invalid_at', ''),
meeting_date=meeting_date,
fact_embedding=fact_embedding,
)
@ -358,9 +435,7 @@ class Neo4jGraphStore:
def remove_meeting_subgraph(self, meeting_id: str) -> None:
if not self.enabled:
return
self.run_query(
"""
self.run_query('''
MATCH (m:Meeting {meeting_id: $meeting_id})-[:HAS_EPISODE]->(ep:Episode {episode_id: $meeting_id})
OPTIONAL MATCH (ep)-[mention:MENTIONS]->(entity:Entity)
OPTIONAL MATCH (ep)-[has_fact:HAS_FACT]->(fact:Fact)
@ -378,66 +453,49 @@ class Neo4jGraphStore:
WITH entity, count(m1) + count(m2) AS refs
WHERE refs = 0
DELETE entity
""",
meeting_id=meeting_id,
)
''', meeting_id=meeting_id)
def get_meeting(self, title: str, date: str = "") -> Optional[Dict[str, Any]]:
# ==================== Retrieval ====================
def get_meeting(self, title: str, date: str = '') -> Optional[Dict[str, Any]]:
if not self.enabled:
return None
rows = self.run_query(
"""
rows = self.run_query('''
MATCH (m:Meeting)
WHERE m.title = $title
AND ($date = '' OR m.date = $date)
RETURN m.meeting_id AS meeting_id,
m.title AS title,
m.date AS date,
m.summary AS summary,
m.content_hash AS content_hash
WHERE m.title = $title AND ($date = '' OR m.date = $date)
RETURN m.meeting_id AS meeting_id, m.title AS title, m.date AS date,
m.summary AS summary, m.content_hash AS content_hash
LIMIT 1
""",
title=title,
date=date,
)
''', title=title, date=date)
return rows[0] if rows else None
def find_similar_episode(self, text: str, threshold: float = 0.92) -> Optional[Dict[str, Any]]:
if not self.enabled or not text.strip():
return None
query_embedding = embedding_service.embed_text(text)
rows = self.run_query(
"""
rows = self.run_query('''
MATCH (m:Meeting)-[:HAS_EPISODE]->(ep:Episode)
RETURN m.meeting_id AS meeting_id,
m.title AS title,
m.date AS date,
m.content_hash AS content_hash,
ep.content_embedding AS content_embedding
"""
)
RETURN m.meeting_id AS meeting_id, m.title AS title, m.date AS date,
m.content_hash AS content_hash, ep.content_embedding AS content_embedding
''')
best_match = None
for row in rows:
score = _cosine_similarity(query_embedding, row.get("content_embedding", []))
if score >= threshold and (best_match is None or score > best_match["score"]):
score = _cosine_similarity(query_embedding, row.get('content_embedding', []))
if score >= threshold and (best_match is None or score > best_match['score']):
best_match = {
"metadata": {
"meeting_id": row.get("meeting_id", ""),
"title": row.get("title", ""),
"date": row.get("date", ""),
"content_hash": row.get("content_hash", ""),
'metadata': {
'meeting_id': row.get('meeting_id', ''),
'title': row.get('title', ''),
'date': row.get('date', ''),
'content_hash': row.get('content_hash', ''),
},
"score": score,
'score': score,
}
return best_match
def hybrid_search(self, question: str, limit: int = 5) -> List[Dict[str, Any]]:
if not self.enabled or not question.strip():
return []
query_embedding = embedding_service.embed_text(question)
candidates = self._load_fact_candidates()
candidates.extend(self._load_entity_candidates())
@ -445,31 +503,26 @@ class Neo4jGraphStore:
scored = []
for item in candidates:
combined_text = " ".join(
[
str(item.get("title") or ""),
str(item.get("text") or ""),
str(item.get("meeting_title") or ""),
str(item.get("date") or ""),
]
)
semantic = _cosine_similarity(query_embedding, item.get("embedding", []))
combined_text = ' '.join([
str(item.get('title') or ''),
str(item.get('text') or ''),
str(item.get('meeting_title') or ''),
str(item.get('date') or ''),
])
semantic = _cosine_similarity(query_embedding, item.get('embedding', []))
lexical = _keyword_score(combined_text, question)
graph_bonus = 0.1 if item.get("kind") == "fact" else 0.05
graph_bonus = 0.1 if item.get('kind') == 'fact' else 0.05
score = semantic * 0.7 + lexical * 0.2 + graph_bonus
if score <= 0:
continue
scored.append({
**item,
'score': round(score, 4),
'semantic_score': round(semantic, 4),
'keyword_score': round(lexical, 4),
})
scored.append(
{
**item,
"score": round(score, 4),
"semantic_score": round(semantic, 4),
"keyword_score": round(lexical, 4),
}
)
scored.sort(key=lambda row: row["score"], reverse=True)
scored.sort(key=lambda row: row['score'], reverse=True)
return scored[:limit]
def search_facts(self, question: str, limit: int = 5) -> List[Dict[str, Any]]:
@ -478,43 +531,35 @@ class Neo4jGraphStore:
def get_graph_kinds(self) -> List[Dict[str, Any]]:
if not self.enabled:
return []
rows = self.run_query(
"""
return self.run_query('''
MATCH (n)
WHERE n:Meeting OR n:Episode OR n:Entity OR n:Fact
WITH [lbl IN labels(n) WHERE lbl IN ['Meeting','Episode','Entity','Fact']][0] AS kind
RETURN kind, count(*) AS count
ORDER BY count DESC
"""
)
return rows
RETURN kind, count(*) AS count ORDER BY count DESC
''')
def get_entity_types(self) -> List[Dict[str, Any]]:
if not self.enabled:
return []
return self.run_query(
"""
return self.run_query('''
MATCH (e:Entity)
WHERE coalesce(e.entity_type, '') <> ''
RETURN e.entity_type AS entity_type, count(*) AS count
ORDER BY count DESC
"""
)
RETURN e.entity_type AS entity_type, count(*) AS count ORDER BY count DESC
''')
def get_graph_snapshot(
self,
query: str = "",
query: str = '',
entity_types: Optional[List[str]] = None,
kinds: Optional[List[str]] = None,
limit_nodes: int = 80,
limit_edges: int = 160,
) -> Dict[str, Any]:
if not self.enabled:
return {"nodes": [], "edges": [], "stats": {"enabled": False}}
return {'nodes': [], 'edges': [], 'stats': {'enabled': False}}
keyword_terms = _keyword_terms(query) if query else []
raw_nodes = self.run_query(
"""
raw_nodes = self.run_query('''
MATCH (n)
WHERE (n:Meeting OR n:Episode OR n:Entity OR n:Fact)
AND ($kinds = [] OR [lbl IN labels(n) WHERE lbl IN ['Meeting','Episode','Entity','Fact']][0] IN $kinds)
@ -543,55 +588,54 @@ class Neo4jGraphStore:
count(DISTINCT r) AS degree
ORDER BY degree DESC, coalesce(n.title, n.name, n.fact) ASC
LIMIT $limit_nodes
""",
''',
terms=keyword_terms,
types=entity_types or [],
kinds=kinds or [],
limit_nodes=limit_nodes,
)
if not raw_nodes:
return {"nodes": [], "edges": [], "stats": self.get_stats()}
return {'nodes': [], 'edges': [], 'stats': self.get_stats()}
all_raw_ids = set()
nodes = []
for row in raw_nodes:
kind = row.get("kind", "")
if kind == "Meeting":
raw_id = row.get("meeting_id", "")
label = row.get("title", "") or raw_id
elif kind == "Episode":
raw_id = row.get("episode_id", "")
label = row.get("title", "") or raw_id
elif kind == "Entity":
raw_id = row.get("entity_name", "")
kind = row.get('kind', '')
if kind == 'Meeting':
raw_id = row.get('meeting_id', '')
label = row.get('title', '') or raw_id
elif kind == 'Episode':
raw_id = row.get('episode_id', '')
label = row.get('title', '') or raw_id
elif kind == 'Entity':
raw_id = row.get('entity_name', '')
label = raw_id
elif kind == "Fact":
raw_id = row.get("fact_id", "")
label = row.get("predicate", "") or row.get("fact", "") or raw_id
elif kind == 'Fact':
raw_id = row.get('fact_id', '')
label = row.get('predicate', '') or row.get('fact', '') or raw_id
else:
continue
if not raw_id:
continue
nid = f"{kind}:{raw_id}"
nid = f'{kind}:{raw_id}'
all_raw_ids.add(raw_id)
nodes.append({
"id": nid,
"label": label,
"kind": kind,
"entity_type": row.get("entity_type", "") if kind == "Entity" else "",
"description": row.get("description", "") or row.get("summary", "") or "",
"date": row.get("date", "") or row.get("meeting_date", "") or "",
"degree": row.get("degree", 0),
"fact": row.get("fact", "") if kind == "Fact" else "",
"summary": row.get("summary", "") or "",
'id': nid,
'label': label,
'kind': kind,
'entity_type': row.get('entity_type', '') if kind == 'Entity' else '',
'description': row.get('description', '') or row.get('summary', '') or '',
'date': row.get('date', '') or row.get('meeting_date', '') or '',
'degree': row.get('degree', 0),
'fact': row.get('fact', '') if kind == 'Fact' else '',
'summary': row.get('summary', '') or '',
})
if not nodes:
return {"nodes": [], "edges": [], "stats": self.get_stats()}
return {'nodes': [], 'edges': [], 'stats': self.get_stats()}
ids_list = list(all_raw_ids)
edges_raw = self.run_query(
"""
edges_raw = self.run_query('''
MATCH (s)-[r]->(t)
WHERE type(r) IN ['HAS_EPISODE','MENTIONS','HAS_FACT','FACT_SOURCE','FACT_TARGET']
AND (
@ -632,65 +676,59 @@ class Neo4jGraphStore:
CASE WHEN s:Fact THEN coalesce(s.meeting_id, '')
WHEN t:Fact THEN coalesce(t.meeting_id, '') ELSE '' END AS fact_meeting_id
LIMIT $limit_edges
""",
ids=list(all_raw_ids),
limit_edges=limit_edges,
)
''', ids=list(all_raw_ids), limit_edges=limit_edges)
degree_map: Dict[str, int] = {}
for row in edges_raw:
src = row.get("source", "")
tgt = row.get("target", "")
degree_map[src] = degree_map.get(src, 0) + 1
degree_map[tgt] = degree_map.get(tgt, 0) + 1
src = row.get('source_raw', '')
tgt = row.get('target_raw', '')
sk = row.get('source_kind', '')
tk = row.get('target_kind', '')
if sk and src:
degree_map[f'{sk}:{src}'] = degree_map.get(f'{sk}:{src}', 0) + 1
if tk and tgt:
degree_map[f'{tk}:{tgt}'] = degree_map.get(f'{tk}:{tgt}', 0) + 1
for node in nodes:
node["degree"] = degree_map.get(node["id"], node.get("degree", 0))
node['degree'] = degree_map.get(node['id'], node.get('degree', 0))
edges = []
for idx, row in enumerate(edges_raw, start=1):
sk = row.get("source_kind", "")
tk = row.get("target_kind", "")
sk = row.get('source_kind', '')
tk = row.get('target_kind', '')
edges.append({
"id": f"edge_{idx}",
"source": f"{sk}:{row['source_raw']}" if sk and row.get("source_raw") else "",
"target": f"{tk}:{row['target_raw']}" if tk and row.get("target_raw") else "",
"predicate": row.get("predicate", ""),
"fact": row.get("fact_text", "") or row.get("fact_description", "") or "",
"description": row.get("fact_description", "") or "",
"confidence": row.get("fact_confidence", 0.0),
"date": row.get("fact_date", "") or "",
"meeting_id": row.get("fact_meeting_id", "") or "",
'id': f'edge_{idx}',
'source': f'{sk}:{row["source_raw"]}' if sk and row.get('source_raw') else '',
'target': f'{tk}:{row["target_raw"]}' if tk and row.get('target_raw') else '',
'predicate': row.get('predicate', ''),
'fact': row.get('fact_text', '') or row.get('fact_description', '') or '',
'description': row.get('fact_description', '') or '',
'confidence': row.get('fact_confidence', 0.0),
'date': row.get('fact_date', '') or '',
'meeting_id': row.get('fact_meeting_id', '') or '',
})
return {
"nodes": nodes,
"edges": edges,
"stats": self.get_stats(),
"query": query,
}
return {'nodes': nodes, 'edges': edges, 'stats': self.get_stats(), 'query': query}
def format_search_context(self, question: str, top_k: int = 5) -> str:
results = self.hybrid_search(question, limit=top_k)
if not results:
return ""
return ''
lines = []
for idx, row in enumerate(results, start=1):
date = row.get("date", "")
meeting_title = row.get("meeting_title", "")
title = row.get("title", row.get("kind", "item"))
suffix = f" ({date})" if date else ""
source = f" | 来源会议: {meeting_title}" if meeting_title else ""
date = row.get('date', '')
meeting_title = row.get('meeting_title', '')
title = row.get('title', row.get('kind', 'item'))
suffix = f' ({date})' if date else ''
source = f' | 来源会议: {meeting_title}' if meeting_title else ''
lines.append(
f"[{idx}] {title}{suffix}{source}\n"
f"{row.get('text', '')}\n"
f"score={row.get('score', 0):.4f}, semantic={row.get('semantic_score', 0):.4f}, keyword={row.get('keyword_score', 0):.4f}"
f'[{idx}] {title}{suffix}{source}\n'
f'{row.get("text", "")}\n'
f'score={row.get("score", 0):.4f}, semantic={row.get("semantic_score", 0):.4f}, keyword={row.get("keyword_score", 0):.4f}'
)
return "\n\n".join(lines)
return '\n\n'.join(lines)
def _load_fact_candidates(self) -> List[Dict[str, Any]]:
return self.run_query(
"""
return self.run_query('''
MATCH (ep:Episode)-[:HAS_FACT]->(f:Fact)
OPTIONAL MATCH (s:Entity)-[:FACT_SOURCE]->(f)
OPTIONAL MATCH (f)-[:FACT_TARGET]->(o:Entity)
@ -698,23 +736,21 @@ class Neo4jGraphStore:
coalesce(s.name + ' -[' + coalesce(f.predicate, '') + ']-> ' + o.name, f.fact) AS title,
coalesce(
f.description + CASE
WHEN size(coalesce(f.qualifiers, [])) > 0 THEN ' | ' + reduce(acc = '', item IN f.qualifiers |
WHEN size(coalesce(f.qualifiers, [])) > 0
THEN ' | ' + reduce(acc = '', item IN f.qualifiers |
acc + CASE WHEN acc = '' THEN item ELSE '; ' + item END
)
ELSE ''
END,
f.fact,
''
f.fact, ''
) AS text,
ep.date AS date,
ep.title AS meeting_title,
f.fact_embedding AS embedding
"""
)
''')
def _load_entity_candidates(self) -> List[Dict[str, Any]]:
return self.run_query(
"""
return self.run_query('''
MATCH (e:Entity)
OPTIONAL MATCH (ep:Episode)-[:MENTIONS]->(e)
RETURN 'entity' AS kind,
@ -723,12 +759,10 @@ class Neo4jGraphStore:
max(ep.date) AS date,
head(collect(DISTINCT ep.title)) AS meeting_title,
e.name_embedding AS embedding
"""
)
''')
def _load_episode_candidates(self) -> List[Dict[str, Any]]:
return self.run_query(
"""
return self.run_query('''
MATCH (m:Meeting)-[:HAS_EPISODE]->(ep:Episode)
RETURN 'episode' AS kind,
m.title AS title,
@ -736,47 +770,45 @@ class Neo4jGraphStore:
ep.date AS date,
m.title AS meeting_title,
ep.content_embedding AS embedding
"""
)
''')
@staticmethod
def _entity_summary(entity: dict) -> str:
entity_type = entity.get("entity_type", "").strip()
name = entity.get("name", "").strip()
description = entity.get("description", "").strip()
entity_type = entity.get('entity_type', '').strip()
name = entity.get('name', '').strip()
description = entity.get('description', '').strip()
parts = [part for part in [entity_type, name, description] if part]
return " | ".join(parts)
return ' | '.join(parts)
@staticmethod
def _fact_text(relation: dict) -> str:
subject = relation.get("subject", "").strip()
predicate = relation.get("predicate", "").strip()
obj = relation.get("object", "").strip()
description = relation.get("description", "").strip()
fact = relation.get("fact", "").strip() or f"{subject} {predicate} {obj}".strip()
qualifiers = relation.get("qualifiers", [])
qualifier_text = "; ".join(item for item in qualifiers if item)
if description and qualifier_text:
return f"{fact}. {description}. {qualifier_text}"
subject = relation.get('subject', '').strip()
predicate = relation.get('predicate', '').strip()
obj = relation.get('object', '').strip()
description = relation.get('description', '').strip()
fact = relation.get('fact', '').strip() or f'{subject} {predicate} {obj}'.strip()
qualifiers = relation.get('qualifiers', [])
qualifier_text = '; '.join(item for item in qualifiers if item)
parts = [fact]
if description:
return f"{fact}. {description}"
parts.append(description)
if qualifier_text:
return f"{fact}. {qualifier_text}"
return fact
parts.append(qualifier_text)
return '. '.join(parts)
@staticmethod
def _build_episode_text(meeting_data: dict) -> str:
payload = {
"title": meeting_data.get("title", ""),
"date": meeting_data.get("date", ""),
"participants": meeting_data.get("participants", []),
"summary": meeting_data.get("summary", ""),
"entities": meeting_data.get("entities", []),
"relations": meeting_data.get("relations", []),
"action_items": meeting_data.get("action_items", []),
"metrics": meeting_data.get("metrics", []),
"decisions": meeting_data.get("decisions", []),
"original_text": meeting_data.get("_original_text", ""),
'title': meeting_data.get('title', ''),
'date': meeting_data.get('date', ''),
'participants': meeting_data.get('participants', []),
'summary': meeting_data.get('summary', ''),
'entities': meeting_data.get('entities', []),
'relations': meeting_data.get('relations', []),
'action_items': meeting_data.get('action_items', []),
'metrics': meeting_data.get('metrics', []),
'decisions': meeting_data.get('decisions', []),
'original_text': meeting_data.get('_original_text', ''),
}
return json.dumps(payload, ensure_ascii=False)

View File

@ -1,9 +1,18 @@
import hashlib
import logging
from typing import Callable, Optional
from typing import Callable, List, Optional
from meeting_memory.config import config
from meeting_memory.extractor import MeetingExtraction, extract_meeting_info
from meeting_memory.extractor import (
MeetingExtraction,
extract_entities_from_text,
extract_facts_from_text,
extract_meeting_info as monolithic_extract,
)
from meeting_memory.extractor import (
resolve_entities_against_graph,
resolve_facts_against_graph,
)
from meeting_memory.graph_store import graph_store
from meeting_memory.meeting_state import MeetingStateStore
from meeting_memory.raw_store import raw_meeting_store
@ -15,8 +24,9 @@ ProgressCallback = Callable[[int, int, str], None]
class MeetingProcessor:
def process_meeting_file(self, filepath: str, force: bool = False) -> Optional[str]:
with open(filepath, "r", encoding="utf-8") as file_obj:
with open(filepath, 'r', encoding='utf-8') as file_obj:
text = file_obj.read()
return self.process_meeting_text(text, force=force)
@ -26,147 +36,312 @@ class MeetingProcessor:
force: bool = False,
interactive: bool = True,
progress_callback: Optional[ProgressCallback] = None,
use_multistep_extraction: bool = True,
) -> Optional[str]:
def report(step: int, message: str) -> None:
def report(step: int, total: int, message: str) -> None:
if progress_callback:
progress_callback(step, 7, message)
print(f"[{step}/7] {message}")
progress_callback(step, total, message)
print(f'[{step}/{total}] {message}')
report(1, "计算内容哈希")
if use_multistep_extraction:
return self._process_multistep(text, force, interactive, report)
else:
return self._process_monolithic(text, force, interactive, report)
def _process_monolithic(
self, text: str, force: bool, interactive: bool,
report: Callable,
) -> Optional[str]:
total_steps = 7
report(1, total_steps, '计算内容哈希')
content_hash = self._compute_content_hash(text)
if not force and state_store.has_content_hash(content_hash):
logger.info("Duplicate content hash skipped: %s", content_hash[:12])
logger.info('Duplicate content hash skipped: %s', content_hash[:12])
return None
if not force:
report(2, "Neo4j 语义相似去重检索")
report(2, total_steps, 'Neo4j 语义相似去重检索')
similar = graph_store.find_similar_episode(text, threshold=0.92)
if similar:
meta = similar["metadata"]
meta = similar['metadata']
if not interactive:
logger.info(
"Skipped similar meeting in non-interactive mode: %s",
meta.get("title", ""),
)
logger.info('Skipped similar meeting: %s', meta.get('title', ''))
return None
print(
f"\n发现相似会议:{meta.get('title', '')} ({meta.get('date', '')}) "
f"相似度 {similar['score']:.2%}"
)
print(f'\n发现相似会议:{meta.get("title", "")} ({meta.get("date", "")}) 相似度 {similar["score"]:.2%}')
while True:
choice = input("选择 [s]跳过 / [o]覆盖(默认 s").strip().lower() or "s"
if choice == "s":
logger.info("Skipped similar meeting: %s", meta.get("title", ""))
choice = input('选择 [s]跳过 / [o]覆盖(默认 s').strip().lower() or 's'
if choice == 's':
logger.info('Skipped similar meeting: %s', meta.get('title', ''))
return None
if choice == "o":
if choice == 'o':
force = True
break
print("请输入 s 或 o。")
print('请输入 s 或 o。')
else:
report(2, "跳过语义去重,按覆盖模式继续")
report(2, total_steps, '跳过语义去重,按覆盖模式继续')
report(3, "调用大模型抽取结构化信息")
meeting_data = self._extract(text)
report(3, total_steps, '调用大模型抽取结构化信息(单步模式)')
meeting_data = self._extract_monolithic(text)
if not meeting_data:
logger.error("Failed to extract meeting information")
logger.error('Failed to extract meeting information')
return None
data_dict = meeting_data.model_dump()
data_dict["_content_hash"] = content_hash
data_dict["_graph_meeting_id"] = graph_store.meeting_id(data_dict)
return self._finish_pipeline(data_dict, content_hash, text, force, interactive, report, total_steps)
report(4, "检查标题和日期重复")
def _process_multistep(
self, text: str, force: bool, interactive: bool,
report: Callable,
) -> Optional[str]:
total_steps = 10
report(1, total_steps, '计算内容哈希')
content_hash = self._compute_content_hash(text)
if not force and state_store.has_content_hash(content_hash):
logger.info('Duplicate content hash skipped: %s', content_hash[:12])
return None
if not force:
report(2, total_steps, 'Neo4j 语义相似去重检索')
similar = graph_store.find_similar_episode(text, threshold=0.92)
if similar:
meta = similar['metadata']
if not interactive:
logger.info('Skipped similar meeting: %s', meta.get('title', ''))
return None
print(f'\n发现相似会议:{meta.get("title", "")} ({meta.get("date", "")}) 相似度 {similar["score"]:.2%}')
while True:
choice = input('选择 [s]跳过 / [o]覆盖(默认 s').strip().lower() or 's'
if choice == 's':
logger.info('Skipped similar meeting: %s', meta.get('title', ''))
return None
if choice == 'o':
force = True
break
print('请输入 s 或 o。')
else:
report(2, total_steps, '跳过语义去重,按覆盖模式继续')
# Step 3: 提取标题、日期、参与人等元信息
report(3, total_steps, '抽取会议元信息(标题、日期、参与者等)')
meta_info = self._extract_monolithic(text)
if not meta_info:
logger.error('Failed to extract meeting metadata')
return None
data_dict = meta_info.model_dump()
data_dict['_content_hash'] = content_hash
data_dict['_graph_meeting_id'] = graph_store.meeting_id(data_dict)
data_dict['_original_text'] = text
# Step 4: 抽取实体节点LLM 调用 1
report(4, total_steps, '第 1 步实体抽取:识别会议中提及的实体')
previous_episodes = self._get_previous_episodes_context(data_dict)
extracted_entities = extract_entities_from_text(
text, previous_episodes=previous_episodes, stream=True
)
logger.info('Extracted %d entities from meeting', len(extracted_entities))
if not extracted_entities:
logger.warning('No entities extracted, aborting')
return None
# Step 5: 实体去重(与已有图谱对比 + LLM 裁决)
report(5, total_steps, '实体去重:与图谱中已有实体对比')
resolved_entities = self._dedup_entities(extracted_entities, text)
data_dict['entities'] = resolved_entities
logger.info('After dedup: %d entities remain', len(resolved_entities))
# Step 6: 抽取事实关系LLM 调用 2
report(6, total_steps, '事实抽取:提取实体间的结构化关系')
reference_time = data_dict.get('date', '')
extracted_facts = extract_facts_from_text(
text, resolved_entities,
reference_time=reference_time,
previous_episodes=previous_episodes,
stream=True,
)
logger.info('Extracted %d facts from meeting', len(extracted_facts))
# Step 7: 事实去重与矛盾检测
report(7, total_steps, '事实解析:去重与矛盾检测')
resolved_facts = self._dedup_facts(extracted_facts, data_dict)
data_dict['relations'] = resolved_facts
logger.info('After dedup: %d facts remain', len(resolved_facts))
# Step 8: 检查标题和日期重复
report(8, total_steps, '检查标题和日期重复')
should_skip = self._handle_duplicate(data_dict, force=force, interactive=interactive)
if should_skip:
return None
meeting_title = data_dict.get("title", "")
meeting_date = data_dict.get("date", "")
meeting_title = data_dict.get('title', '')
meeting_date = data_dict.get('date', '')
report(5, "归档原始会议文本")
# Step 9: 归档 + 合并行动项/指标
report(9, total_steps, '归档和状态合并')
raw_path = raw_meeting_store.save(text, title=meeting_title, date=meeting_date)
data_dict["_original_text"] = text
data_dict["_original_text_path"] = raw_path
data_dict['_original_text_path'] = raw_path
meeting_filename = f"{graph_store.meeting_id(data_dict)}.md"
report(6, "合并行动项和指标状态")
data_dict["action_items"] = state_store.merge_action_items(
data_dict.get("action_items", []),
meeting_title,
meeting_date,
meeting_filename,
meeting_filename = f'{graph_store.meeting_id(data_dict)}.md'
data_dict['action_items'] = state_store.merge_action_items(
data_dict.get('action_items', []),
meeting_title, meeting_date, meeting_filename,
)
data_dict["metrics"] = state_store.merge_metrics(
data_dict.get("metrics", []),
meeting_title,
meeting_date,
meeting_filename,
data_dict['metrics'] = state_store.merge_metrics(
data_dict.get('metrics', []),
meeting_title, meeting_date, meeting_filename,
)
state_store.add_content_hash(content_hash, meeting_title, meeting_date, meeting_filename)
state_store.save()
report(7, "写入 Neo4j 图谱和检索数据")
# Step 10: 写入 Neo4j
report(10, total_steps, '写入 Neo4j 图谱')
graph_store.upsert_meeting_subgraph(data_dict)
logger.info("Meeting processed: %s", meeting_title)
logger.info('Meeting processed (multi-step): %s', meeting_title)
return raw_path
def _get_previous_episodes_context(self, data_dict: dict) -> list:
meeting_title = data_dict.get('title', '')
meeting_date = data_dict.get('date', '')
series_info = state_store.get_series_info(meeting_title)
if not series_info:
return []
processed = series_info.get('processed_titles', [])
if not processed:
return []
rows = graph_store.run_query('''
MATCH (m:Meeting)
WHERE m.title IN $titles
OPTIONAL MATCH (m)-[:HAS_EPISODE]->(ep:Episode)
RETURN m.title AS title, m.date AS date, ep.summary AS summary, ep.content AS content
ORDER BY m.date DESC
LIMIT 3
''', titles=processed[-3:])
return [{'content': r.get('content', r.get('summary', '')), 'timestamp': r.get('date', '')} for r in rows]
def _dedup_entities(self, extracted: list, text: str) -> list:
try:
existing = graph_store.get_entities_map()
if not existing:
return extracted
existing_list = [
{
'candidate_id': i,
'name': v['name'],
'entity_type': v.get('entity_type', ''),
'summary': v.get('summary', '') or v.get('description', ''),
}
for i, v in enumerate(existing.values())
]
return resolve_entities_against_graph(extracted, existing_list, episode_content=text)
except Exception as exc:
logger.warning('Entity dedup failed, keeping all extracted: %s', exc)
return extracted
def _dedup_facts(self, facts: list, data_dict: dict) -> list:
resolved = []
for fact in facts:
try:
source = fact.get('source_entity_name', '')
target = fact.get('target_entity_name', '')
existing = graph_store.get_facts_between(source, target)
if not existing:
resolved.append(fact)
continue
result = resolve_facts_against_graph(fact, existing, [])
if isinstance(result, dict) and result.get('is_duplicate'):
logger.debug('Skipped duplicate fact: %s', fact.get('fact', ''))
continue
resolved.append(fact)
except Exception as exc:
logger.warning('Fact dedup failed, keeping: %s', exc)
resolved.append(fact)
return resolved
def _finish_pipeline(
self, data_dict: dict, content_hash: str, text: str,
force: bool, interactive: bool, report: Callable, total_steps: int,
) -> Optional[str]:
data_dict['_content_hash'] = content_hash
data_dict['_graph_meeting_id'] = graph_store.meeting_id(data_dict)
report(4, total_steps, '检查标题和日期重复')
should_skip = self._handle_duplicate(data_dict, force=force, interactive=interactive)
if should_skip:
return None
meeting_title = data_dict.get('title', '')
meeting_date = data_dict.get('date', '')
report(5, total_steps, '归档原始会议文本')
raw_path = raw_meeting_store.save(text, title=meeting_title, date=meeting_date)
data_dict['_original_text'] = text
data_dict['_original_text_path'] = raw_path
meeting_filename = f'{graph_store.meeting_id(data_dict)}.md'
report(6, total_steps, '合并行动项和指标状态')
data_dict['action_items'] = state_store.merge_action_items(
data_dict.get('action_items', []), meeting_title, meeting_date, meeting_filename,
)
data_dict['metrics'] = state_store.merge_metrics(
data_dict.get('metrics', []), meeting_title, meeting_date, meeting_filename,
)
state_store.add_content_hash(content_hash, meeting_title, meeting_date, meeting_filename)
state_store.save()
report(7, total_steps, '写入 Neo4j 图谱和检索数据')
graph_store.upsert_meeting_subgraph(data_dict)
logger.info('Meeting processed: %s', meeting_title)
return raw_path
def _handle_duplicate(self, data_dict: dict, force: bool, interactive: bool = True) -> bool:
title = data_dict.get("title", "")
date = data_dict.get("date", "")
title = data_dict.get('title', '')
date = data_dict.get('date', '')
existing = graph_store.get_meeting(title, date)
if not existing:
return False
if force:
logger.info("Duplicate meeting found; overwriting in force mode: %s", title)
logger.info('Duplicate meeting found; overwriting in force mode: %s', title)
self._remove_old(data_dict, existing)
return False
if not interactive:
logger.info("Skipped duplicate meeting in non-interactive mode: %s", title)
logger.info('Skipped duplicate meeting in non-interactive mode: %s', title)
return True
print(f"\n发现重复会议:{title} ({date})")
print(f'\n发现重复会议:{title} ({date})')
while True:
choice = input("选择 [s]跳过 / [o]覆盖(默认 s").strip().lower() or "s"
if choice == "s":
logger.info("Skipped duplicate meeting: %s", title)
choice = input('选择 [s]跳过 / [o]覆盖(默认 s').strip().lower() or 's'
if choice == 's':
logger.info('Skipped duplicate meeting: %s', title)
return True
if choice == "o":
if choice == 'o':
self._remove_old(data_dict, existing)
return False
print("请输入 s 或 o。")
print('请输入 s 或 o。')
def _remove_old(self, data_dict: dict, existing: Optional[dict] = None) -> None:
meeting_id = graph_store.meeting_id(data_dict)
graph_store.remove_meeting_subgraph(meeting_id)
new_hash = data_dict.get("_content_hash", "")
new_hash = data_dict.get('_content_hash', '')
if new_hash:
state_store.remove_content_hash(new_hash)
if existing:
old_hash = existing.get("content_hash", "")
old_hash = existing.get('content_hash', '')
if old_hash and old_hash != new_hash:
state_store.remove_content_hash(old_hash)
logger.info("Removed old meeting artifacts: %s", data_dict.get("title", ""))
logger.info('Removed old meeting artifacts: %s', data_dict.get('title', ''))
def _compute_content_hash(self, text: str) -> str:
normalized = text.strip().replace("\r\n", "\n")
return hashlib.sha256(normalized.encode("utf-8")).hexdigest()
normalized = text.strip().replace('\r\n', '\n')
return hashlib.sha256(normalized.encode('utf-8')).hexdigest()
def _extract(self, text: str) -> Optional[MeetingExtraction]:
def _extract_monolithic(self, text: str) -> Optional[MeetingExtraction]:
try:
return extract_meeting_info(text, stream=True)
return monolithic_extract(text, stream=True)
except Exception as exc:
logger.error("LLM extraction failed: %s", exc)
logger.error('LLM extraction failed: %s', exc)
return None
def query(self, question: str, top_k: int = 3) -> str:
@ -174,10 +349,10 @@ class MeetingProcessor:
def stats(self) -> dict:
return {
"graph": graph_store.get_stats(),
"state": state_store.get_stats(),
"raw_dir": config.storage.raw_dir,
"state_path": config.state_path,
'graph': graph_store.get_stats(),
'state': state_store.get_stats(),
'raw_dir': config.storage.raw_dir,
'state_path': config.state_path,
}

View File

@ -0,0 +1,5 @@
from .extract_nodes import extract_entities
from .extract_edges import extract_facts
from .dedupe_nodes import resolve_entities
from .dedupe_edges import resolve_facts
from .summarize_nodes import summarize_entity

View File

@ -0,0 +1,49 @@
from typing import Any
def resolve_facts(context: dict[str, Any]) -> list[dict]:
existing_facts = context.get('existing_facts', [])
new_fact = context.get('new_fact', '')
invalidation_candidates = context.get('invalidation_candidates', [])
existing_text = '\n'.join(
f' [idx={i}] {f.get("fact", "")}' for i, f in enumerate(existing_facts)
)
invalidation_text = '\n'.join(
f' [idx={i + len(existing_facts)}] {f.get("fact", "")}'
for i, f in enumerate(invalidation_candidates)
)
user_prompt = f"""
<已有事实>
{existing_text}
</已有事实>
<事实失效候选>
{invalidation_text}
</事实失效候选>
<新事实>
{new_fact}
</新事实>
注意idx 编号是连续的已有事实从 0 开始失效候选紧随其后
任务
1. **重复检测**如果<新事实><已有事实>中的某条描述的是完全相同的客观事实返回该 idx
2. **矛盾检测**如果<新事实><已有事实><失效候选>中的某条相互矛盾如状态已更新数值已变更返回该 idx
返回格式
{{"duplicate_facts": [idx列表], "contradicted_facts": [idx列表]}}
如果没有重复或矛盾返回空列表
示例
- 新事实"张三负责宽带运维项目" vs 已有"张三负责宽带运维" 重复相同事实
- 新事实"宽带用户数当前值 8500" vs 已有"宽带用户数目标值 10000" 不重复不矛盾数值维度不同
- 新事实"宽带用户数当前值 9000" vs 已有"宽带用户数 8000" 矛盾同一指标数值更新
"""
return [
{'role': 'system', 'content': '你是事实去重和矛盾检测助手。判断新事实与已有事实的关系。'},
{'role': 'user', 'content': user_prompt},
]

View File

@ -0,0 +1,49 @@
from typing import Any
def resolve_entities(context: dict[str, Any]) -> list[dict]:
extracted = context.get('extracted_entities', [])
existing = context.get('existing_entities', [])
episode_content = context.get('episode_content', '')
extracted_text = '\n'.join(
f' [{i}] {e.get("name", "")}{e.get("entity_type", "未知")}{e.get("description", "")}'
for i, e in enumerate(extracted)
)
existing_text = '\n'.join(
f' [candidate_id={c.get("candidate_id", i)}] {c.get("name", "")}{c.get("entity_type", "未知")}{c.get("summary", "")[:100]}'
for i, c in enumerate(existing)
)
user_prompt = f"""
<当前会议内容>
{episode_content}
</当前会议内容>
<新抽取的实体>
{extracted_text}
</新抽取的实体>
<图谱中已有的实体>
{existing_text}
</图谱中已有的实体>
任务判断<新抽取的实体>中的每一个是否与<图谱中已有的实体>中的某个是同一个真实世界对象
判断标准
- **是重复**两个名称指向同一个真实世界的人组织地点项目指标等
- **不是重复**名称相似但指向不同实体如两个同名但不同的人同名的不同项目
对每个新抽取的实体返回
- id: 对应新抽取实体列表中的序号
- name: 实体的最佳名称优先使用已有实体中的更完整名称
- duplicate_candidate_id: 匹配到的已有实体的 candidate_id如果无匹配则填 -1
返回格式 JSON 数组[{{"id": 0, "name": "张三", "duplicate_candidate_id": -1}}, ...]
必须为新抽取的每个实体返回一条记录id 0 开始连续编号
"""
return [
{'role': 'system', 'content': '你是实体去重助手。判断两个实体是否指向同一个真实世界对象。'},
{'role': 'user', 'content': user_prompt},
]

View File

@ -0,0 +1,59 @@
from typing import Any
def extract_facts(context: dict[str, Any]) -> list[dict]:
previous = context.get('previous_episodes', [])
current = context.get('episode_content', '')
entities = context.get('entities', [])
reference_time = context.get('reference_time', '')
previous_section = ''
if previous:
import json
previous_section = f'\n<历史上下文>\n{json.dumps(previous, ensure_ascii=False)}\n</历史上下文>\n'
entities_text = '\n'.join(
f' [{i}] {e.get("name", "")}{e.get("entity_type", "未知")}' for i, e in enumerate(entities)
)
user_prompt = f"""
{previous_section}
<当前会议内容>
{current}
</当前会议内容>
<已抽取实体>
{entities_text}
</已抽取实体>
<参考时间>
{reference_time}
</参考时间>
抽取规则
1. <当前会议内容>中抽取上述<已抽取实体>之间的**事实关系**
2. 每条关系必须涉及两个**不同**的实体
3. 返回 JSON 数组格式
[{{
"source_entity_name": "源实体名称(必须来自上方的实体列表)",
"target_entity_name": "目标实体名称(必须来自上方的实体列表)",
"relation_type": "关系类型,如 负责、汇报、隶属于、参与、目标值、截止于、影响、依赖于",
"fact": "一句自然语言的事实描述,保留原文中所有具体细节(数值、时间、地点等)",
"valid_at": "该事实开始成立的时间ISO 8601格式如 2025-04-30T00:00:00Z不明确则留空",
"invalid_at": "该事实不再成立的时间,不明确则留空",
"evidence": "原文中的关键证据短句",
"qualifiers": ["限定条件列表,如数值、范围、状态、截止时间等"],
"confidence": 置信度0到1之间
}}]
4. relation_type 避免使用"关联""涉及"等空泛词优先使用具体谓词
负责汇报目标值当前值低于高于要求督导推进支撑依赖计划完成截止于参与隶属于分管协调审批
5. fact 必须是一句完整的自然语言事实保留所有具体信息人名数值产品名地点等
6. 如果根据上下文可以判断事实的开始/结束时间填入 valid_at / invalid_at
"""
return [
{'role': 'system', 'content': '你是一个专业的事实关系抽取专家。从会议记录中抽取实体间的结构化事实关系。'},
{'role': 'user', 'content': user_prompt},
]

View File

@ -0,0 +1,53 @@
from typing import Any
SYSTEM_PROMPT = (
'你是会议纪要实体抽取专家。'
'从会议记录中抽取明确的实体节点,包括人物、组织、地点、项目、指标等。'
'不要抽取抽象概念、情感、时间日期或泛泛的名词。'
)
def extract_entities(context: dict[str, Any]) -> list[dict]:
previous = context.get('previous_episodes', [])
current = context.get('episode_content', '')
entity_types = context.get('entity_types', [])
entity_types_section = ''
if entity_types:
entity_types_section = '\n'.join(
f' - {t["type"]}: {t["description"]}' for t in entity_types
)
else:
entity_types_section = ' - 未限定类型,请根据上下文自行判断'
previous_section = ''
if previous:
import json
previous_section = f'\n<历史上下文>\n{json.dumps(previous, ensure_ascii=False)}\n</历史上下文>\n'
user_prompt = f"""
{previous_section}
<当前会议内容>
{current}
</当前会议内容>
<实体类型>
{entity_types_section}
</实体类型>
抽取规则
1. 只抽取当前会议内容中**明确提及**的实体
2. 每个实体必须是有唯一标识的具体事物人名组织名地名项目名指标名称等
3. 不要抽取代词抽象概念增长改善风险时间日期
4. 如果同一实体在不同来源中以不同名称出现如简称/全称保留最完整的形式
5. 必须返回 JSON 数组格式[{{"name": "实体名称", "entity_type": "类型", "description": "描述", "evidence": "原文证据"}}]
6. description 写一段对该实体的简要描述20字以内
7. evidence 从原文中摘录提及该实体的关键短句
注意实体类型建议使用 Person人物Organization组织Location地点Project项目Metric指标System系统Document文档
"""
return [
{'role': 'system', 'content': SYSTEM_PROMPT},
{'role': 'user', 'content': user_prompt},
]

View File

@ -0,0 +1,41 @@
from typing import Any
def summarize_entity(context: dict[str, Any]) -> list[dict]:
entity_name = context.get('entity_name', '')
existing_summary = context.get('existing_summary', '')
episodes = context.get('episodes', [])
previous = context.get('previous_episodes', [])
existing_section = ''
if existing_summary:
existing_section = f'\n<已有摘要>\n{existing_summary}\n</已有摘要>\n'
previous_section = ''
if previous:
import json
previous_section = f'\n<历史内容>\n{json.dumps(previous, ensure_ascii=False)}\n</历史内容>\n'
episodes_text = '\n---\n'.join(episodes) if isinstance(episodes, list) else episodes
user_prompt = f"""
{previous_section}
<当前内容>
{episodes_text}
</当前内容>
{existing_section}
为实体 **{entity_name}** 生成一段信息密集的摘要
规则
1. 只使用<当前内容><已有摘要>中的事实不要推测
2. 保留所有实质性的人名角色地点日期数值
3. 用第三人称直接陈述事实
4. 不要使用"提及了""讨论了""指出"等元语言动词直接陈述事实
5. 如果会议对已有信息做了更新采用更新的说法
6. 摘要不超过 500
7. 返回 JSON{{"summary": "摘要内容"}}
"""
return [
{'role': 'system', 'content': '你是实体摘要助手。根据会议内容为实体生成信息密集的摘要。'},
{'role': 'user', 'content': user_prompt},
]