Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 47 additions & 33 deletions store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -856,44 +856,52 @@ private boolean putMessagePositionInfo(final long offset, final int size, final

MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
if (mappedFile != null) {

if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
this.minLogicOffset = expectLogicOffset;
this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
this.fillPreBlank(mappedFile, expectLogicOffset);
log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
+ mappedFile.getWrotePosition());
if (!mappedFile.hold()) {
log.warn("Failed to hold mapped file for ConsumeQueue write, topic={} queueId={}",
this.topic, this.queueId);
return false;
}
try {
if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
this.minLogicOffset = expectLogicOffset;
this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
this.fillPreBlank(mappedFile, expectLogicOffset);
log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
+ mappedFile.getWrotePosition());
}

if (cqOffset != 0) {
long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
if (cqOffset != 0) {
long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();

if (expectLogicOffset < currentLogicOffset) {
log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
return true;
}
if (expectLogicOffset < currentLogicOffset) {
log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
return true;
}

if (expectLogicOffset != currentLogicOffset) {
LOG_ERROR.warn(
"[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
expectLogicOffset,
currentLogicOffset,
this.topic,
this.queueId,
expectLogicOffset - currentLogicOffset
);
if (expectLogicOffset != currentLogicOffset) {
LOG_ERROR.warn(
"[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
expectLogicOffset,
currentLogicOffset,
this.topic,
this.queueId,
expectLogicOffset - currentLogicOffset
);
}
}
this.setMaxPhysicOffset(offset + size);
boolean appendResult;
if (messageStore.getMessageStoreConfig().isPutConsumeQueueDataByFileChannel()) {
appendResult = mappedFile.appendMessageUsingFileChannel(this.byteBufferIndex.array());
} else {
appendResult = mappedFile.appendMessage(this.byteBufferIndex.array());
}
return appendResult;
} finally {
mappedFile.release();
}
this.setMaxPhysicOffset(offset + size);
boolean appendResult;
if (messageStore.getMessageStoreConfig().isPutConsumeQueueDataByFileChannel()) {
appendResult = mappedFile.appendMessageUsingFileChannel(this.byteBufferIndex.array());
} else {
appendResult = mappedFile.appendMessage(this.byteBufferIndex.array());
}
return appendResult;
}
return false;
}
Expand Down Expand Up @@ -1271,7 +1279,13 @@ public void initializeWithOffset(long offset, long minPhyOffset) {

// transientStorePool is null, only need set wrote position here
MappedFile mappedFile = mappedFileQueue.getLastMappedFile(offset * ConsumeQueue.CQ_STORE_UNIT_SIZE, true);
fillPreBlank(mappedFile, offset * ConsumeQueue.CQ_STORE_UNIT_SIZE);
if (mappedFile != null && mappedFile.hold()) {
try {
Comment thread
wang-jiahua marked this conversation as resolved.
fillPreBlank(mappedFile, offset * ConsumeQueue.CQ_STORE_UNIT_SIZE);
} finally {
mappedFile.release();
}
}
Comment on lines 1281 to +1288

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

initializeWithOffset() should not silently continue when the mapped file cannot be created or held. This path relies on continuous ConsumeQueue files, and skipping fillPreBlank() while still calling flush(0) makes initialization look successful even though the pre-blank area was never written. Please fail fast here, or at least log an error and abort the initialization, so a file creation/availability problem does not leave the consume queue in an inconsistent state.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Fixed in the latest commit — instead of silently skipping fillPreBlank(), the method now logs an error and returns early (aborts initialization) when mappedFile is null or hold() fails:

if (mappedFile == null) {
    log.error("initializeWithOffset failed: mappedFile is null for offset {}", offset);
    return;
}
if (!mappedFile.hold()) {
    log.error("initializeWithOffset failed: mappedFile hold() failed for offset {}", offset);
    return;
}

This ensures the consume queue is not left in an inconsistent state (no flush(0) called) when file creation/availability fails, while keeping the hold() reference to prevent the SIGSEGV.


flush(0);
}
Expand Down
Loading