Files
zhaoxi 6d658b4f4d feat: 工具系统迁移 + 重型插件骨架 + 前端交互增强
- 工具系统从 kilostar/plugin/tool_plugin/ 迁移到 data/toolset/(manifest.json 声明式)
- 新增 plugin_runtime 模块:BaseOrganization / GlobalPluginManager / loader / tool_bridge
- 新增 org_task + org_task_event 表及 DAO(alembic 0009)
- 新增 /api/v1/plugin 路由(submit/status/stream/install/reload)
- 新增 data/plugin/example_dept 示例重型插件
- regulatory_node 支持聊天历史上下文注入
- send_file 改为 artifact 存盘 + SSE 推送下载链接
- 前端 WorkflowFileCard 组件 + ToolSettings README 渲染
- utils 整理:合并 access/role_check、standalone_proxy→ray_compat、删除废弃模块
- 项目结构文档移至 docs/STRUCTURE.md 并详细展开

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-17 05:20:00 +00:00

107 lines
3.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""KiloStar Ray 兼容层:单机/分布式模式无感切换 + 序列化工具。
单机模式下,所有 Actor 退化为普通 Python 异步单例,通过 StandaloneProxy
包装后暴露与 Ray Actor Handle 相同的 `.method.remote(args)` 调用接口,
使上层代码在两种模式间无感切换。
"""
from __future__ import annotations
import asyncio
import os
from typing import Any, Type, TypeVar
from pydantic import BaseModel
_STANDALONE = os.environ.get("KILOSTAR_MODE", "distributed") == "standalone"
T = TypeVar("T", bound=Type[BaseModel])
class _MethodProxy:
"""包装单个方法,使 .remote(*args, **kwargs) 返回一个可 await 的 Task。"""
__slots__ = ("_method",)
def __init__(self, method: Any):
self._method = method
def remote(self, *args: Any, **kwargs: Any) -> asyncio.Task:
async def _invoke():
result = self._method(*args, **kwargs)
if asyncio.iscoroutine(result):
return await result
return result
return asyncio.ensure_future(_invoke())
class StandaloneProxy:
"""包装一个普通 Python 实例,模拟 Ray Actor Handle 的属性访问接口。
用法:proxy.some_method.remote(x, y) → 等效于 await instance.some_method(x, y)
"""
__slots__ = ("_instance",)
def __init__(self, instance: Any):
object.__setattr__(self, "_instance", instance)
def __getattr__(self, name: str) -> _MethodProxy:
attr = getattr(object.__getattribute__(self, "_instance"), name)
if callable(attr):
return _MethodProxy(attr)
return attr
# ─── 条件装饰器 ───
def actor_class(cls):
"""条件装饰器:分布式模式 → @ray.remote,单机模式 → 原样返回类。"""
if _STANDALONE:
return cls
import ray
return ray.remote(cls)
def remote_task(func):
"""条件装饰器:分布式 → @ray.remote(func),单机 → .remote() 转为 asyncio task。
单机模式下返回一个 stub 对象,其 .remote() 方法把函数以协程方式调度到
当前事件循环(workflow task 需要用 await 版本的 _entry,由调用方处理)。
"""
if _STANDALONE:
class _TaskProxy:
@staticmethod
def remote(*args, **kwargs):
async def _run():
result = func(*args, **kwargs)
if asyncio.iscoroutine(result):
return await result
return result
return asyncio.ensure_future(_run())
return _TaskProxy()
import ray
return ray.remote(func)
# ─── Pickle (Ray 序列化优化) ───
def pickle(cls: T) -> T:
"""类装饰器:用 Pydantic 的高效 JSON 序列化替代 Python 原生 __reduce__
使 Ray 跨进程通信时对 BaseModel 子类走 Rust 级序列化。
"""
def __reduce__(self):
data = self.model_dump_json()
return cls.model_validate_json, (data,)
cls.__reduce__ = __reduce__
return cls