457d12834f
通过 StandaloneProxy 适配层让 .remote() 调用在单机模式下透明降级为 asyncio 协程调用,7 个 Actor 和 workflow task 均可在纯 asyncio 环境运行, 启动快、资源占用低。分布式模式行为完全不变。 Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
87 lines
2.5 KiB
Python
87 lines
2.5 KiB
Python
"""KiloStar 单机模式适配层:用 asyncio 协程模拟 Ray Actor 接口。
|
|
|
|
单机模式下,所有 Actor 退化为普通 Python 异步单例,通过 StandaloneProxy
|
|
包装后暴露与 Ray Actor Handle 相同的 `.method.remote(args)` 调用接口,
|
|
使上层代码在两种模式间无感切换。
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import os
|
|
from typing import Any
|
|
|
|
_STANDALONE = os.environ.get("KILOSTAR_MODE", "distributed") == "standalone"
|
|
|
|
|
|
class _MethodProxy:
|
|
"""包装单个方法,使 .remote(*args, **kwargs) 返回一个可 await 的 Task。"""
|
|
|
|
__slots__ = ("_method",)
|
|
|
|
def __init__(self, method: Any):
|
|
self._method = method
|
|
|
|
def remote(self, *args: Any, **kwargs: Any) -> asyncio.Task:
|
|
async def _invoke():
|
|
result = self._method(*args, **kwargs)
|
|
if asyncio.iscoroutine(result):
|
|
return await result
|
|
return result
|
|
|
|
return asyncio.ensure_future(_invoke())
|
|
|
|
|
|
class StandaloneProxy:
|
|
"""包装一个普通 Python 实例,模拟 Ray Actor Handle 的属性访问接口。
|
|
|
|
用法:proxy.some_method.remote(x, y) → 等效于 await instance.some_method(x, y)
|
|
"""
|
|
|
|
__slots__ = ("_instance",)
|
|
|
|
def __init__(self, instance: Any):
|
|
object.__setattr__(self, "_instance", instance)
|
|
|
|
def __getattr__(self, name: str) -> _MethodProxy:
|
|
attr = getattr(object.__getattribute__(self, "_instance"), name)
|
|
if callable(attr):
|
|
return _MethodProxy(attr)
|
|
return attr
|
|
|
|
|
|
# ─── 条件装饰器 ───
|
|
|
|
|
|
def actor_class(cls):
|
|
"""条件装饰器:分布式模式 → @ray.remote,单机模式 → 原样返回类。"""
|
|
if _STANDALONE:
|
|
return cls
|
|
import ray
|
|
return ray.remote(cls)
|
|
|
|
|
|
def remote_task(func):
|
|
"""条件装饰器:分布式 → @ray.remote(func),单机 → .remote() 转为 asyncio task。
|
|
|
|
单机模式下返回一个 stub 对象,其 .remote() 方法把函数以协程方式调度到
|
|
当前事件循环(workflow task 需要用 await 版本的 _entry,由调用方处理)。
|
|
"""
|
|
if _STANDALONE:
|
|
|
|
class _TaskProxy:
|
|
@staticmethod
|
|
def remote(*args, **kwargs):
|
|
async def _run():
|
|
result = func(*args, **kwargs)
|
|
if asyncio.iscoroutine(result):
|
|
return await result
|
|
return result
|
|
|
|
return asyncio.ensure_future(_run())
|
|
|
|
return _TaskProxy()
|
|
|
|
import ray
|
|
return ray.remote(func)
|