diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/arrow/ArrowVectorReaderSuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/arrow/ArrowVectorReaderSuite.scala
new file mode 100644
index 0000000000000..9e3c06de9a3d0
--- /dev/null
+++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/arrow/ArrowVectorReaderSuite.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client.arrow
+
+import org.apache.arrow.memory.RootAllocator
+import org.apache.arrow.vector.TimeStampMicroTZVector
+
+import org.apache.spark.sql.connect.test.ConnectFunSuite
+import org.apache.spark.sql.types.{TimestampLTZNanosType, TimestampNTZNanosType, TimestampType}
+import org.apache.spark.sql.util.ArrowUtils
+
+/**
+ * Tests for the target data type checks performed when creating an [[ArrowVectorReader]].
+ */
+class ArrowVectorReaderSuite extends ConnectFunSuite {
+
+  private val allocator = new RootAllocator()
+
+  override def afterAll(): Unit = {
+    // Always run the parent teardown, even if closing the allocator throws, so the parent's
+    // cleanup is never skipped.
+    try {
+      allocator.close()
+    } finally {
+      super.afterAll()
+    }
+  }
+
+  // Build a TimeStampMicroTZVector (the Arrow encoding for TimestampType) backed by a live
+  // allocator. This is the vector a Connect server would send for any LTZ timestamp column.
+  private def microTZVector(): TimeStampMicroTZVector = {
+    val field = ArrowUtils.toArrowField("ts", TimestampType, nullable = true, "UTC")
+    field.createVector(allocator).asInstanceOf[TimeStampMicroTZVector]
+  }
+
+  // Lend a fresh vector to `body` and close it afterwards, even when `body` throws, so that
+  // no test leaves the vector open on the shared allocator.
+  private def withMicroTZVector[T](body: TimeStampMicroTZVector => T): T = {
+    val vector = microTZVector()
+    try {
+      body(vector)
+    } finally {
+      vector.close()
+    }
+  }
+
+  // TODO: replace the SPARK-XXXXX placeholder in the test names with the real JIRA id.
+  test("SPARK-XXXXX: ArrowVectorReader rejects TimestampLTZNanosType with a clear error") {
+    withMicroTZVector { vector =>
+      val ex = intercept[RuntimeException] {
+        ArrowVectorReader(TimestampLTZNanosType(9), vector, "UTC")
+      }
+      assert(ex.getMessage.contains("not yet supported"),
+        s"Expected 'not yet supported' in error message, got: ${ex.getMessage}")
+    }
+  }
+
+  test("SPARK-XXXXX: ArrowVectorReader rejects TimestampNTZNanosType with a clear error") {
+    withMicroTZVector { vector =>
+      val ex = intercept[RuntimeException] {
+        ArrowVectorReader(TimestampNTZNanosType(7), vector, "UTC")
+      }
+      assert(ex.getMessage.contains("not yet supported"),
+        s"Expected 'not yet supported' in error message, got: ${ex.getMessage}")
+    }
+  }
+
+  test("SPARK-XXXXX: ArrowVectorReader still succeeds for plain TimestampType") {
+    withMicroTZVector { vector =>
+      // Constructing the reader without an exception is the behavior under test.
+      val reader = ArrowVectorReader(TimestampType, vector, "UTC")
+      assert(reader != null)
+    }
+  }
+}
diff --git a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowVectorReader.scala b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowVectorReader.scala
index 54311cecc1627..7a622d3010e9f 100644
--- a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowVectorReader.scala
+++ b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowVectorReader.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_SECOND
 import org.apache.spark.sql.catalyst.util.IntervalStringStyles.ANSI_STYLE
 import org.apache.spark.sql.catalyst.util.SparkDateTimeUtils._
 import org.apache.spark.sql.connect.common.types.ops.ConnectTypeOps
-import org.apache.spark.sql.types.{DataType, DayTimeIntervalType, Decimal, UpCastRule, YearMonthIntervalType}
+import org.apache.spark.sql.types.{AnyTimestampNanoType, DataType, DayTimeIntervalType, Decimal, UpCastRule, YearMonthIntervalType}
 import org.apache.spark.sql.util.ArrowUtils
 import org.apache.spark.util.SparkStringUtils
 
@@ -84,6 +84,15 @@ object ArrowVectorReader {
       throw new RuntimeException(
         s"Reading '$targetDataType' values from a ${vector.getClass} instance is not supported.")
     }
+    // Nanosecond-precision timestamp types (TIMESTAMP_LTZ(p) / TIMESTAMP_NTZ(p), p in [7,9]) are
+    // not yet supported over Spark Connect: there is no Arrow vector type for sub-microsecond
+    // timestamps and no reader implementation here. UpCastRule.canUpCast now returns true for the
+    // micro -> nanos widening direction (SPARK-57303), so the generic guard above no longer
+    // catches this case. Fail fast with a clear message until Connect nanos support is added.
+    if (targetDataType.isInstanceOf[AnyTimestampNanoType]) {
+      throw new RuntimeException(
+        s"Reading '$targetDataType' values over Spark Connect is not yet supported.")
+    }
     vector match {
       case v: BitVector => new BitVectorReader(v)
       case v: TinyIntVector => new TinyIntVectorReader(v)