Co-authored-by: Shadow <hi@shadowing.dev>
This commit is contained in:
@@ -48,6 +48,7 @@ Docs: https://docs.openclaw.ai
|
||||
- macOS Voice Wake: fix a crash in trigger trimming for CJK/Unicode transcripts by matching and slicing on original-string ranges instead of transformed-string indices. (#11052) Thanks @Flash-LHR.
|
||||
- Heartbeat: prevent scheduler silent-death races during runner reloads, preserve retry cooldown backoff under wake bursts, and prioritize user/action wake causes over interval/retry reasons when coalescing. (#15108) Thanks @joeykrug.
|
||||
- Outbound targets: fail closed for WhatsApp/Twitch/Google Chat fallback paths so invalid or missing targets are dropped instead of rerouted, and align resolver hints with strict target requirements. (#13578) Thanks @mcaxtr.
|
||||
- Outbound: add a write-ahead delivery queue with crash-recovery retries to prevent lost outbound messages after gateway restarts. (#15636) Thanks @nabbilkhan, @thewilloftheshadow.
|
||||
- Exec/Allowlist: allow multiline heredoc bodies (`<<`, `<<-`) while keeping multiline non-heredoc shell commands blocked, so exec approval parsing permits heredoc input safely without allowing general newline command chaining. (#13811) Thanks @mcaxtr.
|
||||
- Docs/Mermaid: remove hardcoded Mermaid init theme blocks from four docs diagrams so dark mode inherits readable theme defaults. (#15157) Thanks @heytulsiprasad.
|
||||
- Outbound/Threading: pass `replyTo` and `threadId` from `message send` tool actions through the core outbound send path to channel adapters, preserving thread/reply routing. (#14948) Thanks @mcaxtr.
|
||||
|
||||
@@ -470,6 +470,18 @@ export async function startGatewayServer(
|
||||
|
||||
void cron.start().catch((err) => logCron.error(`failed to start: ${String(err)}`));
|
||||
|
||||
// Recover pending outbound deliveries from previous crash/restart.
|
||||
void (async () => {
|
||||
const { recoverPendingDeliveries } = await import("../infra/outbound/delivery-queue.js");
|
||||
const { deliverOutboundPayloads } = await import("../infra/outbound/deliver.js");
|
||||
const logRecovery = log.child("delivery-recovery");
|
||||
await recoverPendingDeliveries({
|
||||
deliver: deliverOutboundPayloads,
|
||||
log: logRecovery,
|
||||
cfg: cfgAtStart,
|
||||
});
|
||||
})().catch((err) => log.error(`Delivery recovery failed: ${String(err)}`));
|
||||
|
||||
const execApprovalManager = new ExecApprovalManager();
|
||||
const execApprovalForwarder = createExecApprovalForwarder();
|
||||
const execApprovalHandlers = createExecApprovalHandlers(execApprovalManager, {
|
||||
|
||||
@@ -20,6 +20,11 @@ const hookMocks = vi.hoisted(() => ({
|
||||
runMessageSent: vi.fn(async () => {}),
|
||||
},
|
||||
}));
|
||||
const queueMocks = vi.hoisted(() => ({
|
||||
enqueueDelivery: vi.fn(async () => "mock-queue-id"),
|
||||
ackDelivery: vi.fn(async () => {}),
|
||||
failDelivery: vi.fn(async () => {}),
|
||||
}));
|
||||
|
||||
vi.mock("../../config/sessions.js", async () => {
|
||||
const actual = await vi.importActual<typeof import("../../config/sessions.js")>(
|
||||
@@ -33,6 +38,11 @@ vi.mock("../../config/sessions.js", async () => {
|
||||
vi.mock("../../plugins/hook-runner-global.js", () => ({
|
||||
getGlobalHookRunner: () => hookMocks.runner,
|
||||
}));
|
||||
vi.mock("./delivery-queue.js", () => ({
|
||||
enqueueDelivery: queueMocks.enqueueDelivery,
|
||||
ackDelivery: queueMocks.ackDelivery,
|
||||
failDelivery: queueMocks.failDelivery,
|
||||
}));
|
||||
|
||||
const { deliverOutboundPayloads, normalizeOutboundPayloads } = await import("./deliver.js");
|
||||
|
||||
@@ -43,6 +53,12 @@ describe("deliverOutboundPayloads", () => {
|
||||
hookMocks.runner.hasHooks.mockReturnValue(false);
|
||||
hookMocks.runner.runMessageSent.mockReset();
|
||||
hookMocks.runner.runMessageSent.mockResolvedValue(undefined);
|
||||
queueMocks.enqueueDelivery.mockReset();
|
||||
queueMocks.enqueueDelivery.mockResolvedValue("mock-queue-id");
|
||||
queueMocks.ackDelivery.mockReset();
|
||||
queueMocks.ackDelivery.mockResolvedValue(undefined);
|
||||
queueMocks.failDelivery.mockReset();
|
||||
queueMocks.failDelivery.mockResolvedValue(undefined);
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
@@ -389,6 +405,57 @@ describe("deliverOutboundPayloads", () => {
|
||||
expect(results).toEqual([{ channel: "whatsapp", messageId: "w2", toJid: "jid" }]);
|
||||
});
|
||||
|
||||
it("calls failDelivery instead of ackDelivery on bestEffort partial failure", async () => {
|
||||
const sendWhatsApp = vi
|
||||
.fn()
|
||||
.mockRejectedValueOnce(new Error("fail"))
|
||||
.mockResolvedValueOnce({ messageId: "w2", toJid: "jid" });
|
||||
const onError = vi.fn();
|
||||
const cfg: OpenClawConfig = {};
|
||||
|
||||
await deliverOutboundPayloads({
|
||||
cfg,
|
||||
channel: "whatsapp",
|
||||
to: "+1555",
|
||||
payloads: [{ text: "a" }, { text: "b" }],
|
||||
deps: { sendWhatsApp },
|
||||
bestEffort: true,
|
||||
onError,
|
||||
});
|
||||
|
||||
// onError was called for the first payload's failure.
|
||||
expect(onError).toHaveBeenCalledTimes(1);
|
||||
|
||||
// Queue entry should NOT be acked — failDelivery should be called instead.
|
||||
expect(queueMocks.ackDelivery).not.toHaveBeenCalled();
|
||||
expect(queueMocks.failDelivery).toHaveBeenCalledWith(
|
||||
"mock-queue-id",
|
||||
"partial delivery failure (bestEffort)",
|
||||
);
|
||||
});
|
||||
|
||||
it("acks the queue entry when delivery is aborted", async () => {
|
||||
const sendWhatsApp = vi.fn().mockResolvedValue({ messageId: "w1", toJid: "jid" });
|
||||
const abortController = new AbortController();
|
||||
abortController.abort();
|
||||
const cfg: OpenClawConfig = {};
|
||||
|
||||
await expect(
|
||||
deliverOutboundPayloads({
|
||||
cfg,
|
||||
channel: "whatsapp",
|
||||
to: "+1555",
|
||||
payloads: [{ text: "a" }],
|
||||
deps: { sendWhatsApp },
|
||||
abortSignal: abortController.signal,
|
||||
}),
|
||||
).rejects.toThrow("Operation aborted");
|
||||
|
||||
expect(queueMocks.ackDelivery).toHaveBeenCalledWith("mock-queue-id");
|
||||
expect(queueMocks.failDelivery).not.toHaveBeenCalled();
|
||||
expect(sendWhatsApp).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("passes normalized payload to onError", async () => {
|
||||
const sendWhatsApp = vi.fn().mockRejectedValue(new Error("boom"));
|
||||
const onError = vi.fn();
|
||||
|
||||
@@ -25,6 +25,7 @@ import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js";
|
||||
import { markdownToSignalTextChunks, type SignalTextStyleRange } from "../../signal/format.js";
|
||||
import { sendMessageSignal } from "../../signal/send.js";
|
||||
import { throwIfAborted } from "./abort.js";
|
||||
import { ackDelivery, enqueueDelivery, failDelivery } from "./delivery-queue.js";
|
||||
import { normalizeReplyPayloadsForDelivery } from "./payloads.js";
|
||||
|
||||
export type { NormalizedOutboundPayload } from "./payloads.js";
|
||||
@@ -178,6 +179,8 @@ function createPluginHandler(params: {
|
||||
};
|
||||
}
|
||||
|
||||
const isAbortError = (err: unknown): boolean => err instanceof Error && err.name === "AbortError";
|
||||
|
||||
export async function deliverOutboundPayloads(params: {
|
||||
cfg: OpenClawConfig;
|
||||
channel: Exclude<OutboundChannel, "none">;
|
||||
@@ -199,6 +202,88 @@ export async function deliverOutboundPayloads(params: {
|
||||
mediaUrls?: string[];
|
||||
};
|
||||
silent?: boolean;
|
||||
/** @internal Skip write-ahead queue (used by crash-recovery to avoid re-enqueueing). */
|
||||
skipQueue?: boolean;
|
||||
}): Promise<OutboundDeliveryResult[]> {
|
||||
const { channel, to, payloads } = params;
|
||||
|
||||
// Write-ahead delivery queue: persist before sending, remove after success.
|
||||
const queueId = params.skipQueue
|
||||
? null
|
||||
: await enqueueDelivery({
|
||||
channel,
|
||||
to,
|
||||
accountId: params.accountId,
|
||||
payloads,
|
||||
threadId: params.threadId,
|
||||
replyToId: params.replyToId,
|
||||
bestEffort: params.bestEffort,
|
||||
gifPlayback: params.gifPlayback,
|
||||
silent: params.silent,
|
||||
mirror: params.mirror,
|
||||
}).catch(() => null); // Best-effort — don't block delivery if queue write fails.
|
||||
|
||||
// Wrap onError to detect partial failures under bestEffort mode.
|
||||
// When bestEffort is true, per-payload errors are caught and passed to onError
|
||||
// without throwing — so the outer try/catch never fires. We track whether any
|
||||
// payload failed so we can call failDelivery instead of ackDelivery.
|
||||
let hadPartialFailure = false;
|
||||
const wrappedParams = params.onError
|
||||
? {
|
||||
...params,
|
||||
onError: (err: unknown, payload: NormalizedOutboundPayload) => {
|
||||
hadPartialFailure = true;
|
||||
params.onError!(err, payload);
|
||||
},
|
||||
}
|
||||
: params;
|
||||
|
||||
try {
|
||||
const results = await deliverOutboundPayloadsCore(wrappedParams);
|
||||
if (queueId) {
|
||||
if (hadPartialFailure) {
|
||||
await failDelivery(queueId, "partial delivery failure (bestEffort)").catch(() => {});
|
||||
} else {
|
||||
await ackDelivery(queueId).catch(() => {}); // Best-effort cleanup.
|
||||
}
|
||||
}
|
||||
return results;
|
||||
} catch (err) {
|
||||
if (queueId) {
|
||||
if (isAbortError(err)) {
|
||||
await ackDelivery(queueId).catch(() => {});
|
||||
} else {
|
||||
await failDelivery(queueId, err instanceof Error ? err.message : String(err)).catch(
|
||||
() => {},
|
||||
);
|
||||
}
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
/** Core delivery logic (extracted for queue wrapper). */
|
||||
async function deliverOutboundPayloadsCore(params: {
|
||||
cfg: OpenClawConfig;
|
||||
channel: Exclude<OutboundChannel, "none">;
|
||||
to: string;
|
||||
accountId?: string;
|
||||
payloads: ReplyPayload[];
|
||||
replyToId?: string | null;
|
||||
threadId?: string | number | null;
|
||||
deps?: OutboundSendDeps;
|
||||
gifPlayback?: boolean;
|
||||
abortSignal?: AbortSignal;
|
||||
bestEffort?: boolean;
|
||||
onError?: (err: unknown, payload: NormalizedOutboundPayload) => void;
|
||||
onPayload?: (payload: NormalizedOutboundPayload) => void;
|
||||
mirror?: {
|
||||
sessionKey: string;
|
||||
agentId?: string;
|
||||
text?: string;
|
||||
mediaUrls?: string[];
|
||||
};
|
||||
silent?: boolean;
|
||||
}): Promise<OutboundDeliveryResult[]> {
|
||||
const { cfg, channel, to, payloads } = params;
|
||||
const accountId = params.accountId;
|
||||
|
||||
373
src/infra/outbound/delivery-queue.test.ts
Normal file
373
src/infra/outbound/delivery-queue.test.ts
Normal file
@@ -0,0 +1,373 @@
|
||||
import fs from "node:fs";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import {
|
||||
ackDelivery,
|
||||
computeBackoffMs,
|
||||
enqueueDelivery,
|
||||
failDelivery,
|
||||
loadPendingDeliveries,
|
||||
MAX_RETRIES,
|
||||
moveToFailed,
|
||||
recoverPendingDeliveries,
|
||||
} from "./delivery-queue.js";
|
||||
|
||||
// Each test runs against a fresh temp state dir so queue files never leak
// between tests (the queue module takes an explicit stateDir override).
let tmpDir: string;

beforeEach(() => {
  tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-dq-test-"));
});

afterEach(() => {
  // force: true — don't fail teardown if a test already removed the dir.
  fs.rmSync(tmpDir, { recursive: true, force: true });
});
|
||||
|
||||
// Happy-path persistence: an enqueued entry lands on disk as <id>.json with
// all options preserved, and ack removes it (idempotently).
describe("enqueue + ack lifecycle", () => {
  it("creates and removes a queue entry", async () => {
    const id = await enqueueDelivery(
      {
        channel: "whatsapp",
        to: "+1555",
        payloads: [{ text: "hello" }],
        bestEffort: true,
        gifPlayback: true,
        silent: true,
        mirror: {
          sessionKey: "agent:main:main",
          text: "hello",
          mediaUrls: ["https://example.com/file.png"],
        },
      },
      tmpDir,
    );

    // Entry file exists after enqueue.
    const queueDir = path.join(tmpDir, "delivery-queue");
    const files = fs.readdirSync(queueDir).filter((f) => f.endsWith(".json"));
    expect(files).toHaveLength(1);
    expect(files[0]).toBe(`${id}.json`);

    // Entry contents are correct.
    const entry = JSON.parse(fs.readFileSync(path.join(queueDir, files[0]), "utf-8"));
    expect(entry).toMatchObject({
      id,
      channel: "whatsapp",
      to: "+1555",
      bestEffort: true,
      gifPlayback: true,
      silent: true,
      mirror: {
        sessionKey: "agent:main:main",
        text: "hello",
        mediaUrls: ["https://example.com/file.png"],
      },
      retryCount: 0,
    });
    expect(entry.payloads).toEqual([{ text: "hello" }]);

    // Ack removes the file.
    await ackDelivery(id, tmpDir);
    const remaining = fs.readdirSync(queueDir).filter((f) => f.endsWith(".json"));
    expect(remaining).toHaveLength(0);
  });

  it("ack is idempotent (no error on missing file)", async () => {
    await expect(ackDelivery("nonexistent-id", tmpDir)).resolves.toBeUndefined();
  });
});
|
||||
|
||||
// failDelivery rewrites the entry in place, bumping the retry counter and
// recording the most recent error message.
describe("failDelivery", () => {
  it("increments retryCount and sets lastError", async () => {
    const id = await enqueueDelivery(
      {
        channel: "telegram",
        to: "123",
        payloads: [{ text: "test" }],
      },
      tmpDir,
    );

    await failDelivery(id, "connection refused", tmpDir);

    const queueDir = path.join(tmpDir, "delivery-queue");
    const entry = JSON.parse(fs.readFileSync(path.join(queueDir, `${id}.json`), "utf-8"));
    expect(entry.retryCount).toBe(1);
    expect(entry.lastError).toBe("connection refused");
  });
});
|
||||
|
||||
// moveToFailed relocates an entry out of the active queue into failed/,
// taking it permanently out of the retry loop.
describe("moveToFailed", () => {
  it("moves entry to failed/ subdirectory", async () => {
    const id = await enqueueDelivery(
      {
        channel: "slack",
        to: "#general",
        payloads: [{ text: "hi" }],
      },
      tmpDir,
    );

    await moveToFailed(id, tmpDir);

    const queueDir = path.join(tmpDir, "delivery-queue");
    const failedDir = path.join(queueDir, "failed");
    expect(fs.existsSync(path.join(queueDir, `${id}.json`))).toBe(false);
    expect(fs.existsSync(path.join(failedDir, `${id}.json`))).toBe(true);
  });
});
|
||||
|
||||
// loadPendingDeliveries tolerates a missing queue dir (fresh install) and
// returns every *.json entry otherwise.
describe("loadPendingDeliveries", () => {
  it("returns empty array when queue directory does not exist", async () => {
    const nonexistent = path.join(tmpDir, "no-such-dir");
    const entries = await loadPendingDeliveries(nonexistent);
    expect(entries).toEqual([]);
  });

  it("loads multiple entries", async () => {
    await enqueueDelivery({ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, tmpDir);
    await enqueueDelivery({ channel: "telegram", to: "2", payloads: [{ text: "b" }] }, tmpDir);

    const entries = await loadPendingDeliveries(tmpDir);
    expect(entries).toHaveLength(2);
  });
});
|
||||
|
||||
// Pins the exact backoff schedule (5s / 25s / 2m / 10m) and the clamping
// behavior past the last slot.
describe("computeBackoffMs", () => {
  it("returns 0 for retryCount 0", () => {
    expect(computeBackoffMs(0)).toBe(0);
  });

  it("returns correct backoff for each retry", () => {
    expect(computeBackoffMs(1)).toBe(5_000);
    expect(computeBackoffMs(2)).toBe(25_000);
    expect(computeBackoffMs(3)).toBe(120_000);
    expect(computeBackoffMs(4)).toBe(600_000);
    // Beyond defined schedule — clamps to last value.
    expect(computeBackoffMs(5)).toBe(600_000);
  });
});
|
||||
|
||||
// End-to-end recovery behavior: replay pending entries oldest-first, honor
// the time budget, retire exhausted entries, and never re-enqueue.
describe("recoverPendingDeliveries", () => {
  // Deterministic, instant "sleep" so backoff waits don't slow the suite.
  const noopDelay = async () => {};
  const baseCfg = {};

  it("recovers entries from a simulated crash", async () => {
    // Manually create two queue entries as if gateway crashed before delivery.
    await enqueueDelivery({ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, tmpDir);
    await enqueueDelivery({ channel: "telegram", to: "2", payloads: [{ text: "b" }] }, tmpDir);

    const deliver = vi.fn().mockResolvedValue([]);
    const log = { info: vi.fn(), warn: vi.fn(), error: vi.fn() };

    const result = await recoverPendingDeliveries({
      deliver,
      log,
      cfg: baseCfg,
      stateDir: tmpDir,
      delay: noopDelay,
    });

    expect(deliver).toHaveBeenCalledTimes(2);
    expect(result.recovered).toBe(2);
    expect(result.failed).toBe(0);
    expect(result.skipped).toBe(0);

    // Queue should be empty after recovery.
    const remaining = await loadPendingDeliveries(tmpDir);
    expect(remaining).toHaveLength(0);
  });

  it("moves entries that exceeded max retries to failed/", async () => {
    // Create an entry and manually set retryCount to MAX_RETRIES.
    const id = await enqueueDelivery(
      { channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] },
      tmpDir,
    );
    const filePath = path.join(tmpDir, "delivery-queue", `${id}.json`);
    const entry = JSON.parse(fs.readFileSync(filePath, "utf-8"));
    entry.retryCount = MAX_RETRIES;
    fs.writeFileSync(filePath, JSON.stringify(entry), "utf-8");

    const deliver = vi.fn();
    const log = { info: vi.fn(), warn: vi.fn(), error: vi.fn() };

    const result = await recoverPendingDeliveries({
      deliver,
      log,
      cfg: baseCfg,
      stateDir: tmpDir,
      delay: noopDelay,
    });

    expect(deliver).not.toHaveBeenCalled();
    expect(result.skipped).toBe(1);

    // Entry should be in failed/ directory.
    const failedDir = path.join(tmpDir, "delivery-queue", "failed");
    expect(fs.existsSync(path.join(failedDir, `${id}.json`))).toBe(true);
  });

  it("increments retryCount on failed recovery attempt", async () => {
    await enqueueDelivery({ channel: "slack", to: "#ch", payloads: [{ text: "x" }] }, tmpDir);

    const deliver = vi.fn().mockRejectedValue(new Error("network down"));
    const log = { info: vi.fn(), warn: vi.fn(), error: vi.fn() };

    const result = await recoverPendingDeliveries({
      deliver,
      log,
      cfg: baseCfg,
      stateDir: tmpDir,
      delay: noopDelay,
    });

    expect(result.failed).toBe(1);
    expect(result.recovered).toBe(0);

    // Entry should still be in queue with incremented retryCount.
    const entries = await loadPendingDeliveries(tmpDir);
    expect(entries).toHaveLength(1);
    expect(entries[0].retryCount).toBe(1);
    expect(entries[0].lastError).toBe("network down");
  });

  it("passes skipQueue: true to prevent re-enqueueing during recovery", async () => {
    await enqueueDelivery({ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, tmpDir);

    const deliver = vi.fn().mockResolvedValue([]);
    const log = { info: vi.fn(), warn: vi.fn(), error: vi.fn() };

    await recoverPendingDeliveries({
      deliver,
      log,
      cfg: baseCfg,
      stateDir: tmpDir,
      delay: noopDelay,
    });

    expect(deliver).toHaveBeenCalledWith(expect.objectContaining({ skipQueue: true }));
  });

  it("replays stored delivery options during recovery", async () => {
    await enqueueDelivery(
      {
        channel: "whatsapp",
        to: "+1",
        payloads: [{ text: "a" }],
        bestEffort: true,
        gifPlayback: true,
        silent: true,
        mirror: {
          sessionKey: "agent:main:main",
          text: "a",
          mediaUrls: ["https://example.com/a.png"],
        },
      },
      tmpDir,
    );

    const deliver = vi.fn().mockResolvedValue([]);
    const log = { info: vi.fn(), warn: vi.fn(), error: vi.fn() };

    await recoverPendingDeliveries({
      deliver,
      log,
      cfg: baseCfg,
      stateDir: tmpDir,
      delay: noopDelay,
    });

    expect(deliver).toHaveBeenCalledWith(
      expect.objectContaining({
        bestEffort: true,
        gifPlayback: true,
        silent: true,
        mirror: {
          sessionKey: "agent:main:main",
          text: "a",
          mediaUrls: ["https://example.com/a.png"],
        },
      }),
    );
  });

  it("respects maxRecoveryMs time budget", async () => {
    await enqueueDelivery({ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, tmpDir);
    await enqueueDelivery({ channel: "telegram", to: "2", payloads: [{ text: "b" }] }, tmpDir);
    await enqueueDelivery({ channel: "slack", to: "#c", payloads: [{ text: "c" }] }, tmpDir);

    const deliver = vi.fn().mockResolvedValue([]);
    const log = { info: vi.fn(), warn: vi.fn(), error: vi.fn() };

    const result = await recoverPendingDeliveries({
      deliver,
      log,
      cfg: baseCfg,
      stateDir: tmpDir,
      delay: noopDelay,
      maxRecoveryMs: 0, // Immediate timeout — no entries should be processed.
    });

    expect(deliver).not.toHaveBeenCalled();
    expect(result.recovered).toBe(0);
    expect(result.failed).toBe(0);
    expect(result.skipped).toBe(0);

    // All entries should still be in the queue.
    const remaining = await loadPendingDeliveries(tmpDir);
    expect(remaining).toHaveLength(3);

    // Should have logged a warning about deferred entries.
    expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("deferred to next restart"));
  });

  it("defers entries when backoff exceeds the recovery budget", async () => {
    const id = await enqueueDelivery(
      { channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] },
      tmpDir,
    );
    const filePath = path.join(tmpDir, "delivery-queue", `${id}.json`);
    const entry = JSON.parse(fs.readFileSync(filePath, "utf-8"));
    // retryCount 3 → next backoff is 10m, far beyond the 1s budget below.
    entry.retryCount = 3;
    fs.writeFileSync(filePath, JSON.stringify(entry), "utf-8");

    const deliver = vi.fn().mockResolvedValue([]);
    const delay = vi.fn(async () => {});
    const log = { info: vi.fn(), warn: vi.fn(), error: vi.fn() };

    const result = await recoverPendingDeliveries({
      deliver,
      log,
      cfg: baseCfg,
      stateDir: tmpDir,
      delay,
      maxRecoveryMs: 1000,
    });

    expect(deliver).not.toHaveBeenCalled();
    expect(delay).not.toHaveBeenCalled();
    expect(result).toEqual({ recovered: 0, failed: 0, skipped: 0 });

    const remaining = await loadPendingDeliveries(tmpDir);
    expect(remaining).toHaveLength(1);

    expect(log.warn).toHaveBeenCalledWith(expect.stringContaining("deferred to next restart"));
  });

  it("returns zeros when queue is empty", async () => {
    const deliver = vi.fn();
    const log = { info: vi.fn(), warn: vi.fn(), error: vi.fn() };

    const result = await recoverPendingDeliveries({
      deliver,
      log,
      cfg: baseCfg,
      stateDir: tmpDir,
      delay: noopDelay,
    });

    expect(result).toEqual({ recovered: 0, failed: 0, skipped: 0 });
    expect(deliver).not.toHaveBeenCalled();
  });
});
|
||||
328
src/infra/outbound/delivery-queue.ts
Normal file
328
src/infra/outbound/delivery-queue.ts
Normal file
@@ -0,0 +1,328 @@
|
||||
import crypto from "node:crypto";
|
||||
import fs from "node:fs";
|
||||
import path from "node:path";
|
||||
import type { ReplyPayload } from "../../auto-reply/types.js";
|
||||
import type { OpenClawConfig } from "../../config/config.js";
|
||||
import type { OutboundChannel } from "./targets.js";
|
||||
import { resolveStateDir } from "../../config/paths.js";
|
||||
|
||||
// Queue entries live as <id>.json files in <stateDir>/delivery-queue;
// terminally-failed entries are moved to the failed/ subdirectory.
const QUEUE_DIRNAME = "delivery-queue";
const FAILED_DIRNAME = "failed";
// After this many failed attempts an entry is retired to failed/ instead of retried.
const MAX_RETRIES = 5;

/** Backoff delays in milliseconds indexed by retry count (1-based). */
const BACKOFF_MS: readonly number[] = [
  5_000, // retry 1: 5s
  25_000, // retry 2: 25s
  120_000, // retry 3: 2m
  600_000, // retry 4: 10m
];

/** On-disk shape of one write-ahead delivery-queue entry. */
export interface QueuedDelivery {
  // Random UUID; also the basename of the entry file (<id>.json).
  id: string;
  // Epoch ms at enqueue time; recovery replays oldest entries first.
  enqueuedAt: number;
  channel: Exclude<OutboundChannel, "none">;
  to: string;
  accountId?: string;
  /**
   * Original payloads before plugin hooks. On recovery, hooks re-run on these
   * payloads — this is intentional since hooks are stateless transforms and
   * should produce the same result on replay.
   */
  payloads: ReplyPayload[];
  threadId?: string | number | null;
  replyToId?: string | null;
  bestEffort?: boolean;
  gifPlayback?: boolean;
  silent?: boolean;
  // Mirror-to-session options, replayed verbatim on recovery.
  mirror?: {
    sessionKey: string;
    agentId?: string;
    text?: string;
    mediaUrls?: string[];
  };
  // Number of failed attempts so far (0 on first enqueue).
  retryCount: number;
  // Message of the most recent failure, if any.
  lastError?: string;
}
|
||||
|
||||
function resolveQueueDir(stateDir?: string): string {
|
||||
const base = stateDir ?? resolveStateDir();
|
||||
return path.join(base, QUEUE_DIRNAME);
|
||||
}
|
||||
|
||||
function resolveFailedDir(stateDir?: string): string {
|
||||
return path.join(resolveQueueDir(stateDir), FAILED_DIRNAME);
|
||||
}
|
||||
|
||||
/** Ensure the queue directory (and failed/ subdirectory) exist. */
|
||||
export async function ensureQueueDir(stateDir?: string): Promise<string> {
|
||||
const queueDir = resolveQueueDir(stateDir);
|
||||
await fs.promises.mkdir(queueDir, { recursive: true, mode: 0o700 });
|
||||
await fs.promises.mkdir(resolveFailedDir(stateDir), { recursive: true, mode: 0o700 });
|
||||
return queueDir;
|
||||
}
|
||||
|
||||
/** Persist a delivery entry to disk before attempting send. Returns the entry ID. */
|
||||
export async function enqueueDelivery(
|
||||
params: {
|
||||
channel: Exclude<OutboundChannel, "none">;
|
||||
to: string;
|
||||
accountId?: string;
|
||||
payloads: ReplyPayload[];
|
||||
threadId?: string | number | null;
|
||||
replyToId?: string | null;
|
||||
bestEffort?: boolean;
|
||||
gifPlayback?: boolean;
|
||||
silent?: boolean;
|
||||
mirror?: {
|
||||
sessionKey: string;
|
||||
agentId?: string;
|
||||
text?: string;
|
||||
mediaUrls?: string[];
|
||||
};
|
||||
},
|
||||
stateDir?: string,
|
||||
): Promise<string> {
|
||||
const queueDir = await ensureQueueDir(stateDir);
|
||||
const id = crypto.randomUUID();
|
||||
const entry: QueuedDelivery = {
|
||||
id,
|
||||
enqueuedAt: Date.now(),
|
||||
channel: params.channel,
|
||||
to: params.to,
|
||||
accountId: params.accountId,
|
||||
payloads: params.payloads,
|
||||
threadId: params.threadId,
|
||||
replyToId: params.replyToId,
|
||||
bestEffort: params.bestEffort,
|
||||
gifPlayback: params.gifPlayback,
|
||||
silent: params.silent,
|
||||
mirror: params.mirror,
|
||||
retryCount: 0,
|
||||
};
|
||||
const filePath = path.join(queueDir, `${id}.json`);
|
||||
const tmp = `${filePath}.${process.pid}.tmp`;
|
||||
const json = JSON.stringify(entry, null, 2);
|
||||
await fs.promises.writeFile(tmp, json, { encoding: "utf-8", mode: 0o600 });
|
||||
await fs.promises.rename(tmp, filePath);
|
||||
return id;
|
||||
}
|
||||
|
||||
/** Remove a successfully delivered entry from the queue. */
|
||||
export async function ackDelivery(id: string, stateDir?: string): Promise<void> {
|
||||
const filePath = path.join(resolveQueueDir(stateDir), `${id}.json`);
|
||||
try {
|
||||
await fs.promises.unlink(filePath);
|
||||
} catch (err) {
|
||||
const code =
|
||||
err && typeof err === "object" && "code" in err
|
||||
? String((err as { code?: unknown }).code)
|
||||
: null;
|
||||
if (code !== "ENOENT") {
|
||||
throw err;
|
||||
}
|
||||
// Already removed — no-op.
|
||||
}
|
||||
}
|
||||
|
||||
/** Update a queue entry after a failed delivery attempt. */
|
||||
export async function failDelivery(id: string, error: string, stateDir?: string): Promise<void> {
|
||||
const filePath = path.join(resolveQueueDir(stateDir), `${id}.json`);
|
||||
const raw = await fs.promises.readFile(filePath, "utf-8");
|
||||
const entry: QueuedDelivery = JSON.parse(raw);
|
||||
entry.retryCount += 1;
|
||||
entry.lastError = error;
|
||||
const tmp = `${filePath}.${process.pid}.tmp`;
|
||||
await fs.promises.writeFile(tmp, JSON.stringify(entry, null, 2), {
|
||||
encoding: "utf-8",
|
||||
mode: 0o600,
|
||||
});
|
||||
await fs.promises.rename(tmp, filePath);
|
||||
}
|
||||
|
||||
/** Load all pending delivery entries from the queue directory. */
|
||||
export async function loadPendingDeliveries(stateDir?: string): Promise<QueuedDelivery[]> {
|
||||
const queueDir = resolveQueueDir(stateDir);
|
||||
let files: string[];
|
||||
try {
|
||||
files = await fs.promises.readdir(queueDir);
|
||||
} catch (err) {
|
||||
const code =
|
||||
err && typeof err === "object" && "code" in err
|
||||
? String((err as { code?: unknown }).code)
|
||||
: null;
|
||||
if (code === "ENOENT") {
|
||||
return [];
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
const entries: QueuedDelivery[] = [];
|
||||
for (const file of files) {
|
||||
if (!file.endsWith(".json")) {
|
||||
continue;
|
||||
}
|
||||
const filePath = path.join(queueDir, file);
|
||||
try {
|
||||
const stat = await fs.promises.stat(filePath);
|
||||
if (!stat.isFile()) {
|
||||
continue;
|
||||
}
|
||||
const raw = await fs.promises.readFile(filePath, "utf-8");
|
||||
entries.push(JSON.parse(raw));
|
||||
} catch {
|
||||
// Skip malformed or inaccessible entries.
|
||||
}
|
||||
}
|
||||
return entries;
|
||||
}
|
||||
|
||||
/** Move a queue entry to the failed/ subdirectory. */
|
||||
export async function moveToFailed(id: string, stateDir?: string): Promise<void> {
|
||||
const queueDir = resolveQueueDir(stateDir);
|
||||
const failedDir = resolveFailedDir(stateDir);
|
||||
await fs.promises.mkdir(failedDir, { recursive: true, mode: 0o700 });
|
||||
const src = path.join(queueDir, `${id}.json`);
|
||||
const dest = path.join(failedDir, `${id}.json`);
|
||||
await fs.promises.rename(src, dest);
|
||||
}
|
||||
|
||||
/** Compute the backoff delay in ms for a given retry count. */
|
||||
export function computeBackoffMs(retryCount: number): number {
|
||||
if (retryCount <= 0) {
|
||||
return 0;
|
||||
}
|
||||
return BACKOFF_MS[Math.min(retryCount - 1, BACKOFF_MS.length - 1)] ?? BACKOFF_MS.at(-1) ?? 0;
|
||||
}
|
||||
|
||||
export type DeliverFn = (params: {
|
||||
cfg: OpenClawConfig;
|
||||
channel: Exclude<OutboundChannel, "none">;
|
||||
to: string;
|
||||
accountId?: string;
|
||||
payloads: ReplyPayload[];
|
||||
threadId?: string | number | null;
|
||||
replyToId?: string | null;
|
||||
bestEffort?: boolean;
|
||||
gifPlayback?: boolean;
|
||||
silent?: boolean;
|
||||
mirror?: {
|
||||
sessionKey: string;
|
||||
agentId?: string;
|
||||
text?: string;
|
||||
mediaUrls?: string[];
|
||||
};
|
||||
skipQueue?: boolean;
|
||||
}) => Promise<unknown>;
|
||||
|
||||
export interface RecoveryLogger {
|
||||
info(msg: string): void;
|
||||
warn(msg: string): void;
|
||||
error(msg: string): void;
|
||||
}
|
||||
|
||||
/**
 * On gateway startup, scan the delivery queue and retry any pending entries.
 * Uses exponential backoff and moves entries that exceed MAX_RETRIES to failed/.
 *
 * Entries are replayed oldest-first within a wall-clock budget; anything that
 * doesn't fit (or whose backoff would overshoot the budget) stays on disk for
 * the next restart. Returns counts of recovered / failed / skipped entries.
 */
export async function recoverPendingDeliveries(opts: {
  deliver: DeliverFn;
  log: RecoveryLogger;
  cfg: OpenClawConfig;
  stateDir?: string;
  /** Override for testing — resolves instead of using real setTimeout. */
  delay?: (ms: number) => Promise<void>;
  /** Maximum wall-clock time for recovery in ms. Remaining entries are deferred to next restart. Default: 60 000. */
  maxRecoveryMs?: number;
}): Promise<{ recovered: number; failed: number; skipped: number }> {
  const pending = await loadPendingDeliveries(opts.stateDir);
  if (pending.length === 0) {
    return { recovered: 0, failed: 0, skipped: 0 };
  }

  // Process oldest first.
  pending.sort((a, b) => a.enqueuedAt - b.enqueuedAt);

  opts.log.info(`Found ${pending.length} pending delivery entries — starting recovery`);

  const delayFn = opts.delay ?? ((ms: number) => new Promise<void>((r) => setTimeout(r, ms)));
  const deadline = Date.now() + (opts.maxRecoveryMs ?? 60_000);

  let recovered = 0;
  let failed = 0;
  let skipped = 0;

  for (const entry of pending) {
    const now = Date.now();
    // Budget check up front — with maxRecoveryMs: 0 nothing is processed.
    if (now >= deadline) {
      const deferred = pending.length - recovered - failed - skipped;
      opts.log.warn(`Recovery time budget exceeded — ${deferred} entries deferred to next restart`);
      break;
    }
    // Exhausted entries are retired to failed/ without another attempt.
    if (entry.retryCount >= MAX_RETRIES) {
      opts.log.warn(
        `Delivery ${entry.id} exceeded max retries (${entry.retryCount}/${MAX_RETRIES}) — moving to failed/`,
      );
      try {
        await moveToFailed(entry.id, opts.stateDir);
      } catch (err) {
        opts.log.error(`Failed to move entry ${entry.id} to failed/: ${String(err)}`);
      }
      skipped += 1;
      continue;
    }

    // Backoff for the attempt we're about to make (retryCount + 1).
    // NOTE(review): a fresh entry (retryCount 0) therefore waits the first
    // backoff slot (5s) before its first recovery attempt — confirm intended.
    const backoff = computeBackoffMs(entry.retryCount + 1);
    if (backoff > 0) {
      // If waiting would blow the budget, stop entirely; entries are sorted
      // oldest-first, so later entries would wait at least as long.
      if (now + backoff >= deadline) {
        const deferred = pending.length - recovered - failed - skipped;
        opts.log.warn(
          `Recovery time budget exceeded — ${deferred} entries deferred to next restart`,
        );
        break;
      }
      opts.log.info(`Waiting ${backoff}ms before retrying delivery ${entry.id}`);
      await delayFn(backoff);
    }

    try {
      // Replay every stored option so the retried send matches the original.
      await opts.deliver({
        cfg: opts.cfg,
        channel: entry.channel,
        to: entry.to,
        accountId: entry.accountId,
        payloads: entry.payloads,
        threadId: entry.threadId,
        replyToId: entry.replyToId,
        bestEffort: entry.bestEffort,
        gifPlayback: entry.gifPlayback,
        silent: entry.silent,
        mirror: entry.mirror,
        skipQueue: true, // Prevent re-enqueueing during recovery
      });
      await ackDelivery(entry.id, opts.stateDir);
      recovered += 1;
      opts.log.info(`Recovered delivery ${entry.id} to ${entry.channel}:${entry.to}`);
    } catch (err) {
      // Keep the entry on disk with an updated retry count for next restart.
      try {
        await failDelivery(
          entry.id,
          err instanceof Error ? err.message : String(err),
          opts.stateDir,
        );
      } catch {
        // Best-effort update.
      }
      failed += 1;
      opts.log.warn(
        `Retry failed for delivery ${entry.id}: ${err instanceof Error ? err.message : String(err)}`,
      );
    }
  }

  opts.log.info(
    `Delivery recovery complete: ${recovered} recovered, ${failed} failed, ${skipped} skipped (max retries)`,
  );
  return { recovered, failed, skipped };
}
|
||||
|
||||
export { MAX_RETRIES };
|
||||
Reference in New Issue
Block a user