Files
openclaw/src/signal/monitor.ts
Shawn 654f63e8f8 fix(signal): prevent sentTranscript sync messages from bypassing loop protection (#31093)
* fix(signal): prevent sentTranscript sync messages from bypassing loop protection

Issue: #31084

On daemon restart, sentTranscript sync messages could bypass loop protection
because the syncMessage check happened before the sender validation. This
reorganizes the checks to:

1. First resolve the sender (phone or UUID)
2. Check if the message is from our own account (both phone and UUID)
3. Only skip sync messages from other sources after confirming not own account

This ensures that sync messages from the own account are properly filtered
to prevent self-reply loops, while still allowing messages synced from other
devices to be processed.

Added optional accountUuid config field for UUID-based account identification.

* fix(signal): cover UUID-only own-message loop protection

* build: regenerate host env security policy swift

---------

Co-authored-by: Kevin Wang <kevin@example.com>
Co-authored-by: Peter Steinberger <steipete@gmail.com>
2026-03-02 01:11:22 +00:00

478 lines
15 KiB
TypeScript

import { chunkTextWithMode, resolveChunkMode, resolveTextChunkLimit } from "../auto-reply/chunk.js";
import { DEFAULT_GROUP_HISTORY_LIMIT, type HistoryEntry } from "../auto-reply/reply/history.js";
import type { ReplyPayload } from "../auto-reply/types.js";
import type { OpenClawConfig } from "../config/config.js";
import { loadConfig } from "../config/config.js";
import {
resolveAllowlistProviderRuntimeGroupPolicy,
resolveDefaultGroupPolicy,
warnMissingProviderGroupPolicyFallbackOnce,
} from "../config/runtime-group-policy.js";
import type { SignalReactionNotificationMode } from "../config/types.js";
import type { BackoffPolicy } from "../infra/backoff.js";
import { waitForTransportReady } from "../infra/transport-ready.js";
import { saveMediaBuffer } from "../media/store.js";
import { createNonExitingRuntime, type RuntimeEnv } from "../runtime.js";
import { normalizeStringEntries } from "../shared/string-normalization.js";
import { normalizeE164 } from "../utils.js";
import { resolveSignalAccount } from "./accounts.js";
import { signalCheck, signalRpcRequest } from "./client.js";
import { formatSignalDaemonExit, spawnSignalDaemon, type SignalDaemonHandle } from "./daemon.js";
import { isSignalSenderAllowed, type resolveSignalSender } from "./identity.js";
import { createSignalEventHandler } from "./monitor/event-handler.js";
import type {
SignalAttachment,
SignalReactionMessage,
SignalReactionTarget,
} from "./monitor/event-handler.types.js";
import { sendMessageSignal } from "./send.js";
import { runSignalSseLoop } from "./sse-reconnect.js";
export type MonitorSignalOpts = {
runtime?: RuntimeEnv;
abortSignal?: AbortSignal;
account?: string;
accountId?: string;
config?: OpenClawConfig;
baseUrl?: string;
autoStart?: boolean;
startupTimeoutMs?: number;
cliPath?: string;
httpHost?: string;
httpPort?: number;
receiveMode?: "on-start" | "manual";
ignoreAttachments?: boolean;
ignoreStories?: boolean;
sendReadReceipts?: boolean;
allowFrom?: Array<string | number>;
groupAllowFrom?: Array<string | number>;
mediaMaxMb?: number;
reconnectPolicy?: Partial<BackoffPolicy>;
};
function resolveRuntime(opts: MonitorSignalOpts): RuntimeEnv {
return opts.runtime ?? createNonExitingRuntime();
}
function mergeAbortSignals(
a?: AbortSignal,
b?: AbortSignal,
): { signal?: AbortSignal; dispose: () => void } {
if (!a && !b) {
return { signal: undefined, dispose: () => {} };
}
if (!a) {
return { signal: b, dispose: () => {} };
}
if (!b) {
return { signal: a, dispose: () => {} };
}
const controller = new AbortController();
const abortFrom = (source: AbortSignal) => {
if (!controller.signal.aborted) {
controller.abort(source.reason);
}
};
if (a.aborted) {
abortFrom(a);
return { signal: controller.signal, dispose: () => {} };
}
if (b.aborted) {
abortFrom(b);
return { signal: controller.signal, dispose: () => {} };
}
const onAbortA = () => abortFrom(a);
const onAbortB = () => abortFrom(b);
a.addEventListener("abort", onAbortA, { once: true });
b.addEventListener("abort", onAbortB, { once: true });
return {
signal: controller.signal,
dispose: () => {
a.removeEventListener("abort", onAbortA);
b.removeEventListener("abort", onAbortB);
},
};
}
function createSignalDaemonLifecycle(params: { abortSignal?: AbortSignal }) {
let daemonHandle: SignalDaemonHandle | null = null;
let daemonStopRequested = false;
let daemonExitError: Error | undefined;
const daemonAbortController = new AbortController();
const mergedAbort = mergeAbortSignals(params.abortSignal, daemonAbortController.signal);
const stop = () => {
daemonStopRequested = true;
daemonHandle?.stop();
};
const attach = (handle: SignalDaemonHandle) => {
daemonHandle = handle;
void handle.exited.then((exit) => {
if (daemonStopRequested || params.abortSignal?.aborted) {
return;
}
daemonExitError = new Error(formatSignalDaemonExit(exit));
if (!daemonAbortController.signal.aborted) {
daemonAbortController.abort(daemonExitError);
}
});
};
const getExitError = () => daemonExitError;
return {
attach,
stop,
getExitError,
abortSignal: mergedAbort.signal,
dispose: mergedAbort.dispose,
};
}
function normalizeAllowList(raw?: Array<string | number>): string[] {
return normalizeStringEntries(raw);
}
function resolveSignalReactionTargets(reaction: SignalReactionMessage): SignalReactionTarget[] {
const targets: SignalReactionTarget[] = [];
const uuid = reaction.targetAuthorUuid?.trim();
if (uuid) {
targets.push({ kind: "uuid", id: uuid, display: `uuid:${uuid}` });
}
const author = reaction.targetAuthor?.trim();
if (author) {
const normalized = normalizeE164(author);
targets.push({ kind: "phone", id: normalized, display: normalized });
}
return targets;
}
function isSignalReactionMessage(
reaction: SignalReactionMessage | null | undefined,
): reaction is SignalReactionMessage {
if (!reaction) {
return false;
}
const emoji = reaction.emoji?.trim();
const timestamp = reaction.targetSentTimestamp;
const hasTarget = Boolean(reaction.targetAuthor?.trim() || reaction.targetAuthorUuid?.trim());
return Boolean(emoji && typeof timestamp === "number" && timestamp > 0 && hasTarget);
}
function shouldEmitSignalReactionNotification(params: {
mode?: SignalReactionNotificationMode;
account?: string | null;
targets?: SignalReactionTarget[];
sender?: ReturnType<typeof resolveSignalSender> | null;
allowlist?: string[];
}) {
const { mode, account, targets, sender, allowlist } = params;
const effectiveMode = mode ?? "own";
if (effectiveMode === "off") {
return false;
}
if (effectiveMode === "own") {
const accountId = account?.trim();
if (!accountId || !targets || targets.length === 0) {
return false;
}
const normalizedAccount = normalizeE164(accountId);
return targets.some((target) => {
if (target.kind === "uuid") {
return accountId === target.id || accountId === `uuid:${target.id}`;
}
return normalizedAccount === target.id;
});
}
if (effectiveMode === "allowlist") {
if (!sender || !allowlist || allowlist.length === 0) {
return false;
}
return isSignalSenderAllowed(sender, allowlist);
}
return true;
}
function buildSignalReactionSystemEventText(params: {
emojiLabel: string;
actorLabel: string;
messageId: string;
targetLabel?: string;
groupLabel?: string;
}) {
const base = `Signal reaction added: ${params.emojiLabel} by ${params.actorLabel} msg ${params.messageId}`;
const withTarget = params.targetLabel ? `${base} from ${params.targetLabel}` : base;
return params.groupLabel ? `${withTarget} in ${params.groupLabel}` : withTarget;
}
async function waitForSignalDaemonReady(params: {
baseUrl: string;
abortSignal?: AbortSignal;
timeoutMs: number;
logAfterMs: number;
logIntervalMs?: number;
runtime: RuntimeEnv;
}): Promise<void> {
await waitForTransportReady({
label: "signal daemon",
timeoutMs: params.timeoutMs,
logAfterMs: params.logAfterMs,
logIntervalMs: params.logIntervalMs,
pollIntervalMs: 150,
abortSignal: params.abortSignal,
runtime: params.runtime,
check: async () => {
const res = await signalCheck(params.baseUrl, 1000);
if (res.ok) {
return { ok: true };
}
return {
ok: false,
error: res.error ?? (res.status ? `HTTP ${res.status}` : "unreachable"),
};
},
});
}
async function fetchAttachment(params: {
baseUrl: string;
account?: string;
attachment: SignalAttachment;
sender?: string;
groupId?: string;
maxBytes: number;
}): Promise<{ path: string; contentType?: string } | null> {
const { attachment } = params;
if (!attachment?.id) {
return null;
}
if (attachment.size && attachment.size > params.maxBytes) {
throw new Error(
`Signal attachment ${attachment.id} exceeds ${(params.maxBytes / (1024 * 1024)).toFixed(0)}MB limit`,
);
}
const rpcParams: Record<string, unknown> = {
id: attachment.id,
};
if (params.account) {
rpcParams.account = params.account;
}
if (params.groupId) {
rpcParams.groupId = params.groupId;
} else if (params.sender) {
rpcParams.recipient = params.sender;
} else {
return null;
}
const result = await signalRpcRequest<{ data?: string }>("getAttachment", rpcParams, {
baseUrl: params.baseUrl,
});
if (!result?.data) {
return null;
}
const buffer = Buffer.from(result.data, "base64");
const saved = await saveMediaBuffer(
buffer,
attachment.contentType ?? undefined,
"inbound",
params.maxBytes,
);
return { path: saved.path, contentType: saved.contentType };
}
async function deliverReplies(params: {
replies: ReplyPayload[];
target: string;
baseUrl: string;
account?: string;
accountId?: string;
runtime: RuntimeEnv;
maxBytes: number;
textLimit: number;
chunkMode: "length" | "newline";
}) {
const { replies, target, baseUrl, account, accountId, runtime, maxBytes, textLimit, chunkMode } =
params;
for (const payload of replies) {
const mediaList = payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []);
const text = payload.text ?? "";
if (!text && mediaList.length === 0) {
continue;
}
if (mediaList.length === 0) {
for (const chunk of chunkTextWithMode(text, textLimit, chunkMode)) {
await sendMessageSignal(target, chunk, {
baseUrl,
account,
maxBytes,
accountId,
});
}
} else {
let first = true;
for (const url of mediaList) {
const caption = first ? text : "";
first = false;
await sendMessageSignal(target, caption, {
baseUrl,
account,
mediaUrl: url,
maxBytes,
accountId,
});
}
}
runtime.log?.(`delivered reply to ${target}`);
}
}
export async function monitorSignalProvider(opts: MonitorSignalOpts = {}): Promise<void> {
const runtime = resolveRuntime(opts);
const cfg = opts.config ?? loadConfig();
const accountInfo = resolveSignalAccount({
cfg,
accountId: opts.accountId,
});
const historyLimit = Math.max(
0,
accountInfo.config.historyLimit ??
cfg.messages?.groupChat?.historyLimit ??
DEFAULT_GROUP_HISTORY_LIMIT,
);
const groupHistories = new Map<string, HistoryEntry[]>();
const textLimit = resolveTextChunkLimit(cfg, "signal", accountInfo.accountId);
const chunkMode = resolveChunkMode(cfg, "signal", accountInfo.accountId);
const baseUrl = opts.baseUrl?.trim() || accountInfo.baseUrl;
const account = opts.account?.trim() || accountInfo.config.account?.trim();
const dmPolicy = accountInfo.config.dmPolicy ?? "pairing";
const allowFrom = normalizeAllowList(opts.allowFrom ?? accountInfo.config.allowFrom);
const groupAllowFrom = normalizeAllowList(
opts.groupAllowFrom ??
accountInfo.config.groupAllowFrom ??
(accountInfo.config.allowFrom && accountInfo.config.allowFrom.length > 0
? accountInfo.config.allowFrom
: []),
);
const defaultGroupPolicy = resolveDefaultGroupPolicy(cfg);
const { groupPolicy, providerMissingFallbackApplied } =
resolveAllowlistProviderRuntimeGroupPolicy({
providerConfigPresent: cfg.channels?.signal !== undefined,
groupPolicy: accountInfo.config.groupPolicy,
defaultGroupPolicy,
});
warnMissingProviderGroupPolicyFallbackOnce({
providerMissingFallbackApplied,
providerKey: "signal",
accountId: accountInfo.accountId,
log: (message) => runtime.log?.(message),
});
const reactionMode = accountInfo.config.reactionNotifications ?? "own";
const reactionAllowlist = normalizeAllowList(accountInfo.config.reactionAllowlist);
const mediaMaxBytes = (opts.mediaMaxMb ?? accountInfo.config.mediaMaxMb ?? 8) * 1024 * 1024;
const ignoreAttachments = opts.ignoreAttachments ?? accountInfo.config.ignoreAttachments ?? false;
const sendReadReceipts = Boolean(opts.sendReadReceipts ?? accountInfo.config.sendReadReceipts);
const autoStart = opts.autoStart ?? accountInfo.config.autoStart ?? !accountInfo.config.httpUrl;
const startupTimeoutMs = Math.min(
120_000,
Math.max(1_000, opts.startupTimeoutMs ?? accountInfo.config.startupTimeoutMs ?? 30_000),
);
const readReceiptsViaDaemon = Boolean(autoStart && sendReadReceipts);
const daemonLifecycle = createSignalDaemonLifecycle({ abortSignal: opts.abortSignal });
let daemonHandle: SignalDaemonHandle | null = null;
if (autoStart) {
const cliPath = opts.cliPath ?? accountInfo.config.cliPath ?? "signal-cli";
const httpHost = opts.httpHost ?? accountInfo.config.httpHost ?? "127.0.0.1";
const httpPort = opts.httpPort ?? accountInfo.config.httpPort ?? 8080;
daemonHandle = spawnSignalDaemon({
cliPath,
account,
httpHost,
httpPort,
receiveMode: opts.receiveMode ?? accountInfo.config.receiveMode,
ignoreAttachments: opts.ignoreAttachments ?? accountInfo.config.ignoreAttachments,
ignoreStories: opts.ignoreStories ?? accountInfo.config.ignoreStories,
sendReadReceipts,
runtime,
});
daemonLifecycle.attach(daemonHandle);
}
const onAbort = () => {
daemonLifecycle.stop();
};
opts.abortSignal?.addEventListener("abort", onAbort, { once: true });
try {
if (daemonHandle) {
await waitForSignalDaemonReady({
baseUrl,
abortSignal: daemonLifecycle.abortSignal,
timeoutMs: startupTimeoutMs,
logAfterMs: 10_000,
logIntervalMs: 10_000,
runtime,
});
const daemonExitError = daemonLifecycle.getExitError();
if (daemonExitError) {
throw daemonExitError;
}
}
const handleEvent = createSignalEventHandler({
runtime,
cfg,
baseUrl,
account,
accountUuid: accountInfo.config.accountUuid,
accountId: accountInfo.accountId,
blockStreaming: accountInfo.config.blockStreaming,
historyLimit,
groupHistories,
textLimit,
dmPolicy,
allowFrom,
groupAllowFrom,
groupPolicy,
reactionMode,
reactionAllowlist,
mediaMaxBytes,
ignoreAttachments,
sendReadReceipts,
readReceiptsViaDaemon,
fetchAttachment,
deliverReplies: (params) => deliverReplies({ ...params, chunkMode }),
resolveSignalReactionTargets,
isSignalReactionMessage,
shouldEmitSignalReactionNotification,
buildSignalReactionSystemEventText,
});
await runSignalSseLoop({
baseUrl,
account,
abortSignal: daemonLifecycle.abortSignal,
runtime,
policy: opts.reconnectPolicy,
onEvent: (event) => {
void handleEvent(event).catch((err) => {
runtime.error?.(`event handler failed: ${String(err)}`);
});
},
});
const daemonExitError = daemonLifecycle.getExitError();
if (daemonExitError) {
throw daemonExitError;
}
} catch (err) {
const daemonExitError = daemonLifecycle.getExitError();
if (opts.abortSignal?.aborted && !daemonExitError) {
return;
}
throw err;
} finally {
daemonLifecycle.dispose();
opts.abortSignal?.removeEventListener("abort", onAbort);
daemonLifecycle.stop();
}
}