diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java index cc3e37bf4b3..1fb0134feea 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java @@ -62,6 +62,9 @@ public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map pr decoded = true; tempProperties = MessageDecoder.decodeProperties(msgBuffer); } + if (tempProperties == null) { + return false; + } String realTopic = tempProperties.get(MessageConst.PROPERTY_RETRY_TOPIC); String group = KeyBuilder.parseGroup(subscriptionData.getTopic()); realFilterData = this.consumerFilterManager.get(realTopic, group); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java index 02deeb18a7a..72b67bb2815 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java @@ -343,15 +343,18 @@ private CompletableFuture appendCheckPointThenAckOrigin( this.brokerController.getBrokerStatsManager().incBrokerCkNums(1); this.brokerController.getBrokerStatsManager().incGroupCkNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(), 1); } - } - if (putMessageResult.getPutMessageStatus() != PutMessageStatus.PUT_OK - && putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_DISK_TIMEOUT - && putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_SLAVE_TIMEOUT - && putMessageResult.getPutMessageStatus() != PutMessageStatus.SLAVE_NOT_AVAILABLE) { - POP_LOGGER.error("change invisible, put new ck error: {}", putMessageResult); - return CompletableFuture.completedFuture(false); + if (putMessageResult.getPutMessageStatus() != PutMessageStatus.PUT_OK + && putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_DISK_TIMEOUT + && putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_SLAVE_TIMEOUT + && putMessageResult.getPutMessageStatus() != PutMessageStatus.SLAVE_NOT_AVAILABLE) { + POP_LOGGER.error("change invisible, put new ck error: {}", putMessageResult); + return CompletableFuture.completedFuture(false); + } else { + return ackOrigin(requestHeader, extraInfo); + } } else { - return ackOrigin(requestHeader, extraInfo); + POP_LOGGER.error("change invisible, put new ck error: putMessageResult is null"); + return CompletableFuture.completedFuture(false); } }).exceptionally(throwable -> { POP_LOGGER.error("change invisible, put new ck error", throwable); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java index 9e20ecd9b6a..b1baa02c6fc 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java @@ -93,7 +93,7 @@ public void updateTopicQueueMapping(TopicQueueMappingDetail newDetail, boolean f return; } if (force) { - //bakeup the old items + //backup the old items oldDetail.getHostedQueues().forEach((queueId, items) -> { newDetail.getHostedQueues().putIfAbsent(queueId, items); });