diff --git a/CHANGELOG.md b/CHANGELOG.md index cf9e30958..40bda8949 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ Docs: https://docs.openclaw.ai - Cron/announce delivery robustness: bypass pending-descendant announce guards for cron completion sends, ensure named-agent announce routes have outbound session entries, and fall back to direct delivery only when an announce send was actually attempted and failed. (from #35185, #32443, #34987) Thanks @Sid-Qin, @scoootscooob, and @bmendonca3. - Auto-reply/system events: restore runtime system events to the message timeline (`System:` lines), preserve think-hint parsing with prepended events, and carry events into deferred followup/collect/steer-backlog prompts to keep cache behavior stable without dropping queued metadata. (#34794) Thanks @anisoptera. - Security/audit account handling: avoid prototype-chain account IDs in audit validation by using own-property checks for `accounts`. (#34982) Thanks @HOYALIM. +- Cron/restart catch-up semantics: replay interrupted recurring jobs and missed immediate cron slots on startup without replaying interrupted one-shot jobs, with guarded missed-slot probing to avoid malformed-schedule startup aborts and duplicate-trigger drift after restart. (from #34466, #34896, #34625, #33206) Thanks @dunamismax, @dsantoreis, @Octane0411, and @Sid-Qin. - Agents/session usage tracking: preserve accumulated usage metadata on embedded Pi runner error exits so failed turns still update session `totalTokens` from real usage instead of stale prior values. (#34275) thanks @RealKai42. - Nodes/system.run approval hardening: use explicit argv-mutation signaling when regenerating prepared `rawCommand`, and cover the `system.run.prepare -> system.run` handoff so direct PATH-based `nodes.run` commands no longer fail with `rawCommand does not match command`. (#33137) thanks @Sid-Qin. - Models/custom provider headers: propagate `models.providers..headers` across inline, fallback, and registry-found model resolution so header-authenticated proxies consistently receive configured request headers. (#27490) thanks @Sid-Qin. diff --git a/src/cron/schedule.test.ts b/src/cron/schedule.test.ts index 6b6c290b3..614a980f4 100644 --- a/src/cron/schedule.test.ts +++ b/src/cron/schedule.test.ts @@ -2,6 +2,7 @@ import { beforeEach, describe, expect, it } from "vitest"; import { clearCronScheduleCacheForTest, computeNextRunAtMs, + computePreviousRunAtMs, getCronScheduleCacheSizeForTest, } from "./schedule.js"; @@ -91,6 +92,17 @@ describe("cron schedule", () => { expect(next!).toBeGreaterThan(nowMs); }); + it("never returns a previous run that is at-or-after now", () => { + const nowMs = Date.parse("2026-03-01T00:00:00.000Z"); + const previous = computePreviousRunAtMs( + { kind: "cron", expr: "0 8 * * *", tz: "Asia/Shanghai" }, + nowMs, + ); + if (previous !== undefined) { + expect(previous).toBeLessThan(nowMs); + } + }); + it("reuses compiled cron evaluators for the same expression/timezone", () => { const nowMs = Date.parse("2026-03-01T00:00:00.000Z"); expect(getCronScheduleCacheSizeForTest()).toBe(0); diff --git a/src/cron/schedule.ts b/src/cron/schedule.ts index 70577b761..4c31c0a1a 100644 --- a/src/cron/schedule.ts +++ b/src/cron/schedule.ts @@ -108,6 +108,35 @@ export function computeNextRunAtMs(schedule: CronSchedule, nowMs: number): numbe return nextMs; } +export function computePreviousRunAtMs(schedule: CronSchedule, nowMs: number): number | undefined { + if (schedule.kind !== "cron") { + return undefined; + } + const cronSchedule = schedule as { expr?: unknown; cron?: unknown }; + const exprSource = typeof cronSchedule.expr === "string" ? cronSchedule.expr : cronSchedule.cron; + if (typeof exprSource !== "string") { + throw new Error("invalid cron schedule: expr is required"); + } + const expr = exprSource.trim(); + if (!expr) { + return undefined; + } + const cron = resolveCachedCron(expr, resolveCronTimezone(schedule.tz)); + const previousRuns = cron.previousRuns(1, new Date(nowMs)); + const previous = previousRuns[0]; + if (!previous) { + return undefined; + } + const previousMs = previous.getTime(); + if (!Number.isFinite(previousMs)) { + return undefined; + } + if (previousMs >= nowMs) { + return undefined; + } + return previousMs; +} + export function clearCronScheduleCacheForTest(): void { cronEvalCache.clear(); } diff --git a/src/cron/service.restart-catchup.test.ts b/src/cron/service.restart-catchup.test.ts index ea42e7b5a..9c833a994 100644 --- a/src/cron/service.restart-catchup.test.ts +++ b/src/cron/service.restart-catchup.test.ts @@ -128,6 +128,179 @@ describe("CronService restart catch-up", () => { expect(updated?.state.lastRunAtMs).toBeUndefined(); expect((updated?.state.nextRunAtMs ?? 0) > Date.parse("2025-12-13T17:00:00.000Z")).toBe(true); + cron.stop(); + await store.cleanup(); + }); + it("replays the most recent missed cron slot after restart when nextRunAtMs already advanced", async () => { + vi.setSystemTime(new Date("2025-12-13T04:02:00.000Z")); + const store = await makeStorePath(); + const enqueueSystemEvent = vi.fn(); + const requestHeartbeatNow = vi.fn(); + + await writeStoreJobs(store.storePath, [ + { + id: "restart-missed-slot", + name: "every ten minutes +1", + enabled: true, + createdAtMs: Date.parse("2025-12-10T12:00:00.000Z"), + updatedAtMs: Date.parse("2025-12-13T04:01:00.000Z"), + schedule: { kind: "cron", expr: "1,11,21,31,41,51 4-20 * * *", tz: "UTC" }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + payload: { kind: "systemEvent", text: "catch missed slot" }, + state: { + // Persisted state may already be recomputed from restart time and + // point to the future slot, even though 04:01 was missed. + nextRunAtMs: Date.parse("2025-12-13T04:11:00.000Z"), + lastRunAtMs: Date.parse("2025-12-13T03:51:00.000Z"), + lastStatus: "ok", + }, + }, + ]); + + const cron = createRestartCronService({ + storePath: store.storePath, + enqueueSystemEvent, + requestHeartbeatNow, + }); + + await cron.start(); + + expect(enqueueSystemEvent).toHaveBeenCalledWith( + "catch missed slot", + expect.objectContaining({ agentId: undefined }), + ); + expect(requestHeartbeatNow).toHaveBeenCalled(); + + const jobs = await cron.list({ includeDisabled: true }); + const updated = jobs.find((job) => job.id === "restart-missed-slot"); + expect(updated?.state.lastRunAtMs).toBe(Date.parse("2025-12-13T04:02:00.000Z")); + + cron.stop(); + await store.cleanup(); + }); + + it("does not replay interrupted one-shot jobs on startup", async () => { + const store = await makeStorePath(); + const enqueueSystemEvent = vi.fn(); + const requestHeartbeatNow = vi.fn(); + + const dueAt = Date.parse("2025-12-13T16:00:00.000Z"); + const staleRunningAt = Date.parse("2025-12-13T16:30:00.000Z"); + + await writeStoreJobs(store.storePath, [ + { + id: "restart-stale-one-shot", + name: "one shot stale marker", + enabled: true, + createdAtMs: Date.parse("2025-12-10T12:00:00.000Z"), + updatedAtMs: Date.parse("2025-12-13T16:30:00.000Z"), + schedule: { kind: "at", at: "2025-12-13T16:00:00.000Z" }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + payload: { kind: "systemEvent", text: "one-shot stale marker" }, + state: { + nextRunAtMs: dueAt, + runningAtMs: staleRunningAt, + }, + }, + ]); + + const cron = createRestartCronService({ + storePath: store.storePath, + enqueueSystemEvent, + requestHeartbeatNow, + }); + + await cron.start(); + + expect(enqueueSystemEvent).not.toHaveBeenCalled(); + expect(requestHeartbeatNow).not.toHaveBeenCalled(); + + const jobs = await cron.list({ includeDisabled: true }); + const updated = jobs.find((job) => job.id === "restart-stale-one-shot"); + expect(updated?.state.runningAtMs).toBeUndefined(); + + cron.stop(); + await store.cleanup(); + }); + + it("does not replay cron slot when the latest slot already ran before restart", async () => { + vi.setSystemTime(new Date("2025-12-13T04:02:00.000Z")); + const store = await makeStorePath(); + const enqueueSystemEvent = vi.fn(); + const requestHeartbeatNow = vi.fn(); + + await writeStoreJobs(store.storePath, [ + { + id: "restart-no-duplicate-slot", + name: "every ten minutes +1 no duplicate", + enabled: true, + createdAtMs: Date.parse("2025-12-10T12:00:00.000Z"), + updatedAtMs: Date.parse("2025-12-13T04:01:00.000Z"), + schedule: { kind: "cron", expr: "1,11,21,31,41,51 4-20 * * *", tz: "UTC" }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + payload: { kind: "systemEvent", text: "already ran" }, + state: { + nextRunAtMs: Date.parse("2025-12-13T04:11:00.000Z"), + lastRunAtMs: Date.parse("2025-12-13T04:01:00.000Z"), + lastStatus: "ok", + }, + }, + ]); + + const cron = createRestartCronService({ + storePath: store.storePath, + enqueueSystemEvent, + requestHeartbeatNow, + }); + + await cron.start(); + + expect(enqueueSystemEvent).not.toHaveBeenCalled(); + expect(requestHeartbeatNow).not.toHaveBeenCalled(); + cron.stop(); + await store.cleanup(); + }); + + it("does not replay missed cron slots while error backoff is pending after restart", async () => { + vi.setSystemTime(new Date("2025-12-13T04:02:00.000Z")); + const store = await makeStorePath(); + const enqueueSystemEvent = vi.fn(); + const requestHeartbeatNow = vi.fn(); + + await writeStoreJobs(store.storePath, [ + { + id: "restart-backoff-pending", + name: "backoff pending", + enabled: true, + createdAtMs: Date.parse("2025-12-10T12:00:00.000Z"), + updatedAtMs: Date.parse("2025-12-13T04:01:10.000Z"), + schedule: { kind: "cron", expr: "* * * * *", tz: "UTC" }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + payload: { kind: "systemEvent", text: "do not run during backoff" }, + state: { + // Next retry is intentionally delayed by backoff despite a newer cron slot. + nextRunAtMs: Date.parse("2025-12-13T04:10:00.000Z"), + lastRunAtMs: Date.parse("2025-12-13T04:01:00.000Z"), + lastStatus: "error", + }, + }, + ]); + + const cron = createRestartCronService({ + storePath: store.storePath, + enqueueSystemEvent, + requestHeartbeatNow, + }); + + await cron.start(); + + expect(enqueueSystemEvent).not.toHaveBeenCalled(); + expect(requestHeartbeatNow).not.toHaveBeenCalled(); + cron.stop(); await store.cleanup(); }); diff --git a/src/cron/service/jobs.ts b/src/cron/service/jobs.ts index d0d0befb6..6ae2e1304 100644 --- a/src/cron/service/jobs.ts +++ b/src/cron/service/jobs.ts @@ -1,7 +1,7 @@ import crypto from "node:crypto"; import { normalizeAgentId } from "../../routing/session-key.js"; import { parseAbsoluteTimeMs } from "../parse.js"; -import { computeNextRunAtMs } from "../schedule.js"; +import { computeNextRunAtMs, computePreviousRunAtMs } from "../schedule.js"; import { normalizeCronStaggerMs, resolveCronStaggerMs, @@ -80,6 +80,34 @@ function computeStaggeredCronNextRunAtMs(job: CronJob, nowMs: number) { return undefined; } +function computeStaggeredCronPreviousRunAtMs(job: CronJob, nowMs: number) { + if (job.schedule.kind !== "cron") { + return undefined; + } + + const staggerMs = resolveCronStaggerMs(job.schedule); + const offsetMs = resolveStableCronOffsetMs(job.id, staggerMs); + if (offsetMs <= 0) { + return computePreviousRunAtMs(job.schedule, nowMs); + } + + // Shift the cursor backwards by the same per-job offset used for next-run + // math so previous-run lookup matches the effective staggered schedule. + let cursorMs = Math.max(0, nowMs - offsetMs); + for (let attempt = 0; attempt < 4; attempt += 1) { + const basePrevious = computePreviousRunAtMs(job.schedule, cursorMs); + if (basePrevious === undefined) { + return undefined; + } + const shifted = basePrevious + offsetMs; + if (shifted <= nowMs) { + return shifted; + } + cursorMs = Math.max(0, basePrevious - 1_000); + } + return undefined; +} + function isFiniteTimestamp(value: unknown): value is number { return typeof value === "number" && Number.isFinite(value); } @@ -248,6 +276,14 @@ export function computeJobNextRunAtMs(job: CronJob, nowMs: number): number | und return isFiniteTimestamp(next) ? next : undefined; } +export function computeJobPreviousRunAtMs(job: CronJob, nowMs: number): number | undefined { + if (!job.enabled || job.schedule.kind !== "cron") { + return undefined; + } + const previous = computeStaggeredCronPreviousRunAtMs(job, nowMs); + return isFiniteTimestamp(previous) ? previous : undefined; +} + /** Maximum consecutive schedule errors before auto-disabling a job. */ const MAX_SCHEDULE_ERRORS = 3; diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index ec9d919ec..081e94084 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -14,6 +14,7 @@ import type { CronRunTelemetry, } from "../types.js"; import { + computeJobPreviousRunAtMs, computeJobNextRunAtMs, nextWakeAtMs, recomputeNextRunsForMaintenance, @@ -700,6 +701,7 @@ function isRunnableJob(params: { nowMs: number; skipJobIds?: ReadonlySet; skipAtIfAlreadyRan?: boolean; + allowCronMissedRunByLastRun?: boolean; }): boolean { const { job, nowMs } = params; if (!job.state) { @@ -732,13 +734,46 @@ function isRunnableJob(params: { return false; } const next = job.state.nextRunAtMs; - return typeof next === "number" && Number.isFinite(next) && nowMs >= next; + if (typeof next === "number" && Number.isFinite(next) && nowMs >= next) { + return true; + } + if ( + typeof next === "number" && + Number.isFinite(next) && + next > nowMs && + job.state.lastStatus === "error" + ) { + // Respect persisted retry backoff windows for recurring jobs on restart. + return false; + } + if (!params.allowCronMissedRunByLastRun || job.schedule.kind !== "cron") { + return false; + } + let previousRunAtMs: number | undefined; + try { + previousRunAtMs = computeJobPreviousRunAtMs(job, nowMs); + } catch { + return false; + } + if (typeof previousRunAtMs !== "number" || !Number.isFinite(previousRunAtMs)) { + return false; + } + const lastRunAtMs = job.state.lastRunAtMs; + if (typeof lastRunAtMs !== "number" || !Number.isFinite(lastRunAtMs)) { + // Only replay a "missed slot" when there is concrete run history. + return false; + } + return previousRunAtMs > lastRunAtMs; } function collectRunnableJobs( state: CronServiceState, nowMs: number, - opts?: { skipJobIds?: ReadonlySet; skipAtIfAlreadyRan?: boolean }, + opts?: { + skipJobIds?: ReadonlySet; + skipAtIfAlreadyRan?: boolean; + allowCronMissedRunByLastRun?: boolean; + }, ): CronJob[] { if (!state.store) { return []; @@ -749,6 +784,7 @@ function collectRunnableJobs( nowMs, skipJobIds: opts?.skipJobIds, skipAtIfAlreadyRan: opts?.skipAtIfAlreadyRan, + allowCronMissedRunByLastRun: opts?.allowCronMissedRunByLastRun, }), ); } @@ -764,7 +800,11 @@ export async function runMissedJobs( } const now = state.deps.nowMs(); const skipJobIds = opts?.skipJobIds; - const missed = collectRunnableJobs(state, now, { skipJobIds, skipAtIfAlreadyRan: true }); + const missed = collectRunnableJobs(state, now, { + skipJobIds, + skipAtIfAlreadyRan: true, + allowCronMissedRunByLastRun: true, + }); if (missed.length === 0) { return [] as Array<{ jobId: string; job: CronJob }>; }