[ISSUE #10545] Support queue selector for transactional sends#10548
[ISSUE #10545] Support queue selector for transactional sends#10548Alaske wants to merge 4 commits into
Conversation
|
this PR is ready for review. The change is limited to the Java client and does not modify broker/store/remoting protocol behavior. I verified the focused client tests with JDK 8 locally; details are included in the PR description. Could you please approve the GitHub Actions workflows when you have time? |
RockteMQ-AI
left a comment
There was a problem hiding this comment.
Review by github-manager-bot
Summary
Adds MessageQueueSelector support for transactional message sending in the Java client, allowing users to control which queue the transactional half-message is sent to.
Findings
Overall this is a well-structured and well-tested PR. The implementation correctly:
-
Uses a Java 8
defaultmethod inMQProducerto preserve binary compatibility -
Keeps
selectorArgandtransactionArgcleanly separated -
Reuses existing selector queue resolution and validation logic
-
Applies namespace wrapping in
TransactionMQProducerconsistently with the existingsendMessageInTransaction -
Updates the API schema file
-
[Info]
DefaultMQProducerImpl.java:1439— The null check forselectoris performed in bothTransactionMQProducer.sendMessageInTransaction()(line ~109) andDefaultMQProducerImpl.sendMessageInTransaction()(line ~1439). The check in the impl is technically unreachable when called throughTransactionMQProducer, but it's good defensive programming if the impl method is ever called directly. No action needed. -
[Info]
DefaultMQProducerTest.java:785— The testassertSendMessageInTransactionByQueueSelectorverifies thatDefaultMQProducer(notTransactionMQProducer) rejects the selector-based transactional send. The test name could be slightly more descriptive, e.g.assertDefaultMQProducerRejectsSendMessageInTransactionByQueueSelector, to distinguish it from the happy-path test. This is purely cosmetic.
Suggestions
No blocking suggestions. The implementation is clean, the test coverage is thorough (null selector, different-topic rejection, arg separation, end-transaction routing), and backward compatibility is properly maintained.
Cross-repo Note
No changes needed in apache/rocketmq-clients — this is a Java client-only API addition and does not affect the gRPC protocol or broker behavior.
Automated review by github-manager-bot
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## develop #10548 +/- ##
=============================================
- Coverage 48.27% 48.17% -0.10%
+ Complexity 13435 13413 -22
=============================================
Files 1377 1378 +1
Lines 100844 100837 -7
Branches 13036 13038 +2
=============================================
- Hits 48678 48583 -95
- Misses 46217 46287 +70
- Partials 5949 5967 +18 ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
| * @throws MQClientException if there is any client error. | ||
| */ | ||
| @Override | ||
| public TransactionSendResult sendMessageInTransaction(Message msg, |
There was a problem hiding this comment.
Since this method already has the same default implementation in MQProducer, do we still need to duplicate it in DefaultMQProducer?
There was a problem hiding this comment.
Runtime-wise, the override is not strictly required because MQProducer already provides the same default implementation.
I kept it in DefaultMQProducer for two reasons:
- To keep the transactional-send rejection explicit and consistent with the existing
sendMessageInTransaction(Message, Object)method inDefaultMQProducer. - To make the public API change visible in
DefaultMQProducer.schema. The current schema tool collects declared methods from the class/superclasses, but does not include interface default methods.
If you prefer to avoid the duplicated override, I can remove it and adjust the schema expectation accordingly.
There was a problem hiding this comment.
Thanks for the explanation. I think we should keep the style consistent. If we prefer to explicitly implement this method in DefaultMQProducer, should we avoid making it a default method in MQProducer and keep it consistent with the other interface methods?
There was a problem hiding this comment.
Updated. I removed the default implementation from MQProducer and kept the new method as a regular interface method, consistent with the existing sendMessageInTransaction(Message, Object) API.
DefaultMQProducer still explicitly implements the method and rejects transactional sends with the same error message, while TransactionMQProducer provides the actual selector-aware implementation.
I also verified the related tests with JDK 8.
There was a problem hiding this comment.
Hi @yx9o, I pushed the latest test isolation fix and verified the official non-ubuntu Maven workflow command locally with JDK 8: mvn -B package --file pom.xml -Djacoco.skip=true
RockteMQ-AI
left a comment
There was a problem hiding this comment.
Review by github-manager-bot
Summary
Adds MessageQueueSelector support for transactional message sending. Users can now control which queue the transactional half-message is sent to, while keeping selectorArg and transactionArg separate.
Findings
API Design
- [Info]
MQProducer.java:87-103— Newdefaultmethod on the interface preserves binary compatibility for third-partyMQProducerimplementations. Good choice. - [Info]
DefaultMQProducer.java:953-973— Correctly throwsRuntimeExceptiondirecting users toTransactionMQProducer, consistent with the existingsendMessageInTransaction(msg, arg)pattern. - [Info]
TransactionMQProducer.java:104-121— Validates bothtransactionListenerandselectorfor null before proceeding. Namespace wrapping is applied. Clean.
Implementation
- [Info]
DefaultMQProducerImpl.java:1436-1448— The existing public method delegates to the new private 5-parameter method withselector=null, selectorArg=null. This preserves the original behavior exactly. - [Info]
DefaultMQProducerImpl.java:1463-1469— Whenselector != null, usesinvokeMessageQueueSelectorto resolve the queue, then callssend(msg, mq)for the specific-queue path. This correctly reuses existing selector validation (including the topic mismatch check tested intestSendMessageInTransactionBySelectorWithDifferentTopicQueue). - [Info] The separation of
selectorArg(passed only toMessageQueueSelector.select()) andtransactionArg(passed only toTransactionListener.executeLocalTransaction()) is clean and avoids the ambiguity of a single shared argument.
Tests
- [Info]
DefaultMQProducerTest.java— Comprehensive test coverage:assertSendMessageInTransactionByQueueSelector: VerifiesDefaultMQProducerrejects transactional sendstestSendMessageInTransactionByQueueSelector: Full integration test verifying selector arg routing, transaction arg routing, queue selection, and end-transaction routingtestSendMessageInTransactionByNullQueueSelector: Null selector rejectiontestSendMessageInTransactionBySelectorWithDifferentTopicQueue: Topic mismatch rejection
- [Info]
api/client.producer.DefaultMQProducer.schema— Public API schema updated to include the new method.
Suggestions
- [Warning]
DefaultMQProducerImpl.java:1463-1469— The selector-based path usesthis.send(msg, mq)which goes through the specific-queue send path. This path does not perform the broker/queue reselection retry that the defaultthis.send(msg)path does (as noted in the Javadoc). This is documented and intentional, but worth confirming that users understand the trade-off: they get deterministic queue selection at the cost of no automatic retry on broker unavailability. - [Info] The
@Test(expected = RuntimeException.class)onassertSendMessageInTransactionByQueueSelectoris a bit loose — it catches anyRuntimeExceptionrather than asserting a specific message. ThetestSendMessageInTransactionByNullQueueSelectortest does this better with explicit message assertion. Minor style point.
Verdict
Well-designed API extension with proper backward compatibility, clean argument separation, and thorough test coverage. LGTM.
Automated review by github-manager-bot
RockteMQ-AI
left a comment
There was a problem hiding this comment.
Review by github-manager-bot (Re-review after new commit)
Summary
New commit dffbe61 ("remove MQProducer default method") changes the API approach: the new sendMessageInTransaction(Message, MessageQueueSelector, Object, Object) method on MQProducer is now abstract instead of default.
Changes Since Last Review
MQProducer.java— Method added as abstract (wasdefault). This is a binary compatibility break for any third-party class that implementsMQProducerdirectly — those implementations will fail to compile until they add the new method.DefaultMQProducerImpl.java— Same delegation pattern as before: existing public method delegates to a new private 5-parameter method; new public overload validates selector and delegates.DefaultMQProducer.java/TransactionMQProducer.java— Unchanged from previous review; both implement the new abstract method.- Tests — Unchanged; coverage remains comprehensive.
Findings
- [Warning]
MQProducer.java:87-103— Adding an abstract method to a public interface is a breaking change for third-party implementors. If binary/source compatibility is a goal (as it was with the previousdefaultmethod approach), consider keeping thedefaultkeyword and throwingUnsupportedOperationExceptionin the body. If the project has decided this interface change is acceptable, the change is otherwise clean. - [Info]
DefaultMQProducerImpl.java:1436-1448— The refactoring into a private 5-parameter method is clean. The original public method delegates withselector=null, selectorArg=null, preserving exact original behavior. - [Info] The selector-based path correctly uses
invokeMessageQueueSelector+send(msg, mq), consistent with normal selector sends.
Suggestions
- If the abstract method approach is intentional, consider adding a note in the PR description or commit message explaining the compatibility trade-off.
- The rest of the implementation and tests look solid.
Automated review by github-manager-bot
|
The transaction producer test setup now uses a dedicated MQClientInstance instead of the shared test-level client factory. When the full client test suite runs, other DefaultMQProducerTest cases can mutate or stub the shared MQClientInstance state. The new transactional selector test depends on broker lookup through findBrokerAddressInPublish(...) after the selector chooses a queue, so sharing that client factory made the test order-sensitive. To avoid cross-test pollution, prepareTransactionProducer(...) now creates an isolated MQClientInstance spy for the transaction producer, injects the mocked MQClientAPIImpl into it, and stubs only the broker lookup required by this test path. This keeps the transactional send selector test deterministic in both focused runs and the full Maven package run. |
RockteMQ-AI
left a comment
There was a problem hiding this comment.
Review by github-manager-bot (Re-review after new commit)
Summary
Adds MessageQueueSelector support for transactional message sending, allowing users to control which queue the transactional half-message is sent to.
Changes Since Last Review
- test: isolate transaction producer client factory (59b995) — Uses a
spy()on a separateMQClientInstancefor transaction producer tests instead of sharing the mock. Good improvement for test isolation — prevents cross-test state leakage.
Findings
- [Info]
DefaultMQProducerImpl.java— The new public overloadsendMessageInTransaction(Message, MessageQueueSelector, Object, Object)correctly validates null selector before delegating. The private implementation cleanly handles both selector and non-selector paths. - [Info]
TransactionMQProducer.java— Proper delegation pattern with null check on selector. Consistent with existingsend()overloads. - [Info] Test coverage is solid: normal flow, null selector, topic mismatch, and the new test isolation improvement.
Verdict
Clean implementation following existing patterns. The latest commit improves test isolation. LGTM.
Automated review by github-manager-bot
|
If everything looks good and the required checks are green, could you please help approve and merge this PR? @yx9o |
Which Issue(s) This PR Fixes
Fixes #10545
Brief Description
This PR adds
MessageQueueSelectorsupport for transactional message sending in the Java client.Currently normal message sends can use
MessageQueueSelector, but transactional sends only support:This PR adds a selector-aware overload so users can control the real queue used by the transactional half message:
How Did You Implement It
defaultmethod toMQProducerto preserve binary compatibility for third-party implementations.DefaultMQProducerbehavior unchanged by explicitly rejecting transactional sends and directing users toTransactionMQProducer.TransactionMQProducer.DefaultMQProducerImpl.selectorArgandtransactionArgseparate:selectorArgis passed only toMessageQueueSelector.transactionArgis passed only toTransactionListener#executeLocalTransaction.REAL_TOPIC/REAL_QIDfor half messages and restores the real queue when committing.How to Verify It
Added unit tests covering:
TransactionMQProducersends transactional messages to the queue selected byMessageQueueSelector.selectorArgis passed to the selector andtransactionArgis passed to the local transaction listener.DefaultMQProducerstill rejects transactional sends.Verified with JDK 8:
mvn -pl client \ -Dtest=DefaultMQProducerTest#assertSendMessageInTransactionByQueueSelector+testSendMessageInTransactionByQueueSelector+testSendMessageInTransactionByNullQueueSelector+testSendMessageInTransactionBySelectorWithDifferentTopicQueue \ -DskipITs testResult:
The public API schema for
DefaultMQProducerwas also updated.