Files
openclaw/extensions/feishu/src/reply-dispatcher.ts
2026-02-14 05:24:27 -06:00

240 lines
7.5 KiB
TypeScript

import {
createReplyPrefixContext,
createTypingCallbacks,
logTypingFailure,
type ClawdbotConfig,
type ReplyPayload,
type RuntimeEnv,
} from "openclaw/plugin-sdk";
import type { MentionTarget } from "./mention.js";
import { resolveFeishuAccount } from "./accounts.js";
import { createFeishuClient } from "./client.js";
import { buildMentionedCardContent } from "./mention.js";
import { getFeishuRuntime } from "./runtime.js";
import { sendMarkdownCardFeishu, sendMessageFeishu } from "./send.js";
import { FeishuStreamingSession } from "./streaming-card.js";
import { resolveReceiveIdType } from "./targets.js";
import { addTypingIndicator, removeTypingIndicator, type TypingIndicatorState } from "./typing.js";
/** Detect if text contains markdown elements that benefit from card rendering */
function shouldUseCard(text: string): boolean {
return /```[\s\S]*?```/.test(text) || /\|.+\|[\r\n]+\|[-:| ]+\|/.test(text);
}
export type CreateFeishuReplyDispatcherParams = {
cfg: ClawdbotConfig;
agentId: string;
runtime: RuntimeEnv;
chatId: string;
replyToMessageId?: string;
mentionTargets?: MentionTarget[];
accountId?: string;
};
export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherParams) {
const core = getFeishuRuntime();
const { cfg, agentId, chatId, replyToMessageId, mentionTargets, accountId } = params;
const account = resolveFeishuAccount({ cfg, accountId });
const prefixContext = createReplyPrefixContext({ cfg, agentId });
let typingState: TypingIndicatorState | null = null;
const typingCallbacks = createTypingCallbacks({
start: async () => {
if (!replyToMessageId) {
return;
}
typingState = await addTypingIndicator({ cfg, messageId: replyToMessageId, accountId });
},
stop: async () => {
if (!typingState) {
return;
}
await removeTypingIndicator({ cfg, state: typingState, accountId });
typingState = null;
},
onStartError: (err) =>
logTypingFailure({
log: (message) => params.runtime.log?.(message),
channel: "feishu",
action: "start",
error: err,
}),
onStopError: (err) =>
logTypingFailure({
log: (message) => params.runtime.log?.(message),
channel: "feishu",
action: "stop",
error: err,
}),
});
const textChunkLimit = core.channel.text.resolveTextChunkLimit(cfg, "feishu", accountId, {
fallbackLimit: 4000,
});
const chunkMode = core.channel.text.resolveChunkMode(cfg, "feishu");
const tableMode = core.channel.text.resolveMarkdownTableMode({ cfg, channel: "feishu" });
const renderMode = account.config?.renderMode ?? "auto";
const streamingEnabled = account.config?.streaming !== false && renderMode !== "raw";
let streaming: FeishuStreamingSession | null = null;
let streamText = "";
let lastPartial = "";
let partialUpdateQueue: Promise<void> = Promise.resolve();
let streamingStartPromise: Promise<void> | null = null;
const startStreaming = () => {
if (!streamingEnabled || streamingStartPromise || streaming) {
return;
}
streamingStartPromise = (async () => {
const creds =
account.appId && account.appSecret
? { appId: account.appId, appSecret: account.appSecret, domain: account.domain }
: null;
if (!creds) {
return;
}
streaming = new FeishuStreamingSession(createFeishuClient(account), creds, (message) =>
params.runtime.log?.(`feishu[${account.accountId}] ${message}`),
);
try {
await streaming.start(chatId, resolveReceiveIdType(chatId));
} catch (error) {
params.runtime.error?.(`feishu: streaming start failed: ${String(error)}`);
streaming = null;
}
})();
};
const closeStreaming = async () => {
if (streamingStartPromise) {
await streamingStartPromise;
}
await partialUpdateQueue;
if (streaming?.isActive()) {
let text = streamText;
if (mentionTargets?.length) {
text = buildMentionedCardContent(mentionTargets, text);
}
await streaming.close(text);
}
streaming = null;
streamingStartPromise = null;
streamText = "";
lastPartial = "";
};
const { dispatcher, replyOptions, markDispatchIdle } =
core.channel.reply.createReplyDispatcherWithTyping({
responsePrefix: prefixContext.responsePrefix,
responsePrefixContextProvider: prefixContext.responsePrefixContextProvider,
humanDelay: core.channel.reply.resolveHumanDelayConfig(cfg, agentId),
onReplyStart: () => {
if (streamingEnabled && renderMode === "card") {
startStreaming();
}
void typingCallbacks.onReplyStart?.();
},
deliver: async (payload: ReplyPayload, info) => {
const text = payload.text ?? "";
if (!text.trim()) {
return;
}
const useCard = renderMode === "card" || (renderMode === "auto" && shouldUseCard(text));
if ((info?.kind === "block" || info?.kind === "final") && streamingEnabled && useCard) {
startStreaming();
if (streamingStartPromise) {
await streamingStartPromise;
}
}
if (streaming?.isActive()) {
if (info?.kind === "final") {
streamText = text;
await closeStreaming();
}
return;
}
let first = true;
if (useCard) {
for (const chunk of core.channel.text.chunkTextWithMode(
text,
textChunkLimit,
chunkMode,
)) {
await sendMarkdownCardFeishu({
cfg,
to: chatId,
text: chunk,
replyToMessageId,
mentions: first ? mentionTargets : undefined,
accountId,
});
first = false;
}
} else {
const converted = core.channel.text.convertMarkdownTables(text, tableMode);
for (const chunk of core.channel.text.chunkTextWithMode(
converted,
textChunkLimit,
chunkMode,
)) {
await sendMessageFeishu({
cfg,
to: chatId,
text: chunk,
replyToMessageId,
mentions: first ? mentionTargets : undefined,
accountId,
});
first = false;
}
}
},
onError: async (error, info) => {
params.runtime.error?.(
`feishu[${account.accountId}] ${info.kind} reply failed: ${String(error)}`,
);
await closeStreaming();
typingCallbacks.onIdle?.();
},
onIdle: async () => {
await closeStreaming();
typingCallbacks.onIdle?.();
},
onCleanup: () => {
typingCallbacks.onCleanup?.();
},
});
return {
dispatcher,
replyOptions: {
...replyOptions,
onModelSelected: prefixContext.onModelSelected,
onPartialReply: streamingEnabled
? (payload: ReplyPayload) => {
if (!payload.text || payload.text === lastPartial) {
return;
}
lastPartial = payload.text;
streamText = payload.text;
partialUpdateQueue = partialUpdateQueue.then(async () => {
if (streamingStartPromise) {
await streamingStartPromise;
}
if (streaming?.isActive()) {
await streaming.update(streamText);
}
});
}
: undefined,
},
markDispatchIdle,
};
}