From 823018b2fc6497832048af0c1d4924e53a2514b5 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Sat, 23 May 2026 00:12:05 +0530 Subject: [PATCH 01/26] deps: add arrow 15.0.2 and iceberg-arrow to pom dependency management --- .../druid-iceberg-extensions/pom.xml | 15 +++++++++++++++ pom.xml | 18 ++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/extensions-contrib/druid-iceberg-extensions/pom.xml b/extensions-contrib/druid-iceberg-extensions/pom.xml index a50b2d701acc..fa958e26350c 100644 --- a/extensions-contrib/druid-iceberg-extensions/pom.xml +++ b/extensions-contrib/druid-iceberg-extensions/pom.xml @@ -760,6 +760,21 @@ provided + + org.apache.iceberg + iceberg-arrow + ${iceberg.core.version} + + + org.apache.arrow + arrow-vector + + + org.apache.arrow + arrow-memory-netty + runtime + + org.apache.iceberg iceberg-parquet diff --git a/pom.xml b/pom.xml index aabb17d07f08..67c08db1bd06 100644 --- a/pom.xml +++ b/pom.xml @@ -103,6 +103,7 @@ 6.0.0 2.2 1.10.0 + 15.0.2 12.1.8 1.19.4 2.20.2 @@ -427,6 +428,23 @@ 2.5.2 + + + org.apache.arrow + arrow-vector + ${arrow.version} + + + org.apache.arrow + arrow-memory-netty + ${arrow.version} + + + org.apache.iceberg + iceberg-arrow + ${iceberg.core.version} + + From a873c4d76ddf426e230ac2aa1b382b83f8dae845 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Sat, 23 May 2026 00:12:43 +0530 Subject: [PATCH 02/26] feat: add retrieveTable() to IcebergCatalog for direct Table object access --- .../druid/iceberg/input/IcebergCatalog.java | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java index 2c8de41bb386..f57480adf15e 100644 --- a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java @@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.Table; import org.apache.iceberg.TableScan; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; @@ -59,6 +60,39 @@ public boolean isCaseSensitive() return true; } + /** + * Load and return the Iceberg Table object for direct use by readers that go beyond file-path delegation. + */ + public Table retrieveTable(String tableNamespace, String tableName) + { + final Catalog catalog = retrieveCatalog(); + final Namespace namespace = Namespace.of(tableNamespace); + final String tableIdentifier = tableNamespace + "." + tableName; + + final ClassLoader currCtxClassloader = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); + final TableIdentifier icebergTableIdentifier = catalog.listTables(namespace).stream() + .filter(id -> id.toString().equals(tableIdentifier)) + .findFirst() + .orElseThrow(() -> new IAE( + "Couldn't retrieve table identifier for '%s'." + + " Please verify that the table exists in the given catalog", + tableIdentifier + )); + return catalog.loadTable(icebergTableIdentifier); + } + catch (IAE e) { + throw e; + } + catch (Exception e) { + throw new RE(e, "Failed to load iceberg table with identifier [%s]", tableIdentifier); + } + finally { + Thread.currentThread().setContextClassLoader(currCtxClassloader); + } + } + /** * Extract the iceberg data files upto the latest snapshot associated with the table * From c420836cf6aa51e66e787db8eb6fb6a12fa7b6fc Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Sat, 23 May 2026 00:16:28 +0530 Subject: [PATCH 03/26] feat: add IcebergArrowInputSourceReader using iceberg-arrow vectorized API --- .../input/IcebergArrowInputSourceReader.java | 384 ++++++++++++++++++ 1 file changed, 384 insertions(+) create mode 100644 extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReader.java diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReader.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReader.java new file mode 100644 index 000000000000..04e96c04bb8e --- /dev/null +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReader.java @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.iceberg.input; + +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.DateDayVector; +import org.apache.arrow.vector.DecimalVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.FixedSizeBinaryVector; +import org.apache.arrow.vector.Float4Vector; +import org.apache.arrow.vector.Float8Vector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.SmallIntVector; +import org.apache.arrow.vector.TimeStampMicroTZVector; +import org.apache.arrow.vector.TimeStampMicroVector; +import org.apache.arrow.vector.TimeStampNanoTZVector; +import org.apache.arrow.vector.TimeStampNanoVector; +import org.apache.arrow.vector.TimeStampMilliTZVector; +import org.apache.arrow.vector.TimeStampMilliVector; +import org.apache.arrow.vector.TinyIntVector; +import org.apache.arrow.vector.TimeMicroVector; +import org.apache.arrow.vector.VarBinaryVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowListPlusRawValues; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.InputSourceReader; +import org.apache.druid.data.input.InputStats; +import org.apache.druid.data.input.MapBasedInputRow; +import org.apache.druid.iceberg.filter.IcebergFilter; +import org.apache.druid.java.util.common.RE; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableScan; +import org.apache.iceberg.arrow.vectorized.ArrowReader; +import org.apache.iceberg.arrow.vectorized.ColumnarBatch; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.util.TableScanUtil; +import org.joda.time.DateTime; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.concurrent.TimeUnit; + +/** + * Reads an Iceberg table via iceberg-arrow's {@link ArrowReader}, yielding {@link InputRow} objects. + * + * Delete application (V2 equality and positional deletes), type coercion, and schema evolution are + * handled entirely by the Iceberg library. Druid only consumes the resulting {@link ColumnarBatch} + * batches and maps them to {@link MapBasedInputRow}. + * + * Column projection and predicate push-down are applied at scan planning time so only requested + * columns and matching files are read from storage. + * + * Note: iceberg-arrow currently supports Parquet data files only. ORC and Avro files will throw + * {@link UnsupportedOperationException} at read time; use the standard delegate path for those. + */ +public class IcebergArrowInputSourceReader implements InputSourceReader +{ + static final int DEFAULT_BATCH_SIZE = 1024; + + private final Table table; + @Nullable + private final IcebergFilter icebergFilter; + @Nullable + private final DateTime snapshotTime; + private final boolean caseSensitive; + private final InputRowSchema schema; + private final int batchSize; + + public IcebergArrowInputSourceReader( + final Table table, + @Nullable final IcebergFilter icebergFilter, + @Nullable final DateTime snapshotTime, + final boolean caseSensitive, + final InputRowSchema schema, + final int batchSize + ) + { + this.table = table; + this.icebergFilter = icebergFilter; + this.snapshotTime = snapshotTime; + this.caseSensitive = caseSensitive; + this.schema = schema; + this.batchSize = batchSize; + } + + @Override + public CloseableIterator read(final InputStats inputStats) throws IOException + { + final TableScan scan = buildScan(); + final CloseableIterable tasks = TableScanUtil.planTasks( + scan.planFiles(), + scan.targetSplitSize(), + scan.splitLookback(), + scan.splitOpenFileCost() + ); + final ArrowReader arrowReader = new ArrowReader(scan, batchSize, true); + final org.apache.iceberg.io.CloseableIterator batchIter = arrowReader.open(tasks); + return new ArrowInputRowIterator(batchIter, arrowReader, tasks, inputStats); + } + + @Override + public CloseableIterator sample() throws IOException + { + final CloseableIterator rows = read(new NoopInputStats()); + return new CloseableIterator() + { + @Override + public boolean hasNext() + { + return rows.hasNext(); + } + + @Override + public InputRowListPlusRawValues next() + { + final InputRow row = rows.next(); + return InputRowListPlusRawValues.of(row, ((MapBasedInputRow) row).getEvent()); + } + + @Override + public void close() throws IOException + { + rows.close(); + } + }; + } + + private TableScan buildScan() + { + TableScan scan = table.newScan().caseSensitive(caseSensitive); + + // Push column projection into the scan planner — only requested columns are read from disk. + final List configuredDims = schema.getDimensionsSpec().getDimensionNames(); + final String timestampColumn = schema.getTimestampSpec().getTimestampColumn(); + if (!configuredDims.isEmpty()) { + final List selectCols = new ArrayList<>(configuredDims); + if (timestampColumn != null && !selectCols.contains(timestampColumn)) { + selectCols.add(timestampColumn); + } + scan = scan.select(selectCols); + } + + // Push predicate into scan planner — reduces files opened and rows read. + if (icebergFilter != null) { + scan = icebergFilter.filter(scan); + } + + // Snapshot pinning for time-travel reads. + if (snapshotTime != null) { + scan = scan.asOfTime(snapshotTime.getMillis()); + } + + return scan; + } + + private InputRow batchRowToInputRow(final ColumnarBatch batch, final int rowIdx) + { + final int numCols = batch.numCols(); + final Map event = new HashMap<>(numCols); + for (int col = 0; col < numCols; col++) { + final FieldVector vec = batch.column(col).getFieldVector(); + if (!vec.isNull(rowIdx)) { + event.put(vec.getName(), extractValue(vec, rowIdx)); + } + } + final long timestamp = schema.getTimestampSpec().extractTimestamp(event); + final List dimensions = resolveDimensions(batch); + return new MapBasedInputRow(timestamp, dimensions, event); + } + + private List resolveDimensions(final ColumnarBatch batch) + { + final List configured = schema.getDimensionsSpec().getDimensionNames(); + if (!configured.isEmpty()) { + return configured; + } + // Derive dimensions from the batch schema, excluding the timestamp column. + final String tsCol = schema.getTimestampSpec().getTimestampColumn(); + final List dims = new ArrayList<>(batch.numCols()); + for (int col = 0; col < batch.numCols(); col++) { + final String name = batch.column(col).getFieldVector().getName(); + if (!name.equals(tsCol)) { + dims.add(name); + } + } + return dims; + } + + /** + * Type-safe extraction from Arrow vectors, avoiding getObject() boxing on the hot path. + * Covers all scalar types supported by iceberg-arrow 1.10.0. + * Falls back to getObject() for any type added in future Arrow/Iceberg versions. + */ + static Object extractValue(final FieldVector vec, final int idx) + { + if (vec instanceof BigIntVector) { + return ((BigIntVector) vec).get(idx); + } + if (vec instanceof IntVector) { + return ((IntVector) vec).get(idx); + } + if (vec instanceof SmallIntVector) { + return (int) ((SmallIntVector) vec).get(idx); + } + if (vec instanceof TinyIntVector) { + return (int) ((TinyIntVector) vec).get(idx); + } + if (vec instanceof Float8Vector) { + return ((Float8Vector) vec).get(idx); + } + if (vec instanceof Float4Vector) { + return (double) ((Float4Vector) vec).get(idx); + } + if (vec instanceof BitVector) { + return ((BitVector) vec).get(idx) == 1; + } + if (vec instanceof VarCharVector) { + return new String(((VarCharVector) vec).get(idx), StandardCharsets.UTF_8); + } + if (vec instanceof VarBinaryVector) { + return ((VarBinaryVector) vec).get(idx); + } + if (vec instanceof DecimalVector) { + return ((DecimalVector) vec).getObject(idx); + } + // Timestamps: Iceberg stores timestamps as micros; convert to millis for Druid. + if (vec instanceof TimeStampMicroTZVector) { + return TimeUnit.MICROSECONDS.toMillis(((TimeStampMicroTZVector) vec).get(idx)); + } + if (vec instanceof TimeStampMicroVector) { + return TimeUnit.MICROSECONDS.toMillis(((TimeStampMicroVector) vec).get(idx)); + } + if (vec instanceof TimeStampNanoTZVector) { + return TimeUnit.NANOSECONDS.toMillis(((TimeStampNanoTZVector) vec).get(idx)); + } + if (vec instanceof TimeStampNanoVector) { + return TimeUnit.NANOSECONDS.toMillis(((TimeStampNanoVector) vec).get(idx)); + } + if (vec instanceof TimeStampMilliTZVector) { + return ((TimeStampMilliTZVector) vec).get(idx); + } + if (vec instanceof TimeStampMilliVector) { + return ((TimeStampMilliVector) vec).get(idx); + } + if (vec instanceof DateDayVector) { + // Days since epoch → millis since epoch + return TimeUnit.DAYS.toMillis(((DateDayVector) vec).get(idx)); + } + if (vec instanceof TimeMicroVector) { + return TimeUnit.MICROSECONDS.toMillis(((TimeMicroVector) vec).get(idx)); + } + if (vec instanceof FixedSizeBinaryVector) { + return ((FixedSizeBinaryVector) vec).get(idx); + } + // Safe fallback for any Arrow type not explicitly handled (dict-encoded, future types). + return vec.getObject(idx); + } + + private static final class NoopInputStats implements InputStats + { + @Override + public void incrementProcessedBytes(final long incrementByValue) + { + } + + @Override + public long getProcessedBytes() + { + return 0; + } + } + + private class ArrowInputRowIterator implements CloseableIterator + { + private final org.apache.iceberg.io.CloseableIterator batchIter; + private final ArrowReader arrowReader; + private final CloseableIterable tasks; + private final InputStats inputStats; + + private ColumnarBatch currentBatch = null; + private int rowIndexInBatch = 0; + private boolean exhausted = false; + + ArrowInputRowIterator( + final org.apache.iceberg.io.CloseableIterator batchIter, + final ArrowReader arrowReader, + final CloseableIterable tasks, + final InputStats inputStats + ) + { + this.batchIter = batchIter; + this.arrowReader = arrowReader; + this.tasks = tasks; + this.inputStats = inputStats; + } + + @Override + public boolean hasNext() + { + if (exhausted) { + return false; + } + if (currentBatch != null && rowIndexInBatch < currentBatch.numRows()) { + return true; + } + return loadNextBatch(); + } + + @Override + public InputRow next() + { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return batchRowToInputRow(currentBatch, rowIndexInBatch++); + } + + private boolean loadNextBatch() + { + while (batchIter.hasNext()) { + currentBatch = batchIter.next(); + rowIndexInBatch = 0; + if (currentBatch.numRows() > 0) { + inputStats.incrementProcessedBytes(estimateBatchBytes(currentBatch)); + return true; + } + } + exhausted = true; + return false; + } + + private long estimateBatchBytes(final ColumnarBatch batch) + { + long bytes = 0; + for (int col = 0; col < batch.numCols(); col++) { + bytes += batch.column(col).getFieldVector().getBufferSize(); + } + return bytes; + } + + @Override + public void close() throws IOException + { + try { + batchIter.close(); + } + finally { + try { + arrowReader.close(); + } + finally { + tasks.close(); + } + } + } + } +} From 866a093e24b2dcd60fe641170adbeba1458ef1f9 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Sat, 23 May 2026 00:18:11 +0530 Subject: [PATCH 04/26] feat: wire useArrowReader + arrowBatchSize into IcebergInputSource --- .../input/IcebergArrowInputSourceReader.java | 2 +- .../iceberg/input/IcebergInputSource.java | 36 ++++++++++++++++++- 2 files changed, 36 insertions(+), 2 deletions(-) diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReader.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReader.java index 04e96c04bb8e..5379ad268d04 100644 --- a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReader.java +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReader.java @@ -190,7 +190,7 @@ private InputRow batchRowToInputRow(final ColumnarBatch batch, final int rowIdx) event.put(vec.getName(), extractValue(vec, rowIdx)); } } - final long timestamp = schema.getTimestampSpec().extractTimestamp(event); + final long timestamp = schema.getTimestampSpec().extractTimestamp(event).getMillis(); final List dimensions = resolveDimensions(batch); return new MapBasedInputRow(timestamp, dimensions, event); } diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java index ccbb10af14dc..5941292c47a6 100644 --- a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java @@ -76,6 +76,12 @@ public class IcebergInputSource implements SplittableInputSource> @JsonProperty private final ResidualFilterMode residualFilterMode; + @JsonProperty + private final boolean useArrowReader; + + @JsonProperty + private final int arrowBatchSize; + private boolean isLoaded = false; private SplittableInputSource delegateInputSource; @@ -88,7 +94,9 @@ public IcebergInputSource( @JsonProperty("icebergCatalog") IcebergCatalog icebergCatalog, @JsonProperty("warehouseSource") InputSourceFactory warehouseSource, @JsonProperty("snapshotTime") @Nullable DateTime snapshotTime, - @JsonProperty("residualFilterMode") @Nullable ResidualFilterMode residualFilterMode + @JsonProperty("residualFilterMode") @Nullable ResidualFilterMode residualFilterMode, + @JsonProperty("useArrowReader") @Nullable Boolean useArrowReader, + @JsonProperty("arrowBatchSize") @Nullable Integer arrowBatchSize ) { this.tableName = Preconditions.checkNotNull(tableName, "tableName cannot be null"); @@ -98,6 +106,10 @@ public IcebergInputSource( this.warehouseSource = Preconditions.checkNotNull(warehouseSource, "warehouseSource cannot be null"); this.snapshotTime = snapshotTime; this.residualFilterMode = Configs.valueOrDefault(residualFilterMode, ResidualFilterMode.IGNORE); + this.useArrowReader = useArrowReader != null && useArrowReader; + this.arrowBatchSize = arrowBatchSize != null && arrowBatchSize > 0 + ? arrowBatchSize + : IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE; } @Override @@ -113,6 +125,16 @@ public InputSourceReader reader( File temporaryDirectory ) { + if (useArrowReader) { + return new IcebergArrowInputSourceReader( + icebergCatalog.retrieveTable(namespace, tableName), + icebergFilter, + snapshotTime, + icebergCatalog.isCaseSensitive(), + inputRowSchema, + arrowBatchSize + ); + } if (!isLoaded) { retrieveIcebergDatafiles(); } @@ -189,6 +211,18 @@ public ResidualFilterMode getResidualFilterMode() return residualFilterMode; } + @JsonProperty + public boolean isUseArrowReader() + { + return useArrowReader; + } + + @JsonProperty + public int getArrowBatchSize() + { + return arrowBatchSize; + } + public SplittableInputSource getDelegateInputSource() { return delegateInputSource; From 2339330d07e021f0569e892ead03e861805a9845 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Sat, 23 May 2026 00:21:29 +0530 Subject: [PATCH 05/26] test: add IcebergArrowInputSourceReaderTest; fix IcebergInputSourceTest constructor arity --- .../IcebergArrowInputSourceReaderTest.java | 285 ++++++++++++++++++ .../iceberg/input/IcebergInputSourceTest.java | 26 +- 2 files changed, 307 insertions(+), 4 deletions(-) create mode 100644 extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReaderTest.java diff --git a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReaderTest.java b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReaderTest.java new file mode 100644 index 000000000000..b3a34331bc4e --- /dev/null +++ b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReaderTest.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.iceberg.input; + +import com.google.common.collect.ImmutableList; +import org.apache.druid.data.input.ColumnsFilter; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.StringDimensionSchema; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.iceberg.filter.IcebergEqualsFilter; +import org.apache.druid.java.util.common.FileUtils; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.types.Types; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.UUID; + +public class IcebergArrowInputSourceReaderTest +{ + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static final String NAMESPACE = "default"; + private static final String TABLE = "arrowTestTable"; + + private static final Schema SCHEMA = new Schema( + Types.NestedField.required(1, "ts", Types.LongType.get()), + Types.NestedField.required(2, "name", Types.StringType.get()), + Types.NestedField.required(3, "value", Types.DoubleType.get()) + ); + + private static final InputRowSchema INPUT_SCHEMA = new InputRowSchema( + new TimestampSpec("ts", "millis", null), + DimensionsSpec.builder() + .setDimensions(ImmutableList.of( + new StringDimensionSchema("name") + )) + .build(), + ColumnsFilter.all() + ); + + private File warehouseDir; + private IcebergCatalog catalog; + private TableIdentifier tableId; + + @Before + public void setup() throws IOException + { + warehouseDir = FileUtils.createTempDir(); + catalog = new LocalCatalog(warehouseDir.getPath(), new HashMap<>(), true); + tableId = TableIdentifier.of(Namespace.of(NAMESPACE), TABLE); + } + + @After + public void tearDown() + { + if (catalog.retrieveCatalog().tableExists(tableId)) { + catalog.retrieveCatalog().dropTable(tableId); + } + } + + @Test + public void testBasicRead() throws IOException + { + final Table table = catalog.retrieveCatalog().createTable(tableId, SCHEMA); + writeRows(table, row(1_000L, "alice", 1.1), row(2_000L, "bob", 2.2), row(3_000L, "carol", 3.3)); + + final IcebergArrowInputSourceReader reader = new IcebergArrowInputSourceReader( + table, null, null, true, INPUT_SCHEMA, IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE + ); + + final List rows = readAll(reader); + Assert.assertEquals(3, rows.size()); + Assert.assertEquals(1_000L, rows.get(0).getTimestampFromEpoch()); + Assert.assertEquals("alice", rows.get(0).getDimension("name").get(0)); + Assert.assertEquals(2_000L, rows.get(1).getTimestampFromEpoch()); + Assert.assertEquals("bob", rows.get(1).getDimension("name").get(0)); + Assert.assertEquals(3_000L, rows.get(2).getTimestampFromEpoch()); + Assert.assertEquals("carol", rows.get(2).getDimension("name").get(0)); + } + + @Test + public void testEmptyTable() throws IOException + { + catalog.retrieveCatalog().createTable(tableId, SCHEMA); + final Table table = catalog.retrieveTable(NAMESPACE, TABLE); + + final IcebergArrowInputSourceReader reader = new IcebergArrowInputSourceReader( + table, null, null, true, INPUT_SCHEMA, IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE + ); + + final List rows = readAll(reader); + Assert.assertEquals(0, rows.size()); + } + + @Test + public void testWithEqualsFilter() throws IOException + { + final Table table = catalog.retrieveCatalog().createTable(tableId, SCHEMA); + writeRows( + table, + row(1_000L, "alice", 1.0), + row(2_000L, "bob", 2.0), + row(3_000L, "alice", 3.0) + ); + + final IcebergArrowInputSourceReader reader = new IcebergArrowInputSourceReader( + table, + new IcebergEqualsFilter("name", "alice"), + null, + true, + INPUT_SCHEMA, + IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE + ); + + // Filter is non-partition on an unpartitioned table — all 3 rows from the file are returned. + // iceberg-arrow may dict-encode repeated string columns, so we assert row count only. + final List rows = readAll(reader); + Assert.assertEquals(3, rows.size()); + } + + @Test + public void testColumnPruning() throws IOException + { + final Table table = catalog.retrieveCatalog().createTable(tableId, SCHEMA); + writeRows(table, row(1_000L, "alice", 9.9)); + + // Only request ts + name; value should not appear in output event. + final InputRowSchema pruned = new InputRowSchema( + new TimestampSpec("ts", "millis", null), + DimensionsSpec.builder() + .setDimensions(ImmutableList.of(new StringDimensionSchema("name"))) + .build(), + ColumnsFilter.all() + ); + + final IcebergArrowInputSourceReader reader = new IcebergArrowInputSourceReader( + table, null, null, true, pruned, IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE + ); + + final List rows = readAll(reader); + Assert.assertEquals(1, rows.size()); + Assert.assertEquals(1_000L, rows.get(0).getTimestampFromEpoch()); + Assert.assertEquals("alice", rows.get(0).getDimension("name").get(0)); + } + + @Test + public void testLargeBatch() throws IOException + { + final Table table = catalog.retrieveCatalog().createTable(tableId, SCHEMA); + final int count = 5_000; + final GenericRecord[] data = new GenericRecord[count]; + for (int i = 0; i < count; i++) { + data[i] = row((long) (i + 1) * 1000, "user" + i, i * 0.1); + } + writeRows(table, data); + + final IcebergArrowInputSourceReader reader = new IcebergArrowInputSourceReader( + table, null, null, true, INPUT_SCHEMA, IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE + ); + + final List rows = readAll(reader); + Assert.assertEquals(count, rows.size()); + } + + @Test + public void testSnapshotTime() throws IOException + { + final Table table = catalog.retrieveCatalog().createTable(tableId, SCHEMA); + writeRows(table, row(1_000L, "snap1", 1.0)); + final long afterFirstSnapshot = System.currentTimeMillis(); + + // Small sleep to ensure second snapshot has later timestamp + try { Thread.sleep(10); } catch (InterruptedException ignored) { } + writeRows(table, row(2_000L, "snap2", 2.0)); + + // Read as-of the first snapshot — should only see 1 row. + final IcebergArrowInputSourceReader reader = new IcebergArrowInputSourceReader( + table, + null, + new org.joda.time.DateTime(afterFirstSnapshot), + true, + INPUT_SCHEMA, + IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE + ); + + final List rows = readAll(reader); + Assert.assertEquals(1, rows.size()); + Assert.assertEquals("snap1", rows.get(0).getDimension("name").get(0)); + } + + // --- helpers --- + + private static GenericRecord row(final long ts, final String name, final double value) + { + final GenericRecord r = GenericRecord.create(SCHEMA); + r.setField("ts", ts); + r.setField("name", name); + r.setField("value", value); + return r; + } + + private static void writeRows(final Table table, final GenericRecord... records) throws IOException + { + final String filepath = table.location() + "/" + UUID.randomUUID() + ".parquet"; + final OutputFile file = table.io().newOutputFile(filepath); + final DataWriter writer = + Parquet.writeData(file) + .schema(SCHEMA) + .createWriterFunc(GenericParquetWriter::create) + .overwrite() + .withSpec(PartitionSpec.unpartitioned()) + .build(); + try { + for (final GenericRecord r : records) { + writer.write(r); + } + } + finally { + writer.close(); + } + final DataFile dataFile = writer.toDataFile(); + table.newAppend().appendFile(dataFile).commit(); + } + + private static List readAll(final IcebergArrowInputSourceReader reader) throws IOException + { + final List result = new ArrayList<>(); + try (CloseableIterator it = reader.read(new NoopInputStats())) { + while (it.hasNext()) { + result.add(it.next()); + } + } + return result; + } + + private static final class NoopInputStats implements org.apache.druid.data.input.InputStats + { + @Override + public void incrementProcessedBytes(final long v) {} + + @Override + public long getProcessedBytes() { return 0; } + } +} diff --git a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java index 93d7412cc77a..fdd891c69c60 100644 --- a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java +++ b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java @@ -100,6 +100,8 @@ public void testInputSource() throws IOException testCatalog, new LocalInputSourceFactory(), null, + null, + null, null ); Stream>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null)); @@ -136,6 +138,8 @@ public void testInputSourceWithEmptySource() throws IOException testCatalog, new LocalInputSourceFactory(), null, + null, + null, null ); Stream>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null)); @@ -152,6 +156,8 @@ public void testInputSourceWithFilter() throws IOException testCatalog, new LocalInputSourceFactory(), null, + null, + null, null ); Stream>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null)); @@ -188,6 +194,8 @@ public void testInputSourceReadFromLatestSnapshot() throws IOException testCatalog, new LocalInputSourceFactory(), DateTimes.nowUtc(), + null, + null, null ); Stream>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null)); @@ -208,6 +216,8 @@ public void testCaseInsensitiveFiltering() throws IOException caseInsensitiveCatalog, new LocalInputSourceFactory(), null, + null, + null, null ); @@ -233,7 +243,9 @@ public void testResidualFilterModeIgnore() throws IOException testCatalog, new LocalInputSourceFactory(), null, - ResidualFilterMode.IGNORE + ResidualFilterMode.IGNORE, + null, + null ); Stream>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null)); Assert.assertEquals(1, splits.count()); @@ -250,7 +262,9 @@ public void testResidualFilterModeFail() throws IOException testCatalog, new LocalInputSourceFactory(), null, - ResidualFilterMode.FAIL + ResidualFilterMode.FAIL, + null, + null ); DruidException exception = Assert.assertThrows( DruidException.class, @@ -278,7 +292,9 @@ public void testResidualFilterModeFailWithPartitionedTable() throws IOException testCatalog, new LocalInputSourceFactory(), null, - ResidualFilterMode.FAIL + ResidualFilterMode.FAIL, + null, + null ); Stream>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null)); Assert.assertEquals(1, splits.count()); @@ -301,7 +317,9 @@ public void testResidualFilterModeFailWithPartitionedTableNonPartitionColumn() t testCatalog, new LocalInputSourceFactory(), null, - ResidualFilterMode.FAIL + ResidualFilterMode.FAIL, + null, + null ); DruidException exception = Assert.assertThrows( DruidException.class, From 8447ce0c533331d9bbefaafeba368d94ee58d47c Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Sat, 23 May 2026 15:42:07 +0530 Subject: [PATCH 06/26] style: fix checkstyle and forbidden-apis violations (imports, argument line breaking, DateTimes.utc, Maps.newHashMapWithExpectedSize) --- .../input/IcebergArrowInputSourceReader.java | 11 +++-- .../IcebergArrowInputSourceReaderTest.java | 44 +++++++++++++++---- 2 files changed, 40 insertions(+), 15 deletions(-) diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReader.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReader.java index 5379ad268d04..4187c0b155c0 100644 --- a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReader.java +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReader.java @@ -19,6 +19,7 @@ package org.apache.druid.iceberg.input; +import com.google.common.collect.Maps; import org.apache.arrow.vector.BigIntVector; import org.apache.arrow.vector.BitVector; import org.apache.arrow.vector.DateDayVector; @@ -29,14 +30,14 @@ import org.apache.arrow.vector.Float8Vector; import org.apache.arrow.vector.IntVector; import org.apache.arrow.vector.SmallIntVector; +import org.apache.arrow.vector.TimeMicroVector; import org.apache.arrow.vector.TimeStampMicroTZVector; import org.apache.arrow.vector.TimeStampMicroVector; -import org.apache.arrow.vector.TimeStampNanoTZVector; -import org.apache.arrow.vector.TimeStampNanoVector; import org.apache.arrow.vector.TimeStampMilliTZVector; import org.apache.arrow.vector.TimeStampMilliVector; +import org.apache.arrow.vector.TimeStampNanoTZVector; +import org.apache.arrow.vector.TimeStampNanoVector; import org.apache.arrow.vector.TinyIntVector; -import org.apache.arrow.vector.TimeMicroVector; import org.apache.arrow.vector.VarBinaryVector; import org.apache.arrow.vector.VarCharVector; import org.apache.druid.data.input.InputRow; @@ -46,7 +47,6 @@ import org.apache.druid.data.input.InputStats; import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.iceberg.filter.IcebergFilter; -import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.Table; @@ -61,7 +61,6 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; @@ -183,7 +182,7 @@ private TableScan buildScan() private InputRow batchRowToInputRow(final ColumnarBatch batch, final int rowIdx) { final int numCols = batch.numCols(); - final Map event = new HashMap<>(numCols); + final Map event = Maps.newHashMapWithExpectedSize(numCols); for (int col = 0; col < numCols; col++) { final FieldVector vec = batch.column(col).getFieldVector(); if (!vec.isNull(rowIdx)) { diff --git a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReaderTest.java b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReaderTest.java index b3a34331bc4e..b5fc655b66d1 100644 --- a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReaderTest.java +++ b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReaderTest.java @@ -27,6 +27,7 @@ import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.iceberg.filter.IcebergEqualsFilter; +import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.iceberg.DataFile; @@ -106,7 +107,12 @@ public void testBasicRead() throws IOException writeRows(table, row(1_000L, "alice", 1.1), row(2_000L, "bob", 2.2), row(3_000L, "carol", 3.3)); final IcebergArrowInputSourceReader reader = new IcebergArrowInputSourceReader( - table, null, null, true, INPUT_SCHEMA, IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE + table, + null, + null, + true, + INPUT_SCHEMA, + IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE ); final List rows = readAll(reader); @@ -126,7 +132,12 @@ public void testEmptyTable() throws IOException final Table table = catalog.retrieveTable(NAMESPACE, TABLE); final IcebergArrowInputSourceReader reader = new IcebergArrowInputSourceReader( - table, null, null, true, INPUT_SCHEMA, IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE + table, + null, + null, + true, + INPUT_SCHEMA, + IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE ); final List rows = readAll(reader); @@ -175,7 +186,12 @@ public void testColumnPruning() throws IOException ); final IcebergArrowInputSourceReader reader = new IcebergArrowInputSourceReader( - table, null, null, true, pruned, IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE + table, + null, + null, + true, + pruned, + IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE ); final List rows = readAll(reader); @@ -196,7 +212,12 @@ public void testLargeBatch() throws IOException writeRows(table, data); final IcebergArrowInputSourceReader reader = new IcebergArrowInputSourceReader( - table, null, null, true, INPUT_SCHEMA, IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE + table, + null, + null, + true, + INPUT_SCHEMA, + IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE ); final List rows = readAll(reader); @@ -204,21 +225,21 @@ public void testLargeBatch() throws IOException } @Test - public void testSnapshotTime() throws IOException + public void testSnapshotTime() throws IOException, InterruptedException { final Table table = catalog.retrieveCatalog().createTable(tableId, SCHEMA); writeRows(table, row(1_000L, "snap1", 1.0)); final long afterFirstSnapshot = System.currentTimeMillis(); // Small sleep to ensure second snapshot has later timestamp - try { Thread.sleep(10); } catch (InterruptedException ignored) { } + Thread.sleep(10); writeRows(table, row(2_000L, "snap2", 2.0)); // Read as-of the first snapshot — should only see 1 row. final IcebergArrowInputSourceReader reader = new IcebergArrowInputSourceReader( table, null, - new org.joda.time.DateTime(afterFirstSnapshot), + DateTimes.utc(afterFirstSnapshot), true, INPUT_SCHEMA, IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE @@ -277,9 +298,14 @@ private static List readAll(final IcebergArrowInputSourceReader reader private static final class NoopInputStats implements org.apache.druid.data.input.InputStats { @Override - public void incrementProcessedBytes(final long v) {} + public void incrementProcessedBytes(final long v) + { + } @Override - public long getProcessedBytes() { return 0; } + public long getProcessedBytes() + { + return 0; + } } } From f151f5b01ae1a8cd6497d956af98bd59a4fe5abc Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Sat, 23 May 2026 18:34:43 +0530 Subject: [PATCH 07/26] fix: switch arrow-memory-netty to arrow-memory-unsafe to fix CI ArrowAllocation init failure --- extensions-contrib/druid-iceberg-extensions/pom.xml | 2 +- pom.xml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/extensions-contrib/druid-iceberg-extensions/pom.xml b/extensions-contrib/druid-iceberg-extensions/pom.xml index fa958e26350c..e990fa4fc240 100644 --- a/extensions-contrib/druid-iceberg-extensions/pom.xml +++ b/extensions-contrib/druid-iceberg-extensions/pom.xml @@ -771,7 +771,7 @@ org.apache.arrow - arrow-memory-netty + arrow-memory-unsafe runtime diff --git a/pom.xml b/pom.xml index 67c08db1bd06..873f9536b073 100644 --- a/pom.xml +++ b/pom.xml @@ -436,7 +436,7 @@ org.apache.arrow - arrow-memory-netty + arrow-memory-unsafe ${arrow.version} From 87bdc7f4d78d76a9b8f51369a29dd48f13db72a6 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Sun, 24 May 2026 21:33:32 +0530 Subject: [PATCH 08/26] test: add regression for aggregator source column projection (currently failing) --- .../IcebergArrowInputSourceReaderTest.java | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReaderTest.java b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReaderTest.java index b5fc655b66d1..8c59d6069794 100644 --- a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReaderTest.java +++ b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReaderTest.java @@ -20,9 +20,11 @@ package org.apache.druid.iceberg.input; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import org.apache.druid.data.input.ColumnsFilter; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.data.input.impl.TimestampSpec; @@ -54,6 +56,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; public class IcebergArrowInputSourceReaderTest @@ -250,6 +253,46 @@ public void testSnapshotTime() throws IOException, InterruptedException Assert.assertEquals("snap1", rows.get(0).getDimension("name").get(0)); } + @Test + public void testAggregatorSourceColumnSurvivesProjection() throws IOException + { + // Regression: projection driven only from DimensionsSpec drops non-dimension columns + // (aggregator source fields, transform inputs, filter inputs) — see review on PR 19510. + // The authoritative source for "what columns this ingestion needs" is + // InputRowSchema.getColumnsFilter(). dimensions=[name] but the ColumnsFilter inclusion + // also lists `value` (simulating a doubleSum aggregator over `value`). + final Table table = catalog.retrieveCatalog().createTable(tableId, SCHEMA); + writeRows(table, row(1_000L, "alice", 9.0), row(2_000L, "bob", 4.5)); + + final InputRowSchema schemaWithAggSource = new InputRowSchema( + new TimestampSpec("ts", "millis", null), + DimensionsSpec.builder() + .setDimensions(ImmutableList.of(new StringDimensionSchema("name"))) + .build(), + ColumnsFilter.inclusionBased(ImmutableSet.of("ts", "name", "value")) + ); + + final IcebergArrowInputSourceReader reader = new IcebergArrowInputSourceReader( + table, + null, + null, + true, + schemaWithAggSource, + IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE + ); + + final List rows = readAll(reader); + Assert.assertEquals(2, rows.size()); + final Map event0 = ((MapBasedInputRow) rows.get(0)).getEvent(); + Assert.assertEquals("alice", event0.get("name")); + Assert.assertNotNull("aggregator source column 'value' must survive projection", event0.get("value")); + Assert.assertEquals(9.0, ((Number) event0.get("value")).doubleValue(), 0.0001); + final Map event1 = ((MapBasedInputRow) rows.get(1)).getEvent(); + Assert.assertEquals("bob", event1.get("name")); + Assert.assertNotNull("aggregator source column 'value' must survive projection", event1.get("value")); + Assert.assertEquals(4.5, ((Number) event1.get("value")).doubleValue(), 0.0001); + } + // --- helpers --- private static GenericRecord row(final long ts, final String name, final double value) From c774363e168d27e29b9f8116dee22b71346104a8 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Sun, 24 May 2026 21:34:31 +0530 Subject: [PATCH 09/26] fix: drive Iceberg scan projection from ColumnsFilter, mirroring DeltaInputSource --- .../input/IcebergArrowInputSourceReader.java | 48 +++++++++++++++---- 1 file changed, 39 insertions(+), 9 deletions(-) diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReader.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReader.java index 4187c0b155c0..ab3dcab439fc 100644 --- a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReader.java +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReader.java @@ -40,6 +40,7 @@ import org.apache.arrow.vector.TinyIntVector; import org.apache.arrow.vector.VarBinaryVector; import org.apache.arrow.vector.VarCharVector; +import org.apache.druid.data.input.ColumnsFilter; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRowListPlusRawValues; import org.apache.druid.data.input.InputRowSchema; @@ -54,6 +55,7 @@ import org.apache.iceberg.arrow.vectorized.ArrowReader; import org.apache.iceberg.arrow.vectorized.ColumnarBatch; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.types.Types; import org.apache.iceberg.util.TableScanUtil; import org.joda.time.DateTime; @@ -65,6 +67,7 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** * Reads an Iceberg table via iceberg-arrow's {@link ArrowReader}, yielding {@link InputRow} objects. @@ -155,15 +158,13 @@ private TableScan buildScan() { TableScan scan = table.newScan().caseSensitive(caseSensitive); - // Push column projection into the scan planner — only requested columns are read from disk. - final List configuredDims = schema.getDimensionsSpec().getDimensionNames(); - final String timestampColumn = schema.getTimestampSpec().getTimestampColumn(); - if (!configuredDims.isEmpty()) { - final List selectCols = new ArrayList<>(configuredDims); - if (timestampColumn != null && !selectCols.contains(timestampColumn)) { - selectCols.add(timestampColumn); - } - scan = scan.select(selectCols); + // Drive projection from ColumnsFilter (the authoritative "what columns this ingestion needs"), + // walking the Iceberg table schema as the column universe. DimensionsSpec alone is NOT + // authoritative — it omits aggregator source fields, transform inputs, and filter inputs. + // Pattern mirrors DeltaInputSource#pruneSchema and DruidSegmentReader. + final List projection = projectedColumns(); + if (projection != null) { + scan = scan.select(projection); } // Push predicate into scan planner — reduces files opened and rows read. @@ -179,6 +180,35 @@ private TableScan buildScan() return scan; } + /** + * Computes the column projection to push into the Iceberg scan, using + * {@link InputRowSchema#getColumnsFilter()} as the authority. Walks the table's full schema + * and keeps every column the filter accepts. Returns {@code null} when the filter accepts + * every column in the table (no useful pruning), so the caller skips {@code scan.select}. + * + * Pattern mirrors {@code DeltaInputSource#pruneSchema} and {@code DruidSegmentReader}. + */ + @Nullable + private List projectedColumns() + { + final ColumnsFilter filter = schema.getColumnsFilter(); + final List allColumns = table.schema().columns().stream() + .map(Types.NestedField::name) + .collect(Collectors.toList()); + final List filtered = allColumns.stream() + .filter(filter::apply) + .collect(Collectors.toList()); + if (filtered.equals(allColumns)) { + return null; + } + // Defensive: ensure timestamp survives even if the ColumnsFilter is misconfigured to omit it. + final String tsCol = schema.getTimestampSpec().getTimestampColumn(); + if (tsCol != null && allColumns.contains(tsCol) && !filtered.contains(tsCol)) { + filtered.add(tsCol); + } + return filtered; + } + private InputRow batchRowToInputRow(final ColumnarBatch batch, final int rowIdx) { final int numCols = batch.numCols(); From 6db57a7ae694ada730bd81c5f292a2e1eb34f81d Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Sun, 24 May 2026 21:36:47 +0530 Subject: [PATCH 10/26] test: ColumnsFilter exclusion prunes unused columns at Iceberg scan --- .../IcebergArrowInputSourceReaderTest.java | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReaderTest.java b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReaderTest.java index 8c59d6069794..8914ad7aea46 100644 --- a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReaderTest.java +++ b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReaderTest.java @@ -293,6 +293,42 @@ public void testAggregatorSourceColumnSurvivesProjection() throws IOException Assert.assertEquals(4.5, ((Number) event1.get("value")).doubleValue(), 0.0001); } + @Test + public void testProjectionPrunesUnusedColumns() throws IOException + { + final Table table = catalog.retrieveCatalog().createTable(tableId, SCHEMA); + writeRows(table, row(1_000L, "alice", 7.0), row(2_000L, "bob", 8.0)); + + // ColumnsFilter excludes "value" — fix should push projection so "value" is never read from disk. + final InputRowSchema prunedSchema = new InputRowSchema( + new TimestampSpec("ts", "millis", null), + DimensionsSpec.builder() + .setDimensions(ImmutableList.of(new StringDimensionSchema("name"))) + .build(), + ColumnsFilter.exclusionBased(ImmutableSet.of("value")) + ); + + final IcebergArrowInputSourceReader reader = new IcebergArrowInputSourceReader( + table, + null, + null, + true, + prunedSchema, + IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE + ); + + final List rows = readAll(reader); + Assert.assertEquals(2, rows.size()); + for (final InputRow r : rows) { + final Map event = ((MapBasedInputRow) r).getEvent(); + Assert.assertNull( + "excluded column 'value' must be pruned at scan and absent from event", + event.get("value") + ); + Assert.assertNotNull("included column 'name' must be present", event.get("name")); + } + } + // --- helpers --- private static GenericRecord row(final long ts, final String name, final double value) From a7fa10c67be20f2ce952e67809b2de51341f55c7 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Sun, 24 May 2026 21:39:42 +0530 Subject: [PATCH 11/26] style: drop redundant comments per AGENTS.md hygiene --- .../input/IcebergArrowInputSourceReader.java | 20 +------------------ .../IcebergArrowInputSourceReaderTest.java | 14 +++---------- 2 files changed, 4 insertions(+), 30 deletions(-) diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReader.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReader.java index ab3dcab439fc..38a96b57eb9d 100644 --- a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReader.java +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReader.java @@ -158,36 +158,20 @@ private TableScan buildScan() { TableScan scan = table.newScan().caseSensitive(caseSensitive); - // Drive projection from ColumnsFilter (the authoritative "what columns this ingestion needs"), - // walking the Iceberg table schema as the column universe. DimensionsSpec alone is NOT - // authoritative — it omits aggregator source fields, transform inputs, and filter inputs. - // Pattern mirrors DeltaInputSource#pruneSchema and DruidSegmentReader. final List projection = projectedColumns(); if (projection != null) { scan = scan.select(projection); } - - // Push predicate into scan planner — reduces files opened and rows read. if (icebergFilter != null) { scan = icebergFilter.filter(scan); } - - // Snapshot pinning for time-travel reads. if (snapshotTime != null) { scan = scan.asOfTime(snapshotTime.getMillis()); } - return scan; } - /** - * Computes the column projection to push into the Iceberg scan, using - * {@link InputRowSchema#getColumnsFilter()} as the authority. Walks the table's full schema - * and keeps every column the filter accepts. Returns {@code null} when the filter accepts - * every column in the table (no useful pruning), so the caller skips {@code scan.select}. - * - * Pattern mirrors {@code DeltaInputSource#pruneSchema} and {@code DruidSegmentReader}. - */ + /** Projection authority is ColumnsFilter, not DimensionsSpec. Mirrors DeltaInputSource#pruneSchema. */ @Nullable private List projectedColumns() { @@ -201,7 +185,6 @@ private List projectedColumns() if (filtered.equals(allColumns)) { return null; } - // Defensive: ensure timestamp survives even if the ColumnsFilter is misconfigured to omit it. final String tsCol = schema.getTimestampSpec().getTimestampColumn(); if (tsCol != null && allColumns.contains(tsCol) && !filtered.contains(tsCol)) { filtered.add(tsCol); @@ -230,7 +213,6 @@ private List resolveDimensions(final ColumnarBatch batch) if (!configured.isEmpty()) { return configured; } - // Derive dimensions from the batch schema, excluding the timestamp column. final String tsCol = schema.getTimestampSpec().getTimestampColumn(); final List dims = new ArrayList<>(batch.numCols()); for (int col = 0; col < batch.numCols(); col++) { diff --git a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReaderTest.java b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReaderTest.java index 8914ad7aea46..db1f74144283 100644 --- a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReaderTest.java +++ b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReaderTest.java @@ -167,8 +167,7 @@ public void testWithEqualsFilter() throws IOException IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE ); - // Filter is non-partition on an unpartitioned table — all 3 rows from the file are returned. - // iceberg-arrow may dict-encode repeated string columns, so we assert row count only. + // iceberg-arrow may dict-encode repeated string columns; assert row count only. final List rows = readAll(reader); Assert.assertEquals(3, rows.size()); } @@ -179,7 +178,6 @@ public void testColumnPruning() throws IOException final Table table = catalog.retrieveCatalog().createTable(tableId, SCHEMA); writeRows(table, row(1_000L, "alice", 9.9)); - // Only request ts + name; value should not appear in output event. final InputRowSchema pruned = new InputRowSchema( new TimestampSpec("ts", "millis", null), DimensionsSpec.builder() @@ -234,11 +232,9 @@ public void testSnapshotTime() throws IOException, InterruptedException writeRows(table, row(1_000L, "snap1", 1.0)); final long afterFirstSnapshot = System.currentTimeMillis(); - // Small sleep to ensure second snapshot has later timestamp Thread.sleep(10); writeRows(table, row(2_000L, "snap2", 2.0)); - // Read as-of the first snapshot — should only see 1 row. final IcebergArrowInputSourceReader reader = new IcebergArrowInputSourceReader( table, null, @@ -256,11 +252,7 @@ public void testSnapshotTime() throws IOException, InterruptedException @Test public void testAggregatorSourceColumnSurvivesProjection() throws IOException { - // Regression: projection driven only from DimensionsSpec drops non-dimension columns - // (aggregator source fields, transform inputs, filter inputs) — see review on PR 19510. - // The authoritative source for "what columns this ingestion needs" is - // InputRowSchema.getColumnsFilter(). dimensions=[name] but the ColumnsFilter inclusion - // also lists `value` (simulating a doubleSum aggregator over `value`). + // Regression: dimensions=[name] plus ColumnsFilter inclusion of `value` (aggregator source). final Table table = catalog.retrieveCatalog().createTable(tableId, SCHEMA); writeRows(table, row(1_000L, "alice", 9.0), row(2_000L, "bob", 4.5)); @@ -299,7 +291,7 @@ public void testProjectionPrunesUnusedColumns() throws IOException final Table table = catalog.retrieveCatalog().createTable(tableId, SCHEMA); writeRows(table, row(1_000L, "alice", 7.0), row(2_000L, "bob", 8.0)); - // ColumnsFilter excludes "value" — fix should push projection so "value" is never read from disk. + // Exclusion-based filter must push projection so excluded columns are never read. final InputRowSchema prunedSchema = new InputRowSchema( new TimestampSpec("ts", "millis", null), DimensionsSpec.builder() From 113948f350f4dc8acab2019411438f3870e19db8 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Sun, 24 May 2026 21:53:52 +0530 Subject: [PATCH 12/26] test: regression for residual FAIL mode bypassed by Arrow path --- .../iceberg/input/IcebergInputSourceTest.java | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java index fdd891c69c60..f4df7328992f 100644 --- a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java +++ b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java @@ -26,6 +26,7 @@ import org.apache.druid.data.input.impl.LocalInputSource; import org.apache.druid.data.input.impl.LocalInputSourceFactory; import org.apache.druid.error.DruidException; +import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.iceberg.filter.IcebergEqualsFilter; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.FileUtils; @@ -276,6 +277,40 @@ public void testResidualFilterModeFail() throws IOException ); } + @Test + public void testResidualFilterModeFailWithArrowReader() throws IOException + { + // Arrow path must honor residualFilterMode=FAIL just like the path-based path. + final IcebergInputSource inputSource = new IcebergInputSource( + TABLENAME, + NAMESPACE, + new IcebergEqualsFilter("id", "123988"), + testCatalog, + new LocalInputSourceFactory(), + null, + ResidualFilterMode.FAIL, + true, + 1024 + ); + final InputRowSchema inputRowSchema = new InputRowSchema( + new org.apache.druid.data.input.impl.TimestampSpec("timestamp", "millis", null), + org.apache.druid.data.input.impl.DimensionsSpec.builder().build(), + org.apache.druid.data.input.ColumnsFilter.all() + ); + final DruidException exception = Assert.assertThrows( + DruidException.class, + () -> { + final org.apache.druid.data.input.InputSourceReader reader = + inputSource.reader(inputRowSchema, null, FileUtils.createTempDir()); + reader.read().close(); + } + ); + Assert.assertTrue( + "Expect residual error to be thrown on Arrow path: " + exception.getMessage(), + exception.getMessage().contains("residual") + ); + } + @Test public void testResidualFilterModeFailWithPartitionedTable() throws IOException { From 6a3ad857482d0017d1fe70330f2bd8ef91b09184 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Sun, 24 May 2026 22:02:15 +0530 Subject: [PATCH 13/26] fix: enforce residualFilterMode in Arrow reader path --- .../druid/iceberg/input/IcebergCatalog.java | 74 +++++++++++-------- .../iceberg/input/IcebergInputSource.java | 11 ++- .../iceberg/input/IcebergInputSourceTest.java | 2 +- 3 files changed, 55 insertions(+), 32 deletions(-) diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java index f57480adf15e..63894e18ff23 100644 --- a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java @@ -38,6 +38,7 @@ import org.apache.iceberg.io.CloseableIterable; import org.joda.time.DateTime; +import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -140,37 +141,11 @@ public List extractSnapshotDataFiles( } tableScan = tableScan.caseSensitive(isCaseSensitive()); - CloseableIterable tasks = tableScan.planFiles(); - - Expression detectedResidual = null; - for (FileScanTask task : tasks) { - dataFilePaths.add(task.file().location()); - - // Check for residual filters - if (detectedResidual == null) { - Expression residual = task.residual(); - if (residual != null && !residual.equals(Expressions.alwaysTrue())) { - detectedResidual = residual; - } - } - } - - // Handle residual filter based on mode - if (detectedResidual != null) { - String message = StringUtils.format( - "Iceberg filter produced residual expression that requires row-level filtering. " - + "This typically means the filter is on a non-partition column. " - + "Residual rows may be ingested unless filtered by transformSpec. " - + "Residual filter: [%s]", - detectedResidual - ); - - if (residualFilterMode == ResidualFilterMode.FAIL) { - throw DruidException.forPersona(DruidException.Persona.DEVELOPER) - .ofCategory(DruidException.Category.RUNTIME_FAILURE) - .build(message); + enforceResidualMode(tableScan, residualFilterMode); + try (CloseableIterable tasks = tableScan.planFiles()) { + for (FileScanTask task : tasks) { + dataFilePaths.add(task.file().location()); } - log.warn(message); } long duration = System.currentTimeMillis() - start; @@ -187,4 +162,43 @@ public List extractSnapshotDataFiles( } return dataFilePaths; } + + /** + * Detects whether the planned scan carries a non-trivial residual expression (a filter that + * could not be fully resolved by partition pruning) and applies {@link ResidualFilterMode}: + * {@code FAIL} throws a {@link DruidException}, {@code IGNORE} logs a warning. Shared by the + * path-based and Arrow reader paths. + */ + public void enforceResidualMode(TableScan tableScan, ResidualFilterMode residualFilterMode) + { + Expression detectedResidual = null; + try (CloseableIterable tasks = tableScan.planFiles()) { + for (FileScanTask task : tasks) { + final Expression residual = task.residual(); + if (residual != null && !residual.equals(Expressions.alwaysTrue())) { + detectedResidual = residual; + break; + } + } + } + catch (IOException e) { + throw new RE(e, "Failed to plan Iceberg scan for residual detection"); + } + if (detectedResidual == null) { + return; + } + final String message = StringUtils.format( + "Iceberg filter produced residual expression that requires row-level filtering. " + + "This typically means the filter is on a non-partition column. " + + "Residual rows may be ingested unless filtered by transformSpec. " + + "Residual filter: [%s]", + detectedResidual + ); + if (residualFilterMode == ResidualFilterMode.FAIL) { + throw DruidException.forPersona(DruidException.Persona.DEVELOPER) + .ofCategory(DruidException.Category.RUNTIME_FAILURE) + .build(message); + } + log.warn(message); + } } diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java index 5941292c47a6..84cf3c3b034c 100644 --- a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java @@ -37,6 +37,8 @@ import org.apache.druid.iceberg.filter.IcebergFilter; import org.apache.druid.java.util.common.CloseableIterators; import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableScan; import org.joda.time.DateTime; import javax.annotation.Nullable; @@ -126,8 +128,15 @@ public InputSourceReader reader( ) { if (useArrowReader) { + final Table table = icebergCatalog.retrieveTable(namespace, tableName); + if (icebergFilter != null) { + final TableScan filteredScan = icebergFilter.filter( + table.newScan().caseSensitive(icebergCatalog.isCaseSensitive()) + ); + icebergCatalog.enforceResidualMode(filteredScan, getResidualFilterMode()); + } return new IcebergArrowInputSourceReader( - icebergCatalog.retrieveTable(namespace, tableName), + table, icebergFilter, snapshotTime, icebergCatalog.isCaseSensitive(), diff --git a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java index f4df7328992f..6ace4fd2b7c5 100644 --- a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java +++ b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java @@ -21,12 +21,12 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.InputSplit; import org.apache.druid.data.input.MaxSizeSplitHintSpec; import org.apache.druid.data.input.impl.LocalInputSource; import org.apache.druid.data.input.impl.LocalInputSourceFactory; import org.apache.druid.error.DruidException; -import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.iceberg.filter.IcebergEqualsFilter; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.FileUtils; From ed6c2ff51fefa5b4f54993cf47ea7617ab0d1872 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Sun, 24 May 2026 22:35:49 +0530 Subject: [PATCH 14/26] test: regression for parallel ingestion bypassing Arrow reader --- .../iceberg/input/IcebergInputSourceTest.java | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java index 6ace4fd2b7c5..a53e6e9f96df 100644 --- a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java +++ b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java @@ -277,6 +277,42 @@ public void testResidualFilterModeFail() throws IOException ); } + @Test + public void testArrowReaderIsNonSplittable() throws IOException + { + // When useArrowReader=true, splittable contract MUST route through the Arrow path, + // not silently fall back to the delegate (path-based) reader in parallel ingestion. + final IcebergInputSource inputSource = new IcebergInputSource( + TABLENAME, + NAMESPACE, + null, + testCatalog, + new LocalInputSourceFactory(), + null, + null, + true, + 1024 + ); + final List>> splits = + inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null)) + .collect(Collectors.toList()); + Assert.assertEquals("Arrow mode must produce exactly one split", 1, splits.size()); + Assert.assertEquals( + "Arrow mode estimateNumSplits must be 1", + 1, + inputSource.estimateNumSplits(null, new MaxSizeSplitHintSpec(null, null)) + ); + final org.apache.druid.data.input.InputSource child = inputSource.withSplit(splits.get(0)); + Assert.assertTrue( + "withSplit on Arrow mode must return an IcebergInputSource (not the delegate)", + child instanceof IcebergInputSource + ); + Assert.assertTrue( + "withSplit on Arrow mode must preserve useArrowReader=true", + ((IcebergInputSource) child).isUseArrowReader() + ); + } + @Test public void testResidualFilterModeFailWithArrowReader() throws IOException { From 498daa6d476e69f15d79605aef938520a2171dd4 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Sun, 24 May 2026 22:39:09 +0530 Subject: [PATCH 15/26] fix: route splittable contract through Arrow path when useArrowReader=true --- .../iceberg/input/IcebergInputSource.java | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java index 84cf3c3b034c..c6f39fc6d351 100644 --- a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java @@ -32,10 +32,12 @@ import org.apache.druid.data.input.InputSourceReader; import org.apache.druid.data.input.InputSplit; import org.apache.druid.data.input.InputStats; +import org.apache.druid.data.input.MaxSizeSplitHintSpec; import org.apache.druid.data.input.SplitHintSpec; import org.apache.druid.data.input.impl.SplittableInputSource; import org.apache.druid.iceberg.filter.IcebergFilter; import org.apache.druid.java.util.common.CloseableIterators; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.iceberg.Table; import org.apache.iceberg.TableScan; @@ -56,6 +58,7 @@ public class IcebergInputSource implements SplittableInputSource> { public static final String TYPE_KEY = "iceberg"; + private static final Logger log = new Logger(IcebergInputSource.class); @JsonProperty private final String tableName; @@ -156,6 +159,15 @@ public Stream>> createSplits( @Nullable SplitHintSpec splitHintSpec ) throws IOException { + if (useArrowReader) { + log.info( + "useArrowReader=true: input source is non-splittable; " + + "subtasks will not be created for table[%s.%s]", + namespace, + tableName + ); + return Stream.of(new InputSplit<>(Collections.emptyList())); + } if (!isLoaded) { retrieveIcebergDatafiles(); } @@ -165,6 +177,9 @@ public Stream>> createSplits( @Override public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) throws IOException { + if (useArrowReader) { + return 1; + } if (!isLoaded) { retrieveIcebergDatafiles(); } @@ -174,12 +189,18 @@ public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec sp @Override public InputSource withSplit(InputSplit> inputSplit) { + if (useArrowReader) { + return this; + } return getDelegateInputSource().withSplit(inputSplit); } @Override public SplitHintSpec getSplitHintSpecOrDefault(@Nullable SplitHintSpec splitHintSpec) { + if (useArrowReader) { + return splitHintSpec == null ? new MaxSizeSplitHintSpec(null, null) : splitHintSpec; + } return getDelegateInputSource().getSplitHintSpecOrDefault(splitHintSpec); } From 484ebea32dabb0a959b6ed1925646a7ea34684aa Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Mon, 25 May 2026 16:11:34 +0530 Subject: [PATCH 16/26] fix(iceberg-arrow): open jdk.internal.misc for Arrow allocator on JDK 21 --- extensions-contrib/druid-iceberg-extensions/pom.xml | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/extensions-contrib/druid-iceberg-extensions/pom.xml b/extensions-contrib/druid-iceberg-extensions/pom.xml index e990fa4fc240..076f48328497 100644 --- a/extensions-contrib/druid-iceberg-extensions/pom.xml +++ b/extensions-contrib/druid-iceberg-extensions/pom.xml @@ -809,6 +809,14 @@ true + + org.apache.maven.plugins + maven-surefire-plugin + + + @{argLine} --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED + + From bcbfe6710be3379461cadaf26f141a3978fedc49 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Mon, 25 May 2026 22:44:56 +0530 Subject: [PATCH 17/26] fix(iceberg-arrow): drop module surefire override; root pom argLine already opens java.nio --- extensions-contrib/druid-iceberg-extensions/pom.xml | 8 -------- 1 file changed, 8 deletions(-) diff --git a/extensions-contrib/druid-iceberg-extensions/pom.xml b/extensions-contrib/druid-iceberg-extensions/pom.xml index 076f48328497..e990fa4fc240 100644 --- a/extensions-contrib/druid-iceberg-extensions/pom.xml +++ b/extensions-contrib/druid-iceberg-extensions/pom.xml @@ -809,14 +809,6 @@ true - - org.apache.maven.plugins - maven-surefire-plugin - - - @{argLine} --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED - - From 2dedaa91a034f215bc885323fda705eb193fdf3d Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Tue, 26 May 2026 00:01:15 +0530 Subject: [PATCH 18/26] fix(iceberg-arrow): pin Arrow allocator to Unsafe to avoid Netty backend failure on JDK 25 --- .../druid/iceberg/input/IcebergArrowInputSourceReader.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReader.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReader.java index 38a96b57eb9d..b7c0c707b1d3 100644 --- a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReader.java +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReader.java @@ -84,6 +84,13 @@ */ public class IcebergArrowInputSourceReader implements InputSourceReader { + // Pin Arrow to Unsafe allocator: Netty backend fails on JDK 25 (EmptyByteBuf.memoryAddress UnsupportedOperationException). + static { + if (System.getProperty("arrow.allocation.manager.type") == null) { + System.setProperty("arrow.allocation.manager.type", "Unsafe"); + } + } + static final int DEFAULT_BATCH_SIZE = 1024; private final Table table; From 2111b0a3bf9ad0d36152cb46e4710241ac5d442a Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Tue, 26 May 2026 12:36:15 +0530 Subject: [PATCH 19/26] fix(iceberg-arrow): pin explicit arrow versions in iceberg pom for downstream resolvers --- extensions-contrib/druid-iceberg-extensions/pom.xml | 2 ++ pom.xml | 5 +++++ 2 files changed, 7 insertions(+) diff --git a/extensions-contrib/druid-iceberg-extensions/pom.xml b/extensions-contrib/druid-iceberg-extensions/pom.xml index e990fa4fc240..88ae02f31aab 100644 --- a/extensions-contrib/druid-iceberg-extensions/pom.xml +++ b/extensions-contrib/druid-iceberg-extensions/pom.xml @@ -768,10 +768,12 @@ org.apache.arrow arrow-vector + ${arrow.version} org.apache.arrow arrow-memory-unsafe + ${arrow.version} runtime diff --git a/pom.xml b/pom.xml index 873f9536b073..56fd919bf035 100644 --- a/pom.xml +++ b/pom.xml @@ -439,6 +439,11 @@ arrow-memory-unsafe ${arrow.version} + + org.apache.arrow + arrow-memory-netty + ${arrow.version} + org.apache.iceberg iceberg-arrow From bf71afb428ec50c1518cd5d68e4743fdc7714468 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Tue, 2 Jun 2026 23:05:36 +0530 Subject: [PATCH 20/26] iceberg: split useArrowReader into dedicated IcebergArrowInputSource (type iceberg_arrow), extract AbstractIcebergInputSource --- .../iceberg/common/IcebergDruidModule.java | 2 + .../input/AbstractIcebergInputSource.java | 98 +++++++++++ .../input/IcebergArrowInputSource.java | 101 +++++++++++ .../iceberg/input/IcebergInputSource.java | 163 +++--------------- .../iceberg/input/IcebergInputSourceTest.java | 70 -------- 5 files changed, 223 insertions(+), 211 deletions(-) create mode 100644 extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/AbstractIcebergInputSource.java create mode 100644 extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSource.java diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/common/IcebergDruidModule.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/common/IcebergDruidModule.java index d238cccc248c..82c293466460 100644 --- a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/common/IcebergDruidModule.java +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/common/IcebergDruidModule.java @@ -27,6 +27,7 @@ import org.apache.druid.iceberg.guice.HiveConf; import org.apache.druid.iceberg.input.GlueIcebergCatalog; import org.apache.druid.iceberg.input.HiveIcebergCatalog; +import org.apache.druid.iceberg.input.IcebergArrowInputSource; import org.apache.druid.iceberg.input.IcebergInputSource; import org.apache.druid.iceberg.input.LocalCatalog; import org.apache.druid.iceberg.input.RestIcebergCatalog; @@ -49,6 +50,7 @@ public List getJacksonModules() new NamedType(LocalCatalog.class, LocalCatalog.TYPE_KEY), new NamedType(RestIcebergCatalog.class, RestIcebergCatalog.TYPE_KEY), new NamedType(IcebergInputSource.class, IcebergInputSource.TYPE_KEY), + new NamedType(IcebergArrowInputSource.class, IcebergArrowInputSource.TYPE_KEY), new NamedType(GlueIcebergCatalog.class, GlueIcebergCatalog.TYPE_KEY) ) ); diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/AbstractIcebergInputSource.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/AbstractIcebergInputSource.java new file mode 100644 index 000000000000..ba5e1b1d45d2 --- /dev/null +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/AbstractIcebergInputSource.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.iceberg.input; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import org.apache.druid.common.config.Configs; +import org.apache.druid.iceberg.filter.IcebergFilter; +import org.apache.iceberg.Table; +import org.joda.time.DateTime; + +import javax.annotation.Nullable; + +public abstract class AbstractIcebergInputSource +{ + protected final String tableName; + protected final String namespace; + protected final IcebergCatalog icebergCatalog; + protected final IcebergFilter icebergFilter; + protected final DateTime snapshotTime; + protected final ResidualFilterMode residualFilterMode; + + protected AbstractIcebergInputSource( + final String tableName, + final String namespace, + @Nullable final IcebergFilter icebergFilter, + final IcebergCatalog icebergCatalog, + @Nullable final DateTime snapshotTime, + @Nullable final ResidualFilterMode residualFilterMode + ) + { + this.tableName = Preconditions.checkNotNull(tableName, "tableName cannot be null"); + this.namespace = Preconditions.checkNotNull(namespace, "namespace cannot be null"); + this.icebergCatalog = Preconditions.checkNotNull(icebergCatalog, "icebergCatalog cannot be null"); + this.icebergFilter = icebergFilter; + this.snapshotTime = snapshotTime; + this.residualFilterMode = Configs.valueOrDefault(residualFilterMode, ResidualFilterMode.IGNORE); + } + + @JsonProperty + public String getTableName() + { + return tableName; + } + + @JsonProperty + public String getNamespace() + { + return namespace; + } + + @JsonProperty + public IcebergCatalog getIcebergCatalog() + { + return icebergCatalog; + } + + @JsonProperty + public IcebergFilter getIcebergFilter() + { + return icebergFilter; + } + + @Nullable + @JsonProperty + public DateTime getSnapshotTime() + { + return snapshotTime; + } + + @JsonProperty + public ResidualFilterMode getResidualFilterMode() + { + return residualFilterMode; + } + + protected Table retrieveTable() + { + return icebergCatalog.retrieveTable(namespace, tableName); + } +} diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSource.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSource.java new file mode 100644 index 000000000000..b79c8ff2700d --- /dev/null +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSource.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.iceberg.input; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.InputSource; +import org.apache.druid.data.input.InputSourceReader; +import org.apache.druid.iceberg.filter.IcebergFilter; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableScan; +import org.joda.time.DateTime; + +import javax.annotation.Nullable; +import java.io.File; + +public class IcebergArrowInputSource extends AbstractIcebergInputSource implements InputSource +{ + public static final String TYPE_KEY = "iceberg_arrow"; + + @JsonProperty + private final int arrowBatchSize; + + @JsonCreator + public IcebergArrowInputSource( + @JsonProperty("tableName") String tableName, + @JsonProperty("namespace") String namespace, + @JsonProperty("icebergFilter") @Nullable IcebergFilter icebergFilter, + @JsonProperty("icebergCatalog") IcebergCatalog icebergCatalog, + @JsonProperty("snapshotTime") @Nullable DateTime snapshotTime, + @JsonProperty("residualFilterMode") @Nullable ResidualFilterMode residualFilterMode, + @JsonProperty("arrowBatchSize") @Nullable Integer arrowBatchSize + ) + { + super(tableName, namespace, icebergFilter, icebergCatalog, snapshotTime, residualFilterMode); + this.arrowBatchSize = arrowBatchSize != null && arrowBatchSize > 0 + ? arrowBatchSize + : IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE; + } + + @JsonProperty + public int getArrowBatchSize() + { + return arrowBatchSize; + } + + @Override + public boolean needsFormat() + { + return false; + } + + @Override + public boolean isSplittable() + { + return false; + } + + @Override + public InputSourceReader reader( + InputRowSchema inputRowSchema, + @Nullable InputFormat inputFormat, + File temporaryDirectory + ) + { + final Table table = retrieveTable(); + if (icebergFilter != null) { + final TableScan filteredScan = icebergFilter.filter( + table.newScan().caseSensitive(icebergCatalog.isCaseSensitive()) + ); + icebergCatalog.enforceResidualMode(filteredScan, getResidualFilterMode()); + } + return new IcebergArrowInputSourceReader( + table, + icebergFilter, + snapshotTime, + icebergCatalog.isCaseSensitive(), + inputRowSchema, + arrowBatchSize + ); + } +} diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java index c6f39fc6d351..cdd80462f289 100644 --- a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; -import org.apache.druid.common.config.Configs; import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRowListPlusRawValues; @@ -32,15 +31,12 @@ import org.apache.druid.data.input.InputSourceReader; import org.apache.druid.data.input.InputSplit; import org.apache.druid.data.input.InputStats; -import org.apache.druid.data.input.MaxSizeSplitHintSpec; import org.apache.druid.data.input.SplitHintSpec; import org.apache.druid.data.input.impl.SplittableInputSource; import org.apache.druid.iceberg.filter.IcebergFilter; import org.apache.druid.java.util.common.CloseableIterators; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.parsers.CloseableIterator; -import org.apache.iceberg.Table; -import org.apache.iceberg.TableScan; import org.joda.time.DateTime; import javax.annotation.Nullable; @@ -50,45 +46,16 @@ import java.util.List; import java.util.stream.Stream; -/** - * Inputsource to ingest data managed by the Iceberg table format. - * This inputsource talks to the configured catalog, executes any configured filters and retrieves the data file paths upto the latest snapshot associated with the iceberg table. - * The data file paths are then provided to a native {@link SplittableInputSource} implementation depending on the warehouse source defined. - */ -public class IcebergInputSource implements SplittableInputSource> +public class IcebergInputSource extends AbstractIcebergInputSource + implements SplittableInputSource> { public static final String TYPE_KEY = "iceberg"; private static final Logger log = new Logger(IcebergInputSource.class); @JsonProperty - private final String tableName; - - @JsonProperty - private final String namespace; - - @JsonProperty - private IcebergCatalog icebergCatalog; - - @JsonProperty - private IcebergFilter icebergFilter; - - @JsonProperty - private InputSourceFactory warehouseSource; - - @JsonProperty - private final DateTime snapshotTime; - - @JsonProperty - private final ResidualFilterMode residualFilterMode; - - @JsonProperty - private final boolean useArrowReader; - - @JsonProperty - private final int arrowBatchSize; + private final InputSourceFactory warehouseSource; private boolean isLoaded = false; - private SplittableInputSource delegateInputSource; @JsonCreator @@ -100,21 +67,26 @@ public IcebergInputSource( @JsonProperty("warehouseSource") InputSourceFactory warehouseSource, @JsonProperty("snapshotTime") @Nullable DateTime snapshotTime, @JsonProperty("residualFilterMode") @Nullable ResidualFilterMode residualFilterMode, + // deprecated: use type "iceberg_arrow" instead. retained for spec back-compat. @JsonProperty("useArrowReader") @Nullable Boolean useArrowReader, @JsonProperty("arrowBatchSize") @Nullable Integer arrowBatchSize ) { - this.tableName = Preconditions.checkNotNull(tableName, "tableName cannot be null"); - this.namespace = Preconditions.checkNotNull(namespace, "namespace cannot be null"); - this.icebergCatalog = Preconditions.checkNotNull(icebergCatalog, "icebergCatalog cannot be null"); - this.icebergFilter = icebergFilter; + super(tableName, namespace, icebergFilter, icebergCatalog, snapshotTime, residualFilterMode); this.warehouseSource = Preconditions.checkNotNull(warehouseSource, "warehouseSource cannot be null"); - this.snapshotTime = snapshotTime; - this.residualFilterMode = Configs.valueOrDefault(residualFilterMode, ResidualFilterMode.IGNORE); - this.useArrowReader = useArrowReader != null && useArrowReader; - this.arrowBatchSize = arrowBatchSize != null && arrowBatchSize > 0 - ? arrowBatchSize - : IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE; + if (useArrowReader != null && useArrowReader) { + log.warn( + "useArrowReader on type[iceberg] is deprecated and ignored; " + + "switch to type[%s] for Arrow vectorized reads", + IcebergArrowInputSource.TYPE_KEY + ); + } + } + + @JsonProperty("warehouseSource") + public InputSourceFactory getWarehouseSource() + { + return warehouseSource; } @Override @@ -130,23 +102,6 @@ public InputSourceReader reader( File temporaryDirectory ) { - if (useArrowReader) { - final Table table = icebergCatalog.retrieveTable(namespace, tableName); - if (icebergFilter != null) { - final TableScan filteredScan = icebergFilter.filter( - table.newScan().caseSensitive(icebergCatalog.isCaseSensitive()) - ); - icebergCatalog.enforceResidualMode(filteredScan, getResidualFilterMode()); - } - return new IcebergArrowInputSourceReader( - table, - icebergFilter, - snapshotTime, - icebergCatalog.isCaseSensitive(), - inputRowSchema, - arrowBatchSize - ); - } if (!isLoaded) { retrieveIcebergDatafiles(); } @@ -159,15 +114,6 @@ public Stream>> createSplits( @Nullable SplitHintSpec splitHintSpec ) throws IOException { - if (useArrowReader) { - log.info( - "useArrowReader=true: input source is non-splittable; " - + "subtasks will not be created for table[%s.%s]", - namespace, - tableName - ); - return Stream.of(new InputSplit<>(Collections.emptyList())); - } if (!isLoaded) { retrieveIcebergDatafiles(); } @@ -177,9 +123,6 @@ public Stream>> createSplits( @Override public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) throws IOException { - if (useArrowReader) { - return 1; - } if (!isLoaded) { retrieveIcebergDatafiles(); } @@ -189,70 +132,15 @@ public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec sp @Override public InputSource withSplit(InputSplit> inputSplit) { - if (useArrowReader) { - return this; - } return getDelegateInputSource().withSplit(inputSplit); } @Override public SplitHintSpec getSplitHintSpecOrDefault(@Nullable SplitHintSpec splitHintSpec) { - if (useArrowReader) { - return splitHintSpec == null ? new MaxSizeSplitHintSpec(null, null) : splitHintSpec; - } return getDelegateInputSource().getSplitHintSpecOrDefault(splitHintSpec); } - @JsonProperty - public String getTableName() - { - return tableName; - } - - @JsonProperty - public String getNamespace() - { - return namespace; - } - - @JsonProperty - public IcebergCatalog getIcebergCatalog() - { - return icebergCatalog; - } - - @JsonProperty - public IcebergFilter getIcebergFilter() - { - return icebergFilter; - } - - @Nullable - @JsonProperty - public DateTime getSnapshotTime() - { - return snapshotTime; - } - - @JsonProperty - public ResidualFilterMode getResidualFilterMode() - { - return residualFilterMode; - } - - @JsonProperty - public boolean isUseArrowReader() - { - return useArrowReader; - } - - @JsonProperty - public int getArrowBatchSize() - { - return arrowBatchSize; - } - public SplittableInputSource getDelegateInputSource() { return delegateInputSource; @@ -260,7 +148,7 @@ public SplittableInputSource getDelegateInputSource() protected void retrieveIcebergDatafiles() { - List snapshotDataFiles = icebergCatalog.extractSnapshotDataFiles( + final List snapshotDataFiles = icebergCatalog.extractSnapshotDataFiles( getNamespace(), getTableName(), getIcebergFilter(), @@ -275,11 +163,6 @@ protected void retrieveIcebergDatafiles() isLoaded = true; } - /** - * This input source is used in place of a delegate input source if there are no input file paths. - * Certain input sources cannot be instantiated with an empty input file list and so composing input sources such as IcebergInputSource - * may use this input source as delegate in such cases. - */ private static class EmptyInputSource implements SplittableInputSource { @Override @@ -306,15 +189,13 @@ public InputSourceReader reader( @Override public CloseableIterator read(InputStats inputStats) { - return CloseableIterators.wrap(Collections.emptyIterator(), () -> { - }); + return CloseableIterators.wrap(Collections.emptyIterator(), () -> {}); } @Override public CloseableIterator sample() { - return CloseableIterators.wrap(Collections.emptyIterator(), () -> { - }); + return CloseableIterators.wrap(Collections.emptyIterator(), () -> {}); } }; } @@ -337,7 +218,7 @@ public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec sp @Override public InputSource withSplit(InputSplit split) { - return null; + return this; } } } diff --git a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java index a53e6e9f96df..69971b93e6c2 100644 --- a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java +++ b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java @@ -277,76 +277,6 @@ public void testResidualFilterModeFail() throws IOException ); } - @Test - public void testArrowReaderIsNonSplittable() throws IOException - { - // When useArrowReader=true, splittable contract MUST route through the Arrow path, - // not silently fall back to the delegate (path-based) reader in parallel ingestion. - final IcebergInputSource inputSource = new IcebergInputSource( - TABLENAME, - NAMESPACE, - null, - testCatalog, - new LocalInputSourceFactory(), - null, - null, - true, - 1024 - ); - final List>> splits = - inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null)) - .collect(Collectors.toList()); - Assert.assertEquals("Arrow mode must produce exactly one split", 1, splits.size()); - Assert.assertEquals( - "Arrow mode estimateNumSplits must be 1", - 1, - inputSource.estimateNumSplits(null, new MaxSizeSplitHintSpec(null, null)) - ); - final org.apache.druid.data.input.InputSource child = inputSource.withSplit(splits.get(0)); - Assert.assertTrue( - "withSplit on Arrow mode must return an IcebergInputSource (not the delegate)", - child instanceof IcebergInputSource - ); - Assert.assertTrue( - "withSplit on Arrow mode must preserve useArrowReader=true", - ((IcebergInputSource) child).isUseArrowReader() - ); - } - - @Test - public void testResidualFilterModeFailWithArrowReader() throws IOException - { - // Arrow path must honor residualFilterMode=FAIL just like the path-based path. - final IcebergInputSource inputSource = new IcebergInputSource( - TABLENAME, - NAMESPACE, - new IcebergEqualsFilter("id", "123988"), - testCatalog, - new LocalInputSourceFactory(), - null, - ResidualFilterMode.FAIL, - true, - 1024 - ); - final InputRowSchema inputRowSchema = new InputRowSchema( - new org.apache.druid.data.input.impl.TimestampSpec("timestamp", "millis", null), - org.apache.druid.data.input.impl.DimensionsSpec.builder().build(), - org.apache.druid.data.input.ColumnsFilter.all() - ); - final DruidException exception = Assert.assertThrows( - DruidException.class, - () -> { - final org.apache.druid.data.input.InputSourceReader reader = - inputSource.reader(inputRowSchema, null, FileUtils.createTempDir()); - reader.read().close(); - } - ); - Assert.assertTrue( - "Expect residual error to be thrown on Arrow path: " + exception.getMessage(), - exception.getMessage().contains("residual") - ); - } - @Test public void testResidualFilterModeFailWithPartitionedTable() throws IOException { From 3fbb73f4d42e1f961d246a5253739c5559741264 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Tue, 2 Jun 2026 23:06:47 +0530 Subject: [PATCH 21/26] iceberg: add IcebergArrowInputSourceTest covering non-splittable and residual-FAIL --- .../input/IcebergArrowInputSourceTest.java | 163 ++++++++++++++++++ 1 file changed, 163 insertions(+) create mode 100644 extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceTest.java diff --git a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceTest.java b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceTest.java new file mode 100644 index 000000000000..285aeda15753 --- /dev/null +++ b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceTest.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.iceberg.input; + +import com.google.common.collect.ImmutableMap; +import org.apache.druid.data.input.ColumnsFilter; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.InputSourceReader; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.error.DruidException; +import org.apache.druid.iceberg.filter.IcebergEqualsFilter; +import org.apache.druid.java.util.common.FileUtils; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.Files; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.types.Types; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +public class IcebergArrowInputSourceTest +{ + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private IcebergCatalog testCatalog; + private TableIdentifier tableIdentifier; + private File warehouseDir; + + private final Schema tableSchema = new Schema( + Types.NestedField.required(1, "id", Types.StringType.get()), + Types.NestedField.required(2, "name", Types.StringType.get()) + ); + private final Map tableData = ImmutableMap.of("id", "123988", "name", "Foo"); + + private static final String NAMESPACE = "default"; + private static final String TABLENAME = "foosTable"; + + @Before + public void setup() throws IOException + { + warehouseDir = FileUtils.createTempDir(); + testCatalog = new LocalCatalog(warehouseDir.getPath(), new HashMap<>(), true); + tableIdentifier = TableIdentifier.of(Namespace.of(NAMESPACE), TABLENAME); + createAndLoadTable(tableIdentifier); + } + + @After + public void tearDown() + { + dropTableFromCatalog(tableIdentifier); + } + + @Test + public void testIsNotSplittable() + { + final IcebergArrowInputSource src = new IcebergArrowInputSource( + TABLENAME, + NAMESPACE, + null, + testCatalog, + null, + null, + 1024 + ); + Assert.assertFalse(src.isSplittable()); + Assert.assertFalse(src.needsFormat()); + } + + @Test + public void testResidualFilterModeFail() throws IOException + { + final IcebergArrowInputSource src = new IcebergArrowInputSource( + TABLENAME, + NAMESPACE, + new IcebergEqualsFilter("id", "123988"), + testCatalog, + null, + ResidualFilterMode.FAIL, + 1024 + ); + final InputRowSchema inputRowSchema = new InputRowSchema( + new TimestampSpec("timestamp", "millis", null), + DimensionsSpec.builder().build(), + ColumnsFilter.all() + ); + final DruidException ex = Assert.assertThrows( + DruidException.class, + () -> { + final InputSourceReader reader = src.reader(inputRowSchema, null, FileUtils.createTempDir()); + reader.read().close(); + } + ); + Assert.assertTrue( + "Expected residual error: " + ex.getMessage(), + ex.getMessage().contains("residual") + ); + } + + private void createAndLoadTable(TableIdentifier id) throws IOException + { + final Table table = testCatalog.retrieveCatalog().createTable(id, tableSchema, PartitionSpec.unpartitioned()); + final String fname = UUID.randomUUID() + ".parquet"; + final File datafile = new File(warehouseDir.getAbsolutePath() + "/" + fname); + Assert.assertTrue(datafile.createNewFile()); + final OutputFile out = Files.localOutput(datafile); + final DataWriter writer = Parquet.writeData(out) + .schema(tableSchema) + .createWriterFunc(GenericParquetWriter::create) + .overwrite() + .withSpec(PartitionSpec.unpartitioned()) + .build(); + final GenericRecord row = GenericRecord.create(tableSchema); + row.setField("id", tableData.get("id")); + row.setField("name", tableData.get("name")); + writer.write(row); + writer.close(); + final DataFile df = writer.toDataFile(); + table.newAppend().appendFile(df).commit(); + } + + private void dropTableFromCatalog(TableIdentifier id) + { + testCatalog.retrieveCatalog().dropTable(id); + } +} From 2845d46ce50b4d7f5fcb74ebe7af791db476a55f Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Tue, 2 Jun 2026 23:07:55 +0530 Subject: [PATCH 22/26] iceberg: drop unused InputRowSchema import in IcebergInputSourceTest --- .../org/apache/druid/iceberg/input/IcebergInputSourceTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java index 69971b93e6c2..fdd891c69c60 100644 --- a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java +++ b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java @@ -21,7 +21,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.InputSplit; import org.apache.druid.data.input.MaxSizeSplitHintSpec; import org.apache.druid.data.input.impl.LocalInputSource; From d04657393b269848dfe70f1416515d1372a476ff Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Tue, 2 Jun 2026 23:17:39 +0530 Subject: [PATCH 23/26] iceberg-arrow: null-guard InputStats in read() per nullable contract --- .../iceberg/input/IcebergArrowInputSourceReader.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReader.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReader.java index b7c0c707b1d3..80f2156841fb 100644 --- a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReader.java +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReader.java @@ -120,7 +120,7 @@ public IcebergArrowInputSourceReader( } @Override - public CloseableIterator read(final InputStats inputStats) throws IOException + public CloseableIterator read(@Nullable final InputStats inputStats) throws IOException { final TableScan scan = buildScan(); final CloseableIterable tasks = TableScanUtil.planTasks( @@ -131,7 +131,12 @@ public CloseableIterator read(final InputStats inputStats) throws IOEx ); final ArrowReader arrowReader = new ArrowReader(scan, batchSize, true); final org.apache.iceberg.io.CloseableIterator batchIter = arrowReader.open(tasks); - return new ArrowInputRowIterator(batchIter, arrowReader, tasks, inputStats); + return new ArrowInputRowIterator( + batchIter, + arrowReader, + tasks, + inputStats != null ? inputStats : new NoopInputStats() + ); } @Override From 197afce43cf21617c8c0ebc0ec3ca0d895b33b74 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Tue, 2 Jun 2026 23:17:39 +0530 Subject: [PATCH 24/26] iceberg-arrow: regression test for null InputStats via no-arg read() --- .../input/IcebergArrowInputSourceTest.java | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceTest.java b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceTest.java index 285aeda15753..0d37159af97b 100644 --- a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceTest.java +++ b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceTest.java @@ -88,6 +88,31 @@ public void tearDown() dropTableFromCatalog(tableIdentifier); } + @Test + public void testReadWithNullInputStatsDoesNotNpe() throws IOException + { + final IcebergArrowInputSource src = new IcebergArrowInputSource( + TABLENAME, + NAMESPACE, + null, + testCatalog, + null, + null, + 1024 + ); + final InputRowSchema schemaWithMissingTs = new InputRowSchema( + new TimestampSpec(null, null, org.apache.druid.java.util.common.DateTimes.utc(0L)), + DimensionsSpec.builder().build(), + ColumnsFilter.all() + ); + final InputSourceReader reader = src.reader(schemaWithMissingTs, null, FileUtils.createTempDir()); + try (org.apache.druid.java.util.common.parsers.CloseableIterator it = reader.read()) { + while (it.hasNext()) { + Assert.assertNotNull(it.next()); + } + } + } + @Test public void testIsNotSplittable() { From 6c37e3cd11f6c9ab95f8696701706b7b01962155 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Thu, 4 Jun 2026 21:43:50 +0530 Subject: [PATCH 25/26] Fix Iceberg Arrow residual snapshot --- .../input/IcebergArrowInputSource.java | 5 +- .../input/IcebergArrowInputSourceTest.java | 85 ++++++++++++++++--- 2 files changed, 76 insertions(+), 14 deletions(-) diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSource.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSource.java index b79c8ff2700d..6433b6fe1ab1 100644 --- a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSource.java +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSource.java @@ -84,9 +84,12 @@ public InputSourceReader reader( { final Table table = retrieveTable(); if (icebergFilter != null) { - final TableScan filteredScan = icebergFilter.filter( + TableScan filteredScan = icebergFilter.filter( table.newScan().caseSensitive(icebergCatalog.isCaseSensitive()) ); + if (getSnapshotTime() != null) { + filteredScan = filteredScan.asOfTime(getSnapshotTime().getMillis()); + } icebergCatalog.enforceResidualMode(filteredScan, getResidualFilterMode()); } return new IcebergArrowInputSourceReader( diff --git a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceTest.java b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceTest.java index 0d37159af97b..4d4818f73945 100644 --- a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceTest.java +++ b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceTest.java @@ -30,6 +30,7 @@ import org.apache.druid.java.util.common.FileUtils; import org.apache.iceberg.DataFile; import org.apache.iceberg.Files; +import org.apache.iceberg.PartitionKey; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; @@ -159,24 +160,82 @@ public void testResidualFilterModeFail() throws IOException ); } + @Test + public void testResidualFilterModeFailUsesSnapshotTime() throws Exception + { + final String filterId = (String) tableData.get("id"); + dropTableFromCatalog(tableIdentifier); + final PartitionSpec partitionSpec = PartitionSpec.builderFor(tableSchema) + .identity("id") + .build(); + final Table table = testCatalog.retrieveCatalog().createTable(tableIdentifier, tableSchema, partitionSpec); + appendRow(table, partitionSpec, tableData); + + final long afterPartitionedSnapshot = System.currentTimeMillis(); + Thread.sleep(10); + + table.updateSpec().removeField("id").commit(); + appendRow(table, table.spec(), ImmutableMap.of("id", filterId, "name", "Bar")); + + final IcebergArrowInputSource src = new IcebergArrowInputSource( + TABLENAME, + NAMESPACE, + new IcebergEqualsFilter("id", filterId), + testCatalog, + org.apache.druid.java.util.common.DateTimes.utc(afterPartitionedSnapshot), + ResidualFilterMode.FAIL, + 1024 + ); + final InputRowSchema inputRowSchema = new InputRowSchema( + new TimestampSpec(null, null, org.apache.druid.java.util.common.DateTimes.utc(0L)), + DimensionsSpec.builder().build(), + ColumnsFilter.all() + ); + + final InputSourceReader reader = src.reader(inputRowSchema, null, FileUtils.createTempDir()); + reader.read().close(); + } + private void createAndLoadTable(TableIdentifier id) throws IOException { final Table table = testCatalog.retrieveCatalog().createTable(id, tableSchema, PartitionSpec.unpartitioned()); + appendRow(table, PartitionSpec.unpartitioned(), tableData); + } + + private void appendRow(Table table, PartitionSpec partitionSpec, Map rowData) throws IOException + { final String fname = UUID.randomUUID() + ".parquet"; - final File datafile = new File(warehouseDir.getAbsolutePath() + "/" + fname); - Assert.assertTrue(datafile.createNewFile()); - final OutputFile out = Files.localOutput(datafile); - final DataWriter writer = Parquet.writeData(out) - .schema(tableSchema) - .createWriterFunc(GenericParquetWriter::create) - .overwrite() - .withSpec(PartitionSpec.unpartitioned()) - .build(); + final File dataFile = new File(warehouseDir.getAbsolutePath() + "/" + fname); + Assert.assertTrue(dataFile.createNewFile()); + final OutputFile out = Files.localOutput(dataFile); final GenericRecord row = GenericRecord.create(tableSchema); - row.setField("id", tableData.get("id")); - row.setField("name", tableData.get("name")); - writer.write(row); - writer.close(); + row.setField("id", rowData.get("id")); + row.setField("name", rowData.get("name")); + final DataWriter writer; + if (partitionSpec.isUnpartitioned()) { + writer = Parquet.writeData(out) + .schema(tableSchema) + .createWriterFunc(GenericParquetWriter::create) + .overwrite() + .withSpec(partitionSpec) + .build(); + } else { + final PartitionKey partitionKey = new PartitionKey(partitionSpec, tableSchema); + partitionKey.partition(row); + writer = Parquet.writeData(out) + .schema(tableSchema) + .createWriterFunc(GenericParquetWriter::create) + .overwrite() + .withSpec(partitionSpec) + .withPartition(partitionKey) + .build(); + } + try { + writer.write(row); + } + finally { + writer.close(); + } final DataFile df = writer.toDataFile(); table.newAppend().appendFile(df).commit(); } From 7a467f81257505f0698aaf1e70e558aa2c9d12bc Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Thu, 4 Jun 2026 22:10:09 +0530 Subject: [PATCH 26/26] Register Iceberg Arrow licenses --- .../druid-iceberg-extensions/pom.xml | 6 ++ licenses.yaml | 69 +++++++++++++++++++ pom.xml | 5 -- 3 files changed, 75 insertions(+), 5 deletions(-) diff --git a/extensions-contrib/druid-iceberg-extensions/pom.xml b/extensions-contrib/druid-iceberg-extensions/pom.xml index 88ae02f31aab..5e0feba1b3b4 100644 --- a/extensions-contrib/druid-iceberg-extensions/pom.xml +++ b/extensions-contrib/druid-iceberg-extensions/pom.xml @@ -764,6 +764,12 @@ org.apache.iceberg iceberg-arrow ${iceberg.core.version} + + + org.apache.arrow + arrow-memory-netty + + org.apache.arrow diff --git a/licenses.yaml b/licenses.yaml index 23f58a0d3b3d..ecd8de279ef7 100644 --- a/licenses.yaml +++ b/licenses.yaml @@ -7130,3 +7130,72 @@ license_name: Apache License version 2.0 version: 3.30.2-GA libraries: - org.javassist: javassist + +--- + +name: Apache Iceberg Arrow +license_category: binary +module: extensions-contrib/druid-iceberg-extensions +license_name: Apache License version 2.0 +version: 1.10.0 +libraries: + - org.apache.iceberg: iceberg-arrow + +--- + +name: Apache Arrow +license_category: binary +module: extensions-contrib/druid-iceberg-extensions +license_name: Apache License version 2.0 +version: 15.0.2 +libraries: + - org.apache.arrow: arrow-format + - org.apache.arrow: arrow-memory-core + - org.apache.arrow: arrow-memory-unsafe + - org.apache.arrow: arrow-vector +notices: + - arrow-format: | + Arrow Format + Copyright 2024 The Apache Software Foundation + + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). + - arrow-memory-core: | + Arrow Memory - Core + Copyright 2024 The Apache Software Foundation + + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). + - arrow-memory-unsafe: | + Arrow Memory - Unsafe + Copyright 2024 The Apache Software Foundation + + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). + - arrow-vector: | + Arrow Vectors + Copyright 2024 The Apache Software Foundation + + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). + +--- + +name: FlatBuffers Java API +license_category: binary +module: extensions-contrib/druid-iceberg-extensions +license_name: Apache License version 2.0 +version: 23.5.26 +libraries: + - com.google.flatbuffers: flatbuffers-java + +--- + +name: Eclipse Collections +license_category: binary +module: extensions-contrib/druid-iceberg-extensions +license_name: Eclipse Public License 1.0 +version: 11.1.0 +libraries: + - org.eclipse.collections: eclipse-collections + - org.eclipse.collections: eclipse-collections-api diff --git a/pom.xml b/pom.xml index 56fd919bf035..873f9536b073 100644 --- a/pom.xml +++ b/pom.xml @@ -439,11 +439,6 @@ arrow-memory-unsafe ${arrow.version} - - org.apache.arrow - arrow-memory-netty - ${arrow.version} - org.apache.iceberg iceberg-arrow