# Copyright 2026 zhaoxi826
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import ray
from functools import lru_cache


class ActorList:
    """Small container with attribute-style access.

    Lets callers write ``a.actor_name`` instead of ``d["actor_name"]``.
    Every attribute assignment is stored in the backing dictionary that is
    kept on the instance under the name ``dict``.
    """

    def __init__(self):
        # Bypass our own __setattr__, which would otherwise try to write into
        # the backing dictionary before it exists.
        super().__setattr__("dict", {})

    def __setattr__(self, key, value):
        """Store ``value`` under ``key`` in the backing dictionary."""
        self.dict[key] = value

    def __getattr__(self, key):
        """Return the entry stored under ``key``.

        Only invoked when normal attribute lookup fails.

        Raises:
            AttributeError: ``key`` is not stored in this container.
        """
        # Read the backing store through the instance __dict__ rather than
        # ``self.dict``: when the store is missing (copy/pickle create the
        # instance without running __init__ and then probe attributes such as
        # ``__setstate__``), ``self.dict`` would re-enter __getattr__ and
        # recurse until RecursionError.
        store = self.__dict__.get("dict")
        if store is not None and key in store:
            return store[key]
        raise AttributeError(f"ActorList 对象没有属性 '{key}'")

    def __delattr__(self, key):
        """Remove the entry stored under ``key``.

        Raises:
            AttributeError: ``key`` is not stored in this container.
        """
        if key in self.dict:
            del self.dict[key]
        else:
            raise AttributeError(f"ActorList对象没有属性 '{key}'")


@lru_cache(maxsize=128)
def _get_cached_actor_handle(actor_name: str):
    """Look up a named actor in the ``kilostar`` namespace, memoizing the result.

    Successful lookups are kept in an LRU cache (up to 128 names) until
    ``clear_actor_cache`` is called. A lookup that raises is not cached, so
    the next call queries Ray again.
    """
    return ray.get_actor(actor_name, namespace="kilostar")


def clear_actor_cache():
    """Drop every cached actor handle so later lookups query Ray again."""
    _get_cached_actor_handle.cache_clear()


def wait_for_actor(
    actor_name: str, *, timeout: float = 10.0, interval: float = 0.5
):
    """Block until the named actor can be looked up and return its handle.

    Intended for start-up paths (for example the entry point of a freshly
    launched Ray task) where a dependency actor may not be registered yet.
    The lookup is retried every ``interval`` seconds until it succeeds or
    ``timeout`` seconds have elapsed.

    Args:
        actor_name: Registered name of the actor.
        timeout: Maximum number of seconds to wait. ``<= 0`` means a single
            attempt.
        interval: Seconds to sleep between attempts. The sleep is shortened
            to the time remaining so the call does not overrun ``timeout``.

    Raises:
        TimeoutError: The actor could not be looked up in time. The last
            lookup error is attached via ``raise ... from``.
    """
    deadline = time.monotonic() + max(timeout, 0.0)
    last_err: Exception | None = None
    while True:
        try:
            return _get_cached_actor_handle(actor_name)
        except Exception as e:
            # A failed ray.get_actor is usually a ValueError. lru_cache does
            # not store raised exceptions, so the next iteration performs a
            # fresh lookup; nothing has to be evicted here.
            last_err = e
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(
                f"等待 actor {actor_name!r} 就绪超时({timeout}s):{last_err}"
            ) from last_err
        # Never sleep past the deadline: a full ``interval`` could overshoot
        # ``timeout`` by almost one interval. Sleeping only the remainder
        # still leaves one final attempt right at the deadline.
        time.sleep(min(interval, remaining))


def ray_actor_hook(*actor_names: str, timeout: float = 0.0, interval: float = 0.5):
    """Fetch several Ray actor handles by name and return them as an ``ActorList``.

    Example: ``actors = ray_actor_hook("postgres_database", "global_state_machine")``;
    afterwards ``actors.postgres_database`` yields the corresponding handle.

    Args:
        *actor_names: Registered names of the actors to fetch.
        timeout: When ``> 0``, each actor is fetched through
            ``wait_for_actor`` with this per-actor timeout (for start-up
            paths). The default ``0`` keeps the fail-fast behaviour: the
            lookup error is raised immediately if an actor is missing.
        interval: Polling interval in seconds; only used when ``timeout > 0``.

    Returns:
        An ``ActorList`` holding one entry per requested name.

    Raises:
        TimeoutError: ``timeout > 0`` and an actor did not become available.
    """
    actor_list = ActorList()
    for actor_name in actor_names:
        if timeout > 0:
            handle = wait_for_actor(
                actor_name, timeout=timeout, interval=interval
            )
        else:
            handle = _get_cached_actor_handle(actor_name)
        setattr(actor_list, actor_name, handle)
    return actor_list