""" Migration script: v1 (flat Entity + Fact nodes) → v2 (composite labels + direct edges) Steps: 1. Add composite Neo4j labels to existing Entity nodes based on entity_type 2. Convert Fact nodes to RELATES_TO edges between Entity nodes 3. Verify data integrity """ import logging import sys from pathlib import Path sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) from meeting_memory.graph_store import graph_store, _canonical_entity_type, _EntityType logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s') logger = logging.getLogger('migrate') def get_type_label_map() -> dict[str, str]: """Map canonical entity_type -> Neo4j label""" return { _EntityType.DEPARTMENT.value: 'Department', _EntityType.PROJECT.value: 'Project', _EntityType.METRIC.value: 'Metric', _EntityType.PERSON.value: 'Person', _EntityType.SYSTEM.value: 'System', _EntityType.DOCUMENT.value: 'Document', } def step1_add_composite_labels(): """Add composite labels (e.g., :Department) to existing Entity nodes.""" type_label_map = get_type_label_map() total = 0 for canonical_type, label in type_label_map.items(): rows = graph_store.run_query( 'MATCH (e:Entity) WHERE e.entity_type = $etype RETURN count(e) AS cnt', etype=canonical_type, ) count = rows[0]['cnt'] if rows else 0 if count == 0: logger.info(' No Entity with entity_type=%s to migrate', canonical_type) continue graph_store.run_query( f'MATCH (e:Entity) WHERE e.entity_type = $etype SET e:{label}', etype=canonical_type, ) logger.info(' Added :%s label to %d Entity nodes', label, count) total += count # Also handle aliases: Organization -> Department for alias in ('组织', 'Organization', '部门'): rows = graph_store.run_query( 'MATCH (e:Entity {entity_type: $etype}) RETURN count(e) AS cnt', etype=alias, ) count = rows[0]['cnt'] if rows else 0 if count == 0: continue graph_store.run_query( 'MATCH (e:Entity {entity_type: $etype}) SET e.entity_type = $canonical, e:Department', etype=alias, canonical=_EntityType.DEPARTMENT.value, ) logger.info(' Redirected %d entities from entity_type=%s -> Department', count, alias) total += count for alias in ('指标', 'kpi', 'KPI'): rows = graph_store.run_query( 'MATCH (e:Entity {entity_type: $etype}) RETURN count(e) AS cnt', etype=alias, ) count = rows[0]['cnt'] if rows else 0 if count == 0: continue graph_store.run_query( 'MATCH (e:Entity {entity_type: $etype}) SET e.entity_type = $canonical, e:Metric', etype=alias, canonical=_EntityType.METRIC.value, ) logger.info(' Redirected %d entities from entity_type=%s -> Metric', count, alias) total += count logger.info('Step 1 done: %d entities got composite labels', total) def step2_convert_facts_to_edges(): """Convert existing Fact nodes to RELATES_TO edges, then remove Fact nodes.""" facts = graph_store.run_query(''' MATCH (s:Entity)-[:FACT_SOURCE]->(f:Fact)-[:FACT_TARGET]->(t:Entity) RETURN s.name AS source, t.name AS target, f.predicate AS relation_type, f.fact AS fact, f.qualifiers AS qualifiers, f.evidence AS evidence, f.confidence AS confidence, f.valid_at AS valid_at, f.invalid_at AS invalid_at, f.meeting_id AS meeting_id, f.meeting_date AS meeting_date, f.fact_embedding AS fact_embedding ''') logger.info('Found %d Fact nodes to convert', len(facts)) converted = 0 for f in facts: source = f.get('source', '') target = f.get('target', '') rtype = f.get('relation_type', '') or '关联' if not source or not target: continue fact_embedding = f.get('fact_embedding') or [] graph_store.run_query(''' MATCH (s:Entity {name: $source}) MATCH (t:Entity {name: $target}) MERGE (s)-[r:RELATES_TO {name: $rtype}]->(t) SET r.fact = $fact, r.evidence = $evidence, r.qualifiers = $qualifiers, r.confidence = $confidence, r.valid_at = $valid_at, r.invalid_at = $invalid_at, r.meeting_id = $meeting_id, r.meeting_date = $meeting_date, r.updated_at = datetime() ''', source=source, target=target, rtype=rtype, fact=f.get('fact', ''), evidence=f.get('evidence', ''), qualifiers=f.get('qualifiers', []), confidence=f.get('confidence', 0.0), valid_at=f.get('valid_at', ''), invalid_at=f.get('invalid_at', ''), meeting_id=f.get('meeting_id', ''), meeting_date=f.get('meeting_date', ''), ) if fact_embedding: graph_store.run_query(''' MATCH (s:Entity {name: $source})-[r:RELATES_TO {name: $rtype}]->(t:Entity {name: $target}) SET r.fact_embedding = $embedding ''', source=source, target=target, rtype=rtype, embedding=fact_embedding) converted += 1 # Now remove Fact nodes and their incident edges graph_store.run_query(''' MATCH (f:Fact) OPTIONAL MATCH (f)-[r]-() DELETE r, f ''') logger.info('Step 2 done: converted %d facts to edges, removed Fact nodes', converted) def verify(): """Verify migration results.""" stats = graph_store.get_stats() logger.info('Final stats: %s', stats) types = graph_store.get_entity_types() logger.info('Entity types: %s', [(t['entity_type'], t['count']) for t in types]) kinds = graph_store.get_graph_kinds() logger.info('Graph kinds: %s', [(k['kind'], k['count']) for k in kinds]) # Count labeled entities for label in ('Department', 'Project', 'Metric', 'Person', 'System', 'Document'): rows = graph_store.run_query(f'MATCH (n:{label}) RETURN count(n) AS cnt') count = rows[0]['cnt'] if rows else 0 if count: logger.info(' :%s nodes: %d', label, count) edges = graph_store.run_query('MATCH ()-[r:RELATES_TO]->() RETURN count(r) AS cnt') logger.info(' RELATES_TO edges: %d', edges[0]['cnt'] if edges else 0) if __name__ == '__main__': if not graph_store.enabled: logger.error('Neo4j is not available') sys.exit(1) logger.info('Starting v1→v2 migration...') step1_add_composite_labels() step2_convert_facts_to_edges() verify() logger.info('Migration complete')