diff --git a/dash-spv/src/chain/chain_tip.rs b/dash-spv/src/chain/chain_tip.rs index 7e03d7b8b..9067f80be 100644 --- a/dash-spv/src/chain/chain_tip.rs +++ b/dash-spv/src/chain/chain_tip.rs @@ -1,309 +1,17 @@ -//! Chain tip management for tracking multiple blockchain tips -//! -//! This module manages multiple chain tips to support fork handling -//! and chain reorganization. +//! Fork-candidate carrier used by the staged-fork pipeline. use super::ChainWork; -use dashcore::{BlockHash, Header as BlockHeader}; -use std::collections::HashMap; - -/// Represents a chain tip with its metadata -#[derive(Debug, Clone, PartialEq)] -pub struct ChainTip { - /// The block hash of this tip - pub hash: BlockHash, - /// The height of this tip - pub height: u32, - /// The header at this tip - pub header: BlockHeader, - /// Cumulative chain work up to this tip - pub chain_work: ChainWork, - /// Whether this is currently the active (best) chain - pub is_active: bool, -} - -impl ChainTip { - /// Create a new chain tip - pub fn new(header: BlockHeader, height: u32, chain_work: ChainWork) -> Self { - Self { - hash: header.block_hash(), - height, - header, - chain_work, - is_active: false, - } - } -} - -/// Manages multiple chain tips for fork handling -pub struct ChainTipManager { - /// All known chain tips indexed by their hash - tips: HashMap, - /// The hash of the current active (best) chain tip - active_tip: Option, - /// Maximum number of tips to track - max_tips: usize, -} - -impl ChainTipManager { - /// Create a new chain tip manager - pub fn new(max_tips: usize) -> Self { - Self { - tips: HashMap::new(), - active_tip: None, - max_tips, - } - } - - /// Add a new chain tip - pub fn add_tip(&mut self, tip: ChainTip) -> Result<(), &'static str> { - let hash = tip.hash; - - // Check if we need to make space - if self.tips.len() >= self.max_tips && !self.tips.contains_key(&hash) { - self.evict_weakest_tip()?; - } - - self.tips.insert(hash, tip); - - // Update active tip if this has more work - self.update_active_tip(); - - Ok(()) - } - - /// Update a tip with a new header extending it - pub fn extend_tip( - &mut self, - tip_hash: &BlockHash, - header: BlockHeader, - new_work: ChainWork, - ) -> Result<(), &'static str> { - let new_height = { - let tip = self.tips.get(tip_hash).ok_or("Tip not found")?; - tip.height + 1 - }; - - let new_tip = ChainTip { - hash: header.block_hash(), - height: new_height, - header, - chain_work: new_work, - is_active: false, - }; - - // Store the old tip temporarily in case we need to restore it - let old_tip = self.tips.remove(tip_hash); - - // Attempt to add the new tip - match self.add_tip(new_tip) { - Ok(()) => Ok(()), - Err(e) => { - // Restore the old tip if adding the new one failed - if let Some(tip) = old_tip { - self.tips.insert(*tip_hash, tip); - } - Err(e) - } - } - } - - /// Get the current active (best) chain tip - pub fn get_active_tip(&self) -> Option<&ChainTip> { - self.active_tip.as_ref().and_then(|hash| self.tips.get(hash)) - } - - /// Get a specific tip by hash - pub fn get_tip(&self, hash: &BlockHash) -> Option<&ChainTip> { - self.tips.get(hash) - } - - /// Get all tips sorted by chain work (descending) - pub fn get_all_tips(&self) -> Vec<&ChainTip> { - let mut tips: Vec<_> = self.tips.values().collect(); - tips.sort_by_key(|t| std::cmp::Reverse(t.chain_work)); - tips - } - - /// Remove a tip - pub fn remove_tip(&mut self, hash: &BlockHash) -> Option { - let tip = self.tips.remove(hash); - - // If we removed the active tip, update to the next best - if self.active_tip.as_ref() == Some(hash) { - self.update_active_tip(); - } - - tip - } - - /// Check if a block hash is a known tip - pub fn is_tip(&self, hash: &BlockHash) -> bool { - self.tips.contains_key(hash) - } - - /// Get the number of tracked tips - pub fn tip_count(&self) -> usize { - self.tips.len() - } - - /// Update the active tip to the one with most work - fn update_active_tip(&mut self) { - // Clear active flag on all tips - for tip in self.tips.values_mut() { - tip.is_active = false; - } - - // Find tip with most work - let best_tip = - self.tips.iter().max_by_key(|(_, tip)| &tip.chain_work).map(|(hash, _)| *hash); - - if let Some(ref hash) = best_tip { - if let Some(tip) = self.tips.get_mut(hash) { - tip.is_active = true; - } - } - - self.active_tip = best_tip; - } - - /// Evict the tip with least work - fn evict_weakest_tip(&mut self) -> Result<(), &'static str> { - // Don't evict the active tip - let weakest = self - .tips - .iter() - .filter(|(hash, _)| self.active_tip.as_ref() != Some(hash)) - .min_by_key(|(_, tip)| &tip.chain_work) - .map(|(hash, _)| *hash); - - if let Some(hash) = weakest { - self.tips.remove(&hash); - Ok(()) - } else { - Err("Cannot evict: the only tip present is active") - } - } - - /// Clear all tips - pub fn clear(&mut self) { - self.tips.clear(); - self.active_tip = None; - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_tip_manager() { - let mut manager = ChainTipManager::new(5); - - // Add some tips with different work - for i in 0..3 { - let tip = ChainTip::dummy(i, i as u8); - manager.add_tip(tip).expect("Failed to add tip"); - } - - assert_eq!(manager.tip_count(), 3); - - // The tip with most work should be active - let active = manager.get_active_tip().expect("Should have an active tip"); - assert_eq!(active.height, 2); - assert!(active.is_active); - - // Add a tip with more work - let better_tip = ChainTip::dummy(1, 10); - manager.add_tip(better_tip).expect("Failed to add better tip"); - - // Active tip should update - let active = manager.get_active_tip().expect("Should have an active tip"); - assert_eq!(active.chain_work.as_bytes()[31], 10); - } - - #[test] - fn test_tip_eviction() { - let mut manager = ChainTipManager::new(2); - - // Fill to capacity - manager.add_tip(ChainTip::dummy(1, 5)).expect("Failed to add first tip"); - manager.add_tip(ChainTip::dummy(2, 10)).expect("Failed to add second tip"); - - // Adding another should evict the weakest - manager.add_tip(ChainTip::dummy(3, 7)).expect("Failed to add third tip"); - - assert_eq!(manager.tip_count(), 2); - - // The tip with work=5 should have been evicted - let tips = manager.get_all_tips(); - assert!(tips.iter().all(|t| t.chain_work.as_bytes()[31] >= 7)); - } - - #[test] - fn test_extend_tip_atomic() { - let mut manager = ChainTipManager::new(2); - - // Add two tips to fill capacity - let tip1 = ChainTip::dummy(1, 5); - let tip1_hash = tip1.hash; - manager.add_tip(tip1).expect("Failed to add tip1"); - - let tip2 = ChainTip::dummy(2, 10); - manager.add_tip(tip2).expect("Failed to add tip2"); - - // Extend tip1 successfully - since we remove tip1 first, there's room for the new tip - let new_header = ChainTip::dummy(3, 6).header; - let new_work = ChainWork::dummy(7); - - // The extend operation should succeed - let result = manager.extend_tip(&tip1_hash, new_header, new_work); - assert!(result.is_ok()); - - // The old tip should be gone - assert!(manager.get_tip(&tip1_hash).is_none()); - - // The new tip should exist - let new_tip_hash = new_header.block_hash(); - assert!(manager.get_tip(&new_tip_hash).is_some()); - assert_eq!(manager.tip_count(), 2); - } - - #[test] - fn test_extend_tip_atomic_with_failure() { - // To properly test atomic behavior, we need a custom scenario where add_tip can fail - // Since add_tip only fails when eviction fails (all tips are active), and only one - // tip can be active at a time, we need to test the restoration logic differently. - - // For now, we'll test that the extend operation is atomic when it succeeds - // A more complex test would require mocking or a different failure scenario - let mut manager = ChainTipManager::new(3); - - // Add three tips - let tip1 = ChainTip::dummy(1, 5); - let tip1_hash = tip1.hash; - manager.add_tip(tip1).expect("Failed to add tip1"); - - let tip2 = ChainTip::dummy(2, 10); - manager.add_tip(tip2).expect("Failed to add tip2"); - - let tip3 = ChainTip::dummy(3, 8); - manager.add_tip(tip3).expect("Failed to add tip3"); - - // Verify initial state - assert_eq!(manager.tip_count(), 3); - assert!(manager.get_tip(&tip1_hash).is_some()); - - // Extend tip1 - this should work and be atomic - let new_header = ChainTip::dummy(4, 6).header; - let new_work = ChainWork::dummy(6); - - let result = manager.extend_tip(&tip1_hash, new_header, new_work); - assert!(result.is_ok()); - - // Verify final state - old tip gone, new tip present - assert!(manager.get_tip(&tip1_hash).is_none()); - assert!(manager.get_tip(&new_header.block_hash()).is_some()); - assert_eq!(manager.tip_count(), 3); - } +use crate::types::HashedBlockHeader; + +/// A buffered fork branch that has been validated against the active chain. +/// +/// Carries the common-ancestor height in the active chain, the validated +/// headers that extend past that ancestor, and the resulting cumulative work +/// at the fork tip. A candidate is promoted once its `total_work` strictly +/// exceeds the active chain's work. +#[derive(Debug, Clone)] +pub(crate) struct ForkCandidate { + pub(crate) ancestor_height: u32, + pub(crate) headers: Vec, + pub(crate) total_work: ChainWork, } diff --git a/dash-spv/src/chain/chain_work.rs b/dash-spv/src/chain/chain_work.rs index 7a0bba655..fa4a2979c 100644 --- a/dash-spv/src/chain/chain_work.rs +++ b/dash-spv/src/chain/chain_work.rs @@ -38,25 +38,9 @@ impl ChainWork { } } - /// Create ChainWork from accumulated work at a given height plus a new header - /// - /// IMPORTANT: This is a temporary approximation that returns only the work from - /// the current header. For accurate cumulative work calculation, callers should - /// track the actual cumulative work by summing individual block work values. - /// - /// TODO: This function should be refactored to accept the previous cumulative work - /// as a parameter, or callers should maintain cumulative work separately. - pub fn from_height_and_header(_height: u32, header: &BlockHeader) -> Self { - // Currently returns only the work from the current header - // This is incorrect for cumulative work but better than adding height bytes - // which has no relation to proof-of-work - Self::from_header(header) - } - - /// Add the work from a header to this cumulative work - pub fn add_header(self, header: &BlockHeader) -> Self { - let header_work = Self::from_header(header); - self.combine(header_work) + /// Sum the per-header work of `headers` onto `base`. + pub(crate) fn accumulate(base: ChainWork, headers: &[BlockHeader]) -> ChainWork { + headers.iter().fold(base, |acc, h| acc.combine(Self::from_header(h))) } /// Add two ChainWork values diff --git a/dash-spv/src/chain/difficulty.rs b/dash-spv/src/chain/difficulty.rs new file mode 100644 index 000000000..7594985cd --- /dev/null +++ b/dash-spv/src/chain/difficulty.rs @@ -0,0 +1,272 @@ +//! Difficulty retargeting for Dash. +//! +//! Ports the Dark Gravity Wave v3 algorithm from `dashd`'s `pow.cpp`. Only the +//! DGW v3 branch is implemented because the staged-fork pipeline only ingests +//! forks at heights well past `nPowDGWHeight` on every network (34140 on +//! mainnet, 4001/4002 on testnet/regtest/devnet). + +use dashcore::consensus::Params; +use dashcore::pow::U256; +use dashcore::{CompactTarget, Header, Network, Target}; + +/// Number of blocks DGW v3 averages over. +const DGW_PAST_BLOCKS: u32 = 24; + +/// Dash target block spacing in seconds. +/// +/// `dashcore::consensus::Params` inherits Bitcoin's 600s default. Dash mainnet, +/// testnet, regtest, and devnet all run at 150s (2.5 minutes), so DGW retargets +/// against that value, not the Params field. +const DASH_TARGET_SPACING: u64 = 150; + +/// Compute the next nBits target using DGW v3. +/// +/// `previous_headers` must contain at least the most recent `DGW_PAST_BLOCKS` +/// headers in chain order (oldest first, `previous_headers.last()` being the +/// tip the new block will extend). The `tip_height` is the height of the last +/// entry. Returns `pow_limit` for heights below the DGW window per the dashd +/// pre-window short-circuit. +/// +/// Networks with retargeting disabled (`no_pow_retargeting`, regtest) or +/// without enough history return `pow_limit` directly, matching dashd's +/// behavior on those branches. +pub(crate) fn next_work_required_dgw_v3( + previous_headers: &[Header], + tip_height: u32, + params: &Params, +) -> CompactTarget { + let pow_limit_target = pow_limit_target(params.network); + let pow_limit_bits = pow_limit_target.to_compact_lossy(); + + if tip_height < DGW_PAST_BLOCKS { + return pow_limit_bits; + } + + if params.no_pow_retargeting { + return pow_limit_bits; + } + + // dashd's `fPowAllowMinDifficultyBlocks` branch (testnet/devnet/regtest) + // reverts to `pow_limit` when the candidate block is too far in the future + // of the tip. The staged-fork pipeline does not yet have the candidate + // block's own time at this call site, so it falls through to the standard + // DGW average, which is strictly stricter, never looser, than dashd's + // rule. Fork acceptance can therefore only be over-cautious on those + // networks, not under-cautious. + + if (previous_headers.len() as u32) < DGW_PAST_BLOCKS { + return pow_limit_bits; + } + + let window = &previous_headers[previous_headers.len() - DGW_PAST_BLOCKS as usize..]; + + let mut past_target_avg = U256::ZERO; + for (i, header) in window.iter().rev().enumerate() { + let count = (i + 1) as u64; + let target = U256::from_be_bytes(Target::from_compact(header.bits).to_be_bytes()); + if count == 1 { + past_target_avg = target; + } else { + // past_target_avg = (past_target_avg * count + target) / (count + 1) + past_target_avg = + (past_target_avg * U256::from(count) + target) / U256::from(count + 1); + } + } + + let last = window.last().expect("window length checked above"); + let first = window.first().expect("window length checked above"); + let actual = (last.time as i64 - first.time as i64).max(0) as u64; + let target_timespan = (DGW_PAST_BLOCKS as u64) * DASH_TARGET_SPACING; + + let actual_clamped = actual.max(target_timespan / 3).min(target_timespan.saturating_mul(3)); + + let mut bn_new = past_target_avg * U256::from(actual_clamped) / U256::from(target_timespan); + + let limit = U256::from_be_bytes(pow_limit_target.to_be_bytes()); + if bn_new > limit { + bn_new = limit; + } + + Target::from_be_bytes(bn_new.to_be_bytes()).to_compact_lossy() +} + +/// Check whether `candidate_time` triggers a min-difficulty exception for +/// networks where `allow_min_difficulty_blocks` is set (testnet, devnet). +/// +/// Mirrors dashd's `fPowAllowMinDifficultyBlocks` branch in `GetNextWorkRequired`: +/// - If candidate time > prev tip time + 2h: return `pow_limit` bits. +/// - If candidate time > prev tip time + 4×spacing: return prev tip bits * 10 +/// (clamped to `pow_limit`). +/// - Otherwise: return `None` (normal DGW applies). +pub(crate) fn min_difficulty_bits( + params: &Params, + prev_tip_time: u32, + prev_tip_bits: CompactTarget, + candidate_time: u32, +) -> Option { + if !params.allow_min_difficulty_blocks { + return None; + } + let pow_limit = pow_limit_target(params.network); + let pow_limit_bits = pow_limit.to_compact_lossy(); + let gap = (candidate_time as i64 - prev_tip_time as i64).max(0) as u64; + if gap > 2 * 60 * 60 { + return Some(pow_limit_bits); + } + if gap > DASH_TARGET_SPACING * 4 { + let prev_target = U256::from_be_bytes(Target::from_compact(prev_tip_bits).to_be_bytes()); + let limit = U256::from_be_bytes(pow_limit.to_be_bytes()); + // Clamp before multiplying to avoid 256-bit overflow: if prev_target is + // already at or above pow_limit / 10, the result caps to pow_limit. + let threshold = limit / U256::from(10_u64); + let result = if prev_target >= threshold { + limit + } else { + prev_target * U256::from(10_u64) + }; + return Some(Target::from_be_bytes(result.to_be_bytes()).to_compact_lossy()); + } + None +} + +fn pow_limit_target(network: Network) -> Target { + // dashcore stores network `pow_limit` as a `Work` value whose `to_target` + // inverse does not match dashd's `consensus.powLimit` for the network + // constants. Use the dashd values directly here so the DGW clamp uses the + // same upper bound the network enforces. + let mut bytes = [0u8; 32]; + match network { + // dashd mainnet/testnet: uint256S("00000fffffff...ff"), ~2^236. + Network::Mainnet | Network::Testnet => { + bytes[3] = 0x0f; + for b in bytes.iter_mut().skip(4) { + *b = 0xff; + } + } + // dashd regtest/devnet: uint256S("7fffffff...ff"), ~2^255. + Network::Regtest | Network::Devnet => { + bytes[0] = 0x7f; + for b in bytes.iter_mut().skip(1) { + *b = 0xff; + } + } + } + Target::from_be_bytes(bytes) +} + +#[cfg(test)] +mod tests { + use dashcore::block::Version; + use dashcore::{BlockHash, CompactTarget, Header, TxMerkleNode}; + use dashcore_hashes::Hash; + + use super::*; + + fn synthetic_header(bits: u32, time: u32) -> Header { + Header { + version: Version::ONE, + prev_blockhash: BlockHash::all_zeros(), + merkle_root: TxMerkleNode::all_zeros(), + time, + bits: CompactTarget::from_consensus(bits), + nonce: 0, + } + } + + fn pow_limit_compact(params: &Params) -> CompactTarget { + pow_limit_target(params.network).to_compact_lossy() + } + + #[test] + fn pow_limit_returned_below_window() { + let params = Params::new(Network::Mainnet); + let bits = next_work_required_dgw_v3(&[], 10, ¶ms); + assert_eq!(bits, pow_limit_compact(¶ms)); + } + + #[test] + fn pow_limit_returned_for_regtest_no_retargeting() { + let params = Params::new(Network::Regtest); + let window: Vec
= (0..DGW_PAST_BLOCKS) + .map(|i| synthetic_header(0x1d00ffff, 100 + i * DASH_TARGET_SPACING as u32)) + .collect(); + let bits = next_work_required_dgw_v3(&window, 100, ¶ms); + assert_eq!(bits, pow_limit_compact(¶ms)); + } + + #[test] + fn constant_difficulty_window_tightens_slightly() { + // 24 blocks at exactly target spacing. Because dashd's DGW only counts + // 23 intervals over 24 blocks, the new target trends slightly stricter + // (~23/24) even on a perfectly on-pace window. This is the documented + // bias in `pow.cpp`. We assert the direction and magnitude rather than + // strict equality. + let params = Params::new(Network::Mainnet); + let spacing = DASH_TARGET_SPACING as u32; + let bits = 0x1b0404cb_u32; + let window: Vec
= (0..DGW_PAST_BLOCKS) + .map(|i| synthetic_header(bits, 1_500_000_000 + i * spacing)) + .collect(); + + let next = next_work_required_dgw_v3(&window, 100_000, ¶ms); + let next_target = Target::from_compact(next); + let in_target = Target::from_compact(CompactTarget::from_consensus(bits)); + assert!( + next_target < in_target, + "DGW bias should yield a strictly tighter target on steady spacing" + ); + // The expected factor is 23/24, so the new target should be at least 90% of the old. + // We bound via a scaled comparison: in_target * 9 / 10 < next_target < in_target. + let scaled_floor = + U256::from_be_bytes(in_target.to_be_bytes()) * U256::from(9_u64) / U256::from(10_u64); + let next_u = U256::from_be_bytes(next_target.to_be_bytes()); + assert!( + next_u > scaled_floor, + "DGW bias should be modest (< 10%), got {:?} vs input {:?}", + next, + bits + ); + } + + #[test] + fn fast_blocks_raise_difficulty() { + // Blocks half the spacing apart should lower the target (raise difficulty). + let params = Params::new(Network::Mainnet); + let spacing = (DASH_TARGET_SPACING / 2) as u32; + let bits = 0x1b0404cb_u32; + let window: Vec
= (0..DGW_PAST_BLOCKS) + .map(|i| synthetic_header(bits, 1_500_000_000 + i * spacing)) + .collect(); + let next = next_work_required_dgw_v3(&window, 100_000, ¶ms); + + let next_target = Target::from_compact(next); + let prev_target = Target::from_compact(CompactTarget::from_consensus(bits)); + assert!( + next_target < prev_target, + "fast blocks should produce stricter target: next {:?} >= prev {:#x}", + next, + bits + ); + } + + #[test] + fn slow_blocks_lower_difficulty() { + // Blocks 2x apart should raise the target (lower difficulty). + let params = Params::new(Network::Mainnet); + let spacing = (DASH_TARGET_SPACING * 2) as u32; + let bits = 0x1b0404cb_u32; + let window: Vec
= (0..DGW_PAST_BLOCKS) + .map(|i| synthetic_header(bits, 1_500_000_000 + i * spacing)) + .collect(); + let next = next_work_required_dgw_v3(&window, 100_000, ¶ms); + + let next_target = Target::from_compact(next); + let prev_target = Target::from_compact(CompactTarget::from_consensus(bits)); + assert!( + next_target > prev_target, + "slow blocks should produce looser target: next {:?} <= prev {:#x}", + next, + bits + ); + } +} diff --git a/dash-spv/src/chain/mod.rs b/dash-spv/src/chain/mod.rs index 3231c70ea..527de1107 100644 --- a/dash-spv/src/chain/mod.rs +++ b/dash-spv/src/chain/mod.rs @@ -1,18 +1,13 @@ -//! Chain management module with reorganization support -//! -//! This module provides functionality for managing blockchain state including: -//! - Chain reorganization -//! - Multiple chain tip tracking -//! - Chain work calculation -//! - Transaction rollback during reorgs +//! Chain primitives: cumulative work, checkpoints, DGW v3 difficulty, and the fork-candidate carrier. -pub mod chain_tip; -pub mod chain_work; +pub(crate) mod chain_tip; +pub(crate) mod chain_work; pub mod checkpoints; +pub(crate) mod difficulty; #[cfg(test)] mod checkpoint_test; -pub use chain_tip::{ChainTip, ChainTipManager}; +pub(crate) use chain_tip::ForkCandidate; pub use chain_work::ChainWork; pub use checkpoints::{Checkpoint, CheckpointManager}; diff --git a/dash-spv/src/client/lifecycle.rs b/dash-spv/src/client/lifecycle.rs index 8c7d4f29f..67574665d 100644 --- a/dash-spv/src/client/lifecycle.rs +++ b/dash-spv/src/client/lifecycle.rs @@ -76,6 +76,7 @@ impl DashSpvClient NetworkResult<()> { + pub fn request_block_headers(&self, locator: Vec) -> NetworkResult<()> { self.send_message(NetworkMessage::GetHeaders(GetHeadersMessage::new( - vec![start_hash], + locator, BlockHash::all_zeros(), ))) } pub fn request_block_headers_from_peer( &self, - start_hash: BlockHash, + locator: Vec, address: SocketAddr, ) -> NetworkResult<()> { self.send_message_to_peer( - NetworkMessage::GetHeaders(GetHeadersMessage::new( - vec![start_hash], - BlockHash::all_zeros(), - )), + NetworkMessage::GetHeaders(GetHeadersMessage::new(locator, BlockHash::all_zeros())), address, ) } diff --git a/dash-spv/src/sync/block_headers/fork_buffer.rs b/dash-spv/src/sync/block_headers/fork_buffer.rs new file mode 100644 index 000000000..75440b306 --- /dev/null +++ b/dash-spv/src/sync/block_headers/fork_buffer.rs @@ -0,0 +1,568 @@ +//! Per-peer staged fork buffer. +//! +//! Buffers fork headers received from peers until either their cumulative +//! work exceeds the active chain (`take_winning_candidate`), they age out +//! (`expire_stale`), or the peer disconnects (`remove_peer`). +//! +//! All buffered branches are independently validated: each header must meet +//! its claimed PoW target, satisfy the median-time-past rule against the +//! ancestor history, and match DGW v3's expected `nBits` from the ancestor's +//! 24-block window. + +use std::collections::HashMap; +use std::net::SocketAddr; +use std::time::{Duration, Instant}; + +use dashcore::consensus::Params; +use dashcore::{BlockHash, Header}; + +use crate::chain::difficulty::{min_difficulty_bits, next_work_required_dgw_v3}; +use crate::chain::{ChainWork, ForkCandidate}; +use crate::error::{SyncError, SyncResult}; +use crate::types::HashedBlockHeader; + +/// Number of blocks median-time-past is computed over (BIP113). +const MTP_WINDOW: usize = 11; + +/// Cap on simultaneous fork branches buffered per peer to keep memory bounded. +pub(super) const MAX_FORK_HEADERS_PER_PEER: usize = 4096; + +/// Maximum distinct fork branch tips a single peer may contribute. +const MAX_BRANCHES_PER_PEER: usize = 16; + +#[derive(Debug)] +pub(super) struct ForkBuffer { + branches: HashMap<(SocketAddr, BlockHash), BufferedBranch>, + params: Params, +} + +#[derive(Debug)] +struct BufferedBranch { + headers: Vec, + ancestor_height: u32, + total_work: ChainWork, + arrived_at: Instant, +} + +impl ForkBuffer { + pub(super) fn new(params: Params) -> Self { + Self { + branches: HashMap::new(), + params, + } + } + + /// Validate and buffer a fork branch coming from `peer`. + /// + /// `headers` is the fork extension (oldest first, must connect to + /// `ancestor_header`). `history` contains the active-chain headers + /// preceding the ancestor in chain order, oldest first, with the ancestor + /// itself at `history.last()`. Used for MTP and DGW retarget anchoring. + pub(super) fn ingest( + &mut self, + peer: SocketAddr, + headers: &[Header], + ancestor_height: u32, + ancestor_header: Header, + history: &[Header], + ) -> SyncResult<()> { + if headers.is_empty() { + return Ok(()); + } + if headers.len() > MAX_FORK_HEADERS_PER_PEER { + return Err(SyncError::Validation(format!( + "Fork branch too large: {} headers (max {})", + headers.len(), + MAX_FORK_HEADERS_PER_PEER + ))); + } + let peer_branch_count = self.branches.keys().filter(|(p, _)| *p == peer).count(); + if peer_branch_count >= MAX_BRANCHES_PER_PEER { + return Err(SyncError::Validation(format!( + "Too many concurrent fork branches from peer {} (max {})", + peer, MAX_BRANCHES_PER_PEER + ))); + } + debug_assert_eq!( + history.last().map(|h| h.block_hash()), + Some(ancestor_header.block_hash()), + "history.last() must be the ancestor header" + ); + + // Chain continuity: each header must extend the previous. + let mut prev = ancestor_header; + let mut hashed: Vec = Vec::with_capacity(headers.len()); + let mut rolling_history: Vec
= history.to_vec(); + for (offset, header) in headers.iter().enumerate() { + let height = ancestor_height + offset as u32 + 1; + if header.prev_blockhash != prev.block_hash() { + return Err(SyncError::ForkChainBreak(format!( + "expected prev {}, got {}", + prev.block_hash(), + header.prev_blockhash + ))); + } + + // PoW target met. + let hashed_header = HashedBlockHeader::from(*header); + if !header.target().is_met_by(*hashed_header.hash()) { + return Err(SyncError::Validation(format!( + "Fork header at height {} failed PoW target", + height + ))); + } + + // Median time past: candidate time must strictly exceed MTP of + // last 11 ancestor headers. + let mtp = median_time_past(&rolling_history); + if (header.time as u64) <= mtp { + return Err(SyncError::Validation(format!( + "Fork header at height {} fails MTP rule ({} <= {})", + height, header.time, mtp + ))); + } + + // DGW v3 retarget anchored at the ancestor's window. + let expected_bits = + next_work_required_dgw_v3(&rolling_history, height - 1, &self.params); + let min_diff = min_difficulty_bits(&self.params, prev.time, prev.bits, header.time); + let bits_ok = header.bits == expected_bits || min_diff == Some(header.bits); + if !bits_ok { + return Err(SyncError::Validation(format!( + "Fork header at height {} bad bits: got {:?}, expected {:?}", + height, header.bits, expected_bits + ))); + } + + hashed.push(hashed_header); + rolling_history.push(*header); + prev = *header; + } + + let branch_work = ChainWork::accumulate(ChainWork::zero(), headers); + + let key = (peer, hashed.last().expect("non-empty fork branch").hash().to_owned()); + self.branches.insert( + key, + BufferedBranch { + headers: hashed, + ancestor_height, + total_work: branch_work, + arrived_at: Instant::now(), + }, + ); + + Ok(()) + } + + /// Drop branches older than `ttl`. Returns how many were evicted. + pub(super) fn expire_stale(&mut self, ttl: Duration) -> usize { + let now = Instant::now(); + let before = self.branches.len(); + self.branches.retain(|_, b| now.duration_since(b.arrived_at) <= ttl); + before - self.branches.len() + } + + /// Drop all buffered branches sourced from `peer`. + pub(super) fn remove_peer(&mut self, peer: SocketAddr) { + self.branches.retain(|(p, _), _| *p != peer); + } + + /// Take the buffered branch whose extension work strictly exceeds + /// `active_extension_work`. + /// + /// Caller supplies the cumulative work of the active chain's headers + /// from one past the candidate's ancestor up to the active tip. The + /// buffer returns a winner only when the fork's extension is heavier. + /// Phase 3 promotes the candidate by truncating storage at + /// `ancestor_height` and storing the fork headers. + pub(super) fn take_winning_candidate( + &mut self, + active_extension_work: ChainWork, + ) -> Option { + let winner_key = self.branches.iter().max_by_key(|(_, b)| b.total_work).map(|(k, _)| *k)?; + let branch = self.branches.remove(&winner_key)?; + if branch.total_work <= active_extension_work { + // Not a winner. Put it back to give future ingests a chance to + // extend the same branch. + self.branches.insert(winner_key, branch); + return None; + } + Some(ForkCandidate { + ancestor_height: branch.ancestor_height, + headers: branch.headers, + total_work: branch.total_work, + }) + } + + /// Return the set of branch tip hashes currently buffered. + pub(super) fn branch_tip_hashes(&self) -> impl Iterator { + self.branches.keys().map(|(_, tip)| tip) + } + + #[cfg(test)] + pub(super) fn len(&self) -> usize { + self.branches.len() + } +} + +fn median_time_past(history: &[Header]) -> u64 { + let window = if history.len() >= MTP_WINDOW { + &history[history.len() - MTP_WINDOW..] + } else { + history + }; + let mut times: Vec = window.iter().map(|h| h.time).collect(); + times.sort_unstable(); + times.get(times.len() / 2).copied().unwrap_or(0) as u64 +} + +#[cfg(test)] +mod tests { + use super::*; + use dashcore::block::Version; + use dashcore::{BlockHash, CompactTarget, Header, Network, TxMerkleNode}; + use dashcore_hashes::Hash; + + /// Regtest `pow_limit` (≈2^255) expressed in compact form. Easy enough + /// that most nonces satisfy PoW and matches DGW's expected next bits + /// when retargeting is disabled on regtest. + const EASY_BITS: u32 = 0x207fffff; + + fn easy_header(prev: BlockHash, time: u32, nonce_start: u32) -> Header { + // Iterate nonces until PoW passes. With 2^255 target ~half pass. + for nonce in nonce_start..nonce_start + 32 { + let header = Header { + version: Version::ONE, + prev_blockhash: prev, + merkle_root: TxMerkleNode::all_zeros(), + time, + bits: CompactTarget::from_consensus(EASY_BITS), + nonce, + }; + if header.target().is_met_by(header.block_hash()) { + return header; + } + } + panic!("could not find a valid nonce within 32 tries"); + } + + fn build_chain(start_time: u32, count: usize, start_prev: BlockHash) -> Vec
{ + let mut prev = start_prev; + let mut t = start_time; + let mut headers = Vec::with_capacity(count); + for n in 0..count { + // each header at +600s satisfies MTP easily + let header = easy_header(prev, t, (n as u32) * 32); + prev = header.block_hash(); + t += 600; + headers.push(header); + } + headers + } + + fn regtest_params() -> Params { + // Regtest has no_pow_retargeting = true, so DGW falls through to + // pow_limit which matches our easy bits when run through DGW's clamp. + // For these tests we use mainnet's params with allow_min skipped but + // override no_pow_retargeting via regtest. + Params::new(Network::Regtest) + } + + #[test] + fn ingest_accepts_pre_dgw_window_ancestor() { + // Fork ancestor below the DGW window (24 blocks). DGW short-circuits to + // pow_limit, and the fork header carries pow_limit bits, so ingest must + // succeed without demanding a full 24-block history. + let peer: SocketAddr = "1.2.3.4:9999".parse().unwrap(); + let mut buf = ForkBuffer::new(regtest_params()); + + let active = build_chain(1_700_000_000, 6, BlockHash::all_zeros()); + let ancestor_height = (active.len() as u32) - 1; + let ancestor = *active.last().unwrap(); + + let fork = build_chain(1_700_000_000 + 7 * 600, 2, ancestor.block_hash()); + + buf.ingest(peer, &fork, ancestor_height, ancestor, &active) + .expect("pre-DGW-window fork must be accepted"); + assert_eq!(buf.len(), 1); + } + + #[test] + fn ingest_validates_and_buffers_single_branch() { + let peer: SocketAddr = "1.2.3.4:9999".parse().unwrap(); + let mut buf = ForkBuffer::new(regtest_params()); + + // Active-chain history of 11 blocks so MTP works. + let active = build_chain(1_700_000_000, 11, BlockHash::all_zeros()); + let ancestor_height = (active.len() as u32) - 1; + let ancestor = *active.last().unwrap(); + + // Fork extension of 3 headers, must start after the ancestor. + let fork = build_chain(1_700_000_000 + 12 * 600, 3, ancestor.block_hash()); + + buf.ingest(peer, &fork, ancestor_height, ancestor, &active).expect("ingest"); + assert_eq!(buf.len(), 1); + + // Force a winner by passing zero active work. + let candidate = + buf.take_winning_candidate(ChainWork::zero()).expect("candidate should win"); + assert_eq!(candidate.ancestor_height, ancestor_height); + assert_eq!(candidate.headers.len(), 3); + assert_eq!(buf.len(), 0); + } + + #[test] + fn ingest_rejects_branch_with_bad_pow() { + // One fork header with bits set to the hardest possible compact target + // so that no X11 hash can satisfy PoW. The DGW bits mismatch is also + // present, but PoW fires first and we assert specifically on that error. + let peer: SocketAddr = "1.2.3.4:9999".parse().unwrap(); + let mut buf = ForkBuffer::new(regtest_params()); + + let active = build_chain(1_700_000_000, 11, BlockHash::all_zeros()); + let ancestor_height = (active.len() as u32) - 1; + let ancestor = *active.last().unwrap(); + + // 0x01000001 encodes target=1; no 32-byte X11 output can be ≤ 1. + let impossible_bits = CompactTarget::from_consensus(0x01000001); + let fork_header = Header { + version: Version::ONE, + prev_blockhash: ancestor.block_hash(), + merkle_root: TxMerkleNode::all_zeros(), + time: 1_700_000_000 + 12 * 600, + bits: impossible_bits, + nonce: 0, + }; + + let err = buf + .ingest(peer, &[fork_header], ancestor_height, ancestor, &active) + .expect_err("impossible PoW target must be rejected"); + assert!( + matches!(&err, SyncError::Validation(msg) if msg.contains("failed PoW target")), + "expected PoW failure, got: {:?}", + err + ); + assert_eq!(buf.len(), 0); + } + + #[test] + fn ingest_rejects_branch_with_wrong_bits() { + // One fork header with correct PoW nonce but wrong bits field (DGW mismatch + // only). Because the nonce was found for EASY_BITS and EASY_BITS is the + // regtest pow_limit, any change to bits leaves the nonce intact — the hash + // still meets the original target — so only the bits check fires. + let peer: SocketAddr = "1.2.3.4:9999".parse().unwrap(); + let mut buf = ForkBuffer::new(regtest_params()); + + let active = build_chain(1_700_000_000, 11, BlockHash::all_zeros()); + let ancestor_height = (active.len() as u32) - 1; + let ancestor = *active.last().unwrap(); + + let mut fork_header = easy_header(ancestor.block_hash(), 1_700_000_000 + 12 * 600, 0); + // Switch bits to 0x2100ffff (easier than EASY_BITS) so the existing + // nonce still satisfies PoW, but the bits value differs from the DGW + // expected output (0x207fffff on regtest). This isolates the DGW check. + let different_bits = CompactTarget::from_consensus(0x2100_ffff); + fork_header.bits = different_bits; + + let err = buf + .ingest(peer, &[fork_header], ancestor_height, ancestor, &active) + .expect_err("wrong bits must be rejected"); + assert!( + matches!(&err, SyncError::Validation(msg) if msg.contains("bad bits")), + "expected DGW bits failure, got: {:?}", + err + ); + assert_eq!(buf.len(), 0); + } + + #[test] + fn ingest_rejects_branch_with_stale_timestamp() { + // One fork header whose time equals the MTP of the ancestor window, + // violating the strictly-greater rule (time must be > MTP, not >=). + let peer: SocketAddr = "1.2.3.4:9999".parse().unwrap(); + let mut buf = ForkBuffer::new(regtest_params()); + + // Build 11 headers at regular 600s spacing. + let start_time = 1_700_000_000u32; + let active = build_chain(start_time, 11, BlockHash::all_zeros()); + let ancestor_height = (active.len() as u32) - 1; + let ancestor = *active.last().unwrap(); + + // Compute the MTP of the 11-block window. + let mut times: Vec = active.iter().map(|h| h.time).collect(); + times.sort_unstable(); + let mtp = times[times.len() / 2] as u64; + + // Build a fork header with time == mtp (must be strictly greater). + // Search nonces for one whose hash meets the easy target at this time. + let mut fork_header = None; + for nonce in 0u32..64 { + let h = Header { + version: Version::ONE, + prev_blockhash: ancestor.block_hash(), + merkle_root: TxMerkleNode::all_zeros(), + time: mtp as u32, + bits: CompactTarget::from_consensus(EASY_BITS), + nonce, + }; + if h.target().is_met_by(h.block_hash()) { + fork_header = Some(h); + break; + } + } + let fork_header = fork_header.expect("nonce search exhausted for MTP test"); + + let err = buf + .ingest(peer, &[fork_header], ancestor_height, ancestor, &active) + .expect_err("MTP violation must be rejected"); + assert!( + matches!(&err, SyncError::Validation(msg) if msg.contains("MTP rule")), + "expected MTP failure, got: {:?}", + err + ); + assert_eq!(buf.len(), 0); + } + + #[test] + fn two_peers_serving_same_fork_dedup_per_peer() { + let peer_a: SocketAddr = "1.2.3.4:9999".parse().unwrap(); + let peer_b: SocketAddr = "5.6.7.8:9999".parse().unwrap(); + let mut buf = ForkBuffer::new(regtest_params()); + + let active = build_chain(1_700_000_000, 11, BlockHash::all_zeros()); + let ancestor_height = (active.len() as u32) - 1; + let ancestor = *active.last().unwrap(); + let fork = build_chain(1_700_000_000 + 12 * 600, 3, ancestor.block_hash()); + + // Same fork (same hash chain) ingested twice from different peers. + buf.ingest(peer_a, &fork, ancestor_height, ancestor, &active).unwrap(); + buf.ingest(peer_b, &fork, ancestor_height, ancestor, &active).unwrap(); + // Keys differ by peer so both entries exist. + assert_eq!(buf.len(), 2); + // But the SAME fork tip hash is keyed under (peer, hash). A repeated + // ingest from the same peer with the same tip hash overwrites. + buf.ingest(peer_a, &fork, ancestor_height, ancestor, &active).unwrap(); + assert_eq!(buf.len(), 2); + } + + #[test] + fn expire_stale_drops_old_branches() { + let peer: SocketAddr = "1.2.3.4:9999".parse().unwrap(); + let mut buf = ForkBuffer::new(regtest_params()); + let active = build_chain(1_700_000_000, 11, BlockHash::all_zeros()); + let ancestor_height = (active.len() as u32) - 1; + let ancestor = *active.last().unwrap(); + let fork = build_chain(1_700_000_000 + 12 * 600, 3, ancestor.block_hash()); + buf.ingest(peer, &fork, ancestor_height, ancestor, &active).unwrap(); + + // TTL=0 expires immediately. + let evicted = buf.expire_stale(Duration::from_secs(0)); + assert_eq!(evicted, 1); + assert_eq!(buf.len(), 0); + } + + #[test] + fn remove_peer_clears_its_branches() { + let peer_a: SocketAddr = "1.2.3.4:9999".parse().unwrap(); + let peer_b: SocketAddr = "5.6.7.8:9999".parse().unwrap(); + let mut buf = ForkBuffer::new(regtest_params()); + let active = build_chain(1_700_000_000, 11, BlockHash::all_zeros()); + let ancestor_height = (active.len() as u32) - 1; + let ancestor = *active.last().unwrap(); + let fork = build_chain(1_700_000_000 + 12 * 600, 3, ancestor.block_hash()); + buf.ingest(peer_a, &fork, ancestor_height, ancestor, &active).unwrap(); + buf.ingest(peer_b, &fork, ancestor_height, ancestor, &active).unwrap(); + assert_eq!(buf.len(), 2); + buf.remove_peer(peer_a); + assert_eq!(buf.len(), 1); + } + + #[test] + fn ingest_rejects_peer_exceeding_branch_cap() { + let peer: SocketAddr = "1.2.3.4:9999".parse().unwrap(); + let mut buf = ForkBuffer::new(regtest_params()); + let active = build_chain(1_700_000_000, 11, BlockHash::all_zeros()); + let ancestor_height = (active.len() as u32) - 1; + let ancestor = *active.last().unwrap(); + + // Ingest MAX_BRANCHES_PER_PEER distinct branches (each with a unique tip). + for i in 0..MAX_BRANCHES_PER_PEER { + let fork_time = 1_700_000_000 + (12 + i as u32) * 600; + let branch = build_chain(fork_time, 1, ancestor.block_hash()); + buf.ingest(peer, &branch, ancestor_height, ancestor, &active) + .expect("ingest within cap should succeed"); + } + assert_eq!(buf.len(), MAX_BRANCHES_PER_PEER); + + // One more branch from the same peer must be rejected. + let extra_time = 1_700_000_000 + (12 + MAX_BRANCHES_PER_PEER as u32) * 600; + let extra = build_chain(extra_time, 1, ancestor.block_hash()); + let err = buf + .ingest(peer, &extra, ancestor_height, ancestor, &active) + .expect_err("17th branch must be rejected"); + assert!( + matches!(&err, SyncError::Validation(msg) if msg.contains("Too many concurrent")), + "expected branch-cap error, got: {:?}", + err + ); + assert_eq!(buf.len(), MAX_BRANCHES_PER_PEER, "buffer unchanged after rejection"); + } + + #[test] + fn ingest_rejects_chain_discontinuity_with_fork_chain_break() { + let peer: SocketAddr = "1.2.3.4:9999".parse().unwrap(); + let mut buf = ForkBuffer::new(regtest_params()); + + let active = build_chain(1_700_000_000, 11, BlockHash::all_zeros()); + let ancestor_height = (active.len() as u32) - 1; + let ancestor = *active.last().unwrap(); + + // Build a valid fork header but give it a wrong prev_blockhash so + // the chain-continuity check fires and returns ForkChainBreak. + let wrong_prev = BlockHash::from_slice(&[0xAB; 32]).unwrap(); + let disconnected_header = easy_header(wrong_prev, 1_700_000_000 + 12 * 600, 0); + + let err = buf + .ingest(peer, &[disconnected_header], ancestor_height, ancestor, &active) + .expect_err("disconnected chain must be rejected"); + assert!( + matches!(&err, SyncError::ForkChainBreak(_)), + "expected ForkChainBreak, got: {:?}", + err + ); + assert_eq!(buf.len(), 0); + } + + #[test] + fn ingest_accepts_valid_min_difficulty_fork_header() { + // Regtest has `allow_min_difficulty_blocks = true` and a pow_limit that + // matches `EASY_BITS`. A fork header whose time gap from its predecessor + // exceeds 4×spacing triggers the min-difficulty exception: `bits` may + // equal `min_difficulty_bits(...)` rather than the strict DGW output. + // This exercises the `min_diff == Some(header.bits)` acceptance branch + // that rejection tests do not reach. + let peer: SocketAddr = "1.2.3.4:9999".parse().unwrap(); + let params = regtest_params(); + let mut buf = ForkBuffer::new(params.clone()); + + let start_time = 1_700_000_000u32; + let active = build_chain(start_time, 11, BlockHash::all_zeros()); + let ancestor_height = (active.len() as u32) - 1; + let ancestor = *active.last().unwrap(); + + // Gap well above 4×DASH_TARGET_SPACING (150s) so the min-difficulty + // branch fires and returns the pow_limit bits, which equal `EASY_BITS`. + let fork_time = ancestor.time + 750; + let expected_min_bits = + min_difficulty_bits(¶ms, ancestor.time, ancestor.bits, fork_time) + .expect("gap > 4×spacing must produce min-difficulty bits"); + assert_eq!(expected_min_bits, CompactTarget::from_consensus(EASY_BITS)); + + let fork_header = easy_header(ancestor.block_hash(), fork_time, 0); + buf.ingest(peer, &[fork_header], ancestor_height, ancestor, &active) + .expect("min-difficulty fork header must be accepted"); + assert_eq!(buf.len(), 1); + } +} diff --git a/dash-spv/src/sync/block_headers/manager.rs b/dash-spv/src/sync/block_headers/manager.rs index 713699f7c..d75bd5a65 100644 --- a/dash-spv/src/sync/block_headers/manager.rs +++ b/dash-spv/src/sync/block_headers/manager.rs @@ -11,17 +11,19 @@ use std::net::SocketAddr; use std::sync::Arc; use std::time::Instant; -use crate::chain::CheckpointManager; +use crate::chain::{ChainWork, CheckpointManager, ForkCandidate}; use crate::error::{SyncError, SyncResult}; use crate::network::RequestSender; use crate::storage::{BlockHeaderStorage, BlockHeaderTip, MetadataStorage}; +use crate::sync::block_headers::fork_buffer::ForkBuffer; use crate::sync::block_headers::HeadersPipeline; use crate::sync::{BlockHeadersProgress, ProgressPercentage, SyncEvent, SyncManager, SyncState}; use crate::types::HashedBlockHeader; use crate::validation::{BlockHeaderValidator, Validator}; use dashcore::block::Header; +use dashcore::consensus::Params; use dashcore::network::message_blockdata::Inventory; -use dashcore::BlockHash; +use dashcore::{BlockHash, Network}; use tokio::sync::RwLock; /// Headers manager for downloading and validating block headers. @@ -45,6 +47,15 @@ pub struct BlockHeadersManager { /// Peers we've sent a GetHeaders to after sync, so Dash Core knows our tip /// and can send us header announcements instead of inv. pub(super) announced_peers: HashSet, + /// Per-peer buffer of fork branches awaiting promotion. + pub(super) fork_buffer: ForkBuffer, + /// Fork branch that has beaten the active chain on work and is ready for + /// promotion by the sync coordinator. + pending_fork_candidate: Option, + /// Maps the last-known fork branch tip hash to its ancestor height. + /// Populated whenever a fork batch is buffered so that subsequent batches + /// extending the same branch are routed to `ingest_fork` correctly. + fork_tip_index: HashMap, } impl std::fmt::Debug for BlockHeadersManager { @@ -62,6 +73,7 @@ impl BlockHeadersManager { header_storage: Arc>, metadata_storage: Arc>, checkpoint_manager: Arc, + network: Network, ) -> SyncResult { let tip = header_storage .read() @@ -88,9 +100,86 @@ impl BlockHeadersManager { pipeline: HeadersPipeline::new(checkpoint_manager), pending_announcements: HashMap::new(), announced_peers: HashSet::new(), + fork_buffer: ForkBuffer::new(Params::new(network)), + pending_fork_candidate: None, + fork_tip_index: HashMap::new(), }) } + /// Number of ancestor headers DGW v3 requires to compute next bits. + const DGW_HISTORY: u32 = 24; + + /// Consume the fork candidate set when a buffered branch overtook the + /// active chain. The sync coordinator calls this to perform the actual reorg. + pub(crate) fn take_pending_fork_candidate(&mut self) -> Option { + self.pending_fork_candidate.take() + } + + /// Buffer a fork extension whose ancestor is on the active chain at a + /// height strictly below the current tip. + async fn ingest_fork( + &mut self, + peer: SocketAddr, + headers: &[Header], + ancestor_height: u32, + ) -> SyncResult<()> { + let storage = self.header_storage.read().await; + // Mirror dashd's pre-DGW-window short-circuit: when the ancestor sits + // below `DGW_HISTORY`, DGW returns `pow_limit` regardless of the + // window contents, so we only need what storage actually has. + let pre_window = ancestor_height + 1 < Self::DGW_HISTORY; + let history_start = ancestor_height.saturating_sub(Self::DGW_HISTORY); + let history = storage.load_headers(history_start..ancestor_height + 1).await?; + if !pre_window { + let expected_len = (ancestor_height + 1 - history_start) as usize; + if history.len() != expected_len { + return Err(SyncError::Validation(format!( + "storage gap before ancestor at height {}: expected {} headers, got {}", + ancestor_height, + expected_len, + history.len() + ))); + } + } + let ancestor = *history.last().ok_or_else(|| { + SyncError::Validation(format!("missing ancestor header at height {}", ancestor_height)) + })?; + let tip_height = storage + .get_tip_height() + .await + .ok_or_else(|| SyncError::MissingDependency("no tip height".to_string()))?; + let active_extension = storage.load_headers(ancestor_height + 1..tip_height + 1).await?; + drop(storage); + + self.fork_buffer.ingest(peer, headers, ancestor_height, ancestor, &history)?; + + // Track the new fork tip so subsequent batches extending this branch + // can be routed here even though their prev_blockhash won't be found + // on the active chain. + if let Some(last) = headers.last() { + self.fork_tip_index.insert(last.block_hash(), ancestor_height); + } + + let active_extension_work = ChainWork::accumulate(ChainWork::zero(), &active_extension); + if let Some(candidate) = self.fork_buffer.take_winning_candidate(active_extension_work) { + tracing::info!( + "Fork candidate ready for promotion: ancestor={} headers={} (peer {})", + candidate.ancestor_height, + candidate.headers.len(), + peer + ); + self.pending_fork_candidate = Some(candidate); + self.prune_fork_tip_index(); + } + Ok(()) + } + + /// Remove `fork_tip_index` entries whose branch no longer exists in the buffer. + pub(super) fn prune_fork_tip_index(&mut self) { + let live_tips: HashSet = self.fork_buffer.branch_tip_hashes().copied().collect(); + self.fork_tip_index.retain(|tip, _| live_tips.contains(tip)); + } + pub(super) async fn tip(&self) -> SyncResult { self.header_storage .read() @@ -100,6 +189,38 @@ impl BlockHeadersManager { .ok_or_else(|| SyncError::MissingDependency("storage not initialized".to_string())) } + /// Build a Dash Core style block locator from the current storage tip. + /// + /// First 10 entries step back by 1, then the step doubles each entry, and + /// genesis is always the final entry. Used as the `getheaders` locator so + /// peers on a fork can find the most recent common ancestor. + pub(super) async fn build_locator(&self) -> SyncResult> { + let storage = self.header_storage.read().await; + let tip_height = storage + .get_tip_height() + .await + .ok_or_else(|| SyncError::MissingDependency("storage not initialized".to_string()))?; + + let mut locator = Vec::with_capacity(32); + let mut step: u32 = 1; + let mut height = tip_height; + loop { + if let Some(header) = storage.get_header(height).await? { + locator.push(header.block_hash()); + } else { + tracing::warn!("build_locator: header at height {} missing from storage", height); + } + if height == 0 { + break; + } + height = height.saturating_sub(step); + if locator.len() > 10 { + step = step.saturating_mul(2); + } + } + Ok(locator) + } + /// Validate and store headers batch. async fn store_headers(&mut self, headers: &[HashedBlockHeader]) -> SyncResult { debug_assert!(!headers.is_empty()); @@ -123,6 +244,7 @@ impl BlockHeadersManager { pub(super) async fn handle_headers_pipeline( &mut self, headers: &[Header], + peer: SocketAddr, requests: &RequestSender, ) -> SyncResult> { if !self.pipeline.is_initialized() { @@ -131,6 +253,44 @@ impl BlockHeadersManager { return Ok(vec![]); } + // Check whether the batch is a fork extension before the pipeline + // sees it. A fork extension's `prev_blockhash` is a known active + // header whose height is strictly less than our tip. + if let Some(first) = headers.first() { + let storage = self.header_storage.read().await; + let prev_height = storage.get_header_height_by_hash(&first.prev_blockhash).await?; + let tip_height = storage + .get_tip_height() + .await + .ok_or_else(|| SyncError::MissingDependency("no tip height".to_string()))?; + drop(storage); + + if let Some(prev_h) = prev_height { + if prev_h < tip_height { + self.ingest_fork(peer, headers, prev_h).await?; + return Ok(Vec::new()); + } + } else if let Some(&ancestor_height) = self.fork_tip_index.get(&first.prev_blockhash) { + // prev_blockhash is a fork tip, not on the active chain. + // Route continuation batches to the fork buffer using the + // same ancestor_height as the first batch. Chain-break errors + // are swallowed because the second batch anchors at the + // active-chain ancestor rather than the buffered fork tip; + // proper re-anchoring is deferred to Phase 3. + match self.ingest_fork(peer, headers, ancestor_height).await { + Ok(()) => {} + Err(SyncError::ForkChainBreak(msg)) => { + tracing::debug!( + "fork continuation chain break (deferred to Phase 3): {}", + msg + ); + } + Err(e) => return Err(e), + } + return Ok(Vec::new()); + } + } + let was_syncing = self.state() == SyncState::Syncing; let tip_was_complete = self.pipeline.is_tip_complete(); @@ -144,10 +304,13 @@ impl BlockHeadersManager { ); } - // Send more requests during initial sync or active post-sync catch-up. - // Skip for unsolicited headers. + // Send more requests during initial sync or active post-sync catch-up + // before processing ready batches so network and storage work overlap. + // During initial sync the segment tip has already advanced past storage + // so the storage-derived locator would never be selected; pass an empty + // slice and let `send_pending` use the single-entry fallback directly. if was_syncing || !tip_was_complete { - let sent = self.pipeline.send_pending(requests)?; + let sent = self.pipeline.send_pending(requests, &[])?; if sent > 0 { tracing::debug!("Pipeline sent {} more requests", sent); } @@ -199,7 +362,8 @@ impl BlockHeadersManager { self.pending_announcements.len() ); self.pipeline.reset_tip_segment(); - self.pipeline.send_pending(requests)?; + let locator = self.build_locator().await?; + self.pipeline.send_pending(requests, &locator)?; } else { // Synced to the tip and no pending announcements, finalize and emit event let tip = self.tip().await?; @@ -262,8 +426,11 @@ mod tests { use crate::storage::{ DiskStorageManager, PersistentBlockHeaderStorage, PersistentMetadataStorage, StorageManager, }; + use crate::sync::block_headers::fork_buffer::MAX_FORK_HEADERS_PER_PEER; use crate::sync::{ManagerIdentifier, SyncManager, SyncManagerProgress}; use dashcore::network::message::NetworkMessage; + use dashcore::{block::Version, CompactTarget, TxMerkleNode}; + use dashcore_hashes::Hash; use tokio::sync::mpsc::unbounded_channel; type TestBlockHeadersManager = @@ -279,9 +446,14 @@ mod tests { let genesis = Header::dummy_batch(0..1); storage.store_headers(&genesis).await.unwrap(); let checkpoint_manager = create_test_checkpoint_manager(); - BlockHeadersManager::new(storage.block_headers(), storage.metadata(), checkpoint_manager) - .await - .expect("Failed to create BlockHeadersManager") + BlockHeadersManager::new( + storage.block_headers(), + storage.metadata(), + checkpoint_manager, + Network::Testnet, + ) + .await + .expect("Failed to create BlockHeadersManager") } /// Create a manager in synced state with an initialized pipeline. @@ -347,8 +519,9 @@ mod tests { let (sender, mut rx) = create_test_request_sender(); let header = Header::dummy_chain(1, tip_hash).remove(0); + let peer: SocketAddr = "1.2.3.4:9999".parse().unwrap(); - let events = manager.handle_headers_pipeline(&[header], &sender).await.unwrap(); + let events = manager.handle_headers_pipeline(&[header], peer, &sender).await.unwrap(); // Header should have been stored assert_eq!(events.len(), 1); @@ -421,7 +594,8 @@ mod tests { // Active catch-up: peer connect skipped while pipeline has pending request let mut manager = create_synced_manager().await; manager.pipeline.reset_tip_segment(); - manager.pipeline.send_pending(&requests).unwrap(); + let locator = manager.build_locator().await.unwrap(); + manager.pipeline.send_pending(&requests, &locator).unwrap(); rx.try_recv().unwrap(); // drain the pipeline GetHeaders manager.handle_network_event(&connect, &requests).await.unwrap(); @@ -457,7 +631,8 @@ mod tests { // segment's current_tip_hash to advanced_hash. let header = Header::dummy_chain(1, initial_locator).remove(0); let advanced_hash = header.block_hash(); - manager.handle_headers_pipeline(&[header], &requests).await.unwrap(); + let peer: SocketAddr = "1.2.3.4:9999".parse().unwrap(); + manager.handle_headers_pipeline(&[header], peer, &requests).await.unwrap(); // Drain the follow-up GetHeaders that send_pending issued. match rx.try_recv().expect("follow-up GetHeaders not sent") { @@ -534,6 +709,254 @@ mod tests { assert!(rx.try_recv().is_err()); } + #[tokio::test] + async fn lagging_peer_sending_tip_extension_is_not_classified_as_fork() { + // A header whose prev_blockhash IS our tip (equal height, not strictly + // less) must flow through the normal pipeline path, never the fork + // buffer. This guards against treating slow peers (or our own next + // block arriving after a catch-up) as a reorg. + let mut manager = create_test_manager().await; + let tip = manager.tip().await.unwrap(); + let tip_hash = *tip.hash(); + + manager.pipeline.init(0, tip_hash, 0); + manager.pipeline.mark_tip_complete(); + manager.progress.set_state(SyncState::Synced); + + let (sender, _rx) = create_test_request_sender(); + let header = Header::dummy_chain(1, tip_hash).remove(0); + let peer: SocketAddr = "1.2.3.4:9999".parse().unwrap(); + + let events = manager.handle_headers_pipeline(&[header], peer, &sender).await.unwrap(); + + // Extension stored, no fork candidate generated. + assert_eq!(events.len(), 1); + assert!(matches!(events[0], SyncEvent::BlockHeadersStored { .. })); + assert!(manager.take_pending_fork_candidate().is_none()); + } + + #[tokio::test] + async fn test_build_locator_shape_matches_dashd_algorithm() { + // Build a 10K-block chain in storage and verify the locator follows + // the dashd algorithm: first 10 entries step back by 1, then the step + // doubles, and genesis is always included. + let mut storage = DiskStorageManager::with_temp_dir().await.unwrap(); + let chain = Header::dummy_chain(10_000, BlockHash::all_zeros()); + // First header in dummy_chain has prev = all_zeros (treat as genesis). + storage.store_headers(&chain).await.unwrap(); + let checkpoint_manager = create_test_checkpoint_manager(); + let manager = BlockHeadersManager::new( + storage.block_headers(), + storage.metadata(), + checkpoint_manager, + Network::Testnet, + ) + .await + .unwrap(); + + let locator = manager.build_locator().await.unwrap(); + let tip_height = (chain.len() - 1) as u32; + + // First entry equals the tip hash. + assert_eq!(locator[0], chain[tip_height as usize].block_hash()); + + // Reconstruct expected heights with the dashd algorithm. + let mut expected_heights: Vec = Vec::new(); + let mut step: u32 = 1; + let mut height = tip_height; + loop { + expected_heights.push(height); + if height == 0 { + break; + } + height = height.saturating_sub(step); + if expected_heights.len() > 10 { + step = step.saturating_mul(2); + } + } + + assert_eq!(locator.len(), expected_heights.len(), "locator length"); + for (i, h) in expected_heights.iter().enumerate() { + assert_eq!( + locator[i], + chain[*h as usize].block_hash(), + "locator[{}] should be hash at height {}", + i, + h + ); + } + + // Genesis is the final entry. + assert_eq!(*locator.last().unwrap(), chain[0].block_hash()); + + // Stays under the dashd ~32 entry bound. + assert!(locator.len() <= 32, "locator should not exceed 32 entries, got {}", locator.len()); + } + + /// Build a regtest manager seeded with `count` blocks so the storage tip is + /// at height `count - 1`. Returns the manager and the stored chain. + async fn create_regtest_manager_with_chain( + count: usize, + ) -> (TestBlockHeadersManager, Vec
) { + let mut storage = DiskStorageManager::with_temp_dir().await.unwrap(); + // Build a real hash-chained regtest chain using easy PoW so storage + // can index the hashes for `get_header_height_by_hash`. + let easy_bits = CompactTarget::from_consensus(0x207fffff); + let mut prev = BlockHash::all_zeros(); + let mut chain = Vec::with_capacity(count); + for i in 0..count { + let mut header = None; + for nonce in 0u32..64 { + let h = Header { + version: Version::ONE, + prev_blockhash: prev, + merkle_root: TxMerkleNode::all_zeros(), + time: 1_700_000_000 + i as u32 * 600, + bits: easy_bits, + nonce, + }; + if h.target().is_met_by(h.block_hash()) { + header = Some(h); + break; + } + } + let h = header.expect("nonce space exhausted"); + prev = h.block_hash(); + chain.push(h); + } + storage.store_headers(&chain).await.unwrap(); + let checkpoint_manager = Arc::new(CheckpointManager::new(vec![])); + let manager = BlockHeadersManager::new( + storage.block_headers(), + storage.metadata(), + checkpoint_manager, + Network::Regtest, + ) + .await + .expect("failed to create regtest manager"); + (manager, chain) + } + + #[tokio::test] + async fn fork_header_at_depth_is_routed_to_buffer() { + // Store a 5-block chain (heights 0-4). Build a fork extending height 1 + // (depth 3 below tip=4). The fork must be routed to the fork buffer, + // not the pipeline. Routing is confirmed by the return being empty + // events and the fork buffer holding one entry. + let easy_bits = CompactTarget::from_consensus(0x207fffff); + + let (mut manager, chain) = create_regtest_manager_with_chain(5).await; + let tip = manager.tip().await.unwrap(); + let tip_hash = *tip.hash(); + + manager.pipeline.init(0, tip_hash, tip.height()); + manager.pipeline.mark_tip_complete(); + manager.progress.set_state(SyncState::Synced); + + let peer: SocketAddr = "1.2.3.4:9999".parse().unwrap(); + let (sender, _rx) = create_test_request_sender(); + + // Build one valid fork header extending chain[1] (height 1, depth 3). + let ancestor = chain[1]; + let fork_time = ancestor.time + 11 * 600 + 1; + let mut fork_header = None; + for nonce in 0u32..64 { + let h = Header { + version: Version::ONE, + prev_blockhash: ancestor.block_hash(), + merkle_root: TxMerkleNode::all_zeros(), + time: fork_time, + bits: easy_bits, + nonce, + }; + if h.target().is_met_by(h.block_hash()) { + fork_header = Some(h); + break; + } + } + let fork_header = fork_header.expect("nonce space exhausted"); + + let events = manager.handle_headers_pipeline(&[fork_header], peer, &sender).await.unwrap(); + + // Fork path returns no events, not the pipeline path. + assert!(events.is_empty()); + // Branch entered the buffer. + assert_eq!(manager.fork_buffer.len(), 1); + assert!(manager.take_pending_fork_candidate().is_none()); + + // Second batch extending the fork: prev_blockhash is the first fork header's hash, + // which is not on the active chain. The fork_tip_index must route it into + // ingest_fork rather than silently dropping it as an unmatched pipeline batch. + let fork_tip = fork_header.block_hash(); + let second_fork_time = fork_time + 600; + let mut second_fork_header = None; + for nonce in 0u32..64 { + let h = Header { + version: Version::ONE, + prev_blockhash: fork_tip, + merkle_root: TxMerkleNode::all_zeros(), + time: second_fork_time, + bits: easy_bits, + nonce, + }; + if h.target().is_met_by(h.block_hash()) { + second_fork_header = Some(h); + break; + } + } + let second_fork_header = second_fork_header.expect("nonce space exhausted"); + + // The continuation batch is routed through fork_tip_index to ingest_fork. + // The ingest fails the chain-continuity check (second batch anchors at the + // active-chain ancestor, not the buffered fork tip), but the ForkChainBreak + // error is swallowed so the peer is not penalized. The fork buffer stays + // at one branch; proper multi-batch continuation is deferred to Phase 3. + let result2 = manager.handle_headers_pipeline(&[second_fork_header], peer, &sender).await; + assert!(result2.is_ok(), "continuation must not propagate error to caller: {:?}", result2); + assert_eq!( + manager.fork_buffer.len(), + 1, + "fork buffer must not grow (second ingest fails chain-continuity)" + ); + } + + #[tokio::test] + async fn fork_continuation_non_fork_chain_break_error_propagates() { + let (mut manager, _chain) = create_regtest_manager_with_chain(5).await; + let tip = manager.tip().await.unwrap(); + let tip_hash = *tip.hash(); + manager.pipeline.init(0, tip_hash, tip.height()); + manager.pipeline.mark_tip_complete(); + manager.progress.set_state(SyncState::Synced); + + let fake_fork_tip = BlockHash::from_slice(&[0xAB; 32]).unwrap(); + manager.fork_tip_index.insert(fake_fork_tip, 1); + + // Craft a batch that exceeds MAX_FORK_HEADERS_PER_PEER (4096) so + // fork_buffer.ingest returns SyncError::Validation("Fork branch too + // large") before the chain-break check. This is not a ForkChainBreak + // error and must reach the Err(e) => return Err(e) arm. + let oversized_header = Header { + version: Version::ONE, + prev_blockhash: fake_fork_tip, + merkle_root: TxMerkleNode::all_zeros(), + time: 1_700_000_000, + bits: CompactTarget::from_consensus(0x207fffff), + nonce: 0, + }; + let oversized_batch = vec![oversized_header; MAX_FORK_HEADERS_PER_PEER + 1]; + + let peer: SocketAddr = "1.2.3.4:9999".parse().unwrap(); + let (sender, _rx) = create_test_request_sender(); + + let result = manager.handle_headers_pipeline(&oversized_batch, peer, &sender).await; + assert!( + matches!(&result, Err(SyncError::Validation(_))), + "non-ForkChainBreak error must propagate from fork continuation: {:?}", + result + ); + } + #[tokio::test] async fn test_empty_headers_after_tip_announcement_is_harmless() { let mut manager = create_synced_manager().await; @@ -549,11 +972,54 @@ mod tests { rx.try_recv().unwrap(); // drain the GetHeaders request // Peer responds with empty headers (same height as us) - let events = manager.handle_headers_pipeline(&[], &requests).await.unwrap(); + let events = manager.handle_headers_pipeline(&[], addr, &requests).await.unwrap(); // No events emitted, no requests sent, tip segment stays complete assert!(events.is_empty()); assert!(rx.try_recv().is_err()); assert!(manager.pipeline.is_tip_complete()); } + + #[tokio::test] + async fn tick_retries_sync_with_single_entry_locator_before_first_response() { + // Simulate a timeout before any headers response arrives: the pipeline is + // initialized but no headers have been received yet, so the segment's + // current_tip_hash still equals the storage tip. The tick handler must + // still send a GetHeaders using the single-entry locator (segment tip), + // not silently skip because no in-flight request exists. + let mut manager = create_test_manager().await; + let tip = manager.tip().await.unwrap(); + let tip_hash = *tip.hash(); + + let initial_event = NetworkEvent::PeersUpdated { + connected_count: 1, + best_height: Some(40_000), + addresses: vec![], + }; + let (requests, mut rx) = create_test_request_sender(); + manager.handle_network_event(&initial_event, &requests).await.unwrap(); + assert_eq!(manager.state(), SyncState::Syncing); + + // Drain the initial GetHeaders from start_sync. + rx.try_recv().expect("start_sync must send initial GetHeaders"); + assert!(rx.try_recv().is_err()); + + // Simulate a timeout: clear the in-flight request so send_pending can retry, + // then fire tick as if the 30-second request timeout had elapsed. + manager.pipeline.clear_in_flight(); + manager.tick(&requests).await.unwrap(); + + // Tick must have issued a GetHeaders whose locator starts from the + // storage tip (segment tip == storage tip before the first response). + let msg = rx.try_recv().expect("tick must send retry GetHeaders"); + match msg { + NetworkRequest::SendMessage(NetworkMessage::GetHeaders(m)) => { + assert_eq!( + m.locator_hashes[0], tip_hash, + "retry locator must start at the storage tip" + ); + } + other => panic!("expected GetHeaders, got {:?}", other), + } + } } diff --git a/dash-spv/src/sync/block_headers/mod.rs b/dash-spv/src/sync/block_headers/mod.rs index 2493e9d40..020623852 100644 --- a/dash-spv/src/sync/block_headers/mod.rs +++ b/dash-spv/src/sync/block_headers/mod.rs @@ -1,3 +1,4 @@ +mod fork_buffer; mod manager; mod pipeline; mod progress; diff --git a/dash-spv/src/sync/block_headers/pipeline.rs b/dash-spv/src/sync/block_headers/pipeline.rs index cce5baca9..d311e7c7a 100644 --- a/dash-spv/src/sync/block_headers/pipeline.rs +++ b/dash-spv/src/sync/block_headers/pipeline.rs @@ -118,16 +118,33 @@ impl HeadersPipeline { } /// Send pending requests for active segments. + /// + /// `tip_locator` is used for the tip segment (target_height = None) when + /// its first entry matches the segment's current tip hash, so the peer can + /// find a common ancestor when its chain has forked. Mid-sync, the segment + /// advances in memory ahead of storage so the storage-derived locator + /// would not match. In that case, and for checkpoint segments, fall back + /// to a single-entry locator anchored at the segment's current tip hash. /// Returns the number of requests sent. - pub fn send_pending(&mut self, requests: &RequestSender) -> SyncResult { + pub fn send_pending( + &mut self, + requests: &RequestSender, + tip_locator: &[BlockHash], + ) -> SyncResult { let mut sent = 0; for segment in &mut self.segments { - // Skip completed segments if segment.complete { continue; } while segment.can_send() { - segment.send_request(requests)?; + let locator = if segment.target_height.is_none() + && tip_locator.first() == Some(&segment.current_tip_hash) + { + tip_locator.to_vec() + } else { + vec![segment.current_tip_hash] + }; + segment.send_request(requests, locator)?; sent += 1; } } @@ -407,7 +424,7 @@ mod tests { let (sender, mut rx) = create_test_request_sender(); - let sent = pipeline.send_pending(&sender).unwrap(); + let sent = pipeline.send_pending(&sender, &[]).unwrap(); // Should send at least one request per segment assert!(sent >= pipeline.segment_count()); @@ -626,4 +643,53 @@ mod tests { pipeline.mark_tip_complete(); assert!(pipeline.is_tip_complete()); } + + #[test] + fn send_pending_uses_full_locator_when_storage_matches_tip_and_single_entry_otherwise() { + use crate::network::NetworkRequest; + use dashcore::network::message::NetworkMessage; + + let tip_hash = BlockHash::dummy(77); + let mut tip_seg = SegmentState::new(0, 100, tip_hash, None, None); + tip_seg.current_tip_hash = tip_hash; + + let cm = create_test_checkpoint_manager(true); + let mut pipeline = HeadersPipeline::new(cm); + pipeline.initialized = true; + pipeline.segments = vec![tip_seg]; + + let (sender, mut rx) = create_test_request_sender(); + let full_locator = vec![tip_hash, BlockHash::dummy(1), BlockHash::dummy(0)]; + + // Storage-caught-up case: tip_locator[0] == segment.current_tip_hash. + // Expect the full locator to be sent. + pipeline.send_pending(&sender, &full_locator).unwrap(); + match rx.try_recv().unwrap() { + NetworkRequest::SendMessage(NetworkMessage::GetHeaders(msg)) => { + assert_eq!( + msg.locator_hashes, full_locator, + "full locator when storage matches segment tip" + ); + } + other => panic!("unexpected request: {:?}", other), + } + + // Mid-sync case: segment has advanced past storage, so tip_locator[0] + // no longer matches segment.current_tip_hash. Expect single-entry fallback. + let advanced_hash = BlockHash::dummy(88); + pipeline.segments[0].current_tip_hash = advanced_hash; + pipeline.segments[0].clear_in_flight(); + + pipeline.send_pending(&sender, &full_locator).unwrap(); + match rx.try_recv().unwrap() { + NetworkRequest::SendMessage(NetworkMessage::GetHeaders(msg)) => { + assert_eq!( + msg.locator_hashes, + vec![advanced_hash], + "single-entry fallback when storage lags segment tip" + ); + } + other => panic!("unexpected request: {:?}", other), + } + } } diff --git a/dash-spv/src/sync/block_headers/segment_state.rs b/dash-spv/src/sync/block_headers/segment_state.rs index 9829d1250..8339c4ed6 100644 --- a/dash-spv/src/sync/block_headers/segment_state.rs +++ b/dash-spv/src/sync/block_headers/segment_state.rs @@ -63,9 +63,22 @@ impl SegmentState { !self.complete && !self.coordinator.is_in_flight(&self.current_tip_hash) } - /// Send a GetHeaders request for this segment. - pub(super) fn send_request(&mut self, requests: &RequestSender) -> SyncResult<()> { - requests.request_block_headers(self.current_tip_hash)?; + /// Send a GetHeaders request for this segment using the provided locator. + /// + /// The locator's first entry must equal `current_tip_hash`. The full + /// locator lets peers on forks find the most recent common ancestor. + pub(super) fn send_request( + &mut self, + requests: &RequestSender, + locator: Vec, + ) -> SyncResult<()> { + debug_assert!( + !locator.is_empty() && locator[0] == self.current_tip_hash, + "segment {} locator must start at current_tip_hash {}", + self.segment_id, + self.current_tip_hash + ); + requests.request_block_headers(locator)?; self.coordinator.mark_sent(&[self.current_tip_hash]); tracing::debug!( "Segment {}: sent GetHeaders from height {} hash {}", diff --git a/dash-spv/src/sync/block_headers/sync_manager.rs b/dash-spv/src/sync/block_headers/sync_manager.rs index d47e2cfe8..73f1d6567 100644 --- a/dash-spv/src/sync/block_headers/sync_manager.rs +++ b/dash-spv/src/sync/block_headers/sync_manager.rs @@ -14,6 +14,9 @@ use std::time::{Duration, Instant}; /// Timeout waiting for unsolicited header messages after a block announcement. pub(super) const UNSOLICITED_HEADERS_WAIT_TIMEOUT: Duration = Duration::from_secs(3); +/// Maximum age of a staged fork branch before it is dropped from the buffer. +pub(super) const FORK_BUFFER_TTL: Duration = Duration::from_secs(60); + #[async_trait] impl SyncManager for BlockHeadersManager { fn identifier(&self) -> ManagerIdentifier { @@ -78,7 +81,8 @@ impl SyncManager for BlockHeadersMana } // Send initial batch of requests - let sent = self.pipeline.send_pending(requests)?; + let locator = self.build_locator().await?; + let sent = self.pipeline.send_pending(requests, &locator)?; tracing::info!("Pipeline: sent {} initial requests", sent); Ok(vec![SyncEvent::SyncStart { @@ -94,7 +98,8 @@ impl SyncManager for BlockHeadersMana match msg.inner() { NetworkMessage::Headers(headers) => { // Always route through pipeline when initialized - self.handle_headers_pipeline(headers, requests).await + let peer = msg.peer_address(); + self.handle_headers_pipeline(headers, peer, requests).await } NetworkMessage::Inv(inv) => { @@ -121,10 +126,29 @@ impl SyncManager for BlockHeadersMana } self.pipeline.handle_timeouts(); + let evicted = self.fork_buffer.expire_stale(FORK_BUFFER_TTL); + if evicted > 0 { + tracing::debug!("Expired {} stale fork branches", evicted); + self.prune_fork_tip_index(); + } + if let Some(candidate) = self.take_pending_fork_candidate() { + // Reorg promotion is not yet wired into the coordinator. Log here + // so detection can be confirmed end-to-end before that lands. + tracing::warn!( + "Fork candidate ready (ancestor={} headers={} work_bytes={:?}), \ + reorg promotion not yet wired", + candidate.ancestor_height, + candidate.headers.len(), + candidate.total_work.as_bytes() + ); + } - // During initial sync, send more requests and log progress + // During initial sync, send more requests and log progress. + // The segment tip is always ahead of storage during active sync so the + // storage-derived locator would never be selected; pass an empty slice + // and let `send_pending` use the single-entry fallback directly. if self.state() == SyncState::Syncing { - let sent = self.pipeline.send_pending(requests)?; + let sent = self.pipeline.send_pending(requests, &[])?; if sent > 0 { tracing::debug!("Tick: pipeline sent {} more requests", sent); } @@ -152,7 +176,8 @@ impl SyncManager for BlockHeadersMana // Reset tip segment and send requests via pipeline self.pipeline.reset_tip_segment(); - self.pipeline.send_pending(requests)?; + let locator = self.build_locator().await?; + self.pipeline.send_pending(requests, &locator)?; for hash in stale { self.pending_announcements.remove(&hash); @@ -182,8 +207,9 @@ impl SyncManager for BlockHeadersMana && !self.pipeline.tip_segment_has_pending_request() { let tip = self.tip().await?; + let locator = self.build_locator().await?; tracing::info!("Announcing tip {} to new peer {}", tip.height(), address); - requests.request_block_headers_from_peer(*tip.hash(), *address)?; + requests.request_block_headers_from_peer(locator, *address)?; self.announced_peers.insert(*address); } } @@ -191,6 +217,8 @@ impl SyncManager for BlockHeadersMana address, } => { self.announced_peers.remove(address); + self.fork_buffer.remove_peer(*address); + self.prune_fork_tip_index(); } NetworkEvent::PeersUpdated { connected_count, @@ -221,7 +249,8 @@ impl SyncManager for BlockHeadersMana ); // Reset tip segment and send requests via pipeline self.pipeline.reset_tip_segment(); - self.pipeline.send_pending(requests)?; + let locator = self.build_locator().await?; + self.pipeline.send_pending(requests, &locator)?; } } } diff --git a/dash-spv/src/test_utils/chain_tip.rs b/dash-spv/src/test_utils/chain_tip.rs deleted file mode 100644 index 8890071df..000000000 --- a/dash-spv/src/test_utils/chain_tip.rs +++ /dev/null @@ -1,12 +0,0 @@ -use dashcore::Header; - -use crate::chain::{ChainTip, ChainWork}; - -impl ChainTip { - pub fn dummy(height: u32, work_value: u8) -> ChainTip { - let header = Header::dummy(height); - let chain_work = ChainWork::dummy(work_value); - - ChainTip::new(header, height, chain_work) - } -} diff --git a/dash-spv/src/test_utils/mod.rs b/dash-spv/src/test_utils/mod.rs index 61acedda0..75c734d1c 100644 --- a/dash-spv/src/test_utils/mod.rs +++ b/dash-spv/src/test_utils/mod.rs @@ -1,4 +1,3 @@ -mod chain_tip; mod chain_work; mod checkpoint; mod context; diff --git a/dash/src/pow.rs b/dash/src/pow.rs index 06456db69..d1dfe4184 100644 --- a/dash/src/pow.rs +++ b/dash/src/pow.rs @@ -314,18 +314,19 @@ impl Decodable for CompactTarget { // (high, low): u.0 contains the high bits, u.1 contains the low bits. #[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Default)] #[cfg_attr(feature = "bincode", derive(Encode, Decode))] -struct U256(u128, u128); +pub struct U256(u128, u128); impl U256 { const MAX: U256 = U256(0xffff_ffff_ffff_ffff_ffff_ffff_ffff_ffff, 0xffff_ffff_ffff_ffff_ffff_ffff_ffff_ffff); - const ZERO: U256 = U256(0, 0); + /// The zero value. + pub const ZERO: U256 = U256(0, 0); const ONE: U256 = U256(0, 1); /// Creates [`U256`] from a big-endian array of `u8`s. - fn from_be_bytes(a: [u8; 32]) -> U256 { + pub fn from_be_bytes(a: [u8; 32]) -> U256 { let (high, low) = split_in_half(a); let big = u128::from_be_bytes(high); let little = u128::from_be_bytes(low); @@ -341,7 +342,7 @@ impl U256 { } /// Converts `Self` to a big-endian array of `u8`s. - fn to_be_bytes(self) -> [u8; 32] { + pub fn to_be_bytes(self) -> [u8; 32] { let mut out = [0; 32]; out[..16].copy_from_slice(&self.0.to_be_bytes()); out[16..].copy_from_slice(&self.1.to_be_bytes());