feat(feishu): add broadcast support for multi-agent groups (#29575)
* feat(feishu): add broadcast support for multi-agent group observation When multiple agents share a Feishu group chat, only the @mentioned agent receives the message. This prevents observer agents from building session memory of group activity they weren't directly addressed in. Adds broadcast support (reusing the same cfg.broadcast schema as WhatsApp) so all configured agents receive every group message in their session transcripts. Only the @mentioned agent responds on Feishu; observer agents process silently via no-op dispatchers. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix(feishu): guard sequential broadcast dispatch against single-agent failure Wrap each dispatchForAgent() call in the sequential loop with try/catch so one agent's dispatch failure doesn't abort delivery to remaining agents. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix(feishu): avoid duplicate messages in broadcast observer mode and normalize agent IDs - Skip recordPendingHistoryEntryIfEnabled for broadcast groups when not mentioned, since the message is dispatched directly to all agents. Previously the message appeared twice in the agent prompt. - Normalize agent IDs with toLowerCase() before membership checks so config casing mismatches don't silently skip valid agents. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix(feishu): set WasMentioned per-agent and normalize broadcast IDs - buildCtxPayloadForAgent now takes a wasMentioned parameter so active agents get WasMentioned=true and observers get false (P1 fix) - Normalize broadcastAgents to lowercase at resolution time and lowercase activeAgentId so all comparisons and session key generation use canonical IDs regardless of config casing (P2 fix) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix(feishu): canonicalize broadcast agent IDs with normalizeAgentId * fix(feishu): match ReplyDispatcher sync return types for noop dispatcher The upstream ReplyDispatcher changed sendToolResult/sendBlockReply/ sendFinalReply to synchronous (returning boolean). Update the broadcast observer noop dispatcher to match. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix(feishu): deduplicate broadcast agent IDs after normalization Config entries like "Main" and "main" collapse to the same canonical ID after normalizeAgentId but were dispatched multiple times. Use Set to deduplicate after normalization. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix(feishu): honor requireMention=false when selecting broadcast responder When requireMention is false, the routed agent should be active (reply on Feishu) even without an explicit @mention. Previously activeAgentId was null whenever ctx.mentionedBot was false, so all agents got the noop dispatcher and no reply was sent — silently breaking groups that disabled mention gating. Hoist requireMention out of the if(isGroup) block so it's accessible in the dispatch code. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix(feishu): cross-account broadcast dedup to prevent duplicate dispatches In multi-account Feishu setups, the same message event is delivered to every bot account in a group. Without cross-account dedup, each account independently dispatches broadcast agents, causing 2×N dispatches instead of N (where N = number of broadcast agents). Two changes: 1. requireMention=true + bot not mentioned: return early instead of falling through to broadcast. The mentioned bot's handler will dispatch for all agents. Non-mentioned handlers record to history. 2. Add cross-account broadcast dedup using a shared 'broadcast' namespace (tryRecordMessagePersistent). The first handler to reach the broadcast block claims the message; subsequent accounts skip. This handles the requireMention=false multi-account case. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix(feishu): strip CommandAuthorized from broadcast observer contexts Broadcast observer agents inherited CommandAuthorized from the sender, causing slash commands (e.g. /reset) to silently execute on every observer session. Now only the active agent retains CommandAuthorized; observers have it stripped before dispatch. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix(feishu): use actual mention state for broadcast WasMentioned The active broadcast agent's WasMentioned was set to true whenever requireMention=false, even when the bot was not actually @mentioned. Now uses ctx.mentionedBot && agentId === activeAgentId, consistent with the single-agent path which passes ctx.mentionedBot directly. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix(feishu): skip history buffer for broadcast accounts and log parallel failures 1. In requireMention groups with broadcast, non-mentioned accounts no longer buffer pending history — the mentioned handler's broadcast dispatch already writes turns into all agent sessions. Buffering caused duplicate replay via buildPendingHistoryContextFromMap. 2. Parallel broadcast dispatch now inspects Promise.allSettled results and logs rejected entries, matching the sequential path's per-agent error logging. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Changelog: note Feishu multi-agent broadcast dispatch * Changelog: restore author credit for Feishu broadcast entry --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com> Co-authored-by: Tak Hoffman <781889+Takhoffman@users.noreply.github.com>
This commit is contained in:
@@ -6,6 +6,7 @@ import {
|
||||
createScopedPairingAccess,
|
||||
DEFAULT_GROUP_HISTORY_LIMIT,
|
||||
type HistoryEntry,
|
||||
normalizeAgentId,
|
||||
recordPendingHistoryEntryIfEnabled,
|
||||
resolveOpenProviderRuntimeGroupPolicy,
|
||||
resolveDefaultGroupPolicy,
|
||||
@@ -698,6 +699,31 @@ async function resolveFeishuMediaList(params: {
|
||||
return out;
|
||||
}
|
||||
|
||||
// --- Broadcast support ---
|
||||
// Resolve broadcast agent list for a given peer (group) ID.
|
||||
// Returns null if no broadcast config exists or the peer is not in the broadcast list.
|
||||
export function resolveBroadcastAgents(cfg: ClawdbotConfig, peerId: string): string[] | null {
|
||||
const broadcast = (cfg as Record<string, unknown>).broadcast;
|
||||
if (!broadcast || typeof broadcast !== "object") return null;
|
||||
const agents = (broadcast as Record<string, unknown>)[peerId];
|
||||
if (!Array.isArray(agents) || agents.length === 0) return null;
|
||||
return agents as string[];
|
||||
}
|
||||
|
||||
// Build a session key for a broadcast target agent by replacing the agent ID prefix.
|
||||
// Session keys follow the format: agent:<agentId>:<channel>:<peerKind>:<peerId>
|
||||
export function buildBroadcastSessionKey(
|
||||
baseSessionKey: string,
|
||||
originalAgentId: string,
|
||||
targetAgentId: string,
|
||||
): string {
|
||||
const prefix = `agent:${originalAgentId}:`;
|
||||
if (baseSessionKey.startsWith(prefix)) {
|
||||
return `agent:${targetAgentId}:${baseSessionKey.slice(prefix.length)}`;
|
||||
}
|
||||
return baseSessionKey;
|
||||
}
|
||||
|
||||
/**
|
||||
* Build media payload for inbound context.
|
||||
* Similar to Discord's buildDiscordMediaPayload().
|
||||
@@ -901,7 +927,12 @@ export async function handleFeishuMessage(params: {
|
||||
const dmPolicy = feishuCfg?.dmPolicy ?? "pairing";
|
||||
const configAllowFrom = feishuCfg?.allowFrom ?? [];
|
||||
const useAccessGroups = cfg.commands?.useAccessGroups !== false;
|
||||
const rawBroadcastAgents = isGroup ? resolveBroadcastAgents(cfg, ctx.chatId) : null;
|
||||
const broadcastAgents = rawBroadcastAgents
|
||||
? [...new Set(rawBroadcastAgents.map((id) => normalizeAgentId(id)))]
|
||||
: null;
|
||||
|
||||
let requireMention = false; // DMs never require mention; groups may override below
|
||||
if (isGroup) {
|
||||
if (groupConfig?.enabled === false) {
|
||||
log(`feishu[${account.accountId}]: group ${ctx.chatId} is disabled`);
|
||||
@@ -956,17 +987,19 @@ export async function handleFeishuMessage(params: {
|
||||
}
|
||||
}
|
||||
|
||||
const { requireMention } = resolveFeishuReplyPolicy({
|
||||
({ requireMention } = resolveFeishuReplyPolicy({
|
||||
isDirectMessage: false,
|
||||
globalConfig: feishuCfg,
|
||||
groupConfig,
|
||||
});
|
||||
}));
|
||||
|
||||
if (requireMention && !ctx.mentionedBot) {
|
||||
log(
|
||||
`feishu[${account.accountId}]: message in group ${ctx.chatId} did not mention bot, recording to history`,
|
||||
);
|
||||
if (chatHistories && groupHistoryKey) {
|
||||
log(`feishu[${account.accountId}]: message in group ${ctx.chatId} did not mention bot`);
|
||||
// Record to pending history for non-broadcast groups only. For broadcast groups,
|
||||
// the mentioned handler's broadcast dispatch writes the turn directly into all
|
||||
// agent sessions — buffering here would cause duplicate replay when this account
|
||||
// later becomes active via buildPendingHistoryContextFromMap.
|
||||
if (!broadcastAgents && chatHistories && groupHistoryKey) {
|
||||
recordPendingHistoryEntryIfEnabled({
|
||||
historyMap: chatHistories,
|
||||
historyKey: groupHistoryKey,
|
||||
@@ -1208,82 +1241,230 @@ export async function handleFeishuMessage(params: {
|
||||
}))
|
||||
: undefined;
|
||||
|
||||
const ctxPayload = core.channel.reply.finalizeInboundContext({
|
||||
Body: combinedBody,
|
||||
BodyForAgent: messageBody,
|
||||
InboundHistory: inboundHistory,
|
||||
// Quote/reply message support: use standard ReplyToId for parent,
|
||||
// and pass root_id for thread reconstruction.
|
||||
ReplyToId: ctx.parentId,
|
||||
RootMessageId: ctx.rootId,
|
||||
RawBody: ctx.content,
|
||||
CommandBody: ctx.content,
|
||||
From: feishuFrom,
|
||||
To: feishuTo,
|
||||
SessionKey: route.sessionKey,
|
||||
AccountId: route.accountId,
|
||||
ChatType: isGroup ? "group" : "direct",
|
||||
GroupSubject: isGroup ? ctx.chatId : undefined,
|
||||
SenderName: ctx.senderName ?? ctx.senderOpenId,
|
||||
SenderId: ctx.senderOpenId,
|
||||
Provider: "feishu" as const,
|
||||
Surface: "feishu" as const,
|
||||
MessageSid: ctx.messageId,
|
||||
ReplyToBody: quotedContent ?? undefined,
|
||||
Timestamp: Date.now(),
|
||||
WasMentioned: ctx.mentionedBot,
|
||||
CommandAuthorized: commandAuthorized,
|
||||
OriginatingChannel: "feishu" as const,
|
||||
OriginatingTo: feishuTo,
|
||||
...mediaPayload,
|
||||
});
|
||||
// --- Shared context builder for dispatch ---
|
||||
const buildCtxPayloadForAgent = (
|
||||
agentSessionKey: string,
|
||||
agentAccountId: string,
|
||||
wasMentioned: boolean,
|
||||
) =>
|
||||
core.channel.reply.finalizeInboundContext({
|
||||
Body: combinedBody,
|
||||
BodyForAgent: messageBody,
|
||||
InboundHistory: inboundHistory,
|
||||
ReplyToId: ctx.parentId,
|
||||
RootMessageId: ctx.rootId,
|
||||
RawBody: ctx.content,
|
||||
CommandBody: ctx.content,
|
||||
From: feishuFrom,
|
||||
To: feishuTo,
|
||||
SessionKey: agentSessionKey,
|
||||
AccountId: agentAccountId,
|
||||
ChatType: isGroup ? "group" : "direct",
|
||||
GroupSubject: isGroup ? ctx.chatId : undefined,
|
||||
SenderName: ctx.senderName ?? ctx.senderOpenId,
|
||||
SenderId: ctx.senderOpenId,
|
||||
Provider: "feishu" as const,
|
||||
Surface: "feishu" as const,
|
||||
MessageSid: ctx.messageId,
|
||||
ReplyToBody: quotedContent ?? undefined,
|
||||
Timestamp: Date.now(),
|
||||
WasMentioned: wasMentioned,
|
||||
CommandAuthorized: commandAuthorized,
|
||||
OriginatingChannel: "feishu" as const,
|
||||
OriginatingTo: feishuTo,
|
||||
...mediaPayload,
|
||||
});
|
||||
|
||||
// Parse message create_time (Feishu uses millisecond epoch string).
|
||||
const messageCreateTimeMs = event.message.create_time
|
||||
? parseInt(event.message.create_time, 10)
|
||||
: undefined;
|
||||
const replyTargetMessageId = ctx.rootId ?? ctx.messageId;
|
||||
const { dispatcher, replyOptions, markDispatchIdle } = createFeishuReplyDispatcher({
|
||||
cfg,
|
||||
agentId: route.agentId,
|
||||
runtime: runtime as RuntimeEnv,
|
||||
chatId: ctx.chatId,
|
||||
replyToMessageId: replyTargetMessageId,
|
||||
skipReplyToInMessages: !isGroup,
|
||||
replyInThread,
|
||||
rootId: ctx.rootId,
|
||||
threadReply: isGroup ? (groupSession?.threadReply ?? false) : false,
|
||||
mentionTargets: ctx.mentionTargets,
|
||||
accountId: account.accountId,
|
||||
messageCreateTimeMs,
|
||||
});
|
||||
const threadReply = isGroup ? (groupSession?.threadReply ?? false) : false;
|
||||
|
||||
log(`feishu[${account.accountId}]: dispatching to agent (session=${route.sessionKey})`);
|
||||
const { queuedFinal, counts } = await core.channel.reply.withReplyDispatcher({
|
||||
dispatcher,
|
||||
onSettled: () => {
|
||||
markDispatchIdle();
|
||||
},
|
||||
run: () =>
|
||||
core.channel.reply.dispatchReplyFromConfig({
|
||||
ctx: ctxPayload,
|
||||
cfg,
|
||||
dispatcher,
|
||||
replyOptions,
|
||||
}),
|
||||
});
|
||||
if (broadcastAgents) {
|
||||
// Cross-account dedup: in multi-account setups, Feishu delivers the same
|
||||
// event to every bot account in the group. Only one account should handle
|
||||
// broadcast dispatch to avoid duplicate agent sessions and race conditions.
|
||||
// Uses a shared "broadcast" namespace (not per-account) so the first handler
|
||||
// to reach this point claims the message; subsequent accounts skip.
|
||||
if (!(await tryRecordMessagePersistent(ctx.messageId, "broadcast", log))) {
|
||||
log(
|
||||
`feishu[${account.accountId}]: broadcast already claimed by another account for message ${ctx.messageId}; skipping`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
if (isGroup && historyKey && chatHistories) {
|
||||
clearHistoryEntriesIfEnabled({
|
||||
historyMap: chatHistories,
|
||||
historyKey,
|
||||
limit: historyLimit,
|
||||
// --- Broadcast dispatch: send message to all configured agents ---
|
||||
const strategy =
|
||||
((cfg as Record<string, unknown>).broadcast as Record<string, unknown> | undefined)
|
||||
?.strategy || "parallel";
|
||||
const activeAgentId =
|
||||
ctx.mentionedBot || !requireMention ? normalizeAgentId(route.agentId) : null;
|
||||
const agentIds = (cfg.agents?.list ?? []).map((a: { id: string }) => normalizeAgentId(a.id));
|
||||
const hasKnownAgents = agentIds.length > 0;
|
||||
|
||||
log(
|
||||
`feishu[${account.accountId}]: broadcasting to ${broadcastAgents.length} agents (strategy=${strategy}, active=${activeAgentId ?? "none"})`,
|
||||
);
|
||||
|
||||
const dispatchForAgent = async (agentId: string) => {
|
||||
if (hasKnownAgents && !agentIds.includes(normalizeAgentId(agentId))) {
|
||||
log(
|
||||
`feishu[${account.accountId}]: broadcast agent ${agentId} not found in agents.list; skipping`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const agentSessionKey = buildBroadcastSessionKey(route.sessionKey, route.agentId, agentId);
|
||||
const agentCtx = buildCtxPayloadForAgent(
|
||||
agentSessionKey,
|
||||
route.accountId,
|
||||
ctx.mentionedBot && agentId === activeAgentId,
|
||||
);
|
||||
|
||||
if (agentId === activeAgentId) {
|
||||
// Active agent: real Feishu dispatcher (responds on Feishu)
|
||||
const { dispatcher, replyOptions, markDispatchIdle } = createFeishuReplyDispatcher({
|
||||
cfg,
|
||||
agentId,
|
||||
runtime: runtime as RuntimeEnv,
|
||||
chatId: ctx.chatId,
|
||||
replyToMessageId: replyTargetMessageId,
|
||||
skipReplyToInMessages: !isGroup,
|
||||
replyInThread,
|
||||
rootId: ctx.rootId,
|
||||
threadReply,
|
||||
mentionTargets: ctx.mentionTargets,
|
||||
accountId: account.accountId,
|
||||
messageCreateTimeMs,
|
||||
});
|
||||
|
||||
log(
|
||||
`feishu[${account.accountId}]: broadcast active dispatch agent=${agentId} (session=${agentSessionKey})`,
|
||||
);
|
||||
await core.channel.reply.withReplyDispatcher({
|
||||
dispatcher,
|
||||
onSettled: () => markDispatchIdle(),
|
||||
run: () =>
|
||||
core.channel.reply.dispatchReplyFromConfig({
|
||||
ctx: agentCtx,
|
||||
cfg,
|
||||
dispatcher,
|
||||
replyOptions,
|
||||
}),
|
||||
});
|
||||
} else {
|
||||
// Observer agent: no-op dispatcher (session entry + inference, no Feishu reply).
|
||||
// Strip CommandAuthorized so slash commands (e.g. /reset) don't silently
|
||||
// mutate observer sessions — only the active agent should execute commands.
|
||||
delete (agentCtx as Record<string, unknown>).CommandAuthorized;
|
||||
const noopDispatcher = {
|
||||
sendToolResult: () => false,
|
||||
sendBlockReply: () => false,
|
||||
sendFinalReply: () => false,
|
||||
waitForIdle: async () => {},
|
||||
getQueuedCounts: () => ({ tool: 0, block: 0, final: 0 }),
|
||||
markComplete: () => {},
|
||||
};
|
||||
|
||||
log(
|
||||
`feishu[${account.accountId}]: broadcast observer dispatch agent=${agentId} (session=${agentSessionKey})`,
|
||||
);
|
||||
await core.channel.reply.withReplyDispatcher({
|
||||
dispatcher: noopDispatcher,
|
||||
run: () =>
|
||||
core.channel.reply.dispatchReplyFromConfig({
|
||||
ctx: agentCtx,
|
||||
cfg,
|
||||
dispatcher: noopDispatcher,
|
||||
}),
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
if (strategy === "sequential") {
|
||||
for (const agentId of broadcastAgents) {
|
||||
try {
|
||||
await dispatchForAgent(agentId);
|
||||
} catch (err) {
|
||||
log(
|
||||
`feishu[${account.accountId}]: broadcast dispatch failed for agent=${agentId}: ${String(err)}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
const results = await Promise.allSettled(broadcastAgents.map(dispatchForAgent));
|
||||
for (let i = 0; i < results.length; i++) {
|
||||
if (results[i].status === "rejected") {
|
||||
log(
|
||||
`feishu[${account.accountId}]: broadcast dispatch failed for agent=${broadcastAgents[i]}: ${String((results[i] as PromiseRejectedResult).reason)}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (isGroup && historyKey && chatHistories) {
|
||||
clearHistoryEntriesIfEnabled({
|
||||
historyMap: chatHistories,
|
||||
historyKey,
|
||||
limit: historyLimit,
|
||||
});
|
||||
}
|
||||
|
||||
log(
|
||||
`feishu[${account.accountId}]: broadcast dispatch complete for ${broadcastAgents.length} agents`,
|
||||
);
|
||||
} else {
|
||||
// --- Single-agent dispatch (existing behavior) ---
|
||||
const ctxPayload = buildCtxPayloadForAgent(
|
||||
route.sessionKey,
|
||||
route.accountId,
|
||||
ctx.mentionedBot,
|
||||
);
|
||||
|
||||
const { dispatcher, replyOptions, markDispatchIdle } = createFeishuReplyDispatcher({
|
||||
cfg,
|
||||
agentId: route.agentId,
|
||||
runtime: runtime as RuntimeEnv,
|
||||
chatId: ctx.chatId,
|
||||
replyToMessageId: replyTargetMessageId,
|
||||
skipReplyToInMessages: !isGroup,
|
||||
replyInThread,
|
||||
rootId: ctx.rootId,
|
||||
threadReply,
|
||||
mentionTargets: ctx.mentionTargets,
|
||||
accountId: account.accountId,
|
||||
messageCreateTimeMs,
|
||||
});
|
||||
}
|
||||
|
||||
log(
|
||||
`feishu[${account.accountId}]: dispatch complete (queuedFinal=${queuedFinal}, replies=${counts.final})`,
|
||||
);
|
||||
log(`feishu[${account.accountId}]: dispatching to agent (session=${route.sessionKey})`);
|
||||
const { queuedFinal, counts } = await core.channel.reply.withReplyDispatcher({
|
||||
dispatcher,
|
||||
onSettled: () => {
|
||||
markDispatchIdle();
|
||||
},
|
||||
run: () =>
|
||||
core.channel.reply.dispatchReplyFromConfig({
|
||||
ctx: ctxPayload,
|
||||
cfg,
|
||||
dispatcher,
|
||||
replyOptions,
|
||||
}),
|
||||
});
|
||||
|
||||
if (isGroup && historyKey && chatHistories) {
|
||||
clearHistoryEntriesIfEnabled({
|
||||
historyMap: chatHistories,
|
||||
historyKey,
|
||||
limit: historyLimit,
|
||||
});
|
||||
}
|
||||
|
||||
log(
|
||||
`feishu[${account.accountId}]: dispatch complete (queuedFinal=${queuedFinal}, replies=${counts.final})`,
|
||||
);
|
||||
}
|
||||
} catch (err) {
|
||||
error(`feishu[${account.accountId}]: failed to dispatch message: ${String(err)}`);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user