2026-01-14 01:08:15 +00:00
|
|
|
import { defaultRuntime } from "../../../runtime.js";
|
2026-01-17 06:01:37 +00:00
|
|
|
import {
|
|
|
|
|
buildCollectPrompt,
|
2026-02-22 17:54:19 +00:00
|
|
|
beginQueueDrain,
|
2026-02-18 17:54:56 +00:00
|
|
|
clearQueueSummaryState,
|
2026-02-22 17:54:19 +00:00
|
|
|
drainCollectQueueStep,
|
2026-02-19 06:43:17 +00:00
|
|
|
drainNextQueueItem,
|
2026-01-17 06:01:37 +00:00
|
|
|
hasCrossChannelItems,
|
2026-02-18 17:54:56 +00:00
|
|
|
previewQueueSummaryPrompt,
|
2026-01-17 06:01:37 +00:00
|
|
|
waitForQueueDebounce,
|
|
|
|
|
} from "../../../utils/queue-helpers.js";
|
2026-01-14 01:08:15 +00:00
|
|
|
import { isRoutableChannel } from "../route-reply.js";
|
|
|
|
|
import { FOLLOWUP_QUEUES } from "./state.js";
|
2026-02-18 01:34:35 +00:00
|
|
|
import type { FollowupRun } from "./types.js";
|
2026-01-14 01:08:15 +00:00
|
|
|
|
2026-02-24 23:28:26 +00:00
|
|
|
type OriginRoutingMetadata = Pick<
|
|
|
|
|
FollowupRun,
|
|
|
|
|
"originatingChannel" | "originatingTo" | "originatingAccountId" | "originatingThreadId"
|
|
|
|
|
>;
|
|
|
|
|
|
|
|
|
|
function resolveOriginRoutingMetadata(items: FollowupRun[]): OriginRoutingMetadata {
|
|
|
|
|
return {
|
|
|
|
|
originatingChannel: items.find((item) => item.originatingChannel)?.originatingChannel,
|
|
|
|
|
originatingTo: items.find((item) => item.originatingTo)?.originatingTo,
|
|
|
|
|
originatingAccountId: items.find((item) => item.originatingAccountId)?.originatingAccountId,
|
|
|
|
|
// Support both number (Telegram topic) and string (Slack thread_ts) thread IDs.
|
|
|
|
|
originatingThreadId: items.find(
|
|
|
|
|
(item) => item.originatingThreadId != null && item.originatingThreadId !== "",
|
|
|
|
|
)?.originatingThreadId,
|
|
|
|
|
};
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
function resolveCrossChannelKey(item: FollowupRun): { cross?: true; key?: string } {
|
|
|
|
|
const { originatingChannel: channel, originatingTo: to, originatingAccountId: accountId } = item;
|
|
|
|
|
const threadId = item.originatingThreadId;
|
|
|
|
|
if (!channel && !to && !accountId && (threadId == null || threadId === "")) {
|
|
|
|
|
return {};
|
|
|
|
|
}
|
|
|
|
|
if (!isRoutableChannel(channel) || !to) {
|
|
|
|
|
return { cross: true };
|
|
|
|
|
}
|
|
|
|
|
// Support both number (Telegram topic IDs) and string (Slack thread_ts) thread IDs.
|
|
|
|
|
const threadKey = threadId != null && threadId !== "" ? String(threadId) : "";
|
|
|
|
|
return {
|
|
|
|
|
key: [channel, to, accountId || "", threadKey].join("|"),
|
|
|
|
|
};
|
|
|
|
|
}
|
|
|
|
|
|
2026-01-14 01:08:15 +00:00
|
|
|
export function scheduleFollowupDrain(
|
|
|
|
|
key: string,
|
|
|
|
|
runFollowup: (run: FollowupRun) => Promise<void>,
|
|
|
|
|
): void {
|
2026-02-22 17:54:19 +00:00
|
|
|
const queue = beginQueueDrain(FOLLOWUP_QUEUES, key);
|
|
|
|
|
if (!queue) {
|
2026-01-31 16:19:20 +09:00
|
|
|
return;
|
|
|
|
|
}
|
2026-01-14 01:08:15 +00:00
|
|
|
void (async () => {
|
|
|
|
|
try {
|
2026-02-22 17:54:19 +00:00
|
|
|
const collectState = { forceIndividualCollect: false };
|
2026-01-14 01:08:15 +00:00
|
|
|
while (queue.items.length > 0 || queue.droppedCount > 0) {
|
|
|
|
|
await waitForQueueDebounce(queue);
|
|
|
|
|
if (queue.mode === "collect") {
|
|
|
|
|
// Once the batch is mixed, never collect again within this drain.
|
|
|
|
|
// Prevents “collect after shift” collapsing different targets.
|
|
|
|
|
//
|
2026-02-22 13:26:31 -05:00
|
|
|
// Debug: `pnpm test src/auto-reply/reply/reply-flow.test.ts`
|
2026-01-14 01:08:15 +00:00
|
|
|
// Check if messages span multiple channels.
|
|
|
|
|
// If so, process individually to preserve per-message routing.
|
2026-02-24 23:28:26 +00:00
|
|
|
const isCrossChannel = hasCrossChannelItems(queue.items, resolveCrossChannelKey);
|
2026-01-14 01:08:15 +00:00
|
|
|
|
2026-02-22 17:54:19 +00:00
|
|
|
const collectDrainResult = await drainCollectQueueStep({
|
|
|
|
|
collectState,
|
2026-02-19 07:00:44 +00:00
|
|
|
isCrossChannel,
|
|
|
|
|
items: queue.items,
|
|
|
|
|
run: runFollowup,
|
|
|
|
|
});
|
|
|
|
|
if (collectDrainResult === "empty") {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
if (collectDrainResult === "drained") {
|
2026-01-14 01:08:15 +00:00
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
2026-02-14 20:23:23 -08:00
|
|
|
const items = queue.items.slice();
|
2026-02-18 17:54:56 +00:00
|
|
|
const summary = previewQueueSummaryPrompt({ state: queue, noun: "message" });
|
2026-01-14 01:08:15 +00:00
|
|
|
const run = items.at(-1)?.run ?? queue.lastRun;
|
2026-01-31 16:19:20 +09:00
|
|
|
if (!run) {
|
|
|
|
|
break;
|
|
|
|
|
}
|
2026-01-14 01:08:15 +00:00
|
|
|
|
2026-02-24 23:28:26 +00:00
|
|
|
const routing = resolveOriginRoutingMetadata(items);
|
2026-01-14 01:08:15 +00:00
|
|
|
|
2026-01-17 06:01:37 +00:00
|
|
|
const prompt = buildCollectPrompt({
|
|
|
|
|
title: "[Queued messages while agent was busy]",
|
|
|
|
|
items,
|
|
|
|
|
summary,
|
|
|
|
|
renderItem: (item, idx) => `---\nQueued #${idx + 1}\n${item.prompt}`.trim(),
|
|
|
|
|
});
|
2026-01-14 01:08:15 +00:00
|
|
|
await runFollowup({
|
|
|
|
|
prompt,
|
|
|
|
|
run,
|
|
|
|
|
enqueuedAt: Date.now(),
|
2026-02-24 23:28:26 +00:00
|
|
|
...routing,
|
2026-01-14 01:08:15 +00:00
|
|
|
});
|
2026-02-14 20:23:23 -08:00
|
|
|
queue.items.splice(0, items.length);
|
|
|
|
|
if (summary) {
|
|
|
|
|
clearQueueSummaryState(queue);
|
|
|
|
|
}
|
2026-01-14 01:08:15 +00:00
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
2026-02-18 17:54:56 +00:00
|
|
|
const summaryPrompt = previewQueueSummaryPrompt({ state: queue, noun: "message" });
|
2026-01-14 01:08:15 +00:00
|
|
|
if (summaryPrompt) {
|
|
|
|
|
const run = queue.lastRun;
|
2026-01-31 16:19:20 +09:00
|
|
|
if (!run) {
|
|
|
|
|
break;
|
|
|
|
|
}
|
2026-02-19 06:43:17 +00:00
|
|
|
if (
|
2026-02-24 23:13:51 +00:00
|
|
|
!(await drainNextQueueItem(queue.items, async (item) => {
|
2026-02-19 06:43:17 +00:00
|
|
|
await runFollowup({
|
|
|
|
|
prompt: summaryPrompt,
|
|
|
|
|
run,
|
|
|
|
|
enqueuedAt: Date.now(),
|
2026-02-24 23:13:51 +00:00
|
|
|
originatingChannel: item.originatingChannel,
|
|
|
|
|
originatingTo: item.originatingTo,
|
|
|
|
|
originatingAccountId: item.originatingAccountId,
|
|
|
|
|
originatingThreadId: item.originatingThreadId,
|
2026-02-19 06:43:17 +00:00
|
|
|
});
|
|
|
|
|
}))
|
|
|
|
|
) {
|
2026-02-14 20:23:23 -08:00
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
clearQueueSummaryState(queue);
|
2026-01-14 01:08:15 +00:00
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
2026-02-19 06:43:17 +00:00
|
|
|
if (!(await drainNextQueueItem(queue.items, runFollowup))) {
|
2026-01-31 16:19:20 +09:00
|
|
|
break;
|
|
|
|
|
}
|
2026-01-14 01:08:15 +00:00
|
|
|
}
|
|
|
|
|
} catch (err) {
|
2026-02-14 20:23:23 -08:00
|
|
|
queue.lastEnqueuedAt = Date.now();
|
2026-01-14 14:31:43 +00:00
|
|
|
defaultRuntime.error?.(`followup queue drain failed for ${key}: ${String(err)}`);
|
2026-01-14 01:08:15 +00:00
|
|
|
} finally {
|
|
|
|
|
queue.draining = false;
|
|
|
|
|
if (queue.items.length === 0 && queue.droppedCount === 0) {
|
|
|
|
|
FOLLOWUP_QUEUES.delete(key);
|
|
|
|
|
} else {
|
|
|
|
|
scheduleFollowupDrain(key, runFollowup);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
})();
|
|
|
|
|
}
|