From a83c5fa5bdf8b193e311edc1480907d5d607bc2a Mon Sep 17 00:00:00 2001 From: zhaoxi Date: Mon, 18 May 2026 05:33:11 +0000 Subject: [PATCH] =?UTF-8?q?style(agent):=20=E8=B0=83=E6=95=B4agent?= =?UTF-8?q?=E7=BB=93=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- kilostar/api/workflow.py | 5 +- .../individual/consciousness_node/template.py | 31 +++-- .../regulatory_node/regulatory_node.py | 106 +++++------------- .../individual/regulatory_node/template.py | 73 ++++++------ kilostar/utils/agent_model.py | 23 ++-- 5 files changed, 93 insertions(+), 145 deletions(-) diff --git a/kilostar/api/workflow.py b/kilostar/api/workflow.py index 6f52975..16cfba8 100644 --- a/kilostar/api/workflow.py +++ b/kilostar/api/workflow.py @@ -19,6 +19,8 @@ from pydantic import BaseModel from ulid import ULID import asyncio from kilostar.utils.access import Accessor, TokenData +from kilostar.utils.check_user.role_check import RoleChecker +from kilostar.core.postgres_database.model import UserAuthority workflow_router = APIRouter(prefix="/api/v1/workflow", tags=["workflow"]) @@ -54,7 +56,8 @@ async def create_workflow( @workflow_router.get("/list") -async def get_workflow_list(token_data: TokenData = Depends(Accessor.get_current_user)): +async def get_workflow_list( + token_data: TokenData = Depends(RoleChecker(allowed_roles=UserAuthority.USER))): postgres_database = ray_actor_hook("postgres_database").postgres_database workflows = await postgres_database.list_workflows.remote( user_id=token_data.user_id diff --git a/kilostar/core/individual/consciousness_node/template.py b/kilostar/core/individual/consciousness_node/template.py index ab2674d..8a9d358 100644 --- a/kilostar/core/individual/consciousness_node/template.py +++ b/kilostar/core/individual/consciousness_node/template.py @@ -14,14 +14,26 @@ from kilostar.core.work.workflow.workflow import KiloStarWorkflow, WorkflowStep -from kilostar.utils.agent_model import ResponseModel, DepsModel, InputModel +from kilostar.utils.agent_model import ResponseModel, DepsModel, RequestModel from pydantic import Field +from typing import Optional, List # 意识节点回复类 class ConsciousnessNodeResponse(ResponseModel): """Consciousness response model,是意识节点所有回复类型的父类""" + pass +class ConsciousnessNodeDeps(DepsModel): + """ConsciousnessNodeDeps 核心组件类。 + 这是一个系统执行节点类,作为多智能体架构中的独立处理单元。它能够接收工作流上下文,根据内置的大模型策略进行意图理解和自主决策,从而驱动特定阶段的任务闭环。""" + original_command: str + command: str + available_skills: Optional[List[str]] + +class ConsciousnessNodeInput(RequestModel): + """ConsciousnessNodeInput 核心组件类。 + 这是一个系统执行节点类,作为多智能体架构中的独立处理单元。它能够接收工作流上下文,根据内置的大模型策略进行意图理解和自主决策,从而驱动特定阶段的任务闭环。""" pass @@ -47,23 +59,6 @@ class ForregulatoryNode(ConsciousnessNodeResponse): ..., description="为监控节点提供的全工作流执行情况的技术性总结报告。" ) - -class ConsciousnessNodeDeps(DepsModel): - """ConsciousnessNodeDeps 核心组件类。 - 这是一个系统执行节点类,作为多智能体架构中的独立处理单元。它能够接收工作流上下文,根据内置的大模型策略进行意图理解和自主决策,从而驱动特定阶段的任务闭环。""" - - original_command: str - command: str - available_skills: list[dict] | None = None - - -class ConsciousnessNodeInput(InputModel): - """ConsciousnessNodeInput 核心组件类。 - 这是一个系统执行节点类,作为多智能体架构中的独立处理单元。它能够接收工作流上下文,根据内置的大模型策略进行意图理解和自主决策,从而驱动特定阶段的任务闭环。""" - - pass - - class ForWorkflowEngineInput(ConsciousnessNodeInput): """ForWorkflowEngineInput 核心组件类。 这是一个领域数据模型或功能封装类,承载了 ForWorkflowEngineInput 相关的内聚属性定义与状态维护。它的存在隔离了局部的业务复杂性,并对外提供了类型安全的访问接口。""" diff --git a/kilostar/core/individual/regulatory_node/regulatory_node.py b/kilostar/core/individual/regulatory_node/regulatory_node.py index a3b150a..0de3b32 100644 --- a/kilostar/core/individual/regulatory_node/regulatory_node.py +++ b/kilostar/core/individual/regulatory_node/regulatory_node.py @@ -14,26 +14,16 @@ import datetime import ray -from typing import Union, overload +from typing import Union from kilostar.adapter.model_adapter.agent_factory import AgentFactory from kilostar.core.global_state_machine.global_state_machine import GlobalStateMachine from kilostar.core.global_state_machine.model_provider import Provider from kilostar.core.individual.regulatory_node.template import ( - ForConsciousnessNode, - ForUser, - regulatoryNodeDeps, - TerminationMessage, + MessageRequest, + RegulatoryNodeDeps, + MessageResponse ) from pydantic_ai import RunContext, Agent -from kilostar.utils.ray_hook import ray_actor_hook - - -class ClientMessage: - def __init__(self, user_id: str, user_name: str, message: str): - self.user_id = user_id - self.user_name = user_name - self.message = message - self.platform = "client" @ray.remote @@ -62,7 +52,7 @@ class RegulatoryNode: global_state_machine: 全局状态机 provider_title: 供应商名 model_id: 模型id - + tools_list: 工具列表 Returns: 无返回 """ @@ -76,9 +66,8 @@ class RegulatoryNode: "3. 如果你收到的是 TerminationMessage(代表工作流已完成并生成了报告),请将报告内容转化为友好的面向用户的回复,使用 ForUser 格式。\n" "请保持冷静、专业,并严格遵循上述路由规则。" ) - output_type = Union[ForConsciousnessNode, ForUser] + output_type = Union[MessageResponse] from kilostar.utils.get_tool import load_tools_from_list - provider: Provider = await global_state_machine.get_provider.remote( provider_title ) @@ -90,13 +79,13 @@ class RegulatoryNode: model_id=model_id, output_type=output_type, system_prompt=system_prompt, - deps_type=regulatoryNodeDeps, + deps_type=RegulatoryNodeDeps, agent_name="regulatory_node", tools=callables, ) @self.agent.system_prompt - async def dynamic_prompt(ctx: RunContext[regulatoryNodeDeps]): + async def dynamic_prompt(ctx: RunContext[RegulatoryNodeDeps]): """执行与 dynamic prompt 相关的核心业务流转操作。 该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。 Args: ctx (RunContext[regulatoryNodeDeps]): 参与 dynamic prompt 逻辑运算或数据构建的上下文依赖对象。 @@ -123,23 +112,7 @@ class RegulatoryNode: ) return prompt - async def handle_chat_message( - self, user_id: str, chat_id: str, message: str - ) -> str: - payload = ClientMessage( - user_id=user_id, user_name="", message=message - ) - return await self._process(payload) - - async def handle_client_message( - self, user_id: str, user_name: str, message: str - ) -> str: - payload = ClientMessage( - user_id=user_id, user_name=user_name, message=message - ) - return await self._process(payload) - - async def working(self, payload: Union[ClientMessage, TerminationMessage]) -> str: + async def working(self, payload: MessageRequest) -> str: """working方法,是节点唯一的调用方法,对于_run函数的结果进行判断并实现最终回复 Args: payload: 消息载荷,包含所有信息 @@ -147,56 +120,27 @@ class RegulatoryNode: Returns: str,监控节点对于用户的回复 """ - return await self._process(payload) - - async def _process( - self, payload: Union[ClientMessage, TerminationMessage] - ) -> str: - try: - result = await self._run(payload) - if isinstance(result, ForConsciousnessNode): - self.logger.info("regulatoryNode: 任务需交意识节点处理") - workflow_running_engine = ray_actor_hook( - "workflow_running_engine" - ).workflow_running_engine - await workflow_running_engine.put_event.remote(payload) - return f"任务已创建,准备创建工作流。原因:{result.reasoning}" - elif isinstance(result, ForUser): - self.logger.info("regulatoryNode: 直接向用户返回简单回复。") - return result.context - else: - self.logger.error(f"regulatoryNode: 未知响应类型: {type(result)}") - return "抱歉,系统内部遇到未知错误,无法正确处理您的请求。" - except Exception: - self.logger.exception("regulatoryNode在处理请求时发生未捕获的严重错误") - return "抱歉,监控节点处理请求时发生严重错误,请联系管理员。" - - @overload - async def _run( - self, payload: ClientMessage - ) -> Union[ForConsciousnessNode, ForUser]: ... - - @overload - async def _run(self, payload: TerminationMessage) -> ForUser: ... + await self._run(payload) + return "" async def _run( - self, payload: Union[ClientMessage, TerminationMessage] - ) -> Union[ForConsciousnessNode, ForUser]: + self, payload: MessageRequest + ) -> Union[MessageResponse, None]: platform = payload.platform user_name = payload.user_name message = payload.message time_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") try: - deps = regulatoryNodeDeps( - platform=platform, user_name=user_name, time=time_str + deps = RegulatoryNodeDeps( + platform=platform, + user_name=user_name, + time=time_str ) - self.logger.debug("regulatoryNode 开始生成 (启用原生 Pydantic-AI 重试)") - prompt_message = message - if isinstance(payload, TerminationMessage): - prompt_message = f"【工作流执行结束报告】\n请将以下技术报告转化为对用户的友好回复:\n{message}" - self.agent.retries = 3 - result = await self.agent.run(prompt_message, deps=deps) - return result.output - except Exception as e: - self.logger.exception(f"regulatoryNode 模型生成或解析最终失败: {str(e)}") - return ForUser(context="系统当前负载过高或遇到复杂内部错误,请稍后再试。") + agent_response = await self.agent.run(user_prompt=message, + deps=deps,) + response: MessageResponse = agent_response.output + response.platform = platform + response.platform_id = MessageRequest.platform_id + return response + except: + pass \ No newline at end of file diff --git a/kilostar/core/individual/regulatory_node/template.py b/kilostar/core/individual/regulatory_node/template.py index 4be7331..f22c161 100644 --- a/kilostar/core/individual/regulatory_node/template.py +++ b/kilostar/core/individual/regulatory_node/template.py @@ -11,51 +11,54 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from typing import Literal, Optional from pydantic import Field -from kilostar.utils.agent_model import ResponseModel, DepsModel -from pydantic import BaseModel + +from kilostar.utils.agent_model import ResponseModel, DepsModel, RequestModel -class regulatoryNodeResponse(ResponseModel): - """regulatoryNodeResponse 核心组件类。 - 这是一个系统执行节点类,作为多智能体架构中的独立处理单元。它能够接收工作流上下文,根据内置的大模型策略进行意图理解和自主决策,从而驱动特定阶段的任务闭环。""" +class RegulatoryNodeResponse(ResponseModel): + """ + RegulatoryNodeResponse类 + 一切regulatory_node回复的父类 + """ pass +class RegulatoryNodeRequest(RequestModel): + """ + RegulatoryNodeRequest类 + 向regulatory请求的父类 + """ + pass -class ForUser(regulatoryNodeResponse): - """ForUser 核心组件类。 - 这是一个领域数据模型或功能封装类,承载了 ForUser 相关的内聚属性定义与状态维护。它的存在隔离了局部的业务复杂性,并对外提供了类型安全的访问接口。""" - - context: str = Field( - ..., - description="对用户的回复,应当使用和蔼的语气进行回复。用于直接解答简单问题或返回最终报告。", - ) - - -class ForConsciousnessNode(regulatoryNodeResponse): - """ForConsciousnessNode 核心组件类。 - 这是一个系统执行节点类,作为多智能体架构中的独立处理单元。它能够接收工作流上下文,根据内置的大模型策略进行意图理解和自主决策,从而驱动特定阶段的任务闭环。""" - - reasoning: str = Field(..., description="选择将任务移交意识节点的简短原因。") - - -class TerminationMessage(BaseModel): - """TerminationMessage 核心组件类。 - 这是一个领域数据模型或功能封装类,承载了 TerminationMessage 相关的内聚属性定义与状态维护。它的存在隔离了局部的业务复杂性,并对外提供了类型安全的访问接口。""" - - platform: str - user_name: str - message: str - - -class regulatoryNodeDeps(DepsModel): - """regulatoryNodeDeps 核心组件类。 - 这是一个系统执行节点类,作为多智能体架构中的独立处理单元。它能够接收工作流上下文,根据内置的大模型策略进行意图理解和自主决策,从而驱动特定阶段的任务闭环。""" - +class RegulatoryNodeDeps(DepsModel): + """ + RegulatoryNodeDeps类 + regulatory_node的依赖模型 + """ platform: str user_name: str time: str retry_count: int = 0 error_history: str = "" + +class MessageRequest(RequestModel): + """ + MessageRequest类 + 任何消息渠道向regulatory_node发送消息请求的模型 + """ + platform: Literal["client"] + user_name: str + platform_id: Optional[str] + message: str + +class MessageResponse(RegulatoryNodeResponse): + """ + MessageResponse类 + regulatory_node回复的模型 + """ + platform: Optional[Literal["client"]] = Field(description="系统自动填入的platform") + platform_id: Optional[str] = Field(description="系统自动填入的platform_id") + reply_message: str = Field(...,description="模型回复的消息") diff --git a/kilostar/utils/agent_model.py b/kilostar/utils/agent_model.py index 92ae2ba..8baeae5 100644 --- a/kilostar/utils/agent_model.py +++ b/kilostar/utils/agent_model.py @@ -17,21 +17,24 @@ from pydantic import BaseModel class ResponseModel(BaseModel): - """ResponseModel 核心组件类。 - 这是一个领域数据模型或功能封装类,承载了 ResponseModel 相关的内聚属性定义与状态维护。它的存在隔离了局部的业务复杂性,并对外提供了类型安全的访问接口。""" - + """ + ResponseModel类 + 继承自pydantic的BaseModel类,是一切回复模型的父类 + """ pass class DepsModel(BaseModel): - """DepsModel 核心组件类。 - 这是一个领域数据模型或功能封装类,承载了 DepsModel 相关的内聚属性定义与状态维护。它的存在隔离了局部的业务复杂性,并对外提供了类型安全的访问接口。""" - + """ + DepsModel类 + 继承自pydantic的BaseModel类,是agent运行时依赖模型的父类 + """ pass -class InputModel(BaseModel): - """InputModel 核心组件类。 - 这是一个领域数据模型或功能封装类,承载了 InputModel 相关的内聚属性定义与状态维护。它的存在隔离了局部的业务复杂性,并对外提供了类型安全的访问接口。""" - +class RequestModel(BaseModel): + """ + RequestModel类 + 继承自pydantic的BaseModel类,是一切请求模型的父类 + """ pass