"""Symmetric encryption helpers for secrets, built on Fernet.

Encrypted values are stored as ``"v1:" + <Fernet token>``. The key is read
from the ``KILOSTAR_SECRET_KEY`` environment variable the first time it is
needed and the resulting ``Fernet`` instance is cached for the process.
"""

import os
from functools import lru_cache

from cryptography.fernet import Fernet, InvalidToken

from kilostar.utils.logger import get_logger

logger = get_logger("crypto")

# Marker prepended to every ciphertext produced by this module; its presence
# is the only thing used to tell encrypted values from plaintext ones.
_VERSION_PREFIX = "v1:"

# Substrings that mark a dict key as holding a secret. Matching is by
# case-insensitive containment, so "apikey" / "api_key" are already covered
# by "key"; they are kept for explicitness.
_SENSITIVE_KEYS = {"key", "token", "secret", "password", "apikey", "api_key"}


class CryptoError(Exception):
    """Raised when the key is missing/invalid or a value cannot be decrypted."""


@lru_cache(maxsize=1)
def _get_fernet() -> Fernet:
    """Build the ``Fernet`` instance from ``KILOSTAR_SECRET_KEY``.

    The successful result is cached. A failure raises and is therefore not
    cached, so the environment is consulted again on the next call.

    Raises:
        CryptoError: if the variable is unset/empty or is not a valid key.
    """
    raw = os.environ.get("KILOSTAR_SECRET_KEY", "")
    if not raw:
        raise CryptoError(
            "环境变量 KILOSTAR_SECRET_KEY 未设置,无法进行加解密。"
            "请生成一个密钥:python -c \"from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())\""
        )
    try:
        return Fernet(raw.encode() if isinstance(raw, str) else raw)
    except Exception as e:
        # Any constructor failure is reported uniformly as a CryptoError so
        # callers only have one exception type to handle.
        raise CryptoError(f"KILOSTAR_SECRET_KEY 格式无效: {e}") from e


def encrypt_secret(plaintext: str) -> str:
    """Encrypt *plaintext* and return it with the version prefix.

    Falsy input (``""`` or ``None``) is returned unchanged without touching
    the key.

    Raises:
        CryptoError: if the secret key is missing or invalid.
    """
    if not plaintext:
        return plaintext
    token = _get_fernet().encrypt(plaintext.encode("utf-8"))
    return _VERSION_PREFIX + token.decode("utf-8")


def decrypt_secret(ciphertext: str) -> str:
    """Decrypt a value produced by :func:`encrypt_secret`.

    Falsy input and values without the version prefix are returned
    unchanged, i.e. they are treated as plaintext.

    Raises:
        CryptoError: if the secret key is missing/invalid, or the token
            fails Fernet validation.
    """
    if not ciphertext or not ciphertext.startswith(_VERSION_PREFIX):
        return ciphertext
    token = ciphertext[len(_VERSION_PREFIX):]
    fernet = _get_fernet()
    try:
        return fernet.decrypt(token.encode("utf-8")).decode("utf-8")
    except InvalidToken as e:
        raise CryptoError("解密失败:密钥不匹配或密文已损坏") from e


def is_encrypted(value: str) -> bool:
    """Return True if *value* is a string carrying the version prefix."""
    return isinstance(value, str) and value.startswith(_VERSION_PREFIX)


def _is_sensitive_key(key: object) -> bool:
    """Return True if *key* is a string containing a sensitive marker.

    Non-string keys (ints, tuples, ...) are never sensitive; previously they
    crashed the dict helpers with ``AttributeError`` on ``key.lower()``.
    """
    if not isinstance(key, str):
        return False
    lowered = key.lower()
    return any(marker in lowered for marker in _SENSITIVE_KEYS)


def encrypt_dict_secrets(data: dict) -> dict:
    """Return a shallow copy of *data* with sensitive string values encrypted.

    A value is encrypted only when its key is sensitive and the value is a
    non-empty string that does not already carry the version prefix. Nested
    containers are not traversed. Non-dict input is returned as-is.

    Raises:
        CryptoError: if a value needs encrypting and the key is missing or
            invalid.
    """
    if not isinstance(data, dict):
        return data
    return {
        k: (
            encrypt_secret(v)
            if _is_sensitive_key(k)
            and isinstance(v, str)
            and v
            and not is_encrypted(v)
            else v
        )
        for k, v in data.items()
    }


def decrypt_dict_secrets(data: dict) -> dict:
    """Return a shallow copy of *data* with encrypted sensitive values decrypted.

    Only values under sensitive keys that carry the version prefix are
    decrypted. Decryption is best-effort: on failure the error is logged and
    the original (still encrypted) value is kept. Nested containers are not
    traversed. Non-dict input is returned as-is.
    """
    if not isinstance(data, dict):
        return data
    out: dict = {}
    for k, v in data.items():
        out[k] = v
        # is_encrypted() already guarantees v is a str.
        if not (_is_sensitive_key(k) and is_encrypted(v)):
            continue
        try:
            out[k] = decrypt_secret(v)
        except CryptoError as e:
            logger.error(f"字段 {k} 解密失败: {e}")
    return out