dashboard-nanobot/bot-images/dashboard.py

140 lines
5.1 KiB
Python

import json
from types import SimpleNamespace
from typing import Any
from aiohttp import web
from loguru import logger
from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.channels.base import BaseChannel
class DashboardChannel(BaseChannel):
"""
专门为管理面板设计的渠道。
它充当机器人内部总线 (Bus) 与宿主机面板之间的桥梁。
"""
name = "dashboard"
display_name = "Dashboard"
@classmethod
def default_config(cls) -> dict[str, Any]:
return {
"enabled": False,
"host": "0.0.0.0",
"port": 9000,
"allow_from": ["*"],
}
@classmethod
def _coerce_config(cls, config: Any) -> Any:
if config is None:
return SimpleNamespace(**cls.default_config())
if isinstance(config, dict):
merged = cls.default_config()
merged.update(config)
if "allowFrom" in config and "allow_from" not in config:
merged["allow_from"] = config.get("allowFrom")
return SimpleNamespace(**merged)
return config
def __init__(
self,
config: Any,
bus: MessageBus,
host: str | None = None,
port: int | None = None,
):
config_obj = self._coerce_config(config)
super().__init__(config_obj, bus)
self.host = host if host is not None else getattr(config_obj, "host", "0.0.0.0")
self.port = port if port is not None else getattr(config_obj, "port", 9000)
self.runner: web.AppRunner | None = None
async def start(self) -> None:
"""启动 Dashboard HTTP 服务"""
app = web.Application()
app.router.add_post("/chat", self._handle_chat)
self.runner = web.AppRunner(app)
await self.runner.setup()
site = web.TCPSite(self.runner, self.host, self.port)
await site.start()
self._running = True
logger.info(f"🚀 Dashboard Channel 代理已上线,监听端口: {self.port}")
async def stop(self) -> None:
"""停止服务"""
if self.runner:
await self.runner.cleanup()
self.runner = None
self._running = False
logger.info("Dashboard Channel 已下线")
async def send(self, message: OutboundMessage) -> None:
"""
从总线 (Bus) 接收机器人发出的所有消息,并结构化输出到 stdout。
"""
media = [str(v).strip().replace("\\", "/") for v in (message.media or []) if str(v).strip()]
if not message.content and not media:
return
# 核心:从元数据识别消息类型(进度更新 vs 最终回复)
metadata = message.metadata or {}
is_progress = metadata.get("_progress", False)
is_tool_hint = metadata.get("_tool_hint", False)
payload = {
"type": "BUS_EVENT",
"source": "dashboard_channel",
"is_progress": is_progress,
"is_tool": is_tool_hint,
"content": message.content,
"media": media,
}
usage = metadata.get("usage")
if isinstance(usage, dict):
payload["usage"] = usage
request_id = str(metadata.get("request_id") or "").strip()
if request_id:
payload["request_id"] = request_id
provider = str(metadata.get("provider") or "").strip()
if provider:
payload["provider"] = provider
model = str(metadata.get("model") or "").strip()
if model:
payload["model"] = model
# 使用 JSON 格式输出,方便面板后端精准解析,告别正则
print(f"\n__DASHBOARD_DATA_START__{json.dumps(payload, ensure_ascii=False)}__DASHBOARD_DATA_END__\n", flush=True)
async def _handle_chat(self, request: web.Request) -> web.Response:
"""处理来自面板的指令入站"""
try:
data = await request.json()
user_message = data.get("message", "").strip()
media = [str(v).strip().replace("\\", "/") for v in (data.get("media") or []) if str(v).strip()]
if not user_message and not media:
return web.json_response({"status": "error", "reason": "empty message and media"}, status=400)
if not user_message:
user_message = "[attachment message]"
# 调试日志:打印收到的原始消息长度和前 20 个字符,确保中文未乱码
logger.info(f"📥 [Dashboard Channel] 收到指令 (len={len(user_message)}): {user_message[:20]}...")
# 统一走基类入口,兼容不同核心的会话与权限逻辑。
await self._handle_message(
sender_id="user",
chat_id="direct",
content=user_message,
media=media,
)
return web.json_response({"status": "ok"})
except Exception as e:
logger.error(f"❌ Dashboard Channel 接收指令失败: {e}")
return web.json_response({"status": "error", "reason": str(e)}, status=500)