"""Database engine setup and lightweight, idempotent startup schema fixes."""

import logging
from typing import Iterator, Mapping

from sqlalchemy import inspect, text
from sqlmodel import SQLModel, Session, create_engine

from core.settings import DATABASE_ECHO, DATABASE_URL

# Ensure table models are registered in SQLModel metadata before create_all.
from models import bot as _bot_models  # noqa: F401

_logger = logging.getLogger(__name__)

engine = create_engine(DATABASE_URL, echo=DATABASE_ECHO)

# Columns that used to live on ``botinstance`` and are removed at startup
# when they are still present in the database.
_LEGACY_BOTINSTANCE_COLUMNS = (
    "avatar_model",
    "avatar_skin",
    "system_prompt",
    "soul_md",
    "agents_md",
    "user_md",
    "tools_md",
    "tools_config_json",
    "identity_md",
    "llm_provider",
    "llm_model",
    "api_key",
    "api_base",
    "temperature",
    "top_p",
    "max_tokens",
    "presence_penalty",
    "frequency_penalty",
    "send_progress",
    "send_tool_hints",
    "bot_env_json",
)


def _ensure_sqlite_columns(table_name: str, required_columns: Mapping[str, str]) -> None:
    """Add any of *required_columns* that are missing from *table_name*.

    Does nothing unless the engine dialect is SQLite.

    Args:
        table_name: Table to patch. Interpolated into SQL verbatim, so it must
            be a trusted constant, never user input.
        required_columns: Mapping of column name to the column DDL that follows
            the name in ``ALTER TABLE ... ADD COLUMN``. Also trusted constants.
    """
    if engine.dialect.name != "sqlite":
        return

    with engine.connect() as conn:
        rows = conn.execute(text(f"PRAGMA table_info({table_name})")).fetchall()
        # PRAGMA table_info yields (cid, name, type, ...); index 1 is the name.
        existing = {str(row[1]) for row in rows}
        for col, ddl in required_columns.items():
            if col in existing:
                continue
            conn.execute(text(f"ALTER TABLE {table_name} ADD COLUMN {col} {ddl}"))
        conn.commit()


def _ensure_botinstance_columns() -> None:
    """Add newer ``botinstance`` columns to an existing SQLite database."""
    _ensure_sqlite_columns(
        "botinstance",
        {
            "current_state": "TEXT DEFAULT 'IDLE'",
            "last_action": "TEXT",
            "image_tag": "TEXT DEFAULT 'nanobot-base:v0.1.4'",
            "access_password": "TEXT DEFAULT ''",
        },
    )


def _drop_column_statement(dialect_name: str, column: str) -> str:
    """Return the dialect-specific SQL that drops *column* from ``botinstance``."""
    if dialect_name == "mysql":
        return f"ALTER TABLE botinstance DROP COLUMN `{column}`"
    if dialect_name == "sqlite":
        return f'ALTER TABLE botinstance DROP COLUMN "{column}"'
    return f'ALTER TABLE botinstance DROP COLUMN IF EXISTS "{column}"'


def _drop_legacy_botinstance_columns() -> None:
    """Drop legacy columns from ``botinstance``, best-effort.

    Each column is dropped and committed on its own. A failure is rolled back
    and logged, then the loop moves on; this function never raises for a
    failed drop, to keep startup resilient on mixed/legacy database engines.
    """
    dialect_name = engine.dialect.name

    with engine.connect() as conn:
        existing = {
            str(col.get("name"))
            for col in inspect(conn).get_columns("botinstance")
            if col.get("name")
        }
        for col in _LEGACY_BOTINSTANCE_COLUMNS:
            if col not in existing:
                continue
            try:
                conn.execute(text(_drop_column_statement(dialect_name, col)))
                # Commit per column so one failure cannot discard earlier drops.
                conn.commit()
            except Exception:
                # On engines with transactional DDL (e.g. PostgreSQL) a failed
                # statement leaves the transaction aborted; roll back so the
                # remaining columns can still be processed.
                conn.rollback()
                _logger.warning(
                    "Could not drop legacy column botinstance.%s; continuing startup",
                    col,
                    exc_info=True,
                )


def _ensure_botmessage_columns() -> None:
    """Add newer ``botmessage`` columns to an existing SQLite database."""
    _ensure_sqlite_columns(
        "botmessage",
        {
            "media_json": "TEXT",
            "feedback": "TEXT",
            "feedback_at": "DATETIME",
        },
    )


def _drop_legacy_skill_tables() -> None:
    """Drop deprecated skill registry tables (moved to workspace filesystem mode)."""
    with engine.connect() as conn:
        conn.execute(text("DROP TABLE IF EXISTS botskillmapping"))
        conn.execute(text("DROP TABLE IF EXISTS skillregistry"))
        conn.commit()


def align_postgres_sequences() -> None:
    """Reset PostgreSQL serial sequences to match the current table contents.

    Does nothing unless the engine dialect is PostgreSQL. For each target
    column, the owning sequence is set to the column's current maximum; for an
    empty table it is set to 1 with ``is_called`` false, so the next generated
    value is 1.
    """
    if engine.dialect.name != "postgresql":
        return

    sequence_targets = [
        ("botmessage", "id"),
    ]

    with engine.connect() as conn:
        for table_name, column_name in sequence_targets:
            seq_name = conn.execute(
                text("SELECT pg_get_serial_sequence(:table_name, :column_name)"),
                {"table_name": table_name, "column_name": column_name},
            ).scalar()
            if not seq_name:
                # The column is not backed by a sequence; nothing to align.
                continue

            max_id = conn.execute(
                text(f'SELECT COALESCE(MAX("{column_name}"), 0) FROM "{table_name}"')
            ).scalar()
            max_id = int(max_id or 0)

            conn.execute(
                text("SELECT setval(:seq_name, :next_value, :is_called)"),
                {
                    "seq_name": seq_name,
                    "next_value": max_id if max_id > 0 else 1,
                    "is_called": max_id > 0,
                },
            )
        conn.commit()


def init_database() -> None:
    """Create all registered tables, then apply the startup schema fixes.

    ``create_all`` runs first so the tables touched by the later steps exist.
    """
    SQLModel.metadata.create_all(engine)
    _drop_legacy_skill_tables()
    _ensure_botinstance_columns()
    _drop_legacy_botinstance_columns()
    _ensure_botmessage_columns()
    align_postgres_sequences()


def get_session() -> Iterator[Session]:
    """Yield a ``Session`` bound to the module engine; it is closed on exit."""
    with Session(engine) as session:
        yield session