diff --git a/CHANGELOG.md b/CHANGELOG.md index 8805acc7e..54b3a67cc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -55,6 +55,7 @@ Docs: https://docs.openclaw.ai - Gateway/session stores: regenerate the Swift push-test protocol models and align Windows native session-store realpath handling so protocol checks and sync session discovery stop drifting on Windows. (#44266) thanks @jalehman. - Context engine/session routing: forward optional `sessionKey` through context-engine lifecycle calls so plugins can see structured routing metadata during bootstrap, assembly, post-turn ingestion, and compaction. (#44157) thanks @jalehman. - Agents/failover: classify z.ai `network_error` stop reasons as retryable timeouts so provider connectivity failures trigger fallback instead of surfacing raw unhandled-stop-reason errors. (#43884) Thanks @hougangdev. +- Memory/session sync: add mode-aware post-compaction session reindexing with `agents.defaults.compaction.postIndexSync` plus `agents.defaults.memorySearch.sync.sessions.postCompactionForce`, so compacted session memory can refresh immediately without forcing every deployment into synchronous reindexing. (#25561) thanks @rodrigouroz. ## 2026.3.11 diff --git a/apps/macos/Sources/OpenClawProtocol/GatewayModels.swift b/apps/macos/Sources/OpenClawProtocol/GatewayModels.swift index 3ffe84fab..b743060f6 100644 --- a/apps/macos/Sources/OpenClawProtocol/GatewayModels.swift +++ b/apps/macos/Sources/OpenClawProtocol/GatewayModels.swift @@ -1106,7 +1106,6 @@ public struct PushTestResult: Codable, Sendable { public let tokensuffix: String public let topic: String public let environment: String - public let transport: String public init( ok: Bool, @@ -1115,8 +1114,7 @@ public struct PushTestResult: Codable, Sendable { reason: String?, tokensuffix: String, topic: String, - environment: String, - transport: String) + environment: String) { self.ok = ok self.status = status @@ -1125,7 +1123,6 @@ public struct PushTestResult: Codable, Sendable { self.tokensuffix = tokensuffix self.topic = topic self.environment = environment - self.transport = transport } private enum CodingKeys: String, CodingKey { @@ -1136,7 +1133,6 @@ public struct PushTestResult: Codable, Sendable { case tokensuffix = "tokenSuffix" case topic case environment - case transport } } diff --git a/apps/shared/OpenClawKit/Sources/OpenClawProtocol/GatewayModels.swift b/apps/shared/OpenClawKit/Sources/OpenClawProtocol/GatewayModels.swift index 3ffe84fab..b743060f6 100644 --- a/apps/shared/OpenClawKit/Sources/OpenClawProtocol/GatewayModels.swift +++ b/apps/shared/OpenClawKit/Sources/OpenClawProtocol/GatewayModels.swift @@ -1106,7 +1106,6 @@ public struct PushTestResult: Codable, Sendable { public let tokensuffix: String public let topic: String public let environment: String - public let transport: String public init( ok: Bool, @@ -1115,8 +1114,7 @@ public struct PushTestResult: Codable, Sendable { reason: String?, tokensuffix: String, topic: String, - environment: String, - transport: String) + environment: String) { self.ok = ok self.status = status @@ -1125,7 +1123,6 @@ public struct PushTestResult: Codable, Sendable { self.tokensuffix = tokensuffix self.topic = topic self.environment = environment - self.transport = transport } private enum CodingKeys: String, CodingKey { @@ -1136,7 +1133,6 @@ public struct PushTestResult: Codable, Sendable { case tokensuffix = "tokenSuffix" case topic case environment - case transport } } diff --git a/src/agents/memory-search.test.ts b/src/agents/memory-search.test.ts index 1d04b7303..8b1b4bc34 100644 --- a/src/agents/memory-search.test.ts +++ b/src/agents/memory-search.test.ts @@ -284,6 +284,7 @@ describe("memory search config", () => { expect(resolved?.sync.sessions).toEqual({ deltaBytes: 100000, deltaMessages: 50, + postCompactionForce: true, }); }); diff --git a/src/agents/memory-search.ts b/src/agents/memory-search.ts index d00dae706..1cbc83b77 100644 --- a/src/agents/memory-search.ts +++ b/src/agents/memory-search.ts @@ -61,6 +61,7 @@ export type ResolvedMemorySearchConfig = { sessions: { deltaBytes: number; deltaMessages: number; + postCompactionForce: boolean; }; }; query: { @@ -248,6 +249,10 @@ function mergeConfig( overrides?.sync?.sessions?.deltaMessages ?? defaults?.sync?.sessions?.deltaMessages ?? DEFAULT_SESSION_DELTA_MESSAGES, + postCompactionForce: + overrides?.sync?.sessions?.postCompactionForce ?? + defaults?.sync?.sessions?.postCompactionForce ?? + true, }, }; const query = { @@ -315,6 +320,7 @@ function mergeConfig( ); const deltaBytes = clampInt(sync.sessions.deltaBytes, 0, Number.MAX_SAFE_INTEGER); const deltaMessages = clampInt(sync.sessions.deltaMessages, 0, Number.MAX_SAFE_INTEGER); + const postCompactionForce = sync.sessions.postCompactionForce; return { enabled, sources, @@ -336,6 +342,7 @@ function mergeConfig( sessions: { deltaBytes, deltaMessages, + postCompactionForce, }, }, query: { diff --git a/src/agents/pi-embedded-runner/compact.hooks.test.ts b/src/agents/pi-embedded-runner/compact.hooks.test.ts index 9292028b5..3e59f14af 100644 --- a/src/agents/pi-embedded-runner/compact.hooks.test.ts +++ b/src/agents/pi-embedded-runner/compact.hooks.test.ts @@ -4,41 +4,67 @@ import { onSessionTranscriptUpdate } from "../../sessions/transcript-events.js"; const { hookRunner, ensureRuntimePluginsLoaded, + resolveContextEngineMock, resolveModelMock, sessionCompactImpl, triggerInternalHook, sanitizeSessionHistoryMock, contextEngineCompactMock, -} = vi.hoisted(() => ({ - hookRunner: { - hasHooks: vi.fn(), - runBeforeCompaction: vi.fn(), - runAfterCompaction: vi.fn(), - }, - ensureRuntimePluginsLoaded: vi.fn(), - resolveModelMock: vi.fn(() => ({ - model: { provider: "openai", api: "responses", id: "fake", input: [] }, - error: null, - authStorage: { setRuntimeApiKey: vi.fn() }, - modelRegistry: {}, - })), - sessionCompactImpl: vi.fn(async () => ({ - summary: "summary", - firstKeptEntryId: "entry-1", - tokensBefore: 120, - details: { ok: true }, - })), - triggerInternalHook: vi.fn(), - sanitizeSessionHistoryMock: vi.fn(async (params: { messages: unknown[] }) => params.messages), - contextEngineCompactMock: vi.fn(async () => ({ + getMemorySearchManagerMock, + resolveMemorySearchConfigMock, + resolveSessionAgentIdMock, +} = vi.hoisted(() => { + const contextEngineCompactMock = vi.fn(async () => ({ ok: true as boolean, compacted: true as boolean, reason: undefined as string | undefined, result: { summary: "engine-summary", tokensAfter: 50 } as | { summary: string; tokensAfter: number } | undefined, - })), -})); + })); + + return { + hookRunner: { + hasHooks: vi.fn(), + runBeforeCompaction: vi.fn(), + runAfterCompaction: vi.fn(), + }, + ensureRuntimePluginsLoaded: vi.fn(), + resolveContextEngineMock: vi.fn(async () => ({ + info: { ownsCompaction: true }, + compact: contextEngineCompactMock, + })), + resolveModelMock: vi.fn(() => ({ + model: { provider: "openai", api: "responses", id: "fake", input: [] }, + error: null, + authStorage: { setRuntimeApiKey: vi.fn() }, + modelRegistry: {}, + })), + sessionCompactImpl: vi.fn(async () => ({ + summary: "summary", + firstKeptEntryId: "entry-1", + tokensBefore: 120, + details: { ok: true }, + })), + triggerInternalHook: vi.fn(), + sanitizeSessionHistoryMock: vi.fn(async (params: { messages: unknown[] }) => params.messages), + contextEngineCompactMock, + getMemorySearchManagerMock: vi.fn(async () => ({ + manager: { + sync: vi.fn(async () => {}), + }, + })), + resolveMemorySearchConfigMock: vi.fn(() => ({ + sources: ["sessions"], + sync: { + sessions: { + postCompactionForce: true, + }, + }, + })), + resolveSessionAgentIdMock: vi.fn(() => "main"), + }; +}); vi.mock("../../plugins/hook-runner-global.js", () => ({ getGlobalHookRunner: () => hookRunner, @@ -135,10 +161,7 @@ vi.mock("../session-write-lock.js", () => ({ vi.mock("../../context-engine/index.js", () => ({ ensureContextEnginesInitialized: vi.fn(), - resolveContextEngine: vi.fn(async () => ({ - info: { ownsCompaction: true }, - compact: contextEngineCompactMock, - })), + resolveContextEngine: resolveContextEngineMock, })); vi.mock("../../process/command-queue.js", () => ({ @@ -211,9 +234,18 @@ vi.mock("../agent-paths.js", () => ({ })); vi.mock("../agent-scope.js", () => ({ + resolveSessionAgentId: resolveSessionAgentIdMock, resolveSessionAgentIds: vi.fn(() => ({ defaultAgentId: "main", sessionAgentId: "main" })), })); +vi.mock("../memory-search.js", () => ({ + resolveMemorySearchConfig: resolveMemorySearchConfigMock, +})); + +vi.mock("../../memory/index.js", () => ({ + getMemorySearchManager: getMemorySearchManagerMock, +})); + vi.mock("../date-time.js", () => ({ formatUserTime: vi.fn(() => ""), resolveUserTimeFormat: vi.fn(() => ""), @@ -314,6 +346,23 @@ describe("compactEmbeddedPiSessionDirect hooks", () => { sanitizeSessionHistoryMock.mockImplementation(async (params: { messages: unknown[] }) => { return params.messages; }); + getMemorySearchManagerMock.mockReset(); + getMemorySearchManagerMock.mockResolvedValue({ + manager: { + sync: vi.fn(async () => {}), + }, + }); + resolveMemorySearchConfigMock.mockReset(); + resolveMemorySearchConfigMock.mockReturnValue({ + sources: ["sessions"], + sync: { + sessions: { + postCompactionForce: true, + }, + }, + }); + resolveSessionAgentIdMock.mockReset(); + resolveSessionAgentIdMock.mockReturnValue("main"); unregisterApiProviders(getCustomApiRegistrySourceId("ollama")); }); @@ -452,6 +501,161 @@ describe("compactEmbeddedPiSessionDirect hooks", () => { } }); + it("skips sync in await mode when postCompactionForce is false", async () => { + const sync = vi.fn(async () => {}); + getMemorySearchManagerMock.mockResolvedValue({ manager: { sync } }); + resolveMemorySearchConfigMock.mockReturnValue({ + sources: ["sessions"], + sync: { + sessions: { + postCompactionForce: false, + }, + }, + }); + + const result = await compactEmbeddedPiSessionDirect({ + sessionId: "session-1", + sessionKey: "agent:main:session-1", + sessionFile: "/tmp/session.jsonl", + workspaceDir: "/tmp", + customInstructions: "focus on decisions", + config: { + agents: { + defaults: { + compaction: { + postIndexSync: "await", + }, + }, + }, + } as never, + }); + + expect(result.ok).toBe(true); + expect(resolveSessionAgentIdMock).toHaveBeenCalledWith({ + sessionKey: "agent:main:session-1", + config: expect.any(Object), + }); + expect(getMemorySearchManagerMock).not.toHaveBeenCalled(); + expect(sync).not.toHaveBeenCalled(); + }); + + it("awaits post-compaction memory sync in await mode when postCompactionForce is true", async () => { + let releaseSync: (() => void) | undefined; + const syncGate = new Promise((resolve) => { + releaseSync = resolve; + }); + const sync = vi.fn(() => syncGate); + getMemorySearchManagerMock.mockResolvedValue({ manager: { sync } }); + let settled = false; + + const resultPromise = compactEmbeddedPiSessionDirect({ + sessionId: "session-1", + sessionKey: "agent:main:session-1", + sessionFile: "/tmp/session.jsonl", + workspaceDir: "/tmp", + customInstructions: "focus on decisions", + config: { + agents: { + defaults: { + compaction: { + postIndexSync: "await", + }, + }, + }, + } as never, + }); + + void resultPromise.then(() => { + settled = true; + }); + await vi.waitFor(() => { + expect(sync).toHaveBeenCalledWith({ + reason: "post-compaction", + sessionFiles: ["/tmp/session.jsonl"], + }); + }); + expect(settled).toBe(false); + releaseSync?.(); + const result = await resultPromise; + expect(result.ok).toBe(true); + expect(settled).toBe(true); + }); + + it("skips post-compaction memory sync when the mode is off", async () => { + const sync = vi.fn(async () => {}); + getMemorySearchManagerMock.mockResolvedValue({ manager: { sync } }); + + const result = await compactEmbeddedPiSessionDirect({ + sessionId: "session-1", + sessionKey: "agent:main:session-1", + sessionFile: "/tmp/session.jsonl", + workspaceDir: "/tmp", + customInstructions: "focus on decisions", + config: { + agents: { + defaults: { + compaction: { + postIndexSync: "off", + }, + }, + }, + } as never, + }); + + expect(result.ok).toBe(true); + expect(resolveSessionAgentIdMock).not.toHaveBeenCalled(); + expect(getMemorySearchManagerMock).not.toHaveBeenCalled(); + expect(sync).not.toHaveBeenCalled(); + }); + + it("fires post-compaction memory sync without awaiting it in async mode", async () => { + const sync = vi.fn(async () => {}); + let resolveManager: ((value: { manager: { sync: typeof sync } }) => void) | undefined; + const managerGate = new Promise<{ manager: { sync: typeof sync } }>((resolve) => { + resolveManager = resolve; + }); + getMemorySearchManagerMock.mockImplementation(() => managerGate); + let settled = false; + + const resultPromise = compactEmbeddedPiSessionDirect({ + sessionId: "session-1", + sessionKey: "agent:main:session-1", + sessionFile: "/tmp/session.jsonl", + workspaceDir: "/tmp", + customInstructions: "focus on decisions", + config: { + agents: { + defaults: { + compaction: { + postIndexSync: "async", + }, + }, + }, + } as never, + }); + + await vi.waitFor(() => { + expect(getMemorySearchManagerMock).toHaveBeenCalledTimes(1); + }); + void resultPromise.then(() => { + settled = true; + }); + await vi.waitFor(() => { + expect(settled).toBe(true); + }); + expect(sync).not.toHaveBeenCalled(); + resolveManager?.({ manager: { sync } }); + await managerGate; + await vi.waitFor(() => { + expect(sync).toHaveBeenCalledWith({ + reason: "post-compaction", + sessionFiles: ["/tmp/session.jsonl"], + }); + }); + const result = await resultPromise; + expect(result.ok).toBe(true); + }); + it("registers the Ollama api provider before compaction", async () => { resolveModelMock.mockReturnValue({ model: { @@ -493,6 +697,11 @@ describe("compactEmbeddedPiSession hooks (ownsCompaction engine)", () => { hookRunner.hasHooks.mockReset(); hookRunner.runBeforeCompaction.mockReset(); hookRunner.runAfterCompaction.mockReset(); + resolveContextEngineMock.mockReset(); + resolveContextEngineMock.mockResolvedValue({ + info: { ownsCompaction: true }, + compact: contextEngineCompactMock, + }); contextEngineCompactMock.mockReset(); contextEngineCompactMock.mockResolvedValue({ ok: true, @@ -546,8 +755,47 @@ describe("compactEmbeddedPiSession hooks (ownsCompaction engine)", () => { ); }); + it("emits a transcript update and post-compaction memory sync on the engine-owned path", async () => { + const listener = vi.fn(); + const cleanup = onSessionTranscriptUpdate(listener); + const sync = vi.fn(async () => {}); + getMemorySearchManagerMock.mockResolvedValue({ manager: { sync } }); + + try { + const result = await compactEmbeddedPiSession({ + sessionId: "session-1", + sessionKey: "agent:main:session-1", + sessionFile: " /tmp/session.jsonl ", + workspaceDir: "/tmp", + customInstructions: "focus on decisions", + enqueue: (task) => task(), + config: { + agents: { + defaults: { + compaction: { + postIndexSync: "await", + }, + }, + }, + } as never, + }); + + expect(result.ok).toBe(true); + expect(listener).toHaveBeenCalledTimes(1); + expect(listener).toHaveBeenCalledWith({ sessionFile: "/tmp/session.jsonl" }); + expect(sync).toHaveBeenCalledWith({ + reason: "post-compaction", + sessionFiles: ["/tmp/session.jsonl"], + }); + } finally { + cleanup(); + } + }); + it("does not fire after_compaction when compaction fails", async () => { hookRunner.hasHooks.mockReturnValue(true); + const sync = vi.fn(async () => {}); + getMemorySearchManagerMock.mockResolvedValue({ manager: { sync } }); contextEngineCompactMock.mockResolvedValue({ ok: false, compacted: false, @@ -567,6 +815,44 @@ describe("compactEmbeddedPiSession hooks (ownsCompaction engine)", () => { expect(result.ok).toBe(false); expect(hookRunner.runBeforeCompaction).toHaveBeenCalled(); expect(hookRunner.runAfterCompaction).not.toHaveBeenCalled(); + expect(sync).not.toHaveBeenCalled(); + }); + + it("does not duplicate transcript updates or sync in the wrapper when the engine delegates compaction", async () => { + const listener = vi.fn(); + const cleanup = onSessionTranscriptUpdate(listener); + const sync = vi.fn(async () => {}); + getMemorySearchManagerMock.mockResolvedValue({ manager: { sync } }); + resolveContextEngineMock.mockResolvedValue({ + info: { ownsCompaction: false }, + compact: contextEngineCompactMock, + }); + + try { + const result = await compactEmbeddedPiSession({ + sessionId: "session-1", + sessionKey: "agent:main:session-1", + sessionFile: "/tmp/session.jsonl", + workspaceDir: "/tmp", + customInstructions: "focus on decisions", + enqueue: (task) => task(), + config: { + agents: { + defaults: { + compaction: { + postIndexSync: "await", + }, + }, + }, + } as never, + }); + + expect(result.ok).toBe(true); + expect(listener).not.toHaveBeenCalled(); + expect(sync).not.toHaveBeenCalled(); + } finally { + cleanup(); + } }); it("catches and logs hook exceptions without aborting compaction", async () => { diff --git a/src/agents/pi-embedded-runner/compact.ts b/src/agents/pi-embedded-runner/compact.ts index f1eea7216..1207a0c3b 100644 --- a/src/agents/pi-embedded-runner/compact.ts +++ b/src/agents/pi-embedded-runner/compact.ts @@ -18,6 +18,7 @@ import { import { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js"; import { getMachineDisplayName } from "../../infra/machine-name.js"; import { generateSecureToken } from "../../infra/secure-random.js"; +import { getMemorySearchManager } from "../../memory/index.js"; import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js"; import { type enqueueCommand, enqueueCommandInLane } from "../../process/command-queue.js"; import { isCronSessionKey, isSubagentSessionKey } from "../../routing/session-key.js"; @@ -30,7 +31,7 @@ import { resolveUserPath } from "../../utils.js"; import { normalizeMessageChannel } from "../../utils/message-channel.js"; import { isReasoningTagProvider } from "../../utils/provider-utils.js"; import { resolveOpenClawAgentDir } from "../agent-paths.js"; -import { resolveSessionAgentIds } from "../agent-scope.js"; +import { resolveSessionAgentId, resolveSessionAgentIds } from "../agent-scope.js"; import type { ExecElevatedDefaults } from "../bash-tools.js"; import { makeBootstrapWarn, resolveBootstrapContextForRun } from "../bootstrap-files.js"; import { listChannelSupportedActions, resolveChannelMessageToolHints } from "../channel-tools.js"; @@ -39,6 +40,7 @@ import { ensureCustomApiRegistered } from "../custom-api-registry.js"; import { formatUserTime, resolveUserTimeFormat, resolveUserTimezone } from "../date-time.js"; import { DEFAULT_CONTEXT_TOKENS, DEFAULT_MODEL, DEFAULT_PROVIDER } from "../defaults.js"; import { resolveOpenClawDocsPath } from "../docs-path.js"; +import { resolveMemorySearchConfig } from "../memory-search.js"; import { getApiKeyForModel, resolveModelAuthMode } from "../model-auth.js"; import { supportsModelTools } from "../model-tool-support.js"; import { ensureOpenClawModelsJson } from "../models-config.js"; @@ -268,6 +270,95 @@ function classifyCompactionReason(reason?: string): string { return "unknown"; } +function resolvePostCompactionIndexSyncMode(config?: OpenClawConfig): "off" | "async" | "await" { + const mode = config?.agents?.defaults?.compaction?.postIndexSync; + if (mode === "off" || mode === "async" || mode === "await") { + return mode; + } + return "async"; +} + +async function runPostCompactionSessionMemorySync(params: { + config?: OpenClawConfig; + sessionKey?: string; + sessionFile: string; +}): Promise { + if (!params.config) { + return; + } + try { + const sessionFile = params.sessionFile.trim(); + if (!sessionFile) { + return; + } + const agentId = resolveSessionAgentId({ + sessionKey: params.sessionKey, + config: params.config, + }); + const resolvedMemory = resolveMemorySearchConfig(params.config, agentId); + if (!resolvedMemory || !resolvedMemory.sources.includes("sessions")) { + return; + } + if (!resolvedMemory.sync.sessions.postCompactionForce) { + return; + } + const { manager } = await getMemorySearchManager({ + cfg: params.config, + agentId, + }); + if (!manager?.sync) { + return; + } + const syncTask = manager.sync({ + reason: "post-compaction", + sessionFiles: [sessionFile], + }); + await syncTask; + } catch (err) { + log.warn(`memory sync skipped (post-compaction): ${String(err)}`); + } +} + +function syncPostCompactionSessionMemory(params: { + config?: OpenClawConfig; + sessionKey?: string; + sessionFile: string; + mode: "off" | "async" | "await"; +}): Promise { + if (params.mode === "off" || !params.config) { + return Promise.resolve(); + } + + const syncTask = runPostCompactionSessionMemorySync({ + config: params.config, + sessionKey: params.sessionKey, + sessionFile: params.sessionFile, + }); + if (params.mode === "await") { + return syncTask; + } + void syncTask; + return Promise.resolve(); +} + +async function runPostCompactionSideEffects(params: { + config?: OpenClawConfig; + sessionKey?: string; + sessionFile: string; +}): Promise { + const sessionFile = params.sessionFile.trim(); + if (!sessionFile) { + return; + } + emitSessionTranscriptUpdate(sessionFile); + await syncPostCompactionSessionMemory({ + config: params.config, + sessionKey: params.sessionKey, + sessionFile, + mode: resolvePostCompactionIndexSyncMode(params.config), + }); +} + /** * Core compaction logic without lane queueing. * Use this when already inside a session/global lane to avoid deadlocks. @@ -809,7 +900,11 @@ export async function compactEmbeddedPiSessionDirect( const result = await compactWithSafetyTimeout(() => session.compact(params.customInstructions), ); - emitSessionTranscriptUpdate(params.sessionFile); + await runPostCompactionSideEffects({ + config: params.config, + sessionKey: params.sessionKey, + sessionFile: params.sessionFile, + }); // Estimate tokens after compaction by summing token estimates for remaining messages let tokensAfter: number | undefined; try { @@ -999,6 +1094,13 @@ export async function compactEmbeddedPiSession( force: params.trigger === "manual", runtimeContext: params as Record, }); + if (engineOwnsCompaction && result.ok && result.compacted) { + await runPostCompactionSideEffects({ + config: params.config, + sessionKey: params.sessionKey, + sessionFile: params.sessionFile, + }); + } if (result.ok && result.compacted && hookRunner?.hasHooks("after_compaction")) { try { await hookRunner.runAfterCompaction( diff --git a/src/config/schema.help.ts b/src/config/schema.help.ts index 1fdaae873..9c4512575 100644 --- a/src/config/schema.help.ts +++ b/src/config/schema.help.ts @@ -930,6 +930,8 @@ export const FIELD_HELP: Record = { "Requires at least this many newly appended bytes before session transcript changes trigger reindex (default: 100000). Increase to reduce frequent small reindexes, or lower for faster transcript freshness.", "agents.defaults.memorySearch.sync.sessions.deltaMessages": "Requires at least this many appended transcript messages before reindex is triggered (default: 50). Lower this for near-real-time transcript recall, or raise it to reduce indexing churn.", + "agents.defaults.memorySearch.sync.sessions.postCompactionForce": + "Forces a session memory-search reindex after compaction-triggered transcript updates (default: true). Keep enabled when compacted summaries must be immediately searchable, or disable to reduce write-time indexing pressure.", ui: "UI presentation settings for accenting and assistant identity shown in control surfaces. Use this for branding and readability customization without changing runtime behavior.", "ui.seamColor": "Primary accent/seam color used by UI surfaces for emphasis, badges, and visual identity cues. Use high-contrast values that remain readable across light/dark themes.", @@ -1033,6 +1035,8 @@ export const FIELD_HELP: Record = { "Enables summary quality audits and regeneration retries for safeguard compaction. Default: false, so safeguard mode alone does not turn on retry behavior.", "agents.defaults.compaction.qualityGuard.maxRetries": "Maximum number of regeneration retries after a failed safeguard summary quality audit. Use small values to bound extra latency and token cost.", + "agents.defaults.compaction.postIndexSync": + 'Controls post-compaction session memory reindex mode: "off", "async", or "await" (default: "async"). Use "await" for strongest freshness, "async" for lower compaction latency, and "off" only when session-memory sync is handled elsewhere.', "agents.defaults.compaction.postCompactionSections": 'AGENTS.md H2/H3 section names re-injected after compaction so the agent reruns critical startup guidance. Leave unset to use "Session Startup"/"Red Lines" with legacy fallback to "Every Session"/"Safety"; set to [] to disable reinjection entirely.', "agents.defaults.compaction.model": diff --git a/src/config/schema.labels.ts b/src/config/schema.labels.ts index 256d3c1dd..6aa2ae40e 100644 --- a/src/config/schema.labels.ts +++ b/src/config/schema.labels.ts @@ -354,6 +354,8 @@ export const FIELD_LABELS: Record = { "agents.defaults.memorySearch.sync.watchDebounceMs": "Memory Watch Debounce (ms)", "agents.defaults.memorySearch.sync.sessions.deltaBytes": "Session Delta Bytes", "agents.defaults.memorySearch.sync.sessions.deltaMessages": "Session Delta Messages", + "agents.defaults.memorySearch.sync.sessions.postCompactionForce": + "Force Reindex After Compaction", "agents.defaults.memorySearch.query.maxResults": "Memory Search Max Results", "agents.defaults.memorySearch.query.minScore": "Memory Search Min Score", "agents.defaults.memorySearch.query.hybrid.enabled": "Memory Search Hybrid", @@ -468,6 +470,7 @@ export const FIELD_LABELS: Record = { "agents.defaults.compaction.qualityGuard": "Compaction Quality Guard", "agents.defaults.compaction.qualityGuard.enabled": "Compaction Quality Guard Enabled", "agents.defaults.compaction.qualityGuard.maxRetries": "Compaction Quality Guard Max Retries", + "agents.defaults.compaction.postIndexSync": "Compaction Post-Index Sync", "agents.defaults.compaction.postCompactionSections": "Post-Compaction Context Sections", "agents.defaults.compaction.model": "Compaction Model Override", "agents.defaults.compaction.memoryFlush": "Compaction Memory Flush", diff --git a/src/config/types.agent-defaults.ts b/src/config/types.agent-defaults.ts index 5abaab2c1..11d1809c8 100644 --- a/src/config/types.agent-defaults.ts +++ b/src/config/types.agent-defaults.ts @@ -287,6 +287,7 @@ export type AgentDefaultsConfig = { }; export type AgentCompactionMode = "default" | "safeguard"; +export type AgentCompactionPostIndexSyncMode = "off" | "async" | "await"; export type AgentCompactionIdentifierPolicy = "strict" | "off" | "custom"; export type AgentCompactionQualityGuardConfig = { /** Enable compaction summary quality audits and regeneration retries. Default: false. */ @@ -314,6 +315,8 @@ export type AgentCompactionConfig = { identifierInstructions?: string; /** Optional quality-audit retries for safeguard compaction summaries. */ qualityGuard?: AgentCompactionQualityGuardConfig; + /** Post-compaction session memory index sync mode. */ + postIndexSync?: AgentCompactionPostIndexSyncMode; /** Pre-compaction memory flush (agentic turn). Default: enabled. */ memoryFlush?: AgentCompactionMemoryFlushConfig; /** diff --git a/src/config/types.tools.ts b/src/config/types.tools.ts index aaf6cb33e..43d39285b 100644 --- a/src/config/types.tools.ts +++ b/src/config/types.tools.ts @@ -402,6 +402,8 @@ export type MemorySearchConfig = { deltaBytes?: number; /** Minimum appended JSONL lines before session transcripts are reindexed. */ deltaMessages?: number; + /** Force session reindex after compaction-triggered transcript updates (default: true). */ + postCompactionForce?: boolean; }; }; /** Query behavior. */ diff --git a/src/config/zod-schema.agent-defaults.ts b/src/config/zod-schema.agent-defaults.ts index 242d69597..02148736e 100644 --- a/src/config/zod-schema.agent-defaults.ts +++ b/src/config/zod-schema.agent-defaults.ts @@ -103,6 +103,7 @@ export const AgentDefaultsSchema = z }) .strict() .optional(), + postIndexSync: z.enum(["off", "async", "await"]).optional(), postCompactionSections: z.array(z.string()).optional(), model: z.string().optional(), memoryFlush: z diff --git a/src/config/zod-schema.agent-runtime.ts b/src/config/zod-schema.agent-runtime.ts index d5b9eeedb..28c7cfaab 100644 --- a/src/config/zod-schema.agent-runtime.ts +++ b/src/config/zod-schema.agent-runtime.ts @@ -649,6 +649,7 @@ export const MemorySearchSchema = z .object({ deltaBytes: z.number().int().nonnegative().optional(), deltaMessages: z.number().int().nonnegative().optional(), + postCompactionForce: z.boolean().optional(), }) .strict() .optional(), diff --git a/src/memory/index.test.ts b/src/memory/index.test.ts index 23371056b..dcb0b0610 100644 --- a/src/memory/index.test.ts +++ b/src/memory/index.test.ts @@ -461,6 +461,391 @@ describe("memory index", () => { } }); + it("targets explicit session files during post-compaction sync", async () => { + const stateDir = path.join(fixtureRoot, `state-targeted-${randomUUID()}`); + const sessionDir = path.join(stateDir, "agents", "main", "sessions"); + const firstSessionPath = path.join(sessionDir, "targeted-first.jsonl"); + const secondSessionPath = path.join(sessionDir, "targeted-second.jsonl"); + const storePath = path.join(workspaceDir, `index-targeted-${randomUUID()}.sqlite`); + const previousStateDir = process.env.OPENCLAW_STATE_DIR; + process.env.OPENCLAW_STATE_DIR = stateDir; + + await fs.mkdir(sessionDir, { recursive: true }); + await fs.writeFile( + firstSessionPath, + `${JSON.stringify({ + type: "message", + message: { role: "user", content: [{ type: "text", text: "first transcript v1" }] }, + })}\n`, + ); + await fs.writeFile( + secondSessionPath, + `${JSON.stringify({ + type: "message", + message: { role: "user", content: [{ type: "text", text: "second transcript v1" }] }, + })}\n`, + ); + + try { + const result = await getMemorySearchManager({ + cfg: createCfg({ + storePath, + sources: ["sessions"], + sessionMemory: true, + }), + agentId: "main", + }); + const manager = requireManager(result); + await manager.sync?.({ reason: "test" }); + + const db = ( + manager as unknown as { + db: { + prepare: (sql: string) => { + get: (path: string, source: string) => { hash: string } | undefined; + }; + }; + } + ).db; + const getSessionHash = (sessionPath: string) => + db + .prepare(`SELECT hash FROM files WHERE path = ? AND source = ?`) + .get(sessionPath, "sessions")?.hash; + + const firstOriginalHash = getSessionHash("sessions/targeted-first.jsonl"); + const secondOriginalHash = getSessionHash("sessions/targeted-second.jsonl"); + + await fs.writeFile( + firstSessionPath, + `${JSON.stringify({ + type: "message", + message: { + role: "user", + content: [{ type: "text", text: "first transcript v2 after compaction" }], + }, + })}\n`, + ); + await fs.writeFile( + secondSessionPath, + `${JSON.stringify({ + type: "message", + message: { + role: "user", + content: [{ type: "text", text: "second transcript v2 should stay untouched" }], + }, + })}\n`, + ); + + await manager.sync?.({ + reason: "post-compaction", + sessionFiles: [firstSessionPath], + }); + + expect(getSessionHash("sessions/targeted-first.jsonl")).not.toBe(firstOriginalHash); + expect(getSessionHash("sessions/targeted-second.jsonl")).toBe(secondOriginalHash); + await manager.close?.(); + } finally { + if (previousStateDir === undefined) { + delete process.env.OPENCLAW_STATE_DIR; + } else { + process.env.OPENCLAW_STATE_DIR = previousStateDir; + } + await fs.rm(stateDir, { recursive: true, force: true }); + } + }); + + it("preserves unrelated dirty sessions after targeted post-compaction sync", async () => { + const stateDir = path.join(fixtureRoot, `state-targeted-dirty-${randomUUID()}`); + const sessionDir = path.join(stateDir, "agents", "main", "sessions"); + const firstSessionPath = path.join(sessionDir, "targeted-dirty-first.jsonl"); + const secondSessionPath = path.join(sessionDir, "targeted-dirty-second.jsonl"); + const storePath = path.join(workspaceDir, `index-targeted-dirty-${randomUUID()}.sqlite`); + const previousStateDir = process.env.OPENCLAW_STATE_DIR; + process.env.OPENCLAW_STATE_DIR = stateDir; + + await fs.mkdir(sessionDir, { recursive: true }); + await fs.writeFile( + firstSessionPath, + `${JSON.stringify({ + type: "message", + message: { role: "user", content: [{ type: "text", text: "first transcript v1" }] }, + })}\n`, + ); + await fs.writeFile( + secondSessionPath, + `${JSON.stringify({ + type: "message", + message: { role: "user", content: [{ type: "text", text: "second transcript v1" }] }, + })}\n`, + ); + + try { + const manager = requireManager( + await getMemorySearchManager({ + cfg: createCfg({ + storePath, + sources: ["sessions"], + sessionMemory: true, + }), + agentId: "main", + }), + ); + await manager.sync({ reason: "test" }); + + const db = ( + manager as unknown as { + db: { + prepare: (sql: string) => { + get: (path: string, source: string) => { hash: string } | undefined; + }; + }; + } + ).db; + const getSessionHash = (sessionPath: string) => + db + .prepare(`SELECT hash FROM files WHERE path = ? AND source = ?`) + .get(sessionPath, "sessions")?.hash; + + const firstOriginalHash = getSessionHash("sessions/targeted-dirty-first.jsonl"); + const secondOriginalHash = getSessionHash("sessions/targeted-dirty-second.jsonl"); + + await fs.writeFile( + firstSessionPath, + `${JSON.stringify({ + type: "message", + message: { + role: "user", + content: [{ type: "text", text: "first transcript v2 after compaction" }], + }, + })}\n`, + ); + await fs.writeFile( + secondSessionPath, + `${JSON.stringify({ + type: "message", + message: { + role: "user", + content: [{ type: "text", text: "second transcript v2 still pending" }], + }, + })}\n`, + ); + + const internal = manager as unknown as { + sessionsDirty: boolean; + sessionsDirtyFiles: Set; + }; + internal.sessionsDirty = true; + internal.sessionsDirtyFiles.add(secondSessionPath); + + await manager.sync({ + reason: "post-compaction", + sessionFiles: [firstSessionPath], + }); + + expect(getSessionHash("sessions/targeted-dirty-first.jsonl")).not.toBe(firstOriginalHash); + expect(getSessionHash("sessions/targeted-dirty-second.jsonl")).toBe(secondOriginalHash); + expect(internal.sessionsDirtyFiles.has(secondSessionPath)).toBe(true); + expect(internal.sessionsDirty).toBe(true); + + await manager.sync({ reason: "test" }); + + expect(getSessionHash("sessions/targeted-dirty-second.jsonl")).not.toBe(secondOriginalHash); + await manager.close?.(); + } finally { + if (previousStateDir === undefined) { + delete process.env.OPENCLAW_STATE_DIR; + } else { + process.env.OPENCLAW_STATE_DIR = previousStateDir; + } + await fs.rm(stateDir, { recursive: true, force: true }); + await fs.rm(storePath, { force: true }); + } + }); + + it("queues targeted session sync when another sync is already in progress", async () => { + const stateDir = path.join(fixtureRoot, `state-targeted-queued-${randomUUID()}`); + const sessionDir = path.join(stateDir, "agents", "main", "sessions"); + const sessionPath = path.join(sessionDir, "targeted-queued.jsonl"); + const storePath = path.join(workspaceDir, `index-targeted-queued-${randomUUID()}.sqlite`); + const previousStateDir = process.env.OPENCLAW_STATE_DIR; + process.env.OPENCLAW_STATE_DIR = stateDir; + + await fs.mkdir(sessionDir, { recursive: true }); + await fs.writeFile( + sessionPath, + `${JSON.stringify({ + type: "message", + message: { role: "user", content: [{ type: "text", text: "queued transcript v1" }] }, + })}\n`, + ); + + try { + const manager = requireManager( + await getMemorySearchManager({ + cfg: createCfg({ + storePath, + sources: ["sessions"], + sessionMemory: true, + }), + agentId: "main", + }), + ); + await manager.sync({ reason: "test" }); + + const db = ( + manager as unknown as { + db: { + prepare: (sql: string) => { + get: (path: string, source: string) => { hash: string } | undefined; + }; + }; + } + ).db; + const getSessionHash = (sessionRelPath: string) => + db + .prepare(`SELECT hash FROM files WHERE path = ? AND source = ?`) + .get(sessionRelPath, "sessions")?.hash; + const originalHash = getSessionHash("sessions/targeted-queued.jsonl"); + + const internal = manager as unknown as { + runSyncWithReadonlyRecovery: (params?: { + reason?: string; + sessionFiles?: string[]; + }) => Promise; + }; + const originalRunSync = internal.runSyncWithReadonlyRecovery.bind(manager); + let releaseBusySync: (() => void) | undefined; + const busyGate = new Promise((resolve) => { + releaseBusySync = resolve; + }); + internal.runSyncWithReadonlyRecovery = async (params) => { + if (params?.reason === "busy-sync") { + await busyGate; + } + return await originalRunSync(params); + }; + + const busySyncPromise = manager.sync({ reason: "busy-sync" }); + await fs.writeFile( + sessionPath, + `${JSON.stringify({ + type: "message", + message: { + role: "user", + content: [{ type: "text", text: "queued transcript v2 after compaction" }], + }, + })}\n`, + ); + + const targetedSyncPromise = manager.sync({ + reason: "post-compaction", + sessionFiles: [sessionPath], + }); + + releaseBusySync?.(); + await Promise.all([busySyncPromise, targetedSyncPromise]); + + expect(getSessionHash("sessions/targeted-queued.jsonl")).not.toBe(originalHash); + await manager.close?.(); + } finally { + if (previousStateDir === undefined) { + delete process.env.OPENCLAW_STATE_DIR; + } else { + process.env.OPENCLAW_STATE_DIR = previousStateDir; + } + await fs.rm(stateDir, { recursive: true, force: true }); + await fs.rm(storePath, { force: true }); + } + }); + + it("runs a full reindex after fallback activates during targeted sync", async () => { + const stateDir = path.join(fixtureRoot, `state-targeted-fallback-${randomUUID()}`); + const sessionDir = path.join(stateDir, "agents", "main", "sessions"); + const sessionPath = path.join(sessionDir, "targeted-fallback.jsonl"); + const storePath = path.join(workspaceDir, `index-targeted-fallback-${randomUUID()}.sqlite`); + const previousStateDir = process.env.OPENCLAW_STATE_DIR; + process.env.OPENCLAW_STATE_DIR = stateDir; + + await fs.mkdir(sessionDir, { recursive: true }); + await fs.writeFile( + sessionPath, + `${JSON.stringify({ + type: "message", + message: { role: "user", content: [{ type: "text", text: "fallback transcript v1" }] }, + })}\n`, + ); + + try { + const manager = requireManager( + await getMemorySearchManager({ + cfg: createCfg({ + storePath, + sources: ["sessions"], + sessionMemory: true, + }), + agentId: "main", + }), + ); + await manager.sync({ reason: "test" }); + + const internal = manager as unknown as { + syncSessionFiles: (params: { + targetSessionFiles?: string[]; + needsFullReindex: boolean; + }) => Promise; + shouldFallbackOnError: (message: string) => boolean; + activateFallbackProvider: (reason: string) => Promise; + runUnsafeReindex: (params: { + reason?: string; + force?: boolean; + progress?: unknown; + }) => Promise; + }; + const originalSyncSessionFiles = internal.syncSessionFiles.bind(manager); + const originalShouldFallbackOnError = internal.shouldFallbackOnError.bind(manager); + const originalActivateFallbackProvider = internal.activateFallbackProvider.bind(manager); + const originalRunUnsafeReindex = internal.runUnsafeReindex.bind(manager); + + internal.syncSessionFiles = async (params) => { + if (params.targetSessionFiles?.length) { + throw new Error("embedding backend failed"); + } + return await originalSyncSessionFiles(params); + }; + internal.shouldFallbackOnError = () => true; + const activateFallbackProvider = vi.fn(async () => true); + internal.activateFallbackProvider = activateFallbackProvider; + const runUnsafeReindex = vi.fn(async () => {}); + internal.runUnsafeReindex = runUnsafeReindex; + + await manager.sync({ + reason: "post-compaction", + sessionFiles: [sessionPath], + }); + + expect(activateFallbackProvider).toHaveBeenCalledWith("embedding backend failed"); + expect(runUnsafeReindex).toHaveBeenCalledWith({ + reason: "post-compaction", + force: true, + progress: undefined, + }); + + internal.syncSessionFiles = originalSyncSessionFiles; + internal.shouldFallbackOnError = originalShouldFallbackOnError; + internal.activateFallbackProvider = originalActivateFallbackProvider; + internal.runUnsafeReindex = originalRunUnsafeReindex; + await manager.close?.(); + } finally { + if (previousStateDir === undefined) { + delete process.env.OPENCLAW_STATE_DIR; + } else { + process.env.OPENCLAW_STATE_DIR = previousStateDir; + } + await fs.rm(stateDir, { recursive: true, force: true }); + await fs.rm(storePath, { force: true }); + } + }); + it("reindexes when the embedding model changes", async () => { const base = createCfg({ storePath: indexModelPath }); const baseAgents = base.agents!; diff --git a/src/memory/manager-sync-ops.ts b/src/memory/manager-sync-ops.ts index 6fd3e6bb9..6babe9317 100644 --- a/src/memory/manager-sync-ops.ts +++ b/src/memory/manager-sync-ops.ts @@ -151,6 +151,8 @@ export abstract class MemoryManagerSyncOps { protected abstract sync(params?: { reason?: string; force?: boolean; + forceSessions?: boolean; + sessionFile?: string; progress?: (update: MemorySyncProgressUpdate) => void; }): Promise; protected abstract withTimeout( @@ -611,6 +613,35 @@ export abstract class MemoryManagerSyncOps { return resolvedFile.startsWith(`${resolvedDir}${path.sep}`); } + private normalizeTargetSessionFiles(sessionFiles?: string[]): Set | null { + if (!sessionFiles || sessionFiles.length === 0) { + return null; + } + const normalized = new Set(); + for (const sessionFile of sessionFiles) { + const trimmed = sessionFile.trim(); + if (!trimmed) { + continue; + } + const resolved = path.resolve(trimmed); + if (this.isSessionFileForAgent(resolved)) { + normalized.add(resolved); + } + } + return normalized.size > 0 ? normalized : null; + } + + private clearSyncedSessionFiles(targetSessionFiles?: Iterable | null) { + if (!targetSessionFiles) { + this.sessionsDirtyFiles.clear(); + } else { + for (const targetSessionFile of targetSessionFiles) { + this.sessionsDirtyFiles.delete(targetSessionFile); + } + } + this.sessionsDirty = this.sessionsDirtyFiles.size > 0; + } + protected ensureIntervalSync() { const minutes = this.settings.sync.intervalMinutes; if (!minutes || minutes <= 0 || this.intervalTimer) { @@ -640,12 +671,15 @@ export abstract class MemoryManagerSyncOps { } private shouldSyncSessions( - params?: { reason?: string; force?: boolean }, + params?: { reason?: string; force?: boolean; sessionFiles?: string[] }, needsFullReindex = false, ) { if (!this.sources.has("sessions")) { return false; } + if (params?.sessionFiles?.some((sessionFile) => sessionFile.trim().length > 0)) { + return true; + } if (params?.force) { return true; } @@ -752,6 +786,7 @@ export abstract class MemoryManagerSyncOps { private async syncSessionFiles(params: { needsFullReindex: boolean; + targetSessionFiles?: string[]; progress?: MemorySyncProgressState; }) { // FTS-only mode: skip embedding sync (no provider) @@ -760,13 +795,22 @@ export abstract class MemoryManagerSyncOps { return; } - const files = await listSessionFilesForAgent(this.agentId); - const activePaths = new Set(files.map((file) => sessionPathForFile(file))); - const indexAll = params.needsFullReindex || this.sessionsDirtyFiles.size === 0; + const targetSessionFiles = params.needsFullReindex + ? null + : this.normalizeTargetSessionFiles(params.targetSessionFiles); + const files = targetSessionFiles + ? Array.from(targetSessionFiles) + : await listSessionFilesForAgent(this.agentId); + const activePaths = targetSessionFiles + ? null + : new Set(files.map((file) => sessionPathForFile(file))); + const indexAll = + params.needsFullReindex || Boolean(targetSessionFiles) || this.sessionsDirtyFiles.size === 0; log.debug("memory sync: indexing session files", { files: files.length, indexAll, dirtyFiles: this.sessionsDirtyFiles.size, + targetedFiles: targetSessionFiles?.size ?? 0, batch: this.batch.enabled, concurrency: this.getIndexConcurrency(), }); @@ -827,6 +871,12 @@ export abstract class MemoryManagerSyncOps { }); await runWithConcurrency(tasks, this.getIndexConcurrency()); + if (activePaths === null) { + // Targeted syncs only refresh the requested transcripts and should not + // prune unrelated session rows without a full directory enumeration. + return; + } + const staleRows = this.db .prepare(`SELECT path FROM files WHERE source = ?`) .all("sessions") as Array<{ path: string }>; @@ -885,6 +935,7 @@ export abstract class MemoryManagerSyncOps { protected async runSync(params?: { reason?: string; force?: boolean; + sessionFiles?: string[]; progress?: (update: MemorySyncProgressUpdate) => void; }) { const progress = params?.progress ? this.createSyncProgress(params.progress) : undefined; @@ -899,8 +950,47 @@ export abstract class MemoryManagerSyncOps { const meta = this.readMeta(); const configuredSources = this.resolveConfiguredSourcesForMeta(); const configuredScopeHash = this.resolveConfiguredScopeHash(); + const targetSessionFiles = this.normalizeTargetSessionFiles(params?.sessionFiles); + const hasTargetSessionFiles = targetSessionFiles !== null; + if (hasTargetSessionFiles && targetSessionFiles && this.sources.has("sessions")) { + // Post-compaction refreshes should only update the explicit transcript files and + // leave broader reindex/dirty-work decisions to the regular sync path. + try { + await this.syncSessionFiles({ + needsFullReindex: false, + targetSessionFiles: Array.from(targetSessionFiles), + progress: progress ?? undefined, + }); + this.clearSyncedSessionFiles(targetSessionFiles); + } catch (err) { + const reason = err instanceof Error ? err.message : String(err); + const activated = + this.shouldFallbackOnError(reason) && (await this.activateFallbackProvider(reason)); + if (activated) { + if ( + process.env.OPENCLAW_TEST_FAST === "1" && + process.env.OPENCLAW_TEST_MEMORY_UNSAFE_REINDEX === "1" + ) { + await this.runUnsafeReindex({ + reason: params?.reason, + force: true, + progress: progress ?? undefined, + }); + } else { + await this.runSafeReindex({ + reason: params?.reason, + force: true, + progress: progress ?? undefined, + }); + } + return; + } + throw err; + } + return; + } const needsFullReindex = - params?.force || + (params?.force && !hasTargetSessionFiles) || !meta || (this.provider && meta.model !== this.provider.model) || (this.provider && meta.provider !== this.provider.id) || @@ -932,7 +1022,8 @@ export abstract class MemoryManagerSyncOps { } const shouldSyncMemory = - this.sources.has("memory") && (params?.force || needsFullReindex || this.dirty); + this.sources.has("memory") && + ((!hasTargetSessionFiles && params?.force) || needsFullReindex || this.dirty); const shouldSyncSessions = this.shouldSyncSessions(params, needsFullReindex); if (shouldSyncMemory) { @@ -941,7 +1032,11 @@ export abstract class MemoryManagerSyncOps { } if (shouldSyncSessions) { - await this.syncSessionFiles({ needsFullReindex, progress: progress ?? undefined }); + await this.syncSessionFiles({ + needsFullReindex, + targetSessionFiles: targetSessionFiles ? Array.from(targetSessionFiles) : undefined, + progress: progress ?? undefined, + }); this.sessionsDirty = false; this.sessionsDirtyFiles.clear(); } else if (this.sessionsDirtyFiles.size > 0) { diff --git a/src/memory/manager.ts b/src/memory/manager.ts index e79f83c57..61e2cd71a 100644 --- a/src/memory/manager.ts +++ b/src/memory/manager.ts @@ -125,6 +125,8 @@ export class MemoryIndexManager extends MemoryManagerEmbeddingOps implements Mem >(); private sessionWarm = new Set(); private syncing: Promise | null = null; + private queuedSessionFiles = new Set(); + private queuedSessionSync: Promise | null = null; private readonlyRecoveryAttempts = 0; private readonlyRecoverySuccesses = 0; private readonlyRecoveryFailures = 0; @@ -452,12 +454,16 @@ export class MemoryIndexManager extends MemoryManagerEmbeddingOps implements Mem async sync(params?: { reason?: string; force?: boolean; + sessionFiles?: string[]; progress?: (update: MemorySyncProgressUpdate) => void; }): Promise { if (this.closed) { return; } if (this.syncing) { + if (params?.sessionFiles?.some((sessionFile) => sessionFile.trim().length > 0)) { + return this.enqueueTargetedSessionSync(params.sessionFiles); + } return this.syncing; } this.syncing = this.runSyncWithReadonlyRecovery(params).finally(() => { @@ -466,6 +472,36 @@ export class MemoryIndexManager extends MemoryManagerEmbeddingOps implements Mem return this.syncing ?? Promise.resolve(); } + private enqueueTargetedSessionSync(sessionFiles?: string[]): Promise { + for (const sessionFile of sessionFiles ?? []) { + const trimmed = sessionFile.trim(); + if (trimmed) { + this.queuedSessionFiles.add(trimmed); + } + } + if (this.queuedSessionFiles.size === 0) { + return this.syncing ?? Promise.resolve(); + } + if (!this.queuedSessionSync) { + this.queuedSessionSync = (async () => { + try { + await this.syncing?.catch(() => undefined); + while (!this.closed && this.queuedSessionFiles.size > 0) { + const queuedSessionFiles = Array.from(this.queuedSessionFiles); + this.queuedSessionFiles.clear(); + await this.sync({ + reason: "queued-session-files", + sessionFiles: queuedSessionFiles, + }); + } + } finally { + this.queuedSessionSync = null; + } + })(); + } + return this.queuedSessionSync; + } + private isReadonlyDbError(err: unknown): boolean { const readonlyPattern = /attempt to write a readonly database|database is read-only|SQLITE_READONLY/i; @@ -518,6 +554,7 @@ export class MemoryIndexManager extends MemoryManagerEmbeddingOps implements Mem private async runSyncWithReadonlyRecovery(params?: { reason?: string; force?: boolean; + sessionFiles?: string[]; progress?: (update: MemorySyncProgressUpdate) => void; }): Promise { try { diff --git a/src/memory/qmd-manager.ts b/src/memory/qmd-manager.ts index 7efe8f10a..986d526e0 100644 --- a/src/memory/qmd-manager.ts +++ b/src/memory/qmd-manager.ts @@ -867,8 +867,12 @@ export class QmdMemoryManager implements MemorySearchManager { async sync(params?: { reason?: string; force?: boolean; + sessionFiles?: string[]; progress?: (update: MemorySyncProgressUpdate) => void; }): Promise { + if (params?.sessionFiles?.some((sessionFile) => sessionFile.trim().length > 0)) { + log.debug("qmd sync ignoring targeted sessionFiles hint; running regular update"); + } if (params?.progress) { params.progress({ completed: 0, total: 1, label: "Updating QMD index…" }); } diff --git a/src/memory/search-manager.ts b/src/memory/search-manager.ts index ea581b5d6..6cc8d9f20 100644 --- a/src/memory/search-manager.ts +++ b/src/memory/search-manager.ts @@ -181,6 +181,7 @@ class FallbackMemoryManager implements MemorySearchManager { async sync(params?: { reason?: string; force?: boolean; + sessionFiles?: string[]; progress?: (update: MemorySyncProgressUpdate) => void; }) { if (!this.primaryFailed) { diff --git a/src/memory/types.ts b/src/memory/types.ts index 287ee6ac5..880384df7 100644 --- a/src/memory/types.ts +++ b/src/memory/types.ts @@ -72,6 +72,7 @@ export interface MemorySearchManager { sync?(params?: { reason?: string; force?: boolean; + sessionFiles?: string[]; progress?: (update: MemorySyncProgressUpdate) => void; }): Promise; probeEmbeddingAvailability(): Promise;