From 77ef6724683087548279bb7e0ebf6b47464260fe Mon Sep 17 00:00:00 2001 From: Ayaan Zaidi Date: Sat, 7 Mar 2026 10:09:10 +0530 Subject: [PATCH] fix: normalize reply media paths --- src/auto-reply/reply.block-streaming.test.ts | 2 +- .../reply/agent-runner-execution.ts | 7 + .../reply/agent-runner-payloads.test.ts | 36 ++--- src/auto-reply/reply/agent-runner-payloads.ts | 29 ++-- .../reply/agent-runner.media-paths.test.ts | 130 ++++++++++++++++++ src/auto-reply/reply/agent-runner.ts | 9 +- src/auto-reply/reply/reply-delivery.ts | 6 +- .../reply/reply-media-paths.test.ts | 57 ++++++++ src/auto-reply/reply/reply-media-paths.ts | 105 ++++++++++++++ 9 files changed, 347 insertions(+), 34 deletions(-) create mode 100644 src/auto-reply/reply/agent-runner.media-paths.test.ts create mode 100644 src/auto-reply/reply/reply-media-paths.test.ts create mode 100644 src/auto-reply/reply/reply-media-paths.ts diff --git a/src/auto-reply/reply.block-streaming.test.ts b/src/auto-reply/reply.block-streaming.test.ts index 0ac2574fc..456b8a40f 100644 --- a/src/auto-reply/reply.block-streaming.test.ts +++ b/src/auto-reply/reply.block-streaming.test.ts @@ -211,7 +211,7 @@ describe("block streaming", () => { expect(onBlockReply).toHaveBeenCalledTimes(1); expect(onBlockReply.mock.calls[0][0]).toMatchObject({ text: "Result", - mediaUrls: ["./image.png"], + mediaUrls: [path.join(home, "openclaw", "image.png")], }); }); }); diff --git a/src/auto-reply/reply/agent-runner-execution.ts b/src/auto-reply/reply/agent-runner-execution.ts index 524934ad4..6748e3cbe 100644 --- a/src/auto-reply/reply/agent-runner-execution.ts +++ b/src/auto-reply/reply/agent-runner-execution.ts @@ -45,6 +45,7 @@ import { import { type BlockReplyPipeline } from "./block-reply-pipeline.js"; import type { FollowupRun } from "./queue.js"; import { createBlockReplyDeliveryHandler } from "./reply-delivery.js"; +import { createReplyMediaPathNormalizer } from "./reply-media-paths.js"; import type { TypingSignaler } from "./typing-mode.js"; export type RuntimeFallbackAttempt = { @@ -106,6 +107,11 @@ export async function runAgentTurnWithFallback(params: { const directlySentBlockKeys = new Set(); const runId = params.opts?.runId ?? crypto.randomUUID(); + const normalizeReplyMediaPaths = createReplyMediaPathNormalizer({ + cfg: params.followupRun.run.config, + sessionKey: params.sessionKey, + workspaceDir: params.followupRun.run.workspaceDir, + }); let didNotifyAgentRunStart = false; const notifyAgentRunStart = () => { if (didNotifyAgentRunStart) { @@ -402,6 +408,7 @@ export async function runAgentTurnWithFallback(params: { params.sessionCtx.MessageSidFull ?? params.sessionCtx.MessageSid, normalizeStreamingText, applyReplyToMode: params.applyReplyToMode, + normalizeMediaPaths: normalizeReplyMediaPaths, typingSignals: params.typingSignals, blockStreamingEnabled: params.blockStreamingEnabled, blockReplyPipeline, diff --git a/src/auto-reply/reply/agent-runner-payloads.test.ts b/src/auto-reply/reply/agent-runner-payloads.test.ts index 138efd8e4..23b898ce3 100644 --- a/src/auto-reply/reply/agent-runner-payloads.test.ts +++ b/src/auto-reply/reply/agent-runner-payloads.test.ts @@ -10,8 +10,8 @@ const baseParams = { }; describe("buildReplyPayloads media filter integration", () => { - it("strips media URL from payload when in messagingToolSentMediaUrls", () => { - const { replyPayloads } = buildReplyPayloads({ + it("strips media URL from payload when in messagingToolSentMediaUrls", async () => { + const { replyPayloads } = await buildReplyPayloads({ ...baseParams, payloads: [{ text: "hello", mediaUrl: "file:///tmp/photo.jpg" }], messagingToolSentMediaUrls: ["file:///tmp/photo.jpg"], @@ -21,8 +21,8 @@ describe("buildReplyPayloads media filter integration", () => { expect(replyPayloads[0].mediaUrl).toBeUndefined(); }); - it("preserves media URL when not in messagingToolSentMediaUrls", () => { - const { replyPayloads } = buildReplyPayloads({ + it("preserves media URL when not in messagingToolSentMediaUrls", async () => { + const { replyPayloads } = await buildReplyPayloads({ ...baseParams, payloads: [{ text: "hello", mediaUrl: "file:///tmp/photo.jpg" }], messagingToolSentMediaUrls: ["file:///tmp/other.jpg"], @@ -32,8 +32,8 @@ describe("buildReplyPayloads media filter integration", () => { expect(replyPayloads[0].mediaUrl).toBe("file:///tmp/photo.jpg"); }); - it("applies media filter after text filter", () => { - const { replyPayloads } = buildReplyPayloads({ + it("applies media filter after text filter", async () => { + const { replyPayloads } = await buildReplyPayloads({ ...baseParams, payloads: [{ text: "hello world!", mediaUrl: "file:///tmp/photo.jpg" }], messagingToolSentTexts: ["hello world!"], @@ -44,8 +44,8 @@ describe("buildReplyPayloads media filter integration", () => { expect(replyPayloads).toHaveLength(0); }); - it("does not dedupe text for cross-target messaging sends", () => { - const { replyPayloads } = buildReplyPayloads({ + it("does not dedupe text for cross-target messaging sends", async () => { + const { replyPayloads } = await buildReplyPayloads({ ...baseParams, payloads: [{ text: "hello world!" }], messageProvider: "telegram", @@ -58,8 +58,8 @@ describe("buildReplyPayloads media filter integration", () => { expect(replyPayloads[0]?.text).toBe("hello world!"); }); - it("does not dedupe media for cross-target messaging sends", () => { - const { replyPayloads } = buildReplyPayloads({ + it("does not dedupe media for cross-target messaging sends", async () => { + const { replyPayloads } = await buildReplyPayloads({ ...baseParams, payloads: [{ text: "photo", mediaUrl: "file:///tmp/photo.jpg" }], messageProvider: "telegram", @@ -72,8 +72,8 @@ describe("buildReplyPayloads media filter integration", () => { expect(replyPayloads[0]?.mediaUrl).toBe("file:///tmp/photo.jpg"); }); - it("suppresses same-target replies when messageProvider is synthetic but originatingChannel is set", () => { - const { replyPayloads } = buildReplyPayloads({ + it("suppresses same-target replies when messageProvider is synthetic but originatingChannel is set", async () => { + const { replyPayloads } = await buildReplyPayloads({ ...baseParams, payloads: [{ text: "hello world!" }], messageProvider: "heartbeat", @@ -86,8 +86,8 @@ describe("buildReplyPayloads media filter integration", () => { expect(replyPayloads).toHaveLength(0); }); - it("suppresses same-target replies when message tool target provider is generic", () => { - const { replyPayloads } = buildReplyPayloads({ + it("suppresses same-target replies when message tool target provider is generic", async () => { + const { replyPayloads } = await buildReplyPayloads({ ...baseParams, payloads: [{ text: "hello world!" }], messageProvider: "heartbeat", @@ -100,8 +100,8 @@ describe("buildReplyPayloads media filter integration", () => { expect(replyPayloads).toHaveLength(0); }); - it("suppresses same-target replies when target provider is channel alias", () => { - const { replyPayloads } = buildReplyPayloads({ + it("suppresses same-target replies when target provider is channel alias", async () => { + const { replyPayloads } = await buildReplyPayloads({ ...baseParams, payloads: [{ text: "hello world!" }], messageProvider: "heartbeat", @@ -114,8 +114,8 @@ describe("buildReplyPayloads media filter integration", () => { expect(replyPayloads).toHaveLength(0); }); - it("does not suppress same-target replies when accountId differs", () => { - const { replyPayloads } = buildReplyPayloads({ + it("does not suppress same-target replies when accountId differs", async () => { + const { replyPayloads } = await buildReplyPayloads({ ...baseParams, payloads: [{ text: "hello world!" }], messageProvider: "heartbeat", diff --git a/src/auto-reply/reply/agent-runner-payloads.ts b/src/auto-reply/reply/agent-runner-payloads.ts index 38737171c..5bca8d19e 100644 --- a/src/auto-reply/reply/agent-runner-payloads.ts +++ b/src/auto-reply/reply/agent-runner-payloads.ts @@ -20,7 +20,7 @@ import { shouldSuppressMessagingToolReplies, } from "./reply-payloads.js"; -export function buildReplyPayloads(params: { +export async function buildReplyPayloads(params: { payloads: ReplyPayload[]; isHeartbeat: boolean; didLogHeartbeatStrip: boolean; @@ -40,7 +40,8 @@ export function buildReplyPayloads(params: { originatingChannel?: OriginatingChannelType; originatingTo?: string; accountId?: string; -}): { replyPayloads: ReplyPayload[]; didLogHeartbeatStrip: boolean } { + normalizeMediaPaths?: (payload: ReplyPayload) => Promise; +}): Promise<{ replyPayloads: ReplyPayload[]; didLogHeartbeatStrip: boolean }> { let didLogHeartbeatStrip = params.didLogHeartbeatStrip; const sanitizedPayloads = params.isHeartbeat ? params.payloads @@ -66,22 +67,24 @@ export function buildReplyPayloads(params: { return [{ ...payload, text: stripped.text }]; }); - const replyTaggedPayloads: ReplyPayload[] = applyReplyThreading({ - payloads: sanitizedPayloads, - replyToMode: params.replyToMode, - replyToChannel: params.replyToChannel, - currentMessageId: params.currentMessageId, - }) - .map( - (payload) => - normalizeReplyPayloadDirectives({ + const replyTaggedPayloads = ( + await Promise.all( + applyReplyThreading({ + payloads: sanitizedPayloads, + replyToMode: params.replyToMode, + replyToChannel: params.replyToChannel, + currentMessageId: params.currentMessageId, + }).map(async (payload) => { + const parsed = normalizeReplyPayloadDirectives({ payload, currentMessageId: params.currentMessageId, silentToken: SILENT_REPLY_TOKEN, parseMode: "always", - }).payload, + }).payload; + return params.normalizeMediaPaths ? await params.normalizeMediaPaths(parsed) : parsed; + }), ) - .filter(isRenderablePayload); + ).filter(isRenderablePayload); // Drop final payloads only when block streaming succeeded end-to-end. // If streaming aborted (e.g., timeout), fall back to final payloads. diff --git a/src/auto-reply/reply/agent-runner.media-paths.test.ts b/src/auto-reply/reply/agent-runner.media-paths.test.ts new file mode 100644 index 000000000..f5658287a --- /dev/null +++ b/src/auto-reply/reply/agent-runner.media-paths.test.ts @@ -0,0 +1,130 @@ +import path from "node:path"; +import { beforeEach, describe, expect, it, vi } from "vitest"; +import type { TemplateContext } from "../templating.js"; +import type { FollowupRun, QueueSettings } from "./queue.js"; +import { createMockTypingController } from "./test-helpers.js"; + +const runEmbeddedPiAgentMock = vi.fn(); +const runWithModelFallbackMock = vi.fn(); + +vi.mock("../../agents/model-fallback.js", () => ({ + runWithModelFallback: (params: { + provider: string; + model: string; + run: (provider: string, model: string) => Promise; + }) => runWithModelFallbackMock(params), +})); + +vi.mock("../../agents/pi-embedded.js", async () => { + const actual = await vi.importActual( + "../../agents/pi-embedded.js", + ); + return { + ...actual, + queueEmbeddedPiMessage: vi.fn().mockReturnValue(false), + runEmbeddedPiAgent: (params: unknown) => runEmbeddedPiAgentMock(params), + }; +}); + +vi.mock("./queue.js", async () => { + const actual = await vi.importActual("./queue.js"); + return { + ...actual, + enqueueFollowupRun: vi.fn(), + scheduleFollowupDrain: vi.fn(), + }; +}); + +import { runReplyAgent } from "./agent-runner.js"; + +describe("runReplyAgent media path normalization", () => { + beforeEach(() => { + runEmbeddedPiAgentMock.mockReset(); + runWithModelFallbackMock.mockReset(); + runWithModelFallbackMock.mockImplementation( + async ({ + provider, + model, + run, + }: { + provider: string; + model: string; + run: (...args: unknown[]) => Promise; + }) => ({ + result: await run(provider, model), + provider, + model, + }), + ); + }); + + it("normalizes final MEDIA replies against the run workspace", async () => { + runEmbeddedPiAgentMock.mockResolvedValue({ + payloads: [{ text: "MEDIA:./out/generated.png" }], + meta: { + agentMeta: { + sessionId: "session", + provider: "anthropic", + model: "claude", + }, + }, + }); + + const result = await runReplyAgent({ + commandBody: "generate", + followupRun: { + prompt: "generate", + enqueuedAt: Date.now(), + run: { + agentId: "main", + agentDir: "/tmp/agent", + sessionId: "session", + sessionKey: "main", + messageProvider: "telegram", + sessionFile: "/tmp/session.jsonl", + workspaceDir: "/tmp/workspace", + config: {}, + provider: "anthropic", + model: "claude", + thinkLevel: "low", + verboseLevel: "off", + elevatedLevel: "off", + bashElevated: { + enabled: false, + allowed: false, + defaultLevel: "off", + }, + timeoutMs: 1_000, + blockReplyBreak: "message_end", + }, + } as unknown as FollowupRun, + queueKey: "main", + resolvedQueue: { mode: "interrupt" } as QueueSettings, + shouldSteer: false, + shouldFollowup: false, + isActive: false, + isStreaming: false, + typing: createMockTypingController(), + sessionCtx: { + Provider: "telegram", + Surface: "telegram", + To: "chat-1", + OriginatingTo: "chat-1", + AccountId: "default", + MessageSid: "msg-1", + } as unknown as TemplateContext, + defaultModel: "anthropic/claude", + resolvedVerboseLevel: "off", + isNewSession: false, + blockStreamingEnabled: false, + resolvedBlockStreamingBreak: "message_end", + shouldInjectGroupIntro: false, + typingMode: "instant", + }); + + expect(result).toMatchObject({ + mediaUrl: path.join("/tmp/workspace", "out", "generated.png"), + mediaUrls: [path.join("/tmp/workspace", "out", "generated.png")], + }); + }); +}); diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index 8b126382d..b6dcd7dcd 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -52,6 +52,7 @@ import { resolveOriginMessageProvider, resolveOriginMessageTo } from "./origin-r import { readPostCompactionContext } from "./post-compaction-context.js"; import { resolveActiveRunQueueAction } from "./queue-policy.js"; import { enqueueFollowupRun, type FollowupRun, type QueueSettings } from "./queue.js"; +import { createReplyMediaPathNormalizer } from "./reply-media-paths.js"; import { createReplyToModeFilterForChannel, resolveReplyToMode } from "./reply-threading.js"; import { incrementRunCompactionCount, persistRunSessionUsage } from "./session-run-accounting.js"; import { createTypingSignaler } from "./typing-mode.js"; @@ -154,6 +155,11 @@ export async function runReplyAgent(params: { ); const applyReplyToMode = createReplyToModeFilterForChannel(replyToMode, replyToChannel); const cfg = followupRun.run.config; + const normalizeReplyMediaPaths = createReplyMediaPathNormalizer({ + cfg, + sessionKey, + workspaceDir: followupRun.run.workspaceDir, + }); const blockReplyCoalescing = blockStreamingEnabled && opts?.onBlockReply ? resolveEffectiveBlockStreamingConfig({ @@ -475,7 +481,7 @@ export async function runReplyAgent(params: { return finalizeWithFollowup(undefined, queueKey, runFollowupTurn); } - const payloadResult = buildReplyPayloads({ + const payloadResult = await buildReplyPayloads({ payloads: payloadArray, isHeartbeat, didLogHeartbeatStrip, @@ -495,6 +501,7 @@ export async function runReplyAgent(params: { to: sessionCtx.To, }), accountId: sessionCtx.AccountId, + normalizeMediaPaths: normalizeReplyMediaPaths, }); const { replyPayloads } = payloadResult; didLogHeartbeatStrip = payloadResult.didLogHeartbeatStrip; diff --git a/src/auto-reply/reply/reply-delivery.ts b/src/auto-reply/reply/reply-delivery.ts index 78930c708..acf04e73a 100644 --- a/src/auto-reply/reply/reply-delivery.ts +++ b/src/auto-reply/reply/reply-delivery.ts @@ -65,6 +65,7 @@ export function createBlockReplyDeliveryHandler(params: { currentMessageId?: string; normalizeStreamingText: (payload: ReplyPayload) => { text?: string; skip: boolean }; applyReplyToMode: (payload: ReplyPayload) => ReplyPayload; + normalizeMediaPaths?: (payload: ReplyPayload) => Promise; typingSignals: TypingSignaler; blockStreamingEnabled: boolean; blockReplyPipeline: BlockReplyPipeline | null; @@ -101,7 +102,10 @@ export function createBlockReplyDeliveryHandler(params: { parseMode: "auto", }); - const blockPayload = params.applyReplyToMode(normalized.payload); + const mediaNormalizedPayload = params.normalizeMediaPaths + ? await params.normalizeMediaPaths(normalized.payload) + : normalized.payload; + const blockPayload = params.applyReplyToMode(mediaNormalizedPayload); const blockHasMedia = hasRenderableMedia(blockPayload); // Skip empty payloads unless they have audioAsVoice flag (need to track it). diff --git a/src/auto-reply/reply/reply-media-paths.test.ts b/src/auto-reply/reply/reply-media-paths.test.ts new file mode 100644 index 000000000..01bb865b1 --- /dev/null +++ b/src/auto-reply/reply/reply-media-paths.test.ts @@ -0,0 +1,57 @@ +import path from "node:path"; +import { beforeEach, describe, expect, it, vi } from "vitest"; + +const ensureSandboxWorkspaceForSession = vi.hoisted(() => vi.fn()); + +vi.mock("../../agents/sandbox.js", () => ({ + ensureSandboxWorkspaceForSession, +})); + +import { createReplyMediaPathNormalizer } from "./reply-media-paths.js"; + +describe("createReplyMediaPathNormalizer", () => { + beforeEach(() => { + ensureSandboxWorkspaceForSession.mockReset().mockResolvedValue(null); + }); + + it("resolves workspace-relative media against the agent workspace", async () => { + const normalize = createReplyMediaPathNormalizer({ + cfg: {}, + sessionKey: "session-key", + workspaceDir: "/tmp/agent-workspace", + }); + + const result = await normalize({ + mediaUrls: ["./out/photo.png"], + }); + + expect(result).toMatchObject({ + mediaUrl: path.join("/tmp/agent-workspace", "out", "photo.png"), + mediaUrls: [path.join("/tmp/agent-workspace", "out", "photo.png")], + }); + }); + + it("maps sandbox-relative media back to the host sandbox workspace", async () => { + ensureSandboxWorkspaceForSession.mockResolvedValue({ + workspaceDir: "/tmp/sandboxes/session-1", + containerWorkdir: "/workspace", + }); + const normalize = createReplyMediaPathNormalizer({ + cfg: {}, + sessionKey: "session-key", + workspaceDir: "/tmp/agent-workspace", + }); + + const result = await normalize({ + mediaUrls: ["./out/photo.png", "file:///workspace/screens/final.png"], + }); + + expect(result).toMatchObject({ + mediaUrl: path.join("/tmp/sandboxes/session-1", "out", "photo.png"), + mediaUrls: [ + path.join("/tmp/sandboxes/session-1", "out", "photo.png"), + path.join("/tmp/sandboxes/session-1", "screens", "final.png"), + ], + }); + }); +}); diff --git a/src/auto-reply/reply/reply-media-paths.ts b/src/auto-reply/reply/reply-media-paths.ts new file mode 100644 index 000000000..1c09316af --- /dev/null +++ b/src/auto-reply/reply/reply-media-paths.ts @@ -0,0 +1,105 @@ +import { resolvePathFromInput } from "../../agents/path-policy.js"; +import { assertMediaNotDataUrl, resolveSandboxedMediaSource } from "../../agents/sandbox-paths.js"; +import { ensureSandboxWorkspaceForSession } from "../../agents/sandbox.js"; +import type { OpenClawConfig } from "../../config/config.js"; +import type { ReplyPayload } from "../types.js"; + +const HTTP_URL_RE = /^https?:\/\//i; +const FILE_URL_RE = /^file:\/\//i; +const WINDOWS_DRIVE_RE = /^[a-zA-Z]:[\\/]/; +const SCHEME_RE = /^[a-zA-Z][a-zA-Z0-9+.-]*:/; +const HAS_FILE_EXT_RE = /\.\w{1,10}$/; + +function isLikelyLocalMediaSource(media: string): boolean { + return ( + FILE_URL_RE.test(media) || + media.startsWith("/") || + media.startsWith("./") || + media.startsWith("../") || + media.startsWith("~") || + WINDOWS_DRIVE_RE.test(media) || + media.startsWith("\\\\") || + (!SCHEME_RE.test(media) && + (media.includes("/") || media.includes("\\") || HAS_FILE_EXT_RE.test(media))) + ); +} + +function getPayloadMediaList(payload: ReplyPayload): string[] { + return payload.mediaUrls?.length ? payload.mediaUrls : payload.mediaUrl ? [payload.mediaUrl] : []; +} + +export function createReplyMediaPathNormalizer(params: { + cfg: OpenClawConfig; + sessionKey?: string; + workspaceDir: string; +}): (payload: ReplyPayload) => Promise { + let sandboxRootPromise: Promise | undefined; + + const resolveSandboxRoot = async (): Promise => { + if (!sandboxRootPromise) { + sandboxRootPromise = ensureSandboxWorkspaceForSession({ + config: params.cfg, + sessionKey: params.sessionKey, + workspaceDir: params.workspaceDir, + }).then((sandbox) => sandbox?.workspaceDir); + } + return await sandboxRootPromise; + }; + + const normalizeMediaSource = async (raw: string): Promise => { + const media = raw.trim(); + if (!media) { + return media; + } + assertMediaNotDataUrl(media); + if (HTTP_URL_RE.test(media)) { + return media; + } + const sandboxRoot = await resolveSandboxRoot(); + if (sandboxRoot) { + return await resolveSandboxedMediaSource({ + media, + sandboxRoot, + }); + } + if (!isLikelyLocalMediaSource(media)) { + return media; + } + if (FILE_URL_RE.test(media)) { + return media; + } + return resolvePathFromInput(media, params.workspaceDir); + }; + + return async (payload) => { + const mediaList = getPayloadMediaList(payload); + if (mediaList.length === 0) { + return payload; + } + + const normalizedMedia: string[] = []; + const seen = new Set(); + for (const media of mediaList) { + const normalized = await normalizeMediaSource(media); + if (!normalized || seen.has(normalized)) { + continue; + } + seen.add(normalized); + normalizedMedia.push(normalized); + } + + if (normalizedMedia.length === 0) { + return { + ...payload, + mediaUrl: undefined, + mediaUrls: undefined, + }; + } + + return { + ...payload, + mediaUrl: normalizedMedia[0], + mediaUrls: normalizedMedia, + }; + }; +}