dashboard-nanobot/backend/main.py

2369 lines
88 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import asyncio
import json
import mimetypes
import os
import re
import shutil
import tempfile
import zipfile
from datetime import datetime
from typing import Any, Dict, List, Optional
import httpx
from pydantic import BaseModel
from fastapi import Depends, FastAPI, File, HTTPException, UploadFile, WebSocket, WebSocketDisconnect
from fastapi.responses import FileResponse
from fastapi.middleware.cors import CORSMiddleware
from sqlmodel import Session, select
from core.config_manager import BotConfigManager
from core.database import engine, get_session, init_database
from core.docker_manager import BotDockerManager
from core.settings import (
BOTS_WORKSPACE_ROOT,
DATA_ROOT,
DATABASE_ECHO,
DATABASE_ENGINE,
DATABASE_URL_DISPLAY,
DEFAULT_AGENTS_MD,
DEFAULT_IDENTITY_MD,
DEFAULT_SOUL_MD,
DEFAULT_TOOLS_MD,
DEFAULT_USER_MD,
PROJECT_ROOT,
UPLOAD_MAX_MB,
)
from models.bot import BotInstance, BotMessage, NanobotImage
# Application object plus the module-level singletons used by the code below.
app = FastAPI(title="Dashboard Nanobot API")
# CORS is configured to accept every origin, method and header.
# NOTE(review): wide-open CORS - confirm this is intended outside local development.
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
# Create the storage roots (no error if they already exist) before building the managers.
os.makedirs(BOTS_WORKSPACE_ROOT, exist_ok=True)
os.makedirs(DATA_ROOT, exist_ok=True)
# Both managers are handed the bots workspace root as their host data root.
docker_manager = BotDockerManager(host_data_root=BOTS_WORKSPACE_ROOT)
config_manager = BotConfigManager(host_data_root=BOTS_WORKSPACE_ROOT)
# Request body describing one channel to attach to a bot.
# (Documented with comments, not a docstring, so the model definition itself
# carries nothing beyond its fields.)
class ChannelConfigRequest(BaseModel):
    # The only required field; every other field has a default.
    channel_type: str
    external_app_id: Optional[str] = None
    app_secret: Optional[str] = None
    internal_port: Optional[int] = None
    # Channels are active unless the caller says otherwise.
    is_active: bool = True
    # Free-form, channel-specific settings.
    extra_config: Optional[Dict[str, Any]] = None
# Request body for editing an existing channel; every field defaults to None.
# NOTE(review): presumably None means "leave unchanged" - the handler that
# consumes this model is not visible here, confirm against it.
class ChannelConfigUpdateRequest(BaseModel):
    channel_type: Optional[str] = None
    external_app_id: Optional[str] = None
    app_secret: Optional[str] = None
    internal_port: Optional[int] = None
    is_active: Optional[bool] = None
    # Free-form, channel-specific settings.
    extra_config: Optional[Dict[str, Any]] = None
# Request body for creating a bot.
class BotCreateRequest(BaseModel):
    # --- required identity, model and image selection ---
    id: str
    name: str
    llm_provider: str
    llm_model: str
    api_key: str
    image_tag: str

    # --- optional prompt / endpoint ---
    system_prompt: Optional[str] = None
    api_base: Optional[str] = None

    # --- sampling defaults ---
    temperature: float = 0.2
    top_p: float = 1.0
    max_tokens: int = 8192

    # --- resource limit defaults ---
    cpu_cores: float = 1.0
    memory_mb: int = 1024
    storage_gb: int = 10

    # --- optional workspace markdown documents ---
    soul_md: Optional[str] = None
    agents_md: Optional[str] = None
    user_md: Optional[str] = None
    tools_md: Optional[str] = None

    # --- optional tool / environment configuration ---
    tools_config: Optional[Dict[str, Any]] = None
    env_params: Optional[Dict[str, str]] = None
    identity_md: Optional[str] = None

    # --- optional channels and delivery flags ---
    channels: Optional[List[ChannelConfigRequest]] = None
    send_progress: Optional[bool] = None
    send_tool_hints: Optional[bool] = None
# Request body for editing a bot; every field is optional and defaults to None.
# NOTE(review): presumably None means "keep the current value" - the update
# handler is not visible here, confirm against it.
class BotUpdateRequest(BaseModel):
    # --- identity, model and image selection ---
    name: Optional[str] = None
    llm_provider: Optional[str] = None
    llm_model: Optional[str] = None
    api_key: Optional[str] = None
    api_base: Optional[str] = None
    image_tag: Optional[str] = None
    system_prompt: Optional[str] = None

    # --- sampling ---
    temperature: Optional[float] = None
    top_p: Optional[float] = None
    max_tokens: Optional[int] = None

    # --- resource limits ---
    cpu_cores: Optional[float] = None
    memory_mb: Optional[int] = None
    storage_gb: Optional[int] = None

    # --- workspace markdown documents ---
    soul_md: Optional[str] = None
    agents_md: Optional[str] = None
    user_md: Optional[str] = None
    tools_md: Optional[str] = None

    # --- tool / environment configuration ---
    tools_config: Optional[Dict[str, Any]] = None
    env_params: Optional[Dict[str, str]] = None
    identity_md: Optional[str] = None

    # --- delivery flags ---
    send_progress: Optional[bool] = None
    send_tool_hints: Optional[bool] = None
# Request body carrying only a bot's tools configuration.
class BotToolsConfigUpdateRequest(BaseModel):
    # Optional; defaults to None when the caller sends nothing.
    tools_config: Optional[Dict[str, Any]] = None
# Request body carrying only a bot's environment parameters.
class BotEnvParamsUpdateRequest(BaseModel):
    # Optional string-to-string mapping; defaults to None.
    env_params: Optional[Dict[str, str]] = None
# Request body for a command sent to a bot: optional text plus optional attachments.
class CommandRequest(BaseModel):
    command: Optional[str] = None
    # List of attachment strings.
    # NOTE(review): presumably workspace-relative paths - confirm against the handler.
    attachments: Optional[List[str]] = None
def _normalize_packet_channel(packet: Dict[str, Any]) -> str:
raw = str(packet.get("channel") or packet.get("source") or "").strip().lower()
if raw in {"dashboard", "dashboard_channel", "dashboard-channel"}:
return "dashboard"
return raw
def _normalize_media_item(bot_id: str, value: Any) -> str:
raw = str(value or "").strip().replace("\\", "/")
if not raw:
return ""
if raw.startswith("/root/.nanobot/workspace/"):
return raw[len("/root/.nanobot/workspace/") :].lstrip("/")
root = _workspace_root(bot_id)
if os.path.isabs(raw):
try:
if os.path.commonpath([root, raw]) == root:
return os.path.relpath(raw, root).replace("\\", "/")
except Exception:
pass
return raw.lstrip("/")
def _normalize_media_list(raw: Any, bot_id: str) -> List[str]:
if not isinstance(raw, list):
return []
rows: List[str] = []
for v in raw:
s = _normalize_media_item(bot_id, v)
if s:
rows.append(s)
return rows
def _persist_runtime_packet(bot_id: str, packet: Dict[str, Any]):
    """Persist a dashboard-channel runtime packet to the bot row and chat history.

    Only ``AGENT_STATE``, ``ASSISTANT_MESSAGE``, ``USER_COMMAND`` and
    ``BUS_EVENT`` packets whose normalized channel is ``"dashboard"`` are
    handled.  Other packets, and packets for a bot id that is not in the
    database, are ignored.

    Args:
        bot_id: Primary key of the ``BotInstance`` the packet belongs to.
        packet: Decoded packet dictionary.
    """
    packet_type = str(packet.get("type", "")).upper()
    if packet_type not in {"AGENT_STATE", "ASSISTANT_MESSAGE", "USER_COMMAND", "BUS_EVENT"}:
        return
    if _normalize_packet_channel(packet) != "dashboard":
        return

    def _chat_message(role: str, text: str, media: List[str]):
        # media_json is left as None when there are no attachments.
        return BotMessage(
            bot_id=bot_id,
            role=role,
            text=text,
            media_json=json.dumps(media, ensure_ascii=False) if media else None,
        )

    with Session(engine) as session:
        bot = session.get(BotInstance, bot_id)
        if not bot:
            return

        if packet_type == "AGENT_STATE":
            payload = packet.get("payload")
            if not isinstance(payload, dict):
                # A malformed (non-mapping) payload carries no usable state;
                # treat it like a missing one instead of failing on .get().
                payload = {}
            state = str(payload.get("state") or "").strip()
            action = str(payload.get("action_msg") or payload.get("msg") or "").strip()
            if state:
                bot.current_state = state
            if action:
                bot.last_action = action[:4000]
        elif packet_type == "ASSISTANT_MESSAGE":
            bot.current_state = "IDLE"
            text_msg = str(packet.get("text") or "").strip()
            media_list = _normalize_media_list(packet.get("media"), bot_id)
            if text_msg or media_list:
                if text_msg:
                    # Collapse whitespace runs so last_action stays a single line.
                    bot.last_action = " ".join(text_msg.split())[:4000]
                session.add(_chat_message("assistant", text_msg, media_list))
        elif packet_type == "USER_COMMAND":
            text_msg = str(packet.get("text") or "").strip()
            media_list = _normalize_media_list(packet.get("media"), bot_id)
            if text_msg or media_list:
                session.add(_chat_message("user", text_msg, media_list))
        elif packet_type == "BUS_EVENT":
            # Dashboard channel emits BUS_EVENT for both progress and final replies.
            # Persist only non-progress events to keep durable chat history clean.
            if not bool(packet.get("is_progress")):
                text_msg = str(packet.get("content") or packet.get("text") or "").strip()
                media_list = _normalize_media_list(packet.get("media"), bot_id)
                if text_msg or media_list:
                    bot.current_state = "IDLE"
                    if text_msg:
                        bot.last_action = " ".join(text_msg.split())[:4000]
                    session.add(_chat_message("assistant", text_msg, media_list))

        # Every handled packet (progress events included) refreshes updated_at.
        bot.updated_at = datetime.utcnow()
        session.add(bot)
        session.commit()
class WSConnectionManager:
    """Tracks open WebSocket connections per bot id and broadcasts JSON to them."""

    def __init__(self):
        # bot id -> list of accepted sockets for that bot
        self.connections: Dict[str, List[WebSocket]] = {}

    async def connect(self, bot_id: str, websocket: WebSocket):
        """Accept the socket and register it under ``bot_id``."""
        await websocket.accept()
        self.connections.setdefault(bot_id, []).append(websocket)

    def disconnect(self, bot_id: str, websocket: WebSocket):
        """Forget the socket; drop the bot's entry once its list is empty."""
        sockets = self.connections.get(bot_id)
        if sockets is None:
            return
        if websocket in sockets:
            sockets.remove(websocket)
        if not sockets:
            del self.connections[bot_id]

    async def broadcast(self, bot_id: str, data: Dict[str, Any]):
        """Send ``data`` as JSON to every socket of ``bot_id``.

        A socket whose send raises is unregistered; the remaining sockets are
        still served.
        """
        # Iterate over a snapshot because disconnect() mutates the live list.
        for socket in tuple(self.connections.get(bot_id, [])):
            try:
                await socket.send_json(data)
            except Exception:
                self.disconnect(bot_id, socket)
# Module-level WebSocket connection registry; docker_callback broadcasts packets through it.
manager = WSConnectionManager()
def docker_callback(bot_id: str, packet: Dict[str, Any]):
    """Persist a runtime packet, then schedule its broadcast on the main event loop.

    The broadcast is skipped when no loop has been stored on ``app.state`` yet
    or the stored loop is not running.
    """
    _persist_runtime_packet(bot_id, packet)
    main_loop = getattr(app.state, "main_loop", None)
    if main_loop and main_loop.is_running():
        # Hand the coroutine to the loop captured at startup.
        asyncio.run_coroutine_threadsafe(manager.broadcast(bot_id, packet), main_loop)
@app.on_event("startup")
async def on_startup():
    # Remember the running loop so docker_callback can schedule broadcasts on it.
    app.state.main_loop = asyncio.get_running_loop()

    print(f"📁 项目根目录: {PROJECT_ROOT}")
    print(f"🗄️ 数据库引擎: {DATABASE_ENGINE} (echo={DATABASE_ECHO})")
    print(f"📁 数据库连接: {DATABASE_URL_DISPLAY}")

    init_database()
    with Session(engine) as session:
        # Run the resources-store migration for every known bot.
        all_bots = session.exec(select(BotInstance)).all()
        for bot in all_bots:
            _migrate_bot_resources_store(bot.id)

        # Make sure every bot recorded as RUNNING has a monitor attached.
        running_query = select(BotInstance).where(BotInstance.docker_status == "RUNNING")
        for bot in session.exec(running_query).all():
            docker_manager.ensure_monitor(bot.id, docker_callback)
def _provider_defaults(provider: str) -> tuple[str, str]:
p = provider.lower().strip()
if p in {"openrouter"}:
return "openrouter", "https://openrouter.ai/api/v1"
if p in {"dashscope", "aliyun", "qwen", "aliyun-qwen"}:
return "dashscope", "https://dashscope.aliyuncs.com/compatible-mode/v1"
if p in {"kimi", "moonshot"}:
return "kimi", "https://api.moonshot.cn/v1"
if p in {"minimax"}:
return "minimax", "https://api.minimax.chat/v1"
return p, ""
@app.get("/api/system/defaults")
def get_system_defaults():
    # Returns the default workspace markdown templates and the upload size limit.
    templates = {
        "soul_md": DEFAULT_SOUL_MD,
        "agents_md": DEFAULT_AGENTS_MD,
        "user_md": DEFAULT_USER_MD,
        "tools_md": DEFAULT_TOOLS_MD,
        "identity_md": DEFAULT_IDENTITY_MD,
    }
    limits = {"upload_max_mb": UPLOAD_MAX_MB}
    return {"templates": templates, "limits": limits}
@app.get("/api/health")
def get_health():
    # Health probe: run a one-row query; any failure is reported as HTTP 503.
    try:
        with Session(engine) as session:
            probe = select(BotInstance).limit(1)
            session.exec(probe).first()
    except Exception as e:
        raise HTTPException(status_code=503, detail=f"database check failed: {e}")
    return {"status": "ok", "database": DATABASE_ENGINE}
def _config_json_path(bot_id: str) -> str:
    """Return the path of ``config.json`` inside the bot's data root."""
    data_root = _bot_data_root(bot_id)
    return os.path.join(data_root, "config.json")
def _read_bot_config(bot_id: str) -> Dict[str, Any]:
    """Load the bot's ``config.json`` as a dict.

    Returns ``{}`` when the file is missing, cannot be read or parsed, or
    does not contain a JSON object at the top level.
    """
    config_path = _config_json_path(bot_id)
    if not os.path.isfile(config_path):
        return {}
    try:
        with open(config_path, "r", encoding="utf-8") as handle:
            loaded = json.load(handle)
    except Exception:
        # Best effort: an unreadable config is treated as an empty one.
        return {}
    if isinstance(loaded, dict):
        return loaded
    return {}
def _write_bot_config(bot_id: str, config_data: Dict[str, Any]) -> None:
    """Write ``config_data`` to the bot's ``config.json``.

    The JSON is written to a sibling ``.tmp`` file first and then moved over
    the real file with ``os.replace``.
    """
    destination = _config_json_path(bot_id)
    parent_dir = os.path.dirname(destination)
    os.makedirs(parent_dir, exist_ok=True)
    staging = destination + ".tmp"
    with open(staging, "w", encoding="utf-8") as sink:
        json.dump(config_data, sink, ensure_ascii=False, indent=2)
    os.replace(staging, destination)
def _resources_json_path(bot_id: str) -> str:
    """Return the path of ``resources.json`` inside the bot's data root."""
    data_root = _bot_data_root(bot_id)
    return os.path.join(data_root, "resources.json")
def _write_bot_resources(bot_id: str, cpu_cores: Any, memory_mb: Any, storage_gb: Any) -> None:
    """Normalize the resource limits and store them in the bot's ``resources.json``.

    The file uses the camel-case keys ``cpuCores``, ``memoryMB`` and
    ``storageGB`` and is written through a ``.tmp`` file plus ``os.replace``.
    """
    limits = _normalize_resource_limits(cpu_cores, memory_mb, storage_gb)
    document = {
        "cpuCores": limits["cpu_cores"],
        "memoryMB": limits["memory_mb"],
        "storageGB": limits["storage_gb"],
    }
    destination = _resources_json_path(bot_id)
    os.makedirs(os.path.dirname(destination), exist_ok=True)
    staging = destination + ".tmp"
    with open(staging, "w", encoding="utf-8") as sink:
        json.dump(document, sink, ensure_ascii=False, indent=2)
    os.replace(staging, destination)
def _read_bot_resources(bot_id: str, config_data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Read the bot's resource limits and return them normalized.

    Values come from ``resources.json`` first.  Any value still missing is
    looked up in the legacy ``runtime.resources`` section of the bot config
    (``config_data`` when it is a dict, otherwise the config read from disk).
    Both camel-case and snake-case keys are accepted.
    """

    def pick(mapping: Dict[str, Any], camel: str, snake: str) -> Any:
        # Camel-case key wins; the snake-case key is the fallback.
        return mapping.get(camel, mapping.get(snake))

    cpu_raw: Any = None
    memory_raw: Any = None
    storage_raw: Any = None

    resources_path = _resources_json_path(bot_id)
    if os.path.isfile(resources_path):
        try:
            with open(resources_path, "r", encoding="utf-8") as handle:
                stored = json.load(handle)
            if isinstance(stored, dict):
                cpu_raw = pick(stored, "cpuCores", "cpu_cores")
                memory_raw = pick(stored, "memoryMB", "memory_mb")
                storage_raw = pick(stored, "storageGB", "storage_gb")
        except Exception:
            # An unreadable file is treated as if it were absent.
            pass

    # Backward compatibility: read old runtime.resources only if new file is missing/incomplete.
    if any(value is None for value in (cpu_raw, memory_raw, storage_raw)):
        cfg = config_data if isinstance(config_data, dict) else _read_bot_config(bot_id)
        runtime_cfg = cfg.get("runtime")
        legacy = runtime_cfg.get("resources") if isinstance(runtime_cfg, dict) else None
        if isinstance(legacy, dict):
            if cpu_raw is None:
                cpu_raw = pick(legacy, "cpuCores", "cpu_cores")
            if memory_raw is None:
                memory_raw = pick(legacy, "memoryMB", "memory_mb")
            if storage_raw is None:
                storage_raw = pick(legacy, "storageGB", "storage_gb")

    return _normalize_resource_limits(cpu_raw, memory_raw, storage_raw)
def _migrate_bot_resources_store(bot_id: str) -> None:
    """Move legacy ``runtime.resources`` settings out of ``config.json``.

    When ``resources.json`` does not exist yet it is created from the legacy
    values (missing values are filled in by the resource normalizer).  The
    legacy ``resources`` entry, and the ``runtime`` section if that leaves it
    empty, are then removed from the config, which is rewritten.
    """
    config_data = _read_bot_config(bot_id)
    runtime_cfg = config_data.get("runtime")
    has_runtime = isinstance(runtime_cfg, dict)

    legacy = runtime_cfg.get("resources") if has_runtime else None
    resources_raw: Dict[str, Any] = legacy if isinstance(legacy, dict) else {}

    if not os.path.isfile(_resources_json_path(bot_id)):
        _write_bot_resources(
            bot_id,
            resources_raw.get("cpuCores", resources_raw.get("cpu_cores")),
            resources_raw.get("memoryMB", resources_raw.get("memory_mb")),
            resources_raw.get("storageGB", resources_raw.get("storage_gb")),
        )

    if not has_runtime or "resources" not in runtime_cfg:
        return
    runtime_cfg.pop("resources", None)
    if not runtime_cfg:
        config_data.pop("runtime", None)
    _write_bot_config(bot_id, config_data)
def _normalize_channel_extra(raw: Any) -> Dict[str, Any]:
if not isinstance(raw, dict):
return {}
return raw
def _read_global_delivery_flags(channels_cfg: Any) -> tuple[bool, bool]:
if not isinstance(channels_cfg, dict):
return False, False
send_progress = channels_cfg.get("sendProgress")
send_tool_hints = channels_cfg.get("sendToolHints")
dashboard_cfg = channels_cfg.get("dashboard")
if isinstance(dashboard_cfg, dict):
if send_progress is None and "sendProgress" in dashboard_cfg:
send_progress = dashboard_cfg.get("sendProgress")
if send_tool_hints is None and "sendToolHints" in dashboard_cfg:
send_tool_hints = dashboard_cfg.get("sendToolHints")
return bool(send_progress), bool(send_tool_hints)
def _channel_cfg_to_api_dict(bot_id: str, ctype: str, cfg: Dict[str, Any]) -> Dict[str, Any]:
ctype = str(ctype or "").strip().lower()
enabled = bool(cfg.get("enabled", True))
port = max(1, min(int(cfg.get("port", 8080) or 8080), 65535))
extra: Dict[str, Any] = {}
external_app_id = ""
app_secret = ""
if ctype == "feishu":
external_app_id = str(cfg.get("appId") or "")
app_secret = str(cfg.get("appSecret") or "")
extra = {
"encryptKey": cfg.get("encryptKey", ""),
"verificationToken": cfg.get("verificationToken", ""),
"allowFrom": cfg.get("allowFrom", []),
}
elif ctype == "dingtalk":
external_app_id = str(cfg.get("clientId") or "")
app_secret = str(cfg.get("clientSecret") or "")
extra = {"allowFrom": cfg.get("allowFrom", [])}
elif ctype == "telegram":
app_secret = str(cfg.get("token") or "")
extra = {
"proxy": cfg.get("proxy", ""),
"replyToMessage": bool(cfg.get("replyToMessage", False)),
"allowFrom": cfg.get("allowFrom", []),
}
elif ctype == "slack":
external_app_id = str(cfg.get("botToken") or "")
app_secret = str(cfg.get("appToken") or "")
extra = {
"mode": cfg.get("mode", "socket"),
"replyInThread": bool(cfg.get("replyInThread", True)),
"groupPolicy": cfg.get("groupPolicy", "mention"),
"groupAllowFrom": cfg.get("groupAllowFrom", []),
"reactEmoji": cfg.get("reactEmoji", "eyes"),
}
elif ctype == "qq":
external_app_id = str(cfg.get("appId") or "")
app_secret = str(cfg.get("secret") or "")
extra = {"allowFrom": cfg.get("allowFrom", [])}
else:
external_app_id = str(
cfg.get("appId") or cfg.get("clientId") or cfg.get("botToken") or cfg.get("externalAppId") or ""
)
app_secret = str(
cfg.get("appSecret") or cfg.get("clientSecret") or cfg.get("secret") or cfg.get("token") or cfg.get("appToken") or ""
)
extra = {k: v for k, v in cfg.items() if k not in {"enabled", "port", "appId", "clientId", "botToken", "externalAppId", "appSecret", "clientSecret", "secret", "token", "appToken"}}
return {
"id": ctype,
"bot_id": bot_id,
"channel_type": ctype,
"external_app_id": external_app_id,
"app_secret": app_secret,
"internal_port": port,
"is_active": enabled,
"extra_config": extra,
"locked": ctype == "dashboard",
}
def _channel_api_to_cfg(row: Dict[str, Any]) -> Dict[str, Any]:
    """Convert an API channel row into the section stored in the bot config.

    The known channel types (``feishu``, ``dingtalk``, ``telegram``,
    ``slack``, ``qq``) each produce their own key layout.  Any other type
    keeps its extra config and gets ``enabled``, ``appId``, ``appSecret`` and
    ``port`` merged on top.

    Args:
        row: Channel row using the API field names (``channel_type``,
            ``external_app_id``, ``app_secret``, ``internal_port``,
            ``is_active``, ``extra_config``).
    """
    ctype = str(row.get("channel_type") or "").strip().lower()
    enabled = bool(row.get("is_active", True))
    extra = _normalize_channel_extra(row.get("extra_config"))
    external_app_id = str(row.get("external_app_id") or "")
    app_secret = str(row.get("app_secret") or "")

    # The port is parsed up front for every channel type, so a malformed value
    # must fall back to the default rather than fail conversions that never
    # use it.
    try:
        port = int(row.get("internal_port") or 8080)
    except (TypeError, ValueError, OverflowError):
        port = 8080
    port = max(1, min(port, 65535))

    if ctype == "feishu":
        return {
            "enabled": enabled,
            "appId": external_app_id,
            "appSecret": app_secret,
            "encryptKey": extra.get("encryptKey", ""),
            "verificationToken": extra.get("verificationToken", ""),
            "allowFrom": extra.get("allowFrom", []),
        }
    if ctype == "dingtalk":
        return {
            "enabled": enabled,
            "clientId": external_app_id,
            "clientSecret": app_secret,
            "allowFrom": extra.get("allowFrom", []),
        }
    if ctype == "telegram":
        return {
            "enabled": enabled,
            "token": app_secret,
            "proxy": extra.get("proxy", ""),
            "replyToMessage": bool(extra.get("replyToMessage", False)),
            "allowFrom": extra.get("allowFrom", []),
        }
    if ctype == "slack":
        return {
            "enabled": enabled,
            "mode": extra.get("mode", "socket"),
            "botToken": external_app_id,
            "appToken": app_secret,
            "replyInThread": bool(extra.get("replyInThread", True)),
            "groupPolicy": extra.get("groupPolicy", "mention"),
            "groupAllowFrom": extra.get("groupAllowFrom", []),
            "reactEmoji": extra.get("reactEmoji", "eyes"),
        }
    if ctype == "qq":
        return {
            "enabled": enabled,
            "appId": external_app_id,
            "secret": app_secret,
            "allowFrom": extra.get("allowFrom", []),
        }

    # Unknown channel type: the explicit fields override same-named extras.
    merged = dict(extra)
    merged.update(
        {
            "enabled": enabled,
            "appId": external_app_id,
            "appSecret": app_secret,
            "port": port,
        }
    )
    return merged
def _get_bot_channels_from_config(bot: BotInstance) -> List[Dict[str, Any]]:
    """List the bot's channels as API rows, read from its config file.

    The first row is always a synthetic, locked ``dashboard`` channel that
    carries the global delivery flags.  It is followed by one row per dict
    section under ``channels``, skipping the ``sendProgress``,
    ``sendToolHints`` and ``dashboard`` keys.
    """
    stored = _read_bot_config(bot.id).get("channels")
    channels_cfg = stored if isinstance(stored, dict) else {}
    send_progress, send_tool_hints = _read_global_delivery_flags(channels_cfg)

    dashboard_row: Dict[str, Any] = {
        "id": "dashboard",
        "bot_id": bot.id,
        "channel_type": "dashboard",
        "external_app_id": f"dashboard-{bot.id}",
        "app_secret": "",
        "internal_port": 9000,
        "is_active": True,
        "extra_config": {
            "sendProgress": send_progress,
            "sendToolHints": send_tool_hints,
        },
        "locked": True,
    }

    reserved_keys = ("sendProgress", "sendToolHints", "dashboard")
    configured_rows = [
        _channel_cfg_to_api_dict(bot.id, ctype, cfg)
        for ctype, cfg in channels_cfg.items()
        if ctype not in reserved_keys and isinstance(cfg, dict)
    ]
    return [dashboard_row, *configured_rows]
def _normalize_initial_channels(bot_id: str, channels: Optional[List[ChannelConfigRequest]]) -> List[Dict[str, Any]]:
    """Turn the channels of a create request into API-shaped rows.

    Entries with an empty type, the ``dashboard`` type, or a type that has
    already been seen are skipped.  A blank external app id defaults to
    ``"<type>-<bot_id>"`` and the port is clamped to 1..65535 (default 8080).
    """
    normalized: List[Dict[str, Any]] = []
    seen_types: set[str] = set()
    for requested in channels or []:
        ctype = (requested.channel_type or "").strip().lower()
        skip = not ctype or ctype == "dashboard" or ctype in seen_types
        if skip:
            continue
        seen_types.add(ctype)

        app_id = (requested.external_app_id or "").strip() or f"{ctype}-{bot_id}"
        port = max(1, min(int(requested.internal_port or 8080), 65535))
        row = {
            "id": ctype,
            "bot_id": bot_id,
            "channel_type": ctype,
            "external_app_id": app_id,
            "app_secret": (requested.app_secret or "").strip(),
            "internal_port": port,
            "is_active": bool(requested.is_active),
            "extra_config": _normalize_channel_extra(requested.extra_config),
            "locked": False,
        }
        normalized.append(row)
    return normalized
def _parse_message_media(bot_id: str, media_raw: Optional[str]) -> List[str]:
if not media_raw:
return []
try:
parsed = json.loads(media_raw)
return _normalize_media_list(parsed, bot_id)
except Exception:
return []
def _default_tools_config() -> Dict[str, Any]:
return {
"web": {
"search": {
"apiKey": "",
"maxResults": 5,
}
}
}
def _normalize_tools_config(raw: Any) -> Dict[str, Any]:
    """Build a tools config from ``raw``, starting from the defaults.

    Only ``web.search`` is read.  The API key may be given as ``apiKey`` or
    ``api_key``; the result limit as ``maxResults`` or ``max_results``.  The
    limit falls back to 5 when it is not an integer and is clamped to 1..10.
    """
    cfg = _default_tools_config()
    if not isinstance(raw, dict):
        return cfg

    web_raw = raw.get("web")
    search_raw = web_raw.get("search") if isinstance(web_raw, dict) else None
    if not isinstance(search_raw, dict):
        return cfg

    api_key = str(search_raw.get("apiKey") or search_raw.get("api_key") or "").strip()
    limit_raw = search_raw.get("maxResults", search_raw.get("max_results", 5))
    try:
        limit = int(limit_raw)
    except Exception:
        limit = 5

    search_cfg = cfg["web"]["search"]
    search_cfg["apiKey"] = api_key
    search_cfg["maxResults"] = max(1, min(limit, 10))
    return cfg
def _parse_tools_config(raw: Optional[str]) -> Dict[str, Any]:
    """Parse a JSON string into a normalized tools config.

    An empty string or invalid JSON yields the default tools config.
    """
    if raw:
        try:
            decoded = json.loads(raw)
        except Exception:
            return _default_tools_config()
        return _normalize_tools_config(decoded)
    return _default_tools_config()
_ENV_KEY_RE = re.compile(r"^[A-Z_][A-Z0-9_]{0,127}$")
def _normalize_env_params(raw: Any) -> Dict[str, str]:
if not isinstance(raw, dict):
return {}
rows: Dict[str, str] = {}
for k, v in raw.items():
key = str(k or "").strip().upper()
if not key or not _ENV_KEY_RE.fullmatch(key):
continue
rows[key] = str(v or "").strip()
return rows
def _parse_env_params(raw: Any) -> Dict[str, str]:
    """Thin alias that delegates to ``_normalize_env_params``."""
    normalized = _normalize_env_params(raw)
    return normalized
def _safe_float(raw: Any, default: float) -> float:
try:
return float(raw)
except Exception:
return default
def _safe_int(raw: Any, default: int) -> int:
try:
return int(raw)
except Exception:
return default
def _normalize_resource_limits(cpu_cores: Any, memory_mb: Any, storage_gb: Any) -> Dict[str, Any]:
    """Coerce and clamp the three resource limits.

    Unparseable or negative values fall back to the defaults (1.0 cores,
    1024 MB, 10 GB).  Zero is passed through unchanged.  Other values are
    clamped to 0.1..16 cores, 256..65536 MB and 1..1024 GB.
    NOTE(review): zero presumably means "unlimited" - confirm with the
    consumer of these values.
    """
    cpu = _safe_float(cpu_cores, 1.0)
    mem = _safe_int(memory_mb, 1024)
    storage = _safe_int(storage_gb, 10)

    # Negative input is treated as invalid and replaced by the default.
    cpu = 1.0 if cpu < 0 else cpu
    mem = 1024 if mem < 0 else mem
    storage = 10 if storage < 0 else storage

    return {
        "cpu_cores": 0.0 if cpu == 0 else min(16.0, max(0.1, cpu)),
        "memory_mb": 0 if mem == 0 else min(65536, max(256, mem)),
        "storage_gb": 0 if storage == 0 else min(1024, max(1, storage)),
    }
def _read_workspace_md(bot_id: str, filename: str, default_value: str) -> str:
    """Return the stripped text of a file in the bot workspace.

    ``default_value`` is returned when the file does not exist or cannot be read.
    """
    md_path = os.path.join(_workspace_root(bot_id), filename)
    if os.path.isfile(md_path):
        try:
            with open(md_path, "r", encoding="utf-8") as handle:
                return handle.read().strip()
        except Exception:
            return default_value
    return default_value
def _read_bot_runtime_snapshot(bot: BotInstance) -> Dict[str, Any]:
    """Collect the bot's effective runtime settings from its on-disk files.

    Combines the first provider entry and the ``agents.defaults`` section of
    the bot config, the delivery flags, the tools config, the resource limits
    and the workspace markdown documents.  ``system_prompt`` mirrors
    ``soul_md``.
    """
    config_data = _read_bot_config(bot.id)

    # Only the first entry of "providers" is considered.
    provider_name = ""
    provider_cfg: Dict[str, Any] = {}
    providers_cfg = config_data.get("providers")
    if isinstance(providers_cfg, dict) and providers_cfg:
        first_name, first_cfg = next(iter(providers_cfg.items()))
        provider_name = str(first_name or "").strip()
        if isinstance(first_cfg, dict):
            provider_cfg = first_cfg

    agents_cfg = config_data.get("agents")
    defaults = agents_cfg.get("defaults") if isinstance(agents_cfg, dict) else None
    agents_defaults: Dict[str, Any] = defaults if isinstance(defaults, dict) else {}

    send_progress, send_tool_hints = _read_global_delivery_flags(config_data.get("channels"))
    tools_cfg = _normalize_tools_config(config_data.get("tools"))
    soul_md = _read_workspace_md(bot.id, "SOUL.md", DEFAULT_SOUL_MD)
    resources = _read_bot_resources(bot.id, config_data=config_data)

    return {
        "llm_provider": provider_name or "dashscope",
        "llm_model": str(agents_defaults.get("model") or ""),
        "api_key": str(provider_cfg.get("apiKey") or "").strip(),
        "api_base": str(provider_cfg.get("apiBase") or "").strip(),
        "temperature": _safe_float(agents_defaults.get("temperature"), 0.2),
        "top_p": _safe_float(agents_defaults.get("topP"), 1.0),
        "max_tokens": _safe_int(agents_defaults.get("maxTokens"), 8192),
        "cpu_cores": resources["cpu_cores"],
        "memory_mb": resources["memory_mb"],
        "storage_gb": resources["storage_gb"],
        "send_progress": send_progress,
        "send_tool_hints": send_tool_hints,
        "soul_md": soul_md,
        "agents_md": _read_workspace_md(bot.id, "AGENTS.md", DEFAULT_AGENTS_MD),
        "user_md": _read_workspace_md(bot.id, "USER.md", DEFAULT_USER_MD),
        "tools_md": _read_workspace_md(bot.id, "TOOLS.md", DEFAULT_TOOLS_MD),
        "identity_md": _read_workspace_md(bot.id, "IDENTITY.md", DEFAULT_IDENTITY_MD),
        "system_prompt": soul_md,
        "tools_config_json": json.dumps(tools_cfg, ensure_ascii=False),
        "tools_config": tools_cfg,
    }
def _serialize_bot(bot: BotInstance) -> Dict[str, Any]:
    """Build the API representation of a bot.

    Database columns are merged with the runtime snapshot read from disk.
    The API key is part of the snapshot but is not included in the result.
    """
    snapshot = _read_bot_runtime_snapshot(bot)

    def text(key: str) -> Any:
        # Missing or empty snapshot values are reported as "".
        return snapshot.get(key) or ""

    return {
        "id": bot.id,
        "name": bot.name,
        "avatar_model": "base",
        "avatar_skin": "blue_suit",
        "image_tag": bot.image_tag,
        "llm_provider": text("llm_provider"),
        "llm_model": text("llm_model"),
        "system_prompt": text("system_prompt"),
        "api_base": text("api_base"),
        "temperature": _safe_float(snapshot.get("temperature"), 0.2),
        "top_p": _safe_float(snapshot.get("top_p"), 1.0),
        "max_tokens": _safe_int(snapshot.get("max_tokens"), 8192),
        "cpu_cores": _safe_float(snapshot.get("cpu_cores"), 1.0),
        "memory_mb": _safe_int(snapshot.get("memory_mb"), 1024),
        "storage_gb": _safe_int(snapshot.get("storage_gb"), 10),
        "send_progress": bool(snapshot.get("send_progress")),
        "send_tool_hints": bool(snapshot.get("send_tool_hints")),
        "soul_md": text("soul_md"),
        "agents_md": text("agents_md"),
        "user_md": text("user_md"),
        "tools_md": text("tools_md"),
        "identity_md": text("identity_md"),
        "workspace_dir": bot.workspace_dir,
        "docker_status": bot.docker_status,
        "current_state": bot.current_state,
        "last_action": bot.last_action,
        "created_at": bot.created_at,
        "updated_at": bot.updated_at,
    }
def _sync_workspace_channels(
    session: Session,
    bot_id: str,
    channels_override: Optional[List[Dict[str, Any]]] = None,
    global_delivery_override: Optional[Dict[str, Any]] = None,
    runtime_overrides: Optional[Dict[str, Any]] = None,
) -> None:
    """Rebuild the bot's workspace from its current settings plus overrides.

    The current runtime snapshot is the baseline.  ``runtime_overrides``
    replaces individual settings, ``global_delivery_override`` may replace the
    ``sendProgress`` / ``sendToolHints`` flags, and ``channels_override``
    replaces the channel list read from the config.  The result is handed to
    ``config_manager.update_workspace`` and the resource limits are written to
    the resources store.  Nothing happens when the bot does not exist.
    """
    bot = session.get(BotInstance, bot_id)
    if not bot:
        return

    current = _read_bot_runtime_snapshot(bot)
    baseline_tools_json = json.dumps(_normalize_tools_config(current.get("tools_config")), ensure_ascii=False)
    bot_data: Dict[str, Any] = {
        "name": bot.name,
        "system_prompt": current.get("system_prompt") or DEFAULT_SOUL_MD,
        "soul_md": current.get("soul_md") or DEFAULT_SOUL_MD,
        "agents_md": current.get("agents_md") or DEFAULT_AGENTS_MD,
        "user_md": current.get("user_md") or DEFAULT_USER_MD,
        "tools_md": current.get("tools_md") or DEFAULT_TOOLS_MD,
        "identity_md": current.get("identity_md") or DEFAULT_IDENTITY_MD,
        "llm_provider": current.get("llm_provider") or "dashscope",
        "llm_model": current.get("llm_model") or "",
        "api_key": current.get("api_key") or "",
        "api_base": current.get("api_base") or "",
        "temperature": _safe_float(current.get("temperature"), 0.2),
        "top_p": _safe_float(current.get("top_p"), 1.0),
        "max_tokens": _safe_int(current.get("max_tokens"), 8192),
        "cpu_cores": _safe_float(current.get("cpu_cores"), 1.0),
        "memory_mb": _safe_int(current.get("memory_mb"), 1024),
        "storage_gb": _safe_int(current.get("storage_gb"), 10),
        "send_progress": bool(current.get("send_progress")),
        "send_tool_hints": bool(current.get("send_tool_hints")),
        "tools_config_json": baseline_tools_json,
    }
    if isinstance(runtime_overrides, dict):
        bot_data.update(runtime_overrides)

    # Overrides may carry unchecked resource values; clamp them again.
    limits = _normalize_resource_limits(
        bot_data.get("cpu_cores"),
        bot_data.get("memory_mb"),
        bot_data.get("storage_gb"),
    )
    for field in ("cpu_cores", "memory_mb", "storage_gb"):
        bot_data[field] = limits[field]

    # "tools_config" is only present when an override supplied it.
    if "tools_config" in bot_data:
        bot_data["tools_config_json"] = json.dumps(_normalize_tools_config(bot_data.get("tools_config")), ensure_ascii=False)

    progress_flag = bool(bot_data.get("send_progress", False))
    hints_flag = bool(bot_data.get("send_tool_hints", False))
    if isinstance(global_delivery_override, dict):
        if "sendProgress" in global_delivery_override:
            progress_flag = bool(global_delivery_override.get("sendProgress"))
        if "sendToolHints" in global_delivery_override:
            hints_flag = bool(global_delivery_override.get("sendToolHints"))

    if channels_override is not None:
        source_rows = channels_override
    else:
        source_rows = _get_bot_channels_from_config(bot)
    bot_data["send_progress"] = progress_flag
    bot_data["send_tool_hints"] = hints_flag

    workspace_channels: List[Dict[str, Any]] = []
    for row in source_rows:
        ctype = str(row.get("channel_type") or "").strip().lower()
        # The dashboard channel is never passed on as a configurable channel.
        if not ctype or ctype == "dashboard":
            continue
        channel = {
            "channel_type": ctype,
            "external_app_id": str(row.get("external_app_id") or ""),
            "app_secret": str(row.get("app_secret") or ""),
            "internal_port": max(1, min(int(row.get("internal_port") or 8080), 65535)),
            "is_active": bool(row.get("is_active", True)),
            "extra_config": _normalize_channel_extra(row.get("extra_config")),
        }
        workspace_channels.append(channel)

    config_manager.update_workspace(
        bot_id=bot_id,
        bot_data=bot_data,
        channels=workspace_channels,
    )
    _write_bot_resources(
        bot_id,
        bot_data.get("cpu_cores"),
        bot_data.get("memory_mb"),
        bot_data.get("storage_gb"),
    )
def reconcile_image_registry(session: Session):
    """Refresh the status of every image row registered in the database.

    Images that the docker manager reports as present are marked ``READY``
    (their image id is refreshed when it can be looked up); all others are
    marked ``UNKNOWN``.  Changes are committed once after the loop.
    """
    registered = session.exec(select(NanobotImage)).all()
    for image in registered:
        if not docker_manager.has_image(image.tag):
            image.status = "UNKNOWN"
        else:
            try:
                docker_img = docker_manager.client.images.get(image.tag) if docker_manager.client else None
                image.image_id = docker_img.id if docker_img else image.image_id
            except Exception:
                # The id refresh is best effort; the image is still present.
                pass
            image.status = "READY"
        session.add(image)
    session.commit()
def _workspace_root(bot_id: str) -> str:
    """Return the absolute path ``<bots root>/<bot_id>/.nanobot/workspace``."""
    joined = os.path.join(BOTS_WORKSPACE_ROOT, bot_id, ".nanobot", "workspace")
    return os.path.abspath(joined)
def _bot_data_root(bot_id: str) -> str:
    """Return the absolute path ``<bots root>/<bot_id>/.nanobot``."""
    joined = os.path.join(BOTS_WORKSPACE_ROOT, bot_id, ".nanobot")
    return os.path.abspath(joined)
def _skills_root(bot_id: str) -> str:
    """Return the ``skills`` directory inside the bot's workspace."""
    workspace = _workspace_root(bot_id)
    return os.path.join(workspace, "skills")
def _is_valid_top_level_skill_name(name: str) -> bool:
text = str(name or "").strip()
if not text:
return False
if "/" in text or "\\" in text:
return False
if text in {".", ".."}:
return False
return True
def _read_skill_description(entry_path: str) -> str:
candidates: List[str] = []
if os.path.isdir(entry_path):
candidates = [
os.path.join(entry_path, "SKILL.md"),
os.path.join(entry_path, "skill.md"),
os.path.join(entry_path, "README.md"),
os.path.join(entry_path, "readme.md"),
]
elif entry_path.lower().endswith(".md"):
candidates = [entry_path]
for candidate in candidates:
if not os.path.isfile(candidate):
continue
try:
with open(candidate, "r", encoding="utf-8") as f:
for line in f:
text = line.strip()
if text and not text.startswith("#"):
return text[:240]
except Exception:
continue
return ""
def _list_workspace_skills(bot_id: str) -> List[Dict[str, Any]]:
    """List the top-level entries of the bot's skills directory.

    The directory is created when missing.  Entries are sorted with
    directories first, then case-insensitively by name.  Names starting with
    ``.`` and names rejected by the skill-name check are skipped.
    """
    skills_dir = _skills_root(bot_id)
    os.makedirs(skills_dir, exist_ok=True)

    def dirs_first(entry: str):
        # False sorts before True, so directories come first.
        return (not os.path.isdir(os.path.join(skills_dir, entry)), entry.lower())

    listing: List[Dict[str, Any]] = []
    for name in sorted(os.listdir(skills_dir), key=dirs_first):
        hidden = not name or name.startswith(".")
        if hidden or not _is_valid_top_level_skill_name(name):
            continue
        abs_path = os.path.join(skills_dir, name)
        if not os.path.exists(abs_path):
            continue
        info = os.stat(abs_path)
        entry_type = "dir" if os.path.isdir(abs_path) else "file"
        # Size is only reported for regular files.
        size = info.st_size if os.path.isfile(abs_path) else None
        listing.append(
            {
                "id": name,
                "name": name,
                "type": entry_type,
                "path": f"skills/{name}",
                "size": size,
                "mtime": datetime.utcfromtimestamp(info.st_mtime).isoformat() + "Z",
                "description": _read_skill_description(abs_path),
            }
        )
    return listing
def _cron_store_path(bot_id: str) -> str:
    """Return the path of ``cron/jobs.json`` inside the bot's data root."""
    data_root = _bot_data_root(bot_id)
    return os.path.join(data_root, "cron", "jobs.json")
def _env_store_path(bot_id: str) -> str:
    """Return the path of ``env.json`` inside the bot's data root."""
    data_root = _bot_data_root(bot_id)
    return os.path.join(data_root, "env.json")
def _read_env_store(bot_id: str) -> Dict[str, str]:
    """Load the bot's ``env.json`` and return its normalized parameters.

    A missing, unreadable or unparseable file yields ``{}``.
    """
    store_path = _env_store_path(bot_id)
    if os.path.isfile(store_path):
        try:
            with open(store_path, "r", encoding="utf-8") as handle:
                stored = json.load(handle)
            return _normalize_env_params(stored)
        except Exception:
            return {}
    return {}
def _write_env_store(bot_id: str, env_params: Dict[str, str]) -> None:
    """Normalize ``env_params`` and write them to the bot's ``env.json``.

    The JSON goes to a sibling ``.tmp`` file first and is then moved over the
    real file with ``os.replace``.
    """
    destination = _env_store_path(bot_id)
    os.makedirs(os.path.dirname(destination), exist_ok=True)
    staging = destination + ".tmp"
    with open(staging, "w", encoding="utf-8") as sink:
        cleaned = _normalize_env_params(env_params)
        json.dump(cleaned, sink, ensure_ascii=False, indent=2)
    os.replace(staging, destination)
def _read_cron_store(bot_id: str) -> Dict[str, Any]:
    """Load the bot's cron job store.

    A missing, unreadable or non-object file yields a fresh
    ``{"version": 1, "jobs": []}``.  Otherwise the loaded dict is returned
    with ``jobs`` forced to a list and ``version`` defaulted to 1.
    """
    store_path = _cron_store_path(bot_id)
    if not os.path.isfile(store_path):
        return {"version": 1, "jobs": []}
    try:
        with open(store_path, "r", encoding="utf-8") as handle:
            store = json.load(handle)
    except Exception:
        return {"version": 1, "jobs": []}
    if not isinstance(store, dict):
        return {"version": 1, "jobs": []}
    if not isinstance(store.get("jobs"), list):
        store["jobs"] = []
    store.setdefault("version", 1)
    return store
def _write_cron_store(bot_id: str, store: Dict[str, Any]) -> None:
    """Persist the cron job store for a bot.

    The JSON is written to a sibling ``.tmp`` file and then swapped into
    place with ``os.replace``.
    """
    store_file = _cron_store_path(bot_id)
    parent_dir = os.path.dirname(store_file)
    os.makedirs(parent_dir, exist_ok=True)
    staging_file = f"{store_file}.tmp"
    with open(staging_file, "w", encoding="utf-8") as handle:
        json.dump(store, handle, ensure_ascii=False, indent=2)
    os.replace(staging_file, store_file)
def _resolve_workspace_path(bot_id: str, rel_path: Optional[str] = None) -> tuple[str, str]:
    """Resolve a workspace-relative path and confine it to the bot workspace.

    Args:
        bot_id: Bot whose workspace root is used.
        rel_path: Path relative to the workspace root. Backslashes are
            treated as separators; ``None`` or blank means the root itself.

    Returns:
        ``(root, target)`` where ``root`` is the value returned by
        ``_workspace_root`` and ``target`` is the absolute, normalized path.

    Raises:
        HTTPException: 400 when the resolved path falls outside the workspace.
    """
    root = _workspace_root(bot_id)
    rel = (rel_path or "").strip().replace("\\", "/")
    # Compare against a normalized root (as delete_bot_skill does) so a root
    # with a trailing slash or relative segments does not reject every path.
    root_abs = os.path.abspath(root)
    target = os.path.abspath(os.path.join(root_abs, rel))
    try:
        inside_root = os.path.commonpath([root_abs, target]) == root_abs
    except ValueError:
        # commonpath raises when the paths cannot be compared (for example
        # different drives on Windows); treat that as an escape attempt
        # instead of letting it surface as a server error.
        inside_root = False
    if not inside_root:
        raise HTTPException(status_code=400, detail="invalid workspace path")
    return root, target
def _calc_dir_size_bytes(path: str) -> int:
    """Sum the sizes of all regular files below ``path``.

    Symbolic links are not counted and files that cannot be inspected are
    skipped. A path that does not exist yields ``0``.
    """
    if not os.path.exists(path):
        return 0
    size_sum = 0
    for dirpath, _subdirs, filenames in os.walk(path):
        for entry in filenames:
            try:
                candidate = os.path.join(dirpath, entry)
                if not os.path.islink(candidate):
                    size_sum += os.path.getsize(candidate)
            except Exception:
                # Best effort: unreadable or vanished files do not count.
                continue
    return max(0, size_sum)
def _is_image_attachment_path(path: str) -> bool:
    """Return True when ``path`` ends with a known image extension.

    The check is case-insensitive and ignores surrounding whitespace;
    ``None`` or an empty value is never an image.
    """
    normalized = str(path or "").strip().lower()
    return normalized.endswith((".png", ".jpg", ".jpeg", ".webp"))
def _is_video_attachment_path(path: str) -> bool:
    """Return True when ``path`` ends with a known video extension.

    The check is case-insensitive and ignores surrounding whitespace;
    ``None`` or an empty value is never a video.
    """
    video_suffixes = (".mp4", ".mov", ".m4v", ".webm", ".mkv", ".avi")
    normalized = str(path or "").strip().lower()
    return normalized.endswith(video_suffixes)
def _is_visual_attachment_path(path: str) -> bool:
    """Return True when ``path`` looks like an image or a video attachment."""
    if _is_image_attachment_path(path):
        return True
    return _is_video_attachment_path(path)
def _build_workspace_tree(path: str, root: str, depth: int) -> List[Dict[str, Any]]:
    """Build a nested listing of ``path`` for the workspace browser.

    Args:
        path: Directory to list.
        root: Workspace root; each entry's ``path`` is relative to it and
            uses forward slashes.
        depth: How many further directory levels to expand. Directories get
            a ``children`` list only while ``depth`` is greater than zero.

    Returns:
        Entries with directories first, then files, each group sorted
        case-insensitively. ``.DS_Store`` is hidden. A missing ``path``
        yields an empty list.
    """
    # Local import: the module only imports ``datetime`` from ``datetime``.
    from datetime import timezone

    rows: List[Dict[str, Any]] = []
    try:
        names = sorted(os.listdir(path), key=lambda v: (not os.path.isdir(os.path.join(path, v)), v.lower()))
    except FileNotFoundError:
        return rows
    for name in names:
        if name == ".DS_Store":
            continue
        abs_path = os.path.join(path, name)
        rel_path = os.path.relpath(abs_path, root).replace("\\", "/")
        try:
            stat = os.stat(abs_path)
        except OSError:
            # A dangling symlink (or an entry that cannot be followed) must
            # not abort the whole listing: describe the link itself instead.
            try:
                stat = os.lstat(abs_path)
            except OSError:
                # Entry disappeared between listdir and stat; skip it.
                continue
        # Same naive-UTC text as the deprecated datetime.utcfromtimestamp().
        mtime = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).replace(tzinfo=None)
        base: Dict[str, Any] = {
            "name": name,
            "path": rel_path,
            "mtime": mtime.isoformat() + "Z",
        }
        if os.path.isdir(abs_path):
            node = {**base, "type": "dir"}
            if depth > 0:
                node["children"] = _build_workspace_tree(abs_path, root, depth - 1)
            rows.append(node)
            continue
        rows.append(
            {
                **base,
                "type": "file",
                "size": stat.st_size,
                "ext": os.path.splitext(name)[1].lower(),
            }
        )
    return rows
def _list_workspace_dir(path: str, root: str) -> List[Dict[str, Any]]:
    """List the direct children of ``path`` for the workspace browser.

    Args:
        path: Directory to list; it must exist (``os.listdir`` errors
            propagate to the caller).
        root: Workspace root; each entry's ``path`` is relative to it and
            uses forward slashes.

    Returns:
        Entries with directories first, then files, each group sorted
        case-insensitively. ``.DS_Store`` is hidden. ``size`` and ``ext``
        are only populated for regular files.
    """
    # Local import: the module only imports ``datetime`` from ``datetime``.
    from datetime import timezone

    rows: List[Dict[str, Any]] = []
    names = sorted(os.listdir(path), key=lambda v: (not os.path.isdir(os.path.join(path, v)), v.lower()))
    for name in names:
        if name == ".DS_Store":
            continue
        abs_path = os.path.join(path, name)
        rel_path = os.path.relpath(abs_path, root).replace("\\", "/")
        try:
            stat = os.stat(abs_path)
        except OSError:
            # A dangling symlink (or an entry that cannot be followed) must
            # not abort the whole listing: describe the link itself instead.
            try:
                stat = os.lstat(abs_path)
            except OSError:
                # Entry disappeared between listdir and stat; skip it.
                continue
        is_file = os.path.isfile(abs_path)
        # Same naive-UTC text as the deprecated datetime.utcfromtimestamp().
        mtime = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).replace(tzinfo=None)
        rows.append(
            {
                "name": name,
                "path": rel_path,
                "type": "dir" if os.path.isdir(abs_path) else "file",
                "size": stat.st_size if is_file else None,
                "ext": os.path.splitext(name)[1].lower() if is_file else "",
                "mtime": mtime.isoformat() + "Z",
            }
        )
    return rows
@app.get("/api/images", response_model=List[NanobotImage])
def list_images(session: Session = Depends(get_session)):
    """Return every registered image after reconciling the image registry."""
    reconcile_image_registry(session)
    all_images = select(NanobotImage)
    return session.exec(all_images).all()
@app.delete("/api/images/{tag:path}")
def delete_image(tag: str, session: Session = Depends(get_session)):
    """Remove an image registration from the database.

    Raises:
        HTTPException: 404 when the tag is not registered, 400 when at least
            one bot still references the image.
    """
    image = session.get(NanobotImage, tag)
    if not image:
        raise HTTPException(status_code=404, detail="Image not found")

    # Refuse to delete while any bot is still using this image.
    dependents = session.exec(select(BotInstance).where(BotInstance.image_tag == tag)).all()
    if dependents:
        in_use = len(dependents)
        raise HTTPException(status_code=400, detail=f"Cannot delete image: {in_use} bots are using it.")

    session.delete(image)
    session.commit()
    return {"status": "deleted"}
@app.get("/api/docker-images")
def list_docker_images(repository: str = "nanobot-base"):
    """Return the images the Docker manager reports for ``repository``."""
    return docker_manager.list_images_by_repo(repository)
@app.post("/api/images/register")
def register_image(payload: dict, session: Session = Depends(get_session)):
    """Register (or refresh) a locally available Docker image as READY.

    Reads ``tag`` (required) and ``source_dir`` (defaults to "manual") from
    the payload. The version is the part after the last colon of the tag
    with a leading "v" removed, or the whole tag when it has no colon.

    Raises:
        HTTPException: 400 when ``tag`` is empty, 404 when the Docker
            manager does not have the image.
    """
    tag = (payload.get("tag") or "").strip()
    source_dir = (payload.get("source_dir") or "manual").strip() or "manual"
    if not tag:
        raise HTTPException(status_code=400, detail="tag is required")
    if not docker_manager.has_image(tag):
        raise HTTPException(status_code=404, detail=f"Docker image not found: {tag}")

    if ":" in tag:
        version = tag.rsplit(":", 1)[-1].removeprefix("v")
    else:
        version = tag

    # The image id is optional metadata; any lookup failure leaves it unset.
    image_id = None
    try:
        docker_img = None
        if docker_manager.client:
            docker_img = docker_manager.client.images.get(tag)
        if docker_img:
            image_id = docker_img.id
    except Exception:
        image_id = None

    row = session.get(NanobotImage, tag)
    if row:
        row.version = version
        row.status = "READY"
        row.source_dir = source_dir
        row.image_id = image_id
    else:
        row = NanobotImage(
            tag=tag,
            version=version,
            status="READY",
            source_dir=source_dir,
            image_id=image_id,
        )
    session.add(row)
    session.commit()
    session.refresh(row)
    return row
@app.post("/api/providers/test")
async def test_provider(payload: dict):
    """Check provider credentials by calling the provider's ``/models`` endpoint.

    Payload keys: ``provider`` and ``api_key`` (required), ``model`` and
    ``api_base`` (optional). Only openrouter, dashscope, kimi and minimax
    are supported.

    Returns:
        A dict with ``ok``. On success it also carries the endpoint, up to
        eight model ids as ``models_preview`` and ``model_hint``
        ("model_found" / "model_not_listed", or "" when no model was given).
        HTTP errors and transport/parsing failures are reported with
        ``ok: False`` and a ``detail`` rather than raised.

    Raises:
        HTTPException: 400 for missing fields, an unsupported provider or a
            missing API base.
    """
    provider = (payload.get("provider") or "").strip()
    api_key = (payload.get("api_key") or "").strip()
    model = (payload.get("model") or "").strip()
    api_base = (payload.get("api_base") or "").strip()
    if not provider or not api_key:
        raise HTTPException(status_code=400, detail="provider and api_key are required")
    normalized_provider, default_base = _provider_defaults(provider)
    base = (api_base or default_base).rstrip("/")
    if normalized_provider not in {"openrouter", "dashscope", "kimi", "minimax"}:
        raise HTTPException(status_code=400, detail=f"provider not supported for test: {provider}")
    if not base:
        raise HTTPException(status_code=400, detail=f"api_base is required for provider: {provider}")
    headers = {"Authorization": f"Bearer {api_key}"}
    timeout = httpx.Timeout(20.0, connect=10.0)
    url = f"{base}/models"
    try:
        async with httpx.AsyncClient(timeout=timeout) as client:
            resp = await client.get(url, headers=headers)
        if resp.status_code >= 400:
            return {
                "ok": False,
                "provider": normalized_provider,
                "status_code": resp.status_code,
                "detail": resp.text[:500],
            }
        data = resp.json()
        models_raw = data.get("data", []) if isinstance(data, dict) else []
        if not isinstance(models_raw, list):
            # An unexpected payload shape is treated as "no models listed".
            models_raw = []
        # Collect every id: the hint must consider the full catalogue, not
        # only the first page of it, or listed models get reported missing.
        model_ids: List[str] = [
            str(item["id"])
            for item in models_raw
            if isinstance(item, dict) and item.get("id")
        ]
        model_hint = ""
        if model:
            model_hint = "model_found" if any(model in m for m in model_ids) else "model_not_listed"
        return {
            "ok": True,
            "provider": normalized_provider,
            "endpoint": url,
            "models_preview": model_ids[:8],
            "model_hint": model_hint,
        }
    except Exception as e:
        # Connectivity and parsing problems are the result of the test, so
        # they are returned to the caller instead of raised.
        return {
            "ok": False,
            "provider": normalized_provider,
            "endpoint": url,
            "detail": str(e),
        }
@app.post("/api/bots")
def create_bot(payload: BotCreateRequest, session: Session = Depends(get_session)):
    """Create a bot record and initialise its env store and workspace config.

    The image must be registered with status READY and be available to the
    Docker manager. Prompt documents fall back to the ``DEFAULT_*_MD``
    templates; ``system_prompt`` and ``soul_md`` fall back to each other.

    Raises:
        HTTPException: 400 when a bot with the same id already exists or the
            image is unregistered, not READY, or missing locally.
    """
    # Reject a duplicate id explicitly; otherwise the insert only fails at
    # commit time as an unhandled database error.
    if session.get(BotInstance, payload.id):
        raise HTTPException(status_code=400, detail=f"Bot already exists: {payload.id}")
    image_row = session.get(NanobotImage, payload.image_tag)
    if not image_row:
        raise HTTPException(status_code=400, detail=f"Image not registered in DB: {payload.image_tag}")
    if image_row.status != "READY":
        raise HTTPException(status_code=400, detail=f"Image status is not READY: {payload.image_tag} ({image_row.status})")
    if not docker_manager.has_image(payload.image_tag):
        raise HTTPException(status_code=400, detail=f"Docker image not found locally: {payload.image_tag}")
    bot = BotInstance(
        id=payload.id,
        name=payload.name,
        image_tag=payload.image_tag,
        workspace_dir=os.path.join(BOTS_WORKSPACE_ROOT, payload.id),
    )
    session.add(bot)
    session.commit()
    session.refresh(bot)
    resource_limits = _normalize_resource_limits(payload.cpu_cores, payload.memory_mb, payload.storage_gb)
    # bool(None) is False, so an unset flag is simply disabled.
    send_progress = bool(payload.send_progress)
    send_tool_hints = bool(payload.send_tool_hints)
    _write_env_store(payload.id, _normalize_env_params(payload.env_params))
    _sync_workspace_channels(
        session,
        payload.id,
        channels_override=_normalize_initial_channels(payload.id, payload.channels),
        global_delivery_override={
            "sendProgress": send_progress,
            "sendToolHints": send_tool_hints,
        },
        runtime_overrides={
            "llm_provider": payload.llm_provider,
            "llm_model": payload.llm_model,
            "api_key": payload.api_key,
            "api_base": payload.api_base or "",
            "temperature": payload.temperature,
            "top_p": payload.top_p,
            "max_tokens": payload.max_tokens,
            "cpu_cores": resource_limits["cpu_cores"],
            "memory_mb": resource_limits["memory_mb"],
            "storage_gb": resource_limits["storage_gb"],
            "system_prompt": payload.system_prompt or payload.soul_md or DEFAULT_SOUL_MD,
            "soul_md": payload.soul_md or payload.system_prompt or DEFAULT_SOUL_MD,
            "agents_md": payload.agents_md or DEFAULT_AGENTS_MD,
            "user_md": payload.user_md or DEFAULT_USER_MD,
            "tools_md": payload.tools_md or DEFAULT_TOOLS_MD,
            "identity_md": payload.identity_md or DEFAULT_IDENTITY_MD,
            "tools_config_json": json.dumps(_normalize_tools_config(payload.tools_config), ensure_ascii=False),
            "send_progress": send_progress,
            "send_tool_hints": send_tool_hints,
        },
    )
    session.refresh(bot)
    return _serialize_bot(bot)
@app.get("/api/bots")
def list_bots(session: Session = Depends(get_session)):
    """Return all bots, first syncing each stored status with Docker.

    When the stored ``docker_status`` differs from what the Docker manager
    reports it is updated; a bot that is not RUNNING and not in the ERROR
    state is additionally reset to IDLE. Changes are committed once.
    """
    bots = session.exec(select(BotInstance)).all()
    changed = False
    for bot in bots:
        live_status = docker_manager.get_bot_status(bot.id)
        if bot.docker_status == live_status:
            continue
        bot.docker_status = live_status
        state_now = str(bot.current_state or "").upper()
        if live_status != "RUNNING" and state_now not in {"ERROR"}:
            bot.current_state = "IDLE"
        session.add(bot)
        changed = True
    if changed:
        session.commit()
        for bot in bots:
            session.refresh(bot)
    return [_serialize_bot(bot) for bot in bots]
@app.get("/api/bots/{bot_id}/resources")
def get_bot_resources(bot_id: str, session: Session = Depends(get_session)):
    """Report configured limits, runtime snapshot and workspace usage for a bot.

    Workspace usage is measured on disk and expressed as a percentage of the
    configured ``storage_gb`` (0 when no limit is configured).

    Raises:
        HTTPException: 404 when the bot does not exist.
    """
    bot = session.get(BotInstance, bot_id)
    if not bot:
        raise HTTPException(status_code=404, detail="Bot not found")

    configured = _read_bot_resources(bot_id)
    runtime = docker_manager.get_bot_resource_snapshot(bot_id)
    workspace_root = _workspace_root(bot_id)
    usage_bytes = _calc_dir_size_bytes(workspace_root)

    limit_bytes = int(configured.get("storage_gb", 0) or 0) * 1024 * 1024 * 1024
    has_limit = limit_bytes > 0
    usage_percent = (usage_bytes / limit_bytes) * 100.0 if has_limit else 0.0

    limits = runtime.get("limits") or {}
    enforcement = {
        "cpu_limited": (limits.get("cpu_cores") or 0) > 0,
        "memory_limited": (limits.get("memory_bytes") or 0) > 0,
        "storage_limited": bool(limits.get("storage_bytes")) or bool(limits.get("storage_opt_raw")),
    }
    return {
        "bot_id": bot_id,
        "docker_status": runtime.get("docker_status") or bot.docker_status,
        "configured": configured,
        "runtime": runtime,
        "workspace": {
            "path": workspace_root,
            "usage_bytes": usage_bytes,
            "configured_limit_bytes": limit_bytes if has_limit else None,
            "usage_percent": max(0.0, usage_percent),
        },
        "enforcement": enforcement,
        "note": (
            "Resource value 0 means unlimited. CPU/Memory limits come from Docker HostConfig and are enforced by cgroup. "
            "Storage limit depends on Docker storage driver support."
        ),
        "collected_at": datetime.utcnow().isoformat() + "Z",
    }
@app.put("/api/bots/{bot_id}")
def update_bot(bot_id: str, payload: BotUpdateRequest, session: Session = Depends(get_session)):
    """Apply a partial update to a bot.

    Only fields explicitly set in the request are considered. ``name`` and
    ``image_tag`` are stored on the database row, ``env_params`` goes to the
    env store, and the remaining runtime fields are forwarded to
    ``_sync_workspace_channels`` as overrides. ``system_prompt`` and
    ``soul_md`` mirror each other when only one of them is supplied.

    Raises:
        HTTPException: 404 when the bot does not exist; 400 when the new
            image is unregistered, not READY, or missing locally.
    """
    bot = session.get(BotInstance, bot_id)
    if not bot:
        raise HTTPException(status_code=404, detail="Bot not found")
    update_data = payload.model_dump(exclude_unset=True)
    if "image_tag" in update_data:
        if update_data["image_tag"]:
            image_tag = str(update_data["image_tag"]).strip()
            image_row = session.get(NanobotImage, image_tag)
            if not image_row:
                raise HTTPException(status_code=400, detail=f"Image not registered in DB: {image_tag}")
            if image_row.status != "READY":
                raise HTTPException(status_code=400, detail=f"Image status is not READY: {image_tag} ({image_row.status})")
            if not docker_manager.has_image(image_tag):
                raise HTTPException(status_code=400, detail=f"Docker image not found locally: {image_tag}")
            # Store exactly the tag that was validated, not the raw input.
            update_data["image_tag"] = image_tag
        else:
            # An explicit null/empty tag skips validation, so it must not
            # overwrite the bot's current image either.
            update_data.pop("image_tag")
    env_params = update_data.pop("env_params", None)
    runtime_overrides: Dict[str, Any] = {}
    tools_config = update_data.pop("tools_config", None)
    if tools_config is not None:
        runtime_overrides["tools_config_json"] = json.dumps(_normalize_tools_config(tools_config), ensure_ascii=False)
    runtime_fields = {
        "llm_provider",
        "llm_model",
        "api_key",
        "api_base",
        "temperature",
        "top_p",
        "max_tokens",
        "cpu_cores",
        "memory_mb",
        "storage_gb",
        "soul_md",
        "agents_md",
        "user_md",
        "tools_md",
        "identity_md",
        "send_progress",
        "send_tool_hints",
        "system_prompt",
    }
    for field in runtime_fields:
        if field in update_data:
            runtime_overrides[field] = update_data.pop(field)
    # Keep the two prompt aliases in sync when only one was provided.
    if "system_prompt" in runtime_overrides and "soul_md" not in runtime_overrides:
        runtime_overrides["soul_md"] = runtime_overrides["system_prompt"]
    if "soul_md" in runtime_overrides and "system_prompt" not in runtime_overrides:
        runtime_overrides["system_prompt"] = runtime_overrides["soul_md"]
    if {"cpu_cores", "memory_mb", "storage_gb"} & set(runtime_overrides.keys()):
        # NOTE(review): limits missing from the request are passed as None
        # here; confirm _normalize_resource_limits does not reset them to
        # defaults on a partial resource update.
        normalized_resources = _normalize_resource_limits(
            runtime_overrides.get("cpu_cores"),
            runtime_overrides.get("memory_mb"),
            runtime_overrides.get("storage_gb"),
        )
        runtime_overrides.update(normalized_resources)
    db_fields = {"name", "image_tag"}
    for key, value in update_data.items():
        if key in db_fields:
            setattr(bot, key, value)
    session.add(bot)
    session.commit()
    session.refresh(bot)
    if env_params is not None:
        _write_env_store(bot_id, _normalize_env_params(env_params))
    global_delivery_override: Optional[Dict[str, Any]] = None
    if "send_progress" in runtime_overrides or "send_tool_hints" in runtime_overrides:
        global_delivery_override = {}
        if "send_progress" in runtime_overrides:
            global_delivery_override["sendProgress"] = bool(runtime_overrides.get("send_progress"))
        if "send_tool_hints" in runtime_overrides:
            global_delivery_override["sendToolHints"] = bool(runtime_overrides.get("send_tool_hints"))
    _sync_workspace_channels(
        session,
        bot_id,
        runtime_overrides=runtime_overrides if runtime_overrides else None,
        global_delivery_override=global_delivery_override,
    )
    session.refresh(bot)
    return _serialize_bot(bot)
@app.post("/api/bots/{bot_id}/start")
async def start_bot(bot_id: str, session: Session = Depends(get_session)):
    """Start the bot's container and record the resulting Docker status.

    The workspace config is synced first; the container is then started with
    the stored env params and the resource limits from the runtime snapshot
    (fallbacks: 1.0 CPU core, 1024 MB memory, 10 GB storage).

    Raises:
        HTTPException: 404 when the bot does not exist; 500 when the
            container cannot be started or is not RUNNING right afterwards.
    """
    bot = session.get(BotInstance, bot_id)
    if not bot:
        raise HTTPException(status_code=404, detail="Bot not found")

    _sync_workspace_channels(session, bot_id)
    snapshot = _read_bot_runtime_snapshot(bot)
    launch_env = _read_env_store(bot_id)
    started = docker_manager.start_bot(
        bot_id,
        image_tag=bot.image_tag,
        on_state_change=docker_callback,
        env_vars=launch_env,
        cpu_cores=_safe_float(snapshot.get("cpu_cores"), 1.0),
        memory_mb=_safe_int(snapshot.get("memory_mb"), 1024),
        storage_gb=_safe_int(snapshot.get("storage_gb"), 10),
    )
    if not started:
        bot.docker_status = "STOPPED"
        session.add(bot)
        session.commit()
        raise HTTPException(status_code=500, detail=f"Failed to start container with image {bot.image_tag}")

    # Persist whatever Docker reports before deciding whether startup held.
    observed_status = docker_manager.get_bot_status(bot_id)
    bot.docker_status = observed_status
    session.add(bot)
    session.commit()
    if observed_status != "RUNNING":
        raise HTTPException(
            status_code=500,
            detail="Bot container failed shortly after startup. Check bot logs/config.",
        )
    return {"status": "started"}
@app.post("/api/bots/{bot_id}/stop")
def stop_bot(bot_id: str, session: Session = Depends(get_session)):
    """Stop the bot's container and mark the bot as STOPPED.

    Raises:
        HTTPException: 404 when the bot does not exist.
    """
    target_bot = session.get(BotInstance, bot_id)
    if not target_bot:
        raise HTTPException(status_code=404, detail="Bot not found")

    docker_manager.stop_bot(bot_id)
    target_bot.docker_status = "STOPPED"
    session.add(target_bot)
    session.commit()
    return {"status": "stopped"}
@app.post("/api/bots/{bot_id}/deactivate")
def deactivate_bot(bot_id: str, session: Session = Depends(get_session)):
    """Stop the bot's container, mark it STOPPED and report "deactivated".

    Raises:
        HTTPException: 404 when the bot does not exist.
    """
    target_bot = session.get(BotInstance, bot_id)
    if not target_bot:
        raise HTTPException(status_code=404, detail="Bot not found")

    docker_manager.stop_bot(bot_id)
    target_bot.docker_status = "STOPPED"
    session.add(target_bot)
    session.commit()
    return {"status": "deactivated"}
@app.delete("/api/bots/{bot_id}")
def delete_bot(bot_id: str, delete_workspace: bool = True, session: Session = Depends(get_session)):
    """Stop a bot, delete its messages and record, and optionally its files.

    Args:
        bot_id: Bot to delete.
        delete_workspace: When true, the bot's directory under
            ``BOTS_WORKSPACE_ROOT`` is removed (errors ignored).

    Raises:
        HTTPException: 404 when the bot does not exist.
    """
    bot = session.get(BotInstance, bot_id)
    if not bot:
        raise HTTPException(status_code=404, detail="Bot not found")

    docker_manager.stop_bot(bot_id)
    history = session.exec(select(BotMessage).where(BotMessage.bot_id == bot_id)).all()
    for message in history:
        session.delete(message)
    session.delete(bot)
    session.commit()

    if delete_workspace:
        bot_dir = os.path.join(BOTS_WORKSPACE_ROOT, bot_id)
        if os.path.isdir(bot_dir):
            shutil.rmtree(bot_dir, ignore_errors=True)
    return {"status": "deleted", "workspace_deleted": bool(delete_workspace)}
@app.get("/api/bots/{bot_id}/channels")
def list_bot_channels(bot_id: str, session: Session = Depends(get_session)):
    """Return the channels read from the bot's config.

    Raises:
        HTTPException: 404 when the bot does not exist.
    """
    owner = session.get(BotInstance, bot_id)
    if not owner:
        raise HTTPException(status_code=404, detail="Bot not found")
    return _get_bot_channels_from_config(owner)
@app.get("/api/bots/{bot_id}/skills")
def list_bot_skills(bot_id: str, session: Session = Depends(get_session)):
    """Return the skills found in the bot's workspace.

    Raises:
        HTTPException: 404 when the bot does not exist.
    """
    if not session.get(BotInstance, bot_id):
        raise HTTPException(status_code=404, detail="Bot not found")
    return _list_workspace_skills(bot_id)
@app.get("/api/bots/{bot_id}/tools-config")
def get_bot_tools_config(bot_id: str, session: Session = Depends(get_session)):
    """Report that tools config is not managed by the dashboard.

    Always returns an empty ``tools_config`` together with a hint.

    Raises:
        HTTPException: 404 when the bot does not exist.
    """
    if not session.get(BotInstance, bot_id):
        raise HTTPException(status_code=404, detail="Bot not found")
    response = {
        "bot_id": bot_id,
        "tools_config": {},
        "managed_by_dashboard": False,
        "hint": "Tools config is disabled in dashboard. Configure tool-related env vars manually.",
    }
    return response
@app.put("/api/bots/{bot_id}/tools-config")
def update_bot_tools_config(bot_id: str, payload: BotToolsConfigUpdateRequest, session: Session = Depends(get_session)):
    """Reject tools-config updates; the dashboard no longer manages them.

    Raises:
        HTTPException: 404 when the bot does not exist, otherwise always 400.
    """
    if not session.get(BotInstance, bot_id):
        raise HTTPException(status_code=404, detail="Bot not found")
    rejection = "Tools config is no longer managed by dashboard. Please set required env vars manually."
    raise HTTPException(status_code=400, detail=rejection)
@app.get("/api/bots/{bot_id}/env-params")
def get_bot_env_params(bot_id: str, session: Session = Depends(get_session)):
    """Return the environment parameters stored for a bot.

    Raises:
        HTTPException: 404 when the bot does not exist.
    """
    if not session.get(BotInstance, bot_id):
        raise HTTPException(status_code=404, detail="Bot not found")
    stored = _read_env_store(bot_id)
    return {"bot_id": bot_id, "env_params": stored}
@app.put("/api/bots/{bot_id}/env-params")
def update_bot_env_params(bot_id: str, payload: BotEnvParamsUpdateRequest, session: Session = Depends(get_session)):
    """Replace the bot's stored environment parameters.

    The response echoes the normalized parameters and flags that a restart
    is required.

    Raises:
        HTTPException: 404 when the bot does not exist.
    """
    if not session.get(BotInstance, bot_id):
        raise HTTPException(status_code=404, detail="Bot not found")
    cleaned = _normalize_env_params(payload.env_params)
    _write_env_store(bot_id, cleaned)
    return {
        "status": "updated",
        "bot_id": bot_id,
        "env_params": cleaned,
        "restart_required": True,
    }
@app.post("/api/bots/{bot_id}/skills/upload")
async def upload_bot_skill_zip(bot_id: str, file: UploadFile = File(...), session: Session = Depends(get_session)):
    """Install skills from an uploaded ``.zip`` package into the bot workspace.

    The upload is streamed to a temporary file (bounded by ``UPLOAD_MAX_MB``),
    every file entry's top-level name is validated, the archive is extracted
    into a temporary directory inside the skills root, and each top-level
    entry is then moved into place.

    Raises:
        HTTPException: 404 for an unknown bot; 413 when the upload exceeds
            the size limit; 400 for a non-zip name, an empty or invalid
            archive, invalid or unsafe entry names, a name that already
            exists, or when nothing was installed.
    """
    bot = session.get(BotInstance, bot_id)
    if not bot:
        raise HTTPException(status_code=404, detail="Bot not found")
    filename = str(file.filename or "").strip()
    if not filename.lower().endswith(".zip"):
        raise HTTPException(status_code=400, detail="Only .zip skill package is supported")
    max_bytes = UPLOAD_MAX_MB * 1024 * 1024
    tmp_zip_path: Optional[str] = None
    total_size = 0
    installed: List[str] = []
    # One outer try/finally owns the temp zip so it is removed on every exit
    # path, including cancellation and failures while preparing the skills
    # directory.
    try:
        try:
            with tempfile.NamedTemporaryFile(prefix=".skill_upload_", suffix=".zip", delete=False) as tmp_zip:
                tmp_zip_path = tmp_zip.name
                while True:
                    chunk = await file.read(1024 * 1024)
                    if not chunk:
                        break
                    total_size += len(chunk)
                    if total_size > max_bytes:
                        raise HTTPException(
                            status_code=413,
                            detail=f"Zip package too large (max {max_bytes // (1024 * 1024)}MB)",
                        )
                    tmp_zip.write(chunk)
        finally:
            await file.close()
        if total_size == 0:
            raise HTTPException(status_code=400, detail="Zip package is empty")
        try:
            archive = zipfile.ZipFile(tmp_zip_path)
        except Exception:
            raise HTTPException(status_code=400, detail="Invalid zip file")
        # Everything after opening the archive runs inside ``with`` so the
        # handle is closed even when preparing the skills root fails.
        with archive:
            skills_root = _skills_root(bot_id)
            os.makedirs(skills_root, exist_ok=True)
            members = archive.infolist()
            file_members = [m for m in members if not m.is_dir()]
            if not file_members:
                raise HTTPException(status_code=400, detail="Zip package has no files")
            top_names: List[str] = []
            for member in file_members:
                raw_name = str(member.filename or "").replace("\\", "/").lstrip("/")
                if not raw_name:
                    continue
                first = raw_name.split("/", 1)[0].strip()
                if not _is_valid_top_level_skill_name(first):
                    raise HTTPException(status_code=400, detail=f"Invalid skill entry name in zip: {first}")
                if first not in top_names:
                    top_names.append(first)
            if not top_names:
                raise HTTPException(status_code=400, detail="Zip package has no valid skill entries")
            conflicts = [name for name in top_names if os.path.exists(os.path.join(skills_root, name))]
            if conflicts:
                raise HTTPException(status_code=400, detail=f"Skill already exists: {', '.join(conflicts)}")
            # NOTE(review): the uncompressed size of the archive is not
            # bounded here; consider a limit if uploads can be untrusted.
            with tempfile.TemporaryDirectory(prefix=".skill_upload_", dir=skills_root) as tmp_dir:
                tmp_root = os.path.abspath(tmp_dir)
                for member in members:
                    raw_name = str(member.filename or "").replace("\\", "/").lstrip("/")
                    if not raw_name:
                        continue
                    target = os.path.abspath(os.path.join(tmp_root, raw_name))
                    # Reject entries that would extract outside the staging
                    # directory (for example via ".." segments).
                    if os.path.commonpath([tmp_root, target]) != tmp_root:
                        raise HTTPException(status_code=400, detail=f"Unsafe zip entry path: {raw_name}")
                    if member.is_dir():
                        os.makedirs(target, exist_ok=True)
                        continue
                    os.makedirs(os.path.dirname(target), exist_ok=True)
                    with archive.open(member, "r") as source, open(target, "wb") as dest:
                        shutil.copyfileobj(source, dest)
                for name in top_names:
                    src = os.path.join(tmp_root, name)
                    dst = os.path.join(skills_root, name)
                    if not os.path.exists(src):
                        continue
                    shutil.move(src, dst)
                    installed.append(name)
    finally:
        if tmp_zip_path and os.path.exists(tmp_zip_path):
            os.remove(tmp_zip_path)
    if not installed:
        raise HTTPException(status_code=400, detail="No skill entries installed from zip")
    return {
        "status": "installed",
        "bot_id": bot_id,
        "installed": installed,
        "skills": _list_workspace_skills(bot_id),
    }
@app.delete("/api/bots/{bot_id}/skills/{skill_name}")
def delete_bot_skill(bot_id: str, skill_name: str, session: Session = Depends(get_session)):
    """Delete one top-level skill (file or directory) from the bot workspace.

    Raises:
        HTTPException: 404 when the bot or the skill does not exist; 400
            when the name is invalid or resolves outside the skills root.
    """
    if not session.get(BotInstance, bot_id):
        raise HTTPException(status_code=404, detail="Bot not found")

    name = str(skill_name or "").strip()
    if not _is_valid_top_level_skill_name(name):
        raise HTTPException(status_code=400, detail="Invalid skill name")

    skills_dir = _skills_root(bot_id)
    target = os.path.abspath(os.path.join(skills_dir, name))
    skills_dir_abs = os.path.abspath(skills_dir)
    # Second line of defence: the resolved path must stay inside the root.
    if os.path.commonpath([skills_dir_abs, target]) != skills_dir_abs:
        raise HTTPException(status_code=400, detail="Invalid skill path")
    if not os.path.exists(target):
        raise HTTPException(status_code=404, detail="Skill not found in workspace")

    if os.path.isdir(target):
        shutil.rmtree(target, ignore_errors=False)
    else:
        os.remove(target)
    return {"status": "deleted", "bot_id": bot_id, "skill": name}
@app.post("/api/bots/{bot_id}/channels")
def create_bot_channel(bot_id: str, payload: ChannelConfigRequest, session: Session = Depends(get_session)):
    """Add a new channel to the bot's config and resync the workspace.

    The channel type (lower-cased) doubles as the channel id. A missing
    ``external_app_id`` defaults to ``<type>-<bot_id>`` and the port defaults
    to 8080, clamped to 1..65535.

    Raises:
        HTTPException: 404 when the bot does not exist; 400 when the type is
            empty, is "dashboard", or already exists for this bot.
    """
    bot = session.get(BotInstance, bot_id)
    if not bot:
        raise HTTPException(status_code=404, detail="Bot not found")

    ctype = (payload.channel_type or "").strip().lower()
    if not ctype:
        raise HTTPException(status_code=400, detail="channel_type is required")
    if ctype == "dashboard":
        raise HTTPException(status_code=400, detail="dashboard channel is built-in and cannot be created manually")

    existing = _get_bot_channels_from_config(bot)
    for channel in existing:
        if str(channel.get("channel_type") or "").lower() == ctype:
            raise HTTPException(status_code=400, detail=f"Channel already exists: {ctype}")

    external_app_id = (payload.external_app_id or "").strip() or f"{ctype}-{bot_id}"
    app_secret = (payload.app_secret or "").strip()
    port = max(1, min(int(payload.internal_port or 8080), 65535))
    new_row = {
        "id": ctype,
        "bot_id": bot_id,
        "channel_type": ctype,
        "external_app_id": external_app_id,
        "app_secret": app_secret,
        "internal_port": port,
        "is_active": bool(payload.is_active),
        "extra_config": _normalize_channel_extra(payload.extra_config),
        "locked": False,
    }

    config_data = _read_bot_config(bot_id)
    channels_cfg = config_data.get("channels")
    if not isinstance(channels_cfg, dict):
        # Repair a missing or malformed channels section in place.
        channels_cfg = {}
        config_data["channels"] = channels_cfg
    channels_cfg[ctype] = _channel_api_to_cfg(new_row)
    _write_bot_config(bot_id, config_data)
    _sync_workspace_channels(session, bot_id)
    return new_row
@app.put("/api/bots/{bot_id}/channels/{channel_id}")
def update_bot_channel(
    bot_id: str,
    channel_id: str,
    payload: ChannelConfigUpdateRequest,
    session: Session = Depends(get_session),
):
    """Update one channel of a bot and write the result to the bot config.

    Only fields set in the request are applied. The dashboard channel cannot
    change type or be disabled; its ``sendProgress`` / ``sendToolHints``
    extras are stored as global delivery flags instead of a channel entry.

    Raises:
        HTTPException: 404 when the bot or channel does not exist; 400 for
            an empty or conflicting type, or a forbidden dashboard change.
    """
    bot = session.get(BotInstance, bot_id)
    if not bot:
        raise HTTPException(status_code=404, detail="Bot not found")

    wanted_id = str(channel_id or "").strip().lower()
    rows = _get_bot_channels_from_config(bot)
    row = next((r for r in rows if str(r.get("id") or "").lower() == wanted_id), None)
    if not row:
        raise HTTPException(status_code=404, detail="Channel not found")

    changes = payload.model_dump(exclude_unset=True)

    def _given(field: str) -> bool:
        # True when the request carried a non-null value for ``field``.
        return changes.get(field) is not None

    existing_type = str(row.get("channel_type") or "").strip().lower()
    new_type = existing_type
    if _given("channel_type"):
        new_type = str(changes["channel_type"]).strip().lower()
    if not new_type:
        raise HTTPException(status_code=400, detail="channel_type cannot be empty")
    was_dashboard = existing_type == "dashboard"
    if was_dashboard and new_type != "dashboard":
        raise HTTPException(status_code=400, detail="dashboard channel type cannot be changed")
    if new_type != existing_type:
        if any(str(r.get("channel_type") or "").lower() == new_type for r in rows):
            raise HTTPException(status_code=400, detail=f"Channel already exists: {new_type}")

    if _given("external_app_id"):
        row["external_app_id"] = str(changes["external_app_id"]).strip()
    if _given("app_secret"):
        row["app_secret"] = str(changes["app_secret"]).strip()
    if _given("internal_port"):
        row["internal_port"] = max(1, min(int(changes["internal_port"]), 65535))
    if _given("is_active"):
        next_active = bool(changes["is_active"])
        if was_dashboard and not next_active:
            raise HTTPException(status_code=400, detail="dashboard channel must remain enabled")
        row["is_active"] = next_active
    if "extra_config" in changes:
        row["extra_config"] = _normalize_channel_extra(changes.get("extra_config"))
    row["channel_type"] = new_type
    row["id"] = new_type
    row["locked"] = new_type == "dashboard"

    config_data = _read_bot_config(bot_id)
    channels_cfg = config_data.get("channels")
    if not isinstance(channels_cfg, dict):
        channels_cfg = {}
        config_data["channels"] = channels_cfg
    current_send_progress, current_send_tool_hints = _read_global_delivery_flags(channels_cfg)
    is_dashboard = new_type == "dashboard"
    if is_dashboard:
        extra = _normalize_channel_extra(row.get("extra_config"))
        channels_cfg["sendProgress"] = bool(extra.get("sendProgress", current_send_progress))
        channels_cfg["sendToolHints"] = bool(extra.get("sendToolHints", current_send_tool_hints))
    else:
        channels_cfg["sendProgress"] = current_send_progress
        channels_cfg["sendToolHints"] = current_send_tool_hints
    # The dashboard channel is never written as its own config key.
    channels_cfg.pop("dashboard", None)
    if not was_dashboard and existing_type in channels_cfg and existing_type != new_type:
        # The channel was renamed: drop the entry stored under the old type.
        channels_cfg.pop(existing_type, None)
    if not is_dashboard:
        channels_cfg[new_type] = _channel_api_to_cfg(row)
    _write_bot_config(bot_id, config_data)
    session.commit()
    _sync_workspace_channels(session, bot_id)
    return row
@app.delete("/api/bots/{bot_id}/channels/{channel_id}")
def delete_bot_channel(bot_id: str, channel_id: str, session: Session = Depends(get_session)):
    """Remove a channel from the bot's config and resync the workspace.

    Raises:
        HTTPException: 404 when the bot or channel does not exist; 400 when
            the channel is the built-in dashboard channel.
    """
    bot = session.get(BotInstance, bot_id)
    if not bot:
        raise HTTPException(status_code=404, detail="Bot not found")

    wanted_id = str(channel_id or "").strip().lower()
    match = None
    for candidate in _get_bot_channels_from_config(bot):
        if str(candidate.get("id") or "").lower() == wanted_id:
            match = candidate
            break
    if not match:
        raise HTTPException(status_code=404, detail="Channel not found")

    channel_type = str(match.get("channel_type") or "").lower()
    if channel_type == "dashboard":
        raise HTTPException(status_code=400, detail="dashboard channel cannot be deleted")

    config_data = _read_bot_config(bot_id)
    channels_cfg = config_data.get("channels")
    if not isinstance(channels_cfg, dict):
        channels_cfg = {}
        config_data["channels"] = channels_cfg
    channels_cfg.pop(channel_type, None)
    _write_bot_config(bot_id, config_data)
    session.commit()
    _sync_workspace_channels(session, bot_id)
    return {"status": "deleted"}
@app.post("/api/bots/{bot_id}/command")
def send_command(bot_id: str, payload: CommandRequest, session: Session = Depends(get_session)):
    """Deliver a dashboard command (with optional attachments) to a bot.

    Attachments must be existing files inside the bot workspace. The text
    shown to the user is the raw command (or a placeholder when only files
    were sent); the text delivered to the bot is extended with handling
    instructions that depend on whether all attachments are images/videos.
    The user command is persisted and broadcast before delivery; a failed
    delivery is broadcast as an ERROR agent state.

    Raises:
        HTTPException: 404 when the bot does not exist; 400 when neither a
            command nor attachments were given or an attachment is missing;
            502 when delivery to the bot fails.
    """
    if not session.get(BotInstance, bot_id):
        raise HTTPException(status_code=404, detail="Bot not found")
    attachments = _normalize_media_list(payload.attachments, bot_id)
    command = str(payload.command or "").strip()
    if not (command or attachments):
        raise HTTPException(status_code=400, detail="Command or attachments is required")

    checked_attachments: List[str] = []
    for rel in attachments:
        target = _resolve_workspace_path(bot_id, rel)[1]
        if not os.path.isfile(target):
            raise HTTPException(status_code=400, detail=f"attachment not found: {rel}")
        checked_attachments.append(rel)
    # Paths as seen from inside the bot container.
    delivery_media = ["/root/.nanobot/workspace/" + p.lstrip("/") for p in checked_attachments]

    display_command = command or "[attachment message]"
    delivery_command = display_command
    if checked_attachments and all(_is_visual_attachment_path(p) for p in checked_attachments):
        has_video = any(_is_video_attachment_path(p) for p in checked_attachments)
        media_label = "图片/视频" if has_video else "图片"
        if command:
            delivery_command = (
                f"{command}\n\n"
                "【附件处理要求】\n"
                f"1) 附件中的{media_label}已作为多模态输入提供,优先直接理解并回答。\n"
                "2) 若当前模型无法直接理解图片或视频,必须先明确回复:"
                "\"无法直接理解该图片/视频,正在调用工具解析。\"\n"
                "3) 在给出上述提示后,再调用工具解析附件并继续完成用户任务。\n"
                "4) 除非用户明确要求,不要先调用工具读取附件文件。\n"
                "5) 回复语言必须遵循 USER.md若未指定则与用户当前输入语言保持一致。\n"
                "6) 仅基于可见内容回答;看不清或无法确认的部分请明确说明,不要猜测。"
            )
        else:
            delivery_command = (
                f"请直接分析已附带的{media_label}并总结关键信息。\n"
                "若当前模型无法直接理解图片或视频,请先明确回复:"
                "\"无法直接理解该图片/视频,正在调用工具解析。\",然后再调用工具解析。\n"
                "回复语言必须遵循 USER.md若未指定则与用户当前输入语言保持一致。\n"
                "仅基于可见内容回答;看不清或无法确认的部分请明确说明,不要猜测。"
            )
    elif checked_attachments:
        attachment_block = "\n".join(f"- {p}" for p in checked_attachments)
        if not command:
            delivery_command = (
                "Please process the uploaded file(s) listed below:\n"
                f"{attachment_block}\n\n"
                "Reply language must follow USER.md. If not specified, use the same language as the user input."
            )
        elif not all(p in command for p in checked_attachments):
            # Only list the files when the command does not already name them all.
            delivery_command = (
                f"{command}\n\n"
                "[Attached files]\n"
                f"{attachment_block}\n\n"
                "Please process the attached file(s) listed above when answering this request.\n"
                "Reply language must follow USER.md. If not specified, use the same language as the user input."
            )

    def _user_packet() -> Dict[str, Any]:
        # A fresh dict per consumer, so persistence and broadcast never share one.
        return {
            "type": "USER_COMMAND",
            "channel": "dashboard",
            "text": display_command,
            "media": checked_attachments,
        }

    if display_command or checked_attachments:
        _persist_runtime_packet(bot_id, _user_packet())

    loop = getattr(app.state, "main_loop", None)
    if loop and loop.is_running():
        asyncio.run_coroutine_threadsafe(manager.broadcast(bot_id, _user_packet()), loop)

    delivered = docker_manager.send_command(bot_id, delivery_command, media=delivery_media)
    if delivered:
        return {"success": True}

    detail = docker_manager.get_last_delivery_error(bot_id)
    if loop and loop.is_running():
        error_packet = {
            "type": "AGENT_STATE",
            "channel": "dashboard",
            "payload": {
                "state": "ERROR",
                "action_msg": detail or "command delivery failed",
            },
        }
        asyncio.run_coroutine_threadsafe(manager.broadcast(bot_id, error_packet), loop)
    raise HTTPException(
        status_code=502,
        detail=f"Failed to deliver command to bot dashboard channel{': ' + detail if detail else ''}",
    )
@app.get("/api/bots/{bot_id}/messages")
def list_bot_messages(bot_id: str, limit: int = 200, session: Session = Depends(get_session)):
    """Return the most recent stored messages of a bot in chronological order.

    Args:
        bot_id: Primary key of the ``BotInstance`` whose messages are listed.
        limit: Requested number of messages; clamped to the range 1..500.
        session: Database session injected by FastAPI.

    Returns:
        A list of dicts with ``id``, ``bot_id``, ``role``, ``text``, ``media``
        and ``ts`` (``created_at.timestamp()`` scaled to milliseconds).

    Raises:
        HTTPException: 404 when no bot with ``bot_id`` exists.
    """
    if not session.get(BotInstance, bot_id):
        raise HTTPException(status_code=404, detail="Bot not found")

    page_size = min(max(int(limit), 1), 500)

    # Fetch the newest rows first so LIMIT keeps the latest messages ...
    newest_first = (
        select(BotMessage)
        .where(BotMessage.bot_id == bot_id)
        .order_by(BotMessage.created_at.desc(), BotMessage.id.desc())
        .limit(page_size)
    )
    recent = session.exec(newest_first).all()

    # ... then flip them so the response reads oldest -> newest.
    payload = []
    for message in reversed(recent):
        payload.append(
            {
                "id": message.id,
                "bot_id": message.bot_id,
                "role": message.role,
                "text": message.text,
                # media_json is read defensively: a row without the attribute yields None.
                "media": _parse_message_media(bot_id, getattr(message, "media_json", None)),
                # NOTE(review): if created_at is a naive datetime, timestamp()
                # interprets it in the server's local timezone - confirm.
                "ts": int(message.created_at.timestamp() * 1000),
            }
        )
    return payload
@app.delete("/api/bots/{bot_id}/messages")
def clear_bot_messages(bot_id: str, session: Session = Depends(get_session)):
    """Delete every stored message of a bot and reset its displayed state.

    Args:
        bot_id: Primary key of the ``BotInstance`` to clear.
        session: Database session injected by FastAPI.

    Returns:
        ``{"bot_id": ..., "deleted": <number of removed messages>}``.

    Raises:
        HTTPException: 404 when no bot with ``bot_id`` exists.
    """
    bot = session.get(BotInstance, bot_id)
    if not bot:
        raise HTTPException(status_code=404, detail="Bot not found")

    history = session.exec(select(BotMessage).where(BotMessage.bot_id == bot_id)).all()
    for message in history:
        session.delete(message)
    deleted = len(history)

    # With the history gone, the bot is put back to a blank IDLE state.
    bot.current_state = "IDLE"
    bot.last_action = ""
    bot.updated_at = datetime.utcnow()
    session.add(bot)

    # Message deletions and the bot reset are committed together.
    session.commit()
    return {"bot_id": bot_id, "deleted": deleted}
@app.get("/api/bots/{bot_id}/logs")
def get_bot_logs(bot_id: str, tail: int = 300, session: Session = Depends(get_session)):
    """Return recent log output for a bot.

    Args:
        bot_id: Primary key of the ``BotInstance`` whose logs are requested.
        tail: Passed through unchanged to ``docker_manager.get_recent_logs``.
        session: Database session injected by FastAPI.

    Returns:
        ``{"bot_id": ..., "logs": <value returned by get_recent_logs>}``.

    Raises:
        HTTPException: 404 when no bot with ``bot_id`` exists.
    """
    if not session.get(BotInstance, bot_id):
        raise HTTPException(status_code=404, detail="Bot not found")

    recent_logs = docker_manager.get_recent_logs(bot_id, tail=tail)
    return {"bot_id": bot_id, "logs": recent_logs}
@app.get("/api/bots/{bot_id}/workspace/tree")
def get_workspace_tree(bot_id: str, path: Optional[str] = None, session: Session = Depends(get_session)):
    """List one directory of a bot's workspace.

    Args:
        bot_id: Primary key of the ``BotInstance`` that owns the workspace.
        path: Directory to list, resolved by ``_resolve_workspace_path``.
        session: Database session injected by FastAPI.

    Returns:
        A dict with ``bot_id``, ``root``, ``cwd`` (path relative to the root,
        ``""`` for the root itself), ``parent`` (``None`` at the root) and
        ``entries``. If the workspace root directory does not exist, an empty
        listing is returned instead of an error.

    Raises:
        HTTPException: 404 when the bot is unknown, 400 when the resolved
            path is not a directory.
    """
    if not session.get(BotInstance, bot_id):
        raise HTTPException(status_code=404, detail="Bot not found")

    root = _workspace_root(bot_id)
    if not os.path.isdir(root):
        # Nothing on disk yet: report an empty tree rather than failing.
        return {"bot_id": bot_id, "root": root, "cwd": "", "parent": None, "entries": []}

    _resolved_root, target = _resolve_workspace_path(bot_id, path)
    if not os.path.isdir(target):
        raise HTTPException(status_code=400, detail="workspace path is not a directory")

    # Always expose forward-slash paths, whatever the host separator is.
    relative = os.path.relpath(target, root).replace("\\", "/")
    cwd = "" if relative == "." else relative

    if cwd:
        parent = os.path.dirname(cwd).replace("\\", "/")
        if parent == ".":
            parent = ""
    else:
        # The workspace root has no parent to navigate to.
        parent = None

    listing = _list_workspace_dir(target, root)
    return {
        "bot_id": bot_id,
        "root": root,
        "cwd": cwd,
        "parent": parent,
        "entries": listing,
    }
@app.get("/api/bots/{bot_id}/workspace/file")
def read_workspace_file(
    bot_id: str,
    path: str,
    max_bytes: int = 200000,
    session: Session = Depends(get_session),
):
    """Return a (possibly truncated) text preview of a workspace file.

    Args:
        bot_id: Primary key of the ``BotInstance`` that owns the workspace.
        path: File to read, resolved by ``_resolve_workspace_path``.
        max_bytes: Requested preview size in bytes; clamped to 4096..1000000.
        session: Database session injected by FastAPI.

    Returns:
        A dict with ``bot_id``, ``path`` (relative, forward slashes), ``size``
        (full size on disk), ``is_markdown``, ``truncated`` and ``content``.

    Raises:
        HTTPException: 404 when the bot or file is missing; 400 when the
            extension is not in the previewable set or the data contains a
            NUL byte.
    """
    # Imported locally because this is the only place in the module that needs it.
    import codecs

    bot = session.get(BotInstance, bot_id)
    if not bot:
        raise HTTPException(status_code=404, detail="Bot not found")
    root, target = _resolve_workspace_path(bot_id, path)
    if not os.path.isfile(target):
        raise HTTPException(status_code=404, detail="workspace file not found")

    ext = os.path.splitext(target)[1].lower()
    # Extensions considered previewable; "" admits extension-less files.
    text_ext = {
        "",
        ".md",
        ".txt",
        ".log",
        ".json",
        ".yaml",
        ".yml",
        ".cfg",
        ".ini",
        ".csv",
        ".tsv",
        ".toml",
        ".py",
        ".sh",
    }
    if ext not in text_ext:
        raise HTTPException(status_code=400, detail=f"unsupported file type: {ext or '(none)'}")

    safe_max = max(4096, min(int(max_bytes), 1000000))
    with open(target, "rb") as f:
        # One byte more than the limit is enough to tell whether the file is longer.
        raw = f.read(safe_max + 1)
    if b"\x00" in raw:
        raise HTTPException(status_code=400, detail="binary file is not previewable")

    truncated = len(raw) > safe_max
    body = raw[:safe_max] if truncated else raw

    # The byte limit may fall in the middle of a multi-byte UTF-8 character.
    # Decoding incrementally with final=False keeps such an incomplete tail in
    # the decoder's buffer instead of emitting a bogus U+FFFD at the end of the
    # preview. Invalid bytes elsewhere are still replaced, and a file that is
    # not truncated is decoded exactly as before (final=True).
    decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
    text_body = decoder.decode(body, final=not truncated)

    rel_path = os.path.relpath(target, root).replace("\\", "/")
    return {
        "bot_id": bot_id,
        "path": rel_path,
        "size": os.path.getsize(target),
        "is_markdown": rel_path.lower().endswith(".md"),
        "truncated": truncated,
        "content": text_body,
    }
@app.get("/api/bots/{bot_id}/cron/jobs")
def list_cron_jobs(bot_id: str, include_disabled: bool = True, session: Session = Depends(get_session)):
    """List the cron jobs stored for a bot, soonest next run first.

    Args:
        bot_id: Primary key of the ``BotInstance`` whose cron store is read.
        include_disabled: When false, jobs whose ``enabled`` flag is falsy are
            left out (a missing flag counts as enabled).
        session: Database session injected by FastAPI.

    Returns:
        ``{"bot_id": ..., "version": <int>, "jobs": [<job dict>, ...]}``.
        Jobs without a usable ``state.nextRunAtMs`` sort last.

    Raises:
        HTTPException: 404 when no bot with ``bot_id`` exists.
    """
    bot = session.get(BotInstance, bot_id)
    if not bot:
        raise HTTPException(status_code=404, detail="Bot not found")

    store = _read_cron_store(bot_id)
    jobs = store.get("jobs", [])
    # Same guard as stop_cron_job / delete_cron_job: a store whose "jobs"
    # value is not a list is treated as having no jobs instead of crashing.
    if not isinstance(jobs, list):
        jobs = []

    rows = [
        row
        for row in jobs
        if isinstance(row, dict) and (include_disabled or bool(row.get("enabled", True)))
    ]

    never = 2**62  # sentinel that pushes jobs without a next run to the end

    def _next_run_ms(job: Dict[str, Any]) -> int:
        """Sort key: the job's next run time, or ``never`` if absent/malformed."""
        state = job.get("state")
        raw = state.get("nextRunAtMs") if isinstance(state, dict) else None
        try:
            return int(raw or never)
        except (TypeError, ValueError, OverflowError):
            # One malformed entry must not break the whole listing.
            return never

    rows.sort(key=_next_run_ms)
    return {"bot_id": bot_id, "version": int(store.get("version", 1) or 1), "jobs": rows}
@app.post("/api/bots/{bot_id}/cron/jobs/{job_id}/stop")
def stop_cron_job(bot_id: str, job_id: str, session: Session = Depends(get_session)):
    """Disable one cron job in the bot's cron store.

    Args:
        bot_id: Primary key of the ``BotInstance`` whose cron store is edited.
        job_id: Compared against ``str(job["id"])`` of each stored job.
        session: Database session injected by FastAPI.

    Returns:
        ``{"status": "stopped", "job_id": job_id}``.

    Raises:
        HTTPException: 404 when the bot or the job cannot be found.
    """
    if not session.get(BotInstance, bot_id):
        raise HTTPException(status_code=404, detail="Bot not found")

    store = _read_cron_store(bot_id)
    jobs = store.get("jobs", [])
    if not isinstance(jobs, list):
        jobs = []

    # First dict entry whose stringified id matches; non-dict entries are ignored.
    matches = (entry for entry in jobs if isinstance(entry, dict) and str(entry.get("id")) == job_id)
    found = next(matches, None)
    if not found:
        raise HTTPException(status_code=404, detail="Cron job not found")

    # The job is kept in the store and only flagged as disabled.
    now_ms = int(datetime.utcnow().timestamp() * 1000)
    found["enabled"] = False
    found["updatedAtMs"] = now_ms

    version = int(store.get("version", 1) or 1)
    _write_cron_store(bot_id, {"version": version, "jobs": jobs})
    return {"status": "stopped", "job_id": job_id}
@app.delete("/api/bots/{bot_id}/cron/jobs/{job_id}")
def delete_cron_job(bot_id: str, job_id: str, session: Session = Depends(get_session)):
    """Remove a cron job from the bot's cron store.

    Args:
        bot_id: Primary key of the ``BotInstance`` whose cron store is edited.
        job_id: Compared against ``str(job["id"])`` of each stored job; every
            matching entry is removed.
        session: Database session injected by FastAPI.

    Returns:
        ``{"status": "deleted", "job_id": job_id}``.

    Raises:
        HTTPException: 404 when the bot is unknown or no job matched.
    """
    if not session.get(BotInstance, bot_id):
        raise HTTPException(status_code=404, detail="Bot not found")

    store = _read_cron_store(bot_id)
    jobs = store.get("jobs", [])
    if not isinstance(jobs, list):
        jobs = []

    kept = []
    for entry in jobs:
        if isinstance(entry, dict) and str(entry.get("id")) == job_id:
            continue  # this is a job being deleted
        kept.append(entry)

    # An unchanged length means nothing matched the requested id.
    if len(kept) == len(jobs):
        raise HTTPException(status_code=404, detail="Cron job not found")

    version = int(store.get("version", 1) or 1)
    _write_cron_store(bot_id, {"version": version, "jobs": kept})
    return {"status": "deleted", "job_id": job_id}
@app.get("/api/bots/{bot_id}/workspace/download")
def download_workspace_file(
    bot_id: str,
    path: str,
    download: bool = False,
    session: Session = Depends(get_session),
):
    """Serve a workspace file as a ``FileResponse``.

    Args:
        bot_id: Primary key of the ``BotInstance`` that owns the workspace.
        path: File to serve, resolved by ``_resolve_workspace_path``.
        download: When true, the file's base name is passed to the response
            as ``filename``; otherwise no filename is set.
        session: Database session injected by FastAPI.

    Raises:
        HTTPException: 404 when the bot or the file does not exist.
    """
    if not session.get(BotInstance, bot_id):
        raise HTTPException(status_code=404, detail="Bot not found")

    _resolved_root, target = _resolve_workspace_path(bot_id, path)
    if not os.path.isfile(target):
        raise HTTPException(status_code=404, detail="workspace file not found")

    # Content type guessed from the file name; it is None when unknown.
    guessed_type, _encoding = mimetypes.guess_type(target)
    response_options = {"media_type": guessed_type}
    if download:
        response_options["filename"] = os.path.basename(target)
    return FileResponse(target, **response_options)
@app.post("/api/bots/{bot_id}/workspace/upload")
async def upload_workspace_files(
    bot_id: str,
    files: List[UploadFile] = File(...),
    path: Optional[str] = None,
    session: Session = Depends(get_session),
):
    """Store uploaded files inside a bot's workspace.

    Args:
        bot_id: Primary key of the ``BotInstance`` that owns the workspace.
        files: Uploaded files; each one is streamed to disk in 1 MiB chunks.
        path: Target directory inside the workspace; defaults to ``"uploads"``.
        session: Database session injected by FastAPI.

    Returns:
        ``{"bot_id": ..., "files": [{"name", "path", "size"}, ...]}`` where
        ``path`` is relative to the workspace root with forward slashes.

    Raises:
        HTTPException: 404 unknown bot; 400 no files or invalid target path;
            413 when a file exceeds ``UPLOAD_MAX_MB``; 500 on write failures.
            A file that fails part-way is removed from disk before the error
            is raised.
    """
    bot = session.get(BotInstance, bot_id)
    if not bot:
        raise HTTPException(status_code=404, detail="Bot not found")
    if not files:
        raise HTTPException(status_code=400, detail="no files uploaded")

    max_bytes = UPLOAD_MAX_MB * 1024 * 1024
    root, upload_dir = _resolve_workspace_path(bot_id, path or "uploads")
    safe_dir_real = os.path.abspath(upload_dir)
    # Check containment BEFORE creating anything, so a rejected path never
    # leaves directories behind. (_resolve_workspace_path may already enforce
    # this; the check is kept as a second line of defence.)
    if os.path.commonpath([root, safe_dir_real]) != root:
        raise HTTPException(status_code=400, detail="invalid upload target path")
    os.makedirs(upload_dir, exist_ok=True)

    def _discard_partial(file_path: str) -> None:
        """Remove a partially written upload, if it was created at all."""
        if os.path.exists(file_path):
            os.remove(file_path)

    rows: List[Dict[str, Any]] = []
    for upload in files:
        # Reduce the client-supplied name to a single safe path component.
        original = (upload.filename or "upload.bin").strip() or "upload.bin"
        name = os.path.basename(original).replace("\\", "_").replace("/", "_")
        name = re.sub(r"[^\w.\-()+@ ]+", "_", name)
        if not name:
            name = "upload.bin"

        abs_path = os.path.join(safe_dir_real, name)
        if os.path.exists(abs_path):
            # Never overwrite: append a timestamp, and if that name is taken
            # too (e.g. several same-named files within one second) keep
            # adding a counter until the name is free.
            base, ext = os.path.splitext(name)
            stamp = int(datetime.utcnow().timestamp())
            name = f"{base}-{stamp}{ext}"
            abs_path = os.path.join(safe_dir_real, name)
            attempt = 1
            while os.path.exists(abs_path):
                name = f"{base}-{stamp}-{attempt}{ext}"
                abs_path = os.path.join(safe_dir_real, name)
                attempt += 1

        total_size = 0
        try:
            with open(abs_path, "wb") as f:
                while chunk := await upload.read(1024 * 1024):
                    total_size += len(chunk)
                    # Enforce the limit while streaming so an oversized upload
                    # is rejected without being written out in full.
                    if total_size > max_bytes:
                        raise HTTPException(
                            status_code=413,
                            detail=f"File '{name}' too large (max {max_bytes // (1024 * 1024)}MB)",
                        )
                    f.write(chunk)
        except HTTPException:
            _discard_partial(abs_path)
            raise
        except OSError as exc:
            _discard_partial(abs_path)
            raise HTTPException(
                status_code=500,
                detail=f"Failed to write file '{name}': {exc.strerror or str(exc)}",
            ) from exc
        except Exception as exc:
            _discard_partial(abs_path)
            raise HTTPException(status_code=500, detail=f"Failed to upload file '{name}'") from exc
        finally:
            await upload.close()

        rel = os.path.relpath(abs_path, root).replace("\\", "/")
        rows.append({"name": name, "path": rel, "size": total_size})
    return {"bot_id": bot_id, "files": rows}
@app.websocket("/ws/monitor/{bot_id}")
async def websocket_endpoint(websocket: WebSocket, bot_id: str):
    """Keep a monitoring WebSocket registered for a bot until it closes.

    The socket is registered with ``manager`` and ``docker_manager`` is asked
    to ensure a monitor for the bot with ``docker_callback``. Text sent by the
    client is read and discarded; the receive loop exists only to notice when
    the client goes away.

    Args:
        websocket: The incoming WebSocket connection.
        bot_id: Bot whose events this connection subscribes to.
    """
    await manager.connect(bot_id, websocket)
    try:
        docker_manager.ensure_monitor(bot_id, docker_callback)
        while True:
            await websocket.receive_text()
    except WebSocketDisconnect:
        # Normal client disconnect; cleanup happens in ``finally``.
        pass
    finally:
        # Always unregister, not only on a clean disconnect: an unexpected
        # error, a task cancellation, or a failing ensure_monitor() would
        # otherwise leave a dead socket registered with the manager.
        manager.disconnect(bot_id, websocket)
def _main_server_options() -> tuple[str, int, bool]:
    """Read the server's bind options from the environment.

    Returns:
        ``(host, port, reload_flag)`` where

        * ``host`` comes from ``APP_HOST`` (stripped), or ``"0.0.0.0"`` when
          unset or blank;
        * ``port`` comes from ``APP_PORT``, falls back to 8000 when it is not
          an integer, and is clamped to 1..65535;
        * ``reload_flag`` is true when ``APP_RELOAD`` (default ``"true"``) is
          one of ``1``, ``true``, ``yes`` or ``on``, case-insensitively.
    """
    fallback_host = "0.0.0.0"
    configured_host = os.getenv("APP_HOST", fallback_host) or fallback_host
    host = str(configured_host).strip() or fallback_host

    try:
        requested_port = int(os.getenv("APP_PORT", "8000"))
    except Exception:
        # Anything that is not a valid integer falls back to the default port.
        requested_port = 8000
    port = min(max(requested_port, 1), 65535)

    enabled_words = {"1", "true", "yes", "on"}
    reload_setting = str(os.getenv("APP_RELOAD", "true")).strip().lower()
    reload_flag = reload_setting in enabled_words

    return host, port, reload_flag
if __name__ == "__main__":
    # Script entry point: start uvicorn with the options taken from the environment.
    import uvicorn

    host, port, reload_flag = _main_server_options()
    # "<this file's module name>:app" - the import-string form of the application.
    app_module = f"{os.path.splitext(os.path.basename(__file__))[0]}:app"

    if not reload_flag:
        # Without reload the application object is handed over directly.
        uvicorn.run(app, host=host, port=port)
    else:
        # With reload the application is passed as an import string instead.
        uvicorn.run(app_module, host=host, port=port, reload=True)