fix(gateway): preserve streamed prefixes across tool boundaries

This commit is contained in:
Vignesh Natarajan
2026-03-05 17:28:11 -08:00
parent d58dafae88
commit d326861eb4
3 changed files with 152 additions and 8 deletions

View File

@@ -29,6 +29,7 @@ Docs: https://docs.openclaw.ai
### Fixes
- Gateway/chat streaming tool-boundary text retention: merge assistant delta segments into per-run chat buffers so pre-tool text is preserved in live chat deltas/finals when providers emit post-tool assistant segments as non-prefix snapshots. (#36957) Thanks @Datyedyeguy.
- OpenAI Codex OAuth/login hardening: fail OAuth completion early when the returned token is missing `api.responses.write`, and allow `openclaw models auth login --provider openai-codex` to use the built-in OAuth path even when no provider plugins are installed. (#36660) Thanks @driesvints.
- Gateway/remote WS break-glass hostname support: honor `OPENCLAW_ALLOW_INSECURE_PRIVATE_WS=1` for `ws://` hostname URLs (not only private IP literals) across onboarding validation and runtime gateway connection checks, while still rejecting public IP literals and non-unicast IPv6 endpoints. (#36930) Thanks @manju-rn.
- Routing/binding lookup scalability: pre-index route bindings by channel/account and avoid full binding-list rescans on channel-account cache rollover, preventing multi-second `resolveAgentRoute` stalls in large binding configurations. (#36915) Thanks @songchenghao.

View File

@@ -310,6 +310,98 @@ describe("agent event handler", () => {
nowSpy.mockRestore();
});
it("preserves pre-tool assistant text when later segments stream as non-prefix snapshots", () => {
let now = 10_500;
const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now);
const { broadcast, nodeSendToSession, chatRunState, handler } = createHarness();
chatRunState.registry.add("run-segmented", {
sessionKey: "session-segmented",
clientRunId: "client-segmented",
});
handler({
runId: "run-segmented",
seq: 1,
stream: "assistant",
ts: Date.now(),
data: { text: "Before tool call", delta: "Before tool call" },
});
now = 10_700;
handler({
runId: "run-segmented",
seq: 2,
stream: "assistant",
ts: Date.now(),
data: { text: "After tool call", delta: "\nAfter tool call" },
});
emitLifecycleEnd(handler, "run-segmented", 3);
const chatCalls = chatBroadcastCalls(broadcast);
expect(chatCalls).toHaveLength(3);
const secondPayload = chatCalls[1]?.[1] as {
state?: string;
message?: { content?: Array<{ text?: string }> };
};
const finalPayload = chatCalls[2]?.[1] as {
state?: string;
message?: { content?: Array<{ text?: string }> };
};
expect(secondPayload.state).toBe("delta");
expect(secondPayload.message?.content?.[0]?.text).toBe("Before tool call\nAfter tool call");
expect(finalPayload.state).toBe("final");
expect(finalPayload.message?.content?.[0]?.text).toBe("Before tool call\nAfter tool call");
expect(sessionChatCalls(nodeSendToSession)).toHaveLength(3);
nowSpy.mockRestore();
});
it("flushes merged segmented text before final when latest segment is throttled", () => {
let now = 10_800;
const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now);
const { broadcast, nodeSendToSession, chatRunState, handler } = createHarness();
chatRunState.registry.add("run-segmented-flush", {
sessionKey: "session-segmented-flush",
clientRunId: "client-segmented-flush",
});
handler({
runId: "run-segmented-flush",
seq: 1,
stream: "assistant",
ts: Date.now(),
data: { text: "Before tool call", delta: "Before tool call" },
});
now = 10_860;
handler({
runId: "run-segmented-flush",
seq: 2,
stream: "assistant",
ts: Date.now(),
data: { text: "After tool call", delta: "\nAfter tool call" },
});
emitLifecycleEnd(handler, "run-segmented-flush", 3);
const chatCalls = chatBroadcastCalls(broadcast);
expect(chatCalls).toHaveLength(3);
const flushPayload = chatCalls[1]?.[1] as {
state?: string;
message?: { content?: Array<{ text?: string }> };
};
const finalPayload = chatCalls[2]?.[1] as {
state?: string;
message?: { content?: Array<{ text?: string }> };
};
expect(flushPayload.state).toBe("delta");
expect(flushPayload.message?.content?.[0]?.text).toBe("Before tool call\nAfter tool call");
expect(finalPayload.state).toBe("final");
expect(finalPayload.message?.content?.[0]?.text).toBe("Before tool call\nAfter tool call");
expect(sessionChatCalls(nodeSendToSession)).toHaveLength(3);
nowSpy.mockRestore();
});
it("does not flush an extra delta when the latest text already broadcast", () => {
let now = 11_000;
const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now);

View File

@@ -89,6 +89,48 @@ function isSilentReplyLeadFragment(text: string): boolean {
return SILENT_REPLY_TOKEN.startsWith(normalized);
}
function appendUniqueSuffix(base: string, suffix: string): string {
if (!suffix) {
return base;
}
if (!base) {
return suffix;
}
if (base.endsWith(suffix)) {
return base;
}
const maxOverlap = Math.min(base.length, suffix.length);
for (let overlap = maxOverlap; overlap > 0; overlap -= 1) {
if (base.slice(-overlap) === suffix.slice(0, overlap)) {
return base + suffix.slice(overlap);
}
}
return base + suffix;
}
function resolveMergedAssistantText(params: {
previousText: string;
nextText: string;
nextDelta: string;
}) {
const { previousText, nextText, nextDelta } = params;
if (nextText && previousText) {
if (nextText.startsWith(previousText)) {
return nextText;
}
if (previousText.startsWith(nextText) && !nextDelta) {
return previousText;
}
}
if (nextDelta) {
return appendUniqueSuffix(previousText, nextDelta);
}
if (nextText) {
return nextText;
}
return previousText;
}
export type ChatRunEntry = {
sessionKey: string;
clientRunId: string;
@@ -302,16 +344,25 @@ export function createAgentEventHandler({
sourceRunId: string,
seq: number,
text: string,
delta?: unknown,
) => {
const cleaned = stripInlineDirectiveTagsForDisplay(text).text;
if (!cleaned) {
const cleanedText = stripInlineDirectiveTagsForDisplay(text).text;
const cleanedDelta =
typeof delta === "string" ? stripInlineDirectiveTagsForDisplay(delta).text : "";
const previousText = chatRunState.buffers.get(clientRunId) ?? "";
const mergedText = resolveMergedAssistantText({
previousText,
nextText: cleanedText,
nextDelta: cleanedDelta,
});
if (!mergedText) {
return;
}
chatRunState.buffers.set(clientRunId, cleaned);
if (isSilentReplyText(cleaned, SILENT_REPLY_TOKEN)) {
chatRunState.buffers.set(clientRunId, mergedText);
if (isSilentReplyText(mergedText, SILENT_REPLY_TOKEN)) {
return;
}
if (isSilentReplyLeadFragment(cleaned)) {
if (isSilentReplyLeadFragment(mergedText)) {
return;
}
if (shouldHideHeartbeatChatOutput(clientRunId, sourceRunId)) {
@@ -323,7 +374,7 @@ export function createAgentEventHandler({
return;
}
chatRunState.deltaSentAt.set(clientRunId, now);
chatRunState.deltaLastBroadcastLen.set(clientRunId, cleaned.length);
chatRunState.deltaLastBroadcastLen.set(clientRunId, mergedText.length);
const payload = {
runId: clientRunId,
sessionKey,
@@ -331,7 +382,7 @@ export function createAgentEventHandler({
state: "delta" as const,
message: {
role: "assistant",
content: [{ type: "text", text: cleaned }],
content: [{ type: "text", text: mergedText }],
timestamp: now,
},
};
@@ -512,7 +563,7 @@ export function createAgentEventHandler({
nodeSendToSession(sessionKey, "agent", isToolEvent ? toolPayload : agentPayload);
}
if (!isAborted && evt.stream === "assistant" && typeof evt.data?.text === "string") {
emitChatDelta(sessionKey, clientRunId, evt.runId, evt.seq, evt.data.text);
emitChatDelta(sessionKey, clientRunId, evt.runId, evt.seq, evt.data.text, evt.data.delta);
} else if (!isAborted && (lifecyclePhase === "end" || lifecyclePhase === "error")) {
const evtStopReason =
typeof evt.data?.stopReason === "string" ? evt.data.stopReason : undefined;