Files
KiloStar/kilostar/utils/access.py
T
zhaoxi 78bd6adc48 feat: workflow和chat分离
1,增加了创建workflow的页面
2.删除了event
2026-05-14 15:51:28 +00:00

135 lines
7.4 KiB
Python

# Copyright 2026 zhaoxi826
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import jwt
import os
from datetime import datetime, timedelta, timezone
from typing import Optional
from fastapi import HTTPException, status, Request
from pydantic import BaseModel, ValidationError
from pwdlib import PasswordHash
class TokenData(BaseModel):
"""TokenData 核心组件类。
这是一个领域数据模型或功能封装类,承载了 TokenData 相关的内聚属性定义与状态维护。它的存在隔离了局部的业务复杂性,并对外提供了类型安全的访问接口。"""
user_id: str
username: Optional[str] = None
exp: Optional[int] = None
SECRET_KEY = os.getenv("SECRET_KEY")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24
if not SECRET_KEY or SECRET_KEY in {"secret", "114514"}:
raise RuntimeError("未提供有效的 SECRET_KEY 或使用了不安全的默认值")
password_hasher = PasswordHash.recommended()
class Accessor:
"""Accessor 核心组件类。
这是一个领域数据模型或功能封装类,承载了 Accessor 相关的内聚属性定义与状态维护。它的存在隔离了局部的业务复杂性,并对外提供了类型安全的访问接口。"""
@staticmethod
def _decode_token(token: str) -> TokenData:
"""执行与 decode token 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: token (str): 由认证中心颁发的 JWT 或长期访问令牌,用于跨服务调用时的身份自证与权限校验。
Returns: (TokenData): 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return TokenData(**payload)
except jwt.ExpiredSignatureError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token 已过期",
)
except (jwt.InvalidTokenError, ValidationError):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无效的认证凭证",
)
@staticmethod
def _create_access_token(data: dict) -> str:
"""创建并持久化新的 access token 实体。
接收构建参数,执行必要的数据校验与默认值填充后,将新记录安全地写入底层存储或系统注册表中。
Args: data (dict): 从客户端传递过来或由上游组件生成的核心业务数据体,通常需要进一步的清洗和结构化解析。
Returns: (str): 处理流程所输出的具体字符串产物,可能是新生成的 ID 序列、格式化好的文本片段或 LLM 推理的回答内容。"""
to_encode = data.copy()
expire = datetime.now(timezone.utc) + timedelta(
minutes=ACCESS_TOKEN_EXPIRE_MINUTES
)
to_encode.update({"exp": int(expire.timestamp())})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
@staticmethod
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""执行与 verify password 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: plain_password (str): 控制逻辑流向的具体字符串参数,指定了期望的 plain password 内容。 hashed_password (str): 控制逻辑流向的具体字符串参数,指定了期望的 hashed password 内容。
Returns: (bool): 一个布尔型结果标志,明确返回 True 表示该操作成功应用或条件达成,False 则表示失败或被拒绝。"""
return password_hasher.verify(plain_password, hashed_password)
@staticmethod
def get_current_user(request: Request) -> TokenData:
"""检索并获取特定的 current user 数据集合或实例对象。
根据提供的查询条件或上下文凭证,从数据库、缓存或第三方服务中读取对应的资源状态。
Args: request (Request): FastAPI 框架注入的原生 HTTP 请求对象,包含了完整的 Header 标头、查询参数和正文流。
Returns: (TokenData): 经由当前业务模型加工处理后所输出的具体数据实例或领域模型对象。"""
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Bearer "):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="未提供认证头部",
)
token = auth_header.split(" ")[1]
return Accessor._decode_token(token)
@staticmethod
def login_hashed_password(user: User, password: str) -> str:
"""执行与 login hashed password 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: user (User): 当前已通过鉴权流程的访问者实体对象,内部包含用户角色、权限层级及租户归属等核心元信息。 password (str): 控制逻辑流向的具体字符串参数,指定了期望的 password 内容。
Returns: (str): 处理流程所输出的具体字符串产物,可能是新生成的 ID 序列、格式化好的文本片段或 LLM 推理的回答内容。"""
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户不存在",
)
if not Accessor.verify_password(password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户名或密码错误",
)
token_payload = {"user_id": str(user.user_id), "username": user.user_name}
return Accessor._create_access_token(data=token_payload)
@staticmethod
def hash_password(password: str) -> str:
"""执行与 hash password 相关的核心业务流转操作。
该方法封装了具体的算法策略或状态控制逻辑,确保操作能够在事务上下文中被原子且一致地执行。
Args: password (str): 控制逻辑流向的具体字符串参数,指定了期望的 password 内容。
Returns: (str): 处理流程所输出的具体字符串产物,可能是新生成的 ID 序列、格式化好的文本片段或 LLM 推理的回答内容。"""
if not password:
raise ValueError("密码不能为空")
if len(password) < 6:
raise ValueError("密码长度不能小于 6 位")
return password_hasher.hash(password)