# dashboard-nanobot/backend/core/database.py
from sqlalchemy import inspect, text
from sqlmodel import SQLModel, Session, create_engine
from core.settings import (
DATABASE_ECHO,
DATABASE_ENGINE,
DATABASE_MAX_OVERFLOW,
DATABASE_POOL_RECYCLE,
DATABASE_POOL_SIZE,
DATABASE_POOL_TIMEOUT,
DATABASE_URL,
)
# Ensure table models are registered in SQLModel metadata before create_all.
from models import bot as _bot_models # noqa: F401
from models import topic as _topic_models # noqa: F401
# Keyword arguments common to every supported database engine.
_engine_kwargs = {"echo": DATABASE_ECHO}

if DATABASE_ENGINE == "sqlite":
    # SQLite connections refuse cross-thread use by default; the web server
    # may hand a session to a different worker thread, so relax that check.
    _engine_kwargs["connect_args"] = {"check_same_thread": False}
else:
    # Server databases (MySQL/PostgreSQL) get a managed connection pool.
    _engine_kwargs["pool_pre_ping"] = True
    _engine_kwargs["pool_size"] = DATABASE_POOL_SIZE
    _engine_kwargs["max_overflow"] = DATABASE_MAX_OVERFLOW
    _engine_kwargs["pool_timeout"] = DATABASE_POOL_TIMEOUT
    _engine_kwargs["pool_recycle"] = DATABASE_POOL_RECYCLE

# Single process-wide engine shared by every helper below.
engine = create_engine(DATABASE_URL, **_engine_kwargs)
def _ensure_botinstance_columns() -> None:
    """Backfill newer ``botinstance`` columns on existing SQLite databases.

    Non-SQLite deployments are assumed to be migrated by other means, so this
    is a no-op for them.
    """
    if engine.dialect.name != "sqlite":
        return
    required_columns = {
        "current_state": "TEXT DEFAULT 'IDLE'",
        "last_action": "TEXT",
        "image_tag": "TEXT DEFAULT 'nanobot-base:v0.1.4'",
        "access_password": "TEXT DEFAULT ''",
    }
    with engine.connect() as conn:
        # PRAGMA table_info rows: (cid, name, type, notnull, dflt_value, pk).
        table_info = conn.execute(text("PRAGMA table_info(botinstance)")).fetchall()
        present = {str(row[1]) for row in table_info}
        for name, ddl in required_columns.items():
            if name not in present:
                conn.execute(text(f"ALTER TABLE botinstance ADD COLUMN {name} {ddl}"))
        conn.commit()
def _drop_legacy_botinstance_columns() -> None:
    """Best-effort removal of ``botinstance`` columns retired from the schema.

    Each DROP is attempted independently and failures are swallowed so that
    startup never breaks on mixed/legacy database engines.
    """
    legacy_columns = [
        "avatar_model",
        "avatar_skin",
        "system_prompt",
        "soul_md",
        "agents_md",
        "user_md",
        "tools_md",
        "tools_config_json",
        "identity_md",
        "llm_provider",
        "llm_model",
        "api_key",
        "api_base",
        "temperature",
        "top_p",
        "max_tokens",
        "presence_penalty",
        "frequency_penalty",
        "send_progress",
        "send_tool_hints",
        "bot_env_json",
    ]
    dialect = engine.dialect.name
    with engine.connect() as conn:
        current = {
            str(col.get("name"))
            for col in inspect(conn).get_columns("botinstance")
            if col.get("name")
        }
        for name in legacy_columns:
            if name not in current:
                continue
            try:
                if dialect == "mysql":
                    # MySQL identifiers are quoted with backticks.
                    conn.execute(text(f"ALTER TABLE botinstance DROP COLUMN `{name}`"))
                elif dialect == "sqlite":
                    conn.execute(text(f'ALTER TABLE botinstance DROP COLUMN "{name}"'))
                else:
                    conn.execute(text(f'ALTER TABLE botinstance DROP COLUMN IF EXISTS "{name}"'))
            except Exception:
                # Keep startup resilient on mixed/legacy database engines.
                continue
        conn.commit()
def _ensure_botmessage_columns() -> None:
    """Backfill newer ``botmessage`` columns on existing SQLite databases."""
    if engine.dialect.name != "sqlite":
        return
    required_columns = {
        "media_json": "TEXT",
        "feedback": "TEXT",
        "feedback_at": "DATETIME",
    }
    with engine.connect() as conn:
        # PRAGMA table_info rows: (cid, name, type, notnull, dflt_value, pk).
        table_info = conn.execute(text("PRAGMA table_info(botmessage)")).fetchall()
        present = {str(row[1]) for row in table_info}
        for name, ddl in required_columns.items():
            if name not in present:
                conn.execute(text(f"ALTER TABLE botmessage ADD COLUMN {name} {ddl}"))
        conn.commit()
def _drop_legacy_skill_tables() -> None:
    """Drop deprecated skill registry tables (moved to workspace filesystem mode)."""
    statements = (
        "DROP TABLE IF EXISTS botskillmapping",
        "DROP TABLE IF EXISTS skillregistry",
    )
    with engine.connect() as conn:
        for stmt in statements:
            conn.execute(text(stmt))
        conn.commit()
def _ensure_topic_tables_sqlite() -> None:
    """Create the topic-feed tables and indexes for SQLite deployments.

    Other dialects are handled by ``SQLModel.metadata.create_all`` plus the
    column/index backfill helpers, so this early-returns for them.
    """
    if engine.dialect.name != "sqlite":
        return
    table_ddls = (
        """
        CREATE TABLE IF NOT EXISTS topic_bot_settings (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            bot_id TEXT NOT NULL,
            topic_enabled INTEGER NOT NULL DEFAULT 1,
            created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
            updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY(bot_id) REFERENCES botinstance(id)
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS topic_topic (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            bot_id TEXT NOT NULL,
            topic_key TEXT NOT NULL,
            name TEXT NOT NULL DEFAULT '',
            description TEXT NOT NULL DEFAULT '',
            is_active INTEGER NOT NULL DEFAULT 1,
            is_default_fallback INTEGER NOT NULL DEFAULT 0,
            routing_json TEXT NOT NULL DEFAULT '{}',
            view_schema_json TEXT NOT NULL DEFAULT '{}',
            created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
            updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY(bot_id) REFERENCES botinstance(id)
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS topic_item (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            bot_id TEXT NOT NULL,
            topic_key TEXT NOT NULL,
            title TEXT NOT NULL DEFAULT '',
            content TEXT NOT NULL DEFAULT '',
            level TEXT NOT NULL DEFAULT 'info',
            tags_json TEXT,
            view_json TEXT,
            source TEXT NOT NULL DEFAULT 'mcp',
            dedupe_key TEXT,
            is_read INTEGER NOT NULL DEFAULT 0,
            created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY(bot_id) REFERENCES botinstance(id)
        )
        """,
    )
    index_ddls = (
        "CREATE UNIQUE INDEX IF NOT EXISTS uq_topic_bot_settings_bot_id ON topic_bot_settings(bot_id)",
        "CREATE UNIQUE INDEX IF NOT EXISTS uq_topic_topic_bot_topic_key ON topic_topic(bot_id, topic_key)",
        "CREATE INDEX IF NOT EXISTS idx_topic_topic_bot_id ON topic_topic(bot_id)",
        "CREATE INDEX IF NOT EXISTS idx_topic_topic_topic_key ON topic_topic(topic_key)",
        "CREATE INDEX IF NOT EXISTS idx_topic_topic_bot_fallback ON topic_topic(bot_id, is_default_fallback)",
        "CREATE INDEX IF NOT EXISTS idx_topic_item_bot_id ON topic_item(bot_id)",
        "CREATE INDEX IF NOT EXISTS idx_topic_item_topic_key ON topic_item(topic_key)",
        "CREATE INDEX IF NOT EXISTS idx_topic_item_level ON topic_item(level)",
        "CREATE INDEX IF NOT EXISTS idx_topic_item_source ON topic_item(source)",
        "CREATE INDEX IF NOT EXISTS idx_topic_item_is_read ON topic_item(is_read)",
        "CREATE INDEX IF NOT EXISTS idx_topic_item_created_at ON topic_item(created_at)",
        "CREATE INDEX IF NOT EXISTS idx_topic_item_bot_topic_created_at ON topic_item(bot_id, topic_key, created_at)",
        "CREATE INDEX IF NOT EXISTS idx_topic_item_bot_dedupe ON topic_item(bot_id, dedupe_key)",
    )
    with engine.connect() as conn:
        for ddl in table_ddls:
            conn.execute(text(ddl))
        for ddl in index_ddls:
            conn.execute(text(ddl))
        conn.commit()
def _ensure_topic_columns() -> None:
    """Add any missing topic-feed columns using per-dialect DDL fragments.

    Unknown dialects fall back to the SQLite variant of each column
    definition. Tables that do not exist yet are skipped entirely.
    """
    dialect = engine.dialect.name
    # table name -> column name -> {dialect: column DDL}.
    column_specs = {
        "topic_bot_settings": {
            "topic_enabled": {
                "sqlite": "INTEGER NOT NULL DEFAULT 1",
                "postgresql": "BOOLEAN NOT NULL DEFAULT TRUE",
                "mysql": "BOOLEAN NOT NULL DEFAULT TRUE",
            },
            "created_at": {
                "sqlite": "DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP",
                "postgresql": "TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP",
                "mysql": "DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP",
            },
            "updated_at": {
                "sqlite": "DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP",
                "postgresql": "TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP",
                "mysql": "DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP",
            },
        },
        "topic_topic": {
            "name": {
                "sqlite": "TEXT NOT NULL DEFAULT ''",
                "postgresql": "TEXT NOT NULL DEFAULT ''",
                "mysql": "VARCHAR(255) NOT NULL DEFAULT ''",
            },
            "description": {
                "sqlite": "TEXT NOT NULL DEFAULT ''",
                "postgresql": "TEXT NOT NULL DEFAULT ''",
                "mysql": "LONGTEXT",
            },
            "is_active": {
                "sqlite": "INTEGER NOT NULL DEFAULT 1",
                "postgresql": "BOOLEAN NOT NULL DEFAULT TRUE",
                "mysql": "BOOLEAN NOT NULL DEFAULT TRUE",
            },
            "is_default_fallback": {
                "sqlite": "INTEGER NOT NULL DEFAULT 0",
                "postgresql": "BOOLEAN NOT NULL DEFAULT FALSE",
                "mysql": "BOOLEAN NOT NULL DEFAULT FALSE",
            },
            "routing_json": {
                "sqlite": "TEXT NOT NULL DEFAULT '{}'",
                "postgresql": "TEXT NOT NULL DEFAULT '{}'",
                "mysql": "LONGTEXT",
            },
            "view_schema_json": {
                "sqlite": "TEXT NOT NULL DEFAULT '{}'",
                "postgresql": "TEXT NOT NULL DEFAULT '{}'",
                "mysql": "LONGTEXT",
            },
            "created_at": {
                "sqlite": "DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP",
                "postgresql": "TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP",
                "mysql": "DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP",
            },
            "updated_at": {
                "sqlite": "DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP",
                "postgresql": "TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP",
                "mysql": "DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP",
            },
        },
        "topic_item": {
            "title": {
                "sqlite": "TEXT NOT NULL DEFAULT ''",
                "postgresql": "TEXT NOT NULL DEFAULT ''",
                "mysql": "VARCHAR(2000) NOT NULL DEFAULT ''",
            },
            "level": {
                "sqlite": "TEXT NOT NULL DEFAULT 'info'",
                "postgresql": "TEXT NOT NULL DEFAULT 'info'",
                "mysql": "VARCHAR(32) NOT NULL DEFAULT 'info'",
            },
            "tags_json": {
                "sqlite": "TEXT",
                "postgresql": "TEXT",
                "mysql": "LONGTEXT",
            },
            "view_json": {
                "sqlite": "TEXT",
                "postgresql": "TEXT",
                "mysql": "LONGTEXT",
            },
            "source": {
                "sqlite": "TEXT NOT NULL DEFAULT 'mcp'",
                "postgresql": "TEXT NOT NULL DEFAULT 'mcp'",
                "mysql": "VARCHAR(64) NOT NULL DEFAULT 'mcp'",
            },
            "dedupe_key": {
                "sqlite": "TEXT",
                "postgresql": "TEXT",
                "mysql": "VARCHAR(200)",
            },
            "is_read": {
                "sqlite": "INTEGER NOT NULL DEFAULT 0",
                "postgresql": "BOOLEAN NOT NULL DEFAULT FALSE",
                "mysql": "BOOLEAN NOT NULL DEFAULT FALSE",
            },
            "created_at": {
                "sqlite": "DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP",
                "postgresql": "TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP",
                "mysql": "DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP",
            },
        },
    }
    inspector = inspect(engine)
    with engine.connect() as conn:
        for table_name, columns in column_specs.items():
            if not inspector.has_table(table_name):
                continue
            present = {
                str(info.get("name"))
                for info in inspector.get_columns(table_name)
                if info.get("name")
            }
            for column_name, per_dialect in columns.items():
                if column_name in present:
                    continue
                # Fall back to the SQLite fragment for unrecognized dialects.
                ddl = per_dialect.get(dialect) or per_dialect.get("sqlite")
                conn.execute(text(f"ALTER TABLE {table_name} ADD COLUMN {column_name} {ddl}"))
        conn.commit()
def _ensure_topic_indexes() -> None:
    """Create topic-feed indexes that the current database is still missing.

    Existing indexes AND unique constraints are both checked so that a
    constraint-backed index under the same name is not created twice.
    """
    required_indexes = [
        ("uq_topic_bot_settings_bot_id", "topic_bot_settings", ["bot_id"], True),
        ("uq_topic_topic_bot_topic_key", "topic_topic", ["bot_id", "topic_key"], True),
        ("idx_topic_topic_bot_id", "topic_topic", ["bot_id"], False),
        ("idx_topic_topic_topic_key", "topic_topic", ["topic_key"], False),
        ("idx_topic_topic_bot_fallback", "topic_topic", ["bot_id", "is_default_fallback"], False),
        ("idx_topic_item_bot_id", "topic_item", ["bot_id"], False),
        ("idx_topic_item_topic_key", "topic_item", ["topic_key"], False),
        ("idx_topic_item_level", "topic_item", ["level"], False),
        ("idx_topic_item_source", "topic_item", ["source"], False),
        ("idx_topic_item_is_read", "topic_item", ["is_read"], False),
        ("idx_topic_item_created_at", "topic_item", ["created_at"], False),
        ("idx_topic_item_bot_topic_created_at", "topic_item", ["bot_id", "topic_key", "created_at"], False),
        ("idx_topic_item_bot_dedupe", "topic_item", ["bot_id", "dedupe_key"], False),
    ]
    inspector = inspect(engine)
    with engine.connect() as conn:
        for index_name, table_name, index_columns, is_unique in required_indexes:
            if not inspector.has_table(table_name):
                continue
            known = {
                str(entry.get("name"))
                for entry in inspector.get_indexes(table_name)
                if entry.get("name")
            }
            known.update(
                str(entry.get("name"))
                for entry in inspector.get_unique_constraints(table_name)
                if entry.get("name")
            )
            if index_name in known:
                continue
            prefix = "UNIQUE " if is_unique else ""
            column_list = ", ".join(index_columns)
            conn.execute(text(f"CREATE {prefix}INDEX {index_name} ON {table_name} ({column_list})"))
        conn.commit()
def _cleanup_legacy_default_topics() -> None:
    """
    Remove legacy auto-created fallback topic rows from early topic-feed design.
    Historical rows look like:
    - topic_key = inbox
    - name = Inbox
    - description = Default topic for uncategorized items
    - routing_json contains "Fallback topic"
    """
    # Prepared statements hoisted out of the per-row loop.
    select_legacy = text(
        """
        SELECT bot_id, topic_key
        FROM topic_topic
        WHERE lower(coalesce(topic_key, '')) = 'inbox'
        AND lower(coalesce(name, '')) = 'inbox'
        AND lower(coalesce(description, '')) = 'default topic for uncategorized items'
        AND lower(coalesce(routing_json, '')) LIKE '%fallback topic%'
        """
    )
    delete_items = text(
        """
        DELETE FROM topic_item
        WHERE bot_id = :bot_id AND lower(coalesce(topic_key, '')) = :topic_key
        """
    )
    delete_topic = text(
        """
        DELETE FROM topic_topic
        WHERE bot_id = :bot_id AND lower(coalesce(topic_key, '')) = :topic_key
        """
    )
    with engine.connect() as conn:
        legacy_rows = conn.execute(select_legacy).fetchall()
        if not legacy_rows:
            # Nothing to clean up; avoid an unnecessary commit.
            return
        for row in legacy_rows:
            bot_id = str(row[0] or "").strip()
            topic_key = str(row[1] or "").strip().lower()
            if not bot_id or not topic_key:
                continue
            params = {"bot_id": bot_id, "topic_key": topic_key}
            # Delete the topic's items first, then the topic row itself.
            conn.execute(delete_items, params)
            conn.execute(delete_topic, params)
        conn.commit()
def align_postgres_sequences() -> None:
    """Realign PostgreSQL serial sequences with the current MAX(id) values.

    Needed after rows were inserted with explicit ids (e.g. data imports),
    which leaves the backing sequence behind the table's max id. No-op on
    other dialects.
    """
    if engine.dialect.name != "postgresql":
        return
    # (table, serial column) pairs whose sequences should be realigned.
    sequence_targets = [
        ("botmessage", "id"),
    ]
    with engine.connect() as conn:
        for table_name, column_name in sequence_targets:
            seq_name = conn.execute(
                text("SELECT pg_get_serial_sequence(:table_name, :column_name)"),
                {"table_name": table_name, "column_name": column_name},
            ).scalar()
            if not seq_name:
                # Column is not backed by a sequence; nothing to align.
                continue
            raw_max = conn.execute(
                text(f'SELECT COALESCE(MAX("{column_name}"), 0) FROM "{table_name}"')
            ).scalar()
            highest = int(raw_max or 0)
            # Empty table: park the sequence at 1 with is_called=False so the
            # first nextval() yields 1; otherwise continue after the max id.
            conn.execute(
                text("SELECT setval(:seq_name, :next_value, :is_called)"),
                {
                    "seq_name": seq_name,
                    "next_value": highest if highest > 0 else 1,
                    "is_called": highest > 0,
                },
            )
        conn.commit()
def init_database() -> None:
    """Create all tables and run the lightweight in-place migrations.

    Order matters: ``create_all`` runs first so every ALTER/DROP helper below
    can assume the tables exist; topic columns and indexes are ensured after
    the SQLite bootstrap so all dialects converge on the same schema.
    """
    SQLModel.metadata.create_all(engine)
    _drop_legacy_skill_tables()
    _ensure_botinstance_columns()
    _drop_legacy_botinstance_columns()
    _ensure_botmessage_columns()
    _ensure_topic_tables_sqlite()
    _ensure_topic_columns()
    # Fix: _ensure_topic_indexes was defined but never invoked, leaving
    # non-SQLite databases without the topic-feed indexes (SQLite gets them
    # inside _ensure_topic_tables_sqlite, which no-ops on other dialects).
    _ensure_topic_indexes()
    _cleanup_legacy_default_topics()
    align_postgres_sequences()
def get_session():
    """Dependency generator that yields a DB session and always closes it."""
    session = Session(engine)
    try:
        yield session
    finally:
        # Mirrors Session's context-manager exit: close on success or error.
        session.close()