Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions catalog/rest/rest_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,8 +351,11 @@ func (s *RestIntegrationSuite) TestWriteCommitTable() {

s.Len(mf, 1)
s.EqualValues(1, mf[0].AddedDataFiles())
entries, err := mf[0].FetchEntries(mustFS(s.T(), updated), false)
s.Require().NoError(err)
entries := make([]iceberg.ManifestEntry, 0, 1)
for entry, err := range mf[0].Entries(mustFS(s.T(), updated), false) {
s.Require().NoError(err)
entries = append(entries, entry)
}

s.Len(entries, 1)
s.Equal(pqfile, entries[0].DataFile().FilePath())
Expand Down
7 changes: 5 additions & 2 deletions catalog/sql/sql_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,8 +313,11 @@ func (s *SQLIntegrationSuite) TestWriteCommitTable() {
updatedFS, err := updated.FS(s.ctx)
s.Require().NoError(err)

entries, err := mf[0].FetchEntries(updatedFS, false)
s.Require().NoError(err)
entries := make([]iceberg.ManifestEntry, 0, 1)
for entry, err := range mf[0].Entries(updatedFS, false) {
s.Require().NoError(err)
entries = append(entries, entry)
}

s.Len(entries, 1)
s.Equal(pqfile, entries[0].DataFile().FilePath())
Expand Down
17 changes: 11 additions & 6 deletions cmd/iceberg/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,16 +150,21 @@ func (t textOutput) Files(tbl *table.Table, history bool) {
snapshotTree = append(snapshotTree, pterm.LeveledListItem{
Level: 1, Text: "Manifest: " + m.FilePath(),
})
datafiles, err := m.FetchEntries(afs, false)
if err != nil {
t.Error(err)
os.Exit(1)
}
for _, e := range datafiles {
var iterErr error
for e, err := range m.Entries(afs, false) {
if err != nil {
iterErr = err

break
}
snapshotTree = append(snapshotTree, pterm.LeveledListItem{
Level: 2, Text: "Datafile: " + e.DataFile().FilePath(),
})
}
if iterErr != nil {
t.Error(iterErr)
os.Exit(1)
}
}
}

Expand Down
119 changes: 90 additions & 29 deletions manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"io"
"iter"
"math"
"math/big"
"reflect"
Expand Down Expand Up @@ -339,8 +340,40 @@ func (m *manifestFile) FirstRowID() *int64 { return m.FirstRowIDValue }

// HasAddedFiles reports whether AddedFilesCount is non-zero.
func (m *manifestFile) HasAddedFiles() bool {
	return m.AddedFilesCount != 0
}
// HasExistingFiles reports whether ExistingFilesCount is non-zero.
func (m *manifestFile) HasExistingFiles() bool {
	return m.ExistingFilesCount != 0
}
func (m *manifestFile) FetchEntries(fs iceio.IO, discardDeleted bool) ([]ManifestEntry, error) {
return fetchManifestEntries(m, fs, discardDeleted)

// Entries opens the manifest file at m.FilePath() through fs and streams its
// entries one at a time instead of collecting them into a slice. The
// discardDeleted flag is forwarded to iterManifest.
//
// Errors are delivered through the iterator's second value: a failure to
// open the file, anything iterManifest yields as an error, and, when the
// consumer is still accepting values at the end, a failure to close the
// file. The file is closed on every exit path once it has been opened.
func (m *manifestFile) Entries(fs iceio.IO, discardDeleted bool) iter.Seq2[ManifestEntry, error] {
	return func(yield func(ManifestEntry, error) bool) {
		f, err := fs.Open(m.FilePath())
		if err != nil {
			yield(nil, err)

			return
		}

		// canYield is true only while the consumer is known to accept more
		// values. It is cleared before control is handed to the consumer, so
		// it stays false when the loop body breaks, panics or calls
		// runtime.Goexit (e.g. testing's FailNow). Calling yield again in any
		// of those cases breaks the range-over-func contract and would raise
		// a secondary runtime panic from the deferred close below.
		canYield := true
		emit := func(entry ManifestEntry, entryErr error) bool {
			canYield = false
			canYield = yield(entry, entryErr)

			return canYield
		}
		defer func() {
			if cerr := f.Close(); cerr != nil && canYield {
				yield(nil, cerr)
			}
		}()

		for entry, err := range iterManifest(m, f, discardDeleted) {
			if !emit(entry, err) {
				return
			}
		}
	}
}

// FetchEntries opens the manifest file at m.FilePath() through fs and
// returns every entry ReadManifest decodes from it as a slice. The
// discardDeleted flag is forwarded to ReadManifest unchanged.
//
// The opened file is handed to internal.CheckedClose together with the
// named error result (presumably so a close failure can be reported to the
// caller; confirm against internal.CheckedClose).
func (m *manifestFile) FetchEntries(fs iceio.IO, discardDeleted bool) (_ []ManifestEntry, err error) {
	file, err := fs.Open(m.FilePath())
	if err != nil {
		return nil, err
	}
	defer internal.CheckedClose(file, &err)

	entries, readErr := ReadManifest(m, file, discardDeleted)

	return entries, readErr
}

func getFieldIDMap(sc *avro.Schema) (map[string]int, map[int]string, map[int]int) {
Expand Down Expand Up @@ -395,16 +428,6 @@ type hasFieldToIDMap interface {
setFieldIDToFixedSizeMap(map[int]int)
}

// fetchManifestEntries opens the file at m.FilePath() through fs and returns
// the entries ReadManifest decodes from it, forwarding discardDeleted
// unchanged.
//
// The opened file is handed to internal.CheckedClose together with the
// named error result (presumably so a close failure can be reported to the
// caller; confirm against internal.CheckedClose).
func fetchManifestEntries(m ManifestFile, fs iceio.IO, discardDeleted bool) (_ []ManifestEntry, err error) {
	file, openErr := fs.Open(m.FilePath())
	if openErr != nil {
		return nil, openErr
	}
	defer internal.CheckedClose(file, &err)

	entries, readErr := ReadManifest(m, file, discardDeleted)

	return entries, readErr
}

// ManifestFile is the interface which covers both V1 and V2 manifest files.
type ManifestFile interface {
// Version returns the version number of this manifest file.
Expand Down Expand Up @@ -463,10 +486,20 @@ type ManifestFile interface {
HasAddedFiles() bool
// HasExistingFiles returns true if ExistingDataFiles > 0 or if it was null.
HasExistingFiles() bool
// Entries streams the manifest entries from the manifest file using
// the provided file system IO interface. Entries that have been
// marked as deleted are skipped if discardDeleted is true.
//
// Prefer Entries over FetchEntries when walking large manifests
// since it avoids loading every entry into memory at once.
Entries(fs iceio.IO, discardDeleted bool) iter.Seq2[ManifestEntry, error]
Comment thread
hdnpth marked this conversation as resolved.
// FetchEntries reads the manifest file to fetch the list of
// manifest entries using the provided file system IO interface.
// If discardDeleted is true, entries whose status is "deleted"
// will be skipped.
//
// Deprecated: Use Entries instead, which streams manifest entries via an
// iterator and avoids loading every entry into memory at once.
FetchEntries(fs iceio.IO, discardDeleted bool) ([]ManifestEntry, error)
// // WriteEntries writes a list of manifest entries to a provided
// // io.Writer. The version of the manifest file is used to determine the
Expand Down Expand Up @@ -751,33 +784,61 @@ func (c *ManifestReader) ReadEntry() (ManifestEntry, error) {
return tmp, nil
}

// iterManifest returns an iterator that streams manifest entries from
// the provided reader without buffering them. If discardDeleted is true,
// entries whose status is "deleted" are skipped.
//
// A failure to create the manifest reader or to read an entry is yielded as
// the iterator's error value and ends the iteration; io.EOF from ReadEntry
// ends it without yielding. The manifest reader is closed on every exit path
// once it has been created, and a close error is yielded only while the
// consumer is still accepting values.
func iterManifest(m ManifestFile, f io.Reader, discardDeleted bool) iter.Seq2[ManifestEntry, error] {
	return func(yield func(ManifestEntry, error) bool) {
		manifestReader, err := NewManifestReader(m, f)
		if err != nil {
			yield(nil, err)

			return
		}

		// canYield is true only while the consumer is known to accept more
		// values. It is cleared before control is handed to the consumer, so
		// it stays false when the loop body breaks, panics or calls
		// runtime.Goexit (e.g. testing's FailNow). Calling yield again in any
		// of those cases breaks the range-over-func contract and would raise
		// a secondary runtime panic from the deferred close below.
		canYield := true
		emit := func(entry ManifestEntry, entryErr error) bool {
			canYield = false
			canYield = yield(entry, entryErr)

			return canYield
		}
		defer func() {
			if cerr := manifestReader.Close(); cerr != nil && canYield {
				yield(nil, cerr)
			}
		}()

		for {
			entry, err := manifestReader.ReadEntry()
			if errors.Is(err, io.EOF) {
				// Normal end of the manifest.
				return
			}
			if err != nil {
				// Surface the read error, then stop; a consumer that keeps
				// going may still receive a close error from the defer.
				emit(nil, err)

				return
			}
			if discardDeleted && entry.Status() == EntryStatusDELETED {
				continue
			}
			if !emit(entry, nil) {
				return
			}
		}
	}
}

// ReadManifest reads in an avro list file and returns a slice
// of manifest entries or an error if one is encountered. If discardDeleted
// is true, the returned slice omits entries whose status is "deleted".
func ReadManifest(m ManifestFile, f io.Reader, discardDeleted bool) ([]ManifestEntry, error) {
manifestReader, err := NewManifestReader(m, f)
if err != nil {
return nil, err
}
defer func() {
_ = manifestReader.Close()
}()

var results []ManifestEntry
for {
entry, err := manifestReader.ReadEntry()
for entry, err := range iterManifest(m, f, discardDeleted) {
if err != nil {
if errors.Is(err, io.EOF) {
return results, nil
}

return results, err
}
if discardDeleted && entry.Status() == EntryStatusDELETED {
continue
}
results = append(results, entry)
}

return results, nil
}

// ReadManifestList reads in an avro manifest list file and returns a slice
Expand Down
7 changes: 5 additions & 2 deletions manifest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,8 +554,11 @@ func (m *ManifestTestSuite) TestManifestEntriesV1() {
Contents: bytes.NewReader(m.v1ManifestEntries.Bytes()),
}, nil)
defer mockfs.AssertExpectations(m.T())
entries, err := manifest.FetchEntries(&mockfs, false)
m.Require().NoError(err)
entries := make([]ManifestEntry, 0, 2)
for entry, err := range manifest.Entries(&mockfs, false) {
m.Require().NoError(err)
entries = append(entries, entry)
}
m.Len(entries, 2)
m.Zero(manifest.PartitionSpecID())
m.Zero(manifest.SnapshotID())
Expand Down
18 changes: 8 additions & 10 deletions table/compaction/eq_delete_collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,10 @@ func CollectDeadEqualityDeletes(
if m.ManifestContent() != iceberg.ManifestContentDeletes {
continue
}
entries, err := m.FetchEntries(fs, true)
if err != nil {
return nil, err
}
for _, e := range entries {
for e, err := range m.Entries(fs, true) {
if err != nil {
return nil, err
}
if e.DataFile().ContentType() != iceberg.EntryContentEqDeletes {
continue
}
Expand All @@ -95,11 +94,10 @@ func CollectDeadEqualityDeletes(
if m.ManifestContent() != iceberg.ManifestContentData {
continue
}
entries, err := m.FetchEntries(fs, true)
if err != nil {
return nil, err
}
for _, e := range entries {
for e, err := range m.Entries(fs, true) {
if err != nil {
return nil, err
}
df := e.DataFile()
if df.ContentType() != iceberg.EntryContentData {
continue
Expand Down
9 changes: 4 additions & 5 deletions table/conflict_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,11 +385,10 @@ func validateAddedDataFilesMatchingFilter(ctx *conflictContext, filter iceberg.B
}

pEval := partitionEvals.Get(int(mf.PartitionSpecID()))
entries, err := mf.FetchEntries(ctx.fs, false)
if err != nil {
return fmt.Errorf("reading entries from manifest %s: %w", mf.FilePath(), err)
}
for _, e := range entries {
for e, err := range mf.Entries(ctx.fs, false) {
if err != nil {
return fmt.Errorf("reading entries from manifest %s: %w", mf.FilePath(), err)
}
if e.Status() != iceberg.EntryStatusADDED || e.SnapshotID() != snap.SnapshotID {
continue
}
Expand Down
10 changes: 4 additions & 6 deletions table/orphan_cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,12 +268,10 @@ func (t Table) getReferencedFiles(fs iceio.IO) (map[string]bool, error) {
for _, manifest := range manifestFiles {
referenced[manifest.FilePath()] = true

entries, err := manifest.FetchEntries(fs, false)
if err != nil {
return nil, fmt.Errorf("failed to read manifest entries: %w", err)
}

for _, entry := range entries {
for entry, err := range manifest.Entries(fs, false) {
if err != nil {
return nil, fmt.Errorf("failed to read manifest entries: %w", err)
}
referenced[entry.DataFile().FilePath()] = true
}
}
Expand Down
6 changes: 2 additions & 4 deletions table/orphan_cleanup_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,10 +285,8 @@ func (s *OrphanCleanupIntegrationSuite) TestOrphanCleanupActualDeletion() {

for manifest, err := range tbl.AllManifests(s.ctx) {
s.Require().NoError(err)
entries, err := manifest.FetchEntries(fs, false)
s.Require().NoError(err)

for _, entry := range entries {
for entry, err := range manifest.Entries(fs, false) {
s.Require().NoError(err)
dataFile := entry.DataFile().FilePath()
s.True(s.fileExists(fs, dataFile), "Data file should still exist: %s", dataFile)
}
Expand Down
10 changes: 4 additions & 6 deletions table/rewrite_data_files_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,9 +569,8 @@ func snapshotContainsDeleteFile(t *testing.T, tbl *table.Table, path string) boo
if m.ManifestContent() != iceberg.ManifestContentDeletes {
continue
}
entries, err := m.FetchEntries(fs, false)
require.NoError(t, err)
for _, e := range entries {
for e, err := range m.Entries(fs, false) {
require.NoError(t, err)
if e.Status() == iceberg.EntryStatusDELETED {
continue
}
Expand Down Expand Up @@ -656,9 +655,8 @@ func readKeySeqNums(t *testing.T, tbl *table.Table) (eqDelete, smallMin, preserv

smallMin = int64(1<<62 - 1)
for _, m := range manifests {
entries, err := m.FetchEntries(fs, false)
require.NoError(t, err)
for _, e := range entries {
for e, err := range m.Entries(fs, false) {
require.NoError(t, err)
if e.Status() == iceberg.EntryStatusDELETED {
continue
}
Expand Down
6 changes: 2 additions & 4 deletions table/row_delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,10 +381,8 @@ func TestRowDeltaManifestContents(t *testing.T) {

// Verify manifest entries have correct content types
for _, m := range manifests {
entries, err := m.FetchEntries(fs, true)
require.NoError(t, err)

for _, e := range entries {
for e, err := range m.Entries(fs, true) {
require.NoError(t, err)
if m.ManifestContent() == iceberg.ManifestContentData {
assert.Equal(t, iceberg.EntryContentData, e.DataFile().ContentType())
} else {
Expand Down
12 changes: 6 additions & 6 deletions table/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,13 @@ func GetPartitionRecord(dataFile iceberg.DataFile, partitionType *iceberg.Struct
func openManifest(io io.IO, manifest iceberg.ManifestFile,
partitionFilter, metricsEval func(iceberg.DataFile) (bool, error),
) ([]iceberg.ManifestEntry, error) {
entries, err := manifest.FetchEntries(io, true)
if err != nil {
return nil, err
}
// Counts may be -1 (unset) on V1 manifests, so clamp before allocating.
out := make([]iceberg.ManifestEntry, 0, max(0, int(manifest.AddedDataFiles())+int(manifest.ExistingDataFiles())))
for entry, err := range manifest.Entries(io, true) {
if err != nil {
return nil, err
}

out := make([]iceberg.ManifestEntry, 0, len(entries))
for _, entry := range entries {
p, err := partitionFilter(entry.DataFile())
if err != nil {
return nil, err
Expand Down
Loading
Loading