import asyncio from typing import List, Literal, Dict from kilostar.plugin.tool_plugin.base_tool import BaseToolData class SearchFileToolData(BaseToolData): is_system: bool = True action_scope: List[ Literal[ "control_node", "consciousness_node", "regulatory_node", "growth_node", "", ] ] = [] config_args: Dict[str, str] = {} category: str = "system" async def search_file( keyword: str, directory: str = ".", file_pattern: str = "*", max_results: int = 20, ) -> str: """在指定目录下递归搜索包含关键字的文件内容。 Args: keyword: 要搜索的关键字或正则表达式 directory: 搜索的根目录,默认当前目录 file_pattern: 文件名匹配模式,如 "*.py" max_results: 最大返回结果数 Returns: 匹配的文件名和行内容 """ try: cmd = ( f"grep -rn --include='{file_pattern}' " f"-m {max_results} '{keyword}' '{directory}' 2>/dev/null " f"| head -n {max_results}" ) proc = await asyncio.create_subprocess_shell( cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=30) output = stdout.decode("utf-8", errors="replace").strip() if not output: return f"未找到包含 '{keyword}' 的匹配项" return output except asyncio.TimeoutError: return "[Error] 搜索超时" except Exception as e: return f"[Error] 搜索失败: {e}"