Files
zhaoxi 6d658b4f4d feat: 工具系统迁移 + 重型插件骨架 + 前端交互增强
- 工具系统从 kilostar/plugin/tool_plugin/ 迁移到 data/toolset/(manifest.json 声明式)
- 新增 plugin_runtime 模块:BaseOrganization / GlobalPluginManager / loader / tool_bridge
- 新增 org_task + org_task_event 表及 DAO(alembic 0009)
- 新增 /api/v1/plugin 路由(submit/status/stream/install/reload)
- 新增 data/plugin/example_dept 示例重型插件
- regulatory_node 支持聊天历史上下文注入
- send_file 改为 artifact 存盘 + SSE 推送下载链接
- 前端 WorkflowFileCard 组件 + ToolSettings README 渲染
- utils 整理:合并 access/role_check、standalone_proxy→ray_compat、删除废弃模块
- 项目结构文档移至 docs/STRUCTURE.md 并详细展开

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-17 05:20:00 +00:00

118 lines
3.8 KiB
Python

import importlib.util
import json
import os
import sys
from typing import Callable, Dict, List, Optional
from kilostar.utils.logger import get_logger
from kilostar.utils.settings import get_toolset_dir
logger = get_logger("get_tool")
_tool_cache: Dict[str, Callable] = {}
_manifest_cache: Optional[Dict[str, Dict]] = None
def _load_manifests() -> Dict[str, Dict]:
"""扫描所有 toolset 的 manifest.json,建立 tool_name → {toolset_dir, file} 的映射。"""
global _manifest_cache
if _manifest_cache is not None:
return _manifest_cache
_manifest_cache = {}
toolset_root = get_toolset_dir()
if not toolset_root.exists():
return _manifest_cache
for item in toolset_root.iterdir():
if not item.is_dir() or item.name.startswith("__"):
continue
manifest_path = item / "manifest.json"
if not manifest_path.exists():
continue
try:
with open(manifest_path, "r", encoding="utf-8") as f:
manifest = json.load(f)
for tool in manifest.get("tools", []):
tool_name = tool.get("name")
if tool_name:
_manifest_cache[tool_name] = {
"toolset_dir": str(item),
"toolset_name": item.name,
"file": tool.get("file", f"{tool_name}.py"),
}
except Exception as e:
logger.error(f"Failed to read manifest {manifest_path}: {e}")
return _manifest_cache
def _get_tool_func(tool_name: str) -> Callable | None:
"""按名字从 toolset 中加载工具函数。
根据 manifest 找到工具所在的 toolset 和文件,动态加载模块并取出同名函数。
"""
func = _tool_cache.get(tool_name)
if func:
return func
manifests = _load_manifests()
info = manifests.get(tool_name)
if not info:
logger.error(f"Tool '{tool_name}' not found in any toolset manifest")
return None
tool_file = os.path.join(info["toolset_dir"], info["file"])
if not os.path.exists(tool_file):
logger.error(f"Tool file not found: {tool_file}")
return None
try:
module_name = f"data.toolset.{info['toolset_name']}.{tool_name}"
spec = importlib.util.spec_from_file_location(module_name, tool_file)
if spec is None or spec.loader is None:
logger.error(f"Failed to create spec for {module_name}")
return None
module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module
spec.loader.exec_module(module)
func = getattr(module, tool_name, None)
if not callable(func):
logger.error(
f"Tool function '{tool_name}' not found or not callable in {module_name}"
)
return None
_tool_cache[tool_name] = func
return func
except Exception as e:
logger.error(f"Failed to load module {tool_name}: {e}")
return None
def del_tool_cache(tool_name: str) -> None:
"""从内存缓存中移除某个工具,下次调用 ``load_tools_from_list`` 会重新从磁盘加载。"""
if tool_name in _tool_cache:
del _tool_cache[tool_name]
def invalidate_manifest_cache() -> None:
"""清除 manifest 缓存,下次加载时重新扫描磁盘。"""
global _manifest_cache
_manifest_cache = None
def load_tools_from_list(tool_names: List[str] | None) -> List[Callable]:
"""批量加载工具:传入工具名列表,返回成功加载到的函数对象列表(失败项被跳过)。"""
if not tool_names:
return []
tool_list = []
for tool_name in tool_names:
tool_func = _get_tool_func(tool_name)
if tool_func:
tool_list.append(tool_func)
return tool_list