nanobot源码学习(5)-多agent

上篇回顾

四篇下来,nanobot 的核心流程已经相当完整了:笔记1讲了项目结构和记忆系统;笔记2讲了 provider 层怎么抹平各家大模型接口;笔记3讲了 AgentLoop 从接收消息到 AgentRunner 迭代循环的全过程;笔记4把工具系统讲了一遍,包括并发策略、MCP接入和参数校验。

但这几篇里有个话题一直没深讲:spawn 工具。在笔记4工具清单里它只出现了一行——“spawn 子代理工具”。这个工具背后藏着 nanobot 整个多 agent 协作的架构:主 agent 可以把任务拆出去让后台的子 agent 独立执行,子 agent 完成后再把结果”喂”回给主 agent,整个通信走一条统一的消息总线。

这篇把这套机制从头到尾讲清楚。


整体设计思路

在展开看代码之前,先把脑子里的模型建起来。

nanobot 的多 agent 架构有三个核心组件:

  • MessageBusnanobot/bus/queue.py):一条异步消息队列,所有进出 agent 的消息都走这里,包括后面子 agent 的结果回传也是。
  • SubagentManagernanobot/agent/subagent.py):子 agent 的生命周期管理器,负责 spawn、跟踪、取消。
  • SpawnToolnanobot/agent/tools/spawn.py):暴露给 LLM 的工具接口,LLM 调用它来触发子 agent。

这三个东西的关系很清楚:SpawnTool 是入口,SubagentManager 是执行引擎,MessageBus 是通信通道。


MessageBus:消息总线

先看最底层的 MessageBus。它的代码很短,但设计思路值得讲一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# nanobot/bus/queue.py
class MessageBus:
def __init__(self):
self.inbound: asyncio.Queue[InboundMessage] = asyncio.Queue()
self.outbound: asyncio.Queue[OutboundMessage] = asyncio.Queue()

async def publish_inbound(self, msg: InboundMessage) -> None:
await self.inbound.put(msg)

async def consume_inbound(self) -> InboundMessage:
return await self.inbound.get()

async def publish_outbound(self, msg: OutboundMessage) -> None:
await self.outbound.put(msg)

async def consume_outbound(self) -> OutboundMessage:
return await self.outbound.get()

就两条队列,一进一出。inbound 是各个渠道(CLI、Telegram、Discord……)往 agent 发消息用的,outbound 是 agent 往外发响应用的。

对应的两个数据类也很简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# nanobot/bus/events.py
@dataclass
class InboundMessage:
channel: str # 来自哪个渠道,"telegram" / "cli" / "system"
sender_id: str # 发送者 ID
chat_id: str # 会话 ID
content: str # 消息正文
media: list[str] = ... # 附件 URL 列表
metadata: dict[str, Any] = ... # 渠道特有数据
session_key_override: str | None = None # 强制指定 session key

@property
def session_key(self) -> str:
return self.session_key_override or f"{self.channel}:{self.chat_id}"

@dataclass
class OutboundMessage:
channel: str
chat_id: str
content: str
reply_to: str | None = None
media: list[str] = ...
metadata: dict[str, Any] = ...

这个设计有一个好处:channel=”system” 的 InboundMessage 也走同一条队列。子 agent 完成任务后把结果包装成 channel="system" 的 InboundMessage,发到 inbound 队列,主 agent 的消息循环自然就会处理到它——不需要任何额外的通知机制,完全复用现有的消息处理路径。


SpawnTool:LLM 的发令枪

SpawnTool 是 LLM 用来启动子 agent 的工具,代码很薄:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# nanobot/agent/tools/spawn.py
@tool_parameters(
tool_parameters_schema(
task=StringSchema("The task for the subagent to complete"),
label=StringSchema("Optional short label for the task (for display)"),
required=["task"],
)
)
class SpawnTool(Tool):
def __init__(self, manager: "SubagentManager"):
self._manager = manager
self._origin_channel = "cli"
self._origin_chat_id = "direct"

def set_context(self, channel: str, chat_id: str) -> None:
"""记录当前消息来源,子agent结束后结果要回到这里"""
self._origin_channel = channel
self._origin_chat_id = chat_id
self._session_key = f"{channel}:{chat_id}"

async def execute(self, task: str, label: str | None = None, **kwargs) -> str:
return await self._manager.spawn(
task=task,
label=label,
origin_channel=self._origin_channel,
origin_chat_id=self._origin_chat_id,
session_key=self._session_key,
)

工具参数只有两个:task(子 agent 要做什么)和可选的 label(展示用的短标签)。set_context 这个方法是在每次处理消息时被 AgentLoop 调用的,确保子 agent 知道要把结果回传到哪个 channel + chat_id。

工具的 description 里还有一段引导语:

1
2
3
4
"Use this for complex or time-consuming tasks that can run independently.
The subagent will complete the task and report back when done.
For deliverables or existing projects, inspect the workspace first
and use a dedicated subdirectory when helpful."

这段描述直接影响 LLM 的行为:遇到耗时或复杂的独立任务时优先 spawn,不要阻塞主对话流。


从 spawn 工具调用说起:主流程

来看 LLM 调用 spawn 之后的完整流程。

spawn主流程

整个过程拆开看:

  1. 用户发消息进来,AgentLoop 从 MessageBus 消费,包装成异步任务开始处理
  2. _process_message 组装好历史消息,调 runner.run
  3. LLM 返回的 tool_calls 里包含 spawn(task="...", label="...")
  4. AgentLoop 执行 SpawnTool,SpawnTool 把参数转给 SubagentManager
  5. SubagentManager 用 asyncio.create_task 把子 agent 丢到后台,立刻返回一条确认文字
  6. LLM 拿到 spawn 的返回值,生成最终回复(比如”已在后台启动任务”)
  7. 主 agent 把响应推给用户——此时子 agent 还在后台跑着

这里关键点是非阻塞:spawn 调用本身不等子 agent 完成,主 agent 马上就能回复用户,子 agent 独立在后台执行。


SubagentManager:子 agent 的生命周期

SubagentManager 是管理子 agent 的核心,代码在 nanobot/agent/subagent.py。先看 spawn 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# nanobot/agent/subagent.py
class SubagentManager:
def __init__(self, provider, workspace, bus, ...):
self.runner = AgentRunner(provider)
self._running_tasks: dict[str, asyncio.Task[None]] = {}
# 按 session 分组跟踪,方便用 /stop 命令取消
self._session_tasks: dict[str, set[str]] = {}

async def spawn(
self,
task: str,
label: str | None = None,
origin_channel: str = "cli",
origin_chat_id: str = "direct",
session_key: str | None = None,
) -> str:
task_id = str(uuid.uuid4())[:8]
display_label = label or task[:30] + ("..." if len(task) > 30 else "")
origin = {"channel": origin_channel, "chat_id": origin_chat_id}

bg_task = asyncio.create_task(
self._run_subagent(task_id, task, display_label, origin)
)
self._running_tasks[task_id] = bg_task
if session_key:
self._session_tasks.setdefault(session_key, set()).add(task_id)

def _cleanup(_: asyncio.Task) -> None:
self._running_tasks.pop(task_id, None)
if session_key and (ids := self._session_tasks.get(session_key)):
ids.discard(task_id)
if not ids:
del self._session_tasks[session_key]

bg_task.add_done_callback(_cleanup)
return f"Subagent [{display_label}] started (id: {task_id}). I'll notify you when it completes."

task_id 取 UUID 的前8位,够短够唯一。_running_tasks 是个字典,key 是 task_id,方便查询和取消。_session_tasks 按 session 分组,这样 /stop 命令可以一次取消某个会话的所有子 agent。

add_done_callback 做清理,任务完成后把自己从两个字典里删掉。


_run_subagent:子 agent 的实际执行

子 agent 运行在 _run_subagent 里:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
async def _run_subagent(
self,
task_id: str,
task: str,
label: str,
origin: dict[str, str],
) -> None:
try:
# 1. 构建工具集(注意:没有 spawn 和 message)
tools = ToolRegistry()
tools.register(ReadFileTool(...))
tools.register(WriteFileTool(...))
tools.register(EditFileTool(...))
tools.register(ListDirTool(...))
tools.register(GlobTool(...))
tools.register(GrepTool(...))
if self.exec_config.enable:
tools.register(ExecTool(...))
if self.web_config.enable:
tools.register(WebSearchTool(...))
tools.register(WebFetchTool(...))

# 2. 构建系统 prompt
system_prompt = self._build_subagent_prompt()
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": task},
]

# 3. 用 AgentRunner 跑起来
result = await self.runner.run(AgentRunSpec(
initial_messages=messages,
tools=tools,
model=self.model,
max_iterations=15,
max_tool_result_chars=self.max_tool_result_chars,
hook=_SubagentHook(task_id),
fail_on_tool_error=True,
))

# 4. 处理结果
if result.stop_reason in ("tool_error", "error"):
await self._announce_result(..., status="error")
return
await self._announce_result(..., status="ok")

except Exception as e:
await self._announce_result(..., status="error")

几个细节值得关注:

工具集裁剪:子 agent 没有 spawn 工具,所以子 agent 不能再 spawn 更多子 agent(不支持递归嵌套)。子 agent 也没有 message 工具,它不能直接给用户发消息,只能把结果交给主 agent 来转发。

独立系统 prompt:子 agent 用 subagent_system.md 模板构建自己的系统 prompt,和主 agent 完全隔离,没有主 agent 的记忆和历史。

1
2
3
4
5
6
# subagent_system.md
{{ time_ctx }}

You are a subagent spawned by the main agent to complete a specific task.
Stay focused on the assigned task. Your final response will be reported
back to the main agent.

简短而直接——告诉 LLM:你是子 agent,专注完成分配的任务,最终响应会被报告给主 agent。

max_iterations=15:子 agent 的最大迭代次数限制为 15,比主 agent 少一些,避免子 agent 失控地无限跑下去。

**_SubagentHook**:子 agent 有一个简单的 hook,只做日志记录——每次工具调用前打印 [task_id] executing: tool_name with arguments: ...,方便调试。

子 agent 不写 checkpoint,也不污染主 agent 的 session:这个细节容易被忽略,但很关键。仔细对比一下主 agent 和子 agent 传给 AgentRunSpec 的参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 主 agent(loop.py 的 _run_agent_loop)
result = await self.runner.run(AgentRunSpec(
...
workspace=self.workspace,
session_key=session.key, # 例如 "cli:direct"
checkpoint_callback=_checkpoint, # 写入 session.metadata
...
))

# 子 agent(subagent.py 的 _run_subagent)
result = await self.runner.run(AgentRunSpec(
initial_messages=messages,
tools=tools,
model=self.model,
max_iterations=15,
...
# 注意:没有 session_key,没有 checkpoint_callback!
))

AgentRunner._emit_checkpoint 的实现如下:

1
2
3
4
async def _emit_checkpoint(self, spec: AgentRunSpec, payload: dict) -> None:
callback = spec.checkpoint_callback
if callback is not None: # 子 agent 这里是 None,直接跳过
await callback(payload)

checkpoint_callbackNone,所以子 agent 运行期间不会触发任何 checkpoint 写入,不会碰主 agent session 的 metadata主 agent 的崩溃恢复机制和子 agent 完全互不干扰。

另一个值得关注的地方是超大工具结果的磁盘落盘。当工具调用返回内容超过 max_tool_result_chars 限制时,runner 会把超出部分持久化到 .nanobot/tool-results/ 目录,路径结构是 bucket/tool_call_id.txt

1
2
3
# nanobot/utils/helpers.py
bucket = ensure_dir(root / safe_filename(session_key or "default"))
path = bucket / f"{safe_filename(tool_call_id)}.txt"

主 agent 的 bucket 按 session_key 区分(比如 cli_direct/),子 agent 因为没传 session_key,都落到 "default" 这个共用桶里。但并发安全靠 tool_call_id 保证:每个工具调用的 ID 来自 LLM 响应,各自独立,不同子 agent 的文件路径天然不同,互不干扰。


结果回传:子 agent 如何通知主 agent

子 agent 跑完之后,通过 _announce_result 把结果”注入”回系统:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
async def _announce_result(
self,
task_id: str,
label: str,
task: str,
result: str,
origin: dict[str, str],
status: str,
) -> None:
status_text = "completed successfully" if status == "ok" else "failed"

# 渲染 subagent_announce.md 模板
announce_content = render_template(
"agent/subagent_announce.md",
label=label,
status_text=status_text,
task=task,
result=result,
)

# 包装成 system 消息发进 inbound 队列
msg = InboundMessage(
channel="system",
sender_id="subagent",
chat_id=f"{origin['channel']}:{origin['chat_id']}",
content=announce_content,
)
await self.bus.publish_inbound(msg)

subagent_announce.md 模板内容:

1
2
3
4
5
6
7
8
9
[Subagent '{{ label }}' {{ status_text }}]

Task: {{ task }}

Result:
{{ result }}

Summarize this naturally for the user. Keep it brief (1-2 sentences).
Do not mention technical details like "subagent" or task IDs.

注意最后一句——不要向用户提及”subagent”或 task ID 这些技术细节。这是有意为之的:对用户来说,这件事就是 nanobot 帮你做完了,不需要知道背后是主 agent 还是子 agent 做的。

子agent执行与回传

整个回传流程的关键在于 chat_id 的构造:f"{origin['channel']}:{origin['chat_id']}"。这样主 agent 收到 system 消息后,从 chat_id 里解析出原始的 channel 和 chat_id,就能找到对应的 session 和最终推给用户的出口。


主 agent 如何处理子 agent 的结果

子 agent 把结果发进 inbound 队列后,主 agent 的消息循环正常消费到它,走进 _process_message。这里有个专门的分支处理 channel == "system" 的消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# nanobot/agent/loop.py
async def _process_message(self, msg: InboundMessage, ...) -> OutboundMessage | None:
# system 消息:子agent的结果
if msg.channel == "system":
channel, chat_id = (
msg.chat_id.split(":", 1) if ":" in msg.chat_id else ("cli", msg.chat_id)
)
logger.info("Processing system message from {}", msg.sender_id)
key = f"{channel}:{chat_id}"
session = self.sessions.get_or_create(key)
# 崩溃恢复
if self._restore_runtime_checkpoint(session):
self.sessions.save(session)
if self._restore_pending_user_turn(session):
self.sessions.save(session)

session, pending = self.auto_compact.prepare_session(session, key)
await self.consolidator.maybe_consolidate_by_tokens(session)
self._set_tool_context(channel, chat_id, ...)
history = session.get_history(max_messages=0)

# 注意这里:current_role="assistant"
current_role = "assistant" if msg.sender_id == "subagent" else "user"

messages = self.context.build_messages(
history=history,
current_message=msg.content,
channel=channel, chat_id=chat_id,
session_summary=pending,
current_role=current_role, # <-- 子agent的结果以 assistant 身份加入对话
)
final_content, _, all_msgs, _, _ = await self._run_agent_loop(messages, ...)
self._save_turn(session, all_msgs, 1 + len(history))
self._clear_runtime_checkpoint(session)
self.sessions.save(session)
return OutboundMessage(channel=channel, chat_id=chat_id, content=final_content)

主agent处理system消息

这里有个微妙的设计:current_role

通常用户发的消息构建成 "role": "user" 的消息,主 agent 构建成 "role": "assistant"。但子 agent 的结果是以 "role": "assistant" 插进来的——因为子 agent 本质上是主 agent 派出去的”手”,它做完事情汇报结果,这件事从语义上更像是 assistant 在描述自己做了什么,而不是用户在问问题。

主 agent 拿到这个带有子 agent 结果的消息历史,再走一轮 runner.run,用自然语言把结果整合后回复给用户。这一轮里 LLM 受 subagent_announce.md 末尾那句指令引导,会把技术细节屏蔽掉,只给用户简洁的结果说明。


session 隔离与持久化

子 agent 和主 agent 共用同一个 session。回传的 system 消息里 chat_id = "cli:direct"(原始用户的 chat_id),所以 _process_message 找到的 session 跟用户对话用的是同一个,子 agent 的执行结果会被 _save_turn 正常写进这个 session 的消息历史里。

下次用户发消息时,历史里就有这段子 agent 的执行记录,LLM 可以根据上下文给出更连贯的回复。

session 文件是 JSONL 格式(sessions/cli_direct.jsonl),每次 sessions.save() 都全量重写——metadata 一行,然后每条消息一行:

1
2
3
4
5
6
7
{"_type": "metadata", "key": "cli:direct", "created_at": "...", "updated_at": "...", "metadata": {...}}
{"role": "user", "content": "帮我写个爬虫", "timestamp": "..."}
{"role": "assistant", "content": null, "tool_calls": [{"id": "call_xxx", "function": {"name": "spawn", "arguments": "{\"task\": \"...\"}"}}, ...], "timestamp": "..."}
{"role": "tool", "tool_call_id": "call_xxx", "name": "spawn", "content": "Subagent [爬虫任务] started (id: a1b2c3d4).", "timestamp": "..."}
{"role": "assistant", "content": "已在后台启动任务,完成后通知你。", "timestamp": "..."}
{"role": "assistant", "content": "[Subagent '爬虫任务' completed successfully]\n\nTask: ...\n\nResult: ...", "timestamp": "..."}
{"role": "assistant", "content": "爬虫脚本已写好,保存在 workspace/crawler.py。", "timestamp": "..."}

注意:子 agent 的回传结果以 "role": "assistant" 存进 session,这样历史记录是完整的,下次能正确还原整个对话脉络。


mid-turn 消息注入与多 agent 的关系

mid-turn 注入机制在笔记3里已经详细讲过(_pending_queuesinjection_callback_try_drain_injections),这里不重复。只说一个和多 agent 直接相关的点:

子 agent 完成后推进来的 system 消息,走的是主 agent 主循环而不是 mid-turn 注入。主循环 run()asyncio.create_task 分发,不阻塞,所以它能同时:一边处理用户消息的 _dispatch,一边等待并消费子 agent 推进来的 system 消息,触发第二个 _dispatch。这是两个独立的 asyncio task,和 mid-turn 注入(同一 task 内追加消息)是不同的机制。


session 并发控制

多 agent 带来的另一个问题是并发:多个用户同时发消息、一个用户快速连发消息,怎么保证不乱套?

nanobot 的并发控制有两层:

1
2
3
4
5
6
# AgentLoop.__init__
self._session_locks: dict[str, asyncio.Lock] = {} # 同一 session 串行
_max = int(os.environ.get("NANOBOT_MAX_CONCURRENT_REQUESTS", "3"))
self._concurrency_gate: asyncio.Semaphore | None = (
asyncio.Semaphore(_max) if _max > 0 else None
) # 全局并发上限

第一层:session 级别的 Lock。_session_locks 是按 session key 分配的 asyncio.Lock,同一个 session 的消息必须串行处理——不能同时有两个请求在跑同一个用户的对话。

第二层:全局信号量。_concurrency_gate 是一个全局 Semaphore,默认值 3(可以通过环境变量 NANOBOT_MAX_CONCURRENT_REQUESTS 调整,<=0 表示不限制)。这个限制的是跨 session 的整体并发量,防止瞬间涌入大量请求把系统压垮。

两层组合起来:

1
2
3
# _dispatch
async with lock, gate:
response = await self._process_message(msg, ...)

同一 session 串行(lock),不同 session 最多同时跑 N 个(gate)。这样在多用户场景下,每个用户的对话不会互相干扰,整体资源也不会失控。


子 agent 的取消

如果用户发 /stop 命令,主 agent 可以取消当前 session 的所有子 agent:

1
2
3
4
5
6
7
8
9
10
11
12
# nanobot/agent/subagent.py
async def cancel_by_session(self, session_key: str) -> int:
tasks = [
self._running_tasks[tid]
for tid in self._session_tasks.get(session_key, [])
if tid in self._running_tasks and not self._running_tasks[tid].done()
]
for t in tasks:
t.cancel()
if tasks:
await asyncio.gather(*tasks, return_exceptions=True)
return len(tasks)

cancel() 会向协程注入 CancelledErrorasyncio.gather 等它们真的停下来(return_exceptions=True 让取消异常不往上抛)。返回值是实际取消的数量,命令层可以用来给用户一个反馈。

另外 get_running_count()get_running_count_by_session() 提供查询接口,可以在需要时检查有多少子 agent 正在运行。


完整的多 agent 通信流

讲到这里有一个问题还没交代清楚:用户最终会收到几条回复?什么时候收到? 要回答这个问题,必须先看 AgentLoop.run() 这个主循环是怎么工作的。

AgentLoop.run():永不停歇的消息泵

1
2
3
4
5
6
7
8
9
# nanobot/agent/loop.py
async def run(self) -> None:
self._running = True
while self._running:
msg = await asyncio.wait_for(self.bus.consume_inbound(), timeout=1.0)
...
# 注意:create_task,不等待!
task = asyncio.create_task(self._dispatch(msg))
self._active_tasks.setdefault(effective_key, []).append(task)

关键在 asyncio.create_task——主循环不等 _dispatch 完成,创建好任务就立刻循环回去,继续 consume_inbound 等下一条消息。这意味着:

  • 处理用户消息的 _dispatch(msg1) 在后台异步跑
  • 主循环同时在等待 inbound 队列的下一条消息
  • 子 agent 完成后往 inbound 队列推的 system 消息,也会被同一个主循环消费到,再 create_task(_dispatch(system_msg))

CLI 侧:两个并行的 asyncio task

CLI 模式下启动时跑了两个独立的 asyncio task:

1
2
3
# nanobot/cli/commands.py - run_interactive()
bus_task = asyncio.create_task(agent_loop.run()) # 消费 inbound,驱动 agent
outbound_task = asyncio.create_task(_consume_outbound()) # 消费 outbound,打印给用户

_consume_outbound 负责把 outbound 队列里的消息打印给用户。但 CLI 是交互式的,用户发一条消息、等一条回复,提示符不能一直占着。所以这里用了一个 turn_done Event 来管控输入提示符的节奏:

  • 用户按回车发消息时,turn_done.clear(),随后 await turn_done.wait() 阻塞,提示符消失、等待回复
  • _consume_outbound 收到第一条实质性回复时,turn_done.set(),提示符回来,用户可以继续输入
  • 此后再收到的消息(即子 agent 异步推送的结果)turn_done 已经是 set 状态,直接打印,不影响提示符
1
2
3
4
5
6
7
8
9
10
11
12
13
async def _consume_outbound():
while True:
msg = await bus.consume_outbound()
...
if not turn_done.is_set():
# 第一条回复:主 agent 说"任务已在后台启动"
# 把提示符还给用户
turn_response.append((msg.content, ...))
turn_done.set()
elif msg.content:
# 后续异步推送:子 agent 完成后的结果
# 直接打印,不需要等待任何 event
await _print_interactive_response(msg.content, ...)

用户确实会收到两条回复:第一条是主 agent 立刻给出的”任务已在后台启动”,第二条是子 agent 完成后异步推送的结果——两条消息之间可能隔几秒甚至几分钟。

完整的通信时序

用一张图把整条链路串起来:

完整多agent通信流

图里最关键的两个地方:

  1. AgentLoop 主循环用 create_task 分发,不阻塞:处理用户消息的 _dispatch 在后台跑,主循环立刻回去 consume_inbound 等下一条。所以子 agent 完成后推进来的 system 消息,能被同一个主循环消费到,再触发第二个 _dispatch

  2. turn_done 的状态决定 CLI 怎么处理这条 outbound 消息:用户等待期间收到的是”直接回复”,拿到之后提示符恢复;此后再收到的是”异步推送”,直接打印出来,不影响用户继续输入。

整条链路都是异步的、松耦合的:子 agent 不直接调主 agent 的方法,主 agent 也不阻塞等待子 agent,所有通信都通过 MessageBus 的队列传递,主循环永远是那个消息泵。


小结

这篇把 nanobot 多 agent 的实现从头到尾走了一遍。

通信全靠 MessageBus 一条队列。子 agent 完成任务后不走回调、不走单独通道,而是构造 channel="system" 的 InboundMessage 发回 inbound 队列,主循环像处理普通消息一样消费到它,再起一个新的 _dispatch task——整个多 agent 协作完全复用单 agent 的消息处理路径。

子 agent 的边界明确。没有 spawn 工具所以不能递归嵌套,没有 message 工具所以不能绕过主 agent 直接推消息给用户,checkpoint_callbacksession_key 都不传所以运行期间不碰主 agent 的 session 和崩溃恢复状态。子 agent 是一把受控的手,做完事情汇报结果。

用户会收到两条独立回复。第一条是 spawn 调用完成后主 agent 立刻回的”任务已启动”,第二条是子 agent 完成后经主循环再次处理推送过来的结果——两条消息可能相差几秒到几分钟,CLI 侧用 turn_done Event 区分这两种情况,第一条回来提示符恢复,第二条异步打印不打断用户输入。

子 agent 的结果以 “assistant” 角色存进 session,保证历史记录完整,主 agent 下次处理请求时能看到完整上下文。