wip: 优化了control_node consciousness_node和supervisory_node

This commit is contained in:
朝夕 2026-04-20 16:20:38 +08:00
parent 43135e47c8
commit fe49340106
15 changed files with 302 additions and 106 deletions

View File

@ -1 +1,2 @@
version: v0.1 version: v0.1
name:

View File

@ -2,7 +2,7 @@
--- ---
## 问题栏 ## 问题栏
#### 🔴 核心缺陷与修复 (Bug Fixes & Stability) #### 🔴 核心缺陷与修复 (Bug Fixes & Stability)
- [ ] /pretor/core/individual每个template进行优化 - [x] /pretor/core/individual每个template进行优化
- [ ] /pretor/worker_individual待完善复合子个体和基础子个体 - [ ] /pretor/worker_individual待完善复合子个体和基础子个体
#### 🛡️ 安全与合规 (Security & Auth) #### 🛡️ 安全与合规 (Security & Auth)
@ -13,10 +13,11 @@
- [ ] 优化import - [ ] 优化import
#### 🏗️ 架构演进 (Architecture & Refactoring) #### 🏗️ 架构演进 (Architecture & Refactoring)
- [ ] 使用fastapi-users完善用户系统 - ~~[ ] 使用fastapi-users完善用户系统~~(2026/4/19 fastapi-users会严重摧毁代码的优雅性)
- [ ] 升级auth功能
- [x] /pretor/api的接口函数进行重构 - [x] /pretor/api的接口函数进行重构
- [ ] /dockerfile待完善 - [ ] /dockerfile待完善
- - [ ] 完善沙箱功能 - [ ] 完善沙箱功能
- [ ] 完善爬虫功能 - [ ] 完善爬虫功能
- [ ] 对接更多的provider - [ ] 对接更多的provider
@ -24,7 +25,7 @@
## 日志 ## 日志
#### 2026/4/12 #### 2026/4/12
- [x] /pretor/api的接口函数进行重构 - [x] /pretor/api的接口函数进行重构
- [ ] /pretor/core/individual每个template进行优化 - [x] /pretor/core/individual每个template进行优化
- [ ] /pretor/worker_individual待完善复合子个体和基础子个体 - [ ] /pretor/worker_individual待完善复合子个体和基础子个体
- [ ] /pretor/api待完善 - [ ] /pretor/api待完善
- [ ] /dockerfile待完善 - [ ] /dockerfile待完善
@ -32,10 +33,11 @@
#### 2026/4/16 #### 2026/4/16
- [ ] 发布v0.1.0正式版 - [ ] 发布v0.1.0正式版
- [ ] 增加对应全workflow的情况追踪使得在任务运行中人机交互更加自然方便 - [ ] 增加对应全workflow的情况追踪使得在任务运行中人机交互更加自然方便
- [ ] 使用fastapi-users完善用户系统 - ~~[ ] 使用fastapi-users完善用户系统~~
#### 2026/4/19 #### 2026/4/19
- [ ] 完善沙箱功能 - [ ] 完善沙箱功能
- [ ] 完善爬虫功能 - [ ] 完善爬虫功能
- [ ] 对接更多的provider - [ ] 对接更多的provider
- [ ] 优化import - [ ] 优化import
- [ ] 升级auth功能

View File

@ -11,6 +11,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from typing import Union from typing import Union
from pretor.utils.ray_hook import ray_actor_hook from pretor.utils.ray_hook import ray_actor_hook
from fastapi import APIRouter, Request, Depends from fastapi import APIRouter, Request, Depends

View File

@ -42,5 +42,4 @@ async def login_user(user_login: UserLogin):
if user.user_name != user_login.user_name: if user.user_name != user_login.user_name:
pass pass
token = await run_in_threadpool(Accessor.login_hashed_password, user, user_login.password) token = await run_in_threadpool(Accessor.login_hashed_password, user, user_login.password)
return {"message":"success", "token":token} return {"message":"success", "token":token}

40
pretor/api/cluster.py Normal file
View File

@ -0,0 +1,40 @@
# Copyright 2026 zhaoxi826
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
import ray
import asyncio
cluster_router = APIRouter(prefix="/api/v1/cluster", tags=["cluster"])
@cluster_router.websocket("/ws/state")
async def update_cluster_state(websocket: WebSocket):
await websocket.accept()
try:
while True:
nodes = ray.nodes()
payload = [
{
"node_id": node["NodeID"],
"node_name": node["NodeName"],
"alive": node["Alive"],
"resources": node["Resources"],
"remaining": node["RemainingResources"]
}
for node in nodes
]
await websocket.send_json(payload)
await asyncio.sleep(10)
except WebSocketDisconnect:
pass

View File

@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from fastapi import APIRouter, Request, Depends, HTTPException, status, WebSocket, WebSocketDisconnect from fastapi import APIRouter, Depends, HTTPException, status, WebSocket, WebSocketDisconnect
from pydantic import BaseModel from pydantic import BaseModel
from pretor.utils.access import Accessor, TokenData from pretor.utils.access import Accessor, TokenData
from pretor.api.platform.event import PretorEvent from pretor.api.platform.event import PretorEvent
@ -46,14 +46,3 @@ async def create_message(message: Message,
else: else:
return {"message": message} return {"message": message}
@client_router.websocket("/ws/{event_id}")
async def websocket_endpoint(websocket: WebSocket, event_id: str):
await websocket.accept()
global_state_machine = websocket.app.state.global_state_machine
try:
while True:
await websocket.send_text(await global_state_machine.get_pending(event_id))
response = await websocket.receive_text()
await global_state_machine.put_received(event_id, response)
except WebSocketDisconnect:
pass

34
pretor/api/workflow.py Normal file
View File

@ -0,0 +1,34 @@
# Copyright 2026 zhaoxi826
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pretor.utils.ray_hook import ray_actor_hook
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
workflow_router = APIRouter(prefix="/api/v1/workflow", tags=["workflow"])
@workflow_router.websocket("/ws/{event_id}")
async def get_workflow(websocket: WebSocket, event_id: str):
await websocket.accept()
global_state_machine = ray_actor_hook("global_state_machine")
try:
while True:
await websocket.send(await global_state_machine.get_workflow.remote(event_id))
await websocket.send_text(await global_state_machine.get_pending.remote(event_id))
response = await websocket.receive_text()
await global_state_machine.put_received(event_id, response)
except WebSocketDisconnect:
pass

View File

@ -28,6 +28,8 @@ from pretor.api.platform.frontend import client_router
from pretor.api.auth import auth_router from pretor.api.auth import auth_router
from pretor.api.provider import provider_router from pretor.api.provider import provider_router
from pretor.api.resource import resource_router from pretor.api.resource import resource_router
from pretor.api.cluster import cluster_router
from pretor.api.agent import agent_router
@ray.remote @ray.remote
class PretorGateway: class PretorGateway:
@ -41,11 +43,15 @@ class PretorGateway:
self.app = FastAPI() self.app = FastAPI()
self.gateway = {} self.gateway = {}
self.app.include_router(client_router) self.app.include_router(client_router)#客户端路径
self.app.include_router(auth_router)
self.app.include_router(provider_router)
self.app.include_router(resource_router)
self.app.include_router(auth_router)#用户路径
self.app.include_router(provider_router)#供应商路径
self.app.include_router(resource_router)#资源路径
self.app.include_router(cluster_router)#集群信息路径
self.app.include_router(agent_router)#agent路径
frontend_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))), "frontend", "dist") frontend_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))), "frontend", "dist")
if os.path.exists(frontend_dir): if os.path.exists(frontend_dir):

View File

@ -20,7 +20,7 @@ from pretor.core.global_state_machine.model_provider import Provider, ProviderAr
import httpx import httpx
import json import json
from loguru import logger from loguru import logger
from typing import Dict, Literal from typing import Dict, Literal, List
from pretor.core.database.postgres import PostgresDatabase from pretor.core.database.postgres import PostgresDatabase
from pretor.api.platform.event import PretorEvent from pretor.api.platform.event import PretorEvent
import asyncio import asyncio
@ -64,6 +64,9 @@ class GlobalStateMachine:
def update_workflow(self, event_id: str, workflow: PretorWorkflow) -> None: def update_workflow(self, event_id: str, workflow: PretorWorkflow) -> None:
self.event_dict[event_id].workflow = workflow self.event_dict[event_id].workflow = workflow
def get_workflow(self, event_id: str) -> PretorWorkflow:
return self.event_dict[event_id].workflow
async def put_pending(self, event_id, item) -> None: async def put_pending(self, event_id, item) -> None:
await self.event_dict[event_id].pending_queue.put(item) await self.event_dict[event_id].pending_queue.put(item)
@ -161,6 +164,9 @@ class GlobalStateMachine:
def workflow_template_generate(self, workflow_template: WorkflowTemplate) -> None: def workflow_template_generate(self, workflow_template: WorkflowTemplate) -> None:
self.global_workflow_template_manager.generate_workflow_template(workflow_template) self.global_workflow_template_manager.generate_workflow_template(workflow_template)
def get_workflow_template_list(self) -> List[Dict[str, str]]:
return self.global_workflow_template_manager.workflow_templates_registry
###以下为skill_manager方法 ###以下为skill_manager方法
def add_skill(self, skill_name: str): def add_skill(self, skill_name: str):
skill_plugin_dir = pathlib.Path(__file__).parent.parent.parent / "plugin" / "skill_plugin" / skill_name skill_plugin_dir = pathlib.Path(__file__).parent.parent.parent / "plugin" / "skill_plugin" / skill_name

View File

@ -21,6 +21,8 @@ from pydantic_ai import Agent, RunContext
from pretor.core.global_state_machine.global_state_machine import GlobalStateMachine from pretor.core.global_state_machine.global_state_machine import GlobalStateMachine
from pretor.core.global_state_machine.model_provider.base_provider import Provider from pretor.core.global_state_machine.model_provider.base_provider import Provider
from pretor.adapter.model_adapter.agent_factory import AgentFactory from pretor.adapter.model_adapter.agent_factory import AgentFactory
from loguru import logger
@ray.remote @ray.remote
class ConsciousnessNode: class ConsciousnessNode:
@ -41,7 +43,15 @@ class ConsciousnessNode:
Returns: Returns:
无返回 无返回
""" """
system_prompt: str = "" system_prompt: str = (
"你叫Pretor是一个多智能体AI助手系统中的【意识节点 (Consciousness Node)】。\n"
"你是系统的'高级规划师''架构师',负责处理监控节点分配过来的复杂任务。\n"
"你的主要工作场景包括:\n"
"1. 拆解任务 (Workflow Generation):结合用户的原始命令和提供的模板,生成严谨、可执行的工作流 (PretorWorkflow),并将其输出为 ForWorkflowEngine 格式。拆解时步骤应清晰连贯。\n"
"2. 中途指导 (Workflow Execution):在工作流执行中,如果某一步骤指派给你,你需要对控制节点的结果进行分析或提供下一步的指导,输出 ForWorkflow 格式。\n"
"3. 总结报告 (Supervisory Report):在整个工作流执行完毕后,你需要对整体流程、各个控制节点的执行情况进行审查,并生成一份技术性的总结报告,输出 ForSupervisoryNode 格式。\n"
"请确保所有的思考和生成过程符合逻辑,严密且高质量。"
)
output_type = Union[ForSupervisoryNode, ForWorkflow, ForWorkflowEngine] output_type = Union[ForSupervisoryNode, ForWorkflow, ForWorkflowEngine]
provider: Provider = global_state_machine.get_provider.remote(provider_title) provider: Provider = global_state_machine.get_provider.remote(provider_title)
agent_factory = AgentFactory() agent_factory = AgentFactory()
@ -54,22 +64,30 @@ class ConsciousnessNode:
@self.agent.system_prompt @self.agent.system_prompt
async def dynamic_prompt(ctx: RunContext[ConsciousnessNodeDeps]): async def dynamic_prompt(ctx: RunContext[ConsciousnessNodeDeps]):
return f"Context: original_command: {ctx.deps.original_command}, workflow_template: {ctx.deps.workflow_template}" prompt = system_prompt + "\n\n"
prompt += (
f"=== 当前任务上下文 ===\n"
f"- 当前指令 (Command): {ctx.deps.command}\n"
f"- 原始用户命令 (Original Command): {ctx.deps.original_command}\n"
)
if ctx.deps.workflow_template:
prompt += f"- 选定工作流模板 (Workflow Template): {ctx.deps.workflow_template}\n"
async def working(self, payload: Union[ForWorkflowEngineInput, ForWorkflowInput, ForSupervisoryInput]) -> str: return prompt
result: Union[ForWorkflowEngine, ForWorkflow, ForSupervisoryNode] = await self._run(payload)
if isinstance(result, ForWorkflowEngine):
return result
elif isinstance(result, ForWorkflow): async def working(self, payload: Union[ForWorkflowEngineInput, ForWorkflowInput, ForSupervisoryInput]) -> Union[ForWorkflowEngine, ForWorkflow, ForSupervisoryNode, None]:
return result try:
result = await self._run(payload)
elif isinstance(result, ForSupervisoryInput): if isinstance(result, (ForWorkflowEngine, ForWorkflow, ForSupervisoryNode)):
return result return result
else:
else: logger.error(f"ConsciousnessNode: 未知或不匹配的返回类型: {type(result)}")
return None
except Exception as e:
logger.exception("ConsciousnessNode在执行working时发生严重错误")
return None return None
@overload @overload
async def _run(self, payload: ForWorkflowEngineInput) -> ForWorkflowEngine: async def _run(self, payload: ForWorkflowEngineInput) -> ForWorkflowEngine:
""" """
@ -109,29 +127,41 @@ class ConsciousnessNode:
""" """
pass pass
async def _run(self, payload: Union[ForSupervisoryInput, ForWorkflowInput, ForWorkflowEngineInput])\ async def _run(self, payload: Union[ForSupervisoryInput, ForWorkflowInput, ForWorkflowEngineInput]) -> Union[ForSupervisoryNode, ForWorkflow, ForWorkflowEngine]:
-> Union[ForSupervisoryNode, ForWorkflow, ForWorkflowEngine]: try:
self.agent.retries = 3
if isinstance(payload, ForWorkflowEngineInput):
deps = ConsciousnessNodeDeps(
original_command=payload.original_command,
workflow_template=payload.workflow_template,
command="拆解原始命令变成一个工作流"
)
logger.debug("ConsciousnessNode: 开始生成工作流 (原生重试开启)")
result = await self.agent.run(
"根据original_command制定严密的可执行workflow可以学习并参考workflow_template的设计理念",
deps=deps)
return result.output
if isinstance(payload, ForWorkflowEngineInput): elif isinstance(payload, ForWorkflowInput):
deps = ConsciousnessNodeDeps(original_command=payload.original_command, deps = ConsciousnessNodeDeps(
workflow_template=payload.workflow_template, original_command=payload.original_command,
command="拆解原始命令变成一个工作流") command="完成workflow step中分配给意识节点的特定任务或指导"
result = await self.agent.run(f"根据original_command制定workflow可以学习workflow_template", )
deps=deps, logger.debug("ConsciousnessNode: 开始处理工作流节点任务 (原生重试开启)")
output_type=ForWorkflowEngine,) result = await self.agent.run(f"处理此工作流步骤信息:\n{payload.workflow_step.model_dump_json()}",
return result.output deps=deps)
elif isinstance(payload, ForWorkflowInput): return result.output
deps = ConsciousnessNodeDeps(original_command=payload.original_command,
command="完成workflowstep的任务") elif isinstance(payload, ForSupervisoryInput):
result = await self.agent.run(payload.workflow_step.model_dump_json(), deps = ConsciousnessNodeDeps(
deps=deps, original_command=payload.original_command,
output_type=ForWorkflow) command="对于工作流整体执行结果进行检查,并且生成一份专业的技术性总结报告"
return result.output )
elif isinstance(payload, ForSupervisoryInput): logger.debug("ConsciousnessNode: 开始生成技术总结报告 (原生重试开启)")
deps = ConsciousnessNodeDeps(original_command=payload.original_command, result = await self.agent.run(
command="对于结果进行检查,并且生成一份技术性的总结报告") f"基于以下工作流的执行记录,生成技术报告:\n{payload.workflow.model_dump_json()}",
result = await self.agent.run(payload.workflow.model_dump_json(), deps=deps)
deps=deps, return result.output
output_type=ForSupervisoryNode) except Exception as e:
return result.output logger.exception(f"ConsciousnessNode 模型生成最终失败: {str(e)}")
return None raise RuntimeError(f"ConsciousnessNode 无法完成任务: {str(e)}") from e

View File

@ -15,6 +15,7 @@
from pretor.core.workflow.workflow import PretorWorkflow, WorkStep from pretor.core.workflow.workflow import PretorWorkflow, WorkStep
from pretor.utils.agent_model import ResponseModel, DepsModel, InputModel from pretor.utils.agent_model import ResponseModel, DepsModel, InputModel
from pydantic import Field
#意识节点回复类 #意识节点回复类
@ -25,17 +26,18 @@ class ConsciousnessNodeResponse(ResponseModel):
class ForWorkflowEngine(ConsciousnessNodeResponse): class ForWorkflowEngine(ConsciousnessNodeResponse):
"""生成workflow并放入WorkflowEngine""" """生成workflow并放入WorkflowEngine"""
workflow: PretorWorkflow workflow: PretorWorkflow = Field(..., description="生成好的符合规范的完整工作流对象。")
reasoning: str = Field(..., description="生成此工作流的原因和思路简述。")
class ForWorkflow(ConsciousnessNodeResponse): class ForWorkflow(ConsciousnessNodeResponse):
"""处理workflow中需要ConsciousnessNode的工作""" """处理workflow中需要ConsciousnessNode的工作"""
output: str output: str = Field(..., description="对当前工作流步骤的具体处理结果或指导意见。")
class ForSupervisoryNode(ConsciousnessNodeResponse): class ForSupervisoryNode(ConsciousnessNodeResponse):
"""工作流完成后进行校验并返回给SupervisoryNode""" """工作流完成后进行校验并返回给SupervisoryNode"""
output: str output: str = Field(..., description="为监控节点提供的全工作流执行情况的技术性总结报告。")
class ConsciousnessNodeDeps(DepsModel): class ConsciousnessNodeDeps(DepsModel):
@ -60,4 +62,4 @@ class ForWorkflowInput(ConsciousnessNodeInput):
class ForSupervisoryInput(ConsciousnessNodeInput): class ForSupervisoryInput(ConsciousnessNodeInput):
workflow: PretorWorkflow workflow: PretorWorkflow
original_command: str original_command: str

View File

@ -13,7 +13,8 @@
# limitations under the License. # limitations under the License.
import ray import ray
from pydantic_ai import Agent from loguru import logger
from pydantic_ai import Agent, RunContext
from pretor.core.global_state_machine.global_state_machine import GlobalStateMachine from pretor.core.global_state_machine.global_state_machine import GlobalStateMachine
from pretor.core.global_state_machine.model_provider.base_provider import Provider from pretor.core.global_state_machine.model_provider.base_provider import Provider
from pretor.adapter.model_adapter.agent_factory import AgentFactory from pretor.adapter.model_adapter.agent_factory import AgentFactory
@ -38,7 +39,15 @@ class ControlNode:
Returns: Returns:
无返回 无返回
""" """
system_prompt: str = "" system_prompt: str = (
"你叫Pretor是一个多智能体AI助手系统中的【控制节点 (Control Node)】。\n"
"你是系统的'执行者''车间主任',专门负责执行工作流中分配给你的具体子任务。\n"
"你的工作职责是:\n"
"1. 仔细分析分配给你的工作流步骤 (workflow_step) 的目标和要求。\n"
"2. 运用你被分配的工具 (如有) 或者依靠自身的知识和推理能力,精准、高效地完成该任务。\n"
"3. 将执行的结果、产生的数据或者具体的输出,严格按照 ForWorkflow 格式返回。\n"
"请注意:你的输出应当具体、实用,直接提供任务所要求的结果,不要做过多无关的寒暄。"
)
output_type = ForWorkflow output_type = ForWorkflow
provider: Provider = global_state_machine.get_provider.remote(provider_title) provider: Provider = global_state_machine.get_provider.remote(provider_title)
agent_factory = AgentFactory() agent_factory = AgentFactory()
@ -48,13 +57,39 @@ class ControlNode:
system_prompt=system_prompt, system_prompt=system_prompt,
deps_type=ControlNodeDeps, deps_type=ControlNodeDeps,
agent_name="control_node") agent_name="control_node")
@self.agent.system_prompt
async def dynamic_prompt(ctx: RunContext[ControlNodeDeps]):
prompt = system_prompt + "\n\n"
prompt += (
f"=== 当前任务步骤上下文 ===\n"
f"- 步骤名称 (Name): {ctx.deps.workflow_step.name}\n"
f"- 步骤目标/描述 (Description): {ctx.deps.workflow_step.description}\n"
f"- 前置步骤结果参考 (Previous Results): {ctx.deps.workflow_step.precondition}\n"
f"- 可用工具 (Available Tools): {ctx.deps.current_tools}\n"
)
return prompt
async def working(self, payload: ForWorkflowInput) -> str: async def working(self, payload: ForWorkflowInput) -> str:
result: ForWorkflow = await self._run(payload) try:
return result result: ForWorkflow = await self._run(payload)
return result
except Exception as e:
logger.exception("ControlNode在执行working时发生严重错误")
return None
async def _run(self, payload: ForWorkflowInput) -> ForWorkflow: async def _run(self, payload: ForWorkflowInput) -> ForWorkflow:
deps = ControlNodeDeps(workflow_step=payload.workflow_step) try:
result = await self.agent.run(f"根据workflow_step分配任务", self.agent.retries = 3
deps=deps) deps = ControlNodeDeps(
return result.output workflow_step=payload.workflow_step
)
logger.debug(f"ControlNode: 开始执行工作流节点 [{payload.workflow_step.name}] (原生重试开启)")
result = await self.agent.run(
f"请根据提供的 workflow_step 上下文,执行此步骤并输出结果。\n详细指令或附加数据:{payload.workflow_step.model_dump_json()}",
deps=deps
)
return result.output
except Exception as e:
logger.exception(f"ControlNode 在执行步骤 [{payload.workflow_step.name}] 时最终失败: {str(e)}")
raise RuntimeError(f"ControlNode 执行步骤失败: {str(e)}") from e

View File

@ -13,6 +13,7 @@
# limitations under the License. # limitations under the License.
from pydantic import Field
from pretor.core.workflow.workflow import WorkStep from pretor.core.workflow.workflow import WorkStep
from pretor.utils.agent_model import ResponseModel, InputModel, DepsModel from pretor.utils.agent_model import ResponseModel, InputModel, DepsModel
@ -24,13 +25,15 @@ class ControlNodeResponse(ResponseModel):
class ControlNodeInput(InputModel): class ControlNodeInput(InputModel):
pass pass
class ControlNodeDeps(DepsModel): class ControlNodeDeps(DepsModel):
workflow_step: WorkStep workflow_step: WorkStep
# In the future, this can be dynamically populated with tools specific to the current task execution
class ForWorkflow(ControlNodeResponse): class ForWorkflow(ControlNodeResponse):
output: str output: str = Field(..., description="控制节点执行特定工作流步骤的结果。包含执行细节和输出数据。")
class ForWorkflowInput(ControlNodeInput): class ForWorkflowInput(ControlNodeInput):
workflow_step: WorkStep workflow_step: WorkStep

View File

@ -21,6 +21,8 @@ from pretor.core.global_state_machine.global_state_machine import GlobalStateMac
from pretor.core.global_state_machine.model_provider import Provider from pretor.core.global_state_machine.model_provider import Provider
from pretor.core.individual.supervisory_node.template import ForConsciousnessNode, ForUser, SupervisoryNodeDeps, TerminationMessage from pretor.core.individual.supervisory_node.template import ForConsciousnessNode, ForUser, SupervisoryNodeDeps, TerminationMessage
from pydantic_ai import RunContext, Agent from pydantic_ai import RunContext, Agent
from loguru import logger
from pretor.utils.ray_hook import ray_actor_hook
@ray.remote @ray.remote
class SupervisoryNode: class SupervisoryNode:
@ -40,7 +42,16 @@ class SupervisoryNode:
Returns: Returns:
无返回 无返回
""" """
system_prompt: str = "" system_prompt: str = (
"你叫Pretor是一个多智能体AI助手系统中的【监控节点 (Supervisory Node)】。\n"
"你是系统的'前台接待''大脑皮层',负责接收用户的初始请求或工作流的最终报告。\n"
"你的核心职责是进行【意图识别与路由】。请仔细阅读用户的请求:\n"
"1. 如果用户只是进行简单的问候、闲聊或查询非常基础的信息,请直接生成友好的回复,使用 ForUser 格式。\n"
"2. 如果用户提出的是复杂任务(如需要编写代码、多步骤规划、数据处理等),请务必将其判定为需要工作流处理的任务,"
" 并使用 ForConsciousnessNode 格式,同时从提供的【可用模板列表】中选择最合适的工作流模板移交给意识节点。\n"
"3. 如果你收到的是 TerminationMessage代表工作流已完成并生成了报告请将报告内容转化为友好的面向用户的回复使用 ForUser 格式。\n"
"请保持冷静、专业,并严格遵循上述路由规则。"
)
output_type = Union[ForConsciousnessNode, ForUser] output_type = Union[ForConsciousnessNode, ForUser]
provider: Provider = await global_state_machine.get_provider.remote(provider_title) provider: Provider = await global_state_machine.get_provider.remote(provider_title)
agent_factory = AgentFactory() agent_factory = AgentFactory()
@ -53,7 +64,21 @@ class SupervisoryNode:
@self.agent.system_prompt @self.agent.system_prompt
async def dynamic_prompt(ctx: RunContext[SupervisoryNodeDeps]): async def dynamic_prompt(ctx: RunContext[SupervisoryNodeDeps]):
return f"Context: Platform={ctx.deps.platform}, User={ctx.deps.user_name}, Time={ctx.deps.time}" prompt = system_prompt + "\n\n"
prompt += (
f"=== 当前上下文 ===\n"
f"- 平台 (Platform): {ctx.deps.platform}\n"
f"- 用户名 (User): {ctx.deps.user_name}\n"
f"- 当前时间 (Time): {ctx.deps.time}\n"
f"- 可用工作流模板 (Available Templates): {ctx.deps.available_templates}\n"
)
if ctx.deps.error_history:
prompt += (
f"\n=== 错误重试指示 ===\n"
f"警告:前一次尝试失败,错误信息如下:\n{ctx.deps.error_history}\n"
f"请务必修正该错误并按照要求的 Pydantic 格式输出。"
)
return prompt
###工作函数 ###工作函数
async def working(self, payload: Union[PretorEvent, TerminationMessage]) -> str: async def working(self, payload: Union[PretorEvent, TerminationMessage]) -> str:
@ -65,16 +90,20 @@ class SupervisoryNode:
Returns: Returns:
str,监控节点对于用户的回复 str,监控节点对于用户的回复
""" """
result = await self._run(payload) try:
if isinstance(result, ForConsciousnessNode): result = await self._run(payload)
if isinstance(result, ForConsciousnessNode):
return "任务已创建" logger.info(f"SupervisoryNode: 任务已分配给意识节点,选用模板 [{result.workflow_template}]")
return f"任务已创建,准备创建工作流。原因:{result.reasoning}"
elif isinstance(result, ForUser): elif isinstance(result, ForUser):
return result.content logger.info("SupervisoryNode: 直接向用户返回简单回复。")
return result.context
else: else:
return "未知响应类型" logger.error(f"SupervisoryNode: 未知响应类型: {type(result)}")
return "抱歉,系统内部遇到未知错误,无法正确处理您的请求。"
except Exception as e:
logger.exception("SupervisoryNode在处理请求时发生未捕获的严重错误")
return "抱歉,监控节点处理请求时发生严重错误,请联系管理员。"
@overload @overload
async def _run(self, payload: PretorEvent) -> Union[ForConsciousnessNode, ForUser]: async def _run(self, payload: PretorEvent) -> Union[ForConsciousnessNode, ForUser]:
@ -87,7 +116,7 @@ class SupervisoryNode:
ForUser对象监控节点对于用户进行的简单回答 ForUser对象监控节点对于用户进行的简单回答
ForConsciousnessNode对象监控节点将用户的请求判断为复杂任务将PretorEvent传递给意识节点并且给选择好的工作流模板 ForConsciousnessNode对象监控节点将用户的请求判断为复杂任务将PretorEvent传递给意识节点并且给选择好的工作流模板
""" """
pass ...
@overload @overload
async def _run(self, payload: TerminationMessage) -> ForUser: async def _run(self, payload: TerminationMessage) -> ForUser:
@ -99,7 +128,7 @@ class SupervisoryNode:
Returns: Returns:
ForUser对象工作流结束后给用户的返回 ForUser对象工作流结束后给用户的返回
""" """
pass ...
async def _run(self, payload: Union[PretorEvent, TerminationMessage]) -> Union[ForConsciousnessNode, ForUser]: async def _run(self, payload: Union[PretorEvent, TerminationMessage]) -> Union[ForConsciousnessNode, ForUser]:
""" """
@ -111,14 +140,28 @@ class SupervisoryNode:
ForConsciousnessNode对象对意识节点发送的消息 ForConsciousnessNode对象对意识节点发送的消息
ForUser对象对用户发送到消息 ForUser对象对用户发送到消息
""" """
if isinstance(payload, PretorEvent): platform = payload.platform
deps = SupervisoryNodeDeps(platform=payload.platform, user_name = payload.user_name
user_name=payload.user_name, message = payload.message
time=datetime.datetime.now()) time_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
result = await self.agent.run(payload.message, deps=deps) try:
else: global_state_machine = ray_actor_hook("global_state_machine")
deps = SupervisoryNodeDeps(platform=payload.platform, workflow_template_dict = await global_state_machine.get_workflow_template_list.remote()
user_name=payload.user_name, available_templates_str = "\n".join([f"- 名称: {k}, 描述/内容: {v}" for k, v in
time=datetime.datetime.now()) workflow_template_dict.items()]) if workflow_template_dict else "暂无注册的工作流模板"
result = await self.agent.run(payload.message, deps=deps) deps = SupervisoryNodeDeps(
return result.output platform=platform,
user_name=user_name,
time=time_str,
available_templates=available_templates_str
)
logger.debug("SupervisoryNode 开始生成 (启用原生 Pydantic-AI 重试)")
prompt_message = message
if isinstance(payload, TerminationMessage):
prompt_message = f"【工作流执行结束报告】\n请将以下技术报告转化为对用户的友好回复:\n{message}"
self.agent.retries = 3
result = await self.agent.run(prompt_message, deps=deps)
return result.output
except Exception as e:
logger.exception(f"SupervisoryNode 模型生成或解析最终失败: {str(e)}")
return ForUser(context="系统当前负载过高或遇到复杂内部错误,请稍后再试。")

View File

@ -20,10 +20,11 @@ class SupervisoryNodeResponse(ResponseModel):
pass pass
class ForUser(SupervisoryNodeResponse): class ForUser(SupervisoryNodeResponse):
context: str = Field(...,description="对用户的回复,应当使用和蔼的语气进行回复") context: str = Field(..., description="对用户的回复,应当使用和蔼的语气进行回复。用于直接解答简单问题或返回最终报告。")
class ForConsciousnessNode(SupervisoryNodeResponse): class ForConsciousnessNode(SupervisoryNodeResponse):
workflow_template: str = Field(..., description="选择的工作流模板应当为对应模板的name字段") orkflow_template: str = Field(..., description="选择的工作流模板的名称,用于处理复杂任务。")
reasoning: str = Field(..., description="选择将任务移交意识节点并选用该模板的简短原因。")
class TerminationMessage(BaseModel): class TerminationMessage(BaseModel):
platform: str platform: str
@ -33,4 +34,7 @@ class TerminationMessage(BaseModel):
class SupervisoryNodeDeps(DepsModel): class SupervisoryNodeDeps(DepsModel):
platform: str platform: str
user_name: str user_name: str
time: str time: str
retry_count: int = 0
error_history: str = ""
available_templates: str = "默认工作流 (default_workflow)"