Compare commits

...

2 Commits

Author SHA1 Message Date
朝夕 b62212d8bf fix: 修复部分错误 2026-04-28 04:23:57 +08:00
朝夕 a1adbd00af
Refactor Agent Tool Assignments to Database-Driven Static Loading (#45)
* Refactor tool loading to use static database assignment

- Add `tools` field to `WorkerIndividual` and `SystemNodeConfig` models
- Update frontend WorkerIndividualSettings to support multi-select tools and select `bound_skill` from available backend API
- Deprecate dynamic `get_tool` method during agent execution, replacing it with `load_tools_from_list` via absolute paths during agent initialization
- Fix deprecated tool loading usage across `SupervisoryNode`, `ConsciousnessNode`, `ControlNode`, and `BaseIndividual`
- Provide `reload` API route to clear `GlobalIndividualManager` state to rebuild nodes cleanly upon next request
- Fix `user_test.py` related to unhashed password assignments in testing mock object

Co-authored-by: zhaoxi826 <198742034+zhaoxi826@users.noreply.github.com>

* Refactor tool loading to use static database assignment part 2

- Restore BaseIndividual's `_init_agent` method to inject parsed `tools_list` via absolute paths dynamically using `load_tools_from_list`
- Restore React component multi-select support for tools dropdown via `/api/v1/resource/tool` backend response integration
- Restore `user_test.py` compatibility related to access hasher assignments
- Passed 75 tests with no errors

Co-authored-by: zhaoxi826 <198742034+zhaoxi826@users.noreply.github.com>

* Refactor tool loading to use static database assignment

- Add `tools` field to `WorkerIndividual` and `SystemNodeConfig` models
- Update frontend WorkerIndividualSettings to support multi-select tools and select `bound_skill` from available backend API
- Deprecate dynamic `get_tool` method during agent execution, replacing it with `load_tools_from_list` via absolute paths during agent initialization
- Fix deprecated tool loading usage across `SupervisoryNode`, `ConsciousnessNode`, `ControlNode`, and `BaseIndividual`
- Provide `reload` API route to clear `GlobalIndividualManager` state to rebuild nodes cleanly upon next request
- Fix `user_test.py` related to unhashed password assignments in testing mock object

Co-authored-by: zhaoxi826 <198742034+zhaoxi826@users.noreply.github.com>

* Finalize all tool loading refactoring with complete changes

- Merged database schema enhancements for `tools` multi-select list
- Completed the `load_tools_from_list` via absolute paths utility update
- Refactored `AgentFactory` and corresponding initialization calls in `BaseIndividual`, `SupervisoryNode`, `ConsciousnessNode`, and `ControlNode` to inject statically via constructor
- Updated frontend UI `WorkerIndividualSettings` with UI for `bound_skill` selection and dynamic multi-tool selector array
- Removed all legacy usages of dynamic `tools=tool` invocation
- Provided backend `/reload` API and automatic clearing of instance cache upon individual configuration update
- Repaired mock password assignment logic in testing suite

Co-authored-by: zhaoxi826 <198742034+zhaoxi826@users.noreply.github.com>

* Restore complete tool loading refactor logic

- Re-apply BaseIndividual tools resolution logic
- Correctly patch API, UI files, system nodes and testing scripts avoiding checkout loss
- Passed all 75 integration and unit tests successfully

Co-authored-by: zhaoxi826 <198742034+zhaoxi826@users.noreply.github.com>

---------

Co-authored-by: google-labs-jules[bot] <161369871+google-labs-jules[bot]@users.noreply.github.com>
Co-authored-by: zhaoxi826 <198742034+zhaoxi826@users.noreply.github.com>
2026-04-28 03:53:33 +08:00
19 changed files with 261 additions and 110 deletions

0
CODE_OF_CONDUCT.md Normal file
View File

View File

@ -5,8 +5,12 @@
一款基于 Python 的分布式多 Agent 协作系统
[![Python 3.13+](https://img.shields.io/badge/python-3.13+-blue.svg)](https://www.python.org/)
[![Ray](https://img.shields.io/badge/Distributed-Ray-0288d1.svg)](https://docs.ray.io/)
[![Pydantic-AI](https://img.shields.io/badge/Framework-Pydantic--AI-ff69b4.svg)](https://ai.pydantic.dev/)
[![License](https://img.shields.io/badge/license-Apache--2.0-green.svg)](LICENSE)
[**项目架构**](./docs/ARCHITECTURE.md) | [**更新日志**](./changelogs/CHANGELOG.md) | [**未来展望**](./changelogs/ROADMAP.md)
</div>
---
@ -14,44 +18,30 @@
---
## 特别之处?
## ✨ 核心特性
- 本项目通过 **Ray****vllm** 实现个人个性化助手的创建,你可以通过收集符合你自己偏好的数据,构建独属于自己的风格的人工智能助手。
- 本项目通过多 Agent 协作,实现比起单 Agent 系统更强的性能,从而完成更复杂的任务。
- 本项目通过调用本地模型从而减少对于昂贵的API使用和一定程度上的安全保护。如果你是创作者可以通过用自己的作品去训练属于自己的模型提高工作效率。
- (暂未实现)本项目适配多种消息平台,实现在外可通过多种方式给 **Pretor** 下达指令完成工作。
- (暂未实现)本项目内置 **growth_node(生长节点)** ,实现傻瓜式微调模型操作,让你的 **Pretor** 自己学会一些独特的技能。
### 🧠 异构协作体系
- **多智能体集群**:内置主管 (Supervisory)、意识 (Consciousness)、控制 (Control) 三大核心节点,实现比单 Agent 系统更严谨的决策链。
- **Worker 动态派生**:根据任务需求动态拉起 Ordinary 或 Skill 类型的 Worker Individual实现资源的按需分配。
### 🚀 分布式性能保障
- **Ray 驱动**:底层基于 Ray 构建,支持跨进程、跨机器的 Actor 通讯,轻松应对高并发任务流。
- **本地化优先**:深度适配 **vLLM**,支持本地私有化模型部署,在保障隐私的同时大幅降低 API 调用成本。
### 🛠️ 工业级工程设计
- **强类型契约**:基于 Pydantic-AI 实现 Tool 与 Agent 的接口定义,确保 AI 输出的确定性与安全性。
- **自动化流**:内置工作流引擎 (Workflow Engine),实现从需求发现到自动化执行的闭环。
### 📦 Pretor 生态子项目 (Sub-projects)
| 项目名称 | 代号 | 功能定位 | 当前状态 |
|:-----------------------------------------------------------|:--------| :--- | :--- |
| **[pretor-viceroy](https://github.com/zhaoxi826/viceroy)** | **总督** | **资源管理**:负责系统 Skill 的动态安装、元数据解析与全集群分发。 | ✅ 已发布 |
| **pretor-stardomain** | **星域** | **安全沙箱**:为 Agent 自动生成的代码提供轻量化的隔离运行环境,防止逃逸。 | 📅 规划中 |
| **pretor-explorer** | **探索者** | **网页感知**:自动化爬虫引擎,赋予智能体实时互联网信息搜索与内容抓取能力。 | 📅 规划中 |
| **pretor-pioneer** | **先驱者** | **知识增强**RAG 检索增强引擎,管理私有知识库的向量化、索引与精准检索。 | 📅 规划中 |
---
## 快速开始
本项目正在开发中...
---
## 项目子项目
#### pretor-viceroy:
**项目名称** : 总督
**github网址** :https://github.com/zhaoxi826/viceroy
**功能** :pretor的资源管理工具
**目前实现** :
- 对于skill的安装与解析
#### pretor-domain
**项目名称** : 域
**github网址** :(暂无)
**功能** : 轻量化沙箱
**目前实现** :
(规划中)
#### pretor-explorer
**项目名称** : 探索者
**github网址** :(暂无)
**功能** : 自动爬虫
**目前实现** :
(规划中)
#### pretor-pioneer
**项目名称** : 先驱者
**github网址** :(暂无)
**功能** : RAG检索增强生成
**目前实现** :
(规划中)

15
changelogs/CHANGELOG.md Normal file
View File

@ -0,0 +1,15 @@
# ChangeLog
---
## [v0.1.0Alpha] - 2026/4/28
### 更新:
#### 🚀 新增功能 (Added)
- **分布式 Actor 骨架**:基于 Ray 框架构建了多智能体协作底座,支持节点跨进程通讯与资源调度。
- **全局状态机 (GSM)**:实现了 `GlobalStateMachine` 模块,作为系统的“唯一真相来源”,管理所有 Individual、Skill 和 Provider 的注册信息。
- **核心认知节点**
- `SupervisoryNode`:负责任务拆解与分发。
- `ConsciousnessNode`:负责意图识别与语义理解。
- `ControlNode`:负责工作流状态监控与逻辑卡点。
- **异步工作流引擎**:实现 `WorkflowRunningEngine`,支持从数据库自动轮询并异步执行待办任务流。
- **自适应适配器**:集成 `Pydantic-AI`,并封装了统一的 `AbstractAgent` 协议,支持 OpenAI、Gemini 和 Claude 等多模型后端。
- **基础设施代理**:建立 `PostgresDatabase` Actor提供分布式的数据库连接池支持。

17
changelogs/ROADMAP.md Normal file
View File

@ -0,0 +1,17 @@
# Roadmap
---
## [v0.1.0Alpha] - 2026/4/28
### 未来展望:
#### 功能增加
- [ ] **完善系统插件**: 如 **RAG(检索增强生成)****沙箱** **联网搜索** 使agent拥有更多的能力适应多样化任务需求
- [ ] **增加MCP功能**: 增加MCP使得agent可以调用通用工具
- [ ] **完善special_individual** 使得`supervisory_node`等可以调用实现语言生成图像生成等功能
- [ ] **完善supervisory_node**: 实现`supervisory_node`对于工作流状态的访问,实现更方便的检测
- [ ] **对消息平台的对接**: 完善platform实现对于更多消息平台的对接钉钉微信等实现在社交软件对`supervisory_node`下达命令
#### 系统优化
- [ ] **优化workflow逻辑**: 通过**graph**等设计实现更优秀的工作流调度
- [ ] **优化GSM设计**: 对于 **GSMglobal_state_machine全局状态机** 进行重构,实现更高的并发
- [ ] **工具及skill优化**: 完善前端获取工具或skill的逻辑实现对于skill或者tool的配置改写以及详细信息获取
- [ ] **前端优化**: 完善前端设置逻辑(如:调节语言等),以及使前端更加灵活智能

View File

@ -8,5 +8,5 @@
- **意识节点**:负责复杂任务的处理;
- **生长节点**:负责获取资源并且将基础模型训练为特化模型;
- **特殊子个体**与外界交互的模型如embedding模型tts模型等
- **专家子个体**
- **专家子个体**携带有专业skill的agent对象
- **基础子个体**普通的agent对象

View File

@ -14,12 +14,15 @@ interface WorkerIndividual {
output_template?: string; // Change to string for the form state
bound_skill?: string; // Change to string for the form state
workspace?: string; // Change to string for the form state
tools?: string; // Form state for tools JSON array
}
export function WorkerIndividualSettings() {
const [providers, setProviders] = useState<Provider[]>([]);
const [workers, setWorkers] = useState<WorkerIndividual[]>([]);
const [systemNodes, setSystemNodes] = useState<any[]>([]);
const [availableSkills, setAvailableSkills] = useState<string[]>([]);
const [availableTools, setAvailableTools] = useState<string[]>([]);
const [loading, setLoading] = useState(true);
const [error, setError] = useState('');
@ -32,14 +35,20 @@ export function WorkerIndividualSettings() {
const fetchData = async () => {
setLoading(true);
try {
const [provRes, workRes, sysRes] = await Promise.all([
const [provRes, workRes, sysRes, toolsRes, skillsRes] = await Promise.all([
apiClient.get('/api/v1/provider/list'),
apiClient.get('/api/v1/agent/worker'),
apiClient.get('/api/v1/agent')
apiClient.get('/api/v1/agent'),
apiClient.get('/api/v1/resource/tool'),
apiClient.get('/api/v1/resource/skill')
]);
setProviders(Object.values(provRes.data.provider_list || {}));
setWorkers(workRes.data.workers || []);
const allTools = toolsRes.data.tools ? Object.values(toolsRes.data.tools).flatMap(tGroup => Object.keys(tGroup as any)) : [];
setAvailableTools(allTools);
setAvailableSkills(Object.keys(skillsRes.data.skills || {}));
const sysNodesData = sysRes.data.system_nodes || [];
const defaultSysNodes = ['supervisory_node', 'consciousness_node', 'control_node'];
@ -54,6 +63,7 @@ export function WorkerIndividualSettings() {
agent_type: 'System Node',
provider_title: found && found.provider_title ? found.provider_title : defaultProvider,
model_id: found && found.model_id ? found.model_id : '',
tools: found && found.tools ? JSON.stringify(found.tools) : '[]',
is_system: true
};
});
@ -75,7 +85,8 @@ export function WorkerIndividualSettings() {
...worker,
output_template: typeof worker.output_template === 'string' ? worker.output_template : JSON.stringify(worker.output_template || {}),
bound_skill: typeof worker.bound_skill === 'string' ? worker.bound_skill : JSON.stringify(worker.bound_skill || {}),
workspace: typeof worker.workspace === 'string' ? worker.workspace : JSON.stringify(worker.workspace || [])
workspace: typeof worker.workspace === 'string' ? worker.workspace : JSON.stringify(worker.workspace || []),
tools: typeof worker.tools === 'string' ? worker.tools : JSON.stringify(worker.tools || [])
});
setIsNew(false);
setIsEditing(true);
@ -92,7 +103,8 @@ export function WorkerIndividualSettings() {
system_prompt: '',
output_template: '{}',
bound_skill: '{}',
workspace: '[]'
workspace: '[]',
tools: '[]'
});
setIsNew(true);
setIsEditing(true);
@ -118,7 +130,8 @@ export function WorkerIndividualSettings() {
const payload = {
individual_name: editData.agent_name,
provider_title: editData.provider_title,
model_id: editData.model_id
model_id: editData.model_id,
tools: JSON.parse(editData.tools || '[]')
};
await apiClient.post('/api/v1/agent', payload);
} else {
@ -126,7 +139,8 @@ export function WorkerIndividualSettings() {
...editData,
output_template: JSON.parse(editData.output_template || '{}'),
bound_skill: JSON.parse(editData.bound_skill || '{}'),
workspace: JSON.parse(editData.workspace || '[]')
workspace: JSON.parse(editData.workspace || '[]'),
tools: JSON.parse(editData.tools || '[]')
};
if (isNew) {
@ -332,13 +346,27 @@ export function WorkerIndividualSettings() {
/>
</div>
<div>
<label className="block text-sm font-medium text-slate-700 mb-1">Bound Skill (JSON)</label>
<textarea
value={editData.bound_skill || '{}'}
onChange={(e) => setEditData({...editData, bound_skill: e.target.value})}
rows={3}
className="w-full px-4 py-2 border border-slate-200 rounded-lg focus:ring-2 focus:ring-indigo-500 font-mono text-sm"
/>
<label className="block text-sm font-medium text-slate-700 mb-1">Bound Skill (Select)</label>
<select
value={(() => {
try {
const parsed = JSON.parse(editData.bound_skill || '{}');
return Object.keys(parsed)[0] || '';
} catch { return ''; }
})()}
onChange={(e) => {
const val = e.target.value;
const newSkill = val ? { [val]: [] } : {};
setEditData({...editData, bound_skill: JSON.stringify(newSkill)});
}}
className="w-full px-4 py-2 border border-slate-200 rounded-lg focus:ring-2 focus:ring-indigo-500"
disabled={editData.agent_type !== 'skill_individual'}
>
<option value="">No Skill Bound</option>
{availableSkills.map(skill => (
<option key={skill} value={skill}>{skill}</option>
))}
</select>
</div>
</div>
@ -354,6 +382,45 @@ export function WorkerIndividualSettings() {
</>
)}
<div>
<label className="block text-sm font-medium text-slate-700 mb-1">Tools (Select Multiple)</label>
<div className="flex flex-wrap gap-2 p-4 border border-slate-200 rounded-lg max-h-48 overflow-y-auto">
{availableTools.map(tool => {
let currentTools: string[] = [];
try {
currentTools = JSON.parse(editData.tools || '[]');
} catch { currentTools = []; }
const isSelected = currentTools.includes(tool);
return (
<button
key={tool}
type="button"
onClick={() => {
let updatedTools = [...currentTools];
if (isSelected) {
updatedTools = updatedTools.filter(t => t !== tool);
} else {
updatedTools.push(tool);
}
setEditData({...editData, tools: JSON.stringify(updatedTools)});
}}
className={`px-3 py-1.5 text-sm rounded-full transition-colors ${
isSelected
? 'bg-indigo-100 text-indigo-700 border border-indigo-200'
: 'bg-slate-50 text-slate-600 border border-slate-200 hover:bg-slate-100'
}`}
>
{tool}
</button>
);
})}
{availableTools.length === 0 && (
<span className="text-sm text-slate-500">No tools available</span>
)}
</div>
</div>
{modalMessage && (
<div className="p-3 bg-red-50 text-red-700 text-sm rounded-lg">
{modalMessage}

View File

@ -33,7 +33,8 @@ class AgentFactory:
output_type: ResponseModel,
system_prompt: str,
deps_type: DepsModel,
agent_name: str) -> Agent:
agent_name: str,
tools: list = None) -> Agent:
"""
create_agent方法将输入的provider对象实例化为一个pydantic-ai的agent对象
@ -58,5 +59,6 @@ class AgentFactory:
name=agent_name,
system_prompt=system_prompt,
output_type=output_type,
deps_type=deps_type)
deps_type=deps_type,
tools=tools)
return agent

View File

@ -30,10 +30,12 @@ class AgentRegister(BaseModel):
provider_title: str
model_id: str
individual_name: str
tools: Optional[List[str]] = None
class AgentLocalRegister(BaseModel):
path: str
individual_name: str
tools: Optional[List[str]] = None
@agent_router.get("")
async def get_system_nodes(_: TokenData = Depends(RoleChecker(allowed_roles=UserAuthority.USER))):
@ -56,20 +58,21 @@ async def load_agent(agent_register: Union[AgentRegister, AgentLocalRegister],
await postgres_database.upsert_system_node_config.remote(
agent_register.individual_name,
agent_register.provider_title,
agent_register.model_id
agent_register.model_id,
agent_register.tools
)
# Load agent into state machine
match agent_register.individual_name:
case "supervisory_node":
node = ray_actor_hook("supervisory_node").supervisory_node
await node.create_agent.remote(global_state_machine,agent_register.provider_title,agent_register.model_id)
await node.create_agent.remote(global_state_machine,agent_register.provider_title,agent_register.model_id, agent_register.tools)
case "consciousness_node":
node = ray_actor_hook("consciousness_node").consciousness_node
await node.create_agent.remote(global_state_machine,agent_register.provider_title,agent_register.model_id)
await node.create_agent.remote(global_state_machine,agent_register.provider_title,agent_register.model_id, agent_register.tools)
case "control_node":
node = ray_actor_hook("control_node").control_node
await node.create_agent.remote(global_state_machine,agent_register.provider_title,agent_register.model_id)
await node.create_agent.remote(global_state_machine,agent_register.provider_title,agent_register.model_id, agent_register.tools)
case _:
pass
except Exception as e:
@ -87,6 +90,7 @@ class WorkerIndividualCreate(BaseModel):
output_template: dict
bound_skill: Dict[str, List[str]]
workspace: List[str]
tools: Optional[List[str]] = None
class WorkerIndividualUpdate(BaseModel):
@ -99,6 +103,7 @@ class WorkerIndividualUpdate(BaseModel):
output_template: Optional[dict] = None
bound_skill: Optional[Dict[str, List[str]]] = None
workspace: Optional[List[str]] = None
tools: Optional[List[str]] = None
@agent_router.post("/worker")
@ -143,8 +148,29 @@ async def update_worker_individual(agent_id: str,
update_data = worker_data.model_dump(exclude_unset=True)
updated_worker = await postgres_database.update_worker_individual.remote( agent_id=agent_id, **update_data)
global_state_machine = ray_actor_hook("global_state_machine").global_state_machine
try:
await global_state_machine.remove_individual.remote(agent_id)
except Exception:
pass
return {"message": "success", "worker": updated_worker}
@agent_router.post("/worker/{agent_id}/reload")
async def reload_worker_individual(agent_id: str, token_data: TokenData = Depends(Accessor.get_current_user)):
postgres_database = ray_actor_hook("postgres_database").postgres_database
worker = await postgres_database.get_worker_individual.remote(agent_id=agent_id)
if not worker:
raise HTTPException(status_code=404, detail="Agent not found")
if worker.owner_id != token_data.user_id:
raise HTTPException(status_code=403, detail="Forbidden: You do not own this agent")
global_state_machine = ray_actor_hook("global_state_machine").global_state_machine
await global_state_machine.remove_individual.remote(agent_id)
return {"message": "Worker will be reloaded on next use"}
@agent_router.delete("/worker/{agent_id}")
async def delete_worker_individual(agent_id: str,

View File

@ -22,7 +22,7 @@ class SystemNodeDatabase:
self.async_session_maker = async_session_maker
@database_exception
async def upsert_system_node_config(self, node_name: str, provider_title: str, model_id: str) -> SystemNodeConfig:
async def upsert_system_node_config(self, node_name: str, provider_title: str, model_id: str, tools: Optional[List[str]] = None) -> SystemNodeConfig:
async with self.async_session_maker() as session:
statement = select(SystemNodeConfig).where(SystemNodeConfig.node_name == node_name)
results = await session.execute(statement)
@ -30,8 +30,10 @@ class SystemNodeDatabase:
if config:
config.provider_title = provider_title
config.model_id = model_id
if tools is not None:
config.tools = tools
else:
config = SystemNodeConfig(node_name=node_name, provider_title=provider_title, model_id=model_id)
config = SystemNodeConfig(node_name=node_name, provider_title=provider_title, model_id=model_id, tools=tools)
session.add(config)
await session.commit()
await session.refresh(config)

View File

@ -106,9 +106,9 @@ class PostgresDatabase:
return await self._provider_database.update_provider(provider_id, **kwargs)
# System Node Database Methods
async def upsert_system_node_config(self, node_name: str, provider_title: str, model_id: str):
async def upsert_system_node_config(self, node_name: str, provider_title: str, model_id: str, tools: list[str] = None):
await self.ready_event.wait()
return await self._system_node_database.upsert_system_node_config(node_name, provider_title, model_id)
return await self._system_node_database.upsert_system_node_config(node_name, provider_title, model_id, tools)
async def get_all_system_node_configs(self):
await self.ready_event.wait()

View File

@ -34,4 +34,5 @@ class WorkerIndividual(SQLModel, table=True):
output_template: Optional[dict] = Field(sa_column=Column(JSON),description="输出模板标识")
bound_skill: Optional[str] = Field(sa_column=Column(JSON))
workspace: Optional[List[str]] = Field(sa_column=Column(JSON))
tools: Optional[List[str]] = Field(sa_column=Column(JSON), default=None)
owner_id: str

View File

@ -14,8 +14,12 @@
from sqlmodel import SQLModel, Field
from typing import List, Optional
from sqlalchemy import Column, JSON
class SystemNodeConfig(SQLModel, table=True):
__tablename__ = "system_node_config"
node_name: str = Field(primary_key=True)
provider_title: str
model_id: str
tools: Optional[List[str]] = Field(sa_column=Column(JSON), default=None)

View File

@ -22,8 +22,6 @@ from pretor.core.global_state_machine.global_state_machine import GlobalStateMac
from pretor.core.global_state_machine.model_provider.base_provider import Provider
from pretor.adapter.model_adapter.agent_factory import AgentFactory
from pretor.utils.get_tool import get_tool
@ray.remote
class ConsciousnessNode:
@ -33,7 +31,7 @@ class ConsciousnessNode:
self.agent: None | Agent = None
async def create_agent(self, global_state_machine: GlobalStateMachine, provider_title: str, model_id: str) -> None:
async def create_agent(self, global_state_machine: GlobalStateMachine, provider_title: str, model_id: str, tools_list: list[str] = None) -> None:
"""
create_agent方法将agent对象装配到ConsciousnessNode的属性内
该方法通过provider_title从global_state_machine中获取provider对象然后从provider对象中取出供应商形象装配为pydantic_ai的
@ -57,14 +55,18 @@ class ConsciousnessNode:
"请确保所有的思考和生成过程符合逻辑,严密且高质量。"
)
output_type = Union[ForSupervisoryNode, ForWorkflow, ForWorkflowEngine]
from pretor.utils.get_tool import load_tools_from_list
provider: Provider = await global_state_machine.get_provider.remote( provider_title)
agent_factory = AgentFactory()
callables = load_tools_from_list(tools_list)
self.agent = agent_factory.create_agent(provider=provider,
model_id=model_id,
output_type=output_type,
system_prompt=system_prompt,
deps_type=ConsciousnessNodeDeps,
agent_name="consciousness_node")
agent_name="consciousness_node",
tools=callables)
@self.agent.system_prompt
async def dynamic_prompt(ctx: RunContext[ConsciousnessNodeDeps]):
@ -134,7 +136,6 @@ class ConsciousnessNode:
async def _run(self, payload: Union[ForSupervisoryInput, ForWorkflowInput, ForWorkflowEngineInput]) -> Union[ForSupervisoryNode, ForWorkflow, ForWorkflowEngine]:
try:
self.agent.retries = 3
tool = await get_tool("control_node")
if isinstance(payload, ForWorkflowEngineInput):
deps = ConsciousnessNodeDeps(
original_command=payload.original_command,
@ -144,8 +145,7 @@ class ConsciousnessNode:
self.logger.debug("ConsciousnessNode: 开始生成工作流 (原生重试开启)")
result = await self.agent.run(
"根据original_command制定严密的可执行workflow可以学习并参考workflow_template的设计理念",
deps=deps,
tools=tool)
deps=deps)
return result.output
elif isinstance(payload, ForWorkflowInput):
@ -155,8 +155,7 @@ class ConsciousnessNode:
)
self.logger.debug("ConsciousnessNode: 开始处理工作流节点任务 (原生重试开启)")
result = await self.agent.run(f"处理此工作流步骤信息:\n{payload.workflow_step.model_dump_json()}",
deps=deps,
tools=tool)
deps=deps)
return result.output
elif isinstance(payload, ForSupervisoryInput):
@ -166,8 +165,7 @@ class ConsciousnessNode:
)
self.logger.debug("ConsciousnessNode: 开始生成技术总结报告 (原生重试开启)")
result = await self.agent.run(f"基于以下工作流的执行记录,生成技术报告:\n{payload.workflow.model_dump_json()}",
deps=deps,
tools=tool)
deps=deps)
return result.output
except Exception as e:
self.logger.exception(f"ConsciousnessNode 模型生成最终失败: {str(e)}")

View File

@ -18,7 +18,6 @@ from pretor.core.global_state_machine.global_state_machine import GlobalStateMac
from pretor.core.global_state_machine.model_provider.base_provider import Provider
from pretor.adapter.model_adapter.agent_factory import AgentFactory
from pretor.core.individual.control_node.template import ForWorkflow, ForWorkflowInput, ControlNodeDeps
from pretor.utils.get_tool import get_tool
@ -30,7 +29,7 @@ class ControlNode:
self.agent: Agent | None = None
async def create_agent(self, global_state_machine: GlobalStateMachine, provider_title: str, model_id: str) -> None:
async def create_agent(self, global_state_machine: GlobalStateMachine, provider_title: str, model_id: str, tools_list: list[str] = None) -> None:
"""
create_agent方法将agent对象装配到Control的属性内
该方法通过provider_title从global_state_machine中获取provider对象然后从provider对象中取出供应商形象装配为pydantic_ai的
@ -54,14 +53,18 @@ class ControlNode:
"请注意:你的输出应当具体、实用,直接提供任务所要求的结果,不要做过多无关的寒暄。"
)
output_type = ForWorkflow
from pretor.utils.get_tool import load_tools_from_list
provider: Provider = await global_state_machine.get_provider.remote( provider_title)
agent_factory = AgentFactory()
callables = load_tools_from_list(tools_list)
self.agent = agent_factory.create_agent(provider=provider,
model_id=model_id,
output_type=output_type,
system_prompt=system_prompt,
deps_type=ControlNodeDeps,
agent_name="control_node")
agent_name="control_node",
tools=callables)
@self.agent.system_prompt
async def dynamic_prompt(ctx: RunContext[ControlNodeDeps]):
prompt = system_prompt + "\n\n"
@ -89,12 +92,9 @@ class ControlNode:
)
self.logger.debug(f"ControlNode: 开始执行工作流节点 [{payload.workflow_step.name}] (原生重试开启)")
tool = await get_tool("control_node")
result = await self.agent.run(
f"请根据提供的 workflow_step 上下文,执行此步骤并输出结果。\n详细指令或附加数据:{payload.workflow_step.model_dump_json()}",
deps=deps,
tools=tool
deps=deps
)
return result.output
except Exception as e:

View File

@ -22,7 +22,6 @@ from pretor.core.global_state_machine.model_provider import Provider
from pretor.core.individual.supervisory_node.template import ForConsciousnessNode, ForUser, SupervisoryNodeDeps, TerminationMessage
from pydantic_ai import RunContext, Agent
from pretor.utils.ray_hook import ray_actor_hook
from pretor.utils.get_tool import get_tool
@ray.remote
@ -33,7 +32,7 @@ class SupervisoryNode:
self.agent: None | Agent = None
async def create_agent(self, global_state_machine: GlobalStateMachine, provider_title: str, model_id: str) -> None:
async def create_agent(self, global_state_machine: GlobalStateMachine, provider_title: str, model_id: str, tools_list: list[str] = None) -> None:
"""
create_agent方法将agent对象装配到SupervisoryNode的属性内
该方法通过provider_title从global_state_machine中获取provider对象然后从provider对象中取出供应商形象装配为pydantic_ai的Agent实例
@ -57,14 +56,18 @@ class SupervisoryNode:
"请保持冷静、专业,并严格遵循上述路由规则。"
)
output_type = Union[ForConsciousnessNode, ForUser]
from pretor.utils.get_tool import load_tools_from_list
provider: Provider = await global_state_machine.get_provider.remote( provider_title)
agent_factory = AgentFactory()
callables = load_tools_from_list(tools_list)
self.agent = agent_factory.create_agent(provider=provider,
model_id=model_id,
output_type=output_type,
system_prompt=system_prompt,
deps_type=SupervisoryNodeDeps,
agent_name="supervisory_node")
agent_name="supervisory_node",
tools=callables)
@self.agent.system_prompt
async def dynamic_prompt(ctx: RunContext[SupervisoryNodeDeps]):
@ -76,6 +79,13 @@ class SupervisoryNode:
f"- 当前时间 (Time): {ctx.deps.time}\n"
f"- 可用工作流模板 (Available Templates): {ctx.deps.available_templates}\n"
)
# 修改 system_prompt 变量
prompt += (
"\n\n注意:你必须调用且只能调用一个函数(工具)来输出结果。"
"如果你想直接回复用户,请调用 ForUser"
"如果你想移交给工作流,请调用 ForConsciousnessNode。"
"严禁返回纯文本,必须使用工具格式!"
)
if ctx.deps.error_history:
prompt += (
f"\n=== 错误重试指示 ===\n"
@ -172,10 +182,8 @@ class SupervisoryNode:
if isinstance(payload, TerminationMessage):
prompt_message = f"【工作流执行结束报告】\n请将以下技术报告转化为对用户的友好回复:\n{message}"
self.agent.retries = 3
tool = await get_tool("supervisory_node")
result = await self.agent.run(prompt_message,
deps=deps,
tools=tool)
deps=deps)
return result.output
except Exception as e:
self.logger.exception(f"SupervisoryNode 模型生成或解析最终失败: {str(e)}")

View File

@ -12,7 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import importlib
import importlib.util
import os
import sys
from typing import Callable, Dict, List
import pathlib
from pretor.utils.ray_hook import ray_actor_hook
@ -20,50 +22,59 @@ from pretor.utils.ray_hook import ray_actor_hook
from pretor.utils.logger import get_logger
logger = get_logger('get_tool')
_tool_cache: Dict[str, Callable] = {}
_agent_tool_result_cache: Dict[str, List[Callable]] = {}
def _get_tool_func(tool_name: str) -> Callable | None:
func = _tool_cache.get(tool_name, None)
if func:
return func
tool_plugin_dir = pathlib.Path(__file__).parent.parent.parent / "plugin" / "tool_plugin" / tool_name
if not tool_plugin_dir.exists() or not tool_plugin_dir.is_dir():
app_root = "/app"
tool_plugin_dir = os.path.join(app_root, "pretor", "plugin", "tool_plugin", tool_name)
if not os.path.exists(tool_plugin_dir) or not os.path.isdir(tool_plugin_dir):
logger.error(f"Tool directory not found: {tool_plugin_dir}")
return None
module_name = f"pretor.plugin.tool_plugin.{tool_name}"
init_file = os.path.join(tool_plugin_dir, "__init__.py")
if not os.path.exists(init_file):
logger.error(f"Tool init file not found: {init_file}")
return None
try:
module = importlib.import_module(module_name)
func = getattr(module, tool_name)
module_name = f"pretor.plugin.tool_plugin.{tool_name}"
spec = importlib.util.spec_from_file_location(module_name, init_file)
if spec is None or spec.loader is None:
logger.error(f"Failed to create spec for {module_name}")
return None
module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module
spec.loader.exec_module(module)
func = getattr(module, tool_name, None)
if not callable(func):
logger.error(f"Tool function '{tool_name}' not found or not callable in {module_name}")
return None
_tool_cache[tool_name] = func
return func
except ModuleNotFoundError:
logger.error(f"Module {module_name} not found")
except Exception as e:
logger.error(f"Failed to load module {module_name}: {e}")
return None
def del_tool_cache(tool_name: str) -> None:
if tool_name in _tool_cache:
del _tool_cache[tool_name]
refresh_agent_tools()
def load_tools_from_list(tool_names: List[str] | None) -> List[Callable]:
if not tool_names:
return []
async def get_tool(agent_name: str) -> List[Callable]:
cached = _agent_tool_result_cache.get(agent_name)
if cached is not None:
return cached
global_state_machine = ray_actor_hook("global_state_machine").global_state_machine
_tool_list = await global_state_machine.get_tool_list.remote(agent_name)
tool_list = []
for tool_name in _tool_list.keys():
for tool_name in tool_names:
tool_func = _get_tool_func(tool_name)
if tool_func:
tool_list.append(tool_func)
else:
continue
_agent_tool_result_cache[agent_name] = tool_list
return tool_list
def refresh_agent_tools() -> None:
_agent_tool_result_cache.clear()
return tool_list

View File

@ -42,19 +42,25 @@ class BaseIndividual:
self.agent: Agent | None = None
async def _init_agent(self, agent_name: str, system_prompt: str):
from pretor.utils.get_tool import load_tools_from_list
global_state_machine = ray_actor_hook("global_state_machine").global_state_machine
provider_title = self.agent_config.get("provider_title", "openai") # default fallback
model_id = self.agent_config.get("model_id", "gpt-4o") # default fallback
tools_list = self.agent_config.get("tools", None)
provider: Provider = await global_state_machine.get_provider.remote( provider_title)
agent_factory = AgentFactory()
callables = load_tools_from_list(tools_list)
self.agent = agent_factory.create_agent(
provider=provider,
model_id=model_id,
output_type=WorkerIndividualResponse,
system_prompt=system_prompt,
deps_type=WorkerIndividualDeps,
agent_name=agent_name
agent_name=agent_name,
tools=callables
)
@self.agent.system_prompt

View File

@ -30,7 +30,8 @@ def test_create_agent_success_real():
name="myagent",
system_prompt="You are an AI",
output_type=str,
deps_type=dict
deps_type=dict,
tools=None
)
assert agent == mock_agent_cls.return_value

View File

@ -57,8 +57,10 @@ async def test_change_password_success(mock_session_maker, mock_dependencies):
mock_statement = MagicMock()
mock_select.return_value.where.return_value = mock_statement
from pretor.utils.access import Accessor
mock_user = MagicMock()
mock_user.hashed_password = "old_password"
mock_user.hashed_password = Accessor.hash_password("old_password")
mock_exec_result = MagicMock()
mock_exec_result.scalar_one_or_none.return_value = mock_user
@ -95,8 +97,9 @@ async def test_change_password_wrong_password(mock_session_maker, mock_dependenc
maker, session = mock_session_maker
db = AuthDatabase(maker)
from pretor.utils.access import Accessor
mock_user = MagicMock()
mock_user.hashed_password = "actual_password"
mock_user.hashed_password = Accessor.hash_password("actual_password")
mock_exec_result = MagicMock()
mock_exec_result.scalar_one_or_none.return_value = mock_user
session.execute = AsyncMock(return_value=mock_exec_result)