diff --git a/src/auto-reply/dispatch.test.ts b/src/auto-reply/dispatch.test.ts new file mode 100644 index 000000000..b07f720ab --- /dev/null +++ b/src/auto-reply/dispatch.test.ts @@ -0,0 +1,61 @@ +import { describe, expect, it, vi } from "vitest"; +import type { ReplyDispatcher } from "./reply/reply-dispatcher.js"; +import { withReplyDispatcher } from "./dispatch.js"; + +function createDispatcher(record: string[]): ReplyDispatcher { + return { + sendToolResult: () => true, + sendBlockReply: () => true, + sendFinalReply: () => true, + getQueuedCounts: () => ({ tool: 0, block: 0, final: 0 }), + markComplete: () => { + record.push("markComplete"); + }, + waitForIdle: async () => { + record.push("waitForIdle"); + }, + }; +} + +describe("withReplyDispatcher", () => { + it("always marks complete and waits for idle after success", async () => { + const order: string[] = []; + const dispatcher = createDispatcher(order); + + const result = await withReplyDispatcher({ + dispatcher, + run: async () => { + order.push("run"); + return "ok"; + }, + onSettled: () => { + order.push("onSettled"); + }, + }); + + expect(result).toBe("ok"); + expect(order).toEqual(["run", "markComplete", "waitForIdle", "onSettled"]); + }); + + it("still drains dispatcher after run throws", async () => { + const order: string[] = []; + const dispatcher = createDispatcher(order); + const onSettled = vi.fn(() => { + order.push("onSettled"); + }); + + await expect( + withReplyDispatcher({ + dispatcher, + run: async () => { + order.push("run"); + throw new Error("boom"); + }, + onSettled, + }), + ).rejects.toThrow("boom"); + + expect(onSettled).toHaveBeenCalledTimes(1); + expect(order).toEqual(["run", "markComplete", "waitForIdle", "onSettled"]); + }); +}); diff --git a/src/auto-reply/dispatch.ts b/src/auto-reply/dispatch.ts index d018623c7..32f89beb1 100644 --- a/src/auto-reply/dispatch.ts +++ b/src/auto-reply/dispatch.ts @@ -14,6 +14,24 @@ import { export type DispatchInboundResult = DispatchFromConfigResult; +export async function withReplyDispatcher(params: { + dispatcher: ReplyDispatcher; + run: () => Promise; + onSettled?: () => void | Promise; +}): Promise { + try { + return await params.run(); + } finally { + // Ensure dispatcher reservations are always released on every exit path. + params.dispatcher.markComplete(); + try { + await params.dispatcher.waitForIdle(); + } finally { + await params.onSettled?.(); + } + } +} + export async function dispatchInboundMessage(params: { ctx: MsgContext | FinalizedMsgContext; cfg: OpenClawConfig; @@ -41,20 +59,23 @@ export async function dispatchInboundMessageWithBufferedDispatcher(params: { const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping( params.dispatcherOptions, ); - - const result = await dispatchInboundMessage({ - ctx: params.ctx, - cfg: params.cfg, + return await withReplyDispatcher({ dispatcher, - replyResolver: params.replyResolver, - replyOptions: { - ...params.replyOptions, - ...replyOptions, + run: async () => + dispatchInboundMessage({ + ctx: params.ctx, + cfg: params.cfg, + dispatcher, + replyResolver: params.replyResolver, + replyOptions: { + ...params.replyOptions, + ...replyOptions, + }, + }), + onSettled: () => { + markDispatchIdle(); }, }); - - markDispatchIdle(); - return result; } export async function dispatchInboundMessageWithDispatcher(params: { @@ -65,13 +86,15 @@ export async function dispatchInboundMessageWithDispatcher(params: { replyResolver?: typeof import("./reply.js").getReplyFromConfig; }): Promise { const dispatcher = createReplyDispatcher(params.dispatcherOptions); - const result = await dispatchInboundMessage({ - ctx: params.ctx, - cfg: params.cfg, + return await withReplyDispatcher({ dispatcher, - replyResolver: params.replyResolver, - replyOptions: params.replyOptions, + run: async () => + dispatchInboundMessage({ + ctx: params.ctx, + cfg: params.cfg, + dispatcher, + replyResolver: params.replyResolver, + replyOptions: params.replyOptions, + }), }); - await dispatcher.waitForIdle(); - return result; } diff --git a/src/cli/gateway-cli/run-loop.test.ts b/src/cli/gateway-cli/run-loop.test.ts index 928e02cc5..f2de12bcb 100644 --- a/src/cli/gateway-cli/run-loop.test.ts +++ b/src/cli/gateway-cli/run-loop.test.ts @@ -5,6 +5,7 @@ const acquireGatewayLock = vi.fn(async () => ({ })); const consumeGatewaySigusr1RestartAuthorization = vi.fn(() => true); const isGatewaySigusr1RestartExternallyAllowed = vi.fn(() => false); +const markGatewaySigusr1RestartHandled = vi.fn(); const getActiveTaskCount = vi.fn(() => 0); const waitForActiveTasks = vi.fn(async () => ({ drained: true })); const resetAllLanes = vi.fn(); @@ -22,6 +23,7 @@ vi.mock("../../infra/gateway-lock.js", () => ({ vi.mock("../../infra/restart.js", () => ({ consumeGatewaySigusr1RestartAuthorization: () => consumeGatewaySigusr1RestartAuthorization(), isGatewaySigusr1RestartExternallyAllowed: () => isGatewaySigusr1RestartExternallyAllowed(), + markGatewaySigusr1RestartHandled: () => markGatewaySigusr1RestartHandled(), })); vi.mock("../../process/command-queue.js", () => ({ @@ -100,6 +102,7 @@ describe("runGatewayLoop", () => { reason: "gateway restarting", restartExpectedMs: 1500, }); + expect(markGatewaySigusr1RestartHandled).toHaveBeenCalledTimes(1); expect(resetAllLanes).toHaveBeenCalledTimes(1); process.emit("SIGUSR1"); @@ -109,6 +112,7 @@ describe("runGatewayLoop", () => { reason: "gateway restarting", restartExpectedMs: 1500, }); + expect(markGatewaySigusr1RestartHandled).toHaveBeenCalledTimes(2); expect(resetAllLanes).toHaveBeenCalledTimes(2); } finally { removeNewSignalListeners("SIGTERM", beforeSigterm); diff --git a/src/cli/gateway-cli/run-loop.ts b/src/cli/gateway-cli/run-loop.ts index ec582fdcb..7cd1902f5 100644 --- a/src/cli/gateway-cli/run-loop.ts +++ b/src/cli/gateway-cli/run-loop.ts @@ -4,6 +4,7 @@ import { acquireGatewayLock } from "../../infra/gateway-lock.js"; import { consumeGatewaySigusr1RestartAuthorization, isGatewaySigusr1RestartExternallyAllowed, + markGatewaySigusr1RestartHandled, } from "../../infra/restart.js"; import { createSubsystemLogger } from "../../logging/subsystem.js"; import { @@ -108,6 +109,7 @@ export async function runGatewayLoop(params: { ); return; } + markGatewaySigusr1RestartHandled(); request("restart", "SIGUSR1"); }; diff --git a/src/gateway/server-methods/chat.ts b/src/gateway/server-methods/chat.ts index 28ea99b60..b099364cb 100644 --- a/src/gateway/server-methods/chat.ts +++ b/src/gateway/server-methods/chat.ts @@ -6,7 +6,7 @@ import type { GatewayRequestContext, GatewayRequestHandlers } from "./types.js"; import { resolveSessionAgentId } from "../../agents/agent-scope.js"; import { resolveThinkingDefault } from "../../agents/model-selection.js"; import { resolveAgentTimeoutMs } from "../../agents/timeout.js"; -import { dispatchInboundMessage } from "../../auto-reply/dispatch.js"; +import { dispatchInboundMessage, withReplyDispatcher } from "../../auto-reply/dispatch.js"; import { createReplyDispatcher } from "../../auto-reply/reply/reply-dispatcher.js"; import { createReplyPrefixOptions } from "../../channels/reply-prefix.js"; import { resolveSessionFilePath } from "../../config/sessions.js"; @@ -524,36 +524,40 @@ export const chatHandlers: GatewayRequestHandlers = { }); let agentRunStarted = false; - void dispatchInboundMessage({ - ctx, - cfg, + void withReplyDispatcher({ dispatcher, - replyOptions: { - runId: clientRunId, - abortSignal: abortController.signal, - images: parsedImages.length > 0 ? parsedImages : undefined, - disableBlockStreaming: true, - onAgentRunStart: (runId) => { - agentRunStarted = true; - const connId = typeof client?.connId === "string" ? client.connId : undefined; - const wantsToolEvents = hasGatewayClientCap( - client?.connect?.caps, - GATEWAY_CLIENT_CAPS.TOOL_EVENTS, - ); - if (connId && wantsToolEvents) { - context.registerToolEventRecipient(runId, connId); - // Register for any other active runs *in the same session* so - // late-joining clients (e.g. page refresh mid-response) receive - // in-progress tool events without leaking cross-session data. - for (const [activeRunId, active] of context.chatAbortControllers) { - if (activeRunId !== runId && active.sessionKey === p.sessionKey) { - context.registerToolEventRecipient(activeRunId, connId); + run: () => + dispatchInboundMessage({ + ctx, + cfg, + dispatcher, + replyOptions: { + runId: clientRunId, + abortSignal: abortController.signal, + images: parsedImages.length > 0 ? parsedImages : undefined, + disableBlockStreaming: true, + onAgentRunStart: (runId) => { + agentRunStarted = true; + const connId = typeof client?.connId === "string" ? client.connId : undefined; + const wantsToolEvents = hasGatewayClientCap( + client?.connect?.caps, + GATEWAY_CLIENT_CAPS.TOOL_EVENTS, + ); + if (connId && wantsToolEvents) { + context.registerToolEventRecipient(runId, connId); + // Register for any other active runs *in the same session* so + // late-joining clients (e.g. page refresh mid-response) receive + // in-progress tool events without leaking cross-session data. + for (const [activeRunId, active] of context.chatAbortControllers) { + if (activeRunId !== runId && active.sessionKey === p.sessionKey) { + context.registerToolEventRecipient(activeRunId, connId); + } + } } - } - } - }, - onModelSelected, - }, + }, + onModelSelected, + }, + }), }) .then(() => { if (!agentRunStarted) { diff --git a/src/gateway/server-reload-handlers.ts b/src/gateway/server-reload-handlers.ts index 02ec35bc3..6a2dfd2cb 100644 --- a/src/gateway/server-reload-handlers.ts +++ b/src/gateway/server-reload-handlers.ts @@ -8,7 +8,11 @@ import { resolveAgentMaxConcurrent, resolveSubagentMaxConcurrent } from "../conf import { startGmailWatcher, stopGmailWatcher } from "../hooks/gmail-watcher.js"; import { isTruthyEnvValue } from "../infra/env.js"; import { resetDirectoryCache } from "../infra/outbound/target-resolver.js"; -import { emitGatewayRestart, setGatewaySigusr1RestartPolicy } from "../infra/restart.js"; +import { + deferGatewayRestartUntilIdle, + emitGatewayRestart, + setGatewaySigusr1RestartPolicy, +} from "../infra/restart.js"; import { setCommandLaneConcurrency, getTotalQueueSize } from "../process/command-queue.js"; import { CommandLane } from "../process/lanes.js"; import { resolveHooksConfig } from "./hooks.js"; @@ -155,13 +159,33 @@ export function createGatewayReloadHandlers(params: { return; } - // Check if there are active operations (commands in queue, pending replies, or embedded runs) - const queueSize = getTotalQueueSize(); - const pendingReplies = getTotalPendingReplies(); - const embeddedRuns = getActiveEmbeddedRunCount(); - const totalActive = queueSize + pendingReplies + embeddedRuns; + const getActiveCounts = () => { + const queueSize = getTotalQueueSize(); + const pendingReplies = getTotalPendingReplies(); + const embeddedRuns = getActiveEmbeddedRunCount(); + return { + queueSize, + pendingReplies, + embeddedRuns, + totalActive: queueSize + pendingReplies + embeddedRuns, + }; + }; + const formatActiveDetails = (counts: ReturnType) => { + const details = []; + if (counts.queueSize > 0) { + details.push(`${counts.queueSize} operation(s)`); + } + if (counts.pendingReplies > 0) { + details.push(`${counts.pendingReplies} reply(ies)`); + } + if (counts.embeddedRuns > 0) { + details.push(`${counts.embeddedRuns} embedded run(s)`); + } + return details; + }; + const active = getActiveCounts(); - if (totalActive > 0) { + if (active.totalActive > 0) { // Avoid spinning up duplicate polling loops from repeated config changes. if (restartPending) { params.logReload.info( @@ -170,63 +194,40 @@ export function createGatewayReloadHandlers(params: { return; } restartPending = true; - const details = []; - if (queueSize > 0) { - details.push(`${queueSize} queued operation(s)`); - } - if (pendingReplies > 0) { - details.push(`${pendingReplies} pending reply(ies)`); - } - if (embeddedRuns > 0) { - details.push(`${embeddedRuns} embedded run(s)`); - } + const initialDetails = formatActiveDetails(active); params.logReload.warn( - `config change requires gateway restart (${reasons}) — deferring until ${details.join(", ")} complete`, + `config change requires gateway restart (${reasons}) — deferring until ${initialDetails.join(", ")} complete`, ); - // Wait for all operations and replies to complete before restarting (max 30 seconds) - const maxWaitMs = 30_000; - const checkIntervalMs = 500; - const startTime = Date.now(); - - const checkAndRestart = () => { - const currentQueueSize = getTotalQueueSize(); - const currentPendingReplies = getTotalPendingReplies(); - const currentEmbeddedRuns = getActiveEmbeddedRunCount(); - const currentTotalActive = currentQueueSize + currentPendingReplies + currentEmbeddedRuns; - const elapsed = Date.now() - startTime; - - if (currentTotalActive === 0) { - restartPending = false; - params.logReload.info("all operations and replies completed; restarting gateway now"); - emitGatewayRestart(); - } else if (elapsed >= maxWaitMs) { - const remainingDetails = []; - if (currentQueueSize > 0) { - remainingDetails.push(`${currentQueueSize} operation(s)`); - } - if (currentPendingReplies > 0) { - remainingDetails.push(`${currentPendingReplies} reply(ies)`); - } - if (currentEmbeddedRuns > 0) { - remainingDetails.push(`${currentEmbeddedRuns} embedded run(s)`); - } - restartPending = false; - params.logReload.warn( - `restart timeout after ${elapsed}ms with ${remainingDetails.join(", ")} still active; restarting anyway`, - ); - emitGatewayRestart(); - } else { - // Check again soon - setTimeout(checkAndRestart, checkIntervalMs); - } - }; - - setTimeout(checkAndRestart, checkIntervalMs); + deferGatewayRestartUntilIdle({ + getPendingCount: () => getActiveCounts().totalActive, + hooks: { + onReady: () => { + restartPending = false; + params.logReload.info("all operations and replies completed; restarting gateway now"); + }, + onTimeout: (_pending, elapsedMs) => { + const remaining = formatActiveDetails(getActiveCounts()); + restartPending = false; + params.logReload.warn( + `restart timeout after ${elapsedMs}ms with ${remaining.join(", ")} still active; restarting anyway`, + ); + }, + onCheckError: (err) => { + restartPending = false; + params.logReload.warn( + `restart deferral check failed (${String(err)}); restarting gateway now`, + ); + }, + }, + }); } else { // No active operations or pending replies, restart immediately params.logReload.warn(`config change requires gateway restart (${reasons})`); - emitGatewayRestart(); + const emitted = emitGatewayRestart(); + if (!emitted) { + params.logReload.info("gateway restart already scheduled; skipping duplicate signal"); + } } }; diff --git a/src/imessage/monitor/monitor-provider.ts b/src/imessage/monitor/monitor-provider.ts index 445fe73ae..771003f2f 100644 --- a/src/imessage/monitor/monitor-provider.ts +++ b/src/imessage/monitor/monitor-provider.ts @@ -3,7 +3,7 @@ import type { IMessagePayload, MonitorIMessageOpts } from "./types.js"; import { resolveHumanDelayConfig } from "../../agents/identity.js"; import { resolveTextChunkLimit } from "../../auto-reply/chunk.js"; import { hasControlCommand } from "../../auto-reply/command-detection.js"; -import { dispatchInboundMessage } from "../../auto-reply/dispatch.js"; +import { dispatchInboundMessage, withReplyDispatcher } from "../../auto-reply/dispatch.js"; import { formatInboundEnvelope, formatInboundFromLabel, @@ -647,17 +647,21 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P }, }); - const { queuedFinal } = await dispatchInboundMessage({ - ctx: ctxPayload, - cfg, + const { queuedFinal } = await withReplyDispatcher({ dispatcher, - replyOptions: { - disableBlockStreaming: - typeof accountInfo.config.blockStreaming === "boolean" - ? !accountInfo.config.blockStreaming - : undefined, - onModelSelected, - }, + run: () => + dispatchInboundMessage({ + ctx: ctxPayload, + cfg, + dispatcher, + replyOptions: { + disableBlockStreaming: + typeof accountInfo.config.blockStreaming === "boolean" + ? !accountInfo.config.blockStreaming + : undefined, + onModelSelected, + }, + }), }); if (!queuedFinal) { diff --git a/src/infra/infra-runtime.test.ts b/src/infra/infra-runtime.test.ts index 61e7dff43..78e6d15f9 100644 --- a/src/infra/infra-runtime.test.ts +++ b/src/infra/infra-runtime.test.ts @@ -6,7 +6,9 @@ import { ensureBinary } from "./binaries.js"; import { __testing, consumeGatewaySigusr1RestartAuthorization, + emitGatewayRestart, isGatewaySigusr1RestartExternallyAllowed, + markGatewaySigusr1RestartHandled, scheduleGatewaySigusr1Restart, setGatewaySigusr1RestartPolicy, setPreRestartDeferralCheck, @@ -100,6 +102,25 @@ describe("infra runtime", () => { setGatewaySigusr1RestartPolicy({ allowExternal: true }); expect(isGatewaySigusr1RestartExternallyAllowed()).toBe(true); }); + + it("suppresses duplicate emit until the restart cycle is marked handled", () => { + const emitSpy = vi.spyOn(process, "emit"); + const handler = () => {}; + process.on("SIGUSR1", handler); + try { + expect(emitGatewayRestart()).toBe(true); + expect(emitGatewayRestart()).toBe(false); + expect(consumeGatewaySigusr1RestartAuthorization()).toBe(true); + + markGatewaySigusr1RestartHandled(); + + expect(emitGatewayRestart()).toBe(true); + const sigusr1Emits = emitSpy.mock.calls.filter((args) => args[0] === "SIGUSR1"); + expect(sigusr1Emits.length).toBe(2); + } finally { + process.removeListener("SIGUSR1", handler); + } + }); }); describe("pre-restart deferral check", () => { diff --git a/src/infra/restart.ts b/src/infra/restart.ts index 830d07310..60540884b 100644 --- a/src/infra/restart.ts +++ b/src/infra/restart.ts @@ -13,12 +13,20 @@ export type RestartAttempt = { const SPAWN_TIMEOUT_MS = 2000; const SIGUSR1_AUTH_GRACE_MS = 5000; +const DEFAULT_DEFERRAL_POLL_MS = 500; +const DEFAULT_DEFERRAL_MAX_WAIT_MS = 30_000; let sigusr1AuthorizedCount = 0; let sigusr1AuthorizedUntil = 0; let sigusr1ExternalAllowed = false; let preRestartCheck: (() => number) | null = null; -let sigusr1Emitted = false; +let restartCycleToken = 0; +let emittedRestartToken = 0; +let consumedRestartToken = 0; + +function hasUnconsumedRestartSignal(): boolean { + return emittedRestartToken > consumedRestartToken; +} /** * Register a callback that scheduleGatewaySigusr1Restart checks before emitting SIGUSR1. @@ -35,10 +43,11 @@ export function setPreRestartDeferralCheck(fn: () => number): void { * to ensure only one restart fires. */ export function emitGatewayRestart(): boolean { - if (sigusr1Emitted) { + if (hasUnconsumedRestartSignal()) { return false; } - sigusr1Emitted = true; + const cycleToken = ++restartCycleToken; + emittedRestartToken = cycleToken; authorizeGatewaySigusr1Restart(); try { if (process.listenerCount("SIGUSR1") > 0) { @@ -47,7 +56,9 @@ export function emitGatewayRestart(): boolean { process.kill(process.pid, "SIGUSR1"); } } catch { - /* ignore */ + // Roll back the cycle marker so future restart requests can still proceed. + emittedRestartToken = consumedRestartToken; + return false; } return true; } @@ -85,10 +96,6 @@ export function consumeGatewaySigusr1RestartAuthorization(): boolean { if (sigusr1AuthorizedCount <= 0) { return false; } - // Reset the emission guard so the next restart cycle can fire. - // The run loop re-enters startGatewayServer() after close(), which - // re-registers setPreRestartDeferralCheck and can schedule new restarts. - sigusr1Emitted = false; sigusr1AuthorizedCount -= 1; if (sigusr1AuthorizedCount <= 0) { sigusr1AuthorizedUntil = 0; @@ -96,6 +103,80 @@ export function consumeGatewaySigusr1RestartAuthorization(): boolean { return true; } +/** + * Mark the currently emitted SIGUSR1 restart cycle as consumed by the run loop. + * This explicitly advances the cycle state instead of resetting emit guards inside + * consumeGatewaySigusr1RestartAuthorization(). + */ +export function markGatewaySigusr1RestartHandled(): void { + if (hasUnconsumedRestartSignal()) { + consumedRestartToken = emittedRestartToken; + } +} + +export type RestartDeferralHooks = { + onDeferring?: (pending: number) => void; + onReady?: () => void; + onTimeout?: (pending: number, elapsedMs: number) => void; + onCheckError?: (err: unknown) => void; +}; + +/** + * Poll pending work until it drains (or times out), then emit one restart signal. + * Shared by both the direct RPC restart path and the config watcher path. + */ +export function deferGatewayRestartUntilIdle(opts: { + getPendingCount: () => number; + hooks?: RestartDeferralHooks; + pollMs?: number; + maxWaitMs?: number; +}): void { + const pollMsRaw = opts.pollMs ?? DEFAULT_DEFERRAL_POLL_MS; + const pollMs = Math.max(10, Math.floor(pollMsRaw)); + const maxWaitMsRaw = opts.maxWaitMs ?? DEFAULT_DEFERRAL_MAX_WAIT_MS; + const maxWaitMs = Math.max(pollMs, Math.floor(maxWaitMsRaw)); + + let pending: number; + try { + pending = opts.getPendingCount(); + } catch (err) { + opts.hooks?.onCheckError?.(err); + emitGatewayRestart(); + return; + } + if (pending <= 0) { + opts.hooks?.onReady?.(); + emitGatewayRestart(); + return; + } + + opts.hooks?.onDeferring?.(pending); + const startedAt = Date.now(); + const poll = setInterval(() => { + let current: number; + try { + current = opts.getPendingCount(); + } catch (err) { + clearInterval(poll); + opts.hooks?.onCheckError?.(err); + emitGatewayRestart(); + return; + } + if (current <= 0) { + clearInterval(poll); + opts.hooks?.onReady?.(); + emitGatewayRestart(); + return; + } + const elapsedMs = Date.now() - startedAt; + if (elapsedMs >= maxWaitMs) { + clearInterval(poll); + opts.hooks?.onTimeout?.(current, elapsedMs); + emitGatewayRestart(); + } + }, pollMs); +} + function formatSpawnDetail(result: { error?: unknown; status?: number | null; @@ -227,40 +308,14 @@ export function scheduleGatewaySigusr1Restart(opts?: { typeof opts?.reason === "string" && opts.reason.trim() ? opts.reason.trim().slice(0, 200) : undefined; - const DEFERRAL_POLL_MS = 500; - const DEFERRAL_MAX_WAIT_MS = 30_000; setTimeout(() => { - if (!preRestartCheck) { + const pendingCheck = preRestartCheck; + if (!pendingCheck) { emitGatewayRestart(); return; } - let pending: number; - try { - pending = preRestartCheck(); - } catch { - emitGatewayRestart(); - return; - } - if (pending <= 0) { - emitGatewayRestart(); - return; - } - // Poll until pending work drains or timeout - let waited = 0; - const poll = setInterval(() => { - waited += DEFERRAL_POLL_MS; - let current: number; - try { - current = preRestartCheck!(); - } catch { - current = 0; - } - if (current <= 0 || waited >= DEFERRAL_MAX_WAIT_MS) { - clearInterval(poll); - emitGatewayRestart(); - } - }, DEFERRAL_POLL_MS); + deferGatewayRestartUntilIdle({ getPendingCount: pendingCheck }); }, delayMs); return { ok: true, @@ -278,6 +333,8 @@ export const __testing = { sigusr1AuthorizedUntil = 0; sigusr1ExternalAllowed = false; preRestartCheck = null; - sigusr1Emitted = false; + restartCycleToken = 0; + emittedRestartToken = 0; + consumedRestartToken = 0; }, }; diff --git a/src/macos/gateway-daemon.ts b/src/macos/gateway-daemon.ts index 38fd5485f..a33ca94e8 100644 --- a/src/macos/gateway-daemon.ts +++ b/src/macos/gateway-daemon.ts @@ -49,7 +49,11 @@ async function main() { { setGatewayWsLogStyle }, { setVerbose }, { acquireGatewayLock, GatewayLockError }, - { consumeGatewaySigusr1RestartAuthorization, isGatewaySigusr1RestartExternallyAllowed }, + { + consumeGatewaySigusr1RestartAuthorization, + isGatewaySigusr1RestartExternallyAllowed, + markGatewaySigusr1RestartHandled, + }, { defaultRuntime }, { enableConsoleCapture, setConsoleTimestampPrefix }, commandQueueMod, @@ -201,6 +205,7 @@ async function main() { ); return; } + markGatewaySigusr1RestartHandled(); request("restart", "SIGUSR1"); };