wip:完善workflow_runner

This commit is contained in:
朝夕 2026-04-11 15:53:03 +08:00
parent 6c5849f3d0
commit 2796c20f5e
4 changed files with 189 additions and 46 deletions

View File

@ -14,13 +14,17 @@
from typing import List, Optional, Union, Literal, Dict, Any
from pydantic import BaseModel, Field, model_validator
from pretor.core.protocol.demand_protocol import DemandProtocol
from pretor.utils.demand_protocol import DemandProtocol
NodeType = Literal[
"consciousness_node", "control_node", "supervisory_node",
"composite_individual", "primary_individual"
]
class EventInfo(BaseModel):
    """Identifies the event that triggered a workflow: which platform and
    which user the final feedback should be addressed to.

    NOTE(review): `WorkflowEngine._report_results` reads
    `event_info.user_name`, but this model declares `username` — one of the
    two spellings is wrong; confirm against TerminationMessage's schema.
    """
    platform: str   # source platform of the triggering event
    username: str   # user that issued the original command
class LogicGate(BaseModel):
    """Branching rule evaluated after a workflow step finishes.

    `if_fail` names the jump target used on failure (e.g. 'jump_to_step_1');
    `if_pass` selects what happens after success: keep going or exit the
    whole workflow.
    """
    if_fail: str = Field(..., description="失败跳转目标,如 'jump_to_step_1'")
    if_pass: Literal["continue", "exit"] = Field(default="continue", description="成功后的动作")
@ -62,6 +66,7 @@ class PretorWorkflow(BaseModel):
command: Optional[str] = Field(default=None, description="触发此工作流的原始命令")
output: Dict[str, Any] = Field(default_factory=dict, description="工作流最终产出结果")
status: WorkflowStatus = Field(default_factory=WorkflowStatus, description="运行时状态对象")
event_info: EventInfo | None = Field(default_factory=None)
@model_validator(mode='after')
def validate_workflow_integrity(self) -> 'PretorWorkflow':

View File

@ -18,14 +18,42 @@ from pretor.core.workflow.workflow import PretorWorkflow, WorkStep
from loguru import logger
from typing import Optional, Dict, Union, Any, List
from pretor.utils.error import WorkflowError, WorkflowExit
from pretor.core.individual.control_node.template import ForWorkflowInput as ControlForWorkflowInput, \
ControlNodeResponse, ForWorkflow as ControlForWorkflow
from pretor.core.individual.consciousness_node.template import ForWorkflowInput as ConsciousnessForWorkflowInput, \
ForSupervisoryInput, ForSupervisoryNode, ForWorkflow as ConsciousnessForWorkflow
from pretor.core.individual.supervisory_node.template import TerminationMessage, ForUser
class WorkflowEngine:
def __init__(self, workflow: PretorWorkflow):
def __init__(self,
             workflow: PretorWorkflow,
             consciousness_node=None,
             control_node=None,
             supervisory_node=None):
    """Bind one workflow instance plus the (optional) actor handles it needs.

    Args:
        workflow: the PretorWorkflow this engine instance will execute.
        consciousness_node: actor handle for consciousness-node steps/reports.
        control_node: actor handle for control-node steps.
        supervisory_node: actor handle for final user feedback.
    """
    # The workflow currently owned by this engine.
    self.workflow: PretorWorkflow = workflow
    # Blackboard cache shared by every step of this run.
    self.context_memory: Dict[str, Any] = {}
    # Lookup table mapping a step's number to the step object itself.
    self._steps_by_id: Dict[int, WorkStep] = {}
    for work_step in self.workflow.work_link:
        self._steps_by_id[work_step.step] = work_step
    # Optional actor handles — None means "not wired up yet".
    self.consciousness_node = consciousness_node
    self.control_node = control_node
    self.supervisory_node = supervisory_node
def _prepare_inputs(self, inputs: Optional[Union[str, List[str]]]) -> Any:
"""
准备输入的方法
Args:
inputs: 待输入的名称
Returns:
"""
match inputs:
case None:
return None
@ -35,44 +63,34 @@ class WorkflowEngine:
return {k: self.context_memory.get(k) for k in names}
async def run(self):
"""
run方法
处理并执行workflow的方法
"""
logger.info(f"🚀 工作流引擎启动: {self.workflow.title} [Trace ID: {self.workflow.trace_id}]")
max_step = len(self.workflow.work_link)
# 核心调度循环:只要 step 在合法范围内,就一直执行
while 1 <= self.workflow.status.step <= max_step:
current_step_id = self.workflow.status.step
current_step = self._steps_by_id.get(current_step_id)
if not current_step:
logger.error(f"严重错误:找不到步骤 {current_step_id},工作流强制终止。")
self.workflow.status.status = "failed"
break
logger.info(f"▶️ 开始执行 Step {current_step_id}: [{current_step.node}] -> {current_step.action}")
current_step.status = "running"
try:
# 1. 准备依赖数据
step_input_data = self._prepare_inputs(current_step.inputs)
# 2. 派发给具体的 Ray 节点 (这也是整个架构的连接点)
# 这个方法会通过 Ray 去调用真正的模型或工具,并等待结果
step_result, is_success = await self._dispatch_to_node(current_step, step_input_data)
if is_success:
# 3. 记录产出物到全局黑板
if current_step.outputs:
self.context_memory[current_step.outputs] = step_result
logger.debug(f"Step {current_step_id} 产出已保存至变量: '{current_step.outputs}'")
current_step.status = "completed"
else:
logger.warning(f"Step {current_step_id} 执行遇到业务失败/驳回。")
current_step.status = "failed"
# 4. 根据执行成功与否处理逻辑门跳转
self._handle_logic_gate(current_step, is_success)
except WorkflowExit:
logger.info("命中 if_pass='exit',工作流被主动要求结束。")
break
@ -81,50 +99,122 @@ class WorkflowEngine:
self.workflow.status.status = "failed"
break
except Exception as e:
# 捕获系统级崩溃 (例如 Ray Actor 断联、网络异常)
logger.error(f"❌ Step {current_step_id} 发生系统级未捕获异常: {e}", exc_info=True)
current_step.status = "failed"
self.workflow.status.status = "failed"
# 发生未预期的崩溃,通常不再走业务 logic_gate而是直接中断
break
logger.info(f"✅ 工作流 {self.workflow.title} 运行结束。")
logger.info(f"✅ 工作流 {self.workflow.title} 执行步骤结束。")
self.workflow.output = self.context_memory
await self._report_results()
async def _report_results(self):
    """
    Report the final results once the workflow run has ended.

    Generates a technical report through the consciousness node (when a
    handle was provided) and forwards a termination message through the
    supervisory node so the end user receives feedback.  Skipped entirely
    when the workflow failed.

    Returns:
        None — results are written into ``self.context_memory``.
    """
    if self.workflow.status.status == "failed":
        logger.warning("工作流执行失败,跳过正常汇报流程。")
        return
    try:
        logger.info("开始生成工作流结束技术报告...")
        report = ""
        if self.consciousness_node:
            supervisory_input = ForSupervisoryInput(
                workflow=self.workflow,
                original_command=self.workflow.command or "未知命令"
            )
            report_obj = await self.consciousness_node.working.remote(supervisory_input)
            # The node may answer with the structured type or a raw string.
            if isinstance(report_obj, ForSupervisoryNode):
                report = report_obj.output
            elif isinstance(report_obj, str):
                report = report_obj
            logger.debug(f"生成的报告摘要: {report[:100]}...")
        else:
            logger.warning("未提供 consciousness_node 句柄,跳过报告生成。")
        if self.supervisory_node:
            if self.workflow.event_info is None:
                # event_info is Optional on the workflow model; without it we
                # cannot address the user, so skip feedback instead of letting
                # the generic except below swallow an AttributeError.
                logger.warning("workflow.event_info 缺失,跳过用户反馈生成。")
                return
            term_msg = TerminationMessage(
                platform=self.workflow.event_info.platform,
                # BUGFIX: EventInfo declares the field `username`, not
                # `user_name`; the old attribute access raised AttributeError
                # at runtime.
                user_name=self.workflow.event_info.username,
                message=f"工作流执行完毕。系统报告:{report}"
            )
            user_response = await self.supervisory_node.working.remote(term_msg)
            self.context_memory["_final_user_response"] = user_response
            logger.info(f"Supervisory 最终回复:{user_response}")
        else:
            logger.warning("未提供 supervisory_node 句柄,跳过用户反馈生成。")
    except Exception as e:
        logger.exception("生成工作流执行汇报时发生错误")
async def _dispatch_to_node(self, step: WorkStep, input_data: Any) -> tuple[Any, bool]:
    """
    Dispatcher: route the current step to the actor that should execute it.

    Args:
        step: the WorkStep object currently being executed.
        input_data: data prepared from context memory for this step.

    Returns:
        A ``(result, success)`` tuple.  Any exception (including the
        WorkflowError raised below for missing handles / unknown node
        types) is logged and converted into ``(None, False)`` so the
        logic gate can decide what to do next.
    """
    logger.debug(f"正在向 {step.node} 节点发送动作 {step.action}...")
    try:
        if step.node == "control_node":
            if not self.control_node:
                raise WorkflowError("未提供 control_node 句柄!")
            payload = ControlForWorkflowInput(workflow_step=step)
            result_obj = await self.control_node.working.remote(payload)
            # Unwrap the structured response when the node returns one.
            if isinstance(result_obj, ControlForWorkflow):
                return result_obj.output, True
            return result_obj, True
        elif step.node == "consciousness_node":
            if not self.consciousness_node:
                raise WorkflowError("未提供 consciousness_node 句柄!")
            # The workflow's original command doubles as original_command;
            # adjust here if the business rules change.
            original_cmd = self.workflow.command or ""
            payload = ConsciousnessForWorkflowInput(
                workflow_step=step,
                original_command=original_cmd
            )
            result_obj = await self.consciousness_node.working.remote(payload)
            if isinstance(result_obj, ConsciousnessForWorkflow):
                return result_obj.output, True
            return result_obj, True
        elif step.node in ["primary_individual", "composite_individual"]:
            logger.warning(f"当前节点 {step.node} 暂未实现完整调度支持,这里将模拟执行。")
            # BUGFIX: the previous body returned a short simulated string
            # first, which left the richer message below it as unreachable
            # dead code; keep only the informative variant.
            simulated_result = f"这是 {step.node} 执行 {step.action} 产生的模拟结果 (输入: {input_data})"
            return simulated_result, True
        else:
            raise WorkflowError(f"未知的节点类型:{step.node}")
    except Exception as e:
        logger.exception(f"节点 {step.node} 执行动作 {step.action} 失败")
        return None, False
def _handle_logic_gate(self, step: WorkStep, is_success: bool):
"""处理逻辑门跳转,修改状态机指针"""
gate = step.logic_gate
"""
状态机检测任务执行情况
Args:
step: WorkStep对象当前执行的step
is_success: bool类型当前步骤是否成功
"""
gate = step.logic_gate
if is_success:
if gate and gate.if_pass == "exit":
raise WorkflowExit()
self.workflow.status.step += 1 # 默认成功则步数 +1继续下一步
self.workflow.status.step += 1
else:
if not gate or not gate.if_fail:
raise WorkflowError(f"步骤 {step.step} 失败且未配置 if_fail 兜底方案")
match gate.if_fail.split("_"):
case ["jump", "to", "step", target] if target.isdigit():
target_step = int(target)
@ -134,12 +224,14 @@ class WorkflowEngine:
raise WorkflowError(f"未知的 if_fail 格式: {gate.if_fail}")
@ray.remote
class WorkflowRunningEngine:
def __init__(self):
def __init__(self, consciousness_node=None, control_node=None, supervisory_node=None):
    """Store the actor handles; the queue and runner pool are built lazily
    in ``run()``.

    Args:
        consciousness_node: handle forwarded to every WorkflowEngine.
        control_node: handle forwarded to every WorkflowEngine.
        supervisory_node: handle forwarded to every WorkflowEngine.
    """
    # runner-id -> task mapping, populated by run().
    self.runner_engine = {}
    # FIX: the attribute holds None until run() creates the queue, so the
    # annotation must be Optional — the old non-Optional annotation lied.
    self.workflow_queue: Optional[asyncio.Queue[PretorWorkflow]] = None
    self.consciousness_node = consciousness_node
    self.control_node = control_node
    self.supervisory_node = supervisory_node
async def run(self):
self.runner_engine = {
@ -148,16 +240,25 @@ class WorkflowRunningEngine:
}
self.workflow_queue = asyncio.Queue()
async def runner(self, i: int):
async def runner(self, i: int) -> None:
    """
    Runner coroutine: keep pulling workflows off ``self.workflow_queue``
    and executing them until cancelled.

    Args:
        i: sequence number of this runner (used only for log messages).
    """
    while True:
        try:
            workflow = await self.workflow_queue.get()
            logger.info(f"WorkflowRunningEngine: runner_{i}接收工作流{workflow.trace_id}:{workflow.title}")
            workflow_engine = WorkflowEngine(workflow,
                                            self.consciousness_node,
                                            self.control_node,
                                            self.supervisory_node)
            await workflow_engine.run()
            # FIX: removed a leftover dead `pass` statement here.
            # NOTE(review): if anything ever awaits workflow_queue.join(),
            # a workflow_queue.task_done() call belongs here — confirm.
        except asyncio.CancelledError:
            # Re-raise so the pool's shutdown/cancellation propagates.
            logger.info(f"WorkflowRunningEngine: runner_{i} 被取消。")
            raise
        except Exception as e:
            # A single bad workflow must not kill the runner loop.
            logger.error(f"WorkflowRunningEngine: runner_{i} 遇到未捕获的异常: {e}", exc_info=True)

View File

@ -0,0 +1,37 @@
# Copyright 2026 zhaoxi826
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Dict, Any, Literal
from pydantic import BaseModel, Field
# --- 1. Concrete demand for an Individual (LLM/agent) ---
class IndividualDemand(BaseModel):
    """Concrete demand handed to an Individual (LLM/agent) executor."""
    role_prompt: str = Field(..., description="赋予该个体的角色定义")
    task_goal: str = Field(..., description="该个体的具体执行目标")
    expected_output: str = Field(..., description="期望产出的数据结构或格式描述")
# --- 2. Concrete demand for a Tool (plugin / function call) ---
class ToolDemand(BaseModel):
    """Concrete demand handed to a Tool (plugin / function-call) executor."""
    method: str = Field(..., description="插件调用的具体方法名")
    args: Dict[str, Any] = Field(default_factory=dict, description="传递给插件的参数")
# --- 3. Concrete demand for the System (system / physical resources) ---
class SystemDemand(BaseModel):
    """Concrete demand handed to the System (physical-resource) layer."""
    # Closed set of supported system operations.
    operation: Literal["allocate_resource", "docker_manage", "file_io", "network"]
    params: Dict[str, Any] = Field(..., description="操作所需的物理参数,如 GPU 核心数、路径等")
# --- 4. Unified demand entry point (the arbiter protocol body) ---
class DemandProtocol(BaseModel):
variety: Literal["individual", "tool", "system"]
name: str = Field(..., description="目标名称python_expert, pytest_tool, docker_engine")