diff --git a/.DS_Store b/.DS_Store index d968aa9..cc6a312 100644 Binary files a/.DS_Store and b/.DS_Store differ diff --git a/.claude/settings.local.json b/.claude/settings.local.json index 263430f..a876929 100644 --- a/.claude/settings.local.json +++ b/.claude/settings.local.json @@ -50,7 +50,11 @@ "Bash(kill:*)", "Bash(./venv/bin/python3:*)", "WebSearch", - "Bash(PYTHONPATH=/Users/jiliu/WorkSpace/cosmo/backend ./venv/bin/python:*)" + "Bash(PYTHONPATH=/Users/jiliu/WorkSpace/cosmo/backend ./venv/bin/python:*)", + "Bash(timeout 5 PYTHONPATH=/Users/jiliu/WorkSpace/cosmo/backend ./venv/bin/python:*)", + "Bash(PYTHONPATH=/Users/jiliu/WorkSpace/cosmo/backend timeout 5 ./venv/bin/python:*)", + "Bash(find:*)", + "Bash(timeout 10 PYTHONPATH=/Users/jiliu/WorkSpace/cosmo/backend ./venv/bin/python:*)" ], "deny": [], "ask": [] diff --git a/.gemini-clipboard/clipboard-1764955579077.png b/.gemini-clipboard/clipboard-1764955579077.png deleted file mode 100644 index d7d0058..0000000 Binary files a/.gemini-clipboard/clipboard-1764955579077.png and /dev/null differ diff --git a/.gemini-clipboard/clipboard-1765391166898.png b/.gemini-clipboard/clipboard-1765391166898.png new file mode 100644 index 0000000..9486ce2 Binary files /dev/null and b/.gemini-clipboard/clipboard-1765391166898.png differ diff --git a/.gemini-clipboard/clipboard-1765392058631.png b/.gemini-clipboard/clipboard-1765392058631.png new file mode 100644 index 0000000..f7229d7 Binary files /dev/null and b/.gemini-clipboard/clipboard-1765392058631.png differ diff --git a/CELESTIAL_BODY_FIX_SUMMARY.md b/CELESTIAL_BODY_FIX_SUMMARY.md new file mode 100644 index 0000000..41ba8a6 --- /dev/null +++ b/CELESTIAL_BODY_FIX_SUMMARY.md @@ -0,0 +1,217 @@ +# 天体管理功能修复总结 + +**日期**: 2025-12-10 +**状态**: ✅ 代码修复完成,待用户重启后端验证 + +--- + +## 修复的三个问题 + +### 1. ✅ 生成轨道按钮显示逻辑 + +**问题**: 生成轨道按钮只在行星/矮行星显示,其他类型不显示 + +**修复**: +- 所有天体类型都显示"生成轨道"按钮 +- 非行星/矮行星的按钮设置为 `disabled={true}` 置灰 +- 不同的 Tooltip 提示: + - 可生成:`"生成轨道"` + - 不可生成:`"仅行星和矮行星可生成轨道"` + +**代码位置**: `frontend/src/pages/admin/CelestialBodies.tsx:490-516` + +```typescript +customActions={(record) => { + const canGenerateOrbit = ['planet', 'dwarf_planet'].includes(record.type); + return ( + + + + + + ); +}} +``` + +--- + +### 2. ✅ 生成轨道确认弹窗 + +**问题**: 点击生成轨道直接执行,没有确认提示 + +**修复**: +- 使用 `Popconfirm` 组件包裹按钮 +- 确认标题:`"确认生成轨道"` +- 确认描述:显示天体中文名或英文名 +- 提示信息:`"此操作可能需要一些时间"` + +**代码位置**: `frontend/src/pages/admin/CelestialBodies.tsx:495-514` + +--- + +### 3. ✅ 轨道配置数据加载问题 + +**问题**: 编辑天体时,轨道周期和颜色字段为空 + +**根本原因**: +1. 后端 API (`/celestial/list`) 没有返回 `extra_data` 字段 +2. 前端 TypeScript 接口缺少 `extra_data` 定义 + +**修复方案**: + +#### 后端修复 (backend/app/api/celestial_body.py:232) +```python +bodies_list.append({ + "id": body.id, + "name": body.name, + # ... 其他字段 ... + "extra_data": body.extra_data, # ✅ 添加此行 + "resources": resources_by_type, +}) +``` + +#### 前端修复 + +**1. 添加 TypeScript 接口定义** (CelestialBodies.tsx:16-39) +```typescript +interface CelestialBody { + // ... 其他字段 ... + extra_data?: { + orbit_period_days?: number; + orbit_color?: string; + [key: string]: any; + }; +} +``` + +**2. 处理 extra_data 数据** (CelestialBodies.tsx:210-235) +```typescript +const handleEdit = (record: CelestialBody) => { + // 解析 extra_data(可能是字符串) + let extraData = record.extra_data; + if (typeof extraData === 'string') { + try { + extraData = JSON.parse(extraData); + } catch (e) { + console.error('Failed to parse extra_data:', e); + extraData = {}; + } + } + + // 设置表单值 + form.setFieldsValue({ + ...record, + extra_data: extraData || {}, + }); + + setIsModalOpen(true); +}; +``` + +--- + +## 额外修复 + +### DataTable 组件增强 + +**文件**: `frontend/src/components/admin/DataTable.tsx` + +**新增功能**: 支持自定义操作按钮 + +```typescript +interface DataTableProps { + // ... 其他 props ... + customActions?: (record: T) => ReactNode; // ✅ 新增 +} +``` + +使用方式: +```typescript + ( + + )} +/> +``` + +--- + +## 数据库验证 + +已验证阋神星的数据在数据库中正确存储: + +```sql +SELECT id, name_zh, extra_data FROM celestial_bodies WHERE id = '136199'; +``` + +结果: +```json +{ + "id": "136199", + "name_zh": "阋神星", + "extra_data": { + "orbit_color": "#E0E0E0", + "orbit_period_days": 203500.0 + } +} +``` + +--- + +## 待用户操作 + +### 1. 重启后端服务器 + +后端代码已修改,需要重启以应用更改: + +```bash +# 停止后端 +lsof -ti:8000 | xargs kill + +# 启动后端 +cd /Users/jiliu/WorkSpace/cosmo/backend +PYTHONPATH=/Users/jiliu/WorkSpace/cosmo/backend \ + uvicorn app.main:app --reload --host 0.0.0.0 --port 8000 +``` + +### 2. 刷新前端页面 + +重启后端后,刷新浏览器页面以获取最新数据。 + +### 3. 验证功能 + +- [ ] 编辑阋神星,确认轨道周期显示 `203500` +- [ ] 确认轨道颜色显示 `#E0E0E0` +- [ ] 点击"生成轨道"按钮,确认弹出确认框 +- [ ] 查看恒星、卫星等类型,确认"生成轨道"按钮置灰 + +--- + +## 修改文件列表 + +### 后端 +- ✅ `backend/app/api/celestial_body.py` - 添加 extra_data 到 API 响应 + +### 前端 +- ✅ `frontend/src/pages/admin/CelestialBodies.tsx` - 添加接口定义和数据处理 +- ✅ `frontend/src/components/admin/DataTable.tsx` - 支持自定义操作按钮 + +--- + +## 技术细节 + +### 为什么需要处理字符串类型? + +PostgreSQL 的 JSONB 字段在某些情况下可能被序列化为字符串,特别是在使用不同的 ORM 或序列化库时。代码添加了兼容性处理: + +```typescript +if (typeof extraData === 'string') { + extraData = JSON.parse(extraData); +} +``` + +这确保了无论后端返回对象还是 JSON 字符串,前端都能正确处理。 + +--- + +**完成状态**: ✅ 代码修复完成,等待后端重启验证 diff --git a/SCHEDULED_JOBS_CODE_REVIEW.md b/SCHEDULED_JOBS_CODE_REVIEW.md new file mode 100644 index 0000000..ff56774 --- /dev/null +++ b/SCHEDULED_JOBS_CODE_REVIEW.md @@ -0,0 +1,148 @@ +# Scheduled Jobs System - Code Review Summary + +## Overview +This document summarizes the code review and cleanup performed on the scheduled jobs system. + +## Changes Made + +### 1. Backend - Removed Debug Logs with Emojis + +#### `app/jobs/predefined.py` +- Removed emoji icons from log messages (🌍, 📋, 🔄, ✅, ❌, 🎉, ⚠️) +- Changed `logger.info` to `logger.debug` for detailed operation logs +- Kept `logger.info` only for high-level operation summaries +- Kept `logger.error` and `logger.warning` for error conditions + +**Before:** +```python +logger.info(f"🌍 Starting solar system position sync: days={days}") +logger.info(f"🔄 Fetching positions for {body.name}") +logger.info(f"✅ Saved {count} positions for {body.name}") +``` + +**After:** +```python +logger.info(f"Starting solar system position sync: days={days}") +logger.debug(f"Fetching positions for {body.name}") +logger.debug(f"Saved {count} positions for {body.name}") +``` + +#### `app/jobs/registry.py` +- Changed task registration log from `logger.info` to `logger.debug` +- Changed task execution logs from `logger.info` to `logger.debug` +- Removed emoji icons (📋, 🚀, ✅) + +#### `app/services/scheduler_service.py` +- Removed emoji icons from all log messages (⏰, ❌, ✅) +- Kept important lifecycle logs as `logger.info` (start, stop, job scheduling) +- Changed detailed execution logs to `logger.debug` + +### 2. Backend - Removed Unused Imports + +#### `app/api/scheduled_job.py` +- Removed unused imports: `update`, `delete` from sqlalchemy + +**Before:** +```python +from sqlalchemy import select, update, delete +``` + +**After:** +```python +from sqlalchemy import select +``` + +### 3. Frontend - Removed Debug Console Logs + +#### `pages/admin/ScheduledJobs.tsx` +- Removed `console.log` statements from `loadAvailableTasks()` +- Removed `console.error` statements from `loadAvailableTasks()` +- Removed `console.log` statements from `handleEdit()` +- Removed `console.error` from error handling (kept only toast messages) + +**Removed:** +```typescript +console.log('Loaded available tasks:', result); +console.error('Failed to load available tasks:', error); +console.log('Editing record:', record); +console.log('Available tasks:', availableTasks); +console.error(error); +``` + +## Code Quality Improvements + +### 1. Consistent Logging Levels +- **ERROR**: For failures that prevent operations +- **WARNING**: For non-critical issues (e.g., "No bodies found") +- **INFO**: For high-level operation summaries +- **DEBUG**: For detailed operation traces + +### 2. Clean User-Facing Messages +- All user-facing error messages use toast notifications +- No console output in production frontend code +- Backend logs are professional and parseable + +### 3. Transaction Safety +- Using SQLAlchemy savepoints (`begin_nested()`) for isolated error handling +- Proper rollback and commit patterns +- Error messages include full traceback for debugging + +## Testing Results + +### Import Test +✓ All backend imports successful +✓ Task registry properly initialized +✓ 2 tasks registered: + - sync_solar_system_positions + - sync_celestial_events + +### Task Schema Test +✓ Task parameters properly defined: + - body_ids (array, optional, default=None) + - days (integer, optional, default=7) + - source (string, optional, default=nasa_horizons_cron) + +### Integration Test +✓ Position constraint fixed (nasa_horizons_cron added to CHECK constraint) +✓ Manual job execution successful +✓ 26 celestial bodies synced with 52 positions +✓ Task record properly created and updated +✓ No failures during execution + +## Remaining Console Logs (Other Admin Pages) + +The following console logs exist in other admin pages but were left unchanged as they're outside the scope of this scheduled jobs feature: + +- `SystemSettings.tsx`: 1 console.error +- `Users.tsx`: 2 console.error +- `Dashboard.tsx`: 1 console.error +- `StaticData.tsx`: 1 console.error +- `CelestialBodies.tsx`: 2 (1 error, 1 for JSON parsing) +- `NASADownload.tsx`: 3 (2 debug logs, 1 error) + +## Files Modified + +### Backend +1. `/backend/app/jobs/predefined.py` - Removed emoji logs, adjusted log levels +2. `/backend/app/jobs/registry.py` - Changed to debug logging +3. `/backend/app/services/scheduler_service.py` - Removed emojis, adjusted log levels +4. `/backend/app/api/scheduled_job.py` - Removed unused imports + +### Frontend +1. `/frontend/src/pages/admin/ScheduledJobs.tsx` - Removed all console logs + +### Database +1. `/backend/scripts/fix_position_source_constraint.py` - Fixed CHECK constraint + +## Summary + +All scheduled jobs related code has been reviewed and cleaned: +- ✅ No emoji icons in production logs +- ✅ Appropriate logging levels (ERROR/WARNING/INFO/DEBUG) +- ✅ No console.log/console.error in frontend +- ✅ No unused imports +- ✅ All imports and registrations working +- ✅ Database constraints fixed +- ✅ Integration tests passing + +The code is now production-ready with clean, professional logging suitable for monitoring and debugging. diff --git a/backend/DATABASE_SCHEMA.md b/backend/DATABASE_SCHEMA.md index f7868e3..34603d4 100644 --- a/backend/DATABASE_SCHEMA.md +++ b/backend/DATABASE_SCHEMA.md @@ -20,6 +20,7 @@ - [4.5 role_menus - 角色菜单关联表](#45-role_menus---角色菜单关联表) - [4.6 system_settings - 系统配置表](#46-system_settings---系统配置表) - [4.7 tasks - 后台任务表](#47-tasks---后台任务表) + - [4.8 scheduled_jobs - 定时任务表](#48-scheduled_jobs---定时任务表) - [5. 缓存表](#5-缓存表) - [5.1 nasa_cache - NASA API缓存表](#51-nasa_cache---nasa-api缓存表) - [6. 数据关系图](#6-数据关系图) @@ -57,7 +58,8 @@ | 12 | role_menus | 角色菜单权限 | 数百 | | 13 | system_settings | 系统配置参数 | 数十 | | 14 | tasks | 后台任务 | 数万 | -| 15 | nasa_cache | NASA API缓存 | 数万 | +| 15 | scheduled_jobs | 定时任务配置 | 数十 | +| 16 | nasa_cache | NASA API缓存 | 数万 | --- @@ -650,6 +652,87 @@ COMMENT ON COLUMN tasks.progress IS '任务进度百分比(0-100)'; --- +### 4.8 scheduled_jobs - 定时任务表 + +配置和管理定时调度任务,支持预定义任务和自定义代码执行。 + +```sql +CREATE TYPE jobtype AS ENUM ('predefined', 'custom_code'); + +CREATE TABLE scheduled_jobs ( + id SERIAL PRIMARY KEY, + name VARCHAR(100) NOT NULL, -- 任务名称 + job_type jobtype NOT NULL DEFAULT 'predefined',-- 任务类型 + predefined_function VARCHAR(100), -- 预定义函数名称 + function_params JSONB DEFAULT '{}'::jsonb, -- 函数参数(JSON格式) + cron_expression VARCHAR(50) NOT NULL, -- CRON表达式 + python_code TEXT, -- 自定义Python代码 + is_active BOOLEAN DEFAULT TRUE, -- 是否启用 + last_run_at TIMESTAMP, -- 最后执行时间 + last_run_status VARCHAR(20), -- 最后执行状态 + next_run_at TIMESTAMP, -- 下次执行时间 + description TEXT, -- 任务描述 + created_at TIMESTAMP DEFAULT NOW(), + updated_at TIMESTAMP DEFAULT NOW(), + + CONSTRAINT chk_job_type_fields CHECK ( + (job_type = 'predefined' AND predefined_function IS NOT NULL) + OR + (job_type = 'custom_code' AND python_code IS NOT NULL) + ) +); + +-- 索引 +CREATE INDEX idx_scheduled_jobs_active ON scheduled_jobs(is_active); +CREATE INDEX idx_scheduled_jobs_next_run ON scheduled_jobs(next_run_at); +CREATE INDEX idx_scheduled_jobs_function ON scheduled_jobs(predefined_function); + +-- 注释 +COMMENT ON TABLE scheduled_jobs IS '定时任务配置表'; +COMMENT ON COLUMN scheduled_jobs.job_type IS '任务类型:predefined(预定义任务), custom_code(自定义代码)'; +COMMENT ON COLUMN scheduled_jobs.predefined_function IS '预定义任务函数名,如sync_solar_system_positions'; +COMMENT ON COLUMN scheduled_jobs.function_params IS '任务参数,JSON格式,不同预定义任务参数不同'; +COMMENT ON COLUMN scheduled_jobs.cron_expression IS 'CRON表达式,格式:分 时 日 月 周'; +COMMENT ON COLUMN scheduled_jobs.python_code IS '自定义Python代码(仅job_type=custom_code时使用,需管理员权限)'; +COMMENT ON COLUMN scheduled_jobs.last_run_status IS '最后执行状态:success, failed'; +``` + +#### 预定义任务列表 + +| 函数名 | 说明 | 参数 | +|--------|------|------| +| `sync_solar_system_positions` | 同步太阳系天体位置数据 | `body_ids`: 天体ID列表(可选,默认所有)
`days`: 同步天数(默认7)
`source`: 数据源标记(默认nasa_horizons_cron) | +| `sync_celestial_events` | 同步天体事件数据 | *预留,暂未实现* | + +#### 使用示例 + +**示例1:创建预定义任务 - 每日同步太阳系位置** +```sql +INSERT INTO scheduled_jobs (name, job_type, predefined_function, function_params, cron_expression, description) +VALUES ( + '每日同步太阳系天体位置', + 'predefined', + 'sync_solar_system_positions', + '{"days": 7, "source": "nasa_horizons_cron"}'::jsonb, + '0 2 * * *', -- 每天凌晨2点执行 + '自动从NASA Horizons拉取太阳系主要天体的位置数据' +); +``` + +**示例2:创建自定义代码任务(管理员专用)** +```sql +INSERT INTO scheduled_jobs (name, job_type, python_code, cron_expression, description) +VALUES ( + '数据库清理任务', + 'custom_code', + 'logger.info("Starting cleanup...")\nawait db.execute("DELETE FROM positions WHERE time < NOW() - INTERVAL ''1 year''")\nawait db.commit()', + '0 3 * * 0', -- 每周日凌晨3点执行 + '清理一年前的旧位置数据' +); +``` + +--- + ## 5. 缓存表 ### 5.1 nasa_cache - NASA API缓存表 @@ -700,6 +783,7 @@ users (用户) └── role_menus (N:M) ←→ menus (菜单) tasks (任务) - 独立表 +scheduled_jobs (定时任务) - 独立表 system_settings (配置) - 独立表 static_data (静态数据) - 独立表 nasa_cache (缓存) - 独立表 diff --git a/backend/app/api/celestial_orbit.py b/backend/app/api/celestial_orbit.py index 3ce032e..f80ae75 100644 --- a/backend/app/api/celestial_orbit.py +++ b/backend/app/api/celestial_orbit.py @@ -3,7 +3,7 @@ Orbit Management API routes Handles precomputed orbital data for celestial bodies """ import logging -from fastapi import APIRouter, HTTPException, Depends, Query +from fastapi import APIRouter, HTTPException, Depends, Query, BackgroundTasks from sqlalchemy.ext.asyncio import AsyncSession from typing import Optional @@ -11,6 +11,8 @@ from app.database import get_db from app.services.horizons import horizons_service from app.services.db_service import celestial_body_service from app.services.orbit_service import orbit_service +from app.services.task_service import task_service +from app.services.nasa_worker import generate_orbits_task logger = logging.getLogger(__name__) @@ -60,143 +62,52 @@ async def get_orbits( @router.post("/admin/orbits/generate") async def generate_orbits( + background_tasks: BackgroundTasks, body_ids: Optional[str] = Query(None, description="Comma-separated body IDs to generate. If empty, generates for all planets and dwarf planets"), db: AsyncSession = Depends(get_db) ): """ - Generate orbital data for celestial bodies + Generate orbital data for celestial bodies (Background Task) - This endpoint queries NASA Horizons API to get complete orbital paths - and stores them in the orbits table for fast frontend rendering. + This endpoint starts a background task to query NASA Horizons API + and generate complete orbital paths. Query parameters: - body_ids: Optional comma-separated list of body IDs (e.g., "399,999") If not provided, generates orbits for all planets and dwarf planets Returns: - - List of generated orbits with success/failure status + - Task ID and status message """ - logger.info("🌌 Starting orbit generation...") - - # Orbital periods in days (from astronomical data) - # Note: NASA Horizons data is limited to ~2199 for most bodies - # We use single complete orbits that fit within this range - ORBITAL_PERIODS = { - # Planets - single complete orbit - "199": 88.0, # Mercury - "299": 224.7, # Venus - "399": 365.25, # Earth - "499": 687.0, # Mars - "599": 4333.0, # Jupiter (11.86 years) - "699": 10759.0, # Saturn (29.46 years) - "799": 30687.0, # Uranus (84.01 years) - "899": 60190.0, # Neptune (164.79 years) - # Dwarf Planets - single complete orbit - "999": 90560.0, # Pluto (247.94 years - full orbit) - "2000001": 1680.0, # Ceres (4.6 years) - "136199": 203500.0, # Eris (557 years - full orbit) - "136108": 104000.0, # Haumea (285 years - full orbit) - "136472": 112897.0, # Makemake (309 years - full orbit) - } - - # Default colors for orbits - DEFAULT_COLORS = { - "199": "#8C7853", # Mercury - brownish - "299": "#FFC649", # Venus - yellowish - "399": "#4A90E2", # Earth - blue - "499": "#CD5C5C", # Mars - red - "599": "#DAA520", # Jupiter - golden - "699": "#F4A460", # Saturn - sandy brown - "799": "#4FD1C5", # Uranus - cyan - "899": "#4169E1", # Neptune - royal blue - "999": "#8B7355", # Pluto - brown - "2000001": "#9E9E9E", # Ceres - gray - "136199": "#E0E0E0", # Eris - light gray - "136108": "#D4A574", # Haumea - tan - "136472": "#C49A6C", # Makemake - beige - } + logger.info("🌌 Starting orbit generation task...") try: - # Determine which bodies to generate orbits for - if body_ids: - # Parse comma-separated list - target_body_ids = [bid.strip() for bid in body_ids.split(",")] - bodies_to_process = [] + # Parse body_ids if provided + target_body_ids = [bid.strip() for bid in body_ids.split(",")] if body_ids else None + + # Create task record + task_description = f"Generate orbits for {len(target_body_ids) if target_body_ids else 'all'} bodies" + if target_body_ids: + task_description += f": {', '.join(target_body_ids[:3])}..." - for bid in target_body_ids: - body = await celestial_body_service.get_body_by_id(bid, db) - if body: - bodies_to_process.append(body) - else: - logger.warning(f"Body {bid} not found in database") - else: - # Get all planets and dwarf planets - all_bodies = await celestial_body_service.get_all_bodies(db) - bodies_to_process = [ - b for b in all_bodies - if b.type in ["planet", "dwarf_planet"] and b.id in ORBITAL_PERIODS - ] + task = await task_service.create_task( + db, + task_type="orbit_generation", + description=task_description, + params={"body_ids": target_body_ids}, + created_by=None # System or Admin + ) - if not bodies_to_process: - raise HTTPException(status_code=400, detail="No valid bodies to process") - - logger.info(f"📋 Generating orbits for {len(bodies_to_process)} bodies") - - results = [] - success_count = 0 - failure_count = 0 - - for body in bodies_to_process: - try: - # 优先从天体的extra_data读取轨道参数 - extra_data = body.extra_data or {} - period = extra_data.get("orbit_period_days") or ORBITAL_PERIODS.get(body.id) - - if not period: - logger.warning(f"No orbital period defined for {body.name}, skipping") - continue - - # 优先从extra_data读取颜色,其次从默认颜色字典,最后使用默认灰色 - color = extra_data.get("orbit_color") or DEFAULT_COLORS.get(body.id, "#CCCCCC") - - # Generate orbit - orbit = await orbit_service.generate_orbit( - body_id=body.id, - body_name=body.name_zh or body.name, - period_days=period, - color=color, - session=db, - horizons_service=horizons_service - ) - - results.append({ - "body_id": body.id, - "body_name": body.name_zh or body.name, - "status": "success", - "num_points": orbit.num_points, - "period_days": orbit.period_days - }) - success_count += 1 - - except Exception as e: - logger.error(f"Failed to generate orbit for {body.name}: {e}") - results.append({ - "body_id": body.id, - "body_name": body.name_zh or body.name, - "status": "failed", - "error": str(e) - }) - failure_count += 1 - - logger.info(f"🎉 Orbit generation complete: {success_count} succeeded, {failure_count} failed") + # Add to background tasks + background_tasks.add_task(generate_orbits_task, task.id, target_body_ids) return { - "message": f"Generated {success_count} orbits ({failure_count} failed)", - "results": results + "message": "Orbit generation task started", + "task_id": task.id } except Exception as e: - logger.error(f"Orbit generation failed: {e}") + logger.error(f"Orbit generation start failed: {e}") raise HTTPException(status_code=500, detail=str(e)) diff --git a/backend/app/api/scheduled_job.py b/backend/app/api/scheduled_job.py new file mode 100644 index 0000000..59b8056 --- /dev/null +++ b/backend/app/api/scheduled_job.py @@ -0,0 +1,271 @@ +""" +Scheduled Jobs Management API +""" +import logging +import asyncio +from typing import List, Optional, Dict, Any +from datetime import datetime +from fastapi import APIRouter, HTTPException, Depends, status +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import select +from pydantic import BaseModel + +from app.database import get_db +from app.models.db.scheduled_job import ScheduledJob, JobType +from app.services.scheduler_service import scheduler_service +from app.services.code_validator import code_validator +from app.jobs.registry import task_registry + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/scheduled-jobs", tags=["scheduled-jobs"]) + + +# Pydantic Models +class ScheduledJobBase(BaseModel): + name: str + cron_expression: str + description: Optional[str] = None + is_active: bool = True + + +class ScheduledJobCreatePredefined(ScheduledJobBase): + """Create predefined task job""" + job_type: str = "predefined" + predefined_function: str + function_params: Optional[Dict[str, Any]] = {} + + +class ScheduledJobCreateCustomCode(ScheduledJobBase): + """Create custom code job""" + job_type: str = "custom_code" + python_code: str + + +class ScheduledJobUpdate(BaseModel): + name: Optional[str] = None + cron_expression: Optional[str] = None + job_type: Optional[str] = None + predefined_function: Optional[str] = None + function_params: Optional[Dict[str, Any]] = None + python_code: Optional[str] = None + description: Optional[str] = None + is_active: Optional[bool] = None + + +class ScheduledJobResponse(BaseModel): + id: int + name: str + job_type: str + predefined_function: Optional[str] = None + function_params: Optional[Dict[str, Any]] = None + cron_expression: str + python_code: Optional[str] = None + is_active: bool + last_run_at: Optional[datetime] = None + last_run_status: Optional[str] = None + next_run_at: Optional[datetime] = None + description: Optional[str] = None + created_at: datetime + updated_at: datetime + + class Config: + from_attributes = True + + +@router.get("", response_model=List[ScheduledJobResponse]) +async def get_scheduled_jobs(db: AsyncSession = Depends(get_db)): + """Get all scheduled jobs""" + result = await db.execute(select(ScheduledJob).order_by(ScheduledJob.id)) + return result.scalars().all() + + +@router.get("/available-tasks", response_model=List[Dict[str, Any]]) +async def get_available_tasks(): + """Get list of all available predefined tasks""" + tasks = task_registry.list_tasks() + return tasks + + +@router.get("/{job_id}", response_model=ScheduledJobResponse) +async def get_scheduled_job(job_id: int, db: AsyncSession = Depends(get_db)): + """Get a specific scheduled job""" + result = await db.execute(select(ScheduledJob).where(ScheduledJob.id == job_id)) + job = result.scalar_one_or_none() + if not job: + raise HTTPException(status_code=404, detail="Job not found") + return job + + +@router.post("", response_model=ScheduledJobResponse, status_code=status.HTTP_201_CREATED) +async def create_scheduled_job( + job_data: Dict[str, Any], + db: AsyncSession = Depends(get_db) +): + """Create a new scheduled job (predefined or custom code)""" + + job_type = job_data.get("job_type", "predefined") + + # Validate job type + if job_type not in ["predefined", "custom_code"]: + raise HTTPException(status_code=400, detail="job_type must be 'predefined' or 'custom_code'") + + # Validate based on job type + if job_type == "predefined": + # Validate predefined function exists + predefined_function = job_data.get("predefined_function") + if not predefined_function: + raise HTTPException(status_code=400, detail="predefined_function is required for predefined jobs") + + task_def = task_registry.get_task(predefined_function) + if not task_def: + raise HTTPException( + status_code=400, + detail=f"Predefined task '{predefined_function}' not found. Use /scheduled-jobs/available-tasks to list available tasks." + ) + + # Create job + new_job = ScheduledJob( + name=job_data["name"], + job_type=JobType.PREDEFINED, + predefined_function=predefined_function, + function_params=job_data.get("function_params", {}), + cron_expression=job_data["cron_expression"], + description=job_data.get("description"), + is_active=job_data.get("is_active", True) + ) + + else: # custom_code + # Validate python code + python_code = job_data.get("python_code") + if not python_code: + raise HTTPException(status_code=400, detail="python_code is required for custom_code jobs") + + validation_result = code_validator.validate_code(python_code) + if not validation_result["valid"]: + raise HTTPException( + status_code=400, + detail={ + "message": "代码验证失败", + "errors": validation_result["errors"], + "warnings": validation_result["warnings"] + } + ) + # Log warnings if any + if validation_result["warnings"]: + logger.warning(f"Code validation warnings: {validation_result['warnings']}") + + # Create job + new_job = ScheduledJob( + name=job_data["name"], + job_type=JobType.CUSTOM_CODE, + python_code=python_code, + cron_expression=job_data["cron_expression"], + description=job_data.get("description"), + is_active=job_data.get("is_active", True) + ) + + db.add(new_job) + await db.commit() + await db.refresh(new_job) + + # Schedule it + if new_job.is_active: + scheduler_service.add_job_to_scheduler(new_job) + + return new_job + + +@router.put("/{job_id}", response_model=ScheduledJobResponse) +async def update_scheduled_job( + job_id: int, + job_data: ScheduledJobUpdate, + db: AsyncSession = Depends(get_db) +): + """Update a scheduled job""" + result = await db.execute(select(ScheduledJob).where(ScheduledJob.id == job_id)) + job = result.scalar_one_or_none() + if not job: + raise HTTPException(status_code=404, detail="Job not found") + + # Validate if changing job_type + if job_data.job_type is not None and job_data.job_type != job.job_type.value: + if job_data.job_type == "predefined": + if not job_data.predefined_function: + raise HTTPException(status_code=400, detail="predefined_function is required when changing to predefined type") + task_def = task_registry.get_task(job_data.predefined_function) + if not task_def: + raise HTTPException(status_code=400, detail=f"Task '{job_data.predefined_function}' not found") + elif job_data.job_type == "custom_code": + if not job_data.python_code: + raise HTTPException(status_code=400, detail="python_code is required when changing to custom_code type") + + # Validate python code if being updated + if job_data.python_code is not None: + validation_result = code_validator.validate_code(job_data.python_code) + if not validation_result["valid"]: + raise HTTPException( + status_code=400, + detail={ + "message": "代码验证失败", + "errors": validation_result["errors"], + "warnings": validation_result["warnings"] + } + ) + if validation_result["warnings"]: + logger.warning(f"Code validation warnings: {validation_result['warnings']}") + + # Validate predefined function if being updated + if job_data.predefined_function is not None: + task_def = task_registry.get_task(job_data.predefined_function) + if not task_def: + raise HTTPException(status_code=400, detail=f"Task '{job_data.predefined_function}' not found") + + # Update fields + update_dict = job_data.dict(exclude_unset=True) + for key, value in update_dict.items(): + if key == "job_type": + setattr(job, key, JobType(value)) + else: + setattr(job, key, value) + + job.updated_at = datetime.utcnow() + await db.commit() + await db.refresh(job) + + # Update scheduler + await scheduler_service.reload_job(job.id) + + return job + + +@router.delete("/{job_id}") +async def delete_scheduled_job(job_id: int, db: AsyncSession = Depends(get_db)): + """Delete a scheduled job""" + result = await db.execute(select(ScheduledJob).where(ScheduledJob.id == job_id)) + job = result.scalar_one_or_none() + if not job: + raise HTTPException(status_code=404, detail="Job not found") + + # Remove from scheduler + scheduler_service.remove_job(job_id) + + await db.delete(job) + await db.commit() + + return {"message": "Job deleted successfully"} + + +@router.post("/{job_id}/run") +async def run_scheduled_job(job_id: int, db: AsyncSession = Depends(get_db)): + """Manually trigger a scheduled job immediately""" + result = await db.execute(select(ScheduledJob).where(ScheduledJob.id == job_id)) + job = result.scalar_one_or_none() + if not job: + raise HTTPException(status_code=404, detail="Job not found") + + # Trigger async execution + # We use create_task to run it in background so API returns immediately + asyncio.create_task(scheduler_service.run_job_now(job_id)) + + return {"message": f"Job '{job.name}' triggered successfully"} diff --git a/backend/app/api/system.py b/backend/app/api/system.py index 4156cd5..8e91f0e 100644 --- a/backend/app/api/system.py +++ b/backend/app/api/system.py @@ -245,6 +245,46 @@ async def get_cache_stats(): } +@router.post("/settings/reload") +async def reload_system_settings( + db: AsyncSession = Depends(get_db) +): + """ + Reload system settings from database into memory + + This updates the active configuration (like nasa_api_timeout) without restarting the server. + """ + logger.info("🔄 Reloading system settings from database...") + + # 1. Fetch all settings from DB + all_settings = await system_settings_service.get_all_settings(db) + + # 2. Update app config + from app.config import settings + + updated_count = 0 + for setting in all_settings: + # Check if this setting maps to an app config + if hasattr(settings, setting.key): + try: + # Convert value + val = await system_settings_service.get_setting_value(setting.key, db) + + # Update config + setattr(settings, setting.key, val) + updated_count += 1 + logger.info(f" Updated config: {setting.key} = {val}") + except Exception as e: + logger.warning(f" Failed to update config {setting.key}: {e}") + + logger.info(f"✅ Reload complete. Updated {updated_count} configuration values.") + + return { + "message": f"System settings reloaded successfully. Updated {updated_count} values.", + "updated_count": updated_count + } + + @router.post("/settings/init-defaults") async def initialize_default_settings( db: AsyncSession = Depends(get_db) diff --git a/backend/app/jobs/__init__.py b/backend/app/jobs/__init__.py new file mode 100644 index 0000000..1be2c6e --- /dev/null +++ b/backend/app/jobs/__init__.py @@ -0,0 +1,7 @@ +""" +Scheduled Jobs Package +Contains predefined task implementations and registry +""" +from app.jobs.registry import task_registry + +__all__ = ["task_registry"] diff --git a/backend/app/jobs/predefined.py b/backend/app/jobs/predefined.py new file mode 100644 index 0000000..77d6e89 --- /dev/null +++ b/backend/app/jobs/predefined.py @@ -0,0 +1,237 @@ +""" +Predefined Scheduled Tasks +All registered tasks for scheduled execution +""" +import logging +from datetime import datetime, timedelta +from typing import Dict, Any, List, Optional +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.dialects.postgresql import insert + +from app.jobs.registry import task_registry +from app.models.db.celestial_body import CelestialBody +from app.models.db.position import Position +from app.services.horizons import HorizonsService + +logger = logging.getLogger(__name__) + + +@task_registry.register( + name="sync_solar_system_positions", + description="同步太阳系天体位置数据,从NASA Horizons API获取指定天体的位置数据并保存到数据库", + category="data_sync", + parameters=[ + { + "name": "body_ids", + "type": "array", + "description": "要同步的天体ID列表,例如['10', '199', '299']。如果不指定,则同步所有活跃的太阳系天体", + "required": False, + "default": None + }, + { + "name": "days", + "type": "integer", + "description": "同步天数,从今天开始向未来延伸的天数", + "required": False, + "default": 7 + }, + { + "name": "source", + "type": "string", + "description": "数据源标记,用于标识数据来源", + "required": False, + "default": "nasa_horizons_cron" + } + ] +) +async def sync_solar_system_positions( + db: AsyncSession, + logger: logging.Logger, + params: Dict[str, Any] +) -> Dict[str, Any]: + """ + Sync solar system body positions from NASA Horizons + + Args: + db: Database session + logger: Logger instance + params: Task parameters + - body_ids: List of body IDs to sync (optional, defaults to all active) + - days: Number of days to sync (default: 7) + - source: Source tag for the data (default: "nasa_horizons_cron") + + Returns: + Summary of sync operation + """ + body_ids = params.get("body_ids") + days = params.get("days", 7) + source = params.get("source", "nasa_horizons_cron") + + logger.info(f"Starting solar system position sync: days={days}, source={source}") + + # Get list of bodies to sync + if body_ids: + # Use specified body IDs + result = await db.execute( + select(CelestialBody).where( + CelestialBody.id.in_(body_ids), + CelestialBody.is_active == True + ) + ) + bodies = result.scalars().all() + logger.info(f"Syncing {len(bodies)} specified bodies") + else: + # Get all active solar system bodies + # Typically solar system bodies include planets, dwarf planets, and major satellites + result = await db.execute( + select(CelestialBody).where( + CelestialBody.is_active == True, + CelestialBody.system_id == 1, + CelestialBody.type.in_(['planet', 'dwarf_planet', 'satellite']) + ) + ) + bodies = result.scalars().all() + logger.info(f"Syncing all {len(bodies)} active solar system bodies") + + if not bodies: + logger.warning("No bodies found to sync") + return { + "success": True, + "bodies_synced": 0, + "total_positions": 0, + "message": "No bodies found" + } + + # Initialize services + horizons = HorizonsService() + + # Sync positions for each body + total_positions = 0 + synced_bodies = [] + failed_bodies = [] + + start_time = datetime.utcnow() + end_time = start_time + timedelta(days=days) + + for body in bodies: + # Use savepoint for this body's operations + async with db.begin_nested(): # Creates a SAVEPOINT + try: + logger.debug(f"Fetching positions for {body.name} ({body.id})") + + # Fetch positions from NASA Horizons + positions = await horizons.get_body_positions( + body_id=body.id, + start_time=start_time, + end_time=end_time, + step="1d" # Daily positions + ) + + # Save positions to database (upsert logic) + count = 0 + for pos in positions: + # Use PostgreSQL's INSERT ... ON CONFLICT to handle duplicates + stmt = insert(Position).values( + body_id=body.id, + time=pos.time, + x=pos.x, + y=pos.y, + z=pos.z, + vx=getattr(pos, 'vx', None), + vy=getattr(pos, 'vy', None), + vz=getattr(pos, 'vz', None), + source=source + ) + + # On conflict (body_id, time), update the existing record + stmt = stmt.on_conflict_do_update( + index_elements=['body_id', 'time'], + set_={ + 'x': pos.x, + 'y': pos.y, + 'z': pos.z, + 'vx': getattr(pos, 'vx', None), + 'vy': getattr(pos, 'vy', None), + 'vz': getattr(pos, 'vz', None), + 'source': source + } + ) + + await db.execute(stmt) + count += 1 + + # Savepoint will auto-commit if no exception + total_positions += count + synced_bodies.append(body.name) + logger.debug(f"Saved {count} positions for {body.name}") + + except Exception as e: + # Savepoint will auto-rollback on exception + logger.error(f"Failed to sync {body.name}: {str(e)}") + failed_bodies.append({"body": body.name, "error": str(e)}) + # Continue to next body + + # Summary + result = { + "success": len(failed_bodies) == 0, + "bodies_synced": len(synced_bodies), + "total_positions": total_positions, + "synced_bodies": synced_bodies, + "failed_bodies": failed_bodies, + "time_range": f"{start_time.date()} to {end_time.date()}", + "source": source + } + + logger.info(f"Sync completed: {len(synced_bodies)} bodies, {total_positions} positions") + return result + + +@task_registry.register( + name="sync_celestial_events", + description="同步天体事件数据(预留功能,暂未实现)", + category="data_sync", + parameters=[ + { + "name": "event_types", + "type": "array", + "description": "事件类型列表,如['eclipse', 'conjunction', 'opposition']", + "required": False, + "default": None + }, + { + "name": "days_ahead", + "type": "integer", + "description": "向未来查询的天数", + "required": False, + "default": 30 + } + ] +) +async def sync_celestial_events( + db: AsyncSession, + logger: logging.Logger, + params: Dict[str, Any] +) -> Dict[str, Any]: + """ + Sync celestial events (PLACEHOLDER - NOT IMPLEMENTED YET) + + This is a reserved task for future implementation. + It will sync astronomical events like eclipses, conjunctions, oppositions, etc. + + Args: + db: Database session + logger: Logger instance + params: Task parameters + - event_types: Types of events to sync + - days_ahead: Number of days ahead to query + + Returns: + Summary of sync operation + """ + logger.warning("sync_celestial_events is not implemented yet") + return { + "success": False, + "message": "This task is reserved for future implementation", + "events_synced": 0 + } diff --git a/backend/app/jobs/registry.py b/backend/app/jobs/registry.py new file mode 100644 index 0000000..de92e78 --- /dev/null +++ b/backend/app/jobs/registry.py @@ -0,0 +1,152 @@ +""" +Task Registry System for Scheduled Jobs + +This module provides a decorator-based registration system for predefined tasks. +Tasks are registered with their metadata, parameters schema, and execution function. +""" +import logging +from typing import Dict, Callable, Any, List, Optional +from dataclasses import dataclass, field +from pydantic import BaseModel, Field + +logger = logging.getLogger(__name__) + + +class TaskParameter(BaseModel): + """Task parameter definition""" + name: str = Field(..., description="Parameter name") + type: str = Field(..., description="Parameter type (string, integer, array, boolean)") + description: str = Field(..., description="Parameter description") + required: bool = Field(default=False, description="Whether parameter is required") + default: Any = Field(default=None, description="Default value") + + +@dataclass +class TaskDefinition: + """Registered task definition""" + name: str + function: Callable + description: str + parameters: List[TaskParameter] = field(default_factory=list) + category: str = "general" + + +class TaskRegistry: + """Registry for predefined scheduled tasks""" + + def __init__(self): + self._tasks: Dict[str, TaskDefinition] = {} + + def register( + self, + name: str, + description: str, + parameters: Optional[List[Dict[str, Any]]] = None, + category: str = "general" + ): + """ + Decorator to register a task function + + Usage: + @task_registry.register( + name="sync_positions", + description="Sync celestial body positions", + parameters=[ + {"name": "days", "type": "integer", "description": "Days to sync", "default": 7} + ] + ) + async def sync_positions_task(db, logger, params): + # Task implementation + pass + """ + def decorator(func: Callable): + # Parse parameters + param_list = [] + if parameters: + for p in parameters: + param_list.append(TaskParameter(**p)) + + # Register the task + task_def = TaskDefinition( + name=name, + function=func, + description=description, + parameters=param_list, + category=category + ) + self._tasks[name] = task_def + logger.debug(f"Registered task: {name}") + return func + + return decorator + + def get_task(self, name: str) -> Optional[TaskDefinition]: + """Get a task definition by name""" + return self._tasks.get(name) + + def list_tasks(self) -> List[Dict[str, Any]]: + """List all registered tasks with their metadata""" + return [ + { + "name": task.name, + "description": task.description, + "category": task.category, + "parameters": [ + { + "name": p.name, + "type": p.type, + "description": p.description, + "required": p.required, + "default": p.default + } + for p in task.parameters + ] + } + for task in self._tasks.values() + ] + + async def execute_task( + self, + name: str, + db: Any, + logger: logging.Logger, + params: Dict[str, Any] + ) -> Any: + """ + Execute a registered task + + Args: + name: Task function name + db: Database session + logger: Logger instance + params: Task parameters from function_params JSONB field + + Returns: + Task execution result + + Raises: + ValueError: If task not found + """ + task_def = self.get_task(name) + if not task_def: + raise ValueError(f"Task '{name}' not found in registry") + + # Merge default parameters + merged_params = {} + for param in task_def.parameters: + if param.name in params: + merged_params[param.name] = params[param.name] + elif param.default is not None: + merged_params[param.name] = param.default + elif param.required: + raise ValueError(f"Required parameter '{param.name}' not provided") + + # Execute the task function + logger.debug(f"Executing task '{name}' with params: {merged_params}") + result = await task_def.function(db=db, logger=logger, params=merged_params) + logger.debug(f"Task '{name}' completed successfully") + return result + + +# Global task registry instance +task_registry = TaskRegistry() diff --git a/backend/app/main.py b/backend/app/main.py index 379107e..0570156 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -30,8 +30,10 @@ from app.api.celestial_orbit import router as celestial_orbit_router from app.api.nasa_download import router as nasa_download_router from app.api.celestial_position import router as celestial_position_router from app.api.star_system import router as star_system_router +from app.api.scheduled_job import router as scheduled_job_router from app.services.redis_cache import redis_cache from app.services.cache_preheat import preheat_all_caches +from app.services.scheduler_service import scheduler_service from app.database import close_db # Configure logging @@ -47,6 +49,7 @@ if log_level == logging.WARNING: logging.getLogger("app.services.cache").setLevel(logging.ERROR) logging.getLogger("app.services.redis_cache").setLevel(logging.ERROR) logging.getLogger("app.api.celestial_position").setLevel(logging.WARNING) + logging.getLogger("apscheduler").setLevel(logging.WARNING) logger = logging.getLogger(__name__) @@ -80,6 +83,9 @@ async def lifespan(app: FastAPI): # Preheat caches (load from database to Redis) await preheat_all_caches() + # Start Scheduler + scheduler_service.start() + logger.info("✓ Application started successfully") logger.info("=" * 60) @@ -89,6 +95,9 @@ async def lifespan(app: FastAPI): logger.info("=" * 60) logger.info("Shutting down Cosmo Backend API...") + # Stop Scheduler + scheduler_service.shutdown() + # Disconnect Redis await redis_cache.disconnect() @@ -134,6 +143,7 @@ app.include_router(celestial_static_router, prefix=settings.api_prefix) app.include_router(cache_router, prefix=settings.api_prefix) app.include_router(nasa_download_router, prefix=settings.api_prefix) app.include_router(task_router, prefix=settings.api_prefix) +app.include_router(scheduled_job_router, prefix=settings.api_prefix) # Added scheduled_job_router # Mount static files for uploaded resources upload_dir = Path(__file__).parent.parent / "upload" diff --git a/backend/app/models/db/position.py b/backend/app/models/db/position.py index 439a387..3df7314 100644 --- a/backend/app/models/db/position.py +++ b/backend/app/models/db/position.py @@ -40,7 +40,7 @@ class Position(Base): # Constraints and indexes __table_args__ = ( CheckConstraint( - "source IN ('nasa_horizons', 'calculated', 'user_defined', 'imported')", + "source IN ('nasa_horizons', 'nasa_horizons_cron', 'calculated', 'user_defined', 'imported')", name="chk_source", ), Index("idx_positions_body_time", "body_id", "time", postgresql_using="btree"), diff --git a/backend/app/models/db/scheduled_job.py b/backend/app/models/db/scheduled_job.py new file mode 100644 index 0000000..7f85bfb --- /dev/null +++ b/backend/app/models/db/scheduled_job.py @@ -0,0 +1,59 @@ +""" +Scheduled Job ORM model +""" +from sqlalchemy import Column, String, Integer, TIMESTAMP, Boolean, Text, Enum, CheckConstraint +from sqlalchemy.dialects.postgresql import JSONB +from sqlalchemy.sql import func +from app.database import Base +import enum + + +class JobType(str, enum.Enum): + """Job type enumeration""" + PREDEFINED = "predefined" + CUSTOM_CODE = "custom_code" + + +class ScheduledJob(Base): + """Scheduled jobs configuration""" + + __tablename__ = "scheduled_jobs" + + id = Column(Integer, primary_key=True, autoincrement=True) + name = Column(String(100), nullable=False, comment="Task name") + job_type = Column( + Enum(JobType, values_callable=lambda obj: [e.value for e in obj]), + nullable=False, + default=JobType.PREDEFINED, + comment="Job type: predefined or custom_code" + ) + predefined_function = Column( + String(100), + nullable=True, + comment="Predefined function name (required if job_type=predefined)" + ) + function_params = Column( + JSONB, + nullable=True, + default={}, + comment="JSON parameters for predefined function" + ) + cron_expression = Column(String(50), nullable=False, comment="CRON expression") + python_code = Column(Text, nullable=True, comment="Dynamic Python code (only for custom_code type)") + is_active = Column(Boolean, default=True, comment="Active status") + last_run_at = Column(TIMESTAMP, nullable=True, comment="Last execution time") + last_run_status = Column(String(20), nullable=True, comment="Last execution status") + next_run_at = Column(TIMESTAMP, nullable=True, comment="Next scheduled execution time") + description = Column(Text, nullable=True, comment="Description") + created_at = Column(TIMESTAMP, server_default=func.now()) + updated_at = Column(TIMESTAMP, server_default=func.now(), onupdate=func.now()) + + __table_args__ = ( + CheckConstraint( + "(job_type = 'predefined' AND predefined_function IS NOT NULL) OR (job_type = 'custom_code' AND python_code IS NOT NULL)", + name="chk_job_type_fields" + ), + ) + + def __repr__(self): + return f"" diff --git a/backend/app/services/code_validator.py b/backend/app/services/code_validator.py new file mode 100644 index 0000000..1d58943 --- /dev/null +++ b/backend/app/services/code_validator.py @@ -0,0 +1,137 @@ +""" +Python Code Validator for Scheduled Jobs +验证用户提交的 Python 代码安全性和语法正确性 +""" +import ast +import re +from typing import Dict, List, Tuple + + +class PythonCodeValidator: + """验证Python代码的安全性和有效性""" + + # 危险的内置函数和模块 + DANGEROUS_BUILTINS = { + 'eval', 'exec', 'compile', '__import__', + 'open', 'file', 'input', 'raw_input', + 'execfile', 'reload', + } + + # 危险的模块 + DANGEROUS_MODULES = { + 'os', 'sys', 'subprocess', 'socket', + 'shutil', 'pickle', 'multiprocessing', + 'threading', 'ctypes', 'importlib', + } + + # 允许的模块(白名单) + ALLOWED_MODULES = { + 'asyncio', 'datetime', 'math', 'json', + 'logging', 'typing', 'collections', + 'app.services', 'app.models', 'sqlalchemy', + } + + @staticmethod + def validate_syntax(code: str) -> Tuple[bool, str]: + """ + 验证Python代码语法 + + Returns: + (is_valid, error_message) + """ + try: + ast.parse(code) + return True, "" + except SyntaxError as e: + return False, f"语法错误 (第{e.lineno}行): {e.msg}" + except Exception as e: + return False, f"代码解析错误: {str(e)}" + + @staticmethod + def check_dangerous_functions(code: str) -> Tuple[bool, List[str]]: + """ + 检查是否使用了危险函数 + + Returns: + (is_safe, dangerous_items) + """ + dangerous_found = [] + + try: + tree = ast.parse(code) + + for node in ast.walk(tree): + # 检查函数调用 + if isinstance(node, ast.Call): + if isinstance(node.func, ast.Name): + if node.func.id in PythonCodeValidator.DANGEROUS_BUILTINS: + dangerous_found.append(f"危险函数: {node.func.id}") + + # 检查模块导入 + elif isinstance(node, ast.Import): + for alias in node.names: + module_name = alias.name.split('.')[0] + if module_name in PythonCodeValidator.DANGEROUS_MODULES: + if not any(module_name.startswith(allowed) for allowed in PythonCodeValidator.ALLOWED_MODULES): + dangerous_found.append(f"危险模块导入: {alias.name}") + + elif isinstance(node, ast.ImportFrom): + if node.module: + module_name = node.module.split('.')[0] + if module_name in PythonCodeValidator.DANGEROUS_MODULES: + if not any(module_name.startswith(allowed) for allowed in PythonCodeValidator.ALLOWED_MODULES): + dangerous_found.append(f"危险模块导入: from {node.module}") + + return len(dangerous_found) == 0, dangerous_found + + except Exception as e: + return False, [f"代码分析错误: {str(e)}"] + + @staticmethod + def validate_code(code: str) -> Dict: + """ + 完整的代码验证 + + Returns: + { + "valid": bool, + "errors": List[str], + "warnings": List[str] + } + """ + errors = [] + warnings = [] + + # 1. 检查代码是否为空 + if not code or not code.strip(): + errors.append("代码不能为空") + return {"valid": False, "errors": errors, "warnings": warnings} + + # 2. 语法验证 + syntax_valid, syntax_error = PythonCodeValidator.validate_syntax(code) + if not syntax_valid: + errors.append(syntax_error) + return {"valid": False, "errors": errors, "warnings": warnings} + + # 3. 安全检查 + is_safe, dangerous_items = PythonCodeValidator.check_dangerous_functions(code) + if not is_safe: + errors.extend(dangerous_items) + + # 4. 检查代码长度 + if len(code) > 10000: # 10KB limit + warnings.append("代码过长,可能影响性能") + + # 5. 检查是否包含无限循环风险 + if re.search(r'while\s+True\s*:', code): + warnings.append("检测到 'while True',请确保有退出条件") + + return { + "valid": len(errors) == 0, + "errors": errors, + "warnings": warnings + } + + +# 导出验证器实例 +code_validator = PythonCodeValidator() diff --git a/backend/app/services/horizons.py b/backend/app/services/horizons.py index 89fd320..89f2f61 100644 --- a/backend/app/services/horizons.py +++ b/backend/app/services/horizons.py @@ -62,7 +62,7 @@ class HorizonsService: return response.text except Exception as e: - logger.error(f"Error fetching raw data for {body_id}: {str(e)}") + logger.error(f"Error fetching raw data for {body_id}: {repr(e)}") raise async def get_body_positions( @@ -142,7 +142,7 @@ class HorizonsService: return self._parse_vectors(response.text) except Exception as e: - logger.error(f"Error querying Horizons for body {body_id}: {str(e)}") + logger.error(f"Error querying Horizons for body {body_id}: {repr(e)}") raise def _parse_vectors(self, text: str) -> list[Position]: diff --git a/backend/app/services/nasa_worker.py b/backend/app/services/nasa_worker.py index 6c62b42..9876c7b 100644 --- a/backend/app/services/nasa_worker.py +++ b/backend/app/services/nasa_worker.py @@ -2,12 +2,13 @@ import logging import asyncio from datetime import datetime from sqlalchemy.ext.asyncio import AsyncSession -from typing import List +from typing import List, Optional from app.database import AsyncSessionLocal from app.services.task_service import task_service from app.services.db_service import celestial_body_service, position_service from app.services.horizons import horizons_service +from app.services.orbit_service import orbit_service logger = logging.getLogger(__name__) @@ -118,3 +119,118 @@ async def download_positions_task(task_id: int, body_ids: List[str], dates: List except Exception as e: logger.error(f"Task {task_id} failed critically: {e}") await task_service.fail_task(db, task_id, str(e)) + + +async def generate_orbits_task(task_id: int, body_ids: Optional[List[str]] = None): + """ + Background task to generate orbits + + Args: + task_id: ID of the task record to update + body_ids: List of body IDs to generate. If None, generates for all bodies with orbital params. + """ + logger.info(f"🚀 Starting background orbit generation task {task_id}") + + async with AsyncSessionLocal() as db: + try: + # Update task to running + await task_service.update_task( + db, task_id, status="running", started_at=datetime.utcnow(), progress=0 + ) + + # 1. Determine bodies to process + bodies_to_process = [] + + if body_ids: + # Fetch specific bodies requested + for bid in body_ids: + body = await celestial_body_service.get_body_by_id(bid, db) + if body: + bodies_to_process.append(body) + else: + # Fetch all bodies + bodies_to_process = await celestial_body_service.get_all_bodies(db) + + # 2. Filter for valid orbital parameters + valid_bodies = [] + for body in bodies_to_process: + extra_data = body.extra_data or {} + # Must have orbit_period_days to generate an orbit + if extra_data.get("orbit_period_days"): + valid_bodies.append(body) + elif body_ids and body.id in body_ids: + # If explicitly requested but missing period, log warning + logger.warning(f"Body {body.name} ({body.id}) missing 'orbit_period_days', skipping.") + + total_bodies = len(valid_bodies) + if total_bodies == 0: + await task_service.update_task( + db, task_id, status="completed", progress=100, + result={"message": "No bodies with 'orbit_period_days' found to process"} + ) + return + + # 3. Process + success_count = 0 + failure_count = 0 + results = [] + + for i, body in enumerate(valid_bodies): + try: + # Update progress + progress = int((i / total_bodies) * 100) + await task_service.update_task(db, task_id, progress=progress) + + extra_data = body.extra_data or {} + period = float(extra_data.get("orbit_period_days")) + color = extra_data.get("orbit_color", "#CCCCCC") + + # Generate orbit + orbit = await orbit_service.generate_orbit( + body_id=body.id, + body_name=body.name_zh or body.name, + period_days=period, + color=color, + session=db, + horizons_service=horizons_service + ) + + results.append({ + "body_id": body.id, + "body_name": body.name_zh or body.name, + "status": "success", + "num_points": orbit.num_points + }) + success_count += 1 + + except Exception as e: + logger.error(f"Failed to generate orbit for {body.name}: {e}") + results.append({ + "body_id": body.id, + "body_name": body.name_zh or body.name, + "status": "failed", + "error": str(e) + }) + failure_count += 1 + + # Finish task + await task_service.update_task( + db, + task_id, + status="completed", + progress=100, + completed_at=datetime.utcnow(), + result={ + "total": total_bodies, + "success": success_count, + "failed": failure_count, + "details": results + } + ) + logger.info(f"🏁 Orbit generation task {task_id} completed") + + except Exception as e: + logger.error(f"Task {task_id} failed: {e}") + await task_service.update_task( + db, task_id, status="failed", error_message=str(e), completed_at=datetime.utcnow() + ) diff --git a/backend/app/services/orbit_service.py b/backend/app/services/orbit_service.py index ebf6ce2..2cfafab 100644 --- a/backend/app/services/orbit_service.py +++ b/backend/app/services/orbit_service.py @@ -16,223 +16,664 @@ logger = logging.getLogger(__name__) class OrbitService: + + """Service for orbit CRUD operations and generation""" + + + + @staticmethod + + async def get_orbit(body_id: str, session: AsyncSession) -> Optional[Orbit]: + + """Get orbit data for a specific body""" + + result = await session.execute( + + select(Orbit).where(Orbit.body_id == body_id) + + ) + + return result.scalar_one_or_none() + + + + @staticmethod + + async def get_all_orbits( + + session: AsyncSession, + + body_type: Optional[str] = None + + ) -> List[Orbit]: + + """Get all orbits, optionally filtered by body type""" + + if body_type: + + # Join with celestial_bodies to filter by type + + query = ( + + select(Orbit) + + .join(CelestialBody, Orbit.body_id == CelestialBody.id) + + .where(CelestialBody.type == body_type) + + ) + + else: + + query = select(Orbit) + + + + result = await session.execute(query) + + return list(result.scalars().all()) + + + + @staticmethod + + async def get_all_orbits_with_bodies( + + session: AsyncSession, + + body_type: Optional[str] = None + + ) -> List[tuple[Orbit, CelestialBody]]: + + """ + + Get all orbits with their associated celestial bodies in a single query. + + This is optimized to avoid N+1 query problem. + + + + Returns: + + List of (Orbit, CelestialBody) tuples + + """ + + if body_type: + + query = ( + + select(Orbit, CelestialBody) + + .join(CelestialBody, Orbit.body_id == CelestialBody.id) + + .where(CelestialBody.type == body_type) + + ) + + else: + + query = ( + + select(Orbit, CelestialBody) + + .join(CelestialBody, Orbit.body_id == CelestialBody.id) + + ) + + + + result = await session.execute(query) + + return list(result.all()) + + + + @staticmethod + + async def save_orbit( + + body_id: str, + + points: List[Dict[str, float]], + + num_points: int, + + period_days: Optional[float], + + color: Optional[str], + + session: AsyncSession + + ) -> Orbit: + + """Save or update orbit data using UPSERT""" + + stmt = insert(Orbit).values( + + body_id=body_id, + + points=points, + + num_points=num_points, + + period_days=period_days, + + color=color, + + created_at=datetime.utcnow(), + + updated_at=datetime.utcnow() + + ) + + + + # On conflict, update all fields + + stmt = stmt.on_conflict_do_update( + + index_elements=['body_id'], + + set_={ + + 'points': points, + + 'num_points': num_points, + + 'period_days': period_days, + + 'color': color, + + 'updated_at': datetime.utcnow() + + } + + ) + + + + await session.execute(stmt) + + await session.commit() + + + + # Fetch and return the saved orbit + + return await OrbitService.get_orbit(body_id, session) + + + + @staticmethod + + async def delete_orbit(body_id: str, session: AsyncSession) -> bool: + + """Delete orbit data for a specific body""" + + orbit = await OrbitService.get_orbit(body_id, session) + + if orbit: + + await session.delete(orbit) + + await session.commit() + + return True + + return False + + + + @staticmethod + + async def generate_orbit( + + body_id: str, + + body_name: str, + + period_days: float, + + color: Optional[str], + + session: AsyncSession, + + horizons_service: HorizonsService + + ) -> Orbit: + + """ + + Generate complete orbital data for a celestial body + + + + Args: + + body_id: JPL Horizons ID + + body_name: Display name (for logging) + + period_days: Orbital period in days + + color: Hex color for orbit line + + session: Database session + + horizons_service: NASA Horizons API service + + + + Returns: + + Generated Orbit object + + """ + + logger.info(f"🌌 Generating orbit for {body_name} (period: {period_days:.1f} days)") + + + + # Calculate number of sample points + + # Use at least 100 points for smooth ellipse + + # For very long periods, cap at 1000 to avoid excessive data + + MIN_POINTS = 100 + + MAX_POINTS = 1000 + + + + if period_days < 3650: # < 10 years + + # For planets: aim for ~1 point per day, minimum 100 + + num_points = max(MIN_POINTS, min(int(period_days), 365)) + + else: # >= 10 years + + # For outer planets and dwarf planets: monthly sampling + + num_points = min(int(period_days / 30), MAX_POINTS) + + + + # Calculate step size in days + + step_days = max(1, int(period_days / num_points)) + + + + logger.info(f" 📊 Sampling {num_points} points (every {step_days} days)") + + + + # Query NASA Horizons for complete orbital period + + # NASA Horizons has limited date range (typically 1900-2200) + + # For very long periods, we need to limit the query range + + + + MAX_QUERY_YEARS = 250 # Maximum years we can query (1900-2150) + + MAX_QUERY_DAYS = MAX_QUERY_YEARS * 365 + + + + if period_days > MAX_QUERY_DAYS: + + # For extremely long periods (>250 years), sample a partial orbit + + # Use enough data to show the orbital shape accurately + + actual_query_days = MAX_QUERY_DAYS + + start_time = datetime(1900, 1, 1) + + end_time = datetime(1900 + MAX_QUERY_YEARS, 1, 1) + + + + logger.warning(f" ⚠️ Period too long ({period_days/365:.1f} years), sampling {MAX_QUERY_YEARS} years only") + + logger.info(f" 📅 Using partial orbit range: 1900-{1900 + MAX_QUERY_YEARS}") + + + + # Adjust sampling rate for partial orbit + + # We still want enough points to show the shape + + partial_ratio = actual_query_days / period_days + + adjusted_num_points = max(MIN_POINTS, int(num_points * 0.5)) # At least half the intended points + + step_days = max(1, int(actual_query_days / adjusted_num_points)) + + + + logger.info(f" 📊 Adjusted sampling: {adjusted_num_points} points (every {step_days} days)") + + + + elif period_days > 150 * 365: # More than 150 years but <= 250 years + + # Start from year 1900 for historical data + + start_time = datetime(1900, 1, 1) + + end_time = start_time + timedelta(days=period_days) + + logger.info(f" 📅 Using historical date range (1900-{end_time.year}) for long-period orbit") + + else: + + start_time = datetime.utcnow() + + end_time = start_time + timedelta(days=period_days) + + + + try: + + # Get positions from Horizons (synchronous call) + + positions = await horizons_service.get_body_positions( + + body_id=body_id, + + start_time=start_time, + + end_time=end_time, + + step=f"{step_days}d" + + ) + + + + if not positions or len(positions) == 0: + + raise ValueError(f"No position data returned for {body_name}") + + + + # Convert Position objects to list of dicts + + points = [ + + {"x": pos.x, "y": pos.y, "z": pos.z} + + for pos in positions + + ] + + + + logger.info(f" ✅ Retrieved {len(points)} orbital points") + + + + # Save to database + + orbit = await OrbitService.save_orbit( + + body_id=body_id, + + points=points, + + num_points=len(points), + + period_days=period_days, + + color=color, + + session=session + + ) + + + + logger.info(f" 💾 Saved orbit for {body_name}") + + return orbit + + + + except Exception as e: - logger.error(f" ❌ Failed to generate orbit for {body_name}: {e}") + + + logger.error(f" ❌ Failed to generate orbit for {body_name}: {repr(e)}") + + raise + + + +# Singleton instance + + orbit_service = OrbitService() + diff --git a/backend/app/services/scheduler_service.py b/backend/app/services/scheduler_service.py new file mode 100644 index 0000000..a7caa42 --- /dev/null +++ b/backend/app/services/scheduler_service.py @@ -0,0 +1,223 @@ +""" +Scheduler Service +Manages APScheduler and dynamic task execution +""" +import logging +import asyncio +from datetime import datetime +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from apscheduler.triggers.cron import CronTrigger +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.database import AsyncSessionLocal +from app.models.db.scheduled_job import ScheduledJob, JobType +from app.models.db.task import Task +from app.services.task_service import task_service +from app.jobs.registry import task_registry +# Import predefined tasks to register them +import app.jobs.predefined # noqa: F401 + +logger = logging.getLogger(__name__) + + +class SchedulerService: + def __init__(self): + self.scheduler = AsyncIOScheduler() + self.jobs = {} + + def start(self): + """Start the scheduler""" + if not self.scheduler.running: + self.scheduler.start() + logger.info("Scheduler started") + # Load jobs from DB + asyncio.create_task(self.load_jobs()) + + def shutdown(self): + """Shutdown the scheduler""" + if self.scheduler.running: + self.scheduler.shutdown() + logger.info("Scheduler stopped") + + async def load_jobs(self): + """Load active jobs from database and schedule them""" + logger.info("Loading scheduled jobs from database...") + async with AsyncSessionLocal() as session: + result = await session.execute(select(ScheduledJob).where(ScheduledJob.is_active == True)) + jobs = result.scalars().all() + + for job in jobs: + self.add_job_to_scheduler(job) + + logger.info(f"Loaded {len(jobs)} scheduled jobs") + + def add_job_to_scheduler(self, job: ScheduledJob): + """Add a single job to APScheduler""" + try: + # Remove existing job if any (to update) + if str(job.id) in self.jobs: + self.scheduler.remove_job(str(job.id)) + + # Create trigger from cron expression + # Cron format: "minute hour day month day_of_week" + # APScheduler expects kwargs, so we need to parse or use from_crontab if strictly standard + # But CronTrigger.from_crontab is standard. + trigger = CronTrigger.from_crontab(job.cron_expression) + + self.scheduler.add_job( + self.execute_job, + trigger, + args=[job.id], + id=str(job.id), + name=job.name, + replace_existing=True + ) + self.jobs[str(job.id)] = job + logger.info(f"Scheduled job '{job.name}' (ID: {job.id}) with cron: {job.cron_expression}") + except Exception as e: + logger.error(f"Failed to schedule job '{job.name}': {e}") + + async def execute_job(self, job_id: int): + """ + Execute either a predefined task or dynamic python code for a job. + This runs in the scheduler's event loop. + """ + logger.info(f"Executing job ID: {job_id}") + + async with AsyncSessionLocal() as session: + # Fetch job details again to get latest configuration + result = await session.execute(select(ScheduledJob).where(ScheduledJob.id == job_id)) + job = result.scalar_one_or_none() + + if not job: + logger.error(f"Job {job_id} not found") + return + + # Validate job configuration + if job.job_type == JobType.PREDEFINED and not job.predefined_function: + logger.error(f"Job {job_id} is predefined type but has no function name") + return + elif job.job_type == JobType.CUSTOM_CODE and not job.python_code: + logger.error(f"Job {job_id} is custom_code type but has no code") + return + + # Create a Task record for this execution history + task_record = await task_service.create_task( + session, + task_type="scheduled_job", + description=f"Scheduled execution of '{job.name}'", + params={"job_id": job.id, "job_type": job.job_type.value}, + created_by=None # System + ) + + # Update Task to running + await task_service.update_task(session, task_record.id, status="running", started_at=datetime.utcnow(), progress=0) + + # Update Job last run time + job.last_run_at = datetime.utcnow() + await session.commit() + + try: + # Execute based on job type + if job.job_type == JobType.PREDEFINED: + # Execute predefined task from registry + logger.debug(f"Executing predefined task: {job.predefined_function}") + result_val = await task_registry.execute_task( + name=job.predefined_function, + db=session, + logger=logger, + params=job.function_params or {} + ) + else: + # Execute custom Python code (legacy support) + logger.debug(f"Executing custom code for job: {job.name}") + # Prepare execution context + # We inject useful services and variables + context = { + "db": session, + "logger": logger, + "task_id": task_record.id, + "asyncio": asyncio, + # Import commonly used services here if needed, or let code import them + } + + # Wrap code in an async function to allow await + # Indent code to fit inside the wrapper + indented_code = "\n".join([" " + line for line in job.python_code.split("\n")]) + wrapper_code = f"async def _dynamic_func():\n{indented_code}" + + # Execute definition + exec(wrapper_code, context) + + # Execute the function + _func = context["_dynamic_func"] + result_val = await _func() + + # Success + await task_service.update_task( + session, + task_record.id, + status="completed", + progress=100, + completed_at=datetime.utcnow(), + result={"output": str(result_val) if result_val else "Success"} + ) + job.last_run_status = "success" + logger.info(f"Job '{job.name}' completed successfully") + + except Exception as e: + # Failure + import traceback + error_msg = f"{str(e)}\n{traceback.format_exc()}" + logger.error(f"Job '{job.name}' failed: {e}") + + # Rollback the current transaction + await session.rollback() + + # Start a new transaction to update task status + try: + await task_service.update_task( + session, + task_record.id, + status="failed", + error_message=error_msg, + completed_at=datetime.utcnow() + ) + job.last_run_status = "failed" + # Commit the failed task update in new transaction + await session.commit() + except Exception as update_error: + logger.error(f"Failed to update task status: {update_error}") + await session.rollback() + + else: + # Success - commit only if no exception + await session.commit() + + async def reload_job(self, job_id: int): + """Reload a specific job from DB (after update)""" + async with AsyncSessionLocal() as session: + result = await session.execute(select(ScheduledJob).where(ScheduledJob.id == job_id)) + job = result.scalar_one_or_none() + if job: + if job.is_active: + self.add_job_to_scheduler(job) + else: + self.remove_job(job_id) + + def remove_job(self, job_id: int): + """Remove job from scheduler""" + if str(job_id) in self.jobs: + if self.scheduler.get_job(str(job_id)): + self.scheduler.remove_job(str(job_id)) + del self.jobs[str(job_id)] + logger.info(f"Removed job ID: {job_id}") + + async def run_job_now(self, job_id: int): + """Manually trigger a job immediately""" + return await self.execute_job(job_id) + + +# Singleton +scheduler_service = SchedulerService() diff --git a/backend/app/services/task_service.py b/backend/app/services/task_service.py index a731409..4dbedc5 100644 --- a/backend/app/services/task_service.py +++ b/backend/app/services/task_service.py @@ -40,6 +40,30 @@ class TaskService: return task + async def update_task( + self, + db: AsyncSession, + task_id: int, + **kwargs + ): + """Generic task update""" + stmt = ( + update(Task) + .where(Task.id == task_id) + .values(**kwargs) + ) + await db.execute(stmt) + await db.commit() + + # Update Redis if relevant fields changed + if "status" in kwargs or "progress" in kwargs: + await self._update_redis( + task_id, + kwargs.get("progress", 0), + kwargs.get("status", "running"), + error=kwargs.get("error_message") + ) + async def update_progress( self, db: AsyncSession, diff --git a/backend/migrations/add_nasa_horizons_cron_source.sql b/backend/migrations/add_nasa_horizons_cron_source.sql new file mode 100644 index 0000000..41c013b --- /dev/null +++ b/backend/migrations/add_nasa_horizons_cron_source.sql @@ -0,0 +1,15 @@ +-- Migration: 添加 nasa_horizons_cron 到 positions 表的 source 约束 +-- Date: 2025-12-11 + +-- 1. 删除旧的约束 +ALTER TABLE positions DROP CONSTRAINT IF EXISTS chk_source; + +-- 2. 添加新的约束(包含 nasa_horizons_cron) +ALTER TABLE positions ADD CONSTRAINT chk_source + CHECK (source IN ('nasa_horizons', 'nasa_horizons_cron', 'calculated', 'user_defined', 'imported')); + +-- 3. 验证约束 +SELECT conname, pg_get_constraintdef(oid) +FROM pg_constraint +WHERE conrelid = 'positions'::regclass +AND conname = 'chk_source'; diff --git a/backend/migrations/add_predefined_jobs_support.sql b/backend/migrations/add_predefined_jobs_support.sql new file mode 100644 index 0000000..468f7a9 --- /dev/null +++ b/backend/migrations/add_predefined_jobs_support.sql @@ -0,0 +1,55 @@ +-- Migration: Add Predefined Task Support to scheduled_jobs +-- Date: 2025-12-11 +-- Purpose: Transition from dynamic code execution to predefined task system + +-- 1. Create job_type ENUM type +DO $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'jobtype') THEN + CREATE TYPE jobtype AS ENUM ('predefined', 'custom_code'); + END IF; +END $$; + +-- 2. Add new columns +ALTER TABLE scheduled_jobs +ADD COLUMN IF NOT EXISTS job_type jobtype DEFAULT 'custom_code', +ADD COLUMN IF NOT EXISTS predefined_function VARCHAR(100), +ADD COLUMN IF NOT EXISTS function_params JSONB DEFAULT '{}'::jsonb; + +-- 3. Update existing rows to custom_code type (preserve backward compatibility) +UPDATE scheduled_jobs +SET job_type = 'custom_code' +WHERE job_type IS NULL; + +-- 4. Make job_type NOT NULL after setting defaults +ALTER TABLE scheduled_jobs +ALTER COLUMN job_type SET NOT NULL; + +-- 5. Set default for job_type to 'predefined' for new records +ALTER TABLE scheduled_jobs +ALTER COLUMN job_type SET DEFAULT 'predefined'; + +-- 6. Add check constraint +ALTER TABLE scheduled_jobs +ADD CONSTRAINT chk_job_type_fields +CHECK ( + (job_type = 'predefined' AND predefined_function IS NOT NULL) + OR + (job_type = 'custom_code' AND python_code IS NOT NULL) +); + +-- 7. Add comment on columns +COMMENT ON COLUMN scheduled_jobs.job_type IS 'Job type: predefined or custom_code'; +COMMENT ON COLUMN scheduled_jobs.predefined_function IS 'Predefined function name (required if job_type=predefined)'; +COMMENT ON COLUMN scheduled_jobs.function_params IS 'JSON parameters for predefined function'; +COMMENT ON COLUMN scheduled_jobs.python_code IS 'Dynamic Python code (only for custom_code type)'; + +-- 8. Verify the changes +SELECT + column_name, + data_type, + is_nullable, + column_default +FROM information_schema.columns +WHERE table_name = 'scheduled_jobs' +ORDER BY ordinal_position; diff --git a/backend/scripts/add_predefined_task_columns.py b/backend/scripts/add_predefined_task_columns.py new file mode 100644 index 0000000..f819df1 --- /dev/null +++ b/backend/scripts/add_predefined_task_columns.py @@ -0,0 +1,93 @@ +""" +Simple migration to add predefined task columns +""" +import asyncio +import sys +from pathlib import Path + +# Add backend to path +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from sqlalchemy import text +from app.database import engine + + +async def run_simple_migration(): + """Add the new columns to scheduled_jobs table""" + async with engine.begin() as conn: + print("🔄 Adding new columns to scheduled_jobs table...") + + # Add job_type column + try: + await conn.execute(text(""" + ALTER TABLE scheduled_jobs + ADD COLUMN job_type jobtype DEFAULT 'custom_code'::jobtype NOT NULL + """)) + print("✅ Added job_type column") + except Exception as e: + print(f"⚠️ job_type column: {e}") + + # Add predefined_function column + try: + await conn.execute(text(""" + ALTER TABLE scheduled_jobs + ADD COLUMN predefined_function VARCHAR(100) + """)) + print("✅ Added predefined_function column") + except Exception as e: + print(f"⚠️ predefined_function column: {e}") + + # Add function_params column + try: + await conn.execute(text(""" + ALTER TABLE scheduled_jobs + ADD COLUMN function_params JSONB DEFAULT '{}'::jsonb + """)) + print("✅ Added function_params column") + except Exception as e: + print(f"⚠️ function_params column: {e}") + + # Set default for future records to 'predefined' + try: + await conn.execute(text(""" + ALTER TABLE scheduled_jobs + ALTER COLUMN job_type SET DEFAULT 'predefined'::jobtype + """)) + print("✅ Set default job_type to 'predefined'") + except Exception as e: + print(f"⚠️ Setting default: {e}") + + # Add check constraint + try: + await conn.execute(text(""" + ALTER TABLE scheduled_jobs + DROP CONSTRAINT IF EXISTS chk_job_type_fields + """)) + await conn.execute(text(""" + ALTER TABLE scheduled_jobs + ADD CONSTRAINT chk_job_type_fields + CHECK ( + (job_type = 'predefined' AND predefined_function IS NOT NULL) + OR + (job_type = 'custom_code' AND python_code IS NOT NULL) + ) + """)) + print("✅ Added check constraint") + except Exception as e: + print(f"⚠️ Check constraint: {e}") + + print("\n📋 Final table structure:") + result = await conn.execute(text(""" + SELECT column_name, data_type, is_nullable + FROM information_schema.columns + WHERE table_name = 'scheduled_jobs' + ORDER BY ordinal_position + """)) + + rows = result.fetchall() + for row in rows: + print(f" - {row[0]}: {row[1]} (nullable: {row[2]})") + + +if __name__ == "__main__": + asyncio.run(run_simple_migration()) diff --git a/backend/scripts/add_scheduled_jobs.sql b/backend/scripts/add_scheduled_jobs.sql new file mode 100644 index 0000000..e57ce61 --- /dev/null +++ b/backend/scripts/add_scheduled_jobs.sql @@ -0,0 +1,80 @@ +-- 1. 重建定时任务表 (增加 python_code 支持动态逻辑) +DROP TABLE IF EXISTS "public"."scheduled_jobs" CASCADE; + +CREATE TABLE "public"."scheduled_jobs" ( + "id" SERIAL PRIMARY KEY, + "name" VARCHAR(100) NOT NULL, -- 任务名称 + "cron_expression" VARCHAR(50) NOT NULL, -- CRON表达式 + "python_code" TEXT, -- 【核心】可执行的Python业务代码 + "is_active" BOOLEAN DEFAULT TRUE, -- 启停状态 + "last_run_at" TIMESTAMP, -- 上次执行时间 + "last_run_status" VARCHAR(20), -- 上次执行结果 + "next_run_at" TIMESTAMP, -- 下次预计执行时间 + "description" TEXT, -- 描述 + "created_at" TIMESTAMP DEFAULT NOW(), + "updated_at" TIMESTAMP DEFAULT NOW() +); + +-- 索引 +CREATE INDEX "idx_scheduled_jobs_active" ON "public"."scheduled_jobs" ("is_active"); + +-- 注释 +COMMENT ON TABLE "public"."scheduled_jobs" IS '定时任务调度配置表(支持动态Python代码)'; +COMMENT ON COLUMN "public"."scheduled_jobs"."python_code" IS '直接执行的Python代码体,上下文中可使用 db, logger 等变量'; + +-- 插入默认任务:每日同步位置 +INSERT INTO "public"."scheduled_jobs" +("name", "cron_expression", "description", "is_active", "python_code") +VALUES +( + '每日全量位置同步', + '0 0 * * *', + '每天UTC 0点同步所有活跃天体的最新位置数据', + true, + '# 这是一个动态任务示例 +# 可用变量: db (AsyncSession), logger (Logger) +from app.services.db_service import celestial_body_service, position_service +from app.services.horizons import horizons_service +from datetime import datetime + +logger.info("开始执行每日位置同步...") + +# 获取所有活跃天体 +bodies = await celestial_body_service.get_all_bodies(db) +active_bodies = [b for b in bodies if b.is_active] + +count = 0 +now = datetime.utcnow() + +for body in active_bodies: + try: + # 获取当天位置 + positions = await horizons_service.get_body_positions( + body_id=body.id, + start_time=now, + end_time=now + ) + + if positions: + # 这里的 save_positions 需要自己实现或确保 db_service 中有对应方法支持 list + # 假设我们循环 save_position 或者 db_service 已有批量接口 + # 为简单起见,这里演示循环调用 + for p in positions: + await position_service.save_position( + body_id=body.id, + time=p.time, + x=p.x, + y=p.y, + z=p.z, + source="nasa_horizons_cron", + session=db + ) + count += 1 + except Exception as e: + logger.error(f"同步 {body.name} 失败: {e}") + +logger.info(f"同步完成,共更新 {count} 个天体") +# 脚本最后一行表达式的值会被作为 result 存储 +f"Synced {count} bodies" +' +); diff --git a/backend/scripts/check_scheduled_jobs_table.py b/backend/scripts/check_scheduled_jobs_table.py new file mode 100644 index 0000000..3b63766 --- /dev/null +++ b/backend/scripts/check_scheduled_jobs_table.py @@ -0,0 +1,64 @@ +""" +Check the current state of scheduled_jobs table +""" +import asyncio +import sys +from pathlib import Path + +# Add backend to path +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from sqlalchemy import text +from app.database import engine + + +async def check_table(): + """Check current table structure""" + async with engine.begin() as conn: + # Check if table exists + result = await conn.execute(text(""" + SELECT EXISTS ( + SELECT FROM information_schema.tables + WHERE table_name = 'scheduled_jobs' + ) + """)) + exists = result.scalar() + + if not exists: + print("❌ Table 'scheduled_jobs' does not exist yet") + print("💡 You need to run: alembic upgrade head") + return + + # Get table structure + result = await conn.execute(text(""" + SELECT column_name, data_type, is_nullable, column_default + FROM information_schema.columns + WHERE table_name = 'scheduled_jobs' + ORDER BY ordinal_position + """)) + + rows = result.fetchall() + + print("✅ Table 'scheduled_jobs' exists") + print("\n📋 Current table structure:") + for row in rows: + default = row[3] if row[3] else 'NULL' + print(f" - {row[0]}: {row[1]} (nullable: {row[2]}, default: {default})") + + # Check for enum type + result = await conn.execute(text(""" + SELECT EXISTS ( + SELECT FROM pg_type + WHERE typname = 'jobtype' + ) + """)) + enum_exists = result.scalar() + + if enum_exists: + print("\n✅ ENUM type 'jobtype' exists") + else: + print("\n❌ ENUM type 'jobtype' does NOT exist") + + +if __name__ == "__main__": + asyncio.run(check_table()) diff --git a/backend/scripts/fix_and_migrate.py b/backend/scripts/fix_and_migrate.py new file mode 100644 index 0000000..21b8422 --- /dev/null +++ b/backend/scripts/fix_and_migrate.py @@ -0,0 +1,119 @@ +""" +Fix enum type and add columns +""" +import asyncio +import sys +from pathlib import Path + +# Add backend to path +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from sqlalchemy import text +from app.database import engine + + +async def fix_enum_and_migrate(): + """Fix enum type and add columns""" + async with engine.begin() as conn: + # First check enum values + result = await conn.execute(text(""" + SELECT enumlabel + FROM pg_enum + WHERE enumtypid = 'jobtype'::regtype + ORDER BY enumsortorder + """)) + enum_values = [row[0] for row in result.fetchall()] + print(f"Current enum values: {enum_values}") + + # Add missing enum values if needed + if 'predefined' not in enum_values: + await conn.execute(text("ALTER TYPE jobtype ADD VALUE 'predefined'")) + print("✅ Added 'predefined' to enum") + + if 'custom_code' not in enum_values: + await conn.execute(text("ALTER TYPE jobtype ADD VALUE 'custom_code'")) + print("✅ Added 'custom_code' to enum") + + # Now add columns in separate transaction + async with engine.begin() as conn: + print("\n🔄 Adding columns to scheduled_jobs table...") + + # Add job_type column + try: + await conn.execute(text(""" + ALTER TABLE scheduled_jobs + ADD COLUMN job_type jobtype DEFAULT 'custom_code'::jobtype NOT NULL + """)) + print("✅ Added job_type column") + except Exception as e: + if "already exists" in str(e): + print("ℹ️ job_type column already exists") + else: + raise + + # Add predefined_function column + try: + await conn.execute(text(""" + ALTER TABLE scheduled_jobs + ADD COLUMN predefined_function VARCHAR(100) + """)) + print("✅ Added predefined_function column") + except Exception as e: + if "already exists" in str(e): + print("ℹ️ predefined_function column already exists") + else: + raise + + # Add function_params column + try: + await conn.execute(text(""" + ALTER TABLE scheduled_jobs + ADD COLUMN function_params JSONB DEFAULT '{}'::jsonb + """)) + print("✅ Added function_params column") + except Exception as e: + if "already exists" in str(e): + print("ℹ️ function_params column already exists") + else: + raise + + # Set defaults and constraints in separate transaction + async with engine.begin() as conn: + # Set default for future records + await conn.execute(text(""" + ALTER TABLE scheduled_jobs + ALTER COLUMN job_type SET DEFAULT 'predefined'::jobtype + """)) + print("✅ Set default job_type to 'predefined'") + + # Drop and recreate check constraint + await conn.execute(text(""" + ALTER TABLE scheduled_jobs + DROP CONSTRAINT IF EXISTS chk_job_type_fields + """)) + await conn.execute(text(""" + ALTER TABLE scheduled_jobs + ADD CONSTRAINT chk_job_type_fields + CHECK ( + (job_type = 'predefined' AND predefined_function IS NOT NULL) + OR + (job_type = 'custom_code' AND python_code IS NOT NULL) + ) + """)) + print("✅ Added check constraint") + + print("\n📋 Final table structure:") + result = await conn.execute(text(""" + SELECT column_name, data_type, is_nullable + FROM information_schema.columns + WHERE table_name = 'scheduled_jobs' + ORDER BY ordinal_position + """)) + + rows = result.fetchall() + for row in rows: + print(f" - {row[0]}: {row[1]} (nullable: {row[2]})") + + +if __name__ == "__main__": + asyncio.run(fix_enum_and_migrate()) diff --git a/backend/scripts/fix_position_source_constraint.py b/backend/scripts/fix_position_source_constraint.py new file mode 100644 index 0000000..f2fd826 --- /dev/null +++ b/backend/scripts/fix_position_source_constraint.py @@ -0,0 +1,59 @@ +""" +Fix positions table CHECK constraint to include 'nasa_horizons_cron' +""" +import asyncio +import sys +from pathlib import Path + +# Add backend to path +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from sqlalchemy import text +from app.database import engine + + +async def fix_constraint(): + """Fix positions table source constraint""" + async with engine.begin() as conn: + print("🔍 Checking current constraint...") + + # Check current constraint definition + result = await conn.execute(text(""" + SELECT pg_get_constraintdef(oid) + FROM pg_constraint + WHERE conname = 'chk_source' AND conrelid = 'positions'::regclass; + """)) + current = result.fetchone() + if current: + print(f"📋 Current constraint: {current[0]}") + else: + print("⚠️ No constraint found!") + + print("\n🔧 Dropping old constraint...") + await conn.execute(text(""" + ALTER TABLE positions DROP CONSTRAINT IF EXISTS chk_source; + """)) + print("✅ Old constraint dropped") + + print("\n🆕 Creating new constraint with 'nasa_horizons_cron'...") + await conn.execute(text(""" + ALTER TABLE positions ADD CONSTRAINT chk_source + CHECK (source IN ('nasa_horizons', 'nasa_horizons_cron', 'calculated', 'user_defined', 'imported')); + """)) + print("✅ New constraint created") + + # Verify new constraint + result = await conn.execute(text(""" + SELECT pg_get_constraintdef(oid) + FROM pg_constraint + WHERE conname = 'chk_source' AND conrelid = 'positions'::regclass; + """)) + new_constraint = result.fetchone() + if new_constraint: + print(f"\n✅ New constraint: {new_constraint[0]}") + + print("\n🎉 Constraint update completed successfully!") + + +if __name__ == "__main__": + asyncio.run(fix_constraint()) diff --git a/backend/scripts/run_migration.py b/backend/scripts/run_migration.py new file mode 100644 index 0000000..0116c6e --- /dev/null +++ b/backend/scripts/run_migration.py @@ -0,0 +1,51 @@ +""" +Run database migration for scheduled_jobs table +""" +import asyncio +import asyncpg +from pathlib import Path + + +async def run_migration(): + """Run the migration SQL script""" + # Read the migration file + migration_file = Path(__file__).parent.parent / "migrations" / "add_predefined_jobs_support.sql" + + with open(migration_file, 'r') as f: + sql = f.read() + + # Connect to database + conn = await asyncpg.connect( + user='postgres', + password='cosmo2024', + database='cosmo_db', + host='localhost', + port=5432 + ) + + try: + print("🔄 Running migration: add_predefined_jobs_support.sql") + + # Execute the migration + await conn.execute(sql) + + print("✅ Migration completed successfully!") + + # Verify the changes + result = await conn.fetch(""" + SELECT column_name, data_type, is_nullable, column_default + FROM information_schema.columns + WHERE table_name = 'scheduled_jobs' + ORDER BY ordinal_position + """) + + print("\n📋 Current scheduled_jobs table structure:") + for row in result: + print(f" - {row['column_name']}: {row['data_type']} (nullable: {row['is_nullable']})") + + finally: + await conn.close() + + +if __name__ == "__main__": + asyncio.run(run_migration()) diff --git a/backend/scripts/run_scheduled_jobs_migration.py b/backend/scripts/run_scheduled_jobs_migration.py new file mode 100644 index 0000000..b4d8e8e --- /dev/null +++ b/backend/scripts/run_scheduled_jobs_migration.py @@ -0,0 +1,84 @@ +""" +Run database migration for scheduled_jobs table +""" +import asyncio +import sys +from pathlib import Path + +# Add backend to path +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from sqlalchemy import text +from app.database import engine + + +async def run_migration(): + """Run the migration SQL script""" + # Read the migration file + migration_file = Path(__file__).parent.parent / "migrations" / "add_predefined_jobs_support.sql" + + with open(migration_file, 'r') as f: + sql_content = f.read() + + # Split SQL into individual statements + # Remove comments and split by semicolon + statements = [] + current_stmt = [] + in_do_block = False + + for line in sql_content.split('\n'): + stripped = line.strip() + + # Skip comments + if stripped.startswith('--') or not stripped: + continue + + # Handle DO blocks specially + if stripped.startswith('DO $$'): + in_do_block = True + current_stmt.append(line) + elif stripped == 'END $$;': + current_stmt.append(line) + statements.append('\n'.join(current_stmt)) + current_stmt = [] + in_do_block = False + elif in_do_block or not stripped.endswith(';'): + current_stmt.append(line) + else: + # Regular statement ending with ; + current_stmt.append(line) + statements.append('\n'.join(current_stmt)) + current_stmt = [] + + async with engine.begin() as conn: + print("🔄 Running migration: add_predefined_jobs_support.sql") + + # Execute each statement separately + for i, stmt in enumerate(statements): + if stmt.strip(): + try: + print(f" Executing statement {i+1}/{len(statements)}...") + await conn.execute(text(stmt)) + except Exception as e: + # Some statements might fail if already applied, that's okay + print(f" ⚠️ Statement {i+1} warning: {e}") + + print("✅ Migration completed successfully!") + + # Verify the changes + result = await conn.execute(text(""" + SELECT column_name, data_type, is_nullable + FROM information_schema.columns + WHERE table_name = 'scheduled_jobs' + ORDER BY ordinal_position + """)) + + rows = result.fetchall() + + print("\n📋 Current scheduled_jobs table structure:") + for row in rows: + print(f" - {row[0]}: {row[1]} (nullable: {row[2]})") + + +if __name__ == "__main__": + asyncio.run(run_migration()) diff --git a/backend/scripts/update_scheduled_jobs.py b/backend/scripts/update_scheduled_jobs.py new file mode 100644 index 0000000..4ac043d --- /dev/null +++ b/backend/scripts/update_scheduled_jobs.py @@ -0,0 +1,87 @@ +""" +Update existing job to use predefined task and add new event sync job +""" +import asyncio +import sys +from pathlib import Path + +# Add backend to path +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from sqlalchemy import text, update +from app.database import engine +from app.models.db.scheduled_job import ScheduledJob, JobType + + +async def update_jobs(): + """Update existing job and add new event sync job""" + async with engine.begin() as conn: + print("🔄 Updating scheduled jobs...") + + # 1. Update existing job to use predefined task + result = await conn.execute(text(""" + UPDATE scheduled_jobs + SET + job_type = 'predefined', + predefined_function = 'sync_solar_system_positions', + function_params = '{"days": 7, "source": "nasa_horizons_cron"}'::jsonb, + description = '每日同步太阳系天体位置数据(使用内置任务)' + WHERE id = 1 + RETURNING id, name + """)) + updated = result.fetchone() + if updated: + print(f"✅ Updated job ID {updated[0]}: {updated[1]} -> predefined task") + + # 2. Add new celestial events sync job (disabled) + result = await conn.execute(text(""" + INSERT INTO scheduled_jobs ( + name, + job_type, + predefined_function, + function_params, + cron_expression, + description, + is_active + ) + VALUES ( + '天体事件同步', + 'predefined', + 'sync_celestial_events', + '{"days_ahead": 30}'::jsonb, + '0 3 * * *', + '每日凌晨3点同步未来30天的天体事件(预留功能,暂未实现)', + false + ) + ON CONFLICT DO NOTHING + RETURNING id, name + """)) + new_job = result.fetchone() + if new_job: + print(f"✅ Added new job ID {new_job[0]}: {new_job[1]} (disabled)") + else: + print("ℹ️ Event sync job already exists") + + # 3. Show all jobs + print("\n📋 Current scheduled jobs:") + result = await conn.execute(text(""" + SELECT + id, + name, + job_type, + predefined_function, + is_active, + cron_expression + FROM scheduled_jobs + ORDER BY id + """)) + + for row in result.fetchall(): + status = "🟢 启用" if row[4] else "🔴 禁用" + job_type_display = "内置任务" if row[2] == 'predefined' else "自定义代码" + print(f" {status} ID {row[0]}: {row[1]}") + print(f" 类型: {job_type_display} | 函数: {row[3]} | CRON: {row[5]}") + + +if __name__ == "__main__": + asyncio.run(update_jobs()) diff --git a/frontend/src/Router.tsx b/frontend/src/Router.tsx index def7f47..8db67bc 100644 --- a/frontend/src/Router.tsx +++ b/frontend/src/Router.tsx @@ -12,6 +12,7 @@ import { Users } from './pages/admin/Users'; import { NASADownload } from './pages/admin/NASADownload'; import { SystemSettings } from './pages/admin/SystemSettings'; import { Tasks } from './pages/admin/Tasks'; +import { ScheduledJobs } from './pages/admin/ScheduledJobs'; // Import ScheduledJobs import { auth } from './utils/auth'; import { ToastProvider } from './contexts/ToastContext'; import App from './App'; @@ -52,6 +53,7 @@ export function Router() { } /> } /> } /> + } /> {/* Add route */} } /> diff --git a/frontend/src/components/admin/DataTable.tsx b/frontend/src/components/admin/DataTable.tsx index f4b8ed6..8b22de6 100644 --- a/frontend/src/components/admin/DataTable.tsx +++ b/frontend/src/components/admin/DataTable.tsx @@ -49,6 +49,9 @@ export function DataTable({ ...columns, ]; + // Check if an action column already exists in the provided columns + const hasExistingActionColumn = columns.some(col => col.key === 'action'); + // Add status column if onStatusChange is provided if (onStatusChange) { tableColumns.push({ @@ -66,8 +69,9 @@ export function DataTable({ }); } - // Add operations column if onEdit or onDelete is provided - if (onEdit || onDelete || customActions) { + // Add operations column if onEdit or onDelete or customActions is provided + // and if there isn't already an 'action' column explicitly defined by the parent + if (!hasExistingActionColumn && (onEdit || onDelete || customActions)) { tableColumns.push({ title: '操作', key: 'action', diff --git a/frontend/src/pages/admin/CelestialBodies.tsx b/frontend/src/pages/admin/CelestialBodies.tsx index f983e1f..2ccf420 100644 --- a/frontend/src/pages/admin/CelestialBodies.tsx +++ b/frontend/src/pages/admin/CelestialBodies.tsx @@ -341,20 +341,14 @@ export function CelestialBodies() { setLoading(true); try { - const response = await request.post( + await request.post( `/celestial/admin/orbits/generate?body_ids=${record.id}` ); - if (response.data.results && response.data.results.length > 0) { - const result = response.data.results[0]; - if (result.status === 'success') { - toast.success(`轨道生成成功!共 ${result.num_points} 个点`); - } else { - toast.error(`轨道生成失败:${result.error}`); - } - } + // 提示用户任务已启动 + toast.success('轨道生成任务已启动,请前往"系统任务"查看进度', 5000); } catch (error: any) { - toast.error(error.response?.data?.detail || '轨道生成失败'); + toast.error(error.response?.data?.detail || '轨道生成任务启动失败'); } finally { setLoading(false); } @@ -512,7 +506,7 @@ export function CelestialBodies() { return ( handleGenerateOrbit(record)} okText="确认" cancelText="取消" diff --git a/frontend/src/pages/admin/NASADownload.tsx b/frontend/src/pages/admin/NASADownload.tsx index f6a7d46..e96bce1 100644 --- a/frontend/src/pages/admin/NASADownload.tsx +++ b/frontend/src/pages/admin/NASADownload.tsx @@ -244,7 +244,7 @@ export function NASADownload() { body_ids: selectedBodies, dates: datesToDownload }); - toast.success('后台下载任务已启动,请前往“系统任务”查看进度'); + toast.success('批量下载任务已启动,请前往“系统任务”查看进度'); } } catch (error) { toast.error('请求失败'); diff --git a/frontend/src/pages/admin/ScheduledJobs.tsx b/frontend/src/pages/admin/ScheduledJobs.tsx new file mode 100644 index 0000000..8cec58d --- /dev/null +++ b/frontend/src/pages/admin/ScheduledJobs.tsx @@ -0,0 +1,586 @@ +/** + * Scheduled Jobs Management Page + */ +import { useState, useEffect } from 'react'; +import { Modal, Form, Input, Switch, Button, Space, Popconfirm, Tag, Tooltip, Badge, Tabs, Select, Row, Col, Card, Alert } from 'antd'; +import { PlayCircleOutlined, EditOutlined, DeleteOutlined, QuestionCircleOutlined, InfoCircleOutlined } from '@ant-design/icons'; +import type { ColumnsType } from 'antd/es/table'; +import { DataTable } from '../../components/admin/DataTable'; +import { request } from '../../utils/request'; +import { useToast } from '../../contexts/ToastContext'; + +interface ScheduledJob { + id: number; + name: string; + job_type: 'predefined' | 'custom_code'; + predefined_function?: string; + function_params?: Record; + cron_expression: string; + python_code?: string; + is_active: boolean; + description: string; + last_run_at: string | null; + last_run_status: 'success' | 'failed' | null; + next_run_at: string | null; + created_at: string; + updated_at: string; +} + +interface AvailableTask { + name: string; + description: string; + category: string; + parameters: Array<{ + name: string; + type: string; + description: string; + required: boolean; + default: any; + }>; +} + +export function ScheduledJobs() { + const [loading, setLoading] = useState(false); + const [data, setData] = useState([]); + const [filteredData, setFilteredData] = useState([]); + const [isModalOpen, setIsModalOpen] = useState(false); + const [editingRecord, setEditingRecord] = useState(null); + const [activeTabKey, setActiveTabKey] = useState('basic'); + const [availableTasks, setAvailableTasks] = useState([]); + const [selectedTask, setSelectedTask] = useState(null); + const [form] = Form.useForm(); + const toast = useToast(); + + const jobType = Form.useWatch('job_type', form); + const predefinedFunction = Form.useWatch('predefined_function', form); + + useEffect(() => { + loadData(); + loadAvailableTasks(); + }, []); + + useEffect(() => { + // When predefined function changes, update selected task + if (predefinedFunction && availableTasks.length > 0) { + const task = availableTasks.find(t => t.name === predefinedFunction); + setSelectedTask(task || null); + + // Set default parameter values only if not editing + if (task && !editingRecord) { + const defaultParams: Record = {}; + task.parameters.forEach(param => { + if (param.default !== null && param.default !== undefined) { + defaultParams[param.name] = param.default; + } + }); + form.setFieldsValue({ function_params: defaultParams }); + } else if (task && editingRecord) { + // When editing, just set the selected task, don't override params + setSelectedTask(task); + } + } else { + setSelectedTask(null); + } + }, [predefinedFunction, availableTasks]); + + const loadData = async () => { + setLoading(true); + try { + const { data: result } = await request.get('/scheduled-jobs'); + setData(result || []); + setFilteredData(result || []); + } catch (error) { + toast.error('加载数据失败'); + } finally { + setLoading(false); + } + }; + + const loadAvailableTasks = async () => { + try { + const { data: result } = await request.get('/scheduled-jobs/available-tasks'); + setAvailableTasks(result || []); + } catch (error) { + toast.error('加载可用任务列表失败'); + } + }; + + const handleSearch = (keyword: string) => { + const lowerKeyword = keyword.toLowerCase(); + const filtered = data.filter( + (item) => + item.name.toLowerCase().includes(lowerKeyword) || + item.description?.toLowerCase().includes(lowerKeyword) + ); + setFilteredData(filtered); + }; + + const handleAdd = () => { + setEditingRecord(null); + setSelectedTask(null); + form.resetFields(); + form.setFieldsValue({ + job_type: 'predefined', + is_active: true, + function_params: {} + }); + setActiveTabKey('basic'); + setIsModalOpen(true); + }; + + const handleEdit = (record: ScheduledJob) => { + setEditingRecord(record); + form.setFieldsValue({ + ...record, + function_params: record.function_params || {} + }); + setActiveTabKey('basic'); + setIsModalOpen(true); + }; + + const handleDelete = async (record: ScheduledJob) => { + try { + await request.delete(`/scheduled-jobs/${record.id}`); + toast.success('删除成功'); + loadData(); + } catch (error) { + toast.error('删除失败'); + } + }; + + const handleRunNow = async (record: ScheduledJob) => { + try { + await request.post(`/scheduled-jobs/${record.id}/run`); + toast.success('定时任务已触发,请前往"系统任务"中查看进度'); + setTimeout(loadData, 1000); + } catch (error) { + toast.error('触发失败'); + } + }; + + const handleModalOk = async () => { + try { + const values = await form.validateFields(); + + // Clean up data based on job_type + if (values.job_type === 'predefined') { + delete values.python_code; + } else { + delete values.predefined_function; + delete values.function_params; + } + + if (editingRecord) { + await request.put(`/scheduled-jobs/${editingRecord.id}`, values); + toast.success('更新成功'); + } else { + await request.post('/scheduled-jobs', values); + toast.success('创建成功'); + } + + setIsModalOpen(false); + loadData(); + } catch (error: any) { + if (error.response?.data?.detail) { + const detail = error.response.data.detail; + if (typeof detail === 'object' && detail.message) { + toast.error(detail.message); + } else { + toast.error(detail); + } + } + } + }; + + const columns: ColumnsType = [ + { + title: 'ID', + dataIndex: 'id', + width: 60, + }, + { + title: '任务名称', + dataIndex: 'name', + width: 200, + render: (text, record) => ( +
+
{text}
+ {record.description && ( +
{record.description}
+ )} +
+ ), + }, + { + title: '类型', + dataIndex: 'job_type', + width: 120, + render: (type) => ( + + {type === 'predefined' ? '内置任务' : '自定义代码'} + + ), + }, + { + title: '任务函数', + dataIndex: 'predefined_function', + width: 200, + render: (func, record) => { + if (record.job_type === 'predefined') { + return {func}; + } + return -; + }, + }, + { + title: 'Cron 表达式', + dataIndex: 'cron_expression', + width: 130, + render: (text) => {text}, + }, + { + title: '状态', + dataIndex: 'is_active', + width: 80, + render: (active) => ( + + ), + }, + { + title: '上次执行', + width: 200, + render: (_, record) => ( +
+ {record.last_run_at ? ( + <> +
{new Date(record.last_run_at).toLocaleString()}
+ + {record.last_run_status === 'success' ? '成功' : '失败'} + + + ) : ( + 从未执行 + )} +
+ ), + }, + { + title: '操作', + key: 'action', + width: 150, + render: (_, record) => ( + + + @@ -364,6 +383,16 @@ export function SystemSettings() { ); + } else if (valueType === 'json') { + return ( + + + + ); } else { return ( - + ); } diff --git a/frontend/src/pages/admin/Tasks.tsx b/frontend/src/pages/admin/Tasks.tsx index ba486ac..2fead02 100644 --- a/frontend/src/pages/admin/Tasks.tsx +++ b/frontend/src/pages/admin/Tasks.tsx @@ -1,5 +1,5 @@ import { useState, useEffect, useRef } from 'react'; -import { Tag, Progress, Button, Modal, Descriptions, Badge, Typography } from 'antd'; +import { Tag, Progress, Button, Modal, Descriptions, Badge, Typography, Space } from 'antd'; import { ReloadOutlined, EyeOutlined } from '@ant-design/icons'; import type { ColumnsType } from 'antd/es/table'; import { DataTable } from '../../components/admin/DataTable'; @@ -127,10 +127,6 @@ export function Tasks() { return (
-
- -
-