From bc6b458abe0353664d379576e185fc97a2b07167 Mon Sep 17 00:00:00 2001 From: Stevo Mitric Date: Fri, 26 Jun 2026 09:18:27 +0000 Subject: [PATCH] Add SORT / ORDER BY and window correctness tests for nanosecond-precision timestamp types End-to-end SORT / ORDER BY and window-function coverage over the nanosecond timestamp types TIMESTAMP_NTZ(p) / TIMESTAMP_LTZ(p) (p in 7..9). These already work (they ride on the orderability / hashing / UnsafeRow primitives); this locks the behaviour in, mirroring the MIN/MAX follow-up which was likewise tests-only. Joins are deferred to a later change. - TimestampNanosWindowSuiteBase (+ ANSI on/off): row_number / rank / dense_rank / lag / lead over a nanosecond ordering key, NTZ and LTZ, codegen on and off. No existing test asserts a window function over a timestamp ordering key. - TimestampNanosSortSuiteBase (+ ANSI on/off): the DataFrame / SQL scenarios not already covered generically by OrderingSuite / SortSuite (which exercise the nanos types via DataTypeTestUtils.atomicTypes) -- a public-API ORDER BY tie-break smoke test, mixed-precision UNION ordering, a vectorized-ORC-read- then-sort, and an intercept documenting that caching a nanosecond column is not supported yet. - Golden coverage: a short ORDER BY + row_number + lead section appended to timestamp-ntz-nanos.sql / timestamp-ltz-nanos.sql. Tests only; no production change. Co-authored-by: Isaac --- .../timestamp-ltz-nanos.sql.out | 63 ++++ .../timestamp-ntz-nanos.sql.out | 63 ++++ .../sql-tests/inputs/timestamp-ltz-nanos.sql | 19 ++ .../sql-tests/inputs/timestamp-ntz-nanos.sql | 19 ++ .../results/timestamp-ltz-nanos.sql.out | 39 +++ .../results/timestamp-ntz-nanos.sql.out | 39 +++ .../sql/TimestampNanosSortSuiteBase.scala | 235 +++++++++++++ .../sql/TimestampNanosWindowSuiteBase.scala | 313 ++++++++++++++++++ 8 files changed, 790 insertions(+) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/TimestampNanosSortSuiteBase.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/TimestampNanosWindowSuiteBase.scala diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/timestamp-ltz-nanos.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/timestamp-ltz-nanos.sql.out index 21e3c2e4a0208..ee73c05a24fb5 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/timestamp-ltz-nanos.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/timestamp-ltz-nanos.sql.out @@ -971,3 +971,66 @@ SELECT typeof(CASE WHEN true -- !query analysis Project [typeof(CASE WHEN true THEN cast(cast(1969-12-31 23:59:59.1234567 as timestamp_ntz(7)) as timestamp_ltz(9)) ELSE cast(1970-01-01 00:00:00.123456789 UTC as timestamp_ltz(9)) END) AS typeof(CASE WHEN true THEN CAST(1969-12-31 23:59:59.1234567 AS TIMESTAMP_NTZ(7)) ELSE CAST(1970-01-01 00:00:00.123456789 UTC AS TIMESTAMP_LTZ(9)) END)#x] +- OneRowRelation + + +-- !query +SELECT v FROM ( + SELECT TIMESTAMP_LTZ '2020-01-01 00:00:00.000001000' AS v + UNION ALL SELECT TIMESTAMP_LTZ '2020-01-01 00:00:00.000000999' + UNION ALL SELECT TIMESTAMP_LTZ '2020-01-01 00:00:00.000000001') ORDER BY v +-- !query analysis +Sort [v#x ASC NULLS FIRST], true ++- Project [v#x] + +- SubqueryAlias __auto_generated_subquery_name + +- Union false, false + :- Union false, false + : :- Project [2020-01-01 00:00:00.000001 AS v#x] + : : +- OneRowRelation + : +- Project [2020-01-01 00:00:00.000000999 AS TIMESTAMP_LTZ '2020-01-01 00:00:00.000000999'#x] + : +- OneRowRelation + +- Project [2020-01-01 00:00:00.000000001 AS TIMESTAMP_LTZ '2020-01-01 00:00:00.000000001'#x] + +- OneRowRelation + + +-- !query +SELECT v, row_number() OVER (ORDER BY v) AS rn FROM ( + SELECT TIMESTAMP_LTZ '2020-01-01 00:00:00.000000900' AS v + UNION ALL SELECT TIMESTAMP_LTZ '2020-01-01 00:00:00.000000100' + UNION ALL SELECT TIMESTAMP_LTZ '2020-01-01 00:00:00.000000500') ORDER BY rn +-- !query analysis +Sort [rn#x ASC NULLS FIRST], true ++- Project [v#x, rn#x] + +- Project [v#x, rn#x, rn#x] + +- Window [row_number() windowspecdefinition(v#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#x], [v#x ASC NULLS FIRST] + +- Project [v#x] + +- SubqueryAlias __auto_generated_subquery_name + +- Union false, false + :- Union false, false + : :- Project [2020-01-01 00:00:00.0000009 AS v#x] + : : +- OneRowRelation + : +- Project [2020-01-01 00:00:00.0000001 AS TIMESTAMP_LTZ '2020-01-01 00:00:00.000000100'#x] + : +- OneRowRelation + +- Project [2020-01-01 00:00:00.0000005 AS TIMESTAMP_LTZ '2020-01-01 00:00:00.000000500'#x] + +- OneRowRelation + + +-- !query +SELECT v, lead(v) OVER (ORDER BY v) AS next_v FROM ( + SELECT TIMESTAMP_LTZ '2020-01-01 00:00:00.000000900' AS v + UNION ALL SELECT TIMESTAMP_LTZ '2020-01-01 00:00:00.000000100' + UNION ALL SELECT TIMESTAMP_LTZ '2020-01-01 00:00:00.000000500') ORDER BY v +-- !query analysis +Sort [v#x ASC NULLS FIRST], true ++- Project [v#x, next_v#x] + +- Project [v#x, next_v#x, next_v#x] + +- Window [lead(v#x, 1, null) windowspecdefinition(v#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, 1, 1)) AS next_v#x], [v#x ASC NULLS FIRST] + +- Project [v#x] + +- SubqueryAlias __auto_generated_subquery_name + +- Union false, false + :- Union false, false + : :- Project [2020-01-01 00:00:00.0000009 AS v#x] + : : +- OneRowRelation + : +- Project [2020-01-01 00:00:00.0000001 AS TIMESTAMP_LTZ '2020-01-01 00:00:00.000000100'#x] + : +- OneRowRelation + +- Project [2020-01-01 00:00:00.0000005 AS TIMESTAMP_LTZ '2020-01-01 00:00:00.000000500'#x] + +- OneRowRelation diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/timestamp-ntz-nanos.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/timestamp-ntz-nanos.sql.out index 09c18e0647578..abba0bc6b43f9 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/timestamp-ntz-nanos.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/timestamp-ntz-nanos.sql.out @@ -805,3 +805,66 @@ SELECT map('min', '0001-01-01 00:00:00.000000001' :: timestamp_ntz(9), -- !query analysis Project [map(min, cast(0001-01-01 00:00:00.000000001 as timestamp_ntz(9)), max, cast(9999-12-31 23:59:59.999999 as timestamp_ntz(9))) AS map(min, CAST(0001-01-01 00:00:00.000000001 AS TIMESTAMP_NTZ(9)), max, TIMESTAMP_NTZ '9999-12-31 23:59:59.999999')#x] +- OneRowRelation + + +-- !query +SELECT v FROM ( + SELECT TIMESTAMP_NTZ '2020-01-01 00:00:00.000001000' AS v + UNION ALL SELECT TIMESTAMP_NTZ '2020-01-01 00:00:00.000000999' + UNION ALL SELECT TIMESTAMP_NTZ '2020-01-01 00:00:00.000000001') ORDER BY v +-- !query analysis +Sort [v#x ASC NULLS FIRST], true ++- Project [v#x] + +- SubqueryAlias __auto_generated_subquery_name + +- Union false, false + :- Union false, false + : :- Project [2020-01-01 00:00:00.000001 AS v#x] + : : +- OneRowRelation + : +- Project [2020-01-01 00:00:00.000000999 AS TIMESTAMP_NTZ '2020-01-01 00:00:00.000000999'#x] + : +- OneRowRelation + +- Project [2020-01-01 00:00:00.000000001 AS TIMESTAMP_NTZ '2020-01-01 00:00:00.000000001'#x] + +- OneRowRelation + + +-- !query +SELECT v, row_number() OVER (ORDER BY v) AS rn FROM ( + SELECT TIMESTAMP_NTZ '2020-01-01 00:00:00.000000900' AS v + UNION ALL SELECT TIMESTAMP_NTZ '2020-01-01 00:00:00.000000100' + UNION ALL SELECT TIMESTAMP_NTZ '2020-01-01 00:00:00.000000500') ORDER BY rn +-- !query analysis +Sort [rn#x ASC NULLS FIRST], true ++- Project [v#x, rn#x] + +- Project [v#x, rn#x, rn#x] + +- Window [row_number() windowspecdefinition(v#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#x], [v#x ASC NULLS FIRST] + +- Project [v#x] + +- SubqueryAlias __auto_generated_subquery_name + +- Union false, false + :- Union false, false + : :- Project [2020-01-01 00:00:00.0000009 AS v#x] + : : +- OneRowRelation + : +- Project [2020-01-01 00:00:00.0000001 AS TIMESTAMP_NTZ '2020-01-01 00:00:00.000000100'#x] + : +- OneRowRelation + +- Project [2020-01-01 00:00:00.0000005 AS TIMESTAMP_NTZ '2020-01-01 00:00:00.000000500'#x] + +- OneRowRelation + + +-- !query +SELECT v, lead(v) OVER (ORDER BY v) AS next_v FROM ( + SELECT TIMESTAMP_NTZ '2020-01-01 00:00:00.000000900' AS v + UNION ALL SELECT TIMESTAMP_NTZ '2020-01-01 00:00:00.000000100' + UNION ALL SELECT TIMESTAMP_NTZ '2020-01-01 00:00:00.000000500') ORDER BY v +-- !query analysis +Sort [v#x ASC NULLS FIRST], true ++- Project [v#x, next_v#x] + +- Project [v#x, next_v#x, next_v#x] + +- Window [lead(v#x, 1, null) windowspecdefinition(v#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, 1, 1)) AS next_v#x], [v#x ASC NULLS FIRST] + +- Project [v#x] + +- SubqueryAlias __auto_generated_subquery_name + +- Union false, false + :- Union false, false + : :- Project [2020-01-01 00:00:00.0000009 AS v#x] + : : +- OneRowRelation + : +- Project [2020-01-01 00:00:00.0000001 AS TIMESTAMP_NTZ '2020-01-01 00:00:00.000000100'#x] + : +- OneRowRelation + +- Project [2020-01-01 00:00:00.0000005 AS TIMESTAMP_NTZ '2020-01-01 00:00:00.000000500'#x] + +- OneRowRelation diff --git a/sql/core/src/test/resources/sql-tests/inputs/timestamp-ltz-nanos.sql b/sql/core/src/test/resources/sql-tests/inputs/timestamp-ltz-nanos.sql index 061d218c275d7..f3327244541ae 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/timestamp-ltz-nanos.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/timestamp-ltz-nanos.sql @@ -309,3 +309,22 @@ SELECT typeof(coalesce('0001-01-01 00:00:00.0000001' :: timestamp_ntz(7), SELECT typeof(CASE WHEN true THEN '1969-12-31 23:59:59.1234567' :: timestamp_ntz(7) ELSE '1970-01-01 00:00:00.123456789 UTC' :: timestamp_ltz(9) END); + +-- SORT / ORDER BY tie-breaks on the sub-microsecond remainder: 001 and 999 share a microsecond, +-- 1000 rolls into the next, so a micro-truncating sort would misorder them (full value 001<999<1000). +SELECT v FROM ( + SELECT TIMESTAMP_LTZ '2020-01-01 00:00:00.000001000' AS v + UNION ALL SELECT TIMESTAMP_LTZ '2020-01-01 00:00:00.000000999' + UNION ALL SELECT TIMESTAMP_LTZ '2020-01-01 00:00:00.000000001') ORDER BY v; + +-- row_number() over a nanosecond ORDER BY key: the row numbers follow the sub-microsecond order. +SELECT v, row_number() OVER (ORDER BY v) AS rn FROM ( + SELECT TIMESTAMP_LTZ '2020-01-01 00:00:00.000000900' AS v + UNION ALL SELECT TIMESTAMP_LTZ '2020-01-01 00:00:00.000000100' + UNION ALL SELECT TIMESTAMP_LTZ '2020-01-01 00:00:00.000000500') ORDER BY rn; + +-- lead() over a nanosecond ORDER BY key returns the next sub-microsecond value (carrier round-trip). +SELECT v, lead(v) OVER (ORDER BY v) AS next_v FROM ( + SELECT TIMESTAMP_LTZ '2020-01-01 00:00:00.000000900' AS v + UNION ALL SELECT TIMESTAMP_LTZ '2020-01-01 00:00:00.000000100' + UNION ALL SELECT TIMESTAMP_LTZ '2020-01-01 00:00:00.000000500') ORDER BY v; diff --git a/sql/core/src/test/resources/sql-tests/inputs/timestamp-ntz-nanos.sql b/sql/core/src/test/resources/sql-tests/inputs/timestamp-ntz-nanos.sql index df7406a9ec9ed..b248913813a6f 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/timestamp-ntz-nanos.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/timestamp-ntz-nanos.sql @@ -249,3 +249,22 @@ SELECT typeof(array(TIMESTAMP_NTZ '9999-12-31 23:59:59', '0001-01-01 00:00:00.000000001' :: timestamp_ntz(9))); SELECT map('min', '0001-01-01 00:00:00.000000001' :: timestamp_ntz(9), 'max', TIMESTAMP_NTZ '9999-12-31 23:59:59.999999'); + +-- SORT / ORDER BY tie-breaks on the sub-microsecond remainder: 001 and 999 share a microsecond, +-- 1000 rolls into the next, so a micro-truncating sort would misorder them (full value 001<999<1000). +SELECT v FROM ( + SELECT TIMESTAMP_NTZ '2020-01-01 00:00:00.000001000' AS v + UNION ALL SELECT TIMESTAMP_NTZ '2020-01-01 00:00:00.000000999' + UNION ALL SELECT TIMESTAMP_NTZ '2020-01-01 00:00:00.000000001') ORDER BY v; + +-- row_number() over a nanosecond ORDER BY key: the row numbers follow the sub-microsecond order. +SELECT v, row_number() OVER (ORDER BY v) AS rn FROM ( + SELECT TIMESTAMP_NTZ '2020-01-01 00:00:00.000000900' AS v + UNION ALL SELECT TIMESTAMP_NTZ '2020-01-01 00:00:00.000000100' + UNION ALL SELECT TIMESTAMP_NTZ '2020-01-01 00:00:00.000000500') ORDER BY rn; + +-- lead() over a nanosecond ORDER BY key returns the next sub-microsecond value (carrier round-trip). +SELECT v, lead(v) OVER (ORDER BY v) AS next_v FROM ( + SELECT TIMESTAMP_NTZ '2020-01-01 00:00:00.000000900' AS v + UNION ALL SELECT TIMESTAMP_NTZ '2020-01-01 00:00:00.000000100' + UNION ALL SELECT TIMESTAMP_NTZ '2020-01-01 00:00:00.000000500') ORDER BY v; diff --git a/sql/core/src/test/resources/sql-tests/results/timestamp-ltz-nanos.sql.out b/sql/core/src/test/resources/sql-tests/results/timestamp-ltz-nanos.sql.out index ceec6d71ebad7..845b1e30137cd 100644 --- a/sql/core/src/test/resources/sql-tests/results/timestamp-ltz-nanos.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/timestamp-ltz-nanos.sql.out @@ -1086,3 +1086,42 @@ SELECT typeof(CASE WHEN true struct -- !query output timestamp_ltz(9) + + +-- !query +SELECT v FROM ( + SELECT TIMESTAMP_LTZ '2020-01-01 00:00:00.000001000' AS v + UNION ALL SELECT TIMESTAMP_LTZ '2020-01-01 00:00:00.000000999' + UNION ALL SELECT TIMESTAMP_LTZ '2020-01-01 00:00:00.000000001') ORDER BY v +-- !query schema +struct +-- !query output +2020-01-01 00:00:00.000000001 +2020-01-01 00:00:00.000000999 +2020-01-01 00:00:00.000001 + + +-- !query +SELECT v, row_number() OVER (ORDER BY v) AS rn FROM ( + SELECT TIMESTAMP_LTZ '2020-01-01 00:00:00.000000900' AS v + UNION ALL SELECT TIMESTAMP_LTZ '2020-01-01 00:00:00.000000100' + UNION ALL SELECT TIMESTAMP_LTZ '2020-01-01 00:00:00.000000500') ORDER BY rn +-- !query schema +struct +-- !query output +2020-01-01 00:00:00.0000001 1 +2020-01-01 00:00:00.0000005 2 +2020-01-01 00:00:00.0000009 3 + + +-- !query +SELECT v, lead(v) OVER (ORDER BY v) AS next_v FROM ( + SELECT TIMESTAMP_LTZ '2020-01-01 00:00:00.000000900' AS v + UNION ALL SELECT TIMESTAMP_LTZ '2020-01-01 00:00:00.000000100' + UNION ALL SELECT TIMESTAMP_LTZ '2020-01-01 00:00:00.000000500') ORDER BY v +-- !query schema +struct +-- !query output +2020-01-01 00:00:00.0000001 2020-01-01 00:00:00.0000005 +2020-01-01 00:00:00.0000005 2020-01-01 00:00:00.0000009 +2020-01-01 00:00:00.0000009 NULL diff --git a/sql/core/src/test/resources/sql-tests/results/timestamp-ntz-nanos.sql.out b/sql/core/src/test/resources/sql-tests/results/timestamp-ntz-nanos.sql.out index ba21a069ba4c6..5311019e61f34 100644 --- a/sql/core/src/test/resources/sql-tests/results/timestamp-ntz-nanos.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/timestamp-ntz-nanos.sql.out @@ -883,3 +883,42 @@ SELECT map('min', '0001-01-01 00:00:00.000000001' :: timestamp_ntz(9), struct> -- !query output {"max":9999-12-31 23:59:59.999999,"min":0001-01-01 00:00:00.000000001} + + +-- !query +SELECT v FROM ( + SELECT TIMESTAMP_NTZ '2020-01-01 00:00:00.000001000' AS v + UNION ALL SELECT TIMESTAMP_NTZ '2020-01-01 00:00:00.000000999' + UNION ALL SELECT TIMESTAMP_NTZ '2020-01-01 00:00:00.000000001') ORDER BY v +-- !query schema +struct +-- !query output +2020-01-01 00:00:00.000000001 +2020-01-01 00:00:00.000000999 +2020-01-01 00:00:00.000001 + + +-- !query +SELECT v, row_number() OVER (ORDER BY v) AS rn FROM ( + SELECT TIMESTAMP_NTZ '2020-01-01 00:00:00.000000900' AS v + UNION ALL SELECT TIMESTAMP_NTZ '2020-01-01 00:00:00.000000100' + UNION ALL SELECT TIMESTAMP_NTZ '2020-01-01 00:00:00.000000500') ORDER BY rn +-- !query schema +struct +-- !query output +2020-01-01 00:00:00.0000001 1 +2020-01-01 00:00:00.0000005 2 +2020-01-01 00:00:00.0000009 3 + + +-- !query +SELECT v, lead(v) OVER (ORDER BY v) AS next_v FROM ( + SELECT TIMESTAMP_NTZ '2020-01-01 00:00:00.000000900' AS v + UNION ALL SELECT TIMESTAMP_NTZ '2020-01-01 00:00:00.000000100' + UNION ALL SELECT TIMESTAMP_NTZ '2020-01-01 00:00:00.000000500') ORDER BY v +-- !query schema +struct +-- !query output +2020-01-01 00:00:00.0000001 2020-01-01 00:00:00.0000005 +2020-01-01 00:00:00.0000005 2020-01-01 00:00:00.0000009 +2020-01-01 00:00:00.0000009 NULL diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TimestampNanosSortSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/TimestampNanosSortSuiteBase.scala new file mode 100644 index 0000000000000..2ff3b9723f2be --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/TimestampNanosSortSuiteBase.scala @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.time.{Instant, LocalDateTime} + +import org.apache.spark.SparkConf +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types._ + +/** + * End-to-end SORT / ORDER BY tests over the nanosecond-precision timestamp types + * `TIMESTAMP_NTZ(p)` / `TIMESTAMP_LTZ(p)` (`p` in `[7, 9]`). These capabilities ride on the + * orderability the nanosecond types already implement, so no production change is required. + * + * Scope note: per-type ordering correctness is already covered generically -- the nanos types are + * in `DataTypeTestUtils.atomicTypes` (SPARK-57259), so `OrderingSuite` exercises interpreted vs + * generated ordering for them (and SPARK-57103 added the sub-microsecond tie-break, Long-boundary, + * pre-epoch, NULLS-first and precision-independence cases there), and `SortSuite` runs physical + * `SortExec` (radix on and off) over those types. This suite therefore only covers the DataFrame / + * SQL end-to-end scenarios NOT exercised by those generic suites: + * - a public-API `Dataset.orderBy` smoke test that also pins the sub-microsecond tie-break, + * - mixed-precision ORDER BY via `UNION ALL` (ordering after type-coercion widening), + * - the vectorized-ORC-read-then-sort columnar path, and + * - the (currently unsupported) cache-then-sort path. + * + * The nanosecond timestamp types are gated behind a preview flag enabled by default under tests + * (`Utils.isTesting`), so it is not set here. The session time zone is fixed so the + * `TIMESTAMP_LTZ` (`Instant`) values render deterministically. The two subclasses run every test + * with ANSI mode on and off. + * + * NOTE on assertions: `checkAnswer` is order-INSENSITIVE (QueryTest sorts both sides), so it + * cannot verify ORDER BY ordering. Ordering claims use `df.orderBy(...).collect().toSeq === + * `; `checkAnswer` is used only as a value-set (multiset) cross-check. + */ +abstract class TimestampNanosSortSuiteBase extends SharedSparkSession { + + import testImplicits._ + + override def sparkConf: SparkConf = super.sparkConf + .set(SQLConf.SESSION_LOCAL_TIMEZONE.key, "America/Los_Angeles") + + // Exercise both genComp arms: forced whole-stage codegen, then forced interpreted fallback. + // Mirrors TimestampNanosFunctionsSuiteBase.scala. + protected val codegenModes: Seq[Seq[(String, String)]] = Seq( + Seq(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true", + SQLConf.CODEGEN_FACTORY_MODE.key -> "CODEGEN_ONLY"), + Seq(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false", + SQLConf.CODEGEN_FACTORY_MODE.key -> "NO_CODEGEN")) + + // Single nanosecond TIMESTAMP_NTZ(p) column "c"; a null element becomes a NULL row. + protected def ntzDF(values: Seq[String], precision: Int): DataFrame = + spark.createDataFrame( + spark.sparkContext.parallelize( + values.map(s => Row(if (s == null) null else LocalDateTime.parse(s)))), + new StructType().add("c", TimestampNTZNanosType(precision))) + + // Single nanosecond TIMESTAMP_LTZ(p) column "c"; a null element becomes a NULL row. + protected def ltzDF(values: Seq[String], precision: Int): DataFrame = + spark.createDataFrame( + spark.sparkContext.parallelize( + values.map(s => Row(if (s == null) null else Instant.parse(s)))), + new StructType().add("c", TimestampLTZNanosType(precision))) + + // ========================================================================================== + // Public-API ORDER BY smoke test, also pinning the sub-microsecond tie-break end to end. + // ========================================================================================== + // The first two non-null values share epochMicros (..00.000000001 and ..00.000000999 are both + // inside micro 1577836800000000); the third (..00.000001000) rolls into the NEXT micro. The full + // TimestampNanosVal.compareTo must order them 001 < 999 < 1000 through the Dataset.orderBy path, + // on both the whole-stage codegen and interpreted comparison arms, for NTZ and LTZ. + test("Dataset.orderBy over a nanosecond key tie-breaks on the sub-microsecond remainder") { + val ntzVals = Seq( + "2020-01-01T00:00:00.000001000", + "2020-01-01T00:00:00.000000999", + "2020-01-01T00:00:00.000000001", + null) + val ntzAsc = Seq( + Row(LocalDateTime.parse("2020-01-01T00:00:00.000000001")), + Row(LocalDateTime.parse("2020-01-01T00:00:00.000000999")), + Row(LocalDateTime.parse("2020-01-01T00:00:00.000001000"))) + val ltzVals = Seq( + "2020-01-01T00:00:00.000001000Z", + "2020-01-01T00:00:00.000000999Z", + "2020-01-01T00:00:00.000000001Z", + null) + val ltzAsc = Seq( + Row(Instant.parse("2020-01-01T00:00:00.000000001Z")), + Row(Instant.parse("2020-01-01T00:00:00.000000999Z")), + Row(Instant.parse("2020-01-01T00:00:00.000001000Z"))) + + codegenModes.foreach { conf => + withSQLConf(conf: _*) { + // --- NTZ --- ASC default => NULLS FIRST; DESC default => NULLS LAST. + val ntz = ntzDF(ntzVals, 9) + assert(ntz.orderBy($"c".asc).collect().toSeq === (Row(null) +: ntzAsc)) + assert(ntz.orderBy($"c".desc).collect().toSeq === (ntzAsc.reverse :+ Row(null))) + checkAnswer(ntz.filter($"c".isNotNull), ntzAsc) + + // --- LTZ --- + val ltz = ltzDF(ltzVals, 9) + assert(ltz.orderBy($"c".asc).collect().toSeq === (Row(null) +: ltzAsc)) + assert(ltz.orderBy($"c".desc).collect().toSeq === (ltzAsc.reverse :+ Row(null))) + checkAnswer(ltz.filter($"c".isNotNull), ltzAsc) + } + } + } + + // ========================================================================================== + // Mixed-precision ORDER BY via UNION ALL (widens to the wider p, findWiderDateTimeType). + // ========================================================================================== + // TypeCoercionHelper.findWiderDateTimeType widens nanos by max precision within the (NTZ) family, + // so p=7 UNION ALL p=9 -> TimestampNTZNanosType(9). The p=9 frame's ..001 remainder needs full + // nanos; after widening the global order must be exact. (Remainders are 100ns multiples so the + // p=7 frame's values are exact at precision 7.) + test("ORDER BY over a UNION ALL of mixed-precision nanosecond timestamps orders correctly") { + val p7 = ntzDF(Seq( + "2020-01-01T00:00:00.000000200", + "2020-01-01T00:00:00.000000800"), 7) + val p9 = ntzDF(Seq( + "2020-01-01T00:00:00.000000001", + "2020-01-01T00:00:00.000000999"), 9) + val unioned = p7.unionByName(p9) + assert(unioned.schema("c").dataType === TimestampNTZNanosType(9)) + codegenModes.foreach { conf => + withSQLConf(conf: _*) { + assert(unioned.orderBy($"c".asc).collect().toSeq === Seq( + Row(LocalDateTime.parse("2020-01-01T00:00:00.000000001")), + Row(LocalDateTime.parse("2020-01-01T00:00:00.000000200")), + Row(LocalDateTime.parse("2020-01-01T00:00:00.000000800")), + Row(LocalDateTime.parse("2020-01-01T00:00:00.000000999")))) + } + } + } + + // ========================================================================================== + // POSITIVE columnar path: vectorized ORC read THEN sort -- executes and is correct today. + // ========================================================================================== + // ColumnarToRowExec reads nanos via the typed leaf getters getTimestampNTZNanos/ + // getTimestampLTZNanos (and via UnsafeProjection), never through ColumnarRow.copy()/get; the + // vectorized ORC batch path is taken (OrcFileFormat accepts nanos; OrcAtomicColumnVector has a + // nanos arm). Tie-break values share epochMicros and differ only in nanosWithinMicro. + test("sort over a nanosecond column read back from vectorized ORC orders correctly") { + Seq(7, 8, 9).foreach { p => + withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "true") { + withTempPath { dir => + val path = dir.getCanonicalPath + val schema = new StructType().add("c", TimestampNTZNanosType(p)) + val data = Seq( + Row(LocalDateTime.parse("2020-01-01T00:00:00.000000900")), + Row(LocalDateTime.parse("2020-01-01T00:00:00.000000100")), + Row(null)) + spark.createDataFrame(spark.sparkContext.parallelize(data, 1), schema) + .write.mode("overwrite").orc(path) + val read = spark.read.schema(schema).orc(path) + checkAnswer( + read.orderBy($"c".asc_nulls_first), + Seq( + Row(null), + Row(LocalDateTime.parse("2020-01-01T00:00:00.000000100")), + Row(LocalDateTime.parse("2020-01-01T00:00:00.000000900")))) + } + } + } + } + + // ========================================================================================== + // CACHE path: documents the CURRENT gap (caching a nanosecond column is not supported yet). + // ========================================================================================== + // InMemoryRelation.supportsColumnarInput is hardcoded false, so .cache() always builds + // row->CachedBatch via ColumnBuilder(attribute.dataType, ...). ColumnBuilder.apply has no nanos + // case -> QueryExecutionErrors.notSupportTypeError; ColumnType.apply has no nanos case -> + // unsupportedDataTypeError. The throw happens at cache-WRITE time, before any ColumnarRow read, + // so it fronts the ColumnarRow/ColumnarBatchRow copy()/get gap (no nanos arm) that a pure + // sort/window flow never reaches. This intercept pins the current behaviour so a future cache fix + // has to flip it. TODO: when the in-memory columnar cache learns the nanos types, replace this + // intercept with a passing cached-sort assertion, and add a nanos arm to: + // - sql/core/.../execution/columnar/ColumnBuilder.scala (apply) + // - sql/core/.../execution/columnar/ColumnType.scala (apply) + // - sql/catalyst/.../vectorized/ColumnarRow.java (copy + get(int, DataType)) + // - sql/catalyst/.../vectorized/ColumnarBatchRow.java (copy + get(int, DataType)) + test("caching a nanosecond-precision timestamp column is not supported yet") { + Seq(7, 8, 9).foreach { p => + Seq[(DataType, Any)]( + TimestampNTZNanosType(p) -> LocalDateTime.parse("2020-01-01T13:24:35.123456789"), + TimestampLTZNanosType(p) -> Instant.parse("2020-01-01T21:24:35.987654321Z") + ).foreach { case (dt, v) => + val schema = new StructType().add("c", dt) + val df = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(v), Row(null))), schema).cache() + try { + // The failure surfaces when an action materializes the cached plan. + val e = intercept[Exception] { + df.orderBy($"c".asc_nulls_first).collect() + } + // Be lenient on exact class/message: the throw originates in ColumnBuilder/ColumnType and + // is surfaced (possibly wrapped) through the Dataset action. + val msg = Option(e.getMessage).map(_.toLowerCase(java.util.Locale.ROOT)).getOrElse("") + assert( + msg.contains("timestamp_ntz") || msg.contains("timestamp_ltz") || + msg.contains("not support") || msg.contains("unsupported"), + s"unexpected failure for cached sort over $dt: $e") + } finally { + df.unpersist() + } + } + } + } +} + +// Runs the nanosecond timestamp sort tests with ANSI mode enabled explicitly. +class TimestampNanosSortAnsiOnSuite extends TimestampNanosSortSuiteBase { + override def sparkConf: SparkConf = super.sparkConf.set(SQLConf.ANSI_ENABLED.key, "true") +} + +// Runs the nanosecond timestamp sort tests with ANSI mode disabled explicitly. +class TimestampNanosSortAnsiOffSuite extends TimestampNanosSortSuiteBase { + override def sparkConf: SparkConf = super.sparkConf.set(SQLConf.ANSI_ENABLED.key, "false") +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TimestampNanosWindowSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/TimestampNanosWindowSuiteBase.scala new file mode 100644 index 0000000000000..c351e30daeb6e --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/TimestampNanosWindowSuiteBase.scala @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.time.{Instant, LocalDateTime} + +import org.apache.spark.SparkConf +import org.apache.spark.sql.expressions.Window +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types._ + +/** + * End-to-end window-function correctness tests over the nanosecond-precision timestamp types + * `TIMESTAMP_NTZ(p)` / `TIMESTAMP_LTZ(p)` (`p` in `[7, 9]`). Window functions are type-agnostic + * and ride entirely on orderability and the `UnsafeRow` / window-buffer primitives, so no + * production change is required -- this suite locks the behaviour in. + * + * The headline assertion is sub-microsecond ordering: input values share their `epochMicros` and + * differ only in `nanosWithinMicro`, so the micro path cannot distinguish them and + * `row_number()` / `rank()` / `dense_rank()` are the real proof of nanos ordering. + * `lag` / `lead` additionally round-trip the nanos value through the window buffer / `UnsafeRow` + * append, so collecting the neighbour back as `LocalDateTime` / `Instant` proves the carrier + * (`epochMicros` + `nanosWithinMicro`) survives. Each ordering body runs on both the whole-stage + * codegen comparison arm and the interpreted `Ordering[TimestampNanosVal]` arm, NTZ and LTZ. + * + * All sub-microsecond remainders are multiples of 100ns (100 / 200 / ... / 900) so they are exact + * at every precision p in [7, 9] (p=7 has 100ns resolution, p=8 has 10ns); a non-100ns-multiple + * remainder would be floored away at p=7/p=8 and collapse the intended distinct values into ties. + * + * The preview flag is enabled by default under tests (`Utils.isTesting`), so it is not set. The + * session time zone is fixed so `TIMESTAMP_LTZ` values render deterministically. The two + * subclasses run every test with ANSI mode on and off. + * + * NOTE: every test here projects a deterministic, distinct ordinal column (`id`, or the window + * output `rn`/`rk`) alongside the nanos column, so `checkAnswer` (order-insensitive) suffices -- + * the row-number / rank value IS the ordering proof, so no collect-strict assertion is needed. + */ +abstract class TimestampNanosWindowSuiteBase extends SharedSparkSession { + + import testImplicits._ + + override def sparkConf: SparkConf = super.sparkConf + .set(SQLConf.SESSION_LOCAL_TIMEZONE.key, "America/Los_Angeles") + + protected val codegenModes: Seq[Seq[(String, String)]] = Seq( + Seq(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true", + SQLConf.CODEGEN_FACTORY_MODE.key -> "CODEGEN_ONLY"), + Seq(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false", + SQLConf.CODEGEN_FACTORY_MODE.key -> "NO_CODEGEN")) + + private def ntzSchema(p: Int): StructType = + new StructType().add("id", IntegerType).add("ts", TimestampNTZNanosType(p)) + + private def ltzSchema(p: Int): StructType = + new StructType().add("id", IntegerType).add("ts", TimestampLTZNanosType(p)) + + // ========================================================================================== + // row_number() OVER (ORDER BY ) -- sub-microsecond ordering, NTZ + LTZ. + // ========================================================================================== + // All three values share epochMicros 2020-01-01T00:00:00.000000 and differ only inside the + // microsecond (100ns / 500ns / 900ns), so the row numbers are produced purely by nanos ordering. + test("row_number over a nanosecond TIMESTAMP_NTZ orders by the sub-microsecond part") { + codegenModes.foreach { conf => + withSQLConf(conf: _*) { + Seq(7, 8, 9).foreach { p => + val data = Seq( + Row(10, LocalDateTime.parse("2020-01-01T00:00:00.000000900")), + Row(20, LocalDateTime.parse("2020-01-01T00:00:00.000000100")), + Row(30, LocalDateTime.parse("2020-01-01T00:00:00.000000500"))) + val df = spark.createDataFrame(spark.sparkContext.parallelize(data), ntzSchema(p)) + // ASC: 100ns -> 500ns -> 900ns -> ids 20, 30, 10. + checkAnswer( + df.select($"id", row_number().over(Window.orderBy($"ts")).as("rn")), + Seq(Row(20, 1), Row(30, 2), Row(10, 3))) + // DESC: reversed. + checkAnswer( + df.select($"id", row_number().over(Window.orderBy($"ts".desc)).as("rn")), + Seq(Row(10, 1), Row(30, 2), Row(20, 3))) + } + } + } + } + + test("row_number over a nanosecond TIMESTAMP_LTZ orders by the sub-microsecond part") { + codegenModes.foreach { conf => + withSQLConf(conf: _*) { + Seq(7, 8, 9).foreach { p => + val data = Seq( + Row(10, Instant.parse("2020-01-01T00:00:00.000000900Z")), + Row(20, Instant.parse("2020-01-01T00:00:00.000000100Z")), + Row(30, Instant.parse("2020-01-01T00:00:00.000000500Z"))) + val df = spark.createDataFrame(spark.sparkContext.parallelize(data), ltzSchema(p)) + checkAnswer( + df.select($"id", row_number().over(Window.orderBy($"ts")).as("rn")), + Seq(Row(20, 1), Row(30, 2), Row(10, 3))) + } + } + } + } + + // ========================================================================================== + // rank()/dense_rank() OVER (PARTITION BY g ORDER BY ) -- ties at the nanos level. + // ========================================================================================== + // Two partitions. Within g=1 two rows share .000000500 (a sub-microsecond tie), so rank() skips + // and dense_rank() does not; the tie can only be detected by the full nanos comparison (all rows + // in g=1 share epochMicros). + test("rank/dense_rank partition by key, order by a nanosecond NTZ column") { + codegenModes.foreach { conf => + withSQLConf(conf: _*) { + Seq(7, 8, 9).foreach { p => + val schema = new StructType() + .add("g", IntegerType).add("id", IntegerType).add("ts", TimestampNTZNanosType(p)) + val data = Seq( + Row(1, 101, LocalDateTime.parse("2020-01-01T00:00:00.000000500")), + Row(1, 102, LocalDateTime.parse("2020-01-01T00:00:00.000000500")), + Row(1, 103, LocalDateTime.parse("2020-01-01T00:00:00.000000900")), + Row(1, 104, LocalDateTime.parse("2020-01-01T00:00:00.000000100")), + Row(2, 201, LocalDateTime.parse("2020-01-01T00:00:00.000000900")), + Row(2, 202, LocalDateTime.parse("2020-01-01T00:00:00.000000500"))) + val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema) + val w = Window.partitionBy($"g").orderBy($"ts") + checkAnswer( + df.select($"g", $"id", rank().over(w).as("rk"), dense_rank().over(w).as("drk")), + Seq( + Row(1, 104, 1, 1), // 100ns + Row(1, 101, 2, 2), // 500ns + Row(1, 102, 2, 2), // 500ns tie: same rank/dense_rank as 101 + Row(1, 103, 4, 3), // 900ns: rank skips to 4, dense_rank advances to 3 + Row(2, 202, 1, 1), // 500ns + Row(2, 201, 2, 2))) // 900ns + } + } + } + } + + test("rank/dense_rank partition by key, order by a nanosecond LTZ column (SQL path)") { + codegenModes.foreach { conf => + withSQLConf(conf: _*) { + Seq(7, 8, 9).foreach { p => + val schema = new StructType() + .add("g", IntegerType).add("id", IntegerType).add("ts", TimestampLTZNanosType(p)) + val data = Seq( + Row(1, 101, Instant.parse("2020-01-01T00:00:00.000000500Z")), + Row(1, 102, Instant.parse("2020-01-01T00:00:00.000000500Z")), + Row(1, 103, Instant.parse("2020-01-01T00:00:00.000000900Z")), + Row(2, 201, Instant.parse("2020-01-01T00:00:00.000000900Z")), + Row(2, 202, Instant.parse("2020-01-01T00:00:00.000000500Z"))) + val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema) + withTempView("nanos_ltz") { + df.createOrReplaceTempView("nanos_ltz") + checkAnswer( + spark.sql( + """select g, id, + | rank() over (partition by g order by ts) as rk, + | dense_rank() over (partition by g order by ts) as drk + |from nanos_ltz""".stripMargin), + Seq( + Row(1, 101, 1, 1), Row(1, 102, 1, 1), Row(1, 103, 3, 2), + Row(2, 202, 1, 1), Row(2, 201, 2, 2))) + } + } + } + } + } + + // ========================================================================================== + // lag()/lead() return the neighbouring nanos VALUE -- round-trips epochMicros+nanosWithinMicro + // through the window buffer. NTZ + LTZ. Window ordered by an unambiguous Int key so the order is + // independent of ts; the asserted values differ only inside the microsecond. + // ========================================================================================== + test("lag/lead return the neighbouring nanosecond NTZ value down to the sub-microsecond") { + codegenModes.foreach { conf => + withSQLConf(conf: _*) { + Seq(7, 8, 9).foreach { p => + val v1 = LocalDateTime.parse("2020-01-01T00:00:00.000000100") + val v2 = LocalDateTime.parse("2020-01-01T00:00:00.000000500") + val v3 = LocalDateTime.parse("2020-01-01T00:00:00.000000900") + val df = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(1, v1), Row(2, v2), Row(3, v3))), ntzSchema(p)) + val w = Window.orderBy($"id") + val res = df.select( + $"id", lag($"ts", 1).over(w).as("prev_ts"), lead($"ts", 1).over(w).as("next_ts")) + checkAnswer(res, Seq( + Row(1, null, v2), // first row: no previous + Row(2, v1, v3), // prev=100ns, next=900ns round-trip exactly + Row(3, v2, null))) // last row: no next + assert(res.schema("prev_ts").dataType === TimestampNTZNanosType(p)) + assert(res.schema("next_ts").dataType === TimestampNTZNanosType(p)) + } + } + } + } + + test("lag/lead return the neighbouring nanosecond LTZ value down to the sub-microsecond") { + codegenModes.foreach { conf => + withSQLConf(conf: _*) { + Seq(7, 8, 9).foreach { p => + val v1 = Instant.parse("2020-01-01T00:00:00.000000100Z") + val v2 = Instant.parse("2020-01-01T00:00:00.000000500Z") + val v3 = Instant.parse("2020-01-01T00:00:00.000000900Z") + val df = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(1, v1), Row(2, v2), Row(3, v3))), ltzSchema(p)) + val w = Window.orderBy($"id") + val res = df.select( + $"id", lag($"ts", 1).over(w).as("prev_ts"), lead($"ts", 1).over(w).as("next_ts")) + checkAnswer(res, Seq(Row(1, null, v2), Row(2, v1, v3), Row(3, v2, null))) + assert(res.schema("prev_ts").dataType === TimestampLTZNanosType(p)) + assert(res.schema("next_ts").dataType === TimestampLTZNanosType(p)) + } + } + } + } + + // ========================================================================================== + // lead() over a window ORDERED BY the nanos column itself -- combines both nanos paths: ordering + // is by the sub-microsecond key AND the returned neighbour is also a nanos value. + // ========================================================================================== + test("lead over a window ordered by the nanosecond NTZ column itself") { + codegenModes.foreach { conf => + withSQLConf(conf: _*) { + Seq(7, 8, 9).foreach { p => + val a = LocalDateTime.parse("2020-01-01T00:00:00.000000100") + val b = LocalDateTime.parse("2020-01-01T00:00:00.000000200") + val c = LocalDateTime.parse("2020-01-01T00:00:00.000000300") + val df = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(30, c), Row(10, a), Row(20, b))), ntzSchema(p)) + val w = Window.orderBy($"ts") + checkAnswer( + df.select($"id", lead($"ts", 1).over(w).as("next_ts")), + Seq(Row(10, b), Row(20, c), Row(30, null))) + } + } + } + } + + // ========================================================================================== + // NULLS ordering inside a window (NULLS FIRST/LAST x ASC/DESC), NTZ + LTZ. + // ========================================================================================== + test("row_number honours NULLS FIRST/LAST over a nanosecond NTZ window") { + codegenModes.foreach { conf => + withSQLConf(conf: _*) { + Seq(7, 8, 9).foreach { p => + val lo = LocalDateTime.parse("2020-01-01T00:00:00.000000100") + val hi = LocalDateTime.parse("2020-01-01T00:00:00.000000900") + val df = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(1, lo), Row(2, hi), Row(3, null))), ntzSchema(p)) + // ASC default => NULLS FIRST: null, 100ns, 900ns. + checkAnswer( + df.select($"id", row_number().over(Window.orderBy($"ts")).as("rn")), + Seq(Row(3, 1), Row(1, 2), Row(2, 3))) + // ASC NULLS LAST: 100ns, 900ns, null. + checkAnswer( + df.select($"id", row_number().over(Window.orderBy($"ts".asc_nulls_last)).as("rn")), + Seq(Row(1, 1), Row(2, 2), Row(3, 3))) + // DESC default => NULLS LAST: 900ns, 100ns, null. + checkAnswer( + df.select($"id", row_number().over(Window.orderBy($"ts".desc)).as("rn")), + Seq(Row(2, 1), Row(1, 2), Row(3, 3))) + // DESC NULLS FIRST: null, 900ns, 100ns. + checkAnswer( + df.select($"id", row_number().over(Window.orderBy($"ts".desc_nulls_first)).as("rn")), + Seq(Row(3, 1), Row(2, 2), Row(1, 3))) + } + } + } + } + + test("row_number honours NULLS FIRST/LAST over a nanosecond LTZ window") { + codegenModes.foreach { conf => + withSQLConf(conf: _*) { + Seq(7, 8, 9).foreach { p => + val lo = Instant.parse("2020-01-01T00:00:00.000000100Z") + val hi = Instant.parse("2020-01-01T00:00:00.000000900Z") + val df = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(1, lo), Row(2, hi), Row(3, null))), ltzSchema(p)) + checkAnswer( + df.select($"id", row_number().over(Window.orderBy($"ts")).as("rn")), + Seq(Row(3, 1), Row(1, 2), Row(2, 3))) + checkAnswer( + df.select($"id", row_number().over(Window.orderBy($"ts".desc)).as("rn")), + Seq(Row(2, 1), Row(1, 2), Row(3, 3))) + } + } + } + } +} + +// Runs the nanosecond timestamp window tests with ANSI mode enabled explicitly. +class TimestampNanosWindowAnsiOnSuite extends TimestampNanosWindowSuiteBase { + override def sparkConf: SparkConf = super.sparkConf.set(SQLConf.ANSI_ENABLED.key, "true") +} + +// Runs the nanosecond timestamp window tests with ANSI mode disabled explicitly. +class TimestampNanosWindowAnsiOffSuite extends TimestampNanosWindowSuiteBase { + override def sparkConf: SparkConf = super.sparkConf.set(SQLConf.ANSI_ENABLED.key, "false") +}