dashboard-nanobot/backend/core/database.py

144 lines
4.6 KiB
Python

from sqlalchemy import inspect, text
from sqlmodel import SQLModel, Session, create_engine
from core.settings import DATABASE_ECHO, DATABASE_URL
# Ensure table models are registered in SQLModel metadata before create_all.
from models import bot as _bot_models # noqa: F401
# Module-wide engine shared by every helper below; the connection URL and the
# SQL echo flag both come from core.settings.
engine = create_engine(DATABASE_URL, echo=DATABASE_ECHO)
def _ensure_botinstance_columns() -> None:
    """Add any missing columns to the ``botinstance`` table on SQLite.

    Does nothing for other dialects. The columns currently present are read
    from ``PRAGMA table_info(botinstance)``; every required column that is
    absent is added with ``ALTER TABLE ... ADD COLUMN`` and the connection is
    committed afterwards.
    """
    if engine.dialect.name != "sqlite":
        return

    # Column name -> type/default clause appended after ADD COLUMN <name>.
    column_clauses = {
        "current_state": "TEXT DEFAULT 'IDLE'",
        "last_action": "TEXT",
        "image_tag": "TEXT DEFAULT 'nanobot-base:v0.1.4'",
        "access_password": "TEXT DEFAULT ''",
    }

    with engine.connect() as connection:
        table_info = connection.execute(
            text("PRAGMA table_info(botinstance)")
        ).fetchall()
        # The column name is taken from index 1 of each PRAGMA row.
        present = {str(info[1]) for info in table_info}
        missing = [
            (name, clause)
            for name, clause in column_clauses.items()
            if name not in present
        ]
        for name, clause in missing:
            connection.execute(
                text(f"ALTER TABLE botinstance ADD COLUMN {name} {clause}")
            )
        connection.commit()
def _drop_legacy_botinstance_columns() -> None:
    """Drop deprecated columns from the ``botinstance`` table, best effort.

    The columns currently present are read through the SQLAlchemy inspector.
    Each legacy column that still exists is then dropped in its OWN
    transaction, using dialect-appropriate identifier quoting.

    A failed drop is logged and skipped so startup stays resilient on
    mixed/legacy database engines. Isolating every drop matters on
    PostgreSQL: a failed statement aborts the enclosing transaction, so with
    a single shared transaction every later drop would fail too and the final
    commit would discard the drops that had succeeded.
    """
    # Imported locally so this block does not depend on a module-level logger.
    import logging

    logger = logging.getLogger(__name__)

    legacy_columns = [
        "avatar_model",
        "avatar_skin",
        "system_prompt",
        "soul_md",
        "agents_md",
        "user_md",
        "tools_md",
        "tools_config_json",
        "identity_md",
        "llm_provider",
        "llm_model",
        "api_key",
        "api_base",
        "temperature",
        "top_p",
        "max_tokens",
        "presence_penalty",
        "frequency_penalty",
        "send_progress",
        "send_tool_hints",
        "bot_env_json",
    ]

    # Read-only inspection; the connection is released before any DDL runs.
    with engine.connect() as conn:
        existing = {
            str(col.get("name"))
            for col in inspect(conn).get_columns("botinstance")
            if col.get("name")
        }

    dialect = engine.dialect.name
    for col in legacy_columns:
        if col not in existing:
            continue

        if dialect == "mysql":
            statement = f"ALTER TABLE botinstance DROP COLUMN `{col}`"
        elif dialect == "sqlite":
            statement = f'ALTER TABLE botinstance DROP COLUMN "{col}"'
        else:
            statement = f'ALTER TABLE botinstance DROP COLUMN IF EXISTS "{col}"'

        try:
            # engine.begin() commits on success and rolls back on error, so a
            # failure here cannot poison the drops of the remaining columns.
            with engine.begin() as conn:
                conn.execute(text(statement))
        except Exception:
            # Keep startup resilient on mixed/legacy database engines, but
            # leave a trace instead of hiding the failure completely.
            logger.warning(
                "Could not drop legacy column botinstance.%s; skipping",
                col,
                exc_info=True,
            )
def _ensure_botmessage_columns() -> None:
    """Add any missing columns to the ``botmessage`` table on SQLite.

    Does nothing for other dialects. The columns currently present are read
    from ``PRAGMA table_info(botmessage)``; every required column that is
    absent is added with ``ALTER TABLE ... ADD COLUMN`` and the connection is
    committed afterwards.
    """
    if engine.dialect.name != "sqlite":
        return

    # Column name -> type clause appended after ADD COLUMN <name>.
    column_clauses = {
        "media_json": "TEXT",
        "feedback": "TEXT",
        "feedback_at": "DATETIME",
    }

    with engine.connect() as connection:
        table_info = connection.execute(
            text("PRAGMA table_info(botmessage)")
        ).fetchall()
        # The column name is taken from index 1 of each PRAGMA row.
        present = {str(info[1]) for info in table_info}
        missing = [
            (name, clause)
            for name, clause in column_clauses.items()
            if name not in present
        ]
        for name, clause in missing:
            connection.execute(
                text(f"ALTER TABLE botmessage ADD COLUMN {name} {clause}")
            )
        connection.commit()
def _drop_legacy_skill_tables() -> None:
    """Remove the deprecated skill registry tables if they still exist.

    The skill registry moved to workspace filesystem mode, so these tables
    are no longer used.
    """
    # botskillmapping is dropped before skillregistry, as in the original order.
    drop_statements = (
        "DROP TABLE IF EXISTS botskillmapping",
        "DROP TABLE IF EXISTS skillregistry",
    )
    with engine.connect() as connection:
        for statement in drop_statements:
            connection.execute(text(statement))
        connection.commit()
def align_postgres_sequences() -> None:
    """Realign PostgreSQL serial sequences with the ids already stored.

    Does nothing unless the engine dialect is ``postgresql``. For every
    (table, column) target the owning sequence is looked up with
    ``pg_get_serial_sequence``; targets without a sequence are skipped.
    The sequence is then set via ``setval`` to the column's current maximum,
    or to 1 with ``is_called`` false when the maximum is not positive.
    """
    if engine.dialect.name != "postgresql":
        return

    targets = [("botmessage", "id")]
    lookup_sql = text("SELECT pg_get_serial_sequence(:table_name, :column_name)")
    setval_sql = text("SELECT setval(:seq_name, :next_value, :is_called)")

    with engine.connect() as connection:
        for table, column in targets:
            sequence = connection.execute(
                lookup_sql,
                {"table_name": table, "column_name": column},
            ).scalar()
            if not sequence:
                continue

            raw_max = connection.execute(
                text(f'SELECT COALESCE(MAX("{column}"), 0) FROM "{table}"')
            ).scalar()
            highest = int(raw_max or 0)
            has_positive_id = highest > 0

            connection.execute(
                setval_sql,
                {
                    "seq_name": sequence,
                    "next_value": highest if has_positive_id else 1,
                    "is_called": has_positive_id,
                },
            )
        connection.commit()
def init_database() -> None:
    """Create all registered tables, then run the schema fix-up steps.

    The steps run in a fixed order after ``create_all``: drop the legacy skill
    tables, add missing ``botinstance`` columns, drop legacy ``botinstance``
    columns, add missing ``botmessage`` columns, and finally realign the
    PostgreSQL sequences.
    """
    SQLModel.metadata.create_all(engine)

    fixup_steps = (
        _drop_legacy_skill_tables,
        _ensure_botinstance_columns,
        _drop_legacy_botinstance_columns,
        _ensure_botmessage_columns,
        align_postgres_sequences,
    )
    for step in fixup_steps:
        step()
def get_session():
    """Yield a ``Session`` bound to the module engine.

    The session is used as a context manager, so it is closed once the
    generator is resumed or closed by the caller.
    NOTE(review): presumably consumed as a framework dependency provider;
    confirm against callers.
    """
    with Session(engine) as db_session:
        yield db_session