feat: 修复了一些bug

This commit is contained in:
朝夕 2026-04-28 20:03:24 +08:00
parent 0a7197295e
commit 689b7dc7f1
1 changed files with 8 additions and 7 deletions

View File

@ -25,7 +25,7 @@ from pretor.worker_individual.special_individual import SpecialIndividual
from pretor.utils.logger import get_logger from pretor.utils.logger import get_logger
logger = get_logger('worker_cluster')
@ray.remote @ray.remote
class WorkerCluster: class WorkerCluster:
@ -42,12 +42,13 @@ class WorkerCluster:
self.results_futures = {} self.results_futures = {}
self.runners = [] self.runners = []
self.num_runners = num_runners self.num_runners = num_runners
self.logger = get_logger('worker_cluster')
async def start(self): async def start(self):
if self.task_queue is None: if self.task_queue is None:
self.task_queue = Queue() self.task_queue = Queue()
self.runners = [asyncio.create_task(self._runner(i)) for i in range(self.num_runners)] self.runners = [asyncio.create_task(self._runner(i)) for i in range(self.num_runners)]
logger.info(f"WorkerCluster 已启动 {self.num_runners} 个 runner 协程。") self.logger.info(f"WorkerCluster 已启动 {self.num_runners} 个 runner 协程。")
async def _recruit_worker(self, agent_id: str) -> BaseIndividual: async def _recruit_worker(self, agent_id: str) -> BaseIndividual:
"""内部方法:招聘/唤醒一个具体的 Agent 对象""" """内部方法:招聘/唤醒一个具体的 Agent 对象"""
@ -72,7 +73,7 @@ class WorkerCluster:
self._active_workers[agent_id] = worker self._active_workers[agent_id] = worker
if len(self._active_workers) > self.max_capacity: if len(self._active_workers) > self.max_capacity:
evicted_id, _ = self._active_workers.popitem(last=False) evicted_id, _ = self._active_workers.popitem(last=False)
logger.info(f"[WorkerCluster] 内存池满,休眠老化 Agent: {evicted_id}") self.logger.info(f"[WorkerCluster] 内存池满,休眠老化 Agent: {evicted_id}")
return worker return worker
@ -87,7 +88,7 @@ class WorkerCluster:
agent_id = task.get("agent_id") agent_id = task.get("agent_id")
task_event = task.get("task_event") task_event = task.get("task_event")
logger.debug(f"[WorkerCluster Runner {runner_id}] 开始处理任务 {task_id} 给 Agent {agent_id}") self.logger.debug(f"[WorkerCluster Runner {runner_id}] 开始处理任务 {task_id} 给 Agent {agent_id}")
start_time = time.time() start_time = time.time()
try: try:
@ -102,7 +103,7 @@ class WorkerCluster:
"metrics": {"cost_time_sec": round(cost_time, 2)} "metrics": {"cost_time_sec": round(cost_time, 2)}
} }
except Exception as e: except Exception as e:
logger.exception(f"[WorkerCluster Runner {runner_id}] 执行任务 {task_id} 时发生错误: {e}") self.logger.exception(f"[WorkerCluster Runner {runner_id}] 执行任务 {task_id} 时发生错误: {e}")
response = { response = {
"success": False, "success": False,
"agent_id": agent_id, "agent_id": agent_id,
@ -114,7 +115,7 @@ class WorkerCluster:
future.set_result(response) future.set_result(response)
except Exception as e: except Exception as e:
logger.error(f"[WorkerCluster Runner {runner_id}] 循环发生异常: {e}") self.logger.error(f"[WorkerCluster Runner {runner_id}] 循环发生异常: {e}")
await asyncio.sleep(1) await asyncio.sleep(1)
async def submit_task(self, task_id: str, agent_id: str, task_event: dict): async def submit_task(self, task_id: str, agent_id: str, task_event: dict):
@ -130,7 +131,7 @@ class WorkerCluster:
"task_event": task_event "task_event": task_event
} }
await self.task_queue.put_async(task) await self.task_queue.put_async(task)
logger.debug(f"[WorkerCluster] 任务 {task_id} 已加入队列。") self.logger.debug(f"[WorkerCluster] 任务 {task_id} 已加入队列。")
try: try:
result = await future result = await future