Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ public class SqlExpressionBenchmark extends SqlBaseQueryBenchmark
// ===========================
// non-expression reference queries
// ===========================
// 0: non-expression timeseries reference, 1 columns
// 0: non-expression timeseries reference, 1 column
"SELECT SUM(long1) FROM expressions",
// 1: non-expression timeseries reference, 2 columns
"SELECT SUM(long1), SUM(long2) FROM expressions",
// 1: non-expression timeseries reference, 1 column with nulls
"SELECT SUM(long5) FROM expressions",

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why this change?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh i can revert it, its mentioned in the PR description that I changed it because long1 has no nulls and long5 has a bunch of them, so they exercise different paths

// 2: non-expression timeseries reference, 3 columns
"SELECT SUM(long1), SUM(long4), SUM(double1) FROM expressions",
// 3: non-expression timeseries reference, 4 columns
Expand Down
12 changes: 12 additions & 0 deletions docs/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,18 @@ the following properties.
JavaScript-based functionality is disabled by default. Please refer to the Druid [JavaScript programming guide](../development/javascript.md) for guidelines about using Druid's JavaScript functionality, including instructions on how to enable it.
:::

### Expression processing

The properties below tune Druid's native expression engine, which evaluates virtual columns, expression-based filters,
the `expression` aggregator/post-aggregator, and any SQL functions that lower to native expressions.

|Property|Description|Default|
|--------|-----------|-------|
|`druid.expressions.processArraysAsMultiValueStrings`|If true, all `ARRAY` typed values are converted to `STRING` by column selectors and treated as multi-value strings rather than native arrays. Provided for backwards compatibility with the behavior of Druid 24.0 and earlier, before array types were introduced.|false|
|`druid.expressions.homogenizeNullMultiValueStringArrays`|If true, multi-value string expression input values of `null`, `[]`, and `[null]` are all coerced to `[null]`. Provided for backwards compatibility with Druid 0.22 and earlier. If false (the default), this coercion only happens when single-value expressions are implicitly mapped across multi-value rows, so the single-valued expression is evaluated with an input of `null`.|false|
|`druid.expressions.allowVectorizeFallback`|If true, the vectorized query engine handles expressions without a native vectorized implementation using a fallback processor that invokes the scalar expression evaluator in a loop. If false, such expressions cannot be vectorized and the query falls back to the non-vectorized engine.|true|
|`druid.expressions.useVectorApi`|If true, vectorized expression vector processors and numeric vector aggregators dispatch to SIMD specializations backed by the JDK incubator Vector API (`jdk.incubator.vector`) where available. Requires `--add-modules=jdk.incubator.vector` on the JVM command line (see [strong encapsulation](../operations/java.md#strong-encapsulation)). Off by default while the Vector API remains an incubator JDK feature.|false|

### Double column storage

Prior to version 0.13.0, Druid's storage layer used a 32-bit float representation to store columns created by the
Expand Down
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1788,6 +1788,7 @@
<exclude>**/*_generated*.class</exclude>
<!-- forbidden-apis can't resolve jdk.incubator.vector classes from its own classpath -->
<exclude>**/math/expr/vector/simd/Simd*.class</exclude>
<exclude>**/query/aggregation/simd/Simd*.class</exclude>
</excludes>
<suppressAnnotations>
<annotation>**.SuppressForbidden</annotation>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Supplier;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.math.expr.ExpressionProcessing;
import org.apache.druid.query.aggregation.simd.SimdDoubleSumVectorAggregator;
import org.apache.druid.segment.BaseDoubleColumnValueSelector;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorValueSelector;
Expand Down Expand Up @@ -81,6 +83,9 @@ protected VectorAggregator factorizeVector(
VectorValueSelector selector
)
{
if (ExpressionProcessing.useVectorApi()) {
return new SimdDoubleSumVectorAggregator(selector);
}
return new DoubleSumVectorAggregator(selector);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Supplier;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.math.expr.ExpressionProcessing;
import org.apache.druid.query.aggregation.simd.SimdFloatSumVectorAggregator;
import org.apache.druid.segment.BaseFloatColumnValueSelector;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorValueSelector;
Expand Down Expand Up @@ -81,6 +83,9 @@ protected VectorAggregator factorizeVector(
VectorValueSelector selector
)
{
if (ExpressionProcessing.useVectorApi()) {
return new SimdFloatSumVectorAggregator(selector);
}
return new FloatSumVectorAggregator(selector);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Supplier;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.math.expr.ExpressionProcessing;
import org.apache.druid.query.aggregation.simd.SimdLongSumVectorAggregator;
import org.apache.druid.segment.BaseLongColumnValueSelector;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorValueSelector;
Expand Down Expand Up @@ -106,6 +108,9 @@ protected VectorAggregator factorizeVector(
VectorValueSelector selector
)
{
if (ExpressionProcessing.useVectorApi()) {
return new SimdLongSumVectorAggregator(selector);
}
return new LongSumVectorAggregator(selector);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.aggregation;

import java.nio.ByteBuffer;

/**
* Capability marker for {@link VectorAggregator}s that can efficiently aggregate a contiguous row range while
* skipping null inputs themselves (such as via SIMD masking) rather than relying on the
* {@link NullableNumericVectorAggregator} wrapper to filter rows into the per-row scatter-gather
* {@link VectorAggregator#aggregate(ByteBuffer, int, int[], int[], int) aggregate} variant.
*
* When the wrapper sees the input batch has nulls (i.e. {@code selector.getNullVector() != null}) and the
* delegate is an instance of this interface, it routes the call here instead of falling back to the
* scatter-gather path.
*/
public interface NullAwareVectorAggregator extends VectorAggregator
{
/**
* Aggregate rows {@code [startRow, endRow)} into the slot at {@code position}, skipping rows where
* {@code nullVector[i] == true}. Implementations should use {@link jdk.incubator.vector.VectorMask} (or
* equivalent) to avoid per-row branching.
*
* @return {@code true} if at least one non-null row was aggregated; {@code false} if every row in the range
* was null. {@link NullableNumericVectorAggregator} uses this to set or leave the null-marker byte
* that precedes the delegate's state.
*/
boolean aggregate(ByteBuffer buf, int position, int startRow, int endRow, boolean[] nullVector);
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,16 @@ public void init(ByteBuffer buf, int position)
public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
{
final boolean[] nullVector = selector.getNullVector();
if (nullVector != null) {
if (nullVector == null) {
doAggregate(buf, position, startRow, endRow);
} else if (delegate instanceof NullAwareVectorAggregator nullAware) {
// Delegate handles null inputs itself (typically via SIMD masking); set the null marker only if it
// reports at least one non-null row contributed.
if (nullAware.aggregate(buf, position + Byte.BYTES, startRow, endRow, nullVector)) {
buf.put(position, TypeStrategies.IS_NOT_NULL_BYTE);
}
} else {
// Fallback: filter non-null rows and route through the scatter-gather variant with a uniform position.
// Deferred initialization, since vAggregationPositions and vAggregationRows are only needed if nulls
// actually occur.
if (vAggregationPositions == null) {
Expand All @@ -94,8 +103,6 @@ public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
Arrays.fill(vAggregationPositions, 0, j, position);

doAggregate(buf, j, vAggregationPositions, vAggregationRows, 0);
} else {
doAggregate(buf, position, startRow, endRow);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.aggregation.simd;

import jdk.incubator.vector.DoubleVector;
import jdk.incubator.vector.VectorMask;
import jdk.incubator.vector.VectorOperators;
import jdk.incubator.vector.VectorSpecies;
import org.apache.druid.query.aggregation.DoubleSumVectorAggregator;
import org.apache.druid.query.aggregation.NullAwareVectorAggregator;
import org.apache.druid.segment.vector.VectorValueSelector;

import java.nio.ByteBuffer;

/**
* SIMD specialization of {@link DoubleSumVectorAggregator}'s ungrouped contiguous-range aggregation. The hot loop
* issues a hardcoded {@link DoubleVector#add} and a {@code reduceLanes(VectorOperators.ADD)} so the JIT emits the
* platform's double-add and double-add-reduce intrinsics. Null lanes are skipped via {@link VectorMask}.
*
* The grouped scatter-gather variant is inherited from the parent (scalar) class.
*/
public final class SimdDoubleSumVectorAggregator extends DoubleSumVectorAggregator implements NullAwareVectorAggregator
{
private static final VectorSpecies<Double> SPECIES = DoubleVector.SPECIES_PREFERRED;

private final VectorValueSelector selector;

public SimdDoubleSumVectorAggregator(VectorValueSelector selector)
{
super(selector);
this.selector = selector;
}

@Override
public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
{
final double[] vector = selector.getDoubleVector();

final int laneCount = SPECIES.length();
final int upperBound = startRow + SPECIES.loopBound(endRow - startRow);
int i = startRow;
DoubleVector vacc = DoubleVector.zero(SPECIES);
for (; i < upperBound; i += laneCount) {
vacc = vacc.add(DoubleVector.fromArray(SPECIES, vector, i));
}
double sum = vacc.reduceLanes(VectorOperators.ADD);
for (; i < endRow; i++) {
sum += vector[i];
}
buf.putDouble(position, buf.getDouble(position) + sum);
}

@Override
public boolean aggregate(ByteBuffer buf, int position, int startRow, int endRow, boolean[] nullVector)
{
final double[] vector = selector.getDoubleVector();

final int laneCount = SPECIES.length();
final int upperBound = startRow + SPECIES.loopBound(endRow - startRow);
int i = startRow;
DoubleVector vacc = DoubleVector.zero(SPECIES);
int nonNullCount = 0;
for (; i < upperBound; i += laneCount) {
final VectorMask<Double> notNull = VectorMask.fromArray(SPECIES, nullVector, i).not();
vacc = vacc.add(DoubleVector.fromArray(SPECIES, vector, i), notNull);
nonNullCount += notNull.trueCount();
}
double sum = vacc.reduceLanes(VectorOperators.ADD);
for (; i < endRow; i++) {
if (!nullVector[i]) {
sum += vector[i];
nonNullCount++;
}
}
if (nonNullCount > 0) {
buf.putDouble(position, buf.getDouble(position) + sum);
return true;
}
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.aggregation.simd;

import jdk.incubator.vector.FloatVector;
import jdk.incubator.vector.VectorMask;
import jdk.incubator.vector.VectorOperators;
import jdk.incubator.vector.VectorSpecies;
import org.apache.druid.query.aggregation.FloatSumVectorAggregator;
import org.apache.druid.query.aggregation.NullAwareVectorAggregator;
import org.apache.druid.segment.vector.VectorValueSelector;

import java.nio.ByteBuffer;

/**
* SIMD specialization of {@link FloatSumVectorAggregator}'s ungrouped contiguous-range aggregation. The hot loop
* issues a hardcoded {@link FloatVector#add} and a {@code reduceLanes(VectorOperators.ADD)} so the JIT emits the
* platform's float-add and float-add-reduce intrinsics. Null lanes are skipped via {@link VectorMask}.
*
* The grouped scatter-gather variant is inherited from the parent (scalar) class.
*/
public final class SimdFloatSumVectorAggregator extends FloatSumVectorAggregator implements NullAwareVectorAggregator
{
private static final VectorSpecies<Float> SPECIES = FloatVector.SPECIES_PREFERRED;

private final VectorValueSelector selector;

public SimdFloatSumVectorAggregator(VectorValueSelector selector)
{
super(selector);
this.selector = selector;
}

@Override
public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
{
final float[] vector = selector.getFloatVector();

final int laneCount = SPECIES.length();
final int upperBound = startRow + SPECIES.loopBound(endRow - startRow);
int i = startRow;
FloatVector vacc = FloatVector.zero(SPECIES);
for (; i < upperBound; i += laneCount) {
vacc = vacc.add(FloatVector.fromArray(SPECIES, vector, i));
}
float sum = vacc.reduceLanes(VectorOperators.ADD);
for (; i < endRow; i++) {
sum += vector[i];
}
buf.putFloat(position, buf.getFloat(position) + sum);
}

@Override
public boolean aggregate(ByteBuffer buf, int position, int startRow, int endRow, boolean[] nullVector)
{
final float[] vector = selector.getFloatVector();

final int laneCount = SPECIES.length();
final int upperBound = startRow + SPECIES.loopBound(endRow - startRow);
int i = startRow;
FloatVector vacc = FloatVector.zero(SPECIES);
int nonNullCount = 0;
for (; i < upperBound; i += laneCount) {
final VectorMask<Float> notNull = VectorMask.fromArray(SPECIES, nullVector, i).not();
vacc = vacc.add(FloatVector.fromArray(SPECIES, vector, i), notNull);
nonNullCount += notNull.trueCount();
}
float sum = vacc.reduceLanes(VectorOperators.ADD);
for (; i < endRow; i++) {
if (!nullVector[i]) {
sum += vector[i];
nonNullCount++;
}
}
if (nonNullCount > 0) {
buf.putFloat(position, buf.getFloat(position) + sum);
return true;
}
return false;
}
}
Loading
Loading