Skip to content
51 changes: 44 additions & 7 deletions apps/memos-local-plugin/core/pipeline/memory-core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -598,9 +598,7 @@ export function createMemoryCore(
if (nowMs - lastDirtyClosedScan < 30_000) return;
lastDirtyClosedScan = nowMs;
try {
const dirtyClosed = handle.repos.episodes
.list({ status: "closed", limit: 500 })
.filter((ep) => !isLightweightEpisode(ep) && episodeRewardIsDirty(ep));
const dirtyClosed = collectDirtyClosedEpisodes();
if (dirtyClosed.length > 0) {
await recoverDirtyClosedEpisodes(dirtyClosed);
}
Expand Down Expand Up @@ -912,9 +910,7 @@ export function createMemoryCore(
await recoverOpenEpisodesAsSessionEnd(stale);
}
}
const dirtyClosed = handle.repos.episodes
.list({ status: "closed", limit: 500 })
.filter((ep) => !isLightweightEpisode(ep) && episodeRewardIsDirty(ep));
const dirtyClosed = collectDirtyClosedEpisodes();
if (dirtyClosed.length > 0) {
await recoverDirtyClosedEpisodes(dirtyClosed);
}
Expand Down Expand Up @@ -1186,7 +1182,15 @@ export function createMemoryCore(
continue;
}

const snapshot = snapshotFromRecoveredEpisode(ep, endedAt);
// Pre-stamp before emitting finalized: if the watchdog fires mid-scoring,
// the next startup's condition-4 check will see DIRTY_REWARD_RESCORE and
// skip this episode rather than looping indefinitely.
handle.repos.episodes.updateMeta(episodeId, {
recoveryReason: "dirty_reward_rescore",
});
const snapshot = snapshotFromRecoveredEpisode(ep, endedAt, {
recoveryReason: "dirty_reward_rescore",
});
debugStartupRecovery("H3", "startup_recovery_emit_finalized", {
episodeId,
sessionId: ep.sessionId,
Expand Down Expand Up @@ -1252,6 +1256,7 @@ export function createMemoryCore(
episodes: Array<EpisodeRow & { meta?: Record<string, unknown> }>,
): Promise<void> {
log.info("init.dirty_closed_episodes.rescore", { count: episodes.length });
const rescored: EpisodeId[] = [];
for (const ep of episodes) {
if (isLightweightEpisode(ep)) continue;
const episodeId = ep.id as EpisodeId;
Expand All @@ -1260,6 +1265,7 @@ export function createMemoryCore(
closeReason: "finalized",
recoveredAtStartup: endedAt,
recoveryReason: "dirty_reward_rescore",
rewardDirty: undefined,
});
const snapshot = snapshotFromRecoveredEpisode(ep, endedAt, {
recoveryReason: "dirty_reward_rescore",
Expand All @@ -1269,10 +1275,36 @@ export function createMemoryCore(
episode: snapshot,
closedBy: "finalized",
});
rescored.push(episodeId);
}
// Drain the capture pass (patches reflections + α onto existing traces).
await handle.flush();
// In lightweight mode flush() returns before draining the reward
// subscriber. Explicitly run reward for any episode whose trace count
// still mismatches — mirrors the pattern in recoverOpenEpisodesAsSessionEnd.
for (const episodeId of rescored) {
if (episodeRewardIsDirty(handle.repos.episodes.getById(episodeId) ?? {} as never)) {
await handle.rewardRunner.run({ episodeId, feedback: [], trigger: "manual" });
}
}
await handle.flush();
}

/**
 * Gathers every closed episode that `episodeRewardIsDirty` flags.
 *
 * Closed episodes are read from the repository in pages of 500 (via
 * `limit` + `offset`) so the scan is not capped at the first page. The
 * walk ends with the first page that comes back shorter than the page
 * size.
 *
 * NOTE(review): assumes `episodes.list` honours `offset` and returns rows
 * in a stable order across calls — confirm against the repository.
 *
 * @returns The dirty closed episodes, in the order the repository listed them.
 */
function collectDirtyClosedEpisodes(): (EpisodeRow & { meta?: Record<string, unknown> })[] {
  type ClosedEpisode = EpisodeRow & { meta?: Record<string, unknown> };
  const PAGE_LIMIT = 500;
  const matches: ClosedEpisode[] = [];
  for (let cursor = 0; ; cursor += PAGE_LIMIT) {
    const batch = handle.repos.episodes.list({
      status: "closed",
      limit: PAGE_LIMIT,
      offset: cursor,
    });
    for (const row of batch) {
      if (episodeRewardIsDirty(row)) matches.push(row);
    }
    // A short (or empty) page means there is nothing left to fetch.
    if (batch.length < PAGE_LIMIT) return matches;
  }
}

function episodeRewardIsDirty(ep: EpisodeRow & { meta?: Record<string, unknown> }): boolean {
const meta = ep.meta ?? {};
if (meta.lightweightMemory === true) return false;
Expand All @@ -1285,6 +1317,11 @@ export function createMemoryCore(
if (
ep.rTask == null &&
(ep.traceIds?.length ?? 0) > 0 &&
// Episodes already attempted by a recovery path carry recoveryReason "dirty_reward_rescore".
// Excluding them prevents a crash-respawn loop when the watchdog fires
// mid-scoring and leaves rTask null: without this guard the next startup
// would re-pick the episode via closeReason="finalized" indefinitely.
meta.recoveryReason !== "dirty_reward_rescore" &&
(meta.closeReason === "finalized" ||
meta.closeReason === "abandoned" ||
meta.recoveryReason === "missed_session_end")
Expand Down
2 changes: 2 additions & 0 deletions apps/memos-local-plugin/core/reward/reward.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ export function createRewardRunner(deps: RewardDeps): RewardRunner {
try {
const existingMeta = episode.meta ?? {};
const wasFinalized = existingMeta.closeReason === "finalized";
deps.episodesRepo.setRTask(input.episodeId, 0);
deps.episodesRepo.updateMeta(input.episodeId, {
...(wasFinalized ? {} : { closeReason: "abandoned", abandonReason: skipReason }),
reward: {
Expand All @@ -128,6 +129,7 @@ export function createRewardRunner(deps: RewardDeps): RewardRunner {
trigger: input.trigger,
skipped: true,
},
rewardDirty: undefined,
});
} catch (err) {
warnings.push({
Expand Down
185 changes: 185 additions & 0 deletions apps/memos-local-plugin/tests/unit/pipeline/memory-core.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1505,4 +1505,189 @@ algorithm:
expect(meta.reward?.traceCount).toBe(1);
expect(meta.reward?.traceIds).toEqual(["tr_missing_reward"]);
});

it("dirty-reward recovery does not insert orphan traces (regression: rescore loop guard)", async () => {
// Regression test for the rescore loop:
// When recoverDirtyClosedEpisodes re-emits episode.finalized, capture's
// runReflect used to insert new trace rows for "orphan steps" — steps
// whose timestamps didn't match any existing DB row. For recovered
// episodes this happens whenever a trace has tool calls with endedAt
// timestamps different from the trace's own ts, because the snapshot
// rebuilds a separate tool-role turn for each call.
//
// Without the guard the orphan insert grows trace_ids_json, keeping
// reward.traceCount != traceIds.length forever and looping on every
// bridge restart. The guard (meta.recoveryReason === "dirty_reward_rescore")
// skips the insert, so trace_ids_json stays stable and the episode
// stops appearing dirty after a single recovery pass.

home = await makeTmpHome({ agent: "openclaw" });

// Boot and shut down a throwaway core before touching the database file.
// NOTE(review): presumably this creates the SQLite schema that the raw
// INSERTs below rely on — confirm.
const seeder = await bootstrapMemoryCore({
agent: "openclaw",
home: home.home,
config: home.config,
pkgVersion: "rescore-loop-seed",
});
await seeder.init();
await seeder.shutdown();

// Seed rows directly through better-sqlite3 rather than through the core.
const Sqlite = (await import("better-sqlite3")).default;
const writeDb = new Sqlite(home.home.dbFile);
// Reference timestamp 5 s in the past; every seeded row is offset from it.
const BASE = Date.now() - 5_000;

writeDb
.prepare(
`INSERT INTO sessions (id, agent, started_at, last_seen_at, meta_json) VALUES (?, ?, ?, ?, ?)`,
)
.run("se_loop", "openclaw", BASE, BASE, "{}");

// Episode is dirty: traceCount=1 but trace_ids_json has 2 IDs.
writeDb
.prepare(
`INSERT INTO episodes (id, session_id, started_at, ended_at, trace_ids_json, r_task, status, meta_json) VALUES (?, ?, ?, ?, ?, ?, 'closed', ?)`,
)
.run(
"ep_loop",
"se_loop",
BASE,
BASE + 1,
JSON.stringify(["tr_loop_a", "tr_loop_b"]),
0.5,
JSON.stringify({
closeReason: "finalized",
reward: { rHuman: 0.5, scoredAt: BASE - 1000, traceCount: 1 },
}),
);

// tr_loop_a: plain text trace — no orphan risk.
writeDb
.prepare(
`INSERT INTO traces (
id, episode_id, session_id, ts, user_text, agent_text, summary,
tool_calls_json, reflection, agent_thinking, value, alpha, r_human,
priority, tags_json, error_signatures_json, vec_summary, vec_action,
share_scope, share_target, shared_at, turn_id, schema_version
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NULL, NULL, NULL, NULL, NULL, ?, ?)`,
)
.run(
"tr_loop_a",
"ep_loop",
"se_loop",
BASE,
"帮我分析一下这段Python代码的性能瓶颈,并给出优化建议。",
"这段代码的主要性能问题在于嵌套循环,时间复杂度是O(n²),可以用哈希表将其优化到O(n)。",
"Python代码性能分析",
"[]",
null,
null,
0,
0,
null,
0.5,
"[]",
"[]",
BASE,
1,
);

// tr_loop_b: trace with a tool call whose endedAt differs from the trace ts.
// snapshotFromRecoveredEpisode creates a tool-role turn with ts=BASE+300,
// which does NOT appear in traceByTs (only BASE and BASE+100 are in the map).
// Without the guard this step is treated as an orphan and a new trace is
// inserted, growing trace_ids_json from 2 to 3 and keeping the episode dirty.
const toolCallWithDifferentTs = JSON.stringify([
{
name: "bash",
input: { command: "python -c 'import cProfile; cProfile.run(\"main()\")'"},
output: "ncalls tottime ... main 1 0.003",
endedAt: BASE + 300,
},
]);
writeDb
.prepare(
`INSERT INTO traces (
id, episode_id, session_id, ts, user_text, agent_text, summary,
tool_calls_json, reflection, agent_thinking, value, alpha, r_human,
priority, tags_json, error_signatures_json, vec_summary, vec_action,
share_scope, share_target, shared_at, turn_id, schema_version
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NULL, NULL, NULL, NULL, NULL, ?, ?)`,
)
.run(
"tr_loop_b",
"ep_loop",
"se_loop",
BASE + 100,
"请用cProfile验证一下",
"运行结果确认了瓶颈在内层循环,优化后耗时减少了约80%。",
"cProfile性能验证",
toolCallWithDifferentTs,
null,
null,
0,
0,
null,
0.5,
"[]",
"[]",
BASE + 100,
1,
);
// Release the write connection before the core reopens the same file.
writeDb.close();

// First recovery: episode is dirty (traceCount=1 != ids_len=2).
core = await bootstrapMemoryCore({
agent: "openclaw",
home: home.home,
config: home.config,
pkgVersion: "rescore-loop-recover-1",
});
await core.init();
await core.shutdown();
core = null;

// Inspect the persisted episode row through a separate read-only connection.
const readDb1 = new Sqlite(home.home.dbFile, { readonly: true });
const ep1 = readDb1
.prepare("SELECT trace_ids_json, meta_json, r_task FROM episodes WHERE id = ?")
.get("ep_loop") as { trace_ids_json: string; meta_json: string; r_task: number | null } | undefined;
readDb1.close();

expect(ep1).toBeDefined();
const ids1 = JSON.parse(ep1!.trace_ids_json) as string[];
// Guard: no orphan trace was inserted during dirty-reward recovery.
expect(ids1.length).toBe(2);
const meta1 = JSON.parse(ep1!.meta_json) as {
recoveryReason?: string;
reward?: { traceCount?: number };
};
expect(meta1.recoveryReason).toBe("dirty_reward_rescore");
// After recovery traceCount matches ids_len: episode is no longer dirty.
expect(meta1.reward?.traceCount).toBe(2);

// Second recovery (simulates next bridge restart): episode should not
// be re-scored because traceCount(2) == trace_ids_json.length(2).
core = await bootstrapMemoryCore({
agent: "openclaw",
home: home.home,
config: home.config,
pkgVersion: "rescore-loop-recover-2",
});
await core.init();
// NOTE(review): this second core is left running and assigned to `core`;
// presumably a suite-level teardown shuts it down — confirm.

const readDb2 = new Sqlite(home.home.dbFile, { readonly: true });
const ep2 = readDb2
.prepare("SELECT trace_ids_json, meta_json FROM episodes WHERE id = ?")
.get("ep_loop") as { trace_ids_json: string; meta_json: string } | undefined;
readDb2.close();

expect(ep2).toBeDefined();
const ids2 = JSON.parse(ep2!.trace_ids_json) as string[];
// Still 2 — no new orphan inserts on the second restart.
expect(ids2.length).toBe(2);
const meta2 = JSON.parse(ep2!.meta_json) as {
reward?: { traceCount?: number };
};
// traceCount unchanged: the episode was not re-scored.
expect(meta2.reward?.traceCount).toBe(2);
});
});