From f4029759ff21665af17b376b499b5f044b926542 Mon Sep 17 00:00:00 2001 From: Sasha Syrotenko Date: Fri, 3 Jul 2026 15:11:00 +0300 Subject: [PATCH 1/2] Introduce lag emergency --- .../autoscaler/CostBasedAutoScaler.java | 50 +++++++++++++- .../autoscaler/CostBasedAutoScalerConfig.java | 45 +++++++++++-- .../autoscaler/WeightedCostFunction.java | 29 ++++++++- .../CostBasedAutoScalerConfigTest.java | 18 ++++- .../autoscaler/CostBasedAutoScalerTest.java | 56 ++++++++++++++++ .../autoscaler/WeightedCostFunctionTest.java | 65 +++++++++++++++++++ 6 files changed, 255 insertions(+), 8 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java index bf07545aaa53..bdf7c3391b66 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java @@ -141,7 +141,10 @@ private ServiceMetricEvent.Builder getMetricBuilder() public void start() { autoscalerExecutor.scheduleAtFixedRate( - supervisor.buildDynamicAllocationTask(this::computeTaskCountForScaleAction, () -> {}, emitter), + supervisor.buildDynamicAllocationTask( + this::computeTaskCountForScaleAction, () -> { + }, emitter + ), config.getScaleActionPeriodMillis(), config.getScaleActionPeriodMillis(), TimeUnit.MILLISECONDS @@ -207,6 +210,25 @@ public CostBasedAutoScalerConfig getConfig() return config; } + private boolean isCriticalLag(CostMetrics metrics) + { + final Long criticalLagThreshold = config.getCriticalLagThreshold(); + return metrics != null && criticalLagThreshold != null + && metrics.getAggregateLag() >= criticalLagThreshold * WeightedCostFunction.CRITICAL_LAG_TIER1_FRACTION; + } + + /** + * Whether the last collected metrics crossed {@link WeightedCostFunction#CRITICAL_LAG_TIER2_FRACTION} of + * {@link CostBasedAutoScalerConfig#getCriticalLagThreshold()}, meaning the argmin search should be + * skipped entirely in favor of jumping straight to the maximum task count. + */ + private boolean isEmergencyLag(CostMetrics metrics) + { + final Long criticalLagThreshold = config.getCriticalLagThreshold(); + return metrics != null && criticalLagThreshold != null + && metrics.getAggregateLag() >= criticalLagThreshold * WeightedCostFunction.CRITICAL_LAG_TIER2_FRACTION; + } + /** * Returns the lowest-cost task count given {@code metrics}, or {@link #CANNOT_COMPUTE} when * metrics are unusable. Returning the current task count means the current count is already @@ -244,6 +266,24 @@ int computeOptimalTaskCount(CostMetrics metrics) return currentTaskCount; } + final boolean criticalLag = isCriticalLag(metrics); + final boolean emergencyLag = isEmergencyLag(metrics); + if (emergencyLag) { + log.info( + "Supervisor[%s] aggregateLag[%.0f] crossed [%.0f%%] of criticalLagThreshold[%d]: skipping the argmin" + + " search and jumping straight to the maximum task count.", + supervisorId, metrics.getAggregateLag(), WeightedCostFunction.CRITICAL_LAG_TIER2_FRACTION * 100, + config.getCriticalLagThreshold() + ); + } else if (criticalLag) { + log.info( + "Supervisor[%s] aggregateLag[%.0f] crossed [%.0f%%] of criticalLagThreshold[%d]: widening scale-up" + + " candidates and maxing out the lag-amplification multiplier.", + supervisorId, metrics.getAggregateLag(), WeightedCostFunction.CRITICAL_LAG_TIER1_FRACTION * 100, + config.getCriticalLagThreshold() + ); + } + // Start with the current task count as optimal int optimalTaskCount = currentTaskCount; CostResult optimalCost = costFunction.computeCost(metrics, currentTaskCount, config); @@ -270,7 +310,7 @@ int computeOptimalTaskCount(CostMetrics metrics) int startIndex = 0; int endIndex = validTaskCounts.length - 1; - if (config.isUseTaskCountBoundariesOnScaleUp()) { + if (config.isUseTaskCountBoundariesOnScaleUp() && !criticalLag) { int currentTaskCountIndex = Arrays.binarySearch(validTaskCounts, currentTaskCount); endIndex = currentTaskCountIndex >= 0 ? Math.min(currentTaskCountIndex + BOUNDARY_LIMIT_IN_PARTITIONS_PER_TASK, endIndex) @@ -284,6 +324,12 @@ int computeOptimalTaskCount(CostMetrics metrics) : startIndex; } + // Emergency (tier 2) lag skips the argmin search entirely: evaluate only the maximum valid task count. + if (emergencyLag) { + startIndex = validTaskCounts.length - 1; + endIndex = validTaskCounts.length - 1; + } + for (int i = startIndex; i <= endIndex; ++i) { final int taskCount = validTaskCounts[i]; CostResult costResult = costFunction.computeCost(metrics, taskCount, config); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java index fa4d547eae49..0d84d45a19f2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java @@ -69,6 +69,7 @@ public class CostBasedAutoScalerConfig implements AutoScalerConfig private final Duration minScaleDownDelay; private final boolean scaleDownDuringTaskRolloverOnly; private final boolean usePollIdleRatio; + private final Long criticalLagThreshold; /** * Creates a new CostBasedAutoScalerConfig instance. @@ -93,7 +94,8 @@ public CostBasedAutoScalerConfig( @Nullable @JsonProperty("minScaleUpDelay") Duration minScaleUpDelay, @Nullable @JsonProperty("minScaleDownDelay") Duration minScaleDownDelay, @Nullable @JsonProperty("scaleDownDuringTaskRolloverOnly") Boolean scaleDownDuringTaskRolloverOnly, - @Nullable @JsonProperty("usePollIdleRatio") Boolean usePollIdleRatio + @Nullable @JsonProperty("usePollIdleRatio") Boolean usePollIdleRatio, + @Nullable @JsonProperty("criticalLagThreshold") Long criticalLagThreshold ) { this.enableTaskAutoScaler = enableTaskAutoScaler != null ? enableTaskAutoScaler : false; @@ -123,6 +125,12 @@ public CostBasedAutoScalerConfig( this.minScaleDownDelay = Configs.valueOrDefault(minScaleDownDelay, DEFAULT_MIN_SCALE_DELAY); this.scaleDownDuringTaskRolloverOnly = Configs.valueOrDefault(scaleDownDuringTaskRolloverOnly, false); this.usePollIdleRatio = Configs.valueOrDefault(usePollIdleRatio, true); + this.criticalLagThreshold = criticalLagThreshold; + + Preconditions.checkArgument( + criticalLagThreshold == null || criticalLagThreshold > 0, + "criticalLagThreshold must be > 0" + ); if (this.enableTaskAutoScaler) { Preconditions.checkNotNull(taskCountMax, "taskCountMax is required when enableTaskAutoScaler is true"); @@ -305,6 +313,24 @@ public boolean isUsePollIdleRatio() return usePollIdleRatio; } + /** + * Aggregate (sum-across-partitions) lag threshold driving a two-tier SLA-critical fast path, + * relative to {@link CostMetrics#getAggregateLag()}: + * + * {@code null} disables the feature. + */ + @JsonProperty + @Nullable + public Long getCriticalLagThreshold() + { + return criticalLagThreshold; + } + @Override public SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, SupervisorSpec spec, ServiceEmitter emitter) { @@ -338,7 +364,8 @@ public boolean equals(Object o) && scaleDownDuringTaskRolloverOnly == that.scaleDownDuringTaskRolloverOnly && usePollIdleRatio == that.usePollIdleRatio && Objects.equals(taskCountStart, that.taskCountStart) - && Objects.equals(stopTaskCountRatio, that.stopTaskCountRatio); + && Objects.equals(stopTaskCountRatio, that.stopTaskCountRatio) + && Objects.equals(criticalLagThreshold, that.criticalLagThreshold); } @Override @@ -360,7 +387,8 @@ public int hashCode() minScaleUpDelay, minScaleDownDelay, scaleDownDuringTaskRolloverOnly, - usePollIdleRatio + usePollIdleRatio, + criticalLagThreshold ); } @@ -384,6 +412,7 @@ public String toString() ", minScaleDownDelay=" + minScaleDownDelay + ", scaleDownDuringTaskRolloverOnly=" + scaleDownDuringTaskRolloverOnly + ", usePollIdleRatio=" + usePollIdleRatio + + ", criticalLagThreshold=" + criticalLagThreshold + '}'; } @@ -409,6 +438,7 @@ public static class Builder private Duration minScaleDownDelay; private Boolean scaleDownDuringTaskRolloverOnly; private Boolean usePollIdleRatio; + private Long criticalLagThreshold; private Builder() { @@ -498,6 +528,12 @@ public Builder usePollIdleRatio(boolean usePollIdleRatio) return this; } + public Builder criticalLagThreshold(Long criticalLagThreshold) + { + this.criticalLagThreshold = criticalLagThreshold; + return this; + } + public Builder useTaskCountBoundariesOnScaleUp(boolean useTaskCountBoundariesOnScaleUp) { this.useTaskCountBoundariesOnScaleUp = useTaskCountBoundariesOnScaleUp; @@ -528,7 +564,8 @@ public CostBasedAutoScalerConfig build() minScaleUpDelay, minScaleDownDelay, scaleDownDuringTaskRolloverOnly, - usePollIdleRatio + usePollIdleRatio, + criticalLagThreshold ); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java index beaf0a5b9d55..c0d330edb526 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java @@ -42,6 +42,26 @@ public class WeightedCostFunction */ static final double LAG_AMPLIFICATION_MULTIPLIER = 0.3; + /** + * Amplification multiplier used once aggregate lag crosses {@link #CRITICAL_LAG_TIER1_FRACTION} of + * {@link CostBasedAutoScalerConfig#getCriticalLagThreshold()} (tier 1 of the critical-lag fast path). + */ + static final double CRITICAL_LAG_AMPLIFICATION_MULTIPLIER = 6.0; + + /** + * Fraction of {@link CostBasedAutoScalerConfig#getCriticalLagThreshold()} at which tier 1 of the + * critical-lag fast path engages: amplification maxes out at {@link #CRITICAL_LAG_AMPLIFICATION_MULTIPLIER} + * and the scale-up candidate boundary is bypassed. + */ + static final double CRITICAL_LAG_TIER1_FRACTION = 0.75; + + /** + * Fraction of {@link CostBasedAutoScalerConfig#getCriticalLagThreshold()} at which tier 2 of the + * critical-lag fast path engages: the cost-minimization search is skipped entirely and the task + * count jumps straight to the maximum. + */ + static final double CRITICAL_LAG_TIER2_FRACTION = 0.95; + /** * Exponent (< 1) for sublinear busy redistribution in the idle projection: * busy grows as {@code (currentTaskCount / proposedTaskCount)^EXPONENT}, not linearly. @@ -108,12 +128,19 @@ public CostResult computeCost( // Lag recovery time is decreasing by adding tasks and increasing by ejecting tasks. // In case of increasing lag, we apply an amplification factor to reflect the urgency of addressing lag. // Caution: we rely only on the metrics, the real issues may be absolutely different, up to hardware failure. + // Once aggregate lag crosses CRITICAL_LAG_TIER1_FRACTION of criticalLagThreshold, the multiplier is + // maxed out at CRITICAL_LAG_AMPLIFICATION_MULTIPLIER (vs the default 0.3). final double lagRecoveryTime; if (metrics.getAggregateLag() <= 0) { lagRecoveryTime = 0; } else { final double lagPerPartition = metrics.getAggregateLag() / metrics.getPartitionCount(); - final double amplification = Math.max(1.0, 1.0 + LAG_AMPLIFICATION_MULTIPLIER * Math.log(lagPerPartition)); + final Long criticalLagThreshold = config.getCriticalLagThreshold(); + final boolean criticalLag = criticalLagThreshold != null + && metrics.getAggregateLag() >= criticalLagThreshold * CRITICAL_LAG_TIER1_FRACTION; + + final double amplificationMultiplier = criticalLag ? CRITICAL_LAG_AMPLIFICATION_MULTIPLIER : LAG_AMPLIFICATION_MULTIPLIER; + final double amplification = Math.max(1.0, 1.0 + amplificationMultiplier * Math.log(lagPerPartition)); final double adjustedProcessingRate = Math.max(avgProcessingRate, MIN_PROCESSING_RATE); lagRecoveryTime = metrics.getAggregateLag() * amplification / (proposedTaskCount * adjustedProcessingRate); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java index 6d09221b410a..c802840cf4df 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java @@ -54,7 +54,8 @@ public void testSerdeWithAllProperties() throws Exception + " \"minScaleUpDelay\": \"PT5M\",\n" + " \"minScaleDownDelay\": \"PT10M\",\n" + " \"scaleDownDuringTaskRolloverOnly\": true,\n" - + " \"usePollIdleRatio\": false\n" + + " \"usePollIdleRatio\": false,\n" + + " \"criticalLagThreshold\": 500000\n" + "}"; final CostBasedAutoScalerConfig config = mapper.readValue(json, CostBasedAutoScalerConfig.class); @@ -74,6 +75,7 @@ public void testSerdeWithAllProperties() throws Exception Assert.assertFalse(config.isUsePollIdleRatio()); Assert.assertFalse(config.isUseTaskCountBoundariesOnScaleUp()); Assert.assertTrue(config.isUseTaskCountBoundariesOnScaleDown()); + Assert.assertEquals(Long.valueOf(500000), config.getCriticalLagThreshold()); // Test serialization back to JSON final String serialized = mapper.writeValueAsString(config); @@ -112,6 +114,7 @@ public void testSerdeWithDefaults() throws Exception Assert.assertTrue(config.isUseTaskCountBoundariesOnScaleDown()); Assert.assertNull(config.getTaskCountStart()); Assert.assertNull(config.getStopTaskCountRatio()); + Assert.assertNull(config.getCriticalLagThreshold()); } @Test @@ -221,6 +224,7 @@ public void testBuilder() .minScaleDownDelay(Duration.standardMinutes(10)) .scaleDownDuringTaskRolloverOnly(true) .usePollIdleRatio(false) + .criticalLagThreshold(500000L) .build(); Assert.assertTrue(config.getEnableTaskAutoScaler()); @@ -238,6 +242,18 @@ public void testBuilder() Assert.assertEquals(Duration.standardMinutes(10), config.getMinScaleDownDelay()); Assert.assertTrue(config.isScaleDownOnTaskRolloverOnly()); Assert.assertFalse(config.isUsePollIdleRatio()); + Assert.assertEquals(Long.valueOf(500000), config.getCriticalLagThreshold()); + } + + @Test(expected = IllegalArgumentException.class) + public void testValidation_ZeroCriticalLagThreshold() + { + CostBasedAutoScalerConfig.builder() + .taskCountMax(100) + .taskCountMin(5) + .criticalLagThreshold(0L) + .enableTaskAutoScaler(true) + .build(); } @Test diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java index d773e20bfcb3..696160f2d90e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java @@ -258,6 +258,62 @@ public void testComputeOptimalTaskCountLimitsTaskCountJumps() ); } + @Test + public void testCriticalLagThresholdBypassesScaleUpBoundary() + { + // aggregateLag = 100_000 * 100 = 10,000,000. With threshold=12,000,000: tier1=9,000,000 (crossed), + // tier2=11,400,000 (not crossed), so this exercises tier1 (boundary bypass) without triggering + // tier2's emergency jump-to-max. + final CostBasedAutoScalerConfig boundedScaleUpConfig = CostBasedAutoScalerConfig + .builder() + .taskCountMax(100) + .taskCountMin(1) + .enableTaskAutoScaler(true) + .lagWeight(1.0) + .idleWeight(0.0) + .useTaskCountBoundariesOnScaleUp(true) + .criticalLagThreshold(12_000_000L) + .build(); + final CostBasedAutoScaler scaler = createAutoScaler(boundedScaleUpConfig); + + Assert.assertEquals( + "Critical lag should bypass the scale-up boundary and jump straight to the argmin", + 100, + scaler.computeOptimalTaskCount(createMetrics(100_000.0, 10, 100, 0.25)) + ); + + // Below the threshold, the boundary still applies as usual. + Assert.assertEquals( + "Below criticalLagThreshold, the scale-up boundary still limits candidates", + 13, + scaler.computeOptimalTaskCount(createMetrics(10.0, 10, 100, 0.25)) + ); + } + + @Test + public void testEmergencyLagJumpsStraightToMaxTaskCount() + { + // aggregateLag = 100_000 * 500 = 50,000,000. With threshold=10,000,000: tier2=9,500,000 is + // comfortably crossed, so the argmin search is skipped entirely in favor of the maximum task count. + final CostBasedAutoScalerConfig config = CostBasedAutoScalerConfig + .builder() + .taskCountMax(500) + .taskCountMin(1) + .enableTaskAutoScaler(true) + .lagWeight(0.1) + .idleWeight(0.9) + .criticalLagThreshold(10_000_000L) + .build(); + final CostBasedAutoScaler scaler = createAutoScaler(config); + + // Idle-heavy weights would normally argue for scaling down, but emergency lag overrides that entirely. + Assert.assertEquals( + "Emergency lag should jump straight to the maximum task count regardless of idle-favoring weights", + 500, + scaler.computeOptimalTaskCount(createMetrics(100_000.0, 10, 500, 0.9)) + ); + } + @Test public void testExtractPollIdleRatio() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java index 6802692ade49..da4c68dbbf4b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java @@ -357,6 +357,71 @@ public void testLagAmplificationAppliedUnconditionally() Assert.assertEquals("Lag amplification should increase lag recovery time", expected, costWithAmp, 0.0001); } + @Test + public void testCriticalLagThresholdMaxesOutAmplificationMultiplier() + { + int currentTaskCount = 10; + int proposedTaskCount = 10; + int partitionCount = 10; + double avgPartitionLag = 150.0; + double aggregateLag = avgPartitionLag * partitionCount; + + CostMetrics metrics = createMetrics(avgPartitionLag, currentTaskCount, partitionCount, 0.1); + + // aggregateLag sits at exactly tier1Fraction (75%) of this threshold. + long tier1Threshold = (long) (aggregateLag / WeightedCostFunction.CRITICAL_LAG_TIER1_FRACTION); + + CostBasedAutoScalerConfig noThreshold = CostBasedAutoScalerConfig.builder() + .taskCountMax(100) + .taskCountMin(1) + .enableTaskAutoScaler(true) + .lagWeight(1.0) + .idleWeight(0.0) + .build(); + CostBasedAutoScalerConfig belowTier1 = CostBasedAutoScalerConfig.builder() + .taskCountMax(100) + .taskCountMin(1) + .enableTaskAutoScaler(true) + .lagWeight(1.0) + .idleWeight(0.0) + .criticalLagThreshold(tier1Threshold + 100) + .build(); + CostBasedAutoScalerConfig atTier1 = CostBasedAutoScalerConfig.builder() + .taskCountMax(100) + .taskCountMin(1) + .enableTaskAutoScaler(true) + .lagWeight(1.0) + .idleWeight(0.0) + .criticalLagThreshold(tier1Threshold) + .build(); + + double costBelowTier1 = costFunction.computeCost(metrics, proposedTaskCount, belowTier1).totalCost(); + Assert.assertEquals( + "Below tier1, amplification uses the default multiplier", + costFunction.computeCost(metrics, proposedTaskCount, noThreshold).totalCost(), + costBelowTier1, + 0.0001 + ); + + double lagPerPartition = aggregateLag / partitionCount; + double criticalAmplification = + 1.0 + WeightedCostFunction.CRITICAL_LAG_AMPLIFICATION_MULTIPLIER * Math.log(lagPerPartition); + double expectedCriticalCost = + aggregateLag * criticalAmplification / (proposedTaskCount * WeightedCostFunction.MIN_PROCESSING_RATE); + + double costAtTier1 = costFunction.computeCost(metrics, proposedTaskCount, atTier1).totalCost(); + Assert.assertEquals( + "At/above tier1, the amplification multiplier maxes out at CRITICAL_LAG_AMPLIFICATION_MULTIPLIER", + expectedCriticalCost, + costAtTier1, + 0.0001 + ); + Assert.assertTrue( + "Critical-lag cost should exceed the default-multiplier cost for the same lag", + costAtTier1 > costBelowTier1 + ); + } + @Test public void testAmplificationGrowsWithLag() { From d555233117f628d4f3267870de6b79795bf8b022 Mon Sep 17 00:00:00 2001 From: Sasha Syrotenko Date: Fri, 3 Jul 2026 16:20:52 +0300 Subject: [PATCH 2/2] Checkstyle --- .../supervisor/autoscaler/CostBasedAutoScaler.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java index bdf7c3391b66..631881db7a75 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java @@ -141,10 +141,7 @@ private ServiceMetricEvent.Builder getMetricBuilder() public void start() { autoscalerExecutor.scheduleAtFixedRate( - supervisor.buildDynamicAllocationTask( - this::computeTaskCountForScaleAction, () -> { - }, emitter - ), + supervisor.buildDynamicAllocationTask(this::computeTaskCountForScaleAction, () -> {}, emitter), config.getScaleActionPeriodMillis(), config.getScaleActionPeriodMillis(), TimeUnit.MILLISECONDS