398 lines
16 KiB
Python
398 lines
16 KiB
Python
import os
|
|
import re
|
|
import threading
|
|
import time
|
|
import codecs
|
|
from typing import Callable, Dict, List, Optional
|
|
import json
|
|
|
|
import docker
|
|
|
|
|
|
class BotDockerManager:
    """Run one Docker worker container per bot and relay its log output.

    Bot ``<bot_id>`` runs in a container named ``worker_<bot_id>`` with
    ``<host_data_root>/<bot_id>/.nanobot`` bind-mounted at ``/root/.nanobot``.
    Optional per-bot daemon threads stream the container's logs and forward
    parsed state packets (plus every raw line) to a callback.

    When the Docker daemon cannot be reached at construction time,
    ``self.client`` is ``None`` and the public operations return ``False`` or
    an empty result instead of touching Docker.
    """

    # Markers that frame a structured dashboard packet inside container stdout.
    _DASHBOARD_START = "__DASHBOARD_DATA_START__"
    _DASHBOARD_END = "__DASHBOARD_DATA_END__"
    # Cap on a buffered multi-line packet whose end marker never arrives, so a
    # truncated packet cannot grow without bound or swallow later log lines.
    _MAX_DASHBOARD_CAPTURE_CHARS = 1_000_000
    # Truncation limit for action messages extracted from plain log lines.
    _MAX_ACTION_MSG_CHARS = 4000
    # Compiled once here instead of on every log line.
    _PROCESSING_RE = re.compile(r"Processing message from ([\w\-]+):[^:]+:\s*(.+)$")
    _RESPONSE_RE = re.compile(r"Response to ([\w\-]+):[^:]+:\s*(.+)$")
    _TOOL_CALL_RE = re.compile(r"tool call:\s*(.+)$", re.IGNORECASE)

    def __init__(self, host_data_root: str, base_image: str = "nanobot-base:v0.1.4"):
        """Connect to the local Docker engine (best effort) and store settings.

        Args:
            host_data_root: Host directory under which each bot gets
                ``<bot_id>/.nanobot`` as its mounted workspace.
            base_image: Image tag used when ``start_bot`` gets no ``image_tag``.
        """
        try:
            self.client = docker.from_env(timeout=6)
            # Round-trip to the daemon so an unreachable engine fails here.
            self.client.version()
            print("✅ Docker 引擎连接成功")
        except Exception as e:
            self.client = None
            print(f"⚠️ 警告: 无法连接到 Docker 引擎。请确保 Docker Desktop 已启动。错误: {e}")

        self.host_data_root = host_data_root
        self.base_image = base_image
        # bot_id -> most recently started log-monitor thread for that bot.
        self.active_monitors: Dict[str, threading.Thread] = {}

    @staticmethod
    def _container_name(bot_id: str) -> str:
        """Return the Docker container name used for ``bot_id``."""
        return f"worker_{bot_id}"

    @staticmethod
    def _normalize_media(media) -> List[str]:
        """Normalize a media value into a list of non-empty, forward-slash paths.

        A bare string is treated as a single path (not iterated character by
        character), ``None`` entries and blank entries are dropped, and
        backslashes are converted to forward slashes.
        """
        if not media:
            return []
        items = [media] if isinstance(media, str) else media
        paths: List[str] = []
        for item in items:
            if item is None:
                continue
            text = str(item).strip()
            if text:
                paths.append(text.replace("\\", "/"))
        return paths

    def has_image(self, tag: str) -> bool:
        """Return True if the image ``tag`` exists locally, False otherwise or on any error."""
        if not self.client:
            return False
        try:
            self.client.images.get(tag)
            return True
        except Exception:  # includes docker.errors.ImageNotFound
            return False

    def list_images_by_repo(self, repository: str = "nanobot-base") -> List[Dict[str, str]]:
        """List local image tags whose repository is exactly ``repository``.

        Returns one ``{"tag", "version", "image_id"}`` row per matching tag,
        where ``version`` is the tag part with a leading ``v`` removed. On error
        the failure is printed and the rows collected so far are returned.
        """
        if not self.client:
            return []
        rows: List[Dict[str, str]] = []
        try:
            for image in self.client.images.list(name=repository):
                for tag in image.tags or []:
                    # Split on the LAST colon: a registry host may carry a port
                    # ("localhost:5000/nanobot-base:v1"), while a tag cannot contain ':'.
                    repo, _, version = tag.rpartition(":")
                    if repo != repository or not version:
                        continue
                    rows.append(
                        {
                            "tag": tag,
                            "version": version.removeprefix("v"),
                            "image_id": image.id,
                        }
                    )
        except Exception as e:
            print(f"[DockerManager] list_images_by_repo failed: {e}")
        return rows

    def start_bot(
        self,
        bot_id: str,
        image_tag: Optional[str] = None,
        env_vars: Optional[Dict[str, str]] = None,
        on_state_change: Optional[Callable[[str, dict], None]] = None,
    ) -> bool:
        """Make sure the bot's worker container is running; return True on success.

        An already-running container is reused as-is (``image_tag`` and
        ``env_vars`` are not applied to it); it only gets a log monitor attached
        if ``on_state_change`` is given and none is alive. A container that
        exists but is not running is force-removed and recreated.

        Args:
            bot_id: Bot identifier; selects the container name and workspace dir.
            image_tag: Image to run; defaults to ``self.base_image``.
            env_vars: Environment variables for a newly created container.
            on_state_change: Optional ``callback(bot_id, packet)`` fed from the
                container's log stream on a daemon thread.
        """
        if not self.client:
            print("❌ 错误: Docker 客户端未初始化,无法启动机器人。")
            return False

        image = image_tag or self.base_image
        if not self.has_image(image):
            print(f"❌ 错误: 镜像不存在: {image}")
            return False

        bot_workspace = os.path.join(self.host_data_root, bot_id, ".nanobot")
        container_name = self._container_name(bot_id)

        try:
            # Inside the try so a filesystem error is reported like any other
            # start failure instead of escaping a bool-returning method.
            os.makedirs(bot_workspace, exist_ok=True)

            try:
                existing = self.client.containers.get(container_name)
                existing.reload()
                if existing.status == "running":
                    if on_state_change:
                        self.ensure_monitor(bot_id, on_state_change)
                    return True
                # A non-running container with this name would make `run` fail.
                existing.remove(force=True)
            except docker.errors.NotFound:
                pass

            container = self.client.containers.run(
                image=image,
                name=container_name,
                detach=True,
                stdin_open=True,
                tty=True,
                environment=env_vars or {},
                volumes={
                    bot_workspace: {"bind": "/root/.nanobot", "mode": "rw"},
                },
                mem_limit="1g",
                # Roughly one CPU, assuming Docker's default 100 ms CFS period.
                cpu_quota=100000,
                network_mode="bridge",
            )

            if on_state_change:
                self._start_monitor(bot_id, container, on_state_change)
            return True
        except Exception as e:
            print(f"[DockerManager] Error starting bot {bot_id}: {e}")
            return False

    def ensure_monitor(self, bot_id: str, on_state_change: Callable[[str, dict], None]) -> bool:
        """Ensure a live log monitor exists for the bot's running container.

        Returns True if a monitor thread is alive afterwards. Returns False if
        Docker is unavailable, the container is missing or not running, or an
        error occurs.
        """
        if not self.client:
            return False
        existing = self.active_monitors.get(bot_id)
        if existing and existing.is_alive():
            return True
        try:
            container = self.client.containers.get(self._container_name(bot_id))
            container.reload()
            if container.status != "running":
                return False
            self._start_monitor(bot_id, container, on_state_change)
            return True
        except docker.errors.NotFound:
            return False
        except Exception as e:
            print(f"[DockerManager] Error ensuring monitor for {bot_id}: {e}")
            return False

    def _start_monitor(self, bot_id: str, container, on_state_change: Callable[[str, dict], None]) -> threading.Thread:
        """Start and register a daemon thread that streams ``container`` logs to the callback."""
        thread = threading.Thread(
            target=self._monitor_container_logs,
            args=(bot_id, container, on_state_change),
            daemon=True,
        )
        thread.start()
        self.active_monitors[bot_id] = thread
        return thread

    def stop_bot(self, bot_id: str) -> bool:
        """Stop (waiting up to 5 s) and remove the bot's container.

        Returns True on success. Returns False if Docker is unavailable, the
        container does not exist, or an error occurs.
        """
        if not self.client:
            return False
        try:
            container = self.client.containers.get(self._container_name(bot_id))
            container.stop(timeout=5)
            container.remove()
            self.active_monitors.pop(bot_id, None)
            return True
        except docker.errors.NotFound:
            self.active_monitors.pop(bot_id, None)
            return False
        except Exception as e:
            print(f"[DockerManager] Error stopping bot {bot_id}: {e}")
            return False

    def send_command(self, bot_id: str, command: str, media: Optional[List[str]] = None) -> bool:
        """Send a message (plus optional media paths) to the bot's dashboard endpoint.

        Tries curl-inside-the-container up to 3 times with a short backoff, then
        falls back to a direct HTTP call from the host. Returns True once any
        path reports success.
        """
        if not self.client:
            return False

        media_paths = self._normalize_media(media)

        # Primary path on Docker Desktop/Mac: execute curl inside the container namespace.
        # NOTE(review): a curl timeout can fire after the bot already accepted the
        # message, so retries / the fallback may deliver it twice — confirm the
        # endpoint tolerates duplicates.
        attempts = 3
        for attempt in range(attempts):
            if self._send_command_via_exec(bot_id, command, media_paths):
                return True
            if attempt < attempts - 1:  # no pointless wait before the fallback
                time.sleep(0.25 * (attempt + 1))

        # Secondary path for environments where the host can reach the container IP.
        return self._send_command_via_host_http(bot_id, command, media_paths)

    def _send_command_via_exec(self, bot_id: str, command: str, media: Optional[List[str]] = None) -> bool:
        """POST the message to 127.0.0.1:9000/chat by running curl inside the container.

        The JSON payload is passed through an environment variable, so the
        message text never has to be shell-quoted. Success means curl exits 0
        and the body is not a JSON object whose ``status`` is something other
        than ``"ok"`` (non-JSON or non-object bodies count as success).
        """
        try:
            container = self.client.containers.get(self._container_name(bot_id))
            payload_json = json.dumps({"message": command, "media": media or []}, ensure_ascii=False)
            cmd = (
                "curl -sS --fail --max-time 6 "
                "-X POST -H 'Content-Type: application/json' "
                "-d \"$DASHBOARD_PAYLOAD\" http://127.0.0.1:9000/chat"
            )
            result = container.exec_run(["/bin/sh", "-c", cmd], environment={"DASHBOARD_PAYLOAD": payload_json})
            if isinstance(result.output, (bytes, bytearray)):
                output = result.output.decode("utf-8", errors="ignore")
            else:
                output = str(result.output)

            if result.exit_code != 0:
                print(f"[DockerManager] exec curl failed for {bot_id}: exit={result.exit_code}, out={output[:300]}")
                return False

            if output.strip():
                try:
                    parsed = json.loads(output)
                except ValueError:
                    parsed = None  # Non-JSON but zero exit still treated as success.
                if isinstance(parsed, dict) and str(parsed.get("status", "")).lower() != "ok":
                    print(f"[DockerManager] exec curl non-ok response for {bot_id}: {output[:300]}")
                    return False
            return True
        except Exception as e:
            print(f"[DockerManager] exec curl exception for {bot_id}: {e}")
            return False

    def _send_command_via_host_http(self, bot_id: str, command: str, media: Optional[List[str]] = None) -> bool:
        """POST the message from the host to the container's bridge IP on port 9000.

        Falls back to 127.0.0.1 when the container reports no IP address.
        Unlike the exec path, any HTTP 200 counts as success.
        """
        try:
            import httpx

            payload = {"message": command, "media": media or []}
            container = self.client.containers.get(self._container_name(bot_id))
            ip_address = container.attrs["NetworkSettings"]["IPAddress"] or "127.0.0.1"
            target_url = f"http://{ip_address}:9000/chat"

            with httpx.Client(timeout=4.0) as client:
                resp = client.post(target_url, json=payload)
                if resp.status_code == 200:
                    return True
                print(f"[DockerManager] host HTTP failed: {resp.status_code} - {resp.text[:300]}")
                return False
        except Exception as e:
            print(f"[DockerManager] host HTTP exception: {e}")
            return False

    def get_recent_logs(self, bot_id: str, tail: int = 300) -> List[str]:
        """Return up to the last ``tail`` (at least 1) non-blank log lines of the bot's container.

        Any error is printed and yields an empty list.
        """
        if not self.client:
            return []
        try:
            container = self.client.containers.get(self._container_name(bot_id))
            raw = container.logs(tail=max(1, int(tail)))
            text = raw.decode("utf-8", errors="ignore")
            return [line for line in text.splitlines() if line.strip()]
        except Exception as e:
            print(f"[DockerManager] Error reading logs for {bot_id}: {e}")
            return []

    def _monitor_container_logs(self, bot_id: str, container, callback: Callable[[str, dict], None]):
        """Follow the container's log stream and feed parsed packets to ``callback``.

        Runs until the stream ends or fails. Bytes are decoded incrementally as
        UTF-8 (a character split across chunks is reassembled) and split into
        lines. A failing callback is logged without stopping the monitor.
        """

        def emit(packet: dict) -> None:
            # Isolate subscriber failures: one bad callback must not end monitoring.
            try:
                callback(bot_id, packet)
            except Exception as err:
                print(f"[DockerManager] State callback failed for {bot_id}: {err}")

        try:
            buffer = ""
            capture: Optional[str] = None
            decoder = codecs.getincrementaldecoder("utf-8")("replace")
            # Only tail new logs from "now" to avoid replaying historical stdout
            # (which would repopulate cleared chat messages from old dashboard packets).
            since_ts = int(time.time())
            for chunk in container.logs(stream=True, follow=True, since=since_ts):
                text = decoder.decode(chunk) if isinstance(chunk, bytes) else str(chunk)
                if not text:
                    continue
                # Everything after the last "\n" is an incomplete line; keep it buffered.
                *lines, buffer = (buffer + text).split("\n")
                for line in lines:
                    capture = self._handle_log_line(bot_id, line, capture, emit)

            buffer += decoder.decode(b"", final=True)
            # Flush the unterminated last line through the same path, so a packet
            # whose end marker arrives without a trailing newline is not lost.
            self._handle_log_line(bot_id, buffer, capture, emit)
        except Exception as e:
            print(f"[DockerManager] Log stream closed for {bot_id}: {e}")

    def _handle_log_line(
        self,
        bot_id: str,
        line: str,
        capture: Optional[str],
        emit: Callable[[dict], None],
    ) -> Optional[str]:
        """Process one raw log line and return the updated packet-capture buffer.

        ``capture`` holds the lines of a multi-line dashboard packet whose end
        marker has not arrived yet, or ``None`` when no packet is open. Every
        non-blank line is emitted as ``RAW_LOG`` after any parsed packet.
        """
        normalized = line.strip()
        if not normalized:
            return capture
        start, end = self._DASHBOARD_START, self._DASHBOARD_END

        if capture is not None:
            if start in normalized and end not in normalized:
                # A new packet began before the open one ended: the open one was
                # truncated and can never parse, so restart from this line.
                capture = normalized
            else:
                capture = f"{capture}\n{normalized}"
                if end in normalized:
                    packet = self._parse_dashboard_packet(capture)
                    if packet:
                        emit(packet)
                    capture = None
                elif len(capture) > self._MAX_DASHBOARD_CAPTURE_CHARS:
                    print(f"[DockerManager] Dropping unterminated dashboard packet for {bot_id}")
                    capture = None
            emit({"type": "RAW_LOG", "text": normalized})
            return capture

        if start in normalized and end not in normalized:
            # Start of a multi-line packet: buffer until the end marker shows up.
            emit({"type": "RAW_LOG", "text": normalized})
            return normalized

        packet = self._parse_log_line(normalized)
        if packet:
            emit(packet)
        emit({"type": "RAW_LOG", "text": normalized})
        return None

    def _parse_dashboard_packet(self, line: str):
        """Decode the JSON between the dashboard start/end markers into a state packet.

        Returns an ``AGENT_STATE``, ``ASSISTANT_MESSAGE`` or ``BUS_EVENT`` dict
        tagged with channel ``"dashboard"``, or ``None`` when the markers are
        missing, the JSON is malformed, or there is nothing to report.
        """
        start, end = self._DASHBOARD_START, self._DASHBOARD_END
        if start not in line or end not in line:
            return None
        try:
            raw_json = line.split(start, 1)[1].split(end, 1)[0].strip()
            data = json.loads(raw_json)
            event_type = str(data.get("type", "")).upper()
            content = str(data.get("content") or data.get("text") or "").strip()
            media = self._normalize_media(data.get("media"))
            is_progress = bool(data.get("is_progress", False))
            is_tool = bool(data.get("is_tool", False))

            if event_type == "AGENT_STATE":
                payload = data.get("payload") or {}
                state = str(payload.get("state") or data.get("state") or ("TOOL_CALL" if is_tool else "THINKING"))
                action_msg = str(payload.get("action_msg") or payload.get("msg") or content)
                return {
                    "type": "AGENT_STATE",
                    "channel": "dashboard",
                    "payload": {"state": state, "action_msg": action_msg},
                }

            if event_type == "ASSISTANT_MESSAGE":
                if content or media:
                    return {"type": "ASSISTANT_MESSAGE", "channel": "dashboard", "text": content, "media": media}
                return None

            if event_type == "BUS_EVENT" or is_progress:
                return {
                    "type": "BUS_EVENT",
                    "channel": "dashboard",
                    "content": content,
                    "media": media,
                    "is_progress": is_progress,
                    "is_tool": is_tool,
                }

            # Untyped packets that still carry content are shown as assistant messages.
            if content or media:
                return {
                    "type": "ASSISTANT_MESSAGE",
                    "channel": "dashboard",
                    "text": content,
                    "media": media,
                }
        except Exception:
            # Best effort: malformed JSON or unexpected field types drop the packet.
            return None
        return None

    def _parse_log_line(self, line: str):
        """Map one log line to a state packet, or return ``None`` if nothing matches.

        Checked in order: an embedded single-line dashboard packet;
        ``Processing message from <channel>:...: <msg>`` (THINKING) and
        ``Response to <channel>:...: <msg>`` (SUCCESS) with the channel
        lower-cased; ``tool call: <msg>`` (TOOL_CALL); and finally any line
        containing "error" or "traceback" (ERROR).
        """
        # 1. Structured data (preferred: taken directly from the bot's bus).
        if self._DASHBOARD_START in line:
            packet = self._parse_dashboard_packet(line)
            if packet:
                return packet

        # 2. Per-channel runtime logs (used by the status side panel).
        for pattern, state in ((self._PROCESSING_RE, "THINKING"), (self._RESPONSE_RE, "SUCCESS")):
            match = pattern.search(line)
            if match:
                return {
                    "type": "AGENT_STATE",
                    "channel": match.group(1).strip().lower(),
                    "payload": {
                        "state": state,
                        "action_msg": match.group(2).strip()[: self._MAX_ACTION_MSG_CHARS],
                    },
                }

        # 3. Fallback: heuristics on ordinary log output.
        tool_call_match = self._TOOL_CALL_RE.search(line)
        if tool_call_match:
            return {
                "type": "AGENT_STATE",
                "payload": {
                    "state": "TOOL_CALL",
                    "action_msg": tool_call_match.group(1).strip()[: self._MAX_ACTION_MSG_CHARS],
                },
            }

        lower = line.lower()
        # Plain substring test: any line mentioning these words is flagged.
        if "error" in lower or "traceback" in lower:
            return {
                "type": "AGENT_STATE",
                "payload": {"state": "ERROR", "action_msg": "执行异常,请检查日志"},
            }

        return None