From 497e2d76ad45a252d7164ba08c9fdcdd8f3f6f6e Mon Sep 17 00:00:00 2001
From: David Szarzynski
Date: Thu, 12 Feb 2026 11:45:31 +0700
Subject: [PATCH] feat(gateway): add channel health monitor with auto-restart

---
 src/gateway/channel-health-monitor.test.ts | 377 +++++++++++++++++++++
 src/gateway/channel-health-monitor.ts      | 215 ++++++++++++
 2 files changed, 592 insertions(+)
 create mode 100644 src/gateway/channel-health-monitor.test.ts
 create mode 100644 src/gateway/channel-health-monitor.ts

diff --git a/src/gateway/channel-health-monitor.test.ts b/src/gateway/channel-health-monitor.test.ts
new file mode 100644
index 000000000..511543659
--- /dev/null
+++ b/src/gateway/channel-health-monitor.test.ts
@@ -0,0 +1,377 @@
+import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
+import type { ChannelId } from "../channels/plugins/types.js";
+import type { ChannelAccountSnapshot } from "../channels/plugins/types.js";
+import type { ChannelManager, ChannelRuntimeSnapshot } from "./server-channels.js";
+import { startChannelHealthMonitor } from "./channel-health-monitor.js";
+
+function createMockChannelManager(overrides?: Partial<ChannelManager>): ChannelManager {
+  return {
+    getRuntimeSnapshot: vi.fn(() => ({ channels: {}, channelAccounts: {} })),
+    startChannels: vi.fn(async () => {}),
+    startChannel: vi.fn(async () => {}),
+    stopChannel: vi.fn(async () => {}),
+    markChannelLoggedOut: vi.fn(),
+    isManuallyStopped: vi.fn(() => false),
+    resetRestartAttempts: vi.fn(),
+    ...overrides,
+  };
+}
+
+function snapshotWith(
+  accounts: Record<string, Record<string, Partial<ChannelAccountSnapshot>>>,
+): ChannelRuntimeSnapshot {
+  const channels: ChannelRuntimeSnapshot["channels"] = {};
+  const channelAccounts: ChannelRuntimeSnapshot["channelAccounts"] = {};
+  for (const [channelId, accts] of Object.entries(accounts)) {
+    const resolved: Record<string, ChannelAccountSnapshot> = {};
+    for (const [accountId, partial] of Object.entries(accts)) {
+      resolved[accountId] = { accountId, ...partial };
+    }
+    channelAccounts[channelId as ChannelId] = resolved;
+    const firstId = Object.keys(accts)[0];
+    if (firstId) {
+      channels[channelId as ChannelId] = resolved[firstId];
+    }
+  }
+  return { channels, channelAccounts };
+}
+
+describe("channel-health-monitor", () => {
+  beforeEach(() => {
+    vi.useFakeTimers();
+  });
+  afterEach(() => {
+    vi.useRealTimers();
+  });
+
+  it("does not run before the grace period", async () => {
+    const manager = createMockChannelManager();
+    const monitor = startChannelHealthMonitor({
+      channelManager: manager,
+      checkIntervalMs: 5_000,
+      startupGraceMs: 60_000,
+    });
+    await vi.advanceTimersByTimeAsync(10_000);
+    expect(manager.getRuntimeSnapshot).not.toHaveBeenCalled();
+    monitor.stop();
+  });
+
+  it("runs health check after grace period", async () => {
+    const manager = createMockChannelManager();
+    const monitor = startChannelHealthMonitor({
+      channelManager: manager,
+      checkIntervalMs: 5_000,
+      startupGraceMs: 1_000,
+    });
+    await vi.advanceTimersByTimeAsync(6_500);
+    expect(manager.getRuntimeSnapshot).toHaveBeenCalled();
+    monitor.stop();
+  });
+
+  it("skips healthy channels (running + connected)", async () => {
+    const manager = createMockChannelManager({
+      getRuntimeSnapshot: vi.fn(() =>
+        snapshotWith({
+          discord: {
+            default: { running: true, connected: true, enabled: true, configured: true },
+          },
+        }),
+      ),
+    });
+    const monitor = startChannelHealthMonitor({
+      channelManager: manager,
+      checkIntervalMs: 5_000,
+      startupGraceMs: 0,
+    });
+    await vi.advanceTimersByTimeAsync(5_500);
+    expect(manager.stopChannel).not.toHaveBeenCalled();
+    expect(manager.startChannel).not.toHaveBeenCalled();
+    monitor.stop();
+  });
+
+  it("skips disabled channels", async () => {
+    const manager = createMockChannelManager({
+      getRuntimeSnapshot: vi.fn(() =>
+        snapshotWith({
+          imessage: {
+            default: {
+              running: false,
+              enabled: false,
+              configured: true,
+              lastError: "disabled",
+            },
+          },
+        }),
+      ),
+    });
+    const monitor = startChannelHealthMonitor({
+      channelManager: manager,
+      checkIntervalMs: 5_000,
+      startupGraceMs: 0,
+    });
+    await vi.advanceTimersByTimeAsync(5_500);
+    expect(manager.startChannel).not.toHaveBeenCalled();
+    monitor.stop();
+  });
+
+  it("skips unconfigured channels", async () => {
+    const manager = createMockChannelManager({
+      getRuntimeSnapshot: vi.fn(() =>
+        snapshotWith({
+          discord: {
+            default: { running: false, enabled: true, configured: false },
+          },
+        }),
+      ),
+    });
+    const monitor = startChannelHealthMonitor({
+      channelManager: manager,
+      checkIntervalMs: 5_000,
+      startupGraceMs: 0,
+    });
+    await vi.advanceTimersByTimeAsync(5_500);
+    expect(manager.startChannel).not.toHaveBeenCalled();
+    monitor.stop();
+  });
+
+  it("skips manually stopped channels", async () => {
+    const manager = createMockChannelManager({
+      getRuntimeSnapshot: vi.fn(() =>
+        snapshotWith({
+          discord: {
+            default: { running: false, enabled: true, configured: true },
+          },
+        }),
+      ),
+      isManuallyStopped: vi.fn(() => true),
+    });
+    const monitor = startChannelHealthMonitor({
+      channelManager: manager,
+      checkIntervalMs: 5_000,
+      startupGraceMs: 0,
+    });
+    await vi.advanceTimersByTimeAsync(5_500);
+    expect(manager.startChannel).not.toHaveBeenCalled();
+    monitor.stop();
+  });
+
+  it("restarts a stuck channel (running but not connected)", async () => {
+    const manager = createMockChannelManager({
+      getRuntimeSnapshot: vi.fn(() =>
+        snapshotWith({
+          whatsapp: {
+            default: {
+              running: true,
+              connected: false,
+              enabled: true,
+              configured: true,
+              linked: true,
+            },
+          },
+        }),
+      ),
+    });
+    const monitor = startChannelHealthMonitor({
+      channelManager: manager,
+      checkIntervalMs: 5_000,
+      startupGraceMs: 0,
+    });
+    await vi.advanceTimersByTimeAsync(5_500);
+    expect(manager.stopChannel).toHaveBeenCalledWith("whatsapp", "default");
+    expect(manager.resetRestartAttempts).toHaveBeenCalledWith("whatsapp", "default");
+    expect(manager.startChannel).toHaveBeenCalledWith("whatsapp", "default");
+    monitor.stop();
+  });
+
+  it("restarts a stopped channel that gave up (reconnectAttempts >= 10)", async () => {
+    const manager = createMockChannelManager({
+      getRuntimeSnapshot: vi.fn(() =>
+        snapshotWith({
+          discord: {
+            default: {
+              running: false,
+              enabled: true,
+              configured: true,
+              reconnectAttempts: 10,
+              lastError: "Failed to resolve Discord application id",
+            },
+          },
+        }),
+      ),
+    });
+    const monitor = startChannelHealthMonitor({
+      channelManager: manager,
+      checkIntervalMs: 5_000,
+      startupGraceMs: 0,
+    });
+    await vi.advanceTimersByTimeAsync(5_500);
+    expect(manager.resetRestartAttempts).toHaveBeenCalledWith("discord", "default");
+    expect(manager.startChannel).toHaveBeenCalledWith("discord", "default");
+    monitor.stop();
+  });
+
+  it("restarts a channel that stopped unexpectedly (not running, not manual)", async () => {
+    const manager = createMockChannelManager({
+      getRuntimeSnapshot: vi.fn(() =>
+        snapshotWith({
+          telegram: {
+            default: {
+              running: false,
+              enabled: true,
+              configured: true,
+              lastError: "polling stopped unexpectedly",
+            },
+          },
+        }),
+      ),
+    });
+    const monitor = startChannelHealthMonitor({
+      channelManager: manager,
+      checkIntervalMs: 5_000,
+      startupGraceMs: 0,
+    });
+    await vi.advanceTimersByTimeAsync(5_500);
+    expect(manager.resetRestartAttempts).toHaveBeenCalledWith("telegram", "default");
+    expect(manager.startChannel).toHaveBeenCalledWith("telegram", "default");
+    monitor.stop();
+  });
+
+  it("applies cooldown — skips recently restarted channels for 2 cycles", async () => {
+    const manager = createMockChannelManager({
+      getRuntimeSnapshot: vi.fn(() =>
+        snapshotWith({
+          discord: {
+            default: {
+              running: false,
+              enabled: true,
+              configured: true,
+              lastError: "crashed",
+            },
+          },
+        }),
+      ),
+    });
+    const monitor = startChannelHealthMonitor({
+      channelManager: manager,
+      checkIntervalMs: 5_000,
+      startupGraceMs: 0,
+    });
+    await vi.advanceTimersByTimeAsync(5_500);
+    expect(manager.startChannel).toHaveBeenCalledTimes(1);
+    await vi.advanceTimersByTimeAsync(5_000);
+    expect(manager.startChannel).toHaveBeenCalledTimes(1);
+    await vi.advanceTimersByTimeAsync(5_000);
+    expect(manager.startChannel).toHaveBeenCalledTimes(1);
+    await vi.advanceTimersByTimeAsync(5_000);
+    expect(manager.startChannel).toHaveBeenCalledTimes(2);
+    monitor.stop();
+  });
+
+  it("caps at 3 health-monitor restarts per channel per hour", async () => {
+    const manager = createMockChannelManager({
+      getRuntimeSnapshot: vi.fn(() =>
+        snapshotWith({
+          discord: {
+            default: {
+              running: false,
+              enabled: true,
+              configured: true,
+              lastError: "keeps crashing",
+            },
+          },
+        }),
+      ),
+    });
+    const monitor = startChannelHealthMonitor({
+      channelManager: manager,
+      checkIntervalMs: 1_000,
+      startupGraceMs: 0,
+      cooldownCycles: 1,
+      maxRestartsPerHour: 3,
+    });
+    await vi.advanceTimersByTimeAsync(1_500);
+    await vi.advanceTimersByTimeAsync(2_000);
+    await vi.advanceTimersByTimeAsync(2_000);
+    expect(manager.startChannel).toHaveBeenCalledTimes(3);
+    await vi.advanceTimersByTimeAsync(2_000);
+    expect(manager.startChannel).toHaveBeenCalledTimes(3);
+    monitor.stop();
+  });
+
+  it("stops cleanly", async () => {
+    const manager = createMockChannelManager();
+    const monitor = startChannelHealthMonitor({
+      channelManager: manager,
+      checkIntervalMs: 5_000,
+      startupGraceMs: 0,
+    });
+    monitor.stop();
+    await vi.advanceTimersByTimeAsync(10_000);
+    expect(manager.getRuntimeSnapshot).not.toHaveBeenCalled();
+  });
+
+  it("stops via abort signal", async () => {
+    const manager = createMockChannelManager();
+    const abort = new AbortController();
+    const monitor = startChannelHealthMonitor({
+      channelManager: manager,
+      checkIntervalMs: 5_000,
+      startupGraceMs: 0,
+      abortSignal: abort.signal,
+    });
+    abort.abort();
+    await vi.advanceTimersByTimeAsync(10_000);
+    expect(manager.getRuntimeSnapshot).not.toHaveBeenCalled();
+    monitor.stop();
+  });
+
+  it("treats running channels without a connected field as healthy", async () => {
+    const manager = createMockChannelManager({
+      getRuntimeSnapshot: vi.fn(() =>
+        snapshotWith({
+          slack: {
+            default: { running: true, enabled: true, configured: true },
+          },
+        }),
+      ),
+    });
+    const monitor = startChannelHealthMonitor({
+      channelManager: manager,
+      checkIntervalMs: 5_000,
+      startupGraceMs: 0,
+    });
+    await vi.advanceTimersByTimeAsync(5_500);
+    expect(manager.stopChannel).not.toHaveBeenCalled();
+    monitor.stop();
+  });
+
+  it("counts a failed restart toward the cooldown", async () => {
+    const manager = createMockChannelManager({
+      getRuntimeSnapshot: vi.fn(() =>
+        snapshotWith({
+          discord: {
+            default: {
+              running: false,
+              enabled: true,
+              configured: true,
+              lastError: "crashed",
+            },
+          },
+        }),
+      ),
+      startChannel: vi.fn(async () => {
+        throw new Error("restart failed");
+      }),
+    });
+    const monitor = startChannelHealthMonitor({
+      channelManager: manager,
+      checkIntervalMs: 5_000,
+      startupGraceMs: 0,
+    });
+    await vi.advanceTimersByTimeAsync(5_500);
+    expect(manager.startChannel).toHaveBeenCalledTimes(1);
+    await vi.advanceTimersByTimeAsync(5_000);
+    expect(manager.startChannel).toHaveBeenCalledTimes(1);
+    monitor.stop();
+  });
+});
diff --git a/src/gateway/channel-health-monitor.ts b/src/gateway/channel-health-monitor.ts
new file mode 100644
index 000000000..bfed61983
--- /dev/null
+++ b/src/gateway/channel-health-monitor.ts
@@ -0,0 +1,215 @@
+import type { ChannelId } from "../channels/plugins/types.js";
+import type { ChannelManager } from "./server-channels.js";
+import { createSubsystemLogger } from "../logging/subsystem.js";
+
+const log = createSubsystemLogger("gateway/health-monitor");
+
+const DEFAULT_CHECK_INTERVAL_MS = 5 * 60_000;
+const DEFAULT_STARTUP_GRACE_MS = 60_000;
+const DEFAULT_COOLDOWN_CYCLES = 2;
+const DEFAULT_MAX_RESTARTS_PER_HOUR = 3;
+const ONE_HOUR_MS = 60 * 60_000;
+/** Reconnect attempts at or above which a stopped channel is logged as "gave-up". */
+const GAVE_UP_RECONNECT_ATTEMPTS = 10;
+
+/** Options for {@link startChannelHealthMonitor}. */
+export type ChannelHealthMonitorDeps = {
+  channelManager: ChannelManager;
+  /** Time between health checks. Defaults to 5 minutes. */
+  checkIntervalMs?: number;
+  /** Checks are skipped until this much time has passed since start. Defaults to 60 seconds. */
+  startupGraceMs?: number;
+  /** Cooldown after a restart attempt, in check intervals. Defaults to 2. */
+  cooldownCycles?: number;
+  /** Restart attempts allowed per channel account within one hour. Defaults to 3. */
+  maxRestartsPerHour?: number;
+  /** Stops the monitor when aborted. */
+  abortSignal?: AbortSignal;
+};
+
+/** Handle returned by {@link startChannelHealthMonitor}. */
+export type ChannelHealthMonitor = {
+  stop: () => void;
+};
+
+type RestartRecord = {
+  lastRestartAt: number;
+  restartsThisHour: { at: number }[];
+};
+
+/**
+ * Reports whether an account needs no intervention. Disabled or unconfigured
+ * accounts count as healthy; otherwise the account must be running and must
+ * not report `connected === false`. A missing `connected` field is healthy.
+ */
+function isChannelHealthy(snapshot: {
+  running?: boolean;
+  connected?: boolean;
+  enabled?: boolean;
+  configured?: boolean;
+}): boolean {
+  if (!snapshot.enabled || !snapshot.configured) {
+    return true;
+  }
+  if (!snapshot.running) {
+    return false;
+  }
+  if (snapshot.connected === false) {
+    return false;
+  }
+  return true;
+}
+
+/**
+ * Starts a periodic check that restarts enabled, configured channel accounts
+ * that are not running or report `connected === false`, unless the channel
+ * manager says they were stopped manually.
+ *
+ * Restart attempts are limited per channel account: after an attempt the
+ * account is skipped for `cooldownCycles` check intervals, and at most
+ * `maxRestartsPerHour` attempts are made within one hour. Failed attempts
+ * count toward both limits.
+ *
+ * @param deps - Channel manager plus optional timing and limit overrides.
+ * @returns A handle whose `stop()` cancels the timer and may be called repeatedly.
+ */
+export function startChannelHealthMonitor(deps: ChannelHealthMonitorDeps): ChannelHealthMonitor {
+  const {
+    channelManager,
+    checkIntervalMs = DEFAULT_CHECK_INTERVAL_MS,
+    startupGraceMs = DEFAULT_STARTUP_GRACE_MS,
+    cooldownCycles = DEFAULT_COOLDOWN_CYCLES,
+    maxRestartsPerHour = DEFAULT_MAX_RESTARTS_PER_HOUR,
+    abortSignal,
+  } = deps;
+
+  const cooldownMs = cooldownCycles * checkIntervalMs;
+  const restartRecords = new Map<string, RestartRecord>();
+  const startedAt = Date.now();
+  let stopped = false;
+  let checkInFlight = false;
+  let timer: ReturnType<typeof setInterval> | null = null;
+
+  const rKey = (channelId: string, accountId: string) => `${channelId}:${accountId}`;
+
+  function pruneOldRestarts(record: RestartRecord, now: number) {
+    record.restartsThisHour = record.restartsThisHour.filter((r) => now - r.at < ONE_HOUR_MS);
+  }
+
+  async function checkChannels() {
+    const now = Date.now();
+    if (now - startedAt < startupGraceMs) {
+      return;
+    }
+
+    const snapshot = channelManager.getRuntimeSnapshot();
+
+    for (const [channelId, accounts] of Object.entries(snapshot.channelAccounts)) {
+      if (!accounts) {
+        continue;
+      }
+      for (const [accountId, status] of Object.entries(accounts)) {
+        // stop() may have been called while an earlier restart was awaited.
+        if (stopped) {
+          return;
+        }
+        if (!status) {
+          continue;
+        }
+        if (!status.enabled || !status.configured) {
+          continue;
+        }
+        if (channelManager.isManuallyStopped(channelId as ChannelId, accountId)) {
+          continue;
+        }
+        if (isChannelHealthy(status)) {
+          continue;
+        }
+
+        const key = rKey(channelId, accountId);
+        const record = restartRecords.get(key) ?? {
+          lastRestartAt: 0,
+          restartsThisHour: [],
+        };
+
+        if (now - record.lastRestartAt <= cooldownMs) {
+          continue;
+        }
+
+        pruneOldRestarts(record, now);
+        if (record.restartsThisHour.length >= maxRestartsPerHour) {
+          log.warn?.(
+            `[${channelId}:${accountId}] health-monitor: hit ${maxRestartsPerHour} restarts/hour limit, skipping`,
+          );
+          continue;
+        }
+
+        const reason = !status.running
+          ? status.reconnectAttempts && status.reconnectAttempts >= GAVE_UP_RECONNECT_ATTEMPTS
+            ? "gave-up"
+            : "stopped"
+          : "stuck";
+
+        log.info?.(`[${channelId}:${accountId}] health-monitor: restarting (reason: ${reason})`);
+
+        // Record the attempt up front so a restart that throws still counts
+        // toward the cooldown and the hourly limit instead of retrying every tick.
+        record.lastRestartAt = now;
+        record.restartsThisHour.push({ at: now });
+        restartRecords.set(key, record);
+
+        try {
+          if (status.running) {
+            await channelManager.stopChannel(channelId as ChannelId, accountId);
+          }
+          channelManager.resetRestartAttempts(channelId as ChannelId, accountId);
+          await channelManager.startChannel(channelId as ChannelId, accountId);
+        } catch (err) {
+          log.error?.(`[${channelId}:${accountId}] health-monitor: restart failed: ${String(err)}`);
+        }
+      }
+    }
+  }
+
+  async function runCheck() {
+    // Restarts are awaited, so a check can outlast the interval. Skip the tick
+    // rather than letting two checks restart the same channel concurrently.
+    if (stopped || checkInFlight) {
+      return;
+    }
+    checkInFlight = true;
+    try {
+      await checkChannels();
+    } catch (err) {
+      // The timer callback discards this promise, so a throw here would
+      // otherwise surface as an unhandled rejection.
+      log.error?.(`health-monitor: check failed: ${String(err)}`);
+    } finally {
+      checkInFlight = false;
+    }
+  }
+
+  function stop() {
+    stopped = true;
+    abortSignal?.removeEventListener("abort", stop);
+    if (timer) {
+      clearInterval(timer);
+      timer = null;
+    }
+  }
+
+  if (abortSignal?.aborted) {
+    stopped = true;
+  } else {
+    abortSignal?.addEventListener("abort", stop, { once: true });
+    timer = setInterval(() => void runCheck(), checkIntervalMs);
+    if (typeof timer === "object" && "unref" in timer) {
+      timer.unref();
+    }
+    log.info?.(
+      `started (interval: ${Math.round(checkIntervalMs / 1000)}s, grace: ${Math.round(startupGraceMs / 1000)}s)`,
+    );
+  }
+
+  return { stop };
+}