From cc9c594cd9201a9c155699659d42f8d2ff608714 Mon Sep 17 00:00:00 2001 From: liuhy Date: Thu, 2 Jul 2026 21:38:55 +0800 Subject: [PATCH] Prevent stale master election from offset comparator overflow DefaultElectPolicy sorted broker candidates by subtracting maxOffset values and casting the long delta to int. Large offset gaps can overflow the comparator result and rank a lower-offset broker first. This replaces subtraction with safe comparator helpers and adds a focused overflow regression test. Constraint: Preserve existing election order: higher epoch, higher maxOffset, lower electionPriority Rejected: Keep subtraction comparator | unsafe for long offset deltas greater than Integer.MAX_VALUE Confidence: high Scope-risk: narrow Tested: mvn -q -pl controller -DskipTests=false -Dtest=DefaultElectPolicyTest -Djacoco.skip=true test Tested: mvn -q -pl controller -DskipTests compile -Dspotbugs.skip=true -Dcheckstyle.skip=true Tested: mvn -q -DskipTests compile -Dspotbugs.skip=true -Dcheckstyle.skip=true Not-tested: Full controller suite on local JDK due existing JaCoCo/Hessian module-access failures Related: #10578 --- .../elect/impl/DefaultElectPolicy.java | 15 ++++-- .../elect/impl/DefaultElectPolicyTest.java | 50 +++++++++++++++++++ 2 files changed, 60 insertions(+), 5 deletions(-) create mode 100644 controller/src/test/java/org/apache/rocketmq/controller/elect/impl/DefaultElectPolicyTest.java diff --git a/controller/src/main/java/org/apache/rocketmq/controller/elect/impl/DefaultElectPolicy.java b/controller/src/main/java/org/apache/rocketmq/controller/elect/impl/DefaultElectPolicy.java index da3b3ed30e4..44bc1c8c4b1 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/elect/impl/DefaultElectPolicy.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/elect/impl/DefaultElectPolicy.java @@ -37,12 +37,17 @@ public class DefaultElectPolicy implements ElectPolicy { // Sort in descending order according to, and sort in ascending order according to priority private final Comparator comparator = (o1, o2) -> { - if (o1.getEpoch() == o2.getEpoch()) { - return o1.getMaxOffset() == o2.getMaxOffset() ? o1.getElectionPriority() - o2.getElectionPriority() : - (int) (o2.getMaxOffset() - o1.getMaxOffset()); - } else { - return o2.getEpoch() - o1.getEpoch(); + int epochCompare = Integer.compare(o2.getEpoch(), o1.getEpoch()); + if (epochCompare != 0) { + return epochCompare; } + + int offsetCompare = Long.compare(o2.getMaxOffset(), o1.getMaxOffset()); + if (offsetCompare != 0) { + return offsetCompare; + } + + return Integer.compare(o1.getElectionPriority(), o2.getElectionPriority()); }; public DefaultElectPolicy(BrokerValidPredicate validPredicate, BrokerLiveInfoGetter brokerLiveInfoGetter) { diff --git a/controller/src/test/java/org/apache/rocketmq/controller/elect/impl/DefaultElectPolicyTest.java b/controller/src/test/java/org/apache/rocketmq/controller/elect/impl/DefaultElectPolicyTest.java new file mode 100644 index 00000000000..9857e31f352 --- /dev/null +++ b/controller/src/test/java/org/apache/rocketmq/controller/elect/impl/DefaultElectPolicyTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.controller.elect.impl; + +import java.util.HashSet; +import java.util.Set; +import org.apache.rocketmq.controller.impl.heartbeat.BrokerLiveInfo; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class DefaultElectPolicyTest { + + @Test + public void testElectPrefersHigherOffsetWhenOffsetDifferenceExceedsIntegerMaxValue() { + long lowerOffset = 0L; + long higherOffset = (long) Integer.MAX_VALUE + 1L; + Set brokers = new HashSet<>(); + brokers.add(1L); + brokers.add(2L); + + DefaultElectPolicy electPolicy = new DefaultElectPolicy((clusterName, brokerName, brokerId) -> true, + (clusterName, brokerName, brokerId) -> { + if (brokerId == 1L) { + return new BrokerLiveInfo(brokerName, "127.0.0.1:10911", brokerId, + System.currentTimeMillis(), 3000L, null, 1, lowerOffset, 0); + } + return new BrokerLiveInfo(brokerName, "127.0.0.1:10912", brokerId, + System.currentTimeMillis(), 3000L, null, 1, higherOffset, 0); + }); + + Long electedBrokerId = electPolicy.elect("cluster", "broker", brokers, null, null, null); + + assertEquals(Long.valueOf(2L), electedBrokerId); + } +}