#!/usr/bin/env python3 """ Data migration script Migrates existing data from code/JSON files to PostgreSQL database: 1. CELESTIAL_BODIES dict → celestial_bodies table 2. Frontend JSON files → static_data table Usage: python scripts/migrate_data.py [--force | --skip-existing] Options: --force Overwrite existing data without prompting --skip-existing Skip migration if data already exists """ import asyncio import sys from pathlib import Path import json import argparse sys.path.insert(0, str(Path(__file__).parent.parent)) from app.database import AsyncSessionLocal from app.models.celestial import CELESTIAL_BODIES from app.models.db import CelestialBody, StaticData from sqlalchemy import select import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) async def migrate_celestial_bodies(force: bool = False, skip_existing: bool = False): """Migrate CELESTIAL_BODIES dict to database""" logger.info("=" * 60) logger.info("Migrating celestial bodies...") logger.info("=" * 60) async with AsyncSessionLocal() as session: # Check if data already exists result = await session.execute(select(CelestialBody)) existing_count = len(result.scalars().all()) if existing_count > 0: logger.warning(f"Found {existing_count} existing celestial bodies in database") if skip_existing: logger.info("Skipping celestial bodies migration (--skip-existing)") return if not force: response = input("Do you want to overwrite? (yes/no): ") if response.lower() not in ['yes', 'y']: logger.info("Skipping celestial bodies migration") return else: logger.info("Overwriting existing data (--force)") # Delete existing data from sqlalchemy import text await session.execute(text("DELETE FROM celestial_bodies")) logger.info(f"Deleted {existing_count} existing records") # Insert new data count = 0 for body_id, info in CELESTIAL_BODIES.items(): body = CelestialBody( id=body_id, name=info["name"], name_zh=info.get("name_zh"), type=info["type"], description=info.get("description"), extra_data={ "launch_date": info.get("launch_date"), "status": info.get("status"), } if "launch_date" in info or "status" in info else None ) session.add(body) count += 1 await session.commit() logger.info(f"✓ Migrated {count} celestial bodies") async def migrate_static_data(force: bool = False, skip_existing: bool = False): """Migrate frontend JSON files to database""" logger.info("=" * 60) logger.info("Migrating static data from JSON files...") logger.info("=" * 60) # Define JSON files to migrate frontend_data_dir = Path(__file__).parent.parent.parent / "frontend" / "public" / "data" json_files = { "nearby-stars.json": "star", "constellations.json": "constellation", "galaxies.json": "galaxy", } async with AsyncSessionLocal() as session: for filename, category in json_files.items(): file_path = frontend_data_dir / filename if not file_path.exists(): logger.warning(f"File not found: {file_path}") continue # Load JSON data with open(file_path, 'r', encoding='utf-8') as f: data_list = json.load(f) # Check if category data already exists result = await session.execute( select(StaticData).where(StaticData.category == category) ) existing = result.scalars().all() if existing: logger.warning(f"Found {len(existing)} existing {category} records") if skip_existing: logger.info(f"Skipping {category} migration (--skip-existing)") continue if not force: response = input(f"Overwrite {category} data? (yes/no): ") if response.lower() not in ['yes', 'y']: logger.info(f"Skipping {category} migration") continue else: logger.info(f"Overwriting {category} data (--force)") # Delete existing for record in existing: await session.delete(record) # Insert new data count = 0 for item in data_list: static_item = StaticData( category=category, name=item.get("name", "Unknown"), name_zh=item.get("name_zh"), data=item ) session.add(static_item) count += 1 await session.commit() logger.info(f"✓ Migrated {count} {category} records") async def main(): """Run all migrations""" # Parse command line arguments parser = argparse.ArgumentParser(description='Migrate data to PostgreSQL database') group = parser.add_mutually_exclusive_group() group.add_argument('--force', action='store_true', help='Overwrite existing data without prompting') group.add_argument('--skip-existing', action='store_true', help='Skip migration if data already exists') args = parser.parse_args() logger.info("\n" + "=" * 60) logger.info("Cosmo Data Migration") logger.info("=" * 60 + "\n") try: # Migrate celestial bodies await migrate_celestial_bodies(force=args.force, skip_existing=args.skip_existing) # Migrate static data await migrate_static_data(force=args.force, skip_existing=args.skip_existing) logger.info("\n" + "=" * 60) logger.info("✓ Migration completed successfully!") logger.info("=" * 60) except Exception as e: logger.error(f"\n✗ Migration failed: {e}") import traceback traceback.print_exc() sys.exit(1) if __name__ == "__main__": asyncio.run(main())