import type { Bot } from "grammy"; import { beforeEach, describe, expect, it, vi } from "vitest"; const createTelegramDraftStream = vi.hoisted(() => vi.fn()); const dispatchReplyWithBufferedBlockDispatcher = vi.hoisted(() => vi.fn()); const deliverReplies = vi.hoisted(() => vi.fn()); const editMessageTelegram = vi.hoisted(() => vi.fn()); vi.mock("./draft-stream.js", () => ({ createTelegramDraftStream, })); vi.mock("../auto-reply/reply/provider-dispatcher.js", () => ({ dispatchReplyWithBufferedBlockDispatcher, })); vi.mock("./bot/delivery.js", () => ({ deliverReplies, })); vi.mock("./send.js", () => ({ editMessageTelegram, })); vi.mock("./sticker-cache.js", () => ({ cacheSticker: vi.fn(), describeStickerImage: vi.fn(), })); import { dispatchTelegramMessage } from "./bot-message-dispatch.js"; describe("dispatchTelegramMessage draft streaming", () => { beforeEach(() => { createTelegramDraftStream.mockReset(); dispatchReplyWithBufferedBlockDispatcher.mockReset(); deliverReplies.mockReset(); editMessageTelegram.mockReset(); }); it("streams drafts in private threads and forwards thread id", async () => { const draftStream = { update: vi.fn(), flush: vi.fn().mockResolvedValue(undefined), messageId: vi.fn().mockReturnValue(undefined), clear: vi.fn().mockResolvedValue(undefined), stop: vi.fn(), }; createTelegramDraftStream.mockReturnValue(draftStream); dispatchReplyWithBufferedBlockDispatcher.mockImplementation( async ({ dispatcherOptions, replyOptions }) => { await replyOptions?.onPartialReply?.({ text: "Hello" }); await dispatcherOptions.deliver({ text: "Hello" }, { kind: "final" }); return { queuedFinal: true }; }, ); deliverReplies.mockResolvedValue({ delivered: true }); const context = { ctxPayload: {}, primaryCtx: { message: { chat: { id: 123, type: "private" } } }, msg: { chat: { id: 123, type: "private" }, message_id: 456, message_thread_id: 777, }, chatId: 123, isGroup: false, resolvedThreadId: undefined, replyThreadId: 777, threadSpec: { id: 777, scope: "dm" }, historyKey: undefined, historyLimit: 0, groupHistories: new Map(), route: { agentId: "default", accountId: "default" }, skillFilter: undefined, sendTyping: vi.fn(), sendRecordVoice: vi.fn(), ackReactionPromise: null, reactionApi: null, removeAckAfterReply: false, }; const bot = { api: { sendMessage: vi.fn(), editMessageText: vi.fn() } } as unknown as Bot; const runtime = { log: vi.fn(), error: vi.fn(), exit: () => { throw new Error("exit"); }, }; await dispatchTelegramMessage({ context, bot, cfg: {}, runtime, replyToMode: "first", streamMode: "partial", textLimit: 4096, telegramCfg: {}, opts: { token: "token" }, }); expect(createTelegramDraftStream).toHaveBeenCalledWith( expect.objectContaining({ chatId: 123, thread: { id: 777, scope: "dm" }, }), ); expect(draftStream.update).toHaveBeenCalledWith("Hello"); expect(deliverReplies).toHaveBeenCalledWith( expect.objectContaining({ thread: { id: 777, scope: "dm" }, }), ); expect(dispatchReplyWithBufferedBlockDispatcher).toHaveBeenCalledWith( expect.objectContaining({ replyOptions: expect.objectContaining({ disableBlockStreaming: true, }), }), ); expect(editMessageTelegram).not.toHaveBeenCalled(); expect(draftStream.clear).toHaveBeenCalledTimes(1); }); it("keeps block streaming enabled when account config enables it", async () => { dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => { await dispatcherOptions.deliver({ text: "Hello" }, { kind: "final" }); return { queuedFinal: true }; }); deliverReplies.mockResolvedValue({ delivered: true }); const context = { ctxPayload: {}, primaryCtx: { message: { chat: { id: 123, type: "private" } } }, msg: { chat: { id: 123, type: "private" }, message_id: 456, message_thread_id: 777, }, chatId: 123, isGroup: false, resolvedThreadId: undefined, replyThreadId: 777, threadSpec: { id: 777, scope: "dm" }, historyKey: undefined, historyLimit: 0, groupHistories: new Map(), route: { agentId: "default", accountId: "default" }, skillFilter: undefined, sendTyping: vi.fn(), sendRecordVoice: vi.fn(), ackReactionPromise: null, reactionApi: null, removeAckAfterReply: false, }; const bot = { api: { sendMessage: vi.fn(), editMessageText: vi.fn() } } as unknown as Bot; const runtime = { log: vi.fn(), error: vi.fn(), exit: () => { throw new Error("exit"); }, }; await dispatchTelegramMessage({ context, bot, cfg: {}, runtime, replyToMode: "first", streamMode: "partial", textLimit: 4096, telegramCfg: { blockStreaming: true }, opts: { token: "token" }, }); expect(createTelegramDraftStream).not.toHaveBeenCalled(); expect(dispatchReplyWithBufferedBlockDispatcher).toHaveBeenCalledWith( expect.objectContaining({ replyOptions: expect.objectContaining({ disableBlockStreaming: false, onPartialReply: undefined, }), }), ); }); it("finalizes text-only replies by editing the preview message in place", async () => { const draftStream = { update: vi.fn(), flush: vi.fn().mockResolvedValue(undefined), messageId: vi.fn().mockReturnValue(999), clear: vi.fn().mockResolvedValue(undefined), stop: vi.fn(), }; createTelegramDraftStream.mockReturnValue(draftStream); dispatchReplyWithBufferedBlockDispatcher.mockImplementation( async ({ dispatcherOptions, replyOptions }) => { await replyOptions?.onPartialReply?.({ text: "Hel" }); await dispatcherOptions.deliver({ text: "Hello final" }, { kind: "final" }); return { queuedFinal: true }; }, ); deliverReplies.mockResolvedValue({ delivered: true }); editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "999" }); const context = { ctxPayload: {}, primaryCtx: { message: { chat: { id: 123, type: "private" } } }, msg: { chat: { id: 123, type: "private" }, message_id: 456, message_thread_id: 777, }, chatId: 123, isGroup: false, resolvedThreadId: undefined, replyThreadId: 777, threadSpec: { id: 777, scope: "dm" }, historyKey: undefined, historyLimit: 0, groupHistories: new Map(), route: { agentId: "default", accountId: "default" }, skillFilter: undefined, sendTyping: vi.fn(), sendRecordVoice: vi.fn(), ackReactionPromise: null, reactionApi: null, removeAckAfterReply: false, }; const bot = { api: { sendMessage: vi.fn(), editMessageText: vi.fn() } } as unknown as Bot; const runtime = { log: vi.fn(), error: vi.fn(), exit: () => { throw new Error("exit"); }, }; await dispatchTelegramMessage({ context, bot, cfg: {}, runtime, replyToMode: "first", streamMode: "partial", textLimit: 4096, telegramCfg: {}, opts: { token: "token" }, }); expect(editMessageTelegram).toHaveBeenCalledWith(123, 999, "Hello final", expect.any(Object)); expect(deliverReplies).not.toHaveBeenCalled(); expect(draftStream.clear).not.toHaveBeenCalled(); expect(draftStream.stop).toHaveBeenCalled(); }); it("falls back to normal delivery when preview final is too long to edit", async () => { const draftStream = { update: vi.fn(), flush: vi.fn().mockResolvedValue(undefined), messageId: vi.fn().mockReturnValue(999), clear: vi.fn().mockResolvedValue(undefined), stop: vi.fn(), }; createTelegramDraftStream.mockReturnValue(draftStream); const longText = "x".repeat(5000); dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => { await dispatcherOptions.deliver({ text: longText }, { kind: "final" }); return { queuedFinal: true }; }); deliverReplies.mockResolvedValue({ delivered: true }); editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "999" }); const context = { ctxPayload: {}, primaryCtx: { message: { chat: { id: 123, type: "private" } } }, msg: { chat: { id: 123, type: "private" }, message_id: 456, message_thread_id: 777, }, chatId: 123, isGroup: false, resolvedThreadId: undefined, replyThreadId: 777, threadSpec: { id: 777, scope: "dm" }, historyKey: undefined, historyLimit: 0, groupHistories: new Map(), route: { agentId: "default", accountId: "default" }, skillFilter: undefined, sendTyping: vi.fn(), sendRecordVoice: vi.fn(), ackReactionPromise: null, reactionApi: null, removeAckAfterReply: false, }; const bot = { api: { sendMessage: vi.fn(), editMessageText: vi.fn() } } as unknown as Bot; const runtime = { log: vi.fn(), error: vi.fn(), exit: () => { throw new Error("exit"); }, }; await dispatchTelegramMessage({ context, bot, cfg: {}, runtime, replyToMode: "first", streamMode: "partial", textLimit: 4096, telegramCfg: {}, opts: { token: "token" }, }); expect(editMessageTelegram).not.toHaveBeenCalled(); expect(deliverReplies).toHaveBeenCalledWith( expect.objectContaining({ replies: [expect.objectContaining({ text: longText })], }), ); expect(draftStream.clear).toHaveBeenCalledTimes(1); expect(draftStream.stop).toHaveBeenCalled(); }); });