nanobot源码学习(7)-cron

上篇回顾

上一篇讲了多 channel 接入层,ChannelManager 把飞书、微信等渠道的消息统一桥接到 MessageBus。但有一类场景完全没有涉及:无用户消息触发,bot 主动执行任务。提醒、定期报告、自动化清理——这些都属于定时任务,nanobot 用 cron 模块来实现。


1. cron 是什么

nanobot 的 cron 和传统 cron 最大的区别是:每个 job 的载荷不是一段代码,而是一条发给 agent 的消息。job 触发时,系统拼一条系统提示词塞进 AgentLoop.process_direct(),让 agent 去处理。定时任务和普通对话走完全相同的 LLM 调用链,复用全部工具能力,保留对话历史。

典型场景:

  • 提醒:每天 9 点提醒站起来喝水
  • 定期报告:每小时汇总系统状态发到飞书群
  • 自动化任务:每天凌晨 2 点跑数据清理并通知

还有一类是系统内部任务:Dream 记忆整理(第1篇)通过 cron 定期触发,详见第8节。

cron vs heartbeat

nanobot 另有一个 heartbeat 模块,两者容易混淆:

  • heartbeat:固定间隔唤醒,先用 LLM 读取 HEARTBEAT.md 决策”有没有任务要做”(skip/run),再执行。无 job 概念,不持久化,系统内部机制
  • cron:显式注册的任务,有精确时间表(支持 cron 表达式)、执行历史、持久化存储;用户和系统均可注册

gateway 模式下二者同时运行,互不干扰。


2. 类型系统

代码在 nanobot/cron/types.py,共 5 个 dataclass。

2.1 CronSchedule:触发时机

1
2
3
4
5
6
7
@dataclass
class CronSchedule:
kind: Literal["at", "every", "cron"]
at_ms: int | None = None # "at":绝对时间戳(ms)
every_ms: int | None = None # "every":间隔(ms)
expr: str | None = None # "cron":标准 cron 表达式
tz: str | None = None # "cron" 专用:IANA 时区名
kind 含义 示例
at 绝对时间点,执行一次后禁用或删除 at_ms=1740000000000
every 固定间隔循环 every_ms=3600000(每小时)
cron 标准 5 段 cron 表达式 expr="0 9 * * *"(每天9点)

tz 仅对 cron 模式有效,不指定时使用工具层传入的默认时区。

2.2 CronPayload:触发后做什么

1
2
3
4
5
6
7
@dataclass
class CronPayload:
kind: Literal["system_event", "agent_turn"] = "agent_turn"
message: str = ""
deliver: bool = False # 是否将 agent 回复推送给用户
channel: str | None = None # 推送渠道(如 "feishu")
to: str | None = None # 推送目标(open_id 或手机号等)

kind 决定执行路径:agent_turn 走完整的 agent 对话链;system_event 直接调用内部方法(Dream 使用此类型,不经过 LLM)。deliver=True 时,agent 的回复会通过 channel/to 推送给用户。

2.3 CronJobState:运行状态

1
2
3
4
5
6
7
@dataclass
class CronJobState:
next_run_at_ms: int | None = None
last_run_at_ms: int | None = None
last_status: Literal["ok", "error", "skipped"] | None = None
last_error: str | None = None
run_history: list[CronRunRecord] = field(default_factory=list)

每次执行后追加一条 CronRunRecord(记录时间、状态、耗时、错误),滚动保留最近 20 条(_MAX_RUN_HISTORY = 20)。

2.4 CronJob

1
2
3
4
5
6
7
8
9
10
11
@dataclass
class CronJob:
id: str # 8位 uuid 截断
name: str
enabled: bool = True
schedule: CronSchedule = ...
payload: CronPayload = ...
state: CronJobState = ...
created_at_ms: int = 0
updated_at_ms: int = 0
delete_after_run: bool = False # at 类型默认 True,执行后从 store 删除

2.5 CronStore

1
2
3
4
@dataclass
class CronStore:
version: int = 1
jobs: list[CronJob] = field(default_factory=list)

jobs.json 的内存映射,版本号留作未来迁移用。


3. CronService 架构

3.1 存储文件

1
2
{workspace}/cron/jobs.json      # 主存储:所有 job 的完整状态
{workspace}/cron/action.jsonl # 变更日志:离线写入的临时队列

jobs.jsonCronStore 的完整序列化,包含所有 job 的 schedule、payload、state(含执行历史):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
{
"version": 1,
"jobs": [{
"id": "a3f9c1b2",
"name": "每日提醒",
"enabled": true,
"schedule": { "kind": "cron", "expr": "0 9 * * *", "tz": "Asia/Shanghai" },
"payload": { "kind": "agent_turn", "message": "提醒站起来活动", "deliver": true, "channel": "feishu", "to": "ou_xxx" },
"state": {
"nextRunAtMs": 1745200800000,
"lastRunAtMs": 1745114400000,
"lastStatus": "ok",
"lastError": null,
"runHistory": [
{ "runAtMs": 1745114400000, "status": "ok", "durationMs": 3120, "error": null },
{ "runAtMs": 1745028000000, "status": "error", "durationMs": 502, "error": "LLM timeout" }
]
},
"createdAtMs": 1745114400000,
"deleteAfterRun": false
}]
}

action.jsonl 是追加写入的变更日志,每行一个操作(add/update/del),用于多进程安全和离线注册:

1
2
{"action": "add", "params": {"id": "b7d2e5a1", "name": "每周报告", "schedule": {"kind": "cron", "expr": "0 10 * * 1"}, ...}}
{"action": "del", "params": {"job_id": "a3f9c1b2"}}

两个文件的协作方式:平时只写 jobs.json;当服务未运行或有外部进程需要变更 job 时,追加写入 action.jsonl;服务启动或下一轮 _on_timer() 开始时,将 action.jsonl 重放合并进 jobs.json,然后清空 action.jsonl。详见第5节。

3.2 初始化参数

1
2
3
4
5
6
7
class CronService:
def __init__(
self,
store_path: Path,
on_job: Callable[[CronJob], Coroutine[Any, Any, str | None]] | None = None,
max_sleep_ms: int = 300_000, # 5分钟
):
  • on_job:job 触发时的回调,由 gateway 注入;CronService 本身不感知 agent 的存在
  • max_sleep_ms:无 job 时定时器的最长 sleep 时间,防止外部进程新增 job 后长时间未被发现

3.3 两个状态标志

1
2
self._running = False       # 服务是否已 start()
self._timer_active = False # 当前是否在 _on_timer() 执行中

4. 调度引擎:定时器驱动循环

定时器驱动执行流程

4.1 start()

1
2
3
4
5
6
async def start(self) -> None:
self._running = True
self._load_store() # 加载 jobs.json + 合并 action.jsonl
self._recompute_next_runs() # 重算所有 next_run_at_ms(跨重启时间已流逝)
self._save_store()
self._arm_timer()

4.2 _arm_timer()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def _arm_timer(self) -> None:
if self._timer_task:
self._timer_task.cancel()

next_wake = self._get_next_wake_ms()
delay_ms = self.max_sleep_ms if next_wake is None \
else min(self.max_sleep_ms, max(0, next_wake - _now_ms()))

async def tick():
await asyncio.sleep(delay_ms / 1000)
if self._running:
await self._on_timer()

self._timer_task = asyncio.create_task(tick())

取所有 enabled job 中最近的 next_run_at_ms,sleep 对应时长后触发 _on_timer()_on_timer() 结束后再次调用 _arm_timer(),形成自驱动循环。max_sleep_ms 保证即使无 job,定时器也会定期醒来扫描外部变更。

4.3 _on_timer():核心调度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
async def _on_timer(self) -> None:
self._load_store() # 重新加载,合并外部变更
self._timer_active = True
try:
now = _now_ms()
due_jobs = [
j for j in self._store.jobs
if j.enabled and j.state.next_run_at_ms and now >= j.state.next_run_at_ms
]
for job in due_jobs:
await self._execute_job(job)
self._save_store()
finally:
self._timer_active = False
self._arm_timer()

4.4 三种调度模式的计算

1
2
3
4
5
6
7
8
9
10
11
12
def _compute_next_run(schedule: CronSchedule, now_ms: int) -> int | None:
if schedule.kind == "at":
return schedule.at_ms if schedule.at_ms and schedule.at_ms > now_ms else None

if schedule.kind == "every":
return now_ms + schedule.every_ms # 从"执行完那刻"起算,不漂移

if schedule.kind == "cron" and schedule.expr:
base_dt = datetime.fromtimestamp(now_ms / 1000, tz=tz)
cron = croniter(schedule.expr, base_dt)
next_dt = cron.get_next(datetime)
return int(next_dt.timestamp() * 1000)
  • at:时间点过了返回 None,不再触发
  • every:每次执行后重算,以执行完成时刻为基准,避免执行耗时导致的时间漂移累积
  • cron:用 croniter 解析表达式,ZoneInfo 处理时区,夏令时切换时行为正确

4.5 _execute_job()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
async def _execute_job(self, job: CronJob) -> None:
start_ms = _now_ms()
try:
if self.on_job:
await self.on_job(job)
job.state.last_status = "ok"
except Exception as e:
job.state.last_status = "error"
job.state.last_error = str(e)

end_ms = _now_ms()
job.state.last_run_at_ms = start_ms
job.state.run_history.append(CronRunRecord(
run_at_ms=start_ms, status=job.state.last_status,
duration_ms=end_ms - start_ms, error=job.state.last_error,
))
job.state.run_history = job.state.run_history[-self._MAX_RUN_HISTORY:]

if job.schedule.kind == "at":
if job.delete_after_run:
self._store.jobs = [j for j in self._store.jobs if j.id != job.id]
else:
job.enabled = False
job.state.next_run_at_ms = None
else:
job.state.next_run_at_ms = _compute_next_run(job.schedule, _now_ms())

无论成功失败都记录执行历史。at 类型执行完后,delete_after_run=True 则从 store 删除,否则置 enabled=False,确保不再触发。


5. action.jsonl:离线写入机制

5.1 什么是”离线”

这里的”离线”特指 CronService._running == False 的情况,有两种场景:

  1. gateway 未启动:用户在 nanobot agent 交互模式下创建了 cron job,此时没有 gateway 进程运行,没有任何 CronService 实例处于 running 状态
  2. 外部进程操作:gateway 已经运行,但另一个进程(如 nanobot agent)也实例化了一个 CronService 对象来操作 job——这个外部实例从未调用过 start(),对它自己而言 _running=False

两种情况下直接写 jobs.json 都不安全,gateway 的 CronService 实例随时可能在写同一个文件。

5.2 类 WAL 机制

解决方案是 action.jsonl

  • 离线时:把操作(add/del/update)追加写入 action.jsonl——append-only,原子性强
  • 合并时机:**start() 启动时,以及每次 _on_timer() 开始时**(每轮定时器唤醒都会调用 _load_store(),其中包含 _merge_action()

离线写入与上线合并

_running 决定走哪条路:

1
2
3
4
5
6
7
8
def add_job(self, ...) -> CronJob:
if self._running:
store = self._load_store()
store.jobs.append(job)
self._save_store()
self._arm_timer()
else:
self._append_action("add", asdict(job))

remove_job()enable_job()update_job() 遵循相同模式。

5.3 _merge_action()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def _merge_action(self):
jobs_map = {j.id: j for j in self._store.jobs}
with self._lock:
with open(self._action_path, "r") as f:
changed = False
for line in f:
action = json.loads(line.strip())
if action["action"] == "del":
jobs_map.pop(action["params"].get("job_id"), None)
else:
jobs_map[action["params"]["id"]] = CronJob.from_dict(action["params"])
changed = True
self._store.jobs = list(jobs_map.values())
if self._running and changed:
self._action_path.write_text("", encoding="utf-8") # 清空
self._save_store()

{id: job} map 重放所有操作,保证幂等。FileLock 保护读-处理-清空的整个临界区,防止并发 append 与清空之间的竞态。


6. 用户如何添加 cron job

用户有两条路径注册 job,最终都走到 CronService.add_job()

6.1 自然语言对话(最常用)

在任意渠道(飞书、微信、CLI 等)直接告诉 bot:

1
2
3
每天早上 9 点提醒我站起来活动
下周一 10 点提醒我开周会
每隔 2 小时检查一次系统状态并发到飞书

agent 的 LLM 会将自然语言解析为 CronTool 的工具调用,完整流程见第7节。这是普通用户使用 cron 的主要方式,不需要了解任何底层细节。

6.2 代码直接调用

开发者或系统内部可以跳过 LLM,直接构造 CronSchedule 调用 CronService.add_job()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from nanobot.cron.service import CronService
from nanobot.cron.types import CronSchedule

cron = CronService(store_path)

# 每天 9 点(cron 表达式)
cron.add_job(
name="morning-standup",
schedule=CronSchedule(kind="cron", expr="0 9 * * *", tz="Asia/Shanghai"),
message="提醒站起来活动",
deliver=True,
channel="feishu",
to="ou_xxx",
)

# 每小时一次
cron.add_job(
name="hourly-check",
schedule=CronSchedule(kind="every", every_ms=3_600_000),
message="检查系统状态并汇报",
deliver=False,
)

# 一次性定时(at 模式,执行后自动删除)
cron.add_job(
name="meeting-reminder",
schedule=CronSchedule(kind="at", at_ms=1745200800000),
message="10分钟后开周会,提前准备",
deliver=True,
channel="feishu",
to="ou_xxx",
)

注意:若此时 gateway 未运行(_running=False),add_job() 会把操作写入 action.jsonl,等 gateway 启动时自动合并生效。

6.3 系统内部注册(register_system_job)

Dream 等系统任务通过 register_system_job() 注册,幂等,每次 gateway 重启都会重新注册并重算执行时间。此类 job 的 payload.kind="system_event",用户可见但不可删除(protected)。详见第8节。


7. CronTool:LLM 的调度接口

CronService 是纯调度引擎,不关心 job 来源。用户用自然语言说”提醒我明天开会”时,经由 CronTool 完成注册。

1
2
3
4
5
6
7
8
9
10
11
12
@tool_parameters(tool_parameters_schema(
action=StringSchema("Action to perform", enum=["add", "list", "remove"]),
message=StringSchema("Instruction for the agent when job triggers"),
every_seconds=IntegerSchema(0, description="Interval in seconds"),
cron_expr=StringSchema("Cron expression like '0 9 * * *'"),
tz=StringSchema("Optional IANA timezone"),
at=StringSchema("ISO datetime for one-time execution"),
deliver=BooleanSchema(description="Whether to deliver result to user", default=True),
job_id=StringSchema("Job ID (for remove)"),
required=["action"],
))
class CronTool(Tool):

7.1 上下文绑定

每次 agent 处理消息时,通过 set_context() 注入当前 channel 和 chat_id:

1
2
3
def set_context(self, channel: str, chat_id: str) -> None:
self._channel = channel
self._chat_id = chat_id

创建 job 时这两个值存入 CronPayload,触发时推送到注册 job 时所在的会话

7.2 防递归注册:_in_cron_context

cron job 执行中,agent 可能再次调用 cron(action=add, ...),导致 job 在运行时不断自我复制。用 ContextVar 标记当前执行上下文:

1
self._in_cron_context: ContextVar[bool] = ContextVar("cron_in_context", default=False)

gateway 的 on_cron_job 回调设置标记:

1
2
3
4
5
cron_token = cron_tool.set_cron_context(True)
try:
resp = await agent.process_direct(...)
finally:
cron_tool.reset_cron_context(cron_token)

execute() 里检查:

1
2
if action == "add" and self._in_cron_context.get():
return "Error: cannot schedule new jobs from within a cron job execution"

7.3 deliver 与智能推送

job 触发后 agent 生成回复,deliver=True 时通过 evaluate_response() 再调一次 LLM 判断是否值得推送:

1
2
3
4
5
6
7
8
if job.payload.deliver and job.payload.to and response:
should_notify = await evaluate_response(response, reminder_note, provider, agent.model)
if should_notify:
await bus.publish_outbound(OutboundMessage(
channel=job.payload.channel or "cli",
chat_id=job.payload.to,
content=response,
))

对于”检查系统状态,一切正常”这类没有实质内容的回复,evaluate_response() 会返回 False,不触发推送。

完整的 LLM 调用 → 创建 job → 触发执行 → deliver 流程:

LLM 调用链:add 到 deliver


8. Dream:系统任务注册

第1篇讲的 Dream 记忆整理系统,其定期触发正是通过 cron 实现的。gateway 启动时:

1
2
3
4
5
6
cron.register_system_job(CronJob(
id="dream",
name="dream",
schedule=dream_cfg.build_schedule(config.agents.defaults.timezone),
payload=CronPayload(kind="system_event"),
))

on_cron_job 回调中按 job 名特判:

1
2
3
4
5
async def on_cron_job(job: CronJob) -> str | None:
if job.name == "dream":
await agent.dream.run() # 直接调用内部方法,不走 agent 对话
return None
# 其他 job 走 process_direct()

system_event 类型不经 LLM 对话,不占用 session;用户执行 cron(action=list) 时能看到 dream job 但无法删除(protected)。

cron 是 nanobot 所有定时自动化的统一基础设施,用户提醒和系统内部任务走同一套调度,仅执行路径不同。

gateway 启动初始化完整流程:

gateway 启动初始化


9. 整体调用链

三条线并行支撑 cron 系统的运转:

线1:用户对话注册
用户发消息 → LLM 识别意图 → CronTool.execute(action=add)CronService.add_job() → 写 jobs.json 并重算定时器

线2:定时器自驱动执行
_arm_timer() sleep → _on_timer() 扫描到期 job → _execute_job()on_cron_job 回调 → process_direct() / dream.run() → 按需 bus.publish_outbound() 推送用户

线3:多进程离线写入
外部进程 add_job() → 写 action.jsonl → 下一轮 _on_timer() 触发 _load_store()_merge_action() 合并 → 清空 action.jsonl → 写回 jobs.json

三条线通过 jobs.jsonaction.jsonl 共享状态,FileLock 保护并发写,CronService 内部只有一个 asyncio 定时器驱动整个调度循环。


小结

nanobot 的 cron 模块做了一件值得关注的事:把调度引擎和执行载荷彻底解耦。引擎只管”什么时候触发”,触发后发生什么完全由外部注入的 on_job 回调决定——用户任务走 agent 对话,系统任务直接调内部接口,两者共用一套调度机制。

这个设计的另一个体现是 action.jsonl。传统上一个本地调度器要么只允许单进程操作,要么需要数据库来协调并发写入。nanobot 用一个 append-only 的日志文件加上定期 merge 就解决了这个问题,足够简单,也足够可靠。

在 nanobot 的整体架构里,cron它既承载用户通过自然语言注册的提醒任务,也承载 Dream 记忆整理这类系统级自动化,是”主动型 agent”能力的基础设施。