dynamic filtering development branch#516

Draft
jayshrivastava wants to merge 7 commits into main from dynamic-filtering-dev-df-55

@jayshrivastava jayshrivastava commented Jun 30, 2026

Collaborator

Goal

Closes #530

I would like to create a development branch to work on dynamic filtering. DataFusion 54, which main currently uses, does not include this change: apache/datafusion#21055.

This PR aims to create a development branch using apache/datafusion@01bf68c, which is the latest commit on apache/datafusion:main as of writing.

This is almost like updating main to DataFusion 55. I say "almost" because DataFusion 55 is not released yet and other changes may still go in. See apache/datafusion#22393.

Once DataFusion 55 is released, we can do the upgrade on this branch and merge it into datafusion-distributed:main. I don't want all the work in this PR to go to waste.

Summary of Significant Changes

  1. Distribute byte ranges across partitions: feat: lower repartition_file_min_size default from 10 MiB to 1 MiB apache/datafusion#22439
  • Lowers repartition_file_min_size from 10 MiB to 1 MiB. The PR explicitly calls out TPC-DS SF1 dimension tables. The same file may now appear in multiple partitions, but each partition reads a different byte range (this is hidden by ...., but we know from the correctness tests that nothing broke). Many TPC-DS queries now split across target_partitions instead of staying under-partitioned. The TPC-DS plan tests use target_partitions=3.
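
To make the effect of the lower threshold concrete, here is a rough Python model of byte-range repartitioning. It is a simplified sketch, not DataFusion's implementation: the function name `split_into_byte_ranges` is hypothetical, the file sizes are invented for illustration (they are not the real date_dim sizes), and it assumes the threshold is compared against the total size of the scan.

```python
from math import ceil

MIB = 1024 * 1024


def split_into_byte_ranges(files, target_partitions, min_size):
    """Toy model: spread (path, size) files evenly over target_partitions.

    Returns None when the scan is smaller than min_size (assumed rule:
    the scan is left as-is), otherwise a list of groups, each holding
    (path, start, end) byte ranges.
    """
    total = sum(size for _, size in files)
    if total == 0 or total < min_size or target_partitions <= 1:
        return None

    # Every partition should read roughly this many bytes.
    target = ceil(total / target_partitions)
    groups = [[] for _ in range(target_partitions)]
    idx, filled = 0, 0
    for path, size in files:
        offset = 0
        while offset < size:
            take = min(size - offset, target - filled)
            groups[idx].append((path, offset, offset + take))
            offset += take
            filled += take
            # Move on once the current partition holds its share.
            if filled == target and idx < target_partitions - 1:
                idx, filled = idx + 1, 0
    return groups


# Four hypothetical 1.5 MiB files, 6 MiB in total.
files = [(f"part-{i}.parquet", 3 * MIB // 2) for i in range(4)]

old_plan = split_into_byte_ranges(files, target_partitions=3, min_size=10 * MIB)
new_plan = split_into_byte_ranges(files, target_partitions=3, min_size=1 * MIB)
```

With the 10 MiB threshold the model leaves the scan alone. With 1 MiB it produces three groups of 2 MiB each, and part-1.parquet shows up in two groups with different byte ranges, which matches the repeated file names in the snapshots.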

Example:

-                │     t0: DataSourceExec: file_groups={2 groups: [[/testdata/tpcds/plans_sf1_partitions4/date_dim/part-0.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/date_dim/part-1.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/date_dim/part-2.parquet:<int>..<int>]]}, projection=[d_date_sk, d_week_seq, d_day_name], file_type=parquet, predicate=DynamicFilter [ empty ]
-                │     t1: DataSourceExec: file_groups={2 groups: [[/testdata/tpcds/plans_sf1_partitions4/date_dim/part-0.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/date_dim/part-2.parquet:<int>..<int>]]}, projection=[d_date_sk, d_week_seq, d_day_name], file_type=parquet, predicate=DynamicFilter [ empty ]
-                │     t2: DataSourceExec: file_groups={2 groups: [[/testdata/tpcds/plans_sf1_partitions4/date_dim/part-0.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/date_dim/part-1.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/date_dim/part-2.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/date_dim/part-3.parquet:<int>..<int>]]}, projection=[d_date_sk, d_week_seq, d_day_name], file_type=parquet, predicate=DynamicFilter [ empty ]
-                │     t3: DataSourceExec: file_groups={2 groups: [[/testdata/tpcds/plans_sf1_partitions4/date_dim/part-1.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/date_dim/part-3.parquet:<int>..<int>]]}, projection=[d_date_sk, d_week_seq, d_day_name], file_type=parquet, predicate=DynamicFilter [ empty ]
+                │     t0: DataSourceExec: file_groups={3 groups: [[/testdata/tpcds/plans_sf1_partitions4/date_dim/part-0.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/date_dim/part-1.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/date_dim/part-2.parquet:<int>..<int>]]}, projection=[d_date_sk, d_week_seq, d_day_name], file_type=parquet, predicate=DynamicFilter [ empty ]
+                │     t1: DataSourceExec: file_groups={3 groups: [[/testdata/tpcds/plans_sf1_partitions4/date_dim/part-0.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/date_dim/part-1.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/date_dim/part-2.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/date_dim/part-3.parquet:<int>..<int>]]}, projection=[d_date_sk, d_week_seq, d_day_name], file_type=parquet, predicate=DynamicFilter [ empty ]
+                │     t2: DataSourceExec: file_groups={3 groups: [[/testdata/tpcds/plans_sf1_partitions4/date_dim/part-0.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/date_dim/part-1.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/date_dim/part-2.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/date_dim/part-3.parquet:<int>..<int>]]}, projection=[d_date_sk, d_week_seq, d_day_name], file_type=parquet, predicate=DynamicFilter [ empty ]
+                │     t3: DataSourceExec: file_groups={3 groups: [[/testdata/tpcds/plans_sf1_partitions4/date_dim/part-0.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/date_dim/part-1.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/date_dim/part-1.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/date_dim/part-2.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/date_dim/part-2.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/date_dim/part-3.parquet:<int>..<int>]]}, projection=[d_date_sk, d_week_seq, d_day_name], file_type=parquet, predicate=DynamicFilter [ empty ]
  1. dynamic_rg_pruning=eligible display changes feat(parquet): intra-file early stopping via statistics + dynamic filters apache/datafusion#22450
  • Adds dynamic_rg_pruning=eligible to leaf nodes. Lots of snapshots change because of this.
  1. convert inner joins to semi joins when equivalent perf: Convert inner joins to semi joins when equivalent apache/datafusion#22652.
  • Left semi joins cannot be broadcast, and insert_broadcast_execs correctly rejects them. However, _inject_network_boundaries assumed that every CollectLeft HashJoin can be broadcast. I added a check (look for collect_left_hash_join_requires_single_task) that looks for a BroadcastExec before committing to a broadcast join. Without this change, correctness tests fail. I think we previously lacked good coverage for LeftSemi, which is why this bug never appeared before.
  • Why can't LeftSemi be broadcast? A left semi join means "emit a row from the left side of the join if it matches any row on the right". Broadcasting copies the left side to every task, and each task probes its copy against its own share of the right side. A left row that has matches in more than one task is then emitted once per task, producing duplicate rows.
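
The duplicate-row problem can be reproduced with a small Python model. This is only an illustration under simplifying assumptions: rows are bare join keys, the helper names are hypothetical, and a "task" is just a slice of the right side paired with a full copy of the left side.

```python
def inner_join(left, right):
    """One output row per matching (left, right) pair."""
    return [(l, r) for l in left for r in right if l == r]


def left_semi_join(left, right):
    """Emit each left row at most once if it matches any right row."""
    right_keys = set(right)
    return [l for l in left if l in right_keys]


left = [1, 2, 3]
right = [1, 1, 2, 5]

# Broadcasting: every task gets the whole left side and one slice of the right.
right_slices = [[1, 2], [1, 5]]

semi_single = left_semi_join(left, right)
semi_broadcast = [row for part in right_slices for row in left_semi_join(left, part)]

inner_single = inner_join(left, right)
inner_broadcast = [row for part in right_slices for row in inner_join(left, part)]

print(semi_single)     # [1, 2]
print(semi_broadcast)  # [1, 2, 1]
```

Key 1 matches in both slices, so the broadcast semi join emits it twice. The inner join is unaffected because each right row lives in exactly one slice, so every matching pair is still produced exactly once.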

Example: TPCDS query 1 (note the new LeftSemi join)

Before
┌───── DistributedExec ── Tasks: t0:[p0]
│ SortPreservingMergeExec: [c_customer_id@0 ASC NULLS LAST], fetch=100
│   [Stage 9] => NetworkCoalesceExec: output_partitions=12, input_tasks=4
└──────────────────────────────────────────────────
  ┌───── Stage 9 ── Tasks: t0:[p0..p2] t1:[p0..p2] t2:[p0..p2] t3:[p0..p2]
  │ SortExec: TopK(fetch=100), expr=[c_customer_id@0 ASC NULLS LAST], preserve_partitioning=[true]
  │   HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(ctr_store_sk@0, ctr_store_sk@1)], filter=CAST(ctr_total_return@0 AS Decimal128(30, 15)) > avg(ctr2.ctr_total_return) * Float64(1.2)@1, projection=[c_customer_id@2]
  │     CoalescePartitionsExec
  │       [Stage 5] => NetworkBroadcastExec: partitions_per_consumer=3, stage_partitions=12, input_tasks=4
  │     ProjectionExec: expr=[CAST(CAST(avg(ctr2.ctr_total_return)@1 AS Float64) * 1.2 AS Decimal128(30, 15)) as avg(ctr2.ctr_total_return) * Float64(1.2), ctr_store_sk@0 as ctr_store_sk]
  │       AggregateExec: mode=FinalPartitioned, gby=[ctr_store_sk@0 as ctr_store_sk], aggr=[avg(ctr2.ctr_total_return)]
  │         [Stage 8] => NetworkShuffleExec: output_partitions=3, input_tasks=2
  └──────────────────────────────────────────────────
    ┌───── Stage 5 ── Tasks: t0:[p0..p11] t1:[p12..p23] t2:[p24..p35] t3:[p36..p47]
    │ BroadcastExec: input_partitions=3, consumer_tasks=4, output_partitions=12
    │   HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(ctr_customer_sk@0, c_customer_sk@0)], projection=[ctr_store_sk@1, ctr_total_return@2, c_customer_id@4]
    │     CoalescePartitionsExec
    │       [Stage 4] => NetworkBroadcastExec: partitions_per_consumer=3, stage_partitions=12, input_tasks=2
    │     RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=2
    │       DistributedLeafExec:
    │         t0: DataSourceExec: file_groups={2 groups: [[/testdata/tpcds/plans_sf1_partitions4/customer/part-0.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/customer/part-1.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/customer/part-2.parquet:<int>..<int>]]}, projection=[c_customer_sk, c_customer_id], file_type=parquet, predicate=DynamicFilter [ empty ] AND DynamicFilter [ empty ]
    │         t1: DataSourceExec: file_groups={2 groups: [[/testdata/tpcds/plans_sf1_partitions4/customer/part-0.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/customer/part-2.parquet:<int>..<int>]]}, projection=[c_customer_sk, c_customer_id], file_type=parquet, predicate=DynamicFilter [ empty ] AND DynamicFilter [ empty ]
    │         t2: DataSourceExec: file_groups={2 groups: [[/testdata/tpcds/plans_sf1_partitions4/customer/part-0.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/customer/part-1.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/customer/part-2.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/customer/part-3.parquet:<int>..<int>]]}, projection=[c_customer_sk, c_customer_id], file_type=parquet, predicate=DynamicFilter [ empty ] AND DynamicFilter [ empty ]
    │         t3: DataSourceExec: file_groups={2 groups: [[/testdata/tpcds/plans_sf1_partitions4/customer/part-1.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/customer/part-3.parquet:<int>..<int>]]}, projection=[c_customer_sk, c_customer_id], file_type=parquet, predicate=DynamicFilter [ empty ] AND DynamicFilter [ empty ]
    └──────────────────────────────────────────────────
      ┌───── Stage 4 ── Tasks: t0:[p0..p11] t1:[p12..p23]
      │ BroadcastExec: input_partitions=3, consumer_tasks=4, output_partitions=12
      │   HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(s_store_sk@0, ctr_store_sk@1)], projection=[ctr_customer_sk@1, ctr_store_sk@2, ctr_total_return@3]
      │     CoalescePartitionsExec
      │       [Stage 1] => NetworkBroadcastExec: partitions_per_consumer=3, stage_partitions=6, input_tasks=4
      │     ProjectionExec: expr=[sr_customer_sk@0 as ctr_customer_sk, sr_store_sk@1 as ctr_store_sk, sum(store_returns.sr_return_amt)@2 as ctr_total_return]
      │       AggregateExec: mode=FinalPartitioned, gby=[sr_customer_sk@0 as sr_customer_sk, sr_store_sk@1 as sr_store_sk], aggr=[sum(store_returns.sr_return_amt)]
      │         [Stage 3] => NetworkShuffleExec: output_partitions=3, input_tasks=4
      └──────────────────────────────────────────────────
        ┌───── Stage 1 ── Tasks: t0:[p0..p5] t1:[p6..p11] t2:[p12..p17] t3:[p18..p23]
        │ BroadcastExec: input_partitions=3, consumer_tasks=2, output_partitions=6
        │   FilterExec: s_state@1 = TN, projection=[s_store_sk@0]
        │     RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=2
        │       DistributedLeafExec:
        │         t0: DataSourceExec: file_groups={2 groups: [[/testdata/tpcds/plans_sf1_partitions4/store/part-0.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/store/part-0.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/store/part-1.parquet:<int>..<int>]]}, projection=[s_store_sk, s_state], file_type=parquet, predicate=s_state@24 = TN, pruning_predicate=s_state_null_count@2 != row_count@3 AND s_state_min@0 <= TN AND TN <= s_state_max@1, required_guarantees=[s_state in (TN)]
        │         t1: DataSourceExec: file_groups={2 groups: [[/testdata/tpcds/plans_sf1_partitions4/store/part-0.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/store/part-1.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/store/part-2.parquet:<int>..<int>]]}, projection=[s_store_sk, s_state], file_type=parquet, predicate=s_state@24 = TN, pruning_predicate=s_state_null_count@2 != row_count@3 AND s_state_min@0 <= TN AND TN <= s_state_max@1, required_guarantees=[s_state in (TN)]
        │         t2: DataSourceExec: file_groups={2 groups: [[/testdata/tpcds/plans_sf1_partitions4/store/part-0.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/store/part-2.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/store/part-3.parquet:<int>..<int>]]}, projection=[s_store_sk, s_state], file_type=parquet, predicate=s_state@24 = TN, pruning_predicate=s_state_null_count@2 != row_count@3 AND s_state_min@0 <= TN AND TN <= s_state_max@1, required_guarantees=[s_state in (TN)]
        │         t3: DataSourceExec: file_groups={2 groups: [[/testdata/tpcds/plans_sf1_partitions4/store/part-0.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/store/part-3.parquet:<int>..<int>]]}, projection=[s_store_sk, s_state], file_type=parquet, predicate=s_state@24 = TN, pruning_predicate=s_state_null_count@2 != row_count@3 AND s_state_min@0 <= TN AND TN <= s_state_max@1, required_guarantees=[s_state in (TN)]
        └──────────────────────────────────────────────────
        ┌───── Stage 3 ── Tasks: t0:[p0..p5] t1:[p0..p5] t2:[p0..p5] t3:[p0..p5]
        │ RepartitionExec: partitioning=Hash([sr_customer_sk@0, sr_store_sk@1], 6), input_partitions=3
        │   AggregateExec: mode=Partial, gby=[sr_customer_sk@0 as sr_customer_sk, sr_store_sk@1 as sr_store_sk], aggr=[sum(store_returns.sr_return_amt)]
        │     HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(d_date_sk@0, sr_returned_date_sk@0)], projection=[sr_customer_sk@2, sr_store_sk@3, sr_return_amt@4]
        │       CoalescePartitionsExec
        │         [Stage 2] => NetworkBroadcastExec: partitions_per_consumer=3, stage_partitions=12, input_tasks=4
        │       DistributedLeafExec:
        │         t0: DataSourceExec: file_groups={3 groups: [[/testdata/tpcds/plans_sf1_partitions4/store_returns/part-0.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/store_returns/part-1.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/store_returns/part-2.parquet:<int>..<int>]]}, projection=[sr_returned_date_sk, sr_customer_sk, sr_store_sk, sr_return_amt], file_type=parquet, predicate=DynamicFilter [ empty ] AND DynamicFilter [ empty ]
        │         t1: DataSourceExec: file_groups={3 groups: [[/testdata/tpcds/plans_sf1_partitions4/store_returns/part-0.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/store_returns/part-1.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/store_returns/part-2.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/store_returns/part-3.parquet:<int>..<int>]]}, projection=[sr_returned_date_sk, sr_customer_sk, sr_store_sk, sr_return_amt], file_type=parquet, predicate=DynamicFilter [ empty ] AND DynamicFilter [ empty ]
        │         t2: DataSourceExec: file_groups={3 groups: [[/testdata/tpcds/plans_sf1_partitions4/store_returns/part-0.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/store_returns/part-1.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/store_returns/part-2.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/store_returns/part-3.parquet:<int>..<int>]]}, projection=[sr_returned_date_sk, sr_customer_sk, sr_store_sk, sr_return_amt], file_type=parquet, predicate=DynamicFilter [ empty ] AND DynamicFilter [ empty ]
        │         t3: DataSourceExec: file_groups={3 groups: [[/testdata/tpcds/plans_sf1_partitions4/store_returns/part-0.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/store_returns/part-1.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/store_returns/part-1.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/store_returns/part-2.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/store_returns/part-2.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/store_returns/part-3.parquet:<int>..<int>]]}, projection=[sr_returned_date_sk, sr_customer_sk, sr_store_sk, sr_return_amt], file_type=parquet, predicate=DynamicFilter [ empty ] AND DynamicFilter [ empty ]
        └──────────────────────────────────────────────────
          ┌───── Stage 2 ── Tasks: t0:[p0..p11] t1:[p12..p23] t2:[p24..p35] t3:[p36..p47]
          │ BroadcastExec: input_partitions=3, consumer_tasks=4, output_partitions=12
          │   FilterExec: d_year@1 = 2000, projection=[d_date_sk@0]
          │     RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=2
          │       DistributedLeafExec:
          │         t0: DataSourceExec: file_groups={2 groups: [[/testdata/tpcds/plans_sf1_partitions4/date_dim/part-0.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/date_dim/part-1.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/date_dim/part-2.parquet:<int>..<int>]]}, projection=[d_date_sk, d_year], file_type=parquet, predicate=d_year@6 = 2000, pruning_predicate=d_year_null_count@2 != row_count@3 AND d_year_min@0 <= 2000 AND 2000 <= d_year_max@1, required_guarantees=[d_year in (2000)]
          │         t1: DataSourceExec: file_groups={2 groups: [[/testdata/tpcds/plans_sf1_partitions4/date_dim/part-0.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/date_dim/part-2.parquet:<int>..<int>]]}, projection=[d_date_sk, d_year], file_type=parquet, predicate=d_year@6 = 2000, pruning_predicate=d_year_null_count@2 != row_count@3 AND d_year_min@0 <= 2000 AND 2000 <= d_year_max@1, required_guarantees=[d_year in (2000)]
          │         t2: DataSourceExec: file_groups={2 groups: [[/testdata/tpcds/plans_sf1_partitions4/date_dim/part-0.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/date_dim/part-1.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/date_dim/part-2.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/date_dim/part-3.parquet:<int>..<int>]]}, projection=[d_date_sk, d_year], file_type=parquet, predicate=d_year@6 = 2000, pruning_predicate=d_year_null_count@2 != row_count@3 AND d_year_min@0 <= 2000 AND 2000 <= d_year_max@1, required_guarantees=[d_year in (2000)]
          │         t3: DataSourceExec: file_groups={2 groups: [[/testdata/tpcds/plans_sf1_partitions4/date_dim/part-1.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/date_dim/part-3.parquet:<int>..<int>]]}, projection=[d_date_sk, d_year], file_type=parquet, predicate=d_year@6 = 2000, pruning_predicate=d_year_null_count@2 != row_count@3 AND d_year_min@0 <= 2000 AND 2000 <= d_year_max@1, required_guarantees=[d_year in (2000)]
          └──────────────────────────────────────────────────
      ┌───── Stage 8 ── Tasks: t0:[p0..p11] t1:[p0..p11]
      │ RepartitionExec: partitioning=Hash([ctr_store_sk@0], 12), input_partitions=3
      │   AggregateExec: mode=Partial, gby=[ctr_store_sk@0 as ctr_store_sk], aggr=[avg(ctr2.ctr_total_return)]
      │     ProjectionExec: expr=[sr_store_sk@1 as ctr_store_sk, sum(store_returns.sr_return_amt)@2 as ctr_total_return]
      │       AggregateExec: mode=FinalPartitioned, gby=[sr_customer_sk@0 as sr_customer_sk, sr_store_sk@1 as sr_store_sk], aggr=[sum(store_returns.sr_return_amt)]
      │         [Stage 7] => NetworkShuffleExec: output_partitions=3, input_tasks=4
      └──────────────────────────────────────────────────
        ┌───── Stage 7 ── Tasks: t0:[p0..p5] t1:[p0..p5] t2:[p0..p5] t3:[p0..p5]
        │ RepartitionExec: partitioning=Hash([sr_customer_sk@0, sr_store_sk@1], 6), input_partitions=3
        │   AggregateExec: mode=Partial, gby=[sr_customer_sk@0 as sr_customer_sk, sr_store_sk@1 as sr_store_sk], aggr=[sum(store_returns.sr_return_amt)]
        │     HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(d_date_sk@0, sr_returned_date_sk@0)], projection=[sr_customer_sk@2, sr_store_sk@3, sr_return_amt@4]
        │       CoalescePartitionsExec
        │         [Stage 6] => NetworkBroadcastExec: partitions_per_consumer=3, stage_partitions=12, input_tasks=4
        │       DistributedLeafExec:
        │         t0: DataSourceExec: file_groups={3 groups: [[/testdata/tpcds/plans_sf1_partitions4/store_returns/part-0.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/store_returns/part-1.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/store_returns/part-2.parquet:<int>..<int>]]}, projection=[sr_returned_date_sk, sr_customer_sk, sr_store_sk, sr_return_amt], file_type=parquet, predicate=DynamicFilter [ empty ] AND DynamicFilter [ empty ]
        │         t1: DataSourceExec: file_groups={3 groups: [[/testdata/tpcds/plans_sf1_partitions4/store_returns/part-0.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/store_returns/part-1.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/store_returns/part-2.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/store_returns/part-3.parquet:<int>..<int>]]}, projection=[sr_returned_date_sk, sr_customer_sk, sr_store_sk, sr_return_amt], file_type=parquet, predicate=DynamicFilter [ empty ] AND DynamicFilter [ empty ]
        │         t2: DataSourceExec: file_groups={3 groups: [[/testdata/tpcds/plans_sf1_partitions4/store_returns/part-0.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/store_returns/part-1.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/store_returns/part-2.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/store_returns/part-3.parquet:<int>..<int>]]}, projection=[sr_returned_date_sk, sr_customer_sk, sr_store_sk, sr_return_amt], file_type=parquet, predicate=DynamicFilter [ empty ] AND DynamicFilter [ empty ]
        │         t3: DataSourceExec: file_groups={3 groups: [[/testdata/tpcds/plans_sf1_partitions4/store_returns/part-0.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/store_returns/part-1.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/store_returns/part-1.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/store_returns/part-2.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/store_returns/part-2.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/store_returns/part-3.parquet:<int>..<int>]]}, projection=[sr_returned_date_sk, sr_customer_sk, sr_store_sk, sr_return_amt], file_type=parquet, predicate=DynamicFilter [ empty ] AND DynamicFilter [ empty ]
        └──────────────────────────────────────────────────
          ┌───── Stage 6 ── Tasks: t0:[p0..p11] t1:[p12..p23] t2:[p24..p35] t3:[p36..p47]
          │ BroadcastExec: input_partitions=3, consumer_tasks=4, output_partitions=12
          │   FilterExec: d_year@1 = 2000, projection=[d_date_sk@0]
          │     RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=2
          │       DistributedLeafExec:
          │         t0: DataSourceExec: file_groups={2 groups: [[/testdata/tpcds/plans_sf1_partitions4/date_dim/part-0.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/date_dim/part-1.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/date_dim/part-2.parquet:<int>..<int>]]}, projection=[d_date_sk, d_year], file_type=parquet, predicate=d_year@6 = 2000, pruning_predicate=d_year_null_count@2 != row_count@3 AND d_year_min@0 <= 2000 AND 2000 <= d_year_max@1, required_guarantees=[d_year in (2000)]
          │         t1: DataSourceExec: file_groups={2 groups: [[/testdata/tpcds/plans_sf1_partitions4/date_dim/part-0.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/date_dim/part-2.parquet:<int>..<int>]]}, projection=[d_date_sk, d_year], file_type=parquet, predicate=d_year@6 = 2000, pruning_predicate=d_year_null_count@2 != row_count@3 AND d_year_min@0 <= 2000 AND 2000 <= d_year_max@1, required_guarantees=[d_year in (2000)]
          │         t2: DataSourceExec: file_groups={2 groups: [[/testdata/tpcds/plans_sf1_partitions4/date_dim/part-0.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/date_dim/part-1.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/date_dim/part-2.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/date_dim/part-3.parquet:<int>..<int>]]}, projection=[d_date_sk, d_year], file_type=parquet, predicate=d_year@6 = 2000, pruning_predicate=d_year_null_count@2 != row_count@3 AND d_year_min@0 <= 2000 AND 2000 <= d_year_max@1, required_guarantees=[d_year in (2000)]
          │         t3: DataSourceExec: file_groups={2 groups: [[/testdata/tpcds/plans_sf1_partitions4/date_dim/part-1.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/date_dim/part-3.parquet:<int>..<int>]]}, projection=[d_date_sk, d_year], file_type=parquet, predicate=d_year@6 = 2000, pruning_predicate=d_year_null_count@2 != row_count@3 AND d_year_min@0 <= 2000 AND 2000 <= d_year_max@1, required_guarantees=[d_year in (2000)]
          └──────────────────────────────────────────────────

After
┌───── DistributedExec ── Tasks: t0:[p0]
│ SortPreservingMergeExec: [c_customer_id@0 ASC NULLS LAST], fetch=100
│   SortExec: TopK(fetch=100), expr=[c_customer_id@0 ASC NULLS LAST], preserve_partitioning=[true]
│     HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(ctr_store_sk@0, ctr_store_sk@1)], filter=CAST(ctr_total_return@0 AS Decimal128(30, 15)) > avg(ctr2.ctr_total_return) * Float64(1.2)@1, projection=[c_customer_id@2]
│       CoalescePartitionsExec
│         [Stage 5] => NetworkCoalesceExec: output_partitions=12, input_tasks=4
│       ProjectionExec: expr=[CAST(CAST(avg(ctr2.ctr_total_return)@1 AS Float64) * 1.2 AS Decimal128(30, 15)) as avg(ctr2.ctr_total_return) * Float64(1.2), ctr_store_sk@0 as ctr_store_sk]
│         AggregateExec: mode=FinalPartitioned, gby=[ctr_store_sk@0 as ctr_store_sk], aggr=[avg(ctr2.ctr_total_return)]
│           [Stage 8] => NetworkShuffleExec: output_partitions=3, input_tasks=2
└──────────────────────────────────────────────────
  ┌───── Stage 5 ── Tasks: t0:[p0..p2] t1:[p3..p5] t2:[p6..p8] t3:[p9..p11]
  │ HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(ctr_customer_sk@0, c_customer_sk@0)], projection=[ctr_store_sk@1, ctr_total_return@2, c_customer_id@4]
  │   CoalescePartitionsExec
  │     [Stage 4] => NetworkBroadcastExec: partitions_per_consumer=3, stage_partitions=12, input_tasks=2
  │   DistributedLeafExec:
  │     t0: DataSourceExec: file_groups={3 groups: [[/testdata/tpcds/plans_sf1_partitions4/customer/part-0.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/customer/part-1.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/customer/part-2.parquet:<int>..<int>]]}, projection=[c_customer_sk, c_customer_id], file_type=parquet, predicate=DynamicFilter [ empty ] AND DynamicFilter [ empty ], dynamic_rg_pruning=eligible
  │     t1: DataSourceExec: file_groups={3 groups: [[/testdata/tpcds/plans_sf1_partitions4/customer/part-0.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/customer/part-1.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/customer/part-2.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/customer/part-3.parquet:<int>..<int>]]}, projection=[c_customer_sk, c_customer_id], file_type=parquet, predicate=DynamicFilter [ empty ] AND DynamicFilter [ empty ], dynamic_rg_pruning=eligible
  │     t2: DataSourceExec: file_groups={3 groups: [[/testdata/tpcds/plans_sf1_partitions4/customer/part-0.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/customer/part-1.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/customer/part-2.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/customer/part-3.parquet:<int>..<int>]]}, projection=[c_customer_sk, c_customer_id], file_type=parquet, predicate=DynamicFilter [ empty ] AND DynamicFilter [ empty ], dynamic_rg_pruning=eligible
  │     t3: DataSourceExec: file_groups={3 groups: [[/testdata/tpcds/plans_sf1_partitions4/customer/part-0.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/customer/part-1.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/customer/part-1.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/customer/part-2.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/customer/part-2.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/customer/part-3.parquet:<int>..<int>]]}, projection=[c_customer_sk, c_customer_id], file_type=parquet, predicate=DynamicFilter [ empty ] AND DynamicFilter [ empty ], dynamic_rg_pruning=eligible
  └──────────────────────────────────────────────────
    ┌───── Stage 4 ── Tasks: t0:[p0..p11] t1:[p12..p23]
    │ BroadcastExec: input_partitions=3, consumer_tasks=4, output_partitions=12
    │   HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(s_store_sk@0, ctr_store_sk@1)], projection=[ctr_customer_sk@1, ctr_store_sk@2, ctr_total_return@3]
    │     CoalescePartitionsExec
    │       [Stage 1] => NetworkBroadcastExec: partitions_per_consumer=3, stage_partitions=6, input_tasks=4
    │     ProjectionExec: expr=[sr_customer_sk@0 as ctr_customer_sk, sr_store_sk@1 as ctr_store_sk, sum(store_returns.sr_return_amt)@2 as ctr_total_return]
    │       AggregateExec: mode=FinalPartitioned, gby=[sr_customer_sk@0 as sr_customer_sk, sr_store_sk@1 as sr_store_sk], aggr=[sum(store_returns.sr_return_amt)]
    │         [Stage 3] => NetworkShuffleExec: output_partitions=3, input_tasks=4
    └──────────────────────────────────────────────────
      ┌───── Stage 1 ── Tasks: t0:[p0..p5] t1:[p6..p11] t2:[p12..p17] t3:[p18..p23]
      │ BroadcastExec: input_partitions=3, consumer_tasks=2, output_partitions=6
      │   FilterExec: s_state@1 = TN, projection=[s_store_sk@0]
      │     RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=2
      │       DistributedLeafExec:
      │         t0: DataSourceExec: file_groups={2 groups: [[/testdata/tpcds/plans_sf1_partitions4/store/part-0.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/store/part-0.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/store/part-1.parquet:<int>..<int>]]}, projection=[s_store_sk, s_state], file_type=parquet, predicate=s_state@24 = TN, pruning_predicate=s_state_null_count@2 != row_count@3 AND s_state_min@0 <= TN AND TN <= s_state_max@1, required_guarantees=[s_state in (TN)]
      │         t1: DataSourceExec: file_groups={2 groups: [[/testdata/tpcds/plans_sf1_partitions4/store/part-0.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/store/part-1.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/store/part-2.parquet:<int>..<int>]]}, projection=[s_store_sk, s_state], file_type=parquet, predicate=s_state@24 = TN, pruning_predicate=s_state_null_count@2 != row_count@3 AND s_state_min@0 <= TN AND TN <= s_state_max@1, required_guarantees=[s_state in (TN)]
      │         t2: DataSourceExec: file_groups={2 groups: [[/testdata/tpcds/plans_sf1_partitions4/store/part-0.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/store/part-2.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/store/part-3.parquet:<int>..<int>]]}, projection=[s_store_sk, s_state], file_type=parquet, predicate=s_state@24 = TN, pruning_predicate=s_state_null_count@2 != row_count@3 AND s_state_min@0 <= TN AND TN <= s_state_max@1, required_guarantees=[s_state in (TN)]
      │         t3: DataSourceExec: file_groups={2 groups: [[/testdata/tpcds/plans_sf1_partitions4/store/part-0.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/store/part-3.parquet:<int>..<int>]]}, projection=[s_store_sk, s_state], file_type=parquet, predicate=s_state@24 = TN, pruning_predicate=s_state_null_count@2 != row_count@3 AND s_state_min@0 <= TN AND TN <= s_state_max@1, required_guarantees=[s_state in (TN)]
      └──────────────────────────────────────────────────
      ┌───── Stage 3 ── Tasks: t0:[p0..p5] t1:[p0..p5] t2:[p0..p5] t3:[p0..p5]
      │ RepartitionExec: partitioning=Hash([sr_customer_sk@0, sr_store_sk@1], 6), input_partitions=3
      │   AggregateExec: mode=Partial, gby=[sr_customer_sk@0 as sr_customer_sk, sr_store_sk@1 as sr_store_sk], aggr=[sum(store_returns.sr_return_amt)]
      │     HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(d_date_sk@0, sr_returned_date_sk@0)], projection=[sr_customer_sk@2, sr_store_sk@3, sr_return_amt@4]
      │       CoalescePartitionsExec
      │         [Stage 2] => NetworkBroadcastExec: partitions_per_consumer=3, stage_partitions=12, input_tasks=4
      │       DistributedLeafExec:
      │         t0: DataSourceExec: file_groups={3 groups: [[/testdata/tpcds/plans_sf1_partitions4/store_returns/part-0.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/store_returns/part-1.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/store_returns/part-2.parquet:<int>..<int>]]}, projection=[sr_returned_date_sk, sr_customer_sk, sr_store_sk, sr_return_amt], file_type=parquet, predicate=DynamicFilter [ empty ] AND DynamicFilter [ empty ], dynamic_rg_pruning=eligible
      │         t1: DataSourceExec: file_groups={3 groups: [[/testdata/tpcds/plans_sf1_partitions4/store_returns/part-0.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/store_returns/part-1.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/store_returns/part-2.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/store_returns/part-3.parquet:<int>..<int>]]}, projection=[sr_returned_date_sk, sr_customer_sk, sr_store_sk, sr_return_amt], file_type=parquet, predicate=DynamicFilter [ empty ] AND DynamicFilter [ empty ], dynamic_rg_pruning=eligible
      │         t2: DataSourceExec: file_groups={3 groups: [[/testdata/tpcds/plans_sf1_partitions4/store_returns/part-0.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/store_returns/part-1.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/store_returns/part-2.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/store_returns/part-3.parquet:<int>..<int>]]}, projection=[sr_returned_date_sk, sr_customer_sk, sr_store_sk, sr_return_amt], file_type=parquet, predicate=DynamicFilter [ empty ] AND DynamicFilter [ empty ], dynamic_rg_pruning=eligible
      │         t3: DataSourceExec: file_groups={3 groups: [[/testdata/tpcds/plans_sf1_partitions4/store_returns/part-0.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/store_returns/part-1.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/store_returns/part-1.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/store_returns/part-2.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/store_returns/part-2.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/store_returns/part-3.parquet:<int>..<int>]]}, projection=[sr_returned_date_sk, sr_customer_sk, sr_store_sk, sr_return_amt], file_type=parquet, predicate=DynamicFilter [ empty ] AND DynamicFilter [ empty ], dynamic_rg_pruning=eligible
      └──────────────────────────────────────────────────
        ┌───── Stage 2 ── Tasks: t0:[p0..p11] t1:[p12..p23] t2:[p24..p35] t3:[p36..p47]
        │ BroadcastExec: input_partitions=3, consumer_tasks=4, output_partitions=12
        │   FilterExec: d_year@1 = 2000, projection=[d_date_sk@0]
        │     DistributedLeafExec:
        │       t0: DataSourceExec: file_groups={3 groups: [[/testdata/tpcds/plans_sf1_partitions4/date_dim/part-0.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/date_dim/part-1.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/date_dim/part-2.parquet:<int>..<int>]]}, projection=[d_date_sk, d_year], file_type=parquet, predicate=d_year@6 = 2000, pruning_predicate=d_year_null_count@2 != row_count@3 AND d_year_min@0 <= 2000 AND 2000 <= d_year_max@1, required_guarantees=[d_year in (2000)]
        │       t1: DataSourceExec: file_groups={3 groups: [[/testdata/tpcds/plans_sf1_partitions4/date_dim/part-0.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/date_dim/part-1.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/date_dim/part-2.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/date_dim/part-3.parquet:<int>..<int>]]}, projection=[d_date_sk, d_year], file_type=parquet, predicate=d_year@6 = 2000, pruning_predicate=d_year_null_count@2 != row_count@3 AND d_year_min@0 <= 2000 AND 2000 <= d_year_max@1, required_guarantees=[d_year in (2000)]
        │       t2: DataSourceExec: file_groups={3 groups: [[/testdata/tpcds/plans_sf1_partitions4/date_dim/part-0.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/date_dim/part-1.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/date_dim/part-2.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/date_dim/part-3.parquet:<int>..<int>]]}, projection=[d_date_sk, d_year], file_type=parquet, predicate=d_year@6 = 2000, pruning_predicate=d_year_null_count@2 != row_count@3 AND d_year_min@0 <= 2000 AND 2000 <= d_year_max@1, required_guarantees=[d_year in (2000)]
        │       t3: DataSourceExec: file_groups={3 groups: [[/testdata/tpcds/plans_sf1_partitions4/date_dim/part-0.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/date_dim/part-1.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/date_dim/part-1.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/date_dim/part-2.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/date_dim/part-2.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/date_dim/part-3.parquet:<int>..<int>]]}, projection=[d_date_sk, d_year], file_type=parquet, predicate=d_year@6 = 2000, pruning_predicate=d_year_null_count@2 != row_count@3 AND d_year_min@0 <= 2000 AND 2000 <= d_year_max@1, required_guarantees=[d_year in (2000)]
        └──────────────────────────────────────────────────
    ┌───── Stage 8 ── Tasks: t0:[p0..p2] t1:[p0..p2]
    │ RepartitionExec: partitioning=Hash([ctr_store_sk@0], 3), input_partitions=3
    │   AggregateExec: mode=Partial, gby=[ctr_store_sk@0 as ctr_store_sk], aggr=[avg(ctr2.ctr_total_return)]
    │     ProjectionExec: expr=[sr_store_sk@1 as ctr_store_sk, sum(store_returns.sr_return_amt)@2 as ctr_total_return]
    │       AggregateExec: mode=FinalPartitioned, gby=[sr_customer_sk@0 as sr_customer_sk, sr_store_sk@1 as sr_store_sk], aggr=[sum(store_returns.sr_return_amt)]
    │         [Stage 7] => NetworkShuffleExec: output_partitions=3, input_tasks=4
    └──────────────────────────────────────────────────
      ┌───── Stage 7 ── Tasks: t0:[p0..p5] t1:[p0..p5] t2:[p0..p5] t3:[p0..p5]
      │ RepartitionExec: partitioning=Hash([sr_customer_sk@0, sr_store_sk@1], 6), input_partitions=3
      │   AggregateExec: mode=Partial, gby=[sr_customer_sk@0 as sr_customer_sk, sr_store_sk@1 as sr_store_sk], aggr=[sum(store_returns.sr_return_amt)]
      │     HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(d_date_sk@0, sr_returned_date_sk@0)], projection=[sr_customer_sk@2, sr_store_sk@3, sr_return_amt@4]
      │       CoalescePartitionsExec
      │         [Stage 6] => NetworkBroadcastExec: partitions_per_consumer=3, stage_partitions=12, input_tasks=4
      │       DistributedLeafExec:
      │         t0: DataSourceExec: file_groups={3 groups: [[/testdata/tpcds/plans_sf1_partitions4/store_returns/part-0.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/store_returns/part-1.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/store_returns/part-2.parquet:<int>..<int>]]}, projection=[sr_returned_date_sk, sr_customer_sk, sr_store_sk, sr_return_amt], file_type=parquet, predicate=DynamicFilter [ empty ] AND DynamicFilter [ empty ], dynamic_rg_pruning=eligible
      │         t1: DataSourceExec: file_groups={3 groups: [[/testdata/tpcds/plans_sf1_partitions4/store_returns/part-0.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/store_returns/part-1.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/store_returns/part-2.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/store_returns/part-3.parquet:<int>..<int>]]}, projection=[sr_returned_date_sk, sr_customer_sk, sr_store_sk, sr_return_amt], file_type=parquet, predicate=DynamicFilter [ empty ] AND DynamicFilter [ empty ], dynamic_rg_pruning=eligible
      │         t2: DataSourceExec: file_groups={3 groups: [[/testdata/tpcds/plans_sf1_partitions4/store_returns/part-0.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/store_returns/part-1.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/store_returns/part-2.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/store_returns/part-3.parquet:<int>..<int>]]}, projection=[sr_returned_date_sk, sr_customer_sk, sr_store_sk, sr_return_amt], file_type=parquet, predicate=DynamicFilter [ empty ] AND DynamicFilter [ empty ], dynamic_rg_pruning=eligible
      │         t3: DataSourceExec: file_groups={3 groups: [[/testdata/tpcds/plans_sf1_partitions4/store_returns/part-0.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/store_returns/part-1.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/store_returns/part-1.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/store_returns/part-2.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/store_returns/part-2.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/store_returns/part-3.parquet:<int>..<int>]]}, projection=[sr_returned_date_sk, sr_customer_sk, sr_store_sk, sr_return_amt], file_type=parquet, predicate=DynamicFilter [ empty ] AND DynamicFilter [ empty ], dynamic_rg_pruning=eligible
      └──────────────────────────────────────────────────
        ┌───── Stage 6 ── Tasks: t0:[p0..p11] t1:[p12..p23] t2:[p24..p35] t3:[p36..p47]
        │ BroadcastExec: input_partitions=3, consumer_tasks=4, output_partitions=12
        │   FilterExec: d_year@1 = 2000, projection=[d_date_sk@0]
        │     DistributedLeafExec:
        │       t0: DataSourceExec: file_groups={3 groups: [[/testdata/tpcds/plans_sf1_partitions4/date_dim/part-0.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/date_dim/part-1.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/date_dim/part-2.parquet:<int>..<int>]]}, projection=[d_date_sk, d_year], file_type=parquet, predicate=d_year@6 = 2000, pruning_predicate=d_year_null_count@2 != row_count@3 AND d_year_min@0 <= 2000 AND 2000 <= d_year_max@1, required_guarantees=[d_year in (2000)]
        │       t1: DataSourceExec: file_groups={3 groups: [[/testdata/tpcds/plans_sf1_partitions4/date_dim/part-0.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/date_dim/part-1.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/date_dim/part-2.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/date_dim/part-3.parquet:<int>..<int>]]}, projection=[d_date_sk, d_year], file_type=parquet, predicate=d_year@6 = 2000, pruning_predicate=d_year_null_count@2 != row_count@3 AND d_year_min@0 <= 2000 AND 2000 <= d_year_max@1, required_guarantees=[d_year in (2000)]
        │       t2: DataSourceExec: file_groups={3 groups: [[/testdata/tpcds/plans_sf1_partitions4/date_dim/part-0.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/date_dim/part-1.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/date_dim/part-2.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/date_dim/part-3.parquet:<int>..<int>]]}, projection=[d_date_sk, d_year], file_type=parquet, predicate=d_year@6 = 2000, pruning_predicate=d_year_null_count@2 != row_count@3 AND d_year_min@0 <= 2000 AND 2000 <= d_year_max@1, required_guarantees=[d_year in (2000)]
        │       t3: DataSourceExec: file_groups={3 groups: [[/testdata/tpcds/plans_sf1_partitions4/date_dim/part-0.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/date_dim/part-1.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/date_dim/part-1.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/date_dim/part-2.parquet:<int>..<int>, /testdata/tpcds/plans_sf1_partitions4/date_dim/part-2.parquet:<int>..<int>], [/testdata/tpcds/plans_sf1_partitions4/date_dim/part-3.parquet:<int>..<int>]]}, projection=[d_date_sk, d_year], file_type=parquet, predicate=d_year@6 = 2000, pruning_predicate=d_year_null_count@2 != row_count@3 AND d_year_min@0 <= 2000 AND 2000 <= d_year_max@1, required_guarantees=[d_year in (2000)]
        └──────────────────────────────────────────────────
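
The broadcast stages in the plan above print `input_partitions=3, consumer_tasks=4, output_partitions=12`, and the stage header gives each task a block of 12 partition indices. The sketch below is only a consistency check on those printed numbers, not a description of how the planner computes them. The helper names (`parse_task_ranges`, `parse_int_fields`, `check_broadcast_stage`) are hypothetical and exist only for this illustration; the two strings are copied from the Stage 2 lines of the plan.

```python
import re

# Lines copied from the Stage 2 block of the plan above.
STAGE_2_HEADER = "Stage 2 ── Tasks: t0:[p0..p11] t1:[p12..p23] t2:[p24..p35] t3:[p36..p47]"
BROADCAST_LINE = "BroadcastExec: input_partitions=3, consumer_tasks=4, output_partitions=12"


def parse_task_ranges(header):
    """Return {task_name: (first_partition, last_partition)} from a stage header."""
    return {
        name: (int(lo), int(hi))
        for name, lo, hi in re.findall(r"(t\d+):\[p(\d+)\.\.p(\d+)\]", header)
    }


def parse_int_fields(line):
    """Return every key=<integer> pair printed on a plan line."""
    return {key: int(value) for key, value in re.findall(r"(\w+)=(\d+)", line)}


def check_broadcast_stage(header, broadcast_line):
    """Check that the printed numbers agree with each other.

    Assumption (read off the snapshot, not taken from the implementation):
    output_partitions == input_partitions * consumer_tasks, and each task's
    partition range in the header spans exactly output_partitions indices.
    """
    fields = parse_int_fields(broadcast_line)
    ranges = parse_task_ranges(header)
    product_ok = (
        fields["output_partitions"]
        == fields["input_partitions"] * fields["consumer_tasks"]
    )
    widths_ok = all(
        hi - lo + 1 == fields["output_partitions"] for lo, hi in ranges.values()
    )
    return product_ok and widths_ok
```

Calling `check_broadcast_stage(STAGE_2_HEADER, BROADCAST_LINE)` on the copied lines is a quick way to spot a snapshot whose header and operator line have drifted apart after a plan change.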

@jayshrivastava jayshrivastava force-pushed the dynamic-filtering-dev-df-55 branch 3 times, most recently from 742db96 to 1b6edbb Compare July 2, 2026 15:00
@jayshrivastava jayshrivastava force-pushed the dynamic-filtering-dev-df-55 branch from bbb829d to b95034a Compare July 2, 2026 19:39

Development

Successfully merging this pull request may close these issues.

[dynamic filtering] 1. create development branch with proto converter
