From 8588038729a4b97ddf568a987130b8d00792b1f8 Mon Sep 17 00:00:00 2001 From: yibai Date: Thu, 25 Jun 2026 19:39:05 +0800 Subject: [PATCH 1/4] Support queue selector for transactional sends (#10545) --- .../impl/producer/DefaultMQProducerImpl.java | 23 ++- .../client/producer/DefaultMQProducer.java | 21 +++ .../rocketmq/client/producer/MQProducer.java | 20 +++ .../producer/TransactionMQProducer.java | 29 ++++ .../producer/DefaultMQProducerTest.java | 147 ++++++++++++++++++ .../client.producer.DefaultMQProducer.schema | 1 + 6 files changed, 240 insertions(+), 1 deletion(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index f68742949ff..bcdcabd0a88 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -1433,6 +1433,21 @@ public void sendOneway(Message msg, MessageQueueSelector selector, Object arg) public TransactionSendResult sendMessageInTransaction(final Message msg, final TransactionListener localTransactionListener, final Object arg) throws MQClientException { + return sendMessageInTransaction(msg, localTransactionListener, null, null, arg); + } + + public TransactionSendResult sendMessageInTransaction(final Message msg, + final MessageQueueSelector selector, final Object selectorArg, final Object arg) + throws MQClientException { + if (selector == null) { + throw new MQClientException("MessageQueueSelector is null", null); + } + return sendMessageInTransaction(msg, null, selector, selectorArg, arg); + } + + private TransactionSendResult sendMessageInTransaction(final Message msg, + final TransactionListener localTransactionListener, final MessageQueueSelector selector, + final Object selectorArg, final Object arg) throws MQClientException { TransactionListener transactionListener = getCheckListener(); if (null == localTransactionListener && null == transactionListener) { throw new MQClientException("tranExecutor is null", null); @@ -1445,7 +1460,13 @@ public TransactionSendResult sendMessageInTransaction(final Message msg, MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup()); try { - sendResult = this.send(msg); + if (selector == null) { + sendResult = this.send(msg); + } else { + MessageQueue mq = this.invokeMessageQueueSelector(msg, selector, selectorArg, + this.defaultMQProducer.getSendMsgTimeout()); + sendResult = this.send(msg, mq); + } } catch (Exception e) { throw new MQClientException("send message Exception", e); } diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index 2091bbabbff..fc422005bb8 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -950,6 +950,27 @@ public TransactionSendResult sendMessageInTransaction(Message msg, throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class"); } + /** + * This method is used to send transactional messages with a message queue selector. + * + *

The {@code selectorArg} is passed only to {@link MessageQueueSelector#select(List, Message, Object)}. + * The {@code transactionArg} is passed only to {@link TransactionListener#executeLocalTransaction(Message, Object)}. + * This method follows the existing selector send semantics and does not perform the broker or queue reselection + * retry used by the default send path.

+ * + * @param msg Transactional message to send. + * @param selector Message queue selector. + * @param selectorArg Argument used by the selector. + * @param transactionArg Argument used along with local transaction executor. + * @return Transaction result. + * @throws MQClientException if there is any client error. + */ + @Override + public TransactionSendResult sendMessageInTransaction(Message msg, + MessageQueueSelector selector, Object selectorArg, Object transactionArg) throws MQClientException { + throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class"); + } + /** * This method will be removed in a certain version after April 5, 2020, so please do not use this method. * diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java index 4286fdd7f96..13a652dcc42 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java @@ -84,6 +84,26 @@ void sendOneway(final Message msg, final MessageQueueSelector selector, final Ob TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException; + /** + * Send a transactional message with a message queue selector. + * + *

The {@code selectorArg} is passed only to {@link MessageQueueSelector#select(List, Message, Object)}. + * The {@code transactionArg} is passed only to {@link TransactionListener#executeLocalTransaction(Message, Object)}. + * This method follows the existing selector send semantics and does not perform the broker or queue reselection + * retry used by the default send path.

+ * + * @param msg transactional message to send. + * @param selector message queue selector. + * @param selectorArg argument used by the selector. + * @param transactionArg argument used by the local transaction executor. + * @return transaction result. + * @throws MQClientException if the message cannot be sent. + */ + default TransactionSendResult sendMessageInTransaction(final Message msg, + final MessageQueueSelector selector, final Object selectorArg, final Object transactionArg) throws MQClientException { + throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class"); + } + //for batch SendResult send(final Collection msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException; diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java index 5c7b437809a..1c83bb20de1 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java @@ -89,6 +89,35 @@ public TransactionSendResult sendMessageInTransaction(final Message msg, return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg); } + /** + * Send a transactional message with a message queue selector. + * + *

The {@code selectorArg} is passed only to {@link MessageQueueSelector#select(List, Message, Object)}. + * The {@code transactionArg} is passed only to {@link TransactionListener#executeLocalTransaction(Message, Object)}. + * This method follows the existing selector send semantics and does not perform the broker or queue reselection + * retry used by the default send path.

+ * + * @param msg transactional message to send. + * @param selector message queue selector. + * @param selectorArg argument used by the selector. + * @param transactionArg argument used by the local transaction executor. + * @return transaction result. + * @throws MQClientException if the message cannot be sent. + */ + @Override + public TransactionSendResult sendMessageInTransaction(final Message msg, + final MessageQueueSelector selector, final Object selectorArg, final Object transactionArg) throws MQClientException { + if (null == this.transactionListener) { + throw new MQClientException("TransactionListener is null", null); + } + if (null == selector) { + throw new MQClientException("MessageQueueSelector is null", null); + } + + msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic())); + return this.defaultMQProducerImpl.sendMessageInTransaction(msg, selector, selectorArg, transactionArg); + } + public TransactionCheckListener getTransactionCheckListener() { return transactionCheckListener; } diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java index 33cf0df390d..2d4c5c0c7b5 100644 --- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java @@ -32,11 +32,13 @@ import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.compression.CompressionType; import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.netty.NettyRemotingClient; +import org.apache.rocketmq.remoting.protocol.header.EndTransactionRequestHeader; import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.route.BrokerData; import org.apache.rocketmq.remoting.protocol.route.QueueData; @@ -50,6 +52,7 @@ import org.mockito.junit.MockitoJUnitRunner; import java.lang.reflect.Field; +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -63,6 +66,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown; @@ -77,6 +81,7 @@ import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.nullable; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -777,6 +782,148 @@ public void assertSendMessageInTransaction() throws MQClientException { assertNull(result); } + @Test(expected = RuntimeException.class) + public void assertSendMessageInTransactionByQueueSelector() throws MQClientException { + MessageQueueSelector selector = mock(MessageQueueSelector.class); + TransactionSendResult result = producer.sendMessageInTransaction(message, selector, 1, 2); + assertNull(result); + } + + @Test + public void testSendMessageInTransactionByQueueSelector() throws Exception { + final String transactionGroup = producerGroupPrefix + "_transaction_" + System.nanoTime(); + TransactionMQProducer transactionProducer = new TransactionMQProducer(transactionGroup); + final AtomicReference actualTransactionArg = new AtomicReference<>(); + transactionProducer.setTransactionListener(new TransactionListener() { + @Override + public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { + actualTransactionArg.set(arg); + return LocalTransactionState.COMMIT_MESSAGE; + } + + @Override + public LocalTransactionState checkLocalTransaction(MessageExt msg) { + return LocalTransactionState.COMMIT_MESSAGE; + } + }); + transactionProducer.setNamesrvAddr("127.0.0.1:9876"); + transactionProducer.start(); + + try { + Field field = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory"); + field.setAccessible(true); + field.set(transactionProducer.getDefaultMQProducerImpl(), mQClientFactory); + transactionProducer.getDefaultMQProducerImpl().getMqClientFactory() + .registerProducer(transactionGroup, transactionProducer.getDefaultMQProducerImpl()); + + when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); + + final AtomicReference requestHeaderRef = new AtomicReference<>(); + doAnswer(invocation -> { + SendMessageRequestHeader requestHeader = invocation.getArgument(3); + requestHeaderRef.set(requestHeader); + SendResult sendResult = createSendResult(SendStatus.SEND_OK); + sendResult.setOffsetMsgId(MessageDecoder.createMessageId(new InetSocketAddress("127.0.0.1", 12), 1)); + sendResult.setMessageQueue(new MessageQueue(topic, "BrokerA", requestHeader.getQueueId())); + return sendResult; + }).when(mQClientAPIImpl).sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), + anyLong(), any(CommunicationMode.class), nullable(SendCallback.class), nullable(TopicPublishInfo.class), + nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)); + + final AtomicReference endTransactionBrokerAddr = new AtomicReference<>(); + final AtomicReference endTransactionRequestHeader = new AtomicReference<>(); + doAnswer(invocation -> { + endTransactionBrokerAddr.set(invocation.getArgument(0)); + endTransactionRequestHeader.set(invocation.getArgument(1)); + return null; + }).when(mQClientAPIImpl).endTransactionOneway(anyString(), any(EndTransactionRequestHeader.class), + nullable(String.class), anyLong()); + + final AtomicReference actualSelectorArg = new AtomicReference<>(); + TransactionSendResult result = transactionProducer.sendMessageInTransaction(message, new MessageQueueSelector() { + @Override + public MessageQueue select(List mqs, Message msg, Object arg) { + actualSelectorArg.set(arg); + return mqs.get(2); + } + }, "selectorArg", "transactionArg"); + + assertThat(actualSelectorArg.get()).isEqualTo("selectorArg"); + assertThat(actualTransactionArg.get()).isEqualTo("transactionArg"); + assertThat(requestHeaderRef.get().getQueueId()).isEqualTo(2); + assertThat(result.getMessageQueue()).isEqualTo(new MessageQueue(topic, "BrokerA", 2)); + assertThat(endTransactionBrokerAddr.get()).isEqualTo("127.0.0.1:10911"); + assertThat(endTransactionRequestHeader.get().getBrokerName()).isEqualTo("BrokerA"); + } finally { + transactionProducer.shutdown(); + } + } + + @Test + public void testSendMessageInTransactionByNullQueueSelector() { + TransactionMQProducer transactionProducer = new TransactionMQProducer(producerGroupPrefix + "_transaction_" + System.nanoTime()); + transactionProducer.setTransactionListener(new TransactionListener() { + @Override + public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { + return LocalTransactionState.COMMIT_MESSAGE; + } + + @Override + public LocalTransactionState checkLocalTransaction(MessageExt msg) { + return LocalTransactionState.COMMIT_MESSAGE; + } + }); + + try { + transactionProducer.sendMessageInTransaction(message, null, "selectorArg", "transactionArg"); + failBecauseExceptionWasNotThrown(MQClientException.class); + } catch (MQClientException e) { + assertThat(e).hasMessageContaining("MessageQueueSelector is null"); + } + } + + @Test + public void testSendMessageInTransactionBySelectorWithDifferentTopicQueue() throws Exception { + final String transactionGroup = producerGroupPrefix + "_transaction_" + System.nanoTime(); + TransactionMQProducer transactionProducer = new TransactionMQProducer(transactionGroup); + transactionProducer.setTransactionListener(new TransactionListener() { + @Override + public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { + return LocalTransactionState.COMMIT_MESSAGE; + } + + @Override + public LocalTransactionState checkLocalTransaction(MessageExt msg) { + return LocalTransactionState.COMMIT_MESSAGE; + } + }); + transactionProducer.setNamesrvAddr("127.0.0.1:9876"); + transactionProducer.start(); + + try { + Field field = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory"); + field.setAccessible(true); + field.set(transactionProducer.getDefaultMQProducerImpl(), mQClientFactory); + transactionProducer.getDefaultMQProducerImpl().getMqClientFactory() + .registerProducer(transactionGroup, transactionProducer.getDefaultMQProducerImpl()); + + when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); + + transactionProducer.sendMessageInTransaction(message, new MessageQueueSelector() { + @Override + public MessageQueue select(List mqs, Message msg, Object arg) { + return new MessageQueue("OtherTopic", mqs.get(0).getBrokerName(), mqs.get(0).getQueueId()); + } + }, "selectorArg", "transactionArg"); + failBecauseExceptionWasNotThrown(MQClientException.class); + } catch (MQClientException e) { + assertThat(e).hasMessageContaining("send message Exception"); + assertThat(e.getCause()).hasMessageContaining("message's topic not equal mq's topic"); + } finally { + transactionProducer.shutdown(); + } + } + @Test public void assertSearchOffset() throws MQClientException, NoSuchFieldException, IllegalAccessException { setDefaultMQProducerImpl(); diff --git a/test/src/test/resources/schema/api/client.producer.DefaultMQProducer.schema b/test/src/test/resources/schema/api/client.producer.DefaultMQProducer.schema index d1111fb4572..1747a1be8de 100644 --- a/test/src/test/resources/schema/api/client.producer.DefaultMQProducer.schema +++ b/test/src/test/resources/schema/api/client.producer.DefaultMQProducer.schema @@ -122,6 +122,7 @@ Method send(org.apache.rocketmq.client.producer.SendCallback,org.apache.rocketmq Method send(org.apache.rocketmq.client.producer.SendCallback,org.apache.rocketmq.common.message.Message,org.apache.rocketmq.common.message.MessageQueue) : public throws (void) Method send(org.apache.rocketmq.common.message.Message) : public throws (org.apache.rocketmq.client.producer.SendResult) Method send(org.apache.rocketmq.common.message.Message,org.apache.rocketmq.common.message.MessageQueue) : public throws (org.apache.rocketmq.client.producer.SendResult) +Method sendMessageInTransaction(java.lang.Object,java.lang.Object,org.apache.rocketmq.client.producer.MessageQueueSelector,org.apache.rocketmq.common.message.Message) : public throws (org.apache.rocketmq.client.producer.TransactionSendResult) Method sendMessageInTransaction(java.lang.Object,org.apache.rocketmq.common.message.Message) : public throws (org.apache.rocketmq.client.producer.TransactionSendResult) Method sendOneway(java.lang.Object,org.apache.rocketmq.client.producer.MessageQueueSelector,org.apache.rocketmq.common.message.Message) : public throws (void) Method sendOneway(org.apache.rocketmq.common.message.Message) : public throws (void) From 2a1a4435f75380ed9396502ea0f422bf2ff04eec Mon Sep 17 00:00:00 2001 From: yibai Date: Fri, 26 Jun 2026 17:04:18 +0800 Subject: [PATCH 2/4] test: stabilize transactional selector producer test --- .../producer/DefaultMQProducerTest.java | 39 +++++++++---------- 1 file changed, 19 insertions(+), 20 deletions(-) diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java index 2d4c5c0c7b5..ff549b1db2c 100644 --- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java @@ -29,6 +29,7 @@ import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; import org.apache.rocketmq.client.impl.producer.TopicPublishInfo; import org.apache.rocketmq.client.latency.MQFaultStrategy; +import org.apache.rocketmq.common.ServiceState; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.compression.CompressionType; import org.apache.rocketmq.common.message.Message; @@ -82,6 +83,7 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -807,17 +809,9 @@ public LocalTransactionState checkLocalTransaction(MessageExt msg) { } }); transactionProducer.setNamesrvAddr("127.0.0.1:9876"); - transactionProducer.start(); + prepareTransactionProducer(transactionProducer); try { - Field field = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory"); - field.setAccessible(true); - field.set(transactionProducer.getDefaultMQProducerImpl(), mQClientFactory); - transactionProducer.getDefaultMQProducerImpl().getMqClientFactory() - .registerProducer(transactionGroup, transactionProducer.getDefaultMQProducerImpl()); - - when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); - final AtomicReference requestHeaderRef = new AtomicReference<>(); doAnswer(invocation -> { SendMessageRequestHeader requestHeader = invocation.getArgument(3); @@ -855,7 +849,7 @@ public MessageQueue select(List mqs, Message msg, Object arg) { assertThat(endTransactionBrokerAddr.get()).isEqualTo("127.0.0.1:10911"); assertThat(endTransactionRequestHeader.get().getBrokerName()).isEqualTo("BrokerA"); } finally { - transactionProducer.shutdown(); + transactionProducer.getDefaultMQProducerImpl().destroyTransactionEnv(); } } @@ -898,17 +892,9 @@ public LocalTransactionState checkLocalTransaction(MessageExt msg) { } }); transactionProducer.setNamesrvAddr("127.0.0.1:9876"); - transactionProducer.start(); + prepareTransactionProducer(transactionProducer); try { - Field field = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory"); - field.setAccessible(true); - field.set(transactionProducer.getDefaultMQProducerImpl(), mQClientFactory); - transactionProducer.getDefaultMQProducerImpl().getMqClientFactory() - .registerProducer(transactionGroup, transactionProducer.getDefaultMQProducerImpl()); - - when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); - transactionProducer.sendMessageInTransaction(message, new MessageQueueSelector() { @Override public MessageQueue select(List mqs, Message msg, Object arg) { @@ -920,10 +906,23 @@ public MessageQueue select(List mqs, Message msg, Object arg) { assertThat(e).hasMessageContaining("send message Exception"); assertThat(e.getCause()).hasMessageContaining("message's topic not equal mq's topic"); } finally { - transactionProducer.shutdown(); + transactionProducer.getDefaultMQProducerImpl().destroyTransactionEnv(); } } + private void prepareTransactionProducer(TransactionMQProducer transactionProducer) throws Exception { + transactionProducer.getDefaultMQProducerImpl().initTransactionEnv(); + mQClientFactory.getClientConfig().setNamesrvAddr(transactionProducer.getNamesrvAddr()); + TopicPublishInfo topicPublishInfo = MQClientInstance.topicRouteData2TopicPublishInfo(topic, createTopicRoute()); + topicPublishInfo.setHaveTopicRouterInfo(true); + transactionProducer.getDefaultMQProducerImpl().updateTopicPublishInfo(topic, topicPublishInfo); + doReturn("127.0.0.1:10911").when(mQClientFactory).findBrokerAddressInPublish(anyString()); + Field field = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory"); + field.setAccessible(true); + field.set(transactionProducer.getDefaultMQProducerImpl(), mQClientFactory); + transactionProducer.getDefaultMQProducerImpl().setServiceState(ServiceState.RUNNING); + } + @Test public void assertSearchOffset() throws MQClientException, NoSuchFieldException, IllegalAccessException { setDefaultMQProducerImpl(); From dffbe6154313694f96eacf99ee402d36b77f9444 Mon Sep 17 00:00:00 2001 From: yibai Date: Mon, 29 Jun 2026 10:39:00 +0800 Subject: [PATCH 3/4] remove MQProducer default method --- .../org/apache/rocketmq/client/producer/MQProducer.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java index 13a652dcc42..609127da5e5 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java @@ -99,10 +99,8 @@ TransactionSendResult sendMessageInTransaction(final Message msg, * @return transaction result. * @throws MQClientException if the message cannot be sent. */ - default TransactionSendResult sendMessageInTransaction(final Message msg, - final MessageQueueSelector selector, final Object selectorArg, final Object transactionArg) throws MQClientException { - throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class"); - } + TransactionSendResult sendMessageInTransaction(final Message msg, + final MessageQueueSelector selector, final Object selectorArg, final Object transactionArg) throws MQClientException; //for batch SendResult send(final Collection msgs) throws MQClientException, RemotingException, MQBrokerException, From 59b995327b6245c88717d8dc4ba3f6f9ce4af73b Mon Sep 17 00:00:00 2001 From: yibai Date: Mon, 29 Jun 2026 13:38:12 +0800 Subject: [PATCH 4/4] test: isolate transaction producer client factory --- .../client/producer/DefaultMQProducerTest.java | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java index ff549b1db2c..dc8f3eb1980 100644 --- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java @@ -85,6 +85,7 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) @@ -911,15 +912,21 @@ public MessageQueue select(List mqs, Message msg, Object arg) { } private void prepareTransactionProducer(TransactionMQProducer transactionProducer) throws Exception { + MQClientInstance transactionMQClientFactory = spy(new MQClientInstance(new ClientConfig(), 0, + transactionProducer.getProducerGroup() + "_client")); + Field apiField = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); + apiField.setAccessible(true); + apiField.set(transactionMQClientFactory, mQClientAPIImpl); + transactionProducer.getDefaultMQProducerImpl().initTransactionEnv(); - mQClientFactory.getClientConfig().setNamesrvAddr(transactionProducer.getNamesrvAddr()); + transactionMQClientFactory.getClientConfig().setNamesrvAddr(transactionProducer.getNamesrvAddr()); TopicPublishInfo topicPublishInfo = MQClientInstance.topicRouteData2TopicPublishInfo(topic, createTopicRoute()); topicPublishInfo.setHaveTopicRouterInfo(true); transactionProducer.getDefaultMQProducerImpl().updateTopicPublishInfo(topic, topicPublishInfo); - doReturn("127.0.0.1:10911").when(mQClientFactory).findBrokerAddressInPublish(anyString()); + doReturn("127.0.0.1:10911").when(transactionMQClientFactory).findBrokerAddressInPublish(anyString()); Field field = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory"); field.setAccessible(true); - field.set(transactionProducer.getDefaultMQProducerImpl(), mQClientFactory); + field.set(transactionProducer.getDefaultMQProducerImpl(), transactionMQClientFactory); transactionProducer.getDefaultMQProducerImpl().setServiceState(ServiceState.RUNNING); }