diff --git a/src/agents/pi-embedded-runner.run-embedded-pi-agent.auth-profile-rotation.e2e.test.ts b/src/agents/pi-embedded-runner.run-embedded-pi-agent.auth-profile-rotation.e2e.test.ts index a45fe4e12..439ca90eb 100644 --- a/src/agents/pi-embedded-runner.run-embedded-pi-agent.auth-profile-rotation.e2e.test.ts +++ b/src/agents/pi-embedded-runner.run-embedded-pi-agent.auth-profile-rotation.e2e.test.ts @@ -196,6 +196,24 @@ function mockSingleSuccessfulAttempt() { ); } +function mockSingleErrorAttempt(params: { + errorMessage: string; + provider?: string; + model?: string; +}) { + runEmbeddedAttemptMock.mockResolvedValueOnce( + makeAttempt({ + assistantTexts: [], + lastAssistant: buildAssistant({ + stopReason: "error", + errorMessage: params.errorMessage, + ...(params.provider ? { provider: params.provider } : {}), + ...(params.model ? { model: params.model } : {}), + }), + }), + ); +} + async function withTimedAgentWorkspace( run: (ctx: { agentDir: string; workspaceDir: string; now: number }) => Promise, ) { @@ -347,15 +365,7 @@ describe("runEmbeddedPiAgent auth profile rotation", () => { try { await writeAuthStore(agentDir); - runEmbeddedAttemptMock.mockResolvedValueOnce( - makeAttempt({ - assistantTexts: [], - lastAssistant: buildAssistant({ - stopReason: "error", - errorMessage: "rate limit", - }), - }), - ); + mockSingleErrorAttempt({ errorMessage: "rate limit" }); await runEmbeddedPiAgent({ sessionId: "session:test", @@ -523,17 +533,11 @@ describe("runEmbeddedPiAgent auth profile rotation", () => { const workspaceDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-workspace-")); try { await writeAuthStore(agentDir); - runEmbeddedAttemptMock.mockResolvedValueOnce( - makeAttempt({ - assistantTexts: [], - lastAssistant: buildAssistant({ - stopReason: "error", - errorMessage: "insufficient credits", - provider: "openai", - model: "mock-rotated", - }), - }), - ); + mockSingleErrorAttempt({ + errorMessage: "insufficient credits", + provider: "openai", + model: "mock-rotated", + }); let thrown: unknown; try { diff --git a/src/agents/pi-tools.create-openclaw-coding-tools.adds-claude-style-aliases-schemas-without-dropping-f.e2e.test.ts b/src/agents/pi-tools.create-openclaw-coding-tools.adds-claude-style-aliases-schemas-without-dropping-f.e2e.test.ts index 2db54ddc0..a040a9a89 100644 --- a/src/agents/pi-tools.create-openclaw-coding-tools.adds-claude-style-aliases-schemas-without-dropping-f.e2e.test.ts +++ b/src/agents/pi-tools.create-openclaw-coding-tools.adds-claude-style-aliases-schemas-without-dropping-f.e2e.test.ts @@ -4,6 +4,7 @@ import path from "node:path"; import { describe, expect, it } from "vitest"; import "./test-helpers/fast-coding-tools.js"; import { createOpenClawCodingTools } from "./pi-tools.js"; +import { expectReadWriteEditTools } from "./test-helpers/pi-tools-fs-helpers.js"; describe("createOpenClawCodingTools", () => { it("uses workspaceDir for Read tool path resolution", async () => { @@ -88,12 +89,7 @@ describe("createOpenClawCodingTools", () => { const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-alias-")); try { const tools = createOpenClawCodingTools({ workspaceDir: tmpDir }); - const readTool = tools.find((tool) => tool.name === "read"); - const writeTool = tools.find((tool) => tool.name === "write"); - const editTool = tools.find((tool) => tool.name === "edit"); - expect(readTool).toBeDefined(); - expect(writeTool).toBeDefined(); - expect(editTool).toBeDefined(); + const { readTool, writeTool, editTool } = expectReadWriteEditTools(tools); const filePath = "alias-test.txt"; await writeTool?.execute("tool-alias-1", { diff --git a/src/agents/pi-tools.sandbox-mounted-paths.workspace-only.test.ts b/src/agents/pi-tools.sandbox-mounted-paths.workspace-only.test.ts index 1d08f1a90..f40489f20 100644 --- a/src/agents/pi-tools.sandbox-mounted-paths.workspace-only.test.ts +++ b/src/agents/pi-tools.sandbox-mounted-paths.workspace-only.test.ts @@ -7,6 +7,11 @@ import { createOpenClawCodingTools } from "./pi-tools.js"; import type { SandboxContext } from "./sandbox.js"; import type { SandboxFsBridge, SandboxResolvedPath } from "./sandbox/fs-bridge.js"; import { createSandboxFsBridgeFromResolver } from "./test-helpers/host-sandbox-fs-bridge.js"; +import { + expectReadWriteEditTools, + expectReadWriteTools, + getTextContent, +} from "./test-helpers/pi-tools-fs-helpers.js"; import { createPiToolsSandboxContext } from "./test-helpers/pi-tools-sandbox-context.js"; vi.mock("../infra/shell-env.js", async (importOriginal) => { @@ -14,11 +19,6 @@ vi.mock("../infra/shell-env.js", async (importOriginal) => { return { ...mod, getShellPathFromLoginShell: () => null }; }); -function getTextContent(result?: { content?: Array<{ type: string; text?: string }> }) { - const textBlock = result?.content?.find((block) => block.type === "text"); - return textBlock?.text ?? ""; -} - function createUnsafeMountedBridge(params: { root: string; agentHostRoot: string; @@ -96,10 +96,7 @@ describe("tools.fs.workspaceOnly", () => { await fs.writeFile(path.join(agentRoot, "secret.txt"), "shh", "utf8"); const tools = createOpenClawCodingTools({ sandbox, workspaceDir: sandboxRoot }); - const readTool = tools.find((tool) => tool.name === "read"); - const writeTool = tools.find((tool) => tool.name === "write"); - expect(readTool).toBeDefined(); - expect(writeTool).toBeDefined(); + const { readTool, writeTool } = expectReadWriteTools(tools); const readResult = await readTool?.execute("t1", { path: "/agent/secret.txt" }); expect(getTextContent(readResult)).toContain("shh"); @@ -115,12 +112,7 @@ describe("tools.fs.workspaceOnly", () => { const cfg = { tools: { fs: { workspaceOnly: true } } } as unknown as OpenClawConfig; const tools = createOpenClawCodingTools({ sandbox, workspaceDir: sandboxRoot, config: cfg }); - const readTool = tools.find((tool) => tool.name === "read"); - const writeTool = tools.find((tool) => tool.name === "write"); - const editTool = tools.find((tool) => tool.name === "edit"); - expect(readTool).toBeDefined(); - expect(writeTool).toBeDefined(); - expect(editTool).toBeDefined(); + const { readTool, writeTool, editTool } = expectReadWriteEditTools(tools); await expect(readTool?.execute("t1", { path: "/agent/secret.txt" })).rejects.toThrow( /Path escapes sandbox root/i, diff --git a/src/agents/pi-tools.workspace-paths.e2e.test.ts b/src/agents/pi-tools.workspace-paths.e2e.test.ts index de0d73827..02cf247dc 100644 --- a/src/agents/pi-tools.workspace-paths.e2e.test.ts +++ b/src/agents/pi-tools.workspace-paths.e2e.test.ts @@ -4,6 +4,7 @@ import path from "node:path"; import { describe, expect, it, vi } from "vitest"; import { createOpenClawCodingTools } from "./pi-tools.js"; import { createHostSandboxFsBridge } from "./test-helpers/host-sandbox-fs-bridge.js"; +import { expectReadWriteEditTools, getTextContent } from "./test-helpers/pi-tools-fs-helpers.js"; import { createPiToolsSandboxContext } from "./test-helpers/pi-tools-sandbox-context.js"; vi.mock("../infra/shell-env.js", async (importOriginal) => { @@ -19,11 +20,6 @@ async function withTempDir(prefix: string, fn: (dir: string) => Promise) { } } -function getTextContent(result?: { content?: Array<{ type: string; text?: string }> }) { - const textBlock = result?.content?.find((block) => block.type === "text"); - return textBlock?.text ?? ""; -} - describe("workspace path resolution", () => { it("reads relative paths against workspaceDir even after cwd changes", async () => { await withTempDir("openclaw-ws-", async (workspaceDir) => { @@ -171,13 +167,7 @@ describe("sandboxed workspace paths", () => { await fs.writeFile(path.join(workspaceDir, testFile), "workspace read", "utf8"); const tools = createOpenClawCodingTools({ workspaceDir, sandbox }); - const readTool = tools.find((tool) => tool.name === "read"); - const writeTool = tools.find((tool) => tool.name === "write"); - const editTool = tools.find((tool) => tool.name === "edit"); - - expect(readTool).toBeDefined(); - expect(writeTool).toBeDefined(); - expect(editTool).toBeDefined(); + const { readTool, writeTool, editTool } = expectReadWriteEditTools(tools); const result = await readTool?.execute("sbx-read", { path: testFile }); expect(getTextContent(result)).toContain("sandbox read"); diff --git a/src/agents/skills.buildworkspaceskillsnapshot.e2e.test.ts b/src/agents/skills.buildworkspaceskillsnapshot.e2e.test.ts index 2b7e01d3d..1ec75e420 100644 --- a/src/agents/skills.buildworkspaceskillsnapshot.e2e.test.ts +++ b/src/agents/skills.buildworkspaceskillsnapshot.e2e.test.ts @@ -1,25 +1,19 @@ import fs from "node:fs/promises"; -import os from "node:os"; import path from "node:path"; import { afterEach, describe, expect, it } from "vitest"; +import { createTrackedTempDirs } from "../test-utils/tracked-temp-dirs.js"; import { writeSkill } from "./skills.e2e-test-helpers.js"; import { buildWorkspaceSkillSnapshot } from "./skills.js"; -const tempDirs: string[] = []; - -async function createTempDir(prefix: string) { - const dir = await fs.mkdtemp(path.join(os.tmpdir(), prefix)); - tempDirs.push(dir); - return dir; -} +const tempDirs = createTrackedTempDirs(); afterEach(async () => { - await Promise.all(tempDirs.splice(0).map((dir) => fs.rm(dir, { recursive: true, force: true }))); + await tempDirs.cleanup(); }); describe("buildWorkspaceSkillSnapshot", () => { it("returns an empty snapshot when skills dirs are missing", async () => { - const workspaceDir = await createTempDir("openclaw-"); + const workspaceDir = await tempDirs.make("openclaw-"); const snapshot = buildWorkspaceSkillSnapshot(workspaceDir, { managedSkillsDir: path.join(workspaceDir, ".managed"), @@ -31,7 +25,7 @@ describe("buildWorkspaceSkillSnapshot", () => { }); it("omits disable-model-invocation skills from the prompt", async () => { - const workspaceDir = await createTempDir("openclaw-"); + const workspaceDir = await tempDirs.make("openclaw-"); await writeSkill({ dir: path.join(workspaceDir, "skills", "visible-skill"), name: "visible-skill", @@ -58,7 +52,7 @@ describe("buildWorkspaceSkillSnapshot", () => { }); it("truncates the skills prompt when it exceeds the configured char budget", async () => { - const workspaceDir = await createTempDir("openclaw-"); + const workspaceDir = await tempDirs.make("openclaw-"); // Make a bunch of skills with very long descriptions. for (let i = 0; i < 25; i += 1) { @@ -88,8 +82,8 @@ describe("buildWorkspaceSkillSnapshot", () => { }); it("limits discovery for nested repo-style skills roots (dir/skills/*)", async () => { - const workspaceDir = await createTempDir("openclaw-"); - const repoDir = await createTempDir("openclaw-skills-repo-"); + const workspaceDir = await tempDirs.make("openclaw-"); + const repoDir = await tempDirs.make("openclaw-skills-repo-"); for (let i = 0; i < 20; i += 1) { const name = `repo-skill-${String(i).padStart(2, "0")}`; @@ -123,7 +117,7 @@ describe("buildWorkspaceSkillSnapshot", () => { }); it("skips skills whose SKILL.md exceeds maxSkillFileBytes", async () => { - const workspaceDir = await createTempDir("openclaw-"); + const workspaceDir = await tempDirs.make("openclaw-"); await writeSkill({ dir: path.join(workspaceDir, "skills", "small-skill"), @@ -157,8 +151,8 @@ describe("buildWorkspaceSkillSnapshot", () => { }); it("detects nested skills roots beyond the first 25 entries", async () => { - const workspaceDir = await createTempDir("openclaw-"); - const repoDir = await createTempDir("openclaw-skills-repo-"); + const workspaceDir = await tempDirs.make("openclaw-"); + const repoDir = await tempDirs.make("openclaw-skills-repo-"); // Create 30 nested dirs, but only the last one is an actual skill. for (let i = 0; i < 30; i += 1) { @@ -194,8 +188,8 @@ describe("buildWorkspaceSkillSnapshot", () => { }); it("enforces maxSkillFileBytes for root-level SKILL.md", async () => { - const workspaceDir = await createTempDir("openclaw-"); - const rootSkillDir = await createTempDir("openclaw-root-skill-"); + const workspaceDir = await tempDirs.make("openclaw-"); + const rootSkillDir = await tempDirs.make("openclaw-root-skill-"); await writeSkill({ dir: rootSkillDir, diff --git a/src/agents/test-helpers/pi-tools-fs-helpers.ts b/src/agents/test-helpers/pi-tools-fs-helpers.ts new file mode 100644 index 000000000..90fbf5157 --- /dev/null +++ b/src/agents/test-helpers/pi-tools-fs-helpers.ts @@ -0,0 +1,33 @@ +import { expect } from "vitest"; + +type TextResultBlock = { type: string; text?: string }; + +export function getTextContent(result?: { content?: TextResultBlock[] }) { + const textBlock = result?.content?.find((block) => block.type === "text"); + return textBlock?.text ?? ""; +} + +export function expectReadWriteEditTools(tools: T[]) { + const readTool = tools.find((tool) => tool.name === "read"); + const writeTool = tools.find((tool) => tool.name === "write"); + const editTool = tools.find((tool) => tool.name === "edit"); + expect(readTool).toBeDefined(); + expect(writeTool).toBeDefined(); + expect(editTool).toBeDefined(); + return { + readTool: readTool as T, + writeTool: writeTool as T, + editTool: editTool as T, + }; +} + +export function expectReadWriteTools(tools: T[]) { + const readTool = tools.find((tool) => tool.name === "read"); + const writeTool = tools.find((tool) => tool.name === "write"); + expect(readTool).toBeDefined(); + expect(writeTool).toBeDefined(); + return { + readTool: readTool as T, + writeTool: writeTool as T, + }; +} diff --git a/src/agents/volc-models.shared.ts b/src/agents/volc-models.shared.ts index f74af8918..8ce5f08ca 100644 --- a/src/agents/volc-models.shared.ts +++ b/src/agents/volc-models.shared.ts @@ -4,7 +4,7 @@ export type VolcModelCatalogEntry = { id: string; name: string; reasoning: boolean; - input: readonly string[]; + input: ReadonlyArray; contextWindow: number; maxTokens: number; }; diff --git a/src/auto-reply/reply.triggers.trigger-handling.allows-elevated-off-groups-without-mention.e2e.test.ts b/src/auto-reply/reply.triggers.trigger-handling.allows-elevated-off-groups-without-mention.e2e.test.ts index a73f84aae..034eeb7cd 100644 --- a/src/auto-reply/reply.triggers.trigger-handling.allows-elevated-off-groups-without-mention.e2e.test.ts +++ b/src/auto-reply/reply.triggers.trigger-handling.allows-elevated-off-groups-without-mention.e2e.test.ts @@ -1,4 +1,3 @@ -import fs from "node:fs/promises"; import { beforeAll, describe, expect, it } from "vitest"; import { loadSessionStore } from "../config/sessions.js"; import { @@ -6,6 +5,7 @@ import { loadGetReplyFromConfig, MAIN_SESSION_KEY, makeWhatsAppElevatedCfg, + readSessionStore, requireSessionStorePath, runDirectElevatedToggleAndLoadStore, withTempHome, @@ -66,8 +66,7 @@ describe("trigger handling", () => { const text = Array.isArray(res) ? res[0]?.text : res?.text; expect(text).toContain("Elevated mode set to ask"); - const storeRaw = await fs.readFile(requireSessionStorePath(cfg), "utf-8"); - const store = JSON.parse(storeRaw) as Record; + const store = await readSessionStore(cfg); expect(store["agent:main:whatsapp:group:123@g.us"]?.elevatedLevel).toBe("on"); }); }); diff --git a/src/auto-reply/reply.triggers.trigger-handling.ignores-inline-elevated-directive-unapproved-sender.e2e.test.ts b/src/auto-reply/reply.triggers.trigger-handling.ignores-inline-elevated-directive-unapproved-sender.e2e.test.ts index d0c80b74b..87dea35d9 100644 --- a/src/auto-reply/reply.triggers.trigger-handling.ignores-inline-elevated-directive-unapproved-sender.e2e.test.ts +++ b/src/auto-reply/reply.triggers.trigger-handling.ignores-inline-elevated-directive-unapproved-sender.e2e.test.ts @@ -1,4 +1,3 @@ -import fs from "node:fs/promises"; import { join } from "node:path"; import { beforeAll, describe, expect, it } from "vitest"; import type { OpenClawConfig } from "../config/config.js"; @@ -9,7 +8,7 @@ import { MAIN_SESSION_KEY, makeCfg, makeWhatsAppElevatedCfg, - requireSessionStorePath, + readSessionStore, withTempHome, } from "./reply.triggers.trigger-handling.test-harness.js"; @@ -78,8 +77,7 @@ describe("trigger handling", () => { const text = Array.isArray(res) ? res[0]?.text : res?.text; expect(text).toContain("Elevated mode set to ask"); - const storeRaw = await fs.readFile(requireSessionStorePath(cfg), "utf-8"); - const store = JSON.parse(storeRaw) as Record; + const store = await readSessionStore(cfg); expect(store[MAIN_SESSION_KEY]?.elevatedLevel).toBe("on"); }); }); diff --git a/src/auto-reply/reply.triggers.trigger-handling.test-harness.ts b/src/auto-reply/reply.triggers.trigger-handling.test-harness.ts index e5113d230..baba2527b 100644 --- a/src/auto-reply/reply.triggers.trigger-handling.test-harness.ts +++ b/src/auto-reply/reply.triggers.trigger-handling.test-harness.ts @@ -147,6 +147,13 @@ export function requireSessionStorePath(cfg: { session?: { store?: string } }): return storePath; } +export async function readSessionStore(cfg: { + session?: { store?: string }; +}): Promise> { + const storeRaw = await fs.readFile(requireSessionStorePath(cfg), "utf-8"); + return JSON.parse(storeRaw) as Record; +} + export function makeWhatsAppElevatedCfg( home: string, opts?: { elevatedEnabled?: boolean; requireMentionInGroups?: boolean }, @@ -196,8 +203,7 @@ export async function runDirectElevatedToggleAndLoadStore(params: { if (!storePath) { throw new Error("session.store is required in test config"); } - const storeRaw = await fs.readFile(storePath, "utf-8"); - const store = JSON.parse(storeRaw) as Record; + const store = await readSessionStore(params.cfg); return { text, store }; } diff --git a/src/channels/dock.ts b/src/channels/dock.ts index b881a1008..12fd9c32d 100644 --- a/src/channels/dock.ts +++ b/src/channels/dock.ts @@ -148,6 +148,19 @@ function resolveCaseInsensitiveAccount( ] ); } + +function resolveDefaultToCaseInsensitiveAccount(params: { + channel?: + | { + accounts?: Record; + defaultTo?: string; + } + | undefined; + accountId?: string | null; +}): string | undefined { + const account = resolveCaseInsensitiveAccount(params.channel?.accounts, params.accountId); + return (account?.defaultTo ?? params.channel?.defaultTo)?.trim() || undefined; +} // Channel docks: lightweight channel metadata/behavior for shared code paths. // // Rules: @@ -331,15 +344,7 @@ const DOCKS: Record = { const channel = cfg.channels?.irc as | { accounts?: Record; defaultTo?: string } | undefined; - const normalized = normalizeAccountId(accountId); - const account = - channel?.accounts?.[normalized] ?? - channel?.accounts?.[ - Object.keys(channel?.accounts ?? {}).find( - (key) => key.toLowerCase() === normalized.toLowerCase(), - ) ?? "" - ]; - return (account?.defaultTo ?? channel?.defaultTo)?.trim() || undefined; + return resolveDefaultToCaseInsensitiveAccount({ channel, accountId }); }, }, groups: { @@ -412,15 +417,7 @@ const DOCKS: Record = { const channel = cfg.channels?.googlechat as | { accounts?: Record; defaultTo?: string } | undefined; - const normalized = normalizeAccountId(accountId); - const account = - channel?.accounts?.[normalized] ?? - channel?.accounts?.[ - Object.keys(channel?.accounts ?? {}).find( - (key) => key.toLowerCase() === normalized.toLowerCase(), - ) ?? "" - ]; - return (account?.defaultTo ?? channel?.defaultTo)?.trim() || undefined; + return resolveDefaultToCaseInsensitiveAccount({ channel, accountId }); }, }, groups: { diff --git a/src/channels/draft-stream-controls.ts b/src/channels/draft-stream-controls.ts new file mode 100644 index 000000000..056e69f69 --- /dev/null +++ b/src/channels/draft-stream-controls.ts @@ -0,0 +1,139 @@ +import { createDraftStreamLoop } from "./draft-stream-loop.js"; + +export type FinalizableDraftStreamState = { + stopped: boolean; + final: boolean; +}; + +export function createFinalizableDraftStreamControls(params: { + throttleMs: number; + isStopped: () => boolean; + isFinal: () => boolean; + markStopped: () => void; + markFinal: () => void; + sendOrEditStreamMessage: (text: string) => Promise; +}) { + const loop = createDraftStreamLoop({ + throttleMs: params.throttleMs, + isStopped: params.isStopped, + sendOrEditStreamMessage: params.sendOrEditStreamMessage, + }); + + const update = (text: string) => { + if (params.isStopped() || params.isFinal()) { + return; + } + loop.update(text); + }; + + const stop = async (): Promise => { + params.markFinal(); + await loop.flush(); + }; + + const stopForClear = async (): Promise => { + params.markStopped(); + loop.stop(); + await loop.waitForInFlight(); + }; + + return { + loop, + update, + stop, + stopForClear, + }; +} + +export function createFinalizableDraftStreamControlsForState(params: { + throttleMs: number; + state: FinalizableDraftStreamState; + sendOrEditStreamMessage: (text: string) => Promise; +}) { + return createFinalizableDraftStreamControls({ + throttleMs: params.throttleMs, + isStopped: () => params.state.stopped, + isFinal: () => params.state.final, + markStopped: () => { + params.state.stopped = true; + }, + markFinal: () => { + params.state.final = true; + }, + sendOrEditStreamMessage: params.sendOrEditStreamMessage, + }); +} + +export async function takeMessageIdAfterStop(params: { + stopForClear: () => Promise; + readMessageId: () => T | undefined; + clearMessageId: () => void; +}): Promise { + await params.stopForClear(); + const messageId = params.readMessageId(); + params.clearMessageId(); + return messageId; +} + +export async function clearFinalizableDraftMessage(params: { + stopForClear: () => Promise; + readMessageId: () => T | undefined; + clearMessageId: () => void; + isValidMessageId: (value: unknown) => value is T; + deleteMessage: (messageId: T) => Promise; + onDeleteSuccess?: (messageId: T) => void; + warn?: (message: string) => void; + warnPrefix: string; +}): Promise { + const messageId = await takeMessageIdAfterStop({ + stopForClear: params.stopForClear, + readMessageId: params.readMessageId, + clearMessageId: params.clearMessageId, + }); + if (!params.isValidMessageId(messageId)) { + return; + } + try { + await params.deleteMessage(messageId); + params.onDeleteSuccess?.(messageId); + } catch (err) { + params.warn?.(`${params.warnPrefix}: ${err instanceof Error ? err.message : String(err)}`); + } +} + +export function createFinalizableDraftLifecycle(params: { + throttleMs: number; + state: FinalizableDraftStreamState; + sendOrEditStreamMessage: (text: string) => Promise; + readMessageId: () => T | undefined; + clearMessageId: () => void; + isValidMessageId: (value: unknown) => value is T; + deleteMessage: (messageId: T) => Promise; + onDeleteSuccess?: (messageId: T) => void; + warn?: (message: string) => void; + warnPrefix: string; +}) { + const controls = createFinalizableDraftStreamControlsForState({ + throttleMs: params.throttleMs, + state: params.state, + sendOrEditStreamMessage: params.sendOrEditStreamMessage, + }); + + const clear = async () => { + await clearFinalizableDraftMessage({ + stopForClear: controls.stopForClear, + readMessageId: params.readMessageId, + clearMessageId: params.clearMessageId, + isValidMessageId: params.isValidMessageId, + deleteMessage: params.deleteMessage, + onDeleteSuccess: params.onDeleteSuccess, + warn: params.warn, + warnPrefix: params.warnPrefix, + }); + }; + + return { + ...controls, + clear, + }; +} diff --git a/src/discord/draft-stream.ts b/src/discord/draft-stream.ts index 835fee234..108ca09ba 100644 --- a/src/discord/draft-stream.ts +++ b/src/discord/draft-stream.ts @@ -1,6 +1,6 @@ import type { RequestClient } from "@buape/carbon"; import { Routes } from "discord-api-types/v10"; -import { createDraftStreamLoop } from "../channels/draft-stream-loop.js"; +import { createFinalizableDraftLifecycle } from "../channels/draft-stream-controls.js"; /** Discord messages cap at 2000 characters. */ const DISCORD_STREAM_MAX_CHARS = 2000; @@ -37,14 +37,13 @@ export function createDiscordDraftStream(params: { ? params.replyToMessageId() : params.replyToMessageId; + const streamState = { stopped: false, final: false }; let streamMessageId: string | undefined; let lastSentText = ""; - let stopped = false; - let isFinal = false; const sendOrEditStreamMessage = async (text: string): Promise => { // Allow final flush even if stopped (e.g., after clear()). - if (stopped && !isFinal) { + if (streamState.stopped && !streamState.final) { return false; } const trimmed = text.trimEnd(); @@ -54,7 +53,7 @@ export function createDiscordDraftStream(params: { if (trimmed.length > maxChars) { // Discord messages cap at 2000 chars. // Stop streaming once we exceed the cap to avoid repeated API failures. - stopped = true; + streamState.stopped = true; params.warn?.(`discord stream preview stopped (text length ${trimmed.length} > ${maxChars})`); return false; } @@ -63,7 +62,7 @@ export function createDiscordDraftStream(params: { } // Debounce first preview send for better push notification quality. - if (streamMessageId === undefined && minInitialChars != null && !isFinal) { + if (streamMessageId === undefined && minInitialChars != null && !streamState.final) { if (trimmed.length < minInitialChars) { return false; } @@ -91,14 +90,14 @@ export function createDiscordDraftStream(params: { })) as { id?: string } | undefined; const sentMessageId = sent?.id; if (typeof sentMessageId !== "string" || !sentMessageId) { - stopped = true; + streamState.stopped = true; params.warn?.("discord stream preview stopped (missing message id from send)"); return false; } streamMessageId = sentMessageId; return true; } catch (err) { - stopped = true; + streamState.stopped = true; params.warn?.( `discord stream preview failed: ${err instanceof Error ? err.message : String(err)}`, ); @@ -106,42 +105,20 @@ export function createDiscordDraftStream(params: { } }; - const loop = createDraftStreamLoop({ + const { loop, update, stop, clear } = createFinalizableDraftLifecycle({ throttleMs, - isStopped: () => stopped, + state: streamState, sendOrEditStreamMessage, + readMessageId: () => streamMessageId, + clearMessageId: () => { + streamMessageId = undefined; + }, + isValidMessageId: (value): value is string => typeof value === "string", + deleteMessage: (messageId) => rest.delete(Routes.channelMessage(channelId, messageId)), + warn: params.warn, + warnPrefix: "discord stream preview cleanup failed", }); - const update = (text: string) => { - if (stopped || isFinal) { - return; - } - loop.update(text); - }; - - const stop = async (): Promise => { - isFinal = true; - await loop.flush(); - }; - - const clear = async () => { - stopped = true; - loop.stop(); - await loop.waitForInFlight(); - const messageId = streamMessageId; - streamMessageId = undefined; - if (typeof messageId !== "string") { - return; - } - try { - await rest.delete(Routes.channelMessage(channelId, messageId)); - } catch (err) { - params.warn?.( - `discord stream preview cleanup failed: ${err instanceof Error ? err.message : String(err)}`, - ); - } - }; - const forceNewMessage = () => { streamMessageId = undefined; lastSentText = ""; diff --git a/src/infra/fs-safe.test.ts b/src/infra/fs-safe.test.ts index e15a953ec..020591495 100644 --- a/src/infra/fs-safe.test.ts +++ b/src/infra/fs-safe.test.ts @@ -1,24 +1,18 @@ import fs from "node:fs/promises"; -import os from "node:os"; import path from "node:path"; import { afterEach, describe, expect, it } from "vitest"; +import { createTrackedTempDirs } from "../test-utils/tracked-temp-dirs.js"; import { SafeOpenError, openFileWithinRoot, readLocalFileSafely } from "./fs-safe.js"; -const tempDirs: string[] = []; - -async function makeTempDir(prefix: string): Promise { - const dir = await fs.mkdtemp(path.join(os.tmpdir(), prefix)); - tempDirs.push(dir); - return dir; -} +const tempDirs = createTrackedTempDirs(); afterEach(async () => { - await Promise.all(tempDirs.splice(0).map((dir) => fs.rm(dir, { recursive: true, force: true }))); + await tempDirs.cleanup(); }); describe("fs-safe", () => { it("reads a local file safely", async () => { - const dir = await makeTempDir("openclaw-fs-safe-"); + const dir = await tempDirs.make("openclaw-fs-safe-"); const file = path.join(dir, "payload.txt"); await fs.writeFile(file, "hello"); @@ -29,14 +23,14 @@ describe("fs-safe", () => { }); it("rejects directories", async () => { - const dir = await makeTempDir("openclaw-fs-safe-"); + const dir = await tempDirs.make("openclaw-fs-safe-"); await expect(readLocalFileSafely({ filePath: dir })).rejects.toMatchObject({ code: "not-file", }); }); it("enforces maxBytes", async () => { - const dir = await makeTempDir("openclaw-fs-safe-"); + const dir = await tempDirs.make("openclaw-fs-safe-"); const file = path.join(dir, "big.bin"); await fs.writeFile(file, Buffer.alloc(8)); @@ -46,7 +40,7 @@ describe("fs-safe", () => { }); it.runIf(process.platform !== "win32")("rejects symlinks", async () => { - const dir = await makeTempDir("openclaw-fs-safe-"); + const dir = await tempDirs.make("openclaw-fs-safe-"); const target = path.join(dir, "target.txt"); const link = path.join(dir, "link.txt"); await fs.writeFile(target, "target"); @@ -58,8 +52,8 @@ describe("fs-safe", () => { }); it("blocks traversal outside root", async () => { - const root = await makeTempDir("openclaw-fs-safe-root-"); - const outside = await makeTempDir("openclaw-fs-safe-outside-"); + const root = await tempDirs.make("openclaw-fs-safe-root-"); + const outside = await tempDirs.make("openclaw-fs-safe-outside-"); const file = path.join(outside, "outside.txt"); await fs.writeFile(file, "outside"); @@ -72,8 +66,8 @@ describe("fs-safe", () => { }); it.runIf(process.platform !== "win32")("blocks symlink escapes under root", async () => { - const root = await makeTempDir("openclaw-fs-safe-root-"); - const outside = await makeTempDir("openclaw-fs-safe-outside-"); + const root = await tempDirs.make("openclaw-fs-safe-root-"); + const outside = await tempDirs.make("openclaw-fs-safe-outside-"); const target = path.join(outside, "outside.txt"); const link = path.join(root, "link.txt"); await fs.writeFile(target, "outside"); @@ -88,7 +82,7 @@ describe("fs-safe", () => { }); it("returns not-found for missing files", async () => { - const dir = await makeTempDir("openclaw-fs-safe-"); + const dir = await tempDirs.make("openclaw-fs-safe-"); const missing = path.join(dir, "missing.txt"); await expect(readLocalFileSafely({ filePath: missing })).rejects.toBeInstanceOf(SafeOpenError); diff --git a/src/telegram/draft-stream.ts b/src/telegram/draft-stream.ts index bcab90563..7f9d92dc7 100644 --- a/src/telegram/draft-stream.ts +++ b/src/telegram/draft-stream.ts @@ -1,5 +1,5 @@ import type { Bot } from "grammy"; -import { createDraftStreamLoop } from "../channels/draft-stream-loop.js"; +import { createFinalizableDraftLifecycle } from "../channels/draft-stream-controls.js"; import { buildTelegramThreadParams, type TelegramThreadSpec } from "./bot/helpers.js"; const TELEGRAM_STREAM_MAX_CHARS = 4096; @@ -55,16 +55,15 @@ export function createTelegramDraftStream(params: { ? { ...threadParams, reply_to_message_id: params.replyToMessageId } : threadParams; + const streamState = { stopped: false, final: false }; let streamMessageId: number | undefined; let lastSentText = ""; let lastSentParseMode: "HTML" | undefined; - let stopped = false; - let isFinal = false; let generation = 0; const sendOrEditStreamMessage = async (text: string): Promise => { // Allow final flush even if stopped (e.g., after clear()). - if (stopped && !isFinal) { + if (streamState.stopped && !streamState.final) { return false; } const trimmed = text.trimEnd(); @@ -80,7 +79,7 @@ export function createTelegramDraftStream(params: { if (renderedText.length > maxChars) { // Telegram text messages/edits cap at 4096 chars. // Stop streaming once we exceed the cap to avoid repeated API failures. - stopped = true; + streamState.stopped = true; params.warn?.( `telegram stream preview stopped (text length ${renderedText.length} > ${maxChars})`, ); @@ -92,7 +91,7 @@ export function createTelegramDraftStream(params: { const sendGeneration = generation; // Debounce first preview send for better push notification quality. - if (typeof streamMessageId !== "number" && minInitialChars != null && !isFinal) { + if (typeof streamMessageId !== "number" && minInitialChars != null && !streamState.final) { if (renderedText.length < minInitialChars) { return false; } @@ -120,7 +119,7 @@ export function createTelegramDraftStream(params: { const sent = await params.api.sendMessage(chatId, renderedText, sendParams); const sentMessageId = sent?.message_id; if (typeof sentMessageId !== "number" || !Number.isFinite(sentMessageId)) { - stopped = true; + streamState.stopped = true; params.warn?.("telegram stream preview stopped (missing message id from sendMessage)"); return false; } @@ -136,7 +135,7 @@ export function createTelegramDraftStream(params: { streamMessageId = normalizedMessageId; return true; } catch (err) { - stopped = true; + streamState.stopped = true; params.warn?.( `telegram stream preview failed: ${err instanceof Error ? err.message : String(err)}`, ); @@ -144,42 +143,23 @@ export function createTelegramDraftStream(params: { } }; - const loop = createDraftStreamLoop({ + const { loop, update, stop, clear } = createFinalizableDraftLifecycle({ throttleMs, - isStopped: () => stopped, + state: streamState, sendOrEditStreamMessage, - }); - - const update = (text: string) => { - if (stopped || isFinal) { - return; - } - loop.update(text); - }; - - const stop = async (): Promise => { - isFinal = true; - await loop.flush(); - }; - - const clear = async () => { - stopped = true; - loop.stop(); - await loop.waitForInFlight(); - const messageId = streamMessageId; - streamMessageId = undefined; - if (typeof messageId !== "number") { - return; - } - try { - await params.api.deleteMessage(chatId, messageId); + readMessageId: () => streamMessageId, + clearMessageId: () => { + streamMessageId = undefined; + }, + isValidMessageId: (value): value is number => + typeof value === "number" && Number.isFinite(value), + deleteMessage: (messageId) => params.api.deleteMessage(chatId, messageId), + onDeleteSuccess: (messageId) => { params.log?.(`telegram stream preview deleted (chat=${chatId}, message=${messageId})`); - } catch (err) { - params.warn?.( - `telegram stream preview cleanup failed: ${err instanceof Error ? err.message : String(err)}`, - ); - } - }; + }, + warn: params.warn, + warnPrefix: "telegram stream preview cleanup failed", + }); const forceNewMessage = () => { generation += 1; diff --git a/src/test-utils/tracked-temp-dirs.ts b/src/test-utils/tracked-temp-dirs.ts new file mode 100644 index 000000000..c4fa7ba2b --- /dev/null +++ b/src/test-utils/tracked-temp-dirs.ts @@ -0,0 +1,18 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; + +export function createTrackedTempDirs() { + const dirs: string[] = []; + + return { + async make(prefix: string): Promise { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), prefix)); + dirs.push(dir); + return dir; + }, + async cleanup(): Promise { + await Promise.all(dirs.splice(0).map((dir) => fs.rm(dir, { recursive: true, force: true }))); + }, + }; +}