Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion tieredstore/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ The following are some core configurations, for more details, see [TieredMessage
| tieredStoreGroupCommitCount | 2500 | | The number of messages that trigger one batch transfer |
| tieredStoreGroupCommitSize | 33554432 | byte | The size of messages that trigger one batch transfer, 32M by default |
| tieredStoreMaxGroupCommitCount | 10000 | | The maximum number of messages waiting to be transferred per queue |
| readAheadCacheExpireDuration | 1000 | millisecond | Read-ahead cache expiration time |
| readAheadCacheExpireDuration | 15000 | millisecond | Legacy fallback expiration time for read-ahead cache entries |
| readAheadCacheCreateExpireDuration | 180000 | millisecond | Read-ahead cache expiration time after entry creation |
| readAheadCacheAfterReadExpireDuration | 10000 | millisecond | Read-ahead cache expiration time after entry read |
| readAheadCacheSizeThresholdRate | 0.3 | | The maximum heap space occupied by the read-ahead cache |

## Metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,11 @@ public boolean check(TieredStorageLevel targetLevel) {
private boolean readAheadCacheEnable = true;
private int readAheadMessageCountThreshold = 4096;
private int readAheadMessageSizeThreshold = 16 * 1024 * 1024;
// Legacy single-TTL fallback duration for read-ahead cache entries. The primary
// policy uses create/after-read TTLs below.
private long readAheadCacheExpireDuration = 15 * 1000;
private long readAheadCacheCreateExpireDuration = Duration.ofMinutes(3).toMillis();
private long readAheadCacheAfterReadExpireDuration = Duration.ofSeconds(10).toMillis();
private double readAheadCacheSizeThresholdRate = 0.3;

private int tieredStoreMaxPendingLimit = 10000;
Expand Down Expand Up @@ -356,6 +360,22 @@ public void setReadAheadCacheExpireDuration(long duration) {
this.readAheadCacheExpireDuration = duration;
}

public long getReadAheadCacheCreateExpireDuration() {
return readAheadCacheCreateExpireDuration;
}

public void setReadAheadCacheCreateExpireDuration(long readAheadCacheCreateExpireDuration) {
this.readAheadCacheCreateExpireDuration = readAheadCacheCreateExpireDuration;
}

public long getReadAheadCacheAfterReadExpireDuration() {
return readAheadCacheAfterReadExpireDuration;
}

public void setReadAheadCacheAfterReadExpireDuration(long readAheadCacheAfterReadExpireDuration) {
this.readAheadCacheAfterReadExpireDuration = readAheadCacheAfterReadExpireDuration;
}

public double getReadAheadCacheSizeThresholdRate() {
return readAheadCacheSizeThresholdRate;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public TieredMessageStore(MessageStorePluginContext context, MessageStore next)
this.flatFileStore = new FlatFileStore(this.storeConfig, this.metadataStore, this.storeExecutor);
this.indexService = new IndexStoreService(this.flatFileStore.getFlatFileFactory(),
MessageStoreUtil.getIndexFilePath(this.storeConfig.getBrokerName()));
this.fetcher = new MessageStoreFetcherImpl(this);
this.fetcher = createFetcher(this.storeConfig, this.flatFileStore, this.indexService);
this.dispatcher = new MessageStoreDispatcherImpl(this);
next.addDispatcher(dispatcher);
}
Expand Down Expand Up @@ -158,6 +158,18 @@ public FlatFileStore getFlatFileStore() {
return flatFileStore;
}

/**
* Build the fetcher used by this message store. Called from the constructor via
* virtual dispatch, so subclass overrides MUST NOT read {@link TieredMessageStore}
* instance fields beyond the parameters supplied here — other fields may not yet
* be initialized at the time this method runs. The supplied {@code storeConfig},
* {@code flatFileStore}, and {@code indexService} are guaranteed initialized.
*/
protected MessageStoreFetcher createFetcher(MessageStoreConfig storeConfig,
FlatFileStore flatFileStore, IndexService indexService) {
return new MessageStoreFetcherImpl(this, storeConfig, flatFileStore, indexService);
}

public IndexService getIndexService() {
return indexService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Expiry;
import com.github.benmanes.caffeine.cache.Scheduler;
import com.github.benmanes.caffeine.cache.Ticker;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -62,7 +64,7 @@ public class MessageStoreFetcherImpl implements MessageStoreFetcher {
private final IndexService indexService;
private final FlatFileStore flatFileStore;
private final MessageStoreFilter topicFilter;
private final long memoryMaxSize;
protected final long memoryMaxSize;
private final Cache<String /* topic@queueId@offset */, SelectBufferResult> fetcherCache;

public MessageStoreFetcherImpl(TieredMessageStore messageStore) {
Expand All @@ -86,21 +88,107 @@ public MessageStoreFetcherImpl(TieredMessageStore messageStore, MessageStoreConf
log.info("MessageStoreFetcher init success, brokerName={}", storeConfig.getBrokerName());
}

private Cache<String, SelectBufferResult> initCache(MessageStoreConfig storeConfig) {
/**
* Build the read-ahead cache. Called from the constructor via virtual dispatch
* (see {@link #MessageStoreFetcherImpl(TieredMessageStore, MessageStoreConfig, FlatFileStore, IndexService)}),
* so subclass overrides MUST NOT read subclass instance fields — those are not yet
* initialized. The {@code storeConfig} parameter and the parent class field
* {@link #memoryMaxSize} are safe to use because they are set before this method runs.
*/
protected Cache<String, SelectBufferResult> initCache(MessageStoreConfig storeConfig) {

ReadAheadCacheTtl ttl = resolveReadAheadCacheTtl(storeConfig);
if (ttl == null) {
return buildLegacyCache(storeConfig.getReadAheadCacheExpireDuration(), memoryMaxSize, Ticker.systemTicker());
}
if (ttl.isAfterReadNotShorter()) {
log.warn("MessageStoreFetcher read-ahead cache afterReadTtl ({}ms) >= createTtl ({}ms); "
+ "reads will extend rather than shorten entry lifetime", ttl.afterReadMs, ttl.createMs);
}
return buildDualTtlCache(ttl.createMs, ttl.afterReadMs, memoryMaxSize, Ticker.systemTicker());
}

static ReadAheadCacheTtl resolveReadAheadCacheTtl(MessageStoreConfig storeConfig) {
long createTtl = storeConfig.getReadAheadCacheCreateExpireDuration();
long afterReadTtl = storeConfig.getReadAheadCacheAfterReadExpireDuration();
if (createTtl <= 0 && afterReadTtl <= 0) {
return null;
}
long fallback = storeConfig.getReadAheadCacheExpireDuration();
long createMs = createTtl > 0 ? createTtl : fallback;
long afterReadMs = afterReadTtl > 0 ? afterReadTtl : fallback;
if (createMs <= 0 || afterReadMs <= 0) {
return null;
}
return new ReadAheadCacheTtl(createMs, afterReadMs);
}

static Cache<String, SelectBufferResult> buildLegacyCache(long expireMs, long memoryMaxSize, Ticker ticker) {
return Caffeine.newBuilder()
.ticker(ticker)
.scheduler(Scheduler.systemScheduler())
// Clients may repeatedly request messages at the same offset in tiered storage,
// causing the request queue to become full. Using expire after read or write policy
// to refresh the cache expiration time.
.expireAfterAccess(storeConfig.getReadAheadCacheExpireDuration(), TimeUnit.MILLISECONDS)
.expireAfterAccess(expireMs, TimeUnit.MILLISECONDS)
.maximumWeight(memoryMaxSize)
// Using the buffer size of messages to calculate memory usage
.weigher((String key, SelectBufferResult buffer) -> buffer.getSize())
.recordStats()
.build();
}

static Cache<String, SelectBufferResult> buildDualTtlCache(
long createMs, long afterReadMs, long memoryMaxSize, Ticker ticker) {
return Caffeine.newBuilder()
.ticker(ticker)
.scheduler(Scheduler.systemScheduler())
.expireAfter(new DualTtlExpiry(createMs, afterReadMs))
.maximumWeight(memoryMaxSize)
.weigher((String key, SelectBufferResult buffer) -> buffer.getSize())
.recordStats()
.build();
}

static final class ReadAheadCacheTtl {
final long createMs;
final long afterReadMs;

ReadAheadCacheTtl(long createMs, long afterReadMs) {
this.createMs = createMs;
this.afterReadMs = afterReadMs;
}

boolean isAfterReadNotShorter() {
return afterReadMs >= createMs;
}
}

static final class DualTtlExpiry implements Expiry<String, SelectBufferResult> {
private final long createNanos;
private final long afterReadNanos;

DualTtlExpiry(long createMs, long afterReadMs) {
this.createNanos = TimeUnit.MILLISECONDS.toNanos(createMs);
this.afterReadNanos = TimeUnit.MILLISECONDS.toNanos(afterReadMs);
}

@Override
public long expireAfterCreate(String key, SelectBufferResult value, long currentTime) {
return createNanos;
}

@Override
public long expireAfterUpdate(String key, SelectBufferResult value, long currentTime, long currentDuration) {
return createNanos;
}

@Override
public long expireAfterRead(String key, SelectBufferResult value, long currentTime, long currentDuration) {
return afterReadNanos;
}
}

public Cache<String, SelectBufferResult> getFetcherCache() {
return fetcherCache;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tieredstore.core;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Ticker;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.tieredstore.MessageStoreConfig;
import org.apache.rocketmq.tieredstore.common.SelectBufferResult;
import org.junit.Assert;
import org.junit.Test;

public class MessageStoreFetcherImplCacheTest {

private static SelectBufferResult buffer(int size) {
return new SelectBufferResult(ByteBuffer.allocate(size), 0L, size, 0L);
}

@Test
public void resolveReadAheadCacheTtl_defaultConfigUsesDualTtl() {
MessageStoreConfig config = new MessageStoreConfig();

MessageStoreFetcherImpl.ReadAheadCacheTtl ttl = MessageStoreFetcherImpl.resolveReadAheadCacheTtl(config);

Assert.assertNotNull(ttl);
Assert.assertEquals(180_000L, ttl.createMs);
Assert.assertEquals(10_000L, ttl.afterReadMs);
}

@Test
public void resolveReadAheadCacheTtl_bothDualTtlsDisabledUsesLegacyFallback() {
MessageStoreConfig config = new MessageStoreConfig();
config.setReadAheadCacheCreateExpireDuration(0);
config.setReadAheadCacheAfterReadExpireDuration(0);

Assert.assertNull(MessageStoreFetcherImpl.resolveReadAheadCacheTtl(config));
}

@Test
public void resolveReadAheadCacheTtl_oneDualTtlDisabledUsesLegacyDuration() {
MessageStoreConfig config = new MessageStoreConfig();
config.setReadAheadCacheExpireDuration(15_000L);
config.setReadAheadCacheCreateExpireDuration(0);
config.setReadAheadCacheAfterReadExpireDuration(10_000L);

MessageStoreFetcherImpl.ReadAheadCacheTtl ttl = MessageStoreFetcherImpl.resolveReadAheadCacheTtl(config);

Assert.assertNotNull(ttl);
Assert.assertEquals(15_000L, ttl.createMs);
Assert.assertEquals(10_000L, ttl.afterReadMs);
}

@Test
public void resolveReadAheadCacheTtl_invalidPartialFallbackUsesLegacyFallback() {
MessageStoreConfig config = new MessageStoreConfig();
config.setReadAheadCacheExpireDuration(0);
config.setReadAheadCacheCreateExpireDuration(180_000L);
config.setReadAheadCacheAfterReadExpireDuration(0);

Assert.assertNull(MessageStoreFetcherImpl.resolveReadAheadCacheTtl(config));
}

@Test
public void dualTtlCache_unreadEntryExpiresOnCreateTtl() {
FakeTicker ticker = new FakeTicker();
Cache<String, SelectBufferResult> cache =
MessageStoreFetcherImpl.buildDualTtlCache(180_000L, 10_000L, 1L << 30, ticker);

cache.put("key", buffer(100));
ticker.advanceMs(180_001L);
cache.cleanUp();
Assert.assertNull(cache.getIfPresent("key"));
}

@Test
public void dualTtlExpiryUsesCreateTtlUntilRead() {
SelectBufferResult buffer = buffer(100);
MessageStoreFetcherImpl.DualTtlExpiry expiry =
new MessageStoreFetcherImpl.DualTtlExpiry(180_000L, 10_000L);

Assert.assertEquals(
TimeUnit.MILLISECONDS.toNanos(180_000L), expiry.expireAfterCreate("key", buffer, 0L));
Assert.assertEquals(
TimeUnit.MILLISECONDS.toNanos(180_000L), expiry.expireAfterUpdate("key", buffer, 0L, 1L));
Assert.assertEquals(
TimeUnit.MILLISECONDS.toNanos(10_000L), expiry.expireAfterRead("key", buffer, 0L, 1L));
}

@Test
public void dualTtlCache_readEntryExpiresOnAfterReadTtl() {
FakeTicker ticker = new FakeTicker();
Cache<String, SelectBufferResult> cache =
MessageStoreFetcherImpl.buildDualTtlCache(180_000L, 10_000L, 1L << 30, ticker);

cache.put("key", buffer(100));
ticker.advanceMs(1_000L);
Assert.assertNotNull(cache.getIfPresent("key"));

ticker.advanceMs(15_000L);
cache.cleanUp();
Assert.assertNull(cache.getIfPresent("key"));
}

@Test
public void dualTtlCache_readThenRePutResetsToCreateTtl() {
FakeTicker ticker = new FakeTicker();
Cache<String, SelectBufferResult> cache =
MessageStoreFetcherImpl.buildDualTtlCache(180_000L, 10_000L, 1L << 30, ticker);

cache.put("key", buffer(100));
ticker.advanceMs(1_000L);
Assert.assertNotNull(cache.getIfPresent("key"));
cache.put("key", buffer(100));

ticker.advanceMs(60_000L);
cache.cleanUp();
Assert.assertNotNull(cache.getIfPresent("key"));
}

@Test
public void readAheadCacheTtl_afterReadNotShorterIsValidAndNotRewritten() {
MessageStoreConfig config = new MessageStoreConfig();
config.setReadAheadCacheCreateExpireDuration(100L);
config.setReadAheadCacheAfterReadExpireDuration(200L);

MessageStoreFetcherImpl.ReadAheadCacheTtl ttl = MessageStoreFetcherImpl.resolveReadAheadCacheTtl(config);

Assert.assertNotNull(ttl);
Assert.assertEquals(100L, ttl.createMs);
Assert.assertEquals(200L, ttl.afterReadMs);
Assert.assertTrue(ttl.isAfterReadNotShorter());
}

private static class FakeTicker implements Ticker {
private long nanos;

@Override
public long read() {
return nanos;
}

private void advanceMs(long ms) {
nanos += TimeUnit.MILLISECONDS.toNanos(ms);
}
}
}
Loading