feat(cron): add failure destination support to failed cron jobs (#31059)

* feat(cron): add failure destination support with webhook mode and bestEffort handling

Extends PR #24789 failure alerts with features from PR #29145:
- Add webhook delivery mode for failure alerts (mode: 'webhook')
- Add accountId support for multi-account channel configurations
- Add bestEffort handling to skip alerts when job has bestEffort=true
- Add separate failureDestination config (global + per-job in delivery)
- Add duplicate prevention (prevents sending to same as primary delivery)
- Add CLI flags: --failure-alert-mode, --failure-alert-account-id
- Add UI fields for new options in web cron editor

* fix(cron): merge failureAlert mode/accountId and preserve failureDestination on updates

- Fix mergeCronFailureAlert to merge mode and accountId fields
- Fix mergeCronDelivery to preserve failureDestination on updates
- Fix isSameDeliveryTarget to use 'announce' as default instead of 'none'
  to properly detect duplicates when delivery.mode is undefined

* fix(cron): validate webhook mode requires URL in resolveFailureDestination

When mode is 'webhook' but no 'to' URL is provided, return null
instead of creating an invalid plan that silently fails later.

* fix(cron): fail closed on webhook mode without URL and make failureDestination fields clearable

- sendCronFailureAlert: fail closed when mode is webhook but URL is missing
- mergeCronDelivery: use per-key presence checks so callers can clear
  nested failureDestination fields via cron.update

Note: protocol:check shows missing internalEvents in Swift models - this is
a pre-existing issue unrelated to these changes (upstream sync needed).

* fix(cron): use separate schema for failureDestination and fix type cast

- Create CronFailureDestinationSchema excluding after/cooldownMs fields
- Fix type cast in sendFailureNotificationAnnounce to use CronMessageChannel

* fix(cron): merge global failureDestination with partial job overrides

When job has partial failureDestination config, fall back to global
config for unset fields instead of treating it as a full override.

* fix(cron): avoid forcing announce mode and clear inherited to on mode change

- UI: only include mode in patch if explicitly set to non-default
- delivery.ts: clear inherited 'to' when job overrides mode, since URL
  semantics differ between announce and webhook modes

* fix(cron): preserve explicit to on mode override and always include mode in UI patches

- delivery.ts: preserve job-level explicit 'to' when overriding mode
- UI: always include mode in failureAlert patch so users can switch between announce/webhook

* fix(cron): allow clearing accountId and treat undefined global mode as announce

- UI: always include accountId in patch so users can clear it
- delivery.ts: treat undefined global mode as announce when comparing for clearing inherited 'to'

* Cron: harden failure destination routing and add regression coverage

* Cron: resolve failure destination review feedback

* Cron: drop unrelated timeout assertions from conflict resolution

* Cron: format cron CLI regression test

* Cron: align gateway cron test mock types

---------

Co-authored-by: Tak Hoffman <781889+Takhoffman@users.noreply.github.com>
This commit is contained in:
Evgeny Zislis
2026-03-02 17:27:41 +02:00
committed by GitHub
parent a905b6dabc
commit 4b4ea5df8b
22 changed files with 993 additions and 89 deletions

View File

@@ -202,6 +202,7 @@ Docs: https://docs.openclaw.ai
- Discord/Application ID fallback: parse bot application IDs from token prefixes without numeric precision loss and use token fallback only on transport/timeout failures when probing `/oauth2/applications/@me`. Landed from contributor PR #29695 by @dhananjai1729. Thanks @dhananjai1729.
- Discord/EventQueue timeout config: expose per-account `channels.discord.accounts.<id>.eventQueue.listenerTimeout` (and related queue options) so long-running handlers can avoid Carbon listener timeout drops. Landed from contributor PR #28945 by @Glucksberg. Thanks @Glucksberg.
- CLI/Cron run exit code: return exit code `0` only when `cron run` reports `{ ok: true, ran: true }`, and `1` for non-run/error outcomes so scripting/debugging reflects actual execution status. Landed from contributor PR #31121 by @Sid-Qin. Thanks @Sid-Qin.
- Cron/Failure delivery routing: add `failureAlert.mode` (`announce|webhook`) and `failureAlert.accountId` support, plus `cron.failureDestination` and per-job `delivery.failureDestination` routing with duplicate-target suppression, best-effort skip behavior, and global+job merge semantics. Landed from contributor PR #31059 by @kesor. Thanks @kesor.
- CLI/JSON preflight output: keep `--json` command stdout machine-readable by suppressing doctor preflight note output while still running legacy migration/config doctor flow. (#24368) Thanks @altaywtf.
- Nodes/Screen recording guardrails: cap `nodes` tool `screen_record` `durationMs` to 5 minutes at both schema-validation and runtime invocation layers to prevent long-running blocking captures from unbounded durations. Landed from contributor PR #31106 by @BlueBirdBack. Thanks @BlueBirdBack.
- Telegram/Empty final replies: skip outbound send for null/undefined final text payloads without media so Telegram typing indicators do not linger on `text must be non-empty` errors, with added regression coverage for undefined final payload dispatch. Landed from contributor PRs #30969 by @haosenwang1018 and #30746 by @rylena. Thanks @haosenwang1018 and @rylena.

View File

@@ -679,19 +679,39 @@ describe("cron cli", () => {
expect(patch?.patch?.failureAlert).toBe(false);
});
it("uses a longer default timeout for cron run", async () => {
const { runOpts } = await runCronRunAndCaptureExit({
ran: true,
args: ["cron", "run", "job-1", "--expect-final"],
});
expect(runOpts.timeout).toBe("600000");
});
it("patches failure alert mode/accountId on cron edit", async () => {
callGatewayFromCli.mockClear();
it("preserves explicit --timeout for cron run", async () => {
const { runOpts } = await runCronRunAndCaptureExit({
ran: true,
args: ["cron", "run", "job-1", "--expect-final", "--timeout", "45000"],
});
expect(runOpts.timeout).toBe("45000");
const program = buildProgram();
await program.parseAsync(
[
"cron",
"edit",
"job-1",
"--failure-alert-after",
"1",
"--failure-alert-mode",
"webhook",
"--failure-alert-account-id",
"bot-a",
],
{ from: "user" },
);
const updateCall = callGatewayFromCli.mock.calls.find((call) => call[0] === "cron.update");
const patch = updateCall?.[2] as {
patch?: {
failureAlert?: {
after?: number;
mode?: "announce" | "webhook";
accountId?: string;
};
};
};
expect(patch?.patch?.failureAlert?.after).toBe(1);
expect(patch?.patch?.failureAlert?.mode).toBe("webhook");
expect(patch?.patch?.failureAlert?.accountId).toBe("bot-a");
});
});

View File

@@ -73,6 +73,11 @@ export function registerCronEditCommand(cron: Command) {
)
.option("--failure-alert-to <dest>", "Failure alert destination")
.option("--failure-alert-cooldown <duration>", "Minimum time between alerts (e.g. 1h, 30m)")
.option("--failure-alert-mode <mode>", "Failure alert delivery mode (announce or webhook)")
.option(
"--failure-alert-account-id <id>",
"Account ID for failure alert channel (multi-account setups)",
)
.action(async (id, opts) => {
try {
if (opts.session === "main" && opts.message) {
@@ -286,11 +291,15 @@ export function registerCronEditCommand(cron: Command) {
const hasFailureAlertChannel = typeof opts.failureAlertChannel === "string";
const hasFailureAlertTo = typeof opts.failureAlertTo === "string";
const hasFailureAlertCooldown = typeof opts.failureAlertCooldown === "string";
const hasFailureAlertMode = typeof opts.failureAlertMode === "string";
const hasFailureAlertAccountId = typeof opts.failureAlertAccountId === "string";
const hasFailureAlertFields =
hasFailureAlertAfter ||
hasFailureAlertChannel ||
hasFailureAlertTo ||
hasFailureAlertCooldown;
hasFailureAlertCooldown ||
hasFailureAlertMode ||
hasFailureAlertAccountId;
const failureAlertFlag =
typeof opts.failureAlert === "boolean" ? opts.failureAlert : undefined;
if (failureAlertFlag === false && hasFailureAlertFields) {
@@ -322,6 +331,17 @@ export function registerCronEditCommand(cron: Command) {
}
failureAlert.cooldownMs = cooldownMs;
}
if (hasFailureAlertMode) {
const mode = String(opts.failureAlertMode).trim().toLowerCase();
if (mode !== "announce" && mode !== "webhook") {
throw new Error("Invalid --failure-alert-mode (must be 'announce' or 'webhook').");
}
failureAlert.mode = mode;
}
if (hasFailureAlertAccountId) {
const accountId = String(opts.failureAlertAccountId).trim();
failureAlert.accountId = accountId ? accountId : undefined;
}
patch.failureAlert = failureAlert;
}

View File

@@ -14,6 +14,15 @@ export type CronFailureAlertConfig = {
enabled?: boolean;
after?: number;
cooldownMs?: number;
mode?: "announce" | "webhook";
accountId?: string;
};
export type CronFailureDestinationConfig = {
channel?: string;
to?: string;
accountId?: string;
mode?: "announce" | "webhook";
};
export type CronConfig = {
@@ -44,4 +53,6 @@ export type CronConfig = {
keepLines?: number;
};
failureAlert?: CronFailureAlertConfig;
/** Default destination for failure notifications across all cron jobs. */
failureDestination?: CronFailureDestinationConfig;
};

View File

@@ -417,6 +417,17 @@ export const OpenClawSchema = z
enabled: z.boolean().optional(),
after: z.number().int().min(1).optional(),
cooldownMs: z.number().int().min(0).optional(),
mode: z.enum(["announce", "webhook"]).optional(),
accountId: z.string().optional(),
})
.strict()
.optional(),
failureDestination: z
.object({
channel: z.string().optional(),
to: z.string().optional(),
accountId: z.string().optional(),
mode: z.enum(["announce", "webhook"]).optional(),
})
.strict()
.optional(),

View File

@@ -1,5 +1,5 @@
import { describe, expect, it } from "vitest";
import { resolveCronDeliveryPlan } from "./delivery.js";
import { resolveCronDeliveryPlan, resolveFailureDestination } from "./delivery.js";
import type { CronJob } from "./types.js";
function makeJob(overrides: Partial<CronJob>): CronJob {
@@ -85,3 +85,96 @@ describe("resolveCronDeliveryPlan", () => {
expect(plan.accountId).toBe("bot-a");
});
});
describe("resolveFailureDestination", () => {
it("merges global defaults with job-level overrides", () => {
const plan = resolveFailureDestination(
makeJob({
delivery: {
mode: "announce",
channel: "telegram",
to: "111",
failureDestination: { channel: "signal", mode: "announce" },
},
}),
{
channel: "telegram",
to: "222",
mode: "announce",
accountId: "global-account",
},
);
expect(plan).toEqual({
mode: "announce",
channel: "signal",
to: "222",
accountId: "global-account",
});
});
it("returns null for webhook mode without destination URL", () => {
const plan = resolveFailureDestination(
makeJob({
delivery: {
mode: "announce",
channel: "telegram",
to: "111",
failureDestination: { mode: "webhook" },
},
}),
undefined,
);
expect(plan).toBeNull();
});
it("returns null when failure destination matches primary delivery target", () => {
const plan = resolveFailureDestination(
makeJob({
delivery: {
mode: "announce",
channel: "telegram",
to: "111",
accountId: "bot-a",
failureDestination: {
mode: "announce",
channel: "telegram",
to: "111",
accountId: "bot-a",
},
},
}),
undefined,
);
expect(plan).toBeNull();
});
it("allows job-level failure destination fields to clear inherited global values", () => {
const plan = resolveFailureDestination(
makeJob({
delivery: {
mode: "announce",
channel: "telegram",
to: "111",
failureDestination: {
mode: "announce",
channel: undefined as never,
to: undefined as never,
accountId: undefined as never,
},
},
}),
{
channel: "signal",
to: "group-abc",
accountId: "global-account",
mode: "announce",
},
);
expect(plan).toEqual({
mode: "announce",
channel: "last",
to: undefined,
accountId: undefined,
});
});
});

View File

@@ -1,4 +1,14 @@
import type { CronDeliveryMode, CronJob, CronMessageChannel } from "./types.js";
import type { CliDeps } from "../cli/deps.js";
import { createOutboundSendDeps } from "../cli/outbound-send-deps.js";
import type { CronFailureDestinationConfig } from "../config/types.cron.js";
import type { OpenClawConfig } from "../config/types.js";
import { formatErrorMessage } from "../infra/errors.js";
import { deliverOutboundPayloads } from "../infra/outbound/deliver.js";
import { resolveAgentOutboundIdentity } from "../infra/outbound/identity.js";
import { buildOutboundSessionContext } from "../infra/outbound/session-context.js";
import { getChildLogger } from "../logging.js";
import { resolveDeliveryTarget } from "./isolated-agent/delivery-target.js";
import type { CronDelivery, CronDeliveryMode, CronJob, CronMessageChannel } from "./types.js";
export type CronDeliveryPlan = {
mode: CronDeliveryMode;
@@ -90,3 +100,202 @@ export function resolveCronDeliveryPlan(job: CronJob): CronDeliveryPlan {
requested,
};
}
export type CronFailureDeliveryPlan = {
mode: "announce" | "webhook";
channel?: CronMessageChannel;
to?: string;
accountId?: string;
};
export type CronFailureDestinationInput = {
channel?: CronMessageChannel;
to?: string;
accountId?: string;
mode?: "announce" | "webhook";
};
function normalizeFailureMode(value: unknown): "announce" | "webhook" | undefined {
if (typeof value !== "string") {
return undefined;
}
const trimmed = value.trim().toLowerCase();
if (trimmed === "announce" || trimmed === "webhook") {
return trimmed;
}
return undefined;
}
export function resolveFailureDestination(
job: CronJob,
globalConfig?: CronFailureDestinationConfig,
): CronFailureDeliveryPlan | null {
const delivery = job.delivery;
const jobFailureDest = delivery?.failureDestination as CronFailureDestinationInput | undefined;
const hasJobFailureDest = jobFailureDest && typeof jobFailureDest === "object";
let channel: CronMessageChannel | undefined;
let to: string | undefined;
let accountId: string | undefined;
let mode: "announce" | "webhook" | undefined;
// Start with global config as base
if (globalConfig) {
channel = normalizeChannel(globalConfig.channel);
to = normalizeTo(globalConfig.to);
accountId = normalizeAccountId(globalConfig.accountId);
mode = normalizeFailureMode(globalConfig.mode);
}
// Override with job-level values if present
if (hasJobFailureDest) {
const jobChannel = normalizeChannel(jobFailureDest.channel);
const jobTo = normalizeTo(jobFailureDest.to);
const jobAccountId = normalizeAccountId(jobFailureDest.accountId);
const jobMode = normalizeFailureMode(jobFailureDest.mode);
const hasJobChannelField = "channel" in jobFailureDest;
const hasJobToField = "to" in jobFailureDest;
const hasJobAccountIdField = "accountId" in jobFailureDest;
// Track if 'to' was explicitly set at job level
const jobToExplicitValue = hasJobToField && jobTo !== undefined;
// Respect explicit clears from partial patches.
if (hasJobChannelField) {
channel = jobChannel;
}
if (hasJobToField) {
to = jobTo;
}
if (hasJobAccountIdField) {
accountId = jobAccountId;
}
if (jobMode !== undefined) {
// Mode was explicitly overridden - clear inherited 'to' since URL semantics differ
// between announce (channel recipient) and webhook (HTTP endpoint)
// But preserve explicit 'to' that was set at job level
// Treat undefined global mode as "announce" for comparison
const globalMode = globalConfig?.mode ?? "announce";
if (!jobToExplicitValue && globalMode !== jobMode) {
to = undefined;
}
mode = jobMode;
}
}
if (!channel && !to && !accountId && !mode) {
return null;
}
const resolvedMode = mode ?? "announce";
// Webhook mode requires a URL
if (resolvedMode === "webhook" && !to) {
return null;
}
const result: CronFailureDeliveryPlan = {
mode: resolvedMode,
channel: resolvedMode === "announce" ? (channel ?? "last") : undefined,
to,
accountId,
};
if (delivery && isSameDeliveryTarget(delivery, result)) {
return null;
}
return result;
}
function isSameDeliveryTarget(
delivery: CronDelivery,
failurePlan: CronFailureDeliveryPlan,
): boolean {
const primaryMode = delivery.mode ?? "announce";
if (primaryMode === "none") {
return false;
}
const primaryChannel = delivery.channel;
const primaryTo = delivery.to;
const primaryAccountId = delivery.accountId;
if (failurePlan.mode === "webhook") {
return primaryMode === "webhook" && primaryTo === failurePlan.to;
}
const primaryChannelNormalized = primaryChannel ?? "last";
const failureChannelNormalized = failurePlan.channel ?? "last";
return (
failureChannelNormalized === primaryChannelNormalized &&
failurePlan.to === primaryTo &&
failurePlan.accountId === primaryAccountId
);
}
const FAILURE_NOTIFICATION_TIMEOUT_MS = 30_000;
const cronDeliveryLogger = getChildLogger({ subsystem: "cron-delivery" });
export async function sendFailureNotificationAnnounce(
deps: CliDeps,
cfg: OpenClawConfig,
agentId: string,
jobId: string,
target: { channel?: string; to?: string; accountId?: string },
message: string,
): Promise<void> {
const resolvedTarget = await resolveDeliveryTarget(cfg, agentId, {
channel: target.channel as CronMessageChannel | undefined,
to: target.to,
accountId: target.accountId,
});
if (!resolvedTarget.ok) {
cronDeliveryLogger.warn(
{ error: resolvedTarget.error.message },
"cron: failed to resolve failure destination target",
);
return;
}
const identity = resolveAgentOutboundIdentity(cfg, agentId);
const session = buildOutboundSessionContext({
cfg,
agentId,
sessionKey: `cron:${jobId}:failure`,
});
const abortController = new AbortController();
const timeout = setTimeout(() => {
abortController.abort();
}, FAILURE_NOTIFICATION_TIMEOUT_MS);
try {
await deliverOutboundPayloads({
cfg,
channel: resolvedTarget.channel,
to: resolvedTarget.to,
accountId: resolvedTarget.accountId,
threadId: resolvedTarget.threadId,
payloads: [{ text: message }],
session,
identity,
bestEffort: false,
deps: createOutboundSendDeps(deps),
abortSignal: abortController.signal,
});
} catch (err) {
cronDeliveryLogger.warn(
{
err: formatErrorMessage(err),
channel: resolvedTarget.channel,
to: resolvedTarget.to,
},
"cron: failure destination announce failed",
);
} finally {
clearTimeout(timeout);
}
}

View File

@@ -195,4 +195,72 @@ describe("CronService failure alerts", () => {
cron.stop();
await store.cleanup();
});
it("threads failure alert mode/accountId and skips best-effort jobs", async () => {
const store = await makeStorePath();
const sendCronFailureAlert = vi.fn(async () => undefined);
const runIsolatedAgentJob = vi.fn(async () => ({
status: "error" as const,
error: "temporary upstream error",
}));
const cron = new CronService({
storePath: store.storePath,
cronEnabled: true,
cronConfig: {
failureAlert: {
enabled: true,
after: 1,
mode: "webhook",
accountId: "global-account",
},
},
log: noopLogger,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob,
sendCronFailureAlert,
});
await cron.start();
const normalJob = await cron.add({
name: "normal alert job",
enabled: true,
schedule: { kind: "every", everyMs: 60_000 },
sessionTarget: "isolated",
wakeMode: "next-heartbeat",
payload: { kind: "agentTurn", message: "run report" },
delivery: { mode: "announce", channel: "telegram", to: "19098680" },
});
const bestEffortJob = await cron.add({
name: "best effort alert job",
enabled: true,
schedule: { kind: "every", everyMs: 60_000 },
sessionTarget: "isolated",
wakeMode: "next-heartbeat",
payload: { kind: "agentTurn", message: "run report" },
delivery: {
mode: "announce",
channel: "telegram",
to: "19098680",
bestEffort: true,
},
});
await cron.run(normalJob.id, "force");
expect(sendCronFailureAlert).toHaveBeenCalledTimes(1);
expect(sendCronFailureAlert).toHaveBeenCalledWith(
expect.objectContaining({
mode: "webhook",
accountId: "global-account",
to: undefined,
}),
);
await cron.run(bestEffortJob.id, "force");
expect(sendCronFailureAlert).toHaveBeenCalledTimes(1);
cron.stop();
await store.cleanup();
});
});

View File

@@ -222,6 +222,51 @@ describe("applyJobPatch", () => {
expect(job.delivery).toEqual({ mode: "webhook", to: "https://example.invalid/trim" });
});
it("rejects failureDestination on main jobs without webhook delivery mode", () => {
const job = createMainSystemEventJob("job-main-failure-dest", {
mode: "announce",
channel: "telegram",
to: "123",
failureDestination: {
mode: "announce",
channel: "telegram",
to: "999",
},
});
expect(() => applyJobPatch(job, { enabled: true })).toThrow(
'cron delivery.failureDestination is only supported for sessionTarget="isolated" unless delivery.mode="webhook"',
);
});
it("validates and trims webhook failureDestination target URLs", () => {
const expectedError =
"cron failure destination webhook requires delivery.failureDestination.to to be a valid http(s) URL";
const job = createIsolatedAgentTurnJob("job-failure-webhook-target", {
mode: "announce",
channel: "telegram",
to: "123",
failureDestination: {
mode: "webhook",
to: "not-a-url",
},
});
expect(() => applyJobPatch(job, { enabled: true })).toThrow(expectedError);
job.delivery = {
mode: "announce",
channel: "telegram",
to: "123",
failureDestination: {
mode: "webhook",
to: " https://example.invalid/failure ",
},
};
expect(() => applyJobPatch(job, { enabled: true })).not.toThrow();
expect(job.delivery?.failureDestination?.to).toBe("https://example.invalid/failure");
});
it("rejects Telegram delivery with invalid target (chatId/topicId format)", () => {
const job = createIsolatedAgentTurnJob("job-telegram-invalid", {
mode: "announce",
@@ -365,6 +410,25 @@ describe("createJob rejects sessionTarget main for non-default agents", () => {
}),
).not.toThrow();
});
it("rejects failureDestination on main jobs without webhook delivery mode", () => {
const state = createMockState(now, { defaultAgentId: "main" });
expect(() =>
createJob(state, {
...mainJobInput("main"),
delivery: {
mode: "announce",
channel: "telegram",
to: "123",
failureDestination: {
mode: "announce",
channel: "signal",
to: "+15550001111",
},
},
}),
).toThrow('cron channel delivery config is only supported for sessionTarget="isolated"');
});
});
describe("applyJobPatch rejects sessionTarget main for non-default agents", () => {

View File

@@ -151,6 +151,27 @@ function assertDeliverySupport(job: Pick<CronJob, "sessionTarget" | "delivery">)
}
}
function assertFailureDestinationSupport(job: Pick<CronJob, "sessionTarget" | "delivery">) {
const failureDestination = job.delivery?.failureDestination;
if (!failureDestination) {
return;
}
if (job.sessionTarget === "main" && job.delivery?.mode !== "webhook") {
throw new Error(
'cron delivery.failureDestination is only supported for sessionTarget="isolated" unless delivery.mode="webhook"',
);
}
if (failureDestination.mode === "webhook") {
const target = normalizeHttpWebhookUrl(failureDestination.to);
if (!target) {
throw new Error(
"cron failure destination webhook requires delivery.failureDestination.to to be a valid http(s) URL",
);
}
failureDestination.to = target;
}
}
export function findJobOrThrow(state: CronServiceState, id: string) {
const job = state.store?.jobs.find((j) => j.id === id);
if (!job) {
@@ -452,6 +473,7 @@ export function createJob(state: CronServiceState, input: CronJobCreate): CronJo
assertSupportedJobSpec(job);
assertMainSessionAgentId(job, state.deps.defaultAgentId);
assertDeliverySupport(job);
assertFailureDestinationSupport(job);
job.state.nextRunAtMs = computeJobNextRunAtMs(job, now);
return job;
}
@@ -517,6 +539,15 @@ export function applyJobPatch(
if ("failureAlert" in patch) {
job.failureAlert = mergeCronFailureAlert(job.failureAlert, patch.failureAlert);
}
if (
job.sessionTarget === "main" &&
job.delivery?.mode !== "webhook" &&
job.delivery?.failureDestination
) {
throw new Error(
'cron delivery.failureDestination is only supported for sessionTarget="isolated" unless delivery.mode="webhook"',
);
}
if (job.sessionTarget === "main" && job.delivery?.mode !== "webhook") {
job.delivery = undefined;
}
@@ -532,6 +563,7 @@ export function applyJobPatch(
assertSupportedJobSpec(job);
assertMainSessionAgentId(job, opts?.defaultAgentId);
assertDeliverySupport(job);
assertFailureDestinationSupport(job);
}
function mergeCronPayload(existing: CronPayload, patch: CronPayloadPatch): CronPayload {
@@ -668,6 +700,7 @@ function mergeCronDelivery(
to: existing?.to,
accountId: existing?.accountId,
bestEffort: existing?.bestEffort,
failureDestination: existing?.failureDestination,
};
if (typeof patch.mode === "string") {
@@ -685,6 +718,39 @@ function mergeCronDelivery(
if (typeof patch.bestEffort === "boolean") {
next.bestEffort = patch.bestEffort;
}
if ("failureDestination" in patch) {
if (patch.failureDestination === undefined) {
next.failureDestination = undefined;
} else {
const existingFd = next.failureDestination;
const patchFd = patch.failureDestination;
const nextFd: typeof next.failureDestination = {
channel: existingFd?.channel,
to: existingFd?.to,
accountId: existingFd?.accountId,
mode: existingFd?.mode,
};
if (patchFd) {
if ("channel" in patchFd) {
const channel = typeof patchFd.channel === "string" ? patchFd.channel.trim() : "";
nextFd.channel = channel ? channel : undefined;
}
if ("to" in patchFd) {
const to = typeof patchFd.to === "string" ? patchFd.to.trim() : "";
nextFd.to = to ? to : undefined;
}
if ("accountId" in patchFd) {
const accountId = typeof patchFd.accountId === "string" ? patchFd.accountId.trim() : "";
nextFd.accountId = accountId ? accountId : undefined;
}
if ("mode" in patchFd) {
const mode = typeof patchFd.mode === "string" ? patchFd.mode.trim() : "";
nextFd.mode = mode === "announce" || mode === "webhook" ? mode : undefined;
}
}
next.failureDestination = nextFd;
}
}
return next;
}
@@ -719,6 +785,14 @@ function mergeCronFailureAlert(
: -1;
next.cooldownMs = cooldownMs >= 0 ? Math.floor(cooldownMs) : undefined;
}
if ("mode" in patch) {
const mode = typeof patch.mode === "string" ? patch.mode.trim() : "";
next.mode = mode === "announce" || mode === "webhook" ? mode : undefined;
}
if ("accountId" in patch) {
const accountId = typeof patch.accountId === "string" ? patch.accountId.trim() : "";
next.accountId = accountId ? accountId : undefined;
}
return next;
}

View File

@@ -96,6 +96,8 @@ export type CronServiceDeps = {
text: string;
channel: CronMessageChannel;
to?: string;
mode?: "announce" | "webhook";
accountId?: string;
}) => Promise<void>;
onEvent?: (evt: CronEvent) => void;
};

View File

@@ -188,7 +188,14 @@ function clampNonNegativeInt(value: unknown, fallback: number): number {
function resolveFailureAlert(
state: CronServiceState,
job: CronJob,
): { after: number; cooldownMs: number; channel: CronMessageChannel; to?: string } | null {
): {
after: number;
cooldownMs: number;
channel: CronMessageChannel;
to?: string;
mode?: "announce" | "webhook";
accountId?: string;
} | null {
const globalConfig = state.deps.cronConfig?.failureAlert;
const jobConfig = job.failureAlert === false ? undefined : job.failureAlert;
@@ -199,6 +206,9 @@ function resolveFailureAlert(
return null;
}
const mode = jobConfig?.mode ?? globalConfig?.mode;
const explicitTo = normalizeTo(jobConfig?.to);
return {
after: clampPositiveInt(jobConfig?.after ?? globalConfig?.after, DEFAULT_FAILURE_ALERT_AFTER),
cooldownMs: clampNonNegativeInt(
@@ -209,7 +219,9 @@ function resolveFailureAlert(
normalizeCronMessageChannel(jobConfig?.channel) ??
normalizeCronMessageChannel(job.delivery?.channel) ??
"last",
to: normalizeTo(jobConfig?.to) ?? normalizeTo(job.delivery?.to),
to: mode === "webhook" ? explicitTo : (explicitTo ?? normalizeTo(job.delivery?.to)),
mode,
accountId: jobConfig?.accountId ?? globalConfig?.accountId,
};
}
@@ -221,6 +233,8 @@ function emitFailureAlert(
consecutiveErrors: number;
channel: CronMessageChannel;
to?: string;
mode?: "announce" | "webhook";
accountId?: string;
},
) {
const safeJobName = params.job.name || params.job.id;
@@ -237,6 +251,8 @@ function emitFailureAlert(
text,
channel: params.channel,
to: params.to,
mode: params.mode,
accountId: params.accountId,
})
.catch((err) => {
state.deps.log.warn(
@@ -287,19 +303,26 @@ export function applyJobResult(
job.state.consecutiveErrors = (job.state.consecutiveErrors ?? 0) + 1;
const alertConfig = resolveFailureAlert(state, job);
if (alertConfig && job.state.consecutiveErrors >= alertConfig.after) {
const now = state.deps.nowMs();
const lastAlert = job.state.lastFailureAlertAtMs;
const inCooldown =
typeof lastAlert === "number" && now - lastAlert < Math.max(0, alertConfig.cooldownMs);
if (!inCooldown) {
emitFailureAlert(state, {
job,
error: result.error,
consecutiveErrors: job.state.consecutiveErrors,
channel: alertConfig.channel,
to: alertConfig.to,
});
job.state.lastFailureAlertAtMs = now;
const isBestEffort =
job.delivery?.bestEffort === true ||
(job.payload.kind === "agentTurn" && job.payload.bestEffortDeliver === true);
if (!isBestEffort) {
const now = state.deps.nowMs();
const lastAlert = job.state.lastFailureAlertAtMs;
const inCooldown =
typeof lastAlert === "number" && now - lastAlert < Math.max(0, alertConfig.cooldownMs);
if (!inCooldown) {
emitFailureAlert(state, {
job,
error: result.error,
consecutiveErrors: job.state.consecutiveErrors,
channel: alertConfig.channel,
to: alertConfig.to,
mode: alertConfig.mode,
accountId: alertConfig.accountId,
});
job.state.lastFailureAlertAtMs = now;
}
}
}
} else {

View File

@@ -25,6 +25,15 @@ export type CronDelivery = {
/** Explicit channel account id for multi-account setups (e.g. multiple Telegram bots). */
accountId?: string;
bestEffort?: boolean;
/** Separate destination for failure notifications. */
failureDestination?: CronFailureDestination;
};
export type CronFailureDestination = {
channel?: CronMessageChannel;
to?: string;
accountId?: string;
mode?: "announce" | "webhook";
};
export type CronDeliveryPatch = Partial<CronDelivery>;
@@ -61,6 +70,10 @@ export type CronFailureAlert = {
channel?: CronMessageChannel;
to?: string;
cooldownMs?: number;
/** Delivery mode: announce (via messaging channels) or webhook (HTTP POST). */
mode?: "announce" | "webhook";
/** Account ID for multi-account channel configurations. */
accountId?: string;
};
export type CronPayload =

View File

@@ -138,10 +138,33 @@ export const CronPayloadPatchSchema = Type.Union([
cronAgentTurnPayloadSchema({ message: Type.Optional(NonEmptyString) }),
]);
export const CronFailureAlertSchema = Type.Object(
{
after: Type.Optional(Type.Integer({ minimum: 1 })),
channel: Type.Optional(Type.Union([Type.Literal("last"), NonEmptyString])),
to: Type.Optional(Type.String()),
cooldownMs: Type.Optional(Type.Integer({ minimum: 0 })),
mode: Type.Optional(Type.Union([Type.Literal("announce"), Type.Literal("webhook")])),
accountId: Type.Optional(NonEmptyString),
},
{ additionalProperties: false },
);
export const CronFailureDestinationSchema = Type.Object(
{
channel: Type.Optional(Type.Union([Type.Literal("last"), NonEmptyString])),
to: Type.Optional(Type.String()),
accountId: Type.Optional(NonEmptyString),
mode: Type.Optional(Type.Union([Type.Literal("announce"), Type.Literal("webhook")])),
},
{ additionalProperties: false },
);
const CronDeliverySharedProperties = {
channel: Type.Optional(Type.Union([Type.Literal("last"), NonEmptyString])),
accountId: Type.Optional(NonEmptyString),
bestEffort: Type.Optional(Type.Boolean()),
failureDestination: Type.Optional(CronFailureDestinationSchema),
};
const CronDeliveryNoopSchema = Type.Object(
@@ -188,16 +211,6 @@ export const CronDeliveryPatchSchema = Type.Object(
{ additionalProperties: false },
);
export const CronFailureAlertSchema = Type.Object(
{
after: Type.Optional(Type.Integer({ minimum: 1 })),
channel: Type.Optional(Type.Union([Type.Literal("last"), NonEmptyString])),
to: Type.Optional(Type.String()),
cooldownMs: Type.Optional(Type.Integer({ minimum: 0 })),
},
{ additionalProperties: false },
);
export const CronJobStateSchema = Type.Object(
{
nextRunAtMs: Type.Optional(Type.Integer({ minimum: 0 })),

View File

@@ -8,6 +8,7 @@ import {
resolveAgentMainSessionKey,
} from "../config/sessions.js";
import { resolveStorePath } from "../config/sessions/paths.js";
import { resolveFailureDestination, sendFailureNotificationAnnounce } from "../cron/delivery.js";
import { runCronIsolatedAgentTurn } from "../cron/isolated-agent.js";
import { resolveDeliveryTarget } from "../cron/isolated-agent/delivery-target.js";
import {
@@ -72,6 +73,66 @@ function resolveCronWebhookTarget(params: {
return null;
}
function buildCronWebhookHeaders(webhookToken?: string): Record<string, string> {
const headers: Record<string, string> = {
"Content-Type": "application/json",
};
if (webhookToken) {
headers.Authorization = `Bearer ${webhookToken}`;
}
return headers;
}
async function postCronWebhook(params: {
webhookUrl: string;
webhookToken?: string;
payload: unknown;
logContext: Record<string, unknown>;
blockedLog: string;
failedLog: string;
logger: ReturnType<typeof getChildLogger>;
}): Promise<void> {
const abortController = new AbortController();
const timeout = setTimeout(() => {
abortController.abort();
}, CRON_WEBHOOK_TIMEOUT_MS);
try {
const result = await fetchWithSsrFGuard({
url: params.webhookUrl,
init: {
method: "POST",
headers: buildCronWebhookHeaders(params.webhookToken),
body: JSON.stringify(params.payload),
signal: abortController.signal,
},
});
await result.release();
} catch (err) {
if (err instanceof SsrFBlockedError) {
params.logger.warn(
{
...params.logContext,
reason: formatErrorMessage(err),
webhookUrl: redactWebhookUrl(params.webhookUrl),
},
params.blockedLog,
);
} else {
params.logger.warn(
{
...params.logContext,
err: formatErrorMessage(err),
webhookUrl: redactWebhookUrl(params.webhookUrl),
},
params.failedLog,
);
}
} finally {
clearTimeout(timeout);
}
}
export function buildGatewayCronService(params: {
cfg: ReturnType<typeof loadConfig>;
deps: CliDeps;
@@ -226,11 +287,51 @@ export function buildGatewayCronService(params: {
lane: "cron",
});
},
sendCronFailureAlert: async ({ job, text, channel, to }) => {
sendCronFailureAlert: async ({ job, text, channel, to, mode, accountId }) => {
const { agentId, cfg: runtimeConfig } = resolveCronAgent(job.agentId);
const webhookToken = params.cfg.cron?.webhookToken?.trim();
// Webhook mode requires a URL - fail closed if missing
if (mode === "webhook" && !to) {
cronLogger.warn(
{ jobId: job.id },
"cron: failure alert webhook mode requires URL, skipping",
);
return;
}
if (mode === "webhook" && to) {
const webhookUrl = normalizeHttpWebhookUrl(to);
if (webhookUrl) {
await postCronWebhook({
webhookUrl,
webhookToken,
payload: {
jobId: job.id,
jobName: job.name,
message: text,
},
logContext: { jobId: job.id },
blockedLog: "cron: failure alert webhook blocked by SSRF guard",
failedLog: "cron: failure alert webhook failed",
logger: cronLogger,
});
} else {
cronLogger.warn(
{
jobId: job.id,
webhookUrl: redactWebhookUrl(to),
},
"cron: failure alert webhook URL is invalid, skipping",
);
}
return;
}
const target = await resolveDeliveryTarget(runtimeConfig, agentId, {
channel,
to,
accountId,
});
if (!target.ok) {
throw target.error;
@@ -284,54 +385,81 @@ export function buildGatewayCronService(params: {
}
if (webhookTarget && evt.summary) {
const headers: Record<string, string> = {
"Content-Type": "application/json",
};
if (webhookToken) {
headers.Authorization = `Bearer ${webhookToken}`;
}
const abortController = new AbortController();
const timeout = setTimeout(() => {
abortController.abort();
}, CRON_WEBHOOK_TIMEOUT_MS);
void (async () => {
try {
const result = await fetchWithSsrFGuard({
url: webhookTarget.url,
init: {
method: "POST",
headers,
body: JSON.stringify(evt),
signal: abortController.signal,
},
});
await result.release();
} catch (err) {
if (err instanceof SsrFBlockedError) {
cronLogger.warn(
{
reason: formatErrorMessage(err),
jobId: evt.jobId,
webhookUrl: redactWebhookUrl(webhookTarget.url),
},
"cron: webhook delivery blocked by SSRF guard",
);
} else {
cronLogger.warn(
{
err: formatErrorMessage(err),
jobId: evt.jobId,
webhookUrl: redactWebhookUrl(webhookTarget.url),
},
"cron: webhook delivery failed",
);
}
} finally {
clearTimeout(timeout);
}
await postCronWebhook({
webhookUrl: webhookTarget.url,
webhookToken,
payload: evt,
logContext: { jobId: evt.jobId },
blockedLog: "cron: webhook delivery blocked by SSRF guard",
failedLog: "cron: webhook delivery failed",
logger: cronLogger,
});
})();
}
if (evt.status === "error" && job) {
const failureDest = resolveFailureDestination(job, params.cfg.cron?.failureDestination);
if (failureDest) {
const isBestEffort =
job.delivery?.bestEffort === true ||
(job.payload.kind === "agentTurn" && job.payload.bestEffortDeliver === true);
if (!isBestEffort) {
const failureMessage = `Cron job "${job.name}" failed: ${evt.error ?? "unknown error"}`;
const failurePayload = {
jobId: job.id,
jobName: job.name,
message: failureMessage,
status: evt.status,
error: evt.error,
runAtMs: evt.runAtMs,
durationMs: evt.durationMs,
nextRunAtMs: evt.nextRunAtMs,
};
if (failureDest.mode === "webhook" && failureDest.to) {
const webhookUrl = normalizeHttpWebhookUrl(failureDest.to);
if (webhookUrl) {
void (async () => {
await postCronWebhook({
webhookUrl,
webhookToken,
payload: failurePayload,
logContext: { jobId: evt.jobId },
blockedLog: "cron: failure destination webhook blocked by SSRF guard",
failedLog: "cron: failure destination webhook failed",
logger: cronLogger,
});
})();
} else {
cronLogger.warn(
{
jobId: evt.jobId,
webhookUrl: redactWebhookUrl(failureDest.to),
},
"cron: failure destination webhook URL is invalid, skipping",
);
}
} else if (failureDest.mode === "announce") {
const { agentId, cfg: runtimeConfig } = resolveCronAgent(job.agentId);
void sendFailureNotificationAnnounce(
params.deps,
runtimeConfig,
agentId,
job.id,
{
channel: failureDest.channel,
to: failureDest.to,
accountId: failureDest.accountId,
},
`[Cron Failure] ${failureMessage}`,
);
}
}
}
}
const logPath = resolveCronRunLogPath({
storePath,
jobId: evt.jobId,

View File

@@ -644,6 +644,58 @@ describe("gateway server cron", () => {
await yieldToEventLoop();
expect(fetchWithSsrFGuardMock).toHaveBeenCalledTimes(2);
fetchWithSsrFGuardMock.mockClear();
cronIsolatedRun.mockResolvedValueOnce({ status: "error", summary: "delivery failed" });
const failureDestRes = await rpcReq(ws, "cron.add", {
name: "failure destination webhook",
enabled: true,
schedule: { kind: "every", everyMs: 60_000 },
sessionTarget: "isolated",
wakeMode: "next-heartbeat",
payload: { kind: "agentTurn", message: "test" },
delivery: {
mode: "announce",
channel: "telegram",
to: "19098680",
failureDestination: {
mode: "webhook",
to: "https://example.invalid/failure-destination",
},
},
});
expect(failureDestRes.ok).toBe(true);
const failureDestJobIdValue = (failureDestRes.payload as { id?: unknown } | null)?.id;
const failureDestJobId =
typeof failureDestJobIdValue === "string" ? failureDestJobIdValue : "";
expect(failureDestJobId.length > 0).toBe(true);
const failureDestRunRes = await rpcReq(
ws,
"cron.run",
{ id: failureDestJobId, mode: "force" },
20_000,
);
expect(failureDestRunRes.ok).toBe(true);
await waitForCondition(
() => fetchWithSsrFGuardMock.mock.calls.length === 1,
CRON_WAIT_TIMEOUT_MS,
);
const [failureDestArgs] = fetchWithSsrFGuardMock.mock.calls[0] as unknown as [
{
url?: string;
init?: {
method?: string;
headers?: Record<string, string>;
body?: string;
};
},
];
expect(failureDestArgs.url).toBe("https://example.invalid/failure-destination");
const failureDestBody = JSON.parse(failureDestArgs.init?.body ?? "{}");
expect(failureDestBody.message).toBe(
'Cron job "failure destination webhook" failed: unknown error',
);
cronIsolatedRun.mockResolvedValueOnce({ status: "ok", summary: "" });
const noSummaryRes = await rpcReq(ws, "cron.add", {
name: "webhook no summary",
@@ -668,7 +720,7 @@ describe("gateway server cron", () => {
expect(noSummaryRunRes.ok).toBe(true);
await yieldToEventLoop();
await yieldToEventLoop();
expect(fetchWithSsrFGuardMock).toHaveBeenCalledTimes(2);
expect(fetchWithSsrFGuardMock).toHaveBeenCalledTimes(1);
} finally {
await cleanupCronTestRun({ ws, server, dir, prevSkipCron });
}

View File

@@ -44,5 +44,7 @@ export const DEFAULT_CRON_FORM: CronFormState = {
failureAlertCooldownSeconds: "3600",
failureAlertChannel: "last",
failureAlertTo: "",
failureAlertDeliveryMode: "announce",
failureAlertAccountId: "",
timeoutSeconds: "",
};

View File

@@ -630,6 +630,52 @@ describe("cron controller", () => {
cooldownMs: 120_000,
channel: "telegram",
to: "123456",
mode: "announce",
accountId: undefined,
},
},
});
});
it("includes failure alert mode/accountId in cron.update patch", async () => {
const request = vi.fn(async (method: string, _payload?: unknown) => {
if (method === "cron.update") {
return { id: "job-alert-mode" };
}
if (method === "cron.list") {
return { jobs: [{ id: "job-alert-mode" }] };
}
if (method === "cron.status") {
return { enabled: true, jobs: 1, nextWakeAtMs: null };
}
return {};
});
const state = createState({
client: { request } as unknown as CronState["client"],
cronEditingJobId: "job-alert-mode",
cronForm: {
...DEFAULT_CRON_FORM,
name: "alert mode job",
payloadKind: "agentTurn",
payloadText: "run it",
failureAlertMode: "custom",
failureAlertAfter: "1",
failureAlertDeliveryMode: "webhook",
failureAlertAccountId: "bot-a",
},
});
await addCronJob(state);
const updateCall = request.mock.calls.find(([method]) => method === "cron.update");
expect(updateCall).toBeDefined();
expect(updateCall?.[1]).toMatchObject({
id: "job-alert-mode",
patch: {
failureAlert: {
after: 1,
mode: "webhook",
accountId: "bot-a",
},
},
});
@@ -780,6 +826,8 @@ describe("cron controller", () => {
expect(state.cronForm.failureAlertCooldownSeconds).toBe("30");
expect(state.cronForm.failureAlertChannel).toBe("telegram");
expect(state.cronForm.failureAlertTo).toBe("999");
expect(state.cronForm.failureAlertDeliveryMode).toBe("announce");
expect(state.cronForm.failureAlertAccountId).toBe("");
});
it("validates key cron form errors", () => {

View File

@@ -481,6 +481,12 @@ function jobToForm(job: CronJob, prev: CronFormState): CronFormState {
? (failureAlert.channel ?? CRON_CHANNEL_LAST)
: CRON_CHANNEL_LAST,
failureAlertTo: failureAlert && typeof failureAlert === "object" ? (failureAlert.to ?? "") : "",
failureAlertDeliveryMode:
failureAlert && typeof failureAlert === "object"
? (failureAlert.mode ?? "announce")
: "announce",
failureAlertAccountId:
failureAlert && typeof failureAlert === "object" ? (failureAlert.accountId ?? "") : "",
timeoutSeconds:
job.payload.kind === "agentTurn" && typeof job.payload.timeoutSeconds === "number"
? String(job.payload.timeoutSeconds)
@@ -593,12 +599,21 @@ function buildFailureAlert(form: CronFormState) {
cooldownSeconds !== undefined && Number.isFinite(cooldownSeconds) && cooldownSeconds >= 0
? Math.floor(cooldownSeconds * 1000)
: undefined;
return {
const deliveryMode = form.failureAlertDeliveryMode;
const accountId = form.failureAlertAccountId.trim();
const patch: Record<string, unknown> = {
after: after > 0 ? Math.floor(after) : undefined,
channel: form.failureAlertChannel.trim() || CRON_CHANNEL_LAST,
to: form.failureAlertTo.trim() || undefined,
...(cooldownMs !== undefined ? { cooldownMs } : {}),
};
// Always include mode and accountId so users can switch/clear them
if (deliveryMode) {
patch.mode = deliveryMode;
}
// Include accountId if explicitly set, or send undefined to allow clearing
patch.accountId = accountId || undefined;
return patch;
}
export async function addCronJob(state: CronState) {

View File

@@ -479,6 +479,14 @@ export type CronDelivery = {
to?: string;
accountId?: string;
bestEffort?: boolean;
failureDestination?: CronFailureDestination;
};
export type CronFailureDestination = {
channel?: string;
to?: string;
mode?: "announce" | "webhook";
accountId?: string;
};
export type CronFailureAlert = {
@@ -486,6 +494,8 @@ export type CronFailureAlert = {
channel?: string;
to?: string;
cooldownMs?: number;
mode?: "announce" | "webhook";
accountId?: string;
};
export type CronJobState = {

View File

@@ -48,5 +48,7 @@ export type CronFormState = {
failureAlertCooldownSeconds: string;
failureAlertChannel: string;
failureAlertTo: string;
failureAlertDeliveryMode: "announce" | "webhook";
failureAlertAccountId: string;
timeoutSeconds: string;
};

View File

@@ -1279,6 +1279,31 @@ export function renderCron(props: CronProps) {
Optional recipient override for failure alerts.
</div>
</label>
<label class="field">
${renderFieldLabel("Alert mode")}
<select
.value=${props.form.failureAlertDeliveryMode || "announce"}
@change=${(e: Event) =>
props.onFormChange({
failureAlertDeliveryMode: (e.target as HTMLSelectElement)
.value as CronFormState["failureAlertDeliveryMode"],
})}
>
<option value="announce">Announce (via channel)</option>
<option value="webhook">Webhook (HTTP POST)</option>
</select>
</label>
<label class="field">
${renderFieldLabel("Alert account ID")}
<input
.value=${props.form.failureAlertAccountId}
@input=${(e: Event) =>
props.onFormChange({
failureAlertAccountId: (e.target as HTMLInputElement).value,
})}
placeholder="Account ID for multi-account setups"
/>
</label>
`
: nothing
}