import json import os import hashlib from typing import Any, Dict, List, Optional DEFAULT_SOUL_MD = "# Soul\n" DEFAULT_AGENTS_MD = "# Agent Instructions\n" DEFAULT_USER_MD = "# User Preferences\n" DEFAULT_TOOLS_MD = "# Tools\n" DEFAULT_IDENTITY_MD = "# Identity\n" class EdgeProvisionService: def __init__(self, *, host_data_root: str) -> None: self._host_data_root = host_data_root def sync_bot_workspace( self, *, bot_id: str, payload: Any, ) -> Dict[str, Any]: runtime = dict(getattr(payload, "runtime_overrides", None) or {}) workspace_root_override = self._workspace_root_override(runtime) workspace_bot_dir = self._bot_workspace_dir(bot_id, workspace_root_override) state_nanobot_dir = os.path.join(workspace_bot_dir, ".nanobot") workspace_dir = os.path.join(workspace_bot_dir, ".nanobot", "workspace") memory_dir = os.path.join(workspace_dir, "memory") skills_dir = os.path.join(workspace_dir, "skills") for path in [state_nanobot_dir, workspace_dir, memory_dir, skills_dir]: os.makedirs(path, exist_ok=True) channels_override = list(getattr(payload, "channels_override", None) or []) global_delivery_override = dict(getattr(payload, "global_delivery_override", None) or {}) raw_provider_name = str(runtime.get("llm_provider") or "openrouter").strip().lower() provider_name = { "aliyun": "dashscope", "qwen": "dashscope", "aliyun-qwen": "dashscope", "moonshot": "kimi", "vllm": "openai", "xunfei": "openai", "iflytek": "openai", "xfyun": "openai", }.get(raw_provider_name, raw_provider_name) model_name = str(runtime.get("llm_model") or "openai/gpt-4o-mini").strip() if provider_name == "openai" and raw_provider_name in {"xunfei", "iflytek", "xfyun"} and model_name and "/" not in model_name: model_name = f"openai/{model_name}" provider_cfg: Dict[str, Any] = {"apiKey": str(runtime.get("api_key") or "").strip()} api_base = str(runtime.get("api_base") or "").strip() if api_base: provider_cfg["apiBase"] = api_base channels_cfg: Dict[str, Any] = { "sendProgress": bool(global_delivery_override.get("sendProgress", runtime.get("send_progress", False))), "sendToolHints": bool(global_delivery_override.get("sendToolHints", runtime.get("send_tool_hints", False))), } existing_config: Dict[str, Any] = {} config_path = os.path.join(state_nanobot_dir, "config.json") if os.path.isfile(config_path): try: with open(config_path, "r", encoding="utf-8") as fh: loaded = json.load(fh) if isinstance(loaded, dict): existing_config = loaded except Exception: existing_config = {} existing_tools = existing_config.get("tools") tools_cfg: Dict[str, Any] = dict(existing_tools) if isinstance(existing_tools, dict) else {} native_sandbox_mode = self._normalize_native_sandbox_mode(runtime.get("native_sandbox_mode")) if native_sandbox_mode == "workspace": tools_cfg["restrictToWorkspace"] = True elif native_sandbox_mode == "full_access": tools_cfg["restrictToWorkspace"] = False existing_channels = existing_config.get("channels") existing_dashboard_cfg = ( existing_channels.get("dashboard") if isinstance(existing_channels, dict) and isinstance(existing_channels.get("dashboard"), dict) else {} ) dashboard_cfg: Dict[str, Any] = { "enabled": True, "host": "0.0.0.0", "port": self._dashboard_port_for_bot(bot_id), "allowFrom": ["*"], } for key in ("host", "port", "allowFrom"): if key in existing_dashboard_cfg: dashboard_cfg[key] = existing_dashboard_cfg[key] dashboard_cfg["port"] = self._dashboard_port_for_bot(bot_id) channels_cfg["dashboard"] = dashboard_cfg for channel in channels_override: channel_type = str(channel.get("channel_type") or "").strip().lower() if not channel_type or channel_type == "dashboard": continue extra = channel.get("extra_config") if isinstance(channel.get("extra_config"), dict) else {} enabled = bool(channel.get("is_active", True)) external = str(channel.get("external_app_id") or "") secret = str(channel.get("app_secret") or "") if channel_type == "telegram": channels_cfg["telegram"] = { "enabled": enabled, "token": secret, "proxy": extra.get("proxy", ""), "replyToMessage": bool(extra.get("replyToMessage", False)), "allowFrom": self._normalize_allow_from(extra.get("allowFrom", [])), } continue if channel_type == "feishu": channels_cfg["feishu"] = { "enabled": enabled, "appId": external, "appSecret": secret, "encryptKey": extra.get("encryptKey", ""), "verificationToken": extra.get("verificationToken", ""), "allowFrom": self._normalize_allow_from(extra.get("allowFrom", [])), } continue if channel_type == "dingtalk": channels_cfg["dingtalk"] = { "enabled": enabled, "clientId": external, "clientSecret": secret, "allowFrom": self._normalize_allow_from(extra.get("allowFrom", [])), } continue if channel_type == "slack": channels_cfg["slack"] = { "enabled": enabled, "mode": extra.get("mode", "socket"), "botToken": external, "appToken": secret, "replyInThread": bool(extra.get("replyInThread", True)), "groupPolicy": extra.get("groupPolicy", "mention"), "groupAllowFrom": extra.get("groupAllowFrom", []), "reactEmoji": extra.get("reactEmoji", "eyes"), } continue if channel_type == "qq": channels_cfg["qq"] = { "enabled": enabled, "appId": external, "secret": secret, "allowFrom": self._normalize_allow_from(extra.get("allowFrom", [])), } continue if channel_type == "email": channels_cfg["email"] = { "enabled": enabled, "consentGranted": bool(extra.get("consentGranted", False)), "imapHost": extra.get("imapHost", ""), "imapPort": max(1, min(int(extra.get("imapPort", 993) or 993), 65535)), "imapUsername": extra.get("imapUsername", ""), "imapPassword": extra.get("imapPassword", ""), "imapMailbox": extra.get("imapMailbox", "INBOX"), "imapUseSsl": bool(extra.get("imapUseSsl", True)), "smtpHost": extra.get("smtpHost", ""), "smtpPort": max(1, min(int(extra.get("smtpPort", 587) or 587), 65535)), "smtpUsername": extra.get("smtpUsername", ""), "smtpPassword": extra.get("smtpPassword", ""), "smtpUseTls": bool(extra.get("smtpUseTls", True)), "smtpUseSsl": bool(extra.get("smtpUseSsl", False)), "fromAddress": extra.get("fromAddress", ""), "autoReplyEnabled": bool(extra.get("autoReplyEnabled", True)), "pollIntervalSeconds": max(5, int(extra.get("pollIntervalSeconds", 30) or 30)), "markSeen": bool(extra.get("markSeen", True)), "maxBodyChars": max(1, int(extra.get("maxBodyChars", 12000) or 12000)), "subjectPrefix": extra.get("subjectPrefix", "Re: "), "allowFrom": self._normalize_allow_from(extra.get("allowFrom", [])), } continue channels_cfg[channel_type] = { "enabled": enabled, "appId": external, "appSecret": secret, **extra, } config_data: Dict[str, Any] = { "agents": { "defaults": { "workspace": workspace_dir, "model": model_name, "temperature": float(runtime.get("temperature") or 0.2), "topP": float(runtime.get("top_p") or 1.0), "maxTokens": int(runtime.get("max_tokens") or 8192), } }, "providers": {provider_name: provider_cfg}, "channels": channels_cfg, } if tools_cfg: config_data["tools"] = tools_cfg self._write_json(config_path, config_data) runtime_target = { "runtime_kind": str(runtime.get("runtime_kind") or "").strip().lower(), "transport_kind": str(runtime.get("transport_kind") or "").strip().lower(), "core_adapter": str(runtime.get("core_adapter") or "").strip().lower(), } if native_sandbox_mode != "inherit": runtime_target["native_sandbox_mode"] = native_sandbox_mode if workspace_root_override: runtime_target["workspace_root"] = workspace_root_override if any(runtime_target.values()): runtime_target_path = os.path.join(state_nanobot_dir, "runtime-target.json") self._write_json(runtime_target_path, runtime_target) bootstrap_files = { "AGENTS.md": str(runtime.get("agents_md") or DEFAULT_AGENTS_MD).strip() + "\n", "SOUL.md": str(runtime.get("soul_md") or runtime.get("system_prompt") or DEFAULT_SOUL_MD).strip() + "\n", "USER.md": str(runtime.get("user_md") or DEFAULT_USER_MD).strip() + "\n", "TOOLS.md": str(runtime.get("tools_md") or DEFAULT_TOOLS_MD).strip() + "\n", "IDENTITY.md": str(runtime.get("identity_md") or DEFAULT_IDENTITY_MD).strip() + "\n", } for filename, content in bootstrap_files.items(): file_path = os.path.join(workspace_dir, filename) with open(file_path, "w", encoding="utf-8") as fh: fh.write(content) return {"status": "ok"} @staticmethod def _normalize_allow_from(raw: Any) -> List[str]: rows: List[str] = [] if isinstance(raw, list): for item in raw: text = str(item or "").strip() if text and text not in rows: rows.append(text) if not rows: return ["*"] return rows @staticmethod def _dashboard_port_for_bot(bot_id: str) -> int: digest = hashlib.sha1(str(bot_id or "").strip().encode("utf-8")).hexdigest() return 19000 + (int(digest[:6], 16) % 2000) @staticmethod def _workspace_root_override(runtime_overrides: Dict[str, Any]) -> str: raw = str(runtime_overrides.get("workspace_root") or "").strip() if not raw: return "" return os.path.abspath(os.path.expanduser(raw)) @staticmethod def _normalize_native_sandbox_mode(raw_value: Any) -> str: text = str(raw_value or "").strip().lower() if text in {"workspace", "sandbox", "strict"}: return "workspace" if text in {"full_access", "full-access", "danger-full-access", "escape"}: return "full_access" return "inherit" def _bot_workspace_dir(self, bot_id: str, workspace_root_override: str) -> str: if not workspace_root_override: return os.path.abspath(os.path.join(self._host_data_root, str(bot_id or "").strip())) return os.path.abspath(os.path.join(workspace_root_override, str(bot_id or "").strip())) @staticmethod def _write_json(path: str, payload: Dict[str, Any]) -> None: os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, "w", encoding="utf-8") as fh: json.dump(payload, fh, ensure_ascii=False, indent=2) edge_provision_service: EdgeProvisionService | None = None