From fd1b54916af2cb24ef8b12e9fa89b06d86abc34c Mon Sep 17 00:00:00 2001 From: mzzz-zzm <80936176+mzzz-zzm@users.noreply.github.com> Date: Tue, 5 May 2026 07:19:05 +1000 Subject: [PATCH 1/5] fix(occ): rebuild manifest list on retry to inherit concurrent writes When doCommit retries after an OCC conflict (ErrCommitFailed / HTTP 409), the AddSnapshotUpdate still carries the manifest list built against the original base snapshot. That list does not include any manifests committed by concurrent writers between the first attempt and the retry, so the retried commit silently discards those manifests and the data they reference is lost. Fix: snapshotProducer records its own manifest files at commit-build time (computeOwnManifests). doCommit calls a rebuildManifestList closure on each retry that rewrites the AddSnapshotUpdate with a fresh manifest list that prepends the current branch-head manifests before the producer's own files. Orphaned manifest-list files from failed attempts are deleted after a successful commit. Fixes #976 --- table/snapshot_producers.go | 136 +++++++++++++++++++++++++++++++++++- table/table.go | 102 ++++++++++++++++++++++++--- table/updates.go | 32 ++++++++- 3 files changed, 256 insertions(+), 14 deletions(-) diff --git a/table/snapshot_producers.go b/table/snapshot_producers.go index c98b7cb7f..ed07aa322 100644 --- a/table/snapshot_producers.go +++ b/table/snapshot_producers.go @@ -815,12 +815,51 @@ func (sp *snapshotProducer) summary(props iceberg.Properties) (Summary, error) { }, previousSummary) } +// computeOwnManifests returns the subset of allManifests that were written +// by this producer (i.e. not inherited from the parent snapshot). These are +// preserved across OCC retry attempts when the manifest list is rebuilt +// against a fresh parent. +func (sp *snapshotProducer) computeOwnManifests(allManifests []iceberg.ManifestFile) []iceberg.ManifestFile { + if sp.parentSnapshotID <= 0 { + // No parent means all manifests are new — nothing to exclude. + return allManifests + } + + parent, err := sp.txn.meta.SnapshotByID(sp.parentSnapshotID) + if err != nil || parent == nil { + return allManifests + } + + parentManifests, err := parent.Manifests(sp.io) + if err != nil { + return allManifests + } + + inherited := make(map[string]bool, len(parentManifests)) + for _, m := range parentManifests { + inherited[m.FilePath()] = true + } + + own := make([]iceberg.ManifestFile, 0, len(allManifests)) + for _, m := range allManifests { + if !inherited[m.FilePath()] { + own = append(own, m) + } + } + return own +} + func (sp *snapshotProducer) commit(ctx context.Context) (_ []Update, _ []Requirement, err error) { newManifests, err := sp.manifests(ctx) if err != nil { return nil, nil, err } + // Separate "own" manifests (those written by this producer) from + // manifests inherited from the stale parent. The own manifests are + // preserved when the manifest list is rebuilt during OCC retries. + ownManifests := sp.computeOwnManifests(newManifests) + nextSequence := sp.txn.meta.nextSequenceNumber() summary, err := sp.summary(sp.snapshotProps) if err != nil { @@ -903,9 +942,102 @@ func (sp *snapshotProducer) commit(ctx context.Context) (_ []Update, _ []Require }) } + // Build the manifest-list rebuild closure. It is called by doCommit + // on each OCC retry to regenerate the manifest list so it correctly + // inherits all data files committed by concurrent writers since the + // original snapshot was built. + formatVersion := sp.txn.meta.formatVersion + snapshotID := sp.snapshotID + commitUUID := sp.commitUuid + capturedSnapshot := snapshot // copy the value so the closure is self-contained + processManifestsFn := func(m []iceberg.ManifestFile) ([]iceberg.ManifestFile, error) { + return sp.processManifests(m) + } + + rebuildFn := func(_ context.Context, freshParent *Snapshot, fio iceio.WriteFileIO, attempt int) (_ *Snapshot, retErr error) { + // Load inherited manifests from the fresh parent. + var inherited []iceberg.ManifestFile + if freshParent != nil { + inherited, retErr = freshParent.Manifests(fio) + if retErr != nil { + return nil, fmt.Errorf("rebuild manifest list: load parent manifests: %w", retErr) + } + } + + // Combine own manifests with inherited ones, applying any + // producer-specific processing (no-op for fast/merge-append). + combined, procErr := processManifestsFn(slices.Concat(ownManifests, inherited)) + if procErr != nil { + return nil, fmt.Errorf("rebuild manifest list: process manifests: %w", procErr) + } + + // Derive the sequence number. When there is a fresh parent, use its + // sequence number + 1 so the rebuilt snapshot is strictly greater than + // any committed peer. When there is no fresh parent (first snapshot in + // the table or unknown parent), preserve the original sequence number + // from the initial build. + var newSeq int64 + if freshParent != nil && formatVersion >= 2 { + newSeq = freshParent.SequenceNumber + 1 + } else { + newSeq = capturedSnapshot.SequenceNumber + } + + // Write the rebuilt manifest list to a path unique to this retry + // attempt. Each retry uses a different attempt counter in the filename + // (snap-{id}-{attempt}-{uuid}.avro) so that S3 conditional-write + // semantics (if-none-match) do not reject the overwrite. Orphaned files + // from superseded retry attempts are removed by doCommit after the + // commit succeeds. + fname := newManifestListFileName(snapshotID, attempt, commitUUID) + manifestListPath := locProvider.NewMetadataLocation(fname) + + out, createErr := fio.Create(manifestListPath) + if createErr != nil { + return nil, fmt.Errorf("rebuild manifest list: create file: %w", createErr) + } + defer internal.CheckedClose(out, &retErr) + + var parentID *int64 + if freshParent != nil { + id := freshParent.SnapshotID + parentID = &id + } + + firstRowID := int64(0) + if formatVersion == 3 { + writer, wrErr := iceberg.NewManifestListWriterV3(out, snapshotID, newSeq, firstRowID, parentID) + if wrErr != nil { + return nil, fmt.Errorf("rebuild manifest list: create v3 writer: %w", wrErr) + } + defer internal.CheckedClose(writer, &retErr) + if addErr := writer.AddManifests(combined); addErr != nil { + return nil, fmt.Errorf("rebuild manifest list: add manifests: %w", addErr) + } + } else { + if wErr := iceberg.WriteManifestList(formatVersion, out, snapshotID, parentID, &newSeq, firstRowID, combined); wErr != nil { + return nil, fmt.Errorf("rebuild manifest list: write: %w", wErr) + } + } + + rebuilt := capturedSnapshot + rebuilt.ManifestList = manifestListPath + rebuilt.ParentSnapshotID = parentID + rebuilt.SequenceNumber = newSeq + return &rebuilt, nil + } + + addSnap := NewAddSnapshotUpdate(&snapshot) + addSnap.ownManifests = ownManifests + addSnap.rebuildManifestList = rebuildFn + return []Update{ - NewAddSnapshotUpdate(&snapshot), - NewSetSnapshotRefUpdate(branch, sp.snapshotID, BranchRef, -1, -1, -1), + addSnap, + // Use 0 (not -1) for the optional fields so they are omitted by + // `omitempty` in JSON marshalling. -1 is a sentinel meaning + // "no limit" internally, but strict catalogs such as AWS S3 Tables + // reject a payload that explicitly contains negative values. + NewSetSnapshotRefUpdate(branch, sp.snapshotID, BranchRef, 0, 0, 0), }, []Requirement{ AssertRefSnapshotID(branch, sp.txn.meta.currentSnapshotID), }, nil diff --git a/table/table.go b/table/table.go index a8ee06b1c..68a0c874e 100644 --- a/table/table.go +++ b/table/table.go @@ -51,14 +51,6 @@ import ( // catalogs return their conflict errors raw and will not trigger // retries until follow-up work wires them through (tracked under // issue #830). -// -// The retry loop in doCommit re-issues the original updates and -// requirements unchanged. This recovers only from transient catalog -// errors (dropped connections, brief 409 during leader election); it -// does not yet refresh the table metadata between attempts, so a -// contended commit whose AssertRefSnapshotID requirement has been -// invalidated by a peer will fail deterministically on every retry. -// Refresh-and-replay is tracked separately (issue #830). var ErrCommitFailed = errors.New("commit failed, refresh and try again") type FSysF func(ctx context.Context) (icebergio.IO, error) @@ -385,9 +377,10 @@ func (t Table) doCommit(ctx context.Context, updates []Update, reqs []Requiremen } var ( - newMeta Metadata - newLoc string - timer *time.Timer + newMeta Metadata + newLoc string + timer *time.Timer + orphanedManifests []string // manifest-list files orphaned by rebuilds ) // current tracks the catalog state between retries. On attempt 0 it @@ -428,6 +421,20 @@ func (t Table) doCommit(ctx context.Context, updates []Update, reqs []Requiremen } current = fresh.metadata reqs = rewriteRefSnapshotRequirements(reqs, co.branch, current) + + // Rebuild snapshot manifest lists to inherit all files committed + // by concurrent writers since the snapshot was originally built. + // Without this, the new snapshot's manifest list would only + // contain its own files and callers scanning the current snapshot + // would miss every concurrent writer's data. + if wfs, ok := fs.(icebergio.WriteFileIO); ok { + rebuiltUpdates, orphaned, rebuildErr := rebuildSnapshotUpdates(retryCtx, updates, current, co.branch, wfs, int(attempt)) + if rebuildErr != nil { + return nil, fmt.Errorf("rebuild manifest list for retry attempt %d: %w", attempt, rebuildErr) + } + orphanedManifests = append(orphanedManifests, orphaned...) + updates = rebuiltUpdates + } } // Pre-flight client-side conflict validation. Producers can @@ -483,6 +490,19 @@ func (t Table) doCommit(ctx context.Context, updates []Update, reqs []Requiremen return nil, err } + // Delete manifest-list files that were written during failed retry + // attempts and have now been superseded by the committed rebuild. + // These are orphaned objects that will never be referenced again. + if len(orphanedManifests) > 0 { + if wfs, ok := fs.(icebergio.WriteFileIO); ok { + for _, path := range orphanedManifests { + if removeErr := wfs.Remove(path); removeErr != nil { + log.Printf("Warning: failed to delete orphaned manifest list %s: %v", path, removeErr) + } + } + } + } + deleteOldMetadata(fs, t.metadata, newMeta) return New(t.identifier, newMeta, newLoc, t.fsF, t.cat), nil @@ -526,6 +546,66 @@ func rewriteRefSnapshotRequirements(reqs []Requirement, branch string, fresh Met return out } +// rebuildSnapshotUpdates returns a new slice of updates where any +// addSnapshotUpdate that carries a rebuildManifestList closure has its +// snapshot regenerated to inherit all data files committed to the branch +// since the original snapshot was built. Updates without a rebuild closure +// pass through unchanged. +// +// It also returns the manifest-list file paths that were superseded by +// the rebuild (i.e., the paths from the input updates that were replaced). +// These become orphaned objects in object storage and should be removed +// by the caller after a successful commit. +// +// This is the manifest-layer "refresh-and-replay" step: the data files +// (already written to object storage) are reused as-is; only the manifest +// list is rewritten to include the fresh parent's manifests so that the +// rebuilt snapshot contains every committed file. +func rebuildSnapshotUpdates(ctx context.Context, updates []Update, freshMeta Metadata, branch string, fs icebergio.WriteFileIO, attempt int) (rebuilt []Update, orphanedPaths []string, err error) { + // Determine the fresh branch head to use as the rebuilt snapshot's parent. + var freshHead *Snapshot + if branch != "" && freshMeta != nil { + freshHead = freshMeta.SnapshotByName(branch) + } else if freshMeta != nil { + freshHead = freshMeta.CurrentSnapshot() + } + + result := make([]Update, len(updates)) + copy(result, updates) + + for i, u := range result { + su, ok := u.(*addSnapshotUpdate) + if !ok || su.rebuildManifestList == nil { + continue + } + + // Skip if the parent has not changed — saves an unnecessary S3 write. + if freshHead != nil && su.Snapshot.ParentSnapshotID != nil && + *su.Snapshot.ParentSnapshotID == freshHead.SnapshotID { + continue + } + + oldManifestList := su.Snapshot.ManifestList + + newSnap, rebuildErr := su.rebuildManifestList(ctx, freshHead, fs, attempt) + if rebuildErr != nil { + return nil, nil, rebuildErr + } + + result[i] = &addSnapshotUpdate{ + baseUpdate: su.baseUpdate, + Snapshot: newSnap, + ownManifests: su.ownManifests, + rebuildManifestList: su.rebuildManifestList, + } + + // The old manifest list is now an orphaned object in object storage. + orphanedPaths = append(orphanedPaths, oldManifestList) + } + + return result, orphanedPaths, nil +} + type retryConfig struct { numRetries uint minWaitMs uint diff --git a/table/updates.go b/table/updates.go index dda9a5323..389858249 100644 --- a/table/updates.go +++ b/table/updates.go @@ -305,6 +305,18 @@ func (u *setDefaultSortOrderUpdate) Apply(builder *MetadataBuilder) error { type addSnapshotUpdate struct { baseUpdate Snapshot *Snapshot `json:"snapshot"` + + // ownManifests holds the manifests written by this producer (those + // NOT inherited from the parent snapshot). Populated by + // snapshotProducer.commit and used by rebuildManifestList below. + ownManifests []iceberg.ManifestFile + + // rebuildManifestList, when non-nil, regenerates the snapshot's + // manifest list to inherit from freshParent and combines it with + // ownManifests. Called by doCommit on every retry attempt so that + // each retry snapshot correctly inherits all files committed by + // concurrent writers since the original build. + rebuildManifestList func(ctx context.Context, freshParent *Snapshot, fio io.WriteFileIO, attempt int) (*Snapshot, error) } // NewAddSnapshotUpdate creates a new update that adds the given snapshot to the table metadata. @@ -316,7 +328,25 @@ func NewAddSnapshotUpdate(snapshot *Snapshot) *addSnapshotUpdate { } func (u *addSnapshotUpdate) Apply(builder *MetadataBuilder) error { - return builder.AddSnapshot(u.Snapshot) + if err := builder.AddSnapshot(u.Snapshot); err != nil { + return err + } + + // Propagate the rebuild closure to the newly-added update object so + // that doCommit's retry loop can regenerate the manifest list after + // an OCC conflict. MetadataBuilder.AddSnapshot always appends a fresh + // *addSnapshotUpdate as the last element of builder.updates; we reach + // back and copy our runtime-only fields onto it. + if u.rebuildManifestList != nil { + if n := len(builder.updates); n > 0 { + if su, ok := builder.updates[n-1].(*addSnapshotUpdate); ok { + su.ownManifests = u.ownManifests + su.rebuildManifestList = u.rebuildManifestList + } + } + } + + return nil } type setSnapshotRefUpdate struct { From f36952d13fb84a5029b186a4fcb273013536ddae Mon Sep 17 00:00:00 2001 From: mzzz-zzm <80936176+mzzz-zzm@users.noreply.github.com> Date: Tue, 5 May 2026 07:44:51 +1000 Subject: [PATCH 2/5] test(table): add unit tests for rebuildSnapshotUpdates Covers four cases: - closure is called with the fresh branch head as freshParent - rebuild is skipped when snapshot parent already matches the fresh head - updates without a rebuild closure pass through unchanged - errors returned by the closure propagate to the caller --- table/rebuild_manifest_test.go | 183 +++++++++++++++++++++++++++++++++ 1 file changed, 183 insertions(+) create mode 100644 table/rebuild_manifest_test.go diff --git a/table/rebuild_manifest_test.go b/table/rebuild_manifest_test.go new file mode 100644 index 000000000..9c4c7b5bf --- /dev/null +++ b/table/rebuild_manifest_test.go @@ -0,0 +1,183 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package table + +import ( + "context" + "fmt" + "testing" + + iceio "github.com/apache/iceberg-go/io" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// rebuildUpdate constructs an addSnapshotUpdate whose rebuildManifestList +// closure simply records the freshParent it received and returns a new +// snapshot whose ManifestList is the given newManifestList value. +func rebuildUpdate(snap *Snapshot, newManifestList string, gotParent **Snapshot) *addSnapshotUpdate { + return &addSnapshotUpdate{ + baseUpdate: baseUpdate{ActionName: UpdateAddSnapshot}, + Snapshot: snap, + rebuildManifestList: func(_ context.Context, freshParent *Snapshot, _ iceio.WriteFileIO, _ int) (*Snapshot, error) { + *gotParent = freshParent + rebuilt := *snap + rebuilt.ManifestList = newManifestList + return &rebuilt, nil + }, + } +} + +// TestRebuildSnapshotUpdates_CallsClosureWithFreshParent verifies that +// rebuildSnapshotUpdates invokes the rebuild closure and passes the fresh +// branch head as freshParent. +func TestRebuildSnapshotUpdates_CallsClosureWithFreshParent(t *testing.T) { + const oldManifest = "s3://bucket/old-manifest-list.avro" + const newManifest = "s3://bucket/new-manifest-list.avro" + + // Build a fresh metadata that has a different snapshot than the one + // embedded in the update, so the parent-hasn't-changed guard is + // bypassed and the closure must be called. + freshHead := int64(42) + freshMeta := newConflictTestMetadata(t, &freshHead) + + parentID := int64(7) // original snap's parent — does NOT match freshHead (42) + snap := &Snapshot{ + SnapshotID: 99, + ParentSnapshotID: &parentID, + ManifestList: oldManifest, + Summary: &Summary{Operation: OpAppend}, + } + + var receivedParent *Snapshot + upd := rebuildUpdate(snap, newManifest, &receivedParent) + + rebuilt, orphaned, err := rebuildSnapshotUpdates( + t.Context(), + []Update{upd}, + freshMeta, + MainBranch, + iceio.LocalFS{}, + 1, + ) + require.NoError(t, err) + require.Len(t, rebuilt, 1) + require.Len(t, orphaned, 1, "old manifest list must be recorded as orphaned") + + // The closure should have been called with the branch's current head. + require.NotNil(t, receivedParent) + assert.Equal(t, freshHead, receivedParent.SnapshotID) + + // The rebuilt update must carry the new manifest list. + addUpd, ok := rebuilt[0].(*addSnapshotUpdate) + require.True(t, ok) + assert.Equal(t, newManifest, addUpd.Snapshot.ManifestList) + + // The superseded manifest list becomes an orphan. + assert.Equal(t, oldManifest, orphaned[0]) +} + +// TestRebuildSnapshotUpdates_SkipsWhenParentUnchanged verifies that +// rebuildSnapshotUpdates skips the rebuild when the update's snapshot +// already has the fresh branch head as its parent (no-op retry). +func TestRebuildSnapshotUpdates_SkipsWhenParentUnchanged(t *testing.T) { + const manifest = "s3://bucket/manifest-list.avro" + + freshHead := int64(42) + freshMeta := newConflictTestMetadata(t, &freshHead) + + // Parent already equals the fresh head — rebuild must be skipped. + snap := &Snapshot{ + SnapshotID: 99, + ParentSnapshotID: &freshHead, // same as fresh head + ManifestList: manifest, + Summary: &Summary{Operation: OpAppend}, + } + + called := false + upd := &addSnapshotUpdate{ + baseUpdate: baseUpdate{ActionName: UpdateAddSnapshot}, + Snapshot: snap, + rebuildManifestList: func(_ context.Context, _ *Snapshot, _ iceio.WriteFileIO, _ int) (*Snapshot, error) { + called = true + return snap, nil + }, + } + + rebuilt, orphaned, err := rebuildSnapshotUpdates( + t.Context(), + []Update{upd}, + freshMeta, + MainBranch, + iceio.LocalFS{}, + 1, + ) + require.NoError(t, err) + assert.False(t, called, "rebuild closure must not be called when parent is already up-to-date") + assert.Empty(t, orphaned, "no orphans when rebuild is skipped") + assert.Same(t, upd, rebuilt[0].(*addSnapshotUpdate), "original update must pass through unchanged") +} + +// TestRebuildSnapshotUpdates_PassesThroughNonRebuildUpdates verifies that +// updates without a rebuildManifestList closure are returned unmodified. +func TestRebuildSnapshotUpdates_PassesThroughNonRebuildUpdates(t *testing.T) { + plainUpd := NewAddSnapshotUpdate(&Snapshot{ + SnapshotID: 1, + ManifestList: "s3://bucket/no-rebuild.avro", + Summary: &Summary{Operation: OpAppend}, + }) + + // freshMeta may be nil — the plain update must not be touched. + rebuilt, orphaned, err := rebuildSnapshotUpdates( + t.Context(), + []Update{plainUpd}, + nil, + MainBranch, + iceio.LocalFS{}, + 0, + ) + require.NoError(t, err) + assert.Empty(t, orphaned) + assert.Same(t, plainUpd, rebuilt[0].(*addSnapshotUpdate)) +} + +// TestRebuildSnapshotUpdates_PropagatesClosureError verifies that an error +// returned by the rebuild closure surfaces as the function's return error. +func TestRebuildSnapshotUpdates_PropagatesClosureError(t *testing.T) { + freshHead := int64(5) + freshMeta := newConflictTestMetadata(t, &freshHead) + + parent := int64(1) + snap := &Snapshot{ + SnapshotID: 10, + ParentSnapshotID: &parent, + ManifestList: "s3://bucket/old.avro", + Summary: &Summary{Operation: OpAppend}, + } + wantErr := fmt.Errorf("simulated S3 write failure") + upd := &addSnapshotUpdate{ + baseUpdate: baseUpdate{ActionName: UpdateAddSnapshot}, + Snapshot: snap, + rebuildManifestList: func(_ context.Context, _ *Snapshot, _ iceio.WriteFileIO, _ int) (*Snapshot, error) { + return nil, wantErr + }, + } + + _, _, err := rebuildSnapshotUpdates(t.Context(), []Update{upd}, freshMeta, MainBranch, iceio.LocalFS{}, 1) + assert.ErrorIs(t, err, wantErr) +} From f1d2cd90492de0d304301f8720838c3a71531066 Mon Sep 17 00:00:00 2001 From: mzzz-zzm <80936176+mzzz-zzm@users.noreply.github.com> Date: Tue, 5 May 2026 08:23:41 +1000 Subject: [PATCH 3/5] style: fix nlreturn and perfsprint lint violations --- table/rebuild_manifest_test.go | 5 +++-- table/snapshot_producers.go | 2 ++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/table/rebuild_manifest_test.go b/table/rebuild_manifest_test.go index 9c4c7b5bf..4b461e87a 100644 --- a/table/rebuild_manifest_test.go +++ b/table/rebuild_manifest_test.go @@ -19,7 +19,7 @@ package table import ( "context" - "fmt" + "errors" "testing" iceio "github.com/apache/iceberg-go/io" @@ -38,6 +38,7 @@ func rebuildUpdate(snap *Snapshot, newManifestList string, gotParent **Snapshot) *gotParent = freshParent rebuilt := *snap rebuilt.ManifestList = newManifestList + return &rebuilt, nil }, } @@ -169,7 +170,7 @@ func TestRebuildSnapshotUpdates_PropagatesClosureError(t *testing.T) { ManifestList: "s3://bucket/old.avro", Summary: &Summary{Operation: OpAppend}, } - wantErr := fmt.Errorf("simulated S3 write failure") + wantErr := errors.New("simulated S3 write failure") upd := &addSnapshotUpdate{ baseUpdate: baseUpdate{ActionName: UpdateAddSnapshot}, Snapshot: snap, diff --git a/table/snapshot_producers.go b/table/snapshot_producers.go index ed07aa322..ce146f593 100644 --- a/table/snapshot_producers.go +++ b/table/snapshot_producers.go @@ -846,6 +846,7 @@ func (sp *snapshotProducer) computeOwnManifests(allManifests []iceberg.ManifestF own = append(own, m) } } + return own } @@ -1024,6 +1025,7 @@ func (sp *snapshotProducer) commit(ctx context.Context) (_ []Update, _ []Require rebuilt.ManifestList = manifestListPath rebuilt.ParentSnapshotID = parentID rebuilt.SequenceNumber = newSeq + return &rebuilt, nil } From 4bb0204fcfa5b02967a4e8029131e685136664b1 Mon Sep 17 00:00:00 2001 From: mzzz-zzm <80936176+mzzz-zzm@users.noreply.github.com> Date: Tue, 5 May 2026 09:00:44 +1000 Subject: [PATCH 4/5] style: fix remaining nlreturn in TestRebuildSnapshotUpdates_SkipsWhenParentUnchanged --- table/rebuild_manifest_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/table/rebuild_manifest_test.go b/table/rebuild_manifest_test.go index 4b461e87a..7b4631c81 100644 --- a/table/rebuild_manifest_test.go +++ b/table/rebuild_manifest_test.go @@ -116,6 +116,7 @@ func TestRebuildSnapshotUpdates_SkipsWhenParentUnchanged(t *testing.T) { Snapshot: snap, rebuildManifestList: func(_ context.Context, _ *Snapshot, _ iceio.WriteFileIO, _ int) (*Snapshot, error) { called = true + return snap, nil }, } From ed5a362b20b086723cd8c41c6b2673446e9928e1 Mon Sep 17 00:00:00 2001 From: mzzz-zzm <80936176+mzzz-zzm@users.noreply.github.com> Date: Tue, 5 May 2026 11:42:02 +1000 Subject: [PATCH 5/5] test: add regression test TestManifestListInheritedAfterConflict Verifies that a retried snapshot correctly inherits manifests from concurrent writers rather than reusing the stale manifest list. The test fails on main (got 1 manifest) and passes with this fix (got 2 manifests), serving as a direct regression guard for the manifest-list rebuild logic in rebuildSnapshotUpdates. Signed-off-by: mzzz-zzm <80936176+mzzz-zzm@users.noreply.github.com> --- table/occ_scenario_test.go | 257 +++++++++++++++++++++++++++++++++++++ 1 file changed, 257 insertions(+) create mode 100644 table/occ_scenario_test.go diff --git a/table/occ_scenario_test.go b/table/occ_scenario_test.go new file mode 100644 index 000000000..64ad9d914 --- /dev/null +++ b/table/occ_scenario_test.go @@ -0,0 +1,257 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package table_test + +// Scenario-level regression test for the OCC manifest-list rebuild fix. +// +// Ported from the integration test suite to serve as a fast, local regression +// guard. Uses real on-disk Parquet and Avro files (local FS temp dir) but +// does not require Docker or a remote catalog. + +import ( + "context" + "fmt" + "path/filepath" + "sync/atomic" + "testing" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/memory" + "github.com/apache/iceberg-go" + iceio "github.com/apache/iceberg-go/io" + "github.com/apache/iceberg-go/table" + "github.com/stretchr/testify/suite" +) + +// --------------------------------------------------------------------------- +// Mock catalog +// --------------------------------------------------------------------------- + +// occScenarioCatalog simulates N CAS conflicts (HTTP 409) followed by a +// successful commit. LoadTable always returns the current metadata so that +// the retry loop can rebase against it. +type occScenarioCatalog struct { + current table.Metadata + conflictsLeft int + loadTableCalls atomic.Int32 + commitTableCalls atomic.Int32 + location string +} + +func (c *occScenarioCatalog) LoadTable(_ context.Context, _ table.Identifier) (*table.Table, error) { + c.loadTableCalls.Add(1) + + return table.New( + []string{"default", "occ_scenario_test"}, + c.current, + c.location+"/metadata/current.metadata.json", + func(_ context.Context) (iceio.IO, error) { return iceio.LocalFS{}, nil }, + c, + ), nil +} + +func (c *occScenarioCatalog) CommitTable( + _ context.Context, + _ table.Identifier, + _ []table.Requirement, + updates []table.Update, +) (table.Metadata, string, error) { + c.commitTableCalls.Add(1) + + if c.conflictsLeft > 0 { + c.conflictsLeft-- + + return nil, "", fmt.Errorf("%w: simulated 409 conflict", table.ErrCommitFailed) + } + + newMeta, err := table.UpdateTableMetadata(c.current, updates, "") + if err != nil { + return nil, "", err + } + + c.current = newMeta + + return newMeta, c.location + "/metadata/final.metadata.json", nil +} + +// --------------------------------------------------------------------------- +// Test suite +// --------------------------------------------------------------------------- + +type OCCScenarioTestSuite struct { + suite.Suite + ctx context.Context + location string +} + +func TestOCCScenario(t *testing.T) { + suite.Run(t, new(OCCScenarioTestSuite)) +} + +func (s *OCCScenarioTestSuite) SetupSuite() { + s.ctx = context.Background() +} + +func (s *OCCScenarioTestSuite) SetupTest() { + s.location = filepath.ToSlash(s.T().TempDir()) +} + +func (s *OCCScenarioTestSuite) makeTable( + conflicts int, + extraProps iceberg.Properties, +) (*table.Table, *occScenarioCatalog) { + schema := iceberg.NewSchema(0, + iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64}, + iceberg.NestedField{ID: 2, Name: "val", Type: iceberg.PrimitiveTypes.String}, + ) + + props := iceberg.Properties{table.PropertyFormatVersion: "2"} + for k, v := range extraProps { + props[k] = v + } + + meta, err := table.NewMetadata(schema, iceberg.UnpartitionedSpec, + table.UnsortedSortOrder, s.location, props) + s.Require().NoError(err) + + cat := &occScenarioCatalog{ + current: meta, + conflictsLeft: conflicts, + location: s.location, + } + + tbl := table.New( + []string{"default", "occ_scenario_test"}, + meta, + s.location+"/metadata/v1.metadata.json", + func(_ context.Context) (iceio.IO, error) { return iceio.LocalFS{}, nil }, + cat, + ) + + return tbl, cat +} + +func (s *OCCScenarioTestSuite) makeArrowTable() arrow.Table { + mem := memory.DefaultAllocator + sc := arrow.NewSchema([]arrow.Field{ + {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: true}, + {Name: "val", Type: arrow.BinaryTypes.String, Nullable: true}, + }, nil) + + tbl, err := array.TableFromJSON(mem, sc, []string{ + `[{"id": 1, "val": "hello"}]`, + }) + s.Require().NoError(err) + + return tbl +} + +// --------------------------------------------------------------------------- +// Regression test +// --------------------------------------------------------------------------- + +// TestManifestListInheritedAfterConflict is a regression test for the bug +// where a retried snapshot reused the original manifest list (built against a +// stale parent) instead of inheriting the manifests already committed by +// concurrent writers. +// +// Scenario: +// - Writer B commits one row to an empty table (snapshot B, manifest B). +// - Writer A starts a transaction from the empty base (no knowledge of B). +// - Writer A's first commit attempt fails with ErrCommitFailed (409). +// - doCommit reloads and gets snapshot B as the fresh head. +// - The rebuildManifestList closure rewrites the manifest list to contain +// manifest A (Writer A's file) and manifest B (inherited). +// - Writer A's second attempt succeeds. +// +// Without the fix the final snapshot contains only manifest A and a scan +// returns 1 row instead of 2. +func (s *OCCScenarioTestSuite) TestManifestListInheritedAfterConflict() { + props := iceberg.Properties{ + table.CommitMinRetryWaitMsKey: "0", + table.CommitMaxRetryWaitMsKey: "0", + table.CommitTotalRetryTimeoutMsKey: "60000", + table.CommitNumRetriesKey: "2", + } + + // Step 1: commit Writer B's row to an empty table. + // This writes real Parquet + manifest Avro files to s.location/metadata/. + emptyTbl, catB := s.makeTable(0, props) + rowB := s.makeArrowTable() + defer rowB.Release() + + txB := emptyTbl.NewTransaction() + s.Require().NoError(txB.AppendTable(s.ctx, rowB, rowB.NumRows(), nil)) + + _, err := txB.Commit(s.ctx) + s.Require().NoError(err, "Writer B must commit successfully") + + metaAfterB := catB.current // catalog state after B's commit (real files on disk) + + // Step 2: Writer A's catalog starts with B's committed state. + // It returns ErrCommitFailed once (simulating B having committed just + // before A), then accepts the retry. + catA := &occScenarioCatalog{ + current: metaAfterB, + conflictsLeft: 1, + location: s.location, + } + + // Step 3: Writer A's table starts from the EMPTY base — it loaded the + // table before B committed. + writerATable := table.New( + emptyTbl.Identifier(), + emptyTbl.Metadata(), // empty: no knowledge of B's snapshot yet + s.location+"/metadata/v1.metadata.json", + func(_ context.Context) (iceio.IO, error) { return iceio.LocalFS{}, nil }, + catA, + ) + + rowA := s.makeArrowTable() + defer rowA.Release() + + txA := writerATable.NewTransaction() + s.Require().NoError(txA.AppendTable(s.ctx, rowA, rowA.NumRows(), nil)) + + _, err = txA.Commit(s.ctx) + s.Require().NoError(err, "Writer A must succeed after one conflict retry") + + s.Equal(int32(2), catA.commitTableCalls.Load(), + "expected 2 commit attempts: 1 conflict + 1 success") + + // Step 4: the final snapshot must reference BOTH manifests. + // manifest A (Writer A's data file) + manifest B (inherited from B's snapshot). + // Without the fix, only manifest A is present and the count is 1. + finalSnap := catA.current.CurrentSnapshot() + s.Require().NotNil(finalSnap, "committed table must have a current snapshot") + + fio := iceio.LocalFS{} + manifests, err := finalSnap.Manifests(fio) + s.Require().NoError(err, "must be able to read manifest list from disk") + s.Require().Len(manifests, 2, + "expected 2 manifests (Writer A + Writer B); got %d — "+ + "manifest list not inherited on retry", len(manifests)) + + // Each manifest must have exactly 1 data file (added or existing). + for i, mf := range manifests { + count := mf.AddedDataFiles() + mf.ExistingDataFiles() + s.EqualValues(1, count, + "manifest[%d] should have 1 data file, got %d", i, count) + } +}