fix: dedupe inbound Telegram DM replies per agent (#40519)
Merged via squash. Prepared head SHA: 6e235e7d1f7a00ef43455a9240b62e24dbc4ef94 Co-authored-by: obviyus <22031114+obviyus@users.noreply.github.com> Co-authored-by: obviyus <22031114+obviyus@users.noreply.github.com> Reviewed-by: @obviyus
This commit is contained in:
@@ -53,6 +53,7 @@ Docs: https://docs.openclaw.ai
|
|||||||
- Docs/Changelog: correct the contributor credit for the bundled Control UI global-install fix to @LarytheLord. (#40420) Thanks @velvet-shark.
|
- Docs/Changelog: correct the contributor credit for the bundled Control UI global-install fix to @LarytheLord. (#40420) Thanks @velvet-shark.
|
||||||
- Models/openai-codex GPT-5.4 forward-compat: use the GPT-5.4 1,050,000-token context window and 128,000 max tokens for `openai-codex/gpt-5.4` instead of inheriting stale legacy Codex limits in resolver fallbacks and model listing. (#37876) thanks @yuweuii.
|
- Models/openai-codex GPT-5.4 forward-compat: use the GPT-5.4 1,050,000-token context window and 128,000 max tokens for `openai-codex/gpt-5.4` instead of inheriting stale legacy Codex limits in resolver fallbacks and model listing. (#37876) thanks @yuweuii.
|
||||||
- Telegram/media downloads: time out only stalled body reads so polling recovers from hung file downloads without aborting slow downloads that are still streaming data. (#40098) thanks @tysoncung.
|
- Telegram/media downloads: time out only stalled body reads so polling recovers from hung file downloads without aborting slow downloads that are still streaming data. (#40098) thanks @tysoncung.
|
||||||
|
- Telegram/DM routing: dedupe inbound Telegram DMs per agent instead of per session key so the same DM cannot trigger duplicate replies when both `agent:main:main` and `agent:main:telegram:direct:<id>` resolve for one agent. Fixes #40005. Supersedes #40116. (#40519) thanks @obviyus.
|
||||||
|
|
||||||
## 2026.3.7
|
## 2026.3.7
|
||||||
|
|
||||||
|
|||||||
@@ -236,7 +236,7 @@ describe("inbound dedupe", () => {
|
|||||||
).toBe(false);
|
).toBe(false);
|
||||||
});
|
});
|
||||||
|
|
||||||
it("does not dedupe across session keys", () => {
|
it("does not dedupe across agent ids", () => {
|
||||||
resetInboundDedupe();
|
resetInboundDedupe();
|
||||||
const base: MsgContext = {
|
const base: MsgContext = {
|
||||||
Provider: "whatsapp",
|
Provider: "whatsapp",
|
||||||
@@ -248,12 +248,36 @@ describe("inbound dedupe", () => {
|
|||||||
shouldSkipDuplicateInbound({ ...base, SessionKey: "agent:alpha:main" }, { now: 100 }),
|
shouldSkipDuplicateInbound({ ...base, SessionKey: "agent:alpha:main" }, { now: 100 }),
|
||||||
).toBe(false);
|
).toBe(false);
|
||||||
expect(
|
expect(
|
||||||
shouldSkipDuplicateInbound({ ...base, SessionKey: "agent:bravo:main" }, { now: 200 }),
|
shouldSkipDuplicateInbound(
|
||||||
|
{ ...base, SessionKey: "agent:bravo:whatsapp:direct:+1555" },
|
||||||
|
{
|
||||||
|
now: 200,
|
||||||
|
},
|
||||||
|
),
|
||||||
).toBe(false);
|
).toBe(false);
|
||||||
expect(
|
expect(
|
||||||
shouldSkipDuplicateInbound({ ...base, SessionKey: "agent:alpha:main" }, { now: 300 }),
|
shouldSkipDuplicateInbound({ ...base, SessionKey: "agent:alpha:main" }, { now: 300 }),
|
||||||
).toBe(true);
|
).toBe(true);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("dedupes when the same agent sees the same inbound message under different session keys", () => {
|
||||||
|
resetInboundDedupe();
|
||||||
|
const base: MsgContext = {
|
||||||
|
Provider: "telegram",
|
||||||
|
OriginatingChannel: "telegram",
|
||||||
|
OriginatingTo: "telegram:7463849194",
|
||||||
|
MessageSid: "msg-1",
|
||||||
|
};
|
||||||
|
expect(
|
||||||
|
shouldSkipDuplicateInbound({ ...base, SessionKey: "agent:main:main" }, { now: 100 }),
|
||||||
|
).toBe(false);
|
||||||
|
expect(
|
||||||
|
shouldSkipDuplicateInbound(
|
||||||
|
{ ...base, SessionKey: "agent:main:telegram:direct:7463849194" },
|
||||||
|
{ now: 200 },
|
||||||
|
),
|
||||||
|
).toBe(true);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe("createInboundDebouncer", () => {
|
describe("createInboundDebouncer", () => {
|
||||||
|
|||||||
@@ -1539,6 +1539,38 @@ describe("dispatchReplyFromConfig", () => {
|
|||||||
expect(replyResolver).toHaveBeenCalledTimes(1);
|
expect(replyResolver).toHaveBeenCalledTimes(1);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("deduplicates same-agent inbound replies across main and direct session keys", async () => {
|
||||||
|
setNoAbort();
|
||||||
|
const cfg = emptyConfig;
|
||||||
|
const replyResolver = vi.fn(async () => ({ text: "hi" }) as ReplyPayload);
|
||||||
|
const baseCtx = buildTestCtx({
|
||||||
|
Provider: "telegram",
|
||||||
|
Surface: "telegram",
|
||||||
|
OriginatingChannel: "telegram",
|
||||||
|
OriginatingTo: "telegram:7463849194",
|
||||||
|
MessageSid: "msg-1",
|
||||||
|
SessionKey: "agent:main:main",
|
||||||
|
});
|
||||||
|
|
||||||
|
await dispatchReplyFromConfig({
|
||||||
|
ctx: baseCtx,
|
||||||
|
cfg,
|
||||||
|
dispatcher: createDispatcher(),
|
||||||
|
replyResolver,
|
||||||
|
});
|
||||||
|
await dispatchReplyFromConfig({
|
||||||
|
ctx: {
|
||||||
|
...baseCtx,
|
||||||
|
SessionKey: "agent:main:telegram:direct:7463849194",
|
||||||
|
},
|
||||||
|
cfg,
|
||||||
|
dispatcher: createDispatcher(),
|
||||||
|
replyResolver,
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(replyResolver).toHaveBeenCalledTimes(1);
|
||||||
|
});
|
||||||
|
|
||||||
it("emits message_received hook with originating channel metadata", async () => {
|
it("emits message_received hook with originating channel metadata", async () => {
|
||||||
setNoAbort();
|
setNoAbort();
|
||||||
hookMocks.runner.hasHooks.mockReturnValue(true);
|
hookMocks.runner.hasHooks.mockReturnValue(true);
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
import { logVerbose, shouldLogVerbose } from "../../globals.js";
|
import { logVerbose, shouldLogVerbose } from "../../globals.js";
|
||||||
import { createDedupeCache, type DedupeCache } from "../../infra/dedupe.js";
|
import { createDedupeCache, type DedupeCache } from "../../infra/dedupe.js";
|
||||||
|
import { parseAgentSessionKey } from "../../sessions/session-key-utils.js";
|
||||||
import type { MsgContext } from "../templating.js";
|
import type { MsgContext } from "../templating.js";
|
||||||
|
|
||||||
const DEFAULT_INBOUND_DEDUPE_TTL_MS = 20 * 60_000;
|
const DEFAULT_INBOUND_DEDUPE_TTL_MS = 20 * 60_000;
|
||||||
@@ -15,6 +16,23 @@ const normalizeProvider = (value?: string | null) => value?.trim().toLowerCase()
|
|||||||
const resolveInboundPeerId = (ctx: MsgContext) =>
|
const resolveInboundPeerId = (ctx: MsgContext) =>
|
||||||
ctx.OriginatingTo ?? ctx.To ?? ctx.From ?? ctx.SessionKey;
|
ctx.OriginatingTo ?? ctx.To ?? ctx.From ?? ctx.SessionKey;
|
||||||
|
|
||||||
|
function resolveInboundDedupeSessionScope(ctx: MsgContext): string {
|
||||||
|
const sessionKey =
|
||||||
|
(ctx.CommandSource === "native" ? ctx.CommandTargetSessionKey : undefined)?.trim() ||
|
||||||
|
ctx.SessionKey?.trim() ||
|
||||||
|
"";
|
||||||
|
if (!sessionKey) {
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
const parsed = parseAgentSessionKey(sessionKey);
|
||||||
|
if (!parsed) {
|
||||||
|
return sessionKey;
|
||||||
|
}
|
||||||
|
// The same physical inbound message should never run twice for the same
|
||||||
|
// agent, even if a routing bug presents it under both main and direct keys.
|
||||||
|
return `agent:${parsed.agentId}`;
|
||||||
|
}
|
||||||
|
|
||||||
export function buildInboundDedupeKey(ctx: MsgContext): string | null {
|
export function buildInboundDedupeKey(ctx: MsgContext): string | null {
|
||||||
const provider = normalizeProvider(ctx.OriginatingChannel ?? ctx.Provider ?? ctx.Surface);
|
const provider = normalizeProvider(ctx.OriginatingChannel ?? ctx.Provider ?? ctx.Surface);
|
||||||
const messageId = ctx.MessageSid?.trim();
|
const messageId = ctx.MessageSid?.trim();
|
||||||
@@ -25,13 +43,13 @@ export function buildInboundDedupeKey(ctx: MsgContext): string | null {
|
|||||||
if (!peerId) {
|
if (!peerId) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
const sessionKey = ctx.SessionKey?.trim() ?? "";
|
const sessionScope = resolveInboundDedupeSessionScope(ctx);
|
||||||
const accountId = ctx.AccountId?.trim() ?? "";
|
const accountId = ctx.AccountId?.trim() ?? "";
|
||||||
const threadId =
|
const threadId =
|
||||||
ctx.MessageThreadId !== undefined && ctx.MessageThreadId !== null
|
ctx.MessageThreadId !== undefined && ctx.MessageThreadId !== null
|
||||||
? String(ctx.MessageThreadId)
|
? String(ctx.MessageThreadId)
|
||||||
: "";
|
: "";
|
||||||
return [provider, accountId, sessionKey, peerId, threadId, messageId].filter(Boolean).join("|");
|
return [provider, accountId, sessionScope, peerId, threadId, messageId].filter(Boolean).join("|");
|
||||||
}
|
}
|
||||||
|
|
||||||
export function shouldSkipDuplicateInbound(
|
export function shouldSkipDuplicateInbound(
|
||||||
|
|||||||
Reference in New Issue
Block a user