Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
14cb35f
fix(it): stop two flaky integration tests (#27649)
mohityadav766 Apr 23, 2026
b8f8928
Merge branch 'main' into fix-flaky-tests
mohityadav766 Apr 23, 2026
09e5923
fix(search): never skip live indexing during reindex (#27649)
mohityadav766 Apr 23, 2026
6e3f2b7
Merge branch 'main' into fix-flaky-tests
mohityadav766 Apr 23, 2026
c1fa78c
Merge branch 'main' into fix-flaky-tests
mohityadav766 Apr 23, 2026
5960466
Merge branch 'main' into fix-flaky-tests
mohityadav766 Apr 23, 2026
52992bc
fix(search): route live writes to staged index during reindex (#27649)
mohityadav766 Apr 23, 2026
ea093b0
fix(search): re-register SearchIndexHandler on every SearchRepository…
mohityadav766 Apr 24, 2026
3d98c5d
Merge branch 'main' into fix-flaky-tests
mohityadav766 Apr 24, 2026
61fa799
Merge branch 'main' into fix-flaky-tests
mohityadav766 Apr 27, 2026
568c2d3
fix(search): centralize staged-index routing on canonical name (#27649)
mohityadav766 Apr 27, 2026
520b924
Merge branch 'main' into fix-flaky-tests
mohityadav766 Apr 27, 2026
0a4144c
fix(it): bump DB sort memory + search-wait ceilings (#27649)
mohityadav766 Apr 27, 2026
37c3438
fix(search): address Copilot/Gitar review on staged-index routing (#2…
mohityadav766 Apr 27, 2026
642d2c4
Merge branch 'main' into fix-flaky-tests
mohityadav766 Apr 27, 2026
95aa923
fix(search): fan out cross-alias update-by-query to staged indices (#…
mohityadav766 Apr 27, 2026
ce1ee8f
Merge branch 'main' into fix-flaky-tests
mohityadav766 Apr 27, 2026
addea67
fix(search): scope fan-out to canonical input vs multi-entity alias (…
mohityadav766 Apr 27, 2026
545191c
Merge branch 'fix-flaky-tests' of https://github.com/open-metadata/Op…
mohityadav766 Apr 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,14 @@ private void startDatabase() {
mysql.withDatabaseName("openmetadata");
mysql.withUsername("test");
mysql.withPassword("test");
mysql.withCommand("mysqld", "--max_allowed_packet=" + mysqlMaxAllowedPacket);
mysql.withCommand(
"mysqld",
"--max_allowed_packet=" + mysqlMaxAllowedPacket,
// The tag list query (TagDAO.listAfter) joins three tables and sorts by tag.name,
// tag.id; under the parallel-tests fork the tag table grows large and the default
// 256KB sort_buffer_size overflows with "Out of sort memory" (#27649). 8MB is plenty
// for an integration-test workload and well under the 4GB overall limit.
"--sort_buffer_size=8M");
mysql.withStartupTimeoutSeconds(240);
mysql.withConnectTimeoutSeconds(240);
mysql.withTmpFs(java.util.Map.of("/var/lib/mysql", "rw,size=2g"));
Expand Down Expand Up @@ -278,7 +285,12 @@ private void startDatabase() {
"-c",
"synchronous_commit=off",
"-c",
"full_page_writes=off");
"full_page_writes=off",
// Bump work_mem for the same reason MySQL gets a larger sort_buffer above:
// TagDAO.listAfter joins three tables and sorts; default 4MB spills to temp files
// under load.
"-c",
"work_mem=32MB");
postgres.withTmpFs(java.util.Map.of("/var/lib/postgresql/data", "rw,size=2g"));
postgres.withCreateContainerCmdModifier(
cmd ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4378,7 +4378,7 @@ void test_bulkCreateOrUpdate_searchIndexed(TestNamespace ns) {
OpenMetadataClient client = SdkClients.adminClient();

Awaitility.await()
.atMost(Duration.ofSeconds(90))
.atMost(Duration.ofSeconds(180))
.pollDelay(Duration.ofMillis(500))
.pollInterval(Duration.ofSeconds(2))
.ignoreExceptions()
Expand Down Expand Up @@ -4440,7 +4440,7 @@ void test_bulkUpdate_searchIndexUpdated(TestNamespace ns) {
OpenMetadataClient client = SdkClients.adminClient();

Awaitility.await()
.atMost(Duration.ofSeconds(90))
.atMost(Duration.ofSeconds(180))
.pollDelay(Duration.ofMillis(500))
.pollInterval(Duration.ofSeconds(3))
.ignoreExceptions()
Expand Down Expand Up @@ -5130,7 +5130,7 @@ void checkDeletedEntity(TestNamespace ns) throws Exception {
Awaitility.await("Wait for entity to appear in search index")
.pollDelay(Duration.ofMillis(500))
.pollInterval(Duration.ofSeconds(1))
.atMost(Duration.ofSeconds(90))
.atMost(Duration.ofSeconds(180))
.ignoreExceptions()
.untilAsserted(
() -> {
Expand Down Expand Up @@ -5166,7 +5166,7 @@ void checkIndexCreated(TestNamespace ns) throws Exception {
Awaitility.await("Wait for entity to appear in search index")
.pollDelay(Duration.ofMillis(500))
.pollInterval(Duration.ofSeconds(1))
.atMost(Duration.ofSeconds(90))
.atMost(Duration.ofSeconds(180))
.ignoreExceptions()
.untilAsserted(
() -> {
Expand Down Expand Up @@ -5194,7 +5194,7 @@ void updateDescriptionAndCheckInSearch(TestNamespace ns) throws Exception {
Awaitility.await("Wait for entity to appear in search index")
.pollDelay(Duration.ofMillis(500))
.pollInterval(Duration.ofSeconds(1))
.atMost(Duration.ofSeconds(90))
.atMost(Duration.ofSeconds(180))
.ignoreExceptions()
.untilAsserted(
() -> {
Expand All @@ -5213,7 +5213,7 @@ void updateDescriptionAndCheckInSearch(TestNamespace ns) throws Exception {
Awaitility.await("Wait for search to reflect update")
.pollDelay(Duration.ofMillis(500))
.pollInterval(Duration.ofSeconds(1))
.atMost(Duration.ofSeconds(90))
.atMost(Duration.ofSeconds(180))
.ignoreExceptions()
.untilAsserted(
() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.junit.jupiter.api.parallel.Isolated;
import org.openmetadata.it.bootstrap.TestSuiteBootstrap;
import org.openmetadata.it.factories.GlossaryTermTestFactory;
import org.openmetadata.it.factories.GlossaryTestFactory;
Expand All @@ -41,10 +42,13 @@
*
* <p>Test isolation: Uses TestNamespace for unique entity naming.
*
* <p>Parallelization: Runs with @Execution(ExecutionMode.SAME_THREAD) because each test
* blocks a server thread on synchronous Fuseki writes; concurrent execution can exhaust the
* server thread pool and cause request timeouts.
* <p>Parallelization: Annotated {@code @Isolated} because {@link RdfUpdater} is a JVM-wide
* singleton. {@code @BeforeAll} flips it on, so any test class running concurrently starts doing
* synchronous Fuseki writes on every entity create — saturating the Dropwizard thread pool and
* causing 60s request timeouts (see issue #27649). {@code @Execution(SAME_THREAD)} alone only
* serializes within this class and does not prevent that cross-class leakage.
*/
@Isolated
@Execution(ExecutionMode.SAME_THREAD)
@ExtendWith(TestNamespaceExtension.class)
public class GlossaryOntologyExportIT {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -793,64 +793,6 @@ void testEnqueuePreservesErrorDetailInFailureReason(TestNamespace ns) {
retryQueueDAO.deleteByEntity(entityId, entityFqn);
}

// ---------------------------------------------------------------------------
// Suspension tests
// ---------------------------------------------------------------------------

/**
 * Checks that enqueueing still inserts a PENDING retry record while suspend-all streaming is
 * switched on. Despite the method name, the assertion is that the record IS present: suspension
 * is meant to affect worker processing, not enqueueing.
 *
 * <p>Suspension state and the inserted record are both cleaned up in {@code finally} so the
 * static suspension flags do not leak into other tests.
 */
@Test
void testSuspensionPreventsEnqueue(TestNamespace ns) {
  final String id = UUID.randomUUID().toString();
  final String fqn = ns.prefix("rq") + ".suspended.entity";
  try {
    // Suspend everything, with no per-type entries.
    SearchIndexRetryQueue.updateSuspension(java.util.Set.of(), true);
    assertTrue(SearchIndexRetryQueue.isSuspendAllStreaming());

    // Enqueue should still insert (suspension affects worker processing, not enqueueing)
    SearchIndexRetryQueue.enqueue(id, fqn, "during suspension");
    List<SearchIndexRetryRecord> pending =
        retryQueueDAO.findByStatus(SearchIndexRetryQueue.STATUS_PENDING, 1000);

    boolean enqueued = false;
    for (SearchIndexRetryRecord candidate : pending) {
      if (candidate.getEntityId().equals(id)) {
        enqueued = true;
        break;
      }
    }
    assertTrue(enqueued);
  } finally {
    SearchIndexRetryQueue.clearSuspension();
    retryQueueDAO.deleteByEntity(id, fqn);
  }
}

/**
 * Checks that a running {@link SearchIndexRetryWorker} removes a PENDING retry record while
 * suspend-all streaming is active.
 *
 * <p>A record is upserted directly through the DAO, suspension is turned on, and the worker is
 * started. The test then polls (up to 30s, every 1s) until no PENDING or IN_PROGRESS record with
 * the test's entity id remains.
 *
 * <p>Cleanup: the worker is always stopped, suspension is always cleared, and the seeded record
 * is explicitly deleted so that a timeout or worker failure does not leave a stray row behind
 * for other tests to trip over.
 *
 * @throws Exception if the worker or the await fails
 */
@Test
void testWorkerDeletesRecordsDuringSuspendAll(TestNamespace ns) throws Exception {
  String entityId = UUID.randomUUID().toString();
  String entityFqn = ns.prefix("rq") + ".suspended.entity";

  retryQueueDAO.upsert(
      entityId, entityFqn, "will be suspended", SearchIndexRetryQueue.STATUS_PENDING, "table");

  try {
    SearchIndexRetryQueue.updateSuspension(java.util.Set.of(), true);

    SearchIndexRetryWorker worker = new SearchIndexRetryWorker(collectionDAO, searchRepository);
    worker.start();
    try {
      Awaitility.await("Worker should delete record during full suspension")
          .atMost(Duration.ofSeconds(30))
          .pollInterval(Duration.ofSeconds(1))
          .until(
              () -> {
                List<SearchIndexRetryRecord> remaining =
                    retryQueueDAO.findByStatuses(
                        List.of(
                            SearchIndexRetryQueue.STATUS_PENDING,
                            SearchIndexRetryQueue.STATUS_IN_PROGRESS),
                        1000);
                return remaining.stream().noneMatch(r -> r.getEntityId().equals(entityId));
              });
    } finally {
      // Stop the worker before touching shared suspension state or the queue row.
      worker.stop();
    }
  } finally {
    SearchIndexRetryQueue.clearSuspension();
    // On the happy path the worker has already removed the row; on timeout/failure it has not.
    // Delete unconditionally, mirroring the cleanup in testSuspensionPreventsEnqueue.
    // NOTE(review): assumes deleteByEntity is a no-op when the row is already gone — confirm.
    retryQueueDAO.deleteByEntity(entityId, entityFqn);
  }
}

// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6882,7 +6882,6 @@ private void triggerWorkflow_SDK(TestNamespace ns, List<Table> localTestTables)
String workflowName = "DataCompletenessWorkflow";
OpenMetadataClient client = SdkClients.adminClient();

waitForWorkflowDeployment(client, workflowName);
for (Table table : localTestTables) {
waitForEntityIndexedInSearch(client, "table_search_index", table.getFullyQualifiedName());
}
Expand Down Expand Up @@ -7000,7 +6999,7 @@ private Domain getOrCreateDomain(TestNamespace ns) {
}

private void waitForWorkflowDeployment(OpenMetadataClient client, String workflowName) {
await()
await("workflow '" + workflowName + "' to finish Flowable deployment")
.atMost(Duration.ofSeconds(120))
.pollDelay(Duration.ofSeconds(1))
.pollInterval(Duration.ofSeconds(2))
Expand All @@ -7015,7 +7014,7 @@ private void waitForWorkflowDeployment(OpenMetadataClient client, String workflo

private void waitForEntityIndexedInSearch(
OpenMetadataClient client, String indexName, String entityFqn) {
await()
await("entity '" + entityFqn + "' to appear in " + indexName)
.atMost(Duration.ofSeconds(120))
.pollDelay(Duration.ofSeconds(1))
.pollInterval(Duration.ofSeconds(2))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,13 @@ public void finalizeReindex(EntityReindexContext context, boolean reindexSuccess
}

if (shouldPromote) {
// Always clear staged-index routing on the way out, regardless of outcome:
// - swap success → alias now points at staged; canonical and staged resolve to the
// same index, so unregistering keeps reads/writes consistent.
// - swap failure / empty aliases / exception → leaving routing active would silently
// send live writes to a staged index nothing reads from, which
// is strictly worse than the writes going back to the canonical
// alias target. Operators need to retry the reindex either way.
try {
Set<String> aliasesToAttach = new HashSet<>();

Expand Down Expand Up @@ -148,6 +155,8 @@ public void finalizeReindex(EntityReindexContext context, boolean reindexSuccess
entityType);
Comment thread
mohityadav766 marked this conversation as resolved.
return;
}
} else {
LOG.warn("Entity '{}': aliasesToAttach is empty, skipping alias swap", entityType);
}
Comment thread
mohityadav766 marked this conversation as resolved.

LOG.info(
Expand Down Expand Up @@ -180,6 +189,8 @@ public void finalizeReindex(EntityReindexContext context, boolean reindexSuccess
if (metrics != null) {
metrics.recordPromotionFailure(entityType);
}
} finally {
searchRepository.unregisterStagedIndex(entityType, stagedIndex);
}
} else {
try {
Expand All @@ -196,6 +207,8 @@ public void finalizeReindex(EntityReindexContext context, boolean reindexSuccess
stagedIndex,
entityType,
ex);
} finally {
searchRepository.unregisterStagedIndex(entityType, stagedIndex);
}
}
}
Expand Down Expand Up @@ -262,10 +275,13 @@ public void promoteEntityIndex(EntityReindexContext context, boolean reindexSucc
stagedIndex,
entityType,
ex);
} finally {
searchRepository.unregisterStagedIndex(entityType, stagedIndex);
}
return;
}

// Always clear staged-index routing on the way out — see the rationale in finalizeReindex.
try {
Set<String> aliasesToAttach =
getAliasesFromMapping(indexMapping, searchRepository.getClusterAlias());
Expand Down Expand Up @@ -340,6 +356,8 @@ public void promoteEntityIndex(EntityReindexContext context, boolean reindexSucc
if (promoteMetrics != null) {
promoteMetrics.recordPromotionFailure(entityType);
}
} finally {
searchRepository.unregisterStagedIndex(entityType, stagedIndex);
}
}

Expand Down Expand Up @@ -422,6 +440,7 @@ protected void recreateIndexFromMapping(

String stagedIndexName = buildStagedIndexName(canonicalIndexName);
searchClient.createIndex(stagedIndexName, mappingContent);
searchRepository.registerStagedIndex(entityType, stagedIndexName);

Set<String> existingAliases =
activeIndexName != null ? searchClient.getAliases(activeIndexName) : new HashSet<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
package org.openmetadata.service.search;

import io.micrometer.core.instrument.Metrics;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.service.Entity;
Expand All @@ -24,10 +19,6 @@ public final class SearchIndexRetryQueue {

private static final int MAX_REASON_LENGTH = 8192;

// Current set of suspended entity types. Starts empty; updateSuspension and clearSuspension
// replace the whole set rather than mutating it in place.
private static final AtomicReference<Set<String>> SUSPENDED_ENTITY_TYPES =
new AtomicReference<>(Collections.emptySet());
// Global switch: while true, isEntityTypeSuspended reports every entity type as suspended.
private static final AtomicBoolean SUSPEND_ALL_STREAMING = new AtomicBoolean(false);

private SearchIndexRetryQueue() {}

public static void enqueue(EntityInterface entity, String operation, Throwable failure) {
Expand Down Expand Up @@ -117,46 +108,6 @@ public static boolean isRetryableStatusCode(int status) {
return status < 400;
}

/**
 * Replaces the current suspension state.
 *
 * <p>Each supplied entity type is passed through {@code normalize}; values that normalize to an
 * empty string are dropped. The surviving values are published as an unmodifiable set, after
 * which the suspend-all flag is set to {@code suspendAll}.
 *
 * @param entityTypes entity types to suspend; {@code null} is treated as an empty set
 * @param suspendAll whether every entity type should be reported as suspended
 */
public static void updateSuspension(Set<String> entityTypes, boolean suspendAll) {
  Set<String> cleaned = new HashSet<>();
  if (entityTypes != null) {
    for (String rawType : entityTypes) {
      String key = normalize(rawType);
      if (key.isEmpty()) {
        continue;
      }
      cleaned.add(key);
    }
  }
  Set<String> snapshot = Collections.unmodifiableSet(cleaned);

  // Publish the entity types first and the flag second, so that isEntityTypeSuspended never
  // observes suspendAll=false together with an outdated (empty) entity-types set.
  SUSPENDED_ENTITY_TYPES.set(snapshot);
  SUSPEND_ALL_STREAMING.set(suspendAll);
}

/**
 * Resets suspension to its initial state: the suspend-all flag is switched off, then the set of
 * suspended entity types is replaced with an empty set.
 */
public static void clearSuspension() {
// Flag first, then the set — the reverse of the write order used in updateSuspension.
SUSPEND_ALL_STREAMING.set(false);
SUSPENDED_ENTITY_TYPES.set(Collections.emptySet());
}

/**
 * Reports whether the given entity type is currently suspended.
 *
 * @param entityType entity type to check; it is normalized via {@code normalize} before lookup
 * @return {@code true} if suspend-all is active, or if the normalized type is non-empty and is
 *     contained in the current suspended-types set; {@code false} otherwise
 */
public static boolean isEntityTypeSuspended(String entityType) {
  // Global switch wins; the argument is not even normalized in that case.
  if (SUSPEND_ALL_STREAMING.get()) {
    return true;
  }
  String key = normalize(entityType);
  if (key.isEmpty()) {
    return false;
  }
  return SUSPENDED_ENTITY_TYPES.get().contains(key);
}

/**
 * Reports whether any suspension is in effect.
 *
 * @return {@code true} if suspend-all is active or at least one entity type is suspended
 */
public static boolean isStreamingSuspended() {
  if (SUSPEND_ALL_STREAMING.get()) {
    return true;
  }
  Set<String> suspendedTypes = SUSPENDED_ENTITY_TYPES.get();
  return !suspendedTypes.isEmpty();
}

/**
 * Returns the current value of the suspend-all flag, ignoring any per-entity-type entries.
 *
 * @return {@code true} if suspend-all is active
 */
public static boolean isSuspendAllStreaming() {
return SUSPEND_ALL_STREAMING.get();
}

/**
 * Returns the currently published set of suspended entity types.
 *
 * <p>The set is whatever updateSuspension or clearSuspension last stored: either an unmodifiable
 * wrapper or {@code Collections.emptySet()}, so callers cannot mutate it.
 *
 * @return the current suspended entity types, possibly empty
 */
public static Set<String> getSuspendedEntityTypes() {
return SUSPENDED_ENTITY_TYPES.get();
}

private static String truncate(String value) {
if (value == null) {
return null;
Expand Down
Loading
Loading