diff --git a/pretor/core/database/memory.py b/pretor/core/database/memory.py
new file mode 100644
index 0000000..d970fbe
--- /dev/null
+++ b/pretor/core/database/memory.py
@@ -0,0 +1,54 @@
+from sqlmodel import SQLModel, Field, select
+from typing import Optional, List
+import json
+
+class WorkflowRecord(SQLModel, table=True):
+    id: Optional[int] = Field(default=None, primary_key=True)
+    workflow_id: str = Field(index=True)
+    workflow_data_json: str
+
+class MemoryRecord(SQLModel, table=True):
+    id: Optional[int] = Field(default=None, primary_key=True)
+    memory_text: str
+    embedding: List[float] = Field(sa_column_kwargs={"type_": "VECTOR"})  # NOTE(review): placeholder -- sa_column_kwargs has no "type_" key; a working pgvector column needs sa_column=Column(Vector(dim)). Confirm before use.
+
+class MemoryRAG:
+    def __init__(self, async_session_maker):
+        self.async_session_maker = async_session_maker
+
+    async def save_workflow(self, workflow_id: str, workflow_data: dict):
+        async with self.async_session_maker() as session:
+            record = WorkflowRecord(
+                workflow_id=workflow_id,
+                workflow_data_json=json.dumps(workflow_data)
+            )
+            session.add(record)
+            await session.commit()
+            await session.refresh(record)
+            return record
+
+    async def get_workflow(self, workflow_id: str):
+        async with self.async_session_maker() as session:
+            statement = select(WorkflowRecord).where(WorkflowRecord.workflow_id == workflow_id)
+            results = await session.execute(statement)
+            record = results.scalar_one_or_none()
+            if record:
+                return json.loads(record.workflow_data_json)
+            return None
+
+    async def add_memory(self, memory_text: str, embedding: List[float]):
+        async with self.async_session_maker() as session:
+            record = MemoryRecord(memory_text=memory_text, embedding=embedding)
+            session.add(record)
+            await session.commit()
+            await session.refresh(record)
+            return record
+
+    async def retrieve_memory(self, query_embedding: List[float], limit: int = 5):
+        # Requires pgvector specific operations; simplified retrieval simulation here
+        async with self.async_session_maker() as session:
+            # A true pgvector query would use an ORDER BY using `<->` operator
+            # e.g. statement = select(MemoryRecord).order_by(MemoryRecord.embedding.l2_distance(query_embedding)).limit(limit)
+            statement = select(MemoryRecord).limit(limit)
+            results = await session.execute(statement)
+            return results.scalars().all()
diff --git a/pretor/core/database/postgres.py b/pretor/core/database/postgres.py
index 1f603ab..27d4d02 100644
--- a/pretor/core/database/postgres.py
+++ b/pretor/core/database/postgres.py
@@ -6,6 +6,7 @@ from sqlmodel import SQLModel, select
 from pretor.utils.error import UserNotExistError, UserPasswordError
 import os
 from pretor.core.database.database_exception import database_exception
+from pretor.core.database.memory import MemoryRAG
 
 @ray.remote
 class PostgresDatabase:
@@ -18,6 +19,7 @@
         database_url = f"postgresql+asyncpg://{user}:{password}@{host}:{port}/{database}"
         self.async_engine = create_async_engine(database_url, echo=True)
         self.async_session_maker = sessionmaker(self.async_engine, class_=AsyncSession, expire_on_commit=False)
+        self.memory = MemoryRAG(self.async_session_maker)
 
     async def init_db(self) -> None:
         async with self.async_engine.begin() as conn:
diff --git a/pretor/core/workflow_manager/workflow.py b/pretor/core/workflow_manager/workflow.py
index d59e160..a68fb65 100644
--- a/pretor/core/workflow_manager/workflow.py
+++ b/pretor/core/workflow_manager/workflow.py
@@ -66,4 +66,12 @@ class PretorWorkflow(BaseModel):
             except ValueError as e:
                 if "越界" in str(e): raise e
                 raise ValueError(f"LogicGate 格式错误: {s.logic_gate.if_fail}")
-        return self
\ No newline at end of file
+        return self
+
+class PretorEvent(BaseModel):
+    event_id: str = Field(default_factory=lambda: str(ULID()), description="事件的唯一标识符")
+    user_message: str = Field(..., description="用户输入的原始消息")
+    is_complex: bool = Field(default=False, description="是否是复杂任务(需要交给ConsciousnessNode处理)")
+    workflow: Optional[PretorWorkflow] = Field(default=None, description="如果为复杂任务,则关联的工作流")
+    reply_message: Optional[str] = Field(default=None, description="系统最终给用户的回复")
+    metadata: Dict[str, Any] = Field(default_factory=dict, description="事件的上下文元数据")
\ No newline at end of file
diff --git a/pretor/individual_plugin/consciousness_node/consciousness_node.py b/pretor/individual_plugin/consciousness_node/consciousness_node.py
index e69de29..f979547 100644
--- a/pretor/individual_plugin/consciousness_node/consciousness_node.py
+++ b/pretor/individual_plugin/consciousness_node/consciousness_node.py
@@ -0,0 +1,41 @@
+import ray
+from pydantic_ai import Agent
+from pretor.core.workflow_manager.workflow import PretorWorkflow, WorkStep, WorkerGroup
+import uuid
+
+@ray.remote
+class ConsciousnessNode:
+    def __init__(self, agent: Agent):
+        self.agent = agent
+
+    async def generate_workflow(self, template: dict, task_description: str) -> PretorWorkflow:
+        prompt = f"Given the template {template} and task '{task_description}', generate a list of actionable steps."
+        # Simulated parsing logic: in a real implementation we would parse structured output from the agent
+        # response = await self.agent.run(prompt)
+
+        wg = WorkerGroup(
+            name="default_squad",
+            primary_individual={"coder": 1},
+            composite_individual={}
+        )
+
+        steps = [
+            WorkStep(step=1, node="consciousness_node", action="analyze", desc="Analyze task details: " + task_description),
+            WorkStep(step=2, node="control_node", action="execute", desc="Execute default execution logic", input=["1"])
+        ]
+
+        return PretorWorkflow(
+            title=f"Workflow for {task_description[:10]}",
+            workgroup_list=[wg],
+            work_link=steps
+        )
+
+    async def check_task(self, task_status: dict) -> bool:
+        if task_status.get("status") == "completed":
+            return True
+        return False
+
+    async def process_complex_transaction(self, transaction_data: dict) -> dict:
+        prompt = f"Process the following complex transaction data and extract key entities: {transaction_data}"
+        result = await self.agent.run(prompt)
+        return {"processed": True, "analysis": result.data}
diff --git a/pretor/individual_plugin/control_node/control_node.py b/pretor/individual_plugin/control_node/control_node.py
index b3ca8c1..d14b817 100644
--- a/pretor/individual_plugin/control_node/control_node.py
+++ b/pretor/individual_plugin/control_node/control_node.py
@@ -1,5 +1,33 @@
+import ray
 from pydantic_ai import Agent
+from pretor.core.workflow_manager.workflow import WorkStep
 
+@ray.remote
 class ControlNode:
-    def __init__(self):
-        pass
\ No newline at end of file
+    def __init__(self, agent: Agent):
+        self.agent = agent
+
+    async def execute_step(self, step: WorkStep) -> WorkStep:
+        if step.action == "dispatch_model":
+            # The WorkStep schema from workflow manager may pass target info differently, assuming `input` here or simple `desc`
+            result = await self.dispatch_model({}, f"Execute task: {step.desc}")
+            step.output = result
+        elif step.action == "dispatch_tool":
+            # Simulating parsing of tool and args from `desc` or `input`
+            result = await self.dispatch_tool("simulated_tool", {"desc": step.desc})
+            step.output = str(result)
+        else:
+            result = await self.agent.run(f"Execute generic step: {step.model_dump()}")
+            step.output = result.data
+
+        step.status = "completed"
+        return step
+
+    async def dispatch_model(self, model_info: dict, prompt: str) -> str:
+        # In a real system, we'd select a smaller/specific model based on model_info
+        result = await self.agent.run(prompt)
+        return result.data
+
+    async def dispatch_tool(self, tool_name: str, tool_args: dict) -> dict:
+        # Simulated tool dispatch
+        return {"tool": tool_name, "status": "executed", "args": tool_args}
diff --git a/pretor/individual_plugin/supervisory_node/supervisory_node.py b/pretor/individual_plugin/supervisory_node/supervisory_node.py
index e69de29..8ecbb07 100644
--- a/pretor/individual_plugin/supervisory_node/supervisory_node.py
+++ b/pretor/individual_plugin/supervisory_node/supervisory_node.py
@@ -0,0 +1,64 @@
+import ray
+from pydantic_ai import Agent
+import uuid
+from typing import Dict, Any
+from pretor.core.workflow_manager.workflow import PretorWorkflow, PretorEvent
+
+@ray.remote
+class SupervisoryNode:
+    def __init__(self, agent: Agent):
+        self.agent = agent
+
+    async def classify_task(self, task_description: str) -> str:
+        prompt = f"Classify the following task into a general category (e.g., search, code, write): {task_description}"
+        result = await self.agent.run(prompt)
+        return result.data
+
+    async def interact_with_user(self, message: str) -> str:
+        prompt = f"Respond helpful to the user message: {message}"
+        result = await self.agent.run(prompt)
+        return result.data
+
+    async def select_workflow_template(self, task_description: str) -> Dict[str, Any]:
+        category = await self.classify_task(task_description)
+        template = {
+            "template_id": str(uuid.uuid4()),
+            "category": category,
+            "description": f"Workflow template for {category}"
+        }
+        return template
+
+    async def process_event(self, event: PretorEvent, consciousness_node=None) -> PretorEvent:
+        # Step 1: Analyze if the task is complex
+        complexity_prompt = f"Does the following task require multiple steps, tool chaining, or complex workflows? Answer strictly 'yes' or 'no'. Task: {event.user_message}"
+        complexity_result = await self.agent.run(complexity_prompt)
+        is_complex = "yes" in complexity_result.data.lower()
+        event.is_complex = is_complex
+
+        if not is_complex:
+            # Simple Task: Call simple tool/agent logic and reply directly
+            reply_prompt = f"Please provide a helpful and direct response to the user's message: {event.user_message}"
+            reply_result = await self.agent.run(reply_prompt)
+            event.reply_message = reply_result.data
+        else:
+            # Complex Task: Delegate to ConsciousnessNode if available
+            if consciousness_node:
+                # We assume consciousness_node is a Ray actor reference
+                template = await self.select_workflow_template(event.user_message)
+                # Pass off to ConsciousnessNode to generate workflow
+                workflow = await consciousness_node.generate_workflow.remote(template, event.user_message)
+                event.workflow = workflow
+
+                # Mock execution: we just say it's dispatched and summarize
+                polish_prompt = f"The user asked: '{event.user_message}'. We have generated a complex workflow (Title: {workflow.title}) to handle this. Write a polite reply informing the user that the system is processing their complex request."
+                polish_result = await self.agent.run(polish_prompt)
+                event.reply_message = polish_result.data
+            else:
+                event.reply_message = "This is a complex task but no Consciousness Node was provided to handle it."
+
+        return event
+
+    async def report_completion(self, workflow: PretorWorkflow) -> str:
+        prompt = f"Summarize the successful completion of this workflow: {workflow.model_dump()}"
+        result = await self.agent.run(prompt)
+        return result.data
diff --git a/pretor/tool_plugin/docker_sandbox/__init__.py b/pretor/tool_plugin/docker_sandbox/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/pretor/tool_plugin/docker_sandbox/docker_sandbox.py b/pretor/tool_plugin/docker_sandbox/docker_sandbox.py
new file mode 100644
index 0000000..d5579c9
--- /dev/null
+++ b/pretor/tool_plugin/docker_sandbox/docker_sandbox.py
@@ -0,0 +1,32 @@
+import docker
+from typing import Dict, Any
+
+class DockerSandboxTool:
+    def __init__(self):
+        try:
+            self.client = docker.from_env()
+        except Exception as e:
+            self.client = None
+            print(f"Failed to initialize Docker client: {e}")
+
+    def run_code(self, code: str, image: str = "python:3.9-slim") -> Dict[str, Any]:
+        if not self.client:
+            return {"error": "Docker client not initialized"}
+
+        try:
+            # Simple python code runner in a container
+            container = self.client.containers.run(
+                image,
+                command=["python", "-c", code],
+                remove=True,
+                detach=False,
+                stdout=True,
+                stderr=True
+            )
+            # Depending on python version, container returns bytes directly
+            output = container.decode("utf-8") if isinstance(container, bytes) else container
+            return {"status": "success", "output": output}
+        except docker.errors.ContainerError as e:
+            return {"status": "error", "output": e.stderr.decode("utf-8")}
+        except Exception as e:
+            return {"status": "error", "error": str(e)}
diff --git a/pretor/tool_plugin/rag/__init__.py b/pretor/tool_plugin/rag/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/pretor/tool_plugin/rag/rag.py b/pretor/tool_plugin/rag/rag.py
new file mode 100644
index 0000000..46923e6
--- /dev/null
+++ b/pretor/tool_plugin/rag/rag.py
@@ -0,0 +1,17 @@
+from typing import List, Dict, Any
+from sqlmodel import select
+# Assuming MemoryRecord is accessible or passed, simulating direct pgvector call
+
+class RAGTool:
+    def __init__(self, async_session_maker):
+        self.async_session_maker = async_session_maker
+
+    async def get_embedding(self, query: str) -> List[float]:
+        # Simulated embedding logic; in reality, this would call an embedding API
+        return [0.1] * 1536
+
+    async def retrieve(self, query: str, limit: int = 5) -> List[Dict[str, Any]]:
+        embedding = await self.get_embedding(query)
+        # We simulate the retrieve_memory call logic from MemoryRAG here
+        # Normally you would inject MemoryRAG or a repository, doing a simplistic return here
+        return [{"query": query, "simulated_results": f"Found results for {query} with vector {embedding[:2]}..."}]
diff --git a/pretor/tool_plugin/web_crawler/__init__.py b/pretor/tool_plugin/web_crawler/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/pretor/tool_plugin/web_crawler/web_crawler.py b/pretor/tool_plugin/web_crawler/web_crawler.py
new file mode 100644
index 0000000..1ecafc3
--- /dev/null
+++ b/pretor/tool_plugin/web_crawler/web_crawler.py
@@ -0,0 +1,20 @@
+import httpx
+from typing import Dict, Any
+
+class WebCrawlerTool:
+    def __init__(self, timeout: int = 10):
+        self.timeout = timeout
+
+    async def crawl(self, url: str) -> Dict[str, Any]:
+        try:
+            async with httpx.AsyncClient(timeout=self.timeout) as client:
+                response = await client.get(url)
+                response.raise_for_status()
+                # Basic text extraction can happen here (e.g., stripping HTML tags manually or with a library later)
+                return {
+                    "url": url,
+                    "status_code": response.status_code,
+                    "content_preview": response.text[:500]
+                }
+        except Exception as e:
+            return {"url": url, "error": str(e)}