From 398cadc8df17969306df8e091cee665c192258dd Mon Sep 17 00:00:00 2001 From: Anurag Mantripragada Date: Wed, 24 Jun 2026 16:45:48 -0700 Subject: [PATCH] [SPARK-57681][SQL] Support dynamic table options for DELETE and UPDATE --- .../sql/catalyst/parser/SqlBaseParser.g4 | 4 +- .../analysis/RewriteDeleteFromTable.scala | 3 +- .../analysis/RewriteUpdateTable.scala | 3 +- .../sql/catalyst/parser/AstBuilder.scala | 6 +- .../sql/catalyst/parser/DDLParserSuite.scala | 66 +++++++++++++++++++ .../InMemoryRowLevelOperationTable.scala | 18 +++-- .../connector/DeleteFromTableSuiteBase.scala | 17 +++++ .../RowLevelOperationSuiteBase.scala | 24 ++++++- .../sql/connector/UpdateTableSuiteBase.scala | 16 +++++ 9 files changed, 144 insertions(+), 13 deletions(-) diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 index cce03c169aac2..0c533e76ab271 100644 --- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 +++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 @@ -739,8 +739,8 @@ resource dmlStatementNoWith : insertInto (query | LEFT_PAREN query RIGHT_PAREN queryAlias=tableAlias) #singleInsertQuery | fromClause multiInsertQueryBody+ #multiInsertQuery - | DELETE FROM identifierReference tableAlias whereClause? #deleteFromTable - | UPDATE identifierReference tableAlias setClause whereClause? #updateTable + | DELETE FROM identifierReference optionsClause? tableAlias whereClause? #deleteFromTable + | UPDATE identifierReference optionsClause? tableAlias setClause whereClause? #updateTable | MERGE (WITH SCHEMA EVOLUTION)? INTO target=identifierReference targetAlias=tableAlias USING (source=identifierReference | LEFT_PAREN sourceQuery=query RIGHT_PAREN) sourceAlias=tableAlias diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteDeleteFromTable.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteDeleteFromTable.scala index f8881e2077103..c8795b8ad9b1f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteDeleteFromTable.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteDeleteFromTable.scala @@ -25,7 +25,6 @@ import org.apache.spark.sql.connector.catalog.{SupportsDeleteV2, SupportsRowLeve import org.apache.spark.sql.connector.write.{RowLevelOperationTable, SupportsDelta} import org.apache.spark.sql.connector.write.RowLevelOperation.Command.DELETE import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, ExtractV2Table} -import org.apache.spark.sql.util.CaseInsensitiveStringMap /** * A rule that rewrites DELETE operations using plans that operate on individual or groups of rows. @@ -45,7 +44,7 @@ object RewriteDeleteFromTable extends RewriteRowLevelCommand { d case r @ ExtractV2Table(t: SupportsRowLevelOperations) => - val table = buildOperationTable(t, DELETE, CaseInsensitiveStringMap.empty()) + val table = buildOperationTable(t, DELETE, r.options) table.operation match { case _: SupportsDelta => buildWriteDeltaPlan(r, table, cond) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteUpdateTable.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteUpdateTable.scala index f235374bd5d6f..d3312324ac6e0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteUpdateTable.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteUpdateTable.scala @@ -26,7 +26,6 @@ import org.apache.spark.sql.connector.write.{RowLevelOperationTable, SupportsDel import org.apache.spark.sql.connector.write.RowLevelOperation.Command.UPDATE import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, ExtractV2Table} import org.apache.spark.sql.types.IntegerType -import org.apache.spark.sql.util.CaseInsensitiveStringMap /** * A rule that rewrites UPDATE operations using plans that operate on individual or groups of rows. @@ -41,7 +40,7 @@ object RewriteUpdateTable extends RewriteRowLevelCommand { EliminateSubqueryAliases(aliasedTable) match { case r @ ExtractV2Table(tbl: SupportsRowLevelOperations) => - val table = buildOperationTable(tbl, UPDATE, CaseInsensitiveStringMap.empty()) + val table = buildOperationTable(tbl, UPDATE, r.options) val updateCond = cond.getOrElse(TrueLiteral) table.operation match { case _: SupportsDelta => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 43e094b5ccecd..7fb1519ffa9a7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -1224,7 +1224,8 @@ class AstBuilder extends DataTypeAstBuilder override def visitDeleteFromTable( ctx: DeleteFromTableContext): LogicalPlan = withOrigin(ctx) { val table = createUnresolvedRelation( - ctx.identifierReference, writePrivileges = Set(TableWritePrivilege.DELETE)) + ctx.identifierReference, Option(ctx.optionsClause()), + writePrivileges = Set(TableWritePrivilege.DELETE)) val tableAlias = getTableAliasWithoutColumnAlias(ctx.tableAlias(), "DELETE") val aliasedTable = tableAlias.map(SubqueryAlias(_, table)).getOrElse(table) val predicate = if (ctx.whereClause() != null) { @@ -1237,7 +1238,8 @@ class AstBuilder extends DataTypeAstBuilder override def visitUpdateTable(ctx: UpdateTableContext): LogicalPlan = withOrigin(ctx) { val table = createUnresolvedRelation( - ctx.identifierReference, writePrivileges = Set(TableWritePrivilege.UPDATE)) + ctx.identifierReference, Option(ctx.optionsClause()), + writePrivileges = Set(TableWritePrivilege.UPDATE)) val tableAlias = getTableAliasWithoutColumnAlias(ctx.tableAlias(), "UPDATE") val aliasedTable = tableAlias.map(SubqueryAlias(_, table)).getOrElse(table) val assignments = withAssignments(ctx.setClause().assignmentList()) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 4edeb3176798f..ac735823338c8 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransfo import org.apache.spark.sql.connector.expressions.LogicalExpressions.bucket import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataType, Decimal, IntegerType, LongType, StringType, StructType, TimestampLTZNanosType, TimestampNTZNanosType, TimestampType, TimeType} +import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.storage.StorageLevelMapper import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} @@ -2211,6 +2212,32 @@ class DDLParserSuite extends AnalysisTest { stop = 56)) } + test("delete from table: with options") { + parseCompare("DELETE FROM testcat.ns1.ns2.tbl WITH (`split-size` = 5) WHERE a = 2", + DeleteFromTable( + UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl"), + new CaseInsensitiveStringMap( + java.util.Map.of("split-size", "5"))), + EqualTo(UnresolvedAttribute("a"), Literal(2)))) + } + + test("delete from table: with options and alias") { + parseCompare("DELETE FROM testcat.ns1.ns2.tbl WITH (`k` = 'v') AS t WHERE t.a = 2", + DeleteFromTable( + SubqueryAlias("t", + UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl"), + new CaseInsensitiveStringMap( + java.util.Map.of("k", "v")))), + EqualTo(UnresolvedAttribute("t.a"), Literal(2)))) + } + + test("delete from table: options without values are not allowed") { + val e = intercept[ParseException] { + parsePlan("DELETE FROM testcat.ns1.ns2.tbl WITH (`split-size`)") + } + assert(e.getMessage.contains("Values must be specified for key(s): [split-size]")) + } + test("update table: basic") { parseCompare( """ @@ -2253,6 +2280,45 @@ class DDLParserSuite extends AnalysisTest { stop = 70)) } + test("update table: with options") { + parseCompare( + """ + |UPDATE testcat.ns1.ns2.tbl WITH (`write.split-size` = 10) + |SET a='Robert', b=32 + """.stripMargin, + UpdateTable( + UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl"), + new CaseInsensitiveStringMap( + java.util.Map.of("write.split-size", "10"))), + Seq(Assignment(UnresolvedAttribute("a"), Literal("Robert")), + Assignment(UnresolvedAttribute("b"), Literal(32))), + None)) + } + + test("update table: with options and alias") { + parseCompare( + """ + |UPDATE testcat.ns1.ns2.tbl WITH (`k` = 'v') AS t + |SET t.a='Robert', t.b=32 + |WHERE t.c=2 + """.stripMargin, + UpdateTable( + SubqueryAlias("t", + UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl"), + new CaseInsensitiveStringMap( + java.util.Map.of("k", "v")))), + Seq(Assignment(UnresolvedAttribute("t.a"), Literal("Robert")), + Assignment(UnresolvedAttribute("t.b"), Literal(32))), + Some(EqualTo(UnresolvedAttribute("t.c"), Literal(2))))) + } + + test("update table: options without values are not allowed") { + val e = intercept[ParseException] { + parsePlan("UPDATE testcat.ns1.ns2.tbl WITH (`split-size`) SET a = 1") + } + assert(e.getMessage.contains("Values must be specified for key(s): [split-size]")) + } + test("merge into table: basic") { parseCompare( """ diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala index 3b94f19f6fcdc..f62f1b0dce25e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala @@ -32,6 +32,14 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.ArrayImplicits._ +/** + * Test helper trait mixed into the in-memory row-level operations so tests can verify that + * per-statement SQL options reach the operation via [[RowLevelOperationInfo#options]]. + */ +trait RowLevelOperationWithOptions { + def options: CaseInsensitiveStringMap +} + class InMemoryRowLevelOperationTable private ( name: String, columns: Array[Column], @@ -108,13 +116,14 @@ class InMemoryRowLevelOperationTable private ( override def newRowLevelOperationBuilder( info: RowLevelOperationInfo): RowLevelOperationBuilder = { if (properties.getOrDefault(SUPPORTS_DELTAS, "false") == "true") { - () => DeltaBasedOperation(info.command) + () => DeltaBasedOperation(info.command, info.options) } else { - () => PartitionBasedOperation(info.command) + () => PartitionBasedOperation(info.command, info.options) } } - case class PartitionBasedOperation(command: Command) extends RowLevelOperation { + case class PartitionBasedOperation(command: Command, options: CaseInsensitiveStringMap) + extends RowLevelOperation with RowLevelOperationWithOptions { var configuredScan: InMemoryBatchScan = _ override def requiredMetadataAttributes(): Array[NamedReference] = { @@ -183,7 +192,8 @@ class InMemoryRowLevelOperationTable private ( } } - case class DeltaBasedOperation(command: Command) extends RowLevelOperation with SupportsDelta { + case class DeltaBasedOperation(command: Command, options: CaseInsensitiveStringMap) + extends RowLevelOperation with SupportsDelta with RowLevelOperationWithOptions { private final val PK_COLUMN_REF = FieldReference("pk") override def requiredMetadataAttributes(): Array[NamedReference] = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala index b894d5d75b3c8..bf19d113fc6c5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala @@ -1051,4 +1051,21 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { fail("unexpected executed plan: " + other) } } + + test("delete with dynamic options") { + createAndInitTable("pk INT NOT NULL, id INT, dep STRING", + """{ "pk": 1, "id": 1, "dep": "hr" } + |{ "pk": 2, "id": 2, "dep": "software" } + |{ "pk": 3, "id": 3, "dep": "hr" } + |""".stripMargin) + + // the WITH options must reach the relation, the RowLevelOperationInfo, and the write builder + checkRowLevelOperationOptions( + sql(s"DELETE FROM $tableNameAsString WITH (`write.split-size` = 10) WHERE id IN (1, 100)"), + "write.split-size" -> "10") + + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + Row(2, 2, "software") :: Row(3, 3, "hr") :: Nil) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/RowLevelOperationSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/RowLevelOperationSuiteBase.scala index 199b9ecbe0a07..6490695fbc356 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/RowLevelOperationSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/RowLevelOperationSuiteBase.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expr import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReplaceData, WriteDelta} import org.apache.spark.sql.catalyst.types.DataTypeUtils import org.apache.spark.sql.catalyst.util.METADATA_COL_ATTR_KEY -import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Delete, Identifier, InMemoryRowLevelOperationTable, InMemoryRowLevelOperationTableCatalog, Insert, MetadataColumn, Operation, Reinsert, Table, TableInfo, Txn, TxnTable, Update, Write} +import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Delete, Identifier, InMemoryRowLevelOperationTable, InMemoryRowLevelOperationTableCatalog, Insert, MetadataColumn, Operation, Reinsert, RowLevelOperationWithOptions, Table, TableInfo, Txn, TxnTable, Update, Write} import org.apache.spark.sql.connector.expressions.LogicalExpressions.{identity, reference} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.connector.write.RowLevelOperationTable @@ -179,6 +179,28 @@ abstract class RowLevelOperationSuiteBase }.getOrElse(fail("couldn't find row-level operation in optimized plan")) } + // runs a row-level command and asserts the given SQL options reached every layer that should + // carry them: the rewritten DataSourceV2Relation, the RowLevelOperationInfo passed to the + // operation builder (the row-level scan/write planning hook), and the write builder's + // LogicalWriteInfo + protected def checkRowLevelOperationOptions( + func: => Unit, + expectedOptions: (String, String)*): Unit = { + val Seq(qe) = withQueryExecutionsCaptured(spark)(func) + val writeRelation = qe.optimizedPlan.collectFirst { + case rd: ReplaceData => rd.table + case wd: WriteDelta => wd.table + }.getOrElse(fail("couldn't find row-level operation in optimized plan")) + .asInstanceOf[DataSourceV2Relation] + val operation = writeRelation.table.asInstanceOf[RowLevelOperationTable].operation + .asInstanceOf[RowLevelOperationWithOptions] + expectedOptions.foreach { case (key, value) => + assert(writeRelation.options.get(key) === value, s"relation option '$key'") + assert(operation.options.get(key) === value, s"row-level operation option '$key'") + assert(table.lastWriteInfo.options().get(key) === value, s"write option '$key'") + } + } + protected def assertNoScanPlanning(plan: LogicalPlan): Unit = { val relations = plan.collect { case r: DataSourceV2Relation => r } assert(relations.nonEmpty, "plan must contain relations") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/UpdateTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/UpdateTableSuiteBase.scala index 8eb314e00df81..8798b4a90dcce 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/UpdateTableSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/UpdateTableSuiteBase.scala @@ -1232,4 +1232,20 @@ abstract class UpdateTableSuiteBase extends RowLevelOperationSuiteBase { Row(1, 100, "hr"), Row(2, 200, "software"))) } + + test("update with dynamic options") { + createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", + """{ "pk": 1, "salary": 100, "dep": "hr" } + |{ "pk": 2, "salary": 200, "dep": "software" } + |""".stripMargin) + + // the WITH options must reach the relation, the RowLevelOperationInfo, and the write builder + checkRowLevelOperationOptions( + sql(s"UPDATE $tableNameAsString WITH (`write.split-size` = 10) SET salary = -1 WHERE pk = 1"), + "write.split-size" -> "10") + + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + Row(1, -1, "hr") :: Row(2, 200, "software") :: Nil) + } }