Files
zhaoxi 6d658b4f4d feat: 工具系统迁移 + 重型插件骨架 + 前端交互增强
- 工具系统从 kilostar/plugin/tool_plugin/ 迁移到 data/toolset/(manifest.json 声明式)
- 新增 plugin_runtime 模块:BaseOrganization / GlobalPluginManager / loader / tool_bridge
- 新增 org_task + org_task_event 表及 DAO(alembic 0009)
- 新增 /api/v1/plugin 路由(submit/status/stream/install/reload)
- 新增 data/plugin/example_dept 示例重型插件
- regulatory_node 支持聊天历史上下文注入
- send_file 改为 artifact 存盘 + SSE 推送下载链接
- 前端 WorkflowFileCard 组件 + ToolSettings README 渲染
- utils 整理:合并 access/role_check、standalone_proxy→ray_compat、删除废弃模块
- 项目结构文档移至 docs/STRUCTURE.md 并详细展开

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-17 05:20:00 +00:00

443 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Copyright 2026 zhaoxi826
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any, Dict, List, Optional
from pydantic import BaseModel
import viceroy
from kilostar.utils.ray_hook import ray_actor_hook
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import FileResponse
from kilostar.utils.access import TokenData, RoleChecker, Accessor
from kilostar.core.postgres_database.model import UserAuthority
from kilostar.utils.mcp_helper import list_mcp_tools_from_gsm
from kilostar.utils.settings import get_artifact_dir
resource_router = APIRouter(prefix="/api/v1/resource")
class Skill(BaseModel):
"""``POST /skill`` 入参:技能仓库地址及可选子目录路径。"""
repo_url: str
path: str | None
class MCPServerConfig(BaseModel):
"""``POST /mcp`` 入参:MCP 服务器配置。"""
name: str
transport: str = "stdio" # stdio | sse | http
command: str | None = None
args: list[str] | None = None
url: str | None = None
tool_prefix: str | None = None
env: Dict[str, str] | None = None
@resource_router.post("/skill")
async def install_skill(
skill: Skill, _: TokenData = Depends(RoleChecker(allowed_roles=UserAuthority.USER))
):
"""通过 viceroy 把 skill 仓库克隆到 ``data/plugin/skill``,并在状态机中登记。"""
global_state_machine = ray_actor_hook("global_state_machine").global_state_machine
import os
from kilostar.utils.settings import get_plugin_dir
skill_output_dir = str(get_plugin_dir() / "skill")
os.makedirs(skill_output_dir, exist_ok=True)
await viceroy.install_skill_async(
url=skill.repo_url, path=skill.path, output=skill_output_dir
)
if skill.path:
skill_name = skill.path.split("/")[-1]
else:
skill_name = skill.repo_url.split("/")[-1]
await global_state_machine.add_skill.remote(skill_name)
return {"message": "创建成功"}
@resource_router.get("/skill")
async def get_skills(
_: TokenData = Depends(RoleChecker(allowed_roles=UserAuthority.USER)),
):
"""返回当前状态机中已登记的所有 skill 名称列表。"""
global_state_machine = ray_actor_hook("global_state_machine").global_state_machine
skills = await global_state_machine.get_skill_list.remote()
return {"skills": skills}
@resource_router.delete("/skill/{skill_name}")
async def delete_skill(
skill_name: str,
_: TokenData = Depends(
RoleChecker(allowed_roles=UserAuthority.SUPER_ADMINISTRATOR)
),
):
"""从状态机中移除 skill 注册项;不会删除磁盘上的代码文件。"""
global_state_machine = ray_actor_hook("global_state_machine").global_state_machine
await global_state_machine.remove_skill.remote(skill_name)
return {"message": "success"}
# ─── MCP Server Management ───
@resource_router.post("/mcp")
async def add_mcp_server(
config: MCPServerConfig,
_: TokenData = Depends(RoleChecker(allowed_roles=UserAuthority.SUPER_ADMINISTRATOR)),
):
"""注册一个 MCP 服务器到全局状态机。"""
global_state_machine = ray_actor_hook("global_state_machine").global_state_machine
import uuid
server_id = str(uuid.uuid4())[:8]
cfg_dict = config.model_dump(exclude_none=True)
await global_state_machine.add_mcp_server.remote(server_id, cfg_dict)
return {"server_id": server_id, "message": "MCP server registered"}
@resource_router.get("/mcp")
async def list_mcp_servers(
_: TokenData = Depends(RoleChecker(allowed_roles=UserAuthority.USER)),
):
"""返回已注册的全部 MCP 服务器配置;env 中的敏感字段脱敏。"""
global_state_machine = ray_actor_hook("global_state_machine").global_state_machine
servers = await global_state_machine.list_mcp_servers.remote()
for s in servers:
if "env" in s and isinstance(s["env"], dict):
s["env"] = _mask_config(s["env"])
return {"servers": servers}
@resource_router.delete("/mcp/{server_id}")
async def delete_mcp_server(
server_id: str,
_: TokenData = Depends(RoleChecker(allowed_roles=UserAuthority.SUPER_ADMINISTRATOR)),
):
"""从状态机中移除一个 MCP 服务器配置。"""
global_state_machine = ray_actor_hook("global_state_machine").global_state_machine
ok = await global_state_machine.delete_mcp_server.remote(server_id)
if not ok:
raise HTTPException(status_code=404, detail="MCP server not found")
return {"message": "success"}
# ─── Workflow Artifact 下载(agent send_file 投递的文件)───
@resource_router.get("/artifact/{trace_id}/{artifact_id}")
async def download_artifact(
trace_id: str,
artifact_id: str,
token_data: TokenData = Depends(Accessor.get_current_user),
):
"""下载某个 trace 名下的 agent 产物文件。
路径校验三件套:
1. trace 必须存在且属于当前用户
2. ``artifact_id`` 限定为 12 位 hexuuid4 前缀),防止穿越
3. 解析后的最终路径必须仍然落在 ``<artifact_dir>/<trace_id>/`` 之内
"""
if not artifact_id.isalnum() or len(artifact_id) > 32:
raise HTTPException(status_code=400, detail="invalid artifact id")
postgres_database = ray_actor_hook("postgres_database").postgres_database
wf = await postgres_database.get_workflow.remote(trace_id)
if not wf:
raise HTTPException(status_code=404, detail="Workflow not found")
if getattr(wf, "user_id", None) != token_data.user_id:
raise HTTPException(status_code=403, detail="Forbidden")
trace_dir = (get_artifact_dir() / trace_id).resolve()
if not trace_dir.exists() or not trace_dir.is_dir():
raise HTTPException(status_code=404, detail="Artifact not found")
matches = list(trace_dir.glob(f"{artifact_id}_*"))
if not matches:
raise HTTPException(status_code=404, detail="Artifact not found")
target = matches[0].resolve()
if not str(target).startswith(str(trace_dir) + "/"):
raise HTTPException(status_code=400, detail="invalid path")
filename = target.name.split("_", 1)[-1]
return FileResponse(
path=str(target),
filename=filename,
media_type="application/octet-stream",
)
# ─── Toolset Packages(磁盘工具包:插件单元)───
@resource_router.get("/toolset-package")
async def list_toolset_packages(
_: TokenData = Depends(RoleChecker(allowed_roles=UserAuthority.USER)),
):
"""列出所有磁盘上的工具包(``data/toolset/<name>/`` 单元)。"""
global_state_machine = ray_actor_hook("global_state_machine").global_state_machine
packages = await global_state_machine.list_toolset_packages.remote()
return {"packages": packages}
@resource_router.get("/toolset-package/{name}/readme")
async def get_toolset_package_readme(
name: str,
_: TokenData = Depends(RoleChecker(allowed_roles=UserAuthority.USER)),
):
"""返回指定工具包的 README.md 内容(markdown 文本)。"""
global_state_machine = ray_actor_hook("global_state_machine").global_state_machine
content = await global_state_machine.get_toolset_package_readme.remote(name)
if content is None:
raise HTTPException(status_code=404, detail="README not found")
return {"name": name, "content": content}
# ─── Tool Management ───
@resource_router.get("/tool")
async def get_tools(
_: TokenData = Depends(RoleChecker(allowed_roles=UserAuthority.USER)),
):
"""返回按分类聚合的工具信息(包含系统工具、搜索工具、MCP 工具等)。
其中 ``mcp_servers`` 会现场尝试连接每个已注册的 MCP 服务器并列出它们暴露的
工具名,便于前端展示;任意一台 MCP server 不可达不影响其他工具的返回。
"""
global_state_machine = ray_actor_hook("global_state_machine").global_state_machine
tool_mapper = await global_state_machine.get_tool_mapper.remote()
categories = await global_state_machine.get_tool_categories.remote()
all_tool_names = set()
for scope_tools in tool_mapper.values():
all_tool_names.update(scope_tools.keys())
mcp_servers = await list_mcp_tools_from_gsm()
return {
"tools": list(all_tool_names),
"categories": categories,
"mcp_servers": mcp_servers,
}
# ─── Tool Config ManagementTavily API key 等运行期配置)───
def _mask_secret(value: Any) -> Any:
"""对像 ``api_key`` / ``token`` / ``secret`` 这种敏感字段做简单脱敏。"""
if not isinstance(value, str) or not value:
return value
if len(value) <= 8:
return "***"
return value[:4] + "***" + value[-4:]
def _mask_config(config: Dict[str, Any]) -> Dict[str, Any]:
masked: Dict[str, Any] = {}
for k, v in config.items():
if any(s in k.lower() for s in ("key", "token", "secret", "password")):
masked[k] = _mask_secret(v)
else:
masked[k] = v
return masked
class ToolConfigUpdate(BaseModel):
"""``PUT /tool/config/{tool_name}`` 入参:要写入的工具配置 KV。"""
config: Dict[str, Any]
@resource_router.get("/tool/config")
async def list_tool_configs(
_: TokenData = Depends(RoleChecker(allowed_roles=UserAuthority.SUPER_ADMINISTRATOR)),
):
"""列出所有工具运行期配置;敏感字段会被脱敏。"""
global_state_machine = ray_actor_hook("global_state_machine").global_state_machine
raw = await global_state_machine.list_tool_configs.remote()
return {
"configs": {name: _mask_config(cfg) for name, cfg in raw.items()},
}
@resource_router.get("/tool/config/{tool_name}")
async def get_tool_config(
tool_name: str,
_: TokenData = Depends(RoleChecker(allowed_roles=UserAuthority.SUPER_ADMINISTRATOR)),
):
"""按工具名取出脱敏后的配置。"""
global_state_machine = ray_actor_hook("global_state_machine").global_state_machine
raw = await global_state_machine.get_tool_config.remote(tool_name)
return {"tool_name": tool_name, "config": _mask_config(raw)}
@resource_router.put("/tool/config/{tool_name}")
async def set_tool_config(
tool_name: str,
body: ToolConfigUpdate,
_: TokenData = Depends(RoleChecker(allowed_roles=UserAuthority.SUPER_ADMINISTRATOR)),
):
"""写入/覆盖某工具的运行期配置(如 ``tavily_search`` 的 ``api_key``)。"""
global_state_machine = ray_actor_hook("global_state_machine").global_state_machine
await global_state_machine.set_tool_config.remote(tool_name, body.config)
return {"message": "success"}
@resource_router.delete("/tool/config/{tool_name}")
async def delete_tool_config(
tool_name: str,
_: TokenData = Depends(RoleChecker(allowed_roles=UserAuthority.SUPER_ADMINISTRATOR)),
):
"""删除某工具的运行期配置。"""
global_state_machine = ray_actor_hook("global_state_machine").global_state_machine
ok = await global_state_machine.delete_tool_config.remote(tool_name)
if not ok:
raise HTTPException(status_code=404, detail="Tool config not found")
return {"message": "success"}
# ─── Custom Toolset Management ───
class CustomToolsetCreate(BaseModel):
name: str
tools: List[str]
description: Optional[str] = None
class CustomToolsetUpdate(BaseModel):
name: Optional[str] = None
tools: Optional[List[str]] = None
description: Optional[str] = None
async def _assert_toolset_owner_or_admin(
toolset: Dict[str, Any], token_data: TokenData
) -> None:
"""校验 toolset 归属:非 owner 且非管理员则抛 403。"""
from kilostar.utils.access import get_authority
if toolset.get("owner_id") == token_data.user_id:
return
authority = await get_authority(token_data.user_id)
if authority >= UserAuthority.ADMINISTRATOR:
return
raise HTTPException(status_code=403, detail="无权访问此自定义工具组")
@resource_router.post("/custom-toolset")
async def create_custom_toolset(
body: CustomToolsetCreate,
token_data: TokenData = Depends(RoleChecker(allowed_roles=UserAuthority.USER)),
):
global_state_machine = ray_actor_hook("global_state_machine").global_state_machine
import uuid
toolset_id = str(uuid.uuid4())[:8]
try:
saved = await global_state_machine.add_custom_toolset.remote(
toolset_id=toolset_id,
name=body.name,
tools=body.tools,
description=body.description,
owner_id=token_data.user_id,
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
return {"toolset_id": toolset_id, "toolset": saved}
@resource_router.get("/custom-toolset")
async def list_custom_toolsets(
category: Optional[str] = None,
token_data: TokenData = Depends(RoleChecker(allowed_roles=UserAuthority.USER)),
):
"""列出工具组:支持按 category 过滤。USER 只能看到自己的+系统的;ADMIN 看全部。"""
from kilostar.utils.access import get_authority
global_state_machine = ray_actor_hook("global_state_machine").global_state_machine
toolsets = await global_state_machine.list_custom_toolsets.remote()
authority = await get_authority(token_data.user_id)
if authority < UserAuthority.ADMINISTRATOR:
toolsets = [
t for t in toolsets
if t.get("is_system") or t.get("owner_id") == token_data.user_id
]
if category:
toolsets = [t for t in toolsets if t.get("category") == category]
return {"toolsets": toolsets}
@resource_router.get("/custom-toolset/{toolset_id}")
async def get_custom_toolset(
toolset_id: str,
token_data: TokenData = Depends(RoleChecker(allowed_roles=UserAuthority.USER)),
):
global_state_machine = ray_actor_hook("global_state_machine").global_state_machine
ts = await global_state_machine.get_custom_toolset.remote(toolset_id)
if not ts:
raise HTTPException(status_code=404, detail="Custom toolset not found")
await _assert_toolset_owner_or_admin(ts, token_data)
return ts
@resource_router.put("/custom-toolset/{toolset_id}")
async def update_custom_toolset(
toolset_id: str,
body: CustomToolsetUpdate,
token_data: TokenData = Depends(RoleChecker(allowed_roles=UserAuthority.USER)),
):
global_state_machine = ray_actor_hook("global_state_machine").global_state_machine
existing = await global_state_machine.get_custom_toolset.remote(toolset_id)
if not existing:
raise HTTPException(status_code=404, detail="Custom toolset not found")
if existing.get("is_system"):
raise HTTPException(status_code=403, detail="系统预置工具集不可修改")
await _assert_toolset_owner_or_admin(existing, token_data)
name = body.name if body.name is not None else existing["name"]
tools = body.tools if body.tools is not None else existing["tools"]
description = body.description if body.description is not None else existing.get("description")
try:
saved = await global_state_machine.add_custom_toolset.remote(
toolset_id=toolset_id,
name=name,
tools=tools,
description=description,
owner_id=existing.get("owner_id", token_data.user_id),
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
return {"toolset": saved}
@resource_router.delete("/custom-toolset/{toolset_id}")
async def delete_custom_toolset(
toolset_id: str,
token_data: TokenData = Depends(RoleChecker(allowed_roles=UserAuthority.USER)),
):
"""删除工具组:系统预置不可删;USER 只能删自己的;ADMIN 及以上可删任意用户的。"""
global_state_machine = ray_actor_hook("global_state_machine").global_state_machine
existing = await global_state_machine.get_custom_toolset.remote(toolset_id)
if not existing:
raise HTTPException(status_code=404, detail="Custom toolset not found")
if existing.get("is_system"):
raise HTTPException(status_code=403, detail="系统预置工具集不可删除")
await _assert_toolset_owner_or_admin(existing, token_data)
ok = await global_state_machine.delete_custom_toolset.remote(toolset_id)
if not ok:
raise HTTPException(status_code=404, detail="Custom toolset not found")
return {"message": "success"}