From 6434cbd52f58d651edf3b31d939747e5a8c3f495 Mon Sep 17 00:00:00 2001 From: liuhy Date: Thu, 2 Jul 2026 23:20:02 +0800 Subject: [PATCH] Prevent POP revive checkpoint ordering overflow POP revive checkpoints are ordered by reviveOffset before mergeAndRevive advances through the pending checkpoints. The previous comparator subtracted two long offsets and cast the result to int, so large offset gaps could reverse the intended ordering. Use Comparator.comparingLong to preserve the full long ordering and add a regression test covering offsets separated by more than Integer.MAX_VALUE. Constraint: reviveOffset is a long queue offset and can exceed int comparison range Rejected: Keep subtraction with a wider cast | Comparator APIs express the ordering directly and avoid overflow Confidence: high Scope-risk: narrow Directive: Do not compare long queue offsets by subtraction in POP revive ordering Tested: mvn -pl broker -Dtest=PopReviveServiceTest test -Dspotbugs.skip=true -Dcheckstyle.skip=true Tested: git diff --check --- .../broker/processor/PopReviveService.java | 3 ++- .../broker/processor/PopReviveServiceTest.java | 14 ++++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java index 07f16e98965..e5eec54f5b9 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java @@ -53,6 +53,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -725,7 +726,7 @@ ArrayList genSortList() { return sortList; } sortList = new ArrayList<>(map.values()); - sortList.sort((o1, o2) -> (int) (o1.getReviveOffset() - o2.getReviveOffset())); + sortList.sort(Comparator.comparingLong(PopCheckPoint::getReviveOffset)); return sortList; } } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java index fa7e9982e1f..7b9f9e7fddc 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java @@ -442,6 +442,20 @@ public void testReviveMsgFromBatchAck() throws Throwable { assertEquals(1, commitOffsetCaptor.getValue().longValue()); } + @Test + public void testGenSortListShouldSortLargeReviveOffsets() { + PopReviveService.ConsumeReviveObj consumeReviveObj = new PopReviveService.ConsumeReviveObj(); + PopCheckPoint lowOffsetCk = buildPopCheckPoint(0, 0, 1); + PopCheckPoint highOffsetCk = buildPopCheckPoint(1, 0, (long) Integer.MAX_VALUE + 2); + consumeReviveObj.map.put("high", highOffsetCk); + consumeReviveObj.map.put("low", lowOffsetCk); + + List sortList = consumeReviveObj.genSortList(); + + assertEquals(lowOffsetCk, sortList.get(0)); + assertEquals(highOffsetCk, sortList.get(1)); + } + public static MessageExtBrokerInner buildBatchAckMsg(BatchAckMsg batchAckMsg, long deliverMs, long reviveOffset, long deliverTime) { MessageExtBrokerInner result = buildBatchAckInnerMessage(REVIVE_TOPIC, batchAckMsg, REVIVE_QUEUE_ID, STORE_HOST, deliverMs, PopMessageProcessor.genAckUniqueId(batchAckMsg)); result.setQueueOffset(reviveOffset);