Refactor Workflow and Chat Architecture (#68)

* refactor: overhaul workflow and chat architecture

- Separate Chat and Workflow API endpoints and database models
- Use JSONB to store workflow execution context in Postgres
- Convert workflow engine to use pydantic-ai execution graphs inside a Ray task
- Update frontend React components to support standalone workflow creation
- Remove obsolete and broken workflow runner tests

Co-authored-by: zhaoxi826 <198742034+zhaoxi826@users.noreply.github.com>

* refactor: overhaul workflow and chat architecture

- Separate Chat and Workflow API endpoints and database models
- Use JSONB to store workflow execution context in Postgres
- Convert workflow engine to use pydantic-ai execution graphs inside a Ray task
- Update frontend React components to support standalone workflow creation
- Remove obsolete and broken workflow runner tests

Co-authored-by: zhaoxi826 <198742034+zhaoxi826@users.noreply.github.com>

* refactor: overhaul workflow and chat architecture

- Separate Chat and Workflow API endpoints and database models
- Use JSONB to store workflow execution context in Postgres
- Convert workflow engine to use pydantic-ai execution graphs inside a Ray task
- Update frontend React components to support standalone workflow creation
- Move workflow_engine inside workflow package to keep core root clean
- Remove obsolete and broken workflow runner tests

Co-authored-by: zhaoxi826 <198742034+zhaoxi826@users.noreply.github.com>

---------

Co-authored-by: google-labs-jules[bot] <161369871+google-labs-jules[bot]@users.noreply.github.com>
Co-authored-by: zhaoxi826 <198742034+zhaoxi826@users.noreply.github.com>
This commit is contained in:
2026-05-12 15:47:17 +08:00
committed by GitHub
parent ee9bbbf676
commit ff1ede47a0
33 changed files with 995 additions and 412 deletions
@@ -17,7 +17,9 @@ from kilostar.core.global_state_machine.provider_manager import ProviderManager
from kilostar.core.global_state_machine.tool_manager import GlobalToolManager
from kilostar.core.postgres_database import PostgresDatabase
from kilostar.core.global_state_machine.skill_manager import GlobalSkillManager
from kilostar.core.global_state_machine.individual_manager import GlobalIndividualManager
from kilostar.core.global_state_machine.individual_manager import (
GlobalIndividualManager,
)
@ray.remote
@@ -2,7 +2,7 @@ import ray
import asyncio
from typing import Dict
from kilostar.api.platform.event import kilostarEvent
from kilostar.core.workflow_running_engine.workflow import kilostarWorkflow
from kilostar.core.work.workflow.workflow import KiloStarWorkflow
from kilostar.utils.ray_hook import ray_actor_hook
from kilostar.utils.logger import get_logger
@@ -145,12 +145,12 @@ class GlobalWorkflowManager:
self.event_dict[trace_id].attachment = attachment
await self._upsert_event_to_db(self.event_dict[trace_id])
async def update_workflow(self, trace_id: str, workflow: kilostarWorkflow) -> None:
async def update_workflow(self, trace_id: str, workflow: KiloStarWorkflow) -> None:
if trace_id in self.event_dict:
self.event_dict[trace_id].workflow = workflow
await self._upsert_event_to_db(self.event_dict[trace_id])
async def get_workflow(self, trace_id: str) -> kilostarWorkflow | None:
async def get_workflow(self, trace_id: str) -> KiloStarWorkflow | None:
event = await self.get_event(trace_id)
return event.workflow if event else None
@@ -28,13 +28,11 @@ from pydantic_ai import Agent, RunContext
from kilostar.core.global_state_machine.global_state_machine import GlobalStateMachine
from kilostar.core.global_state_machine.model_provider.base_provider import Provider
from kilostar.adapter.model_adapter.agent_factory import AgentFactory
from kilostar.utils.ray_hook import ray_actor_hook
@ray.remote
class ConsciousnessNode:
"""ConsciousnessNode 核心组件类。
这是一个系统执行节点类,作为多智能体架构中的独立处理单元。它能够接收工作流上下文,根据内置的大模型策略进行意图理解和自主决策,从而驱动特定阶段的任务闭环。"""
def __init__(self) -> None:
from kilostar.utils.logger import get_logger
@@ -48,19 +46,6 @@ class ConsciousnessNode:
model_id: str,
tools_list: list[str] = None,
) -> None:
"""
create_agent方法,将agent对象装配到ConsciousnessNode的属性内
该方法通过provider_title从global_state_machine中获取provider对象,然后从provider对象中取出供应商形象,装配为pydantic_ai的
Agent实例,
并挂载到self.agent属性
Args:
global_state_machine: 全局状态机
provider_title: 供应商名
model_id: 模型id
Returns:
无返回
"""
system_prompt: str = (
"你叫kilostar,是一个多智能体AI助手系统中的【意识节点 (Consciousness Node)】。\n"
"你是系统的'高级规划师''架构师',负责处理监控节点分配过来的复杂任务。\n"
@@ -91,10 +76,6 @@ class ConsciousnessNode:
@self.agent.system_prompt
async def dynamic_prompt(ctx: RunContext[ConsciousnessNodeDeps]):
"""执行与 dynamic prompt 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: ctx (RunContext[ConsciousnessNodeDeps]): 参与 dynamic prompt 逻辑运算或数据构建的上下文依赖对象。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
prompt = system_prompt + "\n\n"
prompt += (
f"=== 当前任务上下文 ===\n"
@@ -109,14 +90,66 @@ class ConsciousnessNode:
return prompt
async def start_workflow_design(self, trace_id: str, command: str):
"""
开始进行工作流设计的交互过程(与用户通过 SSE 进行确认或直接生成)
目前简化为:直接根据 command 拆解并构建工作流,然后提交执行。
"""
self.logger.info(
f"ConsciousnessNode: 开始为 trace_id {trace_id} 设计工作流。原始命令:{command}"
)
# 获取可用技能 (示例)
postgres_database = ray_actor_hook("postgres_database").postgres_database
skills_entities = await postgres_database.get_all_worker_individual.remote()
available_skills = []
if skills_entities:
for skill in skills_entities:
available_skills.append(
{
"agent_id": skill.agent_id,
"name": skill.agent_name,
"description": skill.description,
}
)
payload = ForWorkflowEngineInput(
original_command=command, available_skills=available_skills
)
# 通知 SSE 正在生成图结构
global_workflow_manager = ray_actor_hook(
"global_workflow_manager"
).global_workflow_manager
await global_workflow_manager.put_received.remote(
trace_id, "正在为您构建并规划工作流任务节点,请稍候..."
)
# 实际构建过程
result = await self.working(payload)
if result and isinstance(result, ForWorkflowEngine):
workflow = result.workflow
workflow.trace_id = trace_id
await global_workflow_manager.put_received.remote(
trace_id, "工作流构建完成,即将开始执行!"
)
# 将生成的完整工作流提交执行
workflow_engine = ray_actor_hook(
"workflow_running_engine"
).workflow_running_engine
await workflow_engine.execute_workflow.remote(workflow)
else:
await global_workflow_manager.put_received.remote(
trace_id, "很抱歉,工作流生成失败。"
)
await postgres_database.update_workflow_status.remote(trace_id, "failed")
async def working(
self,
payload: Union[ForWorkflowEngineInput, ForWorkflowInput, ForregulatoryInput],
) -> Union[ForWorkflowEngine, ForWorkflow, ForregulatoryNode, None]:
"""执行与 working 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: payload (Union[ForWorkflowEngineInput, ForWorkflowInput, ForregulatoryInput]): 从客户端传递过来或由上游组件生成的核心业务数据体,通常需要进一步的清洗和结构化解析。
Returns: (Union[ForWorkflowEngine, ForWorkflow, ForregulatoryNode, None]): 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
try:
result = await self._run(payload)
if isinstance(result, (ForWorkflowEngine, ForWorkflow, ForregulatoryNode)):
@@ -132,51 +165,20 @@ class ConsciousnessNode:
@overload
async def _run(self, payload: ForWorkflowEngineInput) -> ForWorkflowEngine:
"""
_run方法
该分支应当在regulatory_node简单处理用户命令后,工作流创建前调用!
Args:
payload: 应当包含原始命令和可用技能等信息
Returns:
ForWorkflowEngine对象,将被放到全局状态机后丢入WorkflowEngine的异步队列
"""
pass
@overload
async def _run(self, payload: ForWorkflow) -> ForWorkflow:
"""
_run方法
该分支应当在workflow运行时,由WorkflowEngine进行调用!
Args:
payload: 应当包含workflow中的WorkStep对象
Returns:
ForWorkflow对象,作为ConsciousnessNode执行Workflow中的WorkStep的结果
"""
pass
@overload
async def _run(self, payload: ForregulatoryInput) -> ForregulatoryNode:
"""
_run方法
该分支应当在workflow运行完全结束后,由WorkflowEngine进行调用!
Args:
payload: 应当包含整个Workflow的情况
Returns:
Forregulatory对象,作为ConsciousnessNode对于全工作流的技术性总结,返回给regulatoryNode
"""
pass
async def _run(
self,
payload: Union[ForregulatoryInput, ForWorkflowInput, ForWorkflowEngineInput],
) -> Union[ForregulatoryNode, ForWorkflow, ForWorkflowEngine]:
"""执行与 run 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: payload (Union[ForregulatoryInput, ForWorkflowInput, ForWorkflowEngineInput]): 从客户端传递过来或由上游组件生成的核心业务数据体,通常需要进一步的清洗和结构化解析。
Returns: (Union[ForregulatoryNode, ForWorkflow, ForWorkflowEngine]): 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
try:
self.agent.retries = 3
if isinstance(payload, ForWorkflowEngineInput):
@@ -13,7 +13,7 @@
# limitations under the License.
from kilostar.core.workflow_running_engine.workflow import kilostarWorkflow, WorkStep
from kilostar.core.work.workflow.workflow import KiloStarWorkflow, WorkflowStep
from kilostar.utils.agent_model import ResponseModel, DepsModel, InputModel
from pydantic import Field
@@ -28,7 +28,7 @@ class ConsciousnessNodeResponse(ResponseModel):
class ForWorkflowEngine(ConsciousnessNodeResponse):
"""生成workflow并放入WorkflowEngine"""
workflow: kilostarWorkflow = Field(
workflow: KiloStarWorkflow = Field(
..., description="生成好的符合规范的完整工作流对象。"
)
reasoning: str = Field(..., description="生成此工作流的原因和思路简述。")
@@ -76,7 +76,7 @@ class ForWorkflowInput(ConsciousnessNodeInput):
"""ForWorkflowInput 核心组件类。
这是一个领域数据模型或功能封装类,承载了 ForWorkflowInput 相关的内聚属性定义与状态维护。它的存在隔离了局部的业务复杂性,并对外提供了类型安全的访问接口。"""
workflow_step: WorkStep
workflow_step: WorkflowStep
original_command: str
@@ -84,5 +84,5 @@ class ForregulatoryInput(ConsciousnessNodeInput):
"""ForregulatoryInput 核心组件类。
这是一个领域数据模型或功能封装类,承载了 ForregulatoryInput 相关的内聚属性定义与状态维护。它的存在隔离了局部的业务复杂性,并对外提供了类型安全的访问接口。"""
workflow: kilostarWorkflow
workflow: KiloStarWorkflow
original_command: str
@@ -14,7 +14,7 @@
from pydantic import Field
from kilostar.core.workflow_running_engine.workflow import WorkStep
from kilostar.core.work.workflow.workflow import WorkflowStep
from kilostar.utils.agent_model import ResponseModel, InputModel, DepsModel
@@ -35,7 +35,8 @@ class ControlNodeDeps(DepsModel):
"""ControlNodeDeps 核心组件类。
这是一个系统执行节点类,作为多智能体架构中的独立处理单元。它能够接收工作流上下文,根据内置的大模型策略进行意图理解和自主决策,从而驱动特定阶段的任务闭环。"""
workflow_step: WorkStep
workflow_step: WorkflowStep
workflow_step: WorkflowStep
# In the future, this can be dynamically populated with tools specific to the current task execution
@@ -52,4 +53,4 @@ class ForWorkflowInput(ControlNodeInput):
"""ForWorkflowInput 核心组件类。
这是一个领域数据模型或功能封装类,承载了 ForWorkflowInput 相关的内聚属性定义与状态维护。它的存在隔离了局部的业务复杂性,并对外提供了类型安全的访问接口。"""
workflow_step: WorkStep
workflow_step: WorkflowStep
@@ -157,7 +157,9 @@ class RegulatoryNode:
return "抱歉,监控节点处理请求时发生严重错误,请联系管理员。"
@overload
async def _run(self, payload: kilostarEvent) -> Union[ForConsciousnessNode, ForUser]:
async def _run(
self, payload: kilostarEvent
) -> Union[ForConsciousnessNode, ForUser]:
"""
_run方法
Args:
@@ -15,5 +15,21 @@
from kilostar.core.postgres_database.model.user import User
from kilostar.core.postgres_database.model.provider import Provider
from kilostar.core.postgres_database.model.individual import WorkerIndividual
from kilostar.core.postgres_database.model.workflow import (
Workflow,
WorkflowContextModel,
)
from kilostar.core.postgres_database.model.chat_history import (
ChatHistoryRegister,
ChatHistoryMessage,
)
__all__ = ["User", "Provider", "WorkerIndividual"]
__all__ = [
"User",
"Provider",
"WorkerIndividual",
"Workflow",
"WorkflowContextModel",
"ChatHistoryRegister",
"ChatHistoryMessage",
]
@@ -15,5 +15,6 @@
from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.orm import DeclarativeBase
class BaseDataModel(DeclarativeBase, AsyncAttrs):
pass
pass
@@ -11,19 +11,55 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Literal
from sqlalchemy import String, DateTime, func
from sqlalchemy.orm import Mapped, mapped_column
from .base import BaseDataModel
from sqlalchemy.orm import Mapped
class ChatHistoryMessage(BaseDataModel):
__tablename__ = "chat_history_massage"
message_id: Mapped[str]
message: Mapped[str]
message_owner: Literal["user","regulatory_node"]
class ChatHistoryRegister(BaseDataModel):
__tablename__ = "chat_history_register"
chat_id: Mapped[str]
user_id: Mapped[str]
"""
一个特定的聊天会话记录注册表。
类似于多会话的一个 Thread/Session。
"""
__tablename__ = "chat_history_register"
chat_id: Mapped[str] = mapped_column(
String(64), primary_key=True, description="聊天会话ID"
)
user_id: Mapped[str] = mapped_column(
String(64), index=True, description="归属的用户ID"
)
title: Mapped[str] = mapped_column(
String(255), default="新对话", description="对话标题"
)
created_at: Mapped[str] = mapped_column(
DateTime(timezone=True), server_default=func.now()
)
updated_at: Mapped[str] = mapped_column(
DateTime(timezone=True), server_default=func.now(), onupdate=func.now()
)
class ChatHistoryMessage(BaseDataModel):
"""
特定会话中的每一条具体消息记录。
"""
__tablename__ = "chat_history_message"
message_id: Mapped[str] = mapped_column(
String(64), primary_key=True, description="消息ID"
)
chat_id: Mapped[str] = mapped_column(
String(64), index=True, description="所属会话ID"
)
message: Mapped[str] = mapped_column(String, description="消息体内容")
message_owner: Mapped[str] = mapped_column(
String(50),
description="消息发送方,例如 'user', 'regulatory_node', 'consciousness_node'",
)
created_at: Mapped[str] = mapped_column(
DateTime(timezone=True), server_default=func.now()
)
@@ -44,10 +44,7 @@ class BaseIndividualModel(BaseDataModel):
agent_type: Mapped[str] = mapped_column(String(32))
__mapper_args__ = {
"polymorphic_on": "agent_type",
"polymorphic_identity": "base"
}
__mapper_args__ = {"polymorphic_on": "agent_type", "polymorphic_identity": "base"}
# ==========================================
@@ -57,8 +54,7 @@ class SpecialistIndividualModel(BaseIndividualModel):
__tablename__ = "specialist_individual"
agent_id: Mapped[str] = mapped_column(
ForeignKey("base_individual.agent_id", ondelete="CASCADE"),
primary_key=True
ForeignKey("base_individual.agent_id", ondelete="CASCADE"), primary_key=True
)
bound_skill: Mapped[Optional[Dict[str, Any]]] = mapped_column(JSONB)
workspace: Mapped[Optional[List[str]]] = mapped_column(JSONB)
@@ -70,12 +66,12 @@ class SpecialistIndividualModel(BaseIndividualModel):
sub_ordinary_agents: Mapped[List["OrdinaryIndividualModel"]] = relationship(
back_populates="manager",
cascade="all, delete-orphan",
foreign_keys="[OrdinaryIndividualModel.manager_id]"
foreign_keys="[OrdinaryIndividualModel.manager_id]",
)
sub_special_agents: Mapped[List["SpecialIndividualModel"]] = relationship(
back_populates="manager",
cascade="all, delete-orphan",
foreign_keys="[SpecialIndividualModel.manager_id]"
foreign_keys="[SpecialIndividualModel.manager_id]",
)
__mapper_args__ = {
@@ -90,8 +86,7 @@ class OrdinaryIndividualModel(BaseIndividualModel):
__tablename__ = "ordinary_individual"
agent_id: Mapped[str] = mapped_column(
ForeignKey("base_individual.agent_id", ondelete="CASCADE"),
primary_key=True
ForeignKey("base_individual.agent_id", ondelete="CASCADE"), primary_key=True
)
finetuned_from: Mapped[Optional[str]] = mapped_column(String(100))
tools: Mapped[Optional[List[str]]] = mapped_column(
@@ -106,7 +101,7 @@ class OrdinaryIndividualModel(BaseIndividualModel):
# 逻辑关联:指向上级专家
manager: Mapped[Optional["SpecialistIndividualModel"]] = relationship(
back_populates="sub_ordinary_agents",
foreign_keys=[manager_id] # 显式指定使用 manager_id 解析关系
foreign_keys=[manager_id], # 显式指定使用 manager_id 解析关系
)
__mapper_args__ = {
@@ -121,12 +116,10 @@ class SpecialIndividualModel(BaseIndividualModel):
__tablename__ = "special_individual"
agent_id: Mapped[str] = mapped_column(
ForeignKey("base_individual.agent_id", ondelete="CASCADE"),
primary_key=True
ForeignKey("base_individual.agent_id", ondelete="CASCADE"), primary_key=True
)
modality_type: Mapped[ModalityType] = mapped_column(
default=ModalityType.MULTIMODAL,
server_default=text("'multimodal'")
default=ModalityType.MULTIMODAL, server_default=text("'multimodal'")
)
multimodal_config: Mapped[Optional[Dict[str, Any]]] = mapped_column(JSONB)
@@ -137,10 +130,9 @@ class SpecialIndividualModel(BaseIndividualModel):
# 【修复2】:修正 back_populates 指向正确的变量名
manager: Mapped[Optional["SpecialistIndividualModel"]] = relationship(
back_populates="sub_special_agents",
foreign_keys=[manager_id]
back_populates="sub_special_agents", foreign_keys=[manager_id]
)
__mapper_args__ = {
"polymorphic_identity": "special",
}
}
@@ -24,6 +24,7 @@ class ProviderModel(BaseDataModel):
Provider 物理模型。
作为模型/服务提供商适配器,标准化不同供应商(OpenAI, Anthropic 等)的配置。
"""
__tablename__ = "provider"
provider_id: Mapped[str] = mapped_column(String(64), primary_key=True)
provider_title: Mapped[str] = mapped_column(String(100), index=True, nullable=False)
@@ -31,14 +32,12 @@ class ProviderModel(BaseDataModel):
provider_url: Mapped[Optional[str]] = mapped_column(Text)
provider_apikey: Mapped[Optional[str]] = mapped_column(Text)
provider_models: Mapped[List[str]] = mapped_column(
JSONB,
default=list,
server_default=text("'[]'::jsonb")
JSONB, default=list, server_default=text("'[]'::jsonb")
)
provider_owner: Mapped[str] = mapped_column(String(64), index=True)
is_active: Mapped[bool] = mapped_column(
Boolean,
default=True,
server_default=text("true"),
comment="该服务商节点是否在线/启用"
comment="该服务商节点是否在线/启用",
)
@@ -13,10 +13,12 @@
# limitations under the License.
from typing import List, Optional
from sqlalchemy import String, Text
from sqlalchemy.dialects.postgresql import JSONB # 针对 Postgres 优化,支持索引和高性能解析
from sqlalchemy import String
from sqlalchemy.dialects.postgresql import (
JSONB,
) # 针对 Postgres 优化,支持索引和高性能解析
from sqlalchemy.orm import Mapped, mapped_column
from .base import BaseDataModel
from .base import BaseDataModel
class SystemNodeConfigModel(BaseDataModel):
@@ -24,12 +26,11 @@ class SystemNodeConfigModel(BaseDataModel):
SystemNodeConfig 物理模型。
作为 kilostar 架构中的独立处理单元,负责存储 LLM 节点的执行策略与工具配置。
"""
__tablename__ = "system_node_config"
node_name: Mapped[str] = mapped_column(String(100), primary_key=True)
provider_title: Mapped[str] = mapped_column(String(50), nullable=False)
model_id: Mapped[str] = mapped_column(String(100), nullable=False)
tools: Mapped[Optional[List[str]]] = mapped_column(
JSONB,
default=list,
comment="节点可调用的工具标识列表"
JSONB, default=list, comment="节点可调用的工具标识列表"
)
@@ -25,6 +25,7 @@ class UserAuthority(IntEnum):
"""
权限枚举类
"""
SUPER_ADMINISTRATOR = 100
ADMINISTRATOR = 50
USER = 20
@@ -36,12 +37,11 @@ class User(BaseDataModel):
"""
数据库user表模型
"""
__tablename__ = "user"
user_id: Mapped[str] = mapped_column(String(64), primary_key=True)
user_name: Mapped[str] = mapped_column(String(100), index=True, nullable=False)
hashed_password: Mapped[str] = mapped_column(String(255), nullable=False)
user_authority: Mapped[UserAuthority] = mapped_column(
Integer,
default=UserAuthority.USER,
server_default=text("20")
Integer, default=UserAuthority.USER, server_default=text("20")
)
@@ -12,12 +12,70 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from sqlmodel import SQLModel, Field
from sqlalchemy import String, DateTime, func
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy.dialects.postgresql import JSONB
from .base import BaseDataModel
class EventRecord(SQLModel, table=True):
trace_id: str = Field(
primary_key=True, description="The unique trace ID of the kilostarEvent"
class Workflow(BaseDataModel):
__tablename__ = "workflow"
trace_id: Mapped[str] = mapped_column(
String(64), primary_key=True, description="工作流唯一ID (Trace ID)"
)
user_id: Mapped[str] = mapped_column(
String(64), index=True, description="创建该工作流的用户ID"
)
title: Mapped[str] = mapped_column(String(255), description="工作流标题/简短描述")
command: Mapped[str] = mapped_column(
String, description="创建工作流的原始用户命令文本"
)
status: Mapped[str] = mapped_column(
String(50),
default="creating",
description="工作流的总体状态 (例如: creating, running, pending, completed, failed等)",
)
version: Mapped[str] = mapped_column(
String(50), default="v1.0", description="系统协议版本号"
)
created_at: Mapped[str] = mapped_column(
DateTime(timezone=True), server_default=func.now()
)
updated_at: Mapped[str] = mapped_column(
DateTime(timezone=True), server_default=func.now(), onupdate=func.now()
)
class WorkflowContextModel(BaseDataModel):
__tablename__ = "workflow_context"
trace_id: Mapped[str] = mapped_column(
String(64), primary_key=True, description="对应的工作流 Trace ID"
)
workflow_status: Mapped[dict] = mapped_column(
JSONB, default=dict, description="工作流状态变更历史"
)
blackboard: Mapped[dict] = mapped_column(
JSONB, default=dict, description="大模型输出的存储区 (共享黑板)"
)
work_step_status: Mapped[dict] = mapped_column(
JSONB, nullable=True, description="工作流运行步骤状态"
)
workflow_pointer: Mapped[int] = mapped_column(
nullable=True, description="工作流指针,指向具体运行步骤位置"
)
workflow_log: Mapped[list] = mapped_column(
JSONB, default=list, description="工作流运行日志"
)
work_link: Mapped[list] = mapped_column(
JSONB,
default=list,
description="工作链(即 WorkflowStep 的定义列表,包含图结构和原子动作)",
)
created_at: Mapped[str] = mapped_column(
DateTime(timezone=True), server_default=func.now()
)
updated_at: Mapped[str] = mapped_column(
DateTime(timezone=True), server_default=func.now(), onupdate=func.now()
)
event_data_json: str = Field(description="The JSON serialized kilostarEvent data")
@@ -0,0 +1,82 @@
# Copyright 2026 zhaoxi826
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from sqlalchemy import select, func
from typing import List
from kilostar.core.postgres_database.model.chat_history import (
ChatHistoryRegister,
ChatHistoryMessage,
)
from sqlalchemy.ext.asyncio import async_sessionmaker, AsyncSession
from ulid import ULID
class ChatHistoryDatabase:
def __init__(self, async_session_maker: async_sessionmaker[AsyncSession]):
self.async_session_maker = async_session_maker
async def create_chat_session(
self, user_id: str, title: str = "新对话"
) -> ChatHistoryRegister:
async with self.async_session_maker() as session:
chat_id = str(ULID())
chat = ChatHistoryRegister(chat_id=chat_id, user_id=user_id, title=title)
session.add(chat)
await session.commit()
await session.refresh(chat)
return chat
async def list_chat_sessions(self, user_id: str) -> List[ChatHistoryRegister]:
async with self.async_session_maker() as session:
statement = (
select(ChatHistoryRegister)
.where(ChatHistoryRegister.user_id == user_id)
.order_by(ChatHistoryRegister.updated_at.desc())
)
results = await session.execute(statement)
return results.scalars().all()
async def add_chat_message(
self, chat_id: str, message: str, message_owner: str
) -> ChatHistoryMessage:
async with self.async_session_maker() as session:
msg_id = str(ULID())
msg = ChatHistoryMessage(
message_id=msg_id,
chat_id=chat_id,
message=message,
message_owner=message_owner,
)
session.add(msg)
# Update the chat session's updated_at
statement = select(ChatHistoryRegister).where(
ChatHistoryRegister.chat_id == chat_id
)
results = await session.execute(statement)
chat = results.scalar_one_or_none()
if chat:
chat.updated_at = func.now()
await session.commit()
await session.refresh(msg)
return msg
async def list_chat_messages(self, chat_id: str) -> List[ChatHistoryMessage]:
async with self.async_session_maker() as session:
statement = (
select(ChatHistoryMessage)
.where(ChatHistoryMessage.chat_id == chat_id)
.order_by(ChatHistoryMessage.created_at.asc())
)
results = await session.execute(statement)
return results.scalars().all()
@@ -0,0 +1,96 @@
# Copyright 2026 zhaoxi826
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from sqlalchemy import select
from typing import List, Optional
from kilostar.core.postgres_database.model.workflow import (
Workflow,
WorkflowContextModel,
)
from sqlalchemy.ext.asyncio import async_sessionmaker, AsyncSession
class WorkflowDatabase:
def __init__(self, async_session_maker: async_sessionmaker[AsyncSession]):
self.async_session_maker = async_session_maker
async def create_workflow(
self, trace_id: str, user_id: str, title: str, command: str
) -> Workflow:
async with self.async_session_maker() as session:
wf = Workflow(
trace_id=trace_id,
user_id=user_id,
title=title,
command=command,
status="creating",
)
session.add(wf)
await session.commit()
await session.refresh(wf)
return wf
async def get_workflow(self, trace_id: str) -> Optional[Workflow]:
async with self.async_session_maker() as session:
statement = select(Workflow).where(Workflow.trace_id == trace_id)
results = await session.execute(statement)
return results.scalar_one_or_none()
async def update_workflow_status(
self, trace_id: str, status: str
) -> Optional[Workflow]:
async with self.async_session_maker() as session:
statement = select(Workflow).where(Workflow.trace_id == trace_id)
results = await session.execute(statement)
record = results.scalar_one_or_none()
if record:
record.status = status
await session.commit()
await session.refresh(record)
return record
async def list_workflows(self, user_id: str) -> List[Workflow]:
async with self.async_session_maker() as session:
statement = select(Workflow).where(Workflow.user_id == user_id)
results = await session.execute(statement)
return results.scalars().all()
async def upsert_workflow_context(
self, trace_id: str, **kwargs
) -> WorkflowContextModel:
async with self.async_session_maker() as session:
statement = select(WorkflowContextModel).where(
WorkflowContextModel.trace_id == trace_id
)
results = await session.execute(statement)
record = results.scalar_one_or_none()
if record:
for key, value in kwargs.items():
setattr(record, key, value)
else:
record = WorkflowContextModel(trace_id=trace_id, **kwargs)
session.add(record)
await session.commit()
await session.refresh(record)
return record
async def get_workflow_context(
self, trace_id: str
) -> Optional[WorkflowContextModel]:
async with self.async_session_maker() as session:
statement = select(WorkflowContextModel).where(
WorkflowContextModel.trace_id == trace_id
)
results = await session.execute(statement)
return results.scalar_one_or_none()
@@ -25,6 +25,8 @@ from .module.event import EventDatabase
from .module.user import AuthDatabase
from .module.provider import ProviderDatabase
from .module.system_node import SystemNodeDatabase
from .module.workflow import WorkflowDatabase
from .module.chat_history import ChatHistoryDatabase
@ray.remote
@@ -51,6 +53,8 @@ class PostgresDatabase:
self._individual_database = IndividualDatabase(self.async_session_maker)
self._event_database = EventDatabase(self.async_session_maker)
self._system_node_database = SystemNodeDatabase(self.async_session_maker)
self._workflow_database = WorkflowDatabase(self.async_session_maker)
self._chat_history_database = ChatHistoryDatabase(self.async_session_maker)
self.ready_event = asyncio.Event()
@@ -254,3 +258,51 @@ class PostgresDatabase:
async def delete_event(self, trace_id: str):
await self.ready_event.wait()
return await self._event_database.delete_event(trace_id)
# Workflow Database Methods
async def create_workflow(
self, trace_id: str, user_id: str, title: str, command: str
):
await self.ready_event.wait()
return await self._workflow_database.create_workflow(
trace_id, user_id, title, command
)
async def get_workflow(self, trace_id: str):
await self.ready_event.wait()
return await self._workflow_database.get_workflow(trace_id)
async def update_workflow_status(self, trace_id: str, status: str):
await self.ready_event.wait()
return await self._workflow_database.update_workflow_status(trace_id, status)
async def list_workflows(self, user_id: str):
await self.ready_event.wait()
return await self._workflow_database.list_workflows(user_id)
async def upsert_workflow_context(self, trace_id: str, **kwargs):
await self.ready_event.wait()
return await self._workflow_database.upsert_workflow_context(trace_id, **kwargs)
async def get_workflow_context(self, trace_id: str):
await self.ready_event.wait()
return await self._workflow_database.get_workflow_context(trace_id)
# Chat History Database Methods
async def create_chat_session(self, user_id: str, title: str = "新对话"):
await self.ready_event.wait()
return await self._chat_history_database.create_chat_session(user_id, title)
async def list_chat_sessions(self, user_id: str):
await self.ready_event.wait()
return await self._chat_history_database.list_chat_sessions(user_id)
async def add_chat_message(self, chat_id: str, message: str, message_owner: str):
await self.ready_event.wait()
return await self._chat_history_database.add_chat_message(
chat_id, message, message_owner
)
async def list_chat_messages(self, chat_id: str):
await self.ready_event.wait()
return await self._chat_history_database.list_chat_messages(chat_id)
+9 -2
View File
@@ -16,13 +16,17 @@ from pydantic import BaseModel, Field
from typing import Literal, Optional
from enum import Enum
class LogicGate(BaseModel):
"""
LogicGate 类。
跳转逻辑,标记该步骤运行成功或失败的动作
"""
if_fail: str = Field(..., description="失败跳转目标,如 'jump_to_step_1'")
if_pass: Literal["continue", "exit"] = Field(default="continue", description="成功后的动作")
if_pass: Literal["continue", "exit"] = Field(
default="continue", description="成功后的动作"
)
class WorkflowMetadata(BaseModel):
@@ -30,6 +34,7 @@ class WorkflowMetadata(BaseModel):
WorkflowMetadata类
workflow的元数据类,保存与用户有关的数据
"""
user_id: Optional[str] = Field(default=None, description="创建工作流的用户的ulid")
command: Optional[str] = Field(default=None, description="创建工作流的原始命令")
@@ -44,6 +49,7 @@ class WorkStepStatus(str, Enum):
COMPLETED: 完成
FAILED = 失败
"""
PENDING = "pending"
WORKING = "working"
HANGUP = "hang_up"
@@ -61,9 +67,10 @@ class WorkflowStatus(str, Enum):
CREATING = 创建中
PENDING = 等待中
"""
RUNNING = "running"
HANGUP = "hang_up"
COMPLETED = "completed"
FAILED = "failed"
CREATING = "creating"
PENDING = "pending"
PENDING = "pending"
+31 -9
View File
@@ -18,19 +18,32 @@ from .model import LogicGate, WorkflowMetadata, WorkStepStatus, WorkflowStatus
from ulid import ULID
from datetime import datetime
class WorkflowContext(BaseModel):
"""
WorkflowContext 类
作为workflow运行时的数据部分,使得数据和计算分离
"""
trace_id: str = Field(description="工作流的trace_id")
workflow_status: Dict[str, WorkflowStatus] = Field(default_factory=lambda: {datetime.now().strftime("%Y-%m-%d %H:%M:%S"):WorkflowStatus.CREATING} ,description="工作流状态")
workflow_status: Dict[str, WorkflowStatus] = Field(
default_factory=lambda: {
datetime.now().strftime("%Y-%m-%d %H:%M:%S"): WorkflowStatus.CREATING
},
description="工作流状态",
)
blackboard: Dict[str, Any] = Field(description="大模型输出的存储区")
work_step_status: Optional[Dict[int, tuple[str, WorkStepStatus]]] = Field(default= None,description="工作流运行状态")
work_step_status: Optional[Dict[int, tuple[str, WorkStepStatus]]] = Field(
default=None, description="工作流运行状态"
)
"""work_step_status:字典,键为整个工作流的运行步骤,值为元组,包含两个字段:
1.字符串,更新时间的字符串;2.WorkflowStatus枚举类,当前步骤的运行情况"""
workflow_pointer: Optional[int] = Field(description="工作流指针,指向具体的workflow位置")
workflow_log: List[Dict[int, tuple[str, WorkflowStatus, str]]] = Field(default=[], description="工作流运行日志")
workflow_pointer: Optional[int] = Field(
description="工作流指针,指向具体的workflow位置"
)
workflow_log: List[Dict[int, tuple[str, WorkflowStatus, str]]] = Field(
default=[], description="工作流运行日志"
)
"""workflow_log:一个列表,内部元素为一个字典,键为步骤序号,值为一个元组,包含三个字段:
1.字符串,更新时间的字符串;2.WorkflowStatus枚举类,当前步骤的运行情况;3.字符串,当前步骤运行完后的输出总结或失败原因"""
@@ -40,12 +53,18 @@ class WorkflowStep(BaseModel):
WorkflowStep 类
workflow每一个步骤的模型,为workflow的最小执行单位
"""
step: int = Field(..., gt=0, description="步骤序号,严格自增")
name: str = Field(..., description="步骤名称")
action: str = Field(..., description="执行的原子动作")
inputs: Optional[Union[str, List[str]]] = Field(default=None, description="前置依赖输出")
inputs: Optional[Union[str, List[str]]] = Field(
default=None, description="前置依赖输出"
)
outputs: Optional[str] = Field(default=None, description="当前步骤产出物变量名")
agent_id: Optional[str] = Field(default=None,description="分配给 skill_individual 的 Skill Individual 真实 agent_id,不可用名称代替",)
agent_id: Optional[str] = Field(
default=None,
description="分配给 skill_individual 的 Skill Individual 真实 agent_id,不可用名称代替",
)
logic_gate: Optional[LogicGate] = Field(default=None, description="逻辑跳转控制")
@@ -54,9 +73,12 @@ class KiloStarWorkflow(BaseModel):
KiloStarWorkflow 类
kilostar的workflow核心类,由consciousness_node创建
"""
trace_id: str = Field(default_factory=lambda: str(ULID()), description="系统自动生成的追溯ID")
trace_id: str = Field(
default_factory=lambda: str(ULID()), description="系统自动生成的追溯ID"
)
version: str = Field(default="v1.0", description="系统协议版本号")
#-------------------
# -------------------
title: str = Field(..., description="工作流标题")
work_link: List[WorkflowStep] = Field(..., description="工作链")
workflow_metadata: WorkflowMetadata
@@ -86,4 +108,4 @@ class KiloStarWorkflow(BaseModel):
if "越界" in str(e):
raise e
raise ValueError(f"LogicGate 格式错误: {s.logic_gate.if_fail}")
return self
return self
@@ -0,0 +1,177 @@
# Copyright 2026 zhaoxi826
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import ray
from kilostar.core.work.workflow.workflow import KiloStarWorkflow
from typing import Dict, Any, List
@ray.remote
def run_workflow_task(workflow_data: dict, trace_id: str):
from kilostar.utils.ray_hook import ray_actor_hook
from kilostar.core.work.workflow.model import WorkflowStatus
import datetime
from pydantic import BaseModel
# State passed through graph nodes
class WorkflowGraphState(BaseModel):
trace_id: str
blackboard: Dict[str, Any]
work_link: List[Dict[str, Any]]
current_step_index: int = 0
status: str = "running"
logs: List[Dict[str, Any]] = []
async def save_context(state: WorkflowGraphState):
postgres_database = ray_actor_hook("postgres_database").postgres_database
await postgres_database.upsert_workflow_context.remote(
state.trace_id,
workflow_pointer=state.current_step_index,
blackboard=state.blackboard,
work_link=state.work_link,
workflow_status={str(datetime.datetime.now()): state.status},
workflow_log=state.logs,
)
await postgres_database.update_workflow_status.remote(
state.trace_id, state.status
)
global_workflow_manager = ray_actor_hook(
"global_workflow_manager"
).global_workflow_manager
await global_workflow_manager.put_received.remote(
state.trace_id, f"执行步骤 {state.current_step_index + 1}..."
)
async def execute_step(state: WorkflowGraphState):
"""执行单一工作流节点逻辑"""
if state.current_step_index >= len(state.work_link):
state.status = WorkflowStatus.COMPLETED
return state
step = state.work_link[state.current_step_index]
step.get("node", "")
action = step.get("action", "")
# 记录开始状态
state.logs.append(
{
str(state.current_step_index): [
str(datetime.datetime.now()),
"working",
f"开始执行: {step.get('name', '未命名步骤')}",
]
}
)
await save_context(state)
try:
# TODO: 实际对接不同节点执行逻辑 (例如: control_node, agent 技能)
# 这里是简化版,向控制节点或指定 skill 发送指令
# ... 模拟执行逻辑 ...
await asyncio.sleep(2)
# 记录结果
state.blackboard[
step.get("outputs", f"step_{state.current_step_index}_result")
] = "Success execution of " + action
state.logs[-1][str(state.current_step_index)] = [
str(datetime.datetime.now()),
"completed",
f"成功: {action}",
]
# 判断逻辑跳转
logic_gate = step.get("logic_gate")
if logic_gate and logic_gate.get("if_pass") == "exit":
state.status = WorkflowStatus.COMPLETED
else:
state.current_step_index += 1
except Exception as e:
state.logs[-1][str(state.current_step_index)] = [
str(datetime.datetime.now()),
"failed",
str(e),
]
state.status = WorkflowStatus.FAILED
logic_gate = step.get("logic_gate")
if logic_gate and logic_gate.get("if_fail"):
fail_target = logic_gate.get("if_fail")
if "jump_to_step_" in fail_target:
target_step = int(fail_target.split("_")[-1]) - 1
state.current_step_index = target_step
state.status = WorkflowStatus.RUNNING
await save_context(state)
return state
async def _run():
postgres_database = ray_actor_hook("postgres_database").postgres_database
await postgres_database.update_workflow_status.remote(
trace_id, WorkflowStatus.RUNNING
)
state = WorkflowGraphState(
trace_id=trace_id,
blackboard={},
work_link=workflow_data.get("work_link", []),
)
await save_context(state)
# 简单的图执行驱动 (模拟 pydantic-ai.graph.run 行为,直至 Graph 库正式稳定)
while state.status == WorkflowStatus.RUNNING and state.current_step_index < len(
state.work_link
):
state = await execute_step(state)
await postgres_database.update_workflow_status.remote(trace_id, state.status)
global_workflow_manager = ray_actor_hook(
"global_workflow_manager"
).global_workflow_manager
msg = (
"工作流执行完成!"
if state.status == WorkflowStatus.COMPLETED
else "工作流执行失败。"
)
await global_workflow_manager.put_received.remote(trace_id, msg)
asyncio.run(_run())
@ray.remote
class WorkflowRunningEngine:
def __init__(
self, consciousness_node=None, control_node=None, regulatory_node=None
):
self.consciousness_node = consciousness_node
self.control_node = control_node
self.regulatory_node = regulatory_node
self.events_queue = asyncio.Queue()
async def put_event(self, event):
await self.events_queue.put(event)
async def run(self):
"""引擎循环提取事件"""
while True:
await self.events_queue.get()
await asyncio.sleep(1)
async def execute_workflow(self, workflow: KiloStarWorkflow):
# 这个方法可以由意识节点调用来提交一个完整的运行任务
workflow_dict = workflow.model_dump()
trace_id = workflow.trace_id
run_workflow_task.remote(workflow_dict, trace_id)