# cosmo/backend/app/api/nasa_download.py
"""
NASA Data Download API routes
Handles batch downloading of position data from NASA Horizons
"""
import logging
from datetime import datetime
from fastapi import APIRouter, HTTPException, Depends, Query, BackgroundTasks
from sqlalchemy.ext.asyncio import AsyncSession
from pydantic import BaseModel
from app.database import get_db
from app.services.horizons import horizons_service
from app.services.db_service import celestial_body_service, position_service
from app.services.task_service import task_service
from app.services.nasa_worker import download_positions_task
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/celestial/positions", tags=["nasa-download"])
# Pydantic models
class DownloadPositionRequest(BaseModel):
    body_ids: list[str]
    dates: list[str]  # List of dates in YYYY-MM-DD format
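
# An illustrative request body shared by the download/delete endpoints below.
# The field shapes come from DownloadPositionRequest; the specific IDs and
# dates are placeholders, not values guaranteed to exist in the database:
#
#   {
#       "body_ids": ["mars", "jupiter"],
#       "dates": ["2024-01-01", "2024-01-02"]
#   }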
@router.get("/download/bodies")
async def get_downloadable_bodies(
    db: AsyncSession = Depends(get_db)
):
    """
    Get list of celestial bodies available for NASA data download, grouped by type

    Returns:
    - Dictionary with body types as keys and lists of bodies as values
    """
    logger.info("Fetching downloadable bodies for NASA data download")
    try:
        # Get all active celestial bodies
        all_bodies = await celestial_body_service.get_all_bodies(db)

        # Group bodies by type
        grouped_bodies = {}
        for body in all_bodies:
            if body.type not in grouped_bodies:
                grouped_bodies[body.type] = []
            grouped_bodies[body.type].append({
                "id": body.id,
                "name": body.name,
                "name_zh": body.name_zh,
                "type": body.type,
                "is_active": body.is_active,
                "description": body.description
            })

        # Sort each group by name
        for body_type in grouped_bodies:
            grouped_bodies[body_type].sort(key=lambda x: x["name"])

        logger.info(f"✅ Returning {len(all_bodies)} bodies in {len(grouped_bodies)} groups")
        return {"bodies": grouped_bodies}
    except Exception as e:
        logger.error(f"Failed to fetch downloadable bodies: {e}")
        raise HTTPException(status_code=500, detail=str(e))
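
# The response groups bodies under their `type` value. Shape only; the actual
# type keys ("planet" here is illustrative) depend on what is stored in the
# database:
#
#   {
#       "bodies": {
#           "planet": [
#               {"id": "...", "name": "...", "name_zh": "...",
#                "type": "planet", "is_active": true, "description": "..."}
#           ]
#       }
#   }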
@router.get("/download/status")
async def get_download_status(
    body_id: str = Query(..., description="Celestial body ID"),
    start_date: str = Query(..., description="Start date (YYYY-MM-DD)"),
    end_date: str = Query(..., description="End date (YYYY-MM-DD)"),
    db: AsyncSession = Depends(get_db)
):
    """
    Get data availability status for a specific body within a date range

    Returns:
    - List of dates that have position data
    """
    logger.info(f"Checking download status for {body_id} from {start_date} to {end_date}")
    try:
        # Parse dates
        start_dt = datetime.strptime(start_date, "%Y-%m-%d")
        end_dt = datetime.strptime(end_date, "%Y-%m-%d").replace(hour=23, minute=59, second=59)

        # Get available dates
        available_dates = await position_service.get_available_dates(
            body_id=body_id,
            start_time=start_dt,
            end_time=end_dt,
            session=db
        )

        # Convert dates to ISO format strings
        available_date_strings = [
            date.isoformat() if hasattr(date, 'isoformat') else str(date)
            for date in available_dates
        ]

        logger.info(f"✅ Found {len(available_date_strings)} dates with data")
        return {
            "body_id": body_id,
            "start_date": start_date,
            "end_date": end_date,
            "available_dates": available_date_strings
        }
    except ValueError as e:
        raise HTTPException(status_code=400, detail=f"Invalid date format: {str(e)}")
    except Exception as e:
        logger.error(f"Failed to check download status: {e}")
        raise HTTPException(status_code=500, detail=str(e))
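
# A minimal client sketch for the status check, assuming the API is served at
# http://localhost:8000 and this router is mounted without an extra prefix
# (both are assumptions, not guaranteed by this file):
#
#   import httpx
#
#   resp = httpx.get(
#       "http://localhost:8000/celestial/positions/download/status",
#       params={
#           "body_id": "mars",  # placeholder ID
#           "start_date": "2024-01-01",
#           "end_date": "2024-01-31",
#       },
#   )
#   print(resp.json()["available_dates"])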
@router.post("/download-async")
async def download_positions_async(
    request: DownloadPositionRequest,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db)
):
    """
    Start asynchronous background task to download position data
    """
    # Create task record
    task = await task_service.create_task(
        db,
        task_type="nasa_download",
        description=f"Download positions for {len(request.body_ids)} bodies on {len(request.dates)} dates",
        params=request.dict(),
        created_by=None
    )

    # Add to background tasks
    background_tasks.add_task(
        download_positions_task,
        task.id,
        request.body_ids,
        request.dates
    )

    return {
        "message": "Download task started",
        "task_id": task.id
    }
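
# Kicking off the background download looks like this (same host and prefix
# assumptions as the sketch above); the returned task_id can be fed to
# whatever task-status endpoint task_service backs elsewhere in the app:
#
#   resp = httpx.post(
#       "http://localhost:8000/celestial/positions/download-async",
#       json={"body_ids": ["mars"], "dates": ["2024-01-01"]},
#   )
#   task_id = resp.json()["task_id"]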
@router.post("/download")
async def download_positions(
    request: DownloadPositionRequest,
    db: AsyncSession = Depends(get_db)
):
    """
    Download position data for specified bodies on specified dates (Synchronous)

    This endpoint will:
    1. Query NASA Horizons API for the position at 00:00:00 UTC on each date
    2. Save the data to the positions table
    3. Return the downloaded data

    Args:
    - body_ids: List of celestial body IDs
    - dates: List of dates (YYYY-MM-DD format)

    Returns:
    - Summary of downloaded data with success/failure status
    """
    logger.info(f"Downloading positions (sync) for {len(request.body_ids)} bodies on {len(request.dates)} dates")
    try:
        from app.services.redis_cache import redis_cache, make_cache_key

        results = []
        total_success = 0
        total_failed = 0

        for body_id in request.body_ids:
            # Check if body exists
            body = await celestial_body_service.get_body_by_id(body_id, db)
            if not body:
                results.append({
                    "body_id": body_id,
                    "status": "failed",
                    "error": "Body not found"
                })
                total_failed += 1
                continue

            body_results = {
                "body_id": body_id,
                "body_name": body.name_zh or body.name,
                "dates": []
            }

            for date_str in request.dates:
                try:
                    # Parse date and set to midnight UTC
                    target_date = datetime.strptime(date_str, "%Y-%m-%d")

                    # Check if data already exists for this date
                    existing = await position_service.get_positions(
                        body_id=body_id,
                        start_time=target_date,
                        end_time=target_date.replace(hour=23, minute=59, second=59),
                        session=db
                    )
                    if existing and len(existing) > 0:
                        body_results["dates"].append({
                            "date": date_str,
                            "status": "exists",
                            "message": "Data already exists"
                        })
                        total_success += 1
                        continue

                    # Download from NASA Horizons
                    logger.info(f"Downloading position for body {body_id} on {date_str}")
                    positions = await horizons_service.get_body_positions(
                        body_id=body_id,
                        start_time=target_date,
                        end_time=target_date,
                        step="1d"
                    )

                    if positions and len(positions) > 0:
                        logger.info(
                            f"Received position data for body {body_id}: "
                            f"x={positions[0].x}, y={positions[0].y}, z={positions[0].z}"
                        )

                        # Save to database
                        position_data = [{
                            "time": target_date,
                            "x": positions[0].x,
                            "y": positions[0].y,
                            "z": positions[0].z,
                            "vx": getattr(positions[0], 'vx', None),
                            "vy": getattr(positions[0], 'vy', None),
                            "vz": getattr(positions[0], 'vz', None),
                        }]
                        await position_service.save_positions(
                            body_id=body_id,
                            positions=position_data,
                            source="nasa_horizons",
                            session=db
                        )
                        logger.info(f"Saved position for body {body_id} on {date_str}")

                        # Invalidate caches for this date to ensure fresh data is served
                        start_str = target_date.isoformat()
                        end_str = target_date.isoformat()
                        # Clear both "all bodies" cache and specific body cache
                        for body_ids_str in ["all", body_id]:
                            redis_key = make_cache_key("positions", start_str, end_str, "1d", body_ids_str)
                            await redis_cache.delete(redis_key)
                            logger.debug(f"Invalidated cache: {redis_key}")

                        body_results["dates"].append({
                            "date": date_str,
                            "status": "success",
                            "position": {
                                "x": positions[0].x,
                                "y": positions[0].y,
                                "z": positions[0].z
                            }
                        })
                        total_success += 1
                    else:
                        body_results["dates"].append({
                            "date": date_str,
                            "status": "failed",
                            "error": "No data returned from NASA"
                        })
                        total_failed += 1
                except Exception as e:
                    logger.error(f"Failed to download {body_id} on {date_str}: {e}")
                    body_results["dates"].append({
                        "date": date_str,
                        "status": "failed",
                        "error": str(e)
                    })
                    total_failed += 1

            results.append(body_results)

        return {
            "message": f"Downloaded {total_success} positions ({total_failed} failed)",
            "total_success": total_success,
            "total_failed": total_failed,
            "results": results
        }
    except Exception as e:
        logger.error(f"Download failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))
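
# A synchronous download returns a per-body, per-date breakdown built from the
# structures above; coordinate values here are placeholders:
#
#   {
#       "message": "Downloaded 1 positions (0 failed)",
#       "total_success": 1,
#       "total_failed": 0,
#       "results": [
#           {
#               "body_id": "mars",
#               "body_name": "...",
#               "dates": [
#                   {"date": "2024-01-01", "status": "success",
#                    "position": {"x": 1.0, "y": 2.0, "z": 3.0}}
#               ]
#           }
#       ]
#   }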
@router.post("/delete")
async def delete_positions(
    request: DownloadPositionRequest,
    db: AsyncSession = Depends(get_db)
):
    """
    Delete position data for specified bodies on specified dates

    Args:
    - body_ids: List of celestial body IDs
    - dates: List of dates (YYYY-MM-DD format)

    Returns:
    - Summary of deleted data
    """
    logger.info(f"Deleting positions for {len(request.body_ids)} bodies on {len(request.dates)} dates")
    try:
        from sqlalchemy import text
        from app.services.redis_cache import redis_cache, make_cache_key

        total_deleted = 0

        for body_id in request.body_ids:
            # Loop over dates to delete the specific records
            for date_str in request.dates:
                try:
                    # Parse date and compute the end of that day
                    target_date = datetime.strptime(date_str, "%Y-%m-%d")
                    end_of_day = target_date.replace(hour=23, minute=59, second=59, microsecond=999999)

                    # position_service does not expose a delete method, so run
                    # a parameterized range delete directly here
                    stmt = text("""
                        DELETE FROM positions
                        WHERE body_id = :body_id
                        AND time >= :start_time
                        AND time <= :end_time
                    """)
                    result = await db.execute(stmt, {
                        "body_id": body_id,
                        "start_time": target_date,
                        "end_time": end_of_day
                    })
                    deleted_count = result.rowcount
                    total_deleted += deleted_count

                    if deleted_count > 0:
                        logger.info(f"Deleted {deleted_count} records for {body_id} on {date_str}")

                        # Invalidate cache for this specific date/body combo
                        # Note: this is approximate, as cache keys might cover ranges
                        start_str = target_date.isoformat()
                        end_str = target_date.isoformat()
                        # Clear both "all bodies" cache and specific body cache
                        # for the '1d' step
                        for body_ids_str in ["all", body_id]:
                            redis_key = make_cache_key("positions", start_str, end_str, "1d", body_ids_str)
                            await redis_cache.delete(redis_key)
                except Exception as e:
                    logger.error(f"Failed to delete data for {body_id} on {date_str}: {e}")

        await db.commit()

        # Clear general patterns to be safe in case ranges were cached
        await redis_cache.clear_pattern("positions:*")

        return {
            "message": f"Successfully deleted {total_deleted} position records",
            "total_deleted": total_deleted
        }
    except Exception as e:
        await db.rollback()
        logger.error(f"Delete failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))