Files
zhaoxi 8f1398c591 feat: 人设模板系统、节点调度标签、pydantic-settings收敛、错误处理增强
新增persona_template表和CRUD API,BaseIndividualModel增加node_affinity和template_origin_id字段,
WorkerCluster支持多集群Ray资源调度,环境变量收敛到pydantic-settings统一校验,
数据库异常转换为结构化BusinessError/RetryableError,系统节点支持custom_system_prompt。

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-04 06:07:46 +00:00

290 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Copyright 2026 zhaoxi826
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""OneBot v11 协议适配器。
接收来自 OneBot 实现端(NapCat / go-cqhttp / Lagrange.OneBot 等)的事件上报,
把消息事件翻译成 ``MessageRequest`` 投递给 RegulatoryNode。同时支持两种连接方式:
- HTTP 上报(POST ``/api/v1/adapter/onebot/event``):实现端把事件 POST 过来,
通过返回体里的 ``reply`` 走 v11 "快速操作" 自动回包。
- 反向 WebSocket(WS ``/api/v1/adapter/onebot/ws``):实现端主动建立长连接,
服务端按 OneBot v11 反向 WS 规范返回 ``send_msg`` 等 action 主动回包。
模块还提供 ``send_message`` 工具函数,用 OneBot v11 HTTP API 主动给指定会话发消息。
"""
import asyncio
import json
import os
import uuid
from typing import Any, Dict, Optional
import httpx
from fastapi import APIRouter, Header, HTTPException, Request, WebSocket, WebSocketDisconnect
from kilostar.core.individual.regulatory_node.template import (
MessageRequest,
MessageResponse,
)
from kilostar.utils.logger import get_logger
from kilostar.utils.ray_hook import ray_actor_hook
# Module-level logger for this adapter, obtained from the project's get_logger
# helper under the name "onebot".
logger = get_logger("onebot")
# Router that groups every endpoint defined in this module under the
# /api/v1/adapter/onebot prefix and tags them "onebot".
onebot_router = APIRouter(prefix="/api/v1/adapter/onebot", tags=["onebot"])
def _verify_token(token_from_header: Optional[str]) -> None:
"""校验 OneBot 实现端在 ``Authorization`` 头里携带的 access_token。
若环境变量 ``ONEBOT_ACCESS_TOKEN`` 未设置,根据运行模式决策:
- 开发模式(KILOSTAR_ENV=dev):跳过校验并记录警告
- 生产模式:拒绝所有请求,强制要求配置 token
"""
expected = os.environ.get("ONEBOT_ACCESS_TOKEN")
if not expected:
is_dev = os.environ.get("KILOSTAR_ENV", "production").lower() in ("dev", "development")
if is_dev:
logger.warning("[OneBot] ONEBOT_ACCESS_TOKEN 未设置,开发模式下跳过认证")
return
raise HTTPException(
status_code=401,
detail="ONEBOT_ACCESS_TOKEN 未配置,拒绝未认证的 OneBot 连接",
)
if not token_from_header:
raise HTTPException(status_code=401, detail="Missing access_token")
raw = token_from_header.removeprefix("Bearer ").removeprefix("Token ").strip()
if raw != expected:
raise HTTPException(status_code=401, detail="Invalid access_token")
def _extract_plain_text(message: Any) -> str:
"""把 OneBot 消息字段(字符串或 segment 数组)展平成纯文本。
OneBot v11 既支持 CQ 码字符串,也支持消息段数组形式;这里只抽取 ``text``
段,其它段(图片/at/表情等)暂时丢弃。
"""
if isinstance(message, str):
return message
if isinstance(message, list):
parts = []
for seg in message:
if isinstance(seg, dict) and seg.get("type") == "text":
parts.append(seg.get("data", {}).get("text", ""))
return "".join(parts)
return ""
async def _dispatch_event(payload: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""把一次 OneBot 事件交给 RegulatoryNode 处理,返回快速操作字典或 ``None``。
仅处理 ``post_type == "message"`` 的私聊与群聊;元事件、通知、请求事件
一律忽略。返回结果遵循 OneBot v11 "快速操作" 约定:
- 群聊:``{"reply": "...", "at_sender": False, "_target": {...}}``
- 私聊:``{"reply": "...", "_target": {...}}``
其中 ``_target`` 仅供 ``_dispatch_via_ws`` 决定 send_msg 的入参;HTTP 模式下
会被剔除后再返回给实现端。
"""
if payload.get("post_type") != "message":
return None
message_type = payload.get("message_type") # private | group
user_id = str(payload.get("user_id", ""))
group_id = payload.get("group_id")
raw_text = _extract_plain_text(payload.get("message", ""))
sender = payload.get("sender") or {}
user_name = (
sender.get("card") or sender.get("nickname") or user_id or "onebot_user"
)
platform_id = (
f"group:{group_id}" if message_type == "group" else f"private:{user_id}"
)
if not raw_text.strip():
return None
logger.info(
f"[OneBot] {message_type} 消息 from {user_name}({user_id}) -> {raw_text!r}"
)
msg_request = MessageRequest(
platform="onebot",
user_name=user_name,
platform_id=platform_id,
message=raw_text,
)
try:
regulatory_node = ray_actor_hook("regulatory_node").regulatory_node
result = await regulatory_node.working.remote(msg_request)
except Exception as e:
logger.exception(f"[OneBot] RegulatoryNode 调用失败: {e}")
return None
reply_text = ""
if isinstance(result, MessageResponse):
reply_text = result.reply_message or ""
elif isinstance(result, str):
reply_text = result
if not reply_text:
return None
quick = {
"reply": reply_text,
"_target": {
"message_type": message_type,
"user_id": int(user_id) if user_id.isdigit() else user_id,
"group_id": group_id,
},
}
if message_type == "group":
quick["at_sender"] = False
return quick
@onebot_router.post("/event")
async def receive_event(
    request: Request,
    authorization: Optional[str] = Header(None),
):
    """HTTP report endpoint: accept a OneBot v11 event and run it through dispatch.

    Args:
        request: Incoming request whose body is the JSON-encoded event.
        authorization: ``Authorization`` header value, checked by
            ``_verify_token``.

    Returns:
        The quick-operation dict produced by ``_dispatch_event`` with the
        internal ``_target`` key removed, so the OneBot implementation sends
        the reply itself; or ``{"status": "ok"}`` when there is no reply.

    Raises:
        HTTPException: 401 from token verification; 400 when the body is not
            valid JSON or is not a JSON object.
    """
    _verify_token(authorization)
    try:
        payload = await request.json()
    except ValueError as exc:
        # JSONDecodeError and UnicodeDecodeError are both ValueError
        # subclasses; report a client error instead of an unhandled 500.
        raise HTTPException(status_code=400, detail="Invalid JSON body") from exc
    if not isinstance(payload, dict):
        raise HTTPException(
            status_code=400, detail="Event payload must be a JSON object"
        )
    quick = await _dispatch_event(payload)
    if not quick:
        return {"status": "ok"}
    # "_target" is only meaningful to the reverse-WS path.
    quick.pop("_target", None)
    return quick
async def _ws_call_action(
    ws: WebSocket, action: str, params: Dict[str, Any]
) -> None:
    """Send a single action call over the reverse WS without awaiting a response.

    The frame is a JSON object with ``action``, ``params`` and a freshly
    generated random ``echo`` value, serialised without ASCII escaping.

    Args:
        ws: Connection to write the frame to.
        action: Action name placed in the frame.
        params: Parameters placed in the frame.
    """
    serialized = json.dumps(
        {"action": action, "params": params, "echo": uuid.uuid4().hex},
        ensure_ascii=False,
    )
    await ws.send_text(serialized)
@onebot_router.websocket("/ws")
async def reverse_websocket(
    websocket: WebSocket,
    authorization: Optional[str] = Header(None),
    x_self_id: Optional[str] = Header(None),
):
    """Reverse WebSocket endpoint: serve a connection opened by a OneBot implementation.

    The ``Authorization`` header is verified before the connection is
    accepted; on failure the socket is closed with code 4401. Afterwards JSON
    frames are read in a loop:

    - frames with ``echo`` but no ``post_type`` are action responses and are
      currently ignored;
    - other frames go through ``_dispatch_event``; when it yields a reply, a
      ``send_group_msg`` or ``send_private_msg`` action is sent back in a
      background task.

    Frames that are not valid JSON, or not JSON objects, are logged and
    skipped without closing the connection.

    Args:
        websocket: The connection being served.
        authorization: ``Authorization`` header from the handshake.
        x_self_id: ``X-Self-ID`` header from the handshake; used for logging only.
    """
    try:
        _verify_token(authorization)
    except HTTPException:
        await websocket.close(code=4401)
        return
    await websocket.accept()
    logger.info(f"[OneBot] reverse WS connected (self_id={x_self_id})")

    # The event loop keeps only weak references to tasks, so a task whose
    # result is discarded may be garbage-collected before it finishes. Holding
    # the tasks here keeps them alive while this handler is running.
    pending_sends: set = set()

    def _on_send_done(task: "asyncio.Task[None]") -> None:
        """Release a finished send task and surface its failure, if any."""
        pending_sends.discard(task)
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            logger.warning(f"[OneBot] reverse WS send failed: {error!r}")

    try:
        while True:
            text = await websocket.receive_text()
            try:
                payload = json.loads(text)
            except json.JSONDecodeError:
                logger.warning(f"[OneBot] invalid JSON frame: {text[:200]}")
                continue
            if not isinstance(payload, dict):
                # Valid JSON but not an object: it cannot be an event or an
                # action response, and must not tear down the connection.
                logger.warning(f"[OneBot] non-object JSON frame: {text[:200]}")
                continue
            # Action response frame (has echo but no post_type): ignored for now.
            if "post_type" not in payload and "echo" in payload:
                continue
            quick = await _dispatch_event(payload)
            if not quick:
                continue
            target = quick.get("_target", {})
            params: Dict[str, Any] = {"message": quick["reply"]}
            if target.get("message_type") == "group" and target.get("group_id"):
                params["group_id"] = target["group_id"]
                action = "send_group_msg"
            else:
                params["user_id"] = target.get("user_id")
                action = "send_private_msg"
            send_task = asyncio.create_task(
                _ws_call_action(websocket, action, params)
            )
            pending_sends.add(send_task)
            send_task.add_done_callback(_on_send_done)
    except WebSocketDisconnect:
        logger.info("[OneBot] reverse WS disconnected")
    except Exception as e:
        logger.exception(f"[OneBot] reverse WS error: {e}")
        try:
            await websocket.close()
        except Exception:
            # Best effort: the socket may already be closed or broken.
            pass
async def send_message(
    user_id: Optional[int] = None,
    group_id: Optional[int] = None,
    message: str = "",
    *,
    base_url: Optional[str] = None,
    access_token: Optional[str] = None,
) -> Dict[str, Any]:
    """Proactively send one message to a private or group chat via the OneBot v11 HTTP API.

    Args:
        user_id: Target user id; supply either this or ``group_id``.
        group_id: Target group id; when both ids are given the group is used.
        message: Message text to send.
        base_url: HTTP API address of the OneBot implementation; falls back to
            the ``onebot_http_url`` value from the project settings.
        access_token: Auth token; falls back to the ``onebot_access_token``
            value from the project settings. No ``Authorization`` header is
            sent when neither is set.

    Returns:
        The decoded JSON body of the OneBot HTTP API response.

    Raises:
        ValueError: If neither ``user_id`` nor ``group_id`` is truthy.
        httpx.HTTPStatusError: If the API answers with an error status.
    """
    if not (user_id or group_id):
        raise ValueError("必须指定 user_id 或 group_id 之一")

    from kilostar.utils.settings import get_settings

    onebot_settings = get_settings().onebot
    endpoint_root = base_url or onebot_settings.onebot_http_url
    bearer = access_token or onebot_settings.onebot_access_token or None

    # Pick the action and the id field together; a group id takes precedence.
    if group_id:
        action, target_key, target_id = "send_group_msg", "group_id", group_id
    else:
        action, target_key, target_id = "send_private_msg", "user_id", user_id
    body = {target_key: int(target_id), "message": message}

    headers: Dict[str, str] = {"Authorization": f"Bearer {bearer}"} if bearer else {}
    url = f"{endpoint_root.rstrip('/')}/{action}"

    async with httpx.AsyncClient(timeout=15.0) as client:
        resp = await client.post(url, json=body, headers=headers)
        resp.raise_for_status()
        return resp.json()