From 8a094a1eb43ddda99f6a9d53d31c2ba21b3f2d9f Mon Sep 17 00:00:00 2001 From: "wangjiahua.wjh" Date: Tue, 30 Jun 2026 11:18:45 +0800 Subject: [PATCH] [ISSUE #10288] Hold MappedFile reference in ConsumeQueue write path to prevent SIGSEGV ConsumeQueue.putMessagePositionInfo() accesses MappedFile without holding a reference count. A concurrent cleanExpiredConsumeQueue can destroy() the MappedFile (unmapping its buffer) while the writer is still using it, causing SIGSEGV in JIT-compiled Unsafe.copyMemory. Fix: Call mappedFile.hold() before accessing the buffer and release() in a finally block, following the same pattern used in DefaultMappedFile.getData(). Also protect the fillPreBlank call in initializeWithOffset. Reported crash signatures: - Case 1 (v4.9.4): SEGV_MAPERR in ReputMessageService thread at ConsumeQueue.putMessagePositionInfo - Case 2 (v5.3.3): SEGV in AdminBrokerThread at ConsumeQueueStore.cleanExpired --- .../apache/rocketmq/store/ConsumeQueue.java | 80 +++++++++++-------- 1 file changed, 47 insertions(+), 33 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java index 0d698dacfe1..bab42bc9560 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java @@ -856,44 +856,52 @@ private boolean putMessagePositionInfo(final long offset, final int size, final MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset); if (mappedFile != null) { - - if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) { - this.minLogicOffset = expectLogicOffset; - this.mappedFileQueue.setFlushedWhere(expectLogicOffset); - this.mappedFileQueue.setCommittedWhere(expectLogicOffset); - this.fillPreBlank(mappedFile, expectLogicOffset); - log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " " - + mappedFile.getWrotePosition()); + if (!mappedFile.hold()) { + log.warn("Failed to hold mapped file for ConsumeQueue write, topic={} queueId={}", + this.topic, this.queueId); + return false; } + try { + if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) { + this.minLogicOffset = expectLogicOffset; + this.mappedFileQueue.setFlushedWhere(expectLogicOffset); + this.mappedFileQueue.setCommittedWhere(expectLogicOffset); + this.fillPreBlank(mappedFile, expectLogicOffset); + log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " " + + mappedFile.getWrotePosition()); + } - if (cqOffset != 0) { - long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset(); + if (cqOffset != 0) { + long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset(); - if (expectLogicOffset < currentLogicOffset) { - log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}", - expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset); - return true; - } + if (expectLogicOffset < currentLogicOffset) { + log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}", + expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset); + return true; + } - if (expectLogicOffset != currentLogicOffset) { - LOG_ERROR.warn( - "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}", - expectLogicOffset, - currentLogicOffset, - this.topic, - this.queueId, - expectLogicOffset - currentLogicOffset - ); + if (expectLogicOffset != currentLogicOffset) { + LOG_ERROR.warn( + "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}", + expectLogicOffset, + currentLogicOffset, + this.topic, + this.queueId, + expectLogicOffset - currentLogicOffset + ); + } } + this.setMaxPhysicOffset(offset + size); + boolean appendResult; + if (messageStore.getMessageStoreConfig().isPutConsumeQueueDataByFileChannel()) { + appendResult = mappedFile.appendMessageUsingFileChannel(this.byteBufferIndex.array()); + } else { + appendResult = mappedFile.appendMessage(this.byteBufferIndex.array()); + } + return appendResult; + } finally { + mappedFile.release(); } - this.setMaxPhysicOffset(offset + size); - boolean appendResult; - if (messageStore.getMessageStoreConfig().isPutConsumeQueueDataByFileChannel()) { - appendResult = mappedFile.appendMessageUsingFileChannel(this.byteBufferIndex.array()); - } else { - appendResult = mappedFile.appendMessage(this.byteBufferIndex.array()); - } - return appendResult; } return false; } @@ -1271,7 +1279,13 @@ public void initializeWithOffset(long offset, long minPhyOffset) { // transientStorePool is null, only need set wrote position here MappedFile mappedFile = mappedFileQueue.getLastMappedFile(offset * ConsumeQueue.CQ_STORE_UNIT_SIZE, true); - fillPreBlank(mappedFile, offset * ConsumeQueue.CQ_STORE_UNIT_SIZE); + if (mappedFile != null && mappedFile.hold()) { + try { + fillPreBlank(mappedFile, offset * ConsumeQueue.CQ_STORE_UNIT_SIZE); + } finally { + mappedFile.release(); + } + } flush(0); }