wip:优化individual,加了基本的启动页面

This commit is contained in:
朝夕 2026-04-10 23:17:06 +08:00
parent 2e5b2bce3a
commit dc857cbff7
20 changed files with 1950 additions and 258 deletions

2
Makefile Normal file
View File

@ -0,0 +1,2 @@
run:
uv run main.py

1
config/config.yml Normal file
View File

@ -0,0 +1 @@
version: v0.1

View File

@ -0,0 +1,7 @@
from pretor.utils.banner import print_banner
def main():
print_banner()
if __name__ == '__main__':
main()

View File

@ -1,49 +0,0 @@
# Copyright 2026 zhaoxi826
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pydantic_ai.models.openai import OpenAIChatModel
from pydantic_ai.models.google import GoogleModel
from pydantic_ai.models.anthropic import AnthropicModel
from pydantic_ai.providers.openai import OpenAIProvider
from pydantic_ai.providers.google import GoogleProvider
from pydantic_ai.providers.anthropic import AnthropicProvider
from pathlib import Path
class AgentFactory:
def __init__(self):
self._models_mapping = {"openai": (OpenAIChatModel, OpenAIProvider), "gemini": (GoogleModel, GoogleProvider), "claude": (AnthropicModel, AnthropicProvider)}
def _load_agent_protocol(self):
pass
def create_model(self, protocol_name: str, api_key: str, url: str | None, model_id: str):
"""
创建agent的模型对象
Args:
protocol_name: 协议名称如openai,gemini等应当为_model_mapping的键
api_key: api调用令牌
url: api调用的url
model_id: 模型名
Returns:
应当返回一个可以作为Agent类model参数的对象
Raises:
当protocol_name在_models_mapping中不存在抛出ValueError错误
"""
if protocol_name not in self._models_mapping:
raise ValueError(f"不支持的协议类型: {protocol_name}")
model_class, provider_class = self._models_mapping[protocol_name]
return model_class(model_id, provider_class(api_key = api_key, url = url))

View File

@ -0,0 +1,62 @@
# Copyright 2026 zhaoxi826
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pydantic_ai import Agent
from pydantic_ai.models.openai import OpenAIChatModel
from pydantic_ai.models.google import GoogleModel
from pydantic_ai.models.anthropic import AnthropicModel
from pydantic_ai.providers.openai import OpenAIProvider
from pydantic_ai.providers.google import GoogleProvider
from pydantic_ai.providers.anthropic import AnthropicProvider
from pretor.core.global_state_machine.model_provider import Provider
from pretor.utils.agent_model import ResponseModel, DepsModel
from pretor.utils.error import ModelNotExistError
class AgentFactory:
def __init__(self):
self._models_mapping = {"openai": (OpenAIChatModel, OpenAIProvider), "gemini": (GoogleModel, GoogleProvider), "claude": (AnthropicModel, AnthropicProvider)}
def create_agent(self,
provider: Provider,
model_id: str,
output_type: ResponseModel,
system_prompt: str,
deps_type: DepsModel,
agent_name: str) -> Agent:
"""
create_agent方法将输入的provider对象实例化为一个pydantic-ai的agent对象
Args:
provider: Provider对象从global_state_machine中获取
model_id: 模型名
output_type: 输出格式
system_prompt: 系统提示词
deps_type: 依赖类型在agent运行时动态输入的格式化消息
agent_name: agent的名字
Returns:
返回被实例化的pydantic-ai的Agent对象
"""
if model_id not in provider.provider_models:
raise ModelNotExistError("模型不存在")
if provider.provider_type not in self._models_mapping:
raise ValueError(f"不支持的协议类型: {provider.provider_type}")
model_class, provider_class = self._models_mapping[provider.provider_type]
model = model_class(model_id, provider_class(api_key=provider.api_key, url=provider.url))
agent = Agent(model=model,
name=agent_name,
system_prompt=system_prompt,
output_type=output_type,
deps_type=deps_type)
return agent

View File

@ -1,54 +0,0 @@
# Copyright 2026 zhaoxi826
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
def create_agent(self, agent_name: str,
system_prompt: str,
provider_title: str,
model_id: str,
output_type: ResponseModel,
deps_type: DepsModel) -> Agent:
"""
create_agent方法将保存的适配器转化为agent对象并返回
Args:
agent_name: agent名字代表实例化个体起的名字
system_prompt: 系统提示词给llm的系统提示词
provider_title: 供应商名称
model_id: 模型Id实例化agent所输入的model_id
output_type: 输出格式实例化agent后对应llm所应当输出的格式
deps_type: 依赖格式输入llm的格式
Returns:
一个pydanticAI的Agent对象包含对应的apikey,url,model_id等信息应当挂载到individual类的agent属性下
Raises:
ProviderNotExistError 当在provider_register属性里找不到供应商的自定义名称时抛出
ModelNotExistError 在获取的provider的模型列表中找不到输入的model_id抛出
"""
if provider_title not in self.provider_register:
raise ProviderNotExistError("提供商不存在")
provider = self.provider_register[provider_title]
if model_id not in provider.provider_models:
raise ModelNotExistError("模型不存在")
model = self._agent_factory.create_model(provider.provider_type,
provider.provider_apikey,
provider.provider_url,
model_id)
agent = Agent(model=model,
name=agent_name,
system_prompt=system_prompt,
output_type=output_type,
deps_type=deps_type)
return agent

View File

@ -21,10 +21,8 @@ from typing import Dict, Literal
from pretor.core.database.postgres import PostgresDatabase
from pretor.api.platform.event import PretorEvent
import asyncio
from pretor.core.workflow.workflow import PretorWorkflow
@ray.remote
class GlobalStateMachine:
def __init__(self, postgres_database: PostgresDatabase):

View File

@ -16,6 +16,7 @@ from abc import ABC, abstractmethod
from pydantic import BaseModel
from typing import List
from enum import Enum
class ProviderStatus(Enum, str):
UP = "up"
DOWN = "down"

View File

@ -13,42 +13,124 @@
# limitations under the License.
import ray
from pydantic_ai import Agent
from pretor.core.workflow.workflow import PretorWorkflow, WorkStep, WorkerGroup
from typing import Union, overload
from pretor.core.individual.consciousness_node.template import (ConsciousnessNodeDeps, ForSupervisoryNode, ForWorkflow,\
ForWorkflowEngine, ForWorkflowInput, ForSupervisoryInput, ForWorkflowEngineInput)
from pydantic_ai import Agent, RunContext
from pretor.core.global_state_machine.global_state_machine import GlobalStateMachine
from pretor.core.global_state_machine.model_provider.base_provider import Provider
from pretor.adapter.model_adapter.agent_factory import AgentFactory
@ray.remote
class ConsciousnessNode:
def __init__(self, agent: Agent):
self.agent = agent
def __init__(self) -> None:
self.agent: None | Agent = None
async def generate_workflow(self, template: dict, task_description: str) -> PretorWorkflow:
prompt = f"Given the template {template} and task '{task_description}', generate a list of actionable steps."
# Simulated parsing logic: in a real implementation we would parse structured output from the agent
# response = await self.agent.run(prompt)
@self.agent.system_prompt
async def dynamic_prompt(ctx: RunContext[ConsciousnessNodeDeps]):
return f"Context: original_command: {ctx.deps.original_command}, workflow_template: {ctx.deps.workflow_template}"
wg = WorkerGroup(
name="default_squad",
primary_individual={"coder": 1},
composite_individual={}
)
def create_agent(self, global_state_machine: GlobalStateMachine, provider_title: str, model_id: str) -> None:
"""
create_agent方法将agent对象装配到ConsciousnessNode的属性内
该方法通过provider_title从global_state_machine中获取provider对象然后从provider对象中取出供应商形象装配为pydantic_ai的
Agent实例
并挂载到self.agent属性
Args:
global_state_machine: 全局状态机
provider_title: 供应商名
model_id: 模型id
steps = [
WorkStep(step=1, node="consciousness_node", action="analyze", desc="Analyze task details: " + task_description),
WorkStep(step=2, node="control_node", action="execute", desc="Execute default execution logic", input=["1"])
]
Returns:
无返回
"""
system_prompt: str = ""
output_type = Union[ForSupervisoryNode, ForWorkflow, ForWorkflowEngine]
provider: Provider = global_state_machine.get_provider.remote(provider_title)
agent_factory = AgentFactory()
self.agent = agent_factory.create_agent(provider=provider,
model_id=model_id,
output_type=output_type,
system_prompt=system_prompt,
deps_type=ConsciousnessNodeDeps,
agent_name="consciousness_node")
return PretorWorkflow(
title=f"Workflow for {task_description[:10]}",
workgroup_list=[wg],
work_link=steps
)
async def running(self, payload: Union[ForWorkflowEngineInput, ForWorkflowInput, ForSupervisoryInput]) -> str:
result: Union[ForWorkflowEngine, ForWorkflow, ForSupervisoryNode] = await self._run(payload)
if isinstance(result, ForWorkflowEngine):
return result
async def check_task(self, task_status: dict) -> bool:
if task_status.get("status") == "completed":
return True
return False
elif isinstance(result, ForWorkflow):
return result
async def process_complex_transaction(self, transaction_data: dict) -> dict:
prompt = f"Process the following complex transaction data and extract key entities: {transaction_data}"
result = await self.agent.run(prompt)
return {"processed": True, "analysis": result.data}
elif isinstance(result, ForSupervisoryInput):
return result
else:
return None
@overload
async def _run(self, payload: ForWorkflowEngineInput) -> ForWorkflowEngine:
"""
_run方法
该分支应当在supervisory_node简单处理用户命令后工作流创建前调用
Args:
payload: 应当包含workflow_template和event对象
Returns:
ForWorkflowEngine对象将被放到全局状态机后丢入WorkflowEngine的异步队列
"""
pass
@overload
async def _run(self, payload: ForWorkflow) -> ForWorkflow:
"""
_run方法
该分支应当在workflow运行时由WorkflowEngine进行调用
Args:
payload: 应当包含workflow中的WorkStep对象
Returns:
ForWorkflow对象作为ConsciousnessNode执行Workflow中的WorkStep的结果
"""
pass
@overload
async def _run(self, payload: ForSupervisoryInput) -> ForSupervisoryNode:
"""
_run方法
该分支应当在workflow运行完全结束后由WorkflowEngine进行调用
Args:
payload: 应当包含整个Workflow的情况
Returns:
ForSupervisory对象作为ConsciousnessNode对于全工作流的技术性总结返回给SupervisoryNode
"""
pass
async def _run(self, payload: Union[ForSupervisoryInput, ForWorkflowInput, ForWorkflowEngineInput])\
-> Union[ForSupervisoryNode, ForWorkflow, ForWorkflowEngine]:
if isinstance(payload, ForWorkflowEngineInput):
deps = ConsciousnessNodeDeps(original_command=payload.original_command,
workflow_template=payload.workflow_template,
command="拆解原始命令变成一个工作流")
result = await self.agent.run(f"根据original_command制定workflow可以学习workflow_template",
deps=deps,
output_type=ForWorkflowEngine,)
return result.output
elif isinstance(payload, ForWorkflowInput):
deps = ConsciousnessNodeDeps(original_command=payload.original_command,
command="完成workflowstep的任务")
result = await self.agent.run(payload.workflow_step.model_dump_json(),
deps=deps,
output_type=ForWorkflow)
return result.output
elif isinstance(payload, ForSupervisoryInput):
deps = ConsciousnessNodeDeps(original_command=payload.original_command,
command="对于结果进行检查,并且生成一份技术性的总结报告")
result = await self.agent.run(payload.workflow.model_dump_json(),
deps=deps,
output_type=ForSupervisoryNode)
return result.output
return None

View File

@ -1,34 +0,0 @@
# Copyright 2026 zhaoxi826
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pydantic import Field
from pretor.core.workflow.workflow import PretorWorkflow
from pretor.utils.agent_model import ResponseModel, DepsModel
class ConsciousnessNodeResponse(ResponseModel):
pass
class ForControlNode(ConsciousnessNodeResponse):
class ForSystem(ConsciousnessNodeResponse):
workflow: PretorWorkflow
class ForSupervisoryNode(ConsciousnessNodeResponse):
pass
class SupervisoryNodeDeps(DepsModel):
platform: str
user_name: str
time: str

View File

@ -0,0 +1,64 @@
# Copyright 2026 zhaoxi826
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pydantic import BaseModel
from pretor.core.workflow.workflow import PretorWorkflow, WorkStep
from pretor.utils.agent_model import ResponseModel, DepsModel
#意识节点回复类
class ConsciousnessNodeResponse(ResponseModel):
"""Consciousness response model,是意识节点所有回复类型的父类"""
pass
class ForWorkflowEngine(ConsciousnessNodeResponse):
"""生成workflow并放入WorkflowEngine"""
workflow: PretorWorkflow
class ForWorkflow(ConsciousnessNodeResponse):
"""处理workflow中需要ConsciousnessNode的工作"""
output: str
class ForSupervisoryNode(ConsciousnessNodeResponse):
"""工作流完成后进行校验并返回给SupervisoryNode"""
output: str
class ConsciousnessNodeDeps(DepsModel):
original_command: str
workflow_template: str | None = None
command: str
class ConsciousnessNodeInput(BaseModel):
pass
class ForWorkflowEngineInput(ConsciousnessNodeInput):
workflow_template: str
original_command: str
class ForWorkflowInput(ConsciousnessNodeInput):
workflow_step: WorkStep
original_command: str
class ForSupervisoryInput(ConsciousnessNodeInput):
workflow: PretorWorkflow
original_command: str

View File

@ -14,39 +14,111 @@
import datetime
import ray
from typing import Union
from typing import Union, overload
from pretor.api.platform.event import PretorEvent
from pretor.core.individual.supervisory_node.response import ForConsciousnessNode, ForUser, SupervisoryNodeDeps
from pydantic_ai import RunContext
from pretor.adapter.model_adapter.agent_factory import AgentFactory
from pretor.core.global_state_machine.global_state_machine import GlobalStateMachine
from pretor.core.global_state_machine.model_provider import Provider
from pretor.core.individual.supervisory_node.template import ForConsciousnessNode, ForUser, SupervisoryNodeDeps, TerminationMessage
from pydantic_ai import RunContext, Agent
@ray.remote
class SupervisoryNode:
def __init__(self, provider_manager: ProviderManager, provider_title: str, model_id: str) -> None:
def __init__(self) -> None:
self.agent: None | Agent = None
async def create_agent(self, global_state_machine: GlobalStateMachine, provider_title: str, model_id: str) -> None:
"""
create_agent方法将agent对象装配到SupervisoryNode的属性内
该方法通过provider_title从global_state_machine中获取provider对象然后从provider对象中取出供应商形象装配为pydantic_ai的Agent实例
并挂载到self.agent属性
Args:
global_state_machine: 全局状态机
provider_title: 供应商名
model_id: 模型id
Returns:
无返回
"""
system_prompt: str = ""
output_type = Union[ForConsciousnessNode, ForUser]
self.agent = provider_manager.create_agent(agent_name="supervisory",
system_prompt=system_prompt,
provider_title=provider_title,
model_id=model_id,
output_type=output_type,
deps_type=SupervisoryNodeDeps)
provider: Provider = await global_state_machine.get_provider.remote(provider_title)
agent_factory = AgentFactory()
self.agent = agent_factory.create_agent(provider=provider,
model_id=model_id,
output_type=output_type,
system_prompt=system_prompt,
deps_type=SupervisoryNodeDeps,
agent_name="supervisory_node")
@self.agent.system_prompt
async def dynamic_prompt(ctx: RunContext[SupervisoryNodeDeps]):
return f"Context: Platform={ctx.deps.platform}, User={ctx.deps.user_name}, Time={ctx.deps.time}"
async def working(self, event: PretorEvent):
deps = SupervisoryNodeDeps(platform=event.platform,
user_name=event.user_name,
time=datetime.datetime.now())
result = await self.agent.run(event.message,deps=deps)
if isinstance(result.data, ForConsciousnessNode):
###工作函数
async def working(self, payload: Union[PretorEvent, TerminationMessage]) -> str:
"""
working方法是节点唯一的调用方法对于_run函数的结果进行判断并实现最终回复
Args:
payload: 消息载荷包含所有信息
Returns:
str,监控节点对于用户的回复
"""
result = await self._run(payload)
if isinstance(result, ForConsciousnessNode):
return "任务已创建"
elif isinstance(result.date, ForUser):
return result.data.content
elif isinstance(result, ForUser):
return result.content
else:
return "未知响应类型"
async def router(self, ):
@overload
async def _run(self, payload: PretorEvent) -> Union[ForConsciousnessNode, ForUser]:
"""
_run方法
Args:
payload: PretorEvent的实例是用户输入时对于消息的封装
Returns:
ForUser对象监控节点对于用户进行的简单回答
ForConsciousnessNode对象监控节点将用户的请求判断为复杂任务将PretorEvent传递给意识节点并且给选择好的工作流模板
"""
pass
@overload
async def _run(self, payload: TerminationMessage) -> ForUser:
"""
_run方法
Args:
payload: Termination的实例是工作流结束后到达监控节点的最后结果
Returns:
ForUser对象工作流结束后给用户的返回
"""
pass
async def _run(self, payload: Union[PretorEvent, TerminationMessage]) -> Union[ForConsciousnessNode, ForUser]:
"""
_run方法将payload转化为对llm发送的消息并发送
Args:
payload: 消息载荷
Returns:
ForConsciousnessNode对象对意识节点发送的消息
ForUser对象对用户发送到消息
"""
if isinstance(payload, PretorEvent):
deps = SupervisoryNodeDeps(platform=payload.platform,
user_name=payload.user_name,
time=datetime.datetime.now())
result = await self.agent.run(payload.message, deps=deps)
else:
deps = SupervisoryNodeDeps(platform=payload.platform,
user_name=payload.user_name,
time=datetime.datetime.now())
result = await self.agent.run(payload.message, deps=deps)
return result.output

View File

@ -14,6 +14,7 @@
from pydantic import Field
from pretor.utils.agent_model import ResponseModel, DepsModel
from pydantic import BaseModel
class SupervisoryNodeResponse(ResponseModel):
pass
@ -24,6 +25,11 @@ class ForUser(SupervisoryNodeResponse):
class ForConsciousnessNode(SupervisoryNodeResponse):
workflow_template: str = Field(..., description="选择的工作流模板应当为对应模板的name字段")
class TerminationMessage(BaseModel):
platform: str
user_name: str
message: str
class SupervisoryNodeDeps(DepsModel):
platform: str
user_name: str

View File

@ -1,38 +0,0 @@
# Copyright 2026 zhaoxi826
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List, Optional, Union, Dict, Any, Literal
from pydantic import BaseModel, Field
# --- 1. 给 Individual (LLM/Agent) 的具体需求 ---
class IndividualDemand(BaseModel):
role_prompt: str = Field(..., description="赋予该个体的角色定义")
task_goal: str = Field(..., description="该个体的具体执行目标")
expected_output: str = Field(..., description="期望产出的数据结构或格式描述")
# --- 2. 给 Tool (插件/函数调用) 的具体需求 ---
class ToolDemand(BaseModel):
method: str = Field(..., description="插件调用的具体方法名")
args: Dict[str, Any] = Field(default_factory=dict, description="传递给插件的参数")
# --- 3. 给 System (系统/物理资源) 的具体需求 ---
class SystemDemand(BaseModel):
operation: Literal["allocate_resource", "docker_manage", "file_io", "network"]
params: Dict[str, Any] = Field(..., description="操作所需的物理参数,如 GPU 核心数、路径等")
# --- 4. 统一需求入口 (裁判官协议体) ---
class DemandProtocol(BaseModel):
variety: Literal["individual", "tool", "system"]
name: str = Field(..., description="目标名称python_expert, pytest_tool, docker_engine")
content: Union[IndividualDemand, ToolDemand, SystemDemand] = Field(..., description="需求的具体参数细节")

View File

@ -1,26 +0,0 @@
# Copyright 2026 zhaoxi826
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import ABC,abstractmethod
from pretor.core.workflow.workflow import PretorWorkflow
class RunnableObject(ABC):
@abstractmethod
def __init__(self, **kwargs):
pass
@abstractmethod
async def run(self, workflow: PretorWorkflow) -> None:
pass

25
pretor/utils/banner.py Normal file
View File

@ -0,0 +1,25 @@
from rich.console import Console
from rich.text import Text
import yaml
def print_banner() -> None:
with open("config/config.yml","r") as config:
config = yaml.load(config, Loader=yaml.FullLoader)
version = config.get("version", "unknown")
pretor_banner = """
"""
console = Console()
banner_colored = Text(pretor_banner, style="gold3 bold")
console.print(banner_colored)
console.print("=" * 40, style="dim") # dim=灰色,低调
console.print("🚀 Multi-Agent Orchestration Platform", style="blue")
console.print(f"📦 Version: {version}", style="green")
console.print(f"👤 Author: zhaoxi826", style="yellow")
console.print(f"📜 License: Apache 2.0", style="magenta")
console.print(f"🐙 github: https://github.com/zhaoxi826/pretor", style="yellow")
console.print("=" * 40, style="dim")

View File

@ -19,16 +19,19 @@ T = TypeVar("T", bound=Type[BaseModel])
def pickle(cls: T) -> T:
"""
这是一个类装饰器用来接管 Pydantic 对象的 Pickle 序列化流程
它强迫 Pickle 使用 Pydantic 经过 Rust 优化的 JSON 导出/导入逻辑
类装饰器pickle
通过装饰继承了BaseModel的类用pydantic的高效序列化替代python原生__reduce__魔术方法,实现ray在通讯时的高效序列化
Args:
cls: 继承了BaseModel类的类需要被装饰的对象
Returns:
返回被重写了__reduce__魔术方法的cls类
"""
def __reduce__(self):
# 1. 序列化:触发 Pydantic-core (Rust) 的极速序列化
data = self.model_dump_json()
# 2. 反序列化:告诉 Pickle 重建时调用 cls.model_validate_json
return cls.model_validate_json, (data,)
# 动态把这个魔术方法“缝”到类上
cls.__reduce__ = __reduce__
return cls

View File

@ -13,9 +13,12 @@ dependencies = [
"loguru>=0.7.3",
"passlib[bcrypt]>=1.7.4",
"pydantic-ai>=1.73.0",
"pyfiglet>=1.0.4",
"pytest>=9.0.3",
"python-ulid>=3.1.0",
"ray[default,serve]>=2.54.0",
"rich>=14.3.3",
"sqlmodel>=0.0.37",
"types-docutils==0.22.3.20260408",
"vllm>=0.11.0",
]

1567
uv.lock

File diff suppressed because it is too large Load Diff