cosmo/backend/scripts/migrate_data.py

185 lines
6.2 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Data migration script
Migrates existing data from code/JSON files to PostgreSQL database:
1. CELESTIAL_BODIES dict → celestial_bodies table
2. Frontend JSON files → static_data table
Usage:
python scripts/migrate_data.py [--force | --skip-existing]
Options:
--force Overwrite existing data without prompting
--skip-existing Skip migration if data already exists
"""
import asyncio
import sys
from pathlib import Path
import json
import argparse
sys.path.insert(0, str(Path(__file__).parent.parent))
from app.database import AsyncSessionLocal
from app.models.celestial import CELESTIAL_BODIES
from app.models.db import CelestialBody, StaticData
from sqlalchemy import select
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def migrate_celestial_bodies(force: bool = False, skip_existing: bool = False):
"""Migrate CELESTIAL_BODIES dict to database"""
logger.info("=" * 60)
logger.info("Migrating celestial bodies...")
logger.info("=" * 60)
async with AsyncSessionLocal() as session:
# Check if data already exists
result = await session.execute(select(CelestialBody))
existing_count = len(result.scalars().all())
if existing_count > 0:
logger.warning(f"Found {existing_count} existing celestial bodies in database")
if skip_existing:
logger.info("Skipping celestial bodies migration (--skip-existing)")
return
if not force:
response = input("Do you want to overwrite? (yes/no): ")
if response.lower() not in ['yes', 'y']:
logger.info("Skipping celestial bodies migration")
return
else:
logger.info("Overwriting existing data (--force)")
# Delete existing data
from sqlalchemy import text
await session.execute(text("DELETE FROM celestial_bodies"))
logger.info(f"Deleted {existing_count} existing records")
# Insert new data
count = 0
for body_id, info in CELESTIAL_BODIES.items():
body = CelestialBody(
id=body_id,
name=info["name"],
name_zh=info.get("name_zh"),
type=info["type"],
description=info.get("description"),
extra_data={
"launch_date": info.get("launch_date"),
"status": info.get("status"),
} if "launch_date" in info or "status" in info else None
)
session.add(body)
count += 1
await session.commit()
logger.info(f"✓ Migrated {count} celestial bodies")
async def migrate_static_data(force: bool = False, skip_existing: bool = False):
"""Migrate frontend JSON files to database"""
logger.info("=" * 60)
logger.info("Migrating static data from JSON files...")
logger.info("=" * 60)
# Define JSON files to migrate
frontend_data_dir = Path(__file__).parent.parent.parent / "frontend" / "public" / "data"
json_files = {
"nearby-stars.json": "star",
"constellations.json": "constellation",
"galaxies.json": "galaxy",
}
async with AsyncSessionLocal() as session:
for filename, category in json_files.items():
file_path = frontend_data_dir / filename
if not file_path.exists():
logger.warning(f"File not found: {file_path}")
continue
# Load JSON data
with open(file_path, 'r', encoding='utf-8') as f:
data_list = json.load(f)
# Check if category data already exists
result = await session.execute(
select(StaticData).where(StaticData.category == category)
)
existing = result.scalars().all()
if existing:
logger.warning(f"Found {len(existing)} existing {category} records")
if skip_existing:
logger.info(f"Skipping {category} migration (--skip-existing)")
continue
if not force:
response = input(f"Overwrite {category} data? (yes/no): ")
if response.lower() not in ['yes', 'y']:
logger.info(f"Skipping {category} migration")
continue
else:
logger.info(f"Overwriting {category} data (--force)")
# Delete existing
for record in existing:
await session.delete(record)
# Insert new data
count = 0
for item in data_list:
static_item = StaticData(
category=category,
name=item.get("name", "Unknown"),
name_zh=item.get("name_zh"),
data=item
)
session.add(static_item)
count += 1
await session.commit()
logger.info(f"✓ Migrated {count} {category} records")
async def main():
"""Run all migrations"""
# Parse command line arguments
parser = argparse.ArgumentParser(description='Migrate data to PostgreSQL database')
group = parser.add_mutually_exclusive_group()
group.add_argument('--force', action='store_true', help='Overwrite existing data without prompting')
group.add_argument('--skip-existing', action='store_true', help='Skip migration if data already exists')
args = parser.parse_args()
logger.info("\n" + "=" * 60)
logger.info("Cosmo Data Migration")
logger.info("=" * 60 + "\n")
try:
# Migrate celestial bodies
await migrate_celestial_bodies(force=args.force, skip_existing=args.skip_existing)
# Migrate static data
await migrate_static_data(force=args.force, skip_existing=args.skip_existing)
logger.info("\n" + "=" * 60)
logger.info("✓ Migration completed successfully!")
logger.info("=" * 60)
except Exception as e:
logger.error(f"\n✗ Migration failed: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == "__main__":
asyncio.run(main())