style: 项目重构
1.项目改名为kilostar(千星) 2.后端部分进行大规模重构 3.node功能进行大规模重新设计
This commit is contained in:
@@ -0,0 +1,5 @@
|
||||
from kilostar.core.global_workflow_manager.global_workflow_manager import (
|
||||
GlobalWorkflowManager,
|
||||
)
|
||||
|
||||
# Public API of this package: only the GlobalWorkflowManager name imported above is exported.
__all__ = ["GlobalWorkflowManager"]
|
||||
@@ -0,0 +1,212 @@
|
||||
import ray
|
||||
import asyncio
|
||||
from typing import Dict
|
||||
from kilostar.api.platform.event import kilostarEvent
|
||||
from kilostar.core.workflow_running_engine.workflow import kilostarWorkflow
|
||||
from kilostar.utils.ray_hook import ray_actor_hook
|
||||
from kilostar.utils.logger import get_logger
|
||||
|
||||
|
||||
@ray.remote
class GlobalWorkflowManager:
    """Ray actor that tracks kilostarEvent objects by trace id.

    Events are held in three places:

    * ``event_dict`` - live in-memory events, each carrying its own
      ``pending_queue`` / ``receive_queue`` (``asyncio.Queue`` instances).
    * ``event_object_refs`` - JSON snapshots (queues nulled out) stored in the
      Ray object store via ``ray.put`` and used as a read cache.
    * the ``postgres_database`` actor handle - persistent storage, reached
      through its ``get_all_events`` / ``get_event`` / ``upsert_event`` /
      ``delete_event`` remote methods.
    """

    # Workflow statuses that make init_manager() ask the workflow running
    # engine to resume the workflow.
    _RESUMABLE_STATUSES = (
        "waiting_llm_working",
        "waiting_tool_working",
        "llm_working",
        "tool_working",
    )

    def __init__(self):
        """Create an empty manager; call init_manager() before using it."""
        self.event_dict: Dict[str, kilostarEvent] = {}
        self.event_object_refs: Dict[str, ray.ObjectRef] = {}
        self.postgres_database = None
        self.logger = get_logger("GlobalWorkflowManager")

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _attach_queues(self, event: kilostarEvent) -> None:
        """Give ``event`` fresh, empty pending/receive queues."""
        event.pending_queue = asyncio.Queue()
        event.receive_queue = asyncio.Queue()

    def _serialize_event(self, event: kilostarEvent) -> str:
        """Return the JSON form of ``event`` with both queues set to None.

        The queues are cleared on a copy, so the live event keeps its queues.
        """
        snapshot = event.model_copy()
        snapshot.pending_queue = None
        snapshot.receive_queue = None
        return snapshot.model_dump_json()

    def _restore_to_memory(self, trace_id: str, event: kilostarEvent) -> kilostarEvent:
        """Register a freshly deserialized event unless one is already live.

        An event that is already in ``event_dict`` owns queues that may hold
        items, so it is never replaced by a new copy with empty queues.

        Returns:
            The event that is registered under ``trace_id`` after the call.
        """
        live_event = self.event_dict.get(trace_id)
        if live_event is not None:
            return live_event
        self._attach_queues(event)
        self.event_dict[trace_id] = event
        return event

    async def _resume_incomplete_workflows(self) -> None:
        """Ask the workflow running engine to resume unfinished workflows."""
        engine = None
        # Iterate over a snapshot: this coroutine awaits inside the loop, and
        # other actor calls may add or delete events in the meantime.
        for trace_id, event in list(self.event_dict.items()):
            workflow = event.workflow
            status = workflow.status if workflow else None
            if not status or status.status not in self._RESUMABLE_STATUSES:
                continue

            self.logger.info(f"Resuming incomplete workflow {trace_id}")
            if engine is None:
                try:
                    engine = ray_actor_hook(
                        "workflow_running_engine"
                    ).workflow_running_engine
                except AttributeError:
                    self.logger.warning(
                        "workflow_running_engine not found, cannot resume workflow"
                    )
                    break

            # A failure for one workflow must not prevent the others from
            # being resumed.
            try:
                await engine.resume_workflow.remote(event)
            except Exception as e:
                self.logger.error(f"Failed to resume workflow {trace_id}: {e}")

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------
    async def init_manager(self):
        """Load every stored event into memory and resume unfinished workflows.

        Errors are logged rather than raised, except for a failure to resolve
        the ``postgres_database`` actor hook itself.
        """
        self.postgres_database = ray_actor_hook("postgres_database").postgres_database

        # Load all events from database to memory
        try:
            records = await self.postgres_database.get_all_events.remote()
            for record in records:
                try:
                    event = kilostarEvent.model_validate_json(record.event_data_json)
                    self._attach_queues(event)
                    self.event_dict[event.trace_id] = event

                    # Store in ray object store for cache
                    self.event_object_refs[event.trace_id] = ray.put(
                        self._serialize_event(event)
                    )
                except Exception as e:
                    self.logger.error(f"Failed to load event {record.trace_id}: {e}")
            self.logger.info(f"Loaded {len(self.event_dict)} events from database")
        except Exception as e:
            self.logger.error(f"Failed to fetch events from database: {e}")
            return

        # Trigger resumption of incomplete workflows
        try:
            await self._resume_incomplete_workflows()
        except Exception as e:
            self.logger.error(f"Failed to resume incomplete workflows: {e}")

    async def _upsert_event_to_db(self, event: kilostarEvent):
        """Refresh the cached snapshot of ``event`` and persist it.

        Failures are logged and not re-raised.
        """
        try:
            event_json = self._serialize_event(event)
            # Update cache
            self.event_object_refs[event.trace_id] = ray.put(event_json)

            await self.postgres_database.upsert_event.remote(event.trace_id, event_json)
        except Exception as e:
            self.logger.error(
                f"Failed to upsert event {event.trace_id} to database: {e}"
            )

    # ------------------------------------------------------------------
    # Event CRUD
    # ------------------------------------------------------------------
    async def add_event(self, event: kilostarEvent) -> None:
        """Register ``event`` in memory with fresh queues and persist it."""
        self._attach_queues(event)
        self.event_dict[event.trace_id] = event
        await self._upsert_event_to_db(event)

    async def delete_event(self, trace_id: str) -> None:
        """Remove the event from memory, from the cache and from the database.

        A database failure is logged and not re-raised.
        """
        self.event_dict.pop(trace_id, None)
        self.event_object_refs.pop(trace_id, None)
        try:
            await self.postgres_database.delete_event.remote(trace_id)
        except Exception as e:
            self.logger.error(f"Failed to delete event {trace_id} from database: {e}")

    async def get_event(self, trace_id: str) -> kilostarEvent | None:
        """Look an event up in memory, then the Ray cache, then the database.

        An event found in the cache or the database is given queues and
        registered in memory, so later queue operations can reach it.

        Returns:
            The event, or None when it is unknown or every lookup failed.
        """
        # First check memory dict
        live_event = self.event_dict.get(trace_id)
        if live_event is not None:
            return live_event

        # Then check Ray object store cache
        cached_ref = self.event_object_refs.get(trace_id)
        if cached_ref is not None:
            try:
                # Await the ref instead of calling ray.get(): a blocking get
                # would stall this async actor's event loop.
                event_json = await cached_ref
                cached_event = kilostarEvent.model_validate_json(event_json)
            except Exception as e:
                self.logger.warning(
                    f"Failed to fetch event from cache for trace {trace_id}: {e}"
                )
            else:
                return self._restore_to_memory(trace_id, cached_event)

        # Fallback to database
        try:
            record = await self.postgres_database.get_event.remote(trace_id)
            if record:
                stored_event = kilostarEvent.model_validate_json(record.event_data_json)

                # Restore to memory dict with missing transient queues
                event = self._restore_to_memory(trace_id, stored_event)

                # Restore to cache
                self.event_object_refs[trace_id] = ray.put(
                    self._serialize_event(event)
                )
                return event
        except Exception as e:
            self.logger.error(
                f"Failed to fetch event {trace_id} from database fallback: {e}"
            )

        return None

    async def update_attachment(
        self, trace_id: str, attachment: Dict[str, str]
    ) -> None:
        """Replace the attachment of an in-memory event and persist the event.

        Unknown trace ids are ignored apart from a warning in the log.
        """
        event = self.event_dict.get(trace_id)
        if event is None:
            self.logger.warning(
                f"Cannot update attachment, event {trace_id} is not in memory"
            )
            return
        event.attachment = attachment
        await self._upsert_event_to_db(event)

    async def update_workflow(self, trace_id: str, workflow: kilostarWorkflow) -> None:
        """Replace the workflow of an in-memory event and persist the event.

        Unknown trace ids are ignored apart from a warning in the log.
        """
        event = self.event_dict.get(trace_id)
        if event is None:
            self.logger.warning(
                f"Cannot update workflow, event {trace_id} is not in memory"
            )
            return
        event.workflow = workflow
        await self._upsert_event_to_db(event)

    async def get_workflow(self, trace_id: str) -> kilostarWorkflow | None:
        """Return the workflow of the event, or None if the event is unknown."""
        event = await self.get_event(trace_id)
        return event.workflow if event else None

    async def list_events(self) -> list[dict]:
        """Return a summary dict for every event stored in the database.

        Each summary has the keys ``event_id``, ``workflow_title``, ``status``,
        ``user_name``, ``message`` and ``create_time``. Records that cannot be
        parsed are skipped with a warning; a database failure is logged and
        whatever was collected so far (possibly nothing) is returned.
        """
        result = []

        # Read strictly from the database to ensure we get all events,
        # and ignore the cache to prevent frontend missing items.
        try:
            records = await self.postgres_database.get_all_events.remote()
            for record in records:
                try:
                    event = kilostarEvent.model_validate_json(record.event_data_json)
                    workflow = event.workflow
                    workflow_title = workflow.title if workflow else None
                    workflow_status = (
                        workflow.status.status
                        if workflow and workflow.status
                        else None
                    )
                    result.append(
                        {
                            "event_id": event.trace_id,
                            "workflow_title": workflow_title,
                            "status": workflow_status,
                            "user_name": event.user_name,
                            "message": event.message,
                            "create_time": event.create_time,
                        }
                    )
                    # Best-effort cache population
                    self.event_object_refs[event.trace_id] = ray.put(
                        record.event_data_json
                    )
                except Exception as e:
                    # Skip the broken record but leave a trace in the log.
                    record_id = getattr(record, "trace_id", "<unknown>")
                    self.logger.warning(
                        f"Skipping event {record_id} in list_events: {e}"
                    )
                    continue
        except Exception as e:
            self.logger.error(f"Failed to list_events from DB: {e}")

        return result

    # ------------------------------------------------------------------
    # Per-event queues
    # ------------------------------------------------------------------
    async def put_pending(self, trace_id, item) -> None:
        """Put ``item`` on the event's pending queue; no-op if unavailable."""
        event = self.event_dict.get(trace_id)
        if event is not None and event.pending_queue:
            await event.pending_queue.put(item)

    async def get_pending(self, trace_id) -> str:
        """Wait for and return the next item from the event's pending queue.

        When the event or its queue is missing, sleeps one second and
        returns an empty string.
        """
        event = self.event_dict.get(trace_id)
        if event is not None and event.pending_queue:
            return await event.pending_queue.get()
        await asyncio.sleep(1)  # Prevent CPU spinning if not found
        return ""

    async def put_received(self, trace_id, item) -> None:
        """Put ``item`` on the event's receive queue; no-op if unavailable."""
        event = self.event_dict.get(trace_id)
        if event is not None and event.receive_queue:
            await event.receive_queue.put(item)

    async def get_received(self, trace_id) -> str:
        """Wait for and return the next item from the event's receive queue.

        When the event or its queue is missing, sleeps one second and
        returns an empty string.
        """
        event = self.event_dict.get(trace_id)
        if event is not None and event.receive_queue:
            return await event.receive_queue.get()
        await asyncio.sleep(1)  # Prevent CPU spinning if not found
        return ""
|
||||
Reference in New Issue
Block a user