iOS/Gateway: stabilize background wake and reconnect behavior (#21226)
Merged via /review-pr -> /prepare-pr -> /merge-pr. Prepared head SHA: 7705a7741e06335197a2015593355a7f4f9170ab Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com> Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com> Reviewed-by: @mbelinky
This commit is contained in:
@@ -13,6 +13,7 @@ const mocks = vi.hoisted(() => ({
|
||||
loadApnsRegistration: vi.fn(),
|
||||
resolveApnsAuthConfigFromEnv: vi.fn(),
|
||||
sendApnsBackgroundWake: vi.fn(),
|
||||
sendApnsAlert: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock("../../config/config.js", () => ({
|
||||
@@ -32,6 +33,7 @@ vi.mock("../../infra/push-apns.js", () => ({
|
||||
loadApnsRegistration: mocks.loadApnsRegistration,
|
||||
resolveApnsAuthConfigFromEnv: mocks.resolveApnsAuthConfigFromEnv,
|
||||
sendApnsBackgroundWake: mocks.sendApnsBackgroundWake,
|
||||
sendApnsAlert: mocks.sendApnsAlert,
|
||||
}));
|
||||
|
||||
type RespondCall = [
|
||||
@@ -81,12 +83,17 @@ async function invokeNode(params: {
|
||||
requestParams?: Partial<Record<string, unknown>>;
|
||||
}) {
|
||||
const respond = vi.fn();
|
||||
const logGateway = {
|
||||
info: vi.fn(),
|
||||
warn: vi.fn(),
|
||||
};
|
||||
await nodeHandlers["node.invoke"]({
|
||||
params: makeNodeInvokeParams(params.requestParams),
|
||||
respond: respond as never,
|
||||
context: {
|
||||
nodeRegistry: params.nodeRegistry,
|
||||
execApprovalManager: undefined,
|
||||
logGateway,
|
||||
} as never,
|
||||
client: null,
|
||||
req: { type: "req", id: "req-node-invoke", method: "node.invoke" },
|
||||
@@ -135,6 +142,7 @@ describe("node.invoke APNs wake path", () => {
|
||||
mocks.loadApnsRegistration.mockReset();
|
||||
mocks.resolveApnsAuthConfigFromEnv.mockReset();
|
||||
mocks.sendApnsBackgroundWake.mockReset();
|
||||
mocks.sendApnsAlert.mockReset();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
@@ -202,7 +210,7 @@ describe("node.invoke APNs wake path", () => {
|
||||
expect(call?.[1]).toMatchObject({ ok: true, nodeId: "ios-node-reconnect" });
|
||||
});
|
||||
|
||||
it("throttles repeated wake attempts for the same disconnected node", async () => {
|
||||
it("forces one retry wake when the first wake still fails to reconnect", async () => {
|
||||
vi.useFakeTimers();
|
||||
mockSuccessfulWakeConfig("ios-node-throttle");
|
||||
|
||||
@@ -211,21 +219,14 @@ describe("node.invoke APNs wake path", () => {
|
||||
invoke: vi.fn().mockResolvedValue({ ok: true }),
|
||||
};
|
||||
|
||||
const first = invokeNode({
|
||||
const invokePromise = invokeNode({
|
||||
nodeRegistry,
|
||||
requestParams: { nodeId: "ios-node-throttle", idempotencyKey: "idem-throttle-1" },
|
||||
});
|
||||
await vi.advanceTimersByTimeAsync(WAKE_WAIT_TIMEOUT_MS);
|
||||
await first;
|
||||
await vi.advanceTimersByTimeAsync(20_000);
|
||||
await invokePromise;
|
||||
|
||||
const second = invokeNode({
|
||||
nodeRegistry,
|
||||
requestParams: { nodeId: "ios-node-throttle", idempotencyKey: "idem-throttle-2" },
|
||||
});
|
||||
await vi.advanceTimersByTimeAsync(WAKE_WAIT_TIMEOUT_MS);
|
||||
await second;
|
||||
|
||||
expect(mocks.sendApnsBackgroundWake).toHaveBeenCalledTimes(1);
|
||||
expect(mocks.sendApnsBackgroundWake).toHaveBeenCalledTimes(2);
|
||||
expect(nodeRegistry.invoke).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -11,6 +11,7 @@ import {
|
||||
import {
|
||||
loadApnsRegistration,
|
||||
resolveApnsAuthConfigFromEnv,
|
||||
sendApnsAlert,
|
||||
sendApnsBackgroundWake,
|
||||
} from "../../infra/push-apns.js";
|
||||
import { isNodeCommandAllowed, resolveNodeCommandAllowlist } from "../node-command-policy.js";
|
||||
@@ -40,15 +41,36 @@ import {
|
||||
import type { GatewayRequestHandlers } from "./types.js";
|
||||
|
||||
const NODE_WAKE_RECONNECT_WAIT_MS = 3_000;
|
||||
const NODE_WAKE_RECONNECT_RETRY_WAIT_MS = 12_000;
|
||||
const NODE_WAKE_RECONNECT_POLL_MS = 150;
|
||||
const NODE_WAKE_THROTTLE_MS = 15_000;
|
||||
const NODE_WAKE_NUDGE_THROTTLE_MS = 10 * 60_000;
|
||||
|
||||
type NodeWakeState = {
|
||||
lastWakeAtMs: number;
|
||||
inFlight?: Promise<boolean>;
|
||||
inFlight?: Promise<NodeWakeAttempt>;
|
||||
};
|
||||
|
||||
const nodeWakeById = new Map<string, NodeWakeState>();
|
||||
const nodeWakeNudgeById = new Map<string, number>();
|
||||
|
||||
type NodeWakeAttempt = {
|
||||
available: boolean;
|
||||
throttled: boolean;
|
||||
path: "throttled" | "no-registration" | "no-auth" | "sent" | "send-error";
|
||||
durationMs: number;
|
||||
apnsStatus?: number;
|
||||
apnsReason?: string;
|
||||
};
|
||||
|
||||
type NodeWakeNudgeAttempt = {
|
||||
sent: boolean;
|
||||
throttled: boolean;
|
||||
reason: "throttled" | "no-registration" | "no-auth" | "send-error" | "apns-not-ok" | "sent";
|
||||
durationMs: number;
|
||||
apnsStatus?: number;
|
||||
apnsReason?: string;
|
||||
};
|
||||
|
||||
function isNodeEntry(entry: { role?: string; roles?: string[] }) {
|
||||
if (entry.role === "node") {
|
||||
@@ -64,7 +86,10 @@ async function delayMs(ms: number): Promise<void> {
|
||||
await new Promise<void>((resolve) => setTimeout(resolve, ms));
|
||||
}
|
||||
|
||||
async function maybeWakeNodeWithApns(nodeId: string): Promise<boolean> {
|
||||
async function maybeWakeNodeWithApns(
|
||||
nodeId: string,
|
||||
opts?: { force?: boolean },
|
||||
): Promise<NodeWakeAttempt> {
|
||||
const state = nodeWakeById.get(nodeId) ?? { lastWakeAtMs: 0 };
|
||||
nodeWakeById.set(nodeId, state);
|
||||
|
||||
@@ -73,36 +98,75 @@ async function maybeWakeNodeWithApns(nodeId: string): Promise<boolean> {
|
||||
}
|
||||
|
||||
const now = Date.now();
|
||||
if (state.lastWakeAtMs > 0 && now - state.lastWakeAtMs < NODE_WAKE_THROTTLE_MS) {
|
||||
return true;
|
||||
const force = opts?.force === true;
|
||||
if (!force && state.lastWakeAtMs > 0 && now - state.lastWakeAtMs < NODE_WAKE_THROTTLE_MS) {
|
||||
return { available: true, throttled: true, path: "throttled", durationMs: 0 };
|
||||
}
|
||||
|
||||
state.inFlight = (async () => {
|
||||
const startedAtMs = Date.now();
|
||||
const withDuration = (attempt: Omit<NodeWakeAttempt, "durationMs">): NodeWakeAttempt => ({
|
||||
...attempt,
|
||||
durationMs: Math.max(0, Date.now() - startedAtMs),
|
||||
});
|
||||
|
||||
try {
|
||||
const registration = await loadApnsRegistration(nodeId);
|
||||
if (!registration) {
|
||||
return false;
|
||||
return withDuration({ available: false, throttled: false, path: "no-registration" });
|
||||
}
|
||||
|
||||
const auth = await resolveApnsAuthConfigFromEnv(process.env);
|
||||
if (!auth.ok) {
|
||||
return false;
|
||||
return withDuration({
|
||||
available: false,
|
||||
throttled: false,
|
||||
path: "no-auth",
|
||||
apnsReason: auth.error,
|
||||
});
|
||||
}
|
||||
|
||||
state.lastWakeAtMs = Date.now();
|
||||
await sendApnsBackgroundWake({
|
||||
const wakeResult = await sendApnsBackgroundWake({
|
||||
auth: auth.value,
|
||||
registration,
|
||||
nodeId,
|
||||
wakeReason: "node.invoke",
|
||||
});
|
||||
} catch {
|
||||
// Best-effort wake only.
|
||||
if (state.lastWakeAtMs === 0) {
|
||||
return false;
|
||||
if (!wakeResult.ok) {
|
||||
return withDuration({
|
||||
available: true,
|
||||
throttled: false,
|
||||
path: "send-error",
|
||||
apnsStatus: wakeResult.status,
|
||||
apnsReason: wakeResult.reason,
|
||||
});
|
||||
}
|
||||
return withDuration({
|
||||
available: true,
|
||||
throttled: false,
|
||||
path: "sent",
|
||||
apnsStatus: wakeResult.status,
|
||||
apnsReason: wakeResult.reason,
|
||||
});
|
||||
} catch (err) {
|
||||
// Best-effort wake only.
|
||||
const message = err instanceof Error ? err.message : String(err);
|
||||
if (state.lastWakeAtMs === 0) {
|
||||
return withDuration({
|
||||
available: false,
|
||||
throttled: false,
|
||||
path: "send-error",
|
||||
apnsReason: message,
|
||||
});
|
||||
}
|
||||
return withDuration({
|
||||
available: true,
|
||||
throttled: false,
|
||||
path: "send-error",
|
||||
apnsReason: message,
|
||||
});
|
||||
}
|
||||
return true;
|
||||
})();
|
||||
|
||||
try {
|
||||
@@ -112,6 +176,70 @@ async function maybeWakeNodeWithApns(nodeId: string): Promise<boolean> {
|
||||
}
|
||||
}
|
||||
|
||||
async function maybeSendNodeWakeNudge(nodeId: string): Promise<NodeWakeNudgeAttempt> {
|
||||
const startedAtMs = Date.now();
|
||||
const withDuration = (
|
||||
attempt: Omit<NodeWakeNudgeAttempt, "durationMs">,
|
||||
): NodeWakeNudgeAttempt => ({
|
||||
...attempt,
|
||||
durationMs: Math.max(0, Date.now() - startedAtMs),
|
||||
});
|
||||
|
||||
const lastNudgeAtMs = nodeWakeNudgeById.get(nodeId) ?? 0;
|
||||
if (lastNudgeAtMs > 0 && Date.now() - lastNudgeAtMs < NODE_WAKE_NUDGE_THROTTLE_MS) {
|
||||
return withDuration({ sent: false, throttled: true, reason: "throttled" });
|
||||
}
|
||||
|
||||
const registration = await loadApnsRegistration(nodeId);
|
||||
if (!registration) {
|
||||
return withDuration({ sent: false, throttled: false, reason: "no-registration" });
|
||||
}
|
||||
const auth = await resolveApnsAuthConfigFromEnv(process.env);
|
||||
if (!auth.ok) {
|
||||
return withDuration({
|
||||
sent: false,
|
||||
throttled: false,
|
||||
reason: "no-auth",
|
||||
apnsReason: auth.error,
|
||||
});
|
||||
}
|
||||
|
||||
try {
|
||||
const result = await sendApnsAlert({
|
||||
auth: auth.value,
|
||||
registration,
|
||||
nodeId,
|
||||
title: "OpenClaw needs a quick reopen",
|
||||
body: "Tap to reopen OpenClaw and restore the node connection.",
|
||||
});
|
||||
if (!result.ok) {
|
||||
return withDuration({
|
||||
sent: false,
|
||||
throttled: false,
|
||||
reason: "apns-not-ok",
|
||||
apnsStatus: result.status,
|
||||
apnsReason: result.reason,
|
||||
});
|
||||
}
|
||||
nodeWakeNudgeById.set(nodeId, Date.now());
|
||||
return withDuration({
|
||||
sent: true,
|
||||
throttled: false,
|
||||
reason: "sent",
|
||||
apnsStatus: result.status,
|
||||
apnsReason: result.reason,
|
||||
});
|
||||
} catch (err) {
|
||||
const message = err instanceof Error ? err.message : String(err);
|
||||
return withDuration({
|
||||
sent: false,
|
||||
throttled: false,
|
||||
reason: "send-error",
|
||||
apnsReason: message,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async function waitForNodeReconnect(params: {
|
||||
nodeId: string;
|
||||
context: { nodeRegistry: { get: (nodeId: string) => unknown } };
|
||||
@@ -430,7 +558,7 @@ export const nodeHandlers: GatewayRequestHandlers = {
|
||||
);
|
||||
});
|
||||
},
|
||||
"node.invoke": async ({ params, respond, context, client }) => {
|
||||
"node.invoke": async ({ params, respond, context, client, req }) => {
|
||||
if (!validateNodeInvokeParams(params)) {
|
||||
respondInvalidParams({
|
||||
respond,
|
||||
@@ -472,12 +600,70 @@ export const nodeHandlers: GatewayRequestHandlers = {
|
||||
await respondUnavailableOnThrow(respond, async () => {
|
||||
let nodeSession = context.nodeRegistry.get(nodeId);
|
||||
if (!nodeSession) {
|
||||
const wakeAvailable = await maybeWakeNodeWithApns(nodeId);
|
||||
if (wakeAvailable) {
|
||||
await waitForNodeReconnect({ nodeId, context });
|
||||
const wakeReqId = req.id;
|
||||
const wakeFlowStartedAtMs = Date.now();
|
||||
context.logGateway.info(
|
||||
`node wake start node=${nodeId} req=${wakeReqId} command=${command}`,
|
||||
);
|
||||
|
||||
const wake = await maybeWakeNodeWithApns(nodeId);
|
||||
context.logGateway.info(
|
||||
`node wake stage=wake1 node=${nodeId} req=${wakeReqId} ` +
|
||||
`available=${wake.available} throttled=${wake.throttled} ` +
|
||||
`path=${wake.path} durationMs=${wake.durationMs} ` +
|
||||
`apnsStatus=${wake.apnsStatus ?? -1} apnsReason=${wake.apnsReason ?? "-"}`,
|
||||
);
|
||||
if (wake.available) {
|
||||
const waitStartedAtMs = Date.now();
|
||||
const waitTimeoutMs = NODE_WAKE_RECONNECT_WAIT_MS;
|
||||
const reconnected = await waitForNodeReconnect({
|
||||
nodeId,
|
||||
context,
|
||||
timeoutMs: waitTimeoutMs,
|
||||
});
|
||||
const waitDurationMs = Math.max(0, Date.now() - waitStartedAtMs);
|
||||
context.logGateway.info(
|
||||
`node wake stage=wait1 node=${nodeId} req=${wakeReqId} ` +
|
||||
`reconnected=${reconnected} timeoutMs=${waitTimeoutMs} durationMs=${waitDurationMs}`,
|
||||
);
|
||||
}
|
||||
nodeSession = context.nodeRegistry.get(nodeId);
|
||||
if (!nodeSession && wake.available) {
|
||||
const retryWake = await maybeWakeNodeWithApns(nodeId, { force: true });
|
||||
context.logGateway.info(
|
||||
`node wake stage=wake2 node=${nodeId} req=${wakeReqId} force=true ` +
|
||||
`available=${retryWake.available} throttled=${retryWake.throttled} ` +
|
||||
`path=${retryWake.path} durationMs=${retryWake.durationMs} ` +
|
||||
`apnsStatus=${retryWake.apnsStatus ?? -1} apnsReason=${retryWake.apnsReason ?? "-"}`,
|
||||
);
|
||||
if (retryWake.available) {
|
||||
const waitStartedAtMs = Date.now();
|
||||
const waitTimeoutMs = NODE_WAKE_RECONNECT_RETRY_WAIT_MS;
|
||||
const reconnected = await waitForNodeReconnect({
|
||||
nodeId,
|
||||
context,
|
||||
timeoutMs: waitTimeoutMs,
|
||||
});
|
||||
const waitDurationMs = Math.max(0, Date.now() - waitStartedAtMs);
|
||||
context.logGateway.info(
|
||||
`node wake stage=wait2 node=${nodeId} req=${wakeReqId} ` +
|
||||
`reconnected=${reconnected} timeoutMs=${waitTimeoutMs} durationMs=${waitDurationMs}`,
|
||||
);
|
||||
}
|
||||
nodeSession = context.nodeRegistry.get(nodeId);
|
||||
}
|
||||
if (!nodeSession) {
|
||||
const totalDurationMs = Math.max(0, Date.now() - wakeFlowStartedAtMs);
|
||||
const nudge = await maybeSendNodeWakeNudge(nodeId);
|
||||
context.logGateway.info(
|
||||
`node wake nudge node=${nodeId} req=${wakeReqId} sent=${nudge.sent} ` +
|
||||
`throttled=${nudge.throttled} reason=${nudge.reason} durationMs=${nudge.durationMs} ` +
|
||||
`apnsStatus=${nudge.apnsStatus ?? -1} apnsReason=${nudge.apnsReason ?? "-"}`,
|
||||
);
|
||||
context.logGateway.warn(
|
||||
`node wake done node=${nodeId} req=${wakeReqId} connected=false ` +
|
||||
`reason=not_connected totalMs=${totalDurationMs}`,
|
||||
);
|
||||
respond(
|
||||
false,
|
||||
undefined,
|
||||
@@ -487,6 +673,11 @@ export const nodeHandlers: GatewayRequestHandlers = {
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const totalDurationMs = Math.max(0, Date.now() - wakeFlowStartedAtMs);
|
||||
context.logGateway.info(
|
||||
`node wake done node=${nodeId} req=${wakeReqId} connected=true totalMs=${totalDurationMs}`,
|
||||
);
|
||||
}
|
||||
const cfg = loadConfig();
|
||||
const allowlist = resolveNodeCommandAllowlist(cfg, nodeSession);
|
||||
|
||||
Reference in New Issue
Block a user