diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java index bc8400c19a2..9cdca3bac53 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java @@ -316,6 +316,33 @@ public Channel getAvailableChannel(String groupId) { return lastActiveChannel; } + /** + * Get an available channel for the given group, preferring the producer that originally sent the message. + * Falls back to round-robin if the preferred producer is not found or not available. + * + * @param groupId producer group + * @param preferredClientId the clientId of the original producer (from half message properties), may be null + * @return an available channel, or null if none found + */ + public Channel getAvailableChannel(String groupId, String preferredClientId) { + if (groupId == null) { + return null; + } + if (preferredClientId != null) { + ConcurrentMap channelMap = groupChannelTable.get(groupId); + if (channelMap != null) { + for (Map.Entry entry : channelMap.entrySet()) { + if (preferredClientId.equals(entry.getValue().getClientId()) + && entry.getKey().isActive() && entry.getKey().isWritable()) { + return entry.getKey(); + } + } + } + } + // Fall back to round-robin selection + return getAvailableChannel(groupId); + } + public Channel findChannel(String clientId) { return clientChannelTable.get(clientId); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java index 2be2e188023..a2440fc7412 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java @@ -145,6 +145,7 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset()); msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp()); MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED); + MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PRODUCER_CLIENT_ID); RemotingCommand sendResult = sendFinalMessage(msgInner); if (sendResult.getCode() == ResponseCode.SUCCESS) { deletePrepareMessage(result); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java index d1b77355b03..6a357a6e782 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java @@ -61,7 +61,8 @@ public void sendCheckMessage(MessageExt msgExt) throws Exception { msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID))); msgExt.setStoreSize(0); String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP); - Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId); + String producerClientId = msgExt.getUserProperty(MessageConst.PROPERTY_TRANSACTION_PRODUCER_CLIENT_ID); + Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId, producerClientId); if (channel != null) { brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt); } else { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/rocksdb/TransactionalMessageRocksDBService.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/rocksdb/TransactionalMessageRocksDBService.java index dbd3575d69c..b0ff35d45a7 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/rocksdb/TransactionalMessageRocksDBService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/rocksdb/TransactionalMessageRocksDBService.java @@ -234,7 +234,8 @@ private void sendCheckMessage(MessageExt msgExt) { msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID))); msgExt.setStoreSize(0); String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP); - Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId); + String producerClientId = msgExt.getUserProperty(MessageConst.PROPERTY_TRANSACTION_PRODUCER_CLIENT_ID); + Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId, producerClientId); if (channel != null) { brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt); } else { diff --git a/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java index 451b0e044c7..39ffdf88ca9 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java @@ -225,4 +225,44 @@ public void testGetAvailableChannel() { assertThat(c).isNull(); } + @Test + public void testGetAvailableChannelWithPreferredClientId() { + producerManager.registerProducer(group, clientInfo); + when(channel.isActive()).thenReturn(true); + when(channel.isWritable()).thenReturn(true); + + // Match: preferred clientId matches registered producer + Channel c = producerManager.getAvailableChannel(group, "clientId"); + assertThat(c).isSameAs(channel); + } + + @Test + public void testGetAvailableChannelWithPreferredClientIdNotFound() { + producerManager.registerProducer(group, clientInfo); + when(channel.isActive()).thenReturn(true); + when(channel.isWritable()).thenReturn(true); + + // No match: falls back to round-robin (returns some channel from group) + Channel c = producerManager.getAvailableChannel(group, "nonExistentClientId"); + assertThat(c).isNotNull(); // should fall back to round-robin + } + + @Test + public void testGetAvailableChannelWithNullPreferredClientId() { + producerManager.registerProducer(group, clientInfo); + when(channel.isActive()).thenReturn(true); + when(channel.isWritable()).thenReturn(true); + + // null clientId: should behave exactly like original getAvailableChannel + Channel c = producerManager.getAvailableChannel(group, null); + assertThat(c).isNotNull(); + } + + @Test + public void testGetAvailableChannelWithNullGroupId() { + // null groupId with non-null preferredClientId should return null (no NPE) + Channel c = producerManager.getAvailableChannel(null, "someClientId"); + assertThat(c).isNull(); + } + } \ No newline at end of file diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index f68742949ff..f891ade42f7 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -1433,6 +1433,7 @@ public void sendOneway(Message msg, MessageQueueSelector selector, Object arg) public TransactionSendResult sendMessageInTransaction(final Message msg, final TransactionListener localTransactionListener, final Object arg) throws MQClientException { + this.makeSureStateOK(); TransactionListener transactionListener = getCheckListener(); if (null == localTransactionListener && null == transactionListener) { throw new MQClientException("tranExecutor is null", null); @@ -1444,6 +1445,7 @@ public TransactionSendResult sendMessageInTransaction(final Message msg, SendResult sendResult = null; MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup()); + MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PRODUCER_CLIENT_ID, this.mQClientFactory.getClientId()); try { sendResult = this.send(msg); } catch (Exception e) { diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java index 77ab3f2cb9f..b8c95f8dc16 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java @@ -52,6 +52,7 @@ public class MessageConst { public static final String PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET = "TRAN_PREPARED_QUEUE_OFFSET"; public static final String PROPERTY_TRANSACTION_ID = "__transactionId__"; public static final String PROPERTY_TRANSACTION_CHECK_TIMES = "TRANSACTION_CHECK_TIMES"; + public static final String PROPERTY_TRANSACTION_PRODUCER_CLIENT_ID = "__TXN_PRODUCER_CID__"; public static final String PROPERTY_INSTANCE_ID = "INSTANCE_ID"; public static final String PROPERTY_CORRELATION_ID = "CORRELATION_ID"; public static final String PROPERTY_MESSAGE_REPLY_TO_CLIENT = "REPLY_TO_CLIENT"; @@ -173,5 +174,6 @@ public class MessageConst { STRING_HASH_SET.add(PROPERTY_CRC32); STRING_HASH_SET.add(PROPERTY_PRIORITY); STRING_HASH_SET.add(PROPERTY_LITE_TOPIC); + STRING_HASH_SET.add(PROPERTY_TRANSACTION_PRODUCER_CLIENT_ID); } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/client/ProxyClientRemotingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/client/ProxyClientRemotingProcessor.java index 10a8f3df50d..09317a2e500 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/client/ProxyClientRemotingProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/client/ProxyClientRemotingProcessor.java @@ -67,11 +67,12 @@ public RemotingCommand checkTransactionState(ChannelHandlerContext ctx, if (messageExt != null) { final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP); if (group != null) { + final String producerClientId = messageExt.getUserProperty(MessageConst.PROPERTY_TRANSACTION_PRODUCER_CLIENT_ID); CheckTransactionStateRequestHeader requestHeader = (CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class); request.writeCustomHeader(requestHeader); request.addExtField(ProxyUtils.BROKER_ADDR, NetworkUtil.socketAddress2String(ctx.channel().remoteAddress())); - Channel channel = this.producerManager.getAvailableChannel(group); + Channel channel = this.producerManager.getAvailableChannel(group, producerClientId); if (channel != null) { channel.writeAndFlush(request); } else {