"""Entry point for the Pretor system: starts Ray, the core actors, and the Ray Serve gateway."""
import asyncio
|
|
import ray
|
|
from pretor.worker_individual.worker_cluster import WorkerCluster
|
|
from pretor.utils.banner import print_banner
|
|
from pretor.core.database.postgres import PostgresDatabase
|
|
from pretor.core.global_state_machine.global_state_machine import GlobalStateMachine
|
|
from pretor.core.individual.supervisory_node.supervisory_node import SupervisoryNode
|
|
from pretor.core.individual.consciousness_node.consciousness_node import ConsciousnessNode
|
|
from pretor.core.individual.control_node.control_node import ControlNode
|
|
from pretor.core.workflow.workflow_runner import WorkflowRunningEngine
|
|
from pretor.core.api import PretorGateway
|
|
from ray import serve
|
|
import os
|
|
|
|
|
|
|
|
async def start_system() -> None:
    """Boot every Pretor component on Ray, then idle forever.

    Startup order: Ray itself, the Postgres actor, the global state machine,
    the three core nodes, the worker cluster, the workflow engine, and finally
    the Ray Serve gateway.

    Returns:
        None. The coroutine returns early only when ``init_state_machine``
        raises; the error is printed and startup is abandoned. Otherwise it
        sleeps in an endless loop and never returns on its own.
    """
    # Settings read from the process environment and handed to ray.init as
    # the runtime_env "env_vars"; each falls back to the value listed here.
    # NOTE(review): the "" password and "secret" key fallbacks look unsafe
    # outside local development - confirm they are always overridden.
    fallbacks = {
        "POSTGRES_USER": "postgres",
        "POSTGRES_PASSWORD": "",
        "POSTGRES_HOST": "db",
        "POSTGRES_PORT": "5432",
        "POSTGRES_DB": "postgres",
        "SECRET_KEY": "secret",
    }
    env_vars = {key: os.getenv(key, fallback) for key, fallback in fallbacks.items()}

    # NOTE(review): the dashboard listens on all interfaces (0.0.0.0) -
    # confirm that exposure is intended.
    ray.init(
        ignore_reinit_error=True,
        namespace="pretor",
        dashboard_host="0.0.0.0",
        dashboard_port=8265,
        runtime_env={"env_vars": env_vars},
    )

    # Start the database component and wait for its init_db call to finish.
    db_actor = PostgresDatabase.options(name="postgres_database").remote()
    await db_actor.init_db.remote()

    # The state machine is a named, detached actor that receives the database
    # actor handle.
    # NOTE(review): the WorkerCluster creation below catches ValueError for an
    # already-existing actor; no such handling exists here - confirm whether
    # restarts against a live cluster are expected.
    state_machine_options = {
        "name": "global_state_machine",
        "namespace": "pretor",
        "lifetime": "detached",
    }
    state_machine = GlobalStateMachine.options(**state_machine_options).remote(db_actor)

    print("正在等待 GlobalStateMachine 初始化并加载注册表...")
    try:
        # Run the init method and wait for its result; any error raised in
        # __init__ or init_state_machine surfaces right here.
        await state_machine.init_state_machine.remote()
        print("GlobalStateMachine 初始化成功!")
    except Exception as e:
        # NOTE(review): returning here ends the coroutine normally, so the
        # caller cannot distinguish this failure from a clean finish.
        print(f"\n[致命错误] GlobalStateMachine 启动失败!真实报错如下:\n{e}\n")
        return

    # Start the core nodes.
    supervisor = SupervisoryNode.options(name="supervisory_node").remote()
    consciousness = ConsciousnessNode.options(name="consciousness_node").remote()
    controller = ControlNode.options(name="control_node").remote()

    try:
        # The handle is not used afterwards; the actor is created as detached
        # so that it keeps running in the background.
        _worker_cluster = WorkerCluster.options(
            name="worker_cluster",
            lifetime="detached",
        ).remote()
        print("✅ WorkerCluster 已成功启动并注册!")
    except ValueError:
        print("WorkerCluster 已经存在。")

    # Start the workflow running engine, wired to the three core nodes.
    engine = WorkflowRunningEngine.options(name="workflow_running_engine").remote(
        consciousness_node=consciousness,
        control_node=controller,
        supervisory_node=supervisor,
    )
    # Kick off the runner coroutines asynchronously; the result is not awaited.
    engine.run.remote()

    # Start the FastAPI gateway through Ray Serve.
    serve.start(http_options={"host": "0.0.0.0", "port": 8000})
    gateway_app = PretorGateway.bind()
    serve.run(gateway_app)

    # Park this coroutine so the system keeps running.
    idle_seconds = 3600
    while True:
        await asyncio.sleep(idle_seconds)
|
|
|
|
|
|
def main() -> None:
    """Print the banner and run ``start_system`` until interrupted.

    A ``KeyboardInterrupt`` (Ctrl+C) is caught and reported with a short
    exit message instead of a traceback.
    """
    print_banner()
    startup = start_system()
    try:
        asyncio.run(startup)
    except KeyboardInterrupt:
        print("系统已退出。")
|
|
|
|
|
|
# Run the system only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()