Files
openclaw/extensions/feishu/src/monitor.transport.ts
2026-03-04 02:35:12 -05:00

164 lines
4.2 KiB
TypeScript

import * as http from "http";
import * as Lark from "@larksuiteoapi/node-sdk";
import {
applyBasicWebhookRequestGuards,
type RuntimeEnv,
installRequestBodyLimitGuard,
} from "openclaw/plugin-sdk/feishu";
import { createFeishuWSClient } from "./client.js";
import {
botOpenIds,
FEISHU_WEBHOOK_BODY_TIMEOUT_MS,
FEISHU_WEBHOOK_MAX_BODY_BYTES,
feishuWebhookRateLimiter,
httpServers,
recordWebhookStatus,
wsClients,
} from "./monitor.state.js";
import type { ResolvedFeishuAccount } from "./types.js";
export type MonitorTransportParams = {
account: ResolvedFeishuAccount;
accountId: string;
runtime?: RuntimeEnv;
abortSignal?: AbortSignal;
eventDispatcher: Lark.EventDispatcher;
};
export async function monitorWebSocket({
account,
accountId,
runtime,
abortSignal,
eventDispatcher,
}: MonitorTransportParams): Promise<void> {
const log = runtime?.log ?? console.log;
log(`feishu[${accountId}]: starting WebSocket connection...`);
const wsClient = createFeishuWSClient(account);
wsClients.set(accountId, wsClient);
return new Promise((resolve, reject) => {
const cleanup = () => {
wsClients.delete(accountId);
botOpenIds.delete(accountId);
};
const handleAbort = () => {
log(`feishu[${accountId}]: abort signal received, stopping`);
cleanup();
resolve();
};
if (abortSignal?.aborted) {
cleanup();
resolve();
return;
}
abortSignal?.addEventListener("abort", handleAbort, { once: true });
try {
wsClient.start({ eventDispatcher });
log(`feishu[${accountId}]: WebSocket client started`);
} catch (err) {
cleanup();
abortSignal?.removeEventListener("abort", handleAbort);
reject(err);
}
});
}
export async function monitorWebhook({
account,
accountId,
runtime,
abortSignal,
eventDispatcher,
}: MonitorTransportParams): Promise<void> {
const log = runtime?.log ?? console.log;
const error = runtime?.error ?? console.error;
const port = account.config.webhookPort ?? 3000;
const path = account.config.webhookPath ?? "/feishu/events";
const host = account.config.webhookHost ?? "127.0.0.1";
log(`feishu[${accountId}]: starting Webhook server on ${host}:${port}, path ${path}...`);
const server = http.createServer();
const webhookHandler = Lark.adaptDefault(path, eventDispatcher, { autoChallenge: true });
server.on("request", (req, res) => {
res.on("finish", () => {
recordWebhookStatus(runtime, accountId, path, res.statusCode);
});
const rateLimitKey = `${accountId}:${path}:${req.socket.remoteAddress ?? "unknown"}`;
if (
!applyBasicWebhookRequestGuards({
req,
res,
rateLimiter: feishuWebhookRateLimiter,
rateLimitKey,
nowMs: Date.now(),
requireJsonContentType: true,
})
) {
return;
}
const guard = installRequestBodyLimitGuard(req, res, {
maxBytes: FEISHU_WEBHOOK_MAX_BODY_BYTES,
timeoutMs: FEISHU_WEBHOOK_BODY_TIMEOUT_MS,
responseFormat: "text",
});
if (guard.isTripped()) {
return;
}
void Promise.resolve(webhookHandler(req, res))
.catch((err) => {
if (!guard.isTripped()) {
error(`feishu[${accountId}]: webhook handler error: ${String(err)}`);
}
})
.finally(() => {
guard.dispose();
});
});
httpServers.set(accountId, server);
return new Promise((resolve, reject) => {
const cleanup = () => {
server.close();
httpServers.delete(accountId);
botOpenIds.delete(accountId);
};
const handleAbort = () => {
log(`feishu[${accountId}]: abort signal received, stopping Webhook server`);
cleanup();
resolve();
};
if (abortSignal?.aborted) {
cleanup();
resolve();
return;
}
abortSignal?.addEventListener("abort", handleAbort, { once: true });
server.listen(port, host, () => {
log(`feishu[${accountId}]: Webhook server listening on ${host}:${port}`);
});
server.on("error", (err) => {
error(`feishu[${accountId}]: Webhook server error: ${err}`);
abortSignal?.removeEventListener("abort", handleAbort);
reject(err);
});
});
}