cosmo/backend/app/services/code_validator.py

138 lines
4.4 KiB
Python

"""
Python Code Validator for Scheduled Jobs
验证用户提交的 Python 代码安全性和语法正确性
"""
import ast
import re
from typing import Dict, List, Tuple
class PythonCodeValidator:
"""验证Python代码的安全性和有效性"""
# 危险的内置函数和模块
DANGEROUS_BUILTINS = {
'eval', 'exec', 'compile', '__import__',
'open', 'file', 'input', 'raw_input',
'execfile', 'reload',
}
# 危险的模块
DANGEROUS_MODULES = {
'os', 'sys', 'subprocess', 'socket',
'shutil', 'pickle', 'multiprocessing',
'threading', 'ctypes', 'importlib',
}
# 允许的模块(白名单)
ALLOWED_MODULES = {
'asyncio', 'datetime', 'math', 'json',
'logging', 'typing', 'collections',
'app.services', 'app.models', 'sqlalchemy',
}
@staticmethod
def validate_syntax(code: str) -> Tuple[bool, str]:
"""
验证Python代码语法
Returns:
(is_valid, error_message)
"""
try:
ast.parse(code)
return True, ""
except SyntaxError as e:
return False, f"语法错误 (第{e.lineno}行): {e.msg}"
except Exception as e:
return False, f"代码解析错误: {str(e)}"
@staticmethod
def check_dangerous_functions(code: str) -> Tuple[bool, List[str]]:
"""
检查是否使用了危险函数
Returns:
(is_safe, dangerous_items)
"""
dangerous_found = []
try:
tree = ast.parse(code)
for node in ast.walk(tree):
# 检查函数调用
if isinstance(node, ast.Call):
if isinstance(node.func, ast.Name):
if node.func.id in PythonCodeValidator.DANGEROUS_BUILTINS:
dangerous_found.append(f"危险函数: {node.func.id}")
# 检查模块导入
elif isinstance(node, ast.Import):
for alias in node.names:
module_name = alias.name.split('.')[0]
if module_name in PythonCodeValidator.DANGEROUS_MODULES:
if not any(module_name.startswith(allowed) for allowed in PythonCodeValidator.ALLOWED_MODULES):
dangerous_found.append(f"危险模块导入: {alias.name}")
elif isinstance(node, ast.ImportFrom):
if node.module:
module_name = node.module.split('.')[0]
if module_name in PythonCodeValidator.DANGEROUS_MODULES:
if not any(module_name.startswith(allowed) for allowed in PythonCodeValidator.ALLOWED_MODULES):
dangerous_found.append(f"危险模块导入: from {node.module}")
return len(dangerous_found) == 0, dangerous_found
except Exception as e:
return False, [f"代码分析错误: {str(e)}"]
@staticmethod
def validate_code(code: str) -> Dict:
"""
完整的代码验证
Returns:
{
"valid": bool,
"errors": List[str],
"warnings": List[str]
}
"""
errors = []
warnings = []
# 1. 检查代码是否为空
if not code or not code.strip():
errors.append("代码不能为空")
return {"valid": False, "errors": errors, "warnings": warnings}
# 2. 语法验证
syntax_valid, syntax_error = PythonCodeValidator.validate_syntax(code)
if not syntax_valid:
errors.append(syntax_error)
return {"valid": False, "errors": errors, "warnings": warnings}
# 3. 安全检查
is_safe, dangerous_items = PythonCodeValidator.check_dangerous_functions(code)
if not is_safe:
errors.extend(dangerous_items)
# 4. 检查代码长度
if len(code) > 10000: # 10KB limit
warnings.append("代码过长,可能影响性能")
# 5. 检查是否包含无限循环风险
if re.search(r'while\s+True\s*:', code):
warnings.append("检测到 'while True',请确保有退出条件")
return {
"valid": len(errors) == 0,
"errors": errors,
"warnings": warnings
}
# 导出验证器实例
code_validator = PythonCodeValidator()