import json
import os
from typing import Any, Dict, Optional

from fastapi import HTTPException


class EdgeStateStoreService:
    """Read and write per-bot JSON state files stored under a ``.nanobot`` directory.

    Each supported state key maps to one JSON file below
    ``<root>/<bot_id>/.nanobot``. ``<root>`` is, in order of precedence:

    1. the ``workspace_root`` passed by the caller,
    2. the ``workspace_root`` recorded in
       ``<host_data_root>/<bot_id>/.nanobot/runtime-target.json``,
    3. ``host_data_root`` itself.
    """

    # State key -> path components of its JSON file, relative to ``.nanobot``.
    _STATE_FILE_MAP = {
        "config": ("config.json",),
        "env": ("env.json",),
        "resources": ("resources.json",),
        "cron": ("cron", "jobs.json"),
    }

    # Characters that would let a bot id address something other than a single
    # directory directly below the data root.
    _BOT_ID_FORBIDDEN_CHARS = ("/", "\\", "\x00")

    def __init__(self, *, host_data_root: str) -> None:
        """Store the absolute, user-expanded form of *host_data_root*.

        An empty or ``None`` value resolves to the current working directory,
        because that is what ``os.path.abspath("")`` returns.
        """
        raw_root = str(host_data_root or "").strip()
        self._host_data_root = os.path.abspath(os.path.expanduser(raw_root))

    def read_state(
        self,
        *,
        bot_id: str,
        state_key: str,
        workspace_root: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Return the stored state for *bot_id* / *state_key*.

        A missing, unreadable or non-object JSON file yields the default
        payload for the key instead of an error.

        Returns:
            ``{"bot_id": ..., "state_key": ..., "data": ...}`` using the
            normalized id and key.

        Raises:
            HTTPException: 400 if *bot_id* is empty or invalid, or if
                *state_key* is not supported.
        """
        normalized_bot_id = self._normalize_bot_id(bot_id)
        normalized_key = self._normalize_state_key(state_key)
        path = self._state_file_path(normalized_bot_id, normalized_key, workspace_root=workspace_root)

        payload = self._default_payload(normalized_key)
        if os.path.isfile(path):
            # _read_json always returns a dict ({} on any failure), so the
            # result can be normalized directly.
            payload = self._normalize_state_payload(normalized_key, self._read_json(path))
        return {"bot_id": normalized_bot_id, "state_key": normalized_key, "data": payload}

    def write_state(
        self,
        *,
        bot_id: str,
        state_key: str,
        data: Dict[str, Any],
        workspace_root: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Normalize *data* and persist it as the state for *bot_id* / *state_key*.

        Non-dict *data* is stored as the normalized form of an empty dict.

        Returns:
            ``{"bot_id": ..., "state_key": ..., "data": ...}`` where ``data``
            is the payload that was written.

        Raises:
            HTTPException: 400 if *bot_id* is empty or invalid, or if
                *state_key* is not supported.
            OSError: if the directory or file cannot be written.
        """
        normalized_bot_id = self._normalize_bot_id(bot_id)
        normalized_key = self._normalize_state_key(state_key)
        payload = self._normalize_state_payload(normalized_key, data)
        path = self._state_file_path(normalized_bot_id, normalized_key, workspace_root=workspace_root)
        self._write_json_atomic(path, payload)
        return {"bot_id": normalized_bot_id, "state_key": normalized_key, "data": payload}

    def _state_file_path(self, bot_id: str, state_key: str, *, workspace_root: Optional[str] = None) -> str:
        """Return the path of the JSON file backing *state_key* for *bot_id*.

        *state_key* must already be normalized (a key of ``_STATE_FILE_MAP``).
        """
        nanobot_root = self._nanobot_root(bot_id, workspace_root=workspace_root)
        relative = self._STATE_FILE_MAP[state_key]
        return os.path.join(nanobot_root, *relative)

    def _nanobot_root(self, bot_id: str, *, workspace_root: Optional[str] = None) -> str:
        """Return the absolute ``.nanobot`` directory for *bot_id*.

        See the class docstring for the precedence between an explicit
        *workspace_root*, the one recorded in ``runtime-target.json`` and the
        host data root.
        """
        configured_workspace_root = str(workspace_root or "").strip()
        if configured_workspace_root:
            normalized_root = os.path.abspath(os.path.expanduser(configured_workspace_root))
            return os.path.abspath(os.path.join(normalized_root, bot_id, ".nanobot"))

        primary = os.path.abspath(os.path.join(self._host_data_root, bot_id, ".nanobot"))
        inferred_workspace_root = self._workspace_root_from_runtime_target(primary)
        if inferred_workspace_root:
            return os.path.abspath(os.path.join(inferred_workspace_root, bot_id, ".nanobot"))
        return primary

    @staticmethod
    def _workspace_root_from_runtime_target(primary_nanobot_root: str) -> str:
        """Return the ``workspace_root`` recorded in ``runtime-target.json``.

        Best-effort: returns ``""`` when the file is missing, unreadable, not
        a JSON object, or has no non-empty ``workspace_root`` value.
        """
        path = os.path.join(primary_nanobot_root, "runtime-target.json")
        if not os.path.isfile(path):
            return ""
        try:
            with open(path, "r", encoding="utf-8") as fh:
                payload = json.load(fh)
        except (OSError, ValueError, RecursionError):
            # ValueError covers both JSON and UTF-8 decode errors;
            # RecursionError covers pathologically nested documents.
            return ""
        if not isinstance(payload, dict):
            return ""
        raw_root = str(payload.get("workspace_root") or "").strip()
        if not raw_root:
            return ""
        return os.path.abspath(os.path.expanduser(raw_root))

    @classmethod
    def _normalize_state_key(cls, state_key: str) -> str:
        """Return *state_key* stripped and lower-cased.

        Raises:
            HTTPException: 400 if the result is not a key of ``_STATE_FILE_MAP``.
        """
        normalized = str(state_key or "").strip().lower()
        if normalized not in cls._STATE_FILE_MAP:
            raise HTTPException(status_code=400, detail=f"unsupported state key: {state_key}")
        return normalized

    @classmethod
    def _normalize_bot_id(cls, bot_id: str) -> str:
        """Return *bot_id* stripped of surrounding whitespace.

        The id is joined into filesystem paths, so it must name exactly one
        directory: ``.``/``..``, path separators and NUL are rejected. Without
        this check an id such as ``../x`` or ``/etc`` would resolve outside
        the data root (``os.path.join`` discards earlier components when a
        later one is absolute).

        Raises:
            HTTPException: 400 if the id is empty or not a single path component.
        """
        normalized = str(bot_id or "").strip()
        if not normalized:
            raise HTTPException(status_code=400, detail="bot_id is required")
        if normalized in (".", "..") or any(ch in normalized for ch in cls._BOT_ID_FORBIDDEN_CHARS):
            raise HTTPException(status_code=400, detail="invalid bot_id")
        return normalized

    @staticmethod
    def _default_payload(state_key: str) -> Dict[str, Any]:
        """Return the payload reported when no state file exists for *state_key*."""
        if state_key == "cron":
            return {"version": 1, "jobs": []}
        return {}

    @classmethod
    def _normalize_state_payload(cls, state_key: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Return a sanitized shallow copy of *payload* for *state_key*.

        Non-dict input is treated as an empty dict. For ``"cron"`` the result
        always has exactly two keys: ``version`` (an int >= 1, defaulting to 1
        when missing or unparsable) and ``jobs`` (a list, defaulting to ``[]``).
        Other keys get a plain shallow copy.
        """
        normalized = dict(payload) if isinstance(payload, dict) else {}
        if state_key != "cron":
            return normalized

        jobs = normalized.get("jobs")
        if not isinstance(jobs, list):
            jobs = []
        try:
            version = int(normalized.get("version", 1) or 1)
        except (TypeError, ValueError, OverflowError):
            # OverflowError: JSON permits Infinity, which int() cannot convert.
            version = 1
        return {"version": max(1, version), "jobs": jobs}

    @staticmethod
    def _read_json(path: str) -> Dict[str, Any]:
        """Load *path* as a JSON object.

        Best-effort: returns ``{}`` when the file cannot be opened or decoded,
        or when its top-level value is not an object.
        """
        try:
            with open(path, "r", encoding="utf-8") as fh:
                payload = json.load(fh)
        except (OSError, ValueError, RecursionError):
            # ValueError covers both JSON and UTF-8 decode errors.
            return {}
        return payload if isinstance(payload, dict) else {}

    @staticmethod
    def _write_json_atomic(path: str, payload: Dict[str, Any]) -> None:
        """Write *payload* to *path* as JSON via a sibling temp file and ``os.replace``.

        Parent directories are created as needed. If serialization or the
        final replace fails, the temp file is removed and the error re-raised,
        so *path* keeps its previous content and no ``.tmp`` file is left behind.
        """
        os.makedirs(os.path.dirname(path), exist_ok=True)
        tmp = f"{path}.tmp"
        try:
            with open(tmp, "w", encoding="utf-8") as fh:
                json.dump(payload, fh, ensure_ascii=False, indent=2)
            os.replace(tmp, path)
        except BaseException:
            try:
                os.remove(tmp)
            except OSError:
                # Cleanup is best-effort; the original failure is what matters.
                pass
            raise


# Module-level service slot; starts unset. Spelled with Optional (not ``X | None``)
# because module-level annotations are evaluated at runtime and the union
# operator on classes requires Python 3.10+.
edge_state_store_service: Optional[EdgeStateStoreService] = None