diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 851db47e3a61..da8a697f98b4 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1770,6 +1770,38 @@ This laning strategy is best suited for cases where one or more external applica |`druid.query.scheduler.laning.lanes.{name}`|Maximum percent or exact limit of queries that can concurrently run in the defined lanes. Any number of lanes may be defined like this. The lane names 'total' and 'default' are reserved for internal use.|No default, must define at least one lane with a limit above 0. If `druid.query.scheduler.laning.isLimitPercent` is set to `true`, values must be integers in the range of 1 to 100.| |`druid.query.scheduler.laning.isLimitPercent`|If set to `true`, the values set for `druid.query.scheduler.laning.lanes` will be treated as a percent of the smaller number of `druid.server.http.numThreads` or `druid.query.scheduler.numThreads`. Note that in this mode, these lane values across lanes are _not_ required to add up to, and can exceed, 100%.|`false`| +###### Weighted laning strategy + +This laning strategy assigns a cost to each query based on how many of a configurable set of thresholds it breaches, then routes the query to the most restrictive lane whose `minCost` the query meets. It is a more granular alternative to the 'High/Low' strategy: rather than a binary split, a query that breaches one threshold can be assigned to a different lane than one that breaches several. The thresholds are the same ones used by the [threshold prioritization strategy](#threshold-prioritization-strategy) (data age, query interval duration, segment count, and total segment range). Each threshold a query breaches adds 1 to its cost. A query with cost 0 is not assigned a lane and runs in the interactive (default) pool. + +If a lane is specified in the [query context](../querying/query-context-reference.md) `lane` parameter, this will override the computed lane. + +This strategy can be enabled by setting `druid.query.scheduler.laning.strategy=weighted`. + +|Property|Description|Default| +|--------|-----------|-------| +|`druid.query.scheduler.laning.periodThreshold`|ISO 8601 period. A query is charged 1 if any of its intervals starts before `now - periodThreshold` (that is, it reads data older than this).|null (not evaluated)| +|`druid.query.scheduler.laning.durationThreshold`|ISO 8601 duration (must not contain month or year components). A query is charged 1 if its total interval duration exceeds this.|null (not evaluated)| +|`druid.query.scheduler.laning.segmentCountThreshold`|A query is charged 1 if the number of segments it involves exceeds this. Must be greater than 0.|null (not evaluated)| +|`druid.query.scheduler.laning.segmentRangeThreshold`|ISO 8601 duration (must not contain month or year components). A query is charged 1 if the summed time range of its distinct segments exceeds this.|null (not evaluated)| +|`druid.query.scheduler.laning.lanes`|A map of lane name to its configuration. At least one lane must be defined. Each lane's configuration has two fields: `minCost`, the minimum query cost required to be assigned to the lane (a query is assigned to the lane with the highest `minCost` it meets; must be greater than 0 and unique across lanes so lane selection is deterministic), and `maxPercent`, the maximum percent of the smaller number of `druid.server.http.numThreads` or `druid.query.scheduler.numThreads` that queries in the lane may use concurrently (an integer in the range 1 to 100). The lane names 'total' and 'default' are reserved for internal use.|No default, must define at least one lane| + +At least one of `periodThreshold`, `durationThreshold`, `segmentCountThreshold`, or `segmentRangeThreshold` must be set. For example, the following configuration routes queries breaching 1 or 2 thresholds to a `low` lane capped at 30% capacity, and queries breaching 3 or 4 thresholds to a `very-low` lane capped at 10%: + +```json +{ + "strategy": "weighted", + "periodThreshold": "P1M", + "durationThreshold": "P1D", + "segmentCountThreshold": 1000, + "segmentRangeThreshold": "P180D", + "lanes": { + "low": { "minCost": 1, "maxPercent": 30 }, + "very-low": { "minCost": 3, "maxPercent": 10 } + } +} +``` + ##### Server configuration Druid uses Jetty to serve HTTP requests. Each query being processed consumes a single thread from `druid.server.http.numThreads`, so consider defining `druid.query.scheduler.numThreads` to a lower value in order to reserve HTTP threads for responding to health checks, lookup loading, and other non-query, (in most cases) comparatively very short-lived, HTTP requests. diff --git a/server/src/main/java/org/apache/druid/server/QueryLaningStrategy.java b/server/src/main/java/org/apache/druid/server/QueryLaningStrategy.java index d601384c6659..2e95f9d1b3a6 100644 --- a/server/src/main/java/org/apache/druid/server/QueryLaningStrategy.java +++ b/server/src/main/java/org/apache/druid/server/QueryLaningStrategy.java @@ -28,6 +28,7 @@ import org.apache.druid.server.scheduling.HiLoQueryLaningStrategy; import org.apache.druid.server.scheduling.ManualQueryLaningStrategy; import org.apache.druid.server.scheduling.NoQueryLaningStrategy; +import org.apache.druid.server.scheduling.WeightedQueryLaningStrategy; import java.util.Optional; import java.util.Set; @@ -37,7 +38,8 @@ @JsonSubTypes(value = { @JsonSubTypes.Type(name = "none", value = NoQueryLaningStrategy.class), @JsonSubTypes.Type(name = "hilo", value = HiLoQueryLaningStrategy.class), - @JsonSubTypes.Type(name = "manual", value = ManualQueryLaningStrategy.class) + @JsonSubTypes.Type(name = "manual", value = ManualQueryLaningStrategy.class), + @JsonSubTypes.Type(name = "weighted", value = WeightedQueryLaningStrategy.class) }) public interface QueryLaningStrategy { diff --git a/server/src/main/java/org/apache/druid/server/QueryScheduler.java b/server/src/main/java/org/apache/druid/server/QueryScheduler.java index 489af47e869e..ad43d5148906 100644 --- a/server/src/main/java/org/apache/druid/server/QueryScheduler.java +++ b/server/src/main/java/org/apache/druid/server/QueryScheduler.java @@ -66,6 +66,12 @@ public class QueryScheduler implements QueryWatcher private static final Logger LOGGER = new Logger(QueryScheduler.class); public static final int UNAVAILABLE = -1; public static final String TOTAL = "total"; + /** + * Lane name used for queries that are not assigned to any explicit lane (interactive pool). It has special meaning + * for the resilience4j bulkhead used to enforce lane limits, and is also emitted as the {@code lane} metric dimension + * for un-laned queries. It is therefore reserved and cannot be used as a configured lane name. + */ + public static final String DEFAULT = "default"; private final int totalCapacity; private final QueryPrioritizationStrategy prioritizationStrategy; private final QueryLaningStrategy laningStrategy; @@ -165,12 +171,12 @@ public Query prioritizeAndLaneQuery(QueryPlus queryPlus, Set lane = laningStrategy.computeLane(queryPlus.withQuery(query), segments); LOGGER.debug( "[%s] lane assigned to [%s] query with [%,d] priority", - lane.orElse("default"), + lane.orElse(DEFAULT), query.getType(), priority.orElse(0) ); final ServiceMetricEvent.Builder builderUsr = ServiceMetricEvent.builder().setFeed("metrics") - .setDimension("lane", lane.orElse("default")) + .setDimension("lane", lane.orElse(DEFAULT)) .setDimension("dataSource", query.getDataSource().getTableNames()) .setDimension("type", query.getType()); emitter.emit(builderUsr.setMetric("query/priority", priority.orElse(Integer.valueOf(0)))); diff --git a/server/src/main/java/org/apache/druid/server/scheduling/ManualQueryLaningStrategy.java b/server/src/main/java/org/apache/druid/server/scheduling/ManualQueryLaningStrategy.java index 0f17adfae785..146a07e50387 100644 --- a/server/src/main/java/org/apache/druid/server/scheduling/ManualQueryLaningStrategy.java +++ b/server/src/main/java/org/apache/druid/server/scheduling/ManualQueryLaningStrategy.java @@ -64,7 +64,7 @@ public ManualQueryLaningStrategy( // 'default' has special meaning for resilience4j bulkhead used by query scheduler, this restriction // can potentially be relaxed if we ever change enforcement mechanism Preconditions.checkArgument( - lanes.keySet().stream().noneMatch("default"::equals), + lanes.keySet().stream().noneMatch(QueryScheduler.DEFAULT::equals), "Lane cannot be named 'default'" ); } diff --git a/server/src/main/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategy.java b/server/src/main/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategy.java new file mode 100644 index 000000000000..b3f075981c72 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategy.java @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.scheduling; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import it.unimi.dsi.fastutil.objects.Object2IntArrayMap; +import it.unimi.dsi.fastutil.objects.Object2IntMap; +import org.apache.druid.client.SegmentServerSelector; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryPlus; +import org.apache.druid.server.QueryLaningStrategy; +import org.apache.druid.server.QueryScheduler; +import org.joda.time.DateTime; +import org.joda.time.Duration; +import org.joda.time.Interval; +import org.joda.time.Period; +import org.joda.time.base.AbstractInterval; + +import javax.annotation.Nullable; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +/** + * Query laning strategy that assigns a cost to queries based on how many configured thresholds + * they breach, then assigns them to the most restrictive matching lane. This provides more nuanced + * lane assignment than {@link HiLoQueryLaningStrategy}, which uses a binary high/low split. + * + *

Configuration example: + *

{@code
+ * {
+ *   "strategy": "weighted",
+ *   "periodThreshold": "P1M",
+ *   "segmentCountThreshold": 1000,
+ *   "lanes": {
+ *     "low": { "minCost": 1, "maxPercent": 30 },
+ *     "very-low": { "minCost": 3, "maxPercent": 10 }
+ *   }
+ * }
+ * }
+ */ +public class WeightedQueryLaningStrategy implements QueryLaningStrategy +{ + @JsonProperty + @Nullable + private final Integer segmentCountThreshold; + // Stored as the original config strings so that Jackson round-trip serde is consistent: + // the @JsonCreator constructor accepts String parameters, so serialization must also emit + // strings. Storing Period/Duration objects directly with @JsonProperty would cause Jackson + // to serialize them as complex JSON objects that the constructor cannot deserialize back. + @JsonProperty("periodThreshold") + @Nullable + private final String periodThresholdString; + @JsonProperty("durationThreshold") + @Nullable + private final String durationThresholdString; + @JsonProperty("segmentRangeThreshold") + @Nullable + private final String segmentRangeThresholdString; + + // Parsed from the string fields above at construction time; not serialized. + @Nullable + private final Period periodThreshold; + @Nullable + private final Duration durationThreshold; + @Nullable + private final Duration segmentRangeThreshold; + + @JsonProperty + private final Map lanes; + + @JsonCreator + public WeightedQueryLaningStrategy( + @JsonProperty("periodThreshold") @Nullable String periodThresholdString, + @JsonProperty("durationThreshold") @Nullable String durationThresholdString, + @JsonProperty("segmentCountThreshold") @Nullable Integer segmentCountThreshold, + @JsonProperty("segmentRangeThreshold") @Nullable String segmentRangeThresholdString, + @JsonProperty("lanes") Map lanes + ) + { + final Period parsedPeriod; + if (periodThresholdString == null) { + parsedPeriod = null; + } else { + try { + parsedPeriod = new Period(periodThresholdString); + } + catch (IllegalArgumentException e) { + throw new IllegalArgumentException( + "periodThreshold is not a valid ISO 8601 period, got [" + periodThresholdString + "]" + ); + } + } + final Duration parsedDuration; + if (durationThresholdString == null) { + parsedDuration = null; + } else { + try { + parsedDuration = new Period(durationThresholdString).toStandardDuration(); + } + catch (IllegalArgumentException | UnsupportedOperationException e) { + throw new IllegalArgumentException( + "durationThreshold is not a valid ISO 8601 duration, " + + "got [" + durationThresholdString + "]" + ); + } + } + final Duration parsedSegmentRange; + if (segmentRangeThresholdString == null) { + parsedSegmentRange = null; + } else { + try { + parsedSegmentRange = new Period(segmentRangeThresholdString).toStandardDuration(); + } + catch (IllegalArgumentException | UnsupportedOperationException e) { + throw new IllegalArgumentException( + "segmentRangeThreshold is not a valid ISO 8601 duration, " + + "got [" + segmentRangeThresholdString + "]" + ); + } + } + + Preconditions.checkArgument( + segmentCountThreshold != null || parsedPeriod != null || parsedDuration != null || parsedSegmentRange != null, + "At least one of periodThreshold, durationThreshold, segmentCountThreshold, or segmentRangeThreshold must be set" + ); + Preconditions.checkArgument( + segmentCountThreshold == null || segmentCountThreshold > 0, + "segmentCountThreshold must be > 0, got [%s]", segmentCountThreshold + ); + if (parsedDuration != null) { + Preconditions.checkArgument(parsedDuration.getMillis() > 0, "durationThreshold must be positive, got [%s]", durationThresholdString); + } + if (parsedSegmentRange != null) { + Preconditions.checkArgument(parsedSegmentRange.getMillis() > 0, "segmentRangeThreshold must be positive, got [%s]", segmentRangeThresholdString); + } + if (parsedPeriod != null) { + DateTime now = DateTimes.nowUtc(); + Preconditions.checkArgument( + now.minus(parsedPeriod.toDurationFrom(now)).isBefore(now), + "periodThreshold must be positive, got [%s]", periodThresholdString + ); + } + Preconditions.checkArgument( + lanes != null && !lanes.isEmpty(), + "At least one lane must be defined" + ); + Preconditions.checkArgument( + !lanes.containsKey(QueryScheduler.TOTAL), + "Lane cannot be named '%s'", QueryScheduler.TOTAL + ); + Preconditions.checkArgument( + !lanes.containsKey(QueryScheduler.DEFAULT), + "Lane cannot be named '%s'", QueryScheduler.DEFAULT + ); + long distinctCosts = lanes.values().stream().mapToInt(LaneConfig::getMinCost).distinct().count(); + Preconditions.checkArgument( + distinctCosts == lanes.size(), + "Each lane must have a unique minCost so that lane selection is deterministic " + + "(a query is assigned to the lane with the highest minCost it meets; equal costs " + + "produce non-deterministic results). Found duplicate minCost values in lanes: [%s]", + lanes + ); + + this.segmentCountThreshold = segmentCountThreshold; + this.periodThresholdString = periodThresholdString; + this.durationThresholdString = durationThresholdString; + this.segmentRangeThresholdString = segmentRangeThresholdString; + this.periodThreshold = parsedPeriod; + this.durationThreshold = parsedDuration; + this.segmentRangeThreshold = parsedSegmentRange; + this.lanes = lanes; + } + + @Override + public Object2IntMap getLaneLimits(int totalLimit) + { + Object2IntMap limits = new Object2IntArrayMap<>(lanes.size()); + for (Map.Entry entry : lanes.entrySet()) { + limits.put(entry.getKey(), computeLimitFromPercent(totalLimit, entry.getValue().maxPercent)); + } + return limits; + } + + @Override + public Optional computeLane(QueryPlus query, Set segments) + { + final String existingLane = query.getQuery().context().getLane(); + if (existingLane != null) { + return Optional.of(existingLane); + } + + int cost = computeCost(query.getQuery(), segments); + if (cost == 0) { + return Optional.empty(); + } + + // Find the lane with the highest minCost that this query meets + String highestLane = null; + int highestMinCost = 0; + for (Map.Entry entry : lanes.entrySet()) { + int minCost = entry.getValue().minCost; + if (cost >= minCost && minCost > highestMinCost) { + highestLane = entry.getKey(); + highestMinCost = minCost; + } + } + return Optional.ofNullable(highestLane); + } + + private int computeCost(Query query, Set segments) + { + int cost = 0; + + if (periodThreshold != null) { + final DateTime now = DateTimes.nowUtc(); + final DateTime cutoff = now.minus(periodThreshold.toDurationFrom(now)); + // Query intervals are condensed and sorted ascending by start (see JodaUtils.condenseIntervals, applied by + // every QuerySegmentSpec), so the earliest start is the first interval. Checking only it avoids scanning a + // query with hundreds of intervals. + final List intervals = query.getIntervals(); + if (!intervals.isEmpty() && intervals.get(0).getStart().isBefore(cutoff)) { + cost++; + } + } + + if (durationThreshold != null && query.getDuration().isLongerThan(durationThreshold)) { + cost++; + } + + if (segmentCountThreshold != null && segments.size() > segmentCountThreshold) { + cost++; + } + + if (segmentRangeThreshold != null) { + long segmentRangeMs = segments.stream() + .filter(s -> s.getSegmentDescriptor() != null) + .map(s -> s.getSegmentDescriptor().getInterval()) + .distinct() + .mapToLong(AbstractInterval::toDurationMillis) + .sum(); + if (segmentRangeMs > segmentRangeThreshold.getMillis()) { + cost++; + } + } + + return cost; + } + + public static class LaneConfig + { + private final int minCost; + private final int maxPercent; + + @JsonCreator + public LaneConfig( + @JsonProperty("minCost") int minCost, + @JsonProperty("maxPercent") int maxPercent + ) + { + Preconditions.checkArgument(minCost > 0, "minCost must be > 0, got [%s]", minCost); + Preconditions.checkArgument( + maxPercent > 0 && maxPercent <= 100, + "maxPercent must be in the range 1 to 100, got [%s]", maxPercent + ); + this.minCost = minCost; + this.maxPercent = maxPercent; + } + + @JsonProperty + public int getMinCost() + { + return minCost; + } + + @JsonProperty + public int getMaxPercent() + { + return maxPercent; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + LaneConfig that = (LaneConfig) o; + return minCost == that.minCost && maxPercent == that.maxPercent; + } + + @Override + public int hashCode() + { + return Objects.hash(minCost, maxPercent); + } + } +} diff --git a/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java b/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java index ed602678a82c..9d584dccc7ce 100644 --- a/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java +++ b/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java @@ -68,6 +68,7 @@ import org.apache.druid.server.scheduling.HiLoQueryLaningStrategy; import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy; import org.apache.druid.server.scheduling.NoQueryLaningStrategy; +import org.apache.druid.server.scheduling.WeightedQueryLaningStrategy; import org.easymock.EasyMock; import org.hamcrest.text.StringContainsInOrder; import org.junit.After; @@ -79,7 +80,9 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -606,6 +609,103 @@ public void testConfigManualPercent() Assert.assertEquals(QueryScheduler.UNAVAILABLE, scheduler.getLaneAvailableCapacity("non-existent")); } + @Test + public void testConfigWeighted() + { + final Injector injector = createInjector(); + final String propertyPrefix = "druid.query.scheduler"; + final JsonConfigProvider provider = JsonConfigProvider.of( + propertyPrefix, + QuerySchedulerProvider.class + ); + final Properties properties = new Properties(); + properties.setProperty(propertyPrefix + ".numThreads", "10"); + properties.setProperty(propertyPrefix + ".laning.strategy", "weighted"); + properties.setProperty(propertyPrefix + ".laning.segmentCountThreshold", "1"); + properties.setProperty(propertyPrefix + ".laning.lanes", "{\"low\": {\"minCost\": 1, \"maxPercent\": 30}}"); + provider.inject(properties, injector.getInstance(JsonConfigurator.class)); + final QueryScheduler scheduler = provider.get().get(); + Assert.assertEquals(10, scheduler.getTotalAvailableCapacity()); + Assert.assertEquals(3, scheduler.getLaneAvailableCapacity("low")); + Assert.assertEquals(QueryScheduler.UNAVAILABLE, scheduler.getLaneAvailableCapacity("non-existent")); + } + + @Test + public void testWeightedLaneAssignment() + { + ObservableQueryScheduler weightedScheduler = new ObservableQueryScheduler( + TEST_HI_CAPACITY, + ManualQueryPrioritizationStrategy.INSTANCE, + new WeightedQueryLaningStrategy( + null, + null, + 1, + null, + Map.of("low", new WeightedQueryLaningStrategy.LaneConfig(1, 40)) + ), + SERVER_CONFIG_WITH_TOTAL + ); + + // Query with 2 segments exceeds segmentCountThreshold=1 → "low" lane + Query query = weightedScheduler.prioritizeAndLaneQuery( + QueryPlus.wrap(makeDefaultQuery()), + Set.of( + EasyMock.createMock(SegmentServerSelector.class), + EasyMock.createMock(SegmentServerSelector.class) + ) + ); + Assert.assertEquals("low", query.context().getLane()); + + // Query with 0 segments → no lane + Query noLaneQuery = weightedScheduler.prioritizeAndLaneQuery( + QueryPlus.wrap(makeDefaultQuery()), + Set.of() + ); + Assert.assertNull(noLaneQuery.context().getLane()); + } + + @Test + public void testWeightedFailsWhenOutOfLaneCapacity() + { + ObservableQueryScheduler weightedScheduler = new ObservableQueryScheduler( + TEST_HI_CAPACITY, + ManualQueryPrioritizationStrategy.INSTANCE, + new WeightedQueryLaningStrategy( + null, + null, + 1, + null, + Map.of("low", new WeightedQueryLaningStrategy.LaneConfig(1, 40)) + ), + SERVER_CONFIG_WITH_TOTAL + ); + + Set manySegments = Set.of( + EasyMock.createMock(SegmentServerSelector.class), + EasyMock.createMock(SegmentServerSelector.class) + ); + + // Fill the low lane (capacity = ceil(5 * 40/100) = 2) + Query q1 = weightedScheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeDefaultQuery()), manySegments); + Yielders.each(weightedScheduler.run(q1, Sequences.empty())); + Assert.assertEquals(1, weightedScheduler.getLaneAvailableCapacity("low")); + + Query q2 = weightedScheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeDefaultQuery()), manySegments); + Yielders.each(weightedScheduler.run(q2, Sequences.empty())); + Assert.assertEquals(0, weightedScheduler.getLaneAvailableCapacity("low")); + + // Third should fail with 429 + Assert.assertThrows( + QueryCapacityExceededException.class, + () -> Yielders.each( + weightedScheduler.run( + weightedScheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeDefaultQuery()), manySegments), + Sequences.empty() + ) + ) + ); + } + private void maybeDelayNextIteration(int i) throws InterruptedException { if (i > 0 && i % 10 == 0) { diff --git a/server/src/test/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategyTest.java b/server/src/test/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategyTest.java new file mode 100644 index 000000000000..65eeb50d4da7 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/scheduling/WeightedQueryLaningStrategyTest.java @@ -0,0 +1,389 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.scheduling; + +import com.fasterxml.jackson.databind.ObjectMapper; +import it.unimi.dsi.fastutil.objects.Object2IntMap; +import org.apache.druid.client.SegmentServerSelector; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.query.Druids; +import org.apache.druid.query.QueryContexts; +import org.apache.druid.query.QueryPlus; +import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.query.timeseries.TimeseriesQuery; +import org.apache.druid.segment.TestHelper; +import org.apache.druid.server.QueryLaningStrategy; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +public class WeightedQueryLaningStrategyTest +{ + private static final Map TWO_LANES = Map.of( + "low", new WeightedQueryLaningStrategy.LaneConfig(1, 30), + "very-low", new WeightedQueryLaningStrategy.LaneConfig(3, 10) + ); + + private Druids.TimeseriesQueryBuilder queryBuilder; + + @Before + public void setup() + { + queryBuilder = Druids.newTimeseriesQueryBuilder() + .dataSource("test") + .intervals(List.of(Intervals.of("2020-01-01/2020-01-02"))) + .granularity(Granularities.DAY) + .aggregators(new CountAggregatorFactory("count")); + } + + @Test + public void testGetLaneLimits() + { + WeightedQueryLaningStrategy strategy = newStrategy(null, null, 10, null); + + Object2IntMap limits = strategy.getLaneLimits(100); + Assert.assertEquals(2, limits.size()); + Assert.assertEquals(30, limits.getInt("low")); + Assert.assertEquals(10, limits.getInt("very-low")); + } + + @Test + public void testComputeLane_noViolations() + { + WeightedQueryLaningStrategy strategy = newStrategy(null, null, 10000, null); + TimeseriesQuery query = queryBuilder.build(); + Optional lane = strategy.computeLane(QueryPlus.wrap(query), Set.of()); + Assert.assertFalse(lane.isPresent()); + } + + @Test + public void testComputeLane_oneViolation_segmentCount() + { + // segmentCountThreshold=1, query has 5 segments → cost=1 → "low" + WeightedQueryLaningStrategy strategy = newStrategy(null, null, 1, null); + TimeseriesQuery query = queryBuilder.build(); + Set segments = makeSegments(5); + Optional lane = strategy.computeLane(QueryPlus.wrap(query), segments); + Assert.assertTrue(lane.isPresent()); + Assert.assertEquals("low", lane.get()); + } + + @Test + public void testComputeLane_twoViolations_matchesLowerMinCost() + { + // segmentCountThreshold=1 + durationThreshold=PT1S → cost=2 + // Matches "low" (minCost=1) but NOT "very-low" (minCost=3) + WeightedQueryLaningStrategy strategy = new WeightedQueryLaningStrategy( + null, + "PT1S", + 1, + null, + TWO_LANES + ); + TimeseriesQuery query = queryBuilder.build(); + Set segments = makeSegments(5); + Optional lane = strategy.computeLane(QueryPlus.wrap(query), segments); + Assert.assertTrue(lane.isPresent()); + Assert.assertEquals("low", lane.get()); + } + + @Test + public void testComputeLane_allViolations_mostRestrictiveLane() + { + // All 4 thresholds set very low → cost=4 → meets "very-low" (minCost=3) + WeightedQueryLaningStrategy strategy = new WeightedQueryLaningStrategy( + "PT1S", + "PT1S", + 1, + "PT1S", + TWO_LANES + ); + TimeseriesQuery query = queryBuilder.build(); + Set segments = makeSegments(5); + Optional lane = strategy.computeLane(QueryPlus.wrap(query), segments); + Assert.assertTrue(lane.isPresent()); + Assert.assertEquals("very-low", lane.get()); + } + + @Test + public void testComputeLane_existingLane_preserved() + { + WeightedQueryLaningStrategy strategy = newStrategy(null, null, 10000, null); + TimeseriesQuery query = queryBuilder + .context(Map.of(QueryContexts.LANE_KEY, "custom")) + .build(); + Optional lane = strategy.computeLane(QueryPlus.wrap(query), Set.of()); + Assert.assertTrue(lane.isPresent()); + Assert.assertEquals("custom", lane.get()); + } + + @Test + public void testComputeLane_segmentRangeWithDifferentIntervals() + { + // Segments spanning different intervals to validate segmentRange summing + WeightedQueryLaningStrategy strategy = newStrategy(null, null, null, "PT1S"); + TimeseriesQuery query = queryBuilder.build(); + Set segments = new HashSet<>(); + segments.add(new SegmentServerSelector( + new SegmentDescriptor(Intervals.of("2020-01-01/2020-01-02"), "v1", 0) + )); + segments.add(new SegmentServerSelector( + new SegmentDescriptor(Intervals.of("2020-01-02/2020-01-03"), "v1", 1) + )); + // Total range = 1 day + 1 day = 2 days > 1 second → cost=1 → "low" + Optional lane = strategy.computeLane(QueryPlus.wrap(query), segments); + Assert.assertTrue(lane.isPresent()); + Assert.assertEquals("low", lane.get()); + } + + @Test + public void testComputeLane_periodThreshold_usesEarliestInterval() + { + // periodThreshold=P1D → cutoff is ~1 day ago. Provide multiple intervals in unsorted order where only the + // earliest (old) interval is before the cutoff. getIntervals() is condensed/sorted ascending by start, so the + // period check inspects the earliest interval and must charge cost even though later intervals are recent. + WeightedQueryLaningStrategy strategy = newStrategy("P1D", null, null, null); + TimeseriesQuery query = queryBuilder + .intervals(List.of( + Intervals.of("2038-01-01/2038-01-02"), + Intervals.of("2000-01-01/2000-01-02") + )) + .build(); + Optional lane = strategy.computeLane(QueryPlus.wrap(query), Set.of()); + Assert.assertTrue(lane.isPresent()); + Assert.assertEquals("low", lane.get()); + } + + @Test + public void testValidation_noThresholds() + { + Assert.assertThrows( + IllegalArgumentException.class, + () -> newStrategy(null, null, null, null) + ); + } + + @Test + public void testValidation_noLanes() + { + Assert.assertThrows( + IllegalArgumentException.class, + () -> new WeightedQueryLaningStrategy( + null, + null, + 10, + null, + Map.of() + ) + ); + } + + @Test + public void testValidation_nullLanes() + { + Assert.assertThrows( + IllegalArgumentException.class, + () -> new WeightedQueryLaningStrategy( + null, + null, + 10, + null, + null + ) + ); + } + + @Test + public void testValidation_reservedLaneNameTotal() + { + Assert.assertThrows( + IllegalArgumentException.class, + () -> new WeightedQueryLaningStrategy( + null, + null, + 10, + null, + Map.of("total", new WeightedQueryLaningStrategy.LaneConfig(1, 30)) + ) + ); + } + + @Test + public void testValidation_reservedLaneNameDefault() + { + Assert.assertThrows( + IllegalArgumentException.class, + () -> new WeightedQueryLaningStrategy( + null, + null, + 10, + null, + Map.of("default", new WeightedQueryLaningStrategy.LaneConfig(1, 30)) + ) + ); + } + + @Test + public void testLaneConfig_invalidMinCost() + { + Assert.assertThrows( + IllegalArgumentException.class, + () -> new WeightedQueryLaningStrategy.LaneConfig(0, 30) + ); + } + + @Test + public void testLaneConfig_invalidMaxPercent() + { + Assert.assertThrows( + IllegalArgumentException.class, + () -> new WeightedQueryLaningStrategy.LaneConfig(1, 0) + ); + } + + @Test + public void testSerde() throws Exception + { + ObjectMapper mapper = TestHelper.makeJsonMapper(); + String json = "{\n" + + " \"strategy\": \"weighted\",\n" + + " \"segmentCountThreshold\": 1000,\n" + + " \"durationThreshold\": \"P1D\",\n" + + " \"lanes\": {\n" + + " \"low\": { \"minCost\": 1, \"maxPercent\": 30 },\n" + + " \"very-low\": { \"minCost\": 3, \"maxPercent\": 10 }\n" + + " }\n" + + "}"; + + QueryLaningStrategy deserialized = mapper.readValue(json, QueryLaningStrategy.class); + Assert.assertTrue(deserialized instanceof WeightedQueryLaningStrategy); + + Object2IntMap limits = deserialized.getLaneLimits(100); + Assert.assertEquals(30, limits.getInt("low")); + Assert.assertEquals(10, limits.getInt("very-low")); + } + + private static WeightedQueryLaningStrategy newStrategy( + String periodThreshold, + String durationThreshold, + Integer segmentCountThreshold, + String segmentRangeThreshold + ) + { + return new WeightedQueryLaningStrategy( + periodThreshold, + durationThreshold, + segmentCountThreshold, + segmentRangeThreshold, + TWO_LANES + ); + } + + @Test + public void testValidation_segmentCountThresholdZero() + { + Assert.assertThrows( + IllegalArgumentException.class, + () -> new WeightedQueryLaningStrategy(null, null, 0, null, TWO_LANES) + ); + } + + @Test + public void testValidation_segmentCountThresholdNegative() + { + Assert.assertThrows( + IllegalArgumentException.class, + () -> new WeightedQueryLaningStrategy(null, null, -1, null, TWO_LANES) + ); + } + + @Test + public void testValidation_durationThresholdZero() + { + Assert.assertThrows( + IllegalArgumentException.class, + () -> new WeightedQueryLaningStrategy(null, "PT0S", null, null, TWO_LANES) + ); + } + + @Test + public void testValidation_durationThresholdNegative() + { + Assert.assertThrows( + IllegalArgumentException.class, + () -> new WeightedQueryLaningStrategy(null, "-PT1S", null, null, TWO_LANES) + ); + } + + @Test + public void testValidation_segmentRangeThresholdZero() + { + Assert.assertThrows( + IllegalArgumentException.class, + () -> new WeightedQueryLaningStrategy(null, null, null, "PT0S", TWO_LANES) + ); + } + + @Test + public void testValidation_segmentRangeThresholdNegative() + { + Assert.assertThrows( + IllegalArgumentException.class, + () -> new WeightedQueryLaningStrategy(null, null, null, "-PT1S", TWO_LANES) + ); + } + + @Test + public void testValidation_periodThresholdZero() + { + Assert.assertThrows( + IllegalArgumentException.class, + () -> new WeightedQueryLaningStrategy("PT0S", null, null, null, TWO_LANES) + ); + } + + @Test + public void testValidation_periodThresholdNegative() + { + Assert.assertThrows( + IllegalArgumentException.class, + () -> new WeightedQueryLaningStrategy("-PT1S", null, null, null, TWO_LANES) + ); + } + + private static Set makeSegments(int count) + { + Set segments = new HashSet<>(); + for (int i = 0; i < count; i++) { + segments.add(new SegmentServerSelector( + new SegmentDescriptor(Intervals.of("2020-01-01/2020-01-02"), "v1", i) + )); + } + return segments; + } +}