
[branch-0.9] Cherry pick feat!(datafusion): enable parallel file scanning with eager task bucketing (#2298) #18

Draft

toutane wants to merge 16 commits into branch-0.9-cherry-pick-8 from branch-0.9-cherry-pick-9

Conversation

toutane and others added 16 commits May 5, 2026 14:56
…itionedScan for parallel file scanning

(cherry picked from commit b9819b4)
Co-authored-by: Tim Saucer <timsaucer@gmail.com>
(cherry picked from commit ec1bd37)
(cherry picked from commit 75e521d)
…identity-hash partitioning

Replace the one-task-per-partition layout in IcebergPartitionedScan with
N buckets sized from the session's target_partitions. When the table's
default spec exposes identity-transform columns and every task carries
the corresponding partition values, tasks are bucketed by hashing those
values via DataFusion's REPARTITION_RANDOM_STATE so the resulting
partitioning matches what RepartitionExec would produce. The scan then
declares Partitioning::Hash(exprs, N), letting downstream joins and
aggregates skip an extra repartition.

The Hash declaration is conservative and holds only when:
  - the table has a single partition spec (no spec evolution)
  - every identity source column is present in the output projection
  - every column type is supported by literal_to_array
  - every task supplied a full identity key
If any condition fails, the declaration collapses to
UnknownPartitioning(N), while bucketing falls back to a hash of
data_file_path so partitions still distribute.

IcebergPartitionedScan now stores Vec<Vec<FileScanTask>> and execute(i)
streams every task in buckets[i] through to_arrow_with_tasks. Bucket
count is capped at min(target_partitions, num_files), and an empty
table still yields zero partitions to avoid out-of-bounds execute calls.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
(cherry picked from commit 9edd54a)
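For illustration, a minimal sketch of the bucketing strategy described above. std's `DefaultHasher` stands in for DataFusion's `REPARTITION_RANDOM_STATE` + `create_hashes` (with this stand-in hasher the scan could not honestly declare `Partitioning::Hash`, since the layout would not match `RepartitionExec`), and the `FileScanTask` struct is a simplified stand-in for iceberg's type:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Simplified stand-in for iceberg's FileScanTask; only the fields the
// bucketing logic needs are modeled.
struct FileScanTask {
    data_file_path: String,
    // Identity-partition values carried by the task, when all are present.
    identity_key: Option<Vec<String>>,
}

/// Distribute tasks into min(target_partitions, num_files) buckets.
/// Tasks with a full identity key hash on that key; otherwise they fall
/// back to hashing the data file path so partitions still distribute.
fn bucket_tasks(tasks: Vec<FileScanTask>, target_partitions: usize) -> Vec<Vec<FileScanTask>> {
    // An empty table yields zero partitions, avoiding out-of-bounds execute(i).
    let n = target_partitions.min(tasks.len());
    if n == 0 {
        return vec![];
    }
    let mut buckets: Vec<Vec<FileScanTask>> = (0..n).map(|_| Vec::new()).collect();
    for task in tasks {
        // The real code hashes with DataFusion's REPARTITION_RANDOM_STATE;
        // DefaultHasher is only a stand-in for this sketch.
        let mut hasher = DefaultHasher::new();
        match &task.identity_key {
            Some(key) => key.hash(&mut hasher),            // identity hash
            None => task.data_file_path.hash(&mut hasher), // fallback hash
        }
        buckets[(hasher.finish() % n as u64) as usize].push(task);
    }
    buckets
}
```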
`IcebergPartitionedTableProvider::supports_filters_pushdown` previously
returned `Inexact` for every filter, forcing DataFusion to re-evaluate
even filters that Iceberg's manifest-level pruning has fully resolved.
The provider now decides per filter, returning `Exact` when both:
  - the iceberg conversion can represent the filter, so manifest pruning
    will remove every row that fails it, and
  - every leaf is a comparison or null check against an identity-
    partition column with a literal RHS.

Identity-partitioned column names are cached at `try_new` from the
table's default spec; tables with spec evolution (>1 historical specs)
fall back to an empty set so all filters stay `Inexact`. Supported
shapes: =, !=, <, <=, >, >=, IS NULL, IS NOT NULL, IN/NOT IN, plus
AND/OR/NOT compositions of the above. Every other shape is `Inexact`.

`convert_filter_to_predicate` is promoted to `pub(crate)` so the
provider can probe convertibility per filter without rebuilding the
whole AND-collapsed predicate.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
(cherry picked from commit add5e35)
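A toy sketch of the per-filter classification, with a simplified `Filter` enum standing in for DataFusion's `Expr`, `PushDown` for `TableProviderFilterPushDown`, and a `convertible` flag standing in for probing `convert_filter_to_predicate`:

```rust
use std::collections::HashSet;

// Toy filter AST standing in for DataFusion's Expr; just enough shape
// to show the classification walk.
enum Filter {
    Compare { column: String },   // =, !=, <, <=, >, >=, IN, NOT IN vs. a literal
    NullCheck { column: String }, // IS NULL / IS NOT NULL
    And(Box<Filter>, Box<Filter>),
    Or(Box<Filter>, Box<Filter>),
    Not(Box<Filter>),
    Other,                        // any other shape stays Inexact
}

enum PushDown {
    Exact,
    Inexact,
}

/// True iff every leaf is a comparison or null check against an
/// identity-partition column; AND/OR/NOT compositions are allowed.
fn leaves_are_identity(filter: &Filter, identity_cols: &HashSet<String>) -> bool {
    match filter {
        Filter::Compare { column } | Filter::NullCheck { column } => {
            identity_cols.contains(column)
        }
        Filter::And(l, r) | Filter::Or(l, r) => {
            leaves_are_identity(l, identity_cols) && leaves_are_identity(r, identity_cols)
        }
        Filter::Not(inner) => leaves_are_identity(inner, identity_cols),
        Filter::Other => false,
    }
}

fn classify(filter: &Filter, convertible: bool, identity_cols: &HashSet<String>) -> PushDown {
    // Exact only when both conditions from the commit message hold.
    if convertible && leaves_are_identity(filter, identity_cols) {
        PushDown::Exact
    } else {
        PushDown::Inexact
    }
}
```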
…column intersection

Previously identity_partition_col_names returned an empty set whenever
the table had more than one historical partition spec, forcing every
filter back to Inexact under spec evolution. This was overly
conservative: Iceberg evaluates partition predicates against each
manifest's own spec, so a column that is identity-partitioned in every
spec is fully prunable across the entire table regardless of which spec
a given file was written under.

Replace the multi-spec gate with an intersection across every spec's
identity-source set. A column survives only if every spec includes it
with Transform::Identity; columns that appear with non-identity
transforms in some spec, or are missing from a spec entirely, are
dropped. The result remains an honest set of columns for which Exact
pushdown is provably safe across all surviving files.

Hash bucketing (compute_identity_cols) keeps its single-spec gate
because slot-order alignment with the table's default spec depends on
each task carrying its own spec id, which the native plan flow does
not yet do.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
(cherry picked from commit e8771e4)
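A minimal sketch of the intersection rule (note this commit is reverted later in the series), modeling each spec as just its set of identity-source column names:

```rust
use std::collections::HashSet;

/// A column survives only if every spec identity-partitions it; columns
/// with non-identity transforms in some spec, or missing from a spec,
/// are dropped. (Each spec is reduced here to its identity-source set.)
fn identity_partition_col_names(specs: &[HashSet<String>]) -> HashSet<String> {
    let mut iter = specs.iter();
    let Some(first) = iter.next() else {
        return HashSet::new(); // no specs: nothing is provably safe
    };
    iter.fold(first.clone(), |acc, spec| {
        acc.intersection(spec).cloned().collect()
    })
}
```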
…via per-column intersection"

This reverts commit b2613e3.

(cherry picked from commit 826f054)
(cherry picked from commit aba4523)
…shdown"

This reverts commit 6d0ed4c.

(cherry picked from commit 4381f00)
(cherry picked from commit 598c5de)
…ergTableProvider

IcebergPartitionedTableProvider and IcebergPartitionedScan were introduced
to enable parallel file scanning by bucketing FileScanTasks across DataFusion
partitions. However, maintaining two TableProvider implementations is
redundant: the new provider is strictly more capable, and its degenerate case
(target_partitions=1) reproduces the old single-partition behavior exactly.

This commit folds the partitioned provider into IcebergTableProvider and
the partitioned scan into IcebergTableScan, eliminating the parallel types.

Changes:
- IcebergTableProvider::scan() now eagerly calls plan_files() and distributes
  FileScanTasks into buckets using the same identity-hash strategy
  (REPARTITION_RANDOM_STATE + create_hashes) that was in
  IcebergPartitionedTableProvider, enabling Partitioning::Hash declarations
  that align with DataFusion's RepartitionExec.
- IcebergTableScan gains a new_with_tasks() constructor that accepts
  pre-planned buckets and a caller-supplied Partitioning. execute(i) streams
  the tasks in buckets[i] via TableScan::to_arrow_with_tasks, rebuilding
  the TableScan per partition to avoid serializing PlanContext Arc-shared
  caches across workers (see the sketch after this commit).
- The original new() constructor and the to_arrow() lazy path are kept
  unchanged for IcebergStaticTableProvider, which does not pre-plan tasks.
- Limit slicing (try_filter_map truncation) from the old IcebergTableScan
  is preserved in both execution paths.
- Bucketing helpers (IdentityCol, compute_identity_cols, bucket_tasks,
  identity_hash, fallback_hash, literal_to_array, is_supported_dtype) are
  moved verbatim into a new private table/bucketing.rs module.
- Unit tests from partitioned.rs are migrated to table/mod.rs and updated
  to use IcebergTableProvider and IcebergTableScan.
- integration_datafusion_test.rs: fix test_provider_plan_stream_schema to
  call execute(0) instead of execute(1). The old call worked only because
  the previous IcebergTableScan silently ignored the partition index.

(cherry picked from commit d2e5e04)
(cherry picked from commit 23f3d8f)
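A toy sketch of the two execution paths described above; the names follow the commit text, while the task struct and the "stream" (a plain Vec of file paths) are simplified stand-ins:

```rust
// Simplified stand-in for iceberg's FileScanTask.
struct FileScanTask {
    data_file_path: String,
}

struct IcebergTableScan {
    // Some(buckets): eager path, buckets[i] is executed by partition i.
    // None: lazy path kept for IcebergStaticTableProvider.
    buckets: Option<Vec<Vec<FileScanTask>>>,
}

impl IcebergTableScan {
    // Eager constructor: the provider has already planned files and
    // bucketed them (and, in the real code, supplies the Partitioning).
    fn new_with_tasks(buckets: Vec<Vec<FileScanTask>>) -> Self {
        Self { buckets: Some(buckets) }
    }

    fn execute(&self, partition: usize) -> Vec<String> {
        match &self.buckets {
            // Eager: stream exactly the tasks in buckets[partition]. The
            // real code rebuilds the TableScan here, per partition, so
            // PlanContext's Arc-shared caches are not serialized across
            // workers.
            Some(buckets) => buckets[partition]
                .iter()
                .map(|t| t.data_file_path.clone())
                .collect(),
            // Lazy: plan files at execute time (the original to_arrow path).
            None => Vec::new(),
        }
    }
}
```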
Review pass over the partitioned-scan branch ahead of upstream
contribution.

- Rename `TableScan::to_arrow_with_tasks` to `to_arrow_from_tasks` —
  `from` better signals that the tasks are the input source rather
  than a builder-style modifier.
- Restructure the doc with a `# Correctness` section that calls out
  the projection/filter contract while clarifying that reader-side
  configuration (concurrency, batch size, row-group filtering, row
  selection) is taken from `self`.
- Make `IcebergTableScan::new` and `new_with_tasks` `pub` (were
  `pub(crate)`) so external users can construct the node directly,
  matching the public visibility of the struct itself.
- Drop the `convert_filters_to_predicate` re-export from
  `physical_plan/mod.rs`: it was unused outside the module.

- Extract a private `new_inner` constructor on `IcebergTableScan` so
  `new` and `new_with_tasks` share a single source of truth for the
  `PlanProperties` / projection / predicate setup.
- Split `IcebergTableScan::execute` into a linear pipeline backed by
  three helpers: `build_table_scan` (synchronous scan-builder
  plumbing), `build_record_batch_stream` (async stream construction
  for the lazy/eager modes), and `apply_limit` (see the sketch after
  this commit).
- Trim the `IcebergTableScan` struct doc and field comments to match
  the rest of the file's style; drop the verbose `to_arrow_with_tasks`
  rationale (the `# Correctness` doc carries the load-bearing info).
- Tighten `DisplayAs::fmt_as`: remove the file-path enumeration (file
  count alone is enough for `EXPLAIN`) and factor the common prefix.
- Trim several narrating comments in `table/mod.rs` and the module
  doc that duplicated information already evident from the code.

- Add `test_identity_partitioned_declares_hash`: verifies the happy
  path where an identity-partitioned table with the partition column
  in the projection produces `Partitioning::Hash` referencing that
  column. This was the main missing coverage for the bucketing logic.
- Add `test_projection_without_partition_col_falls_back_to_unknown`:
  verifies the `compute_identity_cols → None` branch when the
  projection omits the partition source column.
- Add helpers (`make_partitioned_catalog_and_table_for_bucketing`,
  `append_partitioned_fake_data_files`) to build identity-partitioned
  fixtures without writing real Parquet files.

(cherry picked from commit b1f2d66)
(cherry picked from commit 616dcdc)
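A minimal sketch of the `apply_limit` step, with plain row counts standing in for a RecordBatch stream; the real helper applies the same truncation over the stream via try_filter_map, slicing the final batch:

```rust
/// Truncate a sequence of batches (modeled as row counts) so that at
/// most `limit` rows pass through in total.
fn apply_limit(batches: Vec<usize>, limit: Option<usize>) -> Vec<usize> {
    let Some(limit) = limit else { return batches };
    let mut remaining = limit;
    let mut out = Vec::new();
    for rows in batches {
        if remaining == 0 {
            break; // everything past the limit is dropped
        }
        let take = rows.min(remaining); // slice the final batch if needed
        remaining -= take;
        out.push(take);
    }
    out
}

fn main() {
    // Three batches of 100 rows under LIMIT 250 -> 100 + 100 + 50.
    assert_eq!(apply_limit(vec![100, 100, 100], Some(250)), vec![100, 100, 50]);
}
```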
IcebergTableProvider::scan now plans files eagerly and buckets them
across DataFusion partitions before returning the ExecutionPlan.
As a result, IcebergTableScan's DisplayAs output always includes
`buckets:[N] file_count:[M]`, even for unpartitioned tables where
N = 1.

Update the four .slt files whose EXPLAIN snapshots were missing this
suffix, and fix the like_predicate_pushdown snapshots that also had
a stale input_partitions count on RepartitionExec (the table now has
multiple files across multiple buckets).

(cherry picked from commit 6ae4a71)
(cherry picked from commit 581dde7)
(cherry picked from commit 7ad9dcc)
(cherry picked from commit 7ff1f6d)
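An illustrative sketch of the snapshot line; the exact field order and surrounding EXPLAIN output are assumptions here, the commit only guarantees that the `buckets:[N] file_count:[M]` suffix is always present:

```rust
// Hypothetical helper producing the EXPLAIN one-liner the .slt
// snapshots match against.
fn explain_line(projection: &[&str], buckets: usize, file_count: usize) -> String {
    format!(
        "IcebergTableScan projection:[{}] buckets:[{}] file_count:[{}]",
        projection.join(","),
        buckets,
        file_count
    )
}

fn main() {
    // Unpartitioned table: the suffix still appears with a single bucket.
    println!("{}", explain_line(&["id", "name"], 1, 3));
}
```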