""" NASA JPL Horizons data query service """ from datetime import datetime, timedelta from astropy.time import Time import logging import re import httpx import os import json from sqlalchemy.ext.asyncio import AsyncSession from app.models.celestial import Position, CelestialBody from app.config import settings from app.services.redis_cache import redis_cache logger = logging.getLogger(__name__) class HorizonsService: """Service for querying NASA JPL Horizons system""" def __init__(self): """Initialize the service""" self.location = "@sun" # Heliocentric coordinates # Proxy is handled via settings.proxy_dict in each request async def get_object_data_raw(self, body_id: str) -> str: """ Get raw object data (terminal style text) from Horizons Args: body_id: JPL Horizons ID Returns: Raw text response from NASA """ url = "https://ssd.jpl.nasa.gov/api/horizons.api" # Ensure ID is quoted for COMMAND cmd_val = f"'{body_id}'" if not body_id.startswith("'") else body_id params = { "format": "text", "COMMAND": cmd_val, "OBJ_DATA": "YES", "MAKE_EPHEM": "NO", "EPHEM_TYPE": "VECTORS", "CENTER": "@sun" } try: # Configure proxy if available client_kwargs = {"timeout": settings.nasa_api_timeout} if settings.proxy_dict: client_kwargs["proxies"] = settings.proxy_dict logger.info(f"Using proxy for NASA API: {settings.proxy_dict}") async with httpx.AsyncClient(**client_kwargs) as client: logger.info(f"Fetching raw data for body {body_id} with timeout {settings.nasa_api_timeout}s") response = await client.get(url, params=params) if response.status_code != 200: raise Exception(f"NASA API returned status {response.status_code}") return response.text except Exception as e: logger.error(f"Error fetching raw data for {body_id}: {repr(e)}") raise async def search_body_by_name(self, name: str, db: AsyncSession = None) -> dict: """ Search for a celestial body in Horizons by name. Args: name: Name to search (e.g. 'Ceres', 'Halley') db: Database session (optional, for future caching) Returns: Dict with success/error and data """ url = "https://ssd.jpl.nasa.gov/api/horizons.api" # Using a wildcard search command cmd_val = f"'{name}*'" params = { "format": "text", "COMMAND": cmd_val, "OBJ_DATA": "YES", "MAKE_EPHEM": "NO" } try: client_kwargs = {"timeout": settings.nasa_api_timeout} if settings.proxy_dict: client_kwargs["proxies"] = settings.proxy_dict async with httpx.AsyncClient(**client_kwargs) as client: logger.info(f"Searching Horizons for: {name}") response = await client.get(url, params=params) if response.status_code != 200: return {"success": False, "error": f"NASA API Error: {response.status_code}"} text = response.text # Case 1: Direct match (Horizon returns data directly) # Look for "Target body name:" or similar indicators of a resolved body if "Target body name:" in text or "Physical properties" in text: # Extract ID and Name # Pattern: "Target body name: 1 Ceres (A801 AA)" or similar match = re.search(r"Target body name:\s*(.*?)\s*\{", text) if not match: match = re.search(r"Target body name:\s*(.*?)\n", text) full_name = match.group(1).strip() if match else name # Try to extract ID from the text, usually in the header or COMMAND output # This is tricky with raw text, but let's try to extract from the command echo # Or we can just use the input name if we can't find a better ID # Ideally we want the numeric ID (e.g. '1' for Ceres, '399' for Earth) # If it's a direct match, the ID might not be explicitly listed as "ID = ..." # But often the name contains it, e.g. "1 Ceres" body_id = name # Fallback # Try to parse "1 Ceres" -> id=1 id_match = re.search(r"^(\d+)\s+([a-zA-Z]+)", full_name) if id_match: body_id = id_match.group(1) clean_name = id_match.group(2) else: clean_name = full_name return { "success": True, "id": body_id, "name": clean_name, "full_name": full_name } # Case 2: Multiple matches (Ambiguous) # Horizons returns a list of matches if "Multiple major-bodies match" in text or "Matching small-bodies" in text: # We need to parse the list and pick the best match or return the first one # For now, let's try to find the most likely match (exact name) # Pattern for small bodies: "record # epoch-yr primary desig >name<" # or " ID Name Designation" # Simple heuristic: Look for lines containing the name lines = text.split('\n') best_match = None for line in lines: if name.lower() in line.lower(): # Try to extract ID (first column usually) parts = line.strip().split() if parts and parts[0].isdigit() or (parts[0].startswith('-') and parts[0][1:].isdigit()): best_match = { "id": parts[0], "name": name, "full_name": line.strip() } break if best_match: return { "success": True, "id": best_match["id"], "name": best_match["name"], "full_name": best_match["full_name"] } return {"success": False, "error": "Multiple matches found, please be more specific"} # Case 3: No match if "No matches found" in text: return {"success": False, "error": "No celestial body found with that name"} # Fallback for unknown response format return {"success": False, "error": "Could not parse NASA response"} except Exception as e: logger.error(f"Search error for {name}: {repr(e)}") return {"success": False, "error": str(e)} async def get_body_positions( self, body_id: str, start_time: datetime | None = None, end_time: datetime | None = None, step: str = "1d", ) -> list[Position]: """ Get positions for a celestial body over a time range Args: body_id: JPL Horizons ID (e.g., '-31' for Voyager 1) start_time: Start datetime (default: now) end_time: End datetime (default: now) step: Time step (e.g., '1d' for 1 day, '1h' for 1 hour) Returns: List of Position objects """ # Set default times and format for cache key if start_time is None: start_time = datetime.utcnow() if end_time is None: end_time = start_time start_str_cache = start_time.strftime('%Y-%m-%d') end_str_cache = end_time.strftime('%Y-%m-%d') # 1. Try to fetch from Redis cache cache_key = f"nasa:horizons:positions:{body_id}:{start_str_cache}:{end_str_cache}:{step}" cached_data = await redis_cache.get(cache_key) if cached_data: logger.info(f"Cache HIT for {body_id} positions ({start_str_cache}-{end_str_cache})") # Deserialize cached JSON data back to Position objects positions_data = json.loads(cached_data) positions = [] for item in positions_data: # Ensure 'time' is converted back to datetime object item['time'] = datetime.fromisoformat(item['time']) positions.append(Position(**item)) return positions logger.info(f"Cache MISS for {body_id} positions ({start_str_cache}-{end_str_cache}). Fetching from NASA.") try: # Format time for Horizons API if start_time.date() == end_time.date(): start_str = start_time.strftime('%Y-%m-%d') end_time_adjusted = start_time + timedelta(days=1) end_str = end_time_adjusted.strftime('%Y-%m-%d') else: start_str = start_time.strftime('%Y-%m-%d') end_str = end_time.strftime('%Y-%m-%d') logger.info(f"Querying Horizons (httpx) for body {body_id} from {start_str} to {end_str}") url = "https://ssd.jpl.nasa.gov/api/horizons.api" cmd_val = f"'{body_id}'" if not body_id.startswith("'") else body_id params = { "format": "text", "COMMAND": cmd_val, "OBJ_DATA": "NO", "MAKE_EPHEM": "YES", "EPHEM_TYPE": "VECTORS", "CENTER": self.location, "START_TIME": start_str, "STOP_TIME": end_str, "STEP_SIZE": step, "CSV_FORMAT": "YES", "OUT_UNITS": "AU-D" } # Configure proxy if available client_kwargs = {"timeout": settings.nasa_api_timeout} if settings.proxy_dict: client_kwargs["proxies"] = settings.proxy_dict logger.info(f"Using proxy for NASA API: {settings.proxy_dict}") async with httpx.AsyncClient(**client_kwargs) as client: response = await client.get(url, params=params) if response.status_code != 200: raise Exception(f"NASA API returned status {response.status_code}") positions = self._parse_vectors(response.text) # 2. Cache the result before returning if positions: # Serialize Position objects to list of dicts for JSON storage # Convert datetime to ISO format string for JSON serialization positions_data_to_cache = [] for p in positions: pos_dict = p.dict() # Convert datetime to ISO string if isinstance(pos_dict.get('time'), datetime): pos_dict['time'] = pos_dict['time'].isoformat() positions_data_to_cache.append(pos_dict) # Use a TTL of 7 days (604800 seconds) for now, can be made configurable await redis_cache.set(cache_key, json.dumps(positions_data_to_cache), ttl_seconds=604800) logger.info(f"Cache SET for {body_id} positions ({start_str_cache}-{end_str_cache}) with TTL 7 days.") return positions except Exception as e: logger.error(f"Error querying Horizons for body {body_id}: {repr(e)}") raise def _parse_vectors(self, text: str) -> list[Position]: """ Parse Horizons CSV output for vector data Format looks like: $$SOE 2460676.500000000, A.D. 2025-Jan-01 00:00:00.0000, 9.776737278236609E-01, -1.726677228793678E-01, -1.636678733289160E-05, ... $$EOE """ positions = [] # Extract data block between $$SOE and $$EOE match = re.search(r'\$\$SOE(.*?)\$\$EOE', text, re.DOTALL) if not match: logger.warning("No data block ($$SOE...$$EOE) found in Horizons response") logger.debug(f"Response snippet: {text[:500]}...") return [] data_block = match.group(1).strip() lines = data_block.split('\n') for line in lines: parts = [p.strip() for p in line.split(',')] if len(parts) < 5: continue try: # Index 0: JD, 1: Date, 2: X, 3: Y, 4: Z, 5: VX, 6: VY, 7: VZ jd_str = parts[0] time_obj = Time(float(jd_str), format="jd").datetime x = float(parts[2]) y = float(parts[3]) z = float(parts[4]) # Velocity if available (indices 5, 6, 7) vx = float(parts[5]) if len(parts) > 5 else None vy = float(parts[6]) if len(parts) > 6 else None vz = float(parts[7]) if len(parts) > 7 else None pos = Position( time=time_obj, x=x, y=y, z=z, vx=vx, vy=vy, vz=vz ) positions.append(pos) except (ValueError, IndexError) as e: logger.warning(f"Failed to parse line: {line}. Error: {e}") continue return positions # Global singleton instance horizons_service = HorizonsService()