76a67e8237
将分散的 config.yml、workflow.yaml、sandbox.yaml 加载逻辑统一到 AppConfig 模型, 启动时一次性校验,失败则 fast-fail。sandbox.py 改为从统一配置取值,消除重复加载。 同时修复 onebot 测试并新增14个统一配置测试(总测试 285→300)。 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
136 lines
3.7 KiB
Python
136 lines
3.7 KiB
Python
"""KiloStar 工具沙箱:路径校验、命令过滤、代码静态检查。"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import os
|
||
import re
|
||
from typing import List, Optional
|
||
|
||
from kilostar.utils.config_loader import (
|
||
SandboxConfig,
|
||
FilesystemPolicy,
|
||
ShellPolicy,
|
||
PythonExecutorPolicy,
|
||
get_app_config,
|
||
reload_app_config,
|
||
)
|
||
|
||
|
||
def get_sandbox_config() -> SandboxConfig:
|
||
return get_app_config().sandbox
|
||
|
||
|
||
def reload_sandbox_config() -> SandboxConfig:
|
||
reload_app_config(section="sandbox")
|
||
return get_app_config().sandbox
|
||
|
||
|
||
# ─── Exceptions ───
|
||
|
||
class PathViolation(Exception):
|
||
pass
|
||
|
||
|
||
class CommandViolation(Exception):
|
||
pass
|
||
|
||
|
||
class CodeViolation(Exception):
|
||
pass
|
||
|
||
|
||
# ─── Path Validation ───
|
||
|
||
def validate_path(file_path: str, *, write: bool = False) -> str:
|
||
cfg = get_sandbox_config()
|
||
if not cfg.enabled:
|
||
return os.path.abspath(file_path)
|
||
|
||
fs = cfg.filesystem
|
||
resolved = os.path.realpath(os.path.abspath(file_path))
|
||
|
||
for denied in fs.denied_paths:
|
||
denied_resolved = os.path.realpath(denied)
|
||
if resolved == denied_resolved or resolved.startswith(denied_resolved + os.sep):
|
||
raise PathViolation(f"路径被禁止访问: {file_path}")
|
||
|
||
if write:
|
||
ws_root = os.path.realpath(fs.workspace_root)
|
||
if not (resolved == ws_root or resolved.startswith(ws_root + os.sep)):
|
||
raise PathViolation(
|
||
f"写操作路径必须在工作目录内: {fs.workspace_root},"
|
||
f"当前路径: {file_path}"
|
||
)
|
||
return resolved
|
||
|
||
allowed = [os.path.realpath(fs.workspace_root)]
|
||
for p in fs.allowed_read_paths:
|
||
allowed.append(os.path.realpath(p))
|
||
|
||
for allowed_dir in allowed:
|
||
if resolved == allowed_dir or resolved.startswith(allowed_dir + os.sep):
|
||
return resolved
|
||
|
||
raise PathViolation(
|
||
f"读操作路径不在允许范围内: {file_path}。"
|
||
f"允许的目录: {[fs.workspace_root] + fs.allowed_read_paths}"
|
||
)
|
||
|
||
|
||
# ─── Shell Command Validation ───
|
||
|
||
def validate_shell_command(command: str) -> str:
|
||
cfg = get_sandbox_config()
|
||
if not cfg.enabled:
|
||
return command
|
||
|
||
shell_cfg = cfg.shell
|
||
if not shell_cfg.enabled:
|
||
raise CommandViolation("shell_executor 已被沙箱策略禁用")
|
||
|
||
cmd_lower = command.strip().lower()
|
||
|
||
for blocked in shell_cfg.blocked_commands:
|
||
if cmd_lower.startswith(blocked.lower()):
|
||
raise CommandViolation(f"命令被禁止: {blocked}")
|
||
|
||
for op in shell_cfg.blocked_operators:
|
||
if op in command:
|
||
raise CommandViolation(f"命令包含被禁止的操作符: '{op}'")
|
||
|
||
return command
|
||
|
||
|
||
def get_shell_timeout(requested: int) -> int:
|
||
cfg = get_sandbox_config()
|
||
return min(requested, cfg.shell.max_timeout)
|
||
|
||
|
||
# ─── Python Code Validation ───
|
||
|
||
def validate_python_code(code: str) -> str:
|
||
cfg = get_sandbox_config()
|
||
if not cfg.enabled:
|
||
return code
|
||
|
||
py_cfg = cfg.python_executor
|
||
if not py_cfg.enabled:
|
||
raise CodeViolation("python_executor 已被沙箱策略禁用")
|
||
|
||
for module in py_cfg.blocked_imports:
|
||
pattern = rf"(?:^|\n)\s*(?:import\s+{re.escape(module)}|from\s+{re.escape(module)})\b"
|
||
if re.search(pattern, code):
|
||
raise CodeViolation(f"禁止导入模块: {module}")
|
||
|
||
for builtin in py_cfg.blocked_builtins:
|
||
pattern = rf"\b{re.escape(builtin)}\s*\("
|
||
if re.search(pattern, code):
|
||
raise CodeViolation(f"禁止使用: {builtin}()")
|
||
|
||
return code
|
||
|
||
|
||
def get_python_timeout(requested: int) -> int:
|
||
cfg = get_sandbox_config()
|
||
return min(requested, cfg.python_executor.max_timeout)
|