import type { Bot } from "grammy"; import { createDraftStreamLoop } from "../channels/draft-stream-loop.js"; import { buildTelegramThreadParams, type TelegramThreadSpec } from "./bot/helpers.js"; const TELEGRAM_STREAM_MAX_CHARS = 4096; const DEFAULT_THROTTLE_MS = 1000; export type TelegramDraftStream = { update: (text: string) => void; flush: () => Promise; messageId: () => number | undefined; clear: () => Promise; stop: () => Promise; /** Reset internal state so the next update creates a new message instead of editing. */ forceNewMessage: () => void; }; type TelegramDraftPreview = { text: string; parseMode?: "HTML"; }; export function createTelegramDraftStream(params: { api: Bot["api"]; chatId: number; maxChars?: number; thread?: TelegramThreadSpec | null; replyToMessageId?: number; throttleMs?: number; /** Minimum chars before sending first message (debounce for push notifications) */ minInitialChars?: number; /** Optional preview renderer (e.g. markdown -> HTML + parse mode). */ renderText?: (text: string) => TelegramDraftPreview; log?: (message: string) => void; warn?: (message: string) => void; }): TelegramDraftStream { const maxChars = Math.min( params.maxChars ?? TELEGRAM_STREAM_MAX_CHARS, TELEGRAM_STREAM_MAX_CHARS, ); const throttleMs = Math.max(250, params.throttleMs ?? DEFAULT_THROTTLE_MS); const minInitialChars = params.minInitialChars; const chatId = params.chatId; const threadParams = buildTelegramThreadParams(params.thread); const replyParams = params.replyToMessageId != null ? { ...threadParams, reply_to_message_id: params.replyToMessageId } : threadParams; let streamMessageId: number | undefined; let lastSentText = ""; let lastSentParseMode: "HTML" | undefined; let stopped = false; let isFinal = false; const sendOrEditStreamMessage = async (text: string): Promise => { // Allow final flush even if stopped (e.g., after clear()). if (stopped && !isFinal) { return false; } const trimmed = text.trimEnd(); if (!trimmed) { return false; } const rendered = params.renderText?.(trimmed) ?? { text: trimmed }; const renderedText = rendered.text.trimEnd(); const renderedParseMode = rendered.parseMode; if (!renderedText) { return false; } if (renderedText.length > maxChars) { // Telegram text messages/edits cap at 4096 chars. // Stop streaming once we exceed the cap to avoid repeated API failures. stopped = true; params.warn?.( `telegram stream preview stopped (text length ${renderedText.length} > ${maxChars})`, ); return false; } if (renderedText === lastSentText && renderedParseMode === lastSentParseMode) { return true; } // Debounce first preview send for better push notification quality. if (typeof streamMessageId !== "number" && minInitialChars != null && !isFinal) { if (renderedText.length < minInitialChars) { return false; } } lastSentText = renderedText; lastSentParseMode = renderedParseMode; try { if (typeof streamMessageId === "number") { if (renderedParseMode) { await params.api.editMessageText(chatId, streamMessageId, renderedText, { parse_mode: renderedParseMode, }); } else { await params.api.editMessageText(chatId, streamMessageId, renderedText); } return true; } const sendParams = renderedParseMode ? { ...replyParams, parse_mode: renderedParseMode, } : replyParams; const sent = await params.api.sendMessage(chatId, renderedText, sendParams); const sentMessageId = sent?.message_id; if (typeof sentMessageId !== "number" || !Number.isFinite(sentMessageId)) { stopped = true; params.warn?.("telegram stream preview stopped (missing message id from sendMessage)"); return false; } streamMessageId = Math.trunc(sentMessageId); return true; } catch (err) { stopped = true; params.warn?.( `telegram stream preview failed: ${err instanceof Error ? err.message : String(err)}`, ); return false; } }; const loop = createDraftStreamLoop({ throttleMs, isStopped: () => stopped, sendOrEditStreamMessage, }); const update = (text: string) => { if (stopped || isFinal) { return; } loop.update(text); }; const stop = async (): Promise => { isFinal = true; await loop.flush(); }; const clear = async () => { stopped = true; loop.stop(); await loop.waitForInFlight(); const messageId = streamMessageId; streamMessageId = undefined; if (typeof messageId !== "number") { return; } try { await params.api.deleteMessage(chatId, messageId); params.log?.(`telegram stream preview deleted (chat=${chatId}, message=${messageId})`); } catch (err) { params.warn?.( `telegram stream preview cleanup failed: ${err instanceof Error ? err.message : String(err)}`, ); } }; const forceNewMessage = () => { streamMessageId = undefined; lastSentText = ""; lastSentParseMode = undefined; loop.resetPending(); loop.resetThrottleWindow(); }; params.log?.(`telegram stream preview ready (maxChars=${maxChars}, throttleMs=${throttleMs})`); return { update, flush: loop.flush, messageId: () => streamMessageId, clear, stop, forceNewMessage, }; }