// openclaw/src/discord/monitor/message-handler.process.ts (TypeScript, 416 lines, 14 KiB)

2026-01-23 23:04:09 +00:00
import { resolveAckReaction, resolveHumanDelayConfig } from "../../agents/identity.js";
import {
removeAckReactionAfterReply,
shouldAckReaction as shouldAckReactionGate,
} from "../../channels/ack-reactions.js";
2026-01-23 23:20:07 +00:00
import { logTypingFailure, logAckFailure } from "../../channels/logging.js";
2026-01-23 23:04:09 +00:00
import { createReplyPrefixContext } from "../../channels/reply-prefix.js";
2026-01-23 22:55:41 +00:00
import { createTypingCallbacks } from "../../channels/typing.js";
import {
formatInboundEnvelope,
formatThreadStarterEnvelope,
resolveEnvelopeFormatOptions,
} from "../../auto-reply/envelope.js";
2026-01-23 22:51:37 +00:00
import { dispatchInboundMessage } from "../../auto-reply/dispatch.js";
import {
buildPendingHistoryContextFromMap,
clearHistoryEntriesIfEnabled,
} from "../../auto-reply/reply/history.js";
2026-01-17 05:04:29 +00:00
import { finalizeInboundContext } from "../../auto-reply/reply/inbound-context.js";
2026-01-14 01:08:15 +00:00
import { createReplyDispatcherWithTyping } from "../../auto-reply/reply/reply-dispatcher.js";
import type { ReplyPayload } from "../../auto-reply/types.js";
import { recordInboundSession } from "../../channels/session.js";
import { readSessionUpdatedAt, resolveStorePath } from "../../config/sessions.js";
import { resolveMarkdownTableMode } from "../../config/markdown-tables.js";
2026-01-14 01:08:15 +00:00
import { danger, logVerbose, shouldLogVerbose } from "../../globals.js";
import { buildAgentSessionKey } from "../../routing/resolve-route.js";
import { resolveThreadSessionKeys } from "../../routing/session-key.js";
import { truncateUtf16Safe } from "../../utils.js";
import { reactMessageDiscord, removeReactionDiscord } from "../send.js";
import { normalizeDiscordSlug } from "./allow-list.js";
import { formatDiscordUserTag, resolveTimestampMs } from "./format.js";
import type { DiscordMessagePreflightContext } from "./message-handler.preflight.js";
import {
buildDiscordMediaPayload,
resolveDiscordMessageText,
resolveMediaList,
} from "./message-utils.js";
import { buildDirectLabel, buildGuildLabel, resolveReplyContext } from "./reply-context.js";
2026-01-14 01:08:15 +00:00
import { deliverDiscordReply } from "./reply-delivery.js";
2026-01-14 17:10:16 -08:00
import { resolveDiscordAutoThreadReplyPlan, resolveDiscordThreadStarter } from "./threading.js";
2026-01-14 01:08:15 +00:00
import { sendTyping } from "./typing.js";
/**
 * Processes one Discord message that has already passed preflight: builds the
 * inbound context, records the session, dispatches the message, and delivers
 * each reply payload through `deliverDiscordReply`.
 *
 * Flow, in order:
 * 1. Drop the message if its resolved text is empty.
 * 2. Resolve media and (optionally) start an ack reaction without awaiting it.
 * 3. Build the envelope body, prepending pending channel history and the
 *    replied-to message when present.
 * 4. Resolve thread starter / session keys and the reply plan.
 * 5. Finalize and record the inbound context, then dispatch it.
 * 6. After dispatch, optionally remove the ack reaction and clear the
 *    channel's pending history for guild messages.
 *
 * @param ctx Preflight result carrying config, the Discord message, routing
 *   information and mention/thread metadata.
 * @returns Resolves once dispatch and post-dispatch bookkeeping have run.
 *   Returns early (without dispatching) when the message has no text or no
 *   reply target can be determined.
 */
export async function processDiscordMessage(ctx: DiscordMessagePreflightContext): Promise<void> {
  const {
    cfg,
    discordConfig,
    accountId,
    token,
    runtime,
    guildHistories,
    historyLimit,
    mediaMaxBytes,
    textLimit,
    replyToMode,
    ackReactionScope,
    message,
    author,
    data,
    client,
    channelInfo,
    channelName,
    isGuildMessage,
    isDirectMessage,
    isGroupDm,
    baseText,
    messageText,
    shouldRequireMention,
    canDetectMention,
    effectiveWasMentioned,
    shouldBypassMention,
    threadChannel,
    threadParentId,
    threadParentName,
    threadParentType,
    threadName,
    displayChannelSlug,
    guildInfo,
    guildSlug,
    channelConfig,
    baseSessionKey,
    route,
    commandAuthorized,
  } = ctx;

  // Check for empty text before resolving media so a dropped message does not
  // pay for media resolution whose result would be thrown away.
  const text = messageText;
  if (!text) {
    logVerbose(`discord: drop message ${message.id} (empty content)`);
    return;
  }
  const mediaList = await resolveMediaList(message, mediaMaxBytes);

  const ackReaction = resolveAckReaction(cfg, route.agentId);
  const removeAckAfterReply = cfg.messages?.removeAckAfterReply ?? false;
  const shouldAckReaction = () =>
    Boolean(
      ackReaction &&
        shouldAckReactionGate({
          scope: ackReactionScope,
          isDirect: isDirectMessage,
          isGroup: isGuildMessage || isGroupDm,
          isMentionableGroup: isGuildMessage,
          requireMention: Boolean(shouldRequireMention),
          canDetectMention,
          effectiveWasMentioned,
          shouldBypassMention,
        }),
    );
  // Deliberately not awaited here: the promise settles to true/false (never
  // rejects) and is handed to removeAckReactionAfterReply after dispatch.
  const ackReactionPromise = shouldAckReaction()
    ? reactMessageDiscord(message.channelId, message.id, ackReaction, {
        rest: client.rest,
      }).then(
        () => true,
        (err) => {
          logVerbose(`discord react failed for channel ${message.channelId}: ${String(err)}`);
          return false;
        },
      )
    : null;

  const fromLabel = isDirectMessage
    ? buildDirectLabel(author)
    : buildGuildLabel({
        guild: data.guild ?? undefined,
        channelName: channelName ?? message.channelId,
        channelId: message.channelId,
      });
  const senderTag = formatDiscordUserTag(author);
  const senderDisplay = data.member?.nickname ?? author.globalName ?? author.username;
  // Show "display (tag)" only when both exist and differ; otherwise fall back
  // to whichever is available, and finally to the raw author id.
  const senderLabel =
    senderDisplay && senderTag && senderDisplay !== senderTag
      ? `${senderDisplay} (${senderTag})`
      : (senderDisplay ?? senderTag ?? author.id);
  const groupChannel = isGuildMessage && displayChannelSlug ? `#${displayChannelSlug}` : undefined;
  const groupSubject = isDirectMessage ? undefined : groupChannel;

  // Group system prompt = channel topic + per-channel configured prompt,
  // skipping whichever parts are empty.
  const channelDescription = channelInfo?.topic?.trim();
  const systemPromptParts = [
    channelDescription ? `Channel topic: ${channelDescription}` : null,
    channelConfig?.systemPrompt?.trim() || null,
  ].filter((entry): entry is string => Boolean(entry));
  const groupSystemPrompt =
    systemPromptParts.length > 0 ? systemPromptParts.join("\n\n") : undefined;

  const storePath = resolveStorePath(cfg.session?.store, {
    agentId: route.agentId,
  });
  const envelopeOptions = resolveEnvelopeFormatOptions(cfg);
  const previousTimestamp = readSessionUpdatedAt({
    storePath,
    sessionKey: route.sessionKey,
  });
  let combinedBody = formatInboundEnvelope({
    channel: "Discord",
    from: fromLabel,
    timestamp: resolveTimestampMs(message.timestamp),
    body: text,
    chatType: isDirectMessage ? "direct" : "channel",
    senderLabel,
    previousTimestamp,
    envelope: envelopeOptions,
  });

  // Pending channel history is skipped for direct messages, and for guild
  // messages in a channel with autoThread configured that are not already
  // inside a thread.
  const shouldIncludeChannelHistory =
    !isDirectMessage && !(isGuildMessage && channelConfig?.autoThread && !threadChannel);
  if (shouldIncludeChannelHistory) {
    combinedBody = buildPendingHistoryContextFromMap({
      historyMap: guildHistories,
      historyKey: message.channelId,
      limit: historyLimit,
      currentMessage: combinedBody,
      formatEntry: (entry) =>
        formatInboundEnvelope({
          channel: "Discord",
          from: fromLabel,
          timestamp: entry.timestamp,
          body: `${entry.body} [id:${entry.messageId ?? "unknown"} channel:${message.channelId}]`,
          chatType: "channel",
          senderLabel: entry.sender,
          envelope: envelopeOptions,
        }),
    });
  }

  const replyContext = resolveReplyContext(message, resolveDiscordMessageText, {
    envelope: envelopeOptions,
  });
  if (replyContext) {
    combinedBody = `[Replied message - for context]\n${replyContext}\n\n${combinedBody}`;
  }

  // Thread metadata is only populated when the message arrived in a thread.
  let threadStarterBody: string | undefined;
  let threadLabel: string | undefined;
  let parentSessionKey: string | undefined;
  if (threadChannel) {
    const starter = await resolveDiscordThreadStarter({
      channel: threadChannel,
      client,
      parentId: threadParentId,
      parentType: threadParentType,
      resolveTimestampMs,
    });
    if (starter?.text) {
      threadStarterBody = formatThreadStarterEnvelope({
        channel: "Discord",
        author: starter.author,
        timestamp: starter.timestamp,
        body: starter.text,
        envelope: envelopeOptions,
      });
    }
    const parentName = threadParentName ?? "parent";
    threadLabel = threadName
      ? `Discord thread #${normalizeDiscordSlug(parentName)} ${threadName}`
      : `Discord thread #${normalizeDiscordSlug(parentName)}`;
    if (threadParentId) {
      parentSessionKey = buildAgentSessionKey({
        agentId: route.agentId,
        channel: route.channel,
        peer: { kind: "channel", id: threadParentId },
      });
    }
  }

  const mediaPayload = buildDiscordMediaPayload(mediaList);
  const threadKeys = resolveThreadSessionKeys({
    baseSessionKey,
    threadId: threadChannel ? message.channelId : undefined,
    parentSessionKey,
    useSuffix: false,
  });
  const replyPlan = await resolveDiscordAutoThreadReplyPlan({
    client,
    message,
    isGuildMessage,
    channelConfig,
    threadChannel,
    baseText: baseText ?? "",
    combinedBody,
    replyToMode,
    agentId: route.agentId,
    channel: route.channel,
  });
  const deliverTarget = replyPlan.deliverTarget;
  const replyTarget = replyPlan.replyTarget;
  const replyReference = replyPlan.replyReference;
  const autoThreadContext = replyPlan.autoThreadContext;

  // Values from the auto-thread context, when present, take precedence over
  // the defaults derived from the original message.
  const effectiveFrom = isDirectMessage
    ? `discord:${author.id}`
    : (autoThreadContext?.From ?? `discord:channel:${message.channelId}`);
  const effectiveTo = autoThreadContext?.To ?? replyTarget;
  if (!effectiveTo) {
    runtime.error?.(danger("discord: missing reply target"));
    return;
  }

  const ctxPayload = finalizeInboundContext({
    Body: combinedBody,
    RawBody: baseText,
    CommandBody: baseText,
    From: effectiveFrom,
    To: effectiveTo,
    SessionKey: autoThreadContext?.SessionKey ?? threadKeys.sessionKey,
    AccountId: route.accountId,
    ChatType: isDirectMessage ? "direct" : "channel",
    ConversationLabel: fromLabel,
    SenderName: senderDisplay,
    SenderId: author.id,
    SenderUsername: author.username,
    SenderTag: senderTag,
    GroupSubject: groupSubject,
    GroupChannel: groupChannel,
    GroupSystemPrompt: isGuildMessage ? groupSystemPrompt : undefined,
    GroupSpace: isGuildMessage ? (guildInfo?.id ?? guildSlug) || undefined : undefined,
    Provider: "discord" as const,
    Surface: "discord" as const,
    WasMentioned: effectiveWasMentioned,
    MessageSid: message.id,
    ParentSessionKey: autoThreadContext?.ParentSessionKey ?? threadKeys.parentSessionKey,
    ThreadStarterBody: threadStarterBody,
    ThreadLabel: threadLabel,
    Timestamp: resolveTimestampMs(message.timestamp),
    ...mediaPayload,
    CommandAuthorized: commandAuthorized,
    CommandSource: "text" as const,
    // Originating channel for reply routing.
    OriginatingChannel: "discord" as const,
    OriginatingTo: autoThreadContext?.OriginatingTo ?? replyTarget,
  });

  await recordInboundSession({
    storePath,
    sessionKey: ctxPayload.SessionKey ?? route.sessionKey,
    ctx: ctxPayload,
    // The last-route update is only supplied for direct messages.
    updateLastRoute: isDirectMessage
      ? {
          sessionKey: route.mainSessionKey,
          channel: "discord",
          to: `user:${author.id}`,
          accountId: route.accountId,
        }
      : undefined,
    onRecordError: (err) => {
      logVerbose(`discord: failed updating session meta: ${String(err)}`);
    },
  });

  if (shouldLogVerbose()) {
    const preview = truncateUtf16Safe(combinedBody, 200).replace(/\n/g, "\\n");
    logVerbose(
      `discord inbound: channel=${message.channelId} deliver=${deliverTarget} from=${ctxPayload.From} preview="${preview}"`,
    );
  }

  // Send typing to the delivery channel when the target is "channel:<id>";
  // otherwise fall back to the channel the message came from.
  const typingChannelId = deliverTarget.startsWith("channel:")
    ? deliverTarget.slice("channel:".length)
    : message.channelId;
  const prefixContext = createReplyPrefixContext({ cfg, agentId: route.agentId });
  const tableMode = resolveMarkdownTableMode({
    cfg,
    channel: "discord",
    accountId,
  });
  const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping({
    responsePrefix: prefixContext.responsePrefix,
    responsePrefixContextProvider: prefixContext.responsePrefixContextProvider,
    humanDelay: resolveHumanDelayConfig(cfg, route.agentId),
    deliver: async (payload: ReplyPayload) => {
      const replyToId = replyReference.use();
      await deliverDiscordReply({
        replies: [payload],
        target: deliverTarget,
        token,
        accountId,
        rest: client.rest,
        runtime,
        replyToId,
        textLimit,
        maxLinesPerMessage: discordConfig?.maxLinesPerMessage,
        tableMode,
      });
      // Only marked as sent once delivery has completed without throwing.
      replyReference.markSent();
    },
    onError: (err, info) => {
      runtime.error?.(danger(`discord ${info.kind} reply failed: ${String(err)}`));
    },
    onReplyStart: createTypingCallbacks({
      start: () => sendTyping({ client, channelId: typingChannelId }),
      onStartError: (err) => {
        logTypingFailure({
          log: logVerbose,
          channel: "discord",
          target: typingChannelId,
          error: err,
        });
      },
    }).onReplyStart,
  });

  let dispatchResult: Awaited<ReturnType<typeof dispatchInboundMessage>>;
  try {
    dispatchResult = await dispatchInboundMessage({
      ctx: ctxPayload,
      cfg,
      dispatcher,
      replyOptions: {
        ...replyOptions,
        skillFilter: channelConfig?.skills,
        // blockStreaming is an opt-in flag, so the "disable" option is its
        // negation; leave it undefined when the flag is not a boolean.
        disableBlockStreaming:
          typeof discordConfig?.blockStreaming === "boolean"
            ? !discordConfig.blockStreaming
            : undefined,
        onModelSelected: (modelCtx) => {
          prefixContext.onModelSelected(modelCtx);
        },
      },
    });
  } finally {
    // Signal idle even when dispatch rejects so the dispatcher is not left
    // believing a dispatch is still in flight.
    // NOTE(review): assumes markDispatchIdle is safe on the failure path — confirm.
    markDispatchIdle();
  }
  const { queuedFinal, counts } = dispatchResult;

  if (queuedFinal) {
    if (shouldLogVerbose()) {
      const finalCount = counts.final;
      logVerbose(
        `discord: delivered ${finalCount} repl${finalCount === 1 ? "y" : "ies"} to ${replyTarget}`,
      );
    }
    // When no final reply was queued this step is skipped, so the ack
    // reaction stays on the message.
    removeAckReactionAfterReply({
      removeAfterReply: removeAckAfterReply,
      ackReactionPromise,
      ackReactionValue: ackReaction,
      remove: async () => {
        await removeReactionDiscord(message.channelId, message.id, ackReaction, {
          rest: client.rest,
        });
      },
      onError: (err) => {
        logAckFailure({
          log: logVerbose,
          channel: "discord",
          target: `${message.channelId}/${message.id}`,
          error: err,
        });
      },
    });
  }

  // Pending history for this channel is cleared for guild messages whether or
  // not a final reply was queued.
  if (isGuildMessage) {
    clearHistoryEntriesIfEnabled({
      historyMap: guildHistories,
      historyKey: message.channelId,
      limit: historyLimit,
    });
  }
}