import { ChannelType } from "@buape/carbon"; import type { ReplyPayload } from "../../auto-reply/types.js"; import type { DiscordMessagePreflightContext } from "./message-handler.preflight.js"; import { resolveAckReaction, resolveHumanDelayConfig } from "../../agents/identity.js"; import { resolveChunkMode } from "../../auto-reply/chunk.js"; import { dispatchInboundMessage } from "../../auto-reply/dispatch.js"; import { formatInboundEnvelope, resolveEnvelopeFormatOptions } from "../../auto-reply/envelope.js"; import { buildPendingHistoryContextFromMap, clearHistoryEntriesIfEnabled, } from "../../auto-reply/reply/history.js"; import { finalizeInboundContext } from "../../auto-reply/reply/inbound-context.js"; import { createReplyDispatcherWithTyping } from "../../auto-reply/reply/reply-dispatcher.js"; import { shouldAckReaction as shouldAckReactionGate } from "../../channels/ack-reactions.js"; import { logTypingFailure, logAckFailure } from "../../channels/logging.js"; import { createReplyPrefixOptions } from "../../channels/reply-prefix.js"; import { recordInboundSession } from "../../channels/session.js"; import { createTypingCallbacks } from "../../channels/typing.js"; import { resolveMarkdownTableMode } from "../../config/markdown-tables.js"; import { readSessionUpdatedAt, resolveStorePath } from "../../config/sessions.js"; import { danger, logVerbose, shouldLogVerbose } from "../../globals.js"; import { buildAgentSessionKey } from "../../routing/resolve-route.js"; import { resolveThreadSessionKeys } from "../../routing/session-key.js"; import { buildUntrustedChannelMetadata } from "../../security/channel-metadata.js"; import { truncateUtf16Safe } from "../../utils.js"; import { reactMessageDiscord, removeReactionDiscord } from "../send.js"; import { normalizeDiscordSlug, resolveDiscordOwnerAllowFrom } from "./allow-list.js"; import { resolveTimestampMs } from "./format.js"; import { buildDiscordMediaPayload, resolveDiscordMessageText, resolveForwardedMediaList, resolveMediaList, } from "./message-utils.js"; import { buildDirectLabel, buildGuildLabel, resolveReplyContext } from "./reply-context.js"; import { deliverDiscordReply } from "./reply-delivery.js"; import { resolveDiscordAutoThreadReplyPlan, resolveDiscordThreadStarter } from "./threading.js"; import { sendTyping } from "./typing.js"; const DISCORD_STATUS_THINKING_EMOJI = "🧠"; const DISCORD_STATUS_TOOL_EMOJI = "🛠️"; const DISCORD_STATUS_CODING_EMOJI = "💻"; const DISCORD_STATUS_WEB_EMOJI = "🌐"; const DISCORD_STATUS_DONE_EMOJI = "✅"; const DISCORD_STATUS_ERROR_EMOJI = "❌"; const DISCORD_STATUS_STALL_SOFT_EMOJI = "⏳"; const DISCORD_STATUS_STALL_HARD_EMOJI = "⚠️"; const DISCORD_STATUS_DONE_HOLD_MS = 1500; const DISCORD_STATUS_ERROR_HOLD_MS = 2500; const DISCORD_STATUS_DEBOUNCE_MS = 700; const DISCORD_STATUS_STALL_SOFT_MS = 10_000; const DISCORD_STATUS_STALL_HARD_MS = 30_000; const CODING_STATUS_TOOL_TOKENS = [ "exec", "process", "read", "write", "edit", "session_status", "bash", ]; const WEB_STATUS_TOOL_TOKENS = ["web_search", "web-search", "web_fetch", "web-fetch", "browser"]; function resolveToolStatusEmoji(toolName?: string): string { const normalized = toolName?.trim().toLowerCase() ?? ""; if (!normalized) { return DISCORD_STATUS_TOOL_EMOJI; } if (WEB_STATUS_TOOL_TOKENS.some((token) => normalized.includes(token))) { return DISCORD_STATUS_WEB_EMOJI; } if (CODING_STATUS_TOOL_TOKENS.some((token) => normalized.includes(token))) { return DISCORD_STATUS_CODING_EMOJI; } return DISCORD_STATUS_TOOL_EMOJI; } function sleep(ms: number): Promise { return new Promise((resolve) => { setTimeout(resolve, ms); }); } function createDiscordStatusReactionController(params: { enabled: boolean; channelId: string; messageId: string; initialEmoji: string; rest: unknown; }) { let activeEmoji: string | null = null; let chain: Promise = Promise.resolve(); let pendingEmoji: string | null = null; let pendingTimer: ReturnType | null = null; let finished = false; let softStallTimer: ReturnType | null = null; let hardStallTimer: ReturnType | null = null; const enqueue = (work: () => Promise) => { chain = chain.then(work).catch((err) => { logAckFailure({ log: logVerbose, channel: "discord", target: `${params.channelId}/${params.messageId}`, error: err, }); }); return chain; }; const clearStallTimers = () => { if (softStallTimer) { clearTimeout(softStallTimer); softStallTimer = null; } if (hardStallTimer) { clearTimeout(hardStallTimer); hardStallTimer = null; } }; const clearPendingDebounce = () => { if (pendingTimer) { clearTimeout(pendingTimer); pendingTimer = null; } pendingEmoji = null; }; const applyEmoji = (emoji: string) => enqueue(async () => { if (!params.enabled || !emoji || activeEmoji === emoji) { return; } const previousEmoji = activeEmoji; await reactMessageDiscord(params.channelId, params.messageId, emoji, { rest: params.rest as never, }); activeEmoji = emoji; if (previousEmoji && previousEmoji !== emoji) { await removeReactionDiscord(params.channelId, params.messageId, previousEmoji, { rest: params.rest as never, }); } }); const requestEmoji = (emoji: string, options?: { immediate?: boolean }) => { if (!params.enabled || !emoji) { return Promise.resolve(); } if (options?.immediate) { clearPendingDebounce(); return applyEmoji(emoji); } pendingEmoji = emoji; if (pendingTimer) { clearTimeout(pendingTimer); } pendingTimer = setTimeout(() => { pendingTimer = null; const emojiToApply = pendingEmoji; pendingEmoji = null; if (!emojiToApply || emojiToApply === activeEmoji) { return; } void applyEmoji(emojiToApply); }, DISCORD_STATUS_DEBOUNCE_MS); return Promise.resolve(); }; const scheduleStallTimers = () => { if (!params.enabled || finished) { return; } clearStallTimers(); softStallTimer = setTimeout(() => { if (finished) { return; } void requestEmoji(DISCORD_STATUS_STALL_SOFT_EMOJI, { immediate: true }); }, DISCORD_STATUS_STALL_SOFT_MS); hardStallTimer = setTimeout(() => { if (finished) { return; } void requestEmoji(DISCORD_STATUS_STALL_HARD_EMOJI, { immediate: true }); }, DISCORD_STATUS_STALL_HARD_MS); }; const setPhase = (emoji: string) => { if (!params.enabled || finished) { return Promise.resolve(); } scheduleStallTimers(); return requestEmoji(emoji); }; const setTerminal = async (emoji: string) => { if (!params.enabled) { return; } finished = true; clearStallTimers(); await requestEmoji(emoji, { immediate: true }); }; const clear = async () => { if (!params.enabled) { return; } finished = true; clearStallTimers(); clearPendingDebounce(); await enqueue(async () => { const cleanupCandidates = new Set([ params.initialEmoji, activeEmoji ?? "", DISCORD_STATUS_THINKING_EMOJI, DISCORD_STATUS_TOOL_EMOJI, DISCORD_STATUS_CODING_EMOJI, DISCORD_STATUS_WEB_EMOJI, DISCORD_STATUS_DONE_EMOJI, DISCORD_STATUS_ERROR_EMOJI, DISCORD_STATUS_STALL_SOFT_EMOJI, DISCORD_STATUS_STALL_HARD_EMOJI, ]); activeEmoji = null; for (const emoji of cleanupCandidates) { if (!emoji) { continue; } try { await removeReactionDiscord(params.channelId, params.messageId, emoji, { rest: params.rest as never, }); } catch (err) { logAckFailure({ log: logVerbose, channel: "discord", target: `${params.channelId}/${params.messageId}`, error: err, }); } } }); }; const restoreInitial = async () => { if (!params.enabled) { return; } finished = true; clearStallTimers(); clearPendingDebounce(); await requestEmoji(params.initialEmoji, { immediate: true }); }; return { setQueued: () => { scheduleStallTimers(); return requestEmoji(params.initialEmoji, { immediate: true }); }, setThinking: () => setPhase(DISCORD_STATUS_THINKING_EMOJI), setTool: (toolName?: string) => setPhase(resolveToolStatusEmoji(toolName)), setDone: () => setTerminal(DISCORD_STATUS_DONE_EMOJI), setError: () => setTerminal(DISCORD_STATUS_ERROR_EMOJI), clear, restoreInitial, }; } export async function processDiscordMessage(ctx: DiscordMessagePreflightContext) { const { cfg, discordConfig, accountId, token, runtime, guildHistories, historyLimit, mediaMaxBytes, textLimit, replyToMode, ackReactionScope, message, author, sender, data, client, channelInfo, channelName, messageChannelId, isGuildMessage, isDirectMessage, isGroupDm, baseText, messageText, shouldRequireMention, canDetectMention, effectiveWasMentioned, shouldBypassMention, threadChannel, threadParentId, threadParentName, threadParentType, threadName, displayChannelSlug, guildInfo, guildSlug, channelConfig, baseSessionKey, route, commandAuthorized, } = ctx; const mediaList = await resolveMediaList(message, mediaMaxBytes); const forwardedMediaList = await resolveForwardedMediaList(message, mediaMaxBytes); mediaList.push(...forwardedMediaList); const text = messageText; if (!text) { logVerbose(`discord: drop message ${message.id} (empty content)`); return; } const ackReaction = resolveAckReaction(cfg, route.agentId, { channel: "discord", accountId, }); const removeAckAfterReply = cfg.messages?.removeAckAfterReply ?? false; const shouldAckReaction = () => Boolean( ackReaction && shouldAckReactionGate({ scope: ackReactionScope, isDirect: isDirectMessage, isGroup: isGuildMessage || isGroupDm, isMentionableGroup: isGuildMessage, requireMention: Boolean(shouldRequireMention), canDetectMention, effectiveWasMentioned, shouldBypassMention, }), ); const statusReactionsEnabled = shouldAckReaction(); const statusReactions = createDiscordStatusReactionController({ enabled: statusReactionsEnabled, channelId: messageChannelId, messageId: message.id, initialEmoji: ackReaction, rest: client.rest, }); if (statusReactionsEnabled) { void statusReactions.setQueued(); } const fromLabel = isDirectMessage ? buildDirectLabel(author) : buildGuildLabel({ guild: data.guild ?? undefined, channelName: channelName ?? messageChannelId, channelId: messageChannelId, }); const senderLabel = sender.label; const isForumParent = threadParentType === ChannelType.GuildForum || threadParentType === ChannelType.GuildMedia; const forumParentSlug = isForumParent && threadParentName ? normalizeDiscordSlug(threadParentName) : ""; const threadChannelId = threadChannel?.id; const isForumStarter = Boolean(threadChannelId && isForumParent && forumParentSlug) && message.id === threadChannelId; const forumContextLine = isForumStarter ? `[Forum parent: #${forumParentSlug}]` : null; const groupChannel = isGuildMessage && displayChannelSlug ? `#${displayChannelSlug}` : undefined; const groupSubject = isDirectMessage ? undefined : groupChannel; const untrustedChannelMetadata = isGuildMessage ? buildUntrustedChannelMetadata({ source: "discord", label: "Discord channel topic", entries: [channelInfo?.topic], }) : undefined; const senderName = sender.isPluralKit ? (sender.name ?? author.username) : (data.member?.nickname ?? author.globalName ?? author.username); const senderUsername = sender.isPluralKit ? (sender.tag ?? sender.name ?? author.username) : author.username; const senderTag = sender.tag; const systemPromptParts = [channelConfig?.systemPrompt?.trim() || null].filter( (entry): entry is string => Boolean(entry), ); const groupSystemPrompt = systemPromptParts.length > 0 ? systemPromptParts.join("\n\n") : undefined; const ownerAllowFrom = resolveDiscordOwnerAllowFrom({ channelConfig, guildInfo, sender: { id: sender.id, name: sender.name, tag: sender.tag }, }); const storePath = resolveStorePath(cfg.session?.store, { agentId: route.agentId, }); const envelopeOptions = resolveEnvelopeFormatOptions(cfg); const previousTimestamp = readSessionUpdatedAt({ storePath, sessionKey: route.sessionKey, }); let combinedBody = formatInboundEnvelope({ channel: "Discord", from: fromLabel, timestamp: resolveTimestampMs(message.timestamp), body: text, chatType: isDirectMessage ? "direct" : "channel", senderLabel, previousTimestamp, envelope: envelopeOptions, }); const shouldIncludeChannelHistory = !isDirectMessage && !(isGuildMessage && channelConfig?.autoThread && !threadChannel); if (shouldIncludeChannelHistory) { combinedBody = buildPendingHistoryContextFromMap({ historyMap: guildHistories, historyKey: messageChannelId, limit: historyLimit, currentMessage: combinedBody, formatEntry: (entry) => formatInboundEnvelope({ channel: "Discord", from: fromLabel, timestamp: entry.timestamp, body: `${entry.body} [id:${entry.messageId ?? "unknown"} channel:${messageChannelId}]`, chatType: "channel", senderLabel: entry.sender, envelope: envelopeOptions, }), }); } const replyContext = resolveReplyContext(message, resolveDiscordMessageText); if (forumContextLine) { combinedBody = `${combinedBody}\n${forumContextLine}`; } let threadStarterBody: string | undefined; let threadLabel: string | undefined; let parentSessionKey: string | undefined; if (threadChannel) { const includeThreadStarter = channelConfig?.includeThreadStarter !== false; if (includeThreadStarter) { const starter = await resolveDiscordThreadStarter({ channel: threadChannel, client, parentId: threadParentId, parentType: threadParentType, resolveTimestampMs, }); if (starter?.text) { // Keep thread starter as raw text; metadata is provided out-of-band in the system prompt. threadStarterBody = starter.text; } } const parentName = threadParentName ?? "parent"; threadLabel = threadName ? `Discord thread #${normalizeDiscordSlug(parentName)} › ${threadName}` : `Discord thread #${normalizeDiscordSlug(parentName)}`; if (threadParentId) { parentSessionKey = buildAgentSessionKey({ agentId: route.agentId, channel: route.channel, peer: { kind: "channel", id: threadParentId }, }); } } const mediaPayload = buildDiscordMediaPayload(mediaList); const threadKeys = resolveThreadSessionKeys({ baseSessionKey, threadId: threadChannel ? messageChannelId : undefined, parentSessionKey, useSuffix: false, }); const replyPlan = await resolveDiscordAutoThreadReplyPlan({ client, message, messageChannelId, isGuildMessage, channelConfig, threadChannel, baseText: baseText ?? "", combinedBody, replyToMode, agentId: route.agentId, channel: route.channel, }); const deliverTarget = replyPlan.deliverTarget; const replyTarget = replyPlan.replyTarget; const replyReference = replyPlan.replyReference; const autoThreadContext = replyPlan.autoThreadContext; const effectiveFrom = isDirectMessage ? `discord:${author.id}` : (autoThreadContext?.From ?? `discord:channel:${messageChannelId}`); const effectiveTo = autoThreadContext?.To ?? replyTarget; if (!effectiveTo) { runtime.error?.(danger("discord: missing reply target")); return; } const inboundHistory = shouldIncludeChannelHistory && historyLimit > 0 ? (guildHistories.get(messageChannelId) ?? []).map((entry) => ({ sender: entry.sender, body: entry.body, timestamp: entry.timestamp, })) : undefined; const ctxPayload = finalizeInboundContext({ Body: combinedBody, BodyForAgent: baseText ?? text, InboundHistory: inboundHistory, RawBody: baseText, CommandBody: baseText, From: effectiveFrom, To: effectiveTo, SessionKey: autoThreadContext?.SessionKey ?? threadKeys.sessionKey, AccountId: route.accountId, ChatType: isDirectMessage ? "direct" : "channel", ConversationLabel: fromLabel, SenderName: senderName, SenderId: sender.id, SenderUsername: senderUsername, SenderTag: senderTag, GroupSubject: groupSubject, GroupChannel: groupChannel, UntrustedContext: untrustedChannelMetadata ? [untrustedChannelMetadata] : undefined, GroupSystemPrompt: isGuildMessage ? groupSystemPrompt : undefined, GroupSpace: isGuildMessage ? (guildInfo?.id ?? guildSlug) || undefined : undefined, OwnerAllowFrom: ownerAllowFrom, Provider: "discord" as const, Surface: "discord" as const, WasMentioned: effectiveWasMentioned, MessageSid: message.id, ReplyToId: replyContext?.id, ReplyToBody: replyContext?.body, ReplyToSender: replyContext?.sender, ParentSessionKey: autoThreadContext?.ParentSessionKey ?? threadKeys.parentSessionKey, ThreadStarterBody: threadStarterBody, ThreadLabel: threadLabel, Timestamp: resolveTimestampMs(message.timestamp), ...mediaPayload, CommandAuthorized: commandAuthorized, CommandSource: "text" as const, // Originating channel for reply routing. OriginatingChannel: "discord" as const, OriginatingTo: autoThreadContext?.OriginatingTo ?? replyTarget, }); await recordInboundSession({ storePath, sessionKey: ctxPayload.SessionKey ?? route.sessionKey, ctx: ctxPayload, updateLastRoute: isDirectMessage ? { sessionKey: route.mainSessionKey, channel: "discord", to: `user:${author.id}`, accountId: route.accountId, } : undefined, onRecordError: (err) => { logVerbose(`discord: failed updating session meta: ${String(err)}`); }, }); if (shouldLogVerbose()) { const preview = truncateUtf16Safe(combinedBody, 200).replace(/\n/g, "\\n"); logVerbose( `discord inbound: channel=${messageChannelId} deliver=${deliverTarget} from=${ctxPayload.From} preview="${preview}"`, ); } const typingChannelId = deliverTarget.startsWith("channel:") ? deliverTarget.slice("channel:".length) : messageChannelId; const { onModelSelected, ...prefixOptions } = createReplyPrefixOptions({ cfg, agentId: route.agentId, channel: "discord", accountId: route.accountId, }); const tableMode = resolveMarkdownTableMode({ cfg, channel: "discord", accountId, }); const typingCallbacks = createTypingCallbacks({ start: () => sendTyping({ client, channelId: typingChannelId }), onStartError: (err) => { logTypingFailure({ log: logVerbose, channel: "discord", target: typingChannelId, error: err, }); }, }); const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping({ ...prefixOptions, humanDelay: resolveHumanDelayConfig(cfg, route.agentId), deliver: async (payload: ReplyPayload) => { const replyToId = replyReference.use(); await deliverDiscordReply({ replies: [payload], target: deliverTarget, token, accountId, rest: client.rest, runtime, replyToId, textLimit, maxLinesPerMessage: discordConfig?.maxLinesPerMessage, tableMode, chunkMode: resolveChunkMode(cfg, "discord", accountId), }); replyReference.markSent(); }, onError: (err, info) => { runtime.error?.(danger(`discord ${info.kind} reply failed: ${String(err)}`)); }, onReplyStart: async () => { await typingCallbacks.onReplyStart(); await statusReactions.setThinking(); }, }); let dispatchResult: Awaited> | null = null; let dispatchError = false; try { dispatchResult = await dispatchInboundMessage({ ctx: ctxPayload, cfg, dispatcher, replyOptions: { ...replyOptions, skillFilter: channelConfig?.skills, disableBlockStreaming: typeof discordConfig?.blockStreaming === "boolean" ? !discordConfig.blockStreaming : undefined, onModelSelected, onReasoningStream: async () => { await statusReactions.setThinking(); }, onToolStart: async (payload) => { await statusReactions.setTool(payload.name); }, }, }); } catch (err) { dispatchError = true; throw err; } finally { markDispatchIdle(); if (statusReactionsEnabled) { if (dispatchError) { await statusReactions.setError(); } else { await statusReactions.setDone(); } if (removeAckAfterReply) { void (async () => { await sleep(dispatchError ? DISCORD_STATUS_ERROR_HOLD_MS : DISCORD_STATUS_DONE_HOLD_MS); await statusReactions.clear(); })(); } else { void statusReactions.restoreInitial(); } } } if (!dispatchResult?.queuedFinal) { if (isGuildMessage) { clearHistoryEntriesIfEnabled({ historyMap: guildHistories, historyKey: messageChannelId, limit: historyLimit, }); } return; } if (shouldLogVerbose()) { const finalCount = dispatchResult.counts.final; logVerbose( `discord: delivered ${finalCount} reply${finalCount === 1 ? "" : "ies"} to ${replyTarget}`, ); } if (isGuildMessage) { clearHistoryEntriesIfEnabled({ historyMap: guildHistories, historyKey: messageChannelId, limit: historyLimit, }); } }