@@ -3,6 +3,7 @@ package to.bitkit.services
33import kotlinx.coroutines.CoroutineDispatcher
44import kotlinx.coroutines.Job
55import kotlinx.coroutines.cancelAndJoin
6+ import kotlinx.coroutines.ensureActive
67import kotlinx.coroutines.flow.MutableSharedFlow
78import kotlinx.coroutines.flow.SharedFlow
89import kotlinx.coroutines.flow.asSharedFlow
@@ -55,6 +56,7 @@ import to.bitkit.utils.jsonLogOf
5556import java.io.File
5657import javax.inject.Inject
5758import javax.inject.Singleton
59+ import kotlin.coroutines.cancellation.CancellationException
5860import kotlin.io.path.Path
5961import kotlin.time.Duration
6062
@@ -219,12 +221,14 @@ class LightningService @Inject constructor(
219221 runCatching {
220222 Logger .debug(" LDK event listener started" , context = TAG )
221223 if (timeout != null ) {
222- withTimeout(timeout) { listenForEvents(eventHandler) }
224+ withTimeout(timeout) { listenForEvents(node, eventHandler) }
223225 } else {
224- listenForEvents(eventHandler)
226+ listenForEvents(node, eventHandler)
225227 }
226228 }.onFailure {
227- Logger .error(" LDK event listener error" , it, context = TAG )
229+ if (it !is CancellationException ) {
230+ Logger .error(" LDK event listener error" , it, context = TAG )
231+ }
228232 }
229233 }
230234 }
@@ -236,17 +240,17 @@ class LightningService @Inject constructor(
236240 shouldListenForEvents = false
237241 listenerJob?.cancelAndJoin()
238242 listenerJob = null
239- val node = this .node ? : throw ServiceError .NodeNotStarted ()
243+
244+ val node = this .node ? : run {
245+ Logger .debug(" Node already stopped" , context = TAG )
246+ return
247+ }
240248
241249 Logger .debug(" Stopping node…" , context = TAG )
242250 ServiceQueue .LDK .background {
243- try {
244- node.stop()
245- this @LightningService.node = null
246- } catch (_: NodeException .NotRunning ) {
247- // Node is not running, clear the reference
248- this @LightningService.node = null
249- }
251+ runCatching { node.stop() }
252+ .onFailure { if (it !is NodeException .NotRunning ) throw it }
253+ this @LightningService.node = null
250254 }
251255 Logger .info(" Node stopped" , context = TAG )
252256 }
@@ -723,20 +727,34 @@ class LightningService @Inject constructor(
723727 // region events
724728 private var shouldListenForEvents = true
725729
726- suspend fun listenForEvents (onEvent : NodeEventHandler ? = null) = withContext(bgDispatcher) {
730+ fun startEventListener (onEvent : NodeEventHandler ? = null): Result <Unit > = runCatching {
731+ val node = this .node ? : throw ServiceError .NodeNotSetup ()
732+ shouldListenForEvents = true
733+ listenerJob = launch {
734+ runCatching {
735+ Logger .debug(" LDK event listener started" , context = TAG )
736+ listenForEvents(node, onEvent)
737+ }.onFailure {
738+ if (it !is CancellationException ) {
739+ Logger .error(" LDK event listener error" , it, context = TAG )
740+ }
741+ }
742+ }
743+ }
744+
745+ private suspend fun listenForEvents (node : Node , onEvent : NodeEventHandler ? = null) = withContext(bgDispatcher) {
727746 while (shouldListenForEvents) {
728- val node = this @LightningService.node ? : let {
729- Logger .error(ServiceError .NodeNotStarted ().message.orEmpty(), context = TAG )
747+ ensureActive()
748+
749+ val event = runCatching { node.nextEventAsync() }.getOrElse {
750+ Logger .warn(" Event listener stopping: node stopped" , it, context = TAG )
730751 return @withContext
731752 }
732- val event = node.nextEventAsync()
753+
733754 Logger .debug(" LDK event fired: ${jsonLogOf(event)} " , context = TAG )
734- try {
735- node.eventHandled()
736- Logger .verbose(" LDK eventHandled: '$event '" , context = TAG )
737- } catch (e: NodeException ) {
738- Logger .verbose(" LDK eventHandled error: '$event '" , LdkError (e), context = TAG )
739- }
755+ runCatching { node.eventHandled() }
756+ .onSuccess { Logger .verbose(" LDK eventHandled: '$event '" , context = TAG ) }
757+ .onFailure { Logger .verbose(" LDK eventHandled error: '$event '" , it, context = TAG ) }
740758 onEvent?.invoke(event)
741759 }
742760 }
0 commit comments