From 73671fa71961127c46776b10fe0272f7c0e10aeb Mon Sep 17 00:00:00 2001 From: Quan Date: Wed, 1 Jul 2026 18:04:38 +0800 Subject: [PATCH] [ISSUE #10569] Support Lite Simple Consumer on server side - Broker: add PopLiteLongPollingService to NotificationProcessor for lite consumer notification polling - Broker: LiteEventDispatcher notifies both PopLiteMessageProcessor and NotificationProcessor polling services - Broker: add hasEvents(clientId) API to LiteEventDispatcher for message availability check - Proxy: unify lite/normal pop paths into single popMessage call, route by ProxyContext.isLiteConsumer() - Proxy: register LITE_SIMPLE_CONSUMER in ClientActivity and GrpcClientSettingsManager lifecycle - Proxy: pass liteTopic property only for lite consumers in filter and response writer - Proxy: remove standalone popLiteMessage from MessagingProcessor/ConsumerProcessor - Remoting: add isLiteConsumer and clientId fields to NotificationRequestHeader --- .../rocketmq/broker/BrokerController.java | 2 + .../broker/lite/LiteEventDispatcher.java | 31 +++- .../processor/NotificationProcessor.java | 30 +++- .../broker/lite/LiteEventDispatcherTest.java | 3 + .../proxy/common/ContextVariable.java | 1 + .../rocketmq/proxy/common/ProxyContext.java | 15 ++ .../proxy/grpc/v2/client/ClientActivity.java | 7 +- .../v2/common/GrpcClientSettingsManager.java | 13 +- .../v2/consumer/ReceiveMessageActivity.java | 69 ++++----- .../ReceiveMessageResponseStreamWriter.java | 4 +- .../proxy/processor/ConsumerProcessor.java | 141 ++++++------------ .../processor/DefaultMessagingProcessor.java | 10 -- .../proxy/processor/MessagingProcessor.java | 14 -- .../consumer/ReceiveMessageActivityTest.java | 4 +- .../processor/ConsumerProcessorTest.java | 4 +- .../header/NotificationRequestHeader.java | 23 +++ 16 files changed, 184 insertions(+), 187 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 8e2954d8ff0..91f281e5e10 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -1594,6 +1594,7 @@ protected void shutdownBasicService() { if (this.notificationProcessor != null) { this.notificationProcessor.getPopLongPollingService().shutdown(); + this.notificationProcessor.getPopLiteLongPollingService().shutdown(); } if (this.consumerIdsChangeListener != null) { @@ -1890,6 +1891,7 @@ protected void startBasicService() throws Exception { if (this.notificationProcessor != null) { this.notificationProcessor.getPopLongPollingService().start(); + this.notificationProcessor.getPopLiteLongPollingService().start(); } if (this.popConsumerService != null) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/lite/LiteEventDispatcher.java b/broker/src/main/java/org/apache/rocketmq/broker/lite/LiteEventDispatcher.java index 2bd1f36186b..798077f4461 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/lite/LiteEventDispatcher.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/lite/LiteEventDispatcher.java @@ -119,7 +119,7 @@ protected void doDispatch(String group, String lmqName, String excludeClientId) * If there are multiple clients, randomly select one and consider fallback options * Try to avoid dispatching to the excluded one but fallback if no other choice. * - * @param clients all clients of one group + * @param clients all clients of one group * @param excludeClientId the client ID to exclude from selection, probably consuming blocked. * @return true if dispatched to one client */ @@ -160,8 +160,7 @@ public boolean selectAndDispatch(String lmqName, List clients, Stri } } if (selectedClient != null) { - this.brokerController.getPopLiteMessageProcessor().getPopLiteLongPollingService() - .notifyMessageArriving(selectedClient, true, 0, group); + notifyMessageArriving(selectedClient, group); } else if (isWildcardGroup) { // no one available in this group, so schedule a full dispatch once scheduleFullDispatchForWildcardGroup(group, brokerController.getBrokerConfig().getLiteEventFullDispatchDelayTimeForWildcardGroup()); @@ -236,8 +235,7 @@ public void doFullDispatchForClient(String clientId, String group) { } if (eventSet.offer(lmqName)) { if (count++ % 10 == 0) { - brokerController.getPopLiteMessageProcessor().getPopLiteLongPollingService() - .notifyMessageArriving(clientId, true, 0, group); + notifyMessageArriving(clientId, group); } } else { LOGGER.warn("client event set full again, wait another period. {}, {}", clientId, isActiveConsuming); @@ -246,11 +244,30 @@ public void doFullDispatchForClient(String clientId, String group) { break; } } - brokerController.getPopLiteMessageProcessor().getPopLiteLongPollingService() - .notifyMessageArriving(clientId, true, 0, group); + notifyMessageArriving(clientId, group); LOGGER.info("client full dispatch finish. {}, dispatch:{}", clientId, count); } + private void notifyMessageArriving(String clientId, String group) { + brokerController.getPopLiteMessageProcessor() + .getPopLiteLongPollingService() + .notifyMessageArriving(clientId, true, 0, group); + brokerController.getNotificationProcessor() + .getPopLiteLongPollingService() + .notifyMessageArriving(clientId, true, 0, group); + } + + /** + * Check whether a client has any events in the event queue. + * + * @param clientId the client ID to check + * @return true if the client has events, false otherwise + */ + public boolean hasEvents(String clientId) { + ClientEventSet eventSet = clientEventMap.get(clientId); + return eventSet != null && eventSet.size() > 0; + } + /** * Perform a full dispatch for wildcard group which was previously marked for a delayed full dispatch. * It iterates through all LMQ topics in CQ table, so it may be a heavy work. diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java index 5217861565b..20c2f8880e9 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java @@ -27,6 +27,7 @@ import org.apache.rocketmq.broker.filter.ExpressionMessageFilter; import org.apache.rocketmq.broker.longpolling.PollingHeader; import org.apache.rocketmq.broker.longpolling.PollingResult; +import org.apache.rocketmq.broker.longpolling.PopLiteLongPollingService; import org.apache.rocketmq.broker.longpolling.PopLongPollingService; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.KeyBuilder; @@ -59,11 +60,13 @@ public class NotificationProcessor implements NettyRequestProcessor { private final BrokerController brokerController; private final Random random = new Random(System.currentTimeMillis()); private final PopLongPollingService popLongPollingService; + private final PopLiteLongPollingService popLiteLongPollingService; private static final String BORN_TIME = "bornTime"; public NotificationProcessor(final BrokerController brokerController) { this.brokerController = brokerController; this.popLongPollingService = new PopLongPollingService(brokerController, this, true); + this.popLiteLongPollingService = new PopLiteLongPollingService(brokerController, this, false); } public void shutdown() throws Exception { @@ -140,6 +143,8 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup()); return response; } + + boolean isLiteConsumer = requestHeader.isLiteConsumer(); int randomQ = random.nextInt(100); boolean hasMsg = false; BrokerConfig brokerConfig = brokerController.getBrokerConfig(); @@ -178,15 +183,18 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, } } - if (requestHeader.getQueueId() < 0) { + if (isLiteConsumer) { + hasMsg = hasMsgForLiteConsumer(requestHeader.getClientId()); + } else if (requestHeader.getQueueId() < 0) { // read all queue hasMsg = hasMsgFromTopic(topicConfig, randomQ, requestHeader, subscriptionData, messageFilter); } else { int queueId = requestHeader.getQueueId(); hasMsg = hasMsgFromQueue(topicConfig.getTopicName(), requestHeader, queueId, subscriptionData, messageFilter); } - // if it doesn't have message, fetch retry - if (!hasMsg) { + + // if it doesn't have message, fetch retry. Lite topic has no retry + if (!isLiteConsumer && !hasMsg) { String retryTopic = KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2()); hasMsg = hasMsgFromTopic(retryTopic, randomQ, requestHeader, null, null); if (!hasMsg && brokerConfig.isEnableRetryTopicV2() && brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) { @@ -196,7 +204,13 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, } if (!hasMsg) { - PollingResult pollingResult = popLongPollingService.polling(ctx, request, new PollingHeader(requestHeader), subscriptionData, messageFilter); + PollingResult pollingResult; + if (isLiteConsumer) { + pollingResult = popLiteLongPollingService.polling(ctx, request, requestHeader.getBornTime(), + requestHeader.getPollTime(), requestHeader.getClientId(), requestHeader.getConsumerGroup()); + } else { + pollingResult = popLongPollingService.polling(ctx, request, new PollingHeader(requestHeader), subscriptionData, messageFilter); + } if (pollingResult == PollingResult.POLLING_SUC) { return null; } else if (pollingResult == PollingResult.POLLING_FULL) { @@ -208,6 +222,10 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, return response; } + private boolean hasMsgForLiteConsumer(String clientId) { + return brokerController.getLiteEventDispatcher().hasEvents(clientId); + } + private boolean hasMsgFromTopic(String topicName, int randomQ, NotificationRequestHeader requestHeader, SubscriptionData subscriptionData, MessageFilter messageFilter) throws RemotingCommandException { TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topicName); @@ -295,4 +313,8 @@ private long getPopOffset(String topic, String cid, int queueId) { public PopLongPollingService getPopLongPollingService() { return popLongPollingService; } + + public PopLiteLongPollingService getPopLiteLongPollingService() { + return popLiteLongPollingService; + } } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/lite/LiteEventDispatcherTest.java b/broker/src/test/java/org/apache/rocketmq/broker/lite/LiteEventDispatcherTest.java index 84aec24829f..f96e5cb80de 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/lite/LiteEventDispatcherTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/lite/LiteEventDispatcherTest.java @@ -19,6 +19,7 @@ import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.offset.ConsumerOffsetManager; +import org.apache.rocketmq.broker.processor.NotificationProcessor; import org.apache.rocketmq.broker.processor.PopLiteMessageProcessor; import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager; import org.apache.rocketmq.common.BrokerConfig; @@ -95,7 +96,9 @@ public void setUp() { liteEventDispatcher = new LiteEventDispatcher(brokerController, liteSubscriptionRegistry, liteLifecycleManager); PopLiteMessageProcessor popLiteMessageProcessor = new PopLiteMessageProcessor(brokerController, liteEventDispatcher); + NotificationProcessor notificationProcessor = new NotificationProcessor(brokerController); when(brokerController.getPopLiteMessageProcessor()).thenReturn(popLiteMessageProcessor); + when(brokerController.getNotificationProcessor()).thenReturn(notificationProcessor); } @Test diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ContextVariable.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ContextVariable.java index 93b4eacd8ad..d76c38130cd 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ContextVariable.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ContextVariable.java @@ -28,4 +28,5 @@ public class ContextVariable { public static final String ACTION = "action"; public static final String PROTOCOL_TYPE = "protocol-type"; public static final String NAMESPACE = "namespace"; + public static final String CLIENT_TYPE = "client-type"; } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ProxyContext.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ProxyContext.java index a2ab892a071..111c9a25fa2 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ProxyContext.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ProxyContext.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.proxy.common; +import apache.rocketmq.v2.ClientType; import io.netty.channel.Channel; import java.util.HashMap; import java.util.Map; @@ -141,4 +142,18 @@ public String getNamespace() { return this.getVal(ContextVariable.NAMESPACE); } + public void setClientType(String clientType) { + this.withVal(ContextVariable.CLIENT_TYPE, clientType); + } + + public String getClientType() { + return this.getVal(ContextVariable.CLIENT_TYPE); + } + + public boolean isLiteConsumer() { + String clientType = this.getClientType(); + return ClientType.LITE_PUSH_CONSUMER.name().equals(clientType) + || ClientType.LITE_SIMPLE_CONSUMER.name().equals(clientType); + } + } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java index 13287f47c38..b674d448c06 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java @@ -116,7 +116,8 @@ public CompletableFuture heartbeat(ProxyContext ctx, Heartbea } case PUSH_CONSUMER: case LITE_PUSH_CONSUMER: - case SIMPLE_CONSUMER: { + case SIMPLE_CONSUMER: + case LITE_SIMPLE_CONSUMER: { validateConsumerGroup(request.getGroup()); String consumerGroup = request.getGroup().getName(); this.registerConsumer(ctx, consumerGroup, clientSettings.getClientType(), clientSettings.getSubscription().getSubscriptionsList(), false); @@ -168,6 +169,7 @@ public CompletableFuture notifyClientTerminatio case PUSH_CONSUMER: case LITE_PUSH_CONSUMER: case SIMPLE_CONSUMER: + case LITE_SIMPLE_CONSUMER: validateConsumerGroup(request.getGroup()); String consumerGroup = request.getGroup().getName(); GrpcClientChannel channel = this.grpcChannelManager.removeChannel(clientId); @@ -516,6 +518,7 @@ protected ConsumeMessageDirectlyResult buildConsumeMessageDirectlyResult(Status protected ConsumeType buildConsumeType(ClientType clientType) { switch (clientType) { case SIMPLE_CONSUMER: + case LITE_SIMPLE_CONSUMER: return ConsumeType.CONSUME_ACTIVELY; case PUSH_CONSUMER: case LITE_PUSH_CONSUMER: @@ -526,7 +529,7 @@ protected ConsumeType buildConsumeType(ClientType clientType) { } protected MessageModel buildMessageModel(ClientType clientType) { - if (clientType == ClientType.LITE_PUSH_CONSUMER) { + if (ClientType.LITE_PUSH_CONSUMER == clientType || ClientType.LITE_SIMPLE_CONSUMER == clientType) { return MessageModel.LITE_SELECTIVE; } return MessageModel.CLUSTERING; diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java index 75cac21be4a..ac87da8c244 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java @@ -228,8 +228,12 @@ public String getServiceName() { public void offlineClientLiteSubscription(ProxyContext ctx, String clientId, Settings settings) { if (settings == null) { settings = getRawClientSettings(clientId); + if (settings == null) { + return; + } } - if (settings == null || ClientType.LITE_PUSH_CONSUMER != settings.getClientType()) { + if (ClientType.LITE_PUSH_CONSUMER != settings.getClientType() + && ClientType.LITE_SIMPLE_CONSUMER != settings.getClientType()) { return; } try { @@ -266,9 +270,10 @@ protected void onWaitEnd() { for (String clientId : clientIdSet) { try { CLIENT_SETTINGS_MAP.computeIfPresent(clientId, (clientIdKey, settings) -> { - if (!settings.getClientType().equals(ClientType.PUSH_CONSUMER) && - !settings.getClientType().equals(ClientType.SIMPLE_CONSUMER) && - !settings.getClientType().equals(ClientType.LITE_PUSH_CONSUMER)) { + if (ClientType.PUSH_CONSUMER != settings.getClientType() && + ClientType.SIMPLE_CONSUMER != settings.getClientType() && + ClientType.LITE_PUSH_CONSUMER != settings.getClientType() && + ClientType.LITE_SIMPLE_CONSUMER != settings.getClientType()) { return settings; } String consumerGroup = settings.getSubscription().getGroup().getName(); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java index becf2c2165d..39f2995d6fc 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java @@ -16,7 +16,6 @@ */ package org.apache.rocketmq.proxy.grpc.v2.consumer; -import apache.rocketmq.v2.ClientType; import apache.rocketmq.v2.Code; import apache.rocketmq.v2.FilterExpression; import apache.rocketmq.v2.ReceiveMessageRequest; @@ -65,7 +64,7 @@ public void receiveMessage(ProxyContext ctx, ReceiveMessageRequest request, try { Settings settings = this.grpcClientSettingsManager.getClientSettings(ctx); - final boolean isLite = ClientType.LITE_PUSH_CONSUMER.equals(settings.getClientType()); + ctx.setClientType(settings.getClientType().name()); Subscription subscription = settings.getSubscription(); boolean fifo = subscription.getFifo(); @@ -123,9 +122,7 @@ public void receiveMessage(ProxyContext ctx, ReceiveMessageRequest request, return; } - CompletableFuture popFuture; - if (isLite) { - + if (ctx.isLiteConsumer()) { GrpcClientChannel clientChannel = grpcChannelManager.getChannel(ctx.getClientID()); if (clientChannel == null) { writer.writeAndComplete(ctx, Code.BAD_REQUEST, @@ -140,42 +137,26 @@ public void receiveMessage(ProxyContext ctx, ReceiveMessageRequest request, ctx.getClientID(), unackedMessageCount)); return; } - - popFuture = this.messagingProcessor.popLiteMessage( - ctx, - new ReceiveMessageQueueSelector( - request.getMessageQueue().getBroker().getName() - ), - group, - topic, - request.getBatchSize(), - actualInvisibleTime, - pollingTime, - subscriptionData, - new PopMessageResultFilterImpl(maxAttempts), - request.hasAttemptId() ? request.getAttemptId() : null, - timeRemaining - ); - } else { - popFuture = this.messagingProcessor.popMessage( - ctx, - new ReceiveMessageQueueSelector( - request.getMessageQueue().getBroker().getName() - ), - group, - topic, - request.getBatchSize(), - actualInvisibleTime, - pollingTime, - ConsumeInitMode.MAX, - subscriptionData, - fifo, - new PopMessageResultFilterImpl(maxAttempts), - request.hasAttemptId() ? request.getAttemptId() : null, - timeRemaining - ); } + CompletableFuture popFuture = this.messagingProcessor.popMessage( + ctx, + new ReceiveMessageQueueSelector( + request.getMessageQueue().getBroker().getName() + ), + group, + topic, + request.getBatchSize(), + actualInvisibleTime, + pollingTime, + ConsumeInitMode.MAX, + subscriptionData, + fifo, + new PopMessageResultFilterImpl(maxAttempts), + request.hasAttemptId() ? request.getAttemptId() : null, + timeRemaining + ); + final boolean autoRenew = proxyConfig.isEnableProxyAutoRenew() && request.getAutoRenew(); popFuture.thenAccept(popResult -> { Runnable doAfterWrite = null; @@ -208,14 +189,16 @@ private Runnable handleAutoRenew(ProxyContext ctx, ReceiveMessageRequest request throw e; } return () -> { + boolean isLiteConsumer = ctx.isLiteConsumer(); List messageExtList = popResult.getMsgFoundList(); for (MessageExt messageExt : messageExtList) { String receiptHandle = messageExt.getProperty(MessageConst.PROPERTY_POP_CK); if (receiptHandle != null) { - MessageReceiptHandle messageReceiptHandle = - new MessageReceiptHandle(group, topic, messageExt.getQueueId(), receiptHandle, messageExt.getMsgId(), - messageExt.getQueueOffset(), messageExt.getReconsumeTimes(), - messageExt.getProperty(MessageConst.PROPERTY_LITE_TOPIC)); + // lite topic can be consumed by normal consumer + String liteTopic = isLiteConsumer ? messageExt.getProperty(MessageConst.PROPERTY_LITE_TOPIC) : null; + MessageReceiptHandle messageReceiptHandle = new MessageReceiptHandle(group, topic, + messageExt.getQueueId(), receiptHandle, messageExt.getMsgId(), messageExt.getQueueOffset(), + messageExt.getReconsumeTimes(), liteTopic); messagingProcessor.addReceiptHandle(ctx, clientChannel, group, messageExt.getMsgId(), messageReceiptHandle); } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageResponseStreamWriter.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageResponseStreamWriter.java index 69bd2a6bc4e..78a5f80bf10 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageResponseStreamWriter.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageResponseStreamWriter.java @@ -129,7 +129,7 @@ protected void processThrowableWhenWriteMessage(Throwable throwable, if (handle == null) { return; } - + String liteTopic = ctx.isLiteConsumer() ? messageExt.getProperty(MessageConst.PROPERTY_LITE_TOPIC) : null; this.messagingProcessor.changeInvisibleTime( ctx, ReceiptHandle.decode(handle), @@ -137,7 +137,7 @@ protected void processThrowableWhenWriteMessage(Throwable throwable, request.getGroup().getName(), request.getMessageQueue().getTopic().getName(), NACK_INVISIBLE_TIME, - null, + liteTopic, MessagingProcessor.DEFAULT_TIMEOUT_MILLS, true ); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java index b66d57c62a6..f77f269274b 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java @@ -128,26 +128,46 @@ public CompletableFuture popMessage( maxMsgNums = ProxyUtils.MAX_MSG_NUMS_FOR_POP_REQUEST; } - PopMessageRequestHeader requestHeader = new PopMessageRequestHeader(); - requestHeader.setConsumerGroup(consumerGroup); - requestHeader.setTopic(topic); - requestHeader.setQueueId(messageQueue.getQueueId()); - requestHeader.setMaxMsgNums(maxMsgNums); - requestHeader.setInvisibleTime(invisibleTime); - requestHeader.setPollTime(pollTime); - requestHeader.setInitMode(initMode); - requestHeader.setExpType(subscriptionData.getExpressionType()); - requestHeader.setExp(subscriptionData.getSubString()); - requestHeader.setOrder(fifo); - requestHeader.setAttemptId(attemptId); - - future = this.serviceManager.getMessageService().popMessage( - ctx, - messageQueue, - requestHeader, - timeoutMillis) - .thenApplyAsync(popResult -> filterPopResult(ctx, popResult, - requestHeader, consumerGroup, topic, subscriptionData, popMessageResultFilter), this.executor); + if (ctx.isLiteConsumer()) { + PopLiteMessageRequestHeader requestHeader = new PopLiteMessageRequestHeader(); + requestHeader.setClientId(ctx.getClientID()); + requestHeader.setConsumerGroup(consumerGroup); + requestHeader.setTopic(topic); + requestHeader.setMaxMsgNum(maxMsgNums); + requestHeader.setInvisibleTime(invisibleTime); + requestHeader.setPollTime(pollTime); + requestHeader.setAttemptId(attemptId); + requestHeader.setBornTime(System.currentTimeMillis()); + + future = this.serviceManager.getMessageService().popLiteMessage( + ctx, + messageQueue, + requestHeader, + timeoutMillis) + .thenApplyAsync(popResult -> filterPopResult(ctx, popResult, + requestHeader, consumerGroup, topic, subscriptionData, popMessageResultFilter), this.executor); + } else { + PopMessageRequestHeader requestHeader = new PopMessageRequestHeader(); + requestHeader.setConsumerGroup(consumerGroup); + requestHeader.setTopic(topic); + requestHeader.setQueueId(messageQueue.getQueueId()); + requestHeader.setMaxMsgNums(maxMsgNums); + requestHeader.setInvisibleTime(invisibleTime); + requestHeader.setPollTime(pollTime); + requestHeader.setInitMode(initMode); + requestHeader.setExpType(subscriptionData.getExpressionType()); + requestHeader.setExp(subscriptionData.getSubString()); + requestHeader.setOrder(fifo); + requestHeader.setAttemptId(attemptId); + + future = this.serviceManager.getMessageService().popMessage( + ctx, + messageQueue, + requestHeader, + timeoutMillis) + .thenApplyAsync(popResult -> filterPopResult(ctx, popResult, + requestHeader, consumerGroup, topic, subscriptionData, popMessageResultFilter), this.executor); + } } catch (Throwable t) { future.completeExceptionally(t); } @@ -173,8 +193,7 @@ private PopResult filterPopResult(ProxyContext ctx, PopResult popResult, Command } MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_POP_CK, handleString); - String liteTopic = messageExt.getProperty(MessageConst.PROPERTY_LITE_TOPIC); - + String liteTopic = ctx.isLiteConsumer() ? messageExt.getProperty(MessageConst.PROPERTY_LITE_TOPIC) : null; PopMessageResultFilter.FilterResult filterResult = popMessageResultFilter.filterMessage(ctx, consumerGroup, subscriptionData, messageExt); switch (filterResult) { @@ -185,8 +204,7 @@ private PopResult filterPopResult(ProxyContext ctx, PopResult popResult, Command messageExt.getMsgId(), consumerGroup, topic, - liteTopic, - MessagingProcessor.DEFAULT_TIMEOUT_MILLS); + liteTopic); break; case TO_DLQ: this.messagingProcessor.forwardMessageToDeadLetterQueue( @@ -195,8 +213,7 @@ private PopResult filterPopResult(ProxyContext ctx, PopResult popResult, Command messageExt.getMsgId(), consumerGroup, topic, - liteTopic, - MessagingProcessor.DEFAULT_TIMEOUT_MILLS); + liteTopic); break; case TO_RETURN: this.messagingProcessor.changeInvisibleTime( @@ -225,78 +242,6 @@ private PopResult filterPopResult(ProxyContext ctx, PopResult popResult, Command return popResult; } - public CompletableFuture popLiteMessage( - ProxyContext ctx, - QueueSelector queueSelector, - String consumerGroup, - String topic, - int maxMsgNums, - long invisibleTime, - long pollTime, - SubscriptionData subscriptionData, - PopMessageResultFilter popMessageResultFilter, - String attemptId, - long timeoutMillis - ) { - CompletableFuture future = new CompletableFuture<>(); - try { - AddressableMessageQueue messageQueue = queueSelector.select(ctx, - this.serviceManager.getTopicRouteService().getCurrentMessageQueueView(ctx, topic)); - if (messageQueue == null) { - throw new ProxyException(ProxyExceptionCode.FORBIDDEN, "no readable queue"); - } - return doPopLiteMessage(ctx, messageQueue, consumerGroup, topic, maxMsgNums, invisibleTime, pollTime, - subscriptionData, popMessageResultFilter, attemptId, timeoutMillis); - } catch (Throwable t) { - future.completeExceptionally(t); - } - return future; - } - - private CompletableFuture doPopLiteMessage( - ProxyContext ctx, - AddressableMessageQueue messageQueue, - String consumerGroup, - String topic, - int maxMsgNums, - long invisibleTime, - long pollTime, - SubscriptionData subscriptionData, - PopMessageResultFilter popMessageResultFilter, - String attemptId, - long timeoutMillis - ) { - CompletableFuture future = new CompletableFuture<>(); - try { - if (maxMsgNums > ProxyUtils.MAX_MSG_NUMS_FOR_POP_REQUEST) { - log.warn("change maxNums from {} to {} for pop request, with info: topic:{}, group:{}", - maxMsgNums, ProxyUtils.MAX_MSG_NUMS_FOR_POP_REQUEST, topic, consumerGroup); - maxMsgNums = ProxyUtils.MAX_MSG_NUMS_FOR_POP_REQUEST; - } - - PopLiteMessageRequestHeader requestHeader = new PopLiteMessageRequestHeader(); - requestHeader.setClientId(ctx.getClientID()); - requestHeader.setConsumerGroup(consumerGroup); - requestHeader.setTopic(topic); - requestHeader.setMaxMsgNum(maxMsgNums); - requestHeader.setInvisibleTime(invisibleTime); - requestHeader.setPollTime(pollTime); - requestHeader.setAttemptId(attemptId); - requestHeader.setBornTime(System.currentTimeMillis()); - - future = this.serviceManager.getMessageService().popLiteMessage( - ctx, - messageQueue, - requestHeader, - timeoutMillis) - .thenApplyAsync(popResult -> filterPopResult(ctx, popResult, - requestHeader, consumerGroup, topic, subscriptionData, popMessageResultFilter), this.executor); - } catch (Throwable t) { - future.completeExceptionally(t); - FutureUtils.addExecutor(future, this.executor); - } - return future; - } private void fillUniqIDIfNeed(MessageExt messageExt) { if (StringUtils.isBlank(MessageClientIDSetter.getUniqID(messageExt))) { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java index 3e7a8894859..a56bc42596b 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java @@ -204,16 +204,6 @@ public CompletableFuture popMessage( invisibleTime, pollTime, initMode, subscriptionData, fifo, popMessageResultFilter, attemptId, timeoutMillis); } - @Override - public CompletableFuture popLiteMessage(ProxyContext ctx, QueueSelector queueSelector, - String consumerGroup, String topic, int maxMsgNums, long invisibleTime, long pollTime, - SubscriptionData subscriptionData, PopMessageResultFilter popMessageResultFilter, - String attemptId, long timeoutMillis) { - return this.consumerProcessor.popLiteMessage(ctx, queueSelector, - consumerGroup, topic, maxMsgNums, invisibleTime, pollTime, - subscriptionData, popMessageResultFilter, attemptId, timeoutMillis); - } - @Override public CompletableFuture ackMessage(ProxyContext ctx, ReceiptHandle handle, String messageId, String consumerGroup, String topic, long timeoutMillis) { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java index a1500dbdedd..35d10bd083b 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java @@ -163,20 +163,6 @@ CompletableFuture popMessage( long timeoutMillis ); - CompletableFuture popLiteMessage( - ProxyContext ctx, - QueueSelector queueSelector, - String consumerGroup, - String topic, - int maxMsgNums, - long invisibleTime, - long pollTime, - SubscriptionData subscriptionData, - PopMessageResultFilter popMessageResultFilter, - String attemptId, - long timeoutMillis - ); - default CompletableFuture ackMessage( ProxyContext ctx, ReceiptHandle handle, diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java index 6478f90cb64..5341c259761 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java @@ -342,7 +342,7 @@ public void testReceiveLiteMessageAddReceiptHandleWithLiteTopic() { messageExt.setBody("body".getBytes()); PopResult popResult = new PopResult(PopStatus.FOUND, Collections.singletonList(messageExt)); - when(this.messagingProcessor.popLiteMessage( + when(this.messagingProcessor.popMessage( any(), any(), anyString(), @@ -350,7 +350,9 @@ public void testReceiveLiteMessageAddReceiptHandleWithLiteTopic() { anyInt(), anyLong(), anyLong(), + anyInt(), any(), + anyBoolean(), any(), isNull(), anyLong())).thenReturn(CompletableFuture.completedFuture(popResult)); diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java index 4df343c409e..b61c22b441e 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java @@ -111,11 +111,11 @@ public void testPopMessage() throws Throwable { .thenReturn(mock(MessageQueueView.class)); ArgumentCaptor ackMessageIdArgumentCaptor = ArgumentCaptor.forClass(String.class); - when(this.messagingProcessor.ackMessage(any(), any(), ackMessageIdArgumentCaptor.capture(), anyString(), anyString(), any(), anyLong())) + when(this.messagingProcessor.ackMessage(any(), any(), ackMessageIdArgumentCaptor.capture(), anyString(), anyString(), any())) .thenReturn(CompletableFuture.completedFuture(mock(AckResult.class))); ArgumentCaptor toDLQMessageIdArgumentCaptor = ArgumentCaptor.forClass(String.class); - when(this.messagingProcessor.forwardMessageToDeadLetterQueue(any(), any(), toDLQMessageIdArgumentCaptor.capture(), anyString(), anyString(), any(), anyLong())) + when(this.messagingProcessor.forwardMessageToDeadLetterQueue(any(), any(), toDLQMessageIdArgumentCaptor.capture(), anyString(), anyString(), any())) .thenReturn(CompletableFuture.completedFuture(mock(RemotingCommand.class))); AddressableMessageQueue messageQueue = mock(AddressableMessageQueue.class); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationRequestHeader.java index 46c5930c1d1..8fe7382a357 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationRequestHeader.java @@ -43,6 +43,9 @@ public class NotificationRequestHeader extends TopicQueueRequestHeader { private Boolean order = Boolean.FALSE; private String attemptId; + // for lite topic + private boolean isLiteConsumer = false; + private String clientId; private String expType; private String exp; @@ -127,6 +130,22 @@ public void setExp(String exp) { this.exp = exp; } + public boolean isLiteConsumer() { + return isLiteConsumer; + } + + public void setLiteConsumer(boolean liteConsumer) { + isLiteConsumer = liteConsumer; + } + + public String getClientId() { + return clientId; + } + + public void setClientId(String clientId) { + this.clientId = clientId; + } + @Override public String toString() { return MoreObjects.toStringHelper(this) @@ -137,6 +156,10 @@ public String toString() { .add("bornTime", bornTime) .add("order", order) .add("attemptId", attemptId) + // print isLiteConsumer only when true + .add("isLiteConsumer", isLiteConsumer ? true : null) + .add("clientId", clientId) + .omitNullValues() .toString(); } }