Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 64 additions & 7 deletions overlay/src/flood/inv_messages.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
//! INV/GETDATA message types for bandwidth-efficient TX flooding.
//! TX stream message types for transaction flooding.
//!
//! Wire format: length-prefixed `StellarMessage` XDR.
//! Wire format: length-prefixed payloads. INV/GETDATA messages are encoded as
//! `StellarMessage` XDR. TX messages are encoded as a big-endian TTL followed by
//! `StellarMessage::Transaction` XDR; legacy raw transaction messages decode
//! with TTL 0.

use std::io;
use stellar_xdr::curr::{
FloodAdvert, FloodDemand, Hash, Limits, ReadXdr, StellarMessage, TxAdvertVector,
TxDemandVector, WriteXdr,
};

const TX_TTL_BYTES: usize = 4;

/// A single INV entry: hash + fee for prioritization
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InvEntry {
Expand Down Expand Up @@ -68,11 +73,18 @@ impl Default for GetData {
}
}

/// Full transaction data carried on the TX stream.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TxMessage {
pub tx: Vec<u8>,
pub ttl: u32,
}

/// Parsed TX stream message
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TxStreamMessage {
/// Full transaction data
Tx(Vec<u8>),
Tx(TxMessage),
/// Batch of INV announcements
InvBatch(InvBatch),
/// Request for transactions
Expand All @@ -83,8 +95,13 @@ impl TxStreamMessage {
/// Encode message as StellarMessage XDR.
pub fn encode(&self) -> io::Result<Vec<u8>> {
match self {
TxStreamMessage::Tx(data) => {
crate::xdr::encode_transaction_message_from_xdr(data).map_err(to_invalid_data)
TxStreamMessage::Tx(tx_message) => {
let message = crate::xdr::encode_transaction_message_from_xdr(&tx_message.tx)
.map_err(to_invalid_data)?;
let mut encoded = Vec::with_capacity(TX_TTL_BYTES + message.len());
encoded.extend_from_slice(&tx_message.ttl.to_be_bytes());
encoded.extend_from_slice(&message);
Comment on lines +101 to +103

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Preserve compatibility when sending TX frames

Because the TX stream protocol remains /stellar/tx/1.0.0, a mixed-version peer will accept this stream and then try to decode the payload as a StellarMessage. Prefixing every TX with the TTL makes upgraded nodes send bytes that older nodes cannot parse, so any rolling upgrade or connection to an unupgraded peer drops all TX responses/eager TXs even though this decoder still accepts legacy frames. Please either negotiate/bump the protocol or keep legacy TX encoding when talking to peers that have not advertised TTL support.

Useful? React with 👍 / 👎.

Ok(encoded)
}
TxStreamMessage::InvBatch(batch) => {
let hashes = batch
Expand All @@ -107,12 +124,34 @@ impl TxStreamMessage {

/// Decode StellarMessage XDR from the TX stream.
pub fn decode(data: &[u8]) -> io::Result<Self> {
let (ttl, data) = match StellarMessage::from_xdr(data, Limits::none()) {
Ok(StellarMessage::Transaction(envelope)) => {
let tx =
crate::xdr::canonical_transaction_xdr(envelope).map_err(to_invalid_data)?;
return Ok(TxStreamMessage::Tx(TxMessage { tx, ttl: 0 }));
}
Ok(message) => return Self::decode_stellar_message(message),
Comment on lines +127 to +133

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Decode TTL-prefixed TX frames before legacy messages

When the eager depth equals a valid MessageType discriminator, upgraded peers can drop eager TXs before the TTL is stripped. With the default fanout, any node with 5–16 peers broadcasts with ttl=2, and the first StellarMessage::from_xdr(data, ...) treats the TTL bytes as MessageType::Auth instead of falling through to the TTL-prefixed decode path, so decode_stellar_message rejects the frame as an unexpected TX-stream message. This affects normal upgraded-to-upgraded eager flooding on moderately connected nodes; check the TTL prefix first (or otherwise disambiguate it) rather than attempting a legacy StellarMessage parse on the whole buffer.

Useful? React with 👍 / 👎.

Err(err) => {
if data.len() < TX_TTL_BYTES {
return Err(to_invalid_data(err));
}
let ttl = u32::from_be_bytes(data[..TX_TTL_BYTES].try_into().unwrap());
(ttl, &data[TX_TTL_BYTES..])
}
};

match StellarMessage::from_xdr(data, Limits::none()).map_err(to_invalid_data)? {
StellarMessage::Transaction(envelope) => {
let tx =
crate::xdr::canonical_transaction_xdr(envelope).map_err(to_invalid_data)?;
Ok(TxStreamMessage::Tx(tx))
Ok(TxStreamMessage::Tx(TxMessage { tx, ttl }))
}
other => Self::decode_stellar_message(other),
}
}

fn decode_stellar_message(message: StellarMessage) -> io::Result<Self> {
match message {
StellarMessage::FloodAdvert(advert) => {
let entries = advert
.tx_hashes
Expand Down Expand Up @@ -148,13 +187,31 @@ mod tests {
#[test]
fn test_tx_stream_message_tx() {
let tx_data = valid_transaction_xdr(1000, 1, 1);
let msg = TxStreamMessage::Tx(tx_data.clone());
let msg = TxStreamMessage::Tx(TxMessage {
tx: tx_data.clone(),
ttl: 7,
});

let encoded = msg.encode().unwrap();
let decoded = TxStreamMessage::decode(&encoded).unwrap();
assert_eq!(msg, decoded);
}

#[test]
fn test_tx_stream_message_legacy_tx_decodes_with_zero_ttl() {
let tx_data = valid_transaction_xdr(1000, 2, 1);
let encoded = crate::xdr::encode_transaction_message_from_xdr(&tx_data).unwrap();

let decoded = TxStreamMessage::decode(&encoded).unwrap();
assert_eq!(
decoded,
TxStreamMessage::Tx(TxMessage {
tx: tx_data,
ttl: 0,
})
);
}

#[test]
fn test_tx_stream_message_inv_batch() {
let mut batch = InvBatch::new();
Expand Down
4 changes: 2 additions & 2 deletions overlay/src/flood/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! TX flooding module.
//!
//! Provides mempool management, TX set building, and INV/GETDATA flooding.
//! Provides mempool management, TX set building, and TX flooding messages.

mod inv_batcher;
mod inv_messages;
Expand All @@ -11,7 +11,7 @@ mod tx_buffer;
mod txset;

pub use inv_batcher::InvBatcher;
pub use inv_messages::{GetData, InvBatch, InvEntry, TxStreamMessage};
pub use inv_messages::{GetData, InvBatch, InvEntry, TxMessage, TxStreamMessage};
pub use inv_tracker::InvTracker;
pub use mempool::{Mempool, TxEntry};
pub use pending_requests::PendingRequests;
Expand Down
Loading
Loading