Files
zhaoxi a53ffebe0e feat: 新增工具插件、系统日志、workflow配置及前端优化
1. 新增工具插件(edit_file, python_executor, search_file, shell_executor, write_file)
2. 新增系统事件日志模块和API
3. 新增workflow配置文件和详情API
4. 前端增加SSE、错误边界、设置引导等组件
5. 优化认证加密、速率限制、配置加载等工具模块
6. 删除废弃的cluster和health API
7. 补充单元测试和集成测试

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-03 07:34:43 +00:00

82 lines
2.6 KiB
Python

"""``api/system.py`` 健康探针端点。"""
from __future__ import annotations
import types
from unittest.mock import AsyncMock
import pytest
from fastapi import FastAPI
from httpx import AsyncClient, ASGITransport
from kilostar.api.system import system_router
@pytest.fixture
def health_app() -> FastAPI:
app = FastAPI()
app.include_router(system_router)
return app
@pytest.mark.asyncio
async def test_liveness_returns_alive(health_app):
transport = ASGITransport(app=health_app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
resp = await client.get("/health/live")
assert resp.status_code == 200
assert resp.json() == {"status": "alive"}
@pytest.mark.asyncio
async def test_readiness_all_ok(health_app, fake_actors):
pg = types.SimpleNamespace()
pg.ping = types.SimpleNamespace(remote=AsyncMock(return_value=True))
fake_actors.register("postgres_database", pg)
gsm = types.SimpleNamespace()
gsm.get_skill_list = types.SimpleNamespace(remote=AsyncMock(return_value=[]))
fake_actors.register("global_state_machine", gsm)
transport = ASGITransport(app=health_app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
resp = await client.get("/health/ready")
assert resp.status_code == 200
body = resp.json()
assert body["status"] == "ready"
assert body["checks"] == {"postgres": True, "global_state_machine": True}
@pytest.mark.asyncio
async def test_readiness_postgres_down(health_app, fake_actors):
pg = types.SimpleNamespace()
pg.ping = types.SimpleNamespace(
remote=AsyncMock(side_effect=RuntimeError("db down"))
)
fake_actors.register("postgres_database", pg)
gsm = types.SimpleNamespace()
gsm.get_skill_list = types.SimpleNamespace(remote=AsyncMock(return_value=[]))
fake_actors.register("global_state_machine", gsm)
transport = ASGITransport(app=health_app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
resp = await client.get("/health/ready")
assert resp.status_code == 503
body = resp.json()
assert body["status"] == "not_ready"
assert body["checks"]["postgres"] is False
assert body["checks"]["global_state_machine"] is True
@pytest.mark.asyncio
async def test_readiness_actor_not_registered(health_app, fake_actors):
transport = ASGITransport(app=health_app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
resp = await client.get("/health/ready")
assert resp.status_code == 503
assert resp.json()["status"] == "not_ready"