Skip to content

Commit ecba943

Browse files
authored
Merge pull request #717 from synonymdev/fix/handle-node-connection-errors
Handle node connection issues
2 parents 23ec978 + 96686b6 commit ecba943

5 files changed

Lines changed: 241 additions & 56 deletions

File tree

app/src/main/java/to/bitkit/repositories/HealthRepo.kt

Lines changed: 61 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import kotlinx.coroutines.flow.distinctUntilChanged
1111
import kotlinx.coroutines.flow.map
1212
import kotlinx.coroutines.flow.update
1313
import kotlinx.coroutines.launch
14+
import org.lightningdevkit.ldknode.ChannelDetails
1415
import to.bitkit.data.CacheStore
1516
import to.bitkit.di.BgDispatcher
1617
import to.bitkit.models.BackupCategory
@@ -43,68 +44,80 @@ class HealthRepo @Inject constructor(
4344
observeBackupStatus()
4445
}
4546

46-
@Suppress("CyclomaticComplexMethod")
4747
private fun collectState() {
48-
val internetHealthState = connectivityRepo.isOnline.map { connectivityState ->
49-
when (connectivityState) {
50-
ConnectivityState.CONNECTED -> HealthState.READY
51-
ConnectivityState.CONNECTING -> HealthState.PENDING
52-
ConnectivityState.DISCONNECTED -> HealthState.ERROR
53-
}
54-
}
48+
val internetHealthState = connectivityRepo.isOnline.map { it.asHealth() }
5549

5650
repoScope.launch {
5751
combine(
5852
internetHealthState,
5953
lightningRepo.lightningState,
6054
) { internetHealth, lightningState ->
61-
val isOnline = internetHealth == HealthState.READY
62-
val nodeLifecycleState = lightningState.nodeLifecycleState
63-
64-
val nodeHealth = when {
65-
!isOnline -> HealthState.ERROR
66-
else -> nodeLifecycleState.asHealth()
67-
}
68-
69-
val electrumHealth = when {
70-
!isOnline -> HealthState.ERROR
71-
nodeLifecycleState.isRunning() -> HealthState.READY
72-
nodeLifecycleState.canRun() -> HealthState.PENDING
73-
else -> HealthState.ERROR
74-
}
75-
76-
val channelsHealth = when {
77-
!isOnline -> HealthState.ERROR
78-
else -> {
79-
val channels = lightningState.channels
80-
val hasOpenChannels = channels.any { it.isChannelReady }
81-
val hasPendingChannels = channels.any { !it.isChannelReady }
82-
83-
when {
84-
hasOpenChannels -> HealthState.READY
85-
hasPendingChannels -> HealthState.PENDING
86-
else -> HealthState.ERROR
87-
}
88-
}
89-
}
90-
91-
AppHealthState(
92-
internet = internetHealth,
93-
electrum = electrumHealth,
94-
node = nodeHealth,
95-
channels = channelsHealth,
96-
)
55+
computeHealthState(internetHealth, lightningState)
9756
}.collect { newHealthState ->
98-
updateState { currentState ->
99-
newHealthState.copy(
100-
backups = currentState.backups,
101-
app = currentState.app,
57+
updateState {
58+
it.copy(
59+
internet = newHealthState.internet,
60+
electrum = newHealthState.electrum,
61+
node = newHealthState.node,
62+
channels = newHealthState.channels,
10263
)
10364
}
10465
}
10566
}
10667
}
10768

69+
@Suppress("CyclomaticComplexMethod")
70+
private fun computeHealthState(internetHealth: HealthState, lightningState: LightningState): AppHealthState {
71+
val isOnline = internetHealth == HealthState.READY
72+
val nodeLifecycleState = lightningState.nodeLifecycleState
73+
val isSyncing = lightningState.isSyncingWallet
74+
val hasSyncError = lightningState.lastSyncError != null
75+
76+
val nodeHealth = when {
77+
!isOnline -> HealthState.ERROR
78+
isSyncing -> HealthState.PENDING
79+
hasSyncError && nodeLifecycleState.isRunning() -> HealthState.ERROR
80+
else -> nodeLifecycleState.asHealth()
81+
}
82+
83+
val electrumHealth = when {
84+
!isOnline -> HealthState.ERROR
85+
isSyncing -> HealthState.PENDING
86+
hasSyncError && nodeLifecycleState.isRunning() -> HealthState.ERROR
87+
nodeLifecycleState.isRunning() -> HealthState.READY
88+
nodeLifecycleState.canRun() -> HealthState.PENDING
89+
else -> HealthState.ERROR
90+
}
91+
92+
val channelsHealth = when {
93+
!isOnline -> HealthState.ERROR
94+
else -> computeChannelsHealth(lightningState.channels)
95+
}
96+
97+
return AppHealthState(
98+
internet = internetHealth,
99+
electrum = electrumHealth,
100+
node = nodeHealth,
101+
channels = channelsHealth,
102+
)
103+
}
104+
105+
private fun computeChannelsHealth(channels: List<ChannelDetails>): HealthState {
106+
val hasOpenChannels = channels.any { it.isChannelReady }
107+
val hasPendingChannels = channels.any { !it.isChannelReady }
108+
return when {
109+
hasOpenChannels -> HealthState.READY
110+
hasPendingChannels -> HealthState.PENDING
111+
else -> HealthState.ERROR
112+
}
113+
}
114+
115+
private fun ConnectivityState.asHealth() = when (this) {
116+
ConnectivityState.CONNECTED -> HealthState.READY
117+
ConnectivityState.CONNECTING -> HealthState.PENDING
118+
ConnectivityState.DISCONNECTED -> HealthState.ERROR
119+
}
120+
108121
private fun observePaidOrdersState() {
109122
repoScope.launch {
110123
blocktankRepo.blocktankState.map { it.paidOrders }.distinctUntilChanged().collect { paidOrders ->

app/src/main/java/to/bitkit/repositories/LightningRepo.kt

Lines changed: 105 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,18 @@ import com.synonym.bitkitcore.createWithdrawCallbackUrl
1111
import com.synonym.bitkitcore.lnurlAuth
1212
import kotlinx.coroutines.CoroutineDispatcher
1313
import kotlinx.coroutines.CoroutineScope
14+
import kotlinx.coroutines.Job
1415
import kotlinx.coroutines.SupervisorJob
1516
import kotlinx.coroutines.delay
1617
import kotlinx.coroutines.flow.MutableSharedFlow
1718
import kotlinx.coroutines.flow.MutableStateFlow
1819
import kotlinx.coroutines.flow.asSharedFlow
1920
import kotlinx.coroutines.flow.asStateFlow
21+
import kotlinx.coroutines.flow.distinctUntilChanged
2022
import kotlinx.coroutines.flow.first
23+
import kotlinx.coroutines.flow.map
2124
import kotlinx.coroutines.flow.update
25+
import kotlinx.coroutines.isActive
2226
import kotlinx.coroutines.launch
2327
import kotlinx.coroutines.sync.Mutex
2428
import kotlinx.coroutines.tasks.await
@@ -64,6 +68,7 @@ import to.bitkit.utils.Logger
6468
import to.bitkit.utils.ServiceError
6569
import java.util.concurrent.ConcurrentHashMap
6670
import java.util.concurrent.atomic.AtomicBoolean
71+
import java.util.concurrent.atomic.AtomicReference
6772
import javax.inject.Inject
6873
import javax.inject.Singleton
6974
import kotlin.coroutines.cancellation.CancellationException
@@ -84,6 +89,7 @@ class LightningRepo @Inject constructor(
8489
private val lnurlService: LnurlService,
8590
private val cacheStore: CacheStore,
8691
private val preActivityMetadataRepo: PreActivityMetadataRepo,
92+
private val connectivityRepo: ConnectivityRepo,
8793
) {
8894
private val _lightningState = MutableStateFlow(LightningState())
8995
val lightningState = _lightningState.asStateFlow()
@@ -101,6 +107,66 @@ class LightningRepo @Inject constructor(
101107

102108
private val syncMutex = Mutex()
103109
private val syncPending = AtomicBoolean(false)
110+
private val syncRetryJob = AtomicReference<Job?>(null)
111+
112+
init {
113+
observeConnectivityForSyncRetry()
114+
}
115+
116+
private fun observeConnectivityForSyncRetry() {
117+
scope.launch {
118+
connectivityRepo.isOnline
119+
.map { it == ConnectivityState.CONNECTED }
120+
.distinctUntilChanged()
121+
.collect { isConnected ->
122+
if (!isConnected) {
123+
// Cancel any pending retry when disconnected
124+
syncRetryJob.getAndSet(null)?.cancel()
125+
return@collect
126+
}
127+
128+
// Start retry loop if sync is failing
129+
startSyncRetryLoopIfNeeded()
130+
}
131+
}
132+
}
133+
134+
private fun startSyncRetryLoopIfNeeded() {
135+
val state = _lightningState.value
136+
if (!state.nodeLifecycleState.isRunning() || state.lastSyncError == null) {
137+
return
138+
}
139+
140+
// Don't start if already retrying
141+
if (syncRetryJob.get()?.isActive == true) {
142+
return
143+
}
144+
145+
val job = scope.launch {
146+
// Don't start retry loop if offline
147+
if (connectivityRepo.isOnline.first() != ConnectivityState.CONNECTED) {
148+
return@launch
149+
}
150+
151+
while (isActive) {
152+
val currentState = _lightningState.value
153+
// Stop if no longer running or sync is now healthy
154+
if (!currentState.nodeLifecycleState.isRunning() || currentState.isSyncHealthy) {
155+
Logger.debug("Sync retry loop stopped: node not running or sync healthy", context = TAG)
156+
break
157+
}
158+
159+
delay(SYNC_RETRY_DELAY_MS)
160+
Logger.info("Retrying sync after failure", context = TAG)
161+
sync().onSuccess {
162+
Logger.info("Sync retry succeeded", context = TAG)
163+
}.onFailure {
164+
Logger.warn("Sync retry failed, will retry in ${SYNC_RETRY_DELAY_MS / 1000}s", it, context = TAG)
165+
}
166+
}
167+
}
168+
syncRetryJob.set(job)
169+
}
104170

105171
/**
106172
* Executes the provided operation only if the node is running.
@@ -318,6 +384,7 @@ class LightningRepo @Inject constructor(
318384
}
319385
}
320386

387+
@Suppress("TooGenericExceptionCaught")
321388
suspend fun sync(): Result<Unit> = executeWhenNodeRunning("sync") {
322389
// If sync is in progress, mark pending and skip
323390
if (!syncMutex.tryLock()) {
@@ -326,21 +393,28 @@ class LightningRepo @Inject constructor(
326393
return@executeWhenNodeRunning Result.success(Unit)
327394
}
328395

329-
try {
396+
runCatching {
330397
do {
331398
syncPending.set(false)
332399
_lightningState.update { it.copy(isSyncingWallet = true) }
333400
lightningService.sync()
334401
refreshChannelCache()
335402
syncState()
403+
_lightningState.update {
404+
it.copy(
405+
lastSyncError = null,
406+
lastSuccessfulSyncAt = System.currentTimeMillis(),
407+
)
408+
}
336409
if (syncPending.get()) delay(MS_SYNC_LOOP_DEBOUNCE)
337410
} while (syncPending.getAndSet(false))
338-
} finally {
339-
_lightningState.update { it.copy(isSyncingWallet = false) }
411+
}.also {
412+
_lightningState.update { state -> state.copy(isSyncingWallet = false) }
340413
syncMutex.unlock()
414+
}.onFailure {
415+
_lightningState.update { state -> state.copy(lastSyncError = it) }
416+
startSyncRetryLoopIfNeeded()
341417
}
342-
343-
Result.success(Unit)
344418
}
345419

346420
fun syncAsync() = scope.launch {
@@ -349,6 +423,16 @@ class LightningRepo @Inject constructor(
349423
}
350424
}
351425

426+
private suspend fun ensureSyncedBeforeSend(): Result<Unit> {
427+
Logger.debug("Ensuring wallet is synced before send", context = TAG)
428+
return sync().fold(
429+
onSuccess = { Result.success(Unit) },
430+
onFailure = {
431+
Result.failure(SyncUnhealthyError())
432+
},
433+
)
434+
}
435+
352436
/** Clear pending sync flag. Called when manual pull-to-refresh takes priority. */
353437
fun clearPendingSync() = syncPending.set(false)
354438

@@ -634,6 +718,11 @@ class LightningRepo @Inject constructor(
634718
): Result<Txid> = executeWhenNodeRunning("sendOnChain") {
635719
require(address.isNotEmpty()) { "Send address cannot be empty" }
636720

721+
// Ensure wallet is synced before sending to have up-to-date state
722+
ensureSyncedBeforeSend().onFailure {
723+
return@executeWhenNodeRunning Result.failure(it)
724+
}
725+
637726
val transactionSpeed = speed ?: settingsStore.data.first().defaultTransactionSpeed
638727
val satsPerVByte = getFeeRateForSpeed(transactionSpeed, feeRates).getOrThrow()
639728

@@ -960,6 +1049,7 @@ class LightningRepo @Inject constructor(
9601049
private const val TAG = "LightningRepo"
9611050
private const val LENGTH_CHANNEL_ID_PREVIEW = 10
9621051
private const val MS_SYNC_LOOP_DEBOUNCE = 500L
1052+
private const val SYNC_RETRY_DELAY_MS = 15_000L
9631053
}
9641054
}
9651055

@@ -968,6 +1058,7 @@ class NodeSetupError : AppError("Unknown node setup error")
9681058
class NodeStopTimeoutError : AppError("Timeout waiting for node to stop")
9691059
class NodeRunTimeoutError(opName: String) : AppError("Timeout waiting for node to run and execute: '$opName'")
9701060
class GetPaymentsError : AppError("It wasn't possible get the payments")
1061+
class SyncUnhealthyError : AppError("Wallet sync failed before send")
9711062

9721063
data class LightningState(
9731064
val nodeId: String = "",
@@ -978,6 +1069,15 @@ data class LightningState(
9781069
val balances: BalanceDetails? = null,
9791070
val isSyncingWallet: Boolean = false,
9801071
val isGeoBlocked: Boolean = false,
1072+
val lastSyncError: Throwable? = null,
1073+
val lastSuccessfulSyncAt: Long? = null,
9811074
) {
9821075
fun block(): BestBlock? = nodeStatus?.currentBestBlock
1076+
1077+
/**
1078+
* Returns true if the node has synced successfully at least once and the last sync didn't fail.
1079+
* This is used to determine if critical operations like sending should be allowed.
1080+
*/
1081+
val isSyncHealthy: Boolean
1082+
get() = lastSyncError == null && lastSuccessfulSyncAt != null
9831083
}

app/src/main/java/to/bitkit/repositories/WalletRepo.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ class WalletRepo @Inject constructor(
202202
eventSyncJob?.cancel()
203203
eventSyncJob = repoScope.launch {
204204
delay(EVENT_SYNC_DEBOUNCE_MS)
205-
syncNodeAndWallet()
205+
syncBalances()
206206
transferRepo.syncTransferStates()
207207
}
208208
}

0 commit comments

Comments
 (0)