# cosmo/backend/app/services/horizons.py (157 lines, 5.2 KiB, Python)

"""
NASA JPL Horizons data query service
"""
import logging
from datetime import datetime, timedelta, timezone

from astropy.time import Time
from astroquery.jplhorizons import Horizons

from app.models.celestial import Position, CelestialBody, CELESTIAL_BODIES
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class HorizonsService:
    """Service for querying the NASA JPL Horizons system.

    Every query is issued relative to ``self.location`` (``"@sun"``), so the
    returned coordinates are heliocentric.
    """

    def __init__(self):
        """Initialize the service with the Sun as the coordinate origin."""
        self.location = "@sun"  # Heliocentric coordinates

    @staticmethod
    def _resolve_time_range(
        start_time: datetime | None,
        end_time: datetime | None,
    ) -> tuple[datetime, datetime]:
        """Fill in missing range bounds: start defaults to now, end to start."""
        if start_time is None:
            # Naive UTC "now": the same value datetime.utcnow() yields, but
            # without calling the API deprecated since Python 3.12.
            start_time = datetime.now(timezone.utc).replace(tzinfo=None)
        if end_time is None:
            end_time = start_time
        return start_time, end_time

    def get_body_positions(
        self,
        body_id: str,
        start_time: datetime | None = None,
        end_time: datetime | None = None,
        step: str = "1d",
    ) -> list[Position]:
        """
        Get positions for a celestial body over a time range

        Args:
            body_id: JPL Horizons ID (e.g., '-31' for Voyager 1)
            start_time: Start datetime (default: now, as a naive UTC datetime)
            end_time: End datetime (default: same as start_time)
            step: Time step (e.g., '1d' for 1 day, '1h' for 1 hour); only
                sent to Horizons when start and end differ

        Returns:
            List of Position objects: exactly one when start and end resolve
            to the same Julian date, otherwise one per row returned by
            Horizons.

        Raises:
            Exception: anything raised while building the query, calling
                Horizons, or converting the result is logged and re-raised
                unchanged.
        """
        try:
            start_time, end_time = self._resolve_time_range(start_time, end_time)

            # Compare as Julian dates (the form Horizons receives for a
            # single epoch) to decide between a point query and a range.
            start_jd = Time(start_time).jd
            end_jd = Time(end_time).jd
            single_epoch = start_jd == end_jd

            if single_epoch:
                epochs = start_jd
            else:
                epochs = {"start": start_time.isoformat(), "stop": end_time.isoformat(), "step": step}

            logger.info(f"Querying Horizons for body {body_id} from {start_time} to {end_time}")

            # Query JPL Horizons
            vectors = Horizons(id=body_id, location=self.location, epochs=epochs).vectors()

            if single_epoch:
                # Single time point: report the caller's datetime rather than
                # round-tripping it through a Julian date.
                positions = [
                    Position(
                        time=start_time,
                        x=float(vectors["x"][0]),
                        y=float(vectors["y"][0]),
                        z=float(vectors["z"][0]),
                    )
                ]
            else:
                # Multiple time points: one Position per result row, with the
                # timestamp taken from the row's own Julian date.
                positions = [
                    Position(
                        time=Time(row["datetime_jd"], format="jd").datetime,
                        x=float(row["x"]),
                        y=float(row["y"]),
                        z=float(row["z"]),
                    )
                    for row in vectors
                ]

            logger.info(f"Successfully retrieved {len(positions)} positions for body {body_id}")
            return positions
        except Exception as e:
            logger.error(f"Error querying Horizons for body {body_id}: {str(e)}")
            raise

    def get_all_bodies(
        self,
        start_time: datetime | None = None,
        end_time: datetime | None = None,
        step: str = "1d",
    ) -> list[CelestialBody]:
        """
        Get positions for all predefined celestial bodies

        Args:
            start_time: Start datetime (default: now, as a naive UTC datetime)
            end_time: End datetime (default: same as start_time)
            step: Time step

        Returns:
            List of CelestialBody objects, one per entry of CELESTIAL_BODIES
            that could be built. A body whose lookup raises is logged and
            left out of the list; the remaining bodies are still returned.
        """
        # Resolve the defaults once, before the loop, so every body is sampled
        # at the same epoch(s) no matter where it sits in CELESTIAL_BODIES.
        start_time, end_time = self._resolve_time_range(start_time, end_time)

        bodies = []
        for body_id, info in CELESTIAL_BODIES.items():
            try:
                if body_id == "10":
                    # Special handling for the Sun: it is the origin of the
                    # heliocentric frame, so no query is needed.
                    # NOTE: only the range endpoints are emitted; `step` is
                    # not applied to the Sun.
                    positions = [Position(time=start_time, x=0.0, y=0.0, z=0.0)]
                    if start_time != end_time:
                        positions.append(Position(time=end_time, x=0.0, y=0.0, z=0.0))
                elif body_id == "-82":
                    # Special handling for Cassini (mission ended 2017-09-15):
                    # always use its last known position, ignoring the
                    # requested range.
                    cassini_date = datetime(2017, 9, 15, 11, 58, 0)
                    positions = self.get_body_positions(body_id, cassini_date, cassini_date, step)
                else:
                    # Query other bodies
                    positions = self.get_body_positions(body_id, start_time, end_time, step)

                bodies.append(
                    CelestialBody(
                        id=body_id,
                        name=info["name"],
                        type=info["type"],
                        positions=positions,
                        description=info["description"],
                    )
                )
            except Exception as e:
                logger.error(f"Failed to get data for {info['name']}: {str(e)}")
                # Continue with other bodies even if one fails

        return bodies
# Singleton instance: a module-level HorizonsService created when this module
# is imported.
horizons_service = HorizonsService()