"""组装层 / 端到端 smoke 测试。 这一层补 ``tests/unit`` 的盲区:单测全是 mock 出来的纯逻辑,抓不到 "import 错误 / 路由冲突 / 真实节点拓扑串联不上" 这类组装层 bug。 设计原则: - **不依赖真 ray / 真 postgres**:sandbox 里 ``ray.init`` 有 psutil PID 问题, 真 postgres 要 docker。这里只验证"组件能正确组装 + 真实拓扑能端到端跑通"。 - app 装配:用真实的 ``KiloStarGateway`` 内部 ``app``(触发所有 router import + 注册),打 health 探针。 - workflow:用真实的 6 节点 graph 拓扑端到端跑,只 mock 最外层 IO(DB 写 / SSE / 执行器),不 mock 任何节点逻辑。 """ from __future__ import annotations from unittest.mock import AsyncMock import pytest from httpx import ASGITransport, AsyncClient pytestmark = pytest.mark.integration # ─── 组装层:整个 FastAPI app 能 import + 路由注册无冲突 ──────────────────── @pytest.mark.asyncio async def test_app_imports_and_health_live_ok(): """导入生产 ``app`` 不报错,且 /health/live 返回 alive。 这一步能抓到的真实 bug:任一 router 模块 import 失败、include_router 路由前缀撞车、middleware 装配异常——这些单测都看不到。 """ from kilostar.api import app transport = ASGITransport(app=app, raise_app_exceptions=False) async with AsyncClient(transport=transport, base_url="http://test") as client: resp = await client.get("/health/live") assert resp.status_code == 200 assert resp.json() == {"status": "alive"} @pytest.mark.asyncio async def test_app_route_table_has_expected_endpoints(): """关键路由都已注册(拓扑回归保护)。""" from kilostar.api import app paths = {getattr(r, "path", None) for r in app.router.routes} assert "/health/live" in paths assert "/health/ready" in paths assert "/api/v1/workflow" in paths # 阶段九/十 新增的 resume / graph 端点 assert "/api/v1/workflow/{trace_id}/resume" in paths assert "/api/v1/workflow/{trace_id}/graph" in paths # ─── 端到端:真实 6 节点 graph 拓扑跑通 ──────────────────────────────────── def _make_real_deps(skill_outputs, consciousness_outputs=None, replies=None): """构造 WorkflowDeps:只 mock 最外层 IO,节点逻辑全用真实实现。""" from kilostar.core.work.workflow.workflow_engine import WorkflowDeps skill_q = list(skill_outputs or []) consc_q = list(consciousness_outputs or []) reply_q = list(replies or []) sink = {"pending": [], "skill": [], "consc": []} async def _get_received(tid): return reply_q.pop(0) if reply_q else "" async def _run_skill(step, state): sink["skill"].append(step.get("name")) return skill_q.pop(0) if skill_q else ("(none)", True) async def _run_consciousness(step, state): sink["consc"].append(step.get("name")) return consc_q.pop(0) if consc_q else ("(none)", True) deps = WorkflowDeps( upsert_workflow_context=AsyncMock(), update_workflow_status=AsyncMock(), put_pending=AsyncMock(side_effect=lambda t, m: sink["pending"].append(m)), get_received=_get_received, run_skill=_run_skill, run_consciousness=_run_consciousness, ) return deps, sink @pytest.mark.asyncio async def test_end_to_end_mixed_workflow_runs_to_completion(): """混合 skill + consciousness + HITL 的多步 workflow 端到端跑通。 这是最贴近"真实一次 workflow"的 smoke:3 步分别走不同节点类型 + 一步 需要人工审批,全程用真实 Dispatch 派发逻辑。 """ from kilostar.core.work.workflow.workflow_engine import run_workflow_graph from kilostar.core.work.workflow.model import WorkflowStatus deps, sink = _make_real_deps( skill_outputs=[("s-ok", True), ("s2-ok", True)], consciousness_outputs=[("c-ok", True)], replies=["approve"], ) workflow_data = { "work_link": [ {"step": 1, "name": "research", "action": "do", "node": "skill_individual", "agent_id": "a1"}, {"step": 2, "name": "plan", "action": "do", "node": "consciousness_node"}, {"step": 3, "name": "review", "action": "do", "node": "skill_individual", "agent_id": "a1", "require_approval": True}, ] } final = await run_workflow_graph(workflow_data, "smoke-mixed", deps=deps) assert final == WorkflowStatus.COMPLETED.value # 真实派发:skill 跑了 research + review(审批通过后),consciousness 跑了 plan assert sink["skill"] == ["research", "review"] assert sink["consc"] == ["plan"] # 审批提示发过 assert any("人工审批" in m for m in sink["pending"]) @pytest.mark.asyncio async def test_end_to_end_empty_workflow_completes_immediately(): """空 work_link 直接 COMPLETED(不卡死、不报错)。""" from kilostar.core.work.workflow.workflow_engine import run_workflow_graph from kilostar.core.work.workflow.model import WorkflowStatus deps, _ = _make_real_deps(skill_outputs=[]) final = await run_workflow_graph({"work_link": []}, "smoke-empty", deps=deps) assert final == WorkflowStatus.COMPLETED.value @pytest.mark.asyncio async def test_end_to_end_failed_step_aborts_workflow(): """某步执行失败 → 工作流终态 FAILED(真实 logic gate 行为)。""" from kilostar.core.work.workflow.workflow_engine import run_workflow_graph from kilostar.core.work.workflow.model import WorkflowStatus deps, sink = _make_real_deps(skill_outputs=[("boom", False)]) workflow_data = { "work_link": [ {"step": 1, "name": "will-fail", "action": "do", "node": "skill_individual", "agent_id": "a1"}, ] } final = await run_workflow_graph(workflow_data, "smoke-fail", deps=deps) assert final == WorkflowStatus.FAILED.value assert sink["skill"] == ["will-fail"]