Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 27 additions & 24 deletions diskann-disk/src/build/builder/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ use diskann_providers::{
use diskann_utils::io::{read_bin, write_bin};
use diskann_utils::views::MatrixView;
use tokio::task::JoinSet;
use tracing::{debug, info};

use crate::{
build::{
Expand Down Expand Up @@ -61,6 +60,8 @@ use crate::{
},
DiskIndexBuildParameters, QuantizationType,
};
use diskann::tracked_debug;
use diskann::tracked_info;

/// Disk index builder that composes with DiskIndexBuilderCore.
pub struct DiskIndexBuilder<'a, Data, StorageProvider>
Expand Down Expand Up @@ -175,7 +176,7 @@ where
storage_provider: &StorageProvider,
checkpoint_record_manager: &mut dyn CheckpointManager,
) -> ANNResult<BuildQuantizer> {
info!(
tracked_info!(
"Training quantizer for {} quantized builds.",
build_quantization_type.to_string()
);
Expand All @@ -193,7 +194,7 @@ where
)
},
|| {
info!(
tracked_info!(
"Skipping quantizer training, instead loading from already trained quantizer saved in the file system.",
);
BuildQuantizer::load(
Expand All @@ -210,7 +211,7 @@ where
runtime.block_on(async {
match self.build_internal().await {
Err(err) if err.kind() == ANNErrorKind::BuildInterrupted => {
info!(
tracked_info!(
"Index build was interrupted by continuation_checker, progress saved for resumption"
);
Ok(()) // Return success for controlled interruptions
Expand All @@ -225,7 +226,7 @@ where

let pool = create_thread_pool(self.index_configuration.num_threads)?;

info!(
tracked_info!(
"Starting index build: R={} L={} Indexing RAM budget={} T={}",
self.index_configuration.config.pruned_degree(),
self.index_configuration.config.l_build(),
Expand All @@ -252,7 +253,7 @@ where

let storage_provider = self.core.storage_provider;

info!(
tracked_info!(
"Compressing data into {} bytes per vector for disk search",
num_chunks.get()
);
Expand All @@ -266,7 +267,7 @@ where
let offset = match checkpoint_context.get_resumption_point()? {
Some(offset) => offset,
None => {
info!("Skip the DataCompression");
tracked_info!("Skip the DataCompression");
return Ok(());
}
};
Expand Down Expand Up @@ -362,9 +363,10 @@ where
let offset = match checkpoint_context.get_resumption_point()? {
Some(offset) => offset,
None => {
info!(
tracked_info!(
"[Stage:{:?}] Skip build_shard_index for shard {} - no valid checkpoint exists",
stage, shard_id
stage,
shard_id
);
return Ok(());
}
Expand All @@ -383,11 +385,12 @@ where
&shard_ids_file,
&shard_base_file,
)?;
info!("[Stage:{:?}] Generate data for shard {}", stage, shard_id);
tracked_info!("[Stage:{:?}] Generate data for shard {}", stage, shard_id);
} else {
info!(
tracked_info!(
"[Stage:{:?}] Resume shard {} build with existing data",
stage, shard_id
stage,
shard_id
);
}

Expand Down Expand Up @@ -440,7 +443,7 @@ where
let offset = match checkpoint_context.get_resumption_point()? {
Some(offset) => offset,
None => {
info!(
tracked_info!(
"[Stage:{:?}] Skip in-memory index build - no valid checkpoint exists",
stage
);
Expand Down Expand Up @@ -558,18 +561,18 @@ where
#[cfg(debug_assertions)]
/// Log statistics about the build process
async fn log_build_stats<T: VectorRepr>(index: &Arc<dyn InmemIndexBuilder<T>>) -> ANNResult<()> {
debug!(
tracked_debug!(
"Number of points reachable in the graph: {}",
index.count_reachable_nodes().await?
);

let (full_vector, quant_vector) = index.counts_for_get_vector();
let capacity = index.capacity();
debug!(
tracked_debug!(
"Number of get vector calls per insert: {}",
full_vector as f32 / capacity as f32
);
debug!(
tracked_debug!(
"Number of get quantized vector calls per insert: {}",
quant_vector as f32 / capacity as f32
);
Expand All @@ -593,7 +596,7 @@ where

index.set_start_point(medoid.as_slice())?;

debug!("Set start point to medoid ID: {}", medoid_id);
tracked_debug!("Set start point to medoid ID: {}", medoid_id);

Ok(medoid_id)
}
Expand Down Expand Up @@ -627,15 +630,15 @@ where

match progress {
Progress::Processed(num_points) => {
info!(
tracked_info!(
"Linked #{} points. Start #{}, end #{} ",
num_points,
offset,
num_points + offset
);
}
Progress::Completed => {
info!("Linked all points. Num points: #{}", total_points);
tracked_info!("Linked all points. Num points: #{}", total_points);
}
}

Expand All @@ -655,7 +658,7 @@ where
T: VectorRepr,
Iter: Iterator<Item = (usize, (Box<[T]>, ()))> + Send + 'static,
{
debug!("Processing chunk from #{} to #{}", start, end);
tracked_debug!("Processing chunk from #{} to #{}", start, end);

let partitions = async_tools::PartitionIter::new(end - start, num_tasks);

Expand Down Expand Up @@ -690,7 +693,7 @@ where
res.map_err(|_| ANNError::log_index_error("A spawned insert task failed"))??;
}

debug!("Completed chunk #{} to #{}", start, end);
tracked_debug!("Completed chunk #{} to #{}", start, end);
Ok(())
}

Expand Down Expand Up @@ -853,7 +856,7 @@ where

for file in files.iter() {
if self.storage_provider.exists(file) {
debug!("Deleting temporary file: {}", file);
tracked_debug!("Deleting temporary file: {}", file);
self.storage_provider.delete(file)?;
}
}
Expand Down Expand Up @@ -885,7 +888,7 @@ impl StartPoint {
ANNError::log_invalid_file_format(format!("Start point ID file {} is empty", path))
})?;

debug!("Loaded start point ID {} from {}", *start_point_id, path);
tracked_debug!("Loaded start point ID {} from {}", *start_point_id, path);
Ok(Self(*start_point_id))
}

Expand All @@ -897,7 +900,7 @@ impl StartPoint {
MatrixView::row_vector(std::slice::from_ref(&self.0)),
&mut storage_provider.create_for_write(path)?,
)?;
debug!("Saved start point ID {} to {}", self.0, path);
tracked_debug!("Saved start point ID {} to {}", self.0, path);
Ok(())
}

Expand Down
39 changes: 21 additions & 18 deletions diskann-disk/src/build/builder/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use diskann_providers::{
};
use diskann_utils::io::read_bin;
use rand::{seq::SliceRandom, Rng};
use tracing::info;

use crate::{
build::chunking::{
Expand All @@ -31,6 +30,7 @@ use crate::{
utils::partition_with_ram_budget,
DiskIndexBuildParameters, QuantizationType,
};
use diskann::tracked_info;

/// Overhead factor for RAM estimation during index build (10% buffer).
const OVERHEAD_FACTOR: f64 = 1.1f64;
Expand Down Expand Up @@ -146,7 +146,7 @@ where
let storage_provider = self.storage_provider;
let shard_ids = read_bin::<u32>(&mut storage_provider.open_reader(shard_ids_file)?)?;
let shard_size = shard_ids.nrows();
info!("Loaded {} shard ids from {}", shard_size, shard_ids_file);
tracked_info!("Loaded {} shard ids from {}", shard_size, shard_ids_file);
let max_id = shard_ids.as_slice().iter().max().copied().unwrap_or(0);
let sampling_rate = shard_ids.as_slice().len() as f64 / (max_id + 1) as f64;

Expand Down Expand Up @@ -177,9 +177,10 @@ where
Ok(())
})?;

info!(
tracked_info!(
"Written file: {} with {} points",
shard_base_file, num_written
shard_base_file,
num_written
);

shard_base_cached_writer.flush()?;
Expand Down Expand Up @@ -215,19 +216,19 @@ where
// find max node id
let num_nodes: u32 = *id_maps.iter().flatten().max().unwrap_or(&0) + 1;
let num_elements: u32 = id_maps.iter().map(|idmap| idmap.len() as u32).sum();
info!("# nodes: {}, max degree: {}", num_nodes, max_degree);
tracked_info!("# nodes: {}, max degree: {}", num_nodes, max_degree);

// compute inverse map: node -> shards
let mut node_shard: Vec<(u32, u32)> = Vec::with_capacity(num_elements as usize);
for (shard, id_map) in id_maps.iter().enumerate() {
info!("Creating inverse map -- shard #{}", shard);
tracked_info!("Creating inverse map -- shard #{}", shard);
node_shard.extend(id_map.iter().map(|node_id| (*node_id, shard as u32)));
}
node_shard.sort_unstable_by(|left, right| {
left.0.cmp(&right.0).then_with(|| left.1.cmp(&right.1))
});

info!("Finished computing node -> shards map");
tracked_info!("Finished computing node -> shards map");

// create cached vamana readers
let mut vamana_readers = Vec::new();
Expand Down Expand Up @@ -270,9 +271,10 @@ where

// write max_degree to merged_vamana_index
let output_width: u32 = max_degree;
info!(
tracked_info!(
"Max input width: {}, output width: {}",
max_input_width, output_width
max_input_width,
output_width
);

merged_vamana_cached_writer.write(&output_width.to_le_bytes())?;
Expand Down Expand Up @@ -300,7 +302,7 @@ where
// Hence the final index will also not have any frozen points.
merged_vamana_cached_writer.write(&vamana_index_frozen.to_le_bytes())?;

info!("Starting merge");
tracked_info!("Starting merge");

let mut nbr_set = vec![false; num_nodes as usize];
let mut final_nbrs: Vec<u32> = Vec::new();
Expand Down Expand Up @@ -334,9 +336,10 @@ where
let num_nbrs = vamana_readers[shard_id as usize].read_u32()?;

if num_nbrs == 0 {
info!(
tracked_info!(
"WARNING: shard #{}, node_id {} has 0 nbrs",
shard_id, node_id
shard_id,
node_id
);
} else {
let mut nbrs_bytes = vec![0u8; num_nbrs as usize * mem::size_of::<u32>()];
Expand Down Expand Up @@ -373,11 +376,11 @@ where
nbr_set.clear();
final_nbrs.clear();

info!("Expected size: {}", merged_index_size);
tracked_info!("Expected size: {}", merged_index_size);
merged_vamana_cached_writer.reset()?;
merged_vamana_cached_writer.write(&merged_index_size.to_le_bytes())?;

info!("Finished merge");
tracked_info!("Finished merge");
Ok(())
}

Expand Down Expand Up @@ -445,21 +448,21 @@ pub(crate) fn determine_build_strategy<Data: GraphDataType>(
build_quantization_type,
);

info!(
tracked_info!(
"Estimated index RAM usage: {} GB, index_build_ram_limit={} GB",
estimated_index_ram_in_bytes / BYTES_IN_GB,
index_build_ram_limit_in_bytes / BYTES_IN_GB
);

if estimated_index_ram_in_bytes >= index_build_ram_limit_in_bytes {
info!(
tracked_info!(
"Insufficient memory budget for index build in one shot, index_build_ram_limit={} GB estimated_index_ram={} GB",
index_build_ram_limit_in_bytes / BYTES_IN_GB,
estimated_index_ram_in_bytes / BYTES_IN_GB,
);
IndexBuildStrategy::Merged
} else {
info!(
tracked_info!(
"Full index fits in RAM budget, should consume at most {} GBs, so building in one shot",
estimated_index_ram_in_bytes / BYTES_IN_GB
);
Expand Down Expand Up @@ -562,7 +565,7 @@ impl<'a> MergedVamanaIndexWorkflow<'a> {
) {
p += 1;
}
info!("Found {} existing partitions from previous run", p);
tracked_info!("Found {} existing partitions from previous run", p);
Ok(p)
},
)
Expand Down
4 changes: 2 additions & 2 deletions diskann-disk/src/build/builder/quantizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ use diskann_providers::{
};
use diskann_quantization::scalar::train::ScalarQuantizationParameters;
use diskann_utils::views::MatrixView;
use tracing::info;

use crate::QuantizationType;
use diskann::tracked_info;

/// Quantizer types used specifically for async disk index building.
#[derive(Clone)]
Expand Down Expand Up @@ -111,7 +111,7 @@ impl BuildQuantizer {
MatrixView::try_from(&train_data_vector, train_size, train_dim).bridge_err()?,
);

info!("Now quantizer is trained and saving to file");
tracked_info!("Now quantizer is trained and saving to file");
let sq_storage = SQStorage::new(index_path_prefix);
sq_storage.save_quantizer(&quantizer, storage_provider)?;

Expand Down
Loading
Loading