
[SPARK-57568][SQL] Support TimeType in Parquet/ORC aggregate push-down #56846

Open
MaxGekk wants to merge 3 commits into apache:master from MaxGekk:time-aggr-pushdown

MaxGekk commented Jun 28, 2026

Member

What changes were proposed in this pull request?

Enable MIN/MAX/COUNT aggregate push-down over TIME columns for the Parquet and ORC data sources, computed from file-footer statistics.

  • Add TimeType to the MIN/MAX type allow-list in AggregatePushDownUtils.getSchemaForPushedAggregation. This is shared, engine-agnostic code called by both ParquetScanBuilder and OrcScanBuilder, so the single change enables push-down eligibility for both engines at once.
  • Add a TimeType case to OrcUtils.getMinMaxFromColumnStatistics. ORC stores TIME as a LONG, so its statistics are IntegerColumnStatistics; the min/max value is wrapped in a LongWritable and converted back to the Spark TimeType by OrcDeserializer.
  • No Parquet reader change is needed: TIME is stored as Parquet INT64, so the existing INT64 branch in ParquetUtils.createAggInternalRowFromFooter feeds the footer stat into a ParquetRowConverter built from the footer PrimitiveType, which carries the TIME(MICROS)/TIME(NANOS) logical annotation and maps to TimeType.
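
To make the footer-statistics idea concrete, here is a minimal, engine-independent sketch in Python. It is not Spark code: `ChunkStats` and `aggregate_from_footer` are hypothetical names, and the statistics layout (min, max, null count, and row count per row group or stripe) is a simplifying assumption. TIME values are modeled as plain integers, mirroring the INT64/LONG storage described above. Returning `None` when every chunk is all-null is a choice made for this sketch only, not a statement about what either reader does.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class ChunkStats:
    """Hypothetical per-row-group (or per-stripe) statistics for one column."""
    min_value: Optional[int]  # None when the chunk holds only nulls
    max_value: Optional[int]
    null_count: int
    row_count: int


def aggregate_from_footer(
    chunks: List[ChunkStats],
) -> Tuple[Optional[int], Optional[int], int, int]:
    """Return (MIN, MAX, COUNT(col), COUNT(*)) using statistics only, no data rows."""
    mins = [c.min_value for c in chunks if c.min_value is not None]
    maxs = [c.max_value for c in chunks if c.max_value is not None]
    count_star = sum(c.row_count for c in chunks)
    count_col = sum(c.row_count - c.null_count for c in chunks)
    return (
        min(mins) if mins else None,
        max(maxs) if maxs else None,
        count_col,
        count_star,
    )


# Two chunks of TIME values stored as microseconds since midnight (assumed data).
footer = [
    ChunkStats(min_value=3_600_000_000, max_value=7_200_000_000, null_count=1, row_count=4),
    ChunkStats(min_value=1_000_000, max_value=5_000_000_000, null_count=0, row_count=3),
]
result = aggregate_from_footer(footer)
print(result)
```

The point of the sketch is that all four aggregates fall out of a handful of integers per chunk, which is why adding TimeType to the allow-list is enough once the stored integer can be mapped back to a TIME value.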

The columnar conversion (AggregatePushDownUtils.convertAggregatesRowToBatch via RowToColumnConverter) already supports TimeType (SPARK-54203), so the columnar path works for TIME.

Why are the changes needed?

This is a sub-task of SPARK-57550 (extending support for the TIME data type). Aggregate push-down lets Parquet/ORC answer MIN/MAX/COUNT from footer statistics without reading and aggregating TIME data at the Spark layer.

Does this PR introduce any user-facing change?

No. This is an internal optimization on the aggregate push-down path; query results are unchanged.

How was this patch tested?

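Added tests to the shared FileSourceAggregatePushDownSuite trait, which is extended by ParquetV1AggregatePushDownSuite, ParquetV2AggregatePushDownSuite, OrcV1AggregatePushDownSuite, and OrcV2AggregatePushDownSuite, so each test runs against all four combinations of file format (Parquet, ORC) and data source API version (V1, V2):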

  • Positive: MIN/MAX/COUNT(col)/COUNT(*) push-down over a TIME column at precisions 0, 6, 7, and 9, covering both the Parquet micros (precision <= 6) and nanos (precision >= 7) storage paths, with a null row so COUNT(col) and COUNT(*) differ.
  • Negative: a data filter on the TIME column, an aggregate over a non-column expression, and push-down disabled by config -- all asserting the aggregate is not pushed.
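
The positive-test matrix can be summarized with a small Python sketch. It only restates the rule given above (Parquet stores precision <= 6 as micros and precision >= 7 as nanos) and shows why a null row separates COUNT(col) from COUNT(*). The helper names `parquet_time_unit`, `count_col`, and `count_star`, and the sample rows, are hypothetical and do not appear in the test suite.

```python
from typing import List, Optional

# Precisions exercised by the positive tests, per the description above.
PRECISIONS_UNDER_TEST = [0, 6, 7, 9]


def parquet_time_unit(precision: int) -> str:
    """Storage unit as stated in the PR: micros for precision <= 6, nanos for >= 7."""
    return "MICROS" if precision <= 6 else "NANOS"


def count_col(values: List[Optional[int]]) -> int:
    """COUNT(col): nulls are skipped."""
    return sum(1 for v in values if v is not None)


def count_star(values: List[Optional[int]]) -> int:
    """COUNT(*): every row counts, null or not."""
    return len(values)


# Assumed sample column: two TIME values plus one null row.
rows: List[Optional[int]] = [1_000_000, None, 2_000_000]
units = {p: parquet_time_unit(p) for p in PRECISIONS_UNDER_TEST}

print(units)
print(count_col(rows), count_star(rows))
```

Precisions 6 and 7 sit on either side of the storage boundary, so together with 0 and 9 they cover both units at their extremes.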

Ran:

build/sbt 'sql/testOnly *ParquetV1AggregatePushDownSuite *ParquetV2AggregatePushDownSuite *OrcV1AggregatePushDownSuite *OrcV2AggregatePushDownSuite'

All 92 tests pass.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Cursor (Claude Opus 4.8)

MaxGekk added 3 commits June 28, 2026 16:30
### What changes were proposed in this pull request?
Enable MIN/MAX/COUNT aggregate push-down over TIME columns for the Parquet and
ORC data sources, computed from file-footer statistics.

- Add `TimeType` to the MIN/MAX type allow-list in
  `AggregatePushDownUtils.getSchemaForPushedAggregation`, which enables push-down
  eligibility for both engines at once.
- Add a `TimeType` case to `OrcUtils.getMinMaxFromColumnStatistics` (ORC stores
  TIME as a LONG, so its stats are `IntegerColumnStatistics`; the value is wrapped
  in a `LongWritable` and converted back by `OrcDeserializer`).
- No Parquet reader change is needed: TIME is stored as `INT64`, so the existing
  INT64 branch feeds the footer stat into a `ParquetRowConverter` built from the
  footer `PrimitiveType`, which carries the TIME(MICROS)/TIME(NANOS) annotation.

### Why are the changes needed?
To extend support for the TIME data type. Push-down avoids reading and aggregating
TIME data at the Spark layer when MIN/MAX/COUNT can be answered from footer stats.

### Does this PR introduce any user-facing change?
No. This is an internal optimization on the push-down path; results are unchanged.

### How was this patch tested?
Added positive tests covering TIME precisions 0, 6, 7, and 9 (exercising both the
Parquet micros and nanos storage paths) and negative tests (data filter, aggregate
over an expression, and push-down disabled) to the shared
`FileSourceAggregatePushDownSuite`, which is run by the Parquet V1/V2 and ORC V1/V2
suites.

MaxGekk commented Jun 28, 2026

Member Author

Follow-up filed for the all-null aggregate push-down behavior found while reviewing this PR: SPARK-57739 (https://issues.apache.org/jira/browse/SPARK-57739). Parquet throws _LEGACY_ERROR_TEMP_3172 on a row group with an all-null chunk, while ORC returns NULL; the fix is type-agnostic and user-facing, so it's tracked separately rather than in this PR.

Update: _LEGACY_ERROR_TEMP_3172 has since been given a proper name in SPARK-57746 (#56859) -> PARQUET_AGGREGATE_PUSH_DOWN_UNSUPPORTED.NO_MIN_MAX (that PR also renames the sibling _LEGACY_ERROR_TEMP_3171 -> .NO_NUM_NULLS). If SPARK-57739 lands after #56859, it should update/remove that named sub-condition rather than the legacy one.
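
As a toy illustration of the divergence described in this comment, the following Python sketch models the two reported outcomes for a single all-null chunk. It is not the readers' actual code: `NoMinMaxError`, `parquet_like_min`, and `orc_like_min` are hypothetical names, and only the all-null case is taken from the thread.

```python
from typing import Optional


class NoMinMaxError(Exception):
    """Stand-in for the Parquet error condition named in this thread (hypothetical class)."""


def parquet_like_min(chunk_min: Optional[int]) -> int:
    # Models the reported Parquet behavior: an all-null chunk carries no
    # min/max statistic, and the push-down path raises instead of returning.
    if chunk_min is None:
        raise NoMinMaxError("PARQUET_AGGREGATE_PUSH_DOWN_UNSUPPORTED.NO_MIN_MAX")
    return chunk_min


def orc_like_min(chunk_min: Optional[int]) -> Optional[int]:
    # Models the reported ORC behavior: the same input yields NULL (None here).
    return chunk_min


try:
    parquet_like_min(None)
    parquet_raised = False
except NoMinMaxError:
    parquet_raised = True

orc_result = orc_like_min(None)
print(parquet_raised, orc_result)
```

The same query therefore fails on one format and returns NULL on the other, which is the inconsistency SPARK-57739 tracks.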
