Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1594,6 +1594,7 @@ protected void shutdownBasicService() {

if (this.notificationProcessor != null) {
this.notificationProcessor.getPopLongPollingService().shutdown();
this.notificationProcessor.getPopLiteLongPollingService().shutdown();
}

if (this.consumerIdsChangeListener != null) {
Expand Down Expand Up @@ -1890,6 +1891,7 @@ protected void startBasicService() throws Exception {

if (this.notificationProcessor != null) {
this.notificationProcessor.getPopLongPollingService().start();
this.notificationProcessor.getPopLiteLongPollingService().start();
}

if (this.popConsumerService != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ protected void doDispatch(String group, String lmqName, String excludeClientId)
* If there are multiple clients, randomly select one and consider fallback options
* Try to avoid dispatching to the excluded one but fallback if no other choice.
*
* @param clients all clients of one group
* @param clients all clients of one group
* @param excludeClientId the client ID to exclude from selection, probably consuming blocked.
* @return true if dispatched to one client
*/
Expand Down Expand Up @@ -160,8 +160,7 @@ public boolean selectAndDispatch(String lmqName, List<ClientGroup> clients, Stri
}
}
if (selectedClient != null) {
this.brokerController.getPopLiteMessageProcessor().getPopLiteLongPollingService()
.notifyMessageArriving(selectedClient, true, 0, group);
notifyMessageArriving(selectedClient, group);
} else if (isWildcardGroup) { // no one available in this group, so schedule a full dispatch once
scheduleFullDispatchForWildcardGroup(group,
brokerController.getBrokerConfig().getLiteEventFullDispatchDelayTimeForWildcardGroup());
Expand Down Expand Up @@ -236,8 +235,7 @@ public void doFullDispatchForClient(String clientId, String group) {
}
if (eventSet.offer(lmqName)) {
if (count++ % 10 == 0) {
brokerController.getPopLiteMessageProcessor().getPopLiteLongPollingService()
.notifyMessageArriving(clientId, true, 0, group);
notifyMessageArriving(clientId, group);
}
} else {
LOGGER.warn("client event set full again, wait another period. {}, {}", clientId, isActiveConsuming);
Expand All @@ -246,11 +244,30 @@ public void doFullDispatchForClient(String clientId, String group) {
break;
}
}
brokerController.getPopLiteMessageProcessor().getPopLiteLongPollingService()
.notifyMessageArriving(clientId, true, 0, group);
notifyMessageArriving(clientId, group);
LOGGER.info("client full dispatch finish. {}, dispatch:{}", clientId, count);
}

private void notifyMessageArriving(String clientId, String group) {
brokerController.getPopLiteMessageProcessor()
.getPopLiteLongPollingService()
.notifyMessageArriving(clientId, true, 0, group);
brokerController.getNotificationProcessor()
.getPopLiteLongPollingService()
.notifyMessageArriving(clientId, true, 0, group);
}

/**
* Check whether a client has any events in the event queue.
*
* @param clientId the client ID to check
* @return true if the client has events, false otherwise
*/
public boolean hasEvents(String clientId) {
ClientEventSet eventSet = clientEventMap.get(clientId);
return eventSet != null && eventSet.size() > 0;
}

/**
* Perform a full dispatch for wildcard group which was previously marked for a delayed full dispatch.
* It iterates through all LMQ topics in CQ table, so it may be a heavy work.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
import org.apache.rocketmq.broker.longpolling.PollingHeader;
import org.apache.rocketmq.broker.longpolling.PollingResult;
import org.apache.rocketmq.broker.longpolling.PopLiteLongPollingService;
import org.apache.rocketmq.broker.longpolling.PopLongPollingService;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.KeyBuilder;
Expand Down Expand Up @@ -59,11 +60,13 @@ public class NotificationProcessor implements NettyRequestProcessor {
private final BrokerController brokerController;
private final Random random = new Random(System.currentTimeMillis());
private final PopLongPollingService popLongPollingService;
private final PopLiteLongPollingService popLiteLongPollingService;
private static final String BORN_TIME = "bornTime";

public NotificationProcessor(final BrokerController brokerController) {
this.brokerController = brokerController;
this.popLongPollingService = new PopLongPollingService(brokerController, this, true);
this.popLiteLongPollingService = new PopLiteLongPollingService(brokerController, this, false);
}

public void shutdown() throws Exception {
Expand Down Expand Up @@ -140,6 +143,8 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx,
response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup());
return response;
}

boolean isLiteConsumer = requestHeader.isLiteConsumer();
int randomQ = random.nextInt(100);
boolean hasMsg = false;
BrokerConfig brokerConfig = brokerController.getBrokerConfig();
Expand Down Expand Up @@ -178,15 +183,18 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx,
}
}

if (requestHeader.getQueueId() < 0) {
if (isLiteConsumer) {
hasMsg = hasMsgForLiteConsumer(requestHeader.getClientId());
} else if (requestHeader.getQueueId() < 0) {
// read all queue
hasMsg = hasMsgFromTopic(topicConfig, randomQ, requestHeader, subscriptionData, messageFilter);
} else {
int queueId = requestHeader.getQueueId();
hasMsg = hasMsgFromQueue(topicConfig.getTopicName(), requestHeader, queueId, subscriptionData, messageFilter);
}
// if it doesn't have message, fetch retry
if (!hasMsg) {

// if it doesn't have message, fetch retry. Lite topic has no retry
if (!isLiteConsumer && !hasMsg) {
String retryTopic = KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2());
hasMsg = hasMsgFromTopic(retryTopic, randomQ, requestHeader, null, null);
if (!hasMsg && brokerConfig.isEnableRetryTopicV2() && brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) {
Expand All @@ -196,7 +204,13 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx,
}

if (!hasMsg) {
PollingResult pollingResult = popLongPollingService.polling(ctx, request, new PollingHeader(requestHeader), subscriptionData, messageFilter);
PollingResult pollingResult;
if (isLiteConsumer) {
pollingResult = popLiteLongPollingService.polling(ctx, request, requestHeader.getBornTime(),
requestHeader.getPollTime(), requestHeader.getClientId(), requestHeader.getConsumerGroup());
} else {
pollingResult = popLongPollingService.polling(ctx, request, new PollingHeader(requestHeader), subscriptionData, messageFilter);
}
if (pollingResult == PollingResult.POLLING_SUC) {
return null;
} else if (pollingResult == PollingResult.POLLING_FULL) {
Expand All @@ -208,6 +222,10 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx,
return response;
}

private boolean hasMsgForLiteConsumer(String clientId) {
return brokerController.getLiteEventDispatcher().hasEvents(clientId);
}

private boolean hasMsgFromTopic(String topicName, int randomQ, NotificationRequestHeader requestHeader, SubscriptionData subscriptionData, MessageFilter messageFilter)
throws RemotingCommandException {
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topicName);
Expand Down Expand Up @@ -295,4 +313,8 @@ private long getPopOffset(String topic, String cid, int queueId) {
public PopLongPollingService getPopLongPollingService() {
return popLongPollingService;
}

public PopLiteLongPollingService getPopLiteLongPollingService() {
return popLiteLongPollingService;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.broker.processor.NotificationProcessor;
import org.apache.rocketmq.broker.processor.PopLiteMessageProcessor;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.common.BrokerConfig;
Expand Down Expand Up @@ -95,7 +96,9 @@ public void setUp() {

liteEventDispatcher = new LiteEventDispatcher(brokerController, liteSubscriptionRegistry, liteLifecycleManager);
PopLiteMessageProcessor popLiteMessageProcessor = new PopLiteMessageProcessor(brokerController, liteEventDispatcher);
NotificationProcessor notificationProcessor = new NotificationProcessor(brokerController);
when(brokerController.getPopLiteMessageProcessor()).thenReturn(popLiteMessageProcessor);
when(brokerController.getNotificationProcessor()).thenReturn(notificationProcessor);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@ public class ContextVariable {
public static final String ACTION = "action";
public static final String PROTOCOL_TYPE = "protocol-type";
public static final String NAMESPACE = "namespace";
public static final String CLIENT_TYPE = "client-type";
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.rocketmq.proxy.common;

import apache.rocketmq.v2.ClientType;
import io.netty.channel.Channel;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -141,4 +142,18 @@ public String getNamespace() {
return this.getVal(ContextVariable.NAMESPACE);
}

public void setClientType(String clientType) {
this.withVal(ContextVariable.CLIENT_TYPE, clientType);
}

public String getClientType() {
return this.getVal(ContextVariable.CLIENT_TYPE);
}

public boolean isLiteConsumer() {
String clientType = this.getClientType();
return ClientType.LITE_PUSH_CONSUMER.name().equals(clientType)
|| ClientType.LITE_SIMPLE_CONSUMER.name().equals(clientType);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ public CompletableFuture<HeartbeatResponse> heartbeat(ProxyContext ctx, Heartbea
}
case PUSH_CONSUMER:
case LITE_PUSH_CONSUMER:
case SIMPLE_CONSUMER: {
case SIMPLE_CONSUMER:
case LITE_SIMPLE_CONSUMER: {
validateConsumerGroup(request.getGroup());
String consumerGroup = request.getGroup().getName();
this.registerConsumer(ctx, consumerGroup, clientSettings.getClientType(), clientSettings.getSubscription().getSubscriptionsList(), false);
Expand Down Expand Up @@ -168,6 +169,7 @@ public CompletableFuture<NotifyClientTerminationResponse> notifyClientTerminatio
case PUSH_CONSUMER:
case LITE_PUSH_CONSUMER:
case SIMPLE_CONSUMER:
case LITE_SIMPLE_CONSUMER:
validateConsumerGroup(request.getGroup());
String consumerGroup = request.getGroup().getName();
GrpcClientChannel channel = this.grpcChannelManager.removeChannel(clientId);
Expand Down Expand Up @@ -516,6 +518,7 @@ protected ConsumeMessageDirectlyResult buildConsumeMessageDirectlyResult(Status
protected ConsumeType buildConsumeType(ClientType clientType) {
switch (clientType) {
case SIMPLE_CONSUMER:
case LITE_SIMPLE_CONSUMER:
return ConsumeType.CONSUME_ACTIVELY;
case PUSH_CONSUMER:
case LITE_PUSH_CONSUMER:
Expand All @@ -526,7 +529,7 @@ protected ConsumeType buildConsumeType(ClientType clientType) {
}

protected MessageModel buildMessageModel(ClientType clientType) {
if (clientType == ClientType.LITE_PUSH_CONSUMER) {
if (ClientType.LITE_PUSH_CONSUMER == clientType || ClientType.LITE_SIMPLE_CONSUMER == clientType) {
return MessageModel.LITE_SELECTIVE;
}
return MessageModel.CLUSTERING;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,12 @@ public String getServiceName() {
public void offlineClientLiteSubscription(ProxyContext ctx, String clientId, Settings settings) {
if (settings == null) {
settings = getRawClientSettings(clientId);
if (settings == null) {
return;
}
}
if (settings == null || ClientType.LITE_PUSH_CONSUMER != settings.getClientType()) {
if (ClientType.LITE_PUSH_CONSUMER != settings.getClientType()
&& ClientType.LITE_SIMPLE_CONSUMER != settings.getClientType()) {
return;
}
try {
Expand Down Expand Up @@ -266,9 +270,10 @@ protected void onWaitEnd() {
for (String clientId : clientIdSet) {
try {
CLIENT_SETTINGS_MAP.computeIfPresent(clientId, (clientIdKey, settings) -> {
if (!settings.getClientType().equals(ClientType.PUSH_CONSUMER) &&
!settings.getClientType().equals(ClientType.SIMPLE_CONSUMER) &&
!settings.getClientType().equals(ClientType.LITE_PUSH_CONSUMER)) {
if (ClientType.PUSH_CONSUMER != settings.getClientType() &&
ClientType.SIMPLE_CONSUMER != settings.getClientType() &&
ClientType.LITE_PUSH_CONSUMER != settings.getClientType() &&
ClientType.LITE_SIMPLE_CONSUMER != settings.getClientType()) {
return settings;
}
String consumerGroup = settings.getSubscription().getGroup().getName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.proxy.grpc.v2.consumer;

import apache.rocketmq.v2.ClientType;
import apache.rocketmq.v2.Code;
import apache.rocketmq.v2.FilterExpression;
import apache.rocketmq.v2.ReceiveMessageRequest;
Expand Down Expand Up @@ -65,7 +64,7 @@ public void receiveMessage(ProxyContext ctx, ReceiveMessageRequest request,

try {
Settings settings = this.grpcClientSettingsManager.getClientSettings(ctx);
final boolean isLite = ClientType.LITE_PUSH_CONSUMER.equals(settings.getClientType());
ctx.setClientType(settings.getClientType().name());

Subscription subscription = settings.getSubscription();
boolean fifo = subscription.getFifo();
Expand Down Expand Up @@ -123,9 +122,7 @@ public void receiveMessage(ProxyContext ctx, ReceiveMessageRequest request,
return;
}

CompletableFuture<PopResult> popFuture;
if (isLite) {

if (ctx.isLiteConsumer()) {
GrpcClientChannel clientChannel = grpcChannelManager.getChannel(ctx.getClientID());
if (clientChannel == null) {
writer.writeAndComplete(ctx, Code.BAD_REQUEST,
Expand All @@ -140,42 +137,26 @@ public void receiveMessage(ProxyContext ctx, ReceiveMessageRequest request,
ctx.getClientID(), unackedMessageCount));
return;
}

popFuture = this.messagingProcessor.popLiteMessage(
ctx,
new ReceiveMessageQueueSelector(
request.getMessageQueue().getBroker().getName()
),
group,
topic,
request.getBatchSize(),
actualInvisibleTime,
pollingTime,
subscriptionData,
new PopMessageResultFilterImpl(maxAttempts),
request.hasAttemptId() ? request.getAttemptId() : null,
timeRemaining
);
} else {
popFuture = this.messagingProcessor.popMessage(
ctx,
new ReceiveMessageQueueSelector(
request.getMessageQueue().getBroker().getName()
),
group,
topic,
request.getBatchSize(),
actualInvisibleTime,
pollingTime,
ConsumeInitMode.MAX,
subscriptionData,
fifo,
new PopMessageResultFilterImpl(maxAttempts),
request.hasAttemptId() ? request.getAttemptId() : null,
timeRemaining
);
}

CompletableFuture<PopResult> popFuture = this.messagingProcessor.popMessage(
ctx,
new ReceiveMessageQueueSelector(
request.getMessageQueue().getBroker().getName()
),
group,
topic,
request.getBatchSize(),
actualInvisibleTime,
pollingTime,
ConsumeInitMode.MAX,
subscriptionData,
fifo,
new PopMessageResultFilterImpl(maxAttempts),
request.hasAttemptId() ? request.getAttemptId() : null,
timeRemaining
);

final boolean autoRenew = proxyConfig.isEnableProxyAutoRenew() && request.getAutoRenew();
popFuture.thenAccept(popResult -> {
Runnable doAfterWrite = null;
Expand Down Expand Up @@ -208,14 +189,16 @@ private Runnable handleAutoRenew(ProxyContext ctx, ReceiveMessageRequest request
throw e;
}
return () -> {
boolean isLiteConsumer = ctx.isLiteConsumer();
List<MessageExt> messageExtList = popResult.getMsgFoundList();
for (MessageExt messageExt : messageExtList) {
String receiptHandle = messageExt.getProperty(MessageConst.PROPERTY_POP_CK);
if (receiptHandle != null) {
MessageReceiptHandle messageReceiptHandle =
new MessageReceiptHandle(group, topic, messageExt.getQueueId(), receiptHandle, messageExt.getMsgId(),
messageExt.getQueueOffset(), messageExt.getReconsumeTimes(),
messageExt.getProperty(MessageConst.PROPERTY_LITE_TOPIC));
// lite topic can be consumed by normal consumer
String liteTopic = isLiteConsumer ? messageExt.getProperty(MessageConst.PROPERTY_LITE_TOPIC) : null;
MessageReceiptHandle messageReceiptHandle = new MessageReceiptHandle(group, topic,
messageExt.getQueueId(), receiptHandle, messageExt.getMsgId(), messageExt.getQueueOffset(),
messageExt.getReconsumeTimes(), liteTopic);
messagingProcessor.addReceiptHandle(ctx, clientChannel, group, messageExt.getMsgId(), messageReceiptHandle);
}
}
Expand Down
Loading
Loading