Files
openclaw/src/auto-reply/reply/queue.ts

625 lines
18 KiB
TypeScript
Raw Normal View History

2026-01-04 05:47:21 +01:00
import type { SkillSnapshot } from "../../agents/skills.js";
import { parseDurationMs } from "../../cli/parse-duration.js";
2026-01-04 14:32:47 +00:00
import type { ClawdbotConfig } from "../../config/config.js";
2026-01-04 05:47:21 +01:00
import type { SessionEntry } from "../../config/sessions.js";
import { defaultRuntime } from "../../runtime.js";
import type { OriginatingChannelType } from "../templating.js";
import type {
ElevatedLevel,
ReasoningLevel,
ThinkLevel,
VerboseLevel,
} from "./directives.js";
import { isRoutableChannel } from "./route-reply.js";
2026-01-04 05:47:21 +01:00
/**
 * Canonical queue handling modes.
 *
 * Within this file only `"collect"` is special-cased by the drain loop (queued
 * runs are merged into one prompt); every other mode is drained one run at a
 * time here. Note that the input strings `queue`/`queued` are normalized to
 * `"steer"` by `normalizeQueueMode`, so `"queue"` is never produced by that
 * normalizer.
 * NOTE(review): the remaining per-mode semantics are presumably implemented by
 * callers outside this file — confirm there.
 */
export type QueueMode =
| "steer"
| "followup"
| "collect"
| "steer-backlog"
| "interrupt"
| "queue";
/**
 * What to do when a queue is at its cap and another run is enqueued:
 * - `"old"`: evict the oldest queued runs to make room.
 * - `"new"`: reject the incoming run.
 * - `"summarize"`: evict the oldest runs but keep a one-line summary of each,
 *   which is later surfaced as a "[Queue overflow]" prompt.
 */
export type QueueDropPolicy = "old" | "new" | "summarize";
/** Resolved queue configuration applied to a followup queue on every enqueue. */
export type QueueSettings = {
/** Queue handling mode. */
mode: QueueMode;
/** Quiet period (ms since the last enqueue) the drain loop waits for before processing. */
debounceMs?: number;
/** Maximum number of queued runs before the drop policy kicks in. */
cap?: number;
/** Overflow behaviour once `cap` is reached. */
dropPolicy?: QueueDropPolicy;
};
/**
 * A single queued followup: the prompt to run, where its reply should be
 * routed, and the run parameters to execute it with.
 */
export type FollowupRun = {
  /** Prompt text handed to the drain callback. */
  prompt: string;
  /** Provider message ID, when available (for deduplication). */
  messageId?: string;
  /** Optional short description used instead of `prompt` in overflow summaries. */
  summaryLine?: string;
  /** Timestamp recorded when the run was queued (`Date.now()` for collected batches). */
  enqueuedAt: number;
  /**
   * Originating channel for reply routing.
   * When set, replies should be routed back to this provider
   * instead of using the session's lastChannel.
   */
  originatingChannel?: OriginatingChannelType;
  /**
   * Originating destination for reply routing.
   * The chat/channel/user ID where the reply should be sent.
   */
  originatingTo?: string;
  /** Provider account id (multi-account). */
  originatingAccountId?: string;
  /** Telegram forum topic thread id. */
  originatingThreadId?: number;
  /**
   * Run parameters captured at enqueue time. The queue treats this object as
   * opaque: it is stored as `lastRun` and passed through to the drain callback.
   */
  run: {
    agentId: string;
    agentDir: string;
    sessionId: string;
    sessionKey?: string;
    messageProvider?: string;
    agentAccountId?: string;
    sessionFile: string;
    workspaceDir: string;
    config: ClawdbotConfig;
    skillsSnapshot?: SkillSnapshot;
    provider: string;
    model: string;
    authProfileId?: string;
    thinkLevel?: ThinkLevel;
    verboseLevel?: VerboseLevel;
    reasoningLevel?: ReasoningLevel;
    elevatedLevel?: ElevatedLevel;
    bashElevated?: {
      enabled: boolean;
      allowed: boolean;
      defaultLevel: ElevatedLevel;
    };
    timeoutMs: number;
    blockReplyBreak: "text_end" | "message_end";
    ownerNumbers?: string[];
    extraSystemPrompt?: string;
    enforceFinalTag?: boolean;
  };
};
/** Mutable per-key queue state stored in the module-level `FOLLOWUP_QUEUES` map. */
type FollowupQueueState = {
/** Runs waiting to be drained, oldest first (pushed at the end, taken from the front). */
items: FollowupRun[];
/** True while a drain loop started by `scheduleFollowupDrain` is active for this queue. */
draining: boolean;
/** `Date.now()` of the most recent enqueue attempt that passed deduplication; drives debouncing. */
lastEnqueuedAt: number;
/** Mode from the most recent settings applied to this queue. */
mode: QueueMode;
/** Debounce window in milliseconds. */
debounceMs: number;
/** Maximum number of queued runs. */
cap: number;
/** Overflow behaviour once `cap` is reached. */
dropPolicy: QueueDropPolicy;
/** Number of runs evicted under the "summarize" policy since the last summary prompt was built. */
droppedCount: number;
/** One-line summaries of evicted runs; cleared when a summary prompt is built. */
summaryLines: string[];
/** Run parameters of the most recent enqueue; used when a prompt must be sent without a queued item. */
lastRun?: FollowupRun["run"];
};
/** Debounce window (ms) used when settings do not supply one. */
const DEFAULT_QUEUE_DEBOUNCE_MS = 1000;
/** Queue cap used when settings do not supply one. */
const DEFAULT_QUEUE_CAP = 20;
/** Drop policy used when settings do not supply one. */
const DEFAULT_QUEUE_DROP: QueueDropPolicy = "summarize";
/** Live followup queues by caller-supplied key; entries are removed once a drain leaves them empty. */
const FOLLOWUP_QUEUES = new Map<string, FollowupQueueState>();
/**
 * Maps a raw mode string (from a directive, session entry, or config) onto a
 * canonical queue mode.
 *
 * Matching ignores surrounding whitespace and letter case. The legacy spellings
 * `queue`/`queued` resolve to `"steer"`.
 *
 * @param raw - Candidate mode text; may be missing or empty.
 * @returns The canonical mode, or `undefined` when the text is empty or unknown.
 */
function normalizeQueueMode(raw?: string): QueueMode | undefined {
  if (!raw) return undefined;
  switch (raw.trim().toLowerCase()) {
    case "queue":
    case "queued":
    case "steer":
    case "steering":
      return "steer";
    case "interrupt":
    case "interrupts":
    case "abort":
      return "interrupt";
    case "followup":
    case "follow-ups":
    case "followups":
      return "followup";
    case "collect":
    case "coalesce":
      return "collect";
    case "steer+backlog":
    case "steer-backlog":
    case "steer_backlog":
      return "steer-backlog";
    default:
      return undefined;
  }
}
/**
 * Maps a raw drop-policy string onto a canonical {@link QueueDropPolicy}.
 *
 * Matching ignores surrounding whitespace and letter case.
 *
 * @param raw - Candidate policy text; may be missing or empty.
 * @returns The canonical policy, or `undefined` when the text is empty or unknown.
 */
function normalizeQueueDropPolicy(raw?: string): QueueDropPolicy | undefined {
  if (!raw) return undefined;
  switch (raw.trim().toLowerCase()) {
    case "old":
    case "oldest":
      return "old";
    case "new":
    case "newest":
      return "new";
    case "summarize":
    case "summary":
      return "summarize";
    default:
      return undefined;
  }
}
/**
 * Parses a debounce option value into whole milliseconds.
 *
 * The text is handed to `parseDurationMs` with `ms` as the default unit. Any
 * parse failure, a zero/falsy result, or a negative result yields `undefined`.
 *
 * @param raw - Option text such as the part after `debounce:`.
 * @returns The rounded duration in milliseconds, or `undefined` when unusable.
 */
function parseQueueDebounce(raw?: string): number | undefined {
  if (!raw) return undefined;
  try {
    const ms = parseDurationMs(raw.trim(), { defaultUnit: "ms" });
    // Zero is treated the same as "no value", matching the directive's existing contract.
    const usable = Boolean(ms) && !(ms < 0);
    return usable ? Math.round(ms) : undefined;
  } catch {
    return undefined;
  }
}
/**
 * Parses a cap option value into a positive integer.
 *
 * @param raw - Option text such as the part after `cap:`.
 * @returns The value rounded down, or `undefined` when the text is empty, not a
 *   finite number, or rounds down to less than 1.
 */
function parseQueueCap(raw?: string): number | undefined {
  const floored = raw ? Math.floor(Number(raw)) : Number.NaN;
  return Number.isFinite(floored) && floored >= 1 ? floored : undefined;
}
/**
 * Parses the text that follows a `/queue` directive.
 *
 * Leading whitespace and one optional `:` are skipped, then whitespace-separated
 * tokens are read while they are recognized:
 * - `default` / `reset` / `clear` sets `queueReset` and stops parsing;
 * - `debounce:<v>`, `cap:<v>`, `drop:<v>` (or with `=`) record the raw value and
 *   its parsed form (which is `undefined` when the value is invalid);
 * - anything accepted by `normalizeQueueMode` becomes the mode.
 * The first unrecognized token ends parsing and is not consumed.
 *
 * @param raw - Text immediately after `/queue`.
 * @returns Parsed fields plus `consumed`, the number of leading characters of
 *   `raw` that belong to the directive (including whitespace after the last
 *   recognized token).
 */
function parseQueueDirectiveArgs(raw: string): {
  consumed: number;
  queueMode?: QueueMode;
  queueReset: boolean;
  rawMode?: string;
  debounceMs?: number;
  cap?: number;
  dropPolicy?: QueueDropPolicy;
  rawDebounce?: string;
  rawCap?: string;
  rawDrop?: string;
  hasOptions: boolean;
} {
  const whitespace = /\s/;
  const end = raw.length;
  let cursor = 0;
  const skipWhitespace = (): void => {
    while (cursor < end && whitespace.test(raw[cursor])) cursor += 1;
  };

  // Optional separator: "/queue: mode" is accepted as well as "/queue mode".
  skipWhitespace();
  if (raw[cursor] === ":") {
    cursor += 1;
    skipWhitespace();
  }

  const result: ReturnType<typeof parseQueueDirectiveArgs> = {
    consumed: cursor,
    queueMode: undefined,
    queueReset: false,
    rawMode: undefined,
    debounceMs: undefined,
    cap: undefined,
    dropPolicy: undefined,
    rawDebounce: undefined,
    rawCap: undefined,
    rawDrop: undefined,
    hasOptions: false,
  };

  while (cursor < end) {
    // Read one token, then swallow the whitespace after it so `cursor` always
    // rests on the start of the next token.
    const tokenStart = cursor;
    while (cursor < end && !whitespace.test(raw[cursor])) cursor += 1;
    if (cursor === tokenStart) break;
    const token = raw.slice(tokenStart, cursor);
    skipWhitespace();

    const keyword = token.trim().toLowerCase();
    if (keyword === "default" || keyword === "reset" || keyword === "clear") {
      result.queueReset = true;
      result.consumed = cursor;
      break;
    }

    const optionName = /^(debounce|cap|drop)[:=]/.exec(keyword)?.[1];
    if (optionName) {
      // Only the segment between the first and second separator is the value.
      const value = token.split(/[:=]/)[1] ?? "";
      if (optionName === "debounce") {
        result.rawDebounce = value;
        result.debounceMs = parseQueueDebounce(value);
      } else if (optionName === "cap") {
        result.rawCap = value;
        result.cap = parseQueueCap(value);
      } else {
        result.rawDrop = value;
        result.dropPolicy = normalizeQueueDropPolicy(value);
      }
      result.hasOptions = true;
      result.consumed = cursor;
      continue;
    }

    const mode = normalizeQueueMode(token);
    if (!mode) break; // Stop at first unrecognized token; it stays in the message.
    result.queueMode = mode;
    result.rawMode = token;
    result.consumed = cursor;
  }

  return result;
}
/**
 * Finds the first `/queue` directive in a message body, parses its arguments,
 * and returns the body with the directive removed.
 *
 * The directive is matched case-insensitively and must be preceded by the start
 * of the text or whitespace, and followed by the end of the text, whitespace,
 * or `:`.
 *
 * @param body - Incoming message text; may be missing or empty.
 * @returns `cleaned` (body without the directive, whitespace collapsed),
 *   `hasDirective`, and the fields produced by `parseQueueDirectiveArgs`.
 *   When no directive is present, `cleaned` is simply the trimmed body.
 */
export function extractQueueDirective(body?: string): {
  cleaned: string;
  queueMode?: QueueMode;
  queueReset: boolean;
  rawMode?: string;
  hasDirective: boolean;
  debounceMs?: number;
  cap?: number;
  dropPolicy?: QueueDropPolicy;
  rawDebounce?: string;
  rawCap?: string;
  rawDrop?: string;
  hasOptions: boolean;
} {
  if (!body) {
    return {
      cleaned: "",
      hasDirective: false,
      queueReset: false,
      hasOptions: false,
    };
  }
  const match = /(?:^|\s)\/queue(?=$|\s|:)/i.exec(body);
  if (!match) {
    return {
      cleaned: body.trim(),
      hasDirective: false,
      queueReset: false,
      hasOptions: false,
    };
  }
  // The match always ends with the directive token (the lookahead is
  // zero-width), so derive its position from the match end. Searching the match
  // for the lowercase literal would miss "/QUEUE" or "/Queue", which the
  // case-insensitive pattern accepts.
  const argsStart = match.index + match[0].length;
  const start = argsStart - "/queue".length;
  const parsed = parseQueueDirectiveArgs(body.slice(argsStart));
  const remainder = body.slice(argsStart + parsed.consumed);
  const cleaned = `${body.slice(0, start)} ${remainder}`
    .replace(/\s+/g, " ")
    .trim();
  return {
    cleaned,
    queueMode: parsed.queueMode,
    queueReset: parsed.queueReset,
    rawMode: parsed.rawMode,
    debounceMs: parsed.debounceMs,
    cap: parsed.cap,
    dropPolicy: parsed.dropPolicy,
    rawDebounce: parsed.rawDebounce,
    rawCap: parsed.rawCap,
    rawDrop: parsed.rawDrop,
    hasDirective: true,
    hasOptions: parsed.hasOptions,
  };
}
/**
 * Shortens text to at most `limit` characters, marking the cut with an ellipsis.
 *
 * @param text - Text to shorten.
 * @param limit - Maximum length of the result, ellipsis included (default 140).
 * @returns `text` unchanged when it already fits; otherwise the first
 *   `limit - 1` characters with trailing whitespace removed, followed by `…`.
 *   Returns an empty string when `limit` is less than 1.
 */
function elideText(text: string, limit = 140): string {
  if (text.length <= limit) return text;
  // No room for even the ellipsis.
  if (!(limit >= 1)) return "";
  // One character is reserved for the ellipsis so the result never exceeds `limit`.
  return `${text.slice(0, limit - 1).trimEnd()}…`;
}
/**
 * Produces the one-line description recorded for a run that was evicted from
 * the queue.
 *
 * Uses the run's `summaryLine` when it has non-blank content, otherwise its
 * prompt; runs of whitespace are collapsed to single spaces and the result is
 * passed through `elideText` with a 160-character limit.
 */
function buildQueueSummaryLine(run: FollowupRun): string {
  const preferred = run.summaryLine?.trim();
  const source = preferred ? preferred : run.prompt.trim();
  const singleLine = source.replace(/\s+/g, " ").trim();
  return elideText(singleLine, 160);
}
/**
 * Returns the queue registered under `key`, creating it when absent, and applies
 * the supplied settings to it.
 *
 * For an existing queue, `mode` is always overwritten; `debounceMs`, `cap` and
 * `dropPolicy` are overwritten only when the setting is present and valid. A new
 * queue falls back to the module defaults for anything missing or invalid.
 *
 * Validation:
 * - `debounceMs` must be a finite number; negatives are clamped to 0. A
 *   non-finite value is ignored because the debounce wait could never elapse.
 * - `cap` must be greater than 0; it is rounded down but never below 1, so a
 *   fractional cap such as 0.5 cannot turn into 0 and disable the cap check.
 *
 * @param key - Queue identifier chosen by the caller.
 * @param settings - Settings to apply.
 * @returns The (possibly newly created) queue state.
 */
function getFollowupQueue(
  key: string,
  settings: QueueSettings,
): FollowupQueueState {
  const debounceMs =
    typeof settings.debounceMs === "number" &&
    Number.isFinite(settings.debounceMs)
      ? Math.max(0, settings.debounceMs)
      : undefined;
  const cap =
    typeof settings.cap === "number" && settings.cap > 0
      ? Math.max(1, Math.floor(settings.cap))
      : undefined;

  const existing = FOLLOWUP_QUEUES.get(key);
  if (existing) {
    existing.mode = settings.mode;
    existing.debounceMs = debounceMs ?? existing.debounceMs;
    existing.cap = cap ?? existing.cap;
    existing.dropPolicy = settings.dropPolicy ?? existing.dropPolicy;
    return existing;
  }

  const created: FollowupQueueState = {
    items: [],
    draining: false,
    lastEnqueuedAt: 0,
    mode: settings.mode,
    debounceMs: debounceMs ?? DEFAULT_QUEUE_DEBOUNCE_MS,
    cap: cap ?? DEFAULT_QUEUE_CAP,
    dropPolicy: settings.dropPolicy ?? DEFAULT_QUEUE_DROP,
    droppedCount: 0,
    summaryLines: [],
  };
  FOLLOWUP_QUEUES.set(key, created);
  return created;
}
/**
 * Reports whether an equivalent run is already waiting in the queue.
 *
 * Two runs are equivalent when they carry the same (trimmed) provider message
 * id and identical routing fields (channel, destination, account, thread).
 * Runs without a message id are never treated as duplicates: comparing prompt
 * text instead would drop legitimately repeated messages sent in quick
 * succession.
 */
function isRunAlreadyQueued(
  run: FollowupRun,
  queue: FollowupQueueState,
): boolean {
  const wantedId = run.messageId?.trim();
  if (!wantedId) return false;

  for (const queued of queue.items) {
    if (queued.messageId?.trim() !== wantedId) continue;
    const sameRoute =
      queued.originatingChannel === run.originatingChannel &&
      queued.originatingTo === run.originatingTo &&
      queued.originatingAccountId === run.originatingAccountId &&
      queued.originatingThreadId === run.originatingThreadId;
    if (sameRoute) return true;
  }
  return false;
}
2026-01-04 05:47:21 +01:00
/**
 * Adds a run to the followup queue for `key`, applying `settings` to the queue
 * first and enforcing its cap.
 *
 * When the queue is full:
 * - `"new"`: the incoming run is rejected;
 * - `"old"`: the oldest runs are evicted to make room;
 * - `"summarize"`: as `"old"`, but each evicted run is counted and a one-line
 *   summary is kept (at most `cap` summaries are retained).
 *
 * The queue's `lastEnqueuedAt` and `lastRun` are refreshed for every run that
 * passes deduplication, including one that is then rejected by the `"new"`
 * policy.
 *
 * @param key - Queue identifier.
 * @param run - Run to enqueue.
 * @param settings - Settings applied to the queue before enqueueing.
 * @returns `true` when the run was queued; `false` when it was a duplicate or
 *   was rejected by the `"new"` drop policy.
 */
export function enqueueFollowupRun(
  key: string,
  run: FollowupRun,
  settings: QueueSettings,
): boolean {
  const queue = getFollowupQueue(key, settings);
  // Deduplicate: skip if the same message is already queued.
  if (isRunAlreadyQueued(run, queue)) {
    return false;
  }
  queue.lastEnqueuedAt = Date.now();
  queue.lastRun = run.run;

  const cap = queue.cap;
  if (cap > 0 && queue.items.length >= cap) {
    if (queue.dropPolicy === "new") {
      return false;
    }
    // Evict enough of the oldest runs that the incoming one fits under the cap.
    const dropCount = queue.items.length - cap + 1;
    const dropped = queue.items.splice(0, dropCount);
    if (queue.dropPolicy === "summarize") {
      for (const item of dropped) {
        queue.droppedCount += 1;
        queue.summaryLines.push(buildQueueSummaryLine(item));
      }
      // Keep only the most recent `cap` summaries.
      while (queue.summaryLines.length > cap) queue.summaryLines.shift();
    }
  }
  queue.items.push(run);
  return true;
}
/**
 * Resolves once the queue has been quiet for its debounce window.
 *
 * "Quiet" means at least `debounceMs` milliseconds have passed since
 * `lastEnqueuedAt`. The timestamp is re-read after every sleep, so an enqueue
 * that happens while waiting extends the wait. Returns immediately when the
 * window is zero or negative.
 */
async function waitForQueueDebounce(queue: FollowupQueueState): Promise<void> {
  const windowMs = Math.max(0, queue.debounceMs);
  if (windowMs <= 0) return;

  let elapsed = Date.now() - queue.lastEnqueuedAt;
  while (!(elapsed >= windowMs)) {
    const remaining = windowMs - elapsed;
    await new Promise((wake) => setTimeout(wake, remaining));
    elapsed = Date.now() - queue.lastEnqueuedAt;
  }
}
/**
 * Builds the "[Queue overflow]" notice for runs evicted under the "summarize"
 * policy, and clears the queue's overflow bookkeeping.
 *
 * @returns The notice text (a header line, then a "Summary:" bullet list when
 *   summaries exist), or `undefined` when the policy is not "summarize" or
 *   nothing was dropped. `droppedCount` and `summaryLines` are reset only when a
 *   notice is produced.
 */
function buildSummaryPrompt(queue: FollowupQueueState): string | undefined {
  const dropped = queue.droppedCount;
  if (queue.dropPolicy !== "summarize" || dropped <= 0) return undefined;

  const plural = dropped === 1 ? "" : "s";
  const header = `[Queue overflow] Dropped ${dropped} message${plural} due to cap.`;
  const bullets = queue.summaryLines.map((line) => `- ${line}`);
  const sections =
    bullets.length > 0 ? [header, "Summary:", ...bullets] : [header];

  queue.droppedCount = 0;
  queue.summaryLines = [];
  return sections.join("\n");
}
/**
 * Merges several queued runs into a single prompt.
 *
 * The result starts with a fixed header, followed by the overflow summary (when
 * given and non-empty) and then one numbered section per queued run, all
 * separated by blank lines.
 *
 * @param items - Queued runs, in the order they should be listed.
 * @param summary - Optional overflow notice to place before the runs.
 */
function buildCollectPrompt(items: FollowupRun[], summary?: string): string {
  const header = "[Queued messages while agent was busy]";
  const sections = items.map((item, index) =>
    `---\nQueued #${index + 1}\n${item.prompt}`.trim(),
  );
  const parts = summary
    ? [header, summary, ...sections]
    : [header, ...sections];
  return parts.join("\n\n");
}
/**
 * Decides whether queued runs must be processed one by one to keep their reply
 * routing intact.
 *
 * Each run is classified by its routing fields (channel, destination, account,
 * thread):
 * - a run with none of them set is "unrouted";
 * - a run with some routing but a non-routable channel or no destination forces
 *   an immediate `true`;
 * - otherwise the run contributes a `channel|to|account|thread` key.
 *
 * @returns `false` when no run contributed a key; otherwise `true` when routed
 *   and unrouted runs are mixed or more than one distinct key exists.
 */
function hasCrossProviderItems(items: FollowupRun[]): boolean {
  const destinations = new Set<string>();
  let sawUnrouted = false;

  for (const {
    originatingChannel,
    originatingTo,
    originatingAccountId,
    originatingThreadId,
  } of items) {
    const threadKey =
      typeof originatingThreadId === "number"
        ? String(originatingThreadId)
        : undefined;
    const carriesRouting =
      Boolean(originatingChannel || originatingTo || originatingAccountId) ||
      threadKey !== undefined;
    if (!carriesRouting) {
      sawUnrouted = true;
      continue;
    }
    // Partially routed runs cannot be merged safely with anything.
    if (!isRoutableChannel(originatingChannel) || !originatingTo) {
      return true;
    }
    destinations.add(
      `${originatingChannel}|${originatingTo}|${originatingAccountId || ""}|${threadKey ?? ""}`,
    );
  }

  if (destinations.size === 0) return false;
  return sawUnrouted || destinations.size > 1;
}
2026-01-04 05:47:21 +01:00
/**
 * Starts an asynchronous drain of the queue registered under `key`, unless the
 * queue does not exist or is already draining.
 *
 * The drain repeats the following until the queue has no items and no pending
 * overflow count, waiting for the debounce window before each step:
 * - In `"collect"` mode, runs that share one routing target are merged into a
 *   single prompt (with the overflow summary, if any) and executed using the
 *   newest run's parameters. As soon as the queued runs are found to span
 *   different targets, the rest of this drain processes runs individually.
 * - In any other mode, a pending overflow summary is sent first (using
 *   `lastRun`), then runs are executed one at a time in queue order.
 *
 * Errors thrown by `runFollowup` are logged and end the current pass. The queue
 * entry is removed once it is empty; otherwise another drain is scheduled.
 *
 * @param key - Queue identifier.
 * @param runFollowup - Callback that executes one run; awaited sequentially.
 */
export function scheduleFollowupDrain(
  key: string,
  runFollowup: (run: FollowupRun) => Promise<void>,
): void {
  const queue = FOLLOWUP_QUEUES.get(key);
  if (!queue || queue.draining) return;
  queue.draining = true;
  void (async () => {
    try {
      let forceIndividualCollect = false;
      while (queue.items.length > 0 || queue.droppedCount > 0) {
        await waitForQueueDebounce(queue);
        if (queue.mode === "collect") {
          // Once the batch is mixed, never collect again within this drain.
          // Prevents "collect after shift" collapsing different targets.
          //
          // Debug: `pnpm test src/auto-reply/reply/queue.collect-routing.test.ts`
          if (
            forceIndividualCollect ||
            hasCrossProviderItems(queue.items)
          ) {
            forceIndividualCollect = true;
            // Process one at a time to preserve per-message routing info.
            const next = queue.items.shift();
            if (!next) break;
            await runFollowup(next);
            continue;
          }

          // Same-target messages can be safely collected.
          const items = queue.items.splice(0, queue.items.length);
          const summary = buildSummaryPrompt(queue);
          const run = items.at(-1)?.run ?? queue.lastRun;
          if (!run) break;

          // Preserve routing from the collected items: take the first defined
          // value of each field.
          const originatingChannel = items.find(
            (i) => i.originatingChannel,
          )?.originatingChannel;
          const originatingTo = items.find(
            (i) => i.originatingTo,
          )?.originatingTo;
          const originatingAccountId = items.find(
            (i) => i.originatingAccountId,
          )?.originatingAccountId;
          const originatingThreadId = items.find(
            (i) => typeof i.originatingThreadId === "number",
          )?.originatingThreadId;

          const prompt = buildCollectPrompt(items, summary);
          await runFollowup({
            prompt,
            run,
            enqueuedAt: Date.now(),
            originatingChannel,
            originatingTo,
            originatingAccountId,
            originatingThreadId,
          });
          continue;
        }

        // Non-collect modes: report overflow first, then run items in order.
        const summaryPrompt = buildSummaryPrompt(queue);
        if (summaryPrompt) {
          const run = queue.lastRun;
          if (!run) break;
          await runFollowup({
            prompt: summaryPrompt,
            run,
            enqueuedAt: Date.now(),
          });
          continue;
        }
        const next = queue.items.shift();
        if (!next) break;
        await runFollowup(next);
      }
    } catch (err) {
      defaultRuntime.error?.(
        `followup queue drain failed for ${key}: ${String(err)}`,
      );
    } finally {
      queue.draining = false;
      if (queue.items.length === 0 && queue.droppedCount === 0) {
        FOLLOWUP_QUEUES.delete(key);
      } else {
        // Work arrived (or remains) after the loop ended; pick it up.
        scheduleFollowupDrain(key, runFollowup);
      }
    }
  })();
}
Move provider to a plugin-architecture (#661) * refactor: introduce provider plugin registry * refactor: move provider CLI to plugins * docs: add provider plugin implementation notes * refactor: shift provider runtime logic into plugins * refactor: add plugin defaults and summaries * docs: update provider plugin notes * feat(commands): add /commands slash list * Auto-reply: tidy help message * Auto-reply: fix status command lint * Tests: align google shared expectations * Auto-reply: tidy help message * Auto-reply: fix status command lint * refactor: move provider routing into plugins * test: align agent routing expectations * docs: update provider plugin notes * refactor: route replies via provider plugins * docs: note route-reply plugin hooks * refactor: extend provider plugin contract * refactor: derive provider status from plugins * refactor: unify gateway provider control * refactor: use plugin metadata in auto-reply * fix: parenthesize cron target selection * refactor: derive gateway methods from plugins * refactor: generalize provider logout * refactor: route provider logout through plugins * refactor: move WhatsApp web login methods into plugin * refactor: generalize provider log prefixes * refactor: centralize default chat provider * refactor: derive provider lists from registry * refactor: move provider reload noops into plugins * refactor: resolve web login provider via alias * refactor: derive CLI provider options from plugins * refactor: derive prompt provider list from plugins * style: apply biome lint fixes * fix: resolve provider routing edge cases * docs: update provider plugin refactor notes * fix(gateway): harden agent provider routing * refactor: move provider routing into plugins * refactor: move provider CLI to plugins * refactor: derive provider lists from registry * fix: restore slash command parsing * refactor: align provider ids for schema * refactor: unify outbound target resolution * fix: keep outbound labels stable * feat: add msteams 
to cron surfaces * fix: clean up lint build issues * refactor: localize chat provider alias normalization * refactor: drive gateway provider lists from plugins * docs: update provider plugin notes * style: format message-provider * fix: avoid provider registry init cycles * style: sort message-provider imports * fix: relax provider alias map typing * refactor: move provider routing into plugins * refactor: add plugin pairing/config adapters * refactor: route pairing and provider removal via plugins * refactor: align auto-reply provider typing * test: stabilize telegram media mocks * docs: update provider plugin refactor notes * refactor: pluginize outbound targets * refactor: pluginize provider selection * refactor: generalize text chunk limits * docs: update provider plugin notes * refactor: generalize group session/config * fix: normalize provider id for room detection * fix: avoid provider init in system prompt * style: formatting cleanup * refactor: normalize agent delivery targets * test: update outbound delivery labels * chore: fix lint regressions * refactor: extend provider plugin adapters * refactor: move elevated/block streaming defaults to plugins * refactor: defer outbound send deps to plugins * docs: note plugin-driven streaming/elevated defaults * refactor: centralize webchat provider constant * refactor: add provider setup adapters * refactor: delegate provider add config to plugins * docs: document plugin-driven provider add * refactor: add plugin state/binding metadata * refactor: build agent provider status from plugins * docs: note plugin-driven agent bindings * refactor: centralize internal provider constant usage * fix: normalize WhatsApp targets for groups and E.164 (#631) (thanks @imfing) * refactor: centralize default chat provider * refactor: centralize WhatsApp target normalization * refactor: move provider routing into plugins * refactor: normalize agent delivery targets * chore: fix lint regressions * fix: normalize WhatsApp targets for 
groups and E.164 (#631) (thanks @imfing) * feat: expand provider plugin adapters * refactor: route auto-reply via provider plugins * fix: align WhatsApp target normalization * fix: normalize WhatsApp targets for groups and E.164 (#631) (thanks @imfing) * refactor: centralize WhatsApp target normalization * feat: add /config chat config updates * docs: add /config get alias * feat(commands): add /commands slash list * refactor: centralize default chat provider * style: apply biome lint fixes * chore: fix lint regressions * fix: clean up whatsapp allowlist typing * style: format config command helpers * refactor: pluginize tool threading context * refactor: normalize session announce targets * docs: note new plugin threading and announce hooks * refactor: pluginize message actions * docs: update provider plugin actions notes * fix: align provider action adapters * refactor: centralize webchat checks * style: format message provider helpers * refactor: move provider onboarding into adapters * docs: note onboarding provider adapters * feat: add msteams onboarding adapter * style: organize onboarding imports * fix: normalize msteams allowFrom types * feat: add plugin text chunk limits * refactor: use plugin chunk limit fallbacks * feat: add provider mention stripping hooks * style: organize provider plugin type imports * refactor: generalize health snapshots * refactor: update macOS health snapshot handling * docs: refresh health snapshot notes * style: format health snapshot updates * refactor: drive security warnings via plugins * docs: note provider security adapter * style: format provider security adapters * refactor: centralize provider account defaults * refactor: type gateway client identity constants * chore: regen gateway protocol swift * fix: degrade health on failed provider probe * refactor: centralize pairing approve hint * docs: add plugin CLI command references * refactor: route auth and tool sends through plugins * docs: expand provider plugin hooks * 
refactor: document provider docking touchpoints * refactor: normalize internal provider defaults * refactor: streamline outbound delivery wiring * refactor: make provider onboarding plugin-owned * refactor: support provider-owned agent tools * refactor: move telegram draft chunking into telegram module * refactor: infer provider tool sends via extractToolSend * fix: repair plugin onboarding imports * refactor: de-dup outbound target normalization * style: tidy plugin and agent imports * refactor: data-drive provider selection line * fix: satisfy lint after provider plugin rebase * test: deflake gateway-cli coverage * style: format gateway-cli coverage test * refactor(provider-plugins): simplify provider ids * test(pairing-cli): avoid provider-specific ternary * style(macos): swiftformat HealthStore * refactor(sandbox): derive provider tool denylist * fix(sandbox): avoid plugin init in defaults * refactor(provider-plugins): centralize provider aliases * style(test): satisfy biome * refactor(protocol): v3 providers.status maps * refactor(ui): adapt to protocol v3 * refactor(macos): adapt to protocol v3 * test: update providers.status v3 fixtures * refactor(gateway): map provider runtime snapshot * test(gateway): update reload runtime snapshot * refactor(whatsapp): normalize heartbeat provider id * docs(refactor): update provider plugin notes * style: satisfy biome after rebase * fix: describe sandboxed elevated in prompt * feat(gateway): add agent image attachments + live probe * refactor: derive CLI provider options from plugins * fix(gateway): harden agent provider routing * fix(gateway): harden agent provider routing * refactor: align provider ids for schema * fix(protocol): keep agent provider string * fix(gateway): harden agent provider routing * fix(protocol): keep agent provider string * refactor: normalize agent delivery targets * refactor: support provider-owned agent tools * refactor(config): provider-keyed elevated allowFrom * style: satisfy biome * 
fix(gateway): appease provider narrowing * style: satisfy biome * refactor(reply): move group intro hints into plugin * fix(reply): avoid plugin registry init cycle * refactor(providers): add lightweight provider dock * refactor(gateway): use typed client id in connect * refactor(providers): document docks and avoid init cycles * refactor(providers): make media limit helper generic * fix(providers): break plugin registry import cycles * style: satisfy biome * refactor(status-all): build providers table from plugins * refactor(gateway): delegate web login to provider plugin * refactor(provider): drop web alias * refactor(provider): lazy-load monitors * style: satisfy lint/format * style: format status-all providers table * style: swiftformat gateway discovery model * test: make reload plan plugin-driven * fix: avoid token stringification in status-all * refactor: make provider IDs explicit in status * feat: warn on signal/imessage provider runtime errors * test: cover gateway provider runtime warnings in status * fix: add runtime kind to provider status issues * test: cover health degradation on probe failure * fix: keep routeReply lightweight * style: organize routeReply imports * refactor(web): extract auth-store helpers * refactor(whatsapp): lazy login imports * refactor(outbound): route replies via plugin outbound * docs: update provider plugin notes * style: format provider status issues * fix: make sandbox scope warning wrap-safe * refactor: load outbound adapters from provider plugins * docs: update provider plugin outbound notes * style(macos): fix swiftformat lint * docs: changelog for provider plugins * fix(macos): satisfy swiftformat * fix(macos): open settings via menu action * style: format after rebase * fix(macos): open Settings via menu action --------- Co-authored-by: LK <luke@kyohere.com> Co-authored-by: Luke K (pr-0f3t) <2609441+lc0rp@users.noreply.github.com> Co-authored-by: Xin <xin@imfing.com>
2026-01-11 11:45:25 +00:00
/**
 * Queue mode used when neither the inline directive, the session, nor the
 * config specifies one. Currently `"collect"` for every provider; the parameter
 * is accepted but unused.
 */
function defaultQueueModeForProvider(_provider?: string): QueueMode {
  return "collect";
}
/**
 * Resolves the effective queue settings for a message.
 *
 * Each setting is taken from the first source that defines it:
 * - mode: inline directive → session entry → per-provider config
 *   (`messages.queue.byProvider`) → global config → provider default;
 * - debounce / cap / drop policy: inline options → session entry → global
 *   config → module default.
 *
 * The resulting debounce is clamped to be non-negative and the cap is rounded
 * down and clamped to at least 1.
 *
 * @param params.cfg - Loaded configuration.
 * @param params.provider - Provider id; matched case-insensitively after trimming.
 * @param params.sessionEntry - Session-level overrides, if any.
 * @param params.inlineMode - Mode given by an inline directive, if any.
 * @param params.inlineOptions - Options given by an inline directive, if any.
 */
export function resolveQueueSettings(params: {
  cfg: ClawdbotConfig;
  provider?: string;
  sessionEntry?: SessionEntry;
  inlineMode?: QueueMode;
  inlineOptions?: Partial<QueueSettings>;
}): QueueSettings {
  const providerKey = params.provider?.trim().toLowerCase();
  const queueCfg = params.cfg.messages?.queue;
  const providerModeCandidate: unknown =
    providerKey && queueCfg?.byProvider
      ? (queueCfg.byProvider as Record<string, unknown>)[providerKey]
      : undefined;
  // Only strings are meaningful here; this also ignores inherited object
  // members that a provider id such as "constructor" would otherwise pick up.
  const providerModeRaw =
    typeof providerModeCandidate === "string"
      ? providerModeCandidate
      : undefined;
  const resolvedMode =
    params.inlineMode ??
    normalizeQueueMode(params.sessionEntry?.queueMode) ??
    normalizeQueueMode(providerModeRaw) ??
    normalizeQueueMode(queueCfg?.mode) ??
    defaultQueueModeForProvider(providerKey);
  const debounceRaw =
    params.inlineOptions?.debounceMs ??
    params.sessionEntry?.queueDebounceMs ??
    queueCfg?.debounceMs ??
    DEFAULT_QUEUE_DEBOUNCE_MS;
  const capRaw =
    params.inlineOptions?.cap ??
    params.sessionEntry?.queueCap ??
    queueCfg?.cap ??
    DEFAULT_QUEUE_CAP;
  const dropRaw =
    params.inlineOptions?.dropPolicy ??
    params.sessionEntry?.queueDrop ??
    normalizeQueueDropPolicy(queueCfg?.drop) ??
    DEFAULT_QUEUE_DROP;
  return {
    mode: resolvedMode,
    debounceMs:
      typeof debounceRaw === "number" ? Math.max(0, debounceRaw) : undefined,
    cap:
      typeof capRaw === "number" ? Math.max(1, Math.floor(capRaw)) : undefined,
    dropPolicy: dropRaw,
  };
}
2026-01-07 07:21:56 +01:00
/**
 * Reports how many runs are currently waiting in the queue for `key`.
 *
 * The key is trimmed before lookup.
 * NOTE(review): queues are registered under the untrimmed key passed to
 * `enqueueFollowupRun`, so a key with surrounding whitespace would not be found
 * here — confirm callers always pass trimmed keys.
 *
 * @returns The number of queued runs, or 0 when the key is blank or has no queue.
 */
export function getFollowupQueueDepth(key: string): number {
  const lookupKey = key.trim();
  if (lookupKey.length === 0) return 0;
  return FOLLOWUP_QUEUES.get(lookupKey)?.items.length ?? 0;
}