diff --git a/pretor/core/workflow/workflow.py b/pretor/core/workflow/workflow.py index 1a0c550..80f68d7 100644 --- a/pretor/core/workflow/workflow.py +++ b/pretor/core/workflow/workflow.py @@ -14,13 +14,17 @@ from typing import List, Optional, Union, Literal, Dict, Any from pydantic import BaseModel, Field, model_validator -from pretor.core.protocol.demand_protocol import DemandProtocol +from pretor.utils.demand_protocol import DemandProtocol NodeType = Literal[ "consciousness_node", "control_node", "supervisory_node", "composite_individual", "primary_individual" ] +class EventInfo(BaseModel): + platform: str + username: str + class LogicGate(BaseModel): if_fail: str = Field(..., description="失败跳转目标,如 'jump_to_step_1'") if_pass: Literal["continue", "exit"] = Field(default="continue", description="成功后的动作") @@ -62,6 +66,7 @@ class PretorWorkflow(BaseModel): command: Optional[str] = Field(default=None, description="触发此工作流的原始命令") output: Dict[str, Any] = Field(default_factory=dict, description="工作流最终产出结果") status: WorkflowStatus = Field(default_factory=WorkflowStatus, description="运行时状态对象") + event_info: EventInfo | None = Field(default_factory=None) @model_validator(mode='after') def validate_workflow_integrity(self) -> 'PretorWorkflow': diff --git a/pretor/core/workflow/workflow_runner.py b/pretor/core/workflow/workflow_runner.py index 892d35c..8fe27a3 100644 --- a/pretor/core/workflow/workflow_runner.py +++ b/pretor/core/workflow/workflow_runner.py @@ -18,14 +18,42 @@ from pretor.core.workflow.workflow import PretorWorkflow, WorkStep from loguru import logger from typing import Optional, Dict, Union, Any, List from pretor.utils.error import WorkflowError, WorkflowExit +from pretor.core.individual.control_node.template import ForWorkflowInput as ControlForWorkflowInput, \ + ControlNodeResponse, ForWorkflow as ControlForWorkflow +from pretor.core.individual.consciousness_node.template import ForWorkflowInput as ConsciousnessForWorkflowInput, \ + ForSupervisoryInput, ForSupervisoryNode, ForWorkflow as ConsciousnessForWorkflow +from pretor.core.individual.supervisory_node.template import TerminationMessage, ForUser + class WorkflowEngine: - def __init__(self, workflow: PretorWorkflow): + def __init__(self, + workflow: PretorWorkflow, + consciousness_node=None, + control_node=None, + supervisory_node=None): self.workflow: PretorWorkflow = workflow + """工作流:当前WorkflowEngine待执行的workflow""" self.context_memory: Dict[str, Any] = {} + """上下文管理器:当前workflow执行过程中的缓存""" self._steps_by_id: Dict[int, WorkStep] = {step.step: step for step in self.workflow.work_link} + """步骤表:将当前workflow的步骤序号和步骤内容存放""" + + self.consciousness_node = consciousness_node + """意识节点""" + self.control_node = control_node + """控制节点""" + self.supervisory_node = supervisory_node + """监督节点""" def _prepare_inputs(self, inputs: Optional[Union[str, List[str]]]) -> Any: + """ + 准备输入的方法 + Args: + inputs: 待输入的名称 + + Returns: + + """ match inputs: case None: return None @@ -35,44 +63,34 @@ class WorkflowEngine: return {k: self.context_memory.get(k) for k in names} async def run(self): + """ + run方法 + 处理并执行workflow的方法 + + """ logger.info(f"🚀 工作流引擎启动: {self.workflow.title} [Trace ID: {self.workflow.trace_id}]") max_step = len(self.workflow.work_link) - - # 核心调度循环:只要 step 在合法范围内,就一直执行 while 1 <= self.workflow.status.step <= max_step: current_step_id = self.workflow.status.step current_step = self._steps_by_id.get(current_step_id) - if not current_step: logger.error(f"严重错误:找不到步骤 {current_step_id},工作流强制终止。") self.workflow.status.status = "failed" break - logger.info(f"▶️ 开始执行 Step {current_step_id}: [{current_step.node}] -> {current_step.action}") current_step.status = "running" - try: - # 1. 准备依赖数据 step_input_data = self._prepare_inputs(current_step.inputs) - - # 2. 派发给具体的 Ray 节点 (这也是整个架构的连接点) - # 这个方法会通过 Ray 去调用真正的模型或工具,并等待结果 step_result, is_success = await self._dispatch_to_node(current_step, step_input_data) - if is_success: - # 3. 记录产出物到全局黑板 if current_step.outputs: self.context_memory[current_step.outputs] = step_result logger.debug(f"Step {current_step_id} 产出已保存至变量: '{current_step.outputs}'") - current_step.status = "completed" else: logger.warning(f"Step {current_step_id} 执行遇到业务失败/驳回。") current_step.status = "failed" - - # 4. 根据执行成功与否处理逻辑门跳转 self._handle_logic_gate(current_step, is_success) - except WorkflowExit: logger.info("命中 if_pass='exit',工作流被主动要求结束。") break @@ -81,50 +99,122 @@ class WorkflowEngine: self.workflow.status.status = "failed" break except Exception as e: - # 捕获系统级崩溃 (例如 Ray Actor 断联、网络异常) logger.error(f"❌ Step {current_step_id} 发生系统级未捕获异常: {e}", exc_info=True) current_step.status = "failed" self.workflow.status.status = "failed" - # 发生未预期的崩溃,通常不再走业务 logic_gate,而是直接中断 break - - logger.info(f"✅ 工作流 {self.workflow.title} 运行结束。") + logger.info(f"✅ 工作流 {self.workflow.title} 执行步骤结束。") self.workflow.output = self.context_memory + await self._report_results() + + async def _report_results(self): + """ + 结果汇报函数 + 在工作流结束后执行 + Returns: + + """ + if self.workflow.status.status == "failed": + logger.warning("工作流执行失败,跳过正常汇报流程。") + return + try: + logger.info("开始生成工作流结束技术报告...") + report = "" + if self.consciousness_node: + supervisory_input = ForSupervisoryInput( + workflow=self.workflow, + original_command=self.workflow.command or "未知命令" + ) + report_obj = await self.consciousness_node.working.remote(supervisory_input) + if isinstance(report_obj, ForSupervisoryNode): + report = report_obj.output + elif isinstance(report_obj, str): + report = report_obj + logger.debug(f"生成的报告摘要: {report[:100]}...") + else: + logger.warning("未提供 consciousness_node 句柄,跳过报告生成。") + + if self.supervisory_node: + term_msg = TerminationMessage( + platform=self.workflow.event_info.platform, + user_name=self.workflow.event_info.user_name, + message=f"工作流执行完毕。系统报告:{report}" + ) + user_response = await self.supervisory_node.working.remote(term_msg) + self.context_memory["_final_user_response"] = user_response + logger.info(f"Supervisory 最终回复:{user_response}") + else: + logger.warning("未提供 supervisory_node 句柄,跳过用户反馈生成。") + except Exception as e: + logger.exception("生成工作流执行汇报时发生错误") async def _dispatch_to_node(self, step: WorkStep, input_data: Any) -> tuple[Any, bool]: """ - 【重要集成点】这里是你将引擎与具体的 Actor (Consciousness, Control, Worker) 桥接的地方。 - 你需要在这里利用 Ray 来调用其他 Actor。 + 分流器 + 调用当前step的执行对象 + Args: + step: WorkStep对象,当前需要执行的step + input_data: 输入数据 + + Returns: + 返回llm的输出和一个bool类型的判断 """ logger.debug(f"正在向 {step.node} 节点发送动作 {step.action}...") + try: + if step.node == "control_node": + if not self.control_node: + raise WorkflowError("未提供 control_node 句柄!") + payload = ControlForWorkflowInput(workflow_step=step) + # 可选:如果 input_data 需要合并,可以扩展 ControlForWorkflowInput 或将其放在 context_memory + result_obj = await self.control_node.working.remote(payload) + if isinstance(result_obj, ControlForWorkflow): + return result_obj.output, True + return result_obj, True - # 伪代码示例:你可以根据节点类型,获取对应的 Actor 句柄并调用 - # if step.node == "control_node": - # result = await global_control_actor.execute.remote(step.action, step.desc, input_data) - # return result, True - # elif step.node == "primary_individual": - # # 可能是个 1B 写代码模型 - # worker = get_worker_actor(step.action) - # result = await worker.run.remote(step.desc, input_data) - # return result, True + elif step.node == "consciousness_node": + if not self.consciousness_node: + raise WorkflowError("未提供 consciousness_node 句柄!") + # 这里将 command 作为 original_command,可根据业务调整 + original_cmd = self.workflow.command or "" + payload = ConsciousnessForWorkflowInput( + workflow_step=step, + original_command=original_cmd + ) + result_obj = await self.consciousness_node.working.remote(payload) + if isinstance(result_obj, ConsciousnessForWorkflow): + return result_obj.output, True + return result_obj, True - await asyncio.sleep(1) - simulated_result = f"这是 {step.action} 动作产生的模拟结果" - is_success = True - return simulated_result, is_success + elif step.node in ["primary_individual", "composite_individual"]: + logger.warning(f"当前节点 {step.node} 暂未实现完整调度支持,这里将模拟执行。") + await asyncio.sleep(1) + simulated_result = f"这是 {step.node} 执行 {step.action} 产生的模拟结果 (输入: {input_data})" + return simulated_result, True + + else: + raise WorkflowError(f"未知的节点类型:{step.node}") + + except Exception as e: + logger.exception(f"节点 {step.node} 执行动作 {step.action} 失败") + return None, False def _handle_logic_gate(self, step: WorkStep, is_success: bool): - """处理逻辑门跳转,修改状态机指针""" - gate = step.logic_gate + """ + 状态机,检测任务执行情况 + Args: + step: WorkStep对象,当前执行的step + is_success: bool类型,当前步骤是否成功 + + """ + gate = step.logic_gate if is_success: if gate and gate.if_pass == "exit": raise WorkflowExit() - self.workflow.status.step += 1 # 默认成功则步数 +1,继续下一步 + self.workflow.status.step += 1 else: if not gate or not gate.if_fail: raise WorkflowError(f"步骤 {step.step} 失败且未配置 if_fail 兜底方案") - match gate.if_fail.split("_"): case ["jump", "to", "step", target] if target.isdigit(): target_step = int(target) @@ -134,12 +224,14 @@ class WorkflowEngine: raise WorkflowError(f"未知的 if_fail 格式: {gate.if_fail}") - @ray.remote class WorkflowRunningEngine: - def __init__(self): + def __init__(self, consciousness_node=None, control_node=None, supervisory_node=None): self.runner_engine = {} self.workflow_queue: asyncio.Queue[PretorWorkflow] = None + self.consciousness_node = consciousness_node + self.control_node = control_node + self.supervisory_node = supervisory_node async def run(self): self.runner_engine = { @@ -148,16 +240,25 @@ class WorkflowRunningEngine: } self.workflow_queue = asyncio.Queue() - async def runner(self, i: int): + async def runner(self, i: int) -> None: + """ + runner方法,从self.workflow_queue中不断取出任务并执行 + Args: + i: runner序列号 + + """ while True: try: workflow = await self.workflow_queue.get() logger.info(f"WorkflowRunningEngine: runner_{i}接收工作流{workflow.trace_id}:{workflow.title}") - workflow_engine = WorkflowEngine(workflow) + workflow_engine = WorkflowEngine(workflow, + self.consciousness_node, + self.control_node, + self.supervisory_node) await workflow_engine.run() + pass except asyncio.CancelledError: logger.info(f"WorkflowRunningEngine: runner_{i} 被取消。") raise except Exception as e: - logger.error(f"WorkflowRunningEngine: runner_{i} 遇到未捕获的异常: {e}", exc_info=True) - + logger.error(f"WorkflowRunningEngine: runner_{i} 遇到未捕获的异常: {e}", exc_info=True) \ No newline at end of file diff --git a/pretor/utils/demand_protocol.py b/pretor/utils/demand_protocol.py new file mode 100644 index 0000000..92689b6 --- /dev/null +++ b/pretor/utils/demand_protocol.py @@ -0,0 +1,37 @@ +# Copyright 2026 zhaoxi826 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Dict, Any, Literal +from pydantic import BaseModel, Field + +# --- 1. 给 Individual (LLM/Agent) 的具体需求 --- +class IndividualDemand(BaseModel): + role_prompt: str = Field(..., description="赋予该个体的角色定义") + task_goal: str = Field(..., description="该个体的具体执行目标") + expected_output: str = Field(..., description="期望产出的数据结构或格式描述") + +# --- 2. 给 Tool (插件/函数调用) 的具体需求 --- +class ToolDemand(BaseModel): + method: str = Field(..., description="插件调用的具体方法名") + args: Dict[str, Any] = Field(default_factory=dict, description="传递给插件的参数") + +# --- 3. 给 System (系统/物理资源) 的具体需求 --- +class SystemDemand(BaseModel): + operation: Literal["allocate_resource", "docker_manage", "file_io", "network"] + params: Dict[str, Any] = Field(..., description="操作所需的物理参数,如 GPU 核心数、路径等") + +# --- 4. 统一需求入口 (裁判官协议体) --- +class DemandProtocol(BaseModel): + variety: Literal["individual", "tool", "system"] + name: str = Field(..., description="目标名称(如:python_expert, pytest_tool, docker_engine)") \ No newline at end of file diff --git a/pretor/worker/__init__.py b/pretor/worker_individual/__init__.py similarity index 100% rename from pretor/worker/__init__.py rename to pretor/worker_individual/__init__.py