"""Pretor system entry point.

Initialises Ray, starts the database and global-state-machine actors, the
core individual nodes, the worker cluster, the workflow engine and the Ray
Serve HTTP gateway, then keeps the driver process alive.
"""

import asyncio
import contextlib
import os

import ray
from ray import serve

from pretor.worker_individual.worker_cluster import WorkerCluster
from pretor.utils.banner import print_banner
from pretor.core.database.postgres import PostgresDatabase
from pretor.core.global_state_machine.global_state_machine import GlobalStateMachine
from pretor.core.individual.supervisory_node.supervisory_node import SupervisoryNode
from pretor.core.individual.consciousness_node.consciousness_node import ConsciousnessNode
from pretor.core.individual.control_node.control_node import ControlNode
from pretor.core.workflow.workflow_runner import WorkflowRunningEngine
from pretor.core.api import PretorGateway

# Placeholder used when SECRET_KEY is not provided by the environment.
_DEFAULT_SECRET_KEY = "secret"


async def start_system():
    """Boot every Pretor component, then block forever.

    Returns ``None`` early only when the GlobalStateMachine fails to
    initialise. On success it never returns; the driver is kept alive by the
    sleep loop at the end.
    """
    # Configuration forwarded to every Ray worker through the runtime env.
    env_vars = {
        "POSTGRES_USER": os.getenv("POSTGRES_USER", "postgres"),
        "POSTGRES_PASSWORD": os.getenv("POSTGRES_PASSWORD", ""),
        "POSTGRES_HOST": os.getenv("POSTGRES_HOST", "db"),
        "POSTGRES_PORT": os.getenv("POSTGRES_PORT", "5432"),
        "POSTGRES_DB": os.getenv("POSTGRES_DB", "postgres"),
        "SECRET_KEY": os.getenv("SECRET_KEY", _DEFAULT_SECRET_KEY),
    }
    if env_vars["SECRET_KEY"] == _DEFAULT_SECRET_KEY:
        # The fallback is a publicly known placeholder. Keep it so local setups
        # still start, but make the insecure configuration visible.
        print("[WARNING] SECRET_KEY is not set (or uses the default placeholder); "
              "this is insecure outside local development.")

    # 1. Start / connect to Ray. The dashboard listens on all interfaces.
    ray.init(
        ignore_reinit_error=True,
        namespace="pretor",
        dashboard_host="0.0.0.0",
        dashboard_port=8265,
        runtime_env={"env_vars": env_vars},
    )

    # 2. Database actor; init_db is awaited before any other component uses it.
    postgres_database = PostgresDatabase.options(name='postgres_database').remote()
    await postgres_database.init_db.remote()

    # 3. Global state machine. It is a *detached* actor: it outlives this
    #    driver and is only removed by an explicit ray.kill().
    # NOTE(review): on a normal restart a surviving detached instance from the
    # previous run will still hold this name -- confirm the intended lifecycle.
    global_state_machine = GlobalStateMachine.options(
        name='global_state_machine', namespace='pretor', lifetime='detached'
    ).remote(postgres_database)

    print("正在等待 GlobalStateMachine 初始化并加载注册表...")
    try:
        # Block until initialisation finishes. Any exception raised in the
        # actor's __init__ or in init_state_machine surfaces right here.
        await global_state_machine.init_state_machine.remote()
        print("GlobalStateMachine 初始化成功!")
    except Exception as e:
        print(f"\n[致命错误] GlobalStateMachine 启动失败!真实报错如下:\n{e}\n")
        # Detached actors are not garbage-collected with the driver, so a
        # failed instance would keep the name 'global_state_machine' and break
        # the next start-up. Cleanup is best-effort: the actor may already be
        # dead if its constructor raised.
        with contextlib.suppress(Exception):
            ray.kill(global_state_machine)
        return

    # 4. Core individual nodes.
    supervisory_node = SupervisoryNode.options(name='supervisory_node').remote()
    consciousness_node = ConsciousnessNode.options(name='consciousness_node').remote()
    control_node = ControlNode.options(name='control_node').remote()

    try:
        # Detached so the worker cluster keeps running in the background; the
        # handle is not needed here, so it is not kept.
        WorkerCluster.options(
            name="worker_cluster",
            lifetime="detached",
        ).remote()
        print("✅ WorkerCluster 已成功启动并注册!")
    except ValueError:
        # Raised by Ray when a named actor called "worker_cluster" already exists.
        print("WorkerCluster 已经存在。")

    # 5. Workflow running engine, wired to the three core nodes.
    workflow_engine = WorkflowRunningEngine.options(name='workflow_running_engine').remote(
        consciousness_node=consciousness_node,
        control_node=control_node,
        supervisory_node=supervisory_node,
    )
    # Launch the runner coroutines without waiting for the call to finish.
    workflow_engine.run.remote()

    # 6. FastAPI gateway served through Ray Serve.
    serve.start(http_options={"host": "0.0.0.0", "port": 8000})
    serve.run(PretorGateway.bind())

    # Keep the driver alive: the non-detached actors created above are owned
    # by this process and would be torn down if it exited.
    while True:
        await asyncio.sleep(3600)


def main():
    """Print the banner and run the system until interrupted with Ctrl+C."""
    print_banner()
    try:
        asyncio.run(start_system())
    except KeyboardInterrupt:
        print("系统已退出。")


if __name__ == '__main__':
    main()