dashboard-nanobot/backend/core/config_manager.py

227 lines
9.2 KiB
Python

import json
import os
from typing import Any, Dict, List
from core.settings import (
DEFAULT_AGENTS_MD,
DEFAULT_IDENTITY_MD,
DEFAULT_SOUL_MD,
DEFAULT_TOOLS_MD,
DEFAULT_USER_MD,
)
class BotConfigManager:
def __init__(self, host_data_root: str):
self.host_data_root = host_data_root
def update_workspace(self, bot_id: str, bot_data: Dict[str, Any], channels: List[Dict[str, Any]]):
"""Generate/update nanobot workspace files and config.json."""
bot_dir = os.path.join(self.host_data_root, bot_id)
dot_nanobot_dir = os.path.join(bot_dir, ".nanobot")
workspace_dir = os.path.join(dot_nanobot_dir, "workspace")
memory_dir = os.path.join(workspace_dir, "memory")
skills_dir = os.path.join(workspace_dir, "skills")
for d in [dot_nanobot_dir, workspace_dir, memory_dir, skills_dir]:
os.makedirs(d, exist_ok=True)
provider_name = (bot_data.get("llm_provider") or "openrouter").strip().lower()
model_name = (bot_data.get("llm_model") or "openai/gpt-4o-mini").strip()
api_key = (bot_data.get("api_key") or "").strip()
api_base = (bot_data.get("api_base") or "").strip() or None
provider_alias = {
"aliyun": "dashscope",
"qwen": "dashscope",
"aliyun-qwen": "dashscope",
"moonshot": "kimi",
}
provider_name = provider_alias.get(provider_name, provider_name)
provider_cfg: Dict[str, Any] = {
"apiKey": api_key,
}
if api_base:
provider_cfg["apiBase"] = api_base
channels_cfg: Dict[str, Any] = {
"sendProgress": bool(bot_data.get("send_progress", False)),
"sendToolHints": bool(bot_data.get("send_tool_hints", False)),
}
existing_config: Dict[str, Any] = {}
config_path = os.path.join(dot_nanobot_dir, "config.json")
if os.path.isfile(config_path):
try:
with open(config_path, "r", encoding="utf-8") as f:
loaded = json.load(f)
if isinstance(loaded, dict):
existing_config = loaded
except Exception:
existing_config = {}
existing_tools = existing_config.get("tools")
tools_cfg: Dict[str, Any] = dict(existing_tools) if isinstance(existing_tools, dict) else {}
if "mcp_servers" in bot_data:
mcp_servers = bot_data.get("mcp_servers")
if isinstance(mcp_servers, dict):
tools_cfg["mcpServers"] = mcp_servers
config_data: Dict[str, Any] = {
"agents": {
"defaults": {
"model": model_name,
"temperature": float(bot_data.get("temperature") or 0.2),
"topP": float(bot_data.get("top_p") or 1.0),
"maxTokens": int(bot_data.get("max_tokens") or 8192),
}
},
"providers": {
provider_name: provider_cfg,
},
"channels": channels_cfg,
}
if tools_cfg:
config_data["tools"] = tools_cfg
for channel in channels:
channel_type = (channel.get("channel_type") or "").strip()
if not channel_type:
continue
raw_extra = channel.get("extra_config")
extra: Dict[str, Any] = {}
if isinstance(raw_extra, str) and raw_extra.strip():
try:
parsed = json.loads(raw_extra)
if isinstance(parsed, dict):
extra = parsed
except Exception:
extra = {}
elif isinstance(raw_extra, dict):
extra = raw_extra
# Dashboard channel is deprecated in DB routing. Global flags now come from bot fields.
if channel_type == "dashboard":
continue
enabled = bool(channel.get("is_active", True))
external = channel.get("external_app_id", "") or ""
secret = channel.get("app_secret", "") or ""
if channel_type == "telegram":
channels_cfg["telegram"] = {
"enabled": enabled,
"token": secret,
"proxy": extra.get("proxy", ""),
"replyToMessage": bool(extra.get("replyToMessage", False)),
"allowFrom": self._normalize_allow_from(extra.get("allowFrom", [])),
}
continue
if channel_type == "feishu":
channels_cfg["feishu"] = {
"enabled": enabled,
"appId": external,
"appSecret": secret,
"encryptKey": extra.get("encryptKey", ""),
"verificationToken": extra.get("verificationToken", ""),
"allowFrom": self._normalize_allow_from(extra.get("allowFrom", [])),
}
continue
if channel_type == "dingtalk":
channels_cfg["dingtalk"] = {
"enabled": enabled,
"clientId": external,
"clientSecret": secret,
"allowFrom": self._normalize_allow_from(extra.get("allowFrom", [])),
}
continue
if channel_type == "slack":
channels_cfg["slack"] = {
"enabled": enabled,
"mode": extra.get("mode", "socket"),
"botToken": external,
"appToken": secret,
"replyInThread": bool(extra.get("replyInThread", True)),
"groupPolicy": extra.get("groupPolicy", "mention"),
"groupAllowFrom": extra.get("groupAllowFrom", []),
"reactEmoji": extra.get("reactEmoji", "eyes"),
}
continue
if channel_type == "qq":
channels_cfg["qq"] = {
"enabled": enabled,
"appId": external,
"secret": secret,
"allowFrom": self._normalize_allow_from(extra.get("allowFrom", [])),
}
continue
if channel_type == "email":
channels_cfg["email"] = {
"enabled": enabled,
"consentGranted": bool(extra.get("consentGranted", False)),
"imapHost": extra.get("imapHost", ""),
"imapPort": max(1, min(int(extra.get("imapPort", 993) or 993), 65535)),
"imapUsername": extra.get("imapUsername", ""),
"imapPassword": extra.get("imapPassword", ""),
"imapMailbox": extra.get("imapMailbox", "INBOX"),
"imapUseSsl": bool(extra.get("imapUseSsl", True)),
"smtpHost": extra.get("smtpHost", ""),
"smtpPort": max(1, min(int(extra.get("smtpPort", 587) or 587), 65535)),
"smtpUsername": extra.get("smtpUsername", ""),
"smtpPassword": extra.get("smtpPassword", ""),
"smtpUseTls": bool(extra.get("smtpUseTls", True)),
"smtpUseSsl": bool(extra.get("smtpUseSsl", False)),
"fromAddress": extra.get("fromAddress", ""),
"autoReplyEnabled": bool(extra.get("autoReplyEnabled", True)),
"pollIntervalSeconds": max(5, int(extra.get("pollIntervalSeconds", 30) or 30)),
"markSeen": bool(extra.get("markSeen", True)),
"maxBodyChars": max(1, int(extra.get("maxBodyChars", 12000) or 12000)),
"subjectPrefix": extra.get("subjectPrefix", "Re: "),
"allowFrom": self._normalize_allow_from(extra.get("allowFrom", [])),
}
continue
# Fallback for future custom channels.
channels_cfg[channel_type] = {
"enabled": enabled,
"appId": external,
"appSecret": secret,
**extra,
}
with open(config_path, "w", encoding="utf-8") as f:
json.dump(config_data, f, indent=4, ensure_ascii=False)
bootstrap_files = {
"AGENTS.md": bot_data.get("agents_md") or DEFAULT_AGENTS_MD,
"SOUL.md": bot_data.get("soul_md") or bot_data.get("system_prompt") or DEFAULT_SOUL_MD,
"USER.md": bot_data.get("user_md") or DEFAULT_USER_MD,
"TOOLS.md": bot_data.get("tools_md") or DEFAULT_TOOLS_MD,
"IDENTITY.md": bot_data.get("identity_md") or DEFAULT_IDENTITY_MD,
}
for filename, content in bootstrap_files.items():
file_path = os.path.join(workspace_dir, filename)
with open(file_path, "w", encoding="utf-8") as f:
f.write(str(content).strip() + "\n")
return dot_nanobot_dir
@staticmethod
def _normalize_allow_from(raw: Any) -> List[str]:
rows: List[str] = []
if isinstance(raw, list):
for item in raw:
text = str(item or "").strip()
if text and text not in rows:
rows.append(text)
if not rows:
return ["*"]
return rows