"""``api/workflow.py`` 读侧拼装:运行期状态 merge 到静态 step。""" from __future__ import annotations from kilostar.api.workflow import _merge_runtime_status def test_merge_marks_pending_when_no_log(): """没有任何运行日志时,所有 step 默认 pending。""" work_link = [ {"step": 1, "name": "s1", "node": "skill_individual", "action": "a"}, {"step": 2, "name": "s2", "node": "consciousness_node", "action": "b"}, ] merged = _merge_runtime_status(work_link, []) assert [s["status"] for s in merged] == ["pending", "pending"] # 静态字段保留 assert merged[0]["name"] == "s1" assert merged[1]["node"] == "consciousness_node" def test_merge_uses_latest_status_per_step(): """同一 step 多条日志时取最后一条(working → completed)。""" work_link = [ {"step": 1, "name": "s1", "node": "skill_individual", "action": "a"}, ] workflow_log = [ {"0": ["2026-01-01T00:00:00", "working", "开始"]}, {"0": ["2026-01-01T00:00:05", "completed", "成功"]}, ] merged = _merge_runtime_status(work_link, workflow_log) assert merged[0]["status"] == "completed" def test_merge_mixed_statuses(): """多 step 各自取自己最新状态;无日志的保持 pending。""" work_link = [ {"step": 1, "name": "s1", "node": "skill_individual", "action": "a"}, {"step": 2, "name": "s2", "node": "skill_individual", "action": "b"}, {"step": 3, "name": "s3", "node": "skill_individual", "action": "c"}, ] workflow_log = [ {"0": ["t", "completed", "ok"]}, {"1": ["t", "failed", "boom"]}, ] merged = _merge_runtime_status(work_link, workflow_log) assert [s["status"] for s in merged] == ["completed", "failed", "pending"] def test_merge_falls_back_to_position_index_without_step_field(): """step 没有 step 字段时按位置索引匹配日志。""" work_link = [ {"name": "s1", "node": "skill_individual", "action": "a"}, {"name": "s2", "node": "skill_individual", "action": "b"}, ] workflow_log = [ {"1": ["t", "completed", "ok"]}, ] merged = _merge_runtime_status(work_link, workflow_log) assert merged[0]["status"] == "pending" assert merged[1]["status"] == "completed" def test_merge_ignores_malformed_log_entries(): """脏日志(非 dict / payload 不是数组 / key 不是数字)不应炸。""" work_link = [ {"step": 1, "name": "s1", "node": "skill_individual", "action": "a"}, ] workflow_log = [ "not-a-dict", {"not-an-int": ["t", "completed", "x"]}, {"0": "not-a-list"}, {"0": ["t", "working"]}, ] merged = _merge_runtime_status(work_link, workflow_log) assert merged[0]["status"] == "working" def test_merge_handles_empty_work_link(): assert _merge_runtime_status([], []) == [] assert _merge_runtime_status(None, None) == []