From 207e2c5affa9a747873f822850d0be308eb71c01 Mon Sep 17 00:00:00 2001 From: nabbilkhan Date: Fri, 13 Feb 2026 15:54:07 -0600 Subject: [PATCH] fix: add outbound delivery crash recovery (thanks @nabbilkhan) (#15636) Co-authored-by: Shadow --- CHANGELOG.md | 1 + src/gateway/server.impl.ts | 12 + src/infra/outbound/deliver.test.ts | 67 ++++ src/infra/outbound/deliver.ts | 85 +++++ src/infra/outbound/delivery-queue.test.ts | 373 ++++++++++++++++++++++ src/infra/outbound/delivery-queue.ts | 328 +++++++++++++++++++ 6 files changed, 866 insertions(+) create mode 100644 src/infra/outbound/delivery-queue.test.ts create mode 100644 src/infra/outbound/delivery-queue.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index c7252c469..b87bc0064 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -48,6 +48,7 @@ Docs: https://docs.openclaw.ai - macOS Voice Wake: fix a crash in trigger trimming for CJK/Unicode transcripts by matching and slicing on original-string ranges instead of transformed-string indices. (#11052) Thanks @Flash-LHR. - Heartbeat: prevent scheduler silent-death races during runner reloads, preserve retry cooldown backoff under wake bursts, and prioritize user/action wake causes over interval/retry reasons when coalescing. (#15108) Thanks @joeykrug. - Outbound targets: fail closed for WhatsApp/Twitch/Google Chat fallback paths so invalid or missing targets are dropped instead of rerouted, and align resolver hints with strict target requirements. (#13578) Thanks @mcaxtr. +- Outbound: add a write-ahead delivery queue with crash-recovery retries to prevent lost outbound messages after gateway restarts. (#15636) Thanks @nabbilkhan, @thewilloftheshadow. - Exec/Allowlist: allow multiline heredoc bodies (`<<`, `<<-`) while keeping multiline non-heredoc shell commands blocked, so exec approval parsing permits heredoc input safely without allowing general newline command chaining. (#13811) Thanks @mcaxtr. 
- Docs/Mermaid: remove hardcoded Mermaid init theme blocks from four docs diagrams so dark mode inherits readable theme defaults. (#15157) Thanks @heytulsiprasad. - Outbound/Threading: pass `replyTo` and `threadId` from `message send` tool actions through the core outbound send path to channel adapters, preserving thread/reply routing. (#14948) Thanks @mcaxtr. diff --git a/src/gateway/server.impl.ts b/src/gateway/server.impl.ts index 5b422a2be..3146c0c6d 100644 --- a/src/gateway/server.impl.ts +++ b/src/gateway/server.impl.ts @@ -470,6 +470,18 @@ export async function startGatewayServer( void cron.start().catch((err) => logCron.error(`failed to start: ${String(err)}`)); + // Recover pending outbound deliveries from previous crash/restart. + void (async () => { + const { recoverPendingDeliveries } = await import("../infra/outbound/delivery-queue.js"); + const { deliverOutboundPayloads } = await import("../infra/outbound/deliver.js"); + const logRecovery = log.child("delivery-recovery"); + await recoverPendingDeliveries({ + deliver: deliverOutboundPayloads, + log: logRecovery, + cfg: cfgAtStart, + }); + })().catch((err) => log.error(`Delivery recovery failed: ${String(err)}`)); + const execApprovalManager = new ExecApprovalManager(); const execApprovalForwarder = createExecApprovalForwarder(); const execApprovalHandlers = createExecApprovalHandlers(execApprovalManager, { diff --git a/src/infra/outbound/deliver.test.ts b/src/infra/outbound/deliver.test.ts index 221050cc4..3247149be 100644 --- a/src/infra/outbound/deliver.test.ts +++ b/src/infra/outbound/deliver.test.ts @@ -20,6 +20,11 @@ const hookMocks = vi.hoisted(() => ({ runMessageSent: vi.fn(async () => {}), }, })); +const queueMocks = vi.hoisted(() => ({ + enqueueDelivery: vi.fn(async () => "mock-queue-id"), + ackDelivery: vi.fn(async () => {}), + failDelivery: vi.fn(async () => {}), +})); vi.mock("../../config/sessions.js", async () => { const actual = await vi.importActual( @@ -33,6 +38,11 @@ 
vi.mock("../../config/sessions.js", async () => { vi.mock("../../plugins/hook-runner-global.js", () => ({ getGlobalHookRunner: () => hookMocks.runner, })); +vi.mock("./delivery-queue.js", () => ({ + enqueueDelivery: queueMocks.enqueueDelivery, + ackDelivery: queueMocks.ackDelivery, + failDelivery: queueMocks.failDelivery, +})); const { deliverOutboundPayloads, normalizeOutboundPayloads } = await import("./deliver.js"); @@ -43,6 +53,12 @@ describe("deliverOutboundPayloads", () => { hookMocks.runner.hasHooks.mockReturnValue(false); hookMocks.runner.runMessageSent.mockReset(); hookMocks.runner.runMessageSent.mockResolvedValue(undefined); + queueMocks.enqueueDelivery.mockReset(); + queueMocks.enqueueDelivery.mockResolvedValue("mock-queue-id"); + queueMocks.ackDelivery.mockReset(); + queueMocks.ackDelivery.mockResolvedValue(undefined); + queueMocks.failDelivery.mockReset(); + queueMocks.failDelivery.mockResolvedValue(undefined); }); afterEach(() => { @@ -389,6 +405,57 @@ describe("deliverOutboundPayloads", () => { expect(results).toEqual([{ channel: "whatsapp", messageId: "w2", toJid: "jid" }]); }); + it("calls failDelivery instead of ackDelivery on bestEffort partial failure", async () => { + const sendWhatsApp = vi + .fn() + .mockRejectedValueOnce(new Error("fail")) + .mockResolvedValueOnce({ messageId: "w2", toJid: "jid" }); + const onError = vi.fn(); + const cfg: OpenClawConfig = {}; + + await deliverOutboundPayloads({ + cfg, + channel: "whatsapp", + to: "+1555", + payloads: [{ text: "a" }, { text: "b" }], + deps: { sendWhatsApp }, + bestEffort: true, + onError, + }); + + // onError was called for the first payload's failure. + expect(onError).toHaveBeenCalledTimes(1); + + // Queue entry should NOT be acked — failDelivery should be called instead. 
+ expect(queueMocks.ackDelivery).not.toHaveBeenCalled(); + expect(queueMocks.failDelivery).toHaveBeenCalledWith( + "mock-queue-id", + "partial delivery failure (bestEffort)", + ); + }); + + it("acks the queue entry when delivery is aborted", async () => { + const sendWhatsApp = vi.fn().mockResolvedValue({ messageId: "w1", toJid: "jid" }); + const abortController = new AbortController(); + abortController.abort(); + const cfg: OpenClawConfig = {}; + + await expect( + deliverOutboundPayloads({ + cfg, + channel: "whatsapp", + to: "+1555", + payloads: [{ text: "a" }], + deps: { sendWhatsApp }, + abortSignal: abortController.signal, + }), + ).rejects.toThrow("Operation aborted"); + + expect(queueMocks.ackDelivery).toHaveBeenCalledWith("mock-queue-id"); + expect(queueMocks.failDelivery).not.toHaveBeenCalled(); + expect(sendWhatsApp).not.toHaveBeenCalled(); + }); + it("passes normalized payload to onError", async () => { const sendWhatsApp = vi.fn().mockRejectedValue(new Error("boom")); const onError = vi.fn(); diff --git a/src/infra/outbound/deliver.ts b/src/infra/outbound/deliver.ts index 6460efc01..acbd49369 100644 --- a/src/infra/outbound/deliver.ts +++ b/src/infra/outbound/deliver.ts @@ -25,6 +25,7 @@ import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js"; import { markdownToSignalTextChunks, type SignalTextStyleRange } from "../../signal/format.js"; import { sendMessageSignal } from "../../signal/send.js"; import { throwIfAborted } from "./abort.js"; +import { ackDelivery, enqueueDelivery, failDelivery } from "./delivery-queue.js"; import { normalizeReplyPayloadsForDelivery } from "./payloads.js"; export type { NormalizedOutboundPayload } from "./payloads.js"; @@ -178,6 +179,8 @@ function createPluginHandler(params: { }; } +const isAbortError = (err: unknown): boolean => err instanceof Error && err.name === "AbortError"; + export async function deliverOutboundPayloads(params: { cfg: OpenClawConfig; channel: Exclude; @@ -199,6 +202,88 @@ export 
async function deliverOutboundPayloads(params: { mediaUrls?: string[]; }; silent?: boolean; + /** @internal Skip write-ahead queue (used by crash-recovery to avoid re-enqueueing). */ + skipQueue?: boolean; +}): Promise { + const { channel, to, payloads } = params; + + // Write-ahead delivery queue: persist before sending, remove after success. + const queueId = params.skipQueue + ? null + : await enqueueDelivery({ + channel, + to, + accountId: params.accountId, + payloads, + threadId: params.threadId, + replyToId: params.replyToId, + bestEffort: params.bestEffort, + gifPlayback: params.gifPlayback, + silent: params.silent, + mirror: params.mirror, + }).catch(() => null); // Best-effort — don't block delivery if queue write fails. + + // Wrap onError to detect partial failures under bestEffort mode. + // When bestEffort is true, per-payload errors are caught and passed to onError + // without throwing — so the outer try/catch never fires. We track whether any + // payload failed so we can call failDelivery instead of ackDelivery. + let hadPartialFailure = false; + const wrappedParams = params.onError + ? { + ...params, + onError: (err: unknown, payload: NormalizedOutboundPayload) => { + hadPartialFailure = true; + params.onError!(err, payload); + }, + } + : params; + + try { + const results = await deliverOutboundPayloadsCore(wrappedParams); + if (queueId) { + if (hadPartialFailure) { + await failDelivery(queueId, "partial delivery failure (bestEffort)").catch(() => {}); + } else { + await ackDelivery(queueId).catch(() => {}); // Best-effort cleanup. + } + } + return results; + } catch (err) { + if (queueId) { + if (isAbortError(err)) { + await ackDelivery(queueId).catch(() => {}); + } else { + await failDelivery(queueId, err instanceof Error ? err.message : String(err)).catch( + () => {}, + ); + } + } + throw err; + } +} + +/** Core delivery logic (extracted for queue wrapper). 
*/ +async function deliverOutboundPayloadsCore(params: { + cfg: OpenClawConfig; + channel: Exclude; + to: string; + accountId?: string; + payloads: ReplyPayload[]; + replyToId?: string | null; + threadId?: string | number | null; + deps?: OutboundSendDeps; + gifPlayback?: boolean; + abortSignal?: AbortSignal; + bestEffort?: boolean; + onError?: (err: unknown, payload: NormalizedOutboundPayload) => void; + onPayload?: (payload: NormalizedOutboundPayload) => void; + mirror?: { + sessionKey: string; + agentId?: string; + text?: string; + mediaUrls?: string[]; + }; + silent?: boolean; }): Promise { const { cfg, channel, to, payloads } = params; const accountId = params.accountId; diff --git a/src/infra/outbound/delivery-queue.test.ts b/src/infra/outbound/delivery-queue.test.ts new file mode 100644 index 000000000..ee94d13b6 --- /dev/null +++ b/src/infra/outbound/delivery-queue.test.ts @@ -0,0 +1,373 @@ +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { + ackDelivery, + computeBackoffMs, + enqueueDelivery, + failDelivery, + loadPendingDeliveries, + MAX_RETRIES, + moveToFailed, + recoverPendingDeliveries, +} from "./delivery-queue.js"; + +let tmpDir: string; + +beforeEach(() => { + tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-dq-test-")); +}); + +afterEach(() => { + fs.rmSync(tmpDir, { recursive: true, force: true }); +}); + +describe("enqueue + ack lifecycle", () => { + it("creates and removes a queue entry", async () => { + const id = await enqueueDelivery( + { + channel: "whatsapp", + to: "+1555", + payloads: [{ text: "hello" }], + bestEffort: true, + gifPlayback: true, + silent: true, + mirror: { + sessionKey: "agent:main:main", + text: "hello", + mediaUrls: ["https://example.com/file.png"], + }, + }, + tmpDir, + ); + + // Entry file exists after enqueue. 
+ const queueDir = path.join(tmpDir, "delivery-queue"); + const files = fs.readdirSync(queueDir).filter((f) => f.endsWith(".json")); + expect(files).toHaveLength(1); + expect(files[0]).toBe(`${id}.json`); + + // Entry contents are correct. + const entry = JSON.parse(fs.readFileSync(path.join(queueDir, files[0]), "utf-8")); + expect(entry).toMatchObject({ + id, + channel: "whatsapp", + to: "+1555", + bestEffort: true, + gifPlayback: true, + silent: true, + mirror: { + sessionKey: "agent:main:main", + text: "hello", + mediaUrls: ["https://example.com/file.png"], + }, + retryCount: 0, + }); + expect(entry.payloads).toEqual([{ text: "hello" }]); + + // Ack removes the file. + await ackDelivery(id, tmpDir); + const remaining = fs.readdirSync(queueDir).filter((f) => f.endsWith(".json")); + expect(remaining).toHaveLength(0); + }); + + it("ack is idempotent (no error on missing file)", async () => { + await expect(ackDelivery("nonexistent-id", tmpDir)).resolves.toBeUndefined(); + }); +}); + +describe("failDelivery", () => { + it("increments retryCount and sets lastError", async () => { + const id = await enqueueDelivery( + { + channel: "telegram", + to: "123", + payloads: [{ text: "test" }], + }, + tmpDir, + ); + + await failDelivery(id, "connection refused", tmpDir); + + const queueDir = path.join(tmpDir, "delivery-queue"); + const entry = JSON.parse(fs.readFileSync(path.join(queueDir, `${id}.json`), "utf-8")); + expect(entry.retryCount).toBe(1); + expect(entry.lastError).toBe("connection refused"); + }); +}); + +describe("moveToFailed", () => { + it("moves entry to failed/ subdirectory", async () => { + const id = await enqueueDelivery( + { + channel: "slack", + to: "#general", + payloads: [{ text: "hi" }], + }, + tmpDir, + ); + + await moveToFailed(id, tmpDir); + + const queueDir = path.join(tmpDir, "delivery-queue"); + const failedDir = path.join(queueDir, "failed"); + expect(fs.existsSync(path.join(queueDir, `${id}.json`))).toBe(false); + 
expect(fs.existsSync(path.join(failedDir, `${id}.json`))).toBe(true); + }); +}); + +describe("loadPendingDeliveries", () => { + it("returns empty array when queue directory does not exist", async () => { + const nonexistent = path.join(tmpDir, "no-such-dir"); + const entries = await loadPendingDeliveries(nonexistent); + expect(entries).toEqual([]); + }); + + it("loads multiple entries", async () => { + await enqueueDelivery({ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, tmpDir); + await enqueueDelivery({ channel: "telegram", to: "2", payloads: [{ text: "b" }] }, tmpDir); + + const entries = await loadPendingDeliveries(tmpDir); + expect(entries).toHaveLength(2); + }); +}); + +describe("computeBackoffMs", () => { + it("returns 0 for retryCount 0", () => { + expect(computeBackoffMs(0)).toBe(0); + }); + + it("returns correct backoff for each retry", () => { + expect(computeBackoffMs(1)).toBe(5_000); + expect(computeBackoffMs(2)).toBe(25_000); + expect(computeBackoffMs(3)).toBe(120_000); + expect(computeBackoffMs(4)).toBe(600_000); + // Beyond defined schedule — clamps to last value. + expect(computeBackoffMs(5)).toBe(600_000); + }); +}); + +describe("recoverPendingDeliveries", () => { + const noopDelay = async () => {}; + const baseCfg = {}; + + it("recovers entries from a simulated crash", async () => { + // Manually create two queue entries as if gateway crashed before delivery. 
+ await enqueueDelivery({ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, tmpDir); + await enqueueDelivery({ channel: "telegram", to: "2", payloads: [{ text: "b" }] }, tmpDir); + + const deliver = vi.fn().mockResolvedValue([]); + const log = { info: vi.fn(), warn: vi.fn(), error: vi.fn() }; + + const result = await recoverPendingDeliveries({ + deliver, + log, + cfg: baseCfg, + stateDir: tmpDir, + delay: noopDelay, + }); + + expect(deliver).toHaveBeenCalledTimes(2); + expect(result.recovered).toBe(2); + expect(result.failed).toBe(0); + expect(result.skipped).toBe(0); + + // Queue should be empty after recovery. + const remaining = await loadPendingDeliveries(tmpDir); + expect(remaining).toHaveLength(0); + }); + + it("moves entries that exceeded max retries to failed/", async () => { + // Create an entry and manually set retryCount to MAX_RETRIES. + const id = await enqueueDelivery( + { channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, + tmpDir, + ); + const filePath = path.join(tmpDir, "delivery-queue", `${id}.json`); + const entry = JSON.parse(fs.readFileSync(filePath, "utf-8")); + entry.retryCount = MAX_RETRIES; + fs.writeFileSync(filePath, JSON.stringify(entry), "utf-8"); + + const deliver = vi.fn(); + const log = { info: vi.fn(), warn: vi.fn(), error: vi.fn() }; + + const result = await recoverPendingDeliveries({ + deliver, + log, + cfg: baseCfg, + stateDir: tmpDir, + delay: noopDelay, + }); + + expect(deliver).not.toHaveBeenCalled(); + expect(result.skipped).toBe(1); + + // Entry should be in failed/ directory. 
+ const failedDir = path.join(tmpDir, "delivery-queue", "failed"); + expect(fs.existsSync(path.join(failedDir, `${id}.json`))).toBe(true); + }); + + it("increments retryCount on failed recovery attempt", async () => { + await enqueueDelivery({ channel: "slack", to: "#ch", payloads: [{ text: "x" }] }, tmpDir); + + const deliver = vi.fn().mockRejectedValue(new Error("network down")); + const log = { info: vi.fn(), warn: vi.fn(), error: vi.fn() }; + + const result = await recoverPendingDeliveries({ + deliver, + log, + cfg: baseCfg, + stateDir: tmpDir, + delay: noopDelay, + }); + + expect(result.failed).toBe(1); + expect(result.recovered).toBe(0); + + // Entry should still be in queue with incremented retryCount. + const entries = await loadPendingDeliveries(tmpDir); + expect(entries).toHaveLength(1); + expect(entries[0].retryCount).toBe(1); + expect(entries[0].lastError).toBe("network down"); + }); + + it("passes skipQueue: true to prevent re-enqueueing during recovery", async () => { + await enqueueDelivery({ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, tmpDir); + + const deliver = vi.fn().mockResolvedValue([]); + const log = { info: vi.fn(), warn: vi.fn(), error: vi.fn() }; + + await recoverPendingDeliveries({ + deliver, + log, + cfg: baseCfg, + stateDir: tmpDir, + delay: noopDelay, + }); + + expect(deliver).toHaveBeenCalledWith(expect.objectContaining({ skipQueue: true })); + }); + + it("replays stored delivery options during recovery", async () => { + await enqueueDelivery( + { + channel: "whatsapp", + to: "+1", + payloads: [{ text: "a" }], + bestEffort: true, + gifPlayback: true, + silent: true, + mirror: { + sessionKey: "agent:main:main", + text: "a", + mediaUrls: ["https://example.com/a.png"], + }, + }, + tmpDir, + ); + + const deliver = vi.fn().mockResolvedValue([]); + const log = { info: vi.fn(), warn: vi.fn(), error: vi.fn() }; + + await recoverPendingDeliveries({ + deliver, + log, + cfg: baseCfg, + stateDir: tmpDir, + delay: noopDelay, + }); 
+ + expect(deliver).toHaveBeenCalledWith( + expect.objectContaining({ + bestEffort: true, + gifPlayback: true, + silent: true, + mirror: { + sessionKey: "agent:main:main", + text: "a", + mediaUrls: ["https://example.com/a.png"], + }, + }), + ); + }); + + it("respects maxRecoveryMs time budget", async () => { + await enqueueDelivery({ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, tmpDir); + await enqueueDelivery({ channel: "telegram", to: "2", payloads: [{ text: "b" }] }, tmpDir); + await enqueueDelivery({ channel: "slack", to: "#c", payloads: [{ text: "c" }] }, tmpDir); + + const deliver = vi.fn().mockResolvedValue([]); + const log = { info: vi.fn(), warn: vi.fn(), error: vi.fn() }; + + const result = await recoverPendingDeliveries({ + deliver, + log, + cfg: baseCfg, + stateDir: tmpDir, + delay: noopDelay, + maxRecoveryMs: 0, // Immediate timeout — no entries should be processed. + }); + + expect(deliver).not.toHaveBeenCalled(); + expect(result.recovered).toBe(0); + expect(result.failed).toBe(0); + expect(result.skipped).toBe(0); + + // All entries should still be in the queue. + const remaining = await loadPendingDeliveries(tmpDir); + expect(remaining).toHaveLength(3); + + // Should have logged a warning about deferred entries. 
+ expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("deferred to next restart")); + }); + + it("defers entries when backoff exceeds the recovery budget", async () => { + const id = await enqueueDelivery( + { channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, + tmpDir, + ); + const filePath = path.join(tmpDir, "delivery-queue", `${id}.json`); + const entry = JSON.parse(fs.readFileSync(filePath, "utf-8")); + entry.retryCount = 3; + fs.writeFileSync(filePath, JSON.stringify(entry), "utf-8"); + + const deliver = vi.fn().mockResolvedValue([]); + const delay = vi.fn(async () => {}); + const log = { info: vi.fn(), warn: vi.fn(), error: vi.fn() }; + + const result = await recoverPendingDeliveries({ + deliver, + log, + cfg: baseCfg, + stateDir: tmpDir, + delay, + maxRecoveryMs: 1000, + }); + + expect(deliver).not.toHaveBeenCalled(); + expect(delay).not.toHaveBeenCalled(); + expect(result).toEqual({ recovered: 0, failed: 0, skipped: 0 }); + + const remaining = await loadPendingDeliveries(tmpDir); + expect(remaining).toHaveLength(1); + + expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("deferred to next restart")); + }); + + it("returns zeros when queue is empty", async () => { + const deliver = vi.fn(); + const log = { info: vi.fn(), warn: vi.fn(), error: vi.fn() }; + + const result = await recoverPendingDeliveries({ + deliver, + log, + cfg: baseCfg, + stateDir: tmpDir, + delay: noopDelay, + }); + + expect(result).toEqual({ recovered: 0, failed: 0, skipped: 0 }); + expect(deliver).not.toHaveBeenCalled(); + }); +}); diff --git a/src/infra/outbound/delivery-queue.ts b/src/infra/outbound/delivery-queue.ts new file mode 100644 index 000000000..7303d8272 --- /dev/null +++ b/src/infra/outbound/delivery-queue.ts @@ -0,0 +1,328 @@ +import crypto from "node:crypto"; +import fs from "node:fs"; +import path from "node:path"; +import type { ReplyPayload } from "../../auto-reply/types.js"; +import type { OpenClawConfig } from "../../config/config.js"; 
+import type { OutboundChannel } from "./targets.js"; +import { resolveStateDir } from "../../config/paths.js"; + +const QUEUE_DIRNAME = "delivery-queue"; +const FAILED_DIRNAME = "failed"; +const MAX_RETRIES = 5; + +/** Backoff delays in milliseconds indexed by retry count (1-based). */ +const BACKOFF_MS: readonly number[] = [ + 5_000, // retry 1: 5s + 25_000, // retry 2: 25s + 120_000, // retry 3: 2m + 600_000, // retry 4: 10m +]; + +export interface QueuedDelivery { + id: string; + enqueuedAt: number; + channel: Exclude; + to: string; + accountId?: string; + /** + * Original payloads before plugin hooks. On recovery, hooks re-run on these + * payloads — this is intentional since hooks are stateless transforms and + * should produce the same result on replay. + */ + payloads: ReplyPayload[]; + threadId?: string | number | null; + replyToId?: string | null; + bestEffort?: boolean; + gifPlayback?: boolean; + silent?: boolean; + mirror?: { + sessionKey: string; + agentId?: string; + text?: string; + mediaUrls?: string[]; + }; + retryCount: number; + lastError?: string; +} + +function resolveQueueDir(stateDir?: string): string { + const base = stateDir ?? resolveStateDir(); + return path.join(base, QUEUE_DIRNAME); +} + +function resolveFailedDir(stateDir?: string): string { + return path.join(resolveQueueDir(stateDir), FAILED_DIRNAME); +} + +/** Ensure the queue directory (and failed/ subdirectory) exist. */ +export async function ensureQueueDir(stateDir?: string): Promise { + const queueDir = resolveQueueDir(stateDir); + await fs.promises.mkdir(queueDir, { recursive: true, mode: 0o700 }); + await fs.promises.mkdir(resolveFailedDir(stateDir), { recursive: true, mode: 0o700 }); + return queueDir; +} + +/** Persist a delivery entry to disk before attempting send. Returns the entry ID. 
*/ +export async function enqueueDelivery( + params: { + channel: Exclude; + to: string; + accountId?: string; + payloads: ReplyPayload[]; + threadId?: string | number | null; + replyToId?: string | null; + bestEffort?: boolean; + gifPlayback?: boolean; + silent?: boolean; + mirror?: { + sessionKey: string; + agentId?: string; + text?: string; + mediaUrls?: string[]; + }; + }, + stateDir?: string, +): Promise { + const queueDir = await ensureQueueDir(stateDir); + const id = crypto.randomUUID(); + const entry: QueuedDelivery = { + id, + enqueuedAt: Date.now(), + channel: params.channel, + to: params.to, + accountId: params.accountId, + payloads: params.payloads, + threadId: params.threadId, + replyToId: params.replyToId, + bestEffort: params.bestEffort, + gifPlayback: params.gifPlayback, + silent: params.silent, + mirror: params.mirror, + retryCount: 0, + }; + const filePath = path.join(queueDir, `${id}.json`); + const tmp = `${filePath}.${process.pid}.tmp`; + const json = JSON.stringify(entry, null, 2); + await fs.promises.writeFile(tmp, json, { encoding: "utf-8", mode: 0o600 }); + await fs.promises.rename(tmp, filePath); + return id; +} + +/** Remove a successfully delivered entry from the queue. */ +export async function ackDelivery(id: string, stateDir?: string): Promise { + const filePath = path.join(resolveQueueDir(stateDir), `${id}.json`); + try { + await fs.promises.unlink(filePath); + } catch (err) { + const code = + err && typeof err === "object" && "code" in err + ? String((err as { code?: unknown }).code) + : null; + if (code !== "ENOENT") { + throw err; + } + // Already removed — no-op. + } +} + +/** Update a queue entry after a failed delivery attempt. 
*/ +export async function failDelivery(id: string, error: string, stateDir?: string): Promise { + const filePath = path.join(resolveQueueDir(stateDir), `${id}.json`); + const raw = await fs.promises.readFile(filePath, "utf-8"); + const entry: QueuedDelivery = JSON.parse(raw); + entry.retryCount += 1; + entry.lastError = error; + const tmp = `${filePath}.${process.pid}.tmp`; + await fs.promises.writeFile(tmp, JSON.stringify(entry, null, 2), { + encoding: "utf-8", + mode: 0o600, + }); + await fs.promises.rename(tmp, filePath); +} + +/** Load all pending delivery entries from the queue directory. */ +export async function loadPendingDeliveries(stateDir?: string): Promise { + const queueDir = resolveQueueDir(stateDir); + let files: string[]; + try { + files = await fs.promises.readdir(queueDir); + } catch (err) { + const code = + err && typeof err === "object" && "code" in err + ? String((err as { code?: unknown }).code) + : null; + if (code === "ENOENT") { + return []; + } + throw err; + } + const entries: QueuedDelivery[] = []; + for (const file of files) { + if (!file.endsWith(".json")) { + continue; + } + const filePath = path.join(queueDir, file); + try { + const stat = await fs.promises.stat(filePath); + if (!stat.isFile()) { + continue; + } + const raw = await fs.promises.readFile(filePath, "utf-8"); + entries.push(JSON.parse(raw)); + } catch { + // Skip malformed or inaccessible entries. + } + } + return entries; +} + +/** Move a queue entry to the failed/ subdirectory. */ +export async function moveToFailed(id: string, stateDir?: string): Promise { + const queueDir = resolveQueueDir(stateDir); + const failedDir = resolveFailedDir(stateDir); + await fs.promises.mkdir(failedDir, { recursive: true, mode: 0o700 }); + const src = path.join(queueDir, `${id}.json`); + const dest = path.join(failedDir, `${id}.json`); + await fs.promises.rename(src, dest); +} + +/** Compute the backoff delay in ms for a given retry count. 
*/ +export function computeBackoffMs(retryCount: number): number { + if (retryCount <= 0) { + return 0; + } + return BACKOFF_MS[Math.min(retryCount - 1, BACKOFF_MS.length - 1)] ?? BACKOFF_MS.at(-1) ?? 0; +} + +export type DeliverFn = (params: { + cfg: OpenClawConfig; + channel: Exclude; + to: string; + accountId?: string; + payloads: ReplyPayload[]; + threadId?: string | number | null; + replyToId?: string | null; + bestEffort?: boolean; + gifPlayback?: boolean; + silent?: boolean; + mirror?: { + sessionKey: string; + agentId?: string; + text?: string; + mediaUrls?: string[]; + }; + skipQueue?: boolean; +}) => Promise; + +export interface RecoveryLogger { + info(msg: string): void; + warn(msg: string): void; + error(msg: string): void; +} + +/** + * On gateway startup, scan the delivery queue and retry any pending entries. + * Uses exponential backoff and moves entries that exceed MAX_RETRIES to failed/. + */ +export async function recoverPendingDeliveries(opts: { + deliver: DeliverFn; + log: RecoveryLogger; + cfg: OpenClawConfig; + stateDir?: string; + /** Override for testing — resolves instead of using real setTimeout. */ + delay?: (ms: number) => Promise; + /** Maximum wall-clock time for recovery in ms. Remaining entries are deferred to next restart. Default: 60 000. */ + maxRecoveryMs?: number; +}): Promise<{ recovered: number; failed: number; skipped: number }> { + const pending = await loadPendingDeliveries(opts.stateDir); + if (pending.length === 0) { + return { recovered: 0, failed: 0, skipped: 0 }; + } + + // Process oldest first. + pending.sort((a, b) => a.enqueuedAt - b.enqueuedAt); + + opts.log.info(`Found ${pending.length} pending delivery entries — starting recovery`); + + const delayFn = opts.delay ?? ((ms: number) => new Promise((r) => setTimeout(r, ms))); + const deadline = Date.now() + (opts.maxRecoveryMs ?? 
60_000); + + let recovered = 0; + let failed = 0; + let skipped = 0; + + for (const entry of pending) { + const now = Date.now(); + if (now >= deadline) { + const deferred = pending.length - recovered - failed - skipped; + opts.log.warn(`Recovery time budget exceeded — ${deferred} entries deferred to next restart`); + break; + } + if (entry.retryCount >= MAX_RETRIES) { + opts.log.warn( + `Delivery ${entry.id} exceeded max retries (${entry.retryCount}/${MAX_RETRIES}) — moving to failed/`, + ); + try { + await moveToFailed(entry.id, opts.stateDir); + } catch (err) { + opts.log.error(`Failed to move entry ${entry.id} to failed/: ${String(err)}`); + } + skipped += 1; + continue; + } + + const backoff = computeBackoffMs(entry.retryCount + 1); + if (backoff > 0) { + if (now + backoff >= deadline) { + const deferred = pending.length - recovered - failed - skipped; + opts.log.warn( + `Recovery time budget exceeded — ${deferred} entries deferred to next restart`, + ); + break; + } + opts.log.info(`Waiting ${backoff}ms before retrying delivery ${entry.id}`); + await delayFn(backoff); + } + + try { + await opts.deliver({ + cfg: opts.cfg, + channel: entry.channel, + to: entry.to, + accountId: entry.accountId, + payloads: entry.payloads, + threadId: entry.threadId, + replyToId: entry.replyToId, + bestEffort: entry.bestEffort, + gifPlayback: entry.gifPlayback, + silent: entry.silent, + mirror: entry.mirror, + skipQueue: true, // Prevent re-enqueueing during recovery + }); + await ackDelivery(entry.id, opts.stateDir); + recovered += 1; + opts.log.info(`Recovered delivery ${entry.id} to ${entry.channel}:${entry.to}`); + } catch (err) { + try { + await failDelivery( + entry.id, + err instanceof Error ? err.message : String(err), + opts.stateDir, + ); + } catch { + // Best-effort update. + } + failed += 1; + opts.log.warn( + `Retry failed for delivery ${entry.id}: ${err instanceof Error ? 
err.message : String(err)}`, + ); + } + } + + opts.log.info( + `Delivery recovery complete: ${recovered} recovered, ${failed} failed, ${skipped} skipped (max retries)`, + ); + return { recovered, failed, skipped }; +} + +export { MAX_RETRIES };