# Copyright 2026 zhaoxi826 # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """请求/工作流上下文:基于 ``contextvars`` 的双层 ID 传播。 设计上把"一次用户请求"和"一次重型工作流"区分开: - ``request_id``:会话域。所有进 API 的请求都要带,由 middleware 在入口生成或 从 ``X-Request-Id`` 头继承。chat 这条同步链路靠它走完一生。 - ``trace_id``:工作流域。只有 ``ConsciousnessNode`` 决定启动重型任务时才生成, 挂到 ``KiloStarWorkflow`` 上。trace_id 应能追溯回触发它的 request_id(前者 通过显式参数传入,后者从 contextvars 读取)。 为什么用 ``contextvars`` 而不是参数透传: 1. ``contextvars`` 在 ``asyncio`` 协程间天然继承,不会跨协程串味; 2. ``loguru`` 的 ``patcher`` 钩子可以把它变成日志切面,业务代码不需要在每条 ``logger.info`` 上手动 ``.bind(trace_id=...)``; 3. Ray 跨进程调用时 contextvars 不会自动传播 —— 这是有意为之,避免不同 actor 间的上下文意外串联。跨 actor 边界要走显式参数,由接收方再 ``bind_*`` 一次。 """ from __future__ import annotations import uuid from contextlib import contextmanager from contextvars import ContextVar, Token from typing import Iterator, Optional _request_id_var: ContextVar[str] = ContextVar("kilostar_request_id", default="") _trace_id_var: ContextVar[str] = ContextVar("kilostar_trace_id", default="") def get_request_id() -> str: """返回当前协程的 ``request_id``,未绑定时返回空串。""" return _request_id_var.get() def get_trace_id() -> str: """返回当前协程的 ``trace_id``,未绑定时返回空串。""" return _trace_id_var.get() def bind_request_id(request_id: str) -> Token: """直接绑定 ``request_id`` 到当前 context,返回 token 以便 ``reset`` 还原。 返回的 ``Token`` 只能在与 ``set`` 同一线程/协程中传给 ``reset``,否则会抛 ``ValueError``。一般情况下推荐用 ``request_id_scope`` 上下文管理器代替。 """ return _request_id_var.set(request_id) def bind_trace_id(trace_id: str) -> Token: """直接绑定 ``trace_id`` 到当前 context,返回 token 以便 ``reset`` 还原。""" return _trace_id_var.set(trace_id) def reset_request_id(token: Token) -> None: _request_id_var.reset(token) def reset_trace_id(token: Token) -> None: _trace_id_var.reset(token) @contextmanager def request_id_scope(request_id: str) -> Iterator[str]: """``with`` 范围内绑定 request_id,退出自动还原。""" token = _request_id_var.set(request_id) try: yield request_id finally: _request_id_var.reset(token) @contextmanager def trace_id_scope(trace_id: str) -> Iterator[str]: """``with`` 范围内绑定 trace_id,退出自动还原。""" token = _trace_id_var.set(trace_id) try: yield trace_id finally: _trace_id_var.reset(token) def new_request_id(prefix: str = "req") -> str: """生成一个新的 request_id:``-``。""" return f"{prefix}-{uuid.uuid4().hex}" def snapshot() -> dict[str, str]: """返回当前上下文 ID 的快照,便于跨 actor/task 边界显式透传。""" return { "request_id": _request_id_var.get(), "trace_id": _trace_id_var.get(), } @contextmanager def apply_snapshot(snap: Optional[dict[str, str]]) -> Iterator[None]: """把外部传来的 snapshot 在当前 context 内生效一次(用于跨 Ray actor 调用时)。""" if not snap: yield return tokens: list[Token] = [] if snap.get("request_id"): tokens.append(_request_id_var.set(snap["request_id"])) if snap.get("trace_id"): tokens.append(_trace_id_var.set(snap["trace_id"])) try: yield finally: for tok in reversed(tokens): try: tok.var.reset(tok) except (ValueError, LookupError): # token 可能因协程切换失效,宽容处理 pass