refactor: unify inbound debounce policy and split gateway/models helpers
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
import type { ClawdbotConfig } from "openclaw/plugin-sdk";
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import { monitorFeishuProvider, stopFeishuMonitor } from "./monitor.js";
|
||||
|
||||
const probeFeishuMock = vi.hoisted(() => vi.fn());
|
||||
|
||||
@@ -12,7 +13,22 @@ vi.mock("./client.js", () => ({
|
||||
createEventDispatcher: vi.fn(() => ({ register: vi.fn() })),
|
||||
}));
|
||||
|
||||
import { monitorFeishuProvider, stopFeishuMonitor } from "./monitor.js";
|
||||
vi.mock("./runtime.js", () => ({
|
||||
getFeishuRuntime: () => ({
|
||||
channel: {
|
||||
debounce: {
|
||||
resolveInboundDebounceMs: () => 0,
|
||||
createInboundDebouncer: () => ({
|
||||
enqueue: async () => {},
|
||||
flushKey: async () => {},
|
||||
}),
|
||||
},
|
||||
text: {
|
||||
hasControlCommand: () => false,
|
||||
},
|
||||
},
|
||||
}),
|
||||
}));
|
||||
|
||||
function buildMultiAccountWebsocketConfig(accountIds: string[]): ClawdbotConfig {
|
||||
return {
|
||||
|
||||
@@ -2,7 +2,34 @@ import { createServer } from "node:http";
|
||||
import type { AddressInfo } from "node:net";
|
||||
import type { ClawdbotConfig } from "openclaw/plugin-sdk";
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import { probeFeishuMock } from "./monitor.test-mocks.js";
|
||||
|
||||
const probeFeishuMock = vi.hoisted(() => vi.fn());
|
||||
|
||||
vi.mock("./probe.js", () => ({
|
||||
probeFeishu: probeFeishuMock,
|
||||
}));
|
||||
|
||||
vi.mock("./client.js", () => ({
|
||||
createFeishuWSClient: vi.fn(() => ({ start: vi.fn() })),
|
||||
createEventDispatcher: vi.fn(() => ({ register: vi.fn() })),
|
||||
}));
|
||||
|
||||
vi.mock("./runtime.js", () => ({
|
||||
getFeishuRuntime: () => ({
|
||||
channel: {
|
||||
debounce: {
|
||||
resolveInboundDebounceMs: () => 0,
|
||||
createInboundDebouncer: () => ({
|
||||
enqueue: async () => {},
|
||||
flushKey: async () => {},
|
||||
}),
|
||||
},
|
||||
text: {
|
||||
hasControlCommand: () => false,
|
||||
},
|
||||
},
|
||||
}),
|
||||
}));
|
||||
|
||||
vi.mock("@larksuiteoapi/node-sdk", () => ({
|
||||
adaptDefault: vi.fn(
|
||||
|
||||
@@ -111,6 +111,95 @@ async function readJson(pathname: string): Promise<unknown> {
|
||||
}
|
||||
}
|
||||
|
||||
async function resolveProvidersForModelsJson(params: {
|
||||
cfg: OpenClawConfig;
|
||||
agentDir: string;
|
||||
}): Promise<Record<string, ProviderConfig>> {
|
||||
const { cfg, agentDir } = params;
|
||||
const explicitProviders = cfg.models?.providers ?? {};
|
||||
const implicitProviders = await resolveImplicitProviders({ agentDir, explicitProviders });
|
||||
const providers: Record<string, ProviderConfig> = mergeProviders({
|
||||
implicit: implicitProviders,
|
||||
explicit: explicitProviders,
|
||||
});
|
||||
|
||||
const implicitBedrock = await resolveImplicitBedrockProvider({ agentDir, config: cfg });
|
||||
if (implicitBedrock) {
|
||||
const existing = providers["amazon-bedrock"];
|
||||
providers["amazon-bedrock"] = existing
|
||||
? mergeProviderModels(implicitBedrock, existing)
|
||||
: implicitBedrock;
|
||||
}
|
||||
|
||||
const implicitCopilot = await resolveImplicitCopilotProvider({ agentDir });
|
||||
if (implicitCopilot && !providers["github-copilot"]) {
|
||||
providers["github-copilot"] = implicitCopilot;
|
||||
}
|
||||
return providers;
|
||||
}
|
||||
|
||||
function mergeWithExistingProviderSecrets(params: {
|
||||
nextProviders: Record<string, ProviderConfig>;
|
||||
existingProviders: Record<string, NonNullable<ModelsConfig["providers"]>[string]>;
|
||||
}): Record<string, ProviderConfig> {
|
||||
const { nextProviders, existingProviders } = params;
|
||||
const mergedProviders: Record<string, ProviderConfig> = {};
|
||||
for (const [key, entry] of Object.entries(existingProviders)) {
|
||||
mergedProviders[key] = entry;
|
||||
}
|
||||
for (const [key, newEntry] of Object.entries(nextProviders)) {
|
||||
const existing = existingProviders[key] as
|
||||
| (NonNullable<ModelsConfig["providers"]>[string] & {
|
||||
apiKey?: string;
|
||||
baseUrl?: string;
|
||||
})
|
||||
| undefined;
|
||||
if (!existing) {
|
||||
mergedProviders[key] = newEntry;
|
||||
continue;
|
||||
}
|
||||
const preserved: Record<string, unknown> = {};
|
||||
if (typeof existing.apiKey === "string" && existing.apiKey) {
|
||||
preserved.apiKey = existing.apiKey;
|
||||
}
|
||||
if (typeof existing.baseUrl === "string" && existing.baseUrl) {
|
||||
preserved.baseUrl = existing.baseUrl;
|
||||
}
|
||||
mergedProviders[key] = { ...newEntry, ...preserved };
|
||||
}
|
||||
return mergedProviders;
|
||||
}
|
||||
|
||||
async function resolveProvidersForMode(params: {
|
||||
mode: NonNullable<ModelsConfig["mode"]>;
|
||||
targetPath: string;
|
||||
providers: Record<string, ProviderConfig>;
|
||||
}): Promise<Record<string, ProviderConfig>> {
|
||||
if (params.mode !== "merge") {
|
||||
return params.providers;
|
||||
}
|
||||
const existing = await readJson(params.targetPath);
|
||||
if (!isRecord(existing) || !isRecord(existing.providers)) {
|
||||
return params.providers;
|
||||
}
|
||||
const existingProviders = existing.providers as Record<
|
||||
string,
|
||||
NonNullable<ModelsConfig["providers"]>[string]
|
||||
>;
|
||||
return mergeWithExistingProviderSecrets({
|
||||
nextProviders: params.providers,
|
||||
existingProviders,
|
||||
});
|
||||
}
|
||||
|
||||
async function readRawFile(pathname: string): Promise<string> {
|
||||
try {
|
||||
return await fs.readFile(pathname, "utf8");
|
||||
} catch {
|
||||
return "";
|
||||
}
|
||||
}
|
||||
|
||||
export async function ensureOpenClawModelsJson(
|
||||
config?: OpenClawConfig,
|
||||
agentDirOverride?: string,
|
||||
@@ -124,23 +213,7 @@ export async function ensureOpenClawModelsJson(
|
||||
// through the full loadConfig() pipeline which applies these.
|
||||
applyConfigEnvVars(cfg);
|
||||
|
||||
const explicitProviders = cfg.models?.providers ?? {};
|
||||
const implicitProviders = await resolveImplicitProviders({ agentDir, explicitProviders });
|
||||
const providers: Record<string, ProviderConfig> = mergeProviders({
|
||||
implicit: implicitProviders,
|
||||
explicit: explicitProviders,
|
||||
});
|
||||
const implicitBedrock = await resolveImplicitBedrockProvider({ agentDir, config: cfg });
|
||||
if (implicitBedrock) {
|
||||
const existing = providers["amazon-bedrock"];
|
||||
providers["amazon-bedrock"] = existing
|
||||
? mergeProviderModels(implicitBedrock, existing)
|
||||
: implicitBedrock;
|
||||
}
|
||||
const implicitCopilot = await resolveImplicitCopilotProvider({ agentDir });
|
||||
if (implicitCopilot && !providers["github-copilot"]) {
|
||||
providers["github-copilot"] = implicitCopilot;
|
||||
}
|
||||
const providers = await resolveProvidersForModelsJson({ cfg, agentDir });
|
||||
|
||||
if (Object.keys(providers).length === 0) {
|
||||
return { agentDir, wrote: false };
|
||||
@@ -148,53 +221,18 @@ export async function ensureOpenClawModelsJson(
|
||||
|
||||
const mode = cfg.models?.mode ?? DEFAULT_MODE;
|
||||
const targetPath = path.join(agentDir, "models.json");
|
||||
|
||||
let mergedProviders = providers;
|
||||
let existingRaw = "";
|
||||
if (mode === "merge") {
|
||||
const existing = await readJson(targetPath);
|
||||
if (isRecord(existing) && isRecord(existing.providers)) {
|
||||
const existingProviders = existing.providers as Record<
|
||||
string,
|
||||
NonNullable<ModelsConfig["providers"]>[string]
|
||||
>;
|
||||
mergedProviders = {};
|
||||
for (const [key, entry] of Object.entries(existingProviders)) {
|
||||
mergedProviders[key] = entry;
|
||||
}
|
||||
for (const [key, newEntry] of Object.entries(providers)) {
|
||||
const existing = existingProviders[key] as
|
||||
| (NonNullable<ModelsConfig["providers"]>[string] & {
|
||||
apiKey?: string;
|
||||
baseUrl?: string;
|
||||
})
|
||||
| undefined;
|
||||
if (existing) {
|
||||
const preserved: Record<string, unknown> = {};
|
||||
if (typeof existing.apiKey === "string" && existing.apiKey) {
|
||||
preserved.apiKey = existing.apiKey;
|
||||
}
|
||||
if (typeof existing.baseUrl === "string" && existing.baseUrl) {
|
||||
preserved.baseUrl = existing.baseUrl;
|
||||
}
|
||||
mergedProviders[key] = { ...newEntry, ...preserved };
|
||||
} else {
|
||||
mergedProviders[key] = newEntry;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
const mergedProviders = await resolveProvidersForMode({
|
||||
mode,
|
||||
targetPath,
|
||||
providers,
|
||||
});
|
||||
|
||||
const normalizedProviders = normalizeProviders({
|
||||
providers: mergedProviders,
|
||||
agentDir,
|
||||
});
|
||||
const next = `${JSON.stringify({ providers: normalizedProviders }, null, 2)}\n`;
|
||||
try {
|
||||
existingRaw = await fs.readFile(targetPath, "utf8");
|
||||
} catch {
|
||||
existingRaw = "";
|
||||
}
|
||||
const existingRaw = await readRawFile(targetPath);
|
||||
|
||||
if (existingRaw === next) {
|
||||
return { agentDir, wrote: false };
|
||||
|
||||
@@ -39,14 +39,16 @@ type DebounceBuffer<T> = {
|
||||
debounceMs: number;
|
||||
};
|
||||
|
||||
export function createInboundDebouncer<T>(params: {
|
||||
export type InboundDebounceCreateParams<T> = {
|
||||
debounceMs: number;
|
||||
buildKey: (item: T) => string | null | undefined;
|
||||
shouldDebounce?: (item: T) => boolean;
|
||||
resolveDebounceMs?: (item: T) => number | undefined;
|
||||
onFlush: (items: T[]) => Promise<void>;
|
||||
onError?: (err: unknown, items: T[]) => void;
|
||||
}) {
|
||||
};
|
||||
|
||||
export function createInboundDebouncer<T>(params: InboundDebounceCreateParams<T>) {
|
||||
const buffers = new Map<string, DebounceBuffer<T>>();
|
||||
const defaultDebounceMs = Math.max(0, Math.trunc(params.debounceMs));
|
||||
|
||||
|
||||
61
src/channels/inbound-debounce-policy.test.ts
Normal file
61
src/channels/inbound-debounce-policy.test.ts
Normal file
@@ -0,0 +1,61 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import {
|
||||
createChannelInboundDebouncer,
|
||||
shouldDebounceTextInbound,
|
||||
} from "./inbound-debounce-policy.js";
|
||||
|
||||
describe("shouldDebounceTextInbound", () => {
|
||||
it("rejects blank text, media, and control commands", () => {
|
||||
const cfg = {} as Parameters<typeof shouldDebounceTextInbound>[0]["cfg"];
|
||||
|
||||
expect(shouldDebounceTextInbound({ text: " ", cfg })).toBe(false);
|
||||
expect(shouldDebounceTextInbound({ text: "hello", cfg, hasMedia: true })).toBe(false);
|
||||
expect(shouldDebounceTextInbound({ text: "/status", cfg })).toBe(false);
|
||||
});
|
||||
|
||||
it("accepts normal text when debounce is allowed", () => {
|
||||
const cfg = {} as Parameters<typeof shouldDebounceTextInbound>[0]["cfg"];
|
||||
expect(shouldDebounceTextInbound({ text: "hello there", cfg })).toBe(true);
|
||||
expect(shouldDebounceTextInbound({ text: "hello there", cfg, allowDebounce: false })).toBe(
|
||||
false,
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe("createChannelInboundDebouncer", () => {
|
||||
it("resolves per-channel debounce and forwards callbacks", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
const flushed: string[][] = [];
|
||||
const cfg = {
|
||||
messages: {
|
||||
inbound: {
|
||||
debounceMs: 10,
|
||||
byChannel: {
|
||||
slack: 25,
|
||||
},
|
||||
},
|
||||
},
|
||||
} as Parameters<typeof createChannelInboundDebouncer<{ id: string }>>[0]["cfg"];
|
||||
|
||||
const { debounceMs, debouncer } = createChannelInboundDebouncer<{ id: string }>({
|
||||
cfg,
|
||||
channel: "slack",
|
||||
buildKey: (item) => item.id,
|
||||
onFlush: async (items) => {
|
||||
flushed.push(items.map((entry) => entry.id));
|
||||
},
|
||||
});
|
||||
|
||||
expect(debounceMs).toBe(25);
|
||||
|
||||
await debouncer.enqueue({ id: "a" });
|
||||
await debouncer.enqueue({ id: "a" });
|
||||
await vi.advanceTimersByTimeAsync(30);
|
||||
|
||||
expect(flushed).toEqual([["a", "a"]]);
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
});
|
||||
51
src/channels/inbound-debounce-policy.ts
Normal file
51
src/channels/inbound-debounce-policy.ts
Normal file
@@ -0,0 +1,51 @@
|
||||
import { hasControlCommand } from "../auto-reply/command-detection.js";
|
||||
import type { CommandNormalizeOptions } from "../auto-reply/commands-registry.js";
|
||||
import {
|
||||
createInboundDebouncer,
|
||||
resolveInboundDebounceMs,
|
||||
type InboundDebounceCreateParams,
|
||||
} from "../auto-reply/inbound-debounce.js";
|
||||
import type { OpenClawConfig } from "../config/types.js";
|
||||
|
||||
export function shouldDebounceTextInbound(params: {
|
||||
text: string | null | undefined;
|
||||
cfg: OpenClawConfig;
|
||||
hasMedia?: boolean;
|
||||
commandOptions?: CommandNormalizeOptions;
|
||||
allowDebounce?: boolean;
|
||||
}): boolean {
|
||||
if (params.allowDebounce === false) {
|
||||
return false;
|
||||
}
|
||||
if (params.hasMedia) {
|
||||
return false;
|
||||
}
|
||||
const text = params.text?.trim() ?? "";
|
||||
if (!text) {
|
||||
return false;
|
||||
}
|
||||
return !hasControlCommand(text, params.cfg, params.commandOptions);
|
||||
}
|
||||
|
||||
export function createChannelInboundDebouncer<T>(
|
||||
params: Omit<InboundDebounceCreateParams<T>, "debounceMs"> & {
|
||||
cfg: OpenClawConfig;
|
||||
channel: string;
|
||||
debounceMsOverride?: number;
|
||||
},
|
||||
): {
|
||||
debounceMs: number;
|
||||
debouncer: ReturnType<typeof createInboundDebouncer<T>>;
|
||||
} {
|
||||
const debounceMs = resolveInboundDebounceMs({
|
||||
cfg: params.cfg,
|
||||
channel: params.channel,
|
||||
overrideMs: params.debounceMsOverride,
|
||||
});
|
||||
const { cfg: _cfg, channel: _channel, debounceMsOverride: _override, ...rest } = params;
|
||||
const debouncer = createInboundDebouncer<T>({
|
||||
debounceMs,
|
||||
...rest,
|
||||
});
|
||||
return { debounceMs, debouncer };
|
||||
}
|
||||
@@ -1,9 +1,8 @@
|
||||
import type { Client } from "@buape/carbon";
|
||||
import { hasControlCommand } from "../../auto-reply/command-detection.js";
|
||||
import {
|
||||
createInboundDebouncer,
|
||||
resolveInboundDebounceMs,
|
||||
} from "../../auto-reply/inbound-debounce.js";
|
||||
createChannelInboundDebouncer,
|
||||
shouldDebounceTextInbound,
|
||||
} from "../../channels/inbound-debounce-policy.js";
|
||||
import { resolveOpenProviderRuntimeGroupPolicy } from "../../config/runtime-group-policy.js";
|
||||
import { danger } from "../../globals.js";
|
||||
import type { DiscordMessageEvent, DiscordMessageHandler } from "./listeners.js";
|
||||
@@ -33,10 +32,12 @@ export function createDiscordMessageHandler(
|
||||
params.discordConfig?.ackReactionScope ??
|
||||
params.cfg.messages?.ackReactionScope ??
|
||||
"group-mentions";
|
||||
const debounceMs = resolveInboundDebounceMs({ cfg: params.cfg, channel: "discord" });
|
||||
|
||||
const debouncer = createInboundDebouncer<{ data: DiscordMessageEvent; client: Client }>({
|
||||
debounceMs,
|
||||
const { debouncer } = createChannelInboundDebouncer<{
|
||||
data: DiscordMessageEvent;
|
||||
client: Client;
|
||||
}>({
|
||||
cfg: params.cfg,
|
||||
channel: "discord",
|
||||
buildKey: (entry) => {
|
||||
const message = entry.data.message;
|
||||
const authorId = entry.data.author?.id;
|
||||
@@ -57,17 +58,15 @@ export function createDiscordMessageHandler(
|
||||
if (!message) {
|
||||
return false;
|
||||
}
|
||||
if (message.attachments && message.attachments.length > 0) {
|
||||
return false;
|
||||
}
|
||||
if (hasDiscordMessageStickers(message)) {
|
||||
return false;
|
||||
}
|
||||
const baseText = resolveDiscordMessageText(message, { includeForwarded: false });
|
||||
if (!baseText.trim()) {
|
||||
return false;
|
||||
}
|
||||
return !hasControlCommand(baseText, params.cfg);
|
||||
return shouldDebounceTextInbound({
|
||||
text: baseText,
|
||||
cfg: params.cfg,
|
||||
hasMedia: Boolean(
|
||||
(message.attachments && message.attachments.length > 0) ||
|
||||
hasDiscordMessageStickers(message),
|
||||
),
|
||||
});
|
||||
},
|
||||
onFlush: async (entries) => {
|
||||
const last = entries.at(-1);
|
||||
|
||||
@@ -27,6 +27,8 @@ import {
|
||||
} from "./control-ui-shared.js";
|
||||
|
||||
const ROOT_PREFIX = "/";
|
||||
const CONTROL_UI_ASSETS_MISSING_MESSAGE =
|
||||
"Control UI assets not found. Build them with `pnpm ui:build` (auto-installs UI deps), or run `pnpm ui:dev` during development.";
|
||||
|
||||
export type ControlUiRequestOptions = {
|
||||
basePath?: string;
|
||||
@@ -117,6 +119,31 @@ function sendJson(res: ServerResponse, status: number, body: unknown) {
|
||||
res.end(JSON.stringify(body));
|
||||
}
|
||||
|
||||
function respondControlUiAssetsUnavailable(
|
||||
res: ServerResponse,
|
||||
options?: { configuredRootPath?: string },
|
||||
) {
|
||||
if (options?.configuredRootPath) {
|
||||
respondPlainText(
|
||||
res,
|
||||
503,
|
||||
`Control UI assets not found at ${options.configuredRootPath}. Build them with \`pnpm ui:build\` (auto-installs UI deps), or update gateway.controlUi.root.`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
respondPlainText(res, 503, CONTROL_UI_ASSETS_MISSING_MESSAGE);
|
||||
}
|
||||
|
||||
function respondHeadForFile(req: IncomingMessage, res: ServerResponse, filePath: string): boolean {
|
||||
if (req.method !== "HEAD") {
|
||||
return false;
|
||||
}
|
||||
res.statusCode = 200;
|
||||
setStaticFileHeaders(res, filePath);
|
||||
res.end();
|
||||
return true;
|
||||
}
|
||||
|
||||
function isValidAgentId(agentId: string): boolean {
|
||||
return /^[a-z0-9][a-z0-9_-]{0,63}$/i.test(agentId);
|
||||
}
|
||||
@@ -177,11 +204,7 @@ export function handleControlUiAvatarRequest(
|
||||
return true;
|
||||
}
|
||||
try {
|
||||
if (req.method === "HEAD") {
|
||||
res.statusCode = 200;
|
||||
res.setHeader("Content-Type", contentTypeForExt(path.extname(safeAvatar.path).toLowerCase()));
|
||||
res.setHeader("Cache-Control", "no-cache");
|
||||
res.end();
|
||||
if (respondHeadForFile(req, res, safeAvatar.path)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -333,19 +356,11 @@ export function handleControlUiHttpRequest(
|
||||
|
||||
const rootState = opts?.root;
|
||||
if (rootState?.kind === "invalid") {
|
||||
respondPlainText(
|
||||
res,
|
||||
503,
|
||||
`Control UI assets not found at ${rootState.path}. Build them with \`pnpm ui:build\` (auto-installs UI deps), or update gateway.controlUi.root.`,
|
||||
);
|
||||
respondControlUiAssetsUnavailable(res, { configuredRootPath: rootState.path });
|
||||
return true;
|
||||
}
|
||||
if (rootState?.kind === "missing") {
|
||||
respondPlainText(
|
||||
res,
|
||||
503,
|
||||
"Control UI assets not found. Build them with `pnpm ui:build` (auto-installs UI deps), or run `pnpm ui:dev` during development.",
|
||||
);
|
||||
respondControlUiAssetsUnavailable(res);
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -358,11 +373,7 @@ export function handleControlUiHttpRequest(
|
||||
cwd: process.cwd(),
|
||||
});
|
||||
if (!root) {
|
||||
respondPlainText(
|
||||
res,
|
||||
503,
|
||||
"Control UI assets not found. Build them with `pnpm ui:build` (auto-installs UI deps), or run `pnpm ui:dev` during development.",
|
||||
);
|
||||
respondControlUiAssetsUnavailable(res);
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -377,11 +388,7 @@ export function handleControlUiHttpRequest(
|
||||
}
|
||||
})();
|
||||
if (!rootReal) {
|
||||
respondPlainText(
|
||||
res,
|
||||
503,
|
||||
"Control UI assets not found. Build them with `pnpm ui:build` (auto-installs UI deps), or run `pnpm ui:dev` during development.",
|
||||
);
|
||||
respondControlUiAssetsUnavailable(res);
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -413,10 +420,7 @@ export function handleControlUiHttpRequest(
|
||||
const safeFile = resolveSafeControlUiFile(rootReal, filePath);
|
||||
if (safeFile) {
|
||||
try {
|
||||
if (req.method === "HEAD") {
|
||||
res.statusCode = 200;
|
||||
setStaticFileHeaders(res, safeFile.path);
|
||||
res.end();
|
||||
if (respondHeadForFile(req, res, safeFile.path)) {
|
||||
return true;
|
||||
}
|
||||
if (path.basename(safeFile.path) === "index.html") {
|
||||
@@ -445,10 +449,7 @@ export function handleControlUiHttpRequest(
|
||||
const safeIndex = resolveSafeControlUiFile(rootReal, indexPath);
|
||||
if (safeIndex) {
|
||||
try {
|
||||
if (req.method === "HEAD") {
|
||||
res.statusCode = 200;
|
||||
setStaticFileHeaders(res, safeIndex.path);
|
||||
res.end();
|
||||
if (respondHeadForFile(req, res, safeIndex.path)) {
|
||||
return true;
|
||||
}
|
||||
serveResolvedIndexHtml(res, fs.readFileSync(safeIndex.fd, "utf8"));
|
||||
|
||||
@@ -338,78 +338,111 @@ export function createInternalHookEvent(
|
||||
};
|
||||
}
|
||||
|
||||
export function isAgentBootstrapEvent(event: InternalHookEvent): event is AgentBootstrapHookEvent {
|
||||
if (event.type !== "agent" || event.action !== "bootstrap") {
|
||||
return false;
|
||||
}
|
||||
const context = event.context as Partial<AgentBootstrapHookContext> | null;
|
||||
function isHookEventTypeAndAction(
|
||||
event: InternalHookEvent,
|
||||
type: InternalHookEventType,
|
||||
action: string,
|
||||
): boolean {
|
||||
return event.type === type && event.action === action;
|
||||
}
|
||||
|
||||
function getHookContext<T extends Record<string, unknown>>(
|
||||
event: InternalHookEvent,
|
||||
): Partial<T> | null {
|
||||
const context = event.context as Partial<T> | null;
|
||||
if (!context || typeof context !== "object") {
|
||||
return null;
|
||||
}
|
||||
return context;
|
||||
}
|
||||
|
||||
function hasStringContextField<T extends Record<string, unknown>>(
|
||||
context: Partial<T>,
|
||||
key: keyof T,
|
||||
): boolean {
|
||||
return typeof context[key] === "string";
|
||||
}
|
||||
|
||||
function hasBooleanContextField<T extends Record<string, unknown>>(
|
||||
context: Partial<T>,
|
||||
key: keyof T,
|
||||
): boolean {
|
||||
return typeof context[key] === "boolean";
|
||||
}
|
||||
|
||||
export function isAgentBootstrapEvent(event: InternalHookEvent): event is AgentBootstrapHookEvent {
|
||||
if (!isHookEventTypeAndAction(event, "agent", "bootstrap")) {
|
||||
return false;
|
||||
}
|
||||
if (typeof context.workspaceDir !== "string") {
|
||||
const context = getHookContext<AgentBootstrapHookContext>(event);
|
||||
if (!context) {
|
||||
return false;
|
||||
}
|
||||
if (!hasStringContextField(context, "workspaceDir")) {
|
||||
return false;
|
||||
}
|
||||
return Array.isArray(context.bootstrapFiles);
|
||||
}
|
||||
|
||||
export function isGatewayStartupEvent(event: InternalHookEvent): event is GatewayStartupHookEvent {
|
||||
if (event.type !== "gateway" || event.action !== "startup") {
|
||||
if (!isHookEventTypeAndAction(event, "gateway", "startup")) {
|
||||
return false;
|
||||
}
|
||||
const context = event.context as GatewayStartupHookContext | null;
|
||||
return Boolean(context && typeof context === "object");
|
||||
return Boolean(getHookContext<GatewayStartupHookContext>(event));
|
||||
}
|
||||
|
||||
export function isMessageReceivedEvent(
|
||||
event: InternalHookEvent,
|
||||
): event is MessageReceivedHookEvent {
|
||||
if (event.type !== "message" || event.action !== "received") {
|
||||
if (!isHookEventTypeAndAction(event, "message", "received")) {
|
||||
return false;
|
||||
}
|
||||
const context = event.context as Partial<MessageReceivedHookContext> | null;
|
||||
if (!context || typeof context !== "object") {
|
||||
const context = getHookContext<MessageReceivedHookContext>(event);
|
||||
if (!context) {
|
||||
return false;
|
||||
}
|
||||
return typeof context.from === "string" && typeof context.channelId === "string";
|
||||
return hasStringContextField(context, "from") && hasStringContextField(context, "channelId");
|
||||
}
|
||||
|
||||
export function isMessageSentEvent(event: InternalHookEvent): event is MessageSentHookEvent {
|
||||
if (event.type !== "message" || event.action !== "sent") {
|
||||
if (!isHookEventTypeAndAction(event, "message", "sent")) {
|
||||
return false;
|
||||
}
|
||||
const context = event.context as Partial<MessageSentHookContext> | null;
|
||||
if (!context || typeof context !== "object") {
|
||||
const context = getHookContext<MessageSentHookContext>(event);
|
||||
if (!context) {
|
||||
return false;
|
||||
}
|
||||
return (
|
||||
typeof context.to === "string" &&
|
||||
typeof context.channelId === "string" &&
|
||||
typeof context.success === "boolean"
|
||||
hasStringContextField(context, "to") &&
|
||||
hasStringContextField(context, "channelId") &&
|
||||
hasBooleanContextField(context, "success")
|
||||
);
|
||||
}
|
||||
|
||||
export function isMessageTranscribedEvent(
|
||||
event: InternalHookEvent,
|
||||
): event is MessageTranscribedHookEvent {
|
||||
if (event.type !== "message" || event.action !== "transcribed") {
|
||||
if (!isHookEventTypeAndAction(event, "message", "transcribed")) {
|
||||
return false;
|
||||
}
|
||||
const context = event.context as Partial<MessageTranscribedHookContext> | null;
|
||||
if (!context || typeof context !== "object") {
|
||||
const context = getHookContext<MessageTranscribedHookContext>(event);
|
||||
if (!context) {
|
||||
return false;
|
||||
}
|
||||
return typeof context.transcript === "string" && typeof context.channelId === "string";
|
||||
return (
|
||||
hasStringContextField(context, "transcript") && hasStringContextField(context, "channelId")
|
||||
);
|
||||
}
|
||||
|
||||
export function isMessagePreprocessedEvent(
|
||||
event: InternalHookEvent,
|
||||
): event is MessagePreprocessedHookEvent {
|
||||
if (event.type !== "message" || event.action !== "preprocessed") {
|
||||
if (!isHookEventTypeAndAction(event, "message", "preprocessed")) {
|
||||
return false;
|
||||
}
|
||||
const context = event.context as Partial<MessagePreprocessedHookContext> | null;
|
||||
if (!context || typeof context !== "object") {
|
||||
const context = getHookContext<MessagePreprocessedHookContext>(event);
|
||||
if (!context) {
|
||||
return false;
|
||||
}
|
||||
return typeof context.channelId === "string";
|
||||
return hasStringContextField(context, "channelId");
|
||||
}
|
||||
|
||||
@@ -1,18 +1,17 @@
|
||||
import fs from "node:fs/promises";
|
||||
import { resolveHumanDelayConfig } from "../../agents/identity.js";
|
||||
import { resolveTextChunkLimit } from "../../auto-reply/chunk.js";
|
||||
import { hasControlCommand } from "../../auto-reply/command-detection.js";
|
||||
import { dispatchInboundMessage } from "../../auto-reply/dispatch.js";
|
||||
import {
|
||||
createInboundDebouncer,
|
||||
resolveInboundDebounceMs,
|
||||
} from "../../auto-reply/inbound-debounce.js";
|
||||
import {
|
||||
clearHistoryEntriesIfEnabled,
|
||||
DEFAULT_GROUP_HISTORY_LIMIT,
|
||||
type HistoryEntry,
|
||||
} from "../../auto-reply/reply/history.js";
|
||||
import { createReplyDispatcher } from "../../auto-reply/reply/reply-dispatcher.js";
|
||||
import {
|
||||
createChannelInboundDebouncer,
|
||||
shouldDebounceTextInbound,
|
||||
} from "../../channels/inbound-debounce-policy.js";
|
||||
import { createReplyPrefixOptions } from "../../channels/reply-prefix.js";
|
||||
import { recordInboundSession } from "../../channels/session.js";
|
||||
import { loadConfig } from "../../config/config.js";
|
||||
@@ -153,9 +152,11 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
|
||||
}
|
||||
}
|
||||
|
||||
const inboundDebounceMs = resolveInboundDebounceMs({ cfg, channel: "imessage" });
|
||||
const inboundDebouncer = createInboundDebouncer<{ message: IMessagePayload }>({
|
||||
debounceMs: inboundDebounceMs,
|
||||
const { debouncer: inboundDebouncer } = createChannelInboundDebouncer<{
|
||||
message: IMessagePayload;
|
||||
}>({
|
||||
cfg,
|
||||
channel: "imessage",
|
||||
buildKey: (entry) => {
|
||||
const sender = entry.message.sender?.trim();
|
||||
if (!sender) {
|
||||
@@ -168,14 +169,11 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
|
||||
return `imessage:${accountInfo.accountId}:${conversationId}:${sender}`;
|
||||
},
|
||||
shouldDebounce: (entry) => {
|
||||
const text = entry.message.text?.trim() ?? "";
|
||||
if (!text) {
|
||||
return false;
|
||||
}
|
||||
if (entry.message.attachments && entry.message.attachments.length > 0) {
|
||||
return false;
|
||||
}
|
||||
return !hasControlCommand(text, cfg);
|
||||
return shouldDebounceTextInbound({
|
||||
text: entry.message.text,
|
||||
cfg,
|
||||
hasMedia: Boolean(entry.message.attachments && entry.message.attachments.length > 0),
|
||||
});
|
||||
},
|
||||
onFlush: async (entries) => {
|
||||
const last = entries.at(-1);
|
||||
|
||||
@@ -6,10 +6,6 @@ import {
|
||||
formatInboundFromLabel,
|
||||
resolveEnvelopeFormatOptions,
|
||||
} from "../../auto-reply/envelope.js";
|
||||
import {
|
||||
createInboundDebouncer,
|
||||
resolveInboundDebounceMs,
|
||||
} from "../../auto-reply/inbound-debounce.js";
|
||||
import {
|
||||
buildPendingHistoryContextFromMap,
|
||||
clearHistoryEntriesIfEnabled,
|
||||
@@ -19,6 +15,10 @@ import { finalizeInboundContext } from "../../auto-reply/reply/inbound-context.j
|
||||
import { buildMentionRegexes, matchesMentionPatterns } from "../../auto-reply/reply/mentions.js";
|
||||
import { createReplyDispatcherWithTyping } from "../../auto-reply/reply/reply-dispatcher.js";
|
||||
import { resolveControlCommandGate } from "../../channels/command-gating.js";
|
||||
import {
|
||||
createChannelInboundDebouncer,
|
||||
shouldDebounceTextInbound,
|
||||
} from "../../channels/inbound-debounce-policy.js";
|
||||
import { logInboundDrop, logTypingFailure } from "../../channels/logging.js";
|
||||
import { resolveMentionGatingWithBypass } from "../../channels/mention-gating.js";
|
||||
import { normalizeSignalMessagingTarget } from "../../channels/plugins/normalize/signal.js";
|
||||
@@ -57,8 +57,6 @@ import type {
|
||||
} from "./event-handler.types.js";
|
||||
import { renderSignalMentions } from "./mentions.js";
|
||||
export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
|
||||
const inboundDebounceMs = resolveInboundDebounceMs({ cfg: deps.cfg, channel: "signal" });
|
||||
|
||||
type SignalInboundEntry = {
|
||||
senderName: string;
|
||||
senderDisplay: string;
|
||||
@@ -299,8 +297,9 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
|
||||
}
|
||||
}
|
||||
|
||||
const inboundDebouncer = createInboundDebouncer<SignalInboundEntry>({
|
||||
debounceMs: inboundDebounceMs,
|
||||
const { debouncer: inboundDebouncer } = createChannelInboundDebouncer<SignalInboundEntry>({
|
||||
cfg: deps.cfg,
|
||||
channel: "signal",
|
||||
buildKey: (entry) => {
|
||||
const conversationId = entry.isGroup ? (entry.groupId ?? "unknown") : entry.senderPeerId;
|
||||
if (!conversationId || !entry.senderPeerId) {
|
||||
@@ -309,13 +308,11 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
|
||||
return `signal:${deps.accountId}:${conversationId}:${entry.senderPeerId}`;
|
||||
},
|
||||
shouldDebounce: (entry) => {
|
||||
if (!entry.bodyText.trim()) {
|
||||
return false;
|
||||
}
|
||||
if (entry.mediaPath || entry.mediaType) {
|
||||
return false;
|
||||
}
|
||||
return !hasControlCommand(entry.bodyText, deps.cfg);
|
||||
return shouldDebounceTextInbound({
|
||||
text: entry.bodyText,
|
||||
cfg: deps.cfg,
|
||||
hasMedia: Boolean(entry.mediaPath || entry.mediaType),
|
||||
});
|
||||
},
|
||||
onFlush: async (entries) => {
|
||||
const last = entries.at(-1);
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
import { hasControlCommand } from "../../auto-reply/command-detection.js";
|
||||
import {
|
||||
createInboundDebouncer,
|
||||
resolveInboundDebounceMs,
|
||||
} from "../../auto-reply/inbound-debounce.js";
|
||||
createChannelInboundDebouncer,
|
||||
shouldDebounceTextInbound,
|
||||
} from "../../channels/inbound-debounce-policy.js";
|
||||
import type { ResolvedSlackAccount } from "../accounts.js";
|
||||
import type { SlackMessageEvent } from "../types.js";
|
||||
import { stripSlackMentionsForCommandDetection } from "./commands.js";
|
||||
@@ -44,14 +43,12 @@ function buildTopLevelSlackConversationKey(
|
||||
|
||||
function shouldDebounceSlackMessage(message: SlackMessageEvent, cfg: SlackMonitorContext["cfg"]) {
|
||||
const text = message.text ?? "";
|
||||
if (!text.trim()) {
|
||||
return false;
|
||||
}
|
||||
if (message.files && message.files.length > 0) {
|
||||
return false;
|
||||
}
|
||||
const textForCommandDetection = stripSlackMentionsForCommandDetection(text);
|
||||
return !hasControlCommand(textForCommandDetection, cfg);
|
||||
return shouldDebounceTextInbound({
|
||||
text: textForCommandDetection,
|
||||
cfg,
|
||||
hasMedia: Boolean(message.files && message.files.length > 0),
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -88,15 +85,12 @@ export function createSlackMessageHandler(params: {
|
||||
trackEvent?: () => void;
|
||||
}): SlackMessageHandler {
|
||||
const { ctx, account, trackEvent } = params;
|
||||
const debounceMs = resolveInboundDebounceMs({ cfg: ctx.cfg, channel: "slack" });
|
||||
const threadTsResolver = createSlackThreadTsResolver({ client: ctx.app.client });
|
||||
const pendingTopLevelDebounceKeys = new Map<string, Set<string>>();
|
||||
|
||||
const debouncer = createInboundDebouncer<{
|
||||
const { debounceMs, debouncer } = createChannelInboundDebouncer<{
|
||||
message: SlackMessageEvent;
|
||||
opts: { source: "message" | "app_mention"; wasMentioned?: boolean };
|
||||
}>({
|
||||
debounceMs,
|
||||
cfg: ctx.cfg,
|
||||
channel: "slack",
|
||||
buildKey: (entry) => buildSlackDebounceKey(entry.message, ctx.accountId),
|
||||
shouldDebounce: (entry) => shouldDebounceSlackMessage(entry.message, ctx.cfg),
|
||||
onFlush: async (entries) => {
|
||||
@@ -156,6 +150,8 @@ export function createSlackMessageHandler(params: {
|
||||
ctx.runtime.error?.(`slack inbound debounce flush failed: ${String(err)}`);
|
||||
},
|
||||
});
|
||||
const threadTsResolver = createSlackThreadTsResolver({ client: ctx.app.client });
|
||||
const pendingTopLevelDebounceKeys = new Map<string, Set<string>>();
|
||||
|
||||
return async (message, opts) => {
|
||||
if (opts.source === "message" && message.type !== "message") {
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
import type { Message, ReactionTypeEmoji } from "@grammyjs/types";
|
||||
import { resolveAgentDir, resolveDefaultAgentId } from "../agents/agent-scope.js";
|
||||
import { hasControlCommand } from "../auto-reply/command-detection.js";
|
||||
import {
|
||||
createInboundDebouncer,
|
||||
resolveInboundDebounceMs,
|
||||
@@ -13,6 +12,7 @@ import {
|
||||
import { resolveStoredModelOverride } from "../auto-reply/reply/model-selection.js";
|
||||
import { listSkillCommandsForAgents } from "../auto-reply/skill-commands.js";
|
||||
import { buildCommandsMessagePaginated } from "../auto-reply/status.js";
|
||||
import { shouldDebounceTextInbound } from "../channels/inbound-debounce-policy.js";
|
||||
import { resolveChannelConfigWrites } from "../channels/plugins/config-writes.js";
|
||||
import { loadConfig } from "../config/config.js";
|
||||
import { writeConfigFile } from "../config/io.js";
|
||||
@@ -206,14 +206,18 @@ export const registerTelegramHandlers = ({
|
||||
buildKey: (entry) => entry.debounceKey,
|
||||
shouldDebounce: (entry) => {
|
||||
const text = entry.msg.text ?? entry.msg.caption ?? "";
|
||||
const hasText = text.trim().length > 0;
|
||||
if (hasText && hasControlCommand(text, cfg, { botUsername: entry.botUsername })) {
|
||||
const hasDebounceableText = shouldDebounceTextInbound({
|
||||
text,
|
||||
cfg,
|
||||
commandOptions: { botUsername: entry.botUsername },
|
||||
});
|
||||
if (!hasDebounceableText) {
|
||||
return false;
|
||||
}
|
||||
if (entry.debounceLane === "forward") {
|
||||
return true;
|
||||
}
|
||||
return entry.allMedia.length === 0 && hasText;
|
||||
return entry.allMedia.length === 0;
|
||||
},
|
||||
onFlush: async (entries) => {
|
||||
const last = entries.at(-1);
|
||||
|
||||
Reference in New Issue
Block a user