#!/usr/bin/env python3 """ 迁移 static_data 中的 interstellar 数据到 star_systems 和 celestial_bodies 表 包含自动中文名翻译功能 """ import asyncio import sys from pathlib import Path # 添加项目根目录到 Python 路径 sys.path.insert(0, str(Path(__file__).parent.parent)) from sqlalchemy import select, func, update from sqlalchemy.dialects.postgresql import insert from app.database import AsyncSessionLocal from app.models.db.static_data import StaticData from app.models.db.star_system import StarSystem from app.models.db.celestial_body import CelestialBody # 恒星名称中文翻译字典(常见恒星) STAR_NAME_ZH = { 'Proxima Cen': '比邻星', "Barnard's star": '巴纳德星', 'eps Eri': '天苑四', 'Lalande 21185': '莱兰21185', '61 Cyg A': '天鹅座61 A', '61 Cyg B': '天鹅座61 B', 'tau Cet': '天仓五', 'Kapteyn': '开普敦星', 'Lacaille 9352': '拉卡伊9352', 'Ross 128': '罗斯128', 'Wolf 359': '狼359', 'Sirius': '天狼星', 'Alpha Centauri': '南门二', 'TRAPPIST-1': 'TRAPPIST-1', 'Kepler-442': '开普勒-442', 'Kepler-452': '开普勒-452', 'Gliese 581': '格利泽581', 'Gliese 667C': '格利泽667C', 'HD 40307': 'HD 40307', } # 常见恒星系后缀翻译 SYSTEM_SUFFIX_ZH = { 'System': '系统', 'system': '系统', } def translate_star_name(english_name: str) -> str: """ 翻译恒星名称为中文 优先使用字典,否则保留英文名 """ # 直接匹配 if english_name in STAR_NAME_ZH: return STAR_NAME_ZH[english_name] # 移除常见后缀尝试匹配 base_name = english_name.replace(' A', '').replace(' B', '').replace(' C', '').strip() if base_name in STAR_NAME_ZH: suffix = english_name.replace(base_name, '').strip() return STAR_NAME_ZH[base_name] + suffix # Kepler/TRAPPIST 等编号星 if english_name.startswith('Kepler-'): return f'开普勒-{english_name.split("-")[1]}' if english_name.startswith('TRAPPIST-'): return f'TRAPPIST-{english_name.split("-")[1]}' if english_name.startswith('Gliese '): return f'格利泽{english_name.split(" ")[1]}' if english_name.startswith('GJ '): return f'GJ {english_name.split(" ")[1]}' if english_name.startswith('HD '): return f'HD {english_name.split(" ")[1]}' if english_name.startswith('HIP '): return f'HIP {english_name.split(" ")[1]}' # 默认返回英文名 return english_name def translate_system_name(english_name: str) -> str: """翻译恒星系名称""" if ' System' in english_name: star_name = english_name.replace(' System', '').strip() star_name_zh = translate_star_name(star_name) return f'{star_name_zh}系统' return translate_star_name(english_name) def translate_planet_name(english_name: str) -> str: """ 翻译系外行星名称 格式:恒星名 + 行星字母 """ # 分离恒星名和行星字母 parts = english_name.rsplit(' ', 1) if len(parts) == 2: star_name, planet_letter = parts star_name_zh = translate_star_name(star_name) return f'{star_name_zh} {planet_letter}' return english_name async def deduplicate_planets(planets: list) -> list: """ 去除重复的行星记录 保留字段最完整的记录 """ if not planets: return [] planet_map = {} for planet in planets: name = planet.get('name', '') if not name: continue if name not in planet_map: planet_map[name] = planet else: # 比较字段完整度 existing = planet_map[name] existing_fields = sum(1 for v in existing.values() if v is not None and v != '') current_fields = sum(1 for v in planet.values() if v is not None and v != '') if current_fields > existing_fields: planet_map[name] = planet return list(planet_map.values()) async def migrate_star_systems(): """迁移恒星系统数据""" async with AsyncSessionLocal() as session: print("=" * 60) print("开始迁移系外恒星系数据...") print("=" * 60) # 读取所有 interstellar 数据 result = await session.execute( select(StaticData) .where(StaticData.category == 'interstellar') .order_by(StaticData.name) ) interstellar_data = result.scalars().all() print(f"\n📊 共找到 {len(interstellar_data)} 个恒星系统") migrated_systems = 0 migrated_planets = 0 skipped_systems = 0 for star_data in interstellar_data: try: data = star_data.data star_name = star_data.name # 翻译中文名 star_name_zh = translate_star_name(star_name) system_name = f"{star_name} System" system_name_zh = translate_system_name(system_name) # 创建恒星系统记录 system = StarSystem( name=system_name, name_zh=system_name_zh, host_star_name=star_name, distance_pc=data.get('distance_pc'), distance_ly=data.get('distance_ly'), ra=data.get('ra'), dec=data.get('dec'), position_x=data.get('position', {}).get('x') if 'position' in data else None, position_y=data.get('position', {}).get('y') if 'position' in data else None, position_z=data.get('position', {}).get('z') if 'position' in data else None, spectral_type=data.get('spectral_type'), radius_solar=data.get('radius_solar'), mass_solar=data.get('mass_solar'), temperature_k=data.get('temperature_k'), magnitude=data.get('magnitude'), color=data.get('color', '#FFFFFF'), planet_count=0, # 将在迁移行星后更新 description=f"距离地球 {data.get('distance_ly', 0):.2f} 光年的恒星系统。" ) session.add(system) await session.flush() # 获取 system.id print(f"\n✅ 恒星系: {system_name} ({system_name_zh})") print(f" 距离: {data.get('distance_pc', 0):.2f} pc (~{data.get('distance_ly', 0):.2f} ly)") # 处理行星数据 planets = data.get('planets', []) if planets: # 去重 unique_planets = await deduplicate_planets(planets) print(f" 行星: {len(planets)} 条记录 → {len(unique_planets)} 颗独立行星(去重 {len(planets) - len(unique_planets)} 条)") # 迁移行星 for planet_data in unique_planets: planet_name = planet_data.get('name', '') if not planet_name: continue planet_name_zh = translate_planet_name(planet_name) # 创建系外行星记录 planet = CelestialBody( id=f"exo-{system.id}-{planet_name.replace(' ', '-')}", # 生成唯一ID name=planet_name, name_zh=planet_name_zh, type='planet', system_id=system.id, description=f"{system_name_zh}的系外行星。", extra_data={ 'semi_major_axis_au': planet_data.get('semi_major_axis_au'), 'period_days': planet_data.get('period_days'), 'eccentricity': planet_data.get('eccentricity'), 'radius_earth': planet_data.get('radius_earth'), 'mass_earth': planet_data.get('mass_earth'), 'temperature_k': planet_data.get('temperature_k'), } ) session.add(planet) migrated_planets += 1 print(f" • {planet_name} ({planet_name_zh})") # 更新恒星系的行星数量 system.planet_count = len(unique_planets) migrated_systems += 1 # 每100个系统提交一次 if migrated_systems % 100 == 0: await session.commit() print(f"\n💾 已提交 {migrated_systems} 个恒星系统...") except Exception as e: print(f"\n❌ 错误:迁移 {star_name} 失败 - {str(e)[:200]}") skipped_systems += 1 # 简单回滚,继续下一个 try: await session.rollback() except: pass continue # 最终提交 await session.commit() print("\n" + "=" * 60) print("迁移完成!") print("=" * 60) print(f"✅ 成功迁移恒星系: {migrated_systems}") print(f"✅ 成功迁移行星: {migrated_planets}") print(f"⚠️ 跳过的恒星系: {skipped_systems}") print(f"📊 平均每个恒星系: {migrated_planets / migrated_systems:.1f} 颗行星") async def update_solar_system_count(): """更新太阳系的天体数量""" async with AsyncSessionLocal() as session: result = await session.execute( select(func.count(CelestialBody.id)) .where(CelestialBody.system_id == 1) ) count = result.scalar() await session.execute( update(StarSystem) .where(StarSystem.id == 1) .values(planet_count=count - 1) # 减去太阳本身 ) await session.commit() print(f"\n✅ 更新太阳系天体数量: {count} (不含太阳: {count - 1})") async def verify_migration(): """验证迁移结果""" async with AsyncSessionLocal() as session: print("\n" + "=" * 60) print("验证迁移结果...") print("=" * 60) # 统计恒星系 result = await session.execute(select(func.count(StarSystem.id))) system_count = result.scalar() print(f"\n📊 恒星系统总数: {system_count}") # 统计各系统的行星数量 result = await session.execute( select(StarSystem.name, StarSystem.name_zh, StarSystem.planet_count) .order_by(StarSystem.planet_count.desc()) .limit(10) ) print("\n🏆 行星最多的恒星系(前10):") for name, name_zh, count in result: print(f" {name} ({name_zh}): {count} 颗行星") # 统计天体类型分布 result = await session.execute( select(CelestialBody.type, CelestialBody.system_id, func.count(CelestialBody.id)) .group_by(CelestialBody.type, CelestialBody.system_id) .order_by(CelestialBody.system_id, CelestialBody.type) ) print("\n📈 天体类型分布:") for type_, system_id, count in result: system_name = "太阳系" if system_id == 1 else f"系外恒星系" print(f" {system_name} - {type_}: {count}") async def main(): """主函数""" print("\n" + "=" * 60) print("Cosmo 系外恒星系数据迁移工具") print("=" * 60) try: # 执行迁移 await migrate_star_systems() # 更新太阳系统计 await update_solar_system_count() # 验证结果 await verify_migration() print("\n✅ 所有操作完成!") except Exception as e: print(f"\n❌ 迁移失败: {str(e)}") import traceback traceback.print_exc() sys.exit(1) if __name__ == "__main__": asyncio.run(main())