""" NASA JPL Horizons data query service """ from datetime import datetime, timedelta from astroquery.jplhorizons import Horizons from astropy.time import Time import logging from app.models.celestial import Position, CelestialBody logger = logging.getLogger(__name__) class HorizonsService: """Service for querying NASA JPL Horizons system""" def __init__(self): """Initialize the service""" self.location = "@sun" # Heliocentric coordinates def get_body_positions( self, body_id: str, start_time: datetime | None = None, end_time: datetime | None = None, step: str = "1d", ) -> list[Position]: """ Get positions for a celestial body over a time range Args: body_id: JPL Horizons ID (e.g., '-31' for Voyager 1) start_time: Start datetime (default: now) end_time: End datetime (default: now) step: Time step (e.g., '1d' for 1 day, '1h' for 1 hour) Returns: List of Position objects """ try: # Set default times if start_time is None: start_time = datetime.utcnow() if end_time is None: end_time = start_time # Convert to astropy Time objects for single point queries # For ranges, use ISO format strings which Horizons prefers # Create time range if start_time == end_time: # Single time point - use JD format epochs = Time(start_time).jd else: # Time range - use ISO format (YYYY-MM-DD HH:MM) # Horizons expects this format for ranges start_str = start_time.strftime('%Y-%m-%d %H:%M') end_str = end_time.strftime('%Y-%m-%d %H:%M') epochs = {"start": start_str, "stop": end_str, "step": step} logger.info(f"Querying Horizons for body {body_id} from {start_time} to {end_time}") # Query JPL Horizons obj = Horizons(id=body_id, location=self.location, epochs=epochs) vectors = obj.vectors() # Extract positions positions = [] if isinstance(epochs, dict): # Multiple time points for i in range(len(vectors)): pos = Position( time=Time(vectors["datetime_jd"][i], format="jd").datetime, x=float(vectors["x"][i]), y=float(vectors["y"][i]), z=float(vectors["z"][i]), ) positions.append(pos) else: # Single time point pos = Position( time=start_time, x=float(vectors["x"][0]), y=float(vectors["y"][0]), z=float(vectors["z"][0]), ) positions.append(pos) logger.info(f"Successfully retrieved {len(positions)} positions for body {body_id}") return positions except Exception as e: logger.error(f"Error querying Horizons for body {body_id}: {str(e)}") raise # Singleton instance horizons_service = HorizonsService()