diff --git a/CHANGELOG.md b/CHANGELOG.md index a2aa745fc..deaf82d99 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ Docs: https://docs.openclaw.ai - Channels/Multi-account config: when adding a non-default channel account to a single-account top-level channel setup, move existing account-scoped top-level single-account values into `channels.<channel>.accounts.default` before writing the new account so the original account keeps working without duplicated account values at channel root; `openclaw doctor --fix` now repairs previously mixed channel account shapes the same way. (#27334) thanks @gumadeiras. - Telegram/Inline buttons: allow callback-query button handling in groups (including `/models` follow-up buttons) when group policy authorizes the sender, by removing the redundant callback allowlist gate that blocked open-policy groups. (#27343) Thanks @GodsBoy. - Telegram/Streaming preview: when finalizing without an existing preview message, prime pending preview text with final answer before stop-flush so users do not briefly see stale 1-2 word fragments (for example `no` before `no problem`). (#27449) Thanks @emanuelst for the original fix direction in #19673. +- Telegram/sendChatAction 401 handling: add bounded exponential backoff + temporary local typing suppression after repeated unauthorized failures to stop unbounded `sendChatAction` retry loops that can trigger Telegram abuse enforcement and bot deletion. (#27415) Thanks @widingmarcus-cyber. - Typing/Main reply pipeline: always mark dispatch idle in `agent-runner` finalization so typing cleanup runs even when dispatcher `onIdle` does not fire, preventing stuck typing indicators after run completion. (#27250) Thanks @Sid-Qin. - Typing/Run completion race: prevent post-run keepalive ticks from re-triggering typing callbacks by guarding `triggerTyping()` with `runComplete`, with regression coverage for no-restart behavior during run-complete/dispatch-idle boundaries. (#27413) Thanks @widingmarcus-cyber. 
- Daemon/macOS launchd: forward proxy env vars into supervised service environments, keep LaunchAgent `KeepAlive=true` semantics, and harden restart sequencing to `print -> bootout -> wait old pid exit -> bootstrap -> kickstart`. (#27276) thanks @frankekn. diff --git a/src/telegram/bot-message-context.ts b/src/telegram/bot-message-context.ts index c3a2cfcdc..d519d86df 100644 --- a/src/telegram/bot-message-context.ts +++ b/src/telegram/bot-message-context.ts @@ -116,6 +116,8 @@ export type BuildTelegramMessageContextParams = { resolveGroupActivation: ResolveGroupActivation; resolveGroupRequireMention: ResolveGroupRequireMention; resolveTelegramGroupConfig: ResolveTelegramGroupConfig; + /** Global (per-account) handler for sendChatAction 401 backoff (#27092). */ + sendChatActionHandler: import("./sendchataction-401-backoff.js").TelegramSendChatActionHandler; }; async function resolveStickerVisionSupport(params: { @@ -156,6 +158,7 @@ export const buildTelegramMessageContext = async ({ resolveGroupActivation, resolveGroupRequireMention, resolveTelegramGroupConfig, + sendChatActionHandler, }: BuildTelegramMessageContextParams) => { const msg = primaryCtx.message; const chatId = msg.chat.id; @@ -243,7 +246,12 @@ export const buildTelegramMessageContext = async ({ const sendTyping = async () => { await withTelegramApiErrorLogging({ operation: "sendChatAction", - fn: () => bot.api.sendChatAction(chatId, "typing", buildTypingThreadParams(replyThreadId)), + fn: () => + sendChatActionHandler.sendChatAction( + chatId, + "typing", + buildTypingThreadParams(replyThreadId), + ), }); }; @@ -252,7 +260,11 @@ export const buildTelegramMessageContext = async ({ await withTelegramApiErrorLogging({ operation: "sendChatAction", fn: () => - bot.api.sendChatAction(chatId, "record_voice", buildTypingThreadParams(replyThreadId)), + sendChatActionHandler.sendChatAction( + chatId, + "record_voice", + buildTypingThreadParams(replyThreadId), + ), }); } catch (err) { logVerbose(`telegram 
record_voice cue failed for chat ${chatId}: ${String(err)}`); diff --git a/src/telegram/bot-message.ts b/src/telegram/bot-message.ts index 6d9fa9ee4..1b598b714 100644 --- a/src/telegram/bot-message.ts +++ b/src/telegram/bot-message.ts @@ -39,6 +39,7 @@ export const createTelegramMessageProcessor = (deps: TelegramMessageProcessorDep resolveGroupActivation, resolveGroupRequireMention, resolveTelegramGroupConfig, + sendChatActionHandler, runtime, replyToMode, streamMode, @@ -70,6 +71,7 @@ export const createTelegramMessageProcessor = (deps: TelegramMessageProcessorDep resolveGroupActivation, resolveGroupRequireMention, resolveTelegramGroupConfig, + sendChatActionHandler, }); if (!context) { return; diff --git a/src/telegram/bot.ts b/src/telegram/bot.ts index 409815fa3..a501be232 100644 --- a/src/telegram/bot.ts +++ b/src/telegram/bot.ts @@ -40,6 +40,7 @@ import { resolveTelegramStreamMode, } from "./bot/helpers.js"; import { resolveTelegramFetch } from "./fetch.js"; +import { createTelegramSendChatActionHandler } from "./sendchataction-401-backoff.js"; export type TelegramBotOptions = { token: string; @@ -348,6 +349,20 @@ export function createTelegramBot(opts: TelegramBotOptions) { return { groupConfig, topicConfig }; }; + // Global sendChatAction handler with 401 backoff / circuit breaker (issue #27092). + // Created BEFORE the message processor so it can be injected into every message context. + // Shared across all message contexts for this account so that consecutive 401s + // from ANY chat are tracked together — prevents infinite retry storms. 
+ const sendChatActionHandler = createTelegramSendChatActionHandler({ + sendChatActionFn: (chatId, action, threadParams) => + bot.api.sendChatAction( + chatId, + action, + threadParams as Parameters[2], + ), + logger: (message) => logVerbose(`telegram: ${message}`), + }); + const processMessage = createTelegramMessageProcessor({ bot, cfg, @@ -363,6 +378,7 @@ export function createTelegramBot(opts: TelegramBotOptions) { resolveGroupActivation, resolveGroupRequireMention, resolveTelegramGroupConfig, + sendChatActionHandler, runtime, replyToMode, streamMode, diff --git a/src/telegram/sendchataction-401-backoff.test.ts b/src/telegram/sendchataction-401-backoff.test.ts new file mode 100644 index 000000000..4fbaaaaf9 --- /dev/null +++ b/src/telegram/sendchataction-401-backoff.test.ts @@ -0,0 +1,145 @@ +import { describe, expect, it, vi } from "vitest"; +import { createTelegramSendChatActionHandler } from "./sendchataction-401-backoff.js"; + +// Mock the backoff sleep to avoid real delays in tests +vi.mock("../infra/backoff.js", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + sleepWithAbort: vi.fn().mockResolvedValue(undefined), + }; +}); + +describe("createTelegramSendChatActionHandler", () => { + const make401Error = () => new Error("401 Unauthorized"); + const make500Error = () => new Error("500 Internal Server Error"); + + it("calls sendChatActionFn on success", async () => { + const fn = vi.fn().mockResolvedValue(true); + const logger = vi.fn(); + const handler = createTelegramSendChatActionHandler({ + sendChatActionFn: fn, + logger, + }); + + await handler.sendChatAction(123, "typing"); + expect(fn).toHaveBeenCalledWith(123, "typing", undefined); + expect(handler.isSuspended()).toBe(false); + }); + + it("applies exponential backoff on consecutive 401 errors", async () => { + const fn = vi.fn().mockRejectedValue(make401Error()); + const logger = vi.fn(); + const handler = createTelegramSendChatActionHandler({ + 
sendChatActionFn: fn, + logger, + maxConsecutive401: 5, + }); + + // First call fails with 401 + await expect(handler.sendChatAction(123, "typing")).rejects.toThrow("401"); + expect(handler.isSuspended()).toBe(false); + + // Second call should mention backoff in logs + await expect(handler.sendChatAction(123, "typing")).rejects.toThrow("401"); + expect(logger).toHaveBeenCalledWith(expect.stringContaining("backoff")); + }); + + it("suspends after maxConsecutive401 failures", async () => { + const fn = vi.fn().mockRejectedValue(make401Error()); + const logger = vi.fn(); + const handler = createTelegramSendChatActionHandler({ + sendChatActionFn: fn, + logger, + maxConsecutive401: 3, + }); + + await expect(handler.sendChatAction(123, "typing")).rejects.toThrow("401"); + await expect(handler.sendChatAction(123, "typing")).rejects.toThrow("401"); + await expect(handler.sendChatAction(123, "typing")).rejects.toThrow("401"); + + expect(handler.isSuspended()).toBe(true); + expect(logger).toHaveBeenCalledWith(expect.stringContaining("CRITICAL")); + + // Subsequent calls are silently skipped + await handler.sendChatAction(123, "typing"); + expect(fn).toHaveBeenCalledTimes(3); // not called again + }); + + it("resets failure counter on success", async () => { + let callCount = 0; + const fn = vi.fn().mockImplementation(() => { + callCount++; + if (callCount <= 2) { + throw make401Error(); + } + return Promise.resolve(true); + }); + const logger = vi.fn(); + const handler = createTelegramSendChatActionHandler({ + sendChatActionFn: fn, + logger, + maxConsecutive401: 5, + }); + + await expect(handler.sendChatAction(123, "typing")).rejects.toThrow("401"); + await expect(handler.sendChatAction(123, "typing")).rejects.toThrow("401"); + // Third call succeeds + await handler.sendChatAction(123, "typing"); + + expect(handler.isSuspended()).toBe(false); + expect(logger).toHaveBeenCalledWith(expect.stringContaining("recovered")); + }); + + it("does not count non-401 errors toward 
suspension", async () => { + const fn = vi.fn().mockRejectedValue(make500Error()); + const logger = vi.fn(); + const handler = createTelegramSendChatActionHandler({ + sendChatActionFn: fn, + logger, + maxConsecutive401: 2, + }); + + await expect(handler.sendChatAction(123, "typing")).rejects.toThrow("500"); + await expect(handler.sendChatAction(123, "typing")).rejects.toThrow("500"); + await expect(handler.sendChatAction(123, "typing")).rejects.toThrow("500"); + + expect(handler.isSuspended()).toBe(false); + }); + + it("reset() clears suspension", async () => { + const fn = vi.fn().mockRejectedValue(make401Error()); + const logger = vi.fn(); + const handler = createTelegramSendChatActionHandler({ + sendChatActionFn: fn, + logger, + maxConsecutive401: 1, + }); + + await expect(handler.sendChatAction(123, "typing")).rejects.toThrow("401"); + expect(handler.isSuspended()).toBe(true); + + handler.reset(); + expect(handler.isSuspended()).toBe(false); + }); + + it("is shared across multiple chatIds (global handler)", async () => { + const fn = vi.fn().mockRejectedValue(make401Error()); + const logger = vi.fn(); + const handler = createTelegramSendChatActionHandler({ + sendChatActionFn: fn, + logger, + maxConsecutive401: 3, + }); + + // Different chatIds all contribute to the same failure counter + await expect(handler.sendChatAction(111, "typing")).rejects.toThrow("401"); + await expect(handler.sendChatAction(222, "typing")).rejects.toThrow("401"); + await expect(handler.sendChatAction(333, "typing")).rejects.toThrow("401"); + + expect(handler.isSuspended()).toBe(true); + // Suspended for all chats + await handler.sendChatAction(444, "typing"); + expect(fn).toHaveBeenCalledTimes(3); + }); +}); diff --git a/src/telegram/sendchataction-401-backoff.ts b/src/telegram/sendchataction-401-backoff.ts new file mode 100644 index 000000000..f87915961 --- /dev/null +++ b/src/telegram/sendchataction-401-backoff.ts @@ -0,0 +1,133 @@ +import { computeBackoff, sleepWithAbort, type 
BackoffPolicy } from "../infra/backoff.js"; + +export type TelegramSendChatActionLogger = (message: string) => void; + +type ChatAction = + | "typing" + | "upload_photo" + | "record_video" + | "upload_video" + | "record_voice" + | "upload_voice" + | "upload_document" + | "find_location" + | "record_video_note" + | "upload_video_note" + | "choose_sticker"; + +type SendChatActionFn = ( + chatId: number | string, + action: ChatAction, + threadParams?: unknown, +) => Promise; + +export type TelegramSendChatActionHandler = { + /** + * Send a chat action with automatic 401 backoff and circuit breaker. + * Safe to call from multiple concurrent message contexts. + */ + sendChatAction: ( + chatId: number | string, + action: ChatAction, + threadParams?: unknown, + ) => Promise; + isSuspended: () => boolean; + reset: () => void; +}; + +export type CreateTelegramSendChatActionHandlerParams = { + sendChatActionFn: SendChatActionFn; + logger: TelegramSendChatActionLogger; + maxConsecutive401?: number; +}; + +const BACKOFF_POLICY: BackoffPolicy = { + initialMs: 1000, + maxMs: 300_000, // 5 minutes + factor: 2, + jitter: 0.1, +}; + +function is401Error(error: unknown): boolean { + if (!error) { + return false; + } + const message = error instanceof Error ? error.message : JSON.stringify(error); + return message.includes("401") || message.toLowerCase().includes("unauthorized"); +} + +/** + * Creates a GLOBAL (per-account) handler for sendChatAction that tracks 401 errors + * across all message contexts. This prevents the infinite loop that caused Telegram + * to delete bots (issue #27092). + * + * When a 401 occurs, exponential backoff is applied (1s → 2s → 4s → ... → 5min). + * After maxConsecutive401 failures (default 10), all sendChatAction calls are + * suspended until reset() is called. 
/**
 * Builds a `sendChatAction` wrapper with 401 backoff and a circuit breaker.
 *
 * The failure counter and the suspended flag live in this closure, so every
 * caller that shares the returned handler shares one breaker, whatever chat id
 * it sends to.
 *
 * Behavior of the returned `sendChatAction`:
 * - While suspended it resolves immediately without calling `sendChatActionFn`.
 * - Once at least one 401 has been recorded, each call first waits for the delay
 *   returned by `computeBackoff(BACKOFF_POLICY, consecutive401Failures)`.
 * - When the counter reaches `maxConsecutive401`, the handler suspends itself
 *   until `reset()` is called.
 * - A successful call clears the counter; non-401 failures leave it untouched.
 * - Every error thrown by `sendChatActionFn` is rethrown to the caller.
 *
 * @param params.sendChatActionFn Function that performs the actual API call.
 * @param params.logger Sink for backoff / recovery / suspension messages.
 * @param params.maxConsecutive401 Number of consecutive 401s that trips the
 *   breaker (default 10).
 * @returns Handler exposing `sendChatAction`, `isSuspended` and `reset`.
 */
export function createTelegramSendChatActionHandler({
  sendChatActionFn,
  logger,
  maxConsecutive401 = 10,
}: CreateTelegramSendChatActionHandlerParams): TelegramSendChatActionHandler {
  let consecutive401Failures = 0;
  let suspended = false;

  /** Clears both the failure counter and the suspended flag. */
  const reset = (): void => {
    consecutive401Failures = 0;
    suspended = false;
  };

  /**
   * Classifies an error as "401 Unauthorized".
   *
   * Telegram Bot API failures carry a numeric `error_code`; when the thrown
   * object exposes one it is authoritative, which avoids false positives from
   * unrelated messages that merely contain the digits "401". Anything else
   * falls back to the message-based `is401Error` heuristic.
   */
  const isUnauthorized = (error: unknown): boolean => {
    if (typeof error === "object" && error !== null && "error_code" in error) {
      const code = (error as { error_code?: unknown }).error_code;
      if (typeof code === "number") {
        return code === 401;
      }
    }
    try {
      return is401Error(error);
    } catch {
      // The heuristic stringifies non-Error values and can itself throw
      // (e.g. circular structures). Classification must never replace the
      // original error that the caller is about to receive.
      return false;
    }
  };

  /** Records one 401 and trips the breaker once the limit is reached. */
  const record401 = (): void => {
    consecutive401Failures++;

    if (consecutive401Failures >= maxConsecutive401) {
      // Calls that were already in flight may fail after the breaker tripped;
      // emit the critical message only on the transition.
      if (!suspended) {
        suspended = true;
        logger(
          `CRITICAL: sendChatAction suspended after ${consecutive401Failures} consecutive 401 errors. ` +
            `Bot token is likely invalid. Telegram may DELETE the bot if requests continue. ` +
            `Replace the token and restart: openclaw channels restart telegram`,
        );
      }
      return;
    }

    logger(
      `sendChatAction 401 error (${consecutive401Failures}/${maxConsecutive401}). ` +
        `Retrying with exponential backoff.`,
    );
  };

  const sendChatAction = async (
    chatId: number | string,
    action: ChatAction,
    threadParams?: unknown,
  ): Promise<void> => {
    if (suspended) {
      return;
    }

    if (consecutive401Failures > 0) {
      const backoffMs = computeBackoff(BACKOFF_POLICY, consecutive401Failures);
      logger(
        `sendChatAction backoff: waiting ${backoffMs}ms before retry ` +
          `(failure ${consecutive401Failures}/${maxConsecutive401})`,
      );
      await sleepWithAbort(backoffMs);

      // Another concurrent call may have tripped the breaker while this one was
      // sleeping; sending now would defeat the purpose of the suspension.
      if (suspended) {
        return;
      }
    }

    try {
      await sendChatActionFn(chatId, action, threadParams);
    } catch (error) {
      if (isUnauthorized(error)) {
        record401();
      }
      throw error;
    }

    // Success: reset failure counter.
    if (consecutive401Failures > 0) {
      logger(`sendChatAction recovered after ${consecutive401Failures} consecutive 401 failures`);
      consecutive401Failures = 0;
    }
  };

  return {
    sendChatAction,
    isSuspended: () => suspended,
    reset,
  };
}