865 lines
36 KiB
Python
865 lines
36 KiB
Python
import os
|
|
import re
|
|
import threading
|
|
import time
|
|
import codecs
|
|
import base64
|
|
import uuid
|
|
from typing import Any, Callable, Dict, List, Optional, Tuple
|
|
import json
|
|
|
|
import docker
|
|
|
|
|
|
class BotDockerManager:
|
|
def __init__(self, host_data_root: str, base_image: str = "nanobot-base:v0.1.4"):
    """Connect to the local Docker engine and prepare per-bot bookkeeping.

    Args:
        host_data_root: Host directory under which each bot's workspace
            (``<root>/<bot_id>/.nanobot``) is created and bind-mounted.
        base_image: Default image tag used by ``start_bot`` when no
            explicit ``image_tag`` is supplied.

    On connection failure ``self.client`` stays ``None`` and every public
    method degrades to a no-op / ``False`` / empty result.
    """
    try:
        self.client = docker.from_env(timeout=6)
        # Cheap round-trip to verify the daemon is actually reachable,
        # not just that the SDK could build a client object.
        self.client.version()
        print("✅ Docker 引擎连接成功")
    except Exception as e:
        self.client = None
        print(f"⚠️ 警告: 无法连接到 Docker 引擎。请确保 Docker Desktop 已启动。错误: {e}")

    self.host_data_root = host_data_root
    self.base_image = base_image
    # bot_id -> daemon thread streaming that bot's container logs.
    self.active_monitors: Dict[str, threading.Thread] = {}
    # bot_id -> human-readable reason for the last failed message delivery.
    self._last_delivery_error: Dict[str, str] = {}
|
|
|
|
@staticmethod
|
|
def _build_http_probe_payload_b64(
|
|
url: str,
|
|
method: str = "GET",
|
|
headers: Optional[Dict[str, str]] = None,
|
|
body_json: Optional[Dict[str, Any]] = None,
|
|
timeout_seconds: int = 10,
|
|
) -> str:
|
|
safe_method = str(method or "GET").strip().upper()
|
|
if safe_method not in {"GET", "POST"}:
|
|
safe_method = "GET"
|
|
timeout = max(1, min(int(timeout_seconds or 10), 30))
|
|
payload = {
|
|
"url": str(url or "").strip(),
|
|
"method": safe_method,
|
|
"headers": headers or {},
|
|
"body_json": body_json if isinstance(body_json, dict) else None,
|
|
"timeout": timeout,
|
|
}
|
|
return base64.b64encode(json.dumps(payload, ensure_ascii=False).encode("utf-8")).decode("ascii")
|
|
|
|
@staticmethod
def _http_probe_python_script() -> str:
    """Return the Python source executed inside a container to run an HTTP probe.

    The script reads its request spec (produced by
    ``_build_http_probe_payload_b64``) from the ``DASHBOARD_HTTP_PROBE_B64``
    environment variable, performs the request with ``urllib`` (GET, or POST
    with a JSON body), and prints exactly one JSON result object to stdout
    with the keys: ok, status_code, content_type, body_preview (first 512
    chars) and message.
    """
    # NOTE: kept as one concatenated literal so it can be shipped via
    # `python -c` without needing any files inside the container.
    return (
        "import base64, json, os, urllib.request, urllib.error\n"
        "cfg = json.loads(base64.b64decode(os.environ['DASHBOARD_HTTP_PROBE_B64']).decode('utf-8'))\n"
        "url = str(cfg.get('url') or '').strip()\n"
        "method = str(cfg.get('method') or 'GET').upper()\n"
        "headers = cfg.get('headers') or {}\n"
        "timeout = int(cfg.get('timeout') or 10)\n"
        "data = None\n"
        "if method == 'POST':\n"
        "    body = cfg.get('body_json')\n"
        "    if not isinstance(body, dict):\n"
        "        body = {}\n"
        "    data = json.dumps(body, ensure_ascii=False).encode('utf-8')\n"
        "    if 'Content-Type' not in headers:\n"
        "        headers['Content-Type'] = 'application/json'\n"
        "req = urllib.request.Request(url, data=data, headers=headers, method=method)\n"
        "result = {'ok': False, 'status_code': None, 'content_type': '', 'body_preview': '', 'message': ''}\n"
        "try:\n"
        "    with urllib.request.urlopen(req, timeout=timeout) as resp:\n"
        "        body = resp.read(1024).decode('utf-8', 'ignore')\n"
        "        result.update({'ok': True, 'status_code': int(getattr(resp, 'status', 200) or 200), 'content_type': str(resp.headers.get('content-type') or ''), 'body_preview': body[:512], 'message': 'ok'})\n"
        "except urllib.error.HTTPError as e:\n"
        "    body = ''\n"
        "    try:\n"
        "        body = e.read(1024).decode('utf-8', 'ignore')\n"
        "    except Exception:\n"
        "        body = ''\n"
        "    result.update({'ok': False, 'status_code': int(e.code or 0), 'content_type': str((e.headers or {}).get('content-type') or ''), 'body_preview': body[:512], 'message': f'HTTPError: {e.code}'})\n"
        "except Exception as e:\n"
        "    result.update({'ok': False, 'status_code': None, 'content_type': '', 'body_preview': '', 'message': f'{type(e).__name__}: {e}'})\n"
        "print(json.dumps(result, ensure_ascii=False))\n"
    )
|
|
|
|
def _run_http_probe_exec(self, container, payload_b64: str) -> Dict[str, Any]:
|
|
py_script = self._http_probe_python_script()
|
|
py_bins = ["python3", "python"]
|
|
last_error = ""
|
|
for py_bin in py_bins:
|
|
try:
|
|
exec_result = container.exec_run(
|
|
[py_bin, "-c", py_script],
|
|
environment={"DASHBOARD_HTTP_PROBE_B64": payload_b64},
|
|
)
|
|
except Exception as e:
|
|
last_error = f"exec {py_bin} failed: {e}"
|
|
continue
|
|
output = exec_result.output.decode("utf-8", errors="ignore") if isinstance(exec_result.output, (bytes, bytearray)) else str(exec_result.output)
|
|
if exec_result.exit_code != 0:
|
|
last_error = f"exec {py_bin} exit={exec_result.exit_code}: {output[:300]}"
|
|
continue
|
|
try:
|
|
parsed = json.loads(output.strip() or "{}")
|
|
if isinstance(parsed, dict):
|
|
return parsed
|
|
except Exception:
|
|
last_error = f"exec {py_bin} returned non-json: {output[:300]}"
|
|
return {"ok": False, "message": last_error or "Failed to run probe in container"}
|
|
|
|
@staticmethod
|
|
def _normalize_resource_limits(
|
|
cpu_cores: Optional[float],
|
|
memory_mb: Optional[int],
|
|
storage_gb: Optional[int],
|
|
) -> Tuple[float, int, int]:
|
|
try:
|
|
cpu = float(cpu_cores) if cpu_cores is not None else 1.0
|
|
except Exception:
|
|
cpu = 1.0
|
|
try:
|
|
memory = int(memory_mb) if memory_mb is not None else 1024
|
|
except Exception:
|
|
memory = 1024
|
|
try:
|
|
storage = int(storage_gb) if storage_gb is not None else 10
|
|
except Exception:
|
|
storage = 10
|
|
if cpu < 0:
|
|
cpu = 1.0
|
|
if memory < 0:
|
|
memory = 1024
|
|
if storage < 0:
|
|
storage = 10
|
|
cpu = 0.0 if cpu == 0 else min(16.0, max(0.1, cpu))
|
|
memory = 0 if memory == 0 else min(65536, max(256, memory))
|
|
storage = 0 if storage == 0 else min(1024, max(1, storage))
|
|
return cpu, memory, storage
|
|
|
|
def has_image(self, tag: str) -> bool:
|
|
if not self.client:
|
|
return False
|
|
try:
|
|
self.client.images.get(tag)
|
|
return True
|
|
except docker.errors.ImageNotFound:
|
|
return False
|
|
except Exception:
|
|
return False
|
|
|
|
def list_images_by_repo(self, repository: str = "nanobot-base") -> List[Dict[str, str]]:
|
|
"""List docker images by repository prefix, returning normalized tag/id pairs."""
|
|
if not self.client:
|
|
return []
|
|
rows: List[Dict[str, str]] = []
|
|
try:
|
|
images = self.client.images.list(name=repository)
|
|
for image in images:
|
|
for tag in image.tags:
|
|
repo, _, version = tag.partition(":")
|
|
if repo != repository or not version:
|
|
continue
|
|
rows.append(
|
|
{
|
|
"tag": tag,
|
|
"version": version.removeprefix("v"),
|
|
"image_id": image.id,
|
|
}
|
|
)
|
|
except Exception as e:
|
|
print(f"[DockerManager] list_images_by_repo failed: {e}")
|
|
return rows
|
|
|
|
def start_bot(
    self,
    bot_id: str,
    image_tag: Optional[str] = None,
    env_vars: Optional[Dict[str, str]] = None,
    cpu_cores: Optional[float] = None,
    memory_mb: Optional[int] = None,
    storage_gb: Optional[int] = None,
    on_state_change: Optional[Callable[[str, dict], None]] = None,
) -> bool:
    """Start (or reuse) the worker container for *bot_id*.

    Creates the bot workspace directory on the host, normalizes resource
    limits, and runs a `worker_<bot_id>` container from *image_tag* (or
    the configured base image). An already-running container is reused
    as-is; a stopped leftover is force-removed and recreated. When
    *on_state_change* is given, a daemon thread is started to stream and
    parse the container's logs.

    Returns True on success (including reuse), False on any failure.
    """
    if not self.client:
        print("❌ 错误: Docker 客户端未初始化,无法启动机器人。")
        return False

    image = image_tag or self.base_image
    if not self.has_image(image):
        print(f"❌ 错误: 镜像不存在: {image}")
        return False

    # Per-bot workspace bind-mounted into the container at /root/.nanobot.
    bot_workspace = os.path.join(self.host_data_root, bot_id, ".nanobot")
    container_name = f"worker_{bot_id}"
    os.makedirs(bot_workspace, exist_ok=True)
    cpu, memory, storage = self._normalize_resource_limits(cpu_cores, memory_mb, storage_gb)
    base_kwargs = {
        "image": image,
        "name": container_name,
        "detach": True,
        "stdin_open": True,
        "tty": True,
        "environment": env_vars or {},
        "volumes": {
            bot_workspace: {"bind": "/root/.nanobot", "mode": "rw"},
        },
        "network_mode": "bridge",
    }
    # A normalized value of 0 means "unlimited": omit the kwarg entirely.
    if memory > 0:
        base_kwargs["mem_limit"] = f"{memory}m"
    if cpu > 0:
        base_kwargs["nano_cpus"] = int(cpu * 1_000_000_000)

    try:
        try:
            container = self.client.containers.get(container_name)
            container.reload()
            if container.status == "running":
                # Reuse the live container; just make sure its logs are monitored.
                if on_state_change:
                    self.ensure_monitor(bot_id, on_state_change)
                return True
            # Stopped leftover from a previous run: remove and recreate.
            container.remove(force=True)
        except docker.errors.NotFound:
            pass

        container = None
        if storage > 0:
            try:
                container = self.client.containers.run(
                    storage_opt={"size": f"{storage}G"},
                    **base_kwargs,
                )
            except Exception as e:
                # Some Docker engines (e.g. Desktop/overlay2) may not support size storage option.
                print(f"[DockerManager] storage limit not applied for {bot_id}: {e}")
                container = self.client.containers.run(**base_kwargs)
        else:
            container = self.client.containers.run(**base_kwargs)

        if on_state_change:
            monitor_thread = threading.Thread(
                target=self._monitor_container_logs,
                args=(bot_id, container, on_state_change),
                daemon=True,
            )
            monitor_thread.start()
            self.active_monitors[bot_id] = monitor_thread

        return True
    except Exception as e:
        print(f"[DockerManager] Error starting bot {bot_id}: {e}")
        return False
|
|
|
|
def ensure_monitor(self, bot_id: str, on_state_change: Callable[[str, dict], None]) -> bool:
|
|
"""Ensure an active log monitor exists for a running bot container."""
|
|
if not self.client:
|
|
return False
|
|
existing = self.active_monitors.get(bot_id)
|
|
if existing and existing.is_alive():
|
|
return True
|
|
try:
|
|
container = self.client.containers.get(f"worker_{bot_id}")
|
|
container.reload()
|
|
if container.status != "running":
|
|
return False
|
|
monitor_thread = threading.Thread(
|
|
target=self._monitor_container_logs,
|
|
args=(bot_id, container, on_state_change),
|
|
daemon=True,
|
|
)
|
|
monitor_thread.start()
|
|
self.active_monitors[bot_id] = monitor_thread
|
|
return True
|
|
except docker.errors.NotFound:
|
|
return False
|
|
except Exception as e:
|
|
print(f"[DockerManager] Error ensuring monitor for {bot_id}: {e}")
|
|
return False
|
|
|
|
def stop_bot(self, bot_id: str) -> bool:
|
|
if not self.client:
|
|
return False
|
|
container_name = f"worker_{bot_id}"
|
|
try:
|
|
container = self.client.containers.get(container_name)
|
|
container.stop(timeout=5)
|
|
container.remove()
|
|
self.active_monitors.pop(bot_id, None)
|
|
return True
|
|
except docker.errors.NotFound:
|
|
self.active_monitors.pop(bot_id, None)
|
|
return False
|
|
except Exception as e:
|
|
print(f"[DockerManager] Error stopping bot {bot_id}: {e}")
|
|
return False
|
|
|
|
def probe_http_from_container(
|
|
self,
|
|
bot_id: str,
|
|
url: str,
|
|
method: str = "GET",
|
|
headers: Optional[Dict[str, str]] = None,
|
|
body_json: Optional[Dict[str, Any]] = None,
|
|
timeout_seconds: int = 10,
|
|
) -> Dict[str, Any]:
|
|
if not self.client:
|
|
return {"ok": False, "message": "Docker client is not available"}
|
|
try:
|
|
container = self.client.containers.get(f"worker_{bot_id}")
|
|
container.reload()
|
|
if container.status != "running":
|
|
return {"ok": False, "message": f"Bot container is {container.status}"}
|
|
except docker.errors.NotFound:
|
|
return {"ok": False, "message": "Bot container not found"}
|
|
except Exception as e:
|
|
return {"ok": False, "message": f"Failed to inspect bot container: {e}"}
|
|
payload_b64 = self._build_http_probe_payload_b64(
|
|
url=url,
|
|
method=method,
|
|
headers=headers,
|
|
body_json=body_json,
|
|
timeout_seconds=timeout_seconds,
|
|
)
|
|
return self._run_http_probe_exec(container, payload_b64)
|
|
|
|
def probe_http_via_temporary_container(
|
|
self,
|
|
image_tag: str,
|
|
url: str,
|
|
method: str = "GET",
|
|
headers: Optional[Dict[str, str]] = None,
|
|
body_json: Optional[Dict[str, Any]] = None,
|
|
timeout_seconds: int = 10,
|
|
) -> Dict[str, Any]:
|
|
if not self.client:
|
|
return {"ok": False, "message": "Docker client is not available"}
|
|
image = str(image_tag or self.base_image).strip() or self.base_image
|
|
payload_b64 = self._build_http_probe_payload_b64(
|
|
url=url,
|
|
method=method,
|
|
headers=headers,
|
|
body_json=body_json,
|
|
timeout_seconds=timeout_seconds,
|
|
)
|
|
container = None
|
|
try:
|
|
container = self.client.containers.run(
|
|
image=image,
|
|
name=f"dashboard_probe_{uuid.uuid4().hex[:10]}",
|
|
command=["sh", "-c", "sleep 45"],
|
|
detach=True,
|
|
tty=False,
|
|
stdin_open=False,
|
|
network_mode="bridge",
|
|
)
|
|
return self._run_http_probe_exec(container, payload_b64)
|
|
except docker.errors.ImageNotFound:
|
|
return {"ok": False, "message": f"Probe image not found: {image}"}
|
|
except Exception as e:
|
|
return {"ok": False, "message": f"Failed to run temporary probe container: {e}"}
|
|
finally:
|
|
if container is not None:
|
|
try:
|
|
container.remove(force=True)
|
|
except Exception:
|
|
pass
|
|
|
|
def send_command(self, bot_id: str, command: str, media: Optional[List[str]] = None) -> bool:
|
|
"""Send a command to dashboard channel with robust container-local delivery."""
|
|
if not self.client:
|
|
self._last_delivery_error[bot_id] = "Docker client is not available"
|
|
return False
|
|
media_paths = [str(v).strip().replace("\\", "/") for v in (media or []) if str(v).strip()]
|
|
self._last_delivery_error.pop(bot_id, None)
|
|
|
|
# Primary path on Docker Desktop/Mac: execute curl inside container namespace.
|
|
for attempt in range(3):
|
|
if self._send_command_via_exec(bot_id, command, media_paths):
|
|
self._last_delivery_error.pop(bot_id, None)
|
|
return True
|
|
time.sleep(0.25 * (attempt + 1))
|
|
|
|
# Secondary path for environments where host can reach container IP.
|
|
if self._send_command_via_host_http(bot_id, command, media_paths):
|
|
self._last_delivery_error.pop(bot_id, None)
|
|
return True
|
|
if bot_id not in self._last_delivery_error:
|
|
self._last_delivery_error[bot_id] = "Unknown delivery failure"
|
|
return False
|
|
|
|
def get_last_delivery_error(self, bot_id: str) -> str:
|
|
return str(self._last_delivery_error.get(bot_id, "") or "").strip()
|
|
|
|
def get_bot_status(self, bot_id: str) -> str:
|
|
"""Return normalized runtime status from Docker: RUNNING or STOPPED."""
|
|
if not self.client:
|
|
return "STOPPED"
|
|
try:
|
|
container = self.client.containers.get(f"worker_{bot_id}")
|
|
container.reload()
|
|
raw = str(container.status or "").strip().lower()
|
|
if raw in {"running", "restarting"}:
|
|
return "RUNNING"
|
|
return "STOPPED"
|
|
except docker.errors.NotFound:
|
|
return "STOPPED"
|
|
except Exception:
|
|
return "STOPPED"
|
|
|
|
@staticmethod
|
|
def _parse_size_to_bytes(raw: Any) -> Optional[int]:
|
|
if raw is None:
|
|
return None
|
|
text = str(raw).strip()
|
|
if not text:
|
|
return None
|
|
try:
|
|
return int(float(text))
|
|
except Exception:
|
|
pass
|
|
match = re.fullmatch(r"([0-9]+(?:\.[0-9]+)?)\s*([kmgtp]?)(i?b)?", text.lower())
|
|
if not match:
|
|
return None
|
|
number = float(match.group(1))
|
|
unit = (match.group(2) or "").lower()
|
|
scale = {
|
|
"": 1,
|
|
"k": 1024,
|
|
"m": 1024 ** 2,
|
|
"g": 1024 ** 3,
|
|
"t": 1024 ** 4,
|
|
"p": 1024 ** 5,
|
|
}.get(unit, 1)
|
|
return int(number * scale)
|
|
|
|
@staticmethod
|
|
def _calc_cpu_percent(stats: Dict[str, Any]) -> float:
|
|
try:
|
|
cpu_stats = stats.get("cpu_stats") or {}
|
|
precpu_stats = stats.get("precpu_stats") or {}
|
|
cpu_total = float((cpu_stats.get("cpu_usage") or {}).get("total_usage") or 0)
|
|
prev_cpu_total = float((precpu_stats.get("cpu_usage") or {}).get("total_usage") or 0)
|
|
cpu_delta = cpu_total - prev_cpu_total
|
|
system_total = float(cpu_stats.get("system_cpu_usage") or 0)
|
|
prev_system_total = float(precpu_stats.get("system_cpu_usage") or 0)
|
|
system_delta = system_total - prev_system_total
|
|
online_cpus = int(
|
|
cpu_stats.get("online_cpus")
|
|
or len((cpu_stats.get("cpu_usage") or {}).get("percpu_usage") or [])
|
|
or 1
|
|
)
|
|
if cpu_delta <= 0 or system_delta <= 0:
|
|
return 0.0
|
|
return max(0.0, (cpu_delta / system_delta) * online_cpus * 100.0)
|
|
except Exception:
|
|
return 0.0
|
|
|
|
def get_bot_resource_snapshot(self, bot_id: str) -> Dict[str, Any]:
    """Collect configured limits and live usage for a bot's container.

    Returns a dict with:
      - docker_status: "RUNNING" or "STOPPED"
      - limits: cpu_cores / memory_bytes / storage_bytes (None when
        unlimited or unknown), plus the raw nano_cpus and storage_opt values
      - usage: live stats (cpu percent, memory, network, block I/O, pids,
        writable-layer size); zeros when stopped or stats are unavailable.

    Never raises: Docker errors are logged and the partial snapshot is
    returned as-is.
    """
    snapshot: Dict[str, Any] = {
        "docker_status": "STOPPED",
        "limits": {
            "cpu_cores": None,
            "memory_bytes": None,
            "storage_bytes": None,
            "nano_cpus": 0,
            "storage_opt_raw": "",
        },
        "usage": {
            "cpu_percent": 0.0,
            "memory_bytes": 0,
            "memory_limit_bytes": 0,
            "memory_percent": 0.0,
            "network_rx_bytes": 0,
            "network_tx_bytes": 0,
            "blk_read_bytes": 0,
            "blk_write_bytes": 0,
            "pids": 0,
            "container_rw_bytes": 0,
        },
    }
    if not self.client:
        return snapshot

    try:
        container = self.client.containers.get(f"worker_{bot_id}")
        container.reload()
        status_raw = str(container.status or "").strip().lower()
        snapshot["docker_status"] = "RUNNING" if status_raw in {"running", "restarting"} else "STOPPED"

        inspect: Dict[str, Any]
        try:
            # `size=True` asks the daemon to also report SizeRw (writable layer).
            inspect = self.client.api.inspect_container(container.id, size=True)
        except TypeError:
            # Older docker SDK versions do not support `size` kwarg.
            inspect = self.client.api.inspect_container(container.id)
        except Exception as e:
            # Some SDK wrappers surface the missing kwarg as a generic error.
            if "unexpected keyword argument 'size'" in str(e):
                inspect = self.client.api.inspect_container(container.id)
            else:
                raise
        host_cfg = inspect.get("HostConfig") or {}
        nano_cpus = int(host_cfg.get("NanoCpus") or 0)
        cpu_quota = int(host_cfg.get("CpuQuota") or 0)
        cpu_period = int(host_cfg.get("CpuPeriod") or 0)
        memory_bytes = int(host_cfg.get("Memory") or 0)
        storage_opt = host_cfg.get("StorageOpt") or {}
        storage_raw = storage_opt.get("size")
        storage_bytes = self._parse_size_to_bytes(storage_raw)

        # The CPU limit may be expressed either as NanoCpus or quota/period.
        if nano_cpus > 0:
            cpu_cores = nano_cpus / 1_000_000_000
        elif cpu_quota > 0 and cpu_period > 0:
            cpu_cores = cpu_quota / cpu_period
        else:
            cpu_cores = None

        snapshot["limits"] = {
            "cpu_cores": cpu_cores,
            "memory_bytes": memory_bytes if memory_bytes > 0 else None,
            "storage_bytes": storage_bytes,
            "nano_cpus": nano_cpus,
            "storage_opt_raw": str(storage_raw or ""),
        }
        snapshot["usage"]["container_rw_bytes"] = int(inspect.get("SizeRw") or 0)

        if snapshot["docker_status"] == "RUNNING":
            stats = container.stats(stream=False) or {}
            memory_stats = stats.get("memory_stats") or {}
            memory_usage = int(memory_stats.get("usage") or 0)
            memory_limit = int(memory_stats.get("limit") or 0)
            if memory_usage > 0:
                # Exclude inactive_file bytes (treated here as cache) from usage.
                cache = int((memory_stats.get("stats") or {}).get("inactive_file") or 0)
                memory_usage = max(0, memory_usage - cache)
            networks = stats.get("networks") or {}
            rx_total = 0
            tx_total = 0
            for _, row in networks.items():
                if isinstance(row, dict):
                    rx_total += int(row.get("rx_bytes") or 0)
                    tx_total += int(row.get("tx_bytes") or 0)
            blk_stats = stats.get("blkio_stats") or {}
            io_rows = blk_stats.get("io_service_bytes_recursive") or []
            blk_read = 0
            blk_write = 0
            for row in io_rows:
                if not isinstance(row, dict):
                    continue
                op = str(row.get("op") or "").upper()
                value = int(row.get("value") or 0)
                if op == "READ":
                    blk_read += value
                elif op == "WRITE":
                    blk_write += value
            pids_current = int((stats.get("pids_stats") or {}).get("current") or 0)
            cpu_percent = self._calc_cpu_percent(stats)
            memory_percent = 0.0
            if memory_limit > 0:
                memory_percent = (memory_usage / memory_limit) * 100.0
            if snapshot["usage"]["container_rw_bytes"] <= 0:
                # Fall back to stats-provided writable-layer size when inspect
                # did not supply one.
                storage_stats = stats.get("storage_stats") or {}
                rw_size = int(
                    storage_stats.get("size_rw")
                    or storage_stats.get("rw_size")
                    or 0
                )
                snapshot["usage"]["container_rw_bytes"] = max(0, rw_size)

            snapshot["usage"].update(
                {
                    "cpu_percent": cpu_percent,
                    "memory_bytes": memory_usage,
                    "memory_limit_bytes": memory_limit,
                    "memory_percent": max(0.0, memory_percent),
                    "network_rx_bytes": rx_total,
                    "network_tx_bytes": tx_total,
                    "blk_read_bytes": blk_read,
                    "blk_write_bytes": blk_write,
                    "pids": pids_current,
                }
            )
    except docker.errors.NotFound:
        return snapshot
    except Exception as e:
        print(f"[DockerManager] get_bot_resource_snapshot failed for {bot_id}: {e}")
    return snapshot
|
|
|
|
def _send_command_via_exec(self, bot_id: str, command: str, media: Optional[List[str]] = None) -> bool:
    """Deliver a chat message by exec-ing an HTTP POST inside the container.

    Tries `curl` against the bot's loopback endpoint first; when curl is
    missing or fails, falls back to an inline python/urllib one-liner
    (python3, then python). A zero exit with a non-JSON body is treated
    as success; a JSON body must additionally report status == "ok".
    Failure reasons are recorded in self._last_delivery_error.
    """
    try:
        container = self.client.containers.get(f"worker_{bot_id}")
        container.reload()
        if container.status != "running":
            self._last_delivery_error[bot_id] = f"Container status is {container.status}"
            return False
        payload_json = json.dumps({"message": command, "media": media or []}, ensure_ascii=False)

        # Try direct curl first (no shell dependency).
        result = container.exec_run(
            [
                "curl",
                "-sS",
                "--fail",
                "--max-time",
                "6",
                "-X",
                "POST",
                "-H",
                "Content-Type: application/json",
                "-d",
                payload_json,
                "http://127.0.0.1:9000/chat",
            ]
        )
        output = result.output.decode("utf-8", errors="ignore") if isinstance(result.output, (bytes, bytearray)) else str(result.output)
        if result.exit_code != 0:
            reason = f"exec curl failed: exit={result.exit_code}, out={output[:300]}"
            print(f"[DockerManager] {reason}")
            self._last_delivery_error[bot_id] = reason
            # Fallback inside container without curl/shell.
            payload_b64 = base64.b64encode(payload_json.encode("utf-8")).decode("ascii")
            py_script = (
                "import base64,json,os,urllib.request\n"
                "payload=json.loads(base64.b64decode(os.environ['DASHBOARD_PAYLOAD_B64']).decode('utf-8'))\n"
                "req=urllib.request.Request('http://127.0.0.1:9000/chat',"
                "data=json.dumps(payload,ensure_ascii=False).encode('utf-8'),"
                "headers={'Content-Type':'application/json'})\n"
                "with urllib.request.urlopen(req, timeout=8) as resp:\n"
                "    print(resp.read().decode('utf-8','ignore'))\n"
            )
            py_bins = ["python3", "python"]
            for py_bin in py_bins:
                py_result = container.exec_run(
                    [py_bin, "-c", py_script],
                    environment={"DASHBOARD_PAYLOAD_B64": payload_b64},
                )
                py_output = py_result.output.decode("utf-8", errors="ignore") if isinstance(py_result.output, (bytes, bytearray)) else str(py_result.output)
                if py_result.exit_code != 0:
                    py_reason = f"exec {py_bin} fallback failed: exit={py_result.exit_code}, out={py_output[:300]}"
                    print(f"[DockerManager] {py_reason}")
                    self._last_delivery_error[bot_id] = py_reason
                    continue
                if py_output.strip():
                    try:
                        parsed = json.loads(py_output)
                        if str(parsed.get("status", "")).lower() != "ok":
                            py_reason = f"exec {py_bin} fallback non-ok response: {py_output[:300]}"
                            print(f"[DockerManager] {py_reason}")
                            self._last_delivery_error[bot_id] = py_reason
                            continue
                    except Exception:
                        # Non-JSON fallback output with zero exit counts as delivered.
                        pass
                return True
            # Neither interpreter delivered the message.
            return False
        if output.strip():
            try:
                parsed = json.loads(output)
                if str(parsed.get("status", "")).lower() != "ok":
                    reason = f"exec curl non-ok response: {output[:300]}"
                    print(f"[DockerManager] {reason}")
                    self._last_delivery_error[bot_id] = reason
                    return False
            except Exception:
                # Non-JSON but zero exit still treated as success.
                pass
        return True
    except Exception as e:
        reason = f"exec curl exception: {e}"
        print(f"[DockerManager] {reason}")
        self._last_delivery_error[bot_id] = reason
        return False
|
|
|
|
def _send_command_via_host_http(self, bot_id: str, command: str, media: Optional[List[str]] = None) -> bool:
|
|
try:
|
|
import httpx
|
|
|
|
container_name = f"worker_{bot_id}"
|
|
payload = {"message": command, "media": media or []}
|
|
container = self.client.containers.get(container_name)
|
|
ip_address = container.attrs["NetworkSettings"]["IPAddress"] or "127.0.0.1"
|
|
target_url = f"http://{ip_address}:9000/chat"
|
|
|
|
with httpx.Client(timeout=4.0) as client:
|
|
resp = client.post(target_url, json=payload)
|
|
if resp.status_code == 200:
|
|
return True
|
|
reason = f"host HTTP failed: {resp.status_code} - {resp.text[:300]}"
|
|
print(f"[DockerManager] {reason}")
|
|
self._last_delivery_error[bot_id] = reason
|
|
return False
|
|
except Exception as e:
|
|
reason = f"host HTTP exception: {e}"
|
|
print(f"[DockerManager] {reason}")
|
|
self._last_delivery_error[bot_id] = reason
|
|
return False
|
|
|
|
def get_recent_logs(self, bot_id: str, tail: int = 300) -> List[str]:
|
|
if not self.client:
|
|
return []
|
|
container_name = f"worker_{bot_id}"
|
|
try:
|
|
container = self.client.containers.get(container_name)
|
|
raw = container.logs(tail=max(1, int(tail)))
|
|
text = raw.decode("utf-8", errors="ignore")
|
|
return [line for line in text.splitlines() if line.strip()]
|
|
except Exception as e:
|
|
print(f"[DockerManager] Error reading logs for {bot_id}: {e}")
|
|
return []
|
|
|
|
def _monitor_container_logs(self, bot_id: str, container, callback: Callable[[str, dict], None]):
    """Stream a container's log output and emit parsed dashboard events.

    Runs in a daemon thread. Every non-empty line is forwarded to
    *callback* as a RAW_LOG event; lines containing (or spanning) a
    dashboard data packet additionally produce the parsed structured
    event. Multi-line packets between __DASHBOARD_DATA_START__ and
    __DASHBOARD_DATA_END__ are accumulated before parsing. Stream errors
    terminate the monitor with a log message (never raise).
    """
    try:
        buffer = ""
        # Accumulates a multi-line dashboard packet until the END marker.
        dashboard_capture: Optional[str] = None
        # Incremental decoder keeps multi-byte UTF-8 sequences that are
        # split across stream chunks intact.
        decoder = codecs.getincrementaldecoder("utf-8")("replace")
        # Only tail new logs from "now" to avoid replaying historical stdout
        # (which would repopulate cleared chat messages from old dashboard packets).
        since_ts = int(time.time())
        for chunk in container.logs(stream=True, follow=True, since=since_ts):
            if isinstance(chunk, bytes):
                text = decoder.decode(chunk)
            else:
                text = str(chunk)
            if not text:
                continue
            buffer += text

            # Process complete lines; keep the trailing partial line buffered.
            while "\n" in buffer:
                line, buffer = buffer.split("\n", 1)
                normalized = line.strip("\r").strip()
                if not normalized:
                    continue

                if dashboard_capture is not None:
                    # Inside a multi-line packet: keep appending until END.
                    dashboard_capture = f"{dashboard_capture}\n{normalized}"
                    if "__DASHBOARD_DATA_END__" in dashboard_capture:
                        state_packet = self._parse_dashboard_packet(dashboard_capture)
                        if state_packet:
                            callback(bot_id, state_packet)
                        dashboard_capture = None
                    callback(bot_id, {"type": "RAW_LOG", "text": normalized})
                    continue

                # START without END on the same line opens a multi-line capture.
                if "__DASHBOARD_DATA_START__" in normalized and "__DASHBOARD_DATA_END__" not in normalized:
                    dashboard_capture = normalized
                    callback(bot_id, {"type": "RAW_LOG", "text": normalized})
                    continue

                state_packet = self._parse_log_line(normalized)
                if state_packet:
                    callback(bot_id, state_packet)

                callback(bot_id, {"type": "RAW_LOG", "text": normalized})

        # Flush any bytes still held by the incremental decoder.
        rest = decoder.decode(b"", final=True)
        if rest:
            buffer += rest

        # Emit the final unterminated line, if any, after the stream closes.
        tail = buffer.strip()
        if tail:
            state_packet = self._parse_log_line(tail)
            if state_packet:
                callback(bot_id, state_packet)
            callback(bot_id, {"type": "RAW_LOG", "text": tail})
    except Exception as e:
        print(f"[DockerManager] Log stream closed for {bot_id}: {e}")
|
|
|
|
def _parse_dashboard_packet(self, line: str):
|
|
if "__DASHBOARD_DATA_START__" not in line or "__DASHBOARD_DATA_END__" not in line:
|
|
return None
|
|
try:
|
|
raw_json = line.split("__DASHBOARD_DATA_START__", 1)[1].split("__DASHBOARD_DATA_END__", 1)[0].strip()
|
|
data = json.loads(raw_json)
|
|
event_type = str(data.get("type", "")).upper()
|
|
content = str(data.get("content") or data.get("text") or "").strip()
|
|
media = [str(v).strip().replace("\\", "/") for v in (data.get("media") or []) if str(v).strip()]
|
|
is_progress = bool(data.get("is_progress", False))
|
|
is_tool = bool(data.get("is_tool", False))
|
|
|
|
if event_type == "AGENT_STATE":
|
|
payload = data.get("payload") or {}
|
|
state = str(payload.get("state") or data.get("state") or ("TOOL_CALL" if is_tool else "THINKING"))
|
|
action_msg = str(payload.get("action_msg") or payload.get("msg") or content)
|
|
return {
|
|
"type": "AGENT_STATE",
|
|
"channel": "dashboard",
|
|
"payload": {"state": state, "action_msg": action_msg},
|
|
}
|
|
|
|
if event_type == "ASSISTANT_MESSAGE":
|
|
if content or media:
|
|
return {"type": "ASSISTANT_MESSAGE", "channel": "dashboard", "text": content, "media": media}
|
|
return None
|
|
|
|
if event_type == "BUS_EVENT" or is_progress:
|
|
return {
|
|
"type": "BUS_EVENT",
|
|
"channel": "dashboard",
|
|
"content": content,
|
|
"media": media,
|
|
"is_progress": is_progress,
|
|
"is_tool": is_tool,
|
|
}
|
|
|
|
if content or media:
|
|
return {
|
|
"type": "ASSISTANT_MESSAGE",
|
|
"channel": "dashboard",
|
|
"text": content,
|
|
"media": media,
|
|
}
|
|
except Exception:
|
|
return None
|
|
return None
|
|
|
|
def _parse_log_line(self, line: str):
|
|
# 1. 结构化数据解析(首选,直接从机器人总线获取)
|
|
if "__DASHBOARD_DATA_START__" in line:
|
|
packet = self._parse_dashboard_packet(line)
|
|
if packet:
|
|
return packet
|
|
|
|
# 2. 解析全渠道运行态日志(用于右侧状态面板)
|
|
process_match = re.search(r"Processing message from ([\w\-]+):[^:]+:\s*(.+)$", line)
|
|
if process_match:
|
|
channel = process_match.group(1).strip().lower()
|
|
action_msg = process_match.group(2).strip()
|
|
return {
|
|
"type": "AGENT_STATE",
|
|
"channel": channel,
|
|
"payload": {
|
|
"state": "THINKING",
|
|
"action_msg": action_msg[:4000],
|
|
},
|
|
}
|
|
|
|
response_match = re.search(r"Response to ([\w\-]+):[^:]+:\s*(.+)$", line)
|
|
if response_match:
|
|
channel = response_match.group(1).strip().lower()
|
|
action_msg = response_match.group(2).strip()
|
|
return {
|
|
"type": "AGENT_STATE",
|
|
"channel": channel,
|
|
"payload": {
|
|
"state": "SUCCESS",
|
|
"action_msg": action_msg[:4000],
|
|
},
|
|
}
|
|
|
|
# 3. 备选方案:常规日志解析
|
|
lower = line.lower()
|
|
tool_call_match = re.search(r"tool call:\s*(.+)$", line, re.IGNORECASE)
|
|
if tool_call_match:
|
|
return {
|
|
"type": "AGENT_STATE",
|
|
"payload": {
|
|
"state": "TOOL_CALL",
|
|
"action_msg": tool_call_match.group(1).strip()[:4000],
|
|
},
|
|
}
|
|
|
|
if "error" in lower or "traceback" in lower:
|
|
return {
|
|
"type": "AGENT_STATE",
|
|
"payload": {"state": "ERROR", "action_msg": "执行异常,请检查日志"},
|
|
}
|
|
|
|
return None
|