fix(telegram): unify inbound handling for message-like updates (#20591)
Merged via /review-pr -> /prepare-pr -> /merge-pr. Prepared head SHA: 442a100071dc908716f9f7b00ce3f58c1dbc6588 Co-authored-by: obviyus <22031114+obviyus@users.noreply.github.com> Co-authored-by: obviyus <22031114+obviyus@users.noreply.github.com> Reviewed-by: @obviyus
This commit is contained in:
@@ -15,6 +15,7 @@ Docs: https://docs.openclaw.ai
|
||||
|
||||
### Fixes
|
||||
|
||||
- Telegram: unify message-like inbound handling so `message` and `channel_post` share the same dedupe/access/media pipeline and remain behaviorally consistent. (#20591) Thanks @obviyus.
|
||||
- Telegram/Agents: gate exec/bash tool-failure warnings behind verbose mode so default Telegram replies stay clean while verbose sessions still surface diagnostics. (#20560) Thanks @obviyus.
|
||||
- Gateway/Daemon: forward `TMPDIR` into installed service environments so macOS LaunchAgent gateway runs can open SQLite temp/journal files reliably instead of failing with `SQLITE_CANTOPEN`. (#20512) Thanks @Clawborn.
|
||||
- Agents/Billing: include the active model that produced a billing error in user-facing billing messages (for example, `OpenAI (gpt-5.3)`) across payload, failover, and lifecycle error paths, so users can identify exactly which key needs credits. (#20510) Thanks @echoVic.
|
||||
|
||||
@@ -22,14 +22,17 @@ import { resolveAgentRoute } from "../routing/resolve-route.js";
|
||||
import { resolveThreadSessionKeys } from "../routing/session-key.js";
|
||||
import { withTelegramApiErrorLogging } from "./api-logging.js";
|
||||
import {
|
||||
firstDefined,
|
||||
isSenderAllowed,
|
||||
normalizeAllowFromWithStore,
|
||||
type NormalizedAllowFrom,
|
||||
} from "./bot-access.js";
|
||||
import type { TelegramMediaRef } from "./bot-message-context.js";
|
||||
import { RegisterTelegramHandlerParams } from "./bot-native-commands.js";
|
||||
import { MEDIA_GROUP_TIMEOUT_MS, type MediaGroupEntry } from "./bot-updates.js";
|
||||
import {
|
||||
MEDIA_GROUP_TIMEOUT_MS,
|
||||
type MediaGroupEntry,
|
||||
type TelegramUpdateKeyContext,
|
||||
} from "./bot-updates.js";
|
||||
import { resolveMedia } from "./bot/delivery.js";
|
||||
import {
|
||||
buildTelegramGroupPeerId,
|
||||
@@ -1035,25 +1038,33 @@ export const registerTelegramHandlers = ({
|
||||
}
|
||||
});
|
||||
|
||||
bot.on("message", async (ctx) => {
|
||||
type InboundTelegramEvent = {
|
||||
ctxForDedupe: TelegramUpdateKeyContext;
|
||||
ctx: TelegramContext;
|
||||
msg: Message;
|
||||
chatId: number;
|
||||
isGroup: boolean;
|
||||
isForum: boolean;
|
||||
messageThreadId?: number;
|
||||
senderId: string;
|
||||
senderUsername: string;
|
||||
requireConfiguredGroup: boolean;
|
||||
sendOversizeWarning: boolean;
|
||||
oversizeLogMessage: string;
|
||||
errorMessage: string;
|
||||
};
|
||||
|
||||
const handleInboundMessageLike = async (event: InboundTelegramEvent) => {
|
||||
try {
|
||||
const msg = ctx.message;
|
||||
if (!msg) {
|
||||
return;
|
||||
}
|
||||
if (shouldSkipUpdate(ctx)) {
|
||||
if (shouldSkipUpdate(event.ctxForDedupe)) {
|
||||
return;
|
||||
}
|
||||
|
||||
const chatId = msg.chat.id;
|
||||
const isGroup = msg.chat.type === "group" || msg.chat.type === "supergroup";
|
||||
const messageThreadId = msg.message_thread_id;
|
||||
const isForum = msg.chat.is_forum === true;
|
||||
const groupAllowContext = await resolveTelegramGroupAllowFromContext({
|
||||
chatId,
|
||||
chatId: event.chatId,
|
||||
accountId,
|
||||
isForum,
|
||||
messageThreadId,
|
||||
isForum: event.isForum,
|
||||
messageThreadId: event.messageThreadId,
|
||||
groupAllowFrom,
|
||||
resolveTelegramGroupConfig,
|
||||
});
|
||||
@@ -1066,16 +1077,19 @@ export const registerTelegramHandlers = ({
|
||||
hasGroupAllowOverride,
|
||||
} = groupAllowContext;
|
||||
|
||||
const senderId = msg.from?.id != null ? String(msg.from.id) : "";
|
||||
const senderUsername = msg.from?.username ?? "";
|
||||
if (event.requireConfiguredGroup && (!groupConfig || groupConfig.enabled === false)) {
|
||||
logVerbose(`Blocked telegram channel ${event.chatId} (channel disabled)`);
|
||||
return;
|
||||
}
|
||||
|
||||
if (
|
||||
shouldSkipGroupMessage({
|
||||
isGroup,
|
||||
chatId,
|
||||
chatTitle: msg.chat.title,
|
||||
isGroup: event.isGroup,
|
||||
chatId: event.chatId,
|
||||
chatTitle: event.msg.chat.title,
|
||||
resolvedThreadId,
|
||||
senderId,
|
||||
senderUsername,
|
||||
senderId: event.senderId,
|
||||
senderUsername: event.senderUsername,
|
||||
effectiveGroupAllow,
|
||||
hasGroupAllowOverride,
|
||||
groupConfig,
|
||||
@@ -1086,155 +1100,91 @@ export const registerTelegramHandlers = ({
|
||||
}
|
||||
|
||||
await processInboundMessage({
|
||||
ctx,
|
||||
msg,
|
||||
chatId,
|
||||
ctx: event.ctx,
|
||||
msg: event.msg,
|
||||
chatId: event.chatId,
|
||||
resolvedThreadId,
|
||||
storeAllowFrom,
|
||||
sendOversizeWarning: true,
|
||||
oversizeLogMessage: "media exceeds size limit",
|
||||
sendOversizeWarning: event.sendOversizeWarning,
|
||||
oversizeLogMessage: event.oversizeLogMessage,
|
||||
});
|
||||
} catch (err) {
|
||||
runtime.error?.(danger(`handler failed: ${String(err)}`));
|
||||
runtime.error?.(danger(`${event.errorMessage}: ${String(err)}`));
|
||||
}
|
||||
};
|
||||
|
||||
bot.on("message", async (ctx) => {
|
||||
const msg = ctx.message;
|
||||
if (!msg) {
|
||||
return;
|
||||
}
|
||||
await handleInboundMessageLike({
|
||||
ctxForDedupe: ctx,
|
||||
ctx: buildSyntheticContext(ctx, msg),
|
||||
msg,
|
||||
chatId: msg.chat.id,
|
||||
isGroup: msg.chat.type === "group" || msg.chat.type === "supergroup",
|
||||
isForum: msg.chat.is_forum === true,
|
||||
messageThreadId: msg.message_thread_id,
|
||||
senderId: msg.from?.id != null ? String(msg.from.id) : "",
|
||||
senderUsername: msg.from?.username ?? "",
|
||||
requireConfiguredGroup: false,
|
||||
sendOversizeWarning: true,
|
||||
oversizeLogMessage: "media exceeds size limit",
|
||||
errorMessage: "handler failed",
|
||||
});
|
||||
});
|
||||
|
||||
// Handle channel posts — enables bot-to-bot communication via Telegram channels.
|
||||
// Telegram bots cannot see other bot messages in groups, but CAN in channels.
|
||||
// This handler normalizes channel_post updates into the standard message pipeline.
|
||||
bot.on("channel_post", async (ctx) => {
|
||||
try {
|
||||
const post = ctx.channelPost;
|
||||
if (!post) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Deduplication check — same as the regular message handler
|
||||
if (shouldSkipUpdate(ctx)) {
|
||||
return;
|
||||
}
|
||||
|
||||
const chatId = post.chat.id;
|
||||
|
||||
// Use the full group allow-from context for access control (same as message handler)
|
||||
const groupAllowContext = await resolveTelegramGroupAllowFromContext({
|
||||
chatId,
|
||||
accountId,
|
||||
isForum: false,
|
||||
messageThreadId: undefined,
|
||||
groupAllowFrom,
|
||||
resolveTelegramGroupConfig,
|
||||
});
|
||||
const { storeAllowFrom, groupConfig, effectiveGroupAllow, hasGroupAllowOverride } =
|
||||
groupAllowContext;
|
||||
|
||||
// Check group allowlist (channels use the same groups config)
|
||||
const groupAllowlist = resolveGroupPolicy(chatId);
|
||||
if (groupAllowlist.allowlistEnabled && !groupAllowlist.allowed) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!groupConfig || groupConfig.enabled === false) {
|
||||
logVerbose(`Blocked telegram channel ${chatId} (channel disabled)`);
|
||||
return;
|
||||
}
|
||||
|
||||
// Group policy filtering (same as message handler)
|
||||
const defaultGroupPolicy = cfg.channels?.defaults?.groupPolicy;
|
||||
const groupPolicy = firstDefined(
|
||||
groupConfig?.groupPolicy,
|
||||
telegramCfg.groupPolicy,
|
||||
defaultGroupPolicy,
|
||||
"open",
|
||||
);
|
||||
if (groupPolicy === "disabled") {
|
||||
logVerbose(`Blocked telegram channel message (groupPolicy: disabled)`);
|
||||
return;
|
||||
}
|
||||
|
||||
if (hasGroupAllowOverride) {
|
||||
const senderId = post.sender_chat?.id ?? post.from?.id;
|
||||
const senderUsername = post.sender_chat?.username ?? post.from?.username ?? "";
|
||||
const allowed =
|
||||
senderId != null &&
|
||||
isSenderAllowed({
|
||||
allow: effectiveGroupAllow,
|
||||
senderId: String(senderId),
|
||||
senderUsername,
|
||||
});
|
||||
if (!allowed) {
|
||||
logVerbose(
|
||||
`Blocked telegram channel sender ${senderId ?? "unknown"} (group allowFrom override)`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (groupPolicy === "allowlist") {
|
||||
const senderId = post.sender_chat?.id ?? post.from?.id;
|
||||
if (senderId == null) {
|
||||
logVerbose(`Blocked telegram channel message (no sender ID, groupPolicy: allowlist)`);
|
||||
return;
|
||||
}
|
||||
if (!effectiveGroupAllow.hasEntries) {
|
||||
logVerbose(
|
||||
"Blocked telegram channel message (groupPolicy: allowlist, no allowlist entries)",
|
||||
);
|
||||
return;
|
||||
}
|
||||
const senderUsername = post.sender_chat?.username ?? post.from?.username ?? "";
|
||||
if (
|
||||
!isSenderAllowed({
|
||||
allow: effectiveGroupAllow,
|
||||
senderId: String(senderId),
|
||||
senderUsername,
|
||||
})
|
||||
) {
|
||||
logVerbose(`Blocked telegram channel message from ${senderId} (groupPolicy: allowlist)`);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Build a synthetic `from` field since channel posts may not have one.
|
||||
// Use sender_chat (the bot/user that posted) if available.
|
||||
const syntheticFrom = post.sender_chat
|
||||
? {
|
||||
id: post.sender_chat.id,
|
||||
is_bot: true as const,
|
||||
first_name: post.sender_chat.title || "Channel",
|
||||
username: post.sender_chat.username,
|
||||
}
|
||||
: {
|
||||
id: chatId,
|
||||
is_bot: true as const,
|
||||
first_name: post.chat.title || "Channel",
|
||||
username: post.chat.username,
|
||||
};
|
||||
|
||||
const syntheticMsg: Message = {
|
||||
...post,
|
||||
from: post.from ?? syntheticFrom,
|
||||
chat: {
|
||||
...post.chat,
|
||||
type: "supergroup" as const,
|
||||
},
|
||||
} as Message;
|
||||
|
||||
const syntheticCtx = Object.create(ctx, {
|
||||
message: { value: syntheticMsg, writable: true, enumerable: true },
|
||||
});
|
||||
|
||||
await processInboundMessage({
|
||||
ctx: syntheticCtx as TelegramContext,
|
||||
msg: syntheticMsg,
|
||||
chatId,
|
||||
resolvedThreadId: undefined,
|
||||
storeAllowFrom,
|
||||
sendOversizeWarning: false,
|
||||
oversizeLogMessage: "channel post media exceeds size limit",
|
||||
});
|
||||
} catch (err) {
|
||||
runtime.error?.(danger(`channel_post handler failed: ${String(err)}`));
|
||||
const post = ctx.channelPost;
|
||||
if (!post) {
|
||||
return;
|
||||
}
|
||||
|
||||
const chatId = post.chat.id;
|
||||
const syntheticFrom = post.sender_chat
|
||||
? {
|
||||
id: post.sender_chat.id,
|
||||
is_bot: true as const,
|
||||
first_name: post.sender_chat.title || "Channel",
|
||||
username: post.sender_chat.username,
|
||||
}
|
||||
: {
|
||||
id: chatId,
|
||||
is_bot: true as const,
|
||||
first_name: post.chat.title || "Channel",
|
||||
username: post.chat.username,
|
||||
};
|
||||
const syntheticMsg: Message = {
|
||||
...post,
|
||||
from: post.from ?? syntheticFrom,
|
||||
chat: {
|
||||
...post.chat,
|
||||
type: "supergroup" as const,
|
||||
},
|
||||
} as Message;
|
||||
|
||||
await handleInboundMessageLike({
|
||||
ctxForDedupe: ctx,
|
||||
ctx: buildSyntheticContext(ctx, syntheticMsg),
|
||||
msg: syntheticMsg,
|
||||
chatId,
|
||||
isGroup: true,
|
||||
isForum: false,
|
||||
senderId:
|
||||
post.sender_chat?.id != null
|
||||
? String(post.sender_chat.id)
|
||||
: post.from?.id != null
|
||||
? String(post.from.id)
|
||||
: "",
|
||||
senderUsername: post.sender_chat?.username ?? post.from?.username ?? "",
|
||||
requireConfiguredGroup: true,
|
||||
sendOversizeWarning: false,
|
||||
oversizeLogMessage: "channel post media exceeds size limit",
|
||||
errorMessage: "channel_post handler failed",
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
@@ -19,9 +19,13 @@ export type TelegramUpdateKeyContext = {
|
||||
update_id?: number;
|
||||
message?: Message;
|
||||
edited_message?: Message;
|
||||
channel_post?: Message;
|
||||
edited_channel_post?: Message;
|
||||
};
|
||||
update_id?: number;
|
||||
message?: Message;
|
||||
channelPost?: Message;
|
||||
editedChannelPost?: Message;
|
||||
callbackQuery?: { id?: string; message?: Message };
|
||||
};
|
||||
|
||||
@@ -38,7 +42,14 @@ export const buildTelegramUpdateKey = (ctx: TelegramUpdateKeyContext) => {
|
||||
return `callback:${callbackId}`;
|
||||
}
|
||||
const msg =
|
||||
ctx.message ?? ctx.update?.message ?? ctx.update?.edited_message ?? ctx.callbackQuery?.message;
|
||||
ctx.message ??
|
||||
ctx.channelPost ??
|
||||
ctx.editedChannelPost ??
|
||||
ctx.update?.message ??
|
||||
ctx.update?.edited_message ??
|
||||
ctx.update?.channel_post ??
|
||||
ctx.update?.edited_channel_post ??
|
||||
ctx.callbackQuery?.message;
|
||||
const chatId = msg?.chat?.id;
|
||||
const messageId = msg?.message_id;
|
||||
if (typeof chatId !== "undefined" && typeof messageId === "number") {
|
||||
|
||||
@@ -151,6 +151,18 @@ describe("createTelegramBot", () => {
|
||||
update: { message: mockMessage({ chat: mockChat({ id: 555 }) }) },
|
||||
}),
|
||||
).toBe("telegram:555");
|
||||
expect(
|
||||
getTelegramSequentialKey({
|
||||
channelPost: mockMessage({ chat: mockChat({ id: -100777111222, type: "channel" }) }),
|
||||
}),
|
||||
).toBe("telegram:-100777111222");
|
||||
expect(
|
||||
getTelegramSequentialKey({
|
||||
update: {
|
||||
channel_post: mockMessage({ chat: mockChat({ id: -100777111223, type: "channel" }) }),
|
||||
},
|
||||
}),
|
||||
).toBe("telegram:-100777111223");
|
||||
expect(
|
||||
getTelegramSequentialKey({
|
||||
message: mockMessage({ chat: mockChat({ id: 123 }), text: "/stop" }),
|
||||
@@ -1992,6 +2004,44 @@ describe("createTelegramBot", () => {
|
||||
await handler(ctx);
|
||||
await handler(ctx);
|
||||
|
||||
expect(replySpy).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
it("dedupes duplicate channel_post updates by chat/message key", async () => {
|
||||
onSpy.mockReset();
|
||||
replySpy.mockReset();
|
||||
|
||||
loadConfig.mockReturnValue({
|
||||
channels: {
|
||||
telegram: {
|
||||
groupPolicy: "open",
|
||||
groups: {
|
||||
"-100777111222": {
|
||||
enabled: true,
|
||||
requireMention: false,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
createTelegramBot({ token: "tok" });
|
||||
const handler = getOnHandler("channel_post") as (ctx: Record<string, unknown>) => Promise<void>;
|
||||
|
||||
const ctx = {
|
||||
channelPost: {
|
||||
chat: { id: -100777111222, type: "channel", title: "Wake Channel" },
|
||||
from: { id: 98765, is_bot: true, first_name: "wakebot", username: "wake_bot" },
|
||||
message_id: 777,
|
||||
text: "wake check",
|
||||
date: 1736380800,
|
||||
},
|
||||
me: { username: "openclaw_bot" },
|
||||
getFile: async () => ({}),
|
||||
};
|
||||
|
||||
await handler(ctx);
|
||||
await handler(ctx);
|
||||
|
||||
expect(replySpy).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -66,9 +66,13 @@ export function getTelegramSequentialKey(ctx: {
|
||||
chat?: { id?: number };
|
||||
me?: UserFromGetMe;
|
||||
message?: Message;
|
||||
channelPost?: Message;
|
||||
editedChannelPost?: Message;
|
||||
update?: {
|
||||
message?: Message;
|
||||
edited_message?: Message;
|
||||
channel_post?: Message;
|
||||
edited_channel_post?: Message;
|
||||
callback_query?: { message?: Message };
|
||||
message_reaction?: { chat?: { id?: number } };
|
||||
};
|
||||
@@ -80,8 +84,12 @@ export function getTelegramSequentialKey(ctx: {
|
||||
}
|
||||
const msg =
|
||||
ctx.message ??
|
||||
ctx.channelPost ??
|
||||
ctx.editedChannelPost ??
|
||||
ctx.update?.message ??
|
||||
ctx.update?.edited_message ??
|
||||
ctx.update?.channel_post ??
|
||||
ctx.update?.edited_channel_post ??
|
||||
ctx.update?.callback_query?.message;
|
||||
const chatId = msg?.chat?.id ?? ctx.chat?.id;
|
||||
const rawText = msg?.text ?? msg?.caption;
|
||||
|
||||
Reference in New Issue
Block a user