Cron: respect aborts in main wake-now retries (#23967)
* Cron: respect aborts in main wake-now retries
* Changelog: add main-session cron abort retry fix note
* Cron tests: format post-rebase conflict resolution
This commit is contained in:
@@ -68,6 +68,7 @@ Docs: https://docs.openclaw.ai
|
|||||||
- Cron: honor `cron.maxConcurrentRuns` in the timer loop so due jobs can execute up to the configured parallelism instead of always running serially. (#11595) Thanks @Takhoffman.
|
- Cron: honor `cron.maxConcurrentRuns` in the timer loop so due jobs can execute up to the configured parallelism instead of always running serially. (#11595) Thanks @Takhoffman.
|
||||||
- Cron/Run: enforce the same per-job timeout guard for manual `cron.run` executions as timer-driven runs, including abort propagation for isolated agent jobs, so forced runs cannot wedge indefinitely. (#23704) Thanks @tkuehnl.
|
- Cron/Run: enforce the same per-job timeout guard for manual `cron.run` executions as timer-driven runs, including abort propagation for isolated agent jobs, so forced runs cannot wedge indefinitely. (#23704) Thanks @tkuehnl.
|
||||||
- Cron/Startup: enforce per-job timeout guards for startup catch-up replay runs so missed isolated jobs cannot hang indefinitely during gateway boot recovery.
|
- Cron/Startup: enforce per-job timeout guards for startup catch-up replay runs so missed isolated jobs cannot hang indefinitely during gateway boot recovery.
|
||||||
|
- Cron/Main session: honor abort/timeout signals during `wakeMode=now` heartbeat busy-retry loops so main-target cron runs stop promptly instead of waiting out the full busy-retry window. (#23967)
|
||||||
- Cron/Schedule: for `every` jobs, prefer `lastRunAtMs + everyMs` when still in the future after restarts, then fall back to anchor scheduling for catch-up windows, so NEXT timing matches the last successful cadence. (#22895) Thanks @SidQin-cyber.
|
- Cron/Schedule: for `every` jobs, prefer `lastRunAtMs + everyMs` when still in the future after restarts, then fall back to anchor scheduling for catch-up windows, so NEXT timing matches the last successful cadence. (#22895) Thanks @SidQin-cyber.
|
||||||
- Cron/Service: execute manual `cron.run` jobs outside the cron lock (while still persisting started/finished state atomically) so `cron.list` and `cron.status` remain responsive during long forced runs. (#23628) Thanks @dsgraves.
|
- Cron/Service: execute manual `cron.run` jobs outside the cron lock (while still persisting started/finished state atomically) so `cron.list` and `cron.status` remain responsive during long forced runs. (#23628) Thanks @dsgraves.
|
||||||
- Cron/Timer: keep a watchdog recheck timer armed while `onTimer` is actively executing so the scheduler continues polling even if a due-run tick stalls for an extended period. (#23628) Thanks @dsgraves.
|
- Cron/Timer: keep a watchdog recheck timer armed while `onTimer` is actively executing so the scheduler continues polling even if a due-run tick stalls for an extended period. (#23628) Thanks @dsgraves.
|
||||||
|
|||||||
@@ -3,12 +3,13 @@ import fs from "node:fs/promises";
|
|||||||
import os from "node:os";
|
import os from "node:os";
|
||||||
import path from "node:path";
|
import path from "node:path";
|
||||||
import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
|
import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
|
||||||
|
import type { HeartbeatRunResult } from "../infra/heartbeat-wake.js";
|
||||||
import * as schedule from "./schedule.js";
|
import * as schedule from "./schedule.js";
|
||||||
import { CronService } from "./service.js";
|
import { CronService } from "./service.js";
|
||||||
import { createDeferred, createRunningCronServiceState } from "./service.test-harness.js";
|
import { createDeferred, createRunningCronServiceState } from "./service.test-harness.js";
|
||||||
import { computeJobNextRunAtMs } from "./service/jobs.js";
|
import { computeJobNextRunAtMs } from "./service/jobs.js";
|
||||||
import { createCronServiceState, type CronEvent } from "./service/state.js";
|
import { createCronServiceState, type CronEvent } from "./service/state.js";
|
||||||
import { onTimer, runMissedJobs } from "./service/timer.js";
|
import { executeJobCore, onTimer, runMissedJobs } from "./service/timer.js";
|
||||||
import type { CronJob, CronJobState } from "./types.js";
|
import type { CronJob, CronJobState } from "./types.js";
|
||||||
|
|
||||||
const noopLogger = {
|
const noopLogger = {
|
||||||
@@ -859,6 +860,55 @@ describe("Cron issue regressions", () => {
|
|||||||
expect(job?.state.lastError).toContain("timed out");
|
expect(job?.state.lastError).toContain("timed out");
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("respects abort signals while retrying main-session wake-now heartbeat runs", async () => {
|
||||||
|
vi.useRealTimers();
|
||||||
|
const abortController = new AbortController();
|
||||||
|
const runHeartbeatOnce = vi.fn(
|
||||||
|
async (): Promise<HeartbeatRunResult> => ({
|
||||||
|
status: "skipped",
|
||||||
|
reason: "requests-in-flight",
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
const enqueueSystemEvent = vi.fn();
|
||||||
|
const requestHeartbeatNow = vi.fn();
|
||||||
|
const mainJob: CronJob = {
|
||||||
|
id: "main-abort",
|
||||||
|
name: "main abort",
|
||||||
|
enabled: true,
|
||||||
|
createdAtMs: Date.now(),
|
||||||
|
updatedAtMs: Date.now(),
|
||||||
|
schedule: { kind: "every", everyMs: 60_000, anchorMs: Date.now() },
|
||||||
|
sessionTarget: "main",
|
||||||
|
wakeMode: "now",
|
||||||
|
payload: { kind: "systemEvent", text: "tick" },
|
||||||
|
state: {},
|
||||||
|
};
|
||||||
|
const state = createCronServiceState({
|
||||||
|
cronEnabled: true,
|
||||||
|
storePath: "/tmp/openclaw-cron-abort-test/jobs.json",
|
||||||
|
log: noopLogger,
|
||||||
|
nowMs: () => Date.now(),
|
||||||
|
enqueueSystemEvent,
|
||||||
|
requestHeartbeatNow,
|
||||||
|
runHeartbeatOnce,
|
||||||
|
wakeNowHeartbeatBusyMaxWaitMs: 30,
|
||||||
|
wakeNowHeartbeatBusyRetryDelayMs: 5,
|
||||||
|
runIsolatedAgentJob: createDefaultIsolatedRunner(),
|
||||||
|
});
|
||||||
|
|
||||||
|
setTimeout(() => {
|
||||||
|
abortController.abort();
|
||||||
|
}, 10);
|
||||||
|
|
||||||
|
const result = await executeJobCore(state, mainJob, abortController.signal);
|
||||||
|
|
||||||
|
expect(result.status).toBe("error");
|
||||||
|
expect(result.error).toContain("timed out");
|
||||||
|
expect(enqueueSystemEvent).toHaveBeenCalledTimes(1);
|
||||||
|
expect(runHeartbeatOnce).toHaveBeenCalled();
|
||||||
|
expect(requestHeartbeatNow).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
|
||||||
it("retries cron schedule computation from the next second when the first attempt returns undefined (#17821)", () => {
|
it("retries cron schedule computation from the next second when the first attempt returns undefined (#17821)", () => {
|
||||||
const scheduledAt = Date.parse("2026-02-15T13:00:00.000Z");
|
const scheduledAt = Date.parse("2026-02-15T13:00:00.000Z");
|
||||||
const cronJob = createIsolatedRegressionJob({
|
const cronJob = createIsolatedRegressionJob({
|
||||||
|
|||||||
@@ -607,8 +607,34 @@ export async function executeJobCore(
|
|||||||
job: CronJob,
|
job: CronJob,
|
||||||
abortSignal?: AbortSignal,
|
abortSignal?: AbortSignal,
|
||||||
): Promise<CronRunOutcome & CronRunTelemetry & { delivered?: boolean }> {
|
): Promise<CronRunOutcome & CronRunTelemetry & { delivered?: boolean }> {
|
||||||
|
const resolveAbortError = () => ({
|
||||||
|
status: "error" as const,
|
||||||
|
error: timeoutErrorMessage(),
|
||||||
|
});
|
||||||
|
const waitWithAbort = async (ms: number) => {
|
||||||
|
if (!abortSignal) {
|
||||||
|
await new Promise<void>((resolve) => setTimeout(resolve, ms));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (abortSignal.aborted) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
await new Promise<void>((resolve) => {
|
||||||
|
const timer = setTimeout(() => {
|
||||||
|
abortSignal.removeEventListener("abort", onAbort);
|
||||||
|
resolve();
|
||||||
|
}, ms);
|
||||||
|
const onAbort = () => {
|
||||||
|
clearTimeout(timer);
|
||||||
|
abortSignal.removeEventListener("abort", onAbort);
|
||||||
|
resolve();
|
||||||
|
};
|
||||||
|
abortSignal.addEventListener("abort", onAbort, { once: true });
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
if (abortSignal?.aborted) {
|
if (abortSignal?.aborted) {
|
||||||
return { status: "error", error: timeoutErrorMessage() };
|
return resolveAbortError();
|
||||||
}
|
}
|
||||||
if (job.sessionTarget === "main") {
|
if (job.sessionTarget === "main") {
|
||||||
const text = resolveJobPayloadTextForMain(job);
|
const text = resolveJobPayloadTextForMain(job);
|
||||||
@@ -629,7 +655,6 @@ export async function executeJobCore(
|
|||||||
});
|
});
|
||||||
if (job.wakeMode === "now" && state.deps.runHeartbeatOnce) {
|
if (job.wakeMode === "now" && state.deps.runHeartbeatOnce) {
|
||||||
const reason = `cron:${job.id}`;
|
const reason = `cron:${job.id}`;
|
||||||
const delay = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));
|
|
||||||
const maxWaitMs = state.deps.wakeNowHeartbeatBusyMaxWaitMs ?? 2 * 60_000;
|
const maxWaitMs = state.deps.wakeNowHeartbeatBusyMaxWaitMs ?? 2 * 60_000;
|
||||||
const retryDelayMs = state.deps.wakeNowHeartbeatBusyRetryDelayMs ?? 250;
|
const retryDelayMs = state.deps.wakeNowHeartbeatBusyRetryDelayMs ?? 250;
|
||||||
const waitStartedAt = state.deps.nowMs();
|
const waitStartedAt = state.deps.nowMs();
|
||||||
@@ -637,7 +662,7 @@ export async function executeJobCore(
|
|||||||
let heartbeatResult: HeartbeatRunResult;
|
let heartbeatResult: HeartbeatRunResult;
|
||||||
for (;;) {
|
for (;;) {
|
||||||
if (abortSignal?.aborted) {
|
if (abortSignal?.aborted) {
|
||||||
return { status: "error", error: timeoutErrorMessage() };
|
return resolveAbortError();
|
||||||
}
|
}
|
||||||
heartbeatResult = await state.deps.runHeartbeatOnce({
|
heartbeatResult = await state.deps.runHeartbeatOnce({
|
||||||
reason,
|
reason,
|
||||||
@@ -650,7 +675,13 @@ export async function executeJobCore(
|
|||||||
) {
|
) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
if (abortSignal?.aborted) {
|
||||||
|
return resolveAbortError();
|
||||||
|
}
|
||||||
if (state.deps.nowMs() - waitStartedAt > maxWaitMs) {
|
if (state.deps.nowMs() - waitStartedAt > maxWaitMs) {
|
||||||
|
if (abortSignal?.aborted) {
|
||||||
|
return resolveAbortError();
|
||||||
|
}
|
||||||
state.deps.requestHeartbeatNow({
|
state.deps.requestHeartbeatNow({
|
||||||
reason,
|
reason,
|
||||||
agentId: job.agentId,
|
agentId: job.agentId,
|
||||||
@@ -658,7 +689,7 @@ export async function executeJobCore(
|
|||||||
});
|
});
|
||||||
return { status: "ok", summary: text };
|
return { status: "ok", summary: text };
|
||||||
}
|
}
|
||||||
await delay(retryDelayMs);
|
await waitWithAbort(retryDelayMs);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (heartbeatResult.status === "ran") {
|
if (heartbeatResult.status === "ran") {
|
||||||
@@ -669,6 +700,9 @@ export async function executeJobCore(
|
|||||||
return { status: "error", error: heartbeatResult.reason, summary: text };
|
return { status: "error", error: heartbeatResult.reason, summary: text };
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
if (abortSignal?.aborted) {
|
||||||
|
return resolveAbortError();
|
||||||
|
}
|
||||||
state.deps.requestHeartbeatNow({
|
state.deps.requestHeartbeatNow({
|
||||||
reason: `cron:${job.id}`,
|
reason: `cron:${job.id}`,
|
||||||
agentId: job.agentId,
|
agentId: job.agentId,
|
||||||
@@ -682,7 +716,7 @@ export async function executeJobCore(
|
|||||||
return { status: "skipped", error: "isolated job requires payload.kind=agentTurn" };
|
return { status: "skipped", error: "isolated job requires payload.kind=agentTurn" };
|
||||||
}
|
}
|
||||||
if (abortSignal?.aborted) {
|
if (abortSignal?.aborted) {
|
||||||
return { status: "error", error: timeoutErrorMessage() };
|
return resolveAbortError();
|
||||||
}
|
}
|
||||||
|
|
||||||
const res = await state.deps.runIsolatedAgentJob({
|
const res = await state.deps.runIsolatedAgentJob({
|
||||||
|
|||||||
Reference in New Issue
Block a user