diff --git a/pretor/worker_individual/worker_cluster.py b/pretor/worker_individual/worker_cluster.py index af60fb7..813dbd8 100644 --- a/pretor/worker_individual/worker_cluster.py +++ b/pretor/worker_individual/worker_cluster.py @@ -25,7 +25,7 @@ from pretor.worker_individual.special_individual import SpecialIndividual from pretor.utils.logger import get_logger -logger = get_logger('worker_cluster') + @ray.remote class WorkerCluster: @@ -42,12 +42,13 @@ class WorkerCluster: self.results_futures = {} self.runners = [] self.num_runners = num_runners + self.logger = get_logger('worker_cluster') async def start(self): if self.task_queue is None: self.task_queue = Queue() self.runners = [asyncio.create_task(self._runner(i)) for i in range(self.num_runners)] - logger.info(f"WorkerCluster 已启动 {self.num_runners} 个 runner 协程。") + self.logger.info(f"WorkerCluster 已启动 {self.num_runners} 个 runner 协程。") async def _recruit_worker(self, agent_id: str) -> BaseIndividual: """内部方法:招聘/唤醒一个具体的 Agent 对象""" @@ -72,7 +73,7 @@ class WorkerCluster: self._active_workers[agent_id] = worker if len(self._active_workers) > self.max_capacity: evicted_id, _ = self._active_workers.popitem(last=False) - logger.info(f"[WorkerCluster] 内存池满,休眠老化 Agent: {evicted_id}") + self.logger.info(f"[WorkerCluster] 内存池满,休眠老化 Agent: {evicted_id}") return worker @@ -87,7 +88,7 @@ class WorkerCluster: agent_id = task.get("agent_id") task_event = task.get("task_event") - logger.debug(f"[WorkerCluster Runner {runner_id}] 开始处理任务 {task_id} 给 Agent {agent_id}") + self.logger.debug(f"[WorkerCluster Runner {runner_id}] 开始处理任务 {task_id} 给 Agent {agent_id}") start_time = time.time() try: @@ -102,7 +103,7 @@ class WorkerCluster: "metrics": {"cost_time_sec": round(cost_time, 2)} } except Exception as e: - logger.exception(f"[WorkerCluster Runner {runner_id}] 执行任务 {task_id} 时发生错误: {e}") + self.logger.exception(f"[WorkerCluster Runner {runner_id}] 执行任务 {task_id} 时发生错误: {e}") response = { "success": False, "agent_id": agent_id, @@ -114,7 +115,7 @@ class WorkerCluster: future.set_result(response) except Exception as e: - logger.error(f"[WorkerCluster Runner {runner_id}] 循环发生异常: {e}") + self.logger.error(f"[WorkerCluster Runner {runner_id}] 循环发生异常: {e}") await asyncio.sleep(1) async def submit_task(self, task_id: str, agent_id: str, task_event: dict): @@ -130,7 +131,7 @@ class WorkerCluster: "task_event": task_event } await self.task_queue.put_async(task) - logger.debug(f"[WorkerCluster] 任务 {task_id} 已加入队列。") + self.logger.debug(f"[WorkerCluster] 任务 {task_id} 已加入队列。") try: result = await future