From 0bcfc6d442338e56649353a2952d09472eb3ce3e Mon Sep 17 00:00:00 2001 From: enkilee Date: Thu, 2 Jul 2026 10:03:33 +0800 Subject: [PATCH 1/2] Signed-off-by: enkilee fix null point bug and typo error --- .../ExpressionForRetryMessageFilter.java | 3 +++ .../ChangeInvisibleTimeProcessor.java | 19 +++++++++++-------- .../topic/TopicQueueMappingManager.java | 2 +- 3 files changed, 15 insertions(+), 9 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java index cc3e37bf4b3..304fb0713b1 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java @@ -62,6 +62,9 @@ public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map pr decoded = true; tempProperties = MessageDecoder.decodeProperties(msgBuffer); } + if (tempProperties == null) { + return true; + } String realTopic = tempProperties.get(MessageConst.PROPERTY_RETRY_TOPIC); String group = KeyBuilder.parseGroup(subscriptionData.getTopic()); realFilterData = this.consumerFilterManager.get(realTopic, group); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java index 02deeb18a7a..72b67bb2815 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java @@ -343,15 +343,18 @@ private CompletableFuture appendCheckPointThenAckOrigin( this.brokerController.getBrokerStatsManager().incBrokerCkNums(1); this.brokerController.getBrokerStatsManager().incGroupCkNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(), 1); } - } - if (putMessageResult.getPutMessageStatus() != PutMessageStatus.PUT_OK - && putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_DISK_TIMEOUT - && putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_SLAVE_TIMEOUT - && putMessageResult.getPutMessageStatus() != PutMessageStatus.SLAVE_NOT_AVAILABLE) { - POP_LOGGER.error("change invisible, put new ck error: {}", putMessageResult); - return CompletableFuture.completedFuture(false); + if (putMessageResult.getPutMessageStatus() != PutMessageStatus.PUT_OK + && putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_DISK_TIMEOUT + && putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_SLAVE_TIMEOUT + && putMessageResult.getPutMessageStatus() != PutMessageStatus.SLAVE_NOT_AVAILABLE) { + POP_LOGGER.error("change invisible, put new ck error: {}", putMessageResult); + return CompletableFuture.completedFuture(false); + } else { + return ackOrigin(requestHeader, extraInfo); + } } else { - return ackOrigin(requestHeader, extraInfo); + POP_LOGGER.error("change invisible, put new ck error: putMessageResult is null"); + return CompletableFuture.completedFuture(false); } }).exceptionally(throwable -> { POP_LOGGER.error("change invisible, put new ck error", throwable); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java index 9e20ecd9b6a..b1baa02c6fc 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java @@ -93,7 +93,7 @@ public void updateTopicQueueMapping(TopicQueueMappingDetail newDetail, boolean f return; } if (force) { - //bakeup the old items + //backup the old items oldDetail.getHostedQueues().forEach((queueId, items) -> { newDetail.getHostedQueues().putIfAbsent(queueId, items); }); From d12c157a8b3328b4426643e25b68ff90f1231b5b Mon Sep 17 00:00:00 2001 From: enkilee Date: Thu, 2 Jul 2026 13:11:23 +0800 Subject: [PATCH 2/2] Signed-off-by: enkilee fixed by rockermq-ai --- .../rocketmq/broker/filter/ExpressionForRetryMessageFilter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java index 304fb0713b1..1fb0134feea 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java @@ -63,7 +63,7 @@ public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map pr tempProperties = MessageDecoder.decodeProperties(msgBuffer); } if (tempProperties == null) { - return true; + return false; } String realTopic = tempProperties.get(MessageConst.PROPERTY_RETRY_TOPIC); String group = KeyBuilder.parseGroup(subscriptionData.getTopic());