From 2279a41b305e61ecc2fd26a0b9dcf3d6766fd4d0 Mon Sep 17 00:00:00 2001 From: "wangjiahua.wjh" Date: Tue, 30 Jun 2026 11:44:09 +0800 Subject: [PATCH] [ISSUE #10052] Add capacity check in PopBufferMergeService.addCkJustOffset to prevent OOM addCk() already guards against buffer overflow with popCkMaxBufferSize, but addCkJustOffset() did not have this check. Under high-concurrency POP consumption with large messages, the buffer ConcurrentHashMap could grow without bound, eventually causing OOM in PopBufferMergeService. Fix: Add the same counter > popCkMaxBufferSize guard at the beginning of addCkJustOffset(), consistent with addCk(). --- .../processor/PopBufferMergeService.java | 5 ++++ .../processor/PopBufferMergeServiceTest.java | 28 +++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java index 5373eaea333..91bcc2b14b1 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java @@ -445,6 +445,11 @@ private boolean checkQueueOk(PopCheckPointWrapper pointWrapper) { */ public boolean addCkJustOffset(PopCheckPoint point, int reviveQueueId, long reviveQueueOffset, long nextBeginOffset) { + if (this.counter.get() > brokerController.getBrokerConfig().getPopCkMaxBufferSize()) { + POP_LOGGER.warn("[PopBuffer]add ck just offset, max size, {}, {}", point, this.counter.get()); + return false; + } + PopCheckPointWrapper pointWrapper = new PopCheckPointWrapper(reviveQueueId, reviveQueueOffset, point, nextBeginOffset, true); if (this.buffer.containsKey(pointWrapper.getMergeKey())) { diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java index 6cbbd9cfd92..ae01c9c1b21 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java @@ -234,4 +234,32 @@ public void testPutAckToStore() throws Exception { method.invoke(popBufferMergeService, pointWrapper, msgIndex, count); verify(escapeBridge, times(1)).putMessageToSpecificQueue(any(MessageExtBrokerInner.class)); } + + @Test + public void testAddCkJustOffsetMaxSize() { + // Set popCkMaxBufferSize to 0 so any addCkJustOffset should be rejected + when(brokerConfig.getPopCkMaxBufferSize()).thenReturn(0); + + PopCheckPoint point1 = mock(PopCheckPoint.class); + when(point1.getTopic()).thenReturn("topic1"); + when(point1.getCId()).thenReturn("cid1"); + when(point1.getQueueId()).thenReturn(0); + when(point1.getStartOffset()).thenReturn(0L); + when(point1.getPopTime()).thenReturn(0L); + when(point1.getBrokerName()).thenReturn(""); + + // counter starts at 0, 0 > 0 is false, so first add succeeds and counter becomes 1 + assertThat(popBufferMergeService.addCkJustOffset(point1, 0, 0, 0)).isTrue(); + + PopCheckPoint point2 = mock(PopCheckPoint.class); + when(point2.getTopic()).thenReturn("topic2"); + when(point2.getCId()).thenReturn("cid2"); + when(point2.getQueueId()).thenReturn(0); + when(point2.getStartOffset()).thenReturn(1L); + when(point2.getPopTime()).thenReturn(1L); + when(point2.getBrokerName()).thenReturn(""); + + // counter is now 1, 1 > 0 is true, so second add is rejected + assertThat(popBufferMergeService.addCkJustOffset(point2, 0, 0, 1)).isFalse(); + } }