-
Notifications
You must be signed in to change notification settings - Fork 3.8k
feat: Added IcebergArrowInputSourceReader #19510
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
Shekharrajak
wants to merge
28
commits into
apache:master
Choose a base branch
from
Shekharrajak:feature/iceberg-arrow-reader
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
28 commits
Select commit
Hold shift + click to select a range
823018b
deps: add arrow 15.0.2 and iceberg-arrow to pom dependency management
Shekharrajak a873c4d
feat: add retrieveTable() to IcebergCatalog for direct Table object a…
Shekharrajak c420836
feat: add IcebergArrowInputSourceReader using iceberg-arrow vectorize…
Shekharrajak 866a093
feat: wire useArrowReader + arrowBatchSize into IcebergInputSource
Shekharrajak 2339330
test: add IcebergArrowInputSourceReaderTest; fix IcebergInputSourceTe…
Shekharrajak 8447ce0
style: fix checkstyle and forbidden-apis violations (imports, argumen…
Shekharrajak f151f5b
fix: switch arrow-memory-netty to arrow-memory-unsafe to fix CI Arrow…
Shekharrajak 87bdc7f
test: add regression for aggregator source column projection (current…
Shekharrajak c774363
fix: drive Iceberg scan projection from ColumnsFilter, mirroring Delt…
Shekharrajak 6db57a7
test: ColumnsFilter exclusion prunes unused columns at Iceberg scan
Shekharrajak a7fa10c
style: drop redundant comments per AGENTS.md hygiene
Shekharrajak 113948f
test: regression for residual FAIL mode bypassed by Arrow path
Shekharrajak 6a3ad85
fix: enforce residualFilterMode in Arrow reader path
Shekharrajak ed6c2ff
test: regression for parallel ingestion bypassing Arrow reader
Shekharrajak 498daa6
fix: route splittable contract through Arrow path when useArrowReader…
Shekharrajak 484ebea
fix(iceberg-arrow): open jdk.internal.misc for Arrow allocator on JDK 21
Shekharrajak bcbfe67
fix(iceberg-arrow): drop module surefire override; root pom argLine a…
Shekharrajak 2dedaa9
fix(iceberg-arrow): pin Arrow allocator to Unsafe to avoid Netty back…
Shekharrajak 2111b0a
fix(iceberg-arrow): pin explicit arrow versions in iceberg pom for do…
Shekharrajak bf71afb
iceberg: split useArrowReader into dedicated IcebergArrowInputSource …
Shekharrajak 3fbb73f
iceberg: add IcebergArrowInputSourceTest covering non-splittable and …
Shekharrajak 2845d46
iceberg: drop unused InputRowSchema import in IcebergInputSourceTest
Shekharrajak d046573
iceberg-arrow: null-guard InputStats in read() per nullable contract
Shekharrajak 197afce
iceberg-arrow: regression test for null InputStats via no-arg read()
Shekharrajak 6c37e3c
Fix Iceberg Arrow residual snapshot
Shekharrajak 7a467f8
Register Iceberg Arrow licenses
Shekharrajak df75d6d
Merge upstream master
Shekharrajak fd9fe66
Merge origin feature branch
Shekharrajak File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
98 changes: 98 additions & 0 deletions
98
...g-extensions/src/main/java/org/apache/druid/iceberg/input/AbstractIcebergInputSource.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,98 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.druid.iceberg.input; | ||
|
|
||
| import com.fasterxml.jackson.annotation.JsonProperty; | ||
| import com.google.common.base.Preconditions; | ||
| import org.apache.druid.common.config.Configs; | ||
| import org.apache.druid.iceberg.filter.IcebergFilter; | ||
| import org.apache.iceberg.Table; | ||
| import org.joda.time.DateTime; | ||
|
|
||
| import javax.annotation.Nullable; | ||
|
|
||
| public abstract class AbstractIcebergInputSource | ||
| { | ||
| protected final String tableName; | ||
| protected final String namespace; | ||
| protected final IcebergCatalog icebergCatalog; | ||
| protected final IcebergFilter icebergFilter; | ||
| protected final DateTime snapshotTime; | ||
| protected final ResidualFilterMode residualFilterMode; | ||
|
|
||
| protected AbstractIcebergInputSource( | ||
| final String tableName, | ||
| final String namespace, | ||
| @Nullable final IcebergFilter icebergFilter, | ||
| final IcebergCatalog icebergCatalog, | ||
| @Nullable final DateTime snapshotTime, | ||
| @Nullable final ResidualFilterMode residualFilterMode | ||
| ) | ||
| { | ||
| this.tableName = Preconditions.checkNotNull(tableName, "tableName cannot be null"); | ||
| this.namespace = Preconditions.checkNotNull(namespace, "namespace cannot be null"); | ||
| this.icebergCatalog = Preconditions.checkNotNull(icebergCatalog, "icebergCatalog cannot be null"); | ||
| this.icebergFilter = icebergFilter; | ||
| this.snapshotTime = snapshotTime; | ||
| this.residualFilterMode = Configs.valueOrDefault(residualFilterMode, ResidualFilterMode.IGNORE); | ||
| } | ||
|
|
||
| @JsonProperty | ||
| public String getTableName() | ||
| { | ||
| return tableName; | ||
| } | ||
|
|
||
| @JsonProperty | ||
| public String getNamespace() | ||
| { | ||
| return namespace; | ||
| } | ||
|
|
||
| @JsonProperty | ||
| public IcebergCatalog getIcebergCatalog() | ||
| { | ||
| return icebergCatalog; | ||
| } | ||
|
|
||
| @JsonProperty | ||
| public IcebergFilter getIcebergFilter() | ||
| { | ||
| return icebergFilter; | ||
| } | ||
|
|
||
| @Nullable | ||
| @JsonProperty | ||
| public DateTime getSnapshotTime() | ||
| { | ||
| return snapshotTime; | ||
| } | ||
|
|
||
| @JsonProperty | ||
| public ResidualFilterMode getResidualFilterMode() | ||
| { | ||
| return residualFilterMode; | ||
| } | ||
|
|
||
| protected Table retrieveTable() | ||
| { | ||
| return icebergCatalog.retrieveTable(namespace, tableName); | ||
| } | ||
| } |
104 changes: 104 additions & 0 deletions
104
...berg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSource.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,104 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.druid.iceberg.input; | ||
|
|
||
| import com.fasterxml.jackson.annotation.JsonCreator; | ||
| import com.fasterxml.jackson.annotation.JsonProperty; | ||
| import org.apache.druid.data.input.InputFormat; | ||
| import org.apache.druid.data.input.InputRowSchema; | ||
| import org.apache.druid.data.input.InputSource; | ||
| import org.apache.druid.data.input.InputSourceReader; | ||
| import org.apache.druid.iceberg.filter.IcebergFilter; | ||
| import org.apache.iceberg.Table; | ||
| import org.apache.iceberg.TableScan; | ||
| import org.joda.time.DateTime; | ||
|
|
||
| import javax.annotation.Nullable; | ||
| import java.io.File; | ||
|
|
||
| public class IcebergArrowInputSource extends AbstractIcebergInputSource implements InputSource | ||
| { | ||
| public static final String TYPE_KEY = "iceberg_arrow"; | ||
|
|
||
| @JsonProperty | ||
| private final int arrowBatchSize; | ||
|
|
||
| @JsonCreator | ||
| public IcebergArrowInputSource( | ||
| @JsonProperty("tableName") String tableName, | ||
| @JsonProperty("namespace") String namespace, | ||
| @JsonProperty("icebergFilter") @Nullable IcebergFilter icebergFilter, | ||
| @JsonProperty("icebergCatalog") IcebergCatalog icebergCatalog, | ||
| @JsonProperty("snapshotTime") @Nullable DateTime snapshotTime, | ||
| @JsonProperty("residualFilterMode") @Nullable ResidualFilterMode residualFilterMode, | ||
| @JsonProperty("arrowBatchSize") @Nullable Integer arrowBatchSize | ||
| ) | ||
| { | ||
| super(tableName, namespace, icebergFilter, icebergCatalog, snapshotTime, residualFilterMode); | ||
| this.arrowBatchSize = arrowBatchSize != null && arrowBatchSize > 0 | ||
| ? arrowBatchSize | ||
| : IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE; | ||
| } | ||
|
|
||
| @JsonProperty | ||
| public int getArrowBatchSize() | ||
| { | ||
| return arrowBatchSize; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean needsFormat() | ||
| { | ||
| return false; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean isSplittable() | ||
| { | ||
| return false; | ||
| } | ||
|
|
||
| @Override | ||
| public InputSourceReader reader( | ||
| InputRowSchema inputRowSchema, | ||
| @Nullable InputFormat inputFormat, | ||
| File temporaryDirectory | ||
| ) | ||
| { | ||
| final Table table = retrieveTable(); | ||
| if (icebergFilter != null) { | ||
| TableScan filteredScan = icebergFilter.filter( | ||
| table.newScan().caseSensitive(icebergCatalog.isCaseSensitive()) | ||
| ); | ||
| if (getSnapshotTime() != null) { | ||
| filteredScan = filteredScan.asOfTime(getSnapshotTime().getMillis()); | ||
| } | ||
| icebergCatalog.enforceResidualMode(filteredScan, getResidualFilterMode()); | ||
|
Shekharrajak marked this conversation as resolved.
|
||
| } | ||
| return new IcebergArrowInputSourceReader( | ||
| table, | ||
| icebergFilter, | ||
| snapshotTime, | ||
| icebergCatalog.isCaseSensitive(), | ||
| inputRowSchema, | ||
| arrowBatchSize | ||
| ); | ||
| } | ||
| } | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.