diff --git a/docs/problem.md b/docs/problem.md index ab8f2d7..ede8aa4 100644 --- a/docs/problem.md +++ b/docs/problem.md @@ -1,13 +1,11 @@ -待解决问题 ---- -## 问题栏 -#### 🔴 核心缺陷与修复 (Bug Fixes & Stability) +## 待解决问题 +## 问题栏 + +#### 🔴 核心缺陷与修复 (Bug Fixes & Stability) #### 🛡️ 安全与合规 (Security & Auth) #### ⚡ 性能与资源优化 (Performance & Scalability) - #### 🏗️ 架构演进 (Architecture & Refactoring) - diff --git a/pretor/api/workflow.py b/pretor/api/workflow.py index 0a42980..c20d975 100644 --- a/pretor/api/workflow.py +++ b/pretor/api/workflow.py @@ -19,16 +19,16 @@ from fastapi import APIRouter, WebSocket, WebSocketDisconnect workflow_router = APIRouter(prefix="/api/v1/workflow", tags=["workflow"]) -@workflow_router.websocket("/ws/{event_id}") -async def get_workflow(websocket: WebSocket, event_id: str): +@workflow_router.websocket("/ws/{trace_id}") +async def get_workflow(websocket: WebSocket, trace_id: str): await websocket.accept() global_state_machine = ray_actor_hook("global_state_machine") try: while True: - await websocket.send(await global_state_machine.get_workflow.remote(event_id)) - await websocket.send_text(await global_state_machine.get_pending.remote(event_id)) + await websocket.send(await global_state_machine.get_workflow.remote(trace_id)) + await websocket.send_text(await global_state_machine.get_pending.remote(trace_id)) response = await websocket.receive_text() - await global_state_machine.put_received(event_id, response) + await global_state_machine.put_received(trace_id, response) except WebSocketDisconnect: pass except RuntimeError as e: diff --git a/pretor/core/database/table/individual.py b/pretor/core/database/table/individual.py index dcc7dd2..b05e65c 100644 --- a/pretor/core/database/table/individual.py +++ b/pretor/core/database/table/individual.py @@ -13,7 +13,7 @@ # limitations under the License. from sqlmodel import SQLModel, Field -from typing import List, Dict +from typing import List, Optional from sqlalchemy import Column, JSON from enum import Enum @@ -30,8 +30,8 @@ class WorkerIndividual(SQLModel, table=True): description: str = Field(nullable=False) provider_title: str model_id: str - system_prompt: str - output_template: dict = Field(sa_column=Column(JSON),description="输出模板标识") - bound_skill: Dict[str, List[str]] = Field(sa_column=Column(JSON)) - workspace: List[str] = Field(sa_column=Column(JSON)) + system_prompt: Optional[dict] + output_template: Optional[dict] = Field(sa_column=Column(JSON),description="输出模板标识") + bound_skill: Optional[str] = Field(sa_column=Column(JSON)) + workspace: Optional[List[str]] = Field(sa_column=Column(JSON)) owner_id: str \ No newline at end of file diff --git a/pretor/core/global_state_machine/global_state_machine.py b/pretor/core/global_state_machine/global_state_machine.py index a6f2d49..90c0dab 100644 --- a/pretor/core/global_state_machine/global_state_machine.py +++ b/pretor/core/global_state_machine/global_state_machine.py @@ -116,31 +116,31 @@ class GlobalStateMachine: def add_event(self, event: PretorEvent) -> None: event.pending_queue = asyncio.Queue() event.receive_queue = asyncio.Queue() - self.event_dict[event.event_id] = event + self.event_dict[event.trace_id] = event - def delete_event(self, event_id: str) -> None: - del self.event_dict[event_id] + def delete_event(self, trace_id: str) -> None: + del self.event_dict[trace_id] - def get_event(self, event_id: str) -> PretorEvent: - return self.event_dict.get(event_id, None) + def get_event(self, trace_id: str) -> PretorEvent: + return self.event_dict.get(trace_id, None) - def update_attachment(self, event_id: str, attachment: Dict[str, str]) -> None: - self.event_dict[event_id].attachment = attachment + def update_attachment(self, trace_id: str, attachment: Dict[str, str]) -> None: + self.event_dict[trace_id].attachment = attachment - def update_workflow(self, event_id: str, workflow: PretorWorkflow) -> None: - self.event_dict[event_id].workflow = workflow + def update_workflow(self, trace_id: str, workflow: PretorWorkflow) -> None: + self.event_dict[trace_id].workflow = workflow - def get_workflow(self, event_id: str) -> PretorWorkflow: - return self.event_dict[event_id].workflow + def get_workflow(self, trace_id: str) -> PretorWorkflow: + return self.event_dict[trace_id].workflow - async def put_pending(self, event_id, item) -> None: - await self.event_dict[event_id].pending_queue.put(item) + async def put_pending(self, trace_id, item) -> None: + await self.event_dict[trace_id].pending_queue.put(item) - async def get_pending(self, event_id) -> str: - return await self.event_dict[event_id].pending_queue.get() + async def get_pending(self, trace_id) -> str: + return await self.event_dict[trace_id].pending_queue.get() - async def put_received(self, event_id, item) -> None: - await self.event_dict[event_id].receive_queue.put(item) + async def put_received(self, trace_id, item) -> None: + await self.event_dict[trace_id].receive_queue.put(item) - async def get_received(self, event_id) -> str: - return await self.event_dict[event_id].receive_queue.get() + async def get_received(self, trace_id) -> str: + return await self.event_dict[trace_id].receive_queue.get() diff --git a/pretor/core/individual/control_node/control_node.py b/pretor/core/individual/control_node/control_node.py index d598fb8..4429d3a 100644 --- a/pretor/core/individual/control_node/control_node.py +++ b/pretor/core/individual/control_node/control_node.py @@ -68,9 +68,8 @@ class ControlNode: prompt += ( f"=== 当前任务步骤上下文 ===\n" f"- 步骤名称 (Name): {ctx.deps.workflow_step.name}\n" - f"- 步骤目标/描述 (Description): {ctx.deps.workflow_step.description}\n" - f"- 前置步骤结果参考 (Previous Results): {ctx.deps.workflow_step.precondition}\n" - f"- 可用工具 (Available Tools): {ctx.deps.current_tools}\n" + f"- 步骤目标/描述 (Description): {ctx.deps.workflow_step.desc}\n" + f"- 前置输入(input): {ctx.deps.workflow_step.input}\n" ) return prompt diff --git a/pretor/core/individual/supervisory_node/template.py b/pretor/core/individual/supervisory_node/template.py index 4b2705a..1c7c114 100644 --- a/pretor/core/individual/supervisory_node/template.py +++ b/pretor/core/individual/supervisory_node/template.py @@ -23,7 +23,7 @@ class ForUser(SupervisoryNodeResponse): context: str = Field(..., description="对用户的回复,应当使用和蔼的语气进行回复。用于直接解答简单问题或返回最终报告。") class ForConsciousnessNode(SupervisoryNodeResponse): - orkflow_template: str = Field(..., description="选择的工作流模板的名称,用于处理复杂任务。") + workflow_template: str = Field(..., description="选择的工作流模板的名称,用于处理复杂任务。") reasoning: str = Field(..., description="选择将任务移交意识节点并选用该模板的简短原因。") class TerminationMessage(BaseModel): diff --git a/pretor/core/workflow/workflow.py b/pretor/core/workflow/workflow.py index c617256..584a9a0 100644 --- a/pretor/core/workflow/workflow.py +++ b/pretor/core/workflow/workflow.py @@ -14,7 +14,6 @@ from typing import List, Optional, Union, Literal, Dict, Any from pydantic import BaseModel, Field, model_validator -from pretor.utils.demand_protocol import DemandProtocol from pretor.utils.logger import get_logger logger = get_logger('workflow') @@ -33,6 +32,7 @@ class LogicGate(BaseModel): class WorkStep(BaseModel): step: int = Field(..., gt=0, description="步骤序号,严格自增") + name: str = Field(..., description="步骤名称") node: NodeType = Field(..., description="负责执行的节点类型") action: str = Field(..., description="执行的原子动作") desc: str = Field(..., description="动作细节的自然语言描述,包含人工规范指导") @@ -56,7 +56,6 @@ class WorkflowStatus(BaseModel): default="waiting_llm_working", description="当前系统调度状态" ) - demand: DemandProtocol = Field(default=None, description="需要的资源或插件调用请求") class PretorWorkflow(BaseModel): title: str = Field(..., description="工作流的标题") @@ -68,7 +67,7 @@ class PretorWorkflow(BaseModel): command: Optional[str] = Field(default=None, description="触发此工作流的原始命令") output: Dict[str, Any] = Field(default_factory=dict, description="工作流最终产出结果") status: WorkflowStatus = Field(default_factory=WorkflowStatus, description="运行时状态对象") - event_info: EventInfo | None = Field(default_factory=None) + event_info: EventInfo | None = Field(default=None) context_memory: Dict[str, Any] = Field(default_factory=dict) @model_validator(mode='after') diff --git a/pretor/core/workflow/workflow_runner.py b/pretor/core/workflow/workflow_runner.py index 77451f2..77b3229 100644 --- a/pretor/core/workflow/workflow_runner.py +++ b/pretor/core/workflow/workflow_runner.py @@ -210,7 +210,7 @@ class WorkflowEngine: agent_id = input_data.get("agent_id") task_event = { "action": step.action, - "description": step.description, + "description": step.desc, "input_data": input_data, "context_memory": self.workflow.context_memory } diff --git a/pretor/plugin/tool_plugin/approval/approval.py b/pretor/plugin/tool_plugin/approval/approval.py index 9963d2d..e834bef 100644 --- a/pretor/plugin/tool_plugin/approval/approval.py +++ b/pretor/plugin/tool_plugin/approval/approval.py @@ -23,17 +23,17 @@ class ApprovalToolData(BaseToolData): config_args: Dict[str, str] = {} -async def approval(message: str, event_id: str) -> str: +async def approval(message: str, trace_id: str) -> str: """ 当任务存在某些高风险操作或者计划需要让用户审批,发送请求给用户等待用户审批 Args: message: 发送给用户的请求 - event_id: + trace_id: Returns: 用户的审批结果 """ actor_list = ray_actor_hook("global_state_machine") - await actor_list.global_state_machine.put_pending.remote(event_id, message) - reply = await actor_list.global_state_machine.get_received.remote(event_id) + await actor_list.global_state_machine.put_pending.remote(trace_id, message) + reply = await actor_list.global_state_machine.get_received.remote(trace_id) return reply \ No newline at end of file diff --git a/pretor/utils/demand_protocol.py b/pretor/utils/demand_protocol.py deleted file mode 100644 index 92689b6..0000000 --- a/pretor/utils/demand_protocol.py +++ /dev/null @@ -1,37 +0,0 @@ -# Copyright 2026 zhaoxi826 -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from typing import Dict, Any, Literal -from pydantic import BaseModel, Field - -# --- 1. 给 Individual (LLM/Agent) 的具体需求 --- -class IndividualDemand(BaseModel): - role_prompt: str = Field(..., description="赋予该个体的角色定义") - task_goal: str = Field(..., description="该个体的具体执行目标") - expected_output: str = Field(..., description="期望产出的数据结构或格式描述") - -# --- 2. 给 Tool (插件/函数调用) 的具体需求 --- -class ToolDemand(BaseModel): - method: str = Field(..., description="插件调用的具体方法名") - args: Dict[str, Any] = Field(default_factory=dict, description="传递给插件的参数") - -# --- 3. 给 System (系统/物理资源) 的具体需求 --- -class SystemDemand(BaseModel): - operation: Literal["allocate_resource", "docker_manage", "file_io", "network"] - params: Dict[str, Any] = Field(..., description="操作所需的物理参数,如 GPU 核心数、路径等") - -# --- 4. 统一需求入口 (裁判官协议体) --- -class DemandProtocol(BaseModel): - variety: Literal["individual", "tool", "system"] - name: str = Field(..., description="目标名称(如:python_expert, pytest_tool, docker_engine)") \ No newline at end of file