-
-
Workflow Logs
-
ID: {selectedWorkflow}
-
+ {loading && !detail ? (
+
+
+ Loading...
+
+ ) : !detail ? (
+
Failed to load workflow details
+ ) : (
+ <>
+ {/* Header */}
+
+
+ {detail.workflow_title || 'Workflow'}
+
+
ID: {detail.event_id}
+ {detail.command && (
+
Command: {detail.command}
+ )}
+
+
+ {detail.status}
+
+
+ Step {detail.current_step}/{detail.steps.length}
+
+
+
- {/* Dynamic Log Rendering */}
-
-
- {messages.length === 0 && isConnected && (
-
Waiting for workflow events...
- )}
-
- {messages.map((msg, idx) => {
- // For now, simply parsing the raw text as a basic log entry.
- // If the backend sends structured JSON, this could be parsed and styled nicer.
- const isLatest = idx === messages.length - 1;
-
- return (
-
- {/* Dot */}
-
- {isLatest && isConnected ? (
-
- ) : (
-
- )}
-
- {/* Card */}
-
-
Step {idx + 1}
-
{msg}
+ {/* Steps */}
+ {detail.steps.length > 0 && (
+
+
Steps
+
+ {detail.steps.map((step: WorkflowStep) => (
+
+ {stepStatusIcon(step.status)}
+ {step.step}
+ {step.name}
+ {step.node}
+
+ ))}
- )
- })}
-
+ )}
+
+ {detail.steps.length === 0 && (
+
+
+
Workflow is being generated...
+
+ )}
+
+ {/* SSE Logs */}
+ {logs.length > 0 && (
+
+
Live Logs
+
+ {logs.map((msg, idx) => (
+
+ ))}
+
+
+ )}
+
+ {logs.length === 0 && sseConnected && isActive && (
+
Waiting for live events...
+ )}
+ >
+ )}
);
diff --git a/frontend/src/types/index.ts b/frontend/src/types/index.ts
index 1308952..cbd2e8c 100644
--- a/frontend/src/types/index.ts
+++ b/frontend/src/types/index.ts
@@ -67,6 +67,28 @@ export interface Workflow {
status?: string;
}
+export interface WorkflowStep {
+ step: number;
+ name: string;
+ node: string;
+ action: string;
+ desc: string;
+ status: string;
+ agent_id?: string;
+}
+
+export interface WorkflowDetail {
+ event_id: string;
+ workflow_title: string | null;
+ status: string;
+ command?: string;
+ current_step: number;
+ user_name: string;
+ message: string;
+ create_time: string;
+ steps: WorkflowStep[];
+}
+
// Workflow Template Validation
export interface WorkStep {
name: string;
diff --git a/pretor/api/workflow.py b/pretor/api/workflow.py
index 1ff7ba1..e2d6cab 100644
--- a/pretor/api/workflow.py
+++ b/pretor/api/workflow.py
@@ -14,7 +14,7 @@
from pretor.utils.ray_hook import ray_actor_hook
-from fastapi import APIRouter, Request
+from fastapi import APIRouter, Request, HTTPException
from fastapi.responses import StreamingResponse
import asyncio
@@ -26,6 +26,49 @@ async def get_workflow_list():
events = await global_state_machine.list_events.remote()
return events
+
+@workflow_router.get("/{trace_id}")
+async def get_workflow_detail(trace_id: str):
+ global_state_machine = ray_actor_hook("global_state_machine").global_state_machine
+ event = await global_state_machine.get_event.remote(trace_id)
+ if not event:
+ raise HTTPException(status_code=404, detail="Workflow not found")
+
+ workflow = event.workflow
+ if not workflow:
+ return {
+ "event_id": trace_id,
+ "workflow_title": None,
+ "status": "waiting",
+ "user_name": event.user_name,
+ "message": event.message,
+ "create_time": event.create_time,
+ "steps": [],
+ }
+
+ steps = []
+ for step in workflow.work_link:
+ steps.append({
+ "step": step.step,
+ "name": step.name,
+ "node": step.node,
+ "action": step.action,
+ "desc": step.desc,
+ "status": step.status,
+ "agent_id": step.agent_id,
+ })
+ return {
+ "event_id": trace_id,
+ "workflow_title": workflow.title,
+ "status": workflow.status.status,
+ "command": workflow.command,
+ "current_step": workflow.status.step,
+ "user_name": event.user_name,
+ "message": event.message,
+ "create_time": event.create_time,
+ "steps": steps,
+ }
+
@workflow_router.get("/sse/{trace_id}")
async def get_workflow_sse(trace_id: str, request: Request):
global_state_machine = ray_actor_hook("global_state_machine").global_state_machine
diff --git a/pretor/core/workflow/workflow_runner.py b/pretor/core/workflow/workflow_runner.py
index 5242354..3bbd9da 100644
--- a/pretor/core/workflow/workflow_runner.py
+++ b/pretor/core/workflow/workflow_runner.py
@@ -33,7 +33,6 @@ from pretor.core.individual.supervisory_node.template import TerminationMessage
import pathlib
-
def get_workflow_template(workflow_name: str) -> str:
workflow_template = pathlib.Path(__file__).parent.parent.parent / "workflow_template" / (workflow_name + "_workflow_template.json")
with open(workflow_template, "r", encoding="utf-8") as workflow_template_file:
@@ -59,6 +58,13 @@ class WorkflowEngine:
"""控制节点"""
self.supervisory_node = supervisory_node
"""监督节点"""
+ self._gsm = ray_actor_hook("global_state_machine").global_state_machine
+
+ async def _push_sse(self, msg: str) -> None:
+ try:
+ await self._gsm.put_pending.remote(self.workflow.trace_id, msg)
+ except Exception:
+ pass
def _prepare_inputs(self, inputs: Optional[Union[str, List[str]]]) -> Any:
"""
@@ -84,6 +90,7 @@ class WorkflowEngine:
"""
self.logger.info(f"🚀 工作流引擎启动: {self.workflow.title} [Trace ID: {self.workflow.trace_id}]")
+ await self._push_sse(f"[工作流启动] {self.workflow.title}")
max_step = len(self.workflow.work_link)
while 1 <= self.workflow.status.step <= max_step:
current_step_id = self.workflow.status.step
@@ -91,9 +98,11 @@ class WorkflowEngine:
if not current_step:
self.logger.error(f"严重错误:找不到步骤 {current_step_id},工作流强制终止。")
self.workflow.status.status = "failed"
+ await self._push_sse(f"[工作流失败] 找不到步骤 {current_step_id}")
break
self.logger.info(f"▶️ 开始执行 Step {current_step_id}: [{current_step.node}] -> {current_step.action}")
current_step.status = "running"
+ await self._push_sse(f"[Step {current_step_id}] {current_step.name}: {current_step.desc}")
try:
step_input_data = self._prepare_inputs(current_step.inputs)
step_result, is_success = await self._dispatch_to_node(current_step, step_input_data)
@@ -102,24 +111,30 @@ class WorkflowEngine:
self.workflow.context_memory[current_step.outputs] = step_result
self.logger.debug(f"Step {current_step_id} 产出已保存至变量: '{current_step.outputs}'")
current_step.status = "completed"
+ await self._push_sse(f"[Step {current_step_id} 完成] {current_step.name}")
else:
self.logger.warning(f"Step {current_step_id} 执行遇到业务失败/驳回。")
current_step.status = "failed"
+ await self._push_sse(f"[Step {current_step_id} 失败] {current_step.name}")
self._handle_logic_gate(current_step, is_success)
except WorkflowExit:
self.logger.info("命中 if_pass='exit',工作流被主动要求结束。")
+ await self._push_sse("[工作流结束] 主动退出")
break
except WorkflowError as e:
self.logger.error(f"{e},终止工作流。")
self.workflow.status.status = "failed"
+ await self._push_sse(f"[工作流失败] {e}")
break
except Exception as e:
self.logger.error(f"❌ Step {current_step_id} 发生系统级未捕获异常: {e}", exc_info=True)
current_step.status = "failed"
self.workflow.status.status = "failed"
+ await self._push_sse(f"[工作流异常] {e}")
break
self.logger.info(f"✅ 工作流 {self.workflow.title} 执行步骤结束。")
self.workflow.output = self.workflow.context_memory
+ await self._push_sse(f"[工作流完成] {self.workflow.title}")
await self._report_results()
async def _report_results(self):