import asyncio

import ray
from loguru import logger

from pretor.core.pipeline.pipeline_router import PipelineRouter
from pretor.core.workflow_manager.workflow import PretorWorkflow


@ray.remote
class PretorPipeline:
    """Ray actor that queues workflows and runs them on a pool of async workers.

    Workflows are submitted with :meth:`submit_workflow` and stored in an
    ``asyncio.Queue``. :meth:`running` starts the worker coroutines, each of
    which repeatedly takes one workflow from the queue and passes every item
    of its ``work_link`` to ``PipelineRouter.router`` in order.
    """

    def __init__(self):
        # FIFO queue of submitted workflows waiting for a free worker.
        self.pipeline = asyncio.Queue()
        # Kept under a private name: an instance attribute called ``running``
        # would shadow the ``running()`` coroutine method on every instance.
        self._is_running = True
        # asyncio.Task objects of the spawned workers. Holding references
        # here keeps the tasks from being garbage-collected mid-execution.
        self.worker_group = []

    async def running(self, worker_count: int = 10) -> None:
        """Start the worker pool and wait on it.

        Each worker is scheduled as its own task so that all of them consume
        the queue concurrently. The coroutine then waits on the spawned
        workers; since a worker loops forever, this call completes only if
        the workers are cancelled or terminate abnormally.

        Args:
            worker_count: Number of concurrent workers to start. Defaults to 10.

        Raises:
            ValueError: If ``worker_count`` is smaller than 1.
        """
        if worker_count < 1:
            raise ValueError(f"worker_count must be >= 1, got {worker_count}")

        spawned = [
            asyncio.create_task(self.worker()) for _ in range(worker_count)
        ]
        self.worker_group.extend(spawned)
        await asyncio.gather(*spawned)

    async def worker(self) -> None:
        """Consume workflows from the queue forever, one at a time.

        A failure inside one workflow is logged with its traceback and the
        worker moves on to the next queued workflow.
        """
        while True:
            workflow = await self.pipeline.get()
            try:
                logger.info(f"{workflow.title}开始运行")
                for work_item in workflow.work_link:
                    await PipelineRouter.router(workflow, work_item)
            except Exception:
                # Only ``Exception`` is caught so that task cancellation
                # (``asyncio.CancelledError``) and interpreter shutdown
                # signals still propagate and can stop the worker.
                logger.exception(f"{workflow.title}遭受致命错误,已结束")

    async def submit_workflow(self, workflow: PretorWorkflow):
        """Enqueue ``workflow`` for processing and log the current queue size.

        Args:
            workflow: The workflow to hand to the worker pool.
        """
        await self.pipeline.put(workflow)
        logger.info(f"任务已进入受理队列,当前排队数: {self.pipeline.qsize()}")