fix(queue): land #33168 from @rylena
Landed from contributor PR #33168 by @rylena. Co-authored-by: Rylen Anil <rylen.anil@gmail.com>
This commit is contained in:
@@ -1,8 +1,28 @@
|
||||
import { createDedupeCache } from "../../../infra/dedupe.js";
|
||||
import { applyQueueDropPolicy, shouldSkipQueueItem } from "../../../utils/queue-helpers.js";
|
||||
import { kickFollowupDrainIfIdle } from "./drain.js";
|
||||
import { getExistingFollowupQueue, getFollowupQueue } from "./state.js";
|
||||
import type { FollowupRun, QueueDedupeMode, QueueSettings } from "./types.js";
|
||||
|
||||
const RECENT_QUEUE_MESSAGE_IDS = createDedupeCache({
|
||||
ttlMs: 5 * 60 * 1000,
|
||||
maxSize: 10_000,
|
||||
});
|
||||
|
||||
function buildRecentMessageIdKey(run: FollowupRun, queueKey: string): string | undefined {
|
||||
const messageId = run.messageId?.trim();
|
||||
if (!messageId) {
|
||||
return undefined;
|
||||
}
|
||||
const route = [
|
||||
run.originatingChannel ?? "",
|
||||
run.originatingTo ?? "",
|
||||
run.originatingAccountId ?? "",
|
||||
run.originatingThreadId == null ? "" : String(run.originatingThreadId),
|
||||
].join("|");
|
||||
return `${queueKey}|${route}|${messageId}`;
|
||||
}
|
||||
|
||||
function isRunAlreadyQueued(
|
||||
run: FollowupRun,
|
||||
items: FollowupRun[],
|
||||
@@ -31,6 +51,11 @@ export function enqueueFollowupRun(
|
||||
dedupeMode: QueueDedupeMode = "message-id",
|
||||
): boolean {
|
||||
const queue = getFollowupQueue(key, settings);
|
||||
const recentMessageIdKey = dedupeMode !== "none" ? buildRecentMessageIdKey(run, key) : undefined;
|
||||
if (recentMessageIdKey && RECENT_QUEUE_MESSAGE_IDS.peek(recentMessageIdKey)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const dedupe =
|
||||
dedupeMode === "none"
|
||||
? undefined
|
||||
@@ -54,6 +79,9 @@ export function enqueueFollowupRun(
|
||||
}
|
||||
|
||||
queue.items.push(run);
|
||||
if (recentMessageIdKey) {
|
||||
RECENT_QUEUE_MESSAGE_IDS.check(recentMessageIdKey);
|
||||
}
|
||||
// If drain finished and deleted the queue before this item arrived, a new queue
|
||||
// object was created (draining: false) but nobody scheduled a drain for it.
|
||||
// Use the cached callback to restart the drain now.
|
||||
@@ -70,3 +98,7 @@ export function getFollowupQueueDepth(key: string): number {
|
||||
}
|
||||
return queue.items.length;
|
||||
}
|
||||
|
||||
export function resetRecentQueuedMessageIdDedupe(): void {
|
||||
RECENT_QUEUE_MESSAGE_IDS.clear();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user