From 66786ec495989e6978e7b3afba5f73072c827657 Mon Sep 17 00:00:00 2001 From: Maryam Shahid Date: Fri, 27 Mar 2026 16:16:10 -0700 Subject: [PATCH 1/8] Weighted Query Laning Strategy --- .../druid/server/QueryLaningStrategy.java | 4 +- .../WeightedQueryLaningStrategy.java | 226 ++++++++++++++++ .../druid/server/QuerySchedulerTest.java | 98 +++++++ .../WeightedQueryLaningStrategyTest.java | 254 ++++++++++++++++++ 4 files changed, 581 insertions(+), 1 deletion(-) create mode 100644 server/src/main/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategy.java create mode 100644 server/src/test/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategyTest.java diff --git a/server/src/main/java/org/apache/druid/server/QueryLaningStrategy.java b/server/src/main/java/org/apache/druid/server/QueryLaningStrategy.java index d601384c6659..2e95f9d1b3a6 100644 --- a/server/src/main/java/org/apache/druid/server/QueryLaningStrategy.java +++ b/server/src/main/java/org/apache/druid/server/QueryLaningStrategy.java @@ -28,6 +28,7 @@ import org.apache.druid.server.scheduling.HiLoQueryLaningStrategy; import org.apache.druid.server.scheduling.ManualQueryLaningStrategy; import org.apache.druid.server.scheduling.NoQueryLaningStrategy; +import org.apache.druid.server.scheduling.WeightedQueryLaningStrategy; import java.util.Optional; import java.util.Set; @@ -37,7 +38,8 @@ @JsonSubTypes(value = { @JsonSubTypes.Type(name = "none", value = NoQueryLaningStrategy.class), @JsonSubTypes.Type(name = "hilo", value = HiLoQueryLaningStrategy.class), - @JsonSubTypes.Type(name = "manual", value = ManualQueryLaningStrategy.class) + @JsonSubTypes.Type(name = "manual", value = ManualQueryLaningStrategy.class), + @JsonSubTypes.Type(name = "weighted", value = WeightedQueryLaningStrategy.class) }) public interface QueryLaningStrategy { diff --git a/server/src/main/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategy.java b/server/src/main/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategy.java new file mode 100644 index 000000000000..e21e117e5440 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategy.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.scheduling; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import it.unimi.dsi.fastutil.objects.Object2IntArrayMap; +import it.unimi.dsi.fastutil.objects.Object2IntMap; +import org.apache.druid.client.SegmentServerSelector; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryPlus; +import org.apache.druid.server.QueryLaningStrategy; +import org.joda.time.DateTime; +import org.joda.time.Duration; +import org.joda.time.Period; +import org.joda.time.base.AbstractInterval; + +import javax.annotation.Nullable; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +/** + * Query laning strategy that scores queries by how many configured thresholds they breach, + * then assigns them to the most restrictive matching lane. This provides more nuanced lane + * assignment than {@link HiLoQueryLaningStrategy}, which uses a binary high/low split. + * + *

Configuration example: + *

{@code
+ * {
+ *   "strategy": "weighted",
+ *   "periodThreshold": "P1M",
+ *   "segmentCountThreshold": 1000,
+ *   "lanes": {
+ *     "low": { "minScore": 1, "maxPercent": 30 },
+ *     "very-low": { "minScore": 3, "maxPercent": 10 }
+ *   }
+ * }
+ * }
+ */ +public class WeightedQueryLaningStrategy implements QueryLaningStrategy +{ + private static final int DEFAULT_SEGMENT_THRESHOLD = Integer.MAX_VALUE; + + private final int segmentCountThreshold; + @Nullable + private final Duration periodThreshold; + @Nullable + private final Duration durationThreshold; + @Nullable + private final Duration segmentRangeThreshold; + private final Map lanes; + + @JsonCreator + public WeightedQueryLaningStrategy( + @JsonProperty("periodThreshold") @Nullable String periodThresholdString, + @JsonProperty("durationThreshold") @Nullable String durationThresholdString, + @JsonProperty("segmentCountThreshold") @Nullable Integer segmentCountThreshold, + @JsonProperty("segmentRangeThreshold") @Nullable String segmentRangeThresholdString, + @JsonProperty("lanes") Map lanes + ) + { + this.segmentCountThreshold = segmentCountThreshold == null ? DEFAULT_SEGMENT_THRESHOLD : segmentCountThreshold; + this.periodThreshold = periodThresholdString == null + ? null + : new Period(periodThresholdString).toDurationFrom(DateTimes.nowUtc()); + this.durationThreshold = durationThresholdString == null + ? null + : new Period(durationThresholdString).toStandardDuration(); + this.segmentRangeThreshold = segmentRangeThresholdString == null + ? null + : new Period(segmentRangeThresholdString).toStandardDuration(); + + Preconditions.checkArgument( + segmentCountThreshold != null || periodThreshold != null || durationThreshold != null || segmentRangeThreshold != null, + "At least one of periodThreshold, durationThreshold, segmentCountThreshold, or segmentRangeThreshold must be set" + ); + Preconditions.checkArgument( + lanes != null && !lanes.isEmpty(), + "At least one lane must be defined" + ); + this.lanes = lanes; + } + + @Override + public Object2IntMap getLaneLimits(int totalLimit) + { + Object2IntMap limits = new Object2IntArrayMap<>(lanes.size()); + for (Map.Entry entry : lanes.entrySet()) { + limits.put(entry.getKey(), computeLimitFromPercent(totalLimit, entry.getValue().maxPercent)); + } + return limits; + } + + @Override + public Optional computeLane(QueryPlus query, Set segments) + { + final String existingLane = query.getQuery().context().getLane(); + if (existingLane != null) { + return Optional.of(existingLane); + } + + int score = computeScore(query.getQuery(), segments); + if (score == 0) { + return Optional.empty(); + } + + // Find the lane with the highest minScore that this query meets + String bestLane = null; + int bestMinScore = 0; + for (Map.Entry entry : lanes.entrySet()) { + int minScore = entry.getValue().minScore; + if (score >= minScore && minScore > bestMinScore) { + bestLane = entry.getKey(); + bestMinScore = minScore; + } + } + return Optional.ofNullable(bestLane); + } + + private int computeScore(Query query, Set segments) + { + int score = 0; + + if (periodThreshold != null) { + final DateTime cutoff = DateTimes.nowUtc().minus(periodThreshold); + if (query.getIntervals().stream().anyMatch(interval -> interval.getStart().isBefore(cutoff))) { + score++; + } + } + + if (durationThreshold != null && query.getDuration().isLongerThan(durationThreshold)) { + score++; + } + + if (segments.size() > segmentCountThreshold) { + score++; + } + + if (segmentRangeThreshold != null) { + long segmentRangeMs = segments.stream() + .filter(s -> s.getSegmentDescriptor() != null) + .map(s -> s.getSegmentDescriptor().getInterval()) + .distinct() + .mapToLong(AbstractInterval::toDurationMillis) + .sum(); + if (new Duration(segmentRangeMs).isLongerThan(segmentRangeThreshold)) { + score++; + } + } + + return score; + } + + public static class LaneConfig + { + @JsonProperty + private final int minScore; + @JsonProperty + private final int maxPercent; + + @JsonCreator + public LaneConfig( + @JsonProperty("minScore") int minScore, + @JsonProperty("maxPercent") int maxPercent + ) + { + Preconditions.checkArgument(minScore > 0, "minScore must be > 0, got [%s]", minScore); + Preconditions.checkArgument( + maxPercent > 0 && maxPercent <= 100, + "maxPercent must be in the range 1 to 100, got [%s]", maxPercent + ); + this.minScore = minScore; + this.maxPercent = maxPercent; + } + + public int getMinScore() + { + return minScore; + } + + public int getMaxPercent() + { + return maxPercent; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + LaneConfig that = (LaneConfig) o; + return minScore == that.minScore && maxPercent == that.maxPercent; + } + + @Override + public int hashCode() + { + return Objects.hash(minScore, maxPercent); + } + } +} diff --git a/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java b/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java index ed602678a82c..c0ca0942b21f 100644 --- a/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java +++ b/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java @@ -68,6 +68,7 @@ import org.apache.druid.server.scheduling.HiLoQueryLaningStrategy; import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy; import org.apache.druid.server.scheduling.NoQueryLaningStrategy; +import org.apache.druid.server.scheduling.WeightedQueryLaningStrategy; import org.easymock.EasyMock; import org.hamcrest.text.StringContainsInOrder; import org.junit.After; @@ -606,6 +607,103 @@ public void testConfigManualPercent() Assert.assertEquals(QueryScheduler.UNAVAILABLE, scheduler.getLaneAvailableCapacity("non-existent")); } + @Test + public void testConfigWeighted() + { + final Injector injector = createInjector(); + final String propertyPrefix = "druid.query.scheduler"; + final JsonConfigProvider provider = JsonConfigProvider.of( + propertyPrefix, + QuerySchedulerProvider.class + ); + final Properties properties = new Properties(); + properties.setProperty(propertyPrefix + ".numThreads", "10"); + properties.setProperty(propertyPrefix + ".laning.strategy", "weighted"); + properties.setProperty(propertyPrefix + ".laning.segmentCountThreshold", "1"); + properties.setProperty(propertyPrefix + ".laning.lanes", "{\"low\": {\"minScore\": 1, \"maxPercent\": 30}}"); + provider.inject(properties, injector.getInstance(JsonConfigurator.class)); + final QueryScheduler scheduler = provider.get().get(); + Assert.assertEquals(10, scheduler.getTotalAvailableCapacity()); + Assert.assertEquals(3, scheduler.getLaneAvailableCapacity("low")); + Assert.assertEquals(QueryScheduler.UNAVAILABLE, scheduler.getLaneAvailableCapacity("non-existent")); + } + + @Test + public void testWeightedLaneAssignment() + { + ObservableQueryScheduler weightedScheduler = new ObservableQueryScheduler( + TEST_HI_CAPACITY, + ManualQueryPrioritizationStrategy.INSTANCE, + new WeightedQueryLaningStrategy( + null, + null, + 1, + null, + ImmutableMap.of("low", new WeightedQueryLaningStrategy.LaneConfig(1, 40)) + ), + SERVER_CONFIG_WITH_TOTAL + ); + + // Query with 2 segments exceeds segmentCountThreshold=1 → "low" lane + Query query = weightedScheduler.prioritizeAndLaneQuery( + QueryPlus.wrap(makeDefaultQuery()), + ImmutableSet.of( + EasyMock.createMock(SegmentServerSelector.class), + EasyMock.createMock(SegmentServerSelector.class) + ) + ); + Assert.assertEquals("low", query.context().getLane()); + + // Query with 0 segments → no lane + Query noLaneQuery = weightedScheduler.prioritizeAndLaneQuery( + QueryPlus.wrap(makeDefaultQuery()), + ImmutableSet.of() + ); + Assert.assertNull(noLaneQuery.context().getLane()); + } + + @Test + public void testWeightedFailsWhenOutOfLaneCapacity() + { + ObservableQueryScheduler weightedScheduler = new ObservableQueryScheduler( + TEST_HI_CAPACITY, + ManualQueryPrioritizationStrategy.INSTANCE, + new WeightedQueryLaningStrategy( + null, + null, + 1, + null, + ImmutableMap.of("low", new WeightedQueryLaningStrategy.LaneConfig(1, 40)) + ), + SERVER_CONFIG_WITH_TOTAL + ); + + ImmutableSet manySegments = ImmutableSet.of( + EasyMock.createMock(SegmentServerSelector.class), + EasyMock.createMock(SegmentServerSelector.class) + ); + + // Fill the low lane (capacity = ceil(5 * 40/100) = 2) + Query q1 = weightedScheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeDefaultQuery()), manySegments); + Yielders.each(weightedScheduler.run(q1, Sequences.empty())); + Assert.assertEquals(1, weightedScheduler.getLaneAvailableCapacity("low")); + + Query q2 = weightedScheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeDefaultQuery()), manySegments); + Yielders.each(weightedScheduler.run(q2, Sequences.empty())); + Assert.assertEquals(0, weightedScheduler.getLaneAvailableCapacity("low")); + + // Third should fail with 429 + Assert.assertThrows( + QueryCapacityExceededException.class, + () -> Yielders.each( + weightedScheduler.run( + weightedScheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeDefaultQuery()), manySegments), + Sequences.empty() + ) + ) + ); + } + private void maybeDelayNextIteration(int i) throws InterruptedException { if (i > 0 && i % 10 == 0) { diff --git a/server/src/test/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategyTest.java b/server/src/test/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategyTest.java new file mode 100644 index 000000000000..8fd0741ce7df --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategyTest.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.scheduling; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import it.unimi.dsi.fastutil.objects.Object2IntMap; +import org.apache.druid.client.SegmentServerSelector; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.query.Druids; +import org.apache.druid.query.QueryContexts; +import org.apache.druid.query.QueryPlus; +import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.query.timeseries.TimeseriesQuery; +import org.apache.druid.segment.TestHelper; +import org.apache.druid.server.QueryLaningStrategy; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +public class WeightedQueryLaningStrategyTest +{ + private static final Map TWO_LANES = ImmutableMap.of( + "low", new WeightedQueryLaningStrategy.LaneConfig(1, 30), + "very-low", new WeightedQueryLaningStrategy.LaneConfig(3, 10) + ); + + private Druids.TimeseriesQueryBuilder queryBuilder; + + @Before + public void setup() + { + queryBuilder = Druids.newTimeseriesQueryBuilder() + .dataSource("test") + .intervals(ImmutableList.of(Intervals.of("2020-01-01/2020-01-02"))) + .granularity(Granularities.DAY) + .aggregators(new CountAggregatorFactory("count")); + } + + @Test + public void testGetLaneLimits() + { + WeightedQueryLaningStrategy strategy = newStrategy(null, null, 10, null); + + Object2IntMap limits = strategy.getLaneLimits(100); + Assert.assertEquals(2, limits.size()); + Assert.assertEquals(30, limits.getInt("low")); + Assert.assertEquals(10, limits.getInt("very-low")); + } + + @Test + public void testComputeLane_noViolations() + { + WeightedQueryLaningStrategy strategy = newStrategy(null, null, 10000, null); + TimeseriesQuery query = queryBuilder.build(); + Optional lane = strategy.computeLane(QueryPlus.wrap(query), ImmutableSet.of()); + Assert.assertFalse(lane.isPresent()); + } + + @Test + public void testComputeLane_oneViolation_segmentCount() + { + // segmentCountThreshold=1, query has 5 segments → score 1 → "low" + WeightedQueryLaningStrategy strategy = newStrategy(null, null, 1, null); + TimeseriesQuery query = queryBuilder.build(); + Set segments = makeSegments(5); + Optional lane = strategy.computeLane(QueryPlus.wrap(query), segments); + Assert.assertTrue(lane.isPresent()); + Assert.assertEquals("low", lane.get()); + } + + @Test + public void testComputeLane_multipleViolations_higherLane() + { + // segmentCountThreshold=1 + durationThreshold very short → score >= 2 + // With a wide interval query, durationThreshold breached too + WeightedQueryLaningStrategy strategy = new WeightedQueryLaningStrategy( + null, + "PT1S", // 1 second duration threshold — query covers 1 day, will breach + 1, // segment count threshold — 5 segments will breach + null, + TWO_LANES + ); + TimeseriesQuery query = queryBuilder.build(); + Set segments = makeSegments(5); + Optional lane = strategy.computeLane(QueryPlus.wrap(query), segments); + Assert.assertTrue(lane.isPresent()); + // score=2, meets "low" (minScore=1) but not "very-low" (minScore=3) + Assert.assertEquals("low", lane.get()); + } + + @Test + public void testComputeLane_allViolations_mostRestrictiveLane() + { + // All 4 thresholds set very low — all will breach + WeightedQueryLaningStrategy strategy = new WeightedQueryLaningStrategy( + "PT1S", // period threshold — query interval in 2020 is far in the past + "PT1S", // duration threshold — 1 day query > 1 second + 1, // segment count threshold — 5 > 1 + "PT1S", // segment range threshold — will breach with segments spanning time + TWO_LANES + ); + TimeseriesQuery query = queryBuilder.build(); + Set segments = makeSegments(5); + Optional lane = strategy.computeLane(QueryPlus.wrap(query), segments); + Assert.assertTrue(lane.isPresent()); + // score=4: all 4 thresholds breached → meets "very-low" (minScore=3) + Assert.assertEquals("very-low", lane.get()); + } + + @Test + public void testComputeLane_existingLane_preserved() + { + WeightedQueryLaningStrategy strategy = newStrategy(null, null, 10000, null); + TimeseriesQuery query = queryBuilder + .context(ImmutableMap.of(QueryContexts.LANE_KEY, "custom")) + .build(); + Optional lane = strategy.computeLane(QueryPlus.wrap(query), ImmutableSet.of()); + Assert.assertTrue(lane.isPresent()); + Assert.assertEquals("custom", lane.get()); + } + + @Test + public void testValidation_noThresholds() + { + Assert.assertThrows( + IllegalArgumentException.class, + () -> newStrategy(null, null, null, null) + ); + } + + @Test + public void testValidation_noLanes() + { + Assert.assertThrows( + IllegalArgumentException.class, + () -> new WeightedQueryLaningStrategy( + null, + null, + 10, + null, + ImmutableMap.of() + ) + ); + } + + @Test + public void testValidation_nullLanes() + { + Assert.assertThrows( + IllegalArgumentException.class, + () -> new WeightedQueryLaningStrategy( + null, + null, + 10, + null, + null + ) + ); + } + + @Test + public void testLaneConfig_invalidMinScore() + { + Assert.assertThrows( + IllegalArgumentException.class, + () -> new WeightedQueryLaningStrategy.LaneConfig(0, 30) + ); + } + + @Test + public void testLaneConfig_invalidMaxPercent() + { + Assert.assertThrows( + IllegalArgumentException.class, + () -> new WeightedQueryLaningStrategy.LaneConfig(1, 0) + ); + } + + @Test + public void testSerde() throws Exception + { + ObjectMapper mapper = TestHelper.makeJsonMapper(); + String json = "{\n" + + " \"strategy\": \"weighted\",\n" + + " \"segmentCountThreshold\": 1000,\n" + + " \"durationThreshold\": \"P1D\",\n" + + " \"lanes\": {\n" + + " \"low\": { \"minScore\": 1, \"maxPercent\": 30 },\n" + + " \"very-low\": { \"minScore\": 3, \"maxPercent\": 10 }\n" + + " }\n" + + "}"; + + QueryLaningStrategy deserialized = mapper.readValue(json, QueryLaningStrategy.class); + Assert.assertTrue(deserialized instanceof WeightedQueryLaningStrategy); + + Object2IntMap limits = deserialized.getLaneLimits(100); + Assert.assertEquals(30, limits.getInt("low")); + Assert.assertEquals(10, limits.getInt("very-low")); + } + + private static WeightedQueryLaningStrategy newStrategy( + String periodThreshold, + String durationThreshold, + Integer segmentCountThreshold, + String segmentRangeThreshold + ) + { + return new WeightedQueryLaningStrategy( + periodThreshold, + durationThreshold, + segmentCountThreshold, + segmentRangeThreshold, + TWO_LANES + ); + } + + private static Set makeSegments(int count) + { + Set segments = new HashSet<>(); + for (int i = 0; i < count; i++) { + segments.add(new SegmentServerSelector( + new SegmentDescriptor(Intervals.of("2020-01-01/2020-01-02"), "v1", i) + )); + } + return segments; + } +} From 2aaf8f6ffcc7840c89efbf7b9eb3e9703451b4aa Mon Sep 17 00:00:00 2001 From: Maryam Shahid Date: Mon, 30 Mar 2026 14:36:02 -0700 Subject: [PATCH 2/8] improvements --- .../WeightedQueryLaningStrategy.java | 37 ++++++------ .../druid/server/QuerySchedulerTest.java | 12 ++-- .../WeightedQueryLaningStrategyTest.java | 59 ++++++++++++------- 3 files changed, 63 insertions(+), 45 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategy.java b/server/src/main/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategy.java index e21e117e5440..0e5bdb9cd67b 100644 --- a/server/src/main/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategy.java +++ b/server/src/main/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategy.java @@ -64,33 +64,24 @@ public class WeightedQueryLaningStrategy implements QueryLaningStrategy private final int segmentCountThreshold; @Nullable - private final Duration periodThreshold; + private final Period periodThreshold; @Nullable private final Duration durationThreshold; @Nullable private final Duration segmentRangeThreshold; + + @JsonProperty private final Map lanes; @JsonCreator public WeightedQueryLaningStrategy( - @JsonProperty("periodThreshold") @Nullable String periodThresholdString, - @JsonProperty("durationThreshold") @Nullable String durationThresholdString, + @JsonProperty("periodThreshold") @Nullable String periodThreshold, + @JsonProperty("durationThreshold") @Nullable String durationThreshold, @JsonProperty("segmentCountThreshold") @Nullable Integer segmentCountThreshold, - @JsonProperty("segmentRangeThreshold") @Nullable String segmentRangeThresholdString, + @JsonProperty("segmentRangeThreshold") @Nullable String segmentRangeThreshold, @JsonProperty("lanes") Map lanes ) { - this.segmentCountThreshold = segmentCountThreshold == null ? DEFAULT_SEGMENT_THRESHOLD : segmentCountThreshold; - this.periodThreshold = periodThresholdString == null - ? null - : new Period(periodThresholdString).toDurationFrom(DateTimes.nowUtc()); - this.durationThreshold = durationThresholdString == null - ? null - : new Period(durationThresholdString).toStandardDuration(); - this.segmentRangeThreshold = segmentRangeThresholdString == null - ? null - : new Period(segmentRangeThresholdString).toStandardDuration(); - Preconditions.checkArgument( segmentCountThreshold != null || periodThreshold != null || durationThreshold != null || segmentRangeThreshold != null, "At least one of periodThreshold, durationThreshold, segmentCountThreshold, or segmentRangeThreshold must be set" @@ -99,6 +90,15 @@ public WeightedQueryLaningStrategy( lanes != null && !lanes.isEmpty(), "At least one lane must be defined" ); + + this.segmentCountThreshold = segmentCountThreshold == null ? DEFAULT_SEGMENT_THRESHOLD : segmentCountThreshold; + this.periodThreshold = periodThreshold == null ? null : new Period(periodThreshold); + this.durationThreshold = durationThreshold == null + ? null + : new Period(durationThreshold).toStandardDuration(); + this.segmentRangeThreshold = segmentRangeThreshold == null + ? null + : new Period(segmentRangeThreshold).toStandardDuration(); this.lanes = lanes; } @@ -143,7 +143,8 @@ private int computeScore(Query query, Set segments int score = 0; if (periodThreshold != null) { - final DateTime cutoff = DateTimes.nowUtc().minus(periodThreshold); + final DateTime now = DateTimes.nowUtc(); + final DateTime cutoff = now.minus(periodThreshold.toDurationFrom(now)); if (query.getIntervals().stream().anyMatch(interval -> interval.getStart().isBefore(cutoff))) { score++; } @@ -174,9 +175,7 @@ private int computeScore(Query query, Set segments public static class LaneConfig { - @JsonProperty private final int minScore; - @JsonProperty private final int maxPercent; @JsonCreator @@ -194,11 +193,13 @@ public LaneConfig( this.maxPercent = maxPercent; } + @JsonProperty public int getMinScore() { return minScore; } + @JsonProperty public int getMaxPercent() { return maxPercent; diff --git a/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java b/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java index c0ca0942b21f..f27977930a20 100644 --- a/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java +++ b/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java @@ -80,7 +80,9 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -639,7 +641,7 @@ public void testWeightedLaneAssignment() null, 1, null, - ImmutableMap.of("low", new WeightedQueryLaningStrategy.LaneConfig(1, 40)) + Map.of("low", new WeightedQueryLaningStrategy.LaneConfig(1, 40)) ), SERVER_CONFIG_WITH_TOTAL ); @@ -647,7 +649,7 @@ public void testWeightedLaneAssignment() // Query with 2 segments exceeds segmentCountThreshold=1 → "low" lane Query query = weightedScheduler.prioritizeAndLaneQuery( QueryPlus.wrap(makeDefaultQuery()), - ImmutableSet.of( + Set.of( EasyMock.createMock(SegmentServerSelector.class), EasyMock.createMock(SegmentServerSelector.class) ) @@ -657,7 +659,7 @@ public void testWeightedLaneAssignment() // Query with 0 segments → no lane Query noLaneQuery = weightedScheduler.prioritizeAndLaneQuery( QueryPlus.wrap(makeDefaultQuery()), - ImmutableSet.of() + Set.of() ); Assert.assertNull(noLaneQuery.context().getLane()); } @@ -673,12 +675,12 @@ public void testWeightedFailsWhenOutOfLaneCapacity() null, 1, null, - ImmutableMap.of("low", new WeightedQueryLaningStrategy.LaneConfig(1, 40)) + Map.of("low", new WeightedQueryLaningStrategy.LaneConfig(1, 40)) ), SERVER_CONFIG_WITH_TOTAL ); - ImmutableSet manySegments = ImmutableSet.of( + Set manySegments = Set.of( EasyMock.createMock(SegmentServerSelector.class), EasyMock.createMock(SegmentServerSelector.class) ); diff --git a/server/src/test/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategyTest.java b/server/src/test/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategyTest.java index 8fd0741ce7df..74950bfeb590 100644 --- a/server/src/test/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategyTest.java +++ b/server/src/test/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategyTest.java @@ -20,9 +20,6 @@ package org.apache.druid.server.scheduling; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import it.unimi.dsi.fastutil.objects.Object2IntMap; import org.apache.druid.client.SegmentServerSelector; import org.apache.druid.java.util.common.Intervals; @@ -40,13 +37,14 @@ import org.junit.Test; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; public class WeightedQueryLaningStrategyTest { - private static final Map TWO_LANES = ImmutableMap.of( + private static final Map TWO_LANES = Map.of( "low", new WeightedQueryLaningStrategy.LaneConfig(1, 30), "very-low", new WeightedQueryLaningStrategy.LaneConfig(3, 10) ); @@ -58,7 +56,7 @@ public void setup() { queryBuilder = Druids.newTimeseriesQueryBuilder() .dataSource("test") - .intervals(ImmutableList.of(Intervals.of("2020-01-01/2020-01-02"))) + .intervals(List.of(Intervals.of("2020-01-01/2020-01-02"))) .granularity(Granularities.DAY) .aggregators(new CountAggregatorFactory("count")); } @@ -79,14 +77,14 @@ public void testComputeLane_noViolations() { WeightedQueryLaningStrategy strategy = newStrategy(null, null, 10000, null); TimeseriesQuery query = queryBuilder.build(); - Optional lane = strategy.computeLane(QueryPlus.wrap(query), ImmutableSet.of()); + Optional lane = strategy.computeLane(QueryPlus.wrap(query), Set.of()); Assert.assertFalse(lane.isPresent()); } @Test public void testComputeLane_oneViolation_segmentCount() { - // segmentCountThreshold=1, query has 5 segments → score 1 → "low" + // segmentCountThreshold=1, query has 5 segments → score=1 → "low" WeightedQueryLaningStrategy strategy = newStrategy(null, null, 1, null); TimeseriesQuery query = queryBuilder.build(); Set segments = makeSegments(5); @@ -96,14 +94,14 @@ public void testComputeLane_oneViolation_segmentCount() } @Test - public void testComputeLane_multipleViolations_higherLane() + public void testComputeLane_twoViolations_matchesLowerMinScore() { - // segmentCountThreshold=1 + durationThreshold very short → score >= 2 - // With a wide interval query, durationThreshold breached too + // segmentCountThreshold=1 + durationThreshold=PT1S → score=2 + // Matches "low" (minScore=1) but NOT "very-low" (minScore=3) WeightedQueryLaningStrategy strategy = new WeightedQueryLaningStrategy( null, - "PT1S", // 1 second duration threshold — query covers 1 day, will breach - 1, // segment count threshold — 5 segments will breach + "PT1S", + 1, null, TWO_LANES ); @@ -111,26 +109,24 @@ public void testComputeLane_multipleViolations_higherLane() Set segments = makeSegments(5); Optional lane = strategy.computeLane(QueryPlus.wrap(query), segments); Assert.assertTrue(lane.isPresent()); - // score=2, meets "low" (minScore=1) but not "very-low" (minScore=3) Assert.assertEquals("low", lane.get()); } @Test public void testComputeLane_allViolations_mostRestrictiveLane() { - // All 4 thresholds set very low — all will breach + // All 4 thresholds set very low → score=4 → meets "very-low" (minScore=3) WeightedQueryLaningStrategy strategy = new WeightedQueryLaningStrategy( - "PT1S", // period threshold — query interval in 2020 is far in the past - "PT1S", // duration threshold — 1 day query > 1 second - 1, // segment count threshold — 5 > 1 - "PT1S", // segment range threshold — will breach with segments spanning time + "PT1S", + "PT1S", + 1, + "PT1S", TWO_LANES ); TimeseriesQuery query = queryBuilder.build(); Set segments = makeSegments(5); Optional lane = strategy.computeLane(QueryPlus.wrap(query), segments); Assert.assertTrue(lane.isPresent()); - // score=4: all 4 thresholds breached → meets "very-low" (minScore=3) Assert.assertEquals("very-low", lane.get()); } @@ -139,13 +135,32 @@ public void testComputeLane_existingLane_preserved() { WeightedQueryLaningStrategy strategy = newStrategy(null, null, 10000, null); TimeseriesQuery query = queryBuilder - .context(ImmutableMap.of(QueryContexts.LANE_KEY, "custom")) + .context(Map.of(QueryContexts.LANE_KEY, "custom")) .build(); - Optional lane = strategy.computeLane(QueryPlus.wrap(query), ImmutableSet.of()); + Optional lane = strategy.computeLane(QueryPlus.wrap(query), Set.of()); Assert.assertTrue(lane.isPresent()); Assert.assertEquals("custom", lane.get()); } + @Test + public void testComputeLane_segmentRangeWithDifferentIntervals() + { + // Segments spanning different intervals to validate segmentRange summing + WeightedQueryLaningStrategy strategy = newStrategy(null, null, null, "PT1S"); + TimeseriesQuery query = queryBuilder.build(); + Set segments = new HashSet<>(); + segments.add(new SegmentServerSelector( + new SegmentDescriptor(Intervals.of("2020-01-01/2020-01-02"), "v1", 0) + )); + segments.add(new SegmentServerSelector( + new SegmentDescriptor(Intervals.of("2020-01-02/2020-01-03"), "v1", 1) + )); + // Total range = 1 day + 1 day = 2 days > 1 second → score=1 → "low" + Optional lane = strategy.computeLane(QueryPlus.wrap(query), segments); + Assert.assertTrue(lane.isPresent()); + Assert.assertEquals("low", lane.get()); + } + @Test public void testValidation_noThresholds() { @@ -165,7 +180,7 @@ public void testValidation_noLanes() null, 10, null, - ImmutableMap.of() + Map.of() ) ); } From c11ab56acb79534d0df0dd7b0d50d4e95c0540d4 Mon Sep 17 00:00:00 2001 From: Maryam Shahid Date: Fri, 15 May 2026 12:35:29 -0700 Subject: [PATCH 3/8] Reject reserved lane names --- .../WeightedQueryLaningStrategy.java | 9 ++++++ .../WeightedQueryLaningStrategyTest.java | 30 +++++++++++++++++++ 2 files changed, 39 insertions(+) diff --git a/server/src/main/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategy.java b/server/src/main/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategy.java index 0e5bdb9cd67b..b5f95c1de920 100644 --- a/server/src/main/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategy.java +++ b/server/src/main/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategy.java @@ -29,6 +29,7 @@ import org.apache.druid.query.Query; import org.apache.druid.query.QueryPlus; import org.apache.druid.server.QueryLaningStrategy; +import org.apache.druid.server.QueryScheduler; import org.joda.time.DateTime; import org.joda.time.Duration; import org.joda.time.Period; @@ -90,6 +91,14 @@ public WeightedQueryLaningStrategy( lanes != null && !lanes.isEmpty(), "At least one lane must be defined" ); + Preconditions.checkArgument( + lanes.keySet().stream().noneMatch(QueryScheduler.TOTAL::equals), + "Lane cannot be named 'total'" + ); + Preconditions.checkArgument( + lanes.keySet().stream().noneMatch("default"::equals), + "Lane cannot be named 'default'" + ); this.segmentCountThreshold = segmentCountThreshold == null ? DEFAULT_SEGMENT_THRESHOLD : segmentCountThreshold; this.periodThreshold = periodThreshold == null ? null : new Period(periodThreshold); diff --git a/server/src/test/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategyTest.java b/server/src/test/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategyTest.java index 74950bfeb590..87227ffc4cad 100644 --- a/server/src/test/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategyTest.java +++ b/server/src/test/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategyTest.java @@ -200,6 +200,36 @@ public void testValidation_nullLanes() ); } + @Test + public void testValidation_reservedLaneNameTotal() + { + Assert.assertThrows( + IllegalArgumentException.class, + () -> new WeightedQueryLaningStrategy( + null, + null, + 10, + null, + Map.of("total", new WeightedQueryLaningStrategy.LaneConfig(1, 30)) + ) + ); + } + + @Test + public void testValidation_reservedLaneNameDefault() + { + Assert.assertThrows( + IllegalArgumentException.class, + () -> new WeightedQueryLaningStrategy( + null, + null, + 10, + null, + Map.of("default", new WeightedQueryLaningStrategy.LaneConfig(1, 30)) + ) + ); + } + @Test public void testLaneConfig_invalidMinScore() { From f9d5cfcf8c63a262a0a4c9b0303b11b0e0b2b1da Mon Sep 17 00:00:00 2001 From: Maryam Shahid Date: Fri, 22 May 2026 10:11:14 -0700 Subject: [PATCH 4/8] comment updates --- .../WeightedQueryLaningStrategy.java | 37 ++++++++-- .../WeightedQueryLaningStrategyTest.java | 72 +++++++++++++++++++ 2 files changed, 102 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategy.java b/server/src/main/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategy.java index b5f95c1de920..91c18cea205b 100644 --- a/server/src/main/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategy.java +++ b/server/src/main/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategy.java @@ -61,13 +61,16 @@ */ public class WeightedQueryLaningStrategy implements QueryLaningStrategy { - private static final int DEFAULT_SEGMENT_THRESHOLD = Integer.MAX_VALUE; - - private final int segmentCountThreshold; + @JsonProperty + @Nullable + private final Integer segmentCountThreshold; + @JsonProperty @Nullable private final Period periodThreshold; + @JsonProperty @Nullable private final Duration durationThreshold; + @JsonProperty @Nullable private final Duration segmentRangeThreshold; @@ -87,20 +90,40 @@ public WeightedQueryLaningStrategy( segmentCountThreshold != null || periodThreshold != null || durationThreshold != null || segmentRangeThreshold != null, "At least one of periodThreshold, durationThreshold, segmentCountThreshold, or segmentRangeThreshold must be set" ); + Preconditions.checkArgument( + segmentCountThreshold == null || segmentCountThreshold > 0, + "segmentCountThreshold must be > 0, got [%s]", segmentCountThreshold + ); + if (durationThreshold != null) { + Duration dur = new Period(durationThreshold).toStandardDuration(); + Preconditions.checkArgument(dur.getMillis() > 0, "durationThreshold must be positive, got [%s]", durationThreshold); + } + if (segmentRangeThreshold != null) { + Duration dur = new Period(segmentRangeThreshold).toStandardDuration(); + Preconditions.checkArgument(dur.getMillis() > 0, "segmentRangeThreshold must be positive, got [%s]", segmentRangeThreshold); + } + if (periodThreshold != null) { + Period p = new Period(periodThreshold); + DateTime now = DateTimes.nowUtc(); + Preconditions.checkArgument( + now.minus(p.toDurationFrom(now)).isBefore(now), + "periodThreshold must be positive, got [%s]", periodThreshold + ); + } Preconditions.checkArgument( lanes != null && !lanes.isEmpty(), "At least one lane must be defined" ); Preconditions.checkArgument( - lanes.keySet().stream().noneMatch(QueryScheduler.TOTAL::equals), + !lanes.containsKey(QueryScheduler.TOTAL), "Lane cannot be named 'total'" ); Preconditions.checkArgument( - lanes.keySet().stream().noneMatch("default"::equals), + !lanes.containsKey("default"), "Lane cannot be named 'default'" ); - this.segmentCountThreshold = segmentCountThreshold == null ? DEFAULT_SEGMENT_THRESHOLD : segmentCountThreshold; + this.segmentCountThreshold = segmentCountThreshold; this.periodThreshold = periodThreshold == null ? null : new Period(periodThreshold); this.durationThreshold = durationThreshold == null ? null @@ -163,7 +186,7 @@ private int computeScore(Query query, Set segments score++; } - if (segments.size() > segmentCountThreshold) { + if (segmentCountThreshold != null && segments.size() > segmentCountThreshold) { score++; } diff --git a/server/src/test/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategyTest.java b/server/src/test/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategyTest.java index 87227ffc4cad..950d183ef342 100644 --- a/server/src/test/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategyTest.java +++ b/server/src/test/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategyTest.java @@ -286,6 +286,78 @@ private static WeightedQueryLaningStrategy newStrategy( ); } + @Test + public void testValidation_segmentCountThresholdZero() + { + Assert.assertThrows( + IllegalArgumentException.class, + () -> new WeightedQueryLaningStrategy(null, null, 0, null, TWO_LANES) + ); + } + + @Test + public void testValidation_segmentCountThresholdNegative() + { + Assert.assertThrows( + IllegalArgumentException.class, + () -> new WeightedQueryLaningStrategy(null, null, -1, null, TWO_LANES) + ); + } + + @Test + public void testValidation_durationThresholdZero() + { + Assert.assertThrows( + IllegalArgumentException.class, + () -> new WeightedQueryLaningStrategy(null, "PT0S", null, null, TWO_LANES) + ); + } + + @Test + public void testValidation_durationThresholdNegative() + { + Assert.assertThrows( + IllegalArgumentException.class, + () -> new WeightedQueryLaningStrategy(null, "-PT1S", null, null, TWO_LANES) + ); + } + + @Test + public void testValidation_segmentRangeThresholdZero() + { + Assert.assertThrows( + IllegalArgumentException.class, + () -> new WeightedQueryLaningStrategy(null, null, null, "PT0S", TWO_LANES) + ); + } + + @Test + public void testValidation_segmentRangeThresholdNegative() + { + Assert.assertThrows( + IllegalArgumentException.class, + () -> new WeightedQueryLaningStrategy(null, null, null, "-PT1S", TWO_LANES) + ); + } + + @Test + public void testValidation_periodThresholdZero() + { + Assert.assertThrows( + IllegalArgumentException.class, + () -> new WeightedQueryLaningStrategy("PT0S", null, null, null, TWO_LANES) + ); + } + + @Test + public void testValidation_periodThresholdNegative() + { + Assert.assertThrows( + IllegalArgumentException.class, + () -> new WeightedQueryLaningStrategy("-PT1S", null, null, null, TWO_LANES) + ); + } + private static Set makeSegments(int count) { Set segments = new HashSet<>(); From 4a5d7e2673eafb7ac72ba604acc2a8c1c2fa008e Mon Sep 17 00:00:00 2001 From: Maryam Shahid Date: Thu, 28 May 2026 14:36:39 -0700 Subject: [PATCH 5/8] updates from LLM review --- .../WeightedQueryLaningStrategy.java | 106 +++++++++++++----- 1 file changed, 81 insertions(+), 25 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategy.java b/server/src/main/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategy.java index 91c18cea205b..d43bb8ecb94b 100644 --- a/server/src/main/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategy.java +++ b/server/src/main/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategy.java @@ -64,13 +64,25 @@ public class WeightedQueryLaningStrategy implements QueryLaningStrategy @JsonProperty @Nullable private final Integer segmentCountThreshold; - @JsonProperty + // Stored as the original config strings so that Jackson round-trip serde is consistent: + // the @JsonCreator constructor accepts String parameters, so serialization must also emit + // strings. Storing Period/Duration objects directly with @JsonProperty would cause Jackson + // to serialize them as complex JSON objects that the constructor cannot deserialize back. + @JsonProperty("periodThreshold") + @Nullable + private final String periodThresholdString; + @JsonProperty("durationThreshold") + @Nullable + private final String durationThresholdString; + @JsonProperty("segmentRangeThreshold") + @Nullable + private final String segmentRangeThresholdString; + + // Parsed from the string fields above at construction time; not serialized. @Nullable private final Period periodThreshold; - @JsonProperty @Nullable private final Duration durationThreshold; - @JsonProperty @Nullable private final Duration segmentRangeThreshold; @@ -79,35 +91,72 @@ public class WeightedQueryLaningStrategy implements QueryLaningStrategy @JsonCreator public WeightedQueryLaningStrategy( - @JsonProperty("periodThreshold") @Nullable String periodThreshold, - @JsonProperty("durationThreshold") @Nullable String durationThreshold, + @JsonProperty("periodThreshold") @Nullable String periodThresholdString, + @JsonProperty("durationThreshold") @Nullable String durationThresholdString, @JsonProperty("segmentCountThreshold") @Nullable Integer segmentCountThreshold, - @JsonProperty("segmentRangeThreshold") @Nullable String segmentRangeThreshold, + @JsonProperty("segmentRangeThreshold") @Nullable String segmentRangeThresholdString, @JsonProperty("lanes") Map lanes ) { + final Period parsedPeriod; + if (periodThresholdString == null) { + parsedPeriod = null; + } else { + try { + parsedPeriod = new Period(periodThresholdString); + } + catch (IllegalArgumentException e) { + throw new IllegalArgumentException( + "periodThreshold is not a valid ISO 8601 period, got [" + periodThresholdString + "]" + ); + } + } + final Duration parsedDuration; + if (durationThresholdString == null) { + parsedDuration = null; + } else { + try { + parsedDuration = new Period(durationThresholdString).toStandardDuration(); + } + catch (UnsupportedOperationException e) { + throw new IllegalArgumentException( + "durationThreshold must not contain month or year components, got [" + durationThresholdString + "]" + ); + } + } + final Duration parsedSegmentRange; + if (segmentRangeThresholdString == null) { + parsedSegmentRange = null; + } else { + try { + parsedSegmentRange = new Period(segmentRangeThresholdString).toStandardDuration(); + } + catch (UnsupportedOperationException e) { + throw new IllegalArgumentException( + "segmentRangeThreshold must not contain month or year components, got [" + segmentRangeThresholdString + "]" + ); + } + } + Preconditions.checkArgument( - segmentCountThreshold != null || periodThreshold != null || durationThreshold != null || segmentRangeThreshold != null, + segmentCountThreshold != null || parsedPeriod != null || parsedDuration != null || parsedSegmentRange != null, "At least one of periodThreshold, durationThreshold, segmentCountThreshold, or segmentRangeThreshold must be set" ); Preconditions.checkArgument( segmentCountThreshold == null || segmentCountThreshold > 0, "segmentCountThreshold must be > 0, got [%s]", segmentCountThreshold ); - if (durationThreshold != null) { - Duration dur = new Period(durationThreshold).toStandardDuration(); - Preconditions.checkArgument(dur.getMillis() > 0, "durationThreshold must be positive, got [%s]", durationThreshold); + if (parsedDuration != null) { + Preconditions.checkArgument(parsedDuration.getMillis() > 0, "durationThreshold must be positive, got [%s]", durationThresholdString); } - if (segmentRangeThreshold != null) { - Duration dur = new Period(segmentRangeThreshold).toStandardDuration(); - Preconditions.checkArgument(dur.getMillis() > 0, "segmentRangeThreshold must be positive, got [%s]", segmentRangeThreshold); + if (parsedSegmentRange != null) { + Preconditions.checkArgument(parsedSegmentRange.getMillis() > 0, "segmentRangeThreshold must be positive, got [%s]", segmentRangeThresholdString); } - if (periodThreshold != null) { - Period p = new Period(periodThreshold); + if (parsedPeriod != null) { DateTime now = DateTimes.nowUtc(); Preconditions.checkArgument( - now.minus(p.toDurationFrom(now)).isBefore(now), - "periodThreshold must be positive, got [%s]", periodThreshold + now.minus(parsedPeriod.toDurationFrom(now)).isBefore(now), + "periodThreshold must be positive, got [%s]", periodThresholdString ); } Preconditions.checkArgument( @@ -122,15 +171,22 @@ public WeightedQueryLaningStrategy( !lanes.containsKey("default"), "Lane cannot be named 'default'" ); + long distinctScores = lanes.values().stream().mapToInt(LaneConfig::getMinScore).distinct().count(); + Preconditions.checkArgument( + distinctScores == lanes.size(), + "Each lane must have a unique minScore so that lane selection is deterministic " + + "(a query is assigned to the lane with the highest minScore it meets; equal scores " + + "produce non-deterministic results). Found duplicate minScore values in lanes: [%s]", + lanes + ); this.segmentCountThreshold = segmentCountThreshold; - this.periodThreshold = periodThreshold == null ? null : new Period(periodThreshold); - this.durationThreshold = durationThreshold == null - ? null - : new Period(durationThreshold).toStandardDuration(); - this.segmentRangeThreshold = segmentRangeThreshold == null - ? null - : new Period(segmentRangeThreshold).toStandardDuration(); + this.periodThresholdString = periodThresholdString; + this.durationThresholdString = durationThresholdString; + this.segmentRangeThresholdString = segmentRangeThresholdString; + this.periodThreshold = parsedPeriod; + this.durationThreshold = parsedDuration; + this.segmentRangeThreshold = parsedSegmentRange; this.lanes = lanes; } @@ -197,7 +253,7 @@ private int computeScore(Query query, Set segments .distinct() .mapToLong(AbstractInterval::toDurationMillis) .sum(); - if (new Duration(segmentRangeMs).isLongerThan(segmentRangeThreshold)) { + if (segmentRangeMs > segmentRangeThreshold.getMillis()) { score++; } } From 54960eb094e632fcda02470c361054e1e373d22c Mon Sep 17 00:00:00 2001 From: Maryam Shahid Date: Thu, 2 Jul 2026 11:47:34 -0700 Subject: [PATCH 6/8] updates --- docs/configuration/index.md | 33 +++++++ .../apache/druid/server/QueryScheduler.java | 10 +- .../scheduling/ManualQueryLaningStrategy.java | 2 +- .../WeightedQueryLaningStrategy.java | 94 ++++++++++--------- .../druid/server/QuerySchedulerTest.java | 2 +- .../WeightedQueryLaningStrategyTest.java | 36 +++++-- 6 files changed, 121 insertions(+), 56 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 851db47e3a61..dc742e6d3d7e 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1770,6 +1770,39 @@ This laning strategy is best suited for cases where one or more external applica |`druid.query.scheduler.laning.lanes.{name}`|Maximum percent or exact limit of queries that can concurrently run in the defined lanes. Any number of lanes may be defined like this. The lane names 'total' and 'default' are reserved for internal use.|No default, must define at least one lane with a limit above 0. If `druid.query.scheduler.laning.isLimitPercent` is set to `true`, values must be integers in the range of 1 to 100.| |`druid.query.scheduler.laning.isLimitPercent`|If set to `true`, the values set for `druid.query.scheduler.laning.lanes` will be treated as a percent of the smaller number of `druid.server.http.numThreads` or `druid.query.scheduler.numThreads`. Note that in this mode, these lane values across lanes are _not_ required to add up to, and can exceed, 100%.|`false`| +###### Weighted laning strategy + +This laning strategy assigns a cost to each query based on how many of a configurable set of thresholds it breaches, then routes the query to the most restrictive lane whose `minCost` the query meets. It is a more granular alternative to the 'High/Low' strategy: rather than a binary split, a query that breaches one threshold can be laned differently than one that breaches several. The thresholds are the same ones used by the [threshold prioritization strategy](#threshold-prioritization-strategy) (data age, query interval duration, segment count, and total segment range). Each threshold a query breaches adds 1 to its cost. A query with cost 0 is not assigned a lane and runs in the interactive (default) pool. + +If a lane is specified in the [query context](../querying/query-context-reference.md) `lane` parameter, this will override the computed lane. + +This strategy can be enabled by setting `druid.query.scheduler.laning.strategy=weighted`. + +|Property|Description|Default| +|--------|-----------|-------| +|`druid.query.scheduler.laning.periodThreshold`|ISO 8601 period. A query is charged 1 if any of its intervals starts before `now - periodThreshold` (that is, it reads data older than this).|null (not evaluated)| +|`druid.query.scheduler.laning.durationThreshold`|ISO 8601 duration (must not contain month or year components). A query is charged 1 if its total interval duration exceeds this.|null (not evaluated)| +|`druid.query.scheduler.laning.segmentCountThreshold`|A query is charged 1 if the number of segments it involves exceeds this. Must be greater than 0.|null (not evaluated)| +|`druid.query.scheduler.laning.segmentRangeThreshold`|ISO 8601 duration (must not contain month or year components). A query is charged 1 if the summed time range of its distinct segments exceeds this.|null (not evaluated)| +|`druid.query.scheduler.laning.lanes.{name}.minCost`|Minimum query cost required to be assigned to this lane. A query is assigned to the lane with the highest `minCost` it meets. Each lane must have a unique `minCost` so that lane selection is deterministic. Must be greater than 0. The lane names 'total' and 'default' are reserved for internal use.|No default, must define at least one lane| +|`druid.query.scheduler.laning.lanes.{name}.maxPercent`|Maximum percent of the smaller number of `druid.server.http.numThreads` or `druid.query.scheduler.numThreads` that queries in this lane may use concurrently. Must be an integer in the range 1 to 100.|No default, must be set for each lane| + +At least one of `periodThreshold`, `durationThreshold`, `segmentCountThreshold`, or `segmentRangeThreshold` must be set. For example, the following configuration routes queries breaching 1 or 2 thresholds to a `low` lane capped at 30% capacity, and queries breaching 3 or 4 thresholds to a `very-low` lane capped at 10%: + +```json +{ + "strategy": "weighted", + "periodThreshold": "P1M", + "durationThreshold": "P1D", + "segmentCountThreshold": 1000, + "segmentRangeThreshold": "P180D", + "lanes": { + "low": { "minCost": 1, "maxPercent": 30 }, + "very-low": { "minCost": 3, "maxPercent": 10 } + } +} +``` + ##### Server configuration Druid uses Jetty to serve HTTP requests. Each query being processed consumes a single thread from `druid.server.http.numThreads`, so consider defining `druid.query.scheduler.numThreads` to a lower value in order to reserve HTTP threads for responding to health checks, lookup loading, and other non-query, (in most cases) comparatively very short-lived, HTTP requests. diff --git a/server/src/main/java/org/apache/druid/server/QueryScheduler.java b/server/src/main/java/org/apache/druid/server/QueryScheduler.java index 489af47e869e..ad43d5148906 100644 --- a/server/src/main/java/org/apache/druid/server/QueryScheduler.java +++ b/server/src/main/java/org/apache/druid/server/QueryScheduler.java @@ -66,6 +66,12 @@ public class QueryScheduler implements QueryWatcher private static final Logger LOGGER = new Logger(QueryScheduler.class); public static final int UNAVAILABLE = -1; public static final String TOTAL = "total"; + /** + * Lane name used for queries that are not assigned to any explicit lane (interactive pool). It has special meaning + * for the resilience4j bulkhead used to enforce lane limits, and is also emitted as the {@code lane} metric dimension + * for un-laned queries. It is therefore reserved and cannot be used as a configured lane name. + */ + public static final String DEFAULT = "default"; private final int totalCapacity; private final QueryPrioritizationStrategy prioritizationStrategy; private final QueryLaningStrategy laningStrategy; @@ -165,12 +171,12 @@ public Query prioritizeAndLaneQuery(QueryPlus queryPlus, Set lane = laningStrategy.computeLane(queryPlus.withQuery(query), segments); LOGGER.debug( "[%s] lane assigned to [%s] query with [%,d] priority", - lane.orElse("default"), + lane.orElse(DEFAULT), query.getType(), priority.orElse(0) ); final ServiceMetricEvent.Builder builderUsr = ServiceMetricEvent.builder().setFeed("metrics") - .setDimension("lane", lane.orElse("default")) + .setDimension("lane", lane.orElse(DEFAULT)) .setDimension("dataSource", query.getDataSource().getTableNames()) .setDimension("type", query.getType()); emitter.emit(builderUsr.setMetric("query/priority", priority.orElse(Integer.valueOf(0)))); diff --git a/server/src/main/java/org/apache/druid/server/scheduling/ManualQueryLaningStrategy.java b/server/src/main/java/org/apache/druid/server/scheduling/ManualQueryLaningStrategy.java index 0f17adfae785..146a07e50387 100644 --- a/server/src/main/java/org/apache/druid/server/scheduling/ManualQueryLaningStrategy.java +++ b/server/src/main/java/org/apache/druid/server/scheduling/ManualQueryLaningStrategy.java @@ -64,7 +64,7 @@ public ManualQueryLaningStrategy( // 'default' has special meaning for resilience4j bulkhead used by query scheduler, this restriction // can potentially be relaxed if we ever change enforcement mechanism Preconditions.checkArgument( - lanes.keySet().stream().noneMatch("default"::equals), + lanes.keySet().stream().noneMatch(QueryScheduler.DEFAULT::equals), "Lane cannot be named 'default'" ); } diff --git a/server/src/main/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategy.java b/server/src/main/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategy.java index d43bb8ecb94b..b3f075981c72 100644 --- a/server/src/main/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategy.java +++ b/server/src/main/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategy.java @@ -32,19 +32,21 @@ import org.apache.druid.server.QueryScheduler; import org.joda.time.DateTime; import org.joda.time.Duration; +import org.joda.time.Interval; import org.joda.time.Period; import org.joda.time.base.AbstractInterval; import javax.annotation.Nullable; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; /** - * Query laning strategy that scores queries by how many configured thresholds they breach, - * then assigns them to the most restrictive matching lane. This provides more nuanced lane - * assignment than {@link HiLoQueryLaningStrategy}, which uses a binary high/low split. + * Query laning strategy that assigns a cost to queries based on how many configured thresholds + * they breach, then assigns them to the most restrictive matching lane. This provides more nuanced + * lane assignment than {@link HiLoQueryLaningStrategy}, which uses a binary high/low split. * *

Configuration example: *

{@code
@@ -53,8 +55,8 @@
  *   "periodThreshold": "P1M",
  *   "segmentCountThreshold": 1000,
  *   "lanes": {
- *     "low": { "minScore": 1, "maxPercent": 30 },
- *     "very-low": { "minScore": 3, "maxPercent": 10 }
+ *     "low": { "minCost": 1, "maxPercent": 30 },
+ *     "very-low": { "minCost": 3, "maxPercent": 10 }
  *   }
  * }
  * }
@@ -118,9 +120,10 @@ public WeightedQueryLaningStrategy( try { parsedDuration = new Period(durationThresholdString).toStandardDuration(); } - catch (UnsupportedOperationException e) { + catch (IllegalArgumentException | UnsupportedOperationException e) { throw new IllegalArgumentException( - "durationThreshold must not contain month or year components, got [" + durationThresholdString + "]" + "durationThreshold is not a valid ISO 8601 duration, " + + "got [" + durationThresholdString + "]" ); } } @@ -131,9 +134,10 @@ public WeightedQueryLaningStrategy( try { parsedSegmentRange = new Period(segmentRangeThresholdString).toStandardDuration(); } - catch (UnsupportedOperationException e) { + catch (IllegalArgumentException | UnsupportedOperationException e) { throw new IllegalArgumentException( - "segmentRangeThreshold must not contain month or year components, got [" + segmentRangeThresholdString + "]" + "segmentRangeThreshold is not a valid ISO 8601 duration, " + + "got [" + segmentRangeThresholdString + "]" ); } } @@ -165,18 +169,18 @@ public WeightedQueryLaningStrategy( ); Preconditions.checkArgument( !lanes.containsKey(QueryScheduler.TOTAL), - "Lane cannot be named 'total'" + "Lane cannot be named '%s'", QueryScheduler.TOTAL ); Preconditions.checkArgument( - !lanes.containsKey("default"), - "Lane cannot be named 'default'" + !lanes.containsKey(QueryScheduler.DEFAULT), + "Lane cannot be named '%s'", QueryScheduler.DEFAULT ); - long distinctScores = lanes.values().stream().mapToInt(LaneConfig::getMinScore).distinct().count(); + long distinctCosts = lanes.values().stream().mapToInt(LaneConfig::getMinCost).distinct().count(); Preconditions.checkArgument( - distinctScores == lanes.size(), - "Each lane must have a unique minScore so that lane selection is deterministic " - + "(a query is assigned to the lane with the highest minScore it meets; equal scores " - + "produce non-deterministic results). Found duplicate minScore values in lanes: [%s]", + distinctCosts == lanes.size(), + "Each lane must have a unique minCost so that lane selection is deterministic " + + "(a query is assigned to the lane with the highest minCost it meets; equal costs " + + "produce non-deterministic results). Found duplicate minCost values in lanes: [%s]", lanes ); @@ -208,42 +212,46 @@ public Optional computeLane(QueryPlus query, Set entry : lanes.entrySet()) { - int minScore = entry.getValue().minScore; - if (score >= minScore && minScore > bestMinScore) { - bestLane = entry.getKey(); - bestMinScore = minScore; + int minCost = entry.getValue().minCost; + if (cost >= minCost && minCost > highestMinCost) { + highestLane = entry.getKey(); + highestMinCost = minCost; } } - return Optional.ofNullable(bestLane); + return Optional.ofNullable(highestLane); } - private int computeScore(Query query, Set segments) + private int computeCost(Query query, Set segments) { - int score = 0; + int cost = 0; if (periodThreshold != null) { final DateTime now = DateTimes.nowUtc(); final DateTime cutoff = now.minus(periodThreshold.toDurationFrom(now)); - if (query.getIntervals().stream().anyMatch(interval -> interval.getStart().isBefore(cutoff))) { - score++; + // Query intervals are condensed and sorted ascending by start (see JodaUtils.condenseIntervals, applied by + // every QuerySegmentSpec), so the earliest start is the first interval. Checking only it avoids scanning a + // query with hundreds of intervals. + final List intervals = query.getIntervals(); + if (!intervals.isEmpty() && intervals.get(0).getStart().isBefore(cutoff)) { + cost++; } } if (durationThreshold != null && query.getDuration().isLongerThan(durationThreshold)) { - score++; + cost++; } if (segmentCountThreshold != null && segments.size() > segmentCountThreshold) { - score++; + cost++; } if (segmentRangeThreshold != null) { @@ -254,37 +262,37 @@ private int computeScore(Query query, Set segments .mapToLong(AbstractInterval::toDurationMillis) .sum(); if (segmentRangeMs > segmentRangeThreshold.getMillis()) { - score++; + cost++; } } - return score; + return cost; } public static class LaneConfig { - private final int minScore; + private final int minCost; private final int maxPercent; @JsonCreator public LaneConfig( - @JsonProperty("minScore") int minScore, + @JsonProperty("minCost") int minCost, @JsonProperty("maxPercent") int maxPercent ) { - Preconditions.checkArgument(minScore > 0, "minScore must be > 0, got [%s]", minScore); + Preconditions.checkArgument(minCost > 0, "minCost must be > 0, got [%s]", minCost); Preconditions.checkArgument( maxPercent > 0 && maxPercent <= 100, "maxPercent must be in the range 1 to 100, got [%s]", maxPercent ); - this.minScore = minScore; + this.minCost = minCost; this.maxPercent = maxPercent; } @JsonProperty - public int getMinScore() + public int getMinCost() { - return minScore; + return minCost; } @JsonProperty @@ -303,13 +311,13 @@ public boolean equals(Object o) return false; } LaneConfig that = (LaneConfig) o; - return minScore == that.minScore && maxPercent == that.maxPercent; + return minCost == that.minCost && maxPercent == that.maxPercent; } @Override public int hashCode() { - return Objects.hash(minScore, maxPercent); + return Objects.hash(minCost, maxPercent); } } } diff --git a/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java b/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java index f27977930a20..9d584dccc7ce 100644 --- a/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java +++ b/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java @@ -622,7 +622,7 @@ public void testConfigWeighted() properties.setProperty(propertyPrefix + ".numThreads", "10"); properties.setProperty(propertyPrefix + ".laning.strategy", "weighted"); properties.setProperty(propertyPrefix + ".laning.segmentCountThreshold", "1"); - properties.setProperty(propertyPrefix + ".laning.lanes", "{\"low\": {\"minScore\": 1, \"maxPercent\": 30}}"); + properties.setProperty(propertyPrefix + ".laning.lanes", "{\"low\": {\"minCost\": 1, \"maxPercent\": 30}}"); provider.inject(properties, injector.getInstance(JsonConfigurator.class)); final QueryScheduler scheduler = provider.get().get(); Assert.assertEquals(10, scheduler.getTotalAvailableCapacity()); diff --git a/server/src/test/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategyTest.java b/server/src/test/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategyTest.java index 950d183ef342..65eeb50d4da7 100644 --- a/server/src/test/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategyTest.java +++ b/server/src/test/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategyTest.java @@ -84,7 +84,7 @@ public void testComputeLane_noViolations() @Test public void testComputeLane_oneViolation_segmentCount() { - // segmentCountThreshold=1, query has 5 segments → score=1 → "low" + // segmentCountThreshold=1, query has 5 segments → cost=1 → "low" WeightedQueryLaningStrategy strategy = newStrategy(null, null, 1, null); TimeseriesQuery query = queryBuilder.build(); Set segments = makeSegments(5); @@ -94,10 +94,10 @@ public void testComputeLane_oneViolation_segmentCount() } @Test - public void testComputeLane_twoViolations_matchesLowerMinScore() + public void testComputeLane_twoViolations_matchesLowerMinCost() { - // segmentCountThreshold=1 + durationThreshold=PT1S → score=2 - // Matches "low" (minScore=1) but NOT "very-low" (minScore=3) + // segmentCountThreshold=1 + durationThreshold=PT1S → cost=2 + // Matches "low" (minCost=1) but NOT "very-low" (minCost=3) WeightedQueryLaningStrategy strategy = new WeightedQueryLaningStrategy( null, "PT1S", @@ -115,7 +115,7 @@ public void testComputeLane_twoViolations_matchesLowerMinScore() @Test public void testComputeLane_allViolations_mostRestrictiveLane() { - // All 4 thresholds set very low → score=4 → meets "very-low" (minScore=3) + // All 4 thresholds set very low → cost=4 → meets "very-low" (minCost=3) WeightedQueryLaningStrategy strategy = new WeightedQueryLaningStrategy( "PT1S", "PT1S", @@ -155,12 +155,30 @@ public void testComputeLane_segmentRangeWithDifferentIntervals() segments.add(new SegmentServerSelector( new SegmentDescriptor(Intervals.of("2020-01-02/2020-01-03"), "v1", 1) )); - // Total range = 1 day + 1 day = 2 days > 1 second → score=1 → "low" + // Total range = 1 day + 1 day = 2 days > 1 second → cost=1 → "low" Optional lane = strategy.computeLane(QueryPlus.wrap(query), segments); Assert.assertTrue(lane.isPresent()); Assert.assertEquals("low", lane.get()); } + @Test + public void testComputeLane_periodThreshold_usesEarliestInterval() + { + // periodThreshold=P1D → cutoff is ~1 day ago. Provide multiple intervals in unsorted order where only the + // earliest (old) interval is before the cutoff. getIntervals() is condensed/sorted ascending by start, so the + // period check inspects the earliest interval and must charge cost even though later intervals are recent. + WeightedQueryLaningStrategy strategy = newStrategy("P1D", null, null, null); + TimeseriesQuery query = queryBuilder + .intervals(List.of( + Intervals.of("2038-01-01/2038-01-02"), + Intervals.of("2000-01-01/2000-01-02") + )) + .build(); + Optional lane = strategy.computeLane(QueryPlus.wrap(query), Set.of()); + Assert.assertTrue(lane.isPresent()); + Assert.assertEquals("low", lane.get()); + } + @Test public void testValidation_noThresholds() { @@ -231,7 +249,7 @@ public void testValidation_reservedLaneNameDefault() } @Test - public void testLaneConfig_invalidMinScore() + public void testLaneConfig_invalidMinCost() { Assert.assertThrows( IllegalArgumentException.class, @@ -257,8 +275,8 @@ public void testSerde() throws Exception + " \"segmentCountThreshold\": 1000,\n" + " \"durationThreshold\": \"P1D\",\n" + " \"lanes\": {\n" - + " \"low\": { \"minScore\": 1, \"maxPercent\": 30 },\n" - + " \"very-low\": { \"minScore\": 3, \"maxPercent\": 10 }\n" + + " \"low\": { \"minCost\": 1, \"maxPercent\": 30 },\n" + + " \"very-low\": { \"minCost\": 3, \"maxPercent\": 10 }\n" + " }\n" + "}"; From 34d789aef7cdb311ed2556df9d32160e1422525b Mon Sep 17 00:00:00 2001 From: Maryam Shahid Date: Thu, 2 Jul 2026 13:32:22 -0700 Subject: [PATCH 7/8] spelling --- docs/configuration/index.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index dc742e6d3d7e..8c82c2bb4d0f 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1772,7 +1772,7 @@ This laning strategy is best suited for cases where one or more external applica ###### Weighted laning strategy -This laning strategy assigns a cost to each query based on how many of a configurable set of thresholds it breaches, then routes the query to the most restrictive lane whose `minCost` the query meets. It is a more granular alternative to the 'High/Low' strategy: rather than a binary split, a query that breaches one threshold can be laned differently than one that breaches several. The thresholds are the same ones used by the [threshold prioritization strategy](#threshold-prioritization-strategy) (data age, query interval duration, segment count, and total segment range). Each threshold a query breaches adds 1 to its cost. A query with cost 0 is not assigned a lane and runs in the interactive (default) pool. +This laning strategy assigns a cost to each query based on how many of a configurable set of thresholds it breaches, then routes the query to the most restrictive lane whose `minCost` the query meets. It is a more granular alternative to the 'High/Low' strategy: rather than a binary split, a query that breaches one threshold can be assigned to a different lane than one that breaches several. The thresholds are the same ones used by the [threshold prioritization strategy](#threshold-prioritization-strategy) (data age, query interval duration, segment count, and total segment range). Each threshold a query breaches adds 1 to its cost. A query with cost 0 is not assigned a lane and runs in the interactive (default) pool. If a lane is specified in the [query context](../querying/query-context-reference.md) `lane` parameter, this will override the computed lane. From 205c3eb92a94b91aba09008fc86bf1ce56461954 Mon Sep 17 00:00:00 2001 From: Maryam Shahid Date: Fri, 3 Jul 2026 12:02:56 -0700 Subject: [PATCH 8/8] docs: document weighted laning lanes as a single map --- docs/configuration/index.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 8c82c2bb4d0f..da8a697f98b4 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1784,8 +1784,7 @@ This strategy can be enabled by setting `druid.query.scheduler.laning.strategy=w |`druid.query.scheduler.laning.durationThreshold`|ISO 8601 duration (must not contain month or year components). A query is charged 1 if its total interval duration exceeds this.|null (not evaluated)| |`druid.query.scheduler.laning.segmentCountThreshold`|A query is charged 1 if the number of segments it involves exceeds this. Must be greater than 0.|null (not evaluated)| |`druid.query.scheduler.laning.segmentRangeThreshold`|ISO 8601 duration (must not contain month or year components). A query is charged 1 if the summed time range of its distinct segments exceeds this.|null (not evaluated)| -|`druid.query.scheduler.laning.lanes.{name}.minCost`|Minimum query cost required to be assigned to this lane. A query is assigned to the lane with the highest `minCost` it meets. Each lane must have a unique `minCost` so that lane selection is deterministic. Must be greater than 0. The lane names 'total' and 'default' are reserved for internal use.|No default, must define at least one lane| -|`druid.query.scheduler.laning.lanes.{name}.maxPercent`|Maximum percent of the smaller number of `druid.server.http.numThreads` or `druid.query.scheduler.numThreads` that queries in this lane may use concurrently. Must be an integer in the range 1 to 100.|No default, must be set for each lane| +|`druid.query.scheduler.laning.lanes`|A map of lane name to its configuration. At least one lane must be defined. Each lane's configuration has two fields: `minCost`, the minimum query cost required to be assigned to the lane (a query is assigned to the lane with the highest `minCost` it meets; must be greater than 0 and unique across lanes so lane selection is deterministic), and `maxPercent`, the maximum percent of the smaller number of `druid.server.http.numThreads` or `druid.query.scheduler.numThreads` that queries in the lane may use concurrently (an integer in the range 1 to 100). The lane names 'total' and 'default' are reserved for internal use.|No default, must define at least one lane| At least one of `periodThreshold`, `durationThreshold`, `segmentCountThreshold`, or `segmentRangeThreshold` must be set. For example, the following configuration routes queries breaching 1 or 2 thresholds to a `low` lane capped at 30% capacity, and queries breaching 3 or 4 thresholds to a `very-low` lane capped at 10%: