From 1d10f6b855f42478d20b76ad34c3a27e0a0f58d8 Mon Sep 17 00:00:00 2001 From: Victor Ghita Date: Mon, 30 Mar 2026 14:31:15 +0300 Subject: [PATCH 1/5] Implement futures unordered when checking --- crates/iceberg/src/transaction/snapshot.rs | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index c8bf26a174..8c6c5f1501 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -19,6 +19,8 @@ use std::collections::{HashMap, HashSet}; use std::future::Future; use std::ops::RangeFrom; +use futures::TryStreamExt; +use futures::stream::FuturesUnordered; use uuid::Uuid; use crate::error::Result; @@ -175,10 +177,15 @@ impl<'a> SnapshotProducer<'a> { let manifest_list = current_snapshot .load_manifest_list(self.table.file_io(), &self.table.metadata_ref()) .await?; - for manifest_list_entry in manifest_list.entries() { - let manifest = manifest_list_entry - .load_manifest(self.table.file_io()) - .await?; + + let file_io = self.table.file_io(); + let mut manifest_futures: FuturesUnordered<_> = manifest_list + .entries() + .iter() + .map(|entry| entry.load_manifest(file_io)) + .collect(); + + while let Some(manifest) = manifest_futures.try_next().await? 
{ for entry in manifest.entries() { let file_path = entry.file_path(); if new_files.contains(file_path) && entry.is_alive() { From 65da6db877da228882f9936761d70d6f927e615e Mon Sep 17 00:00:00 2001 From: Victor Ghita Date: Tue, 31 Mar 2026 13:52:42 +0300 Subject: [PATCH 2/5] Change to task per fetch --- crates/iceberg/src/transaction/snapshot.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 8c6c5f1501..7ff421a3e2 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -24,6 +24,7 @@ use futures::stream::FuturesUnordered; use uuid::Uuid; use crate::error::Result; +use crate::runtime::spawn; use crate::spec::{ DataFile, DataFileFormat, FormatVersion, MAIN_BRANCH, ManifestContentType, ManifestEntry, ManifestFile, ManifestListWriter, ManifestWriter, ManifestWriterBuilder, Operation, Snapshot, @@ -178,14 +179,16 @@ impl<'a> SnapshotProducer<'a> { .load_manifest_list(self.table.file_io(), &self.table.metadata_ref()) .await?; - let file_io = self.table.file_io(); - let mut manifest_futures: FuturesUnordered<_> = manifest_list - .entries() - .iter() - .map(|entry| entry.load_manifest(file_io)) + let mut manifest_tasks: FuturesUnordered<_> = manifest_list + .consume_entries() + .into_iter() + .map(|entry| { + let file_io = self.table.file_io().clone(); + spawn(async move { entry.load_manifest(&file_io).await }) + }) .collect(); - while let Some(manifest) = manifest_futures.try_next().await? { + while let Some(manifest) = manifest_tasks.try_next().await? 
{ for entry in manifest.entries() { let file_path = entry.file_path(); if new_files.contains(file_path) && entry.is_alive() { From b9d544fc2bce9b0a036d530b32a66c03e6434ffc Mon Sep 17 00:00:00 2001 From: Victor Ghita Date: Wed, 1 Apr 2026 11:31:07 +0300 Subject: [PATCH 3/5] Move to buffered and threads --- crates/iceberg/src/transaction/snapshot.rs | 27 +++++++++++----------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 7ff421a3e2..988bbf3ebc 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -19,8 +19,7 @@ use std::collections::{HashMap, HashSet}; use std::future::Future; use std::ops::RangeFrom; -use futures::TryStreamExt; -use futures::stream::FuturesUnordered; +use futures::{StreamExt, TryStreamExt}; use uuid::Uuid; use crate::error::Result; @@ -179,23 +178,23 @@ impl<'a> SnapshotProducer<'a> { .load_manifest_list(self.table.file_io(), &self.table.metadata_ref()) .await?; - let mut manifest_tasks: FuturesUnordered<_> = manifest_list - .consume_entries() - .into_iter() + let entries: Vec<_> = manifest_list.consume_entries().into_iter().collect(); + futures::stream::iter(entries) .map(|entry| { let file_io = self.table.file_io().clone(); spawn(async move { entry.load_manifest(&file_io).await }) }) - .collect(); - - while let Some(manifest) = manifest_tasks.try_next().await? 
{ - for entry in manifest.entries() { - let file_path = entry.file_path(); - if new_files.contains(file_path) && entry.is_alive() { - referenced_files.push(file_path.to_string()); + .buffer_unordered(32) + .try_for_each(|manifest| { + for entry in manifest.entries() { + let file_path = entry.file_path(); + if new_files.contains(file_path) && entry.is_alive() { + referenced_files.push(file_path.to_string()); + } } - } - } + std::future::ready(Ok(())) + }) + .await?; } if !referenced_files.is_empty() { From 5a91720bef5f17d9d4265c087b8ba4ced261737f Mon Sep 17 00:00:00 2001 From: Victor Ghita Date: Tue, 5 May 2026 10:09:44 +0300 Subject: [PATCH 4/5] Move num threads to constant --- crates/iceberg/src/transaction/snapshot.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 988bbf3ebc..f4c2f5a9f7 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -35,6 +35,7 @@ use crate::transaction::ActionCommit; use crate::{Error, ErrorKind, TableRequirement, TableUpdate}; const META_ROOT_PATH: &str = "metadata"; +const NUM_THREADS_VALIDATE_DUPLICATE_FILES: usize = 32; /// A trait that defines how different table operations produce new snapshots. 
/// @@ -184,7 +185,7 @@ impl<'a> SnapshotProducer<'a> { let file_io = self.table.file_io().clone(); spawn(async move { entry.load_manifest(&file_io).await }) }) - .buffer_unordered(32) + .buffer_unordered(NUM_THREADS_VALIDATE_DUPLICATE_FILES) .try_for_each(|manifest| { for entry in manifest.entries() { let file_path = entry.file_path(); From 84f65d2fb973669c32b3895bd5495adef708d144 Mon Sep 17 00:00:00 2001 From: Victor Ghita Date: Tue, 5 May 2026 14:44:52 +0300 Subject: [PATCH 5/5] Add docstring to explain the motivation of the variable and its value --- crates/iceberg/src/transaction/snapshot.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index f4c2f5a9f7..6a0feb8aac 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -35,6 +35,8 @@ use crate::transaction::ActionCommit; use crate::{Error, ErrorKind, TableRequirement, TableUpdate}; const META_ROOT_PATH: &str = "metadata"; +/// Maximum number of manifest loads kept in flight at once when validating duplicate files +/// (a concurrency limit, not a thread count); it balances parallelism against resource use. const NUM_THREADS_VALIDATE_DUPLICATE_FILES: usize = 32; /// A trait that defines how different table operations produce new snapshots.