fix: normalize reply media paths

This commit is contained in:
Ayaan Zaidi
2026-03-07 10:09:10 +05:30
committed by Ayaan Zaidi
parent 15a5e39da2
commit 77ef672468
9 changed files with 347 additions and 34 deletions

View File

@@ -211,7 +211,7 @@ describe("block streaming", () => {
expect(onBlockReply).toHaveBeenCalledTimes(1);
expect(onBlockReply.mock.calls[0][0]).toMatchObject({
text: "Result",
mediaUrls: ["./image.png"],
mediaUrls: [path.join(home, "openclaw", "image.png")],
});
});
});

View File

@@ -45,6 +45,7 @@ import {
import { type BlockReplyPipeline } from "./block-reply-pipeline.js";
import type { FollowupRun } from "./queue.js";
import { createBlockReplyDeliveryHandler } from "./reply-delivery.js";
import { createReplyMediaPathNormalizer } from "./reply-media-paths.js";
import type { TypingSignaler } from "./typing-mode.js";
export type RuntimeFallbackAttempt = {
@@ -106,6 +107,11 @@ export async function runAgentTurnWithFallback(params: {
const directlySentBlockKeys = new Set<string>();
const runId = params.opts?.runId ?? crypto.randomUUID();
const normalizeReplyMediaPaths = createReplyMediaPathNormalizer({
cfg: params.followupRun.run.config,
sessionKey: params.sessionKey,
workspaceDir: params.followupRun.run.workspaceDir,
});
let didNotifyAgentRunStart = false;
const notifyAgentRunStart = () => {
if (didNotifyAgentRunStart) {
@@ -402,6 +408,7 @@ export async function runAgentTurnWithFallback(params: {
params.sessionCtx.MessageSidFull ?? params.sessionCtx.MessageSid,
normalizeStreamingText,
applyReplyToMode: params.applyReplyToMode,
normalizeMediaPaths: normalizeReplyMediaPaths,
typingSignals: params.typingSignals,
blockStreamingEnabled: params.blockStreamingEnabled,
blockReplyPipeline,

View File

@@ -10,8 +10,8 @@ const baseParams = {
};
describe("buildReplyPayloads media filter integration", () => {
it("strips media URL from payload when in messagingToolSentMediaUrls", () => {
const { replyPayloads } = buildReplyPayloads({
it("strips media URL from payload when in messagingToolSentMediaUrls", async () => {
const { replyPayloads } = await buildReplyPayloads({
...baseParams,
payloads: [{ text: "hello", mediaUrl: "file:///tmp/photo.jpg" }],
messagingToolSentMediaUrls: ["file:///tmp/photo.jpg"],
@@ -21,8 +21,8 @@ describe("buildReplyPayloads media filter integration", () => {
expect(replyPayloads[0].mediaUrl).toBeUndefined();
});
it("preserves media URL when not in messagingToolSentMediaUrls", () => {
const { replyPayloads } = buildReplyPayloads({
it("preserves media URL when not in messagingToolSentMediaUrls", async () => {
const { replyPayloads } = await buildReplyPayloads({
...baseParams,
payloads: [{ text: "hello", mediaUrl: "file:///tmp/photo.jpg" }],
messagingToolSentMediaUrls: ["file:///tmp/other.jpg"],
@@ -32,8 +32,8 @@ describe("buildReplyPayloads media filter integration", () => {
expect(replyPayloads[0].mediaUrl).toBe("file:///tmp/photo.jpg");
});
it("applies media filter after text filter", () => {
const { replyPayloads } = buildReplyPayloads({
it("applies media filter after text filter", async () => {
const { replyPayloads } = await buildReplyPayloads({
...baseParams,
payloads: [{ text: "hello world!", mediaUrl: "file:///tmp/photo.jpg" }],
messagingToolSentTexts: ["hello world!"],
@@ -44,8 +44,8 @@ describe("buildReplyPayloads media filter integration", () => {
expect(replyPayloads).toHaveLength(0);
});
it("does not dedupe text for cross-target messaging sends", () => {
const { replyPayloads } = buildReplyPayloads({
it("does not dedupe text for cross-target messaging sends", async () => {
const { replyPayloads } = await buildReplyPayloads({
...baseParams,
payloads: [{ text: "hello world!" }],
messageProvider: "telegram",
@@ -58,8 +58,8 @@ describe("buildReplyPayloads media filter integration", () => {
expect(replyPayloads[0]?.text).toBe("hello world!");
});
it("does not dedupe media for cross-target messaging sends", () => {
const { replyPayloads } = buildReplyPayloads({
it("does not dedupe media for cross-target messaging sends", async () => {
const { replyPayloads } = await buildReplyPayloads({
...baseParams,
payloads: [{ text: "photo", mediaUrl: "file:///tmp/photo.jpg" }],
messageProvider: "telegram",
@@ -72,8 +72,8 @@ describe("buildReplyPayloads media filter integration", () => {
expect(replyPayloads[0]?.mediaUrl).toBe("file:///tmp/photo.jpg");
});
it("suppresses same-target replies when messageProvider is synthetic but originatingChannel is set", () => {
const { replyPayloads } = buildReplyPayloads({
it("suppresses same-target replies when messageProvider is synthetic but originatingChannel is set", async () => {
const { replyPayloads } = await buildReplyPayloads({
...baseParams,
payloads: [{ text: "hello world!" }],
messageProvider: "heartbeat",
@@ -86,8 +86,8 @@ describe("buildReplyPayloads media filter integration", () => {
expect(replyPayloads).toHaveLength(0);
});
it("suppresses same-target replies when message tool target provider is generic", () => {
const { replyPayloads } = buildReplyPayloads({
it("suppresses same-target replies when message tool target provider is generic", async () => {
const { replyPayloads } = await buildReplyPayloads({
...baseParams,
payloads: [{ text: "hello world!" }],
messageProvider: "heartbeat",
@@ -100,8 +100,8 @@ describe("buildReplyPayloads media filter integration", () => {
expect(replyPayloads).toHaveLength(0);
});
it("suppresses same-target replies when target provider is channel alias", () => {
const { replyPayloads } = buildReplyPayloads({
it("suppresses same-target replies when target provider is channel alias", async () => {
const { replyPayloads } = await buildReplyPayloads({
...baseParams,
payloads: [{ text: "hello world!" }],
messageProvider: "heartbeat",
@@ -114,8 +114,8 @@ describe("buildReplyPayloads media filter integration", () => {
expect(replyPayloads).toHaveLength(0);
});
it("does not suppress same-target replies when accountId differs", () => {
const { replyPayloads } = buildReplyPayloads({
it("does not suppress same-target replies when accountId differs", async () => {
const { replyPayloads } = await buildReplyPayloads({
...baseParams,
payloads: [{ text: "hello world!" }],
messageProvider: "heartbeat",

View File

@@ -20,7 +20,7 @@ import {
shouldSuppressMessagingToolReplies,
} from "./reply-payloads.js";
export function buildReplyPayloads(params: {
export async function buildReplyPayloads(params: {
payloads: ReplyPayload[];
isHeartbeat: boolean;
didLogHeartbeatStrip: boolean;
@@ -40,7 +40,8 @@ export function buildReplyPayloads(params: {
originatingChannel?: OriginatingChannelType;
originatingTo?: string;
accountId?: string;
}): { replyPayloads: ReplyPayload[]; didLogHeartbeatStrip: boolean } {
normalizeMediaPaths?: (payload: ReplyPayload) => Promise<ReplyPayload>;
}): Promise<{ replyPayloads: ReplyPayload[]; didLogHeartbeatStrip: boolean }> {
let didLogHeartbeatStrip = params.didLogHeartbeatStrip;
const sanitizedPayloads = params.isHeartbeat
? params.payloads
@@ -66,22 +67,24 @@ export function buildReplyPayloads(params: {
return [{ ...payload, text: stripped.text }];
});
const replyTaggedPayloads: ReplyPayload[] = applyReplyThreading({
payloads: sanitizedPayloads,
replyToMode: params.replyToMode,
replyToChannel: params.replyToChannel,
currentMessageId: params.currentMessageId,
})
.map(
(payload) =>
normalizeReplyPayloadDirectives({
const replyTaggedPayloads = (
await Promise.all(
applyReplyThreading({
payloads: sanitizedPayloads,
replyToMode: params.replyToMode,
replyToChannel: params.replyToChannel,
currentMessageId: params.currentMessageId,
}).map(async (payload) => {
const parsed = normalizeReplyPayloadDirectives({
payload,
currentMessageId: params.currentMessageId,
silentToken: SILENT_REPLY_TOKEN,
parseMode: "always",
}).payload,
}).payload;
return params.normalizeMediaPaths ? await params.normalizeMediaPaths(parsed) : parsed;
}),
)
.filter(isRenderablePayload);
).filter(isRenderablePayload);
// Drop final payloads only when block streaming succeeded end-to-end.
// If streaming aborted (e.g., timeout), fall back to final payloads.

View File

@@ -0,0 +1,130 @@
import path from "node:path";
import { beforeEach, describe, expect, it, vi } from "vitest";
import type { TemplateContext } from "../templating.js";
import type { FollowupRun, QueueSettings } from "./queue.js";
import { createMockTypingController } from "./test-helpers.js";
// Shared mock handles. The vi.mock factories below forward to these so each
// test can program the behaviour of the mocked modules.
const runEmbeddedPiAgentMock = vi.fn();
const runWithModelFallbackMock = vi.fn();
// Replace the model-fallback module entirely; runWithModelFallback delegates to
// the mock handle above.
vi.mock("../../agents/model-fallback.js", () => ({
runWithModelFallback: (params: {
provider: string;
model: string;
run: (provider: string, model: string) => Promise<unknown>;
}) => runWithModelFallbackMock(params),
}));
// Partial mock: keep the real pi-embedded exports, but stub message queueing
// (always returns false) and route runEmbeddedPiAgent to the mock handle.
vi.mock("../../agents/pi-embedded.js", async () => {
const actual = await vi.importActual<typeof import("../../agents/pi-embedded.js")>(
"../../agents/pi-embedded.js",
);
return {
...actual,
queueEmbeddedPiMessage: vi.fn().mockReturnValue(false),
runEmbeddedPiAgent: (params: unknown) => runEmbeddedPiAgentMock(params),
};
});
// Partial mock: keep the real queue exports, but replace follow-up enqueueing
// and drain scheduling with bare vi.fn() stubs.
vi.mock("./queue.js", async () => {
const actual = await vi.importActual<typeof import("./queue.js")>("./queue.js");
return {
...actual,
enqueueFollowupRun: vi.fn(),
scheduleFollowupDrain: vi.fn(),
};
});
import { runReplyAgent } from "./agent-runner.js";
// Drives runReplyAgent with mocked agent/fallback modules and checks the media
// paths on the final reply.
describe("runReplyAgent media path normalization", () => {
beforeEach(() => {
runEmbeddedPiAgentMock.mockReset();
runWithModelFallbackMock.mockReset();
// The fallback mock invokes the supplied `run` callback once with the
// requested provider/model and wraps its result together with both values.
runWithModelFallbackMock.mockImplementation(
async ({
provider,
model,
run,
}: {
provider: string;
model: string;
run: (...args: unknown[]) => Promise<unknown>;
}) => ({
result: await run(provider, model),
provider,
model,
}),
);
});
it("normalizes final MEDIA replies against the run workspace", async () => {
// The mocked agent answers with a single text payload that carries a
// relative MEDIA reference.
runEmbeddedPiAgentMock.mockResolvedValue({
payloads: [{ text: "MEDIA:./out/generated.png" }],
meta: {
agentMeta: {
sessionId: "session",
provider: "anthropic",
model: "claude",
},
},
});
const result = await runReplyAgent({
commandBody: "generate",
// Partial fixture, cast through `unknown`; workspaceDir is the directory the
// assertions below expect the media path to be resolved against.
followupRun: {
prompt: "generate",
enqueuedAt: Date.now(),
run: {
agentId: "main",
agentDir: "/tmp/agent",
sessionId: "session",
sessionKey: "main",
messageProvider: "telegram",
sessionFile: "/tmp/session.jsonl",
workspaceDir: "/tmp/workspace",
config: {},
provider: "anthropic",
model: "claude",
thinkLevel: "low",
verboseLevel: "off",
elevatedLevel: "off",
bashElevated: {
enabled: false,
allowed: false,
defaultLevel: "off",
},
timeoutMs: 1_000,
blockReplyBreak: "message_end",
},
} as unknown as FollowupRun,
queueKey: "main",
resolvedQueue: { mode: "interrupt" } as QueueSettings,
shouldSteer: false,
shouldFollowup: false,
isActive: false,
isStreaming: false,
typing: createMockTypingController(),
sessionCtx: {
Provider: "telegram",
Surface: "telegram",
To: "chat-1",
OriginatingTo: "chat-1",
AccountId: "default",
MessageSid: "msg-1",
} as unknown as TemplateContext,
defaultModel: "anthropic/claude",
resolvedVerboseLevel: "off",
isNewSession: false,
// Block streaming is disabled for this run.
blockStreamingEnabled: false,
resolvedBlockStreamingBreak: "message_end",
shouldInjectGroupIntro: false,
typingMode: "instant",
});
// Both the singular and plural media fields must hold the path joined onto
// /tmp/workspace.
expect(result).toMatchObject({
mediaUrl: path.join("/tmp/workspace", "out", "generated.png"),
mediaUrls: [path.join("/tmp/workspace", "out", "generated.png")],
});
});
});

View File

@@ -52,6 +52,7 @@ import { resolveOriginMessageProvider, resolveOriginMessageTo } from "./origin-r
import { readPostCompactionContext } from "./post-compaction-context.js";
import { resolveActiveRunQueueAction } from "./queue-policy.js";
import { enqueueFollowupRun, type FollowupRun, type QueueSettings } from "./queue.js";
import { createReplyMediaPathNormalizer } from "./reply-media-paths.js";
import { createReplyToModeFilterForChannel, resolveReplyToMode } from "./reply-threading.js";
import { incrementRunCompactionCount, persistRunSessionUsage } from "./session-run-accounting.js";
import { createTypingSignaler } from "./typing-mode.js";
@@ -154,6 +155,11 @@ export async function runReplyAgent(params: {
);
const applyReplyToMode = createReplyToModeFilterForChannel(replyToMode, replyToChannel);
const cfg = followupRun.run.config;
const normalizeReplyMediaPaths = createReplyMediaPathNormalizer({
cfg,
sessionKey,
workspaceDir: followupRun.run.workspaceDir,
});
const blockReplyCoalescing =
blockStreamingEnabled && opts?.onBlockReply
? resolveEffectiveBlockStreamingConfig({
@@ -475,7 +481,7 @@ export async function runReplyAgent(params: {
return finalizeWithFollowup(undefined, queueKey, runFollowupTurn);
}
const payloadResult = buildReplyPayloads({
const payloadResult = await buildReplyPayloads({
payloads: payloadArray,
isHeartbeat,
didLogHeartbeatStrip,
@@ -495,6 +501,7 @@ export async function runReplyAgent(params: {
to: sessionCtx.To,
}),
accountId: sessionCtx.AccountId,
normalizeMediaPaths: normalizeReplyMediaPaths,
});
const { replyPayloads } = payloadResult;
didLogHeartbeatStrip = payloadResult.didLogHeartbeatStrip;

View File

@@ -65,6 +65,7 @@ export function createBlockReplyDeliveryHandler(params: {
currentMessageId?: string;
normalizeStreamingText: (payload: ReplyPayload) => { text?: string; skip: boolean };
applyReplyToMode: (payload: ReplyPayload) => ReplyPayload;
normalizeMediaPaths?: (payload: ReplyPayload) => Promise<ReplyPayload>;
typingSignals: TypingSignaler;
blockStreamingEnabled: boolean;
blockReplyPipeline: BlockReplyPipeline | null;
@@ -101,7 +102,10 @@ export function createBlockReplyDeliveryHandler(params: {
parseMode: "auto",
});
const blockPayload = params.applyReplyToMode(normalized.payload);
const mediaNormalizedPayload = params.normalizeMediaPaths
? await params.normalizeMediaPaths(normalized.payload)
: normalized.payload;
const blockPayload = params.applyReplyToMode(mediaNormalizedPayload);
const blockHasMedia = hasRenderableMedia(blockPayload);
// Skip empty payloads unless they have audioAsVoice flag (need to track it).

View File

@@ -0,0 +1,57 @@
import path from "node:path";
import { beforeEach, describe, expect, it, vi } from "vitest";
// Created through vi.hoisted so the mock function already exists when the
// vi.mock factory below (which vitest hoists above the imports) runs.
const ensureSandboxWorkspaceForSession = vi.hoisted(() => vi.fn());
// Replace the sandbox module with one exposing only the mocked workspace lookup.
vi.mock("../../agents/sandbox.js", () => ({
ensureSandboxWorkspaceForSession,
}));
import { createReplyMediaPathNormalizer } from "./reply-media-paths.js";
// Unit tests for the normalizer returned by createReplyMediaPathNormalizer,
// with the sandbox workspace lookup mocked out.
describe("createReplyMediaPathNormalizer", () => {
beforeEach(() => {
// Default for every test: the sandbox lookup resolves to null.
ensureSandboxWorkspaceForSession.mockReset().mockResolvedValue(null);
});
it("resolves workspace-relative media against the agent workspace", async () => {
const normalize = createReplyMediaPathNormalizer({
cfg: {},
sessionKey: "session-key",
workspaceDir: "/tmp/agent-workspace",
});
const result = await normalize({
mediaUrls: ["./out/photo.png"],
});
// The relative path is joined onto workspaceDir; mediaUrl mirrors the first
// entry of mediaUrls.
expect(result).toMatchObject({
mediaUrl: path.join("/tmp/agent-workspace", "out", "photo.png"),
mediaUrls: [path.join("/tmp/agent-workspace", "out", "photo.png")],
});
});
it("maps sandbox-relative media back to the host sandbox workspace", async () => {
// The sandbox lookup now reports a workspace directory plus a container
// workdir of /workspace.
ensureSandboxWorkspaceForSession.mockResolvedValue({
workspaceDir: "/tmp/sandboxes/session-1",
containerWorkdir: "/workspace",
});
const normalize = createReplyMediaPathNormalizer({
cfg: {},
sessionKey: "session-key",
workspaceDir: "/tmp/agent-workspace",
});
// One relative path and one file:// URL that points under /workspace.
const result = await normalize({
mediaUrls: ["./out/photo.png", "file:///workspace/screens/final.png"],
});
// Both entries are expected under the sandbox workspaceDir, not under the
// agent workspace passed to the factory.
expect(result).toMatchObject({
mediaUrl: path.join("/tmp/sandboxes/session-1", "out", "photo.png"),
mediaUrls: [
path.join("/tmp/sandboxes/session-1", "out", "photo.png"),
path.join("/tmp/sandboxes/session-1", "screens", "final.png"),
],
});
});
});

View File

@@ -0,0 +1,105 @@
import { resolvePathFromInput } from "../../agents/path-policy.js";
import { assertMediaNotDataUrl, resolveSandboxedMediaSource } from "../../agents/sandbox-paths.js";
import { ensureSandboxWorkspaceForSession } from "../../agents/sandbox.js";
import type { OpenClawConfig } from "../../config/config.js";
import type { ReplyPayload } from "../types.js";
/** Matches sources that start with `http://` or `https://` (case-insensitive). */
const HTTP_URL_RE = /^https?:\/\//i;
/** Matches sources that start with `file://` (case-insensitive). */
const FILE_URL_RE = /^file:\/\//i;
/** Matches a drive letter followed by `:` and a slash or backslash, e.g. `C:\`. */
const WINDOWS_DRIVE_RE = /^[a-zA-Z]:[\\/]/;
/** Matches a leading `scheme:` prefix. */
const SCHEME_RE = /^[a-zA-Z][a-zA-Z0-9+.-]*:/;
/** Matches a trailing `.ext` of one to ten word characters. */
const HAS_FILE_EXT_RE = /\.\w{1,10}$/;

/**
 * Heuristically decides whether a media reference looks like a local filesystem path.
 *
 * @param media - The media reference to classify.
 * @returns `true` for `file://` URLs, drive-letter paths, references starting with
 *   `/`, `./`, `../`, `~` or `\\`, and scheme-less references that contain a path
 *   separator or end in a file extension; `false` otherwise.
 */
function isLikelyLocalMediaSource(media: string): boolean {
  if (FILE_URL_RE.test(media) || WINDOWS_DRIVE_RE.test(media)) {
    return true;
  }

  const localPrefixes = ["/", "./", "../", "~", "\\\\"];
  if (localPrefixes.some((prefix) => media.startsWith(prefix))) {
    return true;
  }

  // Anything else that carries an explicit scheme is not treated as local.
  if (SCHEME_RE.test(media)) {
    return false;
  }

  const hasSeparator = media.includes("/") || media.includes("\\");
  return hasSeparator || HAS_FILE_EXT_RE.test(media);
}
/**
 * Collects the media references carried by a reply payload.
 *
 * @param payload - The reply payload to inspect.
 * @returns `payload.mediaUrls` itself when it is non-empty; otherwise a one-element
 *   list holding `payload.mediaUrl` when that is truthy; otherwise an empty list.
 */
function getPayloadMediaList(payload: ReplyPayload): string[] {
  const { mediaUrls, mediaUrl } = payload;
  if (mediaUrls?.length) {
    return mediaUrls;
  }
  return mediaUrl ? [mediaUrl] : [];
}
/**
 * Builds a normalizer that rewrites the media references on a reply payload.
 *
 * For each entry of the payload's media list the returned function:
 * - trims it and drops it when blank;
 * - passes it to `assertMediaNotDataUrl` (presumably rejects `data:` URLs — confirm
 *   against that helper);
 * - returns `http(s)://` URLs unchanged;
 * - when `ensureSandboxWorkspaceForSession` yields a workspace directory, delegates
 *   to `resolveSandboxedMediaSource` with that directory as `sandboxRoot`;
 * - otherwise leaves non-local-looking references and `file://` URLs unchanged and
 *   resolves everything else against `params.workspaceDir` via `resolvePathFromInput`.
 *
 * The sandbox lookup is performed lazily and memoized per normalizer. A failed lookup
 * is NOT memoized, so a transient failure does not make every later call fail; the
 * next call retries.
 *
 * @param params.cfg - Configuration forwarded to the sandbox lookup.
 * @param params.sessionKey - Session key forwarded to the sandbox lookup.
 * @param params.workspaceDir - Workspace used for the sandbox lookup and as the base
 *   directory for local paths when no sandbox workspace is reported.
 * @returns An async function mapping a payload to a payload. A payload without media
 *   is returned as the same object. Otherwise the result is a copy whose `mediaUrls`
 *   holds the normalized, de-duplicated entries in first-seen order and whose
 *   `mediaUrl` is the first of them; both are `undefined` when nothing remains.
 *   Errors thrown by the helpers propagate to the caller.
 */
export function createReplyMediaPathNormalizer(params: {
  cfg: OpenClawConfig;
  sessionKey?: string;
  workspaceDir: string;
}): (payload: ReplyPayload) => Promise<ReplyPayload> {
  let sandboxRootPromise: Promise<string | undefined> | undefined;

  // Performs the actual sandbox lookup and extracts its workspace directory.
  const loadSandboxRoot = async (): Promise<string | undefined> => {
    const sandbox = await ensureSandboxWorkspaceForSession({
      config: params.cfg,
      sessionKey: params.sessionKey,
      workspaceDir: params.workspaceDir,
    });
    return sandbox?.workspaceDir;
  };

  // Memoizes the lookup so concurrent and repeated calls share one request.
  const resolveSandboxRoot = (): Promise<string | undefined> => {
    if (sandboxRootPromise) {
      return sandboxRootPromise;
    }
    const pending = loadSandboxRoot();
    sandboxRootPromise = pending;
    // Forget a rejected lookup so the next call retries. Callers awaiting `pending`
    // still observe the original rejection; this handler only clears the cache.
    void pending.catch(() => {
      if (sandboxRootPromise === pending) {
        sandboxRootPromise = undefined;
      }
    });
    return pending;
  };

  // Normalizes a single media reference; returns "" for blank input.
  const normalizeMediaSource = async (raw: string): Promise<string> => {
    const media = raw.trim();
    if (!media) {
      return media;
    }
    assertMediaNotDataUrl(media);
    if (HTTP_URL_RE.test(media)) {
      return media;
    }

    const sandboxRoot = await resolveSandboxRoot();
    if (sandboxRoot) {
      return await resolveSandboxedMediaSource({
        media,
        sandboxRoot,
      });
    }

    // No sandbox: only rewrite references that look like plain local paths.
    if (!isLikelyLocalMediaSource(media)) {
      return media;
    }
    if (FILE_URL_RE.test(media)) {
      return media;
    }
    return resolvePathFromInput(media, params.workspaceDir);
  };

  return async (payload) => {
    const mediaList = getPayloadMediaList(payload);
    if (mediaList.length === 0) {
      return payload;
    }

    // Kept sequential so the first failing entry aborts before later ones are touched.
    const normalizedMedia: string[] = [];
    const seen = new Set<string>();
    for (const media of mediaList) {
      const normalized = await normalizeMediaSource(media);
      if (!normalized || seen.has(normalized)) {
        continue;
      }
      seen.add(normalized);
      normalizedMedia.push(normalized);
    }

    if (normalizedMedia.length === 0) {
      return {
        ...payload,
        mediaUrl: undefined,
        mediaUrls: undefined,
      };
    }
    return {
      ...payload,
      mediaUrl: normalizedMedia[0],
      mediaUrls: normalizedMedia,
    };
  };
}