# File: dashboard-nanobot/backend/core/docker_manager.py
# (734 lines, 30 KiB, Python)

import os
import re
import threading
import time
import codecs
import base64
from typing import Any, Callable, Dict, List, Optional, Tuple
import json
import docker
class BotDockerManager:
    """Manage per-bot Docker worker containers for the dashboard backend.

    Bot ``<bot_id>`` runs in a container named ``worker_<bot_id>`` whose
    ``/root/.nanobot`` directory is bind-mounted read-write from
    ``<host_data_root>/<bot_id>/.nanobot``. The manager starts/stops those
    containers, forwards chat commands to ``http://127.0.0.1:9000/chat``
    inside them, reports status and resource usage, and turns container log
    output into state packets delivered to a caller-supplied callback.

    When the Docker engine was unreachable at construction, ``self.client``
    is ``None`` and the public methods report failure (``False``, ``[]``,
    ``"STOPPED"`` or a default snapshot) instead of raising.
    """

    # Markers the bot prints around each structured dashboard packet.
    _PACKET_START = "__DASHBOARD_DATA_START__"
    _PACKET_END = "__DASHBOARD_DATA_END__"

    # Patterns used by ``_parse_log_line``; compiled once since they run per log line.
    _PROCESS_MESSAGE_RE = re.compile(r"Processing message from ([\w\-]+):[^:]+:\s*(.+)$")
    _RESPONSE_RE = re.compile(r"Response to ([\w\-]+):[^:]+:\s*(.+)$")
    _TOOL_CALL_RE = re.compile(r"tool call:\s*(.+)$", re.IGNORECASE)

    def __init__(self, host_data_root: str, base_image: str = "nanobot-base:v0.1.4"):
        """Connect to the local Docker engine.

        Args:
            host_data_root: Host directory under which per-bot workspaces live.
            base_image: Image tag used when ``start_bot`` receives no tag.

        A failed connection is not fatal: ``self.client`` is set to ``None``.
        """
        try:
            self.client = docker.from_env(timeout=6)
            # Round-trip to the daemon so an unreachable engine is detected now.
            self.client.version()
            print("✅ Docker 引擎连接成功")
        except Exception as e:
            self.client = None
            print(f"⚠️ 警告: 无法连接到 Docker 引擎。请确保 Docker Desktop 已启动。错误: {e}")
        self.host_data_root = host_data_root
        self.base_image = base_image
        # bot_id -> thread tailing that bot's container logs.
        self.active_monitors: Dict[str, threading.Thread] = {}
        # bot_id -> reason of the most recent failed command delivery.
        self._last_delivery_error: Dict[str, str] = {}

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _container_name(bot_id: str) -> str:
        """Return the Docker container name used for ``bot_id``."""
        return f"worker_{bot_id}"

    @staticmethod
    def _normalize_docker_status(raw: Any) -> str:
        """Collapse a raw Docker status into ``"RUNNING"`` or ``"STOPPED"``.

        ``running`` and ``restarting`` count as RUNNING; anything else
        (including ``None``) is STOPPED.
        """
        return "RUNNING" if str(raw or "").strip().lower() in {"running", "restarting"} else "STOPPED"

    @staticmethod
    def _normalize_media_paths(raw: Any) -> List[str]:
        """Return non-empty media entries as stripped strings with ``/`` separators.

        A bare string is one path (iterating it would split it into
        characters). A non-iterable value yields an empty list.
        """
        if isinstance(raw, str):
            items: List[Any] = [raw]
        else:
            try:
                items = list(raw or [])
            except TypeError:
                return []
        return [str(v).strip().replace("\\", "/") for v in items if str(v).strip()]

    @staticmethod
    def _normalize_resource_limits(
        cpu_cores: Optional[float],
        memory_mb: Optional[int],
        storage_gb: Optional[int],
    ) -> Tuple[float, int, int]:
        """Coerce user-supplied resource limits into safe values.

        ``None``, unparsable or negative inputs fall back to the defaults
        (1 CPU, 1024 MiB memory, 10 GiB storage). ``0`` is preserved and means
        "do not apply this limit" (see ``start_bot``). Other values are clamped
        to CPU 0.1-16, memory 256-65536 MiB and storage 1-1024 GiB.

        Returns:
            ``(cpu_cores, memory_mb, storage_gb)``.
        """
        try:
            cpu = float(cpu_cores) if cpu_cores is not None else 1.0
        except Exception:
            cpu = 1.0
        try:
            memory = int(memory_mb) if memory_mb is not None else 1024
        except Exception:
            memory = 1024
        try:
            storage = int(storage_gb) if storage_gb is not None else 10
        except Exception:
            storage = 10
        if cpu < 0:
            cpu = 1.0
        if memory < 0:
            memory = 1024
        if storage < 0:
            storage = 10
        cpu = 0.0 if cpu == 0 else min(16.0, max(0.1, cpu))
        memory = 0 if memory == 0 else min(65536, max(256, memory))
        storage = 0 if storage == 0 else min(1024, max(1, storage))
        return cpu, memory, storage

    # ------------------------------------------------------------------ images

    def has_image(self, tag: str) -> bool:
        """Return True if image ``tag`` exists locally; False if missing or on any error."""
        if not self.client:
            return False
        try:
            self.client.images.get(tag)
            return True
        except Exception:
            # ImageNotFound and engine errors are both reported as "not available".
            return False

    def list_images_by_repo(self, repository: str = "nanobot-base") -> List[Dict[str, str]]:
        """List local image tags whose repository equals ``repository``.

        Each ``<repository>:<version>`` tag yields
        ``{"tag": ..., "version": ..., "image_id": ...}`` with a leading ``v``
        stripped from the version. Returns ``[]`` when Docker is unavailable;
        a listing error is logged and the rows gathered so far are returned.
        """
        if not self.client:
            return []
        rows: List[Dict[str, str]] = []
        try:
            for image in self.client.images.list(name=repository):
                for tag in image.tags:
                    # Split on the LAST colon: a registry host may carry a port
                    # ("registry:5000/nanobot-base:v1"), the tag part never does.
                    repo, _, version = tag.rpartition(":")
                    if repo != repository or not version:
                        continue
                    rows.append(
                        {
                            "tag": tag,
                            "version": version.removeprefix("v"),
                            "image_id": image.id,
                        }
                    )
        except Exception as e:
            print(f"[DockerManager] list_images_by_repo failed: {e}")
        return rows

    # ------------------------------------------------------------- lifecycle

    def start_bot(
        self,
        bot_id: str,
        image_tag: Optional[str] = None,
        env_vars: Optional[Dict[str, str]] = None,
        cpu_cores: Optional[float] = None,
        memory_mb: Optional[int] = None,
        storage_gb: Optional[int] = None,
        on_state_change: Optional[Callable[[str, dict], None]] = None,
    ) -> bool:
        """Start the worker container for ``bot_id`` (or reuse a running one).

        A container that is already running is reused unchanged: new env vars
        and limits are NOT applied to it. A stopped container is removed and
        recreated.

        Args:
            bot_id: Bot identifier; selects workspace and container name.
            image_tag: Image to run; defaults to ``self.base_image``.
            env_vars: Environment for the container.
            cpu_cores, memory_mb, storage_gb: Limits, normalized by
                ``_normalize_resource_limits`` (0 = unlimited).
            on_state_change: Optional ``callback(bot_id, packet)`` fed from the
                container's log stream.

        Returns:
            True if the container was running or was (re)created and started;
            False on any failure (reason printed).
        """
        if not self.client:
            print("❌ 错误: Docker 客户端未初始化,无法启动机器人。")
            return False
        image = image_tag or self.base_image
        if not self.has_image(image):
            print(f"❌ 错误: 镜像不存在: {image}")
            return False
        bot_workspace = os.path.join(self.host_data_root, bot_id, ".nanobot")
        container_name = self._container_name(bot_id)
        os.makedirs(bot_workspace, exist_ok=True)
        cpu, memory, storage = self._normalize_resource_limits(cpu_cores, memory_mb, storage_gb)
        base_kwargs: Dict[str, Any] = {
            "image": image,
            "name": container_name,
            "detach": True,
            "stdin_open": True,
            "tty": True,
            "environment": env_vars or {},
            "volumes": {
                bot_workspace: {"bind": "/root/.nanobot", "mode": "rw"},
            },
            "network_mode": "bridge",
        }
        if memory > 0:
            base_kwargs["mem_limit"] = f"{memory}m"
        if cpu > 0:
            base_kwargs["nano_cpus"] = int(cpu * 1_000_000_000)
        try:
            try:
                existing = self.client.containers.get(container_name)
                existing.reload()
                if existing.status == "running":
                    if on_state_change:
                        self.ensure_monitor(bot_id, on_state_change)
                    return True
                existing.remove(force=True)
            except docker.errors.NotFound:
                pass
            container = self._run_container(bot_id, base_kwargs, storage)
            if on_state_change:
                self._spawn_monitor(bot_id, container, on_state_change)
            return True
        except Exception as e:
            print(f"[DockerManager] Error starting bot {bot_id}: {e}")
            return False

    def _run_container(self, bot_id: str, base_kwargs: Dict[str, Any], storage_gb: int):
        """Create and start the container, applying a storage quota when the engine allows it."""
        if storage_gb <= 0:
            return self.client.containers.run(**base_kwargs)
        try:
            return self.client.containers.run(
                storage_opt={"size": f"{storage_gb}G"},
                **base_kwargs,
            )
        except Exception as e:
            # Some Docker engines (e.g. Desktop/overlay2) may not support size storage option.
            print(f"[DockerManager] storage limit not applied for {bot_id}: {e}")
            # ``run`` is create + start; if only the start failed, a container
            # now holds the name and the retry would hit a name conflict.
            self._remove_container_quietly(base_kwargs["name"])
            return self.client.containers.run(**base_kwargs)

    def _remove_container_quietly(self, container_name: str) -> None:
        """Best-effort force-removal of ``container_name``; logs but never raises."""
        try:
            self.client.containers.get(container_name).remove(force=True)
        except docker.errors.NotFound:
            pass
        except Exception as e:
            print(f"[DockerManager] cleanup of {container_name} failed: {e}")

    def _spawn_monitor(self, bot_id: str, container, on_state_change: Callable[[str, dict], None]) -> None:
        """Start a daemon thread tailing ``container`` logs and remember it for ``bot_id``."""
        monitor_thread = threading.Thread(
            target=self._monitor_container_logs,
            args=(bot_id, container, on_state_change),
            daemon=True,
        )
        monitor_thread.start()
        self.active_monitors[bot_id] = monitor_thread

    def ensure_monitor(self, bot_id: str, on_state_change: Callable[[str, dict], None]) -> bool:
        """Ensure an active log monitor exists for a running bot container.

        Returns True if a live monitor already existed or one was started;
        False when Docker is unavailable, the container is missing or not
        running, or an error occurred.
        """
        if not self.client:
            return False
        existing = self.active_monitors.get(bot_id)
        if existing and existing.is_alive():
            return True
        try:
            container = self.client.containers.get(self._container_name(bot_id))
            container.reload()
            if container.status != "running":
                return False
            self._spawn_monitor(bot_id, container, on_state_change)
            return True
        except docker.errors.NotFound:
            return False
        except Exception as e:
            print(f"[DockerManager] Error ensuring monitor for {bot_id}: {e}")
            return False

    def stop_bot(self, bot_id: str) -> bool:
        """Stop (5 s timeout) and remove the bot's container.

        Returns True when it was stopped and removed; False when it does not
        exist, Docker is unavailable, or an error occurred. The monitor entry
        is dropped on success and when the container is already gone.
        """
        if not self.client:
            return False
        try:
            container = self.client.containers.get(self._container_name(bot_id))
            container.stop(timeout=5)
            container.remove()
            self.active_monitors.pop(bot_id, None)
            return True
        except docker.errors.NotFound:
            self.active_monitors.pop(bot_id, None)
            return False
        except Exception as e:
            print(f"[DockerManager] Error stopping bot {bot_id}: {e}")
            return False

    # -------------------------------------------------------------- commands

    def send_command(self, bot_id: str, command: str, media: Optional[List[str]] = None) -> bool:
        """Send a command to dashboard channel with robust container-local delivery.

        Tries in-container delivery up to three times (with a short back-off
        between attempts), then a direct HTTP POST from the host.

        Returns:
            True on the first successful delivery. False otherwise; the reason
            is then available from ``get_last_delivery_error``.
        """
        if not self.client:
            self._last_delivery_error[bot_id] = "Docker client is not available"
            return False
        media_paths = self._normalize_media_paths(media)
        self._last_delivery_error.pop(bot_id, None)
        # Primary path on Docker Desktop/Mac: execute curl inside container namespace.
        attempts = 3
        for attempt in range(attempts):
            if self._send_command_via_exec(bot_id, command, media_paths):
                self._last_delivery_error.pop(bot_id, None)
                return True
            if attempt < attempts - 1:
                time.sleep(0.25 * (attempt + 1))
        # Secondary path for environments where host can reach container IP.
        if self._send_command_via_host_http(bot_id, command, media_paths):
            self._last_delivery_error.pop(bot_id, None)
            return True
        if bot_id not in self._last_delivery_error:
            self._last_delivery_error[bot_id] = "Unknown delivery failure"
        return False

    def get_last_delivery_error(self, bot_id: str) -> str:
        """Return the last recorded delivery failure for ``bot_id`` ('' if none)."""
        return str(self._last_delivery_error.get(bot_id, "") or "").strip()

    @staticmethod
    def _exec_output_text(raw: Any) -> str:
        """Decode ``exec_run`` output to text, dropping invalid UTF-8 bytes."""
        if isinstance(raw, (bytes, bytearray)):
            return raw.decode("utf-8", errors="ignore")
        return str(raw)

    @staticmethod
    def _is_non_ok_response(output: str) -> bool:
        """Return True when ``output`` is a JSON object whose ``status`` is not ``ok``.

        Empty output, non-JSON output and JSON that is not an object are all
        accepted (a zero exit code is treated as success).
        """
        if not output.strip():
            return False
        try:
            parsed = json.loads(output)
        except Exception:
            return False
        if not isinstance(parsed, dict):
            return False
        return str(parsed.get("status", "")).lower() != "ok"

    def _send_command_via_exec(self, bot_id: str, command: str, media: Optional[List[str]] = None) -> bool:
        """Deliver a chat payload by running an HTTP client inside the container.

        Tries ``curl`` first. If curl exits non-zero (e.g. it is not installed),
        it falls back to the container's Python. Failure reasons are recorded in
        ``_last_delivery_error``.

        NOTE(review): a non-zero curl exit does not prove the server did not
        receive the request (e.g. a timeout after acceptance). The fallback
        and the caller's retries may therefore deliver a message twice.
        """
        try:
            container = self.client.containers.get(self._container_name(bot_id))
            container.reload()
            if container.status != "running":
                self._last_delivery_error[bot_id] = f"Container status is {container.status}"
                return False
            payload_json = json.dumps({"message": command, "media": media or []}, ensure_ascii=False)
            # Try direct curl first (no shell dependency).
            result = container.exec_run(
                [
                    "curl",
                    "-sS",
                    "--fail",
                    "--max-time",
                    "6",
                    "-X",
                    "POST",
                    "-H",
                    "Content-Type: application/json",
                    "-d",
                    payload_json,
                    "http://127.0.0.1:9000/chat",
                ]
            )
            output = self._exec_output_text(result.output)
            if result.exit_code != 0:
                reason = f"exec curl failed: exit={result.exit_code}, out={output[:300]}"
                print(f"[DockerManager] {reason}")
                self._last_delivery_error[bot_id] = reason
                # Fallback inside container without curl/shell.
                return self._send_command_via_exec_python(bot_id, container, payload_json)
            if self._is_non_ok_response(output):
                reason = f"exec curl non-ok response: {output[:300]}"
                print(f"[DockerManager] {reason}")
                self._last_delivery_error[bot_id] = reason
                return False
            # Empty or non-JSON output with a zero exit still counts as success.
            return True
        except Exception as e:
            reason = f"exec curl exception: {e}"
            print(f"[DockerManager] {reason}")
            self._last_delivery_error[bot_id] = reason
            return False

    def _send_command_via_exec_python(self, bot_id: str, container, payload_json: str) -> bool:
        """POST ``payload_json`` using the container's own Python (``python3``, then ``python``).

        The payload travels base64-encoded in an environment variable, so no
        shell quoting is involved. Exceptions propagate to the caller.
        """
        payload_b64 = base64.b64encode(payload_json.encode("utf-8")).decode("ascii")
        py_script = (
            "import base64,json,os,urllib.request\n"
            "payload=json.loads(base64.b64decode(os.environ['DASHBOARD_PAYLOAD_B64']).decode('utf-8'))\n"
            "req=urllib.request.Request('http://127.0.0.1:9000/chat',"
            "data=json.dumps(payload,ensure_ascii=False).encode('utf-8'),"
            "headers={'Content-Type':'application/json'})\n"
            "with urllib.request.urlopen(req, timeout=8) as resp:\n"
            " print(resp.read().decode('utf-8','ignore'))\n"
        )
        for py_bin in ("python3", "python"):
            py_result = container.exec_run(
                [py_bin, "-c", py_script],
                environment={"DASHBOARD_PAYLOAD_B64": payload_b64},
            )
            py_output = self._exec_output_text(py_result.output)
            if py_result.exit_code != 0:
                py_reason = f"exec {py_bin} fallback failed: exit={py_result.exit_code}, out={py_output[:300]}"
                print(f"[DockerManager] {py_reason}")
                self._last_delivery_error[bot_id] = py_reason
                continue
            if self._is_non_ok_response(py_output):
                py_reason = f"exec {py_bin} fallback non-ok response: {py_output[:300]}"
                print(f"[DockerManager] {py_reason}")
                self._last_delivery_error[bot_id] = py_reason
                continue
            return True
        return False

    @staticmethod
    def _resolve_container_ip(attrs: Dict[str, Any]) -> str:
        """Pick an address for reaching the container from the host.

        Prefers the top-level ``NetworkSettings.IPAddress``. It is typically
        empty when the container is only on user-defined networks. Falls back
        to the first per-network address, then to ``127.0.0.1``.
        """
        settings = attrs.get("NetworkSettings") or {}
        ip_address = str(settings.get("IPAddress") or "").strip()
        if ip_address:
            return ip_address
        for network in (settings.get("Networks") or {}).values():
            if isinstance(network, dict):
                ip_address = str(network.get("IPAddress") or "").strip()
                if ip_address:
                    return ip_address
        return "127.0.0.1"

    def _send_command_via_host_http(self, bot_id: str, command: str, media: Optional[List[str]] = None) -> bool:
        """POST the chat payload from the host to the container's port 9000.

        Only HTTP 200 counts as success. Failures are recorded in
        ``_last_delivery_error``.
        """
        try:
            import httpx
            payload = {"message": command, "media": media or []}
            container = self.client.containers.get(self._container_name(bot_id))
            ip_address = self._resolve_container_ip(container.attrs or {})
            target_url = f"http://{ip_address}:9000/chat"
            with httpx.Client(timeout=4.0) as client:
                resp = client.post(target_url, json=payload)
                if resp.status_code == 200:
                    return True
                reason = f"host HTTP failed: {resp.status_code} - {resp.text[:300]}"
                print(f"[DockerManager] {reason}")
                self._last_delivery_error[bot_id] = reason
                return False
        except Exception as e:
            reason = f"host HTTP exception: {e}"
            print(f"[DockerManager] {reason}")
            self._last_delivery_error[bot_id] = reason
            return False

    # ---------------------------------------------------------------- status

    def get_bot_status(self, bot_id: str) -> str:
        """Return normalized runtime status from Docker: RUNNING or STOPPED."""
        if not self.client:
            return "STOPPED"
        try:
            container = self.client.containers.get(self._container_name(bot_id))
            container.reload()
            return self._normalize_docker_status(container.status)
        except Exception:
            # Missing container (NotFound) and engine errors both read as STOPPED.
            return "STOPPED"

    @staticmethod
    def _parse_size_to_bytes(raw: Any) -> Optional[int]:
        """Parse a byte count such as ``"1073741824"``, ``"10G"`` or ``"1.5GiB"``.

        Unit letters k/m/g/t/p are 1024-based multiples, optionally followed
        by ``b`` or ``ib``. Returns ``None`` for empty or unparsable input.
        """
        if raw is None:
            return None
        text = str(raw).strip()
        if not text:
            return None
        try:
            return int(float(text))
        except (ValueError, OverflowError):
            pass
        match = re.fullmatch(r"([0-9]+(?:\.[0-9]+)?)\s*([kmgtp]?)(i?b)?", text.lower())
        if not match:
            return None
        number = float(match.group(1))
        unit = (match.group(2) or "").lower()
        scale = {
            "": 1,
            "k": 1024,
            "m": 1024 ** 2,
            "g": 1024 ** 3,
            "t": 1024 ** 4,
            "p": 1024 ** 5,
        }.get(unit, 1)
        return int(number * scale)

    @staticmethod
    def _calc_cpu_percent(stats: Dict[str, Any]) -> float:
        """Compute CPU usage percent from a Docker stats sample.

        Uses (container CPU delta / system CPU delta) * online CPUs * 100, so
        100.0 equals one fully used core. Returns 0.0 when a delta is not
        positive or the sample is malformed.
        """
        try:
            cpu_stats = stats.get("cpu_stats") or {}
            precpu_stats = stats.get("precpu_stats") or {}
            cpu_total = float((cpu_stats.get("cpu_usage") or {}).get("total_usage") or 0)
            prev_cpu_total = float((precpu_stats.get("cpu_usage") or {}).get("total_usage") or 0)
            cpu_delta = cpu_total - prev_cpu_total
            system_total = float(cpu_stats.get("system_cpu_usage") or 0)
            prev_system_total = float(precpu_stats.get("system_cpu_usage") or 0)
            system_delta = system_total - prev_system_total
            online_cpus = int(
                cpu_stats.get("online_cpus")
                or len((cpu_stats.get("cpu_usage") or {}).get("percpu_usage") or [])
                or 1
            )
            if cpu_delta <= 0 or system_delta <= 0:
                return 0.0
            return max(0.0, (cpu_delta / system_delta) * online_cpus * 100.0)
        except Exception:
            return 0.0

    @staticmethod
    def _calc_memory_usage(memory_stats: Dict[str, Any]) -> int:
        """Return memory usage excluding reclaimable page cache, like ``docker stats``.

        Subtracts ``total_inactive_file`` when present, otherwise
        ``inactive_file``. The subtraction only happens when that value is
        smaller than the raw usage, so usage never collapses to 0.
        """
        usage = int(memory_stats.get("usage") or 0)
        detail = memory_stats.get("stats") or {}
        if "total_inactive_file" in detail:
            total_cache = int(detail.get("total_inactive_file") or 0)
            if total_cache < usage:
                return usage - total_cache
        cache = int(detail.get("inactive_file") or 0)
        if cache < usage:
            return usage - cache
        return usage

    @classmethod
    def _usage_from_stats(cls, stats: Dict[str, Any]) -> Dict[str, Any]:
        """Derive CPU, memory, network, block-IO and pid usage from one stats sample."""
        memory_stats = stats.get("memory_stats") or {}
        memory_usage = cls._calc_memory_usage(memory_stats)
        memory_limit = int(memory_stats.get("limit") or 0)
        rx_total = 0
        tx_total = 0
        for row in (stats.get("networks") or {}).values():
            if isinstance(row, dict):
                rx_total += int(row.get("rx_bytes") or 0)
                tx_total += int(row.get("tx_bytes") or 0)
        blk_read = 0
        blk_write = 0
        for row in (stats.get("blkio_stats") or {}).get("io_service_bytes_recursive") or []:
            if not isinstance(row, dict):
                continue
            # cgroup backends differ in op casing ("Read" vs "read").
            op = str(row.get("op") or "").upper()
            value = int(row.get("value") or 0)
            if op == "READ":
                blk_read += value
            elif op == "WRITE":
                blk_write += value
        memory_percent = (memory_usage / memory_limit) * 100.0 if memory_limit > 0 else 0.0
        return {
            "cpu_percent": cls._calc_cpu_percent(stats),
            "memory_bytes": memory_usage,
            "memory_limit_bytes": memory_limit,
            "memory_percent": max(0.0, memory_percent),
            "network_rx_bytes": rx_total,
            "network_tx_bytes": tx_total,
            "blk_read_bytes": blk_read,
            "blk_write_bytes": blk_write,
            "pids": int((stats.get("pids_stats") or {}).get("current") or 0),
        }

    def _inspect_with_size(self, container_id: str) -> Dict[str, Any]:
        """Low-level inspect of a container, asking for size fields (``SizeRw``) when supported."""
        try:
            return self.client.api.inspect_container(container_id, size=True)
        except TypeError:
            # Older docker SDK versions do not support `size` kwarg.
            return self.client.api.inspect_container(container_id)
        except Exception as e:
            if "unexpected keyword argument 'size'" in str(e):
                return self.client.api.inspect_container(container_id)
            raise

    def get_bot_resource_snapshot(self, bot_id: str) -> Dict[str, Any]:
        """Return configured limits and live usage for the bot's container.

        The result always has ``docker_status``, ``limits`` and ``usage`` keys.
        Live usage is sampled only while the container is RUNNING. If Docker
        is unavailable or the container is missing, the defaults are returned.
        On other errors the failure is logged and whatever was collected so
        far is returned.
        """
        snapshot: Dict[str, Any] = {
            "docker_status": "STOPPED",
            "limits": {
                "cpu_cores": None,
                "memory_bytes": None,
                "storage_bytes": None,
                "nano_cpus": 0,
                "storage_opt_raw": "",
            },
            "usage": {
                "cpu_percent": 0.0,
                "memory_bytes": 0,
                "memory_limit_bytes": 0,
                "memory_percent": 0.0,
                "network_rx_bytes": 0,
                "network_tx_bytes": 0,
                "blk_read_bytes": 0,
                "blk_write_bytes": 0,
                "pids": 0,
                "container_rw_bytes": 0,
            },
        }
        if not self.client:
            return snapshot
        try:
            container = self.client.containers.get(self._container_name(bot_id))
            container.reload()
            snapshot["docker_status"] = self._normalize_docker_status(container.status)
            info = self._inspect_with_size(container.id)
            host_cfg = info.get("HostConfig") or {}
            nano_cpus = int(host_cfg.get("NanoCpus") or 0)
            cpu_quota = int(host_cfg.get("CpuQuota") or 0)
            cpu_period = int(host_cfg.get("CpuPeriod") or 0)
            memory_bytes = int(host_cfg.get("Memory") or 0)
            storage_raw = (host_cfg.get("StorageOpt") or {}).get("size")
            if nano_cpus > 0:
                cpu_cores: Optional[float] = nano_cpus / 1_000_000_000
            elif cpu_quota > 0 and cpu_period > 0:
                cpu_cores = cpu_quota / cpu_period
            else:
                cpu_cores = None
            snapshot["limits"] = {
                "cpu_cores": cpu_cores,
                "memory_bytes": memory_bytes if memory_bytes > 0 else None,
                "storage_bytes": self._parse_size_to_bytes(storage_raw),
                "nano_cpus": nano_cpus,
                "storage_opt_raw": str(storage_raw or ""),
            }
            snapshot["usage"]["container_rw_bytes"] = int(info.get("SizeRw") or 0)
            if snapshot["docker_status"] == "RUNNING":
                stats = container.stats(stream=False) or {}
                live_usage = self._usage_from_stats(stats)
                if snapshot["usage"]["container_rw_bytes"] <= 0:
                    # Inspect gave no writable-layer size; use one from the stats sample if any.
                    storage_stats = stats.get("storage_stats") or {}
                    rw_size = int(storage_stats.get("size_rw") or storage_stats.get("rw_size") or 0)
                    snapshot["usage"]["container_rw_bytes"] = max(0, rw_size)
                snapshot["usage"].update(live_usage)
        except docker.errors.NotFound:
            return snapshot
        except Exception as e:
            print(f"[DockerManager] get_bot_resource_snapshot failed for {bot_id}: {e}")
        return snapshot

    # ------------------------------------------------------------------ logs

    def get_recent_logs(self, bot_id: str, tail: int = 300) -> List[str]:
        """Return up to ``tail`` (min 1) recent non-blank log lines; [] on any error."""
        if not self.client:
            return []
        try:
            container = self.client.containers.get(self._container_name(bot_id))
            raw = container.logs(tail=max(1, int(tail)))
            text = raw.decode("utf-8", errors="ignore")
            return [line for line in text.splitlines() if line.strip()]
        except Exception as e:
            print(f"[DockerManager] Error reading logs for {bot_id}: {e}")
            return []

    def _monitor_container_logs(self, bot_id: str, container, callback: Callable[[str, dict], None]):
        """Tail ``container`` logs from now on and forward packets to ``callback``.

        Every non-blank line is forwarded as ``{"type": "RAW_LOG", "text": ...}``.
        If the line also parses into a state packet, that packet is forwarded
        first. Dashboard packets whose start and end markers are on different
        lines are reassembled before parsing. Runs until the log stream ends
        or raises; that error is logged, not propagated.
        """

        def emit(packet: dict) -> None:
            # A failing callback must not tear down the whole log stream.
            try:
                callback(bot_id, packet)
            except Exception as cb_err:
                print(f"[DockerManager] state callback failed for {bot_id}: {cb_err}")

        try:
            buffer = ""
            dashboard_capture: Optional[str] = None
            decoder = codecs.getincrementaldecoder("utf-8")("replace")
            # Only tail new logs from "now" to avoid replaying historical stdout
            # (which would repopulate cleared chat messages from old dashboard packets).
            since_ts = int(time.time())
            for chunk in container.logs(stream=True, follow=True, since=since_ts):
                # Incremental decoding keeps multi-byte characters split across chunks intact.
                text = decoder.decode(chunk) if isinstance(chunk, bytes) else str(chunk)
                if not text:
                    continue
                buffer += text
                while "\n" in buffer:
                    line, buffer = buffer.split("\n", 1)
                    normalized = line.strip("\r").strip()
                    if not normalized:
                        continue
                    has_start = self._PACKET_START in normalized
                    has_end = self._PACKET_END in normalized
                    if dashboard_capture is not None and has_start:
                        # A new packet began before the previous one ended: drop the
                        # unterminated fragment so it cannot corrupt this packet.
                        dashboard_capture = None
                    if dashboard_capture is not None:
                        dashboard_capture = f"{dashboard_capture}\n{normalized}"
                        if has_end:
                            state_packet = self._parse_dashboard_packet(dashboard_capture)
                            if state_packet:
                                emit(state_packet)
                            dashboard_capture = None
                        emit({"type": "RAW_LOG", "text": normalized})
                        continue
                    if has_start and not has_end:
                        dashboard_capture = normalized
                        emit({"type": "RAW_LOG", "text": normalized})
                        continue
                    state_packet = self._parse_log_line(normalized)
                    if state_packet:
                        emit(state_packet)
                    emit({"type": "RAW_LOG", "text": normalized})
            rest = decoder.decode(b"", final=True)
            if rest:
                buffer += rest
            tail = buffer.strip()
            if tail:
                state_packet = self._parse_log_line(tail)
                if state_packet:
                    emit(state_packet)
                emit({"type": "RAW_LOG", "text": tail})
        except Exception as e:
            print(f"[DockerManager] Log stream closed for {bot_id}: {e}")

    def _parse_dashboard_packet(self, line: str):
        """Parse ``__DASHBOARD_DATA_START__ <json> __DASHBOARD_DATA_END__`` text.

        Returns an ``AGENT_STATE``, ``ASSISTANT_MESSAGE`` or ``BUS_EVENT`` dict
        tagged with channel ``dashboard``. Returns ``None`` when a marker is
        missing, the JSON is not an object, or a message carries neither
        text nor media.
        """
        if self._PACKET_START not in line or self._PACKET_END not in line:
            return None
        try:
            raw_json = line.split(self._PACKET_START, 1)[1].split(self._PACKET_END, 1)[0].strip()
            data = json.loads(raw_json)
            event_type = str(data.get("type", "")).upper()
            content = str(data.get("content") or data.get("text") or "").strip()
            media = self._normalize_media_paths(data.get("media"))
            is_progress = bool(data.get("is_progress", False))
            is_tool = bool(data.get("is_tool", False))
            usage = data.get("usage") if isinstance(data.get("usage"), dict) else None
            request_id = str(data.get("request_id") or "").strip() or None
            provider = str(data.get("provider") or "").strip() or None
            model = str(data.get("model") or "").strip() or None
        except Exception:
            return None
        if event_type == "AGENT_STATE":
            payload = data.get("payload")
            if not isinstance(payload, dict):
                payload = {}
            state = str(payload.get("state") or data.get("state") or ("TOOL_CALL" if is_tool else "THINKING"))
            action_msg = str(payload.get("action_msg") or payload.get("msg") or content)
            return {
                "type": "AGENT_STATE",
                "channel": "dashboard",
                "payload": {"state": state, "action_msg": action_msg},
                "request_id": request_id,
            }
        has_body = bool(content or media)
        message_packet = {
            "type": "ASSISTANT_MESSAGE",
            "channel": "dashboard",
            "text": content,
            "media": media,
            "usage": usage,
            "request_id": request_id,
            "provider": provider,
            "model": model,
        }
        if event_type == "ASSISTANT_MESSAGE":
            return message_packet if has_body else None
        if event_type == "BUS_EVENT" or is_progress:
            return {
                "type": "BUS_EVENT",
                "channel": "dashboard",
                "content": content,
                "media": media,
                "is_progress": is_progress,
                "is_tool": is_tool,
                "usage": usage,
                "request_id": request_id,
                "provider": provider,
                "model": model,
            }
        # Untyped packets with a body are shown as assistant messages.
        return message_packet if has_body else None

    def _parse_log_line(self, line: str):
        """Turn one plain log line into a state packet, or ``None``.

        Checked in order:
        1. an inline dashboard packet;
        2. "Processing message from <channel>:..." (THINKING) and
           "Response to <channel>:..." (SUCCESS);
        3. "tool call: ..." (TOOL_CALL);
        4. any line mentioning "error"/"traceback" (generic ERROR).
        """
        # 1. Structured packet (preferred: comes straight from the bot's bus).
        if self._PACKET_START in line:
            packet = self._parse_dashboard_packet(line)
            if packet:
                return packet
        # 2. Per-channel runtime logs (feed the right-hand status panel).
        process_match = self._PROCESS_MESSAGE_RE.search(line)
        if process_match:
            channel = process_match.group(1).strip().lower()
            action_msg = process_match.group(2).strip()
            return {
                "type": "AGENT_STATE",
                "channel": channel,
                "payload": {
                    "state": "THINKING",
                    "action_msg": action_msg[:4000],
                },
            }
        response_match = self._RESPONSE_RE.search(line)
        if response_match:
            channel = response_match.group(1).strip().lower()
            action_msg = response_match.group(2).strip()
            return {
                "type": "AGENT_STATE",
                "channel": channel,
                "payload": {
                    "state": "SUCCESS",
                    "action_msg": action_msg[:4000],
                },
            }
        # 3. Fallback: generic log heuristics.
        tool_call_match = self._TOOL_CALL_RE.search(line)
        if tool_call_match:
            return {
                "type": "AGENT_STATE",
                "payload": {
                    "state": "TOOL_CALL",
                    "action_msg": tool_call_match.group(1).strip()[:4000],
                },
            }
        lower = line.lower()
        # NOTE(review): substring match also fires on benign text such as "0 errors".
        if "error" in lower or "traceback" in lower:
            return {
                "type": "AGENT_STATE",
                "payload": {"state": "ERROR", "action_msg": "执行异常,请检查日志"},
            }
        return None