# Copyright 2026 zhaoxi826
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""HTTP endpoints for listing, inspecting, streaming and replying to workflows.

Every handler resolves the ``global_state_machine`` actor handle through
``ray_actor_hook`` and delegates to it via ``.remote()`` calls.
"""

import asyncio

from fastapi import APIRouter, HTTPException, Request
from fastapi.responses import StreamingResponse

from pretor.utils.ray_hook import ray_actor_hook

workflow_router = APIRouter(prefix="/api/v1/workflow", tags=["workflow"])


def _format_sse_data(message: object) -> str:
    """Render *message* as one Server-Sent Event made of ``data:`` fields.

    A message containing line breaks is split so that every line gets its own
    ``data:`` prefix; otherwise an embedded blank line would terminate the
    event early on the client. A single-line message renders exactly as
    ``data: <message>`` followed by a blank line.
    """
    text = f"{message}"
    # The event-stream format accepts CRLF, CR and LF as line terminators,
    # so normalise all of them before splitting.
    lines = text.replace("\r\n", "\n").replace("\r", "\n").split("\n")
    return "".join(f"data: {line}\n" for line in lines) + "\n"


@workflow_router.get("/list")
async def get_workflow_list():
    """Return whatever the global state machine reports from ``list_events``."""
    global_state_machine = ray_actor_hook("global_state_machine").global_state_machine
    events = await global_state_machine.list_events.remote()
    return events


@workflow_router.get("/{trace_id}")
async def get_workflow_detail(trace_id: str):
    """Return the details of the event identified by *trace_id*.

    Args:
        trace_id: Identifier passed to the state machine's ``get_event``.

    Returns:
        A dict describing the event. When the event has no workflow attached,
        the status is reported as ``"waiting"`` with an empty step list;
        otherwise the workflow title, status, command, current step and the
        per-step details are included.

    Raises:
        HTTPException: 404 when the state machine returns no event.
    """
    global_state_machine = ray_actor_hook("global_state_machine").global_state_machine
    event = await global_state_machine.get_event.remote(trace_id)
    if not event:
        raise HTTPException(status_code=404, detail="Workflow not found")

    workflow = event.workflow
    if not workflow:
        # The event exists but no workflow is attached to it yet.
        return {
            "event_id": trace_id,
            "workflow_title": None,
            "status": "waiting",
            "user_name": event.user_name,
            "message": event.message,
            "create_time": event.create_time,
            "steps": [],
        }

    steps = [
        {
            "step": step.step,
            "name": step.name,
            "node": step.node,
            "action": step.action,
            "desc": step.desc,
            "status": step.status,
            "agent_id": step.agent_id,
        }
        for step in workflow.work_link
    ]
    return {
        "event_id": trace_id,
        "workflow_title": workflow.title,
        "status": workflow.status.status,
        "command": workflow.command,
        "current_step": workflow.status.step,
        "user_name": event.user_name,
        "message": event.message,
        "create_time": event.create_time,
        "steps": steps,
    }


@workflow_router.get("/sse/{trace_id}")
async def get_workflow_sse(trace_id: str, request: Request):
    """Stream pending messages for *trace_id* as Server-Sent Events.

    Args:
        trace_id: Identifier passed to the state machine's ``get_pending``.
        request: Incoming request, polled to detect client disconnects.

    Returns:
        A ``StreamingResponse`` with media type ``text/event-stream``.
    """
    global_state_machine = ray_actor_hook("global_state_machine").global_state_machine

    async def event_generator():
        try:
            # The disconnect check runs once per message, before waiting for
            # the next one.
            while not await request.is_disconnected():
                # Only pending messages are forwarded here; the workflow state
                # itself is not pushed periodically.
                message = await global_state_machine.get_pending.remote(trace_id)
                yield _format_sse_data(message)
        except asyncio.CancelledError:
            # Cancellation of the streaming task ends the stream quietly.
            pass

    return StreamingResponse(event_generator(), media_type="text/event-stream")


@workflow_router.post("/reply/{trace_id}")
async def post_workflow_reply(trace_id: str, request: Request):
    """Forward a reply message for *trace_id* to the state machine.

    The request body must be a JSON object; its ``"message"`` value (default
    ``""``) is passed to the state machine's ``put_received``.

    Args:
        trace_id: Identifier the reply belongs to.
        request: Incoming request carrying the JSON body.

    Returns:
        ``{"status": "ok"}`` once the reply has been handed over.

    Raises:
        HTTPException: 400 when the body is not valid JSON or is not a JSON
            object.
    """
    try:
        data = await request.json()
    except ValueError as exc:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        raise HTTPException(
            status_code=400, detail="Request body must be valid JSON"
        ) from exc
    if not isinstance(data, dict):
        raise HTTPException(
            status_code=400, detail="Request body must be a JSON object"
        )

    reply_msg = data.get("message", "")
    global_state_machine = ray_actor_hook("global_state_machine").global_state_machine
    await global_state_machine.put_received.remote(trace_id, reply_msg)
    return {"status": "ok"}