diff --git a/pretor/core/individual/consciousness_node/consciousness_node.py b/pretor/core/individual/consciousness_node/consciousness_node.py index bd25673..f65521a 100644 --- a/pretor/core/individual/consciousness_node/consciousness_node.py +++ b/pretor/core/individual/consciousness_node/consciousness_node.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. + import ray from typing import Union, overload from pretor.core.individual.consciousness_node.template import (ConsciousnessNodeDeps, ForSupervisoryNode, ForWorkflow,\ @@ -26,10 +27,6 @@ class ConsciousnessNode: def __init__(self) -> None: self.agent: None | Agent = None - @self.agent.system_prompt - async def dynamic_prompt(ctx: RunContext[ConsciousnessNodeDeps]): - return f"Context: original_command: {ctx.deps.original_command}, workflow_template: {ctx.deps.workflow_template}" - def create_agent(self, global_state_machine: GlobalStateMachine, provider_title: str, model_id: str) -> None: """ create_agent方法,将agent对象装配到ConsciousnessNode的属性内 @@ -55,7 +52,11 @@ class ConsciousnessNode: deps_type=ConsciousnessNodeDeps, agent_name="consciousness_node") - async def running(self, payload: Union[ForWorkflowEngineInput, ForWorkflowInput, ForSupervisoryInput]) -> str: + @self.agent.system_prompt + async def dynamic_prompt(ctx: RunContext[ConsciousnessNodeDeps]): + return f"Context: original_command: {ctx.deps.original_command}, workflow_template: {ctx.deps.workflow_template}" + + async def working(self, payload: Union[ForWorkflowEngineInput, ForWorkflowInput, ForSupervisoryInput]) -> str: result: Union[ForWorkflowEngine, ForWorkflow, ForSupervisoryNode] = await self._run(payload) if isinstance(result, ForWorkflowEngine): return result diff --git a/pretor/core/individual/consciousness_node/template.py b/pretor/core/individual/consciousness_node/template.py index 1cdec15..06e8c3b 100644 --- a/pretor/core/individual/consciousness_node/template.py +++ b/pretor/core/individual/consciousness_node/template.py @@ -12,10 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -from pydantic import BaseModel from pretor.core.workflow.workflow import PretorWorkflow, WorkStep -from pretor.utils.agent_model import ResponseModel, DepsModel +from pretor.utils.agent_model import ResponseModel, DepsModel, InputModel #意识节点回复类 @@ -45,7 +44,7 @@ class ConsciousnessNodeDeps(DepsModel): command: str -class ConsciousnessNodeInput(BaseModel): +class ConsciousnessNodeInput(InputModel): pass diff --git a/pretor/core/individual/control_node/control_node.py b/pretor/core/individual/control_node/control_node.py index caba843..d36969f 100644 --- a/pretor/core/individual/control_node/control_node.py +++ b/pretor/core/individual/control_node/control_node.py @@ -14,34 +14,47 @@ import ray from pydantic_ai import Agent -from pretor.core.workflow.workflow import WorkStep +from pretor.core.global_state_machine.global_state_machine import GlobalStateMachine +from pretor.core.global_state_machine.model_provider.base_provider import Provider +from pretor.adapter.model_adapter.agent_factory import AgentFactory +from pretor.core.individual.control_node.template import ForWorkflow, ForWorkflowInput, ControlNodeDeps @ray.remote class ControlNode: - def __init__(self, agent: Agent): - self.agent = agent + def __init__(self): + self.agent: Agent | None = None - async def execute_step(self, step: WorkStep) -> WorkStep: - if step.action == "dispatch_model": - # The WorkStep schema from workflow manager may pass target info differently, assuming `input` here or simple `desc` - result = await self.dispatch_model({}, f"Execute task: {step.desc}") - step.output = result - elif step.action == "dispatch_tool": - # Simulating parsing of tool and args from `desc` or `input` - result = await self.dispatch_tool("simulated_tool", {"desc": step.desc}) - step.output = str(result) - else: - result = await self.agent.run(f"Execute generic step: {step.model_dump()}") - step.output = result.data + def create_agent(self, global_state_machine: GlobalStateMachine, provider_title: str, model_id: str) -> None: + """ + create_agent方法,将agent对象装配到Control的属性内 + 该方法通过provider_title从global_state_machine中获取provider对象,然后从provider对象中取出供应商形象,装配为pydantic_ai的 + Agent实例, + 并挂载到self.agent属性 + Args: + global_state_machine: 全局状态机 + provider_title: 供应商名 + model_id: 模型id - step.status = "completed" - return step + Returns: + 无返回 + """ + system_prompt: str = "" + output_type = ForWorkflow + provider: Provider = global_state_machine.get_provider.remote(provider_title) + agent_factory = AgentFactory() + self.agent = agent_factory.create_agent(provider=provider, + model_id=model_id, + output_type=output_type, + system_prompt=system_prompt, + deps_type=ControlNodeDeps, + agent_name="control_node") - async def dispatch_model(self, model_info: dict, prompt: str) -> str: - # In a real system, we'd select a smaller/specific model based on model_info - result = await self.agent.run(prompt) - return result.data + async def working(self, payload: ForWorkflowInput) -> str: + result: ForWorkflow = await self._run(payload) + return result - async def dispatch_tool(self, tool_name: str, tool_args: dict) -> dict: - # Simulated tool dispatch - return {"tool": tool_name, "status": "executed", "args": tool_args} + async def _run(self, payload: ForWorkflowInput) -> ForWorkflow: + deps = ControlNodeDeps(workflow_step=payload.workflow_step) + result = await self.agent.run(f"根据workflow_step分配任务", + deps=deps) + return result.output \ No newline at end of file diff --git a/pretor/core/individual/control_node/template.py b/pretor/core/individual/control_node/template.py new file mode 100644 index 0000000..d36c7a9 --- /dev/null +++ b/pretor/core/individual/control_node/template.py @@ -0,0 +1,36 @@ +# Copyright 2026 zhaoxi826 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from pretor.core.workflow.workflow import WorkStep +from pretor.utils.agent_model import ResponseModel, InputModel, DepsModel + +class ControlNodeResponse(ResponseModel): + """控制节点回复的基类""" + pass + + +class ControlNodeInput(InputModel): + pass + +class ControlNodeDeps(DepsModel): + workflow_step: WorkStep + + +class ForWorkflow(ControlNodeResponse): + output: str + + +class ForWorkflowInput(ControlNodeInput): + workflow_step: WorkStep \ No newline at end of file diff --git a/pretor/utils/agent_model.py b/pretor/utils/agent_model.py index e152792..ef9e0e3 100644 --- a/pretor/utils/agent_model.py +++ b/pretor/utils/agent_model.py @@ -12,10 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. + from pydantic import BaseModel class ResponseModel(BaseModel): pass class DepsModel(BaseModel): + pass + +class InputModel(BaseModel): pass \ No newline at end of file diff --git a/pretor/core/individual/control_node/response.py b/pretor/worker/__init__.py similarity index 100% rename from pretor/core/individual/control_node/response.py rename to pretor/worker/__init__.py