Mixin 详解
约 2676 字大约 9 分钟
2026-03-19
EventMixin / TimeTaskMixin / RBACMixin / ConfigMixin / DataMixin 完整 API
1. EventMixin
路径:
ncatbot/plugin/mixin/event_mixin.py
代理 AsyncEventDispatcher 的事件消费接口。卸载时自动关闭所有活跃的 EventStream。
1.1 events()
def events(
self,
event_type: Optional[Union[str, EventType]] = None,
) -> EventStream创建事件流,可选按类型过滤。返回的 EventStream 支持 async with / async for。
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
event_type | str | EventType | None | None | 事件类型过滤(前缀匹配),如 "message"、"notice" |
返回值:EventStream 异步迭代器
示例:
async def on_load(self):
async with self.events("message") as stream:
async for event in stream:
print(event.raw_message)1.2 wait_event()
async def wait_event(
self,
predicate: Optional[Callable[[Event], bool]] = None,
timeout: Optional[float] = None,
) -> Event等待下一个满足条件的事件。
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
predicate | Callable[[Event], bool] | None | None | 过滤函数,None 接受任意事件 |
timeout | float | None | None | 超时秒数,None 无限等待 |
返回值:匹配的 Event
异常:asyncio.TimeoutError — 超时未匹配到事件
示例:
# 等待特定用户的下一条消息,最多 30 秒
event = await self.wait_event(
predicate=lambda e: e.user_id == 12345,
timeout=30.0,
)1.3 wait_session_event()
async def wait_session_event(
self,
event: object,
*,
timeout: Optional[float] = None,
extra_predicate: Optional[Union[Callable[[Event], bool], P]] = None,
cancel_words: Optional[Sequence[str]] = None,
) -> Event等待同 session 的下一个事件。自动通过 from_event(event) 绑定 session(同用户/同群/同消息类型)。
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
event | object | — | 触发事件(高层 BaseEvent 或底层 Event 均可) |
timeout | float | None | None | 超时秒数 |
extra_predicate | Callable | P | None | None | 额外过滤条件,与 session 谓词 AND 组合 |
cancel_words | Sequence[str] | None | None | 取消词列表 |
返回值:匹配的 Event
异常:
asyncio.TimeoutError— 超时SessionCancelled— 用户输入了取消词(.event为触发事件,.word为匹配词)
示例:
from ncatbot.core import SessionCancelled
try:
reply = await self.wait_session_event(event, timeout=30, cancel_words=["取消"])
except asyncio.TimeoutError:
await event.reply("超时了")
except SessionCancelled as e:
await event.reply(f"已取消(关键词: {e.word})")1.4 wait_session_reply()
async def wait_session_reply(
self,
event: object,
*,
timeout: Optional[float] = None,
cancel_words: Optional[Sequence[str]] = None,
) -> SessionResult等待同 session 的文本回复,将超时和取消包装为 SessionResult 而非异常。
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
event | object | — | 触发事件 |
timeout | float | None | None | 超时秒数 |
cancel_words | Sequence[str] | None | None | 取消词列表 |
返回值:SessionResult — .ok 判断成功,.text 获取回复文本
示例:
result = await self.wait_session_reply(event, timeout=30, cancel_words=["取消"])
if result.ok:
name = result.text
elif result.timed_out:
await event.reply("超时了")
elif result.cancelled:
await event.reply("已取消")1.5 session_prompt()
async def session_prompt(
self,
prompt_text: str,
event: object,
*,
timeout: Optional[float] = None,
cancel_words: Optional[Sequence[str]] = None,
timeout_reply: Optional[str] = None,
cancel_reply: Optional[str] = None,
) -> SessionResult发送提示消息并等待同 session 回复(一站式)。超时/取消时可自动回复用户。
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
prompt_text | str | — | 发送给用户的提示文本 |
event | object | — | 触发事件(需有 .reply() 方法) |
timeout | float | None | None | 超时秒数 |
cancel_words | Sequence[str] | None | None | 取消词列表 |
timeout_reply | str | None | None | 超时时自动回复文本 |
cancel_reply | str | None | None | 取消时自动回复文本 |
返回值:SessionResult
示例:
result = await self.session_prompt(
"请输入你的名字:", event,
timeout=30,
cancel_words=["取消"],
timeout_reply="⏰ 超时了",
cancel_reply="❌ 已取消",
)
if result.ok:
name = result.text1.6 session_choose()
async def session_choose(
self,
prompt_text: str,
event: object,
*,
choices: Dict[str, str],
timeout: Optional[float] = None,
timeout_reply: Optional[str] = None,
invalid_reply: Optional[str] = None,
max_retries: int = 0,
) -> SessionResult发送选择题并等待用户选择,支持重试。
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
prompt_text | str | — | 提示文本 |
event | object | — | 触发事件 |
choices | Dict[str, str] | — | {用户输入: 语义key} 映射 |
timeout | float | None | None | 超时秒数 |
timeout_reply | str | None | None | 超时自动回复 |
invalid_reply | str | None | None | 无效输入自动回复 |
max_retries | int | 0 | 最大重试次数 |
返回值:SessionResult — .key 为匹配的选项 key,.text 为原始输入
示例:
result = await self.session_choose(
"确认注册?回复「确认」或「取消」", event,
choices={"确认": "confirm", "取消": "cancel"},
timeout=15,
invalid_reply="请回复「确认」或「取消」",
max_retries=2,
)
if result.ok and result.key == "confirm":
# 执行注册
...1.7 辅助类型
SessionCancelled
路径:
ncatbot.plugin.mixin.session_types(也可从ncatbot.core导入)
class SessionCancelled(Exception):
event: Event # 触发取消的事件
word: str # 匹配到的取消词SessionResult
路径:
ncatbot.plugin.mixin.session_types(也可从ncatbot.core导入)
@dataclass(frozen=True, slots=True)
class SessionResult:
ok: bool # True 表示正常回复
text: Optional[str] # 回复文本 (raw_message.strip())
event: Optional[Event] # 原始事件
cancelled: bool # True 表示取消词触发
timed_out: bool # True 表示超时
cancel_word: Optional[str] # 匹配的取消词
key: Optional[str] # session_choose 的选项 key工厂方法:
| 方法 | 说明 |
|---|---|
SessionResult.of(event, text, *, key=None) | 创建成功结果 |
SessionResult.from_timeout() | 创建超时结果 |
SessionResult.from_cancel(event, word) | 创建取消结果 |
SessionResult.from_invalid(text, event) | 创建无效输入结果 |
2. TimeTaskMixin
路径:
ncatbot/plugin/mixin/time_task_mixin.py
代理 TimeTaskService,提供定时任务的添加、移除、查询接口。 卸载时自动清理本插件注册的所有定时任务。
2.1 add_scheduled_task()
@final
def add_scheduled_task(
self,
name: str,
interval: Union[str, int, float],
conditions: Optional[List[Callable[[], bool]]] = None,
max_runs: Optional[int] = None,
callback: Optional[Callable] = None,
) -> bool添加定时任务。
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
name | str | — | 任务唯一名称 |
interval | str | int | float | — | 调度参数(见下表) |
conditions | List[Callable[[], bool]] | None | None | 执行条件列表,全部为 True 时才执行 |
max_runs | int | None | None | 最大执行次数 |
callback | Callable | None | None | 自定义回调函数,覆盖默认的同名方法查找 |
interval 支持的格式:
| 格式 | 示例 | 说明 |
|---|---|---|
| 秒数 | 120, 0.5 | 整数或浮点数 |
| 时间字符串 | "30s", "2h30m", "0.5d" | 人类可读的时间间隔 |
| 每日时间 | "HH:MM" | 每天定时执行 |
| 一次性 | "YYYY-MM-DD HH:MM:SS" | 指定时刻执行一次 |
返回值:bool — 是否添加成功
2.2 remove_scheduled_task()
@final
def remove_scheduled_task(self, name: str) -> bool| 参数 | 类型 | 说明 |
|---|---|---|
name | str | 任务名称 |
返回值:bool — 是否移除成功
2.3 get_task_status()
@final
def get_task_status(self, name: str) -> Optional[Dict[str, Any]]获取指定任务状态。返回包含 name、next_run、run_count、max_runs 的字典, 任务不存在返回 None。
2.4 list_scheduled_tasks()
@final
def list_scheduled_tasks(self) -> List[str]列出本插件注册的所有定时任务名称。
2.5 cleanup_scheduled_tasks()
@final
def cleanup_scheduled_tasks(self) -> None清理本插件的所有定时任务。通常无需手动调用,_mixin_unload 钩子会自动执行。
示例:
class MyPlugin(NcatBotPlugin):
name = "heartbeat"
version = "1.0.0"
async def on_load(self):
self.add_scheduled_task("tick", "30s", max_runs=100)
async def on_close(self):
pass # cleanup_scheduled_tasks() 由 _mixin_unload 自动调用3. RBACMixin
路径:
ncatbot/plugin/mixin/rbac_mixin.py
代理 RBACService,提供角色-权限管理的便捷接口。
3.1 check_permission()
def check_permission(self, user: str, permission: str) -> bool| 参数 | 类型 | 说明 |
|---|---|---|
user | str | 用户标识 |
permission | str | 权限路径 |
返回值:bool — 是否拥有权限,RBAC 服务不可用时返回 False
3.2 add_permission()
def add_permission(self, path: str) -> None注册权限路径。
| 参数 | 类型 | 说明 |
|---|---|---|
path | str | 权限路径,如 "plugin_name.feature" |
3.3 remove_permission()
def remove_permission(self, path: str) -> None移除权限路径。
3.4 add_role()
def add_role(self, role: str, exist_ok: bool = True) -> None创建角色。
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
role | str | — | 角色名称 |
exist_ok | bool | True | 角色已存在时是否忽略 |
3.5 user_has_role()
def user_has_role(self, user: str, role: str) -> bool检查用户是否拥有指定角色。RBAC 服务不可用时返回 False。
示例:
class AdminPlugin(NcatBotPlugin):
name = "admin"
version = "1.0.0"
async def on_load(self):
self.add_permission("admin.manage")
self.add_role("admin_role")
def can_manage(self, user_id: int) -> bool:
return self.check_permission(str(user_id), "admin.manage")4. ConfigMixin
路径:
ncatbot/plugin/mixin/config_mixin.py
双层配置模型:插件源码目录的 config.yaml 提供低优先级默认值,全局 config.yaml 的 plugin.plugin_configs.<name> 提供高优先级覆盖。
生命周期行为:
| 钩子 | 行为 |
|---|---|
_mixin_load | 从插件源码目录加载 config.yaml 默认值,再合并全局配置覆盖到 self.config |
_mixin_unload | No-op(所有持久化通过 set_config / update_config 即时完成) |
全局配置覆盖:如果
config.yaml中plugin.plugin_configs.<plugin_name>存在条目, 会在加载后覆盖对应配置项(全局优先)。
4.1 get_config()
def get_config(self, key: str, default: Any = None) -> Any读取配置值。
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
key | str | — | 配置键名 |
default | Any | None | 键不存在时的默认值 |
4.2 set_config()
def set_config(self, key: str, value: Any) -> None设置配置值并立即持久化到 config.yaml。
4.3 remove_config()
def remove_config(self, key: str) -> bool移除配置项并持久化。键不存在返回 False。
4.4 update_config()
def update_config(self, updates: Dict[str, Any]) -> None批量更新配置并持久化。
4.5 init_defaults()
def init_defaults(self, defaults: Dict[str, Any]) -> None补充缺失的默认配置项(仅内存,不持久化)。遍历 defaults,仅当 self.config 中不存在对应键时才写入。
示例:
class MyPlugin(NcatBotPlugin):
name = "my_plugin"
version = "1.0.0"
async def on_load(self):
# 补充缺失的默认配置(不持久化)
self.init_defaults({"api_key": "", "timeout": 30, "retry": 3})
# 读取
key = self.get_config("api_key")5. DataMixin
路径:
ncatbot/plugin/mixin/data_mixin.py
管理插件 data.json 的加载和保存,提供 self.data 字典供自由读写。
生命周期行为:
| 钩子 | 行为 |
|---|---|
_mixin_load | 从 workspace/data.json 加载数据到 self.data |
_mixin_unload | 将 self.data 保存回 workspace/data.json |
5.1 self.data 字典操作
self.data 是一个 Dict[str, Any],支持标准字典操作:
class CounterPlugin(NcatBotPlugin):
name = "counter"
version = "1.0.0"
async def on_load(self):
# 读取(首次运行时 self.data 为空字典)
self.data["counter"] = self.data.get("counter", 0) + 1
# 写入
self.data["last_online"] = "2026-01-01"
# 删除
self.data.pop("temp_key", None)
# 插件卸载时框架自动保存到 data/<plugin_name>/data.json⚠️ 注意:
self.data的修改不会立即持久化,仅在插件卸载时(_mixin_unload) 自动保存。如需立即保存,可手动调用self._save_data()。
存储路径:data/<plugin_name>/data.json
6. DispatchFilterMixin
路径:
ncatbot/plugin/mixin/dispatch_filter_mixin.py
代理 DispatchFilterService,提供按群/用户维度禁用插件或命令的便捷接口。 无 RBAC 限制,任何插件均可自由调用。管理范围为全局(不限于本插件)。
6.1 dispatch_filter (property)
@property
def dispatch_filter(self) -> Optional[DispatchFilterService]获取 DispatchFilterService 实例。服务不可用时返回 None。
6.2 block_in_group()
def block_in_group(
self,
group_id: str,
plugin_name: str,
commands: Optional[List[str]] = None,
) -> Optional[FilterRule]在群中禁用指定插件或命令。
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
group_id | str | — | 群号 |
plugin_name | str | — | 目标插件名("*" = 全部) |
commands | List[str] | None | None | 要禁用的命令列表(None = 整个插件) |
返回值:FilterRule,服务不可用时返回 None
6.3 unblock_in_group()
def unblock_in_group(
self,
group_id: str,
plugin_name: str,
commands: Optional[List[str]] = None,
) -> int解除群中对指定插件或命令的禁用。返回值:移除的规则数量。
6.4 block_for_user()
def block_for_user(
self,
user_id: str,
plugin_name: str,
commands: Optional[List[str]] = None,
) -> Optional[FilterRule]对指定用户禁用插件或命令。参数与 block_in_group() 类似。
6.5 unblock_for_user()
def unblock_for_user(
self,
user_id: str,
plugin_name: str,
commands: Optional[List[str]] = None,
) -> int解除对指定用户的插件或命令的禁用。返回值:移除的规则数量。
6.6 list_filters()
def list_filters(
self,
scope_type: Optional[str] = None,
scope_id: Optional[str] = None,
plugin_name: Optional[str] = None,
) -> List[FilterRule]条件查询过滤规则。全部参数为 None 时返回所有规则。
6.7 clear_filters()
def clear_filters(self, plugin_name: Optional[str] = None) -> int批量清除过滤规则。plugin_name=None 清除全部。返回值:清除的规则数量。
示例:
class AdminPlugin(NcatBotPlugin):
name = "admin"
version = "1.0.0"
@registrar.on_group_command("disable")
async def disable_plugin(self, event, target: str):
"""在当前群禁用指定插件"""
self.block_in_group(str(event.group_id), target)
await event.reply(f"已在本群禁用 {target}")
@registrar.on_group_command("enable")
async def enable_plugin(self, event, target: str):
"""在当前群启用指定插件"""
count = self.unblock_in_group(str(event.group_id), target)
await event.reply(f"已在本群启用 {target}(移除 {count} 条规则)")版权所有
版权归属:MI
