Files
KiloStar/kilostar/utils/sandbox.py
T
zhaoxi 80174acaae feat(security): 新增工具沙箱安全机制
为所有工具插件添加沙箱拦截层,防止危险的文件访问、Shell命令和Python代码执行。
包含配置文件、核心校验逻辑及31个单元测试。

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-03 12:09:15 +00:00

177 lines
5.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""KiloStar 工具沙箱:路径校验、命令过滤、代码静态检查。"""
from __future__ import annotations
import ast
import os
import re
from pathlib import Path
from typing import List, Optional

import yaml
from pydantic import BaseModel, Field
# "config" directory located next to the third ancestor directory of this
# module file (presumably the repository root -- verify against the layout).
_CONFIG_DIR = Path(__file__).resolve().parent.parent.parent / "config"
# YAML file from which the sandbox policy is loaded.
_SANDBOX_YAML = _CONFIG_DIR / "sandbox.yaml"
class FilesystemPolicy(BaseModel):
    """Filesystem access rules consulted by ``validate_path``."""

    # Directory tree that every write must stay inside; it is also readable.
    workspace_root: str = "/tmp/kilostar_workspace"
    # Additional directory trees under which reads are permitted.
    allowed_read_paths: List[str] = Field(default_factory=lambda: ["/tmp"])
    # Paths (and everything beneath them) refused for both reads and writes.
    denied_paths: List[str] = Field(default_factory=list)
class ShellPolicy(BaseModel):
    """Rules applied to shell commands by ``validate_shell_command``."""

    # When False (and the sandbox itself is enabled) every command is rejected.
    enabled: bool = True
    # Command prefixes that are refused; compared case-insensitively.
    blocked_commands: List[str] = Field(default_factory=list)
    # Substrings that may not appear anywhere in a command.
    blocked_operators: List[str] = Field(default_factory=list)
    # Upper bound applied by ``get_shell_timeout`` (presumably seconds -- confirm
    # against the executor that consumes it).
    max_timeout: int = 60
class PythonExecutorPolicy(BaseModel):
    """Rules applied to Python snippets by ``validate_python_code``."""

    # When False (and the sandbox itself is enabled) all code is rejected.
    enabled: bool = True
    # Upper bound applied by ``get_python_timeout`` (presumably seconds --
    # confirm against the executor that consumes it).
    max_timeout: int = 30
    # Module names whose ``import X`` / ``from X ...`` statements are refused.
    blocked_imports: List[str] = Field(default_factory=list)
    # Names that may not be called, i.e. appear as ``name(``.
    blocked_builtins: List[str] = Field(default_factory=list)
class SandboxConfig(BaseModel):
    """Top-level sandbox settings grouping the per-tool policies."""

    # Master switch: when False the validate_* functions skip their checks.
    enabled: bool = True
    # Rules for file reads and writes.
    filesystem: FilesystemPolicy = Field(default_factory=FilesystemPolicy)
    # Rules for shell command execution.
    shell: ShellPolicy = Field(default_factory=ShellPolicy)
    # Rules for Python code execution.
    python_executor: PythonExecutorPolicy = Field(default_factory=PythonExecutorPolicy)
# Module-level cache of the loaded configuration; filled on first use by
# get_sandbox_config() and replaced by reload_sandbox_config().
_current: Optional[SandboxConfig] = None
def _load_sandbox_config() -> SandboxConfig:
    """Build a ``SandboxConfig`` from the ``_SANDBOX_YAML`` file.

    The policy may sit under a top-level ``sandbox`` key or occupy the whole
    document.  A missing file, an empty document, or a ``sandbox:`` key with
    no value all yield the default configuration.

    Returns:
        The validated configuration.

    Raises:
        pydantic.ValidationError: If the YAML content does not match the
            schema (this includes a document that is not a mapping).
        yaml.YAMLError: If the file is not valid YAML.
    """
    # Open directly instead of checking exists() first: the file could vanish
    # between the check and the open.
    try:
        with open(_SANDBOX_YAML, "r", encoding="utf-8") as f:
            data = yaml.safe_load(f) or {}
    except FileNotFoundError:
        return SandboxConfig()

    # Only a mapping can carry a "sandbox" section.  Anything else is handed
    # to the model unchanged so pydantic reports a proper validation error
    # instead of an AttributeError on ``.get``.
    raw = data.get("sandbox", data) if isinstance(data, dict) else data
    # ``sandbox:`` with nothing after it parses to None; treat it as empty.
    if raw is None:
        raw = {}
    return SandboxConfig.model_validate(raw)
def get_sandbox_config() -> SandboxConfig:
    """Return the cached sandbox configuration, loading it on first use."""
    global _current
    if _current is not None:
        return _current
    _current = _load_sandbox_config()
    return _current
def reload_sandbox_config() -> SandboxConfig:
    """Load the configuration again, replace the cached copy, and return it."""
    global _current
    fresh = _load_sandbox_config()
    _current = fresh
    return fresh
# ─── Exceptions ───
class PathViolation(Exception):
    """Raised by ``validate_path`` when a path is refused by the sandbox."""
class CommandViolation(Exception):
    """Raised by ``validate_shell_command`` when a command is refused."""
class CodeViolation(Exception):
    """Raised by ``validate_python_code`` when a snippet is refused."""
# ─── Path Validation ───
def _is_within_dir(path: str, root: str) -> bool:
    """Return True when *path* is *root* itself or lies beneath it."""
    # A root that already ends with a separator (the filesystem root) must not
    # get a second one appended, otherwise nothing would ever match it.
    prefix = root if root.endswith(os.sep) else root + os.sep
    return path == root or path.startswith(prefix)


def validate_path(file_path: str, *, write: bool = False) -> str:
    """Check *file_path* against the filesystem policy and canonicalise it.

    Args:
        file_path: Path requested by a tool; may be relative.
        write: When True the path must lie inside ``workspace_root``;
            otherwise it must lie inside ``workspace_root`` or one of
            ``allowed_read_paths``.

    Returns:
        The symlink-resolved absolute path.  When the sandbox is disabled the
        path is only made absolute and no checks are performed.

    Raises:
        PathViolation: If the path is under a denied path, or outside the
            directories allowed for the requested kind of access.
    """
    cfg = get_sandbox_config()
    if not cfg.enabled:
        return os.path.abspath(file_path)

    fs = cfg.filesystem
    # realpath() already returns an absolute path.  Calling abspath() first
    # would collapse ".." components lexically *before* symlinks are resolved,
    # so "workspace/link/../x" could be validated as a different file than the
    # one the operating system actually opens.
    resolved = os.path.realpath(file_path)

    # Denied paths win over every allow rule, for reads and writes alike.
    if any(_is_within_dir(resolved, os.path.realpath(d)) for d in fs.denied_paths):
        raise PathViolation(f"路径被禁止访问: {file_path}")

    ws_root = os.path.realpath(fs.workspace_root)
    if write:
        if not _is_within_dir(resolved, ws_root):
            raise PathViolation(
                f"写操作路径必须在工作目录内: {fs.workspace_root}; "
                f"当前路径: {file_path}"
            )
        return resolved

    readable_roots = [ws_root, *(os.path.realpath(p) for p in fs.allowed_read_paths)]
    if any(_is_within_dir(resolved, root) for root in readable_roots):
        return resolved
    raise PathViolation(
        f"读操作路径不在允许范围内: {file_path}; "
        f"允许的目录: {[fs.workspace_root] + fs.allowed_read_paths}"
    )
# ─── Shell Command Validation ───
def validate_shell_command(command: str) -> str:
    """Reject shell commands that the sandbox policy forbids.

    This is a best-effort textual filter, not a shell parser: it compares the
    start of the command against ``blocked_commands`` and looks for
    ``blocked_operators`` anywhere in the text.  Wrappers such as ``sudo`` or
    ``env`` in front of a blocked command are not recognised.

    Args:
        command: The command line a tool wants to run.

    Returns:
        *command*, unchanged, when it is accepted.

    Raises:
        CommandViolation: If the shell executor is disabled by policy, the
            command starts with a blocked command, or it contains a blocked
            operator.
    """
    cfg = get_sandbox_config()
    if not cfg.enabled:
        return command
    shell_cfg = cfg.shell
    if not shell_cfg.enabled:
        raise CommandViolation("shell_executor 已被沙箱策略禁用")

    # Collapse whitespace runs so "rm\t-rf" or "rm   -rf" cannot slip past a
    # blocked entry written as "rm -rf".
    normalized = re.sub(r"\s+", " ", command.strip().lower())
    # Also test the command with its executable reduced to a bare name, so
    # "/bin/rm ..." is treated like "rm ...".
    head, sep, tail = normalized.partition(" ")
    candidates = (normalized, os.path.basename(head) + sep + tail)

    for blocked in shell_cfg.blocked_commands:
        # Same whitespace collapsing as above; a trailing space in the entry
        # is kept so "rm " still does not match "rmdir".
        needle = re.sub(r"\s+", " ", blocked.lower())
        if any(candidate.startswith(needle) for candidate in candidates):
            raise CommandViolation(f"命令被禁止: {blocked}")
    for op in shell_cfg.blocked_operators:
        if op in command:
            raise CommandViolation(f"命令包含被禁止的操作符: '{op}'")
    return command
def get_shell_timeout(requested: int) -> int:
    """Return *requested* capped at the shell policy's ``max_timeout``."""
    limit = get_sandbox_config().shell.max_timeout
    return limit if limit < requested else requested
# ─── Python Code Validation ───
def _scan_python_source(code: str) -> tuple[set[str], set[str]]:
    """Collect imported module names and bare names read by *code*.

    Returns ``(imported, referenced)``; both sets are empty when the source
    cannot be parsed.
    """
    imported: set[str] = set()
    referenced: set[str] = set()
    try:
        tree = ast.parse(code)
    except (SyntaxError, ValueError, RecursionError):
        return imported, referenced
    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            imported.update(alias.name for alias in node.names)
        elif isinstance(node, ast.ImportFrom):
            # Relative imports (level > 0) cannot name a top-level module.
            if node.level == 0 and node.module:
                imported.add(node.module)
        elif isinstance(node, ast.Name) and isinstance(node.ctx, ast.Load):
            referenced.add(node.id)
    return imported, referenced


def validate_python_code(code: str) -> str:
    """Statically reject Python snippets that use blocked imports or builtins.

    Two checks are combined and either one is enough to reject the code:
    the textual patterns (line-anchored ``import X`` / ``from X`` and
    ``name(`` call syntax), and a syntax-tree scan that also catches
    ``import a, X``, imports that do not start a line, and reading a blocked
    builtin without calling it (``f = eval``).  The scan is skipped when the
    snippet does not parse.  This remains a best-effort filter; dynamic
    tricks such as ``getattr`` lookups are not detected.

    Args:
        code: Source text a tool wants to execute.

    Returns:
        *code*, unchanged, when it is accepted.

    Raises:
        CodeViolation: If the Python executor is disabled by policy, or the
            code imports a blocked module or uses a blocked builtin.
    """
    cfg = get_sandbox_config()
    if not cfg.enabled:
        return code
    py_cfg = cfg.python_executor
    if not py_cfg.enabled:
        raise CodeViolation("python_executor 已被沙箱策略禁用")

    imported, referenced = _scan_python_source(code)

    for module in py_cfg.blocked_imports:
        pattern = rf"(?:^|\n)\s*(?:import\s+{re.escape(module)}|from\s+{re.escape(module)})\b"
        # A blocked package also blocks its submodules ("os" covers "os.path").
        submodule_prefix = module + "."
        seen_in_tree = any(
            name == module or name.startswith(submodule_prefix) for name in imported
        )
        if seen_in_tree or re.search(pattern, code):
            raise CodeViolation(f"禁止导入模块: {module}")

    for builtin in py_cfg.blocked_builtins:
        pattern = rf"\b{re.escape(builtin)}\s*\("
        if builtin in referenced or re.search(pattern, code):
            raise CodeViolation(f"禁止使用: {builtin}()")
    return code
def get_python_timeout(requested: int) -> int:
    """Return *requested* capped at the Python executor policy's ``max_timeout``."""
    limit = get_sandbox_config().python_executor.max_timeout
    return limit if limit < requested else requested