From 39c92da60074f4f4c17446c5507561679c2765af Mon Sep 17 00:00:00 2001 From: "wangjiahua.wjh" Date: Tue, 30 Jun 2026 13:11:57 +0800 Subject: [PATCH] [ISSUE #10518] Add client config to ignore corrupted local offset files When ~/.rocketmq_offsets/ files are corrupted (e.g., truncated due to power loss during persistAll()), readLocalOffsetBak() unconditionally throws MQClientException, crashing the consumer application. Fix: Add ClientConfig.localOffsetStoreIgnoreCorrupted (default false). When true, corrupted offset files are silently skipped with a warning log, and the consumer starts with an empty offset table (falling back to CONSUME_FROM_TIMESTAMP or broker-side offset). Default false preserves existing behavior (fully backward compatible). Changes: - ClientConfig: add localOffsetStoreIgnoreCorrupted field + getter/setter - LocalFileOffsetStore.readLocalOffsetBak(): check config before throwing --- .../org/apache/rocketmq/client/ClientConfig.java | 16 ++++++++++++++++ .../consumer/store/LocalFileOffsetStore.java | 4 ++++ 2 files changed, 20 insertions(+) diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java index 9e012254329..d34af1decbd 100644 --- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java +++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java @@ -105,6 +105,12 @@ public class ClientConfig { private int concurrentHeartbeatThreadPoolSize = Runtime.getRuntime().availableProcessors(); + /** + * When true, corrupted local offset files are ignored (logged at WARN level) instead of throwing an exception. + * The consumer starts with an empty offset table and falls back to CONSUME_FROM_TIMESTAMP or broker-side offset. + */ + private boolean localOffsetStoreIgnoreCorrupted = false; + /** * The switch for message trace */ @@ -246,6 +252,7 @@ public void resetClientConfig(final ClientConfig cc) { this.traceTopic = cc.traceTopic; this.enableConcurrentHeartbeat = cc.enableConcurrentHeartbeat; this.concurrentHeartbeatThreadPoolSize = cc.concurrentHeartbeatThreadPoolSize; + this.localOffsetStoreIgnoreCorrupted = cc.localOffsetStoreIgnoreCorrupted; } public ClientConfig cloneClientConfig() { @@ -280,6 +287,7 @@ public ClientConfig cloneClientConfig() { cc.traceTopic = traceTopic; cc.enableConcurrentHeartbeat = enableConcurrentHeartbeat; cc.concurrentHeartbeatThreadPoolSize = concurrentHeartbeatThreadPoolSize; + cc.localOffsetStoreIgnoreCorrupted = localOffsetStoreIgnoreCorrupted; return cc; } @@ -549,6 +557,14 @@ public void setConcurrentHeartbeatThreadPoolSize(int concurrentHeartbeatThreadPo this.concurrentHeartbeatThreadPoolSize = concurrentHeartbeatThreadPoolSize; } + public boolean isLocalOffsetStoreIgnoreCorrupted() { + return localOffsetStoreIgnoreCorrupted; + } + + public void setLocalOffsetStoreIgnoreCorrupted(boolean localOffsetStoreIgnoreCorrupted) { + this.localOffsetStoreIgnoreCorrupted = localOffsetStoreIgnoreCorrupted; + } + @Override public String toString() { return "ClientConfig{" + diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java index 38b0a5be35b..200c6b7fb38 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java @@ -263,6 +263,10 @@ private OffsetSerializeWrapper readLocalOffsetBak() throws MQClientException { offsetSerializeWrapper = OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class); } catch (Exception e) { + if (this.mQClientFactory.getClientConfig().isLocalOffsetStoreIgnoreCorrupted()) { + log.warn("readLocalOffsetBak: local offset file corrupted, ignoring per config. group={}", this.groupName, e); + return null; + } log.warn("readLocalOffset Exception", e); throw new MQClientException("readLocalOffset Exception, maybe fastjson version too low" + FAQUrl.suggestTodo(FAQUrl.LOAD_JSON_EXCEPTION),