diff --git a/overlay/src/flood/inv_messages.rs b/overlay/src/flood/inv_messages.rs index 34d98745e..51304e288 100644 --- a/overlay/src/flood/inv_messages.rs +++ b/overlay/src/flood/inv_messages.rs @@ -1,6 +1,9 @@ -//! INV/GETDATA message types for bandwidth-efficient TX flooding. +//! TX stream message types for transaction flooding. //! -//! Wire format: length-prefixed `StellarMessage` XDR. +//! Wire format: length-prefixed payloads. INV/GETDATA messages are encoded as +//! `StellarMessage` XDR. TX messages are encoded as a big-endian TTL followed by +//! `StellarMessage::Transaction` XDR; legacy raw transaction messages decode +//! with TTL 0. use std::io; use stellar_xdr::curr::{ @@ -8,6 +11,8 @@ use stellar_xdr::curr::{ TxDemandVector, WriteXdr, }; +const TX_TTL_BYTES: usize = 4; + /// A single INV entry: hash + fee for prioritization #[derive(Debug, Clone, PartialEq, Eq)] pub struct InvEntry { @@ -68,11 +73,18 @@ impl Default for GetData { } } +/// Full transaction data carried on the TX stream. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct TxMessage { + pub tx: Vec, + pub ttl: u32, +} + /// Parsed TX stream message #[derive(Debug, Clone, PartialEq, Eq)] pub enum TxStreamMessage { /// Full transaction data - Tx(Vec), + Tx(TxMessage), /// Batch of INV announcements InvBatch(InvBatch), /// Request for transactions @@ -83,8 +95,13 @@ impl TxStreamMessage { /// Encode message as StellarMessage XDR. pub fn encode(&self) -> io::Result> { match self { - TxStreamMessage::Tx(data) => { - crate::xdr::encode_transaction_message_from_xdr(data).map_err(to_invalid_data) + TxStreamMessage::Tx(tx_message) => { + let message = crate::xdr::encode_transaction_message_from_xdr(&tx_message.tx) + .map_err(to_invalid_data)?; + let mut encoded = Vec::with_capacity(TX_TTL_BYTES + message.len()); + encoded.extend_from_slice(&tx_message.ttl.to_be_bytes()); + encoded.extend_from_slice(&message); + Ok(encoded) } TxStreamMessage::InvBatch(batch) => { let hashes = batch @@ -107,12 +124,34 @@ impl TxStreamMessage { /// Decode StellarMessage XDR from the TX stream. pub fn decode(data: &[u8]) -> io::Result { + let (ttl, data) = match StellarMessage::from_xdr(data, Limits::none()) { + Ok(StellarMessage::Transaction(envelope)) => { + let tx = + crate::xdr::canonical_transaction_xdr(envelope).map_err(to_invalid_data)?; + return Ok(TxStreamMessage::Tx(TxMessage { tx, ttl: 0 })); + } + Ok(message) => return Self::decode_stellar_message(message), + Err(err) => { + if data.len() < TX_TTL_BYTES { + return Err(to_invalid_data(err)); + } + let ttl = u32::from_be_bytes(data[..TX_TTL_BYTES].try_into().unwrap()); + (ttl, &data[TX_TTL_BYTES..]) + } + }; + match StellarMessage::from_xdr(data, Limits::none()).map_err(to_invalid_data)? { StellarMessage::Transaction(envelope) => { let tx = crate::xdr::canonical_transaction_xdr(envelope).map_err(to_invalid_data)?; - Ok(TxStreamMessage::Tx(tx)) + Ok(TxStreamMessage::Tx(TxMessage { tx, ttl })) } + other => Self::decode_stellar_message(other), + } + } + + fn decode_stellar_message(message: StellarMessage) -> io::Result { + match message { StellarMessage::FloodAdvert(advert) => { let entries = advert .tx_hashes @@ -148,13 +187,31 @@ mod tests { #[test] fn test_tx_stream_message_tx() { let tx_data = valid_transaction_xdr(1000, 1, 1); - let msg = TxStreamMessage::Tx(tx_data.clone()); + let msg = TxStreamMessage::Tx(TxMessage { + tx: tx_data.clone(), + ttl: 7, + }); let encoded = msg.encode().unwrap(); let decoded = TxStreamMessage::decode(&encoded).unwrap(); assert_eq!(msg, decoded); } + #[test] + fn test_tx_stream_message_legacy_tx_decodes_with_zero_ttl() { + let tx_data = valid_transaction_xdr(1000, 2, 1); + let encoded = crate::xdr::encode_transaction_message_from_xdr(&tx_data).unwrap(); + + let decoded = TxStreamMessage::decode(&encoded).unwrap(); + assert_eq!( + decoded, + TxStreamMessage::Tx(TxMessage { + tx: tx_data, + ttl: 0, + }) + ); + } + #[test] fn test_tx_stream_message_inv_batch() { let mut batch = InvBatch::new(); diff --git a/overlay/src/flood/mod.rs b/overlay/src/flood/mod.rs index b2e03b718..009609dc2 100644 --- a/overlay/src/flood/mod.rs +++ b/overlay/src/flood/mod.rs @@ -1,6 +1,6 @@ //! TX flooding module. //! -//! Provides mempool management, TX set building, and INV/GETDATA flooding. +//! Provides mempool management, TX set building, and TX flooding messages. mod inv_batcher; mod inv_messages; @@ -11,7 +11,7 @@ mod tx_buffer; mod txset; pub use inv_batcher::InvBatcher; -pub use inv_messages::{GetData, InvBatch, InvEntry, TxStreamMessage}; +pub use inv_messages::{GetData, InvBatch, InvEntry, TxMessage, TxStreamMessage}; pub use inv_tracker::InvTracker; pub use mempool::{Mempool, TxEntry}; pub use pending_requests::PendingRequests; diff --git a/overlay/src/libp2p_overlay.rs b/overlay/src/libp2p_overlay.rs index ecb4aa195..a4b37c5e0 100644 --- a/overlay/src/libp2p_overlay.rs +++ b/overlay/src/libp2p_overlay.rs @@ -5,14 +5,15 @@ //! //! Uses libp2p-stream for persistent bidirectional streams: //! - SCP stream: consensus messages (priority, ~500B) -//! - TX stream: transaction flooding (~1KB) - uses INV/GETDATA protocol +//! - TX stream: transaction flooding (~1KB) - uses eager or INV/GETDATA protocol //! - TxSet stream: TX set request/response (~10MB) //! //! Each stream is opened once per peer and kept alive. //! QUIC provides independent loss recovery per stream. use crate::flood::{ - GetData, InvBatch, InvBatcher, InvEntry, InvTracker, PendingRequests, TxBuffer, TxStreamMessage, + GetData, InvBatch, InvBatcher, InvEntry, InvTracker, PendingRequests, TxBuffer, TxMessage, + TxStreamMessage, }; use crate::metrics::OverlayMetrics; use futures::{AsyncReadExt, AsyncWriteExt, StreamExt}; @@ -26,10 +27,12 @@ use libp2p::{ Multiaddr, PeerId, Stream, StreamProtocol, Swarm, SwarmBuilder, }; use libp2p_stream::{Behaviour as StreamBehaviour, Control, IncomingStreams}; +use rand::seq::SliceRandom; use std::collections::{HashMap, HashSet}; +use std::env; use std::io; use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, OnceLock}; use std::time::{Duration, Instant}; use tokio::sync::{mpsc, Mutex, RwLock}; use tracing::{debug, error, info, trace, warn}; @@ -39,9 +42,83 @@ pub const SCP_PROTOCOL: StreamProtocol = StreamProtocol::new("/stellar/scp/1.0.0 pub const TX_PROTOCOL: StreamProtocol = StreamProtocol::new("/stellar/tx/1.0.0"); pub const TXSET_PROTOCOL: StreamProtocol = StreamProtocol::new("/stellar/txset/1.0.0"); +/// Compile-time switch selecting direct eager TX flooding instead of INV/GETDATA. +pub const TX_FLOOD_EAGER_MODE: bool = true; +/// Environment variable overriding the non-initial eager relay fanout. +pub const OVERLAY_EAGER_FANOUT_ENV: &str = "OVERLAY_EAGER_FANOUT"; +/// Environment variable overriding the initial eager broadcast redundancy multiplier. +pub const OVERLAY_EAGER_REDUNDANCY_ENV: &str = "OVERLAY_EAGER_REDUNDANCY"; +/// Default number of peers each non-initial eager relay forwards to. +pub const OVERLAY_EAGER_FANOUT_DEFAULT: usize = 4; +/// Default multiplier used only for initial eager broadcasts from Core. +pub const OVERLAY_EAGER_REDUNDANCY_DEFAULT: usize = 1; + +const _: () = assert!(OVERLAY_EAGER_FANOUT_DEFAULT > 1); +const _: () = assert!(OVERLAY_EAGER_REDUNDANCY_DEFAULT > 0); + +pub fn overlay_eager_fanout() -> usize { + static FANOUT: OnceLock = OnceLock::new(); + *FANOUT.get_or_init(|| { + overlay_eager_env_usize( + OVERLAY_EAGER_FANOUT_ENV, + OVERLAY_EAGER_FANOUT_DEFAULT, + |value| value > 1, + "greater than 1", + ) + }) +} + +pub fn overlay_eager_redundancy() -> usize { + static REDUNDANCY: OnceLock = OnceLock::new(); + *REDUNDANCY.get_or_init(|| { + overlay_eager_env_usize( + OVERLAY_EAGER_REDUNDANCY_ENV, + OVERLAY_EAGER_REDUNDANCY_DEFAULT, + |value| value > 0, + "greater than 0", + ) + }) +} + +fn overlay_eager_env_usize( + env_var_name: &str, + default_value: usize, + is_valid: impl Fn(usize) -> bool, + expected_value: &str, +) -> usize { + match env::var(env_var_name) { + Ok(raw_value) => match raw_value.parse::() { + Ok(value) if is_valid(value) => value, + Ok(value) => { + warn!( + "Ignoring {}={} because it must be {}; using default {}", + env_var_name, value, expected_value, default_value + ); + default_value + } + Err(error) => { + warn!( + "Ignoring {}={:?} because it is not a usize ({}); using default {}", + env_var_name, raw_value, error, default_value + ); + default_value + } + }, + Err(env::VarError::NotPresent) => default_value, + Err(env::VarError::NotUnicode(raw_value)) => { + warn!( + "Ignoring {}={:?} because it is not valid Unicode; using default {}", + env_var_name, raw_value, default_value + ); + default_value + } + } +} + /// Message frame: 4-byte length prefix + payload /// Max message size: 16MB (for large TX sets) const MAX_MESSAGE_SIZE: usize = 16 * 1024 * 1024; +const SCP_TTL_BYTES: usize = 4; /// Bounded channel capacity for TX events (backpressure for TX flooding) /// TXs that can't be queued are dropped - they'll be re-requested if needed. @@ -95,6 +172,8 @@ pub enum OverlayCommand { RequestScpState { ledger_seq: u32 }, /// Send SCP envelope to a specific peer SendScpToPeer { peer_id: PeerId, envelope: Vec }, + /// Update the number of known peers used to size eager flooding trees. + SetKnownPeerCount(usize), /// Shutdown Shutdown, /// Query the number of connected peers (responds via oneshot) @@ -256,6 +335,19 @@ impl OverlayHandle { Ok(()) } + pub async fn set_known_peer_count(&self, count: usize) { + if let Err(e) = self + .cmd_tx + .send(OverlayCommand::SetKnownPeerCount(count)) + .await + { + warn!( + "Overlay command channel closed, failed to send SetKnownPeerCount: {}", + e + ); + } + } + pub async fn shutdown(&self) { if let Err(e) = self.cmd_tx.send(OverlayCommand::Shutdown).await { warn!( @@ -292,8 +384,6 @@ struct SharedState { scp_seen: RwLock>, /// TX messages seen (for dedup) tx_seen: RwLock>, - /// Track which peers we've sent each SCP message to (prevent duplicate sends) - scp_sent_to: RwLock>>, /// TX set sources: which peer has which TX set (learned from SCP messages) txset_sources: RwLock>, /// Pending TX set requests: hash -> (peer, request_time) to avoid duplicate fetches and track latency @@ -316,6 +406,11 @@ struct SharedState { pending_getdata: RwLock, /// TX buffer for responding to GETDATA requests tx_buffer: RwLock, + /// Current eager flooding TTL depth derived from known and connected peer counts. + overlay_eager_depth: AtomicU64, + /// Number of peers known by configuration/peer discovery, whether currently + /// connected or not. Used to size eager flooding trees. + overlay_known_peer_count: AtomicU64, /// Overlay metrics (shared with App for IPC reporting) metrics: Arc, } @@ -335,9 +430,6 @@ impl SharedState { tx_seen: RwLock::new(lru::LruCache::new( std::num::NonZeroUsize::new(100000).unwrap(), )), - scp_sent_to: RwLock::new(lru::LruCache::new( - std::num::NonZeroUsize::new(10000).unwrap(), - )), txset_sources: RwLock::new(lru::LruCache::new( std::num::NonZeroUsize::new(1000).unwrap(), )), @@ -351,6 +443,8 @@ impl SharedState { inv_tracker: RwLock::new(InvTracker::new()), pending_getdata: RwLock::new(PendingRequests::new()), tx_buffer: RwLock::new(TxBuffer::new()), + overlay_eager_depth: AtomicU64::new(0), + overlay_known_peer_count: AtomicU64::new(0), metrics, } } @@ -494,8 +588,9 @@ impl StellarOverlay { tokio::spawn(handle_inbound_tx_streams(tx_incoming, state.clone())); tokio::spawn(handle_inbound_txset_streams(txset_incoming, state.clone())); - // Spawn INV/GETDATA housekeeping task - tokio::spawn(inv_getdata_housekeeping_task(state.clone())); + if !TX_FLOOD_EAGER_MODE { + tokio::spawn(inv_getdata_housekeeping_task(state.clone())); + } loop { tokio::select! { @@ -555,7 +650,7 @@ impl StellarOverlay { OverlayCommand::SendScpToPeer { peer_id, envelope } => { // Don't hold &self across await - extract state and call helper directly let state = Arc::clone(&self.state); - let message = match crate::xdr::encode_scp_message(&envelope) { + let message = match encode_scp_stream_message(&envelope, 0) { Ok(message) => message, Err(e) => { warn!("Dropping invalid SCP envelope for {}: {}", peer_id, e); @@ -566,6 +661,9 @@ impl StellarOverlay { warn!("Failed to send SCP to {}: {:?}", peer_id, e); } } + OverlayCommand::SetKnownPeerCount(count) => { + update_known_peer_count(&self.state, count); + } OverlayCommand::Shutdown => { info!("Overlay shutting down"); break; @@ -626,6 +724,7 @@ impl StellarOverlay { { let mut streams = self.state.peer_streams.write().await; streams.insert(peer_id, Arc::new(PeerOutboundStreams::new())); + update_eager_depth(&self.state, streams.len()); } // Notify application so it can record the PeerId ↔ address mapping. @@ -675,6 +774,7 @@ impl StellarOverlay { { let mut streams = self.state.peer_streams.write().await; streams.remove(&peer_id); + update_eager_depth(&self.state, streams.len()); } // Clean up pending txset requests for this peer { @@ -736,56 +836,46 @@ impl StellarOverlay { } } - /// Broadcast SCP envelope to all connected peers + /// Broadcast SCP envelope to connected peers using eager random broadcast trees. async fn broadcast_scp(&mut self, envelope: &[u8]) { - let message = match crate::xdr::encode_scp_message(envelope) { + let hash = blake2b_hash(envelope); + let ttl = self.state.overlay_eager_depth.load(Ordering::Relaxed) as u32; + let message = match encode_scp_stream_message(envelope, ttl) { Ok(message) => message, Err(e) => { warn!("SCP_BROADCAST_DROP: Dropping invalid SCP envelope: {}", e); return; } }; - let hash = blake2b_hash(envelope); - // Mark as seen for inbound dedup (if we later receive this from a peer, skip it) - { - let mut seen = self.state.scp_seen.write().await; - seen.put(hash, ()); - } - - // Determine which peers still need this message let streams = self.state.peer_streams.read().await; - let all_peers: Vec<_> = streams.keys().cloned().collect(); + let peers: Vec<_> = streams.keys().cloned().collect(); drop(streams); - let peers_to_send: Vec; - { - let mut sent_to = self.state.scp_sent_to.write().await; - let already_sent: HashSet = sent_to.peek(&hash).cloned().unwrap_or_default(); - - peers_to_send = all_peers - .into_iter() - .filter(|p| !already_sent.contains(p)) - .collect(); - - if peers_to_send.is_empty() { - trace!( - "SCP_BROADCAST_SKIP: SCP {:02x?}... already sent to all connected peers", - &hash[..4] - ); - return; - } + if peers.is_empty() { + debug!( + "SCP_BROADCAST: No peers to send SCP {:02x?}...", + &hash[..4] + ); + return; + } - // Update sent_to with the peers we're about to send to - let mut new_sent = already_sent; - new_sent.extend(peers_to_send.iter().cloned()); - sent_to.put(hash, new_sent); + let initial_fanout = overlay_eager_fanout().saturating_mul(overlay_eager_redundancy()); + let peers_to_send = select_eager_peers(peers, None, initial_fanout); + + if peers_to_send.is_empty() { + trace!( + "SCP_BROADCAST_SKIP: SCP {:02x?}... has no eligible peers", + &hash[..4] + ); + return; } info!( - "SCP_BROADCAST: Broadcasting SCP {:02x?}... ({} bytes) to {} peers", + "SCP_BROADCAST: Broadcasting SCP {:02x?}... ({} bytes, ttl={}) to {} peers", &hash[..4], envelope.len(), + ttl, peers_to_send.len() ); self.state @@ -793,45 +883,12 @@ impl StellarOverlay { .message_broadcast .fetch_add(1, Ordering::Relaxed); - // Spawn parallel send tasks - don't block event loop waiting for each peer for peer_id in peers_to_send { - let state = Arc::clone(&self.state); - let message = message.clone(); - tokio::spawn(async move { - match send_to_peer_stream(&state, peer_id.clone(), StreamType::Scp, &message).await - { - Ok(_) => { - state - .metrics - .send_scp_message - .fetch_add(1, Ordering::Relaxed); - state.metrics.message_write.fetch_add(1, Ordering::Relaxed); - state - .metrics - .byte_write - .fetch_add(message.len() as u64, Ordering::Relaxed); - debug!( - "SCP_SEND_OK: Sent SCP {:02x?}... to {}", - &hash[..4], - peer_id - ); - } - Err(e) => { - state.metrics.error_write.fetch_add(1, Ordering::Relaxed); - warn!( - "SCP_SEND_FAIL: Failed to send SCP {:02x?}... to {}: {}", - &hash[..4], - peer_id, - e - ); - } - } - }); + send_scp_message(&self.state, peer_id, message.clone(), ttl, hash).await; } } - /// Broadcast TX to all connected peers - /// Broadcast TX using INV/GETDATA protocol (bandwidth efficient) + /// Broadcast TX to connected peers using the selected TX flooding mode. async fn broadcast_tx(&mut self, tx: &[u8]) { let parsed = match crate::xdr::parse_supported_transaction(tx) { Ok(parsed) => parsed, @@ -869,7 +926,29 @@ impl StellarOverlay { drop(streams); if peers.is_empty() { - debug!("TX_INV: No peers to announce TX {:02x?}...", &hash[..4]); + debug!("TX_FLOOD: No peers to announce TX {:02x?}...", &hash[..4]); + return; + } + + if TX_FLOOD_EAGER_MODE { + let initial_fanout = overlay_eager_fanout().saturating_mul(overlay_eager_redundancy()); + let peers_to_send = select_eager_peers(peers, None, initial_fanout); + let ttl = self.state.overlay_eager_depth.load(Ordering::Relaxed) as u32; + debug!( + "TX_EAGER: Sending TX {:02x?}... ({} bytes, ttl={}) to {} peers", + &hash[..4], + tx.len(), + ttl, + peers_to_send.len() + ); + self.state + .metrics + .send_transaction + .fetch_add(peers_to_send.len() as u64, Ordering::Relaxed); + + for peer in peers_to_send { + send_tx_message(&self.state, peer, tx.clone(), ttl, hash).await; + } return; } @@ -1106,8 +1185,7 @@ impl StellarOverlay { /// Send SCP envelope to a specific peer pub async fn send_scp_to_peer(&self, peer_id: PeerId, envelope: &[u8]) -> io::Result<()> { - let message = crate::xdr::encode_scp_message(envelope) - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?; + let message = encode_scp_stream_message(envelope, 0)?; send_to_peer_stream(&self.state, peer_id, StreamType::Scp, &message).await } } @@ -1458,7 +1536,7 @@ async fn handle_inbound_scp_streams(mut incoming: IncomingStreams, state: Arc message, Err(e) => { warn!("SCP_PARSE_ERR: Dropping malformed SCP stream message from {}: {}", peer_id, e); @@ -1466,8 +1544,8 @@ async fn handle_inbound_scp_streams(mut incoming: IncomingStreams, state: Arc { + let (envelope, ttl) = match message { + ScpStreamMessage::GetScpState(ledger_seq) => { info!( "SCP_STATE_REQ: Peer {} requests SCP state for ledger >= {}", peer_id, ledger_seq @@ -1484,26 +1562,7 @@ async fn handle_inbound_scp_streams(mut incoming: IncomingStreams, state: Arc { - match crate::xdr::canonical_scp_envelope_xdr(envelope) { - Ok(envelope) => envelope, - Err(e) => { - warn!( - "SCP_PARSE_ERR: Dropping invalid SCP envelope from {}: {}", - peer_id, e - ); - continue; - } - } - } - other => { - warn!( - "SCP_PARSE_ERR: Dropping unexpected {} on SCP stream from {}", - other.name(), - peer_id - ); - continue; - } + ScpStreamMessage::Scp { envelope, ttl } => (envelope, ttl), }; let hash = blake2b_hash(&envelope); @@ -1518,18 +1577,6 @@ async fn handle_inbound_scp_streams(mut incoming: IncomingStreams, state: Arc 0 { + let peers_to_send = { + let streams = state.peer_streams.read().await; + let peers = streams.keys().cloned().collect(); + select_eager_peers(peers, Some(&peer_id), overlay_eager_fanout()) + }; + + if !peers_to_send.is_empty() { + let relay_ttl = ttl - 1; + let message = match encode_scp_stream_message(&envelope, relay_ttl) + { + Ok(message) => message, + Err(e) => { + warn!( + "SCP_RELAY_DROP: Failed to encode SCP {:02x?}... from {}: {}", + &hash[..4], peer_id, e + ); + continue; + } + }; + debug!( + "SCP_RELAY: Relaying SCP {:02x?}... with ttl={} to {} peers", + &hash[..4], + relay_ttl, + peers_to_send.len() + ); + for peer in peers_to_send { + send_scp_message(&state, peer, message.clone(), relay_ttl, hash) + .await; + } + } + } + // Forward to Core if let Err(e) = state.event_tx.send(OverlayEvent::ScpReceived { envelope, @@ -1607,7 +1687,7 @@ async fn handle_inbound_tx_streams(mut incoming: IncomingStreams, state: Arc, peer_id: &PeerId, @@ -1616,13 +1696,27 @@ async fn handle_tx_stream_message( ) { match TxStreamMessage::decode(data) { Ok(TxStreamMessage::InvBatch(batch)) => { + if TX_FLOOD_EAGER_MODE { + warn!( + "TX_INV_IGNORED: Ignoring INV from {} in eager mode", + peer_id + ); + return; + } handle_inv_batch(state, peer_id, batch).await; } Ok(TxStreamMessage::GetData(getdata)) => { + if TX_FLOOD_EAGER_MODE { + warn!( + "TX_GETDATA_IGNORED: Ignoring GETDATA from {} in eager mode", + peer_id + ); + return; + } handle_getdata(state, peer_id, getdata, stream).await; } - Ok(TxStreamMessage::Tx(tx_data)) => { - handle_tx_response(state, peer_id, tx_data).await; + Ok(TxStreamMessage::Tx(tx_message)) => { + handle_tx_response(state, peer_id, tx_message).await; } Err(e) => { warn!( @@ -1737,7 +1831,10 @@ async fn handle_getdata( .flood_fulfilled .fetch_add(1, Ordering::Relaxed); // Send TX response - let msg = TxStreamMessage::Tx(tx_data); + let msg = TxStreamMessage::Tx(TxMessage { + tx: tx_data, + ttl: 0, + }); let encoded = match msg.encode() { Ok(encoded) => encoded, Err(e) => { @@ -1784,9 +1881,9 @@ async fn handle_getdata( } } -/// Handle TX response (from GETDATA request) -async fn handle_tx_response(state: &Arc, peer_id: &PeerId, tx: Vec) { - let parsed = match crate::xdr::parse_supported_transaction(&tx) { +/// Handle TX response or eager TX flood message. +async fn handle_tx_response(state: &Arc, peer_id: &PeerId, tx_message: TxMessage) { + let parsed = match crate::xdr::parse_supported_transaction(&tx_message.tx) { Ok(parsed) => parsed, Err(e) => { warn!("TX_RECV_DROP: Dropping invalid TX from {}: {}", peer_id, e); @@ -1796,6 +1893,7 @@ async fn handle_tx_response(state: &Arc, peer_id: &PeerId, tx: Vec< let hash = parsed.full_hash; let tx = parsed.envelope_xdr; let fee_per_op = (parsed.fee / u64::from(parsed.num_ops.max(1))) as i64; + let ttl = tx_message.ttl; let recv_start = std::time::Instant::now(); let tx_len = tx.len() as u64; @@ -1866,7 +1964,45 @@ async fn handle_tx_response(state: &Arc, peer_id: &PeerId, tx: Vec< } } - // RELAY: Announce to other peers via INV + if TX_FLOOD_EAGER_MODE { + if ttl > 0 { + let peers_to_send = { + let streams = state.peer_streams.read().await; + let peers = streams.keys().cloned().collect(); + select_eager_peers(peers, Some(peer_id), overlay_eager_fanout()) + }; + + if !peers_to_send.is_empty() { + debug!( + "TX_EAGER_RELAY: Relaying TX {:02x?}... with ttl={} to {} peers", + &hash[..4], + ttl - 1, + peers_to_send.len() + ); + for peer in peers_to_send { + send_tx_message(state, peer, tx.clone(), ttl - 1, hash).await; + } + } + } + } else { + relay_tx_inv(state, peer_id, hash, fee_per_op).await; + } + + // Record recv-transaction timing + let elapsed_us = recv_start.elapsed().as_micros() as u64; + state + .metrics + .recv_transaction_sum_us + .fetch_add(elapsed_us, Ordering::Relaxed); + state + .metrics + .recv_transaction_count + .fetch_add(1, Ordering::Relaxed); + state.metrics.update_recv_transaction_max(elapsed_us); +} + +/// Announce a received TX to peers via INV in INV/GETDATA mode. +async fn relay_tx_inv(state: &Arc, peer_id: &PeerId, hash: [u8; 32], fee_per_op: i64) { let peers_to_announce: Vec = { let streams = state.peer_streams.read().await; let tracker = state.inv_tracker.read().await; @@ -1904,18 +2040,194 @@ async fn handle_tx_response(state: &Arc, peer_id: &PeerId, tx: Vec< } } } +} - // Record recv-transaction timing - let elapsed_us = recv_start.elapsed().as_micros() as u64; +fn update_eager_depth(state: &SharedState, peer_count: usize) { + let peer_count = + peer_count.max(state.overlay_known_peer_count.load(Ordering::Relaxed) as usize); state - .metrics - .recv_transaction_sum_us - .fetch_add(elapsed_us, Ordering::Relaxed); + .overlay_eager_depth + .store(calculate_eager_depth(peer_count), Ordering::Relaxed); +} + +fn update_known_peer_count(state: &SharedState, peer_count: usize) { state - .metrics - .recv_transaction_count - .fetch_add(1, Ordering::Relaxed); - state.metrics.update_recv_transaction_max(elapsed_us); + .overlay_known_peer_count + .store(peer_count as u64, Ordering::Relaxed); + let connected_peer_count = state + .peer_streams + .try_read() + .map(|streams| streams.len()) + .unwrap_or(0); + update_eager_depth(state, connected_peer_count); +} + +fn calculate_eager_depth(peer_count: usize) -> u64 { + calculate_eager_depth_for_fanout(peer_count, overlay_eager_fanout()) +} + +fn calculate_eager_depth_for_fanout(peer_count: usize, fanout: usize) -> u64 { + if peer_count <= 1 { + return 0; + } + + let mut depth = 0; + let mut covered = 1usize; + while covered < peer_count { + depth += 1; + covered = covered.saturating_mul(fanout); + if covered == usize::MAX { + break; + } + } + depth +} + +fn select_eager_peers(peers: Vec, exclude: Option<&PeerId>, limit: usize) -> Vec { + let mut candidates = peers + .into_iter() + .filter(|peer| exclude.map_or(true, |excluded| peer != excluded)) + .collect::>(); + candidates.shuffle(&mut rand::thread_rng()); + candidates.truncate(limit); + candidates +} + +#[derive(Debug, Clone, PartialEq, Eq)] +enum ScpStreamMessage { + Scp { envelope: Vec, ttl: u32 }, + GetScpState(u32), +} + +fn encode_scp_stream_message(envelope: &[u8], ttl: u32) -> io::Result> { + let message = crate::xdr::encode_scp_message(envelope) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?; + let mut encoded = Vec::with_capacity(SCP_TTL_BYTES + message.len()); + encoded.extend_from_slice(&ttl.to_be_bytes()); + encoded.extend_from_slice(&message); + Ok(encoded) +} + +fn decode_scp_stream_message(data: &[u8]) -> io::Result { + match crate::xdr::parse_stellar_message(data) { + Ok(stellar_xdr::curr::StellarMessage::GetScpState(ledger_seq)) => { + return Ok(ScpStreamMessage::GetScpState(ledger_seq)); + } + Ok(stellar_xdr::curr::StellarMessage::ScpMessage(envelope)) => { + let envelope = crate::xdr::canonical_scp_envelope_xdr(envelope) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?; + return Ok(ScpStreamMessage::Scp { envelope, ttl: 0 }); + } + Ok(other) => { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("unexpected SCP stream StellarMessage {}", other.name()), + )); + } + Err(err) => { + if data.len() < SCP_TTL_BYTES { + return Err(io::Error::new(io::ErrorKind::InvalidData, err.to_string())); + } + } + } + + let ttl = u32::from_be_bytes(data[..SCP_TTL_BYTES].try_into().unwrap()); + match crate::xdr::parse_stellar_message(&data[SCP_TTL_BYTES..]) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))? + { + stellar_xdr::curr::StellarMessage::ScpMessage(envelope) => { + let envelope = crate::xdr::canonical_scp_envelope_xdr(envelope) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?; + Ok(ScpStreamMessage::Scp { envelope, ttl }) + } + other => Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("unexpected TTL-prefixed SCP stream StellarMessage {}", other.name()), + )), + } +} + +async fn send_scp_message( + state: &Arc, + peer: PeerId, + message: Vec, + ttl: u32, + hash: [u8; 32], +) { + let state = Arc::clone(state); + tokio::spawn(async move { + match send_to_peer_stream(&state, peer, StreamType::Scp, &message).await { + Ok(_) => { + state + .metrics + .send_scp_message + .fetch_add(1, Ordering::Relaxed); + state.metrics.message_write.fetch_add(1, Ordering::Relaxed); + state + .metrics + .byte_write + .fetch_add(message.len() as u64, Ordering::Relaxed); + debug!( + "SCP_SEND_OK: Sent SCP {:02x?}... to {} with ttl={}", + &hash[..4], + peer, + ttl + ); + } + Err(e) => { + state.metrics.error_write.fetch_add(1, Ordering::Relaxed); + warn!( + "SCP_SEND_FAIL: Failed to send SCP {:02x?}... to {}: {}", + &hash[..4], + peer, + e + ); + } + } + }); +} + +async fn send_tx_message( + state: &Arc, + peer: PeerId, + tx: Vec, + ttl: u32, + hash: [u8; 32], +) { + let msg = TxStreamMessage::Tx(TxMessage { tx, ttl }); + let encoded = match msg.encode() { + Ok(encoded) => encoded, + Err(e) => { + state.metrics.error_write.fetch_add(1, Ordering::Relaxed); + warn!("Failed to encode TX for {}: {}", peer, e); + return; + } + }; + + let state = Arc::clone(state); + tokio::spawn(async move { + if let Err(e) = send_to_peer_stream(&state, peer, StreamType::Tx, &encoded).await { + state.metrics.error_write.fetch_add(1, Ordering::Relaxed); + warn!( + "Failed to send TX {:02x?}... to {}: {}", + &hash[..4], + peer, + e + ); + } else { + state.metrics.message_write.fetch_add(1, Ordering::Relaxed); + state + .metrics + .byte_write + .fetch_add(encoded.len() as u64, Ordering::Relaxed); + debug!( + "TX_SEND: Sent TX {:02x?}... to {} with ttl={}", + &hash[..4], + peer, + ttl + ); + } + }); } /// Handle inbound TxSet streams from peers @@ -2167,6 +2479,60 @@ fn test_txset_xdr(seed: u8) -> ([u8; 32], Vec) { mod tests { use super::*; + #[test] + fn test_calculate_eager_depth() { + assert_eq!(calculate_eager_depth_for_fanout(0, 4), 0); + assert_eq!(calculate_eager_depth_for_fanout(1, 4), 0); + assert_eq!(calculate_eager_depth_for_fanout(2, 4), 1); + assert_eq!(calculate_eager_depth_for_fanout(4, 4), 1); + assert_eq!(calculate_eager_depth_for_fanout(5, 4), 2); + assert_eq!(calculate_eager_depth_for_fanout(16, 4), 2); + assert_eq!(calculate_eager_depth_for_fanout(17, 4), 3); + assert_eq!(calculate_eager_depth_for_fanout(64, 4), 3); + } + + #[test] + fn test_select_eager_peers_honors_limit_and_exclusion() { + let peers = (0..6).map(|_| PeerId::random()).collect::>(); + let excluded = peers[2]; + + let selected = select_eager_peers(peers.clone(), Some(&excluded), 3); + + assert_eq!(selected.len(), 3); + assert!(!selected.contains(&excluded)); + + let all_selected = select_eager_peers(peers.clone(), Some(&excluded), peers.len()); + assert_eq!(all_selected.len(), peers.len() - 1); + assert!(!all_selected.contains(&excluded)); + } + + #[test] + fn test_scp_stream_message_round_trips_with_ttl() { + let envelope = test_scp_envelope_xdr(42); + let encoded = encode_scp_stream_message(&envelope, 7).unwrap(); + + let decoded = decode_scp_stream_message(&encoded).unwrap(); + assert_eq!( + decoded, + ScpStreamMessage::Scp { + envelope, + ttl: 7, + } + ); + } + + #[test] + fn test_scp_stream_message_legacy_scp_decodes_with_zero_ttl() { + let envelope = test_scp_envelope_xdr(43); + let encoded = crate::xdr::encode_scp_message(&envelope).unwrap(); + + let decoded = decode_scp_stream_message(&encoded).unwrap(); + assert_eq!( + decoded, + ScpStreamMessage::Scp { envelope, ttl: 0 } + ); + } + #[tokio::test] async fn test_overlay_creation() { let keypair = Keypair::generate_ed25519(); @@ -4028,6 +4394,10 @@ async fn test_pending_txset_cleanup_on_disconnect() { /// Test INV/GETDATA protocol: TX propagation via INV→GETDATA→TX flow #[tokio::test] async fn test_inv_getdata_tx_propagation() { + if TX_FLOOD_EAGER_MODE { + return; + } + let keypair1 = Keypair::generate_ed25519(); let keypair2 = Keypair::generate_ed25519(); @@ -4106,6 +4476,10 @@ async fn test_inv_getdata_tx_propagation() { /// Test INV/GETDATA protocol: TX relay through 3 nodes (A→B→C) #[tokio::test] async fn test_inv_getdata_three_node_relay() { + if TX_FLOOD_EAGER_MODE { + return; + } + let keypair1 = Keypair::generate_ed25519(); let keypair2 = Keypair::generate_ed25519(); let keypair3 = Keypair::generate_ed25519(); @@ -4394,7 +4768,7 @@ async fn test_scp_relay_no_echo_to_sender() { } assert!(node2_received, "Node2 should receive SCP from Node1"); - // Node2 relays - this should NOT send back to Node1 (already in scp_sent_to) + // A duplicate Core rebroadcast after overlay relay should not echo back to Node1. handle2.broadcast_scp(scp_msg.clone()).await; // Wait and verify Node1 does NOT receive an echo diff --git a/overlay/src/main.rs b/overlay/src/main.rs index f14c4f359..722af4386 100644 --- a/overlay/src/main.rs +++ b/overlay/src/main.rs @@ -1313,6 +1313,10 @@ impl App { cp.resolved.clear(); } + self.libp2p_handle + .set_known_peer_count(all_peers.len().saturating_sub(1)) + .await; + // Prune known_peers and peer_hostnames for peers whose // hostnames are no longer in the config. Prevents stale // entries from re-dialing removed peers.