Files
openclaw/src/auto-reply/reply/agent-runner.ts
Yaroslav Boiko 838259331f fix(discord): add media dedup production code for messaging tool pipeline
Wire media URL tracking through the embedded agent pipeline so that
media already sent via messaging tools is not delivered again by the
reply dispatcher.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 23:51:51 +01:00

572 lines
18 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import crypto from "node:crypto";
import fs from "node:fs";
import type { TypingMode } from "../../config/types.js";
import type { OriginatingChannelType, TemplateContext } from "../templating.js";
import type { GetReplyOptions, ReplyPayload } from "../types.js";
import type { TypingController } from "./typing.js";
import { lookupContextTokens } from "../../agents/context.js";
import { DEFAULT_CONTEXT_TOKENS } from "../../agents/defaults.js";
import { resolveModelAuthMode } from "../../agents/model-auth.js";
import { isCliProvider } from "../../agents/model-selection.js";
import { queueEmbeddedPiMessage } from "../../agents/pi-embedded.js";
import { hasNonzeroUsage } from "../../agents/usage.js";
import {
resolveAgentIdFromSessionKey,
resolveSessionFilePath,
resolveSessionTranscriptPath,
type SessionEntry,
updateSessionStore,
updateSessionStoreEntry,
} from "../../config/sessions.js";
import { emitDiagnosticEvent, isDiagnosticsEnabled } from "../../infra/diagnostic-events.js";
import { defaultRuntime } from "../../runtime.js";
import { estimateUsageCost, resolveModelCostConfig } from "../../utils/usage-format.js";
import { resolveResponseUsageMode, type VerboseLevel } from "../thinking.js";
import { runAgentTurnWithFallback } from "./agent-runner-execution.js";
import {
createShouldEmitToolOutput,
createShouldEmitToolResult,
finalizeWithFollowup,
isAudioPayload,
signalTypingIfNeeded,
} from "./agent-runner-helpers.js";
import { runMemoryFlushIfNeeded } from "./agent-runner-memory.js";
import { buildReplyPayloads } from "./agent-runner-payloads.js";
import { appendUsageLine, formatResponseUsageLine } from "./agent-runner-utils.js";
import { createAudioAsVoiceBuffer, createBlockReplyPipeline } from "./block-reply-pipeline.js";
import { resolveBlockStreamingCoalescing } from "./block-streaming.js";
import { createFollowupRunner } from "./followup-runner.js";
import { enqueueFollowupRun, type FollowupRun, type QueueSettings } from "./queue.js";
import { createReplyToModeFilterForChannel, resolveReplyToMode } from "./reply-threading.js";
import { incrementRunCompactionCount, persistRunSessionUsage } from "./session-run-accounting.js";
import { createTypingSignaler } from "./typing-mode.js";
const BLOCK_REPLY_SEND_TIMEOUT_MS = 15_000;
const UNSCHEDULED_REMINDER_NOTE =
"Note: I did not schedule a reminder in this turn, so this will not trigger automatically.";
const REMINDER_COMMITMENT_PATTERNS: RegExp[] = [
/\b(?:i\s*[']?ll|i will)\s+(?:make sure to\s+)?(?:remember|remind|ping|follow up|follow-up|check back|circle back)\b/i,
/\b(?:i\s*[']?ll|i will)\s+(?:set|create|schedule)\s+(?:a\s+)?reminder\b/i,
];
function hasUnbackedReminderCommitment(text: string): boolean {
const normalized = text.toLowerCase();
if (!normalized.trim()) {
return false;
}
if (normalized.includes(UNSCHEDULED_REMINDER_NOTE.toLowerCase())) {
return false;
}
return REMINDER_COMMITMENT_PATTERNS.some((pattern) => pattern.test(text));
}
function appendUnscheduledReminderNote(payloads: ReplyPayload[]): ReplyPayload[] {
let appended = false;
return payloads.map((payload) => {
if (appended || payload.isError || typeof payload.text !== "string") {
return payload;
}
if (!hasUnbackedReminderCommitment(payload.text)) {
return payload;
}
appended = true;
const trimmed = payload.text.trimEnd();
return {
...payload,
text: `${trimmed}\n\n${UNSCHEDULED_REMINDER_NOTE}`,
};
});
}
export async function runReplyAgent(params: {
commandBody: string;
followupRun: FollowupRun;
queueKey: string;
resolvedQueue: QueueSettings;
shouldSteer: boolean;
shouldFollowup: boolean;
isActive: boolean;
isStreaming: boolean;
opts?: GetReplyOptions;
typing: TypingController;
sessionEntry?: SessionEntry;
sessionStore?: Record<string, SessionEntry>;
sessionKey?: string;
storePath?: string;
defaultModel: string;
agentCfgContextTokens?: number;
resolvedVerboseLevel: VerboseLevel;
isNewSession: boolean;
blockStreamingEnabled: boolean;
blockReplyChunking?: {
minChars: number;
maxChars: number;
breakPreference: "paragraph" | "newline" | "sentence";
flushOnParagraph?: boolean;
};
resolvedBlockStreamingBreak: "text_end" | "message_end";
sessionCtx: TemplateContext;
shouldInjectGroupIntro: boolean;
typingMode: TypingMode;
}): Promise<ReplyPayload | ReplyPayload[] | undefined> {
const {
commandBody,
followupRun,
queueKey,
resolvedQueue,
shouldSteer,
shouldFollowup,
isActive,
isStreaming,
opts,
typing,
sessionEntry,
sessionStore,
sessionKey,
storePath,
defaultModel,
agentCfgContextTokens,
resolvedVerboseLevel,
isNewSession,
blockStreamingEnabled,
blockReplyChunking,
resolvedBlockStreamingBreak,
sessionCtx,
shouldInjectGroupIntro,
typingMode,
} = params;
let activeSessionEntry = sessionEntry;
const activeSessionStore = sessionStore;
let activeIsNewSession = isNewSession;
const isHeartbeat = opts?.isHeartbeat === true;
const typingSignals = createTypingSignaler({
typing,
mode: typingMode,
isHeartbeat,
});
const shouldEmitToolResult = createShouldEmitToolResult({
sessionKey,
storePath,
resolvedVerboseLevel,
});
const shouldEmitToolOutput = createShouldEmitToolOutput({
sessionKey,
storePath,
resolvedVerboseLevel,
});
const pendingToolTasks = new Set<Promise<void>>();
const blockReplyTimeoutMs = opts?.blockReplyTimeoutMs ?? BLOCK_REPLY_SEND_TIMEOUT_MS;
const replyToChannel =
sessionCtx.OriginatingChannel ??
((sessionCtx.Surface ?? sessionCtx.Provider)?.toLowerCase() as
| OriginatingChannelType
| undefined);
const replyToMode = resolveReplyToMode(
followupRun.run.config,
replyToChannel,
sessionCtx.AccountId,
sessionCtx.ChatType,
);
const applyReplyToMode = createReplyToModeFilterForChannel(replyToMode, replyToChannel);
const cfg = followupRun.run.config;
const blockReplyCoalescing =
blockStreamingEnabled && opts?.onBlockReply
? resolveBlockStreamingCoalescing(
cfg,
sessionCtx.Provider,
sessionCtx.AccountId,
blockReplyChunking,
)
: undefined;
const blockReplyPipeline =
blockStreamingEnabled && opts?.onBlockReply
? createBlockReplyPipeline({
onBlockReply: opts.onBlockReply,
timeoutMs: blockReplyTimeoutMs,
coalescing: blockReplyCoalescing,
buffer: createAudioAsVoiceBuffer({ isAudioPayload }),
})
: null;
const touchActiveSessionEntry = async () => {
if (!activeSessionEntry || !activeSessionStore || !sessionKey) {
return;
}
const updatedAt = Date.now();
activeSessionEntry.updatedAt = updatedAt;
activeSessionStore[sessionKey] = activeSessionEntry;
if (storePath) {
await updateSessionStoreEntry({
storePath,
sessionKey,
update: async () => ({ updatedAt }),
});
}
};
if (shouldSteer && isStreaming) {
const steered = queueEmbeddedPiMessage(followupRun.run.sessionId, followupRun.prompt);
if (steered && !shouldFollowup) {
await touchActiveSessionEntry();
typing.cleanup();
return undefined;
}
}
if (isActive && (shouldFollowup || resolvedQueue.mode === "steer")) {
enqueueFollowupRun(queueKey, followupRun, resolvedQueue);
await touchActiveSessionEntry();
typing.cleanup();
return undefined;
}
await typingSignals.signalRunStart();
activeSessionEntry = await runMemoryFlushIfNeeded({
cfg,
followupRun,
sessionCtx,
opts,
defaultModel,
agentCfgContextTokens,
resolvedVerboseLevel,
sessionEntry: activeSessionEntry,
sessionStore: activeSessionStore,
sessionKey,
storePath,
isHeartbeat,
});
const runFollowupTurn = createFollowupRunner({
opts,
typing,
typingMode,
sessionEntry: activeSessionEntry,
sessionStore: activeSessionStore,
sessionKey,
storePath,
defaultModel,
agentCfgContextTokens,
});
let responseUsageLine: string | undefined;
type SessionResetOptions = {
failureLabel: string;
buildLogMessage: (nextSessionId: string) => string;
cleanupTranscripts?: boolean;
};
const resetSession = async ({
failureLabel,
buildLogMessage,
cleanupTranscripts,
}: SessionResetOptions): Promise<boolean> => {
if (!sessionKey || !activeSessionStore || !storePath) {
return false;
}
const prevEntry = activeSessionStore[sessionKey] ?? activeSessionEntry;
if (!prevEntry) {
return false;
}
const prevSessionId = cleanupTranscripts ? prevEntry.sessionId : undefined;
const nextSessionId = crypto.randomUUID();
const nextEntry: SessionEntry = {
...prevEntry,
sessionId: nextSessionId,
updatedAt: Date.now(),
systemSent: false,
abortedLastRun: false,
};
const agentId = resolveAgentIdFromSessionKey(sessionKey);
const nextSessionFile = resolveSessionTranscriptPath(
nextSessionId,
agentId,
sessionCtx.MessageThreadId,
);
nextEntry.sessionFile = nextSessionFile;
activeSessionStore[sessionKey] = nextEntry;
try {
await updateSessionStore(storePath, (store) => {
store[sessionKey] = nextEntry;
});
} catch (err) {
defaultRuntime.error(
`Failed to persist session reset after ${failureLabel} (${sessionKey}): ${String(err)}`,
);
}
followupRun.run.sessionId = nextSessionId;
followupRun.run.sessionFile = nextSessionFile;
activeSessionEntry = nextEntry;
activeIsNewSession = true;
defaultRuntime.error(buildLogMessage(nextSessionId));
if (cleanupTranscripts && prevSessionId) {
const transcriptCandidates = new Set<string>();
const resolved = resolveSessionFilePath(prevSessionId, prevEntry, { agentId });
if (resolved) {
transcriptCandidates.add(resolved);
}
transcriptCandidates.add(resolveSessionTranscriptPath(prevSessionId, agentId));
for (const candidate of transcriptCandidates) {
try {
fs.unlinkSync(candidate);
} catch {
// Best-effort cleanup.
}
}
}
return true;
};
const resetSessionAfterCompactionFailure = async (reason: string): Promise<boolean> =>
resetSession({
failureLabel: "compaction failure",
buildLogMessage: (nextSessionId) =>
`Auto-compaction failed (${reason}). Restarting session ${sessionKey} -> ${nextSessionId} and retrying.`,
});
const resetSessionAfterRoleOrderingConflict = async (reason: string): Promise<boolean> =>
resetSession({
failureLabel: "role ordering conflict",
buildLogMessage: (nextSessionId) =>
`Role ordering conflict (${reason}). Restarting session ${sessionKey} -> ${nextSessionId}.`,
cleanupTranscripts: true,
});
try {
const runStartedAt = Date.now();
const runOutcome = await runAgentTurnWithFallback({
commandBody,
followupRun,
sessionCtx,
opts,
typingSignals,
blockReplyPipeline,
blockStreamingEnabled,
blockReplyChunking,
resolvedBlockStreamingBreak,
applyReplyToMode,
shouldEmitToolResult,
shouldEmitToolOutput,
pendingToolTasks,
resetSessionAfterCompactionFailure,
resetSessionAfterRoleOrderingConflict,
isHeartbeat,
sessionKey,
getActiveSessionEntry: () => activeSessionEntry,
activeSessionStore,
storePath,
resolvedVerboseLevel,
});
if (runOutcome.kind === "final") {
return finalizeWithFollowup(runOutcome.payload, queueKey, runFollowupTurn);
}
const { runResult, fallbackProvider, fallbackModel, directlySentBlockKeys } = runOutcome;
let { didLogHeartbeatStrip, autoCompactionCompleted } = runOutcome;
if (
shouldInjectGroupIntro &&
activeSessionEntry &&
activeSessionStore &&
sessionKey &&
activeSessionEntry.groupActivationNeedsSystemIntro
) {
const updatedAt = Date.now();
activeSessionEntry.groupActivationNeedsSystemIntro = false;
activeSessionEntry.updatedAt = updatedAt;
activeSessionStore[sessionKey] = activeSessionEntry;
if (storePath) {
await updateSessionStoreEntry({
storePath,
sessionKey,
update: async () => ({
groupActivationNeedsSystemIntro: false,
updatedAt,
}),
});
}
}
const payloadArray = runResult.payloads ?? [];
if (blockReplyPipeline) {
await blockReplyPipeline.flush({ force: true });
blockReplyPipeline.stop();
}
if (pendingToolTasks.size > 0) {
await Promise.allSettled(pendingToolTasks);
}
const usage = runResult.meta.agentMeta?.usage;
const promptTokens = runResult.meta.agentMeta?.promptTokens;
const modelUsed = runResult.meta.agentMeta?.model ?? fallbackModel ?? defaultModel;
const providerUsed =
runResult.meta.agentMeta?.provider ?? fallbackProvider ?? followupRun.run.provider;
const cliSessionId = isCliProvider(providerUsed, cfg)
? runResult.meta.agentMeta?.sessionId?.trim()
: undefined;
const contextTokensUsed =
agentCfgContextTokens ??
lookupContextTokens(modelUsed) ??
activeSessionEntry?.contextTokens ??
DEFAULT_CONTEXT_TOKENS;
await persistRunSessionUsage({
storePath,
sessionKey,
usage,
lastCallUsage: runResult.meta.agentMeta?.lastCallUsage,
promptTokens,
modelUsed,
providerUsed,
contextTokensUsed,
systemPromptReport: runResult.meta.systemPromptReport,
cliSessionId,
});
// Drain any late tool/block deliveries before deciding there's "nothing to send".
// Otherwise, a late typing trigger (e.g. from a tool callback) can outlive the run and
// keep the typing indicator stuck.
if (payloadArray.length === 0) {
return finalizeWithFollowup(undefined, queueKey, runFollowupTurn);
}
const payloadResult = buildReplyPayloads({
payloads: payloadArray,
isHeartbeat,
didLogHeartbeatStrip,
blockStreamingEnabled,
blockReplyPipeline,
directlySentBlockKeys,
replyToMode,
replyToChannel,
currentMessageId: sessionCtx.MessageSidFull ?? sessionCtx.MessageSid,
messageProvider: followupRun.run.messageProvider,
messagingToolSentTexts: runResult.messagingToolSentTexts,
messagingToolSentMediaUrls: runResult.messagingToolSentMediaUrls,
messagingToolSentTargets: runResult.messagingToolSentTargets,
originatingTo: sessionCtx.OriginatingTo ?? sessionCtx.To,
accountId: sessionCtx.AccountId,
});
const { replyPayloads } = payloadResult;
didLogHeartbeatStrip = payloadResult.didLogHeartbeatStrip;
if (replyPayloads.length === 0) {
return finalizeWithFollowup(undefined, queueKey, runFollowupTurn);
}
const successfulCronAdds = runResult.successfulCronAdds ?? 0;
const hasReminderCommitment = replyPayloads.some(
(payload) =>
!payload.isError &&
typeof payload.text === "string" &&
hasUnbackedReminderCommitment(payload.text),
);
const guardedReplyPayloads =
hasReminderCommitment && successfulCronAdds === 0
? appendUnscheduledReminderNote(replyPayloads)
: replyPayloads;
await signalTypingIfNeeded(guardedReplyPayloads, typingSignals);
if (isDiagnosticsEnabled(cfg) && hasNonzeroUsage(usage)) {
const input = usage.input ?? 0;
const output = usage.output ?? 0;
const cacheRead = usage.cacheRead ?? 0;
const cacheWrite = usage.cacheWrite ?? 0;
const promptTokens = input + cacheRead + cacheWrite;
const totalTokens = usage.total ?? promptTokens + output;
const costConfig = resolveModelCostConfig({
provider: providerUsed,
model: modelUsed,
config: cfg,
});
const costUsd = estimateUsageCost({ usage, cost: costConfig });
emitDiagnosticEvent({
type: "model.usage",
sessionKey,
sessionId: followupRun.run.sessionId,
channel: replyToChannel,
provider: providerUsed,
model: modelUsed,
usage: {
input,
output,
cacheRead,
cacheWrite,
promptTokens,
total: totalTokens,
},
lastCallUsage: runResult.meta.agentMeta?.lastCallUsage,
context: {
limit: contextTokensUsed,
used: totalTokens,
},
costUsd,
durationMs: Date.now() - runStartedAt,
});
}
const responseUsageRaw =
activeSessionEntry?.responseUsage ??
(sessionKey ? activeSessionStore?.[sessionKey]?.responseUsage : undefined);
const responseUsageMode = resolveResponseUsageMode(responseUsageRaw);
if (responseUsageMode !== "off" && hasNonzeroUsage(usage)) {
const authMode = resolveModelAuthMode(providerUsed, cfg);
const showCost = authMode === "api-key";
const costConfig = showCost
? resolveModelCostConfig({
provider: providerUsed,
model: modelUsed,
config: cfg,
})
: undefined;
let formatted = formatResponseUsageLine({
usage,
showCost,
costConfig,
});
if (formatted && responseUsageMode === "full" && sessionKey) {
formatted = `${formatted} · session ${sessionKey}`;
}
if (formatted) {
responseUsageLine = formatted;
}
}
// If verbose is enabled and this is a new session, prepend a session hint.
let finalPayloads = guardedReplyPayloads;
const verboseEnabled = resolvedVerboseLevel !== "off";
if (autoCompactionCompleted) {
const count = await incrementRunCompactionCount({
sessionEntry: activeSessionEntry,
sessionStore: activeSessionStore,
sessionKey,
storePath,
lastCallUsage: runResult.meta.agentMeta?.lastCallUsage,
contextTokensUsed,
});
if (verboseEnabled) {
const suffix = typeof count === "number" ? ` (count ${count})` : "";
finalPayloads = [{ text: `🧹 Auto-compaction complete${suffix}.` }, ...finalPayloads];
}
}
if (verboseEnabled && activeIsNewSession) {
finalPayloads = [{ text: `🧭 New session: ${followupRun.run.sessionId}` }, ...finalPayloads];
}
if (responseUsageLine) {
finalPayloads = appendUsageLine(finalPayloads, responseUsageLine);
}
return finalizeWithFollowup(
finalPayloads.length === 1 ? finalPayloads[0] : finalPayloads,
queueKey,
runFollowupTurn,
);
} finally {
blockReplyPipeline?.stop();
typing.markRunComplete();
}
}