import json
import re
from datetime import datetime
from typing import Any, Dict, List, Optional

from sqlmodel import Session, select

from models.topic import TopicItem, TopicTopic

# Items published with the same dedupe_key within this many seconds are not stored again.
TOPIC_DEDUPE_WINDOW_SECONDS = 10 * 60
# Accepted item levels; any other level is coerced to "info" on publish.
TOPIC_LEVEL_SET = {"info", "warn", "error", "success"}
# NOTE(review): not referenced in this module; _normalize_topic_key does not validate
# against it. Confirm whether topic-key validation is intended somewhere else.
_TOPIC_KEY_RE = re.compile(r"^[a-z0-9][a-z0-9_.-]{0,63}$")

# Truncation limits applied when persisting a TopicItem. The dedupe key limit is shared
# by the dedupe lookup and the insert so both operate on the same stored value.
_TITLE_MAX_LEN = 2000
_CONTENT_MAX_LEN = 20000
_TAG_MAX_LEN = 64
_SOURCE_MAX_LEN = 64
_DEDUPE_KEY_MAX_LEN = 200

# String spellings accepted as True by _as_bool.
_TRUTHY_STRINGS = {"1", "true", "yes", "on", "y"}
# Payload source/type values that are never published as topic items.
_FILTERED_SOURCES = {"progress", "tool_hint", "sendprogress", "sendtoolhints"}


def _as_bool(value: Any) -> bool:
    """Interpret ``value`` as a boolean flag.

    Real booleans are returned unchanged. Anything else is stringified, stripped and
    lower-cased, and is True only for "1", "true", "yes", "on" or "y". None and other
    falsy values are False.
    """
    if isinstance(value, bool):
        return value
    return str(value or "").strip().lower() in _TRUTHY_STRINGS


def _normalize_topic_key(raw: Any) -> str:
    """Return ``raw`` as a stripped, lower-cased topic key ("" for None/empty input)."""
    return str(raw or "").strip().lower()


def _parse_json_dict(raw: str) -> Dict[str, Any]:
    """Best-effort parse of a JSON object string.

    Returns {} for empty input, invalid JSON, or JSON that is not an object.
    """
    text = str(raw or "").strip()
    if not text:
        return {}
    try:
        data = json.loads(text)
    except Exception:  # best-effort: malformed stored JSON must not break callers
        return {}
    return data if isinstance(data, dict) else {}


def _parse_json_list(raw: str) -> List[Any]:
    """Best-effort parse of a JSON array string.

    Returns [] for empty input, invalid JSON, or JSON that is not an array.
    """
    text = str(raw or "").strip()
    if not text:
        return []
    try:
        data = json.loads(text)
    except Exception:  # best-effort: malformed stored JSON must not break callers
        return []
    return data if isinstance(data, list) else []


def _topic_to_dict(row: TopicTopic) -> Dict[str, Any]:
    """Serialize a TopicTopic row into a plain dict, decoding its JSON columns."""
    return {
        "id": row.id,
        "bot_id": row.bot_id,
        "topic_key": str(row.topic_key or "").strip().lower(),
        "name": row.name or "",
        "description": row.description or "",
        "is_active": bool(row.is_active),
        "routing": _parse_json_dict(row.routing_json or "{}"),
        "view_schema": _parse_json_dict(row.view_schema_json or "{}"),
        "created_at": row.created_at.isoformat() if row.created_at else None,
        "updated_at": row.updated_at.isoformat() if row.updated_at else None,
    }


def _list_topics(session: Session, bot_id: str) -> List[Dict[str, Any]]:
    """Return all topics of ``bot_id`` as dicts, ordered by is_active desc, then topic_key."""
    rows = session.exec(
        select(TopicTopic)
        .where(TopicTopic.bot_id == bot_id)
        .order_by(TopicTopic.is_active.desc(), TopicTopic.topic_key.asc())
    ).all()
    return [_topic_to_dict(row) for row in rows]


def _topic_item_to_dict(row: TopicItem) -> Dict[str, Any]:
    """Serialize a TopicItem row into a plain dict, decoding its JSON columns."""
    return {
        "id": row.id,
        "bot_id": row.bot_id,
        "topic_key": str(row.topic_key or "").strip().lower(),
        "title": row.title or "",
        "content": row.content or "",
        "level": str(row.level or "info").strip().lower(),
        "tags": _parse_json_list(row.tags_json or "[]"),
        "view": _parse_json_dict(row.view_json or "{}"),
        "source": row.source or "dashboard",
        "dedupe_key": row.dedupe_key or "",
        "is_read": bool(row.is_read),
        "created_at": row.created_at.isoformat() if row.created_at else None,
    }


def _topic_get_row(session: Session, bot_id: str, topic_key: str) -> Optional[TopicTopic]:
    """Fetch the topic of ``bot_id`` whose key equals the normalized ``topic_key``.

    Returns None when the key normalizes to "" or no such row exists. The row is
    returned regardless of its is_active flag.
    """
    normalized = _normalize_topic_key(topic_key)
    if not normalized:
        return None
    return session.exec(
        select(TopicTopic)
        .where(TopicTopic.bot_id == bot_id)
        .where(TopicTopic.topic_key == normalized)
        .limit(1)
    ).first()


def _normalize_topic_keywords(raw: Any) -> List[str]:
    """Normalize a routing keyword spec into unique, stripped, lower-cased keywords.

    A list yields one keyword per non-empty item, in first-seen order. A string yields
    itself as a single keyword (it is not split). Any other type yields [].
    """
    rows: List[str] = []
    if isinstance(raw, list):
        for item in raw:
            text = str(item or "").strip().lower()
            if text and text not in rows:
                rows.append(text)
    elif isinstance(raw, str):
        text = raw.strip().lower()
        if text:
            rows.append(text)
    return rows


def _topic_filter_reason(payload: Dict[str, Any]) -> str:
    """Return why ``payload`` must not be published, or "" if it may be.

    Progress and tool-hint messages are filtered, whether flagged via ``is_progress`` /
    ``is_tool_hint`` or identified by their ``source`` (falling back to ``type``).
    """
    if _as_bool(payload.get("is_progress")):
        return "progress message is filtered"
    if _as_bool(payload.get("is_tool_hint")):
        return "tool hint message is filtered"
    source = str(payload.get("source") or payload.get("type") or "").strip().lower()
    if source in _FILTERED_SOURCES:
        return f"{source} message is filtered"
    return ""


def _topic_route_text(payload: Dict[str, Any]) -> str:
    """Build the lower-cased text that routing keywords are matched against.

    Combines the title, the content (falling back to ``text``) and the tags. Tags are
    only used when they are a list or tuple. Other values, such as a bare string or a
    number, are ignored rather than being iterated character by character or raising.
    """
    tags = payload.get("tags")
    tag_values = tags if isinstance(tags, (list, tuple)) else []
    parts = [
        str(payload.get("title") or "").strip(),
        str(payload.get("content") or payload.get("text") or "").strip(),
        " ".join(str(v or "").strip() for v in tag_values),
    ]
    return " ".join(parts).strip().lower()


def _routing_priority(raw: Any) -> int:
    """Parse a routing ``priority`` value as an int clamped to [0, 100]; 0 if unparseable."""
    try:
        return max(0, min(int(raw), 100))
    except (TypeError, ValueError, OverflowError):
        return 0


def _topic_route_pick(
    session: Session,
    bot_id: str,
    payload: Dict[str, Any],
    requested_topic_key: str = "",
) -> Dict[str, Any]:
    """Choose the active topic that ``payload`` should be published to.

    Resolution order:
      1. If the bot has no active topics, the route does not match.
      2. An explicit key is taken from ``requested_topic_key``, else payload
         ``topic_key``, else payload ``topic``. It is accepted only if that topic
         exists and is active. Otherwise the route fails, with no keyword fallback.
      3. Otherwise each active topic's ``routing_json`` keywords (``include_when`` /
         ``exclude_when``) are matched as substrings of the payload text. Topics with
         no include hit are skipped. The rest score
         ``2 * include_hits - 3 * exclude_hits + priority / 1000``, and the highest
         positive score wins. On ties, the first topic in ascending key order wins.

    Returns:
        A dict with ``matched`` (bool), ``topic_key`` (str or None),
        ``confidence`` (float) and a human-readable ``reason``.
    """
    active_topics = session.exec(
        select(TopicTopic)
        .where(TopicTopic.bot_id == bot_id)
        .where(TopicTopic.is_active == True)  # noqa: E712 -- builds a SQL expression
        .order_by(TopicTopic.topic_key.asc())
    ).all()
    if not active_topics:
        return {
            "matched": False,
            "topic_key": None,
            "confidence": 1.0,
            "reason": "no active topic configured",
        }

    req_key = _normalize_topic_key(
        requested_topic_key or payload.get("topic_key") or payload.get("topic")
    )
    if req_key:
        row = _topic_get_row(session, bot_id, req_key)
        if row and bool(row.is_active):
            return {
                "matched": True,
                "topic_key": req_key,
                "confidence": 0.99,
                "reason": "explicit topic key accepted",
            }
        return {
            "matched": False,
            "topic_key": None,
            "confidence": 0.72,
            "reason": f"requested topic {req_key} unavailable or inactive",
        }

    text = _topic_route_text(payload)
    if not text:
        return {
            "matched": False,
            "topic_key": None,
            "confidence": 1.0,
            "reason": "no routing evidence",
        }

    best_key = ""
    best_score = float("-inf")
    best_reason = "no topic matched"
    matched_include = False
    for topic in active_topics:
        key = _normalize_topic_key(topic.topic_key)
        if not key:
            continue
        routing = _parse_json_dict(topic.routing_json or "{}")
        include_hits = [
            kw for kw in _normalize_topic_keywords(routing.get("include_when")) if kw in text
        ]
        if not include_hits:
            # A topic is only a candidate when at least one include keyword hits.
            continue
        exclude_hits = [
            kw for kw in _normalize_topic_keywords(routing.get("exclude_when")) if kw in text
        ]
        matched_include = True
        priority = _routing_priority(routing.get("priority", 0))
        # Priority is a small tie-breaker (at most 0.1) that never outweighs a keyword hit.
        score = float(len(include_hits) * 2 - len(exclude_hits) * 3) + (priority / 1000.0)
        if score > best_score:
            best_score = score
            best_key = key
            best_reason = f"matched include_when: {', '.join(include_hits[:3])}"

    if not matched_include:
        return {
            "matched": False,
            "topic_key": None,
            "confidence": 0.68,
            "reason": "no include_when matched",
        }
    if best_score <= 0:
        return {
            "matched": False,
            "topic_key": None,
            "confidence": 0.68,
            "reason": "no positive routing score",
        }
    # Map score onto [0.61, 0.95]: each extra include hit adds about 0.17 confidence.
    confidence = min(0.95, max(0.61, 0.61 + best_score / 12.0))
    return {
        "matched": True,
        "topic_key": best_key,
        "confidence": round(confidence, 3),
        "reason": best_reason,
    }


def _topic_normalize_tags(raw: Any) -> List[str]:
    """Return unique, stripped tags, each truncated to _TAG_MAX_LEN, from a list/tuple.

    Truncation happens before the uniqueness check, so two long tags that share a
    prefix do not produce duplicate stored tags. Non-list input yields [].
    """
    rows: List[str] = []
    if not isinstance(raw, (list, tuple)):
        return rows
    for tag in raw:
        text = str(tag or "").strip()[:_TAG_MAX_LEN]
        if text and text not in rows:
            rows.append(text)
    return rows


def _topic_find_recent_duplicate(
    session: Session, bot_id: str, dedupe_key: str
) -> Optional[TopicItem]:
    """Return the newest item of ``bot_id`` with ``dedupe_key`` if it is within the window.

    ``dedupe_key`` must already be truncated the same way it is stored. Items without a
    created_at are never treated as duplicates. Age is measured against naive
    ``datetime.utcnow()``, matching how _topic_publish_internal stamps created_at.
    """
    existing = session.exec(
        select(TopicItem)
        .where(TopicItem.bot_id == bot_id)
        .where(TopicItem.dedupe_key == dedupe_key)
        .order_by(TopicItem.id.desc())
        .limit(1)
    ).first()
    if not existing or not existing.created_at:
        return None
    age_s = (datetime.utcnow() - existing.created_at).total_seconds()
    return existing if age_s <= TOPIC_DEDUPE_WINDOW_SECONDS else None


def _topic_publish_internal(session: Session, bot_id: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Route ``payload`` to an active topic of ``bot_id`` and store it as a TopicItem.

    The payload is skipped when any of these hold:
      - it is a filtered progress or tool-hint message;
      - it has neither title nor content;
      - routing finds no active topic;
      - an item with the same (truncated) ``dedupe_key`` was stored within
        TOPIC_DEDUPE_WINDOW_SECONDS.

    On success the new item is added and the session is committed.

    Returns:
        A dict with ``published`` (bool). Skips add ``skipped``/``reason`` (and
        ``route`` when routing ran). Dedupe hits add ``deduped``, ``topic_key`` and
        the existing ``item``. Success adds ``topic_key``, ``item`` and ``route``.
    """
    filter_reason = _topic_filter_reason(payload)
    if filter_reason:
        return {
            "published": False,
            "skipped": True,
            "reason": filter_reason,
        }

    title = str(payload.get("title") or "").strip()
    content = str(payload.get("content") or payload.get("text") or "").strip()
    if not title and not content:
        return {
            "published": False,
            "skipped": True,
            "reason": "empty title/content",
        }

    level = str(payload.get("level") or "info").strip().lower()
    if level not in TOPIC_LEVEL_SET:
        level = "info"
    tags_rows = _topic_normalize_tags(payload.get("tags"))

    route_result = _topic_route_pick(
        session, bot_id, payload, requested_topic_key=str(payload.get("topic_key") or "")
    )
    if not bool(route_result.get("matched")):
        return {
            "published": False,
            "skipped": True,
            "reason": str(route_result.get("reason") or "no topic matched"),
            "route": route_result,
        }
    topic_key = _normalize_topic_key(route_result.get("topic_key"))
    if not topic_key:
        return {
            "published": False,
            "skipped": True,
            "reason": "invalid topic route result",
            "route": route_result,
        }
    # Re-check the routed topic still exists and is active before writing to it.
    row = _topic_get_row(session, bot_id, topic_key)
    if not row or not bool(row.is_active):
        return {
            "published": False,
            "skipped": True,
            "reason": f"topic {topic_key} unavailable or inactive",
            "route": route_result,
        }

    # Truncate once so the dedupe lookup compares against exactly what gets stored;
    # previously keys longer than the limit never matched their stored (truncated) form.
    dedupe_key = str(payload.get("dedupe_key") or "").strip()[:_DEDUPE_KEY_MAX_LEN]
    if dedupe_key:
        existing = _topic_find_recent_duplicate(session, bot_id, dedupe_key)
        if existing is not None:
            return {
                "published": False,
                "deduped": True,
                "dedupe_window_seconds": TOPIC_DEDUPE_WINDOW_SECONDS,
                "topic_key": _normalize_topic_key(existing.topic_key),
                "reason": "dedupe_key hit within dedupe window",
                "item": _topic_item_to_dict(existing),
            }

    view = payload.get("view")
    view_json = json.dumps(view, ensure_ascii=False) if isinstance(view, dict) else None
    source = str(payload.get("source") or "dashboard").strip().lower() or "dashboard"
    item = TopicItem(
        bot_id=bot_id,
        topic_key=topic_key,
        title=title[:_TITLE_MAX_LEN],
        content=content[:_CONTENT_MAX_LEN],
        level=level,
        tags_json=json.dumps(tags_rows, ensure_ascii=False) if tags_rows else None,
        view_json=view_json,
        source=source[:_SOURCE_MAX_LEN],
        dedupe_key=dedupe_key or None,
        is_read=False,
        created_at=datetime.utcnow(),
    )
    session.add(item)
    session.commit()
    session.refresh(item)
    return {
        "published": True,
        "topic_key": topic_key,
        "item": _topic_item_to_dict(item),
        "route": route_result,
    }