diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java
index 0ef6395a1fce..1c2b3c723f53 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java
@@ -49,10 +49,10 @@ public class SqlExpressionBenchmark extends SqlBaseQueryBenchmark
// ===========================
// non-expression reference queries
// ===========================
- // 0: non-expression timeseries reference, 1 columns
+ // 0: non-expression timeseries reference, 1 column
"SELECT SUM(long1) FROM expressions",
- // 1: non-expression timeseries reference, 2 columns
- "SELECT SUM(long1), SUM(long2) FROM expressions",
+ // 1: non-expression timeseries reference, 1 column with nulls
+ "SELECT SUM(long5) FROM expressions",
// 2: non-expression timeseries reference, 3 columns
"SELECT SUM(long1), SUM(long4), SUM(double1) FROM expressions",
// 3: non-expression timeseries reference, 4 columns
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index aad78964f199..22384c53887f 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -640,6 +640,18 @@ the following properties.
JavaScript-based functionality is disabled by default. Please refer to the Druid [JavaScript programming guide](../development/javascript.md) for guidelines about using Druid's JavaScript functionality, including instructions on how to enable it.
:::
+### Expression processing
+
+The properties below tune Druid's native expression engine, which evaluates virtual columns, expression-based filters,
+the `expression` aggregator/post-aggregator, and any SQL functions that lower to native expressions.
+
+|Property|Description|Default|
+|--------|-----------|-------|
+|`druid.expressions.processArraysAsMultiValueStrings`|If true, all `ARRAY` typed values are converted to `STRING` by column selectors and treated as multi-value strings rather than native arrays. Provided for backwards compatibility with the behavior of Druid 24.0 and earlier, before array types were introduced.|false|
+|`druid.expressions.homogenizeNullMultiValueStringArrays`|If true, multi-value string expression input values of `null`, `[]`, and `[null]` are all coerced to `[null]`. Provided for backwards compatibility with Druid 0.22 and earlier. If false (the default), this coercion only happens when single-value expressions are implicitly mapped across multi-value rows, so the single-valued expression is evaluated with an input of `null`.|false|
+|`druid.expressions.allowVectorizeFallback`|If true, the vectorized query engine handles expressions without a native vectorized implementation using a fallback processor that invokes the scalar expression evaluator in a loop. If false, such expressions cannot be vectorized and the query falls back to the non-vectorized engine.|true|
+|`druid.expressions.useVectorApi`|If true, vectorized expression vector processors and numeric vector aggregators dispatch to SIMD specializations backed by the JDK incubator Vector API (`jdk.incubator.vector`) where available. Requires `--add-modules=jdk.incubator.vector` on the JVM command line (see [strong encapsulation](../operations/java.md#strong-encapsulation)). Off by default while the Vector API remains an incubator JDK feature.|false|
+
### Double column storage
Prior to version 0.13.0, Druid's storage layer used a 32-bit float representation to store columns created by the
diff --git a/pom.xml b/pom.xml
index bab799d96ad1..e0fa42262484 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1788,6 +1788,7 @@
**/*_generated*.class**/math/expr/vector/simd/Simd*.class
+ **/query/aggregation/simd/Simd*.class**.SuppressForbidden
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/DoubleSumAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/DoubleSumAggregatorFactory.java
index 6a433b7bf8fb..dd67990fb371 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/DoubleSumAggregatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/DoubleSumAggregatorFactory.java
@@ -24,6 +24,8 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Supplier;
import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.math.expr.ExpressionProcessing;
+import org.apache.druid.query.aggregation.simd.SimdDoubleSumVectorAggregator;
import org.apache.druid.segment.BaseDoubleColumnValueSelector;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorValueSelector;
@@ -81,6 +83,9 @@ protected VectorAggregator factorizeVector(
VectorValueSelector selector
)
{
+ if (ExpressionProcessing.useVectorApi()) {
+ return new SimdDoubleSumVectorAggregator(selector);
+ }
return new DoubleSumVectorAggregator(selector);
}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/FloatSumAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/FloatSumAggregatorFactory.java
index 397b55abed78..bbb2ec52f935 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/FloatSumAggregatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/FloatSumAggregatorFactory.java
@@ -24,6 +24,8 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Supplier;
import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.math.expr.ExpressionProcessing;
+import org.apache.druid.query.aggregation.simd.SimdFloatSumVectorAggregator;
import org.apache.druid.segment.BaseFloatColumnValueSelector;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorValueSelector;
@@ -81,6 +83,9 @@ protected VectorAggregator factorizeVector(
VectorValueSelector selector
)
{
+ if (ExpressionProcessing.useVectorApi()) {
+ return new SimdFloatSumVectorAggregator(selector);
+ }
return new FloatSumVectorAggregator(selector);
}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/LongSumAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/LongSumAggregatorFactory.java
index 8e70fac2536d..a070fb30d401 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/LongSumAggregatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/LongSumAggregatorFactory.java
@@ -24,6 +24,8 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Supplier;
import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.math.expr.ExpressionProcessing;
+import org.apache.druid.query.aggregation.simd.SimdLongSumVectorAggregator;
import org.apache.druid.segment.BaseLongColumnValueSelector;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorValueSelector;
@@ -106,6 +108,9 @@ protected VectorAggregator factorizeVector(
VectorValueSelector selector
)
{
+ if (ExpressionProcessing.useVectorApi()) {
+ return new SimdLongSumVectorAggregator(selector);
+ }
return new LongSumVectorAggregator(selector);
}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/NullAwareVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/NullAwareVectorAggregator.java
new file mode 100644
index 000000000000..de1852a5c6d4
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/NullAwareVectorAggregator.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Capability marker for {@link VectorAggregator}s that can efficiently aggregate a contiguous row range while
+ * skipping null inputs themselves (such as via SIMD masking) rather than relying on the
+ * {@link NullableNumericVectorAggregator} wrapper to filter rows into the per-row scatter-gather
+ * {@link VectorAggregator#aggregate(ByteBuffer, int, int[], int[], int) aggregate} variant.
+ *
+ * When the wrapper sees the input batch has nulls (i.e. {@code selector.getNullVector() != null}) and the
+ * delegate is an instance of this interface, it routes the call here instead of falling back to the
+ * scatter-gather path.
+ */
+public interface NullAwareVectorAggregator extends VectorAggregator
+{
+ /**
+ * Aggregate rows {@code [startRow, endRow)} into the slot at {@code position}, skipping rows where
+ * {@code nullVector[i] == true}. Implementations should use {@link jdk.incubator.vector.VectorMask} (or
+ * equivalent) to avoid per-row branching.
+ *
+ * @return {@code true} if at least one non-null row was aggregated; {@code false} if every row in the range
+ * was null. {@link NullableNumericVectorAggregator} uses this to set or leave the null-marker byte
+ * that precedes the delegate's state.
+ */
+ boolean aggregate(ByteBuffer buf, int position, int startRow, int endRow, boolean[] nullVector);
+}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericVectorAggregator.java
index 9a91176e2ab7..f056dfed3c84 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericVectorAggregator.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericVectorAggregator.java
@@ -76,7 +76,16 @@ public void init(ByteBuffer buf, int position)
public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
{
final boolean[] nullVector = selector.getNullVector();
- if (nullVector != null) {
+ if (nullVector == null) {
+ doAggregate(buf, position, startRow, endRow);
+ } else if (delegate instanceof NullAwareVectorAggregator nullAware) {
+ // Delegate handles null inputs itself (typically via SIMD masking); set the null marker only if it
+ // reports at least one non-null row contributed.
+ if (nullAware.aggregate(buf, position + Byte.BYTES, startRow, endRow, nullVector)) {
+ buf.put(position, TypeStrategies.IS_NOT_NULL_BYTE);
+ }
+ } else {
+ // Fallback: filter non-null rows and route through the scatter-gather variant with a uniform position.
// Deferred initialization, since vAggregationPositions and vAggregationRows are only needed if nulls
// actually occur.
if (vAggregationPositions == null) {
@@ -94,8 +103,6 @@ public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
Arrays.fill(vAggregationPositions, 0, j, position);
doAggregate(buf, j, vAggregationPositions, vAggregationRows, 0);
- } else {
- doAggregate(buf, position, startRow, endRow);
}
}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/simd/SimdDoubleSumVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/simd/SimdDoubleSumVectorAggregator.java
new file mode 100644
index 000000000000..ab4a2afb0ccb
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/simd/SimdDoubleSumVectorAggregator.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.simd;
+
+import jdk.incubator.vector.DoubleVector;
+import jdk.incubator.vector.VectorMask;
+import jdk.incubator.vector.VectorOperators;
+import jdk.incubator.vector.VectorSpecies;
+import org.apache.druid.query.aggregation.DoubleSumVectorAggregator;
+import org.apache.druid.query.aggregation.NullAwareVectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import java.nio.ByteBuffer;
+
+/**
+ * SIMD specialization of {@link DoubleSumVectorAggregator}'s ungrouped contiguous-range aggregation. The hot loop
+ * issues a hardcoded {@link DoubleVector#add} and a {@code reduceLanes(VectorOperators.ADD)} so the JIT emits the
+ * platform's double-add and double-add-reduce intrinsics. Null lanes are skipped via {@link VectorMask}.
+ *
+ * The grouped scatter-gather variant is inherited from the parent (scalar) class.
+ */
+public final class SimdDoubleSumVectorAggregator extends DoubleSumVectorAggregator implements NullAwareVectorAggregator
+{
+ private static final VectorSpecies SPECIES = DoubleVector.SPECIES_PREFERRED;
+
+ private final VectorValueSelector selector;
+
+ public SimdDoubleSumVectorAggregator(VectorValueSelector selector)
+ {
+ super(selector);
+ this.selector = selector;
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+ {
+ final double[] vector = selector.getDoubleVector();
+
+ final int laneCount = SPECIES.length();
+ final int upperBound = startRow + SPECIES.loopBound(endRow - startRow);
+ int i = startRow;
+ DoubleVector vacc = DoubleVector.zero(SPECIES);
+ for (; i < upperBound; i += laneCount) {
+ vacc = vacc.add(DoubleVector.fromArray(SPECIES, vector, i));
+ }
+ double sum = vacc.reduceLanes(VectorOperators.ADD);
+ for (; i < endRow; i++) {
+ sum += vector[i];
+ }
+ buf.putDouble(position, buf.getDouble(position) + sum);
+ }
+
+ @Override
+ public boolean aggregate(ByteBuffer buf, int position, int startRow, int endRow, boolean[] nullVector)
+ {
+ final double[] vector = selector.getDoubleVector();
+
+ final int laneCount = SPECIES.length();
+ final int upperBound = startRow + SPECIES.loopBound(endRow - startRow);
+ int i = startRow;
+ DoubleVector vacc = DoubleVector.zero(SPECIES);
+ int nonNullCount = 0;
+ for (; i < upperBound; i += laneCount) {
+ final VectorMask notNull = VectorMask.fromArray(SPECIES, nullVector, i).not();
+ vacc = vacc.add(DoubleVector.fromArray(SPECIES, vector, i), notNull);
+ nonNullCount += notNull.trueCount();
+ }
+ double sum = vacc.reduceLanes(VectorOperators.ADD);
+ for (; i < endRow; i++) {
+ if (!nullVector[i]) {
+ sum += vector[i];
+ nonNullCount++;
+ }
+ }
+ if (nonNullCount > 0) {
+ buf.putDouble(position, buf.getDouble(position) + sum);
+ return true;
+ }
+ return false;
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/simd/SimdFloatSumVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/simd/SimdFloatSumVectorAggregator.java
new file mode 100644
index 000000000000..c9b843ca3ed3
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/simd/SimdFloatSumVectorAggregator.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.simd;
+
+import jdk.incubator.vector.FloatVector;
+import jdk.incubator.vector.VectorMask;
+import jdk.incubator.vector.VectorOperators;
+import jdk.incubator.vector.VectorSpecies;
+import org.apache.druid.query.aggregation.FloatSumVectorAggregator;
+import org.apache.druid.query.aggregation.NullAwareVectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import java.nio.ByteBuffer;
+
+/**
+ * SIMD specialization of {@link FloatSumVectorAggregator}'s ungrouped contiguous-range aggregation. The hot loop
+ * issues a hardcoded {@link FloatVector#add} and a {@code reduceLanes(VectorOperators.ADD)} so the JIT emits the
+ * platform's float-add and float-add-reduce intrinsics. Null lanes are skipped via {@link VectorMask}.
+ *
+ * The grouped scatter-gather variant is inherited from the parent (scalar) class.
+ */
+public final class SimdFloatSumVectorAggregator extends FloatSumVectorAggregator implements NullAwareVectorAggregator
+{
+ private static final VectorSpecies SPECIES = FloatVector.SPECIES_PREFERRED;
+
+ private final VectorValueSelector selector;
+
+ public SimdFloatSumVectorAggregator(VectorValueSelector selector)
+ {
+ super(selector);
+ this.selector = selector;
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+ {
+ final float[] vector = selector.getFloatVector();
+
+ final int laneCount = SPECIES.length();
+ final int upperBound = startRow + SPECIES.loopBound(endRow - startRow);
+ int i = startRow;
+ FloatVector vacc = FloatVector.zero(SPECIES);
+ for (; i < upperBound; i += laneCount) {
+ vacc = vacc.add(FloatVector.fromArray(SPECIES, vector, i));
+ }
+ float sum = vacc.reduceLanes(VectorOperators.ADD);
+ for (; i < endRow; i++) {
+ sum += vector[i];
+ }
+ buf.putFloat(position, buf.getFloat(position) + sum);
+ }
+
+ @Override
+ public boolean aggregate(ByteBuffer buf, int position, int startRow, int endRow, boolean[] nullVector)
+ {
+ final float[] vector = selector.getFloatVector();
+
+ final int laneCount = SPECIES.length();
+ final int upperBound = startRow + SPECIES.loopBound(endRow - startRow);
+ int i = startRow;
+ FloatVector vacc = FloatVector.zero(SPECIES);
+ int nonNullCount = 0;
+ for (; i < upperBound; i += laneCount) {
+ final VectorMask notNull = VectorMask.fromArray(SPECIES, nullVector, i).not();
+ vacc = vacc.add(FloatVector.fromArray(SPECIES, vector, i), notNull);
+ nonNullCount += notNull.trueCount();
+ }
+ float sum = vacc.reduceLanes(VectorOperators.ADD);
+ for (; i < endRow; i++) {
+ if (!nullVector[i]) {
+ sum += vector[i];
+ nonNullCount++;
+ }
+ }
+ if (nonNullCount > 0) {
+ buf.putFloat(position, buf.getFloat(position) + sum);
+ return true;
+ }
+ return false;
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/simd/SimdLongSumVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/simd/SimdLongSumVectorAggregator.java
new file mode 100644
index 000000000000..a16355618e46
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/simd/SimdLongSumVectorAggregator.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.simd;
+
+import jdk.incubator.vector.LongVector;
+import jdk.incubator.vector.VectorMask;
+import jdk.incubator.vector.VectorOperators;
+import jdk.incubator.vector.VectorSpecies;
+import org.apache.druid.query.aggregation.LongSumVectorAggregator;
+import org.apache.druid.query.aggregation.NullAwareVectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import java.nio.ByteBuffer;
+
+/**
+ * SIMD specialization of {@link LongSumVectorAggregator}'s ungrouped contiguous-range aggregation. The hot loop
+ * issues a hardcoded {@link LongVector#add} and a {@code reduceLanes(VectorOperators.ADD)} so the JIT emits the
+ * platform's long-add and long-add-reduce intrinsics. Null lanes are skipped via {@link VectorMask}.
+ *
+ * The grouped scatter-gather variant is inherited from the parent (scalar) class.
+ */
+public final class SimdLongSumVectorAggregator extends LongSumVectorAggregator implements NullAwareVectorAggregator
+{
+ private static final VectorSpecies SPECIES = LongVector.SPECIES_PREFERRED;
+
+ private final VectorValueSelector selector;
+
+ public SimdLongSumVectorAggregator(VectorValueSelector selector)
+ {
+ super(selector);
+ this.selector = selector;
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+ {
+ final long[] vector = selector.getLongVector();
+
+ final int laneCount = SPECIES.length();
+ final int upperBound = startRow + SPECIES.loopBound(endRow - startRow);
+ int i = startRow;
+ LongVector vacc = LongVector.zero(SPECIES);
+ for (; i < upperBound; i += laneCount) {
+ vacc = vacc.add(LongVector.fromArray(SPECIES, vector, i));
+ }
+ long sum = vacc.reduceLanes(VectorOperators.ADD);
+ for (; i < endRow; i++) {
+ sum += vector[i];
+ }
+ buf.putLong(position, buf.getLong(position) + sum);
+ }
+
+ @Override
+ public boolean aggregate(ByteBuffer buf, int position, int startRow, int endRow, boolean[] nullVector)
+ {
+ final long[] vector = selector.getLongVector();
+
+ final int laneCount = SPECIES.length();
+ final int upperBound = startRow + SPECIES.loopBound(endRow - startRow);
+ int i = startRow;
+ LongVector vacc = LongVector.zero(SPECIES);
+ int nonNullCount = 0;
+ for (; i < upperBound; i += laneCount) {
+ final VectorMask notNull = VectorMask.fromArray(SPECIES, nullVector, i).not();
+ vacc = vacc.add(LongVector.fromArray(SPECIES, vector, i), notNull);
+ nonNullCount += notNull.trueCount();
+ }
+ long sum = vacc.reduceLanes(VectorOperators.ADD);
+ for (; i < endRow; i++) {
+ if (!nullVector[i]) {
+ sum += vector[i];
+ nonNullCount++;
+ }
+ }
+ if (nonNullCount > 0) {
+ buf.putLong(position, buf.getLong(position) + sum);
+ return true;
+ }
+ return false;
+ }
+}
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/simd/SimdSumVectorAggregatorTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/simd/SimdSumVectorAggregatorTest.java
new file mode 100644
index 000000000000..19e54b075a36
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/simd/SimdSumVectorAggregatorTest.java
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.simd;
+
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.aggregation.DoubleSumVectorAggregator;
+import org.apache.druid.query.aggregation.FloatSumVectorAggregator;
+import org.apache.druid.query.aggregation.LongSumVectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.function.IntPredicate;
+
+/**
+ * For each (sum type, vector size, null pattern) combination, drives the SIMD and scalar sum vector aggregators
+ * directly and asserts equivalent buffer state. Two paths covered:
+ * - the ungrouped no-null path (SIMD subclass's overridden {@code aggregate(buf, pos, start, end)} vs the scalar
+ * parent's implementation), and
+ * - the null-aware path on the SIMD class (the new {@code aggregate(buf, pos, start, end, nullVector)} declared by
+ * {@link org.apache.druid.query.aggregation.NullAwareVectorAggregator}), compared against a manually-computed
+ * reference sum.
+ *
+ * Each scenario is exercised twice: once with {@code (position=0, startRow=0, endRow=size)} and once with
+ * {@code (position=1, startRow=1, endRow=size+1)} where the row at index 0 is a deliberately extreme "poison"
+ * value that would visibly skew the result if the aggregator incorrectly read past {@code startRow}, and the
+ * buffer slot starts at byte offset 1 so any indexing off the position parameter shows up.
+ */
+public class SimdSumVectorAggregatorTest extends InitializedNullHandlingTest
+{
+ private static final int[] VECTOR_SIZES = {1, 8, 17, 64, 1023};
+ private static final long POISON_LONG = Long.MAX_VALUE / 2;
+ private static final double POISON_DOUBLE = 1e15;
+ private static final float POISON_FLOAT = 1e10f;
+
+ @Test
+ public void testLongSum()
+ {
+ for (int size : VECTOR_SIZES) {
+ for (NullPattern pattern : NullPattern.values()) {
+ runLong(size, pattern, 0, 0);
+ runLong(size, pattern, 1, 1);
+ }
+ }
+ }
+
+ @Test
+ public void testDoubleSum()
+ {
+ for (int size : VECTOR_SIZES) {
+ for (NullPattern pattern : NullPattern.values()) {
+ runDouble(size, pattern, 0, 0);
+ runDouble(size, pattern, 1, 1);
+ }
+ }
+ }
+
+ @Test
+ public void testFloatSum()
+ {
+ for (int size : VECTOR_SIZES) {
+ for (NullPattern pattern : NullPattern.values()) {
+ runFloat(size, pattern, 0, 0);
+ runFloat(size, pattern, 1, 1);
+ }
+ }
+ }
+
+ private static void runLong(int size, NullPattern pattern, int position, int startRow)
+ {
+ final int arrLen = startRow + size;
+ final long[] values = new long[arrLen];
+ for (int i = 0; i < startRow; i++) {
+ values[i] = POISON_LONG;
+ }
+ System.arraycopy(randomLongs(size, 0), 0, values, startRow, size);
+
+ final boolean[] realNulls = pattern.toMask(size);
+ final boolean[] nulls = realNulls == null ? null : padNulls(realNulls, startRow);
+
+ final FakeVectorValueSelector selector = new FakeVectorValueSelector(arrLen, values, null, null, nulls);
+ final int endRow = startRow + size;
+ final String msg = StringUtils.format(
+ "type[long] size[%s] nulls[%s] pos[%s] start[%s]",
+ size, pattern, position, startRow
+ );
+
+ final LongSumVectorAggregator scalar = new LongSumVectorAggregator(selector);
+ final SimdLongSumVectorAggregator simd = new SimdLongSumVectorAggregator(selector);
+
+ if (nulls == null) {
+ final ByteBuffer scalarBuf = ByteBuffer.allocate(position + Long.BYTES);
+ final ByteBuffer simdBuf = ByteBuffer.allocate(position + Long.BYTES);
+ scalar.init(scalarBuf, position);
+ simd.init(simdBuf, position);
+ scalar.aggregate(scalarBuf, position, startRow, endRow);
+ simd.aggregate(simdBuf, position, startRow, endRow);
+ Assert.assertEquals(msg, scalarBuf.getLong(position), simdBuf.getLong(position));
+ } else {
+ long expected = 0;
+ boolean anyNonNull = false;
+ for (int i = startRow; i < endRow; i++) {
+ if (!nulls[i]) {
+ expected += values[i];
+ anyNonNull = true;
+ }
+ }
+ final ByteBuffer simdBuf = ByteBuffer.allocate(position + Long.BYTES);
+ simd.init(simdBuf, position);
+ final boolean reported = simd.aggregate(simdBuf, position, startRow, endRow, nulls);
+ Assert.assertEquals(msg + " (anyNonNull)", anyNonNull, reported);
+ if (reported) {
+ Assert.assertEquals(msg, expected, simdBuf.getLong(position));
+ }
+ }
+ }
+
+ private static void runDouble(int size, NullPattern pattern, int position, int startRow)
+ {
+ final int arrLen = startRow + size;
+ final double[] values = new double[arrLen];
+ for (int i = 0; i < startRow; i++) {
+ values[i] = POISON_DOUBLE;
+ }
+ System.arraycopy(randomDoubles(size, 1), 0, values, startRow, size);
+
+ final boolean[] realNulls = pattern.toMask(size);
+ final boolean[] nulls = realNulls == null ? null : padNulls(realNulls, startRow);
+
+ final FakeVectorValueSelector selector = new FakeVectorValueSelector(arrLen, null, values, null, nulls);
+ final int endRow = startRow + size;
+ final String msg = StringUtils.format(
+ "type[double] size[%s] nulls[%s] pos[%s] start[%s]",
+ size, pattern, position, startRow
+ );
+
+ final DoubleSumVectorAggregator scalar = new DoubleSumVectorAggregator(selector);
+ final SimdDoubleSumVectorAggregator simd = new SimdDoubleSumVectorAggregator(selector);
+
+ if (nulls == null) {
+ final ByteBuffer scalarBuf = ByteBuffer.allocate(position + Double.BYTES);
+ final ByteBuffer simdBuf = ByteBuffer.allocate(position + Double.BYTES);
+ scalar.init(scalarBuf, position);
+ simd.init(simdBuf, position);
+ scalar.aggregate(scalarBuf, position, startRow, endRow);
+ simd.aggregate(simdBuf, position, startRow, endRow);
+ Assert.assertEquals(
+ msg,
+ scalarBuf.getDouble(position),
+ simdBuf.getDouble(position),
+ Math.max(Math.abs(scalarBuf.getDouble(position)) * 1e-12, 1e-12)
+ );
+ } else {
+ double expected = 0;
+ boolean anyNonNull = false;
+ for (int i = startRow; i < endRow; i++) {
+ if (!nulls[i]) {
+ expected += values[i];
+ anyNonNull = true;
+ }
+ }
+ final ByteBuffer simdBuf = ByteBuffer.allocate(position + Double.BYTES);
+ simd.init(simdBuf, position);
+ final boolean reported = simd.aggregate(simdBuf, position, startRow, endRow, nulls);
+ Assert.assertEquals(msg + " (anyNonNull)", anyNonNull, reported);
+ if (reported) {
+ Assert.assertEquals(
+ msg,
+ expected,
+ simdBuf.getDouble(position),
+ Math.max(Math.abs(expected) * 1e-12, 1e-12)
+ );
+ }
+ }
+ }
+
+ private static void runFloat(int size, NullPattern pattern, int position, int startRow)
+ {
+ final int arrLen = startRow + size;
+ final float[] values = new float[arrLen];
+ for (int i = 0; i < startRow; i++) {
+ values[i] = POISON_FLOAT;
+ }
+ System.arraycopy(randomFloats(size, 2), 0, values, startRow, size);
+
+ final boolean[] realNulls = pattern.toMask(size);
+ final boolean[] nulls = realNulls == null ? null : padNulls(realNulls, startRow);
+
+ final FakeVectorValueSelector selector = new FakeVectorValueSelector(arrLen, null, null, values, nulls);
+ final int endRow = startRow + size;
+ final String msg = StringUtils.format(
+ "type[float] size[%s] nulls[%s] pos[%s] start[%s]",
+ size, pattern, position, startRow
+ );
+
+ final FloatSumVectorAggregator scalar = new FloatSumVectorAggregator(selector);
+ final SimdFloatSumVectorAggregator simd = new SimdFloatSumVectorAggregator(selector);
+
+ if (nulls == null) {
+ final ByteBuffer scalarBuf = ByteBuffer.allocate(position + Float.BYTES);
+ final ByteBuffer simdBuf = ByteBuffer.allocate(position + Float.BYTES);
+ scalar.init(scalarBuf, position);
+ simd.init(simdBuf, position);
+ scalar.aggregate(scalarBuf, position, startRow, endRow);
+ simd.aggregate(simdBuf, position, startRow, endRow);
+ Assert.assertEquals(
+ msg,
+ scalarBuf.getFloat(position),
+ simdBuf.getFloat(position),
+ Math.max(Math.abs(scalarBuf.getFloat(position)) * 1e-5f, 1e-5f)
+ );
+ } else {
+ float expected = 0;
+ boolean anyNonNull = false;
+ for (int i = startRow; i < endRow; i++) {
+ if (!nulls[i]) {
+ expected += values[i];
+ anyNonNull = true;
+ }
+ }
+ final ByteBuffer simdBuf = ByteBuffer.allocate(position + Float.BYTES);
+ simd.init(simdBuf, position);
+ final boolean reported = simd.aggregate(simdBuf, position, startRow, endRow, nulls);
+ Assert.assertEquals(msg + " (anyNonNull)", anyNonNull, reported);
+ if (reported) {
+ Assert.assertEquals(
+ msg,
+ expected,
+ simdBuf.getFloat(position),
+ Math.max(Math.abs(expected) * 1e-5f, 1e-5f)
+ );
+ }
+ }
+ }
+
+ private static boolean[] padNulls(boolean[] realNulls, int startRow)
+ {
+ final boolean[] padded = new boolean[startRow + realNulls.length];
+ System.arraycopy(realNulls, 0, padded, startRow, realNulls.length);
+ return padded;
+ }
+
+ private static long[] randomLongs(int size, int seed)
+ {
+ final Random r = new Random(0xC0FFEEL + seed);
+ final long[] out = new long[size];
+ for (int i = 0; i < size; i++) {
+ out[i] = r.nextInt() & 0xFFFFFL;
+ }
+ return out;
+ }
+
+ private static double[] randomDoubles(int size, int seed)
+ {
+ final Random r = new Random(0xC0FFEEL + seed);
+ final double[] out = new double[size];
+ for (int i = 0; i < size; i++) {
+ out[i] = (r.nextDouble() - 0.5) * 1000.0;
+ }
+ return out;
+ }
+
+ private static float[] randomFloats(int size, int seed)
+ {
+ final Random r = new Random(0xC0FFEEL + seed);
+ final float[] out = new float[size];
+ for (int i = 0; i < size; i++) {
+ out[i] = (r.nextFloat() - 0.5f) * 1000.0f;
+ }
+ return out;
+ }
+
+ private enum NullPattern
+ {
+ NONE(i -> false),
+ ALL(i -> true),
+ ALTERNATING(i -> (i & 1) == 0),
+ SPARSE(i -> i % 7 == 0),
+ FIRST_THREE(i -> i < 3),
+ CHUNK_BOUNDARY(i -> i == 7 || i == 8);
+
+ private final IntPredicate predicate;
+
+ NullPattern(IntPredicate predicate)
+ {
+ this.predicate = predicate;
+ }
+
+ @Nullable
+ boolean[] toMask(int size)
+ {
+ if (this == NONE) {
+ return null; // models a column with no null vector at all
+ }
+ final boolean[] mask = new boolean[size];
+ for (int i = 0; i < size; i++) {
+ mask[i] = predicate.test(i);
+ }
+ return mask;
+ }
+ }
+
+ /**
+ * Minimal in-memory {@link VectorValueSelector} backed by pre-built primitive arrays for tests. Only the
+ * accessor for the type used by a given test is non-null.
+ */
+ private static final class FakeVectorValueSelector implements VectorValueSelector
+ {
+ private final int size;
+ @Nullable
+ private final long[] longs;
+ @Nullable
+ private final double[] doubles;
+ @Nullable
+ private final float[] floats;
+ @Nullable
+ private final boolean[] nulls;
+
+ FakeVectorValueSelector(
+ int size,
+ @Nullable long[] longs,
+ @Nullable double[] doubles,
+ @Nullable float[] floats,
+ @Nullable boolean[] nulls
+ )
+ {
+ this.size = size;
+ this.longs = longs;
+ this.doubles = doubles;
+ this.floats = floats;
+ this.nulls = nulls;
+ }
+
+ @Override
+ public long[] getLongVector()
+ {
+ return longs;
+ }
+
+ @Override
+ public float[] getFloatVector()
+ {
+ return floats;
+ }
+
+ @Override
+ public double[] getDoubleVector()
+ {
+ return doubles;
+ }
+
+ @Nullable
+ @Override
+ public boolean[] getNullVector()
+ {
+ return nulls;
+ }
+
+ @Override
+ public int getMaxVectorSize()
+ {
+ return size;
+ }
+
+ @Override
+ public int getCurrentVectorSize()
+ {
+ return size;
+ }
+ }
+}
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
index b0fc2c51583a..cd1102a03300 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
@@ -49,6 +49,7 @@
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.js.JavaScriptConfig;
import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.math.expr.ExpressionProcessing;
import org.apache.druid.query.BySegmentResultValue;
import org.apache.druid.query.BySegmentResultValueClass;
import org.apache.druid.query.ChainedExecutionQueryRunner;
@@ -144,9 +145,11 @@
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Period;
+import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Assume;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
@@ -211,6 +214,7 @@ public int getNumThreads()
private final GroupByQueryConfig config;
private final boolean vectorize;
private final GroupByStatsProvider statsProvider;
+ private final boolean useVectorApi;
@Rule
public ExpectedException expectedException = ExpectedException.none();
@@ -433,11 +437,20 @@ public static Collection