# Copyright 2026 zhaoxi826
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""HTTP routes under ``/api/v1/workflow``.

Each handler forwards to the ``global_state_machine`` Ray actor obtained
through ``ray_actor_hook``.
"""

import asyncio

from fastapi import APIRouter, HTTPException, Request
from fastapi.responses import StreamingResponse

from pretor.utils.ray_hook import ray_actor_hook

workflow_router = APIRouter(prefix="/api/v1/workflow", tags=["workflow"])


def _format_sse_event(message: object) -> str:
    """Render *message* as one server-sent event.

    The message is converted with ``str()`` and every line of the result is
    emitted as its own ``data:`` field, so a payload containing line breaks
    cannot terminate the event early. A single-line message yields exactly
    ``"data: <message>\\n\\n"``.
    """
    text = str(message)
    # Only CRLF, CR and LF are line terminators in the event-stream format;
    # other separators that str.splitlines() honours must stay in the payload.
    payload_lines = text.replace("\r\n", "\n").replace("\r", "\n").split("\n")
    return "".join(f"data: {line}\n" for line in payload_lines) + "\n"


@workflow_router.get("/list")
async def get_workflow_list():
    """Return whatever the global state machine's ``list_events`` yields."""
    global_state_machine = ray_actor_hook("global_state_machine").global_state_machine
    events = await global_state_machine.list_events.remote()
    return events


@workflow_router.get("/sse/{trace_id}")
async def get_workflow_sse(trace_id: str, request: Request):
    """Stream pending messages for *trace_id* as server-sent events.

    Args:
        trace_id: Identifier passed to the state machine's ``get_pending``.
        request: Incoming request, polled to detect client disconnects.

    Returns:
        A ``StreamingResponse`` with media type ``text/event-stream``.
    """
    global_state_machine = ray_actor_hook("global_state_machine").global_state_machine

    async def event_generator():
        try:
            while True:
                # The disconnect check only runs between messages; the await
                # on get_pending below is not interrupted by it.
                if await request.is_disconnected():
                    break

                # You might also want to send the workflow state periodically
                # or when updated. Here we just wait for pending messages and
                # send them.
                message = await global_state_machine.get_pending.remote(trace_id)

                yield _format_sse_event(message)
        except asyncio.CancelledError:
            # Cancellation simply ends the stream instead of propagating.
            pass

    return StreamingResponse(event_generator(), media_type="text/event-stream")


@workflow_router.post("/reply/{trace_id}")
async def post_workflow_reply(trace_id: str, request: Request):
    """Forward a reply for *trace_id* to the global state machine.

    The request body must be a JSON object; its ``"message"`` value
    (default ``""``) is passed to the state machine's ``put_received``.

    Args:
        trace_id: Identifier passed to ``put_received``.
        request: Incoming request carrying the JSON body.

    Returns:
        ``{"status": "ok"}`` once the reply has been handed over.

    Raises:
        HTTPException: 400 when the body is not valid JSON or is not a
            JSON object.
    """
    try:
        data = await request.json()
    except ValueError as err:
        # json.JSONDecodeError (and decoding errors) are ValueError subclasses.
        raise HTTPException(
            status_code=400, detail="Request body must be valid JSON"
        ) from err
    if not isinstance(data, dict):
        # A JSON array/scalar has no .get(); reject it instead of a 500.
        raise HTTPException(
            status_code=400, detail="Request body must be a JSON object"
        )

    reply_msg = data.get("message", "")
    global_state_machine = ray_actor_hook("global_state_machine").global_state_machine
    await global_state_machine.put_received.remote(trace_id, reply_msg)
    return {"status": "ok"}