nanobot源码学习(7)-cron
上篇回顾
上一篇讲了多 channel 接入层,ChannelManager 把飞书、微信等渠道的消息统一桥接到 MessageBus。但有一类场景完全没有涉及:无用户消息触发,bot 主动执行任务。提醒、定期报告、自动化清理——这些都属于定时任务,nanobot 用 cron 模块来实现。
1. cron 是什么
nanobot 的 cron 和传统 cron 最大的区别是:每个 job 的载荷不是一段代码,而是一条发给 agent 的消息。job 触发时,系统拼一条系统提示词塞进 AgentLoop.process_direct(),让 agent 去处理。定时任务和普通对话走完全相同的 LLM 调用链,复用全部工具能力,保留对话历史。
典型场景:
- 提醒:每天 9 点提醒站起来喝水
- 定期报告:每小时汇总系统状态发到飞书群
- 自动化任务:每天凌晨 2 点跑数据清理并通知
还有一类是系统内部任务:Dream 记忆整理(第1篇)通过 cron 定期触发,详见第8节。
cron vs heartbeat
nanobot 另有一个 heartbeat 模块,两者容易混淆:
- heartbeat:固定间隔唤醒,先用 LLM 读取
HEARTBEAT.md决策”有没有任务要做”(skip/run),再执行。无 job 概念,不持久化,系统内部机制 - cron:显式注册的任务,有精确时间表(支持 cron 表达式)、执行历史、持久化存储;用户和系统均可注册
gateway 模式下二者同时运行,互不干扰。
2. 类型系统
代码在 nanobot/cron/types.py,共 5 个 dataclass。
2.1 CronSchedule:触发时机
1 |
|
| kind | 含义 | 示例 |
|---|---|---|
at |
绝对时间点,执行一次后禁用或删除 | at_ms=1740000000000 |
every |
固定间隔循环 | every_ms=3600000(每小时) |
cron |
标准 5 段 cron 表达式 | expr="0 9 * * *"(每天9点) |
tz 仅对 cron 模式有效,不指定时使用工具层传入的默认时区。
2.2 CronPayload:触发后做什么
1 |
|
kind 决定执行路径:agent_turn 走完整的 agent 对话链;system_event 直接调用内部方法(Dream 使用此类型,不经过 LLM)。deliver=True 时,agent 的回复会通过 channel/to 推送给用户。
2.3 CronJobState:运行状态
1 |
|
每次执行后追加一条 CronRunRecord(记录时间、状态、耗时、错误),滚动保留最近 20 条(_MAX_RUN_HISTORY = 20)。
2.4 CronJob
1 |
|
2.5 CronStore
1 |
|
jobs.json 的内存映射,版本号留作未来迁移用。
3. CronService 架构
3.1 存储文件
1 | {workspace}/cron/jobs.json # 主存储:所有 job 的完整状态 |
jobs.json 是 CronStore 的完整序列化,包含所有 job 的 schedule、payload、state(含执行历史):
1 | { |
action.jsonl 是追加写入的变更日志,每行一个操作(add/update/del),用于多进程安全和离线注册:
1 | {"action": "add", "params": {"id": "b7d2e5a1", "name": "每周报告", "schedule": {"kind": "cron", "expr": "0 10 * * 1"}, ...}} |
两个文件的协作方式:平时只写 jobs.json;当服务未运行或有外部进程需要变更 job 时,追加写入 action.jsonl;服务启动或下一轮 _on_timer() 开始时,将 action.jsonl 重放合并进 jobs.json,然后清空 action.jsonl。详见第5节。
3.2 初始化参数
1 | class CronService: |
on_job:job 触发时的回调,由 gateway 注入;CronService本身不感知 agent 的存在max_sleep_ms:无 job 时定时器的最长 sleep 时间,防止外部进程新增 job 后长时间未被发现
3.3 两个状态标志
1 | self._running = False # 服务是否已 start() |
4. 调度引擎:定时器驱动循环
4.1 start()
1 | async def start(self) -> None: |
4.2 _arm_timer()
1 | def _arm_timer(self) -> None: |
取所有 enabled job 中最近的 next_run_at_ms,sleep 对应时长后触发 _on_timer()。_on_timer() 结束后再次调用 _arm_timer(),形成自驱动循环。max_sleep_ms 保证即使无 job,定时器也会定期醒来扫描外部变更。
4.3 _on_timer():核心调度
1 | async def _on_timer(self) -> None: |
4.4 三种调度模式的计算
1 | def _compute_next_run(schedule: CronSchedule, now_ms: int) -> int | None: |
at:时间点过了返回None,不再触发every:每次执行后重算,以执行完成时刻为基准,避免执行耗时导致的时间漂移累积cron:用 croniter 解析表达式,ZoneInfo处理时区,夏令时切换时行为正确
4.5 _execute_job()
1 | async def _execute_job(self, job: CronJob) -> None: |
无论成功失败都记录执行历史。at 类型执行完后,delete_after_run=True 则从 store 删除,否则置 enabled=False,确保不再触发。
5. action.jsonl:离线写入机制
5.1 什么是”离线”
这里的”离线”特指 CronService._running == False 的情况,有两种场景:
- gateway 未启动:用户在
nanobot agent交互模式下创建了 cron job,此时没有 gateway 进程运行,没有任何CronService实例处于 running 状态 - 外部进程操作:gateway 已经运行,但另一个进程(如
nanobot agent)也实例化了一个CronService对象来操作 job——这个外部实例从未调用过start(),对它自己而言_running=False
两种情况下直接写 jobs.json 都不安全,gateway 的 CronService 实例随时可能在写同一个文件。
5.2 类 WAL 机制
解决方案是 action.jsonl:
- 离线时:把操作(add/del/update)追加写入
action.jsonl——append-only,原子性强 - 合并时机:**
start()启动时,以及每次_on_timer()开始时**(每轮定时器唤醒都会调用_load_store(),其中包含_merge_action())
_running 决定走哪条路:
1 | def add_job(self, ...) -> CronJob: |
remove_job()、enable_job()、update_job() 遵循相同模式。
5.3 _merge_action()
1 | def _merge_action(self): |
以 {id: job} map 重放所有操作,保证幂等。FileLock 保护读-处理-清空的整个临界区,防止并发 append 与清空之间的竞态。
6. 用户如何添加 cron job
用户有两条路径注册 job,最终都走到 CronService.add_job()。
6.1 自然语言对话(最常用)
在任意渠道(飞书、微信、CLI 等)直接告诉 bot:
1 | 每天早上 9 点提醒我站起来活动 |
agent 的 LLM 会将自然语言解析为 CronTool 的工具调用,完整流程见第7节。这是普通用户使用 cron 的主要方式,不需要了解任何底层细节。
6.2 代码直接调用
开发者或系统内部可以跳过 LLM,直接构造 CronSchedule 调用 CronService.add_job():
1 | from nanobot.cron.service import CronService |
注意:若此时 gateway 未运行(_running=False),add_job() 会把操作写入 action.jsonl,等 gateway 启动时自动合并生效。
6.3 系统内部注册(register_system_job)
Dream 等系统任务通过 register_system_job() 注册,幂等,每次 gateway 重启都会重新注册并重算执行时间。此类 job 的 payload.kind="system_event",用户可见但不可删除(protected)。详见第8节。
7. CronTool:LLM 的调度接口
CronService 是纯调度引擎,不关心 job 来源。用户用自然语言说”提醒我明天开会”时,经由 CronTool 完成注册。
1 |
|
7.1 上下文绑定
每次 agent 处理消息时,通过 set_context() 注入当前 channel 和 chat_id:
1 | def set_context(self, channel: str, chat_id: str) -> None: |
创建 job 时这两个值存入 CronPayload,触发时推送到注册 job 时所在的会话。
7.2 防递归注册:_in_cron_context
cron job 执行中,agent 可能再次调用 cron(action=add, ...),导致 job 在运行时不断自我复制。用 ContextVar 标记当前执行上下文:
1 | self._in_cron_context: ContextVar[bool] = ContextVar("cron_in_context", default=False) |
gateway 的 on_cron_job 回调设置标记:
1 | cron_token = cron_tool.set_cron_context(True) |
execute() 里检查:
1 | if action == "add" and self._in_cron_context.get(): |
7.3 deliver 与智能推送
job 触发后 agent 生成回复,deliver=True 时通过 evaluate_response() 再调一次 LLM 判断是否值得推送:
1 | if job.payload.deliver and job.payload.to and response: |
对于”检查系统状态,一切正常”这类没有实质内容的回复,evaluate_response() 会返回 False,不触发推送。
完整的 LLM 调用 → 创建 job → 触发执行 → deliver 流程:
8. Dream:系统任务注册
第1篇讲的 Dream 记忆整理系统,其定期触发正是通过 cron 实现的。gateway 启动时:
1 | cron.register_system_job(CronJob( |
on_cron_job 回调中按 job 名特判:
1 | async def on_cron_job(job: CronJob) -> str | None: |
system_event 类型不经 LLM 对话,不占用 session;用户执行 cron(action=list) 时能看到 dream job 但无法删除(protected)。
cron 是 nanobot 所有定时自动化的统一基础设施,用户提醒和系统内部任务走同一套调度,仅执行路径不同。
gateway 启动初始化完整流程:
9. 整体调用链
三条线并行支撑 cron 系统的运转:
线1:用户对话注册
用户发消息 → LLM 识别意图 → CronTool.execute(action=add) → CronService.add_job() → 写 jobs.json 并重算定时器
线2:定时器自驱动执行_arm_timer() sleep → _on_timer() 扫描到期 job → _execute_job() → on_cron_job 回调 → process_direct() / dream.run() → 按需 bus.publish_outbound() 推送用户
线3:多进程离线写入
外部进程 add_job() → 写 action.jsonl → 下一轮 _on_timer() 触发 _load_store() → _merge_action() 合并 → 清空 action.jsonl → 写回 jobs.json
三条线通过 jobs.json 和 action.jsonl 共享状态,FileLock 保护并发写,CronService 内部只有一个 asyncio 定时器驱动整个调度循环。
小结
nanobot 的 cron 模块做了一件值得关注的事:把调度引擎和执行载荷彻底解耦。引擎只管”什么时候触发”,触发后发生什么完全由外部注入的 on_job 回调决定——用户任务走 agent 对话,系统任务直接调内部接口,两者共用一套调度机制。
这个设计的另一个体现是 action.jsonl。传统上一个本地调度器要么只允许单进程操作,要么需要数据库来协调并发写入。nanobot 用一个 append-only 的日志文件加上定期 merge 就解决了这个问题,足够简单,也足够可靠。
在 nanobot 的整体架构里,cron它既承载用户通过自然语言注册的提醒任务,也承载 Dream 记忆整理这类系统级自动化,是”主动型 agent”能力的基础设施。