"""GlobalPluginManager:重型插件统一管理 actor。 职责: - 启动期扫描 ``data/plugin/`` 下所有组织,依次 setup - 运行期提供 install / uninstall / reload 三个热装接口 - 把每个组织注册为 cabinet tool + 挂 FastAPI router """ from __future__ import annotations from pathlib import Path from typing import Any, Dict, List, Optional from kilostar.plugin_runtime.loader import ( discover_plugins, install_dependencies, load_plugin, ) from kilostar.plugin_runtime.manifest import OrgManifest from kilostar.plugin_runtime.tool_bridge import make_dispatch_tool from kilostar.utils.logger import get_logger from kilostar.utils.ray_compat import _STANDALONE, actor_class from kilostar.utils.ray_hook import register_standalone from kilostar.utils.settings import get_plugin_data_dir, get_plugin_dir logger = get_logger("plugin_manager") @actor_class class GlobalPluginManager: """单机模式下是对象,分布式下是 ray actor。 每个 loaded 组织保存其 manifest 和 actor handle(standalone=proxy,dist=ray handle)。 """ def __init__(self): self._orgs: Dict[str, Dict[str, Any]] = {} self._dispatch_tools: Dict[str, Any] = {} async def bootstrap(self) -> None: """启动期一次性扫描并加载所有插件。""" plugin_root = get_plugin_dir() plugin_dirs = discover_plugins(plugin_root) for plugin_dir in plugin_dirs: try: await self._install_from_path(plugin_dir) except Exception as e: logger.error(f"bootstrap: failed to load plugin {plugin_dir.name}: {e}") # ─── 热装载接口 ───────────────────────────────────────────── async def install(self, name: str) -> Dict[str, Any]: """热装载一个插件(按目录名)。""" plugin_dir = get_plugin_dir() / name if not plugin_dir.exists(): raise FileNotFoundError(f"plugin dir not found: {plugin_dir}") if name in self._orgs: await self.uninstall(name) await self._install_from_path(plugin_dir) return {"name": name, "status": "installed"} async def uninstall(self, name: str) -> Dict[str, Any]: """卸载一个插件。""" org_info = self._orgs.pop(name, None) if org_info is None: return {"name": name, "status": "not_found"} # shutdown actor try: handle = org_info.get("handle") if handle is not None: await handle.shutdown.remote() except Exception as e: logger.warning(f"shutdown org_{name} failed: {e}") # 移除 dispatch tool self._dispatch_tools.pop(f"dispatch_to_{name}", None) logger.info(f"uninstalled plugin: {name}") return {"name": name, "status": "uninstalled"} async def reload(self, name: str) -> Dict[str, Any]: """热重载(卸载 + 安装)。""" await self.uninstall(name) return await self.install(name) # ─── 查询接口 ────────────────────────────────────────────── def list_plugins(self) -> List[Dict[str, Any]]: out: List[Dict[str, Any]] = [] plugin_root = get_plugin_dir() for name, info in self._orgs.items(): manifest = info.get("manifest", {}) or {} ui = manifest.get("ui", {}) or {} wc_manifest = plugin_root / name / "frontend" / "dist" / "wc-manifest.json" out.append({ "name": name, "display_name": info.get("display_name", name), "description": info.get("description", ""), "status": "running", "has_ui": wc_manifest.exists(), "icon": ui.get("icon"), }) return out def get_dispatch_tools(self) -> Dict[str, Any]: """返回所有 dispatch tools 的 {tool_name: callable} 字典。""" return dict(self._dispatch_tools) # ─── 内部 ────────────────────────────────────────────────── async def _install_from_path(self, plugin_dir: Path) -> None: cls, manifest_dict, agents_dict, dir_str = load_plugin(plugin_dir) manifest = OrgManifest.model_validate(manifest_dict) name = manifest.name # 装依赖 if manifest.dependencies.python: await install_dependencies(manifest.dependencies.python) # 实例化 organization actor instance = cls(manifest_dict, agents_dict, dir_str) # 一次性安装钩子:marker 文件不存在时调用 on_first_install marker = get_plugin_data_dir(name) / ".installed" first_install = not marker.exists() if first_install: try: await instance.on_first_install() except Exception as e: logger.exception( f"plugin {name} on_first_install failed: {e}; aborting install" ) raise marker.write_text(manifest.version, encoding="utf-8") # 把 agents.json 的 slot 登记到 plugin_owned 表(best-effort,DB 不可用时静默跳过) await self._register_plugin_slots(name, agents_dict) await instance.setup() # 注册到 ray_actor_hook 命名空间 actor_name = manifest.actor_name if _STANDALONE: register_standalone(actor_name, instance) else: # 分布式模式下,这里需要把 instance 包装成 ray actor # 第一版走 standalone 逻辑(两种模式统一 register 到本进程) # 真正分布式隔离等后续做 register_standalone(actor_name, instance) # 生成 dispatch tool tool = make_dispatch_tool(name, manifest.display_name, manifest.description) self._dispatch_tools[f"dispatch_to_{name}"] = tool self._orgs[name] = { "display_name": manifest.display_name, "description": manifest.description, "manifest": manifest_dict, "handle": instance, "actor_name": actor_name, } logger.info(f"loaded plugin: {name} (actor={actor_name})") async def _register_plugin_slots(self, name: str, agents_dict: Dict[str, Any]) -> None: """把插件 agents.json 中的每个 agent upsert 为一行 plugin_owned slot。 只刷新 description/node_affinity;用户在前端配置的 provider/model 不被覆盖。 DB 不可用时静默跳过(standalone 启动早期 / 单测场景)。 """ try: from kilostar.utils.ray_hook import ray_actor_hook pg = ray_actor_hook("postgres_database").postgres_database for adef in agents_dict.get("agents", []): slot_name = adef.get("name") if not slot_name: continue description = adef.get("role") or adef.get("system_prompt") or slot_name await pg.upsert_plugin_slot.remote( plugin_name=name, slot_name=slot_name, description=description, ) except Exception as e: logger.debug(f"register_plugin_slots skipped for {name}: {e}") async def cleanup_orphan_plugin_slots(self) -> None: """启动期兜底:DB 中存在但目录已不在的 plugin_owned slot 全部清掉。""" try: from kilostar.utils.ray_hook import ray_actor_hook pg = ray_actor_hook("postgres_database").postgres_database recorded: List[str] = await pg.list_plugin_owned_names.remote() or [] except Exception as e: logger.debug(f"cleanup_orphan_plugin_slots skipped: {e}") return plugin_root = get_plugin_dir() present = {p.name for p in plugin_root.iterdir() if p.is_dir()} if plugin_root.exists() else set() for plugin_name in recorded: if plugin_name not in present: try: n = await pg.delete_plugin_slots.remote(plugin_name) logger.info(f"cleaned {n} orphan slots for missing plugin {plugin_name!r}") except Exception as e: logger.warning(f"failed to clean orphan slots for {plugin_name}: {e}")