Files
KiloStar/kilostar/plugin/tool_plugin/search_file/__init__.py
T
zhaoxi 80174acaae feat(security): 新增工具沙箱安全机制
为所有工具插件添加沙箱拦截层,防止危险的文件访问、Shell命令和Python代码执行。
包含配置文件、核心校验逻辑及31个单元测试。

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-03 12:09:15 +00:00

74 lines
2.0 KiB
Python

import asyncio
from typing import List, Literal, Dict
from kilostar.plugin.tool_plugin.base_tool import BaseToolData
class SearchFileToolData(BaseToolData):
is_system: bool = True
action_scope: List[
Literal[
"control_node",
"consciousness_node",
"regulatory_node",
"growth_node",
"",
]
] = []
config_args: Dict[str, str] = {}
category: str = "system"
async def search_file(
keyword: str,
directory: str = ".",
file_pattern: str = "*",
max_results: int = 20,
) -> str:
"""在指定目录下递归搜索包含关键字的文件内容。
Args:
keyword: 要搜索的关键字或正则表达式
directory: 搜索的根目录,默认当前目录
file_pattern: 文件名匹配模式,如 "*.py"
max_results: 最大返回结果数
Returns:
匹配的文件名和行内容
"""
from kilostar.utils.sandbox import validate_path, PathViolation
try:
directory = validate_path(directory, write=False)
except PathViolation as e:
return f"[Sandbox] {e}"
max_results = min(max_results, 100)
try:
grep_args = [
"grep", "-rn",
f"--include={file_pattern}",
"-m", str(max_results),
"--", keyword, directory,
]
proc = await asyncio.create_subprocess_exec(
*grep_args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, _ = await asyncio.wait_for(
proc.communicate(), timeout=30
)
output = stdout.decode("utf-8", errors="replace").strip()
if not output:
return f"未找到包含 '{keyword}' 的匹配项"
lines = output.split("\n")
if len(lines) > max_results:
output = "\n".join(lines[:max_results])
return output
except asyncio.TimeoutError:
return "[Error] 搜索超时"
except Exception as e:
return f"[Error] 搜索失败: {e}"