# Copyright 2026 zhaoxi826
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""MCP helper module.

Creates pydantic-ai MCPServer instances from the configuration held in the
global state machine, and offers helpers that list the tools those servers
expose.
"""

from typing import Dict, List, Any, Optional, Sequence

from kilostar.utils.logger import get_logger

logger = get_logger("mcp_helper")

# Guarded import of pydantic_ai.mcp so that this module can still be imported
# when the MCP package is not installed.
try:
    from pydantic_ai.mcp import (
        MCPServerStdio,
        MCPServerSSE,
        MCPServerHTTP,
    )

    _MCP_AVAILABLE = True
except ImportError:
    _MCP_AVAILABLE = False
    logger.warning("MCP package not installed. MCP servers will not be available.")


def build_mcp_toolsets(configs: Dict[str, Dict[str, Any]]) -> List[Any]:
    """Create MCPServer instances from a configuration mapping.

    Args:
        configs: ``{server_id: {"name": ..., "transport": ..., ...}}``.
            ``None`` or an empty mapping yields an empty list.

    Returns:
        The list of MCPServer instances that could be built, in the iteration
        order of ``configs``. Entries with an unsupported transport, or whose
        construction raises, are logged and skipped.
    """
    if not _MCP_AVAILABLE or not configs:
        return []

    toolsets = []
    for server_id, cfg in configs.items():
        try:
            transport = cfg.get("transport", "stdio")
            tool_prefix = cfg.get("tool_prefix")
            name = cfg.get("name", server_id)

            if transport == "stdio":
                server = MCPServerStdio(
                    command=cfg.get("command", ""),
                    args=cfg.get("args", []),
                    env=cfg.get("env"),
                    tool_prefix=tool_prefix,
                    id=server_id,
                )
            elif transport == "sse":
                server = MCPServerSSE(
                    url=cfg.get("url", ""),
                    tool_prefix=tool_prefix,
                    id=server_id,
                )
            elif transport == "http":
                # NOTE(review): confirm that MCPServerHTTP is the intended
                # class for the "http" transport in the installed pydantic-ai
                # version.
                server = MCPServerHTTP(
                    url=cfg.get("url", ""),
                    tool_prefix=tool_prefix,
                    id=server_id,
                )
            else:
                logger.warning(f"Unsupported MCP transport: {transport} for server {name}")
                continue

            toolsets.append(server)
            logger.info(f"MCP server '{name}' ({transport}) registered as toolset")
        except Exception as e:
            # One broken entry must not prevent the remaining servers from
            # being built.
            logger.error(f"Failed to build MCP server '{server_id}': {e}")

    return toolsets


async def get_mcp_toolsets_from_gsm() -> List[Any]:
    """Fetch the MCP configuration from the GSM snapshot and build toolsets.

    Returns:
        The built MCPServer instances, or an empty list when the MCP package
        is unavailable or the snapshot cannot be loaded (the failure is
        logged).
    """
    if not _MCP_AVAILABLE:
        return []
    try:
        from kilostar.core.global_state_machine.gsm_snapshot import fetch_snapshot

        # Go through the snapshot: MCP configuration changes very rarely, so
        # the local cache hit rate is close to 100%.
        snapshot = await fetch_snapshot()
        return build_mcp_toolsets(snapshot.mcp_servers)
    except Exception as e:
        logger.error(f"Failed to load MCP configs from GSM: {e}")
        return []


async def get_all_toolsets_for_scope(
    scope: str, toolset_ids: Optional[List[str]] = None
) -> List[Any]:
    """Collect every toolset for a scope: system + personal + MCP.

    Args:
        scope: The scope the caller belongs to.
        toolset_ids: The toolset list configured on the agent; ``None`` means
            "return everything".

    Returns:
        Local toolsets first (as produced by ``build_toolsets_for_scope``),
        followed by the MCP toolsets. A failure while loading the local
        toolsets is only logged and does not prevent the MCP toolsets from
        being appended.
    """
    toolsets: List[Any] = []
    try:
        from kilostar.core.global_state_machine.gsm_snapshot import (
            build_toolsets_for_scope,
            fetch_snapshot,
        )

        snapshot = await fetch_snapshot()
        local = build_toolsets_for_scope(snapshot, scope, toolset_ids=toolset_ids)
        if local:
            toolsets.extend(local)
    except Exception as e:
        logger.error(f"Failed to load local toolsets from GSM ({scope}): {e}")

    toolsets.extend(await get_mcp_toolsets_from_gsm())
    return toolsets


async def get_retrieval_toolsets_for_scope(scope: str) -> List[Any]:
    """Return only the retrieval toolsets (intended for system_node).

    Generation and MCP tools are not included.

    Args:
        scope: The scope whose retrieval toolsets are requested.

    Returns:
        The retrieval toolsets, or an empty list when loading fails (the
        failure is logged).
    """
    toolsets: List[Any] = []
    try:
        # NOTE(review): ``ray_actor_hook`` is not imported anywhere in this
        # module, so this line raises NameError, which the handler below
        # swallows, and the function always returns []. Import it from its
        # defining module (path not visible from this file) or migrate this
        # function to the GSM snapshot like the other helpers.
        gsm = ray_actor_hook("global_state_machine").global_state_machine
        retrieval = await gsm.get_retrieval_toolsets_for_scope.remote(scope)
        if retrieval:
            toolsets.extend(retrieval)
    except Exception as e:
        logger.error(f"Failed to load retrieval toolsets ({scope}): {e}")
    return toolsets


async def list_mcp_tools_for_configs(
    configs: Dict[str, Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """Connect to each configured MCP server in turn and list its tool names.

    Each server is entered with ``async with server:`` and ``get_tools()`` is
    called once; the tool names are then extracted. A failure of one server
    never affects the others: the failing entry is reported with ``tools=[]``
    and an ``error`` field. This includes servers that could not be built at
    all (bad configuration or unsupported transport).

    Args:
        configs: ``{server_id: {"name": ..., "transport": ..., ...}}``.

    Returns:
        One dict per configured server, in the iteration order of ``configs``,
        with the keys ``server_id``, ``name``, ``transport``, ``tool_prefix``,
        ``tools`` and, on failure, ``error``. Empty when the MCP package is
        unavailable or ``configs`` is empty.
    """
    result: List[Dict[str, Any]] = []
    if not _MCP_AVAILABLE or not configs:
        return result

    # Iterate the configs (not the built servers) so that every configured
    # server is reported, and so the config key does not have to be recovered
    # from an attribute of the server object.
    for server_id, cfg in configs.items():
        cfg_view = cfg if isinstance(cfg, dict) else {}
        name = cfg_view.get("name", server_id or "unknown")
        item: Dict[str, Any] = {
            "server_id": server_id,
            "name": name,
            "transport": cfg_view.get("transport", "stdio"),
            "tool_prefix": cfg_view.get("tool_prefix"),
            "tools": [],
        }

        built = build_mcp_toolsets({server_id: cfg})
        if not built:
            # build_mcp_toolsets has already logged the precise reason.
            item["error"] = "MCP server could not be built (see logs for details)"
            result.append(item)
            continue

        server = built[0]
        try:
            async with server:
                # NOTE(review): confirm that get_tools() may be called without
                # arguments in the installed pydantic-ai version.
                tools = await server.get_tools()
                item["tools"] = [
                    getattr(t, "name", None) or getattr(t, "tool_name", str(t))
                    for t in tools
                ]
        except Exception as e:
            item["error"] = str(e)
            logger.warning(f"MCP server '{name}' list_tools failed: {e}")
        result.append(item)
    return result


async def list_mcp_tools_from_gsm() -> List[Dict[str, Any]]:
    """Fetch the MCP configuration from the GSM snapshot and list its tools.

    Delegates to :func:`list_mcp_tools_for_configs`.

    Returns:
        The per-server listing, or an empty list when the MCP package is
        unavailable or the snapshot cannot be loaded (the failure is logged).
    """
    if not _MCP_AVAILABLE:
        return []
    try:
        from kilostar.core.global_state_machine.gsm_snapshot import fetch_snapshot

        snapshot = await fetch_snapshot()
        return await list_mcp_tools_for_configs(snapshot.mcp_servers)
    except Exception as e:
        logger.error(f"Failed to list MCP tools from GSM: {e}")
        return []