diff --git a/backend/src/main/kotlin/org/tormap/service/CoalesceService.kt b/backend/src/main/kotlin/org/tormap/service/CoalesceService.kt index 6a0bfda4..aee22173 100644 --- a/backend/src/main/kotlin/org/tormap/service/CoalesceService.kt +++ b/backend/src/main/kotlin/org/tormap/service/CoalesceService.kt @@ -32,21 +32,33 @@ class CoalesceService( private val slots = ConcurrentHashMap() fun submitAsync(key: String, task: () -> Unit): CompletableFuture { - val slot = slots.computeIfAbsent(key) { Slot() } + while (true) { + val slot = slots.computeIfAbsent(key) { Slot() } + var firstFuture: CompletableFuture? = null - // If no run is in flight, start the loop and return a future for the first run. - if (slot.running.compareAndSet(false, true)) { - val firstFuture = CompletableFuture() - runLoopAsync(key, slot, task, firstFuture) - return firstFuture - } + val queuedFuture = synchronized(slot.monitor) { + if (slots[key] !== slot) return@synchronized null + + // If no run is in flight, start the loop and return a future for the first run. + if (!slot.running.get()) { + firstFuture = CompletableFuture() + slot.running.set(true) + return@synchronized null + } + + // Already running: request one rerun (collapsed) and return the shared future for that rerun. + slot.rerunRequested.set(true) + val existing = slot.nextFuture + if (existing != null) return@synchronized existing + CompletableFuture().also { slot.nextFuture = it } + } - // Already running: request one rerun (collapsed) and return the shared future for that rerun. - slot.rerunRequested.set(true) - synchronized(slot.monitor) { - val existing = slot.nextFuture - if (existing != null) return existing - return CompletableFuture().also { slot.nextFuture = it } + if (queuedFuture != null) return queuedFuture + val startFuture = firstFuture + if (startFuture != null) { + runLoopAsync(key, slot, task, startFuture) + return startFuture + } } } @@ -70,28 +82,32 @@ class CoalesceService( } catch (ex: Exception) { logger.error("Coalesced task failed for key={}", key, ex) currentFuture?.completeExceptionally(ex) + synchronized(slot.monitor) { + val pendingNext = slot.nextFuture + slot.nextFuture = null + slot.rerunRequested.set(false) + slot.running.set(false) + slots.remove(key, slot) + pendingNext?.completeExceptionally(ex) + } // Keep existing behavior: stop reruns for this cycle after a failed execution. keepGoing = false - continue } - // If no rerun requested, finish. - if (!slot.rerunRequested.getAndSet(false)) { - keepGoing = false - continue - } - - // Promote the shared next future for the rerun. If none exists (rare race), create one. - currentFuture = synchronized(slot.monitor) { - val next = slot.nextFuture ?: CompletableFuture() - slot.nextFuture = null - next + synchronized(slot.monitor) { + // If no rerun requested, finish. + if (!slot.rerunRequested.getAndSet(false)) { + slot.running.set(false) + slots.remove(key, slot) + keepGoing = false + } else { + // Promote the shared next future for the rerun. If none exists (rare race), create one. + val next = slot.nextFuture ?: CompletableFuture() + slot.nextFuture = null + currentFuture = next + } } } - - // Mark not running and remove slot. If a submit races here, it will start a new loop. - slot.running.set(false) - slots.remove(key, slot) }, coalesceExecutor, )