diff --git a/CHANGELOG.md b/CHANGELOG.md index 9cfe6b137..b38e55a25 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -92,6 +92,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Discord/Reconnect integrity: release Discord message listener lane immediately while preserving serialized handler execution, add HELLO-stall resume-first recovery with bounded fresh-identify fallback after repeated stalls, and extend lifecycle/listener regression coverage for forced reconnect scenarios. Landed from contributor PR #29508 by @cgdusek. Thanks @cgdusek. - Security/Skills: harden skill installer metadata parsing by rejecting unsafe installer specs (brew/node/go/uv/download) and constrain plugin-declared skill directories to the plugin root (including symlink-escape checks), with regression coverage. - ACP/Harness thread spawn routing: force ACP harness thread creation through `sessions_spawn` (`runtime: "acp"`, `thread: true`) and explicitly forbid `message action=thread-create` for ACP harness requests, avoiding misrouted `Unknown channel` errors. (#30957) Thanks @dutifulbob. - Docs/ACP permissions: document the correct `permissionMode` default (`approve-reads`) and clarify non-interactive permission failure behavior/troubleshooting guidance. (#31044) Thanks @barronlroth. diff --git a/extensions/feishu/src/client.test.ts b/extensions/feishu/src/client.test.ts index f429f8837..fd7cffd1a 100644 --- a/extensions/feishu/src/client.test.ts +++ b/extensions/feishu/src/client.test.ts @@ -84,12 +84,14 @@ describe("createFeishuWSClient proxy handling", () => { createFeishuWSClient(baseAccount); - const expectedHttpsProxy = process.env.https_proxy || process.env.HTTPS_PROXY; + // On Windows env keys are case-insensitive, so setting HTTPS_PROXY may + // overwrite https_proxy. We assert https proxies still win over http. + const expectedProxy = process.env.https_proxy || process.env.HTTPS_PROXY; + expect(expectedProxy).toBeTruthy(); expect(httpsProxyAgentCtorMock).toHaveBeenCalledTimes(1); - expect(expectedHttpsProxy).toBeTruthy(); - expect(httpsProxyAgentCtorMock).toHaveBeenCalledWith(expectedHttpsProxy); + expect(httpsProxyAgentCtorMock).toHaveBeenCalledWith(expectedProxy); const options = firstWsClientOptions(); - expect(options.agent).toEqual({ proxyUrl: expectedHttpsProxy }); + expect(options.agent).toEqual({ proxyUrl: expectedProxy }); }); it("passes HTTP_PROXY to ws client when https vars are unset", () => { diff --git a/src/discord/monitor.test.ts b/src/discord/monitor.test.ts index 4e185d965..6f555ede6 100644 --- a/src/discord/monitor.test.ts +++ b/src/discord/monitor.test.ts @@ -88,16 +88,7 @@ describe("DiscordMessageListener", () => { }; } - async function expectPending(promise: Promise) { - let resolved = false; - void promise.then(() => { - resolved = true; - }); - await Promise.resolve(); - expect(resolved).toBe(false); - } - - it("awaits the handler before returning", async () => { + it("returns immediately while handler continues in background", async () => { let handlerResolved = false; const deferred = createDeferred(); const handler = vi.fn(async () => { @@ -111,19 +102,56 @@ describe("DiscordMessageListener", () => { {} as unknown as import("@buape/carbon").Client, ); - // Handler should be called but not yet resolved - expect(handler).toHaveBeenCalledOnce(); + // handle() returns immediately while the background queue starts on the next tick. + await expect(handlePromise).resolves.toBeUndefined(); + await vi.waitFor(() => { + expect(handler).toHaveBeenCalledOnce(); + }); expect(handlerResolved).toBe(false); - await expectPending(handlePromise); - // Release the handler + // Release and let background handler finish. deferred.resolve(); - - // Now await handle() - it should complete only after handler resolves - await handlePromise; + await Promise.resolve(); expect(handlerResolved).toBe(true); }); + it("queues subsequent events until prior message handling completes", async () => { + const first = createDeferred(); + const second = createDeferred(); + let runCount = 0; + const handler = vi.fn(async () => { + runCount += 1; + if (runCount === 1) { + await first.promise; + return; + } + await second.promise; + }); + const listener = new DiscordMessageListener(handler); + + await expect( + listener.handle( + {} as unknown as import("./monitor/listeners.js").DiscordMessageEvent, + {} as unknown as import("@buape/carbon").Client, + ), + ).resolves.toBeUndefined(); + await expect( + listener.handle( + {} as unknown as import("./monitor/listeners.js").DiscordMessageEvent, + {} as unknown as import("@buape/carbon").Client, + ), + ).resolves.toBeUndefined(); + + expect(handler).toHaveBeenCalledTimes(1); + first.resolve(); + await vi.waitFor(() => { + expect(handler).toHaveBeenCalledTimes(2); + }); + + second.resolve(); + await Promise.resolve(); + }); + it("logs handler failures", async () => { const logger = { warn: vi.fn(), @@ -138,9 +166,9 @@ describe("DiscordMessageListener", () => { {} as unknown as import("./monitor/listeners.js").DiscordMessageEvent, {} as unknown as import("@buape/carbon").Client, ); - await Promise.resolve(); - - expect(logger.error).toHaveBeenCalledWith(expect.stringContaining("discord handler failed")); + await vi.waitFor(() => { + expect(logger.error).toHaveBeenCalledWith(expect.stringContaining("discord handler failed")); + }); }); it("logs slow handlers after the threshold", async () => { @@ -156,21 +184,20 @@ describe("DiscordMessageListener", () => { } as unknown as ReturnType; const listener = new DiscordMessageListener(handler, logger); - // Start handle() but don't await yet + // handle() should release immediately. const handlePromise = listener.handle( {} as unknown as import("./monitor/listeners.js").DiscordMessageEvent, {} as unknown as import("@buape/carbon").Client, ); - await expectPending(handlePromise); + await expect(handlePromise).resolves.toBeUndefined(); + expect(logger.warn).not.toHaveBeenCalled(); - // Advance time past the slow listener threshold + // Advance wall clock past the slow listener threshold. vi.setSystemTime(31_000); - // Release the handler + // Release the background handler and allow slow-log finalizer to run. deferred.resolve(); - - // Now await handle() - it should complete and log the slow listener - await handlePromise; + await Promise.resolve(); expect(logger.warn).toHaveBeenCalled(); const warnMock = logger.warn as unknown as { mock: { calls: unknown[][] } }; diff --git a/src/discord/monitor/listeners.test.ts b/src/discord/monitor/listeners.test.ts new file mode 100644 index 000000000..00eef1cb0 --- /dev/null +++ b/src/discord/monitor/listeners.test.ts @@ -0,0 +1,79 @@ +import { describe, expect, it, vi } from "vitest"; +import { DiscordMessageListener } from "./listeners.js"; + +function createLogger() { + return { + error: vi.fn(), + warn: vi.fn(), + }; +} + +describe("DiscordMessageListener", () => { + it("returns immediately without awaiting handler completion", async () => { + let resolveHandler: (() => void) | undefined; + const handlerDone = new Promise((resolve) => { + resolveHandler = resolve; + }); + const handler = vi.fn(async () => { + await handlerDone; + }); + const logger = createLogger(); + const listener = new DiscordMessageListener(handler as never, logger as never); + + await expect(listener.handle({} as never, {} as never)).resolves.toBeUndefined(); + expect(handler).toHaveBeenCalledTimes(1); + expect(logger.error).not.toHaveBeenCalled(); + + resolveHandler?.(); + await handlerDone; + }); + + it("serializes queued handler runs while handle returns immediately", async () => { + let firstResolve: (() => void) | undefined; + let secondResolve: (() => void) | undefined; + const firstDone = new Promise((resolve) => { + firstResolve = resolve; + }); + const secondDone = new Promise((resolve) => { + secondResolve = resolve; + }); + let runCount = 0; + const handler = vi.fn(async () => { + runCount += 1; + if (runCount === 1) { + await firstDone; + return; + } + await secondDone; + }); + const listener = new DiscordMessageListener(handler as never, createLogger() as never); + + await expect(listener.handle({} as never, {} as never)).resolves.toBeUndefined(); + await expect(listener.handle({} as never, {} as never)).resolves.toBeUndefined(); + + // Second event is queued until the first handler run settles. + expect(handler).toHaveBeenCalledTimes(1); + firstResolve?.(); + await vi.waitFor(() => { + expect(handler).toHaveBeenCalledTimes(2); + }); + + secondResolve?.(); + await secondDone; + }); + + it("logs async handler failures", async () => { + const handler = vi.fn(async () => { + throw new Error("boom"); + }); + const logger = createLogger(); + const listener = new DiscordMessageListener(handler as never, logger as never); + + await expect(listener.handle({} as never, {} as never)).resolves.toBeUndefined(); + await vi.waitFor(() => { + expect(logger.error).toHaveBeenCalledWith( + expect.stringContaining("discord handler failed: Error: boom"), + ); + }); + }); +}); diff --git a/src/discord/monitor/listeners.ts b/src/discord/monitor/listeners.ts index e6679c4b9..edbbb7f2c 100644 --- a/src/discord/monitor/listeners.ts +++ b/src/discord/monitor/listeners.ts @@ -118,6 +118,8 @@ export function registerDiscordListener(listeners: Array, listener: obje } export class DiscordMessageListener extends MessageCreateListener { + private messageQueue: Promise = Promise.resolve(); + constructor( private handler: DiscordMessageHandler, private logger?: Logger, @@ -126,15 +128,25 @@ export class DiscordMessageListener extends MessageCreateListener { } async handle(data: DiscordMessageEvent, client: Client) { - await runDiscordListenerWithSlowLog({ - logger: this.logger, - listener: this.constructor.name, - event: this.type, - run: () => this.handler(data, client), - onError: (err) => { - const logger = this.logger ?? discordEventQueueLog; - logger.error(danger(`discord handler failed: ${String(err)}`)); - }, + // Release Carbon's dispatch lane immediately, but keep our message handler + // serialized to avoid unbounded parallel model/IO work on traffic bursts. + this.messageQueue = this.messageQueue + .catch(() => {}) + .then(() => + runDiscordListenerWithSlowLog({ + logger: this.logger, + listener: this.constructor.name, + event: this.type, + run: () => this.handler(data, client), + onError: (err) => { + const logger = this.logger ?? discordEventQueueLog; + logger.error(danger(`discord handler failed: ${String(err)}`)); + }, + }), + ); + void this.messageQueue.catch((err) => { + const logger = this.logger ?? discordEventQueueLog; + logger.error(danger(`discord handler failed: ${String(err)}`)); }); } } diff --git a/src/discord/monitor/provider.lifecycle.test.ts b/src/discord/monitor/provider.lifecycle.test.ts index f29bd8e8c..5e1986f18 100644 --- a/src/discord/monitor/provider.lifecycle.test.ts +++ b/src/discord/monitor/provider.lifecycle.test.ts @@ -1,3 +1,4 @@ +import { EventEmitter } from "node:events"; import type { Client } from "@buape/carbon"; import { beforeEach, describe, expect, it, vi } from "vitest"; import type { RuntimeEnv } from "../../runtime.js"; @@ -11,9 +12,10 @@ const { waitForDiscordGatewayStopMock, } = vi.hoisted(() => { const stopGatewayLoggingMock = vi.fn(); + const getDiscordGatewayEmitterMock = vi.fn<() => EventEmitter | undefined>(() => undefined); return { attachDiscordGatewayLoggingMock: vi.fn(() => stopGatewayLoggingMock), - getDiscordGatewayEmitterMock: vi.fn(() => undefined), + getDiscordGatewayEmitterMock, waitForDiscordGatewayStopMock: vi.fn(() => Promise.resolve()), registerGatewayMock: vi.fn(), unregisterGatewayMock: vi.fn(), @@ -51,6 +53,19 @@ describe("runDiscordGatewayLifecycle", () => { stop?: () => Promise; isDisallowedIntentsError?: (err: unknown) => boolean; pendingGatewayErrors?: unknown[]; + gateway?: { + isConnected?: boolean; + options?: Record; + disconnect?: () => void; + connect?: (resume?: boolean) => void; + state?: { + sessionId?: string | null; + resumeGatewayUrl?: string | null; + sequence?: number | null; + }; + sequence?: number | null; + emitter?: EventEmitter; + }; }) => { const start = vi.fn(params?.start ?? (async () => undefined)); const stop = vi.fn(params?.stop ?? (async () => undefined)); @@ -72,7 +87,9 @@ describe("runDiscordGatewayLifecycle", () => { releaseEarlyGatewayErrorGuard, lifecycleParams: { accountId: params?.accountId ?? "default", - client: { getPlugin: vi.fn(() => undefined) } as unknown as Client, + client: { + getPlugin: vi.fn((name: string) => (name === "gateway" ? params?.gateway : undefined)), + } as unknown as Client, runtime, isDisallowedIntentsError: params?.isDisallowedIntentsError ?? (() => false), voiceManager: null, @@ -203,4 +220,99 @@ describe("runDiscordGatewayLifecycle", () => { releaseEarlyGatewayErrorGuard, }); }); + + it("retries stalled HELLO with resume before forcing fresh identify", async () => { + vi.useFakeTimers(); + try { + const { runDiscordGatewayLifecycle } = await import("./provider.lifecycle.js"); + const emitter = new EventEmitter(); + const gateway = { + isConnected: false, + options: {}, + disconnect: vi.fn(), + connect: vi.fn(), + state: { + sessionId: "session-1", + resumeGatewayUrl: "wss://gateway.discord.gg", + sequence: 123, + }, + sequence: 123, + emitter, + }; + getDiscordGatewayEmitterMock.mockReturnValueOnce(emitter); + waitForDiscordGatewayStopMock.mockImplementationOnce(async () => { + emitter.emit("debug", "WebSocket connection opened"); + await vi.advanceTimersByTimeAsync(30000); + emitter.emit("debug", "WebSocket connection opened"); + await vi.advanceTimersByTimeAsync(30000); + emitter.emit("debug", "WebSocket connection opened"); + await vi.advanceTimersByTimeAsync(30000); + }); + + const { lifecycleParams } = createLifecycleHarness({ gateway }); + await expect(runDiscordGatewayLifecycle(lifecycleParams)).resolves.toBeUndefined(); + + expect(gateway.disconnect).toHaveBeenCalledTimes(3); + expect(gateway.connect).toHaveBeenNthCalledWith(1, true); + expect(gateway.connect).toHaveBeenNthCalledWith(2, true); + expect(gateway.connect).toHaveBeenNthCalledWith(3, false); + expect(gateway.state.sessionId).toBeNull(); + expect(gateway.state.resumeGatewayUrl).toBeNull(); + expect(gateway.state.sequence).toBeNull(); + expect(gateway.sequence).toBeNull(); + } finally { + vi.useRealTimers(); + } + }); + + it("resets HELLO stall counter after a successful reconnect that drops quickly", async () => { + vi.useFakeTimers(); + try { + const { runDiscordGatewayLifecycle } = await import("./provider.lifecycle.js"); + const emitter = new EventEmitter(); + const gateway = { + isConnected: false, + options: {}, + disconnect: vi.fn(), + connect: vi.fn(), + state: { + sessionId: "session-2", + resumeGatewayUrl: "wss://gateway.discord.gg", + sequence: 456, + }, + sequence: 456, + emitter, + }; + getDiscordGatewayEmitterMock.mockReturnValueOnce(emitter); + waitForDiscordGatewayStopMock.mockImplementationOnce(async () => { + emitter.emit("debug", "WebSocket connection opened"); + await vi.advanceTimersByTimeAsync(30000); + + // Successful reconnect (READY/RESUMED sets isConnected=true), then + // quick drop before the HELLO timeout window finishes. + gateway.isConnected = true; + emitter.emit("debug", "WebSocket connection opened"); + await vi.advanceTimersByTimeAsync(10); + emitter.emit("debug", "WebSocket connection closed with code 1006"); + gateway.isConnected = false; + + emitter.emit("debug", "WebSocket connection opened"); + await vi.advanceTimersByTimeAsync(30000); + + emitter.emit("debug", "WebSocket connection opened"); + await vi.advanceTimersByTimeAsync(30000); + }); + + const { lifecycleParams } = createLifecycleHarness({ gateway }); + await expect(runDiscordGatewayLifecycle(lifecycleParams)).resolves.toBeUndefined(); + + expect(gateway.connect).toHaveBeenCalledTimes(3); + expect(gateway.connect).toHaveBeenNthCalledWith(1, true); + expect(gateway.connect).toHaveBeenNthCalledWith(2, true); + expect(gateway.connect).toHaveBeenNthCalledWith(3, true); + expect(gateway.connect).not.toHaveBeenCalledWith(false); + } finally { + vi.useRealTimers(); + } + }); }); diff --git a/src/discord/monitor/provider.lifecycle.ts b/src/discord/monitor/provider.lifecycle.ts index 489657d08..e208114c2 100644 --- a/src/discord/monitor/provider.lifecycle.ts +++ b/src/discord/monitor/provider.lifecycle.ts @@ -51,24 +51,95 @@ export async function runDiscordGatewayLifecycle(params: { } const HELLO_TIMEOUT_MS = 30000; + const HELLO_CONNECTED_POLL_MS = 250; + const MAX_CONSECUTIVE_HELLO_STALLS = 3; let helloTimeoutId: ReturnType | undefined; + let helloConnectedPollId: ReturnType | undefined; + let consecutiveHelloStalls = 0; + const clearHelloWatch = () => { + if (helloTimeoutId) { + clearTimeout(helloTimeoutId); + helloTimeoutId = undefined; + } + if (helloConnectedPollId) { + clearInterval(helloConnectedPollId); + helloConnectedPollId = undefined; + } + }; + const resetHelloStallCounter = () => { + consecutiveHelloStalls = 0; + }; + const clearResumeState = () => { + const mutableGateway = gateway as + | (GatewayPlugin & { + state?: { + sessionId?: string | null; + resumeGatewayUrl?: string | null; + sequence?: number | null; + }; + sequence?: number | null; + }) + | undefined; + if (!mutableGateway?.state) { + return; + } + mutableGateway.state.sessionId = null; + mutableGateway.state.resumeGatewayUrl = null; + mutableGateway.state.sequence = null; + mutableGateway.sequence = null; + }; const onGatewayDebug = (msg: unknown) => { const message = String(msg); + if (message.includes("WebSocket connection closed")) { + // Carbon marks `isConnected` true only after READY/RESUMED and flips it + // false during reconnect handling after this debug line is emitted. + if (gateway?.isConnected) { + resetHelloStallCounter(); + } + clearHelloWatch(); + return; + } if (!message.includes("WebSocket connection opened")) { return; } - if (helloTimeoutId) { - clearTimeout(helloTimeoutId); - } - helloTimeoutId = setTimeout(() => { + clearHelloWatch(); + + let sawConnected = gateway?.isConnected === true; + helloConnectedPollId = setInterval(() => { if (!gateway?.isConnected) { + return; + } + sawConnected = true; + resetHelloStallCounter(); + if (helloConnectedPollId) { + clearInterval(helloConnectedPollId); + helloConnectedPollId = undefined; + } + }, HELLO_CONNECTED_POLL_MS); + + helloTimeoutId = setTimeout(() => { + if (helloConnectedPollId) { + clearInterval(helloConnectedPollId); + helloConnectedPollId = undefined; + } + if (sawConnected || gateway?.isConnected) { + resetHelloStallCounter(); + } else { + consecutiveHelloStalls += 1; + const forceFreshIdentify = consecutiveHelloStalls >= MAX_CONSECUTIVE_HELLO_STALLS; params.runtime.log?.( danger( - `connection stalled: no HELLO received within ${HELLO_TIMEOUT_MS}ms, forcing reconnect`, + forceFreshIdentify + ? `connection stalled: no HELLO within ${HELLO_TIMEOUT_MS}ms (${consecutiveHelloStalls}/${MAX_CONSECUTIVE_HELLO_STALLS}); forcing fresh identify` + : `connection stalled: no HELLO within ${HELLO_TIMEOUT_MS}ms (${consecutiveHelloStalls}/${MAX_CONSECUTIVE_HELLO_STALLS}); retrying resume`, ), ); + if (forceFreshIdentify) { + clearResumeState(); + resetHelloStallCounter(); + } gateway?.disconnect(); - gateway?.connect(false); + gateway?.connect(!forceFreshIdentify); } helloTimeoutId = undefined; }, HELLO_TIMEOUT_MS); @@ -137,9 +208,7 @@ export async function runDiscordGatewayLifecycle(params: { params.releaseEarlyGatewayErrorGuard?.(); unregisterGateway(params.accountId); stopGatewayLogging(); - if (helloTimeoutId) { - clearTimeout(helloTimeoutId); - } + clearHelloWatch(); gatewayEmitter?.removeListener("debug", onGatewayDebug); params.abortSignal?.removeEventListener("abort", onAbort); if (params.voiceManager) {