diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java index 9e012254329..d34af1decbd 100644 --- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java +++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java @@ -105,6 +105,12 @@ public class ClientConfig { private int concurrentHeartbeatThreadPoolSize = Runtime.getRuntime().availableProcessors(); + /** + * When true, corrupted local offset files are ignored (logged at WARN level) instead of throwing an exception. + * The consumer starts with an empty offset table and falls back to CONSUME_FROM_TIMESTAMP or broker-side offset. + */ + private boolean localOffsetStoreIgnoreCorrupted = false; + /** * The switch for message trace */ @@ -246,6 +252,7 @@ public void resetClientConfig(final ClientConfig cc) { this.traceTopic = cc.traceTopic; this.enableConcurrentHeartbeat = cc.enableConcurrentHeartbeat; this.concurrentHeartbeatThreadPoolSize = cc.concurrentHeartbeatThreadPoolSize; + this.localOffsetStoreIgnoreCorrupted = cc.localOffsetStoreIgnoreCorrupted; } public ClientConfig cloneClientConfig() { @@ -280,6 +287,7 @@ public ClientConfig cloneClientConfig() { cc.traceTopic = traceTopic; cc.enableConcurrentHeartbeat = enableConcurrentHeartbeat; cc.concurrentHeartbeatThreadPoolSize = concurrentHeartbeatThreadPoolSize; + cc.localOffsetStoreIgnoreCorrupted = localOffsetStoreIgnoreCorrupted; return cc; } @@ -549,6 +557,14 @@ public void setConcurrentHeartbeatThreadPoolSize(int concurrentHeartbeatThreadPo this.concurrentHeartbeatThreadPoolSize = concurrentHeartbeatThreadPoolSize; } + public boolean isLocalOffsetStoreIgnoreCorrupted() { + return localOffsetStoreIgnoreCorrupted; + } + + public void setLocalOffsetStoreIgnoreCorrupted(boolean localOffsetStoreIgnoreCorrupted) { + this.localOffsetStoreIgnoreCorrupted = localOffsetStoreIgnoreCorrupted; + } + @Override public String toString() { return "ClientConfig{" + diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java index 38b0a5be35b..200c6b7fb38 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java @@ -263,6 +263,10 @@ private OffsetSerializeWrapper readLocalOffsetBak() throws MQClientException { offsetSerializeWrapper = OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class); } catch (Exception e) { + if (this.mQClientFactory.getClientConfig().isLocalOffsetStoreIgnoreCorrupted()) { + log.warn("readLocalOffsetBak: local offset file corrupted, ignoring per config. group={}", this.groupName, e); + return null; + } log.warn("readLocalOffset Exception", e); throw new MQClientException("readLocalOffset Exception, maybe fastjson version too low" + FAQUrl.suggestTodo(FAQUrl.LOAD_JSON_EXCEPTION),