151 lines
4.8 KiB
Python
151 lines
4.8 KiB
Python
"""data_analytics 插件 API:凭证 CRUD + 分析任务提交/查询/事件流。
|
|
|
|
挂载后路径前缀为 /api/v1/plugin/data_analytics/...,跟核心 API 完全独立。
|
|
所有数据库读写都走 organization actor 的代理方法(确保分布式模式下不跨 actor
|
|
共享 SQLAlchemy session)。
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from typing import Optional
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException
|
|
from fastapi.responses import StreamingResponse
|
|
from pydantic import BaseModel, Field
|
|
|
|
from kilostar.utils.access import Accessor, TokenData
|
|
from kilostar.utils.ray_hook import ray_actor_hook
|
|
|
|
router = APIRouter(tags=["data_analytics"])
|
|
|
|
|
|
# ─── Schemas ────────────────────────────────────────────────────────────────
|
|
|
|
|
|
class CredentialCreate(BaseModel):
|
|
display_name: str = Field(..., max_length=100)
|
|
endpoint_url: Optional[str] = None
|
|
region: str = "us-east-1"
|
|
access_key: str = Field(..., min_length=1)
|
|
secret_key: str = Field(..., min_length=1)
|
|
|
|
|
|
class JobCreate(BaseModel):
|
|
cred_id: str
|
|
description: str = Field(..., min_length=1, max_length=2000)
|
|
|
|
|
|
# ─── Helpers ────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def _get_org():
|
|
try:
|
|
return ray_actor_hook("org_data_analytics").org_data_analytics
|
|
except Exception as e:
|
|
raise HTTPException(503, f"data_analytics 插件未就绪:{e}")
|
|
|
|
|
|
# ─── Credentials ────────────────────────────────────────────────────────────
|
|
|
|
|
|
@router.get("/credentials")
|
|
async def list_credentials(
|
|
token_data: TokenData = Depends(Accessor.get_current_user),
|
|
):
|
|
org = _get_org()
|
|
rows = await org.cred_list.remote(token_data.username)
|
|
return {"credentials": rows}
|
|
|
|
|
|
@router.post("/credentials")
|
|
async def create_credential(
|
|
payload: CredentialCreate,
|
|
token_data: TokenData = Depends(Accessor.get_current_user),
|
|
):
|
|
org = _get_org()
|
|
row = await org.cred_create.remote(
|
|
user_id=token_data.username,
|
|
display_name=payload.display_name,
|
|
access_key=payload.access_key,
|
|
secret_key=payload.secret_key,
|
|
endpoint_url=payload.endpoint_url,
|
|
region=payload.region,
|
|
)
|
|
return row
|
|
|
|
|
|
@router.delete("/credentials/{cred_id}")
|
|
async def delete_credential(
|
|
cred_id: str,
|
|
token_data: TokenData = Depends(Accessor.get_current_user),
|
|
):
|
|
org = _get_org()
|
|
ok = await org.cred_delete.remote(cred_id, token_data.username)
|
|
if not ok:
|
|
raise HTTPException(404, "凭证不存在或不属于当前用户")
|
|
return {"status": "ok"}
|
|
|
|
|
|
# ─── Jobs ───────────────────────────────────────────────────────────────────
|
|
|
|
|
|
@router.post("/jobs")
|
|
async def create_job(
|
|
payload: JobCreate,
|
|
token_data: TokenData = Depends(Accessor.get_current_user),
|
|
):
|
|
org = _get_org()
|
|
try:
|
|
return await org.job_create.remote(
|
|
user_id=token_data.username,
|
|
cred_id=payload.cred_id,
|
|
description=payload.description,
|
|
)
|
|
except ValueError as e:
|
|
raise HTTPException(400, str(e))
|
|
|
|
|
|
@router.get("/jobs")
|
|
async def list_jobs(
|
|
token_data: TokenData = Depends(Accessor.get_current_user),
|
|
):
|
|
org = _get_org()
|
|
rows = await org.job_list.remote(token_data.username)
|
|
return {"jobs": rows}
|
|
|
|
|
|
@router.get("/jobs/{job_id}")
|
|
async def get_job(
|
|
job_id: str,
|
|
token_data: TokenData = Depends(Accessor.get_current_user),
|
|
):
|
|
org = _get_org()
|
|
row = await org.job_get.remote(job_id, token_data.username)
|
|
if row is None:
|
|
raise HTTPException(404, "任务不存在")
|
|
return row
|
|
|
|
|
|
@router.get("/jobs/{job_id}/stream")
|
|
async def stream_job(
|
|
job_id: str,
|
|
token_data: TokenData = Depends(Accessor.get_current_user),
|
|
):
|
|
"""转发 organization 事件流为 SSE。"""
|
|
import json
|
|
|
|
org = _get_org()
|
|
row = await org.job_get.remote(job_id, token_data.username)
|
|
if row is None:
|
|
raise HTTPException(404, "任务不存在")
|
|
org_task_id = row.get("org_task_id")
|
|
if not org_task_id:
|
|
raise HTTPException(409, "任务尚未投递到 organization")
|
|
|
|
async def _generate():
|
|
async for event in await org.stream.remote(org_task_id):
|
|
payload = event if isinstance(event, str) else json.dumps(event, ensure_ascii=False)
|
|
yield f"data: {payload}\n\n"
|
|
|
|
return StreamingResponse(_generate(), media_type="text/event-stream")
|