Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ public void test_autoScaler_computesOptimalTaskCountAndProduceScaleDown()
.taskCountMax(100)
.taskCountStart(initialTaskCount)
.scaleActionPeriodMillis(1900)
.minTriggerScaleActionFrequencyMillis(2000)
// Weight configuration: strongly favor lag reduction over idle time
.lagWeight(0.9)
.idleWeight(0.1)
Expand Down Expand Up @@ -163,7 +162,6 @@ public void test_autoScaler_computesOptimalTaskCountAndProducesScaleUp()
.taskCountMax(50)
.taskCountStart(lowInitialTaskCount)
.scaleActionPeriodMillis(500)
.minTriggerScaleActionFrequencyMillis(1000)
.lagWeight(0.8)
.idleWeight(0.2)
.build();
Expand Down Expand Up @@ -212,7 +210,6 @@ public void test_autoScaler_computesOptimalTaskCountAndProducesScaleUp_withUtili
.taskCountMax(50)
.taskCountStart(lowInitialTaskCount)
.scaleActionPeriodMillis(500)
.minTriggerScaleActionFrequencyMillis(1000)
.lagWeight(0.8)
.idleWeight(0.2)
.usePollIdleRatio(false)
Expand Down Expand Up @@ -258,7 +255,6 @@ public void test_autoScaler_scalesUpAndDown_withSlowPublish()
.taskCountMax(4)
.lagWeight(0.5)
.idleWeight(0.5)
.minTriggerScaleActionFrequencyMillis(10L)
.scaleActionPeriodMillis(10L)
.minScaleDownDelay(Duration.standardSeconds(1))
.build();
Expand Down Expand Up @@ -336,7 +332,6 @@ void test_scaleDownDuringTaskRollover()
.taskCountMin(1)
.taskCountMax(10)
.scaleActionPeriodMillis(100)
.minTriggerScaleActionFrequencyMillis(100)
// High idle weight ensures scale-down when tasks are mostly idle (little data to process)
.lagWeight(0.1)
.idleWeight(0.9)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,14 @@ public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
private static final EmittingLogger log = new EmittingLogger(CostBasedAutoScaler.class);

public static final String LAG_WEIGHT_METRIC = "task/autoScaler/costBased/lagWeight";
public static final String LAG_COST_METRIC = "task/autoScaler/costBased/lagCost";
public static final String IDLE_WEIGHT_METRIC = "task/autoScaler/costBased/idleWeight";
public static final String IDLE_COST_METRIC = "task/autoScaler/costBased/idleCost";
public static final String OPTIMAL_TASK_COUNT_METRIC = "task/autoScaler/costBased/optimalTaskCount";
public static final String INVALID_METRICS_COUNT = "task/autoScaler/costBased/invalidMetrics";
public static final String AVG_PROCESSING_RATE_METRIC = "task/autoScaler/costBased/avgProcessingRate";
public static final String AVG_POLL_IDLE_RATIO = "task/autoScaler/costBased/avgPollIdleRatio";
public static final String IDLE_RATIO_ESTIMATED_FROM_RATE = "task/autoScaler/costBased/idleRatioFromRate";

/**
* Maximum number of candidate task counts to evaluate above or below the current task count
Expand Down Expand Up @@ -233,7 +236,6 @@ int computeOptimalTaskCount(CostMetrics metrics)

final int[] validTaskCounts = CostBasedAutoScaler.computeValidTaskCounts(
partitionCount,
currentTaskCount,
config.getTaskCountMin(),
config.getTaskCountMax()
);
Expand All @@ -247,6 +249,7 @@ int computeOptimalTaskCount(CostMetrics metrics)
// Start with the current task count as optimal
int optimalTaskCount = currentTaskCount;
CostResult optimalCost = costFunction.computeCost(metrics, currentTaskCount, config);
final double idleRatioEstimatedFromRate = metrics.estimateIdleRatioFromProcessingRate();

log.info(
"Computing optimal taskCount for supervisor[%s] with metrics:"
Expand All @@ -257,7 +260,7 @@ int computeOptimalTaskCount(CostMetrics metrics)
metrics.getAvgPartitionLag(),
metrics.getAvgProcessingRate(),
metrics.getMaxObservedRate(),
metrics.estimateIdleRatioFromProcessingRate(),
idleRatioEstimatedFromRate,
metrics.getPollIdleRatio(),
config.getLagWeight(),
config.getIdleWeight()
Expand Down Expand Up @@ -297,10 +300,21 @@ int computeOptimalTaskCount(CostMetrics metrics)
}

emitter.emit(getMetricBuilder().setMetric(OPTIMAL_TASK_COUNT_METRIC, (long) optimalTaskCount));
emitter.emit(getMetricBuilder().setMetric(LAG_WEIGHT_METRIC, optimalCost.lagCost()));
emitter.emit(getMetricBuilder().setMetric(IDLE_WEIGHT_METRIC, optimalCost.idleCost()));
emitter.emit(getMetricBuilder().setMetric(AVG_PROCESSING_RATE_METRIC, metrics.getAvgProcessingRate()));
emitter.emit(getMetricBuilder().setMetric(AVG_POLL_IDLE_RATIO, metrics.getPollIdleRatio()));
emitter.emit(getMetricBuilder().setMetric(LAG_WEIGHT_METRIC, config.getLagWeight()));
emitter.emit(getMetricBuilder().setMetric(IDLE_WEIGHT_METRIC, config.getIdleWeight()));
emitter.emit(getMetricBuilder().setMetric(LAG_COST_METRIC, optimalCost.lagCost()));
emitter.emit(getMetricBuilder().setMetric(IDLE_COST_METRIC, optimalCost.idleCost()));

// Emit avg rate and idle metrics only if they are available
if (metrics.getAvgProcessingRate() >= 0) {
emitter.emit(getMetricBuilder().setMetric(AVG_PROCESSING_RATE_METRIC, metrics.getAvgProcessingRate()));
}
if (metrics.getPollIdleRatio() >= 0) {
emitter.emit(getMetricBuilder().setMetric(AVG_POLL_IDLE_RATIO, metrics.getPollIdleRatio()));
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd also like to request that you emit estimateIdleRatioFromProcessingRate() as a separate metric for better tracking.

if (idleRatioEstimatedFromRate >= 0) {
emitter.emit(getMetricBuilder().setMetric(IDLE_RATIO_ESTIMATED_FROM_RATE, idleRatioEstimatedFromRate));
}

if (optimalTaskCount != currentTaskCount) {
log.info(
Expand All @@ -317,17 +331,16 @@ int computeOptimalTaskCount(CostMetrics metrics)
* Generates valid task counts by converting every possible partitions-per-task ratio
* into a task count and filtering by configured min/max task count bounds.
*
* @return list of valid task counts within bounds
* @return array of valid task counts within bounds
*/
@SuppressWarnings({"ReassignedVariable"})
static int[] computeValidTaskCounts(
int partitionCount,
int currentTaskCount,
int taskCountMin,
int taskCountMax
)
{
if (partitionCount <= 0 || currentTaskCount <= 0) {
if (partitionCount <= 0) {
return new int[]{};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.joda.time.Duration;

Expand All @@ -45,18 +44,15 @@
@JsonInclude(JsonInclude.Include.NON_NULL)
public class CostBasedAutoScalerConfig implements AutoScalerConfig
{
private static final EmittingLogger LOG = new EmittingLogger(CostBasedAutoScalerConfig.class);

static final long DEFAULT_SCALE_ACTION_PERIOD_MILLIS = 10 * 60 * 1000; // 10 minutes
static final double DEFAULT_LAG_WEIGHT = 0.4;
static final double DEFAULT_IDLE_WEIGHT = 0.6;
static final Duration DEFAULT_MIN_SCALE_DELAY = Duration.millis(DEFAULT_SCALE_ACTION_PERIOD_MILLIS * 3);
static final Duration DEFAULT_MIN_SCALE_UP_DELAY = Duration.standardMinutes(10);
static final Duration DEFAULT_MIN_SCALE_DOWN_DELAY = Duration.standardMinutes(30);

private final boolean enableTaskAutoScaler;
private final int taskCountMax;
private final int taskCountMin;
private final Integer taskCountStart;
private final long minTriggerScaleActionFrequencyMillis;
private final Double stopTaskCountRatio;
private final long scaleActionPeriodMillis;

Expand All @@ -72,17 +68,13 @@ public class CostBasedAutoScalerConfig implements AutoScalerConfig

/**
* Creates a new CostBasedAutoScalerConfig instance.
* <p>
* Note: useTaskCountBoundaries and highLagThreshold are kept for backward compatibility,
* but effectively they are removed.
*/
@JsonCreator
public CostBasedAutoScalerConfig(
@JsonProperty("taskCountMax") Integer taskCountMax,
@JsonProperty("taskCountMin") Integer taskCountMin,
@Nullable @JsonProperty("enableTaskAutoScaler") Boolean enableTaskAutoScaler,
@Nullable @JsonProperty("taskCountStart") Integer taskCountStart,
@Nullable @JsonProperty("minTriggerScaleActionFrequencyMillis") Long minTriggerScaleActionFrequencyMillis,
@Nullable @JsonProperty("stopTaskCountRatio") Double stopTaskCountRatio,
@Nullable @JsonProperty("scaleActionPeriodMillis") Long scaleActionPeriodMillis,
@Nullable @JsonProperty("lagWeight") Double lagWeight,
Expand All @@ -96,18 +88,9 @@ public CostBasedAutoScalerConfig(
@Nullable @JsonProperty("usePollIdleRatio") Boolean usePollIdleRatio
)
{
this.enableTaskAutoScaler = enableTaskAutoScaler != null ? enableTaskAutoScaler : false;

// Timing configuration with defaults
this.scaleActionPeriodMillis = scaleActionPeriodMillis != null
? scaleActionPeriodMillis
: DEFAULT_SCALE_ACTION_PERIOD_MILLIS;
this.minTriggerScaleActionFrequencyMillis = Configs.valueOrDefault(
minTriggerScaleActionFrequencyMillis,
DEFAULT_SCALE_ACTION_PERIOD_MILLIS
);
this.enableTaskAutoScaler = Configs.valueOrDefault(enableTaskAutoScaler, false);
this.scaleActionPeriodMillis = Configs.valueOrDefault(scaleActionPeriodMillis, DEFAULT_MIN_SCALE_UP_DELAY.getMillis());

// Cost function weights with defaults
this.lagWeight = Configs.valueOrDefault(lagWeight, DEFAULT_LAG_WEIGHT);
this.idleWeight = Configs.valueOrDefault(idleWeight, DEFAULT_IDLE_WEIGHT);
this.optimalTaskIdleRatio = Configs.valueOrDefault(
Expand All @@ -116,11 +99,8 @@ public CostBasedAutoScalerConfig(
);
this.useTaskCountBoundariesOnScaleUp = Configs.valueOrDefault(useTaskCountBoundariesOnScaleUp, false);
this.useTaskCountBoundariesOnScaleDown = Configs.valueOrDefault(useTaskCountBoundariesOnScaleDown, true);
this.minScaleUpDelay = Configs.valueOrDefault(
minScaleUpDelay,
Duration.millis(this.minTriggerScaleActionFrequencyMillis)
);
this.minScaleDownDelay = Configs.valueOrDefault(minScaleDownDelay, DEFAULT_MIN_SCALE_DELAY);
this.minScaleUpDelay = Configs.valueOrDefault(minScaleUpDelay, DEFAULT_MIN_SCALE_UP_DELAY);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Preserve the legacy cooldown fallback

Existing cost-based supervisor specs can still contain minTriggerScaleActionFrequencyMillis, and the shared AutoScalerConfig contract/docs say minScaleUpDelay and minScaleDownDelay fall back to it when the Duration fields are absent. This PR removes the creator parameter and always defaults minScaleUpDelay to DEFAULT_MIN_SCALE_UP_DELAY, while the getter now returns -1, so a persisted spec that only set the deprecated field is accepted but silently changes its scale-up cooldown on upgrade. Please continue binding the legacy value and using it as the fallback until the field is actually removed, or migrate the public contract/docs and persisted-spec behavior together.

this.minScaleDownDelay = Configs.valueOrDefault(minScaleDownDelay, DEFAULT_MIN_SCALE_DOWN_DELAY);
this.scaleDownDuringTaskRolloverOnly = Configs.valueOrDefault(scaleDownDuringTaskRolloverOnly, false);
this.usePollIdleRatio = Configs.valueOrDefault(usePollIdleRatio, true);

Expand Down Expand Up @@ -204,7 +184,7 @@ public Integer getTaskCountStart()
@JsonProperty
public long getMinTriggerScaleActionFrequencyMillis()
{
return minTriggerScaleActionFrequencyMillis;
return -1;
}

@Override
Expand Down Expand Up @@ -308,7 +288,7 @@ public boolean isUsePollIdleRatio()
@Override
public SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, SupervisorSpec spec, ServiceEmitter emitter)
{
return new CostBasedAutoScaler((SeekableStreamSupervisor) supervisor, this, spec, emitter);
return new CostBasedAutoScaler((SeekableStreamSupervisor<?, ?, ?>) supervisor, this, spec, emitter);
}

@Override
Expand All @@ -326,7 +306,6 @@ public boolean equals(Object o)
return enableTaskAutoScaler == that.enableTaskAutoScaler
&& taskCountMax == that.taskCountMax
&& taskCountMin == that.taskCountMin
&& minTriggerScaleActionFrequencyMillis == that.minTriggerScaleActionFrequencyMillis
&& scaleActionPeriodMillis == that.scaleActionPeriodMillis
&& Double.compare(that.lagWeight, lagWeight) == 0
&& Double.compare(that.idleWeight, idleWeight) == 0
Expand All @@ -349,7 +328,6 @@ public int hashCode()
taskCountMax,
taskCountMin,
taskCountStart,
minTriggerScaleActionFrequencyMillis,
stopTaskCountRatio,
scaleActionPeriodMillis,
lagWeight,
Expand All @@ -372,7 +350,6 @@ public String toString()
", taskCountMax=" + taskCountMax +
", taskCountMin=" + taskCountMin +
", taskCountStart=" + taskCountStart +
", minTriggerScaleActionFrequencyMillis=" + minTriggerScaleActionFrequencyMillis +
", stopTaskCountRatio=" + stopTaskCountRatio +
", scaleActionPeriodMillis=" + scaleActionPeriodMillis +
", lagWeight=" + lagWeight +
Expand All @@ -397,7 +374,6 @@ public static class Builder
private Integer taskCountMin;
private Boolean enableTaskAutoScaler = true;
private Integer taskCountStart;
private Long minTriggerScaleActionFrequencyMillis;
private Double stopTaskCountRatio;
private Long scaleActionPeriodMillis;
private Double lagWeight;
Expand Down Expand Up @@ -438,12 +414,6 @@ public Builder taskCountStart(Integer taskCountStart)
return this;
}

public Builder minTriggerScaleActionFrequencyMillis(long minTriggerScaleActionFrequencyMillis)
{
this.minTriggerScaleActionFrequencyMillis = minTriggerScaleActionFrequencyMillis;
return this;
}

public Builder stopTaskCountRatio(Double stopTaskCountRatio)
{
this.stopTaskCountRatio = stopTaskCountRatio;
Expand Down Expand Up @@ -517,7 +487,6 @@ public CostBasedAutoScalerConfig build()
taskCountMin,
enableTaskAutoScaler,
taskCountStart,
minTriggerScaleActionFrequencyMillis,
stopTaskCountRatio,
scaleActionPeriodMillis,
lagWeight,
Expand Down
Loading
Loading