Files
KiloStar/tests/integration/test_smoke.py
T
zhaoxi a53ffebe0e feat: 新增工具插件、系统日志、workflow配置及前端优化
1. 新增工具插件(edit_file, python_executor, search_file, shell_executor, write_file)
2. 新增系统事件日志模块和API
3. 新增workflow配置文件和详情API
4. 前端增加SSE、错误边界、设置引导等组件
5. 优化认证加密、速率限制、配置加载等工具模块
6. 删除废弃的cluster和health API
7. 补充单元测试和集成测试

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-03 07:34:43 +00:00

157 lines
6.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""组装层 / 端到端 smoke 测试。
这一层补 ``tests/unit`` 的盲区:单测全是 mock 出来的纯逻辑,抓不到
"import 错误 / 路由冲突 / 真实节点拓扑串联不上" 这类组装层 bug。
设计原则:
- **不依赖真 ray / 真 postgres**sandbox 里 ``ray.init`` 有 psutil PID 问题,
真 postgres 要 docker。这里只验证"组件能正确组装 + 真实拓扑能端到端跑通"
- app 装配:用真实的 ``KiloStarGateway`` 内部 ``app``(触发所有 router import +
注册),打 health 探针。
- workflow:用真实的 6 节点 graph 拓扑端到端跑,只 mock 最外层 IO(DB 写 / SSE /
执行器),不 mock 任何节点逻辑。
"""
from __future__ import annotations
from unittest.mock import AsyncMock
import pytest
from httpx import ASGITransport, AsyncClient
pytestmark = pytest.mark.integration
# ─── 组装层:整个 FastAPI app 能 import + 路由注册无冲突 ────────────────────
@pytest.mark.asyncio
async def test_app_imports_and_health_live_ok():
"""导入生产 ``app`` 不报错,且 /health/live 返回 alive。
这一步能抓到的真实 bug:任一 router 模块 import 失败、include_router
路由前缀撞车、middleware 装配异常——这些单测都看不到。
"""
from kilostar.api import app
transport = ASGITransport(app=app, raise_app_exceptions=False)
async with AsyncClient(transport=transport, base_url="http://test") as client:
resp = await client.get("/health/live")
assert resp.status_code == 200
assert resp.json() == {"status": "alive"}
@pytest.mark.asyncio
async def test_app_route_table_has_expected_endpoints():
"""关键路由都已注册(拓扑回归保护)。"""
from kilostar.api import app
paths = {getattr(r, "path", None) for r in app.router.routes}
assert "/health/live" in paths
assert "/health/ready" in paths
assert "/api/v1/workflow" in paths
# 阶段九/十 新增的 resume / graph 端点
assert "/api/v1/workflow/{trace_id}/resume" in paths
assert "/api/v1/workflow/{trace_id}/graph" in paths
# ─── 端到端:真实 6 节点 graph 拓扑跑通 ────────────────────────────────────
def _make_real_deps(skill_outputs, consciousness_outputs=None, replies=None):
"""构造 WorkflowDeps:只 mock 最外层 IO,节点逻辑全用真实实现。"""
from kilostar.core.work.workflow.workflow_engine import WorkflowDeps
skill_q = list(skill_outputs or [])
consc_q = list(consciousness_outputs or [])
reply_q = list(replies or [])
sink = {"pending": [], "skill": [], "consc": []}
async def _get_received(tid):
return reply_q.pop(0) if reply_q else ""
async def _run_skill(step, state):
sink["skill"].append(step.get("name"))
return skill_q.pop(0) if skill_q else ("(none)", True)
async def _run_consciousness(step, state):
sink["consc"].append(step.get("name"))
return consc_q.pop(0) if consc_q else ("(none)", True)
deps = WorkflowDeps(
upsert_workflow_context=AsyncMock(),
update_workflow_status=AsyncMock(),
put_pending=AsyncMock(side_effect=lambda t, m: sink["pending"].append(m)),
get_received=_get_received,
run_skill=_run_skill,
run_consciousness=_run_consciousness,
)
return deps, sink
@pytest.mark.asyncio
async def test_end_to_end_mixed_workflow_runs_to_completion():
"""混合 skill + consciousness + HITL 的多步 workflow 端到端跑通。
这是最贴近"真实一次 workflow"的 smoke:3 步分别走不同节点类型 + 一步
需要人工审批,全程用真实 Dispatch 派发逻辑。
"""
from kilostar.core.work.workflow.workflow_engine import run_workflow_graph
from kilostar.core.work.workflow.model import WorkflowStatus
deps, sink = _make_real_deps(
skill_outputs=[("s-ok", True), ("s2-ok", True)],
consciousness_outputs=[("c-ok", True)],
replies=["approve"],
)
workflow_data = {
"work_link": [
{"step": 1, "name": "research", "action": "do",
"node": "skill_individual", "agent_id": "a1"},
{"step": 2, "name": "plan", "action": "do",
"node": "consciousness_node"},
{"step": 3, "name": "review", "action": "do",
"node": "skill_individual", "agent_id": "a1",
"require_approval": True},
]
}
final = await run_workflow_graph(workflow_data, "smoke-mixed", deps=deps)
assert final == WorkflowStatus.COMPLETED.value
# 真实派发:skill 跑了 research + review(审批通过后),consciousness 跑了 plan
assert sink["skill"] == ["research", "review"]
assert sink["consc"] == ["plan"]
# 审批提示发过
assert any("人工审批" in m for m in sink["pending"])
@pytest.mark.asyncio
async def test_end_to_end_empty_workflow_completes_immediately():
"""空 work_link 直接 COMPLETED(不卡死、不报错)。"""
from kilostar.core.work.workflow.workflow_engine import run_workflow_graph
from kilostar.core.work.workflow.model import WorkflowStatus
deps, _ = _make_real_deps(skill_outputs=[])
final = await run_workflow_graph({"work_link": []}, "smoke-empty", deps=deps)
assert final == WorkflowStatus.COMPLETED.value
@pytest.mark.asyncio
async def test_end_to_end_failed_step_aborts_workflow():
"""某步执行失败 → 工作流终态 FAILED(真实 logic gate 行为)。"""
from kilostar.core.work.workflow.workflow_engine import run_workflow_graph
from kilostar.core.work.workflow.model import WorkflowStatus
deps, sink = _make_real_deps(skill_outputs=[("boom", False)])
workflow_data = {
"work_link": [
{"step": 1, "name": "will-fail", "action": "do",
"node": "skill_individual", "agent_id": "a1"},
]
}
final = await run_workflow_graph(workflow_data, "smoke-fail", deps=deps)
assert final == WorkflowStatus.FAILED.value
assert sink["skill"] == ["will-fail"]