26 KiB
Cosmo 缓存架构设计文档
目录
概述
Cosmo 采用四层缓存架构来优化天体位置数据的加载性能:
- L1: Redis 缓存 - 跨进程共享,重启后端保留
- L2: 内存缓存 - 单进程最快,重启清空
- L3: PostgreSQL positions 表 - 永久存储位置数据
- L4: PostgreSQL nasa_cache 表 - 永久存储 NASA API 原始响应
这种设计确保了:
- ✅ 快速响应:多数请求在 <10ms 内从缓存返回
- ✅ 数据持久化:位置数据永久保存,不会丢失
- ✅ 减少 API 调用:避免频繁请求 NASA Horizons API(限流)
- ✅ 高可用性:任意缓存层失效时自动降级到下一层
缓存层次架构
缓存层详细对比
| 层级 | 存储位置 | TTL | 读取速度 | 持久化 | 适用场景 |
|---|---|---|---|---|---|
| L1: Redis | Redis 内存数据库 | 1h(当前)/ 7天(历史) | ~5ms | ❌ Redis 重启丢失 | 跨进程共享,服务器重启保留 |
| L2: 内存 | Python 进程内存 | 3天 | ~1ms | ❌ 进程重启丢失 | 单进程最快访问 |
| L3: positions | PostgreSQL 表 | 永久 | ~50ms | ✅ 永久保存 | 历史位置数据查询 |
| L4: nasa_cache | PostgreSQL 表 | 7天(软删除) | ~50ms | ✅ 永久保存 | NASA API 响应缓存 |
| 源数据 | NASA Horizons API | - | ~5000ms | - | 最终数据来源(慢) |
缓存 Key 设计
Redis Key 格式
positions:<start_time>:<end_time>:<step>
示例:
positions:now:now:1d- 当前位置(无时间参数)positions:2025-01-01T00:00:00+00:00:2025-01-02T00:00:00+00:00:1d- 历史数据
内存缓存 Key 格式
f"{start_str}_{end_str}_{step}"
首页场景:当前位置数据
用户操作流程
用户打开首页 → 加载当前时刻所有天体位置 → 渲染 3D 场景
API 请求
GET /api/celestial/positions?step=1d
缓存查询链路
graph TD
A[请求到达] --> B{L1: Redis 缓存检查}
B -->|命中| C1[返回数据 ✅]
B -->|未命中| D{L2: 内存缓存检查}
D -->|命中| E1[返回 + 写入 Redis ✅]
D -->|未命中| F{L3: positions 表}
F -->|找到最近24h数据| G1[返回 + 写入 L1+L2 ✅]
F -->|数据不完整| H{L1: Redis 缓存<br/>带时间参数}
H -->|命中| I1[返回 ✅]
H -->|未命中| J{L2: 内存缓存<br/>带时间参数}
J -->|命中| K1[返回 ✅]
J -->|未命中| L{L4: nasa_cache 表}
L -->|找到缓存响应| M1[返回 + 写入 L1+L2 ✅]
L -->|未命中| N{L3: positions 历史数据}
N -->|找到完整数据| O1[返回 + 写入 L1+L2 ✅]
N -->|未命中| P[查询 NASA Horizons API]
P --> Q[保存到 L3+L4<br/>写入 L1+L2]
Q --> R[返回数据 ✅]
style C1 fill:#90EE90
style E1 fill:#90EE90
style G1 fill:#90EE90
style I1 fill:#90EE90
style K1 fill:#90EE90
style M1 fill:#90EE90
style O1 fill:#90EE90
style R fill:#FFD700
style P fill:#FF6B6B
详细流程说明
步骤 1: L1 Redis 检查(routes.py:74-81)
start_str = "now"
end_str = "now"
redis_key = make_cache_key("positions", start_str, end_str, step)
redis_cached = await redis_cache.get(redis_key)
if redis_cached is not None:
logger.info("Cache hit (Redis) for recent positions")
return CelestialDataResponse(bodies=redis_cached)
性能: ~5ms 命中率: 80%(服务运行稳定后)
步骤 2: L2 内存缓存检查(routes.py:83-87)
cached_data = cache_service.get(start_dt, end_dt, step)
if cached_data is not None:
logger.info("Cache hit (Memory) for recent positions")
return CelestialDataResponse(bodies=cached_data)
性能: ~1ms 命中率: 10%(Redis 未命中但进程内有缓存)
步骤 3: L3 positions 表检查(routes.py:89-143)
查询最近 24 小时的位置数据:
now = datetime.utcnow()
recent_window = now - timedelta(hours=24)
for body in all_bodies:
recent_positions = await position_service.get_positions(
body_id=body.id,
start_time=recent_window,
end_time=now,
session=db
)
条件: 如果数据库中有所有天体的最近 24 小时数据 性能: ~100ms 命中率: 5%(首次启动后第二天访问)
写入逻辑:
# 写入 L2 内存
cache_service.set(bodies_data, start_dt, end_dt, step)
# 写入 L1 Redis
await redis_cache.set(redis_key, bodies_data, get_ttl_seconds("current_positions"))
步骤 4-7: 带时间参数的缓存检查(routes.py:148-246)
如果 L3 没有最近 24 小时数据,会尝试:
- 检查 Redis(带时间参数)
- 检查内存缓存(带时间参数)
- 检查 nasa_cache 表(NASA API 响应缓存)
- 检查 positions 表的历史数据
步骤 8: NASA Horizons API 查询(routes.py:248-339)
最后的后备方案,当所有缓存层都未命中时:
for body in all_bodies:
# 查询 NASA Horizons API
pos_data = horizons_service.get_body_positions(body.id, start_dt, end_dt, step)
性能: ~5000ms(20+ 个天体 × 200-300ms/天体) 限流: NASA API 有调用频率限制 命中率: <1%(仅首次启动或数据过期)
数据保存:
- 保存到 nasa_cache 表(7天 TTL)
- 保存到 positions 表(永久)
- 写入 Redis(1小时或7天 TTL)
- 写入内存缓存(3天 TTL)
首次启动 vs 再次启动对比
场景 A: 首次启动后端服务
用户请求
↓
L1 Redis ❌ 未命中
↓
L2 内存 ❌ 未命中
↓
L3 positions ❌ 未命中(空表)
↓
L4 nasa_cache ❌ 未命中(空表)
↓
🔴 查询 NASA API (~5秒)
↓
保存到所有层
↓
返回数据
总耗时: ~5秒
场景 B: 重启后端(Redis 未重启)
用户请求
↓
L1 Redis ✅ 命中(数据仍在)
↓
返回数据
总耗时: ~5ms
场景 C: 重启后端 + Redis(数据库有数据)
用户请求
↓
L1 Redis ❌ 未命中
↓
L2 内存 ❌ 未命中
↓
L3 positions ✅ 命中(有昨天的数据)
↓
返回 + 写入 L1+L2
总耗时: ~100ms
时间轴场景:历史位置数据
用户操作流程
用户点击"时间轴"按钮
↓
进入时间轴模式(默认显示 30 天前)
↓
拖动时间轴滑块
↓
每次拖动触发新的 API 请求
↓
加载该时刻的天体位置
↓
渲染 3D 场景
前端实现
1. 时间轴组件(TimelineController.tsx)
功能:
- 时间范围:过去 90 天到现在
- 播放速度:1x, 7x, 30x, 365x(天/秒)
- 交互:拖动滑块、播放/暂停、重置
关键代码:
// App.tsx:104
<TimelineController
onTimeChange={handleTimeChange}
minDate={new Date(Date.now() - 90 * 24 * 60 * 60 * 1000)} // 90 days ago
maxDate={new Date()}
/>
2. 数据加载钩子(useHistoricalData.ts)
每次时间变化触发新请求:
const loadHistoricalData = useCallback(async (date: Date) => {
const startDate = new Date(date);
const endDate = new Date(date);
endDate.setDate(endDate.getDate() + 1); // 1天范围
const data = await fetchCelestialPositions(
startDate.toISOString(),
endDate.toISOString(),
'1d'
);
setBodies(data.bodies);
}, []);
useEffect(() => {
if (selectedDate) {
loadHistoricalData(selectedDate);
}
}, [selectedDate, loadHistoricalData]);
API 请求示例
用户拖动到 2025-01-15:
GET /api/celestial/positions?start_time=2025-01-15T00:00:00Z&end_time=2025-01-16T00:00:00Z&step=1d
缓存查询链路
graph TD
A[时间轴请求<br/>带 start_time + end_time] --> B{L1: Redis 缓存}
B -->|命中| C1[返回数据 ✅<br/>~5ms]
B -->|未命中| D{L2: 内存缓存}
D -->|命中| E1[返回 + 写入 Redis ✅<br/>~1ms]
D -->|未命中| F{L4: nasa_cache 表}
F -->|找到缓存| G1[返回 + 写入 L1+L2 ✅<br/>~50ms]
F -->|未命中| H{L3: positions 表}
H -->|找到完整数据| I1[返回 + 写入 L1+L2 ✅<br/>~100ms]
H -->|数据不完整| J[查询 NASA API<br/>~5000ms]
J --> K[保存到 L3+L4+L1+L2]
K --> L[返回数据 ✅]
style C1 fill:#90EE90
style E1 fill:#90EE90
style G1 fill:#90EE90
style I1 fill:#90EE90
style L fill:#FFD700
style J fill:#FF6B6B
详细流程说明
步骤 1: L1 Redis 检查(routes.py:148-155)
start_str = start_dt.isoformat() if start_dt else "now"
end_str = end_dt.isoformat() if end_dt else "now"
redis_key = make_cache_key("positions", start_str, end_str, step)
redis_cached = await redis_cache.get(redis_key)
if redis_cached is not None:
logger.info("Cache hit (Redis) for positions")
return CelestialDataResponse(bodies=redis_cached)
Redis Key 示例:
positions:2025-01-15T00:00:00+00:00:2025-01-16T00:00:00+00:00:1d
TTL: 7天(历史数据) 命中率: 高(拖动时间轴时重复访问相同日期)
步骤 2: L2 内存缓存检查(routes.py:157-161)
cached_data = cache_service.get(start_dt, end_dt, step)
if cached_data is not None:
logger.info("Cache hit (Memory) for positions")
return CelestialDataResponse(bodies=cached_data)
步骤 3: L4 nasa_cache 表检查(routes.py:163-195)
for body in all_bodies:
cached_response = await nasa_cache_service.get_cached_response(
body.id, start_dt, end_dt, step, db
)
表结构:
CREATE TABLE nasa_cache (
body_id TEXT,
start_time TIMESTAMP,
end_time TIMESTAMP,
step TEXT,
response_data JSONB,
created_at TIMESTAMP,
expires_at TIMESTAMP
);
TTL: 7天(软删除) 命中率: 中等(依赖于之前是否查询过该时间段)
步骤 4: L3 positions 表检查(routes.py:197-246)
for body in all_bodies:
positions = await position_service.get_positions(
body_id=body.id,
start_time=start_dt_naive,
end_time=end_dt_naive,
session=db
)
表结构:
CREATE TABLE positions (
id SERIAL PRIMARY KEY,
body_id TEXT,
time TIMESTAMP,
x DOUBLE PRECISION,
y DOUBLE PRECISION,
z DOUBLE PRECISION,
vx DOUBLE PRECISION,
vy DOUBLE PRECISION,
vz DOUBLE PRECISION,
source TEXT,
created_at TIMESTAMP
);
索引: (body_id, time) - 优化时间范围查询
步骤 5: NASA Horizons API 查询(routes.py:248-339)
当所有缓存层都未命中时,查询 NASA API:
for body in all_bodies:
pos_data = horizons_service.get_body_positions(body.id, start_dt, end_dt, step)
# 保存到 nasa_cache 表
await nasa_cache_service.save_response(
body_id=body_id,
start_time=start_dt,
end_time=end_dt,
step=step,
response_data={"positions": positions},
ttl_days=7,
session=db
)
# 保存到 positions 表
await position_service.save_positions(
body_id=body_id,
positions=position_records,
source="nasa_horizons",
session=db
)
时间轴性能问题
🚨 当前问题
场景: 用户快速拖动时间轴 结果: 每次拖动触发新的 API 请求
示例:
用户拖动 2025-01-01 → 2025-01-02 → 2025-01-03 → ...
每次触发请求:
- 2025-01-01: 查询 NASA API (~5秒)
- 2025-01-02: 查询 NASA API (~5秒)
- 2025-01-03: 查询 NASA API (~5秒)
问题:
- ⚠️ 性能差: 每次拖动等待 5 秒
- ⚠️ API 限流: 可能触发 NASA Horizons 限流
- ⚠️ 用户体验差: 卡顿,无法流畅播放
✅ 优化方案
方案 1: 预加载时间范围数据
实现: 打开时间轴时,一次性加载整个 90 天范围的数据
# 新增 API 端点
@router.get("/positions/range")
async def get_positions_range(
start_time: str,
end_time: str,
step: str = "1d"
):
"""一次性返回整个时间范围的数据"""
# 查询 90 天 × 20 天体 = 1800 个位置点
# 压缩后约 100KB JSON
优点:
- ✅ 用户体验流畅,无卡顿
- ✅ 减少 API 调用次数
缺点:
- ⚠️ 首次加载慢(~10-20秒)
- ⚠️ 内存占用较大(前端需要缓存 1800 个位置)
方案 2: 请求防抖 + 后台预取
实现:
- 防抖:拖动停止后 300ms 再请求
- 预取:后台预加载前后几天的数据
// 防抖
const debouncedLoadData = useMemo(
() => debounce(loadHistoricalData, 300),
[loadHistoricalData]
);
// 预取
const prefetchNearbyDates = async (date: Date) => {
for (let offset = -3; offset <= 3; offset++) {
const targetDate = new Date(date);
targetDate.setDate(targetDate.getDate() + offset);
// 后台预取
fetchCelestialPositions(targetDate, ...);
}
};
优点:
- ✅ 平衡性能和体验
- ✅ 减少不必要的请求
缺点:
- ⚠️ 实现复杂度较高
方案 3: 后台定时预热(推荐)
实现: 后端定时任务每天预加载过去 90 天的数据
# 定时任务(每天凌晨执行)
@scheduler.scheduled_job('cron', hour=2)
async def preheat_historical_data():
"""预热过去 90 天的数据"""
end_date = datetime.utcnow()
start_date = end_date - timedelta(days=90)
for body in all_bodies:
# 以 1 天为步长查询
horizons_service.get_body_positions(
body.id, start_date, end_date, "1d"
)
# 保存到数据库
优点:
- ✅ 用户首次访问即可快速加载
- ✅ 完全避免 API 调用
- ✅ 无需修改前端逻辑
缺点:
- ⚠️ 需要定时任务框架(APScheduler / Celery)
- ⚠️ 增加数据库存储(约 1800 条记录/天)
数据持久化策略
数据库表设计
1. celestial_bodies 表
CREATE TABLE celestial_bodies (
id TEXT PRIMARY KEY, -- JPL Horizons ID
name TEXT NOT NULL, -- 英文名
name_zh TEXT, -- 中文名
type TEXT NOT NULL, -- planet/probe/star/dwarf_planet
description TEXT,
extra_data JSONB, -- 额外数据(launch_date, status等)
created_at TIMESTAMP DEFAULT NOW()
);
数据来源: 代码硬编码(celestial.py:52-202)
2. positions 表(永久存储)
CREATE TABLE positions (
id SERIAL PRIMARY KEY,
body_id TEXT REFERENCES celestial_bodies(id),
time TIMESTAMP WITHOUT TIME ZONE NOT NULL,
x DOUBLE PRECISION NOT NULL, -- AU
y DOUBLE PRECISION NOT NULL, -- AU
z DOUBLE PRECISION NOT NULL, -- AU
vx DOUBLE PRECISION, -- AU/day (可选)
vy DOUBLE PRECISION,
vz DOUBLE PRECISION,
source TEXT, -- 'nasa_horizons' / 'manual'
created_at TIMESTAMP DEFAULT NOW(),
UNIQUE(body_id, time)
);
CREATE INDEX idx_positions_body_time ON positions(body_id, time);
写入时机:
- 查询 NASA API 后立即保存(routes.py:313-320)
- 数据永久保留,不会自动删除
查询优化:
- 索引
(body_id, time)加速时间范围查询 - 分区表(可选):按月分区,优化大数据查询
3. nasa_cache 表(API 响应缓存)
CREATE TABLE nasa_cache (
id SERIAL PRIMARY KEY,
body_id TEXT REFERENCES celestial_bodies(id),
start_time TIMESTAMP,
end_time TIMESTAMP,
step TEXT,
response_data JSONB NOT NULL, -- 完整的 NASA API 响应
created_at TIMESTAMP DEFAULT NOW(),
expires_at TIMESTAMP, -- TTL 过期时间
UNIQUE(body_id, start_time, end_time, step)
);
CREATE INDEX idx_nasa_cache_expires ON nasa_cache(expires_at);
写入时机:
- 查询 NASA API 后保存原始响应(routes.py:283-291)
TTL 策略:
- 默认 7 天
- 软删除:定时任务清理过期数据
清理任务:
@scheduler.scheduled_job('cron', hour=3)
async def cleanup_expired_cache():
"""清理过期的 NASA API 缓存"""
await db.execute(
"DELETE FROM nasa_cache WHERE expires_at < NOW()"
)
4. static_data 表(星座、星系等)
CREATE TABLE static_data (
id SERIAL PRIMARY KEY,
category TEXT NOT NULL, -- 'star', 'constellation', 'galaxy', 'nebula'
name TEXT NOT NULL,
name_zh TEXT,
data JSONB NOT NULL, -- 具体数据(坐标、亮度等)
created_at TIMESTAMP DEFAULT NOW()
);
CREATE INDEX idx_static_category ON static_data(category);
5. resources 表(纹理、模型等资源)
CREATE TABLE resources (
id SERIAL PRIMARY KEY,
body_id TEXT REFERENCES celestial_bodies(id),
resource_type TEXT NOT NULL, -- 'texture', 'model', 'icon', 'thumbnail'
file_path TEXT NOT NULL,
file_size BIGINT,
mime_type TEXT,
created_at TIMESTAMP DEFAULT NOW()
);
CREATE INDEX idx_resources_body ON resources(body_id);
数据完整性保证
1. 外键约束
ALTER TABLE positions
ADD CONSTRAINT fk_positions_body
FOREIGN KEY (body_id) REFERENCES celestial_bodies(id)
ON DELETE CASCADE;
2. 唯一约束
-- 防止重复插入相同时刻的位置
UNIQUE(body_id, time)
3. 数据迁移
# 导出数据
pg_dump cosmo > backup.sql
# 导入数据
psql cosmo < backup.sql
性能优化建议
1. 启动时预热缓存 ⭐⭐⭐
目的: 避免首次访问查询 NASA API
实现:
# app/main.py
async def preheat_cache_on_startup():
"""启动时预热缓存"""
logger.info("Preheating cache...")
# 1. 从数据库加载最近 24 小时的数据到 Redis
async for db in get_db():
bodies = await celestial_body_service.get_all_bodies(db)
now = datetime.utcnow()
recent_window = now - timedelta(hours=24)
all_positions = []
for body in bodies:
positions = await position_service.get_positions(
body.id, recent_window, now, db
)
if positions:
all_positions.append({
"id": body.id,
"name": body.name,
"positions": [...],
})
# 写入 Redis
if all_positions:
redis_key = make_cache_key("positions", "now", "now", "1d")
await redis_cache.set(redis_key, all_positions, 3600)
logger.info(f"✓ Preheated cache with {len(all_positions)} bodies")
break
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
await redis_cache.connect()
await preheat_cache_on_startup() # 新增
yield
# Shutdown
await redis_cache.disconnect()
效果:
- ✅ 首次访问从 5 秒降至 5ms
- ✅ 用户体验大幅提升
2. 定时更新位置数据 ⭐⭐⭐
目的: 保证数据库中始终有最新的 24 小时数据
实现:
# app/scheduler.py
from apscheduler.schedulers.asyncio import AsyncIOScheduler
scheduler = AsyncIOScheduler()
@scheduler.scheduled_job('interval', hours=1)
async def update_current_positions():
"""每小时更新一次当前位置"""
logger.info("Running scheduled position update...")
async for db in get_db():
bodies = await celestial_body_service.get_all_bodies(db)
now = datetime.utcnow()
for body in bodies:
try:
# 查询 NASA API
positions = horizons_service.get_body_positions(
body.id, now, now, "1d"
)
# 保存到数据库
await position_service.save_positions(
body.id, positions, "nasa_horizons", db
)
except Exception as e:
logger.error(f"Failed to update {body.name}: {e}")
logger.info("✓ Position update completed")
break
# 启动调度器
scheduler.start()
配置:
# app/main.py
from app.scheduler import scheduler
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
scheduler.start()
yield
# Shutdown
scheduler.shutdown()
3. 历史数据预热(时间轴优化)⭐⭐
目的: 加速时间轴模式的数据加载
实现:
@scheduler.scheduled_job('cron', hour=2, minute=0)
async def preheat_historical_data():
"""每天凌晨预热过去 90 天的数据"""
logger.info("Preheating historical data...")
async for db in get_db():
end_date = datetime.utcnow()
start_date = end_date - timedelta(days=90)
bodies = await celestial_body_service.get_all_bodies(db)
for body in bodies:
try:
# 查询 90 天数据(step=1d,共 90 个点)
positions = horizons_service.get_body_positions(
body.id, start_date, end_date, "1d"
)
# 保存到数据库
await position_service.save_positions(
body.id, positions, "nasa_horizons", db
)
logger.info(f"✓ Preheated {body.name}: {len(positions)} points")
except Exception as e:
logger.error(f"Failed to preheat {body.name}: {e}")
break
效果:
- ✅ 时间轴拖动时从数据库读取(100ms)而非 API(5秒)
- ✅ 用户体验流畅
4. Redis 持久化配置 ⭐
目的: 防止 Redis 重启导致缓存丢失
配置 (redis.conf):
# RDB 快照
save 900 1 # 900秒内至少1个key变化,保存
save 300 10 # 300秒内至少10个key变化,保存
save 60 10000 # 60秒内至少10000个key变化,保存
# AOF 日志(可选,更安全但性能略低)
appendonly yes
appendfsync everysec
5. 数据库查询优化 ⭐⭐
索引优化
-- 复合索引加速时间范围查询
CREATE INDEX idx_positions_body_time ON positions(body_id, time DESC);
-- 覆盖索引(包含查询所需的所有列)
CREATE INDEX idx_positions_cover
ON positions(body_id, time, x, y, z)
WHERE source = 'nasa_horizons';
分区表(大数据量优化)
-- 按月分区
CREATE TABLE positions (
...
) PARTITION BY RANGE (time);
CREATE TABLE positions_2025_01 PARTITION OF positions
FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');
CREATE TABLE positions_2025_02 PARTITION OF positions
FOR VALUES FROM ('2025-02-01') TO ('2025-03-01');
6. 前端缓存优化 ⭐
浏览器 LocalStorage 缓存:
// 缓存静态数据(星座、星系等)
const cachedData = localStorage.getItem('static_data');
if (cachedData && Date.now() - cachedData.timestamp < 7 * 86400000) {
return JSON.parse(cachedData.data);
}
Service Worker 缓存:
// 缓存 API 响应
self.addEventListener('fetch', (event) => {
if (event.request.url.includes('/api/celestial/positions')) {
event.respondWith(
caches.match(event.request).then((response) => {
return response || fetch(event.request);
})
);
}
});
监控和告警
1. 缓存命中率监控
实现:
from prometheus_client import Counter, Histogram
cache_hits = Counter('cache_hits_total', 'Cache hits', ['layer'])
cache_misses = Counter('cache_misses_total', 'Cache misses', ['layer'])
api_latency = Histogram('api_latency_seconds', 'API latency')
# 在缓存检查时记录
if redis_cached:
cache_hits.labels(layer='redis').inc()
else:
cache_misses.labels(layer='redis').inc()
指标:
- Redis 命中率 > 80%
- 内存命中率 > 10%
- 数据库命中率 > 5%
- API 调用次数 < 100/天
2. 性能监控
关键指标:
# API 响应时间
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
start_time = time.time()
response = await call_next(request)
process_time = time.time() - start_time
response.headers["X-Process-Time"] = str(process_time)
# 记录慢查询
if process_time > 1.0:
logger.warning(f"Slow request: {request.url} took {process_time}s")
return response
告警规则:
- 平均响应时间 > 500ms → 检查缓存失效
- P95 响应时间 > 2s → 检查数据库性能
- API 错误率 > 1% → 检查 NASA API 状态
3. 数据完整性监控
定时检查:
@scheduler.scheduled_job('cron', hour=6)
async def check_data_integrity():
"""检查数据完整性"""
async for db in get_db():
# 检查是否所有天体都有最近 24 小时的数据
bodies = await celestial_body_service.get_all_bodies(db)
now = datetime.utcnow()
recent_window = now - timedelta(hours=24)
missing_bodies = []
for body in bodies:
positions = await position_service.get_positions(
body.id, recent_window, now, db
)
if not positions:
missing_bodies.append(body.name)
if missing_bodies:
logger.error(f"Missing recent data for: {missing_bodies}")
# 发送告警邮件/Slack通知
break
总结
架构优势
✅ 多层缓存: Redis → 内存 → 数据库 → API ✅ 高性能: 80%+ 请求在 10ms 内完成 ✅ 高可用: 任意层失效自动降级 ✅ 数据持久化: 位置数据永久保存 ✅ 可扩展: 支持水平扩展(多 Redis 节点)
待优化项
⚠️ 启动时预热缓存 ⚠️ 定时更新位置数据 ⚠️ 历史数据预热(时间轴优化) ⚠️ Redis 持久化配置 ⚠️ 监控和告警系统
性能对比
| 场景 | 优化前 | 优化后 | 提升 |
|---|---|---|---|
| 首次启动 | 5秒 | 5秒 | - |
| 再次启动(Redis 在) | 5秒 | 5ms | 1000x |
| 再次启动(Redis 重启) | 5秒 | 100ms | 50x |
| 时间轴拖动 | 5秒/次 | 5ms/次 | 1000x |
| 时间轴播放(90天) | 450秒 | 0.5秒 | 900x |
文档版本: v1.0 最后更新: 2025-11-29 维护者: Cosmo Team