Files
2026-07-01 09:22:26 +00:00

205 lines
8.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""GlobalPluginManager:重型插件统一管理 actor。
职责:
- 启动期扫描 ``data/plugin/`` 下所有组织,依次 setup
- 运行期提供 install / uninstall / reload 三个热装接口
- 把每个组织注册为 cabinet tool + 挂 FastAPI router
"""
from __future__ import annotations
from pathlib import Path
from typing import Any, Dict, List, Optional
from kilostar.plugin_runtime.loader import (
discover_plugins,
install_dependencies,
load_plugin,
)
from kilostar.plugin_runtime.manifest import OrgManifest
from kilostar.plugin_runtime.tool_bridge import make_dispatch_tool
from kilostar.utils.logger import get_logger
from kilostar.utils.ray_compat import _STANDALONE, actor_class
from kilostar.utils.ray_hook import register_standalone
from kilostar.utils.settings import get_plugin_data_dir, get_plugin_dir
# Module-level logger, obtained from the project's get_logger under the name "plugin_manager".
logger = get_logger("plugin_manager")
@actor_class
class GlobalPluginManager:
    """Central manager for heavyweight plugins ("organizations").

    Wrapped by ``actor_class``; per the original author's note this is a plain
    object in standalone mode and a ray actor in distributed mode.

    For every loaded organization the manager keeps its manifest and a
    ``handle``. The handle stored today is the plugin instance created in
    ``_install_from_path`` (both modes currently register it in-process).
    """

    def __init__(self):
        # org name -> {"display_name", "description", "manifest", "handle", "actor_name"}
        self._orgs: Dict[str, Dict[str, Any]] = {}
        # "dispatch_to_<org name>" -> tool object built by make_dispatch_tool
        self._dispatch_tools: Dict[str, Any] = {}

    async def bootstrap(self) -> None:
        """Scan the plugin directory once at startup and load every plugin.

        A plugin that fails to load is logged (with traceback) and skipped so
        that one broken plugin does not prevent the others from loading.
        """
        plugin_root = get_plugin_dir()
        for plugin_dir in discover_plugins(plugin_root):
            try:
                await self._install_from_path(plugin_dir)
            except Exception as e:
                # logger.exception keeps the traceback, which logger.error dropped.
                logger.exception(f"bootstrap: failed to load plugin {plugin_dir.name}: {e}")

    # ─── Hot-loading interface ──────────────────────────────────

    async def install(self, name: str) -> Dict[str, Any]:
        """Hot-install a plugin identified by its directory name.

        If a plugin is already loaded under ``name`` it is uninstalled first.

        Args:
            name: Name of a directory directly under the plugin root.

        Returns:
            ``{"name": name, "status": "installed"}``.

        Raises:
            FileNotFoundError: If ``name`` is not a single path component or
                the plugin directory does not exist.
        """
        # Reject "", ".", ".." and anything containing a path separator so the
        # lookup can never resolve to the plugin root itself or escape it.
        if not name or name in {".", ".."} or Path(name).name != name:
            raise FileNotFoundError(f"invalid plugin name: {name!r}")
        plugin_dir = get_plugin_dir() / name
        if not plugin_dir.exists():
            raise FileNotFoundError(f"plugin dir not found: {plugin_dir}")
        if name in self._orgs:
            await self.uninstall(name)
        await self._install_from_path(plugin_dir)
        return {"name": name, "status": "installed"}

    async def uninstall(self, name: str) -> Dict[str, Any]:
        """Unload a plugin.

        Removes the plugin from the registry, asks its handle to shut down
        (best-effort: a failure is logged as a warning, not raised) and drops
        its dispatch tool.

        Returns:
            ``{"name": name, "status": "uninstalled"}``, or status
            ``"not_found"`` when no plugin is loaded under ``name``.
        """
        org_info = self._orgs.pop(name, None)
        if org_info is None:
            return {"name": name, "status": "not_found"}

        handle = org_info.get("handle")
        if handle is not None:
            try:
                await self._shutdown_handle(handle)
            except Exception as e:
                logger.warning(f"shutdown org_{name} failed: {e}")

        # Remove the dispatch tool registered for this plugin.
        self._dispatch_tools.pop(f"dispatch_to_{name}", None)
        logger.info(f"uninstalled plugin: {name}")
        return {"name": name, "status": "uninstalled"}

    async def reload(self, name: str) -> Dict[str, Any]:
        """Hot-reload a plugin (uninstall followed by install).

        Delegates to ``install``, which verifies that the plugin directory
        exists *before* uninstalling the loaded copy. Uninstalling first, as
        this method used to, left the plugin unloaded whenever the directory
        was missing.

        Returns:
            The result of ``install(name)``.

        Raises:
            FileNotFoundError: Propagated from ``install``.
        """
        return await self.install(name)

    # ─── Query interface ────────────────────────────────────────

    def list_plugins(self) -> List[Dict[str, Any]]:
        """Return one summary dict per loaded plugin.

        Each entry has ``name``, ``display_name``, ``description``, ``status``
        (always ``"running"``), ``has_ui`` (whether
        ``<plugin_root>/<name>/frontend/dist/wc-manifest.json`` exists) and
        ``icon`` (taken from the manifest's ``ui`` section, may be ``None``).
        """
        out: List[Dict[str, Any]] = []
        plugin_root = get_plugin_dir()
        for name, info in self._orgs.items():
            manifest = info.get("manifest", {}) or {}
            ui = manifest.get("ui", {}) or {}
            wc_manifest = plugin_root / name / "frontend" / "dist" / "wc-manifest.json"
            out.append({
                "name": name,
                "display_name": info.get("display_name", name),
                "description": info.get("description", ""),
                "status": "running",
                "has_ui": wc_manifest.exists(),
                "icon": ui.get("icon"),
            })
        return out

    def get_dispatch_tools(self) -> Dict[str, Any]:
        """Return a shallow copy of the ``{tool_name: tool}`` mapping."""
        return dict(self._dispatch_tools)

    # ─── Internals ──────────────────────────────────────────────

    async def _shutdown_handle(self, handle: Any) -> None:
        """Invoke ``shutdown`` on a plugin handle, whatever kind of handle it is.

        Uses ``shutdown.remote()`` when the bound method exposes ``remote``
        and falls back to a direct call otherwise. The fallback matters
        because the handle stored by ``_install_from_path`` is the plugin
        instance itself. The result is awaited only if it is awaitable.
        """
        import inspect  # local import keeps this block self-contained

        shutdown = handle.shutdown
        remote = getattr(shutdown, "remote", None)
        pending = remote() if callable(remote) else shutdown()
        if inspect.isawaitable(pending):
            await pending

    async def _install_from_path(self, plugin_dir: Path) -> None:
        """Load, initialise and register the plugin located at ``plugin_dir``.

        Steps, in order: load and validate the manifest, install Python
        dependencies, instantiate the organization, run the one-time
        ``on_first_install`` hook, register agent slots, call ``setup()``,
        register the instance under its actor name, and create its dispatch
        tool.

        Raises:
            Exception: Anything raised by loading, validation, dependency
                installation, ``on_first_install`` or ``setup`` propagates.
        """
        cls, manifest_dict, agents_dict, dir_str = load_plugin(plugin_dir)
        manifest = OrgManifest.model_validate(manifest_dict)
        name = manifest.name

        # Install Python dependencies declared by the manifest.
        if manifest.dependencies.python:
            await install_dependencies(manifest.dependencies.python)

        # Instantiate the organization.
        instance = cls(manifest_dict, agents_dict, dir_str)

        # One-time install hook: run on_first_install only while the marker
        # file is absent, then write the marker so later loads skip the hook.
        marker = get_plugin_data_dir(name) / ".installed"
        if not marker.exists():
            try:
                await instance.on_first_install()
            except Exception as e:
                logger.exception(
                    f"plugin {name} on_first_install failed: {e}; aborting install"
                )
                raise
            # Ensure the data directory exists before writing the marker.
            marker.parent.mkdir(parents=True, exist_ok=True)
            marker.write_text(manifest.version, encoding="utf-8")

        # Record the slots from agents.json (best-effort; skipped when the DB
        # is unavailable).
        await self._register_plugin_slots(name, agents_dict)
        await instance.setup()

        # Register under the actor name. Distributed mode should eventually
        # wrap the instance in a ray actor; for now both modes register the
        # instance in this process, so no branch on _STANDALONE is needed.
        actor_name = manifest.actor_name
        register_standalone(actor_name, instance)

        # Build the dispatch tool for this organization.
        tool = make_dispatch_tool(name, manifest.display_name, manifest.description)
        self._dispatch_tools[f"dispatch_to_{name}"] = tool
        self._orgs[name] = {
            "display_name": manifest.display_name,
            "description": manifest.description,
            "manifest": manifest_dict,
            "handle": instance,
            "actor_name": actor_name,
        }
        logger.info(f"loaded plugin: {name} (actor={actor_name})")

    async def _register_plugin_slots(self, name: str, agents_dict: Dict[str, Any]) -> None:
        """Upsert one plugin-owned slot per agent listed in ``agents_dict``.

        For each entry under ``"agents"`` that has a ``name``, calls
        ``upsert_plugin_slot`` with a description taken from ``role``, then
        ``system_prompt``, then the slot name. Per the original author's note
        the upsert leaves user-configured provider/model untouched; that
        behaviour lives in the database actor and is not visible here.

        Best-effort: any exception (for example the database actor being
        unavailable early in standalone startup or in unit tests) is logged at
        debug level and swallowed.
        """
        try:
            from kilostar.utils.ray_hook import ray_actor_hook

            pg = ray_actor_hook("postgres_database").postgres_database
            # Treat a missing or null "agents" entry as an empty list.
            for adef in agents_dict.get("agents") or []:
                slot_name = adef.get("name")
                if not slot_name:
                    continue
                description = adef.get("role") or adef.get("system_prompt") or slot_name
                await pg.upsert_plugin_slot.remote(
                    plugin_name=name,
                    slot_name=slot_name,
                    description=description,
                )
        except Exception as e:
            logger.debug(f"register_plugin_slots skipped for {name}: {e}")

    async def cleanup_orphan_plugin_slots(self) -> None:
        """Delete plugin-owned slots whose plugin directory no longer exists.

        Startup safety net: compares the plugin names recorded in the database
        with the directories present under the plugin root and deletes the
        slots of every recorded plugin that has no directory. If the recorded
        names cannot be fetched, the cleanup is skipped. A failed delete is
        logged as a warning and does not stop the remaining deletions.
        """
        try:
            from kilostar.utils.ray_hook import ray_actor_hook

            pg = ray_actor_hook("postgres_database").postgres_database
            recorded: List[str] = await pg.list_plugin_owned_names.remote() or []
        except Exception as e:
            logger.debug(f"cleanup_orphan_plugin_slots skipped: {e}")
            return

        plugin_root = get_plugin_dir()
        if plugin_root.exists():
            present = {p.name for p in plugin_root.iterdir() if p.is_dir()}
        else:
            present = set()

        for plugin_name in recorded:
            if plugin_name in present:
                continue
            try:
                n = await pg.delete_plugin_slots.remote(plugin_name)
                logger.info(f"cleaned {n} orphan slots for missing plugin {plugin_name!r}")
            except Exception as e:
                logger.warning(f"failed to clean orphan slots for {plugin_name}: {e}")