From 90c8117058d7b4add7895a50a34377f47a34d5e4 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Thu, 4 Jun 2026 17:40:34 -0700 Subject: [PATCH 1/4] feat: SIMD sum vector aggregators (Long/Double/Float) changes: * add `NullAwareVectorAggregator` marker interface declaring `aggregate(buf, position, startRow, endRow, nullVector)` for delegates that handle nulls themselves; return value reports whether any non-null row contributed. * update `NullableNumericVectorAggregator.aggregate(buf, position, startRow, endRow)` to a three-way dispatch: null-free fast path; `instanceof NullAwareVectorAggregator` -> new null-aware overload (set null marker iff it returned true); else existing scatter-gather fallback. * add `SimdLongSumVectorAggregator`, `SimdDoubleSumVectorAggregator`, and `SimdFloatSumVectorAggregator` under `query/aggregation/simd/`. Each extends its scalar parent, overrides the ungrouped no-null aggregate with a `va.add(vb)` + `reduceLanes(VectorOperators.ADD)` SIMD reduction, and implements the null-aware overload using `VectorMask`-based masked accumulation with `notNull.trueCount()` for the non-null check. * wire `Long/Double/FloatSumAggregatorFactory.factorizeVector` to dispatch on `ExpressionProcessing.useVectorApi()`. * add `SimdSumVectorAggregatorTest`, for each of the three types, tries various vector sizes and null patterns and asserts SIMD output matches the scalar reference (exact for long, within relative tolerance for double/float to accommodate SIMD's tree-reduce vs scalar's left-to-right reduce). --- .../query/SqlExpressionBenchmark.java | 2 +- pom.xml | 1 + .../DoubleSumAggregatorFactory.java | 5 + .../FloatSumAggregatorFactory.java | 5 + .../aggregation/LongSumAggregatorFactory.java | 5 + .../NullAwareVectorAggregator.java | 46 +++ .../NullableNumericVectorAggregator.java | 13 +- .../simd/SimdDoubleSumVectorAggregator.java | 98 ++++++ .../simd/SimdFloatSumVectorAggregator.java | 98 ++++++ .../simd/SimdLongSumVectorAggregator.java | 98 ++++++ .../simd/SimdSumVectorAggregatorTest.java | 328 ++++++++++++++++++ 11 files changed, 695 insertions(+), 4 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/query/aggregation/NullAwareVectorAggregator.java create mode 100644 processing/src/main/java/org/apache/druid/query/aggregation/simd/SimdDoubleSumVectorAggregator.java create mode 100644 processing/src/main/java/org/apache/druid/query/aggregation/simd/SimdFloatSumVectorAggregator.java create mode 100644 processing/src/main/java/org/apache/druid/query/aggregation/simd/SimdLongSumVectorAggregator.java create mode 100644 processing/src/test/java/org/apache/druid/query/aggregation/simd/SimdSumVectorAggregatorTest.java diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java index 0ef6395a1fce..cc5084a005fc 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java @@ -50,7 +50,7 @@ public class SqlExpressionBenchmark extends SqlBaseQueryBenchmark // non-expression reference queries // =========================== // 0: non-expression timeseries reference, 1 columns - "SELECT SUM(long1) FROM expressions", + "SELECT SUM(long5) FROM expressions", // 1: non-expression timeseries reference, 2 columns "SELECT SUM(long1), SUM(long2) FROM expressions", // 2: non-expression timeseries reference, 3 columns diff --git a/pom.xml b/pom.xml index bab799d96ad1..e0fa42262484 100644 --- a/pom.xml +++ b/pom.xml @@ -1788,6 +1788,7 @@ **/*_generated*.class **/math/expr/vector/simd/Simd*.class + **/query/aggregation/simd/Simd*.class **.SuppressForbidden diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/DoubleSumAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/DoubleSumAggregatorFactory.java index 6a433b7bf8fb..dd67990fb371 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/DoubleSumAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/DoubleSumAggregatorFactory.java @@ -24,6 +24,8 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Supplier; import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.math.expr.ExpressionProcessing; +import org.apache.druid.query.aggregation.simd.SimdDoubleSumVectorAggregator; import org.apache.druid.segment.BaseDoubleColumnValueSelector; import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import org.apache.druid.segment.vector.VectorValueSelector; @@ -81,6 +83,9 @@ protected VectorAggregator factorizeVector( VectorValueSelector selector ) { + if (ExpressionProcessing.useVectorApi()) { + return new SimdDoubleSumVectorAggregator(selector); + } return new DoubleSumVectorAggregator(selector); } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/FloatSumAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/FloatSumAggregatorFactory.java index 397b55abed78..bbb2ec52f935 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/FloatSumAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/FloatSumAggregatorFactory.java @@ -24,6 +24,8 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Supplier; import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.math.expr.ExpressionProcessing; +import org.apache.druid.query.aggregation.simd.SimdFloatSumVectorAggregator; import org.apache.druid.segment.BaseFloatColumnValueSelector; import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import org.apache.druid.segment.vector.VectorValueSelector; @@ -81,6 +83,9 @@ protected VectorAggregator factorizeVector( VectorValueSelector selector ) { + if (ExpressionProcessing.useVectorApi()) { + return new SimdFloatSumVectorAggregator(selector); + } return new FloatSumVectorAggregator(selector); } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/LongSumAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/LongSumAggregatorFactory.java index 8e70fac2536d..a070fb30d401 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/LongSumAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/LongSumAggregatorFactory.java @@ -24,6 +24,8 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Supplier; import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.math.expr.ExpressionProcessing; +import org.apache.druid.query.aggregation.simd.SimdLongSumVectorAggregator; import org.apache.druid.segment.BaseLongColumnValueSelector; import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import org.apache.druid.segment.vector.VectorValueSelector; @@ -106,6 +108,9 @@ protected VectorAggregator factorizeVector( VectorValueSelector selector ) { + if (ExpressionProcessing.useVectorApi()) { + return new SimdLongSumVectorAggregator(selector); + } return new LongSumVectorAggregator(selector); } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/NullAwareVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/NullAwareVectorAggregator.java new file mode 100644 index 000000000000..de1852a5c6d4 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/NullAwareVectorAggregator.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import java.nio.ByteBuffer; + +/** + * Capability marker for {@link VectorAggregator}s that can efficiently aggregate a contiguous row range while + * skipping null inputs themselves (such as via SIMD masking) rather than relying on the + * {@link NullableNumericVectorAggregator} wrapper to filter rows into the per-row scatter-gather + * {@link VectorAggregator#aggregate(ByteBuffer, int, int[], int[], int) aggregate} variant. + * + * When the wrapper sees the input batch has nulls (i.e. {@code selector.getNullVector() != null}) and the + * delegate is an instance of this interface, it routes the call here instead of falling back to the + * scatter-gather path. + */ +public interface NullAwareVectorAggregator extends VectorAggregator +{ + /** + * Aggregate rows {@code [startRow, endRow)} into the slot at {@code position}, skipping rows where + * {@code nullVector[i] == true}. Implementations should use {@link jdk.incubator.vector.VectorMask} (or + * equivalent) to avoid per-row branching. + * + * @return {@code true} if at least one non-null row was aggregated; {@code false} if every row in the range + * was null. {@link NullableNumericVectorAggregator} uses this to set or leave the null-marker byte + * that precedes the delegate's state. + */ + boolean aggregate(ByteBuffer buf, int position, int startRow, int endRow, boolean[] nullVector); +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericVectorAggregator.java index 9a91176e2ab7..f056dfed3c84 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericVectorAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericVectorAggregator.java @@ -76,7 +76,16 @@ public void init(ByteBuffer buf, int position) public void aggregate(ByteBuffer buf, int position, int startRow, int endRow) { final boolean[] nullVector = selector.getNullVector(); - if (nullVector != null) { + if (nullVector == null) { + doAggregate(buf, position, startRow, endRow); + } else if (delegate instanceof NullAwareVectorAggregator nullAware) { + // Delegate handles null inputs itself (typically via SIMD masking); set the null marker only if it + // reports at least one non-null row contributed. + if (nullAware.aggregate(buf, position + Byte.BYTES, startRow, endRow, nullVector)) { + buf.put(position, TypeStrategies.IS_NOT_NULL_BYTE); + } + } else { + // Fallback: filter non-null rows and route through the scatter-gather variant with a uniform position. // Deferred initialization, since vAggregationPositions and vAggregationRows are only needed if nulls // actually occur. if (vAggregationPositions == null) { @@ -94,8 +103,6 @@ public void aggregate(ByteBuffer buf, int position, int startRow, int endRow) Arrays.fill(vAggregationPositions, 0, j, position); doAggregate(buf, j, vAggregationPositions, vAggregationRows, 0); - } else { - doAggregate(buf, position, startRow, endRow); } } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/simd/SimdDoubleSumVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/simd/SimdDoubleSumVectorAggregator.java new file mode 100644 index 000000000000..ab4a2afb0ccb --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/simd/SimdDoubleSumVectorAggregator.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.simd; + +import jdk.incubator.vector.DoubleVector; +import jdk.incubator.vector.VectorMask; +import jdk.incubator.vector.VectorOperators; +import jdk.incubator.vector.VectorSpecies; +import org.apache.druid.query.aggregation.DoubleSumVectorAggregator; +import org.apache.druid.query.aggregation.NullAwareVectorAggregator; +import org.apache.druid.segment.vector.VectorValueSelector; + +import java.nio.ByteBuffer; + +/** + * SIMD specialization of {@link DoubleSumVectorAggregator}'s ungrouped contiguous-range aggregation. The hot loop + * issues a hardcoded {@link DoubleVector#add} and a {@code reduceLanes(VectorOperators.ADD)} so the JIT emits the + * platform's double-add and double-add-reduce intrinsics. Null lanes are skipped via {@link VectorMask}. + * + * The grouped scatter-gather variant is inherited from the parent (scalar) class. + */ +public final class SimdDoubleSumVectorAggregator extends DoubleSumVectorAggregator implements NullAwareVectorAggregator +{ + private static final VectorSpecies SPECIES = DoubleVector.SPECIES_PREFERRED; + + private final VectorValueSelector selector; + + public SimdDoubleSumVectorAggregator(VectorValueSelector selector) + { + super(selector); + this.selector = selector; + } + + @Override + public void aggregate(ByteBuffer buf, int position, int startRow, int endRow) + { + final double[] vector = selector.getDoubleVector(); + + final int laneCount = SPECIES.length(); + final int upperBound = startRow + SPECIES.loopBound(endRow - startRow); + int i = startRow; + DoubleVector vacc = DoubleVector.zero(SPECIES); + for (; i < upperBound; i += laneCount) { + vacc = vacc.add(DoubleVector.fromArray(SPECIES, vector, i)); + } + double sum = vacc.reduceLanes(VectorOperators.ADD); + for (; i < endRow; i++) { + sum += vector[i]; + } + buf.putDouble(position, buf.getDouble(position) + sum); + } + + @Override + public boolean aggregate(ByteBuffer buf, int position, int startRow, int endRow, boolean[] nullVector) + { + final double[] vector = selector.getDoubleVector(); + + final int laneCount = SPECIES.length(); + final int upperBound = startRow + SPECIES.loopBound(endRow - startRow); + int i = startRow; + DoubleVector vacc = DoubleVector.zero(SPECIES); + int nonNullCount = 0; + for (; i < upperBound; i += laneCount) { + final VectorMask notNull = VectorMask.fromArray(SPECIES, nullVector, i).not(); + vacc = vacc.add(DoubleVector.fromArray(SPECIES, vector, i), notNull); + nonNullCount += notNull.trueCount(); + } + double sum = vacc.reduceLanes(VectorOperators.ADD); + for (; i < endRow; i++) { + if (!nullVector[i]) { + sum += vector[i]; + nonNullCount++; + } + } + if (nonNullCount > 0) { + buf.putDouble(position, buf.getDouble(position) + sum); + return true; + } + return false; + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/simd/SimdFloatSumVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/simd/SimdFloatSumVectorAggregator.java new file mode 100644 index 000000000000..c9b843ca3ed3 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/simd/SimdFloatSumVectorAggregator.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.simd; + +import jdk.incubator.vector.FloatVector; +import jdk.incubator.vector.VectorMask; +import jdk.incubator.vector.VectorOperators; +import jdk.incubator.vector.VectorSpecies; +import org.apache.druid.query.aggregation.FloatSumVectorAggregator; +import org.apache.druid.query.aggregation.NullAwareVectorAggregator; +import org.apache.druid.segment.vector.VectorValueSelector; + +import java.nio.ByteBuffer; + +/** + * SIMD specialization of {@link FloatSumVectorAggregator}'s ungrouped contiguous-range aggregation. The hot loop + * issues a hardcoded {@link FloatVector#add} and a {@code reduceLanes(VectorOperators.ADD)} so the JIT emits the + * platform's float-add and float-add-reduce intrinsics. Null lanes are skipped via {@link VectorMask}. + * + * The grouped scatter-gather variant is inherited from the parent (scalar) class. + */ +public final class SimdFloatSumVectorAggregator extends FloatSumVectorAggregator implements NullAwareVectorAggregator +{ + private static final VectorSpecies SPECIES = FloatVector.SPECIES_PREFERRED; + + private final VectorValueSelector selector; + + public SimdFloatSumVectorAggregator(VectorValueSelector selector) + { + super(selector); + this.selector = selector; + } + + @Override + public void aggregate(ByteBuffer buf, int position, int startRow, int endRow) + { + final float[] vector = selector.getFloatVector(); + + final int laneCount = SPECIES.length(); + final int upperBound = startRow + SPECIES.loopBound(endRow - startRow); + int i = startRow; + FloatVector vacc = FloatVector.zero(SPECIES); + for (; i < upperBound; i += laneCount) { + vacc = vacc.add(FloatVector.fromArray(SPECIES, vector, i)); + } + float sum = vacc.reduceLanes(VectorOperators.ADD); + for (; i < endRow; i++) { + sum += vector[i]; + } + buf.putFloat(position, buf.getFloat(position) + sum); + } + + @Override + public boolean aggregate(ByteBuffer buf, int position, int startRow, int endRow, boolean[] nullVector) + { + final float[] vector = selector.getFloatVector(); + + final int laneCount = SPECIES.length(); + final int upperBound = startRow + SPECIES.loopBound(endRow - startRow); + int i = startRow; + FloatVector vacc = FloatVector.zero(SPECIES); + int nonNullCount = 0; + for (; i < upperBound; i += laneCount) { + final VectorMask notNull = VectorMask.fromArray(SPECIES, nullVector, i).not(); + vacc = vacc.add(FloatVector.fromArray(SPECIES, vector, i), notNull); + nonNullCount += notNull.trueCount(); + } + float sum = vacc.reduceLanes(VectorOperators.ADD); + for (; i < endRow; i++) { + if (!nullVector[i]) { + sum += vector[i]; + nonNullCount++; + } + } + if (nonNullCount > 0) { + buf.putFloat(position, buf.getFloat(position) + sum); + return true; + } + return false; + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/simd/SimdLongSumVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/simd/SimdLongSumVectorAggregator.java new file mode 100644 index 000000000000..a16355618e46 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/simd/SimdLongSumVectorAggregator.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.simd; + +import jdk.incubator.vector.LongVector; +import jdk.incubator.vector.VectorMask; +import jdk.incubator.vector.VectorOperators; +import jdk.incubator.vector.VectorSpecies; +import org.apache.druid.query.aggregation.LongSumVectorAggregator; +import org.apache.druid.query.aggregation.NullAwareVectorAggregator; +import org.apache.druid.segment.vector.VectorValueSelector; + +import java.nio.ByteBuffer; + +/** + * SIMD specialization of {@link LongSumVectorAggregator}'s ungrouped contiguous-range aggregation. The hot loop + * issues a hardcoded {@link LongVector#add} and a {@code reduceLanes(VectorOperators.ADD)} so the JIT emits the + * platform's long-add and long-add-reduce intrinsics. Null lanes are skipped via {@link VectorMask}. + * + * The grouped scatter-gather variant is inherited from the parent (scalar) class. + */ +public final class SimdLongSumVectorAggregator extends LongSumVectorAggregator implements NullAwareVectorAggregator +{ + private static final VectorSpecies SPECIES = LongVector.SPECIES_PREFERRED; + + private final VectorValueSelector selector; + + public SimdLongSumVectorAggregator(VectorValueSelector selector) + { + super(selector); + this.selector = selector; + } + + @Override + public void aggregate(ByteBuffer buf, int position, int startRow, int endRow) + { + final long[] vector = selector.getLongVector(); + + final int laneCount = SPECIES.length(); + final int upperBound = startRow + SPECIES.loopBound(endRow - startRow); + int i = startRow; + LongVector vacc = LongVector.zero(SPECIES); + for (; i < upperBound; i += laneCount) { + vacc = vacc.add(LongVector.fromArray(SPECIES, vector, i)); + } + long sum = vacc.reduceLanes(VectorOperators.ADD); + for (; i < endRow; i++) { + sum += vector[i]; + } + buf.putLong(position, buf.getLong(position) + sum); + } + + @Override + public boolean aggregate(ByteBuffer buf, int position, int startRow, int endRow, boolean[] nullVector) + { + final long[] vector = selector.getLongVector(); + + final int laneCount = SPECIES.length(); + final int upperBound = startRow + SPECIES.loopBound(endRow - startRow); + int i = startRow; + LongVector vacc = LongVector.zero(SPECIES); + int nonNullCount = 0; + for (; i < upperBound; i += laneCount) { + final VectorMask notNull = VectorMask.fromArray(SPECIES, nullVector, i).not(); + vacc = vacc.add(LongVector.fromArray(SPECIES, vector, i), notNull); + nonNullCount += notNull.trueCount(); + } + long sum = vacc.reduceLanes(VectorOperators.ADD); + for (; i < endRow; i++) { + if (!nullVector[i]) { + sum += vector[i]; + nonNullCount++; + } + } + if (nonNullCount > 0) { + buf.putLong(position, buf.getLong(position) + sum); + return true; + } + return false; + } +} diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/simd/SimdSumVectorAggregatorTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/simd/SimdSumVectorAggregatorTest.java new file mode 100644 index 000000000000..dce2caf94d07 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/aggregation/simd/SimdSumVectorAggregatorTest.java @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.simd; + +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.aggregation.DoubleSumVectorAggregator; +import org.apache.druid.query.aggregation.FloatSumVectorAggregator; +import org.apache.druid.query.aggregation.LongSumVectorAggregator; +import org.apache.druid.segment.vector.VectorValueSelector; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.junit.Assert; +import org.junit.Test; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.util.Random; +import java.util.function.IntPredicate; + +/** + * For each (sum type, vector size, null pattern) combination, drives the SIMD and scalar sum vector aggregators + * directly and asserts equivalent buffer state. Two paths covered: + * - the ungrouped no-null path (SIMD subclass's overridden {@code aggregate(buf, pos, start, end)} vs the scalar + * parent's implementation), and + * - the null-aware path on the SIMD class (the new {@code aggregate(buf, pos, start, end, nullVector)} declared by + * {@link org.apache.druid.query.aggregation.NullAwareVectorAggregator}), compared against a manually-computed + * reference sum. + */ +public class SimdSumVectorAggregatorTest extends InitializedNullHandlingTest +{ + private static final int[] VECTOR_SIZES = {1, 8, 17, 64, 1023}; + + @Test + public void testLongSum() + { + for (int size : VECTOR_SIZES) { + for (NullPattern pattern : NullPattern.values()) { + runLong(size, pattern); + } + } + } + + @Test + public void testDoubleSum() + { + for (int size : VECTOR_SIZES) { + for (NullPattern pattern : NullPattern.values()) { + runDouble(size, pattern); + } + } + } + + @Test + public void testFloatSum() + { + for (int size : VECTOR_SIZES) { + for (NullPattern pattern : NullPattern.values()) { + runFloat(size, pattern); + } + } + } + + private static void runLong(int size, NullPattern pattern) + { + final long[] values = randomLongs(size, 0); + final boolean[] nulls = pattern.toMask(size); + final FakeVectorValueSelector selector = new FakeVectorValueSelector(size, values, null, null, nulls); + final String msg = StringUtils.format("type[long] size[%s] nulls[%s]", size, pattern); + + final LongSumVectorAggregator scalar = new LongSumVectorAggregator(selector); + final SimdLongSumVectorAggregator simd = new SimdLongSumVectorAggregator(selector); + + if (nulls == null) { + final ByteBuffer scalarBuf = ByteBuffer.allocate(Long.BYTES); + final ByteBuffer simdBuf = ByteBuffer.allocate(Long.BYTES); + scalar.init(scalarBuf, 0); + simd.init(simdBuf, 0); + scalar.aggregate(scalarBuf, 0, 0, size); + simd.aggregate(simdBuf, 0, 0, size); + Assert.assertEquals(msg, scalarBuf.getLong(0), simdBuf.getLong(0)); + } else { + long expected = 0; + boolean anyNonNull = false; + for (int i = 0; i < size; i++) { + if (!nulls[i]) { + expected += values[i]; + anyNonNull = true; + } + } + final ByteBuffer simdBuf = ByteBuffer.allocate(Long.BYTES); + simd.init(simdBuf, 0); + final boolean reported = simd.aggregate(simdBuf, 0, 0, size, nulls); + Assert.assertEquals(msg + " (anyNonNull)", anyNonNull, reported); + if (reported) { + Assert.assertEquals(msg, expected, simdBuf.getLong(0)); + } + } + } + + private static void runDouble(int size, NullPattern pattern) + { + final double[] values = randomDoubles(size, 1); + final boolean[] nulls = pattern.toMask(size); + final FakeVectorValueSelector selector = new FakeVectorValueSelector(size, null, values, null, nulls); + final String msg = StringUtils.format("type[double] size[%s] nulls[%s]", size, pattern); + + final DoubleSumVectorAggregator scalar = new DoubleSumVectorAggregator(selector); + final SimdDoubleSumVectorAggregator simd = new SimdDoubleSumVectorAggregator(selector); + + if (nulls == null) { + final ByteBuffer scalarBuf = ByteBuffer.allocate(Double.BYTES); + final ByteBuffer simdBuf = ByteBuffer.allocate(Double.BYTES); + scalar.init(scalarBuf, 0); + simd.init(simdBuf, 0); + scalar.aggregate(scalarBuf, 0, 0, size); + simd.aggregate(simdBuf, 0, 0, size); + Assert.assertEquals( + msg, + scalarBuf.getDouble(0), + simdBuf.getDouble(0), + Math.max(Math.abs(scalarBuf.getDouble(0)) * 1e-12, 1e-12) + ); + } else { + double expected = 0; + boolean anyNonNull = false; + for (int i = 0; i < size; i++) { + if (!nulls[i]) { + expected += values[i]; + anyNonNull = true; + } + } + final ByteBuffer simdBuf = ByteBuffer.allocate(Double.BYTES); + simd.init(simdBuf, 0); + final boolean reported = simd.aggregate(simdBuf, 0, 0, size, nulls); + Assert.assertEquals(msg + " (anyNonNull)", anyNonNull, reported); + if (reported) { + Assert.assertEquals(msg, expected, simdBuf.getDouble(0), Math.max(Math.abs(expected) * 1e-12, 1e-12)); + } + } + } + + private static void runFloat(int size, NullPattern pattern) + { + final float[] values = randomFloats(size, 2); + final boolean[] nulls = pattern.toMask(size); + final FakeVectorValueSelector selector = new FakeVectorValueSelector(size, null, null, values, nulls); + final String msg = StringUtils.format("type[float] size[%s] nulls[%s]", size, pattern); + + final FloatSumVectorAggregator scalar = new FloatSumVectorAggregator(selector); + final SimdFloatSumVectorAggregator simd = new SimdFloatSumVectorAggregator(selector); + + if (nulls == null) { + final ByteBuffer scalarBuf = ByteBuffer.allocate(Float.BYTES); + final ByteBuffer simdBuf = ByteBuffer.allocate(Float.BYTES); + scalar.init(scalarBuf, 0); + simd.init(simdBuf, 0); + scalar.aggregate(scalarBuf, 0, 0, size); + simd.aggregate(simdBuf, 0, 0, size); + Assert.assertEquals( + msg, + scalarBuf.getFloat(0), + simdBuf.getFloat(0), + Math.max(Math.abs(scalarBuf.getFloat(0)) * 1e-5f, 1e-5f) + ); + } else { + float expected = 0; + boolean anyNonNull = false; + for (int i = 0; i < size; i++) { + if (!nulls[i]) { + expected += values[i]; + anyNonNull = true; + } + } + final ByteBuffer simdBuf = ByteBuffer.allocate(Float.BYTES); + simd.init(simdBuf, 0); + final boolean reported = simd.aggregate(simdBuf, 0, 0, size, nulls); + Assert.assertEquals(msg + " (anyNonNull)", anyNonNull, reported); + if (reported) { + Assert.assertEquals(msg, expected, simdBuf.getFloat(0), Math.max(Math.abs(expected) * 1e-5f, 1e-5f)); + } + } + } + + private static long[] randomLongs(int size, int seed) + { + final Random r = new Random(0xC0FFEEL + seed); + final long[] out = new long[size]; + for (int i = 0; i < size; i++) { + out[i] = r.nextInt() & 0xFFFFFL; + } + return out; + } + + private static double[] randomDoubles(int size, int seed) + { + final Random r = new Random(0xC0FFEEL + seed); + final double[] out = new double[size]; + for (int i = 0; i < size; i++) { + out[i] = (r.nextDouble() - 0.5) * 1000.0; + } + return out; + } + + private static float[] randomFloats(int size, int seed) + { + final Random r = new Random(0xC0FFEEL + seed); + final float[] out = new float[size]; + for (int i = 0; i < size; i++) { + out[i] = (r.nextFloat() - 0.5f) * 1000.0f; + } + return out; + } + + private enum NullPattern + { + NONE(i -> false), + ALL(i -> true), + ALTERNATING(i -> (i & 1) == 0), + SPARSE(i -> i % 7 == 0), + FIRST_THREE(i -> i < 3), + CHUNK_BOUNDARY(i -> i == 7 || i == 8); + + private final IntPredicate predicate; + + NullPattern(IntPredicate predicate) + { + this.predicate = predicate; + } + + @Nullable + boolean[] toMask(int size) + { + if (this == NONE) { + return null; // models a column with no null vector at all + } + final boolean[] mask = new boolean[size]; + for (int i = 0; i < size; i++) { + mask[i] = predicate.test(i); + } + return mask; + } + } + + /** + * Minimal in-memory {@link VectorValueSelector} backed by pre-built primitive arrays for tests. Only the + * accessor for the type used by a given test is non-null. + */ + private static final class FakeVectorValueSelector implements VectorValueSelector + { + private final int size; + @Nullable + private final long[] longs; + @Nullable + private final double[] doubles; + @Nullable + private final float[] floats; + @Nullable + private final boolean[] nulls; + + FakeVectorValueSelector( + int size, + @Nullable long[] longs, + @Nullable double[] doubles, + @Nullable float[] floats, + @Nullable boolean[] nulls + ) + { + this.size = size; + this.longs = longs; + this.doubles = doubles; + this.floats = floats; + this.nulls = nulls; + } + + @Override + public long[] getLongVector() + { + return longs; + } + + @Override + public float[] getFloatVector() + { + return floats; + } + + @Override + public double[] getDoubleVector() + { + return doubles; + } + + @Nullable + @Override + public boolean[] getNullVector() + { + return nulls; + } + + @Override + public int getMaxVectorSize() + { + return size; + } + + @Override + public int getCurrentVectorSize() + { + return size; + } + } +} From cebc8a930373c319bd06b9c09d6d4585e936574b Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Mon, 15 Jun 2026 00:51:40 -0700 Subject: [PATCH 2/4] review stuff --- docs/configuration/index.md | 12 ++ .../simd/SimdSumVectorAggregatorTest.java | 180 ++++++++++++------ .../query/groupby/GroupByQueryRunnerTest.java | 41 +++- .../GroupByTimeseriesQueryRunnerTest.java | 18 +- .../timeseries/TimeseriesQueryRunnerTest.java | 34 +++- 5 files changed, 213 insertions(+), 72 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index aad78964f199..1f008f6f05e2 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -640,6 +640,18 @@ the following properties. JavaScript-based functionality is disabled by default. Please refer to the Druid [JavaScript programming guide](../development/javascript.md) for guidelines about using Druid's JavaScript functionality, including instructions on how to enable it. ::: +### Expression processing + +The properties below tune Druid's native expression engine, which evaluates virtual columns, expression-based filters, +the `expression` aggregator/post-aggregator, and any SQL functions that lower to native expressions. + +|Property|Description|Default| +|--------|-----------|-------| +|`druid.expressions.processArraysAsMultiValueStrings`|If true, all `ARRAY` typed values are converted to `STRING` by column selectors and treated as multi-value strings rather than native arrays. Provided for backwards compatibility with the pre-array behavior in Druid 24.0 and earlier.|false| +|`druid.expressions.homogenizeNullMultiValueStringArrays`|If true, multi-value string expression input values of `null`, `[]`, and `[null]` are all coerced to `[null]`. Provided for backwards compatibility with Druid 0.22 and earlier. If false (the default), this coercion only happens when single-value expressions are implicitly mapped across multi-value rows, so the single-valued expression is evaluated with an input of `null`.|false| +|`druid.expressions.allowVectorizeFallback`|If true, the vectorized query engine handles expressions without a native vectorized implementation using a fallback processor that invokes the scalar expression evaluator in a loop. If false, such expressions cannot be vectorized and the query falls back to the non-vectorized engine.|true| +|`druid.expressions.useVectorApi`|If true, vectorized expression vector processors and numeric vector aggregators dispatch to SIMD specializations backed by the JDK incubator Vector API (`jdk.incubator.vector`) where available. Requires `--add-modules=jdk.incubator.vector` on the JVM command line (see [strong encapsulation](../operations/java.md#strong-encapsulation)). Off by default while the Vector API remains an incubator JDK feature.|false| + ### Double column storage Prior to version 0.13.0, Druid's storage layer used a 32-bit float representation to store columns created by the diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/simd/SimdSumVectorAggregatorTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/simd/SimdSumVectorAggregatorTest.java index dce2caf94d07..19e54b075a36 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/simd/SimdSumVectorAggregatorTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/simd/SimdSumVectorAggregatorTest.java @@ -41,17 +41,26 @@ * - the null-aware path on the SIMD class (the new {@code aggregate(buf, pos, start, end, nullVector)} declared by * {@link org.apache.druid.query.aggregation.NullAwareVectorAggregator}), compared against a manually-computed * reference sum. + * + * Each scenario is exercised twice: once with {@code (position=0, startRow=0, endRow=size)} and once with + * {@code (position=1, startRow=1, endRow=size+1)} where the row at index 0 is a deliberately extreme "poison" + * value that would visibly skew the result if the aggregator incorrectly read past {@code startRow}, and the + * buffer slot starts at byte offset 1 so any indexing off the position parameter shows up. */ public class SimdSumVectorAggregatorTest extends InitializedNullHandlingTest { private static final int[] VECTOR_SIZES = {1, 8, 17, 64, 1023}; + private static final long POISON_LONG = Long.MAX_VALUE / 2; + private static final double POISON_DOUBLE = 1e15; + private static final float POISON_FLOAT = 1e10f; @Test public void testLongSum() { for (int size : VECTOR_SIZES) { for (NullPattern pattern : NullPattern.values()) { - runLong(size, pattern); + runLong(size, pattern, 0, 0); + runLong(size, pattern, 1, 1); } } } @@ -61,7 +70,8 @@ public void testDoubleSum() { for (int size : VECTOR_SIZES) { for (NullPattern pattern : NullPattern.values()) { - runDouble(size, pattern); + runDouble(size, pattern, 0, 0); + runDouble(size, pattern, 1, 1); } } } @@ -71,132 +81,186 @@ public void testFloatSum() { for (int size : VECTOR_SIZES) { for (NullPattern pattern : NullPattern.values()) { - runFloat(size, pattern); + runFloat(size, pattern, 0, 0); + runFloat(size, pattern, 1, 1); } } } - private static void runLong(int size, NullPattern pattern) + private static void runLong(int size, NullPattern pattern, int position, int startRow) { - final long[] values = randomLongs(size, 0); - final boolean[] nulls = pattern.toMask(size); - final FakeVectorValueSelector selector = new FakeVectorValueSelector(size, values, null, null, nulls); - final String msg = StringUtils.format("type[long] size[%s] nulls[%s]", size, pattern); + final int arrLen = startRow + size; + final long[] values = new long[arrLen]; + for (int i = 0; i < startRow; i++) { + values[i] = POISON_LONG; + } + System.arraycopy(randomLongs(size, 0), 0, values, startRow, size); + + final boolean[] realNulls = pattern.toMask(size); + final boolean[] nulls = realNulls == null ? null : padNulls(realNulls, startRow); + + final FakeVectorValueSelector selector = new FakeVectorValueSelector(arrLen, values, null, null, nulls); + final int endRow = startRow + size; + final String msg = StringUtils.format( + "type[long] size[%s] nulls[%s] pos[%s] start[%s]", + size, pattern, position, startRow + ); final LongSumVectorAggregator scalar = new LongSumVectorAggregator(selector); final SimdLongSumVectorAggregator simd = new SimdLongSumVectorAggregator(selector); if (nulls == null) { - final ByteBuffer scalarBuf = ByteBuffer.allocate(Long.BYTES); - final ByteBuffer simdBuf = ByteBuffer.allocate(Long.BYTES); - scalar.init(scalarBuf, 0); - simd.init(simdBuf, 0); - scalar.aggregate(scalarBuf, 0, 0, size); - simd.aggregate(simdBuf, 0, 0, size); - Assert.assertEquals(msg, scalarBuf.getLong(0), simdBuf.getLong(0)); + final ByteBuffer scalarBuf = ByteBuffer.allocate(position + Long.BYTES); + final ByteBuffer simdBuf = ByteBuffer.allocate(position + Long.BYTES); + scalar.init(scalarBuf, position); + simd.init(simdBuf, position); + scalar.aggregate(scalarBuf, position, startRow, endRow); + simd.aggregate(simdBuf, position, startRow, endRow); + Assert.assertEquals(msg, scalarBuf.getLong(position), simdBuf.getLong(position)); } else { long expected = 0; boolean anyNonNull = false; - for (int i = 0; i < size; i++) { + for (int i = startRow; i < endRow; i++) { if (!nulls[i]) { expected += values[i]; anyNonNull = true; } } - final ByteBuffer simdBuf = ByteBuffer.allocate(Long.BYTES); - simd.init(simdBuf, 0); - final boolean reported = simd.aggregate(simdBuf, 0, 0, size, nulls); + final ByteBuffer simdBuf = ByteBuffer.allocate(position + Long.BYTES); + simd.init(simdBuf, position); + final boolean reported = simd.aggregate(simdBuf, position, startRow, endRow, nulls); Assert.assertEquals(msg + " (anyNonNull)", anyNonNull, reported); if (reported) { - Assert.assertEquals(msg, expected, simdBuf.getLong(0)); + Assert.assertEquals(msg, expected, simdBuf.getLong(position)); } } } - private static void runDouble(int size, NullPattern pattern) + private static void runDouble(int size, NullPattern pattern, int position, int startRow) { - final double[] values = randomDoubles(size, 1); - final boolean[] nulls = pattern.toMask(size); - final FakeVectorValueSelector selector = new FakeVectorValueSelector(size, null, values, null, nulls); - final String msg = StringUtils.format("type[double] size[%s] nulls[%s]", size, pattern); + final int arrLen = startRow + size; + final double[] values = new double[arrLen]; + for (int i = 0; i < startRow; i++) { + values[i] = POISON_DOUBLE; + } + System.arraycopy(randomDoubles(size, 1), 0, values, startRow, size); + + final boolean[] realNulls = pattern.toMask(size); + final boolean[] nulls = realNulls == null ? null : padNulls(realNulls, startRow); + + final FakeVectorValueSelector selector = new FakeVectorValueSelector(arrLen, null, values, null, nulls); + final int endRow = startRow + size; + final String msg = StringUtils.format( + "type[double] size[%s] nulls[%s] pos[%s] start[%s]", + size, pattern, position, startRow + ); final DoubleSumVectorAggregator scalar = new DoubleSumVectorAggregator(selector); final SimdDoubleSumVectorAggregator simd = new SimdDoubleSumVectorAggregator(selector); if (nulls == null) { - final ByteBuffer scalarBuf = ByteBuffer.allocate(Double.BYTES); - final ByteBuffer simdBuf = ByteBuffer.allocate(Double.BYTES); - scalar.init(scalarBuf, 0); - simd.init(simdBuf, 0); - scalar.aggregate(scalarBuf, 0, 0, size); - simd.aggregate(simdBuf, 0, 0, size); + final ByteBuffer scalarBuf = ByteBuffer.allocate(position + Double.BYTES); + final ByteBuffer simdBuf = ByteBuffer.allocate(position + Double.BYTES); + scalar.init(scalarBuf, position); + simd.init(simdBuf, position); + scalar.aggregate(scalarBuf, position, startRow, endRow); + simd.aggregate(simdBuf, position, startRow, endRow); Assert.assertEquals( msg, - scalarBuf.getDouble(0), - simdBuf.getDouble(0), - Math.max(Math.abs(scalarBuf.getDouble(0)) * 1e-12, 1e-12) + scalarBuf.getDouble(position), + simdBuf.getDouble(position), + Math.max(Math.abs(scalarBuf.getDouble(position)) * 1e-12, 1e-12) ); } else { double expected = 0; boolean anyNonNull = false; - for (int i = 0; i < size; i++) { + for (int i = startRow; i < endRow; i++) { if (!nulls[i]) { expected += values[i]; anyNonNull = true; } } - final ByteBuffer simdBuf = ByteBuffer.allocate(Double.BYTES); - simd.init(simdBuf, 0); - final boolean reported = simd.aggregate(simdBuf, 0, 0, size, nulls); + final ByteBuffer simdBuf = ByteBuffer.allocate(position + Double.BYTES); + simd.init(simdBuf, position); + final boolean reported = simd.aggregate(simdBuf, position, startRow, endRow, nulls); Assert.assertEquals(msg + " (anyNonNull)", anyNonNull, reported); if (reported) { - Assert.assertEquals(msg, expected, simdBuf.getDouble(0), Math.max(Math.abs(expected) * 1e-12, 1e-12)); + Assert.assertEquals( + msg, + expected, + simdBuf.getDouble(position), + Math.max(Math.abs(expected) * 1e-12, 1e-12) + ); } } } - private static void runFloat(int size, NullPattern pattern) + private static void runFloat(int size, NullPattern pattern, int position, int startRow) { - final float[] values = randomFloats(size, 2); - final boolean[] nulls = pattern.toMask(size); - final FakeVectorValueSelector selector = new FakeVectorValueSelector(size, null, null, values, nulls); - final String msg = StringUtils.format("type[float] size[%s] nulls[%s]", size, pattern); + final int arrLen = startRow + size; + final float[] values = new float[arrLen]; + for (int i = 0; i < startRow; i++) { + values[i] = POISON_FLOAT; + } + System.arraycopy(randomFloats(size, 2), 0, values, startRow, size); + + final boolean[] realNulls = pattern.toMask(size); + final boolean[] nulls = realNulls == null ? null : padNulls(realNulls, startRow); + + final FakeVectorValueSelector selector = new FakeVectorValueSelector(arrLen, null, null, values, nulls); + final int endRow = startRow + size; + final String msg = StringUtils.format( + "type[float] size[%s] nulls[%s] pos[%s] start[%s]", + size, pattern, position, startRow + ); final FloatSumVectorAggregator scalar = new FloatSumVectorAggregator(selector); final SimdFloatSumVectorAggregator simd = new SimdFloatSumVectorAggregator(selector); if (nulls == null) { - final ByteBuffer scalarBuf = ByteBuffer.allocate(Float.BYTES); - final ByteBuffer simdBuf = ByteBuffer.allocate(Float.BYTES); - scalar.init(scalarBuf, 0); - simd.init(simdBuf, 0); - scalar.aggregate(scalarBuf, 0, 0, size); - simd.aggregate(simdBuf, 0, 0, size); + final ByteBuffer scalarBuf = ByteBuffer.allocate(position + Float.BYTES); + final ByteBuffer simdBuf = ByteBuffer.allocate(position + Float.BYTES); + scalar.init(scalarBuf, position); + simd.init(simdBuf, position); + scalar.aggregate(scalarBuf, position, startRow, endRow); + simd.aggregate(simdBuf, position, startRow, endRow); Assert.assertEquals( msg, - scalarBuf.getFloat(0), - simdBuf.getFloat(0), - Math.max(Math.abs(scalarBuf.getFloat(0)) * 1e-5f, 1e-5f) + scalarBuf.getFloat(position), + simdBuf.getFloat(position), + Math.max(Math.abs(scalarBuf.getFloat(position)) * 1e-5f, 1e-5f) ); } else { float expected = 0; boolean anyNonNull = false; - for (int i = 0; i < size; i++) { + for (int i = startRow; i < endRow; i++) { if (!nulls[i]) { expected += values[i]; anyNonNull = true; } } - final ByteBuffer simdBuf = ByteBuffer.allocate(Float.BYTES); - simd.init(simdBuf, 0); - final boolean reported = simd.aggregate(simdBuf, 0, 0, size, nulls); + final ByteBuffer simdBuf = ByteBuffer.allocate(position + Float.BYTES); + simd.init(simdBuf, position); + final boolean reported = simd.aggregate(simdBuf, position, startRow, endRow, nulls); Assert.assertEquals(msg + " (anyNonNull)", anyNonNull, reported); if (reported) { - Assert.assertEquals(msg, expected, simdBuf.getFloat(0), Math.max(Math.abs(expected) * 1e-5f, 1e-5f)); + Assert.assertEquals( + msg, + expected, + simdBuf.getFloat(position), + Math.max(Math.abs(expected) * 1e-5f, 1e-5f) + ); } } } + private static boolean[] padNulls(boolean[] realNulls, int startRow) + { + final boolean[] padded = new boolean[startRow + realNulls.length]; + System.arraycopy(realNulls, 0, padded, startRow, realNulls.length); + return padded; + } + private static long[] randomLongs(int size, int seed) { final Random r = new Random(0xC0FFEEL + seed); diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index b0fc2c51583a..cd1102a03300 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -49,6 +49,7 @@ import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.js.JavaScriptConfig; import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.math.expr.ExpressionProcessing; import org.apache.druid.query.BySegmentResultValue; import org.apache.druid.query.BySegmentResultValueClass; import org.apache.druid.query.ChainedExecutionQueryRunner; @@ -144,9 +145,11 @@ import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.joda.time.Period; +import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Assume; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Rule; @@ -211,6 +214,7 @@ public int getNumThreads() private final GroupByQueryConfig config; private final boolean vectorize; private final GroupByStatsProvider statsProvider; + private final boolean useVectorApi; @Rule public ExpectedException expectedException = ExpectedException.none(); @@ -433,11 +437,20 @@ public static Collection constructorFeeder() final GroupByQueryRunnerFactory factory = makeQueryRunnerFactory(config, BUFFER_POOLS, statsProvider); for (QueryRunner runner : QueryRunnerTestHelper.makeQueryRunners(factory, true)) { for (boolean vectorize : ImmutableList.of(false, true)) { - final String testName = StringUtils.format("config=%s, runner=%s, vectorize=%s", config, runner, vectorize); + for (boolean useVectorApi : ImmutableList.of(false, true)) { + if (!vectorize && useVectorApi) { + // SIMD path is reachable only when vectorization is on; skip the redundant combo. + continue; + } + final String testName = StringUtils.format( + "config=%s, runner=%s, vectorize=%s, useVectorApi=%s", + config, runner, vectorize, useVectorApi + ); - // Add vectorization tests for any indexes that support it. - if (!vectorize || (QueryRunnerTestHelper.isTestRunnerVectorizable(runner))) { - constructors.add(new Object[]{testName, config, factory, runner, vectorize, statsProvider}); + // Add vectorization tests for any indexes that support it. + if (!vectorize || (QueryRunnerTestHelper.isTestRunnerVectorizable(runner))) { + constructors.add(new Object[]{testName, config, factory, runner, vectorize, statsProvider, useVectorApi}); + } } } } @@ -468,7 +481,8 @@ public GroupByQueryRunnerTest( GroupByQueryRunnerFactory factory, TestQueryRunner runner, boolean vectorize, - GroupByStatsProvider statsProvider + GroupByStatsProvider statsProvider, + boolean useVectorApi ) { this.config = config; @@ -477,6 +491,23 @@ public GroupByQueryRunnerTest( this.originalRunner = runner; this.vectorize = vectorize; this.statsProvider = statsProvider; + this.useVectorApi = useVectorApi; + } + + @Before + public void initializeExpressionProcessing() + { + if (useVectorApi) { + ExpressionProcessing.initializeForVectorApiTests(); + } else { + ExpressionProcessing.initializeForTests(); + } + } + + @After + public void resetExpressionProcessing() + { + ExpressionProcessing.initializeForTests(); } @Test diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java index 6b2a9cb51207..9b682ba87b95 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java @@ -90,7 +90,7 @@ public static void tearDownClass() } @SuppressWarnings("unchecked") - @Parameterized.Parameters(name = "{0}, vectorize = {1}") + @Parameterized.Parameters(name = "{0}, vectorize = {1}, useVectorApi = {2}") public static Iterable constructorFeeder() { setUpClass(); @@ -183,9 +183,15 @@ public String toString() }; for (boolean vectorize : ImmutableList.of(false, true)) { - // Add vectorization tests for any indexes that support it. - if (!vectorize || QueryRunnerTestHelper.isTestRunnerVectorizable(runner)) { - constructors.add(new Object[]{modifiedRunner, vectorize}); + for (boolean useVectorApi : ImmutableList.of(false, true)) { + if (!vectorize && useVectorApi) { + // SIMD path is reachable only when vectorization is on; skip the redundant combo. + continue; + } + // Add vectorization tests for any indexes that support it. + if (!vectorize || QueryRunnerTestHelper.isTestRunnerVectorizable(runner)) { + constructors.add(new Object[]{modifiedRunner, vectorize, useVectorApi}); + } } } } @@ -193,9 +199,9 @@ public String toString() return constructors; } - public GroupByTimeseriesQueryRunnerTest(QueryRunner runner, boolean vectorize) + public GroupByTimeseriesQueryRunnerTest(QueryRunner runner, boolean vectorize, boolean useVectorApi) { - super(runner, false, vectorize, QueryRunnerTestHelper.COMMON_DOUBLE_AGGREGATORS); + super(runner, false, vectorize, QueryRunnerTestHelper.COMMON_DOUBLE_AGGREGATORS, useVectorApi); } // GroupBy handles timestamps differently when granularity is ALL diff --git a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java index 52cc5b149668..27697e572210 100644 --- a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java @@ -80,7 +80,9 @@ import org.joda.time.DateTimeZone; import org.joda.time.Interval; import org.joda.time.Period; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -106,7 +108,7 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest @Rule public ExpectedException expectedException = ExpectedException.none(); - @Parameterized.Parameters(name = "{0}:descending={1},vectorize={2}") + @Parameterized.Parameters(name = "{0}:descending={1},vectorize={2},useVectorApi={4}") public static Iterable constructorFeeder() { final Iterable baseConstructors = QueryRunnerTestHelper.cartesian( @@ -124,7 +126,9 @@ public static Iterable constructorFeeder() // vectorize? Arrays.asList(false, true), // double vs. float - Arrays.asList(QueryRunnerTestHelper.COMMON_DOUBLE_AGGREGATORS, QueryRunnerTestHelper.COMMON_FLOAT_AGGREGATORS) + Arrays.asList(QueryRunnerTestHelper.COMMON_DOUBLE_AGGREGATORS, QueryRunnerTestHelper.COMMON_FLOAT_AGGREGATORS), + // useVectorApi? (SIMD aggregators) + Arrays.asList(false, true) ); // Add vectorization tests for any indexes that support it. @@ -136,6 +140,11 @@ public static Iterable constructorFeeder() QueryRunnerTestHelper.isTestRunnerVectorizable((QueryRunner) constructor[0]) && !(boolean) constructor[1] /* descending */; final boolean vectorize = (boolean) constructor[2]; /* vectorize */ + final boolean useVectorApi = (boolean) constructor[4]; /* useVectorApi */ + if (!vectorize && useVectorApi) { + // SIMD path is reachable only when vectorization is on; skip the redundant combo. + return false; + } return !vectorize || canVectorize; } ) @@ -154,18 +163,37 @@ protected void assertExpectedResults(Iterable> expectedResults, It protected final boolean descending; protected final boolean vectorize; protected final List aggregatorFactoryList; + protected final boolean useVectorApi; public TimeseriesQueryRunnerTest( QueryRunner> runner, boolean descending, boolean vectorize, - List aggregatorFactoryList + List aggregatorFactoryList, + boolean useVectorApi ) { this.runner = runner; this.descending = descending; this.vectorize = vectorize; this.aggregatorFactoryList = aggregatorFactoryList; + this.useVectorApi = useVectorApi; + } + + @Before + public void initializeExpressionProcessing() + { + if (useVectorApi) { + ExpressionProcessing.initializeForVectorApiTests(); + } else { + ExpressionProcessing.initializeForTests(); + } + } + + @After + public void resetExpressionProcessing() + { + ExpressionProcessing.initializeForTests(); } @Test From caa3cceec73bf87213c3f76af5e2eb17eabc2cb7 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Mon, 15 Jun 2026 09:57:31 -0700 Subject: [PATCH 3/4] fix spelling --- docs/configuration/index.md | 2 +- website/.spelling | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 1f008f6f05e2..22384c53887f 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -647,7 +647,7 @@ the `expression` aggregator/post-aggregator, and any SQL functions that lower to |Property|Description|Default| |--------|-----------|-------| -|`druid.expressions.processArraysAsMultiValueStrings`|If true, all `ARRAY` typed values are converted to `STRING` by column selectors and treated as multi-value strings rather than native arrays. Provided for backwards compatibility with the pre-array behavior in Druid 24.0 and earlier.|false| +|`druid.expressions.processArraysAsMultiValueStrings`|If true, all `ARRAY` typed values are converted to `STRING` by column selectors and treated as multi-value strings rather than native arrays. Provided for backwards compatibility with the behavior of Druid 24.0 and earlier, before array types were introduced.|false| |`druid.expressions.homogenizeNullMultiValueStringArrays`|If true, multi-value string expression input values of `null`, `[]`, and `[null]` are all coerced to `[null]`. Provided for backwards compatibility with Druid 0.22 and earlier. If false (the default), this coercion only happens when single-value expressions are implicitly mapped across multi-value rows, so the single-valued expression is evaluated with an input of `null`.|false| |`druid.expressions.allowVectorizeFallback`|If true, the vectorized query engine handles expressions without a native vectorized implementation using a fallback processor that invokes the scalar expression evaluator in a loop. If false, such expressions cannot be vectorized and the query falls back to the non-vectorized engine.|true| |`druid.expressions.useVectorApi`|If true, vectorized expression vector processors and numeric vector aggregators dispatch to SIMD specializations backed by the JDK incubator Vector API (`jdk.incubator.vector`) where available. Requires `--add-modules=jdk.incubator.vector` on the JVM command line (see [strong encapsulation](../operations/java.md#strong-encapsulation)). Off by default while the Vector API remains an incubator JDK feature.|false| diff --git a/website/.spelling b/website/.spelling index c85463563e13..a46e41c95531 100644 --- a/website/.spelling +++ b/website/.spelling @@ -233,6 +233,7 @@ S3 SAS SDK SIGAR +SIMD SNI SPNEGO Splunk From 5b2e2ee87dda0d394e82173c82b861323a098614 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Thu, 18 Jun 2026 13:47:10 -0700 Subject: [PATCH 4/4] swap benchmark queries --- .../druid/benchmark/query/SqlExpressionBenchmark.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java index cc5084a005fc..1c2b3c723f53 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java @@ -49,10 +49,10 @@ public class SqlExpressionBenchmark extends SqlBaseQueryBenchmark // =========================== // non-expression reference queries // =========================== - // 0: non-expression timeseries reference, 1 columns + // 0: non-expression timeseries reference, 1 column + "SELECT SUM(long1) FROM expressions", + // 1: non-expression timeseries reference, 1 column with nulls "SELECT SUM(long5) FROM expressions", - // 1: non-expression timeseries reference, 2 columns - "SELECT SUM(long1), SUM(long2) FROM expressions", // 2: non-expression timeseries reference, 3 columns "SELECT SUM(long1), SUM(long4), SUM(double1) FROM expressions", // 3: non-expression timeseries reference, 4 columns