"""Environment-driven settings resolved once at import time.

Values come from the process environment, ``<project>/.env.prod`` and
``<backend>/.env`` (in that priority order), plus JSON template files under
``<backend>/templates``.
"""

import json
import os
import re
from pathlib import Path
from typing import Final
from urllib.parse import urlsplit, urlunsplit

from dotenv import dotenv_values, load_dotenv

BACKEND_ROOT: Final[Path] = Path(__file__).resolve().parents[1]
PROJECT_ROOT: Final[Path] = BACKEND_ROOT.parent

# Load env files used by this project.
# Priority (high -> low):
#   1) process environment
#   2) project/.env.prod
#   3) backend/.env
#
# We keep process-provided env untouched, while allowing .env.prod to override backend/.env.
# The snapshot must be taken BEFORE load_dotenv() so that keys injected from
# backend/.env are not mistaken for process-provided ones.
_process_env_keys = set(os.environ.keys())
_backend_env_values = dotenv_values(BACKEND_ROOT / ".env")
_prod_env_values = dotenv_values(PROJECT_ROOT / ".env.prod")

load_dotenv(BACKEND_ROOT / ".env", override=False)
for _k, _v in _prod_env_values.items():
    if _v is None:
        continue
    if _k in _process_env_keys:
        continue
    os.environ[_k] = str(_v)


def _env_text(name: str, default: str) -> str:
    """Return env var *name*, or *default* when it is unset.

    Two-character backslash-n sequences in the env value are converted to real
    newlines so multi-line text can be stored on a single line. The default is
    returned untouched.
    """
    raw = os.getenv(name)
    if raw is None:
        return default
    return str(raw).replace("\\n", "\n")


def _env_bool(name: str, default: bool) -> bool:
    """Return env var *name* as a bool, or *default* when it is unset.

    Only ``1``, ``true``, ``yes`` and ``on`` (case-insensitive, surrounding
    whitespace ignored) count as true; any other set value is false.
    """
    raw = os.getenv(name)
    if raw is None:
        return default
    return str(raw).strip().lower() in {"1", "true", "yes", "on"}


def _env_int(name: str, default: int, min_value: int, max_value: int) -> int:
    """Return env var *name* as an int clamped to ``[min_value, max_value]``.

    When the variable is unset, *default* is returned as-is. When it is set
    but not a valid integer, *default* is used and then clamped like any
    other value.
    """
    raw = os.getenv(name)
    if raw is None:
        return default
    try:
        value = int(str(raw).strip())
    except ValueError:
        value = default
    return max(min_value, min(max_value, value))


def _is_truthy(raw: object) -> bool:
    """Return True when *raw* reads as an affirmative flag (``1/true/yes/on/y``)."""
    return str(raw or "").strip().lower() in {"1", "true", "yes", "on", "y"}


def _normalize_extension(raw: str) -> str:
    """Normalize a file extension to the form ``.ext`` (lower-case).

    Accepts ``ext``, ``.ext`` and ``*.ext``. Returns ``""`` for empty input or
    for anything that does not match the allowed character set / length.
    """
    text = str(raw or "").strip().lower()
    if not text:
        return ""
    if text.startswith("*."):
        # Drop the glob star, keep the dot.
        text = text[1:]
    if not text.startswith("."):
        text = f".{text}"
    if not re.fullmatch(r"\.[a-z0-9][a-z0-9._+-]{0,31}", text):
        return ""
    return text


def _env_extensions(name: str, default: tuple[str, ...]) -> tuple[str, ...]:
    """Return the extension list configured in env var *name*.

    The value is split on commas, semicolons and whitespace; each item is
    normalized, invalid items are dropped and duplicates removed while keeping
    first-seen order. When the variable is unset, the (normalized) *default* is
    used. A variable that is set but yields no valid extension produces an
    empty tuple rather than falling back to the default.
    """
    raw = os.getenv(name)
    candidates = default if raw is None else re.split(r"[,;\s]+", str(raw))
    normalized = (_normalize_extension(item) for item in candidates)
    # dict.fromkeys de-duplicates while preserving insertion order.
    rows = tuple(dict.fromkeys(ext for ext in normalized if ext))
    if raw is None:
        return rows or tuple(default)
    return rows


def _normalize_dir_path(path_value: str) -> str:
    """Expand ``~`` / ``$VARS`` in *path_value* and make it absolute.

    Relative paths are resolved against ``BACKEND_ROOT``. Absolute paths are
    returned as given (not resolved). Empty input is returned as an empty
    string.
    """
    raw = str(path_value or "").strip()
    if not raw:
        return raw
    raw = os.path.expandvars(os.path.expanduser(raw))
    p = Path(raw)
    if p.is_absolute():
        return str(p)
    return str((BACKEND_ROOT / p).resolve())


def _load_json_object(path: Path) -> dict[str, object]:
    """Load *path* as JSON and return it if the top-level value is an object.

    Any failure (missing file, unreadable file, invalid JSON, non-object
    top-level value) yields an empty dict.
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
        if isinstance(data, dict):
            return data
    except Exception:
        # Deliberate best-effort: this runs at import time and a broken or
        # absent template file must not prevent the module from loading.
        pass
    return {}


def _read_template_md(raw: object) -> str:
    """Return *raw* as text with CRLF normalized to LF and outer whitespace stripped."""
    if raw is None:
        return ""
    return str(raw).replace("\r\n", "\n").strip()


DATA_ROOT: Final[str] = _normalize_dir_path(os.getenv("DATA_ROOT", str(PROJECT_ROOT / "data")))
BOTS_WORKSPACE_ROOT: Final[str] = _normalize_dir_path(
    os.getenv("BOTS_WORKSPACE_ROOT", str(PROJECT_ROOT / "workspace" / "bots"))
)


def _normalize_database_url(url: str) -> str:
    """Make a relative ``sqlite:///`` file path absolute (relative to ``BACKEND_ROOT``).

    URLs that are not ``sqlite:///...``, that already carry an absolute path,
    or that name a special SQLite target (``:memory:`` or a ``file:`` URI) are
    returned unchanged.
    """
    raw = str(url or "").strip()
    prefix = "sqlite:///"
    if not raw.startswith(prefix):
        return raw
    path_part = raw[len(prefix):]
    if not path_part or path_part.startswith("/"):
        return raw
    # ":memory:" and "file:..." are SQLite keywords/URIs, not relative file
    # names; resolving them would silently create an on-disk database at
    # "<backend>/:memory:" instead of an in-memory one.
    if path_part.startswith((":memory:", "file:")):
        return raw
    abs_path = (BACKEND_ROOT / path_part).resolve()
    return f"{prefix}{abs_path.as_posix()}"


def _database_engine(url: str) -> str:
    """Return a short engine name derived from the URL scheme.

    ``sqlite``, ``postgresql`` and ``mysql`` are recognized by prefix; for
    anything else the text before ``+`` or ``://`` is returned, or
    ``"unknown"`` when neither separator is present.
    """
    raw = str(url or "").strip().lower()
    for engine in ("sqlite", "postgresql", "mysql"):
        if raw.startswith(engine):
            return engine
    if "+" in raw:
        return raw.split("+", 1)[0]
    if "://" in raw:
        return raw.split("://", 1)[0]
    return "unknown"


def _mask_database_url(url: str) -> str:
    """Return *url* with any password replaced by ``***`` for display/logging.

    SQLite URLs and URLs without a password are returned unchanged. If the URL
    cannot be parsed, a regex-based mask is applied instead so the password is
    still not exposed.
    """
    raw = str(url or "").strip()
    if not raw or raw.startswith("sqlite"):
        return raw
    try:
        parsed = urlsplit(raw)
        if parsed.password is None:
            return raw
        host = parsed.hostname or ""
        if ":" in host:
            # .hostname strips the brackets from IPv6 literals; restore them
            # so the appended port stays unambiguous.
            host = f"[{host}]"
        if parsed.port:
            host = f"{host}:{parsed.port}"
        user = parsed.username or ""
        netloc = f"{user}:***@{host}" if user else host
        return urlunsplit((parsed.scheme, netloc, parsed.path, parsed.query, parsed.fragment))
    except ValueError:
        # urlsplit() / .port raise ValueError on malformed input (e.g. an
        # out-of-range port). Never fall back to the raw URL here: it may
        # still contain the password.
        return re.sub(r"(://[^/@:]*):[^/]*@", r"\1:***@", raw, count=1)


_db_env = str(os.getenv("DATABASE_URL") or "").strip()
DATABASE_URL: Final[str] = _normalize_database_url(
    _db_env if _db_env else f"sqlite:///{Path(DATA_ROOT) / 'nanobot_dashboard.db'}"
)
DATABASE_ENGINE: Final[str] = _database_engine(DATABASE_URL)
DATABASE_URL_DISPLAY: Final[str] = _mask_database_url(DATABASE_URL)
# NOTE(review): echo defaults to True when DATABASE_ECHO is unset - confirm this is intended.
DATABASE_ECHO: Final[bool] = _env_bool("DATABASE_ECHO", True)
UPLOAD_MAX_MB: Final[int] = _env_int("UPLOAD_MAX_MB", 100, 1, 2048)
WORKSPACE_DOWNLOAD_EXTENSIONS_DEFAULT: Final[tuple[str, ...]] = (
    ".pdf",
    ".doc",
    ".docx",
    ".xls",
    ".xlsx",
    ".xlsm",
    ".ppt",
    ".pptx",
    ".odt",
    ".ods",
    ".odp",
    ".wps",
)
WORKSPACE_DOWNLOAD_EXTENSIONS: Final[tuple[str, ...]] = _env_extensions(
    "WORKSPACE_DOWNLOAD_EXTENSIONS",
    WORKSPACE_DOWNLOAD_EXTENSIONS_DEFAULT,
)

STT_ENABLED: Final[bool] = _env_bool("STT_ENABLED", True)
STT_MODEL: Final[str] = str(os.getenv("STT_MODEL") or "ggml-small-q8_0.bin").strip()
_DEFAULT_STT_MODEL_DIR: Final[Path] = (Path(DATA_ROOT) / "model").resolve()
_configured_stt_model_dir = _normalize_dir_path(os.getenv("STT_MODEL_DIR", str(_DEFAULT_STT_MODEL_DIR)))
# Use the default directory when the configured one does not exist but the
# default does; otherwise keep the configured value (even if it is missing).
STT_MODEL_DIR: Final[str] = (
    str(_DEFAULT_STT_MODEL_DIR)
    if _configured_stt_model_dir
    and not Path(_configured_stt_model_dir).exists()
    and _DEFAULT_STT_MODEL_DIR.exists()
    else _configured_stt_model_dir
)
STT_DEVICE: Final[str] = str(os.getenv("STT_DEVICE") or "cpu").strip().lower() or "cpu"
STT_MAX_AUDIO_SECONDS: Final[int] = _env_int("STT_MAX_AUDIO_SECONDS", 20, 5, 600)
STT_DEFAULT_LANGUAGE: Final[str] = str(os.getenv("STT_DEFAULT_LANGUAGE") or "zh").strip().lower() or "zh"
STT_FORCE_SIMPLIFIED: Final[bool] = _env_bool("STT_FORCE_SIMPLIFIED", True)
STT_AUDIO_PREPROCESS: Final[bool] = _env_bool("STT_AUDIO_PREPROCESS", True)
STT_AUDIO_FILTER: Final[str] = str(
    os.getenv("STT_AUDIO_FILTER") or "highpass=f=120,lowpass=f=7600,afftdn=nf=-20"
).strip()
STT_INITIAL_PROMPT: Final[str] = str(
    os.getenv("STT_INITIAL_PROMPT")
    or "以下内容可能包含简体中文和英文术语。请优先输出简体中文,英文单词、缩写、品牌名和数字保持原文,不要翻译。"
).strip()

REDIS_ENABLED: Final[bool] = _env_bool("REDIS_ENABLED", False)
REDIS_URL: Final[str] = str(os.getenv("REDIS_URL") or "").strip()
REDIS_PREFIX: Final[str] = str(os.getenv("REDIS_PREFIX") or "dashboard_nanobot").strip() or "dashboard_nanobot"
REDIS_DEFAULT_TTL: Final[int] = _env_int("REDIS_DEFAULT_TTL", 60, 1, 86400)
PANEL_ACCESS_PASSWORD: Final[str] = str(os.getenv("PANEL_ACCESS_PASSWORD") or "").strip()

_topic_mcp_default = "http://host.docker.internal:8000/api/mcp/topic"
_topic_mcp_from_env = str(os.getenv("TOPIC_MCP_INTERNAL_URL") or "").strip()
_topic_mcp_from_backend_env = str(_backend_env_values.get("TOPIC_MCP_INTERNAL_URL") or "").strip()
_dev_mode = _is_truthy(os.getenv("APP_RELOAD")) or str(os.getenv("APP_ENV") or "").strip().lower() in {
    "dev",
    "development",
    "local",
}
# In dev mode the value from backend/.env wins over the merged environment
# (which .env.prod may have overridden); otherwise use the environment value
# or the built-in default.
TOPIC_MCP_INTERNAL_URL: Final[str] = (
    _topic_mcp_from_backend_env
    if _dev_mode and _topic_mcp_from_backend_env
    else (_topic_mcp_from_env or _topic_mcp_default)
)

TEMPLATE_ROOT: Final[Path] = (BACKEND_ROOT / "templates").resolve()
AGENT_MD_TEMPLATES_FILE: Final[Path] = TEMPLATE_ROOT / "agent_md_templates.json"
TOPIC_PRESETS_TEMPLATES_FILE: Final[Path] = TEMPLATE_ROOT / "topic_presets.json"

# Defaults below are read from the template file once at import time; an env
# var of the same name, when set, takes precedence over the template value.
_agent_md_templates_raw = _load_json_object(AGENT_MD_TEMPLATES_FILE)
DEFAULT_AGENTS_MD: Final[str] = _env_text(
    "DEFAULT_AGENTS_MD",
    _read_template_md(_agent_md_templates_raw.get("agents_md")),
).strip()
DEFAULT_SOUL_MD: Final[str] = _env_text(
    "DEFAULT_SOUL_MD",
    _read_template_md(_agent_md_templates_raw.get("soul_md")),
).strip()
DEFAULT_USER_MD: Final[str] = _env_text(
    "DEFAULT_USER_MD",
    _read_template_md(_agent_md_templates_raw.get("user_md")),
).strip()
DEFAULT_TOOLS_MD: Final[str] = _env_text(
    "DEFAULT_TOOLS_MD",
    _read_template_md(_agent_md_templates_raw.get("tools_md")),
).strip()
DEFAULT_IDENTITY_MD: Final[str] = _env_text(
    "DEFAULT_IDENTITY_MD",
    _read_template_md(_agent_md_templates_raw.get("identity_md")),
).strip()

_topic_presets_raw = _load_json_object(TOPIC_PRESETS_TEMPLATES_FILE)
_topic_presets_list = _topic_presets_raw.get("presets")
TOPIC_PRESET_TEMPLATES: Final[list[dict[str, object]]] = [
    dict(row)
    for row in (_topic_presets_list if isinstance(_topic_presets_list, list) else [])
    if isinstance(row, dict)
]


def load_agent_md_templates() -> dict[str, str]:
    """Re-read the agent markdown template file and return its five sections.

    The result always contains the keys ``agents_md``, ``soul_md``,
    ``user_md``, ``tools_md`` and ``identity_md``; sections missing from the
    file (or an unreadable file) yield empty strings.
    """
    raw = _load_json_object(AGENT_MD_TEMPLATES_FILE)
    return {
        key: _read_template_md(raw.get(key))
        for key in ("agents_md", "soul_md", "user_md", "tools_md", "identity_md")
    }


def load_topic_presets_template() -> dict[str, object]:
    """Re-read the topic presets file and return ``{"presets": [...]}``.

    Only dict entries of the ``presets`` list are kept (as shallow copies). A
    missing/invalid file or a non-list ``presets`` value yields an empty list.
    """
    raw = _load_json_object(TOPIC_PRESETS_TEMPLATES_FILE)
    presets = raw.get("presets")
    if not isinstance(presets, list):
        return {"presets": []}
    return {"presets": [dict(row) for row in presets if isinstance(row, dict)]}