# Copyright 2026 zhaoxi826
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import os
from typing import Optional

import ray
from sqlalchemy.engine import URL
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlmodel import SQLModel

from pretor.core.database.module.individual import IndividualDatabase
from pretor.core.database.module.user import AuthDatabase
from pretor.core.database.module.provider import ProviderDatabase
from pretor.core.database.module.system_node import SystemNodeDatabase


@ray.remote
class PostgresDatabase:
    """Facade over the PostgreSQL-backed sub-databases, exposed via ``ray.remote``.

    Connection settings are read from the ``POSTGRES_USER``,
    ``POSTGRES_PASSWORD``, ``POSTGRES_HOST``, ``POSTGRES_PORT`` and
    ``POSTGRES_DB`` environment variables.

    Every data-access method first waits on ``ready_event``, which is only
    set by :meth:`init_db`. ``init_db`` must therefore be invoked once,
    otherwise all other methods block indefinitely.
    """

    def __init__(self):
        """Create the async engine, the session factory and the sub-databases."""
        user = os.environ.get('POSTGRES_USER')
        password = os.environ.get('POSTGRES_PASSWORD')
        host = os.environ.get('POSTGRES_HOST')
        port = os.environ.get('POSTGRES_PORT')
        database = os.environ.get('POSTGRES_DB')

        # Build the URL from components instead of string interpolation so
        # that credentials containing reserved characters ('@', ':', '/',
        # '%', ...) are escaped correctly. Unset variables are omitted rather
        # than being rendered as the literal text "None".
        database_url = URL.create(
            "postgresql+asyncpg",
            username=user,
            password=password,
            host=host,
            port=int(port) if port else None,
            database=database,
        )

        self.async_engine = create_async_engine(database_url, echo=True)
        self.async_session_maker = sessionmaker(
            self.async_engine,
            class_=AsyncSession,
            expire_on_commit=False,
        )

        self._auth_database = AuthDatabase(self.async_session_maker)
        self._provider_database = ProviderDatabase(self.async_session_maker)
        self._individual_database = IndividualDatabase(self.async_session_maker)
        self._system_node_database = SystemNodeDatabase(self.async_session_maker)

        # Gate for all data-access methods; set at the end of init_db().
        self.ready_event = asyncio.Event()

    async def init_db(self) -> None:
        """Create all ``SQLModel`` tables, then mark the database as ready.

        A failure is reported with a printed warning instead of being raised,
        and ``ready_event`` is set in either case, so callers waiting on it
        are released even when the database could not be initialized.
        """
        try:
            async with self.async_engine.begin() as conn:
                await conn.run_sync(SQLModel.metadata.create_all)
        except Exception as e:
            # Provide a warning if the database is not accessible, allowing
            # the app to start up for development/UI tests without crashing
            # immediately.
            print(f"Warning: Failed to initialize PostgreSQL database: {e}")
        finally:
            self.ready_event.set()

    # Auth Database Methods

    async def add_user(self, user_name: str, hashed_password: str):
        """Wait until ready, then delegate to ``AuthDatabase.add_user``."""
        await self.ready_event.wait()
        return await self._auth_database.add_user(user_name, hashed_password)

    async def change_password(self, user_name, old_password, new_password):
        """Wait until ready, then delegate to ``AuthDatabase.change_password``."""
        await self.ready_event.wait()
        return await self._auth_database.change_password(user_name, old_password, new_password)

    async def delete_user(self, user_name: str):
        """Wait until ready, then delegate to ``AuthDatabase.delete_user``."""
        await self.ready_event.wait()
        return await self._auth_database.delete_user(user_name)

    async def delete_user_by_id(self, user_id: str):
        """Wait until ready, then delegate to ``AuthDatabase.delete_user_by_id``."""
        await self.ready_event.wait()
        return await self._auth_database.delete_user_by_id(user_id)

    async def login_user(self, user_name: str):
        """Wait until ready, then delegate to ``AuthDatabase.login_user``."""
        await self.ready_event.wait()
        return await self._auth_database.login_user(user_name)

    async def get_all_users(self):
        """Wait until ready, then delegate to ``AuthDatabase.get_all_users``."""
        await self.ready_event.wait()
        return await self._auth_database.get_all_users()

    async def get_user_authority(self, user_id: str):
        """Wait until ready, then delegate to ``AuthDatabase.get_user_authority``."""
        await self.ready_event.wait()
        return await self._auth_database.get_user_authority(user_id)

    async def change_user_authority(self, user_id: str, new_authority):
        """Wait until ready, then delegate to ``AuthDatabase.change_user_authority``."""
        await self.ready_event.wait()
        return await self._auth_database.change_user_authority(user_id, new_authority)

    # Provider Database Methods

    async def get_provider(self):
        """Wait until ready, then delegate to ``ProviderDatabase.get_provider``."""
        await self.ready_event.wait()
        return await self._provider_database.get_provider()

    async def add_provider_db(self, **kwargs):
        """Wait until ready, then forward ``kwargs`` to ``ProviderDatabase.add_provider``."""
        await self.ready_event.wait()
        return await self._provider_database.add_provider(**kwargs)

    async def delete_provider_db(self, provider_id: str):
        """Wait until ready, then delegate to ``ProviderDatabase.delete_provider``."""
        await self.ready_event.wait()
        return await self._provider_database.delete_provider(provider_id)

    async def update_provider_db(self, provider_id: str, **kwargs):
        """Wait until ready, then forward to ``ProviderDatabase.update_provider``."""
        await self.ready_event.wait()
        return await self._provider_database.update_provider(provider_id, **kwargs)

    # System Node Database Methods

    async def upsert_system_node_config(
        self,
        node_name: str,
        provider_title: str,
        model_id: str,
        tools: Optional[list[str]] = None,
    ):
        """Wait until ready, then delegate to ``SystemNodeDatabase.upsert_system_node_config``.

        ``tools`` is passed through unchanged, including the ``None`` default.
        """
        await self.ready_event.wait()
        return await self._system_node_database.upsert_system_node_config(
            node_name, provider_title, model_id, tools
        )

    async def get_all_system_node_configs(self):
        """Wait until ready, then delegate to ``SystemNodeDatabase.get_all_system_node_configs``."""
        await self.ready_event.wait()
        return await self._system_node_database.get_all_system_node_configs()

    # Individual Database Methods

    async def add_worker_individual(self, **kwargs):
        """Wait until ready, then forward ``kwargs`` to ``IndividualDatabase.add_worker_individual``."""
        await self.ready_event.wait()
        return await self._individual_database.add_worker_individual(**kwargs)

    async def get_worker_individual(self, agent_id: str):
        """Wait until ready, then delegate to ``IndividualDatabase.get_worker_individual``."""
        await self.ready_event.wait()
        return await self._individual_database.get_worker_individual(agent_id)

    async def get_worker_individual_list(self, owner_id: str):
        """Wait until ready, then delegate to ``IndividualDatabase.get_worker_individual_list``."""
        await self.ready_event.wait()
        return await self._individual_database.get_worker_individual_list(owner_id)

    async def update_worker_individual(self, agent_id: str, **kwargs):
        """Wait until ready, then forward to ``IndividualDatabase.update_worker_individual``."""
        await self.ready_event.wait()
        return await self._individual_database.update_worker_individual(agent_id, **kwargs)

    async def delete_worker_individual(self, agent_id: str):
        """Wait until ready, then delegate to ``IndividualDatabase.delete_worker_individual``."""
        await self.ready_event.wait()
        return await self._individual_database.delete_worker_individual(agent_id)

    async def get_all_worker_individual(self):
        """Wait until ready, then delegate to ``IndividualDatabase.get_all_worker_individual``."""
        await self.ready_event.wait()
        return await self._individual_database.get_all_worker_individual()