From b5649cc218e4650cbeef200752675192fef41237 Mon Sep 17 00:00:00 2001 From: Bifang <915779419@qq.com> Date: Mon, 15 Jun 2026 13:13:20 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84Neo4j=E4=BF=9D=E5=AD=98?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E7=BB=93=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- meeting_memory/extractor.py | 138 ++++-- meeting_memory/graph_store.py | 442 +++++++++++-------- meeting_memory/meeting_processor.py | 11 +- meeting_memory/prompts/extract_edges.py | 13 +- meeting_memory/prompts/extract_nodes.py | 7 +- meeting_memory/web_demo/static/app.js | 2 +- meeting_memory/web_demo/static/graph.js | 18 +- meeting_memory/web_demo/static_v2/graph.html | 2 +- meeting_memory/web_demo/static_v2/styles.css | 32 +- scripts/migrate_v1_to_v2.py | 186 ++++++++ 10 files changed, 615 insertions(+), 236 deletions(-) create mode 100644 scripts/migrate_v1_to_v2.py diff --git a/meeting_memory/extractor.py b/meeting_memory/extractor.py index 035a45f..bc047bc 100644 --- a/meeting_memory/extractor.py +++ b/meeting_memory/extractor.py @@ -2,6 +2,7 @@ import json import logging import re import sys +from enum import Enum from typing import Any, List, Optional from openai import OpenAI @@ -22,25 +23,62 @@ client = OpenAI( ) +class EntityType(str, Enum): + DEPARTMENT = 'Department' + PROJECT = 'Project' + METRIC = 'Metric' + PERSON = 'Person' + SYSTEM = 'System' + DOCUMENT = 'Document' + PARTICIPANT = 'participant' + UNKNOWN = 'Unknown' + + +# Normalization map: legacy LLM output → canonical type +_ENTITY_TYPE_ALIASES = { + '组织': 'Department', + 'organization': 'Department', + '部门': 'Department', + '指标': 'Metric', + 'kpi': 'Metric', + '项目': 'Project', +} + + +def _canonical_entity_type(raw: str) -> str: + normalized = raw.strip() + if normalized in _ENTITY_TYPE_ALIASES: + return _ENTITY_TYPE_ALIASES[normalized] + for member in EntityType: + if member.value.lower() == normalized.lower(): + return member.value + return EntityType.UNKNOWN.value + + +def _neo4j_labels(entity_type: str) -> list[str]: + canonical = _canonical_entity_type(entity_type) + labels = ['Entity'] + if canonical != EntityType.UNKNOWN.value: + labels.append(canonical) + return labels + + class Entity(BaseModel): name: str - entity_type: str + entity_type: str = EntityType.UNKNOWN.value description: str = '' class Relation(BaseModel): - subject: str - subject_type: str = '' - predicate: str - object: str - object_type: str = '' - description: str = '' + source_entity_name: str + target_entity_name: str + relation_type: str fact: str = '' - qualifiers: List[str] = Field(default_factory=list) - evidence: str = '' - confidence: float = 0.0 valid_at: str = '' invalid_at: str = '' + evidence: str = '' + qualifiers: List[str] = Field(default_factory=list) + confidence: float = 0.0 class ActionItem(BaseModel): @@ -63,6 +101,13 @@ class MeetingMetric(BaseModel): target: str = '' owner: str = '' trend: str = '' + unit: str = '' + + +class DepartmentInfo(BaseModel): + name: str + description: str = '' + projects: List[str] = Field(default_factory=list) class MeetingExtraction(BaseModel): @@ -75,6 +120,7 @@ class MeetingExtraction(BaseModel): action_items: List[ActionItem] = Field(default_factory=list) decisions: List[Decision] = Field(default_factory=list) metrics: List[MeetingMetric] = Field(default_factory=list) + departments: List[DepartmentInfo] = Field(default_factory=list) summary: str = '' @@ -372,7 +418,7 @@ EXTRACTION_SYSTEM_PROMPT = """ def extract_meeting_info(text: str, stream: bool = False) -> MeetingExtraction: user_prompt = f""" -请从下面会议记录中提取结构化信息,并重点做"深层关系抽取"。 +请从下面会议记录中提取结构化信息,并重点做"深层关系抽取"和"层次结构识别"。 输出 JSON 字段: - title @@ -380,24 +426,28 @@ def extract_meeting_info(text: str, stream: bool = False) -> MeetingExtraction: - participants - agenda - entities: name, entity_type, description + - entity_type 请使用: Department(部门)、Project(项目)、Metric(指标)、Person(人物)、System(系统)、Document(文档) - relations: - - subject - - subject_type - - predicate - - object - - object_type - - description - - fact - - qualifiers - - evidence - - confidence - - valid_at - - invalid_at + - source_entity_name: 源实体名称 + - target_entity_name: 目标实体名称 + - relation_type: 关系类型(如 HAS_PROJECT、HAS_METRIC、负责、汇报、目标值、推进、依赖) + - fact: 一句自然语言事实描述 + - valid_at(可选) + - invalid_at(可选) + - evidence: 原文证据 + - qualifiers: 限定条件列表 + - confidence: 0~1 - action_items: task, assignee, deadline, status, priority - decisions: content, proposer, status -- metrics: metric_name, value, target, owner, trend +- metrics: metric_name, value, target, owner, trend, unit +- departments: [{{"name": "部门名称", "description": "", "projects": ["项目名1", "项目名2"]}}] - summary +层次关系规则: +1. Department 管辖 Project → relation_type 用 HAS_PROJECT +2. Project 拥有 Metric → relation_type 用 HAS_METRIC +3. 其他事实关系(负责、汇报、目标值等)直接用 relation_type 表达 + 关系抽取规则: 1. 不要只抽"汇报了工作"这种会议动作,要尽量继续下钻出具体事实。 2. 如果一句话里同时包含"主体 + 指标 + 当前值 + 目标值 + 负责人 + 趋势",应拆成多条关系或在 qualifiers 中保留这些细节。 @@ -432,6 +482,7 @@ def _normalize_meeting_data(data: dict) -> dict: 'action_items': _normalize_action_items(data.get('action_items')), 'decisions': _normalize_decisions(data.get('decisions')), 'metrics': _normalize_metrics(data.get('metrics')), + 'departments': _normalize_departments(data.get('departments')), 'summary': _as_str(data.get('summary')), } @@ -494,20 +545,16 @@ def _normalize_relations(value) -> List[dict]: for relation in value: if not isinstance(relation, dict): continue - subject = _as_str(relation.get('subject')) - predicate = _as_str(relation.get('predicate')) - obj = _as_str(relation.get('object')) - description = _as_str(relation.get('description')) + source = _as_str(relation.get('source_entity_name') or relation.get('subject', '')) + target = _as_str(relation.get('target_entity_name') or relation.get('object', '')) + rtype = _as_str(relation.get('relation_type') or relation.get('predicate', '')) fact = _as_str(relation.get('fact')) - if not fact and subject and predicate and obj: - fact = f'{subject} {predicate} {obj}' + if not fact and source and rtype and target: + fact = f'{source} {rtype} {target}' items.append({ - 'subject': subject, - 'subject_type': _as_str(relation.get('subject_type')), - 'predicate': predicate, - 'object': obj, - 'object_type': _as_str(relation.get('object_type')), - 'description': description, + 'source_entity_name': source, + 'target_entity_name': target, + 'relation_type': rtype, 'fact': fact, 'qualifiers': _as_str_list(relation.get('qualifiers')), 'evidence': _as_str(relation.get('evidence')), @@ -563,5 +610,24 @@ def _normalize_metrics(value) -> List[dict]: 'target': _as_str(metric.get('target')), 'owner': _as_str(metric.get('owner')), 'trend': _as_str(metric.get('trend')), + 'unit': _as_str(metric.get('unit')), + }) + return items + + +def _normalize_departments(value) -> List[dict]: + if not isinstance(value, list): + return [] + items = [] + for dept in value: + if not isinstance(dept, dict): + continue + name = _as_str(dept.get('name')) + if not name: + continue + items.append({ + 'name': name, + 'description': _as_str(dept.get('description')), + 'projects': _as_str_list(dept.get('projects')), }) return items diff --git a/meeting_memory/graph_store.py b/meeting_memory/graph_store.py index bb2c8c2..806c432 100644 --- a/meeting_memory/graph_store.py +++ b/meeting_memory/graph_store.py @@ -4,6 +4,7 @@ import logging import re import time from datetime import datetime, timezone +from enum import Enum from typing import Any, Dict, List, Optional from meeting_memory.config import config @@ -32,6 +33,45 @@ def _keyword_score(text: str, question: str) -> float: return hits / len(terms) +class _EntityType(str, Enum): + DEPARTMENT = 'Department' + PROJECT = 'Project' + METRIC = 'Metric' + PERSON = 'Person' + SYSTEM = 'System' + DOCUMENT = 'Document' + PARTICIPANT = 'participant' + UNKNOWN = 'Unknown' + + +_ENTITY_TYPE_ALIASES = { + '组织': 'Department', + 'organization': 'Department', + '部门': 'Department', + '指标': 'Metric', + 'kpi': 'Metric', + '项目': 'Project', +} + + +def _canonical_entity_type(raw: str) -> str: + normalized = raw.strip() + if normalized in _ENTITY_TYPE_ALIASES: + return _ENTITY_TYPE_ALIASES[normalized] + for member in _EntityType: + if member.value.lower() == normalized.lower(): + return member.value + return _EntityType.UNKNOWN.value + + +def _neo4j_labels(entity_type: str) -> list[str]: + canonical = _canonical_entity_type(entity_type) + labels = ['Entity'] + if canonical != _EntityType.UNKNOWN.value: + labels.append(canonical) + return labels + + def _keyword_terms(text: str) -> List[str]: normalized = (text or '').lower() raw_terms = re.findall(r'[a-z0-9]+|[\u4e00-\u9fff]{2,}', normalized) @@ -151,11 +191,11 @@ class Neo4jGraphStore: 'CREATE CONSTRAINT meeting_id IF NOT EXISTS FOR (m:Meeting) REQUIRE m.meeting_id IS UNIQUE', 'CREATE CONSTRAINT episode_id IF NOT EXISTS FOR (e:Episode) REQUIRE e.episode_id IS UNIQUE', 'CREATE CONSTRAINT entity_name IF NOT EXISTS FOR (e:Entity) REQUIRE e.name IS UNIQUE', - 'CREATE CONSTRAINT fact_id IF NOT EXISTS FOR (f:Fact) REQUIRE f.fact_id IS UNIQUE', 'CREATE INDEX meeting_title IF NOT EXISTS FOR (m:Meeting) ON (m.title)', 'CREATE INDEX episode_title IF NOT EXISTS FOR (e:Episode) ON (e.title)', 'CREATE INDEX entity_type IF NOT EXISTS FOR (e:Entity) ON (e.entity_type)', - 'CREATE INDEX fact_predicate IF NOT EXISTS FOR (f:Fact) ON (f.predicate)', + 'CREATE INDEX relates_to_name IF NOT EXISTS FOR ()-[r:RELATES_TO]-() ON (r.name)', + 'CREATE INDEX relates_to_fact IF NOT EXISTS FOR ()-[r:RELATES_TO]-() ON (r.fact)', ] for statement in statements: self.run_query(statement) @@ -167,11 +207,14 @@ class Neo4jGraphStore: CALL () { MATCH (m:Meeting) RETURN count(m) AS meetings } CALL () { MATCH (ep:Episode) RETURN count(ep) AS episodes } CALL () { MATCH (e:Entity) RETURN count(e) AS entities } - CALL () { MATCH (f:Fact) RETURN count(f) AS facts } - RETURN meetings, episodes, entities, facts + CALL () { MATCH ()-[r:RELATES_TO]->() RETURN count(r) AS relations } + CALL () { MATCH (d:Department) RETURN count(d) AS departments } + CALL () { MATCH (p:Project) RETURN count(p) AS projects } + CALL () { MATCH (m:Metric) RETURN count(m) AS metrics } + RETURN meetings, episodes, entities, relations, departments, projects, metrics ''') if not rows: - return {'enabled': False, 'meetings': 0, 'episodes': 0, 'entities': 0, 'facts': 0} + return {'enabled': False} return {'enabled': True, **rows[0]} # ==================== Entity Dedup (from Graphiti) ==================== @@ -218,19 +261,16 @@ class Neo4jGraphStore: def get_facts_between(self, source_name: str, target_name: str) -> List[Dict[str, Any]]: return self.run_query(''' - MATCH (s:Entity {name: $source_name})-[source_rel:FACT_SOURCE]->(f:Fact) - WHERE (f)-[:FACT_TARGET]->(:Entity {name: $target_name}) - RETURN f.fact_id AS fact_id, - f.fact AS fact, - f.predicate AS predicate, - f.description AS description, - f.qualifiers AS qualifiers, - f.confidence AS confidence, - f.valid_at AS valid_at, - f.invalid_at AS invalid_at, - f.expired_at AS expired_at, - f.meeting_id AS meeting_id - ORDER BY coalesce(f.valid_at, '') DESC + MATCH (s:Entity {name: $source_name})-[r:RELATES_TO]->(t:Entity {name: $target_name}) + RETURN r.name AS relation_type, + r.fact AS fact, + r.qualifiers AS qualifiers, + r.confidence AS confidence, + r.valid_at AS valid_at, + r.invalid_at AS invalid_at, + r.expired_at AS expired_at, + r.meeting_id AS meeting_id + ORDER BY coalesce(r.valid_at, '') DESC ''', source_name=source_name, target_name=target_name) def search_related_facts( @@ -240,30 +280,23 @@ class Neo4jGraphStore: return [] query_embedding = embedding_service.embed_text(fact_text) rows = self.run_query(''' - MATCH (ep:Episode)-[:HAS_FACT]->(f:Fact) - OPTIONAL MATCH (s:Entity)-[:FACT_SOURCE]->(f) - OPTIONAL MATCH (f)-[:FACT_TARGET]->(o:Entity) - RETURN f.fact_id AS fact_id, - f.fact AS fact, - f.predicate AS predicate, - f.description AS description, - f.fact_embedding AS fact_embedding, - f.valid_at AS valid_at, - f.invalid_at AS invalid_at, - f.expired_at AS expired_at, - coalesce(s.name, '') AS source_name, - coalesce(o.name, '') AS target_name + MATCH (s:Entity)-[r:RELATES_TO]->(t:Entity) + RETURN r.fact AS fact, + r.name AS relation_type, + r.fact_embedding AS fact_embedding, + r.valid_at AS valid_at, + r.invalid_at AS invalid_at, + r.expired_at AS expired_at, + s.name AS source_name, + t.name AS target_name ''') scored = [] for row in rows: score = _cosine_similarity(query_embedding, row.get('fact_embedding', [])) if score > 0.3: scored.append({ - 'idx': len(scored), - 'fact_id': row.get('fact_id', ''), 'fact': row.get('fact', ''), - 'predicate': row.get('predicate', ''), - 'description': row.get('description', ''), + 'relation_type': row.get('relation_type', ''), 'source_name': row.get('source_name', ''), 'target_name': row.get('target_name', ''), 'valid_at': row.get('valid_at', ''), @@ -274,15 +307,15 @@ class Neo4jGraphStore: scored.sort(key=lambda r: r['score'], reverse=True) return scored[:limit] - def mark_fact_expired(self, fact_id: str, expired_at: str | None = None): + def mark_relation_expired(self, source_name: str, target_name: str, relation_type: str, expired_at: str | None = None): if not expired_at: expired_at = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ') self.run_query(''' - MATCH (f:Fact {fact_id: $fact_id}) - SET f.expired_at = $expired_at, - f.invalid_at = $expired_at, - f.updated_at = datetime() - ''', fact_id=fact_id, expired_at=expired_at) + MATCH (s:Entity {name: $source_name})-[r:RELATES_TO {name: $relation_type}]->(t:Entity {name: $target_name}) + SET r.expired_at = $expired_at, + r.invalid_at = $expired_at, + r.updated_at = datetime() + ''', source_name=source_name, target_name=target_name, relation_type=relation_type, expired_at=expired_at) # ==================== Core Write Operations ==================== @@ -335,19 +368,28 @@ class Neo4jGraphStore: ) for relation in meeting_data.get('relations', []): - self._upsert_relation(meeting_id, relation, meeting_data.get('date', '')) + self._upsert_direct_edge(meeting_id, relation, meeting_data.get('date', '')) + + self._upsert_hierarchy(meeting_id, meeting_data) + + for metric in meeting_data.get('metrics', []): + self._upsert_metric_node(meeting_id, metric, meeting_data.get('date', '')) def _upsert_entity(self, meeting_id: str, entity: dict) -> None: name = entity.get('name', '').strip() if not name: return + raw_type = entity.get('entity_type', '').strip() + labels = _neo4j_labels(raw_type) summary = self._entity_summary(entity) name_embedding = embedding_service.embed_text(summary or name) - self.run_query(''' - MATCH (:Meeting {meeting_id: $meeting_id})-[:HAS_EPISODE]->(ep:Episode {episode_id: $meeting_id}) - MERGE (e:Entity {name: $name}) + set_labels = ' SET ' + ', '.join(f'e:{label}' for label in labels[1:]) if len(labels) > 1 else '' + self.run_query(f''' + MATCH (:Meeting {{meeting_id: $meeting_id}})-[:HAS_EPISODE]->(ep:Episode {{episode_id: $meeting_id}}) + MERGE (e:Entity {{name: $name}}) + {set_labels} SET e.entity_type = CASE - WHEN $entity_type <> '' THEN $entity_type + WHEN $type <> '' THEN $type ELSE coalesce(e.entity_type, '') END, e.description = CASE @@ -367,92 +409,152 @@ class Neo4jGraphStore: ''', meeting_id=meeting_id, name=name, - entity_type=entity.get('entity_type', ''), + type=raw_type, description=entity.get('description', ''), summary=summary, name_embedding=name_embedding, ) - def _upsert_relation(self, meeting_id: str, relation: dict, meeting_date: str) -> None: - subject = relation.get('subject', '').strip() - predicate = relation.get('predicate', '').strip() - obj = relation.get('object', '').strip() - if not subject or not predicate or not obj: + def _upsert_direct_edge(self, meeting_id: str, relation: dict, meeting_date: str) -> None: + source = relation.get('source_entity_name', '').strip() + target = relation.get('target_entity_name', '').strip() + rtype = relation.get('relation_type', '').strip() + if not source or not target or not rtype: return self._upsert_entity( meeting_id, - {'name': subject, 'entity_type': relation.get('subject_type', ''), 'description': ''}, + {'name': source, 'entity_type': '', 'description': ''}, ) self._upsert_entity( meeting_id, - {'name': obj, 'entity_type': relation.get('object_type', ''), 'description': ''}, + {'name': target, 'entity_type': '', 'description': ''}, ) - fact_text = self._fact_text(relation) - fact_id = hashlib.md5( - f'{meeting_id}|{subject}|{predicate}|{obj}'.encode('utf-8') - ).hexdigest() + fact_text = self._relation_text(relation) fact_embedding = embedding_service.embed_text(fact_text) self.run_query(''' - MATCH (:Meeting {meeting_id: $meeting_id})-[:HAS_EPISODE]->(ep:Episode {episode_id: $meeting_id}) - MATCH (s:Entity {name: $subject}) - MATCH (o:Entity {name: $object}) - MERGE (f:Fact {fact_id: $fact_id}) - SET f.fact = $fact, - f.predicate = $predicate, - f.description = $description, - f.qualifiers = $qualifiers, - f.evidence = $evidence, - f.confidence = $confidence, - f.valid_at = $valid_at, - f.invalid_at = $invalid_at, - f.meeting_id = $meeting_id, - f.meeting_date = $meeting_date, - f.fact_embedding = $fact_embedding, - f.updated_at = datetime() - MERGE (ep)-[:HAS_FACT]->(f) - MERGE (s)-[:FACT_SOURCE]->(f) - MERGE (f)-[:FACT_TARGET]->(o) + MATCH (s:Entity {name: $source}) + MATCH (t:Entity {name: $target}) + MERGE (s)-[r:RELATES_TO {name: $rtype}]->(t) + SET r.fact = $fact, + r.fact_embedding = $fact_embedding, + r.evidence = $evidence, + r.qualifiers = $qualifiers, + r.confidence = $confidence, + r.valid_at = $valid_at, + r.invalid_at = $invalid_at, + r.meeting_id = $meeting_id, + r.meeting_date = $meeting_date, + r.updated_at = datetime() ''', meeting_id=meeting_id, - subject=subject, - predicate=predicate, - object=obj, - fact_id=fact_id, + source=source, + target=target, + rtype=rtype, fact=fact_text, - description=relation.get('description', ''), - qualifiers=relation.get('qualifiers', []), + fact_embedding=fact_embedding, evidence=relation.get('evidence', ''), + qualifiers=relation.get('qualifiers', []), confidence=relation.get('confidence', 0.0), valid_at=relation.get('valid_at', ''), invalid_at=relation.get('invalid_at', ''), meeting_date=meeting_date, - fact_embedding=fact_embedding, + ) + + def _upsert_hierarchy(self, meeting_id: str, meeting_data: dict) -> None: + entities_map = {e['name']: e for e in meeting_data.get('entities', []) if e.get('name')} + + for rel in meeting_data.get('relations', []): + rtype = rel.get('relation_type', '') + if rtype not in ('HAS_PROJECT', 'HAS_METRIC', 'PART_OF'): + continue + source = rel.get('source_entity_name', '') + target = rel.get('target_entity_name', '') + if not source or not target: + continue + + if rtype == 'HAS_PROJECT' or rtype == 'PART_OF': + self.run_query(''' + MATCH (s:Entity {name: $source}) + MATCH (t:Entity {name: $target}) + MERGE (s)-[r:HAS_PROJECT]->(t) + SET r.updated_at = datetime(), + r.meeting_id = $meeting_id + ''', source=source, target=target, meeting_id=meeting_id) + elif rtype == 'HAS_METRIC': + self.run_query(''' + MATCH (s:Entity {name: $source}) + MATCH (t:Entity {name: $target}) + MERGE (s)-[r:HAS_METRIC]->(t) + SET r.updated_at = datetime(), + r.meeting_id = $meeting_id + ''', source=source, target=target, meeting_id=meeting_id) + + departments = meeting_data.get('departments', []) + for dept in departments: + dept_name = dept.get('name', '').strip() + if not dept_name or dept_name not in entities_map: + continue + for proj_name in dept.get('projects', []): + if proj_name in entities_map: + self.run_query(''' + MATCH (s:Entity {name: $source}) + MATCH (t:Entity {name: $target}) + MERGE (s)-[r:HAS_PROJECT]->(t) + SET r.updated_at = datetime(), + r.meeting_id = $meeting_id + ''', source=dept_name, target=proj_name, meeting_id=meeting_id) + + def _upsert_metric_node(self, meeting_id: str, metric: dict, meeting_date: str) -> None: + name = metric.get('metric_name', '').strip() + if not name: + return + entity = { + 'name': name, + 'entity_type': 'Metric', + 'description': f"{metric.get('value', '')} ({metric.get('unit', '')})" if metric.get('unit') else metric.get('value', ''), + } + self._upsert_entity(meeting_id, entity) + + self.run_query(''' + MATCH (e:Entity {name: $name}) + SET e.current_value = $value, + e.target = $target, + e.trend = $trend, + e.unit = $unit, + e.owner = $owner, + e.updated_at = datetime() + ''', + name=name, + value=metric.get('value', ''), + target=metric.get('target', ''), + trend=metric.get('trend', ''), + unit=metric.get('unit', ''), + owner=metric.get('owner', ''), ) def remove_meeting_subgraph(self, meeting_id: str) -> None: if not self.enabled: return + # Phase 1: detach all entities mentioned by this episode self.run_query(''' - MATCH (m:Meeting {meeting_id: $meeting_id})-[:HAS_EPISODE]->(ep:Episode {episode_id: $meeting_id}) + MATCH (m:Meeting {meeting_id: $meeting_id})-[:HAS_EPISODE]->(ep:Episode) OPTIONAL MATCH (ep)-[mention:MENTIONS]->(entity:Entity) - OPTIONAL MATCH (ep)-[has_fact:HAS_FACT]->(fact:Fact) - OPTIONAL MATCH (fact)-[target_rel:FACT_TARGET]->(:Entity) - OPTIONAL MATCH (:Entity)-[source_rel:FACT_SOURCE]->(fact) - DELETE mention, has_fact, target_rel, source_rel - WITH m, ep, collect(DISTINCT fact) AS facts, collect(DISTINCT entity) AS entities - FOREACH (fact IN facts | DELETE fact) - DELETE ep, m - WITH entities - UNWIND entities AS entity - WITH DISTINCT entity WHERE entity IS NOT NULL - OPTIONAL MATCH (entity)<-[m1:MENTIONS]-(:Episode) - OPTIONAL MATCH (entity)-[m2:FACT_SOURCE|FACT_TARGET]-(:Fact) - WITH entity, count(m1) + count(m2) AS refs - WHERE refs = 0 - DELETE entity + OPTIONAL MATCH (entity)-[er]-() + DELETE mention, er + ''', meeting_id=meeting_id) + # Phase 2: delete orphan entities no longer mentioned by any episode + self.run_query(''' + MATCH (entity:Entity) + WHERE NOT (entity)<-[:MENTIONS]-(:Episode) + DETACH DELETE entity + ''') + # Phase 3: delete episode and meeting + self.run_query(''' + MATCH (m:Meeting {meeting_id: $meeting_id})-[:HAS_EPISODE]->(ep:Episode) + DETACH DELETE ep, m ''', meeting_id=meeting_id) # ==================== Retrieval ==================== @@ -533,19 +635,23 @@ class Neo4jGraphStore: return [] return self.run_query(''' MATCH (n) - WHERE n:Meeting OR n:Episode OR n:Entity OR n:Fact - WITH [lbl IN labels(n) WHERE lbl IN ['Meeting','Episode','Entity','Fact']][0] AS kind + WHERE n:Meeting OR n:Episode OR n:Entity OR n:Department OR n:Project OR n:Metric + WITH [lbl IN labels(n) WHERE lbl IN ['Meeting','Episode','Entity','Department','Project','Metric']][0] AS kind RETURN kind, count(*) AS count ORDER BY count DESC ''') def get_entity_types(self) -> List[Dict[str, Any]]: if not self.enabled: return [] - return self.run_query(''' + rows = self.run_query(''' MATCH (e:Entity) WHERE coalesce(e.entity_type, '') <> '' RETURN e.entity_type AS entity_type, count(*) AS count ORDER BY count DESC ''') + return [ + {'entity_type': _canonical_entity_type(r.get('entity_type', '')), 'count': r.get('count', 0)} + for r in rows + ] def get_graph_snapshot( self, @@ -559,34 +665,39 @@ class Neo4jGraphStore: return {'nodes': [], 'edges': [], 'stats': {'enabled': False}} keyword_terms = _keyword_terms(query) if query else [] - raw_nodes = self.run_query(''' + raw_nodes = self.run_query(f''' MATCH (n) - WHERE (n:Meeting OR n:Episode OR n:Entity OR n:Fact) - AND ($kinds = [] OR [lbl IN labels(n) WHERE lbl IN ['Meeting','Episode','Entity','Fact']][0] IN $kinds) + WHERE (n:Meeting OR n:Episode OR n:Entity) + AND ($kinds = [] OR any(lbl IN labels(n) WHERE lbl IN $kinds)) AND ($terms = [] OR (n:Meeting AND any(t IN $terms WHERE toLower(coalesce(n.title,'')) CONTAINS t OR toLower(coalesce(n.summary,'')) CONTAINS t)) OR (n:Episode AND any(t IN $terms WHERE toLower(coalesce(n.title,'')) CONTAINS t OR toLower(coalesce(n.content,'')) CONTAINS t)) OR (n:Entity AND any(t IN $terms WHERE toLower(coalesce(n.name,'')) CONTAINS t OR toLower(coalesce(n.summary,'')) CONTAINS t OR toLower(coalesce(n.description,'')) CONTAINS t)) - OR (n:Fact AND any(t IN $terms WHERE toLower(coalesce(n.fact,'')) CONTAINS t OR toLower(coalesce(n.predicate,'')) CONTAINS t OR toLower(coalesce(n.description,'')) CONTAINS t)) ) AND ($types = [] OR NOT n:Entity OR coalesce(n.entity_type, '') IN $types) OPTIONAL MATCH (n)-[r]-() RETURN n.meeting_id AS meeting_id, n.episode_id AS episode_id, n.name AS entity_name, - n.fact_id AS fact_id, n.title AS title, n.summary AS summary, n.date AS date, n.entity_type AS entity_type, n.description AS description, - n.predicate AS predicate, - n.fact AS fact, - n.confidence AS confidence, n.meeting_date AS meeting_date, - [lbl IN labels(n) WHERE lbl IN ['Meeting','Episode','Entity','Fact']][0] AS kind, + n.current_value AS current_value, + n.target AS target_value, + n.trend AS trend, + CASE + WHEN n:Meeting THEN 'Meeting' + WHEN n:Episode THEN 'Episode' + WHEN n:Department THEN 'Department' + WHEN n:Project THEN 'Project' + WHEN n:Metric THEN 'Metric' + ELSE 'Entity' + END AS kind, count(DISTINCT r) AS degree - ORDER BY degree DESC, coalesce(n.title, n.name, n.fact) ASC + ORDER BY degree DESC, coalesce(n.title, n.name) ASC LIMIT $limit_nodes ''', terms=keyword_terms, @@ -607,29 +718,30 @@ class Neo4jGraphStore: elif kind == 'Episode': raw_id = row.get('episode_id', '') label = row.get('title', '') or raw_id - elif kind == 'Entity': + elif kind in ('Entity', 'Department', 'Project', 'Metric'): raw_id = row.get('entity_name', '') label = raw_id - elif kind == 'Fact': - raw_id = row.get('fact_id', '') - label = row.get('predicate', '') or row.get('fact', '') or raw_id else: continue if not raw_id: continue nid = f'{kind}:{raw_id}' all_raw_ids.add(raw_id) - nodes.append({ + node = { 'id': nid, 'label': label, 'kind': kind, - 'entity_type': row.get('entity_type', '') if kind == 'Entity' else '', + 'entity_type': row.get('entity_type', '') if kind in ('Entity', 'Department', 'Project', 'Metric') else '', 'description': row.get('description', '') or row.get('summary', '') or '', 'date': row.get('date', '') or row.get('meeting_date', '') or '', 'degree': row.get('degree', 0), - 'fact': row.get('fact', '') if kind == 'Fact' else '', 'summary': row.get('summary', '') or '', - }) + } + if kind == 'Metric': + node['current_value'] = row.get('current_value', '') + node['target'] = row.get('target_value', '') + node['trend'] = row.get('trend', '') + nodes.append(node) if not nodes: return {'nodes': [], 'edges': [], 'stats': self.get_stats()} @@ -637,44 +749,39 @@ class Neo4jGraphStore: ids_list = list(all_raw_ids) edges_raw = self.run_query(''' MATCH (s)-[r]->(t) - WHERE type(r) IN ['HAS_EPISODE','MENTIONS','HAS_FACT','FACT_SOURCE','FACT_TARGET'] + WHERE type(r) IN ['HAS_EPISODE','MENTIONS','RELATES_TO','HAS_PROJECT','HAS_METRIC'] AND ( (s:Meeting AND s.meeting_id IN $ids) OR (s:Episode AND s.episode_id IN $ids) OR (s:Entity AND s.name IN $ids) - OR (s:Fact AND s.fact_id IN $ids) ) AND ( (t:Meeting AND t.meeting_id IN $ids) OR (t:Episode AND t.episode_id IN $ids) OR (t:Entity AND t.name IN $ids) - OR (t:Fact AND t.fact_id IN $ids) ) RETURN type(r) AS predicate, + r.name AS relation_name, + r.fact AS relation_fact, + r.confidence AS relation_confidence, + r.meeting_date AS relation_date, + r.meeting_id AS relation_meeting_id, CASE WHEN s:Meeting THEN s.meeting_id WHEN s:Episode THEN s.episode_id - WHEN s:Entity THEN s.name - WHEN s:Fact THEN s.fact_id END AS source_raw, + WHEN s:Entity THEN s.name END AS source_raw, CASE WHEN t:Meeting THEN t.meeting_id WHEN t:Episode THEN t.episode_id - WHEN t:Entity THEN t.name - WHEN t:Fact THEN t.fact_id END AS target_raw, + WHEN t:Entity THEN t.name END AS target_raw, CASE WHEN s:Meeting THEN 'Meeting' WHEN s:Episode THEN 'Episode' - WHEN s:Entity THEN 'Entity' WHEN s:Fact THEN 'Fact' END AS source_kind, + WHEN s:Department THEN 'Department' + WHEN s:Project THEN 'Project' + WHEN s:Metric THEN 'Metric' + WHEN s:Entity THEN 'Entity' END AS source_kind, CASE WHEN t:Meeting THEN 'Meeting' WHEN t:Episode THEN 'Episode' - WHEN t:Entity THEN 'Entity' WHEN t:Fact THEN 'Fact' END AS target_kind, - CASE WHEN s:Fact THEN coalesce(s.predicate, '') - WHEN t:Fact THEN coalesce(t.predicate, '') ELSE '' END AS fact_predicate, - CASE WHEN s:Fact THEN coalesce(s.fact, '') - WHEN t:Fact THEN coalesce(t.fact, '') ELSE '' END AS fact_text, - CASE WHEN s:Fact THEN coalesce(s.description, '') - WHEN t:Fact THEN coalesce(t.description, '') ELSE '' END AS fact_description, - CASE WHEN s:Fact THEN coalesce(s.confidence, 0.0) - WHEN t:Fact THEN coalesce(t.confidence, 0.0) ELSE 0.0 END AS fact_confidence, - CASE WHEN s:Fact THEN coalesce(s.meeting_date, '') - WHEN t:Fact THEN coalesce(t.meeting_date, '') ELSE '' END AS fact_date, - CASE WHEN s:Fact THEN coalesce(s.meeting_id, '') - WHEN t:Fact THEN coalesce(t.meeting_id, '') ELSE '' END AS fact_meeting_id + WHEN t:Department THEN 'Department' + WHEN t:Project THEN 'Project' + WHEN t:Metric THEN 'Metric' + WHEN t:Entity THEN 'Entity' END AS target_kind LIMIT $limit_edges ''', ids=list(all_raw_ids), limit_edges=limit_edges) @@ -700,11 +807,11 @@ class Neo4jGraphStore: 'source': f'{sk}:{row["source_raw"]}' if sk and row.get('source_raw') else '', 'target': f'{tk}:{row["target_raw"]}' if tk and row.get('target_raw') else '', 'predicate': row.get('predicate', ''), - 'fact': row.get('fact_text', '') or row.get('fact_description', '') or '', - 'description': row.get('fact_description', '') or '', - 'confidence': row.get('fact_confidence', 0.0), - 'date': row.get('fact_date', '') or '', - 'meeting_id': row.get('fact_meeting_id', '') or '', + 'relation_name': row.get('relation_name', ''), + 'fact': row.get('relation_fact', '') or '', + 'confidence': row.get('relation_confidence', 0.0), + 'date': row.get('relation_date', '') or '', + 'meeting_id': row.get('relation_meeting_id', '') or '', }) return {'nodes': nodes, 'edges': edges, 'stats': self.get_stats(), 'query': query} @@ -729,24 +836,15 @@ class Neo4jGraphStore: def _load_fact_candidates(self) -> List[Dict[str, Any]]: return self.run_query(''' - MATCH (ep:Episode)-[:HAS_FACT]->(f:Fact) - OPTIONAL MATCH (s:Entity)-[:FACT_SOURCE]->(f) - OPTIONAL MATCH (f)-[:FACT_TARGET]->(o:Entity) + MATCH (s:Entity)-[r:RELATES_TO]->(t:Entity) + OPTIONAL MATCH (ep:Episode)-[:MENTIONS]->(s) + WITH s, r, t, collect(DISTINCT ep.date) AS dates, collect(DISTINCT ep.title) AS titles RETURN 'fact' AS kind, - coalesce(s.name + ' -[' + coalesce(f.predicate, '') + ']-> ' + o.name, f.fact) AS title, - coalesce( - f.description + CASE - WHEN size(coalesce(f.qualifiers, [])) > 0 - THEN ' | ' + reduce(acc = '', item IN f.qualifiers | - acc + CASE WHEN acc = '' THEN item ELSE '; ' + item END - ) - ELSE '' - END, - f.fact, '' - ) AS text, - ep.date AS date, - ep.title AS meeting_title, - f.fact_embedding AS embedding + s.name + ' -[' + r.name + ']-> ' + t.name AS title, + coalesce(r.fact, '') AS text, + head(dates) AS date, + head(titles) AS meeting_title, + r.fact_embedding AS embedding ''') def _load_entity_candidates(self) -> List[Dict[str, Any]]: @@ -774,24 +872,21 @@ class Neo4jGraphStore: @staticmethod def _entity_summary(entity: dict) -> str: - entity_type = entity.get('entity_type', '').strip() + entity_type = _canonical_entity_type(entity.get('entity_type', '').strip()) name = entity.get('name', '').strip() description = entity.get('description', '').strip() parts = [part for part in [entity_type, name, description] if part] return ' | '.join(parts) @staticmethod - def _fact_text(relation: dict) -> str: - subject = relation.get('subject', '').strip() - predicate = relation.get('predicate', '').strip() - obj = relation.get('object', '').strip() - description = relation.get('description', '').strip() - fact = relation.get('fact', '').strip() or f'{subject} {predicate} {obj}'.strip() + def _relation_text(relation: dict) -> str: + source = relation.get('source_entity_name', '').strip() + rtype = relation.get('relation_type', '').strip() + target = relation.get('target_entity_name', '').strip() + fact = relation.get('fact', '').strip() or f'{source} {rtype} {target}'.strip() qualifiers = relation.get('qualifiers', []) qualifier_text = '; '.join(item for item in qualifiers if item) parts = [fact] - if description: - parts.append(description) if qualifier_text: parts.append(qualifier_text) return '. '.join(parts) @@ -808,6 +903,7 @@ class Neo4jGraphStore: 'action_items': meeting_data.get('action_items', []), 'metrics': meeting_data.get('metrics', []), 'decisions': meeting_data.get('decisions', []), + 'departments': meeting_data.get('departments', []), 'original_text': meeting_data.get('_original_text', ''), } return json.dumps(payload, ensure_ascii=False) diff --git a/meeting_memory/meeting_processor.py b/meeting_memory/meeting_processor.py index 95a4371..5349907 100644 --- a/meeting_memory/meeting_processor.py +++ b/meeting_memory/meeting_processor.py @@ -125,7 +125,7 @@ class MeetingProcessor: # Step 3: 提取标题、日期、参与人等元信息 report(3, total_steps, '抽取会议元信息(标题、日期、参与者等)') - meta_info = self._extract_monolithic(text) + meta_info = self._extract_monolithic(text, stream=interactive) if not meta_info: logger.error('Failed to extract meeting metadata') return None @@ -136,9 +136,10 @@ class MeetingProcessor: # Step 4: 抽取实体节点(LLM 调用 1) report(4, total_steps, '第 1 步实体抽取:识别会议中提及的实体') + use_stream = interactive previous_episodes = self._get_previous_episodes_context(data_dict) extracted_entities = extract_entities_from_text( - text, previous_episodes=previous_episodes, stream=True + text, previous_episodes=previous_episodes, stream=use_stream ) logger.info('Extracted %d entities from meeting', len(extracted_entities)) if not extracted_entities: @@ -158,7 +159,7 @@ class MeetingProcessor: text, resolved_entities, reference_time=reference_time, previous_episodes=previous_episodes, - stream=True, + stream=use_stream, ) logger.info('Extracted %d facts from meeting', len(extracted_facts)) @@ -337,9 +338,9 @@ class MeetingProcessor: normalized = text.strip().replace('\r\n', '\n') return hashlib.sha256(normalized.encode('utf-8')).hexdigest() - def _extract_monolithic(self, text: str) -> Optional[MeetingExtraction]: + def _extract_monolithic(self, text: str, *, stream: bool = True) -> Optional[MeetingExtraction]: try: - return monolithic_extract(text, stream=True) + return monolithic_extract(text, stream=stream) except Exception as exc: logger.error('LLM extraction failed: %s', exc) return None diff --git a/meeting_memory/prompts/extract_edges.py b/meeting_memory/prompts/extract_edges.py index ae79bd6..113505a 100644 --- a/meeting_memory/prompts/extract_edges.py +++ b/meeting_memory/prompts/extract_edges.py @@ -47,11 +47,18 @@ def extract_facts(context: dict[str, Any]) -> list[dict]: }}] 4. relation_type 避免使用"关联""涉及"等空泛词,优先使用具体谓词: - 负责、汇报、目标值、当前值、低于、高于、要求、督导、推进、支撑、依赖、计划、完成、截止于、参与、隶属于、分管、协调、审批 + 负责、汇报、目标值、当前值、低于、高于、要求、督导、推进、支撑、依赖、计划、完成、截止于、参与、隶属于、分管、协调、审批 -5. fact 必须是一句完整的自然语言事实,保留所有具体信息(人名、数值、产品名、地点等)。 +5. 层次关系(结构隶属)使用以下固定 relation_type: + HAS_PROJECT: 部门管辖项目(Department -> Project) + HAS_METRIC: 项目拥有指标(Project -> Metric) + PART_OF: 实体属于某个上级实体 -6. 如果根据上下文可以判断事实的开始/结束时间,填入 valid_at / invalid_at。 +6. 同一对实体之间可能既有层次关系(HAS_PROJECT)也有事实关系(负责、汇报),需要分别抽取。 + +7. fact 必须是一句完整的自然语言事实,保留所有具体信息(人名、数值、产品名、地点等)。 + +8. 如果根据上下文可以判断事实的开始/结束时间,填入 valid_at / invalid_at。 """ return [ {'role': 'system', 'content': '你是一个专业的事实关系抽取专家。从会议记录中抽取实体间的结构化事实关系。'}, diff --git a/meeting_memory/prompts/extract_nodes.py b/meeting_memory/prompts/extract_nodes.py index c231508..e35f64d 100644 --- a/meeting_memory/prompts/extract_nodes.py +++ b/meeting_memory/prompts/extract_nodes.py @@ -3,7 +3,7 @@ from typing import Any SYSTEM_PROMPT = ( '你是会议纪要实体抽取专家。' - '从会议记录中抽取明确的实体节点,包括人物、组织、地点、项目、指标等。' + '从会议记录中抽取明确的实体节点,包括部门(Department)、项目(Project)、指标(Metric)、人物(Person)、系统(System)、文档(Document)等。' '不要抽取抽象概念、情感、时间日期或泛泛的名词。' ) @@ -45,7 +45,10 @@ def extract_entities(context: dict[str, Any]) -> list[dict]: 6. description 写一段对该实体的简要描述(20字以内)。 7. evidence 从原文中摘录提及该实体的关键短句。 -注意:实体类型建议使用 Person(人物)、Organization(组织)、Location(地点)、Project(项目)、Metric(指标)、System(系统)、Document(文档)等。 +注意:实体类型建议使用 Department(部门)、Project(项目)、Metric(指标)、Person(人物)、System(系统)、Document(文档)等。请确保: +- 部门(Department):会议中提到的具体部门名称,如"技术部"、"市场部"。 +- 项目(Project):部门负责的具体项目名称。 +- 指标(Metric):项目中提到的具体量化指标,如"响应时间"、"完成率"。 """ return [ {'role': 'system', 'content': SYSTEM_PROMPT}, diff --git a/meeting_memory/web_demo/static/app.js b/meeting_memory/web_demo/static/app.js index 3fb5c6f..a8f1fcb 100644 --- a/meeting_memory/web_demo/static/app.js +++ b/meeting_memory/web_demo/static/app.js @@ -85,7 +85,7 @@ function renderStats(graph = {}, state = {}) { { label: "Neo4j", value: graph.enabled ? "在线" : "离线", icon: "⬡", color: graph.enabled ? "#34c759" : "#b3261e" }, { label: "会议", value: graph.meetings ?? 0, icon: "📋", color: "#4a90d9" }, { label: "实体", value: graph.entities ?? 0, icon: "◆", color: "#53c2da" }, - { label: "关系", value: graph.facts ?? 0, icon: "↗", color: "#ff9500" }, + { label: "关系", value: graph.relations ?? 0, icon: "↗", color: "#ff9500" }, { label: "行动项", value: state.action_items_tracked ?? 0, icon: "☐", color: "#7f8bff" }, { label: "指标", value: state.metrics_tracked ?? 0, icon: "📊", color: "#af52de" }, ]; diff --git a/meeting_memory/web_demo/static/graph.js b/meeting_memory/web_demo/static/graph.js index 47e025c..4f037de 100644 --- a/meeting_memory/web_demo/static/graph.js +++ b/meeting_memory/web_demo/static/graph.js @@ -199,7 +199,6 @@ function renderGraph(payload) { text.setAttribute("y", r + 16); text.setAttribute("text-anchor", "middle"); text.setAttribute("font-size", "11"); - text.setAttribute("fill", "#22264d"); text.setAttribute("data-type", "node-label"); text.textContent = truncate(node.label, TRUNCATE_LENGTH); g.appendChild(text); @@ -390,21 +389,22 @@ function renderGraph(payload) { ${node.date ? `${h(node.date)}` : ""} 关系 ${h(related.length)} `; - } else if (kind === "fact") { - body = ` -
${h(node.fact || node.description || "暂无描述")}
-${h(node.description || "暂无描述")}
当前值:${h(node.current_value)}
` : ""} + ${node.target ? `目标值:${h(node.target)}
` : ""} + ${node.unit ? `单位:${h(node.unit)}
` : ""} + ${node.trend ? `趋势:${h(node.trend)}
` : ""} +