feat: 修复了一些bug

This commit is contained in:
朝夕 2026-04-28 21:39:04 +08:00
parent 478b0813c1
commit 3e0f377004
5 changed files with 245 additions and 81 deletions

View File

@ -144,7 +144,7 @@ export function LeftPanel({ activeTab, setActiveTab, selectedWorkflow, setSelect
> >
<div className="flex justify-between items-center mb-1"> <div className="flex justify-between items-center mb-1">
<span className={`font-medium text-sm ${selectedWorkflow === wf.event_id ? 'text-blue-700' : 'text-slate-700'}`}>{wf.workflow_title || 'Unnamed Workflow'}</span> <span className={`font-medium text-sm ${selectedWorkflow === wf.event_id ? 'text-blue-700' : 'text-slate-700'}`}>{wf.workflow_title || 'Unnamed Workflow'}</span>
<span className={`flex h-2 w-2 rounded-full ${wf.status === 'running' ? 'bg-green-400 animate-pulse' : 'bg-slate-300'}`}></span> <span className={`flex h-2 w-2 rounded-full ${wf.status === 'llm_working' || wf.status === 'tool_working' ? 'bg-green-400 animate-pulse' : wf.status === 'failed' ? 'bg-red-400' : 'bg-slate-300'}`}></span>
</div> </div>
<p className="text-xs text-slate-500 font-mono line-clamp-1">ID: {wf.event_id}</p> <p className="text-xs text-slate-500 font-mono line-clamp-1">ID: {wf.event_id}</p>
</div> </div>

View File

@ -1,68 +1,91 @@
import { useState, useEffect } from 'react'; import { useState, useEffect, useRef } from 'react';
import { Terminal, Activity } from 'lucide-react'; import { Terminal, Activity, RefreshCw, CheckCircle2, Circle, XCircle, Clock, Loader2 } from 'lucide-react';
import apiClient from '../../api/client';
import type { WorkflowDetail, WorkflowStep } from '../../types';
interface RightPanelProps { interface RightPanelProps {
selectedWorkflow: string | null; selectedWorkflow: string | null;
} }
function stepStatusIcon(status: string) {
switch (status) {
case 'completed':
return <CheckCircle2 size={14} className="text-green-500" />;
case 'running':
return <Loader2 size={14} className="text-blue-500 animate-spin" />;
case 'failed':
return <XCircle size={14} className="text-red-500" />;
default:
return <Circle size={14} className="text-slate-300" />;
}
}
export function RightPanel({ selectedWorkflow }: RightPanelProps) { export function RightPanel({ selectedWorkflow }: RightPanelProps) {
const [messages, setMessages] = useState<string[]>([]); const [detail, setDetail] = useState<WorkflowDetail | null>(null);
const [isConnected, setIsConnected] = useState(false); const [loading, setLoading] = useState(false);
const [logs, setLogs] = useState<string[]>([]);
const [sseConnected, setSseConnected] = useState(false);
const eventSourceRef = useRef<EventSource | null>(null);
const fetchDetail = async (traceId: string) => {
setLoading(true);
setLogs([]);
try {
const response = await apiClient.get(`/api/v1/workflow/${traceId}`);
setDetail(response.data);
} catch {
setDetail(null);
} finally {
setLoading(false);
}
};
useEffect(() => { useEffect(() => {
if (!selectedWorkflow) { if (!selectedWorkflow) {
// eslint-disable-next-line react-hooks/set-state-in-effect setDetail(null);
setMessages([]); setLogs([]);
return; return;
} }
let eventSource: EventSource | null = null; fetchDetail(selectedWorkflow);
const connect = () => { const protocol = window.location.protocol;
const protocol = window.location.protocol; const host = window.location.host;
const host = window.location.host; const apiBase = import.meta.env.VITE_API_BASE_URL || `${protocol}//${host}`;
const es = new EventSource(`${apiBase}/api/v1/workflow/sse/${selectedWorkflow}`);
eventSourceRef.current = es;
const apiBase = import.meta.env.VITE_API_BASE_URL || `${protocol}//${host}`; es.onopen = () => {
setSseConnected(true);
// Using the workflow router SSE endpoint
eventSource = new EventSource(`${apiBase}/api/v1/workflow/sse/${selectedWorkflow}`);
eventSource.onopen = () => {
setIsConnected(true);
setMessages([]); // clear previous traces
};
eventSource.onmessage = (event) => {
try {
setMessages(prev => [...prev, event.data]);
} catch (e) {
console.error("Error receiving workflow SSE message", e);
}
};
eventSource.onerror = (error) => {
console.error("EventSource failed.", error);
setIsConnected(false);
// EventSource automatically attempts to reconnect, so we can just let it be,
// or we could close it if we wanted to handle retries manually.
};
}; };
connect(); es.onmessage = (event) => {
setLogs(prev => [...prev, event.data]);
};
es.onerror = () => {
setSseConnected(false);
};
const interval = setInterval(() => {
fetchDetail(selectedWorkflow);
}, 3000);
return () => { return () => {
if (eventSource) { es.close();
eventSource.close(); eventSourceRef.current = null;
} clearInterval(interval);
}; };
}, [selectedWorkflow]); }, [selectedWorkflow]);
const isActive = detail?.status === 'llm_working' || detail?.status === 'tool_working';
if (!selectedWorkflow) { if (!selectedWorkflow) {
return ( return (
<div className="w-80 bg-white border-l border-slate-200 flex flex-col z-0 justify-center items-center p-6 text-center"> <div className="w-80 bg-white border-l border-slate-200 flex flex-col z-0 justify-center items-center p-6 text-center">
<Activity size={32} className="text-slate-300 mb-4" /> <Activity size={32} className="text-slate-300 mb-4" />
<h3 className="text-sm font-semibold text-slate-600">No Workflow Selected</h3> <h3 className="text-sm font-semibold text-slate-600">No Workflow Selected</h3>
<p className="text-xs text-slate-400 mt-2">Select a workflow from the left panel to view its execution trace.</p> <p className="text-xs text-slate-400 mt-2">Select a workflow from the left panel to view its details.</p>
</div> </div>
); );
} }
@ -72,50 +95,111 @@ export function RightPanel({ selectedWorkflow }: RightPanelProps) {
<div className="h-14 border-b border-slate-100 flex items-center px-4 justify-between bg-slate-50/50"> <div className="h-14 border-b border-slate-100 flex items-center px-4 justify-between bg-slate-50/50">
<h2 className="font-semibold text-slate-800 text-sm flex items-center"> <h2 className="font-semibold text-slate-800 text-sm flex items-center">
<Terminal size={16} className="mr-2 text-slate-500" /> <Terminal size={16} className="mr-2 text-slate-500" />
Execution Trace Workflow Detail
</h2> </h2>
<span className={`px-2 py-1 text-xs rounded-md font-medium border ${isConnected ? 'bg-green-100 text-green-700 border-green-200' : 'bg-slate-100 text-slate-500 border-slate-200'}`}> <div className="flex items-center gap-2">
{isConnected ? 'Running' : 'Disconnected'} <span className={`px-2 py-1 text-xs rounded-md font-medium border ${sseConnected ? 'bg-green-100 text-green-700 border-green-200' : 'bg-slate-100 text-slate-500 border-slate-200'}`}>
</span> {sseConnected ? 'Live' : '--'}
</span>
<button
onClick={() => selectedWorkflow && fetchDetail(selectedWorkflow)}
className="p-1 text-slate-400 hover:text-blue-600 rounded transition-colors"
title="Refresh"
>
<RefreshCw size={14} />
</button>
</div>
</div> </div>
<div className="flex-1 p-4 overflow-y-auto"> <div className="flex-1 p-4 overflow-y-auto">
<div className="mb-6"> {loading && !detail ? (
<h3 className="text-lg font-bold text-slate-800 mb-1">Workflow Logs</h3> <div className="text-center text-slate-400 text-sm py-8">
<p className="text-xs text-slate-500 mb-4 font-mono">ID: {selectedWorkflow}</p> <Loader2 size={24} className="animate-spin mx-auto mb-2" />
</div> Loading...
</div>
) : !detail ? (
<div className="text-center text-slate-400 text-sm py-8">Failed to load workflow details</div>
) : (
<>
{/* Header */}
<div className="mb-4">
<h3 className="text-base font-bold text-slate-800">
{detail.workflow_title || 'Workflow'}
</h3>
<p className="text-xs text-slate-500 font-mono mt-1">ID: {detail.event_id}</p>
{detail.command && (
<p className="text-xs text-slate-500 mt-1 truncate">Command: {detail.command}</p>
)}
<div className="flex items-center gap-2 mt-2">
<span className={`text-xs px-2 py-0.5 rounded-full font-medium ${
detail.status === 'failed' ? 'bg-red-100 text-red-700' :
isActive ? 'bg-blue-100 text-blue-700' :
detail.status === 'waiting_llm_working' || detail.status === 'waiting_tool_working' ? 'bg-yellow-100 text-yellow-700' :
'bg-green-100 text-green-700'
}`}>
{detail.status}
</span>
<span className="text-xs text-slate-400">
Step {detail.current_step}/{detail.steps.length}
</span>
</div>
</div>
{/* Dynamic Log Rendering */} {/* Steps */}
<div className="relative border-l-2 border-slate-200 ml-3 pl-6 space-y-6"> {detail.steps.length > 0 && (
<div className="mb-4">
{messages.length === 0 && isConnected && ( <h4 className="text-xs font-semibold text-slate-500 uppercase tracking-wider mb-2">Steps</h4>
<div className="text-xs text-slate-400 italic">Waiting for workflow events...</div> <div className="space-y-1.5">
)} {detail.steps.map((step: WorkflowStep) => (
<div
{messages.map((msg, idx) => { key={step.step}
// For now, simply parsing the raw text as a basic log entry. className={`flex items-center gap-2 px-2.5 py-1.5 rounded-md text-xs border ${
// If the backend sends structured JSON, this could be parsed and styled nicer. step.step === detail.current_step && isActive
const isLatest = idx === messages.length - 1; ? 'border-blue-200 bg-blue-50'
: step.status === 'completed'
return ( ? 'border-green-100 bg-green-50/50'
<div key={idx} className="relative"> : step.status === 'failed'
{/* Dot */} ? 'border-red-100 bg-red-50/50'
<div className={`absolute -left-[31px] top-1 flex items-center justify-center w-6 h-6 rounded-full border-2 border-white shadow-sm ${isLatest && isConnected ? 'bg-blue-500' : 'bg-green-500 text-white'}`}> : 'border-slate-100 bg-white'
{isLatest && isConnected ? ( }`}
<span className="h-2 w-2 bg-white rounded-full animate-pulse"></span> >
) : ( {stepStatusIcon(step.status)}
<svg className="w-3 h-3" fill="none" viewBox="0 0 24 24" stroke="currentColor"><path strokeLinecap="round" strokeLinejoin="round" strokeWidth="3" d="M5 13l4 4L19 7"></path></svg> <span className="font-medium text-slate-700 w-5 text-right">{step.step}</span>
)} <span className="text-slate-600 truncate flex-1">{step.name}</span>
</div> <span className="text-slate-400 text-[10px]">{step.node}</span>
{/* Card */} </div>
<div className={`p-3 rounded-lg border shadow-sm ${isLatest && isConnected ? 'border-blue-200 bg-blue-50' : 'border-slate-100 bg-white'}`}> ))}
<h4 className={`font-semibold text-xs mb-1 ${isLatest && isConnected ? 'text-blue-800' : 'text-slate-800'}`}>Step {idx + 1}</h4>
<p className={`text-[11px] font-mono break-all ${isLatest && isConnected ? 'text-blue-600' : 'text-slate-500'}`}>{msg}</p>
</div> </div>
</div> </div>
) )}
})}
</div> {detail.steps.length === 0 && (
<div className="text-center py-4">
<Clock size={24} className="text-slate-300 mx-auto mb-2" />
<p className="text-xs text-slate-400">Workflow is being generated...</p>
</div>
)}
{/* SSE Logs */}
{logs.length > 0 && (
<div>
<h4 className="text-xs font-semibold text-slate-500 uppercase tracking-wider mb-2">Live Logs</h4>
<div className="relative border-l-2 border-slate-200 ml-3 pl-5 space-y-3">
{logs.map((msg, idx) => (
<div key={idx} className="relative">
<div className={`absolute -left-[27px] top-1 w-3 h-3 rounded-full border-2 border-white shadow-sm ${idx === logs.length - 1 && sseConnected ? 'bg-blue-500 animate-pulse' : 'bg-green-500'}`} />
<p className="text-[11px] font-mono text-slate-600 leading-relaxed break-all">{msg}</p>
</div>
))}
</div>
</div>
)}
{logs.length === 0 && sseConnected && isActive && (
<div className="text-xs text-slate-400 italic mt-2">Waiting for live events...</div>
)}
</>
)}
</div> </div>
</div> </div>
); );

View File

@ -67,6 +67,28 @@ export interface Workflow {
status?: string; status?: string;
} }
export interface WorkflowStep {
step: number;
name: string;
node: string;
action: string;
desc: string;
status: string;
agent_id?: string;
}
export interface WorkflowDetail {
event_id: string;
workflow_title: string | null;
status: string;
command?: string;
current_step: number;
user_name: string;
message: string;
create_time: string;
steps: WorkflowStep[];
}
// Workflow Template Validation // Workflow Template Validation
export interface WorkStep { export interface WorkStep {
name: string; name: string;

View File

@ -14,7 +14,7 @@
from pretor.utils.ray_hook import ray_actor_hook from pretor.utils.ray_hook import ray_actor_hook
from fastapi import APIRouter, Request from fastapi import APIRouter, Request, HTTPException
from fastapi.responses import StreamingResponse from fastapi.responses import StreamingResponse
import asyncio import asyncio
@ -26,6 +26,49 @@ async def get_workflow_list():
events = await global_state_machine.list_events.remote() events = await global_state_machine.list_events.remote()
return events return events
@workflow_router.get("/{trace_id}")
async def get_workflow_detail(trace_id: str):
global_state_machine = ray_actor_hook("global_state_machine").global_state_machine
event = await global_state_machine.get_event.remote(trace_id)
if not event:
raise HTTPException(status_code=404, detail="Workflow not found")
workflow = event.workflow
if not workflow:
return {
"event_id": trace_id,
"workflow_title": None,
"status": "waiting",
"user_name": event.user_name,
"message": event.message,
"create_time": event.create_time,
"steps": [],
}
steps = []
for step in workflow.work_link:
steps.append({
"step": step.step,
"name": step.name,
"node": step.node,
"action": step.action,
"desc": step.desc,
"status": step.status,
"agent_id": step.agent_id,
})
return {
"event_id": trace_id,
"workflow_title": workflow.title,
"status": workflow.status.status,
"command": workflow.command,
"current_step": workflow.status.step,
"user_name": event.user_name,
"message": event.message,
"create_time": event.create_time,
"steps": steps,
}
@workflow_router.get("/sse/{trace_id}") @workflow_router.get("/sse/{trace_id}")
async def get_workflow_sse(trace_id: str, request: Request): async def get_workflow_sse(trace_id: str, request: Request):
global_state_machine = ray_actor_hook("global_state_machine").global_state_machine global_state_machine = ray_actor_hook("global_state_machine").global_state_machine

View File

@ -33,7 +33,6 @@ from pretor.core.individual.supervisory_node.template import TerminationMessage
import pathlib import pathlib
def get_workflow_template(workflow_name: str) -> str: def get_workflow_template(workflow_name: str) -> str:
workflow_template = pathlib.Path(__file__).parent.parent.parent / "workflow_template" / (workflow_name + "_workflow_template.json") workflow_template = pathlib.Path(__file__).parent.parent.parent / "workflow_template" / (workflow_name + "_workflow_template.json")
with open(workflow_template, "r", encoding="utf-8") as workflow_template_file: with open(workflow_template, "r", encoding="utf-8") as workflow_template_file:
@ -59,6 +58,13 @@ class WorkflowEngine:
"""控制节点""" """控制节点"""
self.supervisory_node = supervisory_node self.supervisory_node = supervisory_node
"""监督节点""" """监督节点"""
self._gsm = ray_actor_hook("global_state_machine").global_state_machine
async def _push_sse(self, msg: str) -> None:
try:
await self._gsm.put_pending.remote(self.workflow.trace_id, msg)
except Exception:
pass
def _prepare_inputs(self, inputs: Optional[Union[str, List[str]]]) -> Any: def _prepare_inputs(self, inputs: Optional[Union[str, List[str]]]) -> Any:
""" """
@ -84,6 +90,7 @@ class WorkflowEngine:
""" """
self.logger.info(f"🚀 工作流引擎启动: {self.workflow.title} [Trace ID: {self.workflow.trace_id}]") self.logger.info(f"🚀 工作流引擎启动: {self.workflow.title} [Trace ID: {self.workflow.trace_id}]")
await self._push_sse(f"[工作流启动] {self.workflow.title}")
max_step = len(self.workflow.work_link) max_step = len(self.workflow.work_link)
while 1 <= self.workflow.status.step <= max_step: while 1 <= self.workflow.status.step <= max_step:
current_step_id = self.workflow.status.step current_step_id = self.workflow.status.step
@ -91,9 +98,11 @@ class WorkflowEngine:
if not current_step: if not current_step:
self.logger.error(f"严重错误:找不到步骤 {current_step_id},工作流强制终止。") self.logger.error(f"严重错误:找不到步骤 {current_step_id},工作流强制终止。")
self.workflow.status.status = "failed" self.workflow.status.status = "failed"
await self._push_sse(f"[工作流失败] 找不到步骤 {current_step_id}")
break break
self.logger.info(f"▶️ 开始执行 Step {current_step_id}: [{current_step.node}] -> {current_step.action}") self.logger.info(f"▶️ 开始执行 Step {current_step_id}: [{current_step.node}] -> {current_step.action}")
current_step.status = "running" current_step.status = "running"
await self._push_sse(f"[Step {current_step_id}] {current_step.name}: {current_step.desc}")
try: try:
step_input_data = self._prepare_inputs(current_step.inputs) step_input_data = self._prepare_inputs(current_step.inputs)
step_result, is_success = await self._dispatch_to_node(current_step, step_input_data) step_result, is_success = await self._dispatch_to_node(current_step, step_input_data)
@ -102,24 +111,30 @@ class WorkflowEngine:
self.workflow.context_memory[current_step.outputs] = step_result self.workflow.context_memory[current_step.outputs] = step_result
self.logger.debug(f"Step {current_step_id} 产出已保存至变量: '{current_step.outputs}'") self.logger.debug(f"Step {current_step_id} 产出已保存至变量: '{current_step.outputs}'")
current_step.status = "completed" current_step.status = "completed"
await self._push_sse(f"[Step {current_step_id} 完成] {current_step.name}")
else: else:
self.logger.warning(f"Step {current_step_id} 执行遇到业务失败/驳回。") self.logger.warning(f"Step {current_step_id} 执行遇到业务失败/驳回。")
current_step.status = "failed" current_step.status = "failed"
await self._push_sse(f"[Step {current_step_id} 失败] {current_step.name}")
self._handle_logic_gate(current_step, is_success) self._handle_logic_gate(current_step, is_success)
except WorkflowExit: except WorkflowExit:
self.logger.info("命中 if_pass='exit',工作流被主动要求结束。") self.logger.info("命中 if_pass='exit',工作流被主动要求结束。")
await self._push_sse("[工作流结束] 主动退出")
break break
except WorkflowError as e: except WorkflowError as e:
self.logger.error(f"{e},终止工作流。") self.logger.error(f"{e},终止工作流。")
self.workflow.status.status = "failed" self.workflow.status.status = "failed"
await self._push_sse(f"[工作流失败] {e}")
break break
except Exception as e: except Exception as e:
self.logger.error(f"❌ Step {current_step_id} 发生系统级未捕获异常: {e}", exc_info=True) self.logger.error(f"❌ Step {current_step_id} 发生系统级未捕获异常: {e}", exc_info=True)
current_step.status = "failed" current_step.status = "failed"
self.workflow.status.status = "failed" self.workflow.status.status = "failed"
await self._push_sse(f"[工作流异常] {e}")
break break
self.logger.info(f"✅ 工作流 {self.workflow.title} 执行步骤结束。") self.logger.info(f"✅ 工作流 {self.workflow.title} 执行步骤结束。")
self.workflow.output = self.workflow.context_memory self.workflow.output = self.workflow.context_memory
await self._push_sse(f"[工作流完成] {self.workflow.title}")
await self._report_results() await self._report_results()
async def _report_results(self): async def _report_results(self):