import json
from datetime import datetime
from typing import Any, Dict, List, Optional

from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.responses import JSONResponse, Response
from pydantic import BaseModel
from sqlalchemy import func
from sqlmodel import Session, select

from core.database import get_session
from models.bot import BotInstance
from models.topic import TopicItem, TopicTopic
from services.topic_service import (
    TOPIC_MCP_TOKEN_HEADER,
    _TOPIC_KEY_RE,
    _handle_topic_mcp_rpc_item,
    _jsonrpc_error,
    _list_topics,
    _normalize_topic_key,
    _resolve_topic_mcp_bot_id_by_token,
    _topic_item_to_dict,
    _topic_to_dict,
)

router = APIRouter()


def _require_bot(session: Session, bot_id: str) -> BotInstance:
    """Return the bot with primary key *bot_id* or raise a 404 HTTPException."""
    bot = session.get(BotInstance, bot_id)
    if not bot:
        raise HTTPException(status_code=404, detail="Bot not found")
    return bot


def _find_topic(session: Session, bot_id: str, topic_key: str) -> Optional[TopicTopic]:
    """Return the topic row matching *bot_id* and *topic_key*, or None.

    *topic_key* is compared as given; callers normalize it first.
    """
    return session.exec(
        select(TopicTopic)
        .where(TopicTopic.bot_id == bot_id)
        .where(TopicTopic.topic_key == topic_key)
        .limit(1)
    ).first()


def _count_topic_items(
    session: Session,
    bot_id: str,
    topic_key: Optional[str] = None,
    unread_only: bool = False,
) -> int:
    """Count the topic items that belong to *bot_id*.

    Args:
        session: Database session used to run the count query.
        bot_id: Only items with this bot id are counted.
        topic_key: Optional topic filter. It is passed through
            ``_normalize_topic_key``; when the normalized value is empty
            no topic filter is applied.
        unread_only: When true, only items with ``is_read == False`` count.

    Returns:
        The number of matching rows (0 when the query yields a falsy value).
    """
    stmt = select(func.count()).select_from(TopicItem).where(TopicItem.bot_id == bot_id)
    normalized_topic_key = _normalize_topic_key(topic_key or "")
    if normalized_topic_key:
        stmt = stmt.where(TopicItem.topic_key == normalized_topic_key)
    if unread_only:
        # ``== False`` is intentional: it builds a SQL expression.
        stmt = stmt.where(TopicItem.is_read == False)  # noqa: E712
    value = session.exec(stmt).one()
    return int(value or 0)


class TopicCreateRequest(BaseModel):
    """Request body accepted by ``create_bot_topic``."""

    topic_key: str
    name: Optional[str] = None
    description: Optional[str] = None
    is_active: bool = True
    routing: Optional[Dict[str, Any]] = None
    view_schema: Optional[Dict[str, Any]] = None


class TopicUpdateRequest(BaseModel):
    """Request body accepted by ``update_bot_topic``; every field is optional."""

    name: Optional[str] = None
    description: Optional[str] = None
    is_active: Optional[bool] = None
    routing: Optional[Dict[str, Any]] = None
    view_schema: Optional[Dict[str, Any]] = None


@router.get("/api/bots/{bot_id}/topics")
def list_bot_topics(bot_id: str, session: Session = Depends(get_session)):
    """Return ``_list_topics`` for the bot; 404 when the bot does not exist."""
    _require_bot(session, bot_id)
    return _list_topics(session, bot_id)


@router.post("/api/bots/{bot_id}/topics")
def create_bot_topic(
    bot_id: str,
    payload: TopicCreateRequest,
    session: Session = Depends(get_session),
):
    """Create a topic for a bot and return it via ``_topic_to_dict``.

    Raises:
        HTTPException: 404 when the bot does not exist; 400 when the
            normalized ``topic_key`` is empty, does not fully match
            ``_TOPIC_KEY_RE``, or is already used by this bot.
    """
    _require_bot(session, bot_id)

    topic_key = _normalize_topic_key(payload.topic_key)
    if not topic_key:
        raise HTTPException(status_code=400, detail="topic_key is required")
    if not _TOPIC_KEY_RE.fullmatch(topic_key):
        raise HTTPException(status_code=400, detail="invalid topic_key")
    if _find_topic(session, bot_id, topic_key):
        raise HTTPException(status_code=400, detail=f"Topic already exists: {topic_key}")

    # Naive UTC timestamp, shared by created_at and updated_at.
    now = datetime.utcnow()
    row = TopicTopic(
        bot_id=bot_id,
        topic_key=topic_key,
        # A missing or blank name falls back to the topic key.
        name=str(payload.name or topic_key).strip() or topic_key,
        description=str(payload.description or "").strip(),
        is_active=bool(payload.is_active),
        is_default_fallback=False,
        routing_json=json.dumps(payload.routing or {}, ensure_ascii=False),
        view_schema_json=json.dumps(payload.view_schema or {}, ensure_ascii=False),
        created_at=now,
        updated_at=now,
    )
    session.add(row)
    session.commit()
    session.refresh(row)
    return _topic_to_dict(row)


@router.put("/api/bots/{bot_id}/topics/{topic_key}")
def update_bot_topic(
    bot_id: str,
    topic_key: str,
    payload: TopicUpdateRequest,
    session: Session = Depends(get_session),
):
    """Apply the fields present in the request body to an existing topic.

    Only fields explicitly set by the client are touched. On every update
    ``is_default_fallback`` is reset to False and ``updated_at`` refreshed.

    Raises:
        HTTPException: 404 when the bot or the topic does not exist; 400
            when the normalized ``topic_key`` is empty.
    """
    _require_bot(session, bot_id)

    normalized_key = _normalize_topic_key(topic_key)
    if not normalized_key:
        raise HTTPException(status_code=400, detail="topic_key is required")

    row = _find_topic(session, bot_id, normalized_key)
    if not row:
        raise HTTPException(status_code=404, detail="Topic not found")

    # exclude_unset distinguishes "field omitted" from "field sent as null".
    update_data = payload.model_dump(exclude_unset=True)
    if "name" in update_data:
        row.name = str(update_data.get("name") or "").strip() or row.topic_key
    if "description" in update_data:
        row.description = str(update_data.get("description") or "").strip()
    if "is_active" in update_data:
        row.is_active = bool(update_data.get("is_active"))
    if "routing" in update_data:
        row.routing_json = json.dumps(update_data.get("routing") or {}, ensure_ascii=False)
    if "view_schema" in update_data:
        row.view_schema_json = json.dumps(
            update_data.get("view_schema") or {}, ensure_ascii=False
        )

    row.is_default_fallback = False
    row.updated_at = datetime.utcnow()
    session.add(row)
    session.commit()
    session.refresh(row)
    return _topic_to_dict(row)


@router.delete("/api/bots/{bot_id}/topics/{topic_key}")
def delete_bot_topic(bot_id: str, topic_key: str, session: Session = Depends(get_session)):
    """Delete a topic together with all of its items in one commit.

    Raises:
        HTTPException: 404 when the bot or the topic does not exist; 400
            when the normalized ``topic_key`` is empty.
    """
    _require_bot(session, bot_id)

    normalized_key = _normalize_topic_key(topic_key)
    if not normalized_key:
        raise HTTPException(status_code=400, detail="topic_key is required")

    row = _find_topic(session, bot_id, normalized_key)
    if not row:
        raise HTTPException(status_code=404, detail="Topic not found")

    items = session.exec(
        select(TopicItem)
        .where(TopicItem.bot_id == bot_id)
        .where(TopicItem.topic_key == normalized_key)
    ).all()
    for item in items:
        session.delete(item)
    session.delete(row)
    session.commit()
    return {"status": "deleted", "bot_id": bot_id, "topic_key": normalized_key}


@router.get("/api/bots/{bot_id}/topic-items")
def list_bot_topic_items(
    bot_id: str,
    topic_key: Optional[str] = None,
    cursor: Optional[int] = None,
    limit: int = 50,
    session: Session = Depends(get_session),
):
    """Return one page of a bot's topic items, newest (highest id) first.

    Args:
        bot_id: Bot whose items are listed.
        topic_key: Optional topic filter (normalized; ignored when empty).
        cursor: When positive, only items with ``id < cursor`` are returned.
        limit: Page size; a falsy value means 50, and the result is clamped
            to the range 1..100.
        session: Database session.

    Returns:
        A dict with the page ``items``, a ``next_cursor`` to pass as
        ``cursor`` for the following page (None when no more rows were
        found), and unread counters for the filtered topic and for the bot.

    Raises:
        HTTPException: 404 when the bot does not exist.
    """
    _require_bot(session, bot_id)

    normalized_limit = max(1, min(int(limit or 50), 100))
    stmt = select(TopicItem).where(TopicItem.bot_id == bot_id)

    normalized_topic_key = _normalize_topic_key(topic_key or "")
    if normalized_topic_key:
        stmt = stmt.where(TopicItem.topic_key == normalized_topic_key)

    if cursor is not None:
        normalized_cursor = int(cursor)
        if normalized_cursor > 0:
            stmt = stmt.where(TopicItem.id < normalized_cursor)

    # Fetch one row beyond the page size purely to detect a following page.
    rows = session.exec(
        stmt.order_by(TopicItem.id.desc()).limit(normalized_limit + 1)
    ).all()

    next_cursor: Optional[int] = None
    if len(rows) > normalized_limit:
        rows = rows[:normalized_limit]
        # The cursor must be the id of the last item actually returned.
        # The next request filters with ``id < cursor``, so using the
        # look-ahead row's id here would skip that row on every page.
        next_cursor = rows[-1].id

    return {
        "bot_id": bot_id,
        "topic_key": normalized_topic_key or None,
        "items": [_topic_item_to_dict(row) for row in rows],
        "next_cursor": next_cursor,
        "unread_count": _count_topic_items(
            session, bot_id, normalized_topic_key, unread_only=True
        ),
        "total_unread_count": _count_topic_items(session, bot_id, unread_only=True),
    }


@router.get("/api/bots/{bot_id}/topic-items/stats")
def get_bot_topic_item_stats(bot_id: str, session: Session = Depends(get_session)):
    """Return total and unread item counts plus the highest item id for a bot.

    ``latest_item_id`` is None when the bot has no items.

    Raises:
        HTTPException: 404 when the bot does not exist.
    """
    _require_bot(session, bot_id)

    latest_item = session.exec(
        select(TopicItem)
        .where(TopicItem.bot_id == bot_id)
        .order_by(TopicItem.id.desc())
        .limit(1)
    ).first()
    return {
        "bot_id": bot_id,
        "total_count": _count_topic_items(session, bot_id),
        "unread_count": _count_topic_items(session, bot_id, unread_only=True),
        "latest_item_id": int(latest_item.id or 0) if latest_item and latest_item.id else None,
    }


@router.post("/api/bots/{bot_id}/topic-items/{item_id}/read")
def mark_bot_topic_item_read(bot_id: str, item_id: int, session: Session = Depends(get_session)):
    """Mark one topic item as read and return its serialized form.

    The row is only written when it is not already read, so repeating the
    call on a read item performs no commit.

    Raises:
        HTTPException: 404 when the bot does not exist or the item is not
            found for this bot.
    """
    _require_bot(session, bot_id)

    row = session.exec(
        select(TopicItem)
        .where(TopicItem.bot_id == bot_id)
        .where(TopicItem.id == item_id)
        .limit(1)
    ).first()
    if not row:
        raise HTTPException(status_code=404, detail="Topic item not found")

    if not bool(row.is_read):
        row.is_read = True
        session.add(row)
        session.commit()
        session.refresh(row)

    return {
        "status": "updated",
        "bot_id": bot_id,
        "item": _topic_item_to_dict(row),
    }
@router.post("/api/mcp/topic")
async def topic_mcp_entry(request: Request, session: Session = Depends(get_session)):
    """Authenticate a topic MCP call and dispatch its JSON-RPC payload.

    The token is read from the ``TOPIC_MCP_TOKEN_HEADER`` header, falling
    back to an ``Authorization: Bearer ...`` header. The body may be a
    single request object or a non-empty list of them; each one is handed
    to ``_handle_topic_mcp_rpc_item``.

    Returns:
        A JSON response with the handler result(s), an empty 204 response
        when the handler produced nothing to send back, or a 400 JSON-RPC
        error for an unparsable body or an empty batch.

    Raises:
        HTTPException: 401 when the token is missing, does not resolve to
            a bot id, or resolves to a bot that does not exist.
    """
    headers = request.headers

    # Dedicated header first; the bearer scheme is only a fallback.
    token = str(headers.get(TOPIC_MCP_TOKEN_HEADER) or "").strip()
    if not token:
        authorization = str(headers.get("authorization") or "").strip()
        bearer_prefix = "bearer "
        if authorization.lower().startswith(bearer_prefix):
            token = authorization[len(bearer_prefix):].strip()
    if not token:
        raise HTTPException(status_code=401, detail="Missing topic_mcp token")

    # An unknown token and a token pointing at a missing bot look the same
    # to the caller.
    bot_id = _resolve_topic_mcp_bot_id_by_token(session, token)
    if not bot_id or not session.get(BotInstance, bot_id):
        raise HTTPException(status_code=401, detail="Invalid topic_mcp token")

    try:
        body = await request.json()
    except Exception:
        parse_error = _jsonrpc_error(None, -32700, "Parse error")
        return JSONResponse(status_code=400, content=parse_error)

    if not isinstance(body, list):
        # Single request object.
        single = _handle_topic_mcp_rpc_item(session, bot_id, body)
        if single is not None:
            return JSONResponse(content=single)
        return Response(status_code=204)

    if not body:
        invalid = _jsonrpc_error(None, -32600, "Invalid Request")
        return JSONResponse(status_code=400, content=invalid)

    # Batch: handle entries in order and keep only those that produced a reply.
    handled = (_handle_topic_mcp_rpc_item(session, bot_id, entry) for entry in body)
    replies: List[Dict[str, Any]] = [reply for reply in handled if reply is not None]
    if replies:
        return JSONResponse(content=replies)
    return Response(status_code=204)