gateway: expose isManuallyStopped and resetRestartAttempts on ChannelManager
This commit is contained in:
committed by
Peter Steinberger
parent
ae0b110e44
commit
68489a213f
@@ -4,10 +4,19 @@ import type { createSubsystemLogger } from "../logging/subsystem.js";
|
||||
import type { RuntimeEnv } from "../runtime.js";
|
||||
import { resolveChannelDefaultAccountId } from "../channels/plugins/helpers.js";
|
||||
import { type ChannelId, getChannelPlugin, listChannelPlugins } from "../channels/plugins/index.js";
|
||||
import { type BackoffPolicy, computeBackoff, sleepWithAbort } from "../infra/backoff.js";
|
||||
import { formatErrorMessage } from "../infra/errors.js";
|
||||
import { resetDirectoryCache } from "../infra/outbound/target-resolver.js";
|
||||
import { DEFAULT_ACCOUNT_ID } from "../routing/session-key.js";
|
||||
|
||||
const CHANNEL_RESTART_POLICY: BackoffPolicy = {
|
||||
initialMs: 5_000,
|
||||
maxMs: 5 * 60_000,
|
||||
factor: 2,
|
||||
jitter: 0.1,
|
||||
};
|
||||
const MAX_RESTART_ATTEMPTS = 10;
|
||||
|
||||
export type ChannelRuntimeSnapshot = {
|
||||
channels: Partial<Record<ChannelId, ChannelAccountSnapshot>>;
|
||||
channelAccounts: Partial<Record<ChannelId, Record<string, ChannelAccountSnapshot>>>;
|
||||
@@ -58,6 +67,8 @@ export type ChannelManager = {
|
||||
startChannel: (channel: ChannelId, accountId?: string) => Promise<void>;
|
||||
stopChannel: (channel: ChannelId, accountId?: string) => Promise<void>;
|
||||
markChannelLoggedOut: (channelId: ChannelId, cleared: boolean, accountId?: string) => void;
|
||||
isManuallyStopped: (channelId: ChannelId, accountId: string) => boolean;
|
||||
resetRestartAttempts: (channelId: ChannelId, accountId: string) => void;
|
||||
};
|
||||
|
||||
// Channel docking: lifecycle hooks (`plugin.gateway`) flow through this manager.
|
||||
@@ -65,6 +76,12 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage
|
||||
const { loadConfig, channelLogs, channelRuntimeEnvs } = opts;
|
||||
|
||||
const channelStores = new Map<ChannelId, ChannelRuntimeStore>();
|
||||
// Tracks restart attempts per channel:account. Reset on successful start.
|
||||
const restartAttempts = new Map<string, number>();
|
||||
// Tracks accounts that were manually stopped so we don't auto-restart them.
|
||||
const manuallyStopped = new Set<string>();
|
||||
|
||||
const restartKey = (channelId: ChannelId, accountId: string) => `${channelId}:${accountId}`;
|
||||
|
||||
const getStore = (channelId: ChannelId): ChannelRuntimeStore => {
|
||||
const existing = channelStores.get(channelId);
|
||||
@@ -138,13 +155,18 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage
|
||||
return;
|
||||
}
|
||||
|
||||
const rKey = restartKey(channelId, id);
|
||||
manuallyStopped.delete(rKey);
|
||||
|
||||
const abort = new AbortController();
|
||||
store.aborts.set(id, abort);
|
||||
restartAttempts.delete(rKey);
|
||||
setRuntime(channelId, id, {
|
||||
accountId: id,
|
||||
running: true,
|
||||
lastStartAt: Date.now(),
|
||||
lastError: null,
|
||||
reconnectAttempts: 0,
|
||||
});
|
||||
|
||||
const log = channelLogs[channelId];
|
||||
@@ -172,6 +194,31 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage
|
||||
running: false,
|
||||
lastStopAt: Date.now(),
|
||||
});
|
||||
})
|
||||
.then(async () => {
|
||||
if (manuallyStopped.has(rKey)) {
|
||||
return;
|
||||
}
|
||||
const attempt = (restartAttempts.get(rKey) ?? 0) + 1;
|
||||
restartAttempts.set(rKey, attempt);
|
||||
if (attempt > MAX_RESTART_ATTEMPTS) {
|
||||
log.error?.(`[${id}] giving up after ${MAX_RESTART_ATTEMPTS} restart attempts`);
|
||||
return;
|
||||
}
|
||||
const delayMs = computeBackoff(CHANNEL_RESTART_POLICY, attempt);
|
||||
log.info?.(
|
||||
`[${id}] auto-restart attempt ${attempt}/${MAX_RESTART_ATTEMPTS} in ${Math.round(delayMs / 1000)}s`,
|
||||
);
|
||||
setRuntime(channelId, id, {
|
||||
accountId: id,
|
||||
reconnectAttempts: attempt,
|
||||
});
|
||||
try {
|
||||
await sleepWithAbort(delayMs);
|
||||
await startChannel(channelId, id);
|
||||
} catch {
|
||||
// abort or startup failure — next crash will retry
|
||||
}
|
||||
});
|
||||
store.tasks.set(id, tracked);
|
||||
}),
|
||||
@@ -203,6 +250,7 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage
|
||||
if (!abort && !task && !plugin?.gateway?.stopAccount) {
|
||||
return;
|
||||
}
|
||||
manuallyStopped.add(restartKey(channelId, id));
|
||||
abort?.abort();
|
||||
if (plugin?.gateway?.stopAccount) {
|
||||
const account = plugin.config.resolveAccount(cfg, id);
|
||||
@@ -302,11 +350,21 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage
|
||||
return { channels, channelAccounts };
|
||||
};
|
||||
|
||||
const isManuallyStopped_ = (channelId: ChannelId, accountId: string): boolean => {
|
||||
return manuallyStopped.has(restartKey(channelId, accountId));
|
||||
};
|
||||
|
||||
const resetRestartAttempts_ = (channelId: ChannelId, accountId: string): void => {
|
||||
restartAttempts.delete(restartKey(channelId, accountId));
|
||||
};
|
||||
|
||||
return {
|
||||
getRuntimeSnapshot,
|
||||
startChannels,
|
||||
startChannel,
|
||||
stopChannel,
|
||||
markChannelLoggedOut,
|
||||
isManuallyStopped: isManuallyStopped_,
|
||||
resetRestartAttempts: resetRestartAttempts_,
|
||||
};
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user