fix(cron): stabilize restart catch-up replay semantics (#35351)

* Cron: stabilize restart catch-up replay semantics

* Cron: respect backoff in startup missed-run replay
This commit is contained in:
Tak Hoffman
2026-03-04 21:50:16 -06:00
committed by GitHub
parent 1059b406a8
commit 79d00ae398
6 changed files with 295 additions and 4 deletions

View File

@@ -24,6 +24,7 @@ Docs: https://docs.openclaw.ai
- Cron/announce delivery robustness: bypass pending-descendant announce guards for cron completion sends, ensure named-agent announce routes have outbound session entries, and fall back to direct delivery only when an announce send was actually attempted and failed. (from #35185, #32443, #34987) Thanks @Sid-Qin, @scoootscooob, and @bmendonca3.
- Auto-reply/system events: restore runtime system events to the message timeline (`System:` lines), preserve think-hint parsing with prepended events, and carry events into deferred followup/collect/steer-backlog prompts to keep cache behavior stable without dropping queued metadata. (#34794) Thanks @anisoptera.
- Security/audit account handling: avoid prototype-chain account IDs in audit validation by using own-property checks for `accounts`. (#34982) Thanks @HOYALIM.
- Cron/restart catch-up semantics: replay interrupted recurring jobs and missed immediate cron slots on startup without replaying interrupted one-shot jobs, with guarded missed-slot probing to avoid malformed-schedule startup aborts and duplicate-trigger drift after restart. (from #34466, #34896, #34625, #33206) Thanks @dunamismax, @dsantoreis, @Octane0411, and @Sid-Qin.
- Agents/session usage tracking: preserve accumulated usage metadata on embedded Pi runner error exits so failed turns still update session `totalTokens` from real usage instead of stale prior values. (#34275) thanks @RealKai42.
- Nodes/system.run approval hardening: use explicit argv-mutation signaling when regenerating prepared `rawCommand`, and cover the `system.run.prepare -> system.run` handoff so direct PATH-based `nodes.run` commands no longer fail with `rawCommand does not match command`. (#33137) thanks @Sid-Qin.
- Models/custom provider headers: propagate `models.providers.<name>.headers` across inline, fallback, and registry-found model resolution so header-authenticated proxies consistently receive configured request headers. (#27490) thanks @Sid-Qin.

View File

@@ -2,6 +2,7 @@ import { beforeEach, describe, expect, it } from "vitest";
import {
clearCronScheduleCacheForTest,
computeNextRunAtMs,
computePreviousRunAtMs,
getCronScheduleCacheSizeForTest,
} from "./schedule.js";
@@ -91,6 +92,17 @@ describe("cron schedule", () => {
expect(next!).toBeGreaterThan(nowMs);
});
it("never returns a previous run that is at-or-after now", () => {
const nowMs = Date.parse("2026-03-01T00:00:00.000Z");
const previous = computePreviousRunAtMs(
{ kind: "cron", expr: "0 8 * * *", tz: "Asia/Shanghai" },
nowMs,
);
if (previous !== undefined) {
expect(previous).toBeLessThan(nowMs);
}
});
it("reuses compiled cron evaluators for the same expression/timezone", () => {
const nowMs = Date.parse("2026-03-01T00:00:00.000Z");
expect(getCronScheduleCacheSizeForTest()).toBe(0);

View File

@@ -108,6 +108,35 @@ export function computeNextRunAtMs(schedule: CronSchedule, nowMs: number): numbe
return nextMs;
}
export function computePreviousRunAtMs(schedule: CronSchedule, nowMs: number): number | undefined {
if (schedule.kind !== "cron") {
return undefined;
}
const cronSchedule = schedule as { expr?: unknown; cron?: unknown };
const exprSource = typeof cronSchedule.expr === "string" ? cronSchedule.expr : cronSchedule.cron;
if (typeof exprSource !== "string") {
throw new Error("invalid cron schedule: expr is required");
}
const expr = exprSource.trim();
if (!expr) {
return undefined;
}
const cron = resolveCachedCron(expr, resolveCronTimezone(schedule.tz));
const previousRuns = cron.previousRuns(1, new Date(nowMs));
const previous = previousRuns[0];
if (!previous) {
return undefined;
}
const previousMs = previous.getTime();
if (!Number.isFinite(previousMs)) {
return undefined;
}
if (previousMs >= nowMs) {
return undefined;
}
return previousMs;
}
export function clearCronScheduleCacheForTest(): void {
cronEvalCache.clear();
}

View File

@@ -128,6 +128,179 @@ describe("CronService restart catch-up", () => {
expect(updated?.state.lastRunAtMs).toBeUndefined();
expect((updated?.state.nextRunAtMs ?? 0) > Date.parse("2025-12-13T17:00:00.000Z")).toBe(true);
cron.stop();
await store.cleanup();
});
it("replays the most recent missed cron slot after restart when nextRunAtMs already advanced", async () => {
vi.setSystemTime(new Date("2025-12-13T04:02:00.000Z"));
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();
const requestHeartbeatNow = vi.fn();
await writeStoreJobs(store.storePath, [
{
id: "restart-missed-slot",
name: "every ten minutes +1",
enabled: true,
createdAtMs: Date.parse("2025-12-10T12:00:00.000Z"),
updatedAtMs: Date.parse("2025-12-13T04:01:00.000Z"),
schedule: { kind: "cron", expr: "1,11,21,31,41,51 4-20 * * *", tz: "UTC" },
sessionTarget: "main",
wakeMode: "next-heartbeat",
payload: { kind: "systemEvent", text: "catch missed slot" },
state: {
// Persisted state may already be recomputed from restart time and
// point to the future slot, even though 04:01 was missed.
nextRunAtMs: Date.parse("2025-12-13T04:11:00.000Z"),
lastRunAtMs: Date.parse("2025-12-13T03:51:00.000Z"),
lastStatus: "ok",
},
},
]);
const cron = createRestartCronService({
storePath: store.storePath,
enqueueSystemEvent,
requestHeartbeatNow,
});
await cron.start();
expect(enqueueSystemEvent).toHaveBeenCalledWith(
"catch missed slot",
expect.objectContaining({ agentId: undefined }),
);
expect(requestHeartbeatNow).toHaveBeenCalled();
const jobs = await cron.list({ includeDisabled: true });
const updated = jobs.find((job) => job.id === "restart-missed-slot");
expect(updated?.state.lastRunAtMs).toBe(Date.parse("2025-12-13T04:02:00.000Z"));
cron.stop();
await store.cleanup();
});
it("does not replay interrupted one-shot jobs on startup", async () => {
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();
const requestHeartbeatNow = vi.fn();
const dueAt = Date.parse("2025-12-13T16:00:00.000Z");
const staleRunningAt = Date.parse("2025-12-13T16:30:00.000Z");
await writeStoreJobs(store.storePath, [
{
id: "restart-stale-one-shot",
name: "one shot stale marker",
enabled: true,
createdAtMs: Date.parse("2025-12-10T12:00:00.000Z"),
updatedAtMs: Date.parse("2025-12-13T16:30:00.000Z"),
schedule: { kind: "at", at: "2025-12-13T16:00:00.000Z" },
sessionTarget: "main",
wakeMode: "next-heartbeat",
payload: { kind: "systemEvent", text: "one-shot stale marker" },
state: {
nextRunAtMs: dueAt,
runningAtMs: staleRunningAt,
},
},
]);
const cron = createRestartCronService({
storePath: store.storePath,
enqueueSystemEvent,
requestHeartbeatNow,
});
await cron.start();
expect(enqueueSystemEvent).not.toHaveBeenCalled();
expect(requestHeartbeatNow).not.toHaveBeenCalled();
const jobs = await cron.list({ includeDisabled: true });
const updated = jobs.find((job) => job.id === "restart-stale-one-shot");
expect(updated?.state.runningAtMs).toBeUndefined();
cron.stop();
await store.cleanup();
});
it("does not replay cron slot when the latest slot already ran before restart", async () => {
vi.setSystemTime(new Date("2025-12-13T04:02:00.000Z"));
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();
const requestHeartbeatNow = vi.fn();
await writeStoreJobs(store.storePath, [
{
id: "restart-no-duplicate-slot",
name: "every ten minutes +1 no duplicate",
enabled: true,
createdAtMs: Date.parse("2025-12-10T12:00:00.000Z"),
updatedAtMs: Date.parse("2025-12-13T04:01:00.000Z"),
schedule: { kind: "cron", expr: "1,11,21,31,41,51 4-20 * * *", tz: "UTC" },
sessionTarget: "main",
wakeMode: "next-heartbeat",
payload: { kind: "systemEvent", text: "already ran" },
state: {
nextRunAtMs: Date.parse("2025-12-13T04:11:00.000Z"),
lastRunAtMs: Date.parse("2025-12-13T04:01:00.000Z"),
lastStatus: "ok",
},
},
]);
const cron = createRestartCronService({
storePath: store.storePath,
enqueueSystemEvent,
requestHeartbeatNow,
});
await cron.start();
expect(enqueueSystemEvent).not.toHaveBeenCalled();
expect(requestHeartbeatNow).not.toHaveBeenCalled();
cron.stop();
await store.cleanup();
});
it("does not replay missed cron slots while error backoff is pending after restart", async () => {
vi.setSystemTime(new Date("2025-12-13T04:02:00.000Z"));
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();
const requestHeartbeatNow = vi.fn();
await writeStoreJobs(store.storePath, [
{
id: "restart-backoff-pending",
name: "backoff pending",
enabled: true,
createdAtMs: Date.parse("2025-12-10T12:00:00.000Z"),
updatedAtMs: Date.parse("2025-12-13T04:01:10.000Z"),
schedule: { kind: "cron", expr: "* * * * *", tz: "UTC" },
sessionTarget: "main",
wakeMode: "next-heartbeat",
payload: { kind: "systemEvent", text: "do not run during backoff" },
state: {
// Next retry is intentionally delayed by backoff despite a newer cron slot.
nextRunAtMs: Date.parse("2025-12-13T04:10:00.000Z"),
lastRunAtMs: Date.parse("2025-12-13T04:01:00.000Z"),
lastStatus: "error",
},
},
]);
const cron = createRestartCronService({
storePath: store.storePath,
enqueueSystemEvent,
requestHeartbeatNow,
});
await cron.start();
expect(enqueueSystemEvent).not.toHaveBeenCalled();
expect(requestHeartbeatNow).not.toHaveBeenCalled();
cron.stop();
await store.cleanup();
});

View File

@@ -1,7 +1,7 @@
import crypto from "node:crypto";
import { normalizeAgentId } from "../../routing/session-key.js";
import { parseAbsoluteTimeMs } from "../parse.js";
import { computeNextRunAtMs } from "../schedule.js";
import { computeNextRunAtMs, computePreviousRunAtMs } from "../schedule.js";
import {
normalizeCronStaggerMs,
resolveCronStaggerMs,
@@ -80,6 +80,34 @@ function computeStaggeredCronNextRunAtMs(job: CronJob, nowMs: number) {
return undefined;
}
function computeStaggeredCronPreviousRunAtMs(job: CronJob, nowMs: number) {
if (job.schedule.kind !== "cron") {
return undefined;
}
const staggerMs = resolveCronStaggerMs(job.schedule);
const offsetMs = resolveStableCronOffsetMs(job.id, staggerMs);
if (offsetMs <= 0) {
return computePreviousRunAtMs(job.schedule, nowMs);
}
// Shift the cursor backwards by the same per-job offset used for next-run
// math so previous-run lookup matches the effective staggered schedule.
let cursorMs = Math.max(0, nowMs - offsetMs);
for (let attempt = 0; attempt < 4; attempt += 1) {
const basePrevious = computePreviousRunAtMs(job.schedule, cursorMs);
if (basePrevious === undefined) {
return undefined;
}
const shifted = basePrevious + offsetMs;
if (shifted <= nowMs) {
return shifted;
}
cursorMs = Math.max(0, basePrevious - 1_000);
}
return undefined;
}
function isFiniteTimestamp(value: unknown): value is number {
return typeof value === "number" && Number.isFinite(value);
}
@@ -248,6 +276,14 @@ export function computeJobNextRunAtMs(job: CronJob, nowMs: number): number | und
return isFiniteTimestamp(next) ? next : undefined;
}
export function computeJobPreviousRunAtMs(job: CronJob, nowMs: number): number | undefined {
if (!job.enabled || job.schedule.kind !== "cron") {
return undefined;
}
const previous = computeStaggeredCronPreviousRunAtMs(job, nowMs);
return isFiniteTimestamp(previous) ? previous : undefined;
}
/** Maximum consecutive schedule errors before auto-disabling a job. */
const MAX_SCHEDULE_ERRORS = 3;

View File

@@ -14,6 +14,7 @@ import type {
CronRunTelemetry,
} from "../types.js";
import {
computeJobPreviousRunAtMs,
computeJobNextRunAtMs,
nextWakeAtMs,
recomputeNextRunsForMaintenance,
@@ -700,6 +701,7 @@ function isRunnableJob(params: {
nowMs: number;
skipJobIds?: ReadonlySet<string>;
skipAtIfAlreadyRan?: boolean;
allowCronMissedRunByLastRun?: boolean;
}): boolean {
const { job, nowMs } = params;
if (!job.state) {
@@ -732,13 +734,46 @@ function isRunnableJob(params: {
return false;
}
const next = job.state.nextRunAtMs;
return typeof next === "number" && Number.isFinite(next) && nowMs >= next;
if (typeof next === "number" && Number.isFinite(next) && nowMs >= next) {
return true;
}
if (
typeof next === "number" &&
Number.isFinite(next) &&
next > nowMs &&
job.state.lastStatus === "error"
) {
// Respect persisted retry backoff windows for recurring jobs on restart.
return false;
}
if (!params.allowCronMissedRunByLastRun || job.schedule.kind !== "cron") {
return false;
}
let previousRunAtMs: number | undefined;
try {
previousRunAtMs = computeJobPreviousRunAtMs(job, nowMs);
} catch {
return false;
}
if (typeof previousRunAtMs !== "number" || !Number.isFinite(previousRunAtMs)) {
return false;
}
const lastRunAtMs = job.state.lastRunAtMs;
if (typeof lastRunAtMs !== "number" || !Number.isFinite(lastRunAtMs)) {
// Only replay a "missed slot" when there is concrete run history.
return false;
}
return previousRunAtMs > lastRunAtMs;
}
function collectRunnableJobs(
state: CronServiceState,
nowMs: number,
opts?: { skipJobIds?: ReadonlySet<string>; skipAtIfAlreadyRan?: boolean },
opts?: {
skipJobIds?: ReadonlySet<string>;
skipAtIfAlreadyRan?: boolean;
allowCronMissedRunByLastRun?: boolean;
},
): CronJob[] {
if (!state.store) {
return [];
@@ -749,6 +784,7 @@ function collectRunnableJobs(
nowMs,
skipJobIds: opts?.skipJobIds,
skipAtIfAlreadyRan: opts?.skipAtIfAlreadyRan,
allowCronMissedRunByLastRun: opts?.allowCronMissedRunByLastRun,
}),
);
}
@@ -764,7 +800,11 @@ export async function runMissedJobs(
}
const now = state.deps.nowMs();
const skipJobIds = opts?.skipJobIds;
const missed = collectRunnableJobs(state, now, { skipJobIds, skipAtIfAlreadyRan: true });
const missed = collectRunnableJobs(state, now, {
skipJobIds,
skipAtIfAlreadyRan: true,
allowCronMissedRunByLastRun: true,
});
if (missed.length === 0) {
return [] as Array<{ jobId: string; job: CronJob }>;
}