diff --git a/core/src/main/scala/app/softnetwork/elastic/client/ElasticClientDelegator.scala b/core/src/main/scala/app/softnetwork/elastic/client/ElasticClientDelegator.scala index 2c7b299b..f44e828f 100644 --- a/core/src/main/scala/app/softnetwork/elastic/client/ElasticClientDelegator.scala +++ b/core/src/main/scala/app/softnetwork/elastic/client/ElasticClientDelegator.scala @@ -26,10 +26,13 @@ import app.softnetwork.elastic.schema.{Index, IndexMappings} import app.softnetwork.elastic.sql.policy.{EnrichPolicy, EnrichPolicyTask} import app.softnetwork.elastic.sql.{query, schema, PainlessContextType} import app.softnetwork.elastic.sql.query.{ + Delete, + Insert, SQLAggregation, SearchStatement, SelectStatement, - SingleSearch + SingleSearch, + Update } import app.softnetwork.elastic.sql.schema.{Schema, TableAlias} import app.softnetwork.elastic.sql.transform.{ @@ -212,6 +215,9 @@ trait ElasticClientDelegator extends ElasticClientApi with BulkTypes { override def deleteByQuery(index: String, query: String, refresh: Boolean): ElasticResult[Long] = delegate.deleteByQuery(index, query, refresh) + override def deleteByQuery(index: String, delete: Delete, refresh: Boolean): ElasticResult[Long] = + delegate.deleteByQuery(index, delete, refresh) + override def isIndexClosed(index: String): ElasticResult[Boolean] = delegate.isIndexClosed(index) @@ -236,6 +242,14 @@ trait ElasticClientDelegator extends ElasticClientApi with BulkTypes { ): ElasticResult[Long] = delegate.updateByQuery(index, query, pipelineId, refresh) + override def updateByQuery( + index: String, + update: Update, + pipelineId: Option[String], + refresh: Boolean + ): ElasticResult[Long] = + delegate.updateByQuery(index, update, pipelineId, refresh) + /** Insert documents by query into an index. * * @param index @@ -252,6 +266,11 @@ trait ElasticClientDelegator extends ElasticClientApi with BulkTypes { ): Future[ElasticResult[DmlResult]] = delegate.insertByQuery(index, query, refresh) + override def insertByQuery(index: String, insert: Insert, refresh: Boolean)(implicit + system: ActorSystem + ): Future[ElasticResult[DmlResult]] = + delegate.insertByQuery(index, insert, refresh) + /** Load the schema for the provided index. * * @param index diff --git a/core/src/main/scala/app/softnetwork/elastic/client/GatewayApi.scala b/core/src/main/scala/app/softnetwork/elastic/client/GatewayApi.scala index dd7e4586..6f2059b2 100644 --- a/core/src/main/scala/app/softnetwork/elastic/client/GatewayApi.scala +++ b/core/src/main/scala/app/softnetwork/elastic/client/GatewayApi.scala @@ -195,7 +195,8 @@ class DmlExecutor(api: IndicesApi, logger: Logger) extends Executor[DmlStatement implicit val ec: ExecutionContext = system.dispatcher statement match { case delete: Delete => - api.deleteByQuery(delete.table.name, delete.sql) match { + // Pass the AST directly — avoids the AST → Delete.sql → re-parse round-trip. + api.deleteByQuery(delete.table.name, delete, refresh = true) match { case ElasticSuccess(count) => logger.info(s"✅ Deleted $count documents from ${delete.table.name}.") Future.successful(ElasticResult.success(DmlResult(deleted = count))) @@ -207,7 +208,9 @@ class DmlExecutor(api: IndicesApi, logger: Logger) extends Executor[DmlStatement ) } case update: Update => - api.updateByQuery(update.table, update.sql) match { + // Pass the AST directly — avoids the AST → Update.sql → re-parse round-trip that + // historically dropped the syntactic shape of string literals (issue #92). + api.updateByQuery(update.table, update, None, refresh = true) match { case ElasticSuccess(count) => logger.info(s"✅ Updated $count documents in ${update.table}.") Future.successful(ElasticResult.success(DmlResult(updated = count))) @@ -219,7 +222,8 @@ class DmlExecutor(api: IndicesApi, logger: Logger) extends Executor[DmlStatement ) } case insert: Insert => - api.insertByQuery(insert.table, insert.sql).map { + // Pass the AST directly — avoids the AST → Insert.sql → re-parse round-trip. + api.insertByQuery(insert.table, insert, refresh = true).map { case success @ ElasticSuccess(res) => logger.info(s"✅ Inserted ${res.inserted} documents into ${insert.table}.") success diff --git a/core/src/main/scala/app/softnetwork/elastic/client/IndicesApi.scala b/core/src/main/scala/app/softnetwork/elastic/client/IndicesApi.scala index a5aa2afc..eee7a33f 100644 --- a/core/src/main/scala/app/softnetwork/elastic/client/IndicesApi.scala +++ b/core/src/main/scala/app/softnetwork/elastic/client/IndicesApi.scala @@ -18,7 +18,7 @@ package app.softnetwork.elastic.client import akka.actor.ActorSystem import akka.stream.scaladsl.Source -import app.softnetwork.elastic.client.bulk.BulkOptions +import app.softnetwork.elastic.client.bulk.{BulkOptions, BulkResult} import app.softnetwork.elastic.client.result._ import app.softnetwork.elastic.schema.Index import app.softnetwork.elastic.sql.PainlessContextType @@ -756,25 +756,78 @@ trait IndicesApi extends ElasticClientHelpers { query: String, refresh: Boolean = true ): ElasticResult[Long] = { + val result = for { + _ <- validateDeleteIndex(index) + jsonQuery <- parseQueryForDeletion(index, query) + deleted <- runDeleteByQuery(index, jsonQuery, refresh) + } yield deleted + finalizeDeleteByQuery(index, result) + } + /** Delete documents using an already-parsed [[Delete]] AST. Skips the SQL parser entirely. + * + * Same motivation as the [[updateByQuery]] AST overload: avoid the AST → `Delete.sql` → re-parse + * round-trip when [[GatewayApi.run]] already has the AST. + */ + def deleteByQuery( + index: String, + delete: Delete, + refresh: Boolean + ): ElasticResult[Long] = { val result = for { - // 1. Validate index name - _ <- validateIndexName(index) - .toLeft(()) - .left - .map(err => - err.copy( - operation = Some("deleteByQuery"), - statusCode = Some(400), - index = Some(index), - message = s"Invalid index: ${err.message}" + _ <- validateDeleteIndex(index) + _ <- + if (delete.table.name != index) + Left( + sqlErrorFor( + operation = "deleteByQuery", + index = index, + message = + s"SQL query index '${delete.table.name}' does not match provided index '$index'" + ) ) - ) + else Right(()) + jsonQuery = delete.where match { + case None => + logger.info( + s"SQL delete query has no WHERE clause, deleting all documents from index '$index'" + ) + """{"query": {"match_all": {}}}""" + case Some(where) => + implicit val timestamp: Long = System.currentTimeMillis() + val search: String = + SingleSearch( + from = From(tables = Seq(delete.table)), + where = Some(where), + deleteByQuery = true + ) + logger.info(s"✅ Converted SQL delete query to search for deleteByQuery: $search") + search + } + deleted <- runDeleteByQuery(index, jsonQuery, refresh) + } yield deleted + finalizeDeleteByQuery(index, result) + } - // 2. Parse query (SQL or JSON) - jsonQuery <- parseQueryForDeletion(index, query) + private def validateDeleteIndex(index: String): Either[ElasticError, Unit] = + validateIndexName(index) + .toLeft(()) + .left + .map(err => + err.copy( + operation = Some("deleteByQuery"), + statusCode = Some(400), + index = Some(index), + message = s"Invalid index: ${err.message}" + ) + ) - // 3. Check index exists + private def runDeleteByQuery( + index: String, + jsonQuery: String, + refresh: Boolean + ): Either[ElasticError, Long] = + for { _ <- indexExists(index, pattern = false) match { case ElasticSuccess(true) => Right(()) case ElasticSuccess(false) => @@ -788,30 +841,25 @@ trait IndicesApi extends ElasticClientHelpers { ) case ElasticFailure(err) => Left(err) } - - // 4. Open index if needed tuple <- openIfNeeded(index) (_, restore) = tuple - - // 5. Execute delete-by-query deleted <- executeDeleteByQuery(index, jsonQuery, refresh).toEither - - // 6. Restore state _ <- restore().toEither.left.map { restoreErr => logger.warn(s"❌ Failed to restore index state for '$index': ${restoreErr.message}") restoreErr } - } yield deleted - result match { - case Right(count) => - logger.info(s"✅ Deleted $count documents from index '$index'") - ElasticSuccess(count) - case Left(err) => - logger.error(s"❌ Failed to delete by query on index '$index': ${err.message}") - ElasticFailure(err) - } + private def finalizeDeleteByQuery( + index: String, + result: Either[ElasticError, Long] + ): ElasticResult[Long] = result match { + case Right(count) => + logger.info(s"✅ Deleted $count documents from index '$index'") + ElasticSuccess(count) + case Left(err) => + logger.error(s"❌ Failed to delete by query on index '$index': ${err.message}") + ElasticFailure(err) } /** Update documents by query from an index. @@ -850,14 +898,81 @@ trait IndicesApi extends ElasticClientHelpers { // 2. Parse SQL or JSON parsed <- parseQueryForUpdate(index, query).toEither + count <- runUpdateByQuery(index, parsed, pipelineId, refresh) + } yield count + + result match { + case Right(count) => ElasticSuccess(count) + case Left(err) => ElasticFailure(err) + } + } + + /** Update documents by query using an already-parsed [[Update]] AST. + * + * Skips the SQL parser entirely — used by [[GatewayApi.run]] when the dispatcher has already + * parsed the SQL once. Avoids the round-trip AST → `Update.sql` → re-parse that historically + * lost the syntactic shape of string literals (see issue #92). + */ + def updateByQuery( + index: String, + update: Update, + pipelineId: Option[String], + refresh: Boolean + ): ElasticResult[Long] = { + val result = for { + _ <- validateIndexName(index) + .toLeft(()) + .left + .map(err => + err.copy( + operation = Some("updateByQuery"), + statusCode = Some(400), + index = Some(index) + ) + ) + + _ <- + if (update.table != index) + Left( + sqlErrorFor( + operation = "updateByQuery", + index = index, + message = s"SQL query index '${update.table}' does not match provided index '$index'" + ) + ) + else Right(()) + + count <- runUpdateByQuery(index, Left(update), pipelineId, refresh) + } yield count + + result match { + case Right(count) => ElasticSuccess(count) + case Left(err) => ElasticFailure(err) + } + } + + /** Shared execution path for both [[updateByQuery]] overloads. Runs everything from the + * post-parse step (pipeline extraction → WHERE → JSON conversion → execute). + * + * @param parsed + * Left(Update) for SQL update statements; Right(jsonQuery) for JSON passthrough or for a + * SELECT that [[parseQueryForUpdate]] has already converted into JSON. + */ + private def runUpdateByQuery( + index: String, + parsed: Either[Update, String], + pipelineId: Option[String], + refresh: Boolean + ): Either[ElasticError, Long] = { + for { // 3. Extract SQL pipeline (optional) - sqlPipeline = parsed match { + sqlPipeline <- Right(parsed match { case Left(u: Update) if u.values.nonEmpty => Some(u.customPipeline) case _ => None - } + }) - // 4. Extract SQL WHERE → JSON query - jsonQuery = parsed match { + // 4. Build the JSON query to execute + jsonQuery <- Right(parsed match { case Left(u: Update) => u.where match { case None => @@ -878,8 +993,9 @@ trait IndicesApi extends ElasticClientHelpers { search } - case _ => query // JSON passthrough - } + case Right(jsonOrConverted) => + jsonOrConverted + }) // 5. Load user pipeline if provided userPipeline <- pipelineId match { @@ -937,11 +1053,6 @@ trait IndicesApi extends ElasticClientHelpers { _ <- restore().toEither } yield updated - - result match { - case Right(count) => ElasticSuccess(count) - case Left(err) => ElasticFailure(err) - } } /** Insert documents by query into an index. @@ -960,26 +1071,100 @@ trait IndicesApi extends ElasticClientHelpers { refresh: Boolean = true )(implicit system: ActorSystem): Future[ElasticResult[DmlResult]] = { implicit val ec: ExecutionContext = system.dispatcher + val result = for { + _ <- Future.fromTry(validateInsertIndex(index).toTry) + parsed <- parseInsertQuery(index, query).toFuture + out <- runInsertByQuery(index, parsed, refresh) + } yield out + insertResultFinalize(index, result) + } + /** Insert documents using an already-parsed [[Insert]] AST. Skips the SQL parser entirely. + * + * Same motivation as the [[updateByQuery]] AST overload — avoid the AST → `Insert.sql` → + * re-parse round-trip when [[GatewayApi.run]] already has the AST. + */ + def insertByQuery( + index: String, + insert: Insert, + refresh: Boolean + )(implicit system: ActorSystem): Future[ElasticResult[DmlResult]] = { + implicit val ec: ExecutionContext = system.dispatcher val result = for { - // 1. Validate index - _ <- Future.fromTry( - validateIndexName(index) - .toLeft(()) - .left - .map(err => - err.copy( + _ <- Future.fromTry(validateInsertIndex(index).toTry) + _ <- Future.fromTry { + if (insert.table != index) + Left( + ElasticError( operation = Some("insertByQuery"), statusCode = Some(400), - index = Some(index) + index = Some(index), + message = s"SQL table '${insert.table}' does not match index '$index'" ) - ) - .toTry + ).toTry + else + insert.validate() match { + case Left(msg) => + Left( + ElasticError( + operation = Some("insertByQuery"), + statusCode = Some(400), + index = Some(index), + message = msg + ) + ).toTry + case Right(_) => Right(()).toTry + } + } + out <- runInsertByQuery(index, insert, refresh) + } yield out + insertResultFinalize(index, result) + } + + private def validateInsertIndex(index: String): Either[ElasticError, Unit] = + validateIndexName(index) + .toLeft(()) + .left + .map(err => + err.copy( + operation = Some("insertByQuery"), + statusCode = Some(400), + index = Some(index) + ) ) - // 2. Parse SQL INSERT - parsed <- parseInsertQuery(index, query).toFuture + private def insertResultFinalize( + index: String, + result: Future[BulkResult] + )(implicit ec: ExecutionContext): Future[ElasticResult[DmlResult]] = + result + .map(r => ElasticSuccess(DmlResult(inserted = r.successCount, rejected = r.failedCount))) + .recover { + case e: ElasticError => + ElasticFailure( + e.copy( + operation = Some("insertByQuery"), + index = Some(index) + ) + ) + case e => + ElasticFailure( + ElasticError( + message = e.getMessage, + operation = Some("insertByQuery"), + index = Some(index) + ) + ) + } + /** Shared post-parse path for both [[insertByQuery]] overloads. */ + private def runInsertByQuery( + index: String, + parsed: Insert, + refresh: Boolean + )(implicit system: ActorSystem): Future[BulkResult] = { + implicit val ec: ExecutionContext = system.dispatcher + for { // 3. Load index metadata idx <- Future.fromTry(getIndex(index).toEither.flatMap { case Some(i) => Right(i.schema) @@ -1175,26 +1360,6 @@ trait IndicesApi extends ElasticClientHelpers { )(BulkOptions(defaultIndex = index, disableRefresh = !refresh), system) } yield bulkResult - - result - .map(r => ElasticSuccess(DmlResult(inserted = r.successCount, rejected = r.failedCount))) - .recover { - case e: ElasticError => - ElasticFailure( - e.copy( - operation = Some("insertByQuery"), - index = Some(index) - ) - ) - case e => - ElasticFailure( - ElasticError( - message = e.getMessage, - operation = Some("insertByQuery"), - index = Some(index) - ) - ) - } } /** Copy documents from files into an index. diff --git a/core/src/main/scala/app/softnetwork/elastic/client/metrics/MetricsElasticClient.scala b/core/src/main/scala/app/softnetwork/elastic/client/metrics/MetricsElasticClient.scala index a2d6cb23..34b12e3a 100644 --- a/core/src/main/scala/app/softnetwork/elastic/client/metrics/MetricsElasticClient.scala +++ b/core/src/main/scala/app/softnetwork/elastic/client/metrics/MetricsElasticClient.scala @@ -34,7 +34,14 @@ import app.softnetwork.elastic.client.scroll._ import app.softnetwork.elastic.schema.{Index, IndexMappings} import app.softnetwork.elastic.sql.policy.{EnrichPolicy, EnrichPolicyTask} import app.softnetwork.elastic.sql.{query, schema} -import app.softnetwork.elastic.sql.query.{SQLAggregation, SearchStatement, SelectStatement} +import app.softnetwork.elastic.sql.query.{ + Delete, + Insert, + SQLAggregation, + SearchStatement, + SelectStatement, + Update +} import app.softnetwork.elastic.sql.schema.{Schema, TableAlias} import app.softnetwork.elastic.sql.transform.{ TransformConfig, @@ -199,6 +206,11 @@ class MetricsElasticClient( delegate.deleteByQuery(index, query, refresh) } + override def deleteByQuery(index: String, delete: Delete, refresh: Boolean): ElasticResult[Long] = + measureResult("deleteByQuery", Some(index)) { + delegate.deleteByQuery(index, delete, refresh) + } + override def isIndexClosed(index: String): ElasticResult[Boolean] = measureResult("isIndexClosed", Some(index)) { delegate.isIndexClosed(index) @@ -227,6 +239,16 @@ class MetricsElasticClient( delegate.updateByQuery(index, query, pipelineId, refresh) } + override def updateByQuery( + index: String, + update: Update, + pipelineId: Option[String], + refresh: Boolean + ): ElasticResult[Long] = + measureResult("updateByQuery", Some(index)) { + delegate.updateByQuery(index, update, pipelineId, refresh) + } + /** Insert documents by query into an index. * * @param index @@ -246,6 +268,14 @@ class MetricsElasticClient( }(system.dispatcher) } + override def insertByQuery(index: String, insert: Insert, refresh: Boolean)(implicit + system: ActorSystem + ): Future[ElasticResult[DmlResult]] = { + measureAsync("insertByQuery", Some(index)) { + delegate.insertByQuery(index, insert, refresh) + }(system.dispatcher) + } + /** Load the schema for the provided index. * * @param index diff --git a/licensing/src/main/scala/app/softnetwork/elastic/licensing/LicenseRefreshStrategyFactory.scala b/licensing/src/main/scala/app/softnetwork/elastic/licensing/LicenseRefreshStrategyFactory.scala index c03ead37..9c4e1341 100644 --- a/licensing/src/main/scala/app/softnetwork/elastic/licensing/LicenseRefreshStrategyFactory.scala +++ b/licensing/src/main/scala/app/softnetwork/elastic/licensing/LicenseRefreshStrategyFactory.scala @@ -85,8 +85,12 @@ object LicenseRefreshStrategyFactory extends LazyLogging { } } - /** Resolve LicenseMode from config. refreshEnabled=true -> LongRunning, else -> Driver. */ - private def resolveMode(config: Config): Option[LicenseMode] = { + /** Resolve LicenseMode from config. refreshEnabled=true -> LongRunning, else -> Driver. + * + * `private[licensing]` so cross-repo licensing-mode tests can verify the resolution rule + * directly without going through SPI resolution (Story 10.3 Review patch D2). + */ + private[licensing] def resolveMode(config: Config): Option[LicenseMode] = { val licenseConfig = LicenseConfig.load(config) if (licenseConfig.refreshEnabled) Some(LicenseMode.LongRunning) else Some(LicenseMode.Driver) diff --git a/licensing/src/test/scala/app/softnetwork/elastic/licensing/LicenseRefreshStrategyFactoryDriverModeSpec.scala b/licensing/src/test/scala/app/softnetwork/elastic/licensing/LicenseRefreshStrategyFactoryDriverModeSpec.scala new file mode 100644 index 00000000..a92358d8 --- /dev/null +++ b/licensing/src/test/scala/app/softnetwork/elastic/licensing/LicenseRefreshStrategyFactoryDriverModeSpec.scala @@ -0,0 +1,61 @@ +/* + * Copyright 2026 SOFTNETWORK + * + * Licensed under the Elastic License 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.elastic.co/licensing/elastic-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package app.softnetwork.elastic.licensing + +import com.typesafe.config.ConfigFactory +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +/** Verifies the mode-resolution contract that the JDBC driver (Story 10.3) relies on for AC2: + * overlay `softclient4es.license.refresh-enabled = false` onto the merged HOCON config and + * [[LicenseRefreshStrategyFactory.resolveMode]] returns `Some(LicenseMode.Driver)`. + * + * The companion integration test in `JdbcIntegrationSpec` cannot verify this end-to-end — the JDBC + * test classpath registers a priority-1 `TestLicenseManagerSpi` that ignores the `mode` parameter, + * so a strategy-class assertion there is vacuous. `resolveMode` is pure on the config (no SPI + * involvement), so a direct call here is the authoritative check. + */ +class LicenseRefreshStrategyFactoryDriverModeSpec extends AnyFlatSpec with Matchers { + + behavior of "LicenseRefreshStrategyFactory.resolveMode" + + it should "resolve LicenseMode.Driver when softclient4es.license.refresh.enabled = false" in { + // Same HOCON merge that ElasticConnection.licenseStrategy / ConnectionUrl.toConfig + // build: an inline overlay with refresh.enabled=false on top of ConfigFactory.load + // (reference.conf defaults). The HOCON path is `softclient4es.license.refresh.enabled` + // (with a dot between refresh and enabled) — that is what `LicenseConfig.load` reads + // via `license.getBoolean("refresh.enabled")`. + val cfg = ConfigFactory + .parseString("softclient4es.license.refresh.enabled = false") + .withFallback(ConfigFactory.load()) + .resolve() + + LicenseRefreshStrategyFactory.resolveMode(cfg) shouldBe Some(LicenseMode.Driver) + } + + it should "resolve LicenseMode.LongRunning when softclient4es.license.refresh.enabled = true" in { + // Symmetric verification: the same factory hook used by sidecar / federation startup + // (which leaves refresh.enabled at its true default) must NOT be silently downgraded + // to Driver mode if someone refactors resolveMode. + val cfg = ConfigFactory + .parseString("softclient4es.license.refresh.enabled = true") + .withFallback(ConfigFactory.load()) + .resolve() + + LicenseRefreshStrategyFactory.resolveMode(cfg) shouldBe Some(LicenseMode.LongRunning) + } +} diff --git a/sql/src/main/scala/app/softnetwork/elastic/sql/query/package.scala b/sql/src/main/scala/app/softnetwork/elastic/sql/query/package.scala index 72519b59..1434d4b5 100644 --- a/sql/src/main/scala/app/softnetwork/elastic/sql/query/package.scala +++ b/sql/src/main/scala/app/softnetwork/elastic/sql/query/package.scala @@ -495,10 +495,16 @@ package object query { case class Update(table: String, values: ListMap[String, PainlessScript], where: Option[Where]) extends DmlStatement { + // Uses `value.sql` (not `value.value`) for the literal branch so the rendered string is + // re-parseable into the same AST. `value.value` loses the syntactic shape of string + // literals (`'USA'` becomes `USA`), and any downstream consumer that re-parses this output + // — e.g. GatewayApi.run → api.updateByQuery(table, update.sql) before it was changed to + // accept the AST directly — would mis-parse the bare token as an Identifier and silently + // null the column via a broken painless script. override def sql: String = s"UPDATE $table SET ${values .map { case (k, v) => v match { - case value: Value[_] => s"$k = ${value.value}" + case value: Value[_] => s"$k = ${value.sql}" case painlessScript => s"$k = ${painlessScript.sql}" } } diff --git a/sql/src/test/scala/app/softnetwork/elastic/sql/parser/ParserSpec.scala b/sql/src/test/scala/app/softnetwork/elastic/sql/parser/ParserSpec.scala index 10d9efc0..9733bc3a 100644 --- a/sql/src/test/scala/app/softnetwork/elastic/sql/parser/ParserSpec.scala +++ b/sql/src/test/scala/app/softnetwork/elastic/sql/parser/ParserSpec.scala @@ -2844,6 +2844,25 @@ class ParserSpec extends AnyFlatSpec with Matchers { } } + it should "round-trip Update.sql with string literals so re-parsing yields the same AST" in { + // Regression for #92: Update.sql previously rendered string literals as bare tokens + // (`country = USA` instead of `country = 'USA'`), which made re-parsing turn the literal + // into an Identifier. Downstream paths that re-parse the emitted SQL (e.g. updateByQuery) + // would then build a broken painless script that silently nulled the column. + val sql = "UPDATE customers SET country = 'USA' WHERE id = 1" + val first = Parser(sql).toOption.get.asInstanceOf[Update] + first.values("country").asInstanceOf[Value[_]].value shouldBe "USA" + val reEmitted = first.sql + reEmitted should include("country = 'USA'") + val second = Parser(reEmitted).toOption.get.asInstanceOf[Update] + second.values("country").asInstanceOf[Value[_]].value shouldBe "USA" + // Pipeline must take the SET branch (not SCRIPT) for string literals, otherwise the + // generated painless source references ctx. instead of assigning the literal. + val pipeline = second.customPipeline + pipeline.processors should have size 1 + pipeline.processors.head shouldBe a[SetProcessor] + } + it should "parse UPDATE with scripts" in { val sql = "UPDATE products SET price = price * 1.1, updated_at = CURRENT_TIMESTAMP WHERE category = 'Electronics'" diff --git a/testkit/src/main/scala/app/softnetwork/elastic/client/GatewayApiIntegrationSpec.scala b/testkit/src/main/scala/app/softnetwork/elastic/client/GatewayApiIntegrationSpec.scala index 39e2dab8..e8924a6c 100644 --- a/testkit/src/main/scala/app/softnetwork/elastic/client/GatewayApiIntegrationSpec.scala +++ b/testkit/src/main/scala/app/softnetwork/elastic/client/GatewayApiIntegrationSpec.scala @@ -963,12 +963,12 @@ trait GatewayApiIntegrationSpec extends GatewayIntegrationTestKit { val sql = """SELECT | o.id, - | items.product, - | items.quantity, - | SUM(items.price * items.quantity) OVER (PARTITION BY o.id) AS total_price + | oi.product, + | oi.quantity, + | SUM(oi.price * oi.quantity) OVER (PARTITION BY o.id) AS total_price |FROM dql_orders o - |JOIN UNNEST(o.items) AS items - |WHERE items.quantity >= 1 + |JOIN UNNEST(o.items) AS oi + |WHERE oi.quantity >= 1 |ORDER BY o.id ASC;""".stripMargin val res = client.run(sql).futureValue @@ -977,22 +977,22 @@ trait GatewayApiIntegrationSpec extends GatewayIntegrationTestKit { res, Seq( Map( - "id" -> 1, - "items.product" -> "A", - "items.quantity" -> 2, - "total_price" -> 40.0 + "id" -> 1, + "oi.product" -> "A", + "oi.quantity" -> 2, + "total_price" -> 40.0 ), Map( - "id" -> 1, - "items.product" -> "B", - "items.quantity" -> 1, - "total_price" -> 40.0 + "id" -> 1, + "oi.product" -> "B", + "oi.quantity" -> 1, + "total_price" -> 40.0 ), Map( - "id" -> 2, - "items.product" -> "C", - "items.quantity" -> 3, - "total_price" -> 15.0 + "id" -> 2, + "oi.product" -> "C", + "oi.quantity" -> 3, + "total_price" -> 15.0 ) ) )