diff --git a/src/auto-reply/reply.block-streaming.test.ts b/src/auto-reply/reply.block-streaming.test.ts index e051944dc..d982280ab 100644 --- a/src/auto-reply/reply.block-streaming.test.ts +++ b/src/auto-reply/reply.block-streaming.test.ts @@ -164,51 +164,6 @@ describe("block streaming", () => { }); }); - it("drops final payloads when block replies streamed", async () => { - await withTempHome(async (home) => { - const onBlockReply = vi.fn().mockResolvedValue(undefined); - - const impl = async (params: RunEmbeddedPiAgentParams) => { - void params.onBlockReply?.({ text: "chunk-1" }); - return { - payloads: [{ text: "chunk-1\nchunk-2" }], - meta: { - durationMs: 5, - agentMeta: { sessionId: "s", provider: "p", model: "m" }, - }, - }; - }; - piEmbeddedMock.runEmbeddedPiAgent.mockImplementation(impl); - - const res = await getReplyFromConfig( - { - Body: "ping", - From: "+1004", - To: "+2000", - MessageSid: "msg-124", - Provider: "discord", - }, - { - onBlockReply, - disableBlockStreaming: false, - }, - { - agents: { - defaults: { - model: "anthropic/claude-opus-4-5", - workspace: path.join(home, "openclaw"), - }, - }, - channels: { whatsapp: { allowFrom: ["*"] } }, - session: { store: path.join(home, "sessions.json") }, - }, - ); - - expect(res).toBeUndefined(); - expect(onBlockReply).toHaveBeenCalledTimes(1); - }); - }); - it("falls back to final payloads when block reply send times out", async () => { await withTempHome(async (home) => { let sawAbort = false; diff --git a/src/auto-reply/reply.raw-body.test.ts b/src/auto-reply/reply.raw-body.test.ts index e66b174e0..0b19df8a1 100644 --- a/src/auto-reply/reply.raw-body.test.ts +++ b/src/auto-reply/reply.raw-body.test.ts @@ -161,36 +161,10 @@ describe("RawBody directive parsing", () => { }, expectedIncludes: ["Verbose logging enabled."], }); - - await assertCommandReply({ - message: { - Body: `[Chat messages since your last reply - for context]\\n[WhatsApp ...] Someone: hello\\n\\n[Current message - respond to this]\\n[WhatsApp ...] Jake: /status\\n[from: Jake McInteer (+6421807830)]`, - RawBody: "/status", - ChatType: "group", - From: "+1222", - To: "+1222", - SessionKey: "agent:main:whatsapp:group:g1", - Provider: "whatsapp", - Surface: "whatsapp", - SenderE164: "+1222", - CommandAuthorized: true, - }, - config: { - agents: { - defaults: { - model: "anthropic/claude-opus-4-5", - workspace: path.join(home, "openclaw-3"), - }, - }, - channels: { whatsapp: { allowFrom: ["+1222"] } }, - session: { store: path.join(home, "sessions-3.json") }, - }, - expectedIncludes: ["Session: agent:main:whatsapp:group:g1", "anthropic/claude-opus-4-5"], - }); }); }); - it("preserves history when RawBody is provided for command parsing", async () => { + it("preserves history and reuses non-default agent session files", async () => { await withTempHome(async (home) => { vi.mocked(runEmbeddedPiAgent).mockResolvedValue({ payloads: [{ text: "ok" }], @@ -238,11 +212,6 @@ describe("RawBody directive parsing", () => { expect(prompt).toContain('"body": "hello"'); expect(prompt).toContain("status please"); expect(prompt).not.toContain("/think:high"); - }); - }); - - it("reuses non-default agent session files without throwing path validation errors", async () => { - await withTempHome(async (home) => { const agentId = "worker1"; const sessionId = "sess-worker-1"; const sessionKey = `agent:${agentId}:telegram:12345`; @@ -259,6 +228,7 @@ describe("RawBody directive parsing", () => { }, }); + vi.mocked(runEmbeddedPiAgent).mockReset(); vi.mocked(runEmbeddedPiAgent).mockResolvedValue({ payloads: [{ text: "ok" }], meta: { @@ -267,7 +237,7 @@ describe("RawBody directive parsing", () => { }, }); - const res = await getReplyFromConfig( + const resWorker = await getReplyFromConfig( { Body: "hello", From: "telegram:12345", @@ -288,8 +258,8 @@ describe("RawBody directive parsing", () => { }, ); - const text = Array.isArray(res) ? res[0]?.text : res?.text; - expect(text).toBe("ok"); + const textWorker = Array.isArray(resWorker) ? resWorker[0]?.text : resWorker?.text; + expect(textWorker).toBe("ok"); expect(runEmbeddedPiAgent).toHaveBeenCalledOnce(); expect(vi.mocked(runEmbeddedPiAgent).mock.calls[0]?.[0]?.sessionFile).toBe(sessionFile); }); diff --git a/src/gateway/server-reload.config-during-reply.test.ts b/src/gateway/server-reload.config-during-reply.test.ts index 2ae95be55..326e9de75 100644 --- a/src/gateway/server-reload.config-during-reply.test.ts +++ b/src/gateway/server-reload.config-during-reply.test.ts @@ -36,7 +36,7 @@ describe("gateway config reload during reply", () => { const dispatcher = createReplyDispatcher({ deliver: async (payload) => { // Simulate async reply delivery - await new Promise((resolve) => setTimeout(resolve, 100)); + await new Promise((resolve) => setTimeout(resolve, 20)); deliveredReplies.push(payload.text ?? ""); }, onError: (err) => { @@ -103,49 +103,4 @@ describe("gateway config reload during reply", () => { expect(deliverCalled).toBe(false); expect(getTotalPendingReplies()).toBe(0); }); - - it("should integrate dispatcher reservation with concurrent dispatchers", async () => { - const { createReplyDispatcher } = await import("../auto-reply/reply/reply-dispatcher.js"); - const { getTotalQueueSize } = await import("../process/command-queue.js"); - - const deliveredReplies: string[] = []; - const dispatcher = createReplyDispatcher({ - deliver: async (payload) => { - await new Promise((resolve) => setTimeout(resolve, 50)); - deliveredReplies.push(payload.text ?? ""); - }, - }); - - // Dispatcher has reservation (pending=1) - expect(getTotalPendingReplies()).toBe(1); - - // Total active = queue + pending - const totalActive = getTotalQueueSize() + getTotalPendingReplies(); - expect(totalActive).toBe(1); // 0 queue + 1 pending - - // Command finishes, replies enqueued - dispatcher.sendFinalReply({ text: "Reply 1" }); - dispatcher.sendFinalReply({ text: "Reply 2" }); - - // Now: pending=3 (reservation + 2 replies) - expect(getTotalPendingReplies()).toBe(3); - - // Mark complete (flags reservation for cleanup on last delivery) - dispatcher.markComplete(); - - // Reservation still counted until delivery .finally() clears it, - // but the important invariant is pending > 0 while deliveries are in flight. - expect(getTotalPendingReplies()).toBeGreaterThan(0); - - // Wait for replies - await dispatcher.waitForIdle(); - - // Replies sent, pending=0 - expect(getTotalPendingReplies()).toBe(0); - expect(deliveredReplies).toEqual(["Reply 1", "Reply 2"]); - - // Now everything is idle - expect(getTotalPendingReplies()).toBe(0); - expect(getTotalQueueSize()).toBe(0); - }); }); diff --git a/src/gateway/server-reload.integration.test.ts b/src/gateway/server-reload.integration.test.ts index d2ab045fa..3bd1bc80e 100644 --- a/src/gateway/server-reload.integration.test.ts +++ b/src/gateway/server-reload.integration.test.ts @@ -31,7 +31,7 @@ describe("gateway restart deferral integration", () => { const dispatcher = createReplyDispatcher({ deliver: async (payload) => { // Simulate network delay - await new Promise((resolve) => setTimeout(resolve, 100)); + await new Promise((resolve) => setTimeout(resolve, 20)); deliveredReplies.push({ text: payload.text ?? "", timestamp: Date.now(), @@ -116,84 +116,4 @@ describe("gateway restart deferral integration", () => { "restart-can-proceed", ]); }); - - it("should handle concurrent dispatchers with config changes", async () => { - const { createReplyDispatcher } = await import("../auto-reply/reply/reply-dispatcher.js"); - const { getTotalPendingReplies } = await import("../auto-reply/reply/dispatcher-registry.js"); - - // Simulate two messages being processed concurrently - const deliveredReplies: string[] = []; - - // Message 1 — dispatcher created - const dispatcher1 = createReplyDispatcher({ - deliver: async (payload) => { - await new Promise((resolve) => setTimeout(resolve, 50)); - deliveredReplies.push(`msg1: ${payload.text}`); - }, - }); - - // Message 2 — dispatcher created - const dispatcher2 = createReplyDispatcher({ - deliver: async (payload) => { - await new Promise((resolve) => setTimeout(resolve, 50)); - deliveredReplies.push(`msg2: ${payload.text}`); - }, - }); - - // Both dispatchers have reservations - expect(getTotalPendingReplies()).toBe(2); - - // Config change detected - should defer - const totalActive = getTotalPendingReplies(); - expect(totalActive).toBe(2); // 2 dispatcher reservations - - // Messages process and send replies - dispatcher1.sendFinalReply({ text: "Reply from message 1" }); - dispatcher1.markComplete(); - - dispatcher2.sendFinalReply({ text: "Reply from message 2" }); - dispatcher2.markComplete(); - - // Wait for both - await Promise.all([dispatcher1.waitForIdle(), dispatcher2.waitForIdle()]); - - // All idle - expect(getTotalPendingReplies()).toBe(0); - - // Replies delivered - expect(deliveredReplies).toHaveLength(2); - }); - - it("should handle rapid config changes without losing replies", async () => { - const { createReplyDispatcher } = await import("../auto-reply/reply/reply-dispatcher.js"); - const { getTotalPendingReplies } = await import("../auto-reply/reply/dispatcher-registry.js"); - - const deliveredReplies: string[] = []; - - // Message received — dispatcher created - const dispatcher = createReplyDispatcher({ - deliver: async (payload) => { - await new Promise((resolve) => setTimeout(resolve, 200)); // Slow network - deliveredReplies.push(payload.text ?? ""); - }, - }); - - // Config change 1, 2, 3 (rapid changes) - // All should be deferred because dispatcher has pending replies - - // Send replies - dispatcher.sendFinalReply({ text: "Processing..." }); - dispatcher.sendFinalReply({ text: "Almost done..." }); - dispatcher.sendFinalReply({ text: "Complete!" }); - dispatcher.markComplete(); - - // Wait for all replies - await dispatcher.waitForIdle(); - - // All replies should be delivered - expect(deliveredReplies).toEqual(["Processing...", "Almost done...", "Complete!"]); - - // Now restart can proceed - expect(getTotalPendingReplies()).toBe(0); - }); }); diff --git a/src/gateway/server-reload.real-scenario.test.ts b/src/gateway/server-reload.real-scenario.test.ts index c3da2723f..19ece2234 100644 --- a/src/gateway/server-reload.real-scenario.test.ts +++ b/src/gateway/server-reload.real-scenario.test.ts @@ -36,7 +36,7 @@ describe("real scenario: config change during message processing", () => { throw new Error(error); } // Slow delivery — restart checks will run during this window - await new Promise((resolve) => setTimeout(resolve, 500)); + await new Promise((resolve) => setTimeout(resolve, 150)); deliveredReplies.push(payload.text ?? ""); }, onError: () => { @@ -59,7 +59,7 @@ describe("real scenario: config change during message processing", () => { // If the tracking is broken, pending would be 0 and we'd restart. let restartTriggered = false; for (let i = 0; i < 3; i++) { - await new Promise((resolve) => setTimeout(resolve, 100)); + await new Promise((resolve) => setTimeout(resolve, 25)); const pending = getTotalPendingReplies(); if (pending === 0) { restartTriggered = true; @@ -86,7 +86,7 @@ describe("real scenario: config change during message processing", () => { const dispatcher = createReplyDispatcher({ deliver: async (_payload) => { - await new Promise((resolve) => setTimeout(resolve, 50)); + await new Promise((resolve) => setTimeout(resolve, 10)); }, }); @@ -94,7 +94,7 @@ describe("real scenario: config change during message processing", () => { expect(getTotalPendingReplies()).toBe(1); // Simulate command processing delay BEFORE reply is enqueued - await new Promise((resolve) => setTimeout(resolve, 100)); + await new Promise((resolve) => setTimeout(resolve, 20)); // During this delay, pending should STILL be 1 (reservation active) expect(getTotalPendingReplies()).toBe(1); diff --git a/src/process/command-queue.test.ts b/src/process/command-queue.test.ts index 5c0b20930..79b8389a8 100644 --- a/src/process/command-queue.test.ts +++ b/src/process/command-queue.test.ts @@ -112,8 +112,6 @@ describe("command queue", () => { await blocker; }); - // Give the event loop a tick for the task to start. - await new Promise((r) => setTimeout(r, 5)); expect(getActiveTaskCount()).toBe(1); resolve1(); @@ -136,18 +134,21 @@ describe("command queue", () => { await blocker; }); - // Give the task a tick to start. - await new Promise((r) => setTimeout(r, 5)); + vi.useFakeTimers(); + try { + const drainPromise = waitForActiveTasks(5000); - const drainPromise = waitForActiveTasks(5000); + // Resolve the blocker after a short delay. + setTimeout(() => resolve1(), 10); + await vi.advanceTimersByTimeAsync(100); - // Resolve the blocker after a short delay. - setTimeout(() => resolve1(), 50); + const { drained } = await drainPromise; + expect(drained).toBe(true); - const { drained } = await drainPromise; - expect(drained).toBe(true); - - await task; + await task; + } finally { + vi.useRealTimers(); + } }); it("waitForActiveTasks returns drained=false on timeout", async () => { @@ -160,13 +161,18 @@ describe("command queue", () => { await blocker; }); - await new Promise((r) => setTimeout(r, 5)); + vi.useFakeTimers(); + try { + const waitPromise = waitForActiveTasks(50); + await vi.advanceTimersByTimeAsync(100); + const { drained } = await waitPromise; + expect(drained).toBe(false); - const { drained } = await waitForActiveTasks(50); - expect(drained).toBe(false); - - resolve1(); - await task; + resolve1(); + await task; + } finally { + vi.useRealTimers(); + } }); it("resetAllLanes drains queued work immediately after reset", async () => { @@ -228,15 +234,12 @@ describe("command queue", () => { const first = enqueueCommandInLane(lane, async () => { await blocker1; }); - await new Promise((r) => setTimeout(r, 5)); - const drainPromise = waitForActiveTasks(2000); // Starts after waitForActiveTasks snapshot and should not block drain completion. const second = enqueueCommandInLane(lane, async () => { await blocker2; }); - await new Promise((r) => setTimeout(r, 5)); expect(getActiveTaskCount()).toBeGreaterThanOrEqual(2); resolve1(); @@ -262,9 +265,6 @@ describe("command queue", () => { // Second task is queued behind the first. const second = enqueueCommand(async () => "second"); - // Give the first task a tick to start. - await new Promise((r) => setTimeout(r, 5)); - const removed = clearCommandLane(); expect(removed).toBe(1); // only the queued (not active) entry