fix: stabilize telegram draft boundary previews (#33842) (thanks @ngutman)

This commit is contained in:
Ayaan Zaidi
2026-03-04 08:55:13 +05:30
committed by Ayaan Zaidi
parent 5ce53095c5
commit 575bd77196
8 changed files with 463 additions and 73 deletions

View File

@@ -38,6 +38,7 @@ Docs: https://docs.openclaw.ai
- Discord/voice messages: request upload slots with JSON fetch calls so voice message uploads no longer fail with content-type errors. Thanks @thewilloftheshadow.
- Discord/voice decoder fallback: drop the native Opus dependency and use opusscript for voice decoding to avoid native-opus installs. Thanks @thewilloftheshadow.
- Discord/auto presence health signal: add runtime availability-driven presence updates plus connected-state reporting to improve health monitoring and operator visibility. (#33277) Thanks @thewilloftheshadow.
- Telegram/draft-stream boundary stability: materialize DM draft previews at assistant-message/tool boundaries, serialize lane-boundary callbacks before final delivery, and scope preview cleanup to the active preview so multi-step Telegram streams no longer lose, overwrite, or leave stale preview bubbles. (#33842) Thanks @ngutman.
- Telegram/DM draft finalization reliability: require verified final-text draft emission before treating preview finalization as delivered, and fall back to normal payload send when final draft delivery is not confirmed (preventing missing final responses and preserving media/button delivery). (#32118) Thanks @OpenCils.
- Telegram/draft preview boundary + silent-token reliability: stabilize answer-lane message boundaries across late-partial/message-start races, preserve/reset finalized preview state at the correct boundaries, and suppress `NO_REPLY` lead-fragment leaks without broad heartbeat-prefix false positives. (#33169) Thanks @obviyus.
- Discord/audit wildcard warnings: ignore "\*" wildcard keys when counting unresolved guild channels so doctor/status no longer warns on allow-all configs. (#33125) Thanks @thewilloftheshadow.

View File

@@ -422,6 +422,178 @@ describe("dispatchTelegramMessage draft streaming", () => {
},
);
it("materializes boundary preview and keeps it when no matching final arrives", async () => {
const answerDraftStream = createDraftStream(999);
answerDraftStream.materialize.mockResolvedValue(4321);
const reasoningDraftStream = createDraftStream();
createTelegramDraftStream
.mockImplementationOnce(() => answerDraftStream)
.mockImplementationOnce(() => reasoningDraftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "Before tool boundary" });
await replyOptions?.onAssistantMessageStart?.();
return { queuedFinal: false };
});
const bot = createBot();
await dispatchWithContext({ context: createContext(), streamMode: "partial", bot });
expect(answerDraftStream.materialize).toHaveBeenCalledTimes(1);
expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1);
expect(answerDraftStream.clear).toHaveBeenCalledTimes(1);
const deleteMessageCalls = (
bot.api as unknown as { deleteMessage: { mock: { calls: unknown[][] } } }
).deleteMessage.mock.calls;
expect(deleteMessageCalls).not.toContainEqual([123, 4321]);
});
it("waits for queued boundary rotation before final lane delivery", async () => {
const answerDraftStream = createSequencedDraftStream(1001);
let resolveMaterialize: ((value: number | undefined) => void) | undefined;
const materializePromise = new Promise<number | undefined>((resolve) => {
resolveMaterialize = resolve;
});
answerDraftStream.materialize.mockImplementation(() => materializePromise);
const reasoningDraftStream = createDraftStream();
createTelegramDraftStream
.mockImplementationOnce(() => answerDraftStream)
.mockImplementationOnce(() => reasoningDraftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "Message A partial" });
await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" });
const startPromise = replyOptions?.onAssistantMessageStart?.();
const finalPromise = dispatcherOptions.deliver(
{ text: "Message B final" },
{ kind: "final" },
);
resolveMaterialize?.(1001);
await startPromise;
await finalPromise;
return { queuedFinal: true };
},
);
deliverReplies.mockResolvedValue({ delivered: true });
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" });
await dispatchWithContext({ context: createContext(), streamMode: "partial" });
expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1);
expect(editMessageTelegram).toHaveBeenCalledTimes(2);
expect(editMessageTelegram).toHaveBeenNthCalledWith(
2,
123,
1002,
"Message B final",
expect.any(Object),
);
});
it("clears active preview even when an unrelated boundary archive exists", async () => {
const answerDraftStream = createDraftStream(999);
answerDraftStream.materialize.mockResolvedValue(4321);
answerDraftStream.forceNewMessage.mockImplementation(() => {
answerDraftStream.setMessageId(5555);
});
const reasoningDraftStream = createDraftStream();
createTelegramDraftStream
.mockImplementationOnce(() => answerDraftStream)
.mockImplementationOnce(() => reasoningDraftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "Before tool boundary" });
await replyOptions?.onAssistantMessageStart?.();
await replyOptions?.onPartialReply?.({ text: "Unfinalized next preview" });
return { queuedFinal: false };
});
const bot = createBot();
await dispatchWithContext({ context: createContext(), streamMode: "partial", bot });
expect(answerDraftStream.clear).toHaveBeenCalledTimes(1);
const deleteMessageCalls = (
bot.api as unknown as { deleteMessage: { mock: { calls: unknown[][] } } }
).deleteMessage.mock.calls;
expect(deleteMessageCalls).not.toContainEqual([123, 4321]);
});
it("queues late partials behind async boundary materialization", async () => {
const answerDraftStream = createDraftStream(999);
let resolveMaterialize: ((value: number | undefined) => void) | undefined;
const materializePromise = new Promise<number | undefined>((resolve) => {
resolveMaterialize = resolve;
});
answerDraftStream.materialize.mockImplementation(() => materializePromise);
const reasoningDraftStream = createDraftStream();
createTelegramDraftStream
.mockImplementationOnce(() => answerDraftStream)
.mockImplementationOnce(() => reasoningDraftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "Message A partial" });
// Simulate provider fire-and-forget ordering: boundary callback starts
// and a new partial arrives before boundary materialization resolves.
const startPromise = replyOptions?.onAssistantMessageStart?.();
const nextPartialPromise = replyOptions?.onPartialReply?.({ text: "Message B early" });
expect(answerDraftStream.update).toHaveBeenCalledTimes(1);
resolveMaterialize?.(4321);
await startPromise;
await nextPartialPromise;
return { queuedFinal: false };
});
await dispatchWithContext({ context: createContext(), streamMode: "partial" });
expect(answerDraftStream.materialize).toHaveBeenCalledTimes(1);
expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1);
expect(answerDraftStream.update).toHaveBeenCalledTimes(2);
expect(answerDraftStream.update).toHaveBeenNthCalledWith(2, "Message B early");
const boundaryRotationOrder = answerDraftStream.forceNewMessage.mock.invocationCallOrder[0];
const secondUpdateOrder = answerDraftStream.update.mock.invocationCallOrder[1];
expect(boundaryRotationOrder).toBeLessThan(secondUpdateOrder);
});
it("keeps final-only preview lane finalized until a real boundary rotation happens", async () => {
const answerDraftStream = createSequencedDraftStream(1001);
const reasoningDraftStream = createDraftStream();
createTelegramDraftStream
.mockImplementationOnce(() => answerDraftStream)
.mockImplementationOnce(() => reasoningDraftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
// Final-only first response (no streamed partials).
await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" });
// Simulate provider ordering bug: first chunk arrives before message-start callback.
await replyOptions?.onPartialReply?.({ text: "Message B early" });
await replyOptions?.onAssistantMessageStart?.();
await replyOptions?.onPartialReply?.({ text: "Message B partial" });
await dispatcherOptions.deliver({ text: "Message B final" }, { kind: "final" });
return { queuedFinal: true };
},
);
deliverReplies.mockResolvedValue({ delivered: true });
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" });
await dispatchWithContext({ context: createContext(), streamMode: "partial" });
expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1);
expect(editMessageTelegram).toHaveBeenNthCalledWith(
1,
123,
1001,
"Message A final",
expect.any(Object),
);
expect(editMessageTelegram).toHaveBeenNthCalledWith(
2,
123,
1002,
"Message B final",
expect.any(Object),
);
});
it("does not force new message on first assistant message start", async () => {
const draftStream = createDraftStream(999);
createTelegramDraftStream.mockReturnValue(draftStream);
@@ -829,6 +1001,32 @@ describe("dispatchTelegramMessage draft streaming", () => {
},
);
it("queues reasoning-end split decisions behind queued reasoning deltas", async () => {
const { reasoningDraftStream } = setupDraftStreams({
answerMessageId: 999,
reasoningMessageId: 111,
});
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
// Simulate fire-and-forget upstream ordering: reasoning_end arrives
// before the queued reasoning delta callback has finished.
const firstReasoningPromise = replyOptions?.onReasoningStream?.({
text: "Reasoning:\n_first block_",
});
await replyOptions?.onReasoningEnd?.();
await firstReasoningPromise;
await replyOptions?.onReasoningStream?.({ text: "Reasoning:\n_second block_" });
await dispatcherOptions.deliver({ text: "Done" }, { kind: "final" });
return { queuedFinal: true };
},
);
deliverReplies.mockResolvedValue({ delivered: true });
await dispatchWithContext({ context: createReasoningStreamContext(), streamMode: "partial" });
expect(reasoningDraftStream.forceNewMessage).toHaveBeenCalledTimes(1);
});
it("cleans superseded reasoning previews after lane rotation", async () => {
let reasoningDraftParams:
| {

View File

@@ -214,6 +214,7 @@ export const dispatchTelegramMessage = async ({
archivedAnswerPreviews.push({
messageId: preview.messageId,
textSnapshot: preview.textSnapshot,
deleteIfUnused: true,
});
}
: undefined,
@@ -239,7 +240,15 @@ export const dispatchTelegramMessage = async ({
const reasoningLane = lanes.reasoning;
let splitReasoningOnNextStream = false;
let skipNextAnswerMessageStartRotation = false;
let draftLaneEventQueue = Promise.resolve();
const reasoningStepState = createTelegramReasoningStepState();
const enqueueDraftLaneEvent = (task: () => Promise<void>): Promise<void> => {
const next = draftLaneEventQueue.then(task);
draftLaneEventQueue = next.catch((err) => {
logVerbose(`telegram: draft lane callback failed: ${String(err)}`);
});
return draftLaneEventQueue;
};
type SplitLaneSegment = { lane: LaneName; text: string };
type SplitLaneSegmentsResult = {
segments: SplitLaneSegment[];
@@ -265,17 +274,18 @@ export const dispatchTelegramMessage = async ({
lane.lastPartialText = "";
lane.hasStreamedMessage = false;
};
const rotateAnswerLaneForNewAssistantMessage = () => {
const rotateAnswerLaneForNewAssistantMessage = async () => {
let didForceNewMessage = false;
if (answerLane.hasStreamedMessage) {
const previewMessageId = answerLane.stream?.messageId();
// Only archive previews that still need a matching final text update.
// Once a preview has already been finalized, archiving it here causes
// cleanup to delete a user-visible final message on later media-only turns.
// Materialize the current streamed draft into a permanent message
// so it remains visible across tool boundaries.
const materializedId = await answerLane.stream?.materialize?.();
const previewMessageId = materializedId ?? answerLane.stream?.messageId();
if (typeof previewMessageId === "number" && !finalizedPreviewByLane.answer) {
archivedAnswerPreviews.push({
messageId: previewMessageId,
textSnapshot: answerLane.lastPartialText,
deleteIfUnused: false,
});
}
answerLane.stream?.forceNewMessage();
@@ -311,14 +321,14 @@ export const dispatchTelegramMessage = async ({
lane.lastPartialText = text;
laneStream.update(text);
};
const ingestDraftLaneSegments = (text: string | undefined) => {
const ingestDraftLaneSegments = async (text: string | undefined) => {
const split = splitTextIntoLaneSegments(text);
const hasAnswerSegment = split.segments.some((segment) => segment.lane === "answer");
if (hasAnswerSegment && finalizedPreviewByLane.answer) {
// Some providers can emit the first partial of a new assistant message before
// onAssistantMessageStart() arrives. Rotate preemptively so we do not edit
// the previously finalized preview message with the next message's text.
skipNextAnswerMessageStartRotation = rotateAnswerLaneForNewAssistantMessage();
skipNextAnswerMessageStartRotation = await rotateAnswerLaneForNewAssistantMessage();
}
for (const segment of split.segments) {
if (segment.lane === "reasoning") {
@@ -501,6 +511,11 @@ export const dispatchTelegramMessage = async ({
...prefixOptions,
typingCallbacks,
deliver: async (payload, info) => {
if (info.kind === "final") {
// Assistant callbacks are fire-and-forget; ensure queued boundary
// rotations/partials are applied before final delivery mapping.
await enqueueDraftLaneEvent(async () => {});
}
const previewButtons = (
payload.channelData?.telegram as { buttons?: TelegramInlineButtons } | undefined
)?.buttons;
@@ -610,42 +625,48 @@ export const dispatchTelegramMessage = async ({
disableBlockStreaming,
onPartialReply:
answerLane.stream || reasoningLane.stream
? (payload) => ingestDraftLaneSegments(payload.text)
? (payload) =>
enqueueDraftLaneEvent(async () => {
await ingestDraftLaneSegments(payload.text);
})
: undefined,
onReasoningStream: reasoningLane.stream
? (payload) => {
// Split between reasoning blocks only when the next reasoning
// stream starts. Splitting at reasoning-end can orphan the active
// preview and cause duplicate reasoning sends on reasoning final.
if (splitReasoningOnNextStream) {
reasoningLane.stream?.forceNewMessage();
resetDraftLaneState(reasoningLane);
splitReasoningOnNextStream = false;
}
ingestDraftLaneSegments(payload.text);
}
? (payload) =>
enqueueDraftLaneEvent(async () => {
// Split between reasoning blocks only when the next reasoning
// stream starts. Splitting at reasoning-end can orphan the active
// preview and cause duplicate reasoning sends on reasoning final.
if (splitReasoningOnNextStream) {
reasoningLane.stream?.forceNewMessage();
resetDraftLaneState(reasoningLane);
splitReasoningOnNextStream = false;
}
await ingestDraftLaneSegments(payload.text);
})
: undefined,
onAssistantMessageStart: answerLane.stream
? async () => {
reasoningStepState.resetForNextStep();
if (skipNextAnswerMessageStartRotation) {
skipNextAnswerMessageStartRotation = false;
? () =>
enqueueDraftLaneEvent(async () => {
reasoningStepState.resetForNextStep();
if (skipNextAnswerMessageStartRotation) {
skipNextAnswerMessageStartRotation = false;
finalizedPreviewByLane.answer = false;
return;
}
await rotateAnswerLaneForNewAssistantMessage();
// Message-start is an explicit assistant-message boundary.
// Even when no forceNewMessage happened (e.g. prior answer had no
// streamed partials), the next partial belongs to a fresh lifecycle
// and must not trigger late pre-rotation mid-message.
finalizedPreviewByLane.answer = false;
return;
}
rotateAnswerLaneForNewAssistantMessage();
// Message-start is an explicit assistant-message boundary.
// Even when no forceNewMessage happened (e.g. prior answer had no
// streamed partials), the next partial belongs to a fresh lifecycle
// and must not trigger late pre-rotation mid-message.
finalizedPreviewByLane.answer = false;
}
})
: undefined,
onReasoningEnd: reasoningLane.stream
? () => {
// Split when/if a later reasoning block begins.
splitReasoningOnNextStream = reasoningLane.hasStreamedMessage;
}
? () =>
enqueueDraftLaneEvent(async () => {
// Split when/if a later reasoning block begins.
splitReasoningOnNextStream = reasoningLane.hasStreamedMessage;
})
: undefined,
onToolStart: statusReactionController
? async (payload) => {
@@ -656,6 +677,9 @@ export const dispatchTelegramMessage = async ({
},
}));
} finally {
// Upstream assistant callbacks are fire-and-forget; drain queued lane work
// before stream cleanup so boundary rotations/materialization complete first.
await draftLaneEventQueue;
// Must stop() first to flush debounced content before clear() wipes state.
const streamCleanupStates = new Map<
NonNullable<DraftLaneState["stream"]>,
@@ -670,7 +694,17 @@ export const dispatchTelegramMessage = async ({
if (!stream) {
continue;
}
const shouldClear = !finalizedPreviewByLane[laneState.laneName];
// Don't clear (delete) the stream if: (a) it was finalized, or
// (b) the active stream message is itself a boundary-finalized archive.
const activePreviewMessageId = stream.messageId();
const hasBoundaryFinalizedActivePreview =
laneState.laneName === "answer" &&
typeof activePreviewMessageId === "number" &&
archivedAnswerPreviews.some(
(p) => p.deleteIfUnused === false && p.messageId === activePreviewMessageId,
);
const shouldClear =
!finalizedPreviewByLane[laneState.laneName] && !hasBoundaryFinalizedActivePreview;
const existing = streamCleanupStates.get(stream);
if (!existing) {
streamCleanupStates.set(stream, { shouldClear });
@@ -685,6 +719,9 @@ export const dispatchTelegramMessage = async ({
}
}
for (const archivedPreview of archivedAnswerPreviews) {
if (archivedPreview.deleteIfUnused === false) {
continue;
}
try {
await bot.api.deleteMessage(chatId, archivedPreview.messageId);
} catch (err) {

View File

@@ -11,6 +11,7 @@ export type TestDraftStream = {
lastDeliveredText: ReturnType<typeof vi.fn<() => string>>;
clear: ReturnType<typeof vi.fn<() => Promise<void>>>;
stop: ReturnType<typeof vi.fn<() => Promise<void>>>;
materialize: ReturnType<typeof vi.fn<() => Promise<number | undefined>>>;
forceNewMessage: ReturnType<typeof vi.fn<() => void>>;
setMessageId: (value: number | undefined) => void;
};
@@ -40,6 +41,7 @@ export function createTestDraftStream(params?: {
stop: vi.fn().mockImplementation(async () => {
await params?.onStop?.();
}),
materialize: vi.fn().mockImplementation(async () => messageId),
forceNewMessage: vi.fn().mockImplementation(() => {
if (params?.clearMessageIdOnForceNew) {
messageId = undefined;
@@ -71,6 +73,7 @@ export function createSequencedTestDraftStream(startMessageId = 1001): TestDraft
lastDeliveredText: vi.fn().mockImplementation(() => lastDeliveredText),
clear: vi.fn().mockResolvedValue(undefined),
stop: vi.fn().mockResolvedValue(undefined),
materialize: vi.fn().mockImplementation(async () => activeMessageId),
forceNewMessage: vi.fn().mockImplementation(() => {
activeMessageId = undefined;
}),

View File

@@ -218,6 +218,66 @@ describe("createTelegramDraftStream", () => {
);
});
it("materializes draft previews using rendered HTML text", async () => {
const api = createMockDraftApi();
const stream = createDraftStream(api, {
thread: { id: 42, scope: "dm" },
previewTransport: "draft",
renderText: (text) => ({
text: text.replace("**bold**", "<b>bold</b>"),
parseMode: "HTML",
}),
});
stream.update("**bold**");
await stream.flush();
await stream.materialize?.();
expect(api.sendMessage).toHaveBeenCalledWith(123, "<b>bold</b>", {
message_thread_id: 42,
parse_mode: "HTML",
});
});
it("retries materialize send without thread when dm thread lookup fails", async () => {
const api = createMockDraftApi();
api.sendMessage
.mockRejectedValueOnce(new Error("400: Bad Request: message thread not found"))
.mockResolvedValueOnce({ message_id: 55 });
const warn = vi.fn();
const stream = createDraftStream(api, {
thread: { id: 42, scope: "dm" },
previewTransport: "draft",
warn,
});
stream.update("Hello");
await stream.flush();
const materializedId = await stream.materialize?.();
expect(materializedId).toBe(55);
expect(api.sendMessage).toHaveBeenNthCalledWith(1, 123, "Hello", { message_thread_id: 42 });
expect(api.sendMessage).toHaveBeenNthCalledWith(2, 123, "Hello", undefined);
expect(warn).toHaveBeenCalledWith(
"telegram stream preview materialize send failed with message_thread_id, retrying without thread",
);
});
it("returns existing preview id when materializing message transport", async () => {
const api = createMockDraftApi();
const stream = createDraftStream(api, {
thread: { id: 42, scope: "dm" },
previewTransport: "message",
});
stream.update("Hello");
await stream.flush();
const materializedId = await stream.materialize?.();
expect(materializedId).toBe(17);
expect(api.sendMessage).toHaveBeenCalledTimes(1);
});
it("does not edit or delete messages after DM draft stream finalization", async () => {
const api = createMockDraftApi();
const stream = createThreadedDraftStream(api, { id: 42, scope: "dm" });

View File

@@ -62,6 +62,8 @@ export type TelegramDraftStream = {
lastDeliveredText?: () => string;
clear: () => Promise<void>;
stop: () => Promise<void>;
/** Convert the current draft preview into a permanent message (sendMessage). */
materialize?: () => Promise<number | undefined>;
/** Reset internal state so the next update creates a new message instead of editing. */
forceNewMessage: () => void;
};
@@ -137,6 +139,38 @@ export function createTelegramDraftStream(params: {
renderedParseMode: "HTML" | undefined;
sendGeneration: number;
};
const sendRenderedMessageWithThreadFallback = async (sendArgs: {
renderedText: string;
renderedParseMode: "HTML" | undefined;
fallbackWarnMessage: string;
}) => {
const sendParams = sendArgs.renderedParseMode
? {
...replyParams,
parse_mode: sendArgs.renderedParseMode,
}
: replyParams;
try {
return await params.api.sendMessage(chatId, sendArgs.renderedText, sendParams);
} catch (err) {
const hasThreadParam =
"message_thread_id" in (sendParams ?? {}) &&
typeof (sendParams as { message_thread_id?: unknown }).message_thread_id === "number";
if (!hasThreadParam || !THREAD_NOT_FOUND_RE.test(String(err))) {
throw err;
}
const threadlessParams = {
...(sendParams as Record<string, unknown>),
};
delete threadlessParams.message_thread_id;
params.warn?.(sendArgs.fallbackWarnMessage);
return await params.api.sendMessage(
chatId,
sendArgs.renderedText,
Object.keys(threadlessParams).length > 0 ? threadlessParams : undefined,
);
}
};
const sendMessageTransportPreview = async ({
renderedText,
renderedParseMode,
@@ -152,35 +186,12 @@ export function createTelegramDraftStream(params: {
}
return true;
}
const sendParams = renderedParseMode
? {
...replyParams,
parse_mode: renderedParseMode,
}
: replyParams;
let sent;
try {
sent = await params.api.sendMessage(chatId, renderedText, sendParams);
} catch (err) {
const hasThreadParam =
"message_thread_id" in (sendParams ?? {}) &&
typeof (sendParams as { message_thread_id?: unknown }).message_thread_id === "number";
if (!hasThreadParam || !THREAD_NOT_FOUND_RE.test(String(err))) {
throw err;
}
const threadlessParams = {
...(sendParams as Record<string, unknown>),
};
delete threadlessParams.message_thread_id;
params.warn?.(
const sent = await sendRenderedMessageWithThreadFallback({
renderedText,
renderedParseMode,
fallbackWarnMessage:
"telegram stream preview send failed with message_thread_id, retrying without thread",
);
sent = await params.api.sendMessage(
chatId,
renderedText,
Object.keys(threadlessParams).length > 0 ? threadlessParams : undefined,
);
}
});
const sentMessageId = sent?.message_id;
if (typeof sentMessageId !== "number" || !Number.isFinite(sentMessageId)) {
streamState.stopped = true;
@@ -324,6 +335,9 @@ export function createTelegramDraftStream(params: {
});
const forceNewMessage = () => {
// Boundary rotation may call stop() to finalize the previous draft.
// Re-open the stream lifecycle for the next assistant segment.
streamState.final = false;
generation += 1;
streamMessageId = undefined;
if (previewTransport === "draft") {
@@ -335,6 +349,45 @@ export function createTelegramDraftStream(params: {
loop.resetThrottleWindow();
};
/**
* Materialize the current draft into a permanent message.
* For draft transport: sends the accumulated text as a real sendMessage.
* For message transport: the message is already permanent (noop).
* Returns the permanent message id, or undefined if nothing to materialize.
*/
const materialize = async (): Promise<number | undefined> => {
await stop();
// If using message transport, the streamMessageId is already a real message.
if (previewTransport === "message" && typeof streamMessageId === "number") {
return streamMessageId;
}
// For draft transport, use the rendered snapshot first so parse_mode stays
// aligned with the text being materialized.
const renderedText = lastSentText || lastDeliveredText;
if (!renderedText) {
return undefined;
}
const renderedParseMode = lastSentText ? lastSentParseMode : undefined;
try {
const sent = await sendRenderedMessageWithThreadFallback({
renderedText,
renderedParseMode,
fallbackWarnMessage:
"telegram stream preview materialize send failed with message_thread_id, retrying without thread",
});
const sentId = sent?.message_id;
if (typeof sentId === "number" && Number.isFinite(sentId)) {
streamMessageId = Math.trunc(sentId);
return streamMessageId;
}
} catch (err) {
params.warn?.(
`telegram stream preview materialize failed: ${err instanceof Error ? err.message : String(err)}`,
);
}
return undefined;
};
params.log?.(`telegram stream preview ready (maxChars=${maxChars}, throttleMs=${throttleMs})`);
return {
@@ -346,6 +399,7 @@ export function createTelegramDraftStream(params: {
lastDeliveredText: () => lastDeliveredText,
clear,
stop,
materialize,
forceNewMessage,
};
}

View File

@@ -43,7 +43,11 @@ function createHarness(params?: {
const log = vi.fn();
const markDelivered = vi.fn();
const finalizedPreviewByLane: Record<LaneName, boolean> = { answer: false, reasoning: false };
const archivedAnswerPreviews: Array<{ messageId: number; textSnapshot: string }> = [];
const archivedAnswerPreviews: Array<{
messageId: number;
textSnapshot: string;
deleteIfUnused?: boolean;
}> = [];
const deliverLaneText = createLaneTextDeliverer({
lanes,
@@ -71,8 +75,10 @@ function createHarness(params?: {
flushDraftLane,
stopDraftLane,
editPreview,
deletePreviewMessage,
log,
markDelivered,
archivedAnswerPreviews,
};
}
@@ -306,4 +312,26 @@ describe("createLaneTextDeliverer", () => {
);
expect(harness.markDelivered).not.toHaveBeenCalled();
});
it("deletes consumed boundary previews after fallback final send", async () => {
const harness = createHarness();
harness.archivedAnswerPreviews.push({
messageId: 4444,
textSnapshot: "Boundary preview",
deleteIfUnused: false,
});
const result = await harness.deliverLaneText({
laneName: "answer",
text: "Final with media",
payload: { text: "Final with media", mediaUrl: "file:///tmp/example.png" },
infoKind: "final",
});
expect(result).toBe("sent");
expect(harness.sendPayload).toHaveBeenCalledWith(
expect.objectContaining({ text: "Final with media", mediaUrl: "file:///tmp/example.png" }),
);
expect(harness.deletePreviewMessage).toHaveBeenCalledWith(4444);
});
});

View File

@@ -13,6 +13,9 @@ export type DraftLaneState = {
export type ArchivedPreview = {
messageId: number;
textSnapshot: string;
// Boundary-finalized previews should remain visible even if no matching
// final edit arrives; superseded previews can be safely deleted.
deleteIfUnused?: boolean;
};
export type LaneDeliveryResult = "preview-finalized" | "preview-updated" | "sent" | "skipped";
@@ -303,14 +306,20 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
return "preview-finalized";
}
}
try {
await params.deletePreviewMessage(archivedPreview.messageId);
} catch (err) {
params.log(
`telegram: archived answer preview cleanup failed (${archivedPreview.messageId}): ${String(err)}`,
);
}
// Send the replacement message first, then clean up the old preview.
// This avoids the visual "disappear then reappear" flash.
const delivered = await params.sendPayload(params.applyTextToPayload(payload, text));
// Once this archived preview is consumed by a fallback final send, delete it
// regardless of deleteIfUnused. That flag only applies to unconsumed boundaries.
if (delivered || archivedPreview.deleteIfUnused !== false) {
try {
await params.deletePreviewMessage(archivedPreview.messageId);
} catch (err) {
params.log(
`telegram: archived answer preview cleanup failed (${archivedPreview.messageId}): ${String(err)}`,
);
}
}
return delivered ? "sent" : "skipped";
};