[SPARK-57568][SQL] Support TimeType in Parquet/ORC aggregate push-down #56846
Open
MaxGekk wants to merge 3 commits into
### What changes were proposed in this pull request?

Enable MIN/MAX/COUNT aggregate push-down over TIME columns for the Parquet and ORC data sources, computed from file-footer statistics.

- Add `TimeType` to the MIN/MAX type allow-list in `AggregatePushDownUtils.getSchemaForPushedAggregation`, which enables push-down eligibility for both engines at once.
- Add a `TimeType` case to `OrcUtils.getMinMaxFromColumnStatistics` (ORC stores TIME as a LONG, so its stats are `IntegerColumnStatistics`; the value is wrapped in a `LongWritable` and converted back by `OrcDeserializer`).
- No Parquet reader change is needed: TIME is stored as `INT64`, so the existing INT64 branch feeds the footer stat into a `ParquetRowConverter` built from the footer `PrimitiveType`, which carries the TIME(MICROS)/TIME(NANOS) annotation.

### Why are the changes needed?

To extend support for the TIME data type. Push-down avoids reading and aggregating TIME data at the Spark layer when MIN/MAX/COUNT can be answered from footer stats.

### Does this PR introduce any user-facing change?

No. This is an internal optimization on the push-down path; results are unchanged.

### How was this patch tested?

Added positive tests covering TIME precisions 0, 6, 7, and 9 (exercising both the Parquet micros and nanos storage paths) and negative tests (data filter, aggregate over an expression, and push-down disabled) to the shared `FileSourceAggregatePushDownSuite`, which is run by the Parquet V1/V2 and ORC V1/V2 suites.
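For readers unfamiliar with the eligibility rules the negative tests exercise, here is a minimal sketch in plain Python. It is not Spark code: `Aggregate`, `can_push_down`, and the contents of `MIN_MAX_ALLOWED_TYPES` are hypothetical names and values chosen for illustration, and the real allow-list in `AggregatePushDownUtils.getSchemaForPushedAggregation` is not reproduced here.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional

# Hypothetical stand-in for the MIN/MAX type allow-list. The entries are
# illustrative only; "time" models the TimeType addition described above.
MIN_MAX_ALLOWED_TYPES = {"int", "long", "date", "time"}


@dataclass(frozen=True)
class Aggregate:
    func: str                     # "min", "max" or "count"
    column: Optional[str]         # None models COUNT(*)
    is_plain_column: bool = True  # False models an aggregate over an expression


def can_push_down(
    aggs: List[Aggregate],
    schema: Dict[str, str],
    has_data_filter: bool,
    pushdown_enabled: bool,
) -> bool:
    """Return True only if every aggregate could be answered from footer stats."""
    # Push-down disabled by config, or a data filter present: never push.
    if not pushdown_enabled or has_data_filter:
        return False
    for agg in aggs:
        # Aggregates over non-column expressions are not pushed.
        if not agg.is_plain_column:
            return False
        if agg.func in ("min", "max"):
            # MIN/MAX need a column whose type is on the allow-list.
            if schema.get(agg.column) not in MIN_MAX_ALLOWED_TYPES:
                return False
        elif agg.func != "count":
            return False
    return True


schema = {"t": "time"}
aggs = [
    Aggregate("min", "t"),
    Aggregate("max", "t"),
    Aggregate("count", "t"),
    Aggregate("count", None),  # COUNT(*)
]
pushed = can_push_down(aggs, schema, has_data_filter=False, pushdown_enabled=True)
```

Under these assumptions, removing `"time"` from the set makes the MIN/MAX aggregates ineligible, which mirrors why a single allow-list change is enough to enable eligibility for both engines.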
Follow-up filed for the all-null aggregate push-down behavior found while reviewing this PR: SPARK-57739 (https://issues.apache.org/jira/browse/SPARK-57739). Parquet throws Update:
### What changes were proposed in this pull request?

Enable MIN/MAX/COUNT aggregate push-down over `TIME` columns for the Parquet and ORC data sources, computed from file-footer statistics.

- Add `TimeType` to the MIN/MAX type allow-list in `AggregatePushDownUtils.getSchemaForPushedAggregation`. This is shared, engine-agnostic code called by both `ParquetScanBuilder` and `OrcScanBuilder`, so the single change enables push-down eligibility for both engines at once.
- Add a `TimeType` case to `OrcUtils.getMinMaxFromColumnStatistics`. ORC stores `TIME` as a `LONG`, so its statistics are `IntegerColumnStatistics`; the min/max value is wrapped in a `LongWritable` and converted back to the Spark `TimeType` by `OrcDeserializer`.
- No Parquet reader change is needed: `TIME` is stored as Parquet `INT64`, so the existing `INT64` branch in `ParquetUtils.createAggInternalRowFromFooter` feeds the footer stat into a `ParquetRowConverter` built from the footer `PrimitiveType`, which carries the `TIME(MICROS)`/`TIME(NANOS)` logical annotation and maps to `TimeType`.

The columnar conversion (`AggregatePushDownUtils.convertAggregatesRowToBatch` via `RowToColumnConverter`) already supports `TimeType` (SPARK-54203), so the columnar path works for `TIME`.

### Why are the changes needed?

This is a sub-task of SPARK-57550 (extending support for the `TIME` data type). Aggregate push-down lets Parquet/ORC answer `MIN`/`MAX`/`COUNT` from footer statistics without reading and aggregating `TIME` data at the Spark layer.

### Does this PR introduce any user-facing change?
No. This is an internal optimization on the aggregate push-down path; query results are unchanged.
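To illustrate why results are unchanged, the following plain-Python sketch compares an aggregate computed from per-chunk statistics with the same aggregate computed by scanning every value. `ChunkStats`, `stats_for`, `aggregate_from_stats`, and `aggregate_by_scan` are hypothetical helpers, not Parquet, ORC, or Spark APIs; TIME values are modeled as integer microseconds since midnight. Returning `None` for an all-null column is a choice made for this sketch only and says nothing about how the engines behave in that case.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass(frozen=True)
class ChunkStats:
    """Hypothetical per-row-group (Parquet) or per-stripe (ORC) statistics."""
    row_count: int
    null_count: int
    min_value: Optional[int]  # None when every value in the chunk is null
    max_value: Optional[int]


def stats_for(values: List[Optional[int]]) -> ChunkStats:
    """Build the statistics a writer would record for one chunk of values."""
    non_null = [v for v in values if v is not None]
    return ChunkStats(
        row_count=len(values),
        null_count=len(values) - len(non_null),
        min_value=min(non_null) if non_null else None,
        max_value=max(non_null) if non_null else None,
    )


def aggregate_from_stats(chunks: List[ChunkStats]) -> Dict[str, Optional[int]]:
    """Answer MIN/MAX/COUNT(col)/COUNT(*) from statistics alone."""
    mins = [c.min_value for c in chunks if c.min_value is not None]
    maxs = [c.max_value for c in chunks if c.max_value is not None]
    return {
        "min": min(mins) if mins else None,
        "max": max(maxs) if maxs else None,
        "count_col": sum(c.row_count - c.null_count for c in chunks),
        "count_star": sum(c.row_count for c in chunks),
    }


def aggregate_by_scan(data: List[List[Optional[int]]]) -> Dict[str, Optional[int]]:
    """Answer the same aggregates by reading every value."""
    flat = [v for chunk in data for v in chunk]
    non_null = [v for v in flat if v is not None]
    return {
        "min": min(non_null) if non_null else None,
        "max": max(non_null) if non_null else None,
        "count_col": len(non_null),
        "count_star": len(flat),
    }


# Two chunks of TIME values in microseconds since midnight, one null row.
data = [
    [3_600_000_000, None, 7_200_000_000],  # 01:00:00, NULL, 02:00:00
    [1_000_000, 86_399_999_999],           # 00:00:01, 23:59:59.999999
]
from_stats = aggregate_from_stats([stats_for(chunk) for chunk in data])
from_scan = aggregate_by_scan(data)
```

Both dictionaries hold the same four values, and the null row makes `count_col` and `count_star` differ, as in the tests described in this PR.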
### How was this patch tested?

Added tests to the shared `FileSourceAggregatePushDownSuite` trait, which is extended by `ParquetV1/V2AggregatePushDownSuite` and `OrcV1/V2AggregatePushDownSuite`, so each test exercises all four engines:

- Positive tests: `MIN`/`MAX`/`COUNT(col)`/`COUNT(*)` push-down over a `TIME` column at precisions 0, 6, 7, and 9, covering both the Parquet micros (precision <= 6) and nanos (precision >= 7) storage paths, with a null row so `COUNT(col)` and `COUNT(*)` differ.
- Negative tests: a data filter on a `TIME` column, an aggregate over a non-column expression, and push-down disabled by config, all asserting the aggregate is not pushed.

Ran:
All 92 tests pass.
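The choice of test precisions follows from the storage split described in this PR: precision <= 6 uses the Parquet micros path and precision >= 7 uses the nanos path. The plain-Python sketch below shows that mapping and the resulting `INT64` value. `parquet_time_unit` and `encode_time` are hypothetical helpers, the 0 to 9 precision range is an assumption based on the tested values, and the sketch does not truncate fractional seconds to the declared precision.

```python
from datetime import time


def parquet_time_unit(precision: int) -> str:
    """Map a TIME precision to the Parquet storage unit described in this PR."""
    # Assumption: valid precisions are 0..9, inferred from the tested values.
    if not 0 <= precision <= 9:
        raise ValueError(f"unsupported TIME precision: {precision}")
    return "MICROS" if precision <= 6 else "NANOS"


def encode_time(t: time, precision: int) -> int:
    """Encode a time of day as the INT64 value stored for the given precision."""
    # datetime.time only carries microsecond resolution, so the nanos value
    # always ends in three zeros in this sketch.
    micros = ((t.hour * 60 + t.minute) * 60 + t.second) * 1_000_000 + t.microsecond
    return micros if parquet_time_unit(precision) == "MICROS" else micros * 1_000


units = {p: parquet_time_unit(p) for p in (0, 6, 7, 9)}
one_am_micros = encode_time(time(1, 0, 0), precision=6)
one_am_nanos = encode_time(time(1, 0, 0), precision=9)
```

Precisions 6 and 7 sit on either side of the boundary, which is why testing 0, 6, 7, and 9 covers both storage paths and their edges.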
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Cursor (Claude Opus 4.8)