Files
openclaw/src/discord/monitor/message-handler.process.ts

380 lines
13 KiB
TypeScript
Raw Normal View History

2026-01-14 01:08:15 +00:00
import {
resolveAckReaction,
resolveEffectiveMessagesConfig,
resolveHumanDelayConfig,
resolveIdentityName,
2026-01-14 01:08:15 +00:00
} from "../../agents/identity.js";
import {
extractShortModelName,
type ResponsePrefixContext,
} from "../../auto-reply/reply/response-prefix-template.js";
2026-01-17 05:48:34 +00:00
import { formatInboundEnvelope, formatThreadStarterEnvelope } from "../../auto-reply/envelope.js";
2026-01-14 01:08:15 +00:00
import { dispatchReplyFromConfig } from "../../auto-reply/reply/dispatch-from-config.js";
import {
buildPendingHistoryContextFromMap,
clearHistoryEntries,
} from "../../auto-reply/reply/history.js";
2026-01-17 05:04:29 +00:00
import { finalizeInboundContext } from "../../auto-reply/reply/inbound-context.js";
2026-01-14 01:08:15 +00:00
import { createReplyDispatcherWithTyping } from "../../auto-reply/reply/reply-dispatcher.js";
import type { ReplyPayload } from "../../auto-reply/types.js";
import { resolveStorePath, updateLastRoute } from "../../config/sessions.js";
import { danger, logVerbose, shouldLogVerbose } from "../../globals.js";
import { buildAgentSessionKey } from "../../routing/resolve-route.js";
import { resolveThreadSessionKeys } from "../../routing/session-key.js";
import { truncateUtf16Safe } from "../../utils.js";
import { reactMessageDiscord, removeReactionDiscord } from "../send.js";
import { normalizeDiscordSlug } from "./allow-list.js";
import { formatDiscordUserTag, resolveTimestampMs } from "./format.js";
import type { DiscordMessagePreflightContext } from "./message-handler.preflight.js";
import {
buildDiscordMediaPayload,
resolveDiscordMessageText,
resolveMediaList,
} from "./message-utils.js";
import { buildDirectLabel, buildGuildLabel, resolveReplyContext } from "./reply-context.js";
2026-01-14 01:08:15 +00:00
import { deliverDiscordReply } from "./reply-delivery.js";
2026-01-14 17:10:16 -08:00
import { resolveDiscordAutoThreadReplyPlan, resolveDiscordThreadStarter } from "./threading.js";
2026-01-14 01:08:15 +00:00
import { sendTyping } from "./typing.js";
/**
 * Turns a preflighted Discord message into an inbound agent context and
 * dispatches the configured reply pipeline for it.
 *
 * Steps, in order:
 *  1. Drop the message when it has no text.
 *  2. Optionally add an acknowledgement reaction (governed by `ackReactionScope`).
 *  3. Build the envelope body, optionally prefixed with pending channel history
 *     and the replied-to message.
 *  4. Resolve thread starter / thread label / parent session key when the
 *     message is inside a thread.
 *  5. Resolve the reply plan (deliver target, reply reference, auto-thread context).
 *  6. Finalize the inbound context, record the last route for direct messages,
 *     and dispatch replies through a typing-aware dispatcher.
 *  7. After dispatch: optionally remove the ack reaction and clear the pending
 *     guild history for the channel.
 *
 * @param ctx - Preflight result carrying config, the Discord message, routing
 *   information and the mention/thread flags computed earlier.
 * @returns Resolves once dispatch has finished (or the message was dropped).
 *   Errors thrown by the awaited helpers propagate to the caller.
 */
export async function processDiscordMessage(ctx: DiscordMessagePreflightContext): Promise<void> {
  const {
    cfg,
    discordConfig,
    accountId,
    token,
    runtime,
    guildHistories,
    historyLimit,
    mediaMaxBytes,
    textLimit,
    replyToMode,
    ackReactionScope,
    message,
    author,
    data,
    client,
    channelInfo,
    channelName,
    isGuildMessage,
    isDirectMessage,
    isGroupDm,
    baseText,
    messageText,
    shouldRequireMention,
    canDetectMention,
    effectiveWasMentioned,
    threadChannel,
    threadParentId,
    threadParentName,
    threadParentType,
    threadName,
    displayChannelSlug,
    guildInfo,
    guildSlug,
    channelConfig,
    baseSessionKey,
    route,
    commandAuthorized,
  } = ctx;

  // Checked before media resolution so a dropped message never pays for that await.
  const text = messageText;
  if (!text) {
    logVerbose(`discord: drop message ${message.id} (empty content)`);
    return;
  }
  const mediaList = await resolveMediaList(message, mediaMaxBytes);

  // --- Acknowledgement reaction -------------------------------------------
  const ackReaction = resolveAckReaction(cfg, route.agentId);
  const removeAckAfterReply = cfg.messages?.removeAckAfterReply ?? false;
  const shouldAckReaction = () => {
    if (!ackReaction) return false;
    if (ackReactionScope === "all") return true;
    if (ackReactionScope === "direct") return isDirectMessage;
    const isGroupChat = isGuildMessage || isGroupDm;
    if (ackReactionScope === "group-all") return isGroupChat;
    if (ackReactionScope === "group-mentions") {
      // Only guild messages where a mention is required, detectable, and present.
      if (!isGuildMessage) return false;
      if (!shouldRequireMention) return false;
      if (!canDetectMention) return false;
      return effectiveWasMentioned;
    }
    return false;
  };
  // Resolves to true when the reaction was added, false when adding it failed;
  // null when no reaction was attempted. Never rejects.
  const ackReactionPromise = shouldAckReaction()
    ? reactMessageDiscord(message.channelId, message.id, ackReaction, {
        rest: client.rest,
      }).then(
        () => true,
        (err) => {
          logVerbose(`discord react failed for channel ${message.channelId}: ${String(err)}`);
          return false;
        },
      )
    : null;

  // --- Labels and group metadata ------------------------------------------
  const fromLabel = isDirectMessage
    ? buildDirectLabel(author)
    : buildGuildLabel({
        guild: data.guild ?? undefined,
        channelName: channelName ?? message.channelId,
        channelId: message.channelId,
      });
  const senderTag = formatDiscordUserTag(author);
  const senderDisplay = data.member?.nickname ?? author.globalName ?? author.username;
  const senderLabel =
    senderDisplay && senderTag && senderDisplay !== senderTag
      ? `${senderDisplay} (${senderTag})`
      : (senderDisplay ?? senderTag ?? author.id);
  const groupChannel = isGuildMessage && displayChannelSlug ? `#${displayChannelSlug}` : undefined;
  const groupSubject = isDirectMessage ? undefined : groupChannel;
  const channelDescription = channelInfo?.topic?.trim();
  const systemPromptParts = [
    channelDescription ? `Channel topic: ${channelDescription}` : null,
    channelConfig?.systemPrompt?.trim() || null,
  ].filter((entry): entry is string => Boolean(entry));
  const groupSystemPrompt =
    systemPromptParts.length > 0 ? systemPromptParts.join("\n\n") : undefined;

  // --- Message body --------------------------------------------------------
  let combinedBody = formatInboundEnvelope({
    channel: "Discord",
    from: fromLabel,
    timestamp: resolveTimestampMs(message.timestamp),
    body: text,
    chatType: isDirectMessage ? "direct" : "channel",
    senderLabel,
  });
  // History is skipped for DMs and for guild messages that will be auto-threaded
  // (autoThread enabled and the message is not already inside a thread).
  const shouldIncludeChannelHistory =
    !isDirectMessage && !(isGuildMessage && channelConfig?.autoThread && !threadChannel);
  if (shouldIncludeChannelHistory) {
    combinedBody = buildPendingHistoryContextFromMap({
      historyMap: guildHistories,
      historyKey: message.channelId,
      limit: historyLimit,
      currentMessage: combinedBody,
      formatEntry: (entry) =>
        formatInboundEnvelope({
          channel: "Discord",
          from: fromLabel,
          timestamp: entry.timestamp,
          body: `${entry.body} [id:${entry.messageId ?? "unknown"} channel:${message.channelId}]`,
          chatType: "channel",
          senderLabel: entry.sender,
        }),
    });
  }
  const replyContext = resolveReplyContext(message, resolveDiscordMessageText);
  if (replyContext) {
    combinedBody = `[Replied message - for context]\n${replyContext}\n\n${combinedBody}`;
  }

  // --- Thread context ------------------------------------------------------
  let threadStarterBody: string | undefined;
  let threadLabel: string | undefined;
  let parentSessionKey: string | undefined;
  if (threadChannel) {
    const starter = await resolveDiscordThreadStarter({
      channel: threadChannel,
      client,
      parentId: threadParentId,
      parentType: threadParentType,
      resolveTimestampMs,
    });
    if (starter?.text) {
      threadStarterBody = formatThreadStarterEnvelope({
        channel: "Discord",
        author: starter.author,
        timestamp: starter.timestamp,
        body: starter.text,
      });
    }
    const parentName = threadParentName ?? "parent";
    threadLabel = threadName
      ? `Discord thread #${normalizeDiscordSlug(parentName)} ${threadName}`
      : `Discord thread #${normalizeDiscordSlug(parentName)}`;
    if (threadParentId) {
      parentSessionKey = buildAgentSessionKey({
        agentId: route.agentId,
        channel: route.channel,
        peer: { kind: "channel", id: threadParentId },
      });
    }
  }

  const mediaPayload = buildDiscordMediaPayload(mediaList);
  const threadKeys = resolveThreadSessionKeys({
    baseSessionKey,
    threadId: threadChannel ? message.channelId : undefined,
    parentSessionKey,
    useSuffix: false,
  });

  // --- Reply plan ----------------------------------------------------------
  const replyPlan = await resolveDiscordAutoThreadReplyPlan({
    client,
    message,
    isGuildMessage,
    channelConfig,
    threadChannel,
    baseText: baseText ?? "",
    combinedBody,
    replyToMode,
    agentId: route.agentId,
    channel: route.channel,
  });
  const deliverTarget = replyPlan.deliverTarget;
  const replyTarget = replyPlan.replyTarget;
  const replyReference = replyPlan.replyReference;
  const autoThreadContext = replyPlan.autoThreadContext;
  const effectiveFrom = isDirectMessage
    ? `discord:${author.id}`
    : (autoThreadContext?.From ?? `discord:channel:${message.channelId}`);
  const effectiveTo = autoThreadContext?.To ?? replyTarget;
  if (!effectiveTo) {
    runtime.error?.(danger("discord: missing reply target"));
    return;
  }

  // --- Inbound context -----------------------------------------------------
  const ctxPayload = finalizeInboundContext({
    Body: combinedBody,
    RawBody: baseText,
    CommandBody: baseText,
    From: effectiveFrom,
    To: effectiveTo,
    SessionKey: autoThreadContext?.SessionKey ?? threadKeys.sessionKey,
    AccountId: route.accountId,
    ChatType: isDirectMessage ? "direct" : "channel",
    ConversationLabel: fromLabel,
    SenderName: senderDisplay,
    SenderId: author.id,
    SenderUsername: author.username,
    SenderTag: senderTag,
    GroupSubject: groupSubject,
    GroupChannel: groupChannel,
    GroupSystemPrompt: isGuildMessage ? groupSystemPrompt : undefined,
    GroupSpace: isGuildMessage ? (guildInfo?.id ?? guildSlug) || undefined : undefined,
    Provider: "discord" as const,
    Surface: "discord" as const,
    WasMentioned: effectiveWasMentioned,
    MessageSid: message.id,
    ParentSessionKey: autoThreadContext?.ParentSessionKey ?? threadKeys.parentSessionKey,
    ThreadStarterBody: threadStarterBody,
    ThreadLabel: threadLabel,
    Timestamp: resolveTimestampMs(message.timestamp),
    ...mediaPayload,
    CommandAuthorized: commandAuthorized,
    CommandSource: "text" as const,
    // Originating channel for reply routing.
    OriginatingChannel: "discord" as const,
    OriginatingTo: autoThreadContext?.OriginatingTo ?? replyTarget,
  });

  if (isDirectMessage) {
    const sessionCfg = cfg.session;
    const storePath = resolveStorePath(sessionCfg?.store, {
      agentId: route.agentId,
    });
    await updateLastRoute({
      storePath,
      sessionKey: route.mainSessionKey,
      deliveryContext: {
        channel: "discord",
        to: `user:${author.id}`,
        accountId: route.accountId,
      },
    });
  }

  if (shouldLogVerbose()) {
    const preview = truncateUtf16Safe(combinedBody, 200).replace(/\n/g, "\\n");
    logVerbose(
      `discord inbound: channel=${message.channelId} deliver=${deliverTarget} from=${ctxPayload.From} preview="${preview}"`,
    );
  }

  // --- Dispatch ------------------------------------------------------------
  // Typing indicator goes to the delivery channel when the target is a
  // "channel:<id>" string, otherwise to the channel the message arrived in.
  const typingChannelId = deliverTarget.startsWith("channel:")
    ? deliverTarget.slice("channel:".length)
    : message.channelId;
  // Context for response prefix template interpolation. The object is mutated
  // in place (never reassigned) so the provider closure below sees updates.
  const prefixContext: ResponsePrefixContext = {
    identityName: resolveIdentityName(cfg, route.agentId),
  };
  const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping({
    responsePrefix: resolveEffectiveMessagesConfig(cfg, route.agentId).responsePrefix,
    responsePrefixContextProvider: () => prefixContext,
    humanDelay: resolveHumanDelayConfig(cfg, route.agentId),
    deliver: async (payload: ReplyPayload) => {
      const replyToId = replyReference.use();
      await deliverDiscordReply({
        replies: [payload],
        target: deliverTarget,
        token,
        accountId,
        rest: client.rest,
        runtime,
        replyToId,
        textLimit,
        maxLinesPerMessage: discordConfig?.maxLinesPerMessage,
      });
      replyReference.markSent();
    },
    onError: (err, info) => {
      runtime.error?.(danger(`discord ${info.kind} reply failed: ${String(err)}`));
    },
    onReplyStart: () => sendTyping({ client, channelId: typingChannelId }),
  });

  let dispatchResult: Awaited<ReturnType<typeof dispatchReplyFromConfig>>;
  try {
    dispatchResult = await dispatchReplyFromConfig({
      ctx: ctxPayload,
      cfg,
      dispatcher,
      replyOptions: {
        ...replyOptions,
        skillFilter: channelConfig?.skills,
        // Only override when blockStreaming is explicitly configured as a boolean.
        disableBlockStreaming:
          typeof discordConfig?.blockStreaming === "boolean"
            ? !discordConfig.blockStreaming
            : undefined,
        onModelSelected: (selection) => {
          prefixContext.provider = selection.provider;
          prefixContext.model = extractShortModelName(selection.model);
          prefixContext.modelFull = `${selection.provider}/${selection.model}`;
          prefixContext.thinkingLevel = selection.thinkLevel ?? "off";
        },
      },
    });
  } finally {
    // Mark the dispatcher idle on failure as well as success; the error, if
    // any, still propagates to the caller.
    markDispatchIdle();
  }
  const { queuedFinal, counts } = dispatchResult;

  // Pending guild history for this channel is cleared once dispatch has run,
  // whether or not a final reply was queued.
  const clearChannelHistory = () => {
    if (isGuildMessage && historyLimit > 0) {
      clearHistoryEntries({
        historyMap: guildHistories,
        historyKey: message.channelId,
      });
    }
  };

  if (!queuedFinal) {
    clearChannelHistory();
    return;
  }

  if (shouldLogVerbose()) {
    const finalCount = counts.final;
    logVerbose(
      `discord: delivered ${finalCount} ${finalCount === 1 ? "reply" : "replies"} to ${replyTarget}`,
    );
  }

  if (removeAckAfterReply && ackReactionPromise && ackReaction) {
    const ackReactionValue = ackReaction;
    // Fire-and-forget: only remove the reaction if adding it actually succeeded.
    void ackReactionPromise.then((didAck) => {
      if (!didAck) return;
      removeReactionDiscord(message.channelId, message.id, ackReactionValue, {
        rest: client.rest,
      }).catch((err) => {
        logVerbose(
          `discord: failed to remove ack reaction from ${message.channelId}/${message.id}: ${String(err)}`,
        );
      });
    });
  }

  clearChannelHistory();
}