Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -2187,6 +2187,19 @@ func (b *DataFileBuilder) FirstRowID(id int64) *DataFileBuilder {
return b
}

// SetDataFileFirstRowID sets the first_row_id on an existing DataFile.
// The snapshot producer uses this to assign row IDs at write time for v3 tables.
// It reports false when the DataFile implementation cannot be mutated.
func SetDataFileFirstRowID(df DataFile, id int64) bool {
	concrete, ok := df.(*dataFile)
	if !ok {
		return false
	}

	concrete.FirstRowIDField = &id

	return true
}

func (b *DataFileBuilder) ReferencedDataFile(path string) *DataFileBuilder {
b.d.ReferencedDataFileField = &path

Expand Down
13 changes: 13 additions & 0 deletions table/snapshot_producers.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,20 @@ func (sp *snapshotProducer) manifestProducer(content iceberg.ManifestContent, fi
}
defer internal.CheckedClose(wr, &err)

// For v3 data manifests, assign first_row_id to each data file.
// Each file claims a contiguous range of row IDs starting from NextRowID.
assignRowIDs := sp.txn.meta.formatVersion >= 3 && content == iceberg.ManifestContentData
nextRowID := sp.txn.meta.NextRowID()

for _, df := range files {
if assignRowIDs {
if !iceberg.SetDataFileFirstRowID(df, nextRowID) {
return fmt.Errorf("failed to assign first_row_id to data file %s: unsupported DataFile implementation", df.FilePath())
}

nextRowID += df.Count()
}

err := wr.Add(iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &sp.snapshotID,
nil, nil, df))
if err != nil {
Expand Down
121 changes: 121 additions & 0 deletions table/snapshot_producers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,127 @@ func TestCommitV3RowLineagePersistsManifestFirstRowID(t *testing.T) {
"persisted manifest first_row_id must match snapshot first-row-id for current commit")
}

// TestCommitV3RowLineageAssignsPerFileFirstRowID verifies that the snapshot producer
// assigns contiguous first_row_id values to each individual data file within a manifest.
func TestCommitV3RowLineageAssignsPerFileFirstRowID(t *testing.T) {
	spec := iceberg.NewPartitionSpec()
	ident := Identifier{"db", "tbl"}
	txn, memIO := createTestTransactionWithMemIO(t, spec)
	txn.meta.formatVersion = 3

	// Locates the data-content manifest that belongs to the given snapshot.
	findDataManifest := func(list []iceberg.ManifestFile, snapshotID int64) iceberg.ManifestFile {
		for _, mf := range list {
			if mf.ManifestContent() == iceberg.ManifestContentData && mf.SnapshotID() == snapshotID {
				return mf
			}
		}

		return nil
	}

	// Stage three data files with distinct row counts.
	producer := newFastAppendFilesProducer(OpAppend, txn, memIO, nil, nil)
	producer.appendDataFile(newTestDataFileWithCount(t, spec, "file://data-1.parquet", nil, 10))
	producer.appendDataFile(newTestDataFileWithCount(t, spec, "file://data-2.parquet", nil, 20))
	producer.appendDataFile(newTestDataFileWithCount(t, spec, "file://data-3.parquet", nil, 5))

	updates, reqs, err := producer.commit(context.Background())
	require.NoError(t, err, "commit should succeed")
	addSnap, ok := updates[0].(*addSnapshotUpdate)
	require.True(t, ok)
	require.Equal(t, int64(0), *addSnap.Snapshot.FirstRowID)
	require.Equal(t, int64(35), *addSnap.Snapshot.AddedRows)

	// Inspect the persisted manifest entries for per-file first_row_id values.
	manifests := readManifestListFromPath(t, memIO, addSnap.Snapshot.ManifestList)
	require.NotEmpty(t, manifests)

	dataManifest := findDataManifest(manifests, addSnap.Snapshot.SnapshotID)
	require.NotNil(t, dataManifest, "must find data manifest for this snapshot")

	entries, err := dataManifest.FetchEntries(memIO, false)
	require.NoError(t, err, "fetch manifest entries")
	require.Len(t, entries, 3)

	// Contiguous assignment: 0, then 0+10=10, then 10+20=30.
	for i, want := range []int64{0, 10, 30} {
		got := entries[i].DataFile().FirstRowID()
		require.NotNilf(t, got, "file %d must have first_row_id", i+1)
		require.EqualValuesf(t, want, *got, "file %d first_row_id", i+1)
	}

	// Applying the updates must advance next-row-id to 35.
	err = txn.apply(updates, reqs)
	require.NoError(t, err)
	meta, err := txn.meta.Build()
	require.NoError(t, err)
	require.Equal(t, int64(35), meta.NextRowID())

	// A follow-up commit continues the sequence from next-row-id=35.
	tbl2 := New(ident, meta, "metadata.json", func(context.Context) (iceio.IO, error) { return memIO, nil }, nil)
	txn2 := tbl2.NewTransaction()
	txn2.meta.formatVersion = 3
	producer2 := newFastAppendFilesProducer(OpAppend, txn2, memIO, nil, nil)
	producer2.appendDataFile(newTestDataFileWithCount(t, spec, "file://data-4.parquet", nil, 7))
	updates2, _, err := producer2.commit(context.Background())
	require.NoError(t, err)
	addSnap2, ok := updates2[0].(*addSnapshotUpdate)
	require.True(t, ok)

	manifests2 := readManifestListFromPath(t, memIO, addSnap2.Snapshot.ManifestList)
	dataManifest2 := findDataManifest(manifests2, addSnap2.Snapshot.SnapshotID)
	require.NotNil(t, dataManifest2)
	entries2, err := dataManifest2.FetchEntries(memIO, false)
	require.NoError(t, err)
	require.Len(t, entries2, 1)
	firstRowID := entries2[0].DataFile().FirstRowID()
	require.NotNil(t, firstRowID)
	require.EqualValues(t, 35, *firstRowID, "second commit file starts at next-row-id=35")
}

// TestCommitV3RowLineageDeleteFilesNoRowID verifies that delete files do NOT get
// first_row_id assigned (row lineage applies only to data files).
func TestCommitV3RowLineageDeleteFilesNoRowID(t *testing.T) {
	spec := iceberg.NewPartitionSpec()
	txn, memIO := createTestTransactionWithMemIO(t, spec)
	txn.meta.formatVersion = 3

	// Build a positional delete file to stage alongside a data file.
	builder, err := iceberg.NewDataFileBuilder(
		spec, iceberg.EntryContentPosDeletes,
		"file://delete-1.parquet", iceberg.ParquetFile,
		nil, nil, nil, 5, 100,
	)
	require.NoError(t, err)

	producer := newFastAppendFilesProducer(OpAppend, txn, memIO, nil, nil)
	producer.appendDeleteFile(builder.Build())
	// Add a data file as well so the delete manifest is written separately.
	producer.appendDataFile(newTestDataFileWithCount(t, spec, "file://data-1.parquet", nil, 10))

	updates, _, err := producer.commit(context.Background())
	require.NoError(t, err)
	addSnap, ok := updates[0].(*addSnapshotUpdate)
	require.True(t, ok)

	// Every entry in every delete-content manifest must carry a nil first_row_id.
	for _, mf := range readManifestListFromPath(t, memIO, addSnap.Snapshot.ManifestList) {
		if mf.ManifestContent() != iceberg.ManifestContentDeletes {
			continue
		}

		entries, err := mf.FetchEntries(memIO, false)
		require.NoError(t, err)
		for _, entry := range entries {
			require.Nil(t, entry.DataFile().FirstRowID(),
				"delete file must NOT have first_row_id assigned")
		}
	}
}

func TestSnapshotProducerManifestsClosesWriterOnError(t *testing.T) {
spec := partitionedSpec()
schema := simpleSchema()
Expand Down
Loading