diff --git a/CHANGELOG.md b/CHANGELOG.md index dcef30232..b65675bc5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,7 @@ Docs: https://docs.openclaw.ai - Discord/voice messages: request upload slots with JSON fetch calls so voice message uploads no longer fail with content-type errors. Thanks @thewilloftheshadow. - Discord/voice decoder fallback: drop the native Opus dependency and use opusscript for voice decoding to avoid native-opus installs. Thanks @thewilloftheshadow. - Discord/auto presence health signal: add runtime availability-driven presence updates plus connected-state reporting to improve health monitoring and operator visibility. (#33277) Thanks @thewilloftheshadow. +- Telegram/draft-stream boundary stability: materialize DM draft previews at assistant-message/tool boundaries, serialize lane-boundary callbacks before final delivery, and scope preview cleanup to the active preview so multi-step Telegram streams no longer lose, overwrite, or leave stale preview bubbles. (#33842) Thanks @ngutman. - Telegram/DM draft finalization reliability: require verified final-text draft emission before treating preview finalization as delivered, and fall back to normal payload send when final draft delivery is not confirmed (preventing missing final responses and preserving media/button delivery). (#32118) Thanks @OpenCils. - Telegram/draft preview boundary + silent-token reliability: stabilize answer-lane message boundaries across late-partial/message-start races, preserve/reset finalized preview state at the correct boundaries, and suppress `NO_REPLY` lead-fragment leaks without broad heartbeat-prefix false positives. (#33169) Thanks @obviyus. - Discord/audit wildcard warnings: ignore "\*" wildcard keys when counting unresolved guild channels so doctor/status no longer warns on allow-all configs. (#33125) Thanks @thewilloftheshadow. diff --git a/src/telegram/bot-message-dispatch.test.ts b/src/telegram/bot-message-dispatch.test.ts index 38dee0f01..b0411e65e 100644 --- a/src/telegram/bot-message-dispatch.test.ts +++ b/src/telegram/bot-message-dispatch.test.ts @@ -422,6 +422,178 @@ describe("dispatchTelegramMessage draft streaming", () => { }, ); + it("materializes boundary preview and keeps it when no matching final arrives", async () => { + const answerDraftStream = createDraftStream(999); + answerDraftStream.materialize.mockResolvedValue(4321); + const reasoningDraftStream = createDraftStream(); + createTelegramDraftStream + .mockImplementationOnce(() => answerDraftStream) + .mockImplementationOnce(() => reasoningDraftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ replyOptions }) => { + await replyOptions?.onPartialReply?.({ text: "Before tool boundary" }); + await replyOptions?.onAssistantMessageStart?.(); + return { queuedFinal: false }; + }); + + const bot = createBot(); + await dispatchWithContext({ context: createContext(), streamMode: "partial", bot }); + + expect(answerDraftStream.materialize).toHaveBeenCalledTimes(1); + expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1); + expect(answerDraftStream.clear).toHaveBeenCalledTimes(1); + const deleteMessageCalls = ( + bot.api as unknown as { deleteMessage: { mock: { calls: unknown[][] } } } + ).deleteMessage.mock.calls; + expect(deleteMessageCalls).not.toContainEqual([123, 4321]); + }); + + it("waits for queued boundary rotation before final lane delivery", async () => { + const answerDraftStream = createSequencedDraftStream(1001); + let resolveMaterialize: ((value: number | undefined) => void) | undefined; + const materializePromise = new Promise((resolve) => { + resolveMaterialize = resolve; + }); + answerDraftStream.materialize.mockImplementation(() => materializePromise); + const reasoningDraftStream = createDraftStream(); + createTelegramDraftStream + .mockImplementationOnce(() => answerDraftStream) + .mockImplementationOnce(() => reasoningDraftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onPartialReply?.({ text: "Message A partial" }); + await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" }); + const startPromise = replyOptions?.onAssistantMessageStart?.(); + const finalPromise = dispatcherOptions.deliver( + { text: "Message B final" }, + { kind: "final" }, + ); + resolveMaterialize?.(1001); + await startPromise; + await finalPromise; + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" }); + + await dispatchWithContext({ context: createContext(), streamMode: "partial" }); + + expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1); + expect(editMessageTelegram).toHaveBeenCalledTimes(2); + expect(editMessageTelegram).toHaveBeenNthCalledWith( + 2, + 123, + 1002, + "Message B final", + expect.any(Object), + ); + }); + + it("clears active preview even when an unrelated boundary archive exists", async () => { + const answerDraftStream = createDraftStream(999); + answerDraftStream.materialize.mockResolvedValue(4321); + answerDraftStream.forceNewMessage.mockImplementation(() => { + answerDraftStream.setMessageId(5555); + }); + const reasoningDraftStream = createDraftStream(); + createTelegramDraftStream + .mockImplementationOnce(() => answerDraftStream) + .mockImplementationOnce(() => reasoningDraftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ replyOptions }) => { + await replyOptions?.onPartialReply?.({ text: "Before tool boundary" }); + await replyOptions?.onAssistantMessageStart?.(); + await replyOptions?.onPartialReply?.({ text: "Unfinalized next preview" }); + return { queuedFinal: false }; + }); + + const bot = createBot(); + await dispatchWithContext({ context: createContext(), streamMode: "partial", bot }); + + expect(answerDraftStream.clear).toHaveBeenCalledTimes(1); + const deleteMessageCalls = ( + bot.api as unknown as { deleteMessage: { mock: { calls: unknown[][] } } } + ).deleteMessage.mock.calls; + expect(deleteMessageCalls).not.toContainEqual([123, 4321]); + }); + + it("queues late partials behind async boundary materialization", async () => { + const answerDraftStream = createDraftStream(999); + let resolveMaterialize: ((value: number | undefined) => void) | undefined; + const materializePromise = new Promise((resolve) => { + resolveMaterialize = resolve; + }); + answerDraftStream.materialize.mockImplementation(() => materializePromise); + const reasoningDraftStream = createDraftStream(); + createTelegramDraftStream + .mockImplementationOnce(() => answerDraftStream) + .mockImplementationOnce(() => reasoningDraftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ replyOptions }) => { + await replyOptions?.onPartialReply?.({ text: "Message A partial" }); + + // Simulate provider fire-and-forget ordering: boundary callback starts + // and a new partial arrives before boundary materialization resolves. + const startPromise = replyOptions?.onAssistantMessageStart?.(); + const nextPartialPromise = replyOptions?.onPartialReply?.({ text: "Message B early" }); + + expect(answerDraftStream.update).toHaveBeenCalledTimes(1); + resolveMaterialize?.(4321); + + await startPromise; + await nextPartialPromise; + return { queuedFinal: false }; + }); + + await dispatchWithContext({ context: createContext(), streamMode: "partial" }); + + expect(answerDraftStream.materialize).toHaveBeenCalledTimes(1); + expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1); + expect(answerDraftStream.update).toHaveBeenCalledTimes(2); + expect(answerDraftStream.update).toHaveBeenNthCalledWith(2, "Message B early"); + const boundaryRotationOrder = answerDraftStream.forceNewMessage.mock.invocationCallOrder[0]; + const secondUpdateOrder = answerDraftStream.update.mock.invocationCallOrder[1]; + expect(boundaryRotationOrder).toBeLessThan(secondUpdateOrder); + }); + + it("keeps final-only preview lane finalized until a real boundary rotation happens", async () => { + const answerDraftStream = createSequencedDraftStream(1001); + const reasoningDraftStream = createDraftStream(); + createTelegramDraftStream + .mockImplementationOnce(() => answerDraftStream) + .mockImplementationOnce(() => reasoningDraftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + // Final-only first response (no streamed partials). + await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" }); + // Simulate provider ordering bug: first chunk arrives before message-start callback. + await replyOptions?.onPartialReply?.({ text: "Message B early" }); + await replyOptions?.onAssistantMessageStart?.(); + await replyOptions?.onPartialReply?.({ text: "Message B partial" }); + await dispatcherOptions.deliver({ text: "Message B final" }, { kind: "final" }); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" }); + + await dispatchWithContext({ context: createContext(), streamMode: "partial" }); + + expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1); + expect(editMessageTelegram).toHaveBeenNthCalledWith( + 1, + 123, + 1001, + "Message A final", + expect.any(Object), + ); + expect(editMessageTelegram).toHaveBeenNthCalledWith( + 2, + 123, + 1002, + "Message B final", + expect.any(Object), + ); + }); + it("does not force new message on first assistant message start", async () => { const draftStream = createDraftStream(999); createTelegramDraftStream.mockReturnValue(draftStream); @@ -829,6 +1001,32 @@ describe("dispatchTelegramMessage draft streaming", () => { }, ); + it("queues reasoning-end split decisions behind queued reasoning deltas", async () => { + const { reasoningDraftStream } = setupDraftStreams({ + answerMessageId: 999, + reasoningMessageId: 111, + }); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + // Simulate fire-and-forget upstream ordering: reasoning_end arrives + // before the queued reasoning delta callback has finished. + const firstReasoningPromise = replyOptions?.onReasoningStream?.({ + text: "Reasoning:\n_first block_", + }); + await replyOptions?.onReasoningEnd?.(); + await firstReasoningPromise; + await replyOptions?.onReasoningStream?.({ text: "Reasoning:\n_second block_" }); + await dispatcherOptions.deliver({ text: "Done" }, { kind: "final" }); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + + await dispatchWithContext({ context: createReasoningStreamContext(), streamMode: "partial" }); + + expect(reasoningDraftStream.forceNewMessage).toHaveBeenCalledTimes(1); + }); + it("cleans superseded reasoning previews after lane rotation", async () => { let reasoningDraftParams: | { diff --git a/src/telegram/bot-message-dispatch.ts b/src/telegram/bot-message-dispatch.ts index c72ed3f59..0433fed9f 100644 --- a/src/telegram/bot-message-dispatch.ts +++ b/src/telegram/bot-message-dispatch.ts @@ -214,6 +214,7 @@ export const dispatchTelegramMessage = async ({ archivedAnswerPreviews.push({ messageId: preview.messageId, textSnapshot: preview.textSnapshot, + deleteIfUnused: true, }); } : undefined, @@ -239,7 +240,15 @@ export const dispatchTelegramMessage = async ({ const reasoningLane = lanes.reasoning; let splitReasoningOnNextStream = false; let skipNextAnswerMessageStartRotation = false; + let draftLaneEventQueue = Promise.resolve(); const reasoningStepState = createTelegramReasoningStepState(); + const enqueueDraftLaneEvent = (task: () => Promise): Promise => { + const next = draftLaneEventQueue.then(task); + draftLaneEventQueue = next.catch((err) => { + logVerbose(`telegram: draft lane callback failed: ${String(err)}`); + }); + return draftLaneEventQueue; + }; type SplitLaneSegment = { lane: LaneName; text: string }; type SplitLaneSegmentsResult = { segments: SplitLaneSegment[]; @@ -265,17 +274,18 @@ export const dispatchTelegramMessage = async ({ lane.lastPartialText = ""; lane.hasStreamedMessage = false; }; - const rotateAnswerLaneForNewAssistantMessage = () => { + const rotateAnswerLaneForNewAssistantMessage = async () => { let didForceNewMessage = false; if (answerLane.hasStreamedMessage) { - const previewMessageId = answerLane.stream?.messageId(); - // Only archive previews that still need a matching final text update. - // Once a preview has already been finalized, archiving it here causes - // cleanup to delete a user-visible final message on later media-only turns. + // Materialize the current streamed draft into a permanent message + // so it remains visible across tool boundaries. + const materializedId = await answerLane.stream?.materialize?.(); + const previewMessageId = materializedId ?? answerLane.stream?.messageId(); if (typeof previewMessageId === "number" && !finalizedPreviewByLane.answer) { archivedAnswerPreviews.push({ messageId: previewMessageId, textSnapshot: answerLane.lastPartialText, + deleteIfUnused: false, }); } answerLane.stream?.forceNewMessage(); @@ -311,14 +321,14 @@ export const dispatchTelegramMessage = async ({ lane.lastPartialText = text; laneStream.update(text); }; - const ingestDraftLaneSegments = (text: string | undefined) => { + const ingestDraftLaneSegments = async (text: string | undefined) => { const split = splitTextIntoLaneSegments(text); const hasAnswerSegment = split.segments.some((segment) => segment.lane === "answer"); if (hasAnswerSegment && finalizedPreviewByLane.answer) { // Some providers can emit the first partial of a new assistant message before // onAssistantMessageStart() arrives. Rotate preemptively so we do not edit // the previously finalized preview message with the next message's text. - skipNextAnswerMessageStartRotation = rotateAnswerLaneForNewAssistantMessage(); + skipNextAnswerMessageStartRotation = await rotateAnswerLaneForNewAssistantMessage(); } for (const segment of split.segments) { if (segment.lane === "reasoning") { @@ -501,6 +511,11 @@ export const dispatchTelegramMessage = async ({ ...prefixOptions, typingCallbacks, deliver: async (payload, info) => { + if (info.kind === "final") { + // Assistant callbacks are fire-and-forget; ensure queued boundary + // rotations/partials are applied before final delivery mapping. + await enqueueDraftLaneEvent(async () => {}); + } const previewButtons = ( payload.channelData?.telegram as { buttons?: TelegramInlineButtons } | undefined )?.buttons; @@ -610,42 +625,48 @@ export const dispatchTelegramMessage = async ({ disableBlockStreaming, onPartialReply: answerLane.stream || reasoningLane.stream - ? (payload) => ingestDraftLaneSegments(payload.text) + ? (payload) => + enqueueDraftLaneEvent(async () => { + await ingestDraftLaneSegments(payload.text); + }) : undefined, onReasoningStream: reasoningLane.stream - ? (payload) => { - // Split between reasoning blocks only when the next reasoning - // stream starts. Splitting at reasoning-end can orphan the active - // preview and cause duplicate reasoning sends on reasoning final. - if (splitReasoningOnNextStream) { - reasoningLane.stream?.forceNewMessage(); - resetDraftLaneState(reasoningLane); - splitReasoningOnNextStream = false; - } - ingestDraftLaneSegments(payload.text); - } + ? (payload) => + enqueueDraftLaneEvent(async () => { + // Split between reasoning blocks only when the next reasoning + // stream starts. Splitting at reasoning-end can orphan the active + // preview and cause duplicate reasoning sends on reasoning final. + if (splitReasoningOnNextStream) { + reasoningLane.stream?.forceNewMessage(); + resetDraftLaneState(reasoningLane); + splitReasoningOnNextStream = false; + } + await ingestDraftLaneSegments(payload.text); + }) : undefined, onAssistantMessageStart: answerLane.stream - ? async () => { - reasoningStepState.resetForNextStep(); - if (skipNextAnswerMessageStartRotation) { - skipNextAnswerMessageStartRotation = false; + ? () => + enqueueDraftLaneEvent(async () => { + reasoningStepState.resetForNextStep(); + if (skipNextAnswerMessageStartRotation) { + skipNextAnswerMessageStartRotation = false; + finalizedPreviewByLane.answer = false; + return; + } + await rotateAnswerLaneForNewAssistantMessage(); + // Message-start is an explicit assistant-message boundary. + // Even when no forceNewMessage happened (e.g. prior answer had no + // streamed partials), the next partial belongs to a fresh lifecycle + // and must not trigger late pre-rotation mid-message. finalizedPreviewByLane.answer = false; - return; - } - rotateAnswerLaneForNewAssistantMessage(); - // Message-start is an explicit assistant-message boundary. - // Even when no forceNewMessage happened (e.g. prior answer had no - // streamed partials), the next partial belongs to a fresh lifecycle - // and must not trigger late pre-rotation mid-message. - finalizedPreviewByLane.answer = false; - } + }) : undefined, onReasoningEnd: reasoningLane.stream - ? () => { - // Split when/if a later reasoning block begins. - splitReasoningOnNextStream = reasoningLane.hasStreamedMessage; - } + ? () => + enqueueDraftLaneEvent(async () => { + // Split when/if a later reasoning block begins. + splitReasoningOnNextStream = reasoningLane.hasStreamedMessage; + }) : undefined, onToolStart: statusReactionController ? async (payload) => { @@ -656,6 +677,9 @@ export const dispatchTelegramMessage = async ({ }, })); } finally { + // Upstream assistant callbacks are fire-and-forget; drain queued lane work + // before stream cleanup so boundary rotations/materialization complete first. + await draftLaneEventQueue; // Must stop() first to flush debounced content before clear() wipes state. const streamCleanupStates = new Map< NonNullable, @@ -670,7 +694,17 @@ export const dispatchTelegramMessage = async ({ if (!stream) { continue; } - const shouldClear = !finalizedPreviewByLane[laneState.laneName]; + // Don't clear (delete) the stream if: (a) it was finalized, or + // (b) the active stream message is itself a boundary-finalized archive. + const activePreviewMessageId = stream.messageId(); + const hasBoundaryFinalizedActivePreview = + laneState.laneName === "answer" && + typeof activePreviewMessageId === "number" && + archivedAnswerPreviews.some( + (p) => p.deleteIfUnused === false && p.messageId === activePreviewMessageId, + ); + const shouldClear = + !finalizedPreviewByLane[laneState.laneName] && !hasBoundaryFinalizedActivePreview; const existing = streamCleanupStates.get(stream); if (!existing) { streamCleanupStates.set(stream, { shouldClear }); @@ -685,6 +719,9 @@ export const dispatchTelegramMessage = async ({ } } for (const archivedPreview of archivedAnswerPreviews) { + if (archivedPreview.deleteIfUnused === false) { + continue; + } try { await bot.api.deleteMessage(chatId, archivedPreview.messageId); } catch (err) { diff --git a/src/telegram/draft-stream.test-helpers.ts b/src/telegram/draft-stream.test-helpers.ts index 120204ecb..0a6073309 100644 --- a/src/telegram/draft-stream.test-helpers.ts +++ b/src/telegram/draft-stream.test-helpers.ts @@ -11,6 +11,7 @@ export type TestDraftStream = { lastDeliveredText: ReturnType string>>; clear: ReturnType Promise>>; stop: ReturnType Promise>>; + materialize: ReturnType Promise>>; forceNewMessage: ReturnType void>>; setMessageId: (value: number | undefined) => void; }; @@ -40,6 +41,7 @@ export function createTestDraftStream(params?: { stop: vi.fn().mockImplementation(async () => { await params?.onStop?.(); }), + materialize: vi.fn().mockImplementation(async () => messageId), forceNewMessage: vi.fn().mockImplementation(() => { if (params?.clearMessageIdOnForceNew) { messageId = undefined; @@ -71,6 +73,7 @@ export function createSequencedTestDraftStream(startMessageId = 1001): TestDraft lastDeliveredText: vi.fn().mockImplementation(() => lastDeliveredText), clear: vi.fn().mockResolvedValue(undefined), stop: vi.fn().mockResolvedValue(undefined), + materialize: vi.fn().mockImplementation(async () => activeMessageId), forceNewMessage: vi.fn().mockImplementation(() => { activeMessageId = undefined; }), diff --git a/src/telegram/draft-stream.test.ts b/src/telegram/draft-stream.test.ts index 594b5df96..07de41344 100644 --- a/src/telegram/draft-stream.test.ts +++ b/src/telegram/draft-stream.test.ts @@ -218,6 +218,66 @@ describe("createTelegramDraftStream", () => { ); }); + it("materializes draft previews using rendered HTML text", async () => { + const api = createMockDraftApi(); + const stream = createDraftStream(api, { + thread: { id: 42, scope: "dm" }, + previewTransport: "draft", + renderText: (text) => ({ + text: text.replace("**bold**", "bold"), + parseMode: "HTML", + }), + }); + + stream.update("**bold**"); + await stream.flush(); + await stream.materialize?.(); + + expect(api.sendMessage).toHaveBeenCalledWith(123, "bold", { + message_thread_id: 42, + parse_mode: "HTML", + }); + }); + + it("retries materialize send without thread when dm thread lookup fails", async () => { + const api = createMockDraftApi(); + api.sendMessage + .mockRejectedValueOnce(new Error("400: Bad Request: message thread not found")) + .mockResolvedValueOnce({ message_id: 55 }); + const warn = vi.fn(); + const stream = createDraftStream(api, { + thread: { id: 42, scope: "dm" }, + previewTransport: "draft", + warn, + }); + + stream.update("Hello"); + await stream.flush(); + const materializedId = await stream.materialize?.(); + + expect(materializedId).toBe(55); + expect(api.sendMessage).toHaveBeenNthCalledWith(1, 123, "Hello", { message_thread_id: 42 }); + expect(api.sendMessage).toHaveBeenNthCalledWith(2, 123, "Hello", undefined); + expect(warn).toHaveBeenCalledWith( + "telegram stream preview materialize send failed with message_thread_id, retrying without thread", + ); + }); + + it("returns existing preview id when materializing message transport", async () => { + const api = createMockDraftApi(); + const stream = createDraftStream(api, { + thread: { id: 42, scope: "dm" }, + previewTransport: "message", + }); + + stream.update("Hello"); + await stream.flush(); + const materializedId = await stream.materialize?.(); + + expect(materializedId).toBe(17); + expect(api.sendMessage).toHaveBeenCalledTimes(1); + }); + it("does not edit or delete messages after DM draft stream finalization", async () => { const api = createMockDraftApi(); const stream = createThreadedDraftStream(api, { id: 42, scope: "dm" }); diff --git a/src/telegram/draft-stream.ts b/src/telegram/draft-stream.ts index 1a578ad46..cb64ba80a 100644 --- a/src/telegram/draft-stream.ts +++ b/src/telegram/draft-stream.ts @@ -62,6 +62,8 @@ export type TelegramDraftStream = { lastDeliveredText?: () => string; clear: () => Promise; stop: () => Promise; + /** Convert the current draft preview into a permanent message (sendMessage). */ + materialize?: () => Promise; /** Reset internal state so the next update creates a new message instead of editing. */ forceNewMessage: () => void; }; @@ -137,6 +139,38 @@ export function createTelegramDraftStream(params: { renderedParseMode: "HTML" | undefined; sendGeneration: number; }; + const sendRenderedMessageWithThreadFallback = async (sendArgs: { + renderedText: string; + renderedParseMode: "HTML" | undefined; + fallbackWarnMessage: string; + }) => { + const sendParams = sendArgs.renderedParseMode + ? { + ...replyParams, + parse_mode: sendArgs.renderedParseMode, + } + : replyParams; + try { + return await params.api.sendMessage(chatId, sendArgs.renderedText, sendParams); + } catch (err) { + const hasThreadParam = + "message_thread_id" in (sendParams ?? {}) && + typeof (sendParams as { message_thread_id?: unknown }).message_thread_id === "number"; + if (!hasThreadParam || !THREAD_NOT_FOUND_RE.test(String(err))) { + throw err; + } + const threadlessParams = { + ...(sendParams as Record), + }; + delete threadlessParams.message_thread_id; + params.warn?.(sendArgs.fallbackWarnMessage); + return await params.api.sendMessage( + chatId, + sendArgs.renderedText, + Object.keys(threadlessParams).length > 0 ? threadlessParams : undefined, + ); + } + }; const sendMessageTransportPreview = async ({ renderedText, renderedParseMode, @@ -152,35 +186,12 @@ export function createTelegramDraftStream(params: { } return true; } - const sendParams = renderedParseMode - ? { - ...replyParams, - parse_mode: renderedParseMode, - } - : replyParams; - let sent; - try { - sent = await params.api.sendMessage(chatId, renderedText, sendParams); - } catch (err) { - const hasThreadParam = - "message_thread_id" in (sendParams ?? {}) && - typeof (sendParams as { message_thread_id?: unknown }).message_thread_id === "number"; - if (!hasThreadParam || !THREAD_NOT_FOUND_RE.test(String(err))) { - throw err; - } - const threadlessParams = { - ...(sendParams as Record), - }; - delete threadlessParams.message_thread_id; - params.warn?.( + const sent = await sendRenderedMessageWithThreadFallback({ + renderedText, + renderedParseMode, + fallbackWarnMessage: "telegram stream preview send failed with message_thread_id, retrying without thread", - ); - sent = await params.api.sendMessage( - chatId, - renderedText, - Object.keys(threadlessParams).length > 0 ? threadlessParams : undefined, - ); - } + }); const sentMessageId = sent?.message_id; if (typeof sentMessageId !== "number" || !Number.isFinite(sentMessageId)) { streamState.stopped = true; @@ -324,6 +335,9 @@ export function createTelegramDraftStream(params: { }); const forceNewMessage = () => { + // Boundary rotation may call stop() to finalize the previous draft. + // Re-open the stream lifecycle for the next assistant segment. + streamState.final = false; generation += 1; streamMessageId = undefined; if (previewTransport === "draft") { @@ -335,6 +349,45 @@ export function createTelegramDraftStream(params: { loop.resetThrottleWindow(); }; + /** + * Materialize the current draft into a permanent message. + * For draft transport: sends the accumulated text as a real sendMessage. + * For message transport: the message is already permanent (noop). + * Returns the permanent message id, or undefined if nothing to materialize. + */ + const materialize = async (): Promise => { + await stop(); + // If using message transport, the streamMessageId is already a real message. + if (previewTransport === "message" && typeof streamMessageId === "number") { + return streamMessageId; + } + // For draft transport, use the rendered snapshot first so parse_mode stays + // aligned with the text being materialized. + const renderedText = lastSentText || lastDeliveredText; + if (!renderedText) { + return undefined; + } + const renderedParseMode = lastSentText ? lastSentParseMode : undefined; + try { + const sent = await sendRenderedMessageWithThreadFallback({ + renderedText, + renderedParseMode, + fallbackWarnMessage: + "telegram stream preview materialize send failed with message_thread_id, retrying without thread", + }); + const sentId = sent?.message_id; + if (typeof sentId === "number" && Number.isFinite(sentId)) { + streamMessageId = Math.trunc(sentId); + return streamMessageId; + } + } catch (err) { + params.warn?.( + `telegram stream preview materialize failed: ${err instanceof Error ? err.message : String(err)}`, + ); + } + return undefined; + }; + params.log?.(`telegram stream preview ready (maxChars=${maxChars}, throttleMs=${throttleMs})`); return { @@ -346,6 +399,7 @@ export function createTelegramDraftStream(params: { lastDeliveredText: () => lastDeliveredText, clear, stop, + materialize, forceNewMessage, }; } diff --git a/src/telegram/lane-delivery.test.ts b/src/telegram/lane-delivery.test.ts index a0ab90308..15344f856 100644 --- a/src/telegram/lane-delivery.test.ts +++ b/src/telegram/lane-delivery.test.ts @@ -43,7 +43,11 @@ function createHarness(params?: { const log = vi.fn(); const markDelivered = vi.fn(); const finalizedPreviewByLane: Record = { answer: false, reasoning: false }; - const archivedAnswerPreviews: Array<{ messageId: number; textSnapshot: string }> = []; + const archivedAnswerPreviews: Array<{ + messageId: number; + textSnapshot: string; + deleteIfUnused?: boolean; + }> = []; const deliverLaneText = createLaneTextDeliverer({ lanes, @@ -71,8 +75,10 @@ function createHarness(params?: { flushDraftLane, stopDraftLane, editPreview, + deletePreviewMessage, log, markDelivered, + archivedAnswerPreviews, }; } @@ -306,4 +312,26 @@ describe("createLaneTextDeliverer", () => { ); expect(harness.markDelivered).not.toHaveBeenCalled(); }); + + it("deletes consumed boundary previews after fallback final send", async () => { + const harness = createHarness(); + harness.archivedAnswerPreviews.push({ + messageId: 4444, + textSnapshot: "Boundary preview", + deleteIfUnused: false, + }); + + const result = await harness.deliverLaneText({ + laneName: "answer", + text: "Final with media", + payload: { text: "Final with media", mediaUrl: "file:///tmp/example.png" }, + infoKind: "final", + }); + + expect(result).toBe("sent"); + expect(harness.sendPayload).toHaveBeenCalledWith( + expect.objectContaining({ text: "Final with media", mediaUrl: "file:///tmp/example.png" }), + ); + expect(harness.deletePreviewMessage).toHaveBeenCalledWith(4444); + }); }); diff --git a/src/telegram/lane-delivery.ts b/src/telegram/lane-delivery.ts index 7ae70fbe9..5196b4d99 100644 --- a/src/telegram/lane-delivery.ts +++ b/src/telegram/lane-delivery.ts @@ -13,6 +13,9 @@ export type DraftLaneState = { export type ArchivedPreview = { messageId: number; textSnapshot: string; + // Boundary-finalized previews should remain visible even if no matching + // final edit arrives; superseded previews can be safely deleted. + deleteIfUnused?: boolean; }; export type LaneDeliveryResult = "preview-finalized" | "preview-updated" | "sent" | "skipped"; @@ -303,14 +306,20 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) { return "preview-finalized"; } } - try { - await params.deletePreviewMessage(archivedPreview.messageId); - } catch (err) { - params.log( - `telegram: archived answer preview cleanup failed (${archivedPreview.messageId}): ${String(err)}`, - ); - } + // Send the replacement message first, then clean up the old preview. + // This avoids the visual "disappear then reappear" flash. const delivered = await params.sendPayload(params.applyTextToPayload(payload, text)); + // Once this archived preview is consumed by a fallback final send, delete it + // regardless of deleteIfUnused. That flag only applies to unconsumed boundaries. + if (delivered || archivedPreview.deleteIfUnused !== false) { + try { + await params.deletePreviewMessage(archivedPreview.messageId); + } catch (err) { + params.log( + `telegram: archived answer preview cleanup failed (${archivedPreview.messageId}): ${String(err)}`, + ); + } + } return delivered ? "sent" : "skipped"; };