-
Workflows
-
Manage and monitor your automated processes.
+
+
+
Workflows
+
Manage and monitor your automated processes.
+
+
{workflows.length === 0 ? (
@@ -103,7 +111,13 @@ export function WorkflowListView({ onSelectWorkflow }: WorkflowListViewProps) {
- {wf.message && (
+ {(wf as any).command && (
+
+ Command:
+ "{(wf as any).command}"
+
+ )}
+ {wf.message && !(wf as any).command && (
Command:
"{wf.message}"
@@ -111,12 +125,15 @@ export function WorkflowListView({ onSelectWorkflow }: WorkflowListViewProps) {
)}
-
- {wf.event_id}
+
+ {wf.event_id || (wf as any).trace_id}
{wf.create_time && (
{new Date(wf.create_time).toLocaleDateString()}
)}
+ {(wf as any).created_at && !wf.create_time && (
+ {new Date((wf as any).created_at).toLocaleDateString()}
+ )}
diff --git a/kilostar/api/__init__.py b/kilostar/api/__init__.py
index 759142d..50a4320 100644
--- a/kilostar/api/__init__.py
+++ b/kilostar/api/__init__.py
@@ -27,6 +27,7 @@ from .platform.frontend import client_router
from .provider import provider_router
from .resource import resource_router
from .workflow import workflow_router
+from .chat import chat_router
from kilostar.utils.error import (
DemandError,
ModelNotExistError,
@@ -48,6 +49,7 @@ app.include_router(resource_router) # 资源路径
app.include_router(cluster_router) # 集群信息路径
app.include_router(agent_router) # agent路径
app.include_router(workflow_router) # workflow路径
+app.include_router(chat_router) # chat路径
@app.exception_handler(UserNotExistError)
diff --git a/kilostar/api/chat.py b/kilostar/api/chat.py
new file mode 100644
index 0000000..18675f5
--- /dev/null
+++ b/kilostar/api/chat.py
@@ -0,0 +1,109 @@
+# Copyright 2026 zhaoxi826
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from fastapi import APIRouter, Depends
+from pydantic import BaseModel
+from kilostar.utils.ray_hook import ray_actor_hook
+from kilostar.utils.access import Accessor, TokenData
+
+chat_router = APIRouter(prefix="/api/v1/chat", tags=["chat"])
+
+
+class CreateChatRequest(BaseModel):
+ title: str = "新对话"
+ initial_message: str
+
+
+class SendMessageRequest(BaseModel):
+ message: str
+
+
+@chat_router.post("")
+async def create_chat_session(
+ request: CreateChatRequest,
+ token_data: TokenData = Depends(Accessor.get_current_user),
+):
+ postgres_database = ray_actor_hook("postgres_database").postgres_database
+ chat = await postgres_database.create_chat_session.remote(
+ user_id=token_data.user_id, title=request.title
+ )
+
+ # 存入用户消息
+ await postgres_database.add_chat_message.remote(
+ chat_id=chat.chat_id, message=request.initial_message, message_owner="user"
+ )
+
+ # 调用监管节点处理简单任务/交流
+ regulatory_node = ray_actor_hook("regulatory_node").regulatory_node
+ # 在此发起任务并等待或异步返回结果
+ response_msg = await regulatory_node.handle_chat_message.remote(
+ user_id=token_data.user_id,
+ chat_id=chat.chat_id,
+ message=request.initial_message,
+ )
+
+ # 存入回复消息
+ if response_msg:
+ await postgres_database.add_chat_message.remote(
+ chat_id=chat.chat_id, message=response_msg, message_owner="regulatory_node"
+ )
+
+ return {"chat_id": chat.chat_id, "reply": response_msg}
+
+
+@chat_router.get("")
+async def list_chat_sessions(
+ token_data: TokenData = Depends(Accessor.get_current_user),
+):
+ postgres_database = ray_actor_hook("postgres_database").postgres_database
+ sessions = await postgres_database.list_chat_sessions.remote(
+ user_id=token_data.user_id
+ )
+ return {"sessions": sessions}
+
+
+@chat_router.get("/{chat_id}")
+async def get_chat_history(
+ chat_id: str, token_data: TokenData = Depends(Accessor.get_current_user)
+):
+ postgres_database = ray_actor_hook("postgres_database").postgres_database
+ messages = await postgres_database.list_chat_messages.remote(chat_id=chat_id)
+ return {"messages": messages}
+
+
+@chat_router.post("/{chat_id}/reply")
+async def send_chat_message(
+ chat_id: str,
+ request: SendMessageRequest,
+ token_data: TokenData = Depends(Accessor.get_current_user),
+):
+ postgres_database = ray_actor_hook("postgres_database").postgres_database
+ # 存用户消息
+ await postgres_database.add_chat_message.remote(
+ chat_id=chat_id, message=request.message, message_owner="user"
+ )
+
+ # 调用监管节点
+ regulatory_node = ray_actor_hook("regulatory_node").regulatory_node
+ response_msg = await regulatory_node.handle_chat_message.remote(
+ user_id=token_data.user_id, chat_id=chat_id, message=request.message
+ )
+
+ # 存回复
+ if response_msg:
+ await postgres_database.add_chat_message.remote(
+ chat_id=chat_id, message=response_msg, message_owner="regulatory_node"
+ )
+
+ return {"reply": response_msg}
diff --git a/kilostar/api/platform/event.py b/kilostar/api/platform/event.py
index e50bea7..8e2f7c5 100644
--- a/kilostar/api/platform/event.py
+++ b/kilostar/api/platform/event.py
@@ -16,8 +16,8 @@ import datetime
from pydantic import BaseModel, Field, ConfigDict
from ulid import ULID
from typing import Any, Dict
-from kilostar.core.workflow_running_engine.workflow import kilostarWorkflow
import asyncio
+from kilostar.core.work.workflow.workflow import KiloStarWorkflow
class kilostarEvent(BaseModel):
@@ -43,7 +43,7 @@ class kilostarEvent(BaseModel):
context: Dict[str, Any] = Field(
default_factory=dict, description="事件上下文内容,可包含工作流模板等信息"
)
- workflow: kilostarWorkflow | None = Field(default=None, description="工作流")
+ workflow: KiloStarWorkflow | None = Field(default=None, description="工作流")
pending_queue: asyncio.Queue[str] | None = Field(
default=None, description="待处理队列"
)
diff --git a/kilostar/api/workflow.py b/kilostar/api/workflow.py
index 8cc2f1f..664b56a 100644
--- a/kilostar/api/workflow.py
+++ b/kilostar/api/workflow.py
@@ -12,101 +12,95 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-
from kilostar.utils.ray_hook import ray_actor_hook
-from fastapi import APIRouter, Request, HTTPException
+from fastapi import APIRouter, Request, HTTPException, Depends
from fastapi.responses import StreamingResponse
+from pydantic import BaseModel
+from ulid import ULID
import asyncio
+from kilostar.utils.access import Accessor, TokenData
workflow_router = APIRouter(prefix="/api/v1/workflow", tags=["workflow"])
+class CreateWorkflowRequest(BaseModel):
+ title: str
+ command: str
+
+
+@workflow_router.post("")
+async def create_workflow(
+ request: CreateWorkflowRequest,
+ token_data: TokenData = Depends(Accessor.get_current_user),
+):
+ postgres_database = ray_actor_hook("postgres_database").postgres_database
+ trace_id = str(ULID())
+ await postgres_database.create_workflow.remote(
+ trace_id=trace_id,
+ user_id=token_data.user_id,
+ title=request.title,
+ command=request.command,
+ )
+
+ # 将需求发送给意识节点去处理构建
+ consciousness_node = ray_actor_hook("consciousness_node").consciousness_node
+ # 可以异步通知意识节点开始与用户在特定 Trace ID 下对话
+ consciousness_node.start_workflow_design.remote(trace_id, request.command)
+
+ return {"trace_id": trace_id, "status": "creating"}
+
+
@workflow_router.get("/list")
-async def get_workflow_list():
- """处理针对 get workflow list 相关的 HTTP API 请求。
- 该接口负责解析前端传入的载荷数据,调用底层核心业务逻辑进行处理,并组装标准化的 JSON 响应。
- Returns: : 序列化后的标准网络响应模型(如包含业务状态码、成功标志及对应的数据载荷 Data)。"""
- global_workflow_manager = ray_actor_hook(
- "global_workflow_manager"
- ).global_workflow_manager
- events = await global_workflow_manager.list_events.remote()
- return events
+async def get_workflow_list(token_data: TokenData = Depends(Accessor.get_current_user)):
+ postgres_database = ray_actor_hook("postgres_database").postgres_database
+ workflows = await postgres_database.list_workflows.remote(
+ user_id=token_data.user_id
+ )
+ return {"workflows": workflows}
@workflow_router.get("/{trace_id}")
-async def get_workflow_detail(trace_id: str):
- """处理针对 get workflow detail 相关的 HTTP API 请求。
- 该接口负责解析前端传入的载荷数据,调用底层核心业务逻辑进行处理,并组装标准化的 JSON 响应。
- Args: trace_id (str): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 trace 实例。
- Returns: : 序列化后的标准网络响应模型(如包含业务状态码、成功标志及对应的数据载荷 Data)。"""
- global_workflow_manager = ray_actor_hook(
- "global_workflow_manager"
- ).global_workflow_manager
- event = await global_workflow_manager.get_event.remote(trace_id)
- if not event:
+async def get_workflow_detail(
+ trace_id: str, token_data: TokenData = Depends(Accessor.get_current_user)
+):
+ postgres_database = ray_actor_hook("postgres_database").postgres_database
+ wf = await postgres_database.get_workflow.remote(trace_id)
+ if not wf:
raise HTTPException(status_code=404, detail="Workflow not found")
- workflow = event.workflow
- if not workflow:
- return {
- "event_id": trace_id,
- "workflow_title": None,
- "status": "waiting",
- "user_name": event.user_name,
- "message": event.message,
- "create_time": event.create_time,
- "steps": [],
- }
+ context = await postgres_database.get_workflow_context.remote(trace_id)
+
+ steps = context.work_link if context and hasattr(context, "work_link") else []
- steps = []
- for step in workflow.work_link:
- steps.append(
- {
- "step": step.step,
- "name": step.name,
- "node": step.node,
- "action": step.action,
- "desc": step.desc,
- "status": step.status,
- "agent_id": step.agent_id,
- }
- )
return {
- "event_id": trace_id,
- "workflow_title": workflow.title,
- "status": workflow.status.status,
- "command": workflow.command,
- "current_step": workflow.status.step,
- "user_name": event.user_name,
- "message": event.message,
- "create_time": event.create_time,
+ "trace_id": trace_id,
+ "title": wf.title,
+ "status": wf.status,
+ "command": wf.command,
"steps": steps,
+ "context_blackboard": context.blackboard if context else {},
}
@workflow_router.get("/sse/{trace_id}")
async def get_workflow_sse(trace_id: str, request: Request):
- """处理针对 get workflow sse 相关的 HTTP API 请求。
- 该接口负责解析前端传入的载荷数据,调用底层核心业务逻辑进行处理,并组装标准化的 JSON 响应。
- Args: trace_id (str): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 trace 实例。 request (Request): FastAPI 框架注入的原生 HTTP 请求对象,包含了完整的 Header 标头、查询参数和正文流。
- Returns: : 序列化后的标准网络响应模型(如包含业务状态码、成功标志及对应的数据载荷 Data)。"""
+ """
+ 用于与意识节点交互,获取工作流状态或设计阶段的问答消息
+ """
global_workflow_manager = ray_actor_hook(
"global_workflow_manager"
).global_workflow_manager
async def event_generator():
- """执行与 event generator 相关的核心业务流转操作。
- 该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。"""
try:
while True:
if await request.is_disconnected():
break
-
- # You might also want to send the workflow state periodically or when updated
- # Here we just wait for pending messages and send them
message = await global_workflow_manager.get_pending.remote(trace_id)
- # Ensure the message is formatted as SSE
- yield f"data: {message}\n\n"
+ if message:
+ yield f"data: {message}\n\n"
+ else:
+ await asyncio.sleep(0.5)
except asyncio.CancelledError:
pass
@@ -115,10 +109,9 @@ async def get_workflow_sse(trace_id: str, request: Request):
@workflow_router.post("/reply/{trace_id}")
async def post_workflow_reply(trace_id: str, request: Request):
- """处理针对 post workflow reply 相关的 HTTP API 请求。
- 该接口负责解析前端传入的载荷数据,调用底层核心业务逻辑进行处理,并组装标准化的 JSON 响应。
- Args: trace_id (str): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 trace 实例。 request (Request): FastAPI 框架注入的原生 HTTP 请求对象,包含了完整的 Header 标头、查询参数和正文流。
- Returns: : 序列化后的标准网络响应模型(如包含业务状态码、成功标志及对应的数据载荷 Data)。"""
+ """
+ 用于用户回复意识节点的提问(设计阶段或运行中的中断确认)
+ """
data = await request.json()
reply_msg = data.get("message", "")
global_workflow_manager = ray_actor_hook(
diff --git a/kilostar/core/global_state_machine/global_state_machine.py b/kilostar/core/global_state_machine/global_state_machine.py
index c7bca35..b04a0b2 100644
--- a/kilostar/core/global_state_machine/global_state_machine.py
+++ b/kilostar/core/global_state_machine/global_state_machine.py
@@ -17,7 +17,9 @@ from kilostar.core.global_state_machine.provider_manager import ProviderManager
from kilostar.core.global_state_machine.tool_manager import GlobalToolManager
from kilostar.core.postgres_database import PostgresDatabase
from kilostar.core.global_state_machine.skill_manager import GlobalSkillManager
-from kilostar.core.global_state_machine.individual_manager import GlobalIndividualManager
+from kilostar.core.global_state_machine.individual_manager import (
+ GlobalIndividualManager,
+)
@ray.remote
diff --git a/kilostar/core/global_workflow_manager/global_workflow_manager.py b/kilostar/core/global_workflow_manager/global_workflow_manager.py
index d5fc158..3775370 100644
--- a/kilostar/core/global_workflow_manager/global_workflow_manager.py
+++ b/kilostar/core/global_workflow_manager/global_workflow_manager.py
@@ -2,7 +2,7 @@ import ray
import asyncio
from typing import Dict
from kilostar.api.platform.event import kilostarEvent
-from kilostar.core.workflow_running_engine.workflow import kilostarWorkflow
+from kilostar.core.work.workflow.workflow import KiloStarWorkflow
from kilostar.utils.ray_hook import ray_actor_hook
from kilostar.utils.logger import get_logger
@@ -145,12 +145,12 @@ class GlobalWorkflowManager:
self.event_dict[trace_id].attachment = attachment
await self._upsert_event_to_db(self.event_dict[trace_id])
- async def update_workflow(self, trace_id: str, workflow: kilostarWorkflow) -> None:
+ async def update_workflow(self, trace_id: str, workflow: KiloStarWorkflow) -> None:
if trace_id in self.event_dict:
self.event_dict[trace_id].workflow = workflow
await self._upsert_event_to_db(self.event_dict[trace_id])
- async def get_workflow(self, trace_id: str) -> kilostarWorkflow | None:
+ async def get_workflow(self, trace_id: str) -> KiloStarWorkflow | None:
event = await self.get_event(trace_id)
return event.workflow if event else None
diff --git a/kilostar/core/individual/consciousness_node/consciousness_node.py b/kilostar/core/individual/consciousness_node/consciousness_node.py
index 8997664..46b9ca8 100644
--- a/kilostar/core/individual/consciousness_node/consciousness_node.py
+++ b/kilostar/core/individual/consciousness_node/consciousness_node.py
@@ -28,13 +28,11 @@ from pydantic_ai import Agent, RunContext
from kilostar.core.global_state_machine.global_state_machine import GlobalStateMachine
from kilostar.core.global_state_machine.model_provider.base_provider import Provider
from kilostar.adapter.model_adapter.agent_factory import AgentFactory
+from kilostar.utils.ray_hook import ray_actor_hook
@ray.remote
class ConsciousnessNode:
- """ConsciousnessNode 核心组件类。
- 这是一个系统执行节点类,作为多智能体架构中的独立处理单元。它能够接收工作流上下文,根据内置的大模型策略进行意图理解和自主决策,从而驱动特定阶段的任务闭环。"""
-
def __init__(self) -> None:
from kilostar.utils.logger import get_logger
@@ -48,19 +46,6 @@ class ConsciousnessNode:
model_id: str,
tools_list: list[str] = None,
) -> None:
- """
- create_agent方法,将agent对象装配到ConsciousnessNode的属性内
- 该方法通过provider_title从global_state_machine中获取provider对象,然后从provider对象中取出供应商形象,装配为pydantic_ai的
- Agent实例,
- 并挂载到self.agent属性
- Args:
- global_state_machine: 全局状态机
- provider_title: 供应商名
- model_id: 模型id
-
- Returns:
- 无返回
- """
system_prompt: str = (
"你叫kilostar,是一个多智能体AI助手系统中的【意识节点 (Consciousness Node)】。\n"
"你是系统的'高级规划师'和'架构师',负责处理监控节点分配过来的复杂任务。\n"
@@ -91,10 +76,6 @@ class ConsciousnessNode:
@self.agent.system_prompt
async def dynamic_prompt(ctx: RunContext[ConsciousnessNodeDeps]):
- """执行与 dynamic prompt 相关的核心业务流转操作。
- 该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
- Args: ctx (RunContext[ConsciousnessNodeDeps]): 参与 dynamic prompt 逻辑运算或数据构建的上下文依赖对象。
- Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
prompt = system_prompt + "\n\n"
prompt += (
f"=== 当前任务上下文 ===\n"
@@ -109,14 +90,66 @@ class ConsciousnessNode:
return prompt
+ async def start_workflow_design(self, trace_id: str, command: str):
+ """
+ 开始进行工作流设计的交互过程(与用户通过 SSE 进行确认或直接生成)
+ 目前简化为:直接根据 command 拆解并构建工作流,然后提交执行。
+ """
+ self.logger.info(
+ f"ConsciousnessNode: 开始为 trace_id {trace_id} 设计工作流。原始命令:{command}"
+ )
+ # 获取可用技能 (示例)
+ postgres_database = ray_actor_hook("postgres_database").postgres_database
+ skills_entities = await postgres_database.get_all_worker_individual.remote()
+ available_skills = []
+ if skills_entities:
+ for skill in skills_entities:
+ available_skills.append(
+ {
+ "agent_id": skill.agent_id,
+ "name": skill.agent_name,
+ "description": skill.description,
+ }
+ )
+
+ payload = ForWorkflowEngineInput(
+ original_command=command, available_skills=available_skills
+ )
+
+ # 通知 SSE 正在生成图结构
+ global_workflow_manager = ray_actor_hook(
+ "global_workflow_manager"
+ ).global_workflow_manager
+ await global_workflow_manager.put_received.remote(
+ trace_id, "正在为您构建并规划工作流任务节点,请稍候..."
+ )
+
+ # 实际构建过程
+ result = await self.working(payload)
+
+ if result and isinstance(result, ForWorkflowEngine):
+ workflow = result.workflow
+ workflow.trace_id = trace_id
+
+ await global_workflow_manager.put_received.remote(
+ trace_id, "工作流构建完成,即将开始执行!"
+ )
+
+ # 将生成的完整工作流提交执行
+ workflow_engine = ray_actor_hook(
+ "workflow_running_engine"
+ ).workflow_running_engine
+ await workflow_engine.execute_workflow.remote(workflow)
+ else:
+ await global_workflow_manager.put_received.remote(
+ trace_id, "很抱歉,工作流生成失败。"
+ )
+ await postgres_database.update_workflow_status.remote(trace_id, "failed")
+
async def working(
self,
payload: Union[ForWorkflowEngineInput, ForWorkflowInput, ForregulatoryInput],
) -> Union[ForWorkflowEngine, ForWorkflow, ForregulatoryNode, None]:
- """执行与 working 相关的核心业务流转操作。
- 该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
- Args: payload (Union[ForWorkflowEngineInput, ForWorkflowInput, ForregulatoryInput]): 从客户端传递过来或由上游组件生成的核心业务数据体,通常需要进一步的清洗和结构化解析。
- Returns: (Union[ForWorkflowEngine, ForWorkflow, ForregulatoryNode, None]): 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
try:
result = await self._run(payload)
if isinstance(result, (ForWorkflowEngine, ForWorkflow, ForregulatoryNode)):
@@ -132,51 +165,20 @@ class ConsciousnessNode:
@overload
async def _run(self, payload: ForWorkflowEngineInput) -> ForWorkflowEngine:
- """
- _run方法
- 该分支应当在regulatory_node简单处理用户命令后,工作流创建前调用!
- Args:
- payload: 应当包含原始命令和可用技能等信息
-
- Returns:
- ForWorkflowEngine对象,将被放到全局状态机后丢入WorkflowEngine的异步队列
- """
pass
@overload
async def _run(self, payload: ForWorkflow) -> ForWorkflow:
- """
- _run方法
- 该分支应当在workflow运行时,由WorkflowEngine进行调用!
- Args:
- payload: 应当包含workflow中的WorkStep对象
-
- Returns:
- ForWorkflow对象,作为ConsciousnessNode执行Workflow中的WorkStep的结果
- """
pass
@overload
async def _run(self, payload: ForregulatoryInput) -> ForregulatoryNode:
- """
- _run方法
- 该分支应当在workflow运行完全结束后,由WorkflowEngine进行调用!
- Args:
- payload: 应当包含整个Workflow的情况
-
- Returns:
- Forregulatory对象,作为ConsciousnessNode对于全工作流的技术性总结,返回给regulatoryNode
- """
pass
async def _run(
self,
payload: Union[ForregulatoryInput, ForWorkflowInput, ForWorkflowEngineInput],
) -> Union[ForregulatoryNode, ForWorkflow, ForWorkflowEngine]:
- """执行与 run 相关的核心业务流转操作。
- 该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
- Args: payload (Union[ForregulatoryInput, ForWorkflowInput, ForWorkflowEngineInput]): 从客户端传递过来或由上游组件生成的核心业务数据体,通常需要进一步的清洗和结构化解析。
- Returns: (Union[ForregulatoryNode, ForWorkflow, ForWorkflowEngine]): 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
try:
self.agent.retries = 3
if isinstance(payload, ForWorkflowEngineInput):
diff --git a/kilostar/core/individual/consciousness_node/template.py b/kilostar/core/individual/consciousness_node/template.py
index 5a5b53f..ab2674d 100644
--- a/kilostar/core/individual/consciousness_node/template.py
+++ b/kilostar/core/individual/consciousness_node/template.py
@@ -13,7 +13,7 @@
# limitations under the License.
-from kilostar.core.workflow_running_engine.workflow import kilostarWorkflow, WorkStep
+from kilostar.core.work.workflow.workflow import KiloStarWorkflow, WorkflowStep
from kilostar.utils.agent_model import ResponseModel, DepsModel, InputModel
from pydantic import Field
@@ -28,7 +28,7 @@ class ConsciousnessNodeResponse(ResponseModel):
class ForWorkflowEngine(ConsciousnessNodeResponse):
"""生成workflow并放入WorkflowEngine"""
- workflow: kilostarWorkflow = Field(
+ workflow: KiloStarWorkflow = Field(
..., description="生成好的符合规范的完整工作流对象。"
)
reasoning: str = Field(..., description="生成此工作流的原因和思路简述。")
@@ -76,7 +76,7 @@ class ForWorkflowInput(ConsciousnessNodeInput):
"""ForWorkflowInput 核心组件类。
这是一个领域数据模型或功能封装类,承载了 ForWorkflowInput 相关的内聚属性定义与状态维护。它的存在隔离了局部的业务复杂性,并对外提供了类型安全的访问接口。"""
- workflow_step: WorkStep
+ workflow_step: WorkflowStep
original_command: str
@@ -84,5 +84,5 @@ class ForregulatoryInput(ConsciousnessNodeInput):
"""ForregulatoryInput 核心组件类。
这是一个领域数据模型或功能封装类,承载了 ForregulatoryInput 相关的内聚属性定义与状态维护。它的存在隔离了局部的业务复杂性,并对外提供了类型安全的访问接口。"""
- workflow: kilostarWorkflow
+ workflow: KiloStarWorkflow
original_command: str
diff --git a/kilostar/core/individual/control_node/template.py b/kilostar/core/individual/control_node/template.py
index 7f73f8d..65347e7 100644
--- a/kilostar/core/individual/control_node/template.py
+++ b/kilostar/core/individual/control_node/template.py
@@ -14,7 +14,7 @@
from pydantic import Field
-from kilostar.core.workflow_running_engine.workflow import WorkStep
+from kilostar.core.work.workflow.workflow import WorkflowStep
from kilostar.utils.agent_model import ResponseModel, InputModel, DepsModel
@@ -35,7 +35,8 @@ class ControlNodeDeps(DepsModel):
"""ControlNodeDeps 核心组件类。
这是一个系统执行节点类,作为多智能体架构中的独立处理单元。它能够接收工作流上下文,根据内置的大模型策略进行意图理解和自主决策,从而驱动特定阶段的任务闭环。"""
- workflow_step: WorkStep
+ workflow_step: WorkflowStep
+ workflow_step: WorkflowStep
# In the future, this can be dynamically populated with tools specific to the current task execution
@@ -52,4 +53,4 @@ class ForWorkflowInput(ControlNodeInput):
"""ForWorkflowInput 核心组件类。
这是一个领域数据模型或功能封装类,承载了 ForWorkflowInput 相关的内聚属性定义与状态维护。它的存在隔离了局部的业务复杂性,并对外提供了类型安全的访问接口。"""
- workflow_step: WorkStep
+ workflow_step: WorkflowStep
diff --git a/kilostar/core/individual/regulatory_node/regulatory_node.py b/kilostar/core/individual/regulatory_node/regulatory_node.py
index 2ed81db..c827ab7 100644
--- a/kilostar/core/individual/regulatory_node/regulatory_node.py
+++ b/kilostar/core/individual/regulatory_node/regulatory_node.py
@@ -157,7 +157,9 @@ class RegulatoryNode:
return "抱歉,监控节点处理请求时发生严重错误,请联系管理员。"
@overload
- async def _run(self, payload: kilostarEvent) -> Union[ForConsciousnessNode, ForUser]:
+ async def _run(
+ self, payload: kilostarEvent
+ ) -> Union[ForConsciousnessNode, ForUser]:
"""
_run方法
Args:
diff --git a/kilostar/core/postgres_database/model/__init__.py b/kilostar/core/postgres_database/model/__init__.py
index b09f6ee..18d80a4 100644
--- a/kilostar/core/postgres_database/model/__init__.py
+++ b/kilostar/core/postgres_database/model/__init__.py
@@ -15,5 +15,21 @@
from kilostar.core.postgres_database.model.user import User
from kilostar.core.postgres_database.model.provider import Provider
from kilostar.core.postgres_database.model.individual import WorkerIndividual
+from kilostar.core.postgres_database.model.workflow import (
+ Workflow,
+ WorkflowContextModel,
+)
+from kilostar.core.postgres_database.model.chat_history import (
+ ChatHistoryRegister,
+ ChatHistoryMessage,
+)
-__all__ = ["User", "Provider", "WorkerIndividual"]
+__all__ = [
+ "User",
+ "Provider",
+ "WorkerIndividual",
+ "Workflow",
+ "WorkflowContextModel",
+ "ChatHistoryRegister",
+ "ChatHistoryMessage",
+]
diff --git a/kilostar/core/postgres_database/model/base.py b/kilostar/core/postgres_database/model/base.py
index e364561..c6016bc 100644
--- a/kilostar/core/postgres_database/model/base.py
+++ b/kilostar/core/postgres_database/model/base.py
@@ -15,5 +15,6 @@
from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.orm import DeclarativeBase
+
class BaseDataModel(DeclarativeBase, AsyncAttrs):
- pass
\ No newline at end of file
+ pass
diff --git a/kilostar/core/postgres_database/model/chat_history.py b/kilostar/core/postgres_database/model/chat_history.py
index e6e004a..5b6593e 100644
--- a/kilostar/core/postgres_database/model/chat_history.py
+++ b/kilostar/core/postgres_database/model/chat_history.py
@@ -11,19 +11,55 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from typing import Literal
+from sqlalchemy import String, DateTime, func
+from sqlalchemy.orm import Mapped, mapped_column
from .base import BaseDataModel
-from sqlalchemy.orm import Mapped
-class ChatHistoryMessage(BaseDataModel):
- __tablename__ = "chat_history_massage"
- message_id: Mapped[str]
- message: Mapped[str]
- message_owner: Literal["user","regulatory_node"]
class ChatHistoryRegister(BaseDataModel):
- __tablename__ = "chat_history_register"
- chat_id: Mapped[str]
- user_id: Mapped[str]
+ """
+ 一个特定的聊天会话记录注册表。
+ 类似于多会话的一个 Thread/Session。
+ """
+ __tablename__ = "chat_history_register"
+
+ chat_id: Mapped[str] = mapped_column(
+ String(64), primary_key=True, description="聊天会话ID"
+ )
+ user_id: Mapped[str] = mapped_column(
+ String(64), index=True, description="归属的用户ID"
+ )
+ title: Mapped[str] = mapped_column(
+ String(255), default="新对话", description="对话标题"
+ )
+ created_at: Mapped[str] = mapped_column(
+ DateTime(timezone=True), server_default=func.now()
+ )
+ updated_at: Mapped[str] = mapped_column(
+ DateTime(timezone=True), server_default=func.now(), onupdate=func.now()
+ )
+
+
+class ChatHistoryMessage(BaseDataModel):
+ """
+ 特定会话中的每一条具体消息记录。
+ """
+
+ __tablename__ = "chat_history_message"
+
+ message_id: Mapped[str] = mapped_column(
+ String(64), primary_key=True, description="消息ID"
+ )
+ chat_id: Mapped[str] = mapped_column(
+ String(64), index=True, description="所属会话ID"
+ )
+ message: Mapped[str] = mapped_column(String, description="消息体内容")
+ message_owner: Mapped[str] = mapped_column(
+ String(50),
+ description="消息发送方,例如 'user', 'regulatory_node', 'consciousness_node' 等",
+ )
+ created_at: Mapped[str] = mapped_column(
+ DateTime(timezone=True), server_default=func.now()
+ )
diff --git a/kilostar/core/postgres_database/model/individual.py b/kilostar/core/postgres_database/model/individual.py
index f05b855..011b67b 100644
--- a/kilostar/core/postgres_database/model/individual.py
+++ b/kilostar/core/postgres_database/model/individual.py
@@ -44,10 +44,7 @@ class BaseIndividualModel(BaseDataModel):
agent_type: Mapped[str] = mapped_column(String(32))
- __mapper_args__ = {
- "polymorphic_on": "agent_type",
- "polymorphic_identity": "base"
- }
+ __mapper_args__ = {"polymorphic_on": "agent_type", "polymorphic_identity": "base"}
# ==========================================
@@ -57,8 +54,7 @@ class SpecialistIndividualModel(BaseIndividualModel):
__tablename__ = "specialist_individual"
agent_id: Mapped[str] = mapped_column(
- ForeignKey("base_individual.agent_id", ondelete="CASCADE"),
- primary_key=True
+ ForeignKey("base_individual.agent_id", ondelete="CASCADE"), primary_key=True
)
bound_skill: Mapped[Optional[Dict[str, Any]]] = mapped_column(JSONB)
workspace: Mapped[Optional[List[str]]] = mapped_column(JSONB)
@@ -70,12 +66,12 @@ class SpecialistIndividualModel(BaseIndividualModel):
sub_ordinary_agents: Mapped[List["OrdinaryIndividualModel"]] = relationship(
back_populates="manager",
cascade="all, delete-orphan",
- foreign_keys="[OrdinaryIndividualModel.manager_id]"
+ foreign_keys="[OrdinaryIndividualModel.manager_id]",
)
sub_special_agents: Mapped[List["SpecialIndividualModel"]] = relationship(
back_populates="manager",
cascade="all, delete-orphan",
- foreign_keys="[SpecialIndividualModel.manager_id]"
+ foreign_keys="[SpecialIndividualModel.manager_id]",
)
__mapper_args__ = {
@@ -90,8 +86,7 @@ class OrdinaryIndividualModel(BaseIndividualModel):
__tablename__ = "ordinary_individual"
agent_id: Mapped[str] = mapped_column(
- ForeignKey("base_individual.agent_id", ondelete="CASCADE"),
- primary_key=True
+ ForeignKey("base_individual.agent_id", ondelete="CASCADE"), primary_key=True
)
finetuned_from: Mapped[Optional[str]] = mapped_column(String(100))
tools: Mapped[Optional[List[str]]] = mapped_column(
@@ -106,7 +101,7 @@ class OrdinaryIndividualModel(BaseIndividualModel):
# 逻辑关联:指向上级专家
manager: Mapped[Optional["SpecialistIndividualModel"]] = relationship(
back_populates="sub_ordinary_agents",
- foreign_keys=[manager_id] # 显式指定使用 manager_id 解析关系
+ foreign_keys=[manager_id], # 显式指定使用 manager_id 解析关系
)
__mapper_args__ = {
@@ -121,12 +116,10 @@ class SpecialIndividualModel(BaseIndividualModel):
__tablename__ = "special_individual"
agent_id: Mapped[str] = mapped_column(
- ForeignKey("base_individual.agent_id", ondelete="CASCADE"),
- primary_key=True
+ ForeignKey("base_individual.agent_id", ondelete="CASCADE"), primary_key=True
)
modality_type: Mapped[ModalityType] = mapped_column(
- default=ModalityType.MULTIMODAL,
- server_default=text("'multimodal'")
+ default=ModalityType.MULTIMODAL, server_default=text("'multimodal'")
)
multimodal_config: Mapped[Optional[Dict[str, Any]]] = mapped_column(JSONB)
@@ -137,10 +130,9 @@ class SpecialIndividualModel(BaseIndividualModel):
# 【修复2】:修正 back_populates 指向正确的变量名
manager: Mapped[Optional["SpecialistIndividualModel"]] = relationship(
- back_populates="sub_special_agents",
- foreign_keys=[manager_id]
+ back_populates="sub_special_agents", foreign_keys=[manager_id]
)
__mapper_args__ = {
"polymorphic_identity": "special",
- }
\ No newline at end of file
+ }
diff --git a/kilostar/core/postgres_database/model/provider.py b/kilostar/core/postgres_database/model/provider.py
index 746b69c..bad3055 100644
--- a/kilostar/core/postgres_database/model/provider.py
+++ b/kilostar/core/postgres_database/model/provider.py
@@ -24,6 +24,7 @@ class ProviderModel(BaseDataModel):
Provider 物理模型。
作为模型/服务提供商适配器,标准化不同供应商(OpenAI, Anthropic 等)的配置。
"""
+
__tablename__ = "provider"
provider_id: Mapped[str] = mapped_column(String(64), primary_key=True)
provider_title: Mapped[str] = mapped_column(String(100), index=True, nullable=False)
@@ -31,14 +32,12 @@ class ProviderModel(BaseDataModel):
provider_url: Mapped[Optional[str]] = mapped_column(Text)
provider_apikey: Mapped[Optional[str]] = mapped_column(Text)
provider_models: Mapped[List[str]] = mapped_column(
- JSONB,
- default=list,
- server_default=text("'[]'::jsonb")
+ JSONB, default=list, server_default=text("'[]'::jsonb")
)
provider_owner: Mapped[str] = mapped_column(String(64), index=True)
is_active: Mapped[bool] = mapped_column(
Boolean,
default=True,
server_default=text("true"),
- comment="该服务商节点是否在线/启用"
+ comment="该服务商节点是否在线/启用",
)
diff --git a/kilostar/core/postgres_database/model/system_node.py b/kilostar/core/postgres_database/model/system_node.py
index abf55e5..691b6ee 100644
--- a/kilostar/core/postgres_database/model/system_node.py
+++ b/kilostar/core/postgres_database/model/system_node.py
@@ -13,10 +13,12 @@
# limitations under the License.
from typing import List, Optional
-from sqlalchemy import String, Text
-from sqlalchemy.dialects.postgresql import JSONB # 针对 Postgres 优化,支持索引和高性能解析
+from sqlalchemy import String
+from sqlalchemy.dialects.postgresql import (
+ JSONB,
+) # 针对 Postgres 优化,支持索引和高性能解析
from sqlalchemy.orm import Mapped, mapped_column
-from .base import BaseDataModel
+from .base import BaseDataModel
class SystemNodeConfigModel(BaseDataModel):
@@ -24,12 +26,11 @@ class SystemNodeConfigModel(BaseDataModel):
SystemNodeConfig 物理模型。
作为 kilostar 架构中的独立处理单元,负责存储 LLM 节点的执行策略与工具配置。
"""
+
__tablename__ = "system_node_config"
node_name: Mapped[str] = mapped_column(String(100), primary_key=True)
provider_title: Mapped[str] = mapped_column(String(50), nullable=False)
model_id: Mapped[str] = mapped_column(String(100), nullable=False)
tools: Mapped[Optional[List[str]]] = mapped_column(
- JSONB,
- default=list,
- comment="节点可调用的工具标识列表"
+ JSONB, default=list, comment="节点可调用的工具标识列表"
)
diff --git a/kilostar/core/postgres_database/model/user.py b/kilostar/core/postgres_database/model/user.py
index 0705bb5..9da2923 100644
--- a/kilostar/core/postgres_database/model/user.py
+++ b/kilostar/core/postgres_database/model/user.py
@@ -25,6 +25,7 @@ class UserAuthority(IntEnum):
"""
权限枚举类
"""
+
SUPER_ADMINISTRATOR = 100
ADMINISTRATOR = 50
USER = 20
@@ -36,12 +37,11 @@ class User(BaseDataModel):
"""
数据库user表模型
"""
+
__tablename__ = "user"
user_id: Mapped[str] = mapped_column(String(64), primary_key=True)
user_name: Mapped[str] = mapped_column(String(100), index=True, nullable=False)
hashed_password: Mapped[str] = mapped_column(String(255), nullable=False)
user_authority: Mapped[UserAuthority] = mapped_column(
- Integer,
- default=UserAuthority.USER,
- server_default=text("20")
+ Integer, default=UserAuthority.USER, server_default=text("20")
)
diff --git a/kilostar/core/postgres_database/model/workflow.py b/kilostar/core/postgres_database/model/workflow.py
index 0252abd..7fbf194 100644
--- a/kilostar/core/postgres_database/model/workflow.py
+++ b/kilostar/core/postgres_database/model/workflow.py
@@ -12,12 +12,70 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-
-from sqlmodel import SQLModel, Field
+from sqlalchemy import String, DateTime, func
+from sqlalchemy.orm import Mapped, mapped_column
+from sqlalchemy.dialects.postgresql import JSONB
+from .base import BaseDataModel
-class EventRecord(SQLModel, table=True):
- trace_id: str = Field(
- primary_key=True, description="The unique trace ID of the kilostarEvent"
+class Workflow(BaseDataModel):
+ __tablename__ = "workflow"
+
+ trace_id: Mapped[str] = mapped_column(
+ String(64), primary_key=True, description="工作流唯一ID (Trace ID)"
+ )
+ user_id: Mapped[str] = mapped_column(
+ String(64), index=True, description="创建该工作流的用户ID"
+ )
+ title: Mapped[str] = mapped_column(String(255), description="工作流标题/简短描述")
+ command: Mapped[str] = mapped_column(
+ String, description="创建工作流的原始用户命令文本"
+ )
+ status: Mapped[str] = mapped_column(
+ String(50),
+ default="creating",
+ description="工作流的总体状态 (例如: creating, running, pending, completed, failed等)",
+ )
+ version: Mapped[str] = mapped_column(
+ String(50), default="v1.0", description="系统协议版本号"
+ )
+ created_at: Mapped[str] = mapped_column(
+ DateTime(timezone=True), server_default=func.now()
+ )
+ updated_at: Mapped[str] = mapped_column(
+ DateTime(timezone=True), server_default=func.now(), onupdate=func.now()
+ )
+
+
+class WorkflowContextModel(BaseDataModel):
+ __tablename__ = "workflow_context"
+
+ trace_id: Mapped[str] = mapped_column(
+ String(64), primary_key=True, description="对应的工作流 Trace ID"
+ )
+ workflow_status: Mapped[dict] = mapped_column(
+ JSONB, default=dict, description="工作流状态变更历史"
+ )
+ blackboard: Mapped[dict] = mapped_column(
+ JSONB, default=dict, description="大模型输出的存储区 (共享黑板)"
+ )
+ work_step_status: Mapped[dict] = mapped_column(
+ JSONB, nullable=True, description="工作流运行步骤状态"
+ )
+ workflow_pointer: Mapped[int] = mapped_column(
+ nullable=True, description="工作流指针,指向具体运行步骤位置"
+ )
+ workflow_log: Mapped[list] = mapped_column(
+ JSONB, default=list, description="工作流运行日志"
+ )
+ work_link: Mapped[list] = mapped_column(
+ JSONB,
+ default=list,
+ description="工作链(即 WorkflowStep 的定义列表,包含图结构和原子动作)",
+ )
+ created_at: Mapped[str] = mapped_column(
+ DateTime(timezone=True), server_default=func.now()
+ )
+ updated_at: Mapped[str] = mapped_column(
+ DateTime(timezone=True), server_default=func.now(), onupdate=func.now()
)
- event_data_json: str = Field(description="The JSON serialized kilostarEvent data")
diff --git a/kilostar/core/postgres_database/module/chat_history.py b/kilostar/core/postgres_database/module/chat_history.py
new file mode 100644
index 0000000..009502f
--- /dev/null
+++ b/kilostar/core/postgres_database/module/chat_history.py
@@ -0,0 +1,82 @@
+# Copyright 2026 zhaoxi826
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from sqlalchemy import select, func
+from typing import List
+from kilostar.core.postgres_database.model.chat_history import (
+ ChatHistoryRegister,
+ ChatHistoryMessage,
+)
+from sqlalchemy.ext.asyncio import async_sessionmaker, AsyncSession
+from ulid import ULID
+
+
+class ChatHistoryDatabase:
+ def __init__(self, async_session_maker: async_sessionmaker[AsyncSession]):
+ self.async_session_maker = async_session_maker
+
+ async def create_chat_session(
+ self, user_id: str, title: str = "新对话"
+ ) -> ChatHistoryRegister:
+ async with self.async_session_maker() as session:
+ chat_id = str(ULID())
+ chat = ChatHistoryRegister(chat_id=chat_id, user_id=user_id, title=title)
+ session.add(chat)
+ await session.commit()
+ await session.refresh(chat)
+ return chat
+
+ async def list_chat_sessions(self, user_id: str) -> List[ChatHistoryRegister]:
+ async with self.async_session_maker() as session:
+ statement = (
+ select(ChatHistoryRegister)
+ .where(ChatHistoryRegister.user_id == user_id)
+ .order_by(ChatHistoryRegister.updated_at.desc())
+ )
+ results = await session.execute(statement)
+ return results.scalars().all()
+
+ async def add_chat_message(
+ self, chat_id: str, message: str, message_owner: str
+ ) -> ChatHistoryMessage:
+ async with self.async_session_maker() as session:
+ msg_id = str(ULID())
+ msg = ChatHistoryMessage(
+ message_id=msg_id,
+ chat_id=chat_id,
+ message=message,
+ message_owner=message_owner,
+ )
+ session.add(msg)
+ # Update the chat session's updated_at
+ statement = select(ChatHistoryRegister).where(
+ ChatHistoryRegister.chat_id == chat_id
+ )
+ results = await session.execute(statement)
+ chat = results.scalar_one_or_none()
+ if chat:
+ chat.updated_at = func.now()
+ await session.commit()
+ await session.refresh(msg)
+ return msg
+
+ async def list_chat_messages(self, chat_id: str) -> List[ChatHistoryMessage]:
+ async with self.async_session_maker() as session:
+ statement = (
+ select(ChatHistoryMessage)
+ .where(ChatHistoryMessage.chat_id == chat_id)
+ .order_by(ChatHistoryMessage.created_at.asc())
+ )
+ results = await session.execute(statement)
+ return results.scalars().all()
diff --git a/kilostar/core/postgres_database/module/workflow.py b/kilostar/core/postgres_database/module/workflow.py
new file mode 100644
index 0000000..a7105e9
--- /dev/null
+++ b/kilostar/core/postgres_database/module/workflow.py
@@ -0,0 +1,96 @@
+# Copyright 2026 zhaoxi826
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from sqlalchemy import select
+from typing import List, Optional
+from kilostar.core.postgres_database.model.workflow import (
+ Workflow,
+ WorkflowContextModel,
+)
+from sqlalchemy.ext.asyncio import async_sessionmaker, AsyncSession
+
+
+class WorkflowDatabase:
+ def __init__(self, async_session_maker: async_sessionmaker[AsyncSession]):
+ self.async_session_maker = async_session_maker
+
+ async def create_workflow(
+ self, trace_id: str, user_id: str, title: str, command: str
+ ) -> Workflow:
+ async with self.async_session_maker() as session:
+ wf = Workflow(
+ trace_id=trace_id,
+ user_id=user_id,
+ title=title,
+ command=command,
+ status="creating",
+ )
+ session.add(wf)
+ await session.commit()
+ await session.refresh(wf)
+ return wf
+
+ async def get_workflow(self, trace_id: str) -> Optional[Workflow]:
+ async with self.async_session_maker() as session:
+ statement = select(Workflow).where(Workflow.trace_id == trace_id)
+ results = await session.execute(statement)
+ return results.scalar_one_or_none()
+
+ async def update_workflow_status(
+ self, trace_id: str, status: str
+ ) -> Optional[Workflow]:
+ async with self.async_session_maker() as session:
+ statement = select(Workflow).where(Workflow.trace_id == trace_id)
+ results = await session.execute(statement)
+ record = results.scalar_one_or_none()
+ if record:
+ record.status = status
+ await session.commit()
+ await session.refresh(record)
+ return record
+
+ async def list_workflows(self, user_id: str) -> List[Workflow]:
+ async with self.async_session_maker() as session:
+ statement = select(Workflow).where(Workflow.user_id == user_id)
+ results = await session.execute(statement)
+ return results.scalars().all()
+
+ async def upsert_workflow_context(
+ self, trace_id: str, **kwargs
+ ) -> WorkflowContextModel:
+ async with self.async_session_maker() as session:
+ statement = select(WorkflowContextModel).where(
+ WorkflowContextModel.trace_id == trace_id
+ )
+ results = await session.execute(statement)
+ record = results.scalar_one_or_none()
+ if record:
+ for key, value in kwargs.items():
+ setattr(record, key, value)
+ else:
+ record = WorkflowContextModel(trace_id=trace_id, **kwargs)
+ session.add(record)
+ await session.commit()
+ await session.refresh(record)
+ return record
+
+ async def get_workflow_context(
+ self, trace_id: str
+ ) -> Optional[WorkflowContextModel]:
+ async with self.async_session_maker() as session:
+ statement = select(WorkflowContextModel).where(
+ WorkflowContextModel.trace_id == trace_id
+ )
+ results = await session.execute(statement)
+ return results.scalar_one_or_none()
diff --git a/kilostar/core/postgres_database/postgres.py b/kilostar/core/postgres_database/postgres.py
index 994cf36..68f38ce 100644
--- a/kilostar/core/postgres_database/postgres.py
+++ b/kilostar/core/postgres_database/postgres.py
@@ -25,6 +25,8 @@ from .module.event import EventDatabase
from .module.user import AuthDatabase
from .module.provider import ProviderDatabase
from .module.system_node import SystemNodeDatabase
+from .module.workflow import WorkflowDatabase
+from .module.chat_history import ChatHistoryDatabase
@ray.remote
@@ -51,6 +53,8 @@ class PostgresDatabase:
self._individual_database = IndividualDatabase(self.async_session_maker)
self._event_database = EventDatabase(self.async_session_maker)
self._system_node_database = SystemNodeDatabase(self.async_session_maker)
+ self._workflow_database = WorkflowDatabase(self.async_session_maker)
+ self._chat_history_database = ChatHistoryDatabase(self.async_session_maker)
self.ready_event = asyncio.Event()
@@ -254,3 +258,51 @@ class PostgresDatabase:
async def delete_event(self, trace_id: str):
await self.ready_event.wait()
return await self._event_database.delete_event(trace_id)
+
+ # Workflow Database Methods
+ async def create_workflow(
+ self, trace_id: str, user_id: str, title: str, command: str
+ ):
+ await self.ready_event.wait()
+ return await self._workflow_database.create_workflow(
+ trace_id, user_id, title, command
+ )
+
+ async def get_workflow(self, trace_id: str):
+ await self.ready_event.wait()
+ return await self._workflow_database.get_workflow(trace_id)
+
+ async def update_workflow_status(self, trace_id: str, status: str):
+ await self.ready_event.wait()
+ return await self._workflow_database.update_workflow_status(trace_id, status)
+
+ async def list_workflows(self, user_id: str):
+ await self.ready_event.wait()
+ return await self._workflow_database.list_workflows(user_id)
+
+ async def upsert_workflow_context(self, trace_id: str, **kwargs):
+ await self.ready_event.wait()
+ return await self._workflow_database.upsert_workflow_context(trace_id, **kwargs)
+
+ async def get_workflow_context(self, trace_id: str):
+ await self.ready_event.wait()
+ return await self._workflow_database.get_workflow_context(trace_id)
+
+ # Chat History Database Methods
+ async def create_chat_session(self, user_id: str, title: str = "新对话"):
+ await self.ready_event.wait()
+ return await self._chat_history_database.create_chat_session(user_id, title)
+
+ async def list_chat_sessions(self, user_id: str):
+ await self.ready_event.wait()
+ return await self._chat_history_database.list_chat_sessions(user_id)
+
+ async def add_chat_message(self, chat_id: str, message: str, message_owner: str):
+ await self.ready_event.wait()
+ return await self._chat_history_database.add_chat_message(
+ chat_id, message, message_owner
+ )
+
+ async def list_chat_messages(self, chat_id: str):
+ await self.ready_event.wait()
+ return await self._chat_history_database.list_chat_messages(chat_id)
diff --git a/kilostar/core/work/workflow/model.py b/kilostar/core/work/workflow/model.py
index eed6e7a..79a2f8c 100644
--- a/kilostar/core/work/workflow/model.py
+++ b/kilostar/core/work/workflow/model.py
@@ -16,13 +16,17 @@ from pydantic import BaseModel, Field
from typing import Literal, Optional
from enum import Enum
+
class LogicGate(BaseModel):
"""
LogicGate 类。
跳转逻辑,标记该步骤运行成功或失败的动作
"""
+
if_fail: str = Field(..., description="失败跳转目标,如 'jump_to_step_1'")
- if_pass: Literal["continue", "exit"] = Field(default="continue", description="成功后的动作")
+ if_pass: Literal["continue", "exit"] = Field(
+ default="continue", description="成功后的动作"
+ )
class WorkflowMetadata(BaseModel):
@@ -30,6 +34,7 @@ class WorkflowMetadata(BaseModel):
WorkflowMetadata类
workflow的元数据类,保存与用户有关的数据
"""
+
user_id: Optional[str] = Field(default=None, description="创建工作流的用户的ulid")
command: Optional[str] = Field(default=None, description="创建工作流的原始命令")
@@ -44,6 +49,7 @@ class WorkStepStatus(str, Enum):
COMPLETED: 完成
FAILED = 失败
"""
+
PENDING = "pending"
WORKING = "working"
HANGUP = "hang_up"
@@ -61,9 +67,10 @@ class WorkflowStatus(str, Enum):
CREATING = 创建中
PENDING = 等待中
"""
+
RUNNING = "running"
HANGUP = "hang_up"
COMPLETED = "completed"
FAILED = "failed"
CREATING = "creating"
- PENDING = "pending"
\ No newline at end of file
+ PENDING = "pending"
diff --git a/kilostar/core/work/workflow/workflow.py b/kilostar/core/work/workflow/workflow.py
index 031ad6d..ac66cae 100644
--- a/kilostar/core/work/workflow/workflow.py
+++ b/kilostar/core/work/workflow/workflow.py
@@ -18,19 +18,32 @@ from .model import LogicGate, WorkflowMetadata, WorkStepStatus, WorkflowStatus
from ulid import ULID
from datetime import datetime
+
class WorkflowContext(BaseModel):
"""
WorkflowContext 类
作为workflow运行时的数据部分,使得数据和计算分离
"""
+
trace_id: str = Field(description="工作流的trace_id")
- workflow_status: Dict[str, WorkflowStatus] = Field(default_factory=lambda: {datetime.now().strftime("%Y-%m-%d %H:%M:%S"):WorkflowStatus.CREATING} ,description="工作流状态")
+ workflow_status: Dict[str, WorkflowStatus] = Field(
+ default_factory=lambda: {
+ datetime.now().strftime("%Y-%m-%d %H:%M:%S"): WorkflowStatus.CREATING
+ },
+ description="工作流状态",
+ )
blackboard: Dict[str, Any] = Field(description="大模型输出的存储区")
- work_step_status: Optional[Dict[int, tuple[str, WorkStepStatus]]] = Field(default= None,description="工作流运行状态")
+ work_step_status: Optional[Dict[int, tuple[str, WorkStepStatus]]] = Field(
+ default=None, description="工作流运行状态"
+ )
"""work_step_status:字典,键为整个工作流的运行步骤,值为元组,包含两个字段:
1.字符串,更新时间的字符串;2.WorkflowStatus枚举类,当前步骤的运行情况"""
- workflow_pointer: Optional[int] = Field(description="工作流指针,指向具体的workflow位置")
- workflow_log: List[Dict[int, tuple[str, WorkflowStatus, str]]] = Field(default=[], description="工作流运行日志")
+ workflow_pointer: Optional[int] = Field(
+ description="工作流指针,指向具体的workflow位置"
+ )
+ workflow_log: List[Dict[int, tuple[str, WorkflowStatus, str]]] = Field(
+ default=[], description="工作流运行日志"
+ )
"""workflow_log:一个列表,内部元素为一个字典,键为步骤序号,值为一个元组,包含三个字段:
1.字符串,更新时间的字符串;2.WorkflowStatus枚举类,当前步骤的运行情况;3.字符串,当前步骤运行完后的输出总结或失败原因"""
@@ -40,12 +53,18 @@ class WorkflowStep(BaseModel):
WorkflowStep 类
workflow每一个步骤的模型,为workflow的最小执行单位
"""
+
step: int = Field(..., gt=0, description="步骤序号,严格自增")
name: str = Field(..., description="步骤名称")
action: str = Field(..., description="执行的原子动作")
- inputs: Optional[Union[str, List[str]]] = Field(default=None, description="前置依赖输出")
+ inputs: Optional[Union[str, List[str]]] = Field(
+ default=None, description="前置依赖输出"
+ )
outputs: Optional[str] = Field(default=None, description="当前步骤产出物变量名")
- agent_id: Optional[str] = Field(default=None,description="分配给 skill_individual 的 Skill Individual 真实 agent_id,不可用名称代替",)
+ agent_id: Optional[str] = Field(
+ default=None,
+ description="分配给 skill_individual 的 Skill Individual 真实 agent_id,不可用名称代替",
+ )
logic_gate: Optional[LogicGate] = Field(default=None, description="逻辑跳转控制")
@@ -54,9 +73,12 @@ class KiloStarWorkflow(BaseModel):
KiloStarWorkflow 类
kilostar的workflow核心类,由consciousness_node创建
"""
- trace_id: str = Field(default_factory=lambda: str(ULID()), description="系统自动生成的追溯ID")
+
+ trace_id: str = Field(
+ default_factory=lambda: str(ULID()), description="系统自动生成的追溯ID"
+ )
version: str = Field(default="v1.0", description="系统协议版本号")
- #-------------------
+ # -------------------
title: str = Field(..., description="工作流标题")
work_link: List[WorkflowStep] = Field(..., description="工作链")
workflow_metadata: WorkflowMetadata
@@ -86,4 +108,4 @@ class KiloStarWorkflow(BaseModel):
if "越界" in str(e):
raise e
raise ValueError(f"LogicGate 格式错误: {s.logic_gate.if_fail}")
- return self
\ No newline at end of file
+ return self
diff --git a/kilostar/core/work/workflow/workflow_engine.py b/kilostar/core/work/workflow/workflow_engine.py
new file mode 100644
index 0000000..46d80a7
--- /dev/null
+++ b/kilostar/core/work/workflow/workflow_engine.py
@@ -0,0 +1,177 @@
+# Copyright 2026 zhaoxi826
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import asyncio
+import ray
+from kilostar.core.work.workflow.workflow import KiloStarWorkflow
+from typing import Dict, Any, List
+
+
+@ray.remote
+def run_workflow_task(workflow_data: dict, trace_id: str):
+ from kilostar.utils.ray_hook import ray_actor_hook
+ from kilostar.core.work.workflow.model import WorkflowStatus
+ import datetime
+ from pydantic import BaseModel
+
+ # State passed through graph nodes
+ class WorkflowGraphState(BaseModel):
+ trace_id: str
+ blackboard: Dict[str, Any]
+ work_link: List[Dict[str, Any]]
+ current_step_index: int = 0
+ status: str = "running"
+ logs: List[Dict[str, Any]] = []
+
+ async def save_context(state: WorkflowGraphState):
+ postgres_database = ray_actor_hook("postgres_database").postgres_database
+ await postgres_database.upsert_workflow_context.remote(
+ state.trace_id,
+ workflow_pointer=state.current_step_index,
+ blackboard=state.blackboard,
+ work_link=state.work_link,
+ workflow_status={str(datetime.datetime.now()): state.status},
+ workflow_log=state.logs,
+ )
+ await postgres_database.update_workflow_status.remote(
+ state.trace_id, state.status
+ )
+ global_workflow_manager = ray_actor_hook(
+ "global_workflow_manager"
+ ).global_workflow_manager
+ await global_workflow_manager.put_received.remote(
+ state.trace_id, f"执行步骤 {state.current_step_index + 1}..."
+ )
+
+ async def execute_step(state: WorkflowGraphState):
+ """执行单一工作流节点逻辑"""
+ if state.current_step_index >= len(state.work_link):
+ state.status = WorkflowStatus.COMPLETED
+ return state
+
+ step = state.work_link[state.current_step_index]
+ step.get("node", "")
+ action = step.get("action", "")
+
+ # 记录开始状态
+ state.logs.append(
+ {
+ str(state.current_step_index): [
+ str(datetime.datetime.now()),
+ "working",
+ f"开始执行: {step.get('name', '未命名步骤')}",
+ ]
+ }
+ )
+ await save_context(state)
+
+ try:
+ # TODO: 实际对接不同节点执行逻辑 (例如: control_node, agent 技能)
+ # 这里是简化版,向控制节点或指定 skill 发送指令
+
+ # ... 模拟执行逻辑 ...
+ await asyncio.sleep(2)
+
+ # 记录结果
+ state.blackboard[
+ step.get("outputs", f"step_{state.current_step_index}_result")
+ ] = "Success execution of " + action
+ state.logs[-1][str(state.current_step_index)] = [
+ str(datetime.datetime.now()),
+ "completed",
+ f"成功: {action}",
+ ]
+
+ # 判断逻辑跳转
+ logic_gate = step.get("logic_gate")
+ if logic_gate and logic_gate.get("if_pass") == "exit":
+ state.status = WorkflowStatus.COMPLETED
+ else:
+ state.current_step_index += 1
+
+ except Exception as e:
+ state.logs[-1][str(state.current_step_index)] = [
+ str(datetime.datetime.now()),
+ "failed",
+ str(e),
+ ]
+ state.status = WorkflowStatus.FAILED
+ logic_gate = step.get("logic_gate")
+ if logic_gate and logic_gate.get("if_fail"):
+ fail_target = logic_gate.get("if_fail")
+ if "jump_to_step_" in fail_target:
+ target_step = int(fail_target.split("_")[-1]) - 1
+ state.current_step_index = target_step
+ state.status = WorkflowStatus.RUNNING
+
+ await save_context(state)
+ return state
+
+ async def _run():
+ postgres_database = ray_actor_hook("postgres_database").postgres_database
+ await postgres_database.update_workflow_status.remote(
+ trace_id, WorkflowStatus.RUNNING
+ )
+
+ state = WorkflowGraphState(
+ trace_id=trace_id,
+ blackboard={},
+ work_link=workflow_data.get("work_link", []),
+ )
+ await save_context(state)
+
+ # 简单的图执行驱动 (模拟 pydantic-ai.graph.run 行为,直至 Graph 库正式稳定)
+ while state.status == WorkflowStatus.RUNNING and state.current_step_index < len(
+ state.work_link
+ ):
+ state = await execute_step(state)
+
+ await postgres_database.update_workflow_status.remote(trace_id, state.status)
+ global_workflow_manager = ray_actor_hook(
+ "global_workflow_manager"
+ ).global_workflow_manager
+ msg = (
+ "工作流执行完成!"
+ if state.status == WorkflowStatus.COMPLETED
+ else "工作流执行失败。"
+ )
+ await global_workflow_manager.put_received.remote(trace_id, msg)
+
+ asyncio.run(_run())
+
+
+@ray.remote
+class WorkflowRunningEngine:
+ def __init__(
+ self, consciousness_node=None, control_node=None, regulatory_node=None
+ ):
+ self.consciousness_node = consciousness_node
+ self.control_node = control_node
+ self.regulatory_node = regulatory_node
+ self.events_queue = asyncio.Queue()
+
+ async def put_event(self, event):
+ await self.events_queue.put(event)
+
+ async def run(self):
+ """引擎循环提取事件"""
+ while True:
+ await self.events_queue.get()
+ await asyncio.sleep(1)
+
+ async def execute_workflow(self, workflow: KiloStarWorkflow):
+ # 这个方法可以由意识节点调用来提交一个完整的运行任务
+ workflow_dict = workflow.model_dump()
+ trace_id = workflow.trace_id
+ run_workflow_task.remote(workflow_dict, trace_id)
diff --git a/pyproject.toml b/pyproject.toml
index a95965e..4647460 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -3,7 +3,7 @@ name = "kilostar"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
-requires-python = ">=3.13"
+requires-python = ">=3.12,<4.0"
dependencies = [
"asyncpg>=0.31.0",
"docker-py>=1.10.6",
diff --git a/tests/core/global_state_machine/global_state_machine_test.py b/tests/core/global_state_machine/global_state_machine_test.py
index 38bbac0..765747f 100644
--- a/tests/core/global_state_machine/global_state_machine_test.py
+++ b/tests/core/global_state_machine/global_state_machine_test.py
@@ -123,4 +123,6 @@ def test_get_provider_list_and_get_provider(gsm):
assert gsm._global_provider_manager.get_provider_list() == {"p1": mock_provider}
assert gsm._global_provider_manager.get_provider("p1") == mock_provider
assert gsm._global_provider_manager.get_provider("missing") is None
+
+
# noqa: E402
diff --git a/tests/core/postgres_database/postgres_test.py b/tests/core/postgres_database/postgres_test.py
index 0674927..0376203 100644
--- a/tests/core/postgres_database/postgres_test.py
+++ b/tests/core/postgres_database/postgres_test.py
@@ -82,4 +82,6 @@ async def test_postgres_database(
mock_conn.run_sync.assert_called_once_with(mock_create_all)
assert await db.get_user_authority(user_id="123") == "test_auth"
+
+
# noqa: E402
diff --git a/tests/core/workflow_running_engine/workflow_runner_test.py b/tests/core/workflow_running_engine/workflow_runner_test.py
deleted file mode 100644
index 26d3096..0000000
--- a/tests/core/workflow_running_engine/workflow_runner_test.py
+++ /dev/null
@@ -1,197 +0,0 @@
-import pytest
-from unittest.mock import MagicMock, AsyncMock, patch
-import asyncio
-
-import sys
-import builtins
-
-real_import = builtins.__import__
-
-
-def mock_import(name, globals=None, locals=None, fromlist=(), level=0):
- if name == "ray":
- mock_ray = MagicMock()
-
- def mock_remote(*args, **kwargs):
- if len(args) == 1 and callable(args[0]):
- return args[0]
-
- def decorator(cls):
- return cls
-
- return decorator
-
- mock_ray.remote = mock_remote
- return mock_ray
- return real_import(name, globals, locals, fromlist, level)
-
-
-builtins.__import__ = mock_import
-for mod in list(sys.modules.keys()):
- if "kilostar.core.workflow_running_engine.workflow_runner" in mod or "ray" in mod:
- del sys.modules[mod]
-from kilostar.core.workflow_running_engine.workflow_runner import ( # noqa: E402
- WorkflowEngine,
- WorkflowRunningEngine,
-)
-
-builtins.__import__ = real_import
-
-
-@pytest.fixture
-def mock_ray():
- with patch("kilostar.core.workflow_running_engine.workflow_runner.ray") as mock_ray:
- mock_ray.get = lambda x: x
- yield mock_ray
-
-
-def test_workflow_engine_init():
- mock_wf = MagicMock()
- mock_wf.work_link = []
- engine = WorkflowEngine(mock_wf, "conscious", "control", "supervisor")
- assert engine.workflow == mock_wf
- assert engine.consciousness_node == "conscious"
- assert engine.control_node == "control"
- assert engine.regulatory_node == "supervisor"
-
-
-@pytest.mark.asyncio
-async def test_workflow_engine_run():
- from kilostar.core.workflow_running_engine.workflow import kilostarWorkflow, WorkStep, WorkflowStatus
-
- mock_wf = MagicMock(spec=kilostarWorkflow)
-
- step1 = MagicMock(spec=WorkStep)
- step1.step = 1
- step1.status = "waiting"
- step1.node = "control_node"
- step1.name = "mock_name"
- step1.desc = "mock_desc"
- step1.action = "mock_action"
- step1.inputs = []
- step1.outputs = "res"
- step1.logic_gate = None
- mock_wf.work_link = [step1]
-
- mock_status = MagicMock(spec=WorkflowStatus)
- mock_status.step = 1
- mock_status.status = "running"
- mock_wf.status = mock_status
- mock_wf.context_memory = {}
- mock_wf.title = "mock_title"
- mock_wf.trace_id = "mock_trace_id"
- mock_wf.command = "mock_command"
- mock_wf.event_info = MagicMock()
- mock_wf.event_info.platform = "test"
- mock_wf.event_info.user_name = "test_user"
-
- mock_control = MagicMock()
- mock_control.working.remote = AsyncMock(return_value="process_result")
-
- mock_conscious = MagicMock()
- mock_conscious.working.remote = AsyncMock(return_value="report")
-
- mock_supervisor = MagicMock()
- mock_supervisor.working.remote = AsyncMock(return_value="response")
-
- engine = WorkflowEngine(mock_wf, mock_conscious, mock_control, mock_supervisor)
-
- with patch(
- "kilostar.core.workflow_running_engine.workflow_runner.ray"
- ) as mock_ray_patch:
- mock_gsm = MagicMock()
- mock_ray_patch.get_actor.return_value = mock_gsm
- await engine.run()
-
- assert step1.status == "completed"
- assert mock_wf.context_memory["res"] == "process_result"
-
-
-def test_workflow_running_engine_init():
- engine = WorkflowRunningEngine("conscious", "control", "supervisor")
- assert engine.consciousness_node == "conscious"
- assert engine.control_node == "control"
- assert engine.regulatory_node == "supervisor"
-
-
-@pytest.mark.asyncio
-async def test_workflow_running_engine_submit():
- engine = WorkflowRunningEngine("conscious", "control", "supervisor")
- engine.workflow_queue = asyncio.Queue()
-
- mock_wf = MagicMock()
- await engine.workflow_queue.put(mock_wf)
-
- item = await engine.workflow_queue.get()
- assert item == mock_wf
-
-
-@pytest.mark.asyncio
-async def test_workflow_running_engine_runner():
- from kilostar.api.platform.event import kilostarEvent
- from kilostar.core.individual.consciousness_node.template import ForWorkflowEngine
-
- mock_consciousness = MagicMock()
- mock_wf = MagicMock()
- mock_wf.trace_id = "test_trace"
- mock_wf.title = "test_title"
-
- mock_result = MagicMock(spec=ForWorkflowEngine)
- mock_result.workflow = mock_wf
- mock_consciousness.working.remote = AsyncMock(return_value=mock_result)
-
- engine = WorkflowRunningEngine(mock_consciousness, "control", "supervisor")
- engine.workflow_queue = asyncio.Queue()
-
- mock_event = kilostarEvent(
- platform="test_platform",
- user_id="test_user",
- user_name="test_user",
- message="test_message",
- context={},
- )
- await engine.workflow_queue.put(mock_event)
-
- # Mock the global_state_machine get_skill_list.remote method properly
- mock_gsm = MagicMock()
- mock_gsm.list_individuals.remote = AsyncMock(
- return_value={
- "test_skill": {
- "agent_type": "skill_individual",
- "agent_name": "TestSkill",
- "description": "desc",
- }
- }
- )
- engine.global_state_machine = mock_gsm
-
- with (
- patch(
- "kilostar.core.workflow_running_engine.workflow_runner.WorkflowEngine"
- ) as mock_wf_engine_cls,
- patch("builtins.open", new_callable=MagicMock) as mock_open,
- patch(
- "kilostar.core.workflow_running_engine.workflow_runner.ray_actor_hook"
- ) as mock_hook,
- ):
- # Instead of patching hook, we inject it directly
- # engine.global_state_machine = AsyncMock()
-
- mock_open.return_value.__enter__.return_value.read.return_value = "{}"
-
- mock_gwm = MagicMock()
- mock_gwm.update_workflow.remote = AsyncMock()
- mock_hook.return_value.global_workflow_manager = mock_gwm
-
- mock_engine_instance = MagicMock()
- mock_engine_instance.run = AsyncMock()
- mock_wf_engine_cls.return_value = mock_engine_instance
-
- task = asyncio.create_task(engine.runner(1))
- await asyncio.sleep(0.05)
- task.cancel()
-
- mock_wf_engine_cls.assert_called_with(
- mock_wf, mock_consciousness, "control", "supervisor"
- )
-# noqa: E402
diff --git a/tests/utils/access_test.py b/tests/utils/access_test.py
index 564aa7b..3c72117 100644
--- a/tests/utils/access_test.py
+++ b/tests/utils/access_test.py
@@ -96,4 +96,6 @@ def test_decode_token_validation_error():
assert excinfo.value.status_code == 401
assert excinfo.value.detail == "无效的认证凭证"
+
+
# noqa: E402