From e6f67d5f3150e1febd687f1535d2d646c5fda900 Mon Sep 17 00:00:00 2001 From: Michael Verrilli Date: Sat, 14 Feb 2026 14:24:20 -0500 Subject: [PATCH] fix(agent): prevent session lock deadlock on timeout during compaction (#9855) Merged via /review-pr -> /prepare-pr -> /merge-pr. Prepared head SHA: 64a28900f183941a496a6fd5baaa9efcfb38f0f8 Co-authored-by: mverrilli <816450+mverrilli@users.noreply.github.com> Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com> Reviewed-by: @gumadeiras --- CHANGELOG.md | 1 + ...pi-agent.auth-profile-rotation.e2e.test.ts | 49 ++++++++ .../run.overflow-compaction.e2e.test.ts | 1 + src/agents/pi-embedded-runner/run.ts | 13 ++- src/agents/pi-embedded-runner/run/attempt.ts | 105 +++++++++++++++--- .../run/compaction-timeout.e2e.test.ts | 61 ++++++++++ .../run/compaction-timeout.ts | 54 +++++++++ src/agents/pi-embedded-runner/run/types.ts | 2 + .../pi-embedded-subscribe.handlers.types.ts | 2 + ...ction-retries-before-resolving.e2e.test.ts | 32 ++++++ src/agents/pi-embedded-subscribe.ts | 65 ++++++++++- 11 files changed, 365 insertions(+), 20 deletions(-) create mode 100644 src/agents/pi-embedded-runner/run/compaction-timeout.e2e.test.ts create mode 100644 src/agents/pi-embedded-runner/run/compaction-timeout.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index c31a0dd46..6c7ec070c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Agents: classify external timeout aborts during compaction the same as internal timeouts, preventing unnecessary auth-profile rotation and preserving compaction-timeout snapshot fallback behavior. - Sessions/Agents: harden transcript path resolution for mismatched agent context by preserving explicit store roots and adding safe absolute-path fallback to the correct agent sessions directory. (#16288) Thanks @robbyczgw-cla. - BlueBubbles: include sender identity in group chat envelopes and pass clean message text to the agent prompt, aligning with iMessage/Signal formatting. (#16210) Thanks @zerone0x. - WhatsApp: honor per-account `dmPolicy` overrides (account-level settings now take precedence over channel defaults for inbound DMs). (#10082) Thanks @mcaxtr. diff --git a/src/agents/pi-embedded-runner.run-embedded-pi-agent.auth-profile-rotation.e2e.test.ts b/src/agents/pi-embedded-runner.run-embedded-pi-agent.auth-profile-rotation.e2e.test.ts index 51cfc40ac..24d6b1c38 100644 --- a/src/agents/pi-embedded-runner.run-embedded-pi-agent.auth-profile-rotation.e2e.test.ts +++ b/src/agents/pi-embedded-runner.run-embedded-pi-agent.auth-profile-rotation.e2e.test.ts @@ -47,6 +47,7 @@ const buildAssistant = (overrides: Partial): AssistantMessage const makeAttempt = (overrides: Partial): EmbeddedRunAttemptResult => ({ aborted: false, timedOut: false, + timedOutDuringCompaction: false, promptError: null, sessionIdUsed: "session:test", systemPromptReport: undefined, @@ -174,6 +175,54 @@ describe("runEmbeddedPiAgent auth profile rotation", () => { } }); + it("does not rotate for compaction timeouts", async () => { + const agentDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-agent-")); + const workspaceDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-workspace-")); + try { + await writeAuthStore(agentDir); + + runEmbeddedAttemptMock.mockResolvedValueOnce( + makeAttempt({ + aborted: true, + timedOut: true, + timedOutDuringCompaction: true, + assistantTexts: ["partial"], + lastAssistant: buildAssistant({ + stopReason: "stop", + content: [{ type: "text", text: "partial" }], + }), + }), + ); + + const result = await runEmbeddedPiAgent({ + sessionId: "session:test", + sessionKey: "agent:test:compaction-timeout", + sessionFile: path.join(workspaceDir, "session.jsonl"), + workspaceDir, + agentDir, + config: makeConfig(), + prompt: "hello", + provider: "openai", + model: "mock-1", + authProfileId: "openai:p1", + authProfileIdSource: "auto", + timeoutMs: 5_000, + runId: "run:compaction-timeout", + }); + + expect(runEmbeddedAttemptMock).toHaveBeenCalledTimes(1); + expect(result.meta.aborted).toBe(true); + + const stored = JSON.parse( + await fs.readFile(path.join(agentDir, "auth-profiles.json"), "utf-8"), + ) as { usageStats?: Record }; + expect(stored.usageStats?.["openai:p2"]?.lastUsed).toBe(2); + } finally { + await fs.rm(agentDir, { recursive: true, force: true }); + await fs.rm(workspaceDir, { recursive: true, force: true }); + } + }); + it("does not rotate for user-pinned profiles", async () => { const agentDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-agent-")); const workspaceDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-workspace-")); diff --git a/src/agents/pi-embedded-runner/run.overflow-compaction.e2e.test.ts b/src/agents/pi-embedded-runner/run.overflow-compaction.e2e.test.ts index 059ceb2c4..170b0c65c 100644 --- a/src/agents/pi-embedded-runner/run.overflow-compaction.e2e.test.ts +++ b/src/agents/pi-embedded-runner/run.overflow-compaction.e2e.test.ts @@ -206,6 +206,7 @@ function makeAttemptResult( return { aborted: false, timedOut: false, + timedOutDuringCompaction: false, promptError: null, sessionIdUsed: "test-session", assistantTexts: ["Hello!"], diff --git a/src/agents/pi-embedded-runner/run.ts b/src/agents/pi-embedded-runner/run.ts index 96dc8db03..74a07e37f 100644 --- a/src/agents/pi-embedded-runner/run.ts +++ b/src/agents/pi-embedded-runner/run.ts @@ -480,7 +480,14 @@ export async function runEmbeddedPiAgent( enforceFinalTag: params.enforceFinalTag, }); - const { aborted, promptError, timedOut, sessionIdUsed, lastAssistant } = attempt; + const { + aborted, + promptError, + timedOut, + timedOutDuringCompaction, + sessionIdUsed, + lastAssistant, + } = attempt; const lastAssistantUsage = normalizeUsage(lastAssistant?.usage as UsageLike); const attemptUsage = attempt.attemptUsage ?? lastAssistantUsage; mergeUsageIntoAccumulator(usageAccumulator, attemptUsage); @@ -801,7 +808,9 @@ export async function runEmbeddedPiAgent( } // Treat timeout as potential rate limit (Antigravity hangs on rate limit) - const shouldRotate = (!aborted && failoverFailure) || timedOut; + // But exclude post-prompt compaction timeouts (model succeeded; no profile issue) + const shouldRotate = + (!aborted && failoverFailure) || (timedOut && !timedOutDuringCompaction); if (shouldRotate) { if (lastProfileId) { diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index 98c67404e..7c58a6880 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -91,6 +91,10 @@ import { import { splitSdkTools } from "../tool-split.js"; import { describeUnknownError, mapThinkingLevel } from "../utils.js"; import { flushPendingToolResultsAfterIdle } from "../wait-for-idle-before-flush.js"; +import { + selectCompactionTimeoutSnapshot, + shouldFlagCompactionTimeout, +} from "./compaction-timeout.js"; import { detectAndLoadPromptImages } from "./images.js"; export function injectHistoryImagesIntoMessages( @@ -665,6 +669,7 @@ export async function runEmbeddedAttempt( let aborted = Boolean(params.abortSignal?.aborted); let timedOut = false; + let timedOutDuringCompaction = false; const getAbortReason = (signal: AbortSignal): unknown => "reason" in signal ? (signal as { reason?: unknown }).reason : undefined; const makeTimeoutAbortReason = (): Error => { @@ -769,6 +774,15 @@ export async function runEmbeddedAttempt( `embedded run timeout: runId=${params.runId} sessionId=${params.sessionId} timeoutMs=${params.timeoutMs}`, ); } + if ( + shouldFlagCompactionTimeout({ + isTimeout: true, + isCompactionPendingOrRetrying: subscription.isCompacting(), + isCompactionInFlight: activeSession.isCompacting, + }) + ) { + timedOutDuringCompaction = true; + } abortRun(true); if (!abortWarnTimer) { abortWarnTimer = setTimeout(() => { @@ -791,6 +805,15 @@ export async function runEmbeddedAttempt( const onAbort = () => { const reason = params.abortSignal ? getAbortReason(params.abortSignal) : undefined; const timeout = reason ? isTimeoutError(reason) : false; + if ( + shouldFlagCompactionTimeout({ + isTimeout: timeout, + isCompactionPendingOrRetrying: subscription.isCompacting(), + isCompactionInFlight: activeSession.isCompacting, + }) + ) { + timedOutDuringCompaction = true; + } abortRun(timeout, reason); }; if (params.abortSignal) { @@ -939,13 +962,28 @@ export async function runEmbeddedAttempt( ); } + // Capture snapshot before compaction wait so we have complete messages if timeout occurs + // Check compaction state before and after to avoid race condition where compaction starts during capture + // Use session state (not subscription) for snapshot decisions - need instantaneous compaction status + const wasCompactingBefore = activeSession.isCompacting; + const snapshot = activeSession.messages.slice(); + const wasCompactingAfter = activeSession.isCompacting; + // Only trust snapshot if compaction wasn't running before or after capture + const preCompactionSnapshot = wasCompactingBefore || wasCompactingAfter ? null : snapshot; + const preCompactionSessionId = activeSession.sessionId; + try { - await waitForCompactionRetry(); + await abortable(waitForCompactionRetry()); } catch (err) { if (isRunnerAbortError(err)) { if (!promptError) { promptError = err; } + if (!isProbeSession) { + log.debug( + `compaction wait aborted: runId=${params.runId} sessionId=${params.sessionId}`, + ); + } } else { throw err; } @@ -956,27 +994,51 @@ export async function runEmbeddedAttempt( // inserted between compaction and the next prompt — breaking the // prepareCompaction() guard that checks the last entry type, leading to // double-compaction. See: https://github.com/openclaw/openclaw/issues/9282 - const shouldTrackCacheTtl = - params.config?.agents?.defaults?.contextPruning?.mode === "cache-ttl" && - isCacheTtlEligibleProvider(params.provider, params.modelId); - if (shouldTrackCacheTtl) { - appendCacheTtlTimestamp(sessionManager, { - timestamp: Date.now(), - provider: params.provider, - modelId: params.modelId, - }); + // Skip when timed out during compaction — session state may be inconsistent. + if (!timedOutDuringCompaction) { + const shouldTrackCacheTtl = + params.config?.agents?.defaults?.contextPruning?.mode === "cache-ttl" && + isCacheTtlEligibleProvider(params.provider, params.modelId); + if (shouldTrackCacheTtl) { + appendCacheTtlTimestamp(sessionManager, { + timestamp: Date.now(), + provider: params.provider, + modelId: params.modelId, + }); + } } - messagesSnapshot = activeSession.messages.slice(); - sessionIdUsed = activeSession.sessionId; + // If timeout occurred during compaction, use pre-compaction snapshot when available + // (compaction restructures messages but does not add user/assistant turns). + const snapshotSelection = selectCompactionTimeoutSnapshot({ + timedOutDuringCompaction, + preCompactionSnapshot, + preCompactionSessionId, + currentSnapshot: activeSession.messages.slice(), + currentSessionId: activeSession.sessionId, + }); + if (timedOutDuringCompaction) { + if (!isProbeSession) { + log.warn( + `using ${snapshotSelection.source} snapshot: timed out during compaction runId=${params.runId} sessionId=${params.sessionId}`, + ); + } + } + messagesSnapshot = snapshotSelection.messagesSnapshot; + sessionIdUsed = snapshotSelection.sessionIdUsed; cacheTrace?.recordStage("session:after", { messages: messagesSnapshot, - note: promptError ? "prompt error" : undefined, + note: timedOutDuringCompaction + ? "compaction timeout" + : promptError + ? "prompt error" + : undefined, }); anthropicPayloadLogger?.recordUsage(messagesSnapshot, promptError); // Run agent_end hooks to allow plugins to analyze the conversation // This is fire-and-forget, so we don't await + // Run even on compaction timeout so plugins can log/cleanup if (hookRunner?.hasHooks("agent_end")) { hookRunner .runAgentEnd( @@ -1003,7 +1065,21 @@ export async function runEmbeddedAttempt( if (abortWarnTimer) { clearTimeout(abortWarnTimer); } - unsubscribe(); + if (!isProbeSession && (aborted || timedOut) && !timedOutDuringCompaction) { + log.debug( + `run cleanup: runId=${params.runId} sessionId=${params.sessionId} aborted=${aborted} timedOut=${timedOut}`, + ); + } + try { + unsubscribe(); + } catch (err) { + // unsubscribe() should never throw; if it does, it indicates a serious bug. + // Log at error level to ensure visibility, but don't rethrow in finally block + // as it would mask any exception from the try block above. + log.error( + `CRITICAL: unsubscribe failed, possible resource leak: runId=${params.runId} ${String(err)}`, + ); + } clearActiveEmbeddedRun(params.sessionId, queueHandle); params.abortSignal?.removeEventListener?.("abort", onAbort); } @@ -1023,6 +1099,7 @@ export async function runEmbeddedAttempt( return { aborted, timedOut, + timedOutDuringCompaction, promptError, sessionIdUsed, systemPromptReport, diff --git a/src/agents/pi-embedded-runner/run/compaction-timeout.e2e.test.ts b/src/agents/pi-embedded-runner/run/compaction-timeout.e2e.test.ts new file mode 100644 index 000000000..ce4351e39 --- /dev/null +++ b/src/agents/pi-embedded-runner/run/compaction-timeout.e2e.test.ts @@ -0,0 +1,61 @@ +import { describe, expect, it } from "vitest"; +import { + selectCompactionTimeoutSnapshot, + shouldFlagCompactionTimeout, +} from "./compaction-timeout.js"; + +describe("compaction-timeout helpers", () => { + it("flags compaction timeout consistently for internal and external timeout sources", () => { + const internalTimer = shouldFlagCompactionTimeout({ + isTimeout: true, + isCompactionPendingOrRetrying: true, + isCompactionInFlight: false, + }); + const externalAbort = shouldFlagCompactionTimeout({ + isTimeout: true, + isCompactionPendingOrRetrying: true, + isCompactionInFlight: false, + }); + expect(internalTimer).toBe(true); + expect(externalAbort).toBe(true); + }); + + it("does not flag when timeout is false", () => { + expect( + shouldFlagCompactionTimeout({ + isTimeout: false, + isCompactionPendingOrRetrying: true, + isCompactionInFlight: true, + }), + ).toBe(false); + }); + + it("uses pre-compaction snapshot when compaction timeout occurs", () => { + const pre = [{ role: "assistant", content: "pre" }] as const; + const current = [{ role: "assistant", content: "current" }] as const; + const selected = selectCompactionTimeoutSnapshot({ + timedOutDuringCompaction: true, + preCompactionSnapshot: [...pre], + preCompactionSessionId: "session-pre", + currentSnapshot: [...current], + currentSessionId: "session-current", + }); + expect(selected.source).toBe("pre-compaction"); + expect(selected.sessionIdUsed).toBe("session-pre"); + expect(selected.messagesSnapshot).toEqual(pre); + }); + + it("falls back to current snapshot when pre-compaction snapshot is unavailable", () => { + const current = [{ role: "assistant", content: "current" }] as const; + const selected = selectCompactionTimeoutSnapshot({ + timedOutDuringCompaction: true, + preCompactionSnapshot: null, + preCompactionSessionId: "session-pre", + currentSnapshot: [...current], + currentSessionId: "session-current", + }); + expect(selected.source).toBe("current"); + expect(selected.sessionIdUsed).toBe("session-current"); + expect(selected.messagesSnapshot).toEqual(current); + }); +}); diff --git a/src/agents/pi-embedded-runner/run/compaction-timeout.ts b/src/agents/pi-embedded-runner/run/compaction-timeout.ts new file mode 100644 index 000000000..45a945257 --- /dev/null +++ b/src/agents/pi-embedded-runner/run/compaction-timeout.ts @@ -0,0 +1,54 @@ +import type { AgentMessage } from "@mariozechner/pi-agent-core"; + +export type CompactionTimeoutSignal = { + isTimeout: boolean; + isCompactionPendingOrRetrying: boolean; + isCompactionInFlight: boolean; +}; + +export function shouldFlagCompactionTimeout(signal: CompactionTimeoutSignal): boolean { + if (!signal.isTimeout) { + return false; + } + return signal.isCompactionPendingOrRetrying || signal.isCompactionInFlight; +} + +export type SnapshotSelectionParams = { + timedOutDuringCompaction: boolean; + preCompactionSnapshot: AgentMessage[] | null; + preCompactionSessionId: string; + currentSnapshot: AgentMessage[]; + currentSessionId: string; +}; + +export type SnapshotSelection = { + messagesSnapshot: AgentMessage[]; + sessionIdUsed: string; + source: "pre-compaction" | "current"; +}; + +export function selectCompactionTimeoutSnapshot( + params: SnapshotSelectionParams, +): SnapshotSelection { + if (!params.timedOutDuringCompaction) { + return { + messagesSnapshot: params.currentSnapshot, + sessionIdUsed: params.currentSessionId, + source: "current", + }; + } + + if (params.preCompactionSnapshot) { + return { + messagesSnapshot: params.preCompactionSnapshot, + sessionIdUsed: params.preCompactionSessionId, + source: "pre-compaction", + }; + } + + return { + messagesSnapshot: params.currentSnapshot, + sessionIdUsed: params.currentSessionId, + source: "current", + }; +} diff --git a/src/agents/pi-embedded-runner/run/types.ts b/src/agents/pi-embedded-runner/run/types.ts index a8f95a688..dc05d0a12 100644 --- a/src/agents/pi-embedded-runner/run/types.ts +++ b/src/agents/pi-embedded-runner/run/types.ts @@ -24,6 +24,8 @@ export type EmbeddedRunAttemptParams = EmbeddedRunAttemptBase & { export type EmbeddedRunAttemptResult = { aborted: boolean; timedOut: boolean; + /** True if the timeout occurred while compaction was in progress or pending. */ + timedOutDuringCompaction: boolean; promptError: unknown; sessionIdUsed: string; systemPromptReport?: SessionSystemPromptReport; diff --git a/src/agents/pi-embedded-subscribe.handlers.types.ts b/src/agents/pi-embedded-subscribe.handlers.types.ts index fcb6a3e75..4dd091ab5 100644 --- a/src/agents/pi-embedded-subscribe.handlers.types.ts +++ b/src/agents/pi-embedded-subscribe.handlers.types.ts @@ -55,7 +55,9 @@ export type EmbeddedPiSubscribeState = { compactionInFlight: boolean; pendingCompactionRetry: number; compactionRetryResolve?: () => void; + compactionRetryReject?: (reason?: unknown) => void; compactionRetryPromise: Promise | null; + unsubscribed: boolean; messagingToolSentTexts: string[]; messagingToolSentTextsNormalized: string[]; diff --git a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.waits-multiple-compaction-retries-before-resolving.e2e.test.ts b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.waits-multiple-compaction-retries-before-resolving.e2e.test.ts index c9ca1eeca..2f9610825 100644 --- a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.waits-multiple-compaction-retries-before-resolving.e2e.test.ts +++ b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.waits-multiple-compaction-retries-before-resolving.e2e.test.ts @@ -97,6 +97,38 @@ describe("subscribeEmbeddedPiSession", () => { { phase: "end", willRetry: false }, ]); }); + + it("rejects compaction wait with AbortError when unsubscribed", async () => { + const listeners: SessionEventHandler[] = []; + const abortCompaction = vi.fn(); + const session = { + isCompacting: true, + abortCompaction, + subscribe: (listener: SessionEventHandler) => { + listeners.push(listener); + return () => {}; + }, + } as unknown as Parameters[0]["session"]; + + const subscription = subscribeEmbeddedPiSession({ + session, + runId: "run-abort-on-unsubscribe", + }); + + for (const listener of listeners) { + listener({ type: "auto_compaction_start" }); + } + + const waitPromise = subscription.waitForCompactionRetry(); + subscription.unsubscribe(); + + await expect(waitPromise).rejects.toMatchObject({ name: "AbortError" }); + await expect(subscription.waitForCompactionRetry()).rejects.toMatchObject({ + name: "AbortError", + }); + expect(abortCompaction).toHaveBeenCalledTimes(1); + }); + it("emits tool summaries at tool start when verbose is on", async () => { let handler: ((evt: unknown) => void) | undefined; const session: StubSession = { diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts index 146066d21..8f8b1cadd 100644 --- a/src/agents/pi-embedded-subscribe.ts +++ b/src/agents/pi-embedded-subscribe.ts @@ -65,7 +65,9 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar compactionInFlight: false, pendingCompactionRetry: 0, compactionRetryResolve: undefined, + compactionRetryReject: undefined, compactionRetryPromise: null, + unsubscribed: false, messagingToolSentTexts: [], messagingToolSentTextsNormalized: [], messagingToolSentTargets: [], @@ -203,8 +205,15 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar const ensureCompactionPromise = () => { if (!state.compactionRetryPromise) { - state.compactionRetryPromise = new Promise((resolve) => { + // Create a single promise that resolves when ALL pending compactions complete + // (tracked by pendingCompactionRetry counter, decremented in resolveCompactionRetry) + state.compactionRetryPromise = new Promise((resolve, reject) => { state.compactionRetryResolve = resolve; + state.compactionRetryReject = reject; + }); + // Prevent unhandled rejection if rejected after all consumers have resolved + state.compactionRetryPromise.catch((err) => { + log.debug(`compaction promise rejected (no waiter): ${String(err)}`); }); } }; @@ -222,6 +231,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar if (state.pendingCompactionRetry === 0 && !state.compactionInFlight) { state.compactionRetryResolve?.(); state.compactionRetryResolve = undefined; + state.compactionRetryReject = undefined; state.compactionRetryPromise = null; } }; @@ -230,6 +240,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar if (state.pendingCompactionRetry === 0 && !state.compactionInFlight) { state.compactionRetryResolve?.(); state.compactionRetryResolve = undefined; + state.compactionRetryReject = undefined; state.compactionRetryPromise = null; } }; @@ -608,13 +619,47 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar getCompactionCount: () => compactionCount, }; - const unsubscribe = params.session.subscribe(createEmbeddedPiSessionEventHandler(ctx)); + const sessionUnsubscribe = params.session.subscribe(createEmbeddedPiSessionEventHandler(ctx)); + + const unsubscribe = () => { + if (state.unsubscribed) { + return; + } + // Mark as unsubscribed FIRST to prevent waitForCompactionRetry from creating + // new un-resolvable promises during teardown. + state.unsubscribed = true; + // Reject pending compaction wait to unblock awaiting code. + // Don't resolve, as that would incorrectly signal "compaction complete" when it's still in-flight. + if (state.compactionRetryPromise) { + log.debug(`unsubscribe: rejecting compaction wait runId=${params.runId}`); + const reject = state.compactionRetryReject; + state.compactionRetryResolve = undefined; + state.compactionRetryReject = undefined; + state.compactionRetryPromise = null; + // Reject with AbortError so it's caught by isAbortError() check in cleanup paths + const abortErr = new Error("Unsubscribed during compaction"); + abortErr.name = "AbortError"; + reject?.(abortErr); + } + // Cancel any in-flight compaction to prevent resource leaks when unsubscribing. + // Only abort if compaction is actually running to avoid unnecessary work. + if (params.session.isCompacting) { + log.debug(`unsubscribe: aborting in-flight compaction runId=${params.runId}`); + try { + params.session.abortCompaction(); + } catch (err) { + log.warn(`unsubscribe: compaction abort failed runId=${params.runId} err=${String(err)}`); + } + } + sessionUnsubscribe(); + }; return { assistantTexts, toolMetas, unsubscribe, isCompacting: () => state.compactionInFlight || state.pendingCompactionRetry > 0, + isCompactionInFlight: () => state.compactionInFlight, getMessagingToolSentTexts: () => messagingToolSentTexts.slice(), getMessagingToolSentTargets: () => messagingToolSentTargets.slice(), // Returns true if any messaging tool successfully sent a message. @@ -625,15 +670,27 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar getUsageTotals, getCompactionCount: () => compactionCount, waitForCompactionRetry: () => { + // Reject after unsubscribe so callers treat it as cancellation, not success + if (state.unsubscribed) { + const err = new Error("Unsubscribed during compaction wait"); + err.name = "AbortError"; + return Promise.reject(err); + } if (state.compactionInFlight || state.pendingCompactionRetry > 0) { ensureCompactionPromise(); return state.compactionRetryPromise ?? Promise.resolve(); } - return new Promise((resolve) => { + return new Promise((resolve, reject) => { queueMicrotask(() => { + if (state.unsubscribed) { + const err = new Error("Unsubscribed during compaction wait"); + err.name = "AbortError"; + reject(err); + return; + } if (state.compactionInFlight || state.pendingCompactionRetry > 0) { ensureCompactionPromise(); - void (state.compactionRetryPromise ?? Promise.resolve()).then(resolve); + void (state.compactionRetryPromise ?? Promise.resolve()).then(resolve, reject); } else { resolve(); }