From 14cb35ff38fc73ed92319fe70ce2a23fa6d2b4c5 Mon Sep 17 00:00:00 2001 From: mohitdeuex Date: Thu, 23 Apr 2026 11:59:13 +0530 Subject: [PATCH 1/9] fix(it): stop two flaky integration tests (#27649) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit GlossaryOntologyExportIT: mark @Isolated. @BeforeAll flips RdfUpdater (a JVM-wide singleton) on, which makes every concurrent test class start doing synchronous Fuseki writes on entity create, saturating the Dropwizard thread pool and causing 60s request timeouts. @Execution (SAME_THREAD) alone only serialises within this class. WorkflowDefinitionResourceIT#triggerWorkflow_SDK: drop the redundant waitForWorkflowDeployment call — the create path already waits. Add descriptive aliases to the two await() polls so the next flake tells us which FQN or workflow name actually timed out instead of an anonymous lambda. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../it/tests/GlossaryOntologyExportIT.java | 10 +++++++--- .../it/tests/WorkflowDefinitionResourceIT.java | 5 ++--- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/GlossaryOntologyExportIT.java b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/GlossaryOntologyExportIT.java index 8a2f50f062af..7c6107efe2ef 100644 --- a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/GlossaryOntologyExportIT.java +++ b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/GlossaryOntologyExportIT.java @@ -17,6 +17,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.parallel.Execution; import org.junit.jupiter.api.parallel.ExecutionMode; +import org.junit.jupiter.api.parallel.Isolated; import org.openmetadata.it.bootstrap.TestSuiteBootstrap; import org.openmetadata.it.factories.GlossaryTermTestFactory; import org.openmetadata.it.factories.GlossaryTestFactory; @@ -41,10 +42,13 @@ * *

Test isolation: Uses TestNamespace for unique entity naming. * - *

Parallelization: Runs with @Execution(ExecutionMode.SAME_THREAD) because each test - * blocks a server thread on synchronous Fuseki writes; concurrent execution can exhaust the - * server thread pool and cause request timeouts. + *

Parallelization: Annotated {@code @Isolated} because {@link RdfUpdater} is a JVM-wide + * singleton. {@code @BeforeAll} flips it on, so any test class running concurrently starts doing + * synchronous Fuseki writes on every entity create — saturating the Dropwizard thread pool and + * causing 60s request timeouts (see issue #27649). {@code @Execution(SAME_THREAD)} alone only + * serializes within this class and does not prevent that cross-class leakage. */ +@Isolated @Execution(ExecutionMode.SAME_THREAD) @ExtendWith(TestNamespaceExtension.class) public class GlossaryOntologyExportIT { diff --git a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java index fa1ee6844449..c08a8fea708a 100644 --- a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java +++ b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java @@ -6929,7 +6929,6 @@ private void triggerWorkflow_SDK(TestNamespace ns, List localTestTables) String workflowName = "DataCompletenessWorkflow"; OpenMetadataClient client = SdkClients.adminClient(); - waitForWorkflowDeployment(client, workflowName); for (Table table : localTestTables) { waitForEntityIndexedInSearch(client, "table_search_index", table.getFullyQualifiedName()); } @@ -7047,7 +7046,7 @@ private Domain getOrCreateDomain(TestNamespace ns) { } private void waitForWorkflowDeployment(OpenMetadataClient client, String workflowName) { - await() + await("workflow '" + workflowName + "' to finish Flowable deployment") .atMost(Duration.ofSeconds(120)) .pollDelay(Duration.ofSeconds(1)) .pollInterval(Duration.ofSeconds(2)) @@ -7062,7 +7061,7 @@ private void waitForWorkflowDeployment(OpenMetadataClient client, String workflo private void waitForEntityIndexedInSearch( OpenMetadataClient client, String indexName, String entityFqn) { - await() + await("entity '" + entityFqn + "' to appear in " + indexName) .atMost(Duration.ofSeconds(120)) .pollDelay(Duration.ofSeconds(1)) .pollInterval(Duration.ofSeconds(2)) From 09e592397b483658def4067059d7a076a05e7d35 Mon Sep 17 00:00:00 2001 From: mohitdeuex Date: Thu, 23 Apr 2026 14:00:08 +0530 Subject: [PATCH 2/9] fix(search): never skip live indexing during reindex (#27649) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Live search indexing was silently skipped whenever a reindex job was in RUNNING/READY/STOPPING state. SearchRepository.createEntityIndex() and six sibling methods consulted SearchIndexRetryQueue.isEntityTypeSuspended() and returned early with nothing written, nothing enqueued — entities vanished from search until a future reindex happened to cover them. The retry worker doubled down: when the scope refresh observed an active job, it purged the retry queue; and processRecord() deleted records whose type was suspended. So even manually enqueued retries were wiped. This is how the #27649 flake surfaced: AppsResourceIT triggers SearchIndexingApplication runs and its best-effort 30s wait silently swallows timeouts. If a run was still RUNNING when AppsResourceIT finished, the next class in the sequential fork (WorkflowDefinitionResourceIT) inherited the suspension and its freshly-created tables were never indexed — waitForEntityIndexedInSearch then timed out at 120s. Same mechanism bites real users mid-reindex in production. Remove the suspension mechanism entirely: * SearchRepository — drop the 8 isEntityTypeSuspended() early-returns; the client-availability path already enqueues for retry on its own. * SearchIndexRetryWorker — drop refreshReindexSuspensionScopeIfNeeded() and the suspension branches in processRecord(); remove the retry-queue purge on suspendAll. * SearchIndexRetryQueue — delete the updateSuspension / clearSuspension / isEntityTypeSuspended / isStreamingSuspended / isSuspendAllStreaming / getSuspendedEntityTypes API and the static AtomicBoolean / AtomicReference they backed. * Drop the two IT cases that asserted the removed behaviour. Live writes now always reach the search client; reindex and live writes both target the same indices as before. Version conflicts between the two paths (stale reindex batch overwriting a newer live write) remain possible as they did before suspension was introduced — that is the race suspension was meant to dodge, but dropping writes altogether was worse than the race. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../it/tests/SearchIndexRetryQueueIT.java | 58 --------- .../service/search/SearchIndexRetryQueue.java | 49 ------- .../search/SearchIndexRetryWorker.java | 123 +----------------- .../service/search/SearchRepository.java | 50 ------- 4 files changed, 1 insertion(+), 279 deletions(-) diff --git a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/SearchIndexRetryQueueIT.java b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/SearchIndexRetryQueueIT.java index e4cd66576be6..7348aa656d54 100644 --- a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/SearchIndexRetryQueueIT.java +++ b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/SearchIndexRetryQueueIT.java @@ -793,64 +793,6 @@ void testEnqueuePreservesErrorDetailInFailureReason(TestNamespace ns) { retryQueueDAO.deleteByEntity(entityId, entityFqn); } - // --------------------------------------------------------------------------- - // Suspension tests - // --------------------------------------------------------------------------- - - @Test - void testSuspensionPreventsEnqueue(TestNamespace ns) { - String entityId = UUID.randomUUID().toString(); - String entityFqn = ns.prefix("rq") + ".suspended.entity"; - try { - SearchIndexRetryQueue.updateSuspension(java.util.Set.of(), true); - assertTrue(SearchIndexRetryQueue.isSuspendAllStreaming()); - - // Enqueue should still insert (suspension affects worker processing, not enqueueing) - SearchIndexRetryQueue.enqueue(entityId, entityFqn, "during suspension"); - List records = - retryQueueDAO.findByStatus(SearchIndexRetryQueue.STATUS_PENDING, 1000); - assertTrue(records.stream().anyMatch(r -> r.getEntityId().equals(entityId))); - } finally { - SearchIndexRetryQueue.clearSuspension(); - retryQueueDAO.deleteByEntity(entityId, entityFqn); - } - } - - @Test - void testWorkerDeletesRecordsDuringSuspendAll(TestNamespace ns) throws Exception { - String entityId = UUID.randomUUID().toString(); - String entityFqn = ns.prefix("rq") + ".suspended.entity"; - - retryQueueDAO.upsert( - entityId, entityFqn, "will be suspended", SearchIndexRetryQueue.STATUS_PENDING, "table"); - - try { - SearchIndexRetryQueue.updateSuspension(java.util.Set.of(), true); - - SearchIndexRetryWorker worker = new SearchIndexRetryWorker(collectionDAO, searchRepository); - worker.start(); - try { - Awaitility.await("Worker should delete record during full suspension") - .atMost(Duration.ofSeconds(30)) - .pollInterval(Duration.ofSeconds(1)) - .until( - () -> { - List remaining = - retryQueueDAO.findByStatuses( - List.of( - SearchIndexRetryQueue.STATUS_PENDING, - SearchIndexRetryQueue.STATUS_IN_PROGRESS), - 1000); - return remaining.stream().noneMatch(r -> r.getEntityId().equals(entityId)); - }); - } finally { - worker.stop(); - } - } finally { - SearchIndexRetryQueue.clearSuspension(); - } - } - // --------------------------------------------------------------------------- // Helpers // --------------------------------------------------------------------------- diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchIndexRetryQueue.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchIndexRetryQueue.java index 2f04cbb0d4eb..eccb11ef8e5f 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchIndexRetryQueue.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchIndexRetryQueue.java @@ -1,12 +1,7 @@ package org.openmetadata.service.search; import io.micrometer.core.instrument.Metrics; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import lombok.extern.slf4j.Slf4j; import org.openmetadata.schema.EntityInterface; import org.openmetadata.service.Entity; @@ -24,10 +19,6 @@ public final class SearchIndexRetryQueue { private static final int MAX_REASON_LENGTH = 8192; - private static final AtomicReference> SUSPENDED_ENTITY_TYPES = - new AtomicReference<>(Collections.emptySet()); - private static final AtomicBoolean SUSPEND_ALL_STREAMING = new AtomicBoolean(false); - private SearchIndexRetryQueue() {} public static void enqueue(EntityInterface entity, String operation, Throwable failure) { @@ -117,46 +108,6 @@ public static boolean isRetryableStatusCode(int status) { return status < 400; } - public static void updateSuspension(Set entityTypes, boolean suspendAll) { - Set normalized = new HashSet<>(); - for (String entityType : entityTypes == null ? Collections.emptySet() : entityTypes) { - String normalizedType = normalize(entityType); - if (!normalizedType.isEmpty()) { - normalized.add(normalizedType); - } - } - - // Set entity types before the boolean so that isEntityTypeSuspended never - // sees suspendAll=false with an outdated (empty) entity-types set. - SUSPENDED_ENTITY_TYPES.set(Collections.unmodifiableSet(normalized)); - SUSPEND_ALL_STREAMING.set(suspendAll); - } - - public static void clearSuspension() { - SUSPEND_ALL_STREAMING.set(false); - SUSPENDED_ENTITY_TYPES.set(Collections.emptySet()); - } - - public static boolean isEntityTypeSuspended(String entityType) { - if (SUSPEND_ALL_STREAMING.get()) { - return true; - } - String normalized = normalize(entityType); - return !normalized.isEmpty() && SUSPENDED_ENTITY_TYPES.get().contains(normalized); - } - - public static boolean isStreamingSuspended() { - return SUSPEND_ALL_STREAMING.get() || !SUSPENDED_ENTITY_TYPES.get().isEmpty(); - } - - public static boolean isSuspendAllStreaming() { - return SUSPEND_ALL_STREAMING.get(); - } - - public static Set getSuspendedEntityTypes() { - return SUSPENDED_ENTITY_TYPES.get(); - } - private static String truncate(String value) { if (value == null) { return null; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchIndexRetryWorker.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchIndexRetryWorker.java index 1c4348d0c817..7fa3fb79ecb8 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchIndexRetryWorker.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchIndexRetryWorker.java @@ -1,7 +1,6 @@ package org.openmetadata.service.search; import static org.openmetadata.service.search.SearchIndexRetryQueue.STATUS_FAILED; -import static org.openmetadata.service.search.SearchIndexRetryQueue.STATUS_PENDING; import static org.openmetadata.service.search.SearchIndexRetryQueue.STATUS_PENDING_RETRY_1; import static org.openmetadata.service.search.SearchIndexRetryQueue.STATUS_PENDING_RETRY_2; import static org.openmetadata.service.search.SearchIndexRetryQueue.normalize; @@ -25,7 +24,6 @@ import java.util.concurrent.atomic.AtomicReference; import lombok.extern.slf4j.Slf4j; import org.openmetadata.schema.EntityInterface; -import org.openmetadata.schema.system.EventPublisherJob; import org.openmetadata.schema.type.EntityReference; import org.openmetadata.schema.type.Include; import org.openmetadata.schema.type.Relationship; @@ -35,7 +33,6 @@ import org.openmetadata.service.apps.bundles.searchIndex.BulkSink; import org.openmetadata.service.exception.EntityNotFoundException; import org.openmetadata.service.jdbi3.CollectionDAO; -import org.openmetadata.service.jdbi3.CollectionDAO.SearchIndexJobDAO.SearchIndexJobRecord; import org.openmetadata.service.jdbi3.CollectionDAO.SearchIndexRetryQueueDAO.SearchIndexRetryRecord; import org.openmetadata.service.jdbi3.EntityRepository; import org.openmetadata.service.workflows.searchIndex.ReindexingUtil; @@ -63,27 +60,18 @@ public class SearchIndexRetryWorker implements Managed { private static final int MAX_CASCADE_REINDEX = 5000; private static final int CASCADE_BATCH_SIZE = 200; private static final int MAX_BACKOFF_SECONDS = 60; - private static final int SUSPENSION_REFRESH_INTERVAL_MS = 5000; private static final int CANDIDATE_TYPES_REFRESH_INTERVAL_MS = 60000; private static final long STALE_RECOVERY_INTERVAL_MS = 60_000; private static final long STALE_THRESHOLD_MS = 10 * 60 * 1000; - private static final List ACTIVE_REINDEX_JOB_STATUSES = - List.of("RUNNING", "READY", "STOPPING"); - private static final List PURGEABLE_QUEUE_STATUSES = - List.of(STATUS_PENDING, STATUS_PENDING_RETRY_1, STATUS_PENDING_RETRY_2, STATUS_FAILED); - private final CollectionDAO collectionDAO; private final SearchRepository searchRepository; private final AtomicBoolean running = new AtomicBoolean(false); private final List workerThreads = new ArrayList<>(); - private final Object scopeRefreshLock = new Object(); private final Object candidateTypesLock = new Object(); private final Object staleRecoveryLock = new Object(); - private volatile long lastScopeRefreshAt; private volatile long lastStaleRecoveryAt; - private volatile String activeScopeSignature = ""; private volatile long candidateTypesLastRefreshAt; private volatile List cachedCandidateEntityTypes = Collections.emptyList(); private final AtomicInteger consecutiveUnavailableCount = new AtomicInteger(); @@ -137,7 +125,6 @@ public void stop() { } } workerThreads.clear(); - SearchIndexRetryQueue.clearSuspension(); LOG.info("Stopped search index retry worker"); } @@ -148,7 +135,6 @@ public void stop() { private void runLoop(int workerId) { while (running.get()) { try { - refreshReindexSuspensionScopeIfNeeded(); recoverStaleInProgressIfNeeded(); if (!waitForClientAvailability(workerId)) { @@ -181,22 +167,8 @@ private void runLoop(int workerId) { private void processRecord(SearchIndexRetryRecord record) { try { - if (SearchIndexRetryQueue.isSuspendAllStreaming()) { - collectionDAO - .searchIndexRetryQueueDAO() - .deleteByEntity(record.getEntityId(), record.getEntityFqn()); - return; - } - EntityReference root = resolveEntityReference(record); if (root != null) { - if (SearchIndexRetryQueue.isEntityTypeSuspended(root.getType())) { - collectionDAO - .searchIndexRetryQueueDAO() - .deleteByEntity(record.getEntityId(), record.getEntityFqn()); - return; - } - reindexEntityCascade(root); collectionDAO .searchIndexRetryQueueDAO() @@ -642,82 +614,9 @@ private int extractSearchStatusCode(Throwable t) { } // --------------------------------------------------------------------------- - // Suspension and scheduling + // Scheduling // --------------------------------------------------------------------------- - private void refreshReindexSuspensionScopeIfNeeded() { - long now = System.currentTimeMillis(); - if (now - lastScopeRefreshAt < SUSPENSION_REFRESH_INTERVAL_MS) { - return; - } - - synchronized (scopeRefreshLock) { - long currentTime = System.currentTimeMillis(); - if (currentTime - lastScopeRefreshAt < SUSPENSION_REFRESH_INTERVAL_MS) { - return; - } - lastScopeRefreshAt = currentTime; - - List activeJobs = - collectionDAO.searchIndexJobDAO().findByStatusesWithLimit(ACTIVE_REINDEX_JOB_STATUSES, 1); - - if (activeJobs.isEmpty()) { - if (!activeScopeSignature.isEmpty() || SearchIndexRetryQueue.isStreamingSuspended()) { - SearchIndexRetryQueue.clearSuspension(); - activeScopeSignature = ""; - LOG.info("Cleared live search indexing suspension - no active reindex jobs"); - } - return; - } - - SearchIndexJobRecord activeJob = activeJobs.getFirst(); - EventPublisherJob jobConfiguration = null; - try { - if (activeJob.jobConfiguration() != null) { - jobConfiguration = - JsonUtils.readValue(activeJob.jobConfiguration(), EventPublisherJob.class); - } - } catch (Exception e) { - LOG.warn("Failed to parse job configuration for active reindex job {}", activeJob.id(), e); - } - - Set requestedEntities = - normalizeReindexEntities( - jobConfiguration != null ? jobConfiguration.getEntities() : null); - Set searchableEntities = searchRepository.getSearchEntities(); - - boolean containsAllToken = requestedEntities.stream().anyMatch("all"::equalsIgnoreCase); - Set suspendedTypes = - containsAllToken ? new HashSet<>(searchableEntities) : new HashSet<>(requestedEntities); - suspendedTypes.retainAll(searchableEntities); - - boolean suspendAll = - !searchableEntities.isEmpty() && suspendedTypes.containsAll(searchableEntities); - String newSignature = buildScopeSignature(activeJob.id(), suspendedTypes, suspendAll); - - if (newSignature.equals(activeScopeSignature)) { - return; - } - - activeScopeSignature = newSignature; - SearchIndexRetryQueue.updateSuspension(suspendedTypes, suspendAll); - - if (suspendAll) { - int purged = - collectionDAO.searchIndexRetryQueueDAO().deleteByStatuses(PURGEABLE_QUEUE_STATUSES); - LOG.info( - "Activated live search indexing suspension for all entity types using reindex job {} and purged {} retry queue rows", - activeJob.id(), - purged); - } else { - LOG.info( - "Activated live search indexing suspension for {} entity types using reindex job {}", - suspendedTypes.size(), - activeJob.id()); - } - } - } - private void recoverStaleInProgressIfNeeded() { long now = System.currentTimeMillis(); if (now - lastStaleRecoveryAt < STALE_RECOVERY_INTERVAL_MS) { @@ -744,26 +643,6 @@ private void recoverStaleInProgressIfNeeded() { } } - private Set normalizeReindexEntities(Set rawEntities) { - Set normalized = new HashSet<>(); - if (rawEntities == null) { - return normalized; - } - for (String entityType : rawEntities) { - String value = SearchIndexRetryQueue.normalize(entityType); - if (!value.isEmpty()) { - normalized.add(value); - } - } - return normalized; - } - - private String buildScopeSignature(String jobId, Set suspendedTypes, boolean suspendAll) { - List sorted = new ArrayList<>(suspendedTypes); - Collections.sort(sorted); - return jobId + "|" + suspendAll + "|" + String.join(",", sorted); - } - // --------------------------------------------------------------------------- // Utilities // --------------------------------------------------------------------------- diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java index 9028199b2b36..0e7078c7c19d 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java @@ -852,13 +852,6 @@ public void createEntitiesIndex(List entities) throws IOExcepti String entityType = entities.getFirst().getEntityReference().getType(); Timer.Sample searchSample = RequestLatencyContext.startSearchOperation(); try { - if (SearchIndexRetryQueue.isEntityTypeSuspended(entityType)) { - LOG.debug( - "Skipping live search indexing for {} entities because reindex is active for {}", - entities.size(), - entityType); - return; - } if (!getSearchClient().isClientAvailable()) { for (EntityInterface entity : entities) { SearchIndexRetryQueue.enqueue( @@ -1250,14 +1243,6 @@ public void updateEntitiesIndex(List entities) { String entityType = entry.getKey(); List typeEntities = entry.getValue(); - if (SearchIndexRetryQueue.isEntityTypeSuspended(entityType)) { - LOG.debug( - "Skipping bulk live indexing for {} entities because reindex is active for {}", - typeEntities.size(), - entityType); - continue; - } - if (!getSearchClient().isClientAvailable()) { for (EntityInterface entity : typeEntities) { SearchIndexRetryQueue.enqueue( @@ -1366,12 +1351,6 @@ public void updateEntitiesBulk(List entities) { public void updateAssetDomainsForDataProduct( String dataProductFqn, List oldDomainFqns, List newDomains) { Timer.Sample s = RequestLatencyContext.startSearchOperation(); - if (SearchIndexRetryQueue.isEntityTypeSuspended(Entity.DATA_PRODUCT)) { - LOG.debug( - "Skipping updateAssetDomainsForDataProduct because reindex is active for {}", - Entity.DATA_PRODUCT); - return; - } if (!getSearchClient().isClientAvailable()) { SearchIndexRetryQueue.enqueue( null, dataProductFqn, "updateAssetDomainsForDataProduct: Search client unavailable"); @@ -1393,11 +1372,6 @@ public void updateAssetDomainsByIds( List assetIds, List oldDomainFqns, List newDomains) { Timer.Sample s = RequestLatencyContext.startSearchOperation(); - if (SearchIndexRetryQueue.isEntityTypeSuspended(Entity.DATA_PRODUCT)) { - LOG.debug( - "Skipping updateAssetDomainsByIds because reindex is active for {}", Entity.DATA_PRODUCT); - return; - } if (!getSearchClient().isClientAvailable()) { for (UUID assetId : listOrEmpty(assetIds)) { SearchIndexRetryQueue.enqueue( @@ -1423,10 +1397,6 @@ public void updateAssetDomainsByIds( public void updateDomainFqnByPrefix(String oldFqn, String newFqn) { Timer.Sample s = RequestLatencyContext.startSearchOperation(); - if (SearchIndexRetryQueue.isEntityTypeSuspended(Entity.DOMAIN)) { - LOG.debug("Skipping updateDomainFqnByPrefix because reindex is active for {}", Entity.DOMAIN); - return; - } if (!getSearchClient().isClientAvailable()) { SearchIndexRetryQueue.enqueue( null, newFqn, "updateDomainFqnByPrefix: Search client unavailable"); @@ -1445,11 +1415,6 @@ public void updateDomainFqnByPrefix(String oldFqn, String newFqn) { public void updateAssetDomainFqnByPrefix(String oldFqn, String newFqn) { Timer.Sample s = RequestLatencyContext.startSearchOperation(); - if (SearchIndexRetryQueue.isEntityTypeSuspended(Entity.DOMAIN)) { - LOG.debug( - "Skipping updateAssetDomainFqnByPrefix because reindex is active for {}", Entity.DOMAIN); - return; - } if (!getSearchClient().isClientAvailable()) { SearchIndexRetryQueue.enqueue( null, newFqn, "updateAssetDomainFqnByPrefix: Search client unavailable"); @@ -2145,13 +2110,6 @@ public void deleteEntityByFQNPrefix(EntityInterface entity) { if (entity != null) { String entityType = entity.getEntityReference().getType(); String fqn = entity.getFullyQualifiedName(); - if (SearchIndexRetryQueue.isEntityTypeSuspended(entityType)) { - LOG.debug( - "Skipping deleteEntityByFQNPrefix for {} because reindex is active for {}", - fqn, - entityType); - return; - } if (!getSearchClient().isClientAvailable()) { SearchIndexRetryQueue.enqueue( entity.getId() != null ? entity.getId().toString() : null, @@ -2922,14 +2880,6 @@ public Set getSearchEntities() { private boolean shouldSkipStreamingIndexing( String entityType, String entityId, String entityFqn, String operation) { - if (SearchIndexRetryQueue.isEntityTypeSuspended(entityType)) { - LOG.debug( - "Skipping live search indexing operation {} for entityType {} because reindex is active", - operation, - entityType); - return true; - } - if (!getSearchClient().isClientAvailable()) { SearchIndexRetryQueue.enqueue(entityId, entityFqn, operation + ": Search client unavailable"); return true; From 52992bcdbcce4f292bc577630e115761843087a7 Mon Sep 17 00:00:00 2001 From: mohitdeuex Date: Thu, 23 Apr 2026 19:02:12 +0530 Subject: [PATCH 3/9] fix(search): route live writes to staged index during reindex (#27649) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The distributed reindex has a TOCTOU: partitions read from a DB snapshot at T0 and write to a staged index, then at T1 (seconds later) the alias is atomically swapped from the old index to the staged one and the old index is deleted. Any entity that live-writers create between T0 and T1 goes via the alias → old index, and is destroyed when that old index is deleted post-swap. The CI log for #27649 shows this directly: 10:13:35 staged table_search_index_rebuild_…_215646 built from snapshot 10:13:40 POST /v1/tables table1_gold → written to alias target (old index _179670) 10:13:40 table2_silver, table3_bronze, table4_brass all written to old index _179670 10:13:42 Atomically swapped aliases from [_179670] to _215646 10:13:42 Successfully deleted index _179670 10:13:43+ waitForEntityIndexedInSearch polls, finds nothing, times out at 2 min Removing the silent-skip suspension mechanism in the previous commit exposed this race (it had been hidden by dropping the writes outright, which was strictly worse). Route live writes to the staged index during the reindex window: * SearchRepository gains an activeStagedIndices map (entityType → stagedIndex) plus register/unregister/resolveWriteIndex. Writes resolve to the staged index when one is registered for the type, otherwise to the canonical alias — the existing behaviour. * DefaultRecreateHandler.recreateIndexFromMapping registers the staged index as soon as it is created; finalizeReindex and promoteEntityIndex unregister it on every exit path (successful swap, swap failure, failed-reindex delete, exception). * Every live-write path in SearchRepository — createEntityIndex, createEntitiesIndex, indexTableColumns, indexColumnsForTables, updateEntityIndex, createTimeSeriesEntity, updateTimeSeriesEntity, deleteEntityIndex, deleteEntityByFQNPrefix, deleteTimeSeriesEntityById — goes through resolveWriteIndex instead of reading the canonical alias directly. During a reindex, live writes land in the index that the alias will promote to; after the swap the alias points to that same index and subsequent writes continue to reach the same place. Old-index deletion no longer discards fresh data. Note: searches through the alias during the brief reindex window (< seconds in the CI log) can miss a write until the swap lands — an acceptable trade compared to silently dropping the write or losing it on deletion. The #27649 test tolerates this because its 120s poll spans many swap cycles. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../search/DefaultRecreateHandler.java | 9 +++ .../service/search/SearchRepository.java | 71 ++++++++++++++++--- 2 files changed, 70 insertions(+), 10 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/DefaultRecreateHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/DefaultRecreateHandler.java index b98f48161ddb..bd457aba0ef8 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/DefaultRecreateHandler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/DefaultRecreateHandler.java @@ -148,6 +148,7 @@ public void finalizeReindex(EntityReindexContext context, boolean reindexSuccess entityType); return; } + searchRepository.unregisterStagedIndex(entityType, stagedIndex); } LOG.info( @@ -176,6 +177,7 @@ public void finalizeReindex(EntityReindexContext context, boolean reindexSuccess } catch (Exception ex) { LOG.error( "Failed to promote staged index '{}' for entity '{}'.", stagedIndex, entityType, ex); + searchRepository.unregisterStagedIndex(entityType, stagedIndex); ReindexingMetrics metrics = ReindexingMetrics.getInstance(); if (metrics != null) { metrics.recordPromotionFailure(entityType); @@ -196,6 +198,8 @@ public void finalizeReindex(EntityReindexContext context, boolean reindexSuccess stagedIndex, entityType, ex); + } finally { + searchRepository.unregisterStagedIndex(entityType, stagedIndex); } } } @@ -262,6 +266,8 @@ public void promoteEntityIndex(EntityReindexContext context, boolean reindexSucc stagedIndex, entityType, ex); + } finally { + searchRepository.unregisterStagedIndex(entityType, stagedIndex); } return; } @@ -306,6 +312,7 @@ public void promoteEntityIndex(EntityReindexContext context, boolean reindexSucc aliasesToAttach); return; } + searchRepository.unregisterStagedIndex(entityType, stagedIndex); } else { LOG.warn("Entity '{}': aliasesToAttach is empty, skipping alias swap", entityType); } @@ -336,6 +343,7 @@ public void promoteEntityIndex(EntityReindexContext context, boolean reindexSucc } catch (Exception ex) { LOG.error( "Failed to promote staged index '{}' for entity '{}'.", stagedIndex, entityType, ex); + searchRepository.unregisterStagedIndex(entityType, stagedIndex); ReindexingMetrics promoteMetrics = ReindexingMetrics.getInstance(); if (promoteMetrics != null) { promoteMetrics.recordPromotionFailure(entityType); @@ -422,6 +430,7 @@ protected void recreateIndexFromMapping( String stagedIndexName = buildStagedIndexName(canonicalIndexName); searchClient.createIndex(stagedIndexName, mappingContent); + searchRepository.registerStagedIndex(entityType, stagedIndexName); Set existingAliases = activeIndexName != null ? searchClient.getAliases(activeIndexName) : new HashSet<>(); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java index 0e7078c7c19d..5837cbd6c8d9 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java @@ -77,6 +77,7 @@ import java.util.Set; import java.util.TreeSet; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -154,6 +155,14 @@ public class SearchRepository { @Getter private Map entityIndexMap; + /** + * Staged index names being populated by an in-flight reindex, keyed by entity type. While an + * entry is present, live writes for that entity type are routed to the staged index instead of + * the canonical alias — so the writes survive the final alias swap that would otherwise promote + * a pre-snapshot index and drop the old one that held them. + */ + private final Map activeStagedIndices = new ConcurrentHashMap<>(); + private final String language; @Getter @Setter public SearchIndexFactory searchIndexFactory = new SearchIndexFactory(); @@ -486,6 +495,46 @@ public IndexMapping getIndexMapping(String entityType) { return entityIndexMap.get(entityType); } + /** + * Register a staged index as the live-write target for an entity type while a reindex populates + * it. Must be paired with {@link #unregisterStagedIndex(String, String)} once the alias swap is + * complete so writes go back through the canonical alias. + */ + public void registerStagedIndex(String entityType, String stagedIndex) { + if (entityType == null || stagedIndex == null) { + return; + } + activeStagedIndices.put(entityType, stagedIndex); + LOG.info( + "Routing live writes for entity '{}' to staged index '{}' until reindex promotes it", + entityType, + stagedIndex); + } + + /** Clear the staged-index routing for {@code entityType} if it matches {@code stagedIndex}. */ + public void unregisterStagedIndex(String entityType, String stagedIndex) { + if (entityType == null || stagedIndex == null) { + return; + } + if (activeStagedIndices.remove(entityType, stagedIndex)) { + LOG.info( + "Cleared staged-index routing for entity '{}' (was '{}')", entityType, stagedIndex); + } + } + + /** + * Resolve the index name a live write should target for {@code entityType}. During a reindex + * this returns the staged index directly so the write survives the subsequent alias swap; + * otherwise it returns the canonical alias configured in {@link IndexMapping}. + */ + public String resolveWriteIndex(String entityType, IndexMapping indexMapping) { + String staged = activeStagedIndices.get(entityType); + if (staged != null) { + return staged; + } + return indexMapping.getIndexName(clusterAlias); + } + public String getIndexOrAliasName(String name) { if (clusterAlias == null || clusterAlias.isEmpty()) { return name; @@ -635,7 +684,7 @@ public void createEntityIndex(EntityInterface entity) { IndexMapping indexMapping = entityIndexMap.get(entityType); SearchIndex index = searchIndexFactory.buildIndex(entityType, entity); String doc = JsonUtils.pojoToJson(index.buildSearchIndexDoc()); - searchClient.createEntity(indexMapping.getIndexName(clusterAlias), entityId, doc); + searchClient.createEntity(resolveWriteIndex(entityType, indexMapping), entityId, doc); if (Entity.TABLE.equals(entityType)) { indexTableColumns((Table) entity); @@ -686,7 +735,8 @@ private void indexTableColumns(Table table) { if (!docs.isEmpty()) { try { - searchClient.createEntities(columnIndexMapping.getIndexName(clusterAlias), docs); + searchClient.createEntities( + resolveWriteIndex(Entity.TABLE_COLUMN, columnIndexMapping), docs); } catch (Exception e) { LOG.error( "Issue bulk indexing columns for table [{}]: {}", @@ -881,7 +931,7 @@ public void createEntitiesIndex(List entities) throws IOExcepti return; } - searchClient.createEntities(indexMapping.getIndexName(clusterAlias), docs); + searchClient.createEntities(resolveWriteIndex(entityType, indexMapping), docs); if (Entity.TABLE.equals(entityType)) { indexColumnsForTables(entities); @@ -900,7 +950,7 @@ private void indexColumnsForTables(List entities) { return; } - String indexName = columnIndexMapping.getIndexName(clusterAlias); + String indexName = resolveWriteIndex(Entity.TABLE_COLUMN, columnIndexMapping); List> allColumnDocs = new ArrayList<>(); for (EntityInterface entity : entities) { @@ -965,7 +1015,8 @@ public void createTimeSeriesEntity(EntityTimeSeriesInterface entity) { IndexMapping indexMapping = entityIndexMap.get(entityType); SearchIndex index = searchIndexFactory.buildIndex(entityType, entity); String doc = JsonUtils.pojoToJson(index.buildSearchIndexDoc()); - searchClient.createTimeSeriesEntity(indexMapping.getIndexName(clusterAlias), entityId, doc); + searchClient.createTimeSeriesEntity( + resolveWriteIndex(entityType, indexMapping), entityId, doc); } catch (Exception ie) { SearchIndexRetryQueue.enqueue( entityId, @@ -996,7 +1047,7 @@ public void updateTimeSeriesEntity(EntityTimeSeriesInterface entityTimeSeries) { searchIndexFactory.buildIndex(entityType, entityTimeSeries); Map doc = elasticSearchIndex.buildSearchIndexDoc(); searchClient.updateEntity( - indexMapping.getIndexName(clusterAlias), entityId, doc, DEFAULT_UPDATE_SCRIPT); + resolveWriteIndex(entityType, indexMapping), entityId, doc, DEFAULT_UPDATE_SCRIPT); } catch (RuntimeException e) { SearchIndexRetryQueue.enqueue( entityId, @@ -1079,7 +1130,7 @@ public void updateEntityIndex(EntityInterface entity) { doc, SearchClusterMetrics.DEFAULT_BULK_PAYLOAD_SIZE_BYTES, entityId, entityType); } - searchClient.updateEntity(indexMapping.getIndexName(clusterAlias), entityId, doc, scriptTxt); + searchClient.updateEntity(resolveWriteIndex(entityType, indexMapping), entityId, doc, scriptTxt); if (Entity.TABLE.equals(entityType)) { try { @@ -2086,7 +2137,7 @@ public void deleteEntityIndex(EntityInterface entity) { IndexMapping indexMapping = entityIndexMap.get(entityType); Timer.Sample searchSample = RequestLatencyContext.startSearchOperation(); try { - searchClient.deleteEntity(indexMapping.getIndexName(clusterAlias), entityId); + searchClient.deleteEntity(resolveWriteIndex(entityType, indexMapping), entityId); deleteOrUpdateChildren(entity, indexMapping); if (Entity.TABLE.equals(entityType)) { deleteTableColumns((Table) entity); @@ -2120,7 +2171,7 @@ public void deleteEntityByFQNPrefix(EntityInterface entity) { IndexMapping indexMapping = entityIndexMap.get(entityType); Timer.Sample searchSample = RequestLatencyContext.startSearchOperation(); try { - searchClient.deleteEntityByFQNPrefix(indexMapping.getIndexName(clusterAlias), fqn); + searchClient.deleteEntityByFQNPrefix(resolveWriteIndex(entityType, indexMapping), fqn); } catch (Exception ie) { SearchIndexRetryQueue.enqueue( entity.getId() != null ? entity.getId().toString() : null, @@ -2144,7 +2195,7 @@ public void deleteTimeSeriesEntityById(EntityTimeSeriesInterface entity) { IndexMapping indexMapping = entityIndexMap.get(entityType); Timer.Sample searchSample = RequestLatencyContext.startSearchOperation(); try { - searchClient.deleteEntity(indexMapping.getIndexName(clusterAlias), entityId); + searchClient.deleteEntity(resolveWriteIndex(entityType, indexMapping), entityId); } catch (Exception ie) { SearchIndexRetryQueue.enqueue( entityId, From ea093b0d86ad00f2fca61ad677768c19701f85d0 Mon Sep 17 00:00:00 2001 From: mohitdeuex Date: Fri, 24 Apr 2026 10:38:36 +0530 Subject: [PATCH 4/9] fix(search): re-register SearchIndexHandler on every SearchRepository init (#27649) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous commit routed live writes through resolveWriteIndex so they land in the staged index during reindex. The CI log for the next run showed the register/unregister fire correctly, but the live writes to tables still went to the canonical alias — as if activeStagedIndices was empty for the entity type. Root cause: stale handler pointing at a stale SearchRepository. TestSuiteBootstrap creates SearchRepository three times (migration, createIndices, and finally the embedded OpenMetadataApplication). Each constructor calls registerSearchIndexHandler → new SearchIndexHandler(this) → dispatcher.registerHandler(…). EntityLifecycleEventDispatcher. registerHandler silently SKIPS if a handler with the same name already exists (see EntityLifecycleEventDispatcher.java:80-86), so the dispatcher keeps the FIRST SearchIndexHandler forever — bound to the migration-time SearchRepository. Meanwhile DefaultRecreateHandler.registerStagedIndex writes into Entity.getSearchRepository(), which by then is the third (current) instance. Live writes flowing through the stale handler never see that entry; resolveWriteIndex falls through to the canonical alias; the alias swap at the end of the reindex drops the writes, same as before. Fix: unregister any existing SearchIndexHandler by name before registering the new one. The latest-constructed SearchRepository always owns the handler delivered through the dispatcher, so its activeStagedIndices is the one consulted on every live write. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../openmetadata/service/search/SearchRepository.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java index 5837cbd6c8d9..4ef4444314b5 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java @@ -243,8 +243,15 @@ public SearchRepository(ElasticSearchConfiguration config, int maxDBConnections) */ private void registerSearchIndexHandler() { try { + EntityLifecycleEventDispatcher dispatcher = EntityLifecycleEventDispatcher.getInstance(); SearchIndexHandler searchHandler = new SearchIndexHandler(this); - EntityLifecycleEventDispatcher.getInstance().registerHandler(searchHandler); + // Drop any stale handler bound to a previous SearchRepository instance. Test suites and + // app bootstrap construct SearchRepository more than once and replace the singleton via + // Entity.setSearchRepository(...); without this the dispatcher keeps delivering events to + // the first instance and state maintained on the current instance (e.g. activeStagedIndices + // used for reindex write-routing) is never consulted. + dispatcher.unregisterHandler(searchHandler.getHandlerName()); + dispatcher.registerHandler(searchHandler); LOG.info("Successfully registered SearchIndexHandler for entity lifecycle events"); } catch (Exception e) { LOG.error("Failed to register SearchIndexHandler", e); From 568c2d32e5516c29637dfb74f61c3d9cfdb6d349 Mon Sep 17 00:00:00 2001 From: mohitdeuex Date: Mon, 27 Apr 2026 12:06:20 +0530 Subject: [PATCH 5/9] fix(search): centralize staged-index routing on canonical name (#27649) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Re-key activeStagedIndices by canonical index name (e.g. openmetadata_table_search_index) instead of entity type, and route every live-write site through a single getWriteIndexName(IndexMapping) helper. Why - Previous routing went through resolveWriteIndex(entityType, mapping) but only at hand-picked call sites. Several write paths still resolved indexMapping.getIndexName(clusterAlias) directly and bypassed routing — bulkIndexPipelineExecutions, deleteByScript, softDeleteOrRestoreEntity, propagateToDomainChildren, updateEntityCertificationInSearch, propagateToRelatedEntities (PAGE), deleteTableColumns, updateTableColumnsInheritedFields. Any reindex in flight could lose those writes on the alias swap. - Keying by canonical index name lets any write site resolve correctly even without entity type in scope (FQN-prefix deletes, child propagation, script updates). What - activeStagedIndices: Map. - registerStagedIndex(entityType, stagedIndex) now resolves the canonical name from the IndexMapping before storing. - New getWriteIndexName(IndexMapping) is the single point of resolution; routeToStagedIfActive(String) handles raw alias names (e.g. pipeline_status_search_index resolved via getIndexOrAliasName). - Replaced every direct indexMapping.getIndexName(clusterAlias) for writes with getWriteIndexName(indexMapping). Admin/setup paths (createIndex/updateIndex/deleteIndex/createOrUpdateIndexTemplate) intentionally keep canonical names — they manage the alias itself. - Cascade ops on shared aliases (GLOBAL_SEARCH_ALIAS, DATA_ASSET_SEARCH_ALIAS, child aliases) are not entity-scoped and cannot route to a single staged index; left untouched. - resolveWriteIndex(entityType, mapping) preserved as a thin wrapper for binary compatibility. Also runs spotless:apply on the file. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../service/search/SearchRepository.java | 127 ++++++++++++------ 1 file changed, 86 insertions(+), 41 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java index 4ef4444314b5..dce6fa469b96 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java @@ -156,10 +156,14 @@ public class SearchRepository { @Getter private Map entityIndexMap; /** - * Staged index names being populated by an in-flight reindex, keyed by entity type. While an - * entry is present, live writes for that entity type are routed to the staged index instead of - * the canonical alias — so the writes survive the final alias swap that would otherwise promote - * a pre-snapshot index and drop the old one that held them. + * Staged index names being populated by an in-flight reindex, keyed by the canonical index name + * the alias normally points at (e.g. {@code openmetadata_table_search_index}). While an entry + * is present, every live write that resolves through {@link #getWriteIndexName(IndexMapping)} + * targets the staged index directly — so the writes survive the final alias swap that would + * otherwise promote a pre-snapshot index and drop the old one that held them. + * + *

Keying by canonical index name lets any write site route correctly even if it does not + * have the entity type in scope (deletes by FQN prefix, script updates, child propagation, …). */ private final Map activeStagedIndices = new ConcurrentHashMap<>(); @@ -503,17 +507,26 @@ public IndexMapping getIndexMapping(String entityType) { } /** - * Register a staged index as the live-write target for an entity type while a reindex populates - * it. Must be paired with {@link #unregisterStagedIndex(String, String)} once the alias swap is - * complete so writes go back through the canonical alias. + * Register a staged index as the live-write target for {@code entityType} while a reindex + * populates it. Must be paired with {@link #unregisterStagedIndex(String, String)} once the + * alias swap is complete so writes go back through the canonical alias. */ public void registerStagedIndex(String entityType, String stagedIndex) { if (entityType == null || stagedIndex == null) { return; } - activeStagedIndices.put(entityType, stagedIndex); + String canonical = canonicalIndexFor(entityType); + if (canonical == null) { + LOG.warn( + "Cannot register staged index '{}' for entity '{}': no IndexMapping found", + stagedIndex, + entityType); + return; + } + activeStagedIndices.put(canonical, stagedIndex); LOG.info( - "Routing live writes for entity '{}' to staged index '{}' until reindex promotes it", + "Routing live writes for canonical index '{}' (entity '{}') to staged index '{}' until reindex promotes it", + canonical, entityType, stagedIndex); } @@ -523,23 +536,58 @@ public void unregisterStagedIndex(String entityType, String stagedIndex) { if (entityType == null || stagedIndex == null) { return; } - if (activeStagedIndices.remove(entityType, stagedIndex)) { + String canonical = canonicalIndexFor(entityType); + if (canonical == null) { + return; + } + if (activeStagedIndices.remove(canonical, stagedIndex)) { LOG.info( - "Cleared staged-index routing for entity '{}' (was '{}')", entityType, stagedIndex); + "Cleared staged-index routing for canonical index '{}' (entity '{}', was '{}')", + canonical, + entityType, + stagedIndex); } } + private String canonicalIndexFor(String entityType) { + IndexMapping mapping = entityIndexMap.get(entityType); + return mapping != null ? mapping.getIndexName(clusterAlias) : null; + } + /** - * Resolve the index name a live write should target for {@code entityType}. During a reindex - * this returns the staged index directly so the write survives the subsequent alias swap; - * otherwise it returns the canonical alias configured in {@link IndexMapping}. + * Centralized resolution of the index name a live write should target. Every write path that + * ultimately calls {@link IndexMapping#getIndexName(String)} should route through this method + * so it transparently picks up the staged index when a reindex is in flight. Returns the + * canonical index name when nothing is staged. */ - public String resolveWriteIndex(String entityType, IndexMapping indexMapping) { - String staged = activeStagedIndices.get(entityType); - if (staged != null) { - return staged; + public String getWriteIndexName(IndexMapping indexMapping) { + if (indexMapping == null) { + return null; + } + String canonical = indexMapping.getIndexName(clusterAlias); + return routeToStagedIfActive(canonical); + } + + /** + * Centralized routing for writes that already hold a resolved canonical index name (e.g. the + * caller resolved {@link IndexMapping#getIndexName(String)} earlier or holds an alias). If a + * reindex has registered a staged index for {@code canonicalIndexName}, returns the staged + * name; otherwise returns {@code canonicalIndexName} unchanged. + */ + public String routeToStagedIfActive(String canonicalIndexName) { + if (canonicalIndexName == null) { + return null; } - return indexMapping.getIndexName(clusterAlias); + String staged = activeStagedIndices.get(canonicalIndexName); + return staged != null ? staged : canonicalIndexName; + } + + /** + * Convenience for callers that have the entity type but not the {@link IndexMapping}. + * Equivalent to {@code getWriteIndexName(getIndexMapping(entityType))}. + */ + public String resolveWriteIndex(String entityType, IndexMapping indexMapping) { + return getWriteIndexName(indexMapping); } public String getIndexOrAliasName(String name) { @@ -691,7 +739,7 @@ public void createEntityIndex(EntityInterface entity) { IndexMapping indexMapping = entityIndexMap.get(entityType); SearchIndex index = searchIndexFactory.buildIndex(entityType, entity); String doc = JsonUtils.pojoToJson(index.buildSearchIndexDoc()); - searchClient.createEntity(resolveWriteIndex(entityType, indexMapping), entityId, doc); + searchClient.createEntity(getWriteIndexName(indexMapping), entityId, doc); if (Entity.TABLE.equals(entityType)) { indexTableColumns((Table) entity); @@ -742,8 +790,7 @@ private void indexTableColumns(Table table) { if (!docs.isEmpty()) { try { - searchClient.createEntities( - resolveWriteIndex(Entity.TABLE_COLUMN, columnIndexMapping), docs); + searchClient.createEntities(getWriteIndexName(columnIndexMapping), docs); } catch (Exception e) { LOG.error( "Issue bulk indexing columns for table [{}]: {}", @@ -765,7 +812,7 @@ private void deleteTableColumns(Table table) { try { searchClient.deleteEntityByFields( - List.of(columnIndexMapping.getIndexName(clusterAlias)), + List.of(getWriteIndexName(columnIndexMapping)), List.of(new ImmutablePair<>("table.id", table.getId().toString()))); } catch (Exception e) { LOG.error( @@ -868,7 +915,7 @@ private void updateTableColumnsInheritedFields(Table table) { // Use updateChildren to efficiently update all columns for this table searchClient.updateChildren( - List.of(columnIndexMapping.getIndexName(clusterAlias)), + List.of(getWriteIndexName(columnIndexMapping)), new ImmutablePair<>("table.id", table.getId().toString()), new ImmutablePair<>(DEFAULT_UPDATE_SCRIPT, inheritedFields)); @@ -938,7 +985,7 @@ public void createEntitiesIndex(List entities) throws IOExcepti return; } - searchClient.createEntities(resolveWriteIndex(entityType, indexMapping), docs); + searchClient.createEntities(getWriteIndexName(indexMapping), docs); if (Entity.TABLE.equals(entityType)) { indexColumnsForTables(entities); @@ -957,7 +1004,7 @@ private void indexColumnsForTables(List entities) { return; } - String indexName = resolveWriteIndex(Entity.TABLE_COLUMN, columnIndexMapping); + String indexName = getWriteIndexName(columnIndexMapping); List> allColumnDocs = new ArrayList<>(); for (EntityInterface entity : entities) { @@ -1022,8 +1069,7 @@ public void createTimeSeriesEntity(EntityTimeSeriesInterface entity) { IndexMapping indexMapping = entityIndexMap.get(entityType); SearchIndex index = searchIndexFactory.buildIndex(entityType, entity); String doc = JsonUtils.pojoToJson(index.buildSearchIndexDoc()); - searchClient.createTimeSeriesEntity( - resolveWriteIndex(entityType, indexMapping), entityId, doc); + searchClient.createTimeSeriesEntity(getWriteIndexName(indexMapping), entityId, doc); } catch (Exception ie) { SearchIndexRetryQueue.enqueue( entityId, @@ -1054,7 +1100,7 @@ public void updateTimeSeriesEntity(EntityTimeSeriesInterface entityTimeSeries) { searchIndexFactory.buildIndex(entityType, entityTimeSeries); Map doc = elasticSearchIndex.buildSearchIndexDoc(); searchClient.updateEntity( - resolveWriteIndex(entityType, indexMapping), entityId, doc, DEFAULT_UPDATE_SCRIPT); + getWriteIndexName(indexMapping), entityId, doc, DEFAULT_UPDATE_SCRIPT); } catch (RuntimeException e) { SearchIndexRetryQueue.enqueue( entityId, @@ -1137,7 +1183,7 @@ public void updateEntityIndex(EntityInterface entity) { doc, SearchClusterMetrics.DEFAULT_BULK_PAYLOAD_SIZE_BYTES, entityId, entityType); } - searchClient.updateEntity(resolveWriteIndex(entityType, indexMapping), entityId, doc, scriptTxt); + searchClient.updateEntity(getWriteIndexName(indexMapping), entityId, doc, scriptTxt); if (Entity.TABLE.equals(entityType)) { try { @@ -1212,7 +1258,7 @@ public void updateEntityIndex(EntityInterface entity) { public void bulkIndexPipelineExecutions( Pipeline pipeline, List pipelineStatuses) { try { - String indexName = getIndexOrAliasName("pipeline_status_search_index"); + String indexName = routeToStagedIfActive(getIndexOrAliasName("pipeline_status_search_index")); List> docsAndIds = new ArrayList<>(); for (PipelineStatus pipelineStatus : pipelineStatuses) { PipelineExecutionIndex pipelineExecutionIndex = @@ -1570,11 +1616,11 @@ private void propagateToDomainChildren( String domainId, IndexMapping indexMapping, Pair> updates) throws IOException { searchClient.updateChildren( - List.of(indexMapping.getIndexName(clusterAlias)), + List.of(getWriteIndexName(indexMapping)), new ImmutablePair<>(PARENT_ID, domainId), updates); searchClient.updateChildren( - List.of(entityIndexMap.get(Entity.DATA_PRODUCT).getIndexName(clusterAlias)), + List.of(getWriteIndexName(entityIndexMap.get(Entity.DATA_PRODUCT))), new ImmutablePair<>(DOMAINS_ID, domainId), updates); } @@ -1688,7 +1734,7 @@ private AssetCertification getCertificationFromEntity(EntityInterface entity) { private void updateEntityCertificationInSearch( EntityInterface entity, AssetCertification certification) { IndexMapping indexMapping = entityIndexMap.get(entity.getEntityReference().getType()); - String indexName = indexMapping.getIndexName(clusterAlias); + String indexName = getWriteIndexName(indexMapping); Map paramMap = new HashMap<>(); if (certification != null && certification.getTagLabel() != null) { @@ -1714,7 +1760,7 @@ public void propagateToRelatedEntities( EntityInterface entity) { if (changeDescription != null && entityType.equalsIgnoreCase(Entity.PAGE)) { - String indexName = indexMapping.getIndexName(clusterAlias); + String indexName = getWriteIndexName(indexMapping); for (FieldChange field : changeDescription.getFieldsAdded()) { if (field.getName().contains(PARENT)) { String oldParentFQN = entity.getName(); @@ -2111,7 +2157,7 @@ public void deleteByScript(String entityType, String scriptTxt, Map Date: Mon, 27 Apr 2026 14:23:51 +0530 Subject: [PATCH 6/9] fix(it): bump DB sort memory + search-wait ceilings (#27649) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two CI failures observed under the parallel-tests fork load on the post-centralization run: 1. TagResourceIT line 161 (listEntities) — the server returned 500 wrapping "java.sql.SQLException: Out of sort memory, consider increasing server sort buffer size" from TagDAO.listAfter. The query joins tag → entity_relationship → classification and orders by tag.name,tag.id; with the tag table accumulating across many parallel test classes (reuseForks=true), MySQL's default 256KB sort_buffer_size overflows. Bump it to 8MB. Add a parallel work_mem=32MB bump to the postgres command for the same query. 2. TagResourceIT line 1 — Awaitility timeout at 1m30s waiting for a freshly created tag to appear in search index. Five inherited waits in BaseEntityIT had a 90s ceiling while the sibling checkCreatedEntity already used 180s. Standardise on 180s — under tag-scale data the alias swap that the staged-index routing depends on can take longer than 90s in slow CI workers. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../it/bootstrap/TestSuiteBootstrap.java | 16 ++++++++++++++-- .../org/openmetadata/it/tests/BaseEntityIT.java | 12 ++++++------ 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/bootstrap/TestSuiteBootstrap.java b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/bootstrap/TestSuiteBootstrap.java index 0ada0b23640a..e5507bee3ad0 100644 --- a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/bootstrap/TestSuiteBootstrap.java +++ b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/bootstrap/TestSuiteBootstrap.java @@ -230,7 +230,14 @@ private void startDatabase() { mysql.withDatabaseName("openmetadata"); mysql.withUsername("test"); mysql.withPassword("test"); - mysql.withCommand("mysqld", "--max_allowed_packet=" + mysqlMaxAllowedPacket); + mysql.withCommand( + "mysqld", + "--max_allowed_packet=" + mysqlMaxAllowedPacket, + // The tag list query (TagDAO.listAfter) joins three tables and sorts by tag.name, + // tag.id; under the parallel-tests fork the tag table grows large and the default + // 256KB sort_buffer_size overflows with "Out of sort memory" (#27649). 8MB is plenty + // for an integration-test workload and well under the 4GB overall limit. + "--sort_buffer_size=8M"); mysql.withStartupTimeoutSeconds(240); mysql.withConnectTimeoutSeconds(240); mysql.withTmpFs(java.util.Map.of("/var/lib/mysql", "rw,size=2g")); @@ -278,7 +285,12 @@ private void startDatabase() { "-c", "synchronous_commit=off", "-c", - "full_page_writes=off"); + "full_page_writes=off", + // Bump work_mem for the same reason MySQL gets a larger sort_buffer above: + // TagDAO.listAfter joins three tables and sorts; default 4MB spills to temp files + // under load. + "-c", + "work_mem=32MB"); postgres.withTmpFs(java.util.Map.of("/var/lib/postgresql/data", "rw,size=2g")); postgres.withCreateContainerCmdModifier( cmd -> diff --git a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/BaseEntityIT.java b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/BaseEntityIT.java index dec08acab14e..065f9df593e0 100644 --- a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/BaseEntityIT.java +++ b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/BaseEntityIT.java @@ -4378,7 +4378,7 @@ void test_bulkCreateOrUpdate_searchIndexed(TestNamespace ns) { OpenMetadataClient client = SdkClients.adminClient(); Awaitility.await() - .atMost(Duration.ofSeconds(90)) + .atMost(Duration.ofSeconds(180)) .pollDelay(Duration.ofMillis(500)) .pollInterval(Duration.ofSeconds(2)) .ignoreExceptions() @@ -4440,7 +4440,7 @@ void test_bulkUpdate_searchIndexUpdated(TestNamespace ns) { OpenMetadataClient client = SdkClients.adminClient(); Awaitility.await() - .atMost(Duration.ofSeconds(90)) + .atMost(Duration.ofSeconds(180)) .pollDelay(Duration.ofMillis(500)) .pollInterval(Duration.ofSeconds(3)) .ignoreExceptions() @@ -5130,7 +5130,7 @@ void checkDeletedEntity(TestNamespace ns) throws Exception { Awaitility.await("Wait for entity to appear in search index") .pollDelay(Duration.ofMillis(500)) .pollInterval(Duration.ofSeconds(1)) - .atMost(Duration.ofSeconds(90)) + .atMost(Duration.ofSeconds(180)) .ignoreExceptions() .untilAsserted( () -> { @@ -5166,7 +5166,7 @@ void checkIndexCreated(TestNamespace ns) throws Exception { Awaitility.await("Wait for entity to appear in search index") .pollDelay(Duration.ofMillis(500)) .pollInterval(Duration.ofSeconds(1)) - .atMost(Duration.ofSeconds(90)) + .atMost(Duration.ofSeconds(180)) .ignoreExceptions() .untilAsserted( () -> { @@ -5194,7 +5194,7 @@ void updateDescriptionAndCheckInSearch(TestNamespace ns) throws Exception { Awaitility.await("Wait for entity to appear in search index") .pollDelay(Duration.ofMillis(500)) .pollInterval(Duration.ofSeconds(1)) - .atMost(Duration.ofSeconds(90)) + .atMost(Duration.ofSeconds(180)) .ignoreExceptions() .untilAsserted( () -> { @@ -5213,7 +5213,7 @@ void updateDescriptionAndCheckInSearch(TestNamespace ns) throws Exception { Awaitility.await("Wait for search to reflect update") .pollDelay(Duration.ofMillis(500)) .pollInterval(Duration.ofSeconds(1)) - .atMost(Duration.ofSeconds(90)) + .atMost(Duration.ofSeconds(180)) .ignoreExceptions() .untilAsserted( () -> { From 37c343855a5b6f1bee6d2e53f953d3e90264a6a1 Mon Sep 17 00:00:00 2001 From: mohitdeuex Date: Mon, 27 Apr 2026 16:51:57 +0530 Subject: [PATCH 7/9] fix(search): address Copilot/Gitar review on staged-index routing (#27649) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * DefaultRecreateHandler.finalizeReindex / promoteEntityIndex — wrap the entire promote block in try/finally so unregisterStagedIndex always runs, including on swap failure, empty aliasesToAttach, and exceptions. Without this the routing map could be left pointing at a staged index nobody reads from, silently diverging live writes from search results until the next reindex (Copilot, multiple comments). * SearchRepository.resolveWriteIndex — deprecate. The entityType argument is unused; getWriteIndexName(IndexMapping) is the single resolution point now (Copilot + Gitar). * SearchRepository.routeToStagedIfActive — tighten the Javadoc to state explicitly that it expects a canonical index name and that short/parent aliases are passed through unchanged (Copilot). Co-Authored-By: Claude Opus 4.7 (1M context) --- .../service/search/DefaultRecreateHandler.java | 18 ++++++++++++++---- .../service/search/SearchRepository.java | 16 ++++++++++------ 2 files changed, 24 insertions(+), 10 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/DefaultRecreateHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/DefaultRecreateHandler.java index bd457aba0ef8..d90763ae9036 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/DefaultRecreateHandler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/DefaultRecreateHandler.java @@ -99,6 +99,13 @@ public void finalizeReindex(EntityReindexContext context, boolean reindexSuccess } if (shouldPromote) { + // Always clear staged-index routing on the way out, regardless of outcome: + // - swap success → alias now points at staged; canonical and staged resolve to the + // same index, so unregistering keeps reads/writes consistent. + // - swap failure / empty aliases / exception → leaving routing active would silently + // send live writes to a staged index nothing reads from, which + // is strictly worse than the writes going back to the canonical + // alias target. Operators need to retry the reindex either way. try { Set aliasesToAttach = new HashSet<>(); @@ -148,7 +155,8 @@ public void finalizeReindex(EntityReindexContext context, boolean reindexSuccess entityType); return; } - searchRepository.unregisterStagedIndex(entityType, stagedIndex); + } else { + LOG.warn("Entity '{}': aliasesToAttach is empty, skipping alias swap", entityType); } LOG.info( @@ -177,11 +185,12 @@ public void finalizeReindex(EntityReindexContext context, boolean reindexSuccess } catch (Exception ex) { LOG.error( "Failed to promote staged index '{}' for entity '{}'.", stagedIndex, entityType, ex); - searchRepository.unregisterStagedIndex(entityType, stagedIndex); ReindexingMetrics metrics = ReindexingMetrics.getInstance(); if (metrics != null) { metrics.recordPromotionFailure(entityType); } + } finally { + searchRepository.unregisterStagedIndex(entityType, stagedIndex); } } else { try { @@ -272,6 +281,7 @@ public void promoteEntityIndex(EntityReindexContext context, boolean reindexSucc return; } + // Always clear staged-index routing on the way out — see the rationale in finalizeReindex. try { Set aliasesToAttach = getAliasesFromMapping(indexMapping, searchRepository.getClusterAlias()); @@ -312,7 +322,6 @@ public void promoteEntityIndex(EntityReindexContext context, boolean reindexSucc aliasesToAttach); return; } - searchRepository.unregisterStagedIndex(entityType, stagedIndex); } else { LOG.warn("Entity '{}': aliasesToAttach is empty, skipping alias swap", entityType); } @@ -343,11 +352,12 @@ public void promoteEntityIndex(EntityReindexContext context, boolean reindexSucc } catch (Exception ex) { LOG.error( "Failed to promote staged index '{}' for entity '{}'.", stagedIndex, entityType, ex); - searchRepository.unregisterStagedIndex(entityType, stagedIndex); ReindexingMetrics promoteMetrics = ReindexingMetrics.getInstance(); if (promoteMetrics != null) { promoteMetrics.recordPromotionFailure(entityType); } + } finally { + searchRepository.unregisterStagedIndex(entityType, stagedIndex); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java index dce6fa469b96..0624c793c2c6 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java @@ -569,10 +569,12 @@ public String getWriteIndexName(IndexMapping indexMapping) { } /** - * Centralized routing for writes that already hold a resolved canonical index name (e.g. the - * caller resolved {@link IndexMapping#getIndexName(String)} earlier or holds an alias). If a - * reindex has registered a staged index for {@code canonicalIndexName}, returns the staged - * name; otherwise returns {@code canonicalIndexName} unchanged. + * Centralized routing for writes that already hold a resolved canonical index name — i.e. the + * value of {@link IndexMapping#getIndexName(String)}, NOT a short alias such as the result of + * {@link IndexMapping#getAlias(String)} or any of the parent aliases. If a reindex has + * registered a staged index for {@code canonicalIndexName}, returns the staged name; otherwise + * returns {@code canonicalIndexName} unchanged. Pass non-canonical aliases through unchanged + * since the routing map only knows about canonical names. */ public String routeToStagedIfActive(String canonicalIndexName) { if (canonicalIndexName == null) { @@ -583,9 +585,11 @@ public String routeToStagedIfActive(String canonicalIndexName) { } /** - * Convenience for callers that have the entity type but not the {@link IndexMapping}. - * Equivalent to {@code getWriteIndexName(getIndexMapping(entityType))}. + * @deprecated Use {@link #getWriteIndexName(IndexMapping)} directly. The {@code entityType} + * argument is ignored; the canonical name is resolved from the supplied {@code + * indexMapping}. */ + @Deprecated(forRemoval = true) public String resolveWriteIndex(String entityType, IndexMapping indexMapping) { return getWriteIndexName(indexMapping); } From 95aa9235b43c0452a98775deb8f4623b905e1414 Mon Sep 17 00:00:00 2001 From: mohitdeuex Date: Mon, 27 Apr 2026 18:56:33 +0530 Subject: [PATCH 8/9] fix(search): fan out cross-alias update-by-query to staged indices (#27649) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The four bulk update-by-query operations rooted on shared aliases — updateAssetDomainsForDataProduct, updateAssetDomainsByIds, updateDomainFqnByPrefix, updateAssetDomainFqnByPrefix — hardcoded their target to GLOBAL_SEARCH_ALIAS / Entity.DOMAIN. During an in-flight reindex those updates landed on the about-to-be-discarded active index only; on alias swap, the new staged index (built from a DB snapshot taken before the script ran) replaced it and the script's effect was lost. Copilot called this out four times. Add SearchRepository.getWriteFanoutTargets(aliasOrIndex) — returns the caller's alias plus every currently-staged index. Pass that list to req.index(...) on all four methods in both OpenSearchEntityManager and ElasticSearchEntityManager. The OS/ES update-by-query API natively takes a list, so the fan-out is one request per call. The scripts these methods run are idempotent (UPDATE_ASSET_DOMAIN_SCRIPT checks `exists` before adding a domain; UPDATE_DOMAIN_FQN_BY_PREFIX_SCRIPT walks the array and rewrites in place), so applying them again to the staged index — even if the staged copy of the document already reflects the latest DB state — converges to the same result. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../service/search/SearchRepository.java | 22 +++++++++++++++++++ .../ElasticSearchEntityManager.java | 12 ++++++---- .../opensearch/OpenSearchEntityManager.java | 16 ++++++++++---- 3 files changed, 42 insertions(+), 8 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java index 0624c793c2c6..c85f816c94cd 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java @@ -594,6 +594,28 @@ public String resolveWriteIndex(String entityType, IndexMapping indexMapping) { return getWriteIndexName(indexMapping); } + /** + * Returns the index targets a write that normally goes through a multi-entity alias (e.g. + * {@code GLOBAL_SEARCH_ALIAS}, {@code DATA_ASSET_SEARCH_ALIAS}) should fan out to: the alias + * itself plus every currently-staged index. During an in-flight reindex, update-by-query + * operations rooted on shared aliases (asset domain reassignments, FQN renames, …) would + * otherwise update the about-to-be-discarded active index only — by including the staged + * index in the same request, the change is applied to whichever index ends up serving the + * alias after promotion. + * + *

Calling with an alias that is already a canonical entity index name is fine; the staged + * index for that canonical name will be added as expected. + */ + public List getWriteFanoutTargets(String aliasOrIndex) { + if (aliasOrIndex == null) { + return new ArrayList<>(activeStagedIndices.values()); + } + List targets = new ArrayList<>(activeStagedIndices.size() + 1); + targets.add(aliasOrIndex); + targets.addAll(activeStagedIndices.values()); + return targets; + } + public String getIndexOrAliasName(String name) { if (clusterAlias == null || clusterAlias.isEmpty()) { return name; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchEntityManager.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchEntityManager.java index 0076c4ff8563..f2ef03d6d5d5 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchEntityManager.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchEntityManager.java @@ -1086,7 +1086,11 @@ public void updateAssetDomainsForDataProduct( UpdateByQueryResponse updateResponse = client.updateByQuery( req -> - req.index(Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS)) + req.index( + Entity.getSearchRepository() + .getWriteFanoutTargets( + Entity.getSearchRepository() + .getIndexOrAliasName(GLOBAL_SEARCH_ALIAS))) .query(termQuery) .conflicts(Conflicts.Proceed) .script( @@ -1167,7 +1171,7 @@ public void updateAssetDomainsByIds( UpdateByQueryResponse updateResponse = client.updateByQuery( req -> - req.index(indexName) + req.index(Entity.getSearchRepository().getWriteFanoutTargets(indexName)) .query(idsQuery) .conflicts(Conflicts.Proceed) .script( @@ -1236,7 +1240,7 @@ public void updateDomainFqnByPrefix(String oldFqn, String newFqn) { UpdateByQueryResponse updateResponse = client.updateByQuery( req -> - req.index(domainIndexName) + req.index(Entity.getSearchRepository().getWriteFanoutTargets(domainIndexName)) .query(combinedQuery) .conflicts(Conflicts.Proceed) .script( @@ -1295,7 +1299,7 @@ public void updateAssetDomainFqnByPrefix(String oldFqn, String newFqn) { UpdateByQueryResponse updateResponse = client.updateByQuery( req -> - req.index(indexName) + req.index(Entity.getSearchRepository().getWriteFanoutTargets(indexName)) .query(matchingDomainQuery) .conflicts(Conflicts.Proceed) .script( diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchEntityManager.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchEntityManager.java index bdda727d6402..566a173fe819 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchEntityManager.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchEntityManager.java @@ -1182,7 +1182,11 @@ public void updateAssetDomainsForDataProduct( UpdateByQueryResponse updateResponse = client.updateByQuery( req -> - req.index(Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS)) + req.index( + Entity.getSearchRepository() + .getWriteFanoutTargets( + Entity.getSearchRepository() + .getIndexOrAliasName(GLOBAL_SEARCH_ALIAS))) .query(termQuery) .conflicts(Conflicts.Proceed) .script( @@ -1261,7 +1265,11 @@ public void updateAssetDomainsByIds( UpdateByQueryResponse updateResponse = client.updateByQuery( req -> - req.index(Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS)) + req.index( + Entity.getSearchRepository() + .getWriteFanoutTargets( + Entity.getSearchRepository() + .getIndexOrAliasName(GLOBAL_SEARCH_ALIAS))) .query(idsQuery) .conflicts(Conflicts.Proceed) .script( @@ -1332,7 +1340,7 @@ public void updateDomainFqnByPrefix(String oldFqn, String newFqn) { UpdateByQueryResponse updateResponse = client.updateByQuery( req -> - req.index(domainIndexName) + req.index(Entity.getSearchRepository().getWriteFanoutTargets(domainIndexName)) .query(combinedQuery) .conflicts(Conflicts.Proceed) .script( @@ -1394,7 +1402,7 @@ public void updateAssetDomainFqnByPrefix(String oldFqn, String newFqn) { UpdateByQueryResponse updateResponse = client.updateByQuery( req -> - req.index(indexName) + req.index(Entity.getSearchRepository().getWriteFanoutTargets(indexName)) .query(matchingDomainQuery) .conflicts(Conflicts.Proceed) .script( From addea6702e058559d6c4e11db079037b498b2b3f Mon Sep 17 00:00:00 2001 From: mohitdeuex Date: Mon, 27 Apr 2026 19:04:43 +0530 Subject: [PATCH 9/9] fix(search): scope fan-out to canonical input vs multi-entity alias (#27649) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previous getWriteFanoutTargets always appended every staged index, which made entity-scoped update-by-query calls (e.g. updateDomainFqnByPrefix targeting only the domain canonical index) fan out onto unrelated staged indices. Adds avoidable load on every currently-reindexing entity type for an update that should touch one index. Branch the implementation on whether the input is a known canonical entity index name. If yes, only the matching staged index is added. If no — i.e. the caller is hitting a multi-entity alias such as GLOBAL_SEARCH_ALIAS — every staged index is added because the update's match query can hit documents from any reindexing type. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../service/search/SearchRepository.java | 47 ++++++++++++++----- 1 file changed, 36 insertions(+), 11 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java index c85f816c94cd..3db14f3bccab 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java @@ -595,27 +595,52 @@ public String resolveWriteIndex(String entityType, IndexMapping indexMapping) { } /** - * Returns the index targets a write that normally goes through a multi-entity alias (e.g. - * {@code GLOBAL_SEARCH_ALIAS}, {@code DATA_ASSET_SEARCH_ALIAS}) should fan out to: the alias - * itself plus every currently-staged index. During an in-flight reindex, update-by-query - * operations rooted on shared aliases (asset domain reassignments, FQN renames, …) would - * otherwise update the about-to-be-discarded active index only — by including the staged - * index in the same request, the change is applied to whichever index ends up serving the - * alias after promotion. + * Returns the index targets a write should fan out to so it survives an in-flight reindex. * - *

Calling with an alias that is already a canonical entity index name is fine; the staged - * index for that canonical name will be added as expected. + *

    + *
  • When {@code aliasOrIndex} is a known canonical entity index name (i.e. matches the + * value of {@link IndexMapping#getIndexName(String)} for some registered entity), the + * result is the input plus the single staged index for that entity (if any). Avoids + * fanning out an entity-scoped update-by-query — e.g. {@code updateDomainFqnByPrefix} + * targeting only the domain index — onto unrelated staged indices. + *
  • When {@code aliasOrIndex} is a multi-entity alias such as {@code GLOBAL_SEARCH_ALIAS} + * or {@code DATA_ASSET_SEARCH_ALIAS}, the result is the input plus every currently + * staged index, since the original update can match documents whose owning entity + * type is being reindexed. + *
+ * + *

Without this fan-out, update-by-query operations rooted on shared aliases would update + * only the about-to-be-discarded active index and lose their effect on alias swap. */ public List getWriteFanoutTargets(String aliasOrIndex) { if (aliasOrIndex == null) { return new ArrayList<>(activeStagedIndices.values()); } - List targets = new ArrayList<>(activeStagedIndices.size() + 1); + List targets = new ArrayList<>(); targets.add(aliasOrIndex); - targets.addAll(activeStagedIndices.values()); + if (isKnownCanonicalIndex(aliasOrIndex)) { + String staged = activeStagedIndices.get(aliasOrIndex); + if (staged != null) { + targets.add(staged); + } + } else { + targets.addAll(activeStagedIndices.values()); + } return targets; } + private boolean isKnownCanonicalIndex(String name) { + if (entityIndexMap == null || name == null) { + return false; + } + for (IndexMapping mapping : entityIndexMap.values()) { + if (mapping != null && name.equals(mapping.getIndexName(clusterAlias))) { + return true; + } + } + return false; + } + public String getIndexOrAliasName(String name) { if (clusterAlias == null || clusterAlias.isEmpty()) { return name;