Skip to content

[ISSUE #10569] [Feature] Support Lite Simple Consumer on server side#10570

Open
f1amingo wants to merge 1 commit into
apache:developfrom
f1amingo:feature/lite-simple-consumer
Open

[ISSUE #10569] [Feature] Support Lite Simple Consumer on server side#10570
f1amingo wants to merge 1 commit into
apache:developfrom
f1amingo:feature/lite-simple-consumer

Conversation

@f1amingo

@f1amingo f1amingo commented Jul 1, 2026

Copy link
Copy Markdown
Contributor

Which Issue(s) This PR Fixes

Brief Description

Add server-side support for Lite Simple Consumer. Currently RocketMQ Lite Topic only supports Lite Push Consumer; this PR enables the pull-based (Simple Consumer) consumption pattern for Lite Topics.

Broker

  • NotificationProcessor: add a dedicated PopLiteLongPollingService so lite consumers can use the notification (long-polling) protocol. Lite consumer requests check message availability via LiteEventDispatcher.hasEvents(clientId) and skip retry-topic lookup.
  • LiteEventDispatcher: refactor notifyMessageArriving into a private helper that wakes both PopLiteMessageProcessor and NotificationProcessor polling services. Expose hasEvents(clientId) for availability checks.
  • BrokerController: manage popLiteLongPollingService lifecycle (start/shutdown).

Proxy

  • ProxyContext: add clientType / isLiteConsumer() so lite-consumer context is accessible throughout the proxy layer.
  • ConsumerProcessor.popMessage: route internally based on ctx.isLiteConsumer() — lite consumers use PopLiteMessageRequestHeader, normal consumers use PopMessageRequestHeader. Removes the separate popLiteMessage code path.
  • MessagingProcessor / DefaultMessagingProcessor: remove standalone popLiteMessage interface method (now unified).
  • ClientActivity / GrpcClientSettingsManager: register LITE_SIMPLE_CONSUMER in all client lifecycle switch statements (register, unregister, heartbeat, consume-type, message-model).
  • ReceiveMessageActivity / ReceiveMessageResponseStreamWriter: pass liteTopic property only for lite consumers; normal consumers get null.

Remoting

  • NotificationRequestHeader: add isLiteConsumer and clientId fields so the broker can distinguish lite consumer notification requests.

How Did You Test This Change?

  • Updated ReceiveMessageActivityTest to mock the unified popMessage path.
  • Manual verification with local broker + proxy cluster.

- Broker: add PopLiteLongPollingService to NotificationProcessor for lite consumer notification polling
- Broker: LiteEventDispatcher notifies both PopLiteMessageProcessor and NotificationProcessor polling services
- Broker: add hasEvents(clientId) API to LiteEventDispatcher for message availability check
- Proxy: unify lite/normal pop paths into single popMessage call, route by ProxyContext.isLiteConsumer()
- Proxy: register LITE_SIMPLE_CONSUMER in ClientActivity and GrpcClientSettingsManager lifecycle
- Proxy: pass liteTopic property only for lite consumers in filter and response writer
- Proxy: remove standalone popLiteMessage from MessagingProcessor/ConsumerProcessor
- Remoting: add isLiteConsumer and clientId fields to NotificationRequestHeader
@f1amingo f1amingo force-pushed the feature/lite-simple-consumer branch from 3155d4a to 73671fa Compare July 1, 2026 10:34

@RockteMQ-AI RockteMQ-AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review by github-manager-bot

Summary

This PR adds server-side support for Lite Simple Consumer, extending the existing Lite Push Consumer architecture to support pull-based consumption for Lite Topics. The changes span Broker (new PopLiteLongPollingService in NotificationProcessor), Proxy (unified popMessage routing, LITE_SIMPLE_CONSUMER lifecycle registration), and Remoting (NotificationRequestHeader extension).

Overall the refactoring is clean — consolidating the separate popLiteMessage code path into a unified popMessage with internal routing reduces duplication significantly (-185/+179).

Findings

  • [Critical] proxy/.../processor/MessagingProcessor.java:166 — Removing popLiteMessage from the MessagingProcessor interface is a breaking API change. Any external or downstream implementations of this interface will fail to compile after upgrade. If this interface is considered internal (not part of the public SPI), this is acceptable, but please confirm. If external implementations exist, consider deprecating with @Deprecated first instead of removing outright.

  • [Warning] broker/.../processor/NotificationProcessor.java — The new hasMsgForLiteConsumer(clientId) method relies on LiteEventDispatcher.hasEvents(clientId) which checks clientEventMap. This is a presence-of-events check, not a presence-of-messages check like the normal consumer path (which queries the store). If the event map becomes stale (e.g., events consumed but map not cleaned), lite consumers could get false positives and return empty results. Consider whether a fallback to actual store lookup is needed when hasEvents returns true but the subsequent pop returns nothing.

  • [Warning] broker/.../processor/NotificationProcessor.java — Lite consumers skip retry topic lookup (if (!isLiteConsumer && !hasMsg)). The comment says "Lite topic has no retry", which matches the PR description. However, what happens to messages that fail consumption in the lite path? If there is no retry/DLQ mechanism for lite consumers, failed messages may be silently lost. Please confirm this is intentional and documented.

  • [Info] proxy/.../processor/ConsumerProcessor.java:131-170 — The unified popMessage now branches on ctx.isLiteConsumer() to build either PopLiteMessageRequestHeader or PopMessageRequestHeader. The lite branch does not set queueId, initMode, expType, exp, or order — these are intentionally omitted for lite consumers, but adding a brief inline comment explaining why would help future maintainers.

  • [Info] proxy/src/test/.../ReceiveMessageActivityTest.java — Only this test is updated. Consider adding tests for:

    1. LiteEventDispatcher.hasEvents() — empty map, null clientId, non-empty event set
    2. NotificationProcessor lite consumer path — verifying hasMsgForLiteConsumer and polling service selection
    3. ConsumerProcessor.popMessage with isLiteConsumer=true — verifying correct PopLiteMessageRequestHeader construction
  • [Info] remoting/.../header/NotificationRequestHeader.java — New isLiteConsumer defaults to false and clientId is nullable — good backward compatibility. The toString() with omitNullValues() is a nice touch for log readability.

  • [Info] proxy/.../common/GrpcClientSettingsManager.java:230-233 — The added null check for settings in offlineClientLiteSubscription is a good defensive improvement that prevents NPE when settings have already been evicted.

Suggestions

  1. Add a brief comment in ConsumerProcessor.popMessage explaining why lite consumers omit certain header fields.
  2. Consider adding unit tests for LiteEventDispatcher.hasEvents() and the lite consumer notification path.
  3. If MessagingProcessor is a public SPI, add a deprecation cycle for popLiteMessage before removal.

Cross-repo Note

The NotificationRequestHeader changes (new isLiteConsumer and clientId fields) affect the Broker-Proxy remoting protocol. Since apache/rocketmq-clients communicates via gRPC through the Proxy, this should be transparent to clients — but worth verifying that the Proxy correctly translates these fields in both directions.


Automated review by github-manager-bot

@codecov-commenter

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 51.08696% with 45 lines in your changes missing coverage. Please review.
✅ Project coverage is 48.16%. Comparing base (8242c1e) to head (73671fa).

Files with missing lines Patch % Lines
...he/rocketmq/proxy/processor/ConsumerProcessor.java 50.00% 12 Missing and 2 partials ⚠️
...cketmq/broker/processor/NotificationProcessor.java 16.66% 10 Missing ⚠️
...ing/protocol/header/NotificationRequestHeader.java 0.00% 10 Missing ⚠️
...roxy/grpc/v2/common/GrpcClientSettingsManager.java 37.50% 4 Missing and 1 partial ⚠️
...ache/rocketmq/broker/lite/LiteEventDispatcher.java 83.33% 2 Missing ⚠️
...org/apache/rocketmq/proxy/common/ProxyContext.java 83.33% 0 Missing and 1 partial ⚠️
.../rocketmq/proxy/grpc/v2/client/ClientActivity.java 0.00% 0 Missing and 1 partial ⚠️
...proxy/grpc/v2/consumer/ReceiveMessageActivity.java 91.66% 0 Missing and 1 partial ⚠️
...2/consumer/ReceiveMessageResponseStreamWriter.java 0.00% 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@              Coverage Diff              @@
##             develop   #10570      +/-   ##
=============================================
- Coverage      48.22%   48.16%   -0.06%     
+ Complexity     13422    13412      -10     
=============================================
  Files           1378     1378              
  Lines         100817   100832      +15     
  Branches       13040    13050      +10     
=============================================
- Hits           48614    48569      -45     
- Misses         46252    46296      +44     
- Partials        5951     5967      +16     

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@fuyou001 fuyou001 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cr

@fuyou001 fuyou001 dismissed their stale review July 1, 2026 12:37

Dismiss accidental changes requested review

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature] Support Lite Simple Consumer on server side

4 participants