30 lines
1.1 KiB
Python
30 lines
1.1 KiB
Python
import asyncio
|
|
import ray
|
|
from pretor.core.pipeline.pipeline_router import PipelineRouter
|
|
from pretor.core.workflow_manager.workflow import PretorWorkflow
|
|
from loguru import logger
|
|
|
|
@ray.remote
class PretorPipeline:
    """Ray actor that queues workflows and executes them on async workers.

    Workflows enter through :meth:`submit_workflow` and are consumed by the
    worker coroutines that :meth:`running` starts.
    """

    def __init__(self):
        # Queue of submitted workflows waiting for a free worker.
        self.pipeline = asyncio.Queue()
        # Loop flag read by every worker. It is kept under a private name
        # because assigning ``self.running`` would shadow the ``running()``
        # method on the instance.
        self._is_running = True
        # Tasks of the started workers; holding the references keeps the
        # tasks from being garbage-collected while they run.
        self.worker_group = []

    async def running(self, worker_count: int = 10):
        """Start the worker pool and wait on it.

        Each worker is scheduled as its own task so that all of them consume
        the queue concurrently. Awaiting ``self.worker()`` directly would
        block on the first worker's endless loop and never start the rest.

        Args:
            worker_count: Number of worker tasks to start. Defaults to 10.
        """
        started = [
            asyncio.create_task(self.worker()) for _ in range(worker_count)
        ]
        self.worker_group.extend(started)
        # Workers loop until the flag is cleared or they are cancelled, so
        # this call stays pending for as long as the pool is alive.
        await asyncio.gather(*started)

    async def worker(self):
        """Take workflows off the queue one at a time and run their items.

        For each workflow, every entry of ``workflow.work_link`` is passed in
        order to ``PipelineRouter.router``. If a step raises, the failure is
        logged with its traceback, the rest of that workflow is skipped, and
        the worker moves on to the next queued workflow.
        """
        while self._is_running:
            workflow = await self.pipeline.get()
            try:
                logger.info(f"{workflow.title}开始运行")
                for work_item in workflow.work_link:
                    await PipelineRouter.router(workflow, work_item)
            except Exception:
                # ``Exception`` rather than a bare ``except`` so that task
                # cancellation and interpreter-exit signals still propagate.
                logger.exception(f"{workflow.title}遭受致命错误,已结束")

    async def submit_workflow(self, workflow: PretorWorkflow):
        """Enqueue *workflow* for execution and log the current queue size.

        Args:
            workflow: The workflow to hand to the worker pool.
        """
        await self.pipeline.put(workflow)
        logger.info(f"任务已进入受理队列,当前排队数: {self.pipeline.qsize()}")