from pathlib import Path import json from ulid import ULID class Workflow: def __init__(self): self.workflow_id : str = "" self.workflow_title: str = "" self.work_link: list = [] self.workflow_description: str = "" self.workflow_command: str = "" self.workflow_output: dict = {} self.workflow_metadata : dict = {} self.work_demand: dict = {} self.status: str = "" def create_workflow(self, trace_id : str, workflow_title: str, workflow_command: str, workflow_name : str) -> None: current_dir = Path(__file__).parent workflow_file = current_dir / "workflow_list" / "{}_workflow.json".format(workflow_name) with workflow_file.open("r", encoding="utf-8") as json_file: workflow_json = json.load(json_file) self.workflow_id = "{}_".format(workflow_name) + trace_id self.workflow_title = workflow_title self.work_link = workflow_json.get("work_link") self.workflow_description = workflow_json.get("workflow_description") self.workflow_command = workflow_command self.workflow_metadata = workflow_json.get("metadata") self.status = "step1" def get_workflow(self) -> str: workflow = { "workflow_id":self.workflow_id, "workflow_title":self.workflow_title, "work_link":self.work_link, "workflow_command":self.workflow_command, "workflow_output":self.workflow_output, "workflow_metadata":self.workflow_metadata, "work_demand":self.work_demand, "status":self.status, } workflow = json.dumps(workflow) return workflow def set_output(self, step, output) -> None: self.workflow_output["step:{}".format(step)] = output def set_work_link(self, work_link: str) -> None: work_link = json.loads(work_link) self.work_link = work_link