290 lines
6.6 KiB
Markdown
290 lines
6.6 KiB
Markdown
# 数据库重复数据问题修复
|
||
|
||
## 问题描述
|
||
|
||
在测试过程中发现两个数据库表存在重复数据问题:
|
||
|
||
### 1. positions 表重复数据
|
||
```
|
||
120192 399 2025-11-29 05:00:00 0.387... 0.907... -5.638e-05 nasa_horizons 2025-11-29 05:24:23.173
|
||
120193 399 2025-11-29 05:00:00 0.387... 0.907... -5.638e-05 nasa_horizons 2025-11-29 05:24:23.175
|
||
```
|
||
|
||
**原因**: 同一个天体在同一时刻有多条位置记录。
|
||
|
||
### 2. nasa_cache 表重复键错误
|
||
```
|
||
duplicate key value violates unique constraint "nasa_cache_pkey"
|
||
Key (cache_key)=(136199:2025-11-29T05:00:00+00:00:2025-11-29T05:00:00+00:00:1h) already exists.
|
||
```
|
||
|
||
**原因**: 尝试插入已存在的缓存键。
|
||
|
||
---
|
||
|
||
## 根本原因
|
||
|
||
### 并发竞态条件
|
||
|
||
当多个请求同时查询相同的时间点时:
|
||
|
||
```
|
||
时间线:
|
||
T1: 请求 A 查询 body_id=399, time=2025-11-29 05:00:00
|
||
T2: 请求 B 查询 body_id=399, time=2025-11-29 05:00:00
|
||
T3: 请求 A 检查数据库 -> 未找到 -> 准备插入
|
||
T4: 请求 B 检查数据库 -> 未找到 -> 准备插入
|
||
T5: 请求 A 插入记录(成功)
|
||
T6: 请求 B 插入记录(冲突!)❌
|
||
```
|
||
|
||
### 原始代码问题
|
||
|
||
#### save_positions (旧版本)
|
||
```python
|
||
# ❌ 问题:直接添加,不检查是否存在
|
||
for pos_data in positions:
|
||
position = Position(...)
|
||
s.add(position) # 可能重复
|
||
await s.commit()
|
||
```
|
||
|
||
#### save_response (旧版本)
|
||
```python
|
||
# ❌ 问题:SELECT + INSERT 不是原子操作
|
||
cache = await s.execute(select(...)).scalar_one_or_none()
|
||
if not cache:
|
||
cache = NasaCache(...)
|
||
s.add(cache) # 可能在 SELECT 和 INSERT 之间被插入
|
||
await s.commit()
|
||
```
|
||
|
||
---
|
||
|
||
## 解决方案
|
||
|
||
使用 PostgreSQL 的 **UPSERT** 操作(`INSERT ... ON CONFLICT`),将检查和插入变为原子操作。
|
||
|
||
### 1. 修复 save_positions
|
||
|
||
**文件**: `backend/app/services/db_service.py`
|
||
|
||
```python
|
||
async def save_positions(...):
|
||
from sqlalchemy.dialects.postgresql import insert
|
||
|
||
for pos_data in positions:
|
||
# 使用 UPSERT
|
||
stmt = insert(Position).values(
|
||
body_id=body_id,
|
||
time=pos_data["time"],
|
||
x=pos_data["x"],
|
||
y=pos_data["y"],
|
||
z=pos_data["z"],
|
||
...
|
||
)
|
||
|
||
# 遇到冲突时更新
|
||
stmt = stmt.on_conflict_do_update(
|
||
index_elements=['body_id', 'time'], # 唯一约束
|
||
set_={
|
||
'x': pos_data["x"],
|
||
'y': pos_data["y"],
|
||
'z': pos_data["z"],
|
||
...
|
||
}
|
||
)
|
||
|
||
await s.execute(stmt)
|
||
```
|
||
|
||
**关键点**:
|
||
- ✅ `on_conflict_do_update` 原子操作
|
||
- ✅ 基于 `(body_id, time)` 唯一约束
|
||
- ✅ 冲突时更新而不是报错
|
||
|
||
---
|
||
|
||
### 2. 修复 save_response
|
||
|
||
**文件**: `backend/app/services/db_service.py`
|
||
|
||
```python
|
||
async def save_response(...):
|
||
from sqlalchemy.dialects.postgresql import insert
|
||
|
||
# 使用 UPSERT
|
||
stmt = insert(NasaCache).values(
|
||
cache_key=cache_key,
|
||
body_id=body_id,
|
||
start_time=start_naive,
|
||
end_time=end_naive,
|
||
step=step,
|
||
data=response_data,
|
||
expires_at=now_naive + timedelta(days=ttl_days)
|
||
)
|
||
|
||
# 遇到冲突时更新
|
||
stmt = stmt.on_conflict_do_update(
|
||
index_elements=['cache_key'], # 主键
|
||
set_={
|
||
'data': response_data,
|
||
'created_at': now_naive,
|
||
'expires_at': now_naive + timedelta(days=ttl_days)
|
||
}
|
||
).returning(NasaCache)
|
||
|
||
result = await s.execute(stmt)
|
||
cache = result.scalar_one()
|
||
```
|
||
|
||
**关键点**:
|
||
- ✅ `on_conflict_do_update` 原子操作
|
||
- ✅ 基于 `cache_key` 主键
|
||
- ✅ 冲突时更新数据和过期时间
|
||
|
||
---
|
||
|
||
## 数据库唯一约束验证
|
||
|
||
确保数据库表有正确的唯一约束:
|
||
|
||
### positions 表
|
||
|
||
```sql
|
||
-- 检查唯一约束
|
||
SELECT constraint_name, constraint_type
|
||
FROM information_schema.table_constraints
|
||
WHERE table_name = 'positions'
|
||
AND constraint_type = 'UNIQUE';
|
||
|
||
-- 如果没有,创建唯一约束
|
||
ALTER TABLE positions
|
||
ADD CONSTRAINT positions_body_time_unique
|
||
UNIQUE (body_id, time);
|
||
```
|
||
|
||
### nasa_cache 表
|
||
|
||
```sql
|
||
-- 检查主键
|
||
SELECT constraint_name, constraint_type
|
||
FROM information_schema.table_constraints
|
||
WHERE table_name = 'nasa_cache'
|
||
AND constraint_type = 'PRIMARY KEY';
|
||
|
||
-- cache_key 应该是主键,已有唯一约束
|
||
```
|
||
|
||
---
|
||
|
||
## 清理现有重复数据
|
||
|
||
执行 SQL 脚本清理重复数据:
|
||
|
||
```bash
|
||
psql -U postgres -d cosmo -f backend/scripts/cleanup_duplicates.sql
|
||
```
|
||
|
||
**脚本功能**:
|
||
1. 删除 positions 表中的重复记录(保留最新的)
|
||
2. 删除 nasa_cache 表中的重复记录(保留最新的)
|
||
3. 验证清理结果
|
||
|
||
---
|
||
|
||
## 验证修复效果
|
||
|
||
### 1. 重启后端服务
|
||
|
||
```bash
|
||
cd backend
|
||
python3 app/main.py
|
||
```
|
||
|
||
### 2. 测试并发请求
|
||
|
||
在两个终端同时执行相同的请求:
|
||
|
||
```bash
|
||
# 终端 1
|
||
curl "http://localhost:8000/api/celestial/positions?start_time=2025-11-29T12:00:00Z&end_time=2025-11-29T12:00:00Z&step=1h"
|
||
|
||
# 终端 2(同时执行)
|
||
curl "http://localhost:8000/api/celestial/positions?start_time=2025-11-29T12:00:00Z&end_time=2025-11-29T12:00:00Z&step=1h"
|
||
```
|
||
|
||
**预期结果**:
|
||
- ✅ 两个请求都成功返回
|
||
- ✅ 没有重复数据错误
|
||
- ✅ 数据库中只有一条记录
|
||
|
||
### 3. 验证数据库
|
||
|
||
```sql
|
||
-- 检查是否还有重复
|
||
SELECT body_id, time, COUNT(*)
|
||
FROM positions
|
||
GROUP BY body_id, time
|
||
HAVING COUNT(*) > 1;
|
||
-- 应返回 0 行
|
||
|
||
SELECT cache_key, COUNT(*)
|
||
FROM nasa_cache
|
||
GROUP BY cache_key
|
||
HAVING COUNT(*) > 1;
|
||
-- 应返回 0 行
|
||
```
|
||
|
||
---
|
||
|
||
## 性能优势
|
||
|
||
### UPSERT vs SELECT + INSERT
|
||
|
||
| 操作 | SELECT + INSERT | UPSERT |
|
||
|------|----------------|--------|
|
||
| 数据库往返次数 | 2 次(SELECT + INSERT) | 1 次 |
|
||
| 锁定时间 | 长(两个操作) | 短(单个操作) |
|
||
| 并发安全 | ❌ 不安全 | ✅ 安全 |
|
||
| 性能 | 慢 | 快 |
|
||
|
||
### 示例
|
||
|
||
假设 10 个并发请求:
|
||
|
||
**旧方法**:
|
||
- 10 个 SELECT(可能都返回 NULL)
|
||
- 10 个 INSERT 尝试(9 个失败)
|
||
- 总数据库操作:20 次
|
||
|
||
**新方法**:
|
||
- 10 个 UPSERT(1 个 INSERT,9 个 UPDATE)
|
||
- 总数据库操作:10 次
|
||
- 性能提升:**50%** ⚡
|
||
|
||
---
|
||
|
||
## 总结
|
||
|
||
### ✅ 已修复
|
||
|
||
1. **positions 表**: 使用 UPSERT 避免重复插入
|
||
2. **nasa_cache 表**: 使用 UPSERT 避免重复插入
|
||
3. **并发安全**: 原子操作避免竞态条件
|
||
4. **性能提升**: 减少数据库往返次数
|
||
|
||
### 🎯 后续建议
|
||
|
||
1. **定期清理**: 每天检查并清理潜在的重复数据
|
||
2. **监控告警**: 监控唯一约束冲突次数
|
||
3. **压力测试**: 测试高并发场景下的数据一致性
|
||
|
||
---
|
||
|
||
**文档版本**: v1.0
|
||
**最后更新**: 2025-11-29
|
||
**相关文件**:
|
||
- `backend/app/services/db_service.py` (修改)
|
||
- `backend/scripts/cleanup_duplicates.sql` (新增)
|