fix: harden gateway control-plane restart protections
This commit is contained in:
@@ -17,6 +17,7 @@ Docs: https://docs.openclaw.ai
|
||||
|
||||
### Fixes
|
||||
|
||||
- Security/Gateway: rate-limit control-plane write RPCs (`config.apply`, `config.patch`, `update.run`) to 3 requests per minute per `deviceId+clientIp`, add restart single-flight coalescing plus a 30-second restart cooldown, and log actor/device/ip with changed-path audit details for config/update-triggered restarts.
|
||||
- Commands/Doctor: skip embedding-provider warnings when `memory.backend` is `qmd`, because QMD manages embeddings internally and does not require `memorySearch` providers. (#17263) Thanks @miloudbelarebia.
|
||||
- Security/Webhooks: harden Feishu and Zalo webhook ingress with webhook-mode token preconditions, loopback-default Feishu bind host, JSON content-type enforcement, per-path rate limiting, replay dedupe for Zalo events, constant-time Zalo secret comparison, and anomaly status counters.
|
||||
- Security/Plugins: add explicit `plugins.runtime.allowLegacyExec` opt-in to re-enable deprecated `runtime.system.runCommandWithTimeout` for legacy modules while keeping runtime command execution disabled by default. (#20874) Thanks @mbelinky.
|
||||
|
||||
@@ -370,6 +370,10 @@ Most fields hot-apply without downtime. In `hybrid` mode, restart-required chang
|
||||
|
||||
## Config RPC (programmatic updates)
|
||||
|
||||
<Note>
|
||||
Control-plane write RPCs (`config.apply`, `config.patch`, `update.run`) are rate-limited to **3 requests per 60 seconds** per `deviceId+clientIp`. When limited, the RPC returns `UNAVAILABLE` with `retryAfterMs`.
|
||||
</Note>
|
||||
|
||||
<AccordionGroup>
|
||||
<Accordion title="config.apply (full replace)">
|
||||
Validates + writes the full config and restarts the Gateway in one step.
|
||||
@@ -386,6 +390,8 @@ Most fields hot-apply without downtime. In `hybrid` mode, restart-required chang
|
||||
- `note` (optional) — note for the restart sentinel
|
||||
- `restartDelayMs` (optional) — delay before restart (default 2000)
|
||||
|
||||
Restart requests are coalesced while one is already pending/in-flight, and a 30-second cooldown applies between restart cycles.
|
||||
|
||||
```bash
|
||||
openclaw gateway call config.get --params '{}' # capture payload.hash
|
||||
openclaw gateway call config.apply --params '{
|
||||
@@ -410,6 +416,8 @@ Most fields hot-apply without downtime. In `hybrid` mode, restart-required chang
|
||||
- `baseHash` (required) — config hash from `config.get`
|
||||
- `sessionKey`, `note`, `restartDelayMs` — same as `config.apply`
|
||||
|
||||
Restart behavior matches `config.apply`: coalesced pending restarts plus a 30-second cooldown between restart cycles.
|
||||
|
||||
```bash
|
||||
openclaw gateway call config.patch --params '{
|
||||
"raw": "{ channels: { telegram: { groups: { \"*\": { requireMention: false } } } } }",
|
||||
|
||||
40
src/gateway/control-plane-audit.ts
Normal file
40
src/gateway/control-plane-audit.ts
Normal file
@@ -0,0 +1,40 @@
|
||||
import type { GatewayClient } from "./server-methods/types.js";
|
||||
|
||||
export type ControlPlaneActor = {
|
||||
actor: string;
|
||||
deviceId: string;
|
||||
clientIp: string;
|
||||
connId: string;
|
||||
};
|
||||
|
||||
function normalizePart(value: unknown, fallback: string): string {
|
||||
if (typeof value !== "string") {
|
||||
return fallback;
|
||||
}
|
||||
const normalized = value.trim();
|
||||
return normalized.length > 0 ? normalized : fallback;
|
||||
}
|
||||
|
||||
export function resolveControlPlaneActor(client: GatewayClient | null): ControlPlaneActor {
|
||||
return {
|
||||
actor: normalizePart(client?.connect?.client?.id, "unknown-actor"),
|
||||
deviceId: normalizePart(client?.connect?.device?.id, "unknown-device"),
|
||||
clientIp: normalizePart(client?.clientIp, "unknown-ip"),
|
||||
connId: normalizePart(client?.connId, "unknown-conn"),
|
||||
};
|
||||
}
|
||||
|
||||
export function formatControlPlaneActor(actor: ControlPlaneActor): string {
|
||||
return `actor=${actor.actor} device=${actor.deviceId} ip=${actor.clientIp} conn=${actor.connId}`;
|
||||
}
|
||||
|
||||
export function summarizeChangedPaths(paths: string[], maxPaths = 8): string {
|
||||
if (paths.length === 0) {
|
||||
return "<none>";
|
||||
}
|
||||
if (paths.length <= maxPaths) {
|
||||
return paths.join(",");
|
||||
}
|
||||
const head = paths.slice(0, maxPaths).join(",");
|
||||
return `${head},+${paths.length - maxPaths} more`;
|
||||
}
|
||||
79
src/gateway/control-plane-rate-limit.ts
Normal file
79
src/gateway/control-plane-rate-limit.ts
Normal file
@@ -0,0 +1,79 @@
|
||||
import type { GatewayClient } from "./server-methods/types.js";
|
||||
|
||||
const CONTROL_PLANE_RATE_LIMIT_MAX_REQUESTS = 3;
|
||||
const CONTROL_PLANE_RATE_LIMIT_WINDOW_MS = 60_000;
|
||||
|
||||
type Bucket = {
|
||||
count: number;
|
||||
windowStartMs: number;
|
||||
};
|
||||
|
||||
const controlPlaneBuckets = new Map<string, Bucket>();
|
||||
|
||||
function normalizePart(value: unknown, fallback: string): string {
|
||||
if (typeof value !== "string") {
|
||||
return fallback;
|
||||
}
|
||||
const normalized = value.trim();
|
||||
return normalized.length > 0 ? normalized : fallback;
|
||||
}
|
||||
|
||||
export function resolveControlPlaneRateLimitKey(client: GatewayClient | null): string {
|
||||
const deviceId = normalizePart(client?.connect?.device?.id, "unknown-device");
|
||||
const clientIp = normalizePart(client?.clientIp, "unknown-ip");
|
||||
return `${deviceId}|${clientIp}`;
|
||||
}
|
||||
|
||||
export function consumeControlPlaneWriteBudget(params: {
|
||||
client: GatewayClient | null;
|
||||
nowMs?: number;
|
||||
}): {
|
||||
allowed: boolean;
|
||||
retryAfterMs: number;
|
||||
remaining: number;
|
||||
key: string;
|
||||
} {
|
||||
const nowMs = params.nowMs ?? Date.now();
|
||||
const key = resolveControlPlaneRateLimitKey(params.client);
|
||||
const bucket = controlPlaneBuckets.get(key);
|
||||
|
||||
if (!bucket || nowMs - bucket.windowStartMs >= CONTROL_PLANE_RATE_LIMIT_WINDOW_MS) {
|
||||
controlPlaneBuckets.set(key, {
|
||||
count: 1,
|
||||
windowStartMs: nowMs,
|
||||
});
|
||||
return {
|
||||
allowed: true,
|
||||
retryAfterMs: 0,
|
||||
remaining: CONTROL_PLANE_RATE_LIMIT_MAX_REQUESTS - 1,
|
||||
key,
|
||||
};
|
||||
}
|
||||
|
||||
if (bucket.count >= CONTROL_PLANE_RATE_LIMIT_MAX_REQUESTS) {
|
||||
const retryAfterMs = Math.max(
|
||||
0,
|
||||
bucket.windowStartMs + CONTROL_PLANE_RATE_LIMIT_WINDOW_MS - nowMs,
|
||||
);
|
||||
return {
|
||||
allowed: false,
|
||||
retryAfterMs,
|
||||
remaining: 0,
|
||||
key,
|
||||
};
|
||||
}
|
||||
|
||||
bucket.count += 1;
|
||||
return {
|
||||
allowed: true,
|
||||
retryAfterMs: 0,
|
||||
remaining: Math.max(0, CONTROL_PLANE_RATE_LIMIT_MAX_REQUESTS - bucket.count),
|
||||
key,
|
||||
};
|
||||
}
|
||||
|
||||
export const __testing = {
|
||||
resetControlPlaneRateLimitState() {
|
||||
controlPlaneBuckets.clear();
|
||||
},
|
||||
};
|
||||
124
src/gateway/server-methods.control-plane-rate-limit.test.ts
Normal file
124
src/gateway/server-methods.control-plane-rate-limit.test.ts
Normal file
@@ -0,0 +1,124 @@
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { GatewayRequestHandler } from "./server-methods/types.js";
|
||||
import { __testing as controlPlaneRateLimitTesting } from "./control-plane-rate-limit.js";
|
||||
import { handleGatewayRequest } from "./server-methods.js";
|
||||
|
||||
const noWebchat = () => false;
|
||||
|
||||
describe("gateway control-plane write rate limit", () => {
|
||||
beforeEach(() => {
|
||||
controlPlaneRateLimitTesting.resetControlPlaneRateLimitState();
|
||||
vi.useFakeTimers();
|
||||
vi.setSystemTime(new Date("2026-02-19T00:00:00.000Z"));
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
controlPlaneRateLimitTesting.resetControlPlaneRateLimitState();
|
||||
});
|
||||
|
||||
function buildContext(logWarn = vi.fn()) {
|
||||
return {
|
||||
logGateway: {
|
||||
warn: logWarn,
|
||||
},
|
||||
} as unknown as Parameters<typeof handleGatewayRequest>[0]["context"];
|
||||
}
|
||||
|
||||
function buildClient() {
|
||||
return {
|
||||
connect: {
|
||||
role: "operator",
|
||||
scopes: ["operator.admin"],
|
||||
client: {
|
||||
id: "openclaw-control-ui",
|
||||
version: "1.0.0",
|
||||
platform: "darwin",
|
||||
mode: "ui",
|
||||
},
|
||||
minProtocol: 1,
|
||||
maxProtocol: 1,
|
||||
},
|
||||
connId: "conn-1",
|
||||
clientIp: "10.0.0.5",
|
||||
} as Parameters<typeof handleGatewayRequest>[0]["client"];
|
||||
}
|
||||
|
||||
async function runRequest(params: {
|
||||
method: string;
|
||||
context: Parameters<typeof handleGatewayRequest>[0]["context"];
|
||||
client: Parameters<typeof handleGatewayRequest>[0]["client"];
|
||||
handler: GatewayRequestHandler;
|
||||
}) {
|
||||
const respond = vi.fn();
|
||||
await handleGatewayRequest({
|
||||
req: {
|
||||
type: "req",
|
||||
id: crypto.randomUUID(),
|
||||
method: params.method,
|
||||
},
|
||||
respond,
|
||||
client: params.client,
|
||||
isWebchatConnect: noWebchat,
|
||||
context: params.context,
|
||||
extraHandlers: {
|
||||
[params.method]: params.handler,
|
||||
},
|
||||
});
|
||||
return respond;
|
||||
}
|
||||
|
||||
it("allows 3 control-plane writes and blocks the 4th in the same minute", async () => {
|
||||
const handlerCalls = vi.fn();
|
||||
const handler: GatewayRequestHandler = (opts) => {
|
||||
handlerCalls(opts);
|
||||
opts.respond(true, undefined, undefined);
|
||||
};
|
||||
const logWarn = vi.fn();
|
||||
const context = buildContext(logWarn);
|
||||
const client = buildClient();
|
||||
|
||||
await runRequest({ method: "config.patch", context, client, handler });
|
||||
await runRequest({ method: "config.patch", context, client, handler });
|
||||
await runRequest({ method: "config.patch", context, client, handler });
|
||||
const blocked = await runRequest({ method: "config.patch", context, client, handler });
|
||||
|
||||
expect(handlerCalls).toHaveBeenCalledTimes(3);
|
||||
expect(blocked).toHaveBeenCalledWith(
|
||||
false,
|
||||
undefined,
|
||||
expect.objectContaining({
|
||||
code: "UNAVAILABLE",
|
||||
retryable: true,
|
||||
}),
|
||||
);
|
||||
expect(logWarn).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("resets the control-plane write budget after 60 seconds", async () => {
|
||||
const handlerCalls = vi.fn();
|
||||
const handler: GatewayRequestHandler = (opts) => {
|
||||
handlerCalls(opts);
|
||||
opts.respond(true, undefined, undefined);
|
||||
};
|
||||
const context = buildContext();
|
||||
const client = buildClient();
|
||||
|
||||
await runRequest({ method: "update.run", context, client, handler });
|
||||
await runRequest({ method: "update.run", context, client, handler });
|
||||
await runRequest({ method: "update.run", context, client, handler });
|
||||
|
||||
const blocked = await runRequest({ method: "update.run", context, client, handler });
|
||||
expect(blocked).toHaveBeenCalledWith(
|
||||
false,
|
||||
undefined,
|
||||
expect.objectContaining({ code: "UNAVAILABLE" }),
|
||||
);
|
||||
|
||||
vi.advanceTimersByTime(60_001);
|
||||
|
||||
const allowed = await runRequest({ method: "update.run", context, client, handler });
|
||||
expect(allowed).toHaveBeenCalledWith(true, undefined, undefined);
|
||||
expect(handlerCalls).toHaveBeenCalledTimes(4);
|
||||
});
|
||||
});
|
||||
@@ -1,3 +1,6 @@
|
||||
import type { GatewayRequestHandlers, GatewayRequestOptions } from "./server-methods/types.js";
|
||||
import { formatControlPlaneActor, resolveControlPlaneActor } from "./control-plane-audit.js";
|
||||
import { consumeControlPlaneWriteBudget } from "./control-plane-rate-limit.js";
|
||||
import { ErrorCodes, errorShape } from "./protocol/index.js";
|
||||
import { agentHandlers } from "./server-methods/agent.js";
|
||||
import { agentsHandlers } from "./server-methods/agents.js";
|
||||
@@ -20,7 +23,6 @@ import { skillsHandlers } from "./server-methods/skills.js";
|
||||
import { systemHandlers } from "./server-methods/system.js";
|
||||
import { talkHandlers } from "./server-methods/talk.js";
|
||||
import { ttsHandlers } from "./server-methods/tts.js";
|
||||
import type { GatewayRequestHandlers, GatewayRequestOptions } from "./server-methods/types.js";
|
||||
import { updateHandlers } from "./server-methods/update.js";
|
||||
import { usageHandlers } from "./server-methods/usage.js";
|
||||
import { voicewakeHandlers } from "./server-methods/voicewake.js";
|
||||
@@ -98,6 +100,7 @@ const WRITE_METHODS = new Set([
|
||||
"browser.request",
|
||||
"push.test",
|
||||
]);
|
||||
const CONTROL_PLANE_WRITE_METHODS = new Set(["config.apply", "config.patch", "update.run"]);
|
||||
|
||||
function authorizeGatewayMethod(method: string, client: GatewayRequestOptions["client"]) {
|
||||
if (!client?.connect) {
|
||||
@@ -209,6 +212,32 @@ export async function handleGatewayRequest(
|
||||
respond(false, undefined, authError);
|
||||
return;
|
||||
}
|
||||
if (CONTROL_PLANE_WRITE_METHODS.has(req.method)) {
|
||||
const budget = consumeControlPlaneWriteBudget({ client });
|
||||
if (!budget.allowed) {
|
||||
const actor = resolveControlPlaneActor(client);
|
||||
context.logGateway.warn(
|
||||
`control-plane write rate-limited method=${req.method} ${formatControlPlaneActor(actor)} retryAfterMs=${budget.retryAfterMs} key=${budget.key}`,
|
||||
);
|
||||
respond(
|
||||
false,
|
||||
undefined,
|
||||
errorShape(
|
||||
ErrorCodes.UNAVAILABLE,
|
||||
`rate limit exceeded for ${req.method}; retry after ${Math.ceil(budget.retryAfterMs / 1000)}s`,
|
||||
{
|
||||
retryable: true,
|
||||
retryAfterMs: budget.retryAfterMs,
|
||||
details: {
|
||||
method: req.method,
|
||||
limit: "3 per 60s",
|
||||
},
|
||||
},
|
||||
),
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
const handler = opts.extraHandlers?.[req.method] ?? coreGatewayHandlers[req.method];
|
||||
if (!handler) {
|
||||
respond(
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
import type { OpenClawConfig } from "../../config/types.openclaw.js";
|
||||
import type { GatewayRequestHandlers, RespondFn } from "./types.js";
|
||||
import { resolveAgentWorkspaceDir, resolveDefaultAgentId } from "../../agents/agent-scope.js";
|
||||
import { listChannelPlugins } from "../../channels/plugins/index.js";
|
||||
import {
|
||||
@@ -19,7 +21,6 @@ import {
|
||||
} from "../../config/redact-snapshot.js";
|
||||
import { buildConfigSchema, type ConfigSchemaResponse } from "../../config/schema.js";
|
||||
import { extractDeliveryInfo } from "../../config/sessions.js";
|
||||
import type { OpenClawConfig } from "../../config/types.openclaw.js";
|
||||
import {
|
||||
formatDoctorNonInteractiveHint,
|
||||
type RestartSentinelPayload,
|
||||
@@ -27,6 +28,12 @@ import {
|
||||
} from "../../infra/restart-sentinel.js";
|
||||
import { scheduleGatewaySigusr1Restart } from "../../infra/restart.js";
|
||||
import { loadOpenClawPlugins } from "../../plugins/loader.js";
|
||||
import { diffConfigPaths } from "../config-reload.js";
|
||||
import {
|
||||
formatControlPlaneActor,
|
||||
resolveControlPlaneActor,
|
||||
summarizeChangedPaths,
|
||||
} from "../control-plane-audit.js";
|
||||
import {
|
||||
ErrorCodes,
|
||||
errorShape,
|
||||
@@ -38,7 +45,6 @@ import {
|
||||
} from "../protocol/index.js";
|
||||
import { resolveBaseHashParam } from "./base-hash.js";
|
||||
import { parseRestartRequestParams } from "./restart-request.js";
|
||||
import type { GatewayRequestHandlers, RespondFn } from "./types.js";
|
||||
import { assertValidParams } from "./validation.js";
|
||||
|
||||
function requireConfigBaseHash(
|
||||
@@ -275,7 +281,7 @@ export const configHandlers: GatewayRequestHandlers = {
|
||||
undefined,
|
||||
);
|
||||
},
|
||||
"config.patch": async ({ params, respond }) => {
|
||||
"config.patch": async ({ params, respond, client, context }) => {
|
||||
if (!assertValidParams(params, validateConfigPatchParams, "config.patch", respond)) {
|
||||
return;
|
||||
}
|
||||
@@ -349,6 +355,11 @@ export const configHandlers: GatewayRequestHandlers = {
|
||||
);
|
||||
return;
|
||||
}
|
||||
const changedPaths = diffConfigPaths(snapshot.config, validated.config);
|
||||
const actor = resolveControlPlaneActor(client);
|
||||
context?.logGateway?.info(
|
||||
`config.patch write ${formatControlPlaneActor(actor)} changedPaths=${summarizeChangedPaths(changedPaths)} restartReason=config.patch`,
|
||||
);
|
||||
await writeConfigFile(validated.config, writeOptions);
|
||||
|
||||
const { sessionKey, note, restartDelayMs, deliveryContext, threadId } =
|
||||
@@ -365,7 +376,18 @@ export const configHandlers: GatewayRequestHandlers = {
|
||||
const restart = scheduleGatewaySigusr1Restart({
|
||||
delayMs: restartDelayMs,
|
||||
reason: "config.patch",
|
||||
audit: {
|
||||
actor: actor.actor,
|
||||
deviceId: actor.deviceId,
|
||||
clientIp: actor.clientIp,
|
||||
changedPaths,
|
||||
},
|
||||
});
|
||||
if (restart.coalesced) {
|
||||
context?.logGateway?.warn(
|
||||
`config.patch restart coalesced ${formatControlPlaneActor(actor)} delayMs=${restart.delayMs}`,
|
||||
);
|
||||
}
|
||||
respond(
|
||||
true,
|
||||
{
|
||||
@@ -381,7 +403,7 @@ export const configHandlers: GatewayRequestHandlers = {
|
||||
undefined,
|
||||
);
|
||||
},
|
||||
"config.apply": async ({ params, respond }) => {
|
||||
"config.apply": async ({ params, respond, client, context }) => {
|
||||
if (!assertValidParams(params, validateConfigApplyParams, "config.apply", respond)) {
|
||||
return;
|
||||
}
|
||||
@@ -393,6 +415,11 @@ export const configHandlers: GatewayRequestHandlers = {
|
||||
if (!parsed) {
|
||||
return;
|
||||
}
|
||||
const changedPaths = diffConfigPaths(snapshot.config, parsed.config);
|
||||
const actor = resolveControlPlaneActor(client);
|
||||
context?.logGateway?.info(
|
||||
`config.apply write ${formatControlPlaneActor(actor)} changedPaths=${summarizeChangedPaths(changedPaths)} restartReason=config.apply`,
|
||||
);
|
||||
await writeConfigFile(parsed.config, writeOptions);
|
||||
|
||||
const { sessionKey, note, restartDelayMs, deliveryContext, threadId } =
|
||||
@@ -409,7 +436,18 @@ export const configHandlers: GatewayRequestHandlers = {
|
||||
const restart = scheduleGatewaySigusr1Restart({
|
||||
delayMs: restartDelayMs,
|
||||
reason: "config.apply",
|
||||
audit: {
|
||||
actor: actor.actor,
|
||||
deviceId: actor.deviceId,
|
||||
clientIp: actor.clientIp,
|
||||
changedPaths,
|
||||
},
|
||||
});
|
||||
if (restart.coalesced) {
|
||||
context?.logGateway?.warn(
|
||||
`config.apply restart coalesced ${formatControlPlaneActor(actor)} delayMs=${restart.delayMs}`,
|
||||
);
|
||||
}
|
||||
respond(
|
||||
true,
|
||||
{
|
||||
|
||||
@@ -17,6 +17,7 @@ type SubsystemLogger = ReturnType<typeof createSubsystemLogger>;
|
||||
export type GatewayClient = {
|
||||
connect: ConnectParams;
|
||||
connId?: string;
|
||||
clientIp?: string;
|
||||
};
|
||||
|
||||
export type RespondFn = (
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import type { GatewayRequestHandlers } from "./types.js";
|
||||
import { loadConfig } from "../../config/config.js";
|
||||
import { extractDeliveryInfo } from "../../config/sessions.js";
|
||||
import { resolveOpenClawPackageRoot } from "../../infra/openclaw-root.js";
|
||||
@@ -9,16 +10,17 @@ import {
|
||||
import { scheduleGatewaySigusr1Restart } from "../../infra/restart.js";
|
||||
import { normalizeUpdateChannel } from "../../infra/update-channels.js";
|
||||
import { runGatewayUpdate } from "../../infra/update-runner.js";
|
||||
import { formatControlPlaneActor, resolveControlPlaneActor } from "../control-plane-audit.js";
|
||||
import { validateUpdateRunParams } from "../protocol/index.js";
|
||||
import { parseRestartRequestParams } from "./restart-request.js";
|
||||
import type { GatewayRequestHandlers } from "./types.js";
|
||||
import { assertValidParams } from "./validation.js";
|
||||
|
||||
export const updateHandlers: GatewayRequestHandlers = {
|
||||
"update.run": async ({ params, respond }) => {
|
||||
"update.run": async ({ params, respond, client, context }) => {
|
||||
if (!assertValidParams(params, validateUpdateRunParams, "update.run", respond)) {
|
||||
return;
|
||||
}
|
||||
const actor = resolveControlPlaneActor(client);
|
||||
const { sessionKey, note, restartDelayMs } = parseRestartRequestParams(params);
|
||||
const { deliveryContext, threadId } = extractDeliveryInfo(sessionKey);
|
||||
const timeoutMsRaw = (params as { timeoutMs?: unknown }).timeoutMs;
|
||||
@@ -98,8 +100,22 @@ export const updateHandlers: GatewayRequestHandlers = {
|
||||
? scheduleGatewaySigusr1Restart({
|
||||
delayMs: restartDelayMs,
|
||||
reason: "update.run",
|
||||
audit: {
|
||||
actor: actor.actor,
|
||||
deviceId: actor.deviceId,
|
||||
clientIp: actor.clientIp,
|
||||
changedPaths: [],
|
||||
},
|
||||
})
|
||||
: null;
|
||||
context?.logGateway?.info(
|
||||
`update.run completed ${formatControlPlaneActor(actor)} changedPaths=<n/a> restartReason=update.run status=${result.status}`,
|
||||
);
|
||||
if (restart?.coalesced) {
|
||||
context?.logGateway?.warn(
|
||||
`update.run restart coalesced ${formatControlPlaneActor(actor)} delayMs=${restart.delayMs}`,
|
||||
);
|
||||
}
|
||||
|
||||
respond(
|
||||
true,
|
||||
|
||||
@@ -124,6 +124,56 @@ describe("infra runtime", () => {
|
||||
process.removeListener("SIGUSR1", handler);
|
||||
}
|
||||
});
|
||||
|
||||
it("coalesces duplicate scheduled restarts into a single pending timer", async () => {
|
||||
const emitSpy = vi.spyOn(process, "emit");
|
||||
const handler = () => {};
|
||||
process.on("SIGUSR1", handler);
|
||||
try {
|
||||
const first = scheduleGatewaySigusr1Restart({ delayMs: 1_000, reason: "first" });
|
||||
const second = scheduleGatewaySigusr1Restart({ delayMs: 1_000, reason: "second" });
|
||||
|
||||
expect(first.coalesced).toBe(false);
|
||||
expect(second.coalesced).toBe(true);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(999);
|
||||
expect(emitSpy).not.toHaveBeenCalledWith("SIGUSR1");
|
||||
|
||||
await vi.advanceTimersByTimeAsync(1);
|
||||
const sigusr1Emits = emitSpy.mock.calls.filter((args) => args[0] === "SIGUSR1");
|
||||
expect(sigusr1Emits.length).toBe(1);
|
||||
} finally {
|
||||
process.removeListener("SIGUSR1", handler);
|
||||
}
|
||||
});
|
||||
|
||||
it("applies restart cooldown between emitted restart cycles", async () => {
|
||||
const emitSpy = vi.spyOn(process, "emit");
|
||||
const handler = () => {};
|
||||
process.on("SIGUSR1", handler);
|
||||
try {
|
||||
const first = scheduleGatewaySigusr1Restart({ delayMs: 0, reason: "first" });
|
||||
expect(first.coalesced).toBe(false);
|
||||
expect(first.delayMs).toBe(0);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(0);
|
||||
expect(consumeGatewaySigusr1RestartAuthorization()).toBe(true);
|
||||
markGatewaySigusr1RestartHandled();
|
||||
|
||||
const second = scheduleGatewaySigusr1Restart({ delayMs: 0, reason: "second" });
|
||||
expect(second.coalesced).toBe(false);
|
||||
expect(second.delayMs).toBe(30_000);
|
||||
expect(second.cooldownMsApplied).toBe(30_000);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(29_999);
|
||||
expect(emitSpy.mock.calls.filter((args) => args[0] === "SIGUSR1").length).toBe(1);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(1);
|
||||
expect(emitSpy.mock.calls.filter((args) => args[0] === "SIGUSR1").length).toBe(2);
|
||||
} finally {
|
||||
process.removeListener("SIGUSR1", handler);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe("pre-restart deferral check", () => {
|
||||
|
||||
@@ -3,6 +3,7 @@ import {
|
||||
resolveGatewayLaunchAgentLabel,
|
||||
resolveGatewaySystemdServiceName,
|
||||
} from "../daemon/constants.js";
|
||||
import { createSubsystemLogger } from "../logging/subsystem.js";
|
||||
|
||||
export type RestartAttempt = {
|
||||
ok: boolean;
|
||||
@@ -15,6 +16,9 @@ const SPAWN_TIMEOUT_MS = 2000;
|
||||
const SIGUSR1_AUTH_GRACE_MS = 5000;
|
||||
const DEFAULT_DEFERRAL_POLL_MS = 500;
|
||||
const DEFAULT_DEFERRAL_MAX_WAIT_MS = 30_000;
|
||||
const RESTART_COOLDOWN_MS = 30_000;
|
||||
|
||||
const restartLog = createSubsystemLogger("restart");
|
||||
|
||||
let sigusr1AuthorizedCount = 0;
|
||||
let sigusr1AuthorizedUntil = 0;
|
||||
@@ -23,11 +27,65 @@ let preRestartCheck: (() => number) | null = null;
|
||||
let restartCycleToken = 0;
|
||||
let emittedRestartToken = 0;
|
||||
let consumedRestartToken = 0;
|
||||
let lastRestartEmittedAt = 0;
|
||||
let pendingRestartTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
let pendingRestartDueAt = 0;
|
||||
let pendingRestartReason: string | undefined;
|
||||
|
||||
function hasUnconsumedRestartSignal(): boolean {
|
||||
return emittedRestartToken > consumedRestartToken;
|
||||
}
|
||||
|
||||
function clearPendingScheduledRestart(): void {
|
||||
if (pendingRestartTimer) {
|
||||
clearTimeout(pendingRestartTimer);
|
||||
}
|
||||
pendingRestartTimer = null;
|
||||
pendingRestartDueAt = 0;
|
||||
pendingRestartReason = undefined;
|
||||
}
|
||||
|
||||
export type RestartAuditInfo = {
|
||||
actor?: string;
|
||||
deviceId?: string;
|
||||
clientIp?: string;
|
||||
changedPaths?: string[];
|
||||
};
|
||||
|
||||
function summarizeChangedPaths(paths: string[] | undefined, maxPaths = 6): string | null {
|
||||
if (!Array.isArray(paths) || paths.length === 0) {
|
||||
return null;
|
||||
}
|
||||
if (paths.length <= maxPaths) {
|
||||
return paths.join(",");
|
||||
}
|
||||
const head = paths.slice(0, maxPaths).join(",");
|
||||
return `${head},+${paths.length - maxPaths} more`;
|
||||
}
|
||||
|
||||
function formatRestartAudit(audit: RestartAuditInfo | undefined): string {
|
||||
const actor = typeof audit?.actor === "string" && audit.actor.trim() ? audit.actor.trim() : null;
|
||||
const deviceId =
|
||||
typeof audit?.deviceId === "string" && audit.deviceId.trim() ? audit.deviceId.trim() : null;
|
||||
const clientIp =
|
||||
typeof audit?.clientIp === "string" && audit.clientIp.trim() ? audit.clientIp.trim() : null;
|
||||
const changed = summarizeChangedPaths(audit?.changedPaths);
|
||||
const fields = [];
|
||||
if (actor) {
|
||||
fields.push(`actor=${actor}`);
|
||||
}
|
||||
if (deviceId) {
|
||||
fields.push(`device=${deviceId}`);
|
||||
}
|
||||
if (clientIp) {
|
||||
fields.push(`ip=${clientIp}`);
|
||||
}
|
||||
if (changed) {
|
||||
fields.push(`changedPaths=${changed}`);
|
||||
}
|
||||
return fields.length > 0 ? fields.join(" ") : "actor=<unknown>";
|
||||
}
|
||||
|
||||
/**
|
||||
* Register a callback that scheduleGatewaySigusr1Restart checks before emitting SIGUSR1.
|
||||
* The callback should return the number of pending items (0 = safe to restart).
|
||||
@@ -44,8 +102,10 @@ export function setPreRestartDeferralCheck(fn: () => number): void {
|
||||
*/
|
||||
export function emitGatewayRestart(): boolean {
|
||||
if (hasUnconsumedRestartSignal()) {
|
||||
clearPendingScheduledRestart();
|
||||
return false;
|
||||
}
|
||||
clearPendingScheduledRestart();
|
||||
const cycleToken = ++restartCycleToken;
|
||||
emittedRestartToken = cycleToken;
|
||||
authorizeGatewaySigusr1Restart();
|
||||
@@ -60,6 +120,7 @@ export function emitGatewayRestart(): boolean {
|
||||
emittedRestartToken = consumedRestartToken;
|
||||
return false;
|
||||
}
|
||||
lastRestartEmittedAt = Date.now();
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -293,11 +354,14 @@ export type ScheduledRestart = {
|
||||
delayMs: number;
|
||||
reason?: string;
|
||||
mode: "emit" | "signal";
|
||||
coalesced: boolean;
|
||||
cooldownMsApplied: number;
|
||||
};
|
||||
|
||||
export function scheduleGatewaySigusr1Restart(opts?: {
|
||||
delayMs?: number;
|
||||
reason?: string;
|
||||
audit?: RestartAuditInfo;
|
||||
}): ScheduledRestart {
|
||||
const delayMsRaw =
|
||||
typeof opts?.delayMs === "number" && Number.isFinite(opts.delayMs)
|
||||
@@ -308,22 +372,77 @@ export function scheduleGatewaySigusr1Restart(opts?: {
|
||||
typeof opts?.reason === "string" && opts.reason.trim()
|
||||
? opts.reason.trim().slice(0, 200)
|
||||
: undefined;
|
||||
const mode = process.listenerCount("SIGUSR1") > 0 ? "emit" : "signal";
|
||||
const nowMs = Date.now();
|
||||
const cooldownMsApplied = Math.max(0, lastRestartEmittedAt + RESTART_COOLDOWN_MS - nowMs);
|
||||
const requestedDueAt = nowMs + delayMs + cooldownMsApplied;
|
||||
|
||||
setTimeout(() => {
|
||||
const pendingCheck = preRestartCheck;
|
||||
if (!pendingCheck) {
|
||||
emitGatewayRestart();
|
||||
return;
|
||||
if (hasUnconsumedRestartSignal()) {
|
||||
restartLog.warn(
|
||||
`restart request coalesced (already in-flight) reason=${reason ?? "unspecified"} ${formatRestartAudit(opts?.audit)}`,
|
||||
);
|
||||
return {
|
||||
ok: true,
|
||||
pid: process.pid,
|
||||
signal: "SIGUSR1",
|
||||
delayMs: 0,
|
||||
reason,
|
||||
mode,
|
||||
coalesced: true,
|
||||
cooldownMsApplied,
|
||||
};
|
||||
}
|
||||
|
||||
if (pendingRestartTimer) {
|
||||
const remainingMs = Math.max(0, pendingRestartDueAt - nowMs);
|
||||
const shouldPullEarlier = requestedDueAt < pendingRestartDueAt;
|
||||
if (shouldPullEarlier) {
|
||||
restartLog.warn(
|
||||
`restart request rescheduled earlier reason=${reason ?? "unspecified"} pendingReason=${pendingRestartReason ?? "unspecified"} oldDelayMs=${remainingMs} newDelayMs=${Math.max(0, requestedDueAt - nowMs)} ${formatRestartAudit(opts?.audit)}`,
|
||||
);
|
||||
clearPendingScheduledRestart();
|
||||
} else {
|
||||
restartLog.warn(
|
||||
`restart request coalesced (already scheduled) reason=${reason ?? "unspecified"} pendingReason=${pendingRestartReason ?? "unspecified"} delayMs=${remainingMs} ${formatRestartAudit(opts?.audit)}`,
|
||||
);
|
||||
return {
|
||||
ok: true,
|
||||
pid: process.pid,
|
||||
signal: "SIGUSR1",
|
||||
delayMs: remainingMs,
|
||||
reason,
|
||||
mode,
|
||||
coalesced: true,
|
||||
cooldownMsApplied,
|
||||
};
|
||||
}
|
||||
deferGatewayRestartUntilIdle({ getPendingCount: pendingCheck });
|
||||
}, delayMs);
|
||||
}
|
||||
|
||||
pendingRestartDueAt = requestedDueAt;
|
||||
pendingRestartReason = reason;
|
||||
pendingRestartTimer = setTimeout(
|
||||
() => {
|
||||
pendingRestartTimer = null;
|
||||
pendingRestartDueAt = 0;
|
||||
pendingRestartReason = undefined;
|
||||
const pendingCheck = preRestartCheck;
|
||||
if (!pendingCheck) {
|
||||
emitGatewayRestart();
|
||||
return;
|
||||
}
|
||||
deferGatewayRestartUntilIdle({ getPendingCount: pendingCheck });
|
||||
},
|
||||
Math.max(0, requestedDueAt - nowMs),
|
||||
);
|
||||
return {
|
||||
ok: true,
|
||||
pid: process.pid,
|
||||
signal: "SIGUSR1",
|
||||
delayMs,
|
||||
delayMs: Math.max(0, requestedDueAt - nowMs),
|
||||
reason,
|
||||
mode: process.listenerCount("SIGUSR1") > 0 ? "emit" : "signal",
|
||||
mode,
|
||||
coalesced: false,
|
||||
cooldownMsApplied,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -336,5 +455,7 @@ export const __testing = {
|
||||
restartCycleToken = 0;
|
||||
emittedRestartToken = 0;
|
||||
consumedRestartToken = 0;
|
||||
lastRestartEmittedAt = 0;
|
||||
clearPendingScheduledRestart();
|
||||
},
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user