# Pretor/pretor/core/pipeline/pipeline.py
#
# 30 lines
# 1.1 KiB
# Python

import asyncio
import ray
from pretor.core.pipeline.pipeline_router import PipelineRouter
from pretor.core.workflow_manager.workflow import PretorWorkflow
from loguru import logger
@ray.remote
class PretorPipeline:
    """Ray actor that queues workflows and executes them on asyncio workers.

    Workflows are handed in through :meth:`submit_workflow` and stored in an
    ``asyncio.Queue``.  :meth:`running` starts a pool of background tasks,
    each executing :meth:`worker`, which pull workflows off the queue and
    pass every item of ``workflow.work_link`` to ``PipelineRouter.router``
    in order.
    """

    def __init__(self, worker_count: int = 10):
        """Create an empty queue and an empty (not yet started) worker pool.

        Args:
            worker_count: Number of concurrent worker tasks started by
                :meth:`running`.  Defaults to 10, the previously hard-coded
                pool size.
        """
        self.pipeline = asyncio.Queue()
        # Stored as ``_running``: an instance attribute named ``running``
        # would shadow the ``running()`` method and make it uncallable.
        self._running = True
        self._worker_count = worker_count
        # Holds the asyncio.Task objects of the started workers.  Keeping a
        # reference also prevents the tasks from being garbage-collected.
        self.worker_group = []

    async def running(self):
        """Start the worker pool and return once the tasks are scheduled.

        Each worker is scheduled with ``asyncio.create_task`` instead of
        being awaited directly: ``worker()`` loops indefinitely, so awaiting
        it would block here on the very first worker.

        Calling this again only replaces workers that have finished; it
        never grows the pool beyond ``worker_count``.
        """
        self.worker_group = [
            task for task in self.worker_group if not task.done()
        ]
        missing = self._worker_count - len(self.worker_group)
        for _ in range(missing):
            self.worker_group.append(asyncio.create_task(self.worker()))

    async def worker(self):
        """Consume workflows from the queue and run their work items in order.

        A failure inside one workflow is logged with its traceback and the
        worker moves on to the next queued workflow.  Only ``Exception`` is
        caught, so task cancellation (``asyncio.CancelledError``) still
        propagates and stops the worker.
        """
        while self._running:
            workflow = await self.pipeline.get()
            try:
                logger.info(f"{workflow.title}开始运行")
                for work_item in workflow.work_link:
                    await PipelineRouter.router(workflow, work_item)
            except Exception:
                # logger.exception records the traceback alongside the message.
                logger.exception(f"{workflow.title}遭受致命错误,已结束")
            finally:
                # Pair every get() with task_done() so Queue.join() can be used.
                self.pipeline.task_done()

    async def submit_workflow(self, workflow: PretorWorkflow):
        """Put ``workflow`` on the queue and log the current queue length.

        Args:
            workflow: The workflow to enqueue for execution by a worker.
        """
        await self.pipeline.put(workflow)
        logger.info(f"任务已进入受理队列,当前排队数: {self.pipeline.qsize()}")