"""``GlobalStateMachine`` 中 MCP/ToolConfig/CustomToolset 注册表测试。 GSM 现在走 PostgresDatabase Actor,这里绕过 Ray 直接构造实例, 用 AsyncMock 模拟 postgres_database 的 remote 调用。 """ from __future__ import annotations from typing import Any, Dict from unittest.mock import AsyncMock, MagicMock import pytest from kilostar.core.global_state_machine import global_state_machine as gsm_module @pytest.fixture def gsm_instance(monkeypatch): GSMClass = gsm_module.GlobalStateMachine.__ray_actor_class__ obj = GSMClass.__new__(GSMClass) obj._mcp_servers = {} obj._tool_configs = {} obj._custom_toolsets = {} obj._global_provider_manager = MagicMock() obj._global_tool_manager = MagicMock() obj._global_tool_manager.is_third_party_tool = lambda name: name.startswith("tp_") obj._global_tool_manager.rebuild_custom_toolsets = MagicMock() obj._global_skill_manager = MagicMock() obj._global_individual_manager = MagicMock() obj.postgres_database = MagicMock() # 新加的 object-store 快照状态 obj._config_version = 0 obj._current_ref = None # 这套老测试覆盖的是注册表行为,不关心快照发布; # 把 _publish_snapshot 替换成 no-op 计数器,避免触达 ray.put 与 manager 内部细节 obj._publish_count = 0 def _stub_publish(): obj._publish_count += 1 obj._config_version += 1 obj._publish_snapshot = _stub_publish return obj # ─── MCP server registry ──────────────────────────────────────────────────── @pytest.mark.asyncio async def test_add_mcp_server(gsm_instance): obj = gsm_instance saved = {"server_id": "fs", "name": "fs", "transport": "stdio"} obj.postgres_database.upsert_mcp_server.remote = AsyncMock(return_value=saved) ok = await obj.add_mcp_server("fs", {"name": "fs", "transport": "stdio"}) assert ok is True assert obj._mcp_servers["fs"] == saved def test_get_mcp_server_configs_returns_copy(gsm_instance): obj = gsm_instance obj._mcp_servers["fs"] = {"name": "fs", "transport": "stdio"} res1 = obj.get_mcp_server_configs() res1["fs"] = {"mutated": True} res2 = obj.get_mcp_server_configs() assert res2["fs"]["name"] == "fs" def test_get_mcp_server_returns_none_when_missing(gsm_instance): assert gsm_instance.get_mcp_server("nope") is None def test_list_mcp_servers_includes_server_id(gsm_instance): obj = gsm_instance obj._mcp_servers["fs"] = {"name": "fs", "transport": "stdio"} listed = obj.list_mcp_servers() assert listed[0]["server_id"] == "fs" assert listed[0]["name"] == "fs" @pytest.mark.asyncio async def test_delete_mcp_server(gsm_instance): obj = gsm_instance obj._mcp_servers["fs"] = {"name": "fs"} obj.postgres_database.delete_mcp_server_db.remote = AsyncMock(return_value=True) assert await obj.delete_mcp_server("fs") is True assert "fs" not in obj._mcp_servers @pytest.mark.asyncio async def test_delete_unknown_mcp_server(gsm_instance): obj = gsm_instance obj.postgres_database.delete_mcp_server_db.remote = AsyncMock(return_value=False) assert await obj.delete_mcp_server("nope") is False # ─── tool_configs ─────────────────────────────────────────────────────────── @pytest.mark.asyncio async def test_set_and_get_tool_config(gsm_instance): obj = gsm_instance obj.postgres_database.upsert_tool_config.remote = AsyncMock( return_value={"tool_name": "tavily_search", "config": {"api_key": "xxx"}} ) await obj.set_tool_config("tavily_search", {"api_key": "xxx"}) assert obj.get_tool_config("tavily_search") == {"api_key": "xxx"} def test_get_unknown_tool_config_returns_empty(gsm_instance): assert gsm_instance.get_tool_config("not_exist") == {} def test_get_tool_config_is_isolated_copy(gsm_instance): obj = gsm_instance obj._tool_configs["tavily_search"] = {"api_key": "xxx"} snapshot = obj.get_tool_config("tavily_search") snapshot["api_key"] = "changed" assert obj.get_tool_config("tavily_search") == {"api_key": "xxx"} @pytest.mark.asyncio async def test_delete_tool_config(gsm_instance): obj = gsm_instance obj._tool_configs["tavily_search"] = {"api_key": "xxx"} obj.postgres_database.delete_tool_config_db.remote = AsyncMock(return_value=True) assert await obj.delete_tool_config("tavily_search") is True assert obj.get_tool_config("tavily_search") == {} @pytest.mark.asyncio async def test_delete_unknown_tool_config(gsm_instance): obj = gsm_instance obj.postgres_database.delete_tool_config_db.remote = AsyncMock(return_value=False) assert await obj.delete_tool_config("not_exist") is False def test_list_tool_configs(gsm_instance): obj = gsm_instance obj._tool_configs["tavily_search"] = {"api_key": "xxx"} obj._tool_configs["notion"] = {"token": "yyy"} raw = obj.list_tool_configs() assert raw["tavily_search"] == {"api_key": "xxx"} assert raw["notion"] == {"token": "yyy"} # ─── Custom Toolset ───────────────────────────────────────────────────────── @pytest.mark.asyncio async def test_add_custom_toolset_success(gsm_instance): obj = gsm_instance saved = {"toolset_id": "t1", "name": "my-set", "tools": ["tp_a", "tp_b"]} obj.postgres_database.upsert_custom_toolset.remote = AsyncMock(return_value=saved) result = await obj.add_custom_toolset( toolset_id="t1", name="my-set", tools=["tp_a", "tp_b"] ) assert result == saved assert obj._custom_toolsets["t1"] == saved obj._global_tool_manager.rebuild_custom_toolsets.assert_called() @pytest.mark.asyncio async def test_add_custom_toolset_rejects_system_tools(gsm_instance): obj = gsm_instance with pytest.raises(ValueError, match="不合法"): await obj.add_custom_toolset( toolset_id="t2", name="bad", tools=["system_tool"] ) def test_list_custom_toolsets(gsm_instance): obj = gsm_instance obj._custom_toolsets["t1"] = {"toolset_id": "t1", "name": "a", "tools": []} assert len(obj.list_custom_toolsets()) == 1 def test_get_custom_toolset(gsm_instance): obj = gsm_instance obj._custom_toolsets["t1"] = {"toolset_id": "t1", "name": "a"} assert obj.get_custom_toolset("t1")["name"] == "a" assert obj.get_custom_toolset("nope") is None @pytest.mark.asyncio async def test_delete_custom_toolset(gsm_instance): obj = gsm_instance obj._custom_toolsets["t1"] = {"toolset_id": "t1"} obj.postgres_database.delete_custom_toolset.remote = AsyncMock(return_value=True) assert await obj.delete_custom_toolset("t1") is True assert "t1" not in obj._custom_toolsets obj._global_tool_manager.rebuild_custom_toolsets.assert_called()