From 478b0813c192a905d93bdfacec6a151891e1741b Mon Sep 17 00:00:00 2001 From: zhaoxi Date: Tue, 28 Apr 2026 21:04:40 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BF=AE=E5=A4=8D=E4=BA=86=E4=B8=80?= =?UTF-8?q?=E4=BA=9Bbug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pretor/api/workflow.py | 6 ++++++ .../global_state_machine/global_state_machine.py | 14 ++++++++++++++ .../consciousness_node/consciousness_node.py | 2 +- pretor/core/workflow/workflow.py | 10 ++-------- pretor/core/workflow/workflow_runner.py | 8 +++----- 5 files changed, 26 insertions(+), 14 deletions(-) diff --git a/pretor/api/workflow.py b/pretor/api/workflow.py index 0eb6ef4..1ff7ba1 100644 --- a/pretor/api/workflow.py +++ b/pretor/api/workflow.py @@ -20,6 +20,12 @@ import asyncio workflow_router = APIRouter(prefix="/api/v1/workflow", tags=["workflow"]) +@workflow_router.get("/list") +async def get_workflow_list(): + global_state_machine = ray_actor_hook("global_state_machine").global_state_machine + events = await global_state_machine.list_events.remote() + return events + @workflow_router.get("/sse/{trace_id}") async def get_workflow_sse(trace_id: str, request: Request): global_state_machine = ray_actor_hook("global_state_machine").global_state_machine diff --git a/pretor/core/global_state_machine/global_state_machine.py b/pretor/core/global_state_machine/global_state_machine.py index 2af8477..acb8479 100644 --- a/pretor/core/global_state_machine/global_state_machine.py +++ b/pretor/core/global_state_machine/global_state_machine.py @@ -139,6 +139,20 @@ class GlobalStateMachine: def get_workflow(self, trace_id: str) -> PretorWorkflow: return self.event_dict[trace_id].workflow + def list_events(self) -> list[dict]: + result = [] + for trace_id, event in self.event_dict.items(): + workflow_title = event.workflow.title if event.workflow else None + workflow_status = event.workflow.status.status if event.workflow and event.workflow.status else None + result.append({ + "event_id": trace_id, + "workflow_title": workflow_title, + "status": workflow_status, + "user_name": event.user_name, + "message": event.message, + }) + return result + async def put_pending(self, trace_id, item) -> None: await self.event_dict[trace_id].pending_queue.put(item) diff --git a/pretor/core/individual/consciousness_node/consciousness_node.py b/pretor/core/individual/consciousness_node/consciousness_node.py index b7b3455..04dab6f 100644 --- a/pretor/core/individual/consciousness_node/consciousness_node.py +++ b/pretor/core/individual/consciousness_node/consciousness_node.py @@ -80,7 +80,7 @@ class ConsciousnessNode: prompt += f"- 选定工作流模板 (Workflow Template): {ctx.deps.workflow_template}\n" if ctx.deps.available_skills: prompt += "\n=== 当前可用 Skill Individual ===\n" - prompt += "你可以直接将以下 Skill Individual 安排进工作流的步骤中(设置 node 为 composite_individual 或 primary_individual,并将 agent_id 设置为对应的 Skill Individual 名称),作为可调用的工具。\n" + prompt += "你可以直接将以下 Skill Individual 安排进工作流的步骤中(设置 node 为 skill_individual,并将 agent_id 设置为对应的 Skill Individual 名称),作为可调用的工具。\n" for skill in ctx.deps.available_skills: prompt += f"- 名称: {skill['name']}\n 描述: {skill['description']}\n" diff --git a/pretor/core/workflow/workflow.py b/pretor/core/workflow/workflow.py index 584a9a0..f3d29ce 100644 --- a/pretor/core/workflow/workflow.py +++ b/pretor/core/workflow/workflow.py @@ -18,8 +18,7 @@ from pydantic import BaseModel, Field, model_validator from pretor.utils.logger import get_logger logger = get_logger('workflow') NodeType = Literal[ - "consciousness_node", "control_node", "supervisory_node", - "composite_individual", "primary_individual" + "consciousness_node", "control_node", "supervisory_node", "skill_individual" ] class EventInfo(BaseModel): @@ -38,17 +37,13 @@ class WorkStep(BaseModel): desc: str = Field(..., description="动作细节的自然语言描述,包含人工规范指导") inputs: Optional[Union[str, List[str]]] = Field(default=None, description="前置依赖输出") outputs: Optional[str] = Field(default=None, description="当前步骤产出物变量名") + agent_id: Optional[str] = Field(default=None, description="分配给 skill_individual 的 Skill Individual 名称") logic_gate: Optional[LogicGate] = Field(default=None, description="逻辑跳转控制") status: Literal["waiting", "running", "completed", "failed"] = Field( default="waiting", description="执行状态 (LLM建议保留默认值)" ) -class WorkerGroup(BaseModel): - name: str = Field(..., description="工作组名称,如 'coding_squad'") - primary_individual: Dict[str, int] = Field(..., description="基础子个体配置,例如 {'coder': 2, 'tester': 1}") - composite_individual: Dict[str, int] = Field(..., description="复合子个体配置,例如 {'code_reviewer': 1}") - class WorkflowStatus(BaseModel): step: int = Field(default=1, gt=0, description="当前运行到的工作流步数") @@ -59,7 +54,6 @@ class WorkflowStatus(BaseModel): class PretorWorkflow(BaseModel): title: str = Field(..., description="工作流的标题") - workgroup_list: List[WorkerGroup] = Field(..., description="工作组资源编排列表") work_link: List[WorkStep] = Field(..., description="工作链逻辑定义") # ---------------- 以下为系统级管控字段,LLM 无需关心 ---------------- # trace_id: str | None = Field(description="系统自动生成的追溯ID") diff --git a/pretor/core/workflow/workflow_runner.py b/pretor/core/workflow/workflow_runner.py index f089913..5242354 100644 --- a/pretor/core/workflow/workflow_runner.py +++ b/pretor/core/workflow/workflow_runner.py @@ -199,15 +199,13 @@ class WorkflowEngine: return result_obj.output, True return result_obj, True - elif step.node in ["primary_individual", "composite_individual"]: - self.logger.info(f"正在通过 WorkerCluster 调度 {step.node} 的 {step.action} 动作。") + elif step.node == "skill_individual": + self.logger.info(f"正在通过 WorkerCluster 调度 skill_individual 执行 {step.action}。") try: from pretor.utils.ray_hook import ray_actor_hook worker_cluster = ray_actor_hook("worker_cluster").worker_cluster task_id = f"{self.workflow.trace_id}_step_{step.step}" - agent_id = getattr(step, 'agent_id', f"default_{step.node}") - if isinstance(input_data, dict) and "agent_id" in input_data: - agent_id = input_data.get("agent_id") + agent_id = step.agent_id or f"default_{step.node}" task_event = { "action": step.action, "description": step.desc,