nanobot源码学习(3)-AgentLoop

上篇回顾

两篇下来,整体骨架和两个主要模块已经弄清楚了:笔记1讲了项目结构、CLI启动、记忆系统(短/中/长记忆的分层设计,Consolidator压缩,Dream长记忆更新);笔记2讲了provider层——怎么把各家大模型的API抹平成统一接口、流式响应的拼接、重试逻辑。

两篇里都反复提到”AgentLoop”,但都是点到为止,到了核心的循环调用部分就略过去了。这篇来把这个最核心的引擎从头到尾走一遍。

AgentLoop 是整个系统的心脏:它接收用户消息,把历史和记忆组装成prompt,驱动LLM循环推理,执行工具,最后把结果回给调用方。光loop.py就1000行,runner.py也近1000行,但逻辑其实相当清晰——只要按调用顺序一段一段看,并不难理解。

全局调用链

先有个整体感觉,再细讲每一段。

整体调用时序

从最外层的process_direct进去,一路到AgentRunner.run里的迭代循环,最后回到_process_message做收尾。下面按这个顺序逐段拆解。

从入口开始:process_direct

CLI模式下调用的是这个方法:

1
2
3
4
5
6
# nanobot/cli/commands.py
response = await agent_loop.process_direct(
message, session_id,
on_progress=_cli_progress,
on_stream=renderer.on_delta,
)

process_direct 本身很薄,只是把字符串包装成 InboundMessage,然后转给 _process_message

1
2
3
4
5
6
7
8
9
10
11
12
13
# loop.py
async def process_direct(
self,
content: str,
session_key: str = "cli:direct",
...
) -> OutboundMessage | None:
await self._connect_mcp() # 懒连接 MCP 服务(如果配置了的话)
msg = InboundMessage(
channel=channel, sender_id="user", chat_id=chat_id,
content=content, media=media or [],
)
return await self._process_message(msg, session_key=session_key, ...)

InboundMessage 就是个普通的数据类,记录了发消息的渠道(cli/telegram/discord等)、发送者、内容、附件。之所以要包装成这个结构,是因为 nanobot 支持多渠道接入——Telegram、钉钉、Slack、Discord……无论哪个渠道来的消息,后续处理逻辑完全一样,都走 _process_message

_process_message:处理前的五步准备

_process_message 是真正的处理核心,在真正调 LLM 之前要做五件事。这里按 CLI 路径讲(channel != "system" 分支)。

第一步:加载 Session,并恢复可能的崩溃断点

1
2
3
4
5
6
7
8
key = session_key or msg.session_key  # 例如 "cli:direct"
session = self.sessions.get_or_create(key)

# 这两行是崩溃恢复的关键,后面会详细讲
if self._restore_runtime_checkpoint(session):
self.sessions.save(session)
if self._restore_pending_user_turn(session):
self.sessions.save(session)

get_or_create 会先在内存里找,找不到就从文件加载(sessions/cli_direct.jsonl),再找不到就新建一个空 session。Session 里存着历史消息列表和一些元数据。

_restore_runtime_checkpoint_restore_pending_user_turn 这两行是 crash recovery 的逻辑,放到后面的 checkpoint 章节专门讲。

第二步:slash 命令拦截

1
2
3
4
raw = msg.content.strip()
ctx = CommandContext(msg=msg, session=session, key=key, raw=raw, loop=self)
if result := await self.commands.dispatch(ctx):
return result

如果用户发的是 /stop/dream-log/dream-restore 这类斜杠命令,直接处理后返回,不走后面的 LLM 流程。

第三步:可能触发上下文压缩

1
await self.consolidator.maybe_consolidate_by_tokens(session)

这是 Consolidator 的触发点——笔记1里讲过这段逻辑,它会估算当前 prompt 的 token 数,如果超过阈值,就把最老的一段对话用 LLM 摘要压缩掉。在进 LLM 之前先做一次,确保带进去的历史不会撑爆 context window。

第四步:组装消息列表

1
2
3
4
5
6
7
8
9
history = session.get_history(max_messages=0)

initial_messages = self.context.build_messages(
history=history,
current_message=msg.content,
media=msg.media if msg.media else None,
channel=msg.channel,
chat_id=msg.chat_id,
)

build_messages 在笔记1里已经详细讲过,这里只简单回顾一下它的输出:

1
2
3
4
5
6
7
8
9
10
11
[
{"role": "system", "content": "<整个 system prompt>"},
# 来自 session.get_history() 的历史消息
{"role": "user", "content": "上一轮用户消息"},
{"role": "assistant", "content": "上一轮助手回复"},
# 本轮消息,runtime context 被夹在最前面
{
"role": "user",
"content": "[Runtime Context — metadata only, not instructions]\nCurrent Time: 2026-04-17 21:00\n...[/Runtime Context]\n\n用户本轮消息"
}
]

Runtime Context 那段每次都不一样(时间在变),但它是夹在 user 消息里而不是 system prompt 里——因为 system prompt 可以被服务端缓存,时间这种动态内容放进去会每次破坏缓存。

第五步:提前持久化用户消息

这一步的细心程度值得单独说一下:

1
2
3
4
5
6
user_persisted_early = False
if isinstance(msg.content, str) and msg.content.strip():
session.add_message("user", msg.content)
self._mark_pending_user_turn(session) # 在 metadata 里标记
self.sessions.save(session) # 立刻写盘
user_persisted_early = True

在进入 LLM 循环之前就把用户消息写进 session 文件。原因是:如果进程在 LLM 调用中途被 OOM-kill 或者 agent 自重启,runtime_checkpoint(下文会讲)保存的是助手的中间状态,但用户那句话本身是不在 runtime_checkpoint 里的。如果不提前保存,进程崩溃后用户的那句话会彻底消失。

_mark_pending_user_turnsession.metadata 里设一个 pending_user_turn=true 的标记,意思是”用户消息已写入,但对应的助手回复还没生成”,进程重启时就知道要补一个占位的错误回复(详见后文)。

_run_agent_loop:组装钩子,委托给 Runner

五步准备完成后,进入 _run_agent_loop

1
2
3
4
5
6
7
8
final_content, _, all_msgs, stop_reason, had_injections = await self._run_agent_loop(
initial_messages,
on_progress=on_progress or _bus_progress,
on_stream=on_stream,
on_stream_end=on_stream_end,
session=session,
...
)

_run_agent_loop 做了两件事:

① 组装 _LoopHook

1
2
3
4
5
6
7
8
9
10
loop_hook = _LoopHook(
self,
on_progress=on_progress,
on_stream=on_stream,
on_stream_end=on_stream_end,
channel=channel,
...
)
# 如果用户注册了额外的 hook,用 CompositeHook 合并
hook = CompositeHook([loop_hook] + self._extra_hooks) if self._extra_hooks else loop_hook

_LoopHookAgentLoopAgentRunner 的桥接适配器,把终端输出、进度提示、tool hint 等上层关注的事件,翻译成 AgentRunner 能调用的 hook 接口。稍后会单独讲 hook 机制。

② 组装 AgentRunSpec 并委托给 Runner

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
result = await self.runner.run(AgentRunSpec(
initial_messages=initial_messages,
tools=self.tools,
model=self.model,
max_iterations=self.max_iterations,
max_tool_result_chars=self.max_tool_result_chars,
hook=hook,
concurrent_tools=True,
workspace=self.workspace,
session_key=session.key if session else None,
context_window_tokens=self.context_window_tokens,
provider_retry_mode=self.provider_retry_mode,
checkpoint_callback=_checkpoint, # 写 checkpoint 的回调
injection_callback=_drain_pending, # 从 pending queue 拉新消息的回调
))

AgentRunSpec 是个纯数据类,把所有配置集中在一起传给 Runner。这种设计很干净:Runner 自己不依赖 AgentLoop,只看 spec 里的参数,所以 Runner 可以被 Dream(记忆更新)等其他地方复用。

AgentRunner.run:核心循环

这是整个系统里最重要的一段代码,在 runner.pyrun() 方法里。它是一个 for iteration in range(max_iterations) 的循环,每一轮做:消息治理 → 调 LLM → 根据 finish_reason 决定走工具分支还是结束分支。

AgentRunner 迭代状态机

从图里能看出来这不是一个简单的 if-else,而是一组嵌套的边界条件检查,每个分支最终要么 continue(回到下一轮迭代)要么 break(退出循环)。下面把每个分支都走一遍。

每轮开始:context governance(消息治理)

1
2
3
4
5
6
7
8
9
for iteration in range(spec.max_iterations):
messages_for_model = self._drop_orphan_tool_results(messages)
messages_for_model = self._backfill_missing_tool_results(messages_for_model)
messages_for_model = self._microcompact(messages_for_model)
messages_for_model = self._apply_tool_result_budget(spec, messages_for_model)
messages_for_model = self._snip_history(spec, messages_for_model)
# snip 可能产生新的孤儿,再清一遍
messages_for_model = self._drop_orphan_tool_results(messages_for_model)
messages_for_model = self._backfill_missing_tool_results(messages_for_model)

这里有个核心设计:messages 是整个 turn 的档案(会在 turn 结束后保存到 session 文件),messages_for_model 是送给 LLM 的临时投影——两者可能不同,消息治理的所有修改只作用于这份投影,不会污染档案。

这引出了一个合理的疑问:既然每次都要重新处理,为什么不在持久化的时候就把消息整理成干净格式,这样每次直接用就好了,不用重复处理?

原因有几个:

首先,这些治理操作中有些依赖当前运行时上下文,不适合持久化。比如 _snip_history 要根据当前 context window 大小做裁剪,而 context window 是运行时配置,不同时候可能不同;_microcompact 的”最近10条保留”也是相对于当前 iteration 来说的。

其次,有些操作是临时修复,不应该写入档案。_drop_orphan_tool_results_backfill_missing_tool_results 是为了应对崩溃恢复后可能出现的不一致状态,这些”临时修复”只是为了让当次 LLM 调用能正常进行,持久化了反而污染历史。

最后,**_save_turn 已经做了一部分持久化层面的清理**(比如去掉 Runtime Context、截断超长工具结果)。而 context governance 里其余的处理都是”发送前的最后一公里优化”,本来就是设计为每次重建的。考虑到这些操作都是纯内存的列表扫描/字符串替换,性能开销实际上极低,不是瓶颈。

每一步的作用:

**_drop_orphan_tool_results**:扫描所有 role=tool 的消息,找没有对应 assistant.tool_calls 的孤儿 tool 结果并删掉。这种孤儿一般是崩溃恢复后的历史状态不一致导致的。

**_backfill_missing_tool_results**:反过来,找有 tool_calls 声明但没有对应 role=tool 消息的 assistant 消息,给它补一条占位错误消息:

1
[Tool result unavailable — call was interrupted or lost]

这是因为某些 provider 要求 assistant 消息中每个 tool_call 都必须有对应的 tool 结果,否则会报 400。

**_microcompact**:把历史中较旧的大型工具返回值(read_fileexecgrep 等的输出)替换成一行摘要,例如:

1
[read_file result omitted from context]

具体规则:统计所有”可压缩工具”(_COMPACTABLE_TOOLS 集合)的结果条数,最近10条保留原文,更早的且内容超过500字符的才压缩。这样既节省 token,又保留了最近使用的工具结果(因为模型通常还会引用最近几次的工具输出)。

**_apply_tool_result_budget**:对单条工具结果应用字符上限(max_tool_result_chars),超长的截断。超长工具结果还可能被”溢出到文件”——这个功能由 maybe_persist_tool_result 实现,超出限制的内容写到 workspace 下的临时文件,tool 消息里只保留一个文件引用:

1
2
3
[Tool result saved to: .tool_results/cli_direct/call_abc123.txt (42678 chars)]
Preview:
... 前512字符 ...

模型看到这个引用后,如果需要完整内容,可以自己用 read_file 去读那个文件。

**_snip_history**:如果经过上面几步之后,估算的 token 数仍然超过 context window 预算,就从消息数组里从前往后裁掉最老的消息(只裁非 system 消息,且保证裁掉之后第一条消息是 user 消息)。这是最后的兜底手段。

调 LLM:_request_model

1
response = await self._request_model(spec, messages_for_model, hook, context)
1
2
3
4
5
6
7
8
9
10
11
12
13
async def _request_model(self, spec, messages, hook, context):
kwargs = self._build_request_kwargs(
spec, messages,
tools=spec.tools.get_definitions(), # 把所有注册工具的 JSON Schema 一起传进去
)
if hook.wants_streaming():
async def _stream(delta: str) -> None:
await hook.on_stream(context, delta) # 每收到一个文本 delta,立刻回调
return await self.provider.chat_stream_with_retry(
**kwargs,
on_content_delta=_stream,
)
return await self.provider.chat_with_retry(**kwargs)

是否走流式取决于 hook.wants_streaming()——对应 _LoopHook.wants_streaming() 里的判断:调用方有没有传 on_stream 回调。CLI 模式下传了 renderer.on_delta,所以走流式;某些后台任务(如 Dream)没有 stream 需求,就走非流式。

不管哪条路,最终都返回同一种格式的 LLMResponse(这是笔记2里讲的设计)。

分支一:模型要调工具(has_tool_calls = True

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
if response.has_tool_calls:
# 通知 streaming 这一段结束了,但 agent 还没有完成(resuming=True)
if hook.wants_streaming():
await hook.on_stream_end(context, resuming=True)

# 把模型的 tool_calls 意图序列化成 OpenAI 格式,追加进 messages
assistant_message = build_assistant_message(
response.content or "",
tool_calls=[tc.to_openai_tool_call() for tc in response.tool_calls],
reasoning_content=response.reasoning_content,
thinking_blocks=response.thinking_blocks,
)
messages.append(assistant_message)

# 写 checkpoint(下文详讲)
await self._emit_checkpoint(spec, {
"phase": "awaiting_tools",
"assistant_message": assistant_message,
"pending_tool_calls": [tc.to_openai_tool_call() for tc in response.tool_calls],
...
})

# 执行工具
await hook.before_execute_tools(context) # 触发 terminal 上的 "🔧 工具提示"
results, new_events, fatal_error = await self._execute_tools(
spec, response.tool_calls, external_lookup_counts,
)
# 三个返回值各司其职:
# - results: tool_calls对应的执行结果列表(字符串或结构体),
# 每个结果都会被包装成 role=tool 的消息追加进历史
# - new_events: 事件记录列表,每条记录 {name, status, detail},
# 用于 Dream 等后台任务判断"哪些工具调用成功/失败了"
# - fatal_error: 通常为 None;当 fail_on_tool_error=True 且工具真正抛异常时才非 None,
# 意味着本轮循环必须强制终止(普通工具错误只是在 result 里返回错误字符串,不会设置这个)

# 把每个工具的执行结果追加进 messages
for tool_call, result in zip(response.tool_calls, results):
messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"name": tool_call.name,
"content": self._normalize_tool_result(spec, tool_call.id, tool_call.name, result),
})

# 再写一次 checkpoint(所有工具都执行完了)
await self._emit_checkpoint(spec, {"phase": "tools_completed", ...})

# 继续下一轮迭代(带着新追加的工具结果重新问 LLM)
continue

_execute_tools 支持并发执行多个工具,具体的并发机制、分批逻辑和错误处理在后面的”工具的并发执行细节”一节详细讲,这里先继续主流程。

**关于 fatal_error**:fatal_error 非 None 时循环立刻 break,具体触发条件见后面章节的 _run_tool 分析。

分支二:正常文本回复

1
2
# 过滤掉 <think>...</think> 推理内容
clean = hook.finalize_content(context, response.content)

这里 hook.finalize_content 实际上调的是 _LoopHook.finalize_content

1
2
def finalize_content(self, context, content):
return self._loop._strip_think(content)

也就是把模型回复里的 <think>...</think> 块全部去掉,只留用户可见的正式回答。

这里有个值得说清楚的细节:<think> 过滤只影响展示给用户的内容,不影响存进 messages 历史的内容。实际上对于 DeepSeek-R1、Kimi 这类推理模型,推理内容会以 reasoning_content 字段单独存在 assistant_message 里(而不是嵌在 content 里),然后随着 messages 传给下一轮 LLM——这在 _ALLOWED_MSG_KEYS 里是明确允许的字段。至于用 <think>...</think> 标签格式的模型(如 MiniMax),strip_think 是在 finalize_content 里做的,这时候原始 response.content 仍然完整,build_assistant_message 传入的是过滤后的 clean,所以存进历史的是干净版本——这类模型的推理内容在历史里确实会被丢掉,这是一个取舍:标签格式的推理内容往往篇幅很大,保留会消耗大量 context window,而且没有专用字段处理,所以选择不保留。

处理空回复

1
2
3
4
5
6
if response.finish_reason != "error" and is_blank_text(clean):
empty_content_retries += 1
if empty_content_retries < _MAX_EMPTY_RETRIES: # _MAX_EMPTY_RETRIES = 2
continue # 直接重试,不追加任何消息
# 超过重试次数,尝试"催促"一下
response = await self._request_finalization_retry(spec, messages_for_model)

偶尔模型会返回空回复(通常是 bug 或临时问题),先默默重试2次。超过2次还是空的,就追加一条催促消息再试一次:

1
2
# runtime.py
FINALIZATION_RETRY_PROMPT = "Please provide your response to the user based on the conversation above."

这个重试不带 tools 参数(tools=None),强迫模型直接给文字答案,而不是再去调工具。

处理 finish_reason == "length"(输出被截断)

1
2
3
4
5
6
7
if response.finish_reason == "length" and not is_blank_text(clean):
length_recovery_count += 1
if length_recovery_count <= _MAX_LENGTH_RECOVERIES: # 最多3次
# 把已有的截断内容追加进去,然后加一条 "继续" 消息
messages.append(build_assistant_message(clean, ...))
messages.append(build_length_recovery_message())
continue
1
2
3
4
LENGTH_RECOVERY_PROMPT = (
"Output limit reached. Continue exactly where you left off "
"— no recap, no apology. Break remaining work into smaller steps if needed."
)

max_tokens 限制到了但模型还没说完,就把已经输出的部分追加进历史,然后发一条”继续,不要重复”的催促,让模型接着说。最多3次,防止无限续写。

检查是否有 mid-turn injection

1
2
3
4
5
6
7
should_continue, injection_cycles = await self._try_drain_injections(
spec, messages, assistant_message, injection_cycles,
phase="after final response",
iteration=iteration,
)
if should_continue:
had_injections = True

这是 mid-turn injection 机制的一部分。简单理解就是:用户在 agent 还在循环调用工具的时候,又发来了新消息。这些消息不会启动新的处理流程,而是被放进 pending_queue,等当前循环迭代自然结束、或者进入新一轮迭代时被”注入”进来。具体设计在后面单独讲。

正常结束的判断条件

经过前面所有检查之后,能走到这里说明:

  • response.has_tool_calls 为 False(否则早就 continue 了)
  • clean 不为空(空回复分支要么 continue 重试要么已经 break 了)
  • response.finish_reason 不是 "length"(截断分支 continue 了)
  • response.finish_reason 不是 "error"(错误分支 break 了)
  • 没有 pending injection(有 injection 的话 continue 了)

也就是说”正常结束”没有一个显式的 if finish_reason == "stop" 判断,而是穷举了所有异常情况后剩下的那条路。当上面五个条件全都不满足时,就认为这是一次正常的、有内容的、完整的回复:

1
2
3
4
messages.append(assistant_message or build_assistant_message(clean, ...))
await self._emit_checkpoint(spec, {"phase": "final_response", ...})
final_content = clean
break

把最终的 assistant 回复追加进 messages,写最后一次 checkpoint,设好 final_content,break 出循环。assistant_message 在这里可能已经提前构造好了(injection 检查之前就建了,因为要传给 _try_drain_injections),所以这里优先用它,不重复构造。

工具的并发执行细节

模型可以在一次回复里声明多个 tool_call,_execute_tools 需要把这些工具全部执行完,再把结果汇总返回。先来看整体流程,再看并发是怎么发生的。

整体执行流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
async def _execute_tools(self, spec, tool_calls, external_lookup_counts):
# 第一步:把工具调用列表分成若干批次
batches = self._partition_tool_batches(spec, tool_calls)
tool_results = []

# 第二步:逐批执行
for batch in batches:
if spec.concurrent_tools and len(batch) > 1:
# 批次内并发:用 asyncio.gather 同时启动多个协程
tool_results.extend(await asyncio.gather(*(
self._run_tool(spec, tool_call, external_lookup_counts)
for tool_call in batch
)))
else:
# 单个工具:串行 await
for tool_call in batch:
tool_results.append(
await self._run_tool(spec, tool_call, external_lookup_counts)
)

# 第三步:汇总结果
results, events, fatal_error = [], [], None
for result, event, error in tool_results:
results.append(result)
events.append(event)
if error is not None and fatal_error is None:
fatal_error = error
return results, events, fatal_error

并发的核心是 asyncio.gather——它把同一批次里的所有工具协程同时提交给事件循环,等所有都完成后再把结果按原顺序返回。**tool_results 里结果的顺序和 tool_calls 的顺序严格对应**,这样后续 zip(tool_calls, results) 能正确配对。

_partition_tool_batches:如何决定哪些工具可以并发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def _partition_tool_batches(self, spec, tool_calls):
if not spec.concurrent_tools:
return [[tool_call] for tool_call in tool_calls] # 全局关闭并发,每个单独一批

batches = []
current = [] # 当前正在积累的"可并发"批次
for tool_call in tool_calls:
tool = spec.tools.get(tool_call.name)
can_batch = bool(tool and tool.concurrency_safe) # 看工具的 concurrency_safe 属性
if can_batch:
current.append(tool_call) # 可并发:加入当前批次
continue
# 不可并发:先把已积累的批次收起来,这个工具单独成一批
if current:
batches.append(current)
current = []
batches.append([tool_call])
if current:
batches.append(current)
return batches

规则很简单:concurrency_safe=True 的工具可以和其他安全工具合并成一批并发执行;concurrency_safe=False(如 write_file)的工具必须独占一批串行执行。

举个具体例子,假设模型一次调了5个工具:

1
2
原始顺序:read_file(安全), read_file(安全), write_file(不安全), grep(安全), grep(安全)
分批结果:[read_file_A, read_file_B] → [write_file_C] → [grep_D, grep_E]

执行过程:

1
2
3
4
5
6
7
第1批: asyncio.gather(read_file_A, read_file_B)  ← 两个同时跑
↓ 都完成
第2批: await write_file_C ← 单独等它跑完
↓ 完成
第3批: asyncio.gather(grep_D, grep_E) ← 两个同时跑
↓ 都完成
汇总 results: [resultA, resultB, resultC, resultD, resultE] ← 顺序不变

为什么要这样设计?write_file 如果和另一个 write_file 同时跑,可能写到同一个文件,结果不可预测。但两个 read_file 只是读,同时跑完全没问题。grepread_file 这类只读操作是 concurrency_safe=Truewrite_fileedit_filespawn(会启动新进程)这类有副作用的工具是 concurrency_safe=False

_run_tool:单个工具的实际执行

每个工具的执行在 _run_tool 里:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
async def _run_tool(self, spec, tool_call, external_lookup_counts):
# 1. 重复外部查询检查(防止模型无限循环搜索同一个 URL 或词)
lookup_error = repeated_external_lookup_error(
tool_call.name, tool_call.arguments, external_lookup_counts,
)
if lookup_error:
# 超过2次相同查询:直接返回错误字符串,不真正执行
if spec.fail_on_tool_error:
return lookup_error + _HINT, event, RuntimeError(lookup_error)
return lookup_error + _HINT, event, None

# 2. 真正执行工具
try:
result = await tool.execute(**params)
except asyncio.CancelledError:
raise # 取消信号不能吞掉,必须往上传
except BaseException as exc:
if spec.fail_on_tool_error:
return f"Error: {type(exc).__name__}: {exc}", event, exc # fatal_error 非 None
return f"Error: {type(exc).__name__}: {exc}", event, None # fatal_error 为 None

# 3. 返回 (result, event, fatal_error)
return result_str, event, None

asyncio.CancelledError 的特殊处理值得注意:gather 里的任意一个协程如果被取消,CancelledError 必须向上传播,不能被 except BaseException 吞掉,否则整个 gather 就卡住了。这是 Python asyncio 里的一个常见坑,这里做了正确处理。

hook 机制

nanobot 的 hook 系统设计得很干净,值得专门看一下。

AgentHookAgentHookContext

AgentHook 是生命周期钩子的抽象基类,定义了几个可重写的方法:

1
2
3
4
5
6
7
8
class AgentHook:
def wants_streaming(self) -> bool: return False
async def before_iteration(self, context): pass
async def on_stream(self, context, delta: str): pass
async def on_stream_end(self, context, *, resuming: bool): pass
async def before_execute_tools(self, context): pass
async def after_iteration(self, context): pass
def finalize_content(self, context, content) -> str | None: return content

每个 hook 方法都接受一个 AgentHookContext,里面有当前迭代的完整上下文:

1
2
3
4
5
6
7
8
9
10
11
@dataclass
class AgentHookContext:
iteration: int
messages: list[dict]
response: LLMResponse | None = None
usage: dict[str, int] = ...
tool_calls: list[ToolCallRequest] = ...
tool_results: list[Any] = ...
final_content: str | None = None
stop_reason: str | None = None
error: str | None = None

这些字段在循环里是按顺序填充的——response 在 LLM 调用后填,tool_results 在工具执行后填,final_content 在 break 之前填。hook 方法调用的时机对应这些字段的填充顺序。

CompositeHook:多 hook 扇出

当用户注册了额外的 hook(比如用于监控、审计的自定义 hook),用 CompositeHook 把它们和内置的 _LoopHook 合并:

1
hook = CompositeHook([loop_hook] + self._extra_hooks)

CompositeHook 会依次调用所有 hook,并对除 _LoopHook 之外的 hook 做异常隔离(catch 住不重新抛出),防止自定义 hook 的 bug 把整个 agent 崩掉:

1
2
3
4
5
6
7
8
9
async def _for_each_hook_safe(self, method_name, *args, **kwargs):
for h in self._hooks:
if getattr(h, "_reraise", False):
await getattr(h, method_name)(*args, **kwargs)
continue
try:
await getattr(h, method_name)(*args, **kwargs)
except Exception:
logger.exception("AgentHook.{} error in {}", method_name, type(h).__name__)

_reraise=True 的 hook(比如 _LoopHook)发生异常会向上传播,_reraise=False 的(用户自定义的)只打 log 不崩溃。

_LoopHook:内置 hook 的实现

_LoopHookAgentLoop 注册的内置 hook,负责把 runner 的事件翻译成上层可用的行为。几个关键实现:

**before_execute_tools**:工具执行前触发,打印工具调用提示:

1
2
3
4
5
6
7
8
9
10
11
async def before_execute_tools(self, context):
if self._on_progress:
thought = self._loop._strip_think(context.response.content)
if thought:
await self._on_progress(thought) # 如果有文字+工具调用,先打文字
tool_hint = self._loop._tool_hint(context.tool_calls)
await self._on_progress(tool_hint, tool_hint=True) # 然后打"工具调用提示"
for tc in context.tool_calls:
args_str = json.dumps(tc.arguments, ensure_ascii=False)
logger.info("Tool call: {}({})", tc.name, args_str[:200]) # 日志记录
self._loop._set_tool_context(...) # 更新 message/spawn/cron 工具的路由上下文

**on_stream**:流式输出的处理最有意思,它不是直接把 delta 传出去,而是做了一层 think 过滤:

1
2
3
4
5
6
7
async def on_stream(self, context, delta):
prev_clean = strip_think(self._stream_buf)
self._stream_buf += delta # 追加 delta 到缓冲
new_clean = strip_think(self._stream_buf)
incremental = new_clean[len(prev_clean):] # 计算过滤后的增量
if incremental and self._on_stream:
await self._on_stream(incremental)

为什么要这样处理而不是直接 strip_think(delta) 就行?因为 <think>...</think> 是一个”跨 chunk 的结构”——开头的 <think> 可能在第3个 chunk,结尾的 </think> 可能在第50个 chunk。如果每个 delta 单独过滤,过滤不了跨 chunk 的 think 块。

维护一个累积缓冲 _stream_buf,每收到新 delta 就用 strip_think 处理整个缓冲,计算处理后的结果比上次多了什么,只把新增量传出去。这样用户看到的是过滤了 think 内容的干净文本,并且实时更新。

**after_iteration**:每轮结束后打印 token 用量日志:

1
2
3
4
5
6
7
8
async def after_iteration(self, context):
u = context.usage or {}
logger.debug(
"LLM usage: prompt={} completion={} cached={}",
u.get("prompt_tokens", 0),
u.get("completion_tokens", 0),
u.get("cached_tokens", 0),
)

checkpoint:断点续传与崩溃恢复

这个机制是 nanobot 的容错设计里最复杂的部分,值得单独仔细讲。

checkpoint 写入与恢复时序

为什么不直接把 messages 存进 session?

在讲 checkpoint 的实现之前,先说清楚一个容易产生的疑问:runner 在执行工具的过程中一直在往 messages 里 append 新消息,为什么不直接把这个列表存进 session.messages 再落盘,而是要引入 runtime_checkpoint 这套额外字段?

关键在于:runner 里的 messagessession.messages 是两个独立的对象

整个 runner 运行期间操作的 messages 是在 _run_agent_loop 里从 initial_messages 拷贝出来的一个本轮 turn 的本地列表。runner 运行期间产生的所有新消息(assistant 的工具调用声明、每个工具的执行结果、最终回复……)都只 append 进这个本地列表,session.messages 在这期间完全没有变化。只有 runner 跑完、_save_turn 被调用之后,这些新消息才被写进 session.messages

所以如果进程在 runner 运行中途被杀,session 文件里的 session.messages 完全是上一轮结束时的状态,本轮所有中间消息都消失了。runtime_checkpoint 充当的正是”把 runner 内部产生的中间状态提前存到 session.metadata 里”的桥梁。

写 checkpoint

在 runner 的每一轮迭代里,有三个时间点会写 checkpoint:

① 发出工具调用,等待执行(phase=awaiting_tools

1
2
3
4
5
6
7
8
await self._emit_checkpoint(spec, {
"phase": "awaiting_tools",
"iteration": iteration,
"model": spec.model,
"assistant_message": assistant_message, # 助手决定调什么工具
"completed_tool_results": [],
"pending_tool_calls": [tc.to_openai_tool_call() for tc in response.tool_calls], # 还没执行的
})

② 所有工具执行完(phase=tools_completed

1
2
3
4
5
6
await self._emit_checkpoint(spec, {
"phase": "tools_completed",
"assistant_message": assistant_message,
"completed_tool_results": completed_tool_results, # 全部工具结果
"pending_tool_calls": [], # 已全部完成,没有 pending 的了
})

③ 生成了最终回复(phase=final_response

1
2
3
4
5
6
await self._emit_checkpoint(spec, {
"phase": "final_response",
"assistant_message": messages[-1],
"completed_tool_results": [],
"pending_tool_calls": [],
})

_emit_checkpoint 会调 checkpoint_callback,而 callback 是 AgentLoop._set_runtime_checkpoint

1
2
3
4
5
6
7
8
async def _checkpoint(payload):
if session is None:
return
self._set_runtime_checkpoint(session, payload)

def _set_runtime_checkpoint(self, session, payload):
session.metadata[self._RUNTIME_CHECKPOINT_KEY] = payload
self.sessions.save(session) # 立刻写盘

每次 checkpoint 都会立刻 sessions.save(session)。这保证了进程随时被杀,session 文件里都有最新的进度快照。

这里有一个值得注意的性能问题:sessions.save()全量重写,不是追加写:

1
2
3
4
5
def save(self, session: Session) -> None:
with open(path, "w", encoding="utf-8") as f: # "w" 模式,全量覆盖
f.write(json.dumps(metadata_line) + "\n") # 先写 metadata(含 checkpoint)
for msg in session.messages: # 再把所有消息逐行写下去
f.write(json.dumps(msg) + "\n")

也就是说,每次 checkpoint 都要把 session.messages 里的所有历史消息加上最新的 metadata 从头写一遍。考虑一个 agent 调用了大量工具的场景:每次工具调用前后各有一次 checkpoint,加上用户消息提前落盘、最终 turn 保存、背景 consolidation……一轮 agentLoop 下来可能触发十几次全量重写。如果 session 历史很长,这是个明显的写放大问题。

不过 nanobot 本身定位是个人工具,session 通过 Consolidator 定期压缩历史,文件通常不会很大,在实际使用中这个开销一般不是瓶颈。若要改进,可以考虑引入 WAL(预写日志)或把 checkpoint 单独存成一个轻量的旁路文件——但这会显著增加代码复杂度,现阶段的取舍是合理的。

用户消息也要提前保存

除了 runner 里的 checkpoint,在 _process_message 里还有两个标记:

  • **_mark_pending_user_turn**:进入 runner 之前,在 session.metadata 里设 pending_user_turn=true,表示”用户消息已写入但还没有助手回复”
  • **_clear_pending_user_turn**:runner 正常结束后清掉这个标记

这两个标记加上 runtime_checkpoint,构成了完整的崩溃恢复的依据。

崩溃恢复:_restore_runtime_checkpoint

下次收到消息时(或者进程重启后有新消息进来),_process_message 一开始就调:

1
2
3
4
if self._restore_runtime_checkpoint(session):
self.sessions.save(session)
if self._restore_pending_user_turn(session):
self.sessions.save(session)

_restore_runtime_checkpoint 的逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def _restore_runtime_checkpoint(self, session):
checkpoint = session.metadata.get("runtime_checkpoint")
if not checkpoint:
return False

assistant_message = checkpoint.get("assistant_message")
completed_tool_results = checkpoint.get("completed_tool_results") or []
pending_tool_calls = checkpoint.get("pending_tool_calls") or []

restored_messages = []
# 1. 把 assistant_message 追加进 session
if isinstance(assistant_message, dict):
restored_messages.append(assistant_message)
# 2. 把已完成的工具结果追加进 session
for message in completed_tool_results:
restored_messages.append(message)
# 3. 对于还没执行完的工具,生成一条占位错误消息
for tool_call in pending_tool_calls:
restored_messages.append({
"role": "tool",
"tool_call_id": tool_call.get("id"),
"name": ...,
"content": "Error: Task interrupted before this tool finished.",
})
# ...
session.messages.extend(restored_messages[overlap:]) # 去重后追加
return True

恢复时有一个去重逻辑:检查 restored_messages 的前 N 条和 session.messages 的最后 N 条是否已经一样,如果一样就不重复追加(用 _checkpoint_message_key 做内容级别的比较,不是引用比较)。这是因为 checkpoint 里保存的内容和 _save_turn 最终保存的内容可能有重叠,去重避免消息重复出现。

_restore_pending_user_turn 处理更简单的情况:如果 pending_user_turn=true 且 session 末尾是 user 消息(说明 runner 还没来得及生成任何回复就崩了),就追加一条占位错误:

1
2
3
4
session.messages.append({
"role": "assistant",
"content": "Error: Task interrupted before a response was generated.",
})

这样下次 LLM 看到的历史是语义完整的——用户说了话,助手也有回复(虽然是个错误消息),不会出现 user 消息悬空的情况。

_process_message 收尾:保存本轮数据

Runner 执行完之后,回到 _process_message

1
2
3
4
5
6
7
8
9
10
# save_skip 是 initial_messages 的长度,加上提前持久化的用户消息(如果有的话)
save_skip = 1 + len(history) + (1 if user_persisted_early else 0)
self._save_turn(session, all_msgs, save_skip)

self._clear_pending_user_turn(session)
self._clear_runtime_checkpoint(session)
self.sessions.save(session)

# 把下一轮的压缩操作扔到后台,不阻塞这次响应
self._schedule_background(self.consolidator.maybe_consolidate_by_tokens(session))

_save_turn:把本轮新消息沉淀到 session

all_msgs(runner 返回的 messages 列表)包含了整个历史,save_skip 告诉 _save_turn 从哪里开始才是新的、本轮产生的消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def _save_turn(self, session, messages, skip):
for m in messages[skip:]:
entry = dict(m)
role, content = entry.get("role"), entry.get("content")

# 跳过空 assistant 消息(防止污染历史)
if role == "assistant" and not content and not entry.get("tool_calls"):
continue

# tool 消息:超长内容截断
if role == "tool" and isinstance(content, str) and len(content) > max_chars:
entry["content"] = truncate_text(content, max_chars)

# user 消息:剥掉 Runtime Context 块(时间/渠道这些元数据不需要存进历史)
if role == "user" and isinstance(content, str):
if content.startswith(ContextBuilder._RUNTIME_CONTEXT_TAG):
end_pos = content.find(ContextBuilder._RUNTIME_CONTEXT_END)
after = content[end_pos + len(end_marker):].lstrip("\n")
entry["content"] = after # 只保留用户的实际内容

entry.setdefault("timestamp", datetime.now().isoformat())
session.messages.append(entry)

这里的 Runtime Context 剥离很重要。每次发给 LLM 的 user 消息里都带着时间戳、渠道等动态信息,但这些不需要保存进 session,下次重建 prompt 时会重新生成。如果存进去,以后 LLM 看历史消息里永远有旧的时间戳,反而产生干扰。

最终返回

1
2
3
4
5
6
7
8
9
10
meta = dict(msg.metadata or {})
if on_stream is not None and stop_reason != "error":
meta["_streamed"] = True # 标记这次响应是流式的

return OutboundMessage(
channel=msg.channel,
chat_id=msg.chat_id,
content=final_content,
metadata=meta,
)

OutboundMessage 带着 _streamed=True 让下游渠道知道”内容已经实时发送过了,不要重复发”。但如果是非流式(或出现错误),这条消息就是正常的完整回复。

还有一个特殊情况:如果模型在这轮调用了 message 工具(nanobot 内置的一个工具,允许模型主动给用户发消息),且没有 mid-turn injection,那么 _process_message 会返回 None——因为用户可见的内容已经通过 message 工具发出去了,不应该再发一遍:

1
2
3
if (mt := self.tools.get("message")) and isinstance(mt, MessageTool) and mt._sent_in_turn:
if not had_injections or stop_reason == "empty_final_response":
return None # 内容已通过 MessageTool 发送,无需再发

mid-turn injection:同一个 session 的并发消息

这是一个相对高级的特性,值得单独说一下。

问题背景

想象一个场景:用户发了一句”帮我分析这100个文件”,agent 开始疯狂调工具,要跑好几分钟。这期间用户等得不耐烦,又发了一句”先专注最重要的10个就行”。

如果直接启动两个并发任务处理这两条消息,它们会在 session 上竞争,消息顺序会错乱。正确的做法是:第二条消息应该注入进当前正在运行的任务里,让它感知到用户的新指令。

实现方式

每个活跃 session 有一个 pending_queue(最大容量20条):

1
2
3
# _dispatch 里
pending = asyncio.Queue(maxsize=20)
self._pending_queues[session_key] = pending

当有新消息进来,但这个 session 已经有任务在跑,就路由进 pending queue 而不是启动新任务:

1
2
3
4
# run() 里
if effective_key in self._pending_queues:
self._pending_queues[effective_key].put_nowait(pending_msg)
continue # 不创建新 task

在 runner 的每轮迭代结束时,以及工具执行完后,都会调 _try_drain_injections 尝试从 pending queue 取出新消息追加进 messages:

1
2
3
4
5
6
7
8
9
10
async def _drain_pending(*, limit=_MAX_INJECTIONS_PER_TURN) -> list[dict]:
items = []
while len(items) < limit:
try:
pending_msg = pending_queue.get_nowait()
except asyncio.QueueEmpty:
break
# 构造成标准 user 消息格式,带 runtime context
items.append({"role": "user", "content": merged})
return items

注入有上限:单次最多注入3条(_MAX_INJECTIONS_PER_TURN),一个 turn 内最多注入5轮(_MAX_INJECTION_CYCLES),防止被无限追加消息。

任务结束后,如果 pending queue 里还有消息没被消费(比如 max_iterations 到了),这些消息会被重新 publish 进 bus,作为新的 inbound message 触发新一轮处理:

1
2
3
4
5
6
7
8
# _dispatch 的 finally 块
queue = self._pending_queues.pop(session_key, None)
while True:
try:
item = queue.get_nowait()
except asyncio.QueueEmpty:
break
await self.bus.publish_inbound(item) # 重新入队,作为新 inbound 消息

并发控制与消息调度

AgentLoop.run() 是服务模式(非 CLI 直接调用)的入口,它是一个事件循环:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
async def run(self):
while self._running:
msg = await self.bus.consume_inbound() # 等消息

# 优先命令(/stop等)立刻处理,不排队
if self.commands.is_priority(raw):
await self.commands.dispatch_priority(ctx)
continue

# 如果 session 已有任务在跑,路由进 pending queue
if effective_key in self._pending_queues:
self._pending_queues[effective_key].put_nowait(msg)
continue

# 为这条消息创建一个 asyncio.Task
task = asyncio.create_task(self._dispatch(msg))
self._active_tasks.setdefault(effective_key, []).append(task)

_dispatch 里有两层并发控制:

1
2
3
4
5
6
7
8
async def _dispatch(self, msg):
lock = self._session_locks.setdefault(session_key, asyncio.Lock()) # per-session 串行锁
gate = self._concurrency_gate or nullcontext() # 全局并发限制(默认3)

async with lock, gate:
response = await self._process_message(...)
if response:
await self.bus.publish_outbound(response)
  • lock:同一个 session 的消息串行处理,不会有两条消息同时修改同一个 session
  • gate:全局最多同时处理3个 session 的消息(通过 NANOBOT_MAX_CONCURRENT_REQUESTS 环境变量可调整,<=0 表示不限制)

总结

这篇从process_direct入口走到OutboundMessage返回,把 AgentLoop 的完整运行过程串了一遍。几个值得反复回味的设计点:

  1. 职责分离得很干净AgentLoop 管理 session 状态和外部集成(memory、tools、bus),AgentRunner 专注于迭代调用逻辑,LLMProvider 只管调 API。三层各自不知道彼此的细节,通过 AgentRunSpecAgentRunResultLLMResponse 这几个数据类传递信息。

  2. 消息治理(context governance):每次迭代都会做孤儿修复、微压缩、工具结果溢出、history snip,把历史消息修整成”LLM 能接受的干净格式”。但这些修整都只作用于 messages_for_model 这个临时投影,不会污染 messages 这个档案。

  3. 三重崩溃保护:提前持久化用户消息(pending_user_turn) + 分阶段写 runtime_checkpoint + 恢复时去重追加。三者配合,让进程随时被杀都能从合理的状态恢复,不丢消息、不重复消息。

  4. hook 的错误隔离CompositeHook 对用户自定义 hook 做异常隔离,内置的 _LoopHook 则用 reraise=True 允许异常上浮。这样自定义 hook 的 bug 不会把整个 agent 崩掉。

  5. 流式输出里的 think 过滤_LoopHook.on_stream 维护一个累积缓冲来过滤跨 chunk 的 <think>...</think> 块,这个细节如果用”每个 delta 单独过滤”就会出 bug,解法很经典。

  6. mid-turn injection:在工具调用期间允许注入新用户消息,避免了”开启并发任务导致 session 状态竞争”的问题,同时让 agent 能感知到用户中途改变了意图。

至此,三篇笔记把 nanobot 从 CLI 启动、记忆系统、provider 层到 AgentLoop 核心引擎,系统地走了一遍。代码量虽然不多,但每个模块的设计都很扎实,是学习 AI Agent 框架设计很好的案例。