From 0b0cd67da5ba3643fcfbfea17da691f0fb0e6cd9 Mon Sep 17 00:00:00 2001 From: Alexander Shevtsov Date: Tue, 28 Apr 2026 18:12:07 +0200 Subject: [PATCH 01/12] Add probing service Introduce a background probing service that periodically sends payment probes to discover liquidity along Lightning routes. Probes update the local scorer with channel liquidity information, improving pathfinding for subsequent real payments. The service supports three strategies: - HighDegree: probes nodes with the most channels in the network graph - Random: walks random paths from the local node - Custom: user-supplied strategy via the `ProbingStrategy` trait A dedicated `ProbingConfigBuilder` exposes amount bounds, locked-msat caps, probing intervals, and per-node cooldowns, with sensible defaults. The service runs as a cancellable background task driven by the existing `Runtime`, and budget accounting tracks both in-flight and locked amounts to bound outbound liquidity exposure. UniFFI bindings expose the probing service to the Swift, Kotlin, and Python language bindings. Co-Authored-By: Claude Sonnet 4.6 --- bindings/ldk_node.udl | 5 + src/builder.rs | 82 ++++- src/config.rs | 6 + src/event.rs | 29 +- src/lib.rs | 21 ++ src/probing.rs | 749 ++++++++++++++++++++++++++++++++++++++++++ src/util.rs | 37 +++ 7 files changed, 916 insertions(+), 13 deletions(-) create mode 100644 src/probing.rs create mode 100644 src/util.rs diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 7368b0291..beb81efa2 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -13,6 +13,10 @@ typedef dictionary TorConfig; typedef interface NodeEntropy; +typedef interface ProbingConfig; + +typedef interface ProbingConfigBuilder; + typedef enum WordCount; [Remote] @@ -61,6 +65,7 @@ interface Builder { [Throws=BuildError] void set_async_payments_role(AsyncPaymentsRole? role); void set_wallet_recovery_mode(); + void set_probing_config(ProbingConfig config); [Throws=BuildError] Node build(NodeEntropy node_entropy); [Throws=BuildError] diff --git a/src/builder.rs b/src/builder.rs index 0b44dc153..5660d6b75 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -10,6 +10,7 @@ use std::convert::TryInto; use std::default::Default; use std::net::ToSocketAddrs; use std::path::PathBuf; +use std::sync::atomic::AtomicU64; use std::sync::{Arc, Mutex, Once, RwLock}; use std::time::SystemTime; use std::{fmt, fs}; @@ -51,6 +52,7 @@ use crate::config::{ default_user_config, may_announce_channel, AnnounceError, AsyncPaymentsRole, BitcoindRestClientConfig, Config, ElectrumSyncConfig, EsploraSyncConfig, HRNResolverConfig, TorConfig, DEFAULT_ESPLORA_SERVER_URL, DEFAULT_LOG_FILENAME, DEFAULT_LOG_LEVEL, + DEFAULT_MAX_PROBE_AMOUNT_MSAT, DEFAULT_MIN_PROBE_AMOUNT_MSAT, }; use crate::connection::ConnectionManager; use crate::entropy::NodeEntropy; @@ -77,6 +79,9 @@ use crate::logger::{log_error, LdkLogger, LogLevel, LogWriter, Logger}; use crate::message_handler::NodeCustomMessageHandler; use crate::payment::asynchronous::om_mailbox::OnionMessageMailbox; use crate::peer_store::PeerStore; +use crate::probing::{ + HighDegreeStrategy, Prober, ProbingConfig, ProbingStrategy, ProbingStrategyKind, RandomStrategy, +}; use crate::runtime::{Runtime, RuntimeSpawner}; use crate::tx_broadcaster::TransactionBroadcaster; use crate::types::{ @@ -293,6 +298,7 @@ pub struct NodeBuilder { runtime_handle: Option, pathfinding_scores_sync_config: Option, recovery_mode: bool, + probing_config: Option, } impl NodeBuilder { @@ -311,6 +317,8 @@ impl NodeBuilder { let runtime_handle = None; let pathfinding_scores_sync_config = None; let recovery_mode = false; + let async_payments_role = None; + let probing_config = None; Self { config, chain_data_source_config, @@ -318,9 +326,10 @@ impl NodeBuilder { liquidity_source_config, log_writer_config, runtime_handle, - async_payments_role: None, + async_payments_role, pathfinding_scores_sync_config, recovery_mode, + probing_config, } } @@ -626,6 +635,25 @@ impl NodeBuilder { self } + /// Configures background probing. + /// + /// Use [`ProbingConfigBuilder`] to build the configuration: + /// ```ignore + /// use ldk_node::probing::ProbingConfigBuilder; + /// + /// builder.set_probing_config( + /// ProbingConfigBuilder::high_degree(100) + /// .interval(Duration::from_secs(30)) + /// .build() + /// ); + /// ``` + /// + /// [`ProbingConfigBuilder`]: crate::probing::ProbingConfigBuilder + pub fn set_probing_config(&mut self, config: ProbingConfig) -> &mut Self { + self.probing_config = Some(config); + self + } + /// Builds a [`Node`] instance with a [`SqliteStore`] backend and according to the options /// previously configured. pub fn build(&self, node_entropy: NodeEntropy) -> Result { @@ -797,6 +825,7 @@ impl NodeBuilder { self.gossip_source_config.as_ref(), self.liquidity_source_config.as_ref(), self.pathfinding_scores_sync_config.as_ref(), + self.probing_config.as_ref(), self.async_payments_role, self.recovery_mode, seed_bytes, @@ -1097,6 +1126,13 @@ impl ArcedNodeBuilder { self.inner.write().expect("lock").set_wallet_recovery_mode(); } + /// Configures background probing. + /// + /// See [`ProbingConfig`] for details. + pub fn set_probing_config(&self, config: Arc) { + self.inner.write().unwrap().set_probing_config((*config).clone()); + } + /// Builds a [`Node`] instance with a [`SqliteStore`] backend and according to the options /// previously configured. pub fn build(&self, node_entropy: Arc) -> Result, BuildError> { @@ -1240,8 +1276,9 @@ fn build_with_store_internal( gossip_source_config: Option<&GossipSourceConfig>, liquidity_source_config: Option<&LiquiditySourceConfig>, pathfinding_scores_sync_config: Option<&PathfindingScoresSyncConfig>, - async_payments_role: Option, recovery_mode: bool, seed_bytes: [u8; 64], - runtime: Arc, logger: Arc, kv_store: Arc, + probing_config: Option<&ProbingConfig>, async_payments_role: Option, + recovery_mode: bool, seed_bytes: [u8; 64], runtime: Arc, logger: Arc, + kv_store: Arc, ) -> Result { optionally_install_rustls_cryptoprovider(); @@ -1639,7 +1676,10 @@ fn build_with_store_internal( }, } - let scoring_fee_params = ProbabilisticScoringFeeParameters::default(); + let mut scoring_fee_params = ProbabilisticScoringFeeParameters::default(); + if let Some(penalty) = probing_config.and_then(|c| c.diversity_penalty_msat) { + scoring_fee_params.probing_diversity_penalty_msat = penalty; + } let router = Arc::new(DefaultRouter::new( Arc::clone(&network_graph), Arc::clone(&logger), @@ -2019,6 +2059,39 @@ fn build_with_store_internal( _leak_checker.0.push(Arc::downgrade(&wallet) as Weak); } + let prober = probing_config.map(|probing_cfg| { + let strategy: Arc = match &probing_cfg.kind { + ProbingStrategyKind::HighDegree { top_node_count } => { + Arc::new(HighDegreeStrategy::new( + Arc::clone(&network_graph), + Arc::clone(&channel_manager), + Arc::clone(&router), + *top_node_count, + DEFAULT_MIN_PROBE_AMOUNT_MSAT, + DEFAULT_MAX_PROBE_AMOUNT_MSAT, + probing_cfg.cooldown, + config.probing_liquidity_limit_multiplier, + )) + }, + ProbingStrategyKind::Random { max_hops } => Arc::new(RandomStrategy::new( + Arc::clone(&network_graph), + Arc::clone(&channel_manager), + *max_hops, + DEFAULT_MIN_PROBE_AMOUNT_MSAT, + DEFAULT_MAX_PROBE_AMOUNT_MSAT, + )), + ProbingStrategyKind::Custom(s) => Arc::clone(s), + }; + Arc::new(Prober { + channel_manager: Arc::clone(&channel_manager), + logger: Arc::clone(&logger), + strategy, + interval: probing_cfg.interval, + max_locked_msat: probing_cfg.max_locked_msat, + locked_msat: Arc::new(AtomicU64::new(0)), + }) + }); + Ok(Node { runtime, stop_sender, @@ -2052,6 +2125,7 @@ fn build_with_store_internal( om_mailbox, async_payments_role, hrn_resolver, + prober, #[cfg(cycle_tests)] _leak_checker, }) diff --git a/src/config.rs b/src/config.rs index 014d6216a..8b28d4015 100644 --- a/src/config.rs +++ b/src/config.rs @@ -28,6 +28,12 @@ const DEFAULT_BDK_WALLET_SYNC_INTERVAL_SECS: u64 = 80; const DEFAULT_LDK_WALLET_SYNC_INTERVAL_SECS: u64 = 30; const DEFAULT_FEE_RATE_CACHE_UPDATE_INTERVAL_SECS: u64 = 60 * 10; const DEFAULT_PROBING_LIQUIDITY_LIMIT_MULTIPLIER: u64 = 3; +pub(crate) const DEFAULT_PROBING_INTERVAL_SECS: u64 = 10; +pub(crate) const MIN_PROBING_INTERVAL: Duration = Duration::from_millis(100); +pub(crate) const DEFAULT_PROBED_NODE_COOLDOWN_SECS: u64 = 60 * 60; // 1 hour +pub(crate) const DEFAULT_MAX_PROBE_LOCKED_MSAT: u64 = 100_000_000; // 100k sats +pub(crate) const DEFAULT_MIN_PROBE_AMOUNT_MSAT: u64 = 1_000_000; // 1k sats +pub(crate) const DEFAULT_MAX_PROBE_AMOUNT_MSAT: u64 = 10_000_000; // 10k sats const DEFAULT_ANCHOR_PER_CHANNEL_RESERVE_SATS: u64 = 25_000; // The default timeout after which we abort a wallet syncing operation. diff --git a/src/event.rs b/src/event.rs index 3161daa2a..3eda18790 100644 --- a/src/event.rs +++ b/src/event.rs @@ -52,6 +52,7 @@ use crate::payment::asynchronous::static_invoice_store::StaticInvoiceStore; use crate::payment::store::{ PaymentDetails, PaymentDetailsUpdate, PaymentDirection, PaymentKind, PaymentStatus, }; +use crate::probing::Prober; use crate::runtime::Runtime; use crate::types::{ CustomTlvRecord, DynStore, KeysManager, OnionMessenger, PaymentStore, Sweeper, Wallet, @@ -509,12 +510,13 @@ where payment_store: Arc, peer_store: Arc>, keys_manager: Arc, - runtime: Arc, - logger: L, - config: Arc, static_invoice_store: Option, onion_messenger: Arc, om_mailbox: Option>, + prober: Option>, + runtime: Arc, + logger: L, + config: Arc, } impl EventHandler @@ -530,7 +532,7 @@ where payment_store: Arc, peer_store: Arc>, keys_manager: Arc, static_invoice_store: Option, onion_messenger: Arc, om_mailbox: Option>, - runtime: Arc, logger: L, config: Arc, + prober: Option>, runtime: Arc, logger: L, config: Arc, ) -> Self { Self { event_queue, @@ -544,12 +546,13 @@ where payment_store, peer_store, keys_manager, - logger, - runtime, - config, static_invoice_store, onion_messenger, om_mailbox, + prober, + runtime, + logger, + config, } } @@ -1158,8 +1161,16 @@ where LdkEvent::PaymentPathSuccessful { .. } => {}, LdkEvent::PaymentPathFailed { .. } => {}, - LdkEvent::ProbeSuccessful { .. } => {}, - LdkEvent::ProbeFailed { .. } => {}, + LdkEvent::ProbeSuccessful { path, .. } => { + if let Some(prober) = &self.prober { + prober.handle_probe_successful(&path); + } + }, + LdkEvent::ProbeFailed { path, .. } => { + if let Some(prober) = &self.prober { + prober.handle_probe_failed(&path); + } + }, LdkEvent::HTLCHandlingFailed { failure_type, .. } => { if let Some(liquidity_source) = self.liquidity_source.as_ref() { liquidity_source.handle_htlc_handling_failed(failure_type).await; diff --git a/src/lib.rs b/src/lib.rs index 6902228a6..f0d695af6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -101,10 +101,12 @@ pub mod logger; mod message_handler; pub mod payment; mod peer_store; +pub mod probing; mod runtime; mod scoring; mod tx_broadcaster; mod types; +mod util; mod wallet; use std::default::Default; @@ -113,6 +115,8 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; #[cfg(cycle_tests)] use std::{any::Any, sync::Weak}; +#[cfg(feature = "uniffi")] +use crate::probing::ProbingConfig; pub use balance::{BalanceDetails, LightningBalance, PendingSweepBalance}; pub use bip39; pub use bitcoin; @@ -170,6 +174,9 @@ use payment::{ UnifiedPayment, }; use peer_store::{PeerInfo, PeerStore}; +#[cfg(feature = "uniffi")] +pub use probing::ArcedProbingConfigBuilder as ProbingConfigBuilder; +use probing::{run_prober, Prober}; use runtime::Runtime; pub use tokio; use types::{ @@ -239,6 +246,7 @@ pub struct Node { om_mailbox: Option>, async_payments_role: Option, hrn_resolver: HRNResolver, + prober: Option>, #[cfg(cycle_tests)] _leak_checker: LeakChecker, } @@ -596,11 +604,19 @@ impl Node { static_invoice_store, Arc::clone(&self.onion_messenger), self.om_mailbox.clone(), + self.prober.clone(), Arc::clone(&self.runtime), Arc::clone(&self.logger), Arc::clone(&self.config), )); + if let Some(prober) = self.prober.clone() { + let stop_rx = self.stop_sender.subscribe(); + self.runtime.spawn_cancellable_background_task(async move { + run_prober(prober, stop_rx).await; + }); + } + // Setup background processing let background_persister = Arc::clone(&self.kv_store); let background_event_handler = Arc::clone(&event_handler); @@ -1079,6 +1095,11 @@ impl Node { )) } + /// Returns a reference to the [`Prober`], or `None` if no probing strategy is configured. + pub fn prober(&self) -> Option<&Prober> { + self.prober.as_deref() + } + /// Retrieve a list of known channels. pub fn list_channels(&self) -> Vec { self.channel_manager.list_channels().into_iter().map(|c| c.into()).collect() diff --git a/src/probing.rs b/src/probing.rs new file mode 100644 index 000000000..3d0b1af75 --- /dev/null +++ b/src/probing.rs @@ -0,0 +1,749 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +//! Background probing strategies for training the payment scorer. + +use std::collections::HashMap; +use std::fmt; +use std::sync::atomic::{AtomicU64, Ordering}; +#[cfg(feature = "uniffi")] +use std::sync::RwLock; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; + +use bitcoin::secp256k1::PublicKey; +use lightning::routing::gossip::NodeId; +use lightning::routing::router::{ + Path, PaymentParameters, RouteHop, RouteParameters, MAX_PATH_LENGTH_ESTIMATE, +}; +use lightning_invoice::DEFAULT_MIN_FINAL_CLTV_EXPIRY_DELTA; +use lightning_types::features::{ChannelFeatures, NodeFeatures}; + +use crate::config::{ + DEFAULT_MAX_PROBE_LOCKED_MSAT, DEFAULT_PROBED_NODE_COOLDOWN_SECS, + DEFAULT_PROBING_INTERVAL_SECS, MIN_PROBING_INTERVAL, +}; +use crate::logger::{log_debug, LdkLogger, Logger}; +use crate::types::{ChannelManager, Graph, Router}; +use crate::util::random_range; + +use lightning::routing::router::Router as LdkRouter; + +/// Which built-in probing strategy to use, or a custom one. +#[derive(Clone)] +pub(crate) enum ProbingStrategyKind { + HighDegree { top_node_count: usize }, + Random { max_hops: usize }, + Custom(Arc), +} + +impl fmt::Debug for ProbingStrategyKind { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::HighDegree { top_node_count } => { + f.debug_struct("HighDegree").field("top_node_count", top_node_count).finish() + }, + Self::Random { max_hops } => { + f.debug_struct("Random").field("max_hops", max_hops).finish() + }, + Self::Custom(_) => f.write_str("Custom()"), + } + } +} + +/// Configuration for the background probing subsystem. +/// +/// Construct via [`ProbingConfigBuilder`]. Pick a strategy with +/// [`ProbingConfigBuilder::high_degree`], [`ProbingConfigBuilder::random_walk`], or +/// [`ProbingConfigBuilder::custom`], chain optional setters, and finalize with +/// [`ProbingConfigBuilder::build`]. +/// +/// # Caution +/// +/// Probes send real HTLCs along real paths. If an intermediate hop is offline or +/// misbehaving, the probe HTLC can remain in-flight — locking outbound liquidity +/// on the first-hop channel until the HTLC timeout elapses (potentially hours). +/// `max_locked_msat` caps the total outbound capacity that in-flight probes may +/// hold at any one time; tune it conservatively for nodes with tight liquidity. +/// +/// # Example +/// ```ignore +/// let config = ProbingConfigBuilder::high_degree(100) +/// .interval(Duration::from_secs(30)) +/// .max_locked_msat(500_000) +/// .diversity_penalty_msat(250) +/// .build(); +/// builder.set_probing_config(config); +/// ``` +#[derive(Clone, Debug)] +#[cfg_attr(feature = "uniffi", derive(uniffi::Object))] +pub struct ProbingConfig { + pub(crate) kind: ProbingStrategyKind, + pub(crate) interval: Duration, + pub(crate) max_locked_msat: u64, + pub(crate) diversity_penalty_msat: Option, + pub(crate) cooldown: Duration, +} + +/// Builder for [`ProbingConfig`]. +/// +/// Pick a strategy with [`high_degree`], [`random_walk`], or [`custom`], chain optional +/// setters, and call [`build`] to finalize. +/// +/// [`high_degree`]: Self::high_degree +/// [`random_walk`]: Self::random_walk +/// [`custom`]: Self::custom +/// [`build`]: Self::build +pub struct ProbingConfigBuilder { + kind: ProbingStrategyKind, + interval: Duration, + max_locked_msat: u64, + diversity_penalty_msat: Option, + cooldown: Duration, +} + +impl ProbingConfigBuilder { + fn with_kind(kind: ProbingStrategyKind) -> Self { + Self { + kind, + interval: Duration::from_secs(DEFAULT_PROBING_INTERVAL_SECS), + max_locked_msat: DEFAULT_MAX_PROBE_LOCKED_MSAT, + diversity_penalty_msat: None, + cooldown: Duration::from_secs(DEFAULT_PROBED_NODE_COOLDOWN_SECS), + } + } + + /// Start building a config that probes toward the highest-degree nodes in the graph. + /// + /// `top_node_count` controls how many of the most-connected nodes are cycled through. + pub fn high_degree(top_node_count: usize) -> Self { + Self::with_kind(ProbingStrategyKind::HighDegree { top_node_count }) + } + + /// Start building a config that probes via random graph walks. + /// + /// `max_hops` is the upper bound on the number of hops in a randomly constructed path. + pub fn random_walk(max_hops: usize) -> Self { + Self::with_kind(ProbingStrategyKind::Random { max_hops }) + } + + /// Start building a config with a custom [`ProbingStrategy`] implementation. + pub fn custom(strategy: Arc) -> Self { + Self::with_kind(ProbingStrategyKind::Custom(strategy)) + } + + /// Overrides the interval between probe attempts. + /// + /// Defaults to 10 seconds. + pub fn interval(&mut self, interval: Duration) -> &mut Self { + self.interval = interval; + self + } + + /// Overrides the maximum millisatoshis that may be locked in in-flight probes at any time. + /// + /// Defaults to 100 000 000 msat (100k sats). + pub fn max_locked_msat(&mut self, max_msat: u64) -> &mut Self { + self.max_locked_msat = max_msat; + self + } + + /// Sets the probing diversity penalty applied by the probabilistic scorer. + /// + /// When set, the scorer will penalize channels that have been recently probed, + /// encouraging path diversity during background probing. The penalty decays + /// quadratically over 24 hours. + /// + /// This is only useful for probing strategies that route through the scorer + /// (e.g., [`HighDegreeStrategy`]). Strategies that build paths manually + /// (e.g., [`RandomStrategy`]) bypass the scorer entirely. + /// + /// If unset, LDK's default of `0` (no penalty) is used. + pub fn diversity_penalty_msat(&mut self, penalty_msat: u64) -> &mut Self { + self.diversity_penalty_msat = Some(penalty_msat); + self + } + + /// Sets how long a probed node stays ineligible before being probed again. + /// + /// Only applies to [`HighDegreeStrategy`]. Defaults to 1 hour. + pub fn cooldown(&mut self, cooldown: Duration) -> &mut Self { + self.cooldown = cooldown; + self + } + + /// Builds the [`ProbingConfig`]. + pub fn build(&self) -> ProbingConfig { + ProbingConfig { + kind: self.kind.clone(), + interval: self.interval.max(MIN_PROBING_INTERVAL), + max_locked_msat: self.max_locked_msat, + diversity_penalty_msat: self.diversity_penalty_msat, + cooldown: self.cooldown, + } + } +} + +/// A UniFFI-compatible wrapper around [`ProbingConfigBuilder`] that uses interior mutability +/// so it can be shared behind an `Arc` as required by the FFI object model. +/// +/// Obtain one via the constructors [`new_high_degree`] or [`new_random_walk`], configure it +/// with the `set_*` methods, then call [`build`] to produce a [`ProbingConfig`]. +/// +/// [`new_high_degree`]: Self::new_high_degree +/// [`new_random_walk`]: Self::new_random_walk +/// [`build`]: Self::build +#[cfg(feature = "uniffi")] +#[derive(uniffi::Object)] +pub struct ArcedProbingConfigBuilder { + inner: RwLock, +} + +#[cfg(feature = "uniffi")] +#[uniffi::export] +impl ArcedProbingConfigBuilder { + /// Creates a builder configured to probe toward the highest-degree nodes in the graph. + /// + /// `top_node_count` controls how many of the most-connected nodes are cycled through. + #[uniffi::constructor] + pub fn new_high_degree(top_node_count: u64) -> Arc { + Arc::new(Self { + inner: RwLock::new(ProbingConfigBuilder::high_degree(top_node_count as usize)), + }) + } + + /// Creates a builder configured to probe via random graph walks. + /// + /// `max_hops` is the upper bound on the number of hops in a randomly constructed path. + #[uniffi::constructor] + pub fn new_random_walk(max_hops: u64) -> Arc { + Arc::new(Self { inner: RwLock::new(ProbingConfigBuilder::random_walk(max_hops as usize)) }) + } + + /// Overrides the interval between probe attempts. Defaults to 10 seconds. + pub fn set_interval(&self, secs: u64) { + self.inner.write().unwrap().interval(Duration::from_secs(secs)); + } + + /// Overrides the maximum millisatoshis that may be locked in in-flight probes at any time. + /// + /// Defaults to 100 000 000 msat (100k sats). + pub fn set_max_locked_msat(&self, max_msat: u64) { + self.inner.write().unwrap().max_locked_msat(max_msat); + } + + /// Sets the probing diversity penalty applied by the probabilistic scorer. + /// + /// When set, the scorer will penalize channels that have been recently probed, + /// encouraging path diversity during background probing. The penalty decays + /// quadratically over 24 hours. + /// + /// If unset, LDK's default of `0` (no penalty) is used. + pub fn set_diversity_penalty_msat(&self, penalty_msat: u64) { + self.inner.write().unwrap().diversity_penalty_msat(penalty_msat); + } + + /// Sets how long a probed node stays ineligible before being probed again. + /// + /// Only applies to the high-degree strategy. Defaults to 1 hour. + pub fn set_cooldown(&self, secs: u64) { + self.inner.write().unwrap().cooldown(Duration::from_secs(secs)); + } + + /// Builds the [`ProbingConfig`]. + pub fn build(&self) -> Arc { + Arc::new(self.inner.read().unwrap().build()) + } +} + +/// Strategy can be used for determining the next target and amount for probing. +pub trait ProbingStrategy: Send + Sync + 'static { + /// Returns the next probe path to run, or `None` to skip this tick. + fn next_probe(&self) -> Option; +} + +/// Probes toward the most-connected nodes in the graph. +/// +/// On each tick the strategy reads the current gossip graph, sorts nodes by +/// channel count, and picks the highest-degree node from the top +/// `top_node_count` that has not been probed within `cooldown`. +/// Nodes probed more recently are skipped so that the strategy +/// naturally spreads across the top nodes and picks up graph changes. +/// If all top nodes are on cooldown, the cooldown map is cleared and a new cycle begins +/// immediately. +/// +/// The probe amount is chosen uniformly at random from +/// `[min_amount_msat, max_amount_msat]`. +pub struct HighDegreeStrategy { + network_graph: Arc, + channel_manager: Arc, + router: Arc, + /// How many of the highest-degree nodes to cycle through. + pub top_node_count: usize, + /// Lower bound for the randomly chosen probe amount. + pub min_amount_msat: u64, + /// Upper bound for the randomly chosen probe amount. + pub max_amount_msat: u64, + /// How long a node stays ineligible after being probed. + pub cooldown: Duration, + /// Skip a path when the first-hop outbound liquidity is less than + /// `path_value * liquidity_limit_multiplier`. + pub liquidity_limit_multiplier: u64, + /// Nodes probed recently, with the time they were last probed. + recently_probed: Mutex>, +} + +impl HighDegreeStrategy { + /// Creates a new high-degree probing strategy. + pub(crate) fn new( + network_graph: Arc, channel_manager: Arc, router: Arc, + top_node_count: usize, min_amount_msat: u64, max_amount_msat: u64, cooldown: Duration, + liquidity_limit_multiplier: u64, + ) -> Self { + assert!( + min_amount_msat <= max_amount_msat, + "min_amount_msat must not exceed max_amount_msat" + ); + Self { + network_graph, + channel_manager, + router, + top_node_count, + min_amount_msat, + max_amount_msat, + cooldown, + liquidity_limit_multiplier, + recently_probed: Mutex::new(HashMap::new()), + } + } +} + +impl ProbingStrategy for HighDegreeStrategy { + fn next_probe(&self) -> Option { + let graph = self.network_graph.read_only(); + + let mut nodes_by_degree: Vec<(PublicKey, usize)> = graph + .nodes() + .unordered_iter() + .filter_map(|(id, info)| { + PublicKey::try_from(*id).ok().map(|pubkey| (pubkey, info.channels.len())) + }) + .collect(); + + if nodes_by_degree.is_empty() { + return None; + } + + nodes_by_degree.sort_unstable_by(|a, b| b.1.cmp(&a.1)); + + let top_node_count = self.top_node_count.min(nodes_by_degree.len()); + let now = Instant::now(); + + let mut probed = self.recently_probed.lock().unwrap_or_else(|e| e.into_inner()); + + // We could check staleness when we use the entry, but that way we'd not clear cache at + // all. For hundreds of top nodes it's okay to call retain each tick. + probed.retain(|_, probed_at| now.duration_since(*probed_at) < self.cooldown); + + // If all top nodes are on cooldown, reset and start a new cycle. + let final_node = match nodes_by_degree[..top_node_count] + .iter() + .find(|(pubkey, _)| !probed.contains_key(pubkey)) + { + Some((pubkey, _)) => *pubkey, + None => { + probed.clear(); + nodes_by_degree[0].0 + }, + }; + + probed.insert(final_node, now); + drop(probed); + drop(graph); + + let amount_msat = random_range(self.min_amount_msat, self.max_amount_msat); + let payment_params = + PaymentParameters::from_node_id(final_node, DEFAULT_MIN_FINAL_CLTV_EXPIRY_DELTA as u32); + let route_params = + RouteParameters::from_payment_params_and_value(payment_params, amount_msat); + + let payer = self.channel_manager.get_our_node_id(); + let usable_channels = self.channel_manager.list_usable_channels(); + let first_hops: Vec<&_> = usable_channels.iter().collect(); + let inflight_htlcs = self.channel_manager.compute_inflight_htlcs(); + + let route = self + .router + .find_route(&payer, &route_params, Some(&first_hops), inflight_htlcs) + .ok()?; + + let path = route.paths.into_iter().next()?; + + // Liquidity-limit check (mirrors send_preflight_probes): skip the path when the + // first-hop outbound liquidity is less than path_value * liquidity_limit_multiplier. + if let Some(first_hop_hop) = path.hops.first() { + if let Some(ch) = usable_channels + .iter() + .find(|h| h.get_outbound_payment_scid() == Some(first_hop_hop.short_channel_id)) + { + let path_value = path.final_value_msat() + path.fee_msat(); + if ch.next_outbound_htlc_limit_msat + < path_value.saturating_mul(self.liquidity_limit_multiplier) + { + return None; + } + } + } + + Some(path) + } +} + +/// Explores the graph by walking a random number of hops outward from one of our own +/// channels, constructing the [`Path`] explicitly. +/// +/// On each tick: +/// 1. Picks one of our confirmed, usable channels to start from. +/// 2. Performs a random walk of a chosen depth (up to [`MAX_PATH_LENGTH_ESTIMATE`]) through the +/// gossip graph, skipping disabled channels and dead-ends. +/// +/// The probe amount is chosen uniformly at random from `[min_amount_msat, max_amount_msat]`. +/// +/// Because path selection ignores the scorer, this probes channels the router +/// would never try on its own, teaching the scorer about previously unknown paths. +pub struct RandomStrategy { + network_graph: Arc, + channel_manager: Arc, + /// Upper bound on the number of hops in a randomly constructed path. + pub max_hops: usize, + /// Lower bound for the randomly chosen probe amount. + pub min_amount_msat: u64, + /// Upper bound for the randomly chosen probe amount. + pub max_amount_msat: u64, +} + +impl RandomStrategy { + /// Creates a new random-walk probing strategy. + pub(crate) fn new( + network_graph: Arc, channel_manager: Arc, max_hops: usize, + min_amount_msat: u64, max_amount_msat: u64, + ) -> Self { + assert!( + min_amount_msat <= max_amount_msat, + "min_amount_msat must not exceed max_amount_msat" + ); + Self { + network_graph, + channel_manager, + max_hops: max_hops.clamp(1, MAX_PATH_LENGTH_ESTIMATE as usize), + min_amount_msat, + max_amount_msat, + } + } + + /// Tries to build a path of `target_hops` hops. Returns `None` if the local node has no + /// usable channels, or the walk terminates before reaching `target_hops`. + fn try_build_path(&self, target_hops: usize, amount_msat: u64) -> Option { + let initial_channels = self + .channel_manager + .list_channels() + .into_iter() + .filter(|c| c.is_usable && c.short_channel_id.is_some()) + .collect::>(); + + if initial_channels.is_empty() { + return None; + } + + let graph = self.network_graph.read_only(); + let first_hop = + &initial_channels[random_range(0, initial_channels.len() as u64 - 1) as usize]; + let first_hop_scid = first_hop.short_channel_id?; + let next_peer_pubkey = first_hop.counterparty.node_id; + let next_peer_node_id = NodeId::from_pubkey(&next_peer_pubkey); + + // Track the tightest HTLC limit across all hops to cap the probe amount. + // The first hop limit comes from our live channel state; subsequent hops use htlc_maximum_msat from the gossip channel update. + let mut route_least_htlc_upper_bound = first_hop.next_outbound_htlc_limit_msat; + let mut route_greatest_htlc_lower_bound = first_hop.next_outbound_htlc_minimum_msat; + + // Walk the graph: each entry is (node_id, arrived_via_scid, pubkey); first entry is set: + let mut route: Vec<(NodeId, u64, PublicKey)> = + vec![(next_peer_node_id, first_hop_scid, next_peer_pubkey)]; + + let mut prev_scid = first_hop_scid; + let mut current_node_id = next_peer_node_id; + + for _ in 1..target_hops { + let node_info = match graph.node(¤t_node_id) { + Some(n) => n, + None => break, + }; + + // Skip the edge we arrived on. Longer cycles aren't filtered — probes fail at + // the destination anyway, so revisiting nodes is harmless. + let candidates: Vec = + node_info.channels.iter().copied().filter(|&scid| scid != prev_scid).collect(); + + if candidates.is_empty() { + break; + } + + let next_scid = candidates[random_range(0, candidates.len() as u64 - 1) as usize]; + let next_channel = match graph.channel(next_scid) { + Some(c) => c, + None => break, + }; + + // as_directed_from validates that current_node_id is a channel endpoint and that + // both direction updates are present; effective_capacity covers both htlc_maximum_msat + // and funding capacity. + let Some((directed, next_node_id)) = next_channel.as_directed_from(¤t_node_id) + else { + break; + }; + // Retrieve the direction-specific update via the public ChannelInfo fields. + // as_directed_from already checked both directions are Some, but we break + // defensively rather than unwrap. + let update = match if directed.source() == &next_channel.node_one { + next_channel.one_to_two.as_ref() + } else { + next_channel.two_to_one.as_ref() + } { + Some(u) => u, + None => break, + }; + + if !update.enabled { + break; + } + + route_least_htlc_upper_bound = + route_least_htlc_upper_bound.min(update.htlc_maximum_msat); + + route_greatest_htlc_lower_bound = + route_greatest_htlc_lower_bound.max(update.htlc_minimum_msat); + + let next_pubkey = match PublicKey::try_from(*next_node_id) { + Ok(pk) => pk, + Err(_) => break, + }; + + route.push((*next_node_id, next_scid, next_pubkey)); + prev_scid = next_scid; + current_node_id = *next_node_id; + } + + if route_greatest_htlc_lower_bound > route_least_htlc_upper_bound { + return None; + } + let amount_msat = + amount_msat.max(route_greatest_htlc_lower_bound).min(route_least_htlc_upper_bound); + if amount_msat < self.min_amount_msat || amount_msat > self.max_amount_msat { + return None; + } + + // Assemble hops backwards so each hop's proportional fee is computed on the amount it actually forwards + let mut hops = Vec::with_capacity(route.len()); + let mut forwarded = amount_msat; + let last = route.len() - 1; + + // Resolve (node_features, channel_features, maybe_announced_channel) for a hop. + // The first hop is our local channel and may be unannounced, so its ChannelFeatures + // are not in the gossip graph — match on SCID to detect it and fall back to local-state + // defaults. All other (walked) hops were picked from the graph and must resolve there. + let hop_features = + |node_id: &NodeId, via_scid: u64| -> Option<(NodeFeatures, ChannelFeatures, bool)> { + let node_features = graph + .node(node_id) + .and_then(|n| n.announcement_info.as_ref().map(|a| a.features().clone())) + .unwrap_or_else(NodeFeatures::empty); + let (channel_features, maybe_announced_channel) = if via_scid == first_hop_scid { + (ChannelFeatures::empty(), false) + } else { + (graph.channel(via_scid)?.features.clone(), true) + }; + Some((node_features, channel_features, maybe_announced_channel)) + }; + + // Final hop: fee_msat carries the delivery amount; cltv delta is zero. + { + let (node_id, via_scid, pubkey) = route[last]; + let (node_features, channel_features, maybe_announced_channel) = + hop_features(&node_id, via_scid)?; + hops.push(RouteHop { + pubkey, + node_features, + short_channel_id: via_scid, + channel_features, + fee_msat: amount_msat, + cltv_expiry_delta: 0, + maybe_announced_channel, + }); + } + + // Non-final hops, from second-to-last back to first. + for i in (0..last).rev() { + let (node_id, via_scid, pubkey) = route[i]; + let (node_features, channel_features, maybe_announced_channel) = + hop_features(&node_id, via_scid)?; + + let (_, next_scid, _) = route[i + 1]; + let next_channel = graph.channel(next_scid)?; + let (directed, _) = next_channel.as_directed_from(&node_id)?; + let update = match if directed.source() == &next_channel.node_one { + next_channel.one_to_two.as_ref() + } else { + next_channel.two_to_one.as_ref() + } { + Some(u) => u, + None => return None, + }; + let fee = update.fees.base_msat as u64 + + (forwarded * update.fees.proportional_millionths as u64 / 1_000_000); + forwarded += fee; + + hops.push(RouteHop { + pubkey, + node_features, + short_channel_id: via_scid, + channel_features, + fee_msat: fee, + cltv_expiry_delta: update.cltv_expiry_delta as u32, + maybe_announced_channel, + }); + } + + hops.reverse(); + + // The first-hop HTLC carries amount_msat + all intermediate fees. + // Verify the total fits within our live outbound limit before returning. + let total_outgoing: u64 = hops.iter().map(|h| h.fee_msat).sum(); + if total_outgoing > first_hop.next_outbound_htlc_limit_msat { + return None; + } + + Some(Path { hops, blinded_tail: None }) + } +} + +impl ProbingStrategy for RandomStrategy { + fn next_probe(&self) -> Option { + let target_hops = random_range(1, self.max_hops as u64) as usize; + let amount_msat = random_range(self.min_amount_msat, self.max_amount_msat); + + self.try_build_path(target_hops, amount_msat) + } +} + +/// Periodically dispatches probes according to a [`ProbingStrategy`]. +pub struct Prober { + pub(crate) channel_manager: Arc, + pub(crate) logger: Arc, + /// The strategy that decides what to probe. + pub strategy: Arc, + /// How often to fire a probe attempt. + pub interval: Duration, + /// Maximum total millisatoshis that may be locked in in-flight probes at any time. + pub max_locked_msat: u64, + pub(crate) locked_msat: Arc, +} + +fn fmt_path(path: &lightning::routing::router::Path) -> String { + path.hops + .iter() + .map(|h| format!("{}(scid={})", h.pubkey, h.short_channel_id)) + .collect::>() + .join(" -> ") +} + +impl Prober { + /// Returns the total millisatoshis currently locked in in-flight probes. + pub fn locked_msat(&self) -> u64 { + self.locked_msat.load(Ordering::Relaxed) + } + + pub(crate) fn handle_probe_successful(&self, path: &lightning::routing::router::Path) { + let amount: u64 = path.hops.iter().map(|h| h.fee_msat).sum(); + let prev = self + .locked_msat + .fetch_update(Ordering::AcqRel, Ordering::Acquire, |v| Some(v.saturating_sub(amount))) + .expect("fetch_update closure always returns Some"); + let new = prev.saturating_sub(amount); + log_debug!( + self.logger, + "Probe successful: released {} msat (locked_msat {} -> {}), path: {}", + amount, + prev, + new, + fmt_path(path) + ); + } + + pub(crate) fn handle_probe_failed(&self, path: &lightning::routing::router::Path) { + let amount: u64 = path.hops.iter().map(|h| h.fee_msat).sum(); + let prev = self + .locked_msat + .fetch_update(Ordering::AcqRel, Ordering::Acquire, |v| Some(v.saturating_sub(amount))) + .expect("fetch_update closure always returns Some"); + let new = prev.saturating_sub(amount); + log_debug!( + self.logger, + "Probe failed: released {} msat (locked_msat {} -> {}), path: {}", + amount, + prev, + new, + fmt_path(path) + ); + } +} + +/// Runs the probing loop for the given [`Prober`] until `stop_rx` fires. +pub(crate) async fn run_prober(prober: Arc, mut stop_rx: tokio::sync::watch::Receiver<()>) { + let mut ticker = tokio::time::interval(prober.interval); + ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + tokio::select! { + biased; + _ = stop_rx.changed() => { + log_debug!(prober.logger, "Stopping background probing."); + return; + } + _ = ticker.tick() => { + let path = match prober.strategy.next_probe() { + Some(p) => p, + None => continue, + }; + let amount: u64 = path.hops.iter().map(|h| h.fee_msat).sum(); + if prober.locked_msat.load(Ordering::Acquire) + amount > prober.max_locked_msat { + log_debug!(prober.logger, "Skipping probe: locked-msat budget exceeded."); + continue; + } + match prober.channel_manager.send_probe(path.clone()) { + Ok(_) => { + prober.locked_msat.fetch_add(amount, Ordering::Release); + log_debug!( + prober.logger, + "Probe sent: locked {} msat, path: {}", + amount, + fmt_path(&path) + ); + } + Err(e) => { + log_debug!( + prober.logger, + "Probe send failed: {:?}, path: {}", + e, + fmt_path(&path) + ); + } + } + } + } + } +} diff --git a/src/util.rs b/src/util.rs new file mode 100644 index 000000000..3350ad2c7 --- /dev/null +++ b/src/util.rs @@ -0,0 +1,37 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +/// Returns a random `u64` uniformly distributed in `[min, max]` (inclusive). +pub(crate) fn random_range(min: u64, max: u64) -> u64 { + debug_assert!(min <= max); + if min == max { + return min; + } + let range = match (max - min).checked_add(1) { + Some(r) => r, + None => { + // overflowed — full u64::MAX range + let mut buf = [0u8; 8]; + getrandom::fill(&mut buf).expect("getrandom failed"); + return u64::from_ne_bytes(buf); + }, + }; + // We remove bias due to the fact that the range does not evenly divide 2⁶⁴. + // Imagine we had a range from 0 to 2⁶⁴-2 (of length 2⁶⁴-1), then + // the outcomes of 0 would be twice as frequent as any other, as 0 can be produced + // as randomly drawn 0 % 2⁶⁴-1 and as well as 2⁶⁴-1 % 2⁶⁴-1 + let limit = u64::MAX - (u64::MAX % range); + loop { + let mut buf = [0u8; 8]; + getrandom::fill(&mut buf).expect("getrandom failed"); + let val = u64::from_ne_bytes(buf); + if val < limit { + return min + (val % range); + } + // loop runs ~1 iteration on average, in worst case it's ~2 iterations on average + } +} From 1a8f945ff3f230360abd01f55653fc05725dc2d2 Mon Sep 17 00:00:00 2001 From: Alexander Shevtsov Date: Tue, 28 Apr 2026 18:13:21 +0200 Subject: [PATCH 02/12] Add probing service tests Add integration tests that verify the probing service fires probes on the configured interval and respects the locked-msat budget cap. Shared helpers in tests/common are extended with probing-aware setup. Co-Authored-By: Claude Sonnet 4.6 --- src/builder.rs | 2 +- src/probing.rs | 10 +- tests/common/mod.rs | 36 ++++- tests/probing_tests.rs | 345 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 380 insertions(+), 13 deletions(-) create mode 100644 tests/probing_tests.rs diff --git a/src/builder.rs b/src/builder.rs index 5660d6b75..4da58c0fa 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -1130,7 +1130,7 @@ impl ArcedNodeBuilder { /// /// See [`ProbingConfig`] for details. pub fn set_probing_config(&self, config: Arc) { - self.inner.write().unwrap().set_probing_config((*config).clone()); + self.inner.write().expect("lock").set_probing_config((*config).clone()); } /// Builds a [`Node`] instance with a [`SqliteStore`] backend and according to the options diff --git a/src/probing.rs b/src/probing.rs index 3d0b1af75..ac0571bd1 100644 --- a/src/probing.rs +++ b/src/probing.rs @@ -226,14 +226,14 @@ impl ArcedProbingConfigBuilder { /// Overrides the interval between probe attempts. Defaults to 10 seconds. pub fn set_interval(&self, secs: u64) { - self.inner.write().unwrap().interval(Duration::from_secs(secs)); + self.inner.write().expect("lock").interval(Duration::from_secs(secs)); } /// Overrides the maximum millisatoshis that may be locked in in-flight probes at any time. /// /// Defaults to 100 000 000 msat (100k sats). pub fn set_max_locked_msat(&self, max_msat: u64) { - self.inner.write().unwrap().max_locked_msat(max_msat); + self.inner.write().expect("lock").max_locked_msat(max_msat); } /// Sets the probing diversity penalty applied by the probabilistic scorer. @@ -244,19 +244,19 @@ impl ArcedProbingConfigBuilder { /// /// If unset, LDK's default of `0` (no penalty) is used. pub fn set_diversity_penalty_msat(&self, penalty_msat: u64) { - self.inner.write().unwrap().diversity_penalty_msat(penalty_msat); + self.inner.write().expect("lock").diversity_penalty_msat(penalty_msat); } /// Sets how long a probed node stays ineligible before being probed again. /// /// Only applies to the high-degree strategy. Defaults to 1 hour. pub fn set_cooldown(&self, secs: u64) { - self.inner.write().unwrap().cooldown(Duration::from_secs(secs)); + self.inner.write().expect("lock").cooldown(Duration::from_secs(secs)); } /// Builds the [`ProbingConfig`]. pub fn build(&self) -> Arc { - Arc::new(self.inner.read().unwrap().build()) + Arc::new(self.inner.read().expect("lock").build()) } } diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 850c6f22c..306a432d8 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -35,6 +35,7 @@ use ldk_node::config::{ use ldk_node::entropy::{generate_entropy_mnemonic, NodeEntropy}; use ldk_node::io::sqlite_store::SqliteStore; use ldk_node::payment::{PaymentDirection, PaymentKind, PaymentStatus}; +use ldk_node::probing::ProbingConfig; use ldk_node::{ Builder, ChannelShutdownState, CustomTlvRecord, Event, LightningBalance, Node, NodeError, PendingSweepBalance, UserChannelId, @@ -318,9 +319,9 @@ pub(crate) fn random_config(anchor_channels: bool) -> TestConfig { } #[cfg(feature = "uniffi")] -type TestNode = Arc; +pub(crate) type TestNode = Arc; #[cfg(not(feature = "uniffi"))] -type TestNode = Node; +pub(crate) type TestNode = Node; #[derive(Clone)] pub(crate) enum TestChainSource<'a> { @@ -350,6 +351,7 @@ pub(crate) struct TestConfig { pub node_entropy: NodeEntropy, pub async_payments_role: Option, pub recovery_mode: bool, + pub probing: Option, } impl Default for TestConfig { @@ -369,6 +371,7 @@ impl Default for TestConfig { node_entropy, async_payments_role, recovery_mode, + probing: None, } } } @@ -501,6 +504,10 @@ pub(crate) fn setup_node(chain_source: &TestChainSource, config: TestConfig) -> builder.set_wallet_recovery_mode(); } + if let Some(probing) = config.probing { + builder.set_probing_config(probing.into()); + } + let node = match config.store_type { TestStoreType::TestSyncStore => { let kv_store = TestSyncStore::new(config.node_config.storage_dir_path.into()); @@ -728,12 +735,18 @@ pub async fn open_channel( node_a: &TestNode, node_b: &TestNode, funding_amount_sat: u64, should_announce: bool, electrsd: &ElectrsD, ) -> OutPoint { - open_channel_push_amt(node_a, node_b, funding_amount_sat, None, should_announce, electrsd).await + let funding_txo = + open_channel_no_wait(node_a, node_b, funding_amount_sat, None, should_announce).await; + wait_for_tx(&electrsd.client, funding_txo.txid).await; + funding_txo } -pub async fn open_channel_push_amt( +/// Like [`open_channel`] but skips the `wait_for_tx` electrum check so that +/// multiple channels can be opened back-to-back before any blocks are mined. +/// The caller is responsible for mining blocks and confirming the funding txs. +pub async fn open_channel_no_wait( node_a: &TestNode, node_b: &TestNode, funding_amount_sat: u64, push_amount_msat: Option, - should_announce: bool, electrsd: &ElectrsD, + should_announce: bool, ) -> OutPoint { if should_announce { node_a @@ -761,11 +774,20 @@ pub async fn open_channel_push_amt( let funding_txo_a = expect_channel_pending_event!(node_a, node_b.node_id()); let funding_txo_b = expect_channel_pending_event!(node_b, node_a.node_id()); assert_eq!(funding_txo_a, funding_txo_b); - wait_for_tx(&electrsd.client, funding_txo_a.txid).await; - funding_txo_a } +pub async fn open_channel_push_amt( + node_a: &TestNode, node_b: &TestNode, funding_amount_sat: u64, push_amount_msat: Option, + should_announce: bool, electrsd: &ElectrsD, +) -> OutPoint { + let funding_txo = + open_channel_no_wait(node_a, node_b, funding_amount_sat, push_amount_msat, should_announce) + .await; + wait_for_tx(&electrsd.client, funding_txo.txid).await; + funding_txo +} + pub async fn open_channel_with_all( node_a: &TestNode, node_b: &TestNode, should_announce: bool, electrsd: &ElectrsD, ) -> OutPoint { diff --git a/tests/probing_tests.rs b/tests/probing_tests.rs new file mode 100644 index 000000000..26c5727c8 --- /dev/null +++ b/tests/probing_tests.rs @@ -0,0 +1,345 @@ +// Integration tests for the probing service. +// +// Budget tests – linear A ──[1M sats]──▶ B ──[1M sats]──▶ C topology: +// +// probe_budget_increments_and_decrements +// Verifies locked_msat rises when a probe is dispatched and returns +// to zero once the probe resolves. +// +// exhausted_probe_budget_blocks_new_probes +// Stops B mid-flight so the HTLC cannot resolve; confirms the budget +// stays exhausted and no further probes are sent. After B restarts +// the probe fails, the budget clears, and new probes resume. + +mod common; +use std::sync::atomic::{AtomicBool, Ordering}; + +use common::{ + expect_channel_ready_event, expect_event, generate_blocks_and_wait, open_channel, + premine_and_distribute_funds, random_chain_source, random_config, setup_bitcoind_and_electrsd, + setup_node, TestNode, +}; + +use ldk_node::bitcoin::Amount; +use ldk_node::probing::{ProbingConfigBuilder, ProbingStrategy}; +use ldk_node::Event; + +use lightning::routing::router::Path; + +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +const PROBE_AMOUNT_MSAT: u64 = 1_000_000; +const PROBING_INTERVAL_MILLISECONDS: u64 = 500; + +/// FixedPathStrategy — returns a fixed pre-built path; used by budget tests. +/// +/// The path is set after node and channel setup via [`set_path`]. +struct FixedPathStrategy { + path: Mutex>, + ready_to_probe: AtomicBool, +} + +impl FixedPathStrategy { + fn new() -> Arc { + Arc::new(Self { path: Mutex::new(None), ready_to_probe: AtomicBool::new(false) }) + } + + fn set_path(&self, path: Path) { + *self.path.lock().unwrap() = Some(path); + } + + fn start_probing(&self) { + self.ready_to_probe.store(true, Ordering::Relaxed); + } + + fn stop_probing(&self) { + self.ready_to_probe.store(false, Ordering::Relaxed); + } +} + +impl ProbingStrategy for FixedPathStrategy { + fn next_probe(&self) -> Option { + if self.ready_to_probe.load(Ordering::Relaxed) { + self.path.lock().unwrap().clone() + } else { + None + } + } +} + +/// Builds a 2-hop probe path: node_a → node_b → node_c using live channel info. +fn build_probe_path( + node_a: &TestNode, node_b: &TestNode, node_c: &TestNode, amount_msat: u64, +) -> Path { + use lightning::routing::router::RouteHop; + use lightning_types::features::{ChannelFeatures, NodeFeatures}; + + let ch_ab = node_a + .list_channels() + .into_iter() + .find(|ch| ch.counterparty_node_id == node_b.node_id() && ch.short_channel_id.is_some()) + .expect("A→B channel not found"); + let ch_bc = node_b + .list_channels() + .into_iter() + .find(|ch| ch.counterparty_node_id == node_c.node_id() && ch.short_channel_id.is_some()) + .expect("B→C channel not found"); + + Path { + hops: vec![ + RouteHop { + pubkey: node_b.node_id(), + node_features: NodeFeatures::empty(), + short_channel_id: ch_ab.short_channel_id.unwrap(), + channel_features: ChannelFeatures::empty(), + fee_msat: 1000, + cltv_expiry_delta: 40, + maybe_announced_channel: true, + }, + RouteHop { + pubkey: node_c.node_id(), + node_features: NodeFeatures::empty(), + short_channel_id: ch_bc.short_channel_id.unwrap(), + channel_features: ChannelFeatures::empty(), + fee_msat: amount_msat, + cltv_expiry_delta: 0, + maybe_announced_channel: true, + }, + ], + blinded_tail: None, + } +} + +/// Verifies that `locked_msat` increases when a probe is dispatched and returns +/// to zero once the probe resolves (succeeds or fails). +#[tokio::test(flavor = "multi_thread")] +async fn probe_budget_increments_and_decrements() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = random_chain_source(&bitcoind, &electrsd); + + let node_b = setup_node(&chain_source, random_config(false)); + let node_c = setup_node(&chain_source, random_config(false)); + + let mut config_a = random_config(false); + let strategy = FixedPathStrategy::new(); + config_a.probing = Some( + ProbingConfigBuilder::custom(strategy.clone()) + .interval(Duration::from_millis(PROBING_INTERVAL_MILLISECONDS)) + .max_locked_msat(10 * PROBE_AMOUNT_MSAT) + .build(), + ); + let node_a = setup_node(&chain_source, config_a); + + let addr_a = node_a.onchain_payment().new_address().unwrap(); + let addr_b = node_b.onchain_payment().new_address().unwrap(); + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![addr_a, addr_b], + Amount::from_sat(2_000_000), + ) + .await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + open_channel(&node_a, &node_b, 1_000_000, true, &electrsd).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 1).await; + node_b.sync_wallets().unwrap(); + open_channel(&node_b, &node_c, 1_000_000, true, &electrsd).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + node_c.sync_wallets().unwrap(); + + expect_channel_ready_event!(node_a, node_b.node_id()); + expect_event!(node_b, ChannelReady); + expect_event!(node_b, ChannelReady); + expect_event!(node_c, ChannelReady); + + // Build the probe path now that channels are ready, then enable probing. + strategy.set_path(build_probe_path(&node_a, &node_b, &node_c, PROBE_AMOUNT_MSAT)); + tokio::time::sleep(Duration::from_secs(3)).await; + strategy.start_probing(); + + let went_up = tokio::time::timeout(Duration::from_secs(30), async { + loop { + if node_a.prober().unwrap().locked_msat() > 0 { + break; + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + }) + .await + .is_ok(); + assert!(went_up, "locked_msat never increased — no probe was dispatched"); + println!("First probe dispatched; locked_msat = {}", node_a.prober().unwrap().locked_msat()); + + strategy.stop_probing(); + let cleared = tokio::time::timeout(Duration::from_secs(30), async { + loop { + if node_a.prober().unwrap().locked_msat() == 0 { + break; + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + }) + .await + .is_ok(); + assert!(cleared, "locked_msat never returned to zero after probe resolved"); + + node_a.stop().unwrap(); + node_b.stop().unwrap(); + node_c.stop().unwrap(); +} + +/// Verifies that no new probes are dispatched once the in-flight budget is exhausted. +/// +/// Exhaustion is triggered by stopping the intermediate node (B) while a probe HTLC +/// is in-flight, preventing resolution and keeping the budget locked. After B restarts +/// the HTLC fails, the budget clears, and probing resumes. +#[tokio::test(flavor = "multi_thread")] +async fn exhausted_probe_budget_blocks_new_probes() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = random_chain_source(&bitcoind, &electrsd); + + let node_b = setup_node(&chain_source, random_config(false)); + let node_c = setup_node(&chain_source, random_config(false)); + + let mut config_a = random_config(false); + let strategy = FixedPathStrategy::new(); + config_a.probing = Some( + ProbingConfigBuilder::custom(strategy.clone()) + .interval(Duration::from_millis(PROBING_INTERVAL_MILLISECONDS)) + .max_locked_msat(2 * PROBE_AMOUNT_MSAT) + .build(), + ); + let node_a = setup_node(&chain_source, config_a); + + let addr_a = node_a.onchain_payment().new_address().unwrap(); + let addr_b = node_b.onchain_payment().new_address().unwrap(); + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![addr_a, addr_b], + Amount::from_sat(2_000_000), + ) + .await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + open_channel(&node_a, &node_b, 1_000_000, true, &electrsd).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 1).await; + node_b.sync_wallets().unwrap(); + open_channel(&node_b, &node_c, 1_000_000, true, &electrsd).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + node_c.sync_wallets().unwrap(); + + expect_channel_ready_event!(node_a, node_b.node_id()); + expect_event!(node_b, ChannelReady); + expect_event!(node_b, ChannelReady); + expect_event!(node_c, ChannelReady); + + let capacity_at_open = node_a + .list_channels() + .iter() + .find(|ch| ch.counterparty_node_id == node_b.node_id()) + .map(|ch| ch.outbound_capacity_msat) + .expect("A→B channel not found"); + + assert_eq!(node_a.prober().map_or(1, |p| p.locked_msat()), 0, "initial locked_msat is nonzero"); + + strategy.set_path(build_probe_path(&node_a, &node_b, &node_c, PROBE_AMOUNT_MSAT)); + tokio::time::sleep(Duration::from_secs(3)).await; + strategy.start_probing(); + + // Wait for the first probe to be in-flight. + let locked = tokio::time::timeout(Duration::from_secs(30), async { + loop { + if node_a.prober().map_or(0, |p| p.locked_msat()) > 0 { + break; + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + }) + .await + .is_ok(); + assert!(locked, "no probe dispatched within 30 s"); + + // Capacity should have decreased due to the in-flight probe HTLC. + let capacity_with_probe = node_a + .list_channels() + .iter() + .find(|ch| ch.counterparty_node_id == node_b.node_id()) + .map(|ch| ch.outbound_capacity_msat) + .expect("A→B channel not found"); + assert!( + capacity_with_probe < capacity_at_open, + "HTLC not visible in channel state: capacity unchanged ({capacity_at_open} msat)" + ); + + // Stop B while the probe HTLC is in-flight. + node_b.stop().unwrap(); + // Pause probing so the budget can clear without a new probe re-locking it. + strategy.stop_probing(); + + tokio::time::sleep(Duration::from_secs(5)).await; + assert!( + node_a.prober().map_or(0, |p| p.locked_msat()) > 0, + "probe resolved unexpectedly while B was offline" + ); + let capacity_after_wait = node_a + .list_channels() + .iter() + .find(|ch| ch.counterparty_node_id == node_b.node_id()) + .map(|ch| ch.outbound_capacity_msat) + .unwrap_or(u64::MAX); + assert!( + capacity_after_wait >= capacity_with_probe, + "a new probe HTLC was sent despite budget being exhausted" + ); + + // strategy.stop_probing(); + + // Bring B back and explicitly reconnect to A and C so the stuck HTLC resolves + // without waiting for the background reconnection backoff. + node_b.start().unwrap(); + let node_a_addr = node_a.listening_addresses().unwrap().first().unwrap().clone(); + let node_c_addr = node_c.listening_addresses().unwrap().first().unwrap().clone(); + node_b.connect(node_a.node_id(), node_a_addr, false).unwrap(); + node_b.connect(node_c.node_id(), node_c_addr, false).unwrap(); + + let cleared = tokio::time::timeout(Duration::from_secs(60), async { + loop { + if node_a.prober().map_or(1, |p| p.locked_msat()) == 0 { + break; + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + }) + .await + .is_ok(); + assert!(cleared, "locked_msat never cleared after B came back online"); + + // Re-enable probing; a new probe should be dispatched within a few ticks. + strategy.start_probing(); + let new_probe = tokio::time::timeout(Duration::from_secs(60), async { + loop { + if node_a.prober().map_or(0, |p| p.locked_msat()) > 0 { + break; + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + }) + .await + .is_ok(); + assert!(new_probe, "no new probe dispatched after budget was freed"); + + node_a.stop().unwrap(); + node_b.stop().unwrap(); + node_c.stop().unwrap(); +} From 2431f88611df92332fe2a0dc5e6e9352d3e10365 Mon Sep 17 00:00:00 2001 From: Alexander Shevtsov Date: Thu, 30 Apr 2026 13:59:09 +0200 Subject: [PATCH 03/12] Fix race condition in probing tests Changed the exhaust test to be statistical (locked amount never exceeds the cap) instead of trying to turn intermediary routing node offline when it hasn't yet forwarded the probe htlc. --- tests/probing_tests.rs | 118 ++++++++--------------------------------- 1 file changed, 21 insertions(+), 97 deletions(-) diff --git a/tests/probing_tests.rs b/tests/probing_tests.rs index 26c5727c8..05267ba38 100644 --- a/tests/probing_tests.rs +++ b/tests/probing_tests.rs @@ -7,9 +7,8 @@ // to zero once the probe resolves. // // exhausted_probe_budget_blocks_new_probes -// Stops B mid-flight so the HTLC cannot resolve; confirms the budget -// stays exhausted and no further probes are sent. After B restarts -// the probe fails, the budget clears, and new probes resume. +// Samples locked_msat across multiple probe cycles and asserts it never +// exceeds the configured max_locked_msat budget cap. mod common; use std::sync::atomic::{AtomicBool, Ordering}; @@ -94,7 +93,7 @@ fn build_probe_path( short_channel_id: ch_ab.short_channel_id.unwrap(), channel_features: ChannelFeatures::empty(), fee_msat: 1000, - cltv_expiry_delta: 40, + cltv_expiry_delta: 144, maybe_announced_channel: true, }, RouteHop { @@ -103,7 +102,7 @@ fn build_probe_path( short_channel_id: ch_bc.short_channel_id.unwrap(), channel_features: ChannelFeatures::empty(), fee_msat: amount_msat, - cltv_expiry_delta: 0, + cltv_expiry_delta: 18, maybe_announced_channel: true, }, ], @@ -194,11 +193,7 @@ async fn probe_budget_increments_and_decrements() { node_c.stop().unwrap(); } -/// Verifies that no new probes are dispatched once the in-flight budget is exhausted. -/// -/// Exhaustion is triggered by stopping the intermediate node (B) while a probe HTLC -/// is in-flight, preventing resolution and keeping the budget locked. After B restarts -/// the HTLC fails, the budget clears, and probing resumes. +/// Verifies that `locked_msat` never exceeds `max_locked_msat` across multiple probe cycles. #[tokio::test(flavor = "multi_thread")] async fn exhausted_probe_budget_blocks_new_probes() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); @@ -209,10 +204,11 @@ async fn exhausted_probe_budget_blocks_new_probes() { let mut config_a = random_config(false); let strategy = FixedPathStrategy::new(); + let max_locked_msat = 2 * PROBE_AMOUNT_MSAT; config_a.probing = Some( ProbingConfigBuilder::custom(strategy.clone()) .interval(Duration::from_millis(PROBING_INTERVAL_MILLISECONDS)) - .max_locked_msat(2 * PROBE_AMOUNT_MSAT) + .max_locked_msat(max_locked_msat) .build(), ); let node_a = setup_node(&chain_source, config_a); @@ -244,100 +240,28 @@ async fn exhausted_probe_budget_blocks_new_probes() { expect_event!(node_b, ChannelReady); expect_event!(node_c, ChannelReady); - let capacity_at_open = node_a - .list_channels() - .iter() - .find(|ch| ch.counterparty_node_id == node_b.node_id()) - .map(|ch| ch.outbound_capacity_msat) - .expect("A→B channel not found"); - assert_eq!(node_a.prober().map_or(1, |p| p.locked_msat()), 0, "initial locked_msat is nonzero"); strategy.set_path(build_probe_path(&node_a, &node_b, &node_c, PROBE_AMOUNT_MSAT)); tokio::time::sleep(Duration::from_secs(3)).await; strategy.start_probing(); - // Wait for the first probe to be in-flight. - let locked = tokio::time::timeout(Duration::from_secs(30), async { - loop { - if node_a.prober().map_or(0, |p| p.locked_msat()) > 0 { - break; - } - tokio::time::sleep(Duration::from_millis(100)).await; + // Sample locked_msat across multiple probe cycles and assert the budget cap is never exceeded + let mut observed_locked = false; + let deadline = tokio::time::Instant::now() + Duration::from_secs(30); + while tokio::time::Instant::now() < deadline { + let msat = node_a.prober().map_or(0, |p| p.locked_msat()); + if msat > 0 { + observed_locked = true; } - }) - .await - .is_ok(); - assert!(locked, "no probe dispatched within 30 s"); - - // Capacity should have decreased due to the in-flight probe HTLC. - let capacity_with_probe = node_a - .list_channels() - .iter() - .find(|ch| ch.counterparty_node_id == node_b.node_id()) - .map(|ch| ch.outbound_capacity_msat) - .expect("A→B channel not found"); - assert!( - capacity_with_probe < capacity_at_open, - "HTLC not visible in channel state: capacity unchanged ({capacity_at_open} msat)" - ); - - // Stop B while the probe HTLC is in-flight. - node_b.stop().unwrap(); - // Pause probing so the budget can clear without a new probe re-locking it. - strategy.stop_probing(); - - tokio::time::sleep(Duration::from_secs(5)).await; - assert!( - node_a.prober().map_or(0, |p| p.locked_msat()) > 0, - "probe resolved unexpectedly while B was offline" - ); - let capacity_after_wait = node_a - .list_channels() - .iter() - .find(|ch| ch.counterparty_node_id == node_b.node_id()) - .map(|ch| ch.outbound_capacity_msat) - .unwrap_or(u64::MAX); - assert!( - capacity_after_wait >= capacity_with_probe, - "a new probe HTLC was sent despite budget being exhausted" - ); - - // strategy.stop_probing(); - - // Bring B back and explicitly reconnect to A and C so the stuck HTLC resolves - // without waiting for the background reconnection backoff. - node_b.start().unwrap(); - let node_a_addr = node_a.listening_addresses().unwrap().first().unwrap().clone(); - let node_c_addr = node_c.listening_addresses().unwrap().first().unwrap().clone(); - node_b.connect(node_a.node_id(), node_a_addr, false).unwrap(); - node_b.connect(node_c.node_id(), node_c_addr, false).unwrap(); - - let cleared = tokio::time::timeout(Duration::from_secs(60), async { - loop { - if node_a.prober().map_or(1, |p| p.locked_msat()) == 0 { - break; - } - tokio::time::sleep(Duration::from_millis(100)).await; - } - }) - .await - .is_ok(); - assert!(cleared, "locked_msat never cleared after B came back online"); + assert!( + msat <= max_locked_msat, + "locked_msat {msat} exceeded budget cap {max_locked_msat}" + ); + tokio::time::sleep(Duration::from_millis(25)).await; + } - // Re-enable probing; a new probe should be dispatched within a few ticks. - strategy.start_probing(); - let new_probe = tokio::time::timeout(Duration::from_secs(60), async { - loop { - if node_a.prober().map_or(0, |p| p.locked_msat()) > 0 { - break; - } - tokio::time::sleep(Duration::from_millis(100)).await; - } - }) - .await - .is_ok(); - assert!(new_probe, "no new probe dispatched after budget was freed"); + assert!(observed_locked, "no probe was dispatched during the observation window"); node_a.stop().unwrap(); node_b.stop().unwrap(); From 36fdff37a1cef843e62f921793066bb246cba023 Mon Sep 17 00:00:00 2001 From: Alexander Shevtsov Date: Wed, 6 May 2026 02:53:55 +0200 Subject: [PATCH 04/12] Clamp the minimum number of probing hops to 2 --- src/probing.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/probing.rs b/src/probing.rs index ac0571bd1..e5d968193 100644 --- a/src/probing.rs +++ b/src/probing.rs @@ -127,6 +127,7 @@ impl ProbingConfigBuilder { /// Start building a config that probes via random graph walks. /// /// `max_hops` is the upper bound on the number of hops in a randomly constructed path. + /// Values below `2` are clamped to `2`. pub fn random_walk(max_hops: usize) -> Self { Self::with_kind(ProbingStrategyKind::Random { max_hops }) } @@ -219,6 +220,7 @@ impl ArcedProbingConfigBuilder { /// Creates a builder configured to probe via random graph walks. /// /// `max_hops` is the upper bound on the number of hops in a randomly constructed path. + /// Values below `2` are clamped to `2`. #[uniffi::constructor] pub fn new_random_walk(max_hops: u64) -> Arc { Arc::new(Self { inner: RwLock::new(ProbingConfigBuilder::random_walk(max_hops as usize)) }) @@ -439,7 +441,7 @@ impl RandomStrategy { Self { network_graph, channel_manager, - max_hops: max_hops.clamp(1, MAX_PATH_LENGTH_ESTIMATE as usize), + max_hops: max_hops.clamp(2, MAX_PATH_LENGTH_ESTIMATE as usize), min_amount_msat, max_amount_msat, } @@ -633,7 +635,7 @@ impl RandomStrategy { impl ProbingStrategy for RandomStrategy { fn next_probe(&self) -> Option { - let target_hops = random_range(1, self.max_hops as u64) as usize; + let target_hops = random_range(2, self.max_hops as u64) as usize; let amount_msat = random_range(self.min_amount_msat, self.max_amount_msat); self.try_build_path(target_hops, amount_msat) From a162ab9cf67c42525a51b0697b08e3c74230f23e Mon Sep 17 00:00:00 2001 From: Alexander Shevtsov Date: Wed, 6 May 2026 03:00:38 +0200 Subject: [PATCH 05/12] Fix zero CLTV expiry delta in probing final hop --- src/probing.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/probing.rs b/src/probing.rs index e5d968193..9229b6ada 100644 --- a/src/probing.rs +++ b/src/probing.rs @@ -572,7 +572,8 @@ impl RandomStrategy { Some((node_features, channel_features, maybe_announced_channel)) }; - // Final hop: fee_msat carries the delivery amount; cltv delta is zero. + // Final hop: fee_msat carries the delivery amount; cltv_expiry_delta carries the + // destination's final CLTV (matching LDK's shifted-by-one RouteHop convention). { let (node_id, via_scid, pubkey) = route[last]; let (node_features, channel_features, maybe_announced_channel) = @@ -583,7 +584,7 @@ impl RandomStrategy { short_channel_id: via_scid, channel_features, fee_msat: amount_msat, - cltv_expiry_delta: 0, + cltv_expiry_delta: DEFAULT_MIN_FINAL_CLTV_EXPIRY_DELTA as u32, maybe_announced_channel, }); } From 5bd0bd142a66322678221842196df56c412d8ca6 Mon Sep 17 00:00:00 2001 From: Alexander Shevtsov Date: Wed, 6 May 2026 03:35:46 +0200 Subject: [PATCH 06/12] Distinguish between background and user probes Previously when calculated currently locked amount, we didn't account for preflight probes sent on a payment which could result in an incorrect value of probe locked_msat. Now Prober saves the PaymentId of probes it sent and tracks them on release, ignoring the user-sent ones. --- src/builder.rs | 1 + src/event.rs | 16 ++++++++++++---- src/probing.rs | 18 +++++++++++++----- 3 files changed, 26 insertions(+), 9 deletions(-) diff --git a/src/builder.rs b/src/builder.rs index 4da58c0fa..4c3e9ac26 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -2089,6 +2089,7 @@ fn build_with_store_internal( interval: probing_cfg.interval, max_locked_msat: probing_cfg.max_locked_msat, locked_msat: Arc::new(AtomicU64::new(0)), + inflight_probes: Mutex::new(HashMap::new()), }) }); diff --git a/src/event.rs b/src/event.rs index 3eda18790..72346efd9 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1161,14 +1161,22 @@ where LdkEvent::PaymentPathSuccessful { .. } => {}, LdkEvent::PaymentPathFailed { .. } => {}, - LdkEvent::ProbeSuccessful { path, .. } => { + LdkEvent::ProbeSuccessful { path, payment_id, .. } => { if let Some(prober) = &self.prober { - prober.handle_probe_successful(&path); + if let Some(amount) = + prober.inflight_probes.lock().expect("lock").remove(&payment_id) + { + prober.handle_background_probe_successful(&path, amount); + } } }, - LdkEvent::ProbeFailed { path, .. } => { + LdkEvent::ProbeFailed { path, payment_id, .. } => { if let Some(prober) = &self.prober { - prober.handle_probe_failed(&path); + if let Some(amount) = + prober.inflight_probes.lock().expect("lock").remove(&payment_id) + { + prober.handle_background_probe_failed(&path, amount); + } } }, LdkEvent::HTLCHandlingFailed { failure_type, .. } => { diff --git a/src/probing.rs b/src/probing.rs index 9229b6ada..79aa66430 100644 --- a/src/probing.rs +++ b/src/probing.rs @@ -16,6 +16,7 @@ use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use bitcoin::secp256k1::PublicKey; +use lightning::ln::channelmanager::PaymentId; use lightning::routing::gossip::NodeId; use lightning::routing::router::{ Path, PaymentParameters, RouteHop, RouteParameters, MAX_PATH_LENGTH_ESTIMATE, @@ -654,6 +655,7 @@ pub struct Prober { /// Maximum total millisatoshis that may be locked in in-flight probes at any time. pub max_locked_msat: u64, pub(crate) locked_msat: Arc, + pub(crate) inflight_probes: Mutex>, } fn fmt_path(path: &lightning::routing::router::Path) -> String { @@ -670,8 +672,7 @@ impl Prober { self.locked_msat.load(Ordering::Relaxed) } - pub(crate) fn handle_probe_successful(&self, path: &lightning::routing::router::Path) { - let amount: u64 = path.hops.iter().map(|h| h.fee_msat).sum(); + pub(crate) fn handle_background_probe_successful(&self, path: &Path, amount: u64) { let prev = self .locked_msat .fetch_update(Ordering::AcqRel, Ordering::Acquire, |v| Some(v.saturating_sub(amount))) @@ -687,8 +688,7 @@ impl Prober { ); } - pub(crate) fn handle_probe_failed(&self, path: &lightning::routing::router::Path) { - let amount: u64 = path.hops.iter().map(|h| h.fee_msat).sum(); + pub(crate) fn handle_background_probe_failed(&self, path: &Path, amount: u64) { let prev = self .locked_msat .fetch_update(Ordering::AcqRel, Ordering::Acquire, |v| Some(v.saturating_sub(amount))) @@ -727,9 +727,16 @@ pub(crate) async fn run_prober(prober: Arc, mut stop_rx: tokio::sync::wa log_debug!(prober.logger, "Skipping probe: locked-msat budget exceeded."); continue; } + // Hold `inflight_probes` across `send_probe` so the event handler in + // `event.rs` (which acquires the same lock to remove the entry) cannot + // observe a `ProbeSuccessful`/`ProbeFailed` for a payment_id we have not + // yet inserted, which would leave `locked_msat` permanently incremented. + let mut inflight = prober.inflight_probes.lock().expect("lock"); match prober.channel_manager.send_probe(path.clone()) { - Ok(_) => { + Ok((_, payment_id)) => { + inflight.insert(payment_id, amount); prober.locked_msat.fetch_add(amount, Ordering::Release); + drop(inflight); log_debug!( prober.logger, "Probe sent: locked {} msat, path: {}", @@ -738,6 +745,7 @@ pub(crate) async fn run_prober(prober: Arc, mut stop_rx: tokio::sync::wa ); } Err(e) => { + drop(inflight); log_debug!( prober.logger, "Probe send failed: {:?}, path: {}", From 935bd845d928895f69aa6cb8dcce1705f5164469 Mon Sep 17 00:00:00 2001 From: Alexander Shevtsov Date: Wed, 6 May 2026 03:53:18 +0200 Subject: [PATCH 07/12] Move ffi re-exports to `ffi/types.rs` --- src/ffi/types.rs | 1 + src/lib.rs | 4 ---- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/src/ffi/types.rs b/src/ffi/types.rs index ad293bc3e..4a4949e63 100644 --- a/src/ffi/types.rs +++ b/src/ffi/types.rs @@ -148,6 +148,7 @@ pub use crate::entropy::{generate_entropy_mnemonic, NodeEntropy, WordCount}; use crate::error::Error; pub use crate::liquidity::LSPS1OrderStatus; pub use crate::logger::{LogLevel, LogRecord, LogWriter}; +pub use crate::probing::ProbingConfig; use crate::{hex_utils, SocketAddress, UserChannelId}; uniffi::custom_type!(PublicKey, String, { diff --git a/src/lib.rs b/src/lib.rs index f0d695af6..c52502a6c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -115,8 +115,6 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; #[cfg(cycle_tests)] use std::{any::Any, sync::Weak}; -#[cfg(feature = "uniffi")] -use crate::probing::ProbingConfig; pub use balance::{BalanceDetails, LightningBalance, PendingSweepBalance}; pub use bip39; pub use bitcoin; @@ -174,8 +172,6 @@ use payment::{ UnifiedPayment, }; use peer_store::{PeerInfo, PeerStore}; -#[cfg(feature = "uniffi")] -pub use probing::ArcedProbingConfigBuilder as ProbingConfigBuilder; use probing::{run_prober, Prober}; use runtime::Runtime; pub use tokio; From 8acd55d154ca3176cbcf79319bb2e529dc786ba2 Mon Sep 17 00:00:00 2001 From: Alexander Shevtsov Date: Wed, 6 May 2026 03:54:01 +0200 Subject: [PATCH 08/12] Update docs for background probing service --- src/builder.rs | 8 +++- src/probing.rs | 121 +++++++++++++++++++++++++++++++++++++++++-------- 2 files changed, 109 insertions(+), 20 deletions(-) diff --git a/src/builder.rs b/src/builder.rs index 4c3e9ac26..7955412d6 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -638,14 +638,20 @@ impl NodeBuilder { /// Configures background probing. /// /// Use [`ProbingConfigBuilder`] to build the configuration: - /// ```ignore + /// ```no_run + /// # #[cfg(not(feature = "uniffi"))] + /// # { + /// use std::time::Duration; + /// use ldk_node::Builder; /// use ldk_node::probing::ProbingConfigBuilder; /// + /// let mut builder = Builder::new(); /// builder.set_probing_config( /// ProbingConfigBuilder::high_degree(100) /// .interval(Duration::from_secs(30)) /// .build() /// ); + /// # } /// ``` /// /// [`ProbingConfigBuilder`]: crate::probing::ProbingConfigBuilder diff --git a/src/probing.rs b/src/probing.rs index 79aa66430..ec251d578 100644 --- a/src/probing.rs +++ b/src/probing.rs @@ -5,7 +5,60 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. -//! Background probing strategies for training the payment scorer. +//! Background probing for training the payment scorer. +//! +//! Lightning Network nodes only know channels' capacities via their initially announced limits; +//! the real values change unpredictably after payments have been sent, which makes some of +//! the channels inoperable (capacity has been depleted). The only way to know about channel +//! depletion is to attempt sending a payment through it. Thus, sending a live payment +//! might involve a significant time delay for finding an appropriate channel with enough capacity, +//! up to complete failure when a route with enough capacity cannot be found. +//! +//! The background probing service fires probes to learn about the live state of channels and +//! their capacities, providing accurate data to the scorer and router. +//! +//! This module provides the configuration for such a service. There are two pre-built strategies, +//! [`RandomStrategy`] and [`HighDegreeStrategy`], as well as a [`ProbingStrategy`] trait which +//! allows defining a custom probing strategy (for example if there is an established payment +//! pattern). +//! +//! # Configuration +//! +//! Probing is opt-in: a node only runs the service if a [`ProbingConfig`] has been registered +//! on the [`Builder`] via [`Builder::set_probing_config`] before [`Builder::build`]. Without a +//! config, no probes are sent. +//! +//! # Example +//! +//! ```no_run +//! # #[cfg(not(feature = "uniffi"))] +//! # { +//! use std::time::Duration; +//! use ldk_node::Builder; +//! use ldk_node::probing::ProbingConfigBuilder; +//! +//! let probing_config = ProbingConfigBuilder::high_degree(100) +//! .interval(Duration::from_secs(30)) +//! .max_locked_msat(500_000) +//! .diversity_penalty_msat(250) +//! .build(); +//! +//! let mut builder = Builder::new(); +//! builder.set_probing_config(probing_config); +//! # } +//! ``` +//! +//! # Caution +//! +//! Probes send real HTLCs along real paths. If an intermediate hop is offline or +//! misbehaving, the probe HTLC can remain in-flight — locking outbound liquidity +//! on the first-hop channel until the HTLC timeout elapses (potentially hours). +//! `max_locked_msat` caps the total outbound capacity that in-flight probes may +//! hold at any one time; tune it conservatively for nodes with tight liquidity. +//! +//! [`Builder`]: crate::Builder +//! [`Builder::set_probing_config`]: crate::Builder::set_probing_config +//! [`Builder::build`]: crate::Builder::build use std::collections::HashMap; use std::fmt; @@ -58,27 +111,51 @@ impl fmt::Debug for ProbingStrategyKind { /// Configuration for the background probing subsystem. /// -/// Construct via [`ProbingConfigBuilder`]. Pick a strategy with -/// [`ProbingConfigBuilder::high_degree`], [`ProbingConfigBuilder::random_walk`], or -/// [`ProbingConfigBuilder::custom`], chain optional setters, and finalize with -/// [`ProbingConfigBuilder::build`]. +/// Instances are produced by [`ProbingConfigBuilder`], which exposes three strategy +/// constructors: [`ProbingConfigBuilder::high_degree`], [`ProbingConfigBuilder::random_walk`], +/// and [`ProbingConfigBuilder::custom`]. /// -/// # Caution +/// Optional setters on the builder tune timing and liquidity limits, and +/// [`ProbingConfigBuilder::build`] finalizes the value. /// -/// Probes send real HTLCs along real paths. If an intermediate hop is offline or -/// misbehaving, the probe HTLC can remain in-flight — locking outbound liquidity -/// on the first-hop channel until the HTLC timeout elapses (potentially hours). -/// `max_locked_msat` caps the total outbound capacity that in-flight probes may -/// hold at any one time; tune it conservatively for nodes with tight liquidity. +/// # Examples +/// +/// Using pre-built strategy: +/// ```no_run +/// # #[cfg(not(feature = "uniffi"))] +/// # { +/// use std::time::Duration; +/// use ldk_node::Builder; +/// use ldk_node::probing::ProbingConfigBuilder; /// -/// # Example -/// ```ignore /// let config = ProbingConfigBuilder::high_degree(100) /// .interval(Duration::from_secs(30)) /// .max_locked_msat(500_000) /// .diversity_penalty_msat(250) /// .build(); +/// +/// let mut builder = Builder::new(); /// builder.set_probing_config(config); +/// # } +/// ``` +/// +/// Creating a custom strategy that always probes the same path: +/// ``` +/// use ldk_node::lightning::routing::router::Path; +/// use ldk_node::probing::ProbingStrategy; +/// +/// struct FixedPathStrategy { +/// path: Path, +/// } +/// impl ProbingStrategy for FixedPathStrategy { +/// fn next_probe(&self) -> Option { +/// if self.path.hops.len() > 1 { +/// Some(self.path.clone()) +/// } else { +/// None +/// } +/// } +/// } /// ``` #[derive(Clone, Debug)] #[cfg_attr(feature = "uniffi", derive(uniffi::Object))] @@ -92,8 +169,9 @@ pub struct ProbingConfig { /// Builder for [`ProbingConfig`]. /// -/// Pick a strategy with [`high_degree`], [`random_walk`], or [`custom`], chain optional -/// setters, and call [`build`] to finalize. +/// A new instance starts from one of three strategy constructors — [`high_degree`], +/// [`random_walk`], or [`custom`] — and is finalized through [`build`]. Optional setters +/// in between override the timing and liquidity defaults. /// /// [`high_degree`]: Self::high_degree /// [`random_walk`]: Self::random_walk @@ -193,8 +271,9 @@ impl ProbingConfigBuilder { /// A UniFFI-compatible wrapper around [`ProbingConfigBuilder`] that uses interior mutability /// so it can be shared behind an `Arc` as required by the FFI object model. /// -/// Obtain one via the constructors [`new_high_degree`] or [`new_random_walk`], configure it -/// with the `set_*` methods, then call [`build`] to produce a [`ProbingConfig`]. +/// Instances are produced by the constructors [`new_high_degree`] and [`new_random_walk`]. +/// The `set_*` methods override the defaults, and [`build`] yields the resulting +/// [`ProbingConfig`]. /// /// [`new_high_degree`]: Self::new_high_degree /// [`new_random_walk`]: Self::new_random_walk @@ -263,7 +342,7 @@ impl ArcedProbingConfigBuilder { } } -/// Strategy can be used for determining the next target and amount for probing. +/// A strategy that decides which path the probing service should probe next. pub trait ProbingStrategy: Send + Sync + 'static { /// Returns the next probe path to run, or `None` to skip this tick. fn next_probe(&self) -> Option; @@ -281,6 +360,8 @@ pub trait ProbingStrategy: Send + Sync + 'static { /// /// The probe amount is chosen uniformly at random from /// `[min_amount_msat, max_amount_msat]`. +/// +/// `HighDegreeStrategy` can only use publicly announced channels for probing. pub struct HighDegreeStrategy { network_graph: Arc, channel_manager: Arc, @@ -406,7 +487,7 @@ impl ProbingStrategy for HighDegreeStrategy { } } -/// Explores the graph by walking a random number of hops outward from one of our own +/// Explores the graph by walking a random number (≥2) of hops outward from one of our own /// channels, constructing the [`Path`] explicitly. /// /// On each tick: @@ -418,6 +499,8 @@ impl ProbingStrategy for HighDegreeStrategy { /// /// Because path selection ignores the scorer, this probes channels the router /// would never try on its own, teaching the scorer about previously unknown paths. +/// +/// `RandomStrategy` can only use publicly announced channels for probing. pub struct RandomStrategy { network_graph: Arc, channel_manager: Arc, From 5e77f8893db61d95e207c51cd015d38b6c3b9ef3 Mon Sep 17 00:00:00 2001 From: Alexander Shevtsov Date: Wed, 6 May 2026 09:50:57 +0200 Subject: [PATCH 09/12] Wait for channel readiness in probing tests --- tests/common/mod.rs | 31 +++++++++++++++++++++++++++++++ tests/probing_tests.rs | 9 ++++++--- 2 files changed, 37 insertions(+), 3 deletions(-) diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 306a432d8..ece78e10e 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -610,6 +610,37 @@ pub(crate) async fn wait_for_outpoint_spend(electrs: &E, outpoin .await; } +/// Polls the channel from `source_node` to `counterparty_node` until it reports `is_usable` +/// and can carry an HTLC of `min_amount_msat` from `source_node`'s side. +/// +/// After `ChannelReady`, channel-monitor persistence can lag for tens of seconds on slow +/// CI runners; during that window `send_probe`/`send_payment` reject with +/// `ParameterError("...monitor update is in progress...")`. This helper gives tests a +/// deterministic readiness gate instead of racing the monitor-update pipeline. +pub(crate) async fn wait_for_channel_ready_to_send( + source_node: &TestNode, counterparty_node: &TestNode, min_amount_msat: u64, +) { + let counterparty = counterparty_node.node_id(); + let deadline = tokio::time::Instant::now() + Duration::from_secs(180); + while tokio::time::Instant::now() < deadline { + let ready = source_node.list_channels().iter().any(|c| { + c.counterparty_node_id == counterparty + && c.is_usable + && c.next_outbound_htlc_limit_msat >= min_amount_msat + }); + if ready { + return; + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + panic!( + "channel from {} to {} not ready to send {} msat within 180s", + source_node.node_id(), + counterparty, + min_amount_msat, + ); +} + pub(crate) async fn exponential_backoff_poll(mut poll: F) -> T where F: FnMut() -> Option, diff --git a/tests/probing_tests.rs b/tests/probing_tests.rs index 05267ba38..c48a6401d 100644 --- a/tests/probing_tests.rs +++ b/tests/probing_tests.rs @@ -16,7 +16,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use common::{ expect_channel_ready_event, expect_event, generate_blocks_and_wait, open_channel, premine_and_distribute_funds, random_chain_source, random_config, setup_bitcoind_and_electrsd, - setup_node, TestNode, + setup_node, wait_for_channel_ready_to_send, TestNode, }; use ldk_node::bitcoin::Amount; @@ -159,7 +159,9 @@ async fn probe_budget_increments_and_decrements() { // Build the probe path now that channels are ready, then enable probing. strategy.set_path(build_probe_path(&node_a, &node_b, &node_c, PROBE_AMOUNT_MSAT)); - tokio::time::sleep(Duration::from_secs(3)).await; + // First hop carries amount + per-hop fee; second hop carries just amount. + wait_for_channel_ready_to_send(&node_a, &node_b, PROBE_AMOUNT_MSAT + 1000).await; + wait_for_channel_ready_to_send(&node_b, &node_c, PROBE_AMOUNT_MSAT).await; strategy.start_probing(); let went_up = tokio::time::timeout(Duration::from_secs(30), async { @@ -243,7 +245,8 @@ async fn exhausted_probe_budget_blocks_new_probes() { assert_eq!(node_a.prober().map_or(1, |p| p.locked_msat()), 0, "initial locked_msat is nonzero"); strategy.set_path(build_probe_path(&node_a, &node_b, &node_c, PROBE_AMOUNT_MSAT)); - tokio::time::sleep(Duration::from_secs(3)).await; + wait_for_channel_ready_to_send(&node_a, &node_b, PROBE_AMOUNT_MSAT + 1000).await; + wait_for_channel_ready_to_send(&node_b, &node_c, PROBE_AMOUNT_MSAT).await; strategy.start_probing(); // Sample locked_msat across multiple probe cycles and assert the budget cap is never exceeded From c68b8e36ad37dd925da96062e1da641b2534e471 Mon Sep 17 00:00:00 2001 From: Alexander Shevtsov Date: Wed, 6 May 2026 10:12:52 +0200 Subject: [PATCH 10/12] retrigger CI From 15edebc6c73a4a266f35e0a2abf853930b6e095e Mon Sep 17 00:00:00 2001 From: Alexander Shevtsov Date: Fri, 8 May 2026 10:00:25 +0200 Subject: [PATCH 11/12] Deduce probing state from channel manager Previously we tried to store the total amount of funds locked and/or exact probes in flight which was difficult to persist and restore after restart. Now it is completely deduced from the channel manager. --- src/builder.rs | 7 ++--- src/event.rs | 12 ++------ src/probing.rs | 76 ++++++++++++++++++++++---------------------------- 3 files changed, 38 insertions(+), 57 deletions(-) diff --git a/src/builder.rs b/src/builder.rs index 7955412d6..85bed9ffe 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -10,7 +10,6 @@ use std::convert::TryInto; use std::default::Default; use std::net::ToSocketAddrs; use std::path::PathBuf; -use std::sync::atomic::AtomicU64; use std::sync::{Arc, Mutex, Once, RwLock}; use std::time::SystemTime; use std::{fmt, fs}; @@ -1134,7 +1133,9 @@ impl ArcedNodeBuilder { /// Configures background probing. /// - /// See [`ProbingConfig`] for details. + /// Use [`ProbingConfigBuilder`] to build the configuration. + /// + /// [`ProbingConfigBuilder`]: crate::probing::ProbingConfigBuilder pub fn set_probing_config(&self, config: Arc) { self.inner.write().expect("lock").set_probing_config((*config).clone()); } @@ -2094,8 +2095,6 @@ fn build_with_store_internal( strategy, interval: probing_cfg.interval, max_locked_msat: probing_cfg.max_locked_msat, - locked_msat: Arc::new(AtomicU64::new(0)), - inflight_probes: Mutex::new(HashMap::new()), }) }); diff --git a/src/event.rs b/src/event.rs index 72346efd9..58ea9836e 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1163,20 +1163,12 @@ where LdkEvent::PaymentPathFailed { .. } => {}, LdkEvent::ProbeSuccessful { path, payment_id, .. } => { if let Some(prober) = &self.prober { - if let Some(amount) = - prober.inflight_probes.lock().expect("lock").remove(&payment_id) - { - prober.handle_background_probe_successful(&path, amount); - } + prober.handle_background_probe_successful(&path, payment_id); } }, LdkEvent::ProbeFailed { path, payment_id, .. } => { if let Some(prober) = &self.prober { - if let Some(amount) = - prober.inflight_probes.lock().expect("lock").remove(&payment_id) - { - prober.handle_background_probe_failed(&path, amount); - } + prober.handle_background_probe_failed(&path, payment_id); } }, LdkEvent::HTLCHandlingFailed { failure_type, .. } => { diff --git a/src/probing.rs b/src/probing.rs index ec251d578..ede116645 100644 --- a/src/probing.rs +++ b/src/probing.rs @@ -62,15 +62,15 @@ use std::collections::HashMap; use std::fmt; -use std::sync::atomic::{AtomicU64, Ordering}; #[cfg(feature = "uniffi")] use std::sync::RwLock; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use bitcoin::secp256k1::PublicKey; -use lightning::ln::channelmanager::PaymentId; +use lightning::ln::channelmanager::{PaymentId, RecentPaymentDetails}; use lightning::routing::gossip::NodeId; +use lightning::routing::router::Router as LdkRouter; use lightning::routing::router::{ Path, PaymentParameters, RouteHop, RouteParameters, MAX_PATH_LENGTH_ESTIMATE, }; @@ -85,8 +85,6 @@ use crate::logger::{log_debug, LdkLogger, Logger}; use crate::types::{ChannelManager, Graph, Router}; use crate::util::random_range; -use lightning::routing::router::Router as LdkRouter; - /// Which built-in probing strategy to use, or a custom one. #[derive(Clone)] pub(crate) enum ProbingStrategyKind { @@ -287,7 +285,7 @@ pub struct ArcedProbingConfigBuilder { #[cfg(feature = "uniffi")] #[uniffi::export] impl ArcedProbingConfigBuilder { - /// Creates a builder configured to probe toward the highest-degree nodes in the graph. + /// Start building a config that probes toward the highest-degree nodes in the graph. /// /// `top_node_count` controls how many of the most-connected nodes are cycled through. #[uniffi::constructor] @@ -297,7 +295,7 @@ impl ArcedProbingConfigBuilder { }) } - /// Creates a builder configured to probe via random graph walks. + /// Start building a config that probes via random graph walks. /// /// `max_hops` is the upper bound on the number of hops in a randomly constructed path. /// Values below `2` are clamped to `2`. @@ -306,7 +304,9 @@ impl ArcedProbingConfigBuilder { Arc::new(Self { inner: RwLock::new(ProbingConfigBuilder::random_walk(max_hops as usize)) }) } - /// Overrides the interval between probe attempts. Defaults to 10 seconds. + /// Overrides the interval between probe attempts. + /// + /// Defaults to 10 seconds. pub fn set_interval(&self, secs: u64) { self.inner.write().expect("lock").interval(Duration::from_secs(secs)); } @@ -324,6 +324,10 @@ impl ArcedProbingConfigBuilder { /// encouraging path diversity during background probing. The penalty decays /// quadratically over 24 hours. /// + /// This is only useful for probing strategies that route through the scorer + /// (e.g., [`HighDegreeStrategy`]). Strategies that build paths manually + /// (e.g., [`RandomStrategy`]) bypass the scorer entirely. + /// /// If unset, LDK's default of `0` (no penalty) is used. pub fn set_diversity_penalty_msat(&self, penalty_msat: u64) { self.inner.write().expect("lock").diversity_penalty_msat(penalty_msat); @@ -331,7 +335,7 @@ impl ArcedProbingConfigBuilder { /// Sets how long a probed node stays ineligible before being probed again. /// - /// Only applies to the high-degree strategy. Defaults to 1 hour. + /// Only applies to [`HighDegreeStrategy`]. Defaults to 1 hour. pub fn set_cooldown(&self, secs: u64) { self.inner.write().expect("lock").cooldown(Duration::from_secs(secs)); } @@ -737,8 +741,6 @@ pub struct Prober { pub interval: Duration, /// Maximum total millisatoshis that may be locked in in-flight probes at any time. pub max_locked_msat: u64, - pub(crate) locked_msat: Arc, - pub(crate) inflight_probes: Mutex>, } fn fmt_path(path: &lightning::routing::router::Path) -> String { @@ -752,37 +754,33 @@ fn fmt_path(path: &lightning::routing::router::Path) -> String { impl Prober { /// Returns the total millisatoshis currently locked in in-flight probes. pub fn locked_msat(&self) -> u64 { - self.locked_msat.load(Ordering::Relaxed) + return self + .channel_manager + .list_recent_payments() + .into_iter() + .filter_map(|p| match p { + RecentPaymentDetails::Pending { is_probe: true, total_msat, .. } => { + Some(total_msat) + }, + _ => None, + }) + .sum(); } - pub(crate) fn handle_background_probe_successful(&self, path: &Path, amount: u64) { - let prev = self - .locked_msat - .fetch_update(Ordering::AcqRel, Ordering::Acquire, |v| Some(v.saturating_sub(amount))) - .expect("fetch_update closure always returns Some"); - let new = prev.saturating_sub(amount); + pub(crate) fn handle_background_probe_successful(&self, path: &Path, payment_id: PaymentId) { log_debug!( self.logger, - "Probe successful: released {} msat (locked_msat {} -> {}), path: {}", - amount, - prev, - new, + "Background probe with payment_id: {} succeeded along the path: {}", + payment_id, fmt_path(path) ); } - pub(crate) fn handle_background_probe_failed(&self, path: &Path, amount: u64) { - let prev = self - .locked_msat - .fetch_update(Ordering::AcqRel, Ordering::Acquire, |v| Some(v.saturating_sub(amount))) - .expect("fetch_update closure always returns Some"); - let new = prev.saturating_sub(amount); + pub(crate) fn handle_background_probe_failed(&self, path: &Path, payment_id: PaymentId) { log_debug!( self.logger, - "Probe failed: released {} msat (locked_msat {} -> {}), path: {}", - amount, - prev, - new, + "Background probe with payment_id: {} failed along the path: {}", + payment_id, fmt_path(path) ); } @@ -806,32 +804,24 @@ pub(crate) async fn run_prober(prober: Arc, mut stop_rx: tokio::sync::wa None => continue, }; let amount: u64 = path.hops.iter().map(|h| h.fee_msat).sum(); - if prober.locked_msat.load(Ordering::Acquire) + amount > prober.max_locked_msat { + if prober.locked_msat() + amount > prober.max_locked_msat { log_debug!(prober.logger, "Skipping probe: locked-msat budget exceeded."); continue; } - // Hold `inflight_probes` across `send_probe` so the event handler in - // `event.rs` (which acquires the same lock to remove the entry) cannot - // observe a `ProbeSuccessful`/`ProbeFailed` for a payment_id we have not - // yet inserted, which would leave `locked_msat` permanently incremented. - let mut inflight = prober.inflight_probes.lock().expect("lock"); match prober.channel_manager.send_probe(path.clone()) { Ok((_, payment_id)) => { - inflight.insert(payment_id, amount); - prober.locked_msat.fetch_add(amount, Ordering::Release); - drop(inflight); log_debug!( prober.logger, - "Probe sent: locked {} msat, path: {}", + "Background probe with payment_id {} sent: locked {} msat, path: {}", + payment_id, amount, fmt_path(&path) ); } Err(e) => { - drop(inflight); log_debug!( prober.logger, - "Probe send failed: {:?}, path: {}", + "Background probe send failed: {:?}, path: {}", e, fmt_path(&path) ); From 76055ae463bf9e425867cd0fc482826d541b475c Mon Sep 17 00:00:00 2001 From: Alexander Shevtsov Date: Fri, 8 May 2026 10:02:00 +0200 Subject: [PATCH 12/12] Add probing test of state recovery after restart --- tests/probing_tests.rs | 106 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 105 insertions(+), 1 deletion(-) diff --git a/tests/probing_tests.rs b/tests/probing_tests.rs index c48a6401d..c36535f01 100644 --- a/tests/probing_tests.rs +++ b/tests/probing_tests.rs @@ -9,6 +9,11 @@ // exhausted_probe_budget_blocks_new_probes // Samples locked_msat across multiple probe cycles and asserts it never // exceeds the configured max_locked_msat budget cap. +// +// probing_budget_restored_after_node_restart +// Dispatches a probe, then stops node_b before the failure can propagate +// back so the pending probe HTLC is preserved. Restarts node_a and asserts +// the prober's locked_msat is rebuilt non-zero from list_recent_payments(). mod common; use std::sync::atomic::{AtomicBool, Ordering}; @@ -16,7 +21,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use common::{ expect_channel_ready_event, expect_event, generate_blocks_and_wait, open_channel, premine_and_distribute_funds, random_chain_source, random_config, setup_bitcoind_and_electrsd, - setup_node, wait_for_channel_ready_to_send, TestNode, + setup_node, wait_for_channel_ready_to_send, TestNode, TestStoreType, }; use ldk_node::bitcoin::Amount; @@ -195,6 +200,105 @@ async fn probe_budget_increments_and_decrements() { node_c.stop().unwrap(); } +/// Verifies that `locked_msat` is restored after the node is stopped and restarted +/// while a probe is still in flight. +/// +/// Race-sensitive: once a probe is dispatched, the failure round-trip +/// (`A→B→C → C fails back → B → A`) resolves it within milliseconds. To keep the +/// HTLC pending across the restart we observe `locked_msat > 0` and then *immediately* +/// stop `node_b`, which prevents `B` from forwarding the failure back to `A`. +/// The pending Probe entry persists in `node_a`'s channel manager and must be +/// rebuilt by the prober's `locked_msat` on restart via `list_recent_payments()`. +#[tokio::test(flavor = "multi_thread")] +async fn probing_budget_restored_after_node_restart() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = random_chain_source(&bitcoind, &electrsd); + + let node_b = setup_node(&chain_source, random_config(false)); + let node_c = setup_node(&chain_source, random_config(false)); + + let mut config_a = random_config(false); + // Use a pure on-disk store so state survives the restart. + config_a.store_type = TestStoreType::Sqlite; + let strategy = FixedPathStrategy::new(); + config_a.probing = Some( + ProbingConfigBuilder::custom(strategy.clone()) + .interval(Duration::from_millis(PROBING_INTERVAL_MILLISECONDS)) + .max_locked_msat(10 * PROBE_AMOUNT_MSAT) + .build(), + ); + let restart_config = config_a.clone(); + let node_a = setup_node(&chain_source, config_a); + + let addr_a = node_a.onchain_payment().new_address().unwrap(); + let addr_b = node_b.onchain_payment().new_address().unwrap(); + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![addr_a, addr_b], + Amount::from_sat(2_000_000), + ) + .await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + open_channel(&node_a, &node_b, 1_000_000, true, &electrsd).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 1).await; + node_b.sync_wallets().unwrap(); + open_channel(&node_b, &node_c, 1_000_000, true, &electrsd).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + node_c.sync_wallets().unwrap(); + + expect_channel_ready_event!(node_a, node_b.node_id()); + expect_event!(node_b, ChannelReady); + expect_event!(node_b, ChannelReady); + expect_event!(node_c, ChannelReady); + + strategy.set_path(build_probe_path(&node_a, &node_b, &node_c, PROBE_AMOUNT_MSAT)); + wait_for_channel_ready_to_send(&node_a, &node_b, PROBE_AMOUNT_MSAT + 1000).await; + wait_for_channel_ready_to_send(&node_b, &node_c, PROBE_AMOUNT_MSAT).await; + + strategy.start_probing(); + let went_up = tokio::time::timeout(Duration::from_secs(30), async { + loop { + if node_a.prober().unwrap().locked_msat() > 0 { + break; + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + }) + .await + .is_ok(); + assert!(went_up, "locked_msat never increased — no probe was dispatched"); + + node_b.stop().unwrap(); + strategy.stop_probing(); + + let locked_before = node_a.prober().unwrap().locked_msat(); + println!("Before restart: locked_msat = {}", locked_before); + assert!(locked_before > 0, "probe resolved before we could stop node_b — flaky timing"); + + node_a.stop().unwrap(); + + // Restart node_a from the same persisted state. + let node_a = setup_node(&chain_source, restart_config); + + let locked_after = node_a.prober().unwrap().locked_msat(); + println!("After restart: locked_msat = {}", locked_after); + assert!( + locked_after > 0, + "locked_msat was not restored after restart (before={} after={})", + locked_before, + locked_after + ); + + node_a.stop().unwrap(); + node_c.stop().unwrap(); +} + /// Verifies that `locked_msat` never exceeds `max_locked_msat` across multiple probe cycles. #[tokio::test(flavor = "multi_thread")] async fn exhausted_probe_budget_blocks_new_probes() {