32 changes: 32 additions & 0 deletions docs/configuration/index.md
@@ -1770,6 +1770,38 @@ This laning strategy is best suited for cases where one or more external applica
|`druid.query.scheduler.laning.lanes.{name}`|Maximum percent or exact limit of queries that can concurrently run in the defined lanes. Any number of lanes may be defined like this. The lane names 'total' and 'default' are reserved for internal use.|No default, must define at least one lane with a limit above 0. If `druid.query.scheduler.laning.isLimitPercent` is set to `true`, values must be integers in the range of 1 to 100.|
|`druid.query.scheduler.laning.isLimitPercent`|If set to `true`, the values set for `druid.query.scheduler.laning.lanes` will be treated as a percent of the smaller number of `druid.server.http.numThreads` or `druid.query.scheduler.numThreads`. Note that in this mode, these lane values across lanes are _not_ required to add up to, and can exceed, 100%.|`false`|

###### Weighted laning strategy

This laning strategy assigns a cost to each query based on how many of a configurable set of thresholds it breaches, then routes the query to the lane with the highest `minCost` that the query's cost meets. It is a more granular alternative to the 'High/Low' strategy: rather than a binary split, a query that breaches one threshold can be assigned to a different lane than one that breaches several. The thresholds are the same ones used by the [threshold prioritization strategy](#threshold-prioritization-strategy) (data age, query interval duration, segment count, and total segment range). Each threshold a query breaches adds 1 to its cost. A query with cost 0 is not assigned a lane and runs in the interactive (default) pool.

If a lane is specified in the [query context](../querying/query-context-reference.md) `lane` parameter, this will override the computed lane.

This strategy can be enabled by setting `druid.query.scheduler.laning.strategy=weighted`.

|Property|Description|Default|
|--------|-----------|-------|
|`druid.query.scheduler.laning.periodThreshold`|ISO 8601 period. A query is charged 1 if any of its intervals starts before `now - periodThreshold` (that is, it reads data older than this).|null (not evaluated)|
|`druid.query.scheduler.laning.durationThreshold`|ISO 8601 duration (must not contain month or year components). A query is charged 1 if its total interval duration exceeds this.|null (not evaluated)|
|`druid.query.scheduler.laning.segmentCountThreshold`|A query is charged 1 if the number of segments it involves exceeds this. Must be greater than 0.|null (not evaluated)|
|`druid.query.scheduler.laning.segmentRangeThreshold`|ISO 8601 duration (must not contain month or year components). A query is charged 1 if the summed time range of its distinct segments exceeds this.|null (not evaluated)|
|`druid.query.scheduler.laning.lanes`|A map of lane name to its configuration. At least one lane must be defined. Each lane's configuration has two fields: `minCost`, the minimum query cost required to be assigned to the lane (a query is assigned to the lane with the highest `minCost` it meets; must be greater than 0 and unique across lanes so lane selection is deterministic), and `maxPercent`, the maximum percent of the smaller number of `druid.server.http.numThreads` or `druid.query.scheduler.numThreads` that queries in the lane may use concurrently (an integer in the range 1 to 100). The lane names 'total' and 'default' are reserved for internal use.|No default, must define at least one lane|
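
The rules for `lanes` in the table above can be illustrated with a small Java sketch. It is not taken from `WeightedQueryLaningStrategy`: the class `LaneConfigSketch`, its nested `Lane` type, and the methods `validate` and `laneLimit` are hypothetical names used only for illustration. Rounding the percent-based limit up, with a floor of 1, is an assumption; the table does not state how fractional limits are handled.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the documented lane rules; names are illustrative, not Druid APIs.
public class LaneConfigSketch
{
  /** One entry of the `lanes` map: minCost and maxPercent. */
  public static final class Lane
  {
    final int minCost;
    final int maxPercent;

    public Lane(int minCost, int maxPercent)
    {
      this.minCost = minCost;
      this.maxPercent = maxPercent;
    }
  }

  /** Throws IllegalArgumentException if the lane map breaks one of the documented rules. */
  public static void validate(Map<String, Lane> lanes)
  {
    if (lanes.isEmpty()) {
      throw new IllegalArgumentException("At least one lane must be defined");
    }
    Set<Integer> seenMinCosts = new HashSet<>();
    for (Map.Entry<String, Lane> entry : lanes.entrySet()) {
      String name = entry.getKey();
      Lane lane = entry.getValue();
      if (name.equals("total") || name.equals("default")) {
        throw new IllegalArgumentException("Lane name is reserved: " + name);
      }
      if (lane.minCost <= 0) {
        throw new IllegalArgumentException("minCost must be greater than 0: " + name);
      }
      if (!seenMinCosts.add(lane.minCost)) {
        throw new IllegalArgumentException("minCost must be unique across lanes: " + name);
      }
      if (lane.maxPercent < 1 || lane.maxPercent > 100) {
        throw new IllegalArgumentException("maxPercent must be in the range 1 to 100: " + name);
      }
    }
  }

  /**
   * Concurrent query limit for a lane: maxPercent of the smaller thread count.
   * Assumption: the result is rounded up and is never below 1.
   */
  public static int laneLimit(int httpThreads, int schedulerThreads, int maxPercent)
  {
    int total = Math.min(httpThreads, schedulerThreads);
    return Math.max(1, (total * maxPercent + 99) / 100);
  }

  public static void main(String[] args)
  {
    Map<String, Lane> lanes = new LinkedHashMap<>();
    lanes.put("low", new Lane(1, 30));
    lanes.put("very-low", new Lane(3, 10));
    validate(lanes);
    // With 60 HTTP threads and 40 scheduler threads, limits are based on 40.
    System.out.println(laneLimit(60, 40, 30)); // prints 12
    System.out.println(laneLimit(60, 40, 10)); // prints 4
  }
}
```

Requiring unique `minCost` values is what makes "the lane with the highest `minCost` the query meets" unambiguous.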

At least one of `periodThreshold`, `durationThreshold`, `segmentCountThreshold`, or `segmentRangeThreshold` must be set. For example, the following configuration routes queries breaching 1 or 2 thresholds to a `low` lane capped at 30% capacity, and queries breaching 3 or 4 thresholds to a `very-low` lane capped at 10%:

```json
{
  "strategy": "weighted",
  "periodThreshold": "P1M",
  "durationThreshold": "P1D",
  "segmentCountThreshold": 1000,
  "segmentRangeThreshold": "P180D",
  "lanes": {
    "low": { "minCost": 1, "maxPercent": 30 },
    "very-low": { "minCost": 3, "maxPercent": 10 }
  }
}
```
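
How the example configuration maps a query's cost to a lane can be sketched in Java as follows. This is a minimal illustration of the behavior described in this section, not the `WeightedQueryLaningStrategy` source: the class `WeightedLaneSketch` and the methods `cost`, `selectLane`, and `resolveLane` are hypothetical names, and the map holds only each lane's `minCost`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of weighted lane selection; names are illustrative, not Druid APIs.
public class WeightedLaneSketch
{
  /** Each breached threshold adds 1 to the query cost. */
  public static int cost(boolean... breachedThresholds)
  {
    int cost = 0;
    for (boolean breached : breachedThresholds) {
      if (breached) {
        cost++;
      }
    }
    return cost;
  }

  /** Picks the lane with the highest minCost that the cost meets; empty if no lane qualifies. */
  public static Optional<String> selectLane(int cost, Map<String, Integer> minCostByLane)
  {
    String best = null;
    int bestMinCost = 0; // valid minCost values are greater than 0
    for (Map.Entry<String, Integer> entry : minCostByLane.entrySet()) {
      int minCost = entry.getValue();
      if (minCost <= cost && minCost > bestMinCost) {
        best = entry.getKey();
        bestMinCost = minCost;
      }
    }
    return Optional.ofNullable(best);
  }

  /** A lane set in the query context overrides the computed lane. */
  public static Optional<String> resolveLane(
      Optional<String> contextLane,
      int cost,
      Map<String, Integer> minCostByLane
  )
  {
    return contextLane.isPresent() ? contextLane : selectLane(cost, minCostByLane);
  }

  public static void main(String[] args)
  {
    Map<String, Integer> lanes = new LinkedHashMap<>();
    lanes.put("low", 1);
    lanes.put("very-low", 3);
    // Prints: 0 -> default, 1 -> low, 2 -> low, 3 -> very-low, 4 -> very-low (one per line)
    for (int cost = 0; cost <= 4; cost++) {
      System.out.println(cost + " -> " + selectLane(cost, lanes).orElse("default"));
    }
  }
}
```

A cost of 0 yields an empty result, which corresponds to the un-laned, interactive pool.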

##### Server configuration

Druid uses Jetty to serve HTTP requests. Each query being processed consumes a single thread from `druid.server.http.numThreads`, so consider defining `druid.query.scheduler.numThreads` to a lower value in order to reserve HTTP threads for responding to health checks, lookup loading, and other non-query, (in most cases) comparatively very short-lived, HTTP requests.
@@ -28,6 +28,7 @@
 import org.apache.druid.server.scheduling.HiLoQueryLaningStrategy;
 import org.apache.druid.server.scheduling.ManualQueryLaningStrategy;
 import org.apache.druid.server.scheduling.NoQueryLaningStrategy;
+import org.apache.druid.server.scheduling.WeightedQueryLaningStrategy;

 import java.util.Optional;
 import java.util.Set;
@@ -37,7 +38,8 @@
 @JsonSubTypes(value = {
     @JsonSubTypes.Type(name = "none", value = NoQueryLaningStrategy.class),
     @JsonSubTypes.Type(name = "hilo", value = HiLoQueryLaningStrategy.class),
-    @JsonSubTypes.Type(name = "manual", value = ManualQueryLaningStrategy.class)
+    @JsonSubTypes.Type(name = "manual", value = ManualQueryLaningStrategy.class),
+    @JsonSubTypes.Type(name = "weighted", value = WeightedQueryLaningStrategy.class)
 })
 public interface QueryLaningStrategy
 {
10 changes: 8 additions & 2 deletions server/src/main/java/org/apache/druid/server/QueryScheduler.java
@@ -66,6 +66,12 @@ public class QueryScheduler implements QueryWatcher
   private static final Logger LOGGER = new Logger(QueryScheduler.class);
   public static final int UNAVAILABLE = -1;
   public static final String TOTAL = "total";
+  /**
+   * Lane name used for queries that are not assigned to any explicit lane (interactive pool). It has special meaning
+   * for the resilience4j bulkhead used to enforce lane limits, and is also emitted as the {@code lane} metric dimension
+   * for un-laned queries. It is therefore reserved and cannot be used as a configured lane name.
+   */
+  public static final String DEFAULT = "default";
   private final int totalCapacity;
   private final QueryPrioritizationStrategy prioritizationStrategy;
   private final QueryLaningStrategy laningStrategy;
@@ -165,12 +171,12 @@ public <T> Query<T> prioritizeAndLaneQuery(QueryPlus<T> queryPlus, Set<SegmentSe
     Optional<String> lane = laningStrategy.computeLane(queryPlus.withQuery(query), segments);
     LOGGER.debug(
         "[%s] lane assigned to [%s] query with [%,d] priority",
-        lane.orElse("default"),
+        lane.orElse(DEFAULT),
         query.getType(),
         priority.orElse(0)
     );
     final ServiceMetricEvent.Builder builderUsr = ServiceMetricEvent.builder().setFeed("metrics")
-        .setDimension("lane", lane.orElse("default"))
+        .setDimension("lane", lane.orElse(DEFAULT))
         .setDimension("dataSource", query.getDataSource().getTableNames())
         .setDimension("type", query.getType());
     emitter.emit(builderUsr.setMetric("query/priority", priority.orElse(Integer.valueOf(0))));
@@ -64,7 +64,7 @@ public ManualQueryLaningStrategy(
     // 'default' has special meaning for resilience4j bulkhead used by query scheduler, this restriction
     // can potentially be relaxed if we ever change enforcement mechanism
     Preconditions.checkArgument(
-        lanes.keySet().stream().noneMatch("default"::equals),
+        lanes.keySet().stream().noneMatch(QueryScheduler.DEFAULT::equals),
         "Lane cannot be named 'default'"
     );
   }