Files
openclaw/src/gateway/server-methods/agent-wait-dedupe.ts
Bob 61f7cea48b fix: kill stuck ACP child processes on startup and harden sessions in discord threads (#33699)
* Gateway: resolve agent.wait for chat.send runs

* Discord: harden ACP thread binding + listener timeout

* ACPX: handle already-exited child wait

* Gateway/Discord: address PR review findings

* Discord: keep ACP error-state thread bindings on startup

* gateway: make agent.wait dedupe bridge event-driven

* discord: harden ACP probe classification and cap startup fan-out

* discord: add cooperative timeout cancellation

* discord: fix startup probe concurrency helper typing

* plugin-sdk: avoid Windows root-alias shard timeout

* plugin-sdk: keep root alias reflection path non-blocking

* discord+gateway: resolve remaining PR review findings

* gateway+discord: fix codex review regressions

* Discord/Gateway: address Codex review findings

* Gateway: keep agent.wait lifecycle active with shared run IDs

* Discord: clean up status reactions on aborted runs

* fix: add changelog note for ACP/Discord startup hardening (#33699) (thanks @dutifulbob)

---------

Co-authored-by: Onur <2453968+osolmaz@users.noreply.github.com>
2026-03-04 10:52:28 +01:00

245 lines
6.5 KiB
TypeScript

import type { DedupeEntry } from "../server-shared.js";
/**
 * Terminal outcome of a run as reconstructed from a gateway dedupe entry.
 *
 * Produced by `readTerminalSnapshotFromDedupeEntry`; a run that is still
 * in flight has no snapshot (the readers return `null` instead).
 */
export type AgentWaitTerminalSnapshot = {
// Final state of the run.
status: "ok" | "error" | "timeout";
// Start time copied from the entry payload when it is a finite number.
// NOTE(review): presumably epoch milliseconds — confirm against the writers.
startedAt?: number;
// End time from the payload, falling back to the dedupe entry's `ts`.
endedAt?: number;
// Failure description; only populated for "error" and "timeout" snapshots.
error?: string;
};
// Module-level registry of wake-up callbacks, keyed by trimmed run ID.
// `addWaiter` inserts into it, `notifyWaiters` invokes the callbacks, and the
// unsubscribe function returned by `addWaiter` drops a key once its set is empty.
const AGENT_WAITERS_BY_RUN_ID = new Map<string, Set<() => void>>();
/**
 * Extracts the run ID from a dedupe key of the form `agent:<runId>` or
 * `chat:<runId>`.
 *
 * @param key - Dedupe map key.
 * @returns The text after the recognised prefix, or `null` when the key has
 * no recognised prefix or nothing follows it.
 */
function parseRunIdFromDedupeKey(key: string): string | null {
  for (const prefix of ["agent:", "chat:"]) {
    if (key.startsWith(prefix)) {
      const runId = key.slice(prefix.length);
      // A bare prefix ("agent:" / "chat:") carries no run ID.
      return runId === "" ? null : runId;
    }
  }
  return null;
}
/**
 * Narrows an unknown value to a finite number.
 *
 * @param value - Any value, typically an untyped payload field.
 * @returns `value` when it is a number other than NaN/±Infinity, otherwise
 * `undefined`.
 */
function asFiniteNumber(value: unknown): number | undefined {
  if (typeof value !== "number") {
    return undefined;
  }
  return Number.isFinite(value) ? value : undefined;
}
/**
 * Registers a wake-up callback for a run ID.
 *
 * @param runId - Run identifier; surrounding whitespace is trimmed before use.
 * @param waiter - Callback stored in the registry for that run ID.
 * @returns An unsubscribe function. For a blank run ID nothing is registered
 * and the returned function is a no-op.
 */
function addWaiter(runId: string, waiter: () => void): () => void {
  const key = runId.trim();
  if (key === "") {
    return () => {};
  }

  let bucket = AGENT_WAITERS_BY_RUN_ID.get(key);
  if (!bucket) {
    bucket = new Set<() => void>();
    AGENT_WAITERS_BY_RUN_ID.set(key, bucket);
  }
  bucket.add(waiter);

  return () => {
    // Look the set up again at removal time: the registry may have been
    // cleared or the key re-created since this waiter was added.
    const current = AGENT_WAITERS_BY_RUN_ID.get(key);
    if (!current) {
      return;
    }
    current.delete(waiter);
    if (current.size === 0) {
      AGENT_WAITERS_BY_RUN_ID.delete(key);
    }
  };
}
/**
 * Invokes every callback currently registered for a run ID.
 *
 * @param runId - Run identifier; surrounding whitespace is trimmed before the
 * lookup. A blank ID or an ID with no registered callbacks is a no-op.
 */
function notifyWaiters(runId: string): void {
  const key = runId.trim();
  if (key.length === 0) {
    return;
  }
  // Iterates the live set, so callbacks that unsubscribe themselves while
  // being notified are handled without copying.
  AGENT_WAITERS_BY_RUN_ID.get(key)?.forEach((wake) => {
    wake();
  });
}
/**
 * Derives a terminal snapshot from a single dedupe entry.
 *
 * @param entry - Dedupe record whose `payload`, `ts`, `ok` and `error` fields
 * are inspected.
 * @returns
 * - `null` when the payload status is "accepted", "started" or "in_flight",
 *   or when no terminal state can be determined;
 * - an "ok" / "timeout" snapshot when the payload says so (the error text is
 *   only attached for "timeout");
 * - an "error" snapshot when the payload status is "error" or the entry is
 *   not marked `ok`.
 */
export function readTerminalSnapshotFromDedupeEntry(
  entry: DedupeEntry,
): AgentWaitTerminalSnapshot | null {
  type PayloadFields = {
    status?: unknown;
    startedAt?: unknown;
    endedAt?: unknown;
    error?: unknown;
    summary?: unknown;
  };
  const fields = entry.payload as PayloadFields | undefined;
  const status = typeof fields?.status === "string" ? fields.status : undefined;

  // Non-terminal states never yield a snapshot.
  switch (status) {
    case "accepted":
    case "started":
    case "in_flight":
      return null;
    default:
      break;
  }

  const startedAt = asFiniteNumber(fields?.startedAt);
  // Fall back to the entry timestamp when the payload has no usable end time.
  const endedAt = asFiniteNumber(fields?.endedAt) ?? entry.ts;

  // Error text precedence: payload.error, then payload.summary, then the
  // entry-level error message.
  let errorMessage: string | undefined;
  if (typeof fields?.error === "string") {
    errorMessage = fields.error;
  } else if (typeof fields?.summary === "string") {
    errorMessage = fields.summary;
  } else {
    errorMessage = entry.error?.message;
  }

  if (status === "ok") {
    return { status, startedAt, endedAt, error: undefined };
  }
  if (status === "timeout") {
    return { status, startedAt, endedAt, error: errorMessage };
  }
  if (status === "error" || !entry.ok) {
    return { status: "error", startedAt, endedAt, error: errorMessage };
  }
  return null;
}
/**
 * Resolves the terminal snapshot for a run from the gateway dedupe map,
 * consulting both the `chat:<runId>` and `agent:<runId>` records.
 *
 * @param params.dedupe - Gateway dedupe map.
 * @param params.runId - Run identifier used to build both keys.
 * @param params.ignoreAgentTerminalSnapshot - When set, only the chat record
 * is considered.
 * @returns The chosen terminal snapshot, or `null` when the run has no
 * terminal record yet.
 */
export function readTerminalSnapshotFromGatewayDedupe(params: {
  dedupe: Map<string, DedupeEntry>;
  runId: string;
  ignoreAgentTerminalSnapshot?: boolean;
}): AgentWaitTerminalSnapshot | null {
  const { dedupe, runId, ignoreAgentTerminalSnapshot } = params;

  const chatEntry = dedupe.get(`chat:${runId}`);
  const chatSnapshot = chatEntry ? readTerminalSnapshotFromDedupeEntry(chatEntry) : null;
  if (ignoreAgentTerminalSnapshot) {
    return chatSnapshot;
  }

  const agentEntry = dedupe.get(`agent:${runId}`);
  if (!agentEntry) {
    return chatSnapshot;
  }
  const agentSnapshot = readTerminalSnapshotFromDedupeEntry(agentEntry);
  const chatIsNewer = chatEntry ? chatEntry.ts > agentEntry.ts : false;

  if (!agentSnapshot) {
    // The agent record is still in flight. A chat result is only trusted when
    // it was written after that agent record, which indicates a newer,
    // completed chat run reusing the same run ID.
    if (chatSnapshot && chatIsNewer) {
      return chatSnapshot;
    }
    return null;
  }

  if (!chatSnapshot) {
    return agentSnapshot;
  }
  // Reused idempotency keys can leave both terminal records behind; report
  // whichever was written most recently (agent wins ties).
  return chatIsNewer ? chatSnapshot : agentSnapshot;
}
/**
 * Waits until the gateway dedupe map holds a terminal snapshot for a run.
 *
 * Resolution order:
 * 1. If a terminal snapshot is already readable, it is returned immediately.
 * 2. If `timeoutMs` is not positive or `signal` is already aborted, `null`
 *    is returned without waiting.
 * 3. Otherwise a waiter is registered for `runId`; the promise resolves with
 *    the snapshot once a wake-up finds one, or with `null` when the timeout
 *    elapses or `signal` aborts.
 *
 * @param params.dedupe - Gateway dedupe map to read from.
 * @param params.runId - Run identifier to wait on.
 * @param params.timeoutMs - Maximum wait in milliseconds.
 * @param params.signal - Optional abort signal that ends the wait early.
 * @param params.ignoreAgentTerminalSnapshot - Forwarded to
 * `readTerminalSnapshotFromGatewayDedupe`.
 * @returns The terminal snapshot, or `null` on timeout/abort/no wait.
 */
export async function waitForTerminalGatewayDedupe(params: {
dedupe: Map<string, DedupeEntry>;
runId: string;
timeoutMs: number;
signal?: AbortSignal;
ignoreAgentTerminalSnapshot?: boolean;
}): Promise<AgentWaitTerminalSnapshot | null> {
// Fast path: the run already has a terminal record.
const initial = readTerminalSnapshotFromGatewayDedupe(params);
if (initial) {
return initial;
}
// Nothing to wait for when the caller allows no time or has already aborted.
// NOTE(review): a NaN timeoutMs passes this guard (NaN <= 0 is false) and
// produces a NaN delay below — confirm callers never pass NaN.
if (params.timeoutMs <= 0 || params.signal?.aborted) {
return null;
}
return await new Promise((resolve) => {
// Guards against resolving/cleaning up more than once.
let settled = false;
let timeoutHandle: NodeJS.Timeout | undefined;
let onAbort: (() => void) | undefined;
let removeWaiter: (() => void) | undefined;
// Single exit point: tears down the timer, abort listener and waiter
// registration, then resolves. Later calls are ignored.
const finish = (snapshot: AgentWaitTerminalSnapshot | null) => {
if (settled) {
return;
}
settled = true;
if (timeoutHandle) {
clearTimeout(timeoutHandle);
}
if (onAbort) {
params.signal?.removeEventListener("abort", onAbort);
}
removeWaiter?.();
resolve(snapshot);
};
// Wake-up handler: re-reads the dedupe map and finishes only if the run is
// now terminal; otherwise keeps waiting.
const onWake = () => {
const snapshot = readTerminalSnapshotFromGatewayDedupe(params);
if (snapshot) {
finish(snapshot);
}
};
removeWaiter = addWaiter(params.runId, onWake);
// Run the check once right after registering; if it settles, skip arming
// the timer and the abort listener.
onWake();
if (settled) {
return;
}
// Clamp the delay to [1, 2_147_483_647] ms (2^31 - 1), the largest delay
// Node's timers accept before resetting it to 1 ms.
const timeoutDelayMs = Math.max(1, Math.min(Math.floor(params.timeoutMs), 2_147_483_647));
timeoutHandle = setTimeout(() => finish(null), timeoutDelayMs);
// Do not let this timer alone keep the process alive (where unref exists).
timeoutHandle.unref?.();
onAbort = () => finish(null);
params.signal?.addEventListener("abort", onAbort, { once: true });
});
}
/**
 * Stores a dedupe entry and wakes any waiters for its run when the entry is
 * terminal.
 *
 * The entry is always written. Waiters are notified only when the key is an
 * `agent:<runId>` / `chat:<runId>` key and the entry yields a terminal
 * snapshot.
 *
 * @param params.dedupe - Gateway dedupe map to write into.
 * @param params.key - Dedupe key for the entry.
 * @param params.entry - Entry to store.
 */
export function setGatewayDedupeEntry(params: {
  dedupe: Map<string, DedupeEntry>;
  key: string;
  entry: DedupeEntry;
}) {
  const { dedupe, key, entry } = params;
  dedupe.set(key, entry);

  const runId = parseRunIdFromDedupeKey(key);
  if (runId && readTerminalSnapshotFromDedupeEntry(entry)) {
    notifyWaiters(runId);
  }
}
/** Test-only hooks for inspecting and resetting the waiter registry. */
export const __testing = {
  /**
   * Counts registered waiters.
   *
   * @param runId - When given (and non-empty), counts waiters stored under
   * exactly that key; otherwise counts waiters across all run IDs.
   */
  getWaiterCount(runId?: string): number {
    if (runId) {
      const bucket = AGENT_WAITERS_BY_RUN_ID.get(runId);
      return bucket ? bucket.size : 0;
    }
    return Array.from(AGENT_WAITERS_BY_RUN_ID.values()).reduce(
      (sum, bucket) => sum + bucket.size,
      0,
    );
  },
  /** Drops every registered waiter for every run ID. */
  resetWaiters() {
    AGENT_WAITERS_BY_RUN_ID.clear();
  },
};