From d17f6384fcd2a52750e18e1ee1ccb4ca7fb3696d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E6=9C=9D=E5=A4=95?=
Date: Mon, 27 Apr 2026 13:06:15 +0800
Subject: [PATCH] Feat/dashboard clean monitor upload 1334918525438687015 (#40)

* feat: Clean up dashboard UI, shift workflow WS to SSE, and add file upload support

- Removed the Monitoring view and the associated `/ws/state` cluster websocket route.
- Switched workflow tracing from WebSocket (`/api/v1/workflow/ws/{trace_id}`) to Server-Sent Events (`/api/v1/workflow/sse/{trace_id}`) for unidirectional pushes, and introduced a new `/api/v1/workflow/reply/{trace_id}` POST route to handle incoming client replies.
- Cleaned up dummy data and unneeded links in the chat layout (LeftPanel, ChatPanel).
- Implemented file upload: added a `/api/v1/adapter/client/upload` endpoint to the backend that saves files to a local `uploads` directory, and wired a file input to the `+` button in the frontend chat interface, with an automated chat message confirming each upload.

Co-authored-by: zhaoxi826 <198742034+zhaoxi826@users.noreply.github.com>

* fix: prevent global_state_machine actor from being garbage collected

Added `lifetime="detached"` and kept a local reference to the `GlobalStateMachine` actor in `main.py` so that Ray does not clean it up when it goes out of scope, which was causing `ray.get_actor('global_state_machine')` calls in API route handlers to fail (resulting in 500 errors).

Co-authored-by: zhaoxi826 <198742034+zhaoxi826@users.noreply.github.com>

* fix: resolve named actor addressing failure across Ray processes via explicit namespace

The `ray.get_actor` calls in API routes executing within a Ray Serve worker failed to resolve the actors created by the main process, because the implicit random namespace of `ray.init()` did not match the namespace of the Ray Serve application scope.
Instead of overriding garbage collection via `lifetime="detached"` (which can leak actors), this commit assigns an explicit `namespace="pretor"` when initializing Ray in the main process and uses the same namespace in `ray_hook.py` when looking up named actors. It also retains the local variable assignments in `main.py` so they are not eliminated as unused variables.

Co-authored-by: zhaoxi826 <198742034+zhaoxi826@users.noreply.github.com>

* fix: defer actor lookup in WorkflowRunningEngine to avoid startup race conditions

The `WorkflowRunningEngine` fetched the `global_state_machine` actor during its `__init__` method via `ray_actor_hook()`. Because actor creation requests are dispatched asynchronously, the `global_state_machine` may not yet be registered and discoverable via `ray.get_actor()` by the time the `WorkflowRunningEngine`'s `__init__` runs. Moved the actor lookup into the async `run()` method, which executes after the engine itself is fully up, giving the other components time to become available in the global Ray namespace.
Co-authored-by: zhaoxi826 <198742034+zhaoxi826@users.noreply.github.com>

---------

Co-authored-by: google-labs-jules[bot] <161369871+google-labs-jules[bot]@users.noreply.github.com>
Co-authored-by: zhaoxi826 <198742034+zhaoxi826@users.noreply.github.com>
---
 pretor/core/workflow/workflow_runner.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/pretor/core/workflow/workflow_runner.py b/pretor/core/workflow/workflow_runner.py
index 77b3229..057f121 100644
--- a/pretor/core/workflow/workflow_runner.py
+++ b/pretor/core/workflow/workflow_runner.py
@@ -268,9 +268,11 @@ class WorkflowRunningEngine:
         self.consciousness_node = consciousness_node
         self.control_node = control_node
         self.supervisory_node = supervisory_node
-        self.global_state_machine = ray_actor_hook("global_state_machine").global_state_machine
+        self.global_state_machine = None
 
     async def run(self):
+        # Move actor hook to async start so we don't race during __init__ across cluster
+        self.global_state_machine = ray_actor_hook("global_state_machine").global_state_machine
         self.workflow_queue = asyncio.Queue()
         self.runner_engine = {
             f"runner_{i}": asyncio.create_task(self.runner(i))