From 8be56bdaca359ba9e770c70c194e9a8dbfd12a3f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Manciot?= Date: Mon, 1 Jun 2026 14:02:05 +0200 Subject: [PATCH 1/2] feat(core): add bulkStream + bulkStreamWithResult for direct BulkItem ingest MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stage 1 of the CTAS / INSERT INTO … SELECT-with-JOIN design Adds two public methods to BulkApi: bulkStream(items: Source[BulkItem, NotUsed]) : Source[Either[FailedDocument, SuccessfulDocument], NotUsed] bulkStreamWithResult(items, callbacks): Future[BulkResult] --- .../softnetwork/elastic/client/BulkApi.scala | 111 ++++++++++++++++-- 1 file changed, 104 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/app/softnetwork/elastic/client/BulkApi.scala b/core/src/main/scala/app/softnetwork/elastic/client/BulkApi.scala index 6973c95e..31d75b42 100644 --- a/core/src/main/scala/app/softnetwork/elastic/client/BulkApi.scala +++ b/core/src/main/scala/app/softnetwork/elastic/client/BulkApi.scala @@ -335,13 +335,8 @@ trait BulkApi extends BulkTypes with ElasticClientHelpers { system: ActorSystem ): Source[Either[FailedDocument, SuccessfulDocument], NotUsed] = { - implicit val ec: ExecutionContext = system.dispatcher - - var metrics = BulkMetrics() - - items - // ✅ Transformation en BulkItem - .map { item => + bulkStream( + items.map { item => toBulkItem( toDocument, indexKey, @@ -354,6 +349,42 @@ trait BulkApi extends BulkTypes with ElasticClientHelpers { item ) } + ) + } + + /** Streaming bulk pipeline keyed directly on [[BulkItem]]. + * + * Same chunking / backpressure / per-item retry pipeline as [[bulkSource]], but accepts a + * pre-built [[BulkItem]] stream — no `D => String` JSON conversion, no `indexKey`/`idKey` + * parsing back from the document. Use this from callers that already have the document content + * split out (target index, optional `_id` derived from a primary key, action type), e.g. the + * CTAS / INSERT INTO … SELECT-with-JOIN executors built on top of Arrow Flight SQL or JDBC/ADBC. + * + * Arrow / Parquet / DuckDB are deliberately kept out of this signature so the method can stay in + * core. + * + * @param items + * the [[BulkItem]] stream to bulk-write + * @param bulkOptions + * chunking, retry, parallelism settings (reuses the existing [[BulkOptions]]) + * @param system + * Akka actor system for materialization + * @return + * a source emitting `Right(SuccessfulDocument)` / `Left(FailedDocument)` for each item, in + * completion order + */ + def bulkStream( + items: Source[BulkItem, NotUsed] + )(implicit + bulkOptions: BulkOptions, + system: ActorSystem + ): Source[Either[FailedDocument, SuccessfulDocument], NotUsed] = { + + implicit val ec: ExecutionContext = system.dispatcher + + var metrics = BulkMetrics() + + items // ✅ Settings management (refresh, replicas) .via( BulkSettings[BulkItem](bulkOptions.disableRefresh)( @@ -398,6 +429,72 @@ trait BulkApi extends BulkTypes with ElasticClientHelpers { } } + /** Materialized counterpart of [[bulkStream]] — runs the stream to completion and returns a + * [[BulkResult]] aggregating successes, failures, indices touched, and metrics. Mirrors + * [[bulkWithResult]] but skips the `D => String` conversion. Triggers per-index refresh on + * completion unless `bulkOptions.disableRefresh` is set. + * + * @param items + * the [[BulkItem]] stream to bulk-write + * @param callbacks + * per-event callbacks (`onSuccess`, `onFailure`, `onComplete`) + * @param bulkOptions + * chunking, retry, parallelism settings + * @param system + * Akka actor system for materialization + * @return + * future of the aggregated [[BulkResult]] + */ + def bulkStreamWithResult( + items: Source[BulkItem, NotUsed], + callbacks: BulkCallbacks = BulkCallbacks.default + )(implicit + bulkOptions: BulkOptions, + system: ActorSystem + ): Future[BulkResult] = { + + implicit val materializer: Materializer = Materializer(system) + implicit val ec: ExecutionContext = system.dispatcher + + val startTime = System.currentTimeMillis() + var metrics = BulkMetrics(startTime = startTime) + + bulkStream(items) + .runWith(Sink.fold((Set.empty[String], Seq.empty[FailedDocument], Set.empty[String])) { + case ((successIds, failedDocs, indices), Right(successfulDoc)) => + callbacks.onSuccess(successfulDoc.id, successfulDoc.index) + (successIds + successfulDoc.id, failedDocs, indices + successfulDoc.index) + + case ((successIds, failedDocs, indices), Left(failed)) => + callbacks.onFailure(failed) + metrics = metrics.addFailure(failed.error) + (successIds, failedDocs :+ failed, indices + failed.index) + }) + .map { case (successIds, failedDocs, indices) => + metrics = metrics.copy( + endTime = Some(System.currentTimeMillis()), + totalDocuments = successIds.size + failedDocs.size + ) + + val result = BulkResult( + successCount = successIds.size, + successIds = successIds, + failedCount = failedDocs.size, + failedDocuments = failedDocs, + indices = indices, + metrics = metrics.complete + ) + + callbacks.onComplete(result) + + if (!bulkOptions.disableRefresh) { + indices.foreach(refresh) + } + + result + } + } + /** Backward compatible API (old signature). * * @deprecated From b8f94f059929734d8d3d931e1fe0d0250cf3c187 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Manciot?= Date: Mon, 1 Jun 2026 21:41:09 +0200 Subject: [PATCH 2/2] fix(core): project bare `.asInstanceOf` in delegator + metrics bulk/search paths MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `ElasticClientDelegator.toBulkElasticAction`, `extractBulkResults`, `actionToBulkItem`, `bulkFlow`, and `MetricsElasticClient.singleSearchAsync` / `multiSearchAsync` cast the result of `delegate.*` through bare `.asInstanceOf` (no type argument). Scala infers `Nothing`, and the synthetic cast at the call site explodes the moment the delegated pipeline is exercised end-to-end (`BulkOperation → scala.runtime.Nothing$` ClassCastException). Latent — there was no real caller of `bulkStream`/`bulkStreamWithResult` through the metrics/monitored delegator chain until the sidecar row-1 INSERT/CTAS executor in softclient4es-arrow added one. Alias `delegate` to a stable local `val d` (the field is `def`, not `val`, so it isn't a stable identifier on its own) and project to the right path-dependent type — `d.BulkActionType`, `d.BulkResultType`, or this trait's own `Future[ElasticResult[ElasticResponse]]`. All abstract A/R types still erase to Object, so the runtime cast is a no-op. --- .../client/ElasticClientDelegator.scala | 38 +++++++++++++++---- .../client/metrics/MetricsElasticClient.scala | 8 +++- 2 files changed, 37 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/app/softnetwork/elastic/client/ElasticClientDelegator.scala b/core/src/main/scala/app/softnetwork/elastic/client/ElasticClientDelegator.scala index f44e828f..2b9e8f8a 100644 --- a/core/src/main/scala/app/softnetwork/elastic/client/ElasticClientDelegator.scala +++ b/core/src/main/scala/app/softnetwork/elastic/client/ElasticClientDelegator.scala @@ -1644,8 +1644,19 @@ trait ElasticClientDelegator extends ElasticClientApi with BulkTypes { override private[client] def toBulkAction(bulkItem: BulkItem): BulkActionType = delegate.toBulkAction(bulkItem).asInstanceOf[BulkActionType] - override private[client] implicit def toBulkElasticAction(a: BulkActionType): BulkElasticAction = - delegate.toBulkElasticAction(a.asInstanceOf) + override private[client] implicit def toBulkElasticAction( + a: BulkActionType + ): BulkElasticAction = { + // `a.asInstanceOf` (no type param) infers `Nothing` and the runtime cast blows up the moment + // the delegated `bulkStream`/`bulkStreamWithResult` path is exercised — `delegate`'s + // `BulkActionType` is abstract here, so we must explicitly project to it. (This was a latent + // bug — there was no real caller of `bulkStream` through the delegator chain until the + // sidecar's CTAS / INSERT-SELECT row-1 executor in softclient4es-arrow added one.) + // `delegate` is a `def`, so alias it into a stable `val` first to make the path-dependent + // type `d.BulkActionType` legal. + val d = delegate + d.toBulkElasticAction(a.asInstanceOf[d.BulkActionType]) + } /** Basic flow for executing a bulk action. This method must be implemented by concrete classes * depending on the Elasticsearch version and client used. @@ -1659,7 +1670,13 @@ trait ElasticClientDelegator extends ElasticClientApi with BulkTypes { bulkOptions: BulkOptions, system: ActorSystem ): Flow[Seq[BulkActionType], BulkResultType, NotUsed] = - delegate.bulkFlow(bulkOptions, system).asInstanceOf + // Same pattern as `toBulkElasticAction`/`actionToBulkItem` (~line 1648/1697): bare + // `.asInstanceOf` infers `Nothing`, so the synthetic cast at the call site (in + // `BulkApi.balancedBulkFlow`) tries `Flow → Nothing$` and CCEs. Project to this trait's + // own A/R abstract types — both erase to Object, so the runtime cast is a no-op. + delegate + .bulkFlow(bulkOptions, system) + .asInstanceOf[Flow[Seq[BulkActionType], BulkResultType, NotUsed]] /** Convert a BulkResultType into individual results. This method must extract the successes and * failures from the ES response. @@ -1672,12 +1689,19 @@ trait ElasticClientDelegator extends ElasticClientApi with BulkTypes { override private[client] def extractBulkResults( result: BulkResultType, originalBatch: Seq[BulkItem] - ): Seq[Either[FailedDocument, SuccessfulDocument]] = - delegate.extractBulkResults(result.asInstanceOf, originalBatch) + ): Seq[Either[FailedDocument, SuccessfulDocument]] = { + // Same pattern as `toBulkElasticAction` (see ~line 1648): bare `.asInstanceOf` infers + // `Nothing`, which blows up the moment the delegated bulk pipeline runs end-to-end. Project + // through a stable alias of `delegate` to the right path-dependent type. + val d = delegate + d.extractBulkResults(result.asInstanceOf[d.BulkResultType], originalBatch) + } /** Conversion BulkActionType -> BulkItem */ - override private[client] def actionToBulkItem(action: BulkActionType): BulkItem = - delegate.actionToBulkItem(action.asInstanceOf) + override private[client] def actionToBulkItem(action: BulkActionType): BulkItem = { + val d = delegate + d.actionToBulkItem(action.asInstanceOf[d.BulkActionType]) + } // ==================== PipelineApi (delegate) ==================== diff --git a/core/src/main/scala/app/softnetwork/elastic/client/metrics/MetricsElasticClient.scala b/core/src/main/scala/app/softnetwork/elastic/client/metrics/MetricsElasticClient.scala index 34b12e3a..97913391 100644 --- a/core/src/main/scala/app/softnetwork/elastic/client/metrics/MetricsElasticClient.scala +++ b/core/src/main/scala/app/softnetwork/elastic/client/metrics/MetricsElasticClient.scala @@ -937,9 +937,12 @@ class MetricsElasticClient( context: ConversionContext ): Future[ElasticResult[ElasticResponse]] = measureAsync("searchAsync", Some(elasticQuery.indices.mkString(","))) { + // Bare `.asInstanceOf` infers `Nothing` — same latent shape as the bulk-pipeline fixes in + // [[ElasticClientDelegator]]. Project to this trait's `ElasticResponse` so the cast is a + // no-op at runtime (both sides erase to Object) instead of `Future → Nothing$`. delegate .singleSearchAsync(elasticQuery, fieldAliases, aggregations, fields, nestedHits) - .asInstanceOf + .asInstanceOf[Future[ElasticResult[ElasticResponse]]] } /** Asynchronous multi-search with Elasticsearch queries. @@ -964,9 +967,10 @@ class MetricsElasticClient( context: ConversionContext ): Future[ElasticResult[ElasticResponse]] = measureAsync("multisearchAsync") { + // Same latent-`Nothing`-cast fix as `singleSearchAsync` above. delegate .multiSearchAsync(elasticQueries, fieldAliases, aggregations, fields, nestedHits) - .asInstanceOf + .asInstanceOf[Future[ElasticResult[ElasticResponse]]] } /** Searches and converts results into typed entities.