diff --git a/extensions/feishu/src/monitor.startup.test.ts b/extensions/feishu/src/monitor.startup.test.ts index 5abd61cc5..2c142e85e 100644 --- a/extensions/feishu/src/monitor.startup.test.ts +++ b/extensions/feishu/src/monitor.startup.test.ts @@ -1,5 +1,6 @@ import type { ClawdbotConfig } from "openclaw/plugin-sdk"; import { afterEach, describe, expect, it, vi } from "vitest"; +import { monitorFeishuProvider, stopFeishuMonitor } from "./monitor.js"; const probeFeishuMock = vi.hoisted(() => vi.fn()); @@ -12,7 +13,22 @@ vi.mock("./client.js", () => ({ createEventDispatcher: vi.fn(() => ({ register: vi.fn() })), })); -import { monitorFeishuProvider, stopFeishuMonitor } from "./monitor.js"; +vi.mock("./runtime.js", () => ({ + getFeishuRuntime: () => ({ + channel: { + debounce: { + resolveInboundDebounceMs: () => 0, + createInboundDebouncer: () => ({ + enqueue: async () => {}, + flushKey: async () => {}, + }), + }, + text: { + hasControlCommand: () => false, + }, + }, + }), +})); function buildMultiAccountWebsocketConfig(accountIds: string[]): ClawdbotConfig { return { diff --git a/extensions/feishu/src/monitor.webhook-security.test.ts b/extensions/feishu/src/monitor.webhook-security.test.ts index b98450092..bca56edb5 100644 --- a/extensions/feishu/src/monitor.webhook-security.test.ts +++ b/extensions/feishu/src/monitor.webhook-security.test.ts @@ -2,7 +2,34 @@ import { createServer } from "node:http"; import type { AddressInfo } from "node:net"; import type { ClawdbotConfig } from "openclaw/plugin-sdk"; import { afterEach, describe, expect, it, vi } from "vitest"; -import { probeFeishuMock } from "./monitor.test-mocks.js"; + +const probeFeishuMock = vi.hoisted(() => vi.fn()); + +vi.mock("./probe.js", () => ({ + probeFeishu: probeFeishuMock, +})); + +vi.mock("./client.js", () => ({ + createFeishuWSClient: vi.fn(() => ({ start: vi.fn() })), + createEventDispatcher: vi.fn(() => ({ register: vi.fn() })), +})); + +vi.mock("./runtime.js", () => ({ + getFeishuRuntime: () => ({ + channel: { + debounce: { + resolveInboundDebounceMs: () => 0, + createInboundDebouncer: () => ({ + enqueue: async () => {}, + flushKey: async () => {}, + }), + }, + text: { + hasControlCommand: () => false, + }, + }, + }), +})); vi.mock("@larksuiteoapi/node-sdk", () => ({ adaptDefault: vi.fn( diff --git a/src/agents/models-config.ts b/src/agents/models-config.ts index 3de079426..e31d61044 100644 --- a/src/agents/models-config.ts +++ b/src/agents/models-config.ts @@ -111,6 +111,95 @@ async function readJson(pathname: string): Promise { } } +async function resolveProvidersForModelsJson(params: { + cfg: OpenClawConfig; + agentDir: string; +}): Promise> { + const { cfg, agentDir } = params; + const explicitProviders = cfg.models?.providers ?? {}; + const implicitProviders = await resolveImplicitProviders({ agentDir, explicitProviders }); + const providers: Record = mergeProviders({ + implicit: implicitProviders, + explicit: explicitProviders, + }); + + const implicitBedrock = await resolveImplicitBedrockProvider({ agentDir, config: cfg }); + if (implicitBedrock) { + const existing = providers["amazon-bedrock"]; + providers["amazon-bedrock"] = existing + ? mergeProviderModels(implicitBedrock, existing) + : implicitBedrock; + } + + const implicitCopilot = await resolveImplicitCopilotProvider({ agentDir }); + if (implicitCopilot && !providers["github-copilot"]) { + providers["github-copilot"] = implicitCopilot; + } + return providers; +} + +function mergeWithExistingProviderSecrets(params: { + nextProviders: Record; + existingProviders: Record[string]>; +}): Record { + const { nextProviders, existingProviders } = params; + const mergedProviders: Record = {}; + for (const [key, entry] of Object.entries(existingProviders)) { + mergedProviders[key] = entry; + } + for (const [key, newEntry] of Object.entries(nextProviders)) { + const existing = existingProviders[key] as + | (NonNullable[string] & { + apiKey?: string; + baseUrl?: string; + }) + | undefined; + if (!existing) { + mergedProviders[key] = newEntry; + continue; + } + const preserved: Record = {}; + if (typeof existing.apiKey === "string" && existing.apiKey) { + preserved.apiKey = existing.apiKey; + } + if (typeof existing.baseUrl === "string" && existing.baseUrl) { + preserved.baseUrl = existing.baseUrl; + } + mergedProviders[key] = { ...newEntry, ...preserved }; + } + return mergedProviders; +} + +async function resolveProvidersForMode(params: { + mode: NonNullable; + targetPath: string; + providers: Record; +}): Promise> { + if (params.mode !== "merge") { + return params.providers; + } + const existing = await readJson(params.targetPath); + if (!isRecord(existing) || !isRecord(existing.providers)) { + return params.providers; + } + const existingProviders = existing.providers as Record< + string, + NonNullable[string] + >; + return mergeWithExistingProviderSecrets({ + nextProviders: params.providers, + existingProviders, + }); +} + +async function readRawFile(pathname: string): Promise { + try { + return await fs.readFile(pathname, "utf8"); + } catch { + return ""; + } +} + export async function ensureOpenClawModelsJson( config?: OpenClawConfig, agentDirOverride?: string, @@ -124,23 +213,7 @@ export async function ensureOpenClawModelsJson( // through the full loadConfig() pipeline which applies these. applyConfigEnvVars(cfg); - const explicitProviders = cfg.models?.providers ?? {}; - const implicitProviders = await resolveImplicitProviders({ agentDir, explicitProviders }); - const providers: Record = mergeProviders({ - implicit: implicitProviders, - explicit: explicitProviders, - }); - const implicitBedrock = await resolveImplicitBedrockProvider({ agentDir, config: cfg }); - if (implicitBedrock) { - const existing = providers["amazon-bedrock"]; - providers["amazon-bedrock"] = existing - ? mergeProviderModels(implicitBedrock, existing) - : implicitBedrock; - } - const implicitCopilot = await resolveImplicitCopilotProvider({ agentDir }); - if (implicitCopilot && !providers["github-copilot"]) { - providers["github-copilot"] = implicitCopilot; - } + const providers = await resolveProvidersForModelsJson({ cfg, agentDir }); if (Object.keys(providers).length === 0) { return { agentDir, wrote: false }; @@ -148,53 +221,18 @@ export async function ensureOpenClawModelsJson( const mode = cfg.models?.mode ?? DEFAULT_MODE; const targetPath = path.join(agentDir, "models.json"); - - let mergedProviders = providers; - let existingRaw = ""; - if (mode === "merge") { - const existing = await readJson(targetPath); - if (isRecord(existing) && isRecord(existing.providers)) { - const existingProviders = existing.providers as Record< - string, - NonNullable[string] - >; - mergedProviders = {}; - for (const [key, entry] of Object.entries(existingProviders)) { - mergedProviders[key] = entry; - } - for (const [key, newEntry] of Object.entries(providers)) { - const existing = existingProviders[key] as - | (NonNullable[string] & { - apiKey?: string; - baseUrl?: string; - }) - | undefined; - if (existing) { - const preserved: Record = {}; - if (typeof existing.apiKey === "string" && existing.apiKey) { - preserved.apiKey = existing.apiKey; - } - if (typeof existing.baseUrl === "string" && existing.baseUrl) { - preserved.baseUrl = existing.baseUrl; - } - mergedProviders[key] = { ...newEntry, ...preserved }; - } else { - mergedProviders[key] = newEntry; - } - } - } - } + const mergedProviders = await resolveProvidersForMode({ + mode, + targetPath, + providers, + }); const normalizedProviders = normalizeProviders({ providers: mergedProviders, agentDir, }); const next = `${JSON.stringify({ providers: normalizedProviders }, null, 2)}\n`; - try { - existingRaw = await fs.readFile(targetPath, "utf8"); - } catch { - existingRaw = ""; - } + const existingRaw = await readRawFile(targetPath); if (existingRaw === next) { return { agentDir, wrote: false }; diff --git a/src/auto-reply/inbound-debounce.ts b/src/auto-reply/inbound-debounce.ts index 38d20d2fa..5dc26a6b4 100644 --- a/src/auto-reply/inbound-debounce.ts +++ b/src/auto-reply/inbound-debounce.ts @@ -39,14 +39,16 @@ type DebounceBuffer = { debounceMs: number; }; -export function createInboundDebouncer(params: { +export type InboundDebounceCreateParams = { debounceMs: number; buildKey: (item: T) => string | null | undefined; shouldDebounce?: (item: T) => boolean; resolveDebounceMs?: (item: T) => number | undefined; onFlush: (items: T[]) => Promise; onError?: (err: unknown, items: T[]) => void; -}) { +}; + +export function createInboundDebouncer(params: InboundDebounceCreateParams) { const buffers = new Map>(); const defaultDebounceMs = Math.max(0, Math.trunc(params.debounceMs)); diff --git a/src/channels/inbound-debounce-policy.test.ts b/src/channels/inbound-debounce-policy.test.ts new file mode 100644 index 000000000..f17276aa3 --- /dev/null +++ b/src/channels/inbound-debounce-policy.test.ts @@ -0,0 +1,61 @@ +import { describe, expect, it, vi } from "vitest"; +import { + createChannelInboundDebouncer, + shouldDebounceTextInbound, +} from "./inbound-debounce-policy.js"; + +describe("shouldDebounceTextInbound", () => { + it("rejects blank text, media, and control commands", () => { + const cfg = {} as Parameters[0]["cfg"]; + + expect(shouldDebounceTextInbound({ text: " ", cfg })).toBe(false); + expect(shouldDebounceTextInbound({ text: "hello", cfg, hasMedia: true })).toBe(false); + expect(shouldDebounceTextInbound({ text: "/status", cfg })).toBe(false); + }); + + it("accepts normal text when debounce is allowed", () => { + const cfg = {} as Parameters[0]["cfg"]; + expect(shouldDebounceTextInbound({ text: "hello there", cfg })).toBe(true); + expect(shouldDebounceTextInbound({ text: "hello there", cfg, allowDebounce: false })).toBe( + false, + ); + }); +}); + +describe("createChannelInboundDebouncer", () => { + it("resolves per-channel debounce and forwards callbacks", async () => { + vi.useFakeTimers(); + try { + const flushed: string[][] = []; + const cfg = { + messages: { + inbound: { + debounceMs: 10, + byChannel: { + slack: 25, + }, + }, + }, + } as Parameters>[0]["cfg"]; + + const { debounceMs, debouncer } = createChannelInboundDebouncer<{ id: string }>({ + cfg, + channel: "slack", + buildKey: (item) => item.id, + onFlush: async (items) => { + flushed.push(items.map((entry) => entry.id)); + }, + }); + + expect(debounceMs).toBe(25); + + await debouncer.enqueue({ id: "a" }); + await debouncer.enqueue({ id: "a" }); + await vi.advanceTimersByTimeAsync(30); + + expect(flushed).toEqual([["a", "a"]]); + } finally { + vi.useRealTimers(); + } + }); +}); diff --git a/src/channels/inbound-debounce-policy.ts b/src/channels/inbound-debounce-policy.ts new file mode 100644 index 000000000..7101ba6f1 --- /dev/null +++ b/src/channels/inbound-debounce-policy.ts @@ -0,0 +1,51 @@ +import { hasControlCommand } from "../auto-reply/command-detection.js"; +import type { CommandNormalizeOptions } from "../auto-reply/commands-registry.js"; +import { + createInboundDebouncer, + resolveInboundDebounceMs, + type InboundDebounceCreateParams, +} from "../auto-reply/inbound-debounce.js"; +import type { OpenClawConfig } from "../config/types.js"; + +export function shouldDebounceTextInbound(params: { + text: string | null | undefined; + cfg: OpenClawConfig; + hasMedia?: boolean; + commandOptions?: CommandNormalizeOptions; + allowDebounce?: boolean; +}): boolean { + if (params.allowDebounce === false) { + return false; + } + if (params.hasMedia) { + return false; + } + const text = params.text?.trim() ?? ""; + if (!text) { + return false; + } + return !hasControlCommand(text, params.cfg, params.commandOptions); +} + +export function createChannelInboundDebouncer( + params: Omit, "debounceMs"> & { + cfg: OpenClawConfig; + channel: string; + debounceMsOverride?: number; + }, +): { + debounceMs: number; + debouncer: ReturnType>; +} { + const debounceMs = resolveInboundDebounceMs({ + cfg: params.cfg, + channel: params.channel, + overrideMs: params.debounceMsOverride, + }); + const { cfg: _cfg, channel: _channel, debounceMsOverride: _override, ...rest } = params; + const debouncer = createInboundDebouncer({ + debounceMs, + ...rest, + }); + return { debounceMs, debouncer }; +} diff --git a/src/discord/monitor/message-handler.ts b/src/discord/monitor/message-handler.ts index 71eb38ca7..0aca2c76a 100644 --- a/src/discord/monitor/message-handler.ts +++ b/src/discord/monitor/message-handler.ts @@ -1,9 +1,8 @@ import type { Client } from "@buape/carbon"; -import { hasControlCommand } from "../../auto-reply/command-detection.js"; import { - createInboundDebouncer, - resolveInboundDebounceMs, -} from "../../auto-reply/inbound-debounce.js"; + createChannelInboundDebouncer, + shouldDebounceTextInbound, +} from "../../channels/inbound-debounce-policy.js"; import { resolveOpenProviderRuntimeGroupPolicy } from "../../config/runtime-group-policy.js"; import { danger } from "../../globals.js"; import type { DiscordMessageEvent, DiscordMessageHandler } from "./listeners.js"; @@ -33,10 +32,12 @@ export function createDiscordMessageHandler( params.discordConfig?.ackReactionScope ?? params.cfg.messages?.ackReactionScope ?? "group-mentions"; - const debounceMs = resolveInboundDebounceMs({ cfg: params.cfg, channel: "discord" }); - - const debouncer = createInboundDebouncer<{ data: DiscordMessageEvent; client: Client }>({ - debounceMs, + const { debouncer } = createChannelInboundDebouncer<{ + data: DiscordMessageEvent; + client: Client; + }>({ + cfg: params.cfg, + channel: "discord", buildKey: (entry) => { const message = entry.data.message; const authorId = entry.data.author?.id; @@ -57,17 +58,15 @@ export function createDiscordMessageHandler( if (!message) { return false; } - if (message.attachments && message.attachments.length > 0) { - return false; - } - if (hasDiscordMessageStickers(message)) { - return false; - } const baseText = resolveDiscordMessageText(message, { includeForwarded: false }); - if (!baseText.trim()) { - return false; - } - return !hasControlCommand(baseText, params.cfg); + return shouldDebounceTextInbound({ + text: baseText, + cfg: params.cfg, + hasMedia: Boolean( + (message.attachments && message.attachments.length > 0) || + hasDiscordMessageStickers(message), + ), + }); }, onFlush: async (entries) => { const last = entries.at(-1); diff --git a/src/gateway/control-ui.ts b/src/gateway/control-ui.ts index 73d727f15..6075e8281 100644 --- a/src/gateway/control-ui.ts +++ b/src/gateway/control-ui.ts @@ -27,6 +27,8 @@ import { } from "./control-ui-shared.js"; const ROOT_PREFIX = "/"; +const CONTROL_UI_ASSETS_MISSING_MESSAGE = + "Control UI assets not found. Build them with `pnpm ui:build` (auto-installs UI deps), or run `pnpm ui:dev` during development."; export type ControlUiRequestOptions = { basePath?: string; @@ -117,6 +119,31 @@ function sendJson(res: ServerResponse, status: number, body: unknown) { res.end(JSON.stringify(body)); } +function respondControlUiAssetsUnavailable( + res: ServerResponse, + options?: { configuredRootPath?: string }, +) { + if (options?.configuredRootPath) { + respondPlainText( + res, + 503, + `Control UI assets not found at ${options.configuredRootPath}. Build them with \`pnpm ui:build\` (auto-installs UI deps), or update gateway.controlUi.root.`, + ); + return; + } + respondPlainText(res, 503, CONTROL_UI_ASSETS_MISSING_MESSAGE); +} + +function respondHeadForFile(req: IncomingMessage, res: ServerResponse, filePath: string): boolean { + if (req.method !== "HEAD") { + return false; + } + res.statusCode = 200; + setStaticFileHeaders(res, filePath); + res.end(); + return true; +} + function isValidAgentId(agentId: string): boolean { return /^[a-z0-9][a-z0-9_-]{0,63}$/i.test(agentId); } @@ -177,11 +204,7 @@ export function handleControlUiAvatarRequest( return true; } try { - if (req.method === "HEAD") { - res.statusCode = 200; - res.setHeader("Content-Type", contentTypeForExt(path.extname(safeAvatar.path).toLowerCase())); - res.setHeader("Cache-Control", "no-cache"); - res.end(); + if (respondHeadForFile(req, res, safeAvatar.path)) { return true; } @@ -333,19 +356,11 @@ export function handleControlUiHttpRequest( const rootState = opts?.root; if (rootState?.kind === "invalid") { - respondPlainText( - res, - 503, - `Control UI assets not found at ${rootState.path}. Build them with \`pnpm ui:build\` (auto-installs UI deps), or update gateway.controlUi.root.`, - ); + respondControlUiAssetsUnavailable(res, { configuredRootPath: rootState.path }); return true; } if (rootState?.kind === "missing") { - respondPlainText( - res, - 503, - "Control UI assets not found. Build them with `pnpm ui:build` (auto-installs UI deps), or run `pnpm ui:dev` during development.", - ); + respondControlUiAssetsUnavailable(res); return true; } @@ -358,11 +373,7 @@ export function handleControlUiHttpRequest( cwd: process.cwd(), }); if (!root) { - respondPlainText( - res, - 503, - "Control UI assets not found. Build them with `pnpm ui:build` (auto-installs UI deps), or run `pnpm ui:dev` during development.", - ); + respondControlUiAssetsUnavailable(res); return true; } @@ -377,11 +388,7 @@ export function handleControlUiHttpRequest( } })(); if (!rootReal) { - respondPlainText( - res, - 503, - "Control UI assets not found. Build them with `pnpm ui:build` (auto-installs UI deps), or run `pnpm ui:dev` during development.", - ); + respondControlUiAssetsUnavailable(res); return true; } @@ -413,10 +420,7 @@ export function handleControlUiHttpRequest( const safeFile = resolveSafeControlUiFile(rootReal, filePath); if (safeFile) { try { - if (req.method === "HEAD") { - res.statusCode = 200; - setStaticFileHeaders(res, safeFile.path); - res.end(); + if (respondHeadForFile(req, res, safeFile.path)) { return true; } if (path.basename(safeFile.path) === "index.html") { @@ -445,10 +449,7 @@ export function handleControlUiHttpRequest( const safeIndex = resolveSafeControlUiFile(rootReal, indexPath); if (safeIndex) { try { - if (req.method === "HEAD") { - res.statusCode = 200; - setStaticFileHeaders(res, safeIndex.path); - res.end(); + if (respondHeadForFile(req, res, safeIndex.path)) { return true; } serveResolvedIndexHtml(res, fs.readFileSync(safeIndex.fd, "utf8")); diff --git a/src/hooks/internal-hooks.ts b/src/hooks/internal-hooks.ts index fec142b5d..625261e3c 100644 --- a/src/hooks/internal-hooks.ts +++ b/src/hooks/internal-hooks.ts @@ -338,78 +338,111 @@ export function createInternalHookEvent( }; } -export function isAgentBootstrapEvent(event: InternalHookEvent): event is AgentBootstrapHookEvent { - if (event.type !== "agent" || event.action !== "bootstrap") { - return false; - } - const context = event.context as Partial | null; +function isHookEventTypeAndAction( + event: InternalHookEvent, + type: InternalHookEventType, + action: string, +): boolean { + return event.type === type && event.action === action; +} + +function getHookContext>( + event: InternalHookEvent, +): Partial | null { + const context = event.context as Partial | null; if (!context || typeof context !== "object") { + return null; + } + return context; +} + +function hasStringContextField>( + context: Partial, + key: keyof T, +): boolean { + return typeof context[key] === "string"; +} + +function hasBooleanContextField>( + context: Partial, + key: keyof T, +): boolean { + return typeof context[key] === "boolean"; +} + +export function isAgentBootstrapEvent(event: InternalHookEvent): event is AgentBootstrapHookEvent { + if (!isHookEventTypeAndAction(event, "agent", "bootstrap")) { return false; } - if (typeof context.workspaceDir !== "string") { + const context = getHookContext(event); + if (!context) { + return false; + } + if (!hasStringContextField(context, "workspaceDir")) { return false; } return Array.isArray(context.bootstrapFiles); } export function isGatewayStartupEvent(event: InternalHookEvent): event is GatewayStartupHookEvent { - if (event.type !== "gateway" || event.action !== "startup") { + if (!isHookEventTypeAndAction(event, "gateway", "startup")) { return false; } - const context = event.context as GatewayStartupHookContext | null; - return Boolean(context && typeof context === "object"); + return Boolean(getHookContext(event)); } export function isMessageReceivedEvent( event: InternalHookEvent, ): event is MessageReceivedHookEvent { - if (event.type !== "message" || event.action !== "received") { + if (!isHookEventTypeAndAction(event, "message", "received")) { return false; } - const context = event.context as Partial | null; - if (!context || typeof context !== "object") { + const context = getHookContext(event); + if (!context) { return false; } - return typeof context.from === "string" && typeof context.channelId === "string"; + return hasStringContextField(context, "from") && hasStringContextField(context, "channelId"); } export function isMessageSentEvent(event: InternalHookEvent): event is MessageSentHookEvent { - if (event.type !== "message" || event.action !== "sent") { + if (!isHookEventTypeAndAction(event, "message", "sent")) { return false; } - const context = event.context as Partial | null; - if (!context || typeof context !== "object") { + const context = getHookContext(event); + if (!context) { return false; } return ( - typeof context.to === "string" && - typeof context.channelId === "string" && - typeof context.success === "boolean" + hasStringContextField(context, "to") && + hasStringContextField(context, "channelId") && + hasBooleanContextField(context, "success") ); } export function isMessageTranscribedEvent( event: InternalHookEvent, ): event is MessageTranscribedHookEvent { - if (event.type !== "message" || event.action !== "transcribed") { + if (!isHookEventTypeAndAction(event, "message", "transcribed")) { return false; } - const context = event.context as Partial | null; - if (!context || typeof context !== "object") { + const context = getHookContext(event); + if (!context) { return false; } - return typeof context.transcript === "string" && typeof context.channelId === "string"; + return ( + hasStringContextField(context, "transcript") && hasStringContextField(context, "channelId") + ); } export function isMessagePreprocessedEvent( event: InternalHookEvent, ): event is MessagePreprocessedHookEvent { - if (event.type !== "message" || event.action !== "preprocessed") { + if (!isHookEventTypeAndAction(event, "message", "preprocessed")) { return false; } - const context = event.context as Partial | null; - if (!context || typeof context !== "object") { + const context = getHookContext(event); + if (!context) { return false; } - return typeof context.channelId === "string"; + return hasStringContextField(context, "channelId"); } diff --git a/src/imessage/monitor/monitor-provider.ts b/src/imessage/monitor/monitor-provider.ts index 8a7b62d5c..2ca8d3015 100644 --- a/src/imessage/monitor/monitor-provider.ts +++ b/src/imessage/monitor/monitor-provider.ts @@ -1,18 +1,17 @@ import fs from "node:fs/promises"; import { resolveHumanDelayConfig } from "../../agents/identity.js"; import { resolveTextChunkLimit } from "../../auto-reply/chunk.js"; -import { hasControlCommand } from "../../auto-reply/command-detection.js"; import { dispatchInboundMessage } from "../../auto-reply/dispatch.js"; -import { - createInboundDebouncer, - resolveInboundDebounceMs, -} from "../../auto-reply/inbound-debounce.js"; import { clearHistoryEntriesIfEnabled, DEFAULT_GROUP_HISTORY_LIMIT, type HistoryEntry, } from "../../auto-reply/reply/history.js"; import { createReplyDispatcher } from "../../auto-reply/reply/reply-dispatcher.js"; +import { + createChannelInboundDebouncer, + shouldDebounceTextInbound, +} from "../../channels/inbound-debounce-policy.js"; import { createReplyPrefixOptions } from "../../channels/reply-prefix.js"; import { recordInboundSession } from "../../channels/session.js"; import { loadConfig } from "../../config/config.js"; @@ -153,9 +152,11 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P } } - const inboundDebounceMs = resolveInboundDebounceMs({ cfg, channel: "imessage" }); - const inboundDebouncer = createInboundDebouncer<{ message: IMessagePayload }>({ - debounceMs: inboundDebounceMs, + const { debouncer: inboundDebouncer } = createChannelInboundDebouncer<{ + message: IMessagePayload; + }>({ + cfg, + channel: "imessage", buildKey: (entry) => { const sender = entry.message.sender?.trim(); if (!sender) { @@ -168,14 +169,11 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P return `imessage:${accountInfo.accountId}:${conversationId}:${sender}`; }, shouldDebounce: (entry) => { - const text = entry.message.text?.trim() ?? ""; - if (!text) { - return false; - } - if (entry.message.attachments && entry.message.attachments.length > 0) { - return false; - } - return !hasControlCommand(text, cfg); + return shouldDebounceTextInbound({ + text: entry.message.text, + cfg, + hasMedia: Boolean(entry.message.attachments && entry.message.attachments.length > 0), + }); }, onFlush: async (entries) => { const last = entries.at(-1); diff --git a/src/signal/monitor/event-handler.ts b/src/signal/monitor/event-handler.ts index bb8bfce02..7369a166a 100644 --- a/src/signal/monitor/event-handler.ts +++ b/src/signal/monitor/event-handler.ts @@ -6,10 +6,6 @@ import { formatInboundFromLabel, resolveEnvelopeFormatOptions, } from "../../auto-reply/envelope.js"; -import { - createInboundDebouncer, - resolveInboundDebounceMs, -} from "../../auto-reply/inbound-debounce.js"; import { buildPendingHistoryContextFromMap, clearHistoryEntriesIfEnabled, @@ -19,6 +15,10 @@ import { finalizeInboundContext } from "../../auto-reply/reply/inbound-context.j import { buildMentionRegexes, matchesMentionPatterns } from "../../auto-reply/reply/mentions.js"; import { createReplyDispatcherWithTyping } from "../../auto-reply/reply/reply-dispatcher.js"; import { resolveControlCommandGate } from "../../channels/command-gating.js"; +import { + createChannelInboundDebouncer, + shouldDebounceTextInbound, +} from "../../channels/inbound-debounce-policy.js"; import { logInboundDrop, logTypingFailure } from "../../channels/logging.js"; import { resolveMentionGatingWithBypass } from "../../channels/mention-gating.js"; import { normalizeSignalMessagingTarget } from "../../channels/plugins/normalize/signal.js"; @@ -57,8 +57,6 @@ import type { } from "./event-handler.types.js"; import { renderSignalMentions } from "./mentions.js"; export function createSignalEventHandler(deps: SignalEventHandlerDeps) { - const inboundDebounceMs = resolveInboundDebounceMs({ cfg: deps.cfg, channel: "signal" }); - type SignalInboundEntry = { senderName: string; senderDisplay: string; @@ -299,8 +297,9 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) { } } - const inboundDebouncer = createInboundDebouncer({ - debounceMs: inboundDebounceMs, + const { debouncer: inboundDebouncer } = createChannelInboundDebouncer({ + cfg: deps.cfg, + channel: "signal", buildKey: (entry) => { const conversationId = entry.isGroup ? (entry.groupId ?? "unknown") : entry.senderPeerId; if (!conversationId || !entry.senderPeerId) { @@ -309,13 +308,11 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) { return `signal:${deps.accountId}:${conversationId}:${entry.senderPeerId}`; }, shouldDebounce: (entry) => { - if (!entry.bodyText.trim()) { - return false; - } - if (entry.mediaPath || entry.mediaType) { - return false; - } - return !hasControlCommand(entry.bodyText, deps.cfg); + return shouldDebounceTextInbound({ + text: entry.bodyText, + cfg: deps.cfg, + hasMedia: Boolean(entry.mediaPath || entry.mediaType), + }); }, onFlush: async (entries) => { const last = entries.at(-1); diff --git a/src/slack/monitor/message-handler.ts b/src/slack/monitor/message-handler.ts index f5d47400c..647c9a62c 100644 --- a/src/slack/monitor/message-handler.ts +++ b/src/slack/monitor/message-handler.ts @@ -1,8 +1,7 @@ -import { hasControlCommand } from "../../auto-reply/command-detection.js"; import { - createInboundDebouncer, - resolveInboundDebounceMs, -} from "../../auto-reply/inbound-debounce.js"; + createChannelInboundDebouncer, + shouldDebounceTextInbound, +} from "../../channels/inbound-debounce-policy.js"; import type { ResolvedSlackAccount } from "../accounts.js"; import type { SlackMessageEvent } from "../types.js"; import { stripSlackMentionsForCommandDetection } from "./commands.js"; @@ -44,14 +43,12 @@ function buildTopLevelSlackConversationKey( function shouldDebounceSlackMessage(message: SlackMessageEvent, cfg: SlackMonitorContext["cfg"]) { const text = message.text ?? ""; - if (!text.trim()) { - return false; - } - if (message.files && message.files.length > 0) { - return false; - } const textForCommandDetection = stripSlackMentionsForCommandDetection(text); - return !hasControlCommand(textForCommandDetection, cfg); + return shouldDebounceTextInbound({ + text: textForCommandDetection, + cfg, + hasMedia: Boolean(message.files && message.files.length > 0), + }); } /** @@ -88,15 +85,12 @@ export function createSlackMessageHandler(params: { trackEvent?: () => void; }): SlackMessageHandler { const { ctx, account, trackEvent } = params; - const debounceMs = resolveInboundDebounceMs({ cfg: ctx.cfg, channel: "slack" }); - const threadTsResolver = createSlackThreadTsResolver({ client: ctx.app.client }); - const pendingTopLevelDebounceKeys = new Map>(); - - const debouncer = createInboundDebouncer<{ + const { debounceMs, debouncer } = createChannelInboundDebouncer<{ message: SlackMessageEvent; opts: { source: "message" | "app_mention"; wasMentioned?: boolean }; }>({ - debounceMs, + cfg: ctx.cfg, + channel: "slack", buildKey: (entry) => buildSlackDebounceKey(entry.message, ctx.accountId), shouldDebounce: (entry) => shouldDebounceSlackMessage(entry.message, ctx.cfg), onFlush: async (entries) => { @@ -156,6 +150,8 @@ export function createSlackMessageHandler(params: { ctx.runtime.error?.(`slack inbound debounce flush failed: ${String(err)}`); }, }); + const threadTsResolver = createSlackThreadTsResolver({ client: ctx.app.client }); + const pendingTopLevelDebounceKeys = new Map>(); return async (message, opts) => { if (opts.source === "message" && message.type !== "message") { diff --git a/src/telegram/bot-handlers.ts b/src/telegram/bot-handlers.ts index 0b9539895..a71f4cafe 100644 --- a/src/telegram/bot-handlers.ts +++ b/src/telegram/bot-handlers.ts @@ -1,6 +1,5 @@ import type { Message, ReactionTypeEmoji } from "@grammyjs/types"; import { resolveAgentDir, resolveDefaultAgentId } from "../agents/agent-scope.js"; -import { hasControlCommand } from "../auto-reply/command-detection.js"; import { createInboundDebouncer, resolveInboundDebounceMs, @@ -13,6 +12,7 @@ import { import { resolveStoredModelOverride } from "../auto-reply/reply/model-selection.js"; import { listSkillCommandsForAgents } from "../auto-reply/skill-commands.js"; import { buildCommandsMessagePaginated } from "../auto-reply/status.js"; +import { shouldDebounceTextInbound } from "../channels/inbound-debounce-policy.js"; import { resolveChannelConfigWrites } from "../channels/plugins/config-writes.js"; import { loadConfig } from "../config/config.js"; import { writeConfigFile } from "../config/io.js"; @@ -206,14 +206,18 @@ export const registerTelegramHandlers = ({ buildKey: (entry) => entry.debounceKey, shouldDebounce: (entry) => { const text = entry.msg.text ?? entry.msg.caption ?? ""; - const hasText = text.trim().length > 0; - if (hasText && hasControlCommand(text, cfg, { botUsername: entry.botUsername })) { + const hasDebounceableText = shouldDebounceTextInbound({ + text, + cfg, + commandOptions: { botUsername: entry.botUsername }, + }); + if (!hasDebounceableText) { return false; } if (entry.debounceLane === "forward") { return true; } - return entry.allMedia.length === 0 && hasText; + return entry.allMedia.length === 0; }, onFlush: async (entries) => { const last = entries.at(-1);