From 9889c6da537c4cdd518394542ac79da64dd40d52 Mon Sep 17 00:00:00 2001 From: Tak Hoffman <781889+Takhoffman@users.noreply.github.com> Date: Tue, 3 Mar 2026 21:25:32 -0600 Subject: [PATCH] Runtime: stabilize tool/run state transitions under compaction and backpressure Synthesize runtime state transition fixes for compaction tool-use integrity and long-running handler backpressure. Sources: #33630, #33583 Co-authored-by: Kevin Shenghui Co-authored-by: Theo Tarr --- CHANGELOG.md | 1 + ...pi-embedded-helpers.validate-turns.test.ts | 193 ++++++++ src/agents/pi-embedded-helpers/turns.ts | 95 +++- src/channels/plugins/types.core.ts | 3 + src/channels/run-state-machine.test.ts | 42 ++ src/channels/run-state-machine.ts | 99 +++++ .../monitor/message-handler.process.ts | 4 + .../monitor/message-handler.queue.test.ts | 411 ++++++++++++++++++ src/discord/monitor/message-handler.ts | 93 +++- src/discord/monitor/provider.ts | 5 + src/discord/monitor/status.ts | 3 + src/gateway/channel-health-monitor.test.ts | 57 +++ src/gateway/channel-health-policy.test.ts | 62 +++ src/gateway/channel-health-policy.ts | 40 ++ src/gateway/protocol/schema/channels.ts | 3 + 15 files changed, 1090 insertions(+), 21 deletions(-) create mode 100644 src/channels/run-state-machine.test.ts create mode 100644 src/channels/run-state-machine.ts create mode 100644 src/discord/monitor/message-handler.queue.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index b65675bc5..6a993b3d5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Runtime/tool-state stability: recover from dangling Anthropic `tool_use` after compaction, serialize long-running Discord handler runs without blocking new inbound events, and prevent stale busy snapshots from suppressing stuck-channel recovery. (from #33630, #33583) Thanks @kevinWangSheng and @theotarr. - Gateway/security default response headers: add `Permissions-Policy: camera=(), microphone=(), geolocation=()` to baseline gateway HTTP security headers for all responses. (#30186) thanks @habakan. - Plugins/startup loading: lazily initialize plugin runtime, split startup-critical plugin SDK imports into `openclaw/plugin-sdk/core` and `openclaw/plugin-sdk/telegram`, and preserve `api.runtime` reflection semantics for plugin compatibility. (#28620) thanks @hmemcpy. - Build/lazy runtime boundaries: replace ineffective dynamic import sites with dedicated lazy runtime boundaries across Slack slash handling, Telegram audit, CLI send deps, memory fallback, and outbound delivery paths while preserving behavior. (#33690) thanks @gumadeiras. diff --git a/src/agents/pi-embedded-helpers.validate-turns.test.ts b/src/agents/pi-embedded-helpers.validate-turns.test.ts index ff1f9628c..8ba3f3830 100644 --- a/src/agents/pi-embedded-helpers.validate-turns.test.ts +++ b/src/agents/pi-embedded-helpers.validate-turns.test.ts @@ -336,3 +336,196 @@ describe("mergeConsecutiveUserTurns", () => { expect(merged.timestamp).toBe(1000); }); }); + +describe("validateAnthropicTurns strips dangling tool_use blocks", () => { + it("should strip tool_use blocks without matching tool_result", () => { + // Simulates: user asks -> assistant has tool_use -> user responds without tool_result + // This happens after compaction trims history + const msgs = asMessages([ + { role: "user", content: [{ type: "text", text: "Use tool" }] }, + { + role: "assistant", + content: [ + { type: "toolUse", id: "tool-1", name: "test", input: {} }, + { type: "text", text: "I'll check that" }, + ], + }, + { role: "user", content: [{ type: "text", text: "Hello" }] }, + ]); + + const result = validateAnthropicTurns(msgs); + + expect(result).toHaveLength(3); + // The dangling tool_use should be stripped, but text content preserved + const assistantContent = (result[1] as { content?: unknown[] }).content; + expect(assistantContent).toEqual([{ type: "text", text: "I'll check that" }]); + }); + + it("should preserve tool_use blocks with matching tool_result", () => { + const msgs = asMessages([ + { role: "user", content: [{ type: "text", text: "Use tool" }] }, + { + role: "assistant", + content: [ + { type: "toolUse", id: "tool-1", name: "test", input: {} }, + { type: "text", text: "Here's result" }, + ], + }, + { + role: "user", + content: [ + { type: "toolResult", toolUseId: "tool-1", content: [{ type: "text", text: "Result" }] }, + { type: "text", text: "Thanks" }, + ], + }, + ]); + + const result = validateAnthropicTurns(msgs); + + expect(result).toHaveLength(3); + // tool_use should be preserved because matching tool_result exists + const assistantContent = (result[1] as { content?: unknown[] }).content; + expect(assistantContent).toEqual([ + { type: "toolUse", id: "tool-1", name: "test", input: {} }, + { type: "text", text: "Here's result" }, + ]); + }); + + it("should insert fallback text when all content would be removed", () => { + const msgs = asMessages([ + { role: "user", content: [{ type: "text", text: "Use tool" }] }, + { + role: "assistant", + content: [{ type: "toolUse", id: "tool-1", name: "test", input: {} }], + }, + { role: "user", content: [{ type: "text", text: "Hello" }] }, + ]); + + const result = validateAnthropicTurns(msgs); + + expect(result).toHaveLength(3); + // Should insert fallback text since all content would be removed + const assistantContent = (result[1] as { content?: unknown[] }).content; + expect(assistantContent).toEqual([{ type: "text", text: "[tool calls omitted]" }]); + }); + + it("should handle multiple dangling tool_use blocks", () => { + const msgs = asMessages([ + { role: "user", content: [{ type: "text", text: "Use tools" }] }, + { + role: "assistant", + content: [ + { type: "toolUse", id: "tool-1", name: "test1", input: {} }, + { type: "toolUse", id: "tool-2", name: "test2", input: {} }, + { type: "text", text: "Done" }, + ], + }, + { role: "user", content: [{ type: "text", text: "OK" }] }, + ]); + + const result = validateAnthropicTurns(msgs); + + expect(result).toHaveLength(3); + const assistantContent = (result[1] as { content?: unknown[] }).content; + // Only text content should remain + expect(assistantContent).toEqual([{ type: "text", text: "Done" }]); + }); + + it("should handle mixed tool_use with some having matching tool_result", () => { + const msgs = asMessages([ + { role: "user", content: [{ type: "text", text: "Use tools" }] }, + { + role: "assistant", + content: [ + { type: "toolUse", id: "tool-1", name: "test1", input: {} }, + { type: "toolUse", id: "tool-2", name: "test2", input: {} }, + { type: "text", text: "Done" }, + ], + }, + { + role: "user", + content: [ + { + type: "toolResult", + toolUseId: "tool-1", + content: [{ type: "text", text: "Result 1" }], + }, + { type: "text", text: "Thanks" }, + ], + }, + ]); + + const result = validateAnthropicTurns(msgs); + + expect(result).toHaveLength(3); + // tool-1 should be preserved (has matching tool_result), tool-2 stripped, text preserved + const assistantContent = (result[1] as { content?: unknown[] }).content; + expect(assistantContent).toEqual([ + { type: "toolUse", id: "tool-1", name: "test1", input: {} }, + { type: "text", text: "Done" }, + ]); + }); + + it("should not modify messages when next is not user", () => { + const msgs = asMessages([ + { role: "user", content: [{ type: "text", text: "Use tool" }] }, + { + role: "assistant", + content: [{ type: "toolUse", id: "tool-1", name: "test", input: {} }], + }, + // Next is assistant, not user - should not strip + { role: "assistant", content: [{ type: "text", text: "Continue" }] }, + ]); + + const result = validateAnthropicTurns(msgs); + + expect(result).toHaveLength(3); + // Original tool_use should be preserved + const assistantContent = (result[1] as { content?: unknown[] }).content; + expect(assistantContent).toEqual([{ type: "toolUse", id: "tool-1", name: "test", input: {} }]); + }); + + it("is replay-safe across repeated validation passes", () => { + const msgs = asMessages([ + { role: "user", content: [{ type: "text", text: "Use tools" }] }, + { + role: "assistant", + content: [ + { type: "toolUse", id: "tool-1", name: "test1", input: {} }, + { type: "toolUse", id: "tool-2", name: "test2", input: {} }, + { type: "text", text: "Done" }, + ], + }, + { + role: "user", + content: [ + { + type: "toolResult", + toolUseId: "tool-1", + content: [{ type: "text", text: "Result 1" }], + }, + ], + }, + ]); + + const firstPass = validateAnthropicTurns(msgs); + const secondPass = validateAnthropicTurns(firstPass); + + expect(secondPass).toEqual(firstPass); + }); + + it("does not crash when assistant content is non-array", () => { + const msgs = [ + { role: "user", content: [{ type: "text", text: "Use tool" }] }, + { + role: "assistant", + content: "legacy-content", + }, + { role: "user", content: [{ type: "text", text: "Thanks" }] }, + ] as unknown as AgentMessage[]; + + expect(() => validateAnthropicTurns(msgs)).not.toThrow(); + const result = validateAnthropicTurns(msgs); + expect(result).toHaveLength(3); + }); +}); diff --git a/src/agents/pi-embedded-helpers/turns.ts b/src/agents/pi-embedded-helpers/turns.ts index f6dddb20a..df90ee30d 100644 --- a/src/agents/pi-embedded-helpers/turns.ts +++ b/src/agents/pi-embedded-helpers/turns.ts @@ -1,5 +1,94 @@ import type { AgentMessage } from "@mariozechner/pi-agent-core"; +type AnthropicContentBlock = { + type: "text" | "toolUse" | "toolResult"; + text?: string; + id?: string; + name?: string; + toolUseId?: string; +}; + +/** + * Strips dangling tool_use blocks from assistant messages when the immediately + * following user message does not contain a matching tool_result block. + * This fixes the "tool_use ids found without tool_result blocks" error from Anthropic. + */ +function stripDanglingAnthropicToolUses(messages: AgentMessage[]): AgentMessage[] { + const result: AgentMessage[] = []; + + for (let i = 0; i < messages.length; i++) { + const msg = messages[i]; + if (!msg || typeof msg !== "object") { + result.push(msg); + continue; + } + + const msgRole = (msg as { role?: unknown }).role as string | undefined; + if (msgRole !== "assistant") { + result.push(msg); + continue; + } + + const assistantMsg = msg as { + content?: AnthropicContentBlock[]; + }; + + // Get the next message to check for tool_result blocks + const nextMsg = messages[i + 1]; + const nextMsgRole = + nextMsg && typeof nextMsg === "object" + ? ((nextMsg as { role?: unknown }).role as string | undefined) + : undefined; + + // If next message is not user, keep the assistant message as-is + if (nextMsgRole !== "user") { + result.push(msg); + continue; + } + + // Collect tool_use_ids from the next user message's tool_result blocks + const nextUserMsg = nextMsg as { + content?: AnthropicContentBlock[]; + }; + const validToolUseIds = new Set(); + if (Array.isArray(nextUserMsg.content)) { + for (const block of nextUserMsg.content) { + if (block && block.type === "toolResult" && block.toolUseId) { + validToolUseIds.add(block.toolUseId); + } + } + } + + // Filter out tool_use blocks that don't have matching tool_result + const originalContent = Array.isArray(assistantMsg.content) ? assistantMsg.content : []; + const filteredContent = originalContent.filter((block) => { + if (!block) { + return false; + } + if (block.type !== "toolUse") { + return true; + } + // Keep tool_use if its id is in the valid set + return validToolUseIds.has(block.id || ""); + }); + + // If all content would be removed, insert a minimal fallback text block + if (originalContent.length > 0 && filteredContent.length === 0) { + result.push({ + ...assistantMsg, + content: [{ type: "text", text: "[tool calls omitted]" }], + } as AgentMessage); + } else { + result.push({ + ...assistantMsg, + content: filteredContent, + } as AgentMessage); + } + } + + return result; +} + function validateTurnsWithConsecutiveMerge(params: { messages: AgentMessage[]; role: TRole; @@ -98,10 +187,14 @@ export function mergeConsecutiveUserTurns( * Validates and fixes conversation turn sequences for Anthropic API. * Anthropic requires strict alternating user→assistant pattern. * Merges consecutive user messages together. + * Also strips dangling tool_use blocks that lack corresponding tool_result blocks. */ export function validateAnthropicTurns(messages: AgentMessage[]): AgentMessage[] { + // First, strip dangling tool_use blocks from assistant messages + const stripped = stripDanglingAnthropicToolUses(messages); + return validateTurnsWithConsecutiveMerge({ - messages, + messages: stripped, role: "user", merge: mergeConsecutiveUserTurns, }); diff --git a/src/channels/plugins/types.core.ts b/src/channels/plugins/types.core.ts index 775fdef64..319daf1ac 100644 --- a/src/channels/plugins/types.core.ts +++ b/src/channels/plugins/types.core.ts @@ -120,6 +120,9 @@ export type ChannelAccountSnapshot = { lastStopAt?: number | null; lastInboundAt?: number | null; lastOutboundAt?: number | null; + busy?: boolean; + activeRuns?: number; + lastRunActivityAt?: number | null; mode?: string; dmPolicy?: string; allowFrom?: string[]; diff --git a/src/channels/run-state-machine.test.ts b/src/channels/run-state-machine.test.ts new file mode 100644 index 000000000..a46a5081a --- /dev/null +++ b/src/channels/run-state-machine.test.ts @@ -0,0 +1,42 @@ +import { describe, expect, it, vi } from "vitest"; +import { createRunStateMachine } from "./run-state-machine.js"; + +describe("createRunStateMachine", () => { + it("resets stale busy fields on init", () => { + const setStatus = vi.fn(); + createRunStateMachine({ setStatus }); + expect(setStatus).toHaveBeenCalledWith({ activeRuns: 0, busy: false }); + }); + + it("emits busy status while active and clears when done", () => { + const setStatus = vi.fn(); + const machine = createRunStateMachine({ + setStatus, + now: () => 123, + }); + machine.onRunStart(); + machine.onRunEnd(); + expect(setStatus).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ activeRuns: 1, busy: true, lastRunActivityAt: 123 }), + ); + expect(setStatus).toHaveBeenLastCalledWith( + expect.objectContaining({ activeRuns: 0, busy: false, lastRunActivityAt: 123 }), + ); + }); + + it("stops publishing after lifecycle abort", () => { + const setStatus = vi.fn(); + const abortController = new AbortController(); + const machine = createRunStateMachine({ + setStatus, + abortSignal: abortController.signal, + now: () => 999, + }); + machine.onRunStart(); + const callsBeforeAbort = setStatus.mock.calls.length; + abortController.abort(); + machine.onRunEnd(); + expect(setStatus.mock.calls.length).toBe(callsBeforeAbort); + }); +}); diff --git a/src/channels/run-state-machine.ts b/src/channels/run-state-machine.ts new file mode 100644 index 000000000..84cf7135c --- /dev/null +++ b/src/channels/run-state-machine.ts @@ -0,0 +1,99 @@ +export type RunStateStatusPatch = { + busy?: boolean; + activeRuns?: number; + lastRunActivityAt?: number | null; +}; + +export type RunStateStatusSink = (patch: RunStateStatusPatch) => void; + +type RunStateMachineParams = { + setStatus?: RunStateStatusSink; + abortSignal?: AbortSignal; + heartbeatMs?: number; + now?: () => number; +}; + +const DEFAULT_RUN_ACTIVITY_HEARTBEAT_MS = 60_000; + +export function createRunStateMachine(params: RunStateMachineParams) { + const heartbeatMs = params.heartbeatMs ?? DEFAULT_RUN_ACTIVITY_HEARTBEAT_MS; + const now = params.now ?? Date.now; + let activeRuns = 0; + let runActivityHeartbeat: ReturnType | null = null; + let lifecycleActive = !params.abortSignal?.aborted; + + const publish = () => { + if (!lifecycleActive) { + return; + } + params.setStatus?.({ + activeRuns, + busy: activeRuns > 0, + lastRunActivityAt: now(), + }); + }; + + const clearHeartbeat = () => { + if (!runActivityHeartbeat) { + return; + } + clearInterval(runActivityHeartbeat); + runActivityHeartbeat = null; + }; + + const ensureHeartbeat = () => { + if (runActivityHeartbeat || activeRuns <= 0 || !lifecycleActive) { + return; + } + runActivityHeartbeat = setInterval(() => { + if (!lifecycleActive || activeRuns <= 0) { + clearHeartbeat(); + return; + } + publish(); + }, heartbeatMs); + runActivityHeartbeat.unref?.(); + }; + + const deactivate = () => { + lifecycleActive = false; + clearHeartbeat(); + }; + + const onAbort = () => { + deactivate(); + }; + + if (params.abortSignal?.aborted) { + onAbort(); + } else { + params.abortSignal?.addEventListener("abort", onAbort, { once: true }); + } + + if (lifecycleActive) { + // Reset inherited status from previous process lifecycle. + params.setStatus?.({ + activeRuns: 0, + busy: false, + }); + } + + return { + isActive() { + return lifecycleActive; + }, + onRunStart() { + activeRuns += 1; + publish(); + ensureHeartbeat(); + }, + onRunEnd() { + activeRuns = Math.max(0, activeRuns - 1); + if (activeRuns <= 0) { + clearHeartbeat(); + } + publish(); + }, + deactivate, + }; +} diff --git a/src/discord/monitor/message-handler.process.ts b/src/discord/monitor/message-handler.process.ts index 81d8df5fb..cf942046c 100644 --- a/src/discord/monitor/message-handler.process.ts +++ b/src/discord/monitor/message-handler.process.ts @@ -58,6 +58,8 @@ function sleep(ms: number): Promise { }); } +const DISCORD_TYPING_MAX_DURATION_MS = 20 * 60_000; + export async function processDiscordMessage(ctx: DiscordMessagePreflightContext) { const { cfg, @@ -430,6 +432,8 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext) error: err, }); }, + // Long tool-heavy runs are expected on Discord; keep heartbeats alive. + maxDurationMs: DISCORD_TYPING_MAX_DURATION_MS, }); // --- Discord draft stream (edit-based preview streaming) --- diff --git a/src/discord/monitor/message-handler.queue.test.ts b/src/discord/monitor/message-handler.queue.test.ts new file mode 100644 index 000000000..1424b29d4 --- /dev/null +++ b/src/discord/monitor/message-handler.queue.test.ts @@ -0,0 +1,411 @@ +import { describe, expect, it, vi } from "vitest"; +import type { OpenClawConfig } from "../../config/types.js"; +import { createNoopThreadBindingManager } from "./thread-bindings.js"; + +const preflightDiscordMessageMock = vi.hoisted(() => vi.fn()); +const processDiscordMessageMock = vi.hoisted(() => vi.fn()); + +vi.mock("./message-handler.preflight.js", () => ({ + preflightDiscordMessage: preflightDiscordMessageMock, +})); + +vi.mock("./message-handler.process.js", () => ({ + processDiscordMessage: processDiscordMessageMock, +})); + +const { createDiscordMessageHandler } = await import("./message-handler.js"); + +function createDeferred() { + let resolve: (value: T | PromiseLike) => void = () => {}; + const promise = new Promise((innerResolve) => { + resolve = innerResolve; + }); + return { promise, resolve }; +} + +function createHandlerParams(overrides?: { + setStatus?: (patch: Record) => void; + abortSignal?: AbortSignal; +}) { + const cfg: OpenClawConfig = { + channels: { + discord: { + enabled: true, + token: "test-token", + groupPolicy: "allowlist", + }, + }, + messages: { + inbound: { + debounceMs: 0, + }, + }, + }; + return { + cfg, + discordConfig: cfg.channels?.discord, + accountId: "default", + token: "test-token", + runtime: { + log: vi.fn(), + error: vi.fn(), + exit: (code: number): never => { + throw new Error(`exit ${code}`); + }, + }, + botUserId: "bot-123", + guildHistories: new Map(), + historyLimit: 0, + mediaMaxBytes: 10_000, + textLimit: 2_000, + replyToMode: "off" as const, + dmEnabled: true, + groupDmEnabled: false, + threadBindings: createNoopThreadBindingManager("default"), + setStatus: overrides?.setStatus, + abortSignal: overrides?.abortSignal, + }; +} + +function createMessageData(messageId: string, channelId = "ch-1") { + return { + channel_id: channelId, + author: { id: "user-1" }, + message: { + id: messageId, + author: { id: "user-1", bot: false }, + content: "hello", + channel_id: channelId, + attachments: [{ id: `att-${messageId}` }], + }, + }; +} + +function createPreflightContext(channelId = "ch-1") { + return { + route: { + sessionKey: `agent:main:discord:channel:${channelId}`, + }, + baseSessionKey: `agent:main:discord:channel:${channelId}`, + messageChannelId: channelId, + }; +} + +describe("createDiscordMessageHandler queue behavior", () => { + it("resets busy counters when the handler is created", () => { + preflightDiscordMessageMock.mockReset(); + processDiscordMessageMock.mockReset(); + + const setStatus = vi.fn(); + createDiscordMessageHandler(createHandlerParams({ setStatus })); + + expect(setStatus).toHaveBeenCalledWith( + expect.objectContaining({ + activeRuns: 0, + busy: false, + }), + ); + }); + + it("returns immediately and tracks busy status while queued runs execute", async () => { + preflightDiscordMessageMock.mockReset(); + processDiscordMessageMock.mockReset(); + + const firstRun = createDeferred(); + const secondRun = createDeferred(); + processDiscordMessageMock + .mockImplementationOnce(async () => { + await firstRun.promise; + }) + .mockImplementationOnce(async () => { + await secondRun.promise; + }); + preflightDiscordMessageMock.mockImplementation( + async (params: { data: { channel_id: string } }) => + createPreflightContext(params.data.channel_id), + ); + + const setStatus = vi.fn(); + const handler = createDiscordMessageHandler(createHandlerParams({ setStatus })); + + await expect(handler(createMessageData("m-1") as never, {} as never)).resolves.toBeUndefined(); + + await vi.waitFor(() => { + expect(processDiscordMessageMock).toHaveBeenCalledTimes(1); + }); + expect(setStatus).toHaveBeenCalledWith( + expect.objectContaining({ + activeRuns: 1, + busy: true, + }), + ); + + await expect(handler(createMessageData("m-2") as never, {} as never)).resolves.toBeUndefined(); + + await vi.waitFor(() => { + expect(preflightDiscordMessageMock).toHaveBeenCalledTimes(2); + }); + expect(processDiscordMessageMock).toHaveBeenCalledTimes(1); + + firstRun.resolve(); + await firstRun.promise; + + await vi.waitFor(() => { + expect(processDiscordMessageMock).toHaveBeenCalledTimes(2); + }); + + secondRun.resolve(); + await secondRun.promise; + + await vi.waitFor(() => { + expect(setStatus).toHaveBeenLastCalledWith( + expect.objectContaining({ + activeRuns: 0, + busy: false, + }), + ); + }); + }); + + it("refreshes run activity while active runs are in progress", async () => { + preflightDiscordMessageMock.mockReset(); + processDiscordMessageMock.mockReset(); + + const runInFlight = createDeferred(); + processDiscordMessageMock.mockImplementation(async () => { + await runInFlight.promise; + }); + preflightDiscordMessageMock.mockImplementation( + async (params: { data: { channel_id: string } }) => + createPreflightContext(params.data.channel_id), + ); + + let heartbeatTick: () => void = () => {}; + let capturedHeartbeat = false; + const setIntervalSpy = vi + .spyOn(globalThis, "setInterval") + .mockImplementation((callback: TimerHandler) => { + if (typeof callback === "function") { + heartbeatTick = () => { + callback(); + }; + capturedHeartbeat = true; + } + return 1 as unknown as ReturnType; + }); + const clearIntervalSpy = vi.spyOn(globalThis, "clearInterval"); + + try { + const setStatus = vi.fn(); + const handler = createDiscordMessageHandler(createHandlerParams({ setStatus })); + await expect( + handler(createMessageData("m-1") as never, {} as never), + ).resolves.toBeUndefined(); + + await vi.waitFor(() => { + expect(processDiscordMessageMock).toHaveBeenCalledTimes(1); + }); + + expect(capturedHeartbeat).toBe(true); + const busyCallsBefore = setStatus.mock.calls.filter( + ([patch]) => (patch as { busy?: boolean }).busy === true, + ).length; + + heartbeatTick(); + + const busyCallsAfter = setStatus.mock.calls.filter( + ([patch]) => (patch as { busy?: boolean }).busy === true, + ).length; + expect(busyCallsAfter).toBeGreaterThan(busyCallsBefore); + + runInFlight.resolve(); + await runInFlight.promise; + + await vi.waitFor(() => { + expect(clearIntervalSpy).toHaveBeenCalled(); + }); + } finally { + setIntervalSpy.mockRestore(); + clearIntervalSpy.mockRestore(); + } + }); + + it("stops status publishing after lifecycle abort", async () => { + preflightDiscordMessageMock.mockReset(); + processDiscordMessageMock.mockReset(); + + const runInFlight = createDeferred(); + processDiscordMessageMock.mockImplementation(async () => { + await runInFlight.promise; + }); + preflightDiscordMessageMock.mockImplementation( + async (params: { data: { channel_id: string } }) => + createPreflightContext(params.data.channel_id), + ); + + const setStatus = vi.fn(); + const abortController = new AbortController(); + const handler = createDiscordMessageHandler( + createHandlerParams({ setStatus, abortSignal: abortController.signal }), + ); + + await expect(handler(createMessageData("m-1") as never, {} as never)).resolves.toBeUndefined(); + + await vi.waitFor(() => { + expect(processDiscordMessageMock).toHaveBeenCalledTimes(1); + }); + + const callsBeforeAbort = setStatus.mock.calls.length; + abortController.abort(); + + runInFlight.resolve(); + await runInFlight.promise; + await Promise.resolve(); + + expect(setStatus.mock.calls.length).toBe(callsBeforeAbort); + }); + + it("stops status publishing after handler deactivation", async () => { + preflightDiscordMessageMock.mockReset(); + processDiscordMessageMock.mockReset(); + + const runInFlight = createDeferred(); + processDiscordMessageMock.mockImplementation(async () => { + await runInFlight.promise; + }); + preflightDiscordMessageMock.mockImplementation( + async (params: { data: { channel_id: string } }) => + createPreflightContext(params.data.channel_id), + ); + + const setStatus = vi.fn(); + const handler = createDiscordMessageHandler(createHandlerParams({ setStatus })); + + await expect(handler(createMessageData("m-1") as never, {} as never)).resolves.toBeUndefined(); + + await vi.waitFor(() => { + expect(processDiscordMessageMock).toHaveBeenCalledTimes(1); + }); + + const callsBeforeDeactivate = setStatus.mock.calls.length; + handler.deactivate(); + + runInFlight.resolve(); + await runInFlight.promise; + await Promise.resolve(); + + expect(setStatus.mock.calls.length).toBe(callsBeforeDeactivate); + }); + + it("skips queued runs that have not started yet after deactivation", async () => { + preflightDiscordMessageMock.mockReset(); + processDiscordMessageMock.mockReset(); + + const firstRun = createDeferred(); + processDiscordMessageMock + .mockImplementationOnce(async () => { + await firstRun.promise; + }) + .mockImplementationOnce(async () => undefined); + preflightDiscordMessageMock.mockImplementation( + async (params: { data: { channel_id: string } }) => + createPreflightContext(params.data.channel_id), + ); + + const handler = createDiscordMessageHandler(createHandlerParams()); + await expect(handler(createMessageData("m-1") as never, {} as never)).resolves.toBeUndefined(); + await vi.waitFor(() => { + expect(processDiscordMessageMock).toHaveBeenCalledTimes(1); + }); + + await expect(handler(createMessageData("m-2") as never, {} as never)).resolves.toBeUndefined(); + handler.deactivate(); + + firstRun.resolve(); + await firstRun.promise; + await Promise.resolve(); + + expect(processDiscordMessageMock).toHaveBeenCalledTimes(1); + }); + + it("preserves non-debounced message ordering by awaiting debouncer enqueue", async () => { + preflightDiscordMessageMock.mockReset(); + processDiscordMessageMock.mockReset(); + + const firstPreflight = createDeferred(); + const processedMessageIds: string[] = []; + + preflightDiscordMessageMock.mockImplementation( + async (params: { data: { channel_id: string; message?: { id?: string } } }) => { + const messageId = params.data.message?.id ?? "unknown"; + if (messageId === "m-1") { + await firstPreflight.promise; + } + return { + ...createPreflightContext(params.data.channel_id), + messageId, + }; + }, + ); + + processDiscordMessageMock.mockImplementation(async (ctx: { messageId?: string }) => { + processedMessageIds.push(ctx.messageId ?? "unknown"); + }); + + const handler = createDiscordMessageHandler(createHandlerParams()); + + const sequentialDispatch = (async () => { + await handler(createMessageData("m-1") as never, {} as never); + await handler(createMessageData("m-2") as never, {} as never); + })(); + + await vi.waitFor(() => { + expect(preflightDiscordMessageMock).toHaveBeenCalledTimes(1); + }); + await Promise.resolve(); + expect(preflightDiscordMessageMock).toHaveBeenCalledTimes(1); + + firstPreflight.resolve(); + await sequentialDispatch; + + await vi.waitFor(() => { + expect(processDiscordMessageMock).toHaveBeenCalledTimes(2); + }); + expect(processedMessageIds).toEqual(["m-1", "m-2"]); + }); + + it("recovers queue progress after a run failure without leaving busy state stuck", async () => { + preflightDiscordMessageMock.mockReset(); + processDiscordMessageMock.mockReset(); + + const firstRun = createDeferred(); + processDiscordMessageMock + .mockImplementationOnce(async () => { + await firstRun.promise; + throw new Error("simulated run failure"); + }) + .mockImplementationOnce(async () => undefined); + preflightDiscordMessageMock.mockImplementation( + async (params: { data: { channel_id: string } }) => + createPreflightContext(params.data.channel_id), + ); + + const setStatus = vi.fn(); + const handler = createDiscordMessageHandler(createHandlerParams({ setStatus })); + + await expect(handler(createMessageData("m-1") as never, {} as never)).resolves.toBeUndefined(); + await expect(handler(createMessageData("m-2") as never, {} as never)).resolves.toBeUndefined(); + + firstRun.resolve(); + await firstRun.promise.catch(() => undefined); + + await vi.waitFor(() => { + expect(processDiscordMessageMock).toHaveBeenCalledTimes(2); + }); + await vi.waitFor(() => { + expect(setStatus).toHaveBeenCalledWith( + expect.objectContaining({ activeRuns: 0, busy: false }), + ); + }); + }); +}); diff --git a/src/discord/monitor/message-handler.ts b/src/discord/monitor/message-handler.ts index c105a0aa3..a069a5a52 100644 --- a/src/discord/monitor/message-handler.ts +++ b/src/discord/monitor/message-handler.ts @@ -3,26 +3,51 @@ import { createChannelInboundDebouncer, shouldDebounceTextInbound, } from "../../channels/inbound-debounce-policy.js"; +import { createRunStateMachine } from "../../channels/run-state-machine.js"; import { resolveOpenProviderRuntimeGroupPolicy } from "../../config/runtime-group-policy.js"; import { danger } from "../../globals.js"; +import { KeyedAsyncQueue } from "../../plugin-sdk/keyed-async-queue.js"; import type { DiscordMessageEvent, DiscordMessageHandler } from "./listeners.js"; import { preflightDiscordMessage } from "./message-handler.preflight.js"; -import type { DiscordMessagePreflightParams } from "./message-handler.preflight.types.js"; +import type { + DiscordMessagePreflightContext, + DiscordMessagePreflightParams, +} from "./message-handler.preflight.types.js"; import { processDiscordMessage } from "./message-handler.process.js"; import { hasDiscordMessageStickers, resolveDiscordMessageChannelId, resolveDiscordMessageText, } from "./message-utils.js"; +import type { DiscordMonitorStatusSink } from "./status.js"; type DiscordMessageHandlerParams = Omit< DiscordMessagePreflightParams, "ackReactionScope" | "groupPolicy" | "data" | "client" ->; +> & { + setStatus?: DiscordMonitorStatusSink; + abortSignal?: AbortSignal; +}; + +export type DiscordMessageHandlerWithLifecycle = DiscordMessageHandler & { + deactivate: () => void; +}; + +function resolveDiscordRunQueueKey(ctx: DiscordMessagePreflightContext): string { + const sessionKey = ctx.route.sessionKey?.trim(); + if (sessionKey) { + return sessionKey; + } + const baseSessionKey = ctx.baseSessionKey?.trim(); + if (baseSessionKey) { + return baseSessionKey; + } + return ctx.messageChannelId; +} export function createDiscordMessageHandler( params: DiscordMessageHandlerParams, -): DiscordMessageHandler { +): DiscordMessageHandlerWithLifecycle { const { groupPolicy } = resolveOpenProviderRuntimeGroupPolicy({ providerConfigPresent: params.cfg.channels?.discord !== undefined, groupPolicy: params.discordConfig?.groupPolicy, @@ -32,6 +57,34 @@ export function createDiscordMessageHandler( params.discordConfig?.ackReactionScope ?? params.cfg.messages?.ackReactionScope ?? "group-mentions"; + const runQueue = new KeyedAsyncQueue(); + const runState = createRunStateMachine({ + setStatus: params.setStatus, + abortSignal: params.abortSignal, + }); + + const enqueueDiscordRun = (ctx: DiscordMessagePreflightContext) => { + const queueKey = resolveDiscordRunQueueKey(ctx); + void runQueue + .enqueue(queueKey, async () => { + if (!runState.isActive()) { + return; + } + runState.onRunStart(); + try { + if (!runState.isActive()) { + return; + } + await processDiscordMessage(ctx); + } finally { + runState.onRunEnd(); + } + }) + .catch((err) => { + params.runtime.error?.(danger(`discord process failed: ${String(err)}`)); + }); + }; + const { debouncer } = createChannelInboundDebouncer<{ data: DiscordMessageEvent; client: Client; @@ -84,9 +137,7 @@ export function createDiscordMessageHandler( if (!ctx) { return; } - void processDiscordMessage(ctx).catch((err) => { - params.runtime.error?.(danger(`discord process failed: ${String(err)}`)); - }); + enqueueDiscordRun(ctx); return; } const combinedBaseText = entries @@ -130,30 +181,32 @@ export function createDiscordMessageHandler( ctxBatch.MessageSidLast = ids[ids.length - 1]; } } - void processDiscordMessage(ctx).catch((err) => { - params.runtime.error?.(danger(`discord process failed: ${String(err)}`)); - }); + enqueueDiscordRun(ctx); }, onError: (err) => { params.runtime.error?.(danger(`discord debounce flush failed: ${String(err)}`)); }, }); - return async (data, client) => { - try { - // Filter bot-own messages before they enter the debounce queue. - // The same check exists in preflightDiscordMessage(), but by that point - // the message has already consumed debounce capacity and blocked - // legitimate user messages. On active servers this causes cumulative - // slowdown (see #15874). - const msgAuthorId = data.message?.author?.id ?? data.author?.id; - if (params.botUserId && msgAuthorId === params.botUserId) { - return; - } + const handler: DiscordMessageHandlerWithLifecycle = async (data, client) => { + // Filter bot-own messages before they enter the debounce queue. + // The same check exists in preflightDiscordMessage(), but by that point + // the message has already consumed debounce capacity and blocked + // legitimate user messages. On active servers this causes cumulative + // slowdown (see #15874). + const msgAuthorId = data.message?.author?.id ?? data.author?.id; + if (params.botUserId && msgAuthorId === params.botUserId) { + return; + } + try { await debouncer.enqueue({ data, client }); } catch (err) { params.runtime.error?.(danger(`handler failed: ${String(err)}`)); } }; + + handler.deactivate = runState.deactivate; + + return handler; } diff --git a/src/discord/monitor/provider.ts b/src/discord/monitor/provider.ts index 9ed99778d..d69cc6d16 100644 --- a/src/discord/monitor/provider.ts +++ b/src/discord/monitor/provider.ts @@ -395,6 +395,7 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { } let lifecycleStarted = false; let releaseEarlyGatewayErrorGuard = () => {}; + let deactivateMessageHandler: (() => void) | undefined; let autoPresenceController: ReturnType | null = null; try { const commands: BaseCommand[] = commandSpecs.map((spec) => @@ -596,6 +597,8 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { accountId: account.accountId, token, runtime, + setStatus: opts.setStatus, + abortSignal: opts.abortSignal, botUserId, guildHistories, historyLimit, @@ -610,6 +613,7 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { threadBindings, discordRestFetch, }); + deactivateMessageHandler = messageHandler.deactivate; const trackInboundEvent = opts.setStatus ? () => { const at = Date.now(); @@ -679,6 +683,7 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { releaseEarlyGatewayErrorGuard, }); } finally { + deactivateMessageHandler?.(); autoPresenceController?.stop(); opts.setStatus?.({ connected: false }); releaseEarlyGatewayErrorGuard(); diff --git a/src/discord/monitor/status.ts b/src/discord/monitor/status.ts index 403fc7eee..8f7100e19 100644 --- a/src/discord/monitor/status.ts +++ b/src/discord/monitor/status.ts @@ -13,6 +13,9 @@ export type DiscordMonitorStatusPatch = { | null; lastInboundAt?: number | null; lastError?: string | null; + busy?: boolean; + activeRuns?: number; + lastRunActivityAt?: number | null; }; export type DiscordMonitorStatusSink = (patch: DiscordMonitorStatusPatch) => void; diff --git a/src/gateway/channel-health-monitor.test.ts b/src/gateway/channel-health-monitor.test.ts index 2fc9ea229..3657dcb2c 100644 --- a/src/gateway/channel-health-monitor.test.ts +++ b/src/gateway/channel-health-monitor.test.ts @@ -229,6 +229,63 @@ describe("channel-health-monitor", () => { monitor.stop(); }); + it("skips restart when channel is busy with active runs", async () => { + const now = Date.now(); + const manager = createSnapshotManager({ + discord: { + default: { + running: true, + connected: false, + enabled: true, + configured: true, + lastStartAt: now - 300_000, + activeRuns: 2, + busy: true, + lastRunActivityAt: now - 30_000, + }, + }, + }); + await expectNoRestart(manager); + }); + + it("restarts busy channels when run activity is stale", async () => { + const now = Date.now(); + const manager = createSnapshotManager({ + discord: { + default: { + running: true, + connected: false, + enabled: true, + configured: true, + lastStartAt: now - 300_000, + activeRuns: 1, + busy: true, + lastRunActivityAt: now - 26 * 60_000, + }, + }, + }); + await expectRestartedChannel(manager, "discord"); + }); + + it("restarts disconnected channels when busy flags are inherited from a prior lifecycle", async () => { + const now = Date.now(); + const manager = createSnapshotManager({ + discord: { + default: { + running: true, + connected: false, + enabled: true, + configured: true, + lastStartAt: now - 300_000, + activeRuns: 1, + busy: true, + lastRunActivityAt: now - 301_000, + }, + }, + }); + await expectRestartedChannel(manager, "discord"); + }); + it("skips recently-started channels while they are still connecting", async () => { const now = Date.now(); const manager = createSnapshotManager({ diff --git a/src/gateway/channel-health-policy.test.ts b/src/gateway/channel-health-policy.test.ts index 2567283da..71b8f7ce8 100644 --- a/src/gateway/channel-health-policy.test.ts +++ b/src/gateway/channel-health-policy.test.ts @@ -36,6 +36,68 @@ describe("evaluateChannelHealth", () => { expect(evaluation).toEqual({ healthy: true, reason: "startup-connect-grace" }); }); + it("treats active runs as busy even when disconnected", () => { + const now = 100_000; + const evaluation = evaluateChannelHealth( + { + running: true, + connected: false, + enabled: true, + configured: true, + activeRuns: 1, + lastRunActivityAt: now - 30_000, + }, + { + now, + channelConnectGraceMs: 10_000, + staleEventThresholdMs: 30_000, + }, + ); + expect(evaluation).toEqual({ healthy: true, reason: "busy" }); + }); + + it("flags stale busy channels as stuck when run activity is too old", () => { + const now = 100_000; + const evaluation = evaluateChannelHealth( + { + running: true, + connected: false, + enabled: true, + configured: true, + activeRuns: 1, + lastRunActivityAt: now - 26 * 60_000, + }, + { + now, + channelConnectGraceMs: 10_000, + staleEventThresholdMs: 30_000, + }, + ); + expect(evaluation).toEqual({ healthy: false, reason: "stuck" }); + }); + + it("ignores inherited busy flags until current lifecycle reports run activity", () => { + const now = 100_000; + const evaluation = evaluateChannelHealth( + { + running: true, + connected: false, + enabled: true, + configured: true, + lastStartAt: now - 30_000, + busy: true, + activeRuns: 1, + lastRunActivityAt: now - 31_000, + }, + { + now, + channelConnectGraceMs: 10_000, + staleEventThresholdMs: 30_000, + }, + ); + expect(evaluation).toEqual({ healthy: false, reason: "disconnected" }); + }); + it("flags stale sockets when no events arrive beyond threshold", () => { const evaluation = evaluateChannelHealth( { diff --git a/src/gateway/channel-health-policy.ts b/src/gateway/channel-health-policy.ts index 6e563a590..31938a904 100644 --- a/src/gateway/channel-health-policy.ts +++ b/src/gateway/channel-health-policy.ts @@ -3,6 +3,9 @@ export type ChannelHealthSnapshot = { connected?: boolean; enabled?: boolean; configured?: boolean; + busy?: boolean; + activeRuns?: number; + lastRunActivityAt?: number | null; lastEventAt?: number | null; lastStartAt?: number | null; reconnectAttempts?: number; @@ -12,6 +15,8 @@ export type ChannelHealthEvaluationReason = | "healthy" | "unmanaged" | "not-running" + | "busy" + | "stuck" | "startup-connect-grace" | "disconnected" | "stale-socket"; @@ -33,6 +38,8 @@ function isManagedAccount(snapshot: ChannelHealthSnapshot): boolean { return snapshot.enabled !== false && snapshot.configured !== false; } +const BUSY_ACTIVITY_STALE_THRESHOLD_MS = 25 * 60_000; + export function evaluateChannelHealth( snapshot: ChannelHealthSnapshot, policy: ChannelHealthPolicy, @@ -43,6 +50,39 @@ export function evaluateChannelHealth( if (!snapshot.running) { return { healthy: false, reason: "not-running" }; } + const activeRuns = + typeof snapshot.activeRuns === "number" && Number.isFinite(snapshot.activeRuns) + ? Math.max(0, Math.trunc(snapshot.activeRuns)) + : 0; + const isBusy = snapshot.busy === true || activeRuns > 0; + const lastStartAt = + typeof snapshot.lastStartAt === "number" && Number.isFinite(snapshot.lastStartAt) + ? snapshot.lastStartAt + : null; + const lastRunActivityAt = + typeof snapshot.lastRunActivityAt === "number" && Number.isFinite(snapshot.lastRunActivityAt) + ? snapshot.lastRunActivityAt + : null; + const busyStateInitializedForLifecycle = + lastStartAt == null || (lastRunActivityAt != null && lastRunActivityAt >= lastStartAt); + + // Runtime snapshots are patch-merged, so a restarted lifecycle can temporarily + // inherit stale busy fields from the previous instance. Ignore busy short-circuit + // until run activity is known to belong to the current lifecycle. + if (isBusy) { + if (!busyStateInitializedForLifecycle) { + // Fall through to normal startup/disconnect checks below. + } else { + const runActivityAge = + lastRunActivityAt == null + ? Number.POSITIVE_INFINITY + : Math.max(0, policy.now - lastRunActivityAt); + if (runActivityAge < BUSY_ACTIVITY_STALE_THRESHOLD_MS) { + return { healthy: true, reason: "busy" }; + } + return { healthy: false, reason: "stuck" }; + } + } if (snapshot.lastStartAt != null) { const upDuration = policy.now - snapshot.lastStartAt; if (upDuration < policy.channelConnectGraceMs) { diff --git a/src/gateway/protocol/schema/channels.ts b/src/gateway/protocol/schema/channels.ts index 51f5194cc..dc85ba12a 100644 --- a/src/gateway/protocol/schema/channels.ts +++ b/src/gateway/protocol/schema/channels.ts @@ -95,6 +95,9 @@ export const ChannelAccountSnapshotSchema = Type.Object( lastStopAt: Type.Optional(Type.Integer({ minimum: 0 })), lastInboundAt: Type.Optional(Type.Integer({ minimum: 0 })), lastOutboundAt: Type.Optional(Type.Integer({ minimum: 0 })), + busy: Type.Optional(Type.Boolean()), + activeRuns: Type.Optional(Type.Integer({ minimum: 0 })), + lastRunActivityAt: Type.Optional(Type.Integer({ minimum: 0 })), lastProbeAt: Type.Optional(Type.Integer({ minimum: 0 })), mode: Type.Optional(Type.String()), dmPolicy: Type.Optional(Type.String()),