Skip to content
32 changes: 22 additions & 10 deletions crates/iceberg/src/transaction/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::ops::RangeFrom;

use futures::{StreamExt, TryStreamExt};
use uuid::Uuid;

use crate::error::Result;
use crate::runtime::spawn;
use crate::spec::{
DataFile, DataFileFormat, FormatVersion, MAIN_BRANCH, ManifestContentType, ManifestEntry,
ManifestFile, ManifestListWriter, ManifestWriter, ManifestWriterBuilder, Operation, Snapshot,
Expand All @@ -33,6 +35,9 @@ use crate::transaction::ActionCommit;
use crate::{Error, ErrorKind, TableRequirement, TableUpdate};

// NOTE(review): presumably the path segment under which metadata files are placed;
// the use sites are not visible in this chunk — confirm.
const META_ROOT_PATH: &str = "metadata";
/// Upper bound on the number of manifest-loading tasks kept in flight at once
/// while checking for duplicate files.
///
/// The value is handed to `buffer_unordered`, so it limits how many spawned
/// manifest loads are pending concurrently. Whether that maps one-to-one onto
/// threads depends on `crate::runtime::spawn`, which is not shown here.
///
/// This needs to balance the degree of parallelism and resource utilisation.
/// NOTE(review): no measurement backing the specific value 32 is visible here — confirm.
const NUM_THREADS_VALIDATE_DUPLICATE_FILES: usize = 32;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a doc comment explaining why the value is 32.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!


/// A trait that defines how different table operations produce new snapshots.
///
Expand Down Expand Up @@ -175,17 +180,24 @@ impl<'a> SnapshotProducer<'a> {
let manifest_list = current_snapshot
.load_manifest_list(self.table.file_io(), &self.table.metadata_ref())
.await?;
for manifest_list_entry in manifest_list.entries() {
let manifest = manifest_list_entry
.load_manifest(self.table.file_io())
.await?;
for entry in manifest.entries() {
let file_path = entry.file_path();
if new_files.contains(file_path) && entry.is_alive() {
referenced_files.push(file_path.to_string());

let entries: Vec<_> = manifest_list.consume_entries().into_iter().collect();
futures::stream::iter(entries)
.map(|entry| {
let file_io = self.table.file_io().clone();
spawn(async move { entry.load_manifest(&file_io).await })
})
.buffer_unordered(NUM_THREADS_VALIDATE_DUPLICATE_FILES)
.try_for_each(|manifest| {
for entry in manifest.entries() {
let file_path = entry.file_path();
if new_files.contains(file_path) && entry.is_alive() {
referenced_files.push(file_path.to_string());
}
}
}
}
std::future::ready(Ok(()))
})
.await?;
}

if !referenced_files.is_empty() {
Expand Down
Loading