From 285aab1147e4ced053e745da2802973c9b2b9397 Mon Sep 17 00:00:00 2001 From: Kris20030907 <3185633428@qq.com> Date: Fri, 29 May 2026 19:16:01 +0800 Subject: [PATCH] fix: correct queue permissions in grpc route response --- .../proxy/grpc/v2/route/RouteActivity.java | 68 +++++++------------ .../grpc/v2/route/RouteActivityTest.java | 25 +++++++ 2 files changed, 49 insertions(+), 44 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java index 75f7089c5e0..deb46157218 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java @@ -242,55 +242,35 @@ protected List genMessageQueueFromQueueData(QueueData queueData, R TopicMessageType topicMessageType, Broker broker) { List messageQueueList = new ArrayList<>(); - int r = 0; - int w = 0; - int rw = 0; - int n = 0; - if (PermName.isWriteable(queueData.getPerm()) && PermName.isReadable(queueData.getPerm())) { - rw = Math.min(queueData.getWriteQueueNums(), queueData.getReadQueueNums()); - r = queueData.getReadQueueNums() - rw; - w = queueData.getWriteQueueNums() - rw; - } else if (PermName.isWriteable(queueData.getPerm())) { - w = queueData.getWriteQueueNums(); - } else if (PermName.isReadable(queueData.getPerm())) { - r = queueData.getReadQueueNums(); - } else if (!PermName.isAccessible(queueData.getPerm())) { - n = Math.max(1, Math.max(queueData.getWriteQueueNums(), queueData.getReadQueueNums())); + boolean topicReadable = PermName.isReadable(queueData.getPerm()); + boolean topicWritable = PermName.isWriteable(queueData.getPerm()); + int queueNums = 0; + if (topicReadable) { + queueNums = Math.max(queueNums, queueData.getReadQueueNums()); } - - // r here means readOnly queue nums, w means writeOnly queue nums, while rw means both readable and writable queue nums. - int queueIdIndex = 0; - for (int i = 0; i < r; i++) { - MessageQueue messageQueue = MessageQueue.newBuilder().setBroker(broker).setTopic(topic) - .setId(queueIdIndex++) - .setPermission(Permission.READ) - .addAllAcceptMessageTypes(parseTopicMessageType(topicMessageType)) - .build(); - messageQueueList.add(messageQueue); - } - - for (int i = 0; i < w; i++) { - MessageQueue messageQueue = MessageQueue.newBuilder().setBroker(broker).setTopic(topic) - .setId(queueIdIndex++) - .setPermission(Permission.WRITE) - .addAllAcceptMessageTypes(parseTopicMessageType(topicMessageType)) - .build(); - messageQueueList.add(messageQueue); + if (topicWritable) { + queueNums = Math.max(queueNums, queueData.getWriteQueueNums()); } - - for (int i = 0; i < rw; i++) { - MessageQueue messageQueue = MessageQueue.newBuilder().setBroker(broker).setTopic(topic) - .setId(queueIdIndex++) - .setPermission(Permission.READ_WRITE) - .addAllAcceptMessageTypes(parseTopicMessageType(topicMessageType)) - .build(); - messageQueueList.add(messageQueue); + if (!PermName.isAccessible(queueData.getPerm())) { + queueNums = Math.max(1, Math.max(queueData.getReadQueueNums(), queueData.getWriteQueueNums())); } - for (int i = 0; i < n; i++) { + for (int i = 0; i < queueNums; i++) { + boolean readable = topicReadable && i < queueData.getReadQueueNums(); + boolean writable = topicWritable && i < queueData.getWriteQueueNums(); + Permission permission; + if (readable && writable) { + permission = Permission.READ_WRITE; + } else if (readable) { + permission = Permission.READ; + } else if (writable) { + permission = Permission.WRITE; + } else { + permission = Permission.NONE; + } MessageQueue messageQueue = MessageQueue.newBuilder().setBroker(broker).setTopic(topic) - .setId(queueIdIndex++) - .setPermission(Permission.NONE) + .setId(i) + .setPermission(permission) .addAllAcceptMessageTypes(parseTopicMessageType(topicMessageType)) .build(); messageQueueList.add(messageQueue); diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java index abbf82452ef..7bf9eb91707 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java @@ -272,6 +272,31 @@ public void testGenPartitionFromQueueData() throws Exception { assertEquals(4, partitionWith4R8WPermRW.stream().filter(a -> a.getPermission() == Permission.WRITE).count()); assertEquals(4, partitionWith4R8WPermRW.stream().filter(a -> a.getPermission() == Permission.READ_WRITE).count()); assertEquals(0, partitionWith4R8WPermRW.stream().filter(a -> a.getPermission() == Permission.READ).count()); + for (int i = 0; i < 4; i++) { + assertEquals(i, partitionWith4R8WPermRW.get(i).getId()); + assertEquals(Permission.READ_WRITE, partitionWith4R8WPermRW.get(i).getPermission()); + } + for (int i = 4; i < 8; i++) { + assertEquals(i, partitionWith4R8WPermRW.get(i).getId()); + assertEquals(Permission.WRITE, partitionWith4R8WPermRW.get(i).getPermission()); + } + + // test queueData with 8 read queues, 4 write queues, and rw permission, expect 4 rw queues and 4 read only queues. + QueueData queueDataWith8R4WPermRW = createQueueData(8, 4, PermName.PERM_READ | PermName.PERM_WRITE); + List partitionWith8R4WPermRW = this.routeActivity.genMessageQueueFromQueueData(queueDataWith8R4WPermRW, GRPC_TOPIC, TopicMessageType.UNSPECIFIED, GRPC_BROKER); + assertEquals(8, partitionWith8R4WPermRW.size()); + assertEquals(8, partitionWith8R4WPermRW.stream().filter(a -> a.getAcceptMessageTypesValue(0) == MessageType.MESSAGE_TYPE_UNSPECIFIED.getNumber()).count()); + assertEquals(0, partitionWith8R4WPermRW.stream().filter(a -> a.getPermission() == Permission.WRITE).count()); + assertEquals(4, partitionWith8R4WPermRW.stream().filter(a -> a.getPermission() == Permission.READ_WRITE).count()); + assertEquals(4, partitionWith8R4WPermRW.stream().filter(a -> a.getPermission() == Permission.READ).count()); + for (int i = 0; i < 4; i++) { + assertEquals(i, partitionWith8R4WPermRW.get(i).getId()); + assertEquals(Permission.READ_WRITE, partitionWith8R4WPermRW.get(i).getPermission()); + } + for (int i = 4; i < 8; i++) { + assertEquals(i, partitionWith8R4WPermRW.get(i).getId()); + assertEquals(Permission.READ, partitionWith8R4WPermRW.get(i).getPermission()); + } // test queueData with 2 read queues, 2 write queues, and no permission, expect 2 no permission queues. QueueData queueDataWith2R2WNoPerm = createQueueData(2, 2, 0);