上篇回顾 四篇下来,nanobot 的核心流程已经相当完整了:笔记1讲了项目结构和记忆系统;笔记2讲了 provider 层怎么抹平各家大模型接口;笔记3讲了 AgentLoop 从接收消息到 AgentRunner 迭代循环的全过程;笔记4把工具系统讲了一遍,包括并发策略、MCP接入和参数校验。
但这几篇里有个话题一直没深讲:spawn 工具 。在笔记4工具清单里它只出现了一行——“spawn 子代理工具”。这个工具背后藏着 nanobot 整个多 agent 协作的架构:主 agent 可以把任务拆出去让后台的子 agent 独立执行,子 agent 完成后再把结果”喂”回给主 agent,整个通信走一条统一的消息总线。
这篇把这套机制从头到尾讲清楚。
整体设计思路 在展开看代码之前,先把脑子里的模型建起来。
nanobot 的多 agent 架构有三个核心组件:
MessageBus (nanobot/bus/queue.py):一条异步消息队列,所有进出 agent 的消息都走这里,包括后面子 agent 的结果回传也是。
SubagentManager (nanobot/agent/subagent.py):子 agent 的生命周期管理器,负责 spawn、跟踪、取消。
SpawnTool (nanobot/agent/tools/spawn.py):暴露给 LLM 的工具接口,LLM 调用它来触发子 agent。
这三个东西的关系很清楚:SpawnTool 是入口,SubagentManager 是执行引擎,MessageBus 是通信通道。
MessageBus:消息总线 先看最底层的 MessageBus。它的代码很短,但设计思路值得讲一下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 class MessageBus : def __init__ (self ): self .inbound: asyncio.Queue[InboundMessage] = asyncio.Queue() self .outbound: asyncio.Queue[OutboundMessage] = asyncio.Queue() async def publish_inbound (self, msg: InboundMessage ) -> None : await self .inbound.put(msg) async def consume_inbound (self ) -> InboundMessage: return await self .inbound.get() async def publish_outbound (self, msg: OutboundMessage ) -> None : await self .outbound.put(msg) async def consume_outbound (self ) -> OutboundMessage: return await self .outbound.get()
就两条队列,一进一出。inbound 是各个渠道(CLI、Telegram、Discord……)往 agent 发消息用的,outbound 是 agent 往外发响应用的。
对应的两个数据类也很简单:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @dataclass class InboundMessage : channel: str sender_id: str chat_id: str content: str media: list [str ] = ... metadata: dict [str , Any ] = ... session_key_override: str | None = None @property def session_key (self ) -> str : return self .session_key_override or f"{self.channel} :{self.chat_id} " @dataclass class OutboundMessage : channel: str chat_id: str content: str reply_to: str | None = None media: list [str ] = ... metadata: dict [str , Any ] = ...
这个设计有一个好处:channel=”system” 的 InboundMessage 也走同一条队列 。子 agent 完成任务后把结果包装成 channel="system" 的 InboundMessage,发到 inbound 队列,主 agent 的消息循环自然就会处理到它——不需要任何额外的通知机制,完全复用现有的消息处理路径。
SpawnTool 是 LLM 用来启动子 agent 的工具,代码很薄:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @tool_parameters( tool_parameters_schema( task=StringSchema("The task for the subagent to complete" ), label=StringSchema("Optional short label for the task (for display)" ), required=["task" ], ) )class SpawnTool (Tool ): def __init__ (self, manager: "SubagentManager" ): self ._manager = manager self ._origin_channel = "cli" self ._origin_chat_id = "direct" def set_context (self, channel: str , chat_id: str ) -> None : """记录当前消息来源,子agent结束后结果要回到这里""" self ._origin_channel = channel self ._origin_chat_id = chat_id self ._session_key = f"{channel} :{chat_id} " async def execute (self, task: str , label: str | None = None , **kwargs ) -> str : return await self ._manager.spawn( task=task, label=label, origin_channel=self ._origin_channel, origin_chat_id=self ._origin_chat_id, session_key=self ._session_key, )
工具参数只有两个:task(子 agent 要做什么)和可选的 label(展示用的短标签)。set_context 这个方法是在每次处理消息时被 AgentLoop 调用的,确保子 agent 知道要把结果回传到哪个 channel + chat_id。
工具的 description 里还有一段引导语:
1 2 3 4 "Use this for complex or time-consuming tasks that can run independently. The subagent will complete the task and report back when done. For deliverables or existing projects, inspect the workspace first and use a dedicated subdirectory when helpful."
这段描述直接影响 LLM 的行为:遇到耗时或复杂的独立任务时优先 spawn,不要阻塞主对话流。
从 spawn 工具调用说起:主流程 来看 LLM 调用 spawn 之后的完整流程。
整个过程拆开看:
用户发消息进来,AgentLoop 从 MessageBus 消费,包装成异步任务开始处理
_process_message 组装好历史消息,调 runner.run
LLM 返回的 tool_calls 里包含 spawn(task="...", label="...")
AgentLoop 执行 SpawnTool,SpawnTool 把参数转给 SubagentManager
SubagentManager 用 asyncio.create_task 把子 agent 丢到后台,立刻返回 一条确认文字
LLM 拿到 spawn 的返回值,生成最终回复(比如”已在后台启动任务”)
主 agent 把响应推给用户——此时子 agent 还在后台跑着
这里关键点是非阻塞 :spawn 调用本身不等子 agent 完成,主 agent 马上就能回复用户,子 agent 独立在后台执行。
SubagentManager:子 agent 的生命周期 SubagentManager 是管理子 agent 的核心,代码在 nanobot/agent/subagent.py。先看 spawn 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 class SubagentManager : def __init__ (self, provider, workspace, bus, ... ): self .runner = AgentRunner(provider) self ._running_tasks: dict [str , asyncio.Task[None ]] = {} self ._session_tasks: dict [str , set [str ]] = {} async def spawn ( self, task: str , label: str | None = None , origin_channel: str = "cli" , origin_chat_id: str = "direct" , session_key: str | None = None , ) -> str : task_id = str (uuid.uuid4())[:8 ] display_label = label or task[:30 ] + ("..." if len (task) > 30 else "" ) origin = {"channel" : origin_channel, "chat_id" : origin_chat_id} bg_task = asyncio.create_task( self ._run_subagent(task_id, task, display_label, origin) ) self ._running_tasks[task_id] = bg_task if session_key: self ._session_tasks.setdefault(session_key, set ()).add(task_id) def _cleanup (_: asyncio.Task ) -> None : self ._running_tasks.pop(task_id, None ) if session_key and (ids := self ._session_tasks.get(session_key)): ids.discard(task_id) if not ids: del self ._session_tasks[session_key] bg_task.add_done_callback(_cleanup) return f"Subagent [{display_label} ] started (id: {task_id} ). I'll notify you when it completes."
task_id 取 UUID 的前8位,够短够唯一。_running_tasks 是个字典,key 是 task_id,方便查询和取消。_session_tasks 按 session 分组,这样 /stop 命令可以一次取消某个会话的所有子 agent。
add_done_callback 做清理,任务完成后把自己从两个字典里删掉。
_run_subagent:子 agent 的实际执行 子 agent 运行在 _run_subagent 里:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 async def _run_subagent ( self, task_id: str , task: str , label: str , origin: dict [str , str ], ) -> None : try : tools = ToolRegistry() tools.register(ReadFileTool(...)) tools.register(WriteFileTool(...)) tools.register(EditFileTool(...)) tools.register(ListDirTool(...)) tools.register(GlobTool(...)) tools.register(GrepTool(...)) if self .exec_config.enable: tools.register(ExecTool(...)) if self .web_config.enable: tools.register(WebSearchTool(...)) tools.register(WebFetchTool(...)) system_prompt = self ._build_subagent_prompt() messages = [ {"role" : "system" , "content" : system_prompt}, {"role" : "user" , "content" : task}, ] result = await self .runner.run(AgentRunSpec( initial_messages=messages, tools=tools, model=self .model, max_iterations=15 , max_tool_result_chars=self .max_tool_result_chars, hook=_SubagentHook(task_id), fail_on_tool_error=True , )) if result.stop_reason in ("tool_error" , "error" ): await self ._announce_result(..., status="error" ) return await self ._announce_result(..., status="ok" ) except Exception as e: await self ._announce_result(..., status="error" )
几个细节值得关注:
工具集裁剪 :子 agent 没有 spawn 工具,所以子 agent 不能再 spawn 更多子 agent(不支持递归嵌套)。子 agent 也没有 message 工具,它不能直接给用户发消息,只能把结果交给主 agent 来转发。
独立系统 prompt :子 agent 用 subagent_system.md 模板构建自己的系统 prompt,和主 agent 完全隔离,没有主 agent 的记忆和历史。
1 2 3 4 5 6 # subagent_system.md {{ time_ctx }} You are a subagent spawned by the main agent to complete a specific task. Stay focused on the assigned task. Your final response will be reported back to the main agent.
简短而直接——告诉 LLM:你是子 agent,专注完成分配的任务,最终响应会被报告给主 agent。
max_iterations=15 :子 agent 的最大迭代次数限制为 15,比主 agent 少一些,避免子 agent 失控地无限跑下去。
**_SubagentHook**:子 agent 有一个简单的 hook,只做日志记录——每次工具调用前打印 [task_id] executing: tool_name with arguments: ...,方便调试。
子 agent 不写 checkpoint,也不污染主 agent 的 session :这个细节容易被忽略,但很关键。仔细对比一下主 agent 和子 agent 传给 AgentRunSpec 的参数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 result = await self .runner.run(AgentRunSpec( ... workspace=self .workspace, session_key=session.key, checkpoint_callback=_checkpoint, ... )) result = await self .runner.run(AgentRunSpec( initial_messages=messages, tools=tools, model=self .model, max_iterations=15 , ... ))
AgentRunner._emit_checkpoint 的实现如下:
1 2 3 4 async def _emit_checkpoint (self, spec: AgentRunSpec, payload: dict ) -> None : callback = spec.checkpoint_callback if callback is not None : await callback(payload)
checkpoint_callback 是 None,所以子 agent 运行期间不会触发任何 checkpoint 写入,不会碰主 agent session 的 metadata。主 agent 的崩溃恢复机制和子 agent 完全互不干扰。
另一个值得关注的地方是超大工具结果的磁盘落盘。当工具调用返回内容超过 max_tool_result_chars 限制时,runner 会把超出部分持久化到 .nanobot/tool-results/ 目录,路径结构是 bucket/tool_call_id.txt:
1 2 3 bucket = ensure_dir(root / safe_filename(session_key or "default" )) path = bucket / f"{safe_filename(tool_call_id)} .txt"
主 agent 的 bucket 按 session_key 区分(比如 cli_direct/),子 agent 因为没传 session_key,都落到 "default" 这个共用桶里。但并发安全靠 tool_call_id 保证 :每个工具调用的 ID 来自 LLM 响应,各自独立,不同子 agent 的文件路径天然不同,互不干扰。
结果回传:子 agent 如何通知主 agent 子 agent 跑完之后,通过 _announce_result 把结果”注入”回系统:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 async def _announce_result ( self, task_id: str , label: str , task: str , result: str , origin: dict [str , str ], status: str , ) -> None : status_text = "completed successfully" if status == "ok" else "failed" announce_content = render_template( "agent/subagent_announce.md" , label=label, status_text=status_text, task=task, result=result, ) msg = InboundMessage( channel="system" , sender_id="subagent" , chat_id=f"{origin['channel' ]} :{origin['chat_id' ]} " , content=announce_content, ) await self .bus.publish_inbound(msg)
subagent_announce.md 模板内容:
1 2 3 4 5 6 7 8 9 [Subagent '{{ label }}' {{ status_text }}] Task: {{ task }} Result: {{ result }} Summarize this naturally for the user. Keep it brief (1-2 sentences). Do not mention technical details like "subagent" or task IDs.
注意最后一句——不要向用户提及”subagent”或 task ID 这些技术细节 。这是有意为之的:对用户来说,这件事就是 nanobot 帮你做完了,不需要知道背后是主 agent 还是子 agent 做的。
整个回传流程的关键在于 chat_id 的构造:f"{origin['channel']}:{origin['chat_id']}"。这样主 agent 收到 system 消息后,从 chat_id 里解析出原始的 channel 和 chat_id,就能找到对应的 session 和最终推给用户的出口。
主 agent 如何处理子 agent 的结果 子 agent 把结果发进 inbound 队列后,主 agent 的消息循环正常消费到它,走进 _process_message。这里有个专门的分支处理 channel == "system" 的消息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 async def _process_message (self, msg: InboundMessage, ... ) -> OutboundMessage | None : if msg.channel == "system" : channel, chat_id = ( msg.chat_id.split(":" , 1 ) if ":" in msg.chat_id else ("cli" , msg.chat_id) ) logger.info("Processing system message from {}" , msg.sender_id) key = f"{channel} :{chat_id} " session = self .sessions.get_or_create(key) if self ._restore_runtime_checkpoint(session): self .sessions.save(session) if self ._restore_pending_user_turn(session): self .sessions.save(session) session, pending = self .auto_compact.prepare_session(session, key) await self .consolidator.maybe_consolidate_by_tokens(session) self ._set_tool_context(channel, chat_id, ...) history = session.get_history(max_messages=0 ) current_role = "assistant" if msg.sender_id == "subagent" else "user" messages = self .context.build_messages( history=history, current_message=msg.content, channel=channel, chat_id=chat_id, session_summary=pending, current_role=current_role, ) final_content, _, all_msgs, _, _ = await self ._run_agent_loop(messages, ...) self ._save_turn(session, all_msgs, 1 + len (history)) self ._clear_runtime_checkpoint(session) self .sessions.save(session) return OutboundMessage(channel=channel, chat_id=chat_id, content=final_content)
这里有个微妙的设计:current_role。
通常用户发的消息构建成 "role": "user" 的消息,主 agent 构建成 "role": "assistant"。但子 agent 的结果是以 "role": "assistant" 插进来的——因为子 agent 本质上是主 agent 派出去的”手”,它做完事情汇报结果,这件事从语义上更像是 assistant 在描述自己做了什么,而不是用户在问问题。
主 agent 拿到这个带有子 agent 结果的消息历史,再走一轮 runner.run,用自然语言把结果整合后回复给用户。这一轮里 LLM 受 subagent_announce.md 末尾那句指令引导,会把技术细节屏蔽掉,只给用户简洁的结果说明。
session 隔离与持久化 子 agent 和主 agent 共用同一个 session 。回传的 system 消息里 chat_id = "cli:direct"(原始用户的 chat_id),所以 _process_message 找到的 session 跟用户对话用的是同一个,子 agent 的执行结果会被 _save_turn 正常写进这个 session 的消息历史里。
下次用户发消息时,历史里就有这段子 agent 的执行记录,LLM 可以根据上下文给出更连贯的回复。
session 文件是 JSONL 格式(sessions/cli_direct.jsonl),每次 sessions.save() 都全量重写——metadata 一行,然后每条消息一行:
1 2 3 4 5 6 7 { "_type" : "metadata" , "key" : "cli:direct" , "created_at" : "..." , "updated_at" : "..." , "metadata" : { ...} } { "role" : "user" , "content" : "帮我写个爬虫" , "timestamp" : "..." } { "role" : "assistant" , "content" : null , "tool_calls" : [ { "id" : "call_xxx" , "function" : { "name" : "spawn" , "arguments" : "{\"task\": \"...\"}" } } , ...] , "timestamp" : "..." } { "role" : "tool" , "tool_call_id" : "call_xxx" , "name" : "spawn" , "content" : "Subagent [爬虫任务] started (id: a1b2c3d4)." , "timestamp" : "..." } { "role" : "assistant" , "content" : "已在后台启动任务,完成后通知你。" , "timestamp" : "..." } { "role" : "assistant" , "content" : "[Subagent '爬虫任务' completed successfully]\n\nTask: ...\n\nResult: ..." , "timestamp" : "..." } { "role" : "assistant" , "content" : "爬虫脚本已写好,保存在 workspace/crawler.py。" , "timestamp" : "..." }
注意:子 agent 的回传结果以 "role": "assistant" 存进 session,这样历史记录是完整的,下次能正确还原整个对话脉络。
mid-turn 消息注入与多 agent 的关系 mid-turn 注入机制在笔记3里已经详细讲过(_pending_queues、injection_callback、_try_drain_injections),这里不重复。只说一个和多 agent 直接相关的点:
子 agent 完成后推进来的 system 消息,走的是主 agent 主循环而不是 mid-turn 注入 。主循环 run() 用 asyncio.create_task 分发,不阻塞,所以它能同时:一边处理用户消息的 _dispatch,一边等待并消费子 agent 推进来的 system 消息,触发第二个 _dispatch。这是两个独立的 asyncio task,和 mid-turn 注入(同一 task 内追加消息)是不同的机制。
session 并发控制 多 agent 带来的另一个问题是并发:多个用户同时发消息、一个用户快速连发消息,怎么保证不乱套?
nanobot 的并发控制有两层:
1 2 3 4 5 6 self ._session_locks: dict [str , asyncio.Lock] = {} _max = int (os.environ.get("NANOBOT_MAX_CONCURRENT_REQUESTS" , "3" )) self ._concurrency_gate: asyncio.Semaphore | None = ( asyncio.Semaphore(_max ) if _max > 0 else None )
第一层:session 级别的 Lock。 _session_locks 是按 session key 分配的 asyncio.Lock,同一个 session 的消息必须串行处理——不能同时有两个请求在跑同一个用户的对话。
第二层:全局信号量。 _concurrency_gate 是一个全局 Semaphore,默认值 3(可以通过环境变量 NANOBOT_MAX_CONCURRENT_REQUESTS 调整,<=0 表示不限制)。这个限制的是跨 session 的整体并发量 ,防止瞬间涌入大量请求把系统压垮。
两层组合起来:
1 2 3 async with lock, gate: response = await self ._process_message(msg, ...)
同一 session 串行(lock),不同 session 最多同时跑 N 个(gate)。这样在多用户场景下,每个用户的对话不会互相干扰,整体资源也不会失控。
子 agent 的取消 如果用户发 /stop 命令,主 agent 可以取消当前 session 的所有子 agent:
1 2 3 4 5 6 7 8 9 10 11 12 async def cancel_by_session (self, session_key: str ) -> int : tasks = [ self ._running_tasks[tid] for tid in self ._session_tasks.get(session_key, []) if tid in self ._running_tasks and not self ._running_tasks[tid].done() ] for t in tasks: t.cancel() if tasks: await asyncio.gather(*tasks, return_exceptions=True ) return len (tasks)
cancel() 会向协程注入 CancelledError,asyncio.gather 等它们真的停下来(return_exceptions=True 让取消异常不往上抛)。返回值是实际取消的数量,命令层可以用来给用户一个反馈。
另外 get_running_count() 和 get_running_count_by_session() 提供查询接口,可以在需要时检查有多少子 agent 正在运行。
完整的多 agent 通信流 讲到这里有一个问题还没交代清楚:用户最终会收到几条回复?什么时候收到? 要回答这个问题,必须先看 AgentLoop.run() 这个主循环是怎么工作的。
AgentLoop.run():永不停歇的消息泵1 2 3 4 5 6 7 8 9 async def run (self ) -> None : self ._running = True while self ._running: msg = await asyncio.wait_for(self .bus.consume_inbound(), timeout=1.0 ) ... task = asyncio.create_task(self ._dispatch(msg)) self ._active_tasks.setdefault(effective_key, []).append(task)
关键在 asyncio.create_task——主循环不等 _dispatch 完成,创建好任务就立刻循环回去,继续 consume_inbound 等下一条消息。这意味着:
处理用户消息的 _dispatch(msg1) 在后台异步跑
主循环同时在等待 inbound 队列的下一条消息
子 agent 完成后往 inbound 队列推的 system 消息,也会被同一个主循环消费到,再 create_task(_dispatch(system_msg))
CLI 侧:两个并行的 asyncio task CLI 模式下启动时跑了两个独立的 asyncio task:
1 2 3 bus_task = asyncio.create_task(agent_loop.run()) outbound_task = asyncio.create_task(_consume_outbound())
_consume_outbound 负责把 outbound 队列里的消息打印给用户。但 CLI 是交互式的,用户发一条消息、等一条回复,提示符不能一直占着。所以这里用了一个 turn_done Event 来管控输入提示符的节奏:
用户按回车发消息时,turn_done.clear(),随后 await turn_done.wait() 阻塞,提示符消失、等待回复
_consume_outbound 收到第一条实质性回复时,turn_done.set(),提示符回来,用户可以继续输入
此后 再收到的消息(即子 agent 异步推送的结果)turn_done 已经是 set 状态,直接打印,不影响提示符
1 2 3 4 5 6 7 8 9 10 11 12 13 async def _consume_outbound (): while True : msg = await bus.consume_outbound() ... if not turn_done.is_set(): turn_response.append((msg.content, ...)) turn_done.set () elif msg.content: await _print_interactive_response(msg.content, ...)
用户确实会收到两条回复 :第一条是主 agent 立刻给出的”任务已在后台启动”,第二条是子 agent 完成后异步推送的结果——两条消息之间可能隔几秒甚至几分钟。
完整的通信时序 用一张图把整条链路串起来:
图里最关键的两个地方:
AgentLoop 主循环用 create_task 分发,不阻塞 :处理用户消息的 _dispatch 在后台跑,主循环立刻回去 consume_inbound 等下一条。所以子 agent 完成后推进来的 system 消息,能被同一个主循环消费到,再触发第二个 _dispatch。
turn_done 的状态决定 CLI 怎么处理这条 outbound 消息 :用户等待期间收到的是”直接回复”,拿到之后提示符恢复;此后再收到的是”异步推送”,直接打印出来,不影响用户继续输入。
整条链路都是异步的、松耦合的 :子 agent 不直接调主 agent 的方法,主 agent 也不阻塞等待子 agent,所有通信都通过 MessageBus 的队列传递,主循环永远是那个消息泵。
小结 这篇把 nanobot 多 agent 的实现从头到尾走了一遍。
通信全靠 MessageBus 一条队列 。子 agent 完成任务后不走回调、不走单独通道,而是构造 channel="system" 的 InboundMessage 发回 inbound 队列,主循环像处理普通消息一样消费到它,再起一个新的 _dispatch task——整个多 agent 协作完全复用单 agent 的消息处理路径。
子 agent 的边界明确 。没有 spawn 工具所以不能递归嵌套,没有 message 工具所以不能绕过主 agent 直接推消息给用户,checkpoint_callback 和 session_key 都不传所以运行期间不碰主 agent 的 session 和崩溃恢复状态。子 agent 是一把受控的手,做完事情汇报结果。
用户会收到两条独立回复 。第一条是 spawn 调用完成后主 agent 立刻回的”任务已启动”,第二条是子 agent 完成后经主循环再次处理推送过来的结果——两条消息可能相差几秒到几分钟,CLI 侧用 turn_done Event 区分这两种情况,第一条回来提示符恢复,第二条异步打印不打断用户输入。
子 agent 的结果以 “assistant” 角色存进 session ,保证历史记录完整,主 agent 下次处理请求时能看到完整上下文。