From a4f582df56a13211071fba4424e535f4a58592ab Mon Sep 17 00:00:00 2001 From: Will2406 Date: Sat, 16 May 2026 01:17:16 -0500 Subject: [PATCH] fix(store): apply repairable cloud-upgrade mutations even when a blocker is queued MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously RepairCloudUpgrade short-circuited the entire repair pass as soon as DiagnoseCloudUpgradeLegacyMutations reported any blocked finding, including in --apply mode. This forced operators with a single unrecoverable legacy mutation (e.g. a session row whose `directory` was stored as the empty string by an older engram version) to perform manual SQLite surgery before any of the other queued mutations could be repaired. The error message also reported `Findings[0]`, which is ordered by `seq` and is often a *repairable* finding, not the actual blocker — so users saw the wrong seq/entity in the manual-action guidance. This change: - Applies the repairable subset first in `RepairCloudUpgrade` and only then surfaces residual blockers. The returned report now carries `Applied=true` on partial successes and explains both buckets in the message ("applied N repairable payload(s); M remain blocked: ..."). - Selects the first non-repairable finding for the manual-action message so the seq/entity/op shown to the user identifies the actual blocker. - Adds `entity_key=%q` to the blocker description so operators can locate the offending row directly without cross-referencing seq → entity_key. Tests cover the new mixed-bucket dry-run and apply paths plus the seq-ordering guarantee. --- internal/store/store.go | 70 +++++++++++----- internal/store/store_test.go | 152 +++++++++++++++++++++++++++++++++++ 2 files changed, 204 insertions(+), 18 deletions(-) diff --git a/internal/store/store.go b/internal/store/store.go index 569467ca..81ece095 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -1224,37 +1224,71 @@ func (s *Store) RepairCloudUpgrade(project string, apply bool) (CloudUpgradeRepa if err != nil { return CloudUpgradeRepairReport{}, fmt.Errorf("diagnose legacy cloud upgrade mutations: %w", err) } + + // Apply the repairable subset first. Holding the entire repair pass hostage + // to a single unrecoverable mutation forces operators into manual SQLite + // surgery for the rest of the queue; instead we let recoverable findings + // make forward progress and surface the residual blockers afterwards. + appliedRepairs := false + if apply && legacyReport.RepairableCount > 0 { + if err := s.applyCloudUpgradeLegacyMutationRepairs(project); err != nil { + return CloudUpgradeRepairReport{}, fmt.Errorf("apply cloud upgrade legacy mutation repairs: %w", err) + } + appliedRepairs = true + } + if legacyReport.BlockedCount > 0 { - first := legacyReport.Findings[0] + // Pick the first non-repairable finding so the user sees the actual + // blocker. Findings[0] is ordered by sync_mutations.seq and may be a + // repairable one, which previously produced misleading error messages. + var blocked CloudUpgradeLegacyMutationFinding + for _, f := range legacyReport.Findings { + if !f.Repairable { + blocked = f + break + } + } + var msg string + switch { + case appliedRepairs: + msg = fmt.Sprintf("applied %d repairable payload(s); %d remain blocked: manual-action-required: %s (seq=%d entity=%s entity_key=%q op=%s)", + legacyReport.RepairableCount, legacyReport.BlockedCount, blocked.Message, blocked.Seq, blocked.Entity, blocked.EntityKey, blocked.Op) + case legacyReport.RepairableCount > 0: + msg = fmt.Sprintf("%d repairable payload(s) would apply; %d would remain blocked: manual-action-required: %s (seq=%d entity=%s entity_key=%q op=%s)", + legacyReport.RepairableCount, legacyReport.BlockedCount, blocked.Message, blocked.Seq, blocked.Entity, blocked.EntityKey, blocked.Op) + default: + msg = fmt.Sprintf("manual-action-required: %s (seq=%d entity=%s entity_key=%q op=%s)", + blocked.Message, blocked.Seq, blocked.Entity, blocked.EntityKey, blocked.Op) + } return CloudUpgradeRepairReport{ Class: UpgradeRepairClassBlocked, ReasonCode: UpgradeReasonBlockedLegacyMutationManual, - Message: fmt.Sprintf("manual-action-required: %s (seq=%d entity=%s op=%s)", first.Message, first.Seq, first.Entity, first.Op), - Applied: false, + Message: msg, + Applied: appliedRepairs, }, nil } if legacyReport.RepairableCount > 0 { - report := CloudUpgradeRepairReport{ - Class: UpgradeRepairClassRepairable, - ReasonCode: UpgradeReasonRepairableLegacyMutationPayload, - Message: fmt.Sprintf("project %q has %d repairable legacy mutation payload issue(s)", project, legacyReport.RepairableCount), - PlannedAction: "repair_legacy_mutation_payloads", - Applied: false, - } - if !apply { - return report, nil - } - if err := s.applyCloudUpgradeLegacyMutationRepairs(project); err != nil { - return CloudUpgradeRepairReport{}, fmt.Errorf("apply cloud upgrade legacy mutation repairs: %w", err) + if !appliedRepairs { + return CloudUpgradeRepairReport{ + Class: UpgradeRepairClassRepairable, + ReasonCode: UpgradeReasonRepairableLegacyMutationPayload, + Message: fmt.Sprintf("project %q has %d repairable legacy mutation payload issue(s)", project, legacyReport.RepairableCount), + PlannedAction: "repair_legacy_mutation_payloads", + Applied: false, + }, nil } - report.Applied = true - report.Message = fmt.Sprintf("applied deterministic legacy mutation payload repairs for project %q", project) _ = s.SaveCloudUpgradeState(CloudUpgradeState{ Project: project, Stage: UpgradeStageRepairApplied, RepairClass: UpgradeRepairClassRepairable, }) - return report, nil + return CloudUpgradeRepairReport{ + Class: UpgradeRepairClassRepairable, + ReasonCode: UpgradeReasonRepairableLegacyMutationPayload, + Message: fmt.Sprintf("applied deterministic legacy mutation payload repairs for project %q", project), + PlannedAction: "repair_legacy_mutation_payloads", + Applied: true, + }, nil } requiresBackfill, err := s.projectSyncBackfillRequired(project) diff --git a/internal/store/store_test.go b/internal/store/store_test.go index 72ada425..23e654db 100644 --- a/internal/store/store_test.go +++ b/internal/store/store_test.go @@ -1986,6 +1986,158 @@ func TestUpgradeRepairDryRunAndApply(t *testing.T) { t.Fatalf("expected no remaining legacy findings after repair, got %+v", after) } }) + + t.Run("repair applies repairable mutations even when a blocker is queued ahead of them", func(t *testing.T) { + s := newTestStore(t) + // Repairable session: local row has a directory we can backfill from. + if err := s.CreateSession("mix-recoverable", "mix-proj", "/tmp/mix"); err != nil { + t.Fatalf("create recoverable session: %v", err) + } + // Enroll BEFORE the orphan row exists so EnrollProject's backfill pass + // does not enqueue an extra (also-blocked) session mutation for it. + if err := s.EnrollProject("mix-proj"); err != nil { + t.Fatalf("enroll project: %v", err) + } + // Unrecoverable session: local row exists but the directory is empty, + // so the repair pass cannot infer a value (mirrors the legacy + // orphan-session shape reported by users on engram <= v1.15.13). + if _, err := s.execHook(s.db, + `INSERT INTO sessions (id, project, directory, started_at) VALUES (?, ?, '', datetime('now'))`, + "mix-orphan", "mix-proj", + ); err != nil { + t.Fatalf("seed orphan session row: %v", err) + } + + // Insert the BLOCKED mutation first so it gets a lower seq than the + // repairable one. The pre-fix code reported Findings[0]'s details, + // which exposed the wrong seq when Findings[0] happened to be the + // repairable mutation. Here we want the orphan to legitimately be the + // blocker no matter the ordering. + if _, err := s.execHook(s.db, + `INSERT INTO sync_mutations (target_key, entity, entity_key, op, payload, source, project) VALUES (?, ?, ?, ?, ?, ?, ?)`, + DefaultSyncTargetKey, SyncEntitySession, "mix-orphan", SyncOpUpsert, + `{"id":"mix-orphan","project":"mix-proj","directory":""}`, + SyncSourceLocal, "mix-proj", + ); err != nil { + t.Fatalf("insert blocked mutation: %v", err) + } + if _, err := s.execHook(s.db, + `INSERT INTO sync_mutations (target_key, entity, entity_key, op, payload, source, project) VALUES (?, ?, ?, ?, ?, ?, ?)`, + DefaultSyncTargetKey, SyncEntitySession, "mix-recoverable", SyncOpUpsert, + `{"id":"mix-recoverable","project":"mix-proj","directory":""}`, + SyncSourceLocal, "mix-proj", + ); err != nil { + t.Fatalf("insert repairable mutation: %v", err) + } + + // Dry-run: must surface both buckets and identify the orphan as the blocker. + dryRun, err := s.RepairCloudUpgrade("mix-proj", false) + if err != nil { + t.Fatalf("dry-run repair: %v", err) + } + if dryRun.Class != UpgradeRepairClassBlocked || dryRun.Applied { + t.Fatalf("dry-run should report blocked + applied=false, got %+v", dryRun) + } + if !strings.Contains(dryRun.Message, "1 repairable payload(s) would apply") || + !strings.Contains(dryRun.Message, "1 would remain blocked") || + !strings.Contains(dryRun.Message, `entity_key="mix-orphan"`) { + t.Fatalf("dry-run message should describe both buckets and name the orphan entity_key, got %q", dryRun.Message) + } + + // Apply: the repairable mutation must be patched in place, the blocker + // must remain queued, and Applied must be true (we did do work). + applied, err := s.RepairCloudUpgrade("mix-proj", true) + if err != nil { + t.Fatalf("apply repair: %v", err) + } + if applied.Class != UpgradeRepairClassBlocked || !applied.Applied { + t.Fatalf("apply should report blocked + applied=true (partial success), got %+v", applied) + } + if !strings.Contains(applied.Message, "applied 1 repairable payload(s)") || + !strings.Contains(applied.Message, "1 remain blocked") { + t.Fatalf("apply message should note partial success and remaining blocker, got %q", applied.Message) + } + + var repaired, stillBroken string + if err := s.db.QueryRow(`SELECT payload FROM sync_mutations WHERE entity_key = ? AND acked_at IS NULL`, "mix-recoverable").Scan(&repaired); err != nil { + t.Fatalf("load repaired mutation payload: %v", err) + } + if strings.Contains(repaired, `"directory":""`) { + t.Fatalf("expected mix-recoverable payload to have directory backfilled, got %s", repaired) + } + if err := s.db.QueryRow(`SELECT payload FROM sync_mutations WHERE entity_key = ? AND acked_at IS NULL`, "mix-orphan").Scan(&stillBroken); err != nil { + t.Fatalf("load orphan mutation payload: %v", err) + } + if !strings.Contains(stillBroken, `"directory":""`) { + t.Fatalf("orphan payload should remain unchanged, got %s", stillBroken) + } + + after, err := s.DiagnoseCloudUpgradeLegacyMutations("mix-proj") + if err != nil { + t.Fatalf("diagnose after partial repair: %v", err) + } + if after.RepairableCount != 0 || after.BlockedCount != 1 { + t.Fatalf("expected only the orphan to remain after partial repair, got %+v", after) + } + }) + + t.Run("blocker error message names the unrecoverable seq even when a lower-seq repairable exists", func(t *testing.T) { + s := newTestStore(t) + if err := s.CreateSession("ord-recoverable", "ord-proj", "/tmp/ord"); err != nil { + t.Fatalf("create recoverable session: %v", err) + } + if err := s.EnrollProject("ord-proj"); err != nil { + t.Fatalf("enroll project: %v", err) + } + if _, err := s.execHook(s.db, + `INSERT INTO sessions (id, project, directory, started_at) VALUES (?, ?, '', datetime('now'))`, + "ord-orphan", "ord-proj", + ); err != nil { + t.Fatalf("seed orphan session row: %v", err) + } + + // Repairable goes in FIRST (lowest seq). Previously Findings[0] would + // be this repairable row, and the error message would report its seq + // even though it is not the actual blocker. + if _, err := s.execHook(s.db, + `INSERT INTO sync_mutations (target_key, entity, entity_key, op, payload, source, project) VALUES (?, ?, ?, ?, ?, ?, ?)`, + DefaultSyncTargetKey, SyncEntitySession, "ord-recoverable", SyncOpUpsert, + `{"id":"ord-recoverable","project":"ord-proj","directory":""}`, + SyncSourceLocal, "ord-proj", + ); err != nil { + t.Fatalf("insert repairable mutation: %v", err) + } + var repairableSeq int64 + if err := s.db.QueryRow(`SELECT seq FROM sync_mutations WHERE entity_key = ? AND project = ?`, "ord-recoverable", "ord-proj").Scan(&repairableSeq); err != nil { + t.Fatalf("lookup repairable seq: %v", err) + } + + if _, err := s.execHook(s.db, + `INSERT INTO sync_mutations (target_key, entity, entity_key, op, payload, source, project) VALUES (?, ?, ?, ?, ?, ?, ?)`, + DefaultSyncTargetKey, SyncEntitySession, "ord-orphan", SyncOpUpsert, + `{"id":"ord-orphan","project":"ord-proj","directory":""}`, + SyncSourceLocal, "ord-proj", + ); err != nil { + t.Fatalf("insert blocked mutation: %v", err) + } + var orphanSeq int64 + if err := s.db.QueryRow(`SELECT seq FROM sync_mutations WHERE entity_key = ? AND project = ?`, "ord-orphan", "ord-proj").Scan(&orphanSeq); err != nil { + t.Fatalf("lookup orphan seq: %v", err) + } + + report, err := s.RepairCloudUpgrade("ord-proj", false) + if err != nil { + t.Fatalf("dry-run repair: %v", err) + } + wantSeq := fmt.Sprintf("seq=%d", orphanSeq) + wrongSeq := fmt.Sprintf("seq=%d", repairableSeq) + if !strings.Contains(report.Message, wantSeq) { + t.Fatalf("error message should reference the blocked seq %d, got %q", orphanSeq, report.Message) + } + if strings.Contains(report.Message, wrongSeq) { + t.Fatalf("error message must NOT reference the repairable seq %d, got %q", repairableSeq, report.Message) + } + }) } func TestRollbackCloudUpgradeSafetyBoundary(t *testing.T) {