1.0.5
parent
e720d656da
commit
c334711675
|
|
@ -42,4 +42,6 @@ HEALTHCHECK --interval=30s --timeout=10s --start-period=40s --retries=3 \
|
||||||
CMD curl -f http://localhost:8000/health || exit 1
|
CMD curl -f http://localhost:8000/health || exit 1
|
||||||
|
|
||||||
# Run the application
|
# Run the application
|
||||||
|
# Using 4 workers for better performance and availability
|
||||||
|
# Scheduler uses Redis lock to ensure only one instance runs
|
||||||
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
|
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
|
||||||
|
|
|
||||||
|
|
@ -352,12 +352,9 @@ async def get_celestial_positions(
|
||||||
logger.info(f"Returning incomplete data from positions table ({len(all_bodies_positions)} bodies)")
|
logger.info(f"Returning incomplete data from positions table ({len(all_bodies_positions)} bodies)")
|
||||||
return CelestialDataResponse(bodies=all_bodies_positions)
|
return CelestialDataResponse(bodies=all_bodies_positions)
|
||||||
else:
|
else:
|
||||||
# Return empty or cached data
|
# Return empty data instead of error when auto download is disabled
|
||||||
logger.info("No cached data available and auto download is disabled")
|
logger.info("No cached data available and auto download is disabled. Returning empty data.")
|
||||||
raise HTTPException(
|
return CelestialDataResponse(bodies=[])
|
||||||
status_code=503,
|
|
||||||
detail="Position data not available. Auto download is disabled. Please use the admin panel to download data manually."
|
|
||||||
)
|
|
||||||
|
|
||||||
# Query Horizons (no cache available) - fetch from database + Horizons API
|
# Query Horizons (no cache available) - fetch from database + Horizons API
|
||||||
logger.info(f"Auto download enabled. Fetching celestial data from Horizons: start={start_dt}, end={end_dt}, step={step}")
|
logger.info(f"Auto download enabled. Fetching celestial data from Horizons: start={start_dt}, end={end_dt}, step={step}")
|
||||||
|
|
|
||||||
|
|
@ -86,8 +86,36 @@ async def lifespan(app: FastAPI):
|
||||||
# Preheat caches (load from database to Redis)
|
# Preheat caches (load from database to Redis)
|
||||||
await preheat_all_caches()
|
await preheat_all_caches()
|
||||||
|
|
||||||
# Start Scheduler
|
# Start Scheduler (use Redis lock to ensure only one instance runs across multiple workers)
|
||||||
scheduler_service.start()
|
from app.services.cache import redis_cache
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
scheduler_lock_key = "scheduler:lock"
|
||||||
|
scheduler_lock_ttl = 30 # Lock expires after 30 seconds (renew periodically)
|
||||||
|
|
||||||
|
# Try to acquire scheduler lock
|
||||||
|
lock_acquired = await redis_cache.set_if_not_exists(
|
||||||
|
scheduler_lock_key,
|
||||||
|
"locked",
|
||||||
|
ttl=scheduler_lock_ttl
|
||||||
|
)
|
||||||
|
|
||||||
|
if lock_acquired:
|
||||||
|
scheduler_service.start()
|
||||||
|
logger.info("✓ Scheduler started in this worker (acquired lock)")
|
||||||
|
|
||||||
|
# Start background task to renew lock periodically
|
||||||
|
async def renew_scheduler_lock():
|
||||||
|
while scheduler_service.scheduler.running:
|
||||||
|
await asyncio.sleep(15) # Renew every 15 seconds
|
||||||
|
try:
|
||||||
|
await redis_cache.set(scheduler_lock_key, "locked", ttl=scheduler_lock_ttl)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to renew scheduler lock: {e}")
|
||||||
|
|
||||||
|
asyncio.create_task(renew_scheduler_lock())
|
||||||
|
else:
|
||||||
|
logger.info("⊘ Scheduler not started in this worker (another worker holds the lock)")
|
||||||
|
|
||||||
logger.info("✓ Application started successfully")
|
logger.info("✓ Application started successfully")
|
||||||
logger.info("=" * 60)
|
logger.info("=" * 60)
|
||||||
|
|
@ -98,8 +126,15 @@ async def lifespan(app: FastAPI):
|
||||||
logger.info("=" * 60)
|
logger.info("=" * 60)
|
||||||
logger.info("Shutting down Cosmo Backend API...")
|
logger.info("Shutting down Cosmo Backend API...")
|
||||||
|
|
||||||
# Stop Scheduler
|
# Stop Scheduler and release lock
|
||||||
scheduler_service.shutdown()
|
if scheduler_service.scheduler.running:
|
||||||
|
scheduler_service.shutdown()
|
||||||
|
# Release scheduler lock
|
||||||
|
try:
|
||||||
|
await redis_cache.delete(scheduler_lock_key)
|
||||||
|
logger.info("✓ Scheduler lock released")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to release scheduler lock: {e}")
|
||||||
|
|
||||||
# Disconnect Redis
|
# Disconnect Redis
|
||||||
await redis_cache.disconnect()
|
await redis_cache.disconnect()
|
||||||
|
|
|
||||||
|
|
@ -68,23 +68,58 @@ class RedisCache:
|
||||||
key: str,
|
key: str,
|
||||||
value: Any,
|
value: Any,
|
||||||
ttl_seconds: Optional[int] = None,
|
ttl_seconds: Optional[int] = None,
|
||||||
|
ttl: Optional[int] = None, # Alias for ttl_seconds
|
||||||
) -> bool:
|
) -> bool:
|
||||||
"""Set value in Redis cache with optional TTL"""
|
"""Set value in Redis cache with optional TTL"""
|
||||||
if not self._connected or not self.client:
|
if not self._connected or not self.client:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
# Support both ttl_seconds and ttl parameter names
|
||||||
|
ttl_value = ttl_seconds or ttl
|
||||||
|
|
||||||
try:
|
try:
|
||||||
serialized = json.dumps(value, default=str)
|
serialized = json.dumps(value, default=str)
|
||||||
if ttl_seconds:
|
if ttl_value:
|
||||||
await self.client.setex(key, ttl_seconds, serialized)
|
await self.client.setex(key, ttl_value, serialized)
|
||||||
else:
|
else:
|
||||||
await self.client.set(key, serialized)
|
await self.client.set(key, serialized)
|
||||||
logger.debug(f"Redis cache SET: {key} (TTL: {ttl_seconds}s)")
|
logger.debug(f"Redis cache SET: {key} (TTL: {ttl_value}s)")
|
||||||
return True
|
return True
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Redis set error for key '{key}': {e}")
|
logger.error(f"Redis set error for key '{key}': {e}")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
async def set_if_not_exists(
|
||||||
|
self,
|
||||||
|
key: str,
|
||||||
|
value: Any,
|
||||||
|
ttl: Optional[int] = None,
|
||||||
|
) -> bool:
|
||||||
|
"""
|
||||||
|
Set value only if key does not exist (SETNX operation)
|
||||||
|
Returns True if the key was set, False if it already existed
|
||||||
|
"""
|
||||||
|
if not self._connected or not self.client:
|
||||||
|
return False
|
||||||
|
|
||||||
|
try:
|
||||||
|
serialized = json.dumps(value, default=str)
|
||||||
|
if ttl:
|
||||||
|
# Use SET with NX and EX options (Redis 2.6.12+)
|
||||||
|
result = await self.client.set(key, serialized, nx=True, ex=ttl)
|
||||||
|
else:
|
||||||
|
result = await self.client.setnx(key, serialized)
|
||||||
|
|
||||||
|
if result:
|
||||||
|
logger.debug(f"Redis cache SETNX SUCCESS: {key} (TTL: {ttl}s)")
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
logger.debug(f"Redis cache SETNX FAILED: {key} (already exists)")
|
||||||
|
return False
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Redis setnx error for key '{key}': {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
async def delete(self, key: str) -> bool:
|
async def delete(self, key: str) -> bool:
|
||||||
"""Delete key from Redis cache"""
|
"""Delete key from Redis cache"""
|
||||||
if not self._connected or not self.client:
|
if not self._connected or not self.client:
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue