meeting_memory/graph_store.py

251 lines
8.0 KiB
Python

import logging
from typing import Any, Dict, List
from config import config
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class Neo4jGraphStore:
    """Optional Neo4j-backed store for the meeting knowledge graph.

    The store stays disabled (every public method becomes a no-op that
    returns an empty result) when Neo4j is switched off in ``config``, the
    ``neo4j`` package cannot be imported, or no password is configured.

    Graph model written by this class:

    * ``(:Meeting {meeting_id, title, date, summary, content_hash, updated_at})``
    * ``(:Entity {name, entity_type, description, updated_at})``
    * ``(:Meeting)-[:MENTIONS]->(:Entity)``
    * ``(:Entity)-[:RELATES_TO {meeting_id, predicate, object_name,
      description, meeting_date, updated_at}]->(:Entity)``
    """

    def __init__(self):
        """Create the store and try to connect using ``config.neo4j``."""
        self._driver = None
        self._enabled = False
        # The schema DDL is idempotent but costs four round trips, so
        # remember once it has run for this connection.
        self._schema_ready = False
        self._connect()

    @staticmethod
    def _clean(value: Any) -> str:
        """Return *value* stripped if it is a string, otherwise ``""``.

        Meeting payloads may carry ``None`` (JSON ``null``) where a string is
        expected; treating those as empty lets callers skip them instead of
        failing on ``None.strip()``.
        """
        return value.strip() if isinstance(value, str) else ""

    def _connect(self):
        """Open the driver, or leave the store disabled with a log message."""
        if not config.neo4j.enabled:
            logger.info("Neo4j graph store disabled")
            return
        try:
            # Imported lazily so the module still loads without the package.
            from neo4j import GraphDatabase
        except ImportError:
            logger.warning("neo4j package is not installed")
            return
        if not config.neo4j.password:
            logger.warning("Neo4j is enabled but NEO4J_PASSWORD is empty")
            return
        self._driver = GraphDatabase.driver(
            config.neo4j.uri,
            auth=(config.neo4j.user, config.neo4j.password),
        )
        self._enabled = True

    @property
    def enabled(self) -> bool:
        """Whether a driver is available and queries will be executed."""
        return self._enabled and self._driver is not None

    def close(self):
        """Close the driver and disable the store. Safe to call repeatedly."""
        driver, self._driver = self._driver, None
        # Reset state first so ``enabled`` is False even if ``close`` raises;
        # otherwise later queries would be sent to a closed driver.
        self._enabled = False
        self._schema_ready = False
        if driver is not None:
            driver.close()

    def run_query(self, query: str, **params) -> List[Dict[str, Any]]:
        """Run a Cypher *query* and return its records as dictionaries.

        Args:
            query: Cypher text.
            **params: Query parameters, referenced as ``$name`` in *query*.

        Returns:
            One ``record.data()`` dict per result row; ``[]`` when the store
            is disabled.
        """
        if not self.enabled:
            return []
        with self._driver.session(database=config.neo4j.database) as session:
            result = session.run(query, **params)
            # Materialise inside the ``with`` block, while the session is open.
            return [record.data() for record in result]

    def initialize_schema(self):
        """Create the uniqueness constraints and indexes if they are missing."""
        if not self.enabled:
            return
        statements = [
            "CREATE CONSTRAINT meeting_id IF NOT EXISTS FOR (m:Meeting) REQUIRE m.meeting_id IS UNIQUE",
            "CREATE CONSTRAINT entity_name IF NOT EXISTS FOR (e:Entity) REQUIRE e.name IS UNIQUE",
            "CREATE INDEX meeting_title IF NOT EXISTS FOR (m:Meeting) ON (m.title)",
            "CREATE INDEX entity_type IF NOT EXISTS FOR (e:Entity) ON (e.entity_type)",
        ]
        for statement in statements:
            self.run_query(statement)
        # Only reached when every statement succeeded.
        self._schema_ready = True

    def get_stats(self) -> Dict[str, Any]:
        """Return ``enabled`` plus meeting and entity counts.

        Returns:
            ``{"enabled": False}`` when disabled, otherwise
            ``{"enabled": True, "meetings": int, "entities": int}``.
        """
        if not self.enabled:
            return {"enabled": False}
        rows = self.run_query(
            """
            CALL {
                MATCH (m:Meeting)
                RETURN count(m) AS meetings
            }
            CALL {
                MATCH (e:Entity)
                RETURN count(e) AS entities
            }
            RETURN meetings, entities
            """
        )
        if not rows:
            return {"enabled": True, "meetings": 0, "entities": 0}
        return {"enabled": True, **rows[0]}

    def upsert_meeting_subgraph(self, meeting_data: dict) -> None:
        """Create or update a meeting node with its entities and relations.

        Args:
            meeting_data: Mapping read for the keys ``_graph_meeting_id``
                (required; nothing is written without it), ``title``,
                ``date``, ``summary``, ``_content_hash``, ``entities``
                (list of entity dicts), ``participants`` (list of names) and
                ``relations`` (list of relation dicts). The three list keys
                may be missing or ``None``.
        """
        if not self.enabled:
            return
        meeting_id = meeting_data.get("_graph_meeting_id")
        if not meeting_id:
            return
        # Run the DDL once per connection rather than on every upsert.
        if not getattr(self, "_schema_ready", False):
            self.initialize_schema()
        meeting_date = meeting_data.get("date", "")
        self.run_query(
            """
            MERGE (m:Meeting {meeting_id: $meeting_id})
            SET m.title = $title,
                m.date = $date,
                m.summary = $summary,
                m.content_hash = $content_hash,
                m.updated_at = datetime()
            """,
            meeting_id=meeting_id,
            title=meeting_data.get("title", ""),
            date=meeting_date,
            summary=meeting_data.get("summary", ""),
            content_hash=meeting_data.get("_content_hash", ""),
        )
        # ``or []`` also covers keys that are present but explicitly None.
        for entity in meeting_data.get("entities") or []:
            self._upsert_entity(meeting_id, entity)
        for participant in meeting_data.get("participants") or []:
            self._upsert_entity(
                meeting_id,
                {"name": participant, "entity_type": "participant", "description": ""},
            )
        for relation in meeting_data.get("relations") or []:
            self._upsert_relation(meeting_id, relation, meeting_date)

    def _upsert_entity(self, meeting_id: str, entity: dict) -> None:
        """Merge one entity by name and link it to the meeting via MENTIONS.

        Entities whose ``name`` is missing, blank or not a string are skipped.
        An empty ``entity_type`` / ``description`` never overwrites a value
        already stored on the node.
        """
        name = self._clean(entity.get("name"))
        if not name:
            return
        self.run_query(
            """
            MATCH (m:Meeting {meeting_id: $meeting_id})
            MERGE (e:Entity {name: $name})
            SET e.entity_type = CASE
                    WHEN $entity_type <> '' THEN $entity_type
                    ELSE coalesce(e.entity_type, '')
                END,
                e.description = CASE
                    WHEN $description <> '' THEN $description
                    ELSE coalesce(e.description, '')
                END,
                e.updated_at = datetime()
            MERGE (m)-[:MENTIONS]->(e)
            """,
            meeting_id=meeting_id,
            name=name,
            entity_type=entity.get("entity_type") or "",
            description=entity.get("description") or "",
        )

    def _upsert_relation(self, meeting_id: str, relation: dict, meeting_date: str) -> None:
        """Merge a ``subject -[predicate]-> object`` fact for one meeting.

        Both endpoints are upserted as entities first. Relations missing a
        usable ``subject``, ``predicate`` or ``object`` are skipped.
        """
        subject = self._clean(relation.get("subject"))
        predicate = self._clean(relation.get("predicate"))
        obj = self._clean(relation.get("object"))
        if not (subject and predicate and obj):
            return
        endpoints = (
            (subject, relation.get("subject_type", "")),
            (obj, relation.get("object_type", "")),
        )
        for endpoint_name, endpoint_type in endpoints:
            self._upsert_entity(
                meeting_id,
                {"name": endpoint_name, "entity_type": endpoint_type, "description": ""},
            )
        self.run_query(
            """
            MATCH (s:Entity {name: $subject})
            MATCH (o:Entity {name: $object})
            MERGE (s)-[r:RELATES_TO {
                meeting_id: $meeting_id,
                predicate: $predicate,
                object_name: $object
            }]->(o)
            SET r.description = $description,
                r.meeting_date = $meeting_date,
                r.updated_at = datetime()
            """,
            meeting_id=meeting_id,
            subject=subject,
            predicate=predicate,
            object=obj,
            description=relation.get("description") or "",
            meeting_date=meeting_date,
        )

    def remove_meeting_subgraph(self, meeting_id: str) -> None:
        """Delete a meeting node, its MENTIONS edges and its RELATES_TO facts.

        Entity nodes are left in place.
        """
        if not self.enabled or not meeting_id:
            return
        # ``WITH DISTINCT m`` collapses the one-row-per-mention fan-out so the
        # relation match below is not repeated for every MENTIONS edge.
        self.run_query(
            """
            MATCH (m:Meeting {meeting_id: $meeting_id})
            OPTIONAL MATCH (m)-[mentions:MENTIONS]->(:Entity)
            DELETE mentions
            WITH DISTINCT m
            OPTIONAL MATCH ()-[r:RELATES_TO {meeting_id: $meeting_id}]->()
            DELETE r
            WITH DISTINCT m
            DELETE m
            """,
            meeting_id=meeting_id,
        )

    def search_facts(self, question: str, limit: int = 5) -> List[Dict[str, Any]]:
        """Case-insensitive substring search over meetings and entities.

        Args:
            question: Text to look for in meeting title/summary and entity
                name/description. Surrounding whitespace is ignored.
            limit: Maximum number of rows; values below 1 yield ``[]``.

        Returns:
            Rows with the keys ``kind`` (``"meeting"`` or ``"entity"``),
            ``title``, ``text`` and ``date``. For an entity, ``text`` is one
            of its relations rendered as ``subject -[predicate]-> object``,
            or its description when it has no relation.
        """
        needle = self._clean(question)
        if not self.enabled or not needle or limit <= 0:
            return []
        # The CASE yields null for entities without relations; collect()
        # drops nulls, so head(facts) is null and the description is used.
        # startNode/endNode keep the stored direction although the match is
        # undirected.
        return self.run_query(
            """
            CALL {
                MATCH (m:Meeting)
                WHERE toLower(m.title) CONTAINS toLower($question)
                   OR toLower(coalesce(m.summary, '')) CONTAINS toLower($question)
                RETURN 'meeting' AS kind,
                       m.title AS title,
                       coalesce(m.summary, '') AS text,
                       m.date AS date
                UNION
                MATCH (e:Entity)
                WHERE toLower(e.name) CONTAINS toLower($question)
                   OR toLower(coalesce(e.description, '')) CONTAINS toLower($question)
                OPTIONAL MATCH (e)-[r:RELATES_TO]-(:Entity)
                WITH e, collect(
                    CASE
                        WHEN r IS NULL THEN null
                        ELSE startNode(r).name + ' -[' + coalesce(r.predicate, '') + ']-> ' + endNode(r).name
                    END
                ) AS facts
                RETURN 'entity' AS kind,
                       e.name AS title,
                       coalesce(head(facts), coalesce(e.description, '')) AS text,
                       '' AS date
            }
            RETURN kind, title, text, date
            LIMIT $limit
            """,
            question=needle,
            limit=limit,
        )
# Shared module-level store; constructing it runs _connect() at import time.
graph_store = Neo4jGraphStore()