main
mula.liu 2025-12-27 17:38:20 +08:00
parent 488717ffa1
commit 3c16839fa5
10 changed files with 321 additions and 400 deletions

BIN
.DS_Store vendored

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 18 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 80 KiB

View File

@ -0,0 +1,177 @@
# 生产环境数据库升级指南
## 概述
此升级脚本包含以下变更:
1. 在 `celestial_bodies` 表增加 `short_name` 字段
2. 完整导入 `menus``role_menus`
3. 清空 `celestial_events` 表(将由定时任务重新生成)
4. 完整导入 `scheduled_jobs`
5. 导入/更新 `system_settings`
6. 保留 `user_follows` 表的现有数据
## 升级前准备
### 1. 备份数据库
```bash
# 在生产服务器上执行
pg_dump -U postgres -d cosmo_db > backup_$(date +%Y%m%d_%H%M%S).sql
```
### 2. 测试升级脚本(推荐)
```bash
# 在测试环境先运行
psql -U postgres -d cosmo_db_test < upgrade_production.sql
```
## 执行升级
### 方式1直接执行SQL文件
```bash
psql -U postgres -d cosmo_db < upgrade_production.sql
```
### 方式2通过Docker容器执行
```bash
docker cp upgrade_production.sql <container_name>:/tmp/
docker exec -it <container_name> psql -U postgres -d cosmo_db -f /tmp/upgrade_production.sql
```
### 方式3交互式执行推荐便于观察
```bash
psql -U postgres -d cosmo_db
\i upgrade_production.sql
```
## 升级后验证
脚本会自动输出验证信息,检查以下内容:
1. **celestial_bodies.short_name 字段**:应该存在
2. **menus 数量**:应该是 14 条
3. **role_menus 数量**:应该是 16 条
4. **scheduled_jobs 数量**:应该是 2 条
5. **system_settings 数量**:应该至少 3 条
### 手动验证命令
```sql
-- 检查 short_name 字段
\d celestial_bodies
-- 检查菜单数据
SELECT id, name, title, path FROM menus ORDER BY parent_id NULLS FIRST, sort_order;
-- 检查角色菜单关联
SELECT r.name as role, m.title as menu
FROM role_menus rm
JOIN roles r ON rm.role_id = r.id
JOIN menus m ON rm.menu_id = m.id
ORDER BY r.name, m.sort_order;
-- 检查定时任务
SELECT id, name, is_active, predefined_function FROM scheduled_jobs;
-- 检查系统设置
SELECT key, value, value_type FROM system_settings;
```
## 升级详情
### 1. celestial_bodies 表升级
- 增加 `short_name VARCHAR(50)` 字段
- 如果字段已存在,则跳过
### 2. menus 和 role_menus 导入
- **重要**:会清空现有菜单数据
- 导入 14 条菜单记录
- 导入 16 条角色-菜单关联记录
- 管理员可访问所有菜单
- 普通用户只能访问:个人资料、我的天体
### 3. celestial_events 清空
- 清空所有现有天体事件
- 数据会由定时任务 `calculate_planetary_events` 自动重新生成
### 4. scheduled_jobs 导入
导入2个定时任务
- **每日更新天体位置数据**(已禁用)
- Cron: `0 2 * * *`每天凌晨2点
- 可通过后台管理界面手动执行
- **获取主要天体事件**(已启用)
- Cron: `0 3 1 * *`每月1日凌晨3点
- 自动计算未来一年的天文事件
### 5. system_settings 导入
导入3个系统设置
- `view_mode`: solar默认视图模式
- `nasa_api_timeout`: 120NASA API超时时间
- `auto_download_positions`: False自动下载位置数据开关
使用 `ON CONFLICT` 策略,如果键已存在则更新值。
### 6. user_follows 保留
- **不会修改此表**
- 保留所有用户关注数据
## 回滚方案
如果升级失败,使用备份恢复:
```bash
# 方式1完整恢复
psql -U postgres -d cosmo_db < backup_YYYYMMDD_HHMMSS.sql
# 方式2选择性回滚
# 如果只是某些表有问题,可以只恢复特定表
pg_restore -U postgres -d cosmo_db -t menus -t role_menus backup.dump
```
## 注意事项
1. **事务安全**:整个脚本在一个事务中执行,失败会自动回滚
2. **外键约束**menus 表有自引用外键,脚本已处理
3. **数据清空**menus、role_menus、celestial_events、scheduled_jobs 会被清空
4. **用户数据**user_follows 不会被修改
5. **定时任务**:位置数据下载任务默认禁用,需要手动执行或启用
## 升级后操作
1. **重启应用服务**
```bash
# 重启后端服务
systemctl restart cosmo-backend
# 或 docker restart cosmo-backend
```
2. **手动执行位置数据下载**(如需要)
- 登录后台管理系统
- 进入"定时任务设置"
- 找到"每日更新天体位置数据"
- 点击"立即执行"
3. **验证前端功能**
- 登录系统
- 检查菜单是否正确显示
- 测试个人资料页面
- 测试我的天体页面
## 常见问题
### Q: 升级过程中断怎么办?
A: 由于使用了事务,中断会自动回滚。使用备份重新开始。
### Q: 如何只导入某个表?
A: 从脚本中复制对应表的部分,单独执行。
### Q: 线上已有自定义菜单怎么办?
A: 脚本会清空菜单,请在升级前导出自定义菜单,升级后手动添加。
### Q: 定时任务什么时候开始执行?
A: 天体事件任务会在下个月1日凌晨3点执行。位置数据任务需手动启用或执行。
## 联系支持
如遇问题,请检查:
1. 数据库日志
2. 应用程序日志
3. 脚本执行输出

View File

@ -0,0 +1,144 @@
-- ============================================================
-- Production Database Upgrade Script
-- ============================================================
-- This script upgrades the production database with the following changes:
-- 1. Add short_name to celestial_bodies
-- 2. Import menus and role_menus
-- 3. Import celestial_events
-- 4. Import scheduled_jobs
-- 5. Import system_settings
-- 6. Import user_follows
--
-- IMPORTANT: Run this script in a transaction and test on a backup first!
-- ============================================================
BEGIN;
-- ============================================================
-- 1. Add short_name column to celestial_bodies
-- ============================================================
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'celestial_bodies'
AND column_name = 'short_name'
) THEN
ALTER TABLE celestial_bodies ADD COLUMN short_name VARCHAR(50);
RAISE NOTICE 'Added short_name column to celestial_bodies';
ELSE
RAISE NOTICE 'short_name column already exists';
END IF;
END $$;
-- ============================================================
-- 2. Import menus and role_menus
-- ============================================================
-- Clear existing menus (will cascade to role_menus due to foreign key)
TRUNCATE TABLE menus CASCADE;
RAISE NOTICE 'Cleared existing menus and role_menus';
-- Disable triggers temporarily to handle circular foreign keys
ALTER TABLE menus DISABLE TRIGGER ALL;
-- Insert menus (parent menus first, then child menus)
INSERT INTO menus (id, parent_id, name, title, icon, path, component, sort_order, is_active, description, created_at, updated_at) VALUES
(1, NULL, 'dashboard', '控制台', 'dashboard', '/admin/dashboard', 'admin/Dashboard', 1, true, '系统控制台', '2025-11-28 18:07:11.767382', '2025-11-28 18:07:11.767382'),
(2, NULL, 'data_management', '数据管理', 'database', '', '', 2, true, '数据管理模块', '2025-11-28 18:07:11.767382', '2025-11-28 18:07:11.767382'),
(6, NULL, 'platform_management', '平台管理', 'settings', '', '', 3, true, '管理用户和系统参数', '2025-11-29 19:03:08.776597', '2025-11-29 19:03:08.776597'),
(14, NULL, 'user_profile', '个人资料', 'profile', '/user/profile', 'user/Profile', 1, true, '个人资料管理', '2025-12-18 16:26:11.778475', '2025-12-18 16:26:11.778475'),
(15, NULL, 'user_follow', '我的天体', 'star', '/user/follow', 'user/UserFollow', 2, true, '我关注的天体', '2025-12-18 16:27:48.688747', '2025-12-18 16:27:48.688747'),
(11, 2, 'star_systems', '恒星系统管理', 'StarOutlined', '/admin/star-systems', 'StarSystems', 1, true, '管理太阳系和系外恒星系统', '2025-12-06 02:35:21.137234', '2025-12-06 02:35:21.137234'),
(3, 2, 'celestial_bodies', '天体数据管理', NULL, '/admin/celestial-bodies', 'admin/CelestialBodies', 2, true, '查看和管理天体数据', '2025-11-28 18:07:11.767382', '2025-11-28 18:07:11.767382'),
(4, 2, 'static_data', '静态数据管理', NULL, '/admin/static-data', 'admin/StaticData', 2, true, '查看和管理静态数据(星座、星系等)', '2025-11-28 18:07:11.767382', '2025-11-28 18:07:11.767382'),
(5, 2, 'nasa_data', 'Horizon数据下载', NULL, '/admin/nasa-data', 'admin/NasaData', 3, true, '管理NASA Horizons数据下载', '2025-11-28 18:07:11.767382', '2025-11-28 18:07:11.767382'),
(13, 2, 'celestial_events', '天体事件', 'CalendarOutlined', '/admin/celestial-events', '', 4, true, '', '2025-12-15 03:20:39.798021', '2025-12-15 03:20:39.798021'),
(7, 6, 'user_management', '用户管理', NULL, '/admin/users', 'admin/Users', 1, true, '管理系统用户账号', '2025-11-29 19:03:08.776597', '2025-11-29 19:03:08.776597'),
(8, 6, 'platform_parameters_management', '平台参数管理', NULL, '/admin/settings', 'admin/Settings', 2, true, '管理系统通用配置参数', '2025-11-29 19:03:08.776597', '2025-11-29 19:03:08.776597'),
(12, 6, 'scheduled_jobs', '定时任务设置', 'ClockCircleOutlined', '/admin/scheduled-jobs', 'admin/ScheduledJobs', 5, true, '管理系统定时任务及脚本', '2025-12-10 17:42:38.031518', '2025-12-10 17:42:38.031518'),
(10, 6, 'system_tasks', '系统任务监控', 'schedule', '/admin/tasks', 'admin/Tasks', 30, true, '', '2025-11-30 16:04:59.572869', '2025-11-30 16:04:59.572869');
-- Re-enable triggers
ALTER TABLE menus ENABLE TRIGGER ALL;
RAISE NOTICE 'Imported menus data';
-- Reset sequence for menus
SELECT setval('menus_id_seq', (SELECT MAX(id) FROM menus));
-- Insert role_menus
INSERT INTO role_menus (role_id, menu_id) VALUES
-- Admin role (role_id = 1) has access to all menus
(1, 1), (1, 2), (1, 3), (1, 4), (1, 5), (1, 6), (1, 7), (1, 8), (1, 10), (1, 11), (1, 12), (1, 13), (1, 14), (1, 15),
-- User role (role_id = 2) has access to user menus only
(2, 14), (2, 15);
RAISE NOTICE 'Imported role_menus data';
-- ============================================================
-- 3. Import celestial_events (will be truncated and re-imported)
-- ============================================================
TRUNCATE TABLE celestial_events;
RAISE NOTICE 'Cleared existing celestial_events (data will be regenerated by scheduled jobs)';
-- ============================================================
-- 4. Import scheduled_jobs
-- ============================================================
-- Clear existing scheduled_jobs
TRUNCATE TABLE scheduled_jobs CASCADE;
RAISE NOTICE 'Cleared existing scheduled_jobs';
-- Insert scheduled_jobs
INSERT INTO scheduled_jobs (id, name, cron_expression, python_code, is_active, last_run_at, last_run_status, next_run_at, description, created_at, updated_at, job_type, predefined_function, function_params) VALUES
(1, '每日更新天体位置数据', '0 2 * * *', NULL, false, NULL, NULL, NULL, '每天凌晨2点自动从NASA Horizons下载主要天体的位置数据', '2025-12-10 17:43:01.234567', '2025-12-10 17:43:01.234567', 'predefined', 'download_positions_task', '{"body_ids": ["10", "199", "299", "399", "301", "499", "599", "699", "799", "899"], "days_range": "3"}'),
(2, '获取主要天体的食、合、冲等事件', '0 3 1 * *', NULL, true, NULL, NULL, NULL, '每月1日凌晨3点计算未来一年的主要天文事件', '2025-12-10 17:43:01.234567', '2025-12-10 17:43:01.234567', 'predefined', 'calculate_planetary_events', '{"body_ids": ["199", "299", "499", "599", "699", "799", "899"], "days_ahead": "365", "clean_old_events": true, "threshold_degrees": "5", "calculate_close_approaches": true}');
-- Reset sequence
SELECT setval('scheduled_jobs_id_seq', (SELECT MAX(id) FROM scheduled_jobs));
RAISE NOTICE 'Imported scheduled_jobs data';
-- ============================================================
-- 5. Import system_settings
-- ============================================================
-- Use INSERT ... ON CONFLICT to avoid duplicates
INSERT INTO system_settings (key, value, value_type, category, label, description, is_public, created_at, updated_at) VALUES
('view_mode', 'solar', 'string', 'ui', '默认视图模式', '系统默认的3D场景视图模式solar或galaxy', true, NOW(), NOW()),
('nasa_api_timeout', '120', 'int', 'api', 'NASA API超时时间', 'NASA Horizons API请求超时时间', false, NOW(), NOW()),
('auto_download_positions', 'False', 'bool', 'system', '自动下载位置数据', '当位置数据不存在时是否自动从NASA Horizons下载', false, NOW(), NOW())
ON CONFLICT (key) DO UPDATE SET
value = EXCLUDED.value,
value_type = EXCLUDED.value_type,
category = EXCLUDED.category,
label = EXCLUDED.label,
description = EXCLUDED.description,
is_public = EXCLUDED.is_public,
updated_at = NOW();
RAISE NOTICE 'Imported/updated system_settings data';
-- ============================================================
-- 6. Import user_follows (keep existing data, don't truncate)
-- ============================================================
-- Note: user_follows should retain existing production data
-- This section is intentionally left empty to preserve user data
RAISE NOTICE 'Skipped user_follows import (preserving existing user data)';
-- ============================================================
-- Commit transaction
-- ============================================================
COMMIT;
-- ============================================================
-- Verification queries
-- ============================================================
\echo '============================================================'
\echo 'Upgrade completed successfully!'
\echo '============================================================'
\echo 'Verification:'
SELECT 'celestial_bodies.short_name exists:' as check,
EXISTS(SELECT 1 FROM information_schema.columns WHERE table_name='celestial_bodies' AND column_name='short_name') as result;
SELECT 'menus count:' as check, COUNT(*) as result FROM menus;
SELECT 'role_menus count:' as check, COUNT(*) as result FROM role_menus;
SELECT 'scheduled_jobs count:' as check, COUNT(*) as result FROM scheduled_jobs;
SELECT 'system_settings count:' as check, COUNT(*) as result FROM system_settings;
\echo '============================================================'

View File

@ -1,42 +0,0 @@
"""
Test NASA SBDB API body parameter format
"""
import asyncio
import httpx
async def test_body_param():
"""Test different body parameter formats"""
test_cases = [
("Earth (name)", "Earth"),
("399 (Horizons ID)", "399"),
("Mars (name)", "Mars"),
("499 (Mars Horizons ID)", "499"),
]
for name, body_value in test_cases:
params = {
"date-min": "2025-12-15",
"date-max": "2025-12-16",
"body": body_value,
"limit": "1"
}
try:
async with httpx.AsyncClient(timeout=10.0, proxies={}) as client:
response = await client.get(
"https://ssd-api.jpl.nasa.gov/cad.api",
params=params
)
if response.status_code == 200:
data = response.json()
count = data.get("count", 0)
print(f"{name:30} -> 返回 {count:3} 个结果 ✓")
else:
print(f"{name:30} -> HTTP {response.status_code}")
except Exception as e:
print(f"{name:30} -> 错误: {e}")
if __name__ == "__main__":
asyncio.run(test_body_param())

View File

@ -1,51 +0,0 @@
"""
Test NASA SBDB service directly
"""
import asyncio
from datetime import datetime, timedelta
from app.services.nasa_sbdb_service import nasa_sbdb_service
async def test_nasa_sbdb():
"""Test NASA SBDB API directly"""
# Calculate date range
date_min = datetime.utcnow().strftime("%Y-%m-%d")
date_max = (datetime.utcnow() + timedelta(days=365)).strftime("%Y-%m-%d")
print(f"Querying NASA SBDB for close approaches...")
print(f"Date range: {date_min} to {date_max}")
print(f"Max distance: 1.0 AU")
events = await nasa_sbdb_service.get_close_approaches(
date_min=date_min,
date_max=date_max,
dist_max="1.0",
body="Earth",
limit=10,
fullname=True
)
print(f"\nRetrieved {len(events)} events from NASA SBDB")
if events:
print("\nFirst 3 events:")
for i, event in enumerate(events[:3], 1):
print(f"\n{i}. {event.get('des', 'Unknown')}")
print(f" Full name: {event.get('fullname', 'N/A')}")
print(f" Date: {event.get('cd', 'N/A')}")
print(f" Distance: {event.get('dist', 'N/A')} AU")
print(f" Velocity: {event.get('v_rel', 'N/A')} km/s")
# Test parsing
parsed = nasa_sbdb_service.parse_event_to_celestial_event(event)
if parsed:
print(f" ✓ Parsed successfully")
print(f" Title: {parsed['title']}")
print(f" Body ID: {parsed['body_id']}")
else:
print(f" ✗ Failed to parse")
else:
print("No events found")
if __name__ == "__main__":
asyncio.run(test_nasa_sbdb())

View File

@ -1,307 +0,0 @@
"""
Test script for Phase 5 features
Tests social features (follows, channel messages) and event system
"""
import asyncio
import httpx
import json
from datetime import datetime
BASE_URL = "http://localhost:8000/api"
# Test user credentials (assuming these exist from previous tests)
TEST_USER = {
"username": "testuser",
"password": "testpass123"
}
async def get_auth_token():
"""Login and get JWT token"""
async with httpx.AsyncClient(timeout=30.0, proxies={}) as client:
# Try to register first (in case user doesn't exist)
register_response = await client.post(
f"{BASE_URL}/auth/register",
json={
"username": TEST_USER["username"],
"password": TEST_USER["password"],
"email": "test@example.com"
}
)
# If register fails (user exists), try to login
if register_response.status_code != 200:
response = await client.post(
f"{BASE_URL}/auth/login",
json={
"username": TEST_USER["username"],
"password": TEST_USER["password"]
}
)
else:
response = register_response
if response.status_code == 200:
data = response.json()
return data.get("access_token")
else:
print(f"Login failed: {response.status_code} - {response.text}")
return None
async def test_follow_operations(token):
"""Test user follow operations"""
print("\n=== Testing Follow Operations ===")
headers = {"Authorization": f"Bearer {token}"}
async with httpx.AsyncClient(timeout=30.0, proxies={}) as client:
# Test: Follow a celestial body (Mars)
print("\n1. Following Mars (499)...")
response = await client.post(
f"{BASE_URL}/social/follow/499",
headers=headers
)
print(f"Status: {response.status_code}")
if response.status_code in [200, 400]: # 400 if already following
print(f"Response: {response.json()}")
# Test: Get user's follows
print("\n2. Getting user follows...")
response = await client.get(
f"{BASE_URL}/social/follows",
headers=headers
)
print(f"Status: {response.status_code}")
if response.status_code == 200:
follows = response.json()
print(f"Following {len(follows)} bodies:")
for follow in follows[:5]: # Show first 5
print(f" - Body ID: {follow['body_id']}, Since: {follow['created_at']}")
# Test: Check if following Mars
print("\n3. Checking if following Mars...")
response = await client.get(
f"{BASE_URL}/social/follows/check/499",
headers=headers
)
print(f"Status: {response.status_code}")
if response.status_code == 200:
print(f"Response: {response.json()}")
return response.status_code == 200
async def test_channel_messages(token):
"""Test channel message operations"""
print("\n=== Testing Channel Messages ===")
headers = {"Authorization": f"Bearer {token}"}
async with httpx.AsyncClient(timeout=30.0, proxies={}) as client:
# Test: Post a message to Mars channel
print("\n1. Posting message to Mars channel...")
message_data = {
"content": f"Test message at {datetime.now().isoformat()}"
}
response = await client.post(
f"{BASE_URL}/social/channel/499/message",
headers=headers,
json=message_data
)
print(f"Status: {response.status_code}")
if response.status_code == 200:
print(f"Response: {response.json()}")
elif response.status_code == 403:
print("Error: User is not following this body (need to follow first)")
# Test: Get channel messages
print("\n2. Getting Mars channel messages...")
response = await client.get(
f"{BASE_URL}/social/channel/499/messages?limit=10",
headers=headers
)
print(f"Status: {response.status_code}")
if response.status_code == 200:
messages = response.json()
print(f"Found {len(messages)} messages:")
for msg in messages[-3:]: # Show last 3
print(f" - {msg['username']}: {msg['content'][:50]}...")
return response.status_code == 200
async def test_celestial_events(token):
"""Test celestial event operations"""
print("\n=== Testing Celestial Events ===")
headers = {"Authorization": f"Bearer {token}"}
async with httpx.AsyncClient(timeout=30.0, proxies={}) as client:
# Test: Get upcoming events
print("\n1. Getting upcoming celestial events...")
response = await client.get(
f"{BASE_URL}/events?limit=10",
headers=headers
)
print(f"Status: {response.status_code}")
if response.status_code == 200:
events = response.json()
print(f"Found {len(events)} events:")
for event in events[:5]: # Show first 5
print(f" - {event['title']} at {event['event_time']}")
print(f" Type: {event['event_type']}, Source: {event['source']}")
# Test: Get events for a specific body
print("\n2. Getting events for Mars (499)...")
response = await client.get(
f"{BASE_URL}/events?body_id=499&limit=5",
headers=headers
)
print(f"Status: {response.status_code}")
if response.status_code == 200:
events = response.json()
print(f"Found {len(events)} events for Mars")
return response.status_code == 200
async def test_scheduled_tasks(token):
"""Test scheduled task functionality"""
print("\n=== Testing Scheduled Tasks ===")
headers = {"Authorization": f"Bearer {token}"}
async with httpx.AsyncClient(timeout=120.0, proxies={}) as client:
# Test: Get available tasks
print("\n1. Getting available scheduled tasks...")
response = await client.get(
f"{BASE_URL}/scheduled-jobs/available-tasks",
headers=headers
)
print(f"Status: {response.status_code}")
if response.status_code == 200:
tasks = response.json()
print(f"Found {len(tasks)} available tasks")
# Find our Phase 5 task
phase5_task = None
for task in tasks:
if task['name'] == 'fetch_close_approach_events':
phase5_task = task
print(f"\nFound Phase 5 task: {task['name']}")
print(f" Description: {task['description']}")
print(f" Category: {task['category']}")
break
if phase5_task:
# Test: Create a scheduled job for this task
print("\n2. Creating a scheduled job for fetch_close_approach_events...")
job_data = {
"name": "Test Phase 5 Close Approach Events",
"job_type": "predefined",
"predefined_function": "fetch_close_approach_events",
"function_params": {
"days_ahead": 30,
"dist_max": "0.2",
"approach_body": "Earth",
"limit": 50,
"clean_old_events": False
},
"cron_expression": "0 0 * * *", # Daily at midnight
"description": "Test job for Phase 5",
"is_active": False # Don't activate for test
}
response = await client.post(
f"{BASE_URL}/scheduled-jobs",
headers=headers,
json=job_data
)
print(f"Status: {response.status_code}")
if response.status_code == 201:
job = response.json()
job_id = job['id']
print(f"Created job with ID: {job_id}")
# Test: Run the job immediately
print(f"\n3. Triggering job {job_id} to run now...")
print(" (This may take 30-60 seconds...)")
response = await client.post(
f"{BASE_URL}/scheduled-jobs/{job_id}/run",
headers=headers
)
print(f"Status: {response.status_code}")
if response.status_code == 200:
print(f"Response: {response.json()}")
# Wait a bit and check job status
print("\n4. Waiting 60 seconds for job to complete...")
await asyncio.sleep(60)
# Get job status
response = await client.get(
f"{BASE_URL}/scheduled-jobs/{job_id}",
headers=headers
)
if response.status_code == 200:
job_status = response.json()
print(f"Job status: {job_status.get('last_run_status')}")
print(f"Last run at: {job_status.get('last_run_at')}")
# Check if events were created
response = await client.get(
f"{BASE_URL}/events?limit=10",
headers=headers
)
if response.status_code == 200:
events = response.json()
print(f"\nEvents in database: {len(events)}")
for event in events[:3]:
print(f" - {event['title']}")
# Clean up: delete the test job
await client.delete(
f"{BASE_URL}/scheduled-jobs/{job_id}",
headers=headers
)
print(f"\nCleaned up test job {job_id}")
return True
else:
print(f"Error triggering job: {response.text}")
else:
print(f"Error creating job: {response.text}")
return False
async def main():
"""Main test function"""
print("=" * 60)
print("Phase 5 Feature Testing")
print("=" * 60)
# Get authentication token
print("\nAuthenticating...")
token = await get_auth_token()
if not token:
print("ERROR: Failed to authenticate. Please ensure test user exists.")
print("You may need to create a test user first.")
return
print(f"✓ Authentication successful")
# Run tests
results = {
"follow_operations": await test_follow_operations(token),
"channel_messages": await test_channel_messages(token),
"celestial_events": await test_celestial_events(token),
"scheduled_tasks": await test_scheduled_tasks(token)
}
# Summary
print("\n" + "=" * 60)
print("Test Summary")
print("=" * 60)
for test_name, passed in results.items():
status = "✓ PASS" if passed else "✗ FAIL"
print(f"{status} - {test_name}")
total_passed = sum(results.values())
total_tests = len(results)
print(f"\nTotal: {total_passed}/{total_tests} tests passed")
if __name__ == "__main__":
asyncio.run(main())