From 3409fb9fc83d5cb27dd61ca61b52908158475454 Mon Sep 17 00:00:00 2001 From: Artem Alperin Date: Tue, 5 May 2026 10:51:20 +0000 Subject: [PATCH 1/3] Replace FetchEntries with iterator approach --- cmd/iceberg/output.go | 11 ++- manifest.go | 104 +++++++++++++++++++------- table/compaction/eq_delete_collect.go | 18 ++--- table/conflict_validation.go | 9 +-- table/orphan_cleanup.go | 10 +-- table/scanner.go | 11 ++- table/snapshot_producers.go | 14 ++-- table/snapshots.go | 29 ++++--- table/transaction.go | 10 +-- table/updates.go | 20 ++--- 10 files changed, 135 insertions(+), 101 deletions(-) diff --git a/cmd/iceberg/output.go b/cmd/iceberg/output.go index 695ba28d9..813239a45 100644 --- a/cmd/iceberg/output.go +++ b/cmd/iceberg/output.go @@ -150,12 +150,11 @@ func (t textOutput) Files(tbl *table.Table, history bool) { snapshotTree = append(snapshotTree, pterm.LeveledListItem{ Level: 1, Text: "Manifest: " + m.FilePath(), }) - datafiles, err := m.FetchEntries(afs, false) - if err != nil { - t.Error(err) - os.Exit(1) - } - for _, e := range datafiles { + for e, err := range m.Entries(afs, false) { + if err != nil { + t.Error(err) + os.Exit(1) + } snapshotTree = append(snapshotTree, pterm.LeveledListItem{ Level: 2, Text: "Datafile: " + e.DataFile().FilePath(), }) diff --git a/manifest.go b/manifest.go index da781c8f8..9e2531233 100644 --- a/manifest.go +++ b/manifest.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" "io" + "iter" "math" "math/big" "reflect" @@ -339,8 +340,35 @@ func (m *manifestFile) FirstRowID() *int64 { return m.FirstRowIDValue } func (m *manifestFile) HasAddedFiles() bool { return m.AddedFilesCount != 0 } func (m *manifestFile) HasExistingFiles() bool { return m.ExistingFilesCount != 0 } -func (m *manifestFile) FetchEntries(fs iceio.IO, discardDeleted bool) ([]ManifestEntry, error) { - return fetchManifestEntries(m, fs, discardDeleted) + +func (m *manifestFile) Entries(fs iceio.IO, discardDeleted bool) iter.Seq2[ManifestEntry, error] { + return func(yield func(ManifestEntry, error) bool) { + f, err := fs.Open(m.FilePath()) + if err != nil { + yield(nil, err) + + return + } + defer func() { + _ = f.Close() + }() + + for entry, err := range IterManifest(m, f, discardDeleted) { + if !yield(entry, err) { + return + } + } + } +} + +func (m *manifestFile) FetchEntries(fs iceio.IO, discardDeleted bool) (_ []ManifestEntry, err error) { + f, openErr := fs.Open(m.FilePath()) + if openErr != nil { + return nil, openErr + } + defer internal.CheckedClose(f, &err) + + return ReadManifest(m, f, discardDeleted) } func getFieldIDMap(sc *avro.Schema) (map[string]int, map[int]string, map[int]int) { @@ -395,16 +423,6 @@ type hasFieldToIDMap interface { setFieldIDToFixedSizeMap(map[int]int) } -func fetchManifestEntries(m ManifestFile, fs iceio.IO, discardDeleted bool) (_ []ManifestEntry, err error) { - f, err := fs.Open(m.FilePath()) - if err != nil { - return nil, err - } - defer internal.CheckedClose(f, &err) - - return ReadManifest(m, f, discardDeleted) -} - // ManifestFile is the interface which covers both V1 and V2 manifest files. type ManifestFile interface { // Version returns the version number of this manifest file. @@ -463,6 +481,13 @@ type ManifestFile interface { HasAddedFiles() bool // HasExistingFiles returns true if ExistingDataFiles > 0 or if it was null. HasExistingFiles() bool + // Entries streams the manifest entries from the manifest file using + // the provided file system IO interface. Entries that have been + // marked as deleted are skipped if discardDeleted is true. + // + // Prefer Entries over FetchEntries when walking large manifests + // since it avoids loading every entry into memory at once. + Entries(fs iceio.IO, discardDeleted bool) iter.Seq2[ManifestEntry, error] // FetchEntries reads the manifest list file to fetch the list of // manifest entries using the provided file system IO interface. // If discardDeleted is true, entries for files containing deleted rows @@ -751,33 +776,54 @@ func (c *ManifestReader) ReadEntry() (ManifestEntry, error) { return tmp, nil } +// IterManifest returns an iterator that streams manifest entries from +// the provided reader without buffering them. If discardDeleted is true, +// entries whose status is "deleted" are skipped. +func IterManifest(m ManifestFile, f io.Reader, discardDeleted bool) iter.Seq2[ManifestEntry, error] { + return func(yield func(ManifestEntry, error) bool) { + manifestReader, err := NewManifestReader(m, f) + if err != nil { + yield(nil, err) + + return + } + defer func() { + _ = manifestReader.Close() + }() + + for { + entry, err := manifestReader.ReadEntry() + if err != nil { + if errors.Is(err, io.EOF) { + return + } + yield(nil, err) + + return + } + if discardDeleted && entry.Status() == EntryStatusDELETED { + continue + } + if !yield(entry, nil) { + return + } + } + } +} + // ReadManifest reads in an avro list file and returns a slice // of manifest entries or an error if one is encountered. If discardDeleted // is true, the returned slice omits entries whose status is "deleted". func ReadManifest(m ManifestFile, f io.Reader, discardDeleted bool) ([]ManifestEntry, error) { - manifestReader, err := NewManifestReader(m, f) - if err != nil { - return nil, err - } - defer func() { - _ = manifestReader.Close() - }() - var results []ManifestEntry - for { - entry, err := manifestReader.ReadEntry() + for entry, err := range IterManifest(m, f, discardDeleted) { if err != nil { - if errors.Is(err, io.EOF) { - return results, nil - } - return results, err } - if discardDeleted && entry.Status() == EntryStatusDELETED { - continue - } results = append(results, entry) } + + return results, nil } // ReadManifestList reads in an avro manifest list file and returns a slice diff --git a/table/compaction/eq_delete_collect.go b/table/compaction/eq_delete_collect.go index 145b7a4f4..58f223794 100644 --- a/table/compaction/eq_delete_collect.go +++ b/table/compaction/eq_delete_collect.go @@ -71,11 +71,10 @@ func CollectDeadEqualityDeletes( if m.ManifestContent() != iceberg.ManifestContentDeletes { continue } - entries, err := m.FetchEntries(fs, true) - if err != nil { - return nil, err - } - for _, e := range entries { + for e, err := range m.Entries(fs, true) { + if err != nil { + return nil, err + } if e.DataFile().ContentType() != iceberg.EntryContentEqDeletes { continue } @@ -95,11 +94,10 @@ func CollectDeadEqualityDeletes( if m.ManifestContent() != iceberg.ManifestContentData { continue } - entries, err := m.FetchEntries(fs, true) - if err != nil { - return nil, err - } - for _, e := range entries { + for e, err := range m.Entries(fs, true) { + if err != nil { + return nil, err + } df := e.DataFile() if df.ContentType() != iceberg.EntryContentData { continue diff --git a/table/conflict_validation.go b/table/conflict_validation.go index d626e9960..1e12eaced 100644 --- a/table/conflict_validation.go +++ b/table/conflict_validation.go @@ -385,11 +385,10 @@ func validateAddedDataFilesMatchingFilter(ctx *conflictContext, filter iceberg.B } pEval := partitionEvals.Get(int(mf.PartitionSpecID())) - entries, err := mf.FetchEntries(ctx.fs, false) - if err != nil { - return fmt.Errorf("reading entries from manifest %s: %w", mf.FilePath(), err) - } - for _, e := range entries { + for e, err := range mf.Entries(ctx.fs, false) { + if err != nil { + return fmt.Errorf("reading entries from manifest %s: %w", mf.FilePath(), err) + } if e.Status() != iceberg.EntryStatusADDED || e.SnapshotID() != snap.SnapshotID { continue } diff --git a/table/orphan_cleanup.go b/table/orphan_cleanup.go index 28e0adac1..81fa84ede 100644 --- a/table/orphan_cleanup.go +++ b/table/orphan_cleanup.go @@ -268,12 +268,10 @@ func (t Table) getReferencedFiles(fs iceio.IO) (map[string]bool, error) { for _, manifest := range manifestFiles { referenced[manifest.FilePath()] = true - entries, err := manifest.FetchEntries(fs, false) - if err != nil { - return nil, fmt.Errorf("failed to read manifest entries: %w", err) - } - - for _, entry := range entries { + for entry, err := range manifest.Entries(fs, false) { + if err != nil { + return nil, fmt.Errorf("failed to read manifest entries: %w", err) + } referenced[entry.DataFile().FilePath()] = true } } diff --git a/table/scanner.go b/table/scanner.go index a38116824..d95fae568 100644 --- a/table/scanner.go +++ b/table/scanner.go @@ -153,13 +153,12 @@ func GetPartitionRecord(dataFile iceberg.DataFile, partitionType *iceberg.Struct func openManifest(io io.IO, manifest iceberg.ManifestFile, partitionFilter, metricsEval func(iceberg.DataFile) (bool, error), ) ([]iceberg.ManifestEntry, error) { - entries, err := manifest.FetchEntries(io, true) - if err != nil { - return nil, err - } + var out []iceberg.ManifestEntry + for entry, err := range manifest.Entries(io, true) { + if err != nil { + return nil, err + } - out := make([]iceberg.ManifestEntry, 0, len(entries)) - for _, entry := range entries { p, err := partitionFilter(entry.DataFile()) if err != nil { return nil, err diff --git a/table/snapshot_producers.go b/table/snapshot_producers.go index c98b7cb7f..2aaaabaa5 100644 --- a/table/snapshot_producers.go +++ b/table/snapshot_producers.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" "io" + "iter" "maps" "slices" "sync/atomic" @@ -350,12 +351,11 @@ func (m *manifestMergeManager) createManifest(specID int, bin []iceberg.Manifest defer internal.CheckedClose(wr, &err) for _, manifest := range bin { - entries, err := m.snap.fetchManifestEntry(manifest, false) - if err != nil { - return nil, err - } + for entry, err := range m.snap.iterManifestEntries(manifest, false) { + if err != nil { + return nil, err + } - for _, entry := range entries { switch { case entry.Status() == iceberg.EntryStatusDELETED && entry.SnapshotID() == m.snap.snapshotID: // only files deleted by this snapshot should be added to the new manifest @@ -612,6 +612,10 @@ func (sp *snapshotProducer) fetchManifestEntry(m iceberg.ManifestFile, discardDe return m.FetchEntries(sp.io, discardDeleted) } +func (sp *snapshotProducer) iterManifestEntries(m iceberg.ManifestFile, discardDeleted bool) iter.Seq2[iceberg.ManifestEntry, error] { + return m.Entries(sp.io, discardDeleted) +} + func (sp *snapshotProducer) manifests(ctx context.Context) (_ []iceberg.ManifestFile, err error) { deleted, err := sp.deletedEntries(ctx) if err != nil { diff --git a/table/snapshots.go b/table/snapshots.go index ef109b258..f23f30c09 100644 --- a/table/snapshots.go +++ b/table/snapshots.go @@ -345,20 +345,19 @@ func (s Snapshot) dataFiles(fio iceio.IO, fileFilter set[iceberg.ManifestEntryCo } for _, m := range manifests { - dataFiles, err := m.FetchEntries(fio, false) - if err != nil { - yield(nil, err) + for entry, err := range m.Entries(fio, false) { + if err != nil { + yield(nil, err) - return - } + return + } - for _, f := range dataFiles { if fileFilter != nil { - if _, ok := fileFilter[f.DataFile().ContentType()]; !ok { + if _, ok := fileFilter[entry.DataFile().ContentType()]; !ok { continue } } - if !yield(f.DataFile(), nil) { + if !yield(entry.DataFile(), nil) { return } } @@ -389,15 +388,13 @@ func (s Snapshot) entries(fio iceio.IO, manifestContent iceberg.ManifestContent) continue } - entries, err := m.FetchEntries(fio, false) - if err != nil { - yield(nil, err) + for entry, err := range m.Entries(fio, false) { + if err != nil { + yield(nil, err) - return - } - - for _, e := range entries { - if !yield(e, nil) { + return + } + if !yield(entry, nil) { return } } diff --git a/table/transaction.go b/table/transaction.go index 5a11ce598..1ffa18de1 100644 --- a/table/transaction.go +++ b/table/transaction.go @@ -1400,15 +1400,13 @@ func (t *Transaction) classifyFilesForFilteredDeletions(ctx context.Context, fs } } - entries, err := manifest.FetchEntries(fs, false) - if err != nil { - return fmt.Errorf("failed to fetch manifest entries: %w", err) - } - localDelete := make([]iceberg.DataFile, 0) localRewrite := make([]iceberg.DataFile, 0) - for _, entry := range entries { + for entry, err := range manifest.Entries(fs, false) { + if err != nil { + return fmt.Errorf("failed to fetch manifest entries: %w", err) + } if entry.Status() == iceberg.EntryStatusDELETED { continue } diff --git a/table/updates.go b/table/updates.go index dda9a5323..cfe723a51 100644 --- a/table/updates.go +++ b/table/updates.go @@ -496,12 +496,10 @@ func (u *removeSnapshotsUpdate) PostCommit(ctx context.Context, preTable *Table, for _, man := range mans { filesToDelete[man.FilePath()] = struct{}{} - entries, err := man.FetchEntries(prefs, false) - if err != nil { - return err - } - - for _, entry := range entries { + for entry, err := range man.Entries(prefs, false) { + if err != nil { + return err + } filesToDelete[entry.DataFile().FilePath()] = struct{}{} } } @@ -516,12 +514,10 @@ func (u *removeSnapshotsUpdate) PostCommit(ctx context.Context, preTable *Table, for _, man := range mans { delete(filesToDelete, man.FilePath()) - entries, err := man.FetchEntries(prefs, false) - if err != nil { - return err - } - - for _, entry := range entries { + for entry, err := range man.Entries(prefs, false) { + if err != nil { + return err + } if entry.Status() != iceberg.EntryStatusDELETED { delete(filesToDelete, entry.DataFile().FilePath()) } From 21bd9c9ffcd7f86523b4f4ecc178a915dfdeef5a Mon Sep 17 00:00:00 2001 From: Artem Alperin Date: Tue, 5 May 2026 14:58:44 +0000 Subject: [PATCH 2/3] Partial fix of issues --- catalog/rest/rest_integration_test.go | 7 +++++-- catalog/sql/sql_integration_test.go | 7 +++++-- cmd/iceberg/output.go | 10 ++++++++-- manifest.go | 11 +++++++---- manifest_test.go | 7 +++++-- table/orphan_cleanup_integration_test.go | 6 ++---- table/rewrite_data_files_test.go | 10 ++++------ table/row_delta_test.go | 6 ++---- table/scanner.go | 3 ++- table/snapshot_producers.go | 15 ++++++++++++++- table/table_test.go | 13 +++++++------ 11 files changed, 61 insertions(+), 34 deletions(-) diff --git a/catalog/rest/rest_integration_test.go b/catalog/rest/rest_integration_test.go index c0a7d2179..9394fa83a 100644 --- a/catalog/rest/rest_integration_test.go +++ b/catalog/rest/rest_integration_test.go @@ -351,8 +351,11 @@ func (s *RestIntegrationSuite) TestWriteCommitTable() { s.Len(mf, 1) s.EqualValues(1, mf[0].AddedDataFiles()) - entries, err := mf[0].FetchEntries(mustFS(s.T(), updated), false) - s.Require().NoError(err) + entries := make([]iceberg.ManifestEntry, 0, 1) + for entry, err := range mf[0].Entries(mustFS(s.T(), updated), false) { + s.Require().NoError(err) + entries = append(entries, entry) + } s.Len(entries, 1) s.Equal(pqfile, entries[0].DataFile().FilePath()) diff --git a/catalog/sql/sql_integration_test.go b/catalog/sql/sql_integration_test.go index 82f83bbac..9f8a27932 100644 --- a/catalog/sql/sql_integration_test.go +++ b/catalog/sql/sql_integration_test.go @@ -313,8 +313,11 @@ func (s *SQLIntegrationSuite) TestWriteCommitTable() { updatedFS, err := updated.FS(s.ctx) s.Require().NoError(err) - entries, err := mf[0].FetchEntries(updatedFS, false) - s.Require().NoError(err) + entries := make([]iceberg.ManifestEntry, 0, 1) + for entry, err := range mf[0].Entries(updatedFS, false) { + s.Require().NoError(err) + entries = append(entries, entry) + } s.Len(entries, 1) s.Equal(pqfile, entries[0].DataFile().FilePath()) diff --git a/cmd/iceberg/output.go b/cmd/iceberg/output.go index 813239a45..32a3a1016 100644 --- a/cmd/iceberg/output.go +++ b/cmd/iceberg/output.go @@ -150,15 +150,21 @@ func (t textOutput) Files(tbl *table.Table, history bool) { snapshotTree = append(snapshotTree, pterm.LeveledListItem{ Level: 1, Text: "Manifest: " + m.FilePath(), }) + var iterErr error for e, err := range m.Entries(afs, false) { if err != nil { - t.Error(err) - os.Exit(1) + iterErr = err + + break } snapshotTree = append(snapshotTree, pterm.LeveledListItem{ Level: 2, Text: "Datafile: " + e.DataFile().FilePath(), }) } + if iterErr != nil { + t.Error(iterErr) + os.Exit(1) + } } } diff --git a/manifest.go b/manifest.go index 9e2531233..400da9e22 100644 --- a/manifest.go +++ b/manifest.go @@ -353,7 +353,7 @@ func (m *manifestFile) Entries(fs iceio.IO, discardDeleted bool) iter.Seq2[Manif _ = f.Close() }() - for entry, err := range IterManifest(m, f, discardDeleted) { + for entry, err := range iterManifest(m, f, discardDeleted) { if !yield(entry, err) { return } @@ -492,6 +492,9 @@ type ManifestFile interface { // manifest entries using the provided file system IO interface. // If discardDeleted is true, entries for files containing deleted rows // will be skipped. + // + // Deprecated: Use Entries instead, which streams manifest entries via an + // iterator and avoids loading every entry into memory at once. FetchEntries(fs iceio.IO, discardDeleted bool) ([]ManifestEntry, error) // // WriteEntries writes a list of manifest entries to a provided // // io.Writer. The version of the manifest file is used to determine the @@ -776,10 +779,10 @@ func (c *ManifestReader) ReadEntry() (ManifestEntry, error) { return tmp, nil } -// IterManifest returns an iterator that streams manifest entries from +// iterManifest returns an iterator that streams manifest entries from // the provided reader without buffering them. If discardDeleted is true, // entries whose status is "deleted" are skipped. -func IterManifest(m ManifestFile, f io.Reader, discardDeleted bool) iter.Seq2[ManifestEntry, error] { +func iterManifest(m ManifestFile, f io.Reader, discardDeleted bool) iter.Seq2[ManifestEntry, error] { return func(yield func(ManifestEntry, error) bool) { manifestReader, err := NewManifestReader(m, f) if err != nil { @@ -816,7 +819,7 @@ func IterManifest(m ManifestFile, f io.Reader, discardDeleted bool) iter.Seq2[Ma // is true, the returned slice omits entries whose status is "deleted". func ReadManifest(m ManifestFile, f io.Reader, discardDeleted bool) ([]ManifestEntry, error) { var results []ManifestEntry - for entry, err := range IterManifest(m, f, discardDeleted) { + for entry, err := range iterManifest(m, f, discardDeleted) { if err != nil { return results, err } diff --git a/manifest_test.go b/manifest_test.go index 64407b749..be5e31f07 100644 --- a/manifest_test.go +++ b/manifest_test.go @@ -554,8 +554,11 @@ func (m *ManifestTestSuite) TestManifestEntriesV1() { Contents: bytes.NewReader(m.v1ManifestEntries.Bytes()), }, nil) defer mockfs.AssertExpectations(m.T()) - entries, err := manifest.FetchEntries(&mockfs, false) - m.Require().NoError(err) + entries := make([]ManifestEntry, 0, 2) + for entry, err := range manifest.Entries(&mockfs, false) { + m.Require().NoError(err) + entries = append(entries, entry) + } m.Len(entries, 2) m.Zero(manifest.PartitionSpecID()) m.Zero(manifest.SnapshotID()) diff --git a/table/orphan_cleanup_integration_test.go b/table/orphan_cleanup_integration_test.go index cce3fd0bb..a9734b954 100644 --- a/table/orphan_cleanup_integration_test.go +++ b/table/orphan_cleanup_integration_test.go @@ -285,10 +285,8 @@ func (s *OrphanCleanupIntegrationSuite) TestOrphanCleanupActualDeletion() { for manifest, err := range tbl.AllManifests(s.ctx) { s.Require().NoError(err) - entries, err := manifest.FetchEntries(fs, false) - s.Require().NoError(err) - - for _, entry := range entries { + for entry, err := range manifest.Entries(fs, false) { + s.Require().NoError(err) dataFile := entry.DataFile().FilePath() s.True(s.fileExists(fs, dataFile), "Data file should still exist: %s", dataFile) } diff --git a/table/rewrite_data_files_test.go b/table/rewrite_data_files_test.go index bdf7a6ea8..ce12a8fd9 100644 --- a/table/rewrite_data_files_test.go +++ b/table/rewrite_data_files_test.go @@ -569,9 +569,8 @@ func snapshotContainsDeleteFile(t *testing.T, tbl *table.Table, path string) boo if m.ManifestContent() != iceberg.ManifestContentDeletes { continue } - entries, err := m.FetchEntries(fs, false) - require.NoError(t, err) - for _, e := range entries { + for e, err := range m.Entries(fs, false) { + require.NoError(t, err) if e.Status() == iceberg.EntryStatusDELETED { continue } @@ -656,9 +655,8 @@ func readKeySeqNums(t *testing.T, tbl *table.Table) (eqDelete, smallMin, preserv smallMin = int64(1<<62 - 1) for _, m := range manifests { - entries, err := m.FetchEntries(fs, false) - require.NoError(t, err) - for _, e := range entries { + for e, err := range m.Entries(fs, false) { + require.NoError(t, err) if e.Status() == iceberg.EntryStatusDELETED { continue } diff --git a/table/row_delta_test.go b/table/row_delta_test.go index 054895610..107fbd632 100644 --- a/table/row_delta_test.go +++ b/table/row_delta_test.go @@ -381,10 +381,8 @@ func TestRowDeltaManifestContents(t *testing.T) { // Verify manifest entries have correct content types for _, m := range manifests { - entries, err := m.FetchEntries(fs, true) - require.NoError(t, err) - - for _, e := range entries { + for e, err := range m.Entries(fs, true) { + require.NoError(t, err) if m.ManifestContent() == iceberg.ManifestContentData { assert.Equal(t, iceberg.EntryContentData, e.DataFile().ContentType()) } else { diff --git a/table/scanner.go b/table/scanner.go index d95fae568..7263c028d 100644 --- a/table/scanner.go +++ b/table/scanner.go @@ -153,7 +153,8 @@ func GetPartitionRecord(dataFile iceberg.DataFile, partitionType *iceberg.Struct func openManifest(io io.IO, manifest iceberg.ManifestFile, partitionFilter, metricsEval func(iceberg.DataFile) (bool, error), ) ([]iceberg.ManifestEntry, error) { - var out []iceberg.ManifestEntry + // Counts may be -1 (unset) on V1 manifests, so clamp before allocating. + out := make([]iceberg.ManifestEntry, 0, max(0, int(manifest.AddedDataFiles())+int(manifest.ExistingDataFiles()))) for entry, err := range manifest.Entries(io, true) { if err != nil { return nil, err diff --git a/table/snapshot_producers.go b/table/snapshot_producers.go index 2aaaabaa5..488e72d3c 100644 --- a/table/snapshot_producers.go +++ b/table/snapshot_producers.go @@ -609,7 +609,20 @@ func (sp *snapshotProducer) newManifestOutput() (io.WriteCloser, string, error) } func (sp *snapshotProducer) fetchManifestEntry(m iceberg.ManifestFile, discardDeleted bool) ([]iceberg.ManifestEntry, error) { - return m.FetchEntries(sp.io, discardDeleted) + capacity := int(m.AddedDataFiles()) + int(m.ExistingDataFiles()) + if !discardDeleted { + capacity += int(m.DeletedDataFiles()) + } + // Counts may be -1 (unset) on V1 manifests, so clamp before allocating. + entries := make([]iceberg.ManifestEntry, 0, max(0, capacity)) + for entry, err := range m.Entries(sp.io, discardDeleted) { + if err != nil { + return nil, err + } + entries = append(entries, entry) + } + + return entries, nil } func (sp *snapshotProducer) iterManifestEntries(m iceberg.ManifestFile, discardDeleted bool) iter.Seq2[iceberg.ManifestEntry, error] { diff --git a/table/table_test.go b/table/table_test.go index 6913ea4bf..f10c80199 100644 --- a/table/table_test.go +++ b/table/table_test.go @@ -662,10 +662,8 @@ func (t *TableWritingTestSuite) TestAddFilesPartitionedTable() { t.Require().NoError(err) for _, manifest := range m { - entries, err := manifest.FetchEntries(mustFS(t.T(), tbl), false) - t.Require().NoError(err) - - for _, e := range entries { + for e, err := range manifest.Entries(mustFS(t.T(), tbl), false) { + t.Require().NoError(err) t.Equal(map[int]any{ 1000: int32(123), 1001: int32(650), }, e.DataFile().Partition()) @@ -2333,8 +2331,11 @@ func (t *TableWritingTestSuite) TestMergeManifests() { t.Len(manifestList, 1) t.validateManifestFileLength(mustFS(t.T(), tblA), manifestList[0]) - entries, err := manifestList[0].FetchEntries(mustFS(t.T(), tblA), false) - t.Require().NoError(err) + entries := make([]iceberg.ManifestEntry, 0, 3) + for entry, err := range manifestList[0].Entries(mustFS(t.T(), tblA), false) { + t.Require().NoError(err) + entries = append(entries, entry) + } t.Len(entries, 3) // entries should match the snapshot ID they were added in From d88b4f6e890f2c8a3408f5138865de12ba2e7e76 Mon Sep 17 00:00:00 2001 From: Artem Alperin Date: Tue, 5 May 2026 19:11:07 +0000 Subject: [PATCH 3/3] Fix iterator --- manifest.go | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/manifest.go b/manifest.go index 400da9e22..82de7c1c5 100644 --- a/manifest.go +++ b/manifest.go @@ -349,12 +349,17 @@ func (m *manifestFile) Entries(fs iceio.IO, discardDeleted bool) iter.Seq2[Manif return } + aborted := false defer func() { - _ = f.Close() + if cerr := f.Close(); cerr != nil && !aborted { + yield(nil, cerr) + } }() for entry, err := range iterManifest(m, f, discardDeleted) { if !yield(entry, err) { + aborted = true + return } } @@ -790,8 +795,11 @@ func iterManifest(m ManifestFile, f io.Reader, discardDeleted bool) iter.Seq2[Ma return } + aborted := false defer func() { - _ = manifestReader.Close() + if cerr := manifestReader.Close(); cerr != nil && !aborted { + yield(nil, cerr) + } }() for { @@ -800,7 +808,9 @@ func iterManifest(m ManifestFile, f io.Reader, discardDeleted bool) iter.Seq2[Ma if errors.Is(err, io.EOF) { return } - yield(nil, err) + if !yield(nil, err) { + aborted = true + } return } @@ -808,6 +818,8 @@ func iterManifest(m ManifestFile, f io.Reader, discardDeleted bool) iter.Seq2[Ma continue } if !yield(entry, nil) { + aborted = true + return } }