Files
openclaw/src/signal/monitor/event-handler.ts
2026-02-18 01:34:35 +00:00

679 lines
23 KiB
TypeScript
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import { resolveHumanDelayConfig } from "../../agents/identity.js";
import { hasControlCommand } from "../../auto-reply/command-detection.js";
import { dispatchInboundMessage } from "../../auto-reply/dispatch.js";
import {
formatInboundEnvelope,
formatInboundFromLabel,
resolveEnvelopeFormatOptions,
} from "../../auto-reply/envelope.js";
import {
createInboundDebouncer,
resolveInboundDebounceMs,
} from "../../auto-reply/inbound-debounce.js";
import {
buildPendingHistoryContextFromMap,
clearHistoryEntriesIfEnabled,
recordPendingHistoryEntryIfEnabled,
} from "../../auto-reply/reply/history.js";
import { finalizeInboundContext } from "../../auto-reply/reply/inbound-context.js";
import { buildMentionRegexes, matchesMentionPatterns } from "../../auto-reply/reply/mentions.js";
import { createReplyDispatcherWithTyping } from "../../auto-reply/reply/reply-dispatcher.js";
import { resolveControlCommandGate } from "../../channels/command-gating.js";
import { logInboundDrop, logTypingFailure } from "../../channels/logging.js";
import { resolveMentionGatingWithBypass } from "../../channels/mention-gating.js";
import { normalizeSignalMessagingTarget } from "../../channels/plugins/normalize/signal.js";
import { createReplyPrefixOptions } from "../../channels/reply-prefix.js";
import { recordInboundSession } from "../../channels/session.js";
import { createTypingCallbacks } from "../../channels/typing.js";
import { resolveChannelGroupRequireMention } from "../../config/group-policy.js";
import { readSessionUpdatedAt, resolveStorePath } from "../../config/sessions.js";
import { danger, logVerbose, shouldLogVerbose } from "../../globals.js";
import { enqueueSystemEvent } from "../../infra/system-events.js";
import { mediaKindFromMime } from "../../media/constants.js";
import { buildPairingReply } from "../../pairing/pairing-messages.js";
import {
readChannelAllowFromStore,
upsertChannelPairingRequest,
} from "../../pairing/pairing-store.js";
import { resolveAgentRoute } from "../../routing/resolve-route.js";
import { normalizeE164 } from "../../utils.js";
import {
formatSignalPairingIdLine,
formatSignalSenderDisplay,
formatSignalSenderId,
isSignalSenderAllowed,
resolveSignalPeerId,
resolveSignalRecipient,
resolveSignalSender,
} from "../identity.js";
import { sendMessageSignal, sendReadReceiptSignal, sendTypingSignal } from "../send.js";
import type { SignalEventHandlerDeps, SignalReceivePayload } from "./event-handler.types.js";
import { renderSignalMentions } from "./mentions.js";
export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
const inboundDebounceMs = resolveInboundDebounceMs({ cfg: deps.cfg, channel: "signal" });
type SignalInboundEntry = {
senderName: string;
senderDisplay: string;
senderRecipient: string;
senderPeerId: string;
groupId?: string;
groupName?: string;
isGroup: boolean;
bodyText: string;
timestamp?: number;
messageId?: string;
mediaPath?: string;
mediaType?: string;
commandAuthorized: boolean;
wasMentioned?: boolean;
};
async function handleSignalInboundMessage(entry: SignalInboundEntry) {
const fromLabel = formatInboundFromLabel({
isGroup: entry.isGroup,
groupLabel: entry.groupName ?? undefined,
groupId: entry.groupId ?? "unknown",
groupFallback: "Group",
directLabel: entry.senderName,
directId: entry.senderDisplay,
});
const route = resolveAgentRoute({
cfg: deps.cfg,
channel: "signal",
accountId: deps.accountId,
peer: {
kind: entry.isGroup ? "group" : "direct",
id: entry.isGroup ? (entry.groupId ?? "unknown") : entry.senderPeerId,
},
});
const storePath = resolveStorePath(deps.cfg.session?.store, {
agentId: route.agentId,
});
const envelopeOptions = resolveEnvelopeFormatOptions(deps.cfg);
const previousTimestamp = readSessionUpdatedAt({
storePath,
sessionKey: route.sessionKey,
});
const body = formatInboundEnvelope({
channel: "Signal",
from: fromLabel,
timestamp: entry.timestamp ?? undefined,
body: entry.bodyText,
chatType: entry.isGroup ? "group" : "direct",
sender: { name: entry.senderName, id: entry.senderDisplay },
previousTimestamp,
envelope: envelopeOptions,
});
let combinedBody = body;
const historyKey = entry.isGroup ? String(entry.groupId ?? "unknown") : undefined;
if (entry.isGroup && historyKey) {
combinedBody = buildPendingHistoryContextFromMap({
historyMap: deps.groupHistories,
historyKey,
limit: deps.historyLimit,
currentMessage: combinedBody,
formatEntry: (historyEntry) =>
formatInboundEnvelope({
channel: "Signal",
from: fromLabel,
timestamp: historyEntry.timestamp,
body: `${historyEntry.body}${
historyEntry.messageId ? ` [id:${historyEntry.messageId}]` : ""
}`,
chatType: "group",
senderLabel: historyEntry.sender,
envelope: envelopeOptions,
}),
});
}
const signalToRaw = entry.isGroup
? `group:${entry.groupId}`
: `signal:${entry.senderRecipient}`;
const signalTo = normalizeSignalMessagingTarget(signalToRaw) ?? signalToRaw;
const inboundHistory =
entry.isGroup && historyKey && deps.historyLimit > 0
? (deps.groupHistories.get(historyKey) ?? []).map((historyEntry) => ({
sender: historyEntry.sender,
body: historyEntry.body,
timestamp: historyEntry.timestamp,
}))
: undefined;
const ctxPayload = finalizeInboundContext({
Body: combinedBody,
BodyForAgent: entry.bodyText,
InboundHistory: inboundHistory,
RawBody: entry.bodyText,
CommandBody: entry.bodyText,
From: entry.isGroup
? `group:${entry.groupId ?? "unknown"}`
: `signal:${entry.senderRecipient}`,
To: signalTo,
SessionKey: route.sessionKey,
AccountId: route.accountId,
ChatType: entry.isGroup ? "group" : "direct",
ConversationLabel: fromLabel,
GroupSubject: entry.isGroup ? (entry.groupName ?? undefined) : undefined,
SenderName: entry.senderName,
SenderId: entry.senderDisplay,
Provider: "signal" as const,
Surface: "signal" as const,
MessageSid: entry.messageId,
Timestamp: entry.timestamp ?? undefined,
MediaPath: entry.mediaPath,
MediaType: entry.mediaType,
MediaUrl: entry.mediaPath,
WasMentioned: entry.isGroup ? entry.wasMentioned === true : undefined,
CommandAuthorized: entry.commandAuthorized,
OriginatingChannel: "signal" as const,
OriginatingTo: signalTo,
});
await recordInboundSession({
storePath,
sessionKey: ctxPayload.SessionKey ?? route.sessionKey,
ctx: ctxPayload,
updateLastRoute: !entry.isGroup
? {
sessionKey: route.mainSessionKey,
channel: "signal",
to: entry.senderRecipient,
accountId: route.accountId,
}
: undefined,
onRecordError: (err) => {
logVerbose(`signal: failed updating session meta: ${String(err)}`);
},
});
if (shouldLogVerbose()) {
const preview = body.slice(0, 200).replace(/\\n/g, "\\\\n");
logVerbose(`signal inbound: from=${ctxPayload.From} len=${body.length} preview="${preview}"`);
}
const { onModelSelected, ...prefixOptions } = createReplyPrefixOptions({
cfg: deps.cfg,
agentId: route.agentId,
channel: "signal",
accountId: route.accountId,
});
const typingCallbacks = createTypingCallbacks({
start: async () => {
if (!ctxPayload.To) {
return;
}
await sendTypingSignal(ctxPayload.To, {
baseUrl: deps.baseUrl,
account: deps.account,
accountId: deps.accountId,
});
},
onStartError: (err) => {
logTypingFailure({
log: logVerbose,
channel: "signal",
target: ctxPayload.To ?? undefined,
error: err,
});
},
});
const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping({
...prefixOptions,
humanDelay: resolveHumanDelayConfig(deps.cfg, route.agentId),
deliver: async (payload) => {
await deps.deliverReplies({
replies: [payload],
target: ctxPayload.To,
baseUrl: deps.baseUrl,
account: deps.account,
accountId: deps.accountId,
runtime: deps.runtime,
maxBytes: deps.mediaMaxBytes,
textLimit: deps.textLimit,
});
},
onError: (err, info) => {
deps.runtime.error?.(danger(`signal ${info.kind} reply failed: ${String(err)}`));
},
onReplyStart: typingCallbacks.onReplyStart,
});
const { queuedFinal } = await dispatchInboundMessage({
ctx: ctxPayload,
cfg: deps.cfg,
dispatcher,
replyOptions: {
...replyOptions,
disableBlockStreaming:
typeof deps.blockStreaming === "boolean" ? !deps.blockStreaming : undefined,
onModelSelected,
},
});
markDispatchIdle();
if (!queuedFinal) {
if (entry.isGroup && historyKey) {
clearHistoryEntriesIfEnabled({
historyMap: deps.groupHistories,
historyKey,
limit: deps.historyLimit,
});
}
return;
}
if (entry.isGroup && historyKey) {
clearHistoryEntriesIfEnabled({
historyMap: deps.groupHistories,
historyKey,
limit: deps.historyLimit,
});
}
}
const inboundDebouncer = createInboundDebouncer<SignalInboundEntry>({
debounceMs: inboundDebounceMs,
buildKey: (entry) => {
const conversationId = entry.isGroup ? (entry.groupId ?? "unknown") : entry.senderPeerId;
if (!conversationId || !entry.senderPeerId) {
return null;
}
return `signal:${deps.accountId}:${conversationId}:${entry.senderPeerId}`;
},
shouldDebounce: (entry) => {
if (!entry.bodyText.trim()) {
return false;
}
if (entry.mediaPath || entry.mediaType) {
return false;
}
return !hasControlCommand(entry.bodyText, deps.cfg);
},
onFlush: async (entries) => {
const last = entries.at(-1);
if (!last) {
return;
}
if (entries.length === 1) {
await handleSignalInboundMessage(last);
return;
}
const combinedText = entries
.map((entry) => entry.bodyText)
.filter(Boolean)
.join("\\n");
if (!combinedText.trim()) {
return;
}
await handleSignalInboundMessage({
...last,
bodyText: combinedText,
mediaPath: undefined,
mediaType: undefined,
});
},
onError: (err) => {
deps.runtime.error?.(`signal debounce flush failed: ${String(err)}`);
},
});
return async (event: { event?: string; data?: string }) => {
if (event.event !== "receive" || !event.data) {
return;
}
let payload: SignalReceivePayload | null = null;
try {
payload = JSON.parse(event.data) as SignalReceivePayload;
} catch (err) {
deps.runtime.error?.(`failed to parse event: ${String(err)}`);
return;
}
if (payload?.exception?.message) {
deps.runtime.error?.(`receive exception: ${payload.exception.message}`);
}
const envelope = payload?.envelope;
if (!envelope) {
return;
}
if (envelope.syncMessage) {
return;
}
const sender = resolveSignalSender(envelope);
if (!sender) {
return;
}
if (deps.account && sender.kind === "phone") {
if (sender.e164 === normalizeE164(deps.account)) {
return;
}
}
const dataMessage = envelope.dataMessage ?? envelope.editMessage?.dataMessage;
const reaction = deps.isSignalReactionMessage(envelope.reactionMessage)
? envelope.reactionMessage
: deps.isSignalReactionMessage(dataMessage?.reaction)
? dataMessage?.reaction
: null;
// Replace (object replacement character) with @uuid or @phone from mentions
// Signal encodes mentions as the object replacement character; hydrate them from metadata first.
const rawMessage = dataMessage?.message ?? "";
const normalizedMessage = renderSignalMentions(rawMessage, dataMessage?.mentions);
const messageText = normalizedMessage.trim();
const quoteText = dataMessage?.quote?.text?.trim() ?? "";
const hasBodyContent =
Boolean(messageText || quoteText) || Boolean(!reaction && dataMessage?.attachments?.length);
if (reaction && !hasBodyContent) {
if (reaction.isRemove) {
return;
} // Ignore reaction removals
const emojiLabel = reaction.emoji?.trim() || "emoji";
const senderDisplay = formatSignalSenderDisplay(sender);
const senderName = envelope.sourceName ?? senderDisplay;
logVerbose(`signal reaction: ${emojiLabel} from ${senderName}`);
const targets = deps.resolveSignalReactionTargets(reaction);
const shouldNotify = deps.shouldEmitSignalReactionNotification({
mode: deps.reactionMode,
account: deps.account,
targets,
sender,
allowlist: deps.reactionAllowlist,
});
if (!shouldNotify) {
return;
}
const groupId = reaction.groupInfo?.groupId ?? undefined;
const groupName = reaction.groupInfo?.groupName ?? undefined;
const isGroup = Boolean(groupId);
const senderPeerId = resolveSignalPeerId(sender);
const route = resolveAgentRoute({
cfg: deps.cfg,
channel: "signal",
accountId: deps.accountId,
peer: {
kind: isGroup ? "group" : "direct",
id: isGroup ? (groupId ?? "unknown") : senderPeerId,
},
});
const groupLabel = isGroup ? `${groupName ?? "Signal Group"} id:${groupId}` : undefined;
const messageId = reaction.targetSentTimestamp
? String(reaction.targetSentTimestamp)
: "unknown";
const text = deps.buildSignalReactionSystemEventText({
emojiLabel,
actorLabel: senderName,
messageId,
targetLabel: targets[0]?.display,
groupLabel,
});
const senderId = formatSignalSenderId(sender);
const contextKey = [
"signal",
"reaction",
"added",
messageId,
senderId,
emojiLabel,
groupId ?? "",
]
.filter(Boolean)
.join(":");
enqueueSystemEvent(text, { sessionKey: route.sessionKey, contextKey });
return;
}
if (!dataMessage) {
return;
}
const senderDisplay = formatSignalSenderDisplay(sender);
const senderRecipient = resolveSignalRecipient(sender);
const senderPeerId = resolveSignalPeerId(sender);
const senderAllowId = formatSignalSenderId(sender);
if (!senderRecipient) {
return;
}
const senderIdLine = formatSignalPairingIdLine(sender);
const groupId = dataMessage.groupInfo?.groupId ?? undefined;
const groupName = dataMessage.groupInfo?.groupName ?? undefined;
const isGroup = Boolean(groupId);
const storeAllowFrom = await readChannelAllowFromStore("signal").catch(() => []);
const effectiveDmAllow = [...deps.allowFrom, ...storeAllowFrom];
const effectiveGroupAllow = [...deps.groupAllowFrom, ...storeAllowFrom];
const dmAllowed =
deps.dmPolicy === "open" ? true : isSignalSenderAllowed(sender, effectiveDmAllow);
if (!isGroup) {
if (deps.dmPolicy === "disabled") {
return;
}
if (!dmAllowed) {
if (deps.dmPolicy === "pairing") {
const senderId = senderAllowId;
const { code, created } = await upsertChannelPairingRequest({
channel: "signal",
id: senderId,
meta: { name: envelope.sourceName ?? undefined },
});
if (created) {
logVerbose(`signal pairing request sender=${senderId}`);
try {
await sendMessageSignal(
`signal:${senderRecipient}`,
buildPairingReply({
channel: "signal",
idLine: senderIdLine,
code,
}),
{
baseUrl: deps.baseUrl,
account: deps.account,
maxBytes: deps.mediaMaxBytes,
accountId: deps.accountId,
},
);
} catch (err) {
logVerbose(`signal pairing reply failed for ${senderId}: ${String(err)}`);
}
}
} else {
logVerbose(`Blocked signal sender ${senderDisplay} (dmPolicy=${deps.dmPolicy})`);
}
return;
}
}
if (isGroup && deps.groupPolicy === "disabled") {
logVerbose("Blocked signal group message (groupPolicy: disabled)");
return;
}
if (isGroup && deps.groupPolicy === "allowlist") {
if (effectiveGroupAllow.length === 0) {
logVerbose("Blocked signal group message (groupPolicy: allowlist, no groupAllowFrom)");
return;
}
if (!isSignalSenderAllowed(sender, effectiveGroupAllow)) {
logVerbose(`Blocked signal group sender ${senderDisplay} (not in groupAllowFrom)`);
return;
}
}
const useAccessGroups = deps.cfg.commands?.useAccessGroups !== false;
const ownerAllowedForCommands = isSignalSenderAllowed(sender, effectiveDmAllow);
const groupAllowedForCommands = isSignalSenderAllowed(sender, effectiveGroupAllow);
const hasControlCommandInMessage = hasControlCommand(messageText, deps.cfg);
const commandGate = resolveControlCommandGate({
useAccessGroups,
authorizers: [
{ configured: effectiveDmAllow.length > 0, allowed: ownerAllowedForCommands },
{ configured: effectiveGroupAllow.length > 0, allowed: groupAllowedForCommands },
],
allowTextCommands: true,
hasControlCommand: hasControlCommandInMessage,
});
const commandAuthorized = isGroup ? commandGate.commandAuthorized : dmAllowed;
if (isGroup && commandGate.shouldBlock) {
logInboundDrop({
log: logVerbose,
channel: "signal",
reason: "control command (unauthorized)",
target: senderDisplay,
});
return;
}
const route = resolveAgentRoute({
cfg: deps.cfg,
channel: "signal",
accountId: deps.accountId,
peer: {
kind: isGroup ? "group" : "direct",
id: isGroup ? (groupId ?? "unknown") : senderPeerId,
},
});
const mentionRegexes = buildMentionRegexes(deps.cfg, route.agentId);
const wasMentioned = isGroup && matchesMentionPatterns(messageText, mentionRegexes);
const requireMention =
isGroup &&
resolveChannelGroupRequireMention({
cfg: deps.cfg,
channel: "signal",
groupId,
accountId: deps.accountId,
});
const canDetectMention = mentionRegexes.length > 0;
const mentionGate = resolveMentionGatingWithBypass({
isGroup,
requireMention: Boolean(requireMention),
canDetectMention,
wasMentioned,
implicitMention: false,
hasAnyMention: false,
allowTextCommands: true,
hasControlCommand: hasControlCommandInMessage,
commandAuthorized,
});
const effectiveWasMentioned = mentionGate.effectiveWasMentioned;
if (isGroup && requireMention && canDetectMention && mentionGate.shouldSkip) {
logInboundDrop({
log: logVerbose,
channel: "signal",
reason: "no mention",
target: senderDisplay,
});
const quoteText = dataMessage.quote?.text?.trim() || "";
const pendingPlaceholder = (() => {
if (!dataMessage.attachments?.length) {
return "";
}
// When we're skipping a message we intentionally avoid downloading attachments.
// Still record a useful placeholder for pending-history context.
if (deps.ignoreAttachments) {
return "<media:attachment>";
}
const firstContentType = dataMessage.attachments?.[0]?.contentType;
const pendingKind = mediaKindFromMime(firstContentType ?? undefined);
return pendingKind ? `<media:${pendingKind}>` : "<media:attachment>";
})();
const pendingBodyText = messageText || pendingPlaceholder || quoteText;
const historyKey = groupId ?? "unknown";
recordPendingHistoryEntryIfEnabled({
historyMap: deps.groupHistories,
historyKey,
limit: deps.historyLimit,
entry: {
sender: envelope.sourceName ?? senderDisplay,
body: pendingBodyText,
timestamp: envelope.timestamp ?? undefined,
messageId:
typeof envelope.timestamp === "number" ? String(envelope.timestamp) : undefined,
},
});
return;
}
let mediaPath: string | undefined;
let mediaType: string | undefined;
let placeholder = "";
const firstAttachment = dataMessage.attachments?.[0];
if (firstAttachment?.id && !deps.ignoreAttachments) {
try {
const fetched = await deps.fetchAttachment({
baseUrl: deps.baseUrl,
account: deps.account,
attachment: firstAttachment,
sender: senderRecipient,
groupId,
maxBytes: deps.mediaMaxBytes,
});
if (fetched) {
mediaPath = fetched.path;
mediaType = fetched.contentType ?? firstAttachment.contentType ?? undefined;
}
} catch (err) {
deps.runtime.error?.(danger(`attachment fetch failed: ${String(err)}`));
}
}
const kind = mediaKindFromMime(mediaType ?? undefined);
if (kind) {
placeholder = `<media:${kind}>`;
} else if (dataMessage.attachments?.length) {
placeholder = "<media:attachment>";
}
const bodyText = messageText || placeholder || dataMessage.quote?.text?.trim() || "";
if (!bodyText) {
return;
}
const receiptTimestamp =
typeof envelope.timestamp === "number"
? envelope.timestamp
: typeof dataMessage.timestamp === "number"
? dataMessage.timestamp
: undefined;
if (deps.sendReadReceipts && !deps.readReceiptsViaDaemon && !isGroup && receiptTimestamp) {
try {
await sendReadReceiptSignal(`signal:${senderRecipient}`, receiptTimestamp, {
baseUrl: deps.baseUrl,
account: deps.account,
accountId: deps.accountId,
});
} catch (err) {
logVerbose(`signal read receipt failed for ${senderDisplay}: ${String(err)}`);
}
} else if (
deps.sendReadReceipts &&
!deps.readReceiptsViaDaemon &&
!isGroup &&
!receiptTimestamp
) {
logVerbose(`signal read receipt skipped (missing timestamp) for ${senderDisplay}`);
}
const senderName = envelope.sourceName ?? senderDisplay;
const messageId =
typeof envelope.timestamp === "number" ? String(envelope.timestamp) : undefined;
await inboundDebouncer.enqueue({
senderName,
senderDisplay,
senderRecipient,
senderPeerId,
groupId,
groupName,
isGroup,
bodyText,
timestamp: envelope.timestamp ?? undefined,
messageId,
mediaPath,
mediaType,
commandAuthorized,
wasMentioned: effectiveWasMentioned,
});
};
}