Skip to content

Commit 53aa6d3

Browse files
committed
feat: implement Plan 1 - Critical Stability fixes
Wave 1: Storage and Journal Safety - Add frozen cache TTL (1h) and size bounds (50 sessions) - Add pending journal source-aware retention (compaction-only TTL) - Add inter-process file lock with stale recovery - Move processLatestUserMessage to first transform (after isSubAgent guard) Wave 2: Promotion Ownership and Bounded Rejection - Add pendingOwnerSessionID/pendingMessageID metadata - Add owner-aware pending journal clearing - Add explicit/manual bounded retry (max 3 attempts) - Fix session.deleted cleanup idempotency Wave 3: Normalize, Security, and Cache Hardening - Fix load-time write loop (only write on security/migration change) - Add deterministic sort tie-breaker (createdAt -> id) - Add Bearer token redaction - Add processed message cache bounds - Remove priorityWithFreshness dead code Tests: 180 pass, 0 fail
1 parent 034dfe8 commit 53aa6d3

11 files changed

Lines changed: 1345 additions & 110 deletions

src/pending-journal.ts

Lines changed: 74 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import type { LongTermMemoryEntry, PendingMemoryJournalStore } from "./types.ts";
2+
import { PROMOTION_RETRY_LIMITS } from "./types.ts";
23
import { workspaceKey, workspacePendingJournalPath } from "./paths.ts";
34
import { atomicWriteJSON, readJSON, updateJSON } from "./storage.ts";
45

@@ -42,7 +43,7 @@ function dedupeByText(entries: LongTermMemoryEntry[]): LongTermMemoryEntry[] {
4243
const result: LongTermMemoryEntry[] = [];
4344

4445
for (const entry of entries) {
45-
const key = memoryKey(entry);
46+
const key = `${memoryKey(entry)}\u0000${entry.pendingOwnerSessionID ?? ""}`;
4647
if (seen.has(key)) continue;
4748
seen.add(key);
4849
result.push(entry);
@@ -67,38 +68,34 @@ function entryTime(entry: LongTermMemoryEntry): number {
6768

6869
function isStaleEntry(entry: LongTermMemoryEntry, maxAgeDays: number): boolean {
6970
const time = entryTime(entry);
70-
71-
// If timestamp is 0 (both invalid), treat as stale
71+
72+
// Invalid timestamps are corruption safety and apply to every source.
7273
if (time === 0) return true;
73-
74-
const ageMs = Date.now() - time;
75-
const maxAgeMs = maxAgeDays * 24 * 60 * 60 * 1000;
76-
77-
return ageMs > maxAgeMs;
74+
75+
// TTL policy applies only to compaction candidates. Explicit/manual entries
76+
// represent user intent and should survive age while under the hard cap.
77+
if (entry.source !== "compaction") return false;
78+
79+
return Date.now() - time > maxAgeDays * 86_400_000;
7880
}
7981

8082
function applyRetention(
8183
entries: LongTermMemoryEntry[],
8284
maxEntries: number,
8385
maxAgeDays: number,
8486
): LongTermMemoryEntry[] {
85-
// 1. Dedupe first
8687
const deduped = dedupeByText(entries);
87-
88-
// 2. Remove stale entries
8988
const freshEntries = deduped.filter(entry => !isStaleEntry(entry, maxAgeDays));
90-
91-
// 3. Sort by entryTime descending (newest first) for cap, using updatedAt then createdAt
9289
const sorted = [...freshEntries].sort((a, b) => {
93-
return entryTime(b) - entryTime(a);
90+
const timeDiff = entryTime(b) - entryTime(a);
91+
if (timeDiff !== 0) return timeDiff;
92+
return a.id.localeCompare(b.id);
9493
});
95-
96-
// 4. Keep maxEntries newest
9794
const capped = sorted.slice(0, maxEntries);
98-
99-
// 5. Restore stable order (oldest-to-newest) for consistency with existing code
10095
return capped.sort((a, b) => {
101-
return entryTime(a) - entryTime(b);
96+
const timeDiff = entryTime(a) - entryTime(b);
97+
if (timeDiff !== 0) return timeDiff;
98+
return a.id.localeCompare(b.id);
10299
});
103100
}
104101

@@ -159,13 +156,69 @@ export async function hasPendingJournalEntries(root: string): Promise<boolean> {
159156
return journal.entries.length > 0;
160157
}
161158

162-
export async function clearPendingMemories(root: string, keys?: Set<string>): Promise<void> {
159+
export async function clearPendingMemories(
160+
root: string,
161+
keys?: Set<string>,
162+
options: { ownerSessionID?: string; clearUnowned?: boolean } = {},
163+
): Promise<void> {
163164
await updatePendingJournal(root, store => {
164165
if (!keys || keys.size === 0) {
165166
store.entries = [];
166167
return store;
167168
}
168-
store.entries = store.entries.filter(entry => !keys.has(memoryKey(entry)));
169+
170+
store.entries = store.entries.filter(entry => {
171+
if (!keys.has(memoryKey(entry))) return true;
172+
if (!options.ownerSessionID) return false;
173+
if (entry.pendingOwnerSessionID === options.ownerSessionID) return false;
174+
if (options.clearUnowned && !entry.pendingOwnerSessionID) return false;
175+
return true;
176+
});
169177
return store;
170178
});
171179
}
180+
181+
export async function recordPromotionRejections(
182+
root: string,
183+
keys: Set<string>,
184+
reason: string,
185+
options: { ownerSessionID?: string } = {},
186+
): Promise<Set<string>> {
187+
const exhausted = new Set<string>();
188+
if (keys.size === 0) return exhausted;
189+
190+
await updatePendingJournal(root, store => {
191+
const nowIso = new Date().toISOString();
192+
const exhaustedEntries = new Set<string>();
193+
194+
store.entries = store.entries.map(entry => {
195+
const key = memoryKey(entry);
196+
if (!keys.has(key)) return entry;
197+
if (options.ownerSessionID && entry.pendingOwnerSessionID !== options.ownerSessionID) return entry;
198+
199+
const promotionAttempts = (entry.promotionAttempts ?? 0) + 1;
200+
const max = entry.source === "manual"
201+
? PROMOTION_RETRY_LIMITS.maxManualAttempts
202+
: PROMOTION_RETRY_LIMITS.maxExplicitAttempts;
203+
204+
if (promotionAttempts >= max) {
205+
exhausted.add(key);
206+
exhaustedEntries.add(`${key}\u0000${entry.pendingOwnerSessionID ?? ""}`);
207+
}
208+
209+
return {
210+
...entry,
211+
promotionAttempts,
212+
lastPromotionAttemptAt: nowIso,
213+
lastPromotionFailureReason: reason,
214+
};
215+
});
216+
217+
store.entries = store.entries.filter(entry => (
218+
!exhaustedEntries.has(`${memoryKey(entry)}\u0000${entry.pendingOwnerSessionID ?? ""}`)
219+
));
220+
return store;
221+
});
222+
223+
return exhausted;
224+
}

src/plugin.ts

Lines changed: 119 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import {
4343
hasPendingJournalEntries,
4444
loadPendingJournal,
4545
memoryKey,
46+
recordPromotionRejections,
4647
} from "./pending-journal.ts";
4748
import {
4849
loadSessionState,
@@ -61,6 +62,7 @@ import {
6162
pendingTodos,
6263
} from "./opencode.ts";
6364
import { accountPendingPromotions } from "./promotion-accounting.ts";
65+
import { WORKSPACE_MEMORY_CACHE_LIMITS } from "./types.ts";
6466

6567
/**
6668
* Build the complete compaction prompt.
@@ -203,13 +205,67 @@ export const MemoryV2Plugin: Plugin = async (input) => {
203205
// Cache for processed user message IDs (to avoid duplicate processing)
204206
const processedUserMessages = new Map<string, Set<string>>();
205207

208+
function pruneFrozenWorkspaceMemoryCache(now = Date.now()): void {
209+
for (const [sessionID, cached] of frozenWorkspaceMemoryCache) {
210+
if (now - cached.loadedAt > WORKSPACE_MEMORY_CACHE_LIMITS.frozenTtlMs) {
211+
frozenWorkspaceMemoryCache.delete(sessionID);
212+
}
213+
}
214+
215+
while (frozenWorkspaceMemoryCache.size > WORKSPACE_MEMORY_CACHE_LIMITS.maxFrozenSessions) {
216+
const oldest = [...frozenWorkspaceMemoryCache.entries()]
217+
.sort((a, b) => a[1].loadedAt - b[1].loadedAt)[0]?.[0];
218+
if (!oldest) break;
219+
frozenWorkspaceMemoryCache.delete(oldest);
220+
}
221+
}
222+
223+
function pruneProcessedUserMessagesCache(): void {
224+
for (const [sessionID, messages] of processedUserMessages) {
225+
while (messages.size > WORKSPACE_MEMORY_CACHE_LIMITS.maxProcessedMessagesPerSession) {
226+
const oldest = messages.values().next().value as string | undefined;
227+
if (!oldest) break;
228+
messages.delete(oldest);
229+
}
230+
231+
if (messages.size === 0) {
232+
processedUserMessages.delete(sessionID);
233+
}
234+
}
235+
236+
while (processedUserMessages.size > WORKSPACE_MEMORY_CACHE_LIMITS.maxProcessedSessionIDs) {
237+
const oldestSessionID = processedUserMessages.keys().next().value as string | undefined;
238+
if (!oldestSessionID) break;
239+
processedUserMessages.delete(oldestSessionID);
240+
}
241+
}
242+
243+
function rememberProcessedUserMessage(sessionID: string, messageID: string, processedForSession: Set<string>): void {
244+
processedForSession.add(messageID);
245+
while (processedForSession.size > WORKSPACE_MEMORY_CACHE_LIMITS.maxProcessedMessagesPerSession) {
246+
const oldest = processedForSession.values().next().value as string | undefined;
247+
if (!oldest) break;
248+
processedForSession.delete(oldest);
249+
}
250+
251+
if (processedUserMessages.has(sessionID)) {
252+
processedUserMessages.delete(sessionID);
253+
}
254+
processedUserMessages.set(sessionID, processedForSession);
255+
pruneProcessedUserMessagesCache();
256+
}
257+
206258
async function processLatestUserMessage(sessionID: string): Promise<void> {
207259
const processedForSession = processedUserMessages.get(sessionID) ?? new Set<string>();
208260
const latestMessage = await latestUserText(client, sessionID);
209261

210262
if (!latestMessage?.id || processedForSession.has(latestMessage.id)) return;
211263

212-
const memories = extractExplicitMemories(latestMessage.text);
264+
const memories = extractExplicitMemories(latestMessage.text).map(memory => ({
265+
...memory,
266+
pendingOwnerSessionID: sessionID,
267+
pendingMessageID: latestMessage.id,
268+
}));
213269
const decisions = memories.filter(memory => memory.type === "decision");
214270

215271
if (memories.length > 0) {
@@ -233,19 +289,29 @@ export const MemoryV2Plugin: Plugin = async (input) => {
233289
});
234290
}
235291

236-
processedForSession.add(latestMessage.id);
237-
processedUserMessages.set(sessionID, processedForSession);
292+
rememberProcessedUserMessage(sessionID, latestMessage.id, processedForSession);
238293
}
239294

240-
async function promotePendingMemories(sessionID?: string): Promise<void> {
295+
async function promotePendingMemories(
296+
sessionID?: string,
297+
options: { includeUnownedJournal?: boolean; includeOwnedJournal?: boolean } = {},
298+
): Promise<void> {
299+
const includeUnownedJournal = options.includeUnownedJournal ?? !sessionID;
300+
const includeOwnedJournal = options.includeOwnedJournal ?? Boolean(sessionID);
241301
const [journal, sessionState] = await Promise.all([
242302
loadPendingJournal(directory),
243303
sessionID ? loadSessionState(directory, sessionID) : Promise.resolve(undefined),
244304
]);
245305

306+
const journalPending = journal.entries.filter(memory => {
307+
if (sessionID && includeOwnedJournal && memory.pendingOwnerSessionID === sessionID) return true;
308+
if (includeUnownedJournal && !memory.pendingOwnerSessionID) return true;
309+
return false;
310+
});
311+
246312
const pending = [
247313
...(sessionState?.pendingMemories ?? []),
248-
...journal.entries,
314+
...journalPending,
249315
];
250316
if (pending.length === 0) return;
251317

@@ -277,16 +343,39 @@ export const MemoryV2Plugin: Plugin = async (input) => {
277343
events: updateResult.events,
278344
});
279345

346+
const exhaustedRejectedKeys = await recordPromotionRejections(
347+
directory,
348+
accounting.retryableRejectedKeys,
349+
"rejected_capacity",
350+
{ ownerSessionID: sessionID },
351+
);
352+
353+
const sessionRemovalKeys = new Set([
354+
...accounting.clearableKeys,
355+
...exhaustedRejectedKeys,
356+
]);
357+
280358
if (sessionID) {
281359
await updateSessionState(directory, sessionID, state => {
282-
state.pendingMemories = state.pendingMemories.filter(memory => !accounting.clearableKeys.has(memoryKey(memory)));
360+
state.pendingMemories = state.pendingMemories.filter(memory => {
361+
const key = memoryKey(memory);
362+
if (!sessionRemovalKeys.has(key)) return true;
363+
364+
if (accounting.clearableKeys.has(key)) return false;
365+
if (exhaustedRejectedKeys.has(key)) return false;
366+
367+
return true;
368+
});
283369
return state;
284370
});
285371
clearFrozenWorkspaceMemoryCache(sessionID);
286372
}
287373

288374
if (accounting.clearableKeys.size > 0) {
289-
await clearPendingMemories(directory, accounting.clearableKeys);
375+
await clearPendingMemories(directory, accounting.clearableKeys, {
376+
ownerSessionID: sessionID,
377+
clearUnowned: !sessionID || includeUnownedJournal === true,
378+
});
290379
}
291380
}
292381

@@ -324,6 +413,7 @@ export const MemoryV2Plugin: Plugin = async (input) => {
324413
renderedPrompt: string;
325414
}> {
326415
const now = Date.now();
416+
pruneFrozenWorkspaceMemoryCache(now);
327417
const cached = frozenWorkspaceMemoryCache.get(sessionID);
328418

329419
// Cache is valid for the current session cache epoch.
@@ -336,6 +426,7 @@ export const MemoryV2Plugin: Plugin = async (input) => {
336426
const store = await loadWorkspaceMemory(root);
337427
const renderedPrompt = renderWorkspaceMemory(store);
338428
frozenWorkspaceMemoryCache.set(sessionID, { store, renderedPrompt, loadedAt: now });
429+
pruneFrozenWorkspaceMemoryCache(now);
339430
return { store, renderedPrompt };
340431
}
341432

@@ -357,19 +448,23 @@ export const MemoryV2Plugin: Plugin = async (input) => {
357448
const { sessionID } = hookInput;
358449
if (!sessionID) return;
359450

451+
pruneFrozenWorkspaceMemoryCache();
452+
pruneProcessedUserMessagesCache();
453+
360454
// Sub-agents are short-lived - skip memory system
361455
if (await isSubAgent(sessionID)) return;
362456

363-
// Before first snapshot in this session, promote durable pending memories from
364-
// prior sessions. Keep this before processing latest user text so current-turn
365-
// explicit memory remains pending (not immediately frozen into system[1]).
457+
// Process explicit user memory even on no-tool turns. Keep this after the
458+
// sub-agent guard so child sessions never append to the parent journal.
459+
await processLatestUserMessage(sessionID);
460+
461+
// Before first snapshot in this session, promote durable unowned backlog from
462+
// prior sessions. Current-turn owned explicit memory remains pending and only
463+
// appears in hot state for this transform.
366464
if (!frozenWorkspaceMemoryCache.has(sessionID) && await hasPendingJournalEntries(directory)) {
367-
await promotePendingMemories();
465+
await promotePendingMemories(undefined, { includeUnownedJournal: true, includeOwnedJournal: false });
368466
}
369467

370-
// Process explicit user memory even on no-tool turns.
371-
await processLatestUserMessage(sessionID);
372-
373468
// Get frozen workspace memory snapshot (loaded and rendered once per session)
374469
const workspaceSnapshot = await getFrozenWorkspaceMemorySnapshot(directory, sessionID);
375470

@@ -521,7 +616,7 @@ export const MemoryV2Plugin: Plugin = async (input) => {
521616
}
522617

523618
try {
524-
await promotePendingMemories(sessionID);
619+
await promotePendingMemories(sessionID, { includeUnownedJournal: true });
525620
} catch {
526621
// Keep pending memories in session/journal for retry on next event/session.
527622
}
@@ -532,16 +627,20 @@ export const MemoryV2Plugin: Plugin = async (input) => {
532627
if (sessionID) {
533628
// Promote pending memories before deleting per-session state.
534629
// If promotion fails, leave session state and journal intact.
630+
let promoted = false;
535631
try {
536-
await promotePendingMemories(sessionID);
632+
await promotePendingMemories(sessionID, { includeOwnedJournal: true, includeUnownedJournal: false });
633+
promoted = true;
537634
} catch {
538635
return;
636+
} finally {
637+
if (promoted) {
638+
frozenWorkspaceMemoryCache.delete(sessionID);
639+
processedUserMessages.delete(sessionID);
640+
sessionParentCache.delete(sessionID);
641+
}
539642
}
540643

541-
// Clean up caches
542-
frozenWorkspaceMemoryCache.delete(sessionID);
543-
processedUserMessages.delete(sessionID);
544-
sessionParentCache.delete(sessionID);
545644
await rm(await sessionStatePath(directory, sessionID), { force: true });
546645
}
547646
}

0 commit comments

Comments
 (0)