上篇回顾

前五篇下来,nanobot 的核心引擎已经完整地过了一遍:记忆系统、provider 层、AgentLoop 主循环、工具系统、多 agent 协作。但有一个角度一直没讲清楚——外面的消息是怎么进来的

笔记3讲 AgentLoop 时,入口是 process_direct(),直接传字符串就能用,完全没有”渠道”的概念。但真正部署 nanobot 的时候,用户可能是在飞书里发消息、在微信里发消息,甚至同时在好几个平台上跟它聊天。nanobot 是怎么把这些渠道统一接进来的?不同渠道的对话历史怎么隔离?如果我配了 unified_session,飞书和微信又怎么共享同一段记忆?

这篇先从怎么用讲起,建立直观感受,再深入源码,把背后的机制搞清楚。


先看怎么用:接入飞书渠道

从一个具体例子入手。飞书是 nanobot 支持最完整的渠道之一,而且它有一个很大的优势:不需要公网 IP。大多数 bot 平台要求你提供一个公网可访问的 Webhook 地址,本地开发要配 ngrok 之类的内网穿透。飞书的 WebSocket 长连接模式则是 bot 主动连飞书服务器,无需暴露任何端口。

在飞书开发者平台创建应用

  1. 打开飞书开放平台,进入开发者后台

  2. 创建企业自建应用(”企业自建应用”tab → 创建应用)

  3. 进入应用,导航到 **”功能 → 机器人”**,开启机器人功能

  4. 进入 **”权限管理”**,添加以下权限:

    • im:message:send_as_bot(发送消息)
    • im:message(接收消息)
    • im:message.group_at_msg(接收群 @消息,如果需要群聊)
  5. 进入 **”事件订阅”**,订阅方式选 **”使用长连接接收事件”**,添加事件 im.message.receive_v1

  6. “凭证与基础信息” 里找到 App IDApp Secret,记下来

  7. 发布应用版本(”版本管理” → 创建版本 → 申请发布)

安装依赖

1
pip install lark-oapi

配置 config.json

在 nanobot 的 config.json 里加入飞书渠道配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
{
"channels": {
"feishu": {
"enabled": true,
"app_id": "cli_xxxxxxxxxxxxxxxx",
"app_secret": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
"allow_from": ["*"],
"group_policy": "mention",
"streaming": true,
"react_emoji": "THUMBSUP",
"done_emoji": "OK",
"domain": "feishu"
}
}
}

各字段说明:

字段 说明
enabled 必须设为 true 才会启动该渠道
app_id 飞书应用的 App ID
app_secret 飞书应用的 App Secret
allow_from 白名单,"*" 表示允许所有人,也可以填具体的用户 open_id 列表(以 ou_ 开头)
group_policy 群聊策略:"open" 处理所有群消息,"mention" 只处理 @bot 的消息
streaming 开启流式输出(打字机效果),默认 true
react_emoji 收到消息时添加的表情,默认 "THUMBSUP"(👍)
done_emoji 处理完成后添加的表情,可选,比如 "OK"
domain "feishu""lark"(海外版用 lark)

allow_from 注意:设置为空列表 [] 会拒绝所有人,这是一种”默认拒绝”的安全设计,千万不要忘记设置。

启动并测试

渠道模式需要用 nanobot gateway 命令启动

1
nanobot gateway

启动后会看到这样的日志输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
🐈 Starting nanobot gateway version 0.1.5 on port 18790...
DEBUG nanobot.channels.registry Skipping built-in channel 'matrix': Matrix dependencies not installed.
INFO nanobot.channels.manager Feishu channel enabled
✓ Channels enabled: feishu
✓ Heartbeat: every 1800s
✓ Dream: every 2h
INFO nanobot.cron.service Cron service started with 1 jobs
INFO nanobot.agent.loop Agent loop started
INFO nanobot.channels.manager Starting feishu channel...
INFO nanobot.channels.manager Outbound dispatcher started
✓ Health endpoint: http://127.0.0.1:18790/health
INFO nanobot.channels.feishu Feishu bot open_id: ou_xxxxxxxxxxxxxxxx
INFO nanobot.channels.feishu Feishu bot started with WebSocket long connection
INFO nanobot.channels.feishu No public IP required - using WebSocket to receive events
[Lark] connected to wss://msg-frontier.feishu.cn/ws/v2?... [conn_id=xxx]

有几个地方值得注意:第一行 Skipping built-in channel 'matrix' 是正常的——matrix 渠道需要额外安装依赖,没装就跳过,不影响其他渠道。最后 [Lark] connected to wss://... 说明 WebSocket 已经和飞书服务器建立了持久连接,bot 随时可以收消息了。

在飞书里找到你的 bot,发一条消息,稍等一两秒,你会看到:

  1. bot 先给你的消息加一个 👍 表情(react_emoji),表示”收到,正在处理”
  2. 如果开了 streaming,回复会以打字机效果逐渐出现在一张卡片里
  3. 处理完成后,如果配了 done_emoji,👍 会变成 ✅ 之类的完成标志
  4. 如果回复里有代码块、表格或 Markdown 格式,会自动渲染为飞书的交互式卡片;纯文本则发普通文本消息

一条消息的完整旅程

在深入各个模块之前,先跟着一条消息走完全流程,建立全局视角:

全局时间线

阅读这张图的方法:从上往下,Phase 1 → Phase 7 是时间顺序。 虚线箭头表示跨阶段的数据流动(入队、出队)。

整个流程分七个阶段,每个阶段在后文都有对应章节展开:

阶段 概括 详见章节
Phase 1 nanobot gateway 启动,创建 MessageBus、AgentLoop、ChannelManager,扫描并启动 enabled 渠道 整体架构设计
Phase 2 飞书通过 WebSocket 推送消息事件,SDK 在独立线程收包 飞书渠道实现详解
Phase 3 _on_message() 去重、过滤、解析,publish_inbound() 发到消息总线 BaseChannel 接口契约
Phase 4 AgentLoop 从 inbound 队列消费,加载 session,运行 agent loop (笔记3已详述)
Phase 5 流式 delta 和最终回复都进入 outbound 队列 整体架构设计
Phase 6 ChannelManager 消费 outbound,合并 delta,重试,路由到渠道 ChannelManager 编排层
Phase 7 FeishuChannel.send()/send_delta() 回发飞书,CardKit 流式渲染 飞书渠道实现详解

下面逐层拆解各模块的设计与实现。


整体架构设计

nanobot 的多渠道架构分三层:

整体架构时序

阅读这张图的方法:从左往右,跟着数字标号走。

  • ① 用户发消息,渠道层(FeishuChannel/CLIChannel)收到后调用 publish_inbound() 放进 inbound 队列
  • ② AgentLoop 从 inbound 队列取出消息,运行 agent loop(加载 session、调用 LLM、执行工具)
  • ③ AgentLoop 生成回复,调用 publish_outbound() 放进 outbound 队列
  • ④ ChannelManager 从 outbound 队列取出回复,按 msg.channel 字段路由到对应的渠道实例
  • ⑤ 渠道实例调用 send() 把回复发回给用户

整个流程的关键:渠道层和 AgentLoop 之间没有直接依赖,唯一的通信管道是 MessageBus 的两条队列。渠道层不需要知道 AgentLoop 的存在,AgentLoop 也不需要知道消息从哪个渠道来——它只是从 inbound 队列取消息,处理完往 outbound 队列丢回复。

MessageBus:唯一的通信管道

MessageBus 的实现只有两个 asyncio.Queue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# nanobot/bus/queue.py
class MessageBus:
def __init__(self):
self._inbound: asyncio.Queue[InboundMessage] = asyncio.Queue()
self._outbound: asyncio.Queue[OutboundMessage] = asyncio.Queue()

async def publish_inbound(self, msg: InboundMessage) -> None:
await self._inbound.put(msg)

async def consume_inbound(self) -> InboundMessage:
return await self._inbound.get()

async def publish_outbound(self, msg: OutboundMessage) -> None:
await self._outbound.put(msg)

async def consume_outbound(self) -> OutboundMessage:
return await self._outbound.get()

所有消息都是 JSON 序列化友好的 dataclass,没有回调,没有共享状态,没有跨线程引用。这个设计让整个系统高度可测试——你可以往 queue 里直接丢 InboundMessage,然后检查 outbound 队列里的回复。

Phase 1 补充:Gateway 启动顺序

nanobot gateway 命令执行后,commands.pygateway() 函数按以下顺序构造依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# nanobot/cli/commands.py (简化)
bus = MessageBus()
provider = _make_provider(config)
session_manager = SessionManager(config.workspace_path)

agent = AgentLoop(
bus=bus,
provider=provider,
sessions=session_manager,
channels_config=config.channels,
...
)

channels = ChannelManager(config, bus)

asyncio.run(asyncio.gather(
agent.run(), # 启动 AgentLoop 消费循环
channels.start_all(), # 启动所有 enabled 渠道
_health_server(...), # 健康检查 HTTP 服务
))

关键点:AgentLoop 先于渠道启动。这保证了当第一条消息进入 inbound 队列时,消费者已经就绪。如果顺序反过来,渠道的消息可能在没有消费者的情况下被丢弃。

Phase 5 补充:输出路径

AgentLoop 的输出有两条路径进入 outbound:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 路径1:流式 delta(on_stream 回调)
async def on_stream(delta: str) -> None:
await bus.publish_outbound(OutboundMessage(
channel=msg.channel,
chat_id=msg.chat_id,
content=delta,
metadata={"_stream_delta": True, "_stream_id": stream_id},
))

# 路径2:最终回复(_process_message 返回值)
response = OutboundMessage(
channel=msg.channel,
chat_id=msg.chat_id,
content=final_content,
)
await bus.publish_outbound(response)

两条路径的消息格式完全一致,只是 metadata 里的 _stream_delta 标记区分了流式 delta 和最终回复。ChannelManager 会根据这个标记决定是立即发送还是合并等待。


渠道层:BaseChannel 接口契约

所有渠道实现类都继承自 BaseChannel,接口定义在 nanobot/channels/base.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# nanobot/channels/base.py
class BaseChannel(ABC):
name: str = "base"
display_name: str = "Base"

def __init__(self, config: Any, bus: MessageBus):
self.config = config
self.bus = bus
self._running = False

@abstractmethod
async def start(self) -> None:
"""连接平台,开始监听消息(长运行任务)"""
pass

@abstractmethod
async def stop(self) -> None:
"""断开连接,清理资源"""
pass

@abstractmethod
async def send(self, msg: OutboundMessage) -> None:
"""发送一条消息"""
pass

三个抽象方法,规定了每个渠道必须实现的最小接口。start() 是一个长运行的 async 任务,通常内部是一个事件循环(轮询或 WebSocket 推送)。

权限白名单

接收消息之前要过一道权限检查:

1
2
3
4
5
6
7
8
9
10
# nanobot/channels/base.py
def is_allowed(self, sender_id: str) -> bool:
"""空列表→拒绝所有;"*"→允许所有。"""
allow_list = ... # 从 config.allow_from 读取
if not allow_list:
logger.warning("{}: allow_from is empty — all access denied", self.name)
return False
if "*" in allow_list:
return True
return str(sender_id) in allow_list

allow_from 为空列表时会拒绝所有人,而不是放行所有人——这是一种”默认拒绝”的安全策略,配置不到位的渠道不会意外暴露出去。

_handle_message():打包进总线

所有渠道接收到消息后都要调这个方法,它的职责是做完权限检查、把原始数据打包成 InboundMessage,然后推进总线:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# nanobot/channels/base.py
async def _handle_message(
self,
sender_id: str,
chat_id: str,
content: str,
media: list[str] | None = None,
metadata: dict[str, Any] | None = None,
session_key: str | None = None,
) -> None:
if not self.is_allowed(sender_id):
return

meta = metadata or {}
if self.supports_streaming:
meta = {**meta, "_wants_stream": True} # 通知 AgentLoop 这个渠道支持流式

msg = InboundMessage(
channel=self.name, # "feishu" / "weixin" / "cli" ...
sender_id=str(sender_id),
chat_id=str(chat_id),
content=content,
media=media or [],
metadata=meta,
session_key_override=session_key,
)

await self.bus.publish_inbound(msg)

_wants_stream 是渠道和 AgentLoop 之间的约定:只要 InboundMessage 带了这个 flag,AgentLoop 就会开启流式回调,把 LLM 生成的文本一段一段地发到 outbound 队列,渠道侧就能实时渲染给用户。


ChannelManager:渠道编排

渠道是一个个孤立的实现,ChannelManager 负责把它们统一管理起来,代码在 nanobot/channels/manager.py

渠道发现机制

nanobot 支持两种方式注册渠道,发现逻辑在 registry.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# nanobot/channels/registry.py
def discover_channel_names() -> list[str]:
"""扫描 nanobot.channels 包下所有模块(不含 base/manager/registry)"""
import nanobot.channels as pkg
return [
name
for _, name, ispkg in pkgutil.iter_modules(pkg.__path__)
if name not in _INTERNAL and not ispkg
]

def discover_plugins() -> dict[str, type[BaseChannel]]:
"""通过 entry_points 发现外部插件渠道"""
for ep in entry_points(group="nanobot.channels"):
cls = ep.load()
plugins[ep.name] = cls
return plugins

pkgutil.iter_modules 扫描包目录,找到 feishu.pyweixin.pydingtalk.py 等所有内置模块,然后反射加载每个模块里的 BaseChannel 子类。外部插件通过 entry_points 注册,第三方开发者不改 nanobot 源码也能扩展新渠道,内置渠道优先级更高,同名时不会被外部插件覆盖。

加载完渠道类后,按 config 决定启动哪些:

1
2
3
4
5
6
7
8
9
10
11
# nanobot/channels/manager.py
def _init_channels(self) -> None:
for name, cls in discover_all().items():
section = getattr(self.config.channels, name, None)
if section is None:
continue # config 里没有这个渠道的配置,跳过
enabled = section.get("enabled", False)
if not enabled:
continue # enabled=false,跳过
channel = cls(section, self.bus)
self.channels[name] = channel

逻辑很干净:只有在 config 里显式配了 enabled: true 的渠道才会真正实例化和启动。

outbound 消息分发

当 AgentLoop 把回复放进 outbound 队列,ChannelManager 的分发协程会消费它:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# nanobot/channels/manager.py
async def _dispatch_outbound(self) -> None:
while True:
msg = await asyncio.wait_for(self.bus.consume_outbound(), timeout=1.0)

# 进度消息过滤(根据配置决定是否发送工具调用提示)
if msg.metadata.get("_progress"):
if msg.metadata.get("_tool_hint") and not self.config.channels.send_tool_hints:
continue
...

# 流式消息合并(减少 API 调用次数)
if msg.metadata.get("_stream_delta") and not msg.metadata.get("_stream_end"):
msg, extra_pending = self._coalesce_stream_deltas(msg)
pending.extend(extra_pending)

channel = self.channels.get(msg.channel)
if channel:
await self._send_with_retry(channel, msg)

流式消息合并_coalesce_stream_deltas):LLM 生成文字时可能非常快,queue 里会积压很多只有几个字的小 delta。如果每个 delta 都发一次 API 请求,接口压力很大。ChannelManager 会把同一个 (channel, chat_id) 的连续 delta 合并成一个大包再发出去,平衡延迟和频率。

指数退避重试:发送失败时会重试,延迟是 1s → 2s → 4s。CancelledError 会直接 re-raise,保证优雅关闭时能正常退出。


记忆系统与多渠道的关系

多渠道接入之后,记忆系统是如何工作的?这一块有些细节值得仔细看,不然容易产生误解。

nanobot 的记忆分三层,先把它们和磁盘文件的对应关系建立起来:

1
2
3
4
5
6
7
8
9
10
workspace/
├── memory/
│ ├── MEMORY.md ← 长期记忆(所有渠道共享)
│ └── history.jsonl ← 中期摘要(所有渠道共享)
├── sessions/
│ ├── cli_direct.jsonl ← 短期记忆(CLI 专属)
│ ├── feishu_oc_abc123.jsonl ← 短期记忆(飞书用户A专属)
│ └── unified_default.jsonl ← 短期记忆(unified_session=true 时所有渠道共用)
├── SOUL.md
└── USER.md

短期记忆:严格按 channel:chat_id 隔离

每条 InboundMessage 都有一个 session_key 属性:

1
2
3
4
# nanobot/bus/events.py
@property
def session_key(self) -> str:
return self.session_key_override or f"{self.channel}:{self.chat_id}"

飞书用户的消息 key 是 feishu:oc_abc123,CLI 消息的 key 是 cli:direct_process_message() 拿到这个 key,让 SessionManager 按 key 取对应的 session 文件:

1
2
3
4
5
# nanobot/session/manager.py
def _get_session_path(self, key: str) -> Path:
safe_key = safe_filename(key.replace(":", "_"))
return self.sessions_dir / f"{safe_key}.jsonl"
# "feishu:oc_abc123" → sessions/feishu_oc_abc123.jsonl

然后进 build_messages() 的 history,只来自这一个文件:

1
2
3
4
5
# nanobot/agent/loop.py
key = session_key or msg.session_key
session = self.sessions.get_or_create(key) # 只加载这个 key 对应的文件
history = session.get_history(max_messages=0)
# session.get_history() 只返回 session.messages[last_consolidated:] 里的消息

所以短期对话记录默认就是严格隔离的,CLI 和飞书永远看不到对方的原始消息轮次,不需要任何额外配置。

中期记忆:全局共享,会混入其他渠道的摘要

Consolidator 被触发时(某个 session 的 token 快撑爆上下文窗口),会把那个 session 里的旧消息压缩成一段摘要,追加到全局唯一的 memory/history.jsonl

这个文件是跨渠道共享的,飞书和 CLI 各自 consolidate 之后都往同一个文件里追加。

每次对话时,ContextBuilder.build_system_prompt() 会把 dream cursor 之后还没被 Dream 消化的条目注入 system prompt:

1
2
3
4
5
6
7
# nanobot/agent/context.py
entries = self.memory.read_unprocessed_history(since_cursor=self.memory.get_last_dream_cursor())
if entries:
capped = entries[-self._MAX_RECENT_HISTORY:] # 最多 50 条
parts.append("# Recent History\n\n" + "\n".join(
f"- [{e['timestamp']}] {e['content']}" for e in capped
))

这意味着:如果你在飞书里聊了很多,Consolidator 把飞书的对话摘要写进了 history.jsonl,而 Dream 还没运行(默认 2 小时一次),那么你在 CLI 发消息时,system prompt 的 # Recent History 里就会包含飞书对话的摘要。

这是有意的设计取舍——nanobot 的定位是单用户个人助手,这些摘要代表的是”你最近在做什么”,从设计哲学上应该跨渠道可见。你在飞书里讨论了某个项目,切换到 CLI 继续工作,助手能感知到这个背景。

但如果你同时开了飞书和微信给多个不同用户用,中期摘要就真的会混。这个场景不在 nanobot 的主要设计目标内。

长期记忆:全局共享,Dream 提炼后写入 MEMORY.md

Dream 系统(默认每 2 小时运行一次)读取 history.jsonl 里未处理的摘要,提炼出结构化的长期事实写入 MEMORY.md。同样是全局唯一一个文件,所有渠道共享。

因此整个 system prompt 的组成结构是:

1
2
3
4
[长期记忆 MEMORY.md]                ← 全局共享
[未被 Dream 消化的中期摘要] ← 全局共享,可能混有其他渠道
[当前 session 的短期对话 history] ← 严格隔离,只属于当前 channel:chat_id
[当前用户消息]

unified_session:主动打破短期隔离

如果你希望所有渠道共用同一段短期对话历史,可以开启 unified_session

config.json 里配置:

1
2
3
4
5
6
7
{
"agents": {
"defaults": {
"unified_session": true
}
}
}

实现原理:只改了一个 key

代码上只改了一处——_effective_session_key() 的返回值:

1
2
3
4
5
6
7
# nanobot/agent/loop.py
UNIFIED_SESSION_KEY = "unified:default"

def _effective_session_key(self, msg: InboundMessage) -> str:
if self._unified_session and not msg.session_key_override:
return UNIFIED_SESSION_KEY # 所有渠道都用同一个 key
return msg.session_key # 默认:channel:chat_id

这个 effective_key_dispatch() 里被用于三个地方:

1
2
3
4
5
6
7
8
# nanobot/agent/loop.py - _dispatch()
session_key = self._effective_session_key(msg)
if session_key != msg.session_key:
msg = dataclasses.replace(msg, session_key_override=session_key) # ①覆盖 msg 的 session key

lock = self._session_locks.setdefault(session_key, asyncio.Lock()) # ②用 effective_key 拿锁
pending = asyncio.Queue(maxsize=20)
self._pending_queues[session_key] = pending # ③注册 pending 队列

① 让 _process_message()unified:default 去加载 session 文件
② 让所有渠道共用同一把串行锁
③ 让 mid-turn 注入队列也按 unified:default 路由

三个地方统一之后,所有渠道的消息完全串行处理,共用 sessions/unified_default.jsonl

并发安全:共用锁保证全局串行

开启 unified_session 后,飞书和 CLI 同时发消息,会争抢同一把 asyncio.Lock

1
2
async with lock, gate:   # lock 的 key 是 "unified:default"
response = await self._process_message(...)

只有拿到锁的那个渠道才能进入处理,另一个在锁外等待。所以写 session 文件不存在并发问题——lock 保证了任何时刻只有一个消息在处理,也只有一个写者在操作 JSONL 文件。

回包路由:session_key 只影响读写,不影响寻址

这里有个容易误解的地方:session_key_override 覆盖的只是 session 文件的 key,而回包寻址用的是 msg.channelmsg.chat_id,这两个字段是从原始 InboundMessage 里直接取的,全程不变。

_dispatch() 里回包的构造:

1
2
3
4
5
6
7
8
9
10
11
# 流式 delta 回包
await self.bus.publish_outbound(OutboundMessage(
channel=msg.channel, # 原始 InboundMessage 的 channel,"feishu" 或 "cli"
chat_id=msg.chat_id, # 原始 InboundMessage 的 chat_id
content=delta,
))

# 最终回包
if response is not None:
await self.bus.publish_outbound(response)
# _process_message() 里构造的 response 也是用 msg.channel / msg.chat_id

session_key 是”去哪个文件里找历史记录”,channel+chat_id 是”回复发给谁”。两套信息独立,unified_session 只修改了前者,后者不受影响。

所以飞书用户发消息,bot 读的是 unified_default.jsonl 里的历史,但回包发到的仍是飞书的那个 chat_id,不会发错地方。

具体表现举例

假设开启 unified_session=true,飞书和 CLI 同时在使用:

  • 飞书里你说”帮我分析一下 list.py 的性能问题”,bot 给了一段分析,这轮对话写入 sessions/unified_default.jsonl
  • 你切到 CLI,直接问”刚才那个分析,你觉得最值得优化的是哪一步?”
  • _process_message()unified:default 加载 session,能看到飞书那轮对话,直接接上回答

关闭后(默认):

  • 飞书的对话写入 sessions/feishu_oc_abc123.jsonl
  • CLI 的对话写入 sessions/cli_direct.jsonl
  • 在 CLI 里问同样的问题,bot 完全不知道飞书里发生过什么,会回答”不知道你指的是哪个分析”

什么时候适合开? 你是 bot 的唯一用户,在多个客户端上轮换使用,希望 bot 记住完整的跨渠道对话上下文。多人共用同一个 bot 实例时不要开,所有人的对话历史会混在一起。

下面这张图对比了两种模式下 session 的差异:

Session隔离与共享

阅读这张图的方法:上半部分是默认隔离模式,下半部分是 unified_session=true 的共享模式。 对比左右两边可以看到:隔离模式下每个 channel:chat_id 对应独立的 JSONL 文件;共享模式下所有用户都指向同一个 unified_default.jsonl


并发控制

这块笔记3已经详细讲过 pending queue 的 mid-turn 注入机制,这里只梳理三件套设计,不重复细节:

1
2
3
4
5
6
7
8
9
10
11
12
13
# nanobot/agent/loop.py
# 1. per-session 串行锁:同一个 session 里的消息不会并发处理
self._session_locks: dict[str, asyncio.Lock] = {}

# 2. 全局并发门:控制同时处理多少个不同 session 的消息
# 默认值3,可以通过环境变量调整
_max = int(os.environ.get("NANOBOT_MAX_CONCURRENT_REQUESTS", "3"))
self._concurrency_gate: asyncio.Semaphore | None = (
asyncio.Semaphore(_max) if _max > 0 else None
)

# 3. mid-turn 注入队列:session 处理中时,新消息入队而不是创建新任务
self._pending_queues: dict[str, asyncio.Queue] = {}

三件套的组合效果:同一 session 串行处理,不同 session 最多 3 路并发NANOBOT_MAX_CONCURRENT_REQUESTS=0 可以去掉全局上限。


飞书渠道实现要点

飞书渠道的源码在 nanobot/channels/feishu.py,1700 多行。这里只讲飞书特有的设计细节,完整流程见全局时间线的 Phase 2/3/7。

飞书消息处理时序

线程模型:WebSocket 线程 + asyncio 主循环的桥接

飞书的 lark_oapi SDK 内部用的是同步阻塞的 WebSocket 客户端,不能直接在 asyncio 循环里跑。FeishuChannel 把它放到一个独立线程里:

1
2
3
4
5
6
7
8
9
10
11
12
13
# nanobot/channels/feishu.py (简化)
async def start(self) -> None:
self._loop = asyncio.get_running_loop() # 保存主事件循环的引用

def run_ws():
ws_loop = asyncio.new_event_loop()
asyncio.set_event_loop(ws_loop)
_lark_ws_client.loop = ws_loop # patch SDK 内部的 loop 引用
while self._running:
self._ws_client.start() # 阻塞直到断连
time.sleep(5) # 断连后 5s 重连

threading.Thread(target=run_ws, daemon=True).start()

WebSocket 线程收到消息后,通过 run_coroutine_threadsafe() 跳回主 asyncio 循环:

1
2
3
def _on_message_sync(self, data: Any) -> None:
if self._loop and self._loop.is_running():
asyncio.run_coroutine_threadsafe(self._on_message(data), self._loop)

这是 Python asyncio 的标准跨线程调度接口,保证 _on_message() 能在主事件循环里安全地 await 各种异步操作。

去重与群聊策略

_on_message() 里的去重逻辑用 OrderedDict 缓存已处理的 message_id:

1
2
3
4
5
if message_id in self._processed_message_ids:
return
self._processed_message_ids[message_id] = None
while len(self._processed_message_ids) > 1000:
self._processed_message_ids.popitem(last=False) # 淘汰最旧的

用 OrderedDict 而不是普通 dict,是因为需要按插入顺序淘汰,popitem(last=False) 直接拿到最先插入的那条。

群聊策略 group_policy 有两个值:"open" 处理所有群消息,"mention" 只处理 @bot 的消息。_is_bot_mentioned() 会检查消息的 mentions 列表或是否有 @_all

反应表情反馈

收到消息时会加一个 react_emoji(默认 👍),流式输出结束时会删掉它,然后加 done_emoji(如果配了的话)。这给用户一个视觉反馈:从”正在处理”变成”已完成”。

智能格式选择

飞书的消息格式有三种:text、post、interactive。_detect_msg_format() 会根据内容自动选择:

  • 有代码块、表格、标题 → interactive(能渲染 Markdown)
  • 内容很长 → interactive(阅读体验更好)
  • 有链接 → post(支持 <a> 标签)
  • 短纯文本 → text
  • 其他 → post

这就是为什么 bot 回复纯文本打招呼时发普通消息,但一旦输出代码或表格就变成卡片形式。

流式输出的节流机制

CardKit Streaming API 要求客户端维护一个严格递增的 sequence 序号。nanobot 在实现时加了一层节流:最快 0.5s 更新一次卡片,而不是每个 token 都发请求。

1
2
3
4
5
# 节流逻辑
if (now - buf.last_edit) >= self._STREAM_EDIT_INTERVAL: # 0.5s
buf.sequence += 1
self._stream_update_text_sync(buf.card_id, buf.text, buf.sequence)
buf.last_edit = now

这是在用户体验(更新频率)和 API 限速之间的权衡。sequence 在飞书侧用于防止乱序更新,节流是在客户端侧减少请求频率。


小结

  • 一条消息从飞书到回复要经过七个阶段:gateway 启动初始化 → WebSocket 收包 → 消息处理与转发 → AgentLoop 处理 → 输出发到 outbound → ChannelManager 消费 → 回发飞书。每个阶段都有明确的职责边界,通过 MessageBus 解耦,各模块只依赖队列接口。

  • BaseChannel 是接口契约start()/stop()/send() 三个抽象方法定义了每个渠道必须实现的能力,_handle_message() 则是渠道和消息总线之间唯一的接入点。

  • 短期 session 默认严格隔离,以 channel:chat_id 为 key,每个渠道+用户有独立的 JSONL 文件,互不干扰。中期摘要(history.jsonl)和长期记忆(MEMORY.md)则是全局共享的,这是 nanobot 单用户个人助手定位的有意取舍——让助手在不同渠道之间保持对”你最近在做什么”的感知。开启 unified_session=true 会进一步把短期 session 也打通,适合单用户多端使用的场景。

  • 流式输出的节流机制(0.5s 最小间隔)是在用户体验和 API 限速之间的权衡,并不是每个 token 都发一次请求。