Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,33 @@ public Channel getAvailableChannel(String groupId) {
return lastActiveChannel;
}

/**
* Get an available channel for the given group, preferring the producer that originally sent the message.
* Falls back to round-robin if the preferred producer is not found or not available.
*
* @param groupId producer group
* @param preferredClientId the clientId of the original producer (from half message properties), may be null
* @return an available channel, or null if none found
*/
public Channel getAvailableChannel(String groupId, String preferredClientId) {
if (groupId == null) {
return null;
}
if (preferredClientId != null) {
ConcurrentMap<Channel, ClientChannelInfo> channelMap = groupChannelTable.get(groupId);
if (channelMap != null) {
for (Map.Entry<Channel, ClientChannelInfo> entry : channelMap.entrySet()) {
if (preferredClientId.equals(entry.getValue().getClientId())
&& entry.getKey().isActive() && entry.getKey().isWritable()) {
return entry.getKey();
}
}
}
}
// Fall back to round-robin selection
return getAvailableChannel(groupId);
}

public Channel findChannel(String clientId) {
return clientChannelTable.get(clientId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PRODUCER_CLIENT_ID);
RemotingCommand sendResult = sendFinalMessage(msgInner);
if (sendResult.getCode() == ResponseCode.SUCCESS) {
deletePrepareMessage(result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ public void sendCheckMessage(MessageExt msgExt) throws Exception {
msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
msgExt.setStoreSize(0);
String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId);
String producerClientId = msgExt.getUserProperty(MessageConst.PROPERTY_TRANSACTION_PRODUCER_CLIENT_ID);
Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId, producerClientId);
if (channel != null) {
brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,8 @@ private void sendCheckMessage(MessageExt msgExt) {
msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
msgExt.setStoreSize(0);
String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId);
String producerClientId = msgExt.getUserProperty(MessageConst.PROPERTY_TRANSACTION_PRODUCER_CLIENT_ID);
Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId, producerClientId);
if (channel != null) {
brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,4 +225,44 @@ public void testGetAvailableChannel() {
assertThat(c).isNull();
}

@Test
public void testGetAvailableChannelWithPreferredClientId() {
producerManager.registerProducer(group, clientInfo);
when(channel.isActive()).thenReturn(true);
when(channel.isWritable()).thenReturn(true);

// Match: preferred clientId matches registered producer
Channel c = producerManager.getAvailableChannel(group, "clientId");
assertThat(c).isSameAs(channel);
}

@Test
public void testGetAvailableChannelWithPreferredClientIdNotFound() {
producerManager.registerProducer(group, clientInfo);
when(channel.isActive()).thenReturn(true);
when(channel.isWritable()).thenReturn(true);

// No match: falls back to round-robin (returns some channel from group)
Channel c = producerManager.getAvailableChannel(group, "nonExistentClientId");
assertThat(c).isNotNull(); // should fall back to round-robin
}

@Test
public void testGetAvailableChannelWithNullPreferredClientId() {
producerManager.registerProducer(group, clientInfo);
when(channel.isActive()).thenReturn(true);
when(channel.isWritable()).thenReturn(true);

// null clientId: should behave exactly like original getAvailableChannel
Channel c = producerManager.getAvailableChannel(group, null);
assertThat(c).isNotNull();
}

@Test
public void testGetAvailableChannelWithNullGroupId() {
// null groupId with non-null preferredClientId should return null (no NPE)
Channel c = producerManager.getAvailableChannel(null, "someClientId");
assertThat(c).isNull();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1433,6 +1433,7 @@ public void sendOneway(Message msg, MessageQueueSelector selector, Object arg)
public TransactionSendResult sendMessageInTransaction(final Message msg,
final TransactionListener localTransactionListener, final Object arg)
throws MQClientException {
this.makeSureStateOK();
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionListener && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
Expand All @@ -1444,6 +1445,7 @@ public TransactionSendResult sendMessageInTransaction(final Message msg,
SendResult sendResult = null;
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PRODUCER_CLIENT_ID, this.mQClientFactory.getClientId());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This adds an internal routing hint to the half message, but the committed transactional message paths copy all half-message properties into the final message and only clear the existing transaction routing fields. That means consumers can observe __TXN_PRODUCER_CID__, which changes the user-visible message property surface and may expose producer client identity. Please remove MessageConst.PROPERTY_TRANSACTION_PRODUCER_CLIENT_ID when building the final committed message, including both EndTransactionProcessor.endMessageTransaction() and TransactionalMessageUtil.buildTransactionalMessageFromHalfMessage(), and add tests that verify the half message keeps this hint while the committed message does not.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Fixed in the latest commit — added MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PRODUCER_CLIENT_ID) in EndTransactionProcessor alongside the existing PROPERTY_TRANSACTION_PREPARED clear, so the internal routing hint is stripped from the final committed message and not visible to consumers.

try {
sendResult = this.send(msg);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class MessageConst {
public static final String PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET = "TRAN_PREPARED_QUEUE_OFFSET";
public static final String PROPERTY_TRANSACTION_ID = "__transactionId__";
public static final String PROPERTY_TRANSACTION_CHECK_TIMES = "TRANSACTION_CHECK_TIMES";
public static final String PROPERTY_TRANSACTION_PRODUCER_CLIENT_ID = "__TXN_PRODUCER_CID__";
Comment thread
wang-jiahua marked this conversation as resolved.
public static final String PROPERTY_INSTANCE_ID = "INSTANCE_ID";
public static final String PROPERTY_CORRELATION_ID = "CORRELATION_ID";
public static final String PROPERTY_MESSAGE_REPLY_TO_CLIENT = "REPLY_TO_CLIENT";
Expand Down Expand Up @@ -173,5 +174,6 @@ public class MessageConst {
STRING_HASH_SET.add(PROPERTY_CRC32);
STRING_HASH_SET.add(PROPERTY_PRIORITY);
STRING_HASH_SET.add(PROPERTY_LITE_TOPIC);
STRING_HASH_SET.add(PROPERTY_TRANSACTION_PRODUCER_CLIENT_ID);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,12 @@ public RemotingCommand checkTransactionState(ChannelHandlerContext ctx,
if (messageExt != null) {
final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
if (group != null) {
final String producerClientId = messageExt.getUserProperty(MessageConst.PROPERTY_TRANSACTION_PRODUCER_CLIENT_ID);
CheckTransactionStateRequestHeader requestHeader =
(CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
request.writeCustomHeader(requestHeader);
request.addExtField(ProxyUtils.BROKER_ADDR, NetworkUtil.socketAddress2String(ctx.channel().remoteAddress()));
Channel channel = this.producerManager.getAvailableChannel(group);
Channel channel = this.producerManager.getAvailableChannel(group, producerClientId);
if (channel != null) {
channel.writeAndFlush(request);
} else {
Expand Down
Loading