nanobot源码学习(3)-AgentLoop
上篇回顾
两篇下来,整体骨架和两个主要模块已经弄清楚了:笔记1讲了项目结构、CLI启动、记忆系统(短/中/长记忆的分层设计,Consolidator压缩,Dream长记忆更新);笔记2讲了provider层——怎么把各家大模型的API抹平成统一接口、流式响应的拼接、重试逻辑。
两篇里都反复提到”AgentLoop”,但都是点到为止,到了核心的循环调用部分就略过去了。这篇来把这个最核心的引擎从头到尾走一遍。
AgentLoop 是整个系统的心脏:它接收用户消息,把历史和记忆组装成prompt,驱动LLM循环推理,执行工具,最后把结果回给调用方。光loop.py就1000行,runner.py也近1000行,但逻辑其实相当清晰——只要按调用顺序一段一段看,并不难理解。
全局调用链
先有个整体感觉,再细讲每一段。
从最外层的process_direct进去,一路到AgentRunner.run里的迭代循环,最后回到_process_message做收尾。下面按这个顺序逐段拆解。
从入口开始:process_direct
CLI模式下调用的是这个方法:
1 | # nanobot/cli/commands.py |
process_direct 本身很薄,只是把字符串包装成 InboundMessage,然后转给 _process_message:
1 | # loop.py |
InboundMessage 就是个普通的数据类,记录了发消息的渠道(cli/telegram/discord等)、发送者、内容、附件。之所以要包装成这个结构,是因为 nanobot 支持多渠道接入——Telegram、钉钉、Slack、Discord……无论哪个渠道来的消息,后续处理逻辑完全一样,都走 _process_message。
_process_message:处理前的五步准备
_process_message 是真正的处理核心,在真正调 LLM 之前要做五件事。这里按 CLI 路径讲(channel != "system" 分支)。
第一步:加载 Session,并恢复可能的崩溃断点
1 | key = session_key or msg.session_key # 例如 "cli:direct" |
get_or_create 会先在内存里找,找不到就从文件加载(sessions/cli_direct.jsonl),再找不到就新建一个空 session。Session 里存着历史消息列表和一些元数据。
_restore_runtime_checkpoint 和 _restore_pending_user_turn 这两行是 crash recovery 的逻辑,放到后面的 checkpoint 章节专门讲。
第二步:slash 命令拦截
1 | raw = msg.content.strip() |
如果用户发的是 /stop、/dream-log、/dream-restore 这类斜杠命令,直接处理后返回,不走后面的 LLM 流程。
第三步:可能触发上下文压缩
1 | await self.consolidator.maybe_consolidate_by_tokens(session) |
这是 Consolidator 的触发点——笔记1里讲过这段逻辑,它会估算当前 prompt 的 token 数,如果超过阈值,就把最老的一段对话用 LLM 摘要压缩掉。在进 LLM 之前先做一次,确保带进去的历史不会撑爆 context window。
第四步:组装消息列表
1 | history = session.get_history(max_messages=0) |
build_messages 在笔记1里已经详细讲过,这里只简单回顾一下它的输出:
1 | [ |
Runtime Context 那段每次都不一样(时间在变),但它是夹在 user 消息里而不是 system prompt 里——因为 system prompt 可以被服务端缓存,时间这种动态内容放进去会每次破坏缓存。
第五步:提前持久化用户消息
这一步的细心程度值得单独说一下:
1 | user_persisted_early = False |
在进入 LLM 循环之前就把用户消息写进 session 文件。原因是:如果进程在 LLM 调用中途被 OOM-kill 或者 agent 自重启,runtime_checkpoint(下文会讲)保存的是助手的中间状态,但用户那句话本身是不在 runtime_checkpoint 里的。如果不提前保存,进程崩溃后用户的那句话会彻底消失。
_mark_pending_user_turn 在 session.metadata 里设一个 pending_user_turn=true 的标记,意思是”用户消息已写入,但对应的助手回复还没生成”,进程重启时就知道要补一个占位的错误回复(详见后文)。
_run_agent_loop:组装钩子,委托给 Runner
五步准备完成后,进入 _run_agent_loop:
1 | final_content, _, all_msgs, stop_reason, had_injections = await self._run_agent_loop( |
_run_agent_loop 做了两件事:
① 组装 _LoopHook
1 | loop_hook = _LoopHook( |
_LoopHook 是 AgentLoop 对 AgentRunner 的桥接适配器,把终端输出、进度提示、tool hint 等上层关注的事件,翻译成 AgentRunner 能调用的 hook 接口。稍后会单独讲 hook 机制。
② 组装 AgentRunSpec 并委托给 Runner
1 | result = await self.runner.run(AgentRunSpec( |
AgentRunSpec 是个纯数据类,把所有配置集中在一起传给 Runner。这种设计很干净:Runner 自己不依赖 AgentLoop,只看 spec 里的参数,所以 Runner 可以被 Dream(记忆更新)等其他地方复用。
AgentRunner.run:核心循环
这是整个系统里最重要的一段代码,在 runner.py 的 run() 方法里。它是一个 for iteration in range(max_iterations) 的循环,每一轮做:消息治理 → 调 LLM → 根据 finish_reason 决定走工具分支还是结束分支。
从图里能看出来这不是一个简单的 if-else,而是一组嵌套的边界条件检查,每个分支最终要么 continue(回到下一轮迭代)要么 break(退出循环)。下面把每个分支都走一遍。
每轮开始:context governance(消息治理)
1 | for iteration in range(spec.max_iterations): |
这里有个核心设计:messages 是整个 turn 的档案(会在 turn 结束后保存到 session 文件),messages_for_model 是送给 LLM 的临时投影——两者可能不同,消息治理的所有修改只作用于这份投影,不会污染档案。
这引出了一个合理的疑问:既然每次都要重新处理,为什么不在持久化的时候就把消息整理成干净格式,这样每次直接用就好了,不用重复处理?
原因有几个:
首先,这些治理操作中有些依赖当前运行时上下文,不适合持久化。比如 _snip_history 要根据当前 context window 大小做裁剪,而 context window 是运行时配置,不同时候可能不同;_microcompact 的”最近10条保留”也是相对于当前 iteration 来说的。
其次,有些操作是临时修复,不应该写入档案。_drop_orphan_tool_results 和 _backfill_missing_tool_results 是为了应对崩溃恢复后可能出现的不一致状态,这些”临时修复”只是为了让当次 LLM 调用能正常进行,持久化了反而污染历史。
最后,**_save_turn 已经做了一部分持久化层面的清理**(比如去掉 Runtime Context、截断超长工具结果)。而 context governance 里其余的处理都是”发送前的最后一公里优化”,本来就是设计为每次重建的。考虑到这些操作都是纯内存的列表扫描/字符串替换,性能开销实际上极低,不是瓶颈。
每一步的作用:
**_drop_orphan_tool_results**:扫描所有 role=tool 的消息,找没有对应 assistant.tool_calls 的孤儿 tool 结果并删掉。这种孤儿一般是崩溃恢复后的历史状态不一致导致的。
**_backfill_missing_tool_results**:反过来,找有 tool_calls 声明但没有对应 role=tool 消息的 assistant 消息,给它补一条占位错误消息:
1 | [Tool result unavailable — call was interrupted or lost] |
这是因为某些 provider 要求 assistant 消息中每个 tool_call 都必须有对应的 tool 结果,否则会报 400。
**_microcompact**:把历史中较旧的大型工具返回值(read_file、exec、grep 等的输出)替换成一行摘要,例如:
1 | [read_file result omitted from context] |
具体规则:统计所有”可压缩工具”(_COMPACTABLE_TOOLS 集合)的结果条数,最近10条保留原文,更早的且内容超过500字符的才压缩。这样既节省 token,又保留了最近使用的工具结果(因为模型通常还会引用最近几次的工具输出)。
**_apply_tool_result_budget**:对单条工具结果应用字符上限(max_tool_result_chars),超长的截断。超长工具结果还可能被”溢出到文件”——这个功能由 maybe_persist_tool_result 实现,超出限制的内容写到 workspace 下的临时文件,tool 消息里只保留一个文件引用:
1 | [Tool result saved to: .tool_results/cli_direct/call_abc123.txt (42678 chars)] |
模型看到这个引用后,如果需要完整内容,可以自己用 read_file 去读那个文件。
**_snip_history**:如果经过上面几步之后,估算的 token 数仍然超过 context window 预算,就从消息数组里从前往后裁掉最老的消息(只裁非 system 消息,且保证裁掉之后第一条消息是 user 消息)。这是最后的兜底手段。
调 LLM:_request_model
1 | response = await self._request_model(spec, messages_for_model, hook, context) |
1 | async def _request_model(self, spec, messages, hook, context): |
是否走流式取决于 hook.wants_streaming()——对应 _LoopHook.wants_streaming() 里的判断:调用方有没有传 on_stream 回调。CLI 模式下传了 renderer.on_delta,所以走流式;某些后台任务(如 Dream)没有 stream 需求,就走非流式。
不管哪条路,最终都返回同一种格式的 LLMResponse(这是笔记2里讲的设计)。
分支一:模型要调工具(has_tool_calls = True)
1 | if response.has_tool_calls: |
_execute_tools 支持并发执行多个工具,具体的并发机制、分批逻辑和错误处理在后面的”工具的并发执行细节”一节详细讲,这里先继续主流程。
**关于 fatal_error**:fatal_error 非 None 时循环立刻 break,具体触发条件见后面章节的 _run_tool 分析。
分支二:正常文本回复
1 | # 过滤掉 <think>...</think> 推理内容 |
这里 hook.finalize_content 实际上调的是 _LoopHook.finalize_content:
1 | def finalize_content(self, context, content): |
也就是把模型回复里的 <think>...</think> 块全部去掉,只留用户可见的正式回答。
这里有个值得说清楚的细节:<think> 过滤只影响展示给用户的内容,不影响存进 messages 历史的内容。实际上对于 DeepSeek-R1、Kimi 这类推理模型,推理内容会以 reasoning_content 字段单独存在 assistant_message 里(而不是嵌在 content 里),然后随着 messages 传给下一轮 LLM——这在 _ALLOWED_MSG_KEYS 里是明确允许的字段。至于用 <think>...</think> 标签格式的模型(如 MiniMax),strip_think 是在 finalize_content 里做的,这时候原始 response.content 仍然完整,build_assistant_message 传入的是过滤后的 clean,所以存进历史的是干净版本——这类模型的推理内容在历史里确实会被丢掉,这是一个取舍:标签格式的推理内容往往篇幅很大,保留会消耗大量 context window,而且没有专用字段处理,所以选择不保留。
处理空回复
1 | if response.finish_reason != "error" and is_blank_text(clean): |
偶尔模型会返回空回复(通常是 bug 或临时问题),先默默重试2次。超过2次还是空的,就追加一条催促消息再试一次:
1 | # runtime.py |
这个重试不带 tools 参数(tools=None),强迫模型直接给文字答案,而不是再去调工具。
处理 finish_reason == "length"(输出被截断)
1 | if response.finish_reason == "length" and not is_blank_text(clean): |
1 | LENGTH_RECOVERY_PROMPT = ( |
max_tokens 限制到了但模型还没说完,就把已经输出的部分追加进历史,然后发一条”继续,不要重复”的催促,让模型接着说。最多3次,防止无限续写。
检查是否有 mid-turn injection
1 | should_continue, injection_cycles = await self._try_drain_injections( |
这是 mid-turn injection 机制的一部分。简单理解就是:用户在 agent 还在循环调用工具的时候,又发来了新消息。这些消息不会启动新的处理流程,而是被放进 pending_queue,等当前循环迭代自然结束、或者进入新一轮迭代时被”注入”进来。具体设计在后面单独讲。
正常结束的判断条件
经过前面所有检查之后,能走到这里说明:
response.has_tool_calls为 False(否则早就continue了)clean不为空(空回复分支要么continue重试要么已经 break 了)response.finish_reason不是"length"(截断分支continue了)response.finish_reason不是"error"(错误分支break了)- 没有 pending injection(有 injection 的话
continue了)
也就是说”正常结束”没有一个显式的 if finish_reason == "stop" 判断,而是穷举了所有异常情况后剩下的那条路。当上面五个条件全都不满足时,就认为这是一次正常的、有内容的、完整的回复:
1 | messages.append(assistant_message or build_assistant_message(clean, ...)) |
把最终的 assistant 回复追加进 messages,写最后一次 checkpoint,设好 final_content,break 出循环。assistant_message 在这里可能已经提前构造好了(injection 检查之前就建了,因为要传给 _try_drain_injections),所以这里优先用它,不重复构造。
工具的并发执行细节
模型可以在一次回复里声明多个 tool_call,_execute_tools 需要把这些工具全部执行完,再把结果汇总返回。先来看整体流程,再看并发是怎么发生的。
整体执行流程
1 | async def _execute_tools(self, spec, tool_calls, external_lookup_counts): |
并发的核心是 asyncio.gather——它把同一批次里的所有工具协程同时提交给事件循环,等所有都完成后再把结果按原顺序返回。**tool_results 里结果的顺序和 tool_calls 的顺序严格对应**,这样后续 zip(tool_calls, results) 能正确配对。
_partition_tool_batches:如何决定哪些工具可以并发
1 | def _partition_tool_batches(self, spec, tool_calls): |
规则很简单:concurrency_safe=True 的工具可以和其他安全工具合并成一批并发执行;concurrency_safe=False(如 write_file)的工具必须独占一批串行执行。
举个具体例子,假设模型一次调了5个工具:
1 | 原始顺序:read_file(安全), read_file(安全), write_file(不安全), grep(安全), grep(安全) |
执行过程:
1 | 第1批: asyncio.gather(read_file_A, read_file_B) ← 两个同时跑 |
为什么要这样设计?write_file 如果和另一个 write_file 同时跑,可能写到同一个文件,结果不可预测。但两个 read_file 只是读,同时跑完全没问题。grep、read_file 这类只读操作是 concurrency_safe=True;write_file、edit_file、spawn(会启动新进程)这类有副作用的工具是 concurrency_safe=False。
_run_tool:单个工具的实际执行
每个工具的执行在 _run_tool 里:
1 | async def _run_tool(self, spec, tool_call, external_lookup_counts): |
asyncio.CancelledError 的特殊处理值得注意:gather 里的任意一个协程如果被取消,CancelledError 必须向上传播,不能被 except BaseException 吞掉,否则整个 gather 就卡住了。这是 Python asyncio 里的一个常见坑,这里做了正确处理。
hook 机制
nanobot 的 hook 系统设计得很干净,值得专门看一下。
AgentHook 和 AgentHookContext
AgentHook 是生命周期钩子的抽象基类,定义了几个可重写的方法:
1 | class AgentHook: |
每个 hook 方法都接受一个 AgentHookContext,里面有当前迭代的完整上下文:
1 |
|
这些字段在循环里是按顺序填充的——response 在 LLM 调用后填,tool_results 在工具执行后填,final_content 在 break 之前填。hook 方法调用的时机对应这些字段的填充顺序。
CompositeHook:多 hook 扇出
当用户注册了额外的 hook(比如用于监控、审计的自定义 hook),用 CompositeHook 把它们和内置的 _LoopHook 合并:
1 | hook = CompositeHook([loop_hook] + self._extra_hooks) |
CompositeHook 会依次调用所有 hook,并对除 _LoopHook 之外的 hook 做异常隔离(catch 住不重新抛出),防止自定义 hook 的 bug 把整个 agent 崩掉:
1 | async def _for_each_hook_safe(self, method_name, *args, **kwargs): |
_reraise=True 的 hook(比如 _LoopHook)发生异常会向上传播,_reraise=False 的(用户自定义的)只打 log 不崩溃。
_LoopHook:内置 hook 的实现
_LoopHook 是 AgentLoop 注册的内置 hook,负责把 runner 的事件翻译成上层可用的行为。几个关键实现:
**before_execute_tools**:工具执行前触发,打印工具调用提示:
1 | async def before_execute_tools(self, context): |
**on_stream**:流式输出的处理最有意思,它不是直接把 delta 传出去,而是做了一层 think 过滤:
1 | async def on_stream(self, context, delta): |
为什么要这样处理而不是直接 strip_think(delta) 就行?因为 <think>...</think> 是一个”跨 chunk 的结构”——开头的 <think> 可能在第3个 chunk,结尾的 </think> 可能在第50个 chunk。如果每个 delta 单独过滤,过滤不了跨 chunk 的 think 块。
维护一个累积缓冲 _stream_buf,每收到新 delta 就用 strip_think 处理整个缓冲,计算处理后的结果比上次多了什么,只把新增量传出去。这样用户看到的是过滤了 think 内容的干净文本,并且实时更新。
**after_iteration**:每轮结束后打印 token 用量日志:
1 | async def after_iteration(self, context): |
checkpoint:断点续传与崩溃恢复
这个机制是 nanobot 的容错设计里最复杂的部分,值得单独仔细讲。
为什么不直接把 messages 存进 session?
在讲 checkpoint 的实现之前,先说清楚一个容易产生的疑问:runner 在执行工具的过程中一直在往 messages 里 append 新消息,为什么不直接把这个列表存进 session.messages 再落盘,而是要引入 runtime_checkpoint 这套额外字段?
关键在于:runner 里的 messages 和 session.messages 是两个独立的对象。
整个 runner 运行期间操作的 messages 是在 _run_agent_loop 里从 initial_messages 拷贝出来的一个本轮 turn 的本地列表。runner 运行期间产生的所有新消息(assistant 的工具调用声明、每个工具的执行结果、最终回复……)都只 append 进这个本地列表,session.messages 在这期间完全没有变化。只有 runner 跑完、_save_turn 被调用之后,这些新消息才被写进 session.messages。
所以如果进程在 runner 运行中途被杀,session 文件里的 session.messages 完全是上一轮结束时的状态,本轮所有中间消息都消失了。runtime_checkpoint 充当的正是”把 runner 内部产生的中间状态提前存到 session.metadata 里”的桥梁。
写 checkpoint
在 runner 的每一轮迭代里,有三个时间点会写 checkpoint:
① 发出工具调用,等待执行(phase=awaiting_tools)
1 | await self._emit_checkpoint(spec, { |
② 所有工具执行完(phase=tools_completed)
1 | await self._emit_checkpoint(spec, { |
③ 生成了最终回复(phase=final_response)
1 | await self._emit_checkpoint(spec, { |
_emit_checkpoint 会调 checkpoint_callback,而 callback 是 AgentLoop._set_runtime_checkpoint:
1 | async def _checkpoint(payload): |
每次 checkpoint 都会立刻 sessions.save(session)。这保证了进程随时被杀,session 文件里都有最新的进度快照。
这里有一个值得注意的性能问题:sessions.save() 是全量重写,不是追加写:
1 | def save(self, session: Session) -> None: |
也就是说,每次 checkpoint 都要把 session.messages 里的所有历史消息加上最新的 metadata 从头写一遍。考虑一个 agent 调用了大量工具的场景:每次工具调用前后各有一次 checkpoint,加上用户消息提前落盘、最终 turn 保存、背景 consolidation……一轮 agentLoop 下来可能触发十几次全量重写。如果 session 历史很长,这是个明显的写放大问题。
不过 nanobot 本身定位是个人工具,session 通过 Consolidator 定期压缩历史,文件通常不会很大,在实际使用中这个开销一般不是瓶颈。若要改进,可以考虑引入 WAL(预写日志)或把 checkpoint 单独存成一个轻量的旁路文件——但这会显著增加代码复杂度,现阶段的取舍是合理的。
用户消息也要提前保存
除了 runner 里的 checkpoint,在 _process_message 里还有两个标记:
- **
_mark_pending_user_turn**:进入 runner 之前,在session.metadata里设pending_user_turn=true,表示”用户消息已写入但还没有助手回复” - **
_clear_pending_user_turn**:runner 正常结束后清掉这个标记
这两个标记加上 runtime_checkpoint,构成了完整的崩溃恢复的依据。
崩溃恢复:_restore_runtime_checkpoint
下次收到消息时(或者进程重启后有新消息进来),_process_message 一开始就调:
1 | if self._restore_runtime_checkpoint(session): |
_restore_runtime_checkpoint 的逻辑:
1 | def _restore_runtime_checkpoint(self, session): |
恢复时有一个去重逻辑:检查 restored_messages 的前 N 条和 session.messages 的最后 N 条是否已经一样,如果一样就不重复追加(用 _checkpoint_message_key 做内容级别的比较,不是引用比较)。这是因为 checkpoint 里保存的内容和 _save_turn 最终保存的内容可能有重叠,去重避免消息重复出现。
_restore_pending_user_turn 处理更简单的情况:如果 pending_user_turn=true 且 session 末尾是 user 消息(说明 runner 还没来得及生成任何回复就崩了),就追加一条占位错误:
1 | session.messages.append({ |
这样下次 LLM 看到的历史是语义完整的——用户说了话,助手也有回复(虽然是个错误消息),不会出现 user 消息悬空的情况。
_process_message 收尾:保存本轮数据
Runner 执行完之后,回到 _process_message:
1 | # save_skip 是 initial_messages 的长度,加上提前持久化的用户消息(如果有的话) |
_save_turn:把本轮新消息沉淀到 session
all_msgs(runner 返回的 messages 列表)包含了整个历史,save_skip 告诉 _save_turn 从哪里开始才是新的、本轮产生的消息:
1 | def _save_turn(self, session, messages, skip): |
这里的 Runtime Context 剥离很重要。每次发给 LLM 的 user 消息里都带着时间戳、渠道等动态信息,但这些不需要保存进 session,下次重建 prompt 时会重新生成。如果存进去,以后 LLM 看历史消息里永远有旧的时间戳,反而产生干扰。
最终返回
1 | meta = dict(msg.metadata or {}) |
OutboundMessage 带着 _streamed=True 让下游渠道知道”内容已经实时发送过了,不要重复发”。但如果是非流式(或出现错误),这条消息就是正常的完整回复。
还有一个特殊情况:如果模型在这轮调用了 message 工具(nanobot 内置的一个工具,允许模型主动给用户发消息),且没有 mid-turn injection,那么 _process_message 会返回 None——因为用户可见的内容已经通过 message 工具发出去了,不应该再发一遍:
1 | if (mt := self.tools.get("message")) and isinstance(mt, MessageTool) and mt._sent_in_turn: |
mid-turn injection:同一个 session 的并发消息
这是一个相对高级的特性,值得单独说一下。
问题背景
想象一个场景:用户发了一句”帮我分析这100个文件”,agent 开始疯狂调工具,要跑好几分钟。这期间用户等得不耐烦,又发了一句”先专注最重要的10个就行”。
如果直接启动两个并发任务处理这两条消息,它们会在 session 上竞争,消息顺序会错乱。正确的做法是:第二条消息应该注入进当前正在运行的任务里,让它感知到用户的新指令。
实现方式
每个活跃 session 有一个 pending_queue(最大容量20条):
1 | # _dispatch 里 |
当有新消息进来,但这个 session 已经有任务在跑,就路由进 pending queue 而不是启动新任务:
1 | # run() 里 |
在 runner 的每轮迭代结束时,以及工具执行完后,都会调 _try_drain_injections 尝试从 pending queue 取出新消息追加进 messages:
1 | async def _drain_pending(*, limit=_MAX_INJECTIONS_PER_TURN) -> list[dict]: |
注入有上限:单次最多注入3条(_MAX_INJECTIONS_PER_TURN),一个 turn 内最多注入5轮(_MAX_INJECTION_CYCLES),防止被无限追加消息。
任务结束后,如果 pending queue 里还有消息没被消费(比如 max_iterations 到了),这些消息会被重新 publish 进 bus,作为新的 inbound message 触发新一轮处理:
1 | # _dispatch 的 finally 块 |
并发控制与消息调度
AgentLoop.run() 是服务模式(非 CLI 直接调用)的入口,它是一个事件循环:
1 | async def run(self): |
_dispatch 里有两层并发控制:
1 | async def _dispatch(self, msg): |
lock:同一个 session 的消息串行处理,不会有两条消息同时修改同一个 sessiongate:全局最多同时处理3个 session 的消息(通过NANOBOT_MAX_CONCURRENT_REQUESTS环境变量可调整,<=0表示不限制)
总结
这篇从process_direct入口走到OutboundMessage返回,把 AgentLoop 的完整运行过程串了一遍。几个值得反复回味的设计点:
职责分离得很干净:
AgentLoop管理 session 状态和外部集成(memory、tools、bus),AgentRunner专注于迭代调用逻辑,LLMProvider只管调 API。三层各自不知道彼此的细节,通过AgentRunSpec、AgentRunResult、LLMResponse这几个数据类传递信息。消息治理(context governance):每次迭代都会做孤儿修复、微压缩、工具结果溢出、history snip,把历史消息修整成”LLM 能接受的干净格式”。但这些修整都只作用于
messages_for_model这个临时投影,不会污染messages这个档案。三重崩溃保护:提前持久化用户消息(
pending_user_turn) + 分阶段写runtime_checkpoint+ 恢复时去重追加。三者配合,让进程随时被杀都能从合理的状态恢复,不丢消息、不重复消息。hook 的错误隔离:
CompositeHook对用户自定义 hook 做异常隔离,内置的_LoopHook则用reraise=True允许异常上浮。这样自定义 hook 的 bug 不会把整个 agent 崩掉。流式输出里的 think 过滤:
_LoopHook.on_stream维护一个累积缓冲来过滤跨 chunk 的<think>...</think>块,这个细节如果用”每个 delta 单独过滤”就会出 bug,解法很经典。mid-turn injection:在工具调用期间允许注入新用户消息,避免了”开启并发任务导致 session 状态竞争”的问题,同时让 agent 能感知到用户中途改变了意图。
至此,三篇笔记把 nanobot 从 CLI 启动、记忆系统、provider 层到 AgentLoop 核心引擎,系统地走了一遍。代码量虽然不多,但每个模块的设计都很扎实,是学习 AI Agent 框架设计很好的案例。