nanobot源码学习(8)-heartbeat

上篇回顾

第 7 篇把 cron 模块讲完了——有显式注册、持久化、精确时间表的定时任务体系。这篇聊另一个机制:heartbeat。两者在功能上各有侧重,代码里也是完全独立的两个模块,但在 gateway 模式下同时运行,形成互补。


1. heartbeat 是什么,为什么不用 cron 代替

心跳(heartbeat)在分布式系统里通常指”我还活着”的探活信号。nanobot 的 heartbeat 借用了这个名字,但语义完全不同:它是 bot 主动唤醒、自我检视任务的机制

具体行为:每隔固定间隔(默认 30 分钟),系统自动唤醒一次,读取工作区里的 HEARTBEAT.md 文件,用 LLM 判断”有没有任务需要处理”,有的话就跑一次完整的 agent 循环,完成后视情况推送结果给用户。


1.1 cron 能不能做同样的事?

看完 cron(第 7 篇)再看 heartbeat,很容易有这个疑问:cron 的 every 类型也可以定时执行任务,heartbeat 不就是一个”固定间隔的 cron job”吗?

理论上可以用 cron 模拟:

1
2
3
4
CronJob(
schedule=CronSchedule(kind="every", every_ms=30*60*1000),
payload=CronPayload(message="检查 TODO 列表,有任务就处理"),
)

但这样做有一个根本性的问题:cron 表达式只能描述时间,不能描述条件

举个具体例子:你想让 bot 在下班后(下午 6 点之后)才帮你检查工作邮件,有紧急邮件就通知你。

用 cron 的做法:

你可以创建一个每 30 分钟跑一次的 job,但 cron 不知道”现在是不是下班时间”,它只会无差别地触发。要让 agent 判断时效,只能把逻辑写进 payload:

1
"如果当前时间是下午 6 点到晚上 10 点之间,检查工作邮件;否则跳过。"

问题来了:你有多个这种条件模糊的任务——“工作日才提醒”、”上次结果是失败才重试”、”只在服务器负载低的时候跑”——每个任务都要把时间判断逻辑硬塞进 payload,每个 job 独立运行,互相不知道对方存在,无法统一排优先级。而且这些 job 最终都注册在 cron/jobs.json 里,作为独立实体管理,增删都需要调工具。

用 heartbeat 的做法:

HEARTBEAT.md 里直接写:

1
2
3
4
5
## Active Tasks

- 工作日下班后(18:00+)检查工作邮件,有紧急邮件通知我
- 只在周一早上汇总上周的 GitHub issue 情况
- 如果上次部署失败,每隔半小时重试一次,成功后删掉这一行

LLM 每次醒来时知道当前时间,读完文件后自己推理:今天是周三下午 3 点,第一条”现在不是下班时间跳过”,第二条”今天不是周一跳过”,第三条”上次部署状态是什么”——全部条件在 LLM 的推理中自然处理,不需要写任何额外逻辑。


1.2 两者的本质区别

差异的核心在于 **”谁来决定是否执行”**:

维度 heartbeat cron
谁决定是否执行 LLM(每次读文件后推理,条件是自然语言) 时钟(到时间就触发)
触发条件能表达什么 任意自然语言条件(时间、状态、上下文) 只能表达时间(几点/多久一次)
多任务关系 同一文件,LLM 可以综合判断优先级 每个 job 独立调度,互不知情
任务来源 HEARTBEAT.md,直接编辑文件 显式注册 CronJob,调用工具
持久化 无(重启重新计时),任务本身存文件里 有,存 cron/jobs.json

简单记:

  • 触发条件是时间点的 → cron:明天早上 9 点提醒我开会、每天凌晨 2 点跑数据清理
  • 触发条件是模糊状态或上下文的 → heartbeat:工作日下班后才检查邮件、只在上次失败时重试、服务出问题了就提醒我

heartbeat 的核心价值是:用 LLM 的推理能力替代 cron 表达式,让”什么条件下执行”可以用自然语言来描述。这是 cron 的时钟模型从设计上就无法做到的事。


2. HEARTBEAT.md:任务清单文件

heartbeat 的设计有个很有意思的地方:任务不是在代码里写死的,而是放在一个普通的 Markdown 文件里。文件路径固定:

1
~/.nanobot/workspace/HEARTBEAT.md

文件由 nanobot onboard 命令自动创建,初始内容来自 nanobot/templates/HEARTBEAT.md

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# Heartbeat Tasks

This file is checked every 30 minutes by your nanobot agent.
Add tasks below that you want the agent to work on periodically.

If this file has no tasks (only headers and comments), the agent will skip the heartbeat.

## Active Tasks

<!-- Add your periodic tasks below this line -->

## Completed

<!-- Move completed tasks here or delete them -->

实际使用时怎么写?## Active Tasks 下面加任务就行,格式自由,LLM 能读懂就行。比如:

1
2
3
4
5
## Active Tasks

- [ ] 检查 GitHub 上有没有新的 issue 需要回复
- [ ] 如果 staging 环境异常,发消息告诉我
- 每次唤醒时看一眼天气,如果明天下雨提醒我带伞

也可以直接跟 agent 说”帮我添加一个周期任务:每次检查一下服务器状态”,agent 会用 edit_file 工具把任务写到这个文件里(AGENTS.md 里有指导:HEARTBEAT.md is checked on the configured heartbeat interval. Use file tools to manage periodic tasks)。

注意:如果文件里只有标题和注释、没有实际任务,heartbeat 会被 LLM 判断为 skip,不会执行任何操作。这是有意为之的设计——避免空跑浪费 token。


3. 模块架构:三阶段流水线

heartbeat 的核心实现只有一个文件:nanobot/heartbeat/service.py,188 行。代码量不大,但设计上分了清晰的三个阶段:

1
2
3
Phase 1: 决策   → _decide()          → LLM判断是否有任务 (skip/run)
Phase 2: 执行 → on_execute() → AgentLoop完整执行
Phase 3: 评估 → evaluate_response() → LLM判断是否需要通知用户

整体流程如下图:

heartbeat三阶段时序图

这个三阶段设计解决了一个现实问题:heartbeat 是无人值守的后台任务,不能随便打扰用户

  • Phase 1 负责”要不要动”:有时 HEARTBEAT.md 里虽然有任务,但时间还没到(比如”每周一检查”,今天周三)。让 LLM 读文件后判断,比写硬编码的解析逻辑灵活得多。
  • Phase 2 负责”怎么做”:直接复用 AgentLoop 的全部能力,跟用户正常对话走完全一样的链路。
  • Phase 3 负责”要不要说”:agent 的回复可能是”一切正常”这种废话,这种情况不必打扰用户;但如果发现了异常,必须通知。让 LLM 来判断比硬编码规则更准确。

4. Phase 1:虚拟工具调用决策

_decide() 是整个设计里最有意思的地方,代码不长,值得仔细看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
_HEARTBEAT_TOOL = [
{
"type": "function",
"function": {
"name": "heartbeat",
"description": "Report heartbeat decision after reviewing tasks.",
"parameters": {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["skip", "run"],
"description": "skip = nothing to do, run = has active tasks",
},
"tasks": {
"type": "string",
"description": "Natural-language summary of active tasks (required for run)",
},
},
"required": ["action"],
},
},
}
]

这个 _HEARTBEAT_TOOL 是一个”虚拟工具”——它只是一个结构体,定义了 LLM 应该如何回传决策,系统并不会真的执行什么函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
async def _decide(self, content: str) -> tuple[str, str]:
from nanobot.utils.helpers import current_time_str

response = await self.provider.chat_with_retry(
messages=[
{"role": "system", "content": "You are a heartbeat agent. Call the heartbeat tool to report your decision."},
{"role": "user", "content": (
f"Current Time: {current_time_str(self.timezone)}\n\n"
"Review the following HEARTBEAT.md and decide whether there are active tasks.\n\n"
f"{content}"
)},
],
tools=_HEARTBEAT_TOOL,
model=self.model,
)

if not response.has_tool_calls:
return "skip", ""

args = response.tool_calls[0].arguments
return args.get("action", "skip"), args.get("tasks", "")

几个细节值得注意:

1. 为什么用工具调用而不是解析自由文本?

注释里写得很直白:avoids free-text parsing and the unreliable HEARTBEAT_OK token。早期版本可能是让 LLM 返回一个特殊 token(比如 HEARTBEAT_OK),再用字符串匹配判断。但这种方法太脆弱——LLM 可能回”好的,HEARTBEAT_OK”,也可能回”我决定 HEARTBEAT_OK”,正则写起来永远有漏网之鱼。改成 tool call 之后,结构完全由 schema 约束,action 只能是 "skip""run",省去了一大堆防御性判断。

2. 为什么要传入当前时间?

Current Time: {current_time_str(self.timezone)}——这是为了让 LLM 能判断时效性。比如任务写了”每周一早上检查”,LLM 知道今天是周四,就会返回 skip;如果写了”今天下班前提醒我整理邮件”,LLM 知道现在是下午 5 点,就会返回 run。这个时间戳传入的意义不是记录执行时间,而是给 LLM 一个判断任务是否”当下有效”的锚点。

3. _decide 是无状态的裸 LLM 调用,不带任何历史

注意 messages 列表只有两条:一条 system、一条 user,每次都是全新构造的。没有 session key,没有历史对话,直接调 provider.chat_with_retry,完全绕开了 AgentLoop。

这是刻意的设计——Phase 1 只需要”当前时间 + HEARTBEAT.md 内容”就能做出判断,带历史上下文没有意义,反而浪费 token。

有历史上下文的是 Phase 2:on_execute 里调用的是 agent.process_direct(tasks, session_key="heartbeat"),这才会读取 heartbeat session 保留的最近 8 条历史记录。两个阶段的上下文策略截然不同:

阶段 历史上下文 原因
Phase 1(_decide ,每次新建对话 只需当前状态判断,无需上文
Phase 2(on_execute heartbeat session 最近 8 条 执行任务时可能需要知道”上次结果”

4. 如果 LLM 没有调用工具怎么办?

直接返回 ("skip", ""),安全降级——宁可漏掉一次任务,也不要因为解析失败造成错误执行。


5. Phase 2:完整 agent 执行

Phase 1 返回 run 之后,调用 on_execute 回调。这个回调不是在 HeartbeatService 里定义的,而是在 cli/commands.pygateway 函数里注入的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
async def on_heartbeat_execute(tasks: str) -> str:
"""Phase 2: execute heartbeat tasks through the full agent loop."""
channel, chat_id = _pick_heartbeat_target()

async def _silent(*_args, **_kwargs):
pass

resp = await agent.process_direct(
tasks,
session_key="heartbeat",
channel=channel,
chat_id=chat_id,
on_progress=_silent,
)

# Keep a small tail of heartbeat history so the loop stays bounded
# without losing all short-term context between runs.
session = agent.sessions.get_or_create("heartbeat")
session.retain_recent_legal_suffix(hb_cfg.keep_recent_messages)
agent.sessions.save(session)

return resp.content if resp else ""

几个设计点:

**session_key="heartbeat"**:heartbeat 有自己独立的 session,不会和用户正常对话的历史混在一起。同时 key 是固定字符串,所以多次 heartbeat 执行之间共享上下文(LLM 知道”上次检查的结果是什么”)。

**on_progress=_silent**:进度回调被设置成空函数。heartbeat 是后台任务,中间的流式输出不需要实时推送给用户,只有最终结果才经过 Phase 3 评估后选择性推送。

session 截断:执行完之后,立即调用 retain_recent_legal_suffix(hb_cfg.keep_recent_messages) 截断历史。keep_recent_messages 默认是 8,意思是只保留最近 8 条消息。

为什么要截断?因为 heartbeat 每隔 30 分钟就跑一次,如果不截断,历史会无限增长,context window 会爆。但也不能全清,保留最近几条是为了让 LLM 在下次执行时还记得”上次发现了什么”。这是一个刻意的 trade-off——以少量上下文换取内存边界可控

**_pick_heartbeat_target()**:决定把结果推送到哪个 channel。逻辑是遍历所有已有 session,跳过 clisystem 这类内部 channel,优先选最近活跃的外部渠道(飞书、微信等)。如果没有外部渠道,fallback 到 ("cli", "direct")

1
2
3
4
5
6
7
8
9
10
11
12
def _pick_heartbeat_target() -> tuple[str, str]:
enabled = set(channels.enabled_channels)
for item in session_manager.list_sessions():
key = item.get("key") or ""
if ":" not in key:
continue
channel, chat_id = key.split(":", 1)
if channel in {"cli", "system"}:
continue
if channel in enabled and chat_id:
return channel, chat_id
return "cli", "direct"

6. Phase 3:评估结果,决定是否推送

Phase 2 拿到 agent 的回复之后,不会直接推送,而是先经过 evaluate_response

1
2
3
4
5
6
7
8
9
if response:
should_notify = await evaluate_response(
response, tasks, self.provider, self.model,
)
if should_notify and self.on_notify:
logger.info("Heartbeat: completed, delivering response")
await self.on_notify(response)
else:
logger.info("Heartbeat: silenced by post-run evaluation")

evaluate_responsenanobot/utils/evaluator.py 里,同样用工具调用模式,走的裸大模型一次调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
_EVALUATE_TOOL = [
{
"type": "function",
"function": {
"name": "evaluate_notification",
"description": "Decide whether the user should be notified about this background task result.",
"parameters": {
"type": "object",
"properties": {
"should_notify": {
"type": "boolean",
"description": "true = result contains actionable/important info the user should see; false = routine or empty, safe to suppress",
},
"reason": {
"type": "string",
"description": "One-sentence reason for the decision",
},
},
"required": ["should_notify"],
},
},
}
]

提示词在 nanobot/templates/agent/evaluator.md

1
2
3
4
5
6
7
You are a notification gate for a background agent. You will be given the original task and the agent's response. Call the evaluate_notification tool to decide whether the user should be notified.

Notify when the response contains actionable information, errors, completed deliverables, scheduled reminder/timer completions, or anything the user explicitly asked to be reminded about.

A user-scheduled reminder should usually notify even when the response is brief or mostly repeats the original reminder.

Suppress when the response is a routine status check with nothing new, a confirmation that everything is normal, or essentially empty.

规则非常直白:

  • 要通知:有可操作信息、有报错、有完成的任务、用户明确要求的提醒
  • 要静默:例行检查没发现问题、”一切正常”类的回复、内容基本为空

失败时默认通知(return True),这是一个”宁可多打扰一次,也不能丢失重要消息”的容错策略。

evaluate_response 同时被 heartbeat 和 cron 使用——cron 里的任务执行后也会走这个评估门,两者复用了相同的逻辑(详见第 7 篇 on_cron_job 里的相关代码)。


7. 循环控制:start/stop

心跳循环的控制逻辑很简单,看 _run_loop 就够了:

1
2
3
4
5
6
7
8
9
10
11
async def _run_loop(self) -> None:
"""Main heartbeat loop."""
while self._running:
try:
await asyncio.sleep(self.interval_s)
if self._running:
await self._tick()
except asyncio.CancelledError:
break
except Exception as e:
logger.error("Heartbeat error: {}", e)

注意这里 sleep_tick 之前:先等,再执行。这意味着 gateway 启动后的第一次心跳要等满一个周期(30 分钟)才会触发,不是启动就立刻跑。这是有意为之——避免启动时 LLM provider 还没完全初始化就发起请求,以及避免每次重启都触发一次任务。

启动和停止的逻辑:

1
2
3
4
5
6
7
8
9
10
async def start(self) -> None:
...
self._running = True
self._task = asyncio.create_task(self._run_loop())

def stop(self) -> None:
self._running = False
if self._task:
self._task.cancel()
self._task = None

8. 配置项

HeartbeatConfignanobot/config/schema.py

1
2
3
4
5
6
class HeartbeatConfig(Base):
"""Heartbeat service configuration."""

enabled: bool = True
interval_s: int = 30 * 60 # 30 minutes
keep_recent_messages: int = 8

三个字段:

  • enabled:可以直接关掉 heartbeat,比如开发调试时不想被后台任务打扰
  • interval_s:唤醒间隔,默认 1800 秒(30 分钟)。如果任务时效性要求高,可以调小(比如 300 = 5 分钟);反之调大
  • keep_recent_messages:heartbeat session 的历史保留条数,默认 8 条

配置在 nanobot.yamlgateway.heartbeat 节点下:

1
2
3
4
5
gateway:
heartbeat:
enabled: true
interval_s: 1800
keep_recent_messages: 8

在 gateway 启动时,这些配置会在 cli/commands.py 里被读取并传给 HeartbeatService

1
2
3
4
5
6
7
8
9
10
11
hb_cfg = config.gateway.heartbeat
heartbeat = HeartbeatService(
workspace=config.workspace_path,
provider=provider,
model=agent.model,
on_execute=on_heartbeat_execute,
on_notify=on_heartbeat_notify,
interval_s=hb_cfg.interval_s,
enabled=hb_cfg.enabled,
timezone=config.agents.defaults.timezone,
)

9. 整体串联:gateway 里的初始化顺序

gateway 启动时,heartbeat 和 cron、channel 等组件是并行初始化的,最后统一启动。完整顺序大致如下(简化):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 1. 基础组件
bus = MessageBus()
provider = _make_provider(config)
session_manager = SessionManager(config.workspace_path)

# 2. 核心 agent
agent = AgentLoop(bus=bus, provider=provider, ...)

# 3. cron(需要 agent 来执行任务)
cron = CronService(cron_store_path)
cron.on_job = on_cron_job # 回调里用到了 agent

# 4. channel(消息路由层)
channels = ChannelManager(config, bus)

# 5. heartbeat(需要 agent 和 channel 来执行和推送)
heartbeat = HeartbeatService(
on_execute=on_heartbeat_execute, # 用到 agent
on_notify=on_heartbeat_notify, # 用到 bus/channel
...
)

# 6. 统一启动(进入事件循环)
await asyncio.gather(
cron.start(),
heartbeat.start(),
channels.start(),
...
)

heartbeat 被放在最后初始化是因为它依赖 agent(执行任务)和 channel(推送结果),必须等这两个组件就绪之后再创建。


10. 小结

heartbeat 是 nanobot 里最轻量的一个后台服务,代码不到 200 行,但设计上有几个值得记住的点:

  1. 用工具调用代替自由文本解析:Phase 1 和 Phase 3 都采用”定义一个只返回结构化数据的虚拟工具”的方式,从根本上避免了 LLM 输出格式不稳定的问题。
  2. 三阶段分离关注点:决策(要不要执行)/ 执行/ 评估(值不值得打扰用户),每个阶段职责单一、独立。
  3. session 边界主动截断:heartbeat 的历史不自然增长,而是在每次执行后截断到固定大小,以可控的代价换取长期运行的稳定性。
  4. 配置化但有合理默认值:30 分钟间隔、8 条历史,大多数场景不需要改,真有需求也能调。
  5. 失败时保守降级:Phase 1 无工具调用 → skip;Phase 3 异常 → 默认通知。两个方向的错误处理策略不同,反映了”宁可漏跑一次,不能漏推重要通知”的优先级。