Pretor/pretor/worker_individual/worker_cluster.py

107 lines
4.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Copyright 2026 zhaoxi826
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
from collections import OrderedDict

import ray
@ray.remote
class WorkerCluster:
    """Work-cluster actor that manages and dispatches worker individuals.

    Design: workers are created on demand and kept in an in-memory LRU
    cache bounded by ``max_capacity``; when the cache overflows, the least
    recently used worker is dropped. Workers are plain in-process objects
    (not separate Ray actors), so the actor count does not grow with the
    number of agents.

    NOTE(review): ``BaseWorkerIndividual``, ``SkillIndividual``,
    ``SpecialIndividual`` and ``OrdinaryIndividual`` are not imported in the
    visible part of this file; they must be brought into scope from the
    worker_individual package — confirm the module path.
    """

    def __init__(self, db_actor, max_capacity: int = 200):
        """Create the cluster.

        Args:
            db_actor: Handle intended for loading agent configurations.
                Stored but currently unused — the lookup in
                ``_recruit_worker`` is mocked.
            max_capacity: Maximum number of workers kept in the LRU cache.
                A value of 0 effectively disables caching (each new worker
                is evicted right after insertion).
        """
        self.db = db_actor
        self.max_capacity = max_capacity
        # Core LRU cache of active workers. OrderedDict order is recency
        # order: the front is the least recently used entry.
        self._active_workers: "OrderedDict[str, BaseWorkerIndividual]" = OrderedDict()
        self.status = "running"

    async def _recruit_worker(self, agent_id: str) -> "BaseWorkerIndividual":
        """Return the worker for ``agent_id``, building and caching it on a miss.

        Args:
            agent_id: Identifier of the agent to wake up.

        Returns:
            The cached or newly constructed worker object.

        Raises:
            ValueError: If no configuration exists for ``agent_id``.
        """
        # 1. Cache hit: mark as most recently used and return it.
        if agent_id in self._active_workers:
            self._active_workers.move_to_end(agent_id)
            return self._active_workers[agent_id]

        # 2. Cache miss: load the agent's configuration.
        # TODO: replace this mock with the real lookup, e.g.
        #   agent_config = await self.db.get_agent_config.remote(agent_id)
        # NOTE(review): once that await exists, two concurrent calls for the
        # same agent_id may both miss and build separate workers (the later
        # one overwrites the cache slot) — guard this if it matters.
        agent_config = {
            "agent_id": agent_id,
            "type": "skill",  # set per agent in the DB: ordinary, skill or special
            "prompt": "你是一个资深架构师...",
        }
        if not agent_config:
            raise ValueError(f"无法唤醒 Agent {agent_id}:数据库中不存在该档案")

        # 3. Factory: choose the Individual class from the configured type;
        #    unknown or missing types fall back to the ordinary worker.
        worker_type = agent_config.get("type", "ordinary")
        if worker_type == "skill":
            worker = SkillIndividual(agent_config)
        elif worker_type == "special":
            worker = SpecialIndividual(agent_config)
        else:
            worker = OrdinaryIndividual(agent_config)

        # 4. Cache the worker; on overflow evict the least recently used one.
        self._active_workers[agent_id] = worker
        if len(self._active_workers) > self.max_capacity:
            evicted_id, _ = self._active_workers.popitem(last=False)
            # NOTE(review): the evicted worker is simply dropped; if workers
            # hold resources, release them here.
            print(f"[WorkerCluster] 内存池满,休眠老化 Agent: {evicted_id}")
        return worker

    async def execute_task(self, agent_id: str, task_event: dict) -> dict:
        """Run one task on the given agent; the cluster's single public work API.

        By design, workers keep no state between calls: ``task_event`` is
        expected to carry all context, history/memory and the current
        instruction.

        Args:
            agent_id: Agent that should handle the task.
            task_event: Everything the worker's ``run`` needs.

        Returns:
            On success ``{"success": True, "agent_id", "data", "metrics"}``,
            where ``metrics["cost_time_sec"]`` is the run duration in
            seconds rounded to 2 decimals. On failure
            ``{"success": False, "agent_id", "error", "error_type"}``.
        """
        try:
            # 1. Obtain the worker (cache hit or fresh construction).
            worker = await self._recruit_worker(agent_id)
            # 2. Run with the caller-supplied context. perf_counter is
            #    monotonic, so wall-clock adjustments cannot skew the timing.
            start_time = time.perf_counter()
            result = await worker.run(task_event)
            cost_time = time.perf_counter() - start_time
        except Exception as e:
            # Fault isolation: one failing agent must not crash the cluster.
            return {
                "success": False,
                "agent_id": agent_id,
                "error": str(e),
                # str(e) is empty for exceptions raised without a message;
                # the type name keeps such failures diagnosable.
                "error_type": type(e).__name__,
            }

        # 3. Standard success envelope.
        return {
            "success": True,
            "agent_id": agent_id,
            "data": result,
            "metrics": {"cost_time_sec": round(cost_time, 2)},
        }

    def get_cluster_metrics(self):
        """Monitoring probe: report the current load of the worker cache.

        Returns:
            A dict with the number of cached workers, the configured
            capacity, and the cached agent ids from least to most recently
            used.
        """
        return {
            "active_worker_count": len(self._active_workers),
            "max_capacity": self.max_capacity,
            "cached_agent_ids": list(self._active_workers.keys()),
        }