""" NASA JPL Horizons data query service """ from datetime import datetime, timedelta from astropy.time import Time import logging import re import httpx import os from sqlalchemy.ext.asyncio import AsyncSession # Added this import from app.models.celestial import Position, CelestialBody from app.config import settings logger = logging.getLogger(__name__) class HorizonsService: """Service for querying NASA JPL Horizons system""" def __init__(self): """Initialize the service""" self.location = "@sun" # Heliocentric coordinates # Proxy is handled via settings.proxy_dict in each request async def get_object_data_raw(self, body_id: str) -> str: """ Get raw object data (terminal style text) from Horizons Args: body_id: JPL Horizons ID Returns: Raw text response from NASA """ url = "https://ssd.jpl.nasa.gov/api/horizons.api" # Ensure ID is quoted for COMMAND cmd_val = f"'{body_id}'" if not body_id.startswith("'") else body_id params = { "format": "text", "COMMAND": cmd_val, "OBJ_DATA": "YES", "MAKE_EPHEM": "NO", "EPHEM_TYPE": "VECTORS", "CENTER": "@sun" } try: # Configure proxy if available client_kwargs = {"timeout": settings.nasa_api_timeout} if settings.proxy_dict: client_kwargs["proxies"] = settings.proxy_dict logger.info(f"Using proxy for NASA API: {settings.proxy_dict}") async with httpx.AsyncClient(**client_kwargs) as client: logger.info(f"Fetching raw data for body {body_id} with timeout {settings.nasa_api_timeout}s") response = await client.get(url, params=params) if response.status_code != 200: raise Exception(f"NASA API returned status {response.status_code}") return response.text except Exception as e: logger.error(f"Error fetching raw data for {body_id}: {str(e)}") raise async def get_body_positions( self, body_id: str, start_time: datetime | None = None, end_time: datetime | None = None, step: str = "1d", ) -> list[Position]: """ Get positions for a celestial body over a time range Args: body_id: JPL Horizons ID (e.g., '-31' for Voyager 1) start_time: Start datetime (default: now) end_time: End datetime (default: now) step: Time step (e.g., '1d' for 1 day, '1h' for 1 hour) Returns: List of Position objects """ try: # Set default times if start_time is None: start_time = datetime.utcnow() if end_time is None: end_time = start_time # Format time for Horizons (YYYY-MM-DD HH:MM) # Horizons accepts ISO-like format without 'T' start_str = start_time.strftime('%Y-%m-%d %H:%M') end_str = end_time.strftime('%Y-%m-%d %H:%M') # Special case for single point query (start = end) # Horizons requires START != STOP for ranges, but we can handle single point # by making a very small range or just asking for 1 step. # Actually Horizons API is fine with start=end if we don't ask for range? # Let's keep using range parameters as standard. if start_time == end_time: # Just add 1 minute for range, but we only parse the first result end_dummy = end_time + timedelta(minutes=1) end_str = end_dummy.strftime('%Y-%m-%d %H:%M') # Override step to ensure we get the start point # But wait, '1d' step might skip. # If start==end, we want exactly one point. # We can't use '1' count in API easily via URL params without STEP_SIZE? # Let's just use the provided step. logger.info(f"Querying Horizons (httpx) for body {body_id} from {start_str} to {end_str}") url = "https://ssd.jpl.nasa.gov/api/horizons.api" cmd_val = f"'{body_id}'" if not body_id.startswith("'") else body_id params = { "format": "text", "COMMAND": cmd_val, "OBJ_DATA": "NO", "MAKE_EPHEM": "YES", "EPHEM_TYPE": "VECTORS", "CENTER": self.location, "START_TIME": start_str, "STOP_TIME": end_str, "STEP_SIZE": step, "CSV_FORMAT": "YES" } # Configure proxy if available client_kwargs = {"timeout": settings.nasa_api_timeout} if settings.proxy_dict: client_kwargs["proxies"] = settings.proxy_dict async with httpx.AsyncClient(**client_kwargs) as client: response = await client.get(url, params=params) if response.status_code != 200: raise Exception(f"NASA API returned status {response.status_code}") return self._parse_vectors(response.text) except Exception as e: logger.error(f"Error querying Horizons for body {body_id}: {str(e)}") raise def _parse_vectors(self, text: str) -> list[Position]: """ Parse Horizons CSV output for vector data Format looks like: $$SOE 2460676.500000000, A.D. 2025-Jan-01 00:00:00.0000, 9.776737278236609E-01, -1.726677228793678E-01, -1.636678733289160E-05, ... $$EOE """ positions = [] # Extract data block between $$SOE and $$EOE match = re.search(r'\$\$SOE(.*?)\$\$EOE', text, re.DOTALL) if not match: logger.warning("No data block ($$SOE...$$EOE) found in Horizons response") # Log a snippet of text for debugging logger.debug(f"Response snippet: {text[:200]}...") return [] data_block = match.group(1).strip() lines = data_block.split('\n') for line in lines: parts = [p.strip() for p in line.split(',')] if len(parts) < 5: continue try: # Index 0: JD, 1: Date, 2: X, 3: Y, 4: Z, 5: VX, 6: VY, 7: VZ # Time parsing: 2460676.500000000 is JD. # A.D. 2025-Jan-01 00:00:00.0000 is Calendar. # We can use JD or parse the string. Using JD via astropy is accurate. jd_str = parts[0] time_obj = Time(float(jd_str), format="jd").datetime x = float(parts[2]) y = float(parts[3]) z = float(parts[4]) # Velocity if available (indices 5, 6, 7) vx = float(parts[5]) if len(parts) > 5 else None vy = float(parts[6]) if len(parts) > 6 else None vz = float(parts[7]) if len(parts) > 7 else None pos = Position( time=time_obj, x=x, y=y, z=z, vx=vx, vy=vy, vz=vz ) positions.append(pos) except ValueError as e: logger.warning(f"Failed to parse line: {line}. Error: {e}") continue return positions """ NASA JPL Horizons data query service """ from datetime import datetime, timedelta # from astroquery.jplhorizons import Horizons # Removed astroquery dependency from astropy.time import Time # Kept astropy for Time object import logging import re import httpx import os from app.models.celestial import Position, CelestialBody from app.config import settings logger = logging.getLogger(__name__) class HorizonsService: """Service for querying NASA JPL Horizons system""" def __init__(self): """Initialize the service""" self.location = "@sun" # Heliocentric coordinates # Proxy is handled via settings.proxy_dict in each request async def get_object_data_raw(self, body_id: str) -> str: """ Get raw object data (terminal style text) from Horizons Args: body_id: JPL Horizons ID Returns: Raw text response from NASA """ url = "https://ssd.jpl.nasa.gov/api/horizons.api" # Ensure ID is quoted for COMMAND cmd_val = f"'{body_id}'" if not body_id.startswith("'") else body_id params = { "format": "text", "COMMAND": cmd_val, "OBJ_DATA": "YES", "MAKE_EPHEM": "NO", "EPHEM_TYPE": "VECTORS", "CENTER": "@sun" } try: # Configure proxy if available client_kwargs = {"timeout": settings.nasa_api_timeout} if settings.proxy_dict: client_kwargs["proxies"] = settings.proxy_dict logger.info(f"Using proxy for NASA API: {settings.proxy_dict}") async with httpx.AsyncClient(**client_kwargs) as client: logger.info(f"Fetching raw data for body {body_id} with timeout {settings.nasa_api_timeout}s") response = await client.get(url, params=params) if response.status_code != 200: raise Exception(f"NASA API returned status {response.status_code}") return response.text except Exception as e: logger.error(f"Error fetching raw data for {body_id}: {str(e)}") raise async def get_body_positions( self, body_id: str, start_time: datetime | None = None, end_time: datetime | None = None, step: str = "1d", ) -> list[Position]: """ Get positions for a celestial body over a time range Args: body_id: JPL Horizons ID (e.g., '-31' for Voyager 1) start_time: Start datetime (default: now) end_time: End datetime (default: now) step: Time step (e.g., '1d' for 1 day, '1h' for 1 hour) Returns: List of Position objects """ try: # Set default times if start_time is None: start_time = datetime.utcnow() if end_time is None: end_time = start_time # Format time for Horizons (YYYY-MM-DD HH:MM) # Horizons accepts ISO-like format without 'T' start_str = start_time.strftime('%Y-%m-%d %H:%M') end_str = end_time.strftime('%Y-%m-%d %H:%M') # Special case for single point query (start = end) # Horizons requires START != STOP for ranges, but we can handle single point # by making a very small range or just asking for 1 step. # Actually Horizons API is fine with start=end if we don't ask for range? # Let's keep using range parameters as standard. if start_time == end_time: # Just add 1 minute for range, but we only parse the first result end_dummy = end_time + timedelta(minutes=1) end_str = end_dummy.strftime('%Y-%m-%d %H:%M') # Override step to ensure we get the start point # But wait, '1d' step might skip. # If start==end, we want exactly one point. # We can't use '1' count in API easily via URL params without STEP_SIZE? # Let's just use the provided step. logger.info(f"Querying Horizons (httpx) for body {body_id} from {start_str} to {end_str}") url = "https://ssd.jpl.nasa.gov/api/horizons.api" cmd_val = f"'{body_id}'" if not body_id.startswith("'") else body_id params = { "format": "text", "COMMAND": cmd_val, "OBJ_DATA": "NO", "MAKE_EPHEM": "YES", "EPHEM_TYPE": "VECTORS", "CENTER": self.location, "START_TIME": start_str, "STOP_TIME": end_str, "STEP_SIZE": step, "CSV_FORMAT": "YES" } # Configure proxy if available client_kwargs = {"timeout": settings.nasa_api_timeout} if settings.proxy_dict: client_kwargs["proxies"] = settings.proxy_dict async with httpx.AsyncClient(**client_kwargs) as client: response = await client.get(url, params=params) if response.status_code != 200: raise Exception(f"NASA API returned status {response.status_code}") return self._parse_vectors(response.text) except Exception as e: logger.error(f"Error querying Horizons for body {body_id}: {str(e)}") raise def _parse_vectors(self, text: str) -> list[Position]: """ Parse Horizons CSV output for vector data Format looks like: $$SOE 2460676.500000000, A.D. 2025-Jan-01 00:00:00.0000, 9.776737278236609E-01, -1.726677228793678E-01, -1.636678733289160E-05, ... $$EOE """ positions = [] # Extract data block between $$SOE and $$EOE match = re.search(r'\$\$SOE(.*?)\$\$EOE', text, re.DOTALL) if not match: logger.warning("No data block ($$SOE...$$EOE) found in Horizons response") # Log a snippet of text for debugging logger.debug(f"Response snippet: {text[:200]}...") return [] data_block = match.group(1).strip() lines = data_block.split('\n') for line in lines: parts = [p.strip() for p in line.split(',')] if len(parts) < 5: continue try: # Index 0: JD, 1: Date, 2: X, 3: Y, 4: Z, 5: VX, 6: VY, 7: VZ # Time parsing: 2460676.500000000 is JD. # A.D. 2025-Jan-01 00:00:00.0000 is Calendar. # We can use JD or parse the string. Using JD via astropy is accurate. jd_str = parts[0] time_obj = Time(float(jd_str), format="jd").datetime x = float(parts[2]) y = float(parts[3]) z = float(parts[4]) # Velocity if available (indices 5, 6, 7) vx = float(parts[5]) if len(parts) > 5 else None vy = float(parts[6]) if len(parts) > 6 else None vz = float(parts[7]) if len(parts) > 7 else None pos = Position( time=time_obj, x=x, y=y, z=z, vx=vx, vy=vy, vz=vz ) positions.append(pos) except ValueError as e: logger.warning(f"Failed to parse line: {line}. Error: {e}") continue return positions async def search_body_by_name(self, name: str, db: AsyncSession) -> dict: """ Search for a celestial body by name in NASA Horizons database using httpx. This method replaces the astroquery-based search to unify proxy and timeout control. """ try: logger.info(f"Searching Horizons (httpx) for: {name}") url = "https://ssd.jpl.nasa.gov/api/horizons.api" cmd_val = f"'{name}'" # Name can be ID or actual name params = { "format": "text", "COMMAND": cmd_val, "OBJ_DATA": "YES", # Request object data to get canonical name/ID "MAKE_EPHEM": "NO", # Don't need ephemeris "EPHEM_TYPE": "OBSERVER", # Arbitrary, won't be used since MAKE_EPHEM=NO "CENTER": "@ssb" # Search from Solar System Barycenter for consistent object IDs } timeout = await self._get_timeout(db) client_kwargs = {"timeout": timeout} if settings.proxy_dict: client_kwargs["proxies"] = settings.proxy_dict async with httpx.AsyncClient(**client_kwargs) as client: response = await client.get(url, params=params) if response.status_code != 200: raise Exception(f"NASA API returned status {response.status_code}") response_text = response.text # Check for "Ambiguous target name" if "Ambiguous target name" in response_text: logger.warning(f"Ambiguous target name for: {name}") return { "success": False, "id": None, "name": None, "full_name": None, "error": "名称不唯一,请提供更具体的名称或 JPL Horizons ID" } # Check for "No matches found" or "Unknown target" if "No matches found" in response_text or "Unknown target" in response_text: logger.warning(f"No matches found for: {name}") return { "success": False, "id": None, "name": None, "full_name": None, "error": "未找到匹配的天体,请检查名称或 ID" } # Parse canonical name and ID from response (e.g., "Target body name: Jupiter Barycenter (599)") target_name_match = re.search(r"Target body name: (.+?)\s+\((\-?\d+)\)", response_text) if target_name_match: full_name = target_name_match.group(1).strip() numeric_id = target_name_match.group(2).strip() short_name = full_name.split('(')[0].strip() # Remove any part after '(' logger.info(f"Found target: {full_name} with ID: {numeric_id}") return { "success": True, "id": numeric_id, "name": short_name, "full_name": full_name, "error": None } else: # Fallback if specific pattern not found, might be a valid but weird response logger.warning(f"Could not parse target name/ID from response for: {name}. Response snippet: {response_text[:200]}") return { "success": False, "id": None, "name": None, "full_name": None, "error": f"未能解析 JPL Horizons 响应,请尝试精确 ID: {name}" } except Exception as e: error_msg = str(e) logger.error(f"Error searching for {name}: {error_msg}") return { "success": False, "id": None, "name": None, "full_name": None, "error": f"查询失败: {error_msg}" } # Singleton instance horizons_service = HorizonsService()