""" NASA JPL Horizons data query service """ from datetime import datetime, timedelta from astropy.time import Time import logging import re import httpx import os from app.models.celestial import Position, CelestialBody from app.config import settings logger = logging.getLogger(__name__) class HorizonsService: """Service for querying NASA JPL Horizons system""" def __init__(self): """Initialize the service""" self.location = "@sun" # Heliocentric coordinates # Proxy is handled via settings.proxy_dict in each request async def get_object_data_raw(self, body_id: str) -> str: """ Get raw object data (terminal style text) from Horizons Args: body_id: JPL Horizons ID Returns: Raw text response from NASA """ url = "https://ssd.jpl.nasa.gov/api/horizons.api" # Ensure ID is quoted for COMMAND cmd_val = f"'{body_id}'" if not body_id.startswith("'") else body_id params = { "format": "text", "COMMAND": cmd_val, "OBJ_DATA": "YES", "MAKE_EPHEM": "NO", "EPHEM_TYPE": "VECTORS", "CENTER": "@sun" } try: # Configure proxy if available client_kwargs = {"timeout": settings.nasa_api_timeout} if settings.proxy_dict: client_kwargs["proxies"] = settings.proxy_dict logger.info(f"Using proxy for NASA API: {settings.proxy_dict}") async with httpx.AsyncClient(**client_kwargs) as client: logger.info(f"Fetching raw data for body {body_id} with timeout {settings.nasa_api_timeout}s") response = await client.get(url, params=params) if response.status_code != 200: raise Exception(f"NASA API returned status {response.status_code}") return response.text except Exception as e: logger.error(f"Error fetching raw data for {body_id}: {str(e)}") raise async def get_body_positions( self, body_id: str, start_time: datetime | None = None, end_time: datetime | None = None, step: str = "1d", ) -> list[Position]: """ Get positions for a celestial body over a time range Args: body_id: JPL Horizons ID (e.g., '-31' for Voyager 1) start_time: Start datetime (default: now) end_time: End datetime (default: now) step: Time step (e.g., '1d' for 1 day, '1h' for 1 hour) Returns: List of Position objects """ try: # Set default times if start_time is None: start_time = datetime.utcnow() if end_time is None: end_time = start_time # Format time for Horizons (YYYY-MM-DD HH:MM) # Horizons accepts ISO-like format without 'T' start_str = start_time.strftime('%Y-%m-%d %H:%M') end_str = end_time.strftime('%Y-%m-%d %H:%M') # Special case for single point query (start = end) # Horizons requires START != STOP for ranges, but we can handle single point # by making a very small range or just asking for 1 step. # Actually Horizons API is fine with start=end if we don't ask for range? # Let's keep using range parameters as standard. if start_time == end_time: # Just add 1 minute for range, but we only parse the first result end_dummy = end_time + timedelta(minutes=1) end_str = end_dummy.strftime('%Y-%m-%d %H:%M') # Override step to ensure we get the start point # But wait, '1d' step might skip. # If start==end, we want exactly one point. # We can't use '1' count in API easily via URL params without STEP_SIZE? # Let's just use the provided step. logger.info(f"Querying Horizons (httpx) for body {body_id} from {start_str} to {end_str}") url = "https://ssd.jpl.nasa.gov/api/horizons.api" cmd_val = f"'{body_id}'" if not body_id.startswith("'") else body_id params = { "format": "text", "COMMAND": cmd_val, "OBJ_DATA": "NO", "MAKE_EPHEM": "YES", "EPHEM_TYPE": "VECTORS", "CENTER": self.location, "START_TIME": start_str, "STOP_TIME": end_str, "STEP_SIZE": step, "CSV_FORMAT": "YES" } # Configure proxy if available client_kwargs = {"timeout": settings.nasa_api_timeout} if settings.proxy_dict: client_kwargs["proxies"] = settings.proxy_dict async with httpx.AsyncClient(**client_kwargs) as client: response = await client.get(url, params=params) if response.status_code != 200: raise Exception(f"NASA API returned status {response.status_code}") return self._parse_vectors(response.text) except Exception as e: logger.error(f"Error querying Horizons for body {body_id}: {str(e)}") raise def _parse_vectors(self, text: str) -> list[Position]: """ Parse Horizons CSV output for vector data Format looks like: $$SOE 2460676.500000000, A.D. 2025-Jan-01 00:00:00.0000, 9.776737278236609E-01, -1.726677228793678E-01, -1.636678733289160E-05, ... $$EOE """ positions = [] # Extract data block between $$SOE and $$EOE match = re.search(r'\$\$SOE(.*?)\$\$EOE', text, re.DOTALL) if not match: logger.warning("No data block ($$SOE...$$EOE) found in Horizons response") # Log a snippet of text for debugging logger.debug(f"Response snippet: {text[:200]}...") return [] data_block = match.group(1).strip() lines = data_block.split('\n') for line in lines: parts = [p.strip() for p in line.split(',')] if len(parts) < 5: continue try: # Index 0: JD, 1: Date, 2: X, 3: Y, 4: Z, 5: VX, 6: VY, 7: VZ # Time parsing: 2460676.500000000 is JD. # A.D. 2025-Jan-01 00:00:00.0000 is Calendar. # We can use JD or parse the string. Using JD via astropy is accurate. jd_str = parts[0] time_obj = Time(float(jd_str), format="jd").datetime x = float(parts[2]) y = float(parts[3]) z = float(parts[4]) # Velocity if available (indices 5, 6, 7) vx = float(parts[5]) if len(parts) > 5 else None vy = float(parts[6]) if len(parts) > 6 else None vz = float(parts[7]) if len(parts) > 7 else None pos = Position( time=time_obj, x=x, y=y, z=z, vx=vx, vy=vy, vz=vz ) positions.append(pos) except ValueError as e: logger.warning(f"Failed to parse line: {line}. Error: {e}") continue return positions """ NASA JPL Horizons data query service """ from datetime import datetime, timedelta # from astroquery.jplhorizons import Horizons # Removed astroquery dependency from astropy.time import Time # Kept astropy for Time object import logging import re import httpx import os from app.models.celestial import Position, CelestialBody from app.config import settings logger = logging.getLogger(__name__) class HorizonsService: """Service for querying NASA JPL Horizons system""" def __init__(self): """Initialize the service""" self.location = "@sun" # Heliocentric coordinates # Proxy is handled via settings.proxy_dict in each request async def get_object_data_raw(self, body_id: str) -> str: """ Get raw object data (terminal style text) from Horizons Args: body_id: JPL Horizons ID Returns: Raw text response from NASA """ url = "https://ssd.jpl.nasa.gov/api/horizons.api" # Ensure ID is quoted for COMMAND cmd_val = f"'{body_id}'" if not body_id.startswith("'") else body_id params = { "format": "text", "COMMAND": cmd_val, "OBJ_DATA": "YES", "MAKE_EPHEM": "NO", "EPHEM_TYPE": "VECTORS", "CENTER": "@sun" } try: # Configure proxy if available client_kwargs = {"timeout": settings.nasa_api_timeout} if settings.proxy_dict: client_kwargs["proxies"] = settings.proxy_dict logger.info(f"Using proxy for NASA API: {settings.proxy_dict}") async with httpx.AsyncClient(**client_kwargs) as client: logger.info(f"Fetching raw data for body {body_id} with timeout {settings.nasa_api_timeout}s") response = await client.get(url, params=params) if response.status_code != 200: raise Exception(f"NASA API returned status {response.status_code}") return response.text except Exception as e: logger.error(f"Error fetching raw data for {body_id}: {str(e)}") raise async def get_body_positions( self, body_id: str, start_time: datetime | None = None, end_time: datetime | None = None, step: str = "1d", ) -> list[Position]: """ Get positions for a celestial body over a time range Args: body_id: JPL Horizons ID (e.g., '-31' for Voyager 1) start_time: Start datetime (default: now) end_time: End datetime (default: now) step: Time step (e.g., '1d' for 1 day, '1h' for 1 hour) Returns: List of Position objects """ try: # Set default times if start_time is None: start_time = datetime.utcnow() if end_time is None: end_time = start_time # Format time for Horizons (YYYY-MM-DD HH:MM) # Horizons accepts ISO-like format without 'T' start_str = start_time.strftime('%Y-%m-%d %H:%M') end_str = end_time.strftime('%Y-%m-%d %H:%M') # Special case for single point query (start = end) # Horizons requires START != STOP for ranges, but we can handle single point # by making a very small range or just asking for 1 step. # Actually Horizons API is fine with start=end if we don't ask for range? # Let's keep using range parameters as standard. if start_time == end_time: # Just add 1 minute for range, but we only parse the first result end_dummy = end_time + timedelta(minutes=1) end_str = end_dummy.strftime('%Y-%m-%d %H:%M') # Override step to ensure we get the start point # But wait, '1d' step might skip. # If start==end, we want exactly one point. # We can't use '1' count in API easily via URL params without STEP_SIZE? # Let's just use the provided step. logger.info(f"Querying Horizons (httpx) for body {body_id} from {start_str} to {end_str}") url = "https://ssd.jpl.nasa.gov/api/horizons.api" cmd_val = f"'{body_id}'" if not body_id.startswith("'") else body_id params = { "format": "text", "COMMAND": cmd_val, "OBJ_DATA": "NO", "MAKE_EPHEM": "YES", "EPHEM_TYPE": "VECTORS", "CENTER": self.location, "START_TIME": start_str, "STOP_TIME": end_str, "STEP_SIZE": step, "CSV_FORMAT": "YES" } # Configure proxy if available client_kwargs = {"timeout": settings.nasa_api_timeout} if settings.proxy_dict: client_kwargs["proxies"] = settings.proxy_dict async with httpx.AsyncClient(**client_kwargs) as client: response = await client.get(url, params=params) if response.status_code != 200: raise Exception(f"NASA API returned status {response.status_code}") return self._parse_vectors(response.text) except Exception as e: logger.error(f"Error querying Horizons for body {body_id}: {str(e)}") raise def _parse_vectors(self, text: str) -> list[Position]: """ Parse Horizons CSV output for vector data Format looks like: $$SOE 2460676.500000000, A.D. 2025-Jan-01 00:00:00.0000, 9.776737278236609E-01, -1.726677228793678E-01, -1.636678733289160E-05, ... $$EOE """ positions = [] # Extract data block between $$SOE and $$EOE match = re.search(r'\$\$SOE(.*?)\$\$EOE', text, re.DOTALL) if not match: logger.warning("No data block ($$SOE...$$EOE) found in Horizons response") # Log a snippet of text for debugging logger.debug(f"Response snippet: {text[:200]}...") return [] data_block = match.group(1).strip() lines = data_block.split('\n') for line in lines: parts = [p.strip() for p in line.split(',')] if len(parts) < 5: continue try: # Index 0: JD, 1: Date, 2: X, 3: Y, 4: Z, 5: VX, 6: VY, 7: VZ # Time parsing: 2460676.500000000 is JD. # A.D. 2025-Jan-01 00:00:00.0000 is Calendar. # We can use JD or parse the string. Using JD via astropy is accurate. jd_str = parts[0] time_obj = Time(float(jd_str), format="jd").datetime x = float(parts[2]) y = float(parts[3]) z = float(parts[4]) # Velocity if available (indices 5, 6, 7) vx = float(parts[5]) if len(parts) > 5 else None vy = float(parts[6]) if len(parts) > 6 else None vz = float(parts[7]) if len(parts) > 7 else None pos = Position( time=time_obj, x=x, y=y, z=z, vx=vx, vy=vy, vz=vz ) positions.append(pos) except ValueError as e: logger.warning(f"Failed to parse line: {line}. Error: {e}") continue return positions async def search_body_by_name(self, name: str) -> dict: """ Search for a celestial body by name in NASA Horizons database using httpx. This method replaces the astroquery-based search to unify proxy and timeout control. """ try: logger.info(f"Searching Horizons (httpx) for: {name}") url = "https://ssd.jpl.nasa.gov/api/horizons.api" cmd_val = f"'{name}'" # Name can be ID or actual name params = { "format": "text", "COMMAND": cmd_val, "OBJ_DATA": "YES", # Request object data to get canonical name/ID "MAKE_EPHEM": "NO", # Don't need ephemeris "EPHEM_TYPE": "OBSERVER", # Arbitrary, won't be used since MAKE_EPHEM=NO "CENTER": "@ssb" # Search from Solar System Barycenter for consistent object IDs } client_kwargs = {"timeout": settings.nasa_api_timeout} if settings.proxy_dict: client_kwargs["proxies"] = settings.proxy_dict async with httpx.AsyncClient(**client_kwargs) as client: response = await client.get(url, params=params) if response.status_code != 200: raise Exception(f"NASA API returned status {response.status_code}") response_text = response.text # Check for "Ambiguous target name" if "Ambiguous target name" in response_text: logger.warning(f"Ambiguous target name for: {name}") return { "success": False, "id": None, "name": None, "full_name": None, "error": "名称不唯一,请提供更具体的名称或 JPL Horizons ID" } # Check for "No matches found" or "Unknown target" if "No matches found" in response_text or "Unknown target" in response_text: logger.warning(f"No matches found for: {name}") return { "success": False, "id": None, "name": None, "full_name": None, "error": "未找到匹配的天体,请检查名称或 ID" } # Parse canonical name and ID from response (e.g., "Target body name: Jupiter Barycenter (599)") target_name_match = re.search(r"Target body name: (.+?)\s+\((\-?\d+)\)", response_text) if target_name_match: full_name = target_name_match.group(1).strip() numeric_id = target_name_match.group(2).strip() short_name = full_name.split('(')[0].strip() # Remove any part after '(' logger.info(f"Found target: {full_name} with ID: {numeric_id}") return { "success": True, "id": numeric_id, "name": short_name, "full_name": full_name, "error": None } else: # Fallback if specific pattern not found, might be a valid but weird response logger.warning(f"Could not parse target name/ID from response for: {name}. Response snippet: {response_text[:200]}") return { "success": False, "id": None, "name": None, "full_name": None, "error": f"未能解析 JPL Horizons 响应,请尝试精确 ID: {name}" } except Exception as e: error_msg = str(e) logger.error(f"Error searching for {name}: {error_msg}") return { "success": False, "id": None, "name": None, "full_name": None, "error": f"查询失败: {error_msg}" } # Singleton instance horizons_service = HorizonsService()