Files
zhaoxi 99520c69d7 feat(system):优化后端
1.新增后端测试
2.增加了后端的加密
3.增加了i18n(国际化)
2026-05-31 15:39:34 +00:00

88 lines
2.5 KiB
Python

import os
from functools import lru_cache
from cryptography.fernet import Fernet, InvalidToken
from kilostar.utils.logger import get_logger
# Module-level logger obtained from the project's logging helper under the name "crypto".
logger = get_logger("crypto")
# Marker that encrypt_secret() prepends to every token it produces; decrypt_secret()
# and is_encrypted() test for it to recognise values encrypted by this module.
_VERSION_PREFIX = "v1:"
# Substrings that _is_sensitive_key() searches for (after lower-casing the key) to
# decide whether a dict entry holds a secret.
# NOTE(review): matching is by substring, so "apikey" / "api_key" are already covered
# by "key"; they are kept here as written.
_SENSITIVE_KEYS = {"key", "token", "secret", "password", "apikey", "api_key"}
class CryptoError(Exception):
    """Error raised by this module when a secret cannot be encrypted or decrypted.

    It is raised when the secret key is missing or malformed, and when a
    ciphertext fails to decrypt.
    """
@lru_cache(maxsize=1)
def _get_fernet() -> Fernet:
    """Return the Fernet instance built from the KILOSTAR_SECRET_KEY environment variable.

    The result is memoised with ``lru_cache(maxsize=1)``, so after the first
    successful call the same instance is reused. A call that raises is not
    cached, so the environment is read again on the next attempt.

    Raises:
        CryptoError: if KILOSTAR_SECRET_KEY is unset or empty, or if building
            the Fernet instance from it fails for any reason.
    """
    secret = os.environ.get("KILOSTAR_SECRET_KEY", "")
    if secret:
        try:
            # Encoding stays inside the try so that an encoding failure is also
            # reported as an invalid-key CryptoError.
            key_bytes = secret.encode() if isinstance(secret, str) else secret
            return Fernet(key_bytes)
        except Exception as e:
            raise CryptoError(f"KILOSTAR_SECRET_KEY 格式无效: {e}") from e
    # No key configured: tell the operator how to generate one.
    raise CryptoError(
        "环境变量 KILOSTAR_SECRET_KEY 未设置,无法进行加解密。"
        "请生成一个密钥:python -c \"from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())\""
    )
def encrypt_secret(plaintext: str) -> str:
    """Encrypt *plaintext* and return it as a version-prefixed token string.

    Falsy input (empty string, ``None``) is returned unchanged without touching
    the key. Otherwise the UTF-8 bytes of *plaintext* are encrypted with the
    module's Fernet instance and the resulting token is returned with
    ``_VERSION_PREFIX`` in front.

    Raises:
        CryptoError: propagated from ``_get_fernet()`` when the key is missing
            or invalid.
    """
    if plaintext:
        # The Fernet instance is obtained before the plaintext is encoded.
        encrypted = _get_fernet().encrypt(plaintext.encode("utf-8"))
        return _VERSION_PREFIX + encrypted.decode("utf-8")
    return plaintext
def decrypt_secret(ciphertext: str) -> str:
    """Decrypt a token produced by ``encrypt_secret`` and return the plaintext.

    Falsy input and strings that do not start with ``_VERSION_PREFIX`` are
    returned unchanged, so values that were never encrypted pass through.

    Raises:
        CryptoError: if the token is rejected by Fernet (``InvalidToken``), or
            propagated from ``_get_fernet()`` when the key is missing/invalid.
    """
    if not ciphertext or not ciphertext.startswith(_VERSION_PREFIX):
        return ciphertext
    body = ciphertext[len(_VERSION_PREFIX):]
    cipher = _get_fernet()
    try:
        decrypted = cipher.decrypt(body.encode("utf-8"))
    except InvalidToken as e:
        raise CryptoError("解密失败:密钥不匹配或密文已损坏") from e
    return decrypted.decode("utf-8")
def is_encrypted(value: str) -> bool:
    """Return True when *value* is a string that starts with ``_VERSION_PREFIX``.

    Any non-string value yields False.
    """
    if not isinstance(value, str):
        return False
    return value.startswith(_VERSION_PREFIX)
def _is_sensitive_key(key: str) -> bool:
    """Return True when *key* names a field that should be treated as a secret.

    The key is lower-cased and checked for any of the substrings listed in
    ``_SENSITIVE_KEYS``.

    Non-string keys (ints, tuples, ``None``, ...) are never considered
    sensitive. Without this guard ``key.lower()`` raised ``AttributeError`` and
    aborted the dict helpers on any mapping with a non-string key.
    """
    if not isinstance(key, str):
        return False
    lowered = key.lower()
    return any(marker in lowered for marker in _SENSITIVE_KEYS)
def encrypt_dict_secrets(data: dict) -> dict:
    """Return a shallow copy of *data* with its sensitive string values encrypted.

    A value is encrypted only when its key is sensitive according to
    ``_is_sensitive_key``, the value is a non-empty string, and it is not
    already encrypted. All other entries are copied as-is. Nested containers
    are not descended into. Non-dict input is returned unchanged.
    """
    if not isinstance(data, dict):
        return data

    def _protect(k, v):
        # Same short-circuit order as the checks are listed: key first, then value.
        should_encrypt = (
            _is_sensitive_key(k) and isinstance(v, str) and v and not is_encrypted(v)
        )
        return encrypt_secret(v) if should_encrypt else v

    return {k: _protect(k, v) for k, v in data.items()}
def decrypt_dict_secrets(data: dict) -> dict:
    """Return a shallow copy of *data* with its encrypted sensitive values decrypted.

    Only string values that are marked as encrypted and whose key is sensitive
    according to ``_is_sensitive_key`` are decrypted. Decryption is best-effort:
    when it fails with ``CryptoError`` the failure is logged and the original
    (still encrypted) value is kept. Nested containers are not descended into.
    Non-dict input is returned unchanged.
    """
    if not isinstance(data, dict):
        return data
    result: dict = {}
    for k, v in data.items():
        if not (_is_sensitive_key(k) and isinstance(v, str) and is_encrypted(v)):
            result[k] = v
            continue
        # Start from the stored value so a failed decryption leaves it in place.
        result[k] = v
        try:
            result[k] = decrypt_secret(v)
        except CryptoError as e:
            logger.error(f"字段 {k} 解密失败: {e}")
    return result