From 3e0f3770041b437c9f120e12d879af3e92c52cbb Mon Sep 17 00:00:00 2001 From: zhaoxi Date: Tue, 28 Apr 2026 21:39:04 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BF=AE=E5=A4=8D=E4=BA=86=E4=B8=80?= =?UTF-8?q?=E4=BA=9Bbug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- frontend/src/components/Chat/LeftPanel.tsx | 2 +- frontend/src/components/Chat/RightPanel.tsx | 240 +++++++++++++------- frontend/src/types/index.ts | 22 ++ pretor/api/workflow.py | 45 +++- pretor/core/workflow/workflow_runner.py | 17 +- 5 files changed, 245 insertions(+), 81 deletions(-) diff --git a/frontend/src/components/Chat/LeftPanel.tsx b/frontend/src/components/Chat/LeftPanel.tsx index aceb50d..c7c1ad9 100644 --- a/frontend/src/components/Chat/LeftPanel.tsx +++ b/frontend/src/components/Chat/LeftPanel.tsx @@ -144,7 +144,7 @@ export function LeftPanel({ activeTab, setActiveTab, selectedWorkflow, setSelect >
{wf.workflow_title || 'Unnamed Workflow'} - +

ID: {wf.event_id}

diff --git a/frontend/src/components/Chat/RightPanel.tsx b/frontend/src/components/Chat/RightPanel.tsx index 6cfb7fd..a13546f 100644 --- a/frontend/src/components/Chat/RightPanel.tsx +++ b/frontend/src/components/Chat/RightPanel.tsx @@ -1,68 +1,91 @@ -import { useState, useEffect } from 'react'; -import { Terminal, Activity } from 'lucide-react'; +import { useState, useEffect, useRef } from 'react'; +import { Terminal, Activity, RefreshCw, CheckCircle2, Circle, XCircle, Clock, Loader2 } from 'lucide-react'; +import apiClient from '../../api/client'; +import type { WorkflowDetail, WorkflowStep } from '../../types'; interface RightPanelProps { selectedWorkflow: string | null; } +function stepStatusIcon(status: string) { + switch (status) { + case 'completed': + return ; + case 'running': + return ; + case 'failed': + return ; + default: + return ; + } +} + export function RightPanel({ selectedWorkflow }: RightPanelProps) { - const [messages, setMessages] = useState([]); - const [isConnected, setIsConnected] = useState(false); + const [detail, setDetail] = useState(null); + const [loading, setLoading] = useState(false); + const [logs, setLogs] = useState([]); + const [sseConnected, setSseConnected] = useState(false); + const eventSourceRef = useRef(null); + + const fetchDetail = async (traceId: string) => { + setLoading(true); + setLogs([]); + try { + const response = await apiClient.get(`/api/v1/workflow/${traceId}`); + setDetail(response.data); + } catch { + setDetail(null); + } finally { + setLoading(false); + } + }; useEffect(() => { if (!selectedWorkflow) { - // eslint-disable-next-line react-hooks/set-state-in-effect - setMessages([]); + setDetail(null); + setLogs([]); return; } - let eventSource: EventSource | null = null; + fetchDetail(selectedWorkflow); - const connect = () => { - const protocol = window.location.protocol; - const host = window.location.host; + const protocol = window.location.protocol; + const host = window.location.host; + const apiBase = import.meta.env.VITE_API_BASE_URL || `${protocol}//${host}`; + const es = new EventSource(`${apiBase}/api/v1/workflow/sse/${selectedWorkflow}`); + eventSourceRef.current = es; - const apiBase = import.meta.env.VITE_API_BASE_URL || `${protocol}//${host}`; - - // Using the workflow router SSE endpoint - eventSource = new EventSource(`${apiBase}/api/v1/workflow/sse/${selectedWorkflow}`); - - eventSource.onopen = () => { - setIsConnected(true); - setMessages([]); // clear previous traces - }; - - eventSource.onmessage = (event) => { - try { - setMessages(prev => [...prev, event.data]); - } catch (e) { - console.error("Error receiving workflow SSE message", e); - } - }; - - eventSource.onerror = (error) => { - console.error("EventSource failed.", error); - setIsConnected(false); - // EventSource automatically attempts to reconnect, so we can just let it be, - // or we could close it if we wanted to handle retries manually. - }; + es.onopen = () => { + setSseConnected(true); }; - connect(); + es.onmessage = (event) => { + setLogs(prev => [...prev, event.data]); + }; + + es.onerror = () => { + setSseConnected(false); + }; + + const interval = setInterval(() => { + fetchDetail(selectedWorkflow); + }, 3000); return () => { - if (eventSource) { - eventSource.close(); - } + es.close(); + eventSourceRef.current = null; + clearInterval(interval); }; }, [selectedWorkflow]); + const isActive = detail?.status === 'llm_working' || detail?.status === 'tool_working'; + if (!selectedWorkflow) { return (
- -

No Workflow Selected

-

Select a workflow from the left panel to view its execution trace.

+ +

No Workflow Selected

+

Select a workflow from the left panel to view its details.

); } @@ -72,50 +95,111 @@ export function RightPanel({ selectedWorkflow }: RightPanelProps) {

- Execution Trace + Workflow Detail

- - {isConnected ? 'Running' : 'Disconnected'} - +
+ + {sseConnected ? 'Live' : '--'} + + +
-
-

Workflow Logs

-

ID: {selectedWorkflow}

-
+ {loading && !detail ? ( +
+ + Loading... +
+ ) : !detail ? ( +
Failed to load workflow details
+ ) : ( + <> + {/* Header */} +
+

+ {detail.workflow_title || 'Workflow'} +

+

ID: {detail.event_id}

+ {detail.command && ( +

Command: {detail.command}

+ )} +
+ + {detail.status} + + + Step {detail.current_step}/{detail.steps.length} + +
+
- {/* Dynamic Log Rendering */} -
- - {messages.length === 0 && isConnected && ( -
Waiting for workflow events...
- )} - - {messages.map((msg, idx) => { - // For now, simply parsing the raw text as a basic log entry. - // If the backend sends structured JSON, this could be parsed and styled nicer. - const isLatest = idx === messages.length - 1; - - return ( -
- {/* Dot */} -
- {isLatest && isConnected ? ( - - ) : ( - - )} -
- {/* Card */} -
-

Step {idx + 1}

-

{msg}

+ {/* Steps */} + {detail.steps.length > 0 && ( +
+

Steps

+
+ {detail.steps.map((step: WorkflowStep) => ( +
+ {stepStatusIcon(step.status)} + {step.step} + {step.name} + {step.node} +
+ ))}
- ) - })} -
+ )} + + {detail.steps.length === 0 && ( +
+ +

Workflow is being generated...

+
+ )} + + {/* SSE Logs */} + {logs.length > 0 && ( +
+

Live Logs

+
+ {logs.map((msg, idx) => ( +
+
+

{msg}

+
+ ))} +
+
+ )} + + {logs.length === 0 && sseConnected && isActive && ( +
Waiting for live events...
+ )} + + )}
); diff --git a/frontend/src/types/index.ts b/frontend/src/types/index.ts index 1308952..cbd2e8c 100644 --- a/frontend/src/types/index.ts +++ b/frontend/src/types/index.ts @@ -67,6 +67,28 @@ export interface Workflow { status?: string; } +export interface WorkflowStep { + step: number; + name: string; + node: string; + action: string; + desc: string; + status: string; + agent_id?: string; +} + +export interface WorkflowDetail { + event_id: string; + workflow_title: string | null; + status: string; + command?: string; + current_step: number; + user_name: string; + message: string; + create_time: string; + steps: WorkflowStep[]; +} + // Workflow Template Validation export interface WorkStep { name: string; diff --git a/pretor/api/workflow.py b/pretor/api/workflow.py index 1ff7ba1..e2d6cab 100644 --- a/pretor/api/workflow.py +++ b/pretor/api/workflow.py @@ -14,7 +14,7 @@ from pretor.utils.ray_hook import ray_actor_hook -from fastapi import APIRouter, Request +from fastapi import APIRouter, Request, HTTPException from fastapi.responses import StreamingResponse import asyncio @@ -26,6 +26,49 @@ async def get_workflow_list(): events = await global_state_machine.list_events.remote() return events + +@workflow_router.get("/{trace_id}") +async def get_workflow_detail(trace_id: str): + global_state_machine = ray_actor_hook("global_state_machine").global_state_machine + event = await global_state_machine.get_event.remote(trace_id) + if not event: + raise HTTPException(status_code=404, detail="Workflow not found") + + workflow = event.workflow + if not workflow: + return { + "event_id": trace_id, + "workflow_title": None, + "status": "waiting", + "user_name": event.user_name, + "message": event.message, + "create_time": event.create_time, + "steps": [], + } + + steps = [] + for step in workflow.work_link: + steps.append({ + "step": step.step, + "name": step.name, + "node": step.node, + "action": step.action, + "desc": step.desc, + "status": step.status, + "agent_id": step.agent_id, + }) + return { + "event_id": trace_id, + "workflow_title": workflow.title, + "status": workflow.status.status, + "command": workflow.command, + "current_step": workflow.status.step, + "user_name": event.user_name, + "message": event.message, + "create_time": event.create_time, + "steps": steps, + } + @workflow_router.get("/sse/{trace_id}") async def get_workflow_sse(trace_id: str, request: Request): global_state_machine = ray_actor_hook("global_state_machine").global_state_machine diff --git a/pretor/core/workflow/workflow_runner.py b/pretor/core/workflow/workflow_runner.py index 5242354..3bbd9da 100644 --- a/pretor/core/workflow/workflow_runner.py +++ b/pretor/core/workflow/workflow_runner.py @@ -33,7 +33,6 @@ from pretor.core.individual.supervisory_node.template import TerminationMessage import pathlib - def get_workflow_template(workflow_name: str) -> str: workflow_template = pathlib.Path(__file__).parent.parent.parent / "workflow_template" / (workflow_name + "_workflow_template.json") with open(workflow_template, "r", encoding="utf-8") as workflow_template_file: @@ -59,6 +58,13 @@ class WorkflowEngine: """控制节点""" self.supervisory_node = supervisory_node """监督节点""" + self._gsm = ray_actor_hook("global_state_machine").global_state_machine + + async def _push_sse(self, msg: str) -> None: + try: + await self._gsm.put_pending.remote(self.workflow.trace_id, msg) + except Exception: + pass def _prepare_inputs(self, inputs: Optional[Union[str, List[str]]]) -> Any: """ @@ -84,6 +90,7 @@ class WorkflowEngine: """ self.logger.info(f"🚀 工作流引擎启动: {self.workflow.title} [Trace ID: {self.workflow.trace_id}]") + await self._push_sse(f"[工作流启动] {self.workflow.title}") max_step = len(self.workflow.work_link) while 1 <= self.workflow.status.step <= max_step: current_step_id = self.workflow.status.step @@ -91,9 +98,11 @@ class WorkflowEngine: if not current_step: self.logger.error(f"严重错误:找不到步骤 {current_step_id},工作流强制终止。") self.workflow.status.status = "failed" + await self._push_sse(f"[工作流失败] 找不到步骤 {current_step_id}") break self.logger.info(f"▶️ 开始执行 Step {current_step_id}: [{current_step.node}] -> {current_step.action}") current_step.status = "running" + await self._push_sse(f"[Step {current_step_id}] {current_step.name}: {current_step.desc}") try: step_input_data = self._prepare_inputs(current_step.inputs) step_result, is_success = await self._dispatch_to_node(current_step, step_input_data) @@ -102,24 +111,30 @@ class WorkflowEngine: self.workflow.context_memory[current_step.outputs] = step_result self.logger.debug(f"Step {current_step_id} 产出已保存至变量: '{current_step.outputs}'") current_step.status = "completed" + await self._push_sse(f"[Step {current_step_id} 完成] {current_step.name}") else: self.logger.warning(f"Step {current_step_id} 执行遇到业务失败/驳回。") current_step.status = "failed" + await self._push_sse(f"[Step {current_step_id} 失败] {current_step.name}") self._handle_logic_gate(current_step, is_success) except WorkflowExit: self.logger.info("命中 if_pass='exit',工作流被主动要求结束。") + await self._push_sse("[工作流结束] 主动退出") break except WorkflowError as e: self.logger.error(f"{e},终止工作流。") self.workflow.status.status = "failed" + await self._push_sse(f"[工作流失败] {e}") break except Exception as e: self.logger.error(f"❌ Step {current_step_id} 发生系统级未捕获异常: {e}", exc_info=True) current_step.status = "failed" self.workflow.status.status = "failed" + await self._push_sse(f"[工作流异常] {e}") break self.logger.info(f"✅ 工作流 {self.workflow.title} 执行步骤结束。") self.workflow.output = self.workflow.context_memory + await self._push_sse(f"[工作流完成] {self.workflow.title}") await self._report_results() async def _report_results(self):