import json
from types import SimpleNamespace
from typing import Any

from aiohttp import web
from loguru import logger

from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.channels.base import BaseChannel


class DashboardChannel(BaseChannel):
    """Channel dedicated to the management dashboard.

    Acts as the bridge between the bot's internal bus and the host-side
    dashboard: inbound commands arrive on an HTTP ``POST /chat`` endpoint,
    and outbound bus messages are printed to stdout as marker-delimited JSON.
    """

    name = "dashboard"
    display_name = "Dashboard"

    @classmethod
    def default_config(cls) -> dict[str, Any]:
        """Return a fresh dict holding the channel's default settings."""
        return {
            "enabled": False,
            "host": "0.0.0.0",
            "port": 9000,
            "allow_from": ["*"],
        }

    @classmethod
    def _coerce_config(cls, config: Any) -> Any:
        """Turn ``None`` or a plain dict into an attribute-style config object.

        Args:
            config: ``None``, a dict of settings, or an already-built config
                object.

        Returns:
            A ``SimpleNamespace`` of the defaults overlaid with ``config`` when
            ``config`` is ``None`` or a dict; otherwise ``config`` unchanged.
        """
        if config is None:
            return SimpleNamespace(**cls.default_config())
        if isinstance(config, dict):
            merged = cls.default_config()
            merged.update(config)
            # Accept the camelCase spelling when the snake_case key is absent.
            if "allowFrom" in config and "allow_from" not in config:
                merged["allow_from"] = config.get("allowFrom")
            return SimpleNamespace(**merged)
        return config

    @staticmethod
    def _normalize_media(items: Any) -> list[str]:
        """Return media entries as stripped, forward-slash strings.

        Blank entries are dropped. A bare string is treated as one entry
        rather than being iterated character by character.
        """
        if not items:
            return []
        if isinstance(items, str):
            items = [items]
        stripped = (str(item).strip() for item in items)
        return [entry.replace("\\", "/") for entry in stripped if entry]

    @staticmethod
    def _error_response(reason: str, status: int) -> web.Response:
        """Build the JSON error body used by the ``/chat`` endpoint."""
        return web.json_response({"status": "error", "reason": reason}, status=status)

    def __init__(
        self,
        config: Any,
        bus: MessageBus,
        host: str | None = None,
        port: int | None = None,
    ):
        """Create the channel.

        Args:
            config: ``None``, a dict, or a config object (see
                ``_coerce_config``).
            bus: Message bus handed to the base channel.
            host: Bind address; overrides the config value when not ``None``.
            port: Bind port; overrides the config value when not ``None``.
        """
        config_obj = self._coerce_config(config)
        super().__init__(config_obj, bus)
        self.host = host if host is not None else getattr(config_obj, "host", "0.0.0.0")
        self.port = port if port is not None else getattr(config_obj, "port", 9000)
        self.runner: web.AppRunner | None = None

    async def start(self) -> None:
        """Start the dashboard HTTP server.

        Raises:
            Exception: Whatever ``TCPSite.start()`` raises (for example when
                the port is already in use); the runner is cleaned up first.
        """
        app = web.Application()
        app.router.add_post("/chat", self._handle_chat)
        self.runner = web.AppRunner(app)
        await self.runner.setup()
        try:
            site = web.TCPSite(self.runner, self.host, self.port)
            await site.start()
        except Exception:
            # Do not leave a set-up runner behind when binding fails.
            await self.runner.cleanup()
            self.runner = None
            raise
        self._running = True
        logger.info(f"🚀 Dashboard Channel 代理已上线,监听端口: {self.port}")

    async def stop(self) -> None:
        """Stop the HTTP server and mark the channel as not running."""
        if self.runner:
            await self.runner.cleanup()
            self.runner = None
        self._running = False
        logger.info("Dashboard Channel 已下线")

    async def send(self, message: OutboundMessage) -> None:
        """Print an outbound bus message to stdout as structured JSON.

        Messages with neither content nor media are ignored.

        Args:
            message: Outbound message; its ``content``, ``media`` and
                ``metadata`` attributes are read.
        """
        media = self._normalize_media(message.media)
        if not message.content and not media:
            return

        # The metadata flags distinguish progress updates from final replies.
        metadata = message.metadata or {}
        payload = {
            "type": "BUS_EVENT",
            "source": "dashboard_channel",
            "is_progress": metadata.get("_progress", False),
            "is_tool": metadata.get("_tool_hint", False),
            "content": message.content,
            "media": media,
        }

        usage = metadata.get("usage")
        if isinstance(usage, dict):
            payload["usage"] = usage

        # Optional string fields are forwarded only when non-blank.
        for key in ("request_id", "provider", "model"):
            value = str(metadata.get(key) or "").strip()
            if value:
                payload[key] = value

        # Emit JSON between fixed markers so the dashboard backend can parse
        # it precisely instead of relying on regular expressions.
        encoded = json.dumps(payload, ensure_ascii=False)
        print(f"\n__DASHBOARD_DATA_START__{encoded}__DASHBOARD_DATA_END__\n", flush=True)

    async def _handle_chat(self, request: web.Request) -> web.Response:
        """Handle an inbound command posted by the dashboard.

        The body must be a JSON object with an optional string ``message`` and
        an optional ``media`` list (a single string is also accepted).

        Returns:
            ``{"status": "ok"}`` on success; ``{"status": "error", "reason":
            ...}`` with status 400 for a malformed request, or 500 when
            handling fails unexpectedly.
        """
        try:
            data = await request.json()
        except ValueError as e:
            # JSONDecodeError and UnicodeDecodeError are both ValueError
            # subclasses: the client sent an unparsable body.
            logger.error(f"❌ Dashboard Channel 接收指令失败: {e}")
            return self._error_response("invalid JSON body", 400)
        except Exception as e:
            logger.error(f"❌ Dashboard Channel 接收指令失败: {e}")
            return self._error_response(str(e), 500)

        try:
            if not isinstance(data, dict):
                return self._error_response("request body must be a JSON object", 400)

            raw_message = data.get("message")
            if raw_message is None:
                raw_message = ""
            if not isinstance(raw_message, str):
                return self._error_response("message must be a string", 400)
            user_message = raw_message.strip()

            raw_media = data.get("media")
            if raw_media and not isinstance(raw_media, (str, list, tuple)):
                return self._error_response("media must be a list of strings", 400)
            media = self._normalize_media(raw_media)

            if not user_message and not media:
                return self._error_response("empty message and media", 400)
            if not user_message:
                user_message = "[attachment message]"

            # Debug log: length plus the first 20 characters, to confirm that
            # non-ASCII text arrived without mojibake.
            logger.info(f"📥 [Dashboard Channel] 收到指令 (len={len(user_message)}): {user_message[:20]}...")

            # Go through the base-class entry point so session and permission
            # handling stay compatible across cores.
            await self._handle_message(
                sender_id="user",
                chat_id="direct",
                content=user_message,
                media=media,
            )
            return web.json_response({"status": "ok"})
        except Exception as e:
            logger.error(f"❌ Dashboard Channel 接收指令失败: {e}")
            return self._error_response(str(e), 500)