diff --git a/src/agents/subagent-announce-queue.ts b/src/agents/subagent-announce-queue.ts index eca237c66..864f2cbe7 100644 --- a/src/agents/subagent-announce-queue.ts +++ b/src/agents/subagent-announce-queue.ts @@ -6,10 +6,12 @@ import { normalizeDeliveryContext, } from "../utils/delivery-context.js"; import { + applyQueueRuntimeSettings, applyQueueDropPolicy, buildCollectPrompt, - buildQueueSummaryPrompt, + clearQueueSummaryState, hasCrossChannelItems, + previewQueueSummaryPrompt, waitForQueueDebounce, } from "../utils/queue-helpers.js"; @@ -47,22 +49,6 @@ type AnnounceQueueState = { const ANNOUNCE_QUEUES = new Map(); -function previewQueueSummaryPrompt(queue: AnnounceQueueState): string | undefined { - return buildQueueSummaryPrompt({ - state: { - dropPolicy: queue.dropPolicy, - droppedCount: queue.droppedCount, - summaryLines: [...queue.summaryLines], - }, - noun: "announce", - }); -} - -function clearQueueSummaryState(queue: AnnounceQueueState) { - queue.droppedCount = 0; - queue.summaryLines = []; -} - export function resetAnnounceQueuesForTests() { // Test isolation: other suites may leave a draining queue behind in the worker. // Clearing the map alone isn't enough because drain loops capture `queue` by reference. @@ -82,16 +68,10 @@ function getAnnounceQueue( ) { const existing = ANNOUNCE_QUEUES.get(key); if (existing) { - existing.mode = settings.mode; - existing.debounceMs = - typeof settings.debounceMs === "number" - ? Math.max(0, settings.debounceMs) - : existing.debounceMs; - existing.cap = - typeof settings.cap === "number" && settings.cap > 0 - ? Math.floor(settings.cap) - : existing.cap; - existing.dropPolicy = settings.dropPolicy ?? existing.dropPolicy; + applyQueueRuntimeSettings({ + target: existing, + settings, + }); existing.send = send; return existing; } @@ -107,6 +87,10 @@ function getAnnounceQueue( summaryLines: [], send, }; + applyQueueRuntimeSettings({ + target: created, + settings, + }); ANNOUNCE_QUEUES.set(key, created); return created; } @@ -152,7 +136,7 @@ function scheduleAnnounceDrain(key: string) { continue; } const items = queue.items.slice(); - const summary = previewQueueSummaryPrompt(queue); + const summary = previewQueueSummaryPrompt({ state: queue, noun: "announce" }); const prompt = buildCollectPrompt({ title: "[Queued announce messages while agent was busy]", items, @@ -171,7 +155,7 @@ function scheduleAnnounceDrain(key: string) { continue; } - const summaryPrompt = previewQueueSummaryPrompt(queue); + const summaryPrompt = previewQueueSummaryPrompt({ state: queue, noun: "announce" }); if (summaryPrompt) { const next = queue.items[0]; if (!next) { diff --git a/src/auto-reply/reply/queue/drain.ts b/src/auto-reply/reply/queue/drain.ts index ac2927fc0..3d739b3dc 100644 --- a/src/auto-reply/reply/queue/drain.ts +++ b/src/auto-reply/reply/queue/drain.ts @@ -1,34 +1,15 @@ import { defaultRuntime } from "../../../runtime.js"; import { buildCollectPrompt, - buildQueueSummaryPrompt, + clearQueueSummaryState, hasCrossChannelItems, + previewQueueSummaryPrompt, waitForQueueDebounce, } from "../../../utils/queue-helpers.js"; import { isRoutableChannel } from "../route-reply.js"; import { FOLLOWUP_QUEUES } from "./state.js"; import type { FollowupRun } from "./types.js"; -function previewQueueSummaryPrompt(queue: { - dropPolicy: "summarize" | "old" | "new"; - droppedCount: number; - summaryLines: string[]; -}): string | undefined { - return buildQueueSummaryPrompt({ - state: { - dropPolicy: queue.dropPolicy, - droppedCount: queue.droppedCount, - summaryLines: [...queue.summaryLines], - }, - noun: "message", - }); -} - -function clearQueueSummaryState(queue: { droppedCount: number; summaryLines: string[] }): void { - queue.droppedCount = 0; - queue.summaryLines = []; -} - export function scheduleFollowupDrain( key: string, runFollowup: (run: FollowupRun) => Promise, @@ -89,7 +70,7 @@ export function scheduleFollowupDrain( } const items = queue.items.slice(); - const summary = previewQueueSummaryPrompt(queue); + const summary = previewQueueSummaryPrompt({ state: queue, noun: "message" }); const run = items.at(-1)?.run ?? queue.lastRun; if (!run) { break; @@ -127,7 +108,7 @@ export function scheduleFollowupDrain( continue; } - const summaryPrompt = previewQueueSummaryPrompt(queue); + const summaryPrompt = previewQueueSummaryPrompt({ state: queue, noun: "message" }); if (summaryPrompt) { const run = queue.lastRun; if (!run) { diff --git a/src/auto-reply/reply/queue/state.ts b/src/auto-reply/reply/queue/state.ts index 816926622..6f135d98a 100644 --- a/src/auto-reply/reply/queue/state.ts +++ b/src/auto-reply/reply/queue/state.ts @@ -1,3 +1,4 @@ +import { applyQueueRuntimeSettings } from "../../../utils/queue-helpers.js"; import type { FollowupRun, QueueDropPolicy, QueueMode, QueueSettings } from "./types.js"; export type FollowupQueueState = { @@ -22,16 +23,10 @@ export const FOLLOWUP_QUEUES = new Map(); export function getFollowupQueue(key: string, settings: QueueSettings): FollowupQueueState { const existing = FOLLOWUP_QUEUES.get(key); if (existing) { - existing.mode = settings.mode; - existing.debounceMs = - typeof settings.debounceMs === "number" - ? Math.max(0, settings.debounceMs) - : existing.debounceMs; - existing.cap = - typeof settings.cap === "number" && settings.cap > 0 - ? Math.floor(settings.cap) - : existing.cap; - existing.dropPolicy = settings.dropPolicy ?? existing.dropPolicy; + applyQueueRuntimeSettings({ + target: existing, + settings, + }); return existing; } @@ -52,6 +47,10 @@ export function getFollowupQueue(key: string, settings: QueueSettings): Followup droppedCount: 0, summaryLines: [], }; + applyQueueRuntimeSettings({ + target: created, + settings, + }); FOLLOWUP_QUEUES.set(key, created); return created; } diff --git a/src/utils/queue-helpers.test.ts b/src/utils/queue-helpers.test.ts new file mode 100644 index 000000000..7df1a0d49 --- /dev/null +++ b/src/utils/queue-helpers.test.ts @@ -0,0 +1,113 @@ +import { describe, expect, it } from "vitest"; +import { + applyQueueRuntimeSettings, + buildQueueSummaryPrompt, + clearQueueSummaryState, + previewQueueSummaryPrompt, +} from "./queue-helpers.js"; + +describe("applyQueueRuntimeSettings", () => { + it("updates runtime queue settings with normalization", () => { + const target = { + mode: "followup" as const, + debounceMs: 1000, + cap: 20, + dropPolicy: "summarize" as const, + }; + + applyQueueRuntimeSettings({ + target, + settings: { + mode: "collect", + debounceMs: -12, + cap: 9.8, + dropPolicy: "new", + }, + }); + + expect(target).toEqual({ + mode: "collect", + debounceMs: 0, + cap: 9, + dropPolicy: "new", + }); + }); + + it("keeps existing values when optional settings are missing/invalid", () => { + const target = { + mode: "followup" as const, + debounceMs: 1000, + cap: 20, + dropPolicy: "summarize" as const, + }; + + applyQueueRuntimeSettings({ + target, + settings: { + mode: "queue", + cap: 0, + }, + }); + + expect(target).toEqual({ + mode: "queue", + debounceMs: 1000, + cap: 20, + dropPolicy: "summarize", + }); + }); +}); + +describe("queue summary helpers", () => { + it("previewQueueSummaryPrompt does not mutate state", () => { + const state = { + dropPolicy: "summarize" as const, + droppedCount: 2, + summaryLines: ["first", "second"], + }; + + const prompt = previewQueueSummaryPrompt({ + state, + noun: "message", + }); + + expect(prompt).toContain("[Queue overflow] Dropped 2 messages due to cap."); + expect(prompt).toContain("first"); + expect(state).toEqual({ + dropPolicy: "summarize", + droppedCount: 2, + summaryLines: ["first", "second"], + }); + }); + + it("buildQueueSummaryPrompt clears state after rendering", () => { + const state = { + dropPolicy: "summarize" as const, + droppedCount: 1, + summaryLines: ["line"], + }; + + const prompt = buildQueueSummaryPrompt({ + state, + noun: "announce", + }); + + expect(prompt).toContain("[Queue overflow] Dropped 1 announce due to cap."); + expect(state).toEqual({ + dropPolicy: "summarize", + droppedCount: 0, + summaryLines: [], + }); + }); + + it("clearQueueSummaryState resets summary counters", () => { + const state = { + dropPolicy: "summarize" as const, + droppedCount: 5, + summaryLines: ["a", "b"], + }; + clearQueueSummaryState(state); + expect(state.droppedCount).toBe(0); + expect(state.summaryLines).toEqual([]); + }); +}); diff --git a/src/utils/queue-helpers.ts b/src/utils/queue-helpers.ts index 990b52bb5..1a5d310c0 100644 --- a/src/utils/queue-helpers.ts +++ b/src/utils/queue-helpers.ts @@ -11,6 +11,53 @@ export type QueueState = QueueSummaryState & { cap: number; }; +export function clearQueueSummaryState(state: QueueSummaryState): void { + state.droppedCount = 0; + state.summaryLines = []; +} + +export function previewQueueSummaryPrompt(params: { + state: QueueSummaryState; + noun: string; + title?: string; +}): string | undefined { + return buildQueueSummaryPrompt({ + state: { + dropPolicy: params.state.dropPolicy, + droppedCount: params.state.droppedCount, + summaryLines: [...params.state.summaryLines], + }, + noun: params.noun, + title: params.title, + }); +} + +export function applyQueueRuntimeSettings(params: { + target: { + mode: TMode; + debounceMs: number; + cap: number; + dropPolicy: QueueDropPolicy; + }; + settings: { + mode: TMode; + debounceMs?: number; + cap?: number; + dropPolicy?: QueueDropPolicy; + }; +}): void { + params.target.mode = params.settings.mode; + params.target.debounceMs = + typeof params.settings.debounceMs === "number" + ? Math.max(0, params.settings.debounceMs) + : params.target.debounceMs; + params.target.cap = + typeof params.settings.cap === "number" && params.settings.cap > 0 + ? Math.floor(params.settings.cap) + : params.target.cap; + params.target.dropPolicy = params.settings.dropPolicy ?? params.target.dropPolicy; +} + export function elideQueueText(text: string, limit = 140): string { if (text.length <= limit) { return text; @@ -101,8 +148,7 @@ export function buildQueueSummaryPrompt(params: { lines.push(`- ${line}`); } } - params.state.droppedCount = 0; - params.state.summaryLines = []; + clearQueueSummaryState(params.state); return lines.join("\n"); }