Optimize parquet row filter auto strategy with adaptive fallback #9956
hhhizzz wants to merge 39 commits into
Conversation
👋🏻 The results were still not stable the day after I published this. I can reproduce some regressions in rare cases and am still working on them. |
|
Yeah, we are at the point where the code is already pretty fast, so additional optimizations get harder and harder |
…ack-pr' into codex/parquet-reader-auto-fallback-pr
|
A bit of context on how this PR evolved. The initial motivation was #8565: predicate pushdown is not always cheaper than scanning when the produced RowSelection is highly fragmented. That turned out to be incomplete. Page pruning means the loaded pages may be sparse, and the previous mask path could assume rows were available even when their pages had not been loaded. So the work split into two parts:
I also tried several purely static rules based on selectivity, projection shape, and data type. Some helped, but the results were fragile: rules that improved one fragmented case could regress sparse output reads or cacheable predicate cases. In particular, string / variable-width cases were easy to overfit. That is why the final design moved toward a small adaptive cost model instead of a larger pile of static heuristics. The current implementation is roughly:
I also added focused benchmark cases after seeing that the original benchmark suite did not clearly expose the cost-model-sensitive cases. The goal was to cover both the original fragmented-selection cliff and the cases where an overly aggressive rule could regress. So the PR is larger than a single heuristic change because the important part was separating the concepts:
That separation is what makes the |
|
Thank you @hhhizzz -- I will try to review this later today or tomorrow |
|
(I am not likely going to be able to review 8k lines in detail, however, so I will probably look at the high level first) |
c472348 to 4d59bcd
Thanks for taking a look at this PR! I completely understand that an 8k-line diff is daunting to review in detail. To help make the review process easier, I wanted to clarify that only about 3,450 lines are production code, while the remaining 4,800+ lines are benchmarks and extensive unit/integration tests. If you still feel this is too large to review as a single PR, I would be more than happy to split it into smaller, incremental PRs. 😄 Here is how we can cleanly divide the work:
|
Please do...that would help me, and would provide a baseline to compare the improvements against. |
|
Closing this PR and keeping it as a reference; I'll split it into smaller PRs. |
|
@hhhizzz I just thought about coming back to #9118 #9415 and saw your work here. I think it looks like a better framework for tackling the issues than my PRs:
I'll take a closer look at #10135 and try to help with the next PRs coming! 🚀 |
|
One thing @adriangb and @zhuqi-lucas and I have noticed in DataFusion is that getting heuristics to work well is very challenging -- for example, cutoff values often vary from architecture to architecture (e.g. is 32 contiguous 1s good, or should it be 64?). One thing we have been exploring is a more dynamic approach: switching the predicate evaluation strategy at points where the decoder naturally has to re-create some state, such as between row groups, like in this PR: It seems as if you have taken a similar approach in this PR
(caveat: I have not had a chance to read this one carefully, and for that I apologize) I think we had been planning to put more of the adaptivity at a higher level (DataFusion specifically) as it has more information about things like statistics and cross-file predicate selectivity. I wonder if you have thought about where these auto adaptive decisions would best be made. I do think the APIs you have outlined allow for both automatic decisions and manual overrides (e.g. DataFusion could override the decisions made automatically), which is interesting |
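To make the "switch at row-group boundaries" idea concrete, here is a minimal sketch of a controller that re-decides the evaluation mode only after a row group has finished. It is not code from this PR or from DataFusion: `EvalMode`, `RowGroupAdaptor`, and `finish_row_group` are hypothetical names, and the cutoff is a constructor parameter precisely because, as noted above, a good value (32? 64?) may differ between architectures.

```rust
/// Where the predicate is evaluated for the next row group (hypothetical).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EvalMode {
    /// Evaluate the predicate first and skip rows while decoding.
    Pushdown,
    /// Decode everything needed, then filter afterwards.
    PostFilter,
}

/// Hypothetical controller that only changes mode between row groups.
pub struct RowGroupAdaptor {
    mode: EvalMode,
    /// Minimum average run length (rows per select/skip run) for which
    /// pushdown is assumed to pay off. Illustrative tuning knob.
    min_avg_run: f64,
}

impl RowGroupAdaptor {
    /// Start optimistically with pushdown.
    pub fn new(min_avg_run: f64) -> Self {
        Self { mode: EvalMode::Pushdown, min_avg_run }
    }

    /// Mode to use for the row group that is about to be decoded.
    pub fn mode(&self) -> EvalMode {
        self.mode
    }

    /// Record what was observed in the row group that just finished:
    /// `rows` total rows and `runs` alternating select/skip runs.
    pub fn finish_row_group(&mut self, rows: usize, runs: usize) {
        if rows == 0 || runs == 0 {
            return; // nothing observed, keep the current mode
        }
        let avg_run = rows as f64 / runs as f64;
        self.mode = if avg_run >= self.min_avg_run {
            EvalMode::Pushdown
        } else {
            EvalMode::PostFilter
        };
    }
}
```

Because the mode is only updated in `finish_row_group`, decoder state never has to be rebuilt in the middle of a row group, which is the property the comment above is pointing at.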
The pattern that seems to work well is:
That last case specifically applies here:
|
|
I think the best model is multi-level adaptivity. DataFusion has more high-level context, such as query semantics, file / row-group statistics, projected columns across the whole scan, and cross-file predicate selectivity. That makes it a good place to decide whether a However, the part this PR is trying to handle is lower-level. The actual shape of the final So I see the responsibilities as:
The explicit policy APIs are important because they still allow DataFusion to override the automatic choice. But I think the default |
alamb
left a comment
I am trying to grok this PR and figure out how to split it up and make progress.
This PR makes RowSelectionPolicy::Auto more cost-aware. Instead of treating predicate pushdown as always beneficial once planned, Auto now chooses among:
selector-backed pushdown when it can skip useful work;
mask-backed execution when fragmented selections are better represented as a dense bitmap;
adaptive post-filter execution when pushdown is unlikely to save enough decoding work.
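The three-way choice listed above can be sketched as a small decision function. This is only an illustration of the idea, not the PR's cost model: `SelectionShape`, `SketchStrategy`, `choose_strategy`, and both thresholds (32 rows per run, 50% selectivity) are hypothetical names and values picked for the example.

```rust
/// Hypothetical summary of an observed row selection.
#[derive(Debug, Clone, Copy)]
pub struct SelectionShape {
    /// Rows covered by the selection.
    pub total_rows: usize,
    /// Rows that pass the predicate.
    pub selected_rows: usize,
    /// Number of alternating select/skip runs.
    pub runs: usize,
}

/// Stand-in for the three execution strategies named in the review.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SketchStrategy {
    Selectors,
    Mask,
    PostFilter,
}

/// Pick a strategy from the selection shape (illustrative thresholds).
pub fn choose_strategy(shape: SelectionShape) -> SketchStrategy {
    if shape.total_rows == 0 || shape.runs == 0 {
        return SketchStrategy::Selectors; // nothing to decide on
    }
    let avg_run = shape.total_rows as f64 / shape.runs as f64;
    let selectivity = shape.selected_rows as f64 / shape.total_rows as f64;

    if avg_run >= 32.0 {
        // Long runs: skipping whole runs saves real decoding work.
        SketchStrategy::Selectors
    } else if selectivity <= 0.5 {
        // Fragmented but selective: a dense bitmap avoids per-run overhead.
        SketchStrategy::Mask
    } else {
        // Fragmented and most rows survive: pushdown saves little,
        // so decode first and filter afterwards.
        SketchStrategy::PostFilter
    }
}
```

A real cost model would likely also weigh column widths and decoding cost per type; the sketch only captures the shape-based part of the decision.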
As we discussed earlier, I agree that multiple levels of adaptive filtering are likely what is needed for a truly state-of-the-art Parquet decoding pipeline.
However, I am still struggling to figure out the most maintainable split for the functionality -- for example, adding "adaptive post filter execution" in the reader itself sounds a lot like what @adriangb and @zhuqi-lucas are planning to implement downstream in DataFusion.
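For readers following along, post-filter execution as the PR description outlines it (decode the predicate and output columns, evaluate the filter, then project back to the requested columns) can be sketched with plain vectors standing in for Arrow arrays. `Batch` and `post_filter` are hypothetical names for this example and do not correspond to any parquet or DataFusion API.

```rust
/// Hypothetical decoded batch: one Vec<i64> per column, all the same length.
pub type Batch = Vec<Vec<i64>>;

/// Evaluate `predicate` on one column after decoding, then keep only the
/// surviving rows of the caller-requested `output_cols`.
pub fn post_filter(
    batch: &Batch,
    predicate_col: usize,
    predicate: impl Fn(i64) -> bool,
    output_cols: &[usize],
) -> Batch {
    // Step 1: evaluate the predicate over the fully decoded column.
    let keep: Vec<bool> = batch[predicate_col]
        .iter()
        .map(|v| predicate(*v))
        .collect();

    // Step 2: filter and project in one pass per output column.
    output_cols
        .iter()
        .map(|&c| {
            batch[c]
                .iter()
                .zip(&keep)
                .filter(|(_, k)| **k)
                .map(|(v, _)| *v)
                .collect()
        })
        .collect()
}
```

The predicate column is decoded in full here, which is exactly the cost that pushdown tries to avoid; the trade only pays off when skipping would have been fragmented enough to cost more than it saved.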
I would say in general we seem to be moving more towards the higher level (DataFusion) potentially changing strategies at each Row Group boundary (and migrating work across cores, etc.). There are also considerations such as the fact that using predicate pushdown changes the IO pattern in the reader, which the higher level may want more control over.... 🤔
As you pointed out recently, there are some types of adaptivity that only the parquet reader itself will be able to do (for example, changing from mask --> RowSelection representation).
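As a rough illustration of that representation change, the sketch below converts between a boolean mask and run-length select/skip runs. `Run` is a hypothetical stand-in defined here for the example (its two fields are modelled loosely on the select/skip idea behind the reader's row selectors), and `mask_to_runs` / `runs_to_mask` are not functions from the parquet crate.

```rust
/// Hypothetical run-length entry: `row_count` consecutive rows that are
/// either all skipped or all selected.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Run {
    pub row_count: usize,
    pub skip: bool,
}

/// Collapse a dense mask into alternating select/skip runs.
pub fn mask_to_runs(mask: &[bool]) -> Vec<Run> {
    let mut runs: Vec<Run> = Vec::new();
    for &selected in mask {
        let skip = !selected;
        // Extend the current run if it has the same kind.
        if let Some(last) = runs.last_mut() {
            if last.skip == skip {
                last.row_count += 1;
                continue;
            }
        }
        runs.push(Run { row_count: 1, skip });
    }
    runs
}

/// Expand runs back into a dense mask (true = selected).
pub fn runs_to_mask(runs: &[Run]) -> Vec<bool> {
    runs.iter()
        .flat_map(|r| std::iter::repeat(!r.skip).take(r.row_count))
        .collect()
}
```

The number of runs produced relative to the mask length is one simple measure of how fragmented a selection is: a mask that alternates every row yields as many runs as rows, which is the shape where run-based skipping is most expensive.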
What do you think about first starting with just the "row selection adaptivity" feature and then contemplating Row Group selection?
See also #10141 from @haohuaijin |
Which issue does this PR close?
Rationale for this change
RowFilter can be much slower than a full scan when predicate pushdown produces a highly fragmented RowSelection. In that shape, the reader spends substantial time repeatedly skipping and decoding tiny row runs. #8565 showed an extreme case where row-filter pushdown was around 10x slower than scanning and filtering afterwards.
This PR makes RowSelectionPolicy::Auto more cost-aware. Instead of treating predicate pushdown as always beneficial once planned, Auto now chooses among:
selector-backed pushdown when it can skip useful work;
mask-backed execution when fragmented selections are better represented as a dense bitmap;
adaptive post-filter execution when pushdown is unlikely to save enough decoding work.
This is not intended to disable predicate evaluation. It changes where the predicate is evaluated when the observed row-selection shape suggests that pushdown overhead is likely to dominate.
This PR also fixes a correctness issue in the explicit Mask path. With sparse page-loaded ranges, a mask-backed read plan could previously try to consume selected rows outside the pages that were actually loaded, causing decoding failures. Loaded row ranges are now represented explicitly, and explicit Mask can safely execute over sparse page-loaded data.
What changes are included in this PR?
Auto strategy and cost model
Auto.
CostModelObservation / decision reasons so the reader can explain why it chose pushdown or post-filter execution.
Mask / selector planning
Mask and Selectors behavior intact.
Auto conservative around sparse page-loaded ranges.
Mask no longer assumes all page data is dense.
Post-filter execution
RowFilter, and then projects back to the caller-requested output columns.
try_next_reader handoff paths.
Benchmarks and tests
arrow_reader_row_filter benchmarks with strategy-sensitive cases.
Mask correctness;
Auto strategy decisions;
Are these changes tested?
Yes.
Unit / integration validation:
cargo fmt -p parquet -- --check
git diff --check
cargo test -p parquet --lib arrow::push_decoder
cargo test -p parquet --lib arrow::arrow_reader::read_plan
cargo test -p parquet --lib arrow::arrow_reader::selection
cargo test -p parquet --lib
cargo test -p parquet --test arrow_reader --all-features
cargo bench -p parquet --bench arrow_reader_row_filter --features arrow,async --no-run
cargo clippy -p parquet --all-targets --all-features -- -D warnings
cargo +nightly doc --document-private-items --no-deps --workspace --all-features
Benchmark evidence:
Lower current/main is better.
arrow_reader_row_filter
Summary across the arrow_reader_row_filter benchmark cases:
The improvement is concentrated in async row-filter cases where fragmented pushdown previously paid extra planning/selection overhead. Sync cases are broadly neutral.
Most notable async improvements:
utf8View <> ''
int64 > 90
int64 > 90
utf8View <> ''
The remaining cases are mostly within noise or small regressions. The largest sync regressions in this run were around +1.5%, while the aggregate sync result was +0.28%.
TPC-DS with predicate pushdown enabled (SF10 on an AMD64 machine)
Against main, with predicate pushdown enabled, aggregate current speedup was:
Largest median improvements:
q9 was the largest improvement and was stable: current was faster in 10/10 rounds.
Largest median regressions:
Overall, the microbenchmark shows the intended improvement in async fragmented-row-filter cases, while sync behavior remains approximately neutral. The TPC-DS run shows a positive aggregate result with several large stable wins, but also identifies q2 as the main remaining regression to investigate.
Are there any user-facing changes?
No intended breaking API changes.
RowSelectionPolicy::Auto may choose different internal execution strategies than before. Explicit Mask and Selectors policies remain available for callers that want fixed behavior.
The Mask execution path is now more robust for sparse page-loaded ranges, which makes future use of Mask safer in page-index / fragmented-selection cases.
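The sparse page-loaded case behind the Mask fix can be illustrated with a small std-only sketch. It assumes loaded rows are described by a sorted, non-overlapping list of `Range<usize>`; the function names `first_unloaded_selected_row` and `position_in_loaded` are hypothetical and are not the representation this PR actually uses.

```rust
use std::ops::Range;

/// Return the first selected row that is not covered by any loaded range,
/// or None if the mask is safe to execute over the loaded pages.
pub fn first_unloaded_selected_row(
    mask: &[bool],
    loaded: &[Range<usize>],
) -> Option<usize> {
    mask.iter()
        .enumerate()
        .filter(|(_, selected)| **selected)
        .map(|(row, _)| row)
        .find(|row| !loaded.iter().any(|r| r.contains(row)))
}

/// Translate a row index in the row group into its position among the
/// rows that were actually loaded. Assumes `loaded` is sorted and
/// non-overlapping. Returns None for a row whose page was pruned.
pub fn position_in_loaded(row: usize, loaded: &[Range<usize>]) -> Option<usize> {
    let mut rows_before = 0;
    for r in loaded {
        if r.contains(&row) {
            return Some(rows_before + (row - r.start));
        }
        rows_before += r.end - r.start;
    }
    None
}
```

A mask that assumes dense data would treat row 6 of the row group as position 6 in the decoded buffer; with only rows 0..2 and 6..8 loaded it is really position 2, and a selected row in the gap has no position at all. That mismatch is the kind of failure the description attributes to the earlier mask path.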