Skip to content

Commit af5e287

Browse files
authored
Lucene: handle requests for deferred merges (#3905)
1. Require a deferred merge when queuing a pending write. This is done to cover cases of a merge indicator without an active merge. Requesting a deferred merge will trigger a merge and then drain the queue and clear the indicator. 2. Do not require a deferred merge in the explicit merge path. This will prevent an endless loop of void merges. 3. As an optimization, use a `NoMergePolicy` in the non-indexing path.
1 parent 444d85d commit af5e287

3 files changed

Lines changed: 72 additions & 12 deletions

File tree

fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintainer.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,8 @@ private <M extends Message> void writeDocument(final FDBIndexableRecord<M> newRe
177177
if (shouldUseQueue(entry.getKey(), partitionId)) {
178178
PendingWriteQueue queue = directoryManager.getPendingWriteQueue(entry.getKey(), partitionId);
179179
queue.enqueueInsert(state.context, newRecord.getPrimaryKey(), entry.getValue());
180+
// Require deferred merge (+ drain) in case there is a merge indicator without an active merge
181+
this.state.store.getIndexDeferredMaintenanceControl().setMergeRequiredIndexes(this.state.index);
180182
} else {
181183
writeDocumentBypassQueue(newRecord, entry, partitionId);
182184
}
@@ -204,6 +206,8 @@ private int deleteDocument(Tuple groupingKey, @Nullable Integer partitionId, Tup
204206
if (shouldUseQueue(groupingKey, partitionId)) {
205207
PendingWriteQueue queue = directoryManager.getPendingWriteQueue(groupingKey, partitionId);
206208
queue.enqueueDelete(state.context, primaryKey);
209+
// Require deferred merge (+ drain) in case there is a merge indicator without an active merge
210+
this.state.store.getIndexDeferredMaintenanceControl().setMergeRequiredIndexes(this.state.index);
207211
return 0; // partition count will be adjusted during drain
208212
} else {
209213
return deleteDocumentBypassQueue(groupingKey, partitionId, primaryKey);

fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/directory/FDBDirectoryWrapper.java

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@
5656
import org.apache.lucene.index.MergePolicy;
5757
import org.apache.lucene.index.MergeScheduler;
5858
import org.apache.lucene.index.MergeTrigger;
59+
import org.apache.lucene.index.NoMergePolicy;
5960
import org.apache.lucene.index.StandardDirectoryReaderOptimization;
60-
import org.apache.lucene.index.TieredMergePolicy;
6161
import org.apache.lucene.store.LockFactory;
6262
import org.apache.lucene.store.NoLockFactory;
6363
import org.apache.lucene.util.IOUtils;
@@ -198,21 +198,27 @@ private IndexWriter createIndexWriter(final Exception exceptionAtCreation) throw
198198
@Nonnull
199199
private IndexWriterConfig createIndexWriterConfig(final Exception exceptionAtCreation) {
200200
final IndexDeferredMaintenanceControl mergeControl = this.state.store.getIndexDeferredMaintenanceControl();
201-
TieredMergePolicy tieredMergePolicy = new FDBTieredMergePolicy(mergeControl, this.agilityContext,
202-
this.state.indexSubspace, this.key, exceptionAtCreation)
203-
.setMaxMergedSegmentMB(this.state.context.getPropertyStorage().getPropertyValue(LuceneRecordContextProperties.LUCENE_MERGE_MAX_SIZE))
204-
.setSegmentsPerTier(this.state.context.getPropertyStorage().getPropertyValue(LuceneRecordContextProperties.LUCENE_MERGE_SEGMENTS_PER_TIER));
205-
tieredMergePolicy.setNoCFSRatio(1.00);
206-
IndexWriterConfig indexWriterConfig = new IndexWriterConfig(this.analyzerWrapper.getAnalyzer())
201+
final MergePolicy mergePolicy;
202+
if (mergeControl.shouldAutoMergeDuringCommit() || mergeControl.isExplicitMergePath()) {
203+
// Here: this function was called in an explicit merge path. Prepare an appropriate
204+
// merge policy, and avoid requesting a deferred merge
205+
mergePolicy = new FDBTieredMergePolicy(mergeControl, this.agilityContext,
206+
this.state.indexSubspace, this.key, exceptionAtCreation)
207+
.setMaxMergedSegmentMB(this.state.context.getPropertyStorage().getPropertyValue(LuceneRecordContextProperties.LUCENE_MERGE_MAX_SIZE))
208+
.setSegmentsPerTier(this.state.context.getPropertyStorage().getPropertyValue(LuceneRecordContextProperties.LUCENE_MERGE_SEGMENTS_PER_TIER));
209+
} else {
210+
// Here: Not a merge path, optimize by using a "no merge" policy and request a deferred merge
211+
mergePolicy = NoMergePolicy.INSTANCE;
212+
mergeControl.setMergeRequiredIndexes(this.state.index);
213+
}
214+
215+
mergePolicy.setNoCFSRatio(1.00);
216+
return new IndexWriterConfig(this.analyzerWrapper.getAnalyzer())
207217
.setUseCompoundFile(USE_COMPOUND_FILE)
208-
.setMergePolicy(tieredMergePolicy)
218+
.setMergePolicy(mergePolicy)
209219
.setMergeScheduler(getMergeScheduler(this.state, this.mergeDirectoryCount, this.agilityContext, this.key))
210220
.setCodec(CODEC)
211221
.setInfoStream(new LuceneLoggerInfoStream(LOGGER));
212-
213-
// Merge is required when creating an index writer (do we have a better indicator for a required merge?)
214-
mergeControl.setMergeRequiredIndexes(this.state.index);
215-
return indexWriterConfig;
216222
}
217223

218224
@Nonnull

fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/directory/PendingWriteQueueTest.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -942,6 +942,56 @@ void pendingQueueTestDrainException2() {
942942
});
943943
}
944944

945+
@Test
946+
void testMergeRequiredIndexesWithPendingQueue() {
947+
// Test that store's getMergeRequiredIndexes does the right thing
948+
final Index index = SIMPLE_TEXT_SUFFIXES;
949+
final KeySpacePath path = pathManager.createPath(TestKeySpace.RECORD_STORE);
950+
final Function<FDBRecordContext, FDBRecordStore> schemaSetup = context ->
951+
LuceneIndexTestUtils.rebuildIndexMetaData(context, path,
952+
TestRecordsTextProto.SimpleDocument.getDescriptor().getName(),
953+
index, useCascadesPlanner).getLeft();
954+
955+
// Insert without queue indicator - should request deferred merge
956+
try (FDBRecordContext context = openContext()) {
957+
FDBRecordStore recordStore = Objects.requireNonNull(schemaSetup.apply(context));
958+
recordStore.saveRecord(LuceneIndexTestUtils.createSimpleDocument(1001L, "first document", 1));
959+
960+
Set<Index> mergeRequired = recordStore.getIndexDeferredMaintenanceControl().getMergeRequiredIndexes();
961+
assertNotNull(mergeRequired);
962+
assertTrue(mergeRequired.contains(index));
963+
964+
commit(context);
965+
}
966+
967+
// Enable queue mode
968+
setOngoingMergeIndicator(schemaSetup, index, null, null);
969+
970+
// Insert with queue indicator - should request deferred merge
971+
try (FDBRecordContext context = openContext()) {
972+
FDBRecordStore recordStore = Objects.requireNonNull(schemaSetup.apply(context));
973+
recordStore.saveRecord(LuceneIndexTestUtils.createSimpleDocument(1002L, "second document", 1));
974+
975+
Set<Index> mergeRequired = recordStore.getIndexDeferredMaintenanceControl().getMergeRequiredIndexes();
976+
assertNotNull(mergeRequired);
977+
assertTrue(mergeRequired.contains(index));
978+
979+
commit(context);
980+
}
981+
982+
// Call explicit merge - should not request deferred merge
983+
try (FDBRecordContext context = openContext()) {
984+
FDBRecordStore recordStore = Objects.requireNonNull(schemaSetup.apply(context));
985+
final LuceneIndexMaintainer indexMaintainer = getIndexMaintainer(recordStore, index);
986+
indexMaintainer.mergeIndex().join();
987+
988+
Set<Index> mergeRequired = recordStore.getIndexDeferredMaintenanceControl().getMergeRequiredIndexes();
989+
assertTrue(mergeRequired == null || mergeRequired.isEmpty());
990+
991+
commit(context);
992+
}
993+
}
994+
945995
private void verifyClearedQueueAndIndicator(Function<FDBRecordContext, FDBRecordStore> schemaSetup, Index index, @Nullable Tuple groupingKey, @Nullable Integer partitionId) {
946996
try (FDBRecordContext context = openContext()) {
947997
FDBRecordStore recordStore = Objects.requireNonNull(schemaSetup.apply(context));

0 commit comments

Comments
 (0)