cosmo/backend/app/jobs/predefined.py

633 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""
Predefined Scheduled Tasks
All registered tasks for scheduled execution
"""
import logging
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.dialects.postgresql import insert
from app.jobs.registry import task_registry
from app.models.db.celestial_body import CelestialBody
from app.models.db.position import Position
from app.models.db.celestial_event import CelestialEvent
from app.services.horizons import HorizonsService
from app.services.nasa_sbdb_service import nasa_sbdb_service
from app.services.event_service import event_service
from app.services.planetary_events_service import planetary_events_service
logger = logging.getLogger(__name__)
@task_registry.register(
name="sync_solar_system_positions",
description="同步太阳系天体位置数据从NASA Horizons API获取指定天体的位置数据并保存到数据库",
category="data_sync",
parameters=[
{
"name": "body_ids",
"type": "array",
"description": "要同步的天体ID列表例如['10', '199', '299']。如果不指定,则同步所有活跃的太阳系天体",
"required": False,
"default": None
},
{
"name": "days",
"type": "integer",
"description": "同步天数,从今天开始向未来延伸的天数",
"required": False,
"default": 7
},
{
"name": "source",
"type": "string",
"description": "数据源标记,用于标识数据来源",
"required": False,
"default": "nasa_horizons_cron"
}
]
)
async def sync_solar_system_positions(
db: AsyncSession,
logger: logging.Logger,
params: Dict[str, Any]
) -> Dict[str, Any]:
"""
Sync solar system body positions from NASA Horizons
Args:
db: Database session
logger: Logger instance
params: Task parameters
- body_ids: List of body IDs to sync (optional, defaults to all active)
- days: Number of days to sync (default: 7)
- source: Source tag for the data (default: "nasa_horizons_cron")
Returns:
Summary of sync operation
"""
# Parse parameters with type conversion (params come from JSON, may be strings)
body_ids = params.get("body_ids")
days = int(params.get("days", 7))
source = str(params.get("source", "nasa_horizons_cron"))
logger.info(f"Starting solar system position sync: days={days}, source={source}")
# Get list of bodies to sync
if body_ids:
# Use specified body IDs
result = await db.execute(
select(CelestialBody).where(
CelestialBody.id.in_(body_ids),
CelestialBody.is_active == True
)
)
bodies = result.scalars().all()
logger.info(f"Syncing {len(bodies)} specified bodies")
else:
# Get all active solar system bodies
# Typically solar system bodies include planets, dwarf planets, major satellites, comets, and probes
result = await db.execute(
select(CelestialBody).where(
CelestialBody.is_active == True,
CelestialBody.system_id == 1,
CelestialBody.type.in_([
'planet', 'dwarf_planet', 'satellite',
'comet', 'probe', 'asteroid','star'
])
)
)
bodies = result.scalars().all()
logger.info(f"Syncing all {len(bodies)} active solar system bodies")
if not bodies:
logger.warning("No bodies found to sync")
return {
"success": True,
"bodies_synced": 0,
"total_positions": 0,
"message": "No bodies found"
}
# Initialize services
horizons = HorizonsService()
# Sync positions for each body
total_positions = 0
synced_bodies = []
failed_bodies = []
start_time = datetime.utcnow()
end_time = start_time + timedelta(days=days)
for body in bodies:
# Use savepoint for this body's operations
async with db.begin_nested(): # Creates a SAVEPOINT
try:
logger.debug(f"Fetching positions for {body.name} ({body.id})")
# Fetch positions from NASA Horizons
positions = await horizons.get_body_positions(
body_id=body.id,
start_time=start_time,
end_time=end_time,
step="1d" # Daily positions
)
# Save positions to database (upsert logic)
count = 0
for pos in positions:
# Use PostgreSQL's INSERT ... ON CONFLICT to handle duplicates
stmt = insert(Position).values(
body_id=body.id,
time=pos.time,
x=pos.x,
y=pos.y,
z=pos.z,
vx=getattr(pos, 'vx', None),
vy=getattr(pos, 'vy', None),
vz=getattr(pos, 'vz', None),
source=source
)
# On conflict (body_id, time), update the existing record
stmt = stmt.on_conflict_do_update(
index_elements=['body_id', 'time'],
set_={
'x': pos.x,
'y': pos.y,
'z': pos.z,
'vx': getattr(pos, 'vx', None),
'vy': getattr(pos, 'vy', None),
'vz': getattr(pos, 'vz', None),
'source': source
}
)
await db.execute(stmt)
count += 1
# Savepoint will auto-commit if no exception
total_positions += count
synced_bodies.append(body.name)
logger.debug(f"Saved {count} positions for {body.name}")
except Exception as e:
# Savepoint will auto-rollback on exception
logger.error(f"Failed to sync {body.name}: {str(e)}")
failed_bodies.append({"body": body.name, "error": str(e)})
# Continue to next body
# Summary
result = {
"success": len(failed_bodies) == 0,
"bodies_synced": len(synced_bodies),
"total_positions": total_positions,
"synced_bodies": synced_bodies,
"failed_bodies": failed_bodies,
"time_range": f"{start_time.date()} to {end_time.date()}",
"source": source
}
logger.info(f"Sync completed: {len(synced_bodies)} bodies, {total_positions} positions")
return result
@task_registry.register(
name="fetch_close_approach_events",
description="从NASA SBDB获取小行星/彗星近距离飞掠事件,并保存到数据库",
category="data_sync",
parameters=[
{
"name": "body_ids",
"type": "array",
"description": "要查询的天体ID列表例如['399', '499']表示地球和火星。如果不指定,默认只查询地球(399)",
"required": False,
"default": None
},
{
"name": "days_ahead",
"type": "integer",
"description": "向未来查询的天数例如30表示查询未来30天内的事件",
"required": False,
"default": 30
},
{
"name": "dist_max",
"type": "string",
"description": "最大距离AU例如'30'表示30天文单位内的飞掠",
"required": False,
"default": "30"
},
{
"name": "limit",
"type": "integer",
"description": "每个天体最大返回事件数量",
"required": False,
"default": 100
},
{
"name": "clean_old_events",
"type": "boolean",
"description": "是否清理已过期的旧事件",
"required": False,
"default": True
}
]
)
async def fetch_close_approach_events(
db: AsyncSession,
logger: logging.Logger,
params: Dict[str, Any]
) -> Dict[str, Any]:
"""
Fetch close approach events from NASA SBDB and save to database
This task queries the NASA Small-Body Database (SBDB) for upcoming
close approach events (asteroid/comet flybys) and stores them in
the celestial_events table.
Note: Uses tomorrow's date as the query start date to avoid fetching
events that have already occurred today.
Args:
db: Database session
logger: Logger instance
params: Task parameters
- body_ids: List of body IDs to query (default: ['399'] for Earth)
- days_ahead: Number of days to query ahead from tomorrow (default: 30)
- dist_max: Maximum approach distance in AU (default: '30')
- limit: Maximum number of events per body (default: 100)
- clean_old_events: Clean old events before inserting (default: True)
Returns:
Summary of fetch operation
"""
# Parse parameters with type conversion (params come from JSON, may be strings)
body_ids = params.get("body_ids") or ["399"] # Default to Earth
days_ahead = int(params.get("days_ahead", 30))
dist_max = str(params.get("dist_max", "30")) # Keep as string for API
limit = int(params.get("limit", 100))
clean_old_events = bool(params.get("clean_old_events", True))
logger.info(f"Fetching close approach events: body_ids={body_ids}, days={days_ahead}, dist_max={dist_max}AU")
# Calculate date range - use tomorrow as start date to avoid past events
tomorrow = datetime.utcnow() + timedelta(days=1)
date_min = tomorrow.strftime("%Y-%m-%d")
date_max = (tomorrow + timedelta(days=days_ahead)).strftime("%Y-%m-%d")
# Statistics
total_events_fetched = 0
total_events_saved = 0
total_events_failed = 0
body_results = []
# Process each body
for body_id in body_ids:
try:
# Query celestial_bodies table to find the target body
body_result = await db.execute(
select(CelestialBody).where(CelestialBody.id == body_id)
)
target_body = body_result.scalar_one_or_none()
if not target_body:
logger.warning(f"Body '{body_id}' not found in celestial_bodies table, skipping")
body_results.append({
"body_id": body_id,
"success": False,
"error": "Body not found in database"
})
continue
target_body_id = target_body.id
approach_body_name = target_body.name
# Use short_name from database if available (for NASA SBDB API)
# NASA SBDB API uses abbreviated names for planets (e.g., Juptr for Jupiter)
api_body_name = target_body.short_name if target_body.short_name else approach_body_name
logger.info(f"Processing events for: {target_body.name} (ID: {target_body_id}, API name: {api_body_name})")
# Clean old events if requested
if clean_old_events:
try:
cutoff_date = datetime.utcnow()
deleted_count = await event_service.delete_events_for_body_before(
body_id=target_body_id,
before_time=cutoff_date,
db=db
)
logger.info(f"Cleaned {deleted_count} old events for {target_body.name}")
except Exception as e:
logger.warning(f"Failed to clean old events for {target_body.name}: {e}")
# Fetch events from NASA SBDB
sbdb_events = await nasa_sbdb_service.get_close_approaches(
date_min=date_min,
date_max=date_max,
dist_max=dist_max,
body=api_body_name, # Use mapped API name
limit=limit,
fullname=True
)
logger.info(f"Retrieved {len(sbdb_events)} events from NASA SBDB for {target_body.name}")
total_events_fetched += len(sbdb_events)
if not sbdb_events:
body_results.append({
"body_id": target_body_id,
"body_name": target_body.name,
"events_saved": 0,
"message": "No events found"
})
continue
# Parse and save events
saved_count = 0
failed_count = 0
for sbdb_event in sbdb_events:
try:
# Parse SBDB event to CelestialEvent format
parsed_event = nasa_sbdb_service.parse_event_to_celestial_event(
sbdb_event,
approach_body=approach_body_name
)
if not parsed_event:
logger.warning(f"Failed to parse SBDB event: {sbdb_event.get('des', 'Unknown')}")
failed_count += 1
continue
# Create event data
event_data = {
"body_id": target_body_id,
"title": parsed_event["title"],
"event_type": parsed_event["event_type"],
"event_time": parsed_event["event_time"],
"description": parsed_event["description"],
"details": parsed_event["details"],
"source": parsed_event["source"]
}
event = CelestialEvent(**event_data)
db.add(event)
await db.flush()
saved_count += 1
logger.debug(f"Saved event: {event.title}")
except Exception as e:
logger.error(f"Failed to save event {sbdb_event.get('des', 'Unknown')}: {e}")
failed_count += 1
# Commit events for this body
await db.commit()
total_events_saved += saved_count
total_events_failed += failed_count
body_results.append({
"body_id": target_body_id,
"body_name": target_body.name,
"events_fetched": len(sbdb_events),
"events_saved": saved_count,
"events_failed": failed_count
})
logger.info(f"Saved {saved_count}/{len(sbdb_events)} events for {target_body.name}")
except Exception as e:
logger.error(f"Error processing body {body_id}: {e}")
body_results.append({
"body_id": body_id,
"success": False,
"error": str(e)
})
# Summary
result = {
"success": True,
"total_bodies_processed": len(body_ids),
"total_events_fetched": total_events_fetched,
"total_events_saved": total_events_saved,
"total_events_failed": total_events_failed,
"date_range": f"{date_min} to {date_max}",
"dist_max_au": dist_max,
"body_results": body_results
}
logger.info(f"Task completed: {total_events_saved} events saved for {len(body_ids)} bodies")
return result
@task_registry.register(
name="calculate_planetary_events",
description="计算太阳系主要天体的合、冲等事件使用Skyfield进行天文计算",
category="data_sync",
parameters=[
{
"name": "body_ids",
"type": "array",
"description": "要计算事件的天体ID列表例如['199', '299', '499']。如果不指定,则计算所有主要行星(水星到海王星)",
"required": False,
"default": None
},
{
"name": "days_ahead",
"type": "integer",
"description": "向未来计算的天数",
"required": False,
"default": 365
},
{
"name": "calculate_close_approaches",
"type": "boolean",
"description": "是否同时计算行星之间的近距离接近事件",
"required": False,
"default": False
},
{
"name": "threshold_degrees",
"type": "number",
"description": "近距离接近的角度阈值仅当calculate_close_approaches为true时有效",
"required": False,
"default": 5.0
},
{
"name": "clean_old_events",
"type": "boolean",
"description": "是否清理已过期的旧事件",
"required": False,
"default": True
}
]
)
async def calculate_planetary_events(
db: AsyncSession,
logger: logging.Logger,
params: Dict[str, Any]
) -> Dict[str, Any]:
"""
Calculate planetary events (conjunctions, oppositions) using Skyfield
This task uses the Skyfield library to calculate astronomical events
for major solar system bodies, including conjunctions (合) and oppositions (冲).
Args:
db: Database session
logger: Logger instance
params: Task parameters
- body_ids: List of body IDs to calculate (default: all major planets)
- days_ahead: Number of days to calculate ahead (default: 365)
- calculate_close_approaches: Also calculate planet-planet close approaches (default: False)
- threshold_degrees: Angle threshold for close approaches (default: 5.0)
- clean_old_events: Clean old events before calculating (default: True)
Returns:
Summary of calculation operation
"""
# Parse parameters with type conversion (params come from JSON, may be strings)
body_ids = params.get("body_ids")
days_ahead = int(params.get("days_ahead", 365))
calculate_close_approaches = bool(params.get("calculate_close_approaches", False))
threshold_degrees = float(params.get("threshold_degrees", 5.0))
clean_old_events = bool(params.get("clean_old_events", True))
logger.info(f"Starting planetary event calculation: days_ahead={days_ahead}, close_approaches={calculate_close_approaches}")
# Statistics
total_events_calculated = 0
total_events_saved = 0
total_events_failed = 0
try:
# Calculate oppositions and conjunctions
logger.info("Calculating oppositions and conjunctions...")
events = planetary_events_service.calculate_oppositions_conjunctions(
body_ids=body_ids,
days_ahead=days_ahead
)
logger.info(f"Calculated {len(events)} opposition/conjunction events")
total_events_calculated += len(events)
# Optionally calculate close approaches between planet pairs
if calculate_close_approaches:
logger.info("Calculating planetary close approaches...")
# Define interesting planet pairs
planet_pairs = [
('199', '299'), # Mercury - Venus
('299', '499'), # Venus - Mars
('499', '599'), # Mars - Jupiter
('599', '699'), # Jupiter - Saturn
]
close_approach_events = planetary_events_service.calculate_planetary_distances(
body_pairs=planet_pairs,
days_ahead=days_ahead,
threshold_degrees=threshold_degrees
)
logger.info(f"Calculated {len(close_approach_events)} close approach events")
events.extend(close_approach_events)
total_events_calculated += len(close_approach_events)
# Save events to database
logger.info(f"Saving {len(events)} events to database...")
for event_data in events:
try:
# Check if body exists in database
body_result = await db.execute(
select(CelestialBody).where(CelestialBody.id == event_data['body_id'])
)
body = body_result.scalar_one_or_none()
if not body:
logger.warning(f"Body {event_data['body_id']} not found in database, skipping event")
total_events_failed += 1
continue
# Clean old events for this body if requested (only once per body)
if clean_old_events:
cutoff_date = datetime.utcnow()
deleted_count = await event_service.delete_events_for_body_before(
body_id=event_data['body_id'],
before_time=cutoff_date,
db=db
)
if deleted_count > 0:
logger.debug(f"Cleaned {deleted_count} old events for {body.name}")
# Only clean once per body
clean_old_events = False
# Check if event already exists (to avoid duplicates)
# Truncate event_time to minute precision for comparison
event_time_minute = event_data['event_time'].replace(second=0, microsecond=0)
existing_event = await db.execute(
select(CelestialEvent).where(
CelestialEvent.body_id == event_data['body_id'],
CelestialEvent.event_type == event_data['event_type'],
func.date_trunc('minute', CelestialEvent.event_time) == event_time_minute
)
)
existing = existing_event.scalar_one_or_none()
if existing:
logger.debug(f"Event already exists, skipping: {event_data['title']}")
continue
# Create and save event
event = CelestialEvent(
body_id=event_data['body_id'],
title=event_data['title'],
event_type=event_data['event_type'],
event_time=event_data['event_time'],
description=event_data['description'],
details=event_data['details'],
source=event_data['source']
)
db.add(event)
await db.flush()
total_events_saved += 1
logger.debug(f"Saved event: {event.title}")
except Exception as e:
logger.error(f"Failed to save event {event_data.get('title', 'Unknown')}: {e}")
total_events_failed += 1
# Commit all events
await db.commit()
result = {
"success": True,
"total_events_calculated": total_events_calculated,
"total_events_saved": total_events_saved,
"total_events_failed": total_events_failed,
"calculation_period_days": days_ahead,
"close_approaches_enabled": calculate_close_approaches,
}
logger.info(f"Task completed: {total_events_saved} events saved, {total_events_failed} failed")
return result
except Exception as e:
logger.error(f"Error in planetary event calculation: {e}")
await db.rollback()
return {
"success": False,
"error": str(e),
"total_events_calculated": total_events_calculated,
"total_events_saved": total_events_saved,
"total_events_failed": total_events_failed
}