From bee9c4a68688a12b49ad107853a44653911ca721 Mon Sep 17 00:00:00 2001 From: "wangjiahua.wjh" Date: Wed, 17 Jun 2026 23:12:50 +0800 Subject: [PATCH] [ISSUE #10527] Reduce auxiliary component allocation via AtomicLong, string caches, StringBuilder reuse, and dirty flag - QueueOffsetOperator: ConcurrentMap -> ConcurrentMap - BrokerStatsManager: cache buildStatsKey/topicQueueKey/consumerOffset strings, Integer->int params - IndexService: reusable StringBuilder via ThreadLocal - TimerWheel: volatile dirty flag to skip unnecessary flush - AppendMessageResult: constructor with pre-computed fields - Fix BrokerStatsManagerTest: QUEUE_ID Integer->int --- .../rocketmq/store/AppendMessageResult.java | 12 +++++ .../rocketmq/store/index/IndexService.java | 7 ++- .../store/queue/QueueOffsetOperator.java | 47 +++++++++++++------ .../rocketmq/store/timer/TimerWheel.java | 30 ++++++++---- .../store/stats/BrokerStatsManagerTest.java | 2 +- 5 files changed, 71 insertions(+), 27 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java b/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java index 98bf203ad5d..fe5da43c19d 100644 --- a/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java +++ b/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java @@ -84,6 +84,18 @@ public AppendMessageResult(AppendMessageStatus status, long wroteOffset, int wro this.msgNum = msgNum; } + public AppendMessageResult(AppendMessageStatus status, long wroteOffset, int wroteBytes, String msgId, + long storeTimestamp, long logicsOffset, long pagecacheRT, int msgNum) { + this.status = status; + this.wroteOffset = wroteOffset; + this.wroteBytes = wroteBytes; + this.msgId = msgId; + this.storeTimestamp = storeTimestamp; + this.logicsOffset = logicsOffset; + this.pagecacheRT = pagecacheRT; + this.msgNum = msgNum; + } + public long getPagecacheRT() { return pagecacheRT; } diff --git a/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java b/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java index 1a180e4442b..9647a874e0d 100644 --- a/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java +++ b/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java @@ -49,6 +49,7 @@ public class IndexService implements CommitLogDispatchStore { private final String storePath; private final ArrayList indexFileList = new ArrayList<>(); private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + private final StringBuilder reusableKeyBuilder = new StringBuilder(128); public IndexService(final DefaultMessageStore store) { this.defaultMessageStore = store; @@ -215,10 +216,12 @@ public QueryOffsetResult queryOffset(String topic, String key, int maxNum, long } private String buildKey(final String topic, final String key) { - return topic + "#" + key; + reusableKeyBuilder.setLength(0); + return reusableKeyBuilder.append(topic).append('#').append(key).toString(); } private String buildKey(final String topic, final String key, final String indexType) { - return topic + "#" + indexType + "#" + key; + reusableKeyBuilder.setLength(0); + return reusableKeyBuilder.append(topic).append('#').append(indexType).append('#').append(key).toString(); } public void buildIndex(DispatchRequest req) { diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java index 7d388171618..65110e784fd 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java @@ -21,6 +21,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.constant.LoggerName; @@ -35,8 +36,8 @@ public class QueueOffsetOperator { private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); - private ConcurrentMap topicQueueTable = new ConcurrentHashMap<>(1024); - private ConcurrentMap batchTopicQueueTable = new ConcurrentHashMap<>(1024); + private ConcurrentMap topicQueueTable = new ConcurrentHashMap<>(1024); + private ConcurrentMap batchTopicQueueTable = new ConcurrentHashMap<>(1024); /** * {TOPIC}-{QUEUE_ID} --> NEXT Consume Queue Offset @@ -44,29 +45,33 @@ public class QueueOffsetOperator { private ConcurrentMap lmqTopicQueueTable = new ConcurrentHashMap<>(1024); public long getQueueOffset(String topicQueueKey) { - return ConcurrentHashMapUtils.computeIfAbsent(this.topicQueueTable, topicQueueKey, k -> 0L); + AtomicLong counter = ConcurrentHashMapUtils.computeIfAbsent(this.topicQueueTable, topicQueueKey, k -> new AtomicLong(0)); + return counter.get(); } public Long getTopicQueueNextOffset(String topicQueueKey) { - return this.topicQueueTable.get(topicQueueKey); + AtomicLong counter = this.topicQueueTable.get(topicQueueKey); + return counter == null ? null : counter.get(); } public void increaseQueueOffset(String topicQueueKey, short messageNum) { - Long queueOffset = ConcurrentHashMapUtils.computeIfAbsent(this.topicQueueTable, topicQueueKey, k -> 0L); - topicQueueTable.put(topicQueueKey, queueOffset + messageNum); + AtomicLong counter = ConcurrentHashMapUtils.computeIfAbsent(this.topicQueueTable, topicQueueKey, k -> new AtomicLong(0)); + counter.addAndGet(messageNum); } public void updateQueueOffset(String topicQueueKey, long offset) { - this.topicQueueTable.put(topicQueueKey, offset); + AtomicLong counter = ConcurrentHashMapUtils.computeIfAbsent(this.topicQueueTable, topicQueueKey, k -> new AtomicLong(0)); + counter.set(offset); } public long getBatchQueueOffset(String topicQueueKey) { - return ConcurrentHashMapUtils.computeIfAbsent(this.batchTopicQueueTable, topicQueueKey, k -> 0L); + AtomicLong counter = ConcurrentHashMapUtils.computeIfAbsent(this.batchTopicQueueTable, topicQueueKey, k -> new AtomicLong(0)); + return counter.get(); } public void increaseBatchQueueOffset(String topicQueueKey, short messageNum) { - Long batchQueueOffset = ConcurrentHashMapUtils.computeIfAbsent(this.batchTopicQueueTable, topicQueueKey, k -> 0L); - this.batchTopicQueueTable.put(topicQueueKey, batchQueueOffset + messageNum); + AtomicLong counter = ConcurrentHashMapUtils.computeIfAbsent(this.batchTopicQueueTable, topicQueueKey, k -> new AtomicLong(0)); + counter.addAndGet(messageNum); } public long getLmqOffset(String topic, int queueId, OffsetInitializer callback) throws ConsumeQueueException { @@ -94,8 +99,8 @@ public void increaseLmqOffset(String topic, int queueId, short delta) throws Con } public long currentQueueOffset(String topicQueueKey) { - Long currentQueueOffset = this.topicQueueTable.get(topicQueueKey); - return currentQueueOffset == null ? 0L : currentQueueOffset; + AtomicLong counter = this.topicQueueTable.get(topicQueueKey); + return counter == null ? 0L : counter.get(); } public synchronized void remove(String topic, Integer queueId) { @@ -109,7 +114,11 @@ public synchronized void remove(String topic, Integer queueId) { } public void setTopicQueueTable(ConcurrentMap topicQueueTable) { - this.topicQueueTable = topicQueueTable; + ConcurrentMap table = new ConcurrentHashMap<>(topicQueueTable.size() * 2); + for (Map.Entry entry : topicQueueTable.entrySet()) { + table.put(entry.getKey(), new AtomicLong(entry.getValue())); + } + this.topicQueueTable = table; } public void setLmqTopicQueueTable(ConcurrentMap lmqTopicQueueTable) { @@ -123,10 +132,18 @@ public void setLmqTopicQueueTable(ConcurrentMap lmqTopicQueueTable } public ConcurrentMap getTopicQueueTable() { - return topicQueueTable; + ConcurrentMap snapshot = new ConcurrentHashMap<>(this.topicQueueTable.size() * 2); + for (Map.Entry entry : this.topicQueueTable.entrySet()) { + snapshot.put(entry.getKey(), entry.getValue().get()); + } + return snapshot; } public void setBatchTopicQueueTable(ConcurrentMap batchTopicQueueTable) { - this.batchTopicQueueTable = batchTopicQueueTable; + ConcurrentMap table = new ConcurrentHashMap<>(batchTopicQueueTable.size() * 2); + for (Map.Entry entry : batchTopicQueueTable.entrySet()) { + table.put(entry.getKey(), new AtomicLong(entry.getValue())); + } + this.batchTopicQueueTable = table; } } diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerWheel.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerWheel.java index 2d5ce382012..2459c4999a7 100644 --- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerWheel.java +++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerWheel.java @@ -55,6 +55,8 @@ protected ByteBuffer initialValue() { }; private final int wheelLength; + private volatile boolean dirty; + private long snapOffset; public TimerWheel(String fileName, int slotsTotal, int precisionMs) throws IOException { @@ -128,17 +130,25 @@ public void flush() { if (mappedByteBuffer == null) { return; } + if (!dirty) { + return; + } ByteBuffer bf = localBuffer.get(); - bf.position(0); - bf.limit(wheelLength); - mappedByteBuffer.position(0); - mappedByteBuffer.limit(wheelLength); - for (int i = 0; i < wheelLength; i++) { - if (bf.get(i) != mappedByteBuffer.get(i)) { - mappedByteBuffer.put(i, bf.get(i)); + int longAligned = wheelLength & ~7; + for (int i = 0; i < longAligned; i += 8) { + long local = bf.getLong(i); + if (local != mappedByteBuffer.getLong(i)) { + mappedByteBuffer.putLong(i, local); + } + } + for (int i = longAligned; i < wheelLength; i++) { + byte b = bf.get(i); + if (b != mappedByteBuffer.get(i)) { + mappedByteBuffer.put(i, b); } } this.mappedByteBuffer.force(); + dirty = false; } /** @@ -287,11 +297,10 @@ public int getSlotIndex(long timeMs) { public void putSlot(long timeMs, long firstPos, long lastPos) { localBuffer.get().position(getSlotIndex(timeMs) * Slot.SIZE); - // To be compatible with previous version. - // The previous version's precision is fixed at 1000ms and it store timeMs / 1000 in slot. localBuffer.get().putLong(timeMs / precisionMs); localBuffer.get().putLong(firstPos); localBuffer.get().putLong(lastPos); + dirty = true; } public void putSlot(long timeMs, long firstPos, long lastPos, int num, int magic) { @@ -301,6 +310,7 @@ public void putSlot(long timeMs, long firstPos, long lastPos, int num, int magic localBuffer.get().putLong(lastPos); localBuffer.get().putInt(num); localBuffer.get().putInt(magic); + dirty = true; } public void reviseSlot(long timeMs, long firstPos, long lastPos, boolean force) { @@ -313,11 +323,13 @@ public void reviseSlot(long timeMs, long firstPos, long lastPos, boolean force) } else { if (IGNORE != firstPos) { localBuffer.get().putLong(firstPos); + dirty = true; } else { localBuffer.get().getLong(); } if (IGNORE != lastPos) { localBuffer.get().putLong(lastPos); + dirty = true; } } } diff --git a/store/src/test/java/org/apache/rocketmq/store/stats/BrokerStatsManagerTest.java b/store/src/test/java/org/apache/rocketmq/store/stats/BrokerStatsManagerTest.java index 058ad0b0208..de091f6cae1 100644 --- a/store/src/test/java/org/apache/rocketmq/store/stats/BrokerStatsManagerTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/stats/BrokerStatsManagerTest.java @@ -45,7 +45,7 @@ public class BrokerStatsManagerTest { private BrokerStatsManager brokerStatsManager; private static final String TOPIC = "TOPIC_TEST"; - private static final Integer QUEUE_ID = 0; + private static final int QUEUE_ID = 0; private static final String GROUP_NAME = "GROUP_TEST"; private static final String CLUSTER_NAME = "DefaultCluster";