From ccc962f7f80b668c837807da1d2604680f01a10f Mon Sep 17 00:00:00 2001 From: "wangjiahua.wjh" Date: Tue, 30 Jun 2026 10:22:09 +0800 Subject: [PATCH] [ISSUE #9791] Prefer original producer when checking transaction state When multiple producers in the same group handle different topics, the transaction check may be routed to an unrelated producer that cannot determine the transaction state correctly. Fix: Record the original producer's clientId in the half message properties. During transaction check-back, prefer the channel matching that clientId. Fall back to round-robin if the original producer is offline. Changes: - MessageConst: add PROPERTY_TRANSACTION_PRODUCER_CLIENT_ID - DefaultMQProducerImpl: write clientId into half message - ProducerManager: add getAvailableChannel(groupId, preferredClientId) - AbstractTransactionalMessageCheckListener: use new method - TransactionalMessageRocksDBService: use new method - ProducerManagerTest: 3 new test cases --- .../broker/client/ProducerManager.java | 27 +++++++++++++ .../processor/EndTransactionProcessor.java | 1 + ...ractTransactionalMessageCheckListener.java | 3 +- .../TransactionalMessageRocksDBService.java | 3 +- .../broker/client/ProducerManagerTest.java | 40 +++++++++++++++++++ .../impl/producer/DefaultMQProducerImpl.java | 2 + .../rocketmq/common/message/MessageConst.java | 2 + .../client/ProxyClientRemotingProcessor.java | 3 +- 8 files changed, 78 insertions(+), 3 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java index bc8400c19a2..9cdca3bac53 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java @@ -316,6 +316,33 @@ public Channel getAvailableChannel(String groupId) { return lastActiveChannel; } + /** + * Get an available channel for the given group, preferring the producer that originally sent the message. + * Falls back to round-robin if the preferred producer is not found or not available. + * + * @param groupId producer group + * @param preferredClientId the clientId of the original producer (from half message properties), may be null + * @return an available channel, or null if none found + */ + public Channel getAvailableChannel(String groupId, String preferredClientId) { + if (groupId == null) { + return null; + } + if (preferredClientId != null) { + ConcurrentMap channelMap = groupChannelTable.get(groupId); + if (channelMap != null) { + for (Map.Entry entry : channelMap.entrySet()) { + if (preferredClientId.equals(entry.getValue().getClientId()) + && entry.getKey().isActive() && entry.getKey().isWritable()) { + return entry.getKey(); + } + } + } + } + // Fall back to round-robin selection + return getAvailableChannel(groupId); + } + public Channel findChannel(String clientId) { return clientChannelTable.get(clientId); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java index 2be2e188023..a2440fc7412 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java @@ -145,6 +145,7 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset()); msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp()); MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED); + MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PRODUCER_CLIENT_ID); RemotingCommand sendResult = sendFinalMessage(msgInner); if (sendResult.getCode() == ResponseCode.SUCCESS) { deletePrepareMessage(result); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java index d1b77355b03..6a357a6e782 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java @@ -61,7 +61,8 @@ public void sendCheckMessage(MessageExt msgExt) throws Exception { msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID))); msgExt.setStoreSize(0); String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP); - Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId); + String producerClientId = msgExt.getUserProperty(MessageConst.PROPERTY_TRANSACTION_PRODUCER_CLIENT_ID); + Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId, producerClientId); if (channel != null) { brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt); } else { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/rocksdb/TransactionalMessageRocksDBService.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/rocksdb/TransactionalMessageRocksDBService.java index dbd3575d69c..b0ff35d45a7 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/rocksdb/TransactionalMessageRocksDBService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/rocksdb/TransactionalMessageRocksDBService.java @@ -234,7 +234,8 @@ private void sendCheckMessage(MessageExt msgExt) { msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID))); msgExt.setStoreSize(0); String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP); - Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId); + String producerClientId = msgExt.getUserProperty(MessageConst.PROPERTY_TRANSACTION_PRODUCER_CLIENT_ID); + Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId, producerClientId); if (channel != null) { brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt); } else { diff --git a/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java index 451b0e044c7..39ffdf88ca9 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java @@ -225,4 +225,44 @@ public void testGetAvailableChannel() { assertThat(c).isNull(); } + @Test + public void testGetAvailableChannelWithPreferredClientId() { + producerManager.registerProducer(group, clientInfo); + when(channel.isActive()).thenReturn(true); + when(channel.isWritable()).thenReturn(true); + + // Match: preferred clientId matches registered producer + Channel c = producerManager.getAvailableChannel(group, "clientId"); + assertThat(c).isSameAs(channel); + } + + @Test + public void testGetAvailableChannelWithPreferredClientIdNotFound() { + producerManager.registerProducer(group, clientInfo); + when(channel.isActive()).thenReturn(true); + when(channel.isWritable()).thenReturn(true); + + // No match: falls back to round-robin (returns some channel from group) + Channel c = producerManager.getAvailableChannel(group, "nonExistentClientId"); + assertThat(c).isNotNull(); // should fall back to round-robin + } + + @Test + public void testGetAvailableChannelWithNullPreferredClientId() { + producerManager.registerProducer(group, clientInfo); + when(channel.isActive()).thenReturn(true); + when(channel.isWritable()).thenReturn(true); + + // null clientId: should behave exactly like original getAvailableChannel + Channel c = producerManager.getAvailableChannel(group, null); + assertThat(c).isNotNull(); + } + + @Test + public void testGetAvailableChannelWithNullGroupId() { + // null groupId with non-null preferredClientId should return null (no NPE) + Channel c = producerManager.getAvailableChannel(null, "someClientId"); + assertThat(c).isNull(); + } + } \ No newline at end of file diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index f68742949ff..f891ade42f7 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -1433,6 +1433,7 @@ public void sendOneway(Message msg, MessageQueueSelector selector, Object arg) public TransactionSendResult sendMessageInTransaction(final Message msg, final TransactionListener localTransactionListener, final Object arg) throws MQClientException { + this.makeSureStateOK(); TransactionListener transactionListener = getCheckListener(); if (null == localTransactionListener && null == transactionListener) { throw new MQClientException("tranExecutor is null", null); @@ -1444,6 +1445,7 @@ public TransactionSendResult sendMessageInTransaction(final Message msg, SendResult sendResult = null; MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup()); + MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PRODUCER_CLIENT_ID, this.mQClientFactory.getClientId()); try { sendResult = this.send(msg); } catch (Exception e) { diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java index 77ab3f2cb9f..b8c95f8dc16 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java @@ -52,6 +52,7 @@ public class MessageConst { public static final String PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET = "TRAN_PREPARED_QUEUE_OFFSET"; public static final String PROPERTY_TRANSACTION_ID = "__transactionId__"; public static final String PROPERTY_TRANSACTION_CHECK_TIMES = "TRANSACTION_CHECK_TIMES"; + public static final String PROPERTY_TRANSACTION_PRODUCER_CLIENT_ID = "__TXN_PRODUCER_CID__"; public static final String PROPERTY_INSTANCE_ID = "INSTANCE_ID"; public static final String PROPERTY_CORRELATION_ID = "CORRELATION_ID"; public static final String PROPERTY_MESSAGE_REPLY_TO_CLIENT = "REPLY_TO_CLIENT"; @@ -173,5 +174,6 @@ public class MessageConst { STRING_HASH_SET.add(PROPERTY_CRC32); STRING_HASH_SET.add(PROPERTY_PRIORITY); STRING_HASH_SET.add(PROPERTY_LITE_TOPIC); + STRING_HASH_SET.add(PROPERTY_TRANSACTION_PRODUCER_CLIENT_ID); } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/client/ProxyClientRemotingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/client/ProxyClientRemotingProcessor.java index 10a8f3df50d..09317a2e500 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/client/ProxyClientRemotingProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/client/ProxyClientRemotingProcessor.java @@ -67,11 +67,12 @@ public RemotingCommand checkTransactionState(ChannelHandlerContext ctx, if (messageExt != null) { final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP); if (group != null) { + final String producerClientId = messageExt.getUserProperty(MessageConst.PROPERTY_TRANSACTION_PRODUCER_CLIENT_ID); CheckTransactionStateRequestHeader requestHeader = (CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class); request.writeCustomHeader(requestHeader); request.addExtField(ProxyUtils.BROKER_ADDR, NetworkUtil.socketAddress2String(ctx.channel().remoteAddress())); - Channel channel = this.producerManager.getAvailableChannel(group); + Channel channel = this.producerManager.getAvailableChannel(group, producerClientId); if (channel != null) { channel.writeAndFlush(request); } else {