Files
KiloStar/kilostar/api/workflow.py
T
zhaoxi 99520c69d7 feat(system):优化后端
1.新增后端测试
2.增加了后端的加密
3.增加了i18n(国际化)
2026-05-31 15:39:34 +00:00

208 lines
7.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Copyright 2026 zhaoxi826
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from kilostar.utils.ray_hook import ray_actor_hook
from fastapi import APIRouter, Request, HTTPException, Depends
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from ulid import ULID
import asyncio
from kilostar.utils.access import Accessor, TokenData
from kilostar.utils.check_user.role_check import RoleChecker
from kilostar.core.postgres_database.model import UserAuthority
# Router that collects every workflow endpoint in this module under the
# "/api/v1/workflow" prefix and tags them "workflow".
workflow_router = APIRouter(prefix="/api/v1/workflow", tags=["workflow"])
class CreateWorkflowRequest(BaseModel):
    """Request body accepted by the workflow-creation endpoint."""

    # Title passed to the database actor when the workflow row is created.
    title: str
    # Command passed to the database actor and to the workflow-design step.
    command: str
@workflow_router.post("")
async def create_workflow(
    request: CreateWorkflowRequest,
    token_data: TokenData = Depends(Accessor.get_current_user),
):
    """Create a workflow record, open its trace, and kick off workflow design.

    Args:
        request: Title and command of the workflow to create.
        token_data: Data of the authenticated caller; its ``user_id`` is sent
            to the database actor together with the title and command.

    Returns:
        A dict with the newly generated ``trace_id`` and the status
        ``"creating"``.
    """
    db_actor = ray_actor_hook("postgres_database").postgres_database

    # A fresh ULID string identifies the workflow in every later call.
    new_trace_id = str(ULID())
    workflow_fields = {
        "trace_id": new_trace_id,
        "user_id": token_data.user_id,
        "title": request.title,
        "command": request.command,
    }
    await db_actor.create_workflow.remote(**workflow_fields)

    trace_manager = ray_actor_hook("global_workflow_manager").global_workflow_manager
    await trace_manager.create_trace.remote(new_trace_id)

    # The result of this remote call is not awaited, so the response is
    # returned without waiting for the design step.
    designer = ray_actor_hook("consciousness_node").consciousness_node
    designer.start_workflow_design.remote(new_trace_id, request.command)

    return dict(trace_id=new_trace_id, status="creating")
@workflow_router.get("/list")
async def get_workflow_list(
    token_data: TokenData = Depends(RoleChecker(allowed_roles=UserAuthority.USER)),
):
    """Return the workflows the database actor lists for the calling user.

    Args:
        token_data: Data of the caller as produced by the ``RoleChecker``
            dependency; only its ``user_id`` is used.

    Returns:
        A dict whose ``"workflows"`` key holds the actor's result unchanged.
    """
    db_actor = ray_actor_hook("postgres_database").postgres_database
    listing_ref = db_actor.list_workflows.remote(user_id=token_data.user_id)
    return {"workflows": await listing_ref}
@workflow_router.get("/sse/{trace_id}")
async def get_workflow_sse(trace_id: str, request: Request):
    """Stream the pending messages of a trace as Server-Sent Events.

    The generator polls the global workflow manager for the next pending
    message of ``trace_id``. When there is one it is emitted as an SSE event;
    otherwise the loop sleeps for half a second. Streaming stops once the
    client has disconnected.

    NOTE(review): this route declares no authentication dependency, unlike
    the create/list/detail routes in this file - confirm that is intended.

    Args:
        trace_id: Identifier of the trace whose messages are streamed.
        request: Incoming request, used only to detect client disconnects.

    Returns:
        A ``StreamingResponse`` with media type ``text/event-stream``.
    """
    global_workflow_manager = ray_actor_hook(
        "global_workflow_manager"
    ).global_workflow_manager

    def format_event(message) -> str:
        # In the SSE wire format each line of the payload needs its own
        # "data: " prefix; a raw line break inside a single "data:" field
        # would make the client drop everything after the first line.
        # Only CR, LF and CRLF are line terminators in SSE, so split on
        # exactly those rather than using str.splitlines().
        text = f"{message}"
        payload_lines = text.replace("\r\n", "\n").replace("\r", "\n").split("\n")
        return "".join(f"data: {line}\n" for line in payload_lines) + "\n"

    async def event_generator():
        try:
            while True:
                if await request.is_disconnected():
                    break
                message = await global_workflow_manager.get_pending.remote(trace_id)
                if message:
                    yield format_event(message)
                else:
                    # Nothing pending: back off briefly before polling again.
                    await asyncio.sleep(0.5)
        except asyncio.CancelledError:
            # Cancellation simply ends the stream; the generator finishes
            # without propagating the error (behavior kept from the original).
            return

    return StreamingResponse(event_generator(), media_type="text/event-stream")
@workflow_router.post("/reply/{trace_id}")
async def post_workflow_reply(trace_id: str, request: Request):
    """Forward a client reply for a trace to the global workflow manager.

    The request body must be a JSON object. Its ``"message"`` entry (empty
    string when absent) is handed to the manager's ``put_received`` method.

    NOTE(review): this route declares no authentication dependency - confirm
    that is intended.

    Args:
        trace_id: Identifier of the trace the reply belongs to.
        request: Incoming request whose JSON body carries the reply.

    Returns:
        ``{"status": "ok"}`` once the reply has been handed over.

    Raises:
        HTTPException: 400 when the body is not valid JSON or is not a JSON
            object.
    """
    try:
        data = await request.json()
    except ValueError as exc:
        # json.JSONDecodeError and UnicodeDecodeError both derive from
        # ValueError; report a client error instead of an unhandled 500.
        raise HTTPException(
            status_code=400, detail="Request body must be valid JSON"
        ) from exc
    if not isinstance(data, dict):
        # A JSON list/string/number has no ".get" and would otherwise crash.
        raise HTTPException(
            status_code=400, detail="Request body must be a JSON object"
        )

    reply_msg = data.get("message", "")
    global_workflow_manager = ray_actor_hook(
        "global_workflow_manager"
    ).global_workflow_manager
    await global_workflow_manager.put_received.remote(trace_id, reply_msg)
    return {"status": "ok"}
@workflow_router.get("/{trace_id}")
async def get_workflow_detail(
    trace_id: str, token_data: TokenData = Depends(Accessor.get_current_user)
):
    """Return the stored details and context of one workflow.

    Args:
        trace_id: Identifier of the workflow to look up.
        token_data: Data of the authenticated caller; its ``user_id`` must
            match the workflow's ``user_id``.

    Returns:
        A dict with ``trace_id``, ``title``, ``status``, ``command``,
        ``steps`` (the context's ``work_link`` or ``[]``) and
        ``context_blackboard`` (the context's ``blackboard`` or ``{}``).

    Raises:
        HTTPException: 404 when the workflow does not exist, 403 when it
            belongs to a different user.
    """
    postgres_database = ray_actor_hook("postgres_database").postgres_database
    wf = await postgres_database.get_workflow.remote(trace_id)
    if not wf:
        raise HTTPException(status_code=404, detail="Workflow not found")
    # Same ownership rule as the resume and graph routes in this file:
    # a workflow is only visible to the user who owns it.
    if getattr(wf, "user_id", None) != token_data.user_id:
        raise HTTPException(status_code=403, detail="Forbidden")

    context = await postgres_database.get_workflow_context.remote(trace_id)
    # The context may be missing, or may lack a ``work_link`` attribute.
    steps = context.work_link if context and hasattr(context, "work_link") else []
    return {
        "trace_id": trace_id,
        "title": wf.title,
        "status": wf.status,
        "command": wf.command,
        "steps": steps,
        "context_blackboard": context.blackboard if context else {},
    }
@workflow_router.post("/{trace_id}/resume")
async def resume_workflow(
    trace_id: str,
    token_data: TokenData = Depends(Accessor.get_current_user),
):
    """Resume an interrupted/suspended workflow from ``workflow_graph_state``.

    A new Ray task is fired. According to the original author's note, the
    ``hydrate`` check at the task entry point then takes the resume path and
    runs the remaining nodes.

    Args:
        trace_id: Identifier of the workflow to resume.
        token_data: Data of the authenticated caller; its ``user_id`` must
            match the workflow's ``user_id``.

    Returns:
        A dict with the ``trace_id`` and the status ``"resuming"``.

    Raises:
        HTTPException: 404 when the workflow does not exist, 403 when it
            belongs to a different user, 409 when no persisted graph state
            is available.
    """
    db_actor = ray_actor_hook("postgres_database").postgres_database

    workflow = await db_actor.get_workflow.remote(trace_id)
    if not workflow:
        raise HTTPException(status_code=404, detail="Workflow not found")

    owner_id = getattr(workflow, "user_id", None)
    if owner_id != token_data.user_id:
        raise HTTPException(status_code=403, detail="Forbidden")

    graph_state = await db_actor.get_workflow_graph_state.remote(trace_id)
    if graph_state is None:
        raise HTTPException(
            status_code=409, detail="该工作流没有可恢复的图持久化记录"
        )

    trace_manager = ray_actor_hook("global_workflow_manager").global_workflow_manager
    await trace_manager.create_trace.remote(trace_id)

    from kilostar.core.work.workflow.workflow_engine import run_workflow_task

    # Per the original author's note, workflow_data is not used on the resume
    # path (hydrate takes the resume branch), so an empty dict is passed as a
    # placeholder. The task result is not awaited.
    placeholder_workflow_data = {}
    run_workflow_task.remote(placeholder_workflow_data, trace_id)

    return dict(trace_id=trace_id, status="resuming")
def _successful_node_names(history) -> list[str]:
    """Return the class names of successfully finished nodes, first-seen order.

    Only dict entries with ``kind == "node"`` and ``status == "success"`` are
    considered. Per the original author's note, each snapshot id looks like
    ``"ClassName:hash"``; the part before the first ``":"`` is kept. Entries
    whose ``id`` is missing, empty or not a string are skipped.
    """
    # A dict is used as an insertion-ordered set to de-duplicate names.
    names: dict[str, None] = {}
    for entry in history:
        if not isinstance(entry, dict):
            continue
        if entry.get("kind") != "node" or entry.get("status") != "success":
            continue
        snapshot_id = entry.get("id")
        if not isinstance(snapshot_id, str):
            # A non-string id cannot be split; ignore it instead of crashing.
            continue
        class_name = snapshot_id.split(":", 1)[0]
        if class_name:
            names.setdefault(class_name, None)
    return list(names)


@workflow_router.get("/{trace_id}/graph")
async def get_workflow_graph_mermaid(
    trace_id: str,
    token_data: TokenData = Depends(Accessor.get_current_user),
):
    """Return the mermaid source of the workflow engine graph (node topology).

    Per the original author's note the topology is the same for every trace.
    When the trace has a persisted ``workflow_graph_state`` record, the nodes
    that already ran successfully are read from its ``history`` and passed to
    mermaid as ``highlighted_nodes``, so the returned source highlights the
    visited nodes.

    Args:
        trace_id: Identifier of the workflow whose graph is requested.
        token_data: Data of the authenticated caller; its ``user_id`` must
            match the workflow's ``user_id``.

    Returns:
        A dict with ``trace_id``, the ``mermaid`` source and the ``visited``
        node names.

    Raises:
        HTTPException: 404 when the workflow does not exist, 403 when it
            belongs to a different user, 500 when mermaid generation fails.
    """
    from kilostar.core.work.workflow.workflow_engine import workflow_graph

    postgres_database = ray_actor_hook("postgres_database").postgres_database
    wf = await postgres_database.get_workflow.remote(trace_id)
    if not wf:
        raise HTTPException(status_code=404, detail="Workflow not found")
    if getattr(wf, "user_id", None) != token_data.user_id:
        raise HTTPException(status_code=403, detail="Forbidden")

    visited: list[str] = []
    record = await postgres_database.get_workflow_graph_state.remote(trace_id)
    if record is not None:
        history = getattr(record, "history", None) or []
        # Only "success" entries count; the original note says this avoids
        # noise from "created" / "running" snapshots.
        visited = _successful_node_names(history)

    try:
        if visited:
            mermaid = workflow_graph.mermaid_code(highlighted_nodes=visited)
        else:
            mermaid = workflow_graph.mermaid_code()
    except Exception as e:
        # Chain the cause so the original failure stays visible in logs.
        raise HTTPException(status_code=500, detail=f"mermaid 生成失败: {e}") from e
    return {"trace_id": trace_id, "mermaid": mermaid, "visited": visited}