Files
zhaoxi 6d658b4f4d feat: 工具系统迁移 + 重型插件骨架 + 前端交互增强
- 工具系统从 kilostar/plugin/tool_plugin/ 迁移到 data/toolset/(manifest.json 声明式)
- 新增 plugin_runtime 模块:BaseOrganization / GlobalPluginManager / loader / tool_bridge
- 新增 org_task + org_task_event 表及 DAO(alembic 0009)
- 新增 /api/v1/plugin 路由(submit/status/stream/install/reload)
- 新增 data/plugin/example_dept 示例重型插件
- regulatory_node 支持聊天历史上下文注入
- send_file 改为 artifact 存盘 + SSE 推送下载链接
- 前端 WorkflowFileCard 组件 + ToolSettings README 渲染
- utils 整理:合并 access/role_check、standalone_proxy→ray_compat、删除废弃模块
- 项目结构文档移至 docs/STRUCTURE.md 并详细展开

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-17 05:20:00 +00:00

289 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Copyright 2026 zhaoxi826
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from kilostar.utils.ray_hook import ray_actor_hook
from fastapi import APIRouter, Request, HTTPException, Depends
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from ulid import ULID
import asyncio
from kilostar.utils.access import Accessor, TokenData, RoleChecker
from kilostar.core.postgres_database.model import UserAuthority
# Router that the workflow endpoints in this module register on; it is
# created with the "/api/v1/workflow" path prefix and the "workflow" tag.
workflow_router = APIRouter(prefix="/api/v1/workflow", tags=["workflow"])
class CreateWorkflowRequest(BaseModel):
    """Request body accepted by the workflow creation endpoint."""

    # Title stored with the new workflow record.
    title: str
    # Command text; it is stored with the record and also handed to the
    # consciousness node when workflow design is started.
    command: str
@workflow_router.post("")
async def create_workflow(
    request: CreateWorkflowRequest,
    token_data: TokenData = Depends(Accessor.get_current_user),
):
    """Create a workflow for the authenticated user and start its design.

    Steps, in order:
      1. Look up the ``postgres_database`` actor and generate a fresh ULID
         string to use as the trace id.
      2. Await ``create_workflow`` on the database actor with the trace id,
         the caller's user id, and the request's title and command.
      3. Await ``create_trace`` on the global workflow manager.
      4. Call ``start_workflow_design`` on the consciousness node; the
         returned handle is neither awaited nor stored.

    Returns:
        ``{"trace_id": <new id>, "status": "creating"}``.
    """
    db_actor = ray_actor_hook("postgres_database").postgres_database
    new_trace_id = str(ULID())
    workflow_fields = {
        "trace_id": new_trace_id,
        "user_id": token_data.user_id,
        "title": request.title,
        "command": request.command,
    }
    await db_actor.create_workflow.remote(**workflow_fields)

    manager = ray_actor_hook("global_workflow_manager").global_workflow_manager
    await manager.create_trace.remote(new_trace_id)

    # Not awaited: the response is returned without waiting for the design.
    designer = ray_actor_hook("consciousness_node").consciousness_node
    designer.start_workflow_design.remote(new_trace_id, request.command)

    response = {"trace_id": new_trace_id, "status": "creating"}
    return response
@workflow_router.get("/list")
async def get_workflow_list(
    token_data: TokenData = Depends(RoleChecker(allowed_roles=UserAuthority.USER)),
):
    """List the workflows of the calling user.

    The caller is resolved through ``RoleChecker`` configured with
    ``UserAuthority.USER``; the resulting user id is passed to the database
    actor's ``list_workflows``.

    Returns:
        ``{"workflows": <whatever list_workflows returned>}``.
    """
    db_actor = ray_actor_hook("postgres_database").postgres_database
    owner_id = token_data.user_id
    found = await db_actor.list_workflows.remote(user_id=owner_id)
    return {"workflows": found}
@workflow_router.get("/sse/{trace_id}")
async def get_workflow_sse(
    trace_id: str,
    request: Request,
    token_data: TokenData = Depends(Accessor.get_current_user),
):
    """Stream pending workflow messages to the client as server-sent events.

    Authentication goes through the standard ``Authorization: Bearer``
    header (per the original note, the front end uses a fetch-based SSE
    client so the token never appears in the URL). The workflow identified
    by ``trace_id`` must belong to the current user.

    Raises:
        HTTPException: 404 when the workflow does not exist, 403 when it is
            owned by a different user.
    """
    db_actor = ray_actor_hook("postgres_database").postgres_database
    workflow = await db_actor.get_workflow.remote(trace_id)
    if not workflow:
        raise HTTPException(status_code=404, detail="Workflow not found")
    owner_id = getattr(workflow, "user_id", None)
    if owner_id != token_data.user_id:
        raise HTTPException(status_code=403, detail="Forbidden")

    manager = ray_actor_hook("global_workflow_manager").global_workflow_manager

    async def stream_events():
        # Poll the manager until the client disconnects; back off for half a
        # second whenever nothing is pending.
        try:
            while not await request.is_disconnected():
                pending = await manager.get_pending.remote(trace_id)
                if not pending:
                    await asyncio.sleep(0.5)
                    continue
                yield f"data: {pending}\n\n"
        except asyncio.CancelledError:
            # Cancellation simply ends the stream.
            return

    return StreamingResponse(stream_events(), media_type="text/event-stream")
@workflow_router.post("/reply/{trace_id}")
async def post_workflow_reply(
    trace_id: str,
    request: Request,
    token_data: TokenData = Depends(Accessor.get_current_user),
):
    """Forward a user's reply message to the workflow identified by ``trace_id``.

    The request body is expected to be a JSON object; its ``"message"`` value
    (empty string when absent) is passed to ``put_received`` on the global
    workflow manager.

    Raises:
        HTTPException: 404 when the workflow does not exist, 403 when it is
            owned by a different user, 400 when the body is not valid JSON
            or is not a JSON object.

    Returns:
        ``{"status": "ok"}`` once the reply has been handed to the manager.
    """
    postgres_database = ray_actor_hook("postgres_database").postgres_database
    wf = await postgres_database.get_workflow.remote(trace_id)
    if not wf:
        raise HTTPException(status_code=404, detail="Workflow not found")
    if getattr(wf, "user_id", None) != token_data.user_id:
        raise HTTPException(status_code=403, detail="Forbidden")

    # A malformed body is a client error; report it as 400 instead of letting
    # the decode error surface as an unhandled 500. JSONDecodeError and
    # UnicodeDecodeError are both ValueError subclasses.
    try:
        data = await request.json()
    except ValueError as exc:
        raise HTTPException(
            status_code=400, detail="Request body must be valid JSON"
        ) from exc
    # Valid JSON may still be a list/string/number, which has no ``.get``.
    if not isinstance(data, dict):
        raise HTTPException(
            status_code=400, detail="Request body must be a JSON object"
        )
    reply_msg = data.get("message", "")

    global_workflow_manager = ray_actor_hook(
        "global_workflow_manager"
    ).global_workflow_manager
    await global_workflow_manager.put_received.remote(trace_id, reply_msg)
    return {"status": "ok"}
@workflow_router.get("/{trace_id}")
async def get_workflow_detail(
    trace_id: str, token_data: TokenData = Depends(Accessor.get_current_user)
):
    """Return a workflow's metadata, its steps with runtime status, and blackboard.

    The workflow record supplies ``title``, ``status`` and ``command``. The
    workflow context (if any) supplies ``work_link``, ``workflow_log``,
    ``workflow_pointer`` and ``blackboard``; each falls back to an empty
    value (``[]``, ``[]``, ``0``, ``{}``) when the context is missing or does
    not carry that attribute.

    Raises:
        HTTPException: 404 when the workflow does not exist, 403 when it is
            owned by a different user.

    Returns:
        A dict with the keys ``trace_id``, ``title``, ``status``,
        ``command``, ``steps``, ``current_step`` and ``context_blackboard``.
    """
    postgres_database = ray_actor_hook("postgres_database").postgres_database
    wf = await postgres_database.get_workflow.remote(trace_id)
    if not wf:
        raise HTTPException(status_code=404, detail="Workflow not found")
    if getattr(wf, "user_id", None) != token_data.user_id:
        raise HTTPException(status_code=403, detail="Forbidden")

    context = await postgres_database.get_workflow_context.remote(trace_id)
    if context:
        work_link = getattr(context, "work_link", [])
        workflow_log = getattr(context, "workflow_log", [])
        pointer = getattr(context, "workflow_pointer", None)
        # Guarded like the sibling fields so a context object without a
        # ``blackboard`` attribute yields {} instead of an AttributeError.
        blackboard = getattr(context, "blackboard", {})
    else:
        work_link, workflow_log, pointer, blackboard = [], [], None, {}
    # A missing or None pointer is reported as step 0.
    workflow_pointer = 0 if pointer is None else pointer

    steps = _merge_runtime_status(work_link, workflow_log)
    return {
        "trace_id": trace_id,
        "title": wf.title,
        "status": wf.status,
        "command": wf.command,
        "steps": steps,
        "current_step": workflow_pointer,
        "context_blackboard": blackboard,
    }
def _merge_runtime_status(work_link: list, workflow_log: list) -> list:
"""把运行期状态从 ``workflow_log`` 反推并 merge 到每个静态 step 上。
``work_link`` 是 step 的**静态定义**(名字 / node 类型 / action),不含运行期
状态;运行期状态散落在 ``workflow_log`` 里——其结构为::
[{"<step_index>": [timestamp, status, message]}, ...]
同一 step 可能出现多条(working → completed),取**最后一条**的 status 作为
该 step 当前状态。没有日志记录的 step 视为 ``pending``。
前端 ``WorkflowDiagram`` 依赖每个 step 的 ``status`` 字段着色,这个拼装让
后端真正把运行期状态喂过去。
"""
latest_status: dict[int, str] = {}
latest_output: dict[int, str] = {}
for entry in workflow_log or []:
if not isinstance(entry, dict):
continue
for key, payload in entry.items():
try:
idx = int(key)
except (ValueError, TypeError):
continue
if isinstance(payload, (list, tuple)) and len(payload) >= 2:
latest_status[idx] = payload[1]
if isinstance(payload, (list, tuple)) and len(payload) >= 3:
latest_output[idx] = payload[2]
merged = []
for i, step in enumerate(work_link or []):
step_copy = dict(step) if isinstance(step, dict) else {}
step_idx = step_copy.get("step")
lookup_idx = (step_idx - 1) if isinstance(step_idx, int) else i
step_copy["status"] = latest_status.get(lookup_idx, "pending")
step_copy["output"] = latest_output.get(lookup_idx, "")
merged.append(step_copy)
return merged
@workflow_router.post("/{trace_id}/resume")
async def resume_workflow(
    trace_id: str,
    token_data: TokenData = Depends(Accessor.get_current_user),
):
    """Resume an interrupted or suspended workflow from its persisted graph state.

    A new Ray task is launched; per the original note, the task's ``hydrate``
    check at entry takes the resume path and runs the remaining nodes.

    Raises:
        HTTPException: 404 when the workflow does not exist, 403 when it is
            owned by a different user, 409 when no persisted graph state
            exists for ``trace_id``.

    Returns:
        ``{"trace_id": trace_id, "status": "resuming"}``.
    """
    db_actor = ray_actor_hook("postgres_database").postgres_database
    workflow = await db_actor.get_workflow.remote(trace_id)
    if not workflow:
        raise HTTPException(status_code=404, detail="Workflow not found")
    owner_id = getattr(workflow, "user_id", None)
    if owner_id != token_data.user_id:
        raise HTTPException(status_code=403, detail="Forbidden")

    graph_state = await db_actor.get_workflow_graph_state.remote(trace_id)
    if graph_state is None:
        raise HTTPException(
            status_code=409, detail="该工作流没有可恢复的图持久化记录"
        )

    manager = ray_actor_hook("global_workflow_manager").global_workflow_manager
    await manager.create_trace.remote(trace_id)

    from kilostar.core.work.workflow.workflow_engine import run_workflow_task

    # resume_only=True: per the original note, a failed hydrate at task entry
    # fails fast and never falls through to an empty "fresh run". The
    # workflow_data argument is unused on the resume path, so an empty dict is
    # passed as a placeholder. The task handle is not awaited.
    placeholder_workflow_data = {}
    run_workflow_task.remote(placeholder_workflow_data, trace_id, resume_only=True)
    return {"trace_id": trace_id, "status": "resuming"}
@workflow_router.get("/{trace_id}/graph")
async def get_workflow_graph_mermaid(
    trace_id: str,
    token_data: TokenData = Depends(Accessor.get_current_user),
):
    """Return the mermaid source of the workflow engine graph for a trace.

    Per the original note, the topology is the same for every trace. When the
    trace has a persisted ``workflow_graph_state`` record, the nodes recorded
    as successful in its ``history`` are passed to ``mermaid_code`` as
    ``highlighted_nodes`` so the generated source highlights visited nodes.

    Raises:
        HTTPException: 404 when the workflow does not exist, 403 when it is
            owned by a different user, 500 when mermaid generation fails.

    Returns:
        ``{"trace_id": ..., "mermaid": <source>, "visited": [<node names>]}``.
    """
    from kilostar.core.work.workflow.workflow_engine import workflow_graph

    postgres_database = ray_actor_hook("postgres_database").postgres_database
    wf = await postgres_database.get_workflow.remote(trace_id)
    if not wf:
        raise HTTPException(status_code=404, detail="Workflow not found")
    if getattr(wf, "user_id", None) != token_data.user_id:
        raise HTTPException(status_code=403, detail="Forbidden")

    record = await postgres_database.get_workflow_graph_state.remote(trace_id)
    history = getattr(record, "history", None) if record is not None else None
    visited = _successful_node_names(history)

    try:
        # Only pass highlighted_nodes when there is something to highlight.
        if visited:
            mermaid = workflow_graph.mermaid_code(highlighted_nodes=visited)
        else:
            mermaid = workflow_graph.mermaid_code()
    except Exception as e:
        # Chain the cause so the original traceback is kept in server logs.
        raise HTTPException(status_code=500, detail=f"mermaid 生成失败: {e}") from e
    return {"trace_id": trace_id, "mermaid": mermaid, "visited": visited}


def _successful_node_names(history) -> list[str]:
    """Return the unique node names with a successful snapshot, in first-seen order.

    Only dict entries with ``kind == "node"`` and ``status == "success"`` are
    considered (other statuses such as "created" / "running" are treated as
    noise by the original note). An entry's ``id`` looks like
    ``"ClassName:hash"``; the part before the first ``":"`` is the node name.
    Entries whose ``id`` is missing, empty or not a string are skipped.
    """
    names: list[str] = []
    seen: set[str] = set()
    for entry in history or []:
        if not isinstance(entry, dict):
            continue
        if entry.get("kind") != "node" or entry.get("status") != "success":
            continue
        snapshot_id = entry.get("id")
        if not isinstance(snapshot_id, str):
            continue
        cls_name = snapshot_id.partition(":")[0]
        if cls_name and cls_name not in seen:
            seen.add(cls_name)
            names.append(cls_name)
    return names