feat(frontend):优化前端页面设计

This commit is contained in:
2026-05-29 16:44:17 +00:00
parent a83c5fa5bd
commit affe460180
80 changed files with 2670 additions and 2678 deletions
@@ -26,8 +26,11 @@ from kilostar.utils.error import ModelNotExistError
class AgentFactory:
"""AgentFactory 核心组件类
这是一个领域数据模型或功能封装类,承载了 AgentFactory 相关的内聚属性定义与状态维护。它的存在隔离了局部的业务复杂性,并对外提供了类型安全的访问接口。"""
"""模型工厂:把内部的 ``Provider`` 元数据翻译成 pydantic-ai 的 ``Agent``
支持 openai / claude / deepseek / gemini 四类后端,差异通过
``_models_mapping`` 中的 ``model_class`` + ``provider_class`` 键值对屏蔽。
"""
def __init__(self):
self._models_mapping = {
@@ -22,18 +22,13 @@ T = TypeVar("T", bound=BaseModel)
class AgentRunResultProxy:
"""AgentRunResultProxy 核心组件类。
这是一个领域数据模型或功能封装类,承载了 AgentRunResultProxy 相关的内聚属性定义与状态维护。它的存在隔离了局部的业务复杂性,并对外提供了类型安全的访问接口。"""
"""``Agent.run`` 结果的轻量代理:把已解析的结构化对象暴露为 ``.data`` / ``.output``。"""
def __init__(self, original, parsed):
self._original = original
self._parsed = parsed
def __getattr__(self, name):
"""检索并获取特定的 getattr 数据集合或实例对象。
根据提供的查询条件或上下文凭证,从数据库、缓存或第三方服务中读取对应的资源状态。
Args: name: 赋予该实体的人类可读名称或标题字符串,主要用于前端 UI 展示、日志记录或模糊检索。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
if name == "data":
return self._parsed
if name == "output":
@@ -102,10 +97,7 @@ class DeepSeekReasonerAgent(Generic[T]):
)
def _parse_output(self, text: str) -> Any:
"""执行与 parse output 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: text (str): 控制逻辑流向的具体字符串参数,指定了期望的 text 内容。
Returns: (Any): 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""从模型自由文本中抽取 ```json 块并按 ``output_schema`` 校验为对象。"""
if not self.has_custom_output:
return text
@@ -142,20 +134,13 @@ class DeepSeekReasonerAgent(Generic[T]):
def __getattr__(self, item):
# Delegate any unknown attributes (like .system_prompt, .tool) to the underlying pydantic_ai Agent
"""检索并获取特定的 getattr 数据集合或实例对象。
根据提供的查询条件或上下文凭证,从数据库、缓存或第三方服务中读取对应的资源状态。
Args: item: 参与 getattr 逻辑运算或数据构建的上下文依赖对象。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
return getattr(self.agent, item)
async def run(
self, user_prompt: str, deps: Any = None, message_history: list = None, **kwargs
) -> Any:
# Custom retry loop
"""执行与 run 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: user_prompt (str): 控制逻辑流向的具体字符串参数,指定了期望的 user prompt 内容。 deps (Any): 参与 run 逻辑运算或数据构建的上下文依赖对象。 message_history (list): 批量操作所需的列表集合,囊括了需要统一处理的多个 message history 元素。
Returns: (Any): 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""运行一次 deepseek-reasoner 推理:失败时根据错误反馈让模型重试,最多 ``self.retries`` 轮。"""
current_history = message_history or []
last_exception = None
+1 -1
View File
@@ -141,7 +141,7 @@ else:
@serve.deployment
@serve.ingress(app)
class kilostarGateway:
class KiloStarGateway:
gateway: Dict[str, WebSocket]
def __init__(self):
+12 -42
View File
@@ -28,8 +28,7 @@ agent_router = APIRouter(prefix="/api/v1/agent", tags=["agent"])
class AgentRegister(BaseModel):
"""AgentRegister 核心组件类。
这是一个领域数据模型或功能封装类,承载了 AgentRegister 相关的内聚属性定义与状态维护。它的存在隔离了局部的业务复杂性,并对外提供了类型安全的访问接口。"""
"""``POST /agent`` 入参(远程模型):通过 provider + model_id 加载系统节点。"""
provider_title: str
model_id: str
@@ -38,8 +37,7 @@ class AgentRegister(BaseModel):
class AgentLocalRegister(BaseModel):
"""AgentLocalRegister 核心组件类。
这是一个领域数据模型或功能封装类,承载了 AgentLocalRegister 相关的内聚属性定义与状态维护。它的存在隔离了局部的业务复杂性,并对外提供了类型安全的访问接口。"""
"""``POST /agent`` 入参(本地模型):通过本地路径加载系统节点。"""
path: str
individual_name: str
@@ -50,10 +48,7 @@ class AgentLocalRegister(BaseModel):
async def get_system_nodes(
_: TokenData = Depends(RoleChecker(allowed_roles=UserAuthority.USER)),
):
"""处理针对 get system nodes 相关的 HTTP API 请求。
该接口负责解析前端传入的载荷数据,调用底层核心业务逻辑进行处理,并组装标准化的 JSON 响应。
Args: _ (TokenData): 参与 get system nodes 逻辑运算或数据构建的上下文依赖对象。
Returns: : 序列化后的标准网络响应模型(如包含业务状态码、成功标志及对应的数据载荷 Data)。"""
"""返回三大系统节点(regulatory/consciousness/control)当前的持久化配置。"""
postgres_database = ray_actor_hook("postgres_database").postgres_database
configs = await postgres_database.get_all_system_node_configs.remote()
return {"system_nodes": configs}
@@ -64,10 +59,7 @@ async def load_agent(
agent_register: Union[AgentRegister, AgentLocalRegister],
_: TokenData = Depends(RoleChecker(allowed_roles=UserAuthority.USER)),
):
"""处理针对 load agent 相关的 HTTP API 请求。
该接口负责解析前端传入的载荷数据,调用底层核心业务逻辑进行处理,并组装标准化的 JSON 响应。
Args: agent_register (Union[AgentRegister, AgentLocalRegister]): 参与 load agent 逻辑运算或数据构建的上下文依赖对象。 _ (TokenData): 参与 load agent 逻辑运算或数据构建的上下文依赖对象。
Returns: : 序列化后的标准网络响应模型(如包含业务状态码、成功标志及对应的数据载荷 Data)。"""
"""加载/重载某个系统节点的 Agent:先持久化配置,再调用对应节点 Actor 的 ``create_agent``。"""
global_state_machine = ray_actor_hook("global_state_machine").global_state_machine
postgres_database = ray_actor_hook("postgres_database").postgres_database
@@ -76,7 +68,6 @@ async def load_agent(
elif isinstance(agent_register, AgentRegister):
try:
# Persist configuration
await postgres_database.upsert_system_node_config.remote(
agent_register.individual_name,
agent_register.provider_title,
@@ -84,7 +75,6 @@ async def load_agent(
agent_register.tools,
)
# Load agent into state machine
match agent_register.individual_name:
case "regulatory_node":
node = ray_actor_hook("regulatory_node").regulatory_node
@@ -118,8 +108,7 @@ async def load_agent(
class WorkerIndividualCreate(BaseModel):
"""WorkerIndividualCreate 核心组件类。
这是一个具体的 Worker 智能体实体类,代表着具备特定人设、领域技能或长文本处理能力的数字员工。它可以被控制器动态拉起,并在安全沙箱内执行复杂的工作流指令与多步骤推理任务。"""
"""``POST /worker`` 入参:创建一个 Worker Agent 所需的完整配置。"""
agent_name: str
agent_type: AgentType
@@ -134,8 +123,7 @@ class WorkerIndividualCreate(BaseModel):
class WorkerIndividualUpdate(BaseModel):
"""WorkerIndividualUpdate 核心组件类。
这是一个具体的 Worker 智能体实体类,代表着具备特定人设、领域技能或长文本处理能力的数字员工。它可以被控制器动态拉起,并在安全沙箱内执行复杂的工作流指令与多步骤推理任务。"""
"""``PUT /worker/{agent_id}`` 入参:可选字段构成的局部更新载荷。"""
agent_name: Optional[str] = None
agent_type: Optional[AgentType] = None
@@ -154,10 +142,7 @@ async def create_worker_individual(
worker_data: WorkerIndividualCreate,
token_data: TokenData = Depends(RoleChecker(allowed_roles=UserAuthority.USER)),
):
"""处理针对 create worker individual 相关的 HTTP API 请求。
该接口负责解析前端传入的载荷数据,调用底层核心业务逻辑进行处理,并组装标准化的 JSON 响应。
Args: worker_data (WorkerIndividualCreate): 从客户端传递过来或由上游组件生成的核心业务数据体,通常需要进一步的清洗和结构化解析。 token_data (TokenData): 从客户端传递过来或由上游组件生成的核心业务数据体,通常需要进一步的清洗和结构化解析。
Returns: : 序列化后的标准网络响应模型(如包含业务状态码、成功标志及对应的数据载荷 Data)。"""
"""创建一个 Worker Agent``owner_id`` 自动绑定为当前登录用户。"""
postgres_database = ray_actor_hook("postgres_database").postgres_database
data_dict = worker_data.model_dump()
data_dict["owner_id"] = token_data.user_id
@@ -169,10 +154,7 @@ async def create_worker_individual(
async def get_worker_individual_list(
token_data: TokenData = Depends(Accessor.get_current_user),
):
"""处理针对 get worker individual list 相关的 HTTP API 请求。
该接口负责解析前端传入的载荷数据,调用底层核心业务逻辑进行处理,并组装标准化的 JSON 响应。
Args: token_data (TokenData): 从客户端传递过来或由上游组件生成的核心业务数据体,通常需要进一步的清洗和结构化解析。
Returns: : 序列化后的标准网络响应模型(如包含业务状态码、成功标志及对应的数据载荷 Data)。"""
"""列出当前登录用户名下的全部 Worker Agent。"""
postgres_database = ray_actor_hook("postgres_database").postgres_database
workers = await postgres_database.get_worker_individual_list.remote(
owner_id=token_data.user_id
@@ -184,10 +166,7 @@ async def get_worker_individual_list(
async def get_worker_individual(
agent_id: str, token_data: TokenData = Depends(Accessor.get_current_user)
):
"""处理针对 get worker individual 相关的 HTTP API 请求。
该接口负责解析前端传入的载荷数据,调用底层核心业务逻辑进行处理,并组装标准化的 JSON 响应。
Args: agent_id (str): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 agent 实例。 token_data (TokenData): 从客户端传递过来或由上游组件生成的核心业务数据体,通常需要进一步的清洗和结构化解析。
Returns: : 序列化后的标准网络响应模型(如包含业务状态码、成功标志及对应的数据载荷 Data)。"""
"""按 ``agent_id`` 查询 Worker Agent;非本人的 Agent 返回 403。"""
postgres_database = ray_actor_hook("postgres_database").postgres_database
worker = await postgres_database.get_worker_individual.remote(agent_id=agent_id)
if not worker:
@@ -205,10 +184,7 @@ async def update_worker_individual(
worker_data: WorkerIndividualUpdate,
token_data: TokenData = Depends(Accessor.get_current_user),
):
"""处理针对 update worker individual 相关的 HTTP API 请求。
该接口负责解析前端传入的载荷数据,调用底层核心业务逻辑进行处理,并组装标准化的 JSON 响应。
Args: agent_id (str): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 agent 实例。 worker_data (WorkerIndividualUpdate): 从客户端传递过来或由上游组件生成的核心业务数据体,通常需要进一步的清洗和结构化解析。 token_data (TokenData): 从客户端传递过来或由上游组件生成的核心业务数据体,通常需要进一步的清洗和结构化解析。
Returns: : 序列化后的标准网络响应模型(如包含业务状态码、成功标志及对应的数据载荷 Data)。"""
"""局部更新 Worker Agent 配置;同时把状态机里的旧实例移除等待懒加载。"""
postgres_database = ray_actor_hook("postgres_database").postgres_database
worker = await postgres_database.get_worker_individual.remote(agent_id=agent_id)
if not worker:
@@ -236,10 +212,7 @@ async def update_worker_individual(
async def reload_worker_individual(
agent_id: str, token_data: TokenData = Depends(Accessor.get_current_user)
):
"""处理针对 reload worker individual 相关的 HTTP API 请求。
该接口负责解析前端传入的载荷数据,调用底层核心业务逻辑进行处理,并组装标准化的 JSON 响应。
Args: agent_id (str): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 agent 实例。 token_data (TokenData): 从客户端传递过来或由上游组件生成的核心业务数据体,通常需要进一步的清洗和结构化解析。
Returns: : 序列化后的标准网络响应模型(如包含业务状态码、成功标志及对应的数据载荷 Data)。"""
"""强制把 Worker 从内存池中卸载,下次调用时按最新配置重新加载。"""
postgres_database = ray_actor_hook("postgres_database").postgres_database
worker = await postgres_database.get_worker_individual.remote(agent_id=agent_id)
if not worker:
@@ -259,10 +232,7 @@ async def reload_worker_individual(
async def delete_worker_individual(
agent_id: str, token_data: TokenData = Depends(Accessor.get_current_user)
):
"""处理针对 delete worker individual 相关的 HTTP API 请求。
该接口负责解析前端传入的载荷数据,调用底层核心业务逻辑进行处理,并组装标准化的 JSON 响应。
Args: agent_id (str): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 agent 实例。 token_data (TokenData): 从客户端传递过来或由上游组件生成的核心业务数据体,通常需要进一步的清洗和结构化解析。
Returns: : 序列化后的标准网络响应模型(如包含业务状态码、成功标志及对应的数据载荷 Data)。"""
"""删除 Worker Agent;非本人 Agent 返回 403。"""
postgres_database = ray_actor_hook("postgres_database").postgres_database
worker = await postgres_database.get_worker_individual.remote(agent_id=agent_id)
if not worker:
+5 -14
View File
@@ -26,8 +26,7 @@ auth_router = APIRouter(prefix="/api/v1/auth", tags=["auth"])
class UserRegister(BaseModel):
"""UserRegister 核心组件类。
这是一个领域数据模型或功能封装类,承载了 UserRegister 相关的内聚属性定义与状态维护。它的存在隔离了局部的业务复杂性,并对外提供了类型安全的访问接口。"""
"""``POST /register`` 入参:用户名 + 明文密码。"""
user_name: str
password: str
@@ -35,10 +34,7 @@ class UserRegister(BaseModel):
@auth_router.post("/register")
async def create_user(user_register: UserRegister):
"""处理针对 create user 相关的 HTTP API 请求。
该接口负责解析前端传入的载荷数据,调用底层核心业务逻辑进行处理,并组装标准化的 JSON 响应。
Args: user_register (UserRegister): 参与 create user 逻辑运算或数据构建的上下文依赖对象。
Returns: : 序列化后的标准网络响应模型(如包含业务状态码、成功标志及对应的数据载荷 Data)。"""
"""注册新用户:异步线程池里做 argon2 哈希,再交由 PostgresDatabase Actor 落库。"""
postgres_database = ray_actor_hook("postgres_database").postgres_database
hashed_password = await run_in_threadpool(
Accessor.hash_password, user_register.password
@@ -50,8 +46,7 @@ async def create_user(user_register: UserRegister):
class UserLogin(BaseModel):
"""UserLogin 核心组件类。
这是一个领域数据模型或功能封装类,承载了 UserLogin 相关的内聚属性定义与状态维护。它的存在隔离了局部的业务复杂性,并对外提供了类型安全的访问接口。"""
"""``POST /login`` 入参:用户名 + 明文密码。"""
user_name: str
password: str
@@ -59,10 +54,7 @@ class UserLogin(BaseModel):
@auth_router.post("/login")
async def login_user(user_login: UserLogin):
"""处理针对 login user 相关的 HTTP API 请求。
该接口负责解析前端传入的载荷数据,调用底层核心业务逻辑进行处理,并组装标准化的 JSON 响应。
Args: user_login (UserLogin): 参与 login user 逻辑运算或数据构建的上下文依赖对象。
Returns: : 序列化后的标准网络响应模型(如包含业务状态码、成功标志及对应的数据载荷 Data)。"""
"""用户登录:查询用户后在线程池中校验口令,校验成功则签发 JWT。"""
postgres_database = ray_actor_hook("postgres_database").postgres_database
user = await postgres_database.login_user.remote(user_login.user_name)
if not user:
@@ -74,8 +66,7 @@ async def login_user(user_login: UserLogin):
class ChangeAuthorityRequest(BaseModel):
"""ChangeAuthorityRequest 核心组件类。
这是一个领域数据模型或功能封装类,承载了 ChangeAuthorityRequest 相关的内聚属性定义与状态维护。它的存在隔离了局部的业务复杂性,并对外提供了类型安全的访问接口。"""
"""``PUT /authority`` 入参:目标用户 ID 及新的权限枚举。"""
user_id: str
new_authority: UserAuthority
+3 -10
View File
@@ -26,8 +26,7 @@ client_router = APIRouter(prefix="/api/v1/adapter/client", tags=["client"])
class Message(BaseModel):
"""Message 核心组件类。
这是一个领域数据模型或功能封装类,承载了 Message 相关的内聚属性定义与状态维护。它的存在隔离了局部的业务复杂性,并对外提供了类型安全的访问接口。"""
"""``POST /client`` 入参:来自前端的一段聊天文本。"""
message: str
@@ -36,10 +35,7 @@ class Message(BaseModel):
async def create_message(
message: Message, token_data: TokenData = Depends(Accessor.get_current_user)
):
"""处理针对 create message 相关的 HTTP API 请求。
该接口负责解析前端传入的载荷数据,调用底层核心业务逻辑进行处理,并组装标准化的 JSON 响应。
Args: message (Message): 参与 create message 逻辑运算或数据构建的上下文依赖对象。 token_data (TokenData): 从客户端传递过来或由上游组件生成的核心业务数据体,通常需要进一步的清洗和结构化解析。
Returns: : 序列化后的标准网络响应模型(如包含业务状态码、成功标志及对应的数据载荷 Data)。"""
"""把前端消息转交给 RegulatoryNode 处理,并把回复透传给前端。"""
logger.info("收到消息,来源:客户端")
logger.debug(f"消息内容:{message.message}")
regulatory_node = ray_actor_hook("regulatory_node").regulatory_node
@@ -56,10 +52,7 @@ async def upload_file(
file: UploadFile = File(...),
token_data: TokenData = Depends(Accessor.get_current_user),
):
"""处理针对 upload file 相关的 HTTP API 请求。
该接口负责解析前端传入的载荷数据,调用底层核心业务逻辑进行处理,并组装标准化的 JSON 响应。
Args: file (UploadFile): 参与 upload file 逻辑运算或数据构建的上下文依赖对象。 token_data (TokenData): 从客户端传递过来或由上游组件生成的核心业务数据体,通常需要进一步的清洗和结构化解析。
Returns: : 序列化后的标准网络响应模型(如包含业务状态码、成功标志及对应的数据载荷 Data)。"""
"""以流式方式把上传文件落到 ``uploads/`` 目录;失败抛 500。"""
try:
upload_dir = "uploads"
os.makedirs(upload_dir, exist_ok=True)
+4 -14
View File
@@ -26,8 +26,7 @@ provider_router = APIRouter(prefix="/api/v1/provider", tags=["provider"])
class ProviderRegister(BaseModel):
"""ProviderRegister 核心组件类。
这是一个模型/服务提供商适配器类,屏蔽了外部不同供应商(如 OpenAI、Anthropic 等)的底层 API 差异。它负责标准化参数组装、网络请求发送、鉴权处理以及响应结构的反序列化。"""
"""``POST /provider`` 入参:注册一个模型 Provider 的最小字段集。"""
provider_type: Literal["openai", "claude", "deepseek"]
provider_title: str
@@ -40,10 +39,7 @@ async def create_provider(
provider_register: ProviderRegister,
token_data: TokenData = Depends(RoleChecker(allowed_roles=UserAuthority.USER)),
) -> None:
"""处理针对 create provider 相关的 HTTP API 请求。
该接口负责解析前端传入的载荷数据,调用底层核心业务逻辑进行处理,并组装标准化的 JSON 响应。
Args: provider_register (ProviderRegister): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 provider_register 实例。 token_data (TokenData): 从客户端传递过来或由上游组件生成的核心业务数据体,通常需要进一步的清洗和结构化解析。
Returns: (None): 序列化后的标准网络响应模型(如包含业务状态码、成功标志及对应的数据载荷 Data)。"""
"""注册一个 Providerowner 为当前登录用户的 ``user_id``。"""
global_state_machine = ray_actor_hook("global_state_machine").global_state_machine
await global_state_machine.add_provider_wrap.remote(
provider_type=provider_register.provider_type,
@@ -58,10 +54,7 @@ async def create_provider(
async def get_provider_list(
_: TokenData = Depends(Accessor.get_current_user),
) -> Dict[str, Dict[str, Provider]]:
"""处理针对 get provider list 相关的 HTTP API 请求。
该接口负责解析前端传入的载荷数据,调用底层核心业务逻辑进行处理,并组装标准化的 JSON 响应。
Args: _ (TokenData): 参与 get provider list 逻辑运算或数据构建的上下文依赖对象。
Returns: (Dict[str, Dict[str, Provider]]): 序列化后的标准网络响应模型(如包含业务状态码、成功标志及对应的数据载荷 Data)。"""
"""返回当前所有已注册的 Provider,前端用以展示模型清单。"""
global_state_machine = ray_actor_hook("global_state_machine").global_state_machine
provider_list: Dict[
str, Provider
@@ -76,10 +69,7 @@ async def delete_provider(
RoleChecker(allowed_roles=UserAuthority.SUPER_ADMINISTRATOR)
),
) -> dict:
"""处理针对 delete provider 相关的 HTTP API 请求。
该接口负责解析前端传入的载荷数据,调用底层核心业务逻辑进行处理,并组装标准化的 JSON 响应。
Args: provider_title (str): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 provider_title 实例。 _ (TokenData): 参与 delete provider 逻辑运算或数据构建的上下文依赖对象。
Returns: (dict): 序列化后的标准网络响应模型(如包含业务状态码、成功标志及对应的数据载荷 Data)。"""
"""删除指定 ``provider_title`` 的 Provider;仅超管可调用。"""
global_state_machine = ray_actor_hook("global_state_machine").global_state_machine
await global_state_machine.delete_provider.remote(provider_title=provider_title)
return {"message": "success"}
+5 -18
View File
@@ -24,8 +24,7 @@ resource_router = APIRouter(prefix="/api/v1/resource")
class Skill(BaseModel):
"""Skill 核心组件类。
这是一个领域数据模型或功能封装类,承载了 Skill 相关的内聚属性定义与状态维护。它的存在隔离了局部的业务复杂性,并对外提供了类型安全的访问接口。"""
"""``POST /skill`` 入参:技能仓库地址及可选子目录路径。"""
repo_url: str
path: str | None
@@ -35,10 +34,7 @@ class Skill(BaseModel):
async def install_skill(
skill: Skill, _: TokenData = Depends(RoleChecker(allowed_roles=UserAuthority.USER))
):
"""处理针对 install skill 相关的 HTTP API 请求。
该接口负责解析前端传入的载荷数据,调用底层核心业务逻辑进行处理,并组装标准化的 JSON 响应。
Args: skill (Skill): 参与 install skill 逻辑运算或数据构建的上下文依赖对象。 _ (TokenData): 参与 install skill 逻辑运算或数据构建的上下文依赖对象。
Returns: : 序列化后的标准网络响应模型(如包含业务状态码、成功标志及对应的数据载荷 Data)。"""
"""通过 viceroy 把 skill 仓库克隆到 ``plugin/skill``,并在状态机中登记。"""
global_state_machine = ray_actor_hook("global_state_machine").global_state_machine
# noinspection PyUnresolvedReferences
import os
@@ -62,10 +58,7 @@ async def install_skill(
async def get_skills(
_: TokenData = Depends(RoleChecker(allowed_roles=UserAuthority.USER)),
):
"""处理针对 get skills 相关的 HTTP API 请求。
该接口负责解析前端传入的载荷数据,调用底层核心业务逻辑进行处理,并组装标准化的 JSON 响应。
Args: _ (TokenData): 参与 get skills 逻辑运算或数据构建的上下文依赖对象。
Returns: : 序列化后的标准网络响应模型(如包含业务状态码、成功标志及对应的数据载荷 Data)。"""
"""返回当前状态机中已登记的所有 skill 名称列表。"""
global_state_machine = ray_actor_hook("global_state_machine").global_state_machine
skills = await global_state_machine.get_skill_list.remote()
return {"skills": skills}
@@ -78,10 +71,7 @@ async def delete_skill(
RoleChecker(allowed_roles=UserAuthority.SUPER_ADMINISTRATOR)
),
):
"""处理针对 delete skill 相关的 HTTP API 请求。
该接口负责解析前端传入的载荷数据,调用底层核心业务逻辑进行处理,并组装标准化的 JSON 响应。
Args: skill_name (str): 赋予该实体的人类可读名称或标题字符串,主要用于前端 UI 展示、日志记录或模糊检索。 _ (TokenData): 参与 delete skill 逻辑运算或数据构建的上下文依赖对象。
Returns: : 序列化后的标准网络响应模型(如包含业务状态码、成功标志及对应的数据载荷 Data)。"""
"""从状态机中移除 skill 注册项;不会删除磁盘上的代码文件。"""
global_state_machine = ray_actor_hook("global_state_machine").global_state_machine
# Note: this only removes it from the state machine manager.
await global_state_machine.remove_skill.remote(skill_name)
@@ -92,10 +82,7 @@ async def delete_skill(
async def get_tools(
_: TokenData = Depends(RoleChecker(allowed_roles=UserAuthority.USER)),
):
"""处理针对 get tools 相关的 HTTP API 请求。
该接口负责解析前端传入的载荷数据,调用底层核心业务逻辑进行处理,并组装标准化的 JSON 响应。
Args: _ (TokenData): 参与 get tools 逻辑运算或数据构建的上下文依赖对象。
Returns: : 序列化后的标准网络响应模型(如包含业务状态码、成功标志及对应的数据载荷 Data)。"""
"""汇总各作用域 tool_mapper,返回去重后的工具名称列表。"""
global_state_machine = ray_actor_hook("global_state_machine").global_state_machine
tool_mapper = await global_state_machine.get_tool_mapper.remote()
all_tool_names = set()
@@ -24,8 +24,11 @@ from kilostar.core.global_state_machine.individual_manager import (
@ray.remote
class GlobalStateMachine:
"""GlobalStateMachine 核心组件类
这是一个领域数据模型或功能封装类,承载了 GlobalStateMachine 相关的内聚属性定义与状态维护。它的存在隔离了局部的业务复杂性,并对外提供了类型安全的访问接口。"""
"""全局状态机 Actor,统一持有 Provider/Tool/Skill/Individual 四个注册表
其它 Actor 通过 ``ray.get_actor("global_state_machine")`` 拿到本实例,
再调用本类暴露的方法来读写各注册表,避免每个 Actor 各自维护一份状态。
"""
def __init__(self, postgres_database: PostgresDatabase):
import sys
@@ -44,8 +47,7 @@ class GlobalStateMachine:
print("GSM __init__ DONE", file=sys.stderr, flush=True)
async def init_state_machine(self):
"""完成 state machine 模块的启动与依赖初始化。
在系统引导或服务拉起阶段被调用,负责建立网络连接、分配基础内存资源及注册核心服务组件。"""
"""从数据库加载 Provider/Individual 注册表到内存。"""
await self._global_provider_manager.init_provider_register(
self.postgres_database
)
@@ -61,10 +63,7 @@ class GlobalStateMachine:
provider_apikey,
provider_owner,
):
"""创建并持久化新的 provider wrap 实体。
接收构建参数,执行必要的数据校验与默认值填充后,将新记录安全地写入底层存储或系统注册表中。
Args: provider_type: 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 provider_type 实例。 provider_title: 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 provider_title 实例。 provider_url: 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 provider_url 实例。 provider_apikey: 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 provider_apikey 实例。 provider_owner: 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 provider_owner 实例。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""新增一个模型 Provider:内存注册 + 数据库持久化一并完成。"""
return await self._global_provider_manager.add_provider(
provider_type=provider_type,
provider_title=provider_title,
@@ -76,41 +75,26 @@ class GlobalStateMachine:
# Provider Manager Methods
def get_provider_list(self):
"""检索并获取特定的 provider list 数据集合或实例对象。
根据提供的查询条件或上下文凭证,从数据库、缓存或第三方服务中读取对应的资源状态。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""返回内存中已登记的全部 Provider。"""
return self._global_provider_manager.get_provider_list()
def get_provider(self, provider_title):
"""检索并获取特定的 provider 数据集合或实例对象。
根据提供的查询条件或上下文凭证,从数据库、缓存或第三方服务中读取对应的资源状态。
Args: provider_title: 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 provider_title 实例。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""按 provider_title 取出单个 Provider 实例。"""
return self._global_provider_manager.get_provider(provider_title)
async def delete_provider(self, provider_title: str):
"""安全地移除或注销 provider。
执行物理删除或逻辑删除操作,并妥善清理相关的关联数据及占用资源。
Args: provider_title (str): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 provider_title 实例。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""删除一个 Provider:内存注册 + 数据库持久化一并完成。"""
return await self._global_provider_manager.delete_provider(
provider_title, self.postgres_database
)
# Tool Manager Methods
def get_tool_mapper(self):
"""检索并获取特定的 tool mapper 数据集合或实例对象。
根据提供的查询条件或上下文凭证,从数据库、缓存或第三方服务中读取对应的资源状态。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""返回 agent_name -> {tool_name: callable} 的全量映射。"""
return self._global_tool_manager.tool_mapper
def get_tool_list(self, agent_name: str):
# get_tool_list didn't actually exist on tool_manager, let's implement it to return the tools
# for a specific agent name (or scope)
"""检索并获取特定的 tool list 数据集合或实例对象。
根据提供的查询条件或上下文凭证,从数据库、缓存或第三方服务中读取对应的资源状态。
Args: agent_name (str): 赋予该实体的人类可读名称或标题字符串,主要用于前端 UI 展示、日志记录或模糊检索。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""返回某个 agent 可用的工具集(其专属工具与 default 工具的并集)。"""
tools = self._global_tool_manager.tool_mapper.get(agent_name, {})
# also include default tools
default_tools = self._global_tool_manager.tool_mapper.get("default", {})
@@ -119,49 +103,30 @@ class GlobalStateMachine:
# Skill Manager Methods
def add_skill(self, skill_name: str):
"""创建并持久化新的 skill 实体。
接收构建参数,执行必要的数据校验与默认值填充后,将新记录安全地写入底层存储或系统注册表中。
Args: skill_name (str): 赋予该实体的人类可读名称或标题字符串,主要用于前端 UI 展示、日志记录或模糊检索。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""注册一个新的 Skill 名称到 Skill 注册表。"""
return self._global_skill_manager.add_skill(skill_name)
def get_skill_list(self):
"""检索并获取特定skill list 数据集合或实例对象。
根据提供的查询条件或上下文凭证,从数据库、缓存或第三方服务中读取对应的资源状态。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""返回全部已注册Skill 名称。"""
return self._global_skill_manager.get_skill_list()
def remove_skill(self, skill_name: str):
"""安全地移除或注销 skill。
执行物理删除或逻辑删除操作,并妥善清理相关的关联数据及占用资源。
Args: skill_name (str): 赋予该实体的人类可读名称或标题字符串,主要用于前端 UI 展示、日志记录或模糊检索。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""从注册表中移除一个 Skill。"""
return self._global_skill_manager.remove_skill(skill_name)
# Individual Manager Methods
def add_individual(self, agent_id: str, config):
"""创建并持久化新的 individual 实体。
接收构建参数,执行必要的数据校验与默认值填充后,将新记录安全地写入底层存储或系统注册表中。
Args: agent_id (str): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 agent 实例。 config: 驱动该模块运行的核心配置字典或 Pydantic 数据模型,定义了重试策略、超时时间及模型参数等选项。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""把一个 Worker Individual 的运行期配置加入注册表。"""
return self._global_individual_manager.add_individual(agent_id, config)
def get_individual(self, agent_id: str):
"""检索并获取特定的 individual 数据集合或实例对象。
根据提供的查询条件或上下文凭证,从数据库、缓存或第三方服务中读取对应的资源状态。
Args: agent_id (str): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 agent 实例。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""按 agent_id 取出某个 Worker Individual 的配置。"""
return self._global_individual_manager.get_individual(agent_id)
def remove_individual(self, agent_id: str):
"""安全地移除或注销 individual。
执行物理删除或逻辑删除操作,并妥善清理相关的关联数据及占用资源。
Args: agent_id (str): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 agent 实例。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""从注册表中移除一个 Worker Individual。"""
return self._global_individual_manager.remove_individual(agent_id)
def list_individuals(self):
"""执行与 list individuals 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""返回当前注册的全部 Worker Individual 列表。"""
return self._global_individual_manager.list_individuals()
@@ -19,17 +19,17 @@ logger = get_logger("individual_manager")
class GlobalIndividualManager:
"""GlobalIndividualManager 核心组件类。
这是一个管理器类,职责集中在维护整个系统内有关 GlobalIndividual 资源的全局生命周期。它提供了注册机制、状态同步以及跨组件的统一查询入口,确保系统中该类型资源的实例一致性与可控性。"""
"""Worker Individual 的内存注册表,按 agent_id 索引其配置字典。"""
def __init__(self):
self._individuals: Dict[str, Dict[str, Any]] = {}
async def init_individual_register(self, postgres) -> None:
"""完成 individual register 模块的启动与依赖初始化
在系统引导或服务拉起阶段被调用,负责建立网络连接、分配基础内存资源及注册核心服务组件。
Args: postgres: 参与 init individual register 逻辑运算或数据构建的上下文依赖对象。
Returns: (None): 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""从 Postgres 拉取已存的全部 Worker Individual 配置写入内存
若底层数据库尚未实现 ``get_all_worker_individual``,会以警告形式跳过,
而不是直接抛出,以便老库平滑升级。
"""
try:
try:
individuals = await postgres.get_all_worker_individual.remote()
@@ -74,15 +74,10 @@ class GlobalIndividualManager:
return self._individuals.get(agent_id, None)
def remove_individual(self, agent_id: str) -> None:
"""安全地移除或注销 individual。
执行物理删除或逻辑删除操作,并妥善清理相关的关联数据及占用资源。
Args: agent_id (str): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 agent 实例。
Returns: (None): 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""从注册表中删除指定 agent_id;不存在时静默返回。"""
if agent_id in self._individuals:
del self._individuals[agent_id]
def list_individuals(self) -> Dict[str, Dict[str, Any]]:
"""执行与 list individuals 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Returns: (Dict[str, Dict[str, Any]]): 高度聚合的字典结构数据,将多维度的属性特征或统计指标组合后一并返回。"""
"""返回 agent_id -> config 的全量映射。"""
return self._individuals
@@ -19,16 +19,14 @@ from enum import Enum
class ProviderStatus(str, Enum):
"""ProviderStatus 核心组件类。
这是一个模型/服务提供商适配器类,屏蔽了外部不同供应商(如 OpenAI、Anthropic 等)的底层 API 差异。它负责标准化参数组装、网络请求发送、鉴权处理以及响应结构的反序列化。"""
"""Provider 健康状态枚举:``UP`` 表示可用,``DOWN`` 表示已被探测为不可用。"""
UP = "up"
DOWN = "down"
class Provider(BaseModel):
"""Provider 核心组件类。
这是一个模型/服务提供商适配器类,屏蔽了外部不同供应商(如 OpenAI、Anthropic 等)的底层 API 差异。它负责标准化参数组装、网络请求发送、鉴权处理以及响应结构的反序列化。"""
"""模型 Provider 的运行期表示,包含基础信息以及当前健康状态。"""
provider_title: str
provider_url: str
@@ -40,8 +38,7 @@ class Provider(BaseModel):
class ProviderArgs(BaseModel):
"""ProviderArgs 核心组件类。
这是一个模型/服务提供商适配器类,屏蔽了外部不同供应商(如 OpenAI、Anthropic 等)的底层 API 差异。它负责标准化参数组装、网络请求发送、鉴权处理以及响应结构的反序列化。"""
"""新增 Provider 时的入参集合,由 API 层拼装后传给具体 Provider 的工厂。"""
provider_title: str
provider_url: str
@@ -50,8 +47,7 @@ class ProviderArgs(BaseModel):
class BaseProvider(ABC):
"""BaseProvider 核心组件类。
这是一个模型/服务提供商适配器类,屏蔽了外部不同供应商(如 OpenAI、Anthropic 等)的底层 API 差异。它负责标准化参数组装、网络请求发送、鉴权处理以及响应结构的反序列化。"""
"""所有具体 Provider 适配器的抽象基类,约定 ``create_provider`` 工厂三段式。"""
@staticmethod
@abstractmethod
@@ -24,15 +24,11 @@ from typing import List
class ClaudeProvider(BaseProvider):
"""ClaudeProvider 核心组件类。
这是一个模型/服务提供商适配器类,屏蔽了外部不同供应商(如 OpenAI、Anthropic 等)的底层 API 差异。它负责标准化参数组装、网络请求发送、鉴权处理以及响应结构的反序列化。"""
"""Anthropic Claude Provider:使用 ``x-api-key`` + ``anthropic-version`` 头拉模型列表。"""
@staticmethod
async def create_provider(provider_args: ProviderArgs) -> Provider:
"""创建并持久化新的 provider 实体。
接收构建参数,执行必要的数据校验与默认值填充后,将新记录安全地写入底层存储或系统注册表中。
Args: provider_args (ProviderArgs): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 provider_args 实例。
Returns: (Provider): 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""工厂入口:拉取 Claude 模型列表后包装成 Provider。"""
provider_models: List[str] = await ClaudeProvider._load_models(provider_args)
provider: Provider = ClaudeProvider._return_provider(
provider_args, provider_models
@@ -43,10 +39,7 @@ class ClaudeProvider(BaseProvider):
@retry_on_retryable_error()
async def _load_models(provider_args: ProviderArgs) -> List[str]:
# Anthropic 官方需要 version 头
"""执行与 load models 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: provider_args (ProviderArgs): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 provider_args 实例。
Returns: (List[str]): 经过筛选、排序或分页处理后的实体对象列表集合。"""
"""从 ``/v1/models`` 拉取模型列表;接口不可用时回落到一组已知的 Claude 3.x 模型。"""
headers = {
"x-api-key": provider_args.provider_apikey,
"anthropic-version": "2023-06-01",
@@ -78,10 +71,7 @@ class ClaudeProvider(BaseProvider):
def _return_provider(
provider_args: ProviderArgs, provider_models: List[str]
) -> Provider:
"""执行与 return provider 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: provider_args (ProviderArgs): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 provider_args 实例。 provider_models (List[str]): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 provider_models 实例。
Returns: (Provider): 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""把 ProviderArgs + 模型清单包装成 ``provider_type="claude"`` 的 Provider。"""
return Provider(
provider_title=provider_args.provider_title,
provider_apikey=provider_args.provider_apikey,
@@ -23,15 +23,11 @@ from typing import List
class DeepseekProvider(BaseProvider):
"""DeepseekProvider 核心组件类。
这是一个模型/服务提供商适配器类,屏蔽了外部不同供应商(如 OpenAI、Anthropic 等)的底层 API 差异。它负责标准化参数组装、网络请求发送、鉴权处理以及响应结构的反序列化。"""
"""Deepseek ProviderAPI 兼容 OpenAI 协议,复用 ``GET /v1/models`` 拉取模型清单。"""
@staticmethod
async def create_provider(provider_args: ProviderArgs) -> Provider:
"""创建并持久化新的 provider 实体。
接收构建参数,执行必要的数据校验与默认值填充后,将新记录安全地写入底层存储或系统注册表中。
Args: provider_args (ProviderArgs): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 provider_args 实例。
Returns: (Provider): 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""工厂入口:拉取 Deepseek 模型列表后包装成 Provider。"""
provider_models: List[str] = await DeepseekProvider._load_models(provider_args)
provider: Provider = DeepseekProvider._return_provider(
provider_args, provider_models
@@ -41,10 +37,7 @@ class DeepseekProvider(BaseProvider):
@staticmethod
@retry_on_retryable_error()
async def _load_models(provider_args: ProviderArgs) -> List[str]:
"""执行与 load models 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: provider_args (ProviderArgs): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 provider_args 实例。
Returns: (List[str]): 经过筛选、排序或分页处理后的实体对象列表集合。"""
"""从 ``{base_url}/v1/models`` 拉取模型 ID 列表;网络异常会被包装为 RetryableError。"""
headers = {
"Authorization": f"Bearer {provider_args.provider_apikey}",
"Content-Type": "application/json",
@@ -81,10 +74,7 @@ class DeepseekProvider(BaseProvider):
def _return_provider(
provider_args: ProviderArgs, provider_models: List[str]
) -> Provider:
"""执行与 return provider 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: provider_args (ProviderArgs): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 provider_args 实例。 provider_models (List[str]): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 provider_models 实例。
Returns: (Provider): 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""把 ProviderArgs + 模型清单包装成 ``provider_type="deepseek"`` 的 Provider。"""
return Provider(
provider_title=provider_args.provider_title,
provider_apikey=provider_args.provider_apikey,
@@ -23,15 +23,11 @@ from typing import List
class GeminiProvider(BaseProvider):
"""GeminiProvider 核心组件类。
这是一个模型/服务提供商适配器类,屏蔽了外部不同供应商(如 Google Gemini)的底层 API 差异。它负责标准化参数组装、网络请求发送、鉴权处理以及响应结构的反序列化。"""
"""Google Gemini Provider:调用 ``/v1beta/models`` 接口获取模型清单。"""
@staticmethod
async def create_provider(provider_args: ProviderArgs) -> Provider:
"""创建并持久化新的 provider 实体。
接收构建参数,执行必要的数据校验与默认值填充后,将新记录安全地写入底层存储或系统注册表中。
Args: provider_args (ProviderArgs): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 provider_args 实例。
Returns: (Provider): 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""工厂入口:拉取 Gemini 模型列表后包装成 Provider。"""
provider_models: List[str] = await GeminiProvider._load_models(provider_args)
provider: Provider = GeminiProvider._return_provider(
provider_args, provider_models
@@ -41,10 +37,7 @@ class GeminiProvider(BaseProvider):
@staticmethod
@retry_on_retryable_error()
async def _load_models(provider_args: ProviderArgs) -> List[str]:
"""执行与 load models 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: provider_args (ProviderArgs): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 provider_args 实例。
Returns: (List[str]): 经过筛选、排序或分页处理后的实体对象列表集合。"""
"""从 ``/v1beta/models`` 拉取模型列表,去掉 ``models/`` 前缀;网络异常会被包装为 RetryableError。"""
headers = {
"Authorization": f"Bearer {provider_args.provider_apikey}",
"Content-Type": "application/json",
@@ -78,10 +71,7 @@ class GeminiProvider(BaseProvider):
def _return_provider(
provider_args: ProviderArgs, provider_models: List[str]
) -> Provider:
"""执行与 return provider 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: provider_args (ProviderArgs): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 provider_args 实例。 provider_models (List[str]): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 provider_models 实例。
Returns: (Provider): 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""把 ProviderArgs + 模型清单包装成 ``provider_type="gemini"`` 的 Provider。"""
return Provider(
provider_title=provider_args.provider_title,
provider_apikey=provider_args.provider_apikey,
@@ -23,15 +23,11 @@ from typing import List
class OpenAIProvider(BaseProvider):
"""OpenAIProvider 核心组件类。
这是一个模型/服务提供商适配器类,屏蔽了外部不同供应商(如 OpenAI、Anthropic 等)的底层 API 差异。它负责标准化参数组装、网络请求发送、鉴权处理以及响应结构的反序列化。"""
"""OpenAI 兼容 Provider:通过 ``GET /v1/models`` 拉取模型清单,包装为 Provider 对象。"""
@staticmethod
async def create_provider(provider_args: ProviderArgs) -> Provider:
"""创建并持久化新的 provider 实体。
接收构建参数,执行必要的数据校验与默认值填充后,将新记录安全地写入底层存储或系统注册表中。
Args: provider_args (ProviderArgs): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 provider_args 实例。
Returns: (Provider): 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""工厂入口:拉取模型列表后包装成 Provider。"""
provider_models: List[str] = await OpenAIProvider._load_models(provider_args)
provider: Provider = OpenAIProvider._return_provider(
provider_args, provider_models
@@ -41,10 +37,7 @@ class OpenAIProvider(BaseProvider):
@staticmethod
@retry_on_retryable_error()
async def _load_models(provider_args: ProviderArgs) -> List[str]:
"""执行与 load models 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: provider_args (ProviderArgs): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 provider_args 实例。
Returns: (List[str]): 经过筛选、排序或分页处理后的实体对象列表集合。"""
"""从 ``{base_url}/v1/models`` 拉取模型 ID 列表;网络异常会被包装为 RetryableError。"""
headers = {
"Authorization": f"Bearer {provider_args.provider_apikey}",
"Content-Type": "application/json",
@@ -81,10 +74,7 @@ class OpenAIProvider(BaseProvider):
def _return_provider(
provider_args: ProviderArgs, provider_models: List[str]
) -> Provider:
"""执行与 return provider 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: provider_args (ProviderArgs): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 provider_args 实例。 provider_models (List[str]): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 provider_models 实例。
Returns: (Provider): 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""把 ProviderArgs + 模型清单包装成 ``provider_type="openai"`` 的 Provider。"""
return Provider(
provider_title=provider_args.provider_title,
provider_apikey=provider_args.provider_apikey,
@@ -45,10 +45,7 @@ class ProviderManager:
self.provider_register = {}
async def init_provider_register(self, postgres) -> None:
"""完成 provider register 模块的启动与依赖初始化。
在系统引导或服务拉起阶段被调用,负责建立网络连接、分配基础内存资源及注册核心服务组件。
Args: postgres: 参与 init provider register 逻辑运算或数据构建的上下文依赖对象。
Returns: (None): 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""从 Postgres 读取已存的 Provider 列表,按 provider_title 装入内存注册表。"""
providers = await postgres.get_provider.remote()
for provider in providers:
self.provider_register[provider.provider_title] = provider
@@ -62,10 +59,13 @@ class ProviderManager:
provider_owner,
postgres_database,
) -> None:
"""创建并持久化新的 provider 实体。
接收构建参数,执行必要的数据校验与默认值填充后,将新记录安全地写入底层存储或系统注册表中。
Args: provider_type: 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 provider_type 实例。 provider_title: 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 provider_title 实例。 provider_url: 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 provider_url 实例。 provider_apikey: 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 provider_apikey 实例。 provider_owner: 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 provider_owner 实例。 postgres_database: 从客户端传递过来或由上游组件生成的核心业务数据体,通常需要进一步的清洗和结构化解析。
Returns: (None): 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""新增并落库一个 Provider
- 按 ``provider_type`` 选择具体适配器(openai/claude/deepseek/gemini);
- 适配器调用其 ``create_provider`` 拉取模型清单;
- 写入内存注册表,并通过 ``postgres_database`` 持久化。
网络异常会包装成 RetryableError;不支持的类型记 warning 后返回 None。
"""
from kilostar.core.global_state_machine.model_provider import ProviderArgs
from kilostar.utils.logger import get_logger
@@ -112,23 +112,15 @@ class ProviderManager:
)
def get_provider_list(self):
"""检索并获取特定的 provider list 数据集合或实例对象。
根据提供的查询条件或上下文凭证,从数据库、缓存或第三方服务中读取对应的资源状态。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""返回 provider_title -> Provider 的全量映射。"""
return self.provider_register
def get_provider(self, provider_title):
"""检索并获取特定的 provider 数据集合或实例对象。
根据提供的查询条件或上下文凭证,从数据库、缓存或第三方服务中读取对应的资源状态。
Args: provider_title: 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 provider_title 实例。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""按 provider_title 取出单个 Provider;不存在返回 None。"""
return self.provider_register.get(provider_title)
async def delete_provider(self, provider_title: str, postgres_database) -> None:
"""安全地移除或注销 provider。
执行物理删除或逻辑删除操作,并妥善清理相关的关联数据及占用资源。
Args: provider_title (str): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 provider_title 实例。 postgres_database: 从客户端传递过来或由上游组件生成的核心业务数据体,通常需要进一步的清洗和结构化解析。
Returns: (None): 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""从内存注册表 + Postgres 中一并删除指定 Provider;不存在时静默返回。"""
if provider_title in self.provider_register:
provider = self.provider_register[provider_title]
await postgres_database.delete_provider_db.remote(
@@ -19,8 +19,7 @@ import json
class GlobalSkillManager:
"""GlobalSkillManager 核心组件类。
这是一个管理器类,职责集中在维护整个系统内有关 GlobalSkill 资源的全局生命周期。它提供了注册机制、状态同步以及跨组件的统一查询入口,确保系统中该类型资源的实例一致性与可控性。"""
"""Skill 注册表:从 ``kilostar/plugin/skill/<name>/skill.json`` 启动期一次性扫描加载。"""
skill_mapper = Dict[str, Tuple[str]]
"""skill的存储表"""
@@ -24,8 +24,8 @@ logger = get_logger("tool_manager")
class GlobalToolManager:
"""GlobalToolManager 核心组件类。
这是一个管理器类,职责集中在维护整个系统内有关 GlobalTool 资源的全局生命周期。它提供了注册机制、状态同步以及跨组件的统一查询入口,确保系统中该类型资源的实例一致性与可控性"""
"""工具注册表:扫描 ``kilostar/plugin/tool_plugin/`` 下所有 BaseToolData 子类,
按 ``action_scope`` 分桶到 ``tool_mapper[scope][plugin_name]``;无 scope 的归入 ``default``"""
tool_mapper: Dict[str, Dict[str, Type[BaseToolData]]]
@@ -25,15 +25,13 @@ class ConsciousnessNodeResponse(ResponseModel):
pass
class ConsciousnessNodeDeps(DepsModel):
"""ConsciousnessNodeDeps 核心组件类。
这是一个系统执行节点类,作为多智能体架构中的独立处理单元。它能够接收工作流上下文,根据内置的大模型策略进行意图理解和自主决策,从而驱动特定阶段的任务闭环。"""
"""ConsciousnessNode 在 pydantic-ai Agent 中使用的依赖:原始指令、当前指令以及可用 Skill 列表。"""
original_command: str
command: str
available_skills: Optional[List[str]]
class ConsciousnessNodeInput(RequestModel):
"""ConsciousnessNodeInput 核心组件类。
这是一个系统执行节点类,作为多智能体架构中的独立处理单元。它能够接收工作流上下文,根据内置的大模型策略进行意图理解和自主决策,从而驱动特定阶段的任务闭环。"""
"""ConsciousnessNode 各类入参的共同基类,仅用于打 schema 标签。"""
pass
@@ -60,24 +58,21 @@ class ForregulatoryNode(ConsciousnessNodeResponse):
)
class ForWorkflowEngineInput(ConsciousnessNodeInput):
"""ForWorkflowEngineInput 核心组件类。
这是一个领域数据模型或功能封装类,承载了 ForWorkflowEngineInput 相关的内聚属性定义与状态维护。它的存在隔离了局部的业务复杂性,并对外提供了类型安全的访问接口。"""
"""从 RegulatoryNode 移交过来生成 Workflow 的入参:原始指令 + 已注册的 Skill 列表。"""
original_command: str
available_skills: list[dict] | None = None
class ForWorkflowInput(ConsciousnessNodeInput):
"""ForWorkflowInput 核心组件类。
这是一个领域数据模型或功能封装类,承载了 ForWorkflowInput 相关的内聚属性定义与状态维护。它的存在隔离了局部的业务复杂性,并对外提供了类型安全的访问接口。"""
"""工作流执行期分配给 ConsciousnessNode 的步骤入参:当前 step + 原始指令上下文。"""
workflow_step: WorkflowStep
original_command: str
class ForregulatoryInput(ConsciousnessNodeInput):
"""ForregulatoryInput 核心组件类。
这是一个领域数据模型或功能封装类,承载了 ForregulatoryInput 相关的内聚属性定义与状态维护。它的存在隔离了局部的业务复杂性,并对外提供了类型安全的访问接口。"""
"""工作流跑完后回交给 RegulatoryNode 时的入参:完整 workflow 对象 + 原始指令。"""
workflow: KiloStarWorkflow
original_command: str
@@ -26,8 +26,11 @@ from kilostar.core.individual.control_node.template import (
@ray.remote
class ControlNode:
"""ControlNode 核心组件类
这是一个系统执行节点类,作为多智能体架构中的独立处理单元。它能够接收工作流上下文,根据内置的大模型策略进行意图理解和自主决策,从而驱动特定阶段的任务闭环。"""
"""ControlNode(控制节点):工作流中具体子任务的执行 Actor
它把 ConsciousnessNode 编排出的 ``workflow_step`` 拿来当作输入,借助
pydantic-ai Agent + 已绑定的工具集合产出 ``ForWorkflow`` 结构化输出。
"""
def __init__(self):
from kilostar.utils.logger import get_logger
@@ -85,10 +88,7 @@ class ControlNode:
@self.agent.system_prompt
async def dynamic_prompt(ctx: RunContext[ControlNodeDeps]):
"""执行与 dynamic prompt 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: ctx (RunContext[ControlNodeDeps]): 参与 dynamic prompt 逻辑运算或数据构建的上下文依赖对象。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""运行期动态拼接 system prompt:把当前 workflow_step 的关键字段塞进去。"""
prompt = system_prompt + "\n\n"
prompt += (
f"=== 当前任务步骤上下文 ===\n"
@@ -99,10 +99,7 @@ class ControlNode:
return prompt
async def working(self, payload: ForWorkflowInput) -> str:
"""执行与 working 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: payload (ForWorkflowInput): 从客户端传递过来或由上游组件生成的核心业务数据体,通常需要进一步的清洗和结构化解析。
Returns: (str): 处理流程所输出的具体字符串产物,可能是新生成的 ID 序列、格式化好的文本片段或 LLM 推理的回答内容。"""
"""对外入口:执行一次步骤,吞掉异常并返回 ``None`` 以避免拖垮上游 Workflow。"""
try:
result: ForWorkflow = await self._run(payload)
return result
@@ -111,10 +108,7 @@ class ControlNode:
return None
async def _run(self, payload: ForWorkflowInput) -> ForWorkflow:
"""执行与 run 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: payload (ForWorkflowInput): 从客户端传递过来或由上游组件生成的核心业务数据体,通常需要进一步的清洗和结构化解析。
Returns: (ForWorkflow): 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""实际执行步骤:组装 ``ControlNodeDeps``、调用 Agent,最终把 ``ForWorkflow`` 输出取出。"""
try:
self.agent.retries = 3
deps = ControlNodeDeps(workflow_step=payload.workflow_step)
@@ -15,34 +15,30 @@
from pydantic import Field
from kilostar.core.work.workflow.workflow import WorkflowStep
from kilostar.utils.agent_model import ResponseModel, InputModel, DepsModel
from kilostar.utils.agent_model import ResponseModel, RequestModel, DepsModel
class ControlNodeResponse(ResponseModel):
"""控制节点回复的基类"""
"""控制节点回复的基类"""
pass
class ControlNodeInput(InputModel):
"""ControlNodeInput 核心组件类。
这是一个系统执行节点类,作为多智能体架构中的独立处理单元。它能够接收工作流上下文,根据内置的大模型策略进行意图理解和自主决策,从而驱动特定阶段的任务闭环。"""
class ControlNodeInput(RequestModel):
"""控制节点输入的基类,承载一次调度所需的入参。"""
pass
class ControlNodeDeps(DepsModel):
"""ControlNodeDeps 核心组件类。
这是一个系统执行节点类,作为多智能体架构中的独立处理单元。它能够接收工作流上下文,根据内置的大模型策略进行意图理解和自主决策,从而驱动特定阶段的任务闭环。"""
"""控制节点运行期依赖,注入到 pydantic-ai Agent 的 RunContext。"""
workflow_step: WorkflowStep
workflow_step: WorkflowStep
# In the future, this can be dynamically populated with tools specific to the current task execution
class ForWorkflow(ControlNodeResponse):
"""ForWorkflow 核心组件类。
这是一个领域数据模型或功能封装类,承载了 ForWorkflow 相关的内聚属性定义与状态维护。它的存在隔离了局部的业务复杂性,并对外提供了类型安全的访问接口。"""
"""控制节点执行单个工作流步骤的输出模型。"""
output: str = Field(
..., description="控制节点执行特定工作流步骤的结果。包含执行细节和输出数据。"
@@ -50,7 +46,6 @@ class ForWorkflow(ControlNodeResponse):
class ForWorkflowInput(ControlNodeInput):
"""ForWorkflowInput 核心组件类。
这是一个领域数据模型或功能封装类,承载了 ForWorkflowInput 相关的内聚属性定义与状态维护。它的存在隔离了局部的业务复杂性,并对外提供了类型安全的访问接口。"""
"""控制节点针对工作流步骤的输入模型。"""
workflow_step: WorkflowStep
@@ -28,8 +28,11 @@ from pydantic_ai import RunContext, Agent
@ray.remote
class RegulatoryNode:
"""regulatoryNode 核心组件类
这是一个系统执行节点类,作为多智能体架构中的独立处理单元。它能够接收工作流上下文,根据内置的大模型策略进行意图理解和自主决策,从而驱动特定阶段的任务闭环。"""
"""RegulatoryNode(监管节点):用户请求的入口路由 Actor
负责对消息做意图识别:闲聊 → 直接回 ``ForUser``;复杂任务 → 走
``ForConsciousnessNode`` 移交给意识节点;工作流回执 → 转译成对用户的总结回复。
"""
def __init__(self) -> None:
from kilostar.utils.logger import get_logger
@@ -86,10 +89,7 @@ class RegulatoryNode:
@self.agent.system_prompt
async def dynamic_prompt(ctx: RunContext[RegulatoryNodeDeps]):
"""执行与 dynamic prompt 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: ctx (RunContext[regulatoryNodeDeps]): 参与 dynamic prompt 逻辑运算或数据构建的上下文依赖对象。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""运行期动态拼接 system prompt:注入平台/用户/时间/错误历史等上下文。"""
prompt = system_prompt + "\n\n"
prompt += (
f"=== 当前上下文 ===\n"
@@ -22,15 +22,9 @@ logger = get_logger("database_exception")
def database_exception(func):
"""执行与 database exception 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: func: 参与 database exception 逻辑运算或数据构建的上下文依赖对象。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""异步装饰器:把 SQLAlchemy / Pydantic / 业务异常归类记日志后再抛出。"""
async def wrapper(*args, **kwargs):
"""执行与 wrapper 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
try:
return await func(*args, **kwargs)
except ValidationError as e:
@@ -32,8 +32,7 @@ _AGENT_TYPE_MODEL_MAP = {
class IndividualDatabase:
"""IndividualDatabase 核心组件类。
这是一个数据库操作层 (DAO/Repository) 封装类,专注于处理实体模型与关系型数据库表之间的映射。它将复杂的 SQL 查询、跨表 Join 和事务回滚逻辑进行了高级抽象,向上层服务暴露简洁的数据读写接口。"""
"""Individual 表族(Base/Specialist/Ordinary/Special)的 DAO,按 agent_type 选择具体子表。"""
def __init__(self, async_session_maker):
self.async_session_maker = async_session_maker
@@ -44,8 +43,7 @@ class IndividualDatabase:
@database_exception
async def add_worker_individual(self, **kwargs):
"""创建并持久化新的 worker individual 实体。
接收构建参数,执行必要的数据校验与默认值填充后,将新记录安全地写入底层存储或系统注册表中。"""
"""新建一个 Worker Individual:自动生成 ULID,按 ``agent_type`` 选择对应子表写入。"""
async with self.async_session_maker() as session:
agent_id = str(ULID())
agent_type = kwargs.get("agent_type", "base")
@@ -58,7 +56,7 @@ class IndividualDatabase:
@database_exception
async def get_worker_individual(self, agent_id: str):
"""检索并获取特定的 worker individual 数据集合或实例对象"""
"""按 agent_id 取单个 Individual;不存在返回 None"""
async with self.async_session_maker() as session:
statement = select(BaseIndividualModel).where(
BaseIndividualModel.agent_id == agent_id
@@ -68,7 +66,7 @@ class IndividualDatabase:
@database_exception
async def get_worker_individual_list(self, owner_id: str):
"""检索并获取特定的 worker individual list 数据集合或实例对象"""
"""读取某用户名下的所有 Individual"""
async with self.async_session_maker() as session:
statement = select(BaseIndividualModel).where(
BaseIndividualModel.owner_id == owner_id
@@ -78,7 +76,7 @@ class IndividualDatabase:
@database_exception
async def update_worker_individual(self, agent_id: str, **kwargs):
"""对现有的 worker individual 进行状态更新或属性覆盖"""
"""部分更新 Individual:只覆盖 kwargs 中非 None 的字段;找不到返回 None"""
async with self.async_session_maker() as session:
statement = select(BaseIndividualModel).where(
BaseIndividualModel.agent_id == agent_id
@@ -97,7 +95,7 @@ class IndividualDatabase:
@database_exception
async def delete_worker_individual(self, agent_id: str) -> bool:
"""安全地移除或注销 worker individual"""
"""删除 Individual;不存在返回 False,删除成功返回 True"""
async with self.async_session_maker() as session:
statement = select(BaseIndividualModel).where(
BaseIndividualModel.agent_id == agent_id
@@ -112,7 +110,7 @@ class IndividualDatabase:
@database_exception
async def get_all_worker_individual(self):
"""检索并获取特定的 all worker individual 数据集合或实例对象"""
"""返回数据库中全部 Individual"""
async with self.async_session_maker() as session:
statement = select(BaseIndividualModel)
results = await session.execute(statement)
@@ -20,17 +20,14 @@ from kilostar.core.postgres_database.database_exception import database_exceptio
class ProviderDatabase:
"""ProviderDatabase 核心组件类。
这是一个模型/服务提供商适配器类,屏蔽了外部不同供应商(如 OpenAI、Anthropic 等)的底层 API 差异。它负责标准化参数组装、网络请求发送、鉴权处理以及响应结构的反序列化。"""
"""Provider 表的 DAO:模型 Provider 的增删查改。"""
def __init__(self, async_session_maker):
self.async_session_maker = async_session_maker
@database_exception
async def get_provider(self) -> List[ProviderModel]:
"""检索并获取特定的 provider 数据集合或实例对象。
根据提供的查询条件或上下文凭证,从数据库、缓存或第三方服务中读取对应的资源状态。
Returns: (List[ProviderModel]): 经过筛选、排序或分页处理后的实体对象列表集合。"""
"""返回全部 Provider,并将每行重新构造为新的 ``ProviderModel`` 实例(脱离 session)。"""
async with self.async_session_maker() as session:
statement = select(ProviderModel)
results = await session.execute(statement)
@@ -53,9 +50,7 @@ class ProviderDatabase:
@database_exception
async def add_provider(self, **kwargs) -> None:
"""创建并持久化新的 provider 实体。
接收构建参数,执行必要的数据校验与默认值填充后,将新记录安全地写入底层存储或系统注册表中。
Returns: (None): 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""新建一条 Provider 记录;字段通过 kwargs 直接传给 ProviderModel。"""
async with self.async_session_maker() as session:
provider = ProviderModel(**kwargs)
session.add(provider)
@@ -63,10 +58,7 @@ class ProviderDatabase:
@database_exception
async def delete_provider(self, provider_id: str) -> None:
"""安全地移除或注销 provider。
执行物理删除或逻辑删除操作,并妥善清理相关的关联数据及占用资源。
Args: provider_id (str): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 provider 实例。
Returns: (None): 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""删除指定 ID 的 Provider;不存在时静默返回。"""
async with self.async_session_maker() as session:
provider = await session.get(ProviderModel, provider_id)
if provider is not None:
@@ -75,10 +67,7 @@ class ProviderDatabase:
@database_exception
async def update_provider(self, provider_id: str, **kwargs) -> None:
"""对现有的 provider 进行状态更新或属性覆盖。
基于增量变更原则,合并最新的配置或数据,并触发相关依赖组件的缓存刷新或事件通知。
Args: provider_id (str): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 provider 实例。
Returns: (Provider): 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""部分更新指定 Provider 的字段;不存在时返回 None,否则返回刷新后的对象。"""
async with self.async_session_maker() as session:
provider = await session.get(ProviderModel, provider_id)
if provider is not None:
@@ -19,8 +19,7 @@ from kilostar.core.postgres_database.database_exception import database_exceptio
class SystemNodeDatabase:
"""SystemNodeDatabase 核心组件类。
这是一个系统执行节点类,作为多智能体架构中的独立处理单元。它能够接收工作流上下文,根据内置的大模型策略进行意图理解和自主决策,从而驱动特定阶段的任务闭环。"""
"""SystemNodeConfig 表的 DAO:管理 control/consciousness/regulatory 等系统节点的模型配置。"""
def __init__(self, async_session_maker):
self.async_session_maker = async_session_maker
@@ -33,10 +32,7 @@ class SystemNodeDatabase:
model_id: str,
tools: Optional[List[str]] = None,
) -> SystemNodeConfigModel:
"""执行与 upsert system node config 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: node_name (str): 赋予该实体的人类可读名称或标题字符串,主要用于前端 UI 展示、日志记录或模糊检索。 provider_title (str): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 provider_title 实例。 model_id (str): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 model 实例。 tools (Optional[List[str]]): 控制逻辑流向的具体字符串参数,指定了期望的 tools 内容。
Returns: (SystemNodeConfigModel): 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""按 node_name 插入或更新一个系统节点的模型配置(Provider + 模型 ID + 工具列表)。"""
async with self.async_session_maker() as session:
statement = select(SystemNodeConfigModel).where(
SystemNodeConfigModel.node_name == node_name
@@ -62,9 +58,7 @@ class SystemNodeDatabase:
@database_exception
async def get_all_system_node_configs(self) -> List[SystemNodeConfigModel]:
"""检索并获取特定的 all system node configs 数据集合或实例对象。
根据提供的查询条件或上下文凭证,从数据库、缓存或第三方服务中读取对应的资源状态。
Returns: (List[SystemNodeConfigModel]): 经过筛选、排序或分页处理后的实体对象列表集合。"""
"""返回所有系统节点的模型配置列表。"""
async with self.async_session_maker() as session:
statement = select(SystemNodeConfigModel)
results = await session.execute(statement)
@@ -74,10 +68,7 @@ class SystemNodeDatabase:
async def get_system_node_config(
self, node_name: str
) -> Optional[SystemNodeConfigModel]:
"""检索并获取特定的 system node config 数据集合或实例对象。
根据提供的查询条件或上下文凭证,从数据库、缓存或第三方服务中读取对应的资源状态。
Args: node_name (str): 赋予该实体的人类可读名称或标题字符串,主要用于前端 UI 展示、日志记录或模糊检索。
Returns: (Optional[SystemNodeConfigModel]): 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""按 node_name 取出单个系统节点的模型配置;不存在返回 None。"""
async with self.async_session_maker() as session:
statement = select(SystemNodeConfigModel).where(
SystemNodeConfigModel.node_name == node_name
+8 -29
View File
@@ -21,18 +21,14 @@ from kilostar.utils.access import Accessor
class AuthDatabase:
"""AuthDatabase 核心组件类。
这是一个数据库操作层 (DAO/Repository) 封装类,专注于处理实体模型与关系型数据库表之间的映射。它将复杂的 SQL 查询、跨表 Join 和事务回滚逻辑进行了高级抽象,向上层服务暴露简洁的数据读写接口。"""
"""User 表的 DAO:注册、登录、改密、删除以及权限读写。"""
def __init__(self, async_session_maker):
self.async_session_maker = async_session_maker
@database_exception
async def add_user(self, user_name: str, hashed_password: str) -> User:
"""创建并持久化新的 user 实体。
接收构建参数,执行必要的数据校验与默认值填充后,将新记录安全地写入底层存储或系统注册表中。
Args: user_name (str): 赋予该实体的人类可读名称或标题字符串,主要用于前端 UI 展示、日志记录或模糊检索。 hashed_password (str): 控制逻辑流向的具体字符串参数,指定了期望的 hashed password 内容。
Returns: (User): 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""新建一名用户;若当前库中尚无任何用户,第一名将被自动赋予 SUPER_ADMINISTRATOR 权限。"""
from ulid import ULID
async with self.async_session_maker() as session:
@@ -58,10 +54,7 @@ class AuthDatabase:
@database_exception
async def change_password(self, user_name, old_password, new_password) -> User:
"""执行与 change password 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: user_name: 赋予该实体的人类可读名称或标题字符串,主要用于前端 UI 展示、日志记录或模糊检索。 old_password: 参与 change password 逻辑运算或数据构建的上下文依赖对象。 new_password: 参与 change password 逻辑运算或数据构建的上下文依赖对象。
Returns: (User): 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""校验旧密码后将其替换为新密码;旧密码不匹配抛 UserPasswordError。"""
async with self.async_session_maker() as session:
statement = select(User).where(User.user_name == user_name)
results = await session.execute(statement)
@@ -78,10 +71,7 @@ class AuthDatabase:
@database_exception
async def delete_user(self, user_name: str) -> None:
"""安全地移除或注销 user。
执行物理删除或逻辑删除操作,并妥善清理相关的关联数据及占用资源。
Args: user_name (str): 赋予该实体的人类可读名称或标题字符串,主要用于前端 UI 展示、日志记录或模糊检索。
Returns: (None): 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""按用户名删除一名用户,不存在则抛 UserNotExistError。"""
async with self.async_session_maker() as session:
statement = select(User).where(User.user_name == user_name)
results = await session.execute(statement)
@@ -93,10 +83,7 @@ class AuthDatabase:
@database_exception
async def delete_user_by_id(self, user_id: str) -> None:
"""安全地移除或注销 user by id。
执行物理删除或逻辑删除操作,并妥善清理相关的关联数据及占用资源。
Args: user_id (str): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 user 实例。
Returns: (None): 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""按用户 ID 删除一名用户,不存在则抛 UserNotExistError。"""
async with self.async_session_maker() as session:
user = await session.get(User, user_id)
if user is None:
@@ -106,10 +93,7 @@ class AuthDatabase:
@database_exception
async def login_user(self, user_name: str) -> str:
"""执行与 login user 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: user_name (str): 赋予该实体的人类可读名称或标题字符串,主要用于前端 UI 展示、日志记录或模糊检索。
Returns: (str): 处理流程所输出的具体字符串产物,可能是新生成的 ID 序列、格式化好的文本片段或 LLM 推理的回答内容。"""
"""按用户名查出 User 记录返回给上层;上层再做密码校验并签发 token。"""
async with self.async_session_maker() as session:
statement = select(User).where(User.user_name == user_name)
results = await session.execute(statement)
@@ -120,9 +104,7 @@ class AuthDatabase:
@database_exception
async def get_all_users(self) -> list[User]:
"""检索并获取特定的 all users 数据集合或实例对象。
根据提供的查询条件或上下文凭证,从数据库、缓存或第三方服务中读取对应的资源状态。
Returns: (list[User]): 经过筛选、排序或分页处理后的实体对象列表集合。"""
"""返回数据库中全部用户列表。"""
async with self.async_session_maker() as session:
statement = select(User)
results = await session.execute(statement)
@@ -131,10 +113,7 @@ class AuthDatabase:
@database_exception
async def get_user_authority(self, user_id: str) -> UserAuthority:
"""检索并获取特定user authority 数据集合或实例对象。
根据提供的查询条件或上下文凭证,从数据库、缓存或第三方服务中读取对应的资源状态。
Args: user_id (str): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 user 实例。
Returns: (UserAuthority): 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""返回指定用户UserAuthority 枚举;不存在抛 UserNotExistError。"""
async with self.async_session_maker() as session:
user = await session.get(User, user_id)
if user is None:
+37 -76
View File
@@ -49,8 +49,12 @@ from .module.chat_history import ChatHistoryDatabase
@ray.remote
class PostgresDatabase:
"""PostgresDatabase 核心组件类
这是一个数据库操作层 (DAO/Repository) 封装类,专注于处理实体模型与关系型数据库表之间的映射。它将复杂的 SQL 查询、跨表 Join 和事务回滚逻辑进行了高级抽象,向上层服务暴露简洁的数据读写接口。"""
"""以 Ray Actor 形式暴露的统一数据库门面
内部组合了 Auth / Provider / Individual / SystemNode / Workflow / ChatHistory
六个子库,所有方法在调用前都会等待 ``ready_event``,确保 ``init_db`` 完成后
再放行业务请求。
"""
def __init__(self):
user = os.environ.get("POSTGRES_USER")
@@ -76,6 +80,7 @@ class PostgresDatabase:
self.ready_event = asyncio.Event()
async def init_db(self) -> None:
"""根据 metadata 创建(或校验)所有 ORM 表,并置位 ready_event。"""
try:
async with self.async_engine.begin() as conn:
await conn.run_sync(BaseDataModel.metadata.create_all)
@@ -88,98 +93,65 @@ class PostgresDatabase:
# Auth Database Methods
async def add_user(self, user_name: str, hashed_password: str):
"""创建并持久化新的 user 实体。
接收构建参数,执行必要的数据校验与默认值填充后,将新记录安全地写入底层存储或系统注册表中。
Args: user_name (str): 赋予该实体的人类可读名称或标题字符串,主要用于前端 UI 展示、日志记录或模糊检索。 hashed_password (str): 控制逻辑流向的具体字符串参数,指定了期望的 hashed password 内容。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""新建一名用户。"""
await self.ready_event.wait()
return await self._auth_database.add_user(user_name, hashed_password)
async def change_password(self, user_name, old_password, new_password):
"""执行与 change password 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: user_name: 赋予该实体的人类可读名称或标题字符串,主要用于前端 UI 展示、日志记录或模糊检索。 old_password: 参与 change password 逻辑运算或数据构建的上下文依赖对象。 new_password: 参与 change password 逻辑运算或数据构建的上下文依赖对象。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""校验旧密码后将用户的密码替换为新密码。"""
await self.ready_event.wait()
return await self._auth_database.change_password(
user_name, old_password, new_password
)
async def delete_user(self, user_name: str):
"""安全地移除或注销 user。
执行物理删除或逻辑删除操作,并妥善清理相关的关联数据及占用资源。
Args: user_name (str): 赋予该实体的人类可读名称或标题字符串,主要用于前端 UI 展示、日志记录或模糊检索。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""按用户名删除一名用户。"""
await self.ready_event.wait()
return await self._auth_database.delete_user(user_name)
async def delete_user_by_id(self, user_id: str):
"""安全地移除或注销 user by id。
执行物理删除或逻辑删除操作,并妥善清理相关的关联数据及占用资源。
Args: user_id (str): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 user 实例。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""按用户 ID 删除一名用户。"""
await self.ready_event.wait()
return await self._auth_database.delete_user_by_id(user_id)
async def login_user(self, user_name: str):
"""执行与 login user 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: user_name (str): 赋予该实体的人类可读名称或标题字符串,主要用于前端 UI 展示、日志记录或模糊检索。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""按用户名查询用户记录,用于上层做密码校验与签发 token。"""
await self.ready_event.wait()
return await self._auth_database.login_user(user_name)
async def get_all_users(self):
"""检索并获取特定的 all users 数据集合或实例对象。
根据提供的查询条件或上下文凭证,从数据库、缓存或第三方服务中读取对应的资源状态。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""返回全部用户列表。"""
await self.ready_event.wait()
return await self._auth_database.get_all_users()
async def get_user_authority(self, user_id: str):
"""检索并获取特定的 user authority 数据集合或实例对象。
根据提供的查询条件或上下文凭证,从数据库、缓存或第三方服务中读取对应的资源状态。
Args: user_id (str): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 user 实例。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""读取指定用户的权限/角色字段。"""
await self.ready_event.wait()
return await self._auth_database.get_user_authority(user_id)
async def change_user_authority(self, user_id: str, new_authority):
"""执行与 change user authority 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: user_id (str): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 user 实例。 new_authority: 参与 change user authority 逻辑运算或数据构建的上下文依赖对象。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""更新指定用户的权限/角色字段。"""
await self.ready_event.wait()
return await self._auth_database.change_user_authority(user_id, new_authority)
# Provider Database Methods
async def get_provider(self):
"""检索并获取特定的 provider 数据集合或实例对象。
根据提供的查询条件或上下文凭证,从数据库、缓存或第三方服务中读取对应的资源状态。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""返回全部已登记的模型 Provider。"""
await self.ready_event.wait()
return await self._provider_database.get_provider()
async def add_provider_db(self, **kwargs):
"""创建并持久化新的 provider db 实体。
接收构建参数,执行必要的数据校验与默认值填充后,将新记录安全地写入底层存储或系统注册表中。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""新增一个模型 Provider 记录。"""
await self.ready_event.wait()
return await self._provider_database.add_provider(**kwargs)
async def delete_provider_db(self, provider_id: str):
"""安全地移除或注销 provider db
执行物理删除或逻辑删除操作,并妥善清理相关的关联数据及占用资源。
Args: provider_id (str): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 provider 实例。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""删除指定 ID 的模型 Provider 记录"""
await self.ready_event.wait()
return await self._provider_database.delete_provider(provider_id)
async def update_provider_db(self, provider_id: str, **kwargs):
"""对现有的 provider db 进行状态更新或属性覆盖。
基于增量变更原则,合并最新的配置或数据,并触发相关依赖组件的缓存刷新或事件通知。
Args: provider_id (str): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 provider 实例。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""部分更新指定 Provider 的字段。"""
await self.ready_event.wait()
return await self._provider_database.update_provider(provider_id, **kwargs)
@@ -191,68 +163,47 @@ class PostgresDatabase:
model_id: str,
tools: list[str] = None,
):
"""执行与 upsert system node config 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: node_name (str): 赋予该实体的人类可读名称或标题字符串,主要用于前端 UI 展示、日志记录或模糊检索。 provider_title (str): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 provider_title 实例。 model_id (str): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 model 实例。 tools (list[str]): 控制逻辑流向的具体字符串参数,指定了期望的 tools 内容。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""插入或更新某个系统节点(如 control/consciousness/regulatory)的模型配置。"""
await self.ready_event.wait()
return await self._system_node_database.upsert_system_node_config(
node_name, provider_title, model_id, tools
)
async def get_all_system_node_configs(self):
"""检索并获取特定的 all system node configs 数据集合或实例对象。
根据提供的查询条件或上下文凭证,从数据库、缓存或第三方服务中读取对应的资源状态。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""返回所有系统节点的模型配置。"""
await self.ready_event.wait()
return await self._system_node_database.get_all_system_node_configs()
# Individual Database Methods
async def add_worker_individual(self, **kwargs):
"""创建并持久化新的 worker individual 实体
接收构建参数,执行必要的数据校验与默认值填充后,将新记录安全地写入底层存储或系统注册表中。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""登记一个新的 Worker Individual 配置"""
await self.ready_event.wait()
return await self._individual_database.add_worker_individual(**kwargs)
async def get_worker_individual(self, agent_id: str):
"""检索并获取特定的 worker individual 数据集合或实例对象。
根据提供的查询条件或上下文凭证,从数据库、缓存或第三方服务中读取对应的资源状态。
Args: agent_id (str): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 agent 实例。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""按 agent_id 读取单个 Worker Individual 配置。"""
await self.ready_event.wait()
return await self._individual_database.get_worker_individual(agent_id)
async def get_worker_individual_list(self, owner_id: str):
"""检索并获取特定的 worker individual list 数据集合或实例对象。
根据提供的查询条件或上下文凭证,从数据库、缓存或第三方服务中读取对应的资源状态。
Args: owner_id (str): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 owner 实例。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""读取某用户名下的所有 Worker Individual 配置。"""
await self.ready_event.wait()
return await self._individual_database.get_worker_individual_list(owner_id)
async def update_worker_individual(self, agent_id: str, **kwargs):
"""对现有的 worker individual 进行状态更新或属性覆盖。
基于增量变更原则,合并最新的配置或数据,并触发相关依赖组件的缓存刷新或事件通知。
Args: agent_id (str): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 agent 实例。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""部分更新指定 Worker Individual 的字段。"""
await self.ready_event.wait()
return await self._individual_database.update_worker_individual(
agent_id, **kwargs
)
async def delete_worker_individual(self, agent_id: str):
"""安全地移除或注销 worker individual。
执行物理删除或逻辑删除操作,并妥善清理相关的关联数据及占用资源。
Args: agent_id (str): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 agent 实例。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""删除指定的 Worker Individual。"""
await self.ready_event.wait()
return await self._individual_database.delete_worker_individual(agent_id)
async def get_all_worker_individual(self):
"""检索并获取特定的 all worker individual 数据集合或实例对象。
根据提供的查询条件或上下文凭证,从数据库、缓存或第三方服务中读取对应的资源状态。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""返回全部 Worker Individual 配置。"""
await self.ready_event.wait()
return await self._individual_database.get_all_worker_individual()
@@ -260,46 +211,56 @@ class PostgresDatabase:
async def create_workflow(
self, trace_id: str, user_id: str, title: str, command: str
):
"""新建一个工作流记录。"""
await self.ready_event.wait()
return await self._workflow_database.create_workflow(
trace_id, user_id, title, command
)
async def get_workflow(self, trace_id: str):
"""按 trace_id 读取工作流记录。"""
await self.ready_event.wait()
return await self._workflow_database.get_workflow(trace_id)
async def update_workflow_status(self, trace_id: str, status: str):
"""更新工作流的状态字段。"""
await self.ready_event.wait()
return await self._workflow_database.update_workflow_status(trace_id, status)
async def list_workflows(self, user_id: str):
"""返回某用户名下的全部工作流。"""
await self.ready_event.wait()
return await self._workflow_database.list_workflows(user_id)
async def upsert_workflow_context(self, trace_id: str, **kwargs):
"""插入或更新工作流的运行期上下文快照。"""
await self.ready_event.wait()
return await self._workflow_database.upsert_workflow_context(trace_id, **kwargs)
async def get_workflow_context(self, trace_id: str):
"""读取指定工作流的上下文快照。"""
await self.ready_event.wait()
return await self._workflow_database.get_workflow_context(trace_id)
# Chat History Database Methods
async def create_chat_session(self, user_id: str, title: str = "新对话"):
"""新建一个聊天会话。"""
await self.ready_event.wait()
return await self._chat_history_database.create_chat_session(user_id, title)
async def list_chat_sessions(self, user_id: str):
"""返回某用户名下的全部聊天会话。"""
await self.ready_event.wait()
return await self._chat_history_database.list_chat_sessions(user_id)
async def add_chat_message(self, chat_id: str, message: str, message_owner: str):
"""向某个聊天会话追加一条消息。"""
await self.ready_event.wait()
return await self._chat_history_database.add_chat_message(
chat_id, message, message_owner
)
async def list_chat_messages(self, chat_id: str):
"""返回某个聊天会话的全部消息。"""
await self.ready_event.wait()
return await self._chat_history_database.list_chat_messages(chat_id)
+1 -5
View File
@@ -85,11 +85,7 @@ class KiloStarWorkflow(BaseModel):
@model_validator(mode="after")
def validate_workflow_integrity(self) -> "KiloStarWorkflow":
"""
执行与 validate workflow integrity 相关的核心业务流转操作。
该方法保证了workflow中的work_step的序号为递增且跳转逻辑不会发生越界
Returns:
('KiloStarWorkflow'): 经过校验后的KiloStarWorkflow对象。"""
"""校验 work_link 完整性:步骤序号连续递增,且 ``logic_gate.if_fail`` 跳转目标不越界。"""
steps = [s.step for s in self.work_link]
expected = list(range(1, len(steps) + 1))
if steps != expected:
@@ -18,8 +18,7 @@ from typing import List, Literal, Dict
class ApprovalToolData(BaseToolData):
"""ApprovalToolData 核心组件类。
这是一个可被智能体动态调用的外部工具组件类。它定义了清晰的输入参数 Schema 与执行契约,赋予智能体与外界真实系统(如文件、网页、API)进行交互的能力。"""
"""``approval`` 工具的元数据:默认面向 control/consciousness 两类节点开放。"""
is_system: bool = True
action_scope: List[
+1 -2
View File
@@ -18,8 +18,7 @@ from pydantic import ConfigDict
class BaseToolData(BaseModel):
"""BaseToolData 核心组件类。
这是一个可被智能体动态调用的外部工具组件类。它定义了清晰的输入参数 Schema 与执行契约,赋予智能体与外界真实系统(如文件、网页、API)进行交互的能力。"""
"""所有工具插件的基类:声明工具的作用域、是否系统级以及配置参数 schema。"""
model_config = ConfigDict(extra="allow")
is_system: bool
@@ -18,8 +18,7 @@ import os
class FileReaderData(BaseToolData):
"""FileReaderData 核心组件类。
这是一个领域数据模型或功能封装类,承载了 FileReaderData 相关的内聚属性定义与状态维护。它的存在隔离了局部的业务复杂性,并对外提供了类型安全的访问接口。"""
"""``file_reader`` 工具的元数据:声明工具的名称、描述与是否系统级别。"""
is_system: bool = True
name: str = "file_reader"
+35 -39
View File
@@ -14,46 +14,57 @@
from __future__ import annotations
import jwt
import os
from datetime import datetime, timedelta, timezone
from typing import Optional
from fastapi import HTTPException, status, Request
from typing import TYPE_CHECKING, Optional
import jwt
from fastapi import HTTPException, Request, status
from pydantic import BaseModel, ValidationError
from pwdlib import PasswordHash
if TYPE_CHECKING:
from kilostar.core.postgres_database.model.user import User
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24
_INSECURE_SECRETS = {"secret", "114514", "changethiskey12345"}
class TokenData(BaseModel):
"""TokenData 核心组件类。
这是一个领域数据模型或功能封装类,承载了 TokenData 相关的内聚属性定义与状态维护。它的存在隔离了局部的业务复杂性,并对外提供了类型安全的访问接口。"""
"""JWT 解码后的用户身份载荷。"""
user_id: str
username: Optional[str] = None
exp: Optional[int] = None
SECRET_KEY = os.getenv("SECRET_KEY")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24
def _get_secret_key() -> str:
"""读取并校验 SECRET_KEY 环境变量。
校验在首次实际使用 JWT 时进行,避免在模块导入阶段抛错,
从而把"环境约束""模块加载"解耦。
"""
key = os.getenv("SECRET_KEY")
if not key or key in _INSECURE_SECRETS:
raise RuntimeError(
"未提供有效的 SECRET_KEY 或使用了不安全的默认值,请设置一个高熵的随机字符串"
)
return key
if not SECRET_KEY or SECRET_KEY in {"secret", "114514"}:
raise RuntimeError("未提供有效的 SECRET_KEY 或使用了不安全的默认值")
password_hasher = PasswordHash.recommended()
class Accessor:
"""Accessor 核心组件类。
这是一个领域数据模型或功能封装类,承载了 Accessor 相关的内聚属性定义与状态维护。它的存在隔离了局部的业务复杂性,并对外提供了类型安全的访问接口。"""
"""封装认证与口令哈希相关的静态工具方法。"""
@staticmethod
def _decode_token(token: str) -> TokenData:
"""执行与 decode token 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: token (str): 由认证中心颁发的 JWT 或长期访问令牌,用于跨服务调用时的身份自证与权限校验。
Returns: (TokenData): 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""解码并校验 JWT,返回 TokenData;过期或无效时抛 401。"""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
payload = jwt.decode(token, _get_secret_key(), algorithms=[ALGORITHM])
return TokenData(**payload)
except jwt.ExpiredSignatureError:
raise HTTPException(
@@ -68,31 +79,22 @@ class Accessor:
@staticmethod
def _create_access_token(data: dict) -> str:
"""创建并持久化新的 access token 实体。
接收构建参数,执行必要的数据校验与默认值填充后,将新记录安全地写入底层存储或系统注册表中。
Args: data (dict): 从客户端传递过来或由上游组件生成的核心业务数据体,通常需要进一步的清洗和结构化解析。
Returns: (str): 处理流程所输出的具体字符串产物,可能是新生成的 ID 序列、格式化好的文本片段或 LLM 推理的回答内容。"""
"""根据 payload 生成带过期时间的 JWT 访问令牌。"""
to_encode = data.copy()
expire = datetime.now(timezone.utc) + timedelta(
minutes=ACCESS_TOKEN_EXPIRE_MINUTES
)
to_encode.update({"exp": int(expire.timestamp())})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return jwt.encode(to_encode, _get_secret_key(), algorithm=ALGORITHM)
@staticmethod
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""执行与 verify password 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: plain_password (str): 控制逻辑流向的具体字符串参数,指定了期望的 plain password 内容。 hashed_password (str): 控制逻辑流向的具体字符串参数,指定了期望的 hashed password 内容。
Returns: (bool): 一个布尔型结果标志,明确返回 True 表示该操作成功应用或条件达成,False 则表示失败或被拒绝。"""
"""校验明文口令是否匹配数据库中存储的哈希。"""
return password_hasher.verify(plain_password, hashed_password)
@staticmethod
def get_current_user(request: Request) -> TokenData:
"""检索并获取特定的 current user 数据集合或实例对象。
根据提供的查询条件或上下文凭证,从数据库、缓存或第三方服务中读取对应的资源状态。
Args: request (Request): FastAPI 框架注入的原生 HTTP 请求对象,包含了完整的 Header 标头、查询参数和正文流。
Returns: (TokenData): 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""从 Authorization Bearer 头解析当前请求的用户身份。"""
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Bearer "):
raise HTTPException(
@@ -103,11 +105,8 @@ class Accessor:
return Accessor._decode_token(token)
@staticmethod
def login_hashed_password(user: User, password: str) -> str:
"""执行与 login hashed password 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: user (User): 当前已通过鉴权流程的访问者实体对象,内部包含用户角色、权限层级及租户归属等核心元信息。 password (str): 控制逻辑流向的具体字符串参数,指定了期望的 password 内容。
Returns: (str): 处理流程所输出的具体字符串产物,可能是新生成的 ID 序列、格式化好的文本片段或 LLM 推理的回答内容。"""
def login_hashed_password(user: "User", password: str) -> str:
"""完成登录核验:找不到用户或密码错误抛 401,否则签发新令牌。"""
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
@@ -123,10 +122,7 @@ class Accessor:
@staticmethod
def hash_password(password: str) -> str:
"""执行与 hash password 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: password (str): 控制逻辑流向的具体字符串参数,指定了期望的 password 内容。
Returns: (str): 处理流程所输出的具体字符串产物,可能是新生成的 ID 序列、格式化好的文本片段或 LLM 推理的回答内容。"""
"""对明文口令做强哈希;空值或长度不足 6 位会抛 ValueError。"""
if not password:
raise ValueError("密码不能为空")
if len(password) < 6:
+8 -10
View File
@@ -18,19 +18,17 @@ import yaml
def print_banner() -> None:
"""执行与 print banner 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Returns: (None): 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""在启动阶段输出 KiloStar 的 ASCII 横幅与版本/作者元信息。"""
with open("config/config.yml", "r") as config:
config = yaml.load(config, Loader=yaml.FullLoader)
version = config.get("version", "unknown")
kilostar_banner = """
██████╗ ██████╗ ███████╗████████╗ █████╗ ██████╗
██╔══██╗██╔══██╗██╔════╝╚══██╔══╝██╔══██╗██╔══██╗
█████╔╝██████╔╝█████╗ ██║ ████║██████╔╝
██╔═══╝ ██╔══████╔══╝ ██║ ██║ ██║██╔══██╗
██║ ██║ ██║███████╗ ██║ ██████╔╝██║ ██║
╚═╝ ╚═╝ ╚═╝╚══════╝ ╚═╝ ╚═════╝ ╚═╝ ╚═╝
kilostar_banner = r"""
██████╗██╗ ██████╗ ███████╗████████╗ █████╗ ██████╗
██║ ██╔╝██║██║ ██╔══██╗██╔════╝╚══██╔══╝██╔══██╗██╔══██╗
█████╔╝ ████║ ██║ ██║███████╗ ██║ ███████║██████╔╝
██╔═██╗ ████║ ██║ ██║╚════██║ ██║ ██╔══██║██╔══██╗
██║ ██╗██║███████╗╚██████╔╝███████║ ██║ ██║ ██║██║ ██║
╚═╝ ╚═╝╚═╝╚══════╝ ╚═════╝ ╚══════╝ ╚═╝ ╚═╝ ╚═╝╚═╝ ╚═╝
"""
console = Console()
banner_colored = Text(kilostar_banner, style="gold3 bold")
+6 -10
View File
@@ -19,10 +19,7 @@ from kilostar.utils.ray_hook import ray_actor_hook
async def get_authority(user_id: str) -> UserAuthority:
"""检索并获取特定的 authority 数据集合或实例对象。
根据提供的查询条件或上下文凭证,从数据库、缓存或第三方服务中读取对应的资源状态。
Args: user_id (str): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 user 实例。
Returns: (UserAuthority): 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""通过 PostgresDatabase Actor 查出指定用户的 ``UserAuthority``;用户不存在时抛 401。"""
from kilostar.utils.error import UserNotExistError
postgres_database = ray_actor_hook("postgres_database").postgres_database
@@ -43,8 +40,10 @@ async def get_authority(user_id: str) -> UserAuthority:
class RoleChecker:
"""RoleChecker 核心组件类
这是一个领域数据模型或功能封装类,承载了 RoleChecker 相关的内聚属性定义与状态维护。它的存在隔离了局部的业务复杂性,并对外提供了类型安全的访问接口。"""
"""FastAPI 依赖:在路由级别按 ``UserAuthority`` 做最低权限校验
例:``Depends(RoleChecker(allowed_roles=UserAuthority.ADMINISTRATOR))``。
"""
def __init__(self, **kwargs):
self.allowed_roles = kwargs.get(
@@ -54,10 +53,7 @@ class RoleChecker:
async def __call__(
self, token_data: Annotated[TokenData, Depends(Accessor.get_current_user)]
):
"""执行与 call 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: token_data (Annotated[TokenData, Depends(Accessor.get_current_user)]): 从客户端传递过来或由上游组件生成的核心业务数据体,通常需要进一步的清洗和结构化解析。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""对当前请求执行权限比较,权限不足抛 403,否则把 ``TokenData`` 透传给路由。"""
user_authority = await get_authority(token_data.user_id)
if user_authority < self.allowed_roles:
raise HTTPException(
+11 -20
View File
@@ -14,75 +14,66 @@
class RetryableError(Exception):
"""基类:所有可重试错误(如网络断开、抖动等临时性故障)"""
"""基类:所有可重试错误(如网络断开、抖动等临时性故障)"""
pass
class NonRetryableError(Exception):
"""基类:所有不可重试错误(如数据验证失败、类型错误等业务逻辑故障)"""
"""基类:所有不可重试错误(如数据验证失败、类型错误等业务逻辑故障)"""
pass
class DemandError(NonRetryableError):
"""DemandError 核心组件类。
这是一个自定义异常类,专门用于在 Demand 相关业务流程中触发中断。它携带了精确的错误上下文与追溯代码,帮助最外层网关能够统一捕获并返回友好的前端错误提示。"""
"""需求/任务参数不合法或不满足前置条件时抛出。"""
pass
class ModelNotExistError(Exception):
"""ModelNotExistError 核心组件类。
这是一个自定义异常类,专门用于在 ModelNotExist 相关业务流程中触发中断。它携带了精确的错误上下文与追溯代码,帮助最外层网关能够统一捕获并返回友好的前端错误提示。"""
"""请求了一个未在 Provider 中注册的模型 ID 时抛出。"""
pass
class UserError(Exception):
"""UserError 核心组件类。
这是一个自定义异常类,专门用于在 User 相关业务流程中触发中断。它携带了精确的错误上下文与追溯代码,帮助最外层网关能够统一捕获并返回友好的前端错误提示。"""
"""用户相关错误的基类,HTTP 层会被统一映射为 4xx。"""
pass
class UserNotExistError(UserError):
"""UserNotExistError 核心组件类。
这是一个自定义异常类,专门用于在 UserNotExist 相关业务流程中触发中断。它携带了精确的错误上下文与追溯代码,帮助最外层网关能够统一捕获并返回友好的前端错误提示。"""
"""按用户名/ID 查询时用户不存在。"""
pass
class UserPasswordError(UserError):
"""UserPasswordError 核心组件类。
这是一个自定义异常类,专门用于在 UserPassword 相关业务流程中触发中断。它携带了精确的错误上下文与追溯代码,帮助最外层网关能够统一捕获并返回友好的前端错误提示。"""
"""口令校验失败(旧密码错误、登录密码错误等)。"""
pass
class ProviderError(Exception):
"""ProviderError 核心组件类。
这是一个模型/服务提供商适配器类,屏蔽了外部不同供应商(如 OpenAI、Anthropic 等)的底层 API 差异。它负责标准化参数组装、网络请求发送、鉴权处理以及响应结构的反序列化。"""
"""模型 Provider 相关错误的基类。"""
pass
class ProviderNotExistError(ProviderError):
"""ProviderNotExistError 核心组件类。
这是一个模型/服务提供商适配器类,屏蔽了外部不同供应商(如 OpenAI、Anthropic 等)的底层 API 差异。它负责标准化参数组装、网络请求发送、鉴权处理以及响应结构的反序列化。"""
"""请求了一个未注册的 Provider 时抛出。"""
pass
class WorkflowError(Exception):
"""WorkflowError 核心组件类。
这是一个自定义异常类,专门用于在 Workflow 相关业务流程中触发中断。它携带了精确的错误上下文与追溯代码,帮助最外层网关能够统一捕获并返回友好的前端错误提示。"""
"""工作流执行期错误的基类,HTTP 层会被统一映射为 5xx。"""
pass
class WorkflowExit(WorkflowError):
"""WorkflowExit 核心组件类。
这是一个领域数据模型或功能封装类,承载了 WorkflowExit 相关的内聚属性定义与状态维护。它的存在隔离了局部的业务复杂性,并对外提供了类型安全的访问接口。"""
"""工作流被显式终止(用户取消、上游决策跳出等)时抛出,是预期内的退出信号。"""
pass
+7 -12
View File
@@ -24,10 +24,11 @@ _tool_cache: Dict[str, Callable] = {}
def _get_tool_func(tool_name: str) -> Callable | None:
"""检索并获取特定的 tool func 数据集合或实例对象
根据提供的查询条件或上下文凭证,从数据库、缓存或第三方服务中读取对应的资源状态。
Args: tool_name (str): 赋予该实体的人类可读名称或标题字符串,主要用于前端 UI 展示、日志记录或模糊检索。
Returns: (Callable | None): 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""按名字从 ``kilostar/plugin/tool_plugin/<tool_name>/__init__.py`` 中加载工具函数
加载成功后会被缓存到模块级 ``_tool_cache``;找不到目录、找不到同名函数或
导入失败都会记录日志并返回 ``None``。
"""
func = _tool_cache.get(tool_name, None)
if func:
return func
@@ -72,19 +73,13 @@ def _get_tool_func(tool_name: str) -> Callable | None:
def del_tool_cache(tool_name: str) -> None:
"""执行与 del tool cache 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: tool_name (str): 赋予该实体的人类可读名称或标题字符串,主要用于前端 UI 展示、日志记录或模糊检索。
Returns: (None): 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""从内存缓存中移除某个工具,下次调用 ``load_tools_from_list`` 会重新从磁盘加载。"""
if tool_name in _tool_cache:
del _tool_cache[tool_name]
def load_tools_from_list(tool_names: List[str] | None) -> List[Callable]:
"""执行与 load tools from list 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: tool_names (List[str] | None): 赋予该实体的人类可读名称或标题字符串,主要用于前端 UI 展示、日志记录或模糊检索。
Returns: (List[Callable]): 经过筛选、排序或分页处理后的实体对象列表集合。"""
"""批量加载工具:传入工具名列表,返回成功加载到的函数对象列表(失败项被跳过)。"""
if not tool_names:
return []
+2 -11
View File
@@ -18,17 +18,11 @@ from loguru._logger import Logger
def setup_logger() -> Logger:
"""对现有的 setup logger 进行状态更新或属性覆盖。
基于增量变更原则,合并最新的配置或数据,并触发相关依赖组件的缓存刷新或事件通知。
Returns: (Logger): 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""初始化全局 loguru logger,输出格式为 ``actor:(...) | trace_id:(...) : message``。"""
logger.remove()
def format_record(record):
# Format string for rich handler
"""执行与 format record 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: record: 参与 format record 逻辑运算或数据构建的上下文依赖对象。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
actor = record["extra"].get("actor_name", "System")
trace_id = record["extra"].get("trace_id", "")
@@ -57,8 +51,5 @@ global_logger = setup_logger()
def get_logger(actor_name: str, trace_id: str = "") -> Logger:
"""检索并获取特定的 logger 数据集合或实例对象。
根据提供的查询条件或上下文凭证,从数据库、缓存或第三方服务中读取对应的资源状态。
Args: actor_name (str): 赋予该实体的人类可读名称或标题字符串,主要用于前端 UI 展示、日志记录或模糊检索。 trace_id (str): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 trace 实例。
Returns: (Logger): 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""获取一个绑定了 actor_name 与可选 trace_id 的 logger,便于日志按 Actor/请求归类。"""
return global_logger.bind(actor_name=actor_name, trace_id=trace_id)
-3
View File
@@ -31,9 +31,6 @@ def pickle(cls: T) -> T:
def __reduce__(self):
# 1. 序列化:触发 Pydantic-core (Rust) 的极速序列化
"""执行与 reduce 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
data = self.model_dump_json()
# 2. 反序列化:告诉 Pickle 重建时调用 cls.model_validate_json
return cls.model_validate_json, (data,)
+6 -15
View File
@@ -16,31 +16,20 @@ from functools import lru_cache
class ActorList:
"""ActorList 核心组件类。
这是一个领域数据模型或功能封装类,承载了 ActorList 相关的内聚属性定义与状态维护。它的存在隔离了局部的业务复杂性,并对外提供了类型安全的访问接口。"""
"""属性式访问的简易容器,用 ``a.actor_name`` 取代 ``d["actor_name"]``。"""
def __init__(self):
super().__setattr__("dict", {})
def __setattr__(self, key, value):
"""对现有的 setattr 进行状态更新或属性覆盖。
基于增量变更原则,合并最新的配置或数据,并触发相关依赖组件的缓存刷新或事件通知。
Args: key: 参与 setattr 逻辑运算或数据构建的上下文依赖对象。 value: 参与 setattr 逻辑运算或数据构建的上下文依赖对象。"""
self.dict[key] = value
def __getattr__(self, key):
"""检索并获取特定的 getattr 数据集合或实例对象。
根据提供的查询条件或上下文凭证,从数据库、缓存或第三方服务中读取对应的资源状态。
Args: key: 参与 getattr 逻辑运算或数据构建的上下文依赖对象。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
if key in self.dict:
return self.dict[key]
raise AttributeError(f"ActorList 对象没有属性 '{key}'")
def __delattr__(self, key):
"""执行与 delattr 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: key: 参与 delattr 逻辑运算或数据构建的上下文依赖对象。"""
if key in self.dict:
del self.dict[key]
else:
@@ -59,9 +48,11 @@ def clear_actor_cache():
def ray_actor_hook(*actor_names: str):
"""执行与 ray actor hook 相关的核心业务流转操作
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""按名字批量取出 Ray Actor 句柄,组装成一个 ``ActorList`` 返回
例:``actors = ray_actor_hook("postgres_database", "global_state_machine")``
随后即可用 ``actors.postgres_database`` 拿到对应句柄。
"""
actor_list = ActorList()
for actor_name in actor_names:
handle = _get_cached_actor_handle(actor_name)
+8 -14
View File
@@ -19,23 +19,20 @@ from kilostar.utils.error import RetryableError
def retry_on_retryable_error(max_retries=3, base_delay=1):
"""执行与 retry on retryable error 相关的核心业务流转操作
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: max_retries: 参与 retry on retryable error 逻辑运算或数据构建的上下文依赖对象。 base_delay: 参与 retry on retryable error 逻辑运算或数据构建的上下文依赖对象
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""指数退避重试装饰器:仅在抛出 ``RetryableError`` 时重试
同步/异步函数都支持。第 n 次重试前会 ``sleep(base_delay * 2**n)``
Args:
max_retries: 最多尝试次数(含首次),超过后会把最后一次异常重新抛出。
base_delay: 退避基准秒数。
"""
def decorator(func):
"""执行与 decorator 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: func: 参与 decorator 逻辑运算或数据构建的上下文依赖对象。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
if asyncio.iscoroutinefunction(func):
@wraps(func)
async def async_wrapper(*args, **kwargs):
"""执行与 async wrapper 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
for attempt in range(max_retries):
try:
return await func(*args, **kwargs)
@@ -49,9 +46,6 @@ def retry_on_retryable_error(max_retries=3, base_delay=1):
@wraps(func)
def sync_wrapper(*args, **kwargs):
"""执行与 sync wrapper 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
import time
for attempt in range(max_retries):
+4 -12
View File
@@ -45,8 +45,7 @@ class WorkerCluster:
self.logger = get_logger("worker_cluster")
async def start(self):
"""执行与 start 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。"""
"""启动 runner 协程池并初始化任务队列。"""
if self.task_queue is None:
self.task_queue = Queue()
self.runners = [
@@ -84,9 +83,7 @@ class WorkerCluster:
return worker
async def _runner(self, runner_id: int):
"""执行与 runner 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: runner_id (int): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 runner 实例。"""
"""单个 runner 协程:从任务队列取任务,按 agent_id 唤醒 Worker 执行,结果写回 future。"""
while True:
try:
if self.task_queue is None:
@@ -130,10 +127,7 @@ class WorkerCluster:
await asyncio.sleep(1)
async def submit_task(self, task_id: str, agent_id: str, task_event: dict):
"""执行与 submit task 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: task_id (str): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 task 实例。 agent_id (str): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 agent 实例。 task_event (dict): 由事件总线或工作流引擎分发过来的事件载荷,封装了触发此次调用的上下文快照与任务目标指令。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""提交一个任务到队列,挂起等待 runner 处理完成后返回响应字典。"""
if not self.runners:
await self.start()
@@ -151,9 +145,7 @@ class WorkerCluster:
self.results_futures.pop(task_id, None)
def get_cluster_metrics(self):
"""检索并获取特定的 cluster metrics 数据集合或实例对象。
根据提供的查询条件或上下文凭证,从数据库、缓存或第三方服务中读取对应的资源状态。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""返回当前内存池中 Worker 数量、容量、缓存的 agent_id 列表与队列长度等指标。"""
return {
"active_worker_count": len(self._active_workers),
"max_capacity": self.max_capacity,
+16 -20
View File
@@ -16,7 +16,7 @@ from pydantic_ai import Agent, RunContext
from pydantic import Field
from kilostar.adapter.model_adapter.agent_factory import AgentFactory
from kilostar.core.global_state_machine.model_provider.base_provider import Provider
from kilostar.utils.agent_model import ResponseModel, InputModel, DepsModel
from kilostar.utils.agent_model import ResponseModel, RequestModel, DepsModel
from kilostar.utils.ray_hook import ray_actor_hook
from kilostar.utils.logger import get_logger
@@ -25,22 +25,19 @@ logger = get_logger("worker_individual")
class WorkerIndividualResponse(ResponseModel):
"""WorkerIndividualResponse 核心组件类。
这是一个具体的 Worker 智能体实体类,代表着具备特定人设、领域技能或长文本处理能力的数字员工。它可以被控制器动态拉起,并在安全沙箱内执行复杂的工作流指令与多步骤推理任务。"""
"""Worker Individual 的输出模型,承载一次任务执行后的结果文本。"""
output: str = Field(..., description="Worker执行任务的输出结果")
class WorkerIndividualDeps(DepsModel):
"""WorkerIndividualDeps 核心组件类。
这是一个具体的 Worker 智能体实体类,代表着具备特定人设、领域技能或长文本处理能力的数字员工。它可以被控制器动态拉起,并在安全沙箱内执行复杂的工作流指令与多步骤推理任务。"""
"""Worker Individual 的运行期依赖,注入到 pydantic-ai Agent 的 RunContext。"""
task_event: dict
class WorkerIndividualInput(InputModel):
"""WorkerIndividualInput 核心组件类。
这是一个具体的 Worker 智能体实体类,代表着具备特定人设、领域技能或长文本处理能力的数字员工。它可以被控制器动态拉起,并在安全沙箱内执行复杂的工作流指令与多步骤推理任务。"""
class WorkerIndividualInput(RequestModel):
"""Worker Individual 的输入模型,承载一次任务事件的入参。"""
task_event: dict
@@ -56,10 +53,15 @@ class BaseIndividual:
self.agent: Agent | None = None
async def _init_agent(self, agent_name: str, system_prompt: str):
"""完成 agent 模块的启动与依赖初始化
在系统引导或服务拉起阶段被调用,负责建立网络连接、分配基础内存资源及注册核心服务组件。
Args: agent_name (str): 赋予该实体的人类可读名称或标题字符串,主要用于前端 UI 展示、日志记录或模糊检索。 system_prompt (str): 控制逻辑流向的具体字符串参数,指定了期望的 system prompt 内容。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""根据 agent_config 拉起一个 pydantic-ai Agent 实例
从 GlobalStateMachine 取出 Provider,按 agent_config 中的 provider_title
和 model_id 选择模型,加载工具列表,并把 system_prompt 注册为动态提示词。
Args:
agent_name: Agent 的人类可读名称,用于日志与展示。
system_prompt: 该 Agent 的基础系统提示词,会和 task_event 拼接成动态提示词。
"""
from kilostar.utils.get_tool import load_tools_from_list
global_state_machine = ray_actor_hook(
@@ -90,17 +92,11 @@ class BaseIndividual:
@self.agent.system_prompt
async def dynamic_prompt(ctx: RunContext[WorkerIndividualDeps]):
"""执行与 dynamic prompt 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: ctx (RunContext[WorkerIndividualDeps]): 参与 dynamic prompt 逻辑运算或数据构建的上下文依赖对象。
Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
"""把基础 system_prompt 与本次 task_event 拼接成最终动态提示词。"""
prompt = system_prompt + "\n\n"
prompt += f"=== 当前任务上下文 ===\n{ctx.deps.task_event}\n"
return prompt
async def run(self, task_event: dict) -> dict:
"""执行与 run 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: task_event (dict): 由事件总线或工作流引擎分发过来的事件载荷,封装了触发此次调用的上下文快照与任务目标指令。
Returns: (dict): 高度聚合的字典结构数据,将多维度的属性特征或统计指标组合后一并返回。"""
"""执行一次任务,需要由子类按自身策略实现。"""
raise NotImplementedError("子类必须实现 run 方法")
@@ -30,10 +30,7 @@ class OrdinaryIndividual(BaseIndividual):
super().__init__(agent_config)
async def run(self, task_event: dict) -> dict:
"""执行与 run 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: task_event (dict): 由事件总线或工作流引擎分发过来的事件载荷,封装了触发此次调用的上下文快照与任务目标指令。
Returns: (dict): 高度聚合的字典结构数据,将多维度的属性特征或统计指标组合后一并返回。"""
"""执行一次普通任务:首次调用时懒初始化 Agent,再用 ``WorkerIndividualDeps`` 跑出结果。"""
if self.agent is None:
system_prompt = self.agent_config.get(
"prompt", "你是一个普通的AI助手,请尽力完成给定的任务。"
@@ -104,10 +104,7 @@ class SkillIndividual(BaseIndividual):
return tools
async def run(self, task_event: dict) -> dict:
"""执行与 run 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: task_event (dict): 由事件总线或工作流引擎分发过来的事件载荷,封装了触发此次调用的上下文快照与任务目标指令。
Returns: (dict): 高度聚合的字典结构数据,将多维度的属性特征或统计指标组合后一并返回。"""
"""执行一次专家任务:先按 ``bound_skill`` 动态加载工具,再驱动 Agent 运行。"""
if self.agent is None:
system_prompt = self.agent_config.get(
"prompt",
@@ -30,10 +30,7 @@ class SpecialIndividual(BaseIndividual):
super().__init__(agent_config)
async def run(self, task_event: dict) -> dict:
"""执行与 run 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: task_event (dict): 由事件总线或工作流引擎分发过来的事件载荷,封装了触发此次调用的上下文快照与任务目标指令。
Returns: (dict): 高度聚合的字典结构数据,将多维度的属性特征或统计指标组合后一并返回。"""
"""执行一次特殊任务(语音/视频等特殊产物):懒初始化 Agent 后跑出结果。"""
if self.agent is None:
system_prompt = self.agent_config.get(
"prompt", "你是一个特殊的AI助手,负责处理特殊类型的任务。"