""" Python Code Validator for Scheduled Jobs 验证用户提交的 Python 代码安全性和语法正确性 """ import ast import re from typing import Dict, List, Tuple class PythonCodeValidator: """验证Python代码的安全性和有效性""" # 危险的内置函数和模块 DANGEROUS_BUILTINS = { 'eval', 'exec', 'compile', '__import__', 'open', 'file', 'input', 'raw_input', 'execfile', 'reload', } # 危险的模块 DANGEROUS_MODULES = { 'os', 'sys', 'subprocess', 'socket', 'shutil', 'pickle', 'multiprocessing', 'threading', 'ctypes', 'importlib', } # 允许的模块(白名单) ALLOWED_MODULES = { 'asyncio', 'datetime', 'math', 'json', 'logging', 'typing', 'collections', 'app.services', 'app.models', 'sqlalchemy', } @staticmethod def validate_syntax(code: str) -> Tuple[bool, str]: """ 验证Python代码语法 Returns: (is_valid, error_message) """ try: ast.parse(code) return True, "" except SyntaxError as e: return False, f"语法错误 (第{e.lineno}行): {e.msg}" except Exception as e: return False, f"代码解析错误: {str(e)}" @staticmethod def check_dangerous_functions(code: str) -> Tuple[bool, List[str]]: """ 检查是否使用了危险函数 Returns: (is_safe, dangerous_items) """ dangerous_found = [] try: tree = ast.parse(code) for node in ast.walk(tree): # 检查函数调用 if isinstance(node, ast.Call): if isinstance(node.func, ast.Name): if node.func.id in PythonCodeValidator.DANGEROUS_BUILTINS: dangerous_found.append(f"危险函数: {node.func.id}") # 检查模块导入 elif isinstance(node, ast.Import): for alias in node.names: module_name = alias.name.split('.')[0] if module_name in PythonCodeValidator.DANGEROUS_MODULES: if not any(module_name.startswith(allowed) for allowed in PythonCodeValidator.ALLOWED_MODULES): dangerous_found.append(f"危险模块导入: {alias.name}") elif isinstance(node, ast.ImportFrom): if node.module: module_name = node.module.split('.')[0] if module_name in PythonCodeValidator.DANGEROUS_MODULES: if not any(module_name.startswith(allowed) for allowed in PythonCodeValidator.ALLOWED_MODULES): dangerous_found.append(f"危险模块导入: from {node.module}") return len(dangerous_found) == 0, dangerous_found except Exception as e: return False, [f"代码分析错误: {str(e)}"] @staticmethod def validate_code(code: str) -> Dict: """ 完整的代码验证 Returns: { "valid": bool, "errors": List[str], "warnings": List[str] } """ errors = [] warnings = [] # 1. 检查代码是否为空 if not code or not code.strip(): errors.append("代码不能为空") return {"valid": False, "errors": errors, "warnings": warnings} # 2. 语法验证 syntax_valid, syntax_error = PythonCodeValidator.validate_syntax(code) if not syntax_valid: errors.append(syntax_error) return {"valid": False, "errors": errors, "warnings": warnings} # 3. 安全检查 is_safe, dangerous_items = PythonCodeValidator.check_dangerous_functions(code) if not is_safe: errors.extend(dangerous_items) # 4. 检查代码长度 if len(code) > 10000: # 10KB limit warnings.append("代码过长,可能影响性能") # 5. 检查是否包含无限循环风险 if re.search(r'while\s+True\s*:', code): warnings.append("检测到 'while True',请确保有退出条件") return { "valid": len(errors) == 0, "errors": errors, "warnings": warnings } # 导出验证器实例 code_validator = PythonCodeValidator()