# Copyright 2026 zhaoxi826 # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import os import asyncio import ray from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession from sqlalchemy.orm import sessionmaker from kilostar.core.postgres_database.model.base import BaseDataModel from .module.individual import IndividualDatabase from .module.event import EventDatabase from .module.user import AuthDatabase from .module.provider import ProviderDatabase from .module.system_node import SystemNodeDatabase from .module.workflow import WorkflowDatabase from .module.chat_history import ChatHistoryDatabase @ray.remote class PostgresDatabase: """PostgresDatabase 核心组件类。 这是一个数据库操作层 (DAO/Repository) 封装类,专注于处理实体模型与关系型数据库表之间的映射。它将复杂的 SQL 查询、跨表 Join 和事务回滚逻辑进行了高级抽象,向上层服务暴露简洁的数据读写接口。""" def __init__(self): user = os.environ.get("POSTGRES_USER") password = os.environ.get("POSTGRES_PASSWORD") host = os.environ.get("POSTGRES_HOST") port = os.environ.get("POSTGRES_PORT") database = os.environ.get("POSTGRES_DB") database_url = ( f"postgresql+asyncpg://{user}:{password}@{host}:{port}/{database}" ) self.async_engine = create_async_engine(database_url, echo=True) self.async_session_maker = sessionmaker( self.async_engine, class_=AsyncSession, expire_on_commit=False ) self._auth_database = AuthDatabase(self.async_session_maker) self._provider_database = ProviderDatabase(self.async_session_maker) self._individual_database = IndividualDatabase(self.async_session_maker) self._event_database = EventDatabase(self.async_session_maker) self._system_node_database = SystemNodeDatabase(self.async_session_maker) self._workflow_database = WorkflowDatabase(self.async_session_maker) self._chat_history_database = ChatHistoryDatabase(self.async_session_maker) self.ready_event = asyncio.Event() async def init_db(self) -> None: """完成 db 模块的启动与依赖初始化。 在系统引导或服务拉起阶段被调用,负责建立网络连接、分配基础内存资源及注册核心服务组件。 Returns: (None): 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。""" try: async with self.async_engine.begin() as conn: await conn.run_sync(BaseDataModel.metadata.create_all) except Exception as e: # Provide a warning if the database is not accessible, allowing # the app to start up for development/UI tests without crashing immediately. print(f"Warning: Failed to initialize PostgreSQL database: {e}") finally: self.ready_event.set() # Auth Database Methods async def add_user(self, user_name: str, hashed_password: str): """创建并持久化新的 user 实体。 接收构建参数,执行必要的数据校验与默认值填充后,将新记录安全地写入底层存储或系统注册表中。 Args: user_name (str): 赋予该实体的人类可读名称或标题字符串,主要用于前端 UI 展示、日志记录或模糊检索。 hashed_password (str): 控制逻辑流向的具体字符串参数,指定了期望的 hashed password 内容。 Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。""" await self.ready_event.wait() return await self._auth_database.add_user(user_name, hashed_password) async def change_password(self, user_name, old_password, new_password): """执行与 change password 相关的核心业务流转操作。 该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。 Args: user_name: 赋予该实体的人类可读名称或标题字符串,主要用于前端 UI 展示、日志记录或模糊检索。 old_password: 参与 change password 逻辑运算或数据构建的上下文依赖对象。 new_password: 参与 change password 逻辑运算或数据构建的上下文依赖对象。 Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。""" await self.ready_event.wait() return await self._auth_database.change_password( user_name, old_password, new_password ) async def delete_user(self, user_name: str): """安全地移除或注销 user。 执行物理删除或逻辑删除操作,并妥善清理相关的关联数据及占用资源。 Args: user_name (str): 赋予该实体的人类可读名称或标题字符串,主要用于前端 UI 展示、日志记录或模糊检索。 Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。""" await self.ready_event.wait() return await self._auth_database.delete_user(user_name) async def delete_user_by_id(self, user_id: str): """安全地移除或注销 user by id。 执行物理删除或逻辑删除操作,并妥善清理相关的关联数据及占用资源。 Args: user_id (str): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 user 实例。 Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。""" await self.ready_event.wait() return await self._auth_database.delete_user_by_id(user_id) async def login_user(self, user_name: str): """执行与 login user 相关的核心业务流转操作。 该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。 Args: user_name (str): 赋予该实体的人类可读名称或标题字符串,主要用于前端 UI 展示、日志记录或模糊检索。 Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。""" await self.ready_event.wait() return await self._auth_database.login_user(user_name) async def get_all_users(self): """检索并获取特定的 all users 数据集合或实例对象。 根据提供的查询条件或上下文凭证,从数据库、缓存或第三方服务中读取对应的资源状态。 Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。""" await self.ready_event.wait() return await self._auth_database.get_all_users() async def get_user_authority(self, user_id: str): """检索并获取特定的 user authority 数据集合或实例对象。 根据提供的查询条件或上下文凭证,从数据库、缓存或第三方服务中读取对应的资源状态。 Args: user_id (str): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 user 实例。 Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。""" await self.ready_event.wait() return await self._auth_database.get_user_authority(user_id) async def change_user_authority(self, user_id: str, new_authority): """执行与 change user authority 相关的核心业务流转操作。 该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。 Args: user_id (str): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 user 实例。 new_authority: 参与 change user authority 逻辑运算或数据构建的上下文依赖对象。 Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。""" await self.ready_event.wait() return await self._auth_database.change_user_authority(user_id, new_authority) # Provider Database Methods async def get_provider(self): """检索并获取特定的 provider 数据集合或实例对象。 根据提供的查询条件或上下文凭证,从数据库、缓存或第三方服务中读取对应的资源状态。 Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。""" await self.ready_event.wait() return await self._provider_database.get_provider() async def add_provider_db(self, **kwargs): """创建并持久化新的 provider db 实体。 接收构建参数,执行必要的数据校验与默认值填充后,将新记录安全地写入底层存储或系统注册表中。 Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。""" await self.ready_event.wait() return await self._provider_database.add_provider(**kwargs) async def delete_provider_db(self, provider_id: str): """安全地移除或注销 provider db。 执行物理删除或逻辑删除操作,并妥善清理相关的关联数据及占用资源。 Args: provider_id (str): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 provider 实例。 Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。""" await self.ready_event.wait() return await self._provider_database.delete_provider(provider_id) async def update_provider_db(self, provider_id: str, **kwargs): """对现有的 provider db 进行状态更新或属性覆盖。 基于增量变更原则,合并最新的配置或数据,并触发相关依赖组件的缓存刷新或事件通知。 Args: provider_id (str): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 provider 实例。 Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。""" await self.ready_event.wait() return await self._provider_database.update_provider(provider_id, **kwargs) # System Node Database Methods async def upsert_system_node_config( self, node_name: str, provider_title: str, model_id: str, tools: list[str] = None, ): """执行与 upsert system node config 相关的核心业务流转操作。 该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。 Args: node_name (str): 赋予该实体的人类可读名称或标题字符串,主要用于前端 UI 展示、日志记录或模糊检索。 provider_title (str): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 provider_title 实例。 model_id (str): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 model 实例。 tools (list[str]): 控制逻辑流向的具体字符串参数,指定了期望的 tools 内容。 Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。""" await self.ready_event.wait() return await self._system_node_database.upsert_system_node_config( node_name, provider_title, model_id, tools ) async def get_all_system_node_configs(self): """检索并获取特定的 all system node configs 数据集合或实例对象。 根据提供的查询条件或上下文凭证,从数据库、缓存或第三方服务中读取对应的资源状态。 Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。""" await self.ready_event.wait() return await self._system_node_database.get_all_system_node_configs() # Individual Database Methods async def add_worker_individual(self, **kwargs): """创建并持久化新的 worker individual 实体。 接收构建参数,执行必要的数据校验与默认值填充后,将新记录安全地写入底层存储或系统注册表中。 Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。""" await self.ready_event.wait() return await self._individual_database.add_worker_individual(**kwargs) async def get_worker_individual(self, agent_id: str): """检索并获取特定的 worker individual 数据集合或实例对象。 根据提供的查询条件或上下文凭证,从数据库、缓存或第三方服务中读取对应的资源状态。 Args: agent_id (str): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 agent 实例。 Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。""" await self.ready_event.wait() return await self._individual_database.get_worker_individual(agent_id) async def get_worker_individual_list(self, owner_id: str): """检索并获取特定的 worker individual list 数据集合或实例对象。 根据提供的查询条件或上下文凭证,从数据库、缓存或第三方服务中读取对应的资源状态。 Args: owner_id (str): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 owner 实例。 Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。""" await self.ready_event.wait() return await self._individual_database.get_worker_individual_list(owner_id) async def update_worker_individual(self, agent_id: str, **kwargs): """对现有的 worker individual 进行状态更新或属性覆盖。 基于增量变更原则,合并最新的配置或数据,并触发相关依赖组件的缓存刷新或事件通知。 Args: agent_id (str): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 agent 实例。 Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。""" await self.ready_event.wait() return await self._individual_database.update_worker_individual( agent_id, **kwargs ) async def delete_worker_individual(self, agent_id: str): """安全地移除或注销 worker individual。 执行物理删除或逻辑删除操作,并妥善清理相关的关联数据及占用资源。 Args: agent_id (str): 目标对象的唯一全局标识符 (UUID/ULID),用于在数据库表或缓存结构中精准匹配该 agent 实例。 Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。""" await self.ready_event.wait() return await self._individual_database.delete_worker_individual(agent_id) async def get_all_worker_individual(self): """检索并获取特定的 all worker individual 数据集合或实例对象。 根据提供的查询条件或上下文凭证,从数据库、缓存或第三方服务中读取对应的资源状态。 Returns: : 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。""" await self.ready_event.wait() return await self._individual_database.get_all_worker_individual() # Event Database Methods async def upsert_event(self, trace_id: str, event_data_json: str): await self.ready_event.wait() return await self._event_database.upsert_event(trace_id, event_data_json) async def get_event(self, trace_id: str): await self.ready_event.wait() return await self._event_database.get_event(trace_id) async def get_all_events(self): await self.ready_event.wait() return await self._event_database.get_all_events() async def delete_event(self, trace_id: str): await self.ready_event.wait() return await self._event_database.delete_event(trace_id) # Workflow Database Methods async def create_workflow( self, trace_id: str, user_id: str, title: str, command: str ): await self.ready_event.wait() return await self._workflow_database.create_workflow( trace_id, user_id, title, command ) async def get_workflow(self, trace_id: str): await self.ready_event.wait() return await self._workflow_database.get_workflow(trace_id) async def update_workflow_status(self, trace_id: str, status: str): await self.ready_event.wait() return await self._workflow_database.update_workflow_status(trace_id, status) async def list_workflows(self, user_id: str): await self.ready_event.wait() return await self._workflow_database.list_workflows(user_id) async def upsert_workflow_context(self, trace_id: str, **kwargs): await self.ready_event.wait() return await self._workflow_database.upsert_workflow_context(trace_id, **kwargs) async def get_workflow_context(self, trace_id: str): await self.ready_event.wait() return await self._workflow_database.get_workflow_context(trace_id) # Chat History Database Methods async def create_chat_session(self, user_id: str, title: str = "新对话"): await self.ready_event.wait() return await self._chat_history_database.create_chat_session(user_id, title) async def list_chat_sessions(self, user_id: str): await self.ready_event.wait() return await self._chat_history_database.list_chat_sessions(user_id) async def add_chat_message(self, chat_id: str, message: str, message_owner: str): await self.ready_event.wait() return await self._chat_history_database.add_chat_message( chat_id, message, message_owner ) async def list_chat_messages(self, chat_id: str): await self.ready_event.wait() return await self._chat_history_database.list_chat_messages(chat_id)