fix: 修复了字段名bug

This commit is contained in:
朝夕 2026-04-26 21:37:20 +08:00
parent e6bf9e2ce4
commit 913648a071
10 changed files with 43 additions and 84 deletions

View File

@ -1,13 +1,11 @@
待解决问题 ## 待解决问题
---
## 问题栏
#### 🔴 核心缺陷与修复 (Bug Fixes & Stability)
## 问题栏
#### 🔴 核心缺陷与修复 (Bug Fixes & Stability)
#### 🛡️ 安全与合规 (Security & Auth) #### 🛡️ 安全与合规 (Security & Auth)
#### ⚡ 性能与资源优化 (Performance & Scalability) #### ⚡ 性能与资源优化 (Performance & Scalability)
#### 🏗️ 架构演进 (Architecture & Refactoring) #### 🏗️ 架构演进 (Architecture & Refactoring)

View File

@ -19,16 +19,16 @@ from fastapi import APIRouter, WebSocket, WebSocketDisconnect
workflow_router = APIRouter(prefix="/api/v1/workflow", tags=["workflow"]) workflow_router = APIRouter(prefix="/api/v1/workflow", tags=["workflow"])
@workflow_router.websocket("/ws/{event_id}") @workflow_router.websocket("/ws/{trace_id}")
async def get_workflow(websocket: WebSocket, event_id: str): async def get_workflow(websocket: WebSocket, trace_id: str):
await websocket.accept() await websocket.accept()
global_state_machine = ray_actor_hook("global_state_machine") global_state_machine = ray_actor_hook("global_state_machine")
try: try:
while True: while True:
await websocket.send(await global_state_machine.get_workflow.remote(event_id)) await websocket.send(await global_state_machine.get_workflow.remote(trace_id))
await websocket.send_text(await global_state_machine.get_pending.remote(event_id)) await websocket.send_text(await global_state_machine.get_pending.remote(trace_id))
response = await websocket.receive_text() response = await websocket.receive_text()
await global_state_machine.put_received(event_id, response) await global_state_machine.put_received(trace_id, response)
except WebSocketDisconnect: except WebSocketDisconnect:
pass pass
except RuntimeError as e: except RuntimeError as e:

View File

@ -13,7 +13,7 @@
# limitations under the License. # limitations under the License.
from sqlmodel import SQLModel, Field from sqlmodel import SQLModel, Field
from typing import List, Dict from typing import List, Optional
from sqlalchemy import Column, JSON from sqlalchemy import Column, JSON
from enum import Enum from enum import Enum
@ -30,8 +30,8 @@ class WorkerIndividual(SQLModel, table=True):
description: str = Field(nullable=False) description: str = Field(nullable=False)
provider_title: str provider_title: str
model_id: str model_id: str
system_prompt: str system_prompt: Optional[dict]
output_template: dict = Field(sa_column=Column(JSON),description="输出模板标识") output_template: Optional[dict] = Field(sa_column=Column(JSON),description="输出模板标识")
bound_skill: Dict[str, List[str]] = Field(sa_column=Column(JSON)) bound_skill: Optional[str] = Field(sa_column=Column(JSON))
workspace: List[str] = Field(sa_column=Column(JSON)) workspace: Optional[List[str]] = Field(sa_column=Column(JSON))
owner_id: str owner_id: str

View File

@ -116,31 +116,31 @@ class GlobalStateMachine:
def add_event(self, event: PretorEvent) -> None: def add_event(self, event: PretorEvent) -> None:
event.pending_queue = asyncio.Queue() event.pending_queue = asyncio.Queue()
event.receive_queue = asyncio.Queue() event.receive_queue = asyncio.Queue()
self.event_dict[event.event_id] = event self.event_dict[event.trace_id] = event
def delete_event(self, event_id: str) -> None: def delete_event(self, trace_id: str) -> None:
del self.event_dict[event_id] del self.event_dict[trace_id]
def get_event(self, event_id: str) -> PretorEvent: def get_event(self, trace_id: str) -> PretorEvent:
return self.event_dict.get(event_id, None) return self.event_dict.get(trace_id, None)
def update_attachment(self, event_id: str, attachment: Dict[str, str]) -> None: def update_attachment(self, trace_id: str, attachment: Dict[str, str]) -> None:
self.event_dict[event_id].attachment = attachment self.event_dict[trace_id].attachment = attachment
def update_workflow(self, event_id: str, workflow: PretorWorkflow) -> None: def update_workflow(self, trace_id: str, workflow: PretorWorkflow) -> None:
self.event_dict[event_id].workflow = workflow self.event_dict[trace_id].workflow = workflow
def get_workflow(self, event_id: str) -> PretorWorkflow: def get_workflow(self, trace_id: str) -> PretorWorkflow:
return self.event_dict[event_id].workflow return self.event_dict[trace_id].workflow
async def put_pending(self, event_id, item) -> None: async def put_pending(self, trace_id, item) -> None:
await self.event_dict[event_id].pending_queue.put(item) await self.event_dict[trace_id].pending_queue.put(item)
async def get_pending(self, event_id) -> str: async def get_pending(self, trace_id) -> str:
return await self.event_dict[event_id].pending_queue.get() return await self.event_dict[trace_id].pending_queue.get()
async def put_received(self, event_id, item) -> None: async def put_received(self, trace_id, item) -> None:
await self.event_dict[event_id].receive_queue.put(item) await self.event_dict[trace_id].receive_queue.put(item)
async def get_received(self, event_id) -> str: async def get_received(self, trace_id) -> str:
return await self.event_dict[event_id].receive_queue.get() return await self.event_dict[trace_id].receive_queue.get()

View File

@ -68,9 +68,8 @@ class ControlNode:
prompt += ( prompt += (
f"=== 当前任务步骤上下文 ===\n" f"=== 当前任务步骤上下文 ===\n"
f"- 步骤名称 (Name): {ctx.deps.workflow_step.name}\n" f"- 步骤名称 (Name): {ctx.deps.workflow_step.name}\n"
f"- 步骤目标/描述 (Description): {ctx.deps.workflow_step.description}\n" f"- 步骤目标/描述 (Description): {ctx.deps.workflow_step.desc}\n"
f"- 前置步骤结果参考 (Previous Results): {ctx.deps.workflow_step.precondition}\n" f"- 前置输入input: {ctx.deps.workflow_step.input}\n"
f"- 可用工具 (Available Tools): {ctx.deps.current_tools}\n"
) )
return prompt return prompt

View File

@ -23,7 +23,7 @@ class ForUser(SupervisoryNodeResponse):
context: str = Field(..., description="对用户的回复,应当使用和蔼的语气进行回复。用于直接解答简单问题或返回最终报告。") context: str = Field(..., description="对用户的回复,应当使用和蔼的语气进行回复。用于直接解答简单问题或返回最终报告。")
class ForConsciousnessNode(SupervisoryNodeResponse): class ForConsciousnessNode(SupervisoryNodeResponse):
orkflow_template: str = Field(..., description="选择的工作流模板的名称,用于处理复杂任务。") workflow_template: str = Field(..., description="选择的工作流模板的名称,用于处理复杂任务。")
reasoning: str = Field(..., description="选择将任务移交意识节点并选用该模板的简短原因。") reasoning: str = Field(..., description="选择将任务移交意识节点并选用该模板的简短原因。")
class TerminationMessage(BaseModel): class TerminationMessage(BaseModel):

View File

@ -14,7 +14,6 @@
from typing import List, Optional, Union, Literal, Dict, Any from typing import List, Optional, Union, Literal, Dict, Any
from pydantic import BaseModel, Field, model_validator from pydantic import BaseModel, Field, model_validator
from pretor.utils.demand_protocol import DemandProtocol
from pretor.utils.logger import get_logger from pretor.utils.logger import get_logger
logger = get_logger('workflow') logger = get_logger('workflow')
@ -33,6 +32,7 @@ class LogicGate(BaseModel):
class WorkStep(BaseModel): class WorkStep(BaseModel):
step: int = Field(..., gt=0, description="步骤序号,严格自增") step: int = Field(..., gt=0, description="步骤序号,严格自增")
name: str = Field(..., description="步骤名称")
node: NodeType = Field(..., description="负责执行的节点类型") node: NodeType = Field(..., description="负责执行的节点类型")
action: str = Field(..., description="执行的原子动作") action: str = Field(..., description="执行的原子动作")
desc: str = Field(..., description="动作细节的自然语言描述,包含人工规范指导") desc: str = Field(..., description="动作细节的自然语言描述,包含人工规范指导")
@ -56,7 +56,6 @@ class WorkflowStatus(BaseModel):
default="waiting_llm_working", default="waiting_llm_working",
description="当前系统调度状态" description="当前系统调度状态"
) )
demand: DemandProtocol = Field(default=None, description="需要的资源或插件调用请求")
class PretorWorkflow(BaseModel): class PretorWorkflow(BaseModel):
title: str = Field(..., description="工作流的标题") title: str = Field(..., description="工作流的标题")
@ -68,7 +67,7 @@ class PretorWorkflow(BaseModel):
command: Optional[str] = Field(default=None, description="触发此工作流的原始命令") command: Optional[str] = Field(default=None, description="触发此工作流的原始命令")
output: Dict[str, Any] = Field(default_factory=dict, description="工作流最终产出结果") output: Dict[str, Any] = Field(default_factory=dict, description="工作流最终产出结果")
status: WorkflowStatus = Field(default_factory=WorkflowStatus, description="运行时状态对象") status: WorkflowStatus = Field(default_factory=WorkflowStatus, description="运行时状态对象")
event_info: EventInfo | None = Field(default_factory=None) event_info: EventInfo | None = Field(default=None)
context_memory: Dict[str, Any] = Field(default_factory=dict) context_memory: Dict[str, Any] = Field(default_factory=dict)
@model_validator(mode='after') @model_validator(mode='after')

View File

@ -210,7 +210,7 @@ class WorkflowEngine:
agent_id = input_data.get("agent_id") agent_id = input_data.get("agent_id")
task_event = { task_event = {
"action": step.action, "action": step.action,
"description": step.description, "description": step.desc,
"input_data": input_data, "input_data": input_data,
"context_memory": self.workflow.context_memory "context_memory": self.workflow.context_memory
} }

View File

@ -23,17 +23,17 @@ class ApprovalToolData(BaseToolData):
config_args: Dict[str, str] = {} config_args: Dict[str, str] = {}
async def approval(message: str, event_id: str) -> str: async def approval(message: str, trace_id: str) -> str:
""" """
当任务存在某些高风险操作或者计划需要让用户审批发送请求给用户等待用户审批 当任务存在某些高风险操作或者计划需要让用户审批发送请求给用户等待用户审批
Args: Args:
message: 发送给用户的请求 message: 发送给用户的请求
event_id: trace_id:
Returns: Returns:
用户的审批结果 用户的审批结果
""" """
actor_list = ray_actor_hook("global_state_machine") actor_list = ray_actor_hook("global_state_machine")
await actor_list.global_state_machine.put_pending.remote(event_id, message) await actor_list.global_state_machine.put_pending.remote(trace_id, message)
reply = await actor_list.global_state_machine.get_received.remote(event_id) reply = await actor_list.global_state_machine.get_received.remote(trace_id)
return reply return reply

View File

@ -1,37 +0,0 @@
# Copyright 2026 zhaoxi826
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Dict, Any, Literal
from pydantic import BaseModel, Field
# --- 1. 给 Individual (LLM/Agent) 的具体需求 ---
class IndividualDemand(BaseModel):
role_prompt: str = Field(..., description="赋予该个体的角色定义")
task_goal: str = Field(..., description="该个体的具体执行目标")
expected_output: str = Field(..., description="期望产出的数据结构或格式描述")
# --- 2. 给 Tool (插件/函数调用) 的具体需求 ---
class ToolDemand(BaseModel):
method: str = Field(..., description="插件调用的具体方法名")
args: Dict[str, Any] = Field(default_factory=dict, description="传递给插件的参数")
# --- 3. 给 System (系统/物理资源) 的具体需求 ---
class SystemDemand(BaseModel):
operation: Literal["allocate_resource", "docker_manage", "file_io", "network"]
params: Dict[str, Any] = Field(..., description="操作所需的物理参数,如 GPU 核心数、路径等")
# --- 4. 统一需求入口 (裁判官协议体) ---
class DemandProtocol(BaseModel):
variety: Literal["individual", "tool", "system"]
name: str = Field(..., description="目标名称python_expert, pytest_tool, docker_engine")