#!/usr/bin/env python3 """ 补全多恒星系统数据并启用恒星和行星 参考比邻星系统(Alpha Centauri)的数据结构 """ import asyncio import asyncpg import json import logging from datetime import datetime logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # 数据库连接配置 DB_CONFIG = { "host": "localhost", "port": 5432, "user": "postgres", "password": "postgres", "database": "cosmo_db" } # 已知的多恒星系统数据(来自天文学资料) MULTI_STAR_SYSTEMS = { # Alpha Centauri System (比邻星系统) - system_id = 479 479: { "stars": [ { "id": "star-479-primary", "name": "Alpha Centauri A", "name_zh": "南门二A", "description": "该恒星系主序星,光谱类型: G2V, 表面温度: 5790K", "extra_data": { "spectral_type": "G2V", "mass_solar": 1.1, "radius_solar": 1.22, "temperature_k": 5790 } }, { "id": "star-479-secondary", "name": "Alpha Centauri B", "name_zh": "南门二B", "description": "与南门二A相互绕转的明亮双星,是该系统的主体。", "extra_data": { "spectral_type": "K1V", "mass_solar": 0.93, "radius_solar": 0.86, "temperature_k": 5260 } }, { "id": "star-479-tertiary", "name": "Proxima Centauri", "name_zh": "比邻星", "description": "一颗质量小、光度弱的红矮星,距离南门二A/B约0.2光年,围绕它们公转。", "extra_data": { "spectral_type": "M5.5V", "mass_solar": 0.12, "radius_solar": 0.14, "temperature_k": 2900 } } ] } } async def check_existing_data(conn): """检查现有数据""" logger.info("=== 检查现有数据 ===") # 检查恒星系统 rows = await conn.fetch(""" SELECT id, name, name_zh, host_star_name, planet_count FROM star_systems WHERE id IN (479, 2, 3, 4, 5) ORDER BY id """) print("\n恒星系统:") for row in rows: print(f" ID={row['id']}: {row['name_zh'] or row['name']} (主恒星: {row['host_star_name']}, 行星数: {row['planet_count']})") # 检查比邻星系统的天体 rows = await conn.fetch(""" SELECT id, name, name_zh, type, is_active FROM celestial_bodies WHERE system_id = 479 ORDER BY type, name """) print("\n比邻星系统(479)的天体:") for row in rows: print(f" {row['type']:15} | {row['name']:30} | Active: {row['is_active']}") async def add_missing_stars(conn): """添加缺失的恒星""" logger.info("\n=== 添加缺失的恒星 ===") for system_id, system_data in MULTI_STAR_SYSTEMS.items(): logger.info(f"\n处理恒星系统 ID={system_id}") for star in system_data["stars"]: # 检查是否已存在 existing = await conn.fetchrow( "SELECT id FROM celestial_bodies WHERE id = $1", star["id"] ) if existing: logger.info(f" ✓ 恒星已存在: {star['name']} ({star['id']})") else: # 插入新恒星 await conn.execute(""" INSERT INTO celestial_bodies (id, name, name_zh, type, system_id, description, is_active, extra_data, created_at, updated_at) VALUES ($1, $2, $3, 'star', $4, $5, TRUE, $6::jsonb, NOW(), NOW()) """, star["id"], star["name"], star["name_zh"], system_id, star["description"], json.dumps(star["extra_data"]) ) logger.info(f" ✅ 添加恒星: {star['name_zh']} ({star['id']})") logger.info("\n恒星数据补全完成!") async def activate_stars_and_planets(conn): """启用所有恒星和行星""" logger.info("\n=== 启用恒星和行星 ===") # 启用所有恒星(除了太阳系之外的其他系统) stars = await conn.fetch(""" UPDATE celestial_bodies SET is_active = TRUE, updated_at = NOW() WHERE type = 'star' AND system_id > 1 RETURNING id, name, name_zh """) logger.info(f"\n启用了 {len(stars)} 颗恒星:") for star in stars: logger.info(f" ✓ {star['name_zh'] or star['name']} ({star['id']})") # 启用所有行星(除了太阳系之外的其他系统) planets = await conn.fetch(""" UPDATE celestial_bodies SET is_active = TRUE, updated_at = NOW() WHERE type = 'planet' AND system_id > 1 RETURNING id, name, name_zh """) logger.info(f"\n启用了 {len(planets)} 颗行星:") for planet in planets: logger.info(f" ✓ {planet['name_zh'] or planet['name']} ({planet['id']})") logger.info("\n启用完成!") async def verify_results(conn): """验证结果""" logger.info("\n=== 验证结果 ===") # 统计各系统的天体数量 rows = await conn.fetch(""" SELECT s.id, s.name, s.name_zh, COUNT(CASE WHEN cb.type = 'star' THEN 1 END) as star_count, COUNT(CASE WHEN cb.type = 'planet' THEN 1 END) as planet_count, COUNT(CASE WHEN cb.is_active = TRUE THEN 1 END) as active_count FROM star_systems s LEFT JOIN celestial_bodies cb ON s.id = cb.system_id WHERE s.id IN (479, 2, 3, 4, 5, 6, 7, 8, 9, 10) GROUP BY s.id, s.name, s.name_zh ORDER BY s.id """) print("\n各恒星系统统计:") print(f"{'系统ID':<8} {'名称':<30} {'恒星数':<8} {'行星数':<8} {'启用数':<8}") print("-" * 80) for row in rows: print(f"{row['id']:<8} {(row['name_zh'] or row['name']):<30} {row['star_count']:<8} {row['planet_count']:<8} {row['active_count']:<8}") async def main(): """主函数""" print("=" * 80) print("多恒星系统数据补全和启用脚本") print("=" * 80) # 连接数据库 conn = await asyncpg.connect(**DB_CONFIG) try: # 1. 检查现有数据 await check_existing_data(conn) # 2. 添加缺失的恒星 await add_missing_stars(conn) # 3. 启用恒星和行星 await activate_stars_and_planets(conn) # 4. 验证结果 await verify_results(conn) print("\n" + "=" * 80) print("✅ 所有操作完成!") print("=" * 80) finally: await conn.close() if __name__ == "__main__": asyncio.run(main())