feat: workflow和chat分离

1,增加了创建workflow的页面
2.删除了event
This commit is contained in:
2026-05-14 15:51:28 +00:00
parent c0e4fd34ae
commit 78bd6adc48
30 changed files with 1196 additions and 760 deletions
@@ -1,212 +1,48 @@
import ray
import asyncio
from typing import Dict
from kilostar.api.platform.event import kilostarEvent
from kilostar.core.work.workflow.workflow import KiloStarWorkflow
from kilostar.utils.ray_hook import ray_actor_hook
from kilostar.utils.logger import get_logger
class TraceQueues:
def __init__(self):
self.pending: asyncio.Queue[str] = asyncio.Queue()
self.receive: asyncio.Queue[str] = asyncio.Queue()
@ray.remote
class GlobalWorkflowManager:
def __init__(self):
self.event_dict: Dict[str, kilostarEvent] = {}
self.event_object_refs: Dict[str, ray.ObjectRef] = {}
self.postgres_database = None
self._traces: Dict[str, TraceQueues] = {}
self.logger = get_logger("GlobalWorkflowManager")
async def init_manager(self):
self.postgres_database = ray_actor_hook("postgres_database").postgres_database
self.logger.info("GlobalWorkflowManager initialized")
# Load all events from database to memory
try:
records = await self.postgres_database.get_all_events.remote()
for record in records:
try:
event = kilostarEvent.model_validate_json(record.event_data_json)
event.pending_queue = asyncio.Queue()
event.receive_queue = asyncio.Queue()
self.event_dict[event.trace_id] = event
async def create_trace(self, trace_id: str) -> None:
if trace_id not in self._traces:
self._traces[trace_id] = TraceQueues()
# Store in ray object store for cache
event_copy = event.model_copy()
event_copy.pending_queue = None
event_copy.receive_queue = None
self.event_object_refs[event.trace_id] = ray.put(
event_copy.model_dump_json()
)
except Exception as e:
self.logger.error(f"Failed to load event {record.trace_id}: {e}")
self.logger.info(f"Loaded {len(self.event_dict)} events from database")
# Trigger resumption of incomplete workflows
workflow_running_engine = None
for trace_id, event in self.event_dict.items():
if event.workflow and event.workflow.status.status in [
"waiting_llm_working",
"waiting_tool_working",
"llm_working",
"tool_working",
]:
self.logger.info(f"Resuming incomplete workflow {trace_id}")
if not workflow_running_engine:
try:
workflow_running_engine = ray_actor_hook(
"workflow_running_engine"
).workflow_running_engine
except AttributeError:
self.logger.warning(
"workflow_running_engine not found, cannot resume workflow"
)
break
await workflow_running_engine.resume_workflow.remote(event)
except Exception as e:
self.logger.error(f"Failed to fetch events from database: {e}")
async def _upsert_event_to_db(self, event: kilostarEvent):
try:
# Create a copy and remove non-serializable queues
event_copy = event.model_copy()
event_copy.pending_queue = None
event_copy.receive_queue = None
event_json = event_copy.model_dump_json()
# Update cache
self.event_object_refs[event.trace_id] = ray.put(event_json)
await self.postgres_database.upsert_event.remote(event.trace_id, event_json)
except Exception as e:
self.logger.error(
f"Failed to upsert event {event.trace_id} to database: {e}"
)
async def add_event(self, event: kilostarEvent) -> None:
event.pending_queue = asyncio.Queue()
event.receive_queue = asyncio.Queue()
self.event_dict[event.trace_id] = event
await self._upsert_event_to_db(event)
async def delete_event(self, trace_id: str) -> None:
if trace_id in self.event_dict:
del self.event_dict[trace_id]
if trace_id in self.event_object_refs:
del self.event_object_refs[trace_id]
try:
await self.postgres_database.delete_event.remote(trace_id)
except Exception as e:
self.logger.error(f"Failed to delete event {trace_id} from database: {e}")
async def get_event(self, trace_id: str) -> kilostarEvent | None:
# First check memory dict
if trace_id in self.event_dict:
return self.event_dict[trace_id]
# Then check Ray object store cache
if trace_id in self.event_object_refs:
try:
event_json = ray.get(self.event_object_refs[trace_id])
return kilostarEvent.model_validate_json(event_json)
except Exception as e:
self.logger.warning(
f"Failed to fetch event from cache for trace {trace_id}: {e}"
)
# Fallback to database
try:
record = await self.postgres_database.get_event.remote(trace_id)
if record:
event = kilostarEvent.model_validate_json(record.event_data_json)
# Restore to memory dict with missing transient queues
event.pending_queue = asyncio.Queue()
event.receive_queue = asyncio.Queue()
self.event_dict[trace_id] = event
# Restore to cache
event_copy = event.model_copy()
event_copy.pending_queue = None
event_copy.receive_queue = None
self.event_object_refs[trace_id] = ray.put(event_copy.model_dump_json())
return event
except Exception as e:
self.logger.error(
f"Failed to fetch event {trace_id} from database fallback: {e}"
)
return None
async def update_attachment(
self, trace_id: str, attachment: Dict[str, str]
) -> None:
if trace_id in self.event_dict:
self.event_dict[trace_id].attachment = attachment
await self._upsert_event_to_db(self.event_dict[trace_id])
async def update_workflow(self, trace_id: str, workflow: KiloStarWorkflow) -> None:
if trace_id in self.event_dict:
self.event_dict[trace_id].workflow = workflow
await self._upsert_event_to_db(self.event_dict[trace_id])
async def get_workflow(self, trace_id: str) -> KiloStarWorkflow | None:
event = await self.get_event(trace_id)
return event.workflow if event else None
async def list_events(self) -> list[dict]:
result = []
# Read strictly from the database to ensure we get all events,
# and ignore the cache to prevent frontend missing items.
try:
records = await self.postgres_database.get_all_events.remote()
for record in records:
try:
event = kilostarEvent.model_validate_json(record.event_data_json)
workflow_title = event.workflow.title if event.workflow else None
workflow_status = (
event.workflow.status.status
if event.workflow and event.workflow.status
else None
)
result.append(
{
"event_id": event.trace_id,
"workflow_title": workflow_title,
"status": workflow_status,
"user_name": event.user_name,
"message": event.message,
"create_time": event.create_time,
}
)
# Best-effort cache population
self.event_object_refs[event.trace_id] = ray.put(
record.event_data_json
)
except Exception:
continue
except Exception as e:
self.logger.error(f"Failed to list_events from DB: {e}")
return result
async def delete_trace(self, trace_id: str) -> None:
self._traces.pop(trace_id, None)
async def put_pending(self, trace_id, item) -> None:
if trace_id in self.event_dict and self.event_dict[trace_id].pending_queue:
await self.event_dict[trace_id].pending_queue.put(item)
if trace_id in self._traces:
await self._traces[trace_id].pending.put(item)
async def get_pending(self, trace_id) -> str:
if trace_id in self.event_dict and self.event_dict[trace_id].pending_queue:
return await self.event_dict[trace_id].pending_queue.get()
await asyncio.sleep(1) # Prevent CPU spinning if not found
if trace_id in self._traces:
return await self._traces[trace_id].pending.get()
await asyncio.sleep(1)
return ""
async def put_received(self, trace_id, item) -> None:
if trace_id in self.event_dict and self.event_dict[trace_id].receive_queue:
await self.event_dict[trace_id].receive_queue.put(item)
if trace_id in self._traces:
await self._traces[trace_id].receive.put(item)
async def get_received(self, trace_id) -> str:
if trace_id in self.event_dict and self.event_dict[trace_id].receive_queue:
return await self.event_dict[trace_id].receive_queue.get()
await asyncio.sleep(1) # Prevent CPU spinning if not found
if trace_id in self._traces:
return await self._traces[trace_id].receive.get()
await asyncio.sleep(1)
return ""