From 0a246415d4314993c4b2a6436fb96eee5b16a91b Mon Sep 17 00:00:00 2001 From: Julius Henke Date: Wed, 20 May 2026 18:51:22 +0200 Subject: [PATCH 1/3] fix: synchronize coalesce slot shutdown and rerun handoff --- .../org/tormap/service/CoalesceService.kt | 44 ++++++++++--------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/backend/src/main/kotlin/org/tormap/service/CoalesceService.kt b/backend/src/main/kotlin/org/tormap/service/CoalesceService.kt index 6a0bfda4..88a28feb 100644 --- a/backend/src/main/kotlin/org/tormap/service/CoalesceService.kt +++ b/backend/src/main/kotlin/org/tormap/service/CoalesceService.kt @@ -33,21 +33,25 @@ class CoalesceService( fun submitAsync(key: String, task: () -> Unit): CompletableFuture { val slot = slots.computeIfAbsent(key) { Slot() } + var firstFuture: CompletableFuture? = null - // If no run is in flight, start the loop and return a future for the first run. - if (slot.running.compareAndSet(false, true)) { - val firstFuture = CompletableFuture() - runLoopAsync(key, slot, task, firstFuture) - return firstFuture - } - - // Already running: request one rerun (collapsed) and return the shared future for that rerun. - slot.rerunRequested.set(true) synchronized(slot.monitor) { + // If no run is in flight, start the loop and return a future for the first run. + if (!slot.running.get()) { + firstFuture = CompletableFuture() + slot.running.set(true) + return@synchronized + } + + // Already running: request one rerun (collapsed) and return the shared future for that rerun. + slot.rerunRequested.set(true) val existing = slot.nextFuture if (existing != null) return existing return CompletableFuture().also { slot.nextFuture = it } } + + runLoopAsync(key, slot, task, firstFuture!!) + return firstFuture!! } internal fun hasStateForKey(key: String): Boolean = slots.containsKey(key) @@ -75,23 +79,21 @@ class CoalesceService( continue } - // If no rerun requested, finish. - if (!slot.rerunRequested.getAndSet(false)) { - keepGoing = false - continue - } + synchronized(slot.monitor) { + // If no rerun requested, finish. + if (!slot.rerunRequested.getAndSet(false)) { + slot.running.set(false) + slots.remove(key, slot) + keepGoing = false + continue + } - // Promote the shared next future for the rerun. If none exists (rare race), create one. - currentFuture = synchronized(slot.monitor) { + // Promote the shared next future for the rerun. If none exists (rare race), create one. val next = slot.nextFuture ?: CompletableFuture() slot.nextFuture = null - next + currentFuture = next } } - - // Mark not running and remove slot. If a submit races here, it will start a new loop. - slot.running.set(false) - slots.remove(key, slot) }, coalesceExecutor, ) From c8a2d90aa0fd568829a2a7a6f4126863986b3570 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 20 May 2026 17:01:16 +0000 Subject: [PATCH 2/3] fix: prevent stale slot from starting duplicate coalesce loop Agent-Logs-Url: https://github.com/TorMap/tormap/sessions/ceafc039-466b-486c-a47f-746e72bd07db Co-authored-by: JuliusHenke <23460202+JuliusHenke@users.noreply.github.com> --- .../org/tormap/service/CoalesceService.kt | 40 +++++++++++-------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/backend/src/main/kotlin/org/tormap/service/CoalesceService.kt b/backend/src/main/kotlin/org/tormap/service/CoalesceService.kt index 88a28feb..2f180919 100644 --- a/backend/src/main/kotlin/org/tormap/service/CoalesceService.kt +++ b/backend/src/main/kotlin/org/tormap/service/CoalesceService.kt @@ -32,26 +32,34 @@ class CoalesceService( private val slots = ConcurrentHashMap() fun submitAsync(key: String, task: () -> Unit): CompletableFuture { - val slot = slots.computeIfAbsent(key) { Slot() } - var firstFuture: CompletableFuture? = null + while (true) { + val slot = slots.computeIfAbsent(key) { Slot() } + var firstFuture: CompletableFuture? = null - synchronized(slot.monitor) { - // If no run is in flight, start the loop and return a future for the first run. - if (!slot.running.get()) { - firstFuture = CompletableFuture() - slot.running.set(true) - return@synchronized + val queuedFuture = synchronized(slot.monitor) { + if (slots[key] !== slot) return@synchronized null + + // If no run is in flight, start the loop and return a future for the first run. + if (!slot.running.get()) { + firstFuture = CompletableFuture() + slot.running.set(true) + return@synchronized null + } + + // Already running: request one rerun (collapsed) and return the shared future for that rerun. + slot.rerunRequested.set(true) + val existing = slot.nextFuture + if (existing != null) return@synchronized existing + CompletableFuture().also { slot.nextFuture = it } } - // Already running: request one rerun (collapsed) and return the shared future for that rerun. - slot.rerunRequested.set(true) - val existing = slot.nextFuture - if (existing != null) return existing - return CompletableFuture().also { slot.nextFuture = it } + if (queuedFuture != null) return queuedFuture + val startFuture = firstFuture + if (startFuture != null) { + runLoopAsync(key, slot, task, startFuture) + return startFuture + } } - - runLoopAsync(key, slot, task, firstFuture!!) - return firstFuture!! } internal fun hasStateForKey(key: String): Boolean = slots.containsKey(key) From 02ebc1c08e06c1f4012046e1cf75a7d088785d64 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 20 May 2026 17:03:44 +0000 Subject: [PATCH 3/3] fix: clean up coalesce slot state on task failure Agent-Logs-Url: https://github.com/TorMap/tormap/sessions/3ff79a36-5735-4758-a207-d2ef45467cbb Co-authored-by: JuliusHenke <23460202+JuliusHenke@users.noreply.github.com> --- .../org/tormap/service/CoalesceService.kt | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/backend/src/main/kotlin/org/tormap/service/CoalesceService.kt b/backend/src/main/kotlin/org/tormap/service/CoalesceService.kt index 2f180919..aee22173 100644 --- a/backend/src/main/kotlin/org/tormap/service/CoalesceService.kt +++ b/backend/src/main/kotlin/org/tormap/service/CoalesceService.kt @@ -82,9 +82,16 @@ class CoalesceService( } catch (ex: Exception) { logger.error("Coalesced task failed for key={}", key, ex) currentFuture?.completeExceptionally(ex) + synchronized(slot.monitor) { + val pendingNext = slot.nextFuture + slot.nextFuture = null + slot.rerunRequested.set(false) + slot.running.set(false) + slots.remove(key, slot) + pendingNext?.completeExceptionally(ex) + } // Keep existing behavior: stop reruns for this cycle after a failed execution. keepGoing = false - continue } synchronized(slot.monitor) { @@ -93,13 +100,12 @@ class CoalesceService( slot.running.set(false) slots.remove(key, slot) keepGoing = false - continue + } else { + // Promote the shared next future for the rerun. If none exists (rare race), create one. + val next = slot.nextFuture ?: CompletableFuture() + slot.nextFuture = null + currentFuture = next } - - // Promote the shared next future for the rerun. If none exists (rare race), create one. - val next = slot.nextFuture ?: CompletableFuture() - slot.nextFuture = null - currentFuture = next } } },