from __future__ import annotations import asyncio from aiohttp import web from loguru import logger from typing import Any import json from nanobot.channels.base import BaseChannel from nanobot.bus.events import InboundMessage, OutboundMessage class DashboardChannel(BaseChannel): """ 专门为管理面板设计的渠道。 它充当机器人内部总线 (Bus) 与宿主机面板之间的桥梁。 """ def __init__(self, config: Any, bus: Any, host: str = "0.0.0.0", port: int = 9000): super().__init__(config, bus) self.host = host self.port = port self.runner = None async def start(self) -> None: """启动 Dashboard HTTP 服务""" app = web.Application() app.router.add_post("/chat", self._handle_chat) self.runner = web.AppRunner(app) await self.runner.setup() site = web.TCPSite(self.runner, self.host, self.port) await site.start() self._is_running = True logger.info(f"🚀 Dashboard Channel 代理已上线,监听端口: {self.port}") async def stop(self) -> None: """停止服务""" if self.runner: await self.runner.cleanup() self._is_running = False logger.info("Dashboard Channel 已下线") async def send(self, message: OutboundMessage) -> None: """ 从总线 (Bus) 接收机器人发出的所有消息,并结构化输出到 stdout。 """ media = [str(v).strip().replace("\\", "/") for v in (message.media or []) if str(v).strip()] if not message.content and not media: return # 核心:从元数据识别消息类型(进度更新 vs 最终回复) is_progress = message.metadata.get("_progress", False) is_tool_hint = message.metadata.get("_tool_hint", False) payload = { "type": "BUS_EVENT", "source": "dashboard_channel", "is_progress": is_progress, "is_tool": is_tool_hint, "content": message.content, "media": media, } # 使用 JSON 格式输出,方便面板后端精准解析,告别正则 print(f"\n__DASHBOARD_DATA_START__{json.dumps(payload, ensure_ascii=False)}__DASHBOARD_DATA_END__\n", flush=True) async def _handle_chat(self, request: web.Request) -> web.Response: """处理来自面板的指令入站""" try: data = await request.json() user_message = data.get("message", "").strip() media = [str(v).strip().replace("\\", "/") for v in (data.get("media") or []) if str(v).strip()] if not user_message and not media: return web.json_response({"status": "error", "reason": "empty message and media"}, status=400) if not user_message: user_message = "[attachment message]" # 调试日志:打印收到的原始消息长度和前 20 个字符,确保中文未乱码 logger.info(f"📥 [Dashboard Channel] 收到指令 (len={len(user_message)}): {user_message[:20]}...") # 将消息塞入总线 await self.bus.publish_inbound(InboundMessage( channel="dashboard", sender_id="user", chat_id="direct", content=user_message, media=media, )) return web.json_response({"status": "ok"}) except Exception as e: logger.error(f"❌ Dashboard Channel 接收指令失败: {e}") return web.json_response({"status": "error", "reason": str(e)}, status=500)