wip:增加control_node

This commit is contained in:
朝夕 2026-04-11 14:00:37 +08:00
parent dc857cbff7
commit 6c5849f3d0
6 changed files with 85 additions and 32 deletions

View File

@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import ray import ray
from typing import Union, overload from typing import Union, overload
from pretor.core.individual.consciousness_node.template import (ConsciousnessNodeDeps, ForSupervisoryNode, ForWorkflow,\ from pretor.core.individual.consciousness_node.template import (ConsciousnessNodeDeps, ForSupervisoryNode, ForWorkflow,\
@ -26,10 +27,6 @@ class ConsciousnessNode:
def __init__(self) -> None: def __init__(self) -> None:
self.agent: None | Agent = None self.agent: None | Agent = None
@self.agent.system_prompt
async def dynamic_prompt(ctx: RunContext[ConsciousnessNodeDeps]):
return f"Context: original_command: {ctx.deps.original_command}, workflow_template: {ctx.deps.workflow_template}"
def create_agent(self, global_state_machine: GlobalStateMachine, provider_title: str, model_id: str) -> None: def create_agent(self, global_state_machine: GlobalStateMachine, provider_title: str, model_id: str) -> None:
""" """
create_agent方法将agent对象装配到ConsciousnessNode的属性内 create_agent方法将agent对象装配到ConsciousnessNode的属性内
@ -55,7 +52,11 @@ class ConsciousnessNode:
deps_type=ConsciousnessNodeDeps, deps_type=ConsciousnessNodeDeps,
agent_name="consciousness_node") agent_name="consciousness_node")
async def running(self, payload: Union[ForWorkflowEngineInput, ForWorkflowInput, ForSupervisoryInput]) -> str: @self.agent.system_prompt
async def dynamic_prompt(ctx: RunContext[ConsciousnessNodeDeps]):
return f"Context: original_command: {ctx.deps.original_command}, workflow_template: {ctx.deps.workflow_template}"
async def working(self, payload: Union[ForWorkflowEngineInput, ForWorkflowInput, ForSupervisoryInput]) -> str:
result: Union[ForWorkflowEngine, ForWorkflow, ForSupervisoryNode] = await self._run(payload) result: Union[ForWorkflowEngine, ForWorkflow, ForSupervisoryNode] = await self._run(payload)
if isinstance(result, ForWorkflowEngine): if isinstance(result, ForWorkflowEngine):
return result return result

View File

@ -12,10 +12,9 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from pydantic import BaseModel
from pretor.core.workflow.workflow import PretorWorkflow, WorkStep from pretor.core.workflow.workflow import PretorWorkflow, WorkStep
from pretor.utils.agent_model import ResponseModel, DepsModel from pretor.utils.agent_model import ResponseModel, DepsModel, InputModel
#意识节点回复类 #意识节点回复类
@ -45,7 +44,7 @@ class ConsciousnessNodeDeps(DepsModel):
command: str command: str
class ConsciousnessNodeInput(BaseModel): class ConsciousnessNodeInput(InputModel):
pass pass

View File

@ -14,34 +14,47 @@
import ray import ray
from pydantic_ai import Agent from pydantic_ai import Agent
from pretor.core.workflow.workflow import WorkStep from pretor.core.global_state_machine.global_state_machine import GlobalStateMachine
from pretor.core.global_state_machine.model_provider.base_provider import Provider
from pretor.adapter.model_adapter.agent_factory import AgentFactory
from pretor.core.individual.control_node.template import ForWorkflow, ForWorkflowInput, ControlNodeDeps
@ray.remote @ray.remote
class ControlNode: class ControlNode:
def __init__(self, agent: Agent): def __init__(self):
self.agent = agent self.agent: Agent | None = None
async def execute_step(self, step: WorkStep) -> WorkStep: def create_agent(self, global_state_machine: GlobalStateMachine, provider_title: str, model_id: str) -> None:
if step.action == "dispatch_model": """
# The WorkStep schema from workflow manager may pass target info differently, assuming `input` here or simple `desc` create_agent方法将agent对象装配到Control的属性内
result = await self.dispatch_model({}, f"Execute task: {step.desc}") 该方法通过provider_title从global_state_machine中获取provider对象然后从provider对象中取出供应商形象装配为pydantic_ai的
step.output = result Agent实例
elif step.action == "dispatch_tool": 并挂载到self.agent属性
# Simulating parsing of tool and args from `desc` or `input` Args:
result = await self.dispatch_tool("simulated_tool", {"desc": step.desc}) global_state_machine: 全局状态机
step.output = str(result) provider_title: 供应商名
else: model_id: 模型id
result = await self.agent.run(f"Execute generic step: {step.model_dump()}")
step.output = result.data
step.status = "completed" Returns:
return step 无返回
"""
system_prompt: str = ""
output_type = ForWorkflow
provider: Provider = global_state_machine.get_provider.remote(provider_title)
agent_factory = AgentFactory()
self.agent = agent_factory.create_agent(provider=provider,
model_id=model_id,
output_type=output_type,
system_prompt=system_prompt,
deps_type=ControlNodeDeps,
agent_name="control_node")
async def dispatch_model(self, model_info: dict, prompt: str) -> str: async def working(self, payload: ForWorkflowInput) -> str:
# In a real system, we'd select a smaller/specific model based on model_info result: ForWorkflow = await self._run(payload)
result = await self.agent.run(prompt) return result
return result.data
async def dispatch_tool(self, tool_name: str, tool_args: dict) -> dict: async def _run(self, payload: ForWorkflowInput) -> ForWorkflow:
# Simulated tool dispatch deps = ControlNodeDeps(workflow_step=payload.workflow_step)
return {"tool": tool_name, "status": "executed", "args": tool_args} result = await self.agent.run(f"根据workflow_step分配任务",
deps=deps)
return result.output

View File

@ -0,0 +1,36 @@
# Copyright 2026 zhaoxi826
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pretor.core.workflow.workflow import WorkStep
from pretor.utils.agent_model import ResponseModel, InputModel, DepsModel
class ControlNodeResponse(ResponseModel):
"""控制节点回复的基类"""
pass
class ControlNodeInput(InputModel):
pass
class ControlNodeDeps(DepsModel):
workflow_step: WorkStep
class ForWorkflow(ControlNodeResponse):
output: str
class ForWorkflowInput(ControlNodeInput):
workflow_step: WorkStep

View File

@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from pydantic import BaseModel from pydantic import BaseModel
class ResponseModel(BaseModel): class ResponseModel(BaseModel):
@ -19,3 +20,6 @@ class ResponseModel(BaseModel):
class DepsModel(BaseModel): class DepsModel(BaseModel):
pass pass
class InputModel(BaseModel):
pass