Files
zhaoxi 99520c69d7 feat(system):优化后端
1.新增后端测试
2.增加了后端的加密
3.增加了i18n(国际化)
2026-05-31 15:39:34 +00:00

98 lines
3.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Alembic 迁移环境。
设计要点:
1. 数据库 URL 从 ``POSTGRES_*`` 环境变量动态拼装,不污染 ``alembic.ini``
2. 复用项目本身的 ORM metadata``BaseDataModel.metadata`` 让 autogenerate
能识别全部表;
3. 与运行期保持一致使用 ``asyncpg`` 异步驱动 —— 通过 ``async_engine_from_config``
建立 AsyncEngine,再借 ``run_sync`` 把 alembic 的 sync migration 执行入口
挂载进去。这样无需额外引入 psycopg 等同步驱动。
"""
from __future__ import annotations
import asyncio
import os
import sys
from logging.config import fileConfig
from pathlib import Path
from alembic import context
from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_config
PROJECT_ROOT = Path(__file__).resolve().parent.parent
if str(PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(PROJECT_ROOT))
from kilostar.core.postgres_database.model.base import BaseDataModel # noqa: E402
from kilostar.core.postgres_database import model as _model_pkg # noqa: F401,E402
config = context.config
if config.config_file_name is not None:
fileConfig(config.config_file_name)
def _build_db_url() -> str:
"""从 ``POSTGRES_*`` 环境变量拼装一个 asyncpg URL。"""
user = os.environ.get("POSTGRES_USER", "postgres")
password = os.environ.get("POSTGRES_PASSWORD", "postgrespassword")
host = os.environ.get("POSTGRES_HOST", "127.0.0.1")
port = os.environ.get("POSTGRES_PORT", "5432")
db = os.environ.get("POSTGRES_DB", "kilostar")
return f"postgresql+asyncpg://{user}:{password}@{host}:{port}/{db}"
config.set_main_option("sqlalchemy.url", _build_db_url())
target_metadata = BaseDataModel.metadata
def run_migrations_offline() -> None:
"""离线模式:不连库,直接把 SQL 渲染到 stdout。"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
compare_type=True,
compare_server_default=True,
)
with context.begin_transaction():
context.run_migrations()
def _do_run_migrations(connection: Connection) -> None:
context.configure(
connection=connection,
target_metadata=target_metadata,
compare_type=True,
compare_server_default=True,
)
with context.begin_transaction():
context.run_migrations()
async def _run_async_migrations() -> None:
connectable = async_engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
async with connectable.connect() as connection:
await connection.run_sync(_do_run_migrations)
await connectable.dispose()
def run_migrations_online() -> None:
asyncio.run(_run_async_migrations())
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()