diff --git a/kilostar/api/__init__.py b/kilostar/api/__init__.py index 8bf6ca1..cc9acef 100644 --- a/kilostar/api/__init__.py +++ b/kilostar/api/__init__.py @@ -19,7 +19,10 @@ from fastapi import FastAPI, WebSocket, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse, JSONResponse from fastapi.staticfiles import StaticFiles -from ray import serve + +_STANDALONE = os.environ.get("KILOSTAR_MODE", "distributed") == "standalone" +if not _STANDALONE: + from ray import serve from .agent import agent_router from .auth import auth_router @@ -176,10 +179,11 @@ else: ) -@serve.deployment -@serve.ingress(app) -class KiloStarGateway: - gateway: Dict[str, WebSocket] +if not _STANDALONE: + @serve.deployment + @serve.ingress(app) + class KiloStarGateway: + gateway: Dict[str, WebSocket] - def __init__(self): - self.gateway = {} + def __init__(self): + self.gateway = {} diff --git a/kilostar/core/global_state_machine/global_state_machine.py b/kilostar/core/global_state_machine/global_state_machine.py index 5c5df8e..9410dc7 100644 --- a/kilostar/core/global_state_machine/global_state_machine.py +++ b/kilostar/core/global_state_machine/global_state_machine.py @@ -12,8 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -import ray +import os from typing import Any, Dict, List, Optional, Tuple +from kilostar.utils.standalone_proxy import actor_class + +_STANDALONE = os.environ.get("KILOSTAR_MODE", "distributed") == "standalone" +if not _STANDALONE: + import ray from kilostar.core.global_state_machine.individual_manager import ( GlobalIndividualManager, @@ -25,7 +30,7 @@ from kilostar.core.global_state_machine.gsm_snapshot import GSMSnapshot from kilostar.core.postgres_database import PostgresDatabase -@ray.remote +@actor_class class GlobalStateMachine: """全局状态机 Actor,统一持有 Provider/Tool/Skill/Individual/MCP/CustomToolset 注册表。 @@ -44,10 +49,9 @@ class GlobalStateMachine: self._tool_configs: Dict[str, Dict[str, Any]] = {} self._custom_toolsets: Dict[str, Dict[str, Any]] = {} - # 配置快照与版本号:每次写入 → version+=1 → ray.put 新 snapshot - # 读端通过 current_config_ref 拿 ref 后用 ray.get 直读,绕开 actor 单线程瓶颈 + # 配置快照与版本号:每次写入 → version+=1 → 发布新 snapshot self._config_version: int = 0 - self._current_ref: Optional[ray.ObjectRef] = None + self._current_ref = None self.postgres_database = postgres_database @@ -113,19 +117,19 @@ class GlobalStateMachine: ) def _publish_snapshot(self) -> None: - """版本号 +1 并把当前状态 put 到 Ray Object Store。 - - 旧 ref 会因为引用计数归零而进入回收队列;正在执行的 task 已经把 ref - 拷贝到了自己的进程,dec 不会影响它们的读取。 - """ + """版本号 +1 并发布当前状态快照。""" self._config_version += 1 - self._current_ref = ray.put(self._build_snapshot()) + snapshot = self._build_snapshot() + if _STANDALONE: + self._current_ref = snapshot + else: + self._current_ref = ray.put(snapshot) - async def current_config_ref(self) -> Tuple[int, ray.ObjectRef]: - """返回 ``(version, ObjectRef)``,调用方拿了 ref 后用 ``ray.get`` 自取。 + async def current_config_ref(self) -> Tuple[int, Any]: + """返回 ``(version, ObjectRef 或 snapshot)``。 - **不要**直接返回 snapshot 对象 —— 那样会走 actor RPC 反序列化,丧失 - object store 的共享内存优势。返回 ref 才能让调用方在自己进程里 ray.get。 + 分布式模式返回 ObjectRef,调用方用 ``ray.get`` 自取; + 单机模式直接返回 snapshot 对象。 """ if self._current_ref is None: self._publish_snapshot() diff --git a/kilostar/core/global_state_machine/gsm_snapshot.py b/kilostar/core/global_state_machine/gsm_snapshot.py index 46ce129..2cd02ec 100644 --- a/kilostar/core/global_state_machine/gsm_snapshot.py +++ b/kilostar/core/global_state_machine/gsm_snapshot.py @@ -30,10 +30,13 @@ GSM 仍然是 source of truth + 写入串行化器,但读路径解耦: from __future__ import annotations import asyncio +import os from dataclasses import dataclass, field from typing import Any, Callable, Dict, List, Optional, Tuple -import ray +_STANDALONE = os.environ.get("KILOSTAR_MODE", "distributed") == "standalone" +if not _STANDALONE: + import ray from kilostar.core.global_state_machine.model_provider.base_provider import Provider from kilostar.utils.logger import get_logger @@ -113,14 +116,19 @@ async def fetch_snapshot( ): return _local_cache["snapshot"] - version, ref = await gsm_actor.current_config_ref.remote() - snapshot = ray.get(ref) + version, ref_or_snapshot = await gsm_actor.current_config_ref.remote() + if _STANDALONE: + snapshot = ref_or_snapshot + else: + snapshot = ray.get(ref_or_snapshot) _local_cache["version"] = version _local_cache["snapshot"] = snapshot return snapshot - version, ref = await gsm_actor.current_config_ref.remote() - return ray.get(ref) + version, ref_or_snapshot = await gsm_actor.current_config_ref.remote() + if _STANDALONE: + return ref_or_snapshot + return ray.get(ref_or_snapshot) def reset_local_cache() -> None: diff --git a/kilostar/core/global_workflow_manager/global_workflow_manager.py b/kilostar/core/global_workflow_manager/global_workflow_manager.py index 29c7c0f..6d4bc8c 100644 --- a/kilostar/core/global_workflow_manager/global_workflow_manager.py +++ b/kilostar/core/global_workflow_manager/global_workflow_manager.py @@ -1,6 +1,6 @@ -import ray import asyncio from typing import Dict +from kilostar.utils.standalone_proxy import actor_class from kilostar.utils.ray_hook import ray_actor_hook from kilostar.utils.logger import get_logger @@ -11,7 +11,7 @@ class TraceQueues: self.receive: asyncio.Queue[str] = asyncio.Queue() -@ray.remote +@actor_class class GlobalWorkflowManager: def __init__(self): self._traces: Dict[str, TraceQueues] = {} diff --git a/kilostar/core/individual/consciousness_node/consciousness_node.py b/kilostar/core/individual/consciousness_node/consciousness_node.py index 30c196a..a38b73d 100644 --- a/kilostar/core/individual/consciousness_node/consciousness_node.py +++ b/kilostar/core/individual/consciousness_node/consciousness_node.py @@ -13,8 +13,8 @@ # limitations under the License. -import ray from typing import Union, overload +from kilostar.utils.standalone_proxy import actor_class from kilostar.core.individual.consciousness_node.template import ( ConsciousnessNodeDeps, ForregulatoryNode, @@ -32,7 +32,7 @@ from kilostar.utils.ray_hook import ray_actor_hook from kilostar.utils.i18n import agent_prompt -@ray.remote +@actor_class class ConsciousnessNode: def __init__(self) -> None: from kilostar.utils.logger import get_logger diff --git a/kilostar/core/individual/control_node/control_node.py b/kilostar/core/individual/control_node/control_node.py index c1472f1..88b7d46 100644 --- a/kilostar/core/individual/control_node/control_node.py +++ b/kilostar/core/individual/control_node/control_node.py @@ -12,8 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -import ray from pydantic_ai import Agent, RunContext +from kilostar.utils.standalone_proxy import actor_class from kilostar.core.global_state_machine.global_state_machine import GlobalStateMachine from kilostar.core.global_state_machine.model_provider.base_provider import Provider from kilostar.adapter.model_adapter.agent_factory import AgentFactory @@ -25,7 +25,7 @@ from kilostar.core.individual.control_node.template import ( from kilostar.utils.i18n import agent_prompt -@ray.remote +@actor_class class ControlNode: """ControlNode(控制节点):工作流中具体子任务的执行 Actor。 diff --git a/kilostar/core/individual/regulatory_node/regulatory_node.py b/kilostar/core/individual/regulatory_node/regulatory_node.py index f6caa38..bd84b9d 100644 --- a/kilostar/core/individual/regulatory_node/regulatory_node.py +++ b/kilostar/core/individual/regulatory_node/regulatory_node.py @@ -13,8 +13,8 @@ # limitations under the License. import datetime -import ray from typing import Union +from kilostar.utils.standalone_proxy import actor_class from kilostar.adapter.model_adapter.agent_factory import AgentFactory from kilostar.core.global_state_machine.global_state_machine import GlobalStateMachine from kilostar.core.global_state_machine.model_provider import Provider @@ -27,7 +27,7 @@ from pydantic_ai import RunContext, Agent from kilostar.utils.i18n import agent_prompt -@ray.remote +@actor_class class RegulatoryNode: """RegulatoryNode(监管节点):用户请求的入口路由 Actor。 diff --git a/kilostar/core/postgres_database/postgres.py b/kilostar/core/postgres_database/postgres.py index efdcab4..91ae64f 100644 --- a/kilostar/core/postgres_database/postgres.py +++ b/kilostar/core/postgres_database/postgres.py @@ -15,7 +15,7 @@ import os import asyncio -import ray +from kilostar.utils.standalone_proxy import actor_class from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession from sqlalchemy.orm import sessionmaker from kilostar.core.postgres_database.model.base import BaseDataModel @@ -55,7 +55,7 @@ from .module.custom_toolset import CustomToolsetDatabase from .module.system_event_log import SystemEventLogDatabase -@ray.remote +@actor_class class PostgresDatabase: """以 Ray Actor 形式暴露的统一数据库门面。 diff --git a/kilostar/core/work/workflow/workflow_engine.py b/kilostar/core/work/workflow/workflow_engine.py index ba443ca..7088131 100644 --- a/kilostar/core/work/workflow/workflow_engine.py +++ b/kilostar/core/work/workflow/workflow_engine.py @@ -36,7 +36,7 @@ import datetime from dataclasses import dataclass from typing import Any, Awaitable, Callable, Dict, List, Optional -import ray +from kilostar.utils.standalone_proxy import remote_task, _STANDALONE from pydantic import BaseModel, Field from pydantic_graph import BaseNode, End, Graph, GraphRunContext from pydantic_graph.persistence import BaseStatePersistence @@ -519,7 +519,7 @@ async def resume_workflow_graph( return final_output -@ray.remote +@remote_task def run_workflow_task( workflow_data: dict, trace_id: str, resume_only: bool = False ): @@ -575,4 +575,7 @@ def run_workflow_task( workflow_data, trace_id, persistence=persistence ) - asyncio.run(_entry()) + if _STANDALONE: + return _entry() + else: + asyncio.run(_entry()) diff --git a/kilostar/utils/ray_hook.py b/kilostar/utils/ray_hook.py index 19d9f74..3096ebd 100644 --- a/kilostar/utils/ray_hook.py +++ b/kilostar/utils/ray_hook.py @@ -11,9 +11,15 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import os import time -import ray from functools import lru_cache +from typing import Any, Dict + +_STANDALONE = os.environ.get("KILOSTAR_MODE", "distributed") == "standalone" + +if not _STANDALONE: + import ray class ActorList: @@ -37,62 +43,80 @@ class ActorList: raise AttributeError(f"ActorList对象没有属性 '{key}'") -@lru_cache(maxsize=128) -def _get_cached_actor_handle(actor_name: str): - """缓存接口""" - return ray.get_actor(actor_name, namespace="kilostar") +# ─── Standalone Registry ─── + +_standalone_registry: Dict[str, Any] = {} -def clear_actor_cache(): - """清理接口""" - _get_cached_actor_handle.cache_clear() +def register_standalone(name: str, instance: Any) -> None: + """注册一个单机模式下的 Actor 单例(已包装为 StandaloneProxy)。""" + from kilostar.utils.standalone_proxy import StandaloneProxy + + _standalone_registry[name] = StandaloneProxy(instance) -def wait_for_actor( - actor_name: str, *, timeout: float = 10.0, interval: float = 0.5 -): - """阻塞等待某个 actor 就绪,返回其句柄。 +# ─── Distributed Mode Helpers ─── - 用于"启动期 / ray task 入口刚拉起"这类场景——被依赖的 actor 可能还没注册。 - 在 ``timeout`` 内按 ``interval`` 轮询 ``ray.get_actor``;拿到就立即返回, - 超时则抛带清晰上下文的 ``TimeoutError``(而不是裸 ``ValueError``)。 - Args: - actor_name: actor 注册名 - timeout: 最长等待秒数;``<=0`` 表示只试一次(等价于直接取句柄) - interval: 轮询间隔秒数 +if not _STANDALONE: - Raises: - TimeoutError: 超时仍未就绪。原始异常通过 ``raise ... from`` 链保留。 - """ - deadline = time.monotonic() + max(timeout, 0.0) - last_err: Exception | None = None - while True: - try: - return _get_cached_actor_handle(actor_name) - except Exception as e: # ray.get_actor 失败一般是 ValueError - last_err = e - # 失败不能让 lru_cache 留下脏数据(异常本身不会被缓存, - # 但若底层换实现,这里清一次更稳妥) - if time.monotonic() >= deadline: - raise TimeoutError( - f"等待 actor {actor_name!r} 就绪超时({timeout}s):{last_err}" - ) from last_err - time.sleep(interval) + @lru_cache(maxsize=128) + def _get_cached_actor_handle(actor_name: str): + """缓存接口""" + return ray.get_actor(actor_name, namespace="kilostar") + + def clear_actor_cache(): + """清理接口""" + _get_cached_actor_handle.cache_clear() + + def wait_for_actor( + actor_name: str, *, timeout: float = 10.0, interval: float = 0.5 + ): + """阻塞等待某个 actor 就绪,返回其句柄。""" + deadline = time.monotonic() + max(timeout, 0.0) + last_err: Exception | None = None + while True: + try: + return _get_cached_actor_handle(actor_name) + except Exception as e: + last_err = e + if time.monotonic() >= deadline: + raise TimeoutError( + f"等待 actor {actor_name!r} 就绪超时({timeout}s):{last_err}" + ) from last_err + time.sleep(interval) + +else: + + def _get_cached_actor_handle(actor_name: str): + raise RuntimeError("单机模式下不应调用 _get_cached_actor_handle") + + def clear_actor_cache(): + pass + + def wait_for_actor(actor_name: str, **kwargs): + raise RuntimeError("单机模式下不应调用 wait_for_actor") + + +# ─── 统一入口 ─── def ray_actor_hook(*actor_names: str, timeout: float = 0.0, interval: float = 0.5): - """按名字批量取出 Ray Actor 句柄,组装成一个 ``ActorList`` 返回。 + """按名字批量取出 Actor 句柄,组装成一个 ActorList 返回。 - 例:``actors = ray_actor_hook("postgres_database", "global_state_machine")``, - 随后即可用 ``actors.postgres_database`` 拿到对应句柄。 - - Args: - timeout: ``>0`` 时对每个 actor 走 ``wait_for_actor`` 等待就绪(启动期用); - 缺省 ``0`` 保持原"快速失败"语义——actor 不在立即抛异常。 - interval: 等待轮询间隔,仅在 ``timeout>0`` 时生效。 + 单机模式从 _standalone_registry 取,分布式模式走 ray.get_actor。 """ actor_list = ActorList() + + if _STANDALONE: + for name in actor_names: + if name not in _standalone_registry: + raise ValueError( + f"Standalone registry: actor {name!r} not registered" + ) + setattr(actor_list, name, _standalone_registry[name]) + return actor_list + for actor_name in actor_names: if timeout > 0: handle = wait_for_actor( diff --git a/kilostar/utils/standalone_proxy.py b/kilostar/utils/standalone_proxy.py new file mode 100644 index 0000000..3034587 --- /dev/null +++ b/kilostar/utils/standalone_proxy.py @@ -0,0 +1,86 @@ +"""KiloStar 单机模式适配层:用 asyncio 协程模拟 Ray Actor 接口。 + +单机模式下,所有 Actor 退化为普通 Python 异步单例,通过 StandaloneProxy +包装后暴露与 Ray Actor Handle 相同的 `.method.remote(args)` 调用接口, +使上层代码在两种模式间无感切换。 +""" + +from __future__ import annotations + +import asyncio +import os +from typing import Any + +_STANDALONE = os.environ.get("KILOSTAR_MODE", "distributed") == "standalone" + + +class _MethodProxy: + """包装单个方法,使 .remote(*args, **kwargs) 返回一个可 await 的 Task。""" + + __slots__ = ("_method",) + + def __init__(self, method: Any): + self._method = method + + def remote(self, *args: Any, **kwargs: Any) -> asyncio.Task: + async def _invoke(): + result = self._method(*args, **kwargs) + if asyncio.iscoroutine(result): + return await result + return result + + return asyncio.ensure_future(_invoke()) + + +class StandaloneProxy: + """包装一个普通 Python 实例,模拟 Ray Actor Handle 的属性访问接口。 + + 用法:proxy.some_method.remote(x, y) → 等效于 await instance.some_method(x, y) + """ + + __slots__ = ("_instance",) + + def __init__(self, instance: Any): + object.__setattr__(self, "_instance", instance) + + def __getattr__(self, name: str) -> _MethodProxy: + attr = getattr(object.__getattribute__(self, "_instance"), name) + if callable(attr): + return _MethodProxy(attr) + return attr + + +# ─── 条件装饰器 ─── + + +def actor_class(cls): + """条件装饰器:分布式模式 → @ray.remote,单机模式 → 原样返回类。""" + if _STANDALONE: + return cls + import ray + return ray.remote(cls) + + +def remote_task(func): + """条件装饰器:分布式 → @ray.remote(func),单机 → .remote() 转为 asyncio task。 + + 单机模式下返回一个 stub 对象,其 .remote() 方法把函数以协程方式调度到 + 当前事件循环(workflow task 需要用 await 版本的 _entry,由调用方处理)。 + """ + if _STANDALONE: + + class _TaskProxy: + @staticmethod + def remote(*args, **kwargs): + async def _run(): + result = func(*args, **kwargs) + if asyncio.iscoroutine(result): + return await result + return result + + return asyncio.ensure_future(_run()) + + return _TaskProxy() + + import ray + return ray.remote(func) diff --git a/kilostar/worker_cluster/worker_cluster.py b/kilostar/worker_cluster/worker_cluster.py index 8d5d23e..62db4fc 100644 --- a/kilostar/worker_cluster/worker_cluster.py +++ b/kilostar/worker_cluster/worker_cluster.py @@ -12,12 +12,18 @@ # See the License for the specific language governing permissions and # limitations under the License. -import ray +import os import time import asyncio from collections import OrderedDict -from ray.util.queue import Queue +from kilostar.utils.standalone_proxy import actor_class from kilostar.utils.ray_hook import ray_actor_hook + +_STANDALONE = os.environ.get("KILOSTAR_MODE", "distributed") == "standalone" +if _STANDALONE: + from asyncio import Queue +else: + from ray.util.queue import Queue from kilostar.worker_individual.base_individual import BaseIndividual from kilostar.worker_individual.skill_individual import SkillIndividual from kilostar.worker_individual.ordinary_individual import OrdinaryIndividual @@ -27,7 +33,7 @@ from kilostar.worker_individual.special_individual import SpecialIndividual from kilostar.utils.logger import get_logger -@ray.remote +@actor_class class WorkerCluster: """ 工作集群 Actor:管理和调度所有的 worker_individual diff --git a/main.py b/main.py index 0061f44..34f0019 100644 --- a/main.py +++ b/main.py @@ -31,8 +31,8 @@ except Exception as e: sys.exit(1) import asyncio -import ray -from ray import serve + +KILOSTAR_MODE = os.environ.get("KILOSTAR_MODE", "distributed") from kilostar.worker_cluster import WorkerCluster from kilostar.utils.banner import print_banner @@ -42,10 +42,53 @@ from kilostar.core.global_workflow_manager import GlobalWorkflowManager from kilostar.core.individual.regulatory_node import RegulatoryNode from kilostar.core.individual.consciousness_node import ConsciousnessNode from kilostar.core.individual.control_node import ControlNode -from kilostar.api import KiloStarGateway + +if KILOSTAR_MODE != "standalone": + import ray + from ray import serve + from kilostar.api import KiloStarGateway -async def start_system(): +async def start_standalone(): + """单机模式:纯 asyncio,不依赖 Ray。""" + import uvicorn + from kilostar.utils.ray_hook import register_standalone + from kilostar.api import app + + postgres_database = PostgresDatabase() + await postgres_database.init_db() + register_standalone("postgres_database", postgres_database) + + global_state_machine = GlobalStateMachine(postgres_database) + await global_state_machine.init_state_machine() + register_standalone("global_state_machine", global_state_machine) + + global_workflow_manager = GlobalWorkflowManager() + await global_workflow_manager.init_manager() + register_standalone("global_workflow_manager", global_workflow_manager) + + regulatory_node = RegulatoryNode() + register_standalone("regulatory_node", regulatory_node) + + consciousness_node = ConsciousnessNode() + register_standalone("consciousness_node", consciousness_node) + + control_node = ControlNode() + register_standalone("control_node", control_node) + + worker_cluster = WorkerCluster() + await worker_cluster.start() + register_standalone("worker_cluster", worker_cluster) + + print(f"✅ KiloStar 单机模式启动完成,监听 0.0.0.0:8000") + + config = uvicorn.Config(app, host="0.0.0.0", port=8000, log_level="info") + server = uvicorn.Server(config) + await server.serve() + + +async def start_distributed(): + """分布式模式:使用 Ray Actor + Ray Serve。""" env_vars = { "POSTGRES_USER": os.getenv("POSTGRES_USER", "postgres"), "POSTGRES_PASSWORD": os.getenv("POSTGRES_PASSWORD", ""), @@ -63,8 +106,9 @@ async def start_system(): runtime_env={"env_vars": env_vars}, ) - # 2. 启动数据库组件 - postgres_database = PostgresDatabase.options(name="postgres_database").remote() + postgres_database = PostgresDatabase.options( + name="postgres_database" + ).remote() await postgres_database.init_db.remote() global_state_machine = GlobalStateMachine.options( @@ -73,55 +117,52 @@ async def start_system(): print("正在等待 GlobalStateMachine 初始化并加载注册表...") try: - # 强制执行初始化方法并阻塞等待结果。 - # 如果 __init__ 或 init_state_machine 中有任何报错,会立刻在这里抛出! await global_state_machine.init_state_machine.remote() print("GlobalStateMachine 初始化成功!") except Exception as e: - print(f"\n[致命错误] GlobalStateMachine 启动失败!真实报错如下:\n{e}\n") + print(f"\n[致命错误] GlobalStateMachine 启动失败!\n{e}\n") return global_workflow_manager = GlobalWorkflowManager.options( name="global_workflow_manager", namespace="kilostar", lifetime="detached" ).remote() - # 4. 启动核心节点 - regulatory_node = RegulatoryNode.options(name="regulatory_node").remote() - consciousness_node = ConsciousnessNode.options(name="consciousness_node").remote() - control_node = ControlNode.options(name="control_node").remote() + RegulatoryNode.options(name="regulatory_node").remote() + ConsciousnessNode.options(name="consciousness_node").remote() + ControlNode.options(name="control_node").remote() try: WorkerCluster.options( - name="worker_cluster", - lifetime="detached", # 保证它在后台一直运行 + name="worker_cluster", lifetime="detached" ).remote() print("✅ WorkerCluster 已成功启动并注册!") except ValueError: print("WorkerCluster 已经存在。") - # 工作流以一次性 ray task 形式由 ConsciousnessNode 直接 fire,不再需要常驻 engine actor。 - print("正在等待 GlobalWorkflowManager 初始化与恢复工作流...") try: await global_workflow_manager.init_manager.remote() print("GlobalWorkflowManager 初始化成功!") except Exception as e: - print(f"\n[致命错误] GlobalWorkflowManager 启动失败!真实报错如下:\n{e}\n") + print(f"\n[致命错误] GlobalWorkflowManager 启动失败!\n{e}\n") return - # 6. 启动 FastAPI 网关 (使用 Ray Serve) serve.start(http_options={"host": "0.0.0.0", "port": 8000}) serve.run(KiloStarGateway.bind()) - # 挂起主线程以保持系统运行 while True: await asyncio.sleep(3600) def main(): print_banner() + mode = KILOSTAR_MODE + print(f"启动模式: {mode}") try: - asyncio.run(start_system()) + if mode == "standalone": + asyncio.run(start_standalone()) + else: + asyncio.run(start_distributed()) except KeyboardInterrupt: print("系统已退出。") diff --git a/tests/unit/test_standalone_proxy.py b/tests/unit/test_standalone_proxy.py new file mode 100644 index 0000000..7cc92da --- /dev/null +++ b/tests/unit/test_standalone_proxy.py @@ -0,0 +1,106 @@ +"""standalone_proxy 适配层单元测试。 + +验证 StandaloneProxy / _MethodProxy / actor_class / remote_task +在单机模式下的行为是否正确模拟了 Ray Actor Handle 的 .remote() 接口。 +""" + +import asyncio +import pytest + +from kilostar.utils import standalone_proxy +from kilostar.utils.standalone_proxy import StandaloneProxy, _MethodProxy + + +class TestMethodProxy: + def test_sync_method(self): + def add(a, b): + return a + b + + proxy = _MethodProxy(add) + result = asyncio.get_event_loop().run_until_complete(proxy.remote(2, 3)) + assert result == 5 + + def test_async_method(self): + async def async_add(a, b): + return a + b + + proxy = _MethodProxy(async_add) + result = asyncio.get_event_loop().run_until_complete(proxy.remote(4, 6)) + assert result == 10 + + +class TestStandaloneProxy: + def test_method_call(self): + class FakeActor: + def greet(self, name): + return f"hello {name}" + + proxy = StandaloneProxy(FakeActor()) + future = proxy.greet.remote("world") + result = asyncio.get_event_loop().run_until_complete(future) + assert result == "hello world" + + def test_async_method_call(self): + class FakeActor: + async def compute(self, x): + return x * 2 + + proxy = StandaloneProxy(FakeActor()) + future = proxy.compute.remote(7) + result = asyncio.get_event_loop().run_until_complete(future) + assert result == 14 + + def test_attribute_access(self): + class FakeActor: + def __init__(self): + self.name = "test" + + proxy = StandaloneProxy(FakeActor()) + assert proxy.name == "test" + + +class TestActorClass: + def test_standalone_returns_class_unchanged(self, monkeypatch): + monkeypatch.setattr(standalone_proxy, "_STANDALONE", True) + + @standalone_proxy.actor_class + class MyActor: + def do_work(self): + return 42 + + instance = MyActor() + assert instance.do_work() == 42 + + def test_standalone_class_is_plain_python(self, monkeypatch): + monkeypatch.setattr(standalone_proxy, "_STANDALONE", True) + + @standalone_proxy.actor_class + class MyActor: + pass + + assert not hasattr(MyActor, "remote") + assert not hasattr(MyActor, "options") + + +class TestRemoteTask: + def test_sync_task(self, monkeypatch): + monkeypatch.setattr(standalone_proxy, "_STANDALONE", True) + + @standalone_proxy.remote_task + def multiply(a, b): + return a * b + + future = multiply.remote(3, 4) + result = asyncio.get_event_loop().run_until_complete(future) + assert result == 12 + + def test_async_task(self, monkeypatch): + monkeypatch.setattr(standalone_proxy, "_STANDALONE", True) + + @standalone_proxy.remote_task + async def async_multiply(a, b): + return a * b + + future = async_multiply.remote(5, 6) + result = asyncio.get_event_loop().run_until_complete(future) + assert result == 30