diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
index bf07545aaa53..631881db7a75 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
@@ -207,6 +207,25 @@ public CostBasedAutoScalerConfig getConfig()
return config;
}
+ private boolean isCriticalLag(CostMetrics metrics)
+ {
+ final Long criticalLagThreshold = config.getCriticalLagThreshold();
+ return metrics != null && criticalLagThreshold != null
+ && metrics.getAggregateLag() >= criticalLagThreshold * WeightedCostFunction.CRITICAL_LAG_TIER1_FRACTION;
+ }
+
+ /**
+ * Whether the last collected metrics crossed {@link WeightedCostFunction#CRITICAL_LAG_TIER2_FRACTION} of
+ * {@link CostBasedAutoScalerConfig#getCriticalLagThreshold()}, meaning the argmin search should be
+ * skipped entirely in favor of jumping straight to the maximum task count.
+ */
+ private boolean isEmergencyLag(CostMetrics metrics)
+ {
+ final Long criticalLagThreshold = config.getCriticalLagThreshold();
+ return metrics != null && criticalLagThreshold != null
+ && metrics.getAggregateLag() >= criticalLagThreshold * WeightedCostFunction.CRITICAL_LAG_TIER2_FRACTION;
+ }
+
/**
* Returns the lowest-cost task count given {@code metrics}, or {@link #CANNOT_COMPUTE} when
* metrics are unusable. Returning the current task count means the current count is already
@@ -244,6 +263,24 @@ int computeOptimalTaskCount(CostMetrics metrics)
return currentTaskCount;
}
+ final boolean criticalLag = isCriticalLag(metrics);
+ final boolean emergencyLag = isEmergencyLag(metrics);
+ if (emergencyLag) {
+ log.info(
+ "Supervisor[%s] aggregateLag[%.0f] crossed [%.0f%%] of criticalLagThreshold[%d]: skipping the argmin"
+ + " search and jumping straight to the maximum task count.",
+ supervisorId, metrics.getAggregateLag(), WeightedCostFunction.CRITICAL_LAG_TIER2_FRACTION * 100,
+ config.getCriticalLagThreshold()
+ );
+ } else if (criticalLag) {
+ log.info(
+ "Supervisor[%s] aggregateLag[%.0f] crossed [%.0f%%] of criticalLagThreshold[%d]: widening scale-up"
+ + " candidates and maxing out the lag-amplification multiplier.",
+ supervisorId, metrics.getAggregateLag(), WeightedCostFunction.CRITICAL_LAG_TIER1_FRACTION * 100,
+ config.getCriticalLagThreshold()
+ );
+ }
+
// Start with the current task count as optimal
int optimalTaskCount = currentTaskCount;
CostResult optimalCost = costFunction.computeCost(metrics, currentTaskCount, config);
@@ -270,7 +307,7 @@ int computeOptimalTaskCount(CostMetrics metrics)
int startIndex = 0;
int endIndex = validTaskCounts.length - 1;
- if (config.isUseTaskCountBoundariesOnScaleUp()) {
+ if (config.isUseTaskCountBoundariesOnScaleUp() && !criticalLag) {
int currentTaskCountIndex = Arrays.binarySearch(validTaskCounts, currentTaskCount);
endIndex = currentTaskCountIndex >= 0
? Math.min(currentTaskCountIndex + BOUNDARY_LIMIT_IN_PARTITIONS_PER_TASK, endIndex)
@@ -284,6 +321,12 @@ int computeOptimalTaskCount(CostMetrics metrics)
: startIndex;
}
+ // Emergency (tier 2) lag skips the argmin search entirely: evaluate only the maximum valid task count.
+ if (emergencyLag) {
+ startIndex = validTaskCounts.length - 1;
+ endIndex = validTaskCounts.length - 1;
+ }
+
for (int i = startIndex; i <= endIndex; ++i) {
final int taskCount = validTaskCounts[i];
CostResult costResult = costFunction.computeCost(metrics, taskCount, config);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java
index fa4d547eae49..0d84d45a19f2 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java
@@ -69,6 +69,7 @@ public class CostBasedAutoScalerConfig implements AutoScalerConfig
private final Duration minScaleDownDelay;
private final boolean scaleDownDuringTaskRolloverOnly;
private final boolean usePollIdleRatio;
+ private final Long criticalLagThreshold;
/**
* Creates a new CostBasedAutoScalerConfig instance.
@@ -93,7 +94,8 @@ public CostBasedAutoScalerConfig(
@Nullable @JsonProperty("minScaleUpDelay") Duration minScaleUpDelay,
@Nullable @JsonProperty("minScaleDownDelay") Duration minScaleDownDelay,
@Nullable @JsonProperty("scaleDownDuringTaskRolloverOnly") Boolean scaleDownDuringTaskRolloverOnly,
- @Nullable @JsonProperty("usePollIdleRatio") Boolean usePollIdleRatio
+ @Nullable @JsonProperty("usePollIdleRatio") Boolean usePollIdleRatio,
+ @Nullable @JsonProperty("criticalLagThreshold") Long criticalLagThreshold
)
{
this.enableTaskAutoScaler = enableTaskAutoScaler != null ? enableTaskAutoScaler : false;
@@ -123,6 +125,12 @@ public CostBasedAutoScalerConfig(
this.minScaleDownDelay = Configs.valueOrDefault(minScaleDownDelay, DEFAULT_MIN_SCALE_DELAY);
this.scaleDownDuringTaskRolloverOnly = Configs.valueOrDefault(scaleDownDuringTaskRolloverOnly, false);
this.usePollIdleRatio = Configs.valueOrDefault(usePollIdleRatio, true);
+ this.criticalLagThreshold = criticalLagThreshold;
+
+ Preconditions.checkArgument(
+ criticalLagThreshold == null || criticalLagThreshold > 0,
+ "criticalLagThreshold must be > 0"
+ );
if (this.enableTaskAutoScaler) {
Preconditions.checkNotNull(taskCountMax, "taskCountMax is required when enableTaskAutoScaler is true");
@@ -305,6 +313,24 @@ public boolean isUsePollIdleRatio()
return usePollIdleRatio;
}
+ /**
+ * Aggregate (sum-across-partitions) lag threshold driving a two-tier SLA-critical fast path,
+ * relative to {@link CostMetrics#getAggregateLag()}:
+ *
+ * - At 75% of this value, the lag-amplification multiplier maxes out at 6.0 (instead of the
+ * default 0.3), and the scale-up candidate search bypasses {@link #isUseTaskCountBoundariesOnScaleUp()}.
+ * - At 95% of this value, cost minimization is skipped entirely and the task count jumps
+ * straight to the maximum.
+ *
+ * {@code null} disables the feature.
+ */
+ @JsonProperty
+ @Nullable
+ public Long getCriticalLagThreshold()
+ {
+ return criticalLagThreshold;
+ }
+
@Override
public SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, SupervisorSpec spec, ServiceEmitter emitter)
{
@@ -338,7 +364,8 @@ public boolean equals(Object o)
&& scaleDownDuringTaskRolloverOnly == that.scaleDownDuringTaskRolloverOnly
&& usePollIdleRatio == that.usePollIdleRatio
&& Objects.equals(taskCountStart, that.taskCountStart)
- && Objects.equals(stopTaskCountRatio, that.stopTaskCountRatio);
+ && Objects.equals(stopTaskCountRatio, that.stopTaskCountRatio)
+ && Objects.equals(criticalLagThreshold, that.criticalLagThreshold);
}
@Override
@@ -360,7 +387,8 @@ public int hashCode()
minScaleUpDelay,
minScaleDownDelay,
scaleDownDuringTaskRolloverOnly,
- usePollIdleRatio
+ usePollIdleRatio,
+ criticalLagThreshold
);
}
@@ -384,6 +412,7 @@ public String toString()
", minScaleDownDelay=" + minScaleDownDelay +
", scaleDownDuringTaskRolloverOnly=" + scaleDownDuringTaskRolloverOnly +
", usePollIdleRatio=" + usePollIdleRatio +
+ ", criticalLagThreshold=" + criticalLagThreshold +
'}';
}
@@ -409,6 +438,7 @@ public static class Builder
private Duration minScaleDownDelay;
private Boolean scaleDownDuringTaskRolloverOnly;
private Boolean usePollIdleRatio;
+ private Long criticalLagThreshold;
private Builder()
{
@@ -498,6 +528,12 @@ public Builder usePollIdleRatio(boolean usePollIdleRatio)
return this;
}
+ public Builder criticalLagThreshold(Long criticalLagThreshold)
+ {
+ this.criticalLagThreshold = criticalLagThreshold;
+ return this;
+ }
+
public Builder useTaskCountBoundariesOnScaleUp(boolean useTaskCountBoundariesOnScaleUp)
{
this.useTaskCountBoundariesOnScaleUp = useTaskCountBoundariesOnScaleUp;
@@ -528,7 +564,8 @@ public CostBasedAutoScalerConfig build()
minScaleUpDelay,
minScaleDownDelay,
scaleDownDuringTaskRolloverOnly,
- usePollIdleRatio
+ usePollIdleRatio,
+ criticalLagThreshold
);
}
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
index beaf0a5b9d55..c0d330edb526 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
@@ -42,6 +42,26 @@ public class WeightedCostFunction
*/
static final double LAG_AMPLIFICATION_MULTIPLIER = 0.3;
+ /**
+ * Amplification multiplier used once aggregate lag crosses {@link #CRITICAL_LAG_TIER1_FRACTION} of
+ * {@link CostBasedAutoScalerConfig#getCriticalLagThreshold()} (tier 1 of the critical-lag fast path).
+ */
+ static final double CRITICAL_LAG_AMPLIFICATION_MULTIPLIER = 6.0;
+
+ /**
+ * Fraction of {@link CostBasedAutoScalerConfig#getCriticalLagThreshold()} at which tier 1 of the
+ * critical-lag fast path engages: amplification maxes out at {@link #CRITICAL_LAG_AMPLIFICATION_MULTIPLIER}
+ * and the scale-up candidate boundary is bypassed.
+ */
+ static final double CRITICAL_LAG_TIER1_FRACTION = 0.75;
+
+ /**
+ * Fraction of {@link CostBasedAutoScalerConfig#getCriticalLagThreshold()} at which tier 2 of the
+ * critical-lag fast path engages: the cost-minimization search is skipped entirely and the task
+ * count jumps straight to the maximum.
+ */
+ static final double CRITICAL_LAG_TIER2_FRACTION = 0.95;
+
/**
* Exponent (< 1) for sublinear busy redistribution in the idle projection:
* busy grows as {@code (currentTaskCount / proposedTaskCount)^EXPONENT}, not linearly.
@@ -108,12 +128,19 @@ public CostResult computeCost(
// Lag recovery time is decreasing by adding tasks and increasing by ejecting tasks.
// In case of increasing lag, we apply an amplification factor to reflect the urgency of addressing lag.
// Caution: we rely only on the metrics, the real issues may be absolutely different, up to hardware failure.
+ // Once aggregate lag crosses CRITICAL_LAG_TIER1_FRACTION of criticalLagThreshold, the multiplier is
+ // maxed out at CRITICAL_LAG_AMPLIFICATION_MULTIPLIER (vs the default 0.3).
final double lagRecoveryTime;
if (metrics.getAggregateLag() <= 0) {
lagRecoveryTime = 0;
} else {
final double lagPerPartition = metrics.getAggregateLag() / metrics.getPartitionCount();
- final double amplification = Math.max(1.0, 1.0 + LAG_AMPLIFICATION_MULTIPLIER * Math.log(lagPerPartition));
+ final Long criticalLagThreshold = config.getCriticalLagThreshold();
+ final boolean criticalLag = criticalLagThreshold != null
+ && metrics.getAggregateLag() >= criticalLagThreshold * CRITICAL_LAG_TIER1_FRACTION;
+
+ final double amplificationMultiplier = criticalLag ? CRITICAL_LAG_AMPLIFICATION_MULTIPLIER : LAG_AMPLIFICATION_MULTIPLIER;
+ final double amplification = Math.max(1.0, 1.0 + amplificationMultiplier * Math.log(lagPerPartition));
final double adjustedProcessingRate = Math.max(avgProcessingRate, MIN_PROCESSING_RATE);
lagRecoveryTime = metrics.getAggregateLag() * amplification / (proposedTaskCount * adjustedProcessingRate);
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java
index 6d09221b410a..c802840cf4df 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java
@@ -54,7 +54,8 @@ public void testSerdeWithAllProperties() throws Exception
+ " \"minScaleUpDelay\": \"PT5M\",\n"
+ " \"minScaleDownDelay\": \"PT10M\",\n"
+ " \"scaleDownDuringTaskRolloverOnly\": true,\n"
- + " \"usePollIdleRatio\": false\n"
+ + " \"usePollIdleRatio\": false,\n"
+ + " \"criticalLagThreshold\": 500000\n"
+ "}";
final CostBasedAutoScalerConfig config = mapper.readValue(json, CostBasedAutoScalerConfig.class);
@@ -74,6 +75,7 @@ public void testSerdeWithAllProperties() throws Exception
Assert.assertFalse(config.isUsePollIdleRatio());
Assert.assertFalse(config.isUseTaskCountBoundariesOnScaleUp());
Assert.assertTrue(config.isUseTaskCountBoundariesOnScaleDown());
+ Assert.assertEquals(Long.valueOf(500000), config.getCriticalLagThreshold());
// Test serialization back to JSON
final String serialized = mapper.writeValueAsString(config);
@@ -112,6 +114,7 @@ public void testSerdeWithDefaults() throws Exception
Assert.assertTrue(config.isUseTaskCountBoundariesOnScaleDown());
Assert.assertNull(config.getTaskCountStart());
Assert.assertNull(config.getStopTaskCountRatio());
+ Assert.assertNull(config.getCriticalLagThreshold());
}
@Test
@@ -221,6 +224,7 @@ public void testBuilder()
.minScaleDownDelay(Duration.standardMinutes(10))
.scaleDownDuringTaskRolloverOnly(true)
.usePollIdleRatio(false)
+ .criticalLagThreshold(500000L)
.build();
Assert.assertTrue(config.getEnableTaskAutoScaler());
@@ -238,6 +242,18 @@ public void testBuilder()
Assert.assertEquals(Duration.standardMinutes(10), config.getMinScaleDownDelay());
Assert.assertTrue(config.isScaleDownOnTaskRolloverOnly());
Assert.assertFalse(config.isUsePollIdleRatio());
+ Assert.assertEquals(Long.valueOf(500000), config.getCriticalLagThreshold());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testValidation_ZeroCriticalLagThreshold()
+ {
+ CostBasedAutoScalerConfig.builder()
+ .taskCountMax(100)
+ .taskCountMin(5)
+ .criticalLagThreshold(0L)
+ .enableTaskAutoScaler(true)
+ .build();
}
@Test
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
index d773e20bfcb3..696160f2d90e 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
@@ -258,6 +258,62 @@ public void testComputeOptimalTaskCountLimitsTaskCountJumps()
);
}
+ @Test
+ public void testCriticalLagThresholdBypassesScaleUpBoundary()
+ {
+ // aggregateLag = 100_000 * 100 = 10,000,000. With threshold=12,000,000: tier1=9,000,000 (crossed),
+ // tier2=11,400,000 (not crossed), so this exercises tier1 (boundary bypass) without triggering
+ // tier2's emergency jump-to-max.
+ final CostBasedAutoScalerConfig boundedScaleUpConfig = CostBasedAutoScalerConfig
+ .builder()
+ .taskCountMax(100)
+ .taskCountMin(1)
+ .enableTaskAutoScaler(true)
+ .lagWeight(1.0)
+ .idleWeight(0.0)
+ .useTaskCountBoundariesOnScaleUp(true)
+ .criticalLagThreshold(12_000_000L)
+ .build();
+ final CostBasedAutoScaler scaler = createAutoScaler(boundedScaleUpConfig);
+
+ Assert.assertEquals(
+ "Critical lag should bypass the scale-up boundary and jump straight to the argmin",
+ 100,
+ scaler.computeOptimalTaskCount(createMetrics(100_000.0, 10, 100, 0.25))
+ );
+
+ // Below the threshold, the boundary still applies as usual.
+ Assert.assertEquals(
+ "Below criticalLagThreshold, the scale-up boundary still limits candidates",
+ 13,
+ scaler.computeOptimalTaskCount(createMetrics(10.0, 10, 100, 0.25))
+ );
+ }
+
+ @Test
+ public void testEmergencyLagJumpsStraightToMaxTaskCount()
+ {
+ // aggregateLag = 100_000 * 500 = 50,000,000. With threshold=10,000,000: tier2=9,500,000 is
+ // comfortably crossed, so the argmin search is skipped entirely in favor of the maximum task count.
+ final CostBasedAutoScalerConfig config = CostBasedAutoScalerConfig
+ .builder()
+ .taskCountMax(500)
+ .taskCountMin(1)
+ .enableTaskAutoScaler(true)
+ .lagWeight(0.1)
+ .idleWeight(0.9)
+ .criticalLagThreshold(10_000_000L)
+ .build();
+ final CostBasedAutoScaler scaler = createAutoScaler(config);
+
+ // Idle-heavy weights would normally argue for scaling down, but emergency lag overrides that entirely.
+ Assert.assertEquals(
+ "Emergency lag should jump straight to the maximum task count regardless of idle-favoring weights",
+ 500,
+ scaler.computeOptimalTaskCount(createMetrics(100_000.0, 10, 500, 0.9))
+ );
+ }
+
@Test
public void testExtractPollIdleRatio()
{
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
index 6802692ade49..da4c68dbbf4b 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
@@ -357,6 +357,71 @@ public void testLagAmplificationAppliedUnconditionally()
Assert.assertEquals("Lag amplification should increase lag recovery time", expected, costWithAmp, 0.0001);
}
+ @Test
+ public void testCriticalLagThresholdMaxesOutAmplificationMultiplier()
+ {
+ int currentTaskCount = 10;
+ int proposedTaskCount = 10;
+ int partitionCount = 10;
+ double avgPartitionLag = 150.0;
+ double aggregateLag = avgPartitionLag * partitionCount;
+
+ CostMetrics metrics = createMetrics(avgPartitionLag, currentTaskCount, partitionCount, 0.1);
+
+ // aggregateLag sits at exactly tier1Fraction (75%) of this threshold.
+ long tier1Threshold = (long) (aggregateLag / WeightedCostFunction.CRITICAL_LAG_TIER1_FRACTION);
+
+ CostBasedAutoScalerConfig noThreshold = CostBasedAutoScalerConfig.builder()
+ .taskCountMax(100)
+ .taskCountMin(1)
+ .enableTaskAutoScaler(true)
+ .lagWeight(1.0)
+ .idleWeight(0.0)
+ .build();
+ CostBasedAutoScalerConfig belowTier1 = CostBasedAutoScalerConfig.builder()
+ .taskCountMax(100)
+ .taskCountMin(1)
+ .enableTaskAutoScaler(true)
+ .lagWeight(1.0)
+ .idleWeight(0.0)
+ .criticalLagThreshold(tier1Threshold + 100)
+ .build();
+ CostBasedAutoScalerConfig atTier1 = CostBasedAutoScalerConfig.builder()
+ .taskCountMax(100)
+ .taskCountMin(1)
+ .enableTaskAutoScaler(true)
+ .lagWeight(1.0)
+ .idleWeight(0.0)
+ .criticalLagThreshold(tier1Threshold)
+ .build();
+
+ double costBelowTier1 = costFunction.computeCost(metrics, proposedTaskCount, belowTier1).totalCost();
+ Assert.assertEquals(
+ "Below tier1, amplification uses the default multiplier",
+ costFunction.computeCost(metrics, proposedTaskCount, noThreshold).totalCost(),
+ costBelowTier1,
+ 0.0001
+ );
+
+ double lagPerPartition = aggregateLag / partitionCount;
+ double criticalAmplification =
+ 1.0 + WeightedCostFunction.CRITICAL_LAG_AMPLIFICATION_MULTIPLIER * Math.log(lagPerPartition);
+ double expectedCriticalCost =
+ aggregateLag * criticalAmplification / (proposedTaskCount * WeightedCostFunction.MIN_PROCESSING_RATE);
+
+ double costAtTier1 = costFunction.computeCost(metrics, proposedTaskCount, atTier1).totalCost();
+ Assert.assertEquals(
+ "At/above tier1, the amplification multiplier maxes out at CRITICAL_LAG_AMPLIFICATION_MULTIPLIER",
+ expectedCriticalCost,
+ costAtTier1,
+ 0.0001
+ );
+ Assert.assertTrue(
+ "Critical-lag cost should exceed the default-multiplier cost for the same lag",
+ costAtTier1 > costBelowTier1
+ );
+ }
+
@Test
public void testAmplificationGrowsWithLag()
{