# meeting_memory/scripts/migrate_v1_to_v2.py

"""
Migration script: v1 (flat Entity + Fact nodes) → v2 (composite labels + direct edges)
Steps:
1. Add composite Neo4j labels to existing Entity nodes based on entity_type
2. Convert Fact nodes to RELATES_TO edges between Entity nodes
3. Verify data integrity
"""
import logging
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from meeting_memory.graph_store import graph_store, _canonical_entity_type, _EntityType
# Root logging for interactive runs: INFO and above, timestamp + level prefix.
logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(name='migrate')
def get_type_label_map() -> dict[str, str]:
    """Return the mapping from canonical ``entity_type`` value to Neo4j label.

    The label is the composite label placed next to ``:Entity`` in the v2
    schema (e.g. an Entity of the department type also gets ``:Department``).
    """
    member_to_label = (
        (_EntityType.DEPARTMENT, 'Department'),
        (_EntityType.PROJECT, 'Project'),
        (_EntityType.METRIC, 'Metric'),
        (_EntityType.PERSON, 'Person'),
        (_EntityType.SYSTEM, 'System'),
        (_EntityType.DOCUMENT, 'Document'),
    )
    return {member.value: label for member, label in member_to_label}
def _add_label_for_type(canonical_type: str, label: str) -> int:
    """Add ``:label`` to every Entity whose entity_type equals *canonical_type*.

    Returns the number of matched Entity nodes (0 when none matched).
    """
    # One round-trip instead of count-then-SET. count() without grouping keys
    # still yields a single row when nothing matched, so rows[0] exists then too.
    # The label is interpolated rather than passed as a parameter; it always
    # comes from get_type_label_map(), never from data.
    rows = graph_store.run_query(
        f'MATCH (e:Entity) WHERE e.entity_type = $etype SET e:{label} '
        'RETURN count(e) AS cnt',
        etype=canonical_type,
    )
    return rows[0]['cnt'] if rows else 0


def _redirect_aliases(aliases: tuple[str, ...], canonical: str, label: str) -> int:
    """Rewrite legacy entity_type spellings to *canonical* and add ``:label``.

    Returns how many Entity nodes were redirected across all *aliases*.
    """
    redirected = 0
    for alias in aliases:
        if alias == canonical:
            # The canonical value may itself be one of the legacy spellings
            # (e.g. a Chinese value). Those nodes were already labelled by the
            # canonical pass, so don't "redirect" or count them a second time.
            continue
        rows = graph_store.run_query(
            'MATCH (e:Entity {entity_type: $etype}) '
            f'SET e.entity_type = $canonical, e:{label} '
            'RETURN count(e) AS cnt',
            etype=alias,
            canonical=canonical,
        )
        count = rows[0]['cnt'] if rows else 0
        if count:
            logger.info(' Redirected %d entities from entity_type=%s -> %s', count, alias, label)
            redirected += count
    return redirected


def step1_add_composite_labels():
    """Add composite labels (e.g. ``:Department``) to existing Entity nodes.

    Two passes:

    1. Every Entity whose ``entity_type`` equals a canonical value from
       ``get_type_label_map()`` gets the matching label.
    2. Entities still using a legacy spelling are rewritten to the canonical
       ``entity_type`` and labelled in the same query.

    Adding a label a node already has is a no-op in Neo4j, so re-running is
    harmless. Already-labelled nodes are counted again in the logged total.
    """
    total = 0
    for canonical_type, label in get_type_label_map().items():
        count = _add_label_for_type(canonical_type, label)
        if count == 0:
            logger.info(' No Entity with entity_type=%s to migrate', canonical_type)
            continue
        logger.info(' Added :%s label to %d Entity nodes', label, count)
        total += count

    # Legacy spellings found in v1 data: 组织 = organization, 部门 = department,
    # 指标 = metric/indicator.
    legacy_aliases = (
        (('组织', 'Organization', '部门'), _EntityType.DEPARTMENT.value, 'Department'),
        (('指标', 'kpi', 'KPI'), _EntityType.METRIC.value, 'Metric'),
    )
    for aliases, canonical, label in legacy_aliases:
        total += _redirect_aliases(aliases, canonical, label)

    logger.info('Step 1 done: %d entities got composite labels', total)
def _upsert_edge_from_fact(record: dict) -> int:
    """Merge one RELATES_TO edge for a Fact row read by step 2.

    Returns the number of edges written. 0 means the source/target names no
    longer matched any Entity.
    """
    # '关联' ("related to") is the fallback name when a Fact has no predicate.
    rtype = record.get('relation_type', '') or '关联'
    # NOTE: the MERGE key is (source, name, target). Several Facts sharing the
    # same predicate between the same two entities collapse into ONE edge, and
    # the last Fact processed overwrites the properties of the earlier ones.
    # The embedding is only overwritten when the Fact actually had one,
    # matching the previous separate conditional update.
    rows = graph_store.run_query('''
        MATCH (s:Entity {name: $source})
        MATCH (t:Entity {name: $target})
        MERGE (s)-[r:RELATES_TO {name: $rtype}]->(t)
        SET r.fact = $fact,
            r.evidence = $evidence,
            r.qualifiers = $qualifiers,
            r.confidence = $confidence,
            r.valid_at = $valid_at,
            r.invalid_at = $invalid_at,
            r.meeting_id = $meeting_id,
            r.meeting_date = $meeting_date,
            r.fact_embedding = CASE WHEN size($embedding) > 0
                                    THEN $embedding
                                    ELSE r.fact_embedding END,
            r.updated_at = datetime()
        RETURN count(r) AS cnt
    ''',
        source=record.get('source', ''),
        target=record.get('target', ''),
        rtype=rtype,
        fact=record.get('fact', ''),
        evidence=record.get('evidence', ''),
        qualifiers=record.get('qualifiers', []),
        confidence=record.get('confidence', 0.0),
        valid_at=record.get('valid_at', ''),
        invalid_at=record.get('invalid_at', ''),
        meeting_id=record.get('meeting_id', ''),
        meeting_date=record.get('meeting_date', ''),
        embedding=record.get('fact_embedding') or [],
    )
    return rows[0]['cnt'] if rows else 0


def _delete_converted_facts() -> int:
    """Delete the Fact nodes the conversion loop handled; return how many.

    This is the same source/Fact/target pattern the read query uses, limited
    to endpoints with a non-empty name (the rows that were converted rather
    than skipped). DETACH DELETE also removes the FACT_SOURCE/FACT_TARGET
    relationships.
    """
    rows = graph_store.run_query('''
        MATCH (s:Entity)-[:FACT_SOURCE]->(f:Fact)-[:FACT_TARGET]->(t:Entity)
        WHERE coalesce(s.name, '') <> '' AND coalesce(t.name, '') <> ''
        WITH DISTINCT f
        DETACH DELETE f
        RETURN count(f) AS cnt
    ''')
    return rows[0]['cnt'] if rows else 0


def step2_convert_facts_to_edges():
    """Convert v1 Fact nodes into RELATES_TO edges, then delete converted Facts.

    Each (source Entity)-[:FACT_SOURCE]->(Fact)-[:FACT_TARGET]->(target Entity)
    chain becomes a (source)-[:RELATES_TO]->(target) edge carrying the Fact's
    properties. Facts that cannot be converted are left in the graph and
    reported with a warning instead of being silently deleted. This covers
    Facts with no named source/target, or Facts not wired to two Entities.
    """
    facts = graph_store.run_query('''
        MATCH (s:Entity)-[:FACT_SOURCE]->(f:Fact)-[:FACT_TARGET]->(t:Entity)
        RETURN s.name AS source, t.name AS target,
               f.predicate AS relation_type,
               f.fact AS fact,
               f.qualifiers AS qualifiers,
               f.evidence AS evidence,
               f.confidence AS confidence,
               f.valid_at AS valid_at,
               f.invalid_at AS invalid_at,
               f.meeting_id AS meeting_id,
               f.meeting_date AS meeting_date,
               f.fact_embedding AS fact_embedding
    ''')
    logger.info('Found %d Fact nodes to convert', len(facts))

    converted = 0
    skipped = 0
    for record in facts:
        if not record.get('source', '') or not record.get('target', ''):
            skipped += 1
            continue
        if _upsert_edge_from_fact(record):
            converted += 1
    if skipped:
        logger.warning('Skipped %d Fact rows without a named source/target Entity', skipped)

    removed = _delete_converted_facts()
    leftover_rows = graph_store.run_query('MATCH (f:Fact) RETURN count(f) AS cnt')
    leftover = leftover_rows[0]['cnt'] if leftover_rows else 0
    if leftover:
        logger.warning('%d Fact nodes could not be converted and were left in place', leftover)

    logger.info('Step 2 done: converted %d facts to edges, removed %d Fact nodes', converted, removed)
def verify():
    """Log post-migration statistics and warn about integrity problems.

    Read-only. Reports:

    - ``graph_store`` summary stats plus entity-type and graph-kind breakdowns;
    - node counts per composite label, taken from ``get_type_label_map()`` so
      the list stays in sync with what step 1 assigns;
    - the number of RELATES_TO edges;
    - warnings for Fact nodes still present and for Entity nodes carrying
      none of the composite labels.
    """
    def count_of(query: str, **params) -> int:
        # Every query here returns a single 'cnt' column.
        rows = graph_store.run_query(query, **params)
        return rows[0]['cnt'] if rows else 0

    stats = graph_store.get_stats()
    logger.info('Final stats: %s', stats)
    types = graph_store.get_entity_types()
    logger.info('Entity types: %s', [(t['entity_type'], t['count']) for t in types])
    kinds = graph_store.get_graph_kinds()
    logger.info('Graph kinds: %s', [(k['kind'], k['count']) for k in kinds])

    labels = list(get_type_label_map().values())
    for label in labels:
        # Labels come from the fixed map, so interpolating them is safe.
        count = count_of(f'MATCH (n:{label}) RETURN count(n) AS cnt')
        if count:
            logger.info(' :%s nodes: %d', label, count)

    logger.info(' RELATES_TO edges: %d',
                count_of('MATCH ()-[r:RELATES_TO]->() RETURN count(r) AS cnt'))

    leftover_facts = count_of('MATCH (f:Fact) RETURN count(f) AS cnt')
    if leftover_facts:
        logger.warning(' Fact nodes still present: %d', leftover_facts)
    unlabeled = count_of(
        'MATCH (e:Entity) WHERE none(l IN labels(e) WHERE l IN $labels) '
        'RETURN count(e) AS cnt',
        labels=labels,
    )
    if unlabeled:
        logger.warning(' Entity nodes without a composite label: %d', unlabeled)
if __name__ == '__main__':
    # Entry point: run every migration step in order, then report results.
    if graph_store.enabled:
        logger.info('Starting v1→v2 migration...')
        for migration_step in (
            step1_add_composite_labels,
            step2_convert_facts_to_edges,
            verify,
        ):
            migration_step()
        logger.info('Migration complete')
    else:
        # Nothing can be migrated without a live Neo4j connection.
        logger.error('Neo4j is not available')
        sys.exit(1)