diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/statictopic/TopicQueueMappingUtils.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/statictopic/TopicQueueMappingUtils.java index 647669fde24..ea902d9baa0 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/statictopic/TopicQueueMappingUtils.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/statictopic/TopicQueueMappingUtils.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -338,7 +339,7 @@ public static void checkPhysicalQueueConsistence(Map checkAndBuildMappingItems(List mappingDetailList, boolean replace, boolean checkConsistence) { - mappingDetailList.sort((o1, o2) -> (int) (o2.getEpoch() - o1.getEpoch())); + mappingDetailList.sort(Comparator.comparingLong(TopicQueueMappingDetail::getEpoch).reversed()); int maxNum = 0; Map globalIdMap = new HashMap<>(); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/rpc/ClientMetadata.java b/remoting/src/main/java/org/apache/rocketmq/remoting/rpc/ClientMetadata.java index d4962e00a58..d6e435dd701 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/rpc/ClientMetadata.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/rpc/ClientMetadata.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.remoting.rpc; import java.util.ArrayList; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -119,12 +120,16 @@ public static ConcurrentMap topicRouteData2EndpointsForSta Map topicQueueMappingInfoMap = mapEntry.getValue(); ConcurrentMap mqEndPoints = new ConcurrentHashMap<>(); List> mappingInfos = new ArrayList<>(topicQueueMappingInfoMap.entrySet()); - mappingInfos.sort((o1, o2) -> (int) (o2.getValue().getEpoch() - o1.getValue().getEpoch())); + mappingInfos.sort(Comparator.comparingLong( + (Map.Entry entry) -> entry.getValue().getEpoch()).reversed()); int maxTotalNums = 0; long maxTotalNumOfEpoch = -1; for (Map.Entry entry : mappingInfos) { TopicQueueMappingInfo info = entry.getValue(); - if (info.getEpoch() >= maxTotalNumOfEpoch && info.getTotalQueues() > maxTotalNums) { + if (info.getEpoch() > maxTotalNumOfEpoch) { + maxTotalNumOfEpoch = info.getEpoch(); + maxTotalNums = info.getTotalQueues(); + } else if (info.getEpoch() == maxTotalNumOfEpoch && info.getTotalQueues() > maxTotalNums) { maxTotalNums = info.getTotalQueues(); } for (Map.Entry idEntry : entry.getValue().getCurrIdMap().entrySet()) { diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/statictopic/TopicQueueMappingUtilsTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/statictopic/TopicQueueMappingUtilsTest.java index a12c9f89200..ca72da24ed9 100644 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/statictopic/TopicQueueMappingUtilsTest.java +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/statictopic/TopicQueueMappingUtilsTest.java @@ -81,6 +81,32 @@ private void testIdToBroker(Map idToBroker, Map mappingDetails = new ArrayList<>(); + mappingDetails.add(buildMappingDetail(topic, oldBroker, oldEpoch)); + mappingDetails.add(buildMappingDetail(topic, newBroker, newEpoch)); + + Map globalIdMap = + TopicQueueMappingUtils.checkAndBuildMappingItems(mappingDetails, true, false); + + TopicQueueMappingOne mappingOne = globalIdMap.get(0); + Assert.assertEquals(newEpoch, mappingOne.getMappingDetail().getEpoch()); + Assert.assertEquals(newBroker, mappingOne.getBname()); + } + + private TopicQueueMappingDetail buildMappingDetail(String topic, String brokerName, long epoch) { + TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail(topic, 1, brokerName, epoch); + mappingDetail.getHostedQueues().put(0, new ArrayList<>(Collections.singletonList( + new LogicQueueMappingItem(0, 0, brokerName, 0, 0, -1, -1, -1)))); + return mappingDetail; + } + @Test public void testAllocator() { //stability diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/rpc/ClientMetadataTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/rpc/ClientMetadataTest.java index a9f38854586..db647e634eb 100644 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/rpc/ClientMetadataTest.java +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/rpc/ClientMetadataTest.java @@ -17,20 +17,24 @@ package org.apache.rocketmq.remoting.rpc; import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingInfo; +import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingUtils; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.junit.MockitoJUnitRunner; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -120,4 +124,80 @@ public void testTopicRouteData2EndpointsForStaticTopic() { ConcurrentMap actual = ClientMetadata.topicRouteData2EndpointsForStaticTopic(defaultTopic, topicRouteData); assertEquals(1, actual.size()); } + + @Test + public void testTopicRouteData2EndpointsForStaticTopicUsesLatestEpoch() { + String scope = "scope"; + String oldBroker = "oldBroker"; + String newBroker = "newBroker"; + long oldEpoch = 0L; + long newEpoch = (long) Integer.MAX_VALUE + 1L; + TopicRouteData topicRouteData = new TopicRouteData(); + Map mappingInfos = new LinkedHashMap<>(); + mappingInfos.put(oldBroker, buildMappingInfo(scope, oldBroker, oldEpoch)); + mappingInfos.put(newBroker, buildMappingInfo(scope, newBroker, newEpoch)); + topicRouteData.setTopicQueueMappingByBroker(mappingInfos); + + ConcurrentMap actual = + ClientMetadata.topicRouteData2EndpointsForStaticTopic(defaultTopic, topicRouteData); + + MessageQueue mq = new MessageQueue(defaultTopic, TopicQueueMappingUtils.getMockBrokerName(scope), 0); + assertEquals(newBroker, actual.get(mq)); + } + + @Test + public void testTopicRouteData2EndpointsForStaticTopicUsesLatestEpochTotalQueues() { + String scope = "scope"; + String oldBroker = "oldBroker"; + String newBroker = "newBroker"; + long oldEpoch = 0L; + long newEpoch = (long) Integer.MAX_VALUE + 1L; + TopicRouteData topicRouteData = new TopicRouteData(); + Map mappingInfos = new LinkedHashMap<>(); + mappingInfos.put(oldBroker, buildMappingInfo(scope, oldBroker, oldEpoch, 2)); + mappingInfos.put(newBroker, buildMappingInfo(scope, newBroker, newEpoch, 1)); + topicRouteData.setTopicQueueMappingByBroker(mappingInfos); + + ConcurrentMap actual = + ClientMetadata.topicRouteData2EndpointsForStaticTopic(defaultTopic, topicRouteData); + + MessageQueue staleQueue = new MessageQueue(defaultTopic, TopicQueueMappingUtils.getMockBrokerName(scope), 1); + assertEquals(1, actual.size()); + assertFalse(actual.containsKey(staleQueue)); + } + + @Test + public void testTopicRouteData2EndpointsForStaticTopicUsesMaxTotalQueuesInSameEpoch() { + String scope = "scope"; + String brokerA = "brokerA"; + String brokerB = "brokerB"; + long epoch = (long) Integer.MAX_VALUE + 1L; + TopicRouteData topicRouteData = new TopicRouteData(); + Map mappingInfos = new LinkedHashMap<>(); + mappingInfos.put(brokerA, buildMappingInfo(scope, brokerA, epoch, 1)); + mappingInfos.put(brokerB, buildMappingInfo(scope, brokerB, epoch, 2)); + topicRouteData.setTopicQueueMappingByBroker(mappingInfos); + + ConcurrentMap actual = + ClientMetadata.topicRouteData2EndpointsForStaticTopic(defaultTopic, topicRouteData); + + MessageQueue missingQueue = new MessageQueue(defaultTopic, TopicQueueMappingUtils.getMockBrokerName(scope), 1); + assertEquals(2, actual.size()); + assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST, actual.get(missingQueue)); + } + + private TopicQueueMappingInfo buildMappingInfo(String scope, String brokerName, long epoch) { + return buildMappingInfo(scope, brokerName, epoch, 1); + } + + private TopicQueueMappingInfo buildMappingInfo(String scope, String brokerName, long epoch, int totalQueues) { + TopicQueueMappingInfo info = new TopicQueueMappingInfo(); + info.setScope(scope); + info.setCurrIdMap(new ConcurrentHashMap<>()); + info.getCurrIdMap().put(0, 0); + info.setTotalQueues(totalQueues); + info.setBname(brokerName); + info.setEpoch(epoch); + return info; + } }