import asyncio import io import json import mimetypes import os import re import shutil import tempfile import zipfile from datetime import datetime from typing import Any, Dict, List, Optional import httpx from pydantic import BaseModel from fastapi import Depends, FastAPI, File, HTTPException, UploadFile, WebSocket, WebSocketDisconnect from fastapi.responses import FileResponse from fastapi.middleware.cors import CORSMiddleware from sqlmodel import Session, select from core.config_manager import BotConfigManager from core.database import engine, get_session, init_database from core.docker_manager import BotDockerManager from core.settings import ( BOTS_WORKSPACE_ROOT, DATA_ROOT, DATABASE_ECHO, DATABASE_ENGINE, DATABASE_URL_DISPLAY, DEFAULT_AGENTS_MD, DEFAULT_IDENTITY_MD, DEFAULT_SOUL_MD, DEFAULT_TOOLS_MD, DEFAULT_USER_MD, PROJECT_ROOT, ) from models.bot import BotInstance, BotMessage, NanobotImage app = FastAPI(title="Dashboard Nanobot API") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"], ) os.makedirs(BOTS_WORKSPACE_ROOT, exist_ok=True) os.makedirs(DATA_ROOT, exist_ok=True) docker_manager = BotDockerManager(host_data_root=BOTS_WORKSPACE_ROOT) config_manager = BotConfigManager(host_data_root=BOTS_WORKSPACE_ROOT) class ChannelConfigRequest(BaseModel): channel_type: str external_app_id: Optional[str] = None app_secret: Optional[str] = None internal_port: Optional[int] = None is_active: bool = True extra_config: Optional[Dict[str, Any]] = None class ChannelConfigUpdateRequest(BaseModel): channel_type: Optional[str] = None external_app_id: Optional[str] = None app_secret: Optional[str] = None internal_port: Optional[int] = None is_active: Optional[bool] = None extra_config: Optional[Dict[str, Any]] = None class BotCreateRequest(BaseModel): id: str name: str llm_provider: str llm_model: str api_key: str image_tag: str system_prompt: Optional[str] = None api_base: Optional[str] = None temperature: float = 0.2 top_p: float = 1.0 max_tokens: int = 8192 soul_md: Optional[str] = None agents_md: Optional[str] = None user_md: Optional[str] = None tools_md: Optional[str] = None tools_config: Optional[Dict[str, Any]] = None env_params: Optional[Dict[str, str]] = None identity_md: Optional[str] = None channels: Optional[List[ChannelConfigRequest]] = None send_progress: Optional[bool] = None send_tool_hints: Optional[bool] = None class BotUpdateRequest(BaseModel): name: Optional[str] = None llm_provider: Optional[str] = None llm_model: Optional[str] = None api_key: Optional[str] = None api_base: Optional[str] = None image_tag: Optional[str] = None system_prompt: Optional[str] = None temperature: Optional[float] = None top_p: Optional[float] = None max_tokens: Optional[int] = None soul_md: Optional[str] = None agents_md: Optional[str] = None user_md: Optional[str] = None tools_md: Optional[str] = None tools_config: Optional[Dict[str, Any]] = None env_params: Optional[Dict[str, str]] = None identity_md: Optional[str] = None send_progress: Optional[bool] = None send_tool_hints: Optional[bool] = None class BotToolsConfigUpdateRequest(BaseModel): tools_config: Optional[Dict[str, Any]] = None class BotEnvParamsUpdateRequest(BaseModel): env_params: Optional[Dict[str, str]] = None class CommandRequest(BaseModel): command: Optional[str] = None attachments: Optional[List[str]] = None def _normalize_media_item(bot_id: str, value: Any) -> str: raw = str(value or "").strip().replace("\\", "/") if not raw: return "" if raw.startswith("/root/.nanobot/workspace/"): return raw[len("/root/.nanobot/workspace/") :].lstrip("/") root = _workspace_root(bot_id) if os.path.isabs(raw): try: if os.path.commonpath([root, raw]) == root: return os.path.relpath(raw, root).replace("\\", "/") except Exception: pass return raw.lstrip("/") def _normalize_media_list(raw: Any, bot_id: str) -> List[str]: if not isinstance(raw, list): return [] rows: List[str] = [] for v in raw: s = _normalize_media_item(bot_id, v) if s: rows.append(s) return rows def _persist_runtime_packet(bot_id: str, packet: Dict[str, Any]): packet_type = str(packet.get("type", "")).upper() if packet_type not in {"AGENT_STATE", "ASSISTANT_MESSAGE", "USER_COMMAND", "BUS_EVENT"}: return source_channel = str(packet.get("channel") or "").strip().lower() if source_channel != "dashboard": return with Session(engine) as session: bot = session.get(BotInstance, bot_id) if not bot: return if packet_type == "AGENT_STATE": payload = packet.get("payload") or {} state = str(payload.get("state") or "").strip() action = str(payload.get("action_msg") or payload.get("msg") or "").strip() if state: bot.current_state = state if action: bot.last_action = action[:4000] elif packet_type == "ASSISTANT_MESSAGE": bot.current_state = "IDLE" text_msg = str(packet.get("text") or "").strip() media_list = _normalize_media_list(packet.get("media"), bot_id) if text_msg or media_list: if text_msg: bot.last_action = " ".join(text_msg.split())[:4000] session.add( BotMessage( bot_id=bot_id, role="assistant", text=text_msg, media_json=json.dumps(media_list, ensure_ascii=False) if media_list else None, ) ) elif packet_type == "USER_COMMAND": text_msg = str(packet.get("text") or "").strip() media_list = _normalize_media_list(packet.get("media"), bot_id) if text_msg or media_list: session.add( BotMessage( bot_id=bot_id, role="user", text=text_msg, media_json=json.dumps(media_list, ensure_ascii=False) if media_list else None, ) ) elif packet_type == "BUS_EVENT": # Dashboard channel emits BUS_EVENT for both progress and final replies. # Persist only non-progress events to keep durable chat history clean. is_progress = bool(packet.get("is_progress")) if not is_progress: text_msg = str(packet.get("content") or packet.get("text") or "").strip() media_list = _normalize_media_list(packet.get("media"), bot_id) if text_msg or media_list: bot.current_state = "IDLE" if text_msg: bot.last_action = " ".join(text_msg.split())[:4000] session.add( BotMessage( bot_id=bot_id, role="assistant", text=text_msg, media_json=json.dumps(media_list, ensure_ascii=False) if media_list else None, ) ) bot.updated_at = datetime.utcnow() session.add(bot) session.commit() class WSConnectionManager: def __init__(self): self.connections: Dict[str, List[WebSocket]] = {} async def connect(self, bot_id: str, websocket: WebSocket): await websocket.accept() self.connections.setdefault(bot_id, []).append(websocket) def disconnect(self, bot_id: str, websocket: WebSocket): conns = self.connections.get(bot_id, []) if websocket in conns: conns.remove(websocket) if not conns and bot_id in self.connections: del self.connections[bot_id] async def broadcast(self, bot_id: str, data: Dict[str, Any]): conns = list(self.connections.get(bot_id, [])) for ws in conns: try: await ws.send_json(data) except Exception: self.disconnect(bot_id, ws) manager = WSConnectionManager() def docker_callback(bot_id: str, packet: Dict[str, Any]): _persist_runtime_packet(bot_id, packet) loop = getattr(app.state, "main_loop", None) if not loop or not loop.is_running(): return asyncio.run_coroutine_threadsafe(manager.broadcast(bot_id, packet), loop) @app.on_event("startup") async def on_startup(): app.state.main_loop = asyncio.get_running_loop() print(f"📁 项目根目录: {PROJECT_ROOT}") print(f"🗄️ 数据库引擎: {DATABASE_ENGINE} (echo={DATABASE_ECHO})") print(f"📁 数据库连接: {DATABASE_URL_DISPLAY}") init_database() with Session(engine) as session: running_bots = session.exec(select(BotInstance).where(BotInstance.docker_status == "RUNNING")).all() for bot in running_bots: docker_manager.ensure_monitor(bot.id, docker_callback) def _provider_defaults(provider: str) -> tuple[str, str]: p = provider.lower().strip() if p in {"openrouter"}: return "openrouter", "https://openrouter.ai/api/v1" if p in {"dashscope", "aliyun", "qwen", "aliyun-qwen"}: return "dashscope", "https://dashscope.aliyuncs.com/compatible-mode/v1" if p in {"kimi", "moonshot"}: return "kimi", "https://api.moonshot.cn/v1" if p in {"minimax"}: return "minimax", "https://api.minimax.chat/v1" return p, "" @app.get("/api/system/defaults") def get_system_defaults(): return { "templates": { "soul_md": DEFAULT_SOUL_MD, "agents_md": DEFAULT_AGENTS_MD, "user_md": DEFAULT_USER_MD, "tools_md": DEFAULT_TOOLS_MD, "identity_md": DEFAULT_IDENTITY_MD, } } @app.get("/api/health") def get_health(): try: with Session(engine) as session: session.exec(select(BotInstance).limit(1)).first() return {"status": "ok", "database": DATABASE_ENGINE} except Exception as e: raise HTTPException(status_code=503, detail=f"database check failed: {e}") def _config_json_path(bot_id: str) -> str: return os.path.join(_bot_data_root(bot_id), "config.json") def _read_bot_config(bot_id: str) -> Dict[str, Any]: path = _config_json_path(bot_id) if not os.path.isfile(path): return {} try: with open(path, "r", encoding="utf-8") as f: data = json.load(f) return data if isinstance(data, dict) else {} except Exception: return {} def _write_bot_config(bot_id: str, config_data: Dict[str, Any]) -> None: path = _config_json_path(bot_id) os.makedirs(os.path.dirname(path), exist_ok=True) tmp = f"{path}.tmp" with open(tmp, "w", encoding="utf-8") as f: json.dump(config_data, f, ensure_ascii=False, indent=2) os.replace(tmp, path) def _normalize_channel_extra(raw: Any) -> Dict[str, Any]: if not isinstance(raw, dict): return {} return raw def _channel_cfg_to_api_dict(bot_id: str, ctype: str, cfg: Dict[str, Any]) -> Dict[str, Any]: ctype = str(ctype or "").strip().lower() enabled = bool(cfg.get("enabled", True)) port = max(1, min(int(cfg.get("port", 8080) or 8080), 65535)) extra: Dict[str, Any] = {} external_app_id = "" app_secret = "" if ctype == "feishu": external_app_id = str(cfg.get("appId") or "") app_secret = str(cfg.get("appSecret") or "") extra = { "encryptKey": cfg.get("encryptKey", ""), "verificationToken": cfg.get("verificationToken", ""), "allowFrom": cfg.get("allowFrom", []), } elif ctype == "dingtalk": external_app_id = str(cfg.get("clientId") or "") app_secret = str(cfg.get("clientSecret") or "") extra = {"allowFrom": cfg.get("allowFrom", [])} elif ctype == "telegram": app_secret = str(cfg.get("token") or "") extra = { "proxy": cfg.get("proxy", ""), "replyToMessage": bool(cfg.get("replyToMessage", False)), "allowFrom": cfg.get("allowFrom", []), } elif ctype == "slack": external_app_id = str(cfg.get("botToken") or "") app_secret = str(cfg.get("appToken") or "") extra = { "mode": cfg.get("mode", "socket"), "replyInThread": bool(cfg.get("replyInThread", True)), "groupPolicy": cfg.get("groupPolicy", "mention"), "groupAllowFrom": cfg.get("groupAllowFrom", []), "reactEmoji": cfg.get("reactEmoji", "eyes"), } elif ctype == "qq": external_app_id = str(cfg.get("appId") or "") app_secret = str(cfg.get("secret") or "") extra = {"allowFrom": cfg.get("allowFrom", [])} else: external_app_id = str( cfg.get("appId") or cfg.get("clientId") or cfg.get("botToken") or cfg.get("externalAppId") or "" ) app_secret = str( cfg.get("appSecret") or cfg.get("clientSecret") or cfg.get("secret") or cfg.get("token") or cfg.get("appToken") or "" ) extra = {k: v for k, v in cfg.items() if k not in {"enabled", "port", "appId", "clientId", "botToken", "externalAppId", "appSecret", "clientSecret", "secret", "token", "appToken"}} return { "id": ctype, "bot_id": bot_id, "channel_type": ctype, "external_app_id": external_app_id, "app_secret": app_secret, "internal_port": port, "is_active": enabled, "extra_config": extra, "locked": ctype == "dashboard", } def _channel_api_to_cfg(row: Dict[str, Any]) -> Dict[str, Any]: ctype = str(row.get("channel_type") or "").strip().lower() enabled = bool(row.get("is_active", True)) extra = _normalize_channel_extra(row.get("extra_config")) external_app_id = str(row.get("external_app_id") or "") app_secret = str(row.get("app_secret") or "") port = max(1, min(int(row.get("internal_port") or 8080), 65535)) if ctype == "feishu": return { "enabled": enabled, "appId": external_app_id, "appSecret": app_secret, "encryptKey": extra.get("encryptKey", ""), "verificationToken": extra.get("verificationToken", ""), "allowFrom": extra.get("allowFrom", []), } if ctype == "dingtalk": return { "enabled": enabled, "clientId": external_app_id, "clientSecret": app_secret, "allowFrom": extra.get("allowFrom", []), } if ctype == "telegram": return { "enabled": enabled, "token": app_secret, "proxy": extra.get("proxy", ""), "replyToMessage": bool(extra.get("replyToMessage", False)), "allowFrom": extra.get("allowFrom", []), } if ctype == "slack": return { "enabled": enabled, "mode": extra.get("mode", "socket"), "botToken": external_app_id, "appToken": app_secret, "replyInThread": bool(extra.get("replyInThread", True)), "groupPolicy": extra.get("groupPolicy", "mention"), "groupAllowFrom": extra.get("groupAllowFrom", []), "reactEmoji": extra.get("reactEmoji", "eyes"), } if ctype == "qq": return { "enabled": enabled, "appId": external_app_id, "secret": app_secret, "allowFrom": extra.get("allowFrom", []), } merged = dict(extra) merged.update( { "enabled": enabled, "appId": external_app_id, "appSecret": app_secret, "port": port, } ) return merged def _get_bot_channels_from_config(bot: BotInstance) -> List[Dict[str, Any]]: config_data = _read_bot_config(bot.id) channels_cfg = config_data.get("channels") if not isinstance(channels_cfg, dict): channels_cfg = {} send_progress = bool(channels_cfg.get("sendProgress", bot.send_progress)) send_tool_hints = bool(channels_cfg.get("sendToolHints", bot.send_tool_hints)) rows: List[Dict[str, Any]] = [ { "id": "dashboard", "bot_id": bot.id, "channel_type": "dashboard", "external_app_id": f"dashboard-{bot.id}", "app_secret": "", "internal_port": 9000, "is_active": True, "extra_config": { "sendProgress": send_progress, "sendToolHints": send_tool_hints, }, "locked": True, } ] for ctype, cfg in channels_cfg.items(): if ctype in {"sendProgress", "sendToolHints"}: continue if not isinstance(cfg, dict): continue rows.append(_channel_cfg_to_api_dict(bot.id, ctype, cfg)) return rows def _normalize_initial_channels(bot_id: str, channels: Optional[List[ChannelConfigRequest]]) -> List[Dict[str, Any]]: rows: List[Dict[str, Any]] = [] seen_types: set[str] = set() for c in channels or []: ctype = (c.channel_type or "").strip().lower() if not ctype or ctype == "dashboard" or ctype in seen_types: continue seen_types.add(ctype) rows.append( { "id": ctype, "bot_id": bot_id, "channel_type": ctype, "external_app_id": (c.external_app_id or "").strip() or f"{ctype}-{bot_id}", "app_secret": (c.app_secret or "").strip(), "internal_port": max(1, min(int(c.internal_port or 8080), 65535)), "is_active": bool(c.is_active), "extra_config": _normalize_channel_extra(c.extra_config), "locked": False, } ) return rows def _parse_message_media(bot_id: str, media_raw: Optional[str]) -> List[str]: if not media_raw: return [] try: parsed = json.loads(media_raw) return _normalize_media_list(parsed, bot_id) except Exception: return [] def _default_tools_config() -> Dict[str, Any]: return { "web": { "search": { "apiKey": "", "maxResults": 5, } } } def _normalize_tools_config(raw: Any) -> Dict[str, Any]: cfg = _default_tools_config() if not isinstance(raw, dict): return cfg web_raw = raw.get("web") if isinstance(web_raw, dict): search_raw = web_raw.get("search") if isinstance(search_raw, dict): api_key = str(search_raw.get("apiKey") or search_raw.get("api_key") or "").strip() max_results_raw = search_raw.get("maxResults", search_raw.get("max_results", 5)) try: max_results = int(max_results_raw) except Exception: max_results = 5 max_results = max(1, min(max_results, 10)) cfg["web"]["search"]["apiKey"] = api_key cfg["web"]["search"]["maxResults"] = max_results return cfg def _parse_tools_config(raw: Optional[str]) -> Dict[str, Any]: if not raw: return _default_tools_config() try: parsed = json.loads(raw) except Exception: return _default_tools_config() return _normalize_tools_config(parsed) _ENV_KEY_RE = re.compile(r"^[A-Z_][A-Z0-9_]{0,127}$") def _normalize_env_params(raw: Any) -> Dict[str, str]: if not isinstance(raw, dict): return {} rows: Dict[str, str] = {} for k, v in raw.items(): key = str(k or "").strip().upper() if not key or not _ENV_KEY_RE.fullmatch(key): continue rows[key] = str(v or "").strip() return rows def _parse_env_params(raw: Any) -> Dict[str, str]: return _normalize_env_params(raw) def _sync_workspace_channels( session: Session, bot_id: str, channels_override: Optional[List[Dict[str, Any]]] = None, ) -> None: bot = session.get(BotInstance, bot_id) if not bot: return channels_data = channels_override if channels_override is not None else _get_bot_channels_from_config(bot) bot_data = bot.model_dump() for row in channels_data: if str(row.get("channel_type") or "").strip().lower() != "dashboard": continue extra = _normalize_channel_extra(row.get("extra_config")) bot_data["send_progress"] = bool(extra.get("sendProgress", bot.send_progress)) bot_data["send_tool_hints"] = bool(extra.get("sendToolHints", bot.send_tool_hints)) break normalized_channels: List[Dict[str, Any]] = [] for row in channels_data: ctype = str(row.get("channel_type") or "").strip().lower() if not ctype or ctype == "dashboard": continue normalized_channels.append( { "channel_type": ctype, "external_app_id": str(row.get("external_app_id") or ""), "app_secret": str(row.get("app_secret") or ""), "internal_port": max(1, min(int(row.get("internal_port") or 8080), 65535)), "is_active": bool(row.get("is_active", True)), "extra_config": _normalize_channel_extra(row.get("extra_config")), } ) config_manager.update_workspace( bot_id=bot_id, bot_data=bot_data, channels=normalized_channels, ) def reconcile_image_registry(session: Session): """Only reconcile status for images explicitly registered in DB.""" db_images = session.exec(select(NanobotImage)).all() for img in db_images: if docker_manager.has_image(img.tag): try: docker_img = docker_manager.client.images.get(img.tag) if docker_manager.client else None img.image_id = docker_img.id if docker_img else img.image_id except Exception: pass img.status = "READY" else: img.status = "UNKNOWN" session.add(img) session.commit() def _workspace_root(bot_id: str) -> str: return os.path.abspath(os.path.join(BOTS_WORKSPACE_ROOT, bot_id, ".nanobot", "workspace")) def _bot_data_root(bot_id: str) -> str: return os.path.abspath(os.path.join(BOTS_WORKSPACE_ROOT, bot_id, ".nanobot")) def _skills_root(bot_id: str) -> str: return os.path.join(_workspace_root(bot_id), "skills") def _is_valid_top_level_skill_name(name: str) -> bool: text = str(name or "").strip() if not text: return False if "/" in text or "\\" in text: return False if text in {".", ".."}: return False return True def _read_skill_description(entry_path: str) -> str: candidates: List[str] = [] if os.path.isdir(entry_path): candidates = [ os.path.join(entry_path, "SKILL.md"), os.path.join(entry_path, "skill.md"), os.path.join(entry_path, "README.md"), os.path.join(entry_path, "readme.md"), ] elif entry_path.lower().endswith(".md"): candidates = [entry_path] for candidate in candidates: if not os.path.isfile(candidate): continue try: with open(candidate, "r", encoding="utf-8") as f: for line in f: text = line.strip() if text and not text.startswith("#"): return text[:240] except Exception: continue return "" def _list_workspace_skills(bot_id: str) -> List[Dict[str, Any]]: root = _skills_root(bot_id) os.makedirs(root, exist_ok=True) rows: List[Dict[str, Any]] = [] names = sorted(os.listdir(root), key=lambda n: (not os.path.isdir(os.path.join(root, n)), n.lower())) for name in names: if not name or name.startswith("."): continue if not _is_valid_top_level_skill_name(name): continue abs_path = os.path.join(root, name) if not os.path.exists(abs_path): continue stat = os.stat(abs_path) rows.append( { "id": name, "name": name, "type": "dir" if os.path.isdir(abs_path) else "file", "path": f"skills/{name}", "size": stat.st_size if os.path.isfile(abs_path) else None, "mtime": datetime.utcfromtimestamp(stat.st_mtime).isoformat() + "Z", "description": _read_skill_description(abs_path), } ) return rows def _cron_store_path(bot_id: str) -> str: return os.path.join(_bot_data_root(bot_id), "cron", "jobs.json") def _env_store_path(bot_id: str) -> str: return os.path.join(_bot_data_root(bot_id), "env.json") def _read_env_store(bot_id: str) -> Dict[str, str]: path = _env_store_path(bot_id) if not os.path.isfile(path): return {} try: with open(path, "r", encoding="utf-8") as f: data = json.load(f) return _normalize_env_params(data) except Exception: return {} def _write_env_store(bot_id: str, env_params: Dict[str, str]) -> None: path = _env_store_path(bot_id) os.makedirs(os.path.dirname(path), exist_ok=True) tmp = f"{path}.tmp" with open(tmp, "w", encoding="utf-8") as f: json.dump(_normalize_env_params(env_params), f, ensure_ascii=False, indent=2) os.replace(tmp, path) def _read_cron_store(bot_id: str) -> Dict[str, Any]: path = _cron_store_path(bot_id) if not os.path.isfile(path): return {"version": 1, "jobs": []} try: with open(path, "r", encoding="utf-8") as f: data = json.load(f) if not isinstance(data, dict): return {"version": 1, "jobs": []} jobs = data.get("jobs") if not isinstance(jobs, list): data["jobs"] = [] if "version" not in data: data["version"] = 1 return data except Exception: return {"version": 1, "jobs": []} def _write_cron_store(bot_id: str, store: Dict[str, Any]) -> None: path = _cron_store_path(bot_id) os.makedirs(os.path.dirname(path), exist_ok=True) tmp = f"{path}.tmp" with open(tmp, "w", encoding="utf-8") as f: json.dump(store, f, ensure_ascii=False, indent=2) os.replace(tmp, path) def _resolve_workspace_path(bot_id: str, rel_path: Optional[str] = None) -> tuple[str, str]: root = _workspace_root(bot_id) rel = (rel_path or "").strip().replace("\\", "/") target = os.path.abspath(os.path.join(root, rel)) if os.path.commonpath([root, target]) != root: raise HTTPException(status_code=400, detail="invalid workspace path") return root, target def _build_workspace_tree(path: str, root: str, depth: int) -> List[Dict[str, Any]]: rows: List[Dict[str, Any]] = [] try: names = sorted(os.listdir(path), key=lambda v: (not os.path.isdir(os.path.join(path, v)), v.lower())) except FileNotFoundError: return rows for name in names: if name in {".DS_Store"}: continue abs_path = os.path.join(path, name) rel_path = os.path.relpath(abs_path, root).replace("\\", "/") stat = os.stat(abs_path) base: Dict[str, Any] = { "name": name, "path": rel_path, "mtime": datetime.utcfromtimestamp(stat.st_mtime).isoformat() + "Z", } if os.path.isdir(abs_path): node = {**base, "type": "dir"} if depth > 0: node["children"] = _build_workspace_tree(abs_path, root, depth - 1) rows.append(node) continue rows.append( { **base, "type": "file", "size": stat.st_size, "ext": os.path.splitext(name)[1].lower(), } ) return rows def _list_workspace_dir(path: str, root: str) -> List[Dict[str, Any]]: rows: List[Dict[str, Any]] = [] names = sorted(os.listdir(path), key=lambda v: (not os.path.isdir(os.path.join(path, v)), v.lower())) for name in names: if name in {".DS_Store"}: continue abs_path = os.path.join(path, name) rel_path = os.path.relpath(abs_path, root).replace("\\", "/") stat = os.stat(abs_path) rows.append( { "name": name, "path": rel_path, "type": "dir" if os.path.isdir(abs_path) else "file", "size": stat.st_size if os.path.isfile(abs_path) else None, "ext": os.path.splitext(name)[1].lower() if os.path.isfile(abs_path) else "", "mtime": datetime.utcfromtimestamp(stat.st_mtime).isoformat() + "Z", } ) return rows @app.get("/api/images", response_model=List[NanobotImage]) def list_images(session: Session = Depends(get_session)): reconcile_image_registry(session) return session.exec(select(NanobotImage)).all() @app.delete("/api/images/{tag:path}") def delete_image(tag: str, session: Session = Depends(get_session)): image = session.get(NanobotImage, tag) if not image: raise HTTPException(status_code=404, detail="Image not found") # 检查是否有机器人正在使用此镜像 bots_using = session.exec(select(BotInstance).where(BotInstance.image_tag == tag)).all() if bots_using: raise HTTPException(status_code=400, detail=f"Cannot delete image: {len(bots_using)} bots are using it.") session.delete(image) session.commit() return {"status": "deleted"} @app.get("/api/docker-images") def list_docker_images(repository: str = "nanobot-base"): rows = docker_manager.list_images_by_repo(repository) return rows @app.post("/api/images/register") def register_image(payload: dict, session: Session = Depends(get_session)): tag = (payload.get("tag") or "").strip() source_dir = (payload.get("source_dir") or "manual").strip() or "manual" if not tag: raise HTTPException(status_code=400, detail="tag is required") if not docker_manager.has_image(tag): raise HTTPException(status_code=404, detail=f"Docker image not found: {tag}") version = tag.split(":")[-1].removeprefix("v") if ":" in tag else tag try: docker_img = docker_manager.client.images.get(tag) if docker_manager.client else None image_id = docker_img.id if docker_img else None except Exception: image_id = None row = session.get(NanobotImage, tag) if not row: row = NanobotImage( tag=tag, version=version, status="READY", source_dir=source_dir, image_id=image_id, ) else: row.version = version row.status = "READY" row.source_dir = source_dir row.image_id = image_id session.add(row) session.commit() session.refresh(row) return row @app.post("/api/providers/test") async def test_provider(payload: dict): provider = (payload.get("provider") or "").strip() api_key = (payload.get("api_key") or "").strip() model = (payload.get("model") or "").strip() api_base = (payload.get("api_base") or "").strip() if not provider or not api_key: raise HTTPException(status_code=400, detail="provider and api_key are required") normalized_provider, default_base = _provider_defaults(provider) base = (api_base or default_base).rstrip("/") if normalized_provider not in {"openrouter", "dashscope", "kimi", "minimax"}: raise HTTPException(status_code=400, detail=f"provider not supported for test: {provider}") if not base: raise HTTPException(status_code=400, detail=f"api_base is required for provider: {provider}") headers = {"Authorization": f"Bearer {api_key}"} timeout = httpx.Timeout(20.0, connect=10.0) url = f"{base}/models" try: async with httpx.AsyncClient(timeout=timeout) as client: resp = await client.get(url, headers=headers) if resp.status_code >= 400: return { "ok": False, "provider": normalized_provider, "status_code": resp.status_code, "detail": resp.text[:500], } data = resp.json() models_raw = data.get("data", []) if isinstance(data, dict) else [] model_ids: List[str] = [] for item in models_raw[:20]: if isinstance(item, dict) and item.get("id"): model_ids.append(str(item["id"])) model_hint = "" if model: model_hint = "model_found" if any(model in m for m in model_ids) else "model_not_listed" return { "ok": True, "provider": normalized_provider, "endpoint": url, "models_preview": model_ids[:8], "model_hint": model_hint, } except Exception as e: return { "ok": False, "provider": normalized_provider, "endpoint": url, "detail": str(e), } @app.post("/api/bots", response_model=BotInstance) def create_bot(payload: BotCreateRequest, session: Session = Depends(get_session)): image_row = session.get(NanobotImage, payload.image_tag) if not image_row: raise HTTPException(status_code=400, detail=f"Image not registered in DB: {payload.image_tag}") if image_row.status != "READY": raise HTTPException(status_code=400, detail=f"Image status is not READY: {payload.image_tag} ({image_row.status})") if not docker_manager.has_image(payload.image_tag): raise HTTPException(status_code=400, detail=f"Docker image not found locally: {payload.image_tag}") bot = BotInstance( id=payload.id, name=payload.name, system_prompt=payload.system_prompt or payload.soul_md or DEFAULT_SOUL_MD, soul_md=payload.soul_md or DEFAULT_SOUL_MD, agents_md=payload.agents_md or DEFAULT_AGENTS_MD, user_md=payload.user_md or DEFAULT_USER_MD, tools_md=payload.tools_md or DEFAULT_TOOLS_MD, tools_config_json=json.dumps(_normalize_tools_config(payload.tools_config), ensure_ascii=False), identity_md=payload.identity_md or DEFAULT_IDENTITY_MD, llm_provider=payload.llm_provider, llm_model=payload.llm_model, api_key=payload.api_key, api_base=payload.api_base, temperature=payload.temperature, top_p=payload.top_p, max_tokens=payload.max_tokens, send_progress=bool(payload.send_progress) if payload.send_progress is not None else False, send_tool_hints=bool(payload.send_tool_hints) if payload.send_tool_hints is not None else False, image_tag=payload.image_tag, workspace_dir=os.path.join(BOTS_WORKSPACE_ROOT, payload.id), ) session.add(bot) session.commit() session.refresh(bot) _write_env_store(payload.id, _normalize_env_params(payload.env_params)) _sync_workspace_channels(session, payload.id, channels_override=_normalize_initial_channels(payload.id, payload.channels)) return bot @app.get("/api/bots", response_model=List[BotInstance]) def list_bots(session: Session = Depends(get_session)): return session.exec(select(BotInstance)).all() @app.put("/api/bots/{bot_id}", response_model=BotInstance) def update_bot(bot_id: str, payload: BotUpdateRequest, session: Session = Depends(get_session)): bot = session.get(BotInstance, bot_id) if not bot: raise HTTPException(status_code=404, detail="Bot not found") update_data = payload.model_dump(exclude_unset=True) if "image_tag" in update_data and update_data["image_tag"]: image_tag = str(update_data["image_tag"]).strip() image_row = session.get(NanobotImage, image_tag) if not image_row: raise HTTPException(status_code=400, detail=f"Image not registered in DB: {image_tag}") if image_row.status != "READY": raise HTTPException(status_code=400, detail=f"Image status is not READY: {image_tag} ({image_row.status})") if not docker_manager.has_image(image_tag): raise HTTPException(status_code=400, detail=f"Docker image not found locally: {image_tag}") tools_config = update_data.pop("tools_config", None) if isinstance(update_data, dict) else None if tools_config is not None: bot.tools_config_json = json.dumps(_normalize_tools_config(tools_config), ensure_ascii=False) env_params = update_data.pop("env_params", None) if isinstance(update_data, dict) else None for key, value in update_data.items(): setattr(bot, key, value) session.add(bot) session.commit() session.refresh(bot) if env_params is not None: _write_env_store(bot_id, _normalize_env_params(env_params)) _sync_workspace_channels(session, bot_id) return bot @app.post("/api/bots/{bot_id}/start") async def start_bot(bot_id: str, session: Session = Depends(get_session)): bot = session.get(BotInstance, bot_id) if not bot: raise HTTPException(status_code=404, detail="Bot not found") _sync_workspace_channels(session, bot_id) env_params = _read_env_store(bot_id) success = docker_manager.start_bot( bot_id, image_tag=bot.image_tag, on_state_change=docker_callback, env_vars=env_params, ) if success: bot.docker_status = "RUNNING" session.add(bot) session.commit() return {"status": "started"} raise HTTPException(status_code=500, detail=f"Failed to start container with image {bot.image_tag}") @app.post("/api/bots/{bot_id}/stop") def stop_bot(bot_id: str, session: Session = Depends(get_session)): bot = session.get(BotInstance, bot_id) if not bot: raise HTTPException(status_code=404, detail="Bot not found") docker_manager.stop_bot(bot_id) bot.docker_status = "STOPPED" session.add(bot) session.commit() return {"status": "stopped"} @app.post("/api/bots/{bot_id}/deactivate") def deactivate_bot(bot_id: str, session: Session = Depends(get_session)): bot = session.get(BotInstance, bot_id) if not bot: raise HTTPException(status_code=404, detail="Bot not found") docker_manager.stop_bot(bot_id) bot.docker_status = "STOPPED" session.add(bot) session.commit() return {"status": "deactivated"} @app.delete("/api/bots/{bot_id}") def delete_bot(bot_id: str, delete_workspace: bool = True, session: Session = Depends(get_session)): bot = session.get(BotInstance, bot_id) if not bot: raise HTTPException(status_code=404, detail="Bot not found") docker_manager.stop_bot(bot_id) messages = session.exec(select(BotMessage).where(BotMessage.bot_id == bot_id)).all() for row in messages: session.delete(row) session.delete(bot) session.commit() if delete_workspace: workspace_root = os.path.join(BOTS_WORKSPACE_ROOT, bot_id) if os.path.isdir(workspace_root): shutil.rmtree(workspace_root, ignore_errors=True) return {"status": "deleted", "workspace_deleted": bool(delete_workspace)} @app.get("/api/bots/{bot_id}/channels") def list_bot_channels(bot_id: str, session: Session = Depends(get_session)): bot = session.get(BotInstance, bot_id) if not bot: raise HTTPException(status_code=404, detail="Bot not found") return _get_bot_channels_from_config(bot) @app.get("/api/bots/{bot_id}/skills") def list_bot_skills(bot_id: str, session: Session = Depends(get_session)): bot = session.get(BotInstance, bot_id) if not bot: raise HTTPException(status_code=404, detail="Bot not found") return _list_workspace_skills(bot_id) @app.get("/api/bots/{bot_id}/tools-config") def get_bot_tools_config(bot_id: str, session: Session = Depends(get_session)): bot = session.get(BotInstance, bot_id) if not bot: raise HTTPException(status_code=404, detail="Bot not found") return { "bot_id": bot_id, "tools_config": _parse_tools_config(bot.tools_config_json), } @app.put("/api/bots/{bot_id}/tools-config") def update_bot_tools_config(bot_id: str, payload: BotToolsConfigUpdateRequest, session: Session = Depends(get_session)): bot = session.get(BotInstance, bot_id) if not bot: raise HTTPException(status_code=404, detail="Bot not found") normalized = _normalize_tools_config(payload.tools_config) bot.tools_config_json = json.dumps(normalized, ensure_ascii=False) session.add(bot) session.commit() session.refresh(bot) _sync_workspace_channels(session, bot_id) return { "status": "updated", "bot_id": bot_id, "tools_config": normalized, } @app.get("/api/bots/{bot_id}/env-params") def get_bot_env_params(bot_id: str, session: Session = Depends(get_session)): bot = session.get(BotInstance, bot_id) if not bot: raise HTTPException(status_code=404, detail="Bot not found") return { "bot_id": bot_id, "env_params": _read_env_store(bot_id), } @app.put("/api/bots/{bot_id}/env-params") def update_bot_env_params(bot_id: str, payload: BotEnvParamsUpdateRequest, session: Session = Depends(get_session)): bot = session.get(BotInstance, bot_id) if not bot: raise HTTPException(status_code=404, detail="Bot not found") normalized = _normalize_env_params(payload.env_params) _write_env_store(bot_id, normalized) return { "status": "updated", "bot_id": bot_id, "env_params": normalized, "restart_required": True, } @app.post("/api/bots/{bot_id}/skills/upload") async def upload_bot_skill_zip(bot_id: str, file: UploadFile = File(...), session: Session = Depends(get_session)): bot = session.get(BotInstance, bot_id) if not bot: raise HTTPException(status_code=404, detail="Bot not found") filename = str(file.filename or "").strip() if not filename.lower().endswith(".zip"): raise HTTPException(status_code=400, detail="Only .zip skill package is supported") try: zip_bytes = await file.read() if not zip_bytes: raise HTTPException(status_code=400, detail="Zip package is empty") archive = zipfile.ZipFile(io.BytesIO(zip_bytes)) except Exception: raise HTTPException(status_code=400, detail="Invalid zip file") skills_root = _skills_root(bot_id) os.makedirs(skills_root, exist_ok=True) installed: List[str] = [] with archive: members = archive.infolist() file_members = [m for m in members if not m.is_dir()] if not file_members: raise HTTPException(status_code=400, detail="Zip package has no files") top_names: List[str] = [] for member in file_members: raw_name = str(member.filename or "").replace("\\", "/").lstrip("/") if not raw_name: continue first = raw_name.split("/", 1)[0].strip() if not _is_valid_top_level_skill_name(first): raise HTTPException(status_code=400, detail=f"Invalid skill entry name in zip: {first}") if first not in top_names: top_names.append(first) if not top_names: raise HTTPException(status_code=400, detail="Zip package has no valid skill entries") conflicts = [name for name in top_names if os.path.exists(os.path.join(skills_root, name))] if conflicts: raise HTTPException(status_code=400, detail=f"Skill already exists: {', '.join(conflicts)}") with tempfile.TemporaryDirectory(prefix=".skill_upload_", dir=skills_root) as tmp_dir: tmp_root = os.path.abspath(tmp_dir) for member in members: raw_name = str(member.filename or "").replace("\\", "/").lstrip("/") if not raw_name: continue target = os.path.abspath(os.path.join(tmp_root, raw_name)) if os.path.commonpath([tmp_root, target]) != tmp_root: raise HTTPException(status_code=400, detail=f"Unsafe zip entry path: {raw_name}") if member.is_dir(): os.makedirs(target, exist_ok=True) continue os.makedirs(os.path.dirname(target), exist_ok=True) with archive.open(member, "r") as source, open(target, "wb") as dest: shutil.copyfileobj(source, dest) for name in top_names: src = os.path.join(tmp_root, name) dst = os.path.join(skills_root, name) if not os.path.exists(src): continue shutil.move(src, dst) installed.append(name) if not installed: raise HTTPException(status_code=400, detail="No skill entries installed from zip") return { "status": "installed", "bot_id": bot_id, "installed": installed, "skills": _list_workspace_skills(bot_id), } @app.delete("/api/bots/{bot_id}/skills/{skill_name}") def delete_bot_skill(bot_id: str, skill_name: str, session: Session = Depends(get_session)): bot = session.get(BotInstance, bot_id) if not bot: raise HTTPException(status_code=404, detail="Bot not found") name = str(skill_name or "").strip() if not _is_valid_top_level_skill_name(name): raise HTTPException(status_code=400, detail="Invalid skill name") root = _skills_root(bot_id) target = os.path.abspath(os.path.join(root, name)) if os.path.commonpath([os.path.abspath(root), target]) != os.path.abspath(root): raise HTTPException(status_code=400, detail="Invalid skill path") if not os.path.exists(target): raise HTTPException(status_code=404, detail="Skill not found in workspace") if os.path.isdir(target): shutil.rmtree(target, ignore_errors=False) else: os.remove(target) return {"status": "deleted", "bot_id": bot_id, "skill": name} @app.post("/api/bots/{bot_id}/channels") def create_bot_channel(bot_id: str, payload: ChannelConfigRequest, session: Session = Depends(get_session)): bot = session.get(BotInstance, bot_id) if not bot: raise HTTPException(status_code=404, detail="Bot not found") ctype = (payload.channel_type or "").strip().lower() if not ctype: raise HTTPException(status_code=400, detail="channel_type is required") if ctype == "dashboard": raise HTTPException(status_code=400, detail="dashboard channel is built-in and cannot be created manually") current_rows = _get_bot_channels_from_config(bot) if any(str(row.get("channel_type") or "").lower() == ctype for row in current_rows): raise HTTPException(status_code=400, detail=f"Channel already exists: {ctype}") new_row = { "id": ctype, "bot_id": bot_id, "channel_type": ctype, "external_app_id": (payload.external_app_id or "").strip() or f"{ctype}-{bot_id}", "app_secret": (payload.app_secret or "").strip(), "internal_port": max(1, min(int(payload.internal_port or 8080), 65535)), "is_active": bool(payload.is_active), "extra_config": _normalize_channel_extra(payload.extra_config), "locked": False, } config_data = _read_bot_config(bot_id) channels_cfg = config_data.get("channels") if not isinstance(channels_cfg, dict): channels_cfg = {} config_data["channels"] = channels_cfg channels_cfg[ctype] = _channel_api_to_cfg(new_row) _write_bot_config(bot_id, config_data) _sync_workspace_channels(session, bot_id) return new_row @app.put("/api/bots/{bot_id}/channels/{channel_id}") def update_bot_channel( bot_id: str, channel_id: str, payload: ChannelConfigUpdateRequest, session: Session = Depends(get_session), ): bot = session.get(BotInstance, bot_id) if not bot: raise HTTPException(status_code=404, detail="Bot not found") channel_key = str(channel_id or "").strip().lower() rows = _get_bot_channels_from_config(bot) row = next((r for r in rows if str(r.get("id") or "").lower() == channel_key), None) if not row: raise HTTPException(status_code=404, detail="Channel not found") update_data = payload.model_dump(exclude_unset=True) existing_type = str(row.get("channel_type") or "").strip().lower() new_type = existing_type if "channel_type" in update_data and update_data["channel_type"] is not None: new_type = str(update_data["channel_type"]).strip().lower() if not new_type: raise HTTPException(status_code=400, detail="channel_type cannot be empty") if existing_type == "dashboard" and new_type != "dashboard": raise HTTPException(status_code=400, detail="dashboard channel type cannot be changed") if new_type != existing_type and any(str(r.get("channel_type") or "").lower() == new_type for r in rows): raise HTTPException(status_code=400, detail=f"Channel already exists: {new_type}") if "external_app_id" in update_data and update_data["external_app_id"] is not None: row["external_app_id"] = str(update_data["external_app_id"]).strip() if "app_secret" in update_data and update_data["app_secret"] is not None: row["app_secret"] = str(update_data["app_secret"]).strip() if "internal_port" in update_data and update_data["internal_port"] is not None: row["internal_port"] = max(1, min(int(update_data["internal_port"]), 65535)) if "is_active" in update_data and update_data["is_active"] is not None: next_active = bool(update_data["is_active"]) if existing_type == "dashboard" and not next_active: raise HTTPException(status_code=400, detail="dashboard channel must remain enabled") row["is_active"] = next_active if "extra_config" in update_data: row["extra_config"] = _normalize_channel_extra(update_data.get("extra_config")) row["channel_type"] = new_type row["id"] = new_type row["locked"] = new_type == "dashboard" if new_type == "dashboard": extra = _normalize_channel_extra(row.get("extra_config")) bot.send_progress = bool(extra.get("sendProgress", bot.send_progress)) bot.send_tool_hints = bool(extra.get("sendToolHints", bot.send_tool_hints)) session.add(bot) config_data = _read_bot_config(bot_id) channels_cfg = config_data.get("channels") if not isinstance(channels_cfg, dict): channels_cfg = {} config_data["channels"] = channels_cfg channels_cfg["sendProgress"] = bool(bot.send_progress) channels_cfg["sendToolHints"] = bool(bot.send_tool_hints) if existing_type != "dashboard" and existing_type in channels_cfg and existing_type != new_type: channels_cfg.pop(existing_type, None) if new_type != "dashboard": channels_cfg[new_type] = _channel_api_to_cfg(row) _write_bot_config(bot_id, config_data) session.commit() _sync_workspace_channels(session, bot_id) return row @app.delete("/api/bots/{bot_id}/channels/{channel_id}") def delete_bot_channel(bot_id: str, channel_id: str, session: Session = Depends(get_session)): bot = session.get(BotInstance, bot_id) if not bot: raise HTTPException(status_code=404, detail="Bot not found") channel_key = str(channel_id or "").strip().lower() rows = _get_bot_channels_from_config(bot) row = next((r for r in rows if str(r.get("id") or "").lower() == channel_key), None) if not row: raise HTTPException(status_code=404, detail="Channel not found") if str(row.get("channel_type") or "").lower() == "dashboard": raise HTTPException(status_code=400, detail="dashboard channel cannot be deleted") config_data = _read_bot_config(bot_id) channels_cfg = config_data.get("channels") if not isinstance(channels_cfg, dict): channels_cfg = {} config_data["channels"] = channels_cfg channels_cfg.pop(str(row.get("channel_type") or "").lower(), None) _write_bot_config(bot_id, config_data) session.commit() _sync_workspace_channels(session, bot_id) return {"status": "deleted"} @app.post("/api/bots/{bot_id}/command") def send_command(bot_id: str, payload: CommandRequest, session: Session = Depends(get_session)): bot = session.get(BotInstance, bot_id) if not bot: raise HTTPException(status_code=404, detail="Bot not found") attachments = _normalize_media_list(payload.attachments, bot_id) command = str(payload.command or "").strip() if not command and not attachments: raise HTTPException(status_code=400, detail="Command or attachments is required") checked_attachments: List[str] = [] for rel in attachments: _, target = _resolve_workspace_path(bot_id, rel) if not os.path.isfile(target): raise HTTPException(status_code=400, detail=f"attachment not found: {rel}") checked_attachments.append(rel) display_command = command if command else "[attachment message]" delivery_command = display_command if checked_attachments: command_has_paths = all(p in command for p in checked_attachments) if command else False attachment_block = "\n".join(f"- {p}" for p in checked_attachments) if command and not command_has_paths: delivery_command = ( f"{command}\n\n" "[Attached files]\n" f"{attachment_block}\n\n" "Please process the attached file(s) listed above when answering this request." ) elif not command: delivery_command = ( "Please process the uploaded file(s) listed below:\n" f"{attachment_block}" ) if display_command or checked_attachments: _persist_runtime_packet( bot_id, {"type": "USER_COMMAND", "channel": "dashboard", "text": display_command, "media": checked_attachments}, ) loop = getattr(app.state, "main_loop", None) if loop and loop.is_running(): asyncio.run_coroutine_threadsafe( manager.broadcast( bot_id, { "type": "USER_COMMAND", "channel": "dashboard", "text": display_command, "media": checked_attachments, }, ), loop, ) success = docker_manager.send_command(bot_id, delivery_command, media=checked_attachments) if not success: if loop and loop.is_running(): asyncio.run_coroutine_threadsafe( manager.broadcast( bot_id, { "type": "AGENT_STATE", "channel": "dashboard", "payload": { "state": "ERROR", "action_msg": "command delivery failed", }, }, ), loop, ) raise HTTPException(status_code=502, detail="Failed to deliver command to bot dashboard channel") return {"success": True} @app.get("/api/bots/{bot_id}/messages") def list_bot_messages(bot_id: str, limit: int = 200, session: Session = Depends(get_session)): bot = session.get(BotInstance, bot_id) if not bot: raise HTTPException(status_code=404, detail="Bot not found") safe_limit = max(1, min(int(limit), 500)) rows = session.exec( select(BotMessage) .where(BotMessage.bot_id == bot_id) .order_by(BotMessage.created_at.desc(), BotMessage.id.desc()) .limit(safe_limit) ).all() ordered = list(reversed(rows)) return [ { "id": row.id, "bot_id": row.bot_id, "role": row.role, "text": row.text, "media": _parse_message_media(bot_id, getattr(row, "media_json", None)), "ts": int(row.created_at.timestamp() * 1000), } for row in ordered ] @app.delete("/api/bots/{bot_id}/messages") def clear_bot_messages(bot_id: str, session: Session = Depends(get_session)): bot = session.get(BotInstance, bot_id) if not bot: raise HTTPException(status_code=404, detail="Bot not found") rows = session.exec(select(BotMessage).where(BotMessage.bot_id == bot_id)).all() deleted = 0 for row in rows: session.delete(row) deleted += 1 bot.last_action = "" bot.current_state = "IDLE" bot.updated_at = datetime.utcnow() session.add(bot) session.commit() return {"bot_id": bot_id, "deleted": deleted} @app.get("/api/bots/{bot_id}/logs") def get_bot_logs(bot_id: str, tail: int = 300, session: Session = Depends(get_session)): bot = session.get(BotInstance, bot_id) if not bot: raise HTTPException(status_code=404, detail="Bot not found") return {"bot_id": bot_id, "logs": docker_manager.get_recent_logs(bot_id, tail=tail)} @app.get("/api/bots/{bot_id}/workspace/tree") def get_workspace_tree(bot_id: str, path: Optional[str] = None, session: Session = Depends(get_session)): bot = session.get(BotInstance, bot_id) if not bot: raise HTTPException(status_code=404, detail="Bot not found") root = _workspace_root(bot_id) if not os.path.isdir(root): return {"bot_id": bot_id, "root": root, "cwd": "", "parent": None, "entries": []} _, target = _resolve_workspace_path(bot_id, path) if not os.path.isdir(target): raise HTTPException(status_code=400, detail="workspace path is not a directory") cwd = os.path.relpath(target, root).replace("\\", "/") if cwd == ".": cwd = "" parent = None if cwd: parent = os.path.dirname(cwd).replace("\\", "/") if parent == ".": parent = "" return { "bot_id": bot_id, "root": root, "cwd": cwd, "parent": parent, "entries": _list_workspace_dir(target, root), } @app.get("/api/bots/{bot_id}/workspace/file") def read_workspace_file( bot_id: str, path: str, max_bytes: int = 200000, session: Session = Depends(get_session), ): bot = session.get(BotInstance, bot_id) if not bot: raise HTTPException(status_code=404, detail="Bot not found") root, target = _resolve_workspace_path(bot_id, path) if not os.path.isfile(target): raise HTTPException(status_code=404, detail="workspace file not found") ext = os.path.splitext(target)[1].lower() text_ext = { "", ".md", ".txt", ".log", ".json", ".yaml", ".yml", ".cfg", ".ini", ".csv", ".tsv", ".toml", ".py", ".sh", } if ext not in text_ext: raise HTTPException(status_code=400, detail=f"unsupported file type: {ext or '(none)'}") safe_max = max(4096, min(int(max_bytes), 1000000)) with open(target, "rb") as f: raw = f.read(safe_max + 1) if b"\x00" in raw: raise HTTPException(status_code=400, detail="binary file is not previewable") truncated = len(raw) > safe_max body = raw[:safe_max] if truncated else raw text_body = body.decode("utf-8", errors="replace") rel_path = os.path.relpath(target, root).replace("\\", "/") return { "bot_id": bot_id, "path": rel_path, "size": os.path.getsize(target), "is_markdown": rel_path.lower().endswith(".md"), "truncated": truncated, "content": text_body, } @app.get("/api/bots/{bot_id}/cron/jobs") def list_cron_jobs(bot_id: str, include_disabled: bool = True, session: Session = Depends(get_session)): bot = session.get(BotInstance, bot_id) if not bot: raise HTTPException(status_code=404, detail="Bot not found") store = _read_cron_store(bot_id) rows = [] for row in store.get("jobs", []): if not isinstance(row, dict): continue enabled = bool(row.get("enabled", True)) if not include_disabled and not enabled: continue rows.append(row) rows.sort(key=lambda v: int(((v.get("state") or {}).get("nextRunAtMs")) or 2**62)) return {"bot_id": bot_id, "version": int(store.get("version", 1) or 1), "jobs": rows} @app.post("/api/bots/{bot_id}/cron/jobs/{job_id}/stop") def stop_cron_job(bot_id: str, job_id: str, session: Session = Depends(get_session)): bot = session.get(BotInstance, bot_id) if not bot: raise HTTPException(status_code=404, detail="Bot not found") store = _read_cron_store(bot_id) jobs = store.get("jobs", []) if not isinstance(jobs, list): jobs = [] found = None for row in jobs: if isinstance(row, dict) and str(row.get("id")) == job_id: found = row break if not found: raise HTTPException(status_code=404, detail="Cron job not found") found["enabled"] = False found["updatedAtMs"] = int(datetime.utcnow().timestamp() * 1000) _write_cron_store(bot_id, {"version": int(store.get("version", 1) or 1), "jobs": jobs}) return {"status": "stopped", "job_id": job_id} @app.delete("/api/bots/{bot_id}/cron/jobs/{job_id}") def delete_cron_job(bot_id: str, job_id: str, session: Session = Depends(get_session)): bot = session.get(BotInstance, bot_id) if not bot: raise HTTPException(status_code=404, detail="Bot not found") store = _read_cron_store(bot_id) jobs = store.get("jobs", []) if not isinstance(jobs, list): jobs = [] kept = [row for row in jobs if not (isinstance(row, dict) and str(row.get("id")) == job_id)] if len(kept) == len(jobs): raise HTTPException(status_code=404, detail="Cron job not found") _write_cron_store(bot_id, {"version": int(store.get("version", 1) or 1), "jobs": kept}) return {"status": "deleted", "job_id": job_id} @app.get("/api/bots/{bot_id}/workspace/download") def download_workspace_file( bot_id: str, path: str, download: bool = False, session: Session = Depends(get_session), ): bot = session.get(BotInstance, bot_id) if not bot: raise HTTPException(status_code=404, detail="Bot not found") _, target = _resolve_workspace_path(bot_id, path) if not os.path.isfile(target): raise HTTPException(status_code=404, detail="workspace file not found") media_type, _ = mimetypes.guess_type(target) if download: return FileResponse(target, filename=os.path.basename(target), media_type=media_type) return FileResponse(target, media_type=media_type) @app.post("/api/bots/{bot_id}/workspace/upload") async def upload_workspace_files( bot_id: str, files: List[UploadFile] = File(...), path: Optional[str] = None, session: Session = Depends(get_session), ): bot = session.get(BotInstance, bot_id) if not bot: raise HTTPException(status_code=404, detail="Bot not found") if not files: raise HTTPException(status_code=400, detail="no files uploaded") root, upload_dir = _resolve_workspace_path(bot_id, path or "uploads") os.makedirs(upload_dir, exist_ok=True) safe_dir_real = os.path.abspath(upload_dir) if os.path.commonpath([root, safe_dir_real]) != root: raise HTTPException(status_code=400, detail="invalid upload target path") rows: List[Dict[str, Any]] = [] for upload in files: original = (upload.filename or "upload.bin").strip() or "upload.bin" name = os.path.basename(original).replace("\\", "_").replace("/", "_") name = re.sub(r"[^\w.\-()+@ ]+", "_", name) if not name: name = "upload.bin" abs_path = os.path.join(safe_dir_real, name) if os.path.exists(abs_path): base, ext = os.path.splitext(name) name = f"{base}-{int(datetime.utcnow().timestamp())}{ext}" abs_path = os.path.join(safe_dir_real, name) content = await upload.read() with open(abs_path, "wb") as f: f.write(content) rel = os.path.relpath(abs_path, root).replace("\\", "/") rows.append({"name": name, "path": rel, "size": len(content)}) return {"bot_id": bot_id, "files": rows} @app.websocket("/ws/monitor/{bot_id}") async def websocket_endpoint(websocket: WebSocket, bot_id: str): await manager.connect(bot_id, websocket) docker_manager.ensure_monitor(bot_id, docker_callback) try: while True: await websocket.receive_text() except WebSocketDisconnect: manager.disconnect(bot_id, websocket) def _main_server_options() -> tuple[str, int, bool]: host = str(os.getenv("APP_HOST", "0.0.0.0") or "0.0.0.0").strip() or "0.0.0.0" try: port = int(os.getenv("APP_PORT", "8000")) except Exception: port = 8000 port = max(1, min(port, 65535)) reload_flag = str(os.getenv("APP_RELOAD", "true")).strip().lower() in {"1", "true", "yes", "on"} return host, port, reload_flag if __name__ == "__main__": import uvicorn host, port, reload_flag = _main_server_options() app_module = f"{os.path.splitext(os.path.basename(__file__))[0]}:app" if reload_flag: uvicorn.run(app_module, host=host, port=port, reload=True) else: uvicorn.run(app, host=host, port=port)