99520c69d7
1.新增后端测试 2.增加了后端的加密 3.增加了i18n(国际化)
88 lines
2.5 KiB
Python
88 lines
2.5 KiB
Python
import os
|
|
from functools import lru_cache
|
|
|
|
from cryptography.fernet import Fernet, InvalidToken
|
|
|
|
from kilostar.utils.logger import get_logger
|
|
|
|
logger = get_logger("crypto")
|
|
|
|
_VERSION_PREFIX = "v1:"
|
|
_SENSITIVE_KEYS = {"key", "token", "secret", "password", "apikey", "api_key"}
|
|
|
|
|
|
class CryptoError(Exception):
|
|
pass
|
|
|
|
|
|
@lru_cache(maxsize=1)
|
|
def _get_fernet() -> Fernet:
|
|
raw = os.environ.get("KILOSTAR_SECRET_KEY", "")
|
|
if not raw:
|
|
raise CryptoError(
|
|
"环境变量 KILOSTAR_SECRET_KEY 未设置,无法进行加解密。"
|
|
"请生成一个密钥:python -c \"from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())\""
|
|
)
|
|
try:
|
|
return Fernet(raw.encode() if isinstance(raw, str) else raw)
|
|
except Exception as e:
|
|
raise CryptoError(f"KILOSTAR_SECRET_KEY 格式无效: {e}") from e
|
|
|
|
|
|
def encrypt_secret(plaintext: str) -> str:
|
|
if not plaintext:
|
|
return plaintext
|
|
f = _get_fernet()
|
|
token = f.encrypt(plaintext.encode("utf-8"))
|
|
return _VERSION_PREFIX + token.decode("utf-8")
|
|
|
|
|
|
def decrypt_secret(ciphertext: str) -> str:
|
|
if not ciphertext:
|
|
return ciphertext
|
|
if not ciphertext.startswith(_VERSION_PREFIX):
|
|
return ciphertext
|
|
raw = ciphertext[len(_VERSION_PREFIX):]
|
|
f = _get_fernet()
|
|
try:
|
|
return f.decrypt(raw.encode("utf-8")).decode("utf-8")
|
|
except InvalidToken as e:
|
|
raise CryptoError("解密失败:密钥不匹配或密文已损坏") from e
|
|
|
|
|
|
def is_encrypted(value: str) -> bool:
|
|
return isinstance(value, str) and value.startswith(_VERSION_PREFIX)
|
|
|
|
|
|
def _is_sensitive_key(key: str) -> bool:
|
|
lower = key.lower()
|
|
return any(s in lower for s in _SENSITIVE_KEYS)
|
|
|
|
|
|
def encrypt_dict_secrets(data: dict) -> dict:
|
|
if not isinstance(data, dict):
|
|
return data
|
|
out: dict = {}
|
|
for k, v in data.items():
|
|
if _is_sensitive_key(k) and isinstance(v, str) and v and not is_encrypted(v):
|
|
out[k] = encrypt_secret(v)
|
|
else:
|
|
out[k] = v
|
|
return out
|
|
|
|
|
|
def decrypt_dict_secrets(data: dict) -> dict:
|
|
if not isinstance(data, dict):
|
|
return data
|
|
out: dict = {}
|
|
for k, v in data.items():
|
|
if _is_sensitive_key(k) and isinstance(v, str) and is_encrypted(v):
|
|
try:
|
|
out[k] = decrypt_secret(v)
|
|
except CryptoError as e:
|
|
logger.error(f"字段 {k} 解密失败: {e}")
|
|
out[k] = v
|
|
else:
|
|
out[k] = v
|
|
return out
|