cosmo/backend/app/services/nasa_worker.py

344 lines
14 KiB
Python

"""
Worker functions for background tasks
"""
import logging
import asyncio
import httpx
from datetime import datetime, timedelta
from sqlalchemy.ext.asyncio import AsyncSession
from typing import List, Optional
from app.database import AsyncSessionLocal
from app.services.task_service import task_service
from app.services.db_service import celestial_body_service, position_service
from app.services.horizons import horizons_service
from app.services.orbit_service import orbit_service
from app.services.event_service import event_service
from app.models.schemas.social import CelestialEventCreate
logger = logging.getLogger(__name__)
async def download_positions_task(task_id: int, body_ids: List[str], dates: List[str]):
    """
    Worker that downloads NASA Horizons positions for every body/date pair.

    For each requested body and each date the worker first looks for positions
    already stored for that day; only when none are found does it query
    Horizons and save the first returned position. Progress, the per-date
    outcomes and the final totals are written to the task record. An error on
    a single date is recorded and does not abort the run; any other failure
    marks the task as "failed".

    Args:
        task_id: ID of the task record to update
        body_ids: IDs of the bodies to download positions for
        dates: Dates to download, formatted as YYYY-MM-DD
    """
    logger.info(f"Task {task_id}: Starting download for {len(body_ids)} bodies and {len(dates)} dates")

    async with AsyncSessionLocal() as db:
        try:
            # Flag the task as started before doing any work
            await task_service.update_task(db, task_id, progress=0, status="running")

            dates_per_body = len(dates)
            total_operations = len(body_ids) * dates_per_body
            finished_ops = 0
            ok_total = 0
            ko_total = 0
            details = []

            for body_id in body_ids:
                body = await celestial_body_service.get_body_by_id(body_id, db)

                # Unknown body: every one of its dates counts as failed
                if not body:
                    details.append({"body_id": body_id, "error": "Body not found"})
                    ko_total += dates_per_body
                    finished_ops += dates_per_body
                    continue

                per_body = {"body_id": body_id, "body_name": body.name, "dates": []}

                for date_str in dates:
                    try:
                        day_start = datetime.strptime(date_str, "%Y-%m-%d")
                        day_end = day_start.replace(hour=23, minute=59, second=59)

                        # Anything already stored for this day means no download is needed
                        stored = await position_service.get_positions(
                            body_id=body_id,
                            start_time=day_start,
                            end_time=day_end,
                            session=db
                        )

                        if stored and len(stored) > 0:
                            outcome = {"date": date_str, "status": "skipped"}
                        else:
                            fetched = await horizons_service.get_body_positions(
                                body_id=body_id,
                                start_time=day_start,
                                end_time=day_start,
                                step="1d"
                            )

                            if fetched and len(fetched) > 0:
                                first = fetched[0]
                                record = {
                                    "time": day_start,
                                    "x": first.x,
                                    "y": first.y,
                                    "z": first.z,
                                    "vx": getattr(first, 'vx', None),
                                    "vy": getattr(first, 'vy', None),
                                    "vz": getattr(first, 'vz', None),
                                }
                                await position_service.save_positions(
                                    body_id=body_id,
                                    positions=[record],
                                    source="nasa_horizons",
                                    session=db
                                )
                                outcome = {"date": date_str, "status": "success"}
                            else:
                                outcome = {"date": date_str, "status": "failed", "error": "No data"}
                    except Exception as e:
                        logger.error(f"Error processing {body_id} on {date_str}: {e}")
                        outcome = {"date": date_str, "status": "error", "error": str(e)}

                    # Record the outcome; skipped dates count as successes
                    per_body["dates"].append(outcome)
                    if outcome["status"] in ("skipped", "success"):
                        ok_total += 1
                    else:
                        ko_total += 1

                    # Progress is persisted after every single date
                    finished_ops += 1
                    await task_service.update_task(
                        db, task_id, progress=int((finished_ops / total_operations) * 100)
                    )

                details.append(per_body)

            # All bodies handled: store the summary on the task
            summary = {
                "total_success": ok_total,
                "total_failed": ko_total,
                "details": details
            }
            await task_service.update_task(db, task_id, status="completed", progress=100, result=summary)
            logger.info(f"Task {task_id} completed successfully")

        except Exception as e:
            logger.error(f"Task {task_id} failed critically: {e}")
            await task_service.update_task(db, task_id, status="failed", error_message=str(e))
async def generate_orbits_task(task_id: int, body_ids: Optional[List[str]] = None):
    """
    Worker that generates orbits and records the outcome on a task record.

    Only bodies whose extra data carries "orbit_period_days" are processed.
    A failure for one body is recorded in the result details and does not
    stop the remaining bodies; any other failure marks the task as "failed".

    Args:
        task_id: ID of the task record to update
        body_ids: IDs of the bodies to generate orbits for. When None (or
            empty), all bodies returned by the body service are considered.
    """
    logger.info(f"🚀 Starting background orbit generation task {task_id}")

    async with AsyncSessionLocal() as db:
        try:
            await task_service.update_task(
                db, task_id, status="running", started_at=datetime.utcnow(), progress=0
            )

            # Resolve the candidate bodies: explicit ids (unknown ones dropped) or everything
            if body_ids:
                looked_up = [
                    await celestial_body_service.get_body_by_id(bid, db)
                    for bid in body_ids
                ]
                candidates = [found for found in looked_up if found]
            else:
                candidates = await celestial_body_service.get_all_bodies(db)

            # Keep only bodies that declare an orbital period
            valid_bodies = []
            for body in candidates:
                if (body.extra_data or {}).get("orbit_period_days"):
                    valid_bodies.append(body)
                    continue
                # Warn only about bodies the caller asked for explicitly
                if body_ids and body.id in body_ids:
                    logger.warning(f"Body {body.name} ({body.id}) missing 'orbit_period_days', skipping.")

            total_bodies = len(valid_bodies)
            if not total_bodies:
                await task_service.update_task(
                    db, task_id, status="completed", progress=100,
                    result={"message": "No bodies with 'orbit_period_days' found to process"}
                )
                return

            outcomes = []
            succeeded = 0
            failed = 0

            for position, body in enumerate(valid_bodies):
                try:
                    # Progress reflects the bodies finished before this one
                    await task_service.update_task(
                        db, task_id, progress=int((position / total_bodies) * 100)
                    )

                    settings = body.extra_data or {}
                    period_days = float(settings.get("orbit_period_days"))
                    orbit_color = settings.get("orbit_color", "#CCCCCC")

                    orbit = await orbit_service.generate_orbit(
                        body_id=body.id,
                        body_name=body.name_zh or body.name,
                        period_days=period_days,
                        color=orbit_color,
                        session=db,
                        horizons_service=horizons_service
                    )

                    success_entry = {
                        "body_id": body.id,
                        "body_name": body.name_zh or body.name,
                        "status": "success",
                        "num_points": orbit.num_points
                    }
                    outcomes.append(success_entry)
                    succeeded += 1
                except Exception as e:
                    logger.error(f"Failed to generate orbit for {body.name}: {e}")
                    failure_entry = {
                        "body_id": body.id,
                        "body_name": body.name_zh or body.name,
                        "status": "failed",
                        "error": str(e)
                    }
                    outcomes.append(failure_entry)
                    failed += 1

            summary = {
                "total": total_bodies,
                "success": succeeded,
                "failed": failed,
                "details": outcomes
            }
            await task_service.update_task(
                db,
                task_id,
                status="completed",
                progress=100,
                completed_at=datetime.utcnow(),
                result=summary
            )
            logger.info(f"🏁 Orbit generation task {task_id} completed")

        except Exception as e:
            logger.error(f"Task {task_id} failed: {e}")
            await task_service.update_task(
                db, task_id, status="failed", error_message=str(e), completed_at=datetime.utcnow()
            )
async def fetch_celestial_events_task(task_id: int):
    """
    Background task to fetch celestial events (close approaches) from the
    NASA/JPL SBDB Close-Approach Data (CAD) API.

    Queries approaches closer than 0.05 AU over the next 60 days for all
    approach bodies, matches each row's designation against the names and ids
    of the bodies returned by the body service, and creates one event per
    matched row. An empty response completes the task with zero events. Rows
    whose distance or velocity is not numeric are skipped with a warning. Any
    other failure is logged and marks the task as "failed" instead of
    propagating to the caller.

    Args:
        task_id: ID of the task record to update
    """
    logger.info(f"🚀 Starting celestial event fetch task {task_id}")
    url = "https://ssd-api.jpl.nasa.gov/cad.api"

    # Read the clock once so both ends of the window derive from the same instant
    now = datetime.utcnow()
    # Fetch data for next 60 days, close approach < 0.05 AU (approx 7.5M km)
    params = {
        "dist-max": "0.05",
        "date-min": now.strftime("%Y-%m-%d"),
        "date-max": (now + timedelta(days=60)).strftime("%Y-%m-%d"),
        "body": "ALL",
    }

    async with AsyncSessionLocal() as db:
        try:
            await task_service.update_task(db, task_id, status="running", progress=10)

            async with httpx.AsyncClient(timeout=30) as client:
                logger.info(f"Querying NASA SBDB CAD API: {url}")
                response = await client.get(url, params=params)
                if response.status_code != 200:
                    raise RuntimeError(f"NASA API returned {response.status_code}: {response.text}")
                data = response.json()

            count = int(data.get("count", 0))
            # A response with no matching approaches may carry no "fields"/"data"
            # at all, so treat missing or null values as empty lists.
            fields = data.get("fields") or []
            data_rows = data.get("data") or []
            logger.info(f"Fetched {count} close approach events")

            processed_count = 0
            saved_count = 0

            # Column lookup and body matching are only needed when there are rows;
            # requiring the columns on an empty result would fail the task for nothing.
            if data_rows:
                try:
                    idx_des = fields.index("des")
                    idx_cd = fields.index("cd")
                    idx_dist = fields.index("dist")
                    idx_v_rel = fields.index("v_rel")
                except ValueError as e:
                    raise RuntimeError(f"Missing expected field in NASA response: {e}") from e
                # The request asks for approaches to ALL bodies; when the response
                # names the approached body, use it rather than assuming Earth.
                idx_body = fields.index("body") if "body" in fields else None

                # Map name -> id, then id -> id (ids win on a clash), all lower-cased.
                # NASA 'des' (designation) might match our 'name' or 'id'.
                all_bodies = await celestial_body_service.get_all_bodies(db)
                body_map = {b.name.lower(): b.id for b in all_bodies}
                for b in all_bodies:
                    body_map[b.id.lower()] = b.id

                for row in data_rows:
                    processed_count += 1

                    des = row[idx_des].strip()
                    date_str = row[idx_cd]  # e.g. 2025-Dec-18 12:00
                    dist = row[idx_dist]
                    v_rel = row[idx_v_rel]

                    # NASA des often looks like "2024 XK" or "433" (Eros); exact match only
                    target_id = body_map.get(des.lower())
                    if not target_id:
                        continue

                    # One malformed row must not abort the task after earlier
                    # events have already been saved.
                    try:
                        nominal_dist_au = float(dist)
                        v_rel_kms = float(v_rel)
                    except (TypeError, ValueError):
                        logger.warning(
                            f"Skipping close approach for {des}: non-numeric dist/v_rel ({dist!r}, {v_rel!r})"
                        )
                        continue

                    try:
                        # NOTE: %b (abbreviated month name) is locale-dependent
                        event_time = datetime.strptime(date_str, "%Y-%b-%d %H:%M")
                    except (TypeError, ValueError):
                        # Best-effort fallback kept from the original behavior, but no longer silent
                        logger.warning(
                            f"Could not parse close approach time {date_str!r} for {des}; using current time"
                        )
                        event_time = datetime.utcnow()

                    approach_body = "Earth"
                    if idx_body is not None and row[idx_body]:
                        approach_body = row[idx_body]

                    event_data = CelestialEventCreate(
                        body_id=target_id,
                        title=f"Close Approach: {des}",
                        event_type="approach",
                        event_time=event_time,
                        description=(
                            f"Close approach to {approach_body} at distance {dist} AU "
                            f"with relative velocity {v_rel} km/s"
                        ),
                        details={
                            "nominal_dist_au": nominal_dist_au,
                            "v_rel_kms": v_rel_kms,
                            "designation": des
                        },
                        source="nasa_sbdb"
                    )
                    # TODO: check for duplicates (e.g. by body_id + event_time) before creating
                    await event_service.create_event(event_data, db)
                    saved_count += 1

            await task_service.update_task(
                db, task_id, status="completed", progress=100,
                result={
                    "fetched": count,
                    "processed": processed_count,
                    "saved": saved_count,
                    "message": f"Successfully fetched {count} events, saved {saved_count} matched events."
                }
            )
        except Exception as e:
            # logger.exception keeps the traceback that logger.error dropped
            logger.exception(f"Task {task_id} failed: {e}")
            await task_service.update_task(db, task_id, status="failed", error_message=str(e))