From a53ffebe0efafb3cee77839252388b34fa0945f7 Mon Sep 17 00:00:00 2001 From: zhaoxi Date: Wed, 3 Jun 2026 07:34:43 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=96=B0=E5=A2=9E=E5=B7=A5=E5=85=B7?= =?UTF-8?q?=E6=8F=92=E4=BB=B6=E3=80=81=E7=B3=BB=E7=BB=9F=E6=97=A5=E5=BF=97?= =?UTF-8?q?=E3=80=81workflow=E9=85=8D=E7=BD=AE=E5=8F=8A=E5=89=8D=E7=AB=AF?= =?UTF-8?q?=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. 新增工具插件(edit_file, python_executor, search_file, shell_executor, write_file) 2. 新增系统事件日志模块和API 3. 新增workflow配置文件和详情API 4. 前端增加SSE、错误边界、设置引导等组件 5. 优化认证加密、速率限制、配置加载等工具模块 6. 删除废弃的cluster和health API 7. 补充单元测试和集成测试 Co-Authored-By: Claude Opus 4.7 (1M context) --- .dockerignore | 4 +- README-EN.md | 177 ++++++++++++++ README.md | 231 ++++++++++++------ .../2026_06_02_0000-0002_graph_and_logs.py | 49 ++++ config/config.yml | 4 +- config/workflow.yaml | 2 + frontend/src/App.tsx | 15 +- frontend/src/api/sse.ts | 130 ++++++++++ frontend/src/components/Agent/AgentLayout.tsx | 6 + .../src/components/Agent/SystemLogsView.tsx | 144 +++++++++++ .../Agent/WorkflowConfigSettings.tsx | 152 ++++++++++++ frontend/src/components/Chat/RightPanel.tsx | 116 ++++++++- .../src/components/Chat/WorkflowDiagram.tsx | 17 +- .../src/components/Chat/WorkflowListView.tsx | 127 ++++++---- frontend/src/components/ErrorBoundary.tsx | 52 ++++ .../src/components/Layout/SetupGuideModal.tsx | 109 +++++++++ frontend/src/i18n/locales/en.json | 39 ++- frontend/src/i18n/locales/zh.json | 41 +++- frontend/src/types/index.ts | 1 + kilostar/api/__init__.py | 14 +- kilostar/api/agent.py | 4 +- kilostar/api/auth.py | 35 ++- kilostar/api/cluster.py | 19 -- kilostar/api/health.py | 54 ---- kilostar/api/platform/onebot.py | 14 +- kilostar/api/system.py | 108 ++++++++ kilostar/api/workflow.py | 92 ++++++- .../global_state_machine.py | 4 + .../core/global_state_machine/tool_manager.py | 32 +++ .../core/postgres_database/model/__init__.py | 2 + .../model/system_event_log.py | 35 +++ .../module/system_event_log.py | 72 ++++++ kilostar/core/postgres_database/postgres.py | 38 ++- .../core/work/workflow/workflow_engine.py | 58 ++++- .../plugin/tool_plugin/approval/approval.py | 4 +- kilostar/plugin/tool_plugin/base_tool.py | 2 + .../plugin/tool_plugin/edit_file/__init__.py | 54 ++++ .../tool_plugin/file_reader/file_reader.py | 2 +- .../tool_plugin/python_executor/__init__.py | 67 +++++ .../tool_plugin/search_file/__init__.py | 58 +++++ .../tool_plugin/shell_executor/__init__.py | 54 ++++ .../plugin/tool_plugin/write_file/__init__.py | 42 ++++ kilostar/utils/access.py | 67 ++++- kilostar/utils/config_loader.py | 60 +++++ kilostar/utils/mcp_helper.py | 13 + kilostar/utils/rate_limit.py | 45 ++++ kilostar/utils/ray_hook.py | 48 +++- main.py | 25 +- pyproject.toml | 3 + tests/integration/test_smoke.py | 156 ++++++++++++ tests/unit/test_api_health.py | 6 +- tests/unit/test_api_workflow_auth.py | 113 +++++++++ tests/unit/test_api_workflow_detail.py | 80 ++++++ tests/unit/test_config_loader.py | 78 ++++++ tests/unit/test_plugin_metadata.py | 6 +- tests/unit/test_utils_ray_hook.py | 61 +++++ tests/unit/test_workflow_graph.py | 34 +++ 57 files changed, 2804 insertions(+), 271 deletions(-) create mode 100644 README-EN.md create mode 100644 alembic/versions/2026_06_02_0000-0002_graph_and_logs.py create mode 100644 config/workflow.yaml create mode 100644 frontend/src/api/sse.ts create mode 100644 frontend/src/components/Agent/SystemLogsView.tsx create mode 100644 frontend/src/components/Agent/WorkflowConfigSettings.tsx create mode 100644 frontend/src/components/ErrorBoundary.tsx create mode 100644 frontend/src/components/Layout/SetupGuideModal.tsx delete mode 100644 kilostar/api/cluster.py delete mode 100644 kilostar/api/health.py create mode 100644 kilostar/api/system.py create mode 100644 kilostar/core/postgres_database/model/system_event_log.py create mode 100644 kilostar/core/postgres_database/module/system_event_log.py create mode 100644 kilostar/plugin/tool_plugin/edit_file/__init__.py create mode 100644 kilostar/plugin/tool_plugin/python_executor/__init__.py create mode 100644 kilostar/plugin/tool_plugin/search_file/__init__.py create mode 100644 kilostar/plugin/tool_plugin/shell_executor/__init__.py create mode 100644 kilostar/plugin/tool_plugin/write_file/__init__.py create mode 100644 kilostar/utils/config_loader.py create mode 100644 kilostar/utils/rate_limit.py create mode 100644 tests/integration/test_smoke.py create mode 100644 tests/unit/test_api_workflow_auth.py create mode 100644 tests/unit/test_api_workflow_detail.py create mode 100644 tests/unit/test_config_loader.py diff --git a/.dockerignore b/.dockerignore index acf91de..c8a83b7 100644 --- a/.dockerignore +++ b/.dockerignore @@ -8,4 +8,6 @@ frontend/dist docker-compose.yml .env.template .env.example -.idea \ No newline at end of file +.idea +docs/ +tmp/ diff --git a/README-EN.md b/README-EN.md new file mode 100644 index 0000000..bd09435 --- /dev/null +++ b/README-EN.md @@ -0,0 +1,177 @@ +
+ +# KiloStar + +A distributed multi-agent collaboration system built with Python + +[![Python 3.13+](https://img.shields.io/badge/python-3.13+-blue.svg)](https://www.python.org/) +[![Ray](https://img.shields.io/badge/Distributed-Ray-0288d1.svg)](https://docs.ray.io/) +[![Pydantic-AI](https://img.shields.io/badge/Framework-Pydantic--AI-ff69b4.svg)](https://ai.pydantic.dev/) +[![License](https://img.shields.io/badge/license-Apache--2.0-green.svg)](LICENSE) + +[中文](./README.md) | [**Changelog**](./changelogs/CHANGELOG.md) | [**Roadmap**](./changelogs/ROADMAP.md) + +
+ +--- + +**KiloStar** is a next-generation distributed multi-agent collaboration system powered by **Ray**. It adopts a "central oversight + edge execution" heterogeneous cluster model, leveraging large MoE models for high-level reasoning while coordinating fine-tuned lightweight models for efficient task execution. Built on **Pydantic-AI** with strong typing and a FastAPI async gateway, KiloStar delivers end-to-end automation from requirement decomposition to resource scheduling and execution. + +> **Current version**: `v0.1.1-alpha` + +--- + +## ✨ Key Features + +### 🧠 Heterogeneous Agent Architecture +- **Multi-agent cluster**: Built-in Regulatory, Consciousness, Control, and Growth core nodes +- **Dynamic Worker spawning**: On-demand creation of Ordinary or Skill-type Worker Individuals + +### 🚀 Distributed Performance +- **Ray-powered**: Cross-process, cross-machine Actor communication for high-concurrency workloads +- **Local-first**: Deep vLLM integration for private model deployment + +### 🔄 Workflow Engine +- **pydantic-graph based**: Directed-graph workflow orchestration with conditional branching +- **Cross-process persistence**: PostgreSQL state snapshots enabling workflow resume after interruption +- **Human-in-the-Loop (HITL)**: Built-in HumanApproval node with idempotent resume semantics + +### 🛡️ Security +- **JWT authentication**: All API endpoints (including SSE streams) require Bearer Token auth +- **Ownership enforcement**: Workflow/chat resources are user-bound; cross-user access returns 403 +- **fetch-based SSE**: Token transmitted via Authorization header, never exposed in URLs + +--- + +## 🚀 Quick Start + +### Docker Compose (Recommended) + +```yaml +services: + db: + image: postgres:16-alpine + environment: + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgrespassword + POSTGRES_DB: kilostar + healthcheck: + test: ["CMD-SHELL", "pg_isready -U postgres -d kilostar"] + interval: 5s + timeout: 5s + retries: 5 + + kilostar: + image: zhaoxi5699/kilostar:v0.1.1alpha + ports: + - "8000:8000" + - "8265:8265" + depends_on: + db: + condition: service_healthy + environment: + - POSTGRES_USER=postgres + - POSTGRES_PASSWORD=postgrespassword + - POSTGRES_HOST=db + - POSTGRES_PORT=5432 + - POSTGRES_DB=kilostar + - SECRET_KEY=changethiskey12345 +``` + +```bash +docker compose up -d +``` + +Once running: +- Web Console: http://localhost:8000 +- Ray Dashboard: http://localhost:8265 + +### Local Development + +```bash +# Backend +uv sync +cp config/.env.example .env # Configure database and secret key +uv run python main.py + +# Frontend +cd frontend && npm install && npm run dev +``` + +--- + +## 📁 Project Structure + +``` +KiloStar/ +├── main.py # App entrypoint (FastAPI + Ray init) +├── pyproject.toml # Python dependencies & metadata +├── Dockerfile / docker-compose.yml # Container deployment +├── alembic/ # Database migrations +├── config/ # Environment config templates +├── kilostar/ # Backend core package +│ ├── api/ # FastAPI route layer +│ │ ├── system.py # /health system health checks +│ │ ├── workflow.py # /workflow CRUD + SSE + resume +│ │ ├── chat.py # /chat session management +│ │ ├── agent.py # /agent Worker management +│ │ └── resource.py # /resource Skill/Toolset mgmt +│ ├── core/ # Core business logic +│ │ ├── individual/ # Agent node implementations +│ │ │ ├── consciousness_node/ # Task planning +│ │ │ ├── regulatory_node/ # Quality oversight +│ │ │ ├── control_node/ # Routing & dispatch +│ │ │ └── growth_node/ # Capability expansion +│ │ ├── work/ # Work execution layer +│ │ │ ├── workflow/ # Workflow engine (pydantic-graph) +│ │ │ ├── chat/ # Chat processing +│ │ │ └── task/ # Single-task execution +│ │ ├── global_state_machine/ # Global state (Provider/Config) +│ │ ├── global_workflow_manager/ # Workflow message queue Actor +│ │ └── postgres_database/ # PostgreSQL DAO layer +│ ├── adapter/ # Model adapters (OpenAI/vLLM/...) +│ ├── plugin/ # Tool plugins +│ │ └── tool_plugin/ # Tavily / FileReader / Approval +│ ├── utils/ # Utilities +│ │ ├── access.py # JWT authentication +│ │ ├── ray_hook.py # Ray Actor handle retrieval +│ │ └── check_user/ # Role-based authorization +│ ├── worker_cluster/ # Worker cluster management +│ └── worker_individual/ # Worker individual lifecycle +├── frontend/ # React frontend (Vite + Tailwind) +│ └── src/ +│ ├── api/ # Axios client + SSE wrapper +│ ├── components/ # UI components +│ │ ├── Chat/ # Workflow panel + live graph +│ │ ├── Agent/ # Worker/Provider management +│ │ ├── Plugin/ # Skill/Tool configuration +│ │ └── Settings/ # System settings +│ ├── i18n/ # Internationalization (zh/en) +│ ├── store/ # Zustand state management +│ └── types/ # TypeScript type definitions +├── tests/ # Test suite (249+ cases) +│ ├── unit/ # Unit tests +│ └── integration/ # Integration smoke tests +└── docs/ # Design documents +``` + +--- + +## 🧪 Testing + +```bash +# Run all tests +uv run pytest tests -q + +# Unit tests only +uv run pytest tests/unit -q + +# Integration tests +uv run pytest tests/integration -q +``` + +--- + +## 📄 License + +This project is licensed under the [Apache License 2.0](LICENSE). diff --git a/README.md b/README.md index 87409a2..ef6a4ed 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -
+
# KiloStar (千星) @@ -9,99 +9,176 @@ [![Pydantic-AI](https://img.shields.io/badge/Framework-Pydantic--AI-ff69b4.svg)](https://ai.pydantic.dev/) [![License](https://img.shields.io/badge/license-Apache--2.0-green.svg)](LICENSE) -[**项目架构**](./docs/ARCHITECTURE.md) | [**更新日志**](./changelogs/CHANGELOG.md) | [**未来展望**](./changelogs/ROADMAP.md) +[English](./README-EN.md) | [**更新日志**](./changelogs/CHANGELOG.md) | [**未来展望**](./changelogs/ROADMAP.md)
--- -**KiloStar** 是一款基于 **Ray** 构建的下一代分布式多 Agent 协作系统。项目采用“中心监管 + 边缘执行”的异构集群模式,通过大参数 MoE 模型进行高层逻辑推理,并协同微调后的轻量化模型高效完成具体任务。借助 **Pydantic-AI** 提供的强类型约束与 FastAPI 异步网关,kilostar 实现了任务从需求拆解、资源调度到自动化执行的全链路闭环,为个人提供可靠的人工智能助手服务。 +**KiloStar** 是一款基于 **Ray** 构建的下一代分布式多 Agent 协作系统。项目采用"中心监管 + 边缘执行"的异构集群模式,通过大参数 MoE 模型进行高层逻辑推理,并协同微调后的轻量化模型高效完成具体任务。借助 **Pydantic-AI** 提供的强类型约束与 FastAPI 异步网关,KiloStar 实现了任务从需求拆解、资源调度到自动化执行的全链路闭环。 + +> **当前版本**:`v0.1.1-alpha` --- + ## ✨ 核心特性 ### 🧠 异构协作体系 -- **多智能体集群**:内置监控 (Regulatory)、意识 (Consciousness)、控制 (Control) 、 生长(Growth)核心节点,实现比单 Agent 系统更严谨的决策链。 -- **Worker 动态派生**:根据任务需求动态拉起 Ordinary 或 Skill 类型的 Worker Individual,实现资源的按需分配。 +- **多智能体集群**:内置监控 (Regulatory)、意识 (Consciousness)、控制 (Control)、生长 (Growth) 核心节点 +- **Worker 动态派生**:根据任务需求动态拉起 Ordinary 或 Skill 类型的 Worker Individual ### 🚀 分布式性能保障 -- **Ray 驱动**:底层基于 Ray 构建,支持跨进程、跨机器的 Actor 通讯,轻松应对高并发任务流。 -- **本地化优先**:深度适配 **vLLM**,支持本地私有化模型部署,在保障隐私的同时大幅降低 API 调用成本。 +- **Ray 驱动**:跨进程、跨机器的 Actor 通讯,轻松应对高并发任务流 +- **本地化优先**:深度适配 vLLM,支持本地私有化模型部署 -### 🛠️ 工业级工程设计 -- **强类型契约**:基于 Pydantic-AI 实现 Tool 与 Agent 的接口定义,确保 AI 输出的确定性与安全性。 -- **自动化流**:内置工作流引擎 (Workflow Engine),实现从需求发现到自动化执行的闭环。 +### 🔄 工作流引擎 +- **pydantic-graph 驱动**:基于有向图的工作流编排,支持条件分支与循环 +- **跨进程持久化**:PostgreSQL 状态快照,支持 workflow 中断后恢复(resume) +- **人工介入 (HITL)**:内置 HumanApproval 节点,支持审批挂起与幂等恢复 -### 📦 KiloStar 生态子项目 (Sub-projects) +### 🛡️ 安全设计 +- **JWT 鉴权**:所有 API 端点(含 SSE 事件流)均走 Bearer Token 认证 +- **归属校验**:workflow / chat 资源严格绑定 user_id,跨用户访问返回 403 +- **fetch-based SSE**:Token 走 Authorization header,不暴露在 URL 中 + +### 📦 生态子项目 + +| 项目 | 代号 | 功能 | 状态 | +|:--|:--|:--|:--| +| [kilostar-viceroy](https://github.com/zhaoxi826/viceroy) | 总督 | Skill 动态安装与全集群分发 | ✅ 已发布 | +| [kilostar-thought](https://github.com/zhaoxi826/thought) | 思绪 | Agent 增强记忆系统 | 开发中 | -| 项目名称 | 代号 | 功能定位 | 当前状态 | -|:-----------------------------------------------------------|:--------| :--- | :--- | -| **[kilostar-viceroy](https://github.com/zhaoxi826/viceroy)** | **总督** | **资源管理**:负责系统 Skill 的动态安装、元数据解析与全集群分发。 | ✅ 已发布 | -| **[kilostar-thought](https://github.com/zhaoxi826/thought)** | **思绪** | **记忆系统**:增强agent的记忆系统。 | 开发中 | --- -## 🚀 快速开始 (Quick Start) -> **当前版本**:`v0.1.0-alpha` (开发预览版) -> 本项目目前处于快速迭代阶段,欢迎提交 Issue 或 Pull Request。 +## 🚀 快速开始 -### 方式一:使用 Docker Compose (推荐) -这是部署 **kilostar 应用** 及其配套 **PostgreSQL 数据库** 最简单、最完整的方式。 +### Docker Compose(推荐) -1. **准备配置文件**:在本地创建一个目录,并新建 `docker-compose.yml`: - ```yaml - services: - db: - image: postgres:16-alpine - container_name: kilostar_db - environment: - POSTGRES_USER: postgres - POSTGRES_PASSWORD: postgrespassword - POSTGRES_DB: kilostar - healthcheck: - test: ["CMD-SHELL", "pg_isready -U postgres -d kilostar"] - interval: 5s - timeout: 5s - retries: 5 +```yaml +services: + db: + image: postgres:16-alpine + environment: + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgrespassword + POSTGRES_DB: kilostar + healthcheck: + test: ["CMD-SHELL", "pg_isready -U postgres -d kilostar"] + interval: 5s + timeout: 5s + retries: 5 - kilostar: - image: zhaoxi5699/kilostar:v0.1.0alpha - container_name: kilostar - ports: - - "8000:8000" - - "8265:8265" - depends_on: - db: - condition: service_healthy - environment: - - POSTGRES_USER=postgres - - POSTGRES_PASSWORD=postgrespassword - - POSTGRES_HOST=db - - POSTGRES_PORT=5432 - - POSTGRES_DB=kilostar - - SECRET_KEY=changethiskey12345 # 请在生产环境中修改此密钥 - ``` - -2. **启动服务** - ```bash - docker compose up -d - ``` - -### 方式二:使用 Docker -1. **启动服务** - ```bash - docker run -d \ - --name kilostar \ - -p 8000:8000 \ - -p 8265:8265 \ - -e POSTGRES_HOST=你的数据库IP \ - -e POSTGRES_USER=postgres \ - -e POSTGRES_PASSWORD=postgrespassword \ - -e POSTGRES_DB=kilostar \ - -e SECRET_KEY=your_secret_key \ - zhaoxi5699/kilostar:v0.1.0alpha - ``` - -## 🔍 访问与验证 -服务启动后,可以通过以下地址进行操作: -- Web 控制台 / API 文档: http://localhost:8000 -- Ray 任务仪表盘: http://localhost:8265 \ No newline at end of file + kilostar: + image: zhaoxi5699/kilostar:v0.1.1alpha + ports: + - "8000:8000" + - "8265:8265" + depends_on: + db: + condition: service_healthy + environment: + - POSTGRES_USER=postgres + - POSTGRES_PASSWORD=postgrespassword + - POSTGRES_HOST=db + - POSTGRES_PORT=5432 + - POSTGRES_DB=kilostar + - SECRET_KEY=changethiskey12345 +``` + +```bash +docker compose up -d +``` + +启动后访问: +- Web 控制台:http://localhost:8000 +- Ray Dashboard:http://localhost:8265 + +### 本地开发 + +```bash +# 后端 +uv sync +cp config/.env.example .env # 编辑数据库和密钥配置 +uv run python main.py + +# 前端 +cd frontend && npm install && npm run dev +``` + +--- + +## 📁 项目结构 + +``` +KiloStar/ +├── main.py # 应用入口(FastAPI + Ray 初始化) +├── pyproject.toml # Python 依赖与项目元数据 +├── Dockerfile / docker-compose.yml # 容器化部署 +├── alembic/ # 数据库迁移脚本 +├── config/ # 环境配置模板 +├── kilostar/ # 后端核心包 +│ ├── api/ # FastAPI 路由层 +│ │ ├── system.py # /health 系统健康检查 +│ │ ├── workflow.py # /workflow CRUD + SSE + resume +│ │ ├── chat.py # /chat 会话管理 +│ │ ├── agent.py # /agent Worker 管理 +│ │ └── resource.py # /resource Skill/Toolset 管理 +│ ├── core/ # 核心业务逻辑 +│ │ ├── individual/ # 各类 Agent 节点实现 +│ │ │ ├── consciousness_node/ # 意识节点(任务规划) +│ │ │ ├── regulatory_node/ # 监管节点(质量把关) +│ │ │ ├── control_node/ # 控制节点(路由调度) +│ │ │ └── growth_node/ # 生长节点(能力扩展) +│ │ ├── work/ # 工作执行层 +│ │ │ ├── workflow/ # 工作流引擎(pydantic-graph) +│ │ │ ├── chat/ # 对话处理 +│ │ │ └── task/ # 单任务执行 +│ │ ├── global_state_machine/ # 全局状态机(Provider/Config) +│ │ ├── global_workflow_manager/ # 工作流消息队列 Actor +│ │ └── postgres_database/ # PostgreSQL DAO 层 +│ ├── adapter/ # 模型适配器(OpenAI/vLLM/...) +│ ├── plugin/ # 工具插件 +│ │ └── tool_plugin/ # Tavily / FileReader / Approval +│ ├── utils/ # 工具函数 +│ │ ├── access.py # JWT 认证 +│ │ ├── ray_hook.py # Ray Actor 句柄获取 +│ │ └── check_user/ # 角色鉴权 +│ ├── worker_cluster/ # Worker 集群管理 +│ └── worker_individual/ # Worker 个体生命周期 +├── frontend/ # React 前端(Vite + Tailwind) +│ └── src/ +│ ├── api/ # Axios client + SSE 封装 +│ ├── components/ # UI 组件 +│ │ ├── Chat/ # 工作流面板 + 实时图 +│ │ ├── Agent/ # Worker/Provider 管理 +│ │ ├── Plugin/ # Skill/Tool 配置 +│ │ └── Settings/ # 系统设置 +│ ├── i18n/ # 国际化(中/英) +│ ├── store/ # Zustand 状态管理 +│ └── types/ # TypeScript 类型定义 +├── tests/ # 测试套件(249+ 用例) +│ ├── unit/ # 单元测试 +│ └── integration/ # 集成 smoke 测试 +└── docs/ # 设计文档 +``` + +--- + +## 🧪 测试 + +```bash +# 全量测试 +uv run pytest tests -q + +# 仅单元测试 +uv run pytest tests/unit -q + +# 集成测试(标记 integration) +uv run pytest tests/integration -q +``` + +--- + +## 📄 开源协议 + +本项目基于 [Apache License 2.0](LICENSE) 开源。 diff --git a/alembic/versions/2026_06_02_0000-0002_graph_and_logs.py b/alembic/versions/2026_06_02_0000-0002_graph_and_logs.py new file mode 100644 index 0000000..a3f125b --- /dev/null +++ b/alembic/versions/2026_06_02_0000-0002_graph_and_logs.py @@ -0,0 +1,49 @@ +"""add workflow_graph_state and system_event_log tables + +Revision ID: 0002_graph_and_logs +Revises: 0001_initial +Create Date: 2026-06-02 00:00:00 +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql +from alembic import op + +revision: str = "0002_graph_and_logs" +down_revision: Union[str, None] = "0001_initial" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.create_table( + "workflow_graph_state", + sa.Column("trace_id", sa.String(64), primary_key=True, comment="对应的工作流 Trace ID"), + sa.Column("history", postgresql.JSONB(), nullable=False, server_default="[]", comment="pydantic_graph history JSON"), + sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()), + sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now()), + ) + + op.create_table( + "system_event_log", + sa.Column("id", sa.Integer, primary_key=True, autoincrement=True), + sa.Column("trace_id", sa.String(64), nullable=False, comment="关联的工作流 trace_id"), + sa.Column("event_type", sa.String(50), nullable=False, comment="事件类型"), + sa.Column("level", sa.String(10), nullable=False, server_default="info", comment="日志级别"), + sa.Column("node_name", sa.String(100), nullable=True, comment="相关节点名称"), + sa.Column("message", sa.Text, nullable=False, comment="日志消息正文"), + sa.Column("extra_data", postgresql.JSONB(), nullable=True, comment="附加元数据"), + sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()), + ) + + op.create_index("ix_system_event_log_trace_id", "system_event_log", ["trace_id"]) + op.create_index("ix_system_event_log_event_type", "system_event_log", ["event_type"]) + op.create_index("ix_system_event_log_level", "system_event_log", ["level"]) + op.create_index("ix_system_event_log_created_at", "system_event_log", ["created_at"]) + + +def downgrade() -> None: + op.drop_table("system_event_log") + op.drop_table("workflow_graph_state") diff --git a/config/config.yml b/config/config.yml index e1c4070..b1fd632 100644 --- a/config/config.yml +++ b/config/config.yml @@ -1,2 +1,2 @@ -version: v0.1 -name: \ No newline at end of file +version: v0.1.1-alpha +name: Kilostar \ No newline at end of file diff --git a/config/workflow.yaml b/config/workflow.yaml new file mode 100644 index 0000000..2df4c48 --- /dev/null +++ b/config/workflow.yaml @@ -0,0 +1,2 @@ +retry: + max_attempts: 5 \ No newline at end of file diff --git a/frontend/src/App.tsx b/frontend/src/App.tsx index a8aa2f2..4a3c5ad 100644 --- a/frontend/src/App.tsx +++ b/frontend/src/App.tsx @@ -1,7 +1,8 @@ -import { useEffect } from 'react'; +import { useEffect, useState } from 'react'; import i18n from './i18n'; import { TopBar } from './components/Layout/TopBar'; import { CollapsibleSidebar } from './components/Layout/CollapsibleSidebar'; +import { SetupGuideModal } from './components/Layout/SetupGuideModal'; import { SettingsLayout } from './components/Settings/SettingsLayout'; import { AgentLayout } from './components/Agent/AgentLayout'; import { PluginLayout } from './components/Plugin/PluginLayout'; @@ -19,6 +20,7 @@ function App() { isAuthenticated, setIsAuthenticated, mode, + setMode, showSettings, workTab, agentTab, @@ -27,6 +29,7 @@ function App() { } = useAppStore(); const { loadSessions } = useChatStore(); + const [showSetupGuide, setShowSetupGuide] = useState(false); useEffect(() => { applyTheme(); @@ -52,6 +55,7 @@ function App() { useEffect(() => { if (isAuthenticated) { loadSessions(); + setShowSetupGuide(true); } }, [isAuthenticated, loadSessions]); @@ -63,6 +67,15 @@ function App() {
+ {showSetupGuide && ( + setShowSetupGuide(false)} + onNavigateToAgent={() => { + setMode('agent'); + }} + /> + )} +
{showSettings ? ( diff --git a/frontend/src/api/sse.ts b/frontend/src/api/sse.ts new file mode 100644 index 0000000..1f48ac0 --- /dev/null +++ b/frontend/src/api/sse.ts @@ -0,0 +1,130 @@ +// 基于 fetch + ReadableStream 的轻量 SSE 客户端,带指数退避自动重连。 +// +// 原生 EventSource 无法携带自定义 header,只能把 token 放进 URL query, +// 而 token 进 URL 会被网关/浏览器历史/Referer 记录,存在泄露风险。 +// 这里用 fetch 手动读取 text/event-stream,token 走标准 Authorization header。 + +export interface SSEHandlers { + onOpen?: () => void; + onMessage?: (data: string) => void; + onError?: (err: unknown) => void; + // 连接断开、准备重连时回调,附带本次退避延迟(毫秒) + onReconnect?: (delayMs: number) => void; +} + +export interface SSEOptions { + // 初始重连延迟(毫秒),默认 1000 + baseDelayMs?: number; + // 最大重连延迟(毫秒),默认 30000 + maxDelayMs?: number; + // 鉴权失败(401/403)时是否停止重连,默认 true + stopOnAuthError?: boolean; +} + +export interface SSEConnection { + close: () => void; +} + +const AUTH_ERROR_STATUSES = new Set([401, 403]); + +export function connectSSE( + url: string, + token: string, + handlers: SSEHandlers, + options: SSEOptions = {}, +): SSEConnection { + const baseDelay = options.baseDelayMs ?? 1000; + const maxDelay = options.maxDelayMs ?? 30000; + const stopOnAuthError = options.stopOnAuthError ?? true; + + let controller = new AbortController(); + let closed = false; + let attempt = 0; + let retryTimer: ReturnType | null = null; + + const scheduleReconnect = () => { + if (closed) return; + // 指数退避 + 抖动,封顶 maxDelay + const backoff = Math.min(baseDelay * 2 ** attempt, maxDelay); + const delay = backoff / 2 + Math.random() * (backoff / 2); + attempt += 1; + handlers.onReconnect?.(delay); + retryTimer = setTimeout(() => { + if (closed) return; + controller = new AbortController(); + void run(); + }, delay); + }; + + const run = async () => { + try { + const resp = await fetch(url, { + method: 'GET', + headers: { + Accept: 'text/event-stream', + ...(token ? { Authorization: `Bearer ${token}` } : {}), + }, + signal: controller.signal, + }); + + if (!resp.ok || !resp.body) { + handlers.onError?.(new Error(`SSE connect failed: ${resp.status}`)); + if (stopOnAuthError && AUTH_ERROR_STATUSES.has(resp.status)) { + closed = true; + return; + } + scheduleReconnect(); + return; + } + + // 连接成功,重置退避计数 + attempt = 0; + handlers.onOpen?.(); + + const reader = resp.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ''; + + while (true) { + const { done, value } = await reader.read(); + if (done) break; + buffer += decoder.decode(value, { stream: true }); + + let sep: number; + while ((sep = buffer.indexOf('\n\n')) !== -1) { + const rawEvent = buffer.slice(0, sep); + buffer = buffer.slice(sep + 2); + const data = parseEventData(rawEvent); + if (data !== null) handlers.onMessage?.(data); + } + } + + // 流正常结束(服务端关闭),非主动 close 则尝试重连 + if (!closed) scheduleReconnect(); + } catch (err) { + if (controller.signal.aborted || closed) return; + handlers.onError?.(err); + scheduleReconnect(); + } + }; + + void run(); + + return { + close: () => { + closed = true; + if (retryTimer) clearTimeout(retryTimer); + controller.abort(); + }, + }; +} + +function parseEventData(rawEvent: string): string | null { + // 只关心 data: 行,多行 data 用 \n 拼接,忽略注释(:)与其他字段 + const dataLines = rawEvent + .split('\n') + .filter((line) => line.startsWith('data:')) + .map((line) => line.slice(5).replace(/^ /, '')); + if (dataLines.length === 0) return null; + return dataLines.join('\n'); +} diff --git a/frontend/src/components/Agent/AgentLayout.tsx b/frontend/src/components/Agent/AgentLayout.tsx index 65d59e0..04aac09 100644 --- a/frontend/src/components/Agent/AgentLayout.tsx +++ b/frontend/src/components/Agent/AgentLayout.tsx @@ -2,6 +2,8 @@ import { useTranslation } from 'react-i18next'; import { useAppStore } from '../../store/useAppStore'; import { ProvidersSettings } from './ProvidersSettings'; import { WorkerIndividualSettings } from './WorkerIndividualSettings'; +import { WorkflowConfigSettings } from './WorkflowConfigSettings'; +import { SystemLogsView } from './SystemLogsView'; export function AgentLayout() { const { t } = useTranslation(); @@ -10,6 +12,8 @@ export function AgentLayout() { const tabs = [ { key: 'worker', label: t('agent.individual') }, { key: 'providers', label: t('agent.providerManagement') }, + { key: 'config', label: t('agent.config') }, + { key: 'logs', label: t('agent.systemLogs') }, ]; return ( @@ -32,6 +36,8 @@ export function AgentLayout() {
{innerAgentTab === 'worker' && } {innerAgentTab === 'providers' && } + {innerAgentTab === 'config' && } + {innerAgentTab === 'logs' && }
); diff --git a/frontend/src/components/Agent/SystemLogsView.tsx b/frontend/src/components/Agent/SystemLogsView.tsx new file mode 100644 index 0000000..b232544 --- /dev/null +++ b/frontend/src/components/Agent/SystemLogsView.tsx @@ -0,0 +1,144 @@ +import { useState, useEffect } from 'react'; +import { useTranslation } from 'react-i18next'; +import { RefreshCw, Search } from 'lucide-react'; + +interface EventLog { + id: number; + trace_id: string; + event_type: string; + level: string; + node_name: string | null; + message: string; + metadata: Record | null; + created_at: string | null; +} + +const LEVEL_STYLES: Record = { + error: { bg: 'bg-[rgba(196,145,122,0.12)]', text: 'text-[#a0705a]', label: 'ERROR' }, + warn: { bg: 'bg-[rgba(196,168,130,0.15)]', text: 'text-[#9a7d5e]', label: 'WARN' }, + info: { bg: 'bg-[rgba(156,175,136,0.12)]', text: 'text-[#7a8e6a]', label: 'INFO' }, +}; + +export function SystemLogsView() { + const { t } = useTranslation(); + const [logs, setLogs] = useState([]); + const [loading, setLoading] = useState(false); + const [traceFilter, setTraceFilter] = useState(''); + const [typeFilter, setTypeFilter] = useState(''); + const [levelFilter, setLevelFilter] = useState(''); + + const fetchLogs = async () => { + setLoading(true); + try { + const params = new URLSearchParams(); + if (traceFilter) params.set('trace_id', traceFilter); + if (typeFilter) params.set('event_type', typeFilter); + if (levelFilter) params.set('level', levelFilter); + params.set('limit', '200'); + + const resp = await fetch(`/api/v1/system/logs?${params.toString()}`, { + headers: { 'Authorization': `Bearer ${localStorage.getItem('token')}` }, + }); + if (!resp.ok) throw new Error('Failed to fetch logs'); + const data = await resp.json(); + setLogs(data.logs || []); + } catch (err) { + console.error(err); + } finally { + setLoading(false); + } + }; + + useEffect(() => { fetchLogs(); }, []); + + const handleSearch = (e: React.FormEvent) => { + e.preventDefault(); + fetchLogs(); + }; + + return ( +
+
+

{t('agent.systemLogs')}

+ +
+ +
+ setTraceFilter(e.target.value)} + placeholder={t('agent.logFilterTraceId')} + className="px-3 py-2 bg-bg-card border border-border-primary rounded-lg text-sm text-text-primary placeholder:text-text-muted/50 focus:outline-none focus:ring-2 focus:ring-accent/15" + /> + + + +
+ +
+
+ + + + + + + + + + + + + {logs.length === 0 ? ( + + ) : ( + logs.map((log) => { + const style = LEVEL_STYLES[log.level] || LEVEL_STYLES.info; + return ( + + + + + + + + + ); + }) + )} + +
{t('agent.logLevel')}{t('agent.logType')}Trace ID{t('agent.logNode')}{t('agent.logMessage')}{t('agent.logTime')}
{t('agent.noLogs')}
+ {style.label} + {log.event_type}{log.trace_id.slice(-8)}{log.node_name || '-'}{log.message}{log.created_at ? new Date(log.created_at).toLocaleString() : '-'}
+
+
+
+ ); +} \ No newline at end of file diff --git a/frontend/src/components/Agent/WorkflowConfigSettings.tsx b/frontend/src/components/Agent/WorkflowConfigSettings.tsx new file mode 100644 index 0000000..8125c8d --- /dev/null +++ b/frontend/src/components/Agent/WorkflowConfigSettings.tsx @@ -0,0 +1,152 @@ +import { useState, useEffect } from 'react'; +import { useTranslation } from 'react-i18next'; + +interface WorkflowConfig { + retry: { + max_attempts: number; + }; +} + +export function WorkflowConfigSettings() { + const { t } = useTranslation(); + const [config, setConfig] = useState(null); + const [loading, setLoading] = useState(true); + const [saving, setSaving] = useState(false); + const [error, setError] = useState(null); + const [successMessage, setSuccessMessage] = useState(null); + + useEffect(() => { + loadConfig(); + }, []); + + const loadConfig = async () => { + try { + setLoading(true); + setError(null); + const response = await fetch('/api/v1/system/config/workflow', { + headers: { + 'Authorization': `Bearer ${localStorage.getItem('token')}`, + }, + }); + if (!response.ok) { + throw new Error(`Failed to load config: ${response.statusText}`); + } + const data = await response.json(); + setConfig(data); + } catch (err) { + setError(err instanceof Error ? err.message : 'Failed to load configuration'); + } finally { + setLoading(false); + } + }; + + const handleSave = async () => { + if (!config) return; + + try { + setSaving(true); + setError(null); + setSuccessMessage(null); + const response = await fetch('/api/v1/system/config/workflow', { + method: 'PUT', + headers: { + 'Authorization': `Bearer ${localStorage.getItem('token')}`, + 'Content-Type': 'application/json', + }, + body: JSON.stringify(config), + }); + if (!response.ok) { + const errorData = await response.json(); + throw new Error(errorData.detail || `Failed to save: ${response.statusText}`); + } + setSuccessMessage(t('agent.configSaved')); + setTimeout(() => setSuccessMessage(null), 3000); + } catch (err) { + setError(err instanceof Error ? err.message : 'Failed to save configuration'); + } finally { + setSaving(false); + } + }; + + const handleMaxAttemptsChange = (value: string) => { + const numValue = parseInt(value, 10); + if (!isNaN(numValue) && numValue >= 1 && numValue <= 100) { + setConfig((prev) => prev ? { + ...prev, + retry: { ...prev.retry, max_attempts: numValue } + } : null); + } + }; + + if (loading) { + return ( +
+
{t('common.loading')}
+
+ ); + } + + if (!config) { + return ( +
+
{error || 'No configuration available'}
+
+ ); + } + + return ( +
+

+ {t('agent.workflowConfig')} +

+ + {error && ( +
+ {error} +
+ )} + + {successMessage && ( +
+ {successMessage} +
+ )} + +
+
+ + handleMaxAttemptsChange(e.target.value)} + className="w-full px-4 py-2 bg-bg-secondary border border-border-primary rounded-lg text-text-primary focus:outline-none focus:ring-2 focus:ring-accent" + /> +

+ {t('agent.maxRetryAttemptsDesc')} +

+
+ +
+ + +
+
+
+ ); +} diff --git a/frontend/src/components/Chat/RightPanel.tsx b/frontend/src/components/Chat/RightPanel.tsx index a7a9683..fe805b3 100644 --- a/frontend/src/components/Chat/RightPanel.tsx +++ b/frontend/src/components/Chat/RightPanel.tsx @@ -1,8 +1,11 @@ import { useState, useEffect, useRef } from 'react'; import { useTranslation } from 'react-i18next'; -import { Terminal, RefreshCw, SendHorizontal, LayoutList, GitFork, Radio } from 'lucide-react'; +import { Terminal, RefreshCw, SendHorizontal, LayoutList, GitFork, Radio, PlayCircle, ListChecks } from 'lucide-react'; import apiClient from '../../api/client'; +import { connectSSE } from '../../api/sse'; +import type { SSEConnection } from '../../api/sse'; import type { WorkflowDetail } from '../../types'; +import { ErrorBoundary } from '../ErrorBoundary'; import { WorkflowDiagram } from './WorkflowDiagram'; interface RightPanelProps { @@ -16,8 +19,9 @@ export function RightPanel({ selectedWorkflow }: RightPanelProps) { const [logs, setLogs] = useState([]); const [sseConnected, setSseConnected] = useState(false); const [replyText, setReplyText] = useState(''); - const [activeTab, setActiveTab] = useState<'chat' | 'diagram'>('chat'); - const eventSourceRef = useRef(null); + const [resuming, setResuming] = useState(false); + const [activeTab, setActiveTab] = useState<'chat' | 'steps' | 'diagram'>('chat'); + const eventSourceRef = useRef(null); const logsEndRef = useRef(null); const fetchDetail = async (traceId: string) => { @@ -43,14 +47,28 @@ export function RightPanel({ selectedWorkflow }: RightPanelProps) { setLogs([]); const apiBase = import.meta.env.VITE_API_BASE_URL || `${window.location.protocol}//${window.location.host}`; - const es = new EventSource(`${apiBase}/api/v1/workflow/sse/${selectedWorkflow}`); - eventSourceRef.current = es; - es.onopen = () => setSseConnected(true); - es.onmessage = (event) => setLogs((prev) => [...prev, event.data]); - es.onerror = () => setSseConnected(false); + // 用 fetch-based SSE,token 走标准 Authorization header,不进 URL + const token = localStorage.getItem('token') || ''; + const conn = connectSSE( + `${apiBase}/api/v1/workflow/sse/${selectedWorkflow}`, + token, + { + onOpen: () => setSseConnected(true), + onMessage: (data) => setLogs((prev) => [...prev, data]), + onError: () => setSseConnected(false), + onReconnect: (delayMs) => { + setSseConnected(false); + setLogs((prev) => [ + ...prev, + `[System]: ${t('workflow.sseReconnecting', { seconds: Math.round(delayMs / 1000) })}`, + ]); + }, + }, + ); + eventSourceRef.current = conn; const interval = setInterval(() => fetchDetail(selectedWorkflow), 3000); - return () => { es.close(); eventSourceRef.current = null; clearInterval(interval); }; + return () => { conn.close(); eventSourceRef.current = null; clearInterval(interval); }; }, [selectedWorkflow]); useEffect(() => { @@ -72,6 +90,27 @@ export function RightPanel({ selectedWorkflow }: RightPanelProps) { } }; + const handleResume = async () => { + if (!selectedWorkflow || resuming) return; + setResuming(true); + try { + await apiClient.post(`/api/v1/workflow/${selectedWorkflow}/resume`); + setLogs((prev) => [...prev, `[System]: ${t('workflow.resumeTriggered')}`]); + fetchDetail(selectedWorkflow); + } catch (err: any) { + const detailMsg = err?.response?.data?.detail || t('workflow.resumeFailed'); + setLogs((prev) => [...prev, `[System Error]: ${detailMsg}`]); + } finally { + setResuming(false); + } + }; + + // 只有非终态(未 completed/failed)的工作流才允许 resume + const canResume = + !!detail && + detail.status !== 'completed' && + detail.status !== 'failed'; + if (!selectedWorkflow) return null; return ( @@ -93,6 +132,9 @@ export function RightPanel({ selectedWorkflow }: RightPanelProps) { + @@ -100,6 +142,11 @@ export function RightPanel({ selectedWorkflow }: RightPanelProps) { + {canResume && ( + + )}
@@ -107,11 +154,58 @@ export function RightPanel({ selectedWorkflow }: RightPanelProps) { {activeTab === 'diagram' ? (
{detail?.steps && detail.steps.length > 0 ? ( - - ) : ( + + + + ) : (
Workflow steps are not yet generated.
)}
+ ) : activeTab === 'steps' ? ( +
+ {detail?.steps && detail.steps.length > 0 ? ( + detail.steps.map((step: any, idx: number) => { + const isCurrent = idx === (detail.current_step ?? 0); + const isDone = idx < (detail.current_step ?? 0); + return ( +
+
+ + {isDone ? '✓' : idx + 1} + + {step.node_name || step.name || `Step ${idx + 1}`} + {isCurrent && {t('workflow.status.running')}} +
+ {step.output && ( +
+                        {typeof step.output === 'string' ? step.output : JSON.stringify(step.output, null, 2)}
+                      
+ )} + {step.error && ( +
+ {step.error} +
+ )} +
+ ); + }) + ) : ( +
+ {t('workflow.noStepsYet')} +
+ )} +
+ ) : loading && !detail ? ( +
+
+
+
+
+
+
+
+
+
) : (
{detail?.command && ( diff --git a/frontend/src/components/Chat/WorkflowDiagram.tsx b/frontend/src/components/Chat/WorkflowDiagram.tsx index 46923cd..57ad53e 100644 --- a/frontend/src/components/Chat/WorkflowDiagram.tsx +++ b/frontend/src/components/Chat/WorkflowDiagram.tsx @@ -19,19 +19,26 @@ interface WorkflowDiagramProps { } export function WorkflowDiagram({ steps, currentStep, status }: WorkflowDiagramProps) { - const isWorkflowActive = status === 'llm_working' || status === 'tool_working'; + const isWorkflowActive = + status === 'llm_working' || + status === 'tool_working' || + status === 'running' || + status === 'hang_up'; const initialNodes = useMemo(() => { return steps.map((step, index) => { - const isCurrent = step.step === currentStep && isWorkflowActive; + // 后端 current_step 是 0-based 的 workflow_pointer;优先用 per-step 的 + // 运行期 status 着色,pointer 仅用来标"正在跑的那一步"。 + const isCurrent = index === currentStep && isWorkflowActive; const isCompleted = step.status === 'completed'; const isFailed = step.status === 'failed'; + const isWorking = step.status === 'working'; let bgColor = 'var(--bg-card)'; let borderColor = 'var(--border-primary)'; let textColor = 'var(--text-secondary)'; - if (isCurrent) { + if (isCurrent || isWorking) { bgColor = 'var(--bg-active)'; borderColor = 'var(--accent)'; textColor = 'var(--accent)'; @@ -61,7 +68,7 @@ export function WorkflowDiagram({ steps, currentStep, status }: WorkflowDiagramP border: `2px solid ${borderColor}`, borderRadius: '10px', color: textColor, - boxShadow: isCurrent ? '0 0 20px -4px var(--accent-glow)' : 'none', + boxShadow: (isCurrent || isWorking) ? '0 0 20px -4px var(--accent-glow)' : 'none', fontSize: '12px', }, }; @@ -75,7 +82,7 @@ export function WorkflowDiagram({ steps, currentStep, status }: WorkflowDiagramP id: `e${steps[i].step}-${steps[i + 1].step}`, source: steps[i].step.toString(), target: steps[i + 1].step.toString(), - animated: steps[i].step === currentStep && isWorkflowActive, + animated: i === currentStep && isWorkflowActive, style: { stroke: 'var(--border-primary)', strokeWidth: 2 }, markerEnd: { type: MarkerType.ArrowClosed, color: 'var(--border-primary)' }, }); diff --git a/frontend/src/components/Chat/WorkflowListView.tsx b/frontend/src/components/Chat/WorkflowListView.tsx index 4f6aaaf..4e4bdbd 100644 --- a/frontend/src/components/Chat/WorkflowListView.tsx +++ b/frontend/src/components/Chat/WorkflowListView.tsx @@ -11,6 +11,7 @@ export function WorkflowListView({ onSelectWorkflow }: WorkflowListViewProps) { const { t } = useTranslation(); const [workflows, setWorkflows] = useState([]); const [loading, setLoading] = useState(true); + const [statusFilter, setStatusFilter] = useState('all'); useEffect(() => { const fetchWorkflows = async () => { @@ -103,50 +104,90 @@ export function WorkflowListView({ onSelectWorkflow }: WorkflowListViewProps) {
- {workflows.length === 0 ? ( -
-
- -
-

{t('workflow.noWorkflows')}

-

{t('workflow.workflowsAppearHere')}

-
- ) : ( -
- {workflows.map((wf) => { - const style = getStatusStyle(wf.status); - return ( -
onSelectWorkflow(wf.trace_id)} - className="p-4 bg-bg-card rounded-xl border border-border-primary cursor-pointer transition-all hover:shadow-[0_4px_16px_rgba(0,0,0,0.05)] hover:-translate-y-0.5 hover:border-[#d5d0ca] dark:hover:border-white/10" - > -
- - {style.label} - -
-

- {wf.title || t('common.unnamed')} -

- {wf.command && ( -

- {wf.command} -

- )} -
- - {wf.trace_id.slice(-8)} - - {wf.created_at && ( - {new Date(wf.created_at).toLocaleDateString()} - )} -
+ {/* Status Filter Tabs */} +
+ {[ + { key: 'all', label: t('workflow.filterAll'), count: stats.total }, + { key: 'running', label: t('workflow.status.running'), count: stats.running }, + { key: 'completed', label: t('workflow.status.completed'), count: stats.completed }, + { key: 'failed', label: t('workflow.status.failed'), count: workflows.filter((w) => w.status === 'failed').length }, + { key: 'queued', label: t('workflow.queued'), count: stats.queued }, + ].map((tab) => ( + + ))} +
+ + {(() => { + const filteredWorkflows = workflows.filter((w) => { + if (statusFilter === 'all') return true; + if (statusFilter === 'running') return w.status?.includes('working'); + if (statusFilter === 'completed') return w.status === 'completed'; + if (statusFilter === 'failed') return w.status === 'failed'; + if (statusFilter === 'queued') return !w.status || (!w.status.includes('working') && w.status !== 'completed' && w.status !== 'failed'); + return true; + }); + + if (filteredWorkflows.length === 0) { + return ( +
+
+
- ); - })} -
- )} +

+ {statusFilter === 'all' ? t('workflow.noWorkflows') : t('workflow.noWorkflowsInFilter')} +

+

{t('workflow.workflowsAppearHere')}

+
+ ); + } + + return ( +
+ {filteredWorkflows.map((wf) => { + const style = getStatusStyle(wf.status); + return ( +
onSelectWorkflow(wf.trace_id)} + className="p-4 bg-bg-card rounded-xl border border-border-primary cursor-pointer transition-all hover:shadow-[0_4px_16px_rgba(0,0,0,0.05)] hover:-translate-y-0.5 hover:border-[#d5d0ca] dark:hover:border-white/10" + > +
+ + {style.label} + +
+

+ {wf.title || t('common.unnamed')} +

+ {wf.command && ( +

+ {wf.command} +

+ )} +
+ + {wf.trace_id.slice(-8)} + + {wf.created_at && ( + {new Date(wf.created_at).toLocaleDateString()} + )} +
+
+ ); + })} +
+ ); + })()}
); } diff --git a/frontend/src/components/ErrorBoundary.tsx b/frontend/src/components/ErrorBoundary.tsx new file mode 100644 index 0000000..4f3a2f3 --- /dev/null +++ b/frontend/src/components/ErrorBoundary.tsx @@ -0,0 +1,52 @@ +import { Component } from 'react'; +import type { ErrorInfo, ReactNode } from 'react'; + +interface ErrorBoundaryProps { + children: ReactNode; + fallback?: ReactNode; +} + +interface ErrorBoundaryState { + hasError: boolean; + message?: string; +} + +export class ErrorBoundary extends Component { + constructor(props: ErrorBoundaryProps) { + super(props); + this.state = { hasError: false }; + } + + static getDerivedStateFromError(error: Error): ErrorBoundaryState { + return { hasError: true, message: error.message }; + } + + componentDidCatch(error: Error, info: ErrorInfo) { + console.error('ErrorBoundary caught:', error, info.componentStack); + } + + reset = () => { + this.setState({ hasError: false, message: undefined }); + }; + + render() { + if (this.state.hasError) { + if (this.props.fallback) return this.props.fallback; + return ( +
+
Something went wrong while rendering.
+ {this.state.message && ( +
{this.state.message}
+ )} + +
+ ); + } + return this.props.children; + } +} diff --git a/frontend/src/components/Layout/SetupGuideModal.tsx b/frontend/src/components/Layout/SetupGuideModal.tsx new file mode 100644 index 0000000..30b0e8b --- /dev/null +++ b/frontend/src/components/Layout/SetupGuideModal.tsx @@ -0,0 +1,109 @@ +import { useEffect, useState } from 'react'; +import { useTranslation } from 'react-i18next'; +import { AlertCircle, X } from 'lucide-react'; +import apiClient from '../../api/client'; + +interface SetupGuideModalProps { + onClose: () => void; + onNavigateToAgent: () => void; +} + +export function SetupGuideModal({ onClose, onNavigateToAgent }: SetupGuideModalProps) { + const { t } = useTranslation(); + const [missingNodes, setMissingNodes] = useState([]); + + useEffect(() => { + const checkNodes = async () => { + try { + const resp = await apiClient.get('/api/v1/agent'); + const systemNodes = resp.data.system_nodes || []; + + const regulatory = systemNodes.find((n: any) => n.node_name === 'regulatory_node'); + const consciousness = systemNodes.find((n: any) => n.node_name === 'consciousness_node'); + + const missing = []; + if (!regulatory || !regulatory.provider_title || !regulatory.model_id) { + missing.push('regulatory_node'); + } + if (!consciousness || !consciousness.provider_title || !consciousness.model_id) { + missing.push('consciousness_node'); + } + + setMissingNodes(missing); + } catch (err) { + console.error('Failed to check system nodes:', err); + } + }; + checkNodes(); + }, []); + + if (missingNodes.length === 0) { + onClose(); + return null; + } + + return ( +
+
+ + +
+
+ +
+
+

+ {t('setup.coreNodesNotConfigured')} +

+

+ {t('setup.pleaseConfigureBeforeUse')} +

+
+
+ +
+
+ {t('setup.missingNodes')}: +
+
    + {missingNodes.includes('regulatory_node') && ( +
  • + + {t('setup.regulatoryNode')} +
  • + )} + {missingNodes.includes('consciousness_node') && ( +
  • + + {t('setup.consciousnessNode')} +
  • + )} +
+
+ +
+ + +
+
+
+ ); +} diff --git a/frontend/src/i18n/locales/en.json b/frontend/src/i18n/locales/en.json index 186a7e0..e7b9f77 100644 --- a/frontend/src/i18n/locales/en.json +++ b/frontend/src/i18n/locales/en.json @@ -76,6 +76,10 @@ "waitingEvents": "Waiting for events...", "replyPlaceholder": "Reply to the workflow...", "refresh": "Refresh Data", + "resume": "Resume Workflow", + "resumeTriggered": "Resume request sent, the workflow is recovering...", + "resumeFailed": "Failed to resume workflow", + "sseReconnecting": "Connection lost, retrying in {{seconds}}s...", "workflowDetails": "Workflow Details", "loading": "Loading Workflows...", "titleRequired": "Please enter a workflow title", @@ -90,7 +94,11 @@ "failed": "Failed" }, "total": "Total", - "queued": "Queued" + "queued": "Queued", + "filterAll": "All", + "noWorkflowsInFilter": "No workflows match current filter", + "steps": "Steps", + "noStepsYet": "Workflow steps not yet generated" }, "settings": { "settings": "Settings", @@ -163,6 +171,22 @@ "editWorker": "Edit Worker", "provider": "Provider", "model": "Model", + "config": "Config", + "workflowConfig": "Workflow Config", + "maxRetryAttempts": "Max Loop Retry Attempts", + "maxRetryAttemptsDesc": "When a workflow graph contains cycles, this limits the maximum number of times the engine may re-enter a loop, preventing infinite retries. Default: 5.", + "configSaved": "Configuration saved and hot-reloaded", + "systemLogs": "System Logs", + "logFilterTraceId": "Filter by Trace ID", + "logFilterAllTypes": "All event types", + "logFilterAllLevels": "All levels", + "logSearch": "Search", + "logLevel": "Level", + "logType": "Type", + "logNode": "Node", + "logMessage": "Message", + "logTime": "Time", + "noLogs": "No log entries yet", "description": "Description", "systemPrompt": "System Prompt", "outputTemplate": "Output Template (JSON)", @@ -224,6 +248,17 @@ "none": "None", "creating": "Creating...", "actions": "Actions", - "cancel": "Cancel" + "cancel": "Cancel", + "skip": "Skip for now", + "reset": "Reset", + "save": "Save" + }, + "setup": { + "coreNodesNotConfigured": "Core Nodes Not Configured", + "pleaseConfigureBeforeUse": "Regulatory and Consciousness nodes are core components of the system. Please configure them before using workflow features.", + "missingNodes": "Missing Nodes", + "regulatoryNode": "Regulatory Node", + "consciousnessNode": "Consciousness Node", + "goToConfig": "Go to Configuration" } } diff --git a/frontend/src/i18n/locales/zh.json b/frontend/src/i18n/locales/zh.json index 369bb98..f59ce12 100644 --- a/frontend/src/i18n/locales/zh.json +++ b/frontend/src/i18n/locales/zh.json @@ -37,7 +37,7 @@ "placeholder": "让 kilostar 做点什么...", "send": "发送", "selectChat": "选择对话记录或创建新对话以开始", - "assistantName": "kilostar 助手", + "assistantName": "KiloStar 助手", "errorCommunication": "抱歉,与服务器通信时出错。", "mistakeWarning": "KiloStar 可能会犯错,重要信息请自行核实。", "addAttachment": "添加附件", @@ -76,6 +76,10 @@ "waitingEvents": "等待事件...", "replyPlaceholder": "回复工作流...", "refresh": "刷新数据", + "resume": "恢复工作流", + "resumeTriggered": "恢复请求已发送,工作流正在恢复中...", + "resumeFailed": "恢复工作流失败", + "sseReconnecting": "连接断开,{{seconds}}秒后重试...", "workflowDetails": "工作流详情", "loading": "正在加载工作流...", "titleRequired": "请输入工作流标题", @@ -90,7 +94,11 @@ "failed": "失败" }, "total": "总数", - "queued": "排队中" + "queued": "排队中", + "filterAll": "全部", + "noWorkflowsInFilter": "当前筛选下无工作流", + "steps": "步骤", + "noStepsYet": "工作流步骤尚未生成" }, "settings": { "settings": "设置", @@ -163,6 +171,22 @@ "editWorker": "编辑工作节点", "provider": "供应商", "model": "模型", + "config": "配置", + "workflowConfig": "工作流配置", + "maxRetryAttempts": "最大环重试次数", + "maxRetryAttemptsDesc": "工作流图中有环时,防止因异常反复进入环路的最大重试次数。默认为 5。", + "configSaved": "配置已保存并热重载生效", + "systemLogs": "系统日志", + "logFilterTraceId": "按 Trace ID 筛选", + "logFilterAllTypes": "所有事件类型", + "logFilterAllLevels": "所有级别", + "logSearch": "查询", + "logLevel": "级别", + "logType": "类型", + "logNode": "节点", + "logMessage": "消息", + "logTime": "时间", + "noLogs": "暂无日志记录", "description": "描述", "systemPrompt": "系统提示词", "outputTemplate": "输出模板 (JSON)", @@ -224,6 +248,17 @@ "none": "无", "creating": "创建中...", "actions": "操作", - "cancel": "取消" + "cancel": "取消", + "skip": "稍后再说", + "reset": "重置", + "save": "保存" + }, + "setup": { + "coreNodesNotConfigured": "核心节点未配置", + "pleaseConfigureBeforeUse": "Regulatory 和 Consciousness 节点是系统的核心,请先完成配置后再使用工作流功能。", + "missingNodes": "未配置的节点", + "regulatoryNode": "监管节点 (Regulatory Node)", + "consciousnessNode": "意识节点 (Consciousness Node)", + "goToConfig": "前往配置" } } diff --git a/frontend/src/types/index.ts b/frontend/src/types/index.ts index dbef568..cb14779 100644 --- a/frontend/src/types/index.ts +++ b/frontend/src/types/index.ts @@ -93,6 +93,7 @@ export interface WorkflowDetail { status: string; command?: string; steps: WorkflowStep[]; + current_step?: number; context_blackboard?: Record; } diff --git a/kilostar/api/__init__.py b/kilostar/api/__init__.py index 4e24855..8bf6ca1 100644 --- a/kilostar/api/__init__.py +++ b/kilostar/api/__init__.py @@ -23,8 +23,7 @@ from ray import serve from .agent import agent_router from .auth import auth_router -from .cluster import cluster_router -from .health import health_router +from .system import system_router from .platform.frontend import client_router from .platform.onebot import onebot_router from .provider import provider_router @@ -53,7 +52,13 @@ def _get_locale(request: Request) -> str | None: app = FastAPI() -_cors_origins_env = os.environ.get("KILOSTAR_CORS_ORIGINS", "*") +_cors_origins_env = os.environ.get("KILOSTAR_CORS_ORIGINS", "") +_is_dev = os.environ.get("KILOSTAR_ENV", "production").lower() in ("dev", "development") +if not _cors_origins_env and _is_dev: + _cors_origins_env = "*" +elif not _cors_origins_env: + _cors_origins_env = "http://localhost:8000" + _cors_origins = [o.strip() for o in _cors_origins_env.split(",") if o.strip()] _allow_credentials = "*" not in _cors_origins app.add_middleware( @@ -83,13 +88,12 @@ async def request_id_middleware(request: Request, call_next): response.headers["X-Request-Id"] = request_id return response -app.include_router(health_router) # 健康检查 +app.include_router(system_router) # 健康探针 + 系统信息 app.include_router(client_router) # 客户端路径 app.include_router(onebot_router) # OneBot v11 路径 app.include_router(auth_router) # 用户路径 app.include_router(provider_router) # 供应商路径 app.include_router(resource_router) # 资源路径 -app.include_router(cluster_router) # 集群信息路径 app.include_router(agent_router) # agent路径 app.include_router(workflow_router) # workflow路径 app.include_router(chat_router) # chat路径 diff --git a/kilostar/api/agent.py b/kilostar/api/agent.py index 3a6ee0f..a01a3a3 100644 --- a/kilostar/api/agent.py +++ b/kilostar/api/agent.py @@ -116,7 +116,9 @@ async def load_agent( case _: pass except Exception as e: - raise HTTPException(status_code=500, detail=f"加载节点失败: {str(e)}") + from kilostar.utils.logger import get_logger + get_logger("agent_api").exception(f"加载节点失败: {e}") + raise HTTPException(status_code=500, detail="加载节点失败,请查看服务端日志") return {"message": "创建成功"} diff --git a/kilostar/api/auth.py b/kilostar/api/auth.py index 087f135..81cd6a7 100644 --- a/kilostar/api/auth.py +++ b/kilostar/api/auth.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from fastapi import APIRouter +from fastapi import APIRouter, Request from fastapi import Depends from pydantic import BaseModel from kilostar.utils.access import Accessor, TokenData @@ -21,6 +21,7 @@ from kilostar.utils.ray_hook import ray_actor_hook from kilostar.utils.check_user.role_check import RoleChecker from kilostar.core.postgres_database.model import UserAuthority from kilostar.utils.error import UserNotExistError +from kilostar.utils.rate_limit import register_limiter, login_limiter auth_router = APIRouter(prefix="/api/v1/auth", tags=["auth"]) @@ -33,8 +34,9 @@ class UserRegister(BaseModel): @auth_router.post("/register") -async def create_user(user_register: UserRegister): +async def create_user(user_register: UserRegister, request: Request): """注册新用户:异步线程池里做 argon2 哈希,再交由 PostgresDatabase Actor 落库。""" + register_limiter.check(request) postgres_database = ray_actor_hook("postgres_database").postgres_database hashed_password = await run_in_threadpool( Accessor.hash_password, user_register.password @@ -53,16 +55,39 @@ class UserLogin(BaseModel): @auth_router.post("/login") -async def login_user(user_login: UserLogin): +async def login_user(user_login: UserLogin, request: Request): """用户登录:查询用户后在线程池中校验口令,校验成功则签发 JWT。""" + login_limiter.check(request) postgres_database = ray_actor_hook("postgres_database").postgres_database user = await postgres_database.login_user.remote(user_login.user_name) if not user: raise UserNotExistError() - token = await run_in_threadpool( + tokens = await run_in_threadpool( Accessor.login_hashed_password, user, user_login.password ) - return {"message": "success", "token": token} + return { + "message": "success", + "token": tokens["access_token"], + "access_token": tokens["access_token"], + "refresh_token": tokens["refresh_token"], + } + + +class RefreshTokenRequest(BaseModel): + """``POST /refresh`` 入参:refresh token。""" + + refresh_token: str + + +@auth_router.post("/refresh") +async def refresh_token(body: RefreshTokenRequest): + """用 refresh token 换取新的 access token + refresh token 对。""" + tokens = Accessor.refresh_access_token(body.refresh_token) + return { + "message": "success", + "access_token": tokens["access_token"], + "refresh_token": tokens["refresh_token"], + } class ChangeAuthorityRequest(BaseModel): diff --git a/kilostar/api/cluster.py b/kilostar/api/cluster.py deleted file mode 100644 index c9cb7dc..0000000 --- a/kilostar/api/cluster.py +++ /dev/null @@ -1,19 +0,0 @@ -# Copyright 2026 zhaoxi826 -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from fastapi import APIRouter - -cluster_router = APIRouter(prefix="/api/v1/cluster", tags=["cluster"]) - -# Monitor websocket API temporarily removed diff --git a/kilostar/api/health.py b/kilostar/api/health.py deleted file mode 100644 index 0040419..0000000 --- a/kilostar/api/health.py +++ /dev/null @@ -1,54 +0,0 @@ -# Copyright 2026 zhaoxi826 -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""健康检查端点:用于容器存活/就绪探针。""" - -from fastapi import APIRouter -from fastapi.responses import JSONResponse - -from kilostar.utils.ray_hook import ray_actor_hook - -health_router = APIRouter(tags=["health"]) - - -@health_router.get("/health/live", include_in_schema=True) -async def liveness(): - """存活探针:进程能响应即视为存活。""" - return {"status": "alive"} - - -@health_router.get("/health/ready", include_in_schema=True) -async def readiness(): - """就绪探针:检查关键依赖(Postgres / GSM Actor)是否可达。""" - checks = {"postgres": False, "global_state_machine": False} - - try: - postgres_database = ray_actor_hook("postgres_database").postgres_database - await postgres_database.ping.remote() - checks["postgres"] = True - except Exception: - pass - - try: - gsm = ray_actor_hook("global_state_machine").global_state_machine - await gsm.get_skill_list.remote() - checks["global_state_machine"] = True - except Exception: - pass - - all_ok = all(checks.values()) - return JSONResponse( - status_code=200 if all_ok else 503, - content={"status": "ready" if all_ok else "not_ready", "checks": checks}, - ) diff --git a/kilostar/api/platform/onebot.py b/kilostar/api/platform/onebot.py index a1d43bf..82bdac7 100644 --- a/kilostar/api/platform/onebot.py +++ b/kilostar/api/platform/onebot.py @@ -49,12 +49,20 @@ onebot_router = APIRouter(prefix="/api/v1/adapter/onebot", tags=["onebot"]) def _verify_token(token_from_header: Optional[str]) -> None: """校验 OneBot 实现端在 ``Authorization`` 头里携带的 access_token。 - 若环境变量 ``ONEBOT_ACCESS_TOKEN`` 未设置则跳过校验。OneBot v11 规范要求 - 格式为 ``Bearer ``,这里同时容忍只填 token 字符串本身的写法。 + 若环境变量 ``ONEBOT_ACCESS_TOKEN`` 未设置,根据运行模式决策: + - 开发模式(KILOSTAR_ENV=dev):跳过校验并记录警告 + - 生产模式:拒绝所有请求,强制要求配置 token """ expected = os.environ.get("ONEBOT_ACCESS_TOKEN") if not expected: - return + is_dev = os.environ.get("KILOSTAR_ENV", "production").lower() in ("dev", "development") + if is_dev: + logger.warning("[OneBot] ONEBOT_ACCESS_TOKEN 未设置,开发模式下跳过认证") + return + raise HTTPException( + status_code=401, + detail="ONEBOT_ACCESS_TOKEN 未配置,拒绝未认证的 OneBot 连接", + ) if not token_from_header: raise HTTPException(status_code=401, detail="Missing access_token") raw = token_from_header.removeprefix("Bearer ").removeprefix("Token ").strip() diff --git a/kilostar/api/system.py b/kilostar/api/system.py new file mode 100644 index 0000000..35f3266 --- /dev/null +++ b/kilostar/api/system.py @@ -0,0 +1,108 @@ + # Copyright 2026 zhaoxi826 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""系统级端点:健康探针 + 集群/系统信息。 + +健康探针路径刻意保持在根(``/health/live`` / ``/health/ready``),不加 +``/api/v1`` 前缀——这是 k8s liveness/readiness probe 的惯例配置,加前缀会 +让运维侧探针 URL 变复杂。系统信息类端点则走 ``/api/v1/system`` 前缀。 +""" + +from __future__ import annotations + +from fastapi import APIRouter, Depends +from fastapi.responses import JSONResponse + +from kilostar.utils.ray_hook import ray_actor_hook +from kilostar.utils.access import Accessor, TokenData +from kilostar.utils.check_user.role_check import RoleChecker +from kilostar.core.postgres_database.model import UserAuthority +from kilostar.utils.config_loader import ( + get_workflow_config, + save_workflow_config, + WorkflowConfig, +) + +system_router = APIRouter(tags=["system"]) + + +@system_router.get("/health/live", include_in_schema=True) +async def liveness(): + """存活探针:进程能响应即视为存活。""" + return {"status": "alive"} + + +@system_router.get("/health/ready", include_in_schema=True) +async def readiness(): + """就绪探针:检查关键依赖(Postgres / GSM Actor)是否可达。""" + checks = {"postgres": False, "global_state_machine": False} + + try: + postgres_database = ray_actor_hook("postgres_database").postgres_database + await postgres_database.ping.remote() + checks["postgres"] = True + except Exception: + pass + + try: + gsm = ray_actor_hook("global_state_machine").global_state_machine + await gsm.get_skill_list.remote() + checks["global_state_machine"] = True + except Exception: + pass + + all_ok = all(checks.values()) + return JSONResponse( + status_code=200 if all_ok else 503, + content={"status": "ready" if all_ok else "not_ready", "checks": checks}, + ) + + +@system_router.get("/config/workflow") +async def get_workflow_config_endpoint( + _: TokenData = Depends(Accessor.get_current_user), +): + config = get_workflow_config() + return {"config": config.model_dump()} + + +@system_router.put("/config/workflow") +async def update_workflow_config_endpoint( + update: WorkflowConfig, + _: TokenData = Depends(RoleChecker(allowed_roles=UserAuthority.USER)), +): + save_workflow_config(update) + return {"status": "ok", "config": update.model_dump()} + + +@system_router.get("/logs") +async def query_system_logs( + trace_id: str | None = None, + event_type: str | None = None, + level: str | None = None, + limit: int = 100, + offset: int = 0, + _: TokenData = Depends(Accessor.get_current_user), +): + from kilostar.utils.ray_hook import ray_actor_hook + + pg = await ray_actor_hook.get_actor("postgres_database") + logs = await pg.query_event_logs.remote( + trace_id=trace_id, + event_type=event_type, + level=level, + limit=limit, + offset=offset, + ) + return {"logs": logs, "count": len(logs)} diff --git a/kilostar/api/workflow.py b/kilostar/api/workflow.py index 7c096e7..e06d7fc 100644 --- a/kilostar/api/workflow.py +++ b/kilostar/api/workflow.py @@ -66,7 +66,23 @@ async def get_workflow_list( @workflow_router.get("/sse/{trace_id}") -async def get_workflow_sse(trace_id: str, request: Request): +async def get_workflow_sse( + trace_id: str, + request: Request, + token_data: TokenData = Depends(Accessor.get_current_user), +): + """SSE 事件流。 + + 鉴权走标准 ``Authorization: Bearer`` 头(前端用 fetch-based SSE, + token 不进 URL)。校验该 trace_id 属于当前用户。 + """ + postgres_database = ray_actor_hook("postgres_database").postgres_database + wf = await postgres_database.get_workflow.remote(trace_id) + if not wf: + raise HTTPException(status_code=404, detail="Workflow not found") + if getattr(wf, "user_id", None) != token_data.user_id: + raise HTTPException(status_code=403, detail="Forbidden") + global_workflow_manager = ray_actor_hook( "global_workflow_manager" ).global_workflow_manager @@ -88,7 +104,18 @@ async def get_workflow_sse(trace_id: str, request: Request): @workflow_router.post("/reply/{trace_id}") -async def post_workflow_reply(trace_id: str, request: Request): +async def post_workflow_reply( + trace_id: str, + request: Request, + token_data: TokenData = Depends(Accessor.get_current_user), +): + postgres_database = ray_actor_hook("postgres_database").postgres_database + wf = await postgres_database.get_workflow.remote(trace_id) + if not wf: + raise HTTPException(status_code=404, detail="Workflow not found") + if getattr(wf, "user_id", None) != token_data.user_id: + raise HTTPException(status_code=403, detail="Forbidden") + data = await request.json() reply_msg = data.get("message", "") global_workflow_manager = ray_actor_hook( @@ -106,10 +133,24 @@ async def get_workflow_detail( wf = await postgres_database.get_workflow.remote(trace_id) if not wf: raise HTTPException(status_code=404, detail="Workflow not found") + if getattr(wf, "user_id", None) != token_data.user_id: + raise HTTPException(status_code=403, detail="Forbidden") context = await postgres_database.get_workflow_context.remote(trace_id) - steps = context.work_link if context and hasattr(context, "work_link") else [] + work_link = ( + context.work_link if context and hasattr(context, "work_link") else [] + ) + workflow_log = ( + context.workflow_log if context and hasattr(context, "workflow_log") else [] + ) + workflow_pointer = ( + context.workflow_pointer + if context and getattr(context, "workflow_pointer", None) is not None + else 0 + ) + + steps = _merge_runtime_status(work_link, workflow_log) return { "trace_id": trace_id, @@ -117,10 +158,49 @@ async def get_workflow_detail( "status": wf.status, "command": wf.command, "steps": steps, + "current_step": workflow_pointer, "context_blackboard": context.blackboard if context else {}, } +def _merge_runtime_status(work_link: list, workflow_log: list) -> list: + """把运行期状态从 ``workflow_log`` 反推并 merge 到每个静态 step 上。 + + ``work_link`` 是 step 的**静态定义**(名字 / node 类型 / action),不含运行期 + 状态;运行期状态散落在 ``workflow_log`` 里——其结构为:: + + [{"": [timestamp, status, message]}, ...] + + 同一 step 可能出现多条(working → completed),取**最后一条**的 status 作为 + 该 step 当前状态。没有日志记录的 step 视为 ``pending``。 + + 前端 ``WorkflowDiagram`` 依赖每个 step 的 ``status`` 字段着色,这个拼装让 + 后端真正把运行期状态喂过去。 + """ + # step_index -> 最新 status + latest_status: dict[int, str] = {} + for entry in workflow_log or []: + if not isinstance(entry, dict): + continue + for key, payload in entry.items(): + try: + idx = int(key) + except (ValueError, TypeError): + continue + if isinstance(payload, (list, tuple)) and len(payload) >= 2: + latest_status[idx] = payload[1] + + merged = [] + for i, step in enumerate(work_link or []): + step_copy = dict(step) if isinstance(step, dict) else {} + # step 自带的 step 字段优先,否则用位置索引 + step_idx = step_copy.get("step") + lookup_idx = (step_idx - 1) if isinstance(step_idx, int) else i + step_copy["status"] = latest_status.get(lookup_idx, "pending") + merged.append(step_copy) + return merged + + @workflow_router.post("/{trace_id}/resume") async def resume_workflow( trace_id: str, @@ -151,9 +231,9 @@ async def resume_workflow( from kilostar.core.work.workflow.workflow_engine import run_workflow_task - # workflow_data 在 resume 路径上不会被使用(hydrate 会走 resume 分支), - # 这里给个空 dict 占位即可 - run_workflow_task.remote({}, trace_id) + # resume_only=True:task 入口 hydrate 失败会 fail-fast,绝不 fall through + # 到"全新模式空跑"。workflow_data 在 resume 路径上不会被使用,传空 dict 占位。 + run_workflow_task.remote({}, trace_id, resume_only=True) return {"trace_id": trace_id, "status": "resuming"} diff --git a/kilostar/core/global_state_machine/global_state_machine.py b/kilostar/core/global_state_machine/global_state_machine.py index 863d41d..5c5df8e 100644 --- a/kilostar/core/global_state_machine/global_state_machine.py +++ b/kilostar/core/global_state_machine/global_state_machine.py @@ -203,6 +203,10 @@ class GlobalStateMachine: """返回某个 scope 下的"系统 + 自定义工具组"toolset 列表(不含 MCP)。""" return self._global_tool_manager.get_toolsets_for_scope(scope) + def get_retrieval_toolsets_for_scope(self, scope: str) -> List[Any]: + """仅返回 retrieval 工具集(system_node 专用,不包含 generation 工具)。""" + return self._global_tool_manager.get_retrieval_toolsets_for_scope(scope) + # ─── MCP Server Registry ─────────────────────────────────── async def add_mcp_server(self, server_id: str, config: Dict[str, Any]) -> bool: diff --git a/kilostar/core/global_state_machine/tool_manager.py b/kilostar/core/global_state_machine/tool_manager.py index 497ec8a..e8d0490 100644 --- a/kilostar/core/global_state_machine/tool_manager.py +++ b/kilostar/core/global_state_machine/tool_manager.py @@ -34,7 +34,9 @@ class GlobalToolManager: def __init__(self) -> None: self.tool_metadata = {} self._tool_funcs = defaultdict(dict) + self._retrieval_tool_funcs = defaultdict(dict) self._system_toolsets = {} + self._retrieval_toolsets = {} self._custom_toolsets = {} self._third_party_funcs = {} self.tool_mapper = defaultdict(dict) @@ -75,11 +77,14 @@ class GlobalToolManager: is_system = bool(tool_data_cls.model_fields.get("is_system").default) category_field = tool_data_cls.model_fields.get("category") category = (category_field.default if category_field else "other") or "other" + toolset_field = tool_data_cls.model_fields.get("toolset") + toolset_name = (toolset_field.default if toolset_field else "other") or "other" self.tool_metadata[plugin_name] = { "name": plugin_name, "is_system": is_system, "category": category, + "toolset": toolset_name, "action_scope": list(action_scopes), } @@ -92,12 +97,15 @@ class GlobalToolManager: for scope in scopes: self._tool_funcs[scope][plugin_name] = tool_func self.tool_mapper[scope][plugin_name] = tool_data_cls + if toolset_name == "retrieval": + self._retrieval_tool_funcs[scope][plugin_name] = tool_func else: self._third_party_funcs[plugin_name] = tool_func for scope in scopes: self.tool_mapper[scope][plugin_name] = tool_data_cls self._build_system_toolsets() + self._build_retrieval_toolsets() def _build_system_toolsets(self) -> None: FunctionToolset = self._import_function_toolset() @@ -114,6 +122,21 @@ class GlobalToolManager: except Exception as e: logger.error(f"Failed to build system toolset {scope}: {e}") + def _build_retrieval_toolsets(self) -> None: + FunctionToolset = self._import_function_toolset() + if FunctionToolset is None: + return + for scope, name_to_func in self._retrieval_tool_funcs.items(): + if not name_to_func: + continue + try: + self._retrieval_toolsets[scope] = FunctionToolset( + tools=list(name_to_func.values()), + id=f"retrieval::{scope}", + ) + except Exception as e: + logger.error(f"Failed to build retrieval toolset {scope}: {e}") + def rebuild_custom_toolsets(self, custom_defs: Dict[str, Dict[str, Any]]) -> None: """根据 DB 中的自定义工具组定义重建 custom FunctionToolset。""" FunctionToolset = self._import_function_toolset() @@ -170,6 +193,15 @@ class GlobalToolManager: result.extend(self._custom_toolsets.values()) return result + def get_retrieval_toolsets_for_scope(self, scope: str) -> List[Any]: + """仅返回 retrieval 工具集(system_node 专用)。""" + result: List[Any] = [] + for s in ("default", scope): + ts = self._retrieval_toolsets.get(s) + if ts is not None: + result.append(ts) + return result + # ─── Metadata accessors ─── def is_third_party_tool(self, tool_name: str) -> bool: diff --git a/kilostar/core/postgres_database/model/__init__.py b/kilostar/core/postgres_database/model/__init__.py index 94f2064..4882414 100644 --- a/kilostar/core/postgres_database/model/__init__.py +++ b/kilostar/core/postgres_database/model/__init__.py @@ -33,6 +33,7 @@ from kilostar.core.postgres_database.model.system_node import SystemNodeConfigMo from kilostar.core.postgres_database.model.mcp_server import MCPServerModel from kilostar.core.postgres_database.model.tool_config import ToolConfigModel from kilostar.core.postgres_database.model.custom_toolset import CustomToolsetModel +from kilostar.core.postgres_database.model.system_event_log import SystemEventLog # 兼容旧代码的别名 Provider = ProviderModel @@ -61,5 +62,6 @@ __all__ = [ "MCPServerModel", "ToolConfigModel", "CustomToolsetModel", + "SystemEventLog", "AgentType", ] diff --git a/kilostar/core/postgres_database/model/system_event_log.py b/kilostar/core/postgres_database/model/system_event_log.py new file mode 100644 index 0000000..3994f0b --- /dev/null +++ b/kilostar/core/postgres_database/model/system_event_log.py @@ -0,0 +1,35 @@ +from sqlalchemy import String, DateTime, Integer, func, Text +from sqlalchemy.orm import Mapped, mapped_column +from sqlalchemy.dialects.postgresql import JSONB +from .base import BaseDataModel + + +class SystemEventLog(BaseDataModel): + __tablename__ = "system_event_log" + + id: Mapped[int] = mapped_column( + Integer, primary_key=True, autoincrement=True + ) + trace_id: Mapped[str] = mapped_column( + String(64), index=True, comment="关联的工作流 trace_id" + ) + event_type: Mapped[str] = mapped_column( + String(50), index=True, + comment="事件类型: workflow_start/step_enter/step_complete/step_error/workflow_complete/workflow_fail/system" + ) + level: Mapped[str] = mapped_column( + String(10), index=True, default="info", + comment="日志级别: info/warn/error" + ) + node_name: Mapped[str | None] = mapped_column( + String(100), nullable=True, comment="相关节点名称" + ) + message: Mapped[str] = mapped_column( + Text, comment="日志消息正文" + ) + extra_data: Mapped[dict | None] = mapped_column( + JSONB, nullable=True, comment="附加元数据(step_index/output 等)" + ) + created_at: Mapped[str] = mapped_column( + DateTime(timezone=True), server_default=func.now(), index=True + ) diff --git a/kilostar/core/postgres_database/module/system_event_log.py b/kilostar/core/postgres_database/module/system_event_log.py new file mode 100644 index 0000000..da42f63 --- /dev/null +++ b/kilostar/core/postgres_database/module/system_event_log.py @@ -0,0 +1,72 @@ +from __future__ import annotations + +from typing import List, Optional +from sqlalchemy import select, desc +from sqlalchemy.ext.asyncio import async_sessionmaker, AsyncSession + +from kilostar.core.postgres_database.model.system_event_log import SystemEventLog +from kilostar.core.postgres_database.database_exception import database_exception + + +class SystemEventLogDatabase: + def __init__(self, async_session_maker: async_sessionmaker[AsyncSession]): + self.async_session_maker = async_session_maker + + @database_exception + async def insert_event( + self, + trace_id: str, + event_type: str, + level: str, + message: str, + node_name: Optional[str] = None, + metadata: Optional[dict] = None, + ) -> None: + async with self.async_session_maker() as session: + log = SystemEventLog( + trace_id=trace_id, + event_type=event_type, + level=level, + message=message, + node_name=node_name, + extra_data=metadata, + ) + session.add(log) + await session.commit() + + @database_exception + async def query_events( + self, + trace_id: Optional[str] = None, + event_type: Optional[str] = None, + level: Optional[str] = None, + limit: int = 100, + offset: int = 0, + ) -> List[dict]: + async with self.async_session_maker() as session: + stmt = select(SystemEventLog).order_by(desc(SystemEventLog.created_at)) + + if trace_id: + stmt = stmt.where(SystemEventLog.trace_id == trace_id) + if event_type: + stmt = stmt.where(SystemEventLog.event_type == event_type) + if level: + stmt = stmt.where(SystemEventLog.level == level) + + stmt = stmt.offset(offset).limit(limit) + result = await session.execute(stmt) + rows = result.scalars().all() + + return [ + { + "id": r.id, + "trace_id": r.trace_id, + "event_type": r.event_type, + "level": r.level, + "node_name": r.node_name, + "message": r.message, + "metadata": r.extra_data, + "created_at": str(r.created_at) if r.created_at else None, + } + for r in rows + ] diff --git a/kilostar/core/postgres_database/postgres.py b/kilostar/core/postgres_database/postgres.py index 43424b5..efdcab4 100644 --- a/kilostar/core/postgres_database/postgres.py +++ b/kilostar/core/postgres_database/postgres.py @@ -41,6 +41,7 @@ from kilostar.core.postgres_database.model.system_node import SystemNodeConfigMo from kilostar.core.postgres_database.model.mcp_server import MCPServerModel from kilostar.core.postgres_database.model.tool_config import ToolConfigModel from kilostar.core.postgres_database.model.custom_toolset import CustomToolsetModel +from kilostar.core.postgres_database.model.system_event_log import SystemEventLog from .module.individual import IndividualDatabase from .module.user import AuthDatabase @@ -51,6 +52,7 @@ from .module.chat_history import ChatHistoryDatabase from .module.mcp_server import MCPServerDatabase from .module.tool_config import ToolConfigDatabase from .module.custom_toolset import CustomToolsetDatabase +from .module.system_event_log import SystemEventLogDatabase @ray.remote @@ -85,6 +87,7 @@ class PostgresDatabase: self._mcp_server_database = MCPServerDatabase(self.async_session_maker) self._tool_config_database = ToolConfigDatabase(self.async_session_maker) self._custom_toolset_database = CustomToolsetDatabase(self.async_session_maker) + self._system_event_log_database = SystemEventLogDatabase(self.async_session_maker) self.ready_event = asyncio.Event() @@ -94,11 +97,10 @@ class PostgresDatabase: async with self.async_engine.begin() as conn: await conn.run_sync(BaseDataModel.metadata.create_all) print("✅ 数据库表创建/验证完成") + self.ready_event.set() except Exception as e: print(f"❌ 数据库初始化失败: {e}") raise - finally: - self.ready_event.set() async def ping(self) -> bool: """轻量探活:等待 ready 后执行 ``SELECT 1``。""" @@ -376,3 +378,35 @@ class PostgresDatabase: """删除一个自定义工具组。""" await self.ready_event.wait() return await self._custom_toolset_database.delete(toolset_id) + + # System Event Log Methods + async def insert_event_log( + self, + trace_id: str, + event_type: str, + level: str, + message: str, + node_name=None, + metadata=None, + ): + await self.ready_event.wait() + return await self._system_event_log_database.insert_event( + trace_id=trace_id, + event_type=event_type, + level=level, + message=message, + node_name=node_name, + metadata=metadata, + ) + + async def query_event_logs( + self, trace_id=None, event_type=None, level=None, limit=100, offset=0 + ): + await self.ready_event.wait() + return await self._system_event_log_database.query_events( + trace_id=trace_id, + event_type=event_type, + level=level, + limit=limit, + offset=offset, + ) diff --git a/kilostar/core/work/workflow/workflow_engine.py b/kilostar/core/work/workflow/workflow_engine.py index fd98c4e..ba443ca 100644 --- a/kilostar/core/work/workflow/workflow_engine.py +++ b/kilostar/core/work/workflow/workflow_engine.py @@ -61,6 +61,7 @@ class WorkflowGraphState(BaseModel): # 已发过 put_pending 的 HumanApproval step index 列表;resume 后避免重复推送。 # 用 list(不是 set)是为了 pydantic_graph 序列化 history 时 JSON 友好。 approvals_notified: List[int] = Field(default_factory=list) + jump_counts: Dict[str, int] = Field(default_factory=dict) # 业务侧执行入口:把 step + state 喂进去,拿到 (output_text, success_bool) @@ -277,8 +278,13 @@ async def _execute_step( ) try: - output_text, success = await executor(step_data, state) - except Exception as e: # 执行器抛异常 → 走失败分支 + step_timeout = step_data.get("timeout", 300) + output_text, success = await asyncio.wait_for( + executor(step_data, state), timeout=step_timeout + ) + except asyncio.TimeoutError: + output_text, success = f"步骤执行超时({step_data.get('timeout', 300)}s)", False + except Exception as e: output_text, success = str(e), False if success: @@ -311,6 +317,25 @@ async def _execute_step( logic_gate = step_data.get("logic_gate") or {} fail_target = logic_gate.get("if_fail") if fail_target and "jump_to_step_" in fail_target: + from kilostar.utils.config_loader import get_workflow_config + + max_attempts = get_workflow_config().retry.max_attempts + jump_key = f"{state.current_step_index}->{fail_target}" + state.jump_counts[jump_key] = state.jump_counts.get(jump_key, 0) + 1 + + if state.jump_counts[jump_key] > max_attempts: + state.logs.append( + { + str(state.current_step_index): [ + str(datetime.datetime.now()), + "failed", + f"环重试次数超过上限 ({max_attempts}),终止工作流", + ] + } + ) + await _persist_context(ctx, status=WorkflowStatus.FAILED.value) + return Finalize(status=WorkflowStatus.FAILED.value) + target_step = int(fail_target.split("_")[-1]) - 1 state.current_step_index = target_step await _persist_context(ctx, status=WorkflowStatus.RUNNING.value) @@ -495,14 +520,21 @@ async def resume_workflow_graph( @ray.remote -def run_workflow_task(workflow_data: dict, trace_id: str): +def run_workflow_task( + workflow_data: dict, trace_id: str, resume_only: bool = False +): """workflow 的 ray task 入口:一次性执行,跑完即销毁。 生产路径下持久化交给 ``PostgresStatePersistence`` —— 即便进程崩溃,再 fire 一次相同 ``trace_id`` 的任务(或调 ``/workflow/{trace_id}/resume``)即可 - 续跑。同时为了支持 fresh start,先尝试 ``hydrate``: + 续跑。入口先尝试 ``hydrate``: - hydrate 拿到内容 → 走 resume 路径 - - hydrate 没拿到 → 走全新路径 + - hydrate 没拿到 → 走全新路径(用传入的 ``workflow_data``) + + ``resume_only``:由 ``/resume`` API 显式置 True。此模式下 hydrate 失败 + (抛异常或没有持久化记录)必须 fail-fast,而不能 fall through 到全新路径—— + 否则会拿着空 ``workflow_data`` 空跑一个 ``work_link=[]`` 的 workflow 并误判 + 为 COMPLETED(静默 bug)。 ray task 是新进程,contextvars 不会从 caller 传过来,所以入口先 bind 一次 ``trace_id``,让节点内的日志自动带上它。 @@ -511,6 +543,9 @@ def run_workflow_task(workflow_data: dict, trace_id: str): from kilostar.core.work.workflow.graph_persistence import ( build_postgres_persistence, ) + from kilostar.utils.logger import get_logger + + _logger = get_logger("workflow_task") async def _entry() -> None: with trace_id_scope(trace_id): @@ -519,9 +554,20 @@ def run_workflow_task(workflow_data: dict, trace_id: str): recovered = False try: recovered = await persistence.hydrate() - except Exception: # pragma: no cover - 防御 + except Exception as e: + if resume_only: + _logger.error(f"resume 失败:无法 hydrate 图持久化记录: {e}") + raise recovered = False + if resume_only and not recovered: + msg = ( + f"resume 失败:trace {trace_id} 没有可恢复的图持久化记录," + "拒绝以全新模式空跑" + ) + _logger.error(msg) + raise RuntimeError(msg) + if recovered: await resume_workflow_graph(trace_id, persistence=persistence) else: diff --git a/kilostar/plugin/tool_plugin/approval/approval.py b/kilostar/plugin/tool_plugin/approval/approval.py index 972a374..e3b1b3b 100644 --- a/kilostar/plugin/tool_plugin/approval/approval.py +++ b/kilostar/plugin/tool_plugin/approval/approval.py @@ -18,7 +18,7 @@ from typing import List, Literal, Dict class ApprovalToolData(BaseToolData): - """``approval`` 工具的元数据:默认面向 control/consciousness 两类节点开放。""" + """``approval`` 工具的元数据:分配给所有系统节点和 skill_individual。""" is_system: bool = True action_scope: List[ @@ -29,7 +29,7 @@ class ApprovalToolData(BaseToolData): "growth_node", "", ] - ] = ["control_node", "consciousness_node"] + ] = [] config_args: Dict[str, str] = {} category: str = "system" diff --git a/kilostar/plugin/tool_plugin/base_tool.py b/kilostar/plugin/tool_plugin/base_tool.py index 4e35c70..3107ede 100644 --- a/kilostar/plugin/tool_plugin/base_tool.py +++ b/kilostar/plugin/tool_plugin/base_tool.py @@ -34,3 +34,5 @@ class BaseToolData(BaseModel): config_args: Dict[str, str] = {} category: str = "other" """工具分类:system(系统内置)、search(搜索)、mcp(MCP 服务器)、other(其他)""" + toolset: str = "other" + """工具集:retrieval(检索)、generation(生成)、other(其他)。system_node 只能用 retrieval 集。""" diff --git a/kilostar/plugin/tool_plugin/edit_file/__init__.py b/kilostar/plugin/tool_plugin/edit_file/__init__.py new file mode 100644 index 0000000..113bd4f --- /dev/null +++ b/kilostar/plugin/tool_plugin/edit_file/__init__.py @@ -0,0 +1,54 @@ +import os +from typing import List, Literal, Dict + +from kilostar.plugin.tool_plugin.base_tool import BaseToolData + + +class EditFileToolData(BaseToolData): + is_system: bool = True + action_scope: List[ + Literal[ + "control_node", + "consciousness_node", + "regulatory_node", + "growth_node", + "", + ] + ] = [] + config_args: Dict[str, str] = {} + category: str = "system" + + +async def edit_file( + file_path: str, + old_content: str, + new_content: str, +) -> str: + """通过查找替换的方式编辑文件内容。 + + Args: + file_path: 文件的路径 + old_content: 要被替换的原始内容片段 + new_content: 替换后的新内容 + + Returns: + 操作结果描述 + """ + try: + if not os.path.exists(file_path): + return f"[Error] 文件不存在: {file_path}" + + with open(file_path, "r", encoding="utf-8") as f: + content = f.read() + + if old_content not in content: + return f"[Error] 未在文件中找到要替换的内容片段" + + new_file_content = content.replace(old_content, new_content, 1) + + with open(file_path, "w", encoding="utf-8") as f: + f.write(new_file_content) + + return f"已成功编辑文件: {file_path}" + except Exception as e: + return f"[Error] 编辑文件失败: {e}" diff --git a/kilostar/plugin/tool_plugin/file_reader/file_reader.py b/kilostar/plugin/tool_plugin/file_reader/file_reader.py index f0de080..8668297 100644 --- a/kilostar/plugin/tool_plugin/file_reader/file_reader.py +++ b/kilostar/plugin/tool_plugin/file_reader/file_reader.py @@ -33,7 +33,7 @@ class FileReaderToolData(BaseToolData): "growth_node", "", ] - ] = ["control_node"] + ] = [] config_args: Dict[str, str] = {} category: str = "system" diff --git a/kilostar/plugin/tool_plugin/python_executor/__init__.py b/kilostar/plugin/tool_plugin/python_executor/__init__.py new file mode 100644 index 0000000..07f2386 --- /dev/null +++ b/kilostar/plugin/tool_plugin/python_executor/__init__.py @@ -0,0 +1,67 @@ +import asyncio +import sys +import tempfile +import os +from typing import List, Literal, Dict + +from kilostar.plugin.tool_plugin.base_tool import BaseToolData + + +class PythonExecutorToolData(BaseToolData): + is_system: bool = True + action_scope: List[ + Literal[ + "control_node", + "consciousness_node", + "regulatory_node", + "growth_node", + "", + ] + ] = [] + config_args: Dict[str, str] = {} + category: str = "system" + + +async def python_executor(code: str, timeout: int = 30) -> str: + """执行 Python 代码片段并返回输出。 + + Args: + code: 要执行的 Python 代码 + timeout: 超时秒数,默认 30 秒 + + Returns: + 代码的标准输出 + 标准错误 + """ + tmp_file = None + try: + with tempfile.NamedTemporaryFile( + mode="w", suffix=".py", delete=False, encoding="utf-8" + ) as f: + f.write(code) + tmp_file = f.name + + proc = await asyncio.create_subprocess_exec( + sys.executable, tmp_file, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + stdout, stderr = await asyncio.wait_for( + proc.communicate(), timeout=timeout + ) + output = stdout.decode("utf-8", errors="replace") + err_output = stderr.decode("utf-8", errors="replace") + result = "" + if output: + result += output + if err_output: + result += f"\n[stderr]\n{err_output}" + if proc.returncode != 0: + result += f"\n[exit code: {proc.returncode}]" + return result.strip() or "(no output)" + except asyncio.TimeoutError: + return f"[Error] Python 代码执行超时({timeout}s)" + except Exception as e: + return f"[Error] 执行失败: {e}" + finally: + if tmp_file and os.path.exists(tmp_file): + os.unlink(tmp_file) diff --git a/kilostar/plugin/tool_plugin/search_file/__init__.py b/kilostar/plugin/tool_plugin/search_file/__init__.py new file mode 100644 index 0000000..83c79e6 --- /dev/null +++ b/kilostar/plugin/tool_plugin/search_file/__init__.py @@ -0,0 +1,58 @@ +import asyncio +from typing import List, Literal, Dict + +from kilostar.plugin.tool_plugin.base_tool import BaseToolData + + +class SearchFileToolData(BaseToolData): + is_system: bool = True + action_scope: List[ + Literal[ + "control_node", + "consciousness_node", + "regulatory_node", + "growth_node", + "", + ] + ] = [] + config_args: Dict[str, str] = {} + category: str = "system" + + +async def search_file( + keyword: str, + directory: str = ".", + file_pattern: str = "*", + max_results: int = 20, +) -> str: + """在指定目录下递归搜索包含关键字的文件内容。 + + Args: + keyword: 要搜索的关键字或正则表达式 + directory: 搜索的根目录,默认当前目录 + file_pattern: 文件名匹配模式,如 "*.py" + max_results: 最大返回结果数 + + Returns: + 匹配的文件名和行内容 + """ + try: + cmd = ( + f"grep -rn --include='{file_pattern}' " + f"-m {max_results} '{keyword}' '{directory}' 2>/dev/null " + f"| head -n {max_results}" + ) + proc = await asyncio.create_subprocess_shell( + cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=30) + output = stdout.decode("utf-8", errors="replace").strip() + if not output: + return f"未找到包含 '{keyword}' 的匹配项" + return output + except asyncio.TimeoutError: + return "[Error] 搜索超时" + except Exception as e: + return f"[Error] 搜索失败: {e}" diff --git a/kilostar/plugin/tool_plugin/shell_executor/__init__.py b/kilostar/plugin/tool_plugin/shell_executor/__init__.py new file mode 100644 index 0000000..00cd5ed --- /dev/null +++ b/kilostar/plugin/tool_plugin/shell_executor/__init__.py @@ -0,0 +1,54 @@ +import asyncio +from typing import List, Literal, Dict + +from kilostar.plugin.tool_plugin.base_tool import BaseToolData + + +class ShellExecutorToolData(BaseToolData): + is_system: bool = True + action_scope: List[ + Literal[ + "control_node", + "consciousness_node", + "regulatory_node", + "growth_node", + "", + ] + ] = [] + config_args: Dict[str, str] = {} + category: str = "system" + + +async def shell_executor(command: str, timeout: int = 30) -> str: + """在服务器上执行 shell 命令并返回输出。 + + Args: + command: 要执行的 shell 命令 + timeout: 超时秒数,默认 30 秒 + + Returns: + 命令的 stdout + stderr 输出 + """ + try: + proc = await asyncio.create_subprocess_shell( + command, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + stdout, stderr = await asyncio.wait_for( + proc.communicate(), timeout=timeout + ) + output = stdout.decode("utf-8", errors="replace") + err_output = stderr.decode("utf-8", errors="replace") + result = "" + if output: + result += output + if err_output: + result += f"\n[stderr]\n{err_output}" + if proc.returncode != 0: + result += f"\n[exit code: {proc.returncode}]" + return result.strip() or "(no output)" + except asyncio.TimeoutError: + return f"[Error] 命令执行超时({timeout}s)" + except Exception as e: + return f"[Error] 执行失败: {e}" diff --git a/kilostar/plugin/tool_plugin/write_file/__init__.py b/kilostar/plugin/tool_plugin/write_file/__init__.py new file mode 100644 index 0000000..4ddc8f0 --- /dev/null +++ b/kilostar/plugin/tool_plugin/write_file/__init__.py @@ -0,0 +1,42 @@ +import os +from typing import List, Literal, Dict + +from kilostar.plugin.tool_plugin.base_tool import BaseToolData + + +class WriteFileToolData(BaseToolData): + is_system: bool = True + action_scope: List[ + Literal[ + "control_node", + "consciousness_node", + "regulatory_node", + "growth_node", + "", + ] + ] = [] + config_args: Dict[str, str] = {} + category: str = "system" + + +async def write_file(file_path: str, content: str) -> str: + """将内容写入指定文件(会覆盖已有内容,自动创建目录)。 + + Args: + file_path: 文件的路径 + content: 要写入的内容 + + Returns: + 操作结果描述 + """ + try: + dir_path = os.path.dirname(file_path) + if dir_path: + os.makedirs(dir_path, exist_ok=True) + + with open(file_path, "w", encoding="utf-8") as f: + f.write(content) + + return f"已成功写入文件: {file_path}({len(content)} 字符)" + except Exception as e: + return f"[Error] 写入文件失败: {e}" diff --git a/kilostar/utils/access.py b/kilostar/utils/access.py index 6558cf1..5087f6f 100644 --- a/kilostar/utils/access.py +++ b/kilostar/utils/access.py @@ -28,7 +28,8 @@ if TYPE_CHECKING: ALGORITHM = "HS256" -ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 +ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 2 +REFRESH_TOKEN_EXPIRE_DAYS = 7 _INSECURE_SECRETS = {"secret", "114514", "changethiskey12345"} @@ -84,9 +85,51 @@ class Accessor: expire = datetime.now(timezone.utc) + timedelta( minutes=ACCESS_TOKEN_EXPIRE_MINUTES ) - to_encode.update({"exp": int(expire.timestamp())}) + to_encode.update({"exp": int(expire.timestamp()), "type": "access"}) return jwt.encode(to_encode, _get_secret_key(), algorithm=ALGORITHM) + @staticmethod + def _create_refresh_token(data: dict) -> str: + """生成长效 refresh token(默认 7 天有效期)。""" + to_encode = data.copy() + expire = datetime.now(timezone.utc) + timedelta( + days=REFRESH_TOKEN_EXPIRE_DAYS + ) + to_encode.update({"exp": int(expire.timestamp()), "type": "refresh"}) + return jwt.encode(to_encode, _get_secret_key(), algorithm=ALGORITHM) + + @staticmethod + def verify_refresh_token(token: str) -> TokenData: + """校验 refresh token 有效性并返回用户身份;过期或类型不对抛 401。""" + try: + payload = jwt.decode(token, _get_secret_key(), algorithms=[ALGORITHM]) + if payload.get("type") != "refresh": + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="无效的 refresh token", + ) + return TokenData(**{k: v for k, v in payload.items() if k != "type"}) + except jwt.ExpiredSignatureError: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Refresh token 已过期,请重新登录", + ) + except (jwt.InvalidTokenError, ValidationError): + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="无效的 refresh token", + ) + + @staticmethod + def refresh_access_token(refresh_token: str) -> dict: + """用 refresh token 换取新的 access token + refresh token 对。""" + token_data = Accessor.verify_refresh_token(refresh_token) + payload = {"user_id": token_data.user_id, "username": token_data.username} + return { + "access_token": Accessor._create_access_token(payload), + "refresh_token": Accessor._create_refresh_token(payload), + } + @staticmethod def verify_password(plain_password: str, hashed_password: str) -> bool: """校验明文口令是否匹配数据库中存储的哈希。""" @@ -105,8 +148,8 @@ class Accessor: return Accessor._decode_token(token) @staticmethod - def login_hashed_password(user: "User", password: str) -> str: - """完成登录核验:找不到用户或密码错误抛 401,否则签发新令牌。""" + def login_hashed_password(user: "User", password: str) -> dict: + """完成登录核验:找不到用户或密码错误抛 401,否则签发 access + refresh 令牌对。""" if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, @@ -118,13 +161,21 @@ class Accessor: detail="用户名或密码错误", ) token_payload = {"user_id": str(user.user_id), "username": user.user_name} - return Accessor._create_access_token(data=token_payload) + return { + "access_token": Accessor._create_access_token(data=token_payload), + "refresh_token": Accessor._create_refresh_token(data=token_payload), + } @staticmethod def hash_password(password: str) -> str: - """对明文口令做强哈希;空值或长度不足 6 位会抛 ValueError。""" + """对明文口令做强哈希;空值或不满足复杂度要求会抛 ValueError。""" if not password: raise ValueError("密码不能为空") - if len(password) < 6: - raise ValueError("密码长度不能小于 6 位") + if len(password) < 8: + raise ValueError("密码长度不能小于 8 位") + has_upper = any(c.isupper() for c in password) + has_lower = any(c.islower() for c in password) + has_digit = any(c.isdigit() for c in password) + if not (has_upper and has_lower and has_digit): + raise ValueError("密码必须包含大写字母、小写字母和数字") return password_hasher.hash(password) diff --git a/kilostar/utils/config_loader.py b/kilostar/utils/config_loader.py new file mode 100644 index 0000000..fbed5ee --- /dev/null +++ b/kilostar/utils/config_loader.py @@ -0,0 +1,60 @@ +"""Workflow 配置文件管理:读取、缓存、热重载。 + +配置文件路径:``config/workflow.yaml``(相对于项目根目录)。 +采用模块级单例 + 文件修改时间检测,保证: +- 首次调用时懒加载 +- reload_workflow_config() 显式触发重载 +- 工作流引擎调 get_workflow_config() 始终拿到最新生效值 +""" + +from __future__ import annotations + +import os +from pathlib import Path +from typing import Any + +import yaml +from pydantic import BaseModel, Field + +_CONFIG_DIR = Path(__file__).resolve().parent.parent.parent / "config" +_WORKFLOW_YAML = _CONFIG_DIR / "workflow.yaml" + + +class RetryConfig(BaseModel): + max_attempts: int = Field(default=5, ge=1, le=100) + + +class WorkflowConfig(BaseModel): + retry: RetryConfig = Field(default_factory=RetryConfig) + + +_current: WorkflowConfig | None = None + + +def _load_from_disk() -> WorkflowConfig: + if not _WORKFLOW_YAML.exists(): + return WorkflowConfig() + with open(_WORKFLOW_YAML, "r", encoding="utf-8") as f: + data = yaml.safe_load(f) or {} + return WorkflowConfig.model_validate(data) + + +def get_workflow_config() -> WorkflowConfig: + global _current + if _current is None: + _current = _load_from_disk() + return _current + + +def reload_workflow_config() -> WorkflowConfig: + global _current + _current = _load_from_disk() + return _current + + +def save_workflow_config(config: WorkflowConfig) -> None: + _WORKFLOW_YAML.parent.mkdir(parents=True, exist_ok=True) + data = config.model_dump() + with open(_WORKFLOW_YAML, "w", encoding="utf-8") as f: + yaml.dump(data, f, default_flow_style=False, allow_unicode=True) + reload_workflow_config() diff --git a/kilostar/utils/mcp_helper.py b/kilostar/utils/mcp_helper.py index d4fb446..69274a6 100644 --- a/kilostar/utils/mcp_helper.py +++ b/kilostar/utils/mcp_helper.py @@ -125,6 +125,19 @@ async def get_all_toolsets_for_scope(scope: str) -> List[Any]: return toolsets +async def get_retrieval_toolsets_for_scope(scope: str) -> List[Any]: + """仅返回 retrieval 工具集(system_node 专用)。不含 generation 和 MCP 工具。""" + toolsets: List[Any] = [] + try: + gsm = ray_actor_hook("global_state_machine").global_state_machine + retrieval = await gsm.get_retrieval_toolsets_for_scope.remote(scope) + if retrieval: + toolsets.extend(retrieval) + except Exception as e: + logger.error(f"Failed to load retrieval toolsets ({scope}): {e}") + return toolsets + + async def list_mcp_tools_for_configs( configs: Dict[str, Dict[str, Any]], ) -> List[Dict[str, Any]]: diff --git a/kilostar/utils/rate_limit.py b/kilostar/utils/rate_limit.py new file mode 100644 index 0000000..0318b78 --- /dev/null +++ b/kilostar/utils/rate_limit.py @@ -0,0 +1,45 @@ +import time +from collections import defaultdict +from typing import Dict, Tuple + +from fastapi import HTTPException, Request + + +class InMemoryRateLimiter: + """基于滑动窗口的内存限流器。 + + 按 IP 地址追踪请求次数,超出阈值时抛出 429。 + 适用于单实例部署;集群部署应替换为 Redis 后端。 + """ + + def __init__(self, max_requests: int = 5, window_seconds: int = 60): + self._max_requests = max_requests + self._window_seconds = window_seconds + self._requests: Dict[str, list[float]] = defaultdict(list) + + def _get_client_ip(self, request: Request) -> str: + forwarded = request.headers.get("X-Forwarded-For") + if forwarded: + return forwarded.split(",")[0].strip() + return request.client.host if request.client else "unknown" + + def _cleanup(self, key: str, now: float) -> None: + cutoff = now - self._window_seconds + self._requests[key] = [ + t for t in self._requests[key] if t > cutoff + ] + + def check(self, request: Request) -> None: + now = time.time() + key = self._get_client_ip(request) + self._cleanup(key, now) + if len(self._requests[key]) >= self._max_requests: + raise HTTPException( + status_code=429, + detail="请求过于频繁,请稍后再试", + ) + self._requests[key].append(now) + + +register_limiter = InMemoryRateLimiter(max_requests=5, window_seconds=60) +login_limiter = InMemoryRateLimiter(max_requests=10, window_seconds=60) diff --git a/kilostar/utils/ray_hook.py b/kilostar/utils/ray_hook.py index 916c7be..19d9f74 100644 --- a/kilostar/utils/ray_hook.py +++ b/kilostar/utils/ray_hook.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import time import ray from functools import lru_cache @@ -47,14 +48,57 @@ def clear_actor_cache(): _get_cached_actor_handle.cache_clear() -def ray_actor_hook(*actor_names: str): +def wait_for_actor( + actor_name: str, *, timeout: float = 10.0, interval: float = 0.5 +): + """阻塞等待某个 actor 就绪,返回其句柄。 + + 用于"启动期 / ray task 入口刚拉起"这类场景——被依赖的 actor 可能还没注册。 + 在 ``timeout`` 内按 ``interval`` 轮询 ``ray.get_actor``;拿到就立即返回, + 超时则抛带清晰上下文的 ``TimeoutError``(而不是裸 ``ValueError``)。 + + Args: + actor_name: actor 注册名 + timeout: 最长等待秒数;``<=0`` 表示只试一次(等价于直接取句柄) + interval: 轮询间隔秒数 + + Raises: + TimeoutError: 超时仍未就绪。原始异常通过 ``raise ... from`` 链保留。 + """ + deadline = time.monotonic() + max(timeout, 0.0) + last_err: Exception | None = None + while True: + try: + return _get_cached_actor_handle(actor_name) + except Exception as e: # ray.get_actor 失败一般是 ValueError + last_err = e + # 失败不能让 lru_cache 留下脏数据(异常本身不会被缓存, + # 但若底层换实现,这里清一次更稳妥) + if time.monotonic() >= deadline: + raise TimeoutError( + f"等待 actor {actor_name!r} 就绪超时({timeout}s):{last_err}" + ) from last_err + time.sleep(interval) + + +def ray_actor_hook(*actor_names: str, timeout: float = 0.0, interval: float = 0.5): """按名字批量取出 Ray Actor 句柄,组装成一个 ``ActorList`` 返回。 例:``actors = ray_actor_hook("postgres_database", "global_state_machine")``, 随后即可用 ``actors.postgres_database`` 拿到对应句柄。 + + Args: + timeout: ``>0`` 时对每个 actor 走 ``wait_for_actor`` 等待就绪(启动期用); + 缺省 ``0`` 保持原"快速失败"语义——actor 不在立即抛异常。 + interval: 等待轮询间隔,仅在 ``timeout>0`` 时生效。 """ actor_list = ActorList() for actor_name in actor_names: - handle = _get_cached_actor_handle(actor_name) + if timeout > 0: + handle = wait_for_actor( + actor_name, timeout=timeout, interval=interval + ) + else: + handle = _get_cached_actor_handle(actor_name) setattr(actor_list, actor_name, handle) return actor_list diff --git a/main.py b/main.py index ebc675d..301daff 100644 --- a/main.py +++ b/main.py @@ -1,13 +1,26 @@ import os import secrets +import sys +_INSECURE_SECRETS = {"secret", "114514", "changethiskey12345"} _secret_key = os.getenv("SECRET_KEY") -if not _secret_key or _secret_key in {"secret", "114514", "changethiskey12345"}: - _secret_key = secrets.token_urlsafe(32) - os.environ["SECRET_KEY"] = _secret_key - print( - "⚠️ 警告: 未提供有效的 SECRET_KEY 或使用了不安全的默认值,已生成并设置随机密钥。" - ) +_is_dev = os.getenv("KILOSTAR_ENV", "production").lower() in ("dev", "development") + +if not _secret_key or _secret_key in _INSECURE_SECRETS: + if _is_dev: + _secret_key = secrets.token_urlsafe(32) + os.environ["SECRET_KEY"] = _secret_key + print( + "⚠️ [开发模式] 未提供有效的 SECRET_KEY,已生成临时随机密钥(重启后失效)。" + ) + else: + print( + "❌ [致命错误] 未提供有效的 SECRET_KEY 或使用了不安全的默认值。\n" + " 请设置环境变量 SECRET_KEY 为一个高熵的随机字符串。\n" + " 可使用: python -c \"import secrets; print(secrets.token_urlsafe(32))\"\n" + " 若为开发环境,请设置 KILOSTAR_ENV=dev 以允许自动生成临时密钥。" + ) + sys.exit(1) import asyncio import ray diff --git a/pyproject.toml b/pyproject.toml index 9a16280..cc66427 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -46,6 +46,9 @@ asyncio_mode = "auto" testpaths = ["tests"] pythonpath = ["."] addopts = "-ra -q --strict-markers" +markers = [ + "integration: 端到端 / 组装层 smoke 测试(不依赖真 ray / postgres)", +] filterwarnings = [ "ignore::DeprecationWarning", ] diff --git a/tests/integration/test_smoke.py b/tests/integration/test_smoke.py new file mode 100644 index 0000000..1ad1ded --- /dev/null +++ b/tests/integration/test_smoke.py @@ -0,0 +1,156 @@ +"""组装层 / 端到端 smoke 测试。 + +这一层补 ``tests/unit`` 的盲区:单测全是 mock 出来的纯逻辑,抓不到 +"import 错误 / 路由冲突 / 真实节点拓扑串联不上" 这类组装层 bug。 + +设计原则: +- **不依赖真 ray / 真 postgres**:sandbox 里 ``ray.init`` 有 psutil PID 问题, + 真 postgres 要 docker。这里只验证"组件能正确组装 + 真实拓扑能端到端跑通"。 +- app 装配:用真实的 ``KiloStarGateway`` 内部 ``app``(触发所有 router import + + 注册),打 health 探针。 +- workflow:用真实的 6 节点 graph 拓扑端到端跑,只 mock 最外层 IO(DB 写 / SSE / + 执行器),不 mock 任何节点逻辑。 +""" + +from __future__ import annotations + +from unittest.mock import AsyncMock + +import pytest +from httpx import ASGITransport, AsyncClient + +pytestmark = pytest.mark.integration + + +# ─── 组装层:整个 FastAPI app 能 import + 路由注册无冲突 ──────────────────── + + +@pytest.mark.asyncio +async def test_app_imports_and_health_live_ok(): + """导入生产 ``app`` 不报错,且 /health/live 返回 alive。 + + 这一步能抓到的真实 bug:任一 router 模块 import 失败、include_router + 路由前缀撞车、middleware 装配异常——这些单测都看不到。 + """ + from kilostar.api import app + + transport = ASGITransport(app=app, raise_app_exceptions=False) + async with AsyncClient(transport=transport, base_url="http://test") as client: + resp = await client.get("/health/live") + + assert resp.status_code == 200 + assert resp.json() == {"status": "alive"} + + +@pytest.mark.asyncio +async def test_app_route_table_has_expected_endpoints(): + """关键路由都已注册(拓扑回归保护)。""" + from kilostar.api import app + + paths = {getattr(r, "path", None) for r in app.router.routes} + assert "/health/live" in paths + assert "/health/ready" in paths + assert "/api/v1/workflow" in paths + # 阶段九/十 新增的 resume / graph 端点 + assert "/api/v1/workflow/{trace_id}/resume" in paths + assert "/api/v1/workflow/{trace_id}/graph" in paths + + +# ─── 端到端:真实 6 节点 graph 拓扑跑通 ──────────────────────────────────── + + +def _make_real_deps(skill_outputs, consciousness_outputs=None, replies=None): + """构造 WorkflowDeps:只 mock 最外层 IO,节点逻辑全用真实实现。""" + from kilostar.core.work.workflow.workflow_engine import WorkflowDeps + + skill_q = list(skill_outputs or []) + consc_q = list(consciousness_outputs or []) + reply_q = list(replies or []) + sink = {"pending": [], "skill": [], "consc": []} + + async def _get_received(tid): + return reply_q.pop(0) if reply_q else "" + + async def _run_skill(step, state): + sink["skill"].append(step.get("name")) + return skill_q.pop(0) if skill_q else ("(none)", True) + + async def _run_consciousness(step, state): + sink["consc"].append(step.get("name")) + return consc_q.pop(0) if consc_q else ("(none)", True) + + deps = WorkflowDeps( + upsert_workflow_context=AsyncMock(), + update_workflow_status=AsyncMock(), + put_pending=AsyncMock(side_effect=lambda t, m: sink["pending"].append(m)), + get_received=_get_received, + run_skill=_run_skill, + run_consciousness=_run_consciousness, + ) + return deps, sink + + +@pytest.mark.asyncio +async def test_end_to_end_mixed_workflow_runs_to_completion(): + """混合 skill + consciousness + HITL 的多步 workflow 端到端跑通。 + + 这是最贴近"真实一次 workflow"的 smoke:3 步分别走不同节点类型 + 一步 + 需要人工审批,全程用真实 Dispatch 派发逻辑。 + """ + from kilostar.core.work.workflow.workflow_engine import run_workflow_graph + from kilostar.core.work.workflow.model import WorkflowStatus + + deps, sink = _make_real_deps( + skill_outputs=[("s-ok", True), ("s2-ok", True)], + consciousness_outputs=[("c-ok", True)], + replies=["approve"], + ) + workflow_data = { + "work_link": [ + {"step": 1, "name": "research", "action": "do", + "node": "skill_individual", "agent_id": "a1"}, + {"step": 2, "name": "plan", "action": "do", + "node": "consciousness_node"}, + {"step": 3, "name": "review", "action": "do", + "node": "skill_individual", "agent_id": "a1", + "require_approval": True}, + ] + } + + final = await run_workflow_graph(workflow_data, "smoke-mixed", deps=deps) + + assert final == WorkflowStatus.COMPLETED.value + # 真实派发:skill 跑了 research + review(审批通过后),consciousness 跑了 plan + assert sink["skill"] == ["research", "review"] + assert sink["consc"] == ["plan"] + # 审批提示发过 + assert any("人工审批" in m for m in sink["pending"]) + + +@pytest.mark.asyncio +async def test_end_to_end_empty_workflow_completes_immediately(): + """空 work_link 直接 COMPLETED(不卡死、不报错)。""" + from kilostar.core.work.workflow.workflow_engine import run_workflow_graph + from kilostar.core.work.workflow.model import WorkflowStatus + + deps, _ = _make_real_deps(skill_outputs=[]) + final = await run_workflow_graph({"work_link": []}, "smoke-empty", deps=deps) + assert final == WorkflowStatus.COMPLETED.value + + +@pytest.mark.asyncio +async def test_end_to_end_failed_step_aborts_workflow(): + """某步执行失败 → 工作流终态 FAILED(真实 logic gate 行为)。""" + from kilostar.core.work.workflow.workflow_engine import run_workflow_graph + from kilostar.core.work.workflow.model import WorkflowStatus + + deps, sink = _make_real_deps(skill_outputs=[("boom", False)]) + workflow_data = { + "work_link": [ + {"step": 1, "name": "will-fail", "action": "do", + "node": "skill_individual", "agent_id": "a1"}, + ] + } + final = await run_workflow_graph(workflow_data, "smoke-fail", deps=deps) + assert final == WorkflowStatus.FAILED.value + assert sink["skill"] == ["will-fail"] diff --git a/tests/unit/test_api_health.py b/tests/unit/test_api_health.py index 792f89d..6c8f521 100644 --- a/tests/unit/test_api_health.py +++ b/tests/unit/test_api_health.py @@ -1,4 +1,4 @@ -"""``api/health.py`` 健康探针端点。""" +"""``api/system.py`` 健康探针端点。""" from __future__ import annotations @@ -9,13 +9,13 @@ import pytest from fastapi import FastAPI from httpx import AsyncClient, ASGITransport -from kilostar.api.health import health_router +from kilostar.api.system import system_router @pytest.fixture def health_app() -> FastAPI: app = FastAPI() - app.include_router(health_router) + app.include_router(system_router) return app diff --git a/tests/unit/test_api_workflow_auth.py b/tests/unit/test_api_workflow_auth.py new file mode 100644 index 0000000..3aaf039 --- /dev/null +++ b/tests/unit/test_api_workflow_auth.py @@ -0,0 +1,113 @@ +"""workflow 路由鉴权测试:SSE / reply / resume / detail / graph 端点归属校验。""" + +from __future__ import annotations + +import types +from unittest.mock import AsyncMock + +import pytest +from fastapi import FastAPI +from httpx import AsyncClient, ASGITransport + +from kilostar.api.workflow import workflow_router +from kilostar.utils.access import Accessor, TokenData + + +def _fake_user(user_id: str = "alice"): + return TokenData(user_id=user_id, username=user_id) + + +def _make_workflow(owner: str = "alice"): + return types.SimpleNamespace( + trace_id="trace-1", + user_id=owner, + title="test", + status="running", + ) + + +@pytest.fixture +def app_alice(): + app = FastAPI() + app.include_router(workflow_router) + app.dependency_overrides[Accessor.get_current_user] = lambda: _fake_user("alice") + return app + + +def _register_pg(fake_actors, owner: str = "alice"): + pg = types.SimpleNamespace() + pg.get_workflow = types.SimpleNamespace(remote=AsyncMock(return_value=_make_workflow(owner))) + pg.get_workflow_context = types.SimpleNamespace(remote=AsyncMock(return_value=None)) + pg.get_workflow_graph_state = types.SimpleNamespace(remote=AsyncMock(return_value=None)) + fake_actors.register("postgres_database", pg) + return pg + + +@pytest.mark.asyncio +async def test_detail_forbidden_other_user(app_alice, fake_actors): + _register_pg(fake_actors, owner="bob") + async with AsyncClient(transport=ASGITransport(app=app_alice), base_url="http://t") as c: + resp = await c.get("/api/v1/workflow/trace-1") + assert resp.status_code == 403 + + +@pytest.mark.asyncio +async def test_detail_not_found(app_alice, fake_actors): + pg = types.SimpleNamespace() + pg.get_workflow = types.SimpleNamespace(remote=AsyncMock(return_value=None)) + fake_actors.register("postgres_database", pg) + async with AsyncClient(transport=ASGITransport(app=app_alice), base_url="http://t") as c: + resp = await c.get("/api/v1/workflow/trace-nonexist") + assert resp.status_code == 404 + + +@pytest.mark.asyncio +async def test_reply_forbidden_other_user(app_alice, fake_actors): + _register_pg(fake_actors, owner="bob") + async with AsyncClient(transport=ASGITransport(app=app_alice), base_url="http://t") as c: + resp = await c.post("/api/v1/workflow/reply/trace-1", json={"message": "hi"}) + assert resp.status_code == 403 + + +@pytest.mark.asyncio +async def test_resume_forbidden_other_user(app_alice, fake_actors): + _register_pg(fake_actors, owner="bob") + async with AsyncClient(transport=ASGITransport(app=app_alice), base_url="http://t") as c: + resp = await c.post("/api/v1/workflow/trace-1/resume") + assert resp.status_code == 403 + + +@pytest.mark.asyncio +async def test_resume_not_found(app_alice, fake_actors): + pg = types.SimpleNamespace() + pg.get_workflow = types.SimpleNamespace(remote=AsyncMock(return_value=None)) + fake_actors.register("postgres_database", pg) + async with AsyncClient(transport=ASGITransport(app=app_alice), base_url="http://t") as c: + resp = await c.post("/api/v1/workflow/trace-nonexist/resume") + assert resp.status_code == 404 + + +@pytest.mark.asyncio +async def test_graph_forbidden_other_user(app_alice, fake_actors): + _register_pg(fake_actors, owner="bob") + async with AsyncClient(transport=ASGITransport(app=app_alice), base_url="http://t") as c: + resp = await c.get("/api/v1/workflow/trace-1/graph") + assert resp.status_code == 403 + + +@pytest.mark.asyncio +async def test_sse_forbidden_other_user(app_alice, fake_actors): + _register_pg(fake_actors, owner="bob") + async with AsyncClient(transport=ASGITransport(app=app_alice), base_url="http://t") as c: + resp = await c.get("/api/v1/workflow/sse/trace-1") + assert resp.status_code == 403 + + +@pytest.mark.asyncio +async def test_sse_not_found(app_alice, fake_actors): + pg = types.SimpleNamespace() + pg.get_workflow = types.SimpleNamespace(remote=AsyncMock(return_value=None)) + fake_actors.register("postgres_database", pg) + async with AsyncClient(transport=ASGITransport(app=app_alice), base_url="http://t") as c: + resp = await c.get("/api/v1/workflow/sse/trace-nonexist") + assert resp.status_code == 404 diff --git a/tests/unit/test_api_workflow_detail.py b/tests/unit/test_api_workflow_detail.py new file mode 100644 index 0000000..73d115d --- /dev/null +++ b/tests/unit/test_api_workflow_detail.py @@ -0,0 +1,80 @@ +"""``api/workflow.py`` 读侧拼装:运行期状态 merge 到静态 step。""" + +from __future__ import annotations + +from kilostar.api.workflow import _merge_runtime_status + + +def test_merge_marks_pending_when_no_log(): + """没有任何运行日志时,所有 step 默认 pending。""" + work_link = [ + {"step": 1, "name": "s1", "node": "skill_individual", "action": "a"}, + {"step": 2, "name": "s2", "node": "consciousness_node", "action": "b"}, + ] + merged = _merge_runtime_status(work_link, []) + assert [s["status"] for s in merged] == ["pending", "pending"] + # 静态字段保留 + assert merged[0]["name"] == "s1" + assert merged[1]["node"] == "consciousness_node" + + +def test_merge_uses_latest_status_per_step(): + """同一 step 多条日志时取最后一条(working → completed)。""" + work_link = [ + {"step": 1, "name": "s1", "node": "skill_individual", "action": "a"}, + ] + workflow_log = [ + {"0": ["2026-01-01T00:00:00", "working", "开始"]}, + {"0": ["2026-01-01T00:00:05", "completed", "成功"]}, + ] + merged = _merge_runtime_status(work_link, workflow_log) + assert merged[0]["status"] == "completed" + + +def test_merge_mixed_statuses(): + """多 step 各自取自己最新状态;无日志的保持 pending。""" + work_link = [ + {"step": 1, "name": "s1", "node": "skill_individual", "action": "a"}, + {"step": 2, "name": "s2", "node": "skill_individual", "action": "b"}, + {"step": 3, "name": "s3", "node": "skill_individual", "action": "c"}, + ] + workflow_log = [ + {"0": ["t", "completed", "ok"]}, + {"1": ["t", "failed", "boom"]}, + ] + merged = _merge_runtime_status(work_link, workflow_log) + assert [s["status"] for s in merged] == ["completed", "failed", "pending"] + + +def test_merge_falls_back_to_position_index_without_step_field(): + """step 没有 step 字段时按位置索引匹配日志。""" + work_link = [ + {"name": "s1", "node": "skill_individual", "action": "a"}, + {"name": "s2", "node": "skill_individual", "action": "b"}, + ] + workflow_log = [ + {"1": ["t", "completed", "ok"]}, + ] + merged = _merge_runtime_status(work_link, workflow_log) + assert merged[0]["status"] == "pending" + assert merged[1]["status"] == "completed" + + +def test_merge_ignores_malformed_log_entries(): + """脏日志(非 dict / payload 不是数组 / key 不是数字)不应炸。""" + work_link = [ + {"step": 1, "name": "s1", "node": "skill_individual", "action": "a"}, + ] + workflow_log = [ + "not-a-dict", + {"not-an-int": ["t", "completed", "x"]}, + {"0": "not-a-list"}, + {"0": ["t", "working"]}, + ] + merged = _merge_runtime_status(work_link, workflow_log) + assert merged[0]["status"] == "working" + + +def test_merge_handles_empty_work_link(): + assert _merge_runtime_status([], []) == [] + assert _merge_runtime_status(None, None) == [] diff --git a/tests/unit/test_config_loader.py b/tests/unit/test_config_loader.py new file mode 100644 index 0000000..c59e4e7 --- /dev/null +++ b/tests/unit/test_config_loader.py @@ -0,0 +1,78 @@ +"""``utils/config_loader.py``:workflow.yaml 读/写/热重载。""" + +from __future__ import annotations + +import yaml +import pytest + + +@pytest.fixture(autouse=True) +def isolated_yaml(tmp_path, monkeypatch): + """每个用例用独立的临时 yaml,避免污染真实 config/workflow.yaml。""" + from kilostar.utils import config_loader + + fake_yaml = tmp_path / "workflow.yaml" + monkeypatch.setattr(config_loader, "_WORKFLOW_YAML", fake_yaml) + monkeypatch.setattr(config_loader, "_current", None) + return fake_yaml + + +def test_get_workflow_config_returns_default_when_file_absent(): + from kilostar.utils.config_loader import get_workflow_config + + config = get_workflow_config() + assert config.retry.max_attempts == 5 + + +def test_get_workflow_config_reads_from_disk(isolated_yaml): + isolated_yaml.write_text("retry:\n max_attempts: 12\n", encoding="utf-8") + + from kilostar.utils.config_loader import reload_workflow_config + + config = reload_workflow_config() + assert config.retry.max_attempts == 12 + + +def test_save_workflow_config_writes_yaml_and_reloads(isolated_yaml): + from kilostar.utils.config_loader import ( + save_workflow_config, + get_workflow_config, + WorkflowConfig, + RetryConfig, + ) + + new_config = WorkflowConfig(retry=RetryConfig(max_attempts=20)) + save_workflow_config(new_config) + + on_disk = yaml.safe_load(isolated_yaml.read_text(encoding="utf-8")) + assert on_disk == {"retry": {"max_attempts": 20}} + + # 热重载:再次 get 应直接拿到新值 + assert get_workflow_config().retry.max_attempts == 20 + + +def test_max_attempts_validation_rejects_out_of_range(): + from kilostar.utils.config_loader import RetryConfig + + with pytest.raises(Exception): + RetryConfig(max_attempts=0) + + with pytest.raises(Exception): + RetryConfig(max_attempts=200) + + +def test_reload_picks_up_external_file_changes(isolated_yaml): + """模拟运维直接改 yaml 文件,reload 后引擎能拿到新值。""" + isolated_yaml.write_text("retry:\n max_attempts: 3\n", encoding="utf-8") + + from kilostar.utils.config_loader import ( + get_workflow_config, + reload_workflow_config, + ) + + assert reload_workflow_config().retry.max_attempts == 3 + + isolated_yaml.write_text("retry:\n max_attempts: 30\n", encoding="utf-8") + assert reload_workflow_config().retry.max_attempts == 30 + # get_workflow_config 也读到最新 + assert get_workflow_config().retry.max_attempts == 30 diff --git a/tests/unit/test_plugin_metadata.py b/tests/unit/test_plugin_metadata.py index ad4cbae..2194464 100644 --- a/tests/unit/test_plugin_metadata.py +++ b/tests/unit/test_plugin_metadata.py @@ -13,15 +13,15 @@ def test_approval_metadata(): data = ApprovalToolData() assert data.is_system is True assert data.category == "system" - assert "control_node" in data.action_scope - assert "consciousness_node" in data.action_scope + # action_scope 为空表示分配给 default 组(所有节点可用) + assert data.action_scope == [] def test_file_reader_metadata(): data = FileReaderToolData() assert data.is_system is True assert data.category == "system" - assert "control_node" in data.action_scope + assert data.action_scope == [] def test_tavily_search_metadata(): diff --git a/tests/unit/test_utils_ray_hook.py b/tests/unit/test_utils_ray_hook.py index d5936f5..3fbbd55 100644 --- a/tests/unit/test_utils_ray_hook.py +++ b/tests/unit/test_utils_ray_hook.py @@ -44,3 +44,64 @@ def test_ray_actor_hook_unknown_actor_raises(fake_actors): with pytest.raises(ValueError): ray_actor_hook("does_not_exist") + + +def test_wait_for_actor_returns_immediately_when_ready(fake_actors): + """actor 已就绪时 wait_for_actor 立刻返回,不进入轮询等待。""" + handle = MagicMock() + fake_actors.register("postgres_database", handle) + + from kilostar.utils.ray_hook import wait_for_actor + + got = wait_for_actor("postgres_database", timeout=5.0) + assert got is handle + + +def test_wait_for_actor_times_out_with_clear_error(fake_actors): + """超时仍未就绪时抛 TimeoutError,并在 message 里带 actor 名。""" + from kilostar.utils.ray_hook import wait_for_actor + + with pytest.raises(TimeoutError) as exc_info: + wait_for_actor("never_ready", timeout=0.2, interval=0.05) + assert "never_ready" in str(exc_info.value) + + +def test_wait_for_actor_succeeds_after_delayed_registration(fake_actors): + """actor 在第 N 次轮询时才注册,wait_for_actor 应在它就绪后返回。""" + from kilostar.utils.ray_hook import wait_for_actor + + handle = MagicMock() + calls = {"n": 0} + original_get = fake_actors.get + + def delayed_get(name, namespace="kilostar"): + calls["n"] += 1 + if calls["n"] >= 3: + return handle + raise ValueError("not ready yet") + + fake_actors.get = delayed_get + try: + got = wait_for_actor("late_actor", timeout=2.0, interval=0.05) + assert got is handle + assert calls["n"] >= 3 + finally: + fake_actors.get = original_get + + +def test_ray_actor_hook_with_timeout_waits(fake_actors): + """ray_actor_hook(timeout>0) 会走 wait_for_actor 等待路径。""" + from kilostar.utils.ray_hook import ray_actor_hook + + handle = MagicMock() + calls = {"n": 0} + + def delayed_get(name, namespace="kilostar"): + calls["n"] += 1 + if calls["n"] >= 2: + return handle + raise ValueError("not ready yet") + + fake_actors.get = delayed_get + actors = ray_actor_hook("slow_actor", timeout=2.0, interval=0.05) + assert actors.slow_actor is handle diff --git a/tests/unit/test_workflow_graph.py b/tests/unit/test_workflow_graph.py index eeefdd0..574e337 100644 --- a/tests/unit/test_workflow_graph.py +++ b/tests/unit/test_workflow_graph.py @@ -288,3 +288,37 @@ def test_workflow_graph_state_defaults(): assert state.final_status == WorkflowStatus.RUNNING.value assert state.logs == [] assert state.original_command == "" + assert state.jump_counts == {} + + +@pytest.mark.asyncio +async def test_loop_retry_exceeds_max_attempts_fails(monkeypatch): + """环跳转超过 max_attempts 后,工作流应直接 FAILED 而非无限重试。""" + from kilostar.utils import config_loader + from kilostar.utils.config_loader import WorkflowConfig, RetryConfig + + monkeypatch.setattr(config_loader, "_current", WorkflowConfig(retry=RetryConfig(max_attempts=2))) + + deps, sink = _make_deps( + skill_outputs=[ + ("o1", True), + ("fail", False), + ("o1", True), + ("fail", False), + ("o1", True), + ("fail", False), + ("o1", True), + ("fail", False), + ] + ) + workflow_data = { + "work_link": [ + {"step": 1, "name": "s1", "action": "do", + "node": "skill_individual", "agent_id": "a1"}, + {"step": 2, "name": "s2", "action": "do", + "node": "skill_individual", "agent_id": "a2", + "logic_gate": {"if_fail": "jump_to_step_1", "if_pass": "continue"}}, + ] + } + final = await run_workflow_graph(workflow_data, "trace-loop-limit", deps=deps) + assert final == WorkflowStatus.FAILED.value