Compare commits

..

2 Commits

Author SHA1 Message Date
朝夕 edafe63e29 feat: workflow_template改为可不选 2026-04-29 10:02:40 +08:00
朝夕 d713bd1b30
Fix workflow scheduling for Skill Individuals and LeftPanel polling (#57)
- Update `workflow_runner.py` to retrieve `Skill Individual` items with their real `agent_id` rather than names, preventing assignment mismatches when routing via `WorkerCluster`.
- Update `ConsciousnessNode` prompts to strictly instruct the LLM to output real `agent_id`s in `agent_id` workflow fields.
- Update `WorkStep` schemas to reflect the requirement of `agent_id`.
- Add a 2-second polling interval in the frontend `LeftPanel.tsx` to automatically update the workflow list state rather than fetching only once upon tab selection.

Co-authored-by: google-labs-jules[bot] <161369871+google-labs-jules[bot]@users.noreply.github.com>
Co-authored-by: zhaoxi826 <198742034+zhaoxi826@users.noreply.github.com>
2026-04-29 09:07:41 +08:00
10 changed files with 119 additions and 50 deletions

View File

@ -8,3 +8,4 @@ frontend/dist
docker-compose.yml
.env
.env.example
.idea

View File

@ -16,6 +16,7 @@ WORKDIR /app
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
libpq-dev \
git \
&& rm -rf /var/lib/apt/lists/*
# Install uv package manager

View File

@ -42,6 +42,69 @@
| **pretor-pioneer** | **先驱者** | **知识增强**RAG 检索增强引擎,管理私有知识库的向量化、索引与精准检索。 | 📅 规划中 |
---
## 快速开始
本项目正在开发中...
## 🚀 快速开始 (Quick Start)
> **当前版本**`v0.1.0-alpha` (开发预览版)
> 本项目目前处于快速迭代阶段,欢迎提交 Issue 或 Pull Request。
### 方式一:使用 Docker Compose (推荐)
这是部署 **Pretor 应用** 及其配套 **PostgreSQL 数据库** 最简单、最完整的方式。
1. **准备配置文件**:在本地创建一个目录,并新建 `docker-compose.yml`
```yaml
services:
db:
image: postgres:16-alpine
container_name: pretor_db
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgrespassword
POSTGRES_DB: pretor
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres -d pretor"]
interval: 5s
timeout: 5s
retries: 5
pretor:
image: zhaoxi5699/pretor:v0.1.0alpha
container_name: pretor
ports:
- "8000:8000"
- "8265:8265"
depends_on:
db:
condition: service_healthy
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgrespassword
- POSTGRES_HOST=db
- POSTGRES_PORT=5432
- POSTGRES_DB=pretor
- SECRET_KEY=changethiskey12345 # 请在生产环境中修改此密钥
```
2. **启动服务**
```bash
docker compose up -d
```
### 方式二:使用 Docker
1. **启动服务**
```bash
docker run -d \
--name pretor \
-p 8000:8000 \
-p 8265:8265 \
-e POSTGRES_HOST=你的数据库IP \
-e POSTGRES_USER=postgres \
-e POSTGRES_PASSWORD=postgrespassword \
-e POSTGRES_DB=pretor \
-e SECRET_KEY=your_secret_key \
zhaoxi5699/pretor:v0.1.0alpha
```
## 🔍 访问与验证
服务启动后,可以通过以下地址进行操作:
- Web 控制台 / API 文档: http://localhost:8000
- Ray 任务仪表盘: http://localhost:8265

View File

@ -16,9 +16,9 @@ services:
timeout: 5s
retries: 5
app:
pretor:
build: .
container_name: pretor_app
container_name: pretor
ports:
- "8000:8000"
- "8265:8265"

View File

@ -40,9 +40,10 @@ export function LeftPanel({ activeTab, setActiveTab, selectedWorkflow, setSelect
const memPercent = totalMemory > 0 ? (usedMemory / totalMemory) * 100 : 0;
useEffect(() => {
if (activeTab === 'workflows') {
const fetchWorkflows = async () => {
setLoadingWorkflows(true);
let intervalId: ReturnType<typeof setInterval>;
const fetchWorkflows = async (isInitial = false) => {
if (isInitial) setLoadingWorkflows(true);
try {
const response = await apiClient.get('/api/v1/workflow/list');
// Fallback parsing just in case it returns an object or array
@ -59,11 +60,18 @@ export function LeftPanel({ activeTab, setActiveTab, selectedWorkflow, setSelect
console.error("Failed to fetch workflows", error);
setWorkflows([]);
} finally {
setLoadingWorkflows(false);
if (isInitial) setLoadingWorkflows(false);
}
};
fetchWorkflows();
if (activeTab === 'workflows') {
fetchWorkflows(true);
intervalId = setInterval(() => fetchWorkflows(false), 2000);
}
return () => {
if (intervalId) clearInterval(intervalId);
};
}, [activeTab]);
return (

View File

@ -80,9 +80,9 @@ class ConsciousnessNode:
prompt += f"- 选定工作流模板 (Workflow Template): {ctx.deps.workflow_template}\n"
if ctx.deps.available_skills:
prompt += "\n=== 当前可用 Skill Individual ===\n"
prompt += "你可以直接将以下 Skill Individual 安排进工作流的步骤中(设置 node 为 skill_individual并将 agent_id 设置为对应 Skill Individual 名称),作为可调用的工具。\n"
prompt += "你可以直接将以下 Skill Individual 安排进工作流的步骤中(设置 node 为 skill_individual并将 agent_id 设置为对应 Skill Individual 的真实 agent_id不要用名称),作为可调用的工具。\n"
for skill in ctx.deps.available_skills:
prompt += f"- 名称: {skill['name']}\n 描述: {skill['description']}\n"
prompt += f"- 真实 agent_id: {skill.get('agent_id')}\n 名称: {skill['name']}\n 描述: {skill['description']}\n"
return prompt

View File

@ -37,7 +37,7 @@ class WorkStep(BaseModel):
desc: str = Field(..., description="动作细节的自然语言描述,包含人工规范指导")
inputs: Optional[Union[str, List[str]]] = Field(default=None, description="前置依赖输出")
outputs: Optional[str] = Field(default=None, description="当前步骤产出物变量名")
agent_id: Optional[str] = Field(default=None, description="分配给 skill_individual 的 Skill Individual 名称")
agent_id: Optional[str] = Field(default=None, description="分配给 skill_individual 的 Skill Individual 真实 agent_id不可用名称代替")
logic_gate: Optional[LogicGate] = Field(default=None, description="逻辑跳转控制")
status: Literal["waiting", "running", "completed", "failed"] = Field(
default="waiting",

View File

@ -316,13 +316,17 @@ class WorkflowRunningEngine:
available_skills = None
if self.global_state_machine:
try:
raw_skills = await self.global_state_machine.get_skill_list.remote()
available_skills = [
{"name": name, "description": details[0], "instructions": details[1]}
for name, details in raw_skills.items()
]
all_individuals = await self.global_state_machine.list_individuals.remote()
available_skills = []
for agent_id, config in all_individuals.items():
if config.get("agent_type") == "skill_individual" or config.get("type") == "skill_individual":
available_skills.append({
"agent_id": agent_id,
"name": config.get("agent_name", "Unknown"),
"description": config.get("description", "")
})
except Exception as e:
self.logger.warning(f"获取技能列表失败: {e}")
self.logger.warning(f"获取Skill Individual列表失败: {e}")
payload = ForWorkflowEngineInput(
original_command=event.message,

View File

@ -60,6 +60,8 @@ async def test_workflow_engine_run():
step1.step = 1
step1.status = "waiting"
step1.node = "control_node"
step1.name = "mock_name"
step1.desc = "mock_desc"
step1.action = "mock_action"
step1.inputs = []
step1.outputs = "res"

View File

@ -1,11 +1,5 @@
import pytest
from pretor.core.workflow.workflow import WorkerGroup, WorkStep, PretorWorkflow, WorkflowStatus, LogicGate
def test_worker_group():
wg = WorkerGroup(name="group1", primary_individual={"coder": 1}, composite_individual={"tester": 1})
assert wg.name == "group1"
assert wg.primary_individual == {"coder": 1}
assert wg.composite_individual == {"tester": 1}
from pretor.core.workflow.workflow import WorkStep, PretorWorkflow, WorkflowStatus, LogicGate
def test_work_step():
ws = WorkStep(
@ -25,31 +19,27 @@ def test_work_step():
def test_pretor_workflow_validation_success():
ws1 = WorkStep(step=1, name="s1", node="control_node", action="a1", desc="d1")
ws2 = WorkStep(step=2, name="s2", node="supervisory_node", action="a2", desc="d2")
wg = WorkerGroup(name="g1", primary_individual={"coder": 1}, composite_individual={})
wf = PretorWorkflow(title="wf1", workgroup_list=[wg], work_link=[ws1, ws2], trace_id="t", event_info={"platform":"a", "user_name":"b"})
wf = PretorWorkflow(title="wf1", work_link=[ws1, ws2], trace_id="t", event_info={"platform":"a", "user_name":"b"})
assert wf.title == "wf1"
def test_pretor_workflow_validation_error_step_discontinuous():
ws1 = WorkStep(step=1, name="s1", node="control_node", action="a1", desc="d1")
ws2 = WorkStep(step=3, name="s3", node="supervisory_node", action="a2", desc="d2")
wg = WorkerGroup(name="g1", primary_individual={}, composite_individual={})
with pytest.raises(ValueError, match="工作链步数不连续"):
PretorWorkflow(title="wf1", workgroup_list=[wg], work_link=[ws1, ws2], trace_id="t", event_info={"platform":"a", "user_name":"b"})
PretorWorkflow(title="wf1", work_link=[ws1, ws2], trace_id="t", event_info={"platform":"a", "user_name":"b"})
def test_pretor_workflow_validation_error_jump_out_of_bounds():
lg = LogicGate(if_fail="jump_to_step_3", if_pass="continue")
ws1 = WorkStep(step=1, name="s1", node="control_node", action="a1", desc="d1", logic_gate=lg)
ws2 = WorkStep(step=2, name="s2", node="supervisory_node", action="a2", desc="d2")
wg = WorkerGroup(name="g1", primary_individual={}, composite_individual={})
with pytest.raises(ValueError, match="跳转目标 Step 3 越界了"):
PretorWorkflow(title="wf1", workgroup_list=[wg], work_link=[ws1, ws2], trace_id="t", event_info={"platform":"a", "user_name":"b"})
PretorWorkflow(title="wf1", work_link=[ws1, ws2], trace_id="t", event_info={"platform":"a", "user_name":"b"})
def test_pretor_workflow_validation_error_jump_format_error():
lg = LogicGate(if_fail="jump_to_step_invalid", if_pass="continue")
ws1 = WorkStep(step=1, name="s1", node="control_node", action="a1", desc="d1", logic_gate=lg)
wg = WorkerGroup(name="g1", primary_individual={}, composite_individual={})
with pytest.raises(ValueError, match="LogicGate 格式错误"):
PretorWorkflow(title="wf1", workgroup_list=[wg], work_link=[ws1], trace_id="t", event_info={"platform":"a", "user_name":"b"})
PretorWorkflow(title="wf1", work_link=[ws1], trace_id="t", event_info={"platform":"a", "user_name":"b"})
def test_workflow_status():
status = WorkflowStatus()