99520c69d7
1.新增后端测试 2.增加了后端的加密 3.增加了i18n(国际化)
89 lines
2.4 KiB
Python
89 lines
2.4 KiB
Python
"""``KiloStarWorkflow`` 的 ``validate_workflow_integrity`` 校验器覆盖。"""
|
|
|
|
import pytest
|
|
from pydantic import ValidationError
|
|
|
|
from kilostar.core.work.workflow.workflow import KiloStarWorkflow, WorkflowStep
|
|
from kilostar.core.work.workflow.model import LogicGate, WorkflowMetadata
|
|
|
|
|
|
def _step(
    n: int, *, fail_target: str | None = None, name: str | None = None
) -> WorkflowStep:
    """Build a minimal ``WorkflowStep`` fixture for the tests in this module.

    Args:
        n: Value passed as the step's ``step`` field.
        fail_target: When truthy, wrapped in ``LogicGate(if_fail=...)`` and
            attached as ``logic_gate``; otherwise ``logic_gate`` is ``None``.
        name: Step name; a falsy value falls back to ``f"step_{n}"``.

    Returns:
        A ``WorkflowStep`` whose ``action`` is always ``"noop"``.
    """
    step_name = name if name else f"step_{n}"

    # Only build a gate when a failure target was actually requested.
    gate = None
    if fail_target:
        gate = LogicGate(if_fail=fail_target)

    return WorkflowStep(step=n, name=step_name, action="noop", logic_gate=gate)
|
|
|
|
|
|
def test_valid_workflow_passes():
    """Steps 1..3 with a failure jump back to step 1 construct without error."""
    steps = [
        _step(1),
        _step(2, fail_target="jump_to_step_1"),
        _step(3),
    ]
    workflow = KiloStarWorkflow(
        title="t", work_link=steps, workflow_metadata=WorkflowMetadata()
    )

    assert workflow.title == "t"
    assert len(workflow.work_link) == 3
|
|
|
|
|
|
def test_non_continuous_steps_rejected():
    """Step numbers 1 and 3 (2 is missing) must raise ``ValidationError``."""
    with pytest.raises(ValidationError) as excinfo:
        gapped_steps = [_step(1), _step(3)]
        KiloStarWorkflow(
            title="t", work_link=gapped_steps, workflow_metadata=WorkflowMetadata()
        )

    # Inspect the message after the context manager has captured the error.
    message = str(excinfo.value)
    assert "工作链步数不连续" in message
|
|
|
|
|
|
def test_jump_target_out_of_range_rejected():
    """A jump to step 99 in a two-step chain must raise ``ValidationError``."""
    with pytest.raises(ValidationError) as excinfo:
        steps = [_step(1), _step(2, fail_target="jump_to_step_99")]
        KiloStarWorkflow(
            title="t", work_link=steps, workflow_metadata=WorkflowMetadata()
        )

    # Inspect the message after the context manager has captured the error.
    message = str(excinfo.value)
    assert "越界" in message
|
|
|
|
|
|
def test_jump_target_below_one_rejected():
    """A jump to step 0 must raise ``ValidationError``."""
    with pytest.raises(ValidationError) as excinfo:
        steps = [_step(1, fail_target="jump_to_step_0")]
        KiloStarWorkflow(
            title="t", work_link=steps, workflow_metadata=WorkflowMetadata()
        )

    # Inspect the message after the context manager has captured the error.
    message = str(excinfo.value)
    assert "越界" in message
|
|
|
|
|
|
def test_logic_gate_without_jump_is_ignored():
    """An ``if_fail`` lacking the ``jump_to_step_`` prefix skips the range check."""
    only_step = _step(1, fail_target="abort")
    workflow = KiloStarWorkflow(
        title="t", work_link=[only_step], workflow_metadata=WorkflowMetadata()
    )

    gate = workflow.work_link[0].logic_gate
    assert gate.if_fail == "abort"
|
|
|
|
|
|
def test_default_trace_id_is_set():
    """A workflow built without an explicit ``trace_id`` has a non-empty string one."""
    workflow = KiloStarWorkflow(
        title="t", work_link=[_step(1)], workflow_metadata=WorkflowMetadata()
    )

    trace_id = workflow.trace_id
    assert trace_id
    assert isinstance(trace_id, str)
|