Pretor/pretor/worker_individual/worker_individual.py

158 lines
5.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Copyright 2026 zhaoxi826
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pydantic_ai import Agent, RunContext
from pydantic import Field
from pretor.adapter.model_adapter.agent_factory import AgentFactory
from pretor.core.global_state_machine.model_provider.base_provider import Provider
from pretor.utils.agent_model import ResponseModel, InputModel, DepsModel
from pretor.utils.ray_hook import ray_actor_hook
from pretor.utils.logger import get_logger
# Module-level logger, obtained from the project's get_logger helper under the
# name 'worker_individual'; the run() methods in this file use it to record failures.
logger = get_logger('worker_individual')
class WorkerIndividualResponse(ResponseModel):
    """Response model used as the agent's output type in this file.

    It carries a single required text field holding the worker's result.
    """

    # Required field (no default): the text produced by the worker for the task.
    output: str = Field(default=..., description="Worker执行任务的输出结果")
class WorkerIndividualDeps(DepsModel):
    """Dependencies model supplied to the agent for one run.

    It is declared as the agent's ``deps_type`` and is read back inside the
    dynamic system prompt through ``ctx.deps``.
    """

    # The task event for the current run; rendered into the system prompt as
    # the task context.
    task_event: dict
class WorkerIndividualInput(InputModel):
    """Input model wrapping the task event for a worker individual.

    NOTE(review): nothing in the visible part of this file references this
    model; presumably it is consumed by callers elsewhere - confirm.
    """

    # The task event payload.
    task_event: dict
class BaseIndividual:
    """Base class for worker individuals.

    Keeps the agent configuration, exposes ``_init_agent`` to build the
    underlying agent on demand, and leaves ``run`` for subclasses to implement.
    """

    def __init__(self, agent_config: dict):
        """Store the configuration; no agent is created at this point.

        Args:
            agent_config: Settings for this individual. This class reads the
                keys ``agent_id``, ``provider_title`` and ``model_id`` from it.
        """
        self.agent_config = agent_config
        self.agent_id = agent_config.get("agent_id")
        # Stays None until _init_agent() has been awaited.
        self.agent: Agent | None = None

    async def _init_agent(self, agent_name: str, system_prompt: str):
        """Build the agent and register a task-aware dynamic system prompt.

        The provider is looked up by title on the ``global_state_machine``
        actor, then ``AgentFactory.create_agent`` builds the agent, which is
        stored on ``self.agent``.

        NOTE(review): ``system_prompt`` is handed to the factory and is also
        prepended by the dynamic prompt below. If the factory forwards it as a
        static system prompt, the base text may reach the model twice - confirm
        against ``AgentFactory``.

        Args:
            agent_name: Name passed through to the factory.
            system_prompt: Base system prompt text.
        """
        state_machine = ray_actor_hook("global_state_machine").global_state_machine
        config = self.agent_config
        provider_name = config.get("provider_title", "openai")  # fallback provider
        model_name = config.get("model_id", "gpt-4o")  # fallback model
        provider: Provider = await state_machine.get_provider.remote(provider_name)

        self.agent = AgentFactory().create_agent(
            provider=provider,
            model_id=model_name,
            output_type=WorkerIndividualResponse,
            system_prompt=system_prompt,
            deps_type=WorkerIndividualDeps,
            agent_name=agent_name,
        )

        @self.agent.system_prompt
        async def dynamic_prompt(ctx: RunContext[WorkerIndividualDeps]):
            # Base prompt, a blank line, then the current task event taken
            # from the run's dependencies.
            prefix = system_prompt + "\n\n"
            return prefix + f"=== 当前任务上下文 ===\n{ctx.deps.task_event}\n"

    async def run(self, task_event: dict) -> dict:
        """Execute a task; subclasses must override this method.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError("子类必须实现 run 方法")
class SkillIndividual(BaseIndividual):
    """Expert individual: an agent that owns a specialised skill.

    Construction is inherited from ``BaseIndividual`` without changes.
    """

    async def run(self, task_event: dict) -> dict:
        """Run the task through the agent and return its text output.

        The agent is created on the first call, using the ``prompt`` entry of
        ``agent_config`` or a built-in default prompt when that key is absent.

        Args:
            task_event: The task description; it is passed to the agent both
                as dependencies and inside the user prompt.

        Returns:
            A dict of the form ``{"output": <text>}``.

        Raises:
            Exception: Any failure while running the agent or reading its
                result is logged with its traceback and re-raised unchanged.
        """
        if self.agent is None:
            system_prompt = self.agent_config.get(
                "prompt",
                "你是一个拥有专业技能的专家级AI助手请利用你的专业知识完成给定的任务。",
            )
            await self._init_agent("skill_individual", system_prompt)

        deps = WorkerIndividualDeps(task_event=task_event)
        # NOTE(review): this only sets an attribute on the agent object; whether
        # the agent actually honours it as a retry limit is not visible here -
        # confirm against AgentFactory / the Agent constructor's `retries`.
        self.agent.retries = 3

        # Tools could be loaded dynamically here based on agent_config and
        # passed to the run call; that is not wired up yet.
        try:
            result = await self.agent.run(
                f"请执行以下任务:\n{task_event}",
                deps=deps,
            )
            # pydantic_ai renamed the run result's `data` attribute to `output`
            # and later removed `data`; prefer the new name and fall back to
            # the legacy one so both library generations work.
            response = result.output if hasattr(result, "output") else result.data
            return {"output": response.output}
        except Exception as e:
            logger.exception(f"SkillIndividual {self.agent_id} 执行失败: {e}")
            raise
class OrdinaryIndividual(BaseIndividual):
    """Ordinary individual: a general-purpose agent.

    Construction is inherited from ``BaseIndividual`` without changes.
    """

    async def run(self, task_event: dict) -> dict:
        """Run the task through the agent and return its text output.

        The agent is created on the first call, using the ``prompt`` entry of
        ``agent_config`` or a built-in default prompt when that key is absent.

        Args:
            task_event: The task description; it is passed to the agent both
                as dependencies and inside the user prompt.

        Returns:
            A dict of the form ``{"output": <text>}``.

        Raises:
            Exception: Any failure while running the agent or reading its
                result is logged with its traceback and re-raised unchanged.
        """
        if self.agent is None:
            system_prompt = self.agent_config.get(
                "prompt",
                "你是一个普通的AI助手请尽力完成给定的任务。",
            )
            await self._init_agent("ordinary_individual", system_prompt)

        deps = WorkerIndividualDeps(task_event=task_event)
        # NOTE(review): this only sets an attribute on the agent object; whether
        # the agent actually honours it as a retry limit is not visible here -
        # confirm against AgentFactory / the Agent constructor's `retries`.
        self.agent.retries = 3

        try:
            result = await self.agent.run(
                f"请执行以下任务:\n{task_event}",
                deps=deps,
            )
            # pydantic_ai renamed the run result's `data` attribute to `output`
            # and later removed `data`; prefer the new name and fall back to
            # the legacy one so both library generations work.
            response = result.output if hasattr(result, "output") else result.data
            return {"output": response.output}
        except Exception as e:
            logger.exception(f"OrdinaryIndividual {self.agent_id} 执行失败: {e}")
            raise
class SpecialIndividual(BaseIndividual):
    """Special individual: an agent for special tasks such as generating
    speech or video.

    Construction is inherited from ``BaseIndividual`` without changes.
    """

    async def run(self, task_event: dict) -> dict:
        """Run the task through the agent and return its text output.

        The agent is created on the first call, using the ``prompt`` entry of
        ``agent_config`` or a built-in default prompt when that key is absent.

        Args:
            task_event: The task description; it is passed to the agent both
                as dependencies and inside the user prompt.

        Returns:
            A dict of the form ``{"output": <text>}``.

        Raises:
            Exception: Any failure while running the agent or reading its
                result is logged with its traceback and re-raised unchanged.
        """
        if self.agent is None:
            system_prompt = self.agent_config.get(
                "prompt",
                "你是一个特殊的AI助手负责处理特殊类型的任务。",
            )
            await self._init_agent("special_individual", system_prompt)

        deps = WorkerIndividualDeps(task_event=task_event)
        # NOTE(review): this only sets an attribute on the agent object; whether
        # the agent actually honours it as a retry limit is not visible here -
        # confirm against AgentFactory / the Agent constructor's `retries`.
        self.agent.retries = 3

        try:
            result = await self.agent.run(
                f"请执行以下任务:\n{task_event}",
                deps=deps,
            )
            # pydantic_ai renamed the run result's `data` attribute to `output`
            # and later removed `data`; prefer the new name and fall back to
            # the legacy one so both library generations work.
            response = result.output if hasattr(result, "output") else result.data
            return {"output": response.output}
        except Exception as e:
            logger.exception(f"SpecialIndividual {self.agent_id} 执行失败: {e}")
            raise