diff --git a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/bootstrap/TestSuiteBootstrap.java b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/bootstrap/TestSuiteBootstrap.java index 0ada0b23640a..e5507bee3ad0 100644 --- a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/bootstrap/TestSuiteBootstrap.java +++ b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/bootstrap/TestSuiteBootstrap.java @@ -230,7 +230,14 @@ private void startDatabase() { mysql.withDatabaseName("openmetadata"); mysql.withUsername("test"); mysql.withPassword("test"); - mysql.withCommand("mysqld", "--max_allowed_packet=" + mysqlMaxAllowedPacket); + mysql.withCommand( + "mysqld", + "--max_allowed_packet=" + mysqlMaxAllowedPacket, + // The tag list query (TagDAO.listAfter) joins three tables and sorts by tag.name, + // tag.id; under the parallel-tests fork the tag table grows large and the default + // 256KB sort_buffer_size overflows with "Out of sort memory" (#27649). 8MB is plenty + // for an integration-test workload and well under the 4GB overall limit. + "--sort_buffer_size=8M"); mysql.withStartupTimeoutSeconds(240); mysql.withConnectTimeoutSeconds(240); mysql.withTmpFs(java.util.Map.of("/var/lib/mysql", "rw,size=2g")); @@ -278,7 +285,12 @@ private void startDatabase() { "-c", "synchronous_commit=off", "-c", - "full_page_writes=off"); + "full_page_writes=off", + // Bump work_mem for the same reason MySQL gets a larger sort_buffer above: + // TagDAO.listAfter joins three tables and sorts; default 4MB spills to temp files + // under load. + "-c", + "work_mem=32MB"); postgres.withTmpFs(java.util.Map.of("/var/lib/postgresql/data", "rw,size=2g")); postgres.withCreateContainerCmdModifier( cmd -> diff --git a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/BaseEntityIT.java b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/BaseEntityIT.java index dec08acab14e..065f9df593e0 100644 --- a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/BaseEntityIT.java +++ b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/BaseEntityIT.java @@ -4378,7 +4378,7 @@ void test_bulkCreateOrUpdate_searchIndexed(TestNamespace ns) { OpenMetadataClient client = SdkClients.adminClient(); Awaitility.await() - .atMost(Duration.ofSeconds(90)) + .atMost(Duration.ofSeconds(180)) .pollDelay(Duration.ofMillis(500)) .pollInterval(Duration.ofSeconds(2)) .ignoreExceptions() @@ -4440,7 +4440,7 @@ void test_bulkUpdate_searchIndexUpdated(TestNamespace ns) { OpenMetadataClient client = SdkClients.adminClient(); Awaitility.await() - .atMost(Duration.ofSeconds(90)) + .atMost(Duration.ofSeconds(180)) .pollDelay(Duration.ofMillis(500)) .pollInterval(Duration.ofSeconds(3)) .ignoreExceptions() @@ -5130,7 +5130,7 @@ void checkDeletedEntity(TestNamespace ns) throws Exception { Awaitility.await("Wait for entity to appear in search index") .pollDelay(Duration.ofMillis(500)) .pollInterval(Duration.ofSeconds(1)) - .atMost(Duration.ofSeconds(90)) + .atMost(Duration.ofSeconds(180)) .ignoreExceptions() .untilAsserted( () -> { @@ -5166,7 +5166,7 @@ void checkIndexCreated(TestNamespace ns) throws Exception { Awaitility.await("Wait for entity to appear in search index") .pollDelay(Duration.ofMillis(500)) .pollInterval(Duration.ofSeconds(1)) - .atMost(Duration.ofSeconds(90)) + .atMost(Duration.ofSeconds(180)) .ignoreExceptions() .untilAsserted( () -> { @@ -5194,7 +5194,7 @@ void updateDescriptionAndCheckInSearch(TestNamespace ns) throws Exception { Awaitility.await("Wait for entity to appear in search index") .pollDelay(Duration.ofMillis(500)) .pollInterval(Duration.ofSeconds(1)) - .atMost(Duration.ofSeconds(90)) + .atMost(Duration.ofSeconds(180)) .ignoreExceptions() .untilAsserted( () -> { @@ -5213,7 +5213,7 @@ void updateDescriptionAndCheckInSearch(TestNamespace ns) throws Exception { Awaitility.await("Wait for search to reflect update") .pollDelay(Duration.ofMillis(500)) .pollInterval(Duration.ofSeconds(1)) - .atMost(Duration.ofSeconds(90)) + .atMost(Duration.ofSeconds(180)) .ignoreExceptions() .untilAsserted( () -> { diff --git a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/GlossaryOntologyExportIT.java b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/GlossaryOntologyExportIT.java index 8a2f50f062af..7c6107efe2ef 100644 --- a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/GlossaryOntologyExportIT.java +++ b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/GlossaryOntologyExportIT.java @@ -17,6 +17,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.parallel.Execution; import org.junit.jupiter.api.parallel.ExecutionMode; +import org.junit.jupiter.api.parallel.Isolated; import org.openmetadata.it.bootstrap.TestSuiteBootstrap; import org.openmetadata.it.factories.GlossaryTermTestFactory; import org.openmetadata.it.factories.GlossaryTestFactory; @@ -41,10 +42,13 @@ * *

Test isolation: Uses TestNamespace for unique entity naming. * - *

Parallelization: Runs with @Execution(ExecutionMode.SAME_THREAD) because each test - * blocks a server thread on synchronous Fuseki writes; concurrent execution can exhaust the - * server thread pool and cause request timeouts. + *

Parallelization: Annotated {@code @Isolated} because {@link RdfUpdater} is a JVM-wide + * singleton. {@code @BeforeAll} flips it on, so any test class running concurrently starts doing + * synchronous Fuseki writes on every entity create — saturating the Dropwizard thread pool and + * causing 60s request timeouts (see issue #27649). {@code @Execution(SAME_THREAD)} alone only + * serializes within this class and does not prevent that cross-class leakage. */ +@Isolated @Execution(ExecutionMode.SAME_THREAD) @ExtendWith(TestNamespaceExtension.class) public class GlossaryOntologyExportIT { diff --git a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/SearchIndexRetryQueueIT.java b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/SearchIndexRetryQueueIT.java index e4cd66576be6..7348aa656d54 100644 --- a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/SearchIndexRetryQueueIT.java +++ b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/SearchIndexRetryQueueIT.java @@ -793,64 +793,6 @@ void testEnqueuePreservesErrorDetailInFailureReason(TestNamespace ns) { retryQueueDAO.deleteByEntity(entityId, entityFqn); } - // --------------------------------------------------------------------------- - // Suspension tests - // --------------------------------------------------------------------------- - - @Test - void testSuspensionPreventsEnqueue(TestNamespace ns) { - String entityId = UUID.randomUUID().toString(); - String entityFqn = ns.prefix("rq") + ".suspended.entity"; - try { - SearchIndexRetryQueue.updateSuspension(java.util.Set.of(), true); - assertTrue(SearchIndexRetryQueue.isSuspendAllStreaming()); - - // Enqueue should still insert (suspension affects worker processing, not enqueueing) - SearchIndexRetryQueue.enqueue(entityId, entityFqn, "during suspension"); - List records = - retryQueueDAO.findByStatus(SearchIndexRetryQueue.STATUS_PENDING, 1000); - assertTrue(records.stream().anyMatch(r -> r.getEntityId().equals(entityId))); - } finally { - SearchIndexRetryQueue.clearSuspension(); - retryQueueDAO.deleteByEntity(entityId, entityFqn); - } - } - - @Test - void testWorkerDeletesRecordsDuringSuspendAll(TestNamespace ns) throws Exception { - String entityId = UUID.randomUUID().toString(); - String entityFqn = ns.prefix("rq") + ".suspended.entity"; - - retryQueueDAO.upsert( - entityId, entityFqn, "will be suspended", SearchIndexRetryQueue.STATUS_PENDING, "table"); - - try { - SearchIndexRetryQueue.updateSuspension(java.util.Set.of(), true); - - SearchIndexRetryWorker worker = new SearchIndexRetryWorker(collectionDAO, searchRepository); - worker.start(); - try { - Awaitility.await("Worker should delete record during full suspension") - .atMost(Duration.ofSeconds(30)) - .pollInterval(Duration.ofSeconds(1)) - .until( - () -> { - List remaining = - retryQueueDAO.findByStatuses( - List.of( - SearchIndexRetryQueue.STATUS_PENDING, - SearchIndexRetryQueue.STATUS_IN_PROGRESS), - 1000); - return remaining.stream().noneMatch(r -> r.getEntityId().equals(entityId)); - }); - } finally { - worker.stop(); - } - } finally { - SearchIndexRetryQueue.clearSuspension(); - } - } - // --------------------------------------------------------------------------- // Helpers // --------------------------------------------------------------------------- diff --git a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java index 7e90a04dc8a7..d6f1434cfc9b 100644 --- a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java +++ b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java @@ -6882,7 +6882,6 @@ private void triggerWorkflow_SDK(TestNamespace ns, List localTestTables) String workflowName = "DataCompletenessWorkflow"; OpenMetadataClient client = SdkClients.adminClient(); - waitForWorkflowDeployment(client, workflowName); for (Table table : localTestTables) { waitForEntityIndexedInSearch(client, "table_search_index", table.getFullyQualifiedName()); } @@ -7000,7 +6999,7 @@ private Domain getOrCreateDomain(TestNamespace ns) { } private void waitForWorkflowDeployment(OpenMetadataClient client, String workflowName) { - await() + await("workflow '" + workflowName + "' to finish Flowable deployment") .atMost(Duration.ofSeconds(120)) .pollDelay(Duration.ofSeconds(1)) .pollInterval(Duration.ofSeconds(2)) @@ -7015,7 +7014,7 @@ private void waitForWorkflowDeployment(OpenMetadataClient client, String workflo private void waitForEntityIndexedInSearch( OpenMetadataClient client, String indexName, String entityFqn) { - await() + await("entity '" + entityFqn + "' to appear in " + indexName) .atMost(Duration.ofSeconds(120)) .pollDelay(Duration.ofSeconds(1)) .pollInterval(Duration.ofSeconds(2)) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/DefaultRecreateHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/DefaultRecreateHandler.java index b98f48161ddb..d90763ae9036 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/DefaultRecreateHandler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/DefaultRecreateHandler.java @@ -99,6 +99,13 @@ public void finalizeReindex(EntityReindexContext context, boolean reindexSuccess } if (shouldPromote) { + // Always clear staged-index routing on the way out, regardless of outcome: + // - swap success → alias now points at staged; canonical and staged resolve to the + // same index, so unregistering keeps reads/writes consistent. + // - swap failure / empty aliases / exception → leaving routing active would silently + // send live writes to a staged index nothing reads from, which + // is strictly worse than the writes going back to the canonical + // alias target. Operators need to retry the reindex either way. try { Set aliasesToAttach = new HashSet<>(); @@ -148,6 +155,8 @@ public void finalizeReindex(EntityReindexContext context, boolean reindexSuccess entityType); return; } + } else { + LOG.warn("Entity '{}': aliasesToAttach is empty, skipping alias swap", entityType); } LOG.info( @@ -180,6 +189,8 @@ public void finalizeReindex(EntityReindexContext context, boolean reindexSuccess if (metrics != null) { metrics.recordPromotionFailure(entityType); } + } finally { + searchRepository.unregisterStagedIndex(entityType, stagedIndex); } } else { try { @@ -196,6 +207,8 @@ public void finalizeReindex(EntityReindexContext context, boolean reindexSuccess stagedIndex, entityType, ex); + } finally { + searchRepository.unregisterStagedIndex(entityType, stagedIndex); } } } @@ -262,10 +275,13 @@ public void promoteEntityIndex(EntityReindexContext context, boolean reindexSucc stagedIndex, entityType, ex); + } finally { + searchRepository.unregisterStagedIndex(entityType, stagedIndex); } return; } + // Always clear staged-index routing on the way out — see the rationale in finalizeReindex. try { Set aliasesToAttach = getAliasesFromMapping(indexMapping, searchRepository.getClusterAlias()); @@ -340,6 +356,8 @@ public void promoteEntityIndex(EntityReindexContext context, boolean reindexSucc if (promoteMetrics != null) { promoteMetrics.recordPromotionFailure(entityType); } + } finally { + searchRepository.unregisterStagedIndex(entityType, stagedIndex); } } @@ -422,6 +440,7 @@ protected void recreateIndexFromMapping( String stagedIndexName = buildStagedIndexName(canonicalIndexName); searchClient.createIndex(stagedIndexName, mappingContent); + searchRepository.registerStagedIndex(entityType, stagedIndexName); Set existingAliases = activeIndexName != null ? searchClient.getAliases(activeIndexName) : new HashSet<>(); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchIndexRetryQueue.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchIndexRetryQueue.java index 2f04cbb0d4eb..eccb11ef8e5f 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchIndexRetryQueue.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchIndexRetryQueue.java @@ -1,12 +1,7 @@ package org.openmetadata.service.search; import io.micrometer.core.instrument.Metrics; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import lombok.extern.slf4j.Slf4j; import org.openmetadata.schema.EntityInterface; import org.openmetadata.service.Entity; @@ -24,10 +19,6 @@ public final class SearchIndexRetryQueue { private static final int MAX_REASON_LENGTH = 8192; - private static final AtomicReference> SUSPENDED_ENTITY_TYPES = - new AtomicReference<>(Collections.emptySet()); - private static final AtomicBoolean SUSPEND_ALL_STREAMING = new AtomicBoolean(false); - private SearchIndexRetryQueue() {} public static void enqueue(EntityInterface entity, String operation, Throwable failure) { @@ -117,46 +108,6 @@ public static boolean isRetryableStatusCode(int status) { return status < 400; } - public static void updateSuspension(Set entityTypes, boolean suspendAll) { - Set normalized = new HashSet<>(); - for (String entityType : entityTypes == null ? Collections.emptySet() : entityTypes) { - String normalizedType = normalize(entityType); - if (!normalizedType.isEmpty()) { - normalized.add(normalizedType); - } - } - - // Set entity types before the boolean so that isEntityTypeSuspended never - // sees suspendAll=false with an outdated (empty) entity-types set. - SUSPENDED_ENTITY_TYPES.set(Collections.unmodifiableSet(normalized)); - SUSPEND_ALL_STREAMING.set(suspendAll); - } - - public static void clearSuspension() { - SUSPEND_ALL_STREAMING.set(false); - SUSPENDED_ENTITY_TYPES.set(Collections.emptySet()); - } - - public static boolean isEntityTypeSuspended(String entityType) { - if (SUSPEND_ALL_STREAMING.get()) { - return true; - } - String normalized = normalize(entityType); - return !normalized.isEmpty() && SUSPENDED_ENTITY_TYPES.get().contains(normalized); - } - - public static boolean isStreamingSuspended() { - return SUSPEND_ALL_STREAMING.get() || !SUSPENDED_ENTITY_TYPES.get().isEmpty(); - } - - public static boolean isSuspendAllStreaming() { - return SUSPEND_ALL_STREAMING.get(); - } - - public static Set getSuspendedEntityTypes() { - return SUSPENDED_ENTITY_TYPES.get(); - } - private static String truncate(String value) { if (value == null) { return null; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchIndexRetryWorker.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchIndexRetryWorker.java index 1c4348d0c817..7fa3fb79ecb8 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchIndexRetryWorker.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchIndexRetryWorker.java @@ -1,7 +1,6 @@ package org.openmetadata.service.search; import static org.openmetadata.service.search.SearchIndexRetryQueue.STATUS_FAILED; -import static org.openmetadata.service.search.SearchIndexRetryQueue.STATUS_PENDING; import static org.openmetadata.service.search.SearchIndexRetryQueue.STATUS_PENDING_RETRY_1; import static org.openmetadata.service.search.SearchIndexRetryQueue.STATUS_PENDING_RETRY_2; import static org.openmetadata.service.search.SearchIndexRetryQueue.normalize; @@ -25,7 +24,6 @@ import java.util.concurrent.atomic.AtomicReference; import lombok.extern.slf4j.Slf4j; import org.openmetadata.schema.EntityInterface; -import org.openmetadata.schema.system.EventPublisherJob; import org.openmetadata.schema.type.EntityReference; import org.openmetadata.schema.type.Include; import org.openmetadata.schema.type.Relationship; @@ -35,7 +33,6 @@ import org.openmetadata.service.apps.bundles.searchIndex.BulkSink; import org.openmetadata.service.exception.EntityNotFoundException; import org.openmetadata.service.jdbi3.CollectionDAO; -import org.openmetadata.service.jdbi3.CollectionDAO.SearchIndexJobDAO.SearchIndexJobRecord; import org.openmetadata.service.jdbi3.CollectionDAO.SearchIndexRetryQueueDAO.SearchIndexRetryRecord; import org.openmetadata.service.jdbi3.EntityRepository; import org.openmetadata.service.workflows.searchIndex.ReindexingUtil; @@ -63,27 +60,18 @@ public class SearchIndexRetryWorker implements Managed { private static final int MAX_CASCADE_REINDEX = 5000; private static final int CASCADE_BATCH_SIZE = 200; private static final int MAX_BACKOFF_SECONDS = 60; - private static final int SUSPENSION_REFRESH_INTERVAL_MS = 5000; private static final int CANDIDATE_TYPES_REFRESH_INTERVAL_MS = 60000; private static final long STALE_RECOVERY_INTERVAL_MS = 60_000; private static final long STALE_THRESHOLD_MS = 10 * 60 * 1000; - private static final List ACTIVE_REINDEX_JOB_STATUSES = - List.of("RUNNING", "READY", "STOPPING"); - private static final List PURGEABLE_QUEUE_STATUSES = - List.of(STATUS_PENDING, STATUS_PENDING_RETRY_1, STATUS_PENDING_RETRY_2, STATUS_FAILED); - private final CollectionDAO collectionDAO; private final SearchRepository searchRepository; private final AtomicBoolean running = new AtomicBoolean(false); private final List workerThreads = new ArrayList<>(); - private final Object scopeRefreshLock = new Object(); private final Object candidateTypesLock = new Object(); private final Object staleRecoveryLock = new Object(); - private volatile long lastScopeRefreshAt; private volatile long lastStaleRecoveryAt; - private volatile String activeScopeSignature = ""; private volatile long candidateTypesLastRefreshAt; private volatile List cachedCandidateEntityTypes = Collections.emptyList(); private final AtomicInteger consecutiveUnavailableCount = new AtomicInteger(); @@ -137,7 +125,6 @@ public void stop() { } } workerThreads.clear(); - SearchIndexRetryQueue.clearSuspension(); LOG.info("Stopped search index retry worker"); } @@ -148,7 +135,6 @@ public void stop() { private void runLoop(int workerId) { while (running.get()) { try { - refreshReindexSuspensionScopeIfNeeded(); recoverStaleInProgressIfNeeded(); if (!waitForClientAvailability(workerId)) { @@ -181,22 +167,8 @@ private void runLoop(int workerId) { private void processRecord(SearchIndexRetryRecord record) { try { - if (SearchIndexRetryQueue.isSuspendAllStreaming()) { - collectionDAO - .searchIndexRetryQueueDAO() - .deleteByEntity(record.getEntityId(), record.getEntityFqn()); - return; - } - EntityReference root = resolveEntityReference(record); if (root != null) { - if (SearchIndexRetryQueue.isEntityTypeSuspended(root.getType())) { - collectionDAO - .searchIndexRetryQueueDAO() - .deleteByEntity(record.getEntityId(), record.getEntityFqn()); - return; - } - reindexEntityCascade(root); collectionDAO .searchIndexRetryQueueDAO() @@ -642,82 +614,9 @@ private int extractSearchStatusCode(Throwable t) { } // --------------------------------------------------------------------------- - // Suspension and scheduling + // Scheduling // --------------------------------------------------------------------------- - private void refreshReindexSuspensionScopeIfNeeded() { - long now = System.currentTimeMillis(); - if (now - lastScopeRefreshAt < SUSPENSION_REFRESH_INTERVAL_MS) { - return; - } - - synchronized (scopeRefreshLock) { - long currentTime = System.currentTimeMillis(); - if (currentTime - lastScopeRefreshAt < SUSPENSION_REFRESH_INTERVAL_MS) { - return; - } - lastScopeRefreshAt = currentTime; - - List activeJobs = - collectionDAO.searchIndexJobDAO().findByStatusesWithLimit(ACTIVE_REINDEX_JOB_STATUSES, 1); - - if (activeJobs.isEmpty()) { - if (!activeScopeSignature.isEmpty() || SearchIndexRetryQueue.isStreamingSuspended()) { - SearchIndexRetryQueue.clearSuspension(); - activeScopeSignature = ""; - LOG.info("Cleared live search indexing suspension - no active reindex jobs"); - } - return; - } - - SearchIndexJobRecord activeJob = activeJobs.getFirst(); - EventPublisherJob jobConfiguration = null; - try { - if (activeJob.jobConfiguration() != null) { - jobConfiguration = - JsonUtils.readValue(activeJob.jobConfiguration(), EventPublisherJob.class); - } - } catch (Exception e) { - LOG.warn("Failed to parse job configuration for active reindex job {}", activeJob.id(), e); - } - - Set requestedEntities = - normalizeReindexEntities( - jobConfiguration != null ? jobConfiguration.getEntities() : null); - Set searchableEntities = searchRepository.getSearchEntities(); - - boolean containsAllToken = requestedEntities.stream().anyMatch("all"::equalsIgnoreCase); - Set suspendedTypes = - containsAllToken ? new HashSet<>(searchableEntities) : new HashSet<>(requestedEntities); - suspendedTypes.retainAll(searchableEntities); - - boolean suspendAll = - !searchableEntities.isEmpty() && suspendedTypes.containsAll(searchableEntities); - String newSignature = buildScopeSignature(activeJob.id(), suspendedTypes, suspendAll); - - if (newSignature.equals(activeScopeSignature)) { - return; - } - - activeScopeSignature = newSignature; - SearchIndexRetryQueue.updateSuspension(suspendedTypes, suspendAll); - - if (suspendAll) { - int purged = - collectionDAO.searchIndexRetryQueueDAO().deleteByStatuses(PURGEABLE_QUEUE_STATUSES); - LOG.info( - "Activated live search indexing suspension for all entity types using reindex job {} and purged {} retry queue rows", - activeJob.id(), - purged); - } else { - LOG.info( - "Activated live search indexing suspension for {} entity types using reindex job {}", - suspendedTypes.size(), - activeJob.id()); - } - } - } - private void recoverStaleInProgressIfNeeded() { long now = System.currentTimeMillis(); if (now - lastStaleRecoveryAt < STALE_RECOVERY_INTERVAL_MS) { @@ -744,26 +643,6 @@ private void recoverStaleInProgressIfNeeded() { } } - private Set normalizeReindexEntities(Set rawEntities) { - Set normalized = new HashSet<>(); - if (rawEntities == null) { - return normalized; - } - for (String entityType : rawEntities) { - String value = SearchIndexRetryQueue.normalize(entityType); - if (!value.isEmpty()) { - normalized.add(value); - } - } - return normalized; - } - - private String buildScopeSignature(String jobId, Set suspendedTypes, boolean suspendAll) { - List sorted = new ArrayList<>(suspendedTypes); - Collections.sort(sorted); - return jobId + "|" + suspendAll + "|" + String.join(",", sorted); - } - // --------------------------------------------------------------------------- // Utilities // --------------------------------------------------------------------------- diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java index 9028199b2b36..3db14f3bccab 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java @@ -77,6 +77,7 @@ import java.util.Set; import java.util.TreeSet; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -154,6 +155,18 @@ public class SearchRepository { @Getter private Map entityIndexMap; + /** + * Staged index names being populated by an in-flight reindex, keyed by the canonical index name + * the alias normally points at (e.g. {@code openmetadata_table_search_index}). While an entry + * is present, every live write that resolves through {@link #getWriteIndexName(IndexMapping)} + * targets the staged index directly — so the writes survive the final alias swap that would + * otherwise promote a pre-snapshot index and drop the old one that held them. + * + *

Keying by canonical index name lets any write site route correctly even if it does not + * have the entity type in scope (deletes by FQN prefix, script updates, child propagation, …). + */ + private final Map activeStagedIndices = new ConcurrentHashMap<>(); + private final String language; @Getter @Setter public SearchIndexFactory searchIndexFactory = new SearchIndexFactory(); @@ -234,8 +247,15 @@ public SearchRepository(ElasticSearchConfiguration config, int maxDBConnections) */ private void registerSearchIndexHandler() { try { + EntityLifecycleEventDispatcher dispatcher = EntityLifecycleEventDispatcher.getInstance(); SearchIndexHandler searchHandler = new SearchIndexHandler(this); - EntityLifecycleEventDispatcher.getInstance().registerHandler(searchHandler); + // Drop any stale handler bound to a previous SearchRepository instance. Test suites and + // app bootstrap construct SearchRepository more than once and replace the singleton via + // Entity.setSearchRepository(...); without this the dispatcher keeps delivering events to + // the first instance and state maintained on the current instance (e.g. activeStagedIndices + // used for reindex write-routing) is never consulted. + dispatcher.unregisterHandler(searchHandler.getHandlerName()); + dispatcher.registerHandler(searchHandler); LOG.info("Successfully registered SearchIndexHandler for entity lifecycle events"); } catch (Exception e) { LOG.error("Failed to register SearchIndexHandler", e); @@ -486,6 +506,141 @@ public IndexMapping getIndexMapping(String entityType) { return entityIndexMap.get(entityType); } + /** + * Register a staged index as the live-write target for {@code entityType} while a reindex + * populates it. Must be paired with {@link #unregisterStagedIndex(String, String)} once the + * alias swap is complete so writes go back through the canonical alias. + */ + public void registerStagedIndex(String entityType, String stagedIndex) { + if (entityType == null || stagedIndex == null) { + return; + } + String canonical = canonicalIndexFor(entityType); + if (canonical == null) { + LOG.warn( + "Cannot register staged index '{}' for entity '{}': no IndexMapping found", + stagedIndex, + entityType); + return; + } + activeStagedIndices.put(canonical, stagedIndex); + LOG.info( + "Routing live writes for canonical index '{}' (entity '{}') to staged index '{}' until reindex promotes it", + canonical, + entityType, + stagedIndex); + } + + /** Clear the staged-index routing for {@code entityType} if it matches {@code stagedIndex}. */ + public void unregisterStagedIndex(String entityType, String stagedIndex) { + if (entityType == null || stagedIndex == null) { + return; + } + String canonical = canonicalIndexFor(entityType); + if (canonical == null) { + return; + } + if (activeStagedIndices.remove(canonical, stagedIndex)) { + LOG.info( + "Cleared staged-index routing for canonical index '{}' (entity '{}', was '{}')", + canonical, + entityType, + stagedIndex); + } + } + + private String canonicalIndexFor(String entityType) { + IndexMapping mapping = entityIndexMap.get(entityType); + return mapping != null ? mapping.getIndexName(clusterAlias) : null; + } + + /** + * Centralized resolution of the index name a live write should target. Every write path that + * ultimately calls {@link IndexMapping#getIndexName(String)} should route through this method + * so it transparently picks up the staged index when a reindex is in flight. Returns the + * canonical index name when nothing is staged. + */ + public String getWriteIndexName(IndexMapping indexMapping) { + if (indexMapping == null) { + return null; + } + String canonical = indexMapping.getIndexName(clusterAlias); + return routeToStagedIfActive(canonical); + } + + /** + * Centralized routing for writes that already hold a resolved canonical index name — i.e. the + * value of {@link IndexMapping#getIndexName(String)}, NOT a short alias such as the result of + * {@link IndexMapping#getAlias(String)} or any of the parent aliases. If a reindex has + * registered a staged index for {@code canonicalIndexName}, returns the staged name; otherwise + * returns {@code canonicalIndexName} unchanged. Pass non-canonical aliases through unchanged + * since the routing map only knows about canonical names. + */ + public String routeToStagedIfActive(String canonicalIndexName) { + if (canonicalIndexName == null) { + return null; + } + String staged = activeStagedIndices.get(canonicalIndexName); + return staged != null ? staged : canonicalIndexName; + } + + /** + * @deprecated Use {@link #getWriteIndexName(IndexMapping)} directly. The {@code entityType} + * argument is ignored; the canonical name is resolved from the supplied {@code + * indexMapping}. + */ + @Deprecated(forRemoval = true) + public String resolveWriteIndex(String entityType, IndexMapping indexMapping) { + return getWriteIndexName(indexMapping); + } + + /** + * Returns the index targets a write should fan out to so it survives an in-flight reindex. + * + *

    + *
  • When {@code aliasOrIndex} is a known canonical entity index name (i.e. matches the + * value of {@link IndexMapping#getIndexName(String)} for some registered entity), the + * result is the input plus the single staged index for that entity (if any). Avoids + * fanning out an entity-scoped update-by-query — e.g. {@code updateDomainFqnByPrefix} + * targeting only the domain index — onto unrelated staged indices. + *
  • When {@code aliasOrIndex} is a multi-entity alias such as {@code GLOBAL_SEARCH_ALIAS} + * or {@code DATA_ASSET_SEARCH_ALIAS}, the result is the input plus every currently + * staged index, since the original update can match documents whose owning entity + * type is being reindexed. + *
+ * + *

Without this fan-out, update-by-query operations rooted on shared aliases would update + * only the about-to-be-discarded active index and lose their effect on alias swap. + */ + public List getWriteFanoutTargets(String aliasOrIndex) { + if (aliasOrIndex == null) { + return new ArrayList<>(activeStagedIndices.values()); + } + List targets = new ArrayList<>(); + targets.add(aliasOrIndex); + if (isKnownCanonicalIndex(aliasOrIndex)) { + String staged = activeStagedIndices.get(aliasOrIndex); + if (staged != null) { + targets.add(staged); + } + } else { + targets.addAll(activeStagedIndices.values()); + } + return targets; + } + + private boolean isKnownCanonicalIndex(String name) { + if (entityIndexMap == null || name == null) { + return false; + } + for (IndexMapping mapping : entityIndexMap.values()) { + if (mapping != null && name.equals(mapping.getIndexName(clusterAlias))) { + return true; + } + } + return false; + } + public String getIndexOrAliasName(String name) { if (clusterAlias == null || clusterAlias.isEmpty()) { return name; @@ -635,7 +790,7 @@ public void createEntityIndex(EntityInterface entity) { IndexMapping indexMapping = entityIndexMap.get(entityType); SearchIndex index = searchIndexFactory.buildIndex(entityType, entity); String doc = JsonUtils.pojoToJson(index.buildSearchIndexDoc()); - searchClient.createEntity(indexMapping.getIndexName(clusterAlias), entityId, doc); + searchClient.createEntity(getWriteIndexName(indexMapping), entityId, doc); if (Entity.TABLE.equals(entityType)) { indexTableColumns((Table) entity); @@ -686,7 +841,7 @@ private void indexTableColumns(Table table) { if (!docs.isEmpty()) { try { - searchClient.createEntities(columnIndexMapping.getIndexName(clusterAlias), docs); + searchClient.createEntities(getWriteIndexName(columnIndexMapping), docs); } catch (Exception e) { LOG.error( "Issue bulk indexing columns for table [{}]: {}", @@ -708,7 +863,7 @@ private void deleteTableColumns(Table table) { try { searchClient.deleteEntityByFields( - List.of(columnIndexMapping.getIndexName(clusterAlias)), + List.of(getWriteIndexName(columnIndexMapping)), List.of(new ImmutablePair<>("table.id", table.getId().toString()))); } catch (Exception e) { LOG.error( @@ -811,7 +966,7 @@ private void updateTableColumnsInheritedFields(Table table) { // Use updateChildren to efficiently update all columns for this table searchClient.updateChildren( - List.of(columnIndexMapping.getIndexName(clusterAlias)), + List.of(getWriteIndexName(columnIndexMapping)), new ImmutablePair<>("table.id", table.getId().toString()), new ImmutablePair<>(DEFAULT_UPDATE_SCRIPT, inheritedFields)); @@ -852,13 +1007,6 @@ public void createEntitiesIndex(List entities) throws IOExcepti String entityType = entities.getFirst().getEntityReference().getType(); Timer.Sample searchSample = RequestLatencyContext.startSearchOperation(); try { - if (SearchIndexRetryQueue.isEntityTypeSuspended(entityType)) { - LOG.debug( - "Skipping live search indexing for {} entities because reindex is active for {}", - entities.size(), - entityType); - return; - } if (!getSearchClient().isClientAvailable()) { for (EntityInterface entity : entities) { SearchIndexRetryQueue.enqueue( @@ -888,7 +1036,7 @@ public void createEntitiesIndex(List entities) throws IOExcepti return; } - searchClient.createEntities(indexMapping.getIndexName(clusterAlias), docs); + searchClient.createEntities(getWriteIndexName(indexMapping), docs); if (Entity.TABLE.equals(entityType)) { indexColumnsForTables(entities); @@ -907,7 +1055,7 @@ private void indexColumnsForTables(List entities) { return; } - String indexName = columnIndexMapping.getIndexName(clusterAlias); + String indexName = getWriteIndexName(columnIndexMapping); List> allColumnDocs = new ArrayList<>(); for (EntityInterface entity : entities) { @@ -972,7 +1120,7 @@ public void createTimeSeriesEntity(EntityTimeSeriesInterface entity) { IndexMapping indexMapping = entityIndexMap.get(entityType); SearchIndex index = searchIndexFactory.buildIndex(entityType, entity); String doc = JsonUtils.pojoToJson(index.buildSearchIndexDoc()); - searchClient.createTimeSeriesEntity(indexMapping.getIndexName(clusterAlias), entityId, doc); + searchClient.createTimeSeriesEntity(getWriteIndexName(indexMapping), entityId, doc); } catch (Exception ie) { SearchIndexRetryQueue.enqueue( entityId, @@ -1003,7 +1151,7 @@ public void updateTimeSeriesEntity(EntityTimeSeriesInterface entityTimeSeries) { searchIndexFactory.buildIndex(entityType, entityTimeSeries); Map doc = elasticSearchIndex.buildSearchIndexDoc(); searchClient.updateEntity( - indexMapping.getIndexName(clusterAlias), entityId, doc, DEFAULT_UPDATE_SCRIPT); + getWriteIndexName(indexMapping), entityId, doc, DEFAULT_UPDATE_SCRIPT); } catch (RuntimeException e) { SearchIndexRetryQueue.enqueue( entityId, @@ -1086,7 +1234,7 @@ public void updateEntityIndex(EntityInterface entity) { doc, SearchClusterMetrics.DEFAULT_BULK_PAYLOAD_SIZE_BYTES, entityId, entityType); } - searchClient.updateEntity(indexMapping.getIndexName(clusterAlias), entityId, doc, scriptTxt); + searchClient.updateEntity(getWriteIndexName(indexMapping), entityId, doc, scriptTxt); if (Entity.TABLE.equals(entityType)) { try { @@ -1161,7 +1309,7 @@ public void updateEntityIndex(EntityInterface entity) { public void bulkIndexPipelineExecutions( Pipeline pipeline, List pipelineStatuses) { try { - String indexName = getIndexOrAliasName("pipeline_status_search_index"); + String indexName = routeToStagedIfActive(getIndexOrAliasName("pipeline_status_search_index")); List> docsAndIds = new ArrayList<>(); for (PipelineStatus pipelineStatus : pipelineStatuses) { PipelineExecutionIndex pipelineExecutionIndex = @@ -1250,14 +1398,6 @@ public void updateEntitiesIndex(List entities) { String entityType = entry.getKey(); List typeEntities = entry.getValue(); - if (SearchIndexRetryQueue.isEntityTypeSuspended(entityType)) { - LOG.debug( - "Skipping bulk live indexing for {} entities because reindex is active for {}", - typeEntities.size(), - entityType); - continue; - } - if (!getSearchClient().isClientAvailable()) { for (EntityInterface entity : typeEntities) { SearchIndexRetryQueue.enqueue( @@ -1366,12 +1506,6 @@ public void updateEntitiesBulk(List entities) { public void updateAssetDomainsForDataProduct( String dataProductFqn, List oldDomainFqns, List newDomains) { Timer.Sample s = RequestLatencyContext.startSearchOperation(); - if (SearchIndexRetryQueue.isEntityTypeSuspended(Entity.DATA_PRODUCT)) { - LOG.debug( - "Skipping updateAssetDomainsForDataProduct because reindex is active for {}", - Entity.DATA_PRODUCT); - return; - } if (!getSearchClient().isClientAvailable()) { SearchIndexRetryQueue.enqueue( null, dataProductFqn, "updateAssetDomainsForDataProduct: Search client unavailable"); @@ -1393,11 +1527,6 @@ public void updateAssetDomainsByIds( List assetIds, List oldDomainFqns, List newDomains) { Timer.Sample s = RequestLatencyContext.startSearchOperation(); - if (SearchIndexRetryQueue.isEntityTypeSuspended(Entity.DATA_PRODUCT)) { - LOG.debug( - "Skipping updateAssetDomainsByIds because reindex is active for {}", Entity.DATA_PRODUCT); - return; - } if (!getSearchClient().isClientAvailable()) { for (UUID assetId : listOrEmpty(assetIds)) { SearchIndexRetryQueue.enqueue( @@ -1423,10 +1552,6 @@ public void updateAssetDomainsByIds( public void updateDomainFqnByPrefix(String oldFqn, String newFqn) { Timer.Sample s = RequestLatencyContext.startSearchOperation(); - if (SearchIndexRetryQueue.isEntityTypeSuspended(Entity.DOMAIN)) { - LOG.debug("Skipping updateDomainFqnByPrefix because reindex is active for {}", Entity.DOMAIN); - return; - } if (!getSearchClient().isClientAvailable()) { SearchIndexRetryQueue.enqueue( null, newFqn, "updateDomainFqnByPrefix: Search client unavailable"); @@ -1445,11 +1570,6 @@ public void updateDomainFqnByPrefix(String oldFqn, String newFqn) { public void updateAssetDomainFqnByPrefix(String oldFqn, String newFqn) { Timer.Sample s = RequestLatencyContext.startSearchOperation(); - if (SearchIndexRetryQueue.isEntityTypeSuspended(Entity.DOMAIN)) { - LOG.debug( - "Skipping updateAssetDomainFqnByPrefix because reindex is active for {}", Entity.DOMAIN); - return; - } if (!getSearchClient().isClientAvailable()) { SearchIndexRetryQueue.enqueue( null, newFqn, "updateAssetDomainFqnByPrefix: Search client unavailable"); @@ -1547,11 +1667,11 @@ private void propagateToDomainChildren( String domainId, IndexMapping indexMapping, Pair> updates) throws IOException { searchClient.updateChildren( - List.of(indexMapping.getIndexName(clusterAlias)), + List.of(getWriteIndexName(indexMapping)), new ImmutablePair<>(PARENT_ID, domainId), updates); searchClient.updateChildren( - List.of(entityIndexMap.get(Entity.DATA_PRODUCT).getIndexName(clusterAlias)), + List.of(getWriteIndexName(entityIndexMap.get(Entity.DATA_PRODUCT))), new ImmutablePair<>(DOMAINS_ID, domainId), updates); } @@ -1665,7 +1785,7 @@ private AssetCertification getCertificationFromEntity(EntityInterface entity) { private void updateEntityCertificationInSearch( EntityInterface entity, AssetCertification certification) { IndexMapping indexMapping = entityIndexMap.get(entity.getEntityReference().getType()); - String indexName = indexMapping.getIndexName(clusterAlias); + String indexName = getWriteIndexName(indexMapping); Map paramMap = new HashMap<>(); if (certification != null && certification.getTagLabel() != null) { @@ -1691,7 +1811,7 @@ public void propagateToRelatedEntities( EntityInterface entity) { if (changeDescription != null && entityType.equalsIgnoreCase(Entity.PAGE)) { - String indexName = indexMapping.getIndexName(clusterAlias); + String indexName = getWriteIndexName(indexMapping); for (FieldChange field : changeDescription.getFieldsAdded()) { if (field.getName().contains(PARENT)) { String oldParentFQN = entity.getName(); @@ -2088,7 +2208,7 @@ public void deleteByScript(String entityType, String scriptTxt, Map getSearchEntities() { private boolean shouldSkipStreamingIndexing( String entityType, String entityId, String entityFqn, String operation) { - if (SearchIndexRetryQueue.isEntityTypeSuspended(entityType)) { - LOG.debug( - "Skipping live search indexing operation {} for entityType {} because reindex is active", - operation, - entityType); - return true; - } - if (!getSearchClient().isClientAvailable()) { SearchIndexRetryQueue.enqueue(entityId, entityFqn, operation + ": Search client unavailable"); return true; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchEntityManager.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchEntityManager.java index 0076c4ff8563..f2ef03d6d5d5 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchEntityManager.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchEntityManager.java @@ -1086,7 +1086,11 @@ public void updateAssetDomainsForDataProduct( UpdateByQueryResponse updateResponse = client.updateByQuery( req -> - req.index(Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS)) + req.index( + Entity.getSearchRepository() + .getWriteFanoutTargets( + Entity.getSearchRepository() + .getIndexOrAliasName(GLOBAL_SEARCH_ALIAS))) .query(termQuery) .conflicts(Conflicts.Proceed) .script( @@ -1167,7 +1171,7 @@ public void updateAssetDomainsByIds( UpdateByQueryResponse updateResponse = client.updateByQuery( req -> - req.index(indexName) + req.index(Entity.getSearchRepository().getWriteFanoutTargets(indexName)) .query(idsQuery) .conflicts(Conflicts.Proceed) .script( @@ -1236,7 +1240,7 @@ public void updateDomainFqnByPrefix(String oldFqn, String newFqn) { UpdateByQueryResponse updateResponse = client.updateByQuery( req -> - req.index(domainIndexName) + req.index(Entity.getSearchRepository().getWriteFanoutTargets(domainIndexName)) .query(combinedQuery) .conflicts(Conflicts.Proceed) .script( @@ -1295,7 +1299,7 @@ public void updateAssetDomainFqnByPrefix(String oldFqn, String newFqn) { UpdateByQueryResponse updateResponse = client.updateByQuery( req -> - req.index(indexName) + req.index(Entity.getSearchRepository().getWriteFanoutTargets(indexName)) .query(matchingDomainQuery) .conflicts(Conflicts.Proceed) .script( diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchEntityManager.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchEntityManager.java index bdda727d6402..566a173fe819 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchEntityManager.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchEntityManager.java @@ -1182,7 +1182,11 @@ public void updateAssetDomainsForDataProduct( UpdateByQueryResponse updateResponse = client.updateByQuery( req -> - req.index(Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS)) + req.index( + Entity.getSearchRepository() + .getWriteFanoutTargets( + Entity.getSearchRepository() + .getIndexOrAliasName(GLOBAL_SEARCH_ALIAS))) .query(termQuery) .conflicts(Conflicts.Proceed) .script( @@ -1261,7 +1265,11 @@ public void updateAssetDomainsByIds( UpdateByQueryResponse updateResponse = client.updateByQuery( req -> - req.index(Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS)) + req.index( + Entity.getSearchRepository() + .getWriteFanoutTargets( + Entity.getSearchRepository() + .getIndexOrAliasName(GLOBAL_SEARCH_ALIAS))) .query(idsQuery) .conflicts(Conflicts.Proceed) .script( @@ -1332,7 +1340,7 @@ public void updateDomainFqnByPrefix(String oldFqn, String newFqn) { UpdateByQueryResponse updateResponse = client.updateByQuery( req -> - req.index(domainIndexName) + req.index(Entity.getSearchRepository().getWriteFanoutTargets(domainIndexName)) .query(combinedQuery) .conflicts(Conflicts.Proceed) .script( @@ -1394,7 +1402,7 @@ public void updateAssetDomainFqnByPrefix(String oldFqn, String newFqn) { UpdateByQueryResponse updateResponse = client.updateByQuery( req -> - req.index(indexName) + req.index(Entity.getSearchRepository().getWriteFanoutTargets(indexName)) .query(matchingDomainQuery) .conflicts(Conflicts.Proceed) .script(