From aa47a19e9858b18b71e9ab65234273ada53d4efc Mon Sep 17 00:00:00 2001 From: zhaoxi Date: Wed, 1 Jul 2026 09:22:26 +0000 Subject: [PATCH] =?UTF-8?q?=E5=AD=98=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 16 +- Dockerfile | 16 +- ...026_06_17_0002-0012_plugin_owned_agents.py | 32 + data/plugin/data_analytics/__init__.py | 1 + data/plugin/data_analytics/agents.json | 16 + data/plugin/data_analytics/api.py | 150 ++ data/plugin/data_analytics/core/__init__.py | 1 + data/plugin/data_analytics/core/db.py | 235 +++ .../data_analytics/core/organization.py | 135 ++ .../frontend/CredentialPanel.tsx | 174 ++ .../data_analytics/frontend/Dashboard.tsx | 157 ++ .../data_analytics/frontend/JobDetail.tsx | 174 ++ .../data_analytics/frontend/NewJobDialog.tsx | 110 ++ .../frontend/build-manifest.mjs | 22 + data/plugin/data_analytics/frontend/client.ts | 30 + data/plugin/data_analytics/frontend/index.tsx | 61 + .../data_analytics/frontend/package-lock.json | 1744 +++++++++++++++++ .../data_analytics/frontend/package.json | 25 + .../plugin/data_analytics/frontend/styles.css | 91 + .../data_analytics/frontend/tsconfig.json | 13 + data/plugin/data_analytics/frontend/types.ts | 25 + .../data_analytics/frontend/vite.config.ts | 25 + data/plugin/data_analytics/manifest.json | 19 + .../plugin/data_analytics/toolset/__init__.py | 16 + .../data_analytics/toolset/_s3_common.py | 43 + .../data_analytics/toolset/manifest.json | 39 + .../data_analytics/toolset/ray_submit.py | 95 + .../data_analytics/toolset/s3_get_object.py | 46 + .../data_analytics/toolset/s3_list_objects.py | 47 + data/plugin/data_analytics/toolset/s3_peek.py | 35 + frontend/src/App.tsx | 23 +- frontend/src/components/Chat/LeftPanel.tsx | 109 +- frontend/src/i18n/locales/en.json | 4 + frontend/src/i18n/locales/zh.json | 4 + frontend/src/plugins/HeavyPluginShell.tsx | 109 ++ frontend/src/store/useAppStore.ts | 30 + frontend/tsconfig.app.json | 8 +- frontend/vite.config.ts | 6 + kilostar/api/__init__.py | 35 + kilostar/api/agent.py | 31 +- kilostar/api/plugin.py | 56 + .../postgres_database/model/individual.py | 3 + .../postgres_database/module/individual.py | 96 +- kilostar/core/postgres_database/postgres.py | 30 + kilostar/plugin_runtime/agents_config.py | 6 +- kilostar/plugin_runtime/base_organization.py | 140 +- kilostar/plugin_runtime/loader.py | 76 +- kilostar/plugin_runtime/plugin_manager.py | 79 +- kilostar/utils/settings.py | 13 + pyproject.toml | 2 + tests/unit/test_individual_database.py | 86 + tests/unit/test_plugin_runtime.py | 207 +- uv.lock | 52 +- 53 files changed, 4721 insertions(+), 77 deletions(-) create mode 100644 alembic/versions/2026_06_17_0002-0012_plugin_owned_agents.py create mode 100644 data/plugin/data_analytics/__init__.py create mode 100644 data/plugin/data_analytics/agents.json create mode 100644 data/plugin/data_analytics/api.py create mode 100644 data/plugin/data_analytics/core/__init__.py create mode 100644 data/plugin/data_analytics/core/db.py create mode 100644 data/plugin/data_analytics/core/organization.py create mode 100644 data/plugin/data_analytics/frontend/CredentialPanel.tsx create mode 100644 data/plugin/data_analytics/frontend/Dashboard.tsx create mode 100644 data/plugin/data_analytics/frontend/JobDetail.tsx create mode 100644 data/plugin/data_analytics/frontend/NewJobDialog.tsx create mode 100644 data/plugin/data_analytics/frontend/build-manifest.mjs create mode 100644 data/plugin/data_analytics/frontend/client.ts create mode 100644 data/plugin/data_analytics/frontend/index.tsx create mode 100644 data/plugin/data_analytics/frontend/package-lock.json create mode 100644 data/plugin/data_analytics/frontend/package.json create mode 100644 data/plugin/data_analytics/frontend/styles.css create mode 100644 data/plugin/data_analytics/frontend/tsconfig.json create mode 100644 data/plugin/data_analytics/frontend/types.ts create mode 100644 data/plugin/data_analytics/frontend/vite.config.ts create mode 100644 data/plugin/data_analytics/manifest.json create mode 100644 data/plugin/data_analytics/toolset/__init__.py create mode 100644 data/plugin/data_analytics/toolset/_s3_common.py create mode 100644 data/plugin/data_analytics/toolset/manifest.json create mode 100644 data/plugin/data_analytics/toolset/ray_submit.py create mode 100644 data/plugin/data_analytics/toolset/s3_get_object.py create mode 100644 data/plugin/data_analytics/toolset/s3_list_objects.py create mode 100644 data/plugin/data_analytics/toolset/s3_peek.py create mode 100644 frontend/src/plugins/HeavyPluginShell.tsx create mode 100644 tests/unit/test_individual_database.py diff --git a/.gitignore b/.gitignore index 60f2a28..00b77b3 100644 --- a/.gitignore +++ b/.gitignore @@ -4,8 +4,22 @@ data/* !data/plugin/ data/plugin/skill/ +# 插件运行时 SQLite / 状态 +data/plugin/*/_data/ + tmp/ .env .idea/ -.venv/ \ No newline at end of file +.venv/ + +# Python +__pycache__/ +*.pyc +*.pyo + +# Node / Frontend +node_modules/ +frontend/dist/ +data/plugin/*/frontend/dist/ +data/plugin/*/frontend/node_modules/ \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index 00ef40f..1928edf 100644 --- a/Dockerfile +++ b/Dockerfile @@ -2,12 +2,24 @@ FROM node:22-alpine AS frontend-builder WORKDIR /app/frontend -# Install dependencies and build the static assets +# 主前端 COPY frontend/package*.json ./ RUN npm ci COPY frontend/ . RUN npm run build +# 重型插件前端:每个插件独立 vite lib build,输出 dist/plugin-element.js + wc-manifest.json +# 这一段通过 sh 循环:复制源码 → npm install → build。任何一个插件失败都会让镜像构建失败, +# 用 || true 兜底过于宽松——这里选择硬失败,便于第一时间发现 build 问题。 +COPY data/plugin /app/data/plugin +RUN set -e; \ + for d in /app/data/plugin/*/frontend; do \ + if [ -f "$d/package.json" ]; then \ + echo "==> Building plugin frontend: $d"; \ + cd "$d" && npm install && npm run build; \ + fi; \ + done + # Stage 2: Build the Python backend and serve FROM python:3.13-slim WORKDIR /app @@ -36,6 +48,8 @@ COPY . . # Copy the built frontend static assets from Stage 1 COPY --from=frontend-builder /app/frontend/dist /app/frontend/dist +# 重型插件前端 build 产物(让 /plugin-ui// 静态挂载有内容可挂) +COPY --from=frontend-builder /app/data/plugin /app/data/plugin # Expose FastAPI and Ray Dashboard ports EXPOSE 8000 8265 diff --git a/alembic/versions/2026_06_17_0002-0012_plugin_owned_agents.py b/alembic/versions/2026_06_17_0002-0012_plugin_owned_agents.py new file mode 100644 index 0000000..175b2c0 --- /dev/null +++ b/alembic/versions/2026_06_17_0002-0012_plugin_owned_agents.py @@ -0,0 +1,32 @@ +"""add plugin_owned column to base_individual + +Revision ID: 0012 +Revises: 0011 +Create Date: 2026-06-17 +""" + +from alembic import op +import sqlalchemy as sa + + +revision = "0012" +down_revision = "0011" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.add_column( + "base_individual", + sa.Column("plugin_owned", sa.String(64), nullable=True), + ) + op.create_index( + "ix_base_individual_plugin_owned", + "base_individual", + ["plugin_owned"], + ) + + +def downgrade() -> None: + op.drop_index("ix_base_individual_plugin_owned", table_name="base_individual") + op.drop_column("base_individual", "plugin_owned") diff --git a/data/plugin/data_analytics/__init__.py b/data/plugin/data_analytics/__init__.py new file mode 100644 index 0000000..82a80c8 --- /dev/null +++ b/data/plugin/data_analytics/__init__.py @@ -0,0 +1 @@ +"""data_analytics 重型插件包。""" diff --git a/data/plugin/data_analytics/agents.json b/data/plugin/data_analytics/agents.json new file mode 100644 index 0000000..c30bc4d --- /dev/null +++ b/data/plugin/data_analytics/agents.json @@ -0,0 +1,16 @@ +{ + "agents": [ + { + "name": "analyst", + "role": "数据分析师", + "system_prompt": "你是一位严谨、克制的数据分析师。任务进来后:1) 先用 s3_list_objects/s3_peek 看几行了解结构;2) 决定用 python_executor(小数据,单机 pandas)或 ray_submit(大数据,分布式);3) 执行分析、得出明确结论,必要时给出图表链接或样例数据。注意:你只能读取 S3,**不能写入**。如果用户让你上传/删除/修改对象,请明确告知做不到。", + "tools": ["s3_list_objects", "s3_peek", "s3_get_object", "ray_submit", "python_executor"], + "skills": [], + "peers": [] + } + ], + "orchestration": { + "type": "react", + "entry": "analyst" + } +} diff --git a/data/plugin/data_analytics/api.py b/data/plugin/data_analytics/api.py new file mode 100644 index 0000000..473dc37 --- /dev/null +++ b/data/plugin/data_analytics/api.py @@ -0,0 +1,150 @@ +"""data_analytics 插件 API:凭证 CRUD + 分析任务提交/查询/事件流。 + +挂载后路径前缀为 /api/v1/plugin/data_analytics/...,跟核心 API 完全独立。 +所有数据库读写都走 organization actor 的代理方法(确保分布式模式下不跨 actor +共享 SQLAlchemy session)。 +""" + +from __future__ import annotations + +from typing import Optional + +from fastapi import APIRouter, Depends, HTTPException +from fastapi.responses import StreamingResponse +from pydantic import BaseModel, Field + +from kilostar.utils.access import Accessor, TokenData +from kilostar.utils.ray_hook import ray_actor_hook + +router = APIRouter(tags=["data_analytics"]) + + +# ─── Schemas ──────────────────────────────────────────────────────────────── + + +class CredentialCreate(BaseModel): + display_name: str = Field(..., max_length=100) + endpoint_url: Optional[str] = None + region: str = "us-east-1" + access_key: str = Field(..., min_length=1) + secret_key: str = Field(..., min_length=1) + + +class JobCreate(BaseModel): + cred_id: str + description: str = Field(..., min_length=1, max_length=2000) + + +# ─── Helpers ──────────────────────────────────────────────────────────────── + + +def _get_org(): + try: + return ray_actor_hook("org_data_analytics").org_data_analytics + except Exception as e: + raise HTTPException(503, f"data_analytics 插件未就绪:{e}") + + +# ─── Credentials ──────────────────────────────────────────────────────────── + + +@router.get("/credentials") +async def list_credentials( + token_data: TokenData = Depends(Accessor.get_current_user), +): + org = _get_org() + rows = await org.cred_list.remote(token_data.username) + return {"credentials": rows} + + +@router.post("/credentials") +async def create_credential( + payload: CredentialCreate, + token_data: TokenData = Depends(Accessor.get_current_user), +): + org = _get_org() + row = await org.cred_create.remote( + user_id=token_data.username, + display_name=payload.display_name, + access_key=payload.access_key, + secret_key=payload.secret_key, + endpoint_url=payload.endpoint_url, + region=payload.region, + ) + return row + + +@router.delete("/credentials/{cred_id}") +async def delete_credential( + cred_id: str, + token_data: TokenData = Depends(Accessor.get_current_user), +): + org = _get_org() + ok = await org.cred_delete.remote(cred_id, token_data.username) + if not ok: + raise HTTPException(404, "凭证不存在或不属于当前用户") + return {"status": "ok"} + + +# ─── Jobs ─────────────────────────────────────────────────────────────────── + + +@router.post("/jobs") +async def create_job( + payload: JobCreate, + token_data: TokenData = Depends(Accessor.get_current_user), +): + org = _get_org() + try: + return await org.job_create.remote( + user_id=token_data.username, + cred_id=payload.cred_id, + description=payload.description, + ) + except ValueError as e: + raise HTTPException(400, str(e)) + + +@router.get("/jobs") +async def list_jobs( + token_data: TokenData = Depends(Accessor.get_current_user), +): + org = _get_org() + rows = await org.job_list.remote(token_data.username) + return {"jobs": rows} + + +@router.get("/jobs/{job_id}") +async def get_job( + job_id: str, + token_data: TokenData = Depends(Accessor.get_current_user), +): + org = _get_org() + row = await org.job_get.remote(job_id, token_data.username) + if row is None: + raise HTTPException(404, "任务不存在") + return row + + +@router.get("/jobs/{job_id}/stream") +async def stream_job( + job_id: str, + token_data: TokenData = Depends(Accessor.get_current_user), +): + """转发 organization 事件流为 SSE。""" + import json + + org = _get_org() + row = await org.job_get.remote(job_id, token_data.username) + if row is None: + raise HTTPException(404, "任务不存在") + org_task_id = row.get("org_task_id") + if not org_task_id: + raise HTTPException(409, "任务尚未投递到 organization") + + async def _generate(): + async for event in await org.stream.remote(org_task_id): + payload = event if isinstance(event, str) else json.dumps(event, ensure_ascii=False) + yield f"data: {payload}\n\n" + + return StreamingResponse(_generate(), media_type="text/event-stream") diff --git a/data/plugin/data_analytics/core/__init__.py b/data/plugin/data_analytics/core/__init__.py new file mode 100644 index 0000000..748a239 --- /dev/null +++ b/data/plugin/data_analytics/core/__init__.py @@ -0,0 +1 @@ +"""data_analytics organization 实现。""" diff --git a/data/plugin/data_analytics/core/db.py b/data/plugin/data_analytics/core/db.py new file mode 100644 index 0000000..514d031 --- /dev/null +++ b/data/plugin/data_analytics/core/db.py @@ -0,0 +1,235 @@ +"""data_analytics 插件本地 SQLite 表与 DAO。 + +注意:本插件用的 ``DeclarativeBase`` 跟核心 PG 完全独立,避免元数据空间串场。 +所有数据落到 ``data/plugin/data_analytics/_data/data_analytics.db``。 +""" + +from __future__ import annotations + +from datetime import datetime +from typing import List, Optional + +from sqlalchemy import DateTime, String, Text, select +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker +from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column + +from kilostar.utils.crypto import decrypt_dict_secrets, encrypt_dict_secrets + + +class Base(DeclarativeBase): + """data_analytics 插件私有的元数据空间,跟核心 PG 隔离。""" + + pass + + +class S3Credential(Base): + __tablename__ = "s3_credential" + + cred_id: Mapped[str] = mapped_column(String(64), primary_key=True) + user_id: Mapped[str] = mapped_column(String(64), index=True, nullable=False) + display_name: Mapped[str] = mapped_column(String(100), nullable=False) + endpoint_url: Mapped[Optional[str]] = mapped_column(String(255), nullable=True) + region: Mapped[str] = mapped_column(String(50), default="us-east-1") + access_key: Mapped[str] = mapped_column(String(255), nullable=False) + secret_key: Mapped[str] = mapped_column(String(255), nullable=False) + created_at: Mapped[datetime] = mapped_column( + DateTime, default=datetime.utcnow, nullable=False + ) + updated_at: Mapped[datetime] = mapped_column( + DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False + ) + + +class AnalysisJob(Base): + __tablename__ = "analysis_job" + + job_id: Mapped[str] = mapped_column(String(64), primary_key=True) + user_id: Mapped[str] = mapped_column(String(64), index=True, nullable=False) + cred_id: Mapped[Optional[str]] = mapped_column(String(64), nullable=True) + description: Mapped[str] = mapped_column(Text, nullable=False) + status: Mapped[str] = mapped_column(String(20), default="pending", index=True) + org_task_id: Mapped[Optional[str]] = mapped_column(String(64), nullable=True) + result: Mapped[Optional[str]] = mapped_column(Text, nullable=True) + created_at: Mapped[datetime] = mapped_column( + DateTime, default=datetime.utcnow, nullable=False, index=True + ) + updated_at: Mapped[datetime] = mapped_column( + DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False + ) + + +class CredentialDAO: + """S3 凭证 DAO:写入时自动加密,读取时自动解密。""" + + SENSITIVE_KEYS = ("access_key", "secret_key") + + def __init__(self, sm: async_sessionmaker[AsyncSession]): + self._sm = sm + + @staticmethod + def _row_to_dict(row: S3Credential, *, include_secrets: bool) -> dict: + d = { + "cred_id": row.cred_id, + "user_id": row.user_id, + "display_name": row.display_name, + "endpoint_url": row.endpoint_url, + "region": row.region, + "access_key": row.access_key, + "secret_key": row.secret_key, + "created_at": row.created_at.isoformat() if row.created_at else None, + "updated_at": row.updated_at.isoformat() if row.updated_at else None, + } + if not include_secrets: + ak = decrypt_dict_secrets({"access_key": d["access_key"]}).get("access_key", "") + d["access_key"] = (ak[:4] + "***" + ak[-2:]) if len(ak) > 6 else "***" + d.pop("secret_key", None) + return d + # include_secrets=True 用于工具内部,返回明文给 boto3 + return decrypt_dict_secrets(d) + + async def list_by_user(self, user_id: str) -> List[dict]: + async with self._sm() as s: + stmt = select(S3Credential).where(S3Credential.user_id == user_id) + rows = (await s.execute(stmt)).scalars().all() + return [self._row_to_dict(r, include_secrets=False) for r in rows] + + async def get(self, cred_id: str, *, include_secrets: bool = False) -> Optional[dict]: + async with self._sm() as s: + stmt = select(S3Credential).where(S3Credential.cred_id == cred_id) + row = (await s.execute(stmt)).scalar_one_or_none() + if row is None: + return None + return self._row_to_dict(row, include_secrets=include_secrets) + + async def upsert( + self, + cred_id: str, + user_id: str, + display_name: str, + access_key: str, + secret_key: str, + endpoint_url: Optional[str] = None, + region: str = "us-east-1", + ) -> dict: + encrypted = encrypt_dict_secrets( + {"access_key": access_key, "secret_key": secret_key} + ) + async with self._sm() as s: + stmt = select(S3Credential).where(S3Credential.cred_id == cred_id) + existing = (await s.execute(stmt)).scalar_one_or_none() + if existing is not None: + existing.display_name = display_name + existing.endpoint_url = endpoint_url + existing.region = region + existing.access_key = encrypted["access_key"] + existing.secret_key = encrypted["secret_key"] + s.add(existing) + await s.commit() + await s.refresh(existing) + return self._row_to_dict(existing, include_secrets=False) + row = S3Credential( + cred_id=cred_id, + user_id=user_id, + display_name=display_name, + endpoint_url=endpoint_url, + region=region, + access_key=encrypted["access_key"], + secret_key=encrypted["secret_key"], + ) + s.add(row) + await s.commit() + await s.refresh(row) + return self._row_to_dict(row, include_secrets=False) + + async def delete(self, cred_id: str, user_id: str) -> bool: + async with self._sm() as s: + stmt = select(S3Credential).where( + S3Credential.cred_id == cred_id, S3Credential.user_id == user_id + ) + row = (await s.execute(stmt)).scalar_one_or_none() + if row is None: + return False + await s.delete(row) + await s.commit() + return True + + +class JobDAO: + """分析任务记录 DAO。""" + + def __init__(self, sm: async_sessionmaker[AsyncSession]): + self._sm = sm + + @staticmethod + def _row_to_dict(row: AnalysisJob) -> dict: + return { + "job_id": row.job_id, + "user_id": row.user_id, + "cred_id": row.cred_id, + "description": row.description, + "status": row.status, + "org_task_id": row.org_task_id, + "result": row.result, + "created_at": row.created_at.isoformat() if row.created_at else None, + "updated_at": row.updated_at.isoformat() if row.updated_at else None, + } + + async def create( + self, + job_id: str, + user_id: str, + description: str, + cred_id: Optional[str] = None, + ) -> dict: + async with self._sm() as s: + row = AnalysisJob( + job_id=job_id, + user_id=user_id, + description=description, + cred_id=cred_id, + ) + s.add(row) + await s.commit() + await s.refresh(row) + return self._row_to_dict(row) + + async def update( + self, + job_id: str, + *, + status: Optional[str] = None, + result: Optional[str] = None, + org_task_id: Optional[str] = None, + ) -> Optional[dict]: + async with self._sm() as s: + stmt = select(AnalysisJob).where(AnalysisJob.job_id == job_id) + row = (await s.execute(stmt)).scalar_one_or_none() + if row is None: + return None + if status is not None: + row.status = status + if result is not None: + row.result = result + if org_task_id is not None: + row.org_task_id = org_task_id + s.add(row) + await s.commit() + await s.refresh(row) + return self._row_to_dict(row) + + async def list_by_user(self, user_id: str, limit: int = 50) -> List[dict]: + async with self._sm() as s: + stmt = ( + select(AnalysisJob) + .where(AnalysisJob.user_id == user_id) + .order_by(AnalysisJob.created_at.desc()) + .limit(limit) + ) + rows = (await s.execute(stmt)).scalars().all() + return [self._row_to_dict(r) for r in rows] + + async def get(self, job_id: str) -> Optional[dict]: + async with self._sm() as s: + stmt = select(AnalysisJob).where(AnalysisJob.job_id == job_id) + row = (await s.execute(stmt)).scalar_one_or_none() + return self._row_to_dict(row) if row else None diff --git a/data/plugin/data_analytics/core/organization.py b/data/plugin/data_analytics/core/organization.py new file mode 100644 index 0000000..724b985 --- /dev/null +++ b/data/plugin/data_analytics/core/organization.py @@ -0,0 +1,135 @@ +"""data_analytics organization:管理本插件的 SQLite 元数据 + 注入凭证 ctx。 + +凭证经由 ``S3_CREDS_VAR`` ContextVar 传给工具,避免污染 agent tool signature +(agent 看到的工具不带 cred 参数,模型不会误传)。 + +API 层通过本类暴露的 ``cred_*`` / ``job_*`` 代理方法跨 actor 调 DAO, +保证分布式模式下 actor 之间不直接共享 SQLAlchemy session。 +""" + +from __future__ import annotations + +import contextvars +import uuid +from typing import Any, Callable, Dict, List, Optional + +from kilostar.plugin_runtime.base_organization import BaseOrganization +from kilostar.plugin_runtime.event import OrgEvent + +from .db import Base, CredentialDAO, JobDAO + +# 当前任务的 S3 凭证(明文):工具内部读 .get() 拿 +S3_CREDS_VAR: contextvars.ContextVar[Optional[Dict[str, Any]]] = contextvars.ContextVar( + "data_analytics_s3_creds", default=None +) + + +class DataAnalyticsOrganization(BaseOrganization): + """对接 S3 的数据分析组织。""" + + async def setup(self) -> None: + await super().setup() + await self.init_local_db([Base]) + # 跨工具/跨 API 共享的 DAO 实例 + self.cred_dao = CredentialDAO(self._session_maker) + self.job_dao = JobDAO(self._session_maker) + + async def on_first_install(self) -> None: + self.logger.info( + "data_analytics installed; configure S3 credentials in dashboard." + ) + + async def react( + self, + task_description: str, + ctx: Dict[str, Any], + emit: Callable[[OrgEvent], Any], + ) -> Any: + cred_id = ctx.get("cred_id") + if cred_id and getattr(self, "cred_dao", None) is not None: + cred = await self.cred_dao.get(cred_id, include_secrets=True) + if cred is None: + raise RuntimeError(f"S3 凭证 {cred_id} 不存在") + S3_CREDS_VAR.set(cred) + ctx["s3_cred_display"] = cred.get("display_name") + else: + S3_CREDS_VAR.set(None) + return await super().react(task_description, ctx, emit) + + # ─── 凭证代理(API 层调用) ───────────────────────────────────── + + async def cred_list(self, user_id: str) -> List[dict]: + return await self.cred_dao.list_by_user(user_id) + + async def cred_create( + self, + user_id: str, + display_name: str, + access_key: str, + secret_key: str, + endpoint_url: Optional[str] = None, + region: str = "us-east-1", + ) -> dict: + cred_id = uuid.uuid4().hex + return await self.cred_dao.upsert( + cred_id=cred_id, + user_id=user_id, + display_name=display_name, + access_key=access_key, + secret_key=secret_key, + endpoint_url=endpoint_url, + region=region, + ) + + async def cred_delete(self, cred_id: str, user_id: str) -> bool: + return await self.cred_dao.delete(cred_id, user_id) + + # ─── 任务代理 ────────────────────────────────────────────────── + + async def job_create( + self, user_id: str, cred_id: str, description: str + ) -> dict: + # 校验凭证归属 + cred = await self.cred_dao.get(cred_id, include_secrets=False) + if cred is None or cred.get("user_id") != user_id: + raise ValueError("凭证不存在或不属于当前用户") + + job_id = uuid.uuid4().hex + await self.job_dao.create( + job_id=job_id, + user_id=user_id, + description=description, + cred_id=cred_id, + ) + # 投递 organization 任务(拿 task_id 回填,便于前端拉事件流) + task_id = await self.submit( + description, {"user_id": user_id, "cred_id": cred_id, "job_id": job_id} + ) + await self.job_dao.update(job_id, status="running", org_task_id=task_id) + return {"job_id": job_id, "task_id": task_id, "status": "running"} + + async def job_list(self, user_id: str) -> List[dict]: + return await self.job_dao.list_by_user(user_id) + + async def job_get(self, job_id: str, user_id: str) -> Optional[dict]: + row = await self.job_dao.get(job_id) + if row is None or row.get("user_id") != user_id: + return None + # 附带最新 organization 状态 + org_task_id = row.get("org_task_id") + if org_task_id: + ts = await self.status(org_task_id) + if ts is not None: + row["task_status"] = ts.get("status") + row["task_result"] = ts.get("result") + row["task_error"] = ts.get("error") + # 任务终态时把结果回写 SQLite,方便重启后查询 + if ts.get("status") in ("completed", "failed") and row.get("status") != ts.get("status"): + result_payload = ts.get("result") if ts.get("status") == "completed" else ts.get("error") + await self.job_dao.update( + job_id, + status=ts.get("status"), + result=str(result_payload) if result_payload is not None else None, + ) + row["status"] = ts.get("status") + return row diff --git a/data/plugin/data_analytics/frontend/CredentialPanel.tsx b/data/plugin/data_analytics/frontend/CredentialPanel.tsx new file mode 100644 index 0000000..c43110e --- /dev/null +++ b/data/plugin/data_analytics/frontend/CredentialPanel.tsx @@ -0,0 +1,174 @@ +import { useState } from 'react'; +import { Plus, Trash2, Loader2, Key, Eye, EyeOff } from 'lucide-react'; +import { usePluginContext } from './client'; +import type { S3Credential } from './types'; + +const API_BASE = '/api/v1/plugin/data_analytics'; + +interface Props { + credentials: S3Credential[]; + loading: boolean; + onChanged: () => void; +} + +export function CredentialPanel({ credentials, loading, onChanged }: Props) { + const { client } = usePluginContext(); + const [showForm, setShowForm] = useState(false); + const [showSecret, setShowSecret] = useState(false); + const [busy, setBusy] = useState(false); + const [error, setError] = useState(''); + const [form, setForm] = useState({ + display_name: '', + endpoint_url: '', + region: 'us-east-1', + access_key: '', + secret_key: '', + }); + + const reset = () => { + setForm({ display_name: '', endpoint_url: '', region: 'us-east-1', access_key: '', secret_key: '' }); + setError(''); + setShowSecret(false); + }; + + const submit = async () => { + if (!form.display_name.trim() || !form.access_key.trim() || !form.secret_key.trim()) { + setError('显示名 / Access Key / Secret Key 必填'); + return; + } + setBusy(true); + setError(''); + try { + await client.post(`${API_BASE}/credentials`, { + display_name: form.display_name.trim(), + endpoint_url: form.endpoint_url.trim() || null, + region: form.region.trim() || 'us-east-1', + access_key: form.access_key, + secret_key: form.secret_key, + }); + reset(); + setShowForm(false); + onChanged(); + } catch (e: unknown) { + const msg = (e as { response?: { data?: { detail?: string } } }).response?.data?.detail; + setError(msg || '保存失败'); + } finally { + setBusy(false); + } + }; + + const remove = async (cred_id: string) => { + if (!confirm('确定删除该凭证?删除后该凭证下的任务将无法继续运行。')) return; + try { + await client.delete(`${API_BASE}/credentials/${cred_id}`); + onChanged(); + } catch (e) { + console.error(e); + } + }; + + return ( +
+
+
+

+ S3 凭证 +

+

访问密钥加密存储于本地 SQLite。

+
+ +
+ + {showForm && ( +
+ setForm({ ...form, display_name: e.target.value })} + /> + setForm({ ...form, endpoint_url: e.target.value })} + /> + setForm({ ...form, region: e.target.value })} + /> + setForm({ ...form, access_key: e.target.value })} + /> +
+ setForm({ ...form, secret_key: e.target.value })} + /> + +
+ {error &&
{error}
} + +
+ )} + + {loading ? ( +
+ +
+ ) : credentials.length === 0 ? ( +
+ 还没有凭证,点右上角「新增」开始。 +
+ ) : ( +
+ {credentials.map((c) => ( +
+
+
{c.display_name}
+
+ {c.endpoint_url || 'aws-s3'} · {c.region} · {c.access_key} +
+
+ +
+ ))} +
+ )} +
+ ); +} diff --git a/data/plugin/data_analytics/frontend/Dashboard.tsx b/data/plugin/data_analytics/frontend/Dashboard.tsx new file mode 100644 index 0000000..6f80ed1 --- /dev/null +++ b/data/plugin/data_analytics/frontend/Dashboard.tsx @@ -0,0 +1,157 @@ +import { useCallback, useEffect, useState } from 'react'; +import { BarChart3, Plus, Loader2, ListChecks } from 'lucide-react'; +import { usePluginContext } from './client'; +import { CredentialPanel } from './CredentialPanel'; +import { NewJobDialog } from './NewJobDialog'; +import { JobDetail } from './JobDetail'; +import type { S3Credential, AnalysisJob } from './types'; + +const API_BASE = '/api/v1/plugin/data_analytics'; + +interface Props { + pluginName: string; +} + +export function Dashboard({ pluginName }: Props) { + const { client } = usePluginContext(); + const [credentials, setCredentials] = useState([]); + const [credLoading, setCredLoading] = useState(true); + const [jobs, setJobs] = useState([]); + const [jobLoading, setJobLoading] = useState(true); + const [showNewJob, setShowNewJob] = useState(false); + const [openJobId, setOpenJobId] = useState(null); + const [error, setError] = useState(''); + + const loadCredentials = useCallback(async () => { + setCredLoading(true); + try { + const resp = await client.get<{ credentials: S3Credential[] }>(`${API_BASE}/credentials`); + setCredentials(resp.data.credentials || []); + } catch (e: unknown) { + const msg = (e as { response?: { data?: { detail?: string } } }).response?.data?.detail; + setError(msg || '加载凭证失败'); + } finally { + setCredLoading(false); + } + }, [client]); + + const loadJobs = useCallback(async () => { + setJobLoading(true); + try { + const resp = await client.get<{ jobs: AnalysisJob[] }>(`${API_BASE}/jobs`); + setJobs(resp.data.jobs || []); + } catch (e: unknown) { + const msg = (e as { response?: { data?: { detail?: string } } }).response?.data?.detail; + setError(msg || '加载任务失败'); + } finally { + setJobLoading(false); + } + }, [client]); + + useEffect(() => { + loadCredentials(); + loadJobs(); + }, [loadCredentials, loadJobs]); + + // 轮询任务列表,方便看状态变化 + useEffect(() => { + const t = setInterval(loadJobs, 5000); + return () => clearInterval(t); + }, [loadJobs]); + + return ( +
+
+
+
+ +
+
+

数据分析

+

+ 对接 S3,让 agent 自主决定分析路径(python_executor / ray_submit)。{' '} + {pluginName} +

+
+
+ + {error && ( +
{error}
+ )} + + + +
+
+
+

+ 分析任务 +

+

点行查看详情和事件流。每 5 秒自动刷新。

+
+ +
+ + {jobLoading && jobs.length === 0 ? ( +
+ +
+ ) : jobs.length === 0 ? ( +
+ 还没有分析任务。点右上角「新建任务」开始。 +
+ ) : ( +
+ {jobs.map((j) => ( + + ))} +
+ )} +
+
+ + {showNewJob && ( + setShowNewJob(false)} + onCreated={loadJobs} + /> + )} + {openJobId && setOpenJobId(null)} />} +
+ ); +} + +function StatusBadge({ status }: { status: string }) { + const map: Record = { + pending: 'bg-bg-base text-text-muted border-border-primary', + running: 'bg-warning-bg text-warning border-warning/20', + completed: 'bg-success-bg text-success border-success/20', + failed: 'bg-danger-bg text-danger border-danger/20', + }; + const cls = map[status] || map.pending; + return ( + + {status} + + ); +} diff --git a/data/plugin/data_analytics/frontend/JobDetail.tsx b/data/plugin/data_analytics/frontend/JobDetail.tsx new file mode 100644 index 0000000..481efcf --- /dev/null +++ b/data/plugin/data_analytics/frontend/JobDetail.tsx @@ -0,0 +1,174 @@ +import { useEffect, useRef, useState } from 'react'; +import { Loader2, X, Activity } from 'lucide-react'; +import { usePluginContext } from './client'; +import type { AnalysisJob } from './types'; + +const API_BASE = '/api/v1/plugin/data_analytics'; + +interface Props { + jobId: string; + onClose: () => void; +} + +interface StreamEvent { + type?: string; + ts?: number; + payload?: unknown; + raw?: string; +} + +export function JobDetail({ jobId, onClose }: Props) { + const { client, token, apiBase } = usePluginContext(); + const [job, setJob] = useState(null); + const [events, setEvents] = useState([]); + const [loading, setLoading] = useState(true); + const eventBoxRef = useRef(null); + + // 初次加载 + 后台轮询 + useEffect(() => { + let cancelled = false; + const fetchJob = async () => { + try { + const resp = await client.get(`${API_BASE}/jobs/${jobId}`); + if (!cancelled) setJob(resp.data); + } catch (e) { + console.error('fetch job failed', e); + } finally { + if (!cancelled) setLoading(false); + } + }; + fetchJob(); + const t = setInterval(fetchJob, 4000); + return () => { + cancelled = true; + clearInterval(t); + }; + }, [client, jobId]); + + // SSE 事件流(用 fetch + ReadableStream,因为 EventSource 不支持自定义 header) + useEffect(() => { + const controller = new AbortController(); + const run = async () => { + try { + const url = `${apiBase || ''}${API_BASE}/jobs/${jobId}/stream`; + const resp = await fetch(url, { + headers: { Authorization: `Bearer ${token}` }, + signal: controller.signal, + }); + if (!resp.body) return; + const reader = resp.body.getReader(); + const decoder = new TextDecoder('utf-8'); + let buf = ''; + // eslint-disable-next-line no-constant-condition + while (true) { + const { value, done } = await reader.read(); + if (done) break; + buf += decoder.decode(value, { stream: true }); + const parts = buf.split('\n\n'); + buf = parts.pop() || ''; + for (const part of parts) { + const line = part.split('\n').find((l) => l.startsWith('data:')); + if (!line) continue; + const payload = line.slice(5).trim(); + try { + setEvents((prev) => [...prev, JSON.parse(payload)]); + } catch { + setEvents((prev) => [...prev, { raw: payload }]); + } + } + } + } catch (e) { + if ((e as Error).name !== 'AbortError') console.error('SSE error', e); + } + }; + run(); + return () => controller.abort(); + }, [apiBase, jobId, token]); + + // 自动滚动到底部 + useEffect(() => { + if (eventBoxRef.current) { + eventBoxRef.current.scrollTop = eventBoxRef.current.scrollHeight; + } + }, [events]); + + return ( +
+
+
+
+

+ + 任务详情 +

+ {jobId} +
+ +
+ +
+ {loading ? ( +
+ +
+ ) : job ? ( + <> +
+ + + {job.task_error && } +
+ + {job.task_result !== undefined && job.task_result !== null && ( +
+
执行结果
+
+                    {typeof job.task_result === 'string'
+                      ? job.task_result
+                      : JSON.stringify(job.task_result, null, 2)}
+                  
+
+ )} + +
+
事件流(SSE)
+
+ {events.length === 0 ? ( + (等待事件…) + ) : ( + events.map((e, i) => ( +
+ {e.type || 'event'}{' '} + {e.payload !== undefined ? ( + {JSON.stringify(e.payload)} + ) : e.raw ? ( + {e.raw} + ) : null} +
+ )) + )} +
+
+ + ) : ( +
任务不存在或已被删除
+ )} +
+
+
+ ); +} + +function Field({ label, value, danger }: { label: string; value: string; danger?: boolean }) { + return ( +
+
{label}
+
{value}
+
+ ); +} diff --git a/data/plugin/data_analytics/frontend/NewJobDialog.tsx b/data/plugin/data_analytics/frontend/NewJobDialog.tsx new file mode 100644 index 0000000..c11d369 --- /dev/null +++ b/data/plugin/data_analytics/frontend/NewJobDialog.tsx @@ -0,0 +1,110 @@ +import { useState } from 'react'; +import { Loader2, Send, X } from 'lucide-react'; +import { usePluginContext } from './client'; +import type { S3Credential } from './types'; + +const API_BASE = '/api/v1/plugin/data_analytics'; + +interface Props { + credentials: S3Credential[]; + onClose: () => void; + onCreated: () => void; +} + +export function NewJobDialog({ credentials, onClose, onCreated }: Props) { + const { client } = usePluginContext(); + const [credId, setCredId] = useState(credentials[0]?.cred_id || ''); + const [description, setDescription] = useState(''); + const [busy, setBusy] = useState(false); + const [error, setError] = useState(''); + + const submit = async () => { + if (!credId) { + setError('请选择 S3 凭证'); + return; + } + if (!description.trim()) { + setError('请描述要做的分析'); + return; + } + setBusy(true); + setError(''); + try { + await client.post(`${API_BASE}/jobs`, { + cred_id: credId, + description: description.trim(), + }); + onCreated(); + onClose(); + } catch (e: unknown) { + const msg = (e as { response?: { data?: { detail?: string } } }).response?.data?.detail; + setError(msg || '提交失败'); + } finally { + setBusy(false); + } + }; + + return ( +
+
+
+

新建分析任务

+ +
+
+
+ + {credentials.length === 0 ? ( +
+ 请先在上方添加 S3 凭证。 +
+ ) : ( + + )} +
+
+ +