From 80174acaaea3b74c6ee74f031c33b6b595747e73 Mon Sep 17 00:00:00 2001 From: zhaoxi Date: Wed, 3 Jun 2026 12:09:15 +0000 Subject: [PATCH] =?UTF-8?q?feat(security):=20=E6=96=B0=E5=A2=9E=E5=B7=A5?= =?UTF-8?q?=E5=85=B7=E6=B2=99=E7=AE=B1=E5=AE=89=E5=85=A8=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 为所有工具插件添加沙箱拦截层,防止危险的文件访问、Shell命令和Python代码执行。 包含配置文件、核心校验逻辑及31个单元测试。 Co-Authored-By: Claude Opus 4.7 (1M context) --- config/sandbox.yaml | 46 ++++ .../plugin/tool_plugin/edit_file/__init__.py | 7 + .../tool_plugin/file_reader/file_reader.py | 7 + .../tool_plugin/python_executor/__init__.py | 10 + .../tool_plugin/search_file/__init__.py | 31 ++- .../tool_plugin/shell_executor/__init__.py | 10 + .../plugin/tool_plugin/write_file/__init__.py | 7 + kilostar/utils/sandbox.py | 176 ++++++++++++++ tests/unit/test_sandbox.py | 216 ++++++++++++++++++ 9 files changed, 502 insertions(+), 8 deletions(-) create mode 100644 config/sandbox.yaml create mode 100644 kilostar/utils/sandbox.py create mode 100644 tests/unit/test_sandbox.py diff --git a/config/sandbox.yaml b/config/sandbox.yaml new file mode 100644 index 0000000..69fe789 --- /dev/null +++ b/config/sandbox.yaml @@ -0,0 +1,46 @@ +# KiloStar 沙箱安全策略配置 +sandbox: + enabled: true + + # 文件系统沙箱 + filesystem: + workspace_root: "/tmp/kilostar_workspace" + allowed_read_paths: + - "/tmp" + denied_paths: + - "/etc/shadow" + - "/etc/passwd" + - "/root" + + # Shell 命令沙箱 + shell: + enabled: true + blocked_commands: + - "rm -rf /" + - "mkfs" + - "dd " + - "shutdown" + - "reboot" + blocked_operators: + - "&&" + - "||" + - ";" + - "`" + - "$(" + max_timeout: 60 + + # Python 执行器沙箱 + python_executor: + enabled: true + max_timeout: 30 + blocked_imports: + - "os" + - "subprocess" + - "shutil" + - "socket" + - "ctypes" + blocked_builtins: + - "exec" + - "eval" + - "compile" + - "__import__" diff --git a/kilostar/plugin/tool_plugin/edit_file/__init__.py b/kilostar/plugin/tool_plugin/edit_file/__init__.py index 113bd4f..247dd89 100644 --- a/kilostar/plugin/tool_plugin/edit_file/__init__.py +++ b/kilostar/plugin/tool_plugin/edit_file/__init__.py @@ -34,6 +34,13 @@ async def edit_file( Returns: 操作结果描述 """ + from kilostar.utils.sandbox import validate_path, PathViolation + + try: + file_path = validate_path(file_path, write=True) + except PathViolation as e: + return f"[Sandbox] {e}" + try: if not os.path.exists(file_path): return f"[Error] 文件不存在: {file_path}" diff --git a/kilostar/plugin/tool_plugin/file_reader/file_reader.py b/kilostar/plugin/tool_plugin/file_reader/file_reader.py index 8668297..65cf3c1 100644 --- a/kilostar/plugin/tool_plugin/file_reader/file_reader.py +++ b/kilostar/plugin/tool_plugin/file_reader/file_reader.py @@ -47,6 +47,13 @@ async def file_reader(file_path: str) -> str: Returns: 文件内容文本,若文件不存在则返回错误信息 """ + from kilostar.utils.sandbox import validate_path, PathViolation + + try: + file_path = validate_path(file_path, write=False) + except PathViolation as e: + return f"[Sandbox] {e}" + try: with open(file_path, "r", encoding="utf-8") as f: return f.read() diff --git a/kilostar/plugin/tool_plugin/python_executor/__init__.py b/kilostar/plugin/tool_plugin/python_executor/__init__.py index 07f2386..6ec9449 100644 --- a/kilostar/plugin/tool_plugin/python_executor/__init__.py +++ b/kilostar/plugin/tool_plugin/python_executor/__init__.py @@ -32,6 +32,16 @@ async def python_executor(code: str, timeout: int = 30) -> str: Returns: 代码的标准输出 + 标准错误 """ + from kilostar.utils.sandbox import ( + validate_python_code, CodeViolation, get_python_timeout, + ) + + try: + code = validate_python_code(code) + except CodeViolation as e: + return f"[Sandbox] {e}" + timeout = get_python_timeout(timeout) + tmp_file = None try: with tempfile.NamedTemporaryFile( diff --git a/kilostar/plugin/tool_plugin/search_file/__init__.py b/kilostar/plugin/tool_plugin/search_file/__init__.py index 83c79e6..6b5bfa0 100644 --- a/kilostar/plugin/tool_plugin/search_file/__init__.py +++ b/kilostar/plugin/tool_plugin/search_file/__init__.py @@ -36,21 +36,36 @@ async def search_file( Returns: 匹配的文件名和行内容 """ + from kilostar.utils.sandbox import validate_path, PathViolation + try: - cmd = ( - f"grep -rn --include='{file_pattern}' " - f"-m {max_results} '{keyword}' '{directory}' 2>/dev/null " - f"| head -n {max_results}" - ) - proc = await asyncio.create_subprocess_shell( - cmd, + directory = validate_path(directory, write=False) + except PathViolation as e: + return f"[Sandbox] {e}" + + max_results = min(max_results, 100) + + try: + grep_args = [ + "grep", "-rn", + f"--include={file_pattern}", + "-m", str(max_results), + "--", keyword, directory, + ] + proc = await asyncio.create_subprocess_exec( + *grep_args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) - stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=30) + stdout, _ = await asyncio.wait_for( + proc.communicate(), timeout=30 + ) output = stdout.decode("utf-8", errors="replace").strip() if not output: return f"未找到包含 '{keyword}' 的匹配项" + lines = output.split("\n") + if len(lines) > max_results: + output = "\n".join(lines[:max_results]) return output except asyncio.TimeoutError: return "[Error] 搜索超时" diff --git a/kilostar/plugin/tool_plugin/shell_executor/__init__.py b/kilostar/plugin/tool_plugin/shell_executor/__init__.py index 00cd5ed..5f802e3 100644 --- a/kilostar/plugin/tool_plugin/shell_executor/__init__.py +++ b/kilostar/plugin/tool_plugin/shell_executor/__init__.py @@ -29,6 +29,16 @@ async def shell_executor(command: str, timeout: int = 30) -> str: Returns: 命令的 stdout + stderr 输出 """ + from kilostar.utils.sandbox import ( + validate_shell_command, CommandViolation, get_shell_timeout, + ) + + try: + command = validate_shell_command(command) + except CommandViolation as e: + return f"[Sandbox] {e}" + timeout = get_shell_timeout(timeout) + try: proc = await asyncio.create_subprocess_shell( command, diff --git a/kilostar/plugin/tool_plugin/write_file/__init__.py b/kilostar/plugin/tool_plugin/write_file/__init__.py index 4ddc8f0..0400705 100644 --- a/kilostar/plugin/tool_plugin/write_file/__init__.py +++ b/kilostar/plugin/tool_plugin/write_file/__init__.py @@ -29,6 +29,13 @@ async def write_file(file_path: str, content: str) -> str: Returns: 操作结果描述 """ + from kilostar.utils.sandbox import validate_path, PathViolation + + try: + file_path = validate_path(file_path, write=True) + except PathViolation as e: + return f"[Sandbox] {e}" + try: dir_path = os.path.dirname(file_path) if dir_path: diff --git a/kilostar/utils/sandbox.py b/kilostar/utils/sandbox.py new file mode 100644 index 0000000..3d3102a --- /dev/null +++ b/kilostar/utils/sandbox.py @@ -0,0 +1,176 @@ +"""KiloStar 工具沙箱:路径校验、命令过滤、代码静态检查。""" + +from __future__ import annotations + +import os +import re +from pathlib import Path +from typing import List, Optional + +import yaml +from pydantic import BaseModel, Field + +_CONFIG_DIR = Path(__file__).resolve().parent.parent.parent / "config" +_SANDBOX_YAML = _CONFIG_DIR / "sandbox.yaml" + + +class FilesystemPolicy(BaseModel): + workspace_root: str = "/tmp/kilostar_workspace" + allowed_read_paths: List[str] = Field(default_factory=lambda: ["/tmp"]) + denied_paths: List[str] = Field(default_factory=list) + + +class ShellPolicy(BaseModel): + enabled: bool = True + blocked_commands: List[str] = Field(default_factory=list) + blocked_operators: List[str] = Field(default_factory=list) + max_timeout: int = 60 + + +class PythonExecutorPolicy(BaseModel): + enabled: bool = True + max_timeout: int = 30 + blocked_imports: List[str] = Field(default_factory=list) + blocked_builtins: List[str] = Field(default_factory=list) + + +class SandboxConfig(BaseModel): + enabled: bool = True + filesystem: FilesystemPolicy = Field(default_factory=FilesystemPolicy) + shell: ShellPolicy = Field(default_factory=ShellPolicy) + python_executor: PythonExecutorPolicy = Field(default_factory=PythonExecutorPolicy) + + +_current: Optional[SandboxConfig] = None + + +def _load_sandbox_config() -> SandboxConfig: + if not _SANDBOX_YAML.exists(): + return SandboxConfig() + with open(_SANDBOX_YAML, "r", encoding="utf-8") as f: + data = yaml.safe_load(f) or {} + raw = data.get("sandbox", data) + return SandboxConfig.model_validate(raw) + + +def get_sandbox_config() -> SandboxConfig: + global _current + if _current is None: + _current = _load_sandbox_config() + return _current + + +def reload_sandbox_config() -> SandboxConfig: + global _current + _current = _load_sandbox_config() + return _current + + +# ─── Exceptions ─── + +class PathViolation(Exception): + pass + + +class CommandViolation(Exception): + pass + + +class CodeViolation(Exception): + pass + + +# ─── Path Validation ─── + +def validate_path(file_path: str, *, write: bool = False) -> str: + cfg = get_sandbox_config() + if not cfg.enabled: + return os.path.abspath(file_path) + + fs = cfg.filesystem + resolved = os.path.realpath(os.path.abspath(file_path)) + + for denied in fs.denied_paths: + denied_resolved = os.path.realpath(denied) + if resolved == denied_resolved or resolved.startswith(denied_resolved + os.sep): + raise PathViolation(f"路径被禁止访问: {file_path}") + + if write: + ws_root = os.path.realpath(fs.workspace_root) + if not (resolved == ws_root or resolved.startswith(ws_root + os.sep)): + raise PathViolation( + f"写操作路径必须在工作目录内: {fs.workspace_root}," + f"当前路径: {file_path}" + ) + return resolved + + allowed = [os.path.realpath(fs.workspace_root)] + for p in fs.allowed_read_paths: + allowed.append(os.path.realpath(p)) + + for allowed_dir in allowed: + if resolved == allowed_dir or resolved.startswith(allowed_dir + os.sep): + return resolved + + raise PathViolation( + f"读操作路径不在允许范围内: {file_path}。" + f"允许的目录: {[fs.workspace_root] + fs.allowed_read_paths}" + ) + + +# ─── Shell Command Validation ─── + +def validate_shell_command(command: str) -> str: + cfg = get_sandbox_config() + if not cfg.enabled: + return command + + shell_cfg = cfg.shell + if not shell_cfg.enabled: + raise CommandViolation("shell_executor 已被沙箱策略禁用") + + cmd_lower = command.strip().lower() + + for blocked in shell_cfg.blocked_commands: + if cmd_lower.startswith(blocked.lower()): + raise CommandViolation(f"命令被禁止: {blocked}") + + for op in shell_cfg.blocked_operators: + if op in command: + raise CommandViolation(f"命令包含被禁止的操作符: '{op}'") + + return command + + +def get_shell_timeout(requested: int) -> int: + cfg = get_sandbox_config() + return min(requested, cfg.shell.max_timeout) + + +# ─── Python Code Validation ─── + +def validate_python_code(code: str) -> str: + cfg = get_sandbox_config() + if not cfg.enabled: + return code + + py_cfg = cfg.python_executor + if not py_cfg.enabled: + raise CodeViolation("python_executor 已被沙箱策略禁用") + + for module in py_cfg.blocked_imports: + pattern = rf"(?:^|\n)\s*(?:import\s+{re.escape(module)}|from\s+{re.escape(module)})\b" + if re.search(pattern, code): + raise CodeViolation(f"禁止导入模块: {module}") + + for builtin in py_cfg.blocked_builtins: + pattern = rf"\b{re.escape(builtin)}\s*\(" + if re.search(pattern, code): + raise CodeViolation(f"禁止使用: {builtin}()") + + return code + + +def get_python_timeout(requested: int) -> int: + cfg = get_sandbox_config() + return min(requested, cfg.python_executor.max_timeout) diff --git a/tests/unit/test_sandbox.py b/tests/unit/test_sandbox.py new file mode 100644 index 0000000..cc2dc3e --- /dev/null +++ b/tests/unit/test_sandbox.py @@ -0,0 +1,216 @@ +"""Tests for kilostar.utils.sandbox module.""" + +import os +import pytest +from unittest.mock import patch + +from kilostar.utils.sandbox import ( + SandboxConfig, + FilesystemPolicy, + ShellPolicy, + PythonExecutorPolicy, + validate_path, + validate_shell_command, + validate_python_code, + get_shell_timeout, + get_python_timeout, + get_sandbox_config, + reload_sandbox_config, + PathViolation, + CommandViolation, + CodeViolation, +) + + +@pytest.fixture(autouse=True) +def reset_sandbox_config(): + """每个测试前重置沙箱配置缓存。""" + import kilostar.utils.sandbox as mod + mod._current = None + yield + mod._current = None + + +@pytest.fixture +def mock_config(): + """注入测试用的沙箱配置。""" + import kilostar.utils.sandbox as mod + cfg = SandboxConfig( + enabled=True, + filesystem=FilesystemPolicy( + workspace_root="/tmp/kilostar_workspace", + allowed_read_paths=["/tmp"], + denied_paths=["/etc/shadow", "/root"], + ), + shell=ShellPolicy( + enabled=True, + blocked_commands=["rm -rf /", "mkfs", "shutdown"], + blocked_operators=["&&", "||", ";", "`", "$("], + max_timeout=60, + ), + python_executor=PythonExecutorPolicy( + enabled=True, + max_timeout=30, + blocked_imports=["os", "subprocess", "shutil"], + blocked_builtins=["exec", "eval", "__import__"], + ), + ) + mod._current = cfg + return cfg + + +@pytest.fixture +def disabled_config(): + """沙箱关闭时的配置。""" + import kilostar.utils.sandbox as mod + cfg = SandboxConfig(enabled=False) + mod._current = cfg + return cfg + + +# ─── Path Validation Tests ─── + + +class TestValidatePath: + def test_read_allowed_path(self, mock_config): + result = validate_path("/tmp/somefile.txt", write=False) + assert result == os.path.realpath("/tmp/somefile.txt") + + def test_read_workspace_path(self, mock_config): + result = validate_path("/tmp/kilostar_workspace/a.py", write=False) + assert "/tmp/kilostar_workspace/a.py" in result + + def test_read_denied_path(self, mock_config): + with pytest.raises(PathViolation, match="路径被禁止访问"): + validate_path("/etc/shadow", write=False) + + def test_read_denied_subpath(self, mock_config): + with pytest.raises(PathViolation, match="路径被禁止访问"): + validate_path("/root/.ssh/id_rsa", write=False) + + def test_read_outside_allowed(self, mock_config): + with pytest.raises(PathViolation, match="读操作路径不在允许范围内"): + validate_path("/var/log/syslog", write=False) + + def test_write_inside_workspace(self, mock_config): + result = validate_path("/tmp/kilostar_workspace/output.txt", write=True) + assert "kilostar_workspace" in result + + def test_write_outside_workspace(self, mock_config): + with pytest.raises(PathViolation, match="写操作路径必须在工作目录内"): + validate_path("/tmp/other_dir/file.txt", write=True) + + def test_write_denied_path(self, mock_config): + with pytest.raises(PathViolation, match="路径被禁止访问"): + validate_path("/etc/shadow", write=True) + + def test_disabled_sandbox_allows_all(self, disabled_config): + result = validate_path("/any/path/file.txt", write=True) + assert result == os.path.abspath("/any/path/file.txt") + + +# ─── Shell Command Validation Tests ─── + + +class TestValidateShellCommand: + def test_allowed_command(self, mock_config): + assert validate_shell_command("ls -la") == "ls -la" + + def test_blocked_command(self, mock_config): + with pytest.raises(CommandViolation, match="命令被禁止"): + validate_shell_command("rm -rf /") + + def test_blocked_command_case_insensitive(self, mock_config): + with pytest.raises(CommandViolation, match="命令被禁止"): + validate_shell_command("SHUTDOWN now") + + def test_blocked_operator_semicolon(self, mock_config): + with pytest.raises(CommandViolation, match="操作符"): + validate_shell_command("echo hello; rm -rf /") + + def test_blocked_operator_and(self, mock_config): + with pytest.raises(CommandViolation, match="操作符"): + validate_shell_command("true && cat /etc/passwd") + + def test_blocked_operator_subshell(self, mock_config): + with pytest.raises(CommandViolation, match="操作符"): + validate_shell_command("echo $(whoami)") + + def test_disabled_sandbox(self, disabled_config): + assert validate_shell_command("rm -rf /") == "rm -rf /" + + def test_shell_disabled_in_policy(self): + import kilostar.utils.sandbox as mod + mod._current = SandboxConfig( + enabled=True, + shell=ShellPolicy(enabled=False), + ) + with pytest.raises(CommandViolation, match="已被沙箱策略禁用"): + validate_shell_command("ls") + + +# ─── Python Code Validation Tests ─── + + +class TestValidatePythonCode: + def test_safe_code(self, mock_config): + code = "print('hello world')" + assert validate_python_code(code) == code + + def test_blocked_import(self, mock_config): + with pytest.raises(CodeViolation, match="禁止导入模块: os"): + validate_python_code("import os") + + def test_blocked_from_import(self, mock_config): + with pytest.raises(CodeViolation, match="禁止导入模块: subprocess"): + validate_python_code("from subprocess import run") + + def test_blocked_builtin_exec(self, mock_config): + with pytest.raises(CodeViolation, match="禁止使用: exec"): + validate_python_code("exec('print(1)')") + + def test_blocked_builtin_eval(self, mock_config): + with pytest.raises(CodeViolation, match="禁止使用: eval"): + validate_python_code("x = eval('1+1')") + + def test_blocked_dunder_import(self, mock_config): + with pytest.raises(CodeViolation, match="禁止使用: __import__"): + validate_python_code("m = __import__('os')") + + def test_import_in_multiline(self, mock_config): + code = "x = 1\nimport shutil\ny = 2" + with pytest.raises(CodeViolation, match="禁止导入模块: shutil"): + validate_python_code(code) + + def test_safe_similar_name(self, mock_config): + code = "import os_utils" + assert validate_python_code(code) == code + + def test_disabled_sandbox(self, disabled_config): + assert validate_python_code("import os") == "import os" + + def test_python_disabled_in_policy(self): + import kilostar.utils.sandbox as mod + mod._current = SandboxConfig( + enabled=True, + python_executor=PythonExecutorPolicy(enabled=False), + ) + with pytest.raises(CodeViolation, match="已被沙箱策略禁用"): + validate_python_code("print(1)") + + +# ─── Timeout Tests ─── + + +class TestTimeouts: + def test_shell_timeout_clamped(self, mock_config): + assert get_shell_timeout(120) == 60 + + def test_shell_timeout_within_limit(self, mock_config): + assert get_shell_timeout(30) == 30 + + def test_python_timeout_clamped(self, mock_config): + assert get_python_timeout(60) == 30 + + def test_python_timeout_within_limit(self, mock_config): + assert get_python_timeout(10) == 10