Files
zhaoxi 005ce566a8 feat: Provider model_settings 全链路 + 监管节点工具集 + 重型插件注入 + 前端打磨
- Provider model_settings (Provider+Model 级别参数配置): DB JSONB → API → GSM → AgentFactory.resolve → 三节点 agent.run 注入
- 新增 data/toolset/regulatory_toolset/: 监管节点专属工具(query_workflow_status / query_task_list / send_file)
- send_file 从 interactive_toolset 迁移至 regulatory_toolset,interactive 仅保留 approval
- mcp_helper 合入 GlobalPluginManager dispatch tools
- 前端 Provider 弹窗参数设置区加 JSON 编辑器(model_settings)
- 前端 Plugin 页面新增"重型插件"Tab(HeavyPluginList 占位)
- .gitignore 精简:去除系统默认项,修复 data/ 子目录追踪
- data/toolset/ 与 data/plugin/ 首次纳入版本控制

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-17 13:10:31 +00:00

219 lines
7.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Copyright 2026 zhaoxi826
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""MCP 辅助模块:根据全局状态机中的配置创建 pydantic-ai MCPServer 实例。"""
from typing import Dict, List, Any, Optional, Sequence
from kilostar.utils.logger import get_logger
from kilostar.utils.ray_hook import ray_actor_hook
logger = get_logger("mcp_helper")
# Import pydantic_ai.mcp defensively so this module does not crash on import
# when the MCP package is not installed. _MCP_AVAILABLE records whether the
# import succeeded; the functions below check it before touching MCP classes.
try:
    from pydantic_ai.mcp import (
        MCPServerStdio,
        MCPServerSSE,
        MCPServerHTTP,
    )
    _MCP_AVAILABLE = True
except ImportError:
    _MCP_AVAILABLE = False
    logger.warning("MCP package not installed. MCP servers will not be available.")
def build_mcp_toolsets(configs: Dict[str, Dict[str, Any]]) -> List[Any]:
    """Create MCP server instances from a configuration mapping.

    Args:
        configs: Mapping of ``server_id`` to that server's settings. Keys read
            here are ``transport`` (``"stdio"`` by default, or ``"sse"`` /
            ``"http"``), ``name`` (defaults to the server id), ``tool_prefix``
            and, per transport, ``command`` / ``args`` / ``env`` (stdio) or
            ``url`` (sse, http). ``None`` or an empty mapping yields ``[]``.

    Returns:
        The server objects that could be constructed, in ``configs`` order
        (intended to be passed as an Agent's ``toolsets``). Entries with an
        unsupported transport, a missing ``command`` / ``url``, or whose
        construction raises are logged and skipped. Always ``[]`` when the
        MCP package could not be imported.
    """
    if not _MCP_AVAILABLE or not configs:
        return []
    toolsets: List[Any] = []
    for server_id, cfg in configs.items():
        try:
            transport = cfg.get("transport", "stdio")
            tool_prefix = cfg.get("tool_prefix")
            name = cfg.get("name", server_id)
            if transport == "stdio":
                command = cfg.get("command")
                if not command:
                    # A server without a command can never be started; skip it
                    # here instead of letting it fail later inside an agent run.
                    logger.warning(
                        f"MCP server '{name}' (stdio) has no command configured; skipped"
                    )
                    continue
                server = MCPServerStdio(
                    command=command,
                    # Tolerate an explicit ``"args": None`` in the config.
                    args=cfg.get("args") or [],
                    env=cfg.get("env"),
                    tool_prefix=tool_prefix,
                    id=server_id,
                )
            elif transport in ("sse", "http"):
                url = cfg.get("url")
                if not url:
                    logger.warning(
                        f"MCP server '{name}' ({transport}) has no url configured; skipped"
                    )
                    continue
                # NOTE(review): "http" is mapped to MCPServerHTTP exactly as
                # before; confirm that class speaks the transport the
                # configured servers expect in the installed pydantic-ai.
                server_cls = MCPServerSSE if transport == "sse" else MCPServerHTTP
                server = server_cls(
                    url=url,
                    tool_prefix=tool_prefix,
                    id=server_id,
                )
            else:
                logger.warning(f"Unsupported MCP transport: {transport} for server {name}")
                continue
            toolsets.append(server)
            logger.info(f"MCP server '{name}' ({transport}) registered as toolset")
        except Exception as e:
            # One malformed entry must not prevent the remaining servers from
            # being registered.
            logger.error(f"Failed to build MCP server '{server_id}': {e}")
    return toolsets
async def get_mcp_toolsets_from_gsm() -> List[Any]:
    """Build MCP toolsets from the MCP server configs held in the GSM snapshot.

    Returns:
        The list produced by ``build_mcp_toolsets`` for ``snapshot.mcp_servers``.
        An empty list when the MCP package is unavailable or when fetching /
        building raises (the failure is logged, not propagated).
    """
    built: List[Any] = []
    if _MCP_AVAILABLE:
        try:
            from kilostar.core.global_state_machine.gsm_snapshot import fetch_snapshot

            # Read through the snapshot rather than the actor directly; per the
            # original author's note, MCP configs change very rarely so the
            # local snapshot cache is expected to hit almost always.
            gsm_view = await fetch_snapshot()
            built = build_mcp_toolsets(gsm_view.mcp_servers)
        except Exception as exc:
            logger.error(f"Failed to load MCP configs from GSM: {exc}")
            built = []
    return built
async def get_all_tools_and_toolsets_for_scope(
    scope: str, toolset_ids: Optional[List[str]] = None
) -> tuple:
    """Collect the tools available to a scope plus the MCP toolsets.

    Args:
        scope: Scope the caller belongs to.
        toolset_ids: Toolset ids configured on the agent; ``None`` or an empty
            list is forwarded as ``None`` (documented upstream as "all").

    Returns:
        ``(tools, toolsets)``. ``tools`` is a fresh list of the expanded tool
        callables (for ``Agent(tools=...)``) followed by any heavy-plugin
        dispatch tools. ``toolsets`` holds the MCP server objects (for
        ``Agent(toolsets=...)``). Failures while loading scope tools or
        dispatch tools are logged and leave that part empty.
    """
    tools: List[Any] = []
    try:
        from kilostar.core.global_state_machine.gsm_snapshot import (
            build_tools_for_scope,
            fetch_snapshot,
        )

        snapshot = await fetch_snapshot()
        effective_ids = toolset_ids if toolset_ids else None
        built = build_tools_for_scope(snapshot, scope, toolset_ids=effective_ids)
        # Copy into a fresh list: dispatch tools are appended below, and
        # extending the returned object in place could leak them into whatever
        # structure the snapshot helper handed back. Also tolerates None.
        tools = list(built) if built else []
    except Exception as e:
        logger.error(f"Failed to load tools from GSM ({scope}): {e}")
    toolsets = await get_mcp_toolsets_from_gsm()
    # Merge in the dispatch tools of heavy plugins (best effort).
    try:
        pm = ray_actor_hook("global_plugin_manager").global_plugin_manager
        dispatch = await pm.get_dispatch_tools.remote()
        if dispatch:
            tools.extend(dispatch.values())
    except Exception as e:
        logger.debug(f"No dispatch tools available: {e}")
    return tools, toolsets
async def get_all_toolsets_for_scope(
    scope: str, toolset_ids: Optional[List[str]] = None
) -> List[Any]:
    """Backward-compatible wrapper that returns only the toolsets half.

    Delegates to ``get_all_tools_and_toolsets_for_scope`` and discards the
    expanded tool callables, so the result no longer contains function
    toolsets.
    """
    combined = await get_all_tools_and_toolsets_for_scope(scope, toolset_ids)
    mcp_only = combined[1]
    return mcp_only
async def get_retrieval_toolsets_for_scope(scope: str) -> List[Any]:
    """Return only the retrieval toolsets for ``scope``.

    The original note marks this as intended for the system node, excluding
    generation and MCP tools. The list is obtained from the
    ``global_state_machine`` actor; any failure is logged and yields whatever
    was collected so far (normally an empty list).
    """
    collected: List[Any] = []
    try:
        state_machine = ray_actor_hook("global_state_machine").global_state_machine
        fetched = await state_machine.get_retrieval_toolsets_for_scope.remote(scope)
        # A falsy reply (None / empty) contributes nothing.
        if fetched:
            collected.extend(fetched)
    except Exception as exc:
        logger.error(f"Failed to load retrieval toolsets ({scope}): {exc}")
    return collected
async def _mcp_tool_names(server: Any) -> List[str]:
    """Return the tool names exposed by an already-entered MCP server."""
    list_tools = getattr(server, "list_tools", None)
    if list_tools is None:
        # No argument-free listing API on this object: keep the original call.
        tools = await server.get_tools()
        return [
            getattr(t, "name", None) or getattr(t, "tool_name", str(t))
            for t in tools
        ]
    # NOTE(review): assumes list_tools() takes no arguments and returns tool
    # objects carrying the *unprefixed* name, whereas get_tools() needs a run
    # context — confirm against the installed pydantic-ai version.
    prefix = getattr(server, "tool_prefix", None)
    names: List[str] = []
    for tool in await list_tools():
        raw = getattr(tool, "name", None) or getattr(tool, "tool_name", str(tool))
        names.append(f"{prefix}_{raw}" if prefix else raw)
    return names


async def list_mcp_tools_for_configs(
    configs: Dict[str, Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """Connect to each configured MCP server and list the tools it exposes.

    Every entry of ``configs`` produces exactly one result item, in
    ``configs`` order. Servers are built and probed one at a time so that a
    failure of one server never affects the others.

    Args:
        configs: Mapping of ``server_id`` to server settings, in the same
            shape accepted by ``build_mcp_toolsets``. ``None`` or empty
            yields ``[]``.

    Returns:
        A list of dicts with keys ``server_id``, ``name``, ``transport``,
        ``tool_prefix`` and ``tools`` (tool names, with ``tool_prefix``
        applied). When a server cannot be built or queried, ``tools`` is
        ``[]`` and an ``error`` string is attached. Always ``[]`` when the
        MCP package is unavailable.
    """
    result: List[Dict[str, Any]] = []
    if not _MCP_AVAILABLE or not configs:
        return result
    for server_id, cfg in configs.items():
        meta = cfg if isinstance(cfg, dict) else {}
        name = meta.get("name", server_id or "unknown")
        item: Dict[str, Any] = {
            "server_id": server_id,
            "name": name,
            "transport": meta.get("transport", "stdio"),
            "tool_prefix": meta.get("tool_prefix"),
            "tools": [],
        }
        # Build this server on its own so that entries which cannot be built
        # still show up in the result instead of silently disappearing.
        built = build_mcp_toolsets({server_id: cfg})
        if not built:
            item["error"] = "MCP server could not be built from its config"
        else:
            server = built[0]
            try:
                async with server:
                    item["tools"] = await _mcp_tool_names(server)
            except Exception as e:
                item["tools"] = []
                item["error"] = str(e)
                logger.warning(f"MCP server '{name}' list_tools failed: {e}")
        result.append(item)
    return result
async def list_mcp_tools_from_gsm() -> List[Dict[str, Any]]:
    """List MCP tools for the server configs stored in the GSM snapshot.

    Fetches the snapshot and forwards ``snapshot.mcp_servers`` to
    :func:`list_mcp_tools_for_configs`. Returns ``[]`` when the MCP package is
    unavailable or when anything raises (the failure is logged).
    """
    listing: List[Dict[str, Any]] = []
    if _MCP_AVAILABLE:
        try:
            from kilostar.core.global_state_machine.gsm_snapshot import fetch_snapshot

            gsm_view = await fetch_snapshot()
            listing = await list_mcp_tools_for_configs(gsm_view.mcp_servers)
        except Exception as exc:
            logger.error(f"Failed to list MCP tools from GSM: {exc}")
            listing = []
    return listing