Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -5062,6 +5062,12 @@
},
"sqlState" : "42K0E"
},
"INVALID_TIME_TRAVEL_TIMESTAMP_FORMAT" : {
"message" : [
"The provided timestamp <timestamp> doesn't match the expected syntax <format>."
],
"sqlState" : "22000"
},
"INVALID_TYPED_LITERAL" : {
"message" : [
"The value of the typed literal <valueType> is invalid: <value>."
Expand Down Expand Up @@ -5579,7 +5585,7 @@
},
"MULTIPLE_TIME_TRAVEL_SPEC" : {
"message" : [
"Cannot specify time travel in both the time travel clause and options."
"Cannot specify time travel in more than one of: the '@' suffix in the table name, the time travel clause, and the read options."
],
"sqlState" : "42K0E"
},
Expand Down Expand Up @@ -8469,6 +8475,11 @@
"Time travel on the relation: <relationId>."
]
},
"TIME_TRAVEL_AT_SYNTAX" : {
"message" : [
"Time travel using the '@' suffix in table names. Set <config> to true to enable it."
]
},
"TOO_MANY_TYPE_ARGUMENTS_FOR_UDF_CLASS" : {
"message" : [
"UDF class with <num> type arguments."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,8 @@ OPERATOR_PIPE: '|>';
HAT: '^';
COLON: ':';
DOUBLE_COLON: '::';
AT_SIGN: '@';
AT_VERSION: '@V';
ARROW: '->';
FAT_ARROW : '=>';
HENT_START: '/*+';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ singleTableIdentifier
: tableIdentifier EOF
;

singleTemporalTableIdentifier
: temporalTableIdentifier EOF
;

singleMultipartIdentifier
: multipartIdentifier EOF
;
Expand Down Expand Up @@ -1133,7 +1137,7 @@ relationPrimary
: streamRelationPrimary #streamRelation
| identifierReference changesClause
optionsClause? tableAlias #changelogTableName
| identifierReference temporalClause?
| temporalTableIdentifierReference temporalClause?
optionsClause? sample? watermarkClause? tableAlias #tableName
| LEFT_PAREN query RIGHT_PAREN sample? watermarkClause?
tableAlias #aliasedQuery
Expand Down Expand Up @@ -1239,6 +1243,18 @@ tableIdentifier
: (db=errorCapturingIdentifier DOT)? table=errorCapturingIdentifier
;

temporalTableIdentifier
: id=multipartIdentifier AT_SIGN timestamp=INTEGER_VALUE
| id=multipartIdentifier AT_VERSION version
| id=multipartIdentifier
;

temporalTableIdentifierReference
: identifierReference AT_SIGN timestamp=INTEGER_VALUE
| identifierReference AT_VERSION version
| identifierReference
;

functionIdentifier
: (db=errorCapturingIdentifier DOT)? function=errorCapturingIdentifier
;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -869,6 +869,28 @@ private[sql] object QueryParsingErrors extends DataTypeErrorsBase {
ctx)
}

def invalidAtSyntaxTimestamp(
timestamp: String, format: String, ctx: ParserRuleContext): Throwable = {
new ParseException(
errorClass = "INVALID_TIME_TRAVEL_TIMESTAMP_FORMAT",
messageParameters = Map("timestamp" -> timestamp, "format" -> format),
ctx)
}

def multipleTimeTravelSpec(ctx: ParserRuleContext): Throwable = {
new ParseException(
errorClass = "MULTIPLE_TIME_TRAVEL_SPEC",
messageParameters = Map.empty,
ctx)
}

def timeTravelAtSyntaxDisabled(configKey: String, ctx: ParserRuleContext): Throwable = {
new ParseException(
errorClass = "UNSUPPORTED_FEATURE.TIME_TRAVEL_AT_SYNTAX",
messageParameters = Map("config" -> toSQLConf(configKey)),
ctx)
}

def invalidNameForDropTempFunc(name: Seq[String], ctx: ParserRuleContext): Throwable = {
new ParseException(
errorClass = "INVALID_SQL_SYNTAX.MULTI_PART_NAME",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,16 @@ abstract class AbstractSqlParser extends AbstractParser with ParserInterface {
}
}

/** Creates a TemporalIdentifier for a given SQL string */
override def parseTemporalTableIdentifier(sqlText: String): TemporalIdentifier = {
parse(sqlText) { parser =>
val ctx = parser.singleTemporalTableIdentifier()
withErrorHandling(ctx, Some(sqlText)) {
astBuilder.visitSingleTemporalTableIdentifier(ctx)
}
}
}

/** Creates LogicalPlan for a given SQL string of query. */
override def parseQuery(sqlText: String): LogicalPlan =
parse(sqlText) { parser =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.sql.catalyst.parser

import java.time.{DateTimeException, LocalDateTime}
import java.util.{List, Locale}
import java.util.concurrent.TimeUnit

Expand Down Expand Up @@ -45,7 +46,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, Origin}
import org.apache.spark.sql.catalyst.trees.TreePattern.PARAMETER
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, CollationFactory, DateTimeUtils, EvaluateUnresolvedInlineTable, IntervalUtils}
import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, CollationFactory, DateTimeConstants, DateTimeUtils, EvaluateUnresolvedInlineTable, IntervalUtils}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, convertSpecialTimestamp, convertSpecialTimestampNTZ, fractionalSecondsDigits, getZoneId, stringToDate, stringToTime, stringToTimestamp, stringToTimestampLTZNanos, stringToTimestampNTZNanos, stringToTimestampWithoutTimeZone}
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, ChangelogContext, PathElement, SupportsNamespaces, TableCatalog, TableWritePrivilege}
import org.apache.spark.sql.connector.catalog.ChangelogRange.{TimestampRange, UnboundedRange, VersionRange}
Expand Down Expand Up @@ -708,6 +709,11 @@ class AstBuilder extends DataTypeAstBuilder
visitMultipartIdentifier(ctx.multipartIdentifier)
}

override def visitSingleTemporalTableIdentifier(
ctx: SingleTemporalTableIdentifierContext): TemporalIdentifier = withOrigin(ctx) {
visitTemporalTableIdentifier(ctx.temporalTableIdentifier)
}

override def visitSinglePathElementList(
ctx: SinglePathElementListContext): Seq[PathElement] = withOrigin(ctx) {
ctx.pathElement().asScala.map(visitPathElement).toSeq
Expand Down Expand Up @@ -2634,13 +2640,89 @@ class AstBuilder extends DataTypeAstBuilder
* Create an aliased table reference. This is typically used in FROM clauses.
*/
override def visitTableName(ctx: TableNameContext): LogicalPlan = withOrigin(ctx) {
val relation = createUnresolvedRelation(ctx.identifierReference, Option(ctx.optionsClause))
val table = mayApplyAliasPlan(
ctx.tableAlias, relation.optionalMap(ctx.temporalClause)(withTimeTravel))
val ttCtx = ctx.temporalTableIdentifierReference
val relation = createUnresolvedRelation(ttCtx.identifierReference, Option(ctx.optionsClause))
val withTimeTravelSpec = withTableTimeTravel(relation, ttCtx, ctx.temporalClause)
val table = mayApplyAliasPlan(ctx.tableAlias, withTimeTravelSpec)
val sample = table.optionalMap(ctx.sample)(withSample)
sample.optionalMap(ctx.watermarkClause)(withWatermark)
}

/**
* Applies the table-name '@' time travel suffix and/or the `AS OF` clause to `relation`.
*/
private def withTableTimeTravel(
relation: LogicalPlan,
ttCtx: TemporalTableIdentifierReferenceContext,
clause: TemporalClauseContext): LogicalPlan = {
val (atTimestamp, atVersion) = temporalSpec(ttCtx, ttCtx.timestamp, ttCtx.version)
val hasAtSpec = atTimestamp.isDefined || atVersion.isDefined
if (hasAtSpec && clause != null) {
withOrigin(clause) {
throw QueryParsingErrors.multipleTimeTravelSpec(clause)
}
}
val withAtSpec =
if (hasAtSpec) RelationTimeTravel(relation, atTimestamp, atVersion) else relation
withAtSpec.optionalMap(clause)(withTimeTravel)
}

override def visitTemporalTableIdentifier(
ctx: TemporalTableIdentifierContext): TemporalIdentifier = withOrigin(ctx) {
val (timestamp, version) = temporalSpec(ctx, ctx.timestamp, ctx.version)
TemporalIdentifier(visitMultipartIdentifier(ctx.id), timestamp, version)
}

/**
* Parse the digits of an '@' time travel timestamp (format yyyyMMddHHmmssSSS) to
* microseconds since epoch in the session time zone.
*/
private def parseAtSyntaxTimestamp(text: String, ctx: ParserRuleContext): Long = {
val format = TemporalIdentifier.TimestampFormat
if (text.length != format.length) {
throw QueryParsingErrors.invalidAtSyntaxTimestamp(text, format, ctx)
}
try {
val localDateTime = LocalDateTime.of(
text.substring(0, 4).toInt,
text.substring(4, 6).toInt,
text.substring(6, 8).toInt,
text.substring(8, 10).toInt,
text.substring(10, 12).toInt,
text.substring(12, 14).toInt,
text.substring(14, 17).toInt * DateTimeConstants.NANOS_PER_MILLIS.toInt)
DateTimeUtils.instantToMicros(
localDateTime.atZone(getZoneId(conf.sessionLocalTimeZone)).toInstant)
} catch {
case _: DateTimeException =>
throw QueryParsingErrors.invalidAtSyntaxTimestamp(text, format, ctx)
}
}

/**
* Extract the optional '@' time travel suffix of a table identifier: '@<timestamp>'
* (format yyyyMMddHHmmssSSS) or '@v<version>'.
*/
private def temporalSpec(
ctx: ParserRuleContext,
timestampToken: Token,
versionCtx: VersionContext): (Option[Expression], Option[String]) = {
if (timestampToken == null && versionCtx == null) {
(None, None)
} else {
if (!conf.getConf(SQLConf.TIME_TRAVEL_AT_SYNTAX_ENABLED)) {
throw QueryParsingErrors.timeTravelAtSyntaxDisabled(
SQLConf.TIME_TRAVEL_AT_SYNTAX_ENABLED.key, ctx)
}
if (timestampToken != null) {
val micros = parseAtSyntaxTimestamp(timestampToken.getText, ctx)
(Some(Literal(micros, TimestampType)), None)
} else {
(None, visitVersion(versionCtx))
}
}
}

override def visitVersion(ctx: VersionContext): Option[String] = {
if (ctx != null) {
if (ctx.INTEGER_VALUE() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,16 @@ trait ParserInterface extends DataTypeParserInterface {
@throws[ParseException]("Text cannot be parsed to a multi-part identifier")
def parseMultipartIdentifier(sqlText: String): Seq[String]

/**
* Parse a string to a [[TemporalIdentifier]].
*/
@throws[ParseException]("Text cannot be parsed to a temporal table identifier")
def parseTemporalTableIdentifier(sqlText: String): TemporalIdentifier = {
// Default implementation does not recognize the time travel suffix. Concrete
// implementations can override this to support it.
TemporalIdentifier(parseMultipartIdentifier(sqlText), None, None)
}

/**
* Parse a query string to a [[LogicalPlan]].
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.parser

import org.apache.spark.sql.catalyst.analysis.RelationTimeTravel
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

/**
* Result of parsing a table name that may carry an '@' time travel suffix: `name@v<version>`
* or `name@<yyyyMMddHHmmssSSS>`. At most one of `timestamp` and `version` is set.
*/
case class TemporalIdentifier(
nameParts: Seq[String],
timestamp: Option[Expression],
version: Option[String]) {

def isTemporal: Boolean = timestamp.isDefined || version.isDefined

/** Wraps `plan` in a [[RelationTimeTravel]] if a time travel suffix was specified. */
def wrapTimeTravel(plan: LogicalPlan): LogicalPlan = {
if (isTemporal) RelationTimeTravel(plan, timestamp, version) else plan
}
}

object TemporalIdentifier {
/** The fixed timestamp format accepted in the suffix. */
val TimestampFormat: String = "yyyyMMddHHmmssSSS"
}
Original file line number Diff line number Diff line change
Expand Up @@ -6994,6 +6994,17 @@ object SQLConf {
.stringConf
.createWithDefault("versionAsOf")

val TIME_TRAVEL_AT_SYNTAX_ENABLED =
buildConf("spark.sql.timeTravel.atSyntax.enabled")
.doc("When true, a table name in a query or in table-reading APIs can carry a time " +
"travel suffix: 'name@v123' reads version 123 of the table, and " +
"'name@20240101000000000' (format yyyyMMddHHmmssSSS, interpreted in the session " +
"time zone) reads the table as of that timestamp. When false, '@' in table names " +
"fails at parse time.")
.version("5.0.0")
.booleanConf
.createWithDefault(true)

val OPERATOR_PIPE_SYNTAX_ENABLED =
buildConf("spark.sql.operatorPipeSyntaxEnabled")
.doc("If true, enable operator pipe syntax for Apache Spark SQL. This uses the operator " +
Expand Down
Loading