138 lines
4.4 KiB
Python
138 lines
4.4 KiB
Python
"""
|
|
Python Code Validator for Scheduled Jobs
|
|
验证用户提交的 Python 代码安全性和语法正确性
|
|
"""
|
|
import ast
|
|
import re
|
|
from typing import Dict, List, Tuple
|
|
|
|
|
|
class PythonCodeValidator:
|
|
"""验证Python代码的安全性和有效性"""
|
|
|
|
# 危险的内置函数和模块
|
|
DANGEROUS_BUILTINS = {
|
|
'eval', 'exec', 'compile', '__import__',
|
|
'open', 'file', 'input', 'raw_input',
|
|
'execfile', 'reload',
|
|
}
|
|
|
|
# 危险的模块
|
|
DANGEROUS_MODULES = {
|
|
'os', 'sys', 'subprocess', 'socket',
|
|
'shutil', 'pickle', 'multiprocessing',
|
|
'threading', 'ctypes', 'importlib',
|
|
}
|
|
|
|
# 允许的模块(白名单)
|
|
ALLOWED_MODULES = {
|
|
'asyncio', 'datetime', 'math', 'json',
|
|
'logging', 'typing', 'collections',
|
|
'app.services', 'app.models', 'sqlalchemy',
|
|
}
|
|
|
|
@staticmethod
|
|
def validate_syntax(code: str) -> Tuple[bool, str]:
|
|
"""
|
|
验证Python代码语法
|
|
|
|
Returns:
|
|
(is_valid, error_message)
|
|
"""
|
|
try:
|
|
ast.parse(code)
|
|
return True, ""
|
|
except SyntaxError as e:
|
|
return False, f"语法错误 (第{e.lineno}行): {e.msg}"
|
|
except Exception as e:
|
|
return False, f"代码解析错误: {str(e)}"
|
|
|
|
@staticmethod
|
|
def check_dangerous_functions(code: str) -> Tuple[bool, List[str]]:
|
|
"""
|
|
检查是否使用了危险函数
|
|
|
|
Returns:
|
|
(is_safe, dangerous_items)
|
|
"""
|
|
dangerous_found = []
|
|
|
|
try:
|
|
tree = ast.parse(code)
|
|
|
|
for node in ast.walk(tree):
|
|
# 检查函数调用
|
|
if isinstance(node, ast.Call):
|
|
if isinstance(node.func, ast.Name):
|
|
if node.func.id in PythonCodeValidator.DANGEROUS_BUILTINS:
|
|
dangerous_found.append(f"危险函数: {node.func.id}")
|
|
|
|
# 检查模块导入
|
|
elif isinstance(node, ast.Import):
|
|
for alias in node.names:
|
|
module_name = alias.name.split('.')[0]
|
|
if module_name in PythonCodeValidator.DANGEROUS_MODULES:
|
|
if not any(module_name.startswith(allowed) for allowed in PythonCodeValidator.ALLOWED_MODULES):
|
|
dangerous_found.append(f"危险模块导入: {alias.name}")
|
|
|
|
elif isinstance(node, ast.ImportFrom):
|
|
if node.module:
|
|
module_name = node.module.split('.')[0]
|
|
if module_name in PythonCodeValidator.DANGEROUS_MODULES:
|
|
if not any(module_name.startswith(allowed) for allowed in PythonCodeValidator.ALLOWED_MODULES):
|
|
dangerous_found.append(f"危险模块导入: from {node.module}")
|
|
|
|
return len(dangerous_found) == 0, dangerous_found
|
|
|
|
except Exception as e:
|
|
return False, [f"代码分析错误: {str(e)}"]
|
|
|
|
@staticmethod
|
|
def validate_code(code: str) -> Dict:
|
|
"""
|
|
完整的代码验证
|
|
|
|
Returns:
|
|
{
|
|
"valid": bool,
|
|
"errors": List[str],
|
|
"warnings": List[str]
|
|
}
|
|
"""
|
|
errors = []
|
|
warnings = []
|
|
|
|
# 1. 检查代码是否为空
|
|
if not code or not code.strip():
|
|
errors.append("代码不能为空")
|
|
return {"valid": False, "errors": errors, "warnings": warnings}
|
|
|
|
# 2. 语法验证
|
|
syntax_valid, syntax_error = PythonCodeValidator.validate_syntax(code)
|
|
if not syntax_valid:
|
|
errors.append(syntax_error)
|
|
return {"valid": False, "errors": errors, "warnings": warnings}
|
|
|
|
# 3. 安全检查
|
|
is_safe, dangerous_items = PythonCodeValidator.check_dangerous_functions(code)
|
|
if not is_safe:
|
|
errors.extend(dangerous_items)
|
|
|
|
# 4. 检查代码长度
|
|
if len(code) > 10000: # 10KB limit
|
|
warnings.append("代码过长,可能影响性能")
|
|
|
|
# 5. 检查是否包含无限循环风险
|
|
if re.search(r'while\s+True\s*:', code):
|
|
warnings.append("检测到 'while True',请确保有退出条件")
|
|
|
|
return {
|
|
"valid": len(errors) == 0,
|
|
"errors": errors,
|
|
"warnings": warnings
|
|
}
|
|
|
|
|
|
# 导出验证器实例
|
|
code_validator = PythonCodeValidator()
|