${escapeHtml(item.label)}
+ ${escapeHtml(item.value)} +${escapeHtml(item.meta || "")}
+diff --git a/MIGRATION_TASKS.md b/MIGRATION_TASKS.md index 95d0ef7..02d4325 100644 --- a/MIGRATION_TASKS.md +++ b/MIGRATION_TASKS.md @@ -1,13 +1,38 @@ # Migration Tasks +## Goal + +Align this project toward `D:\github_project\graphiti` while keeping the meeting-processing flow usable and making the codebase easier to maintain. + +Target direction: + +- Neo4j is the only persistence layer for graph and retrieval data +- Retrieval is hybrid: semantic similarity + keyword/fact recall + graph relationship context +- Storage is more provenance-friendly, closer to `Meeting / Episode / Entity / Fact` +- Core implementation lives in package modules instead of the repository root + ## In Progress -- [ ] Rework deduplication and identity resolution for entities, action items, and metrics +- [ ] No active migration tasks ## Todo +- [ ] Clean up any stale data directories only after explicit user confirmation + ## Done +- [x] Step 1: Extract a shared embedding utility and stop coupling semantic retrieval to the old vector-store implementation +- [x] Step 2.1: Create a package structure and move shared foundations out of the repository root +- [x] Step 2.2: Move extraction, raw storage, and state tracking into package modules +- [x] Step 2.3: Move graph storage, processing, and CLI into package modules +- [x] Step 3: Redesign Neo4j schema from simple `Meeting -> Entity -> RELATES_TO` into `Meeting / Episode / Entity / Fact` +- [x] Step 4: Store semantic retrieval payload inside Neo4j instead of external vector storage +- [x] Step 5: Replace current query path with hybrid retrieval over Neo4j candidates +- [x] Step 6: Replace duplicate detection to use Neo4j-backed semantic matching and exact meeting lookup +- [x] Step 7: Remove runtime dependency on `llama-index` and `chroma` +- [x] Step 8: Update CLI stats output to reflect hybrid retrieval structures such as episodes and facts +- [x] Step 9: Update README and environment instructions to match the new architecture +- [x] Step 10: Run end-to-end verification on `process`, `query`, and `stats` with a real Neo4j environment - [x] Remove Obsidian from the project documentation and dependency surface - [x] Remove Obsidian from the runtime processing pipeline - [x] Move raw meeting archival to `data/raw` diff --git a/README.md b/README.md index 9c47849..bc61868 100644 --- a/README.md +++ b/README.md @@ -1,103 +1,75 @@ # 会议纪要长期记忆系统 -基于 `LLM + LlamaIndex + 本地状态存储` 的会议纪要长期记忆原型,当前聚焦三件事: +一个面向会议纪要的长期记忆原型,当前架构已经从“根目录脚本堆叠 + 外部向量库存储”迁移为更清晰的包结构,并收敛到: -- 会议内容结构化抽取 -- 跨会议行动项与指标状态追踪 -- 语义检索与重复内容过滤 +- `Neo4j` 作为唯一图存储与检索数据载体 +- `Embedding + 关键词 + 图事实` 的混合检索模式 +- 更接近 `graphiti` 的 `Meeting / Episode / Entity / Fact` 数据组织方式 -## 当前处理链路 +## 当前能力 + +- 会议文本结构化抽取 +- 原文归档到 `data/raw` +- 行动项和指标状态的跨会议合并 +- 基于内容哈希和语义相似度的重复检测 +- 基于 `Neo4j` 的图谱写入 +- 基于 `Neo4j` 的混合检索 + +## 处理流程 ```text meeting.md -> 内容哈希去重 - -> 语义相似去重 - -> LLM 结构化抽取 - -> 状态合并 - -> 原文归档到 data/raw - -> 写入向量索引 + -> Neo4j 语义相似去重 + -> LLM 抽取结构化信息 + -> 原文归档 + -> 行动项 / 指标状态合并 + -> 写入 Neo4j: + Meeting + Episode + Entity + Fact ``` -当前版本已经移除了 `Obsidian` 作为主流程依赖,后续会继续引入图数据库来承载关系层。 - -## 快速开始 - -```bash -cd meeting_memory - -python -m venv .venv -.venv\Scripts\pip install -r requirements.txt - -copy .env.example .env -# 然后补充 API 配置 - -.venv\Scripts\python main.py process meeting_example.md -``` - -## 用法 - -```bash -python main.py -python main.py process meeting_example.md -python main.py process meeting_example.md -f -python main.py query "弱光指标目标值是多少" -python main.py stats -python main.py text "今天会议讨论了..." -python main.py batch "meetings/*.md" -f -``` - -## 目录 +## 项目结构 ```text meeting_memory/ -├── config.py 配置 -├── extractor.py LLM 结构化抽取 -├── meeting_processor.py 主处理流程 -├── meeting_state.py 跨会议状态追踪 -├── raw_store.py 原文归档 -├── vector_store.py 向量索引与语义检索 -├── main.py CLI 入口 +├── meeting_memory/ +│ ├── __init__.py +│ ├── cli.py +│ ├── config.py +│ ├── extractor.py +│ ├── graph_store.py +│ ├── meeting_processor.py +│ ├── meeting_state.py +│ ├── raw_store.py +│ └── services/ +│ ├── __init__.py +│ └── embedding_service.py ├── data/ -│ ├── raw/ 原始会议文本归档 -│ └── meeting_state.json 状态持久化 -└── vector_store_data/ 向量索引持久化目录 +│ ├── raw/ +│ └── meeting_state.json +├── main.py +├── MIGRATION_TASKS.md +└── requirements.txt ``` -## 核心能力 +说明: -### 1. 结构化抽取 +- `meeting_memory/` 包目录中是当前真实实现 +- 根目录现在只保留 `main.py` 作为 CLI 入口,其他实现全部收拢到包目录 +- `vector_store.py` 已移除,检索能力已迁到 `Neo4j` 图结构中 -从会议文本中提取: +## 环境配置 -- 标题、日期、参会人 -- 实体 -- 关系 -- 行动项 -- 指标 -- 决策 -- 摘要 +复制环境变量模板: -### 2. 长期状态追踪 +```bash +copy .env.example .env +``` -- 行动项按 `task + assignee` 做稳定 ID -- 指标按 `metric_name + owner` 做稳定 ID -- 保留历史状态演化 -- 支持跨会议合并 - -### 3. 双重去重 - -- 内容哈希精确去重 -- 语义相似度去重 - -### 4. 语义检索 - -- 会议内容写入向量库 -- 支持自然语言查询 -- 索引持久化,重启自动加载 - -## 配置 - -编辑 `.env`: +填写配置: ```ini LLM_API_KEY=sk-xxx @@ -107,8 +79,52 @@ LLM_MODEL=deepseek-chat EMBEDDING_API_KEY=sk-xxx EMBEDDING_BASE_URL=https://api.openai.com/v1 EMBEDDING_MODEL=text-embedding-3-small + +NEO4J_ENABLED=true +NEO4J_URI=bolt://localhost:7687 +NEO4J_USER=neo4j +NEO4J_PASSWORD=your-password +NEO4J_DATABASE=neo4j ``` -## 迁移计划 +## 安装 -当前迁移进度见 [MIGRATION_TASKS.md](/d:/github_project/my_code/meeting_memory/MIGRATION_TASKS.md:1)。 +```bash +python -m venv .venv +.venv\Scripts\pip install -r requirements.txt +``` + +## 使用方式 + +```bash +python main.py +python main.py process meeting_example.md +python main.py process meeting_example.md -f +python main.py text "今天会议讨论了弱光指标和交付节奏" +python main.py query "弱光指标目标值是多少" +python main.py stats +python main.py batch "meetings/*.md" -f +``` + +## 检索设计 + +当前查询不再依赖独立向量库,而是基于 `Neo4j` 中的三类候选进行混合排序: + +- `Episode`:会议级文本上下文 +- `Entity`:实体摘要与描述 +- `Fact`:主体-关系-客体事实 + +排序信号包括: + +- 语义相似度 +- 关键词命中 +- 图事实加权 + +## 迁移说明 + +迁移任务记录见 [MIGRATION_TASKS.md](/d:/github_project/my_code/meeting_memory/MIGRATION_TASKS.md:1)。 + +## 当前限制 + +- 当前环境如果没有安装 `neo4j` Python 包,导入图存储模块时会退化为禁用状态 +- 由于本地运行环境限制,端到端验证仍然依赖可用的 Neo4j 实例和正确的凭据 diff --git a/data/meeting_state.json b/data/meeting_state.json new file mode 100644 index 0000000..1a9df0b --- /dev/null +++ b/data/meeting_state.json @@ -0,0 +1,497 @@ +{ + "action_items": { + "59f75356": { + "item_id": "59f75356", + "task": "针对关键业务上量指标缺乏保障措施问题,出具具体可行方案并明确责任人", + "assignee": "建维部", + "series": "合川分公司周例会", + "created_meeting": "meeting_ed164adc704f.md", + "history": [ + { + "date": "2026-05-06", + "meeting": "meeting_ed164adc704f.md", + "status": "进行中", + "priority": "高", + "deadline": "本周内" + } + ], + "latest": { + "date": "2026-05-06", + "meeting": "meeting_ed164adc704f.md", + "status": "进行中", + "priority": "高", + "deadline": "本周内" + } + }, + "b16a65ce": { + "item_id": "b16a65ce", + "task": "完成招聘情况、农村渠道进度及营销方案汇报", + "assignee": "市场部", + "series": "合川分公司周例会", + "created_meeting": "meeting_ed164adc704f.md", + "history": [ + { + "date": "2026-05-06", + "meeting": "meeting_ed164adc704f.md", + "status": "待办", + "priority": "中", + "deadline": "本周内" + } + ], + "latest": { + "date": "2026-05-06", + "meeting": "meeting_ed164adc704f.md", + "status": "待办", + "priority": "中", + "deadline": "本周内" + } + }, + "691d7a64": { + "item_id": "691d7a64", + "task": "视频汇报运动会筹备情况", + "assignee": "市场部", + "series": "合川分公司周例会", + "created_meeting": "meeting_ed164adc704f.md", + "history": [ + { + "date": "2026-05-06", + "meeting": "meeting_ed164adc704f.md", + "status": "待办", + "priority": "中", + "deadline": "本周六上午" + } + ], + "latest": { + "date": "2026-05-06", + "meeting": "meeting_ed164adc704f.md", + "status": "待办", + "priority": "中", + "deadline": "本周六上午" + } + }, + "8d9685f0": { + "item_id": "8d9685f0", + "task": "商客经理每日发送微信日报", + "assignee": "商客经理", + "series": "合川分公司周例会", + "created_meeting": "meeting_ed164adc704f.md", + "history": [ + { + "date": "2026-05-06", + "meeting": "meeting_ed164adc704f.md", + "status": "待办", + "priority": "中", + "deadline": "每日" + } + ], + "latest": { + "date": "2026-05-06", + "meeting": "meeting_ed164adc704f.md", + "status": "待办", + "priority": "中", + "deadline": "每日" + } + }, + "723bdb36": { + "item_id": "723bdb36", + "task": "跟进学校IP限速机制建立", + "assignee": "宽带/客服部", + "series": "宽带运维、行政管理及市场业务推进会议", + "created_meeting": "meeting_5026dc1db2fe.md", + "history": [ + { + "date": "2026-05-06 13:37", + "meeting": "meeting_5026dc1db2fe.md", + "status": "进行中", + "priority": "高", + "deadline": "" + } + ], + "latest": { + "date": "2026-05-06 13:37", + "meeting": "meeting_5026dc1db2fe.md", + "status": "进行中", + "priority": "高", + "deadline": "" + } + }, + "c1ebcaaf": { + "item_id": "c1ebcaaf", + "task": "处理客服内部机房问题清单并与客户沟通", + "assignee": "宽带/客服部", + "series": "宽带运维、行政管理及市场业务推进会议", + "created_meeting": "meeting_5026dc1db2fe.md", + "history": [ + { + "date": "2026-05-06 13:37", + "meeting": "meeting_5026dc1db2fe.md", + "status": "进行中", + "priority": "中", + "deadline": "" + } + ], + "latest": { + "date": "2026-05-06 13:37", + "meeting": "meeting_5026dc1db2fe.md", + "status": "进行中", + "priority": "中", + "deadline": "" + } + }, + "9428cf05": { + "item_id": "9428cf05", + "task": "完成食堂改造审计及自饮机引入批复跟进", + "assignee": "综合部", + "series": "宽带运维、行政管理及市场业务推进会议", + "created_meeting": "meeting_5026dc1db2fe.md", + "history": [ + { + "date": "2026-05-06 13:37", + "meeting": "meeting_5026dc1db2fe.md", + "status": "进行中", + "priority": "中", + "deadline": "" + } + ], + "latest": { + "date": "2026-05-06 13:37", + "meeting": "meeting_5026dc1db2fe.md", + "status": "进行中", + "priority": "中", + "deadline": "" + } + }, + "e4df98ca": { + "item_id": "e4df98ca", + "task": "落实招待费公示及纪检报备", + "assignee": "综合部", + "series": "宽带运维、行政管理及市场业务推进会议", + "created_meeting": "meeting_5026dc1db2fe.md", + "history": [ + { + "date": "2026-05-06 13:37", + "meeting": "meeting_5026dc1db2fe.md", + "status": "进行中", + "priority": "高", + "deadline": "" + } + ], + "latest": { + "date": "2026-05-06 13:37", + "meeting": "meeting_5026dc1db2fe.md", + "status": "进行中", + "priority": "高", + "deadline": "" + } + }, + "e5e1449e": { + "item_id": "e5e1449e", + "task": "汇报招聘进度、农村渠道进度及营销方案", + "assignee": "市场部", + "series": "宽带运维、行政管理及市场业务推进会议", + "created_meeting": "meeting_5026dc1db2fe.md", + "history": [ + { + "date": "2026-05-06 13:37", + "meeting": "meeting_5026dc1db2fe.md", + "status": "待办", + "priority": "高", + "deadline": "本周内" + } + ], + "latest": { + "date": "2026-05-06 13:37", + "meeting": "meeting_5026dc1db2fe.md", + "status": "待办", + "priority": "高", + "deadline": "本周内" + } + }, + "eb342fed": { + "item_id": "eb342fed", + "task": "每日微信发送满意度日报", + "assignee": "市场部", + "series": "宽带运维、行政管理及市场业务推进会议", + "created_meeting": "meeting_5026dc1db2fe.md", + "history": [ + { + "date": "2026-05-06 13:37", + "meeting": "meeting_5026dc1db2fe.md", + "status": "待办", + "priority": "高", + "deadline": "每日" + } + ], + "latest": { + "date": "2026-05-06 13:37", + "meeting": "meeting_5026dc1db2fe.md", + "status": "待办", + "priority": "高", + "deadline": "每日" + } + }, + "3db9820d": { + "item_id": "3db9820d", + "task": "确定体育文化节方阵补充人员名单", + "assignee": "各部门", + "series": "宽带运维、行政管理及市场业务推进会议", + "created_meeting": "meeting_5026dc1db2fe.md", + "history": [ + { + "date": "2026-05-06 13:37", + "meeting": "meeting_5026dc1db2fe.md", + "status": "待办", + "priority": "高", + "deadline": "今日" + } + ], + "latest": { + "date": "2026-05-06 13:37", + "meeting": "meeting_5026dc1db2fe.md", + "status": "待办", + "priority": "高", + "deadline": "今日" + } + }, + "684a31f4": { + "item_id": "684a31f4", + "task": "针对专线助账客指标拿出具体保障方案并回复", + "assignee": "各部门", + "series": "宽带运维、行政管理及市场业务推进会议", + "created_meeting": "meeting_5026dc1db2fe.md", + "history": [ + { + "date": "2026-05-06 13:37", + "meeting": "meeting_5026dc1db2fe.md", + "status": "待办", + "priority": "中", + "deadline": "" + } + ], + "latest": { + "date": "2026-05-06 13:37", + "meeting": "meeting_5026dc1db2fe.md", + "status": "待办", + "priority": "中", + "deadline": "" + } + } + }, + "metrics": { + "a76a5616": { + "metric_id": "a76a5616", + "metric_name": "弱光指标", + "owner": "建维部", + "history": [ + { + "date": "2026-05-06", + "meeting": "meeting_ed164adc704f.md", + "value": "0.51", + "target": "", + "trend": "向好" + } + ], + "latest": { + "date": "2026-05-06", + "meeting": "meeting_ed164adc704f.md", + "value": "0.51", + "target": "", + "trend": "向好" + } + }, + "d64cea03": { + "metric_id": "d64cea03", + "metric_name": "三代终端年度目标", + "owner": "建维部", + "history": [ + { + "date": "2026-05-06", + "meeting": "meeting_ed164adc704f.md", + "value": "", + "target": "5.5", + "trend": "需压降" + } + ], + "latest": { + "date": "2026-05-06", + "meeting": "meeting_ed164adc704f.md", + "value": "", + "target": "5.5", + "trend": "需压降" + } + }, + "13144224": { + "metric_id": "13144224", + "metric_name": "九零工程月度转化率", + "owner": "建维部", + "history": [ + { + "date": "2026-05-06", + "meeting": "meeting_ed164adc704f.md", + "value": "87.35%", + "target": "90%", + "trend": "接近目标" + } + ], + "latest": { + "date": "2026-05-06", + "meeting": "meeting_ed164adc704f.md", + "value": "87.35%", + "target": "90%", + "trend": "接近目标" + } + }, + "e056b315": { + "metric_id": "e056b315", + "metric_name": "退单率", + "owner": "建维部", + "history": [ + { + "date": "2026-05-06", + "meeting": "meeting_ed164adc704f.md", + "value": "6.53%", + "target": "", + "trend": "" + } + ], + "latest": { + "date": "2026-05-06", + "meeting": "meeting_ed164adc704f.md", + "value": "6.53%", + "target": "", + "trend": "" + } + }, + "12e0764a": { + "metric_id": "12e0764a", + "metric_name": "商客市场2月收入", + "owner": "商客市场部", + "history": [ + { + "date": "2026-05-06", + "meeting": "meeting_ed164adc704f.md", + "value": "88.5万元", + "target": "", + "trend": "增长" + } + ], + "latest": { + "date": "2026-05-06", + "meeting": "meeting_ed164adc704f.md", + "value": "88.5万元", + "target": "", + "trend": "增长" + } + }, + "23942096": { + "metric_id": "23942096", + "metric_name": "FPTR", + "owner": "宽带/客服部", + "history": [ + { + "date": "2026-05-06 13:37", + "meeting": "meeting_5026dc1db2fe.md", + "value": "达标", + "target": "达标", + "trend": "稳定" + } + ], + "latest": { + "date": "2026-05-06 13:37", + "meeting": "meeting_5026dc1db2fe.md", + "value": "达标", + "target": "达标", + "trend": "稳定" + } + }, + "671827ff": { + "metric_id": "671827ff", + "metric_name": "弱光指标", + "owner": "宽带/客服部", + "history": [ + { + "date": "2026-05-06 13:37", + "meeting": "meeting_5026dc1db2fe.md", + "value": "趋近目标", + "target": "预设目标值", + "trend": "改善" + } + ], + "latest": { + "date": "2026-05-06 13:37", + "meeting": "meeting_5026dc1db2fe.md", + "value": "趋近目标", + "target": "预设目标值", + "trend": "改善" + } + }, + "5d622e23": { + "metric_id": "5d622e23", + "metric_name": "投诉率", + "owner": "各部门", + "history": [ + { + "date": "2026-05-06 13:37", + "meeting": "meeting_5026dc1db2fe.md", + "value": "", + "target": "KPI考核核心值", + "trend": "需管控" + } + ], + "latest": { + "date": "2026-05-06 13:37", + "meeting": "meeting_5026dc1db2fe.md", + "value": "", + "target": "KPI考核核心值", + "trend": "需管控" + } + }, + "def6050a": { + "metric_id": "def6050a", + "metric_name": "工信部有责指标", + "owner": "各部门", + "history": [ + { + "date": "2026-05-06 13:37", + "meeting": "meeting_5026dc1db2fe.md", + "value": "", + "target": "KPI考核核心值", + "trend": "需管控" + } + ], + "latest": { + "date": "2026-05-06 13:37", + "meeting": "meeting_5026dc1db2fe.md", + "value": "", + "target": "KPI考核核心值", + "trend": "需管控" + } + } + }, + "meeting_series": { + "合川分公司周例会": { + "latest_date": "2026-05-06", + "processed_titles": [ + "合川分公司周例会(2026第X期)" + ] + }, + "宽带运维、行政管理及市场业务推进会议": { + "latest_date": "2026-05-06 13:37", + "processed_titles": [ + "宽带运维、行政管理及市场业务推进会议" + ] + } + }, + "content_hashes": { + "090ff5313a9e5c0dfd8d91c8f8aeb5246bd40a3ed92def6e498bd8254d71a9a4": { + "title": "合川分公司周例会(2026第X期)", + "date": "2026-05-06", + "filename": "meeting_ed164adc704f.md" + }, + "64078fdfd6dbe3c094ddad97b907bbcc0404df3de912488c020efb0e76fbe048": { + "title": "宽带运维、行政管理及市场业务推进会议", + "date": "2026-05-06 13:37", + "filename": "meeting_5026dc1db2fe.md" + } + } +} \ No newline at end of file diff --git a/data/raw/2026-05-06_1337_宽带运维、行政管理及市场业务推进会议.md b/data/raw/2026-05-06_1337_宽带运维、行政管理及市场业务推进会议.md new file mode 100644 index 0000000..2eef5f1 --- /dev/null +++ b/data/raw/2026-05-06_1337_宽带运维、行政管理及市场业务推进会议.md @@ -0,0 +1,35 @@ +--- +title: "宽带运维、行政管理及市场业务推进会议" +date: "2026-05-06 13:37" +status: archived +--- + +# 宽带运维、行政管理及市场业务推进会议 + +会议概述 +会议主要围绕宽带运维指标、综合行政与工会管理、市场政企业务推进、满意度提升及年度考核准备等议题展开,旨在总结阶段性工作进度,协调跨部门资源,明确后续重点任务与考核要求。 + +主要讨论点 +宽带运维与网络质量:上周上门量及安装进度受天气影响,弱光指标趋近目标,FPTR达标但主动过境偏后。PCDN专线在学校端持续恶化,已报市公司分析IP并拟限速。超频基站故障已恢复,专线巡检进度符合预期。客服培训后内部机房问题已梳理清单。 +综合行政与工会事务:2025年剩余两项工作按原计划推进。工会经费压减,食堂改造拟于5月结合更替修补进行,拟引入自饮机降本。第四届体育文化节筹备中,因主力选手受伤需各部门抽调人员补充方阵。主题教育简报已发,基层党组织学习已完成。 +区表彰、主题教育与招待费管理:区级担当作为表彰正在对接,建议争取参与以拉开竞争差距。招待费实行每年公开1次制度,综合部超预算需调整26年预算,其他部门严控成本。政企对外接待需统筹,严禁客户经理个人垫资。 +拆迁商客、满意度考核与KPI准备:拆迁以二次升套为主,社区与单位集中营销并行。满意度测评发现开卷考试形式导致拉分,相关经理思想松懈。KPI考核已明确工信部有责及投诉率两项核心指标,强调日清日结与执行力。 +决策事项 +二级基站拆除及下电服务费调整需在4月15日前全量完成。 +招待费实行每年公开1次制度,综合部超预算需调整26年预算,其他部门严控成本。 +满意度测评取消开卷考试形式,后续采用“后机评”模式,市场部需按四公司规定制定满意客户样板及不满客户处理流程。 +第四届体育文化节方阵缺编9人,需各部门协调抽调,确定名单后统一采购服装并安排下班后排练。 +年度考核指标已初步定调,各部门需提前与市公司沟通争取有利政策,避免起跑线落后。 +待办事项 +宽带/客服部:跟进学校IP限速机制建立;处理客服内部机房问题清单并与客户沟通。 +综合部:完成食堂改造审计及自饮机引入批复跟进;落实招待费公示及纪检报备。 +市场部:本周内汇报招聘进度、农村渠道进度及营销方案;每日微信发送满意度日报。 +各部门:今日确定体育文化节方阵补充人员名单;针对专线助账客指标拿出具体保障方案并回复。 +关键信息 +会议时间:2026-05-06 13:37 +核心考核导向:KPI考核聚焦工信部有责与投诉率,强调执行力与日清日结习惯。 +业务风险点:PCDN专线恶化、满意度测评因形式问题导致拉分、招待费预算超支。 +AI建议 +针对满意度考核风险,建议市场部立即复盘测评机制,避免形式主义拉低指标,并提前演练“后机评”应对策略。 +针对招待费超支及客户经理个人垫资问题,建议综合部建立统一审批与结算台账,明确费用归属与报销时效,规避财务与合规风险。 +针对KPI考核准备,建议建立市公司指标动态跟踪表,提前模拟考核场景,强化跨部门协同与数据预埋,确保年底考核不被动。 diff --git a/data/raw/2026-05-06_合川分公司周例会(2026第X期).md b/data/raw/2026-05-06_合川分公司周例会(2026第X期).md new file mode 100644 index 0000000..97c9387 --- /dev/null +++ b/data/raw/2026-05-06_合川分公司周例会(2026第X期).md @@ -0,0 +1,81 @@ +--- +title: "合川分公司周例会(2026第X期)" +date: "2026-05-06" +status: archived +--- + +# 合川分公司周例会(2026第X期) + +# 会议记录 + +议 题:合川分公司周例会(2026第X期) + +时 间:2026年5月6日 13:37—14:23 + +地 点:分公司会议室 + +主持人:AlanPaine + +参加人:分公司领导、各部门经理及相关人员 + +议程: + + 一、各部门汇报 + + 二、分公司领导指示部署 + +--- + +## 会议内容 + +### 一、各部门汇报 + +建维部、综合部、商客市场负责人按议程现场按顺序做汇报。建维部汇报宽带安装受天气影响进度偏后,弱光指标0.51持续向好,三代终端年度目标5.5需持续压降,九零工程月度转化率87.35%接近90%目标,退单率6.53%,PCDN专线学校出口问题正协调限速机制,二级基站拆除预计4月中旬完成;综合部通报建委相关工作清单及投资计划已汇报,打印设备已协调保障招投标需求,工会经费压减后严考严用,食堂改造及自饮机引入方案正在推进,第四届体育文化节方阵人员招募与排练已部署;商客市场2月收入88.5万元实现增长,三期项目二期拆迁完成1145户,社区与单位清洗服务5场落实签约量待提升。 + +--- + +### 二、部署强调 + +#### 建维部负责人强调: + +1. **网络运维与指标管控:** + - 弱光指标0.51持续向好,三代终端年度目标5.5需持续压降,FPTR已达标但主动过境0.3靠后。 + - 九零工程月度转化率87.35%接近90%目标,退单率6.53%,主要受用户原因及改约影响,已建议施工优化BtoC审核撤单流程。 + - PCDN专线因学校出口带宽问题持续恶化,正协调限速机制;超频基站故障已及时处理,专线巡检按计划推进。 + +2. **工作反馈与执行要求:** + - 强调养成“日清日结”习惯,工作回复必须量化、有结果、有措施,杜绝工作拖延数月未动。 + - 针对关键业务上量指标缺乏保障措施问题,要求本周内出具具体可行方案并明确责任人。 + +--- + +#### 综合部负责人强调: + +1. **工会经费与后勤保障:** + - 工会经费全面压减,需严考严用、以更少资金办更好实事。软性工程(更衣室、食堂改造)已确定上报市公司,拟引入自饮机解决饮水问题并节约成本。 + +2. **奖项申报策略:** + - 针对河川区“担当作为先进集体和先进个人”申报,评选条件多为定性要求,建议提前与区领导或分管领导沟通确认意向后再行申报,避免盲目提交浪费资源。 + +--- + +#### 市场部负责人强调: + +1. **季度收官与二季度谋划:** + - 市场部需提前谋划季度收官及二季度业务活动,打破淡季思维,全力推动商客、H业务及AI军团活动升温。 + - 本周内完成招聘情况、农村渠道进度及营销方案汇报;本周六上午视频汇报运动会筹备情况。 + +2. **满意度与考核管控:** + - 深刻反思满意度测评前期工作未做到位问题,要求主管亲自抓。明确满意度及投诉考核标准,不满客户需严格按5:30及5:35节点操作报警。 + - 要求商客经理每日微信发送日报,跟进考核细节,确保指标可控。 + +--- + +#### 分公司主要领导强调: + +1. **强化执行力与作风:** + - 各部门及一线人员必须摒弃“知道怎么做却不去做”的作风,做到事不做好不收兵。分管领导需加强政企部等部门督导力度,必要时亲自沟通。 + +2. **年度考核提前摸底:** + - 针对四公司年度考核及集团相关指标提升,要求各部门提前深入了解考核细则及可能产生重大影响的不利因素并及时上报,切忌定稿后被动。 + - 市公司会统筹考虑分公司整体情况,务必提前布局、赢在起跑线。 diff --git a/extractor.py b/extractor.py deleted file mode 100644 index d137af8..0000000 --- a/extractor.py +++ /dev/null @@ -1,157 +0,0 @@ -import json -import logging -import re -from typing import List, Optional -from pydantic import BaseModel - -from openai import OpenAI - -from config import config - -logger = logging.getLogger(__name__) - -client = OpenAI( - api_key=config.llm.api_key or None, - base_url=config.llm.base_url if config.llm.base_url else None, -) - - -class Entity(BaseModel): - name: str - entity_type: str - description: str = "" - - -class Relation(BaseModel): - subject: str - subject_type: str - predicate: str - object: str - object_type: str - description: str = "" - - -class ActionItem(BaseModel): - task: str - assignee: str = "" - deadline: str = "" - status: str = "待办" - priority: str = "中" - - -class Decision(BaseModel): - content: str - proposer: str = "" - status: str = "已决" - - -class MeetingMetric(BaseModel): - metric_name: str - value: str - target: str = "" - owner: str = "" - trend: str = "" - - -class MeetingExtraction(BaseModel): - title: str - date: str = "" - participants: List[str] = [] - agenda: List[str] = [] - entities: List[Entity] = [] - relations: List[Relation] = [] - action_items: List[ActionItem] = [] - decisions: List[Decision] = [] - metrics: List[MeetingMetric] = [] - summary: str = "" - - -EXTRACTION_SYSTEM_PROMPT = """ -你是一个专业的会议纪要信息抽取专家。你的任务是从中文会议记录中抽取结构化信息,并严格按照要求的JSON格式返回。 - -## 抽取内容 - -### 1. 实体 -- 人物:参会人员、提及的人员 -- 组织/部门:公司、部门、团队 -- 项目/任务:正在进行的项目、任务 -- 指标/KPI:关键绩效指标(如转化率、退单率等) -- 概念/制度:管理概念、制度要求 -- 地点:会议地点、项目地点 - -### 2. 关系 (主体-关系谓词-客体) -抽取事实性关系,例如: -- {"subject": "建维部", "subject_type": "组织", "predicate": "负责", "object": "网络运维", "object_type": "任务", "description": ""} -- {"subject": "弱光指标", "subject_type": "指标", "predicate": "目标值", "object": "0.5以下", "object_type": "数值", "description": ""} - -### 3. 行动项 -谁负责什么任务,截止时间,优先级 - -### 4. 决策 -做出的决定和结论 - -### 5. 指标数据 -具体的数字指标:当前值、目标值、负责人、趋势(向好/持平/恶化) - -## 规则 -- 只提取事实性信息 -- 过滤比喻、假设、主观评价 -- 数字指标要精确提取 -- entities、relations、action_items、decisions、metrics 如果没有则返回空数组 -""" - - -def _call_llm(system: str, user: str) -> str: - response = client.chat.completions.create( - model=config.llm.model, - messages=[ - {"role": "system", "content": system}, - {"role": "user", "content": user}, - ], - max_tokens=config.llm.max_tokens, - temperature=config.llm.temperature, - ) - content = response.choices[0].message.content - if content is None: - raise ValueError("LLM returned empty response") - return content - - -def extract_meeting_info(text: str) -> MeetingExtraction: - user_prompt = f""" -从以下会议记录中抽取结构化信息。 - -JSON字段说明: -- title: 会议标题 -- date: 会议日期 -- participants: 参会人列表 -- agenda: 议程列表 -- entities: 实体列表,每个实体包含 name(名称), entity_type(类型), description(描述) -- relations: 关系列表,每个关系包含 subject(主体), subject_type(主体类型), predicate(关系谓词), object(客体), object_type(客体类型), description(描述) -- action_items: 行动项列表,每条包含 task(任务), assignee(负责人), deadline(截止时间), status(状态), priority(优先级) -- decisions: 决策列表,每条包含 content(决策内容), proposer(提出人), status(状态) -- metrics: 指标列表,每条包含 metric_name(指标名), value(当前值), target(目标值), owner(负责人), trend(趋势) -- summary: 会议摘要 - -请直接返回JSON对象。不要包含任何额外说明文字。 - -会议记录: -{text} -""" - content = _call_llm(EXTRACTION_SYSTEM_PROMPT, user_prompt) - data = _try_parse_json(content) - return MeetingExtraction(**data) - - -def _try_parse_json(content: str) -> dict: - try: - return json.loads(content) - except json.JSONDecodeError: - logger.warning("JSON解析失败,尝试修复...") - match = re.search(r'\{.*\}', content, re.DOTALL) - if match: - try: - return json.loads(match.group()) - except json.JSONDecodeError as e: - logger.error(f"修复后的JSON仍无法解析: {e}") - raise \ No newline at end of file diff --git a/graph_store.py b/graph_store.py deleted file mode 100644 index 5712dbe..0000000 --- a/graph_store.py +++ /dev/null @@ -1,250 +0,0 @@ -import logging -from typing import Any, Dict, List - -from config import config - -logger = logging.getLogger(__name__) - - -class Neo4jGraphStore: - def __init__(self): - self._driver = None - self._enabled = False - self._connect() - - def _connect(self): - if not config.neo4j.enabled: - logger.info("Neo4j graph store disabled") - return - - try: - from neo4j import GraphDatabase - except ImportError: - logger.warning("neo4j package is not installed") - return - - if not config.neo4j.password: - logger.warning("Neo4j is enabled but NEO4J_PASSWORD is empty") - return - - self._driver = GraphDatabase.driver( - config.neo4j.uri, - auth=(config.neo4j.user, config.neo4j.password), - ) - self._enabled = True - - @property - def enabled(self) -> bool: - return self._enabled and self._driver is not None - - def close(self): - if self._driver is not None: - self._driver.close() - - def run_query(self, query: str, **params) -> List[Dict[str, Any]]: - if not self.enabled: - return [] - - with self._driver.session(database=config.neo4j.database) as session: - result = session.run(query, **params) - return [record.data() for record in result] - - def initialize_schema(self): - if not self.enabled: - return - - statements = [ - "CREATE CONSTRAINT meeting_id IF NOT EXISTS FOR (m:Meeting) REQUIRE m.meeting_id IS UNIQUE", - "CREATE CONSTRAINT entity_name IF NOT EXISTS FOR (e:Entity) REQUIRE e.name IS UNIQUE", - "CREATE INDEX meeting_title IF NOT EXISTS FOR (m:Meeting) ON (m.title)", - "CREATE INDEX entity_type IF NOT EXISTS FOR (e:Entity) ON (e.entity_type)", - ] - for statement in statements: - self.run_query(statement) - - def get_stats(self) -> Dict[str, Any]: - if not self.enabled: - return {"enabled": False} - - rows = self.run_query( - """ - CALL { - MATCH (m:Meeting) - RETURN count(m) AS meetings - } - CALL { - MATCH (e:Entity) - RETURN count(e) AS entities - } - RETURN meetings, entities - """ - ) - if not rows: - return {"enabled": True, "meetings": 0, "entities": 0} - return {"enabled": True, **rows[0]} - - def upsert_meeting_subgraph(self, meeting_data: dict) -> None: - if not self.enabled: - return - - meeting_id = meeting_data.get("_graph_meeting_id") - if not meeting_id: - return - - self.initialize_schema() - self.run_query( - """ - MERGE (m:Meeting {meeting_id: $meeting_id}) - SET m.title = $title, - m.date = $date, - m.summary = $summary, - m.content_hash = $content_hash, - m.updated_at = datetime() - """, - meeting_id=meeting_id, - title=meeting_data.get("title", ""), - date=meeting_data.get("date", ""), - summary=meeting_data.get("summary", ""), - content_hash=meeting_data.get("_content_hash", ""), - ) - - for entity in meeting_data.get("entities", []): - self._upsert_entity(meeting_id, entity) - - for participant in meeting_data.get("participants", []): - self._upsert_entity( - meeting_id, - {"name": participant, "entity_type": "participant", "description": ""}, - ) - - for relation in meeting_data.get("relations", []): - self._upsert_relation(meeting_id, relation, meeting_data.get("date", "")) - - def _upsert_entity(self, meeting_id: str, entity: dict) -> None: - name = entity.get("name", "").strip() - if not name: - return - - self.run_query( - """ - MATCH (m:Meeting {meeting_id: $meeting_id}) - MERGE (e:Entity {name: $name}) - SET e.entity_type = CASE - WHEN $entity_type <> '' THEN $entity_type - ELSE coalesce(e.entity_type, '') - END, - e.description = CASE - WHEN $description <> '' THEN $description - ELSE coalesce(e.description, '') - END, - e.updated_at = datetime() - MERGE (m)-[:MENTIONS]->(e) - """, - meeting_id=meeting_id, - name=name, - entity_type=entity.get("entity_type", ""), - description=entity.get("description", ""), - ) - - def _upsert_relation(self, meeting_id: str, relation: dict, meeting_date: str) -> None: - subject = relation.get("subject", "").strip() - predicate = relation.get("predicate", "").strip() - obj = relation.get("object", "").strip() - if not subject or not predicate or not obj: - return - - self._upsert_entity( - meeting_id, - { - "name": subject, - "entity_type": relation.get("subject_type", ""), - "description": "", - }, - ) - self._upsert_entity( - meeting_id, - { - "name": obj, - "entity_type": relation.get("object_type", ""), - "description": "", - }, - ) - - self.run_query( - """ - MATCH (s:Entity {name: $subject}) - MATCH (o:Entity {name: $object}) - MERGE (s)-[r:RELATES_TO { - meeting_id: $meeting_id, - predicate: $predicate, - object_name: $object - }]->(o) - SET r.description = $description, - r.meeting_date = $meeting_date, - r.updated_at = datetime() - """, - meeting_id=meeting_id, - subject=subject, - predicate=predicate, - object=obj, - description=relation.get("description", ""), - meeting_date=meeting_date, - ) - - def remove_meeting_subgraph(self, meeting_id: str) -> None: - if not self.enabled: - return - - self.run_query( - """ - MATCH (m:Meeting {meeting_id: $meeting_id}) - OPTIONAL MATCH (m)-[mentions:MENTIONS]->(:Entity) - DELETE mentions - WITH m - OPTIONAL MATCH ()-[r:RELATES_TO {meeting_id: $meeting_id}]->() - DELETE r - WITH m - DELETE m - """, - meeting_id=meeting_id, - ) - - def search_facts(self, question: str, limit: int = 5) -> List[Dict[str, Any]]: - if not self.enabled or not question.strip(): - return [] - - rows = self.run_query( - """ - CALL { - MATCH (m:Meeting) - WHERE toLower(m.title) CONTAINS toLower($question) - OR toLower(coalesce(m.summary, '')) CONTAINS toLower($question) - RETURN 'meeting' AS kind, - m.title AS title, - coalesce(m.summary, '') AS text, - m.date AS date - UNION - MATCH (e:Entity) - WHERE toLower(e.name) CONTAINS toLower($question) - OR toLower(coalesce(e.description, '')) CONTAINS toLower($question) - OPTIONAL MATCH (e)-[r:RELATES_TO]-(other:Entity) - RETURN 'entity' AS kind, - e.name AS title, - coalesce( - head(collect( - e.name + ' -[' + coalesce(r.predicate, '') + ']-> ' + coalesce(other.name, '') - )), - coalesce(e.description, '') - ) AS text, - '' AS date - } - RETURN kind, title, text, date - LIMIT $limit - """, - question=question, - limit=limit, - ) - return rows - - -graph_store = Neo4jGraphStore() diff --git a/main.py b/main.py index 3bd184a..c80f60a 100644 --- a/main.py +++ b/main.py @@ -1,184 +1,4 @@ -import argparse -import glob as glob_module -import logging -import os -import sys - -if sys.stdout.encoding and sys.stdout.encoding.lower() == "gbk": - sys.stdout.reconfigure(encoding="utf-8") - -logging.basicConfig( - level=logging.INFO, - format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", - datefmt="%H:%M:%S", -) -logger = logging.getLogger(__name__) - - -def cmd_process(args): - from meeting_processor import meeting_processor - - filepath = args.file - if not os.path.exists(filepath): - print(f"错误:文件不存在 {filepath}") - sys.exit(1) - - print(f"正在处理会议文件:{filepath}") - archive_path = meeting_processor.process_meeting_file(filepath, force=getattr(args, "force", False)) - - if archive_path: - print("\n处理完成") - print(f"原文归档:{archive_path}") - else: - print("\n处理失败或已跳过") - sys.exit(1) - - -def cmd_text(args): - from meeting_processor import meeting_processor - - print("正在处理会议文本...") - archive_path = meeting_processor.process_meeting_text(args.text, force=getattr(args, "force", False)) - if archive_path: - print("\n处理完成") - print(f"原文归档:{archive_path}") - else: - print("\n处理失败或已跳过") - - -def cmd_query(args): - from meeting_processor import meeting_processor - - print(f"查询:{args.question}") - print("-" * 40) - result = meeting_processor.query(args.question, top_k=args.top_k) - print(result if result else "未找到相关信息") - - -def cmd_stats(args): - from meeting_processor import meeting_processor - - stats = meeting_processor.stats() - print("会议记忆系统统计") - print("-" * 40) - print(f"向量节点数:{stats.get('vector_index', {}).get('node_count', 0)}") - print(f"Neo4j 启用:{stats.get('graph', {}).get('enabled', False)}") - print(f"图谱会议数:{stats.get('graph', {}).get('meetings', 0)}") - print(f"图谱实体数:{stats.get('graph', {}).get('entities', 0)}") - print(f"行动项数:{stats.get('state', {}).get('action_items_tracked', 0)}") - print(f"指标数:{stats.get('state', {}).get('metrics_tracked', 0)}") - print(f"会议系列数:{stats.get('state', {}).get('meeting_series', 0)}") - print(f"原文归档目录:{stats.get('raw_dir', '')}") - print(f"状态文件:{stats.get('state_path', '')}") - - -def cmd_batch(args): - from meeting_processor import meeting_processor - - files = glob_module.glob(args.pattern, recursive=True) - if not files: - print(f"未匹配到任何文件:{args.pattern}") - sys.exit(1) - - print(f"找到 {len(files)} 个文件,开始批量处理...") - success = 0 - for path in files: - try: - print(f"\n处理:{path}") - result = meeting_processor.process_meeting_file(path, force=getattr(args, "force", False)) - if result: - success += 1 - except Exception as exc: - logger.error("处理失败: %s - %s", path, exc) - - print(f"\n批量处理完成:{success}/{len(files)} 成功") - - -def cmd_interactive(): - from meeting_processor import meeting_processor - - print("会议纪要长期记忆系统") - print("=" * 50) - print("可用命令:") - print(" query <问题> 语义查询会议记忆") - print(" process <路径> 处理会议文件") - print(" stats 查看统计") - print(" help 显示帮助") - print(" exit/quit 退出") - print("=" * 50) - - while True: - try: - line = input("\n> ").strip() - except (EOFError, KeyboardInterrupt): - print() - break - - if not line: - continue - if line in ("exit", "quit", "q"): - break - if line == "help": - print(" query <问题>") - print(" process <路径>") - print(" stats") - print(" exit/quit") - continue - if line == "stats": - cmd_stats(None) - continue - if line.startswith("process "): - filepath = line[8:].strip() - if not os.path.exists(filepath): - print(f"文件不存在:{filepath}") - continue - result = meeting_processor.process_meeting_file(filepath) - print(f"完成:{result}" if result else "处理失败或已跳过") - continue - - question = line[6:].strip() if line.startswith("query ") else line - result = meeting_processor.query(question, top_k=3) - print(result if result else "未找到相关信息") - - print("bye!") - - -def main(): - parser = argparse.ArgumentParser(description="会议纪要长期记忆系统") - subparsers = parser.add_subparsers(dest="command") - - p_process = subparsers.add_parser("process", help="处理会议 markdown 文件") - p_process.add_argument("file", help="会议文件路径") - p_process.add_argument("-f", "--force", action="store_true", help="发现重复时自动覆盖") - - p_text = subparsers.add_parser("text", help="直接处理一段会议文本") - p_text.add_argument("text", help="会议文本内容") - p_text.add_argument("-f", "--force", action="store_true", help="发现重复时自动覆盖") - - p_query = subparsers.add_parser("query", help="语义查询会议记忆") - p_query.add_argument("question", help="查询问题") - p_query.add_argument("--top-k", type=int, default=3, help="返回结果数量") - - subparsers.add_parser("stats", help="查看统计") - - p_batch = subparsers.add_parser("batch", help="批量处理会议文件") - p_batch.add_argument("pattern", help="glob 模式,如 meetings/*.md") - p_batch.add_argument("-f", "--force", action="store_true", help="发现重复时自动覆盖") - - args = parser.parse_args() - - if args.command == "process": - cmd_process(args) - elif args.command == "text": - cmd_text(args) - elif args.command == "query": - cmd_query(args) - elif args.command == "stats": - cmd_stats(args) - elif args.command == "batch": - cmd_batch(args) - else: - cmd_interactive() +from meeting_memory.cli import main if __name__ == "__main__": diff --git a/meeting_memory/__init__.py b/meeting_memory/__init__.py new file mode 100644 index 0000000..9a1087c --- /dev/null +++ b/meeting_memory/__init__.py @@ -0,0 +1,3 @@ +from meeting_memory.config import config + +__all__ = ["config"] diff --git a/meeting_memory/cli.py b/meeting_memory/cli.py new file mode 100644 index 0000000..7f51ea5 --- /dev/null +++ b/meeting_memory/cli.py @@ -0,0 +1,202 @@ +import argparse +import glob as glob_module +import logging +import os +import sys + +from meeting_memory.meeting_processor import meeting_processor +from meeting_memory.web_demo import run_demo_server + +if sys.stdout.encoding and sys.stdout.encoding.lower() == "gbk": + sys.stdout.reconfigure(encoding="utf-8") + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", + datefmt="%H:%M:%S", +) +logger = logging.getLogger(__name__) + + +def cmd_process(args): + filepath = args.file + if not os.path.exists(filepath): + print(f"错误:文件不存在 {filepath}") + sys.exit(1) + + print(f"正在处理会议文件:{filepath}") + archive_path = meeting_processor.process_meeting_file( + filepath, + force=getattr(args, "force", False), + ) + + if archive_path: + print("\n处理完成") + print(f"原文归档:{archive_path}") + else: + print("\n处理失败或已跳过") + sys.exit(1) + + +def cmd_text(args): + print("正在处理会议文本...") + archive_path = meeting_processor.process_meeting_text( + args.text, + force=getattr(args, "force", False), + ) + if archive_path: + print("\n处理完成") + print(f"原文归档:{archive_path}") + else: + print("\n处理失败或已跳过") + + +def cmd_query(args): + print(f"查询:{args.question}") + print("-" * 40) + result = meeting_processor.query(args.question, top_k=args.top_k) + print(result if result else "未找到相关信息") + + +def cmd_stats(_args): + stats = meeting_processor.stats() + graph = stats.get("graph", {}) + state = stats.get("state", {}) + + print("会议纪要长期记忆系统统计") + print("-" * 40) + print(f"Neo4j 启用:{graph.get('enabled', False)}") + print(f"图谱会议数:{graph.get('meetings', 0)}") + print(f"图谱 Episode 数:{graph.get('episodes', 0)}") + print(f"图谱实体数:{graph.get('entities', 0)}") + print(f"图谱 Fact 数:{graph.get('facts', 0)}") + print(f"行动项数:{state.get('action_items_tracked', 0)}") + print(f"指标数:{state.get('metrics_tracked', 0)}") + print(f"会议系列数:{state.get('meeting_series', 0)}") + print(f"原文归档目录:{stats.get('raw_dir', '')}") + print(f"状态文件:{stats.get('state_path', '')}") + + +def cmd_batch(args): + files = glob_module.glob(args.pattern, recursive=True) + if not files: + print(f"未匹配到任何文件:{args.pattern}") + sys.exit(1) + + print(f"找到 {len(files)} 个文件,开始批量处理...") + success = 0 + for path in files: + try: + print(f"\n处理:{path}") + result = meeting_processor.process_meeting_file( + path, + force=getattr(args, "force", False), + ) + if result: + success += 1 + except Exception as exc: + logger.error("处理失败: %s - %s", path, exc) + + print(f"\n批量处理完成:{success}/{len(files)} 成功") + + +def cmd_web(args): + run_demo_server( + host=getattr(args, "host", "127.0.0.1"), + port=getattr(args, "port", 8765), + ) + + +def cmd_interactive(): + print("会议纪要长期记忆系统") + print("=" * 50) + print("可用命令:") + print(" query <问题> 查询会议记忆") + print(" process <路径> 处理会议文件") + print(" stats 查看统计") + print(" help 显示帮助") + print(" exit/quit 退出") + print("=" * 50) + + while True: + try: + line = input("\n> ").strip() + except (EOFError, KeyboardInterrupt): + print() + break + + if not line: + continue + if line in ("exit", "quit", "q"): + break + if line == "help": + print(" query <问题>") + print(" process <路径>") + print(" stats") + print(" exit/quit") + continue + if line == "stats": + cmd_stats(None) + continue + if line.startswith("process "): + filepath = line[8:].strip() + if not os.path.exists(filepath): + print(f"文件不存在:{filepath}") + continue + result = meeting_processor.process_meeting_file(filepath) + print(f"完成:{result}" if result else "处理失败或已跳过") + continue + + question = line[6:].strip() if line.startswith("query ") else line + result = meeting_processor.query(question, top_k=3) + print(result if result else "未找到相关信息") + + print("bye!") + + +def main(): + parser = argparse.ArgumentParser(description="会议纪要长期记忆系统") + subparsers = parser.add_subparsers(dest="command") + + p_process = subparsers.add_parser("process", help="处理会议 markdown 文件") + p_process.add_argument("file", help="会议文件路径") + p_process.add_argument("-f", "--force", action="store_true", help="发现重复时自动覆盖") + + p_text = subparsers.add_parser("text", help="直接处理一段会议文本") + p_text.add_argument("text", help="会议文本内容") + p_text.add_argument("-f", "--force", action="store_true", help="发现重复时自动覆盖") + + p_query = subparsers.add_parser("query", help="查询会议记忆") + p_query.add_argument("question", help="查询问题") + p_query.add_argument("--top-k", type=int, default=3, help="返回结果数量") + + subparsers.add_parser("stats", help="查看统计") + + p_batch = subparsers.add_parser("batch", help="批量处理会议文件") + p_batch.add_argument("pattern", help="glob 模式,如 meetings/*.md") + p_batch.add_argument("-f", "--force", action="store_true", help="发现重复时自动覆盖") + + p_web = subparsers.add_parser("web", help="启动 Web 界面") + p_web.add_argument("--host", default="127.0.0.1", help="绑定地址") + p_web.add_argument("--port", type=int, default=8765, help="服务端口") + + args = parser.parse_args() + + if args.command == "process": + cmd_process(args) + elif args.command == "text": + cmd_text(args) + elif args.command == "query": + cmd_query(args) + elif args.command == "stats": + cmd_stats(args) + elif args.command == "batch": + cmd_batch(args) + elif args.command == "web": + cmd_web(args) + else: + cmd_interactive() + + +if __name__ == "__main__": + main() diff --git a/config.py b/meeting_memory/config.py similarity index 86% rename from config.py rename to meeting_memory/config.py index d430240..8bd9abe 100644 --- a/config.py +++ b/meeting_memory/config.py @@ -5,7 +5,8 @@ from pydantic import BaseModel, Field load_dotenv() -PROJECT_ROOT = os.path.dirname(os.path.abspath(__file__)) +PACKAGE_ROOT = os.path.dirname(os.path.abspath(__file__)) +PROJECT_ROOT = os.path.dirname(PACKAGE_ROOT) class LLMConfig(BaseModel): @@ -26,11 +27,6 @@ class StorageConfig(BaseModel): data_dir: str = Field(default=os.path.join(PROJECT_ROOT, "data")) raw_dir: str = Field(default=os.path.join(PROJECT_ROOT, "data", "raw")) - -class VectorStoreConfig(BaseModel): - persist_dir: str = Field(default=os.path.join(PROJECT_ROOT, "vector_store_data")) - - class Neo4jConfig(BaseModel): enabled: bool = Field(default=os.getenv("NEO4J_ENABLED", "false").lower() == "true") uri: str = Field(default=os.getenv("NEO4J_URI", "bolt://localhost:7687")) @@ -43,7 +39,6 @@ class ProjectConfig(BaseModel): llm: LLMConfig = Field(default_factory=LLMConfig) embedding: EmbeddingConfig = Field(default_factory=EmbeddingConfig) storage: StorageConfig = Field(default_factory=StorageConfig) - vector_store: VectorStoreConfig = Field(default_factory=VectorStoreConfig) neo4j: Neo4jConfig = Field(default_factory=Neo4jConfig) state_path: str = Field(default=os.path.join(PROJECT_ROOT, "data", "meeting_state.json")) diff --git a/meeting_memory/extractor.py b/meeting_memory/extractor.py new file mode 100644 index 0000000..4ff4671 --- /dev/null +++ b/meeting_memory/extractor.py @@ -0,0 +1,364 @@ +import json +import logging +import re +import sys +from typing import List, Optional + +from openai import OpenAI +from pydantic import BaseModel, Field + +from meeting_memory.config import config + +logger = logging.getLogger(__name__) + +client = OpenAI( + api_key=config.llm.api_key or None, + base_url=config.llm.base_url if config.llm.base_url else None, +) + + +class Entity(BaseModel): + name: str + entity_type: str + description: str = "" + + +class Relation(BaseModel): + subject: str + subject_type: str + predicate: str + object: str + object_type: str + description: str = "" + fact: str = "" + qualifiers: List[str] = Field(default_factory=list) + evidence: str = "" + confidence: float = 0.0 + valid_at: str = "" + invalid_at: str = "" + + +class ActionItem(BaseModel): + task: str + assignee: str = "" + deadline: str = "" + status: str = "待办" + priority: str = "中" + + +class Decision(BaseModel): + content: str + proposer: str = "" + status: str = "已决" + + +class MeetingMetric(BaseModel): + metric_name: str + value: str + target: str = "" + owner: str = "" + trend: str = "" + + +class MeetingExtraction(BaseModel): + title: str + date: str = "" + participants: List[str] = Field(default_factory=list) + agenda: List[str] = Field(default_factory=list) + entities: List[Entity] = Field(default_factory=list) + relations: List[Relation] = Field(default_factory=list) + action_items: List[ActionItem] = Field(default_factory=list) + decisions: List[Decision] = Field(default_factory=list) + metrics: List[MeetingMetric] = Field(default_factory=list) + summary: str = "" + + +EXTRACTION_SYSTEM_PROMPT = """ +你是一个专业的会议知识抽取助手。你的任务是从中文会议记录中抽取结构化事实,尤其要抽出更细粒度、更有语义深度的关系。 + +输出要求: +1. 只输出一个 JSON 对象,不要输出解释文字。 +2. 关系抽取不要停留在“部门汇报了工作”这种浅层描述,要尽可能向下细化到: + - 责任归属 + - 目标值 / 当前值 / 趋势 + - 约束条件 + - 因果 / 影响 + - 时间要求 + - 依赖关系 + - 部署 / 决策 / 要求 / 风险 / 支撑关系 +3. 每条关系尽量同时给出: + - subject / predicate / object + - fact: 一句自然语言事实表述 + - qualifiers: 限定条件、范围、状态、数值、约束等 + - evidence: 原文中的关键短句或压缩证据 + - confidence: 0 到 1 之间 + - valid_at / invalid_at: 如果文中明确提到时间,可填写;否则留空 +4. 如果原文存在多个事实,不要只抽象概括,要拆成多条关系。 +5. 避免空泛关系词,优先使用更具体的谓词,例如: + - 负责 / 汇报 / 目标值 / 当前值 / 低于 / 高于 / 要求 / 督导 / 推进 / 影响 / 支撑 / 依赖 / 计划 / 完成 / 截止于 +""" + + +def _call_llm(system: str, user: str, stream: bool = False) -> str: + if not stream: + response = client.chat.completions.create( + model=config.llm.model, + messages=[ + {"role": "system", "content": system}, + {"role": "user", "content": user}, + ], + max_tokens=config.llm.max_tokens, + temperature=config.llm.temperature, + ) + content = response.choices[0].message.content + if content is None: + raise ValueError("LLM returned empty response") + return content + + response = client.chat.completions.create( + model=config.llm.model, + messages=[ + {"role": "system", "content": system}, + {"role": "user", "content": user}, + ], + max_tokens=config.llm.max_tokens, + temperature=config.llm.temperature, + stream=True, + ) + + chunks: List[str] = [] + print("\n[LLM] 开始抽取,流式输出中:") + for event in response: + if not event.choices: + continue + delta = event.choices[0].delta.content + if not delta: + continue + chunks.append(delta) + sys.stdout.write(delta) + sys.stdout.flush() + print("\n[LLM] 抽取输出结束") + return "".join(chunks) + + +def extract_meeting_info(text: str, stream: bool = False) -> MeetingExtraction: + user_prompt = f""" +请从下面会议记录中提取结构化信息,并重点做“深层关系抽取”。 + +输出 JSON 字段: +- title +- date +- participants +- agenda +- entities: name, entity_type, description +- relations: + - subject + - subject_type + - predicate + - object + - object_type + - description + - fact + - qualifiers + - evidence + - confidence + - valid_at + - invalid_at +- action_items: task, assignee, deadline, status, priority +- decisions: content, proposer, status +- metrics: metric_name, value, target, owner, trend +- summary + +关系抽取规则: +1. 不要只抽“汇报了工作”这种会议动作,要尽量继续下钻出具体事实。 +2. 如果一句话里同时包含“主体 + 指标 + 当前值 + 目标值 + 负责人 + 趋势”,应拆成多条关系或在 qualifiers 中保留这些细节。 +3. 对于“要求、部署、负责、依赖、影响、约束、目标、风险”类信息优先保留。 +4. fact 必须是一句完整、自然、可检索的事实描述。 +5. qualifiers 用于补充数值、范围、状态、条件、截止时间、优先级等信息。 +6. evidence 用原文中的关键词短句,不要太长。 +7. confidence 取值 0 到 1。 + +会议记录如下: +{text} +""" + content = _call_llm(EXTRACTION_SYSTEM_PROMPT, user_prompt, stream=stream) + data = _try_parse_json(content) + data = _normalize_meeting_data(data) + return MeetingExtraction(**data) + + +def _try_parse_json(content: str) -> dict: + try: + return json.loads(content) + except json.JSONDecodeError: + logger.warning("JSON parsing failed; trying to repair extracted block") + match = re.search(r"\{.*\}", content, re.DOTALL) + if match: + try: + return json.loads(match.group()) + except json.JSONDecodeError as exc: + logger.error("Repaired JSON still failed to parse: %s", exc) + raise + + +def _normalize_meeting_data(data: dict) -> dict: + if not isinstance(data, dict): + return {} + + return { + "title": _as_str(data.get("title")), + "date": _as_str(data.get("date")), + "participants": _as_str_list(data.get("participants")), + "agenda": _as_str_list(data.get("agenda")), + "entities": _normalize_entities(data.get("entities")), + "relations": _normalize_relations(data.get("relations")), + "action_items": _normalize_action_items(data.get("action_items")), + "decisions": _normalize_decisions(data.get("decisions")), + "metrics": _normalize_metrics(data.get("metrics")), + "summary": _as_str(data.get("summary")), + } + + +def _as_str(value) -> str: + if value is None: + return "" + if isinstance(value, str): + return value + return str(value) + + +def _as_float(value) -> float: + if value is None or value == "": + return 0.0 + try: + numeric = float(value) + return max(0.0, min(1.0, numeric)) + except (TypeError, ValueError): + return 0.0 + + +def _as_str_list(value) -> List[str]: + if isinstance(value, dict): + items = [] + for key, item in value.items(): + key_text = _as_str(key) + value_text = _as_str(item) + if key_text and value_text: + items.append(f"{key_text}: {value_text}") + elif key_text: + items.append(key_text) + elif value_text: + items.append(value_text) + return items + if not isinstance(value, list): + return [] + return [_as_str(item) for item in value if item is not None] + + +def _normalize_entities(value) -> List[dict]: + if not isinstance(value, list): + return [] + items = [] + for entity in value: + if not isinstance(entity, dict): + continue + items.append( + { + "name": _as_str(entity.get("name")), + "entity_type": _as_str(entity.get("entity_type")), + "description": _as_str(entity.get("description")), + } + ) + return items + + +def _normalize_relations(value) -> List[dict]: + if not isinstance(value, list): + return [] + + items = [] + for relation in value: + if not isinstance(relation, dict): + continue + + subject = _as_str(relation.get("subject")) + predicate = _as_str(relation.get("predicate")) + obj = _as_str(relation.get("object")) + description = _as_str(relation.get("description")) + fact = _as_str(relation.get("fact")) + + if not fact and subject and predicate and obj: + fact = f"{subject} {predicate} {obj}" + + items.append( + { + "subject": subject, + "subject_type": _as_str(relation.get("subject_type")), + "predicate": predicate, + "object": obj, + "object_type": _as_str(relation.get("object_type")), + "description": description, + "fact": fact, + "qualifiers": _as_str_list(relation.get("qualifiers")), + "evidence": _as_str(relation.get("evidence")), + "confidence": _as_float(relation.get("confidence")), + "valid_at": _as_str(relation.get("valid_at")), + "invalid_at": _as_str(relation.get("invalid_at")), + } + ) + return items + + +def _normalize_action_items(value) -> List[dict]: + if not isinstance(value, list): + return [] + items = [] + for action in value: + if not isinstance(action, dict): + continue + items.append( + { + "task": _as_str(action.get("task")), + "assignee": _as_str(action.get("assignee")), + "deadline": _as_str(action.get("deadline")), + "status": _as_str(action.get("status")) or "待办", + "priority": _as_str(action.get("priority")) or "中", + } + ) + return items + + +def _normalize_decisions(value) -> List[dict]: + if not isinstance(value, list): + return [] + items = [] + for decision in value: + if not isinstance(decision, dict): + continue + items.append( + { + "content": _as_str(decision.get("content")), + "proposer": _as_str(decision.get("proposer")), + "status": _as_str(decision.get("status")) or "已决", + } + ) + return items + + +def _normalize_metrics(value) -> List[dict]: + if not isinstance(value, list): + return [] + items = [] + for metric in value: + if not isinstance(metric, dict): + continue + items.append( + { + "metric_name": _as_str(metric.get("metric_name")), + "value": _as_str(metric.get("value")), + "target": _as_str(metric.get("target")), + "owner": _as_str(metric.get("owner")), + "trend": _as_str(metric.get("trend")), + } + ) + return items diff --git a/meeting_memory/graph_store.py b/meeting_memory/graph_store.py new file mode 100644 index 0000000..ab16f47 --- /dev/null +++ b/meeting_memory/graph_store.py @@ -0,0 +1,784 @@ +import hashlib +import json +import logging +import re +import time +from typing import Any, Dict, List, Optional + +from meeting_memory.config import config +from meeting_memory.services.embedding_service import embedding_service + +logger = logging.getLogger(__name__) + + +def _cosine_similarity(left: List[float], right: List[float]) -> float: + if not left or not right or len(left) != len(right): + return 0.0 + + dot = sum(a * b for a, b in zip(left, right)) + left_norm = sum(a * a for a in left) ** 0.5 + right_norm = sum(b * b for b in right) ** 0.5 + if left_norm == 0 or right_norm == 0: + return 0.0 + return dot / (left_norm * right_norm) + + +def _keyword_score(text: str, question: str) -> float: + source = (text or "").lower() + terms = _keyword_terms(question) + if not source or not terms: + return 0.0 + hits = sum(1 for term in terms if term in source) + return hits / len(terms) + + +def _keyword_terms(text: str) -> List[str]: + normalized = (text or "").lower() + raw_terms = re.findall(r"[a-z0-9]+|[\u4e00-\u9fff]{2,}", normalized) + stopwords = {"是什么", "多少", "分别", "以及", "还有", "当前值", "目标值"} + + terms: List[str] = [] + for raw in raw_terms: + if raw in stopwords: + continue + if raw not in terms: + terms.append(raw) + + if re.fullmatch(r"[\u4e00-\u9fff]{4,}", raw): + for size in (2, 3, 4): + for idx in range(0, len(raw) - size + 1): + piece = raw[idx : idx + size] + if piece not in stopwords and piece not in terms: + terms.append(piece) + return terms + + +class Neo4jGraphStore: + def __init__(self): + self._driver = None + self._enabled = False + self._uri = config.neo4j.uri + self._last_failure_at = 0.0 + self._retry_cooldown_seconds = 10.0 + self._connect() + + def _connect(self): + if not config.neo4j.enabled: + logger.info("Neo4j graph store disabled") + return + + try: + from neo4j import GraphDatabase + except ImportError: + logger.warning("neo4j package is not installed") + return + + if not config.neo4j.password: + logger.warning("Neo4j is enabled but NEO4J_PASSWORD is empty") + return + + tried_uris = [self._uri] + if self._uri.startswith("neo4j://"): + tried_uris.append("bolt://" + self._uri[len("neo4j://") :]) + + for uri in tried_uris: + driver = None + try: + driver = GraphDatabase.driver( + uri, + auth=(config.neo4j.user, config.neo4j.password), + ) + driver.verify_connectivity() + self._driver = driver + self._uri = uri + self._enabled = True + self._last_failure_at = 0.0 + if uri != config.neo4j.uri: + logger.warning("Neo4j routing URI unavailable; fell back to %s", uri) + return + except Exception as exc: + logger.warning("Neo4j connection failed for %s: %s", uri, exc) + try: + driver.close() + except Exception: + pass + + self._mark_unavailable("Neo4j is currently unreachable") + + @property + def enabled(self) -> bool: + if not self._enabled and self._should_retry_connect(): + self._connect() + return self._enabled and self._driver is not None + + def _should_retry_connect(self) -> bool: + return (time.time() - self._last_failure_at) >= self._retry_cooldown_seconds + + def _mark_unavailable(self, reason: str = "") -> None: + if reason: + logger.warning("Neo4j temporarily disabled: %s", reason) + self._enabled = False + self._last_failure_at = time.time() + if self._driver is not None: + try: + self._driver.close() + except Exception: + pass + self._driver = None + + @staticmethod + def meeting_id(meeting_data: dict) -> str: + title = meeting_data.get("title", "") + date = meeting_data.get("date", "") + raw = f"{date}_{title}" + return f"meeting_{hashlib.md5(raw.encode('utf-8')).hexdigest()[:12]}" + + def close(self): + if self._driver is not None: + self._driver.close() + + def run_query(self, query: str, **params) -> List[Dict[str, Any]]: + if not self.enabled: + return [] + + try: + with self._driver.session(database=config.neo4j.database) as session: + result = session.run(query, **params) + return [record.data() for record in result] + except Exception as exc: + logger.warning("Neo4j query failed: %s", exc) + self._mark_unavailable(str(exc)) + return [] + + def initialize_schema(self): + if not self.enabled: + return + + statements = [ + "CREATE CONSTRAINT meeting_id IF NOT EXISTS FOR (m:Meeting) REQUIRE m.meeting_id IS UNIQUE", + "CREATE CONSTRAINT episode_id IF NOT EXISTS FOR (e:Episode) REQUIRE e.episode_id IS UNIQUE", + "CREATE CONSTRAINT entity_name IF NOT EXISTS FOR (e:Entity) REQUIRE e.name IS UNIQUE", + "CREATE CONSTRAINT fact_id IF NOT EXISTS FOR (f:Fact) REQUIRE f.fact_id IS UNIQUE", + "CREATE INDEX meeting_title IF NOT EXISTS FOR (m:Meeting) ON (m.title)", + "CREATE INDEX episode_title IF NOT EXISTS FOR (e:Episode) ON (e.title)", + "CREATE INDEX entity_type IF NOT EXISTS FOR (e:Entity) ON (e.entity_type)", + "CREATE INDEX fact_predicate IF NOT EXISTS FOR (f:Fact) ON (f.predicate)", + ] + for statement in statements: + self.run_query(statement) + + def get_stats(self) -> Dict[str, Any]: + if not self.enabled: + return {"enabled": False} + + rows = self.run_query( + """ + CALL () { + MATCH (m:Meeting) + RETURN count(m) AS meetings + } + CALL () { + MATCH (ep:Episode) + RETURN count(ep) AS episodes + } + CALL () { + MATCH (e:Entity) + RETURN count(e) AS entities + } + CALL () { + MATCH (f:Fact) + RETURN count(f) AS facts + } + RETURN meetings, episodes, entities, facts + """ + ) + if not rows: + return {"enabled": False, "meetings": 0, "episodes": 0, "entities": 0, "facts": 0} + return {"enabled": True, **rows[0]} + + def upsert_meeting_subgraph(self, meeting_data: dict) -> None: + if not self.enabled: + return + + meeting_id = meeting_data.get("_graph_meeting_id") or self.meeting_id(meeting_data) + episode_text = self._build_episode_text(meeting_data) + episode_embedding = embedding_service.embed_text(episode_text) + + self.initialize_schema() + self.run_query( + """ + MERGE (m:Meeting {meeting_id: $meeting_id}) + SET m.title = $title, + m.date = $date, + m.summary = $summary, + m.content_hash = $content_hash, + m.raw_path = $raw_path, + m.updated_at = datetime() + MERGE (ep:Episode {episode_id: $meeting_id}) + SET ep.title = $title, + ep.date = $date, + ep.summary = $summary, + ep.content = $content, + ep.content_hash = $content_hash, + ep.raw_path = $raw_path, + ep.participants = $participants, + ep.content_embedding = $content_embedding, + ep.updated_at = datetime() + MERGE (m)-[:HAS_EPISODE]->(ep) + """, + meeting_id=meeting_id, + title=meeting_data.get("title", ""), + date=meeting_data.get("date", ""), + summary=meeting_data.get("summary", ""), + content_hash=meeting_data.get("_content_hash", ""), + raw_path=meeting_data.get("_original_text_path", ""), + content=episode_text, + participants=meeting_data.get("participants", []), + content_embedding=episode_embedding, + ) + + for entity in meeting_data.get("entities", []): + self._upsert_entity(meeting_id, entity) + + for participant in meeting_data.get("participants", []): + self._upsert_entity( + meeting_id, + {"name": participant, "entity_type": "participant", "description": ""}, + ) + + for relation in meeting_data.get("relations", []): + self._upsert_relation(meeting_id, relation, meeting_data.get("date", "")) + + def _upsert_entity(self, meeting_id: str, entity: dict) -> None: + name = entity.get("name", "").strip() + if not name: + return + + summary = self._entity_summary(entity) + name_embedding = embedding_service.embed_text(summary or name) + self.run_query( + """ + MATCH (:Meeting {meeting_id: $meeting_id})-[:HAS_EPISODE]->(ep:Episode {episode_id: $meeting_id}) + MERGE (e:Entity {name: $name}) + SET e.entity_type = CASE + WHEN $entity_type <> '' THEN $entity_type + ELSE coalesce(e.entity_type, '') + END, + e.description = CASE + WHEN $description <> '' THEN $description + ELSE coalesce(e.description, '') + END, + e.summary = CASE + WHEN $summary <> '' THEN $summary + ELSE coalesce(e.summary, '') + END, + e.name_embedding = CASE + WHEN size($name_embedding) > 0 THEN $name_embedding + ELSE coalesce(e.name_embedding, []) + END, + e.updated_at = datetime() + MERGE (ep)-[:MENTIONS]->(e) + """, + meeting_id=meeting_id, + name=name, + entity_type=entity.get("entity_type", ""), + description=entity.get("description", ""), + summary=summary, + name_embedding=name_embedding, + ) + + def _upsert_relation(self, meeting_id: str, relation: dict, meeting_date: str) -> None: + subject = relation.get("subject", "").strip() + predicate = relation.get("predicate", "").strip() + obj = relation.get("object", "").strip() + if not subject or not predicate or not obj: + return + + self._upsert_entity( + meeting_id, + { + "name": subject, + "entity_type": relation.get("subject_type", ""), + "description": "", + }, + ) + self._upsert_entity( + meeting_id, + { + "name": obj, + "entity_type": relation.get("object_type", ""), + "description": "", + }, + ) + + fact_text = self._fact_text(relation) + fact_id = hashlib.md5( + f"{meeting_id}|{subject}|{predicate}|{obj}".encode("utf-8") + ).hexdigest() + fact_embedding = embedding_service.embed_text(fact_text) + + self.run_query( + """ + MATCH (:Meeting {meeting_id: $meeting_id})-[:HAS_EPISODE]->(ep:Episode {episode_id: $meeting_id}) + MATCH (s:Entity {name: $subject}) + MATCH (o:Entity {name: $object}) + MERGE (f:Fact {fact_id: $fact_id}) + SET f.fact = $fact, + f.predicate = $predicate, + f.description = $description, + f.qualifiers = $qualifiers, + f.evidence = $evidence, + f.confidence = $confidence, + f.valid_at = $valid_at, + f.invalid_at = $invalid_at, + f.meeting_id = $meeting_id, + f.meeting_date = $meeting_date, + f.fact_embedding = $fact_embedding, + f.updated_at = datetime() + MERGE (ep)-[:HAS_FACT]->(f) + MERGE (s)-[:FACT_SOURCE]->(f) + MERGE (f)-[:FACT_TARGET]->(o) + """, + meeting_id=meeting_id, + subject=subject, + predicate=predicate, + object=obj, + fact_id=fact_id, + fact=fact_text, + description=relation.get("description", ""), + qualifiers=relation.get("qualifiers", []), + evidence=relation.get("evidence", ""), + confidence=relation.get("confidence", 0.0), + valid_at=relation.get("valid_at", ""), + invalid_at=relation.get("invalid_at", ""), + meeting_date=meeting_date, + fact_embedding=fact_embedding, + ) + + def remove_meeting_subgraph(self, meeting_id: str) -> None: + if not self.enabled: + return + + self.run_query( + """ + MATCH (m:Meeting {meeting_id: $meeting_id})-[:HAS_EPISODE]->(ep:Episode {episode_id: $meeting_id}) + OPTIONAL MATCH (ep)-[mention:MENTIONS]->(entity:Entity) + OPTIONAL MATCH (ep)-[has_fact:HAS_FACT]->(fact:Fact) + OPTIONAL MATCH (fact)-[target_rel:FACT_TARGET]->(:Entity) + OPTIONAL MATCH (:Entity)-[source_rel:FACT_SOURCE]->(fact) + DELETE mention, has_fact, target_rel, source_rel + WITH m, ep, collect(DISTINCT fact) AS facts, collect(DISTINCT entity) AS entities + FOREACH (fact IN facts | DELETE fact) + DELETE ep, m + WITH entities + UNWIND entities AS entity + WITH DISTINCT entity WHERE entity IS NOT NULL + OPTIONAL MATCH (entity)<-[m1:MENTIONS]-(:Episode) + OPTIONAL MATCH (entity)-[m2:FACT_SOURCE|FACT_TARGET]-(:Fact) + WITH entity, count(m1) + count(m2) AS refs + WHERE refs = 0 + DELETE entity + """, + meeting_id=meeting_id, + ) + + def get_meeting(self, title: str, date: str = "") -> Optional[Dict[str, Any]]: + if not self.enabled: + return None + + rows = self.run_query( + """ + MATCH (m:Meeting) + WHERE m.title = $title + AND ($date = '' OR m.date = $date) + RETURN m.meeting_id AS meeting_id, + m.title AS title, + m.date AS date, + m.summary AS summary, + m.content_hash AS content_hash + LIMIT 1 + """, + title=title, + date=date, + ) + return rows[0] if rows else None + + def find_similar_episode(self, text: str, threshold: float = 0.92) -> Optional[Dict[str, Any]]: + if not self.enabled or not text.strip(): + return None + + query_embedding = embedding_service.embed_text(text) + rows = self.run_query( + """ + MATCH (m:Meeting)-[:HAS_EPISODE]->(ep:Episode) + RETURN m.meeting_id AS meeting_id, + m.title AS title, + m.date AS date, + m.content_hash AS content_hash, + ep.content_embedding AS content_embedding + """ + ) + + best_match = None + for row in rows: + score = _cosine_similarity(query_embedding, row.get("content_embedding", [])) + if score >= threshold and (best_match is None or score > best_match["score"]): + best_match = { + "metadata": { + "meeting_id": row.get("meeting_id", ""), + "title": row.get("title", ""), + "date": row.get("date", ""), + "content_hash": row.get("content_hash", ""), + }, + "score": score, + } + return best_match + + def hybrid_search(self, question: str, limit: int = 5) -> List[Dict[str, Any]]: + if not self.enabled or not question.strip(): + return [] + + query_embedding = embedding_service.embed_text(question) + candidates = self._load_fact_candidates() + candidates.extend(self._load_entity_candidates()) + candidates.extend(self._load_episode_candidates()) + + scored = [] + for item in candidates: + combined_text = " ".join( + [ + str(item.get("title") or ""), + str(item.get("text") or ""), + str(item.get("meeting_title") or ""), + str(item.get("date") or ""), + ] + ) + semantic = _cosine_similarity(query_embedding, item.get("embedding", [])) + lexical = _keyword_score(combined_text, question) + graph_bonus = 0.1 if item.get("kind") == "fact" else 0.05 + score = semantic * 0.7 + lexical * 0.2 + graph_bonus + if score <= 0: + continue + + scored.append( + { + **item, + "score": round(score, 4), + "semantic_score": round(semantic, 4), + "keyword_score": round(lexical, 4), + } + ) + + scored.sort(key=lambda row: row["score"], reverse=True) + return scored[:limit] + + def search_facts(self, question: str, limit: int = 5) -> List[Dict[str, Any]]: + return self.hybrid_search(question, limit=limit) + + def get_graph_kinds(self) -> List[Dict[str, Any]]: + if not self.enabled: + return [] + rows = self.run_query( + """ + MATCH (n) + WHERE n:Meeting OR n:Episode OR n:Entity OR n:Fact + WITH [lbl IN labels(n) WHERE lbl IN ['Meeting','Episode','Entity','Fact']][0] AS kind + RETURN kind, count(*) AS count + ORDER BY count DESC + """ + ) + return rows + + def get_entity_types(self) -> List[Dict[str, Any]]: + if not self.enabled: + return [] + return self.run_query( + """ + MATCH (e:Entity) + WHERE coalesce(e.entity_type, '') <> '' + RETURN e.entity_type AS entity_type, count(*) AS count + ORDER BY count DESC + """ + ) + + def get_graph_snapshot( + self, + query: str = "", + entity_types: Optional[List[str]] = None, + kinds: Optional[List[str]] = None, + limit_nodes: int = 80, + limit_edges: int = 160, + ) -> Dict[str, Any]: + if not self.enabled: + return {"nodes": [], "edges": [], "stats": {"enabled": False}} + + keyword_terms = _keyword_terms(query) if query else [] + raw_nodes = self.run_query( + """ + MATCH (n) + WHERE (n:Meeting OR n:Episode OR n:Entity OR n:Fact) + AND ($kinds = [] OR [lbl IN labels(n) WHERE lbl IN ['Meeting','Episode','Entity','Fact']][0] IN $kinds) + AND ($terms = [] + OR (n:Meeting AND any(t IN $terms WHERE toLower(coalesce(n.title,'')) CONTAINS t OR toLower(coalesce(n.summary,'')) CONTAINS t)) + OR (n:Episode AND any(t IN $terms WHERE toLower(coalesce(n.title,'')) CONTAINS t OR toLower(coalesce(n.content,'')) CONTAINS t)) + OR (n:Entity AND any(t IN $terms WHERE toLower(coalesce(n.name,'')) CONTAINS t OR toLower(coalesce(n.summary,'')) CONTAINS t OR toLower(coalesce(n.description,'')) CONTAINS t)) + OR (n:Fact AND any(t IN $terms WHERE toLower(coalesce(n.fact,'')) CONTAINS t OR toLower(coalesce(n.predicate,'')) CONTAINS t OR toLower(coalesce(n.description,'')) CONTAINS t)) + ) + AND ($types = [] OR NOT n:Entity OR coalesce(n.entity_type, '') IN $types) + OPTIONAL MATCH (n)-[r]-() + RETURN n.meeting_id AS meeting_id, + n.episode_id AS episode_id, + n.name AS entity_name, + n.fact_id AS fact_id, + n.title AS title, + n.summary AS summary, + n.date AS date, + n.entity_type AS entity_type, + n.description AS description, + n.predicate AS predicate, + n.fact AS fact, + n.confidence AS confidence, + n.meeting_date AS meeting_date, + [lbl IN labels(n) WHERE lbl IN ['Meeting','Episode','Entity','Fact']][0] AS kind, + count(DISTINCT r) AS degree + ORDER BY degree DESC, coalesce(n.title, n.name, n.fact) ASC + LIMIT $limit_nodes + """, + terms=keyword_terms, + types=entity_types or [], + kinds=kinds or [], + limit_nodes=limit_nodes, + ) + if not raw_nodes: + return {"nodes": [], "edges": [], "stats": self.get_stats()} + + all_raw_ids = set() + nodes = [] + for row in raw_nodes: + kind = row.get("kind", "") + if kind == "Meeting": + raw_id = row.get("meeting_id", "") + label = row.get("title", "") or raw_id + elif kind == "Episode": + raw_id = row.get("episode_id", "") + label = row.get("title", "") or raw_id + elif kind == "Entity": + raw_id = row.get("entity_name", "") + label = raw_id + elif kind == "Fact": + raw_id = row.get("fact_id", "") + label = row.get("predicate", "") or row.get("fact", "") or raw_id + else: + continue + if not raw_id: + continue + nid = f"{kind}:{raw_id}" + all_raw_ids.add(raw_id) + nodes.append({ + "id": nid, + "label": label, + "kind": kind, + "entity_type": row.get("entity_type", "") if kind == "Entity" else "", + "description": row.get("description", "") or row.get("summary", "") or "", + "date": row.get("date", "") or row.get("meeting_date", "") or "", + "degree": row.get("degree", 0), + "fact": row.get("fact", "") if kind == "Fact" else "", + "summary": row.get("summary", "") or "", + }) + + if not nodes: + return {"nodes": [], "edges": [], "stats": self.get_stats()} + + ids_list = list(all_raw_ids) + edges_raw = self.run_query( + """ + MATCH (s)-[r]->(t) + WHERE type(r) IN ['HAS_EPISODE','MENTIONS','HAS_FACT','FACT_SOURCE','FACT_TARGET'] + AND ( + (s:Meeting AND s.meeting_id IN $ids) + OR (s:Episode AND s.episode_id IN $ids) + OR (s:Entity AND s.name IN $ids) + OR (s:Fact AND s.fact_id IN $ids) + ) + AND ( + (t:Meeting AND t.meeting_id IN $ids) + OR (t:Episode AND t.episode_id IN $ids) + OR (t:Entity AND t.name IN $ids) + OR (t:Fact AND t.fact_id IN $ids) + ) + RETURN type(r) AS predicate, + CASE WHEN s:Meeting THEN s.meeting_id + WHEN s:Episode THEN s.episode_id + WHEN s:Entity THEN s.name + WHEN s:Fact THEN s.fact_id END AS source_raw, + CASE WHEN t:Meeting THEN t.meeting_id + WHEN t:Episode THEN t.episode_id + WHEN t:Entity THEN t.name + WHEN t:Fact THEN t.fact_id END AS target_raw, + CASE WHEN s:Meeting THEN 'Meeting' WHEN s:Episode THEN 'Episode' + WHEN s:Entity THEN 'Entity' WHEN s:Fact THEN 'Fact' END AS source_kind, + CASE WHEN t:Meeting THEN 'Meeting' WHEN t:Episode THEN 'Episode' + WHEN t:Entity THEN 'Entity' WHEN t:Fact THEN 'Fact' END AS target_kind, + CASE WHEN s:Fact THEN coalesce(s.predicate, '') + WHEN t:Fact THEN coalesce(t.predicate, '') ELSE '' END AS fact_predicate, + CASE WHEN s:Fact THEN coalesce(s.fact, '') + WHEN t:Fact THEN coalesce(t.fact, '') ELSE '' END AS fact_text, + CASE WHEN s:Fact THEN coalesce(s.description, '') + WHEN t:Fact THEN coalesce(t.description, '') ELSE '' END AS fact_description, + CASE WHEN s:Fact THEN coalesce(s.confidence, 0.0) + WHEN t:Fact THEN coalesce(t.confidence, 0.0) ELSE 0.0 END AS fact_confidence, + CASE WHEN s:Fact THEN coalesce(s.meeting_date, '') + WHEN t:Fact THEN coalesce(t.meeting_date, '') ELSE '' END AS fact_date, + CASE WHEN s:Fact THEN coalesce(s.meeting_id, '') + WHEN t:Fact THEN coalesce(t.meeting_id, '') ELSE '' END AS fact_meeting_id + LIMIT $limit_edges + """, + ids=list(all_raw_ids), + limit_edges=limit_edges, + ) + + degree_map: Dict[str, int] = {} + for row in edges_raw: + src = row.get("source", "") + tgt = row.get("target", "") + degree_map[src] = degree_map.get(src, 0) + 1 + degree_map[tgt] = degree_map.get(tgt, 0) + 1 + for node in nodes: + node["degree"] = degree_map.get(node["id"], node.get("degree", 0)) + + edges = [] + for idx, row in enumerate(edges_raw, start=1): + sk = row.get("source_kind", "") + tk = row.get("target_kind", "") + edges.append({ + "id": f"edge_{idx}", + "source": f"{sk}:{row['source_raw']}" if sk and row.get("source_raw") else "", + "target": f"{tk}:{row['target_raw']}" if tk and row.get("target_raw") else "", + "predicate": row.get("predicate", ""), + "fact": row.get("fact_text", "") or row.get("fact_description", "") or "", + "description": row.get("fact_description", "") or "", + "confidence": row.get("fact_confidence", 0.0), + "date": row.get("fact_date", "") or "", + "meeting_id": row.get("fact_meeting_id", "") or "", + }) + + return { + "nodes": nodes, + "edges": edges, + "stats": self.get_stats(), + "query": query, + } + + def format_search_context(self, question: str, top_k: int = 5) -> str: + results = self.hybrid_search(question, limit=top_k) + if not results: + return "" + + lines = [] + for idx, row in enumerate(results, start=1): + date = row.get("date", "") + meeting_title = row.get("meeting_title", "") + title = row.get("title", row.get("kind", "item")) + suffix = f" ({date})" if date else "" + source = f" | 来源会议: {meeting_title}" if meeting_title else "" + lines.append( + f"[{idx}] {title}{suffix}{source}\n" + f"{row.get('text', '')}\n" + f"score={row.get('score', 0):.4f}, semantic={row.get('semantic_score', 0):.4f}, keyword={row.get('keyword_score', 0):.4f}" + ) + return "\n\n".join(lines) + + def _load_fact_candidates(self) -> List[Dict[str, Any]]: + return self.run_query( + """ + MATCH (ep:Episode)-[:HAS_FACT]->(f:Fact) + OPTIONAL MATCH (s:Entity)-[:FACT_SOURCE]->(f) + OPTIONAL MATCH (f)-[:FACT_TARGET]->(o:Entity) + RETURN 'fact' AS kind, + coalesce(s.name + ' -[' + coalesce(f.predicate, '') + ']-> ' + o.name, f.fact) AS title, + coalesce( + f.description + CASE + WHEN size(coalesce(f.qualifiers, [])) > 0 THEN ' | ' + reduce(acc = '', item IN f.qualifiers | + acc + CASE WHEN acc = '' THEN item ELSE '; ' + item END + ) + ELSE '' + END, + f.fact, + '' + ) AS text, + ep.date AS date, + ep.title AS meeting_title, + f.fact_embedding AS embedding + """ + ) + + def _load_entity_candidates(self) -> List[Dict[str, Any]]: + return self.run_query( + """ + MATCH (e:Entity) + OPTIONAL MATCH (ep:Episode)-[:MENTIONS]->(e) + RETURN 'entity' AS kind, + e.name AS title, + coalesce(e.summary, e.description, '') AS text, + max(ep.date) AS date, + head(collect(DISTINCT ep.title)) AS meeting_title, + e.name_embedding AS embedding + """ + ) + + def _load_episode_candidates(self) -> List[Dict[str, Any]]: + return self.run_query( + """ + MATCH (m:Meeting)-[:HAS_EPISODE]->(ep:Episode) + RETURN 'episode' AS kind, + m.title AS title, + coalesce(ep.summary, ep.content, '') AS text, + ep.date AS date, + m.title AS meeting_title, + ep.content_embedding AS embedding + """ + ) + + @staticmethod + def _entity_summary(entity: dict) -> str: + entity_type = entity.get("entity_type", "").strip() + name = entity.get("name", "").strip() + description = entity.get("description", "").strip() + parts = [part for part in [entity_type, name, description] if part] + return " | ".join(parts) + + @staticmethod + def _fact_text(relation: dict) -> str: + subject = relation.get("subject", "").strip() + predicate = relation.get("predicate", "").strip() + obj = relation.get("object", "").strip() + description = relation.get("description", "").strip() + fact = relation.get("fact", "").strip() or f"{subject} {predicate} {obj}".strip() + qualifiers = relation.get("qualifiers", []) + qualifier_text = "; ".join(item for item in qualifiers if item) + if description and qualifier_text: + return f"{fact}. {description}. {qualifier_text}" + if description: + return f"{fact}. {description}" + if qualifier_text: + return f"{fact}. {qualifier_text}" + return fact + + @staticmethod + def _build_episode_text(meeting_data: dict) -> str: + payload = { + "title": meeting_data.get("title", ""), + "date": meeting_data.get("date", ""), + "participants": meeting_data.get("participants", []), + "summary": meeting_data.get("summary", ""), + "entities": meeting_data.get("entities", []), + "relations": meeting_data.get("relations", []), + "action_items": meeting_data.get("action_items", []), + "metrics": meeting_data.get("metrics", []), + "decisions": meeting_data.get("decisions", []), + "original_text": meeting_data.get("_original_text", ""), + } + return json.dumps(payload, ensure_ascii=False) + + +graph_store = Neo4jGraphStore() diff --git a/meeting_processor.py b/meeting_memory/meeting_processor.py similarity index 65% rename from meeting_processor.py rename to meeting_memory/meeting_processor.py index 17f4254..ddeceea 100644 --- a/meeting_processor.py +++ b/meeting_memory/meeting_processor.py @@ -1,38 +1,56 @@ import hashlib import logging -import os -from typing import Optional +from typing import Callable, Optional -from config import config -from extractor import MeetingExtraction, extract_meeting_info -from graph_store import graph_store -from meeting_state import MeetingStateStore -from raw_store import raw_meeting_store -from vector_store import meeting_vector_store +from meeting_memory.config import config +from meeting_memory.extractor import MeetingExtraction, extract_meeting_info +from meeting_memory.graph_store import graph_store +from meeting_memory.meeting_state import MeetingStateStore +from meeting_memory.raw_store import raw_meeting_store logger = logging.getLogger(__name__) state_store = MeetingStateStore(config.state_path) +ProgressCallback = Callable[[int, int, str], None] class MeetingProcessor: def process_meeting_file(self, filepath: str, force: bool = False) -> Optional[str]: - with open(filepath, "r", encoding="utf-8") as f: - text = f.read() + with open(filepath, "r", encoding="utf-8") as file_obj: + text = file_obj.read() return self.process_meeting_text(text, force=force) - def process_meeting_text(self, text: str, force: bool = False) -> Optional[str]: + def process_meeting_text( + self, + text: str, + force: bool = False, + interactive: bool = True, + progress_callback: Optional[ProgressCallback] = None, + ) -> Optional[str]: + def report(step: int, message: str) -> None: + if progress_callback: + progress_callback(step, 7, message) + print(f"[{step}/7] {message}") + + report(1, "计算内容哈希") content_hash = self._compute_content_hash(text) if not force and state_store.has_content_hash(content_hash): - print("\n检测到重复内容,已跳过。") logger.info("Duplicate content hash skipped: %s", content_hash[:12]) return None if not force: - similar = meeting_vector_store.find_similar_text(text, threshold=0.92) + report(2, "Neo4j 语义相似去重检索") + similar = graph_store.find_similar_episode(text, threshold=0.92) if similar: meta = similar["metadata"] + if not interactive: + logger.info( + "Skipped similar meeting in non-interactive mode: %s", + meta.get("title", ""), + ) + return None + print( f"\n发现相似会议:{meta.get('title', '')} ({meta.get('date', '')}) " f"相似度 {similar['score']:.2%}" @@ -46,7 +64,10 @@ class MeetingProcessor: force = True break print("请输入 s 或 o。") + else: + report(2, "跳过语义去重,按覆盖模式继续") + report(3, "调用大模型抽取结构化信息") meeting_data = self._extract(text) if not meeting_data: logger.error("Failed to extract meeting information") @@ -54,21 +75,24 @@ class MeetingProcessor: data_dict = meeting_data.model_dump() data_dict["_content_hash"] = content_hash - data_dict["_graph_meeting_id"] = meeting_vector_store._meeting_id(data_dict) + data_dict["_graph_meeting_id"] = graph_store.meeting_id(data_dict) - should_skip = self._handle_duplicate(data_dict, force) + report(4, "检查标题和日期重复") + should_skip = self._handle_duplicate(data_dict, force=force, interactive=interactive) if should_skip: return None meeting_title = data_dict.get("title", "") meeting_date = data_dict.get("date", "") - raw_path = raw_meeting_store.save(text, title=meeting_title, date=meeting_date) + report(5, "归档原始会议文本") + raw_path = raw_meeting_store.save(text, title=meeting_title, date=meeting_date) data_dict["_original_text"] = text data_dict["_original_text_path"] = raw_path - meeting_filename = f"{meeting_vector_store._meeting_id(data_dict)}.md" + meeting_filename = f"{graph_store.meeting_id(data_dict)}.md" + report(6, "合并行动项和指标状态") data_dict["action_items"] = state_store.merge_action_items( data_dict.get("action_items", []), meeting_title, @@ -84,16 +108,17 @@ class MeetingProcessor: state_store.add_content_hash(content_hash, meeting_title, meeting_date, meeting_filename) state_store.save() - meeting_vector_store.add_meeting(data_dict) + + report(7, "写入 Neo4j 图谱和检索数据") graph_store.upsert_meeting_subgraph(data_dict) logger.info("Meeting processed: %s", meeting_title) return raw_path - def _handle_duplicate(self, data_dict: dict, force: bool) -> bool: + def _handle_duplicate(self, data_dict: dict, force: bool, interactive: bool = True) -> bool: title = data_dict.get("title", "") date = data_dict.get("date", "") - existing = meeting_vector_store.find_meeting(title, date) + existing = graph_store.get_meeting(title, date) if not existing: return False @@ -103,6 +128,10 @@ class MeetingProcessor: self._remove_old(data_dict, existing) return False + if not interactive: + logger.info("Skipped duplicate meeting in non-interactive mode: %s", title) + return True + print(f"\n发现重复会议:{title} ({date})") while True: choice = input("选择 [s]跳过 / [o]覆盖(默认 s):").strip().lower() or "s" @@ -114,9 +143,8 @@ class MeetingProcessor: return False print("请输入 s 或 o。") - def _remove_old(self, data_dict: dict, existing: Optional[dict] = None): - meeting_id = meeting_vector_store._meeting_id(data_dict) - meeting_vector_store.remove_meeting(meeting_id) + def _remove_old(self, data_dict: dict, existing: Optional[dict] = None) -> None: + meeting_id = graph_store.meeting_id(data_dict) graph_store.remove_meeting_subgraph(meeting_id) new_hash = data_dict.get("_content_hash", "") @@ -136,34 +164,16 @@ class MeetingProcessor: def _extract(self, text: str) -> Optional[MeetingExtraction]: try: - return extract_meeting_info(text) + return extract_meeting_info(text, stream=True) except Exception as exc: logger.error("LLM extraction failed: %s", exc) return None def query(self, question: str, top_k: int = 3) -> str: - vector_context = meeting_vector_store.query_as_context(question, top_k=top_k) - graph_results = graph_store.search_facts(question, limit=top_k) - - parts = [] - if vector_context: - parts.append("=== Vector Context ===\n" + vector_context) - - if graph_results: - graph_lines = [] - for idx, row in enumerate(graph_results, start=1): - title = row.get("title", row.get("kind", "graph")) - text = row.get("text", "") - date = row.get("date", "") - suffix = f" ({date})" if date else "" - graph_lines.append(f"[{idx}] {title}{suffix}\n{text}") - parts.append("=== Graph Facts ===\n" + "\n\n".join(graph_lines)) - - return "\n\n".join(parts) + return graph_store.format_search_context(question, top_k=top_k) def stats(self) -> dict: return { - "vector_index": meeting_vector_store.get_stats(), "graph": graph_store.get_stats(), "state": state_store.get_stats(), "raw_dir": config.storage.raw_dir, diff --git a/meeting_state.py b/meeting_memory/meeting_state.py similarity index 85% rename from meeting_state.py rename to meeting_memory/meeting_state.py index e643acf..e99a668 100644 --- a/meeting_state.py +++ b/meeting_memory/meeting_state.py @@ -2,8 +2,8 @@ import hashlib import json import logging import os -from datetime import datetime -from typing import Dict, List, Optional +import re +from typing import List, Optional logger = logging.getLogger(__name__) @@ -28,8 +28,8 @@ class MeetingStateStore: try: with open(self.state_path, "r", encoding="utf-8") as f: return json.load(f) - except Exception as e: - logger.warning(f"加载状态文件失败,将创建新状态: {e}") + except Exception as exc: + logger.warning("Failed to load state file, creating a new one: %s", exc) return { "action_items": {}, "metrics": {}, @@ -55,11 +55,10 @@ class MeetingStateStore: return series_name def _detect_series(self, title: str) -> str: - import re - cleaned = re.sub(r"(\d{4}第\w+期)", "", title) - cleaned = re.sub(r"\(\d{4}第\w+期\)", "", cleaned) - cleaned = re.sub(r"\d{4}第\w+期", "", cleaned) - cleaned = re.sub(r"\d{4}年第\w+次", "", cleaned) + cleaned = re.sub(r"\uFF08\d{4}\u7B2C\w+\u671F\uFF09", "", title) + cleaned = re.sub(r"\(\d{4}\u7B2C\w+\u671F\)", "", cleaned) + cleaned = re.sub(r"\d{4}\u7B2C\w+\u671F", "", cleaned) + cleaned = re.sub(r"\d{4}\u5E74\u7B2C\w+\u6B21", "", cleaned) cleaned = cleaned.strip("-_ ") return cleaned or title @@ -122,26 +121,25 @@ class MeetingStateStore: ) -> List[dict]: merged = [] - for m in new_metrics: - metric_name = m.get("metric_name", "") - owner = m.get("owner", "") + for metric in new_metrics: + metric_name = metric.get("metric_name", "") + owner = metric.get("owner", "") mid = _metric_id(metric_name, owner) history_entry = { "date": meeting_date, "meeting": meeting_filename, - "value": m.get("value", ""), - "target": m.get("target", ""), - "trend": m.get("trend", ""), + "value": metric.get("value", ""), + "target": metric.get("target", ""), + "trend": metric.get("trend", ""), } existing = self._state["metrics"].get(mid) if existing: existing["history"].append(history_entry) existing["latest"] = history_entry - item = m - item["_metric_id"] = mid - item["_history"] = list(existing["history"]) + metric["_metric_id"] = mid + metric["_history"] = list(existing["history"]) else: self._state["metrics"][mid] = { "metric_id": mid, @@ -150,10 +148,10 @@ class MeetingStateStore: "history": [history_entry], "latest": history_entry, } - m["_metric_id"] = mid - m["_history"] = [history_entry] + metric["_metric_id"] = mid + metric["_history"] = [history_entry] - merged.append(m) + merged.append(metric) return merged @@ -186,4 +184,4 @@ class MeetingStateStore: "metrics_tracked": len(self._state["metrics"]), "meeting_series": len(self._state["meeting_series"]), "content_hashes": len(self._state["content_hashes"]), - } \ No newline at end of file + } diff --git a/raw_store.py b/meeting_memory/raw_store.py similarity index 87% rename from raw_store.py rename to meeting_memory/raw_store.py index 6800ad3..568167d 100644 --- a/raw_store.py +++ b/meeting_memory/raw_store.py @@ -2,7 +2,7 @@ import logging import os from datetime import datetime -from config import config +from meeting_memory.config import config logger = logging.getLogger(__name__) @@ -23,9 +23,12 @@ class RawMeetingStore: os.makedirs(self.raw_dir, exist_ok=True) def save(self, text: str, title: str = "", date: str = "") -> str: + os.makedirs(self.raw_dir, exist_ok=True) + date_str = date or datetime.now().strftime("%Y-%m-%d") + safe_date = _sanitize_filename(date_str)[:40] safe_title = _sanitize_filename(title)[:60] - filename = f"{date_str}_{safe_title}.md" + filename = f"{safe_date}_{safe_title}.md" filepath = os.path.join(self.raw_dir, filename) content = "\n".join( diff --git a/meeting_memory/services/__init__.py b/meeting_memory/services/__init__.py new file mode 100644 index 0000000..e338c25 --- /dev/null +++ b/meeting_memory/services/__init__.py @@ -0,0 +1,3 @@ +from meeting_memory.services.embedding_service import EmbeddingService, embedding_service + +__all__ = ["EmbeddingService", "embedding_service"] diff --git a/meeting_memory/services/embedding_service.py b/meeting_memory/services/embedding_service.py new file mode 100644 index 0000000..39feb52 --- /dev/null +++ b/meeting_memory/services/embedding_service.py @@ -0,0 +1,29 @@ +from typing import List, Optional + +from openai import OpenAI as OpenAIClient + +from meeting_memory.config import config + + +class EmbeddingService: + def __init__( + self, + model: Optional[str] = None, + api_key: Optional[str] = None, + api_base: Optional[str] = None, + ): + self._client = OpenAIClient( + api_key=api_key or config.embedding.api_key or "not-needed", + base_url=api_base or config.embedding.api_base or None, + ) + self._model = model or config.embedding.model + + def embed_text(self, text: str) -> List[float]: + response = self._client.embeddings.create( + model=self._model, + input=text, + ) + return response.data[0].embedding + + +embedding_service = EmbeddingService() diff --git a/meeting_memory/web_demo/__init__.py b/meeting_memory/web_demo/__init__.py new file mode 100644 index 0000000..b4336d6 --- /dev/null +++ b/meeting_memory/web_demo/__init__.py @@ -0,0 +1,3 @@ +from meeting_memory.web_demo.server import run_demo_server + +__all__ = ["run_demo_server"] diff --git a/meeting_memory/web_demo/server.py b/meeting_memory/web_demo/server.py new file mode 100644 index 0000000..40e19da --- /dev/null +++ b/meeting_memory/web_demo/server.py @@ -0,0 +1,400 @@ +import json +import logging +import mimetypes +import sys +import threading +import time +import uuid +from http import HTTPStatus +from http.server import SimpleHTTPRequestHandler, ThreadingHTTPServer +from pathlib import Path +from urllib.parse import parse_qs, urlparse + +if __package__ in (None, ""): + sys.path.insert(0, str(Path(__file__).resolve().parents[2])) + +from meeting_memory.config import config +from meeting_memory.graph_store import graph_store +from meeting_memory.meeting_processor import meeting_processor, state_store + +logger = logging.getLogger(__name__) + +STATIC_DIR = Path(__file__).resolve().parent / "static" +RAW_DIR = Path(config.storage.raw_dir) +IMPORT_JOBS = {} +IMPORT_JOBS_LOCK = threading.Lock() + + +class GraphDemoHandler(SimpleHTTPRequestHandler): + def __init__(self, *args, **kwargs): + super().__init__(*args, directory=str(STATIC_DIR), **kwargs) + + def do_GET(self): + parsed = urlparse(self.path) + if parsed.path == "/api/dashboard": + self._handle_dashboard() + return + if parsed.path == "/api/graph": + self._handle_graph(parsed.query) + return + if parsed.path == "/api/graph-types": + self._handle_graph_types() + return + if parsed.path == "/api/graph-kinds": + self._handle_graph_kinds() + return + if parsed.path == "/api/search": + self._handle_search(parsed.query) + return + if parsed.path == "/api/meetings": + self._handle_meetings(parsed.query) + return + if parsed.path == "/api/meeting": + self._handle_meeting(parsed.query) + return + if parsed.path == "/api/import-status": + self._handle_import_status(parsed.query) + return + if parsed.path in ("/", "/index.html"): + self.path = "/index.html" + elif parsed.path == "/graph": + self.path = "/graph.html" + super().do_GET() + + def do_POST(self): + parsed = urlparse(self.path) + if parsed.path == "/api/import": + self._handle_import() + return + self.send_error(HTTPStatus.NOT_FOUND, "Unsupported endpoint") + + def log_message(self, format, *args): + logger.info("%s - %s", self.address_string(), format % args) + + def end_headers(self): + self.send_header("Cache-Control", "no-store") + super().end_headers() + + def guess_type(self, path): + guessed = super().guess_type(path) + if guessed == "application/octet-stream": + return mimetypes.guess_type(path)[0] or guessed + return guessed + + def _handle_graph(self, raw_query: str): + params = parse_qs(raw_query) + query = (params.get("q") or [""])[0].strip() + limit_nodes = self._safe_int((params.get("limit_nodes") or ["80"])[0], default=80) + limit_edges = self._safe_int((params.get("limit_edges") or ["160"])[0], default=160) + entity_types = params.get("entity_types") + kinds = params.get("kinds") + payload = graph_store.get_graph_snapshot( + query=query, + entity_types=entity_types if entity_types else None, + kinds=kinds if kinds else None, + limit_nodes=limit_nodes, + limit_edges=limit_edges, + ) + self._write_json(payload) + + def _handle_graph_types(self): + types = graph_store.get_entity_types() + self._write_json({"types": types}) + + def _handle_graph_kinds(self): + kinds = graph_store.get_graph_kinds() + self._write_json({"kinds": kinds}) + + def _handle_search(self, raw_query: str): + params = parse_qs(raw_query) + query = (params.get("q") or [""])[0].strip() + limit = self._safe_int((params.get("limit") or ["8"])[0], default=8) + payload = { + "query": query, + "results": graph_store.hybrid_search(query, limit=limit) if query else [], + } + self._write_json(payload) + + def _handle_dashboard(self): + meetings = _load_recent_meetings(limit=6) + action_items = _state_items("action_items", limit=6) + metrics = _state_items("metrics", limit=6) + series = _load_series(limit=6) + graph_stats = graph_store.get_stats() + payload = { + "graph": graph_stats, + "state": state_store.get_stats(), + "meetings": meetings, + "action_items": action_items, + "metrics": metrics, + "series": series, + "highlights": _build_highlights(meetings, action_items, metrics, graph_stats), + } + self._write_json(payload) + + def _handle_meetings(self, raw_query: str): + params = parse_qs(raw_query) + limit = self._safe_int((params.get("limit") or ["24"])[0], default=24) + self._write_json({"meetings": _load_recent_meetings(limit=limit)}) + + def _handle_meeting(self, raw_query: str): + params = parse_qs(raw_query) + filename = (params.get("filename") or [""])[0].strip() + if not filename: + self._write_json({"error": "filename is required"}, status=HTTPStatus.BAD_REQUEST) + return + + file_path = RAW_DIR / filename + if not file_path.exists() or file_path.parent != RAW_DIR: + self._write_json({"error": "meeting not found"}, status=HTTPStatus.NOT_FOUND) + return + + self._write_json(_serialize_meeting(file_path, include_content=True)) + + def _handle_import(self): + payload = self._read_json_body() + if payload is None: + self._write_json({"ok": False, "error": "invalid json body"}, status=HTTPStatus.BAD_REQUEST) + return + + text = str(payload.get("text") or "").strip() + force = bool(payload.get("force", False)) + if not text: + self._write_json({"ok": False, "error": "text is required"}, status=HTTPStatus.BAD_REQUEST) + return + + job_id = str(uuid.uuid4()) + with IMPORT_JOBS_LOCK: + IMPORT_JOBS[job_id] = { + "job_id": job_id, + "status": "queued", + "message": "任务已创建,等待处理", + "archive_path": "", + "created_at": time.time(), + "updated_at": time.time(), + "steps": [], + } + + thread = threading.Thread( + target=_run_import_job, + args=(job_id, text, force), + daemon=True, + ) + thread.start() + + self._write_json({"ok": True, "job_id": job_id, "status": "queued"}) + + def _handle_import_status(self, raw_query: str): + params = parse_qs(raw_query) + job_id = (params.get("job_id") or [""])[0].strip() + if not job_id: + self._write_json({"error": "job_id is required"}, status=HTTPStatus.BAD_REQUEST) + return + + with IMPORT_JOBS_LOCK: + payload = IMPORT_JOBS.get(job_id) + + if not payload: + self._write_json({"error": "job not found"}, status=HTTPStatus.NOT_FOUND) + return + + self._write_json(payload) + + def _read_json_body(self): + length = self._safe_int(self.headers.get("Content-Length"), default=0) + if length <= 0: + return None + + try: + body = self.rfile.read(length) + return json.loads(body.decode("utf-8")) + except Exception: + return None + + def _write_json(self, payload, status: HTTPStatus = HTTPStatus.OK): + body = json.dumps(payload, ensure_ascii=False).encode("utf-8") + self.send_response(status) + self.send_header("Content-Type", "application/json; charset=utf-8") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + + @staticmethod + def _safe_int(raw_value, default: int) -> int: + try: + value = int(raw_value) + except (TypeError, ValueError): + return default + return max(0, value) + + +def run_demo_server(host: str = "127.0.0.1", port: int = 8765) -> None: + server = ThreadingHTTPServer((host, port), GraphDemoHandler) + logger.info("Graph demo server started at http://%s:%s", host, port) + print(f"Graph demo server started: http://{host}:{port}") + print("Press Ctrl+C to stop.") + try: + server.serve_forever() + except KeyboardInterrupt: + print("\nServer stopped.") + finally: + server.server_close() + + +def _run_import_job(job_id: str, text: str, force: bool) -> None: + def update(status: str | None = None, message: str | None = None, *, append_step: bool = False): + with IMPORT_JOBS_LOCK: + job = IMPORT_JOBS.get(job_id) + if not job: + return + if status: + job["status"] = status + if message: + job["message"] = message + if append_step: + job["steps"].append(message) + job["updated_at"] = time.time() + + def progress(step: int, total: int, message: str): + update( + "running", + f"步骤 {step}/{total}:{message}", + append_step=True, + ) + + update("running", "开始处理会议文本", append_step=True) + try: + archive_path = meeting_processor.process_meeting_text( + text, + force=force, + interactive=False, + progress_callback=progress, + ) + if not archive_path: + update("error", "处理被跳过:可能是重复内容,或结构化抽取失败", append_step=True) + return + + with IMPORT_JOBS_LOCK: + job = IMPORT_JOBS.get(job_id) + if job: + job["status"] = "done" + job["message"] = "导入完成" + job["archive_path"] = archive_path + job["updated_at"] = time.time() + job["dashboard"] = { + "graph": graph_store.get_stats(), + "state": state_store.get_stats(), + "meetings": _load_recent_meetings(limit=6), + } + job["steps"].append("导入完成") + except Exception as exc: + logger.exception("Meeting import failed") + update("error", f"处理失败:{exc}", append_step=True) + + +def _load_recent_meetings(limit: int = 6): + if not RAW_DIR.exists(): + return [] + + files = sorted( + RAW_DIR.glob("*.md"), + key=lambda path: path.stat().st_mtime, + reverse=True, + ) + return [_serialize_meeting(path) for path in files[:limit]] + + +def _serialize_meeting(path: Path, include_content: bool = False): + raw_text = path.read_text(encoding="utf-8") + title = "" + date = "" + lines = raw_text.splitlines() + for line in lines[:12]: + if line.startswith('title: "'): + title = line[len('title: "') : -1] + elif line.startswith('date: "'): + date = line[len('date: "') : -1] + + content_start = 0 + for idx, line in enumerate(lines): + if line.startswith("# "): + content_start = idx + 2 + if not title: + title = line[2:].strip() + break + + body = "\n".join(lines[content_start:]).strip() + snippet = body[:180] + ("..." if len(body) > 180 else "") + payload = { + "filename": path.name, + "title": title or path.stem, + "date": date, + "snippet": snippet, + "updated_at": int(path.stat().st_mtime), + } + if include_content: + payload["content"] = body + return payload + + +def _state_items(key: str, limit: int = 6): + bucket = getattr(state_store, "_state", {}).get(key, {}) + items = [] + for item in bucket.values(): + latest = item.get("latest", {}) + items.append({**item, "latest": latest}) + items.sort(key=lambda row: str(row.get("latest", {}).get("date", "")), reverse=True) + return items[:limit] + + +def _load_series(limit: int = 6): + series = getattr(state_store, "_state", {}).get("meeting_series", {}) + rows = [] + for name, payload in series.items(): + rows.append( + { + "name": name, + "latest_date": payload.get("latest_date", ""), + "processed_titles": payload.get("processed_titles", []), + "meeting_count": len(payload.get("processed_titles", [])), + } + ) + rows.sort(key=lambda row: row.get("latest_date", ""), reverse=True) + return rows[:limit] + + +def _build_highlights(meetings, action_items, metrics, graph_stats): + latest_meeting = meetings[0] if meetings else {} + top_action = action_items[0] if action_items else {} + top_metric = metrics[0] if metrics else {} + return [ + { + "label": "最近归档", + "value": latest_meeting.get("title", "暂无会议"), + "meta": latest_meeting.get("date", ""), + }, + { + "label": "待跟进事项", + "value": str(len(action_items)), + "meta": top_action.get("task", ""), + }, + { + "label": "图谱节点", + "value": str(graph_stats.get("entities", 0)), + "meta": "Neo4j 实体总数", + }, + { + "label": "关键指标", + "value": str(len(metrics)), + "meta": top_metric.get("metric_name", ""), + }, + ] + + +if __name__ == "__main__": + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", + datefmt="%H:%M:%S", + ) + run_demo_server() diff --git a/meeting_memory/web_demo/static/app.js b/meeting_memory/web_demo/static/app.js new file mode 100644 index 0000000..3fb5c6f --- /dev/null +++ b/meeting_memory/web_demo/static/app.js @@ -0,0 +1,399 @@ +const dashboardUrl = "/api/dashboard"; +let currentImportJobId = null; +let importPollTimer = null; + +const highlightGrid = document.getElementById("highlightGrid"); +const statsList = document.getElementById("statsList"); +const meetingCards = document.getElementById("meetingCards"); +const actionList = document.getElementById("actionList"); +const metricList = document.getElementById("metricList"); +const seriesList = document.getElementById("seriesList"); +const searchForm = document.getElementById("searchForm"); +const searchInput = document.getElementById("searchInput"); +const searchResults = document.getElementById("searchResults"); +const refreshDashboardBtn = document.getElementById("refreshDashboardBtn"); +const importForm = document.getElementById("importForm"); +const importFieldset = document.getElementById("importFieldset"); +const importSubmitBtn = document.getElementById("importSubmitBtn"); +const importFile = document.getElementById("importFile"); +const importText = document.getElementById("importText"); +const importForce = document.getElementById("importForce"); +const importStatus = document.getElementById("importStatus"); +const importProgress = document.getElementById("importProgress"); +const meetingDialog = document.getElementById("meetingDialog"); +const closeDialogBtn = document.getElementById("closeDialogBtn"); +const dialogTitle = document.getElementById("dialogTitle"); +const dialogMeta = document.getElementById("dialogMeta"); +const dialogContent = document.getElementById("dialogContent"); + +function escapeHtml(value) { + return String(value ?? "") + .replaceAll("&", "&") + .replaceAll("<", "<") + .replaceAll(">", ">") + .replaceAll('"', """) + .replaceAll("'", "'"); +} + +function emptyMarkup(message) { + return `
${escapeHtml(item.label)}
+ ${escapeHtml(item.value)} +${escapeHtml(item.meta || "")}
+${escapeHtml(c.label)}
+${escapeHtml(item.snippet || "暂无摘要")}
+${escapeHtml(item.assignee || "未分配")} · ${escapeHtml(item.series || "未归类")}
+${escapeHtml(item.owner || "未指定负责人")}
+ +最近:${escapeHtml(item.latest_date || "未知")}
+${escapeHtml(item.text || "")}
+ +Related
+${h(item.text || "")}
+${h(node.description || node.summary || "暂无描述")}
+${h(node.fact || node.description || "暂无描述")}
+${h(node.description || "暂无描述")}
+${h(node.kind)}
+${h(edge.fact || edge.description || edge.predicate || "")}
+Edge
+${h(edge.fact || edge.description || "暂无补充描述")}
+Recent Archives
+Action Items
+Metrics
+Series
+