cosmo_backend/scripts/migrate_interstellar_data.py

343 lines
12 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python3
"""
迁移 static_data 中的 interstellar 数据到 star_systems 和 celestial_bodies 表
包含自动中文名翻译功能
"""
import asyncio
import sys
from pathlib import Path
# Add the project root directory to the Python path
sys.path.insert(0, str(Path(__file__).parent.parent))
from sqlalchemy import select, func, update
from sqlalchemy.dialects.postgresql import insert
from app.database import AsyncSessionLocal
from app.models.db.static_data import StaticData
from app.models.db.star_system import StarSystem
from app.models.db.celestial_body import CelestialBody
# English -> Chinese translation table for well-known star names.
# Keys are matched exactly (case- and whitespace-sensitive) by
# translate_star_name.
STAR_NAME_ZH = {
    'Proxima Cen': '比邻星',
    "Barnard's star": '巴纳德星',
    'eps Eri': '天苑四',
    'Lalande 21185': '莱兰21185',
    '61 Cyg A': '天鹅座61 A',
    '61 Cyg B': '天鹅座61 B',
    'tau Cet': '天仓五',
    'Kapteyn': '开普敦星',
    'Lacaille 9352': '拉卡伊9352',
    'Ross 128': '罗斯128',
    'Wolf 359': '狼359',
    'Sirius': '天狼星',
    'Alpha Centauri': '南门二',
    'TRAPPIST-1': 'TRAPPIST-1',
    'Kepler-442': '开普勒-442',
    'Kepler-452': '开普勒-452',
    'Gliese 581': '格利泽581',
    'Gliese 667C': '格利泽667C',
    'HD 40307': 'HD 40307',
}
# Translations for common star-system name suffixes.
# NOTE(review): no function visible in this script reads this table;
# translate_system_name hard-codes the ' System' suffix instead — confirm
# whether it is used elsewhere before removing it.
SYSTEM_SUFFIX_ZH = {
    'System': '系统',
    'system': '系统',
}
def translate_star_name(english_name: str) -> str:
    """Translate an English star designation into its Chinese display name.

    Lookup order:

    1. An exact match in ``STAR_NAME_ZH``.
    2. A known star followed by a trailing component letter (``" A"``,
       ``" B"`` or ``" C"``): the translated base name with the letter
       appended directly, e.g. ``"Sirius B"`` -> ``"天狼星B"``.
    3. A catalogue prefix (``Kepler-``, ``TRAPPIST-``, ``Gliese``, ``GJ``,
       ``HD``, ``HIP``): the prefix is translated where a Chinese form exists
       and the whole remainder of the designation is kept.
    4. Otherwise the English name is returned unchanged.

    Args:
        english_name: Star name as stored in the source data.

    Returns:
        The Chinese name, or ``english_name`` itself when no rule applies.
    """
    # 1. Direct dictionary hit.
    if english_name in STAR_NAME_ZH:
        return STAR_NAME_ZH[english_name]

    # 2. Strip only a *trailing* component letter. Removing ' A'/' B'/' C'
    #    anywhere in the string would corrupt names such as
    #    'Alpha Centauri A' (the ' C' of ' Centauri' would be deleted).
    base_name, separator, component = english_name.rpartition(' ')
    if separator and component in ('A', 'B', 'C'):
        base_name = base_name.strip()
        if base_name in STAR_NAME_ZH:
            return STAR_NAME_ZH[base_name] + component

    # 3. Numbered catalogue stars. Keep everything after the prefix so that
    #    multi-part designations ('Gliese 667 C', 'HD 10180 A') are not
    #    truncated to their first token.
    catalogue_prefixes = (
        ('Kepler-', '开普勒-'),
        ('TRAPPIST-', 'TRAPPIST-'),
        ('Gliese ', '格利泽'),
        ('GJ ', 'GJ '),
        ('HD ', 'HD '),
        ('HIP ', 'HIP '),
    )
    for prefix, prefix_zh in catalogue_prefixes:
        if english_name.startswith(prefix):
            return prefix_zh + english_name[len(prefix):].strip()

    # 4. Fall back to the English name.
    return english_name
def translate_system_name(english_name: str) -> str:
    """Translate a star-system name into Chinese.

    When the name contains ``' System'``, that text is removed, the remaining
    star name is translated with ``translate_star_name`` and ``'系统'`` is
    appended. Any other name is translated as a plain star name.
    """
    marker = ' System'
    if marker not in english_name:
        return translate_star_name(english_name)

    # Drop every occurrence of the marker, then trim surrounding whitespace.
    host_star = ''.join(english_name.split(marker)).strip()
    return translate_star_name(host_star) + '系统'
def translate_planet_name(english_name: str) -> str:
    """Translate an exoplanet name into Chinese.

    The name is split at its last space into a host-star part and a planet
    designation; the star part is translated with ``translate_star_name`` and
    the designation is re-attached after a single space. A name without any
    space is returned unchanged.
    """
    host_star, space, designation = english_name.rpartition(' ')
    if not space:
        # Nothing to split on: leave the name as it is.
        return english_name
    return f'{translate_star_name(host_star)} {designation}'
async def deduplicate_planets(planets: list) -> list:
    """Collapse planet records that share a name into one record each.

    For every distinct ``name`` the record with the most populated fields
    (values that are neither ``None`` nor ``''``) is kept; on a tie the record
    seen first wins. Records with a missing or empty name are dropped. The
    result keeps the order in which each name first appeared.

    Args:
        planets: Planet dicts; may be empty or ``None``.

    Returns:
        A list with one dict per distinct planet name.
    """

    def populated(record) -> int:
        # Number of fields carrying an actual value.
        return len([
            value for value in record.values()
            if value is not None and value != ''
        ])

    if not planets:
        return []

    best_by_name = {}
    for candidate in planets:
        key = candidate.get('name', '')
        if not key:
            continue
        if key not in best_by_name or populated(candidate) > populated(best_by_name[key]):
            best_by_name[key] = candidate
    return list(best_by_name.values())
def _star_system_fields(star_name: str, data: dict) -> dict:
    """Map one static_data payload onto StarSystem constructor arguments."""
    system_name = f"{star_name} System"
    # 'position' and 'distance_ly' may be present but null; treat that the
    # same way as a missing key instead of raising.
    position = data.get('position') or {}
    distance_ly = data.get('distance_ly') or 0
    return {
        'name': system_name,
        'name_zh': translate_system_name(system_name),
        'host_star_name': star_name,
        'distance_pc': data.get('distance_pc'),
        'distance_ly': data.get('distance_ly'),
        'ra': data.get('ra'),
        'dec': data.get('dec'),
        'position_x': position.get('x'),
        'position_y': position.get('y'),
        'position_z': position.get('z'),
        'spectral_type': data.get('spectral_type'),
        'radius_solar': data.get('radius_solar'),
        'mass_solar': data.get('mass_solar'),
        'temperature_k': data.get('temperature_k'),
        'magnitude': data.get('magnitude'),
        'color': data.get('color', '#FFFFFF'),
        'planet_count': 0,  # updated once the planets have been added
        'description': f"距离地球 {distance_ly:.2f} 光年的恒星系统。",
    }


def _planet_fields(system_id, system_name_zh: str, planet_data: dict) -> dict:
    """Map one planet dict onto CelestialBody constructor arguments."""
    planet_name = planet_data['name']
    return {
        # Build an ID from the system id and the planet name.
        'id': f"exo-{system_id}-{planet_name.replace(' ', '-')}",
        'name': planet_name,
        'name_zh': translate_planet_name(planet_name),
        'type': 'planet',
        'system_id': system_id,
        'description': f"{system_name_zh}的系外行星。",
        'extra_data': {
            'semi_major_axis_au': planet_data.get('semi_major_axis_au'),
            'period_days': planet_data.get('period_days'),
            'eccentricity': planet_data.get('eccentricity'),
            'radius_earth': planet_data.get('radius_earth'),
            'mass_earth': planet_data.get('mass_earth'),
            'temperature_k': planet_data.get('temperature_k'),
        },
    }


async def _migrate_one_system(session, star_name: str, data: dict) -> int:
    """Add one star system and its planets to ``session``; return planets added."""
    fields = _star_system_fields(star_name, data)
    system = StarSystem(**fields)
    session.add(system)
    await session.flush()  # populates system.id for the planet rows

    distance_pc = data.get('distance_pc') or 0
    distance_ly = data.get('distance_ly') or 0
    print(f"\n✅ 恒星系: {fields['name']} ({fields['name_zh']})")
    print(f" 距离: {distance_pc:.2f} pc (~{distance_ly:.2f} ly)")

    planets = data.get('planets') or []
    if not planets:
        return 0

    unique_planets = await deduplicate_planets(planets)
    print(f" 行星: {len(planets)} 条记录 → {len(unique_planets)} 颗独立行星(去重 {len(planets) - len(unique_planets)} 条)")

    added = 0
    for planet_data in unique_planets:
        if not planet_data.get('name', ''):
            continue
        planet_fields = _planet_fields(system.id, fields['name_zh'], planet_data)
        session.add(CelestialBody(**planet_fields))
        added += 1
        print(f"{planet_fields['name']} ({planet_fields['name_zh']})")

    system.planet_count = len(unique_planets)
    return added


async def migrate_star_systems():
    """Migrate every 'interstellar' static_data row into star systems and planets.

    Each source row becomes one ``StarSystem`` plus one ``CelestialBody`` per
    de-duplicated planet. Every system is written inside its own SAVEPOINT,
    so a failing system is skipped without discarding systems that were
    added earlier but not yet committed. Work is committed every 100
    successfully migrated systems and once more at the end, and a summary is
    printed.
    """
    async with AsyncSessionLocal() as session:
        print("=" * 60)
        print("开始迁移系外恒星系数据...")
        print("=" * 60)

        result = await session.execute(
            select(StaticData)
            .where(StaticData.category == 'interstellar')
            .order_by(StaticData.name)
        )
        # Copy the plain values out now. Rollback (and commit, unless the
        # session factory disables expire_on_commit) expires loaded ORM
        # instances, and an async session cannot lazily reload them on
        # attribute access.
        records = [(row.name, row.data) for row in result.scalars().all()]
        print(f"\n📊 共找到 {len(records)} 个恒星系统")

        migrated_systems = 0
        migrated_planets = 0
        skipped_systems = 0

        for star_name, data in records:
            try:
                # SAVEPOINT: an error undoes only this system's rows.
                async with session.begin_nested():
                    planets_added = await _migrate_one_system(session, star_name, data)
            except Exception as e:
                # Best-effort migration: report the failure and move on.
                print(f"\n❌ 错误:迁移 {star_name} 失败 - {str(e)[:200]}")
                skipped_systems += 1
                continue

            # Count only what actually survived its savepoint.
            migrated_systems += 1
            migrated_planets += planets_added

            # Commit in batches of 100 systems.
            if migrated_systems % 100 == 0:
                await session.commit()
                print(f"\n💾 已提交 {migrated_systems} 个恒星系统...")

        # Final commit for the last partial batch.
        await session.commit()

        # Guard the average against an empty or fully failed migration.
        average = migrated_planets / migrated_systems if migrated_systems else 0.0
        print("\n" + "=" * 60)
        print("迁移完成!")
        print("=" * 60)
        print(f"✅ 成功迁移恒星系: {migrated_systems}")
        print(f"✅ 成功迁移行星: {migrated_planets}")
        print(f"⚠️ 跳过的恒星系: {skipped_systems}")
        print(f"📊 平均每个恒星系: {average:.1f} 颗行星")
async def update_solar_system_count():
    """Recompute ``planet_count`` for the star system with id 1.

    Counts the ``CelestialBody`` rows whose ``system_id`` is 1, subtracts one
    for the Sun and stores the result on ``StarSystem`` id 1. The stored
    value is clamped at 0 so that a system with no bodies is not written as
    -1.
    """
    async with AsyncSessionLocal() as session:
        result = await session.execute(
            select(func.count(CelestialBody.id))
            .where(CelestialBody.system_id == 1)
        )
        count = result.scalar() or 0
        # Subtract the Sun itself — presumably it is stored as a body of
        # system 1; verify against the seed data.
        planet_count = max(count - 1, 0)
        await session.execute(
            update(StarSystem)
            .where(StarSystem.id == 1)
            .values(planet_count=planet_count)
        )
        await session.commit()
        print(f"\n✅ 更新太阳系天体数量: {count} (不含太阳: {planet_count})")
async def verify_migration():
    """Print a read-only summary of the migrated data.

    Reports the total number of star systems, the ten systems with the
    highest ``planet_count``, and the number of celestial bodies grouped by
    type and system.
    """
    rule = "=" * 60

    # Build the three report queries up front; they are executed in order below.
    total_systems_query = select(func.count(StarSystem.id))
    top_systems_query = (
        select(StarSystem.name, StarSystem.name_zh, StarSystem.planet_count)
        .order_by(StarSystem.planet_count.desc())
        .limit(10)
    )
    distribution_query = (
        select(CelestialBody.type, CelestialBody.system_id, func.count(CelestialBody.id))
        .group_by(CelestialBody.type, CelestialBody.system_id)
        .order_by(CelestialBody.system_id, CelestialBody.type)
    )

    async with AsyncSessionLocal() as session:
        print("\n" + rule)
        print("验证迁移结果...")
        print(rule)

        # Total number of star systems.
        system_count = (await session.execute(total_systems_query)).scalar()
        print(f"\n📊 恒星系统总数: {system_count}")

        # Top ten systems by planet count.
        top_rows = await session.execute(top_systems_query)
        print("\n🏆 行星最多的恒星系前10:")
        for row in top_rows:
            name, name_zh, count = row
            print(f" {name} ({name_zh}): {count} 颗行星")

        # Body counts per (type, system).
        distribution_rows = await session.execute(distribution_query)
        print("\n📈 天体类型分布:")
        for row in distribution_rows:
            type_, system_id, count = row
            if system_id == 1:
                system_name = "太阳系"
            else:
                system_name = "系外恒星系"
            print(f" {system_name} - {type_}: {count}")
async def main():
    """Run the migration, the solar-system count update and the verification.

    The three steps run in that order. If any of them raises, the error and
    its traceback are printed and the process exits with status 1.
    """
    rule = "=" * 60
    print("\n" + rule)
    print("Cosmo 系外恒星系数据迁移工具")
    print(rule)

    steps = (
        migrate_star_systems,       # move the data
        update_solar_system_count,  # refresh the solar system statistics
        verify_migration,           # report the outcome
    )
    try:
        for step in steps:
            await step()
        print("\n✅ 所有操作完成!")
    except Exception as e:
        import traceback
        print(f"\n❌ 迁移失败: {e!s}")
        traceback.print_exc()
        sys.exit(1)


# Script entry point: run the async main() on a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())