dashboard-nanobot/backend/core/docker_manager.py

398 lines
16 KiB
Python

import os
import re
import threading
import time
import codecs
from typing import Callable, Dict, List, Optional
import json
import docker
class BotDockerManager:
def __init__(self, host_data_root: str, base_image: str = "nanobot-base:v0.1.4"):
try:
self.client = docker.from_env(timeout=6)
self.client.version()
print("✅ Docker 引擎连接成功")
except Exception as e:
self.client = None
print(f"⚠️ 警告: 无法连接到 Docker 引擎。请确保 Docker Desktop 已启动。错误: {e}")
self.host_data_root = host_data_root
self.base_image = base_image
self.active_monitors = {}
def has_image(self, tag: str) -> bool:
if not self.client:
return False
try:
self.client.images.get(tag)
return True
except docker.errors.ImageNotFound:
return False
except Exception:
return False
def list_images_by_repo(self, repository: str = "nanobot-base") -> List[Dict[str, str]]:
"""List docker images by repository prefix, returning normalized tag/id pairs."""
if not self.client:
return []
rows: List[Dict[str, str]] = []
try:
images = self.client.images.list(name=repository)
for image in images:
for tag in image.tags:
repo, _, version = tag.partition(":")
if repo != repository or not version:
continue
rows.append(
{
"tag": tag,
"version": version.removeprefix("v"),
"image_id": image.id,
}
)
except Exception as e:
print(f"[DockerManager] list_images_by_repo failed: {e}")
return rows
def start_bot(
self,
bot_id: str,
image_tag: Optional[str] = None,
env_vars: Optional[Dict[str, str]] = None,
on_state_change: Optional[Callable[[str, dict], None]] = None,
) -> bool:
if not self.client:
print("❌ 错误: Docker 客户端未初始化,无法启动机器人。")
return False
image = image_tag or self.base_image
if not self.has_image(image):
print(f"❌ 错误: 镜像不存在: {image}")
return False
bot_workspace = os.path.join(self.host_data_root, bot_id, ".nanobot")
container_name = f"worker_{bot_id}"
os.makedirs(bot_workspace, exist_ok=True)
try:
try:
container = self.client.containers.get(container_name)
container.reload()
if container.status == "running":
if on_state_change:
self.ensure_monitor(bot_id, on_state_change)
return True
container.remove(force=True)
except docker.errors.NotFound:
pass
container = self.client.containers.run(
image=image,
name=container_name,
detach=True,
stdin_open=True,
tty=True,
environment=env_vars or {},
volumes={
bot_workspace: {"bind": "/root/.nanobot", "mode": "rw"},
},
mem_limit="1g",
cpu_quota=100000,
network_mode="bridge",
)
if on_state_change:
monitor_thread = threading.Thread(
target=self._monitor_container_logs,
args=(bot_id, container, on_state_change),
daemon=True,
)
monitor_thread.start()
self.active_monitors[bot_id] = monitor_thread
return True
except Exception as e:
print(f"[DockerManager] Error starting bot {bot_id}: {e}")
return False
def ensure_monitor(self, bot_id: str, on_state_change: Callable[[str, dict], None]) -> bool:
"""Ensure an active log monitor exists for a running bot container."""
if not self.client:
return False
existing = self.active_monitors.get(bot_id)
if existing and existing.is_alive():
return True
try:
container = self.client.containers.get(f"worker_{bot_id}")
container.reload()
if container.status != "running":
return False
monitor_thread = threading.Thread(
target=self._monitor_container_logs,
args=(bot_id, container, on_state_change),
daemon=True,
)
monitor_thread.start()
self.active_monitors[bot_id] = monitor_thread
return True
except docker.errors.NotFound:
return False
except Exception as e:
print(f"[DockerManager] Error ensuring monitor for {bot_id}: {e}")
return False
def stop_bot(self, bot_id: str) -> bool:
if not self.client:
return False
container_name = f"worker_{bot_id}"
try:
container = self.client.containers.get(container_name)
container.stop(timeout=5)
container.remove()
self.active_monitors.pop(bot_id, None)
return True
except docker.errors.NotFound:
self.active_monitors.pop(bot_id, None)
return False
except Exception as e:
print(f"[DockerManager] Error stopping bot {bot_id}: {e}")
return False
def send_command(self, bot_id: str, command: str, media: Optional[List[str]] = None) -> bool:
"""Send a command to dashboard channel with robust container-local delivery."""
if not self.client:
return False
media_paths = [str(v).strip().replace("\\", "/") for v in (media or []) if str(v).strip()]
# Primary path on Docker Desktop/Mac: execute curl inside container namespace.
for attempt in range(3):
if self._send_command_via_exec(bot_id, command, media_paths):
return True
time.sleep(0.25 * (attempt + 1))
# Secondary path for environments where host can reach container IP.
return self._send_command_via_host_http(bot_id, command, media_paths)
def _send_command_via_exec(self, bot_id: str, command: str, media: Optional[List[str]] = None) -> bool:
try:
container = self.client.containers.get(f"worker_{bot_id}")
payload_json = json.dumps({"message": command, "media": media or []}, ensure_ascii=False)
cmd = (
"curl -sS --fail --max-time 6 "
"-X POST -H 'Content-Type: application/json' "
"-d \"$DASHBOARD_PAYLOAD\" http://127.0.0.1:9000/chat"
)
result = container.exec_run(["/bin/sh", "-c", cmd], environment={"DASHBOARD_PAYLOAD": payload_json})
output = result.output.decode("utf-8", errors="ignore") if isinstance(result.output, (bytes, bytearray)) else str(result.output)
if result.exit_code != 0:
print(f"[DockerManager] exec curl failed for {bot_id}: exit={result.exit_code}, out={output[:300]}")
return False
if output.strip():
try:
parsed = json.loads(output)
if str(parsed.get("status", "")).lower() != "ok":
print(f"[DockerManager] exec curl non-ok response for {bot_id}: {output[:300]}")
return False
except Exception:
# Non-JSON but zero exit still treated as success.
pass
return True
except Exception as e:
print(f"[DockerManager] exec curl exception for {bot_id}: {e}")
return False
def _send_command_via_host_http(self, bot_id: str, command: str, media: Optional[List[str]] = None) -> bool:
try:
import httpx
container_name = f"worker_{bot_id}"
payload = {"message": command, "media": media or []}
container = self.client.containers.get(container_name)
ip_address = container.attrs["NetworkSettings"]["IPAddress"] or "127.0.0.1"
target_url = f"http://{ip_address}:9000/chat"
with httpx.Client(timeout=4.0) as client:
resp = client.post(target_url, json=payload)
if resp.status_code == 200:
return True
print(f"[DockerManager] host HTTP failed: {resp.status_code} - {resp.text[:300]}")
return False
except Exception as e:
print(f"[DockerManager] host HTTP exception: {e}")
return False
def get_recent_logs(self, bot_id: str, tail: int = 300) -> List[str]:
if not self.client:
return []
container_name = f"worker_{bot_id}"
try:
container = self.client.containers.get(container_name)
raw = container.logs(tail=max(1, int(tail)))
text = raw.decode("utf-8", errors="ignore")
return [line for line in text.splitlines() if line.strip()]
except Exception as e:
print(f"[DockerManager] Error reading logs for {bot_id}: {e}")
return []
def _monitor_container_logs(self, bot_id: str, container, callback: Callable[[str, dict], None]):
try:
buffer = ""
dashboard_capture: Optional[str] = None
decoder = codecs.getincrementaldecoder("utf-8")("replace")
# Only tail new logs from "now" to avoid replaying historical stdout
# (which would repopulate cleared chat messages from old dashboard packets).
since_ts = int(time.time())
for chunk in container.logs(stream=True, follow=True, since=since_ts):
if isinstance(chunk, bytes):
text = decoder.decode(chunk)
else:
text = str(chunk)
if not text:
continue
buffer += text
while "\n" in buffer:
line, buffer = buffer.split("\n", 1)
normalized = line.strip("\r").strip()
if not normalized:
continue
if dashboard_capture is not None:
dashboard_capture = f"{dashboard_capture}\n{normalized}"
if "__DASHBOARD_DATA_END__" in dashboard_capture:
state_packet = self._parse_dashboard_packet(dashboard_capture)
if state_packet:
callback(bot_id, state_packet)
dashboard_capture = None
callback(bot_id, {"type": "RAW_LOG", "text": normalized})
continue
if "__DASHBOARD_DATA_START__" in normalized and "__DASHBOARD_DATA_END__" not in normalized:
dashboard_capture = normalized
callback(bot_id, {"type": "RAW_LOG", "text": normalized})
continue
state_packet = self._parse_log_line(normalized)
if state_packet:
callback(bot_id, state_packet)
callback(bot_id, {"type": "RAW_LOG", "text": normalized})
rest = decoder.decode(b"", final=True)
if rest:
buffer += rest
tail = buffer.strip()
if tail:
state_packet = self._parse_log_line(tail)
if state_packet:
callback(bot_id, state_packet)
callback(bot_id, {"type": "RAW_LOG", "text": tail})
except Exception as e:
print(f"[DockerManager] Log stream closed for {bot_id}: {e}")
def _parse_dashboard_packet(self, line: str):
if "__DASHBOARD_DATA_START__" not in line or "__DASHBOARD_DATA_END__" not in line:
return None
try:
raw_json = line.split("__DASHBOARD_DATA_START__", 1)[1].split("__DASHBOARD_DATA_END__", 1)[0].strip()
data = json.loads(raw_json)
event_type = str(data.get("type", "")).upper()
content = str(data.get("content") or data.get("text") or "").strip()
media = [str(v).strip().replace("\\", "/") for v in (data.get("media") or []) if str(v).strip()]
is_progress = bool(data.get("is_progress", False))
is_tool = bool(data.get("is_tool", False))
if event_type == "AGENT_STATE":
payload = data.get("payload") or {}
state = str(payload.get("state") or data.get("state") or ("TOOL_CALL" if is_tool else "THINKING"))
action_msg = str(payload.get("action_msg") or payload.get("msg") or content)
return {
"type": "AGENT_STATE",
"channel": "dashboard",
"payload": {"state": state, "action_msg": action_msg},
}
if event_type == "ASSISTANT_MESSAGE":
if content or media:
return {"type": "ASSISTANT_MESSAGE", "channel": "dashboard", "text": content, "media": media}
return None
if event_type == "BUS_EVENT" or is_progress:
return {
"type": "BUS_EVENT",
"channel": "dashboard",
"content": content,
"media": media,
"is_progress": is_progress,
"is_tool": is_tool,
}
if content or media:
return {
"type": "ASSISTANT_MESSAGE",
"channel": "dashboard",
"text": content,
"media": media,
}
except Exception:
return None
return None
def _parse_log_line(self, line: str):
# 1. 结构化数据解析(首选,直接从机器人总线获取)
if "__DASHBOARD_DATA_START__" in line:
packet = self._parse_dashboard_packet(line)
if packet:
return packet
# 2. 解析全渠道运行态日志(用于右侧状态面板)
process_match = re.search(r"Processing message from ([\w\-]+):[^:]+:\s*(.+)$", line)
if process_match:
channel = process_match.group(1).strip().lower()
action_msg = process_match.group(2).strip()
return {
"type": "AGENT_STATE",
"channel": channel,
"payload": {
"state": "THINKING",
"action_msg": action_msg[:4000],
},
}
response_match = re.search(r"Response to ([\w\-]+):[^:]+:\s*(.+)$", line)
if response_match:
channel = response_match.group(1).strip().lower()
action_msg = response_match.group(2).strip()
return {
"type": "AGENT_STATE",
"channel": channel,
"payload": {
"state": "SUCCESS",
"action_msg": action_msg[:4000],
},
}
# 3. 备选方案:常规日志解析
lower = line.lower()
tool_call_match = re.search(r"tool call:\s*(.+)$", line, re.IGNORECASE)
if tool_call_match:
return {
"type": "AGENT_STATE",
"payload": {
"state": "TOOL_CALL",
"action_msg": tool_call_match.group(1).strip()[:4000],
},
}
if "error" in lower or "traceback" in lower:
return {
"type": "AGENT_STATE",
"payload": {"state": "ERROR", "action_msg": "执行异常,请检查日志"},
}
return None