Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 45 additions & 29 deletions backend/src/main/kotlin/org/tormap/service/CoalesceService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,33 @@ class CoalesceService(
private val slots = ConcurrentHashMap<String, Slot>()

fun submitAsync(key: String, task: () -> Unit): CompletableFuture<Void> {
val slot = slots.computeIfAbsent(key) { Slot() }
while (true) {
val slot = slots.computeIfAbsent(key) { Slot() }
var firstFuture: CompletableFuture<Void>? = null

// If no run is in flight, start the loop and return a future for the first run.
if (slot.running.compareAndSet(false, true)) {
val firstFuture = CompletableFuture<Void>()
runLoopAsync(key, slot, task, firstFuture)
return firstFuture
}
val queuedFuture = synchronized(slot.monitor) {
if (slots[key] !== slot) return@synchronized null

// If no run is in flight, start the loop and return a future for the first run.
if (!slot.running.get()) {
firstFuture = CompletableFuture<Void>()
slot.running.set(true)
return@synchronized null
}

// Already running: request one rerun (collapsed) and return the shared future for that rerun.
slot.rerunRequested.set(true)
val existing = slot.nextFuture
if (existing != null) return@synchronized existing
CompletableFuture<Void>().also { slot.nextFuture = it }
}

// Already running: request one rerun (collapsed) and return the shared future for that rerun.
slot.rerunRequested.set(true)
synchronized(slot.monitor) {
val existing = slot.nextFuture
if (existing != null) return existing
return CompletableFuture<Void>().also { slot.nextFuture = it }
if (queuedFuture != null) return queuedFuture
val startFuture = firstFuture
if (startFuture != null) {
runLoopAsync(key, slot, task, startFuture)
return startFuture
}
}
}

Expand All @@ -70,28 +82,32 @@ class CoalesceService(
} catch (ex: Exception) {
logger.error("Coalesced task failed for key={}", key, ex)
currentFuture?.completeExceptionally(ex)
synchronized(slot.monitor) {
val pendingNext = slot.nextFuture
slot.nextFuture = null
slot.rerunRequested.set(false)
slot.running.set(false)
slots.remove(key, slot)
pendingNext?.completeExceptionally(ex)
}
// Keep existing behavior: stop reruns for this cycle after a failed execution.
keepGoing = false
continue
}

// If no rerun requested, finish.
if (!slot.rerunRequested.getAndSet(false)) {
keepGoing = false
continue
}

// Promote the shared next future for the rerun. If none exists (rare race), create one.
currentFuture = synchronized(slot.monitor) {
val next = slot.nextFuture ?: CompletableFuture<Void>()
slot.nextFuture = null
next
synchronized(slot.monitor) {
// If no rerun requested, finish.
if (!slot.rerunRequested.getAndSet(false)) {
slot.running.set(false)
slots.remove(key, slot)
keepGoing = false
} else {
// Promote the shared next future for the rerun. If none exists (rare race), create one.
val next = slot.nextFuture ?: CompletableFuture<Void>()
slot.nextFuture = null
currentFuture = next
}
Comment thread
JuliusHenke marked this conversation as resolved.
}
}

// Mark not running and remove slot. If a submit races here, it will start a new loop.
slot.running.set(false)
slots.remove(key, slot)
},
coalesceExecutor,
)
Expand Down
Loading