76a67e8237
将分散的 config.yml、workflow.yaml、sandbox.yaml 加载逻辑统一到 AppConfig 模型, 启动时一次性校验,失败则 fast-fail。sandbox.py 改为从统一配置取值,消除重复加载。 同时修复 onebot 测试并新增14个统一配置测试(总测试 285→300)。 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
228 lines
7.7 KiB
Python
228 lines
7.7 KiB
Python
"""Tests for kilostar.utils.sandbox module."""
|
|
|
|
import os
|
|
import pytest
|
|
from unittest.mock import patch
|
|
|
|
from kilostar.utils.config_loader import (
|
|
SandboxConfig,
|
|
FilesystemPolicy,
|
|
ShellPolicy,
|
|
PythonExecutorPolicy,
|
|
AppConfig,
|
|
AppInfo,
|
|
WorkflowConfig,
|
|
)
|
|
from kilostar.utils.sandbox import (
|
|
validate_path,
|
|
validate_shell_command,
|
|
validate_python_code,
|
|
get_shell_timeout,
|
|
get_python_timeout,
|
|
get_sandbox_config,
|
|
reload_sandbox_config,
|
|
PathViolation,
|
|
CommandViolation,
|
|
CodeViolation,
|
|
)
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def reset_sandbox_config():
    """Reset the cached application config around every test.

    The fixture is ``autouse``: it clears ``config_loader._app_current``
    before the test body runs and clears it again once the test finishes.
    """
    import kilostar.utils.config_loader as config_loader

    config_loader._app_current = None
    yield
    config_loader._app_current = None
|
|
|
|
|
|
@pytest.fixture
def mock_config():
    """Install a sandbox-enabled test configuration.

    Builds an ``AppConfig`` whose sandbox has explicit filesystem, shell and
    Python-executor policies, stores it in ``config_loader._app_current``
    and hands the same object back to the test.

    Returns:
        The ``AppConfig`` instance that was installed.
    """
    import kilostar.utils.config_loader as config_loader

    # Each policy is built separately so its values are easy to scan.
    filesystem_policy = FilesystemPolicy(
        workspace_root="/tmp/kilostar_workspace",
        allowed_read_paths=["/tmp"],
        denied_paths=["/etc/shadow", "/root"],
    )
    shell_policy = ShellPolicy(
        enabled=True,
        blocked_commands=["rm -rf /", "mkfs", "shutdown"],
        blocked_operators=["&&", "||", ";", "`", "$("],
        max_timeout=60,
    )
    python_policy = PythonExecutorPolicy(
        enabled=True,
        max_timeout=30,
        blocked_imports=["os", "subprocess", "shutil"],
        blocked_builtins=["exec", "eval", "__import__"],
    )
    sandbox_config = SandboxConfig(
        enabled=True,
        filesystem=filesystem_policy,
        shell=shell_policy,
        python_executor=python_policy,
    )

    app_config = AppConfig(sandbox=sandbox_config)
    config_loader._app_current = app_config
    return app_config
|
|
|
|
|
|
@pytest.fixture
def disabled_config():
    """Install a configuration whose sandbox is switched off.

    Returns:
        The ``AppConfig`` instance stored in ``config_loader._app_current``.
    """
    import kilostar.utils.config_loader as config_loader

    sandbox_off = SandboxConfig(enabled=False)
    app_config = AppConfig(sandbox=sandbox_off)
    config_loader._app_current = app_config
    return app_config
|
|
|
|
|
|
# ─── Path Validation Tests ───
|
|
|
|
|
|
class TestValidatePath:
    """Checks for ``validate_path`` under the enabled and disabled sandbox."""

    def test_read_allowed_path(self, mock_config):
        """A read under an allowed read path returns its real path."""
        target = "/tmp/somefile.txt"
        resolved = validate_path(target, write=False)
        assert resolved == os.path.realpath(target)

    def test_read_workspace_path(self, mock_config):
        """A read inside the workspace root is accepted."""
        resolved = validate_path("/tmp/kilostar_workspace/a.py", write=False)
        assert "/tmp/kilostar_workspace/a.py" in resolved

    def test_read_denied_path(self, mock_config):
        """Reading an explicitly denied path raises ``PathViolation``."""
        with pytest.raises(PathViolation, match="路径被禁止访问"):
            validate_path("/etc/shadow", write=False)

    def test_read_denied_subpath(self, mock_config):
        """Reading below a denied directory raises ``PathViolation``."""
        with pytest.raises(PathViolation, match="路径被禁止访问"):
            validate_path("/root/.ssh/id_rsa", write=False)

    def test_read_outside_allowed(self, mock_config):
        """Reading outside every allowed location raises ``PathViolation``."""
        with pytest.raises(PathViolation, match="读操作路径不在允许范围内"):
            validate_path("/var/log/syslog", write=False)

    def test_write_inside_workspace(self, mock_config):
        """A write inside the workspace root is accepted."""
        resolved = validate_path("/tmp/kilostar_workspace/output.txt", write=True)
        assert "kilostar_workspace" in resolved

    def test_write_outside_workspace(self, mock_config):
        """A write outside the workspace root raises ``PathViolation``."""
        with pytest.raises(PathViolation, match="写操作路径必须在工作目录内"):
            validate_path("/tmp/other_dir/file.txt", write=True)

    def test_write_denied_path(self, mock_config):
        """Writing to a denied path raises ``PathViolation``."""
        with pytest.raises(PathViolation, match="路径被禁止访问"):
            validate_path("/etc/shadow", write=True)

    def test_disabled_sandbox_allows_all(self, disabled_config):
        """With the sandbox disabled the absolute path is returned."""
        target = "/any/path/file.txt"
        resolved = validate_path(target, write=True)
        assert resolved == os.path.abspath(target)
|
|
|
|
|
|
# ─── Shell Command Validation Tests ───
|
|
|
|
|
|
class TestValidateShellCommand:
    """Checks for ``validate_shell_command`` against the shell policy."""

    def test_allowed_command(self, mock_config):
        """A harmless command is returned unchanged."""
        command = "ls -la"
        assert validate_shell_command(command) == command

    def test_blocked_command(self, mock_config):
        """A command on the block list raises ``CommandViolation``."""
        with pytest.raises(CommandViolation, match="命令被禁止"):
            validate_shell_command("rm -rf /")

    def test_blocked_command_case_insensitive(self, mock_config):
        """An upper-case spelling of a blocked command is still rejected."""
        with pytest.raises(CommandViolation, match="命令被禁止"):
            validate_shell_command("SHUTDOWN now")

    def test_blocked_operator_semicolon(self, mock_config):
        """The ``;`` operator is rejected."""
        with pytest.raises(CommandViolation, match="操作符"):
            validate_shell_command("echo hello; rm -rf /")

    def test_blocked_operator_and(self, mock_config):
        """The ``&&`` operator is rejected."""
        with pytest.raises(CommandViolation, match="操作符"):
            validate_shell_command("true && cat /etc/passwd")

    def test_blocked_operator_subshell(self, mock_config):
        """The ``$(`` command-substitution operator is rejected."""
        with pytest.raises(CommandViolation, match="操作符"):
            validate_shell_command("echo $(whoami)")

    def test_disabled_sandbox(self, disabled_config):
        """With the sandbox disabled a blocked command passes through."""
        command = "rm -rf /"
        assert validate_shell_command(command) == command

    def test_shell_disabled_in_policy(self):
        """An enabled sandbox with the shell policy off rejects any command."""
        import kilostar.utils.config_loader as config_loader

        shell_off = ShellPolicy(enabled=False)
        sandbox_config = SandboxConfig(enabled=True, shell=shell_off)
        config_loader._app_current = AppConfig(sandbox=sandbox_config)

        with pytest.raises(CommandViolation, match="已被沙箱策略禁用"):
            validate_shell_command("ls")
|
|
|
|
|
|
# ─── Python Code Validation Tests ───
|
|
|
|
|
|
class TestValidatePythonCode:
    """Checks for ``validate_python_code`` against the executor policy."""

    def test_safe_code(self, mock_config):
        """Code without blocked imports or builtins is returned unchanged."""
        snippet = "print('hello world')"
        assert validate_python_code(snippet) == snippet

    def test_blocked_import(self, mock_config):
        """``import os`` is rejected."""
        with pytest.raises(CodeViolation, match="禁止导入模块: os"):
            validate_python_code("import os")

    def test_blocked_from_import(self, mock_config):
        """``from subprocess import ...`` is rejected."""
        with pytest.raises(CodeViolation, match="禁止导入模块: subprocess"):
            validate_python_code("from subprocess import run")

    def test_blocked_builtin_exec(self, mock_config):
        """A call to ``exec`` is rejected."""
        with pytest.raises(CodeViolation, match="禁止使用: exec"):
            validate_python_code("exec('print(1)')")

    def test_blocked_builtin_eval(self, mock_config):
        """A call to ``eval`` is rejected."""
        with pytest.raises(CodeViolation, match="禁止使用: eval"):
            validate_python_code("x = eval('1+1')")

    def test_blocked_dunder_import(self, mock_config):
        """A call to ``__import__`` is rejected."""
        with pytest.raises(CodeViolation, match="禁止使用: __import__"):
            validate_python_code("m = __import__('os')")

    def test_import_in_multiline(self, mock_config):
        """A blocked import in the middle of a multi-line snippet is caught."""
        snippet = "x = 1\nimport shutil\ny = 2"
        with pytest.raises(CodeViolation, match="禁止导入模块: shutil"):
            validate_python_code(snippet)

    def test_safe_similar_name(self, mock_config):
        """A module whose name merely starts with a blocked name is allowed."""
        snippet = "import os_utils"
        assert validate_python_code(snippet) == snippet

    def test_disabled_sandbox(self, disabled_config):
        """With the sandbox disabled a blocked import passes through."""
        snippet = "import os"
        assert validate_python_code(snippet) == snippet

    def test_python_disabled_in_policy(self):
        """An enabled sandbox with the executor policy off rejects any code."""
        import kilostar.utils.config_loader as config_loader

        executor_off = PythonExecutorPolicy(enabled=False)
        sandbox_config = SandboxConfig(enabled=True, python_executor=executor_off)
        config_loader._app_current = AppConfig(sandbox=sandbox_config)

        with pytest.raises(CodeViolation, match="已被沙箱策略禁用"):
            validate_python_code("print(1)")
|
|
|
|
|
|
# ─── Timeout Tests ───
|
|
|
|
|
|
class TestTimeouts:
    """Checks that requested timeouts are capped at the policy maximum."""

    def test_shell_timeout_clamped(self, mock_config):
        """A shell timeout above the 60 s maximum is reduced to 60."""
        effective = get_shell_timeout(120)
        assert effective == 60

    def test_shell_timeout_within_limit(self, mock_config):
        """A shell timeout below the maximum is returned as requested."""
        effective = get_shell_timeout(30)
        assert effective == 30

    def test_python_timeout_clamped(self, mock_config):
        """A Python timeout above the 30 s maximum is reduced to 30."""
        effective = get_python_timeout(60)
        assert effective == 30

    def test_python_timeout_within_limit(self, mock_config):
        """A Python timeout below the maximum is returned as requested."""
        effective = get_python_timeout(10)
        assert effective == 10
|