feat(standalone): 新增单机模式,KILOSTAR_MODE=standalone 时去掉 Ray 依赖

通过 StandaloneProxy 适配层让 .remote() 调用在单机模式下透明降级为
asyncio 协程调用,7 个 Actor 和 workflow task 均可在纯 asyncio 环境运行,
启动快、资源占用低。分布式模式行为完全不变。

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-06-03 15:52:41 +00:00
parent 76a67e8237
commit 457d12834f
14 changed files with 390 additions and 108 deletions
+8 -4
View File
@@ -19,7 +19,10 @@ from fastapi import FastAPI, WebSocket, Request
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse from fastapi.responses import FileResponse, JSONResponse
from fastapi.staticfiles import StaticFiles from fastapi.staticfiles import StaticFiles
from ray import serve
_STANDALONE = os.environ.get("KILOSTAR_MODE", "distributed") == "standalone"
if not _STANDALONE:
from ray import serve
from .agent import agent_router from .agent import agent_router
from .auth import auth_router from .auth import auth_router
@@ -176,9 +179,10 @@ else:
) )
@serve.deployment if not _STANDALONE:
@serve.ingress(app) @serve.deployment
class KiloStarGateway: @serve.ingress(app)
class KiloStarGateway:
gateway: Dict[str, WebSocket] gateway: Dict[str, WebSocket]
def __init__(self): def __init__(self):
@@ -12,8 +12,13 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import ray import os
from typing import Any, Dict, List, Optional, Tuple from typing import Any, Dict, List, Optional, Tuple
from kilostar.utils.standalone_proxy import actor_class
_STANDALONE = os.environ.get("KILOSTAR_MODE", "distributed") == "standalone"
if not _STANDALONE:
import ray
from kilostar.core.global_state_machine.individual_manager import ( from kilostar.core.global_state_machine.individual_manager import (
GlobalIndividualManager, GlobalIndividualManager,
@@ -25,7 +30,7 @@ from kilostar.core.global_state_machine.gsm_snapshot import GSMSnapshot
from kilostar.core.postgres_database import PostgresDatabase from kilostar.core.postgres_database import PostgresDatabase
@ray.remote @actor_class
class GlobalStateMachine: class GlobalStateMachine:
"""全局状态机 Actor,统一持有 Provider/Tool/Skill/Individual/MCP/CustomToolset 注册表。 """全局状态机 Actor,统一持有 Provider/Tool/Skill/Individual/MCP/CustomToolset 注册表。
@@ -44,10 +49,9 @@ class GlobalStateMachine:
self._tool_configs: Dict[str, Dict[str, Any]] = {} self._tool_configs: Dict[str, Dict[str, Any]] = {}
self._custom_toolsets: Dict[str, Dict[str, Any]] = {} self._custom_toolsets: Dict[str, Dict[str, Any]] = {}
# 配置快照与版本号:每次写入 → version+=1 → ray.put 新 snapshot # 配置快照与版本号:每次写入 → version+=1 → 发布新 snapshot
# 读端通过 current_config_ref 拿 ref 后用 ray.get 直读,绕开 actor 单线程瓶颈
self._config_version: int = 0 self._config_version: int = 0
self._current_ref: Optional[ray.ObjectRef] = None self._current_ref = None
self.postgres_database = postgres_database self.postgres_database = postgres_database
@@ -113,19 +117,19 @@ class GlobalStateMachine:
) )
def _publish_snapshot(self) -> None: def _publish_snapshot(self) -> None:
"""版本号 +1 并当前状态 put 到 Ray Object Store。 """版本号 +1 并发布当前状态快照。"""
旧 ref 会因为引用计数归零而进入回收队列;正在执行的 task 已经把 ref
拷贝到了自己的进程,dec 不会影响它们的读取。
"""
self._config_version += 1 self._config_version += 1
self._current_ref = ray.put(self._build_snapshot()) snapshot = self._build_snapshot()
if _STANDALONE:
self._current_ref = snapshot
else:
self._current_ref = ray.put(snapshot)
async def current_config_ref(self) -> Tuple[int, ray.ObjectRef]: async def current_config_ref(self) -> Tuple[int, Any]:
"""返回 ``(version, ObjectRef)``,调用方拿了 ref 后用 ``ray.get`` 自取 """返回 ``(version, ObjectRef 或 snapshot)``
**不要**直接返回 snapshot 对象 —— 那样会走 actor RPC 反序列化,丧失 分布式模式返回 ObjectRef,调用方用 ``ray.get`` 自取;
object store 的共享内存优势。返回 ref 才能让调用方在自己进程里 ray.get 单机模式直接返回 snapshot 对象
""" """
if self._current_ref is None: if self._current_ref is None:
self._publish_snapshot() self._publish_snapshot()
@@ -30,10 +30,13 @@ GSM 仍然是 source of truth + 写入串行化器,但读路径解耦:
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import os
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional, Tuple from typing import Any, Callable, Dict, List, Optional, Tuple
import ray _STANDALONE = os.environ.get("KILOSTAR_MODE", "distributed") == "standalone"
if not _STANDALONE:
import ray
from kilostar.core.global_state_machine.model_provider.base_provider import Provider from kilostar.core.global_state_machine.model_provider.base_provider import Provider
from kilostar.utils.logger import get_logger from kilostar.utils.logger import get_logger
@@ -113,14 +116,19 @@ async def fetch_snapshot(
): ):
return _local_cache["snapshot"] return _local_cache["snapshot"]
version, ref = await gsm_actor.current_config_ref.remote() version, ref_or_snapshot = await gsm_actor.current_config_ref.remote()
snapshot = ray.get(ref) if _STANDALONE:
snapshot = ref_or_snapshot
else:
snapshot = ray.get(ref_or_snapshot)
_local_cache["version"] = version _local_cache["version"] = version
_local_cache["snapshot"] = snapshot _local_cache["snapshot"] = snapshot
return snapshot return snapshot
version, ref = await gsm_actor.current_config_ref.remote() version, ref_or_snapshot = await gsm_actor.current_config_ref.remote()
return ray.get(ref) if _STANDALONE:
return ref_or_snapshot
return ray.get(ref_or_snapshot)
def reset_local_cache() -> None: def reset_local_cache() -> None:
@@ -1,6 +1,6 @@
import ray
import asyncio import asyncio
from typing import Dict from typing import Dict
from kilostar.utils.standalone_proxy import actor_class
from kilostar.utils.ray_hook import ray_actor_hook from kilostar.utils.ray_hook import ray_actor_hook
from kilostar.utils.logger import get_logger from kilostar.utils.logger import get_logger
@@ -11,7 +11,7 @@ class TraceQueues:
self.receive: asyncio.Queue[str] = asyncio.Queue() self.receive: asyncio.Queue[str] = asyncio.Queue()
@ray.remote @actor_class
class GlobalWorkflowManager: class GlobalWorkflowManager:
def __init__(self): def __init__(self):
self._traces: Dict[str, TraceQueues] = {} self._traces: Dict[str, TraceQueues] = {}
@@ -13,8 +13,8 @@
# limitations under the License. # limitations under the License.
import ray
from typing import Union, overload from typing import Union, overload
from kilostar.utils.standalone_proxy import actor_class
from kilostar.core.individual.consciousness_node.template import ( from kilostar.core.individual.consciousness_node.template import (
ConsciousnessNodeDeps, ConsciousnessNodeDeps,
ForregulatoryNode, ForregulatoryNode,
@@ -32,7 +32,7 @@ from kilostar.utils.ray_hook import ray_actor_hook
from kilostar.utils.i18n import agent_prompt from kilostar.utils.i18n import agent_prompt
@ray.remote @actor_class
class ConsciousnessNode: class ConsciousnessNode:
def __init__(self) -> None: def __init__(self) -> None:
from kilostar.utils.logger import get_logger from kilostar.utils.logger import get_logger
@@ -12,8 +12,8 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import ray
from pydantic_ai import Agent, RunContext from pydantic_ai import Agent, RunContext
from kilostar.utils.standalone_proxy import actor_class
from kilostar.core.global_state_machine.global_state_machine import GlobalStateMachine from kilostar.core.global_state_machine.global_state_machine import GlobalStateMachine
from kilostar.core.global_state_machine.model_provider.base_provider import Provider from kilostar.core.global_state_machine.model_provider.base_provider import Provider
from kilostar.adapter.model_adapter.agent_factory import AgentFactory from kilostar.adapter.model_adapter.agent_factory import AgentFactory
@@ -25,7 +25,7 @@ from kilostar.core.individual.control_node.template import (
from kilostar.utils.i18n import agent_prompt from kilostar.utils.i18n import agent_prompt
@ray.remote @actor_class
class ControlNode: class ControlNode:
"""ControlNode(控制节点):工作流中具体子任务的执行 Actor。 """ControlNode(控制节点):工作流中具体子任务的执行 Actor。
@@ -13,8 +13,8 @@
# limitations under the License. # limitations under the License.
import datetime import datetime
import ray
from typing import Union from typing import Union
from kilostar.utils.standalone_proxy import actor_class
from kilostar.adapter.model_adapter.agent_factory import AgentFactory from kilostar.adapter.model_adapter.agent_factory import AgentFactory
from kilostar.core.global_state_machine.global_state_machine import GlobalStateMachine from kilostar.core.global_state_machine.global_state_machine import GlobalStateMachine
from kilostar.core.global_state_machine.model_provider import Provider from kilostar.core.global_state_machine.model_provider import Provider
@@ -27,7 +27,7 @@ from pydantic_ai import RunContext, Agent
from kilostar.utils.i18n import agent_prompt from kilostar.utils.i18n import agent_prompt
@ray.remote @actor_class
class RegulatoryNode: class RegulatoryNode:
"""RegulatoryNode(监管节点):用户请求的入口路由 Actor。 """RegulatoryNode(监管节点):用户请求的入口路由 Actor。
+2 -2
View File
@@ -15,7 +15,7 @@
import os import os
import asyncio import asyncio
import ray from kilostar.utils.standalone_proxy import actor_class
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import sessionmaker
from kilostar.core.postgres_database.model.base import BaseDataModel from kilostar.core.postgres_database.model.base import BaseDataModel
@@ -55,7 +55,7 @@ from .module.custom_toolset import CustomToolsetDatabase
from .module.system_event_log import SystemEventLogDatabase from .module.system_event_log import SystemEventLogDatabase
@ray.remote @actor_class
class PostgresDatabase: class PostgresDatabase:
"""以 Ray Actor 形式暴露的统一数据库门面。 """以 Ray Actor 形式暴露的统一数据库门面。
@@ -36,7 +36,7 @@ import datetime
from dataclasses import dataclass from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict, List, Optional from typing import Any, Awaitable, Callable, Dict, List, Optional
import ray from kilostar.utils.standalone_proxy import remote_task, _STANDALONE
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from pydantic_graph import BaseNode, End, Graph, GraphRunContext from pydantic_graph import BaseNode, End, Graph, GraphRunContext
from pydantic_graph.persistence import BaseStatePersistence from pydantic_graph.persistence import BaseStatePersistence
@@ -519,7 +519,7 @@ async def resume_workflow_graph(
return final_output return final_output
@ray.remote @remote_task
def run_workflow_task( def run_workflow_task(
workflow_data: dict, trace_id: str, resume_only: bool = False workflow_data: dict, trace_id: str, resume_only: bool = False
): ):
@@ -575,4 +575,7 @@ def run_workflow_task(
workflow_data, trace_id, persistence=persistence workflow_data, trace_id, persistence=persistence
) )
if _STANDALONE:
return _entry()
else:
asyncio.run(_entry()) asyncio.run(_entry())
+57 -33
View File
@@ -11,9 +11,15 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import os
import time import time
import ray
from functools import lru_cache from functools import lru_cache
from typing import Any, Dict
_STANDALONE = os.environ.get("KILOSTAR_MODE", "distributed") == "standalone"
if not _STANDALONE:
import ray
class ActorList: class ActorList:
@@ -37,62 +43,80 @@ class ActorList:
raise AttributeError(f"ActorList对象没有属性 '{key}'") raise AttributeError(f"ActorList对象没有属性 '{key}'")
@lru_cache(maxsize=128) # ─── Standalone Registry ───
def _get_cached_actor_handle(actor_name: str):
_standalone_registry: Dict[str, Any] = {}
def register_standalone(name: str, instance: Any) -> None:
"""注册一个单机模式下的 Actor 单例(已包装为 StandaloneProxy)。"""
from kilostar.utils.standalone_proxy import StandaloneProxy
_standalone_registry[name] = StandaloneProxy(instance)
# ─── Distributed Mode Helpers ───
if not _STANDALONE:
@lru_cache(maxsize=128)
def _get_cached_actor_handle(actor_name: str):
"""缓存接口""" """缓存接口"""
return ray.get_actor(actor_name, namespace="kilostar") return ray.get_actor(actor_name, namespace="kilostar")
def clear_actor_cache():
def clear_actor_cache():
"""清理接口""" """清理接口"""
_get_cached_actor_handle.cache_clear() _get_cached_actor_handle.cache_clear()
def wait_for_actor(
def wait_for_actor(
actor_name: str, *, timeout: float = 10.0, interval: float = 0.5 actor_name: str, *, timeout: float = 10.0, interval: float = 0.5
): ):
"""阻塞等待某个 actor 就绪,返回其句柄。 """阻塞等待某个 actor 就绪,返回其句柄。"""
用于"启动期 / ray task 入口刚拉起"这类场景——被依赖的 actor 可能还没注册。
在 ``timeout`` 内按 ``interval`` 轮询 ``ray.get_actor``;拿到就立即返回,
超时则抛带清晰上下文的 ``TimeoutError``(而不是裸 ``ValueError``)。
Args:
actor_name: actor 注册名
timeout: 最长等待秒数;``<=0`` 表示只试一次(等价于直接取句柄)
interval: 轮询间隔秒数
Raises:
TimeoutError: 超时仍未就绪。原始异常通过 ``raise ... from`` 链保留。
"""
deadline = time.monotonic() + max(timeout, 0.0) deadline = time.monotonic() + max(timeout, 0.0)
last_err: Exception | None = None last_err: Exception | None = None
while True: while True:
try: try:
return _get_cached_actor_handle(actor_name) return _get_cached_actor_handle(actor_name)
except Exception as e: # ray.get_actor 失败一般是 ValueError except Exception as e:
last_err = e last_err = e
# 失败不能让 lru_cache 留下脏数据(异常本身不会被缓存,
# 但若底层换实现,这里清一次更稳妥)
if time.monotonic() >= deadline: if time.monotonic() >= deadline:
raise TimeoutError( raise TimeoutError(
f"等待 actor {actor_name!r} 就绪超时({timeout}s):{last_err}" f"等待 actor {actor_name!r} 就绪超时({timeout}s):{last_err}"
) from last_err ) from last_err
time.sleep(interval) time.sleep(interval)
else:
def _get_cached_actor_handle(actor_name: str):
raise RuntimeError("单机模式下不应调用 _get_cached_actor_handle")
def clear_actor_cache():
pass
def wait_for_actor(actor_name: str, **kwargs):
raise RuntimeError("单机模式下不应调用 wait_for_actor")
# ─── 统一入口 ───
def ray_actor_hook(*actor_names: str, timeout: float = 0.0, interval: float = 0.5): def ray_actor_hook(*actor_names: str, timeout: float = 0.0, interval: float = 0.5):
"""按名字批量取出 Ray Actor 句柄,组装成一个 ``ActorList`` 返回。 """按名字批量取出 Actor 句柄,组装成一个 ActorList 返回。
例:``actors = ray_actor_hook("postgres_database", "global_state_machine")`` 单机模式从 _standalone_registry 取,分布式模式走 ray.get_actor。
随后即可用 ``actors.postgres_database`` 拿到对应句柄。
Args:
timeout: ``>0`` 时对每个 actor 走 ``wait_for_actor`` 等待就绪(启动期用);
缺省 ``0`` 保持原"快速失败"语义——actor 不在立即抛异常。
interval: 等待轮询间隔,仅在 ``timeout>0`` 时生效。
""" """
actor_list = ActorList() actor_list = ActorList()
if _STANDALONE:
for name in actor_names:
if name not in _standalone_registry:
raise ValueError(
f"Standalone registry: actor {name!r} not registered"
)
setattr(actor_list, name, _standalone_registry[name])
return actor_list
for actor_name in actor_names: for actor_name in actor_names:
if timeout > 0: if timeout > 0:
handle = wait_for_actor( handle = wait_for_actor(
+86
View File
@@ -0,0 +1,86 @@
"""KiloStar 单机模式适配层:用 asyncio 协程模拟 Ray Actor 接口。
单机模式下,所有 Actor 退化为普通 Python 异步单例,通过 StandaloneProxy
包装后暴露与 Ray Actor Handle 相同的 `.method.remote(args)` 调用接口,
使上层代码在两种模式间无感切换。
"""
from __future__ import annotations
import asyncio
import os
from typing import Any
_STANDALONE = os.environ.get("KILOSTAR_MODE", "distributed") == "standalone"
class _MethodProxy:
"""包装单个方法,使 .remote(*args, **kwargs) 返回一个可 await 的 Task。"""
__slots__ = ("_method",)
def __init__(self, method: Any):
self._method = method
def remote(self, *args: Any, **kwargs: Any) -> asyncio.Task:
async def _invoke():
result = self._method(*args, **kwargs)
if asyncio.iscoroutine(result):
return await result
return result
return asyncio.ensure_future(_invoke())
class StandaloneProxy:
"""包装一个普通 Python 实例,模拟 Ray Actor Handle 的属性访问接口。
用法:proxy.some_method.remote(x, y) → 等效于 await instance.some_method(x, y)
"""
__slots__ = ("_instance",)
def __init__(self, instance: Any):
object.__setattr__(self, "_instance", instance)
def __getattr__(self, name: str) -> _MethodProxy:
attr = getattr(object.__getattribute__(self, "_instance"), name)
if callable(attr):
return _MethodProxy(attr)
return attr
# ─── 条件装饰器 ───
def actor_class(cls):
"""条件装饰器:分布式模式 → @ray.remote,单机模式 → 原样返回类。"""
if _STANDALONE:
return cls
import ray
return ray.remote(cls)
def remote_task(func):
"""条件装饰器:分布式 → @ray.remote(func),单机 → .remote() 转为 asyncio task。
单机模式下返回一个 stub 对象,其 .remote() 方法把函数以协程方式调度到
当前事件循环(workflow task 需要用 await 版本的 _entry,由调用方处理)。
"""
if _STANDALONE:
class _TaskProxy:
@staticmethod
def remote(*args, **kwargs):
async def _run():
result = func(*args, **kwargs)
if asyncio.iscoroutine(result):
return await result
return result
return asyncio.ensure_future(_run())
return _TaskProxy()
import ray
return ray.remote(func)
+9 -3
View File
@@ -12,12 +12,18 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import ray import os
import time import time
import asyncio import asyncio
from collections import OrderedDict from collections import OrderedDict
from ray.util.queue import Queue from kilostar.utils.standalone_proxy import actor_class
from kilostar.utils.ray_hook import ray_actor_hook from kilostar.utils.ray_hook import ray_actor_hook
_STANDALONE = os.environ.get("KILOSTAR_MODE", "distributed") == "standalone"
if _STANDALONE:
from asyncio import Queue
else:
from ray.util.queue import Queue
from kilostar.worker_individual.base_individual import BaseIndividual from kilostar.worker_individual.base_individual import BaseIndividual
from kilostar.worker_individual.skill_individual import SkillIndividual from kilostar.worker_individual.skill_individual import SkillIndividual
from kilostar.worker_individual.ordinary_individual import OrdinaryIndividual from kilostar.worker_individual.ordinary_individual import OrdinaryIndividual
@@ -27,7 +33,7 @@ from kilostar.worker_individual.special_individual import SpecialIndividual
from kilostar.utils.logger import get_logger from kilostar.utils.logger import get_logger
@ray.remote @actor_class
class WorkerCluster: class WorkerCluster:
""" """
工作集群 Actor:管理和调度所有的 worker_individual 工作集群 Actor:管理和调度所有的 worker_individual
+62 -21
View File
@@ -31,8 +31,8 @@ except Exception as e:
sys.exit(1) sys.exit(1)
import asyncio import asyncio
import ray
from ray import serve KILOSTAR_MODE = os.environ.get("KILOSTAR_MODE", "distributed")
from kilostar.worker_cluster import WorkerCluster from kilostar.worker_cluster import WorkerCluster
from kilostar.utils.banner import print_banner from kilostar.utils.banner import print_banner
@@ -42,10 +42,53 @@ from kilostar.core.global_workflow_manager import GlobalWorkflowManager
from kilostar.core.individual.regulatory_node import RegulatoryNode from kilostar.core.individual.regulatory_node import RegulatoryNode
from kilostar.core.individual.consciousness_node import ConsciousnessNode from kilostar.core.individual.consciousness_node import ConsciousnessNode
from kilostar.core.individual.control_node import ControlNode from kilostar.core.individual.control_node import ControlNode
from kilostar.api import KiloStarGateway
if KILOSTAR_MODE != "standalone":
import ray
from ray import serve
from kilostar.api import KiloStarGateway
async def start_system(): async def start_standalone():
"""单机模式:纯 asyncio,不依赖 Ray。"""
import uvicorn
from kilostar.utils.ray_hook import register_standalone
from kilostar.api import app
postgres_database = PostgresDatabase()
await postgres_database.init_db()
register_standalone("postgres_database", postgres_database)
global_state_machine = GlobalStateMachine(postgres_database)
await global_state_machine.init_state_machine()
register_standalone("global_state_machine", global_state_machine)
global_workflow_manager = GlobalWorkflowManager()
await global_workflow_manager.init_manager()
register_standalone("global_workflow_manager", global_workflow_manager)
regulatory_node = RegulatoryNode()
register_standalone("regulatory_node", regulatory_node)
consciousness_node = ConsciousnessNode()
register_standalone("consciousness_node", consciousness_node)
control_node = ControlNode()
register_standalone("control_node", control_node)
worker_cluster = WorkerCluster()
await worker_cluster.start()
register_standalone("worker_cluster", worker_cluster)
print(f"✅ KiloStar 单机模式启动完成,监听 0.0.0.0:8000")
config = uvicorn.Config(app, host="0.0.0.0", port=8000, log_level="info")
server = uvicorn.Server(config)
await server.serve()
async def start_distributed():
"""分布式模式:使用 Ray Actor + Ray Serve。"""
env_vars = { env_vars = {
"POSTGRES_USER": os.getenv("POSTGRES_USER", "postgres"), "POSTGRES_USER": os.getenv("POSTGRES_USER", "postgres"),
"POSTGRES_PASSWORD": os.getenv("POSTGRES_PASSWORD", ""), "POSTGRES_PASSWORD": os.getenv("POSTGRES_PASSWORD", ""),
@@ -63,8 +106,9 @@ async def start_system():
runtime_env={"env_vars": env_vars}, runtime_env={"env_vars": env_vars},
) )
# 2. 启动数据库组件 postgres_database = PostgresDatabase.options(
postgres_database = PostgresDatabase.options(name="postgres_database").remote() name="postgres_database"
).remote()
await postgres_database.init_db.remote() await postgres_database.init_db.remote()
global_state_machine = GlobalStateMachine.options( global_state_machine = GlobalStateMachine.options(
@@ -73,55 +117,52 @@ async def start_system():
print("正在等待 GlobalStateMachine 初始化并加载注册表...") print("正在等待 GlobalStateMachine 初始化并加载注册表...")
try: try:
# 强制执行初始化方法并阻塞等待结果。
# 如果 __init__ 或 init_state_machine 中有任何报错,会立刻在这里抛出!
await global_state_machine.init_state_machine.remote() await global_state_machine.init_state_machine.remote()
print("GlobalStateMachine 初始化成功!") print("GlobalStateMachine 初始化成功!")
except Exception as e: except Exception as e:
print(f"\n[致命错误] GlobalStateMachine 启动失败!真实报错如下:\n{e}\n") print(f"\n[致命错误] GlobalStateMachine 启动失败!\n{e}\n")
return return
global_workflow_manager = GlobalWorkflowManager.options( global_workflow_manager = GlobalWorkflowManager.options(
name="global_workflow_manager", namespace="kilostar", lifetime="detached" name="global_workflow_manager", namespace="kilostar", lifetime="detached"
).remote() ).remote()
# 4. 启动核心节点 RegulatoryNode.options(name="regulatory_node").remote()
regulatory_node = RegulatoryNode.options(name="regulatory_node").remote() ConsciousnessNode.options(name="consciousness_node").remote()
consciousness_node = ConsciousnessNode.options(name="consciousness_node").remote() ControlNode.options(name="control_node").remote()
control_node = ControlNode.options(name="control_node").remote()
try: try:
WorkerCluster.options( WorkerCluster.options(
name="worker_cluster", name="worker_cluster", lifetime="detached"
lifetime="detached", # 保证它在后台一直运行
).remote() ).remote()
print("✅ WorkerCluster 已成功启动并注册!") print("✅ WorkerCluster 已成功启动并注册!")
except ValueError: except ValueError:
print("WorkerCluster 已经存在。") print("WorkerCluster 已经存在。")
# 工作流以一次性 ray task 形式由 ConsciousnessNode 直接 fire,不再需要常驻 engine actor。
print("正在等待 GlobalWorkflowManager 初始化与恢复工作流...") print("正在等待 GlobalWorkflowManager 初始化与恢复工作流...")
try: try:
await global_workflow_manager.init_manager.remote() await global_workflow_manager.init_manager.remote()
print("GlobalWorkflowManager 初始化成功!") print("GlobalWorkflowManager 初始化成功!")
except Exception as e: except Exception as e:
print(f"\n[致命错误] GlobalWorkflowManager 启动失败!真实报错如下:\n{e}\n") print(f"\n[致命错误] GlobalWorkflowManager 启动失败!\n{e}\n")
return return
# 6. 启动 FastAPI 网关 (使用 Ray Serve)
serve.start(http_options={"host": "0.0.0.0", "port": 8000}) serve.start(http_options={"host": "0.0.0.0", "port": 8000})
serve.run(KiloStarGateway.bind()) serve.run(KiloStarGateway.bind())
# 挂起主线程以保持系统运行
while True: while True:
await asyncio.sleep(3600) await asyncio.sleep(3600)
def main(): def main():
print_banner() print_banner()
mode = KILOSTAR_MODE
print(f"启动模式: {mode}")
try: try:
asyncio.run(start_system()) if mode == "standalone":
asyncio.run(start_standalone())
else:
asyncio.run(start_distributed())
except KeyboardInterrupt: except KeyboardInterrupt:
print("系统已退出。") print("系统已退出。")
+106
View File
@@ -0,0 +1,106 @@
"""standalone_proxy 适配层单元测试。
验证 StandaloneProxy / _MethodProxy / actor_class / remote_task
在单机模式下的行为是否正确模拟了 Ray Actor Handle 的 .remote() 接口。
"""
import asyncio
import pytest
from kilostar.utils import standalone_proxy
from kilostar.utils.standalone_proxy import StandaloneProxy, _MethodProxy
class TestMethodProxy:
def test_sync_method(self):
def add(a, b):
return a + b
proxy = _MethodProxy(add)
result = asyncio.get_event_loop().run_until_complete(proxy.remote(2, 3))
assert result == 5
def test_async_method(self):
async def async_add(a, b):
return a + b
proxy = _MethodProxy(async_add)
result = asyncio.get_event_loop().run_until_complete(proxy.remote(4, 6))
assert result == 10
class TestStandaloneProxy:
def test_method_call(self):
class FakeActor:
def greet(self, name):
return f"hello {name}"
proxy = StandaloneProxy(FakeActor())
future = proxy.greet.remote("world")
result = asyncio.get_event_loop().run_until_complete(future)
assert result == "hello world"
def test_async_method_call(self):
class FakeActor:
async def compute(self, x):
return x * 2
proxy = StandaloneProxy(FakeActor())
future = proxy.compute.remote(7)
result = asyncio.get_event_loop().run_until_complete(future)
assert result == 14
def test_attribute_access(self):
class FakeActor:
def __init__(self):
self.name = "test"
proxy = StandaloneProxy(FakeActor())
assert proxy.name == "test"
class TestActorClass:
def test_standalone_returns_class_unchanged(self, monkeypatch):
monkeypatch.setattr(standalone_proxy, "_STANDALONE", True)
@standalone_proxy.actor_class
class MyActor:
def do_work(self):
return 42
instance = MyActor()
assert instance.do_work() == 42
def test_standalone_class_is_plain_python(self, monkeypatch):
monkeypatch.setattr(standalone_proxy, "_STANDALONE", True)
@standalone_proxy.actor_class
class MyActor:
pass
assert not hasattr(MyActor, "remote")
assert not hasattr(MyActor, "options")
class TestRemoteTask:
def test_sync_task(self, monkeypatch):
monkeypatch.setattr(standalone_proxy, "_STANDALONE", True)
@standalone_proxy.remote_task
def multiply(a, b):
return a * b
future = multiply.remote(3, 4)
result = asyncio.get_event_loop().run_until_complete(future)
assert result == 12
def test_async_task(self, monkeypatch):
monkeypatch.setattr(standalone_proxy, "_STANDALONE", True)
@standalone_proxy.remote_task
async def async_multiply(a, b):
return a * b
future = async_multiply.remote(5, 6)
result = asyncio.get_event_loop().run_until_complete(future)
assert result == 30