import json
import logging
import os
import re
import secrets
from datetime import datetime
from typing import Any, Dict, List, Optional

from sqlmodel import Session, select

from core.settings import BOTS_WORKSPACE_ROOT, TOPIC_MCP_INTERNAL_URL
from models.bot import BotInstance
from models.topic import TopicItem, TopicTopic

logger = logging.getLogger("dashboard.topic_mcp")

BOT_ID_PATTERN = re.compile(r"^[A-Za-z0-9_]+$")

TOPIC_MCP_SERVER_NAME = "topic_mcp"
TOPIC_MCP_TOKEN_HEADER = "x-topic-mcp-token"
TOPIC_MCP_DEFAULT_URL = TOPIC_MCP_INTERNAL_URL
TOPIC_MCP_DEFAULT_TIMEOUT = 30
TOPIC_MCP_PROTOCOL_VERSION = "2025-03-26"

TOPIC_DEDUPE_WINDOW_SECONDS = 10 * 60
TOPIC_LEVEL_SET = {"info", "warn", "error", "success"}
# NOTE(review): not referenced by the code in this module (_normalize_topic_key
# does not enforce it); confirm whether anything else imports it before removing.
_TOPIC_KEY_RE = re.compile(r"^[a-z0-9][a-z0-9_.-]{0,63}$")

# Stored topic fields are truncated to these lengths before insert.
_TOPIC_TAG_MAX_LEN = 64
_TOPIC_DEDUPE_KEY_MAX_LEN = 200


class _TopicMcpUnknownMethodError(KeyError):
    """Raised by ``_dispatch_topic_mcp_method`` for an unsupported JSON-RPC method.

    Subclasses ``KeyError`` so existing callers that catch ``KeyError`` keep
    working, while ``_handle_topic_mcp_rpc_item`` can tell it apart from a
    ``KeyError`` raised by a bug inside a tool implementation.
    """

    def __str__(self) -> str:
        # KeyError.__str__ repr()-quotes its argument; expose the plain message.
        return str(self.args[0]) if self.args else ""


def _bot_data_root(bot_id: str) -> str:
    """Return the bot's ``.nanobot`` data directory under the workspace root."""
    return os.path.join(BOTS_WORKSPACE_ROOT, bot_id, ".nanobot")


def _config_json_path(bot_id: str) -> str:
    """Return the path of the bot's ``config.json``."""
    return os.path.join(_bot_data_root(bot_id), "config.json")


def _read_bot_config(bot_id: str) -> Dict[str, Any]:
    """Load the bot's ``config.json`` as a dict.

    Best effort: returns ``{}`` when the file is missing, unreadable, not valid
    JSON, or its top-level value is not an object.
    """
    path = _config_json_path(bot_id)
    if not os.path.isfile(path):
        return {}
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except Exception:
        # _ensure_topic_mcp_server may persist a config built on top of this
        # result, so make an unreadable file visible rather than silent.
        logger.warning("failed to read bot config: %s", path, exc_info=True)
        return {}
    return data if isinstance(data, dict) else {}


def _write_bot_config(bot_id: str, config_data: Dict[str, Any]) -> None:
    """Write ``config_data`` to the bot's ``config.json`` via a temp file.

    The JSON is written to a sibling ``.tmp`` file which then replaces the
    target with ``os.replace``. If serialization or the replace fails, the temp
    file is removed and the error is re-raised.
    """
    path = _config_json_path(bot_id)
    os.makedirs(os.path.dirname(path), exist_ok=True)
    tmp = f"{path}.tmp"
    try:
        with open(tmp, "w", encoding="utf-8") as f:
            json.dump(config_data, f, ensure_ascii=False, indent=2)
        os.replace(tmp, path)
    except BaseException:
        # Don't leave a stale/partial temp file behind.
        try:
            os.remove(tmp)
        except OSError:
            pass
        raise


def _dict_get_ci(raw: Any, key: str) -> Any:
    """Case-insensitive (and whitespace-trimmed) key lookup; ``None`` if absent or not a dict."""
    if not isinstance(raw, dict):
        return None
    wanted = str(key or "").strip().lower()
    for k, v in raw.items():
        if str(k or "").strip().lower() == wanted:
            return v
    return None


def _as_bool(value: Any) -> bool:
    """Interpret bools as-is and common truthy strings ("1", "true", "yes", "on", "y")."""
    if isinstance(value, bool):
        return value
    text = str(value or "").strip().lower()
    return text in {"1", "true", "yes", "on", "y"}


def _extract_topic_mcp_token(server_cfg: Any) -> str:
    """Return the topic-MCP token from a server config's ``headers``, or ``""``."""
    headers = server_cfg.get("headers") if isinstance(server_cfg, dict) else None
    return str(_dict_get_ci(headers, TOPIC_MCP_TOKEN_HEADER) or "").strip()


def _generate_topic_mcp_token(bot_id: str) -> str:
    """Create a new token of the form ``<bot_id>.<random urlsafe>``.

    The bot-id prefix lets _resolve_topic_mcp_bot_id_by_token check the hinted
    bot first before scanning all bots.
    """
    return f"{bot_id}.{secrets.token_urlsafe(24)}"


def _build_locked_topic_mcp_server(bot_id: str, token: str) -> Dict[str, Any]:
    """Build the canonical topic-MCP server entry, reusing ``token`` or minting a new one."""
    fixed_token = str(token or "").strip() or _generate_topic_mcp_token(bot_id)
    return {
        "type": "streamableHttp",
        "url": TOPIC_MCP_DEFAULT_URL,
        "headers": {TOPIC_MCP_TOKEN_HEADER: fixed_token},
        "toolTimeout": TOPIC_MCP_DEFAULT_TIMEOUT,
    }


def _annotate_locked_mcp_servers(raw_servers: Dict[str, Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
    """Copy each dict-valued server entry and add ``locked=True`` for the topic-MCP server.

    Non-dict entries are dropped from the result.
    """
    rows: Dict[str, Dict[str, Any]] = {}
    for name, cfg in raw_servers.items():
        if not isinstance(cfg, dict):
            continue
        row = dict(cfg)
        row["locked"] = name == TOPIC_MCP_SERVER_NAME
        rows[name] = row
    return rows


def _ensure_topic_mcp_server(
    bot_id: str,
    config_data: Optional[Dict[str, Any]] = None,
    persist: bool = True,
) -> Dict[str, Any]:
    """Force ``tools.mcpServers.topic_mcp`` in the bot config to the canonical entry.

    An existing token is preserved; otherwise a new one is generated.

    Args:
        bot_id: Bot whose config is updated.
        config_data: Config dict to update in place; when not a dict, the
            on-disk config is read instead.
        persist: When true and the entry changed, write the config to disk.

    Returns:
        The canonical topic-MCP server entry.
    """
    working = config_data if isinstance(config_data, dict) else _read_bot_config(bot_id)
    tools_cfg = working.get("tools")
    if not isinstance(tools_cfg, dict):
        tools_cfg = {}
    mcp_servers = tools_cfg.get("mcpServers")
    if not isinstance(mcp_servers, dict):
        mcp_servers = {}

    existing_server = mcp_servers.get(TOPIC_MCP_SERVER_NAME)
    existing_token = _extract_topic_mcp_token(existing_server)
    locked_server = _build_locked_topic_mcp_server(bot_id, existing_token)
    changed = existing_server != locked_server

    mcp_servers[TOPIC_MCP_SERVER_NAME] = locked_server
    tools_cfg["mcpServers"] = mcp_servers
    working["tools"] = tools_cfg
    if persist and changed:
        _write_bot_config(bot_id, working)
    return locked_server


def _resolve_topic_mcp_bot_id_by_token(session: Session, token: str) -> Optional[str]:
    """Return the bot id whose configured topic-MCP token equals ``token``, else ``None``.

    The bot id embedded as the token prefix (see _generate_topic_mcp_token) is
    checked first, then every ``BotInstance`` in the database.
    """
    incoming = str(token or "").strip()
    if not incoming:
        return None
    # compare_digest only accepts ASCII-only str; compare bytes so a
    # client-supplied non-ASCII token cannot raise TypeError.
    incoming_bytes = incoming.encode("utf-8", "surrogatepass")

    candidates: List[str] = []
    hinted_bot_id = incoming.split(".", 1)[0].strip()
    if hinted_bot_id and BOT_ID_PATTERN.fullmatch(hinted_bot_id):
        candidates.append(hinted_bot_id)
    for bot in session.exec(select(BotInstance)).all():
        if bot.id not in candidates:
            candidates.append(bot.id)

    for bot_id in candidates:
        config_data = _read_bot_config(bot_id)
        tools_cfg = config_data.get("tools")
        if not isinstance(tools_cfg, dict):
            continue
        mcp_servers = tools_cfg.get("mcpServers")
        if not isinstance(mcp_servers, dict):
            continue
        expected = _extract_topic_mcp_token(mcp_servers.get(TOPIC_MCP_SERVER_NAME))
        if expected and secrets.compare_digest(
            expected.encode("utf-8", "surrogatepass"), incoming_bytes
        ):
            return bot_id
    return None


def _has_topic_mcp_server(bot_id: str) -> bool:
    """Return True if the bot config has a topic-MCP server entry with a non-empty token."""
    config_data = _read_bot_config(bot_id)
    tools_cfg = config_data.get("tools")
    if not isinstance(tools_cfg, dict):
        return False
    mcp_servers = tools_cfg.get("mcpServers")
    if not isinstance(mcp_servers, dict):
        return False
    token = _extract_topic_mcp_token(mcp_servers.get(TOPIC_MCP_SERVER_NAME))
    return bool(token)


def _normalize_topic_key(raw: Any) -> str:
    """Return ``raw`` stripped and lower-cased (``""`` for empty/None); no pattern validation."""
    value = str(raw or "").strip().lower()
    if not value:
        return ""
    return value


def _ensure_topic_defaults(session: Session, bot_id: str) -> None:
    # Deprecated: topic feed global switch/fallback removed.
    # Keep as no-op for call-site compatibility.
    _ = session
    _ = bot_id
    return None


def _parse_json_dict(raw: str) -> Dict[str, Any]:
    """Parse a JSON object string; returns ``{}`` for empty, invalid, or non-object input."""
    text = str(raw or "").strip()
    if not text:
        return {}
    try:
        data = json.loads(text)
    except Exception:
        return {}
    return data if isinstance(data, dict) else {}


def _parse_json_list(raw: str) -> List[Any]:
    """Parse a JSON array string; returns ``[]`` for empty, invalid, or non-array input."""
    text = str(raw or "").strip()
    if not text:
        return []
    try:
        data = json.loads(text)
    except Exception:
        return []
    return data if isinstance(data, list) else []


def _topic_to_dict(row: TopicTopic) -> Dict[str, Any]:
    """Serialize a ``TopicTopic`` row into a JSON-friendly dict."""
    return {
        "id": row.id,
        "bot_id": row.bot_id,
        "topic_key": str(row.topic_key or "").strip().lower(),
        "name": row.name or "",
        "description": row.description or "",
        "is_active": bool(row.is_active),
        "routing": _parse_json_dict(row.routing_json or "{}"),
        "view_schema": _parse_json_dict(row.view_schema_json or "{}"),
        "created_at": row.created_at.isoformat() if row.created_at else None,
        "updated_at": row.updated_at.isoformat() if row.updated_at else None,
    }


def _list_topics(session: Session, bot_id: str) -> List[Dict[str, Any]]:
    """Return the bot's topics, active first, then by topic key."""
    rows = session.exec(
        select(TopicTopic)
        .where(TopicTopic.bot_id == bot_id)
        .order_by(TopicTopic.is_active.desc(), TopicTopic.topic_key.asc())
    ).all()
    return [_topic_to_dict(row) for row in rows]


def _topic_item_to_dict(row: TopicItem) -> Dict[str, Any]:
    """Serialize a ``TopicItem`` row into a JSON-friendly dict."""
    return {
        "id": row.id,
        "bot_id": row.bot_id,
        "topic_key": str(row.topic_key or "").strip().lower(),
        "title": row.title or "",
        "content": row.content or "",
        "level": str(row.level or "info").strip().lower(),
        "tags": _parse_json_list(row.tags_json or "[]"),
        "view": _parse_json_dict(row.view_json or "{}"),
        "source": row.source or "mcp",
        "dedupe_key": row.dedupe_key or "",
        "is_read": bool(row.is_read),
        "created_at": row.created_at.isoformat() if row.created_at else None,
    }


def _topic_get_row(session: Session, bot_id: str, topic_key: str) -> Optional[TopicTopic]:
    """Fetch one topic by bot id and normalized key (``None`` if the key is empty or absent)."""
    normalized = _normalize_topic_key(topic_key)
    if not normalized:
        return None
    return session.exec(
        select(TopicTopic)
        .where(TopicTopic.bot_id == bot_id)
        .where(TopicTopic.topic_key == normalized)
        .limit(1)
    ).first()


def _normalize_topic_keywords(raw: Any) -> List[str]:
    """Normalize routing keywords (list or single string) to unique, lower-cased, non-empty strings."""
    rows: List[str] = []
    if isinstance(raw, list):
        for item in raw:
            text = str(item or "").strip().lower()
            if text and text not in rows:
                rows.append(text)
    elif isinstance(raw, str):
        text = raw.strip().lower()
        if text:
            rows.append(text)
    return rows


def _topic_filter_reason(payload: Dict[str, Any]) -> str:
    """Return why a payload must not be published (progress/tool-hint traffic), or ``""``."""
    if _as_bool(payload.get("is_progress")):
        return "progress message is filtered"
    if _as_bool(payload.get("is_tool_hint")):
        return "tool hint message is filtered"
    source = str(payload.get("source") or payload.get("type") or "").strip().lower()
    if source in {"progress", "tool_hint", "sendprogress", "sendtoolhints"}:
        return f"{source} message is filtered"
    return ""


def _topic_route_pick(
    session: Session,
    bot_id: str,
    payload: Dict[str, Any],
    requested_topic_key: str = "",
) -> Dict[str, Any]:
    """Decide which active topic (if any) a payload belongs to.

    Resolution order:
    1. No active topics for the bot -> not matched.
    2. An explicit key (``requested_topic_key``, else ``payload["topic_key"]``,
       else ``payload["topic"]``) is accepted only if that topic exists and is
       active; otherwise the payload is rejected (no keyword fallback).
    3. Otherwise title, content/text and tags are lower-cased and matched by
       substring against each topic's ``routing_json`` ``include_when`` /
       ``exclude_when`` keywords. Score = 2 * include hits - 3 * exclude hits
       + priority / 1000, with priority clamped to [0, 100]. Topics without an
       include hit are skipped; the highest positive score wins (ties keep the
       first topic in key order).

    Returns:
        Dict with ``matched``, ``topic_key``, ``confidence`` and ``reason``.
    """
    active_topics = session.exec(
        select(TopicTopic)
        .where(TopicTopic.bot_id == bot_id)
        .where(TopicTopic.is_active == True)  # noqa: E712 - SQL expression, not a Python bool test
        .order_by(TopicTopic.topic_key.asc())
    ).all()
    if not active_topics:
        return {
            "matched": False,
            "topic_key": None,
            "confidence": 1.0,
            "reason": "no active topic configured",
        }

    req_key = _normalize_topic_key(requested_topic_key or payload.get("topic_key") or payload.get("topic"))
    if req_key:
        row = _topic_get_row(session, bot_id, req_key)
        if row and bool(row.is_active):
            return {
                "matched": True,
                "topic_key": req_key,
                "confidence": 0.99,
                "reason": "explicit topic key accepted",
            }
        return {
            "matched": False,
            "topic_key": None,
            "confidence": 0.72,
            "reason": f"requested topic {req_key} unavailable or inactive",
        }

    # A plain string is one tag; iterating it would split it into characters.
    raw_tags = payload.get("tags")
    if isinstance(raw_tags, list):
        tag_values: List[Any] = raw_tags
    elif isinstance(raw_tags, str):
        tag_values = [raw_tags]
    else:
        tag_values = []

    text = " ".join(
        [
            str(payload.get("title") or "").strip(),
            str(payload.get("content") or payload.get("text") or "").strip(),
            " ".join(str(v or "").strip() for v in tag_values),
        ]
    ).strip().lower()
    if not text:
        return {
            "matched": False,
            "topic_key": None,
            "confidence": 1.0,
            "reason": "no routing evidence",
        }

    best_key = ""
    best_score = -10.0
    best_reason = "no topic matched"
    matched_include = False
    for topic in active_topics:
        key = _normalize_topic_key(topic.topic_key)
        if not key:
            continue
        routing = _parse_json_dict(topic.routing_json or "{}")
        include_when = _normalize_topic_keywords(routing.get("include_when"))
        exclude_when = _normalize_topic_keywords(routing.get("exclude_when"))
        try:
            priority = max(0, min(int(routing.get("priority", 0)), 100))
        except (TypeError, ValueError, OverflowError):
            priority = 0

        include_hits = [kw for kw in include_when if kw in text]
        if not include_hits:
            continue
        exclude_hits = [kw for kw in exclude_when if kw in text]
        matched_include = True
        # Priority only breaks ties: at most 0.1, below one keyword hit.
        score = float(len(include_hits) * 2 - len(exclude_hits) * 3) + (priority / 1000.0)
        if score > best_score:
            best_score = score
            best_key = key
            best_reason = f"matched include_when: {', '.join(include_hits[:3])}"

    if not matched_include:
        return {
            "matched": False,
            "topic_key": None,
            "confidence": 0.68,
            "reason": "no include_when matched",
        }
    if best_score <= 0:
        return {
            "matched": False,
            "topic_key": None,
            "confidence": 0.68,
            "reason": "no positive routing score",
        }
    confidence = min(0.95, max(0.61, 0.61 + best_score / 12.0))
    return {
        "matched": True,
        "topic_key": best_key,
        "confidence": round(confidence, 3),
        "reason": best_reason,
    }


def _topic_publish_internal(session: Session, bot_id: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Validate, route, dedupe and persist one topic item.

    The payload is skipped when it is progress/tool-hint traffic, has neither
    title nor content, or cannot be routed to an active topic. If a non-empty
    ``dedupe_key`` matches an item for this bot created within
    ``TOPIC_DEDUPE_WINDOW_SECONDS``, that item is returned instead of inserting.

    Returns:
        Result dict; ``published`` is True only when a new ``TopicItem`` row was
        committed.
    """
    filter_reason = _topic_filter_reason(payload)
    if filter_reason:
        return {
            "published": False,
            "skipped": True,
            "reason": filter_reason,
        }

    title = str(payload.get("title") or "").strip()
    content = str(payload.get("content") or payload.get("text") or "").strip()
    if not title and not content:
        return {
            "published": False,
            "skipped": True,
            "reason": "empty title/content",
        }

    level = str(payload.get("level") or "info").strip().lower()
    if level not in TOPIC_LEVEL_SET:
        level = "info"

    tags = payload.get("tags")
    tags_rows: List[str] = []
    if isinstance(tags, list):
        for tag in tags:
            # Truncate before the duplicate check so two long tags sharing a
            # prefix don't both get stored as the same truncated value.
            text = str(tag or "").strip()[:_TOPIC_TAG_MAX_LEN]
            if text and text not in tags_rows:
                tags_rows.append(text)

    route_result = _topic_route_pick(session, bot_id, payload, requested_topic_key=str(payload.get("topic_key") or ""))
    if not bool(route_result.get("matched")):
        return {
            "published": False,
            "skipped": True,
            "reason": str(route_result.get("reason") or "no topic matched"),
            "route": route_result,
        }
    topic_key = _normalize_topic_key(route_result.get("topic_key"))
    if not topic_key:
        return {
            "published": False,
            "skipped": True,
            "reason": "invalid topic route result",
            "route": route_result,
        }
    row = _topic_get_row(session, bot_id, topic_key)
    if not row or not bool(row.is_active):
        return {
            "published": False,
            "skipped": True,
            "reason": f"topic {topic_key} unavailable or inactive",
            "route": route_result,
        }

    # Truncate to the stored length *before* the lookup; otherwise keys longer
    # than the column limit would never match their stored (truncated) value.
    dedupe_key = str(payload.get("dedupe_key") or "").strip()[:_TOPIC_DEDUPE_KEY_MAX_LEN]
    if dedupe_key:
        existing = session.exec(
            select(TopicItem)
            .where(TopicItem.bot_id == bot_id)
            .where(TopicItem.dedupe_key == dedupe_key)
            .order_by(TopicItem.id.desc())
            .limit(1)
        ).first()
        if existing and existing.created_at:
            # NOTE(review): assumes created_at is stored as naive UTC, matching
            # the datetime.utcnow() used for new rows below — confirm.
            age_s = (datetime.utcnow() - existing.created_at).total_seconds()
            if age_s <= TOPIC_DEDUPE_WINDOW_SECONDS:
                return {
                    "published": False,
                    "deduped": True,
                    "dedupe_window_seconds": TOPIC_DEDUPE_WINDOW_SECONDS,
                    "topic_key": _normalize_topic_key(existing.topic_key),
                    "reason": "dedupe_key hit within dedupe window",
                    "item": _topic_item_to_dict(existing),
                }

    view = payload.get("view")
    view_json = json.dumps(view, ensure_ascii=False) if isinstance(view, dict) else None
    source = str(payload.get("source") or "mcp").strip().lower() or "mcp"
    now = datetime.utcnow()
    item = TopicItem(
        bot_id=bot_id,
        topic_key=topic_key,
        title=title[:2000],
        content=content[:20000],
        level=level,
        tags_json=json.dumps(tags_rows, ensure_ascii=False) if tags_rows else None,
        view_json=view_json,
        source=source[:64],
        dedupe_key=dedupe_key or None,
        is_read=False,
        created_at=now,
    )
    session.add(item)
    session.commit()
    session.refresh(item)
    return {
        "published": True,
        "topic_key": topic_key,
        "item": _topic_item_to_dict(item),
        "route": route_result,
    }


def _jsonrpc_success(rpc_id: Any, result: Any) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 success response."""
    return {
        "jsonrpc": "2.0",
        "id": rpc_id,
        "result": result,
    }


def _jsonrpc_error(rpc_id: Any, code: int, message: str, data: Any = None) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 error response; ``data`` is included only when not None."""
    payload: Dict[str, Any] = {
        "jsonrpc": "2.0",
        "id": rpc_id,
        "error": {
            "code": int(code),
            "message": str(message or "unknown error"),
        },
    }
    if data is not None:
        payload["error"]["data"] = data
    return payload


def _mcp_tool_result(structured: Dict[str, Any], is_error: bool = False) -> Dict[str, Any]:
    """Wrap a tool result as MCP ``tools/call`` output (JSON text plus structured content)."""
    return {
        "content": [
            {
                "type": "text",
                "text": json.dumps(structured, ensure_ascii=False),
            }
        ],
        "structuredContent": structured,
        "isError": bool(is_error),
    }


def _topic_mcp_tools() -> List[Dict[str, Any]]:
    """Return the MCP tool descriptors advertised by ``tools/list``."""
    return [
        {
            "name": "topic_list_topics",
            "description": "List available topics for the current bot.",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "include_inactive": {"type": "boolean"},
                },
                "additionalProperties": False,
            },
        },
        {
            "name": "topic_get_schema",
            "description": "Get allowed view schema and optional topic-specific schema.",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "topic_key": {"type": "string"},
                },
                "additionalProperties": False,
            },
        },
        {
            "name": "topic_route",
            "description": "Route candidate content to a topic and decide if publish is needed.",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "topic_key": {"type": "string"},
                    "title": {"type": "string"},
                    "content": {"type": "string"},
                    "tags": {"type": "array", "items": {"type": "string"}},
                    "is_progress": {"type": "boolean"},
                    "is_tool_hint": {"type": "boolean"},
                    "source": {"type": "string"},
                },
                "additionalProperties": True,
            },
        },
        {
            "name": "topic_publish",
            "description": "Publish one item into topic feed with dedupe support.",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "topic_key": {"type": "string"},
                    "title": {"type": "string"},
                    "content": {"type": "string"},
                    "level": {"type": "string"},
                    "tags": {"type": "array", "items": {"type": "string"}},
                    "view": {"type": "object"},
                    "dedupe_key": {"type": "string"},
                    "source": {"type": "string"},
                    "is_progress": {"type": "boolean"},
                    "is_tool_hint": {"type": "boolean"},
                },
                "additionalProperties": True,
            },
        },
    ]


def _topic_mcp_list_topics(session: Session, bot_id: str, args: Dict[str, Any]) -> Dict[str, Any]:
    """Tool ``topic_list_topics``; inactive topics are included unless ``include_inactive`` is falsy."""
    _ensure_topic_defaults(session, bot_id)
    # When the argument is omitted, default to including inactive topics.
    include_inactive = _as_bool(args.get("include_inactive")) or ("include_inactive" not in args)
    topics = _list_topics(session, bot_id)
    if not include_inactive:
        topics = [row for row in topics if bool(row.get("is_active"))]
    return {
        "bot_id": bot_id,
        "topics": topics,
    }


def _topic_mcp_get_schema(session: Session, bot_id: str, args: Dict[str, Any]) -> Dict[str, Any]:
    """Tool ``topic_get_schema``: allowed view types/constraints, plus the topic if ``topic_key`` given."""
    _ensure_topic_defaults(session, bot_id)
    topic_key = _normalize_topic_key(args.get("topic_key"))
    topic_payload: Optional[Dict[str, Any]] = None
    if topic_key:
        row = _topic_get_row(session, bot_id, topic_key)
        if row:
            topic_payload = _topic_to_dict(row)
    return {
        "version": "v1",
        "view_types": ["markdown", "card", "table", "checklist", "metric", "timeline"],
        "topic": topic_payload,
        "view_schema": {
            "type": "object",
            "description": "Declarative view payload only. Scripts and unsafe HTML are not allowed.",
        },
        "publish_constraints": {
            "level": sorted(TOPIC_LEVEL_SET),
            "dedupe_window_seconds": TOPIC_DEDUPE_WINDOW_SECONDS,
        },
    }


def _topic_mcp_route(session: Session, bot_id: str, args: Dict[str, Any]) -> Dict[str, Any]:
    """Tool ``topic_route``: dry-run routing that reports whether publishing would proceed."""
    _ensure_topic_defaults(session, bot_id)
    filter_reason = _topic_filter_reason(args)
    if filter_reason:
        return {
            "should_publish": False,
            "topic_key": None,
            "confidence": 1.0,
            "reason": filter_reason,
        }
    title = str(args.get("title") or "").strip()
    content = str(args.get("content") or args.get("text") or "").strip()
    if not title and not content:
        return {
            "should_publish": False,
            "topic_key": None,
            "confidence": 1.0,
            "reason": "empty title/content",
        }
    route = _topic_route_pick(session, bot_id, args, requested_topic_key=str(args.get("topic_key") or ""))
    return {
        "should_publish": bool(route.get("matched")),
        "topic_key": route.get("topic_key"),
        "confidence": route.get("confidence"),
        "reason": route.get("reason"),
    }


def _topic_mcp_publish(session: Session, bot_id: str, args: Dict[str, Any]) -> Dict[str, Any]:
    """Tool ``topic_publish``: delegates to ``_topic_publish_internal``."""
    return _topic_publish_internal(session, bot_id, args)


def _dispatch_topic_mcp_method(session: Session, bot_id: str, method: str, params: Dict[str, Any]) -> Any:
    """Execute one MCP JSON-RPC method and return its ``result`` payload.

    Raises:
        KeyError: (specifically ``_TopicMcpUnknownMethodError``) for an
            unsupported method. Unknown *tool* names are not raised; they return
            an ``isError`` tool result.
    """
    if method == "initialize":
        return {
            "protocolVersion": TOPIC_MCP_PROTOCOL_VERSION,
            "capabilities": {
                "tools": {},
            },
            "serverInfo": {
                "name": TOPIC_MCP_SERVER_NAME,
                "version": "0.1.0",
            },
        }
    if method in {"notifications/initialized", "initialized"}:
        return None
    if method == "ping":
        return {}
    if method == "tools/list":
        return {
            "tools": _topic_mcp_tools(),
        }
    if method != "tools/call":
        raise _TopicMcpUnknownMethodError(f"Unknown method: {method}")

    tool_name = str(params.get("name") or "").strip()
    arguments = params.get("arguments")
    if not isinstance(arguments, dict):
        arguments = {}

    tool_handlers = {
        "topic_list_topics": _topic_mcp_list_topics,
        "topic_get_schema": _topic_mcp_get_schema,
        "topic_route": _topic_mcp_route,
        "topic_publish": _topic_mcp_publish,
    }
    handler = tool_handlers.get(tool_name)
    if handler is None:
        return _mcp_tool_result(
            {
                "error": f"unknown tool: {tool_name}",
                "available_tools": [tool["name"] for tool in _topic_mcp_tools()],
            },
            is_error=True,
        )
    return _mcp_tool_result(handler(session, bot_id, arguments))


def _handle_topic_mcp_rpc_item(session: Session, bot_id: str, item: Any) -> Optional[Dict[str, Any]]:
    """Handle one JSON-RPC request object and return its response.

    Error codes:
        -32600 invalid request, -32601 unknown method, -32602 invalid params
        (including ``ValueError`` from a handler), -32000 any other failure
        (logged with traceback).

    Returns ``None`` for a successful call without an ``id`` (a notification);
    errors are always returned.
    """
    if not isinstance(item, dict):
        return _jsonrpc_error(None, -32600, "Invalid Request")
    rpc_id = item.get("id")
    method = str(item.get("method") or "").strip()
    if not method:
        return _jsonrpc_error(rpc_id, -32600, "Invalid Request: method is required")
    params = item.get("params")
    if params is None:
        params = {}
    if not isinstance(params, dict):
        return _jsonrpc_error(rpc_id, -32602, "Invalid params")

    try:
        result = _dispatch_topic_mcp_method(session, bot_id, method, params)
    except _TopicMcpUnknownMethodError as exc:
        return _jsonrpc_error(rpc_id, -32601, str(exc))
    except ValueError as exc:
        return _jsonrpc_error(rpc_id, -32602, str(exc))
    except Exception as exc:
        # Includes stray KeyErrors from tool code, which are bugs rather than
        # "method not found".
        logger.exception("topic_mcp method failed: %s", method)
        return _jsonrpc_error(rpc_id, -32000, f"topic_mcp execution failed: {type(exc).__name__}: {exc}")

    if rpc_id is None:
        return None
    return _jsonrpc_success(rpc_id, result)