diff --git a/table/occ_scenario_test.go b/table/occ_scenario_test.go new file mode 100644 index 000000000..64ad9d914 --- /dev/null +++ b/table/occ_scenario_test.go @@ -0,0 +1,257 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package table_test + +// Scenario-level regression test for the OCC manifest-list rebuild fix. +// +// Ported from the integration test suite to serve as a fast, local regression +// guard. Uses real on-disk Parquet and Avro files (local FS temp dir) but +// does not require Docker or a remote catalog. + +import ( + "context" + "fmt" + "path/filepath" + "sync/atomic" + "testing" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/memory" + "github.com/apache/iceberg-go" + iceio "github.com/apache/iceberg-go/io" + "github.com/apache/iceberg-go/table" + "github.com/stretchr/testify/suite" +) + +// --------------------------------------------------------------------------- +// Mock catalog +// --------------------------------------------------------------------------- + +// occScenarioCatalog simulates N CAS conflicts (HTTP 409) followed by a +// successful commit. LoadTable always returns the current metadata so that +// the retry loop can rebase against it. +type occScenarioCatalog struct { + current table.Metadata + conflictsLeft int + loadTableCalls atomic.Int32 + commitTableCalls atomic.Int32 + location string +} + +func (c *occScenarioCatalog) LoadTable(_ context.Context, _ table.Identifier) (*table.Table, error) { + c.loadTableCalls.Add(1) + + return table.New( + []string{"default", "occ_scenario_test"}, + c.current, + c.location+"/metadata/current.metadata.json", + func(_ context.Context) (iceio.IO, error) { return iceio.LocalFS{}, nil }, + c, + ), nil +} + +func (c *occScenarioCatalog) CommitTable( + _ context.Context, + _ table.Identifier, + _ []table.Requirement, + updates []table.Update, +) (table.Metadata, string, error) { + c.commitTableCalls.Add(1) + + if c.conflictsLeft > 0 { + c.conflictsLeft-- + + return nil, "", fmt.Errorf("%w: simulated 409 conflict", table.ErrCommitFailed) + } + + newMeta, err := table.UpdateTableMetadata(c.current, updates, "") + if err != nil { + return nil, "", err + } + + c.current = newMeta + + return newMeta, c.location + "/metadata/final.metadata.json", nil +} + +// --------------------------------------------------------------------------- +// Test suite +// --------------------------------------------------------------------------- + +type OCCScenarioTestSuite struct { + suite.Suite + ctx context.Context + location string +} + +func TestOCCScenario(t *testing.T) { + suite.Run(t, new(OCCScenarioTestSuite)) +} + +func (s *OCCScenarioTestSuite) SetupSuite() { + s.ctx = context.Background() +} + +func (s *OCCScenarioTestSuite) SetupTest() { + s.location = filepath.ToSlash(s.T().TempDir()) +} + +func (s *OCCScenarioTestSuite) makeTable( + conflicts int, + extraProps iceberg.Properties, +) (*table.Table, *occScenarioCatalog) { + schema := iceberg.NewSchema(0, + iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64}, + iceberg.NestedField{ID: 2, Name: "val", Type: iceberg.PrimitiveTypes.String}, + ) + + props := iceberg.Properties{table.PropertyFormatVersion: "2"} + for k, v := range extraProps { + props[k] = v + } + + meta, err := table.NewMetadata(schema, iceberg.UnpartitionedSpec, + table.UnsortedSortOrder, s.location, props) + s.Require().NoError(err) + + cat := &occScenarioCatalog{ + current: meta, + conflictsLeft: conflicts, + location: s.location, + } + + tbl := table.New( + []string{"default", "occ_scenario_test"}, + meta, + s.location+"/metadata/v1.metadata.json", + func(_ context.Context) (iceio.IO, error) { return iceio.LocalFS{}, nil }, + cat, + ) + + return tbl, cat +} + +func (s *OCCScenarioTestSuite) makeArrowTable() arrow.Table { + mem := memory.DefaultAllocator + sc := arrow.NewSchema([]arrow.Field{ + {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: true}, + {Name: "val", Type: arrow.BinaryTypes.String, Nullable: true}, + }, nil) + + tbl, err := array.TableFromJSON(mem, sc, []string{ + `[{"id": 1, "val": "hello"}]`, + }) + s.Require().NoError(err) + + return tbl +} + +// --------------------------------------------------------------------------- +// Regression test +// --------------------------------------------------------------------------- + +// TestManifestListInheritedAfterConflict is a regression test for the bug +// where a retried snapshot reused the original manifest list (built against a +// stale parent) instead of inheriting the manifests already committed by +// concurrent writers. +// +// Scenario: +// - Writer B commits one row to an empty table (snapshot B, manifest B). +// - Writer A starts a transaction from the empty base (no knowledge of B). +// - Writer A's first commit attempt fails with ErrCommitFailed (409). +// - doCommit reloads and gets snapshot B as the fresh head. +// - The rebuildManifestList closure rewrites the manifest list to contain +// manifest A (Writer A's file) and manifest B (inherited). +// - Writer A's second attempt succeeds. +// +// Without the fix the final snapshot contains only manifest A and a scan +// returns 1 row instead of 2. +func (s *OCCScenarioTestSuite) TestManifestListInheritedAfterConflict() { + props := iceberg.Properties{ + table.CommitMinRetryWaitMsKey: "0", + table.CommitMaxRetryWaitMsKey: "0", + table.CommitTotalRetryTimeoutMsKey: "60000", + table.CommitNumRetriesKey: "2", + } + + // Step 1: commit Writer B's row to an empty table. + // This writes real Parquet + manifest Avro files to s.location/metadata/. + emptyTbl, catB := s.makeTable(0, props) + rowB := s.makeArrowTable() + defer rowB.Release() + + txB := emptyTbl.NewTransaction() + s.Require().NoError(txB.AppendTable(s.ctx, rowB, rowB.NumRows(), nil)) + + _, err := txB.Commit(s.ctx) + s.Require().NoError(err, "Writer B must commit successfully") + + metaAfterB := catB.current // catalog state after B's commit (real files on disk) + + // Step 2: Writer A's catalog starts with B's committed state. + // It returns ErrCommitFailed once (simulating B having committed just + // before A), then accepts the retry. + catA := &occScenarioCatalog{ + current: metaAfterB, + conflictsLeft: 1, + location: s.location, + } + + // Step 3: Writer A's table starts from the EMPTY base — it loaded the + // table before B committed. + writerATable := table.New( + emptyTbl.Identifier(), + emptyTbl.Metadata(), // empty: no knowledge of B's snapshot yet + s.location+"/metadata/v1.metadata.json", + func(_ context.Context) (iceio.IO, error) { return iceio.LocalFS{}, nil }, + catA, + ) + + rowA := s.makeArrowTable() + defer rowA.Release() + + txA := writerATable.NewTransaction() + s.Require().NoError(txA.AppendTable(s.ctx, rowA, rowA.NumRows(), nil)) + + _, err = txA.Commit(s.ctx) + s.Require().NoError(err, "Writer A must succeed after one conflict retry") + + s.Equal(int32(2), catA.commitTableCalls.Load(), + "expected 2 commit attempts: 1 conflict + 1 success") + + // Step 4: the final snapshot must reference BOTH manifests. + // manifest A (Writer A's data file) + manifest B (inherited from B's snapshot). + // Without the fix, only manifest A is present and the count is 1. + finalSnap := catA.current.CurrentSnapshot() + s.Require().NotNil(finalSnap, "committed table must have a current snapshot") + + fio := iceio.LocalFS{} + manifests, err := finalSnap.Manifests(fio) + s.Require().NoError(err, "must be able to read manifest list from disk") + s.Require().Len(manifests, 2, + "expected 2 manifests (Writer A + Writer B); got %d — "+ + "manifest list not inherited on retry", len(manifests)) + + // Each manifest must have exactly 1 data file (added or existing). + for i, mf := range manifests { + count := mf.AddedDataFiles() + mf.ExistingDataFiles() + s.EqualValues(1, count, + "manifest[%d] should have 1 data file, got %d", i, count) + } +} diff --git a/table/rebuild_manifest_test.go b/table/rebuild_manifest_test.go new file mode 100644 index 000000000..7b4631c81 --- /dev/null +++ b/table/rebuild_manifest_test.go @@ -0,0 +1,185 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package table + +import ( + "context" + "errors" + "testing" + + iceio "github.com/apache/iceberg-go/io" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// rebuildUpdate constructs an addSnapshotUpdate whose rebuildManifestList +// closure simply records the freshParent it received and returns a new +// snapshot whose ManifestList is the given newManifestList value. +func rebuildUpdate(snap *Snapshot, newManifestList string, gotParent **Snapshot) *addSnapshotUpdate { + return &addSnapshotUpdate{ + baseUpdate: baseUpdate{ActionName: UpdateAddSnapshot}, + Snapshot: snap, + rebuildManifestList: func(_ context.Context, freshParent *Snapshot, _ iceio.WriteFileIO, _ int) (*Snapshot, error) { + *gotParent = freshParent + rebuilt := *snap + rebuilt.ManifestList = newManifestList + + return &rebuilt, nil + }, + } +} + +// TestRebuildSnapshotUpdates_CallsClosureWithFreshParent verifies that +// rebuildSnapshotUpdates invokes the rebuild closure and passes the fresh +// branch head as freshParent. +func TestRebuildSnapshotUpdates_CallsClosureWithFreshParent(t *testing.T) { + const oldManifest = "s3://bucket/old-manifest-list.avro" + const newManifest = "s3://bucket/new-manifest-list.avro" + + // Build a fresh metadata that has a different snapshot than the one + // embedded in the update, so the parent-hasn't-changed guard is + // bypassed and the closure must be called. + freshHead := int64(42) + freshMeta := newConflictTestMetadata(t, &freshHead) + + parentID := int64(7) // original snap's parent — does NOT match freshHead (42) + snap := &Snapshot{ + SnapshotID: 99, + ParentSnapshotID: &parentID, + ManifestList: oldManifest, + Summary: &Summary{Operation: OpAppend}, + } + + var receivedParent *Snapshot + upd := rebuildUpdate(snap, newManifest, &receivedParent) + + rebuilt, orphaned, err := rebuildSnapshotUpdates( + t.Context(), + []Update{upd}, + freshMeta, + MainBranch, + iceio.LocalFS{}, + 1, + ) + require.NoError(t, err) + require.Len(t, rebuilt, 1) + require.Len(t, orphaned, 1, "old manifest list must be recorded as orphaned") + + // The closure should have been called with the branch's current head. + require.NotNil(t, receivedParent) + assert.Equal(t, freshHead, receivedParent.SnapshotID) + + // The rebuilt update must carry the new manifest list. + addUpd, ok := rebuilt[0].(*addSnapshotUpdate) + require.True(t, ok) + assert.Equal(t, newManifest, addUpd.Snapshot.ManifestList) + + // The superseded manifest list becomes an orphan. + assert.Equal(t, oldManifest, orphaned[0]) +} + +// TestRebuildSnapshotUpdates_SkipsWhenParentUnchanged verifies that +// rebuildSnapshotUpdates skips the rebuild when the update's snapshot +// already has the fresh branch head as its parent (no-op retry). +func TestRebuildSnapshotUpdates_SkipsWhenParentUnchanged(t *testing.T) { + const manifest = "s3://bucket/manifest-list.avro" + + freshHead := int64(42) + freshMeta := newConflictTestMetadata(t, &freshHead) + + // Parent already equals the fresh head — rebuild must be skipped. + snap := &Snapshot{ + SnapshotID: 99, + ParentSnapshotID: &freshHead, // same as fresh head + ManifestList: manifest, + Summary: &Summary{Operation: OpAppend}, + } + + called := false + upd := &addSnapshotUpdate{ + baseUpdate: baseUpdate{ActionName: UpdateAddSnapshot}, + Snapshot: snap, + rebuildManifestList: func(_ context.Context, _ *Snapshot, _ iceio.WriteFileIO, _ int) (*Snapshot, error) { + called = true + + return snap, nil + }, + } + + rebuilt, orphaned, err := rebuildSnapshotUpdates( + t.Context(), + []Update{upd}, + freshMeta, + MainBranch, + iceio.LocalFS{}, + 1, + ) + require.NoError(t, err) + assert.False(t, called, "rebuild closure must not be called when parent is already up-to-date") + assert.Empty(t, orphaned, "no orphans when rebuild is skipped") + assert.Same(t, upd, rebuilt[0].(*addSnapshotUpdate), "original update must pass through unchanged") +} + +// TestRebuildSnapshotUpdates_PassesThroughNonRebuildUpdates verifies that +// updates without a rebuildManifestList closure are returned unmodified. +func TestRebuildSnapshotUpdates_PassesThroughNonRebuildUpdates(t *testing.T) { + plainUpd := NewAddSnapshotUpdate(&Snapshot{ + SnapshotID: 1, + ManifestList: "s3://bucket/no-rebuild.avro", + Summary: &Summary{Operation: OpAppend}, + }) + + // freshMeta may be nil — the plain update must not be touched. + rebuilt, orphaned, err := rebuildSnapshotUpdates( + t.Context(), + []Update{plainUpd}, + nil, + MainBranch, + iceio.LocalFS{}, + 0, + ) + require.NoError(t, err) + assert.Empty(t, orphaned) + assert.Same(t, plainUpd, rebuilt[0].(*addSnapshotUpdate)) +} + +// TestRebuildSnapshotUpdates_PropagatesClosureError verifies that an error +// returned by the rebuild closure surfaces as the function's return error. +func TestRebuildSnapshotUpdates_PropagatesClosureError(t *testing.T) { + freshHead := int64(5) + freshMeta := newConflictTestMetadata(t, &freshHead) + + parent := int64(1) + snap := &Snapshot{ + SnapshotID: 10, + ParentSnapshotID: &parent, + ManifestList: "s3://bucket/old.avro", + Summary: &Summary{Operation: OpAppend}, + } + wantErr := errors.New("simulated S3 write failure") + upd := &addSnapshotUpdate{ + baseUpdate: baseUpdate{ActionName: UpdateAddSnapshot}, + Snapshot: snap, + rebuildManifestList: func(_ context.Context, _ *Snapshot, _ iceio.WriteFileIO, _ int) (*Snapshot, error) { + return nil, wantErr + }, + } + + _, _, err := rebuildSnapshotUpdates(t.Context(), []Update{upd}, freshMeta, MainBranch, iceio.LocalFS{}, 1) + assert.ErrorIs(t, err, wantErr) +} diff --git a/table/snapshot_producers.go b/table/snapshot_producers.go index c98b7cb7f..ce146f593 100644 --- a/table/snapshot_producers.go +++ b/table/snapshot_producers.go @@ -815,12 +815,52 @@ func (sp *snapshotProducer) summary(props iceberg.Properties) (Summary, error) { }, previousSummary) } +// computeOwnManifests returns the subset of allManifests that were written +// by this producer (i.e. not inherited from the parent snapshot). These are +// preserved across OCC retry attempts when the manifest list is rebuilt +// against a fresh parent. +func (sp *snapshotProducer) computeOwnManifests(allManifests []iceberg.ManifestFile) []iceberg.ManifestFile { + if sp.parentSnapshotID <= 0 { + // No parent means all manifests are new — nothing to exclude. + return allManifests + } + + parent, err := sp.txn.meta.SnapshotByID(sp.parentSnapshotID) + if err != nil || parent == nil { + return allManifests + } + + parentManifests, err := parent.Manifests(sp.io) + if err != nil { + return allManifests + } + + inherited := make(map[string]bool, len(parentManifests)) + for _, m := range parentManifests { + inherited[m.FilePath()] = true + } + + own := make([]iceberg.ManifestFile, 0, len(allManifests)) + for _, m := range allManifests { + if !inherited[m.FilePath()] { + own = append(own, m) + } + } + + return own +} + func (sp *snapshotProducer) commit(ctx context.Context) (_ []Update, _ []Requirement, err error) { newManifests, err := sp.manifests(ctx) if err != nil { return nil, nil, err } + // Separate "own" manifests (those written by this producer) from + // manifests inherited from the stale parent. The own manifests are + // preserved when the manifest list is rebuilt during OCC retries. + ownManifests := sp.computeOwnManifests(newManifests) + nextSequence := sp.txn.meta.nextSequenceNumber() summary, err := sp.summary(sp.snapshotProps) if err != nil { @@ -903,9 +943,103 @@ func (sp *snapshotProducer) commit(ctx context.Context) (_ []Update, _ []Require }) } + // Build the manifest-list rebuild closure. It is called by doCommit + // on each OCC retry to regenerate the manifest list so it correctly + // inherits all data files committed by concurrent writers since the + // original snapshot was built. + formatVersion := sp.txn.meta.formatVersion + snapshotID := sp.snapshotID + commitUUID := sp.commitUuid + capturedSnapshot := snapshot // copy the value so the closure is self-contained + processManifestsFn := func(m []iceberg.ManifestFile) ([]iceberg.ManifestFile, error) { + return sp.processManifests(m) + } + + rebuildFn := func(_ context.Context, freshParent *Snapshot, fio iceio.WriteFileIO, attempt int) (_ *Snapshot, retErr error) { + // Load inherited manifests from the fresh parent. + var inherited []iceberg.ManifestFile + if freshParent != nil { + inherited, retErr = freshParent.Manifests(fio) + if retErr != nil { + return nil, fmt.Errorf("rebuild manifest list: load parent manifests: %w", retErr) + } + } + + // Combine own manifests with inherited ones, applying any + // producer-specific processing (no-op for fast/merge-append). + combined, procErr := processManifestsFn(slices.Concat(ownManifests, inherited)) + if procErr != nil { + return nil, fmt.Errorf("rebuild manifest list: process manifests: %w", procErr) + } + + // Derive the sequence number. When there is a fresh parent, use its + // sequence number + 1 so the rebuilt snapshot is strictly greater than + // any committed peer. When there is no fresh parent (first snapshot in + // the table or unknown parent), preserve the original sequence number + // from the initial build. + var newSeq int64 + if freshParent != nil && formatVersion >= 2 { + newSeq = freshParent.SequenceNumber + 1 + } else { + newSeq = capturedSnapshot.SequenceNumber + } + + // Write the rebuilt manifest list to a path unique to this retry + // attempt. Each retry uses a different attempt counter in the filename + // (snap-{id}-{attempt}-{uuid}.avro) so that S3 conditional-write + // semantics (if-none-match) do not reject the overwrite. Orphaned files + // from superseded retry attempts are removed by doCommit after the + // commit succeeds. + fname := newManifestListFileName(snapshotID, attempt, commitUUID) + manifestListPath := locProvider.NewMetadataLocation(fname) + + out, createErr := fio.Create(manifestListPath) + if createErr != nil { + return nil, fmt.Errorf("rebuild manifest list: create file: %w", createErr) + } + defer internal.CheckedClose(out, &retErr) + + var parentID *int64 + if freshParent != nil { + id := freshParent.SnapshotID + parentID = &id + } + + firstRowID := int64(0) + if formatVersion == 3 { + writer, wrErr := iceberg.NewManifestListWriterV3(out, snapshotID, newSeq, firstRowID, parentID) + if wrErr != nil { + return nil, fmt.Errorf("rebuild manifest list: create v3 writer: %w", wrErr) + } + defer internal.CheckedClose(writer, &retErr) + if addErr := writer.AddManifests(combined); addErr != nil { + return nil, fmt.Errorf("rebuild manifest list: add manifests: %w", addErr) + } + } else { + if wErr := iceberg.WriteManifestList(formatVersion, out, snapshotID, parentID, &newSeq, firstRowID, combined); wErr != nil { + return nil, fmt.Errorf("rebuild manifest list: write: %w", wErr) + } + } + + rebuilt := capturedSnapshot + rebuilt.ManifestList = manifestListPath + rebuilt.ParentSnapshotID = parentID + rebuilt.SequenceNumber = newSeq + + return &rebuilt, nil + } + + addSnap := NewAddSnapshotUpdate(&snapshot) + addSnap.ownManifests = ownManifests + addSnap.rebuildManifestList = rebuildFn + return []Update{ - NewAddSnapshotUpdate(&snapshot), - NewSetSnapshotRefUpdate(branch, sp.snapshotID, BranchRef, -1, -1, -1), + addSnap, + // Use 0 (not -1) for the optional fields so they are omitted by + // `omitempty` in JSON marshalling. -1 is a sentinel meaning + // "no limit" internally, but strict catalogs such as AWS S3 Tables + // reject a payload that explicitly contains negative values. + NewSetSnapshotRefUpdate(branch, sp.snapshotID, BranchRef, 0, 0, 0), }, []Requirement{ AssertRefSnapshotID(branch, sp.txn.meta.currentSnapshotID), }, nil diff --git a/table/table.go b/table/table.go index a8ee06b1c..68a0c874e 100644 --- a/table/table.go +++ b/table/table.go @@ -51,14 +51,6 @@ import ( // catalogs return their conflict errors raw and will not trigger // retries until follow-up work wires them through (tracked under // issue #830). -// -// The retry loop in doCommit re-issues the original updates and -// requirements unchanged. This recovers only from transient catalog -// errors (dropped connections, brief 409 during leader election); it -// does not yet refresh the table metadata between attempts, so a -// contended commit whose AssertRefSnapshotID requirement has been -// invalidated by a peer will fail deterministically on every retry. -// Refresh-and-replay is tracked separately (issue #830). var ErrCommitFailed = errors.New("commit failed, refresh and try again") type FSysF func(ctx context.Context) (icebergio.IO, error) @@ -385,9 +377,10 @@ func (t Table) doCommit(ctx context.Context, updates []Update, reqs []Requiremen } var ( - newMeta Metadata - newLoc string - timer *time.Timer + newMeta Metadata + newLoc string + timer *time.Timer + orphanedManifests []string // manifest-list files orphaned by rebuilds ) // current tracks the catalog state between retries. On attempt 0 it @@ -428,6 +421,20 @@ func (t Table) doCommit(ctx context.Context, updates []Update, reqs []Requiremen } current = fresh.metadata reqs = rewriteRefSnapshotRequirements(reqs, co.branch, current) + + // Rebuild snapshot manifest lists to inherit all files committed + // by concurrent writers since the snapshot was originally built. + // Without this, the new snapshot's manifest list would only + // contain its own files and callers scanning the current snapshot + // would miss every concurrent writer's data. + if wfs, ok := fs.(icebergio.WriteFileIO); ok { + rebuiltUpdates, orphaned, rebuildErr := rebuildSnapshotUpdates(retryCtx, updates, current, co.branch, wfs, int(attempt)) + if rebuildErr != nil { + return nil, fmt.Errorf("rebuild manifest list for retry attempt %d: %w", attempt, rebuildErr) + } + orphanedManifests = append(orphanedManifests, orphaned...) + updates = rebuiltUpdates + } } // Pre-flight client-side conflict validation. Producers can @@ -483,6 +490,19 @@ func (t Table) doCommit(ctx context.Context, updates []Update, reqs []Requiremen return nil, err } + // Delete manifest-list files that were written during failed retry + // attempts and have now been superseded by the committed rebuild. + // These are orphaned objects that will never be referenced again. + if len(orphanedManifests) > 0 { + if wfs, ok := fs.(icebergio.WriteFileIO); ok { + for _, path := range orphanedManifests { + if removeErr := wfs.Remove(path); removeErr != nil { + log.Printf("Warning: failed to delete orphaned manifest list %s: %v", path, removeErr) + } + } + } + } + deleteOldMetadata(fs, t.metadata, newMeta) return New(t.identifier, newMeta, newLoc, t.fsF, t.cat), nil @@ -526,6 +546,66 @@ func rewriteRefSnapshotRequirements(reqs []Requirement, branch string, fresh Met return out } +// rebuildSnapshotUpdates returns a new slice of updates where any +// addSnapshotUpdate that carries a rebuildManifestList closure has its +// snapshot regenerated to inherit all data files committed to the branch +// since the original snapshot was built. Updates without a rebuild closure +// pass through unchanged. +// +// It also returns the manifest-list file paths that were superseded by +// the rebuild (i.e., the paths from the input updates that were replaced). +// These become orphaned objects in object storage and should be removed +// by the caller after a successful commit. +// +// This is the manifest-layer "refresh-and-replay" step: the data files +// (already written to object storage) are reused as-is; only the manifest +// list is rewritten to include the fresh parent's manifests so that the +// rebuilt snapshot contains every committed file. +func rebuildSnapshotUpdates(ctx context.Context, updates []Update, freshMeta Metadata, branch string, fs icebergio.WriteFileIO, attempt int) (rebuilt []Update, orphanedPaths []string, err error) { + // Determine the fresh branch head to use as the rebuilt snapshot's parent. + var freshHead *Snapshot + if branch != "" && freshMeta != nil { + freshHead = freshMeta.SnapshotByName(branch) + } else if freshMeta != nil { + freshHead = freshMeta.CurrentSnapshot() + } + + result := make([]Update, len(updates)) + copy(result, updates) + + for i, u := range result { + su, ok := u.(*addSnapshotUpdate) + if !ok || su.rebuildManifestList == nil { + continue + } + + // Skip if the parent has not changed — saves an unnecessary S3 write. + if freshHead != nil && su.Snapshot.ParentSnapshotID != nil && + *su.Snapshot.ParentSnapshotID == freshHead.SnapshotID { + continue + } + + oldManifestList := su.Snapshot.ManifestList + + newSnap, rebuildErr := su.rebuildManifestList(ctx, freshHead, fs, attempt) + if rebuildErr != nil { + return nil, nil, rebuildErr + } + + result[i] = &addSnapshotUpdate{ + baseUpdate: su.baseUpdate, + Snapshot: newSnap, + ownManifests: su.ownManifests, + rebuildManifestList: su.rebuildManifestList, + } + + // The old manifest list is now an orphaned object in object storage. + orphanedPaths = append(orphanedPaths, oldManifestList) + } + + return result, orphanedPaths, nil +} + type retryConfig struct { numRetries uint minWaitMs uint diff --git a/table/updates.go b/table/updates.go index dda9a5323..389858249 100644 --- a/table/updates.go +++ b/table/updates.go @@ -305,6 +305,18 @@ func (u *setDefaultSortOrderUpdate) Apply(builder *MetadataBuilder) error { type addSnapshotUpdate struct { baseUpdate Snapshot *Snapshot `json:"snapshot"` + + // ownManifests holds the manifests written by this producer (those + // NOT inherited from the parent snapshot). Populated by + // snapshotProducer.commit and used by rebuildManifestList below. + ownManifests []iceberg.ManifestFile + + // rebuildManifestList, when non-nil, regenerates the snapshot's + // manifest list to inherit from freshParent and combines it with + // ownManifests. Called by doCommit on every retry attempt so that + // each retry snapshot correctly inherits all files committed by + // concurrent writers since the original build. + rebuildManifestList func(ctx context.Context, freshParent *Snapshot, fio io.WriteFileIO, attempt int) (*Snapshot, error) } // NewAddSnapshotUpdate creates a new update that adds the given snapshot to the table metadata. @@ -316,7 +328,25 @@ func NewAddSnapshotUpdate(snapshot *Snapshot) *addSnapshotUpdate { } func (u *addSnapshotUpdate) Apply(builder *MetadataBuilder) error { - return builder.AddSnapshot(u.Snapshot) + if err := builder.AddSnapshot(u.Snapshot); err != nil { + return err + } + + // Propagate the rebuild closure to the newly-added update object so + // that doCommit's retry loop can regenerate the manifest list after + // an OCC conflict. MetadataBuilder.AddSnapshot always appends a fresh + // *addSnapshotUpdate as the last element of builder.updates; we reach + // back and copy our runtime-only fields onto it. + if u.rebuildManifestList != nil { + if n := len(builder.updates); n > 0 { + if su, ok := builder.updates[n-1].(*addSnapshotUpdate); ok { + su.ownManifests = u.ownManifests + su.rebuildManifestList = u.rebuildManifestList + } + } + } + + return nil } type setSnapshotRefUpdate struct {