Files
zhaoxi 99520c69d7 feat(system):优化后端
1.新增后端测试
2.增加了后端的加密
3.增加了i18n(国际化)
2026-05-31 15:39:34 +00:00

131 lines
4.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Copyright 2026 zhaoxi826
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""请求/工作流上下文:基于 ``contextvars`` 的双层 ID 传播。
设计上把"一次用户请求""一次重型工作流"区分开:
- ``request_id``:会话域。所有进 API 的请求都要带,由 middleware 在入口生成或
从 ``X-Request-Id`` 头继承。chat 这条同步链路靠它走完一生。
- ``trace_id``:工作流域。只有 ``ConsciousnessNode`` 决定启动重型任务时才生成,
挂到 ``KiloStarWorkflow`` 上。trace_id 应能追溯回触发它的 request_id(前者
通过显式参数传入,后者从 contextvars 读取)。
为什么用 ``contextvars`` 而不是参数透传:
1. ``contextvars`` 在 ``asyncio`` 协程间天然继承,不会跨协程串味;
2. ``loguru`` 的 ``patcher`` 钩子可以把它变成日志切面,业务代码不需要在每条
``logger.info`` 上手动 ``.bind(trace_id=...)``
3. Ray 跨进程调用时 contextvars 不会自动传播 —— 这是有意为之,避免不同 actor
间的上下文意外串联。跨 actor 边界要走显式参数,由接收方再 ``bind_*`` 一次。
"""
from __future__ import annotations
import uuid
from contextlib import contextmanager
from contextvars import ContextVar, Token
from typing import Iterator, Optional
_request_id_var: ContextVar[str] = ContextVar("kilostar_request_id", default="")
_trace_id_var: ContextVar[str] = ContextVar("kilostar_trace_id", default="")
def get_request_id() -> str:
"""返回当前协程的 ``request_id``,未绑定时返回空串。"""
return _request_id_var.get()
def get_trace_id() -> str:
"""返回当前协程的 ``trace_id``,未绑定时返回空串。"""
return _trace_id_var.get()
def bind_request_id(request_id: str) -> Token:
"""直接绑定 ``request_id`` 到当前 context,返回 token 以便 ``reset`` 还原。
返回的 ``Token`` 只能在与 ``set`` 同一线程/协程中传给 ``reset``,否则会抛
``ValueError``。一般情况下推荐用 ``request_id_scope`` 上下文管理器代替。
"""
return _request_id_var.set(request_id)
def bind_trace_id(trace_id: str) -> Token:
"""直接绑定 ``trace_id`` 到当前 context,返回 token 以便 ``reset`` 还原。"""
return _trace_id_var.set(trace_id)
def reset_request_id(token: Token) -> None:
_request_id_var.reset(token)
def reset_trace_id(token: Token) -> None:
_trace_id_var.reset(token)
@contextmanager
def request_id_scope(request_id: str) -> Iterator[str]:
"""``with`` 范围内绑定 request_id,退出自动还原。"""
token = _request_id_var.set(request_id)
try:
yield request_id
finally:
_request_id_var.reset(token)
@contextmanager
def trace_id_scope(trace_id: str) -> Iterator[str]:
"""``with`` 范围内绑定 trace_id,退出自动还原。"""
token = _trace_id_var.set(trace_id)
try:
yield trace_id
finally:
_trace_id_var.reset(token)
def new_request_id(prefix: str = "req") -> str:
"""生成一个新的 request_id``<prefix>-<uuid4 hex>``。"""
return f"{prefix}-{uuid.uuid4().hex}"
def snapshot() -> dict[str, str]:
"""返回当前上下文 ID 的快照,便于跨 actor/task 边界显式透传。"""
return {
"request_id": _request_id_var.get(),
"trace_id": _trace_id_var.get(),
}
@contextmanager
def apply_snapshot(snap: Optional[dict[str, str]]) -> Iterator[None]:
"""把外部传来的 snapshot 在当前 context 内生效一次(用于跨 Ray actor 调用时)。"""
if not snap:
yield
return
tokens: list[Token] = []
if snap.get("request_id"):
tokens.append(_request_id_var.set(snap["request_id"]))
if snap.get("trace_id"):
tokens.append(_trace_id_var.set(snap["trace_id"]))
try:
yield
finally:
for tok in reversed(tokens):
try:
tok.var.reset(tok)
except (ValueError, LookupError):
# token 可能因协程切换失效,宽容处理
pass