From c100a62cd708b709c283f73e7dd8f71a876b27fd Mon Sep 17 00:00:00 2001 From: "wangjiahua.wjh" Date: Thu, 2 Jul 2026 17:35:03 +0800 Subject: [PATCH] [ISSUE #10575] Fix race condition between scanResponseTable and processResponseCommand processResponseCommand used get+remove (non-atomic) to retrieve ResponseFuture from responseTable. scanResponseTable used iterator.remove(). If the response arrived between scanResponseTable's remove and processResponseCommand's get, the response would be logged as 'not matched any request' and silently dropped. The callback would fire with timeout instead of success, even though the broker had successfully processed the request. Fix: Use ConcurrentHashMap.remove(key) in both methods. This is atomic: either processResponseCommand gets the future (and executes success callback), or scanResponseTable gets it (and executes timeout callback), but never both and never neither. --- .../remoting/netty/NettyRemotingAbstract.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java index a735f8455d3..a40aab3ae59 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -467,12 +467,10 @@ private Runnable buildProcessRequestHandler(ChannelHandlerContext ctx, RemotingC */ public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) { final int opaque = cmd.getOpaque(); - final ResponseFuture responseFuture = responseTable.get(opaque); + final ResponseFuture responseFuture = responseTable.remove(opaque); if (responseFuture != null) { responseFuture.setResponseCommand(cmd); - responseTable.remove(opaque); - if (responseFuture.getInvokeCallback() != null) { executeInvokeCallback(responseFuture); } else { @@ -564,10 +562,12 @@ public void scanResponseTable() { ResponseFuture rep = next.getValue(); if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) { - rep.release(); - it.remove(); - rfList.add(rep); - log.warn("remove timeout request, " + rep); + ResponseFuture removed = responseTable.remove(next.getKey()); + if (removed != null) { + removed.release(); + rfList.add(removed); + log.warn("remove timeout request, " + removed); + } } }