Files
KiloStar/kilostar/api/workflow.py
T

122 lines
4.1 KiB
Python

# Copyright 2026 zhaoxi826
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from kilostar.utils.ray_hook import ray_actor_hook
from fastapi import APIRouter, Request, HTTPException, Depends
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from ulid import ULID
import asyncio
from kilostar.utils.access import Accessor, TokenData
from kilostar.utils.check_user.role_check import RoleChecker
from kilostar.core.postgres_database.model import UserAuthority
# Router collecting the workflow endpoints; every route declared on it is
# served under the "/api/v1/workflow" prefix and tagged "workflow".
workflow_router = APIRouter(prefix="/api/v1/workflow", tags=["workflow"])
class CreateWorkflowRequest(BaseModel):
    """Request body accepted by the workflow creation endpoint."""

    # Title stored alongside the new workflow record.
    title: str
    # Command text persisted with the workflow and passed to the design step.
    command: str
@workflow_router.post("")
async def create_workflow(
    request: CreateWorkflowRequest,
    token_data: TokenData = Depends(Accessor.get_current_user),
):
    """Create a workflow record, register its trace and start its design.

    Args:
        request: Validated body carrying the workflow ``title`` and ``command``.
        token_data: Data of the authenticated caller; its ``user_id`` is stored
            with the workflow.

    Returns:
        A dict with the generated ``trace_id`` and the status ``"creating"``.
    """
    db_actor = ray_actor_hook("postgres_database").postgres_database
    new_trace_id = str(ULID())

    # Persist the workflow first so the trace always has a backing record.
    workflow_fields = {
        "trace_id": new_trace_id,
        "user_id": token_data.user_id,
        "title": request.title,
        "command": request.command,
    }
    await db_actor.create_workflow.remote(**workflow_fields)

    manager_actor = ray_actor_hook("global_workflow_manager").global_workflow_manager
    await manager_actor.create_trace.remote(new_trace_id)

    # The design call is dispatched without being awaited, so the response is
    # returned without waiting for its result.
    designer_actor = ray_actor_hook("consciousness_node").consciousness_node
    designer_actor.start_workflow_design.remote(new_trace_id, request.command)

    return {"trace_id": new_trace_id, "status": "creating"}
@workflow_router.get("/list")
async def get_workflow_list(
    token_data: TokenData = Depends(RoleChecker(allowed_roles=UserAuthority.USER)),
):
    """List the workflows that belong to the calling user.

    Args:
        token_data: Data of the caller as resolved by the ``RoleChecker``
            dependency; its ``user_id`` selects the workflows to list.

    Returns:
        A dict whose ``workflows`` entry holds the database actor's result.
    """
    db_actor = ray_actor_hook("postgres_database").postgres_database
    owner_id = token_data.user_id
    return {"workflows": await db_actor.list_workflows.remote(user_id=owner_id)}
@workflow_router.get("/sse/{trace_id}")
async def get_workflow_sse(trace_id: str, request: Request):
    """Stream pending messages of a workflow trace as Server-Sent Events.

    The workflow manager actor is polled for pending messages on ``trace_id``;
    each message is emitted as one SSE event until the client disconnects.

    Args:
        trace_id: Identifier of the workflow trace to stream.
        request: Incoming request, used to detect client disconnects.

    Returns:
        A ``StreamingResponse`` with media type ``text/event-stream``.
    """
    # NOTE(review): this route declares no authentication dependency, unlike
    # the create/list/detail routes in this file -- confirm that is intended.
    global_workflow_manager = ray_actor_hook(
        "global_workflow_manager"
    ).global_workflow_manager

    async def event_generator():
        try:
            while not await request.is_disconnected():
                message = await global_workflow_manager.get_pending.remote(trace_id)
                if not message:
                    # Nothing pending: back off briefly before polling again.
                    await asyncio.sleep(0.5)
                    continue
                # An SSE event may span several lines, but every line needs its
                # own "data:" prefix; a raw line break inside the payload would
                # otherwise truncate or split the event on the client side.
                payload = f"{message}".replace("\r\n", "\n").replace("\r", "\n")
                frame = "".join(f"data: {line}\n" for line in payload.split("\n"))
                # The trailing blank line terminates the event.
                yield frame + "\n"
        except asyncio.CancelledError:
            # Cancellation ends the stream quietly, as in the original code.
            pass

    return StreamingResponse(event_generator(), media_type="text/event-stream")
@workflow_router.post("/reply/{trace_id}")
async def post_workflow_reply(trace_id: str, request: Request):
    """Forward a client reply for a workflow trace to the workflow manager.

    Args:
        trace_id: Identifier of the workflow trace the reply belongs to.
        request: Incoming request whose JSON object body may carry a
            ``message`` field; a missing field is forwarded as ``""``.

    Returns:
        ``{"status": "ok"}`` once the reply has been handed to the manager.

    Raises:
        HTTPException: 400 when the body is not valid JSON or is not a JSON
            object.
    """
    # NOTE(review): this route declares no authentication dependency, unlike
    # the create/list/detail routes in this file -- confirm that is intended.
    try:
        data = await request.json()
    except ValueError as exc:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError
        # subclasses; report a client error instead of an unhandled 500.
        raise HTTPException(
            status_code=400, detail="Request body must be valid JSON"
        ) from exc
    if not isinstance(data, dict):
        # Valid JSON that is not an object (list, string, number, null) has no
        # "message" field to read.
        raise HTTPException(
            status_code=400, detail="Request body must be a JSON object"
        )

    reply_msg = data.get("message", "")
    global_workflow_manager = ray_actor_hook(
        "global_workflow_manager"
    ).global_workflow_manager
    await global_workflow_manager.put_received.remote(trace_id, reply_msg)
    return {"status": "ok"}
@workflow_router.get("/{trace_id}")
async def get_workflow_detail(
    trace_id: str, token_data: TokenData = Depends(Accessor.get_current_user)
):
    """Return the stored details and context of a single workflow.

    Args:
        trace_id: Identifier of the workflow to look up.
        token_data: Data of the authenticated caller; only used to require
            authentication, it is not read in the body.

    Returns:
        A dict with the workflow's title, status and command, its steps and
        its context blackboard.

    Raises:
        HTTPException: 404 when no workflow is found for ``trace_id``.
    """
    db_actor = ray_actor_hook("postgres_database").postgres_database

    workflow = await db_actor.get_workflow.remote(trace_id)
    if not workflow:
        raise HTTPException(status_code=404, detail="Workflow not found")

    workflow_context = await db_actor.get_workflow_context.remote(trace_id)

    # Steps fall back to an empty list when there is no context or the context
    # object carries no ``work_link`` attribute.
    if workflow_context and hasattr(workflow_context, "work_link"):
        steps = workflow_context.work_link
    else:
        steps = []

    detail = {
        "trace_id": trace_id,
        "title": workflow.title,
        "status": workflow.status,
        "command": workflow.command,
        "steps": steps,
    }
    # The blackboard is read directly whenever a context exists.
    detail["context_blackboard"] = (
        workflow_context.blackboard if workflow_context else {}
    )
    return detail