Files
2026-07-01 09:22:26 +00:00

151 lines
4.8 KiB
Python

"""data_analytics 插件 API:凭证 CRUD + 分析任务提交/查询/事件流。
挂载后路径前缀为 /api/v1/plugin/data_analytics/...,跟核心 API 完全独立。
所有数据库读写都走 organization actor 的代理方法(确保分布式模式下不跨 actor
共享 SQLAlchemy session)。
"""
from __future__ import annotations
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
from kilostar.utils.access import Accessor, TokenData
from kilostar.utils.ray_hook import ray_actor_hook
router = APIRouter(tags=["data_analytics"])
# ─── Schemas ────────────────────────────────────────────────────────────────
class CredentialCreate(BaseModel):
display_name: str = Field(..., max_length=100)
endpoint_url: Optional[str] = None
region: str = "us-east-1"
access_key: str = Field(..., min_length=1)
secret_key: str = Field(..., min_length=1)
class JobCreate(BaseModel):
cred_id: str
description: str = Field(..., min_length=1, max_length=2000)
# ─── Helpers ────────────────────────────────────────────────────────────────
def _get_org():
try:
return ray_actor_hook("org_data_analytics").org_data_analytics
except Exception as e:
raise HTTPException(503, f"data_analytics 插件未就绪:{e}")
# ─── Credentials ────────────────────────────────────────────────────────────
@router.get("/credentials")
async def list_credentials(
token_data: TokenData = Depends(Accessor.get_current_user),
):
org = _get_org()
rows = await org.cred_list.remote(token_data.username)
return {"credentials": rows}
@router.post("/credentials")
async def create_credential(
payload: CredentialCreate,
token_data: TokenData = Depends(Accessor.get_current_user),
):
org = _get_org()
row = await org.cred_create.remote(
user_id=token_data.username,
display_name=payload.display_name,
access_key=payload.access_key,
secret_key=payload.secret_key,
endpoint_url=payload.endpoint_url,
region=payload.region,
)
return row
@router.delete("/credentials/{cred_id}")
async def delete_credential(
cred_id: str,
token_data: TokenData = Depends(Accessor.get_current_user),
):
org = _get_org()
ok = await org.cred_delete.remote(cred_id, token_data.username)
if not ok:
raise HTTPException(404, "凭证不存在或不属于当前用户")
return {"status": "ok"}
# ─── Jobs ───────────────────────────────────────────────────────────────────
@router.post("/jobs")
async def create_job(
payload: JobCreate,
token_data: TokenData = Depends(Accessor.get_current_user),
):
org = _get_org()
try:
return await org.job_create.remote(
user_id=token_data.username,
cred_id=payload.cred_id,
description=payload.description,
)
except ValueError as e:
raise HTTPException(400, str(e))
@router.get("/jobs")
async def list_jobs(
token_data: TokenData = Depends(Accessor.get_current_user),
):
org = _get_org()
rows = await org.job_list.remote(token_data.username)
return {"jobs": rows}
@router.get("/jobs/{job_id}")
async def get_job(
job_id: str,
token_data: TokenData = Depends(Accessor.get_current_user),
):
org = _get_org()
row = await org.job_get.remote(job_id, token_data.username)
if row is None:
raise HTTPException(404, "任务不存在")
return row
@router.get("/jobs/{job_id}/stream")
async def stream_job(
job_id: str,
token_data: TokenData = Depends(Accessor.get_current_user),
):
"""转发 organization 事件流为 SSE。"""
import json
org = _get_org()
row = await org.job_get.remote(job_id, token_data.username)
if row is None:
raise HTTPException(404, "任务不存在")
org_task_id = row.get("org_task_id")
if not org_task_id:
raise HTTPException(409, "任务尚未投递到 organization")
async def _generate():
async for event in await org.stream.remote(org_task_id):
payload = event if isinstance(event, str) else json.dumps(event, ensure_ascii=False)
yield f"data: {payload}\n\n"
return StreamingResponse(_generate(), media_type="text/event-stream")