Files
2026-07-01 09:22:26 +00:00

44 lines
1.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""S3 工具共用辅助:从 ContextVar 拿凭证 + 解析 URI。
所有 s3_* 工具都依赖这个模块,把"明文凭证"的取用集中在一处。
"""
from __future__ import annotations
import re
from typing import Any, Dict, Tuple
def get_s3_creds_or_raise() -> Dict[str, Any]:
"""从 organization 注入的 ContextVar 中取出明文凭证;未注入则抛错。"""
# 延迟 import 避免循环;这里走 organization 子类被加载时注入的虚拟包路径
from ..core.organization import S3_CREDS_VAR
creds = S3_CREDS_VAR.get()
if not creds:
raise RuntimeError(
"未提供 S3 凭证:本任务上下文中没有 cred_id,请在创建 job 时选择凭证。"
)
return creds
def parse_s3_uri(uri: str) -> Tuple[str, str]:
"""解析 ``s3://bucket/key`` → ``(bucket, key)``;非法格式抛 ValueError。"""
m = re.match(r"^s3://([^/]+)/(.+)$", uri.strip())
if not m:
raise ValueError(f"非法 S3 URI{uri!r}(期待 s3://bucket/key 形式)")
return m.group(1), m.group(2)
def make_session_kwargs(creds: Dict[str, Any]) -> Dict[str, Any]:
"""转 boto3/aiobotocore client 调用所需的 kwargs。"""
kw: Dict[str, Any] = {
"aws_access_key_id": creds["access_key"],
"aws_secret_access_key": creds["secret_key"],
"region_name": creds.get("region") or "us-east-1",
}
endpoint = creds.get("endpoint_url")
if endpoint:
kw["endpoint_url"] = endpoint
return kw