feat(gateway): add channel health monitor with auto-restart

This commit is contained in:
David Szarzynski
2026-02-12 11:45:31 +07:00
committed by Peter Steinberger
parent 68489a213f
commit 497e2d76ad
2 changed files with 512 additions and 0 deletions

View File

@@ -0,0 +1,347 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { ChannelId } from "../channels/plugins/types.js";
import type { ChannelAccountSnapshot } from "../channels/plugins/types.js";
import type { ChannelManager, ChannelRuntimeSnapshot } from "./server-channels.js";
import { startChannelHealthMonitor } from "./channel-health-monitor.js";
function createMockChannelManager(overrides?: Partial<ChannelManager>): ChannelManager {
return {
getRuntimeSnapshot: vi.fn(() => ({ channels: {}, channelAccounts: {} })),
startChannels: vi.fn(async () => {}),
startChannel: vi.fn(async () => {}),
stopChannel: vi.fn(async () => {}),
markChannelLoggedOut: vi.fn(),
isManuallyStopped: vi.fn(() => false),
resetRestartAttempts: vi.fn(),
...overrides,
};
}
function snapshotWith(
accounts: Record<string, Record<string, Partial<ChannelAccountSnapshot>>>,
): ChannelRuntimeSnapshot {
const channels: ChannelRuntimeSnapshot["channels"] = {};
const channelAccounts: ChannelRuntimeSnapshot["channelAccounts"] = {};
for (const [channelId, accts] of Object.entries(accounts)) {
const resolved: Record<string, ChannelAccountSnapshot> = {};
for (const [accountId, partial] of Object.entries(accts)) {
resolved[accountId] = { accountId, ...partial };
}
channelAccounts[channelId as ChannelId] = resolved;
const firstId = Object.keys(accts)[0];
if (firstId) {
channels[channelId as ChannelId] = resolved[firstId];
}
}
return { channels, channelAccounts };
}
describe("channel-health-monitor", () => {
beforeEach(() => {
vi.useFakeTimers();
});
afterEach(() => {
vi.useRealTimers();
});
it("does not run before the grace period", async () => {
const manager = createMockChannelManager();
const monitor = startChannelHealthMonitor({
channelManager: manager,
checkIntervalMs: 5_000,
startupGraceMs: 60_000,
});
await vi.advanceTimersByTimeAsync(10_000);
expect(manager.getRuntimeSnapshot).not.toHaveBeenCalled();
monitor.stop();
});
it("runs health check after grace period", async () => {
const manager = createMockChannelManager();
const monitor = startChannelHealthMonitor({
channelManager: manager,
checkIntervalMs: 5_000,
startupGraceMs: 1_000,
});
await vi.advanceTimersByTimeAsync(6_500);
expect(manager.getRuntimeSnapshot).toHaveBeenCalled();
monitor.stop();
});
it("skips healthy channels (running + connected)", async () => {
const manager = createMockChannelManager({
getRuntimeSnapshot: vi.fn(() =>
snapshotWith({
discord: {
default: { running: true, connected: true, enabled: true, configured: true },
},
}),
),
});
const monitor = startChannelHealthMonitor({
channelManager: manager,
checkIntervalMs: 5_000,
startupGraceMs: 0,
});
await vi.advanceTimersByTimeAsync(5_500);
expect(manager.stopChannel).not.toHaveBeenCalled();
expect(manager.startChannel).not.toHaveBeenCalled();
monitor.stop();
});
it("skips disabled channels", async () => {
const manager = createMockChannelManager({
getRuntimeSnapshot: vi.fn(() =>
snapshotWith({
imessage: {
default: {
running: false,
enabled: false,
configured: true,
lastError: "disabled",
},
},
}),
),
});
const monitor = startChannelHealthMonitor({
channelManager: manager,
checkIntervalMs: 5_000,
startupGraceMs: 0,
});
await vi.advanceTimersByTimeAsync(5_500);
expect(manager.startChannel).not.toHaveBeenCalled();
monitor.stop();
});
it("skips unconfigured channels", async () => {
const manager = createMockChannelManager({
getRuntimeSnapshot: vi.fn(() =>
snapshotWith({
discord: {
default: { running: false, enabled: true, configured: false },
},
}),
),
});
const monitor = startChannelHealthMonitor({
channelManager: manager,
checkIntervalMs: 5_000,
startupGraceMs: 0,
});
await vi.advanceTimersByTimeAsync(5_500);
expect(manager.startChannel).not.toHaveBeenCalled();
monitor.stop();
});
it("skips manually stopped channels", async () => {
const manager = createMockChannelManager({
getRuntimeSnapshot: vi.fn(() =>
snapshotWith({
discord: {
default: { running: false, enabled: true, configured: true },
},
}),
),
isManuallyStopped: vi.fn(() => true),
});
const monitor = startChannelHealthMonitor({
channelManager: manager,
checkIntervalMs: 5_000,
startupGraceMs: 0,
});
await vi.advanceTimersByTimeAsync(5_500);
expect(manager.startChannel).not.toHaveBeenCalled();
monitor.stop();
});
it("restarts a stuck channel (running but not connected)", async () => {
const manager = createMockChannelManager({
getRuntimeSnapshot: vi.fn(() =>
snapshotWith({
whatsapp: {
default: {
running: true,
connected: false,
enabled: true,
configured: true,
linked: true,
},
},
}),
),
});
const monitor = startChannelHealthMonitor({
channelManager: manager,
checkIntervalMs: 5_000,
startupGraceMs: 0,
});
await vi.advanceTimersByTimeAsync(5_500);
expect(manager.stopChannel).toHaveBeenCalledWith("whatsapp", "default");
expect(manager.resetRestartAttempts).toHaveBeenCalledWith("whatsapp", "default");
expect(manager.startChannel).toHaveBeenCalledWith("whatsapp", "default");
monitor.stop();
});
it("restarts a stopped channel that gave up (reconnectAttempts >= 10)", async () => {
const manager = createMockChannelManager({
getRuntimeSnapshot: vi.fn(() =>
snapshotWith({
discord: {
default: {
running: false,
enabled: true,
configured: true,
reconnectAttempts: 10,
lastError: "Failed to resolve Discord application id",
},
},
}),
),
});
const monitor = startChannelHealthMonitor({
channelManager: manager,
checkIntervalMs: 5_000,
startupGraceMs: 0,
});
await vi.advanceTimersByTimeAsync(5_500);
expect(manager.resetRestartAttempts).toHaveBeenCalledWith("discord", "default");
expect(manager.startChannel).toHaveBeenCalledWith("discord", "default");
monitor.stop();
});
it("restarts a channel that stopped unexpectedly (not running, not manual)", async () => {
const manager = createMockChannelManager({
getRuntimeSnapshot: vi.fn(() =>
snapshotWith({
telegram: {
default: {
running: false,
enabled: true,
configured: true,
lastError: "polling stopped unexpectedly",
},
},
}),
),
});
const monitor = startChannelHealthMonitor({
channelManager: manager,
checkIntervalMs: 5_000,
startupGraceMs: 0,
});
await vi.advanceTimersByTimeAsync(5_500);
expect(manager.resetRestartAttempts).toHaveBeenCalledWith("telegram", "default");
expect(manager.startChannel).toHaveBeenCalledWith("telegram", "default");
monitor.stop();
});
it("applies cooldown — skips recently restarted channels for 2 cycles", async () => {
const manager = createMockChannelManager({
getRuntimeSnapshot: vi.fn(() =>
snapshotWith({
discord: {
default: {
running: false,
enabled: true,
configured: true,
lastError: "crashed",
},
},
}),
),
});
const monitor = startChannelHealthMonitor({
channelManager: manager,
checkIntervalMs: 5_000,
startupGraceMs: 0,
});
await vi.advanceTimersByTimeAsync(5_500);
expect(manager.startChannel).toHaveBeenCalledTimes(1);
await vi.advanceTimersByTimeAsync(5_000);
expect(manager.startChannel).toHaveBeenCalledTimes(1);
await vi.advanceTimersByTimeAsync(5_000);
expect(manager.startChannel).toHaveBeenCalledTimes(1);
await vi.advanceTimersByTimeAsync(5_000);
expect(manager.startChannel).toHaveBeenCalledTimes(2);
monitor.stop();
});
it("caps at 3 health-monitor restarts per channel per hour", async () => {
const manager = createMockChannelManager({
getRuntimeSnapshot: vi.fn(() =>
snapshotWith({
discord: {
default: {
running: false,
enabled: true,
configured: true,
lastError: "keeps crashing",
},
},
}),
),
});
const monitor = startChannelHealthMonitor({
channelManager: manager,
checkIntervalMs: 1_000,
startupGraceMs: 0,
cooldownCycles: 1,
maxRestartsPerHour: 3,
});
await vi.advanceTimersByTimeAsync(1_500);
await vi.advanceTimersByTimeAsync(2_000);
await vi.advanceTimersByTimeAsync(2_000);
expect(manager.startChannel).toHaveBeenCalledTimes(3);
await vi.advanceTimersByTimeAsync(2_000);
expect(manager.startChannel).toHaveBeenCalledTimes(3);
monitor.stop();
});
it("stops cleanly", async () => {
const manager = createMockChannelManager();
const monitor = startChannelHealthMonitor({
channelManager: manager,
checkIntervalMs: 5_000,
startupGraceMs: 0,
});
monitor.stop();
await vi.advanceTimersByTimeAsync(10_000);
expect(manager.getRuntimeSnapshot).not.toHaveBeenCalled();
});
it("stops via abort signal", async () => {
const manager = createMockChannelManager();
const abort = new AbortController();
const monitor = startChannelHealthMonitor({
channelManager: manager,
checkIntervalMs: 5_000,
startupGraceMs: 0,
abortSignal: abort.signal,
});
abort.abort();
await vi.advanceTimersByTimeAsync(10_000);
expect(manager.getRuntimeSnapshot).not.toHaveBeenCalled();
monitor.stop();
});
it("treats running channels without a connected field as healthy", async () => {
const manager = createMockChannelManager({
getRuntimeSnapshot: vi.fn(() =>
snapshotWith({
slack: {
default: { running: true, enabled: true, configured: true },
},
}),
),
});
const monitor = startChannelHealthMonitor({
channelManager: manager,
checkIntervalMs: 5_000,
startupGraceMs: 0,
});
await vi.advanceTimersByTimeAsync(5_500);
expect(manager.stopChannel).not.toHaveBeenCalled();
monitor.stop();
});
});

View File

@@ -0,0 +1,165 @@
import type { ChannelId } from "../channels/plugins/types.js";
import type { ChannelManager } from "./server-channels.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
const log = createSubsystemLogger("gateway/health-monitor");
const DEFAULT_CHECK_INTERVAL_MS = 5 * 60_000;
const DEFAULT_STARTUP_GRACE_MS = 60_000;
const DEFAULT_COOLDOWN_CYCLES = 2;
const DEFAULT_MAX_RESTARTS_PER_HOUR = 3;
const ONE_HOUR_MS = 60 * 60_000;
export type ChannelHealthMonitorDeps = {
channelManager: ChannelManager;
checkIntervalMs?: number;
startupGraceMs?: number;
cooldownCycles?: number;
maxRestartsPerHour?: number;
abortSignal?: AbortSignal;
};
export type ChannelHealthMonitor = {
stop: () => void;
};
type RestartRecord = {
lastRestartAt: number;
restartsThisHour: { at: number }[];
};
function isChannelHealthy(snapshot: {
running?: boolean;
connected?: boolean;
enabled?: boolean;
configured?: boolean;
}): boolean {
if (!snapshot.enabled || !snapshot.configured) {
return true;
}
if (!snapshot.running) {
return false;
}
if (snapshot.connected === false) {
return false;
}
return true;
}
export function startChannelHealthMonitor(deps: ChannelHealthMonitorDeps): ChannelHealthMonitor {
const {
channelManager,
checkIntervalMs = DEFAULT_CHECK_INTERVAL_MS,
startupGraceMs = DEFAULT_STARTUP_GRACE_MS,
cooldownCycles = DEFAULT_COOLDOWN_CYCLES,
maxRestartsPerHour = DEFAULT_MAX_RESTARTS_PER_HOUR,
abortSignal,
} = deps;
const cooldownMs = cooldownCycles * checkIntervalMs;
const restartRecords = new Map<string, RestartRecord>();
const startedAt = Date.now();
let stopped = false;
let timer: ReturnType<typeof setInterval> | null = null;
const rKey = (channelId: string, accountId: string) => `${channelId}:${accountId}`;
function pruneOldRestarts(record: RestartRecord, now: number) {
record.restartsThisHour = record.restartsThisHour.filter((r) => now - r.at < ONE_HOUR_MS);
}
async function runCheck() {
if (stopped) {
return;
}
const now = Date.now();
if (now - startedAt < startupGraceMs) {
return;
}
const snapshot = channelManager.getRuntimeSnapshot();
for (const [channelId, accounts] of Object.entries(snapshot.channelAccounts)) {
if (!accounts) {
continue;
}
for (const [accountId, status] of Object.entries(accounts)) {
if (!status) {
continue;
}
if (!status.enabled || !status.configured) {
continue;
}
if (channelManager.isManuallyStopped(channelId as ChannelId, accountId)) {
continue;
}
if (isChannelHealthy(status)) {
continue;
}
const key = rKey(channelId, accountId);
const record = restartRecords.get(key) ?? {
lastRestartAt: 0,
restartsThisHour: [],
};
if (now - record.lastRestartAt <= cooldownMs) {
continue;
}
pruneOldRestarts(record, now);
if (record.restartsThisHour.length >= maxRestartsPerHour) {
log.warn?.(
`[${channelId}:${accountId}] health-monitor: hit ${maxRestartsPerHour} restarts/hour limit, skipping`,
);
continue;
}
const reason = !status.running
? status.reconnectAttempts && status.reconnectAttempts >= 10
? "gave-up"
: "stopped"
: "stuck";
log.info?.(`[${channelId}:${accountId}] health-monitor: restarting (reason: ${reason})`);
try {
if (status.running) {
await channelManager.stopChannel(channelId as ChannelId, accountId);
}
channelManager.resetRestartAttempts(channelId as ChannelId, accountId);
await channelManager.startChannel(channelId as ChannelId, accountId);
record.lastRestartAt = now;
record.restartsThisHour.push({ at: now });
restartRecords.set(key, record);
} catch (err) {
log.error?.(`[${channelId}:${accountId}] health-monitor: restart failed: ${String(err)}`);
}
}
}
}
function stop() {
stopped = true;
if (timer) {
clearInterval(timer);
timer = null;
}
}
if (abortSignal?.aborted) {
stopped = true;
} else {
abortSignal?.addEventListener("abort", stop, { once: true });
timer = setInterval(() => void runCheck(), checkIntervalMs);
if (typeof timer === "object" && "unref" in timer) {
timer.unref();
}
log.info?.(
`started (interval: ${Math.round(checkIntervalMs / 1000)}s, grace: ${Math.round(startupGraceMs / 1000)}s)`,
);
}
return { stop };
}