nanobot源码学习(6)-多channel接入
上篇回顾
前五篇下来,nanobot 的核心引擎已经完整地过了一遍:记忆系统、provider 层、AgentLoop 主循环、工具系统、多 agent 协作。但有一个角度一直没讲清楚——外面的消息是怎么进来的。
笔记3讲 AgentLoop 时,入口是 process_direct(),直接传字符串就能用,完全没有”渠道”的概念。但真正部署 nanobot 的时候,用户可能是在飞书里发消息、在微信里发消息,甚至同时在好几个平台上跟它聊天。nanobot 是怎么把这些渠道统一接进来的?不同渠道的对话历史怎么隔离?如果我配了 unified_session,飞书和微信又怎么共享同一段记忆?
这篇先从怎么用讲起,建立直观感受,再深入源码,把背后的机制搞清楚。
先看怎么用:接入飞书渠道
从一个具体例子入手。飞书是 nanobot 支持最完整的渠道之一,而且它有一个很大的优势:不需要公网 IP。大多数 bot 平台要求你提供一个公网可访问的 Webhook 地址,本地开发要配 ngrok 之类的内网穿透。飞书的 WebSocket 长连接模式则是 bot 主动连飞书服务器,无需暴露任何端口。
在飞书开发者平台创建应用
打开飞书开放平台,进入开发者后台
创建企业自建应用(”企业自建应用”tab → 创建应用)
进入应用,导航到 **”功能 → 机器人”**,开启机器人功能
进入 **”权限管理”**,添加以下权限:
im:message:send_as_bot(发送消息)im:message(接收消息)im:message.group_at_msg(接收群 @消息,如果需要群聊)
进入 **”事件订阅”**,订阅方式选 **”使用长连接接收事件”**,添加事件
im.message.receive_v1在 “凭证与基础信息” 里找到 App ID 和 App Secret,记下来
发布应用版本(”版本管理” → 创建版本 → 申请发布)
安装依赖
1 | pip install lark-oapi |
配置 config.json
在 nanobot 的 config.json 里加入飞书渠道配置:
1 | { |
各字段说明:
| 字段 | 说明 |
|---|---|
enabled |
必须设为 true 才会启动该渠道 |
app_id |
飞书应用的 App ID |
app_secret |
飞书应用的 App Secret |
allow_from |
白名单,"*" 表示允许所有人,也可以填具体的用户 open_id 列表(以 ou_ 开头) |
group_policy |
群聊策略:"open" 处理所有群消息,"mention" 只处理 @bot 的消息 |
streaming |
开启流式输出(打字机效果),默认 true |
react_emoji |
收到消息时添加的表情,默认 "THUMBSUP"(👍) |
done_emoji |
处理完成后添加的表情,可选,比如 "OK" |
domain |
"feishu" 或 "lark"(海外版用 lark) |
allow_from 注意:设置为空列表
[]会拒绝所有人,这是一种”默认拒绝”的安全设计,千万不要忘记设置。
启动并测试
渠道模式需要用 nanobot gateway 命令启动
1 | nanobot gateway |
启动后会看到这样的日志输出:
1 | 🐈 Starting nanobot gateway version 0.1.5 on port 18790... |
有几个地方值得注意:第一行 Skipping built-in channel 'matrix' 是正常的——matrix 渠道需要额外安装依赖,没装就跳过,不影响其他渠道。最后 [Lark] connected to wss://... 说明 WebSocket 已经和飞书服务器建立了持久连接,bot 随时可以收消息了。
在飞书里找到你的 bot,发一条消息,稍等一两秒,你会看到:
- bot 先给你的消息加一个 👍 表情(
react_emoji),表示”收到,正在处理” - 如果开了
streaming,回复会以打字机效果逐渐出现在一张卡片里 - 处理完成后,如果配了
done_emoji,👍 会变成 ✅ 之类的完成标志 - 如果回复里有代码块、表格或 Markdown 格式,会自动渲染为飞书的交互式卡片;纯文本则发普通文本消息
一条消息的完整旅程
在深入各个模块之前,先跟着一条消息走完全流程,建立全局视角:
阅读这张图的方法:从上往下,Phase 1 → Phase 7 是时间顺序。 虚线箭头表示跨阶段的数据流动(入队、出队)。
整个流程分七个阶段,每个阶段在后文都有对应章节展开:
| 阶段 | 概括 | 详见章节 |
|---|---|---|
| Phase 1 | nanobot gateway 启动,创建 MessageBus、AgentLoop、ChannelManager,扫描并启动 enabled 渠道 |
整体架构设计 |
| Phase 2 | 飞书通过 WebSocket 推送消息事件,SDK 在独立线程收包 | 飞书渠道实现详解 |
| Phase 3 | _on_message() 去重、过滤、解析,publish_inbound() 发到消息总线 |
BaseChannel 接口契约 |
| Phase 4 | AgentLoop 从 inbound 队列消费,加载 session,运行 agent loop | (笔记3已详述) |
| Phase 5 | 流式 delta 和最终回复都进入 outbound 队列 | 整体架构设计 |
| Phase 6 | ChannelManager 消费 outbound,合并 delta,重试,路由到渠道 | ChannelManager 编排层 |
| Phase 7 | FeishuChannel.send()/send_delta() 回发飞书,CardKit 流式渲染 | 飞书渠道实现详解 |
下面逐层拆解各模块的设计与实现。
整体架构设计
nanobot 的多渠道架构分三层:
阅读这张图的方法:从左往右,跟着数字标号走。
- ① 用户发消息,渠道层(FeishuChannel/CLIChannel)收到后调用
publish_inbound()放进 inbound 队列 - ② AgentLoop 从 inbound 队列取出消息,运行 agent loop(加载 session、调用 LLM、执行工具)
- ③ AgentLoop 生成回复,调用
publish_outbound()放进 outbound 队列 - ④ ChannelManager 从 outbound 队列取出回复,按
msg.channel字段路由到对应的渠道实例 - ⑤ 渠道实例调用
send()把回复发回给用户
整个流程的关键:渠道层和 AgentLoop 之间没有直接依赖,唯一的通信管道是 MessageBus 的两条队列。渠道层不需要知道 AgentLoop 的存在,AgentLoop 也不需要知道消息从哪个渠道来——它只是从 inbound 队列取消息,处理完往 outbound 队列丢回复。
MessageBus:唯一的通信管道
MessageBus 的实现只有两个 asyncio.Queue:
1 | # nanobot/bus/queue.py |
所有消息都是 JSON 序列化友好的 dataclass,没有回调,没有共享状态,没有跨线程引用。这个设计让整个系统高度可测试——你可以往 queue 里直接丢 InboundMessage,然后检查 outbound 队列里的回复。
Phase 1 补充:Gateway 启动顺序
nanobot gateway 命令执行后,commands.py 的 gateway() 函数按以下顺序构造依赖:
1 | # nanobot/cli/commands.py (简化) |
关键点:AgentLoop 先于渠道启动。这保证了当第一条消息进入 inbound 队列时,消费者已经就绪。如果顺序反过来,渠道的消息可能在没有消费者的情况下被丢弃。
Phase 5 补充:输出路径
AgentLoop 的输出有两条路径进入 outbound:
1 | # 路径1:流式 delta(on_stream 回调) |
两条路径的消息格式完全一致,只是 metadata 里的 _stream_delta 标记区分了流式 delta 和最终回复。ChannelManager 会根据这个标记决定是立即发送还是合并等待。
渠道层:BaseChannel 接口契约
所有渠道实现类都继承自 BaseChannel,接口定义在 nanobot/channels/base.py:
1 | # nanobot/channels/base.py |
三个抽象方法,规定了每个渠道必须实现的最小接口。start() 是一个长运行的 async 任务,通常内部是一个事件循环(轮询或 WebSocket 推送)。
权限白名单
接收消息之前要过一道权限检查:
1 | # nanobot/channels/base.py |
allow_from 为空列表时会拒绝所有人,而不是放行所有人——这是一种”默认拒绝”的安全策略,配置不到位的渠道不会意外暴露出去。
_handle_message():打包进总线
所有渠道接收到消息后都要调这个方法,它的职责是做完权限检查、把原始数据打包成 InboundMessage,然后推进总线:
1 | # nanobot/channels/base.py |
_wants_stream 是渠道和 AgentLoop 之间的约定:只要 InboundMessage 带了这个 flag,AgentLoop 就会开启流式回调,把 LLM 生成的文本一段一段地发到 outbound 队列,渠道侧就能实时渲染给用户。
ChannelManager:渠道编排
渠道是一个个孤立的实现,ChannelManager 负责把它们统一管理起来,代码在 nanobot/channels/manager.py。
渠道发现机制
nanobot 支持两种方式注册渠道,发现逻辑在 registry.py:
1 | # nanobot/channels/registry.py |
pkgutil.iter_modules 扫描包目录,找到 feishu.py、weixin.py、dingtalk.py 等所有内置模块,然后反射加载每个模块里的 BaseChannel 子类。外部插件通过 entry_points 注册,第三方开发者不改 nanobot 源码也能扩展新渠道,内置渠道优先级更高,同名时不会被外部插件覆盖。
加载完渠道类后,按 config 决定启动哪些:
1 | # nanobot/channels/manager.py |
逻辑很干净:只有在 config 里显式配了 enabled: true 的渠道才会真正实例化和启动。
outbound 消息分发
当 AgentLoop 把回复放进 outbound 队列,ChannelManager 的分发协程会消费它:
1 | # nanobot/channels/manager.py |
流式消息合并(_coalesce_stream_deltas):LLM 生成文字时可能非常快,queue 里会积压很多只有几个字的小 delta。如果每个 delta 都发一次 API 请求,接口压力很大。ChannelManager 会把同一个 (channel, chat_id) 的连续 delta 合并成一个大包再发出去,平衡延迟和频率。
指数退避重试:发送失败时会重试,延迟是 1s → 2s → 4s。CancelledError 会直接 re-raise,保证优雅关闭时能正常退出。
记忆系统与多渠道的关系
多渠道接入之后,记忆系统是如何工作的?这一块有些细节值得仔细看,不然容易产生误解。
nanobot 的记忆分三层,先把它们和磁盘文件的对应关系建立起来:
1 | workspace/ |
短期记忆:严格按 channel:chat_id 隔离
每条 InboundMessage 都有一个 session_key 属性:
1 | # nanobot/bus/events.py |
飞书用户的消息 key 是 feishu:oc_abc123,CLI 消息的 key 是 cli:direct。_process_message() 拿到这个 key,让 SessionManager 按 key 取对应的 session 文件:
1 | # nanobot/session/manager.py |
然后进 build_messages() 的 history,只来自这一个文件:
1 | # nanobot/agent/loop.py |
所以短期对话记录默认就是严格隔离的,CLI 和飞书永远看不到对方的原始消息轮次,不需要任何额外配置。
中期记忆:全局共享,会混入其他渠道的摘要
Consolidator 被触发时(某个 session 的 token 快撑爆上下文窗口),会把那个 session 里的旧消息压缩成一段摘要,追加到全局唯一的 memory/history.jsonl。
这个文件是跨渠道共享的,飞书和 CLI 各自 consolidate 之后都往同一个文件里追加。
每次对话时,ContextBuilder.build_system_prompt() 会把 dream cursor 之后还没被 Dream 消化的条目注入 system prompt:
1 | # nanobot/agent/context.py |
这意味着:如果你在飞书里聊了很多,Consolidator 把飞书的对话摘要写进了 history.jsonl,而 Dream 还没运行(默认 2 小时一次),那么你在 CLI 发消息时,system prompt 的 # Recent History 里就会包含飞书对话的摘要。
这是有意的设计取舍——nanobot 的定位是单用户个人助手,这些摘要代表的是”你最近在做什么”,从设计哲学上应该跨渠道可见。你在飞书里讨论了某个项目,切换到 CLI 继续工作,助手能感知到这个背景。
但如果你同时开了飞书和微信给多个不同用户用,中期摘要就真的会混。这个场景不在 nanobot 的主要设计目标内。
长期记忆:全局共享,Dream 提炼后写入 MEMORY.md
Dream 系统(默认每 2 小时运行一次)读取 history.jsonl 里未处理的摘要,提炼出结构化的长期事实写入 MEMORY.md。同样是全局唯一一个文件,所有渠道共享。
因此整个 system prompt 的组成结构是:
1 | [长期记忆 MEMORY.md] ← 全局共享 |
unified_session:主动打破短期隔离
如果你希望所有渠道共用同一段短期对话历史,可以开启 unified_session。
在 config.json 里配置:
1 | { |
实现原理:只改了一个 key
代码上只改了一处——_effective_session_key() 的返回值:
1 | # nanobot/agent/loop.py |
这个 effective_key 在 _dispatch() 里被用于三个地方:
1 | # nanobot/agent/loop.py - _dispatch() |
① 让 _process_message() 用 unified:default 去加载 session 文件
② 让所有渠道共用同一把串行锁
③ 让 mid-turn 注入队列也按 unified:default 路由
三个地方统一之后,所有渠道的消息完全串行处理,共用 sessions/unified_default.jsonl。
并发安全:共用锁保证全局串行
开启 unified_session 后,飞书和 CLI 同时发消息,会争抢同一把 asyncio.Lock:
1 | async with lock, gate: # lock 的 key 是 "unified:default" |
只有拿到锁的那个渠道才能进入处理,另一个在锁外等待。所以写 session 文件不存在并发问题——lock 保证了任何时刻只有一个消息在处理,也只有一个写者在操作 JSONL 文件。
回包路由:session_key 只影响读写,不影响寻址
这里有个容易误解的地方:session_key_override 覆盖的只是 session 文件的 key,而回包寻址用的是 msg.channel 和 msg.chat_id,这两个字段是从原始 InboundMessage 里直接取的,全程不变。
看 _dispatch() 里回包的构造:
1 | # 流式 delta 回包 |
session_key 是”去哪个文件里找历史记录”,channel+chat_id 是”回复发给谁”。两套信息独立,unified_session 只修改了前者,后者不受影响。
所以飞书用户发消息,bot 读的是 unified_default.jsonl 里的历史,但回包发到的仍是飞书的那个 chat_id,不会发错地方。
具体表现举例
假设开启 unified_session=true,飞书和 CLI 同时在使用:
- 飞书里你说”帮我分析一下 list.py 的性能问题”,bot 给了一段分析,这轮对话写入
sessions/unified_default.jsonl - 你切到 CLI,直接问”刚才那个分析,你觉得最值得优化的是哪一步?”
_process_message()用unified:default加载 session,能看到飞书那轮对话,直接接上回答
关闭后(默认):
- 飞书的对话写入
sessions/feishu_oc_abc123.jsonl - CLI 的对话写入
sessions/cli_direct.jsonl - 在 CLI 里问同样的问题,bot 完全不知道飞书里发生过什么,会回答”不知道你指的是哪个分析”
什么时候适合开? 你是 bot 的唯一用户,在多个客户端上轮换使用,希望 bot 记住完整的跨渠道对话上下文。多人共用同一个 bot 实例时不要开,所有人的对话历史会混在一起。
下面这张图对比了两种模式下 session 的差异:
阅读这张图的方法:上半部分是默认隔离模式,下半部分是 unified_session=true 的共享模式。 对比左右两边可以看到:隔离模式下每个 channel:chat_id 对应独立的 JSONL 文件;共享模式下所有用户都指向同一个 unified_default.jsonl。
并发控制
这块笔记3已经详细讲过 pending queue 的 mid-turn 注入机制,这里只梳理三件套设计,不重复细节:
1 | # nanobot/agent/loop.py |
三件套的组合效果:同一 session 串行处理,不同 session 最多 3 路并发。NANOBOT_MAX_CONCURRENT_REQUESTS=0 可以去掉全局上限。
飞书渠道实现要点
飞书渠道的源码在 nanobot/channels/feishu.py,1700 多行。这里只讲飞书特有的设计细节,完整流程见全局时间线的 Phase 2/3/7。
线程模型:WebSocket 线程 + asyncio 主循环的桥接
飞书的 lark_oapi SDK 内部用的是同步阻塞的 WebSocket 客户端,不能直接在 asyncio 循环里跑。FeishuChannel 把它放到一个独立线程里:
1 | # nanobot/channels/feishu.py (简化) |
WebSocket 线程收到消息后,通过 run_coroutine_threadsafe() 跳回主 asyncio 循环:
1 | def _on_message_sync(self, data: Any) -> None: |
这是 Python asyncio 的标准跨线程调度接口,保证 _on_message() 能在主事件循环里安全地 await 各种异步操作。
去重与群聊策略
_on_message() 里的去重逻辑用 OrderedDict 缓存已处理的 message_id:
1 | if message_id in self._processed_message_ids: |
用 OrderedDict 而不是普通 dict,是因为需要按插入顺序淘汰,popitem(last=False) 直接拿到最先插入的那条。
群聊策略 group_policy 有两个值:"open" 处理所有群消息,"mention" 只处理 @bot 的消息。_is_bot_mentioned() 会检查消息的 mentions 列表或是否有 @_all。
反应表情反馈
收到消息时会加一个 react_emoji(默认 👍),流式输出结束时会删掉它,然后加 done_emoji(如果配了的话)。这给用户一个视觉反馈:从”正在处理”变成”已完成”。
智能格式选择
飞书的消息格式有三种:text、post、interactive。_detect_msg_format() 会根据内容自动选择:
- 有代码块、表格、标题 → interactive(能渲染 Markdown)
- 内容很长 → interactive(阅读体验更好)
- 有链接 → post(支持
<a>标签) - 短纯文本 → text
- 其他 → post
这就是为什么 bot 回复纯文本打招呼时发普通消息,但一旦输出代码或表格就变成卡片形式。
流式输出的节流机制
CardKit Streaming API 要求客户端维护一个严格递增的 sequence 序号。nanobot 在实现时加了一层节流:最快 0.5s 更新一次卡片,而不是每个 token 都发请求。
1 | # 节流逻辑 |
这是在用户体验(更新频率)和 API 限速之间的权衡。sequence 在飞书侧用于防止乱序更新,节流是在客户端侧减少请求频率。
小结
一条消息从飞书到回复要经过七个阶段:gateway 启动初始化 → WebSocket 收包 → 消息处理与转发 → AgentLoop 处理 → 输出发到 outbound → ChannelManager 消费 → 回发飞书。每个阶段都有明确的职责边界,通过 MessageBus 解耦,各模块只依赖队列接口。
BaseChannel 是接口契约,
start()/stop()/send()三个抽象方法定义了每个渠道必须实现的能力,_handle_message()则是渠道和消息总线之间唯一的接入点。短期 session 默认严格隔离,以
channel:chat_id为 key,每个渠道+用户有独立的 JSONL 文件,互不干扰。中期摘要(history.jsonl)和长期记忆(MEMORY.md)则是全局共享的,这是 nanobot 单用户个人助手定位的有意取舍——让助手在不同渠道之间保持对”你最近在做什么”的感知。开启unified_session=true会进一步把短期 session 也打通,适合单用户多端使用的场景。流式输出的节流机制(0.5s 最小间隔)是在用户体验和 API 限速之间的权衡,并不是每个 token 都发一次请求。