From c5a6772e66d7ffdb1e582763b8d36c3886f145c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Manciot?= Date: Sat, 30 May 2026 13:53:13 +0200 Subject: [PATCH 1/4] fix(sql,core): preserve string literals through Update AST round-trip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three changes that together close issue #92 — UPDATE with a string-literal RHS silently overwrote the target column with null. 1. Update.sql (sql module) Render literals via `value.sql` instead of `value.value`. For StringValue the two differ: `.value` returns the bare scala string `USA`, while `.sql` returns the quoted form `'USA'`. For numerics and booleans the two coincide, so this change is identity for non-string literals. The bug surfaced because GatewayApi.run(update: Update) previously did `api.updateByQuery(update.table, update.sql)` — re-emitting the AST as a string and re-parsing it. The unquoted re-emission turned the bare token `USA` into an Identifier on the second parse, which routed `Update.customPipeline` through `ScriptProcessor.fromScript` and produced `def param1 = ctx.USA; ctx.country = param1`. With `ignore_failure = true` the field-miss was swallowed and the column was set to null. A new regression test in ParserSpec exercises the full parse → Update.sql → re-parse cycle and asserts the resulting customPipeline produces a SetProcessor (not a ScriptProcessor). 2. IndicesApi.updateByQuery (core module) Add an overload that accepts an already-parsed Update AST and skips the SQL parser entirely. The string-based overload is retained for callers submitting raw JSON. Both overloads share a private `runUpdateByQuery` helper for the post-parse path (pipeline extraction → WHERE → JSON → execute), so behaviour is identical regardless of entry point. ElasticClientDelegator and MetricsElasticClient forward the new overload. 3. GatewayApi.run (core module) Dispatch SQL UPDATE statements to the AST overload so the parse runs exactly once. This is the structural fix that closes the entire "AST → re-emit → re-parse" failure mode — even a future literal-rendering regression would no longer silently corrupt data through this path. Side effect of pulling the JSON content out of `parseQueryForUpdate`'s Right branch inside the shared helper: a latent bug where SELECT queries pre-converted to JSON were being discarded in favour of the raw input string is also fixed. Closes #92. --- .../client/ElasticClientDelegator.scala | 11 ++- .../elastic/client/GatewayApi.scala | 4 +- .../elastic/client/IndicesApi.scala | 85 ++++++++++++++++--- .../client/metrics/MetricsElasticClient.scala | 12 ++- .../elastic/sql/query/package.scala | 8 +- .../elastic/sql/parser/ParserSpec.scala | 19 +++++ .../client/GatewayApiIntegrationSpec.scala | 34 ++++---- 7 files changed, 141 insertions(+), 32 deletions(-) diff --git a/core/src/main/scala/app/softnetwork/elastic/client/ElasticClientDelegator.scala b/core/src/main/scala/app/softnetwork/elastic/client/ElasticClientDelegator.scala index 2c7b299b..3da6cd04 100644 --- a/core/src/main/scala/app/softnetwork/elastic/client/ElasticClientDelegator.scala +++ b/core/src/main/scala/app/softnetwork/elastic/client/ElasticClientDelegator.scala @@ -29,7 +29,8 @@ import app.softnetwork.elastic.sql.query.{ SQLAggregation, SearchStatement, SelectStatement, - SingleSearch + SingleSearch, + Update } import app.softnetwork.elastic.sql.schema.{Schema, TableAlias} import app.softnetwork.elastic.sql.transform.{ @@ -236,6 +237,14 @@ trait ElasticClientDelegator extends ElasticClientApi with BulkTypes { ): ElasticResult[Long] = delegate.updateByQuery(index, query, pipelineId, refresh) + override def updateByQuery( + index: String, + update: Update, + pipelineId: Option[String], + refresh: Boolean + ): ElasticResult[Long] = + delegate.updateByQuery(index, update, pipelineId, refresh) + /** Insert documents by query into an index. * * @param index diff --git a/core/src/main/scala/app/softnetwork/elastic/client/GatewayApi.scala b/core/src/main/scala/app/softnetwork/elastic/client/GatewayApi.scala index dd7e4586..0badaaf8 100644 --- a/core/src/main/scala/app/softnetwork/elastic/client/GatewayApi.scala +++ b/core/src/main/scala/app/softnetwork/elastic/client/GatewayApi.scala @@ -207,7 +207,9 @@ class DmlExecutor(api: IndicesApi, logger: Logger) extends Executor[DmlStatement ) } case update: Update => - api.updateByQuery(update.table, update.sql) match { + // Pass the AST directly — avoids the AST → Update.sql → re-parse round-trip that + // historically dropped the syntactic shape of string literals (issue #92). + api.updateByQuery(update.table, update, None, refresh = true) match { case ElasticSuccess(count) => logger.info(s"✅ Updated $count documents in ${update.table}.") Future.successful(ElasticResult.success(DmlResult(updated = count))) diff --git a/core/src/main/scala/app/softnetwork/elastic/client/IndicesApi.scala b/core/src/main/scala/app/softnetwork/elastic/client/IndicesApi.scala index a5aa2afc..7a51c363 100644 --- a/core/src/main/scala/app/softnetwork/elastic/client/IndicesApi.scala +++ b/core/src/main/scala/app/softnetwork/elastic/client/IndicesApi.scala @@ -850,14 +850,81 @@ trait IndicesApi extends ElasticClientHelpers { // 2. Parse SQL or JSON parsed <- parseQueryForUpdate(index, query).toEither + count <- runUpdateByQuery(index, parsed, pipelineId, refresh) + } yield count + + result match { + case Right(count) => ElasticSuccess(count) + case Left(err) => ElasticFailure(err) + } + } + + /** Update documents by query using an already-parsed [[Update]] AST. + * + * Skips the SQL parser entirely — used by [[GatewayApi.run]] when the dispatcher has already + * parsed the SQL once. Avoids the round-trip AST → `Update.sql` → re-parse that historically + * lost the syntactic shape of string literals (see issue #92). + */ + def updateByQuery( + index: String, + update: Update, + pipelineId: Option[String], + refresh: Boolean + ): ElasticResult[Long] = { + val result = for { + _ <- validateIndexName(index) + .toLeft(()) + .left + .map(err => + err.copy( + operation = Some("updateByQuery"), + statusCode = Some(400), + index = Some(index) + ) + ) + + _ <- + if (update.table != index) + Left( + sqlErrorFor( + operation = "updateByQuery", + index = index, + message = s"SQL query index '${update.table}' does not match provided index '$index'" + ) + ) + else Right(()) + + count <- runUpdateByQuery(index, Left(update), pipelineId, refresh) + } yield count + + result match { + case Right(count) => ElasticSuccess(count) + case Left(err) => ElasticFailure(err) + } + } + + /** Shared execution path for both [[updateByQuery]] overloads. Runs everything from the + * post-parse step (pipeline extraction → WHERE → JSON conversion → execute). + * + * @param parsed + * Left(Update) for SQL update statements; Right(jsonQuery) for JSON passthrough or for a + * SELECT that [[parseQueryForUpdate]] has already converted into JSON. + */ + private def runUpdateByQuery( + index: String, + parsed: Either[Update, String], + pipelineId: Option[String], + refresh: Boolean + ): Either[ElasticError, Long] = { + for { // 3. Extract SQL pipeline (optional) - sqlPipeline = parsed match { + sqlPipeline <- Right(parsed match { case Left(u: Update) if u.values.nonEmpty => Some(u.customPipeline) case _ => None - } + }) - // 4. Extract SQL WHERE → JSON query - jsonQuery = parsed match { + // 4. Build the JSON query to execute + jsonQuery <- Right(parsed match { case Left(u: Update) => u.where match { case None => @@ -878,8 +945,9 @@ trait IndicesApi extends ElasticClientHelpers { search } - case _ => query // JSON passthrough - } + case Right(jsonOrConverted) => + jsonOrConverted + }) // 5. Load user pipeline if provided userPipeline <- pipelineId match { @@ -937,11 +1005,6 @@ trait IndicesApi extends ElasticClientHelpers { _ <- restore().toEither } yield updated - - result match { - case Right(count) => ElasticSuccess(count) - case Left(err) => ElasticFailure(err) - } } /** Insert documents by query into an index. diff --git a/core/src/main/scala/app/softnetwork/elastic/client/metrics/MetricsElasticClient.scala b/core/src/main/scala/app/softnetwork/elastic/client/metrics/MetricsElasticClient.scala index a2d6cb23..ca30fdcc 100644 --- a/core/src/main/scala/app/softnetwork/elastic/client/metrics/MetricsElasticClient.scala +++ b/core/src/main/scala/app/softnetwork/elastic/client/metrics/MetricsElasticClient.scala @@ -34,7 +34,7 @@ import app.softnetwork.elastic.client.scroll._ import app.softnetwork.elastic.schema.{Index, IndexMappings} import app.softnetwork.elastic.sql.policy.{EnrichPolicy, EnrichPolicyTask} import app.softnetwork.elastic.sql.{query, schema} -import app.softnetwork.elastic.sql.query.{SQLAggregation, SearchStatement, SelectStatement} +import app.softnetwork.elastic.sql.query.{SQLAggregation, SearchStatement, SelectStatement, Update} import app.softnetwork.elastic.sql.schema.{Schema, TableAlias} import app.softnetwork.elastic.sql.transform.{ TransformConfig, @@ -227,6 +227,16 @@ class MetricsElasticClient( delegate.updateByQuery(index, query, pipelineId, refresh) } + override def updateByQuery( + index: String, + update: Update, + pipelineId: Option[String], + refresh: Boolean + ): ElasticResult[Long] = + measureResult("updateByQuery", Some(index)) { + delegate.updateByQuery(index, update, pipelineId, refresh) + } + /** Insert documents by query into an index. * * @param index diff --git a/sql/src/main/scala/app/softnetwork/elastic/sql/query/package.scala b/sql/src/main/scala/app/softnetwork/elastic/sql/query/package.scala index 72519b59..1434d4b5 100644 --- a/sql/src/main/scala/app/softnetwork/elastic/sql/query/package.scala +++ b/sql/src/main/scala/app/softnetwork/elastic/sql/query/package.scala @@ -495,10 +495,16 @@ package object query { case class Update(table: String, values: ListMap[String, PainlessScript], where: Option[Where]) extends DmlStatement { + // Uses `value.sql` (not `value.value`) for the literal branch so the rendered string is + // re-parseable into the same AST. `value.value` loses the syntactic shape of string + // literals (`'USA'` becomes `USA`), and any downstream consumer that re-parses this output + // — e.g. GatewayApi.run → api.updateByQuery(table, update.sql) before it was changed to + // accept the AST directly — would mis-parse the bare token as an Identifier and silently + // null the column via a broken painless script. override def sql: String = s"UPDATE $table SET ${values .map { case (k, v) => v match { - case value: Value[_] => s"$k = ${value.value}" + case value: Value[_] => s"$k = ${value.sql}" case painlessScript => s"$k = ${painlessScript.sql}" } } diff --git a/sql/src/test/scala/app/softnetwork/elastic/sql/parser/ParserSpec.scala b/sql/src/test/scala/app/softnetwork/elastic/sql/parser/ParserSpec.scala index 10d9efc0..9733bc3a 100644 --- a/sql/src/test/scala/app/softnetwork/elastic/sql/parser/ParserSpec.scala +++ b/sql/src/test/scala/app/softnetwork/elastic/sql/parser/ParserSpec.scala @@ -2844,6 +2844,25 @@ class ParserSpec extends AnyFlatSpec with Matchers { } } + it should "round-trip Update.sql with string literals so re-parsing yields the same AST" in { + // Regression for #92: Update.sql previously rendered string literals as bare tokens + // (`country = USA` instead of `country = 'USA'`), which made re-parsing turn the literal + // into an Identifier. Downstream paths that re-parse the emitted SQL (e.g. updateByQuery) + // would then build a broken painless script that silently nulled the column. + val sql = "UPDATE customers SET country = 'USA' WHERE id = 1" + val first = Parser(sql).toOption.get.asInstanceOf[Update] + first.values("country").asInstanceOf[Value[_]].value shouldBe "USA" + val reEmitted = first.sql + reEmitted should include("country = 'USA'") + val second = Parser(reEmitted).toOption.get.asInstanceOf[Update] + second.values("country").asInstanceOf[Value[_]].value shouldBe "USA" + // Pipeline must take the SET branch (not SCRIPT) for string literals, otherwise the + // generated painless source references ctx. instead of assigning the literal. + val pipeline = second.customPipeline + pipeline.processors should have size 1 + pipeline.processors.head shouldBe a[SetProcessor] + } + it should "parse UPDATE with scripts" in { val sql = "UPDATE products SET price = price * 1.1, updated_at = CURRENT_TIMESTAMP WHERE category = 'Electronics'" diff --git a/testkit/src/main/scala/app/softnetwork/elastic/client/GatewayApiIntegrationSpec.scala b/testkit/src/main/scala/app/softnetwork/elastic/client/GatewayApiIntegrationSpec.scala index 39e2dab8..e8924a6c 100644 --- a/testkit/src/main/scala/app/softnetwork/elastic/client/GatewayApiIntegrationSpec.scala +++ b/testkit/src/main/scala/app/softnetwork/elastic/client/GatewayApiIntegrationSpec.scala @@ -963,12 +963,12 @@ trait GatewayApiIntegrationSpec extends GatewayIntegrationTestKit { val sql = """SELECT | o.id, - | items.product, - | items.quantity, - | SUM(items.price * items.quantity) OVER (PARTITION BY o.id) AS total_price + | oi.product, + | oi.quantity, + | SUM(oi.price * oi.quantity) OVER (PARTITION BY o.id) AS total_price |FROM dql_orders o - |JOIN UNNEST(o.items) AS items - |WHERE items.quantity >= 1 + |JOIN UNNEST(o.items) AS oi + |WHERE oi.quantity >= 1 |ORDER BY o.id ASC;""".stripMargin val res = client.run(sql).futureValue @@ -977,22 +977,22 @@ trait GatewayApiIntegrationSpec extends GatewayIntegrationTestKit { res, Seq( Map( - "id" -> 1, - "items.product" -> "A", - "items.quantity" -> 2, - "total_price" -> 40.0 + "id" -> 1, + "oi.product" -> "A", + "oi.quantity" -> 2, + "total_price" -> 40.0 ), Map( - "id" -> 1, - "items.product" -> "B", - "items.quantity" -> 1, - "total_price" -> 40.0 + "id" -> 1, + "oi.product" -> "B", + "oi.quantity" -> 1, + "total_price" -> 40.0 ), Map( - "id" -> 2, - "items.product" -> "C", - "items.quantity" -> 3, - "total_price" -> 15.0 + "id" -> 2, + "oi.product" -> "C", + "oi.quantity" -> 3, + "total_price" -> 15.0 ) ) ) From 86ae09b808c3c7909a2745615afa0db6af12bdf0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Manciot?= Date: Sat, 30 May 2026 15:48:44 +0200 Subject: [PATCH 2/4] refactor(core): extend no-re-parse path to deleteByQuery and insertByQuery MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Continuation of the issue #92 fix. The same audit applied to DELETE and INSERT — neither has the silent-data-corruption bug UPDATE had (Where.sql and Insert.sql round-trip safely through the parser), but both paid the same double-parse cost: GatewayApi.run was emitting `delete.sql` / `insert.sql` and IndicesApi was re-parsing the result. Mirror the updateByQuery refactor: - `IndicesApi` - Add `deleteByQuery(index, delete: Delete, refresh)` overload that skips parseSqlQueryForDeletion. Shared `runDeleteByQuery` helper holds the post-parse path (exists-check → openIfNeeded → execute → restore). Small `validateDeleteIndex` and `finalizeDeleteByQuery` helpers keep both entry points readable. - Add `insertByQuery(index, insert: Insert, refresh)` overload that skips parseInsertQuery. Shared `runInsertByQuery` helper holds the post-parse path (load index metadata → validate ON CONFLICT → validate AS SELECT columns → derive bulk options → build source → bulk insert). `validateInsertIndex` and `insertResultFinalize` factor out the index check and the recover-to-ElasticFailure plumbing. - `GatewayApi.run` - `case delete: Delete` and `case insert: Insert` now call the AST overloads directly. Each SQL DML statement is parsed exactly once on its way through the dispatcher. The other `insertByQuery` call site (INSERT … AS SELECT constructed from scratch for index population) keeps the string overload — it isn't a round-trip. - `ElasticClientDelegator` and `MetricsElasticClient` - Forward both new overloads (Delete / Insert imports added). `BulkResult` is now imported explicitly in IndicesApi so the `runInsertByQuery` helper can annotate its `Future[BulkResult]` return type without leaking it through the existing public API. No behaviour change for callers — the string overloads still exist and do the exact same work — the AST overloads just skip one parse. core compiles clean; ParserSpec UPDATE tests (including the round-trip regression from the previous commit) all pass. --- .../client/ElasticClientDelegator.scala | 10 + .../elastic/client/GatewayApi.scala | 6 +- .../elastic/client/IndicesApi.scala | 226 +++++++++++++----- .../client/metrics/MetricsElasticClient.scala | 22 +- 4 files changed, 199 insertions(+), 65 deletions(-) diff --git a/core/src/main/scala/app/softnetwork/elastic/client/ElasticClientDelegator.scala b/core/src/main/scala/app/softnetwork/elastic/client/ElasticClientDelegator.scala index 3da6cd04..f44e828f 100644 --- a/core/src/main/scala/app/softnetwork/elastic/client/ElasticClientDelegator.scala +++ b/core/src/main/scala/app/softnetwork/elastic/client/ElasticClientDelegator.scala @@ -26,6 +26,8 @@ import app.softnetwork.elastic.schema.{Index, IndexMappings} import app.softnetwork.elastic.sql.policy.{EnrichPolicy, EnrichPolicyTask} import app.softnetwork.elastic.sql.{query, schema, PainlessContextType} import app.softnetwork.elastic.sql.query.{ + Delete, + Insert, SQLAggregation, SearchStatement, SelectStatement, @@ -213,6 +215,9 @@ trait ElasticClientDelegator extends ElasticClientApi with BulkTypes { override def deleteByQuery(index: String, query: String, refresh: Boolean): ElasticResult[Long] = delegate.deleteByQuery(index, query, refresh) + override def deleteByQuery(index: String, delete: Delete, refresh: Boolean): ElasticResult[Long] = + delegate.deleteByQuery(index, delete, refresh) + override def isIndexClosed(index: String): ElasticResult[Boolean] = delegate.isIndexClosed(index) @@ -261,6 +266,11 @@ trait ElasticClientDelegator extends ElasticClientApi with BulkTypes { ): Future[ElasticResult[DmlResult]] = delegate.insertByQuery(index, query, refresh) + override def insertByQuery(index: String, insert: Insert, refresh: Boolean)(implicit + system: ActorSystem + ): Future[ElasticResult[DmlResult]] = + delegate.insertByQuery(index, insert, refresh) + /** Load the schema for the provided index. * * @param index diff --git a/core/src/main/scala/app/softnetwork/elastic/client/GatewayApi.scala b/core/src/main/scala/app/softnetwork/elastic/client/GatewayApi.scala index 0badaaf8..6f2059b2 100644 --- a/core/src/main/scala/app/softnetwork/elastic/client/GatewayApi.scala +++ b/core/src/main/scala/app/softnetwork/elastic/client/GatewayApi.scala @@ -195,7 +195,8 @@ class DmlExecutor(api: IndicesApi, logger: Logger) extends Executor[DmlStatement implicit val ec: ExecutionContext = system.dispatcher statement match { case delete: Delete => - api.deleteByQuery(delete.table.name, delete.sql) match { + // Pass the AST directly — avoids the AST → Delete.sql → re-parse round-trip. + api.deleteByQuery(delete.table.name, delete, refresh = true) match { case ElasticSuccess(count) => logger.info(s"✅ Deleted $count documents from ${delete.table.name}.") Future.successful(ElasticResult.success(DmlResult(deleted = count))) @@ -221,7 +222,8 @@ class DmlExecutor(api: IndicesApi, logger: Logger) extends Executor[DmlStatement ) } case insert: Insert => - api.insertByQuery(insert.table, insert.sql).map { + // Pass the AST directly — avoids the AST → Insert.sql → re-parse round-trip. + api.insertByQuery(insert.table, insert, refresh = true).map { case success @ ElasticSuccess(res) => logger.info(s"✅ Inserted ${res.inserted} documents into ${insert.table}.") success diff --git a/core/src/main/scala/app/softnetwork/elastic/client/IndicesApi.scala b/core/src/main/scala/app/softnetwork/elastic/client/IndicesApi.scala index 7a51c363..eee7a33f 100644 --- a/core/src/main/scala/app/softnetwork/elastic/client/IndicesApi.scala +++ b/core/src/main/scala/app/softnetwork/elastic/client/IndicesApi.scala @@ -18,7 +18,7 @@ package app.softnetwork.elastic.client import akka.actor.ActorSystem import akka.stream.scaladsl.Source -import app.softnetwork.elastic.client.bulk.BulkOptions +import app.softnetwork.elastic.client.bulk.{BulkOptions, BulkResult} import app.softnetwork.elastic.client.result._ import app.softnetwork.elastic.schema.Index import app.softnetwork.elastic.sql.PainlessContextType @@ -756,25 +756,78 @@ trait IndicesApi extends ElasticClientHelpers { query: String, refresh: Boolean = true ): ElasticResult[Long] = { + val result = for { + _ <- validateDeleteIndex(index) + jsonQuery <- parseQueryForDeletion(index, query) + deleted <- runDeleteByQuery(index, jsonQuery, refresh) + } yield deleted + finalizeDeleteByQuery(index, result) + } + /** Delete documents using an already-parsed [[Delete]] AST. Skips the SQL parser entirely. + * + * Same motivation as the [[updateByQuery]] AST overload: avoid the AST → `Delete.sql` → re-parse + * round-trip when [[GatewayApi.run]] already has the AST. + */ + def deleteByQuery( + index: String, + delete: Delete, + refresh: Boolean + ): ElasticResult[Long] = { val result = for { - // 1. Validate index name - _ <- validateIndexName(index) - .toLeft(()) - .left - .map(err => - err.copy( - operation = Some("deleteByQuery"), - statusCode = Some(400), - index = Some(index), - message = s"Invalid index: ${err.message}" + _ <- validateDeleteIndex(index) + _ <- + if (delete.table.name != index) + Left( + sqlErrorFor( + operation = "deleteByQuery", + index = index, + message = + s"SQL query index '${delete.table.name}' does not match provided index '$index'" + ) ) - ) + else Right(()) + jsonQuery = delete.where match { + case None => + logger.info( + s"SQL delete query has no WHERE clause, deleting all documents from index '$index'" + ) + """{"query": {"match_all": {}}}""" + case Some(where) => + implicit val timestamp: Long = System.currentTimeMillis() + val search: String = + SingleSearch( + from = From(tables = Seq(delete.table)), + where = Some(where), + deleteByQuery = true + ) + logger.info(s"✅ Converted SQL delete query to search for deleteByQuery: $search") + search + } + deleted <- runDeleteByQuery(index, jsonQuery, refresh) + } yield deleted + finalizeDeleteByQuery(index, result) + } - // 2. Parse query (SQL or JSON) - jsonQuery <- parseQueryForDeletion(index, query) + private def validateDeleteIndex(index: String): Either[ElasticError, Unit] = + validateIndexName(index) + .toLeft(()) + .left + .map(err => + err.copy( + operation = Some("deleteByQuery"), + statusCode = Some(400), + index = Some(index), + message = s"Invalid index: ${err.message}" + ) + ) - // 3. Check index exists + private def runDeleteByQuery( + index: String, + jsonQuery: String, + refresh: Boolean + ): Either[ElasticError, Long] = + for { _ <- indexExists(index, pattern = false) match { case ElasticSuccess(true) => Right(()) case ElasticSuccess(false) => @@ -788,30 +841,25 @@ trait IndicesApi extends ElasticClientHelpers { ) case ElasticFailure(err) => Left(err) } - - // 4. Open index if needed tuple <- openIfNeeded(index) (_, restore) = tuple - - // 5. Execute delete-by-query deleted <- executeDeleteByQuery(index, jsonQuery, refresh).toEither - - // 6. Restore state _ <- restore().toEither.left.map { restoreErr => logger.warn(s"❌ Failed to restore index state for '$index': ${restoreErr.message}") restoreErr } - } yield deleted - result match { - case Right(count) => - logger.info(s"✅ Deleted $count documents from index '$index'") - ElasticSuccess(count) - case Left(err) => - logger.error(s"❌ Failed to delete by query on index '$index': ${err.message}") - ElasticFailure(err) - } + private def finalizeDeleteByQuery( + index: String, + result: Either[ElasticError, Long] + ): ElasticResult[Long] = result match { + case Right(count) => + logger.info(s"✅ Deleted $count documents from index '$index'") + ElasticSuccess(count) + case Left(err) => + logger.error(s"❌ Failed to delete by query on index '$index': ${err.message}") + ElasticFailure(err) } /** Update documents by query from an index. @@ -1023,26 +1071,100 @@ trait IndicesApi extends ElasticClientHelpers { refresh: Boolean = true )(implicit system: ActorSystem): Future[ElasticResult[DmlResult]] = { implicit val ec: ExecutionContext = system.dispatcher + val result = for { + _ <- Future.fromTry(validateInsertIndex(index).toTry) + parsed <- parseInsertQuery(index, query).toFuture + out <- runInsertByQuery(index, parsed, refresh) + } yield out + insertResultFinalize(index, result) + } + /** Insert documents using an already-parsed [[Insert]] AST. Skips the SQL parser entirely. + * + * Same motivation as the [[updateByQuery]] AST overload — avoid the AST → `Insert.sql` → + * re-parse round-trip when [[GatewayApi.run]] already has the AST. + */ + def insertByQuery( + index: String, + insert: Insert, + refresh: Boolean + )(implicit system: ActorSystem): Future[ElasticResult[DmlResult]] = { + implicit val ec: ExecutionContext = system.dispatcher val result = for { - // 1. Validate index - _ <- Future.fromTry( - validateIndexName(index) - .toLeft(()) - .left - .map(err => - err.copy( + _ <- Future.fromTry(validateInsertIndex(index).toTry) + _ <- Future.fromTry { + if (insert.table != index) + Left( + ElasticError( operation = Some("insertByQuery"), statusCode = Some(400), - index = Some(index) + index = Some(index), + message = s"SQL table '${insert.table}' does not match index '$index'" ) - ) - .toTry + ).toTry + else + insert.validate() match { + case Left(msg) => + Left( + ElasticError( + operation = Some("insertByQuery"), + statusCode = Some(400), + index = Some(index), + message = msg + ) + ).toTry + case Right(_) => Right(()).toTry + } + } + out <- runInsertByQuery(index, insert, refresh) + } yield out + insertResultFinalize(index, result) + } + + private def validateInsertIndex(index: String): Either[ElasticError, Unit] = + validateIndexName(index) + .toLeft(()) + .left + .map(err => + err.copy( + operation = Some("insertByQuery"), + statusCode = Some(400), + index = Some(index) + ) ) - // 2. Parse SQL INSERT - parsed <- parseInsertQuery(index, query).toFuture + private def insertResultFinalize( + index: String, + result: Future[BulkResult] + )(implicit ec: ExecutionContext): Future[ElasticResult[DmlResult]] = + result + .map(r => ElasticSuccess(DmlResult(inserted = r.successCount, rejected = r.failedCount))) + .recover { + case e: ElasticError => + ElasticFailure( + e.copy( + operation = Some("insertByQuery"), + index = Some(index) + ) + ) + case e => + ElasticFailure( + ElasticError( + message = e.getMessage, + operation = Some("insertByQuery"), + index = Some(index) + ) + ) + } + /** Shared post-parse path for both [[insertByQuery]] overloads. */ + private def runInsertByQuery( + index: String, + parsed: Insert, + refresh: Boolean + )(implicit system: ActorSystem): Future[BulkResult] = { + implicit val ec: ExecutionContext = system.dispatcher + for { // 3. Load index metadata idx <- Future.fromTry(getIndex(index).toEither.flatMap { case Some(i) => Right(i.schema) @@ -1238,26 +1360,6 @@ trait IndicesApi extends ElasticClientHelpers { )(BulkOptions(defaultIndex = index, disableRefresh = !refresh), system) } yield bulkResult - - result - .map(r => ElasticSuccess(DmlResult(inserted = r.successCount, rejected = r.failedCount))) - .recover { - case e: ElasticError => - ElasticFailure( - e.copy( - operation = Some("insertByQuery"), - index = Some(index) - ) - ) - case e => - ElasticFailure( - ElasticError( - message = e.getMessage, - operation = Some("insertByQuery"), - index = Some(index) - ) - ) - } } /** Copy documents from files into an index. diff --git a/core/src/main/scala/app/softnetwork/elastic/client/metrics/MetricsElasticClient.scala b/core/src/main/scala/app/softnetwork/elastic/client/metrics/MetricsElasticClient.scala index ca30fdcc..34b12e3a 100644 --- a/core/src/main/scala/app/softnetwork/elastic/client/metrics/MetricsElasticClient.scala +++ b/core/src/main/scala/app/softnetwork/elastic/client/metrics/MetricsElasticClient.scala @@ -34,7 +34,14 @@ import app.softnetwork.elastic.client.scroll._ import app.softnetwork.elastic.schema.{Index, IndexMappings} import app.softnetwork.elastic.sql.policy.{EnrichPolicy, EnrichPolicyTask} import app.softnetwork.elastic.sql.{query, schema} -import app.softnetwork.elastic.sql.query.{SQLAggregation, SearchStatement, SelectStatement, Update} +import app.softnetwork.elastic.sql.query.{ + Delete, + Insert, + SQLAggregation, + SearchStatement, + SelectStatement, + Update +} import app.softnetwork.elastic.sql.schema.{Schema, TableAlias} import app.softnetwork.elastic.sql.transform.{ TransformConfig, @@ -199,6 +206,11 @@ class MetricsElasticClient( delegate.deleteByQuery(index, query, refresh) } + override def deleteByQuery(index: String, delete: Delete, refresh: Boolean): ElasticResult[Long] = + measureResult("deleteByQuery", Some(index)) { + delegate.deleteByQuery(index, delete, refresh) + } + override def isIndexClosed(index: String): ElasticResult[Boolean] = measureResult("isIndexClosed", Some(index)) { delegate.isIndexClosed(index) @@ -256,6 +268,14 @@ class MetricsElasticClient( }(system.dispatcher) } + override def insertByQuery(index: String, insert: Insert, refresh: Boolean)(implicit + system: ActorSystem + ): Future[ElasticResult[DmlResult]] = { + measureAsync("insertByQuery", Some(index)) { + delegate.insertByQuery(index, insert, refresh) + }(system.dispatcher) + } + /** Load the schema for the provided index. * * @param index From 193298392b926d3e49ad0ad17b952c8286519966 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Manciot?= Date: Sat, 30 May 2026 16:28:29 +0200 Subject: [PATCH 3/4] test(licensing): expose resolveMode for cross-repo Driver-mode verification Narrow `LicenseRefreshStrategyFactory.resolveMode` from `private` to `private[licensing]` so the licensing module can verify the resolution contract directly, without going through SPI resolution. Add `LicenseRefreshStrategyFactoryDriverModeSpec` covering both branches: `refresh.enabled = false` -> Driver, `refresh.enabled = true` -> LongRunning. Companion to softclient4es-jdbc Story 10.3 (Review patch D2) - JDBC's own integration test cannot verify mode resolution end-to-end because its priority-1 TestLicenseManagerSpi ignores the mode parameter. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../LicenseRefreshStrategyFactory.scala | 8 ++- ...RefreshStrategyFactoryDriverModeSpec.scala | 61 +++++++++++++++++++ 2 files changed, 67 insertions(+), 2 deletions(-) create mode 100644 licensing/src/test/scala/app/softnetwork/elastic/licensing/LicenseRefreshStrategyFactoryDriverModeSpec.scala diff --git a/licensing/src/main/scala/app/softnetwork/elastic/licensing/LicenseRefreshStrategyFactory.scala b/licensing/src/main/scala/app/softnetwork/elastic/licensing/LicenseRefreshStrategyFactory.scala index c03ead37..9c4e1341 100644 --- a/licensing/src/main/scala/app/softnetwork/elastic/licensing/LicenseRefreshStrategyFactory.scala +++ b/licensing/src/main/scala/app/softnetwork/elastic/licensing/LicenseRefreshStrategyFactory.scala @@ -85,8 +85,12 @@ object LicenseRefreshStrategyFactory extends LazyLogging { } } - /** Resolve LicenseMode from config. refreshEnabled=true -> LongRunning, else -> Driver. */ - private def resolveMode(config: Config): Option[LicenseMode] = { + /** Resolve LicenseMode from config. refreshEnabled=true -> LongRunning, else -> Driver. + * + * `private[licensing]` so cross-repo licensing-mode tests can verify the resolution rule + * directly without going through SPI resolution (Story 10.3 Review patch D2). + */ + private[licensing] def resolveMode(config: Config): Option[LicenseMode] = { val licenseConfig = LicenseConfig.load(config) if (licenseConfig.refreshEnabled) Some(LicenseMode.LongRunning) else Some(LicenseMode.Driver) diff --git a/licensing/src/test/scala/app/softnetwork/elastic/licensing/LicenseRefreshStrategyFactoryDriverModeSpec.scala b/licensing/src/test/scala/app/softnetwork/elastic/licensing/LicenseRefreshStrategyFactoryDriverModeSpec.scala new file mode 100644 index 00000000..c0816455 --- /dev/null +++ b/licensing/src/test/scala/app/softnetwork/elastic/licensing/LicenseRefreshStrategyFactoryDriverModeSpec.scala @@ -0,0 +1,61 @@ +/* + * Copyright 2026 SOFTNETWORK + * + * Licensed under the Elastic License 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.elastic.co/licensing/elastic-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package app.softnetwork.elastic.licensing + +import com.typesafe.config.ConfigFactory +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +/** Verifies the mode-resolution contract that the JDBC driver (Story 10.3) relies on for AC2: + * overlay `softclient4es.license.refresh-enabled = false` onto the merged HOCON config and + * [[LicenseRefreshStrategyFactory.resolveMode]] returns `Some(LicenseMode.Driver)`. + * + * The companion integration test in `JdbcIntegrationSpec` cannot verify this end-to-end — + * the JDBC test classpath registers a priority-1 `TestLicenseManagerSpi` that ignores the + * `mode` parameter, so a strategy-class assertion there is vacuous. `resolveMode` is pure + * on the config (no SPI involvement), so a direct call here is the authoritative check. + */ +class LicenseRefreshStrategyFactoryDriverModeSpec extends AnyFlatSpec with Matchers { + + behavior of "LicenseRefreshStrategyFactory.resolveMode" + + it should "resolve LicenseMode.Driver when softclient4es.license.refresh.enabled = false" in { + // Same HOCON merge that ElasticConnection.licenseStrategy / ConnectionUrl.toConfig + // build: an inline overlay with refresh.enabled=false on top of ConfigFactory.load + // (reference.conf defaults). The HOCON path is `softclient4es.license.refresh.enabled` + // (with a dot between refresh and enabled) — that is what `LicenseConfig.load` reads + // via `license.getBoolean("refresh.enabled")`. + val cfg = ConfigFactory + .parseString("softclient4es.license.refresh.enabled = false") + .withFallback(ConfigFactory.load()) + .resolve() + + LicenseRefreshStrategyFactory.resolveMode(cfg) shouldBe Some(LicenseMode.Driver) + } + + it should "resolve LicenseMode.LongRunning when softclient4es.license.refresh.enabled = true" in { + // Symmetric verification: the same factory hook used by sidecar / federation startup + // (which leaves refresh.enabled at its true default) must NOT be silently downgraded + // to Driver mode if someone refactors resolveMode. + val cfg = ConfigFactory + .parseString("softclient4es.license.refresh.enabled = true") + .withFallback(ConfigFactory.load()) + .resolve() + + LicenseRefreshStrategyFactory.resolveMode(cfg) shouldBe Some(LicenseMode.LongRunning) + } +} From 56a0bd73dfbfab3080257d56d8178be6852d6704 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Manciot?= Date: Sun, 31 May 2026 07:10:33 +0200 Subject: [PATCH 4/4] fix formatting --- .../LicenseRefreshStrategyFactoryDriverModeSpec.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/licensing/src/test/scala/app/softnetwork/elastic/licensing/LicenseRefreshStrategyFactoryDriverModeSpec.scala b/licensing/src/test/scala/app/softnetwork/elastic/licensing/LicenseRefreshStrategyFactoryDriverModeSpec.scala index c0816455..a92358d8 100644 --- a/licensing/src/test/scala/app/softnetwork/elastic/licensing/LicenseRefreshStrategyFactoryDriverModeSpec.scala +++ b/licensing/src/test/scala/app/softnetwork/elastic/licensing/LicenseRefreshStrategyFactoryDriverModeSpec.scala @@ -24,10 +24,10 @@ import org.scalatest.matchers.should.Matchers * overlay `softclient4es.license.refresh-enabled = false` onto the merged HOCON config and * [[LicenseRefreshStrategyFactory.resolveMode]] returns `Some(LicenseMode.Driver)`. * - * The companion integration test in `JdbcIntegrationSpec` cannot verify this end-to-end — - * the JDBC test classpath registers a priority-1 `TestLicenseManagerSpi` that ignores the - * `mode` parameter, so a strategy-class assertion there is vacuous. `resolveMode` is pure - * on the config (no SPI involvement), so a direct call here is the authoritative check. + * The companion integration test in `JdbcIntegrationSpec` cannot verify this end-to-end — the JDBC + * test classpath registers a priority-1 `TestLicenseManagerSpi` that ignores the `mode` parameter, + * so a strategy-class assertion there is vacuous. `resolveMode` is pure on the config (no SPI + * involvement), so a direct call here is the authoritative check. */ class LicenseRefreshStrategyFactoryDriverModeSpec extends AnyFlatSpec with Matchers {