diff --git a/tieredstore/README.md b/tieredstore/README.md index 1532fc3b5fd..74c0c232178 100644 --- a/tieredstore/README.md +++ b/tieredstore/README.md @@ -31,7 +31,9 @@ The following are some core configurations, for more details, see [TieredMessage | tieredStoreGroupCommitCount | 2500 | | The number of messages that trigger one batch transfer | | tieredStoreGroupCommitSize | 33554432 | byte | The size of messages that trigger one batch transfer, 32M by default | | tieredStoreMaxGroupCommitCount | 10000 | | The maximum number of messages waiting to be transferred per queue | -| readAheadCacheExpireDuration | 1000 | millisecond | Read-ahead cache expiration time | +| readAheadCacheExpireDuration | 15000 | millisecond | Legacy fallback expiration time for read-ahead cache entries | +| readAheadCacheCreateExpireDuration | 180000 | millisecond | Read-ahead cache expiration time after entry creation | +| readAheadCacheAfterReadExpireDuration | 10000 | millisecond | Read-ahead cache expiration time after entry read | | readAheadCacheSizeThresholdRate | 0.3 | | The maximum heap space occupied by the read-ahead cache | ## Metrics diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreConfig.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreConfig.java index d22ab80dd82..fd090820da5 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreConfig.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreConfig.java @@ -118,7 +118,11 @@ public boolean check(TieredStorageLevel targetLevel) { private boolean readAheadCacheEnable = true; private int readAheadMessageCountThreshold = 4096; private int readAheadMessageSizeThreshold = 16 * 1024 * 1024; + // Legacy single-TTL fallback duration for read-ahead cache entries. The primary + // policy uses create/after-read TTLs below. private long readAheadCacheExpireDuration = 15 * 1000; + private long readAheadCacheCreateExpireDuration = Duration.ofMinutes(3).toMillis(); + private long readAheadCacheAfterReadExpireDuration = Duration.ofSeconds(10).toMillis(); private double readAheadCacheSizeThresholdRate = 0.3; private int tieredStoreMaxPendingLimit = 10000; @@ -356,6 +360,22 @@ public void setReadAheadCacheExpireDuration(long duration) { this.readAheadCacheExpireDuration = duration; } + public long getReadAheadCacheCreateExpireDuration() { + return readAheadCacheCreateExpireDuration; + } + + public void setReadAheadCacheCreateExpireDuration(long readAheadCacheCreateExpireDuration) { + this.readAheadCacheCreateExpireDuration = readAheadCacheCreateExpireDuration; + } + + public long getReadAheadCacheAfterReadExpireDuration() { + return readAheadCacheAfterReadExpireDuration; + } + + public void setReadAheadCacheAfterReadExpireDuration(long readAheadCacheAfterReadExpireDuration) { + this.readAheadCacheAfterReadExpireDuration = readAheadCacheAfterReadExpireDuration; + } + public double getReadAheadCacheSizeThresholdRate() { return readAheadCacheSizeThresholdRate; } diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java index 38946fd1611..b17abe9162c 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java @@ -99,7 +99,7 @@ public TieredMessageStore(MessageStorePluginContext context, MessageStore next) this.flatFileStore = new FlatFileStore(this.storeConfig, this.metadataStore, this.storeExecutor); this.indexService = new IndexStoreService(this.flatFileStore.getFlatFileFactory(), MessageStoreUtil.getIndexFilePath(this.storeConfig.getBrokerName())); - this.fetcher = new MessageStoreFetcherImpl(this); + this.fetcher = createFetcher(this.storeConfig, this.flatFileStore, this.indexService); this.dispatcher = new MessageStoreDispatcherImpl(this); next.addDispatcher(dispatcher); } @@ -158,6 +158,18 @@ public FlatFileStore getFlatFileStore() { return flatFileStore; } + /** + * Build the fetcher used by this message store. Called from the constructor via + * virtual dispatch, so subclass overrides MUST NOT read {@link TieredMessageStore} + * instance fields beyond the parameters supplied here — other fields may not yet + * be initialized at the time this method runs. The supplied {@code storeConfig}, + * {@code flatFileStore}, and {@code indexService} are guaranteed initialized. + */ + protected MessageStoreFetcher createFetcher(MessageStoreConfig storeConfig, + FlatFileStore flatFileStore, IndexService indexService) { + return new MessageStoreFetcherImpl(this, storeConfig, flatFileStore, indexService); + } + public IndexService getIndexService() { return indexService; } diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java index 2a5dc2dd8a6..ceab7fc32f2 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java @@ -18,7 +18,9 @@ import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.Expiry; import com.github.benmanes.caffeine.cache.Scheduler; +import com.github.benmanes.caffeine.cache.Ticker; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; @@ -62,7 +64,7 @@ public class MessageStoreFetcherImpl implements MessageStoreFetcher { private final IndexService indexService; private final FlatFileStore flatFileStore; private final MessageStoreFilter topicFilter; - private final long memoryMaxSize; + protected final long memoryMaxSize; private final Cache fetcherCache; public MessageStoreFetcherImpl(TieredMessageStore messageStore) { @@ -86,14 +88,49 @@ public MessageStoreFetcherImpl(TieredMessageStore messageStore, MessageStoreConf log.info("MessageStoreFetcher init success, brokerName={}", storeConfig.getBrokerName()); } - private Cache initCache(MessageStoreConfig storeConfig) { + /** + * Build the read-ahead cache. Called from the constructor via virtual dispatch + * (see {@link #MessageStoreFetcherImpl(TieredMessageStore, MessageStoreConfig, FlatFileStore, IndexService)}), + * so subclass overrides MUST NOT read subclass instance fields — those are not yet + * initialized. The {@code storeConfig} parameter and the parent class field + * {@link #memoryMaxSize} are safe to use because they are set before this method runs. + */ + protected Cache initCache(MessageStoreConfig storeConfig) { + + ReadAheadCacheTtl ttl = resolveReadAheadCacheTtl(storeConfig); + if (ttl == null) { + return buildLegacyCache(storeConfig.getReadAheadCacheExpireDuration(), memoryMaxSize, Ticker.systemTicker()); + } + if (ttl.isAfterReadNotShorter()) { + log.warn("MessageStoreFetcher read-ahead cache afterReadTtl ({}ms) >= createTtl ({}ms); " + + "reads will extend rather than shorten entry lifetime", ttl.afterReadMs, ttl.createMs); + } + return buildDualTtlCache(ttl.createMs, ttl.afterReadMs, memoryMaxSize, Ticker.systemTicker()); + } + + static ReadAheadCacheTtl resolveReadAheadCacheTtl(MessageStoreConfig storeConfig) { + long createTtl = storeConfig.getReadAheadCacheCreateExpireDuration(); + long afterReadTtl = storeConfig.getReadAheadCacheAfterReadExpireDuration(); + if (createTtl <= 0 && afterReadTtl <= 0) { + return null; + } + long fallback = storeConfig.getReadAheadCacheExpireDuration(); + long createMs = createTtl > 0 ? createTtl : fallback; + long afterReadMs = afterReadTtl > 0 ? afterReadTtl : fallback; + if (createMs <= 0 || afterReadMs <= 0) { + return null; + } + return new ReadAheadCacheTtl(createMs, afterReadMs); + } + static Cache buildLegacyCache(long expireMs, long memoryMaxSize, Ticker ticker) { return Caffeine.newBuilder() + .ticker(ticker) .scheduler(Scheduler.systemScheduler()) // Clients may repeatedly request messages at the same offset in tiered storage, // causing the request queue to become full. Using expire after read or write policy // to refresh the cache expiration time. - .expireAfterAccess(storeConfig.getReadAheadCacheExpireDuration(), TimeUnit.MILLISECONDS) + .expireAfterAccess(expireMs, TimeUnit.MILLISECONDS) .maximumWeight(memoryMaxSize) // Using the buffer size of messages to calculate memory usage .weigher((String key, SelectBufferResult buffer) -> buffer.getSize()) @@ -101,6 +138,57 @@ private Cache initCache(MessageStoreConfig storeConf .build(); } + static Cache buildDualTtlCache( + long createMs, long afterReadMs, long memoryMaxSize, Ticker ticker) { + return Caffeine.newBuilder() + .ticker(ticker) + .scheduler(Scheduler.systemScheduler()) + .expireAfter(new DualTtlExpiry(createMs, afterReadMs)) + .maximumWeight(memoryMaxSize) + .weigher((String key, SelectBufferResult buffer) -> buffer.getSize()) + .recordStats() + .build(); + } + + static final class ReadAheadCacheTtl { + final long createMs; + final long afterReadMs; + + ReadAheadCacheTtl(long createMs, long afterReadMs) { + this.createMs = createMs; + this.afterReadMs = afterReadMs; + } + + boolean isAfterReadNotShorter() { + return afterReadMs >= createMs; + } + } + + static final class DualTtlExpiry implements Expiry { + private final long createNanos; + private final long afterReadNanos; + + DualTtlExpiry(long createMs, long afterReadMs) { + this.createNanos = TimeUnit.MILLISECONDS.toNanos(createMs); + this.afterReadNanos = TimeUnit.MILLISECONDS.toNanos(afterReadMs); + } + + @Override + public long expireAfterCreate(String key, SelectBufferResult value, long currentTime) { + return createNanos; + } + + @Override + public long expireAfterUpdate(String key, SelectBufferResult value, long currentTime, long currentDuration) { + return createNanos; + } + + @Override + public long expireAfterRead(String key, SelectBufferResult value, long currentTime, long currentDuration) { + return afterReadNanos; + } + } + public Cache getFetcherCache() { return fetcherCache; } diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImplCacheTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImplCacheTest.java new file mode 100644 index 00000000000..5a89b1cf513 --- /dev/null +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImplCacheTest.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tieredstore.core; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Ticker; +import java.nio.ByteBuffer; +import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.tieredstore.MessageStoreConfig; +import org.apache.rocketmq.tieredstore.common.SelectBufferResult; +import org.junit.Assert; +import org.junit.Test; + +public class MessageStoreFetcherImplCacheTest { + + private static SelectBufferResult buffer(int size) { + return new SelectBufferResult(ByteBuffer.allocate(size), 0L, size, 0L); + } + + @Test + public void resolveReadAheadCacheTtl_defaultConfigUsesDualTtl() { + MessageStoreConfig config = new MessageStoreConfig(); + + MessageStoreFetcherImpl.ReadAheadCacheTtl ttl = MessageStoreFetcherImpl.resolveReadAheadCacheTtl(config); + + Assert.assertNotNull(ttl); + Assert.assertEquals(180_000L, ttl.createMs); + Assert.assertEquals(10_000L, ttl.afterReadMs); + } + + @Test + public void resolveReadAheadCacheTtl_bothDualTtlsDisabledUsesLegacyFallback() { + MessageStoreConfig config = new MessageStoreConfig(); + config.setReadAheadCacheCreateExpireDuration(0); + config.setReadAheadCacheAfterReadExpireDuration(0); + + Assert.assertNull(MessageStoreFetcherImpl.resolveReadAheadCacheTtl(config)); + } + + @Test + public void resolveReadAheadCacheTtl_oneDualTtlDisabledUsesLegacyDuration() { + MessageStoreConfig config = new MessageStoreConfig(); + config.setReadAheadCacheExpireDuration(15_000L); + config.setReadAheadCacheCreateExpireDuration(0); + config.setReadAheadCacheAfterReadExpireDuration(10_000L); + + MessageStoreFetcherImpl.ReadAheadCacheTtl ttl = MessageStoreFetcherImpl.resolveReadAheadCacheTtl(config); + + Assert.assertNotNull(ttl); + Assert.assertEquals(15_000L, ttl.createMs); + Assert.assertEquals(10_000L, ttl.afterReadMs); + } + + @Test + public void resolveReadAheadCacheTtl_invalidPartialFallbackUsesLegacyFallback() { + MessageStoreConfig config = new MessageStoreConfig(); + config.setReadAheadCacheExpireDuration(0); + config.setReadAheadCacheCreateExpireDuration(180_000L); + config.setReadAheadCacheAfterReadExpireDuration(0); + + Assert.assertNull(MessageStoreFetcherImpl.resolveReadAheadCacheTtl(config)); + } + + @Test + public void dualTtlCache_unreadEntryExpiresOnCreateTtl() { + FakeTicker ticker = new FakeTicker(); + Cache cache = + MessageStoreFetcherImpl.buildDualTtlCache(180_000L, 10_000L, 1L << 30, ticker); + + cache.put("key", buffer(100)); + ticker.advanceMs(180_001L); + cache.cleanUp(); + Assert.assertNull(cache.getIfPresent("key")); + } + + @Test + public void dualTtlExpiryUsesCreateTtlUntilRead() { + SelectBufferResult buffer = buffer(100); + MessageStoreFetcherImpl.DualTtlExpiry expiry = + new MessageStoreFetcherImpl.DualTtlExpiry(180_000L, 10_000L); + + Assert.assertEquals( + TimeUnit.MILLISECONDS.toNanos(180_000L), expiry.expireAfterCreate("key", buffer, 0L)); + Assert.assertEquals( + TimeUnit.MILLISECONDS.toNanos(180_000L), expiry.expireAfterUpdate("key", buffer, 0L, 1L)); + Assert.assertEquals( + TimeUnit.MILLISECONDS.toNanos(10_000L), expiry.expireAfterRead("key", buffer, 0L, 1L)); + } + + @Test + public void dualTtlCache_readEntryExpiresOnAfterReadTtl() { + FakeTicker ticker = new FakeTicker(); + Cache cache = + MessageStoreFetcherImpl.buildDualTtlCache(180_000L, 10_000L, 1L << 30, ticker); + + cache.put("key", buffer(100)); + ticker.advanceMs(1_000L); + Assert.assertNotNull(cache.getIfPresent("key")); + + ticker.advanceMs(15_000L); + cache.cleanUp(); + Assert.assertNull(cache.getIfPresent("key")); + } + + @Test + public void dualTtlCache_readThenRePutResetsToCreateTtl() { + FakeTicker ticker = new FakeTicker(); + Cache cache = + MessageStoreFetcherImpl.buildDualTtlCache(180_000L, 10_000L, 1L << 30, ticker); + + cache.put("key", buffer(100)); + ticker.advanceMs(1_000L); + Assert.assertNotNull(cache.getIfPresent("key")); + cache.put("key", buffer(100)); + + ticker.advanceMs(60_000L); + cache.cleanUp(); + Assert.assertNotNull(cache.getIfPresent("key")); + } + + @Test + public void readAheadCacheTtl_afterReadNotShorterIsValidAndNotRewritten() { + MessageStoreConfig config = new MessageStoreConfig(); + config.setReadAheadCacheCreateExpireDuration(100L); + config.setReadAheadCacheAfterReadExpireDuration(200L); + + MessageStoreFetcherImpl.ReadAheadCacheTtl ttl = MessageStoreFetcherImpl.resolveReadAheadCacheTtl(config); + + Assert.assertNotNull(ttl); + Assert.assertEquals(100L, ttl.createMs); + Assert.assertEquals(200L, ttl.afterReadMs); + Assert.assertTrue(ttl.isAfterReadNotShorter()); + } + + private static class FakeTicker implements Ticker { + private long nanos; + + @Override + public long read() { + return nanos; + } + + private void advanceMs(long ms) { + nanos += TimeUnit.MILLISECONDS.toNanos(ms); + } + } +}