feat(system):优化后端
1.新增后端测试 2.增加了后端的加密 3.增加了i18n(国际化)
This commit is contained in:
@@ -0,0 +1,87 @@
|
||||
import os
|
||||
from functools import lru_cache
|
||||
|
||||
from cryptography.fernet import Fernet, InvalidToken
|
||||
|
||||
from kilostar.utils.logger import get_logger
|
||||
|
||||
logger = get_logger("crypto")
|
||||
|
||||
_VERSION_PREFIX = "v1:"
|
||||
_SENSITIVE_KEYS = {"key", "token", "secret", "password", "apikey", "api_key"}
|
||||
|
||||
|
||||
class CryptoError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
@lru_cache(maxsize=1)
|
||||
def _get_fernet() -> Fernet:
|
||||
raw = os.environ.get("KILOSTAR_SECRET_KEY", "")
|
||||
if not raw:
|
||||
raise CryptoError(
|
||||
"环境变量 KILOSTAR_SECRET_KEY 未设置,无法进行加解密。"
|
||||
"请生成一个密钥:python -c \"from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())\""
|
||||
)
|
||||
try:
|
||||
return Fernet(raw.encode() if isinstance(raw, str) else raw)
|
||||
except Exception as e:
|
||||
raise CryptoError(f"KILOSTAR_SECRET_KEY 格式无效: {e}") from e
|
||||
|
||||
|
||||
def encrypt_secret(plaintext: str) -> str:
|
||||
if not plaintext:
|
||||
return plaintext
|
||||
f = _get_fernet()
|
||||
token = f.encrypt(plaintext.encode("utf-8"))
|
||||
return _VERSION_PREFIX + token.decode("utf-8")
|
||||
|
||||
|
||||
def decrypt_secret(ciphertext: str) -> str:
|
||||
if not ciphertext:
|
||||
return ciphertext
|
||||
if not ciphertext.startswith(_VERSION_PREFIX):
|
||||
return ciphertext
|
||||
raw = ciphertext[len(_VERSION_PREFIX):]
|
||||
f = _get_fernet()
|
||||
try:
|
||||
return f.decrypt(raw.encode("utf-8")).decode("utf-8")
|
||||
except InvalidToken as e:
|
||||
raise CryptoError("解密失败:密钥不匹配或密文已损坏") from e
|
||||
|
||||
|
||||
def is_encrypted(value: str) -> bool:
|
||||
return isinstance(value, str) and value.startswith(_VERSION_PREFIX)
|
||||
|
||||
|
||||
def _is_sensitive_key(key: str) -> bool:
|
||||
lower = key.lower()
|
||||
return any(s in lower for s in _SENSITIVE_KEYS)
|
||||
|
||||
|
||||
def encrypt_dict_secrets(data: dict) -> dict:
|
||||
if not isinstance(data, dict):
|
||||
return data
|
||||
out: dict = {}
|
||||
for k, v in data.items():
|
||||
if _is_sensitive_key(k) and isinstance(v, str) and v and not is_encrypted(v):
|
||||
out[k] = encrypt_secret(v)
|
||||
else:
|
||||
out[k] = v
|
||||
return out
|
||||
|
||||
|
||||
def decrypt_dict_secrets(data: dict) -> dict:
|
||||
if not isinstance(data, dict):
|
||||
return data
|
||||
out: dict = {}
|
||||
for k, v in data.items():
|
||||
if _is_sensitive_key(k) and isinstance(v, str) and is_encrypted(v):
|
||||
try:
|
||||
out[k] = decrypt_secret(v)
|
||||
except CryptoError as e:
|
||||
logger.error(f"字段 {k} 解密失败: {e}")
|
||||
out[k] = v
|
||||
else:
|
||||
out[k] = v
|
||||
return out
|
||||
Reference in New Issue
Block a user