diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java index 13287f47c38..939035f3ee2 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java @@ -44,6 +44,7 @@ import java.util.concurrent.CompletableFuture; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.broker.client.ClientChannelInfo; +import org.apache.rocketmq.broker.client.ConsumerGroupInfo; import org.apache.rocketmq.broker.client.ConsumerGroupEvent; import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener; import org.apache.rocketmq.broker.client.ProducerChangeListener; @@ -440,7 +441,7 @@ protected GrpcClientChannel registerConsumer(ProxyContext ctx, String consumerGr this.buildConsumeType(clientType), this.buildMessageModel(clientType), ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET, - this.buildSubscriptionDataSet(subscriptionEntryList), + this.buildSubscriptionDataSet(ctx, consumerGroup, subscriptionEntryList), updateSubscription ); return channel; @@ -532,16 +533,38 @@ protected MessageModel buildMessageModel(ClientType clientType) { return MessageModel.CLUSTERING; } - protected Set buildSubscriptionDataSet(List subscriptionEntryList) { + protected Set buildSubscriptionDataSet(ProxyContext ctx, String consumerGroup, + List subscriptionEntryList) { Set subscriptionDataSet = new HashSet<>(); + ConsumerGroupInfo consumerGroupInfo = this.messagingProcessor.getConsumerGroupInfo(ctx, consumerGroup); for (SubscriptionEntry sub : subscriptionEntryList) { String topicName = sub.getTopic().getName(); FilterExpression filterExpression = sub.getExpression(); - subscriptionDataSet.add(buildSubscriptionData(topicName, filterExpression)); + SubscriptionData subscriptionData = buildSubscriptionData(topicName, filterExpression); + reuseSubscriptionVersion(consumerGroupInfo, subscriptionData); + subscriptionDataSet.add(subscriptionData); } return subscriptionDataSet; } + protected void reuseSubscriptionVersion(ConsumerGroupInfo consumerGroupInfo, SubscriptionData subscriptionData) { + if (consumerGroupInfo == null) { + return; + } + SubscriptionData oldSubscriptionData = consumerGroupInfo.findSubscriptionData(subscriptionData.getTopic()); + if (oldSubscriptionData == null) { + return; + } + // FilterAPI.build creates a fresh subVersion every time gRPC settings rebuild the + // subscription. Normalize the version before equals so unchanged subscriptions do + // not look like real updates; restore it below when the subscription content differs. + long subVersion = subscriptionData.getSubVersion(); + subscriptionData.setSubVersion(oldSubscriptionData.getSubVersion()); + if (!oldSubscriptionData.equals(subscriptionData)) { + subscriptionData.setSubVersion(subVersion); + } + } + protected SubscriptionData buildSubscriptionData(String topicName, FilterExpression filterExpression) { String expression = filterExpression.getExpression(); String expressionType = GrpcConverter.getInstance().buildExpressionType(filterExpression.getType()); diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivityTest.java index 532c9795c87..22c5d0acbb7 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivityTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivityTest.java @@ -43,7 +43,9 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import org.apache.rocketmq.broker.client.ClientChannelInfo; +import org.apache.rocketmq.broker.client.ConsumerGroupInfo; import org.apache.rocketmq.common.attribute.TopicMessageType; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.lite.LiteSubscriptionDTO; import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.grpc.v2.BaseActivityTest; @@ -58,6 +60,8 @@ import org.apache.rocketmq.remoting.protocol.body.CMResult; import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType; +import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; import org.assertj.core.util.Lists; import org.junit.Before; @@ -212,6 +216,72 @@ public void testConsumerHeartbeat() throws Throwable { assertEquals("tag", data.getSubString()); } + @Test + public void testConsumerHeartbeatReuseSubVersionForSameSubscription() throws Throwable { + ProxyContext context = createContext(); + when(grpcClientSettingsManager.getClientSettings(any())).thenReturn(buildConsumerSettings("tag")); + mockConsumerGroupInfo(buildTagSubscriptionData("tag", 123L)); + + ArgumentCaptor> subscriptionDatasArgumentCaptor = ArgumentCaptor.forClass(Set.class); + doNothing().when(this.messagingProcessor).registerConsumer(any(), anyString(), any(), any(), any(), any(), + subscriptionDatasArgumentCaptor.capture(), anyBoolean()); + + HeartbeatResponse response = this.sendConsumerHeartbeat(context); + + assertEquals(Code.OK, response.getStatus().getCode()); + SubscriptionData data = subscriptionDatasArgumentCaptor.getValue().stream().findAny().get(); + assertThat(data.getSubVersion()).isEqualTo(123L); + } + + @Test + public void testConsumerHeartbeatUseNewSubVersionWhenSubscriptionChanged() throws Throwable { + ProxyContext context = createContext(); + when(grpcClientSettingsManager.getClientSettings(any())).thenReturn(buildConsumerSettings("tagB")); + mockConsumerGroupInfo(buildTagSubscriptionData("tagA", 123L)); + + ArgumentCaptor> subscriptionDatasArgumentCaptor = ArgumentCaptor.forClass(Set.class); + doNothing().when(this.messagingProcessor).registerConsumer(any(), anyString(), any(), any(), any(), any(), + subscriptionDatasArgumentCaptor.capture(), anyBoolean()); + + HeartbeatResponse response = this.sendConsumerHeartbeat(context); + + assertEquals(Code.OK, response.getStatus().getCode()); + SubscriptionData data = subscriptionDatasArgumentCaptor.getValue().stream().findAny().get(); + assertThat(data.getSubVersion()).isGreaterThan(123L); + assertThat(data.getSubString()).isEqualTo("tagB"); + } + + private Settings buildConsumerSettings(String tag) { + return Settings.newBuilder() + .setClientType(ClientType.PUSH_CONSUMER) + .setSubscription(Subscription.newBuilder() + .setGroup(Resource.newBuilder().setName(CONSUMER_GROUP).build()) + .addSubscriptions(SubscriptionEntry.newBuilder() + .setExpression(FilterExpression.newBuilder() + .setExpression(tag) + .setType(FilterType.TAG) + .build()) + .setTopic(Resource.newBuilder().setName(TOPIC).build()) + .build()) + .build()) + .build(); + } + + private void mockConsumerGroupInfo(SubscriptionData subscriptionData) { + ConsumerGroupInfo consumerGroupInfo = new ConsumerGroupInfo(CONSUMER_GROUP, ConsumeType.CONSUME_PASSIVELY, + MessageModel.CLUSTERING, ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); + consumerGroupInfo.getSubscriptionTable().put(TOPIC, subscriptionData); + when(this.messagingProcessor.getConsumerGroupInfo(any(), anyString())).thenReturn(consumerGroupInfo); + } + + private SubscriptionData buildTagSubscriptionData(String tag, long subVersion) { + SubscriptionData subscriptionData = new SubscriptionData(TOPIC, tag); + subscriptionData.getTagsSet().add(tag); + subscriptionData.getCodeSet().add(tag.hashCode()); + subscriptionData.setSubVersion(subVersion); + return subscriptionData; + } + protected void assertClientChannelInfo(ClientChannelInfo clientChannelInfo, String group) { assertEquals(LanguageCode.JAVA, clientChannelInfo.getLanguage()); assertEquals(CLIENT_ID, clientChannelInfo.getClientId());