Files
KiloStar/tests/unit/test_plugin_runtime.py
2026-07-01 09:22:26 +00:00

269 lines
9.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Tests for the heavy plugin (Organization) runtime."""
import json
from pathlib import Path
import pytest
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import DeclarativeBase
from kilostar.plugin_runtime.base_organization import BaseOrganization
from kilostar.plugin_runtime.manifest import OrgManifest
from kilostar.plugin_runtime.agents_config import AgentsConfig
from kilostar.plugin_runtime.loader import (
collect_plugin_routers,
discover_plugin_api,
discover_plugins,
load_plugin,
)
from kilostar.plugin_runtime.tool_bridge import make_dispatch_tool
_PLUGIN_ROOT = Path(__file__).parent.parent.parent / "data" / "plugin"
_EXAMPLE_DEPT = _PLUGIN_ROOT / "example_dept"
def test_example_dept_structure():
"""example_dept stub has the required files."""
assert (_EXAMPLE_DEPT / "manifest.json").exists()
assert (_EXAMPLE_DEPT / "agents.json").exists()
assert (_EXAMPLE_DEPT / "README.md").exists()
assert (_EXAMPLE_DEPT / "core" / "organization.py").exists()
def test_example_dept_manifest_valid():
with open(_EXAMPLE_DEPT / "manifest.json", "r", encoding="utf-8") as f:
data = json.load(f)
manifest = OrgManifest.model_validate(data)
assert manifest.name == "example_dept"
assert manifest.actor_name == "org_example_dept"
assert manifest.entry == "core.organization:ExampleOrganization"
def test_example_dept_agents_valid():
with open(_EXAMPLE_DEPT / "agents.json", "r", encoding="utf-8") as f:
data = json.load(f)
config = AgentsConfig.model_validate(data)
assert len(config.agents) == 2
names = {a.name for a in config.agents}
assert names == {"analyst", "executor"}
assert config.orchestration.entry == "analyst"
analyst = config.get("analyst")
assert analyst is not None
assert "executor" in analyst.peers
def test_discover_plugins_finds_example_dept():
plugins = discover_plugins(_PLUGIN_ROOT)
names = {p.name for p in plugins}
assert "example_dept" in names
def test_load_plugin_returns_class():
cls, manifest_dict, agents_dict, dir_str = load_plugin(_EXAMPLE_DEPT)
assert cls.__name__ == "ExampleOrganization"
assert manifest_dict["name"] == "example_dept"
assert "agents" in agents_dict
assert dir_str == str(_EXAMPLE_DEPT)
def test_make_dispatch_tool_signature():
tool = make_dispatch_tool("example_dept", "示例部门", "演示用")
assert tool.__name__ == "dispatch_to_example_dept"
assert callable(tool)
assert "演示用" in tool.__doc__
# ─── 框架层:SQLite 隔离基础设施 ─────────────────────────────────
class _PluginBase(DeclarativeBase):
pass
class _PluginThing(_PluginBase):
__tablename__ = "plugin_thing"
id = Column(Integer, primary_key=True)
name = Column(String(50))
@pytest.mark.anyio
async def test_init_local_db_creates_sqlite_with_metadata(tmp_path, monkeypatch):
"""init_local_db 在插件 _data 目录下落 SQLitecreate_all 建表,session_maker 可用。"""
from kilostar.utils import settings as _settings_mod
monkeypatch.setenv("KILOSTAR_PLUGIN_DIR", str(tmp_path))
_settings_mod.get_settings.cache_clear()
manifest = {"name": "tplug", "version": "0.1.0", "display_name": "测试插件"}
agents = {
"agents": [
{"name": "a", "role": "r", "model": {"provider_title": "p", "model_id": "m"}}
],
"orchestration": {"type": "react", "entry": "a"},
}
org = BaseOrganization(manifest, agents, str(tmp_path / "tplug"))
try:
await org.init_local_db([_PluginBase])
db_path = tmp_path / "tplug" / "_data" / "tplug.db"
assert db_path.exists()
assert org._engine is not None and org._session_maker is not None
async with org._session_maker() as session:
session.add(_PluginThing(name="hello"))
await session.commit()
from sqlalchemy import select
async with org._session_maker() as session:
row = (await session.execute(select(_PluginThing))).scalar_one()
assert row.name == "hello"
finally:
if org._engine is not None:
await org._engine.dispose()
_settings_mod.get_settings.cache_clear()
@pytest.mark.anyio
async def test_on_first_install_default_is_noop():
"""默认 on_first_install 是空实现,子类可覆盖;调用不抛错。"""
manifest = {"name": "noop", "version": "0.1.0"}
agents = {
"agents": [{"name": "a", "role": "r", "model": {"provider_title": "p", "model_id": "m"}}],
"orchestration": {"type": "react", "entry": "a"},
}
org = BaseOrganization(manifest, agents, "/tmp/x")
result = await org.on_first_install()
assert result is None
@pytest.mark.anyio
async def test_install_marker_drives_on_first_install(tmp_path, monkeypatch):
"""首次装载触发 on_first_install + 写 marker;二次装载不再触发。"""
from kilostar.utils import settings as _settings_mod
from kilostar.utils import ray_compat
monkeypatch.setattr(ray_compat, "_STANDALONE", True)
monkeypatch.setenv("KILOSTAR_PLUGIN_DIR", str(tmp_path))
_settings_mod.get_settings.cache_clear()
# 强制走 standalone 分支,重新 import plugin_manager 以 re-decorate
import importlib
import kilostar.plugin_runtime.plugin_manager as pm_mod
importlib.reload(pm_mod)
GlobalPluginManager = pm_mod.GlobalPluginManager
plugin_dir = tmp_path / "demo_plug"
(plugin_dir / "core").mkdir(parents=True)
(plugin_dir / "manifest.json").write_text(json.dumps({
"name": "demo_plug",
"version": "0.1.0",
"display_name": "demo",
}))
(plugin_dir / "agents.json").write_text(json.dumps({
"agents": [{"name": "a", "role": "r", "model": {"provider_title": "p", "model_id": "m"}}],
"orchestration": {"type": "react", "entry": "a"},
}))
install_count = {"n": 0}
async def _no_setup(self_):
return None
async def _count_install(self_):
install_count["n"] += 1
monkeypatch.setattr(BaseOrganization, "setup", _no_setup)
monkeypatch.setattr(BaseOrganization, "on_first_install", _count_install)
try:
pm = GlobalPluginManager()
await pm._install_from_path(plugin_dir)
assert install_count["n"] == 1
marker = tmp_path / "demo_plug" / "_data" / ".installed"
assert marker.exists()
# 模拟"二次装载":清掉内存态后重装
pm._orgs.pop("demo_plug", None)
await pm._install_from_path(plugin_dir)
assert install_count["n"] == 1 # 不应再触发
finally:
_settings_mod.get_settings.cache_clear()
# ─── 框架层:插件 API router 自动挂载 ────────────────────────────
def _write_plugin_skeleton(root: Path, name: str, *, with_api: bool, api_prefix: str | None) -> Path:
plugin_dir = root / name
(plugin_dir / "core").mkdir(parents=True)
manifest = {
"name": name,
"version": "0.1.0",
"display_name": name,
}
if api_prefix:
manifest["api_prefix"] = api_prefix
(plugin_dir / "manifest.json").write_text(json.dumps(manifest))
(plugin_dir / "agents.json").write_text(json.dumps({
"agents": [{"name": "a", "role": "r", "model": {"provider_title": "p", "model_id": "m"}}],
"orchestration": {"type": "react", "entry": "a"},
}))
if with_api:
(plugin_dir / "api.py").write_text(
"from fastapi import APIRouter\n"
"router = APIRouter()\n"
"@router.get('/ping')\n"
"async def _ping():\n"
" return {'ok': True}\n"
)
return plugin_dir
def test_discover_plugin_api_returns_router_when_present(tmp_path):
plugin_dir = _write_plugin_skeleton(tmp_path, "p_with_api", with_api=True, api_prefix="/x")
router = discover_plugin_api(plugin_dir, "p_with_api")
assert router is not None
assert any(r.path == "/ping" for r in router.routes)
def test_discover_plugin_api_returns_none_when_missing(tmp_path):
plugin_dir = _write_plugin_skeleton(tmp_path, "p_no_api", with_api=False, api_prefix="/x")
assert discover_plugin_api(plugin_dir, "p_no_api") is None
def test_collect_plugin_routers_filters_by_api_prefix_and_api_py(tmp_path):
"""没 api_prefix 的不挂;没 api.py 的也不挂;都满足才返回。"""
_write_plugin_skeleton(tmp_path, "ok", with_api=True, api_prefix="/api/v1/plugin/ok")
_write_plugin_skeleton(tmp_path, "no_prefix", with_api=True, api_prefix=None)
_write_plugin_skeleton(tmp_path, "no_api", with_api=False, api_prefix="/api/v1/plugin/no_api")
routers = collect_plugin_routers(tmp_path)
prefixes = [p for p, _r in routers]
assert prefixes == ["/api/v1/plugin/ok"]
# ─── 框架层:plugin_owned slot ────────────────────────────
def test_agent_def_model_is_optional():
"""slot 形态:agents.json 里 model 可省略,等用户在前端装配。"""
cfg = AgentsConfig.model_validate({
"agents": [{"name": "analyst", "role": "数据分析师"}],
"orchestration": {"type": "react", "entry": "analyst"},
})
assert cfg.agents[0].model is None
def test_agent_def_model_static_still_works():
"""老插件留一手:写死 model 仍然合法。"""
cfg = AgentsConfig.model_validate({
"agents": [{
"name": "x", "role": "r",
"model": {"provider_title": "p", "model_id": "m"},
}],
"orchestration": {"type": "react", "entry": "x"},
})
assert cfg.agents[0].model is not None
assert cfg.agents[0].model.provider_title == "p"