[ISSUE #10569] [Feature] Support Lite Simple Consumer on server side#10570
[ISSUE #10569] [Feature] Support Lite Simple Consumer on server side#10570f1amingo wants to merge 1 commit into
Conversation
- Broker: add PopLiteLongPollingService to NotificationProcessor for lite consumer notification polling - Broker: LiteEventDispatcher notifies both PopLiteMessageProcessor and NotificationProcessor polling services - Broker: add hasEvents(clientId) API to LiteEventDispatcher for message availability check - Proxy: unify lite/normal pop paths into single popMessage call, route by ProxyContext.isLiteConsumer() - Proxy: register LITE_SIMPLE_CONSUMER in ClientActivity and GrpcClientSettingsManager lifecycle - Proxy: pass liteTopic property only for lite consumers in filter and response writer - Proxy: remove standalone popLiteMessage from MessagingProcessor/ConsumerProcessor - Remoting: add isLiteConsumer and clientId fields to NotificationRequestHeader
3155d4a to
73671fa
Compare
RockteMQ-AI
left a comment
There was a problem hiding this comment.
Review by github-manager-bot
Summary
This PR adds server-side support for Lite Simple Consumer, extending the existing Lite Push Consumer architecture to support pull-based consumption for Lite Topics. The changes span Broker (new PopLiteLongPollingService in NotificationProcessor), Proxy (unified popMessage routing, LITE_SIMPLE_CONSUMER lifecycle registration), and Remoting (NotificationRequestHeader extension).
Overall the refactoring is clean — consolidating the separate popLiteMessage code path into a unified popMessage with internal routing reduces duplication significantly (-185/+179).
Findings
-
[Critical]
proxy/.../processor/MessagingProcessor.java:166— RemovingpopLiteMessagefrom theMessagingProcessorinterface is a breaking API change. Any external or downstream implementations of this interface will fail to compile after upgrade. If this interface is considered internal (not part of the public SPI), this is acceptable, but please confirm. If external implementations exist, consider deprecating with@Deprecatedfirst instead of removing outright. -
[Warning]
broker/.../processor/NotificationProcessor.java— The newhasMsgForLiteConsumer(clientId)method relies onLiteEventDispatcher.hasEvents(clientId)which checksclientEventMap. This is a presence-of-events check, not a presence-of-messages check like the normal consumer path (which queries the store). If the event map becomes stale (e.g., events consumed but map not cleaned), lite consumers could get false positives and return empty results. Consider whether a fallback to actual store lookup is needed whenhasEventsreturns true but the subsequent pop returns nothing. -
[Warning]
broker/.../processor/NotificationProcessor.java— Lite consumers skip retry topic lookup (if (!isLiteConsumer && !hasMsg)). The comment says "Lite topic has no retry", which matches the PR description. However, what happens to messages that fail consumption in the lite path? If there is no retry/DLQ mechanism for lite consumers, failed messages may be silently lost. Please confirm this is intentional and documented. -
[Info]
proxy/.../processor/ConsumerProcessor.java:131-170— The unifiedpopMessagenow branches onctx.isLiteConsumer()to build eitherPopLiteMessageRequestHeaderorPopMessageRequestHeader. The lite branch does not setqueueId,initMode,expType,exp, ororder— these are intentionally omitted for lite consumers, but adding a brief inline comment explaining why would help future maintainers. -
[Info]
proxy/src/test/.../ReceiveMessageActivityTest.java— Only this test is updated. Consider adding tests for:LiteEventDispatcher.hasEvents()— empty map, null clientId, non-empty event setNotificationProcessorlite consumer path — verifyinghasMsgForLiteConsumerand polling service selectionConsumerProcessor.popMessagewithisLiteConsumer=true— verifying correctPopLiteMessageRequestHeaderconstruction
-
[Info]
remoting/.../header/NotificationRequestHeader.java— NewisLiteConsumerdefaults tofalseandclientIdis nullable — good backward compatibility. ThetoString()withomitNullValues()is a nice touch for log readability. -
[Info]
proxy/.../common/GrpcClientSettingsManager.java:230-233— The added null check forsettingsinofflineClientLiteSubscriptionis a good defensive improvement that prevents NPE when settings have already been evicted.
Suggestions
- Add a brief comment in
ConsumerProcessor.popMessageexplaining why lite consumers omit certain header fields. - Consider adding unit tests for
LiteEventDispatcher.hasEvents()and the lite consumer notification path. - If
MessagingProcessoris a public SPI, add a deprecation cycle forpopLiteMessagebefore removal.
Cross-repo Note
The NotificationRequestHeader changes (new isLiteConsumer and clientId fields) affect the Broker-Proxy remoting protocol. Since apache/rocketmq-clients communicates via gRPC through the Proxy, this should be transparent to clients — but worth verifying that the Proxy correctly translates these fields in both directions.
Automated review by github-manager-bot
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## develop #10570 +/- ##
=============================================
- Coverage 48.22% 48.16% -0.06%
+ Complexity 13422 13412 -10
=============================================
Files 1378 1378
Lines 100817 100832 +15
Branches 13040 13050 +10
=============================================
- Hits 48614 48569 -45
- Misses 46252 46296 +44
- Partials 5951 5967 +16 ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
Dismiss accidental changes requested review
Which Issue(s) This PR Fixes
Brief Description
Add server-side support for Lite Simple Consumer. Currently RocketMQ Lite Topic only supports Lite Push Consumer; this PR enables the pull-based (Simple Consumer) consumption pattern for Lite Topics.
Broker
NotificationProcessor: add a dedicatedPopLiteLongPollingServiceso lite consumers can use the notification (long-polling) protocol. Lite consumer requests check message availability viaLiteEventDispatcher.hasEvents(clientId)and skip retry-topic lookup.LiteEventDispatcher: refactornotifyMessageArrivinginto a private helper that wakes bothPopLiteMessageProcessorandNotificationProcessorpolling services. ExposehasEvents(clientId)for availability checks.BrokerController: managepopLiteLongPollingServicelifecycle (start/shutdown).Proxy
ProxyContext: addclientType/isLiteConsumer()so lite-consumer context is accessible throughout the proxy layer.ConsumerProcessor.popMessage: route internally based onctx.isLiteConsumer()— lite consumers usePopLiteMessageRequestHeader, normal consumers usePopMessageRequestHeader. Removes the separatepopLiteMessagecode path.MessagingProcessor/DefaultMessagingProcessor: remove standalonepopLiteMessageinterface method (now unified).ClientActivity/GrpcClientSettingsManager: registerLITE_SIMPLE_CONSUMERin all client lifecycle switch statements (register, unregister, heartbeat, consume-type, message-model).ReceiveMessageActivity/ReceiveMessageResponseStreamWriter: passliteTopicproperty only for lite consumers; normal consumers getnull.Remoting
NotificationRequestHeader: addisLiteConsumerandclientIdfields so the broker can distinguish lite consumer notification requests.How Did You Test This Change?
ReceiveMessageActivityTestto mock the unifiedpopMessagepath.