[WIP][SPARK-57421][SQL][CONNECT] Support @-syntax version time travel on table names#56481
[WIP][SPARK-57421][SQL][CONNECT] Support @-syntax version time travel on table names#56481sotikoug83 wants to merge 3 commits into
Conversation
…me travel on table names
| TemporalIdentifier(visitMultipartIdentifier(ctx.id), timestamp, version) | ||
| } | ||
|
|
||
| private val atSyntaxTimestampFormat = "yyyyMMddHHmmssSSS" |
There was a problem hiding this comment.
let's don't define the constant here. Also, try finding if you can reuse some existing constant
There was a problem hiding this comment.
Moved but could not find existing constant to use.
gengliangwang
left a comment
There was a problem hiding this comment.
0 blocking, 6 non-blocking, 0 nits.
Nicely scoped, well-tested Delta-style @ time-travel shorthand. Findings are naming/consistency/polish, not behavior defects.
Design / architecture (4)
- QueryParsingErrors.scala:875: new
INVALID_TIMESTAMP_FORMATdiverges from the time-travel error family — see inline - SQLConf.scala:7006: feature enabled by default (
atSyntax.enabled=true) — confirm intended — see inline - SparkConnectPlanner.scala:1676: classic
DataStreamReader.table()lacks Connect's clean streaming guard — see inline - AstBuilder.scala:2653:
visitTableNamereimplementsTemporalIdentifier.wrapTimeTravelinline — see inline
Suggestions (2)
- SparkConnectPlanner.scala:1678: use
toSQLId(notquoteNameParts) forrelationIdconsistency — see inline - PlanParserSuite.scala:2228: add a malformed-component
@-timestamp test — see inline
Verification
Traced the timestamp path: @<17-digit> builds Literal(micros, TimestampType) at parse time (session TZ), and at resolution TimeTravelSpec.create runs Cast(timestamp -> timestamp), which is identity for instant-based timestamps — so there is no double time-zone conversion and it resolves to the same micros as TIMESTAMP AS OF (test confirms @20190129003758000 == stringToTimestampAnsi("2019-01-29 00:37:58")). INTEGER_VALUE is pure [0-9]+, the 17-char length is checked, and out-of-range components raise the caught DateTimeException, so no uncaught NumberFormatException. @+AS-OF conflict is rejected at parse; @+reader-options is rejected at RelationResolution, so the broadened MULTIPLE_TIME_TRAVEL_SPEC message is accurate.
PR description suggestions
- Document the timestamp-format restriction:
@<timestamp>accepts only the fixed 17-digityyyyMMddHHmmssSSSform (Delta convention), unlikeTIMESTAMP AS OF's arbitrary expression. This is the most consequential design decision and isn't in the description body. - Document that the feature is enabled by default (
spark.sql.timeTravel.atSyntax.enabled=true).
| def invalidAtSyntaxTimestamp( | ||
| timestamp: String, format: String, ctx: ParserRuleContext): Throwable = { | ||
| new ParseException( | ||
| errorClass = "INVALID_TIMESTAMP_FORMAT", |
There was a problem hiding this comment.
INVALID_TIMESTAMP_FORMAT is a generic, top-level error name, but it's only ever used for the @-syntax time-travel timestamp. The codebase already has a time-travel timestamp error family — INVALID_TIME_TRAVEL_TIMESTAMP_EXPR.{INPUT,UNEVALUABLE,NON_DETERMINISTIC,OPTION} and INVALID_TIME_TRAVEL_SPEC. Could this fit there instead, e.g. a new INVALID_TIME_TRAVEL_TIMESTAMP_EXPR.FORMAT sub-condition or a dedicated INVALID_TIME_TRAVEL_TIMESTAMP_FORMAT? Error-condition names are stable API, so worth settling before this ships.
There was a problem hiding this comment.
Good idea, using a dedicated INVALID_TIME_TRAVEL_TIMESTAMP_FORMAT
| "fails at parse time.") | ||
| .version("5.0.0") | ||
| .booleanConf | ||
| .createWithDefault(true) |
There was a problem hiding this comment.
This enables the new @ syntax by default. The backward-compat risk looks low — unquoted @ was previously a parse error, and @-named tables need backticks (keeping @ inside the identifier) — but is enabling a brand-new (and [WIP]) syntax on by default intended here, rather than starting opt-in? Just flagging the default for an explicit decision.
There was a problem hiding this comment.
Thank you for raising this, decided default remains true.
| parser.parseMultipartIdentifier(rel.getNamedTable.getUnparsedIdentifier), | ||
| val temporalIdent = | ||
| parser.parseTemporalTableIdentifier(rel.getNamedTable.getUnparsedIdentifier) | ||
| if (temporalIdent.isTemporal && rel.getIsStreaming) { |
There was a problem hiding this comment.
Good that Connect rejects @ + streaming with a clear UNSUPPORTED_FEATURE.TIME_TRAVEL. The classic DataStreamReader.table() (sql/core/.../classic/DataStreamReader.scala, still on parseMultipartIdentifier) has no equivalent guard, so spark.readStream.table("t@v1") falls through to a generic PARSE_SYNTAX_ERROR. It was already an error pre-PR so this isn't a regression, but for parity consider routing the classic streaming table() through parseTemporalTableIdentifier and raising the same timeTravelUnsupportedError.
| } | ||
| val relation = createUnresolvedRelation(ttCtx.identifierReference, Option(ctx.optionsClause)) | ||
| val withAtSpec = if (hasAtSpec) { | ||
| RelationTimeTravel(relation, atTimestamp, atVersion) |
There was a problem hiding this comment.
visitTableName now reconciles two time-travel sources inline (the @ spec and the AS OF temporalClause): read the @ spec, reject the both-specified case, conditionally wrap in RelationTimeTravel, then still optionalMap(temporalClause)(withTimeTravel). That's a fair bit heavier than the original 4-line method.
Note it can't simply route through TemporalIdentifier.wrapTimeTravel here: this path needs the IdentifierReferenceContext (for IDENTIFIER(...), via createUnresolvedRelation -> withIdentClause), which TemporalIdentifier's Seq[String] nameParts can't carry. So the clean reduction is a plain plan-level helper that owns the conflict-check + source-selection + wrap, leaving visitTableName as its original flat optionalMap chain:
val relation = createUnresolvedRelation(ttCtx.identifierReference, Option(ctx.optionsClause))
val withTT = withTableTimeTravel(relation, ttCtx, ctx.temporalClause)
val table = mayApplyAliasPlan(ctx.tableAlias, withTT)
// ... sample / watermark as before
/** Applies the '@' suffix and/or the AS OF clause, rejecting more than one. */
private def withTableTimeTravel(
relation: LogicalPlan,
ttCtx: TemporalTableIdentifierReferenceContext,
clause: TemporalClauseContext): LogicalPlan = {
val (atTs, atVer) = temporalSpec(ttCtx, ttCtx.timestamp, ttCtx.version)
if ((atTs.isDefined || atVer.isDefined) && clause != null) {
withOrigin(clause)(throw QueryParsingErrors.multipleTimeTravelSpec(clause))
}
val withAt =
if (atTs.isDefined || atVer.isDefined) RelationTimeTravel(relation, atTs, atVer) else relation
withAt.optionalMap(clause)(withTimeTravel)
}This isolates the new behavior in one named, testable helper. The reader/Connect paths have no AS OF clause (so no conflict to check) and keep the simpler wrapTimeTravel.
| parser.parseTemporalTableIdentifier(rel.getNamedTable.getUnparsedIdentifier) | ||
| if (temporalIdent.isTemporal && rel.getIsStreaming) { | ||
| throw QueryCompilationErrors.timeTravelUnsupportedError( | ||
| QuotingUtils.quoteNameParts(temporalIdent.nameParts)) |
There was a problem hiding this comment.
The five other timeTravelUnsupportedError call sites pass toSQLId(...); this one uses QuotingUtils.quoteNameParts. They produce the same string for ordinary names (toSQLId additionally strips the __auto_generated_subquery_name prefix), so for consistency prefer toSQLId(temporalIdent.nameParts) here (may need the corresponding import).
|
|
||
| checkError( | ||
| exception = parseException("SELECT * FROM t@123"), | ||
| condition = "INVALID_TIMESTAMP_FORMAT", |
There was a problem hiding this comment.
This covers the wrong-length timestamp. Consider also adding a malformed-component case (e.g. t@20191301000000000, month 13) — it exercises the DateTimeException -> INVALID_TIMESTAMP_FORMAT catch branch in parseAtSyntaxTimestamp, which the length check never reaches.
There was a problem hiding this comment.
Split timestamp-specific @ shorthand parsing changes to: #56847
|
I would like to check this one too. |
What changes were proposed in this pull request?
Add the
@time travel shorthand on table names so thatand
parse into the same plan as the existing
VERSION AS OF/TIMESTAMP AS OFandoption("versionAsOf")/option("timestampAsOf")clauses. Support for SQL, the DataFrame reader, and Spark Connect.Why are the changes needed?
Spark has SQL
AS OFand theversionAsOf/timestampAsOfreader options but not the compact@suffix. Resolving it at parse time also simplifies the time travel entry points pipeline.Does this PR introduce any user-facing change?
Yes, new syntax on table names gated by a
conf(spark.sql.timeTravel.atSyntax.enabled) which allows time travel, default true. No longer gives a parse error when enabled and the syntax is used. Additionally, different fromTIMESTAMP AS OF <TS>(which allows for arbitrary expressions in<TS>), the@<ts>timestamp only accepts the fixed 17-digityyyyMMddHHmmssSSSDelta convention.How was this patch tested?
New tests in
PlanParserSuite,DataSourceV2SQLSuite, andSparkConnectPlannerSuite.Was this patch authored or co-authored using generative AI tooling?
With help from Claude :)