99 lines
3.2 KiB
Python
99 lines
3.2 KiB
Python
"""
|
|
NASA JPL Horizons data query service
|
|
"""
|
|
import logging
from datetime import datetime, timedelta, timezone

from astropy.time import Time
from astroquery.jplhorizons import Horizons

from app.models.celestial import Position, CelestialBody
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class HorizonsService:
    """Service for querying the NASA JPL Horizons system.

    Wraps ``astroquery``'s ``Horizons`` class and returns vector positions
    of a body relative to ``self.location``.
    """

    def __init__(self):
        """Initialize the service with the default observer location."""
        self.location = "@sun"  # Heliocentric coordinates

    @staticmethod
    def _format_range_epoch(moment: datetime) -> str:
        """Format *moment* as ``YYYY-MM-DD HH:MM`` for a range query.

        Timezone-aware datetimes are converted to UTC first; ``strftime``
        alone would silently drop the offset and send the local wall-clock
        time. Naive datetimes are formatted unchanged.
        """
        if moment.utcoffset() is not None:
            moment = moment.astimezone(timezone.utc)
        return moment.strftime('%Y-%m-%d %H:%M')

    def get_body_positions(
        self,
        body_id: str,
        start_time: datetime | None = None,
        end_time: datetime | None = None,
        step: str = "1d",
    ) -> list[Position]:
        """
        Get positions for a celestial body over a time range

        Args:
            body_id: JPL Horizons ID (e.g., '-31' for Voyager 1)
            start_time: Start datetime (default: current UTC time, naive)
            end_time: End datetime (default: ``start_time``, which makes the
                call a single-point query)
            step: Time step (e.g., '1d' for 1 day, '1h' for 1 hour); only
                used when ``start_time`` and ``end_time`` differ

        Returns:
            List of Position objects: one per row returned by Horizons for a
            range query, or exactly one (stamped with ``start_time``) for a
            single-point query.

        Raises:
            Exception: any error raised while building or running the query
                is logged and re-raised unchanged.
        """
        # Set default times
        if start_time is None:
            # datetime.utcnow() is deprecated since Python 3.12; this yields
            # the same naive-UTC value.
            start_time = datetime.now(timezone.utc).replace(tzinfo=None)
        if end_time is None:
            end_time = start_time

        try:
            is_range = start_time != end_time
            if is_range:
                # Time range - Horizons is given start/stop strings plus a step
                epochs = {
                    "start": self._format_range_epoch(start_time),
                    "stop": self._format_range_epoch(end_time),
                    "step": step,
                }
            else:
                # Single time point - use JD format
                epochs = Time(start_time).jd

            logger.info(f"Querying Horizons for body {body_id} from {start_time} to {end_time}")

            # Query JPL Horizons
            obj = Horizons(id=body_id, location=self.location, epochs=epochs)
            vectors = obj.vectors()

            # Extract positions
            if is_range:
                # Multiple time points: one Position per returned table row,
                # timestamped from the Julian date Horizons reports.
                positions = [
                    Position(
                        time=Time(row["datetime_jd"], format="jd").datetime,
                        x=float(row["x"]),
                        y=float(row["y"]),
                        z=float(row["z"]),
                    )
                    for row in vectors
                ]
            else:
                # Single time point: keep the caller's own timestamp.
                positions = [
                    Position(
                        time=start_time,
                        x=float(vectors["x"][0]),
                        y=float(vectors["y"][0]),
                        z=float(vectors["z"][0]),
                    )
                ]

            logger.info(f"Successfully retrieved {len(positions)} positions for body {body_id}")
            return positions

        except Exception as e:
            logger.error(f"Error querying Horizons for body {body_id}: {str(e)}")
            raise
|
|
|
|
|
|
# Singleton instance
|
|
# Created once at import time; the constructor only stores the default
# location string and does not issue any query.
horizons_service = HorizonsService()
|