@@ -24,8 +24,11 @@ import kotlinx.coroutines.flow.MutableStateFlow
2424import kotlinx.coroutines.flow.asSharedFlow
2525import kotlinx.coroutines.flow.asStateFlow
2626import kotlinx.coroutines.flow.distinctUntilChanged
27+ import kotlinx.coroutines.flow.filter
2728import kotlinx.coroutines.flow.first
2829import kotlinx.coroutines.flow.map
30+ import kotlinx.coroutines.flow.mapNotNull
31+ import kotlinx.coroutines.flow.onSubscription
2932import kotlinx.coroutines.flow.update
3033import kotlinx.coroutines.isActive
3134import kotlinx.coroutines.launch
@@ -44,6 +47,7 @@ import org.lightningdevkit.ldknode.ClosureReason
4447import org.lightningdevkit.ldknode.Event
4548import org.lightningdevkit.ldknode.NodeStatus
4649import org.lightningdevkit.ldknode.PaymentDetails
50+ import org.lightningdevkit.ldknode.PaymentHash
4751import org.lightningdevkit.ldknode.PaymentId
4852import org.lightningdevkit.ldknode.PeerDetails
4953import org.lightningdevkit.ldknode.SpendableUtxo
@@ -60,6 +64,7 @@ import to.bitkit.ext.nowTimestamp
6064import to.bitkit.ext.toPeerDetailsList
6165import to.bitkit.models.ALL_ADDRESS_TYPE_STRINGS
6266import to.bitkit.models.CoinSelectionPreference
67+ import to.bitkit.models.MSat
6368import to.bitkit.models.NATIVE_WITNESS_TYPES
6469import to.bitkit.models.NodeLifecycleState
6570import to.bitkit.models.OpenChannelResult
@@ -121,6 +126,8 @@ class LightningRepo @Inject constructor(
121126 val isRecoveryMode = _isRecoveryMode .asStateFlow()
122127
123128 private val channelCache = ConcurrentHashMap <String , ChannelDetails >()
129+ private val probeOutcomeCache = ConcurrentHashMap <PaymentId , ProbeOutcome >()
130+ private val probeOutcomeSignal = MutableSharedFlow <ProbeOutcome >(extraBufferCapacity = 64 )
124131
125132 private val syncMutex = Mutex ()
126133 private val syncPending = AtomicBoolean (false )
@@ -420,6 +427,7 @@ class LightningRepo @Inject constructor(
420427
421428 private suspend fun onEvent (event : Event ) {
422429 handleLdkEvent(event)
430+ recordProbeOutcome(event)
423431 _eventHandlers .toList().forEach {
424432 runCatching { it.invoke(event) }
425433 }
@@ -441,12 +449,14 @@ class LightningRepo @Inject constructor(
441449 suspend fun stop (): Result <Unit > = withContext(bgDispatcher) {
442450 lifecycleMutex.withLock {
443451 if (_lightningState .value.nodeLifecycleState.isStoppedOrStopping()) {
452+ clearProbeOutcomes()
444453 return @withLock Result .success(Unit )
445454 }
446455
447456 runCatching {
448457 _lightningState .update { it.copy(nodeLifecycleState = NodeLifecycleState .Stopping ) }
449458 lightningService.stop()
459+ clearProbeOutcomes()
450460 _lightningState .update { LightningState (nodeLifecycleState = NodeLifecycleState .Stopped ) }
451461 }.onFailure {
452462 Logger .error(" Node stop error" , it, context = TAG )
@@ -529,6 +539,21 @@ class LightningRepo @Inject constructor(
529539 }
530540 }
531541
542+ private suspend fun recordProbeOutcome (event : Event ) {
543+ val outcome = when (event) {
544+ is Event .ProbeSuccessful -> ProbeOutcome .Success (event.paymentId, event.paymentHash)
545+ is Event .ProbeFailed -> ProbeOutcome .Failure (event.paymentId, event.paymentHash, event.shortChannelId)
546+ else -> return
547+ }
548+
549+ probeOutcomeCache[outcome.paymentId] = outcome
550+ probeOutcomeSignal.emit(outcome)
551+ }
552+
553+ private fun clearProbeOutcomes () {
554+ probeOutcomeCache.clear()
555+ }
556+
532557 private suspend fun registerClosedChannel (channelId : String , reason : ClosureReason ? ) = withContext(bgDispatcher) {
533558 runCatching {
534559 val channel = channelCache[channelId] ? : run {
@@ -582,6 +607,7 @@ class LightningRepo @Inject constructor(
582607 stop().mapCatching {
583608 Logger .debug(" node stopped, calling wipeStorage" , context = TAG )
584609 lightningService.wipeStorage(walletIndex)
610+ clearProbeOutcomes()
585611 _lightningState .update {
586612 LightningState (
587613 nodeStatus = it.nodeStatus,
@@ -1363,23 +1389,74 @@ class LightningRepo @Inject constructor(
13631389 // endregion
13641390
13651391 // region probing
1366- suspend fun sendProbeForInvoice (bolt11 : String , amountSats : ULong? = null): Result <Unit > =
1392+ suspend fun sendProbeForInvoice (bolt11 : String , amountSats : ULong? = null): Result <ProbeDispatch > =
13671393 executeWhenNodeRunning(" sendProbeForInvoice" ) {
13681394 Logger .debug(
1369- " sendProbeForInvoice: amountSats=${amountSats ? : " null (using invoice amount)" } " ,
1370- context = TAG
1395+ " sendProbeForInvoice: amountSats=' ${amountSats ? : " null (using invoice amount)" } ' " ,
1396+ context = TAG ,
13711397 )
1372- runCatching {
1373- if (amountSats != null ) {
1374- val amountMsat = amountSats * 1000u
1375- lightningService.sendProbesUsingAmount(bolt11, amountMsat)
1376- } else {
1377- lightningService.sendProbes(bolt11)
1378- }
1379- }.getOrElse {
1380- Result .failure(it)
1398+ val result = if (amountSats != null ) {
1399+ val amountMsat = amountSats.safe() * MSat .PER_SAT .safe()
1400+ lightningService.sendProbesUsingAmount(bolt11, amountMsat)
1401+ } else {
1402+ lightningService.sendProbes(bolt11)
13811403 }
1404+
1405+ result.map { ProbeDispatch (paymentIds = it) }
13821406 }
1407+
1408+ suspend fun sendProbeForNode (nodeId : String , amountSats : ULong ): Result <ProbeDispatch > =
1409+ executeWhenNodeRunning(" sendProbeForNode" ) {
1410+ Logger .debug(
1411+ " Sending keysend probe to nodeId='$nodeId ' amountSats='$amountSats '" ,
1412+ context = TAG ,
1413+ )
1414+ val amountMsat = amountSats.safe() * MSat .PER_SAT .safe()
1415+ lightningService.sendKeysendProbe(nodeId, amountMsat).map {
1416+ ProbeDispatch (paymentIds = it)
1417+ }
1418+ }
1419+
1420+ suspend fun waitForProbeOutcome (
1421+ paymentIds : Set <PaymentId >,
1422+ timeout : Duration = PROBE_TIMEOUT ,
1423+ ): Result <ProbeOutcome > = withContext(bgDispatcher) {
1424+ if (paymentIds.isEmpty()) {
1425+ return @withContext Result .failure(ProbeError .NoProbeHandles ())
1426+ }
1427+
1428+ val trackedIds = paymentIds.toSet()
1429+ val outcome = withTimeoutOrNull(timeout) {
1430+ val pending = trackedIds.toMutableSet()
1431+ var lastFailure: ProbeOutcome .Failure ? = null
1432+
1433+ probeOutcomeSignal
1434+ .onSubscription {
1435+ trackedIds.forEach { id ->
1436+ probeOutcomeCache[id]?.let { emit(it) }
1437+ }
1438+ }
1439+ .filter { it.paymentId in trackedIds }
1440+ .mapNotNull { probeOutcome ->
1441+ if (! pending.remove(probeOutcome.paymentId)) return @mapNotNull null
1442+
1443+ probeOutcomeCache.remove(probeOutcome.paymentId)
1444+ when (probeOutcome) {
1445+ is ProbeOutcome .Success -> probeOutcome
1446+ is ProbeOutcome .Failure -> {
1447+ lastFailure = probeOutcome
1448+ if (pending.isEmpty()) lastFailure else null
1449+ }
1450+ }
1451+ }
1452+ .first()
1453+ }
1454+
1455+ trackedIds.forEach { probeOutcomeCache.remove(it) }
1456+
1457+ outcome?.let { Result .success(it) }
1458+ ? : Result .failure(ProbeError .TimedOut ())
1459+ }
13831460 // endregion
13841461
13851462 suspend fun restartNode (): Result <Unit > = withContext(bgDispatcher) {
@@ -1404,6 +1481,7 @@ class LightningRepo @Inject constructor(
14041481 private const val CHANNELS_READY_TIMEOUT_MS = 15_000L
14051482 private const val CHANNELS_USABLE_TIMEOUT_MS = 15_000L
14061483 val SEND_LN_TIMEOUT = 10 .seconds
1484+ private val PROBE_TIMEOUT = 60 .seconds
14071485 }
14081486}
14091487
@@ -1413,6 +1491,10 @@ class NodeStopTimeoutError : AppError("Timeout waiting for node to stop")
14131491class NodeRunTimeoutError (opName : String ) : AppError(" Timeout waiting for node to run and execute: '$opName '" )
14141492class GetPaymentsError : AppError (" It wasn't possible get the payments" )
14151493class SyncUnhealthyError : AppError (" Wallet sync failed before send" )
1494+ sealed class ProbeError (message : String ) : AppError(message) {
1495+ class NoProbeHandles : ProbeError (" No probe handles returned" )
1496+ class TimedOut : ProbeError (" Probe timed out" )
1497+ }
14161498
14171499@Stable
14181500data class LightningState (
@@ -1436,3 +1518,23 @@ data class LightningState(
14361518 val isSyncHealthy: Boolean
14371519 get() = lastSyncError == null && lastSuccessfulSyncAt != null
14381520}
1521+
1522+ data class ProbeDispatch (
1523+ val paymentIds : Set <PaymentId >,
1524+ )
1525+
1526+ sealed interface ProbeOutcome {
1527+ val paymentId: PaymentId
1528+ val paymentHash: PaymentHash
1529+
1530+ data class Success (
1531+ override val paymentId : PaymentId ,
1532+ override val paymentHash : PaymentHash ,
1533+ ) : ProbeOutcome
1534+
1535+ data class Failure (
1536+ override val paymentId : PaymentId ,
1537+ override val paymentHash : PaymentHash ,
1538+ val shortChannelId : ULong? ,
1539+ ) : ProbeOutcome
1540+ }
0 commit comments