Gateway/Control UI: preserve partial output on abort (#15026)
* Gateway/Control UI: preserve partial output on abort * fix: finalize abort partial handling and tests (#15026) (thanks @advaitpaliwal) --------- Co-authored-by: Tyler Yust <TYTYYUST@YAHOO.COM>
This commit is contained in:
@@ -340,6 +340,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Configure/Gateway: reject literal `"undefined"`/`"null"` token input and validate gateway password prompt values to avoid invalid password-mode configs. (#13767) Thanks @omair445.
|
||||
- Gateway: handle async `EPIPE` on stdout/stderr during shutdown. (#13414) Thanks @keshav55.
|
||||
- Gateway/Control UI: resolve missing dashboard assets when `openclaw` is installed globally via symlink-based Node managers (nvm/fnm/n/Homebrew). (#14919) Thanks @aynorica.
|
||||
- Gateway/Control UI: keep partial assistant output visible when runs are aborted, and persist aborted partials to session transcripts for follow-up context.
|
||||
- Cron: use requested `agentId` for isolated job auth resolution. (#13983) Thanks @0xRaini.
|
||||
- Cron: prevent cron jobs from skipping execution when `nextRunAtMs` advances. (#14068) Thanks @WalterSumbon.
|
||||
- Cron: pass `agentId` to `runHeartbeatOnce` for main-session jobs. (#14140) Thanks @ishikawa-pro.
|
||||
|
||||
@@ -96,6 +96,10 @@ Cron jobs panel notes:
|
||||
- Click **Stop** (calls `chat.abort`)
|
||||
- Type `/stop` (or `stop|esc|abort|wait|exit|interrupt`) to abort out-of-band
|
||||
- `chat.abort` supports `{ sessionKey }` (no `runId`) to abort all active runs for that session
|
||||
- Abort partial retention:
|
||||
- When a run is aborted, partial assistant text can still be shown in the UI
|
||||
- Gateway persists aborted partial assistant text into transcript history when buffered output exists
|
||||
- Persisted entries include abort metadata so transcript consumers can tell abort partials from normal completion output
|
||||
|
||||
## Tailnet access (recommended)
|
||||
|
||||
|
||||
@@ -25,6 +25,8 @@ Status: the macOS/iOS SwiftUI chat UI talks directly to the Gateway WebSocket.
|
||||
|
||||
- The UI connects to the Gateway WebSocket and uses `chat.history`, `chat.send`, and `chat.inject`.
|
||||
- `chat.inject` appends an assistant note directly to the transcript and broadcasts it to the UI (no agent run).
|
||||
- Aborted runs can keep partial assistant output visible in the UI.
|
||||
- Gateway persists aborted partial assistant text into transcript history when buffered output exists, and marks those entries with abort metadata.
|
||||
- History is always fetched from the gateway (no local file watching).
|
||||
- If the gateway is unreachable, WebChat is read-only.
|
||||
|
||||
|
||||
122
src/gateway/chat-abort.test.ts
Normal file
122
src/gateway/chat-abort.test.ts
Normal file
@@ -0,0 +1,122 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import {
|
||||
abortChatRunById,
|
||||
type ChatAbortOps,
|
||||
type ChatAbortControllerEntry,
|
||||
} from "./chat-abort.js";
|
||||
|
||||
function createActiveEntry(sessionKey: string): ChatAbortControllerEntry {
|
||||
const now = Date.now();
|
||||
return {
|
||||
controller: new AbortController(),
|
||||
sessionId: "sess-1",
|
||||
sessionKey,
|
||||
startedAtMs: now,
|
||||
expiresAtMs: now + 10_000,
|
||||
};
|
||||
}
|
||||
|
||||
function createOps(params: {
|
||||
runId: string;
|
||||
entry: ChatAbortControllerEntry;
|
||||
buffer?: string;
|
||||
}): ChatAbortOps & {
|
||||
broadcast: ReturnType<typeof vi.fn>;
|
||||
nodeSendToSession: ReturnType<typeof vi.fn>;
|
||||
removeChatRun: ReturnType<typeof vi.fn>;
|
||||
} {
|
||||
const { runId, entry, buffer } = params;
|
||||
const broadcast = vi.fn();
|
||||
const nodeSendToSession = vi.fn();
|
||||
const removeChatRun = vi.fn();
|
||||
|
||||
return {
|
||||
chatAbortControllers: new Map([[runId, entry]]),
|
||||
chatRunBuffers: new Map(buffer !== undefined ? [[runId, buffer]] : []),
|
||||
chatDeltaSentAt: new Map([[runId, Date.now()]]),
|
||||
chatAbortedRuns: new Map(),
|
||||
removeChatRun,
|
||||
agentRunSeq: new Map(),
|
||||
broadcast,
|
||||
nodeSendToSession,
|
||||
};
|
||||
}
|
||||
|
||||
describe("abortChatRunById", () => {
|
||||
it("broadcasts aborted payload with partial message when buffered text exists", () => {
|
||||
const runId = "run-1";
|
||||
const sessionKey = "main";
|
||||
const entry = createActiveEntry(sessionKey);
|
||||
const ops = createOps({ runId, entry, buffer: " Partial reply " });
|
||||
ops.agentRunSeq.set(runId, 2);
|
||||
ops.agentRunSeq.set("client-run-1", 4);
|
||||
ops.removeChatRun.mockReturnValue({ sessionKey, clientRunId: "client-run-1" });
|
||||
|
||||
const result = abortChatRunById(ops, { runId, sessionKey, stopReason: "user" });
|
||||
|
||||
expect(result).toEqual({ aborted: true });
|
||||
expect(entry.controller.signal.aborted).toBe(true);
|
||||
expect(ops.chatAbortControllers.has(runId)).toBe(false);
|
||||
expect(ops.chatRunBuffers.has(runId)).toBe(false);
|
||||
expect(ops.chatDeltaSentAt.has(runId)).toBe(false);
|
||||
expect(ops.removeChatRun).toHaveBeenCalledWith(runId, runId, sessionKey);
|
||||
expect(ops.agentRunSeq.has(runId)).toBe(false);
|
||||
expect(ops.agentRunSeq.has("client-run-1")).toBe(false);
|
||||
|
||||
expect(ops.broadcast).toHaveBeenCalledTimes(1);
|
||||
const payload = ops.broadcast.mock.calls[0]?.[1] as Record<string, unknown>;
|
||||
expect(payload).toEqual(
|
||||
expect.objectContaining({
|
||||
runId,
|
||||
sessionKey,
|
||||
seq: 3,
|
||||
state: "aborted",
|
||||
stopReason: "user",
|
||||
}),
|
||||
);
|
||||
expect(payload.message).toEqual(
|
||||
expect.objectContaining({
|
||||
role: "assistant",
|
||||
content: [{ type: "text", text: " Partial reply " }],
|
||||
}),
|
||||
);
|
||||
expect((payload.message as { timestamp?: unknown }).timestamp).toEqual(expect.any(Number));
|
||||
expect(ops.nodeSendToSession).toHaveBeenCalledWith(sessionKey, "chat", payload);
|
||||
});
|
||||
|
||||
it("omits aborted message when buffered text is empty", () => {
|
||||
const runId = "run-1";
|
||||
const sessionKey = "main";
|
||||
const entry = createActiveEntry(sessionKey);
|
||||
const ops = createOps({ runId, entry, buffer: " " });
|
||||
|
||||
const result = abortChatRunById(ops, { runId, sessionKey });
|
||||
|
||||
expect(result).toEqual({ aborted: true });
|
||||
const payload = ops.broadcast.mock.calls[0]?.[1] as Record<string, unknown>;
|
||||
expect(payload.message).toBeUndefined();
|
||||
});
|
||||
|
||||
it("preserves partial message even when abort listeners clear buffers synchronously", () => {
|
||||
const runId = "run-1";
|
||||
const sessionKey = "main";
|
||||
const entry = createActiveEntry(sessionKey);
|
||||
const ops = createOps({ runId, entry, buffer: "streamed text" });
|
||||
|
||||
// Simulate synchronous cleanup triggered by AbortController listeners.
|
||||
entry.controller.signal.addEventListener("abort", () => {
|
||||
ops.chatRunBuffers.delete(runId);
|
||||
});
|
||||
|
||||
const result = abortChatRunById(ops, { runId, sessionKey });
|
||||
|
||||
expect(result).toEqual({ aborted: true });
|
||||
const payload = ops.broadcast.mock.calls[0]?.[1] as Record<string, unknown>;
|
||||
expect(payload.message).toEqual(
|
||||
expect.objectContaining({
|
||||
role: "assistant",
|
||||
content: [{ type: "text", text: "streamed text" }],
|
||||
}),
|
||||
);
|
||||
});
|
||||
});
|
||||
@@ -52,15 +52,23 @@ function broadcastChatAborted(
|
||||
runId: string;
|
||||
sessionKey: string;
|
||||
stopReason?: string;
|
||||
partialText?: string;
|
||||
},
|
||||
) {
|
||||
const { runId, sessionKey, stopReason } = params;
|
||||
const { runId, sessionKey, stopReason, partialText } = params;
|
||||
const payload = {
|
||||
runId,
|
||||
sessionKey,
|
||||
seq: (ops.agentRunSeq.get(runId) ?? 0) + 1,
|
||||
state: "aborted" as const,
|
||||
stopReason,
|
||||
message: partialText
|
||||
? {
|
||||
role: "assistant",
|
||||
content: [{ type: "text", text: partialText }],
|
||||
timestamp: Date.now(),
|
||||
}
|
||||
: undefined,
|
||||
};
|
||||
ops.broadcast("chat", payload);
|
||||
ops.nodeSendToSession(sessionKey, "chat", payload);
|
||||
@@ -83,13 +91,15 @@ export function abortChatRunById(
|
||||
return { aborted: false };
|
||||
}
|
||||
|
||||
const bufferedText = ops.chatRunBuffers.get(runId);
|
||||
const partialText = bufferedText && bufferedText.trim() ? bufferedText : undefined;
|
||||
ops.chatAbortedRuns.set(runId, Date.now());
|
||||
active.controller.abort();
|
||||
ops.chatAbortControllers.delete(runId);
|
||||
ops.chatRunBuffers.delete(runId);
|
||||
ops.chatDeltaSentAt.delete(runId);
|
||||
const removed = ops.removeChatRun(runId, runId, sessionKey);
|
||||
broadcastChatAborted(ops, { runId, sessionKey, stopReason });
|
||||
broadcastChatAborted(ops, { runId, sessionKey, stopReason, partialText });
|
||||
ops.agentRunSeq.delete(runId);
|
||||
if (removed?.clientRunId) {
|
||||
ops.agentRunSeq.delete(removed.clientRunId);
|
||||
|
||||
252
src/gateway/server-methods/chat.abort-persistence.test.ts
Normal file
252
src/gateway/server-methods/chat.abort-persistence.test.ts
Normal file
@@ -0,0 +1,252 @@
|
||||
import { CURRENT_SESSION_VERSION } from "@mariozechner/pi-coding-agent";
|
||||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
type TranscriptLine = {
|
||||
message?: Record<string, unknown>;
|
||||
};
|
||||
|
||||
function createActiveRun(sessionKey: string, sessionId: string) {
|
||||
const now = Date.now();
|
||||
return {
|
||||
controller: new AbortController(),
|
||||
sessionId,
|
||||
sessionKey,
|
||||
startedAtMs: now,
|
||||
expiresAtMs: now + 30_000,
|
||||
};
|
||||
}
|
||||
|
||||
async function writeTranscriptHeader(transcriptPath: string, sessionId: string) {
|
||||
const header = {
|
||||
type: "session",
|
||||
version: CURRENT_SESSION_VERSION,
|
||||
id: sessionId,
|
||||
timestamp: new Date(0).toISOString(),
|
||||
cwd: "/tmp",
|
||||
};
|
||||
await fs.writeFile(transcriptPath, `${JSON.stringify(header)}\n`, "utf-8");
|
||||
}
|
||||
|
||||
async function readTranscriptLines(transcriptPath: string): Promise<TranscriptLine[]> {
|
||||
const raw = await fs.readFile(transcriptPath, "utf-8");
|
||||
return raw
|
||||
.split(/\r?\n/)
|
||||
.filter((line) => line.trim().length > 0)
|
||||
.map((line) => {
|
||||
try {
|
||||
return JSON.parse(line) as TranscriptLine;
|
||||
} catch {
|
||||
return {};
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async function importChatHandlersWithSession(transcriptPath: string, sessionId: string) {
|
||||
vi.resetModules();
|
||||
vi.doMock("../session-utils.js", async (importOriginal) => {
|
||||
const original = await importOriginal();
|
||||
return {
|
||||
...original,
|
||||
loadSessionEntry: () => ({
|
||||
cfg: {},
|
||||
storePath: path.join(path.dirname(transcriptPath), "sessions.json"),
|
||||
entry: {
|
||||
sessionId,
|
||||
sessionFile: transcriptPath,
|
||||
},
|
||||
canonicalKey: "main",
|
||||
}),
|
||||
};
|
||||
});
|
||||
return import("./chat.js");
|
||||
}
|
||||
|
||||
afterEach(() => {
|
||||
vi.restoreAllMocks();
|
||||
vi.resetModules();
|
||||
});
|
||||
|
||||
describe("chat abort transcript persistence", () => {
|
||||
it("persists run-scoped abort partial with rpc metadata and idempotency", async () => {
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-chat-abort-run-"));
|
||||
const transcriptPath = path.join(dir, "sess-main.jsonl");
|
||||
const sessionId = "sess-main";
|
||||
const runId = "idem-abort-run-1";
|
||||
await writeTranscriptHeader(transcriptPath, sessionId);
|
||||
|
||||
const { chatHandlers } = await importChatHandlersWithSession(transcriptPath, sessionId);
|
||||
const respond = vi.fn();
|
||||
const context = {
|
||||
chatAbortControllers: new Map([[runId, createActiveRun("main", sessionId)]]),
|
||||
chatRunBuffers: new Map([[runId, "Partial from run abort"]]),
|
||||
chatDeltaSentAt: new Map([[runId, Date.now()]]),
|
||||
chatAbortedRuns: new Map<string, number>(),
|
||||
removeChatRun: vi
|
||||
.fn()
|
||||
.mockReturnValue({ sessionKey: "main", clientRunId: "client-idem-abort-run-1" }),
|
||||
agentRunSeq: new Map<string, number>([
|
||||
[runId, 2],
|
||||
["client-idem-abort-run-1", 3],
|
||||
]),
|
||||
broadcast: vi.fn(),
|
||||
nodeSendToSession: vi.fn(),
|
||||
logGateway: { warn: vi.fn() },
|
||||
};
|
||||
|
||||
await chatHandlers["chat.abort"]({
|
||||
params: { sessionKey: "main", runId },
|
||||
respond,
|
||||
context: context as never,
|
||||
});
|
||||
|
||||
const [ok1, payload1] = respond.mock.calls.at(-1) ?? [];
|
||||
expect(ok1).toBe(true);
|
||||
expect(payload1).toMatchObject({ aborted: true, runIds: [runId] });
|
||||
|
||||
context.chatAbortControllers.set(runId, createActiveRun("main", sessionId));
|
||||
context.chatRunBuffers.set(runId, "Partial from run abort");
|
||||
context.chatDeltaSentAt.set(runId, Date.now());
|
||||
|
||||
await chatHandlers["chat.abort"]({
|
||||
params: { sessionKey: "main", runId },
|
||||
respond,
|
||||
context: context as never,
|
||||
});
|
||||
|
||||
const lines = await readTranscriptLines(transcriptPath);
|
||||
const persisted = lines
|
||||
.map((line) => line.message)
|
||||
.filter(
|
||||
(message): message is Record<string, unknown> =>
|
||||
Boolean(message) && message?.idempotencyKey === `${runId}:assistant`,
|
||||
);
|
||||
|
||||
expect(persisted).toHaveLength(1);
|
||||
expect(persisted[0]).toMatchObject({
|
||||
stopReason: "stop",
|
||||
idempotencyKey: `${runId}:assistant`,
|
||||
openclawAbort: {
|
||||
aborted: true,
|
||||
origin: "rpc",
|
||||
runId,
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
it("persists session-scoped abort partials with rpc metadata", async () => {
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-chat-abort-session-"));
|
||||
const transcriptPath = path.join(dir, "sess-main.jsonl");
|
||||
const sessionId = "sess-main";
|
||||
await writeTranscriptHeader(transcriptPath, sessionId);
|
||||
|
||||
const { chatHandlers } = await importChatHandlersWithSession(transcriptPath, sessionId);
|
||||
const respond = vi.fn();
|
||||
const context = {
|
||||
chatAbortControllers: new Map([
|
||||
["run-a", createActiveRun("main", sessionId)],
|
||||
["run-b", createActiveRun("main", sessionId)],
|
||||
]),
|
||||
chatRunBuffers: new Map([
|
||||
["run-a", "Session abort partial"],
|
||||
["run-b", " "],
|
||||
]),
|
||||
chatDeltaSentAt: new Map([
|
||||
["run-a", Date.now()],
|
||||
["run-b", Date.now()],
|
||||
]),
|
||||
chatAbortedRuns: new Map<string, number>(),
|
||||
removeChatRun: vi
|
||||
.fn()
|
||||
.mockImplementation((run: string) => ({ sessionKey: "main", clientRunId: run })),
|
||||
agentRunSeq: new Map<string, number>(),
|
||||
broadcast: vi.fn(),
|
||||
nodeSendToSession: vi.fn(),
|
||||
logGateway: { warn: vi.fn() },
|
||||
};
|
||||
|
||||
await chatHandlers["chat.abort"]({
|
||||
params: { sessionKey: "main" },
|
||||
respond,
|
||||
context: context as never,
|
||||
});
|
||||
|
||||
const [ok, payload] = respond.mock.calls.at(-1) ?? [];
|
||||
expect(ok).toBe(true);
|
||||
expect(payload).toMatchObject({ aborted: true });
|
||||
expect(payload.runIds).toEqual(expect.arrayContaining(["run-a", "run-b"]));
|
||||
|
||||
const lines = await readTranscriptLines(transcriptPath);
|
||||
const runAPersisted = lines
|
||||
.map((line) => line.message)
|
||||
.find((message) => message?.idempotencyKey === "run-a:assistant");
|
||||
const runBPersisted = lines
|
||||
.map((line) => line.message)
|
||||
.find((message) => message?.idempotencyKey === "run-b:assistant");
|
||||
|
||||
expect(runAPersisted).toMatchObject({
|
||||
idempotencyKey: "run-a:assistant",
|
||||
openclawAbort: {
|
||||
aborted: true,
|
||||
origin: "rpc",
|
||||
runId: "run-a",
|
||||
},
|
||||
});
|
||||
expect(runBPersisted).toBeUndefined();
|
||||
});
|
||||
|
||||
it("persists /stop partials with stop-command metadata", async () => {
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-chat-stop-"));
|
||||
const transcriptPath = path.join(dir, "sess-main.jsonl");
|
||||
const sessionId = "sess-main";
|
||||
await writeTranscriptHeader(transcriptPath, sessionId);
|
||||
|
||||
const { chatHandlers } = await importChatHandlersWithSession(transcriptPath, sessionId);
|
||||
const respond = vi.fn();
|
||||
const context = {
|
||||
chatAbortControllers: new Map([["run-stop-1", createActiveRun("main", sessionId)]]),
|
||||
chatRunBuffers: new Map([["run-stop-1", "Partial from /stop"]]),
|
||||
chatDeltaSentAt: new Map([["run-stop-1", Date.now()]]),
|
||||
chatAbortedRuns: new Map<string, number>(),
|
||||
removeChatRun: vi.fn().mockReturnValue({ sessionKey: "main", clientRunId: "client-stop-1" }),
|
||||
agentRunSeq: new Map<string, number>([["run-stop-1", 1]]),
|
||||
broadcast: vi.fn(),
|
||||
nodeSendToSession: vi.fn(),
|
||||
logGateway: { warn: vi.fn() },
|
||||
dedupe: {
|
||||
get: vi.fn(),
|
||||
},
|
||||
};
|
||||
|
||||
await chatHandlers["chat.send"]({
|
||||
params: {
|
||||
sessionKey: "main",
|
||||
message: "/stop",
|
||||
idempotencyKey: "idem-stop-req",
|
||||
},
|
||||
respond,
|
||||
context: context as never,
|
||||
client: undefined,
|
||||
});
|
||||
|
||||
const [ok, payload] = respond.mock.calls.at(-1) ?? [];
|
||||
expect(ok).toBe(true);
|
||||
expect(payload).toMatchObject({ aborted: true, runIds: ["run-stop-1"] });
|
||||
|
||||
const lines = await readTranscriptLines(transcriptPath);
|
||||
const persisted = lines
|
||||
.map((line) => line.message)
|
||||
.find((message) => message?.idempotencyKey === "run-stop-1:assistant");
|
||||
|
||||
expect(persisted).toMatchObject({
|
||||
idempotencyKey: "run-stop-1:assistant",
|
||||
openclawAbort: {
|
||||
aborted: true,
|
||||
origin: "stop-command",
|
||||
runId: "run-stop-1",
|
||||
},
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -15,6 +15,7 @@ import { INTERNAL_MESSAGE_CHANNEL } from "../../utils/message-channel.js";
|
||||
import {
|
||||
abortChatRunById,
|
||||
abortChatRunsForSessionKey,
|
||||
type ChatAbortControllerEntry,
|
||||
isChatStopCommandText,
|
||||
resolveChatRunExpiresAtMs,
|
||||
} from "../chat-abort.js";
|
||||
@@ -49,6 +50,14 @@ type TranscriptAppendResult = {
|
||||
};
|
||||
|
||||
type AppendMessageArg = Parameters<SessionManager["appendMessage"]>[0];
|
||||
type AbortOrigin = "rpc" | "stop-command";
|
||||
|
||||
type AbortedPartialSnapshot = {
|
||||
runId: string;
|
||||
sessionId: string;
|
||||
text: string;
|
||||
abortOrigin: AbortOrigin;
|
||||
};
|
||||
|
||||
function stripDisallowedChatControlChars(message: string): string {
|
||||
let output = "";
|
||||
@@ -116,6 +125,24 @@ function ensureTranscriptFile(params: { transcriptPath: string; sessionId: strin
|
||||
}
|
||||
}
|
||||
|
||||
function transcriptHasIdempotencyKey(transcriptPath: string, idempotencyKey: string): boolean {
|
||||
try {
|
||||
const lines = fs.readFileSync(transcriptPath, "utf-8").split(/\r?\n/);
|
||||
for (const line of lines) {
|
||||
if (!line.trim()) {
|
||||
continue;
|
||||
}
|
||||
const parsed = JSON.parse(line) as { message?: { idempotencyKey?: unknown } };
|
||||
if (parsed?.message?.idempotencyKey === idempotencyKey) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
function appendAssistantTranscriptMessage(params: {
|
||||
message: string;
|
||||
label?: string;
|
||||
@@ -124,6 +151,12 @@ function appendAssistantTranscriptMessage(params: {
|
||||
sessionFile?: string;
|
||||
agentId?: string;
|
||||
createIfMissing?: boolean;
|
||||
idempotencyKey?: string;
|
||||
abortMeta?: {
|
||||
aborted: true;
|
||||
origin: AbortOrigin;
|
||||
runId: string;
|
||||
};
|
||||
}): TranscriptAppendResult {
|
||||
const transcriptPath = resolveTranscriptPath({
|
||||
sessionId: params.sessionId,
|
||||
@@ -148,6 +181,10 @@ function appendAssistantTranscriptMessage(params: {
|
||||
}
|
||||
}
|
||||
|
||||
if (params.idempotencyKey && transcriptHasIdempotencyKey(transcriptPath, params.idempotencyKey)) {
|
||||
return { ok: true };
|
||||
}
|
||||
|
||||
const now = Date.now();
|
||||
const labelPrefix = params.label ? `[${params.label}]\n\n` : "";
|
||||
const usage = {
|
||||
@@ -176,6 +213,16 @@ function appendAssistantTranscriptMessage(params: {
|
||||
api: "openai-responses",
|
||||
provider: "openclaw",
|
||||
model: "gateway-injected",
|
||||
...(params.idempotencyKey ? { idempotencyKey: params.idempotencyKey } : {}),
|
||||
...(params.abortMeta
|
||||
? {
|
||||
openclawAbort: {
|
||||
aborted: true,
|
||||
origin: params.abortMeta.origin,
|
||||
runId: params.abortMeta.runId,
|
||||
},
|
||||
}
|
||||
: {}),
|
||||
};
|
||||
|
||||
try {
|
||||
@@ -189,6 +236,63 @@ function appendAssistantTranscriptMessage(params: {
|
||||
}
|
||||
}
|
||||
|
||||
function collectSessionAbortPartials(params: {
|
||||
chatAbortControllers: Map<string, ChatAbortControllerEntry>;
|
||||
chatRunBuffers: Map<string, string>;
|
||||
sessionKey: string;
|
||||
abortOrigin: AbortOrigin;
|
||||
}): AbortedPartialSnapshot[] {
|
||||
const out: AbortedPartialSnapshot[] = [];
|
||||
for (const [runId, active] of params.chatAbortControllers) {
|
||||
if (active.sessionKey !== params.sessionKey) {
|
||||
continue;
|
||||
}
|
||||
const text = params.chatRunBuffers.get(runId);
|
||||
if (!text || !text.trim()) {
|
||||
continue;
|
||||
}
|
||||
out.push({
|
||||
runId,
|
||||
sessionId: active.sessionId,
|
||||
text,
|
||||
abortOrigin: params.abortOrigin,
|
||||
});
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
function persistAbortedPartials(params: {
|
||||
context: Pick<GatewayRequestContext, "logGateway">;
|
||||
sessionKey: string;
|
||||
snapshots: AbortedPartialSnapshot[];
|
||||
}) {
|
||||
if (params.snapshots.length === 0) {
|
||||
return;
|
||||
}
|
||||
const { storePath, entry } = loadSessionEntry(params.sessionKey);
|
||||
for (const snapshot of params.snapshots) {
|
||||
const sessionId = entry?.sessionId ?? snapshot.sessionId ?? snapshot.runId;
|
||||
const appended = appendAssistantTranscriptMessage({
|
||||
message: snapshot.text,
|
||||
sessionId,
|
||||
storePath,
|
||||
sessionFile: entry?.sessionFile,
|
||||
createIfMissing: true,
|
||||
idempotencyKey: `${snapshot.runId}:assistant`,
|
||||
abortMeta: {
|
||||
aborted: true,
|
||||
origin: snapshot.abortOrigin,
|
||||
runId: snapshot.runId,
|
||||
},
|
||||
});
|
||||
if (!appended.ok) {
|
||||
params.context.logGateway.warn(
|
||||
`chat.abort transcript append failed: ${appended.error ?? "unknown error"}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function nextChatSeq(context: { agentRunSeq: Map<string, number> }, runId: string) {
|
||||
const next = (context.agentRunSeq.get(runId) ?? 0) + 1;
|
||||
context.agentRunSeq.set(runId, next);
|
||||
@@ -299,7 +403,7 @@ export const chatHandlers: GatewayRequestHandlers = {
|
||||
);
|
||||
return;
|
||||
}
|
||||
const { sessionKey, runId } = params as {
|
||||
const { sessionKey: rawSessionKey, runId } = params as {
|
||||
sessionKey: string;
|
||||
runId?: string;
|
||||
};
|
||||
@@ -316,10 +420,23 @@ export const chatHandlers: GatewayRequestHandlers = {
|
||||
};
|
||||
|
||||
if (!runId) {
|
||||
const snapshots = collectSessionAbortPartials({
|
||||
chatAbortControllers: context.chatAbortControllers,
|
||||
chatRunBuffers: context.chatRunBuffers,
|
||||
sessionKey: rawSessionKey,
|
||||
abortOrigin: "rpc",
|
||||
});
|
||||
const res = abortChatRunsForSessionKey(ops, {
|
||||
sessionKey,
|
||||
sessionKey: rawSessionKey,
|
||||
stopReason: "rpc",
|
||||
});
|
||||
if (res.aborted) {
|
||||
persistAbortedPartials({
|
||||
context,
|
||||
sessionKey: rawSessionKey,
|
||||
snapshots,
|
||||
});
|
||||
}
|
||||
respond(true, { ok: true, aborted: res.aborted, runIds: res.runIds });
|
||||
return;
|
||||
}
|
||||
@@ -329,7 +446,7 @@ export const chatHandlers: GatewayRequestHandlers = {
|
||||
respond(true, { ok: true, aborted: false, runIds: [] });
|
||||
return;
|
||||
}
|
||||
if (active.sessionKey !== sessionKey) {
|
||||
if (active.sessionKey !== rawSessionKey) {
|
||||
respond(
|
||||
false,
|
||||
undefined,
|
||||
@@ -338,11 +455,26 @@ export const chatHandlers: GatewayRequestHandlers = {
|
||||
return;
|
||||
}
|
||||
|
||||
const partialText = context.chatRunBuffers.get(runId);
|
||||
const res = abortChatRunById(ops, {
|
||||
runId,
|
||||
sessionKey,
|
||||
sessionKey: rawSessionKey,
|
||||
stopReason: "rpc",
|
||||
});
|
||||
if (res.aborted && partialText && partialText.trim()) {
|
||||
persistAbortedPartials({
|
||||
context,
|
||||
sessionKey: rawSessionKey,
|
||||
snapshots: [
|
||||
{
|
||||
runId,
|
||||
sessionId: active.sessionId,
|
||||
text: partialText,
|
||||
abortOrigin: "rpc",
|
||||
},
|
||||
],
|
||||
});
|
||||
}
|
||||
respond(true, {
|
||||
ok: true,
|
||||
aborted: res.aborted,
|
||||
@@ -437,6 +569,12 @@ export const chatHandlers: GatewayRequestHandlers = {
|
||||
}
|
||||
|
||||
if (stopCommand) {
|
||||
const snapshots = collectSessionAbortPartials({
|
||||
chatAbortControllers: context.chatAbortControllers,
|
||||
chatRunBuffers: context.chatRunBuffers,
|
||||
sessionKey: rawSessionKey,
|
||||
abortOrigin: "stop-command",
|
||||
});
|
||||
const res = abortChatRunsForSessionKey(
|
||||
{
|
||||
chatAbortControllers: context.chatAbortControllers,
|
||||
@@ -450,6 +588,13 @@ export const chatHandlers: GatewayRequestHandlers = {
|
||||
},
|
||||
{ sessionKey: rawSessionKey, stopReason: "stop" },
|
||||
);
|
||||
if (res.aborted) {
|
||||
persistAbortedPartials({
|
||||
context,
|
||||
sessionKey: rawSessionKey,
|
||||
snapshots,
|
||||
});
|
||||
}
|
||||
respond(true, { ok: true, aborted: res.aborted, runIds: res.runIds });
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -92,4 +92,125 @@ describe("handleChatEvent", () => {
|
||||
expect(state.chatStream).toBe(null);
|
||||
expect(state.chatStreamStartedAt).toBe(null);
|
||||
});
|
||||
|
||||
it("processes aborted from own run and keeps partial assistant message", () => {
|
||||
const existingMessage = {
|
||||
role: "user",
|
||||
content: [{ type: "text", text: "Hi" }],
|
||||
timestamp: 1,
|
||||
};
|
||||
const partialMessage = {
|
||||
role: "assistant",
|
||||
content: [{ type: "text", text: "Partial reply" }],
|
||||
timestamp: 2,
|
||||
};
|
||||
const state = createState({
|
||||
sessionKey: "main",
|
||||
chatRunId: "run-1",
|
||||
chatStream: "Partial reply",
|
||||
chatStreamStartedAt: 100,
|
||||
chatMessages: [existingMessage],
|
||||
});
|
||||
const payload: ChatEventPayload = {
|
||||
runId: "run-1",
|
||||
sessionKey: "main",
|
||||
state: "aborted",
|
||||
message: partialMessage,
|
||||
};
|
||||
|
||||
expect(handleChatEvent(state, payload)).toBe("aborted");
|
||||
expect(state.chatRunId).toBe(null);
|
||||
expect(state.chatStream).toBe(null);
|
||||
expect(state.chatStreamStartedAt).toBe(null);
|
||||
expect(state.chatMessages).toEqual([existingMessage, partialMessage]);
|
||||
});
|
||||
|
||||
it("falls back to streamed partial when aborted payload message is invalid", () => {
|
||||
const existingMessage = {
|
||||
role: "user",
|
||||
content: [{ type: "text", text: "Hi" }],
|
||||
timestamp: 1,
|
||||
};
|
||||
const state = createState({
|
||||
sessionKey: "main",
|
||||
chatRunId: "run-1",
|
||||
chatStream: "Partial reply",
|
||||
chatStreamStartedAt: 100,
|
||||
chatMessages: [existingMessage],
|
||||
});
|
||||
const payload = {
|
||||
runId: "run-1",
|
||||
sessionKey: "main",
|
||||
state: "aborted",
|
||||
message: "not-an-assistant-message",
|
||||
} as unknown as ChatEventPayload;
|
||||
|
||||
expect(handleChatEvent(state, payload)).toBe("aborted");
|
||||
expect(state.chatRunId).toBe(null);
|
||||
expect(state.chatStream).toBe(null);
|
||||
expect(state.chatStreamStartedAt).toBe(null);
|
||||
expect(state.chatMessages).toHaveLength(2);
|
||||
expect(state.chatMessages[0]).toEqual(existingMessage);
|
||||
expect(state.chatMessages[1]).toMatchObject({
|
||||
role: "assistant",
|
||||
content: [{ type: "text", text: "Partial reply" }],
|
||||
});
|
||||
});
|
||||
|
||||
it("falls back to streamed partial when aborted payload has non-assistant role", () => {
|
||||
const existingMessage = {
|
||||
role: "user",
|
||||
content: [{ type: "text", text: "Hi" }],
|
||||
timestamp: 1,
|
||||
};
|
||||
const state = createState({
|
||||
sessionKey: "main",
|
||||
chatRunId: "run-1",
|
||||
chatStream: "Partial reply",
|
||||
chatStreamStartedAt: 100,
|
||||
chatMessages: [existingMessage],
|
||||
});
|
||||
const payload: ChatEventPayload = {
|
||||
runId: "run-1",
|
||||
sessionKey: "main",
|
||||
state: "aborted",
|
||||
message: {
|
||||
role: "user",
|
||||
content: [{ type: "text", text: "unexpected" }],
|
||||
},
|
||||
};
|
||||
|
||||
expect(handleChatEvent(state, payload)).toBe("aborted");
|
||||
expect(state.chatMessages).toHaveLength(2);
|
||||
expect(state.chatMessages[1]).toMatchObject({
|
||||
role: "assistant",
|
||||
content: [{ type: "text", text: "Partial reply" }],
|
||||
});
|
||||
});
|
||||
|
||||
it("processes aborted from own run without message and empty stream", () => {
|
||||
const existingMessage = {
|
||||
role: "user",
|
||||
content: [{ type: "text", text: "Hi" }],
|
||||
timestamp: 1,
|
||||
};
|
||||
const state = createState({
|
||||
sessionKey: "main",
|
||||
chatRunId: "run-1",
|
||||
chatStream: "",
|
||||
chatStreamStartedAt: 100,
|
||||
chatMessages: [existingMessage],
|
||||
});
|
||||
const payload: ChatEventPayload = {
|
||||
runId: "run-1",
|
||||
sessionKey: "main",
|
||||
state: "aborted",
|
||||
};
|
||||
|
||||
expect(handleChatEvent(state, payload)).toBe("aborted");
|
||||
expect(state.chatRunId).toBe(null);
|
||||
expect(state.chatStream).toBe(null);
|
||||
expect(state.chatStreamStartedAt).toBe(null);
|
||||
expect(state.chatMessages).toEqual([existingMessage]);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -58,6 +58,20 @@ function dataUrlToBase64(dataUrl: string): { content: string; mimeType: string }
|
||||
return { mimeType: match[1], content: match[2] };
|
||||
}
|
||||
|
||||
function normalizeAbortedAssistantMessage(message: unknown): Record<string, unknown> | null {
|
||||
if (!message || typeof message !== "object") {
|
||||
return null;
|
||||
}
|
||||
const candidate = message as Record<string, unknown>;
|
||||
if (candidate.role !== "assistant") {
|
||||
return null;
|
||||
}
|
||||
if (!("content" in candidate) || !Array.isArray(candidate.content)) {
|
||||
return null;
|
||||
}
|
||||
return candidate;
|
||||
}
|
||||
|
||||
export async function sendChatMessage(
|
||||
state: ChatState,
|
||||
message: string,
|
||||
@@ -198,6 +212,22 @@ export function handleChatEvent(state: ChatState, payload?: ChatEventPayload) {
|
||||
state.chatRunId = null;
|
||||
state.chatStreamStartedAt = null;
|
||||
} else if (payload.state === "aborted") {
|
||||
const normalizedMessage = normalizeAbortedAssistantMessage(payload.message);
|
||||
if (normalizedMessage) {
|
||||
state.chatMessages = [...state.chatMessages, normalizedMessage];
|
||||
} else {
|
||||
const streamedText = state.chatStream ?? "";
|
||||
if (streamedText.trim()) {
|
||||
state.chatMessages = [
|
||||
...state.chatMessages,
|
||||
{
|
||||
role: "assistant",
|
||||
content: [{ type: "text", text: streamedText }],
|
||||
timestamp: Date.now(),
|
||||
},
|
||||
];
|
||||
}
|
||||
}
|
||||
state.chatStream = null;
|
||||
state.chatRunId = null;
|
||||
state.chatStreamStartedAt = null;
|
||||
|
||||
Reference in New Issue
Block a user