cosmo/backend/app/services/horizons.py

99 lines
3.2 KiB
Python

"""
NASA JPL Horizons data query service
"""
import logging
from datetime import datetime, timedelta, timezone

from astropy.time import Time
from astroquery.jplhorizons import Horizons

from app.models.celestial import Position, CelestialBody
logger = logging.getLogger(__name__)
class HorizonsService:
    """Service for querying the NASA JPL Horizons system for position vectors.

    Attributes:
        location: Horizons location string used as the origin of every
            query issued by this service; "@sun" requests heliocentric
            coordinates.
    """

    def __init__(self):
        """Initialize the service with a heliocentric query origin."""
        self.location = "@sun"  # Heliocentric coordinates

    def get_body_positions(
        self,
        body_id: str,
        start_time: datetime | None = None,
        end_time: datetime | None = None,
        step: str = "1d",
    ) -> list[Position]:
        """Get positions for a celestial body over a time range.

        When ``start_time`` and ``end_time`` compare equal (including when
        both are omitted) a single-epoch query is issued; otherwise a range
        query with the given ``step`` is issued.

        Args:
            body_id: JPL Horizons ID (e.g., '-31' for Voyager 1).
            start_time: Start datetime (default: current UTC time, naive).
            end_time: End datetime (default: same as ``start_time``).
            step: Time step for range queries (e.g., '1d' for 1 day,
                '1h' for 1 hour). Ignored for single-epoch queries.

        Returns:
            List of Position objects. A single-epoch query yields exactly one
            Position stamped with ``start_time``; a range query yields one
            Position per row of the returned vector table, stamped with the
            row's ``datetime_jd`` value converted to a datetime.

        Raises:
            Exception: Any error raised while building the query, contacting
                Horizons, or converting the result is logged (with
                traceback) and re-raised unchanged.

        Notes:
            Range bounds are formatted to minute precision, so seconds are
            dropped; a range shorter than one minute can therefore collapse
            to identical start/stop strings.
            NOTE(review): the Julian dates sent and received are handled with
            astropy's default time scale here — confirm this matches the
            scale Horizons uses for vector queries.
            NOTE(review): x/y/z are presumably in the service's default
            vector units — confirm against the astroquery documentation.
        """
        try:
            if start_time is None:
                # Same naive-UTC value the deprecated datetime.utcnow() gave.
                start_time = datetime.now(timezone.utc).replace(tzinfo=None)
            if end_time is None:
                end_time = start_time

            epochs = self._build_epochs(start_time, end_time, step)

            logger.info(
                "Querying Horizons for body %s from %s to %s",
                body_id,
                start_time,
                end_time,
            )

            # Query JPL Horizons
            query = Horizons(id=body_id, location=self.location, epochs=epochs)
            vectors = query.vectors()

            if isinstance(epochs, dict):
                # Range query: one Position per returned row.
                positions = self._rows_to_positions(vectors)
            else:
                # Single epoch: only the first row is used, and it is stamped
                # with the caller's datetime rather than the returned JD.
                positions = [
                    Position(
                        time=start_time,
                        x=float(vectors["x"][0]),
                        y=float(vectors["y"][0]),
                        z=float(vectors["z"][0]),
                    )
                ]

            logger.info(
                "Successfully retrieved %s positions for body %s",
                len(positions),
                body_id,
            )
            return positions
        except Exception as e:
            # logger.exception logs at ERROR level and attaches the traceback.
            logger.exception(
                "Error querying Horizons for body %s: %s", body_id, e
            )
            raise

    @staticmethod
    def _build_epochs(start_time: datetime, end_time: datetime, step: str):
        """Return the ``epochs`` argument for Horizons: a JD or a range dict."""
        if start_time == end_time:
            # Single time point - use JD format
            return Time(start_time).jd
        # Time range - use ISO-like format (YYYY-MM-DD HH:MM)
        return {
            "start": start_time.strftime('%Y-%m-%d %H:%M'),
            "stop": end_time.strftime('%Y-%m-%d %H:%M'),
            "step": step,
        }

    @staticmethod
    def _rows_to_positions(vectors) -> list[Position]:
        """Convert every row of a Horizons vector table into a Position."""
        return [
            Position(
                time=Time(jd, format="jd").datetime,
                x=float(x),
                y=float(y),
                z=float(z),
            )
            for jd, x, y, z in zip(
                vectors["datetime_jd"], vectors["x"], vectors["y"], vectors["z"]
            )
        ]
# Singleton instance: constructed once when this module is imported.
# Presumably imported by callers instead of building their own
# HorizonsService — confirm against usage elsewhere in the project.
horizons_service = HorizonsService()