fix(memory): readonly sync recovery (openclaw#25799) thanks @rodrigouroz
Verified: - pnpm build - pnpm check - pnpm test:macmini (fails in this environment at src/daemon/launchd.integration.test.ts beforeAll hook timeout; merged with Tak override) Co-authored-by: rodrigouroz <384037+rodrigouroz@users.noreply.github.com> Co-authored-by: Tak Hoffman <781889+Takhoffman@users.noreply.github.com>
This commit is contained in:
@@ -115,6 +115,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Security/Voice Call (Twilio): bind webhook replay + manager dedupe identity to authenticated request material, remove unsigned `i-twilio-idempotency-token` trust from replay/dedupe keys, and thread verified request identity through provider parse flow to harden cross-provider event dedupe. This ships in the next npm release (`2026.2.26`). Thanks @tdjackey for reporting.
|
||||
- Security/Exec approvals forwarding: prefer turn-source channel/account/thread metadata when resolving approval delivery targets so stale session routes do not misroute approval prompts.
|
||||
- Security/Pairing multi-account isolation: enforce account-scoped pairing allowlists and pending-request storage across core + extension message channels while preserving channel-scoped defaults for the default account. This ships in the next npm release (`2026.2.26`). Thanks @tdjackey for reporting and @gumadeiras for implementation.
|
||||
- Memory/SQLite: deduplicate concurrent memory-manager initialization and auto-reopen stale SQLite handles after atomic reindex swaps, preventing repeated `attempt to write a readonly database` sync failures until gateway restart.
|
||||
- Config/Plugins entries: treat unknown `plugins.entries.*` ids as startup warnings (ignored stale keys) instead of hard validation failures that can crash-loop gateway boot. Landed from contributor PR #27506 by @Sid-Qin. (#27455)
|
||||
- Telegram native commands: degrade command registration on `BOT_COMMANDS_TOO_MUCH` by retrying with fewer commands instead of crash-looping startup sync. Landed from contributor PR #27512 by @Sid-Qin. (#27456)
|
||||
- Web tools/Proxy: route `web_search` provider HTTP calls (Brave, Perplexity, xAI, Gemini, Kimi), redirect resolution, and `web_fetch` through a shared proxy-aware SSRF guard path so gateway installs behind `HTTP_PROXY`/`HTTPS_PROXY`/`ALL_PROXY` no longer fail with transport `fetch failed` errors. (#27430) thanks @kevinWangSheng.
|
||||
|
||||
81
src/memory/manager.get-concurrency.test.ts
Normal file
81
src/memory/manager.get-concurrency.test.ts
Normal file
@@ -0,0 +1,81 @@
|
||||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { OpenClawConfig } from "../config/config.js";
|
||||
import { getMemorySearchManager, type MemoryIndexManager } from "./index.js";
|
||||
import "./test-runtime-mocks.js";
|
||||
|
||||
const hoisted = vi.hoisted(() => ({
|
||||
providerCreateCalls: 0,
|
||||
providerDelayMs: 0,
|
||||
}));
|
||||
|
||||
vi.mock("./embeddings.js", () => ({
|
||||
createEmbeddingProvider: async () => {
|
||||
hoisted.providerCreateCalls += 1;
|
||||
if (hoisted.providerDelayMs > 0) {
|
||||
await new Promise((resolve) => setTimeout(resolve, hoisted.providerDelayMs));
|
||||
}
|
||||
return {
|
||||
requestedProvider: "openai",
|
||||
provider: {
|
||||
id: "mock",
|
||||
model: "mock-embed",
|
||||
maxInputTokens: 8192,
|
||||
embedQuery: async () => [0, 1, 0],
|
||||
embedBatch: async (texts: string[]) => texts.map(() => [0, 1, 0]),
|
||||
},
|
||||
};
|
||||
},
|
||||
}));
|
||||
|
||||
describe("memory manager cache hydration", () => {
|
||||
let workspaceDir = "";
|
||||
|
||||
beforeEach(async () => {
|
||||
workspaceDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-mem-concurrent-"));
|
||||
await fs.mkdir(path.join(workspaceDir, "memory"), { recursive: true });
|
||||
await fs.writeFile(path.join(workspaceDir, "MEMORY.md"), "Hello memory.");
|
||||
hoisted.providerCreateCalls = 0;
|
||||
hoisted.providerDelayMs = 50;
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await fs.rm(workspaceDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
it("deduplicates concurrent manager creation for the same cache key", async () => {
|
||||
const indexPath = path.join(workspaceDir, "index.sqlite");
|
||||
const cfg = {
|
||||
agents: {
|
||||
defaults: {
|
||||
workspace: workspaceDir,
|
||||
memorySearch: {
|
||||
provider: "openai",
|
||||
model: "mock-embed",
|
||||
store: { path: indexPath, vector: { enabled: false } },
|
||||
sync: { watch: false, onSessionStart: false, onSearch: false },
|
||||
},
|
||||
},
|
||||
list: [{ id: "main", default: true }],
|
||||
},
|
||||
} as OpenClawConfig;
|
||||
|
||||
const results = await Promise.all(
|
||||
Array.from(
|
||||
{ length: 12 },
|
||||
async () => await getMemorySearchManager({ cfg, agentId: "main" }),
|
||||
),
|
||||
);
|
||||
const managers = results
|
||||
.map((result) => result.manager)
|
||||
.filter((manager): manager is MemoryIndexManager => Boolean(manager));
|
||||
|
||||
expect(managers).toHaveLength(12);
|
||||
expect(new Set(managers).size).toBe(1);
|
||||
expect(hoisted.providerCreateCalls).toBe(1);
|
||||
|
||||
await managers[0].close();
|
||||
});
|
||||
});
|
||||
154
src/memory/manager.readonly-recovery.test.ts
Normal file
154
src/memory/manager.readonly-recovery.test.ts
Normal file
@@ -0,0 +1,154 @@
|
||||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import type { DatabaseSync } from "node:sqlite";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { OpenClawConfig } from "../config/config.js";
|
||||
import { resetEmbeddingMocks } from "./embedding.test-mocks.js";
|
||||
import type { MemoryIndexManager } from "./index.js";
|
||||
import { getRequiredMemoryIndexManager } from "./test-manager-helpers.js";
|
||||
|
||||
describe("memory manager readonly recovery", () => {
|
||||
let workspaceDir = "";
|
||||
let indexPath = "";
|
||||
let manager: MemoryIndexManager | null = null;
|
||||
|
||||
beforeEach(async () => {
|
||||
resetEmbeddingMocks();
|
||||
workspaceDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-mem-readonly-"));
|
||||
indexPath = path.join(workspaceDir, "index.sqlite");
|
||||
await fs.mkdir(path.join(workspaceDir, "memory"), { recursive: true });
|
||||
await fs.writeFile(path.join(workspaceDir, "MEMORY.md"), "Hello memory.");
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
if (manager) {
|
||||
await manager.close();
|
||||
manager = null;
|
||||
}
|
||||
await fs.rm(workspaceDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
it("reopens sqlite and retries once when sync hits SQLITE_READONLY", async () => {
|
||||
const cfg = {
|
||||
agents: {
|
||||
defaults: {
|
||||
workspace: workspaceDir,
|
||||
memorySearch: {
|
||||
provider: "openai",
|
||||
model: "mock-embed",
|
||||
store: { path: indexPath },
|
||||
sync: { watch: false, onSessionStart: false, onSearch: false },
|
||||
},
|
||||
},
|
||||
list: [{ id: "main", default: true }],
|
||||
},
|
||||
} as OpenClawConfig;
|
||||
|
||||
manager = await getRequiredMemoryIndexManager({ cfg, agentId: "main" });
|
||||
|
||||
const runSyncSpy = vi.spyOn(
|
||||
manager as unknown as {
|
||||
runSync: (params?: { reason?: string; force?: boolean }) => Promise<void>;
|
||||
},
|
||||
"runSync",
|
||||
);
|
||||
runSyncSpy
|
||||
.mockRejectedValueOnce(new Error("attempt to write a readonly database"))
|
||||
.mockResolvedValueOnce(undefined);
|
||||
const openDatabaseSpy = vi.spyOn(
|
||||
manager as unknown as { openDatabase: () => DatabaseSync },
|
||||
"openDatabase",
|
||||
);
|
||||
|
||||
await manager.sync({ reason: "test" });
|
||||
|
||||
expect(runSyncSpy).toHaveBeenCalledTimes(2);
|
||||
expect(openDatabaseSpy).toHaveBeenCalledTimes(1);
|
||||
expect(manager.status().custom?.readonlyRecovery).toEqual({
|
||||
attempts: 1,
|
||||
successes: 1,
|
||||
failures: 0,
|
||||
lastError: "attempt to write a readonly database",
|
||||
});
|
||||
});
|
||||
|
||||
it("reopens sqlite and retries when readonly appears in error code", async () => {
|
||||
const cfg = {
|
||||
agents: {
|
||||
defaults: {
|
||||
workspace: workspaceDir,
|
||||
memorySearch: {
|
||||
provider: "openai",
|
||||
model: "mock-embed",
|
||||
store: { path: indexPath },
|
||||
sync: { watch: false, onSessionStart: false, onSearch: false },
|
||||
},
|
||||
},
|
||||
list: [{ id: "main", default: true }],
|
||||
},
|
||||
} as OpenClawConfig;
|
||||
|
||||
manager = await getRequiredMemoryIndexManager({ cfg, agentId: "main" });
|
||||
|
||||
const runSyncSpy = vi.spyOn(
|
||||
manager as unknown as {
|
||||
runSync: (params?: { reason?: string; force?: boolean }) => Promise<void>;
|
||||
},
|
||||
"runSync",
|
||||
);
|
||||
runSyncSpy
|
||||
.mockRejectedValueOnce({ message: "write failed", code: "SQLITE_READONLY" })
|
||||
.mockResolvedValueOnce(undefined);
|
||||
const openDatabaseSpy = vi.spyOn(
|
||||
manager as unknown as { openDatabase: () => DatabaseSync },
|
||||
"openDatabase",
|
||||
);
|
||||
|
||||
await manager.sync({ reason: "test" });
|
||||
|
||||
expect(runSyncSpy).toHaveBeenCalledTimes(2);
|
||||
expect(openDatabaseSpy).toHaveBeenCalledTimes(1);
|
||||
expect(manager.status().custom?.readonlyRecovery).toEqual({
|
||||
attempts: 1,
|
||||
successes: 1,
|
||||
failures: 0,
|
||||
lastError: "write failed",
|
||||
});
|
||||
});
|
||||
|
||||
it("does not retry non-readonly sync errors", async () => {
|
||||
const cfg = {
|
||||
agents: {
|
||||
defaults: {
|
||||
workspace: workspaceDir,
|
||||
memorySearch: {
|
||||
provider: "openai",
|
||||
model: "mock-embed",
|
||||
store: { path: indexPath },
|
||||
sync: { watch: false, onSessionStart: false, onSearch: false },
|
||||
},
|
||||
},
|
||||
list: [{ id: "main", default: true }],
|
||||
},
|
||||
} as OpenClawConfig;
|
||||
|
||||
manager = await getRequiredMemoryIndexManager({ cfg, agentId: "main" });
|
||||
|
||||
const runSyncSpy = vi.spyOn(
|
||||
manager as unknown as {
|
||||
runSync: (params?: { reason?: string; force?: boolean }) => Promise<void>;
|
||||
},
|
||||
"runSync",
|
||||
);
|
||||
runSyncSpy.mockRejectedValueOnce(new Error("embedding timeout"));
|
||||
const openDatabaseSpy = vi.spyOn(
|
||||
manager as unknown as { openDatabase: () => DatabaseSync },
|
||||
"openDatabase",
|
||||
);
|
||||
|
||||
await expect(manager.sync({ reason: "test" })).rejects.toThrow("embedding timeout");
|
||||
expect(runSyncSpy).toHaveBeenCalledTimes(1);
|
||||
expect(openDatabaseSpy).toHaveBeenCalledTimes(0);
|
||||
});
|
||||
});
|
||||
@@ -39,6 +39,7 @@ const BATCH_FAILURE_LIMIT = 2;
|
||||
const log = createSubsystemLogger("memory");
|
||||
|
||||
const INDEX_CACHE = new Map<string, MemoryIndexManager>();
|
||||
const INDEX_CACHE_PENDING = new Map<string, Promise<MemoryIndexManager>>();
|
||||
|
||||
export class MemoryIndexManager extends MemoryManagerEmbeddingOps implements MemorySearchManager {
|
||||
private readonly cacheKey: string;
|
||||
@@ -99,6 +100,10 @@ export class MemoryIndexManager extends MemoryManagerEmbeddingOps implements Mem
|
||||
>();
|
||||
private sessionWarm = new Set<string>();
|
||||
private syncing: Promise<void> | null = null;
|
||||
private readonlyRecoveryAttempts = 0;
|
||||
private readonlyRecoverySuccesses = 0;
|
||||
private readonlyRecoveryFailures = 0;
|
||||
private readonlyRecoveryLastError?: string;
|
||||
|
||||
static async get(params: {
|
||||
cfg: OpenClawConfig;
|
||||
@@ -116,26 +121,44 @@ export class MemoryIndexManager extends MemoryManagerEmbeddingOps implements Mem
|
||||
if (existing) {
|
||||
return existing;
|
||||
}
|
||||
const providerResult = await createEmbeddingProvider({
|
||||
config: cfg,
|
||||
agentDir: resolveAgentDir(cfg, agentId),
|
||||
provider: settings.provider,
|
||||
remote: settings.remote,
|
||||
model: settings.model,
|
||||
fallback: settings.fallback,
|
||||
local: settings.local,
|
||||
});
|
||||
const manager = new MemoryIndexManager({
|
||||
cacheKey: key,
|
||||
cfg,
|
||||
agentId,
|
||||
workspaceDir,
|
||||
settings,
|
||||
providerResult,
|
||||
purpose: params.purpose,
|
||||
});
|
||||
INDEX_CACHE.set(key, manager);
|
||||
return manager;
|
||||
const pending = INDEX_CACHE_PENDING.get(key);
|
||||
if (pending) {
|
||||
return pending;
|
||||
}
|
||||
const createPromise = (async () => {
|
||||
const providerResult = await createEmbeddingProvider({
|
||||
config: cfg,
|
||||
agentDir: resolveAgentDir(cfg, agentId),
|
||||
provider: settings.provider,
|
||||
remote: settings.remote,
|
||||
model: settings.model,
|
||||
fallback: settings.fallback,
|
||||
local: settings.local,
|
||||
});
|
||||
const refreshed = INDEX_CACHE.get(key);
|
||||
if (refreshed) {
|
||||
return refreshed;
|
||||
}
|
||||
const manager = new MemoryIndexManager({
|
||||
cacheKey: key,
|
||||
cfg,
|
||||
agentId,
|
||||
workspaceDir,
|
||||
settings,
|
||||
providerResult,
|
||||
purpose: params.purpose,
|
||||
});
|
||||
INDEX_CACHE.set(key, manager);
|
||||
return manager;
|
||||
})();
|
||||
INDEX_CACHE_PENDING.set(key, createPromise);
|
||||
try {
|
||||
return await createPromise;
|
||||
} finally {
|
||||
if (INDEX_CACHE_PENDING.get(key) === createPromise) {
|
||||
INDEX_CACHE_PENDING.delete(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private constructor(params: {
|
||||
@@ -388,12 +411,97 @@ export class MemoryIndexManager extends MemoryManagerEmbeddingOps implements Mem
|
||||
if (this.syncing) {
|
||||
return this.syncing;
|
||||
}
|
||||
this.syncing = this.runSync(params).finally(() => {
|
||||
this.syncing = this.runSyncWithReadonlyRecovery(params).finally(() => {
|
||||
this.syncing = null;
|
||||
});
|
||||
return this.syncing ?? Promise.resolve();
|
||||
}
|
||||
|
||||
private isReadonlyDbError(err: unknown): boolean {
|
||||
const readonlyPattern =
|
||||
/attempt to write a readonly database|database is read-only|SQLITE_READONLY/i;
|
||||
const messages = new Set<string>();
|
||||
|
||||
const pushValue = (value: unknown): void => {
|
||||
if (typeof value !== "string") {
|
||||
return;
|
||||
}
|
||||
const normalized = value.trim();
|
||||
if (!normalized) {
|
||||
return;
|
||||
}
|
||||
messages.add(normalized);
|
||||
};
|
||||
|
||||
pushValue(err instanceof Error ? err.message : String(err));
|
||||
if (err && typeof err === "object") {
|
||||
const record = err as Record<string, unknown>;
|
||||
pushValue(record.message);
|
||||
pushValue(record.code);
|
||||
pushValue(record.name);
|
||||
if (record.cause && typeof record.cause === "object") {
|
||||
const cause = record.cause as Record<string, unknown>;
|
||||
pushValue(cause.message);
|
||||
pushValue(cause.code);
|
||||
pushValue(cause.name);
|
||||
}
|
||||
}
|
||||
|
||||
return [...messages].some((value) => readonlyPattern.test(value));
|
||||
}
|
||||
|
||||
private extractErrorReason(err: unknown): string {
|
||||
if (err instanceof Error && err.message.trim()) {
|
||||
return err.message;
|
||||
}
|
||||
if (err && typeof err === "object") {
|
||||
const record = err as Record<string, unknown>;
|
||||
if (typeof record.message === "string" && record.message.trim()) {
|
||||
return record.message;
|
||||
}
|
||||
if (typeof record.code === "string" && record.code.trim()) {
|
||||
return record.code;
|
||||
}
|
||||
}
|
||||
return String(err);
|
||||
}
|
||||
|
||||
private async runSyncWithReadonlyRecovery(params?: {
|
||||
reason?: string;
|
||||
force?: boolean;
|
||||
progress?: (update: MemorySyncProgressUpdate) => void;
|
||||
}): Promise<void> {
|
||||
try {
|
||||
await this.runSync(params);
|
||||
return;
|
||||
} catch (err) {
|
||||
if (!this.isReadonlyDbError(err) || this.closed) {
|
||||
throw err;
|
||||
}
|
||||
const reason = this.extractErrorReason(err);
|
||||
this.readonlyRecoveryAttempts += 1;
|
||||
this.readonlyRecoveryLastError = reason;
|
||||
log.warn(`memory sync readonly handle detected; reopening sqlite connection`, { reason });
|
||||
try {
|
||||
this.db.close();
|
||||
} catch {}
|
||||
this.db = this.openDatabase();
|
||||
this.vectorReady = null;
|
||||
this.vector.available = null;
|
||||
this.vector.loadError = undefined;
|
||||
this.ensureSchema();
|
||||
const meta = this.readMeta();
|
||||
this.vector.dims = meta?.vectorDims;
|
||||
try {
|
||||
await this.runSync(params);
|
||||
this.readonlyRecoverySuccesses += 1;
|
||||
} catch (retryErr) {
|
||||
this.readonlyRecoveryFailures += 1;
|
||||
throw retryErr;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async readFile(params: {
|
||||
relPath: string;
|
||||
from?: number;
|
||||
@@ -571,6 +679,12 @@ export class MemoryIndexManager extends MemoryManagerEmbeddingOps implements Mem
|
||||
custom: {
|
||||
searchMode,
|
||||
providerUnavailableReason: this.providerUnavailableReason,
|
||||
readonlyRecovery: {
|
||||
attempts: this.readonlyRecoveryAttempts,
|
||||
successes: this.readonlyRecoverySuccesses,
|
||||
failures: this.readonlyRecoveryFailures,
|
||||
lastError: this.readonlyRecoveryLastError,
|
||||
},
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user