From fe49340106c690a27028053f5f033726cc807460 Mon Sep 17 00:00:00 2001 From: zhaoxi Date: Mon, 20 Apr 2026 16:20:38 +0800 Subject: [PATCH] =?UTF-8?q?wip:=20=E4=BC=98=E5=8C=96=E4=BA=86control=5Fnod?= =?UTF-8?q?e=20consciousness=5Fnode=E5=92=8Csupervisory=5Fnode?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/config.yml | 3 +- docs/problem.md | 14 ++- pretor/api/agent.py | 2 + pretor/api/auth.py | 3 +- pretor/api/cluster.py | 40 +++++++ pretor/api/platform/frontend.py | 13 +-- pretor/api/workflow.py | 34 ++++++ pretor/core/api/__init__.py | 14 ++- .../global_state_machine.py | 8 +- .../consciousness_node/consciousness_node.py | 106 +++++++++++------- .../individual/consciousness_node/template.py | 10 +- .../individual/control_node/control_node.py | 51 +++++++-- .../core/individual/control_node/template.py | 7 +- .../supervisory_node/supervisory_node.py | 93 ++++++++++----- .../individual/supervisory_node/template.py | 10 +- 15 files changed, 302 insertions(+), 106 deletions(-) create mode 100644 pretor/api/cluster.py create mode 100644 pretor/api/workflow.py diff --git a/config/config.yml b/config/config.yml index f5743bc..e1c4070 100644 --- a/config/config.yml +++ b/config/config.yml @@ -1 +1,2 @@ -version: v0.1 \ No newline at end of file +version: v0.1 +name: \ No newline at end of file diff --git a/docs/problem.md b/docs/problem.md index f5815c2..a5619a5 100644 --- a/docs/problem.md +++ b/docs/problem.md @@ -2,7 +2,7 @@ --- ## 问题栏 #### 🔴 核心缺陷与修复 (Bug Fixes & Stability) -- [ ] /pretor/core/individual每个template进行优化 +- [x] /pretor/core/individual每个template进行优化 - [ ] /pretor/worker_individual待完善复合子个体和基础子个体 #### 🛡️ 安全与合规 (Security & Auth) @@ -13,10 +13,11 @@ - [ ] 优化import #### 🏗️ 架构演进 (Architecture & Refactoring) -- [ ] 使用fastapi-users完善用户系统 +- ~~[ ] 使用fastapi-users完善用户系统~~(2026/4/19 fastapi-users会严重摧毁代码的优雅性) +- [ ] 升级auth功能 - [x] /pretor/api的接口函数进行重构 - [ ] /dockerfile待完善 -- - [ ] 完善沙箱功能 +- [ ] 完善沙箱功能 - [ ] 完善爬虫功能 - [ ] 对接更多的provider @@ -24,7 +25,7 @@ ## 日志 #### 2026/4/12 - [x] /pretor/api的接口函数进行重构 -- [ ] /pretor/core/individual每个template进行优化 +- [x] /pretor/core/individual每个template进行优化 - [ ] /pretor/worker_individual待完善复合子个体和基础子个体 - [ ] /pretor/api待完善 - [ ] /dockerfile待完善 @@ -32,10 +33,11 @@ #### 2026/4/16 - [ ] 发布v0.1.0正式版 - [ ] 增加对应全workflow的情况追踪,使得在任务运行中人机交互更加自然方便 -- [ ] 使用fastapi-users完善用户系统 +- ~~[ ] 使用fastapi-users完善用户系统~~ #### 2026/4/19 - [ ] 完善沙箱功能 - [ ] 完善爬虫功能 - [ ] 对接更多的provider -- [ ] 优化import \ No newline at end of file +- [ ] 优化import +- [ ] 升级auth功能 \ No newline at end of file diff --git a/pretor/api/agent.py b/pretor/api/agent.py index 45e38ef..37a0f31 100644 --- a/pretor/api/agent.py +++ b/pretor/api/agent.py @@ -11,6 +11,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + + from typing import Union from pretor.utils.ray_hook import ray_actor_hook from fastapi import APIRouter, Request, Depends diff --git a/pretor/api/auth.py b/pretor/api/auth.py index 773c25b..c3dde40 100644 --- a/pretor/api/auth.py +++ b/pretor/api/auth.py @@ -42,5 +42,4 @@ async def login_user(user_login: UserLogin): if user.user_name != user_login.user_name: pass token = await run_in_threadpool(Accessor.login_hashed_password, user, user_login.password) - return {"message":"success", "token":token} - + return {"message":"success", "token":token} \ No newline at end of file diff --git a/pretor/api/cluster.py b/pretor/api/cluster.py new file mode 100644 index 0000000..54076ac --- /dev/null +++ b/pretor/api/cluster.py @@ -0,0 +1,40 @@ +# Copyright 2026 zhaoxi826 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from fastapi import APIRouter, WebSocket, WebSocketDisconnect +import ray +import asyncio + +cluster_router = APIRouter(prefix="/api/v1/cluster", tags=["cluster"]) + +@cluster_router.websocket("/ws/state") +async def update_cluster_state(websocket: WebSocket): + await websocket.accept() + try: + while True: + nodes = ray.nodes() + payload = [ + { + "node_id": node["NodeID"], + "node_name": node["NodeName"], + "alive": node["Alive"], + "resources": node["Resources"], + "remaining": node["RemainingResources"] + } + for node in nodes + ] + await websocket.send_json(payload) + await asyncio.sleep(10) + except WebSocketDisconnect: + pass \ No newline at end of file diff --git a/pretor/api/platform/frontend.py b/pretor/api/platform/frontend.py index a8bd1c1..3e4b3ef 100644 --- a/pretor/api/platform/frontend.py +++ b/pretor/api/platform/frontend.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from fastapi import APIRouter, Request, Depends, HTTPException, status, WebSocket, WebSocketDisconnect +from fastapi import APIRouter, Depends, HTTPException, status, WebSocket, WebSocketDisconnect from pydantic import BaseModel from pretor.utils.access import Accessor, TokenData from pretor.api.platform.event import PretorEvent @@ -46,14 +46,3 @@ async def create_message(message: Message, else: return {"message": message} -@client_router.websocket("/ws/{event_id}") -async def websocket_endpoint(websocket: WebSocket, event_id: str): - await websocket.accept() - global_state_machine = websocket.app.state.global_state_machine - try: - while True: - await websocket.send_text(await global_state_machine.get_pending(event_id)) - response = await websocket.receive_text() - await global_state_machine.put_received(event_id, response) - except WebSocketDisconnect: - pass \ No newline at end of file diff --git a/pretor/api/workflow.py b/pretor/api/workflow.py new file mode 100644 index 0000000..22628f8 --- /dev/null +++ b/pretor/api/workflow.py @@ -0,0 +1,34 @@ +# Copyright 2026 zhaoxi826 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from pretor.utils.ray_hook import ray_actor_hook +from fastapi import APIRouter, WebSocket, WebSocketDisconnect + + +workflow_router = APIRouter(prefix="/api/v1/workflow", tags=["workflow"]) + +@workflow_router.websocket("/ws/{event_id}") +async def get_workflow(websocket: WebSocket, event_id: str): + await websocket.accept() + global_state_machine = ray_actor_hook("global_state_machine") + try: + while True: + await websocket.send(await global_state_machine.get_workflow.remote(event_id)) + await websocket.send_text(await global_state_machine.get_pending.remote(event_id)) + response = await websocket.receive_text() + await global_state_machine.put_received(event_id, response) + except WebSocketDisconnect: + pass + diff --git a/pretor/core/api/__init__.py b/pretor/core/api/__init__.py index a6b0821..a0ceac8 100644 --- a/pretor/core/api/__init__.py +++ b/pretor/core/api/__init__.py @@ -28,6 +28,8 @@ from pretor.api.platform.frontend import client_router from pretor.api.auth import auth_router from pretor.api.provider import provider_router from pretor.api.resource import resource_router +from pretor.api.cluster import cluster_router +from pretor.api.agent import agent_router @ray.remote class PretorGateway: @@ -41,11 +43,15 @@ class PretorGateway: self.app = FastAPI() self.gateway = {} - self.app.include_router(client_router) - self.app.include_router(auth_router) - self.app.include_router(provider_router) - self.app.include_router(resource_router) + self.app.include_router(client_router)#客户端路径 + self.app.include_router(auth_router)#用户路径 + self.app.include_router(provider_router)#供应商路径 + self.app.include_router(resource_router)#资源路径 + self.app.include_router(cluster_router)#集群信息路径 + self.app.include_router(agent_router)#agent路径 + + frontend_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))), "frontend", "dist") if os.path.exists(frontend_dir): diff --git a/pretor/core/global_state_machine/global_state_machine.py b/pretor/core/global_state_machine/global_state_machine.py index 9b66803..366d5f5 100644 --- a/pretor/core/global_state_machine/global_state_machine.py +++ b/pretor/core/global_state_machine/global_state_machine.py @@ -20,7 +20,7 @@ from pretor.core.global_state_machine.model_provider import Provider, ProviderAr import httpx import json from loguru import logger -from typing import Dict, Literal +from typing import Dict, Literal, List from pretor.core.database.postgres import PostgresDatabase from pretor.api.platform.event import PretorEvent import asyncio @@ -64,6 +64,9 @@ class GlobalStateMachine: def update_workflow(self, event_id: str, workflow: PretorWorkflow) -> None: self.event_dict[event_id].workflow = workflow + def get_workflow(self, event_id: str) -> PretorWorkflow: + return self.event_dict[event_id].workflow + async def put_pending(self, event_id, item) -> None: await self.event_dict[event_id].pending_queue.put(item) @@ -161,6 +164,9 @@ class GlobalStateMachine: def workflow_template_generate(self, workflow_template: WorkflowTemplate) -> None: self.global_workflow_template_manager.generate_workflow_template(workflow_template) + def get_workflow_template_list(self) -> List[Dict[str, str]]: + return self.global_workflow_template_manager.workflow_templates_registry + ###以下为skill_manager方法 def add_skill(self, skill_name: str): skill_plugin_dir = pathlib.Path(__file__).parent.parent.parent / "plugin" / "skill_plugin" / skill_name diff --git a/pretor/core/individual/consciousness_node/consciousness_node.py b/pretor/core/individual/consciousness_node/consciousness_node.py index f65521a..2081f6e 100644 --- a/pretor/core/individual/consciousness_node/consciousness_node.py +++ b/pretor/core/individual/consciousness_node/consciousness_node.py @@ -21,6 +21,8 @@ from pydantic_ai import Agent, RunContext from pretor.core.global_state_machine.global_state_machine import GlobalStateMachine from pretor.core.global_state_machine.model_provider.base_provider import Provider from pretor.adapter.model_adapter.agent_factory import AgentFactory +from loguru import logger + @ray.remote class ConsciousnessNode: @@ -41,7 +43,15 @@ class ConsciousnessNode: Returns: 无返回 """ - system_prompt: str = "" + system_prompt: str = ( + "你叫Pretor,是一个多智能体AI助手系统中的【意识节点 (Consciousness Node)】。\n" + "你是系统的'高级规划师'和'架构师',负责处理监控节点分配过来的复杂任务。\n" + "你的主要工作场景包括:\n" + "1. 拆解任务 (Workflow Generation):结合用户的原始命令和提供的模板,生成严谨、可执行的工作流 (PretorWorkflow),并将其输出为 ForWorkflowEngine 格式。拆解时步骤应清晰连贯。\n" + "2. 中途指导 (Workflow Execution):在工作流执行中,如果某一步骤指派给你,你需要对控制节点的结果进行分析或提供下一步的指导,输出 ForWorkflow 格式。\n" + "3. 总结报告 (Supervisory Report):在整个工作流执行完毕后,你需要对整体流程、各个控制节点的执行情况进行审查,并生成一份技术性的总结报告,输出 ForSupervisoryNode 格式。\n" + "请确保所有的思考和生成过程符合逻辑,严密且高质量。" + ) output_type = Union[ForSupervisoryNode, ForWorkflow, ForWorkflowEngine] provider: Provider = global_state_machine.get_provider.remote(provider_title) agent_factory = AgentFactory() @@ -54,22 +64,30 @@ class ConsciousnessNode: @self.agent.system_prompt async def dynamic_prompt(ctx: RunContext[ConsciousnessNodeDeps]): - return f"Context: original_command: {ctx.deps.original_command}, workflow_template: {ctx.deps.workflow_template}" + prompt = system_prompt + "\n\n" + prompt += ( + f"=== 当前任务上下文 ===\n" + f"- 当前指令 (Command): {ctx.deps.command}\n" + f"- 原始用户命令 (Original Command): {ctx.deps.original_command}\n" + ) + if ctx.deps.workflow_template: + prompt += f"- 选定工作流模板 (Workflow Template): {ctx.deps.workflow_template}\n" - async def working(self, payload: Union[ForWorkflowEngineInput, ForWorkflowInput, ForSupervisoryInput]) -> str: - result: Union[ForWorkflowEngine, ForWorkflow, ForSupervisoryNode] = await self._run(payload) - if isinstance(result, ForWorkflowEngine): - return result + return prompt - elif isinstance(result, ForWorkflow): - return result - - elif isinstance(result, ForSupervisoryInput): - return result - - else: + async def working(self, payload: Union[ForWorkflowEngineInput, ForWorkflowInput, ForSupervisoryInput]) -> Union[ForWorkflowEngine, ForWorkflow, ForSupervisoryNode, None]: + try: + result = await self._run(payload) + if isinstance(result, (ForWorkflowEngine, ForWorkflow, ForSupervisoryNode)): + return result + else: + logger.error(f"ConsciousnessNode: 未知或不匹配的返回类型: {type(result)}") + return None + except Exception as e: + logger.exception("ConsciousnessNode在执行working时发生严重错误") return None + @overload async def _run(self, payload: ForWorkflowEngineInput) -> ForWorkflowEngine: """ @@ -109,29 +127,41 @@ class ConsciousnessNode: """ pass - async def _run(self, payload: Union[ForSupervisoryInput, ForWorkflowInput, ForWorkflowEngineInput])\ - -> Union[ForSupervisoryNode, ForWorkflow, ForWorkflowEngine]: + async def _run(self, payload: Union[ForSupervisoryInput, ForWorkflowInput, ForWorkflowEngineInput]) -> Union[ForSupervisoryNode, ForWorkflow, ForWorkflowEngine]: + try: + self.agent.retries = 3 + if isinstance(payload, ForWorkflowEngineInput): + deps = ConsciousnessNodeDeps( + original_command=payload.original_command, + workflow_template=payload.workflow_template, + command="拆解原始命令变成一个工作流" + ) + logger.debug("ConsciousnessNode: 开始生成工作流 (原生重试开启)") + result = await self.agent.run( + "根据original_command制定严密的可执行workflow,可以学习并参考workflow_template的设计理念", + deps=deps) + return result.output - if isinstance(payload, ForWorkflowEngineInput): - deps = ConsciousnessNodeDeps(original_command=payload.original_command, - workflow_template=payload.workflow_template, - command="拆解原始命令变成一个工作流") - result = await self.agent.run(f"根据original_command制定workflow,可以学习workflow_template", - deps=deps, - output_type=ForWorkflowEngine,) - return result.output - elif isinstance(payload, ForWorkflowInput): - deps = ConsciousnessNodeDeps(original_command=payload.original_command, - command="完成workflowstep的任务") - result = await self.agent.run(payload.workflow_step.model_dump_json(), - deps=deps, - output_type=ForWorkflow) - return result.output - elif isinstance(payload, ForSupervisoryInput): - deps = ConsciousnessNodeDeps(original_command=payload.original_command, - command="对于结果进行检查,并且生成一份技术性的总结报告") - result = await self.agent.run(payload.workflow.model_dump_json(), - deps=deps, - output_type=ForSupervisoryNode) - return result.output - return None \ No newline at end of file + elif isinstance(payload, ForWorkflowInput): + deps = ConsciousnessNodeDeps( + original_command=payload.original_command, + command="完成workflow step中分配给意识节点的特定任务或指导" + ) + logger.debug("ConsciousnessNode: 开始处理工作流节点任务 (原生重试开启)") + result = await self.agent.run(f"处理此工作流步骤信息:\n{payload.workflow_step.model_dump_json()}", + deps=deps) + return result.output + + elif isinstance(payload, ForSupervisoryInput): + deps = ConsciousnessNodeDeps( + original_command=payload.original_command, + command="对于工作流整体执行结果进行检查,并且生成一份专业的技术性总结报告" + ) + logger.debug("ConsciousnessNode: 开始生成技术总结报告 (原生重试开启)") + result = await self.agent.run( + f"基于以下工作流的执行记录,生成技术报告:\n{payload.workflow.model_dump_json()}", + deps=deps) + return result.output + except Exception as e: + logger.exception(f"ConsciousnessNode 模型生成最终失败: {str(e)}") + raise RuntimeError(f"ConsciousnessNode 无法完成任务: {str(e)}") from e diff --git a/pretor/core/individual/consciousness_node/template.py b/pretor/core/individual/consciousness_node/template.py index 06e8c3b..1e6db31 100644 --- a/pretor/core/individual/consciousness_node/template.py +++ b/pretor/core/individual/consciousness_node/template.py @@ -15,6 +15,7 @@ from pretor.core.workflow.workflow import PretorWorkflow, WorkStep from pretor.utils.agent_model import ResponseModel, DepsModel, InputModel +from pydantic import Field #意识节点回复类 @@ -25,17 +26,18 @@ class ConsciousnessNodeResponse(ResponseModel): class ForWorkflowEngine(ConsciousnessNodeResponse): """生成workflow并放入WorkflowEngine""" - workflow: PretorWorkflow + workflow: PretorWorkflow = Field(..., description="生成好的符合规范的完整工作流对象。") + reasoning: str = Field(..., description="生成此工作流的原因和思路简述。") class ForWorkflow(ConsciousnessNodeResponse): """处理workflow中需要ConsciousnessNode的工作""" - output: str + output: str = Field(..., description="对当前工作流步骤的具体处理结果或指导意见。") class ForSupervisoryNode(ConsciousnessNodeResponse): """工作流完成后进行校验并返回给SupervisoryNode""" - output: str + output: str = Field(..., description="为监控节点提供的全工作流执行情况的技术性总结报告。") class ConsciousnessNodeDeps(DepsModel): @@ -60,4 +62,4 @@ class ForWorkflowInput(ConsciousnessNodeInput): class ForSupervisoryInput(ConsciousnessNodeInput): workflow: PretorWorkflow - original_command: str \ No newline at end of file + original_command: str diff --git a/pretor/core/individual/control_node/control_node.py b/pretor/core/individual/control_node/control_node.py index d36969f..4c90da1 100644 --- a/pretor/core/individual/control_node/control_node.py +++ b/pretor/core/individual/control_node/control_node.py @@ -13,7 +13,8 @@ # limitations under the License. import ray -from pydantic_ai import Agent +from loguru import logger +from pydantic_ai import Agent, RunContext from pretor.core.global_state_machine.global_state_machine import GlobalStateMachine from pretor.core.global_state_machine.model_provider.base_provider import Provider from pretor.adapter.model_adapter.agent_factory import AgentFactory @@ -38,7 +39,15 @@ class ControlNode: Returns: 无返回 """ - system_prompt: str = "" + system_prompt: str = ( + "你叫Pretor,是一个多智能体AI助手系统中的【控制节点 (Control Node)】。\n" + "你是系统的'执行者'和'车间主任',专门负责执行工作流中分配给你的具体子任务。\n" + "你的工作职责是:\n" + "1. 仔细分析分配给你的工作流步骤 (workflow_step) 的目标和要求。\n" + "2. 运用你被分配的工具 (如有) 或者依靠自身的知识和推理能力,精准、高效地完成该任务。\n" + "3. 将执行的结果、产生的数据或者具体的输出,严格按照 ForWorkflow 格式返回。\n" + "请注意:你的输出应当具体、实用,直接提供任务所要求的结果,不要做过多无关的寒暄。" + ) output_type = ForWorkflow provider: Provider = global_state_machine.get_provider.remote(provider_title) agent_factory = AgentFactory() @@ -48,13 +57,39 @@ class ControlNode: system_prompt=system_prompt, deps_type=ControlNodeDeps, agent_name="control_node") + @self.agent.system_prompt + async def dynamic_prompt(ctx: RunContext[ControlNodeDeps]): + prompt = system_prompt + "\n\n" + prompt += ( + f"=== 当前任务步骤上下文 ===\n" + f"- 步骤名称 (Name): {ctx.deps.workflow_step.name}\n" + f"- 步骤目标/描述 (Description): {ctx.deps.workflow_step.description}\n" + f"- 前置步骤结果参考 (Previous Results): {ctx.deps.workflow_step.precondition}\n" + f"- 可用工具 (Available Tools): {ctx.deps.current_tools}\n" + ) + return prompt async def working(self, payload: ForWorkflowInput) -> str: - result: ForWorkflow = await self._run(payload) - return result + try: + result: ForWorkflow = await self._run(payload) + return result + except Exception as e: + logger.exception("ControlNode在执行working时发生严重错误") + return None async def _run(self, payload: ForWorkflowInput) -> ForWorkflow: - deps = ControlNodeDeps(workflow_step=payload.workflow_step) - result = await self.agent.run(f"根据workflow_step分配任务", - deps=deps) - return result.output \ No newline at end of file + try: + self.agent.retries = 3 + deps = ControlNodeDeps( + workflow_step=payload.workflow_step + ) + logger.debug(f"ControlNode: 开始执行工作流节点 [{payload.workflow_step.name}] (原生重试开启)") + + result = await self.agent.run( + f"请根据提供的 workflow_step 上下文,执行此步骤并输出结果。\n详细指令或附加数据:{payload.workflow_step.model_dump_json()}", + deps=deps + ) + return result.output + except Exception as e: + logger.exception(f"ControlNode 在执行步骤 [{payload.workflow_step.name}] 时最终失败: {str(e)}") + raise RuntimeError(f"ControlNode 执行步骤失败: {str(e)}") from e diff --git a/pretor/core/individual/control_node/template.py b/pretor/core/individual/control_node/template.py index d36c7a9..6fb21b8 100644 --- a/pretor/core/individual/control_node/template.py +++ b/pretor/core/individual/control_node/template.py @@ -13,6 +13,7 @@ # limitations under the License. +from pydantic import Field from pretor.core.workflow.workflow import WorkStep from pretor.utils.agent_model import ResponseModel, InputModel, DepsModel @@ -24,13 +25,15 @@ class ControlNodeResponse(ResponseModel): class ControlNodeInput(InputModel): pass + class ControlNodeDeps(DepsModel): workflow_step: WorkStep + # In the future, this can be dynamically populated with tools specific to the current task execution class ForWorkflow(ControlNodeResponse): - output: str + output: str = Field(..., description="控制节点执行特定工作流步骤的结果。包含执行细节和输出数据。") class ForWorkflowInput(ControlNodeInput): - workflow_step: WorkStep \ No newline at end of file + workflow_step: WorkStep diff --git a/pretor/core/individual/supervisory_node/supervisory_node.py b/pretor/core/individual/supervisory_node/supervisory_node.py index 5a28248..25d839b 100644 --- a/pretor/core/individual/supervisory_node/supervisory_node.py +++ b/pretor/core/individual/supervisory_node/supervisory_node.py @@ -21,6 +21,8 @@ from pretor.core.global_state_machine.global_state_machine import GlobalStateMac from pretor.core.global_state_machine.model_provider import Provider from pretor.core.individual.supervisory_node.template import ForConsciousnessNode, ForUser, SupervisoryNodeDeps, TerminationMessage from pydantic_ai import RunContext, Agent +from loguru import logger +from pretor.utils.ray_hook import ray_actor_hook @ray.remote class SupervisoryNode: @@ -40,7 +42,16 @@ class SupervisoryNode: Returns: 无返回 """ - system_prompt: str = "" + system_prompt: str = ( + "你叫Pretor,是一个多智能体AI助手系统中的【监控节点 (Supervisory Node)】。\n" + "你是系统的'前台接待'和'大脑皮层',负责接收用户的初始请求或工作流的最终报告。\n" + "你的核心职责是进行【意图识别与路由】。请仔细阅读用户的请求:\n" + "1. 如果用户只是进行简单的问候、闲聊或查询非常基础的信息,请直接生成友好的回复,使用 ForUser 格式。\n" + "2. 如果用户提出的是复杂任务(如需要编写代码、多步骤规划、数据处理等),请务必将其判定为需要工作流处理的任务," + " 并使用 ForConsciousnessNode 格式,同时从提供的【可用模板列表】中选择最合适的工作流模板移交给意识节点。\n" + "3. 如果你收到的是 TerminationMessage(代表工作流已完成并生成了报告),请将报告内容转化为友好的面向用户的回复,使用 ForUser 格式。\n" + "请保持冷静、专业,并严格遵循上述路由规则。" + ) output_type = Union[ForConsciousnessNode, ForUser] provider: Provider = await global_state_machine.get_provider.remote(provider_title) agent_factory = AgentFactory() @@ -53,7 +64,21 @@ class SupervisoryNode: @self.agent.system_prompt async def dynamic_prompt(ctx: RunContext[SupervisoryNodeDeps]): - return f"Context: Platform={ctx.deps.platform}, User={ctx.deps.user_name}, Time={ctx.deps.time}" + prompt = system_prompt + "\n\n" + prompt += ( + f"=== 当前上下文 ===\n" + f"- 平台 (Platform): {ctx.deps.platform}\n" + f"- 用户名 (User): {ctx.deps.user_name}\n" + f"- 当前时间 (Time): {ctx.deps.time}\n" + f"- 可用工作流模板 (Available Templates): {ctx.deps.available_templates}\n" + ) + if ctx.deps.error_history: + prompt += ( + f"\n=== 错误重试指示 ===\n" + f"警告:前一次尝试失败,错误信息如下:\n{ctx.deps.error_history}\n" + f"请务必修正该错误并按照要求的 Pydantic 格式输出。" + ) + return prompt ###工作函数 async def working(self, payload: Union[PretorEvent, TerminationMessage]) -> str: @@ -65,16 +90,20 @@ class SupervisoryNode: Returns: str,监控节点对于用户的回复 """ - result = await self._run(payload) - if isinstance(result, ForConsciousnessNode): - - return "任务已创建" - - elif isinstance(result, ForUser): - return result.content - - else: - return "未知响应类型" + try: + result = await self._run(payload) + if isinstance(result, ForConsciousnessNode): + logger.info(f"SupervisoryNode: 任务已分配给意识节点,选用模板 [{result.workflow_template}]") + return f"任务已创建,准备创建工作流。原因:{result.reasoning}" + elif isinstance(result, ForUser): + logger.info("SupervisoryNode: 直接向用户返回简单回复。") + return result.context + else: + logger.error(f"SupervisoryNode: 未知响应类型: {type(result)}") + return "抱歉,系统内部遇到未知错误,无法正确处理您的请求。" + except Exception as e: + logger.exception("SupervisoryNode在处理请求时发生未捕获的严重错误") + return "抱歉,监控节点处理请求时发生严重错误,请联系管理员。" @overload async def _run(self, payload: PretorEvent) -> Union[ForConsciousnessNode, ForUser]: @@ -87,7 +116,7 @@ class SupervisoryNode: ForUser对象,监控节点对于用户进行的简单回答 ForConsciousnessNode对象,监控节点将用户的请求判断为复杂任务,将PretorEvent传递给意识节点,并且给选择好的工作流模板 """ - pass + ... @overload async def _run(self, payload: TerminationMessage) -> ForUser: @@ -99,7 +128,7 @@ class SupervisoryNode: Returns: ForUser对象,工作流结束后给用户的返回 """ - pass + ... async def _run(self, payload: Union[PretorEvent, TerminationMessage]) -> Union[ForConsciousnessNode, ForUser]: """ @@ -111,14 +140,28 @@ class SupervisoryNode: ForConsciousnessNode对象,对意识节点发送的消息 ForUser对象,对用户发送到消息 """ - if isinstance(payload, PretorEvent): - deps = SupervisoryNodeDeps(platform=payload.platform, - user_name=payload.user_name, - time=datetime.datetime.now()) - result = await self.agent.run(payload.message, deps=deps) - else: - deps = SupervisoryNodeDeps(platform=payload.platform, - user_name=payload.user_name, - time=datetime.datetime.now()) - result = await self.agent.run(payload.message, deps=deps) - return result.output \ No newline at end of file + platform = payload.platform + user_name = payload.user_name + message = payload.message + time_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + try: + global_state_machine = ray_actor_hook("global_state_machine") + workflow_template_dict = await global_state_machine.get_workflow_template_list.remote() + available_templates_str = "\n".join([f"- 名称: {k}, 描述/内容: {v}" for k, v in + workflow_template_dict.items()]) if workflow_template_dict else "暂无注册的工作流模板" + deps = SupervisoryNodeDeps( + platform=platform, + user_name=user_name, + time=time_str, + available_templates=available_templates_str + ) + logger.debug("SupervisoryNode 开始生成 (启用原生 Pydantic-AI 重试)") + prompt_message = message + if isinstance(payload, TerminationMessage): + prompt_message = f"【工作流执行结束报告】\n请将以下技术报告转化为对用户的友好回复:\n{message}" + self.agent.retries = 3 + result = await self.agent.run(prompt_message, deps=deps) + return result.output + except Exception as e: + logger.exception(f"SupervisoryNode 模型生成或解析最终失败: {str(e)}") + return ForUser(context="系统当前负载过高或遇到复杂内部错误,请稍后再试。") diff --git a/pretor/core/individual/supervisory_node/template.py b/pretor/core/individual/supervisory_node/template.py index 5ca0146..4b2705a 100644 --- a/pretor/core/individual/supervisory_node/template.py +++ b/pretor/core/individual/supervisory_node/template.py @@ -20,10 +20,11 @@ class SupervisoryNodeResponse(ResponseModel): pass class ForUser(SupervisoryNodeResponse): - context: str = Field(...,description="对用户的回复,应当使用和蔼的语气进行回复") + context: str = Field(..., description="对用户的回复,应当使用和蔼的语气进行回复。用于直接解答简单问题或返回最终报告。") class ForConsciousnessNode(SupervisoryNodeResponse): - workflow_template: str = Field(..., description="选择的工作流模板,应当为对应模板的name字段") + orkflow_template: str = Field(..., description="选择的工作流模板的名称,用于处理复杂任务。") + reasoning: str = Field(..., description="选择将任务移交意识节点并选用该模板的简短原因。") class TerminationMessage(BaseModel): platform: str @@ -33,4 +34,7 @@ class TerminationMessage(BaseModel): class SupervisoryNodeDeps(DepsModel): platform: str user_name: str - time: str \ No newline at end of file + time: str + retry_count: int = 0 + error_history: str = "" + available_templates: str = "默认工作流 (default_workflow)" \ No newline at end of file