Skip to content

Commit 08e52b9

Browse files
mohityadav766, github-actions[bot], and claude
authored
Fix payload size issue (#27388)
* Fix Payload Size issue, increase buffer * Handle single entity with > 10 mb * Single entity push * Normalize SQL Queries * Update generated TypeScript types * Add Tests * Fix Failing Test * Revert fixes * Fix Tests * Strip Lineage * Strip Lineage and make default 9 mb * Add Warn log on Large entity size * Review Comments * Remove hierarchical fields * remove team containing users * revert unwanted changes * Fix test failures from payload size default change - Update mock expectations in SearchIndexExecutorControlFlowTest and DistributedJobParticipantTest to use DEFAULT_BULK_PAYLOAD_SIZE_BYTES instead of hardcoded 104857600L (old 100MB default) - Remove "charts" from DashboardIndex excluded fields — charts are needed for search filters and column lineage resolution Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * Fix hardcoded payload size fallback in DistributedJobParticipant Replace hardcoded 104857600L with SearchClusterMetrics.DEFAULT_BULK_PAYLOAD_SIZE_BYTES to use the centralized 9MB default consistently. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * Spotless * Update payLoadSize schema defaults to 9MB The JSON schema default for payLoadSize was 104857600 (100MB), which meant EventPublisherJob.getPayLoadSize() always returned 100MB instead of null, bypassing the DEFAULT_BULK_PAYLOAD_SIZE_BYTES fallback in DistributedJobParticipant. Align schema defaults with the 9MB bulk payload limit. 
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * Address review comments: fix log message, reuse getLineageData, fix test key algorithm - Fix misleading log in stripLineageForSize: report post-strip size, not imply bytes removed - Reuse getLineageData() in populateLineageData() instead of duplicating the DAO call and edge construction loop - Fix AddUpdateLineageScriptTest key algorithm to use maxKey+1 matching the Painless script, avoiding key collisions after deletions Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * Address review: equals() in Painless, prune stale SQL keys, flush oversized single ops - Use .equals() instead of == for string comparison in REMOVE_LINEAGE_SCRIPT Painless to be explicit about value equality and null-safe (params on left) - Prune orphaned sqlQueryKey in ADD_UPDATE_LINEAGE when updating an edge with a different SQL query, preventing unbounded lineageSqlQueries growth - Restore currentBufferSize >= maxPayloadSizeBytes check in CustomBulkProcessor.add() so a single oversized operation is flushed immediately rather than sitting in the buffer - Update AddUpdateLineageScriptTest to mirror the new pruning logic Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent e7b2d26 commit 08e52b9

114 files changed

Lines changed: 1415 additions & 315 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ElasticSearchBulkSink.java

Lines changed: 90 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.openmetadata.service.exception.EntityNotFoundException;
4747
import org.openmetadata.service.exception.SearchIndexException;
4848
import org.openmetadata.service.search.ReindexContext;
49+
import org.openmetadata.service.search.SearchIndexUtils;
4950
import org.openmetadata.service.search.SearchRepository;
5051
import org.openmetadata.service.search.elasticsearch.ElasticSearchClient;
5152
import org.openmetadata.service.search.elasticsearch.EsUtils;
@@ -95,6 +96,7 @@ public static synchronized void resetDocBuildPoolSize() {
9596

9697
private final ElasticSearchClient searchClient;
9798
protected final SearchRepository searchRepository;
99+
private final long maxPayloadSizeBytes;
98100
private final CustomBulkProcessor bulkProcessor;
99101
private final StepStats stats = new StepStats();
100102

@@ -133,6 +135,7 @@ public ElasticSearchBulkSink(
133135
this.searchClient = (ElasticSearchClient) searchRepository.getSearchClient();
134136
this.batchSize = batchSize;
135137
this.maxConcurrentRequests = maxConcurrentRequests;
138+
this.maxPayloadSizeBytes = maxPayloadSizeBytes;
136139

137140
// Initialize stats
138141
stats.withTotalRecords(0).withSuccessRecords(0).withFailedRecords(0);
@@ -294,7 +297,7 @@ protected StageStatsTracker extractTracker(Map<String, Object> contextData) {
294297
return null;
295298
}
296299

297-
private static final int BULK_OPERATION_METADATA_OVERHEAD = 50;
300+
private static final int BULK_OPERATION_METADATA_OVERHEAD = 150;
298301

299302
private void addEntity(
300303
EntityInterface entity, String indexName, boolean recreateIndex, StageStatsTracker tracker) {
@@ -303,16 +306,53 @@ private void addEntity(
303306
Object searchIndexDoc = Entity.buildSearchIndex(entityType, entity).buildSearchIndexDoc();
304307
String json = JsonUtils.pojoToJson(searchIndexDoc);
305308
String docId = entity.getId().toString();
306-
long estimatedSize =
307-
(long) json.getBytes(StandardCharsets.UTF_8).length + BULK_OPERATION_METADATA_OVERHEAD;
309+
long rawDocSize = (long) json.getBytes(StandardCharsets.UTF_8).length;
310+
long estimatedSize = rawDocSize + BULK_OPERATION_METADATA_OVERHEAD;
311+
312+
if (rawDocSize > 1024 * 1024) {
313+
LOG.warn(
314+
"Large indexed doc: entityType={}, docId={}, size={}MB",
315+
entityType,
316+
docId,
317+
rawDocSize / (1024 * 1024));
318+
}
308319

320+
if (estimatedSize > maxPayloadSizeBytes) {
321+
long sizeLimit = maxPayloadSizeBytes - BULK_OPERATION_METADATA_OVERHEAD;
322+
json = SearchIndexUtils.stripLineageForSize(json, sizeLimit, docId, entityType);
323+
rawDocSize = json.getBytes(StandardCharsets.UTF_8).length;
324+
estimatedSize = rawDocSize + BULK_OPERATION_METADATA_OVERHEAD;
325+
}
326+
327+
if (estimatedSize > maxPayloadSizeBytes) {
328+
LOG.warn(
329+
"Document {} of type {} is too large for bulk ({} bytes), sending directly",
330+
docId,
331+
entityType,
332+
rawDocSize);
333+
totalSubmitted.incrementAndGet();
334+
if (tracker != null) {
335+
tracker.incrementPendingSink();
336+
}
337+
indexDocumentDirectly(indexName, docId, json, entityType, tracker);
338+
processSuccess.incrementAndGet();
339+
if (tracker != null) {
340+
tracker.recordProcess(StatsResult.SUCCESS);
341+
}
342+
return;
343+
}
344+
345+
final String indexableJson = json;
309346
BulkOperation operation;
310347
if (recreateIndex) {
311348
operation =
312349
BulkOperation.of(
313350
op ->
314351
op.index(
315-
idx -> idx.index(indexName).id(docId).document(EsUtils.toJsonData(json))));
352+
idx ->
353+
idx.index(indexName)
354+
.id(docId)
355+
.document(EsUtils.toJsonData(indexableJson))));
316356
} else {
317357
operation =
318358
BulkOperation.of(
@@ -321,7 +361,10 @@ private void addEntity(
321361
upd ->
322362
upd.index(indexName)
323363
.id(docId)
324-
.action(a -> a.doc(EsUtils.toJsonData(json)).docAsUpsert(true))));
364+
.action(
365+
a ->
366+
a.doc(EsUtils.toJsonData(indexableJson))
367+
.docAsUpsert(true))));
325368
}
326369
if (tracker != null) {
327370
tracker.incrementPendingSink();
@@ -369,6 +412,42 @@ private void addEntity(
369412
}
370413
}
371414

415+
private void indexDocumentDirectly(
416+
String indexName, String docId, String json, String entityType, StageStatsTracker tracker) {
417+
try {
418+
searchClient
419+
.getNewClient()
420+
.index(idx -> idx.index(indexName).id(docId).document(EsUtils.toJsonData(json)));
421+
totalSuccess.incrementAndGet();
422+
updateStats();
423+
if (tracker != null) {
424+
tracker.recordSink(StatsResult.SUCCESS);
425+
}
426+
} catch (Exception e) {
427+
LOG.error(
428+
"Direct index failed for document {} of type {}: {}",
429+
docId,
430+
entityType,
431+
e.getMessage(),
432+
e);
433+
totalFailed.incrementAndGet();
434+
updateStats();
435+
if (tracker != null) {
436+
tracker.recordSink(StatsResult.FAILED);
437+
}
438+
if (failureCallback != null) {
439+
failureCallback.onFailure(
440+
entityType,
441+
docId,
442+
null,
443+
String.format(
444+
"Document too large for bulk (%d bytes); direct index failed: %s",
445+
json.getBytes(StandardCharsets.UTF_8).length, e.getMessage()),
446+
IndexingFailureRecorder.FailureStage.SINK);
447+
}
448+
}
449+
}
450+
372451
private void addTimeSeriesEntity(
373452
EntityTimeSeriesInterface entity,
374453
String indexName,
@@ -755,6 +834,8 @@ void add(
755834
throw new IllegalStateException("Bulk processor is closed");
756835
}
757836

837+
totalSubmitted.incrementAndGet();
838+
758839
if (docId != null) {
759840
if (entityType != null) {
760841
docIdToEntityType.put(docId, entityType);
@@ -766,6 +847,10 @@ void add(
766847

767848
long operationSize =
768849
estimatedSizeBytes > 0 ? estimatedSizeBytes : estimateOperationSize(operation);
850+
851+
if (!buffer.isEmpty() && currentBufferSize + operationSize >= maxPayloadSizeBytes) {
852+
flushInternal();
853+
}
769854
buffer.add(operation);
770855
currentBufferSize += operationSize;
771856

@@ -852,8 +937,6 @@ private void flushInternal() {
852937

853938
long executionId = executionIdCounter.incrementAndGet();
854939
int numberOfActions = toFlush.size();
855-
totalSubmitted.addAndGet(numberOfActions);
856-
857940
LOG.debug("Executing bulk request {} with {} actions", executionId, numberOfActions);
858941

859942
try {

openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/OpenSearchBulkSink.java

Lines changed: 84 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.openmetadata.service.exception.EntityNotFoundException;
4343
import org.openmetadata.service.exception.SearchIndexException;
4444
import org.openmetadata.service.search.ReindexContext;
45+
import org.openmetadata.service.search.SearchIndexUtils;
4546
import org.openmetadata.service.search.SearchRepository;
4647
import org.openmetadata.service.search.indexes.ColumnSearchIndex;
4748
import org.openmetadata.service.search.opensearch.OpenSearchClient;
@@ -107,6 +108,7 @@ public interface SinkStatsCallback {
107108

108109
private final OpenSearchClient searchClient;
109110
protected final SearchRepository searchRepository;
111+
private final long maxPayloadSizeBytes;
110112
private final CustomBulkProcessor bulkProcessor;
111113
private final StepStats stats = new StepStats();
112114

@@ -152,6 +154,7 @@ public OpenSearchBulkSink(
152154
this.searchClient = (OpenSearchClient) searchRepository.getSearchClient();
153155
this.batchSize = batchSize;
154156
this.maxConcurrentRequests = maxConcurrentRequests;
157+
this.maxPayloadSizeBytes = maxPayloadSizeBytes;
155158

156159
// Initialize stats
157160
stats.withTotalRecords(0).withSuccessRecords(0).withFailedRecords(0);
@@ -333,7 +336,7 @@ protected StageStatsTracker extractTracker(Map<String, Object> contextData) {
333336
return null;
334337
}
335338

336-
private static final int BULK_OPERATION_METADATA_OVERHEAD = 50;
339+
private static final int BULK_OPERATION_METADATA_OVERHEAD = 150;
337340

338341
private void addEntity(
339342
EntityInterface entity,
@@ -354,10 +357,43 @@ private void addEntity(
354357

355358
String finalJson = json;
356359
String docId = entity.getId().toString();
357-
long estimatedSize =
358-
(long) finalJson.getBytes(StandardCharsets.UTF_8).length
359-
+ BULK_OPERATION_METADATA_OVERHEAD;
360+
long rawDocSize = (long) finalJson.getBytes(StandardCharsets.UTF_8).length;
361+
long estimatedSize = rawDocSize + BULK_OPERATION_METADATA_OVERHEAD;
362+
363+
if (rawDocSize > 1024 * 1024) {
364+
LOG.warn(
365+
"Large indexed doc: entityType={}, docId={}, size={}MB",
366+
entityType,
367+
docId,
368+
rawDocSize / (1024 * 1024));
369+
}
370+
371+
if (estimatedSize > maxPayloadSizeBytes) {
372+
long sizeLimit = maxPayloadSizeBytes - BULK_OPERATION_METADATA_OVERHEAD;
373+
finalJson = SearchIndexUtils.stripLineageForSize(finalJson, sizeLimit, docId, entityType);
374+
rawDocSize = finalJson.getBytes(StandardCharsets.UTF_8).length;
375+
estimatedSize = rawDocSize + BULK_OPERATION_METADATA_OVERHEAD;
376+
}
377+
378+
if (estimatedSize > maxPayloadSizeBytes) {
379+
LOG.warn(
380+
"Document {} of type {} is too large for bulk ({} bytes), sending directly",
381+
docId,
382+
entityType,
383+
rawDocSize);
384+
totalSubmitted.incrementAndGet();
385+
if (tracker != null) {
386+
tracker.incrementPendingSink();
387+
}
388+
indexDocumentDirectly(indexName, docId, finalJson, entityType, tracker);
389+
processSuccess.incrementAndGet();
390+
if (tracker != null) {
391+
tracker.recordProcess(StatsResult.SUCCESS);
392+
}
393+
return;
394+
}
360395

396+
final String indexableJson = finalJson;
361397
BulkOperation operation;
362398
if (recreateIndex) {
363399
operation =
@@ -367,7 +403,7 @@ private void addEntity(
367403
idx ->
368404
idx.index(indexName)
369405
.id(docId)
370-
.document(OsUtils.toJsonData(finalJson))));
406+
.document(OsUtils.toJsonData(indexableJson))));
371407
} else {
372408
operation =
373409
BulkOperation.of(
@@ -376,7 +412,7 @@ private void addEntity(
376412
upd ->
377413
upd.index(indexName)
378414
.id(docId)
379-
.document(OsUtils.toJsonData(finalJson))
415+
.document(OsUtils.toJsonData(indexableJson))
380416
.docAsUpsert(true)));
381417
}
382418
if (tracker != null) {
@@ -425,6 +461,42 @@ private void addEntity(
425461
}
426462
}
427463

464+
private void indexDocumentDirectly(
465+
String indexName, String docId, String json, String entityType, StageStatsTracker tracker) {
466+
try {
467+
searchClient
468+
.getNewClient()
469+
.index(idx -> idx.index(indexName).id(docId).document(OsUtils.toJsonData(json)));
470+
totalSuccess.incrementAndGet();
471+
updateStats();
472+
if (tracker != null) {
473+
tracker.recordSink(StatsResult.SUCCESS);
474+
}
475+
} catch (Exception e) {
476+
LOG.error(
477+
"Direct index failed for document {} of type {}: {}",
478+
docId,
479+
entityType,
480+
e.getMessage(),
481+
e);
482+
totalFailed.incrementAndGet();
483+
updateStats();
484+
if (tracker != null) {
485+
tracker.recordSink(StatsResult.FAILED);
486+
}
487+
if (failureCallback != null) {
488+
failureCallback.onFailure(
489+
entityType,
490+
docId,
491+
null,
492+
String.format(
493+
"Document too large for bulk (%d bytes); direct index failed: %s",
494+
json.getBytes(StandardCharsets.UTF_8).length, e.getMessage()),
495+
IndexingFailureRecorder.FailureStage.SINK);
496+
}
497+
}
498+
}
499+
428500
private void addTimeSeriesEntity(
429501
EntityTimeSeriesInterface entity,
430502
String indexName,
@@ -899,6 +971,8 @@ void add(
899971
throw new IllegalStateException("Bulk processor is closed");
900972
}
901973

974+
totalSubmitted.incrementAndGet();
975+
902976
if (docId != null) {
903977
if (entityType != null) {
904978
docIdToEntityType.put(docId, entityType);
@@ -910,6 +984,10 @@ void add(
910984

911985
long operationSize =
912986
estimatedSizeBytes > 0 ? estimatedSizeBytes : estimateOperationSize(operation);
987+
988+
if (!buffer.isEmpty() && currentBufferSize + operationSize >= maxPayloadSizeBytes) {
989+
flushInternal();
990+
}
913991
buffer.add(operation);
914992
currentBufferSize += operationSize;
915993

@@ -1001,8 +1079,6 @@ private void flushInternal() {
10011079

10021080
long executionId = executionIdCounter.incrementAndGet();
10031081
int numberOfActions = toFlush.size();
1004-
totalSubmitted.addAndGet(numberOfActions);
1005-
10061082
LOG.debug("Executing bulk request {} with {} actions", executionId, numberOfActions);
10071083

10081084
try {

openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ReindexingConfiguration.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ public record ReindexingConfiguration(
4646
private static final int DEFAULT_PRODUCER_THREADS = 1;
4747
private static final int DEFAULT_QUEUE_SIZE = 100;
4848
private static final int DEFAULT_MAX_CONCURRENT_REQUESTS = 100;
49-
private static final long DEFAULT_PAYLOAD_SIZE = 104857600L;
49+
private static final long DEFAULT_PAYLOAD_SIZE =
50+
SearchClusterMetrics.DEFAULT_BULK_PAYLOAD_SIZE_BYTES;
5051
private static final int DEFAULT_FIELD_FETCH_THREADS = 0;
5152
private static final int DEFAULT_DOC_BUILD_THREADS = 0;
5253
private static final long DEFAULT_STATS_INTERVAL_MS = 0;

openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/DistributedJobParticipant.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.openmetadata.service.cache.CacheConfig;
2929
import org.openmetadata.service.jdbi3.AppRepository;
3030
import org.openmetadata.service.jdbi3.CollectionDAO;
31+
import org.openmetadata.service.search.SearchClusterMetrics;
3132
import org.openmetadata.service.search.SearchRepository;
3233

3334
/**
@@ -333,7 +334,7 @@ private void processJobPartitions(SearchIndexJob job) {
333334
: 100,
334335
job.getJobConfiguration().getPayLoadSize() != null
335336
? job.getJobConfiguration().getPayLoadSize()
336-
: 104857600L);
337+
: SearchClusterMetrics.DEFAULT_BULK_PAYLOAD_SIZE_BYTES);
337338

338339
int batchSize =
339340
job.getJobConfiguration().getBatchSize() != null

0 commit comments

Comments
 (0)