Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 108 additions & 29 deletions ant-core/src/data/client/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::data::client::adaptive::{observe_op, rebucketed_ordered};
use crate::data::client::batch::{PaymentIntent, PreparedChunk};
use crate::data::client::classify_error;
use crate::data::client::file::{ExternalPaymentInfo, PreparedUpload, Visibility};
use crate::data::client::merkle::PaymentMode;
use crate::data::client::merkle::{chunk_contents_for_upload_addresses, PaymentMode};
use crate::data::client::Client;
use crate::data::error::{Error, Result};
use ant_protocol::{compute_address, DATA_TYPE_CHUNK};
Expand Down Expand Up @@ -99,17 +99,64 @@ impl Client {
// Merkle batch payment path
info!("Using merkle batch payment for {chunk_count} chunks");

let addresses: Vec<[u8; 32]> =
chunk_contents.iter().map(|c| compute_address(c)).collect();
let chunk_entries: Vec<([u8; 32], u64)> = chunk_contents
.iter()
.map(|chunk| {
let size = u64::try_from(chunk.len())
.map_err(|e| Error::InvalidData(format!("chunk size too large: {e}")))?;
Ok((compute_address(chunk), size))
})
.collect::<Result<Vec<_>>>()?;
let merkle_plan = match self
.plan_merkle_upload(chunk_entries, DATA_TYPE_CHUNK, None)
.await
{
Ok(plan) => plan,
Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => {
info!("Merkle preflight needs more peers ({msg}), falling back to wave-batch");
let (addresses, _sc, _gc) = self.batch_upload_chunks(chunk_contents).await?;
return Ok(DataUploadResult {
data_map,
chunks_stored: addresses.len(),
payment_mode_used: PaymentMode::Single,
});
}
Err(e) => return Err(e),
};

if merkle_plan.to_upload.is_empty() {
info!("All {chunk_count} chunks already stored; skipping merkle payment");
return Ok(DataUploadResult {
data_map,
chunks_stored: chunk_count,
payment_mode_used: PaymentMode::Merkle,
});
}

// Compute average chunk size for quoting
let avg_size =
chunk_contents.iter().map(bytes::Bytes::len).sum::<usize>() / chunk_count.max(1);
let avg_size_u64 = u64::try_from(avg_size).unwrap_or(0);
let chunk_contents =
chunk_contents_for_upload_addresses(chunk_contents, &merkle_plan.to_upload)?;

let remaining_chunks = merkle_plan.to_upload.len();
if !self.should_use_merkle(remaining_chunks, mode) {
info!(
"{remaining_chunks} chunks need upload after merkle preflight; \
using single-node payment"
);
let (addresses, _sc, _gc) = self.batch_upload_chunks(chunk_contents).await?;
return Ok(DataUploadResult {
data_map,
chunks_stored: merkle_plan.already_stored.len() + addresses.len(),
payment_mode_used: PaymentMode::Single,
});
}

// Try merkle batch; in Auto mode, fall back to per-chunk on network issues
let batch_result = match self
.pay_for_merkle_batch(&addresses, DATA_TYPE_CHUNK, avg_size_u64)
.pay_for_merkle_batch(
&merkle_plan.to_upload,
DATA_TYPE_CHUNK,
merkle_plan.to_upload_avg_size(),
)
.await
{
Ok(result) => result,
Expand All @@ -118,15 +165,22 @@ impl Client {
let (addresses, _sc, _gc) = self.batch_upload_chunks(chunk_contents).await?;
return Ok(DataUploadResult {
data_map,
chunks_stored: addresses.len(),
chunks_stored: merkle_plan.already_stored.len() + addresses.len(),
payment_mode_used: PaymentMode::Single,
});
}
Err(e) => return Err(e),
};

let (chunks_stored, _stats) = self
.merkle_upload_chunks(chunk_contents, addresses, &batch_result, None)
.merkle_upload_chunks(
chunk_contents,
merkle_plan.to_upload,
&batch_result,
None,
merkle_plan.already_stored.len(),
chunk_count,
)
.await?;

info!("Data uploaded via merkle: {chunks_stored} chunks stored ({content_len} bytes)");
Expand Down Expand Up @@ -228,37 +282,60 @@ impl Client {
};

let chunk_count = chunk_contents.len();
let chunks_with_addr: Vec<(Bytes, [u8; 32])> = chunk_contents
.into_iter()
.map(|content| {
let address = compute_address(&content);
(content, address)
})
.collect();

let quote_limiter = self.controller().quote.clone();
let quote_concurrency = quote_limiter.current().min(chunk_count.max(1));
let results: Vec<Result<Option<PreparedChunk>>> = futures::stream::iter(chunk_contents)
.map(|content| {
let limiter = quote_limiter.clone();
async move {
observe_op(
&limiter,
|| async move { self.prepare_chunk_payment(content).await },
classify_error,
)
.await
}
})
.buffer_unordered(quote_concurrency)
.collect()
.await;
let results: Vec<([u8; 32], Result<Option<PreparedChunk>>)> =
futures::stream::iter(chunks_with_addr)
.map(|(content, address)| {
let limiter = quote_limiter.clone();
async move {
let result = observe_op(
&limiter,
|| async move { self.prepare_chunk_payment(content).await },
classify_error,
)
.await;
(address, result)
}
})
.buffer_unordered(quote_concurrency)
.collect()
.await;

let mut prepared_chunks = Vec::with_capacity(results.len());
for result in results {
if let Some(prepared) = result? {
prepared_chunks.push(prepared);
let mut already_stored_addresses = Vec::new();
for (address, result) in results {
match result? {
Some(prepared) => prepared_chunks.push(prepared),
None => already_stored_addresses.push(address),
}
}

if let Some(addr) = data_map_address {
if already_stored_addresses.contains(&addr) {
info!(
"Public upload: DataMap chunk {} was already stored \
on the network — address is retrievable without a \
new payment",
hex::encode(addr)
);
}
}

let payment_intent = PaymentIntent::from_prepared_chunks(&prepared_chunks);

info!(
"Data prepared for external signing: {} chunks, total {} atto ({content_len} bytes)",
"Data prepared for external signing: {} chunks, {} already stored, total {} atto ({content_len} bytes)",
prepared_chunks.len(),
already_stored_addresses.len(),
payment_intent.total_amount,
);

Expand All @@ -269,6 +346,8 @@ impl Client {
payment_intent,
},
data_map_address,
already_stored_addresses,
total_chunks: chunk_count,
})
}

Expand Down
Loading
Loading