Files
2026-07-01 09:22:26 +00:00

96 lines
3.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""ray_submit:把分析脚本提交到 Raydistributed)或 subprocessstandalone)执行。
凭证以 ``AWS_*`` 环境变量注入子进程,让 boto3/pandas-s3 自然读到。
脚本走 ``kilostar.utils.sandbox.validate_python_code`` 的静态屏蔽兜底。
"""
from __future__ import annotations
import asyncio
import os
import sys
import tempfile
from kilostar.utils.ray_compat import _STANDALONE
from kilostar.utils.sandbox import (
CodeViolation,
get_python_timeout,
validate_python_code,
)
from ._s3_common import get_s3_creds_or_raise
def _build_env(creds) -> dict:
    """Return a copy of the process environment with AWS settings applied.

    ``creds`` is read as a mapping. ``access_key`` and ``secret_key`` are
    looked up with ``[]`` (so they must be present); ``region`` falls back to
    ``"us-east-1"`` when missing or falsy; a truthy ``endpoint_url`` is
    exported under both ``AWS_ENDPOINT_URL_S3`` and ``AWS_ENDPOINT_URL``.
    """
    overrides = {
        "AWS_ACCESS_KEY_ID": creds["access_key"],
        "AWS_SECRET_ACCESS_KEY": creds["secret_key"],
        "AWS_DEFAULT_REGION": creds.get("region") or "us-east-1",
    }

    endpoint = creds.get("endpoint_url")
    if endpoint:
        # Export under both names, the S3-specific one and the generic one.
        overrides["AWS_ENDPOINT_URL_S3"] = endpoint
        overrides["AWS_ENDPOINT_URL"] = endpoint

    # Later entries win, so the overrides replace any inherited values.
    return {**os.environ, **overrides}
async def ray_submit(script: str, timeout: int = 300) -> str:
    """Run a Python script in a child interpreter and return its output.

    The script may ``import boto3`` directly to read S3 (credentials are
    injected through environment variables) and may use installed
    dependencies such as pandas / polars / numpy. **Read-only** -- do not
    attempt put/delete.

    Args:
        script: Python source code. It is passed through
            ``validate_python_code`` before being executed.
        timeout: Requested timeout in seconds (default 300). The effective
            value is whatever ``get_python_timeout`` returns for it.

    Returns:
        The child's stdout, followed by a ``[stderr]`` section when stderr is
        non-empty and an ``[exit code: N]`` line when the exit code is
        non-zero; ``(no output)`` when all of that is empty. When
        ``_STANDALONE`` is falsy the result is prefixed with a mode banner.
        A ``[Sandbox] ...`` string is returned when validation raises
        ``CodeViolation``, and an ``[Error] ...`` string on timeout or any
        other exception raised while running the child.

    Raises:
        Whatever ``get_s3_creds_or_raise``, ``_build_env`` or
        ``get_python_timeout`` raise; those calls are not guarded here.
    """
    try:
        script = validate_python_code(script)
    except CodeViolation as e:
        return f"[Sandbox] {e}"

    creds = get_s3_creds_or_raise()
    env = _build_env(creds)
    timeout = get_python_timeout(timeout)

    # In this first version both standalone and distributed modes go through
    # a subprocess, which keeps environment-variable passing under control.
    # (Running via ray.remote would need a runtime_env to carry env vars; the
    # complexity is about the same as a subprocess, but the subprocess is
    # more transparent, so we ship this first.)
    tmp_file = None
    proc = None
    try:
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".py", delete=False, encoding="utf-8"
        ) as f:
            # Record the path before writing so a failed write is still
            # cleaned up in ``finally`` (the file is created with delete=False).
            tmp_file = f.name
            f.write(script)

        proc = await asyncio.create_subprocess_exec(
            sys.executable,
            tmp_file,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=env,
        )
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)

        out = stdout.decode("utf-8", errors="replace")
        err = stderr.decode("utf-8", errors="replace")

        result = ""
        if out:
            result += out
        if err:
            result += f"\n[stderr]\n{err}"
        if proc.returncode != 0:
            result += f"\n[exit code: {proc.returncode}]"
        result = result.strip() or "(no output)"

        if not _STANDALONE:
            result = f"[mode: ray-cluster (subprocess)]\n{result}"
        return result
    except asyncio.TimeoutError:
        return f"[Error] ray_submit 执行超时({timeout}s)"
    except Exception as e:
        return f"[Error] ray_submit 失败:{e}"
    finally:
        # wait_for() only cancels communicate(); the child itself keeps
        # running after a timeout or task cancellation unless it is killed
        # and reaped explicitly.
        if proc is not None and proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                # The process exited between the check and the kill.
                pass
            else:
                await proc.wait()
        if tmp_file is not None:
            try:
                os.unlink(tmp_file)
            except FileNotFoundError:
                pass