From 34ee71367d9ff97bc164b55239c0c285a7d9bb60 Mon Sep 17 00:00:00 2001 From: panzhi33 Date: Thu, 12 Mar 2026 19:58:46 +0800 Subject: [PATCH 1/5] support simple consumer supports subscribing to multiple topics --- .../pom.xml | 6 ++- .../pom.xml | 2 +- .../rocketmq-v5-client-consume-demo/pom.xml | 2 +- .../pom.xml | 20 ++++++++ .../V5SimpleConsumerConsumerApplication.java | 46 +++++++++++++++++++ .../src/main/resources/application.properties | 23 ++++++++++ .../pom.xml | 2 +- .../pom.xml | 2 +- .../pom.xml | 2 +- .../rocketmq-v5-client-producer-demo/pom.xml | 2 +- .../pom.xml | 2 +- .../RocketMQAutoConfiguration.java | 13 ++++-- .../autoconfigure/RocketMQProperties.java | 37 +++++++++++++++ .../rocketmq/client/support/RocketMQUtil.java | 12 +++++ 14 files changed, 159 insertions(+), 12 deletions(-) create mode 100644 rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/pom.xml create mode 100644 rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/java/org/apache/rocketmq/springboot/V5SimpleConsumerConsumerApplication.java create mode 100644 rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/resources/application.properties diff --git a/rocketmq-v5-client-spring-boot-samples/pom.xml b/rocketmq-v5-client-spring-boot-samples/pom.xml index ae931911..d8559c9a 100644 --- a/rocketmq-v5-client-spring-boot-samples/pom.xml +++ b/rocketmq-v5-client-spring-boot-samples/pom.xml @@ -22,7 +22,7 @@ org.apache.rocketmq rocketmq-v5-client-spring-boot-samples pom - 2.3.2-SNAPSHOT + 2.3.6-SNAPSHOT rocketmq-v5-client-spring-boot-samples rocketmq-v5-client-spring-boot-samples @@ -35,7 +35,9 @@ rocketmq-v5-client-producer-simple-demo rocketmq-v5-client-consumer-simple-demo rocketmq-v5-client-consumer-push-simple-demo - + rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo + rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo + 1.8 diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/pom.xml b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/pom.xml index a73027b6..cc600607 100644 --- a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/pom.xml +++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/pom.xml @@ -21,7 +21,7 @@ org.apache.rocketmq rocketmq-v5-client-spring-boot-samples - 2.3.2-SNAPSHOT + 2.3.6-SNAPSHOT rocketmq-v5-client-consume-acl-demo diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/pom.xml b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/pom.xml index 0b2c891b..2b5d46a8 100644 --- a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/pom.xml +++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/pom.xml @@ -21,7 +21,7 @@ org.apache.rocketmq rocketmq-v5-client-spring-boot-samples - 2.3.2-SNAPSHOT + 2.3.6-SNAPSHOT rocketmq-v5-client-consume-demo diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/pom.xml b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/pom.xml new file mode 100644 index 00000000..98de13d6 --- /dev/null +++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/pom.xml @@ -0,0 +1,20 @@ + + + 4.0.0 + + org.apache.rocketmq + rocketmq-v5-client-spring-boot-samples + 2.3.6-SNAPSHOT + + + rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo + + + 8 + 8 + UTF-8 + + + \ No newline at end of file diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/java/org/apache/rocketmq/springboot/V5SimpleConsumerConsumerApplication.java b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/java/org/apache/rocketmq/springboot/V5SimpleConsumerConsumerApplication.java new file mode 100644 index 00000000..35b4b579 --- /dev/null +++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/java/org/apache/rocketmq/springboot/V5SimpleConsumerConsumerApplication.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.springboot; + +import org.apache.rocketmq.client.apis.message.MessageView; +import org.apache.rocketmq.client.core.RocketMQClientTemplate; +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +import javax.annotation.Resource; +import java.time.Duration; +import java.util.List; + +@SpringBootApplication +public class V5SimpleConsumerConsumerApplication implements CommandLineRunner { + @Resource + private RocketMQClientTemplate rocketMQClientTemplate; + + public static void main(String[] args) { + SpringApplication.run(V5SimpleConsumerConsumerApplication.class, args); + } + + @Override + public void run(String... args) throws Exception { + for (int i = 0; i < 10; i++) { + List messageList = rocketMQClientTemplate.receive(10, Duration.ofSeconds(60)); + System.out.println(messageList); + } + } +} diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/resources/application.properties b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/resources/application.properties new file mode 100644 index 00000000..effdb118 --- /dev/null +++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/resources/application.properties @@ -0,0 +1,23 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +rocketmq.simple-consumer.endpoints=localhost:8081 +rocketmq.simple-consumer.consumer-group=demo-group +rocketmq.simple-consumer.filter-expression-map.demo-topic1.tag=tagA +rocketmq.simple-consumer.filter-expression-map.demo-topic1.filter-expression-type=tag +rocketmq.simple-consumer.filter-expression-map.demo-topic2.tag=tagB +rocketmq.simple-consumer.filter-expression-map.demo-topic2.filter-expression-type=tag +#rocketmq.simple-consumer.access-key= +#rocketmq.simple-consumer.secret-key= diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consumer-push-simple-demo/pom.xml b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consumer-push-simple-demo/pom.xml index 1a20a74c..b67eeead 100644 --- a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consumer-push-simple-demo/pom.xml +++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consumer-push-simple-demo/pom.xml @@ -22,7 +22,7 @@ org.apache.rocketmq rocketmq-v5-client-spring-boot-samples - 2.3.2-SNAPSHOT + 2.3.6-SNAPSHOT rocketmq-v5-client-consumer-push-simple-demo diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consumer-simple-demo/pom.xml b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consumer-simple-demo/pom.xml index c997133f..8008c7ec 100644 --- a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consumer-simple-demo/pom.xml +++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consumer-simple-demo/pom.xml @@ -22,7 +22,7 @@ org.apache.rocketmq rocketmq-v5-client-spring-boot-samples - 2.3.2-SNAPSHOT + 2.3.6-SNAPSHOT rocketmq-v5-client-consumer-simple-demo diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-acl-demo/pom.xml b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-acl-demo/pom.xml index 22b190cd..cd97b8bf 100644 --- a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-acl-demo/pom.xml +++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-acl-demo/pom.xml @@ -21,7 +21,7 @@ org.apache.rocketmq rocketmq-v5-client-spring-boot-samples - 2.3.2-SNAPSHOT + 2.3.6-SNAPSHOT rocketmq-v5-client-producer-acl-demo diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-demo/pom.xml b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-demo/pom.xml index 6c23b307..a4e155a5 100644 --- a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-demo/pom.xml +++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-demo/pom.xml @@ -21,7 +21,7 @@ org.apache.rocketmq rocketmq-v5-client-spring-boot-samples - 2.3.2-SNAPSHOT + 2.3.6-SNAPSHOT rocketmq-v5-client-producer-demo diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-simple-demo/pom.xml b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-simple-demo/pom.xml index b6769604..e72039fb 100644 --- a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-simple-demo/pom.xml +++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-simple-demo/pom.xml @@ -22,7 +22,7 @@ org.apache.rocketmq rocketmq-v5-client-spring-boot-samples - 2.3.2-SNAPSHOT + 2.3.6-SNAPSHOT rocketmq-v5-client-producer-simple-demo diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java index 346311f2..85052dbd 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java @@ -46,6 +46,7 @@ import java.time.Duration; import java.util.Collections; +import java.util.Map; import java.util.Objects; @Configuration @@ -105,7 +106,6 @@ public SimpleConsumerBuilder simpleConsumerBuilder(RocketMQProperties rocketMQPr RocketMQProperties.SimpleConsumer simpleConsumer = rocketMQProperties.getSimpleConsumer(); final ClientServiceProvider provider = ClientServiceProvider.loadService(); String consumerGroup = simpleConsumer.getConsumerGroup(); - FilterExpression filterExpression = RocketMQUtil.createFilterExpression(simpleConsumer.getTag(), simpleConsumer.getFilterExpressionType()); ClientConfiguration clientConfiguration = RocketMQUtil.createConsumerClientConfiguration(simpleConsumer); SimpleConsumerBuilder simpleConsumerBuilder = provider.newSimpleConsumerBuilder() .setClientConfiguration(clientConfiguration); @@ -116,9 +116,16 @@ public SimpleConsumerBuilder simpleConsumerBuilder(RocketMQProperties rocketMQPr if (StringUtils.hasLength(consumerGroup)) { simpleConsumerBuilder.setConsumerGroup(consumerGroup); } + // Set the subscription for the consumer. - if (Objects.nonNull(filterExpression)) { - simpleConsumerBuilder.setSubscriptionExpressions(Collections.singletonMap(simpleConsumer.getTopic(), filterExpression)); + if (simpleConsumer.getFilterExpressionMap().isEmpty()) { + FilterExpression filterExpression = RocketMQUtil.createFilterExpression(simpleConsumer.getTag(), simpleConsumer.getFilterExpressionType()); + if (Objects.nonNull(filterExpression)) { + simpleConsumerBuilder.setSubscriptionExpressions(Collections.singletonMap(simpleConsumer.getTopic(), filterExpression)); + } + } else { + Map subscriptionExpressions = RocketMQUtil.createSubscriptionExpressions(simpleConsumer.getFilterExpressionMap()); + simpleConsumerBuilder.setSubscriptionExpressions(subscriptionExpressions); } return simpleConsumerBuilder; } diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java index 8f3d4941..8d961e5e 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java @@ -18,6 +18,8 @@ import org.springframework.boot.context.properties.ConfigurationProperties; +import java.util.Map; + @SuppressWarnings("WeakerAccess") @ConfigurationProperties(prefix = "rocketmq") public class RocketMQProperties { @@ -211,6 +213,11 @@ public static class SimpleConsumer { private String namespace = ""; + /** + * key is topic + */ + private Map filterExpressionMap; + public String getAccessKey() { return accessKey; } @@ -299,6 +306,14 @@ public void setNamespace(String namespace) { this.namespace = namespace; } + public Map getFilterExpressionMap() { + return filterExpressionMap; + } + + public void setFilterExpressionMap(Map filterExpressionMap) { + this.filterExpressionMap = filterExpressionMap; + } + @Override public String toString() { return "SimpleConsumer{" + @@ -315,4 +330,26 @@ public String toString() { } } + public static class FilterExpression { + private String tag; + + private String filterExpressionType; + + public String getTag() { + return tag; + } + + public void setTag(String tag) { + this.tag = tag; + } + + public String getFilterExpressionType() { + return filterExpressionType; + } + + public void setFilterExpressionType(String filterExpressionType) { + this.filterExpressionType = filterExpressionType; + } + } + } diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQUtil.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQUtil.java index 55f3948f..187f399c 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQUtil.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQUtil.java @@ -34,6 +34,8 @@ import java.nio.charset.Charset; import java.time.Duration; +import java.util.HashMap; +import java.util.Map; import java.util.Objects; public class RocketMQUtil { @@ -184,4 +186,14 @@ public static FilterExpression createFilterExpression(String tag, String type) { FilterExpression filterExpression = new FilterExpression(tag, filterExpressionType); return filterExpression; } + + public static Map createSubscriptionExpressions(Map map) { + Map subscriptionExpressions = new HashMap<>(); + map.forEach((topic, expression) -> { + FilterExpressionType filterExpressionType = "tag".equalsIgnoreCase(expression.getFilterExpressionType()) ? FilterExpressionType.TAG : FilterExpressionType.SQL92; + FilterExpression filterExpression = new FilterExpression(expression.getTag(), filterExpressionType); + subscriptionExpressions.put(topic, filterExpression); + }); + return subscriptionExpressions; + } } From 407551380b0f65ca34cecd01e92e0efabe9c5e33 Mon Sep 17 00:00:00 2001 From: panzhi33 Date: Thu, 12 Mar 2026 20:15:38 +0800 Subject: [PATCH 2/5] support simple consumer supports subscribing to multiple topics --- .../pom.xml | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/pom.xml b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/pom.xml index 98de13d6..af13654f 100644 --- a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/pom.xml +++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/pom.xml @@ -1,4 +1,20 @@ + From b62097b3b4fe709e8fedf3a00f6cb9e7dfbb4052 Mon Sep 17 00:00:00 2001 From: panzhi33 Date: Thu, 12 Mar 2026 21:09:26 +0800 Subject: [PATCH 3/5] support simple consumer supports subscribing to multiple topics --- .../src/main/resources/application.properties | 11 +++--- .../RocketMQAutoConfiguration.java | 4 +-- .../autoconfigure/RocketMQProperties.java | 34 ++++--------------- .../client/common/FilterExpression.java | 23 +++++++++++++ .../rocketmq/client/support/RocketMQUtil.java | 2 +- 5 files changed, 38 insertions(+), 36 deletions(-) create mode 100644 rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/common/FilterExpression.java diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/resources/application.properties b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/resources/application.properties index effdb118..327970f8 100644 --- a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/resources/application.properties +++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/resources/application.properties @@ -14,10 +14,11 @@ # limitations under the License. rocketmq.simple-consumer.endpoints=localhost:8081 -rocketmq.simple-consumer.consumer-group=demo-group -rocketmq.simple-consumer.filter-expression-map.demo-topic1.tag=tagA -rocketmq.simple-consumer.filter-expression-map.demo-topic1.filter-expression-type=tag -rocketmq.simple-consumer.filter-expression-map.demo-topic2.tag=tagB -rocketmq.simple-consumer.filter-expression-map.demo-topic2.filter-expression-type=tag +rocketmq.simple-consumer.consumer-group=localhost:8081 +rocketmq.simple-consumer.subscription-expressions.demo-topic.tag=tagA +rocketmq.simple-consumer.subscription-expressions.demo-topic.filter-expression-type=tag +rocketmq.simple-consumer.subscription-expressions.demo-topic2.tag=tagB +rocketmq.simple-consumer.subscription-expressions.demo-topic2.filter-expression-type=tag #rocketmq.simple-consumer.access-key= #rocketmq.simple-consumer.secret-key= + diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java index 85052dbd..fde2d72d 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java @@ -118,13 +118,13 @@ public SimpleConsumerBuilder simpleConsumerBuilder(RocketMQProperties rocketMQPr } // Set the subscription for the consumer. - if (simpleConsumer.getFilterExpressionMap().isEmpty()) { + if (simpleConsumer.getSubscriptionExpressions().isEmpty()) { FilterExpression filterExpression = RocketMQUtil.createFilterExpression(simpleConsumer.getTag(), simpleConsumer.getFilterExpressionType()); if (Objects.nonNull(filterExpression)) { simpleConsumerBuilder.setSubscriptionExpressions(Collections.singletonMap(simpleConsumer.getTopic(), filterExpression)); } } else { - Map subscriptionExpressions = RocketMQUtil.createSubscriptionExpressions(simpleConsumer.getFilterExpressionMap()); + Map subscriptionExpressions = RocketMQUtil.createSubscriptionExpressions(simpleConsumer.getSubscriptionExpressions()); simpleConsumerBuilder.setSubscriptionExpressions(subscriptionExpressions); } return simpleConsumerBuilder; diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java index 8d961e5e..d1f796e7 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.client.autoconfigure; +import org.apache.rocketmq.client.common.FilterExpression; import org.springframework.boot.context.properties.ConfigurationProperties; import java.util.Map; @@ -216,7 +217,7 @@ public static class SimpleConsumer { /** * key is topic */ - private Map filterExpressionMap; + private Map subscriptionExpressions; public String getAccessKey() { return accessKey; @@ -306,12 +307,12 @@ public void setNamespace(String namespace) { this.namespace = namespace; } - public Map getFilterExpressionMap() { - return filterExpressionMap; + public Map getSubscriptionExpressions() { + return subscriptionExpressions; } - public void setFilterExpressionMap(Map filterExpressionMap) { - this.filterExpressionMap = filterExpressionMap; + public void setSubscriptionExpressions(Map subscriptionExpressions) { + this.subscriptionExpressions = subscriptionExpressions; } @Override @@ -329,27 +330,4 @@ public String toString() { '}'; } } - - public static class FilterExpression { - private String tag; - - private String filterExpressionType; - - public String getTag() { - return tag; - } - - public void setTag(String tag) { - this.tag = tag; - } - - public String getFilterExpressionType() { - return filterExpressionType; - } - - public void setFilterExpressionType(String filterExpressionType) { - this.filterExpressionType = filterExpressionType; - } - } - } diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/common/FilterExpression.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/common/FilterExpression.java new file mode 100644 index 00000000..3898b59c --- /dev/null +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/common/FilterExpression.java @@ -0,0 +1,23 @@ +package org.apache.rocketmq.client.common; + +public class FilterExpression { + private String tag; + + private String filterExpressionType; + + public String getTag() { + return tag; + } + + public void setTag(String tag) { + this.tag = tag; + } + + public String getFilterExpressionType() { + return filterExpressionType; + } + + public void setFilterExpressionType(String filterExpressionType) { + this.filterExpressionType = filterExpressionType; + } + } \ No newline at end of file diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQUtil.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQUtil.java index 187f399c..33347717 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQUtil.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQUtil.java @@ -187,7 +187,7 @@ public static FilterExpression createFilterExpression(String tag, String type) { return filterExpression; } - public static Map createSubscriptionExpressions(Map map) { + public static Map createSubscriptionExpressions(Map map) { Map subscriptionExpressions = new HashMap<>(); map.forEach((topic, expression) -> { FilterExpressionType filterExpressionType = "tag".equalsIgnoreCase(expression.getFilterExpressionType()) ? FilterExpressionType.TAG : FilterExpressionType.SQL92; From ed638164109c071dc20f698e0ced39fd2b04decc Mon Sep 17 00:00:00 2001 From: panzhi33 Date: Thu, 12 Mar 2026 21:17:41 +0800 Subject: [PATCH 4/5] support simple consumer supports subscribing to multiple topics --- .../autoconfigure/RocketMQProperties.java | 23 ++++++++++++++++++- .../client/common/FilterExpression.java | 23 ------------------- .../rocketmq/client/support/RocketMQUtil.java | 2 +- 3 files changed, 23 insertions(+), 25 deletions(-) delete mode 100644 rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/common/FilterExpression.java diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java index d1f796e7..6e5be06c 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java @@ -16,7 +16,6 @@ */ package org.apache.rocketmq.client.autoconfigure; -import org.apache.rocketmq.client.common.FilterExpression; import org.springframework.boot.context.properties.ConfigurationProperties; import java.util.Map; @@ -330,4 +329,26 @@ public String toString() { '}'; } } + + public static class FilterExpression { + private String tag; + + private String filterExpressionType; + + public String getTag() { + return tag; + } + + public void setTag(String tag) { + this.tag = tag; + } + + public String getFilterExpressionType() { + return filterExpressionType; + } + + public void setFilterExpressionType(String filterExpressionType) { + this.filterExpressionType = filterExpressionType; + } + } } diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/common/FilterExpression.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/common/FilterExpression.java deleted file mode 100644 index 3898b59c..00000000 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/common/FilterExpression.java +++ /dev/null @@ -1,23 +0,0 @@ -package org.apache.rocketmq.client.common; - -public class FilterExpression { - private String tag; - - private String filterExpressionType; - - public String getTag() { - return tag; - } - - public void setTag(String tag) { - this.tag = tag; - } - - public String getFilterExpressionType() { - return filterExpressionType; - } - - public void setFilterExpressionType(String filterExpressionType) { - this.filterExpressionType = filterExpressionType; - } - } \ No newline at end of file diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQUtil.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQUtil.java index 33347717..187f399c 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQUtil.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQUtil.java @@ -187,7 +187,7 @@ public static FilterExpression createFilterExpression(String tag, String type) { return filterExpression; } - public static Map createSubscriptionExpressions(Map map) { + public static Map createSubscriptionExpressions(Map map) { Map subscriptionExpressions = new HashMap<>(); map.forEach((topic, expression) -> { FilterExpressionType filterExpressionType = "tag".equalsIgnoreCase(expression.getFilterExpressionType()) ? FilterExpressionType.TAG : FilterExpressionType.SQL92; From bc8cb83bb6528d3e5c9aa5231ae778dda3b059c1 Mon Sep 17 00:00:00 2001 From: panzhi33 Date: Fri, 3 Apr 2026 11:07:41 +0800 Subject: [PATCH 5/5] support simple consumer supports subscribing to multiple topics --- .../springboot/ExtRocketMQTemplate.java | 27 +++++++ .../V5SimpleConsumerConsumerApplication.java | 10 ++- .../src/main/resources/application.properties | 5 +- rocketmq-v5-client-spring-boot/pom.xml | 4 ++ .../ExtConsumerResetConfiguration.java | 24 +++++++ .../ExtConsumerResetConfiguration.java | 70 +++++++++++-------- .../RocketMQAutoConfiguration.java | 3 +- 7 files changed, 107 insertions(+), 36 deletions(-) create mode 100644 rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/java/org/apache/rocketmq/springboot/ExtRocketMQTemplate.java diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/java/org/apache/rocketmq/springboot/ExtRocketMQTemplate.java b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/java/org/apache/rocketmq/springboot/ExtRocketMQTemplate.java new file mode 100644 index 00000000..6911b9ef --- /dev/null +++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/java/org/apache/rocketmq/springboot/ExtRocketMQTemplate.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.springboot; + +import org.apache.rocketmq.client.annotation.ExtConsumerResetConfiguration; +import org.apache.rocketmq.client.core.RocketMQClientTemplate; + +@ExtConsumerResetConfiguration(subscriptionExpressions = { + @ExtConsumerResetConfiguration.FilterExpression(topic = "demo-topic", tag = "tagA", filterExpressionType = "tag"), + @ExtConsumerResetConfiguration.FilterExpression(topic = "demo-topic2", tag = "tagB", filterExpressionType = "tag") +}) +public class ExtRocketMQTemplate extends RocketMQClientTemplate { +} \ No newline at end of file diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/java/org/apache/rocketmq/springboot/V5SimpleConsumerConsumerApplication.java b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/java/org/apache/rocketmq/springboot/V5SimpleConsumerConsumerApplication.java index 35b4b579..bd933772 100644 --- a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/java/org/apache/rocketmq/springboot/V5SimpleConsumerConsumerApplication.java +++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/java/org/apache/rocketmq/springboot/V5SimpleConsumerConsumerApplication.java @@ -32,14 +32,20 @@ public class V5SimpleConsumerConsumerApplication implements CommandLineRunner { @Resource private RocketMQClientTemplate rocketMQClientTemplate; + @Resource + private ExtRocketMQTemplate extRocketMQTemplate; + public static void main(String[] args) { SpringApplication.run(V5SimpleConsumerConsumerApplication.class, args); } @Override public void run(String... args) throws Exception { - for (int i = 0; i < 10; i++) { - List messageList = rocketMQClientTemplate.receive(10, Duration.ofSeconds(60)); + while (true){ + List messageList = extRocketMQTemplate.receive(10, Duration.ofSeconds(10)); + System.out.println(messageList); + + messageList = rocketMQClientTemplate.receive(10, Duration.ofSeconds(10)); System.out.println(messageList); } } diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/resources/application.properties b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/resources/application.properties index 327970f8..d625fdb4 100644 --- a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/resources/application.properties +++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/resources/application.properties @@ -13,12 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -rocketmq.simple-consumer.endpoints=localhost:8081 -rocketmq.simple-consumer.consumer-group=localhost:8081 +rocketmq.simple-consumer.endpoints=localhost:8080 +rocketmq.simple-consumer.consumer-group=test-group rocketmq.simple-consumer.subscription-expressions.demo-topic.tag=tagA rocketmq.simple-consumer.subscription-expressions.demo-topic.filter-expression-type=tag rocketmq.simple-consumer.subscription-expressions.demo-topic2.tag=tagB rocketmq.simple-consumer.subscription-expressions.demo-topic2.filter-expression-type=tag #rocketmq.simple-consumer.access-key= #rocketmq.simple-consumer.secret-key= +#rocketmq.simple-consumer.namespace= diff --git a/rocketmq-v5-client-spring-boot/pom.xml b/rocketmq-v5-client-spring-boot/pom.xml index 202b7256..e4d910af 100644 --- a/rocketmq-v5-client-spring-boot/pom.xml +++ b/rocketmq-v5-client-spring-boot/pom.xml @@ -93,6 +93,10 @@ junit test + + com.alibaba + fastjson + diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java index f0b942f5..6212735b 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java @@ -91,4 +91,28 @@ * The namespace of consumer. */ String namespace() default ""; + + /** + * subscribing to multiple topics + */ + FilterExpression[] subscriptionExpressions() default {}; + + @Retention(RetentionPolicy.RUNTIME) + @Target({}) + @interface FilterExpression { + /** + * Topic name of consumer. + */ + String topic(); + + /** + * Tag of consumer. + */ + String tag() default "*"; + + /** + * The type of filter expression + */ + String filterExpressionType() default "tag"; + } } diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java index a501e64e..46bb399b 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.client.autoconfigure; +import com.alibaba.fastjson.JSON; import org.apache.rocketmq.client.support.RocketMQMessageConverter; import org.apache.rocketmq.client.support.RocketMQUtil; import org.apache.rocketmq.client.apis.ClientConfiguration; @@ -40,7 +41,7 @@ import org.springframework.util.StringUtils; import java.time.Duration; -import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; @@ -110,30 +111,44 @@ private SimpleConsumerInfo createConsumer( SimpleConsumerBuilder simpleConsumerBuilder) { RocketMQProperties.SimpleConsumer simpleConsumer = rocketMQProperties.getSimpleConsumer(); String consumerGroupName = resolvePlaceholders(annotation.consumerGroup(), simpleConsumer.getConsumerGroup()); - String topicName = resolvePlaceholders(annotation.topic(), simpleConsumer.getTopic()); String accessKey = resolvePlaceholders(annotation.accessKey(), simpleConsumer.getAccessKey()); String secretKey = resolvePlaceholders(annotation.secretKey(), simpleConsumer.getSecretKey()); String endPoints = resolvePlaceholders(annotation.endpoints(), simpleConsumer.getEndpoints()); String namespace = resolvePlaceholders(annotation.namespace(), simpleConsumer.getNamespace()); - String tag = resolvePlaceholders(annotation.tag(), simpleConsumer.getTag()); - String filterExpressionType = resolvePlaceholders(annotation.filterExpressionType(), simpleConsumer.getFilterExpressionType()); Duration requestTimeout = Duration.ofSeconds(annotation.requestTimeout()); int awaitDuration = annotation.awaitDuration(); Boolean sslEnabled = simpleConsumer.isSslEnabled(); - Assert.hasText(topicName, "[topic] must not be null"); ClientConfiguration clientConfiguration = RocketMQUtil.createClientConfiguration(accessKey, secretKey, endPoints, requestTimeout, sslEnabled, namespace); - FilterExpression filterExpression = RocketMQUtil.createFilterExpression(tag, filterExpressionType); Duration duration = Duration.ofSeconds(awaitDuration); simpleConsumerBuilder.setClientConfiguration(clientConfiguration); if (StringUtils.hasLength(consumerGroupName)) { simpleConsumerBuilder.setConsumerGroup(consumerGroupName); } simpleConsumerBuilder.setAwaitDuration(duration); - if (Objects.nonNull(filterExpression)) { - simpleConsumerBuilder.setSubscriptionExpressions(Collections.singletonMap(topicName, filterExpression)); + + Map subscriptionExpressions = new HashMap<>(); + org.apache.rocketmq.client.annotation.ExtConsumerResetConfiguration.FilterExpression[] filterExpressions = annotation.subscriptionExpressions(); + if (filterExpressions.length > 0) { + for (org.apache.rocketmq.client.annotation.ExtConsumerResetConfiguration.FilterExpression expression : filterExpressions) { + Assert.hasText(expression.topic(), "[topic] must not be null"); + FilterExpression filterExpression = RocketMQUtil.createFilterExpression(expression.tag(), expression.filterExpressionType()); + if (Objects.nonNull(filterExpression)) { + subscriptionExpressions.put(expression.topic(), filterExpression); + } + } + } else { + String topicName = resolvePlaceholders(annotation.topic(), simpleConsumer.getTopic()); + Assert.hasText(topicName, "[topic] must not be null"); + String tag = resolvePlaceholders(annotation.tag(), simpleConsumer.getTag()); + String filterExpressionType = resolvePlaceholders(annotation.filterExpressionType(), simpleConsumer.getFilterExpressionType()); + FilterExpression filterExpression = RocketMQUtil.createFilterExpression(tag, filterExpressionType); + if (Objects.nonNull(filterExpression)) { + subscriptionExpressions.put(topicName, filterExpression); + } } + simpleConsumerBuilder.setSubscriptionExpressions(subscriptionExpressions); - return new SimpleConsumerInfo(consumerGroupName, topicName, endPoints, namespace, tag, filterExpressionType, requestTimeout, awaitDuration, sslEnabled); + return new SimpleConsumerInfo(consumerGroupName, endPoints, namespace, requestTimeout, awaitDuration, sslEnabled, subscriptionExpressions); } private String resolvePlaceholders(String text, String defaultValue) { @@ -144,47 +159,40 @@ private String resolvePlaceholders(String text, String defaultValue) { static class SimpleConsumerInfo { String consumerGroup; - String topicName; - String endPoints; String namespace; - String tag; - - String filterExpressionType; - Duration requestTimeout; int awaitDuration; Boolean sslEnabled; - public SimpleConsumerInfo(String consumerGroupName, String topicName, String endPoints, String namespace, - String tag, String filterExpressionType, Duration requestTimeout, int awaitDuration, Boolean sslEnabled) { + Map subscriptionExpressions; + + public SimpleConsumerInfo(String consumerGroupName, String endPoints, String namespace, Duration requestTimeout, + int awaitDuration, Boolean sslEnabled, Map subscriptionExpressions) { this.consumerGroup = consumerGroupName; - this.topicName = topicName; this.endPoints = endPoints; this.namespace = namespace; - this.tag = tag; - this.filterExpressionType = filterExpressionType; this.requestTimeout = requestTimeout; this.awaitDuration = awaitDuration; this.sslEnabled = sslEnabled; + this.subscriptionExpressions = subscriptionExpressions; } - @Override public String toString() { + @Override + public String toString() { return "SimpleConsumerInfo{" + - "consumerGroup='" + consumerGroup + '\'' + - ", topicName='" + topicName + '\'' + - ", endPoints='" + endPoints + '\'' + - ", namespace='" + namespace + '\'' + - ", tag='" + tag + '\'' + - ", filterExpressionType='" + filterExpressionType + '\'' + - ", requestTimeout(seconds)=" + requestTimeout.getSeconds() + - ", awaitDuration=" + awaitDuration + - ", sslEnabled=" + sslEnabled + - '}'; + "consumerGroup='" + consumerGroup + '\'' + + ", endPoints='" + endPoints + '\'' + + ", namespace='" + namespace + '\'' + + ", requestTimeout=" + requestTimeout + + ", awaitDuration=" + awaitDuration + + ", sslEnabled=" + sslEnabled + + ", subscriptionExpressions=" + JSON.toJSONString(subscriptionExpressions) + + '}'; } } } diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java index fde2d72d..83748903 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java @@ -42,6 +42,7 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; import org.springframework.util.Assert; +import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; import java.time.Duration; @@ -118,7 +119,7 @@ public SimpleConsumerBuilder simpleConsumerBuilder(RocketMQProperties rocketMQPr } // Set the subscription for the consumer. - if (simpleConsumer.getSubscriptionExpressions().isEmpty()) { + if (CollectionUtils.isEmpty(simpleConsumer.getSubscriptionExpressions())) { FilterExpression filterExpression = RocketMQUtil.createFilterExpression(simpleConsumer.getTag(), simpleConsumer.getFilterExpressionType()); if (Objects.nonNull(filterExpression)) { simpleConsumerBuilder.setSubscriptionExpressions(Collections.singletonMap(simpleConsumer.getTopic(), filterExpression));