# Copyright 2026 zhaoxi826
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Any, Dict, List, Optional, Union

from fastapi import APIRouter, Depends, Request
from fastapi import HTTPException
from pydantic import BaseModel, field_validator

from kilostar.utils.ray_hook import ray_actor_hook
from kilostar.utils.access import Accessor, TokenData, RoleChecker
from kilostar.core.postgres_database.model import AgentType
from kilostar.core.postgres_database.model import UserAuthority
from kilostar.utils.mcp_helper import get_all_tools_and_toolsets_for_scope
from kilostar.utils.i18n import t

agent_router = APIRouter(prefix="/api/v1/agent", tags=["agent"])

# Values of ``individual_name`` that map to a system-node actor.  For both of
# them the actor hook name and the attribute exposing the handle are the same
# string, which is what lets ``load_agent`` dispatch with a single ``getattr``.
_SYSTEM_NODE_NAMES = ("regulatory_node", "consciousness_node")


def _api_logger():
    """Return the ``agent_api`` logger.

    The import is kept lazy on purpose: the original handler only imported the
    logger module inside its error path, so importing this router module does
    not pull it in.
    """
    from kilostar.utils.logger import get_logger

    return get_logger("agent_api")


class AgentRegister(BaseModel):
    """Request body for ``POST /agent`` (remote model).

    Loads a system node from a ``provider_title`` + ``model_id`` pair.
    """

    provider_title: str
    model_id: str
    individual_name: str
    toolsets: Optional[List[str]] = None
    persona_id: Optional[str] = None
    display_name: Optional[str] = None


class AgentLocalRegister(BaseModel):
    """Request body for ``POST /agent`` (local model).

    Loads a system node from a local ``path``.
    """

    path: str
    individual_name: str
    toolsets: Optional[List[str]] = None


@agent_router.get("")
async def get_system_nodes(
    _: TokenData = Depends(RoleChecker(allowed_roles=UserAuthority.USER)),
):
    """Return the persisted configuration of the two system nodes.

    The two system nodes are the regulatory node and the consciousness node.

    Returns:
        ``{"system_nodes": configs}`` where ``configs`` is whatever the
        database actor's ``get_all_system_node_configs`` returns.
    """
    postgres_database = ray_actor_hook("postgres_database").postgres_database
    configs = await postgres_database.get_all_system_node_configs.remote()
    return {"system_nodes": configs}


@agent_router.post("")
async def load_agent(
    agent_register: Union[AgentRegister, AgentLocalRegister],
    request: Request,
    _: TokenData = Depends(RoleChecker(allowed_roles=UserAuthority.SUPER_ADMINISTRATOR)),
):
    """Load or reload the agent of a system node.

    For a remote-model payload the configuration is persisted first, then the
    matching node actor's ``create_agent`` is invoked.

    Args:
        agent_register: Remote (``AgentRegister``) or local
            (``AgentLocalRegister``) registration payload.
        request: Incoming request; only its ``accept-language`` header is read.
        _: Auth dependency; restricts the route to super administrators.

    Returns:
        ``{"message": "创建成功"}``.

    Raises:
        HTTPException: 500 when any step of the remote-model path fails; the
            underlying error is logged and chained as ``__cause__``.
    """
    global_state_machine = ray_actor_hook("global_state_machine").global_state_machine
    postgres_database = ray_actor_hook("postgres_database").postgres_database
    accept_lang = request.headers.get("accept-language", "")

    if isinstance(agent_register, AgentLocalRegister):
        # NOTE(review): the local-model path does nothing yet but the endpoint
        # still answers with the success message - confirm this is intended.
        pass
    elif isinstance(agent_register, AgentRegister):
        try:
            await postgres_database.upsert_system_node_config.remote(
                agent_register.individual_name,
                agent_register.provider_title,
                agent_register.model_id,
                agent_register.toolsets,
                agent_register.persona_id,
                agent_register.display_name,
            )
            scope = agent_register.individual_name
            tools, toolsets = await get_all_tools_and_toolsets_for_scope(
                scope, toolset_ids=agent_register.toolsets
            )

            # Resolve the persona's system prompt from the database, if any.
            persona_prompt = None
            if agent_register.persona_id:
                tpl = await postgres_database.get_template.remote(agent_register.persona_id)
                if tpl:
                    persona_prompt = tpl.system_prompt

            # NOTE(review): an ``individual_name`` outside _SYSTEM_NODE_NAMES is
            # persisted above but no actor is touched, and the caller still
            # gets the success message - confirm this is intended.
            if scope in _SYSTEM_NODE_NAMES:
                node = getattr(ray_actor_hook(scope), scope)
                await node.create_agent.remote(
                    global_state_machine,
                    agent_register.provider_title,
                    agent_register.model_id,
                    tools,
                    toolsets,
                    accept_lang,
                    persona_prompt,
                )
        except Exception as e:
            _api_logger().exception(f"加载节点失败: {e}")
            # Chain the cause so the original traceback is not lost.
            raise HTTPException(status_code=500, detail="加载节点失败,请查看服务端日志") from e
    return {"message": "创建成功"}


_VALID_AFFINITIES = {"cpu", "core", "gpu"}


class WorkerIndividualCreate(BaseModel):
    """Request body for ``POST /worker``: full configuration of a new Worker Agent."""

    agent_name: str
    agent_type: AgentType
    description: str
    provider_title: str
    model_id: str
    persona_id: str
    output_template: dict
    bound_skill: Dict[str, List[str]]
    workspace: List[str]
    toolsets: Optional[List[str]] = None
    node_affinity: str = "cpu"

    @field_validator("node_affinity")
    @classmethod
    def _check_affinity(cls, v: str) -> str:
        """Reject any ``node_affinity`` that is not in ``_VALID_AFFINITIES``."""
        if v not in _VALID_AFFINITIES:
            raise ValueError(f"node_affinity 必须是 cpu/core/gpu,收到: {v}")
        return v


class WorkerIndividualUpdate(BaseModel):
    """Request body for ``PUT /worker/{agent_id}``: partial update, every field optional."""

    agent_name: Optional[str] = None
    agent_type: Optional[AgentType] = None
    description: Optional[str] = None
    provider_title: Optional[str] = None
    model_id: Optional[str] = None
    persona_id: Optional[str] = None
    output_template: Optional[dict] = None
    bound_skill: Optional[Dict[str, List[str]]] = None
    workspace: Optional[List[str]] = None
    toolsets: Optional[List[str]] = None
    node_affinity: Optional[str] = None

    @field_validator("node_affinity")
    @classmethod
    def _check_affinity(cls, v: Optional[str]) -> Optional[str]:
        """Accept ``None``; otherwise require a member of ``_VALID_AFFINITIES``."""
        if v is not None and v not in _VALID_AFFINITIES:
            raise ValueError(f"node_affinity 必须是 cpu/core/gpu,收到: {v}")
        return v


async def _fetch_authorized_worker(
    postgres_database: Any,
    agent_id: str,
    user_id: Any,
    *,
    allow_plugin_slot: bool,
):
    """Fetch a worker and enforce the ownership rule shared by the worker routes.

    Args:
        postgres_database: Database actor handle.
        agent_id: Identifier of the worker to load.
        user_id: Identifier of the calling user.
        allow_plugin_slot: When true, a worker with a truthy ``plugin_owned``
            attribute is accessible to any caller regardless of ``owner_id``.

    Returns:
        The worker record.

    Raises:
        HTTPException: 404 when the worker does not exist, 403 when the caller
            is not allowed to access it.
    """
    worker = await postgres_database.get_worker_individual.remote(agent_id=agent_id)
    if not worker:
        raise HTTPException(status_code=404, detail="Agent not found")
    is_shared_slot = allow_plugin_slot and bool(getattr(worker, "plugin_owned", None))
    if not is_shared_slot and worker.owner_id != user_id:
        raise HTTPException(
            status_code=403, detail="Forbidden: You do not own this agent"
        )
    return worker


@agent_router.post("/worker")
async def create_worker_individual(
    worker_data: WorkerIndividualCreate,
    token_data: TokenData = Depends(RoleChecker(allowed_roles=UserAuthority.USER)),
):
    """Create a Worker Agent whose ``owner_id`` is the currently logged-in user.

    Returns:
        ``{"message": "success", "agent_id": <new agent id>}``.
    """
    postgres_database = ray_actor_hook("postgres_database").postgres_database
    data_dict = worker_data.model_dump()
    data_dict["owner_id"] = token_data.user_id
    worker = await postgres_database.add_worker_individual.remote(**data_dict)
    return {"message": "success", "agent_id": worker.agent_id}


@agent_router.get("/worker")
async def get_worker_individual_list(
    token_data: TokenData = Depends(Accessor.get_current_user),
):
    """List the caller's Worker Agents plus every plugin-owned slot.

    A plugin-owned slot is a placeholder agent registered by a plugin. All
    users share the same configuration for it; the front end marks it with a
    badge and lets any logged-in user assign its provider/model.

    Returns:
        ``{"workers": [...]}`` - the caller's own workers followed by the
        plugin-owned slots not already among them.
    """
    postgres_database = ray_actor_hook("postgres_database").postgres_database
    workers = await postgres_database.get_worker_individual_list.remote(
        owner_id=token_data.user_id
    ) or []
    all_workers = await postgres_database.get_all_worker_individual.remote() or []
    # Avoid listing a slot twice when the caller also owns it.
    seen_ids = {w.agent_id for w in workers}
    plugin_slots = [
        w
        for w in all_workers
        if getattr(w, "plugin_owned", None) and w.agent_id not in seen_ids
    ]
    return {"workers": list(workers) + plugin_slots}


@agent_router.get("/worker/{agent_id}")
async def get_worker_individual(
    agent_id: str, token_data: TokenData = Depends(Accessor.get_current_user)
):
    """Return the Worker Agent identified by ``agent_id``.

    Raises:
        HTTPException: 404 when it does not exist; 403 when it belongs to
            another user (plugin-owned slots are exempt).
    """
    postgres_database = ray_actor_hook("postgres_database").postgres_database
    return await _fetch_authorized_worker(
        postgres_database, agent_id, token_data.user_id, allow_plugin_slot=True
    )


@agent_router.put("/worker/{agent_id}")
async def update_worker_individual(
    agent_id: str,
    worker_data: WorkerIndividualUpdate,
    token_data: TokenData = Depends(Accessor.get_current_user),
):
    """Partially update a Worker Agent's configuration.

    After the update the old instance is removed from the state machine so it
    is lazily re-created later; for a plugin-owned slot the owning plugin is
    additionally asked to reload so the new provider/model takes effect.
    Both follow-up steps are best-effort: a failure is logged and does not
    fail the request.

    Returns:
        ``{"message": "success", "worker": <updated worker>}``.

    Raises:
        HTTPException: 404 when the worker does not exist; 403 when it is an
            ordinary worker owned by someone else.
    """
    postgres_database = ray_actor_hook("postgres_database").postgres_database
    # Plugin-owned slot: any logged-in user may assign provider/model.
    # Ordinary worker: only its owner may change it.
    worker = await _fetch_authorized_worker(
        postgres_database, agent_id, token_data.user_id, allow_plugin_slot=True
    )

    # Only forward the fields the client actually sent.
    update_data = worker_data.model_dump(exclude_unset=True)
    updated_worker = await postgres_database.update_worker_individual.remote(
        agent_id=agent_id, **update_data
    )

    global_state_machine = ray_actor_hook("global_state_machine").global_state_machine
    try:
        await global_state_machine.remove_individual.remote(agent_id)
    except Exception as e:
        # Best-effort: the update is already stored, so report and carry on
        # instead of silently discarding the error.
        _api_logger().exception(
            f"Failed to remove worker {agent_id} from the state machine: {e}"
        )

    plugin_name = getattr(worker, "plugin_owned", None)
    if plugin_name:
        try:
            pm = ray_actor_hook("global_plugin_manager").global_plugin_manager
            await pm.reload.remote(worker.plugin_owned)
        except Exception as e:
            # Best-effort as well; keep the failure visible in the logs.
            _api_logger().exception(
                f"Failed to reload plugin {plugin_name} after updating worker {agent_id}: {e}"
            )
    return {"message": "success", "worker": updated_worker}


@agent_router.post("/worker/{agent_id}/reload")
async def reload_worker_individual(
    agent_id: str, token_data: TokenData = Depends(Accessor.get_current_user)
):
    """Force-unload a Worker so the next use reloads it with the latest config.

    Raises:
        HTTPException: 404 when the worker does not exist; 403 when the caller
            is not its owner.
    """
    postgres_database = ray_actor_hook("postgres_database").postgres_database
    # NOTE(review): unlike GET/PUT, plugin-owned slots are not exempt from the
    # owner check here - confirm that is intended.
    await _fetch_authorized_worker(
        postgres_database, agent_id, token_data.user_id, allow_plugin_slot=False
    )
    global_state_machine = ray_actor_hook("global_state_machine").global_state_machine
    await global_state_machine.remove_individual.remote(agent_id)
    return {"message": "Worker will be reloaded on next use"}


@agent_router.delete("/worker/{agent_id}")
async def delete_worker_individual(
    agent_id: str, token_data: TokenData = Depends(Accessor.get_current_user)
):
    """Delete a Worker Agent.

    Raises:
        HTTPException: 404 when the worker does not exist; 403 when the caller
            is not its owner.
    """
    postgres_database = ray_actor_hook("postgres_database").postgres_database
    await _fetch_authorized_worker(
        postgres_database, agent_id, token_data.user_id, allow_plugin_slot=False
    )
    await postgres_database.delete_worker_individual.remote(agent_id=agent_id)
    return {"message": "success"}


# ──────────────────────────────── Persona Template ────────────────────────────


class PersonaTemplateCreate(BaseModel):
    """Request body for ``POST /template``."""

    name: str
    system_prompt: str = ""


class PersonaTemplateUpdate(BaseModel):
    """Request body for ``PUT /template/{template_id}``: every field optional."""

    name: Optional[str] = None
    system_prompt: Optional[str] = None


async def _fetch_owned_template(postgres_database: Any, template_id: str, user_id: Any):
    """Fetch a persona template and require that ``user_id`` owns it.

    Returns:
        The template record.

    Raises:
        HTTPException: 404 when the template does not exist, 403 when it
            belongs to another user.
    """
    tpl = await postgres_database.get_template.remote(template_id)
    if not tpl:
        raise HTTPException(status_code=404, detail="Template not found")
    if tpl.owner_id != user_id:
        raise HTTPException(status_code=403, detail="Forbidden")
    return tpl


@agent_router.get("/template")
async def list_templates(
    token_data: TokenData = Depends(Accessor.get_current_user),
):
    """List persona templates, querying with the caller's id as ``owner_id``.

    Returns:
        ``{"templates": <result of the database actor's list_templates>}``.
    """
    postgres_database = ray_actor_hook("postgres_database").postgres_database
    templates = await postgres_database.list_templates.remote(
        owner_id=token_data.user_id
    )
    return {"templates": templates}


@agent_router.post("/template")
async def create_template(
    data: PersonaTemplateCreate,
    token_data: TokenData = Depends(RoleChecker(allowed_roles=UserAuthority.USER)),
):
    """Create a persona template owned by the calling user.

    Returns:
        ``{"message": "success", "template_id": <new template id>}``.
    """
    postgres_database = ray_actor_hook("postgres_database").postgres_database
    tpl = await postgres_database.add_template.remote(
        name=data.name,
        system_prompt=data.system_prompt,
        owner_id=token_data.user_id,
    )
    return {"message": "success", "template_id": tpl.template_id}


@agent_router.put("/template/{template_id}")
async def update_template(
    template_id: str,
    data: PersonaTemplateUpdate,
    token_data: TokenData = Depends(Accessor.get_current_user),
):
    """Partially update a persona template owned by the caller.

    Returns:
        ``{"message": "success", "template": <updated template>}``.

    Raises:
        HTTPException: 404 when the template does not exist; 403 when the
            caller is not its owner.
    """
    postgres_database = ray_actor_hook("postgres_database").postgres_database
    await _fetch_owned_template(postgres_database, template_id, token_data.user_id)
    # Only forward the fields the client actually sent.
    updated = await postgres_database.update_template.remote(
        template_id, **data.model_dump(exclude_unset=True)
    )
    return {"message": "success", "template": updated}


@agent_router.delete("/template/{template_id}")
async def delete_template(
    template_id: str,
    token_data: TokenData = Depends(Accessor.get_current_user),
):
    """Delete a persona template owned by the caller.

    Raises:
        HTTPException: 404 when the template does not exist; 403 when the
            caller is not its owner.
    """
    postgres_database = ray_actor_hook("postgres_database").postgres_database
    await _fetch_owned_template(postgres_database, template_id, token_data.user_id)
    await postgres_database.delete_template.remote(template_id)
    return {"message": "success"}