diff --git a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/bootstrap/TestSuiteBootstrap.java b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/bootstrap/TestSuiteBootstrap.java
index 0ada0b23640a..e5507bee3ad0 100644
--- a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/bootstrap/TestSuiteBootstrap.java
+++ b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/bootstrap/TestSuiteBootstrap.java
@@ -230,7 +230,14 @@ private void startDatabase() {
mysql.withDatabaseName("openmetadata");
mysql.withUsername("test");
mysql.withPassword("test");
- mysql.withCommand("mysqld", "--max_allowed_packet=" + mysqlMaxAllowedPacket);
+ mysql.withCommand(
+ "mysqld",
+ "--max_allowed_packet=" + mysqlMaxAllowedPacket,
+ // The tag list query (TagDAO.listAfter) joins three tables and sorts by tag.name,
+ // tag.id; under the parallel-tests fork the tag table grows large and the default
+ // 256KB sort_buffer_size overflows with "Out of sort memory" (#27649). 8MB is plenty
+ // for an integration-test workload and well under the 4GB overall limit.
+ "--sort_buffer_size=8M");
mysql.withStartupTimeoutSeconds(240);
mysql.withConnectTimeoutSeconds(240);
mysql.withTmpFs(java.util.Map.of("/var/lib/mysql", "rw,size=2g"));
@@ -278,7 +285,12 @@ private void startDatabase() {
"-c",
"synchronous_commit=off",
"-c",
- "full_page_writes=off");
+ "full_page_writes=off",
+ // Bump work_mem for the same reason MySQL gets a larger sort_buffer above:
+ // TagDAO.listAfter joins three tables and sorts; default 4MB spills to temp files
+ // under load.
+ "-c",
+ "work_mem=32MB");
postgres.withTmpFs(java.util.Map.of("/var/lib/postgresql/data", "rw,size=2g"));
postgres.withCreateContainerCmdModifier(
cmd ->
diff --git a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/BaseEntityIT.java b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/BaseEntityIT.java
index dec08acab14e..065f9df593e0 100644
--- a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/BaseEntityIT.java
+++ b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/BaseEntityIT.java
@@ -4378,7 +4378,7 @@ void test_bulkCreateOrUpdate_searchIndexed(TestNamespace ns) {
OpenMetadataClient client = SdkClients.adminClient();
Awaitility.await()
- .atMost(Duration.ofSeconds(90))
+ .atMost(Duration.ofSeconds(180))
.pollDelay(Duration.ofMillis(500))
.pollInterval(Duration.ofSeconds(2))
.ignoreExceptions()
@@ -4440,7 +4440,7 @@ void test_bulkUpdate_searchIndexUpdated(TestNamespace ns) {
OpenMetadataClient client = SdkClients.adminClient();
Awaitility.await()
- .atMost(Duration.ofSeconds(90))
+ .atMost(Duration.ofSeconds(180))
.pollDelay(Duration.ofMillis(500))
.pollInterval(Duration.ofSeconds(3))
.ignoreExceptions()
@@ -5130,7 +5130,7 @@ void checkDeletedEntity(TestNamespace ns) throws Exception {
Awaitility.await("Wait for entity to appear in search index")
.pollDelay(Duration.ofMillis(500))
.pollInterval(Duration.ofSeconds(1))
- .atMost(Duration.ofSeconds(90))
+ .atMost(Duration.ofSeconds(180))
.ignoreExceptions()
.untilAsserted(
() -> {
@@ -5166,7 +5166,7 @@ void checkIndexCreated(TestNamespace ns) throws Exception {
Awaitility.await("Wait for entity to appear in search index")
.pollDelay(Duration.ofMillis(500))
.pollInterval(Duration.ofSeconds(1))
- .atMost(Duration.ofSeconds(90))
+ .atMost(Duration.ofSeconds(180))
.ignoreExceptions()
.untilAsserted(
() -> {
@@ -5194,7 +5194,7 @@ void updateDescriptionAndCheckInSearch(TestNamespace ns) throws Exception {
Awaitility.await("Wait for entity to appear in search index")
.pollDelay(Duration.ofMillis(500))
.pollInterval(Duration.ofSeconds(1))
- .atMost(Duration.ofSeconds(90))
+ .atMost(Duration.ofSeconds(180))
.ignoreExceptions()
.untilAsserted(
() -> {
@@ -5213,7 +5213,7 @@ void updateDescriptionAndCheckInSearch(TestNamespace ns) throws Exception {
Awaitility.await("Wait for search to reflect update")
.pollDelay(Duration.ofMillis(500))
.pollInterval(Duration.ofSeconds(1))
- .atMost(Duration.ofSeconds(90))
+ .atMost(Duration.ofSeconds(180))
.ignoreExceptions()
.untilAsserted(
() -> {
diff --git a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/GlossaryOntologyExportIT.java b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/GlossaryOntologyExportIT.java
index 8a2f50f062af..7c6107efe2ef 100644
--- a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/GlossaryOntologyExportIT.java
+++ b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/GlossaryOntologyExportIT.java
@@ -17,6 +17,7 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
+import org.junit.jupiter.api.parallel.Isolated;
import org.openmetadata.it.bootstrap.TestSuiteBootstrap;
import org.openmetadata.it.factories.GlossaryTermTestFactory;
import org.openmetadata.it.factories.GlossaryTestFactory;
@@ -41,10 +42,13 @@
*
*
Test isolation: Uses TestNamespace for unique entity naming.
*
- *
Parallelization: Runs with @Execution(ExecutionMode.SAME_THREAD) because each test
- * blocks a server thread on synchronous Fuseki writes; concurrent execution can exhaust the
- * server thread pool and cause request timeouts.
+ *
Parallelization: Annotated {@code @Isolated} because {@link RdfUpdater} is a JVM-wide
+ * singleton. {@code @BeforeAll} flips it on, so any test class running concurrently starts doing
+ * synchronous Fuseki writes on every entity create — saturating the Dropwizard thread pool and
+ * causing 60s request timeouts (see issue #27649). {@code @Execution(SAME_THREAD)} alone only
+ * serializes within this class and does not prevent that cross-class leakage.
*/
+@Isolated
@Execution(ExecutionMode.SAME_THREAD)
@ExtendWith(TestNamespaceExtension.class)
public class GlossaryOntologyExportIT {
diff --git a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/SearchIndexRetryQueueIT.java b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/SearchIndexRetryQueueIT.java
index e4cd66576be6..7348aa656d54 100644
--- a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/SearchIndexRetryQueueIT.java
+++ b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/SearchIndexRetryQueueIT.java
@@ -793,64 +793,6 @@ void testEnqueuePreservesErrorDetailInFailureReason(TestNamespace ns) {
retryQueueDAO.deleteByEntity(entityId, entityFqn);
}
- // ---------------------------------------------------------------------------
- // Suspension tests
- // ---------------------------------------------------------------------------
-
- @Test
- void testSuspensionPreventsEnqueue(TestNamespace ns) {
- String entityId = UUID.randomUUID().toString();
- String entityFqn = ns.prefix("rq") + ".suspended.entity";
- try {
- SearchIndexRetryQueue.updateSuspension(java.util.Set.of(), true);
- assertTrue(SearchIndexRetryQueue.isSuspendAllStreaming());
-
- // Enqueue should still insert (suspension affects worker processing, not enqueueing)
- SearchIndexRetryQueue.enqueue(entityId, entityFqn, "during suspension");
- List records =
- retryQueueDAO.findByStatus(SearchIndexRetryQueue.STATUS_PENDING, 1000);
- assertTrue(records.stream().anyMatch(r -> r.getEntityId().equals(entityId)));
- } finally {
- SearchIndexRetryQueue.clearSuspension();
- retryQueueDAO.deleteByEntity(entityId, entityFqn);
- }
- }
-
- @Test
- void testWorkerDeletesRecordsDuringSuspendAll(TestNamespace ns) throws Exception {
- String entityId = UUID.randomUUID().toString();
- String entityFqn = ns.prefix("rq") + ".suspended.entity";
-
- retryQueueDAO.upsert(
- entityId, entityFqn, "will be suspended", SearchIndexRetryQueue.STATUS_PENDING, "table");
-
- try {
- SearchIndexRetryQueue.updateSuspension(java.util.Set.of(), true);
-
- SearchIndexRetryWorker worker = new SearchIndexRetryWorker(collectionDAO, searchRepository);
- worker.start();
- try {
- Awaitility.await("Worker should delete record during full suspension")
- .atMost(Duration.ofSeconds(30))
- .pollInterval(Duration.ofSeconds(1))
- .until(
- () -> {
- List remaining =
- retryQueueDAO.findByStatuses(
- List.of(
- SearchIndexRetryQueue.STATUS_PENDING,
- SearchIndexRetryQueue.STATUS_IN_PROGRESS),
- 1000);
- return remaining.stream().noneMatch(r -> r.getEntityId().equals(entityId));
- });
- } finally {
- worker.stop();
- }
- } finally {
- SearchIndexRetryQueue.clearSuspension();
- }
- }
-
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
diff --git a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java
index 7e90a04dc8a7..d6f1434cfc9b 100644
--- a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java
+++ b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java
@@ -6882,7 +6882,6 @@ private void triggerWorkflow_SDK(TestNamespace ns, List localTestTables)
String workflowName = "DataCompletenessWorkflow";
OpenMetadataClient client = SdkClients.adminClient();
- waitForWorkflowDeployment(client, workflowName);
for (Table table : localTestTables) {
waitForEntityIndexedInSearch(client, "table_search_index", table.getFullyQualifiedName());
}
@@ -7000,7 +6999,7 @@ private Domain getOrCreateDomain(TestNamespace ns) {
}
private void waitForWorkflowDeployment(OpenMetadataClient client, String workflowName) {
- await()
+ await("workflow '" + workflowName + "' to finish Flowable deployment")
.atMost(Duration.ofSeconds(120))
.pollDelay(Duration.ofSeconds(1))
.pollInterval(Duration.ofSeconds(2))
@@ -7015,7 +7014,7 @@ private void waitForWorkflowDeployment(OpenMetadataClient client, String workflo
private void waitForEntityIndexedInSearch(
OpenMetadataClient client, String indexName, String entityFqn) {
- await()
+ await("entity '" + entityFqn + "' to appear in " + indexName)
.atMost(Duration.ofSeconds(120))
.pollDelay(Duration.ofSeconds(1))
.pollInterval(Duration.ofSeconds(2))
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/DefaultRecreateHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/DefaultRecreateHandler.java
index b98f48161ddb..d90763ae9036 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/search/DefaultRecreateHandler.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/DefaultRecreateHandler.java
@@ -99,6 +99,13 @@ public void finalizeReindex(EntityReindexContext context, boolean reindexSuccess
}
if (shouldPromote) {
+ // Always clear staged-index routing on the way out, regardless of outcome:
+ // - swap success → alias now points at staged; canonical and staged resolve to the
+ // same index, so unregistering keeps reads/writes consistent.
+ // - swap failure / empty aliases / exception → leaving routing active would silently
+ // send live writes to a staged index nothing reads from, which
+ // is strictly worse than the writes going back to the canonical
+ // alias target. Operators need to retry the reindex either way.
try {
Set aliasesToAttach = new HashSet<>();
@@ -148,6 +155,8 @@ public void finalizeReindex(EntityReindexContext context, boolean reindexSuccess
entityType);
return;
}
+ } else {
+ LOG.warn("Entity '{}': aliasesToAttach is empty, skipping alias swap", entityType);
}
LOG.info(
@@ -180,6 +189,8 @@ public void finalizeReindex(EntityReindexContext context, boolean reindexSuccess
if (metrics != null) {
metrics.recordPromotionFailure(entityType);
}
+ } finally {
+ searchRepository.unregisterStagedIndex(entityType, stagedIndex);
}
} else {
try {
@@ -196,6 +207,8 @@ public void finalizeReindex(EntityReindexContext context, boolean reindexSuccess
stagedIndex,
entityType,
ex);
+ } finally {
+ searchRepository.unregisterStagedIndex(entityType, stagedIndex);
}
}
}
@@ -262,10 +275,13 @@ public void promoteEntityIndex(EntityReindexContext context, boolean reindexSucc
stagedIndex,
entityType,
ex);
+ } finally {
+ searchRepository.unregisterStagedIndex(entityType, stagedIndex);
}
return;
}
+ // Always clear staged-index routing on the way out — see the rationale in finalizeReindex.
try {
Set aliasesToAttach =
getAliasesFromMapping(indexMapping, searchRepository.getClusterAlias());
@@ -340,6 +356,8 @@ public void promoteEntityIndex(EntityReindexContext context, boolean reindexSucc
if (promoteMetrics != null) {
promoteMetrics.recordPromotionFailure(entityType);
}
+ } finally {
+ searchRepository.unregisterStagedIndex(entityType, stagedIndex);
}
}
@@ -422,6 +440,7 @@ protected void recreateIndexFromMapping(
String stagedIndexName = buildStagedIndexName(canonicalIndexName);
searchClient.createIndex(stagedIndexName, mappingContent);
+ searchRepository.registerStagedIndex(entityType, stagedIndexName);
Set existingAliases =
activeIndexName != null ? searchClient.getAliases(activeIndexName) : new HashSet<>();
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchIndexRetryQueue.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchIndexRetryQueue.java
index 2f04cbb0d4eb..eccb11ef8e5f 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchIndexRetryQueue.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchIndexRetryQueue.java
@@ -1,12 +1,7 @@
package org.openmetadata.service.search;
import io.micrometer.core.instrument.Metrics;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.service.Entity;
@@ -24,10 +19,6 @@ public final class SearchIndexRetryQueue {
private static final int MAX_REASON_LENGTH = 8192;
- private static final AtomicReference> SUSPENDED_ENTITY_TYPES =
- new AtomicReference<>(Collections.emptySet());
- private static final AtomicBoolean SUSPEND_ALL_STREAMING = new AtomicBoolean(false);
-
private SearchIndexRetryQueue() {}
public static void enqueue(EntityInterface entity, String operation, Throwable failure) {
@@ -117,46 +108,6 @@ public static boolean isRetryableStatusCode(int status) {
return status < 400;
}
- public static void updateSuspension(Set entityTypes, boolean suspendAll) {
- Set normalized = new HashSet<>();
- for (String entityType : entityTypes == null ? Collections.emptySet() : entityTypes) {
- String normalizedType = normalize(entityType);
- if (!normalizedType.isEmpty()) {
- normalized.add(normalizedType);
- }
- }
-
- // Set entity types before the boolean so that isEntityTypeSuspended never
- // sees suspendAll=false with an outdated (empty) entity-types set.
- SUSPENDED_ENTITY_TYPES.set(Collections.unmodifiableSet(normalized));
- SUSPEND_ALL_STREAMING.set(suspendAll);
- }
-
- public static void clearSuspension() {
- SUSPEND_ALL_STREAMING.set(false);
- SUSPENDED_ENTITY_TYPES.set(Collections.emptySet());
- }
-
- public static boolean isEntityTypeSuspended(String entityType) {
- if (SUSPEND_ALL_STREAMING.get()) {
- return true;
- }
- String normalized = normalize(entityType);
- return !normalized.isEmpty() && SUSPENDED_ENTITY_TYPES.get().contains(normalized);
- }
-
- public static boolean isStreamingSuspended() {
- return SUSPEND_ALL_STREAMING.get() || !SUSPENDED_ENTITY_TYPES.get().isEmpty();
- }
-
- public static boolean isSuspendAllStreaming() {
- return SUSPEND_ALL_STREAMING.get();
- }
-
- public static Set getSuspendedEntityTypes() {
- return SUSPENDED_ENTITY_TYPES.get();
- }
-
private static String truncate(String value) {
if (value == null) {
return null;
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchIndexRetryWorker.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchIndexRetryWorker.java
index 1c4348d0c817..7fa3fb79ecb8 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchIndexRetryWorker.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchIndexRetryWorker.java
@@ -1,7 +1,6 @@
package org.openmetadata.service.search;
import static org.openmetadata.service.search.SearchIndexRetryQueue.STATUS_FAILED;
-import static org.openmetadata.service.search.SearchIndexRetryQueue.STATUS_PENDING;
import static org.openmetadata.service.search.SearchIndexRetryQueue.STATUS_PENDING_RETRY_1;
import static org.openmetadata.service.search.SearchIndexRetryQueue.STATUS_PENDING_RETRY_2;
import static org.openmetadata.service.search.SearchIndexRetryQueue.normalize;
@@ -25,7 +24,6 @@
import java.util.concurrent.atomic.AtomicReference;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.EntityInterface;
-import org.openmetadata.schema.system.EventPublisherJob;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.Include;
import org.openmetadata.schema.type.Relationship;
@@ -35,7 +33,6 @@
import org.openmetadata.service.apps.bundles.searchIndex.BulkSink;
import org.openmetadata.service.exception.EntityNotFoundException;
import org.openmetadata.service.jdbi3.CollectionDAO;
-import org.openmetadata.service.jdbi3.CollectionDAO.SearchIndexJobDAO.SearchIndexJobRecord;
import org.openmetadata.service.jdbi3.CollectionDAO.SearchIndexRetryQueueDAO.SearchIndexRetryRecord;
import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.workflows.searchIndex.ReindexingUtil;
@@ -63,27 +60,18 @@ public class SearchIndexRetryWorker implements Managed {
private static final int MAX_CASCADE_REINDEX = 5000;
private static final int CASCADE_BATCH_SIZE = 200;
private static final int MAX_BACKOFF_SECONDS = 60;
- private static final int SUSPENSION_REFRESH_INTERVAL_MS = 5000;
private static final int CANDIDATE_TYPES_REFRESH_INTERVAL_MS = 60000;
private static final long STALE_RECOVERY_INTERVAL_MS = 60_000;
private static final long STALE_THRESHOLD_MS = 10 * 60 * 1000;
- private static final List ACTIVE_REINDEX_JOB_STATUSES =
- List.of("RUNNING", "READY", "STOPPING");
- private static final List PURGEABLE_QUEUE_STATUSES =
- List.of(STATUS_PENDING, STATUS_PENDING_RETRY_1, STATUS_PENDING_RETRY_2, STATUS_FAILED);
-
private final CollectionDAO collectionDAO;
private final SearchRepository searchRepository;
private final AtomicBoolean running = new AtomicBoolean(false);
private final List workerThreads = new ArrayList<>();
- private final Object scopeRefreshLock = new Object();
private final Object candidateTypesLock = new Object();
private final Object staleRecoveryLock = new Object();
- private volatile long lastScopeRefreshAt;
private volatile long lastStaleRecoveryAt;
- private volatile String activeScopeSignature = "";
private volatile long candidateTypesLastRefreshAt;
private volatile List cachedCandidateEntityTypes = Collections.emptyList();
private final AtomicInteger consecutiveUnavailableCount = new AtomicInteger();
@@ -137,7 +125,6 @@ public void stop() {
}
}
workerThreads.clear();
- SearchIndexRetryQueue.clearSuspension();
LOG.info("Stopped search index retry worker");
}
@@ -148,7 +135,6 @@ public void stop() {
private void runLoop(int workerId) {
while (running.get()) {
try {
- refreshReindexSuspensionScopeIfNeeded();
recoverStaleInProgressIfNeeded();
if (!waitForClientAvailability(workerId)) {
@@ -181,22 +167,8 @@ private void runLoop(int workerId) {
private void processRecord(SearchIndexRetryRecord record) {
try {
- if (SearchIndexRetryQueue.isSuspendAllStreaming()) {
- collectionDAO
- .searchIndexRetryQueueDAO()
- .deleteByEntity(record.getEntityId(), record.getEntityFqn());
- return;
- }
-
EntityReference root = resolveEntityReference(record);
if (root != null) {
- if (SearchIndexRetryQueue.isEntityTypeSuspended(root.getType())) {
- collectionDAO
- .searchIndexRetryQueueDAO()
- .deleteByEntity(record.getEntityId(), record.getEntityFqn());
- return;
- }
-
reindexEntityCascade(root);
collectionDAO
.searchIndexRetryQueueDAO()
@@ -642,82 +614,9 @@ private int extractSearchStatusCode(Throwable t) {
}
// ---------------------------------------------------------------------------
- // Suspension and scheduling
+ // Scheduling
// ---------------------------------------------------------------------------
- private void refreshReindexSuspensionScopeIfNeeded() {
- long now = System.currentTimeMillis();
- if (now - lastScopeRefreshAt < SUSPENSION_REFRESH_INTERVAL_MS) {
- return;
- }
-
- synchronized (scopeRefreshLock) {
- long currentTime = System.currentTimeMillis();
- if (currentTime - lastScopeRefreshAt < SUSPENSION_REFRESH_INTERVAL_MS) {
- return;
- }
- lastScopeRefreshAt = currentTime;
-
- List activeJobs =
- collectionDAO.searchIndexJobDAO().findByStatusesWithLimit(ACTIVE_REINDEX_JOB_STATUSES, 1);
-
- if (activeJobs.isEmpty()) {
- if (!activeScopeSignature.isEmpty() || SearchIndexRetryQueue.isStreamingSuspended()) {
- SearchIndexRetryQueue.clearSuspension();
- activeScopeSignature = "";
- LOG.info("Cleared live search indexing suspension - no active reindex jobs");
- }
- return;
- }
-
- SearchIndexJobRecord activeJob = activeJobs.getFirst();
- EventPublisherJob jobConfiguration = null;
- try {
- if (activeJob.jobConfiguration() != null) {
- jobConfiguration =
- JsonUtils.readValue(activeJob.jobConfiguration(), EventPublisherJob.class);
- }
- } catch (Exception e) {
- LOG.warn("Failed to parse job configuration for active reindex job {}", activeJob.id(), e);
- }
-
- Set requestedEntities =
- normalizeReindexEntities(
- jobConfiguration != null ? jobConfiguration.getEntities() : null);
- Set searchableEntities = searchRepository.getSearchEntities();
-
- boolean containsAllToken = requestedEntities.stream().anyMatch("all"::equalsIgnoreCase);
- Set suspendedTypes =
- containsAllToken ? new HashSet<>(searchableEntities) : new HashSet<>(requestedEntities);
- suspendedTypes.retainAll(searchableEntities);
-
- boolean suspendAll =
- !searchableEntities.isEmpty() && suspendedTypes.containsAll(searchableEntities);
- String newSignature = buildScopeSignature(activeJob.id(), suspendedTypes, suspendAll);
-
- if (newSignature.equals(activeScopeSignature)) {
- return;
- }
-
- activeScopeSignature = newSignature;
- SearchIndexRetryQueue.updateSuspension(suspendedTypes, suspendAll);
-
- if (suspendAll) {
- int purged =
- collectionDAO.searchIndexRetryQueueDAO().deleteByStatuses(PURGEABLE_QUEUE_STATUSES);
- LOG.info(
- "Activated live search indexing suspension for all entity types using reindex job {} and purged {} retry queue rows",
- activeJob.id(),
- purged);
- } else {
- LOG.info(
- "Activated live search indexing suspension for {} entity types using reindex job {}",
- suspendedTypes.size(),
- activeJob.id());
- }
- }
- }
-
private void recoverStaleInProgressIfNeeded() {
long now = System.currentTimeMillis();
if (now - lastStaleRecoveryAt < STALE_RECOVERY_INTERVAL_MS) {
@@ -744,26 +643,6 @@ private void recoverStaleInProgressIfNeeded() {
}
}
- private Set normalizeReindexEntities(Set rawEntities) {
- Set normalized = new HashSet<>();
- if (rawEntities == null) {
- return normalized;
- }
- for (String entityType : rawEntities) {
- String value = SearchIndexRetryQueue.normalize(entityType);
- if (!value.isEmpty()) {
- normalized.add(value);
- }
- }
- return normalized;
- }
-
- private String buildScopeSignature(String jobId, Set suspendedTypes, boolean suspendAll) {
- List sorted = new ArrayList<>(suspendedTypes);
- Collections.sort(sorted);
- return jobId + "|" + suspendAll + "|" + String.join(",", sorted);
- }
-
// ---------------------------------------------------------------------------
// Utilities
// ---------------------------------------------------------------------------
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java
index 9028199b2b36..3db14f3bccab 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java
@@ -77,6 +77,7 @@
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -154,6 +155,18 @@ public class SearchRepository {
@Getter private Map entityIndexMap;
+ /**
+ * Staged index names being populated by an in-flight reindex, keyed by the canonical index name
+ * the alias normally points at (e.g. {@code openmetadata_table_search_index}). While an entry
+ * is present, every live write that resolves through {@link #getWriteIndexName(IndexMapping)}
+ * targets the staged index directly — so the writes survive the final alias swap that would
+ * otherwise promote a pre-snapshot index and drop the old one that held them.
+ *
+ * Keying by canonical index name lets any write site route correctly even if it does not
+ * have the entity type in scope (deletes by FQN prefix, script updates, child propagation, …).
+ */
+ private final Map activeStagedIndices = new ConcurrentHashMap<>();
+
private final String language;
@Getter @Setter public SearchIndexFactory searchIndexFactory = new SearchIndexFactory();
@@ -234,8 +247,15 @@ public SearchRepository(ElasticSearchConfiguration config, int maxDBConnections)
*/
private void registerSearchIndexHandler() {
try {
+ EntityLifecycleEventDispatcher dispatcher = EntityLifecycleEventDispatcher.getInstance();
SearchIndexHandler searchHandler = new SearchIndexHandler(this);
- EntityLifecycleEventDispatcher.getInstance().registerHandler(searchHandler);
+ // Drop any stale handler bound to a previous SearchRepository instance. Test suites and
+ // app bootstrap construct SearchRepository more than once and replace the singleton via
+ // Entity.setSearchRepository(...); without this the dispatcher keeps delivering events to
+ // the first instance and state maintained on the current instance (e.g. activeStagedIndices
+ // used for reindex write-routing) is never consulted.
+ dispatcher.unregisterHandler(searchHandler.getHandlerName());
+ dispatcher.registerHandler(searchHandler);
LOG.info("Successfully registered SearchIndexHandler for entity lifecycle events");
} catch (Exception e) {
LOG.error("Failed to register SearchIndexHandler", e);
@@ -486,6 +506,141 @@ public IndexMapping getIndexMapping(String entityType) {
return entityIndexMap.get(entityType);
}
+ /**
+ * Register a staged index as the live-write target for {@code entityType} while a reindex
+ * populates it. Must be paired with {@link #unregisterStagedIndex(String, String)} once the
+ * alias swap is complete so writes go back through the canonical alias.
+ */
+ public void registerStagedIndex(String entityType, String stagedIndex) {
+ if (entityType == null || stagedIndex == null) {
+ return;
+ }
+ String canonical = canonicalIndexFor(entityType);
+ if (canonical == null) {
+ LOG.warn(
+ "Cannot register staged index '{}' for entity '{}': no IndexMapping found",
+ stagedIndex,
+ entityType);
+ return;
+ }
+ activeStagedIndices.put(canonical, stagedIndex);
+ LOG.info(
+ "Routing live writes for canonical index '{}' (entity '{}') to staged index '{}' until reindex promotes it",
+ canonical,
+ entityType,
+ stagedIndex);
+ }
+
+ /** Clear the staged-index routing for {@code entityType} if it matches {@code stagedIndex}. */
+ public void unregisterStagedIndex(String entityType, String stagedIndex) {
+ if (entityType == null || stagedIndex == null) {
+ return;
+ }
+ String canonical = canonicalIndexFor(entityType);
+ if (canonical == null) {
+ return;
+ }
+ if (activeStagedIndices.remove(canonical, stagedIndex)) {
+ LOG.info(
+ "Cleared staged-index routing for canonical index '{}' (entity '{}', was '{}')",
+ canonical,
+ entityType,
+ stagedIndex);
+ }
+ }
+
+ private String canonicalIndexFor(String entityType) {
+ IndexMapping mapping = entityIndexMap.get(entityType);
+ return mapping != null ? mapping.getIndexName(clusterAlias) : null;
+ }
+
+ /**
+ * Centralized resolution of the index name a live write should target. Every write path that
+ * ultimately calls {@link IndexMapping#getIndexName(String)} should route through this method
+ * so it transparently picks up the staged index when a reindex is in flight. Returns the
+ * canonical index name when nothing is staged.
+ */
+ public String getWriteIndexName(IndexMapping indexMapping) {
+ if (indexMapping == null) {
+ return null;
+ }
+ String canonical = indexMapping.getIndexName(clusterAlias);
+ return routeToStagedIfActive(canonical);
+ }
+
+ /**
+ * Centralized routing for writes that already hold a resolved canonical index name — i.e. the
+ * value of {@link IndexMapping#getIndexName(String)}, NOT a short alias such as the result of
+ * {@link IndexMapping#getAlias(String)} or any of the parent aliases. If a reindex has
+ * registered a staged index for {@code canonicalIndexName}, returns the staged name; otherwise
+ * returns {@code canonicalIndexName} unchanged. Pass non-canonical aliases through unchanged
+ * since the routing map only knows about canonical names.
+ */
+ public String routeToStagedIfActive(String canonicalIndexName) {
+ if (canonicalIndexName == null) {
+ return null;
+ }
+ String staged = activeStagedIndices.get(canonicalIndexName);
+ return staged != null ? staged : canonicalIndexName;
+ }
+
+ /**
+ * @deprecated Use {@link #getWriteIndexName(IndexMapping)} directly. The {@code entityType}
+ * argument is ignored; the canonical name is resolved from the supplied {@code
+ * indexMapping}.
+ */
+ @Deprecated(forRemoval = true)
+ public String resolveWriteIndex(String entityType, IndexMapping indexMapping) {
+ return getWriteIndexName(indexMapping);
+ }
+
+ /**
+ * Returns the index targets a write should fan out to so it survives an in-flight reindex.
+ *
+ *
+ * - When {@code aliasOrIndex} is a known canonical entity index name (i.e. matches the
+ * value of {@link IndexMapping#getIndexName(String)} for some registered entity), the
+ * result is the input plus the single staged index for that entity (if any). Avoids
+ * fanning out an entity-scoped update-by-query — e.g. {@code updateDomainFqnByPrefix}
+ * targeting only the domain index — onto unrelated staged indices.
+ *
- When {@code aliasOrIndex} is a multi-entity alias such as {@code GLOBAL_SEARCH_ALIAS}
+ * or {@code DATA_ASSET_SEARCH_ALIAS}, the result is the input plus every currently
+ * staged index, since the original update can match documents whose owning entity
+ * type is being reindexed.
+ *
+ *
+ * Without this fan-out, update-by-query operations rooted on shared aliases would update
+ * only the about-to-be-discarded active index and lose their effect on alias swap.
+ */
+ public List getWriteFanoutTargets(String aliasOrIndex) {
+ if (aliasOrIndex == null) {
+ return new ArrayList<>(activeStagedIndices.values());
+ }
+ List targets = new ArrayList<>();
+ targets.add(aliasOrIndex);
+ if (isKnownCanonicalIndex(aliasOrIndex)) {
+ String staged = activeStagedIndices.get(aliasOrIndex);
+ if (staged != null) {
+ targets.add(staged);
+ }
+ } else {
+ targets.addAll(activeStagedIndices.values());
+ }
+ return targets;
+ }
+
+ private boolean isKnownCanonicalIndex(String name) {
+ if (entityIndexMap == null || name == null) {
+ return false;
+ }
+ for (IndexMapping mapping : entityIndexMap.values()) {
+ if (mapping != null && name.equals(mapping.getIndexName(clusterAlias))) {
+ return true;
+ }
+ }
+ return false;
+ }
+
public String getIndexOrAliasName(String name) {
if (clusterAlias == null || clusterAlias.isEmpty()) {
return name;
@@ -635,7 +790,7 @@ public void createEntityIndex(EntityInterface entity) {
IndexMapping indexMapping = entityIndexMap.get(entityType);
SearchIndex index = searchIndexFactory.buildIndex(entityType, entity);
String doc = JsonUtils.pojoToJson(index.buildSearchIndexDoc());
- searchClient.createEntity(indexMapping.getIndexName(clusterAlias), entityId, doc);
+ searchClient.createEntity(getWriteIndexName(indexMapping), entityId, doc);
if (Entity.TABLE.equals(entityType)) {
indexTableColumns((Table) entity);
@@ -686,7 +841,7 @@ private void indexTableColumns(Table table) {
if (!docs.isEmpty()) {
try {
- searchClient.createEntities(columnIndexMapping.getIndexName(clusterAlias), docs);
+ searchClient.createEntities(getWriteIndexName(columnIndexMapping), docs);
} catch (Exception e) {
LOG.error(
"Issue bulk indexing columns for table [{}]: {}",
@@ -708,7 +863,7 @@ private void deleteTableColumns(Table table) {
try {
searchClient.deleteEntityByFields(
- List.of(columnIndexMapping.getIndexName(clusterAlias)),
+ List.of(getWriteIndexName(columnIndexMapping)),
List.of(new ImmutablePair<>("table.id", table.getId().toString())));
} catch (Exception e) {
LOG.error(
@@ -811,7 +966,7 @@ private void updateTableColumnsInheritedFields(Table table) {
// Use updateChildren to efficiently update all columns for this table
searchClient.updateChildren(
- List.of(columnIndexMapping.getIndexName(clusterAlias)),
+ List.of(getWriteIndexName(columnIndexMapping)),
new ImmutablePair<>("table.id", table.getId().toString()),
new ImmutablePair<>(DEFAULT_UPDATE_SCRIPT, inheritedFields));
@@ -852,13 +1007,6 @@ public void createEntitiesIndex(List entities) throws IOExcepti
String entityType = entities.getFirst().getEntityReference().getType();
Timer.Sample searchSample = RequestLatencyContext.startSearchOperation();
try {
- if (SearchIndexRetryQueue.isEntityTypeSuspended(entityType)) {
- LOG.debug(
- "Skipping live search indexing for {} entities because reindex is active for {}",
- entities.size(),
- entityType);
- return;
- }
if (!getSearchClient().isClientAvailable()) {
for (EntityInterface entity : entities) {
SearchIndexRetryQueue.enqueue(
@@ -888,7 +1036,7 @@ public void createEntitiesIndex(List entities) throws IOExcepti
return;
}
- searchClient.createEntities(indexMapping.getIndexName(clusterAlias), docs);
+ searchClient.createEntities(getWriteIndexName(indexMapping), docs);
if (Entity.TABLE.equals(entityType)) {
indexColumnsForTables(entities);
@@ -907,7 +1055,7 @@ private void indexColumnsForTables(List entities) {
return;
}
- String indexName = columnIndexMapping.getIndexName(clusterAlias);
+ String indexName = getWriteIndexName(columnIndexMapping);
List