- {/* Monitoring Inner Sidebar */}
-
-
-
Monitoring
-
-
-
- {/* Future monitoring tabs (e.g., Application Logs, Agent Metrics) can go here */}
-
-
-
- {/* Monitoring Main Content */}
-
- {activeTab === 'cluster' && }
-
-
- );
-}
diff --git a/main.py b/main.py
index 5adfa9d..2bf97a0 100644
--- a/main.py
+++ b/main.py
@@ -35,7 +35,7 @@ async def start_system():
await postgres_database.init_db.remote()
# 3. 启动全局状态机
- global_state_machine = GlobalStateMachine.options(name='global_state_machine').remote(postgres_database)
+ GlobalStateMachine.options(name='global_state_machine').remote(postgres_database)
# 4. 启动核心节点
supervisory_node = SupervisoryNode.options(name='supervisory_node').remote()
diff --git a/pretor/api/cluster.py b/pretor/api/cluster.py
index 178e5c4..9165c47 100644
--- a/pretor/api/cluster.py
+++ b/pretor/api/cluster.py
@@ -12,34 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from fastapi import APIRouter, WebSocket, WebSocketDisconnect
-import ray
-import asyncio
+from fastapi import APIRouter
cluster_router = APIRouter(prefix="/api/v1/cluster", tags=["cluster"])
-@cluster_router.websocket("/ws/state")
-async def update_cluster_state(websocket: WebSocket):
- await websocket.accept()
- try:
- while True:
- nodes = ray.nodes()
- payload = [
- {
- "node_id": node.get("NodeID"),
- "node_name": node.get("NodeName"),
- "alive": node.get("Alive"),
- "resources": node.get("Resources", {}),
- "remaining": node.get("RemainingResources", {})
- }
- for node in nodes
- ]
- await websocket.send_json(payload)
- await asyncio.sleep(10)
- except WebSocketDisconnect:
- pass
- except RuntimeError as e:
- if "closed" not in str(e) and "GeneratorExit" not in str(e):
- raise
- except Exception:
- pass
\ No newline at end of file
+# Monitor websocket API temporarily removed
\ No newline at end of file
diff --git a/pretor/api/platform/frontend.py b/pretor/api/platform/frontend.py
index 45d1e00..21a2be2 100644
--- a/pretor/api/platform/frontend.py
+++ b/pretor/api/platform/frontend.py
@@ -12,11 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from fastapi import APIRouter, Depends, HTTPException, status
+from fastapi import APIRouter, Depends, HTTPException, status, UploadFile, File
from pydantic import BaseModel
from pretor.utils.access import Accessor, TokenData
from pretor.api.platform.event import PretorEvent
from pretor.utils.ray_hook import ray_actor_hook
+import os
+import shutil
from pretor.utils.logger import get_logger
logger = get_logger('frontend')
@@ -45,3 +47,17 @@ async def create_message(message: Message,
else:
return {"message": message}
+@client_router.post("/upload")
+async def upload_file(file: UploadFile = File(...),
+ token_data: TokenData = Depends(Accessor.get_current_user)):
+ try:
+ upload_dir = "uploads"
+ os.makedirs(upload_dir, exist_ok=True)
+ file_path = os.path.join(upload_dir, file.filename)
+ with open(file_path, "wb") as buffer:
+ shutil.copyfileobj(file.file, buffer)
+ logger.info(f"用户 {token_data.username} 上传了文件: {file.filename}")
+ return {"filename": file.filename, "message": f"File {file.filename} uploaded successfully"}
+ except Exception as e:
+ logger.error(f"文件上传失败: {e}")
+ raise HTTPException(status_code=500, detail="文件上传失败")
diff --git a/pretor/api/workflow.py b/pretor/api/workflow.py
index c20d975..0eb6ef4 100644
--- a/pretor/api/workflow.py
+++ b/pretor/api/workflow.py
@@ -14,26 +14,37 @@
from pretor.utils.ray_hook import ray_actor_hook
-from fastapi import APIRouter, WebSocket, WebSocketDisconnect
-
+from fastapi import APIRouter, Request
+from fastapi.responses import StreamingResponse
+import asyncio
workflow_router = APIRouter(prefix="/api/v1/workflow", tags=["workflow"])
-@workflow_router.websocket("/ws/{trace_id}")
-async def get_workflow(websocket: WebSocket, trace_id: str):
- await websocket.accept()
- global_state_machine = ray_actor_hook("global_state_machine")
- try:
- while True:
- await websocket.send(await global_state_machine.get_workflow.remote(trace_id))
- await websocket.send_text(await global_state_machine.get_pending.remote(trace_id))
- response = await websocket.receive_text()
- await global_state_machine.put_received(trace_id, response)
- except WebSocketDisconnect:
- pass
- except RuntimeError as e:
- if "closed" not in str(e) and "GeneratorExit" not in str(e):
- raise
- except Exception:
- pass
+@workflow_router.get("/sse/{trace_id}")
+async def get_workflow_sse(trace_id: str, request: Request):
+ global_state_machine = ray_actor_hook("global_state_machine").global_state_machine
+
+ async def event_generator():
+ try:
+ while True:
+ if await request.is_disconnected():
+ break
+
+ # You might also want to send the workflow state periodically or when updated
+ # Here we just wait for pending messages and send them
+ message = await global_state_machine.get_pending.remote(trace_id)
+ # Ensure the message is formatted as SSE
+ yield f"data: {message}\n\n"
+ except asyncio.CancelledError:
+ pass
+
+ return StreamingResponse(event_generator(), media_type="text/event-stream")
+
+@workflow_router.post("/reply/{trace_id}")
+async def post_workflow_reply(trace_id: str, request: Request):
+ data = await request.json()
+ reply_msg = data.get("message", "")
+ global_state_machine = ray_actor_hook("global_state_machine").global_state_machine
+ await global_state_machine.put_received.remote(trace_id, reply_msg)
+ return {"status": "ok"}
diff --git a/tests/core/database/table/table_provider_test.py b/tests/core/database/table/table_provider_test.py
index 73441ed..a31147a 100644
--- a/tests/core/database/table/table_provider_test.py
+++ b/tests/core/database/table/table_provider_test.py
@@ -1,4 +1,3 @@
-import pytest
from pretor.core.database.table.provider import Provider
def test_provider_table():
diff --git a/tests/core/database/table/table_user_test.py b/tests/core/database/table/table_user_test.py
index 50ea1c3..8c6ab7f 100644
--- a/tests/core/database/table/table_user_test.py
+++ b/tests/core/database/table/table_user_test.py
@@ -1,4 +1,3 @@
-import pytest
from pretor.core.database.table.user import User
def test_user_table():
diff --git a/tests/core/global_state_machine/model_provider/base_provider_test.py b/tests/core/global_state_machine/model_provider/base_provider_test.py
index dd48484..46b7bcd 100644
--- a/tests/core/global_state_machine/model_provider/base_provider_test.py
+++ b/tests/core/global_state_machine/model_provider/base_provider_test.py
@@ -1,4 +1,3 @@
-import pytest
from pretor.core.global_state_machine.model_provider.base_provider import Provider, ProviderArgs, ProviderStatus
def test_provider_status():
diff --git a/tests/core/global_state_machine/provider_manager_test.py b/tests/core/global_state_machine/provider_manager_test.py
index b2f9d8a..27c3fa3 100644
--- a/tests/core/global_state_machine/provider_manager_test.py
+++ b/tests/core/global_state_machine/provider_manager_test.py
@@ -5,7 +5,6 @@ from pretor.core.global_state_machine.provider_manager import ProviderManager
@pytest.mark.asyncio
async def test_provider_manager_init():
- from pretor.core.global_state_machine.provider_manager import ProviderManager
mock_postgres = MagicMock()
mock_provider1 = MagicMock()
mock_provider1.provider_title = "title1"
diff --git a/tests/core/workflow/workflow_template_generator/workflow_template_generator_test.py b/tests/core/workflow/workflow_template_generator/workflow_template_generator_test.py
index d2bd2cf..3ad77ae 100644
--- a/tests/core/workflow/workflow_template_generator/workflow_template_generator_test.py
+++ b/tests/core/workflow/workflow_template_generator/workflow_template_generator_test.py
@@ -1,4 +1,3 @@
-import pytest
from unittest.mock import patch, MagicMock
from pretor.core.workflow.workflow_template_generator.workflow_template_generator import WorkflowTemplateGenerator
diff --git a/tests/core/workflow/workflow_template_manager_test.py b/tests/core/workflow/workflow_template_manager_test.py
index bb8be61..213ec81 100644
--- a/tests/core/workflow/workflow_template_manager_test.py
+++ b/tests/core/workflow/workflow_template_manager_test.py
@@ -1,4 +1,3 @@
-import pytest
import json
from unittest.mock import MagicMock, patch, mock_open
from pathlib import Path
diff --git a/tests/utils/access_test.py b/tests/utils/access_test.py
index a776963..a7260c8 100644
--- a/tests/utils/access_test.py
+++ b/tests/utils/access_test.py
@@ -83,7 +83,6 @@ def test_decode_token_validation_error():
token = "valid.jwt.invalid.payload"
payload = {"wrong": "payload"}
- import pydantic
from fastapi import HTTPException
with patch("jwt.decode", return_value=payload):