""" NASA Data Download API routes Handles batch downloading of position data from NASA Horizons """ import logging from datetime import datetime from fastapi import APIRouter, HTTPException, Depends, Query, BackgroundTasks from sqlalchemy.ext.asyncio import AsyncSession from pydantic import BaseModel from app.database import get_db from app.services.horizons import horizons_service from app.services.db_service import celestial_body_service, position_service from app.services.task_service import task_service from app.services.nasa_worker import download_positions_task logger = logging.getLogger(__name__) router = APIRouter(prefix="/celestial/positions", tags=["nasa-download"]) # Pydantic models class DownloadPositionRequest(BaseModel): body_ids: list[str] dates: list[str] # List of dates in YYYY-MM-DD format @router.get("/download/bodies") async def get_downloadable_bodies( db: AsyncSession = Depends(get_db) ): """ Get list of celestial bodies available for NASA data download, grouped by type Returns: - Dictionary with body types as keys and lists of bodies as values """ logger.info("Fetching downloadable bodies for NASA data download") try: # Get all active celestial bodies all_bodies = await celestial_body_service.get_all_bodies(db) # Group bodies by type grouped_bodies = {} for body in all_bodies: if body.type not in grouped_bodies: grouped_bodies[body.type] = [] grouped_bodies[body.type].append({ "id": body.id, "name": body.name, "name_zh": body.name_zh, "type": body.type, "is_active": body.is_active, "description": body.description }) # Sort each group by name for body_type in grouped_bodies: grouped_bodies[body_type].sort(key=lambda x: x["name"]) logger.info(f"✅ Returning {len(all_bodies)} bodies in {len(grouped_bodies)} groups") return {"bodies": grouped_bodies} except Exception as e: logger.error(f"Failed to fetch downloadable bodies: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.get("/download/status") async def get_download_status( body_id: str = Query(..., description="Celestial body ID"), start_date: str = Query(..., description="Start date (YYYY-MM-DD)"), end_date: str = Query(..., description="End date (YYYY-MM-DD)"), db: AsyncSession = Depends(get_db) ): """ Get data availability status for a specific body within a date range Returns: - List of dates that have position data """ logger.info(f"Checking download status for {body_id} from {start_date} to {end_date}") try: # Parse dates start_dt = datetime.strptime(start_date, "%Y-%m-%d") end_dt = datetime.strptime(end_date, "%Y-%m-%d").replace(hour=23, minute=59, second=59) # Get available dates available_dates = await position_service.get_available_dates( body_id=body_id, start_time=start_dt, end_time=end_dt, session=db ) # Convert dates to ISO format strings available_date_strings = [ date.isoformat() if hasattr(date, 'isoformat') else str(date) for date in available_dates ] logger.info(f"✅ Found {len(available_date_strings)} dates with data") return { "body_id": body_id, "start_date": start_date, "end_date": end_date, "available_dates": available_date_strings } except ValueError as e: raise HTTPException(status_code=400, detail=f"Invalid date format: {str(e)}") except Exception as e: logger.error(f"Failed to check download status: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.post("/download-async") async def download_positions_async( request: DownloadPositionRequest, background_tasks: BackgroundTasks, db: AsyncSession = Depends(get_db) ): """ Start asynchronous background task to 

@router.post("/download-async")
async def download_positions_async(
    request: DownloadPositionRequest,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db)
):
    """
    Start asynchronous background task to download position data
    """
    # Create task record
    task = await task_service.create_task(
        db,
        task_type="nasa_download",
        description=f"Download positions for {len(request.body_ids)} bodies on {len(request.dates)} dates",
        params=request.dict(),
        created_by=None
    )

    # Add to background tasks
    background_tasks.add_task(
        download_positions_task,
        task.id,
        request.body_ids,
        request.dates
    )

    return {
        "message": "Download task started",
        "task_id": task.id
    }


@router.post("/download")
async def download_positions(
    request: DownloadPositionRequest,
    db: AsyncSession = Depends(get_db)
):
    """
    Download position data for specified bodies on specified dates (Synchronous)

    This endpoint will:
    1. Query NASA Horizons API for the position at 00:00:00 UTC on each date
    2. Save the data to the positions table
    3. Return the downloaded data

    Args:
    - body_ids: List of celestial body IDs
    - dates: List of dates (YYYY-MM-DD format)

    Returns:
    - Summary of downloaded data with success/failure status
    """
    logger.info(f"Downloading positions (sync) for {len(request.body_ids)} bodies on {len(request.dates)} dates")

    try:
        results = []
        total_success = 0
        total_failed = 0

        for body_id in request.body_ids:
            # Check if body exists
            body = await celestial_body_service.get_body_by_id(body_id, db)
            if not body:
                results.append({
                    "body_id": body_id,
                    "status": "failed",
                    "error": "Body not found"
                })
                total_failed += 1
                continue

            body_results = {
                "body_id": body_id,
                "body_name": body.name_zh or body.name,
                "dates": []
            }

            for date_str in request.dates:
                try:
                    # Parse date and set to midnight UTC
                    target_date = datetime.strptime(date_str, "%Y-%m-%d")

                    # Check if data already exists for this date
                    existing = await position_service.get_positions(
                        body_id=body_id,
                        start_time=target_date,
                        end_time=target_date.replace(hour=23, minute=59, second=59),
                        session=db
                    )

                    if existing and len(existing) > 0:
                        body_results["dates"].append({
                            "date": date_str,
                            "status": "exists",
                            "message": "Data already exists"
                        })
                        total_success += 1
                        continue

                    # Download from NASA Horizons
                    logger.info(f"Downloading position for body {body_id} on {date_str}")
                    positions = await horizons_service.get_body_positions(
                        body_id=body_id,
                        start_time=target_date,
                        end_time=target_date,
                        step="1d"
                    )

                    if positions and len(positions) > 0:
                        logger.info(
                            f"Received position data for body {body_id}: "
                            f"x={positions[0].x}, y={positions[0].y}, z={positions[0].z}"
                        )

                        # Save to database
                        position_data = [{
                            "time": target_date,
                            "x": positions[0].x,
                            "y": positions[0].y,
                            "z": positions[0].z,
                            "vx": getattr(positions[0], 'vx', None),
                            "vy": getattr(positions[0], 'vy', None),
                            "vz": getattr(positions[0], 'vz', None),
                        }]
                        await position_service.save_positions(
                            body_id=body_id,
                            positions=position_data,
                            source="nasa_horizons",
                            session=db
                        )
                        logger.info(f"Saved position for body {body_id} on {date_str}")

                        # Invalidate caches for this date to ensure fresh data is served
                        from app.services.redis_cache import redis_cache, make_cache_key
                        start_str = target_date.isoformat()
                        end_str = target_date.isoformat()

                        # Clear both "all bodies" cache and specific body cache
                        for body_ids_str in ["all", body_id]:
                            redis_key = make_cache_key("positions", start_str, end_str, "1d", body_ids_str)
                            await redis_cache.delete(redis_key)
                            logger.debug(f"Invalidated cache: {redis_key}")

                        body_results["dates"].append({
                            "date": date_str,
                            "status": "success",
                            "position": {
                                "x": positions[0].x,
                                "y": positions[0].y,
                                "z": positions[0].z
                            }
                        })
                        total_success += 1
                    else:
                        body_results["dates"].append({
                            "date": date_str,
                            "status": "failed",
                            "error": "No data returned from NASA"
                        })
                        total_failed += 1

                except Exception as e:
                    logger.error(f"Failed to download {body_id} on {date_str}: {e}")
                    body_results["dates"].append({
                        "date": date_str,
                        "status": "failed",
                        "error": str(e)
                    })
                    total_failed += 1

            results.append(body_results)

        return {
            "message": f"Downloaded {total_success} positions ({total_failed} failed)",
            "total_success": total_success,
            "total_failed": total_failed,
            "results": results
        }

    except Exception as e:
        logger.error(f"Download failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))

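
# Illustrative response shape for POST /celestial/positions/download, assembled from the
# return statements above; the concrete values are placeholders, not real data:
#
#   {
#       "message": "Downloaded 1 positions (0 failed)",
#       "total_success": 1,
#       "total_failed": 0,
#       "results": [
#           {
#               "body_id": "<id>",
#               "body_name": "<name>",
#               "dates": [
#                   {"date": "2024-01-01", "status": "success",
#                    "position": {"x": 0.0, "y": 0.0, "z": 0.0}}
#               ]
#           }
#       ]
#   }
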

@router.post("/delete")
async def delete_positions(
    request: DownloadPositionRequest,
    db: AsyncSession = Depends(get_db)
):
    """
    Delete position data for specified bodies on specified dates

    Args:
    - body_ids: List of celestial body IDs
    - dates: List of dates (YYYY-MM-DD format)

    Returns:
    - Summary of deleted data
    """
    logger.info(f"Deleting positions for {len(request.body_ids)} bodies on {len(request.dates)} dates")

    try:
        total_deleted = 0

        from sqlalchemy import text
        # Imported outside the loop so the cache client is always bound,
        # even when body_ids is empty
        from app.services.redis_cache import redis_cache, make_cache_key

        for body_id in request.body_ids:
            # Delete the records for each requested date, then invalidate caches for this body
            for date_str in request.dates:
                try:
                    # Parse date
                    target_date = datetime.strptime(date_str, "%Y-%m-%d")
                    # End of day
                    end_of_day = target_date.replace(hour=23, minute=59, second=59, microsecond=999999)

                    # Execute deletion with raw SQL via text(); position_service has no
                    # delete helper, and a parameterized range delete is simplest here
                    stmt = text("""
                        DELETE FROM positions
                        WHERE body_id = :body_id
                        AND time >= :start_time
                        AND time <= :end_time
                    """)

                    result = await db.execute(stmt, {
                        "body_id": body_id,
                        "start_time": target_date,
                        "end_time": end_of_day
                    })

                    deleted_count = result.rowcount
                    total_deleted += deleted_count

                    if deleted_count > 0:
                        logger.info(f"Deleted {deleted_count} records for {body_id} on {date_str}")

                        # Invalidate cache for this specific date/body combo
                        # Note: This is approximate as cache keys might cover ranges
                        start_str = target_date.isoformat()
                        end_str = target_date.isoformat()

                        # Clear both "all bodies" cache and specific body cache
                        for body_ids_str in ["all", body_id]:
                            # We try to clear the '1d' step cache
                            redis_key = make_cache_key("positions", start_str, end_str, "1d", body_ids_str)
                            await redis_cache.delete(redis_key)

                except Exception as e:
                    logger.error(f"Failed to delete data for {body_id} on {date_str}: {e}")

        await db.commit()

        # Clear general patterns to be safe if ranges were cached
        await redis_cache.clear_pattern("positions:*")

        return {
            "message": f"Successfully deleted {total_deleted} position records",
            "total_deleted": total_deleted
        }

    except Exception as e:
        await db.rollback()
        logger.error(f"Delete failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))
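

# Example request body shared by POST /download, /download-async and /delete
# (DownloadPositionRequest). The body IDs below are hypothetical placeholders; valid
# values are the IDs stored in the celestial bodies table (see GET /download/bodies):
#
#   {
#       "body_ids": ["mars", "jupiter"],
#       "dates": ["2024-01-01", "2024-01-02"]
#   }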