From f9108cb5d6574c0701b5bda9277e47af243cadf4 Mon Sep 17 00:00:00 2001 From: liuhy Date: Thu, 2 Jul 2026 21:52:28 +0800 Subject: [PATCH 1/5] Prevent stale static topic mappings from epoch comparator overflow Static topic routing and mapping validation sorted epochs by subtracting long values and casting the result to int. Large epoch gaps can overflow the comparator result and allow stale mapping metadata to be processed before newer metadata. Constraint: Preserve existing ordering semantics: higher epoch first Rejected: Keep subtraction comparator | unsafe for long epoch deltas greater than Integer.MAX_VALUE Confidence: high Scope-risk: narrow Tested: mvn -q -pl remoting -DskipTests=false -Dtest=ClientMetadataTest,TopicQueueMappingUtilsTest -Djacoco.skip=true test Tested: mvn -q -pl remoting -DskipTests compile -Dspotbugs.skip=true -Dcheckstyle.skip=true Tested: mvn -q -DskipTests compile -Dspotbugs.skip=true -Dcheckstyle.skip=true Tested: git diff --check Related: #10580 --- .../statictopic/TopicQueueMappingUtils.java | 2 +- .../rocketmq/remoting/rpc/ClientMetadata.java | 2 +- .../TopicQueueMappingUtilsTest.java | 26 +++++++++++++++ .../remoting/rpc/ClientMetadataTest.java | 32 +++++++++++++++++++ 4 files changed, 60 insertions(+), 2 deletions(-) diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/statictopic/TopicQueueMappingUtils.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/statictopic/TopicQueueMappingUtils.java index 647669fde24..7fbc2b3d906 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/statictopic/TopicQueueMappingUtils.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/statictopic/TopicQueueMappingUtils.java @@ -338,7 +338,7 @@ public static void checkPhysicalQueueConsistence(Map checkAndBuildMappingItems(List mappingDetailList, boolean replace, boolean checkConsistence) { - mappingDetailList.sort((o1, o2) -> (int) (o2.getEpoch() - o1.getEpoch())); + mappingDetailList.sort((o1, o2) -> Long.compare(o2.getEpoch(), o1.getEpoch())); int maxNum = 0; Map globalIdMap = new HashMap<>(); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/rpc/ClientMetadata.java b/remoting/src/main/java/org/apache/rocketmq/remoting/rpc/ClientMetadata.java index d4962e00a58..61f0b3388d1 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/rpc/ClientMetadata.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/rpc/ClientMetadata.java @@ -119,7 +119,7 @@ public static ConcurrentMap topicRouteData2EndpointsForSta Map topicQueueMappingInfoMap = mapEntry.getValue(); ConcurrentMap mqEndPoints = new ConcurrentHashMap<>(); List> mappingInfos = new ArrayList<>(topicQueueMappingInfoMap.entrySet()); - mappingInfos.sort((o1, o2) -> (int) (o2.getValue().getEpoch() - o1.getValue().getEpoch())); + mappingInfos.sort((o1, o2) -> Long.compare(o2.getValue().getEpoch(), o1.getValue().getEpoch())); int maxTotalNums = 0; long maxTotalNumOfEpoch = -1; for (Map.Entry entry : mappingInfos) { diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/statictopic/TopicQueueMappingUtilsTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/statictopic/TopicQueueMappingUtilsTest.java index a12c9f89200..ca72da24ed9 100644 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/statictopic/TopicQueueMappingUtilsTest.java +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/statictopic/TopicQueueMappingUtilsTest.java @@ -81,6 +81,32 @@ private void testIdToBroker(Map idToBroker, Map mappingDetails = new ArrayList<>(); + mappingDetails.add(buildMappingDetail(topic, oldBroker, oldEpoch)); + mappingDetails.add(buildMappingDetail(topic, newBroker, newEpoch)); + + Map globalIdMap = + TopicQueueMappingUtils.checkAndBuildMappingItems(mappingDetails, true, false); + + TopicQueueMappingOne mappingOne = globalIdMap.get(0); + Assert.assertEquals(newEpoch, mappingOne.getMappingDetail().getEpoch()); + Assert.assertEquals(newBroker, mappingOne.getBname()); + } + + private TopicQueueMappingDetail buildMappingDetail(String topic, String brokerName, long epoch) { + TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail(topic, 1, brokerName, epoch); + mappingDetail.getHostedQueues().put(0, new ArrayList<>(Collections.singletonList( + new LogicQueueMappingItem(0, 0, brokerName, 0, 0, -1, -1, -1)))); + return mappingDetail; + } + @Test public void testAllocator() { //stability diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/rpc/ClientMetadataTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/rpc/ClientMetadataTest.java index a9f38854586..b4e8250c619 100644 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/rpc/ClientMetadataTest.java +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/rpc/ClientMetadataTest.java @@ -20,6 +20,7 @@ import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingInfo; +import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingUtils; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -120,4 +121,35 @@ public void testTopicRouteData2EndpointsForStaticTopic() { ConcurrentMap actual = ClientMetadata.topicRouteData2EndpointsForStaticTopic(defaultTopic, topicRouteData); assertEquals(1, actual.size()); } + + @Test + public void testTopicRouteData2EndpointsForStaticTopicUsesLatestEpoch() { + String scope = "scope"; + String oldBroker = "oldBroker"; + String newBroker = "newBroker"; + long oldEpoch = 0L; + long newEpoch = (long) Integer.MAX_VALUE + 1L; + TopicRouteData topicRouteData = new TopicRouteData(); + Map mappingInfos = new HashMap<>(); + mappingInfos.put(oldBroker, buildMappingInfo(scope, oldBroker, oldEpoch)); + mappingInfos.put(newBroker, buildMappingInfo(scope, newBroker, newEpoch)); + topicRouteData.setTopicQueueMappingByBroker(mappingInfos); + + ConcurrentMap actual = + ClientMetadata.topicRouteData2EndpointsForStaticTopic(defaultTopic, topicRouteData); + + MessageQueue mq = new MessageQueue(defaultTopic, TopicQueueMappingUtils.getMockBrokerName(scope), 0); + assertEquals(newBroker, actual.get(mq)); + } + + private TopicQueueMappingInfo buildMappingInfo(String scope, String brokerName, long epoch) { + TopicQueueMappingInfo info = new TopicQueueMappingInfo(); + info.setScope(scope); + info.setCurrIdMap(new ConcurrentHashMap<>()); + info.getCurrIdMap().put(0, 0); + info.setTotalQueues(1); + info.setBname(brokerName); + info.setEpoch(epoch); + return info; + } } From 1efb20caafb8d95fce10d671ccdbe9a78e44fc57 Mon Sep 17 00:00:00 2001 From: liuhy Date: Thu, 2 Jul 2026 22:22:36 +0800 Subject: [PATCH 2/5] Stabilize static topic epoch routing regression test The ClientMetadata regression test now uses LinkedHashMap so the old epoch mapping is processed before the new epoch mapping deterministically. This makes the overflow scenario reproducible under the previous subtraction comparator instead of depending on HashMap iteration order. Constraint: Address PR review feedback without changing production semantics Rejected: Keep HashMap in regression setup | iteration order is unspecified and can hide the old bug Confidence: high Scope-risk: narrow Tested: mvn -q -pl remoting -DskipTests=false -Dtest=ClientMetadataTest,TopicQueueMappingUtilsTest -Djacoco.skip=true test Tested: mvn -q -pl remoting -DskipTests compile -Dspotbugs.skip=true -Dcheckstyle.skip=true Tested: git diff --check Related: #10580 --- .../org/apache/rocketmq/remoting/rpc/ClientMetadataTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/rpc/ClientMetadataTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/rpc/ClientMetadataTest.java index b4e8250c619..10daf604c48 100644 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/rpc/ClientMetadataTest.java +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/rpc/ClientMetadataTest.java @@ -27,6 +27,7 @@ import org.mockito.junit.MockitoJUnitRunner; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -130,7 +131,7 @@ public void testTopicRouteData2EndpointsForStaticTopicUsesLatestEpoch() { long oldEpoch = 0L; long newEpoch = (long) Integer.MAX_VALUE + 1L; TopicRouteData topicRouteData = new TopicRouteData(); - Map mappingInfos = new HashMap<>(); + Map mappingInfos = new LinkedHashMap<>(); mappingInfos.put(oldBroker, buildMappingInfo(scope, oldBroker, oldEpoch)); mappingInfos.put(newBroker, buildMappingInfo(scope, newBroker, newEpoch)); topicRouteData.setTopicQueueMappingByBroker(mappingInfos); From f5ed2c9ddd4df13ddbb6cd57e2f8bdee61530ba0 Mon Sep 17 00:00:00 2001 From: liuhy Date: Thu, 2 Jul 2026 22:26:12 +0800 Subject: [PATCH 3/5] Clarify static topic epoch comparator ordering Use Comparator.comparingLong(...).reversed() for static topic epoch sorting so the higher-epoch-first order is explicit while keeping the overflow-safe comparison introduced for #10580. Constraint: Preserve the current higher-epoch-first semantics Rejected: Keep argument-swapped Long.compare | correct but less self-descriptive per PR review Confidence: high Scope-risk: narrow Tested: mvn -q -pl remoting -DskipTests=false -Dtest=ClientMetadataTest,TopicQueueMappingUtilsTest -Djacoco.skip=true test Tested: mvn -q -pl remoting -DskipTests compile -Dspotbugs.skip=true -Dcheckstyle.skip=true Tested: git diff --check Related: #10580 --- .../remoting/protocol/statictopic/TopicQueueMappingUtils.java | 3 ++- .../java/org/apache/rocketmq/remoting/rpc/ClientMetadata.java | 4 +++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/statictopic/TopicQueueMappingUtils.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/statictopic/TopicQueueMappingUtils.java index 7fbc2b3d906..ea902d9baa0 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/statictopic/TopicQueueMappingUtils.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/statictopic/TopicQueueMappingUtils.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -338,7 +339,7 @@ public static void checkPhysicalQueueConsistence(Map checkAndBuildMappingItems(List mappingDetailList, boolean replace, boolean checkConsistence) { - mappingDetailList.sort((o1, o2) -> Long.compare(o2.getEpoch(), o1.getEpoch())); + mappingDetailList.sort(Comparator.comparingLong(TopicQueueMappingDetail::getEpoch).reversed()); int maxNum = 0; Map globalIdMap = new HashMap<>(); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/rpc/ClientMetadata.java b/remoting/src/main/java/org/apache/rocketmq/remoting/rpc/ClientMetadata.java index 61f0b3388d1..f3010ac5cf5 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/rpc/ClientMetadata.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/rpc/ClientMetadata.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.remoting.rpc; import java.util.ArrayList; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -119,7 +120,8 @@ public static ConcurrentMap topicRouteData2EndpointsForSta Map topicQueueMappingInfoMap = mapEntry.getValue(); ConcurrentMap mqEndPoints = new ConcurrentHashMap<>(); List> mappingInfos = new ArrayList<>(topicQueueMappingInfoMap.entrySet()); - mappingInfos.sort((o1, o2) -> Long.compare(o2.getValue().getEpoch(), o1.getValue().getEpoch())); + mappingInfos.sort(Comparator.comparingLong( + (Map.Entry entry) -> entry.getValue().getEpoch()).reversed()); int maxTotalNums = 0; long maxTotalNumOfEpoch = -1; for (Map.Entry entry : mappingInfos) { From 3737b8d15f97e2a050fe637b26a19eec03adf819 Mon Sep 17 00:00:00 2001 From: liuhy Date: Thu, 2 Jul 2026 22:32:54 +0800 Subject: [PATCH 4/5] Keep static topic total queues scoped to latest epoch ClientMetadata tracked maxTotalNumOfEpoch but never updated it, so route construction could use a larger totalQueues value from stale lower-epoch metadata. Update the tracked epoch when a newer mapping is seen and only compare totalQueues within the latest epoch. Constraint: Stay within static-topic epoch routing behavior touched by #10580 Rejected: Leave maxTotalNumOfEpoch unchanged | stale epoch metadata can create extra logical queues Confidence: high Scope-risk: narrow Tested: mvn -q -pl remoting -DskipTests=false -Dtest=ClientMetadataTest,TopicQueueMappingUtilsTest -Djacoco.skip=true test Tested: mvn -q -pl remoting -DskipTests compile -Dspotbugs.skip=true -Dcheckstyle.skip=true Tested: mvn -q -DskipTests compile -Dspotbugs.skip=true -Dcheckstyle.skip=true Tested: git diff --check Related: #10580 --- .../rocketmq/remoting/rpc/ClientMetadata.java | 5 +++- .../remoting/rpc/ClientMetadataTest.java | 28 ++++++++++++++++++- 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/rpc/ClientMetadata.java b/remoting/src/main/java/org/apache/rocketmq/remoting/rpc/ClientMetadata.java index f3010ac5cf5..d6e435dd701 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/rpc/ClientMetadata.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/rpc/ClientMetadata.java @@ -126,7 +126,10 @@ public static ConcurrentMap topicRouteData2EndpointsForSta long maxTotalNumOfEpoch = -1; for (Map.Entry entry : mappingInfos) { TopicQueueMappingInfo info = entry.getValue(); - if (info.getEpoch() >= maxTotalNumOfEpoch && info.getTotalQueues() > maxTotalNums) { + if (info.getEpoch() > maxTotalNumOfEpoch) { + maxTotalNumOfEpoch = info.getEpoch(); + maxTotalNums = info.getTotalQueues(); + } else if (info.getEpoch() == maxTotalNumOfEpoch && info.getTotalQueues() > maxTotalNums) { maxTotalNums = info.getTotalQueues(); } for (Map.Entry idEntry : entry.getValue().getCurrIdMap().entrySet()) { diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/rpc/ClientMetadataTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/rpc/ClientMetadataTest.java index 10daf604c48..14452b0896f 100644 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/rpc/ClientMetadataTest.java +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/rpc/ClientMetadataTest.java @@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentMap; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -143,12 +144,37 @@ public void testTopicRouteData2EndpointsForStaticTopicUsesLatestEpoch() { assertEquals(newBroker, actual.get(mq)); } + @Test + public void testTopicRouteData2EndpointsForStaticTopicUsesLatestEpochTotalQueues() { + String scope = "scope"; + String oldBroker = "oldBroker"; + String newBroker = "newBroker"; + long oldEpoch = 0L; + long newEpoch = (long) Integer.MAX_VALUE + 1L; + TopicRouteData topicRouteData = new TopicRouteData(); + Map mappingInfos = new LinkedHashMap<>(); + mappingInfos.put(oldBroker, buildMappingInfo(scope, oldBroker, oldEpoch, 2)); + mappingInfos.put(newBroker, buildMappingInfo(scope, newBroker, newEpoch, 1)); + topicRouteData.setTopicQueueMappingByBroker(mappingInfos); + + ConcurrentMap actual = + ClientMetadata.topicRouteData2EndpointsForStaticTopic(defaultTopic, topicRouteData); + + MessageQueue staleQueue = new MessageQueue(defaultTopic, TopicQueueMappingUtils.getMockBrokerName(scope), 1); + assertEquals(1, actual.size()); + assertFalse(actual.containsKey(staleQueue)); + } + private TopicQueueMappingInfo buildMappingInfo(String scope, String brokerName, long epoch) { + return buildMappingInfo(scope, brokerName, epoch, 1); + } + + private TopicQueueMappingInfo buildMappingInfo(String scope, String brokerName, long epoch, int totalQueues) { TopicQueueMappingInfo info = new TopicQueueMappingInfo(); info.setScope(scope); info.setCurrIdMap(new ConcurrentHashMap<>()); info.getCurrIdMap().put(0, 0); - info.setTotalQueues(1); + info.setTotalQueues(totalQueues); info.setBname(brokerName); info.setEpoch(epoch); return info; From 20696afafa982c8666028b04a21620c3dd30613d Mon Sep 17 00:00:00 2001 From: liuhy Date: Thu, 2 Jul 2026 23:27:58 +0800 Subject: [PATCH 5/5] Cover same-epoch static topic queue sizing Codecov reported partial patch coverage for the ClientMetadata maxTotalNums branch added in the static topic epoch fix. The existing tests covered lower-epoch entries being ignored but did not exercise the same-epoch path that expands totalQueues. Add a focused regression test with two mappings in the latest epoch and different totalQueues so the same-epoch maxTotalNums branch is covered. Constraint: Keep the production fix unchanged; this only addresses missing branch coverage Confidence: high Scope-risk: narrow Tested: mvn -pl remoting -Dtest=ClientMetadataTest,TopicQueueMappingUtilsTest test -Dspotbugs.skip=true -Dcheckstyle.skip=true Tested: git diff --check --- .../remoting/rpc/ClientMetadataTest.java | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/rpc/ClientMetadataTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/rpc/ClientMetadataTest.java index 14452b0896f..db647e634eb 100644 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/rpc/ClientMetadataTest.java +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/rpc/ClientMetadataTest.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.remoting.rpc; import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingInfo; @@ -165,6 +166,26 @@ public void testTopicRouteData2EndpointsForStaticTopicUsesLatestEpochTotalQueues assertFalse(actual.containsKey(staleQueue)); } + @Test + public void testTopicRouteData2EndpointsForStaticTopicUsesMaxTotalQueuesInSameEpoch() { + String scope = "scope"; + String brokerA = "brokerA"; + String brokerB = "brokerB"; + long epoch = (long) Integer.MAX_VALUE + 1L; + TopicRouteData topicRouteData = new TopicRouteData(); + Map mappingInfos = new LinkedHashMap<>(); + mappingInfos.put(brokerA, buildMappingInfo(scope, brokerA, epoch, 1)); + mappingInfos.put(brokerB, buildMappingInfo(scope, brokerB, epoch, 2)); + topicRouteData.setTopicQueueMappingByBroker(mappingInfos); + + ConcurrentMap actual = + ClientMetadata.topicRouteData2EndpointsForStaticTopic(defaultTopic, topicRouteData); + + MessageQueue missingQueue = new MessageQueue(defaultTopic, TopicQueueMappingUtils.getMockBrokerName(scope), 1); + assertEquals(2, actual.size()); + assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST, actual.get(missingQueue)); + } + private TopicQueueMappingInfo buildMappingInfo(String scope, String brokerName, long epoch) { return buildMappingInfo(scope, brokerName, epoch, 1); }