Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 104 additions & 7 deletions core/src/main/scala/app/softnetwork/elastic/client/BulkApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -335,13 +335,8 @@ trait BulkApi extends BulkTypes with ElasticClientHelpers {
system: ActorSystem
): Source[Either[FailedDocument, SuccessfulDocument], NotUsed] = {

implicit val ec: ExecutionContext = system.dispatcher

var metrics = BulkMetrics()

items
// ✅ Transformation en BulkItem
.map { item =>
bulkStream(
items.map { item =>
toBulkItem(
toDocument,
indexKey,
Expand All @@ -354,6 +349,42 @@ trait BulkApi extends BulkTypes with ElasticClientHelpers {
item
)
}
)
}

/** Streaming bulk pipeline keyed directly on [[BulkItem]].
*
* Same chunking / backpressure / per-item retry pipeline as [[bulkSource]], but accepts a
* pre-built [[BulkItem]] stream — no `D => String` JSON conversion, no `indexKey`/`idKey`
* parsing back from the document. Use this from callers that already have the document content
* split out (target index, optional `_id` derived from a primary key, action type), e.g. the
* CTAS / INSERT INTO … SELECT-with-JOIN executors built on top of Arrow Flight SQL or JDBC/ADBC.
*
* Arrow / Parquet / DuckDB are deliberately kept out of this signature so the method can stay in
* core.
*
* @param items
* the [[BulkItem]] stream to bulk-write
* @param bulkOptions
* chunking, retry, parallelism settings (reuses the existing [[BulkOptions]])
* @param system
* Akka actor system for materialization
* @return
* a source emitting `Right(SuccessfulDocument)` / `Left(FailedDocument)` for each item, in
* completion order
*/
def bulkStream(
items: Source[BulkItem, NotUsed]
)(implicit
bulkOptions: BulkOptions,
system: ActorSystem
): Source[Either[FailedDocument, SuccessfulDocument], NotUsed] = {

implicit val ec: ExecutionContext = system.dispatcher

var metrics = BulkMetrics()

items
// ✅ Settings management (refresh, replicas)
.via(
BulkSettings[BulkItem](bulkOptions.disableRefresh)(
Expand Down Expand Up @@ -398,6 +429,72 @@ trait BulkApi extends BulkTypes with ElasticClientHelpers {
}
}

/** Materialized counterpart of [[bulkStream]] — runs the stream to completion and returns a
* [[BulkResult]] aggregating successes, failures, indices touched, and metrics. Mirrors
* [[bulkWithResult]] but skips the `D => String` conversion. Triggers per-index refresh on
* completion unless `bulkOptions.disableRefresh` is set.
*
* @param items
* the [[BulkItem]] stream to bulk-write
* @param callbacks
* per-event callbacks (`onSuccess`, `onFailure`, `onComplete`)
* @param bulkOptions
* chunking, retry, parallelism settings
* @param system
* Akka actor system for materialization
* @return
* future of the aggregated [[BulkResult]]
*/
def bulkStreamWithResult(
items: Source[BulkItem, NotUsed],
callbacks: BulkCallbacks = BulkCallbacks.default
)(implicit
bulkOptions: BulkOptions,
system: ActorSystem
): Future[BulkResult] = {

implicit val materializer: Materializer = Materializer(system)
implicit val ec: ExecutionContext = system.dispatcher

val startTime = System.currentTimeMillis()
var metrics = BulkMetrics(startTime = startTime)

bulkStream(items)
.runWith(Sink.fold((Set.empty[String], Seq.empty[FailedDocument], Set.empty[String])) {
case ((successIds, failedDocs, indices), Right(successfulDoc)) =>
callbacks.onSuccess(successfulDoc.id, successfulDoc.index)
(successIds + successfulDoc.id, failedDocs, indices + successfulDoc.index)

case ((successIds, failedDocs, indices), Left(failed)) =>
callbacks.onFailure(failed)
metrics = metrics.addFailure(failed.error)
(successIds, failedDocs :+ failed, indices + failed.index)
})
.map { case (successIds, failedDocs, indices) =>
metrics = metrics.copy(
endTime = Some(System.currentTimeMillis()),
totalDocuments = successIds.size + failedDocs.size
)

val result = BulkResult(
successCount = successIds.size,
successIds = successIds,
failedCount = failedDocs.size,
failedDocuments = failedDocs,
indices = indices,
metrics = metrics.complete
)

callbacks.onComplete(result)

if (!bulkOptions.disableRefresh) {
indices.foreach(refresh)
}

result
}
}

/** Backward compatible API (old signature).
*
* @deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1644,8 +1644,19 @@ trait ElasticClientDelegator extends ElasticClientApi with BulkTypes {
override private[client] def toBulkAction(bulkItem: BulkItem): BulkActionType =
delegate.toBulkAction(bulkItem).asInstanceOf[BulkActionType]

override private[client] implicit def toBulkElasticAction(a: BulkActionType): BulkElasticAction =
delegate.toBulkElasticAction(a.asInstanceOf)
override private[client] implicit def toBulkElasticAction(
a: BulkActionType
): BulkElasticAction = {
// `a.asInstanceOf` (no type param) infers `Nothing` and the runtime cast blows up the moment
// the delegated `bulkStream`/`bulkStreamWithResult` path is exercised — `delegate`'s
// `BulkActionType` is abstract here, so we must explicitly project to it. (This was a latent
// bug — there was no real caller of `bulkStream` through the delegator chain until the
// sidecar's CTAS / INSERT-SELECT row-1 executor in softclient4es-arrow added one.)
// `delegate` is a `def`, so alias it into a stable `val` first to make the path-dependent
// type `d.BulkActionType` legal.
val d = delegate
d.toBulkElasticAction(a.asInstanceOf[d.BulkActionType])
}

/** Basic flow for executing a bulk action. This method must be implemented by concrete classes
* depending on the Elasticsearch version and client used.
Expand All @@ -1659,7 +1670,13 @@ trait ElasticClientDelegator extends ElasticClientApi with BulkTypes {
bulkOptions: BulkOptions,
system: ActorSystem
): Flow[Seq[BulkActionType], BulkResultType, NotUsed] =
delegate.bulkFlow(bulkOptions, system).asInstanceOf
// Same pattern as `toBulkElasticAction`/`actionToBulkItem` (~line 1648/1697): bare
// `.asInstanceOf` infers `Nothing`, so the synthetic cast at the call site (in
// `BulkApi.balancedBulkFlow`) tries `Flow → Nothing$` and CCEs. Project to this trait's
// own A/R abstract types — both erase to Object, so the runtime cast is a no-op.
delegate
.bulkFlow(bulkOptions, system)
.asInstanceOf[Flow[Seq[BulkActionType], BulkResultType, NotUsed]]

/** Convert a BulkResultType into individual results. This method must extract the successes and
* failures from the ES response.
Expand All @@ -1672,12 +1689,19 @@ trait ElasticClientDelegator extends ElasticClientApi with BulkTypes {
override private[client] def extractBulkResults(
result: BulkResultType,
originalBatch: Seq[BulkItem]
): Seq[Either[FailedDocument, SuccessfulDocument]] =
delegate.extractBulkResults(result.asInstanceOf, originalBatch)
): Seq[Either[FailedDocument, SuccessfulDocument]] = {
// Same pattern as `toBulkElasticAction` (see ~line 1648): bare `.asInstanceOf` infers
// `Nothing`, which blows up the moment the delegated bulk pipeline runs end-to-end. Project
// through a stable alias of `delegate` to the right path-dependent type.
val d = delegate
d.extractBulkResults(result.asInstanceOf[d.BulkResultType], originalBatch)
}

/** Conversion BulkActionType -> BulkItem */
override private[client] def actionToBulkItem(action: BulkActionType): BulkItem =
delegate.actionToBulkItem(action.asInstanceOf)
override private[client] def actionToBulkItem(action: BulkActionType): BulkItem = {
val d = delegate
d.actionToBulkItem(action.asInstanceOf[d.BulkActionType])
}

// ==================== PipelineApi (delegate) ====================

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -937,9 +937,12 @@ class MetricsElasticClient(
context: ConversionContext
): Future[ElasticResult[ElasticResponse]] =
measureAsync("searchAsync", Some(elasticQuery.indices.mkString(","))) {
// Bare `.asInstanceOf` infers `Nothing` — same latent shape as the bulk-pipeline fixes in
// [[ElasticClientDelegator]]. Project to this trait's `ElasticResponse` so the cast is a
// no-op at runtime (both sides erase to Object) instead of `Future → Nothing$`.
delegate
.singleSearchAsync(elasticQuery, fieldAliases, aggregations, fields, nestedHits)
.asInstanceOf
.asInstanceOf[Future[ElasticResult[ElasticResponse]]]
}

/** Asynchronous multi-search with Elasticsearch queries.
Expand All @@ -964,9 +967,10 @@ class MetricsElasticClient(
context: ConversionContext
): Future[ElasticResult[ElasticResponse]] =
measureAsync("multisearchAsync") {
// Same latent-`Nothing`-cast fix as `singleSearchAsync` above.
delegate
.multiSearchAsync(elasticQueries, fieldAliases, aggregations, fields, nestedHits)
.asInstanceOf
.asInstanceOf[Future[ElasticResult[ElasticResponse]]]
}

/** Searches and converts results into typed entities.
Expand Down
Loading