diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 3d1ca002719bc..49c213bcc38df 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -5062,6 +5062,12 @@ }, "sqlState" : "42K0E" }, + "INVALID_TIME_TRAVEL_TIMESTAMP_FORMAT" : { + "message" : [ + "The provided timestamp doesn't match the expected syntax ." + ], + "sqlState" : "22000" + }, "INVALID_TYPED_LITERAL" : { "message" : [ "The value of the typed literal is invalid: ." @@ -5579,7 +5585,7 @@ }, "MULTIPLE_TIME_TRAVEL_SPEC" : { "message" : [ - "Cannot specify time travel in both the time travel clause and options." + "Cannot specify time travel in more than one of: the '@' suffix in the table name, the time travel clause, and the read options." ], "sqlState" : "42K0E" }, @@ -8469,6 +8475,11 @@ "Time travel on the relation: ." ] }, + "TIME_TRAVEL_AT_SYNTAX" : { + "message" : [ + "Time travel using the '@' suffix in table names. Set to true to enable it." + ] + }, "TOO_MANY_TYPE_ARGUMENTS_FOR_UDF_CLASS" : { "message" : [ "UDF class with type arguments." diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 index bf759e501db9a..d5ad34d5821fe 100644 --- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 +++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 @@ -590,6 +590,8 @@ OPERATOR_PIPE: '|>'; HAT: '^'; COLON: ':'; DOUBLE_COLON: '::'; +AT_SIGN: '@'; +AT_VERSION: '@V'; ARROW: '->'; FAT_ARROW : '=>'; HENT_START: '/*+'; diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 index bb44041f67dd5..635deeeab1471 100644 --- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 +++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 @@ -201,6 +201,10 @@ singleTableIdentifier : tableIdentifier EOF ; +singleTemporalTableIdentifier + : temporalTableIdentifier EOF + ; + singleMultipartIdentifier : multipartIdentifier EOF ; @@ -1133,7 +1137,7 @@ relationPrimary : streamRelationPrimary #streamRelation | identifierReference changesClause optionsClause? tableAlias #changelogTableName - | identifierReference temporalClause? + | temporalTableIdentifierReference temporalClause? optionsClause? sample? watermarkClause? tableAlias #tableName | LEFT_PAREN query RIGHT_PAREN sample? watermarkClause? tableAlias #aliasedQuery @@ -1239,6 +1243,18 @@ tableIdentifier : (db=errorCapturingIdentifier DOT)? table=errorCapturingIdentifier ; +temporalTableIdentifier + : id=multipartIdentifier AT_SIGN timestamp=INTEGER_VALUE + | id=multipartIdentifier AT_VERSION version + | id=multipartIdentifier + ; + +temporalTableIdentifierReference + : identifierReference AT_SIGN timestamp=INTEGER_VALUE + | identifierReference AT_VERSION version + | identifierReference + ; + functionIdentifier : (db=errorCapturingIdentifier DOT)? function=errorCapturingIdentifier ; diff --git a/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala b/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala index 1cc050f488f64..6ecc836dd3dc7 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala @@ -869,6 +869,28 @@ private[sql] object QueryParsingErrors extends DataTypeErrorsBase { ctx) } + def invalidAtSyntaxTimestamp( + timestamp: String, format: String, ctx: ParserRuleContext): Throwable = { + new ParseException( + errorClass = "INVALID_TIME_TRAVEL_TIMESTAMP_FORMAT", + messageParameters = Map("timestamp" -> timestamp, "format" -> format), + ctx) + } + + def multipleTimeTravelSpec(ctx: ParserRuleContext): Throwable = { + new ParseException( + errorClass = "MULTIPLE_TIME_TRAVEL_SPEC", + messageParameters = Map.empty, + ctx) + } + + def timeTravelAtSyntaxDisabled(configKey: String, ctx: ParserRuleContext): Throwable = { + new ParseException( + errorClass = "UNSUPPORTED_FEATURE.TIME_TRAVEL_AT_SYNTAX", + messageParameters = Map("config" -> toSQLConf(configKey)), + ctx) + } + def invalidNameForDropTempFunc(name: Seq[String], ctx: ParserRuleContext): Throwable = { new ParseException( errorClass = "INVALID_SQL_SYNTAX.MULTI_PART_NAME", diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AbstractSqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AbstractSqlParser.scala index 29bf924f244e8..e6d53ecc1e258 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AbstractSqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AbstractSqlParser.scala @@ -72,6 +72,16 @@ abstract class AbstractSqlParser extends AbstractParser with ParserInterface { } } + /** Creates a TemporalIdentifier for a given SQL string */ + override def parseTemporalTableIdentifier(sqlText: String): TemporalIdentifier = { + parse(sqlText) { parser => + val ctx = parser.singleTemporalTableIdentifier() + withErrorHandling(ctx, Some(sqlText)) { + astBuilder.visitSingleTemporalTableIdentifier(ctx) + } + } + } + /** Creates LogicalPlan for a given SQL string of query. */ override def parseQuery(sqlText: String): LogicalPlan = parse(sqlText) { parser => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 23bdeaa165847..605a9aacfd481 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.parser +import java.time.{DateTimeException, LocalDateTime} import java.util.{List, Locale} import java.util.concurrent.TimeUnit @@ -45,7 +46,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, Origin} import org.apache.spark.sql.catalyst.trees.TreePattern.PARAMETER import org.apache.spark.sql.catalyst.types.DataTypeUtils -import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, CollationFactory, DateTimeUtils, EvaluateUnresolvedInlineTable, IntervalUtils} +import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, CollationFactory, DateTimeConstants, DateTimeUtils, EvaluateUnresolvedInlineTable, IntervalUtils} import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, convertSpecialTimestamp, convertSpecialTimestampNTZ, fractionalSecondsDigits, getZoneId, stringToDate, stringToTime, stringToTimestamp, stringToTimestampLTZNanos, stringToTimestampNTZNanos, stringToTimestampWithoutTimeZone} import org.apache.spark.sql.connector.catalog.{CatalogV2Util, ChangelogContext, PathElement, SupportsNamespaces, TableCatalog, TableWritePrivilege} import org.apache.spark.sql.connector.catalog.ChangelogRange.{TimestampRange, UnboundedRange, VersionRange} @@ -708,6 +709,11 @@ class AstBuilder extends DataTypeAstBuilder visitMultipartIdentifier(ctx.multipartIdentifier) } + override def visitSingleTemporalTableIdentifier( + ctx: SingleTemporalTableIdentifierContext): TemporalIdentifier = withOrigin(ctx) { + visitTemporalTableIdentifier(ctx.temporalTableIdentifier) + } + override def visitSinglePathElementList( ctx: SinglePathElementListContext): Seq[PathElement] = withOrigin(ctx) { ctx.pathElement().asScala.map(visitPathElement).toSeq @@ -2634,13 +2640,89 @@ class AstBuilder extends DataTypeAstBuilder * Create an aliased table reference. This is typically used in FROM clauses. */ override def visitTableName(ctx: TableNameContext): LogicalPlan = withOrigin(ctx) { - val relation = createUnresolvedRelation(ctx.identifierReference, Option(ctx.optionsClause)) - val table = mayApplyAliasPlan( - ctx.tableAlias, relation.optionalMap(ctx.temporalClause)(withTimeTravel)) + val ttCtx = ctx.temporalTableIdentifierReference + val relation = createUnresolvedRelation(ttCtx.identifierReference, Option(ctx.optionsClause)) + val withTimeTravelSpec = withTableTimeTravel(relation, ttCtx, ctx.temporalClause) + val table = mayApplyAliasPlan(ctx.tableAlias, withTimeTravelSpec) val sample = table.optionalMap(ctx.sample)(withSample) sample.optionalMap(ctx.watermarkClause)(withWatermark) } + /** + * Applies the table-name '@' time travel suffix and/or the `AS OF` clause to `relation`. + */ + private def withTableTimeTravel( + relation: LogicalPlan, + ttCtx: TemporalTableIdentifierReferenceContext, + clause: TemporalClauseContext): LogicalPlan = { + val (atTimestamp, atVersion) = temporalSpec(ttCtx, ttCtx.timestamp, ttCtx.version) + val hasAtSpec = atTimestamp.isDefined || atVersion.isDefined + if (hasAtSpec && clause != null) { + withOrigin(clause) { + throw QueryParsingErrors.multipleTimeTravelSpec(clause) + } + } + val withAtSpec = + if (hasAtSpec) RelationTimeTravel(relation, atTimestamp, atVersion) else relation + withAtSpec.optionalMap(clause)(withTimeTravel) + } + + override def visitTemporalTableIdentifier( + ctx: TemporalTableIdentifierContext): TemporalIdentifier = withOrigin(ctx) { + val (timestamp, version) = temporalSpec(ctx, ctx.timestamp, ctx.version) + TemporalIdentifier(visitMultipartIdentifier(ctx.id), timestamp, version) + } + + /** + * Parse the digits of an '@' time travel timestamp (format yyyyMMddHHmmssSSS) to + * microseconds since epoch in the session time zone. + */ + private def parseAtSyntaxTimestamp(text: String, ctx: ParserRuleContext): Long = { + val format = TemporalIdentifier.TimestampFormat + if (text.length != format.length) { + throw QueryParsingErrors.invalidAtSyntaxTimestamp(text, format, ctx) + } + try { + val localDateTime = LocalDateTime.of( + text.substring(0, 4).toInt, + text.substring(4, 6).toInt, + text.substring(6, 8).toInt, + text.substring(8, 10).toInt, + text.substring(10, 12).toInt, + text.substring(12, 14).toInt, + text.substring(14, 17).toInt * DateTimeConstants.NANOS_PER_MILLIS.toInt) + DateTimeUtils.instantToMicros( + localDateTime.atZone(getZoneId(conf.sessionLocalTimeZone)).toInstant) + } catch { + case _: DateTimeException => + throw QueryParsingErrors.invalidAtSyntaxTimestamp(text, format, ctx) + } + } + + /** + * Extract the optional '@' time travel suffix of a table identifier: '@' + * (format yyyyMMddHHmmssSSS) or '@v'. + */ + private def temporalSpec( + ctx: ParserRuleContext, + timestampToken: Token, + versionCtx: VersionContext): (Option[Expression], Option[String]) = { + if (timestampToken == null && versionCtx == null) { + (None, None) + } else { + if (!conf.getConf(SQLConf.TIME_TRAVEL_AT_SYNTAX_ENABLED)) { + throw QueryParsingErrors.timeTravelAtSyntaxDisabled( + SQLConf.TIME_TRAVEL_AT_SYNTAX_ENABLED.key, ctx) + } + if (timestampToken != null) { + val micros = parseAtSyntaxTimestamp(timestampToken.getText, ctx) + (Some(Literal(micros, TimestampType)), None) + } else { + (None, visitVersion(versionCtx)) + } + } + } + override def visitVersion(ctx: VersionContext): Option[String] = { if (ctx != null) { if (ctx.INTEGER_VALUE() != null) { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserInterface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserInterface.scala index bcbc7039f53ce..11664c5091667 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserInterface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserInterface.scala @@ -69,6 +69,16 @@ trait ParserInterface extends DataTypeParserInterface { @throws[ParseException]("Text cannot be parsed to a multi-part identifier") def parseMultipartIdentifier(sqlText: String): Seq[String] + /** + * Parse a string to a [[TemporalIdentifier]]. + */ + @throws[ParseException]("Text cannot be parsed to a temporal table identifier") + def parseTemporalTableIdentifier(sqlText: String): TemporalIdentifier = { + // Default implementation does not recognize the time travel suffix. Concrete + // implementations can override this to support it. + TemporalIdentifier(parseMultipartIdentifier(sqlText), None, None) + } + /** * Parse a query string to a [[LogicalPlan]]. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/TemporalIdentifier.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/TemporalIdentifier.scala new file mode 100644 index 0000000000000..44bd7020155a1 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/TemporalIdentifier.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.parser + +import org.apache.spark.sql.catalyst.analysis.RelationTimeTravel +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan + +/** + * Result of parsing a table name that may carry an '@' time travel suffix: `name@v` + * or `name@`. At most one of `timestamp` and `version` is set. + */ +case class TemporalIdentifier( + nameParts: Seq[String], + timestamp: Option[Expression], + version: Option[String]) { + + def isTemporal: Boolean = timestamp.isDefined || version.isDefined + + /** Wraps `plan` in a [[RelationTimeTravel]] if a time travel suffix was specified. */ + def wrapTimeTravel(plan: LogicalPlan): LogicalPlan = { + if (isTemporal) RelationTimeTravel(plan, timestamp, version) else plan + } +} + +object TemporalIdentifier { + /** The fixed timestamp format accepted in the suffix. */ + val TimestampFormat: String = "yyyyMMddHHmmssSSS" +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 971eddf7bef90..ce77408ffb3c4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -6994,6 +6994,17 @@ object SQLConf { .stringConf .createWithDefault("versionAsOf") + val TIME_TRAVEL_AT_SYNTAX_ENABLED = + buildConf("spark.sql.timeTravel.atSyntax.enabled") + .doc("When true, a table name in a query or in table-reading APIs can carry a time " + + "travel suffix: 'name@v123' reads version 123 of the table, and " + + "'name@20240101000000000' (format yyyyMMddHHmmssSSS, interpreted in the session " + + "time zone) reads the table as of that timestamp. When false, '@' in table names " + + "fails at parse time.") + .version("5.0.0") + .booleanConf + .createWithDefault(true) + val OPERATOR_PIPE_SYNTAX_ENABLED = buildConf("spark.sql.operatorPipeSyntaxEnabled") .doc("If true, enable operator pipe syntax for Apache Spark SQL. This uses the operator " + diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala index da82df6f612b7..556f67f6217d2 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala @@ -26,11 +26,12 @@ import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, RelationChanges, Re import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.util.{EvaluateUnresolvedInlineTable, IntervalUtils} +import org.apache.spark.sql.catalyst.util.{DateTimeUtils, EvaluateUnresolvedInlineTable, IntervalUtils} import org.apache.spark.sql.connector.catalog.{ChangelogContext, ChangelogRange} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{Decimal, DecimalType, IntegerType, LongType, StringType} +import org.apache.spark.sql.types.{Decimal, DecimalType, IntegerType, LongType, StringType, TimestampType} import org.apache.spark.sql.util.CaseInsensitiveStringMap +import org.apache.spark.unsafe.types.UTF8String /** * Parser test cases for rules defined in [[CatalystSqlParser]] / [[AstBuilder]]. @@ -2193,6 +2194,83 @@ class PlanParserSuite extends AnalysisTest { stop = 38)) } + test("at syntax time travel") { + def versionPlan(version: String): LogicalPlan = { + Project(Seq(UnresolvedStar(None)), + RelationTimeTravel(UnresolvedRelation(Seq("a", "b", "c")), None, Some(version))) + } + assertEqual("SELECT * FROM a.b.c@v123456789", versionPlan("123456789")) + assertEqual("SELECT * FROM a.b.c@V123456789", versionPlan("123456789")) + assertEqual("SELECT * FROM a.b.c @v123456789", versionPlan("123456789")) + assertEqual("SELECT * FROM a.b.c@v'Snapshot123456789'", versionPlan("Snapshot123456789")) + + val micros = DateTimeUtils.stringToTimestampAnsi( + UTF8String.fromString("2019-01-29 00:37:58"), + DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone)) + assertEqual("SELECT * FROM a.b.c@20190129003758000", + Project(Seq(UnresolvedStar(None)), + RelationTimeTravel( + UnresolvedRelation(Seq("a", "b", "c")), + Some(Literal(micros, TimestampType)), + None))) + + assertEqual("SELECT * FROM `t@v1`", + Project(Seq(UnresolvedStar(None)), UnresolvedRelation(Seq("t@v1")))) + + // A non-time-travel '@' suffix is always a parse error. + Seq("SELECT * FROM a@foo", "SELECT * FROM a@", "SELECT * FROM a@v").foreach { q => + assert(intercept[ParseException](parsePlan(q)).getCondition == "PARSE_SYNTAX_ERROR", + s"expected PARSE_SYNTAX_ERROR for: $q") + } + + checkError( + exception = parseException("SELECT * FROM t@v1 VERSION AS OF 2"), + condition = "MULTIPLE_TIME_TRAVEL_SPEC", + parameters = Map.empty, + context = ExpectedContext(fragment = "VERSION AS OF 2", start = 19, stop = 33)) + + checkError( + exception = parseException("SELECT * FROM t@123"), + condition = "INVALID_TIME_TRAVEL_TIMESTAMP_FORMAT", + parameters = Map("timestamp" -> "123", "format" -> "yyyyMMddHHmmssSSS"), + context = ExpectedContext(fragment = "t@123", start = 14, stop = 18)) + + checkError( + exception = parseException("SELECT * FROM t@20191301000000000"), + condition = "INVALID_TIME_TRAVEL_TIMESTAMP_FORMAT", + parameters = Map("timestamp" -> "20191301000000000", "format" -> "yyyyMMddHHmmssSSS"), + context = ExpectedContext(fragment = "t@20191301000000000", start = 14, stop = 32)) + + assert(intercept[ParseException] { + parsePlan("INSERT INTO t@v1 VALUES (1)") + }.getCondition == "PARSE_SYNTAX_ERROR") + + withSQLConf(SQLConf.TIME_TRAVEL_AT_SYNTAX_ENABLED.key -> "false") { + checkError( + exception = parseException("SELECT * FROM t@v1"), + condition = "UNSUPPORTED_FEATURE.TIME_TRAVEL_AT_SYNTAX", + parameters = Map("config" -> "\"spark.sql.timeTravel.atSyntax.enabled\""), + context = ExpectedContext(fragment = "t@v1", start = 14, stop = 17)) + } + } + + test("parseTemporalTableIdentifier") { + assert(parseTemporalTableIdentifier("a.b") === + TemporalIdentifier(Seq("a", "b"), None, None)) + assert(parseTemporalTableIdentifier("a.b@v5") === + TemporalIdentifier(Seq("a", "b"), None, Some("5"))) + assert(parseTemporalTableIdentifier("`t@v1`") === + TemporalIdentifier(Seq("t@v1"), None, None)) + val micros = DateTimeUtils.stringToTimestampAnsi( + UTF8String.fromString("2019-01-29 00:37:58"), + DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone)) + assert(parseTemporalTableIdentifier("t@20190129003758000") === + TemporalIdentifier(Seq("t"), Some(Literal(micros, TimestampType)), None)) + Seq("a.b@x", "a@foo", "a@", "a@v").foreach { s => + intercept[ParseException](parseTemporalTableIdentifier(s)) + } + } + test("CHANGES clause - version range") { def changesFromVersion( startVersion: String, diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index b067efa0579a9..57692a13001f3 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -1671,10 +1671,17 @@ class SparkConnectPlanner( rel.getReadTypeCase match { case proto.Read.ReadTypeCase.NAMED_TABLE => - UnresolvedRelation( - parser.parseMultipartIdentifier(rel.getNamedTable.getUnparsedIdentifier), + val temporalIdent = + parser.parseTemporalTableIdentifier(rel.getNamedTable.getUnparsedIdentifier) + if (temporalIdent.isTemporal && rel.getIsStreaming) { + throw QueryCompilationErrors.timeTravelUnsupportedError( + QueryCompilationErrors.toSQLId(temporalIdent.nameParts)) + } + val relation = UnresolvedRelation( + temporalIdent.nameParts, new CaseInsensitiveStringMap(rel.getNamedTable.getOptionsMap), isStreaming = rel.getIsStreaming) + temporalIdent.wrapTimeTravel(relation) case proto.Read.ReadTypeCase.DATA_SOURCE if !rel.getIsStreaming => val reader = session.read diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala index 89ccaea93a04e..22f7cabc8c18b 100644 --- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala @@ -26,7 +26,7 @@ import org.apache.spark.connect.proto import org.apache.spark.connect.proto.Expression.{Alias, ExpressionString, UnresolvedStar} import org.apache.spark.sql.{AnalysisException, Row} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedFunction, UnresolvedRelation} +import org.apache.spark.sql.catalyst.analysis.{RelationTimeTravel, UnresolvedAlias, UnresolvedFunction, UnresolvedRelation} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, UnsafeProjection} import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan} @@ -159,6 +159,46 @@ class SparkConnectPlannerSuite extends SparkFunSuite with SparkConnectPlanTest { assert(res.nodeName == "UnresolvedRelation") } + test("Read with at syntax time travel") { + val read = proto.Read + .newBuilder() + .setNamedTable(proto.Read.NamedTable.newBuilder.setUnparsedIdentifier("name@v1").build()) + .build() + val res = transform(proto.Relation.newBuilder.setRead(read).build()) + res match { + case RelationTimeTravel(relation: UnresolvedRelation, timestamp, version) => + assert(relation.multipartIdentifier === Seq("name")) + assert(timestamp.isEmpty) + assert(version === Some("1")) + case other => fail(s"Expected RelationTimeTravel but got: $other") + } + + // Streaming reads do not support time travel. + val streamingRead = read.toBuilder.setIsStreaming(true).build() + val e = intercept[AnalysisException] { + transform(proto.Relation.newBuilder.setRead(streamingRead).build()) + } + assert(e.getCondition === "UNSUPPORTED_FEATURE.TIME_TRAVEL") + + // A non-time-travel '@' suffix is a parse error. + val badRead = read.toBuilder + .setNamedTable(proto.Read.NamedTable.newBuilder.setUnparsedIdentifier("name@foo").build()) + .build() + val pe = intercept[AnalysisException] { + transform(proto.Relation.newBuilder.setRead(badRead).build()) + } + assert(pe.getCondition === "PARSE_SYNTAX_ERROR") + + // A backticked '@' name stays a literal table name. + val quotedRead = read.toBuilder + .setNamedTable(proto.Read.NamedTable.newBuilder.setUnparsedIdentifier("`name@v1`").build()) + .build() + transform(proto.Relation.newBuilder.setRead(quotedRead).build()) match { + case u: UnresolvedRelation => assert(u.multipartIdentifier === Seq("name@v1")) + case other => fail(s"Expected a literal UnresolvedRelation but got: $other") + } + } + test("Simple Table with options") { val read = proto.Read.newBuilder().build() // Invalid read without Table name. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameReader.scala index 3dbdf05305164..2c1255a302ad9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameReader.scala @@ -315,10 +315,11 @@ class DataFrameReader private[sql](sparkSession: SparkSession) /** @inheritdoc */ def table(tableName: String): DataFrame = { assertNoSpecifiedSchema("table") - val multipartIdentifier = - sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName) - Dataset.ofRows(sparkSession, UnresolvedRelation(multipartIdentifier, - new CaseInsensitiveStringMap(extraOptions.toMap.asJava))) + val temporalIdent = + sparkSession.sessionState.sqlParser.parseTemporalTableIdentifier(tableName) + val relation = UnresolvedRelation(temporalIdent.nameParts, + new CaseInsensitiveStringMap(extraOptions.toMap.asJava)) + Dataset.ofRows(sparkSession, temporalIdent.wrapTimeTravel(relation)) } /** @inheritdoc */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamReader.scala index eb3120cac05aa..b359554e527c3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamReader.scala @@ -102,9 +102,14 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) /** @inheritdoc */ def table(tableName: String): DataFrame = { require(tableName != null, "The table name can't be null") - val identifier = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName) + val temporalIdent = + sparkSession.sessionState.sqlParser.parseTemporalTableIdentifier(tableName) + if (temporalIdent.isTemporal) { + throw QueryCompilationErrors.timeTravelUnsupportedError( + QueryCompilationErrors.toSQLId(temporalIdent.nameParts)) + } val unresolved = UnresolvedRelation( - identifier, + temporalIdent.nameParts, new CaseInsensitiveStringMap(extraOptions.toMap.asJava), isStreaming = true) val plan = NamedStreamingRelation.withUserProvidedName(unresolved, userProvidedSourceName) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 752c8ed6c22e0..0f908707970ec 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -3865,6 +3865,84 @@ class DataSourceV2SQLSuiteV1Filter } } + test("time travel with at syntax") { + sql("use testcat") + val t1 = "testcat.tSnapshot123456789" + val t2 = "testcat.t2345678910" + withTable(t1, t2) { + sql(s"CREATE TABLE $t1 (id int) USING foo") + sql(s"CREATE TABLE $t2 (id int) USING foo") + sql(s"INSERT INTO $t1 VALUES (1), (2)") + sql(s"INSERT INTO $t2 VALUES (3), (4)") + + checkAnswer(sql("SELECT * FROM t@v2345678910"), Seq(Row(3), Row(4))) + checkAnswer(sql("SELECT * FROM t@V2345678910"), Seq(Row(3), Row(4))) + checkAnswer(sql("SELECT * FROM t@v'Snapshot123456789'"), Seq(Row(1), Row(2))) + checkAnswer(spark.read.table("t@v2345678910"), Seq(Row(3), Row(4))) + checkAnswer(spark.table("t@v2345678910"), Seq(Row(3), Row(4))) + + checkError( + exception = intercept[ParseException] { + sql("SELECT * FROM t@v2345678910 VERSION AS OF 1") + }, + condition = "MULTIPLE_TIME_TRAVEL_SPEC", + parameters = Map.empty, + context = ExpectedContext(fragment = "VERSION AS OF 1", start = 28, stop = 42)) + checkError( + exception = intercept[AnalysisException] { + spark.read.option("versionAsOf", "2345678910").table("t@v2345678910").collect() + }, + condition = "MULTIPLE_TIME_TRAVEL_SPEC", + parameters = Map.empty) + + withSQLConf(SQLConf.TIME_TRAVEL_AT_SYNTAX_ENABLED.key -> "false") { + checkError( + exception = intercept[ParseException] { + sql("SELECT * FROM t@v2345678910") + }, + condition = "UNSUPPORTED_FEATURE.TIME_TRAVEL_AT_SYNTAX", + parameters = Map("config" -> "\"spark.sql.timeTravel.atSyntax.enabled\""), + context = ExpectedContext(fragment = "t@v2345678910", start = 14, stop = 26)) + intercept[ParseException](spark.read.table("t@v2345678910")) + } + } + + val ts1 = DateTimeUtils.stringToTimestampAnsi( + UTF8String.fromString("2019-01-29 00:37:58"), + DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone)) + val t3 = s"testcat.t$ts1" + withTable(t3) { + sql(s"CREATE TABLE $t3 (id int) USING foo") + sql(s"INSERT INTO $t3 VALUES (5), (6)") + + checkAnswer(sql("SELECT * FROM t@20190129003758000"), Seq(Row(5), Row(6))) + checkAnswer(spark.read.table("t@20190129003758000"), Seq(Row(5), Row(6))) + } + + intercept[ParseException](sql("SELECT * FROM t@foo")) + intercept[ParseException](spark.read.table("t@foo")) + withTable("testcat.`weird@v1`") { + sql("CREATE TABLE testcat.`weird@v1` (id int) USING foo") + sql("INSERT INTO testcat.`weird@v1` VALUES (42)") + checkAnswer(sql("SELECT * FROM `weird@v1`"), Row(42)) + checkAnswer(spark.read.table("`weird@v1`"), Row(42)) + } + + withTempView("v") { + spark.range(1).createOrReplaceTempView("v") + checkError( + exception = analysisException("SELECT * FROM v@v1"), + condition = "UNSUPPORTED_FEATURE.TIME_TRAVEL", + sqlState = None, + parameters = Map("relationId" -> "`v`")) + } + checkError( + exception = analysisException("WITH x AS (SELECT 1) SELECT * FROM x@v1"), + condition = "UNSUPPORTED_FEATURE.TIME_TRAVEL", + sqlState = None, + parameters = Map("relationId" -> "`x`")) + } + test("SPARK-37827: put build-in properties into V1Table.properties to adapt v2 command") { val t = "tbl" withTable(t) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala index e2c74533e7f3c..2ba374e3d6742 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala @@ -84,6 +84,15 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter { checkErrorTableNotFound(e, "`non_exist_table`") } + test("read: time travel @-syntax is unsupported for streaming") { + checkError( + exception = intercept[AnalysisException] { + spark.readStream.table("t@v1") + }, + condition = "UNSUPPORTED_FEATURE.TIME_TRAVEL", + parameters = Map("relationId" -> "`t`")) + } + test("read: stream table API with temp view") { val tblName = "my_table" val stream = MemoryStream[Int]