From f7bef92384ab08825e0717b7195ccd0a91789347 Mon Sep 17 00:00:00 2001 From: "wangjiahua.wjh" Date: Wed, 24 Jun 2026 12:04:23 +0800 Subject: [PATCH] [ISSUE #10537] Cache RPC Attributes in RemotingMetricsManager - RemotingMetricsManager: add getOrBuildAttributes() with long-key cache - NettyRemotingAbstract: use cached Attributes instead of per-RPC AttributesBuilder - NettyRemotingAbstract: add log.isDebugEnabled() guard for debug log - RemotingMetricsConstant: add LABEL_*_KEY static AttributeKey instances --- .../metrics/RemotingMetricsManager.java | 39 ++++++++ .../remoting/netty/NettyRemotingAbstract.java | 91 +++++++++---------- .../metrics/RemotingMetricsManagerTest.java | 89 ++++++++++++++++++ 3 files changed, 173 insertions(+), 46 deletions(-) create mode 100644 remoting/src/test/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManagerTest.java diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManager.java b/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManager.java index 09bff2c31e1..f8e011ade80 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManager.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManager.java @@ -30,6 +30,7 @@ import java.time.Duration; import java.util.Arrays; import java.util.List; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Supplier; import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.metrics.NopLongHistogram; @@ -37,14 +38,21 @@ import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.HISTOGRAM_RPC_LATENCY; import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_PROTOCOL_TYPE_KEY; import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.PROTOCOL_TYPE_REMOTING; +import org.apache.rocketmq.remoting.common.RemotingHelper; +import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_IS_LONG_POLLING_KEY; +import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_REQUEST_CODE_KEY; +import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESPONSE_CODE_KEY; +import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESULT_KEY; import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESULT_CANCELED; +import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESULT_ONEWAY; import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESULT_SUCCESS; import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESULT_WRITE_CHANNEL_FAILED; public class RemotingMetricsManager { private LongHistogram rpcLatency = new NopLongHistogram(); private Supplier attributesBuilderSupplier; + private final ConcurrentHashMap attributesCache = new ConcurrentHashMap<>(); public RemotingMetricsManager() { } @@ -87,6 +95,37 @@ public List> getMetricsView() { return Lists.newArrayList(new Pair<>(selector, viewBuilder)); } + public Attributes getOrBuildAttributes(int requestCode, int responseCode, + boolean isLongPolling, String result) { + int resultIdx; + if (RESULT_SUCCESS.equals(result)) resultIdx = 0; + else if (RESULT_ONEWAY.equals(result)) resultIdx = 1; + else if (RESULT_WRITE_CHANNEL_FAILED.equals(result)) resultIdx = 2; + else if (RESULT_CANCELED.equals(result)) resultIdx = 3; + else resultIdx = -1; + + if (resultIdx < 0) { + return buildAttributes(requestCode, responseCode, isLongPolling, result); + } + + long key = ((long) requestCode << 19) + | ((long) (responseCode & 0xFFFF) << 3) + | (isLongPolling ? 4L : 0L) + | resultIdx; + return attributesCache.computeIfAbsent(key, + k -> buildAttributes(requestCode, responseCode, isLongPolling, result)); + } + + private Attributes buildAttributes(int requestCode, int responseCode, + boolean isLongPolling, String result) { + return newAttributesBuilder() + .put(LABEL_IS_LONG_POLLING_KEY, isLongPolling) + .put(LABEL_REQUEST_CODE_KEY, RemotingHelper.getRequestCodeDesc(requestCode)) + .put(LABEL_RESPONSE_CODE_KEY, RemotingHelper.getResponseCodeDesc(responseCode)) + .put(LABEL_RESULT_KEY, result) + .build(); + } + public String getWriteAndFlushResult(Future future) { String result = RESULT_SUCCESS; if (future.isCancelled()) { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java index a735f8455d3..2bad8344310 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -66,10 +66,9 @@ import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode; import org.apache.rocketmq.remoting.protocol.ResponseCode; -import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_IS_LONG_POLLING; -import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_REQUEST_CODE; -import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESPONSE_CODE; -import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESULT; +import io.opentelemetry.api.common.Attributes; +import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_REQUEST_CODE_KEY; +import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESULT_KEY; import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESULT_ONEWAY; import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESULT_PROCESS_REQUEST_FAILED; import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESULT_WRITE_CHANNEL_FAILED; @@ -235,19 +234,14 @@ public static void writeResponse(Channel channel, RemotingCommand request, @Null if (response == null) { return; } - final AttributesBuilder attributesBuilder; - if (remotingMetricsManager != null) { - attributesBuilder = remotingMetricsManager.newAttributesBuilder(); - attributesBuilder.put(LABEL_IS_LONG_POLLING, request.isSuspended()) - .put(LABEL_REQUEST_CODE, RemotingHelper.getRequestCodeDesc(request.getCode())) - .put(LABEL_RESPONSE_CODE, RemotingHelper.getResponseCodeDesc(response.getCode())); - } else { - attributesBuilder = null; - } + final int requestCode = request.getCode(); + final int responseCode = response.getCode(); + final boolean isLongPolling = request.isSuspended(); if (request.isOnewayRPC()) { - if (attributesBuilder != null) { - attributesBuilder.put(LABEL_RESULT, RESULT_ONEWAY); - remotingMetricsManager.getRpcLatency().record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributesBuilder.build()); + if (remotingMetricsManager != null) { + Attributes attrs = remotingMetricsManager.getOrBuildAttributes( + requestCode, responseCode, isLongPolling, RESULT_ONEWAY); + remotingMetricsManager.getRpcLatency().record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attrs); } return; } @@ -256,15 +250,19 @@ public static void writeResponse(Channel channel, RemotingCommand request, @Null try { channel.writeAndFlush(response).addListener((ChannelFutureListener) future -> { if (future.isSuccess()) { - log.debug("Response[request code: {}, response code: {}, opaque: {}] is written to channel{}", - request.getCode(), response.getCode(), response.getOpaque(), channel); + if (log.isDebugEnabled()) { + log.debug("Response[request code: {}, response code: {}, opaque: {}] is written to channel{}", + requestCode, responseCode, response.getOpaque(), channel); + } } else { log.error("Failed to write response[request code: {}, response code: {}, opaque: {}] to channel{}", - request.getCode(), response.getCode(), response.getOpaque(), channel, future.cause()); + requestCode, responseCode, response.getOpaque(), channel, future.cause()); } if (remotingMetricsManager != null) { - attributesBuilder.put(LABEL_RESULT, remotingMetricsManager.getWriteAndFlushResult(future)); - remotingMetricsManager.getRpcLatency().record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributesBuilder.build()); + String result = remotingMetricsManager.getWriteAndFlushResult(future); + Attributes attrs = remotingMetricsManager.getOrBuildAttributes( + requestCode, responseCode, isLongPolling, result); + remotingMetricsManager.getRpcLatency().record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attrs); } if (callback != null) { callback.accept(future); @@ -275,8 +273,9 @@ public static void writeResponse(Channel channel, RemotingCommand request, @Null log.error(request.toString()); log.error(response.toString()); if (remotingMetricsManager != null) { - attributesBuilder.put(LABEL_RESULT, RESULT_WRITE_CHANNEL_FAILED); - remotingMetricsManager.getRpcLatency().record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributesBuilder.build()); + Attributes attrs = remotingMetricsManager.getOrBuildAttributes( + requestCode, responseCode, isLongPolling, RESULT_WRITE_CHANNEL_FAILED); + remotingMetricsManager.getRpcLatency().record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attrs); } } @@ -287,19 +286,14 @@ public void writeResponse(Channel channel, RemotingCommand request, @Nullable Re if (response == null) { return; } - final AttributesBuilder attributesBuilder; - if (this.remotingMetricsManager != null) { - attributesBuilder = this.remotingMetricsManager.newAttributesBuilder(); - attributesBuilder.put(LABEL_IS_LONG_POLLING, request.isSuspended()) - .put(LABEL_REQUEST_CODE, RemotingHelper.getRequestCodeDesc(request.getCode())) - .put(LABEL_RESPONSE_CODE, RemotingHelper.getResponseCodeDesc(response.getCode())); - } else { - attributesBuilder = null; - } + final int requestCode = request.getCode(); + final int responseCode = response.getCode(); + final boolean isLongPolling = request.isSuspended(); if (request.isOnewayRPC()) { - if (attributesBuilder != null) { - attributesBuilder.put(LABEL_RESULT, RESULT_ONEWAY); - this.remotingMetricsManager.getRpcLatency().record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributesBuilder.build()); + if (this.remotingMetricsManager != null) { + Attributes attrs = this.remotingMetricsManager.getOrBuildAttributes( + requestCode, responseCode, isLongPolling, RESULT_ONEWAY); + this.remotingMetricsManager.getRpcLatency().record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attrs); } return; } @@ -308,15 +302,19 @@ public void writeResponse(Channel channel, RemotingCommand request, @Nullable Re try { channel.writeAndFlush(response).addListener((ChannelFutureListener) future -> { if (future.isSuccess()) { - log.debug("Response[request code: {}, response code: {}, opaque: {}] is written to channel{}", - request.getCode(), response.getCode(), response.getOpaque(), channel); + if (log.isDebugEnabled()) { + log.debug("Response[request code: {}, response code: {}, opaque: {}] is written to channel{}", + requestCode, responseCode, response.getOpaque(), channel); + } } else { log.error("Failed to write response[request code: {}, response code: {}, opaque: {}] to channel{}", - request.getCode(), response.getCode(), response.getOpaque(), channel, future.cause()); + requestCode, responseCode, response.getOpaque(), channel, future.cause()); } - if (this.remotingMetricsManager != null && attributesBuilder != null) { - attributesBuilder.put(LABEL_RESULT, this.remotingMetricsManager.getWriteAndFlushResult(future)); - this.remotingMetricsManager.getRpcLatency().record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributesBuilder.build()); + if (this.remotingMetricsManager != null) { + String result = this.remotingMetricsManager.getWriteAndFlushResult(future); + Attributes attrs = this.remotingMetricsManager.getOrBuildAttributes( + requestCode, responseCode, isLongPolling, result); + this.remotingMetricsManager.getRpcLatency().record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attrs); } if (callback != null) { callback.accept(future); @@ -326,9 +324,10 @@ public void writeResponse(Channel channel, RemotingCommand request, @Nullable Re log.error("process request over, but response failed", e); log.error(request.toString()); log.error(response.toString()); - if (this.remotingMetricsManager != null && attributesBuilder != null) { - attributesBuilder.put(LABEL_RESULT, RESULT_WRITE_CHANNEL_FAILED); - this.remotingMetricsManager.getRpcLatency().record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributesBuilder.build()); + if (this.remotingMetricsManager != null) { + Attributes attrs = this.remotingMetricsManager.getOrBuildAttributes( + requestCode, responseCode, isLongPolling, RESULT_WRITE_CHANNEL_FAILED); + this.remotingMetricsManager.getRpcLatency().record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attrs); } } } @@ -394,8 +393,8 @@ public void processRequestCommand(final ChannelHandlerContext ctx, final Remotin } catch (Throwable e) { if (remotingMetricsManager != null) { AttributesBuilder attributesBuilder = remotingMetricsManager.newAttributesBuilder() - .put(LABEL_REQUEST_CODE, RemotingHelper.getRequestCodeDesc(cmd.getCode())) - .put(LABEL_RESULT, RESULT_PROCESS_REQUEST_FAILED); + .put(LABEL_REQUEST_CODE_KEY, RemotingHelper.getRequestCodeDesc(cmd.getCode())) + .put(LABEL_RESULT_KEY, RESULT_PROCESS_REQUEST_FAILED); remotingMetricsManager.getRpcLatency().record(cmd.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributesBuilder.build()); } } diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManagerTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManagerTest.java new file mode 100644 index 00000000000..4e5823b7a7a --- /dev/null +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManagerTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.remoting.metrics; + +import io.opentelemetry.api.common.Attributes; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class RemotingMetricsManagerTest { + + private RemotingMetricsManager manager; + + @Before + public void setUp() { + manager = new RemotingMetricsManager(); + } + + @Test + public void testCacheHitSameArgs() { + Attributes a1 = manager.getOrBuildAttributes(10, 200, false, RemotingMetricsConstant.RESULT_SUCCESS); + Attributes a2 = manager.getOrBuildAttributes(10, 200, false, RemotingMetricsConstant.RESULT_SUCCESS); + Assert.assertSame("Same args should return cached instance", a1, a2); + } + + @Test + public void testDifferentRequestCode() { + Attributes a = manager.getOrBuildAttributes(10, 200, false, RemotingMetricsConstant.RESULT_SUCCESS); + Attributes b = manager.getOrBuildAttributes(20, 200, false, RemotingMetricsConstant.RESULT_SUCCESS); + Assert.assertNotSame("Different requestCode should produce different Attributes", a, b); + } + + @Test + public void testDifferentResponseCode() { + Attributes a = manager.getOrBuildAttributes(10, 200, false, RemotingMetricsConstant.RESULT_SUCCESS); + Attributes b = manager.getOrBuildAttributes(10, 500, false, RemotingMetricsConstant.RESULT_SUCCESS); + Assert.assertNotSame("Different responseCode should produce different Attributes", a, b); + } + + @Test + public void testDifferentIsLongPolling() { + Attributes a = manager.getOrBuildAttributes(10, 200, false, RemotingMetricsConstant.RESULT_SUCCESS); + Attributes b = manager.getOrBuildAttributes(10, 200, true, RemotingMetricsConstant.RESULT_SUCCESS); + Assert.assertNotSame("Different isLongPolling should produce different Attributes", a, b); + } + + @Test + public void testDifferentResult() { + Attributes success = manager.getOrBuildAttributes(10, 200, false, RemotingMetricsConstant.RESULT_SUCCESS); + Attributes oneway = manager.getOrBuildAttributes(10, 200, false, RemotingMetricsConstant.RESULT_ONEWAY); + Assert.assertNotSame("Different result should produce different Attributes", success, oneway); + } + + @Test + public void testUnknownResultNotCached() { + Attributes a1 = manager.getOrBuildAttributes(10, 200, false, "UNKNOWN_RESULT"); + Attributes a2 = manager.getOrBuildAttributes(10, 200, false, "UNKNOWN_RESULT"); + Assert.assertNotSame("Unknown result should not be cached (returns new instance each time)", a1, a2); + } + + @Test + public void testKnownResultsCached() { + String[] results = { + RemotingMetricsConstant.RESULT_SUCCESS, + RemotingMetricsConstant.RESULT_ONEWAY, + RemotingMetricsConstant.RESULT_WRITE_CHANNEL_FAILED, + RemotingMetricsConstant.RESULT_CANCELED + }; + for (String result : results) { + Attributes a1 = manager.getOrBuildAttributes(10, 200, false, result); + Attributes a2 = manager.getOrBuildAttributes(10, 200, false, result); + Assert.assertSame("Result " + result + " should be cached", a1, a2); + } + } +}