nanobot源码学习(6)-多channel接入
上篇回顾
前五篇下来,nanobot 的核心引擎已经完整地过了一遍:记忆系统、provider 层、AgentLoop 主循环、工具系统、多 agent 协作。但有一个角度一直没讲清楚——外面的消息是怎么进来的。
笔记3讲 AgentLoop 时,入口是 process_direct(),直接传字符串就能用,完全没有”渠道”的概念。但真正部署 nanobot 的时候,用户可能是在飞书里发消息、在微信里发消息,甚至同时在好几个平台上跟它聊天。nanobot 是怎么把这些渠道统一接进来的?不同渠道的对话历史怎么隔离?如果我配了 unified_session,飞书和微信又怎么共享同一段记忆?
这篇先从怎么用讲起,建立直观感受,再深入源码,把背后的机制搞清楚。
先看怎么用:接入飞书渠道
从一个具体例子入手。飞书是 nanobot 支持最完整的渠道之一,而且它有一个很大的优势:不需要公网 IP。大多数 bot 平台要求你提供一个公网可访问的 Webhook 地址,本地开发要配 ngrok 之类的内网穿透。飞书的 WebSocket 长连接模式则是 bot 主动连飞书服务器,无需暴露任何端口。
在飞书开发者平台创建应用
打开飞书开放平台,进入开发者后台
创建企业自建应用(”企业自建应用”tab → 创建应用)
进入应用,导航到 **”功能 → 机器人”**,开启机器人功能
进入 **”权限管理”**,添加以下权限:
im:message:send_as_bot(发送消息)im:message(接收消息)im:message.group_at_msg(接收群 @消息,如果需要群聊)
进入 **”事件订阅”**,订阅方式选 **”使用长连接接收事件”**,添加事件
im.message.receive_v1在 “凭证与基础信息” 里找到 App ID 和 App Secret,记下来
发布应用版本(”版本管理” → 创建版本 → 申请发布)
安装依赖
1 | pip install lark-oapi |
配置 config.json
在 nanobot 的 config.json 里加入飞书渠道配置:
1 | { |
各字段说明:
| 字段 | 说明 |
|---|---|
enabled |
必须设为 true 才会启动该渠道 |
app_id |
飞书应用的 App ID |
app_secret |
飞书应用的 App Secret |
allow_from |
白名单,"*" 表示允许所有人,也可以填具体的用户 open_id 列表(以 ou_ 开头) |
group_policy |
群聊策略:"open" 处理所有群消息,"mention" 只处理 @bot 的消息 |
streaming |
开启流式输出(打字机效果),默认 true |
react_emoji |
收到消息时添加的表情,默认 "THUMBSUP"(👍) |
done_emoji |
处理完成后添加的表情,可选,比如 "OK" |
domain |
"feishu" 或 "lark"(海外版用 lark) |
allow_from 注意:设置为空列表
[]会拒绝所有人,这是一种”默认拒绝”的安全设计,千万不要忘记设置。
启动并测试
渠道模式需要用 nanobot gateway 命令启动
1 | nanobot gateway |
启动后会看到这样的日志输出:
1 | 🐈 Starting nanobot gateway version 0.1.5 on port 18790... |
有几个地方值得注意:第一行 Skipping built-in channel 'matrix' 是正常的——matrix 渠道需要额外安装依赖,没装就跳过,不影响其他渠道。最后 [Lark] connected to wss://... 说明 WebSocket 已经和飞书服务器建立了持久连接,bot 随时可以收消息了。
在飞书里找到你的 bot,发一条消息,稍等一两秒,你会看到:
- bot 先给你的消息加一个 👍 表情(
react_emoji),表示”收到,正在处理” - 如果开了
streaming,回复会以打字机效果逐渐出现在一张卡片里 - 处理完成后,如果配了
done_emoji,👍 会变成 ✅ 之类的完成标志 - 如果回复里有代码块、表格或 Markdown 格式,会自动渲染为飞书的交互式卡片;纯文本则发普通文本消息
一条消息的完整旅程
在深入各个模块之前,先跟着一条消息走完全流程,建立全局视角:
整个流程分七个阶段,每个阶段在后文都有对应章节展开:
| 阶段 | 概括 | 详见章节 |
|---|---|---|
| Phase 1 | nanobot gateway 启动,创建 MessageBus、AgentLoop、ChannelManager,扫描并启动 enabled 渠道 |
整体架构设计 |
| Phase 2 | 飞书通过 WebSocket 推送消息事件,SDK 在独立线程收包 | 飞书渠道实现详解 |
| Phase 3 | _on_message() 去重、过滤、解析,publish_inbound() 发到消息总线 |
BaseChannel 接口契约 |
| Phase 4 | AgentLoop 从 inbound 队列消费,加载 session,运行 agent loop | (笔记3已详述) |
| Phase 5 | 流式 delta 和最终回复都进入 outbound 队列 | 整体架构设计 |
| Phase 6 | ChannelManager 消费 outbound,合并 delta,重试,路由到渠道 | ChannelManager 编排层 |
| Phase 7 | FeishuChannel.send()/send_delta() 回发飞书,CardKit 流式渲染 | 飞书渠道实现详解 |
下面逐层拆解各模块的设计与实现。
整体架构设计
nanobot 的多渠道架构分三层:
整个流程的关键:渠道层和 AgentLoop 之间没有直接依赖,唯一的通信管道是 MessageBus 的两条队列。渠道层不需要知道 AgentLoop 的存在,AgentLoop 也不需要知道消息从哪个渠道来——它只是从 inbound 队列取消息,处理完往 outbound 队列丢回复。
MessageBus:唯一的通信管道
MessageBus 的实现只有两个 asyncio.Queue:
1 | # nanobot/bus/queue.py |
所有消息都是 JSON 序列化友好的 dataclass,没有回调,没有共享状态,没有跨线程引用。这个设计让整个系统高度可测试——你可以往 queue 里直接丢 InboundMessage,然后检查 outbound 队列里的回复。
Phase 1 补充:Gateway 启动顺序
nanobot gateway 命令执行后,commands.py 的 gateway() 函数按以下顺序构造依赖:
1 | # nanobot/cli/commands.py (简化) |
关键点:AgentLoop 先于渠道启动。这保证了当第一条消息进入 inbound 队列时,消费者已经就绪。如果顺序反过来,渠道的消息可能在没有消费者的情况下被丢弃。
Phase 5 补充:输出路径
AgentLoop 的输出有两条路径进入 outbound:
1 | # 路径1:流式 delta(on_stream 回调) |
两条路径的消息格式完全一致,只是 metadata 里的 _stream_delta 标记区分了流式 delta 和最终回复。ChannelManager 会根据这个标记决定是立即发送还是合并等待。
渠道层:BaseChannel 接口契约
所有渠道实现类都继承自 BaseChannel,接口定义在 nanobot/channels/base.py:
1 | # nanobot/channels/base.py |
三个抽象方法,规定了每个渠道必须实现的最小接口。start() 是一个长运行的 async 任务,通常内部是一个事件循环(轮询或 WebSocket 推送)。
权限白名单
接收消息之前要过一道权限检查:
1 | # nanobot/channels/base.py |
allow_from 为空列表时会拒绝所有人,而不是放行所有人——这是一种”默认拒绝”的安全策略,配置不到位的渠道不会意外暴露出去。
_handle_message():打包进总线
所有渠道接收到消息后都要调这个方法,它的职责是做完权限检查、把原始数据打包成 InboundMessage,然后推进总线:
1 | # nanobot/channels/base.py |
_wants_stream 是渠道和 AgentLoop 之间的约定:只要 InboundMessage 带了这个 flag,AgentLoop 就会开启流式回调,把 LLM 生成的文本一段一段地发到 outbound 队列,渠道侧就能实时渲染给用户。
ChannelManager:渠道编排
渠道是一个个孤立的实现,ChannelManager 负责把它们统一管理起来,代码在 nanobot/channels/manager.py。
渠道发现机制
nanobot 支持两种方式注册渠道,发现逻辑在 registry.py:
1 | # nanobot/channels/registry.py |
pkgutil.iter_modules 扫描包目录,找到 feishu.py、weixin.py、dingtalk.py 等所有内置模块,然后反射加载每个模块里的 BaseChannel 子类。外部插件通过 entry_points 注册,第三方开发者不改 nanobot 源码也能扩展新渠道,内置渠道优先级更高,同名时不会被外部插件覆盖。
加载完渠道类后,按 config 决定启动哪些:
1 | # nanobot/channels/manager.py |
逻辑很干净:只有在 config 里显式配了 enabled: true 的渠道才会真正实例化和启动。
outbound 消息分发
当 AgentLoop 把回复放进 outbound 队列,ChannelManager 的分发协程会消费它:
1 | # nanobot/channels/manager.py |
流式消息合并(_coalesce_stream_deltas):LLM 生成文字时可能非常快,queue 里会积压很多只有几个字的小 delta。如果每个 delta 都发一次 API 请求,接口压力很大。ChannelManager 会把同一个 (channel, chat_id) 的连续 delta 合并成一个大包再发出去,平衡延迟和频率。
指数退避重试:发送失败时会重试,延迟是 1s → 2s → 4s。CancelledError 会直接 re-raise,保证优雅关闭时能正常退出。
记忆系统与多渠道的关系
多渠道接入之后,记忆系统是如何工作的?这一块有些细节值得仔细看,不然容易产生误解。
nanobot 的记忆分三层,先把它们和磁盘文件的对应关系建立起来:
1 | workspace/ |
短期记忆:严格按 channel:chat_id 隔离
每条 InboundMessage 都有一个 session_key 属性:
1 | # nanobot/bus/events.py |
飞书用户的消息 key 是 feishu:oc_abc123,CLI 消息的 key 是 cli:direct。_process_message() 拿到这个 key,让 SessionManager 按 key 取对应的 session 文件:
1 | # nanobot/session/manager.py |
然后进 build_messages() 的 history,只来自这一个文件:
1 | # nanobot/agent/loop.py |
所以短期对话记录默认就是严格隔离的,CLI 和飞书永远看不到对方的原始消息轮次,不需要任何额外配置。
中期记忆:全局共享,会混入其他渠道的摘要
Consolidator 被触发时(某个 session 的 token 快撑爆上下文窗口),会把那个 session 里的旧消息压缩成一段摘要,追加到全局唯一的 memory/history.jsonl。
这个文件是跨渠道共享的,飞书和 CLI 各自 consolidate 之后都往同一个文件里追加。
每次对话时,ContextBuilder.build_system_prompt() 会把 dream cursor 之后还没被 Dream 消化的条目注入 system prompt:
1 | # nanobot/agent/context.py |
这意味着:如果你在飞书里聊了很多,Consolidator 把飞书的对话摘要写进了 history.jsonl,而 Dream 还没运行(默认 2 小时一次),那么你在 CLI 发消息时,system prompt 的 # Recent History 里就会包含飞书对话的摘要。
这是有意的设计取舍——nanobot 的定位是单用户个人助手,这些摘要代表的是”你最近在做什么”,从设计哲学上应该跨渠道可见。你在飞书里讨论了某个项目,切换到 CLI 继续工作,助手能感知到这个背景。
但如果你同时开了飞书和微信给多个不同用户用,中期摘要就真的会混。这个场景不在 nanobot 的主要设计目标内。
长期记忆:全局共享,Dream 提炼后写入 MEMORY.md
Dream 系统(默认每 2 小时运行一次)读取 history.jsonl 里未处理的摘要,提炼出结构化的长期事实写入 MEMORY.md。同样是全局唯一一个文件,所有渠道共享。
因此整个 system prompt 的组成结构是:
1 | [长期记忆 MEMORY.md] ← 全局共享 |
unified_session:主动打破短期隔离
如果你希望所有渠道共用同一段短期对话历史,可以开启 unified_session。
在 config.json 里配置:
1 | { |
实现原理:只改了一个 key
代码上只改了一处——_effective_session_key() 的返回值:
1 | # nanobot/agent/loop.py |
这个 effective_key 在 _dispatch() 里被用于三个地方:
1 | # nanobot/agent/loop.py - _dispatch() |
① 让 _process_message() 用 unified:default 去加载 session 文件
② 让所有渠道共用同一把串行锁
③ 让 mid-turn 注入队列也按 unified:default 路由
三个地方统一之后,所有渠道的消息完全串行处理,共用 sessions/unified_default.jsonl。
并发安全:共用锁保证全局串行
开启 unified_session 后,飞书和 CLI 同时发消息,会争抢同一把 asyncio.Lock:
1 | async with lock, gate: # lock 的 key 是 "unified:default" |
只有拿到锁的那个渠道才能进入处理,另一个在锁外等待。所以写 session 文件不存在并发问题——lock 保证了任何时刻只有一个消息在处理,也只有一个写者在操作 JSONL 文件。
回包路由:session_key 只影响读写,不影响寻址
这里有个容易误解的地方:session_key_override 覆盖的只是 session 文件的 key,而回包寻址用的是 msg.channel 和 msg.chat_id,这两个字段是从原始 InboundMessage 里直接取的,全程不变。
看 _dispatch() 里回包的构造:
1 | # 流式 delta 回包 |
session_key 是”去哪个文件里找历史记录”,channel+chat_id 是”回复发给谁”。两套信息独立,unified_session 只修改了前者,后者不受影响。
所以飞书用户发消息,bot 读的是 unified_default.jsonl 里的历史,但回包发到的仍是飞书的那个 chat_id,不会发错地方。
具体表现举例
假设开启 unified_session=true,飞书和 CLI 同时在使用:
- 飞书里你说”帮我分析一下 list.py 的性能问题”,bot 给了一段分析,这轮对话写入
sessions/unified_default.jsonl - 你切到 CLI,直接问”刚才那个分析,你觉得最值得优化的是哪一步?”
_process_message()用unified:default加载 session,能看到飞书那轮对话,直接接上回答
关闭后(默认):
- 飞书的对话写入
sessions/feishu_oc_abc123.jsonl - CLI 的对话写入
sessions/cli_direct.jsonl - 在 CLI 里问同样的问题,bot 完全不知道飞书里发生过什么,会回答”不知道你指的是哪个分析”
什么时候适合开? 你是 bot 的唯一用户,在多个客户端上轮换使用,希望 bot 记住完整的跨渠道对话上下文。多人共用同一个 bot 实例时不要开,所有人的对话历史会混在一起。
下面这张图对比了两种模式下 session 的差异:
可以看到:隔离模式下每个 channel:chat_id 对应独立的 JSONL 文件;共享模式下所有用户都指向同一个 unified_default.jsonl。
并发控制
这块笔记3已经详细讲过 pending queue 的 mid-turn 注入机制,这里只梳理三件套设计,不重复细节:
1 | # nanobot/agent/loop.py |
三件套的组合效果:同一 session 串行处理,不同 session 最多 3 路并发。NANOBOT_MAX_CONCURRENT_REQUESTS=0 可以去掉全局上限。
飞书渠道实现要点
飞书渠道的源码在 nanobot/channels/feishu.py,1700 多行。这里只讲飞书特有的设计细节,完整流程见全局时间线的 Phase 2/3/7。
线程模型:WebSocket 线程 + asyncio 主循环的桥接
飞书的 lark_oapi SDK 内部用的是同步阻塞的 WebSocket 客户端,不能直接在 asyncio 循环里跑。FeishuChannel 把它放到一个独立线程里:
1 | # nanobot/channels/feishu.py (简化) |
WebSocket 线程收到消息后,通过 run_coroutine_threadsafe() 跳回主 asyncio 循环:
1 | def _on_message_sync(self, data: Any) -> None: |
这是 Python asyncio 的标准跨线程调度接口,保证 _on_message() 能在主事件循环里安全地 await 各种异步操作。
去重与群聊策略
_on_message() 里的去重逻辑用 OrderedDict 缓存已处理的 message_id:
1 | if message_id in self._processed_message_ids: |
用 OrderedDict 而不是普通 dict,是因为需要按插入顺序淘汰,popitem(last=False) 直接拿到最先插入的那条。
群聊策略 group_policy 有两个值:"open" 处理所有群消息,"mention" 只处理 @bot 的消息。_is_bot_mentioned() 会检查消息的 mentions 列表或是否有 @_all。
反应表情反馈
收到消息时会加一个 react_emoji(默认 👍),流式输出结束时会删掉它,然后加 done_emoji(如果配了的话)。这给用户一个视觉反馈:从”正在处理”变成”已完成”。
智能格式选择
飞书的消息格式有三种:text、post、interactive。_detect_msg_format() 会根据内容自动选择:
- 有代码块、表格、标题 → interactive(能渲染 Markdown)
- 内容很长 → interactive(阅读体验更好)
- 有链接 → post(支持
<a>标签) - 短纯文本 → text
- 其他 → post
这就是为什么 bot 回复纯文本打招呼时发普通消息,但一旦输出代码或表格就变成卡片形式。
流式输出的节流机制
CardKit Streaming API 要求客户端维护一个严格递增的 sequence 序号。nanobot 在实现时加了一层节流:最快 0.5s 更新一次卡片,而不是每个 token 都发请求。
1 | # 节流逻辑 |
这是在用户体验(更新频率)和 API 限速之间的权衡。sequence 在飞书侧用于防止乱序更新,节流是在客户端侧减少请求频率。
MessageTool:发送带附件的消息
LLM 有两种方式向用户返回内容:
- 自然回复:直接在对话中输出文本,AgentLoop 自动打包成 OutboundMessage 发出去
- MessageTool:调用
message工具发送消息,这是携带文件附件的唯一方式
MessageTool 的描述强调什么
1 |
|
描述明确说:这是发送文件的唯一方式。LLM 看到这段描述,只会在一个场景调用它——需要把本地文件递交给用户。普通文本回复走自然回复就够了,没必要调用工具。
完整时间线:LLM 同时回复文件和文本
假设用户问”帮我分析这段代码并生成报告文档”,LLM 想:
- 发送生成的 PDF 文件
- 给出自然回复”报告已生成,请查收”
正确的做法是调用一次 message 工具:
1 | # LLM 的 tool_calls |
时间线流程:
用户最终收到两条消息(飞书 API 限制一条消息只能有一种类型):
1 | [消息1] 📎 report.pdf |
_sent_in_turn 机制的真正作用
1 | # AgentLoop._process_message() 结尾 |
逻辑是:
- 如果 MessageTool 已被调用(
_sent_in_turn=True) - 且满足
(没有 mid-turn 注入) 或 (自然回复为空) - 则抑制自然回复
为什么需要这个机制?
如果 LLM 调用了 message 工具发送带附件的消息,然后本轮自然回复为空(LLM 不想再说额外的话),AgentLoop 仍然会构造一个空白的 OutboundMessage 返回。用户会收到一条空白消息,体验很差。
有了 _sent_in_turn 检测,AgentLoop 发现 message 工具已发送过有意义的消息,且自然回复为空,就直接返回 None,避免发送空白消息。
例外情况: 如果本轮有 mid-turn 消息注入(用户追发了新消息),自然回复不会被抑制,因为它可能是针对追发消息的回应。
小结
一条消息从飞书到回复要经过七个阶段:gateway 启动初始化 → WebSocket 收包 → 消息处理与转发 → AgentLoop 处理 → 输出发到 outbound → ChannelManager 消费 → 回发飞书。每个阶段都有明确的职责边界,通过 MessageBus 解耦,各模块只依赖队列接口。
BaseChannel 是接口契约,
start()/stop()/send()三个抽象方法定义了每个渠道必须实现的能力,_handle_message()则是渠道和消息总线之间唯一的接入点。MessageTool 是发送文件附件的唯一方式。自然回复只是文本字符串,不能携带 media 字段。LLM 看到工具描述里强调”This is the ONLY way to deliver files”,只会在需要递交本地文件时调用它。一个 OutboundMessage 里可以同时包含 media 和 content,FeishuChannel.send() 会先逐个上传发送 media,再发送文本——用户收到的是多条消息(飞书 API 限制一条消息只能有一种类型)。
_sent_in_turn机制避免发送空白消息。当 MessageTool 已发送过带附件的消息,且自然回复为空时,AgentLoop 会抑制那条空白回复。但如果本轮有 mid-turn 消息注入,自然回复不会被抑制——它可能是针对追发消息的回应。短期 session 默认严格隔离,以
channel:chat_id为 key,每个渠道+用户有独立的 JSONL 文件,互不干扰。中期摘要(history.jsonl)和长期记忆(MEMORY.md)则是全局共享的,这是 nanobot 单用户个人助手定位的有意取舍——让助手在不同渠道之间保持对”你最近在做什么”的感知。开启unified_session=true会进一步把短期 session 也打通,适合单用户多端使用的场景。流式输出的节流机制(0.5s 最小间隔)是在用户体验和 API 限速之间的权衡,并不是每个 token 都发一次请求。