SetResult set(
return configManager.set(key, configSerde, oldValue, newValue);
}
+ @Nullable
+ public byte[] getCurrentBytes(String key)
+ {
+ return configManager.getCurrentBytes(key);
+ }
+
+ public boolean isCompareAndSwapEnabled()
+ {
+ return configManager.isCompareAndSwapEnabled();
+ }
+
+ /**
+ * Set the config, optionally guarded by an {@code If-Match}-style
+ * precondition. When {@code ifMatchEtag} is {@code null}, behaves like
+ * {@link #set(String, Object, AuditInfo)}. Otherwise the write only commits
+ * if the currently stored bytes hash to {@code ifMatchEtag}; on mismatch the
+ * result reports {@link SetResult#isPreconditionFailed() preconditionFailed}.
+ *
+ * The precondition is enforced via metadata-store CAS, so conditional
+ * writes require {@code druid.manager.config.enableCompareAndSwap} to be
+ * true (the default). With CAS disabled, {@code If-Match} writes fail as a
+ * precondition failure instead of silently degrading to last-writer-wins.
+ */
+ public SetResult setIfMatch(
+ String key,
+ @Nullable String ifMatchEtag,
+ T newValue,
+ AuditInfo auditInfo
+ )
+ {
+ if (newValue == null) {
+ return SetResult.failure(new IllegalArgumentException("input obj is null"));
+ }
+ if (ifMatchEtag == null) {
+ return set(key, newValue, auditInfo);
+ }
+ if (!configManager.isCompareAndSwapEnabled()) {
+ return SetResult.preconditionFailed(
+ new IllegalStateException(
+ "If-Match requires druid.manager.config.enableCompareAndSwap to be enabled for key[" + key + "]"
+ )
+ );
+ }
+ final byte[] currentBytes = configManager.getCurrentBytes(key);
+ if (!ConfigEtag.matches(ifMatchEtag, currentBytes)) {
+ return SetResult.preconditionFailed(
+ new IllegalStateException("If-Match precondition failed for key[" + key + "]")
+ );
+ }
+ return casConflictAsPreconditionFailed(set(key, currentBytes, newValue, auditInfo), key);
+ }
+
+ /**
+ * Applies {@code updateOperator} to the config deserialized from the bytes
+ * used for {@code If-Match} validation and CAS. This is intended for partial
+ * updates whose output must be built from the same snapshot that the
+ * precondition protects.
+ */
+ public SetResult setIfMatch(
+ String key,
+ @Nullable String ifMatchEtag,
+ Class extends T> clazz,
+ @Nullable T defaultVal,
+ UnaryOperator updateOperator,
+ AuditInfo auditInfo
+ )
+ {
+ if (ifMatchEtag != null && !configManager.isCompareAndSwapEnabled()) {
+ return SetResult.preconditionFailed(
+ new IllegalStateException(
+ "If-Match requires druid.manager.config.enableCompareAndSwap to be enabled for key[" + key + "]"
+ )
+ );
+ }
+ final byte[] currentBytes = configManager.getCurrentBytes(key);
+ if (ifMatchEtag != null && !ConfigEtag.matches(ifMatchEtag, currentBytes)) {
+ return SetResult.preconditionFailed(
+ new IllegalStateException("If-Match precondition failed for key[" + key + "]")
+ );
+ }
+ final ConfigSerde configSerde = create(clazz, defaultVal);
+ final T currentValue = configSerde.deserialize(currentBytes);
+ final T newValue = updateOperator.apply(currentValue);
+ if (newValue == null) {
+ return SetResult.failure(new IllegalArgumentException("input obj is null"));
+ }
+ if (ifMatchEtag == null) {
+ return set(key, newValue, auditInfo);
+ }
+ return casConflictAsPreconditionFailed(set(key, currentBytes, newValue, auditInfo), key);
+ }
+
+ /**
+ * Maps a CAS-conflict ({@link SetResult#isRetryable() retryable}) outcome to a
+ * precondition failure. A conditional write that loses the CAS means another
+ * writer committed between our read and write, so the supplied {@code If-Match}
+ * no longer describes the stored value — the caller must re-read and retry.
+ */
+ private static SetResult casConflictAsPreconditionFailed(SetResult result, String key)
+ {
+ if (result.isRetryable()) {
+ return SetResult.preconditionFailed(
+ new IllegalStateException("If-Match precondition failed (concurrent update) for key[" + key + "]")
+ );
+ }
+ return result;
+ }
+
@VisibleForTesting
ConfigSerde create(final Class extends T> clazz, final T defaultVal)
{
diff --git a/processing/src/main/java/org/apache/druid/error/DruidException.java b/processing/src/main/java/org/apache/druid/error/DruidException.java
index 54382f5274b6..e06702a4dba1 100644
--- a/processing/src/main/java/org/apache/druid/error/DruidException.java
+++ b/processing/src/main/java/org/apache/druid/error/DruidException.java
@@ -379,6 +379,11 @@ public enum Category
* the current state of the target resource.
*/
CONFLICT(409),
+ /**
+ * Indicates that the request could not be completed due to a conflict with
+ * headers supplied in the request and the current state of the target resource.
+ */
+ PRECONDITION_FAILED(412),
/**
* Means that some capacity limit was exceeded, this could be due to throttling or due to some system limit
*/
diff --git a/processing/src/test/java/org/apache/druid/common/config/ConfigEtagTest.java b/processing/src/test/java/org/apache/druid/common/config/ConfigEtagTest.java
new file mode 100644
index 000000000000..ab61d538e8e5
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/common/config/ConfigEtagTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.common.config;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+
+public class ConfigEtagTest
+{
+ @Test
+ public void testComputeIsDeterministic()
+ {
+ byte[] bytes = "{\"foo\":\"bar\"}".getBytes(StandardCharsets.UTF_8);
+ Assert.assertEquals(ConfigEtag.compute(bytes), ConfigEtag.compute(bytes));
+ }
+
+ @Test
+ public void testComputeDiffersForDifferentInput()
+ {
+ Assert.assertNotEquals(
+ ConfigEtag.compute("a".getBytes(StandardCharsets.UTF_8)),
+ ConfigEtag.compute("b".getBytes(StandardCharsets.UTF_8))
+ );
+ }
+
+ @Test
+ public void testComputeIsQuoted()
+ {
+ String etag = ConfigEtag.compute(new byte[]{1, 2, 3});
+ Assert.assertNotNull(etag);
+ Assert.assertTrue("ETag must be quoted: " + etag, etag.startsWith("\"") && etag.endsWith("\""));
+ }
+
+ @Test
+ public void testComputeNullInput()
+ {
+ Assert.assertNull(ConfigEtag.compute(null));
+ }
+
+ @Test
+ public void testMatchesNullHeaderAlwaysTrue()
+ {
+ Assert.assertTrue(ConfigEtag.matches(null, new byte[]{1, 2, 3}));
+ Assert.assertTrue(ConfigEtag.matches(null, null));
+ }
+
+ @Test
+ public void testMatchesWildcard()
+ {
+ Assert.assertTrue(ConfigEtag.matches("*", new byte[]{1, 2, 3}));
+ Assert.assertFalse("wildcard must not match absent value", ConfigEtag.matches("*", null));
+ }
+
+ @Test
+ public void testMatchesExact()
+ {
+ byte[] bytes = "payload".getBytes(StandardCharsets.UTF_8);
+ String etag = ConfigEtag.compute(bytes);
+ Assert.assertTrue(ConfigEtag.matches(etag, bytes));
+ }
+
+ @Test
+ public void testMatchesMismatch()
+ {
+ byte[] bytes = "payload".getBytes(StandardCharsets.UTF_8);
+ String wrongEtag = ConfigEtag.compute("other".getBytes(StandardCharsets.UTF_8));
+ Assert.assertFalse(ConfigEtag.matches(wrongEtag, bytes));
+ }
+
+ @Test
+ public void testMatchesAgainstNullCurrent()
+ {
+ String etag = ConfigEtag.compute(new byte[]{1});
+ Assert.assertFalse(ConfigEtag.matches(etag, null));
+ }
+
+ @Test
+ public void testMatchesCommaSeparatedList()
+ {
+ byte[] bytes = "payload".getBytes(StandardCharsets.UTF_8);
+ String correct = ConfigEtag.compute(bytes);
+ String header = "\"bogus\", " + correct + ", \"another\"";
+ Assert.assertTrue(ConfigEtag.matches(header, bytes));
+ }
+}
diff --git a/processing/src/test/java/org/apache/druid/common/config/JacksonConfigManagerTest.java b/processing/src/test/java/org/apache/druid/common/config/JacksonConfigManagerTest.java
index c4cf97a236f5..0941822664fd 100644
--- a/processing/src/test/java/org/apache/druid/common/config/JacksonConfigManagerTest.java
+++ b/processing/src/test/java/org/apache/druid/common/config/JacksonConfigManagerTest.java
@@ -36,6 +36,7 @@
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
@ExtendWith(MockitoExtension.class)
public class JacksonConfigManagerTest
@@ -77,6 +78,243 @@ public void testSet()
Assertions.assertNotNull(auditCapture.getValue());
}
+ @Test
+ public void testSetIfMatchNullEtagDelegatesToUnconditionalSet()
+ {
+ String key = "key";
+ TestConfig val = new TestConfig("v", "s", 1);
+ AuditInfo auditInfo = new AuditInfo("a", "i", "c", "ip");
+ Mockito.when(mockConfigManager.set(
+ Mockito.eq(key),
+ Mockito.any(ConfigSerde.class),
+ Mockito.isNull(),
+ Mockito.eq(val)
+ )).thenReturn(ConfigManager.SetResult.ok());
+
+ ConfigManager.SetResult result = jacksonConfigManager.setIfMatch(key, null, val, auditInfo);
+
+ Assertions.assertTrue(result.isOk());
+ Mockito.verify(mockConfigManager).set(
+ Mockito.eq(key),
+ Mockito.any(ConfigSerde.class),
+ Mockito.isNull(),
+ Mockito.eq(val)
+ );
+ }
+
+ @Test
+ public void testSetIfMatchPreconditionPassesForMatchingEtag()
+ {
+ String key = "key";
+ TestConfig val = new TestConfig("v", "s", 1);
+ AuditInfo auditInfo = new AuditInfo("a", "i", "c", "ip");
+ byte[] currentBytes = "current".getBytes(java.nio.charset.StandardCharsets.UTF_8);
+ Mockito.when(mockConfigManager.isCompareAndSwapEnabled()).thenReturn(true);
+ Mockito.when(mockConfigManager.getCurrentBytes(key)).thenReturn(currentBytes);
+ Mockito.when(mockConfigManager.set(
+ Mockito.eq(key),
+ Mockito.any(ConfigSerde.class),
+ Mockito.eq(currentBytes),
+ Mockito.eq(val)
+ )).thenReturn(ConfigManager.SetResult.ok());
+
+ String etag = ConfigEtag.compute(currentBytes);
+ ConfigManager.SetResult result = jacksonConfigManager.setIfMatch(key, etag, val, auditInfo);
+
+ Assertions.assertTrue(result.isOk());
+ Assertions.assertFalse(result.isPreconditionFailed());
+ Mockito.verify(mockConfigManager).set(
+ Mockito.eq(key),
+ Mockito.any(ConfigSerde.class),
+ Mockito.eq(currentBytes),
+ Mockito.eq(val)
+ );
+ }
+
+ @Test
+ public void testSetIfMatchPreconditionFailsForMismatchedEtag()
+ {
+ String key = "key";
+ TestConfig val = new TestConfig("v", "s", 1);
+ AuditInfo auditInfo = new AuditInfo("a", "i", "c", "ip");
+ Mockito.when(mockConfigManager.isCompareAndSwapEnabled()).thenReturn(true);
+ Mockito.when(mockConfigManager.getCurrentBytes(key)).thenReturn("current".getBytes(java.nio.charset.StandardCharsets.UTF_8));
+
+ ConfigManager.SetResult result = jacksonConfigManager.setIfMatch(
+ key,
+ ConfigEtag.compute("stale".getBytes(java.nio.charset.StandardCharsets.UTF_8)),
+ val,
+ auditInfo
+ );
+
+ Assertions.assertFalse(result.isOk());
+ Assertions.assertTrue(result.isPreconditionFailed());
+ Mockito.verify(mockConfigManager, Mockito.never()).set(
+ Mockito.anyString(),
+ Mockito.any(ConfigSerde.class),
+ Mockito.any(byte[].class),
+ Mockito.any()
+ );
+ }
+
+ @Test
+ public void testSetIfMatchPreconditionFailsWhenCompareAndSwapDisabled()
+ {
+ final String key = "key";
+ final TestConfig val = new TestConfig("v", "s", 1);
+ final AuditInfo auditInfo = new AuditInfo("a", "i", "c", "ip");
+ Mockito.when(mockConfigManager.isCompareAndSwapEnabled()).thenReturn(false);
+
+ final ConfigManager.SetResult result = jacksonConfigManager.setIfMatch(key, "\"etag\"", val, auditInfo);
+
+ Assertions.assertFalse(result.isOk());
+ Assertions.assertTrue(result.isPreconditionFailed());
+ Mockito.verify(mockConfigManager, Mockito.never()).getCurrentBytes(Mockito.anyString());
+ Mockito.verify(mockConfigManager, Mockito.never()).set(
+ Mockito.anyString(),
+ Mockito.any(ConfigSerde.class),
+ Mockito.any(byte[].class),
+ Mockito.any()
+ );
+ Mockito.verify(mockAuditManager, Mockito.never()).doAudit(Mockito.any(AuditEntry.class));
+ }
+
+ @Test
+ public void testSetIfMatchTransformBuildsNewValueFromMatchedBytes()
+ {
+ final String key = "key";
+ final AuditInfo auditInfo = new AuditInfo("a", "i", "c", "ip");
+ final TestConfig current = new TestConfig("v1", "current", 1);
+ final TestConfig updated = new TestConfig("v1", "updated", 2);
+ final ConfigSerde serde = jacksonConfigManager.create(TestConfig.class, null);
+ final byte[] currentBytes = serde.serialize(current);
+
+ Mockito.when(mockConfigManager.isCompareAndSwapEnabled()).thenReturn(true);
+ Mockito.when(mockConfigManager.getCurrentBytes(key)).thenReturn(currentBytes);
+ Mockito.when(mockConfigManager.set(
+ Mockito.eq(key),
+ Mockito.any(ConfigSerde.class),
+ Mockito.eq(currentBytes),
+ Mockito.eq(updated)
+ )).thenReturn(ConfigManager.SetResult.ok());
+
+ final ConfigManager.SetResult result = jacksonConfigManager.setIfMatch(
+ key,
+ ConfigEtag.compute(currentBytes),
+ TestConfig.class,
+ null,
+ currentValue -> new TestConfig(currentValue.getVersion(), "updated", currentValue.getSettingInt() + 1),
+ auditInfo
+ );
+
+ Assertions.assertTrue(result.isOk());
+ Mockito.verify(mockConfigManager).set(
+ Mockito.eq(key),
+ Mockito.any(ConfigSerde.class),
+ Mockito.eq(currentBytes),
+ Mockito.eq(updated)
+ );
+ }
+
+ @Test
+ public void testSetIfMatchTransformDoesNotApplyUpdateForMismatchedEtag()
+ {
+ final String key = "key";
+ final AuditInfo auditInfo = new AuditInfo("a", "i", "c", "ip");
+ final TestConfig current = new TestConfig("v1", "current", 1);
+ final ConfigSerde serde = jacksonConfigManager.create(TestConfig.class, null);
+ final byte[] currentBytes = serde.serialize(current);
+ final AtomicBoolean updateCalled = new AtomicBoolean(false);
+
+ Mockito.when(mockConfigManager.isCompareAndSwapEnabled()).thenReturn(true);
+ Mockito.when(mockConfigManager.getCurrentBytes(key)).thenReturn(currentBytes);
+
+ final ConfigManager.SetResult result = jacksonConfigManager.setIfMatch(
+ key,
+ ConfigEtag.compute("stale".getBytes(java.nio.charset.StandardCharsets.UTF_8)),
+ TestConfig.class,
+ null,
+ currentValue -> {
+ updateCalled.set(true);
+ return currentValue;
+ },
+ auditInfo
+ );
+
+ Assertions.assertFalse(result.isOk());
+ Assertions.assertTrue(result.isPreconditionFailed());
+ Assertions.assertFalse(updateCalled.get());
+ Mockito.verify(mockConfigManager, Mockito.never()).set(
+ Mockito.anyString(),
+ Mockito.any(ConfigSerde.class),
+ Mockito.any(byte[].class),
+ Mockito.any()
+ );
+ Mockito.verify(mockAuditManager, Mockito.never()).doAudit(Mockito.any(AuditEntry.class));
+ }
+
+ @Test
+ public void testSetIfMatchTransformPreconditionFailsWhenCompareAndSwapDisabled()
+ {
+ final String key = "key";
+ final AuditInfo auditInfo = new AuditInfo("a", "i", "c", "ip");
+ final AtomicBoolean updateCalled = new AtomicBoolean(false);
+ Mockito.when(mockConfigManager.isCompareAndSwapEnabled()).thenReturn(false);
+
+ final ConfigManager.SetResult result = jacksonConfigManager.setIfMatch(
+ key,
+ "\"etag\"",
+ TestConfig.class,
+ null,
+ currentValue -> {
+ updateCalled.set(true);
+ return currentValue;
+ },
+ auditInfo
+ );
+
+ Assertions.assertFalse(result.isOk());
+ Assertions.assertTrue(result.isPreconditionFailed());
+ Assertions.assertFalse(updateCalled.get());
+ Mockito.verify(mockConfigManager, Mockito.never()).getCurrentBytes(Mockito.anyString());
+ Mockito.verify(mockConfigManager, Mockito.never()).set(
+ Mockito.anyString(),
+ Mockito.any(ConfigSerde.class),
+ Mockito.any(byte[].class),
+ Mockito.any()
+ );
+ Mockito.verify(mockAuditManager, Mockito.never()).doAudit(Mockito.any(AuditEntry.class));
+ }
+
+ @Test
+ public void testSetIfMatchCasConflictReturnsPreconditionFailed()
+ {
+ final String key = "key";
+ final TestConfig val = new TestConfig("v", "s", 1);
+ final AuditInfo auditInfo = new AuditInfo("a", "i", "c", "ip");
+ final byte[] currentBytes = "current".getBytes(java.nio.charset.StandardCharsets.UTF_8);
+
+ Mockito.when(mockConfigManager.isCompareAndSwapEnabled()).thenReturn(true);
+ Mockito.when(mockConfigManager.getCurrentBytes(key)).thenReturn(currentBytes);
+ Mockito.when(mockConfigManager.set(
+ Mockito.eq(key),
+ Mockito.any(ConfigSerde.class),
+ Mockito.eq(currentBytes),
+ Mockito.eq(val)
+ )).thenReturn(ConfigManager.SetResult.retryableFailure(new IllegalStateException("Config value has changed")));
+
+ final ConfigManager.SetResult result = jacksonConfigManager.setIfMatch(
+ key,
+ ConfigEtag.compute(currentBytes),
+ val,
+ auditInfo
+ );
+
+ Assertions.assertFalse(result.isOk());
+ Assertions.assertTrue(result.isPreconditionFailed());
+ Assertions.assertFalse(result.isRetryable());
+ }
+
@Test
public void testConvertByteToConfigWithNullConfigInByte()
{
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorConfigManager.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorConfigManager.java
index a047bf99070d..e906c846c1eb 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorConfigManager.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorConfigManager.java
@@ -26,6 +26,7 @@
import org.apache.druid.audit.AuditEntry;
import org.apache.druid.audit.AuditInfo;
import org.apache.druid.audit.AuditManager;
+import org.apache.druid.common.config.ConfigEtag;
import org.apache.druid.common.config.ConfigManager;
import org.apache.druid.common.config.Configs;
import org.apache.druid.common.config.JacksonConfigManager;
@@ -87,15 +88,43 @@ public CoordinatorDynamicConfig getCurrentDynamicConfig()
return Preconditions.checkNotNull(dynamicConfig, "Got null config from watcher?!");
}
- public ConfigManager.SetResult setDynamicConfig(CoordinatorDynamicConfig config, AuditInfo auditInfo)
+ public CoordinatorDynamicConfig convertBytesToDynamicConfig(@Nullable byte[] bytes)
{
- return jacksonConfigManager.set(
+ return jacksonConfigManager.convertByteToConfig(
+ bytes,
+ CoordinatorDynamicConfig.class,
+ CoordinatorDynamicConfig.builder().build()
+ );
+ }
+
+ public ConfigManager.SetResult updateDynamicConfig(
+ UnaryOperator operator,
+ @Nullable String ifMatchEtag,
+ AuditInfo auditInfo
+ )
+ {
+ return jacksonConfigManager.setIfMatch(
CoordinatorDynamicConfig.CONFIG_KEY,
- config,
+ ifMatchEtag,
+ CoordinatorDynamicConfig.class,
+ CoordinatorDynamicConfig.builder().build(),
+ operator,
auditInfo
);
}
+ @Nullable
+ public byte[] getCurrentDynamicConfigBytes()
+ {
+ return jacksonConfigManager.getCurrentBytes(CoordinatorDynamicConfig.CONFIG_KEY);
+ }
+
+ @Nullable
+ public byte[] getCurrentCompactionConfigBytes()
+ {
+ return jacksonConfigManager.getCurrentBytes(DruidCompactionConfig.CONFIG_KEY);
+ }
+
public DruidCompactionConfig getCurrentCompactionConfig()
{
DruidCompactionConfig config = jacksonConfigManager.watch(
@@ -120,6 +149,24 @@ public ConfigManager.SetResult getAndUpdateCompactionConfig(
AuditInfo auditInfo
)
{
+ return getAndUpdateCompactionConfig(operator, null, auditInfo);
+ }
+
+ public ConfigManager.SetResult getAndUpdateCompactionConfig(
+ UnaryOperator operator,
+ @Nullable String ifMatchEtag,
+ AuditInfo auditInfo
+ )
+ {
+ if (ifMatchEtag != null && !jacksonConfigManager.isCompareAndSwapEnabled()) {
+ return ConfigManager.SetResult.preconditionFailed(
+ new IllegalStateException(
+ "If-Match requires druid.manager.config.enableCompareAndSwap to be enabled for key["
+ + DruidCompactionConfig.CONFIG_KEY
+ + "]"
+ )
+ );
+ }
// Fetch the bytes and use to build the current config and perform compare-and-swap.
// This avoids failures in ConfigManager while updating configs previously
// persisted by older versions of Druid which didn't have fields such as 'granularitySpec'.
@@ -129,22 +176,35 @@ public ConfigManager.SetResult getAndUpdateCompactionConfig(
MetadataStorageConnector.CONFIG_TABLE_VALUE_COLUMN,
DruidCompactionConfig.CONFIG_KEY
);
+ if (ifMatchEtag != null && !ConfigEtag.matches(ifMatchEtag, currentBytes)) {
+ return ConfigManager.SetResult.preconditionFailed(
+ new IllegalStateException("If-Match precondition failed for compaction config")
+ );
+ }
DruidCompactionConfig current = convertBytesToCompactionConfig(currentBytes);
DruidCompactionConfig updated = operator.apply(current);
if (current.equals(updated)) {
return ConfigManager.SetResult.ok();
- } else {
- return jacksonConfigManager.set(
- DruidCompactionConfig.CONFIG_KEY,
- currentBytes,
- updated,
- auditInfo
+ }
+ final ConfigManager.SetResult result = jacksonConfigManager.set(
+ DruidCompactionConfig.CONFIG_KEY,
+ currentBytes,
+ updated,
+ auditInfo
+ );
+ // Under If-Match, a lost CAS means a concurrent writer committed between our
+ // read and write: the precondition no longer holds, so report it as such
+ // rather than as a retryable failure (conditional writes must not auto-retry).
+ if (ifMatchEtag != null && result.isRetryable()) {
+ return ConfigManager.SetResult.preconditionFailed(
+ new IllegalStateException("If-Match precondition failed (concurrent update) for compaction config")
);
}
+ return result;
}
- public DruidCompactionConfig convertBytesToCompactionConfig(byte[] bytes)
+ public DruidCompactionConfig convertBytesToCompactionConfig(@Nullable byte[] bytes)
{
return jacksonConfigManager.convertByteToConfig(
bytes,
@@ -158,6 +218,18 @@ public boolean updateCompactionTaskSlots(
@Nullable Integer maxCompactionTaskSlots,
AuditInfo auditInfo
)
+ {
+ return verifyCompactionUpdateSucceeded(
+ updateCompactionTaskSlots(compactionTaskSlotRatio, maxCompactionTaskSlots, null, auditInfo)
+ );
+ }
+
+ public ConfigManager.SetResult updateCompactionTaskSlots(
+ @Nullable Double compactionTaskSlotRatio,
+ @Nullable Integer maxCompactionTaskSlots,
+ @Nullable String ifMatchEtag,
+ AuditInfo auditInfo
+ )
{
UnaryOperator operator = current -> {
final ClusterCompactionConfig currentClusterConfig = current.clusterConfig();
@@ -173,16 +245,25 @@ public boolean updateCompactionTaskSlots(
return current.withClusterConfig(updatedClusterConfig);
};
- return updateConfigHelper(operator, auditInfo);
+ return updateConfigHelper(operator, ifMatchEtag, auditInfo);
}
public boolean updateClusterCompactionConfig(
ClusterCompactionConfig config,
AuditInfo auditInfo
)
+ {
+ return verifyCompactionUpdateSucceeded(updateClusterCompactionConfig(config, null, auditInfo));
+ }
+
+ public ConfigManager.SetResult updateClusterCompactionConfig(
+ ClusterCompactionConfig config,
+ @Nullable String ifMatchEtag,
+ AuditInfo auditInfo
+ )
{
UnaryOperator operator = current -> current.withClusterConfig(config);
- return updateConfigHelper(operator, auditInfo);
+ return updateConfigHelper(operator, ifMatchEtag, auditInfo);
}
public ClusterCompactionConfig getClusterCompactionConfig()
@@ -194,9 +275,18 @@ public boolean updateDatasourceCompactionConfig(
DataSourceCompactionConfig config,
AuditInfo auditInfo
)
+ {
+ return verifyCompactionUpdateSucceeded(updateDatasourceCompactionConfig(config, null, auditInfo));
+ }
+
+ public ConfigManager.SetResult updateDatasourceCompactionConfig(
+ DataSourceCompactionConfig config,
+ @Nullable String ifMatchEtag,
+ AuditInfo auditInfo
+ )
{
UnaryOperator callable = current -> current.withDatasourceConfig(config);
- return updateConfigHelper(callable, auditInfo);
+ return updateConfigHelper(callable, ifMatchEtag, auditInfo);
}
public DataSourceCompactionConfig getDatasourceCompactionConfig(String dataSource)
@@ -214,6 +304,15 @@ public boolean deleteDatasourceCompactionConfig(
String dataSource,
AuditInfo auditInfo
)
+ {
+ return verifyCompactionUpdateSucceeded(deleteDatasourceCompactionConfig(dataSource, null, auditInfo));
+ }
+
+ public ConfigManager.SetResult deleteDatasourceCompactionConfig(
+ String dataSource,
+ @Nullable String ifMatchEtag,
+ AuditInfo auditInfo
+ )
{
UnaryOperator callable = current -> {
final Map configs = current.dataSourceToCompactionConfigMap();
@@ -224,7 +323,7 @@ public boolean deleteDatasourceCompactionConfig(
return current.withDatasourceConfigs(List.copyOf(configs.values()));
};
- return updateConfigHelper(callable, auditInfo);
+ return updateConfigHelper(callable, ifMatchEtag, auditInfo);
}
public List getCompactionConfigHistory(
@@ -266,20 +365,24 @@ public List getCompactionConfigHistory(
}
}
- private boolean updateConfigHelper(
+ private ConfigManager.SetResult updateConfigHelper(
UnaryOperator configUpdateOperator,
+ @Nullable String ifMatchEtag,
AuditInfo auditInfo
)
{
- int attemps = 0;
+ int attempts = 0;
ConfigManager.SetResult setResult = null;
+ // Only an unconditional write can be retried: getAndUpdateCompactionConfig converts a lost
+ // CAS under If-Match into a (non-retryable) precondition failure, so conditional writes exit
+ // this loop on the first attempt rather than silently retrying past the caller's precondition.
try {
- while (attemps < MAX_UPDATE_RETRIES) {
- setResult = getAndUpdateCompactionConfig(configUpdateOperator, auditInfo);
+ while (attempts < MAX_UPDATE_RETRIES) {
+ setResult = getAndUpdateCompactionConfig(configUpdateOperator, ifMatchEtag, auditInfo);
if (setResult.isOk() || !setResult.isRetryable()) {
break;
}
- attemps++;
+ attempts++;
updateRetryDelay();
}
}
@@ -294,9 +397,40 @@ private boolean updateConfigHelper(
);
}
+ if (setResult.isOk() || setResult.isPreconditionFailed()) {
+ return setResult;
+ }
+
+ // getAndUpdateCompactionConfig already converts a lost CAS under If-Match into
+ // a precondition failure, so a retryable result here is only a genuine
+ // (non-conditional) CAS conflict that exhausted its retries.
+ if (setResult.getException() instanceof NoSuchElementException) {
+ log.warn(setResult.getException(), "Update compaction config failed");
+ throw NotFound.exception(
+ Throwables.getRootCause(setResult.getException()),
+ "Compaction config does not exist"
+ );
+ } else {
+ log.warn(setResult.getException(), "Update compaction config failed");
+ throw InternalServerError.exception(
+ Throwables.getRootCause(setResult.getException()),
+ "Failed to perform operation on compaction config"
+ );
+ }
+ }
+
+ private boolean verifyCompactionUpdateSucceeded(ConfigManager.SetResult setResult)
+ {
if (setResult.isOk()) {
return true;
- } else if (setResult.getException() instanceof NoSuchElementException) {
+ }
+ if (setResult.isPreconditionFailed()) {
+ log.info("If-Match precondition failed on compaction config update");
+ throw DruidException.forPersona(DruidException.Persona.USER)
+ .ofCategory(DruidException.Category.PRECONDITION_FAILED)
+ .build(setResult.getException().getMessage());
+ }
+ if (setResult.getException() instanceof NoSuchElementException) {
log.warn(setResult.getException(), "Update compaction config failed");
throw NotFound.exception(
Throwables.getRootCause(setResult.getException()),
diff --git a/server/src/main/java/org/apache/druid/server/http/CoordinatorBrokerConfigsResource.java b/server/src/main/java/org/apache/druid/server/http/CoordinatorBrokerConfigsResource.java
index f023b26e31c4..0b49cc3a4f88 100644
--- a/server/src/main/java/org/apache/druid/server/http/CoordinatorBrokerConfigsResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/CoordinatorBrokerConfigsResource.java
@@ -24,8 +24,8 @@
import org.apache.druid.audit.AuditManager;
import org.apache.druid.common.config.ConfigManager.SetResult;
import org.apache.druid.common.config.JacksonConfigManager;
+import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.broker.BrokerDynamicConfig;
import org.apache.druid.server.http.security.ConfigResourceFilter;
import org.apache.druid.server.security.AuthorizationUtils;
@@ -41,17 +41,13 @@
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
-import java.util.concurrent.atomic.AtomicReference;
@Path("/druid/coordinator/v1/broker/config")
@ResourceFilters(ConfigResourceFilter.class)
public class CoordinatorBrokerConfigsResource
{
- private static final Logger log = new Logger(CoordinatorBrokerConfigsResource.class);
-
private final JacksonConfigManager configManager;
private final AuditManager auditManager;
- private final AtomicReference currentConfig;
private final BrokerDynamicConfigSyncer brokerDynamicConfigSyncer;
@Inject
@@ -63,11 +59,6 @@ public CoordinatorBrokerConfigsResource(
{
this.configManager = configManager;
this.auditManager = auditManager;
- this.currentConfig = configManager.watch(
- BrokerDynamicConfig.CONFIG_KEY,
- BrokerDynamicConfig.class,
- BrokerDynamicConfig.builder().build()
- );
this.brokerDynamicConfigSyncer = brokerDynamicConfigSyncer;
}
@@ -75,7 +66,14 @@ public CoordinatorBrokerConfigsResource(
@Produces(MediaType.APPLICATION_JSON)
public Response getBrokerDynamicConfig()
{
- return Response.ok(currentConfig.get()).build();
+ return DynamicConfigEtagHelper.buildReadResponseWithEtag(
+ () -> configManager.getCurrentBytes(BrokerDynamicConfig.CONFIG_KEY),
+ currentBytes -> configManager.convertByteToConfig(
+ currentBytes,
+ BrokerDynamicConfig.class,
+ BrokerDynamicConfig.builder().build()
+ )
+ );
}
@POST
@@ -86,12 +84,12 @@ public Response setBrokerDynamicConfig(
)
{
try {
- BrokerDynamicConfig current = currentConfig.get();
- BrokerDynamicConfig newConfig = configBuilder.build(current);
-
- final SetResult setResult = configManager.set(
+ final SetResult setResult = configManager.setIfMatch(
BrokerDynamicConfig.CONFIG_KEY,
- newConfig,
+ DynamicConfigEtagHelper.getIfMatch(req),
+ BrokerDynamicConfig.class,
+ BrokerDynamicConfig.builder().build(),
+ configBuilder::build,
AuthorizationUtils.buildAuditInfo(req)
);
@@ -99,11 +97,12 @@ public Response setBrokerDynamicConfig(
brokerDynamicConfigSyncer.queueBroadcastConfigToBrokers();
return Response.ok().build();
} else {
- return Response.status(Response.Status.BAD_REQUEST)
- .entity(ServletResourceUtils.sanitizeException(setResult.getException()))
- .build();
+ return DynamicConfigEtagHelper.toErrorResponse(setResult);
}
}
+ catch (DruidException e) {
+ return ServletResourceUtils.buildErrorResponseFrom(e);
+ }
catch (IllegalArgumentException e) {
return Response.status(Response.Status.BAD_REQUEST)
.entity(ServletResourceUtils.sanitizeException(e))
diff --git a/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java b/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java
index 561670ae628b..d9bf398abeaf 100644
--- a/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java
@@ -22,6 +22,7 @@
import com.google.inject.Inject;
import com.sun.jersey.spi.container.ResourceFilters;
import org.apache.druid.audit.AuditInfo;
+import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.server.coordinator.CoordinatorConfigManager;
@@ -60,8 +61,9 @@ public CoordinatorCompactionConfigsResource(
@Produces(MediaType.APPLICATION_JSON)
public Response getCompactionConfig()
{
- return ServletResourceUtils.buildReadResponse(
- configManager::getCurrentCompactionConfig
+ return DynamicConfigEtagHelper.buildReadResponseWithEtag(
+ configManager::getCurrentCompactionConfigBytes,
+ configManager::convertBytesToCompactionConfig
);
}
@@ -83,10 +85,20 @@ public Response setCompactionTaskLimit(
return ServletResourceUtils.buildUpdateResponse(() -> true);
}
- final AuditInfo auditInfo = AuthorizationUtils.buildAuditInfo(req);
- return ServletResourceUtils.buildUpdateResponse(
- () -> configManager.updateCompactionTaskSlots(compactionTaskSlotRatio, maxCompactionTaskSlots, auditInfo)
- );
+ try {
+ final AuditInfo auditInfo = AuthorizationUtils.buildAuditInfo(req);
+ return DynamicConfigEtagHelper.buildSetResultUpdateResponse(
+ configManager.updateCompactionTaskSlots(
+ compactionTaskSlotRatio,
+ maxCompactionTaskSlots,
+ DynamicConfigEtagHelper.getIfMatch(req),
+ auditInfo
+ )
+ );
+ }
+ catch (DruidException e) {
+ return ServletResourceUtils.buildErrorResponseFrom(e);
+ }
}
@POST
@@ -97,15 +109,24 @@ public Response addOrUpdateDatasourceCompactionConfig(
@Context HttpServletRequest req
)
{
- final AuditInfo auditInfo = AuthorizationUtils.buildAuditInfo(req);
- return ServletResourceUtils.buildUpdateResponse(() -> {
+ try {
+ final AuditInfo auditInfo = AuthorizationUtils.buildAuditInfo(req);
if (newConfig.getEngine() == CompactionEngine.MSQ) {
throw InvalidInput.exception(
"MSQ engine is supported only with supervisor-based compaction on the Overlord."
);
}
- return configManager.updateDatasourceCompactionConfig(newConfig, auditInfo);
- });
+ return DynamicConfigEtagHelper.buildSetResultUpdateResponse(
+ configManager.updateDatasourceCompactionConfig(
+ newConfig,
+ DynamicConfigEtagHelper.getIfMatch(req),
+ auditInfo
+ )
+ );
+ }
+ catch (DruidException e) {
+ return ServletResourceUtils.buildErrorResponseFrom(e);
+ }
}
@GET
@@ -140,9 +161,18 @@ public Response deleteCompactionConfig(
@Context HttpServletRequest req
)
{
- final AuditInfo auditInfo = AuthorizationUtils.buildAuditInfo(req);
- return ServletResourceUtils.buildUpdateResponse(
- () -> configManager.deleteDatasourceCompactionConfig(dataSource, auditInfo)
- );
+ try {
+ final AuditInfo auditInfo = AuthorizationUtils.buildAuditInfo(req);
+ return DynamicConfigEtagHelper.buildSetResultUpdateResponse(
+ configManager.deleteDatasourceCompactionConfig(
+ dataSource,
+ DynamicConfigEtagHelper.getIfMatch(req),
+ auditInfo
+ )
+ );
+ }
+ catch (DruidException e) {
+ return ServletResourceUtils.buildErrorResponseFrom(e);
+ }
}
}
diff --git a/server/src/main/java/org/apache/druid/server/http/CoordinatorDynamicConfigsResource.java b/server/src/main/java/org/apache/druid/server/http/CoordinatorDynamicConfigsResource.java
index 93feb328a8c2..da6d8adeb7be 100644
--- a/server/src/main/java/org/apache/druid/server/http/CoordinatorDynamicConfigsResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/CoordinatorDynamicConfigsResource.java
@@ -22,6 +22,7 @@
import com.sun.jersey.spi.container.ResourceFilters;
import org.apache.druid.audit.AuditManager;
import org.apache.druid.common.config.ConfigManager.SetResult;
+import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.server.coordinator.CloneStatusManager;
import org.apache.druid.server.coordinator.CoordinatorConfigManager;
@@ -74,7 +75,10 @@ public CoordinatorDynamicConfigsResource(
@Produces(MediaType.APPLICATION_JSON)
public Response getDynamicConfigs()
{
- return Response.ok(manager.getCurrentDynamicConfig()).build();
+ return DynamicConfigEtagHelper.buildReadResponseWithEtag(
+ manager::getCurrentDynamicConfigBytes,
+ manager::convertBytesToDynamicConfig
+ );
}
// default value is used for backwards compatibility
@@ -86,10 +90,9 @@ public Response setDynamicConfigs(
)
{
try {
- CoordinatorDynamicConfig current = manager.getCurrentDynamicConfig();
-
- final SetResult setResult = manager.setDynamicConfig(
- dynamicConfigBuilder.build(current),
+ final SetResult setResult = manager.updateDynamicConfig(
+ dynamicConfigBuilder::build,
+ DynamicConfigEtagHelper.getIfMatch(req),
AuthorizationUtils.buildAuditInfo(req)
);
@@ -97,11 +100,12 @@ public Response setDynamicConfigs(
coordinatorDynamicConfigSyncer.queueBroadcastConfigToBrokers();
return Response.ok().build();
} else {
- return Response.status(Response.Status.BAD_REQUEST)
- .entity(ServletResourceUtils.sanitizeException(setResult.getException()))
- .build();
+ return DynamicConfigEtagHelper.toErrorResponse(setResult);
}
}
+ catch (DruidException e) {
+ return ServletResourceUtils.buildErrorResponseFrom(e);
+ }
catch (IllegalArgumentException e) {
return Response.status(Response.Status.BAD_REQUEST)
.entity(ServletResourceUtils.sanitizeException(e))
diff --git a/server/src/main/java/org/apache/druid/server/http/DynamicConfigEtagHelper.java b/server/src/main/java/org/apache/druid/server/http/DynamicConfigEtagHelper.java
new file mode 100644
index 000000000000..60cbed026c9a
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/http/DynamicConfigEtagHelper.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.http;
+
+import com.google.common.base.Throwables;
+import org.apache.druid.common.config.ConfigEtag;
+import org.apache.druid.common.config.ConfigManager.SetResult;
+
+import java.util.Map;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InternalServerError;
+import org.apache.druid.error.InvalidInput;
+
+import javax.annotation.Nullable;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.Response;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * HTTP-layer helpers for {@code ETag} / {@code If-Match} on dynamic-config endpoints.
+ */
+public final class DynamicConfigEtagHelper
+{
+ private DynamicConfigEtagHelper()
+ {
+ }
+
+ /**
+ * Read the {@code If-Match} header. Returns {@code null} when absent. Throws
+ * 400 if present but blank (RFC 7232 §3.1 requires a non-empty value).
+ */
+ @Nullable
+ public static String getIfMatch(HttpServletRequest req)
+ {
+ if (req == null) {
+ return null;
+ }
+ final String header = req.getHeader(HttpHeaders.IF_MATCH);
+ if (header == null) {
+ return null;
+ }
+ if (header.trim().isEmpty()) {
+ throw InvalidInput.exception("If-Match header must not be blank");
+ }
+ return header;
+ }
+
+ /** Attach an {@code ETag} header derived from {@code bytes}; no-op if {@code bytes} is null. */
+ public static Response.ResponseBuilder withEtag(Response.ResponseBuilder builder, @Nullable byte[] bytes)
+ {
+ final String etag = ConfigEtag.compute(bytes);
+ if (etag != null) {
+ builder.header(HttpHeaders.ETAG, etag);
+ }
+ return builder;
+ }
+
+ /**
+ * Build a read response whose entity and ETag are both derived from one read
+ * of the stored config bytes.
+ */
+ public static Response buildReadResponseWithEtag(
+ Supplier bytesSupplier,
+ Function entitySupplier
+ )
+ {
+ try {
+ final byte[] currentBytes = bytesSupplier.get();
+ return withEtag(Response.ok(entitySupplier.apply(currentBytes)), currentBytes).build();
+ }
+ catch (DruidException e) {
+ return ServletResourceUtils.buildErrorResponseFrom(e);
+ }
+ catch (Exception e) {
+ return ServletResourceUtils.buildErrorResponseFrom(
+ InternalServerError.exception(Throwables.getRootCause(e), "Unknown error occurred")
+ );
+ }
+ }
+
+ /**
+ * Map a failed {@link SetResult} to 412 (precondition failed) or 400.
+ * Callers handle the success case themselves.
+ */
+ public static Response toErrorResponse(SetResult result)
+ {
+ final Response.Status status = result.isPreconditionFailed()
+ ? Response.Status.PRECONDITION_FAILED
+ : Response.Status.BAD_REQUEST;
+ return Response.status(status)
+ .entity(ServletResourceUtils.sanitizeException(result.getException()))
+ .build();
+ }
+
+ /**
+ * Build a write response from a {@link SetResult}, using the same success and
+ * error shapes as other dynamic-config endpoints.
+ */
+ public static Response buildSetResultUpdateResponse(SetResult result)
+ {
+ if (result.isOk()) {
+ return Response.ok(Map.of("success", true)).build();
+ }
+ return toErrorResponse(result);
+ }
+}
diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorBrokerConfigsResourceTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorBrokerConfigsResourceTest.java
index 543f2b7bb9bb..dd2c61c32f67 100644
--- a/server/src/test/java/org/apache/druid/server/http/CoordinatorBrokerConfigsResourceTest.java
+++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorBrokerConfigsResourceTest.java
@@ -22,17 +22,24 @@
import com.google.common.collect.ImmutableList;
import org.apache.druid.audit.AuditInfo;
import org.apache.druid.audit.AuditManager;
+import org.apache.druid.common.config.ConfigEtag;
import org.apache.druid.common.config.ConfigManager.SetResult;
import org.apache.druid.common.config.JacksonConfigManager;
+import org.apache.druid.error.ErrorResponse;
+import org.apache.druid.query.QueryContext;
import org.apache.druid.server.broker.BrokerDynamicConfig;
+import org.easymock.Capture;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
-import java.util.concurrent.atomic.AtomicReference;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.function.UnaryOperator;
public class CoordinatorBrokerConfigsResourceTest
{
@@ -51,16 +58,21 @@ public void setUp() throws Exception
@Test
public void testGetBrokerDynamicConfig()
{
- BrokerDynamicConfig config = BrokerDynamicConfig.builder().build();
- AtomicReference currentConfig = new AtomicReference<>(config);
-
+ final BrokerDynamicConfig config = BrokerDynamicConfig.builder()
+ .withQueryContext(QueryContext.of(Map.of("priority", 5)))
+ .build();
+ final byte[] currentBytes = "current-broker-config".getBytes(StandardCharsets.UTF_8);
+
+ EasyMock.expect(configManager.getCurrentBytes(BrokerDynamicConfig.CONFIG_KEY))
+ .andReturn(currentBytes)
+ .once();
EasyMock.expect(
- configManager.watch(
- EasyMock.anyObject(String.class),
- EasyMock.anyObject(Class.class),
+ configManager.convertByteToConfig(
+ EasyMock.aryEq(currentBytes),
+ EasyMock.eq(BrokerDynamicConfig.class),
EasyMock.anyObject(BrokerDynamicConfig.class)
)
- ).andReturn(currentConfig).once();
+ ).andReturn(config).once();
EasyMock.replay(configManager, auditManager, brokerDynamicConfigSyncer);
@@ -72,6 +84,7 @@ public void testGetBrokerDynamicConfig()
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(config, response.getEntity());
+ Assert.assertEquals(ConfigEtag.compute(currentBytes), response.getMetadata().getFirst(HttpHeaders.ETAG));
EasyMock.verify(configManager, auditManager, brokerDynamicConfigSyncer);
}
@@ -79,17 +92,6 @@ public void testGetBrokerDynamicConfig()
@Test
public void testGetBrokerDynamicConfigHistory()
{
- BrokerDynamicConfig config = BrokerDynamicConfig.builder().build();
- AtomicReference currentConfig = new AtomicReference<>(config);
-
- EasyMock.expect(
- configManager.watch(
- EasyMock.anyObject(String.class),
- EasyMock.anyObject(Class.class),
- EasyMock.anyObject(BrokerDynamicConfig.class)
- )
- ).andReturn(currentConfig).once();
-
EasyMock.expect(
auditManager.fetchAuditHistory(
EasyMock.anyObject(String.class),
@@ -106,7 +108,7 @@ public void testGetBrokerDynamicConfigHistory()
brokerDynamicConfigSyncer
);
- Response response = resource.getBrokerDynamicConfigHistory(null, 10);
+ final Response response = resource.getBrokerDynamicConfigHistory(null, 10);
Assert.assertEquals(200, response.getStatus());
EasyMock.verify(configManager, auditManager, brokerDynamicConfigSyncer);
@@ -115,17 +117,6 @@ public void testGetBrokerDynamicConfigHistory()
@Test
public void testGetBrokerDynamicConfigHistoryWithNullIntervalAndCount()
{
- BrokerDynamicConfig config = BrokerDynamicConfig.builder().build();
- AtomicReference currentConfig = new AtomicReference<>(config);
-
- EasyMock.expect(
- configManager.watch(
- EasyMock.anyObject(String.class),
- EasyMock.anyObject(Class.class),
- EasyMock.anyObject(BrokerDynamicConfig.class)
- )
- ).andReturn(currentConfig).once();
-
EasyMock.expect(
auditManager.fetchAuditHistory(
EasyMock.anyObject(String.class),
@@ -142,7 +133,7 @@ public void testGetBrokerDynamicConfigHistoryWithNullIntervalAndCount()
brokerDynamicConfigSyncer
);
- Response response = resource.getBrokerDynamicConfigHistory(null, null);
+ final Response response = resource.getBrokerDynamicConfigHistory(null, null);
Assert.assertEquals(200, response.getStatus());
EasyMock.verify(configManager, auditManager, brokerDynamicConfigSyncer);
@@ -151,22 +142,20 @@ public void testGetBrokerDynamicConfigHistoryWithNullIntervalAndCount()
@Test
public void testSetBrokerDynamicConfig()
{
- BrokerDynamicConfig config = BrokerDynamicConfig.builder().build();
- AtomicReference currentConfig = new AtomicReference<>(config);
- HttpServletRequest request = EasyMock.createNiceMock(HttpServletRequest.class);
+ final BrokerDynamicConfig currentConfig = BrokerDynamicConfig.builder()
+ .withQueryContext(QueryContext.of(Map.of("priority", 5)))
+ .build();
+ final BrokerDynamicConfig.Builder updateBuilder = BrokerDynamicConfig.builder();
+ final Capture> updateCapture = EasyMock.newCapture();
+ final HttpServletRequest request = EasyMock.createNiceMock(HttpServletRequest.class);
EasyMock.expect(
- configManager.watch(
- EasyMock.anyObject(String.class),
- EasyMock.anyObject(Class.class),
- EasyMock.anyObject(BrokerDynamicConfig.class)
- )
- ).andReturn(currentConfig).once();
-
- EasyMock.expect(
- configManager.set(
- EasyMock.anyObject(String.class),
+ configManager.setIfMatch(
+ EasyMock.eq(BrokerDynamicConfig.CONFIG_KEY),
+ (String) EasyMock.isNull(),
+ EasyMock.eq(BrokerDynamicConfig.class),
EasyMock.anyObject(BrokerDynamicConfig.class),
+ EasyMock.capture(updateCapture),
EasyMock.anyObject(AuditInfo.class)
)
).andReturn(SetResult.ok()).once();
@@ -182,9 +171,36 @@ public void testSetBrokerDynamicConfig()
brokerDynamicConfigSyncer
);
- Response response = resource.setBrokerDynamicConfig(BrokerDynamicConfig.builder(), request);
+ final Response response = resource.setBrokerDynamicConfig(updateBuilder, request);
Assert.assertEquals(200, response.getStatus());
+ Assert.assertEquals(updateBuilder.build(currentConfig), updateCapture.getValue().apply(currentConfig));
EasyMock.verify(configManager, auditManager, brokerDynamicConfigSyncer);
}
+
+ @Test
+ public void testSetBrokerDynamicConfigWithBlankIfMatchReturnsBadRequest()
+ {
+ final HttpServletRequest request = EasyMock.createStrictMock(HttpServletRequest.class);
+
+ EasyMock.expect(request.getHeader(HttpHeaders.IF_MATCH)).andReturn(" ").once();
+
+ EasyMock.replay(configManager, auditManager, brokerDynamicConfigSyncer, request);
+
+ final CoordinatorBrokerConfigsResource resource = new CoordinatorBrokerConfigsResource(
+ configManager,
+ auditManager,
+ brokerDynamicConfigSyncer
+ );
+
+ final Response response = resource.setBrokerDynamicConfig(BrokerDynamicConfig.builder(), request);
+ Assert.assertEquals(400, response.getStatus());
+ Assert.assertTrue(response.getEntity() instanceof ErrorResponse);
+ Assert.assertEquals(
+ "If-Match header must not be blank",
+ ((ErrorResponse) response.getEntity()).getUnderlyingException().getMessage()
+ );
+
+ EasyMock.verify(configManager, auditManager, brokerDynamicConfigSyncer, request);
+ }
}
diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
index e96d800bec87..c3538b977c19 100644
--- a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
+++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
@@ -26,6 +26,7 @@
import org.apache.druid.audit.AuditInfo;
import org.apache.druid.audit.AuditManager;
import org.apache.druid.client.indexing.ClientMSQContext;
+import org.apache.druid.common.config.ConfigEtag;
import org.apache.druid.common.config.ConfigManager;
import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.common.config.TestConfigManagerConfig;
@@ -58,7 +59,9 @@
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -109,6 +112,26 @@ public void testGetDefaultClusterConfig()
Assert.assertEquals(CompactionEngine.NATIVE, defaultConfig.getEngine());
}
+ @Test
+ public void testGetCompactionConfigDerivesBodyAndEtagFromSameBytes()
+ {
+ final CoordinatorConfigManager mockConfigManager = Mockito.mock(CoordinatorConfigManager.class);
+ final CoordinatorCompactionConfigsResource mockResource =
+ new CoordinatorCompactionConfigsResource(mockConfigManager);
+ final byte[] currentBytes = "current-compaction-config".getBytes(StandardCharsets.UTF_8);
+ final DruidCompactionConfig expectedConfig = DruidCompactionConfig.empty();
+
+ Mockito.when(mockConfigManager.getCurrentCompactionConfigBytes()).thenReturn(currentBytes);
+ Mockito.when(mockConfigManager.convertBytesToCompactionConfig(currentBytes)).thenReturn(expectedConfig);
+
+ final Response response = mockResource.getCompactionConfig();
+
+ verifyStatus(Response.Status.OK, response);
+ Assert.assertEquals(expectedConfig, response.getEntity());
+ Assert.assertEquals(ConfigEtag.compute(currentBytes), response.getMetadata().getFirst(HttpHeaders.ETAG));
+ Mockito.verify(mockConfigManager, Mockito.never()).getCurrentCompactionConfig();
+ }
+
@Test
public void testSetCompactionTaskLimit()
{
@@ -180,6 +203,48 @@ public void testAddDatasourceConfigWithMSQEngineIsInvalid()
);
}
+ @Test
+ public void testAddDatasourceConfigWithBlankIfMatchReturnsBadRequest()
+ {
+ Mockito.when(mockHttpServletRequest.getHeader(HttpHeaders.IF_MATCH)).thenReturn(" ");
+
+ final DataSourceCompactionConfig newDatasourceConfig
+ = InlineSchemaDataSourceCompactionConfig.builder().forDataSource(TestDataSource.WIKI).build();
+ final Response response = resource.addOrUpdateDatasourceCompactionConfig(newDatasourceConfig, mockHttpServletRequest);
+
+ verifyStatus(Response.Status.BAD_REQUEST, response);
+ Assert.assertTrue(response.getEntity() instanceof ErrorResponse);
+ Assert.assertEquals(
+ "If-Match header must not be blank",
+ ((ErrorResponse) response.getEntity()).getUnderlyingException().getMessage()
+ );
+ }
+
+ @Test
+ public void testAddDatasourceConfigWithIfMatchReturnsPreconditionFailedWhenCompareAndSwapDisabled()
+ {
+ configManager.setCompareAndSwapEnabled(false);
+ Mockito.when(mockHttpServletRequest.getHeader(HttpHeaders.IF_MATCH)).thenReturn("\"etag\"");
+
+ final DataSourceCompactionConfig newDatasourceConfig
+ = InlineSchemaDataSourceCompactionConfig.builder().forDataSource(TestDataSource.WIKI).build();
+ final Response response = resource.addOrUpdateDatasourceCompactionConfig(
+ newDatasourceConfig,
+ mockHttpServletRequest
+ );
+
+ verifyStatus(Response.Status.PRECONDITION_FAILED, response);
+ @SuppressWarnings("unchecked")
+ final Map entity = (Map) response.getEntity();
+ Assert.assertEquals(
+ "If-Match requires druid.manager.config.enableCompareAndSwap to be enabled for key["
+ + DruidCompactionConfig.CONFIG_KEY
+ + "]",
+ entity.get("error")
+ );
+ Assert.assertTrue(configManager.getCurrentCompactionConfig().getCompactionConfigs().isEmpty());
+ }
+
@Test
public void testUpdateDatasourceConfig()
{
@@ -417,6 +482,7 @@ public int removeAuditLogsOlderThan(long timestamp)
private static class TestCoordinatorConfigManager extends CoordinatorConfigManager
{
private final ConfigManager delegate;
+ private final TestConfigManagerConfig configManagerConfig;
private int numUpdateAttempts;
private ConfigManager.SetResult configUpdateResult;
@@ -432,15 +498,17 @@ public String getConfigTable()
};
final TestDBConnector dbConnector = new TestDBConnector();
+ final TestConfigManagerConfig configManagerConfig = new TestConfigManagerConfig();
final ConfigManager configManager = new ConfigManager(
dbConnector,
Suppliers.ofInstance(tablesConfig),
- Suppliers.ofInstance(new TestConfigManagerConfig())
+ Suppliers.ofInstance(configManagerConfig)
);
return new TestCoordinatorConfigManager(
new JacksonConfigManager(configManager, OBJECT_MAPPER, auditManager),
configManager,
+ configManagerConfig,
auditManager,
dbConnector,
tablesConfig
@@ -450,6 +518,7 @@ public String getConfigTable()
TestCoordinatorConfigManager(
JacksonConfigManager jackson,
ConfigManager configManager,
+ TestConfigManagerConfig configManagerConfig,
AuditManager auditManager,
TestDBConnector dbConnector,
MetadataStorageTablesConfig tablesConfig
@@ -457,17 +526,24 @@ public String getConfigTable()
{
super(jackson, dbConnector, tablesConfig, auditManager);
this.delegate = configManager;
+ this.configManagerConfig = configManagerConfig;
+ }
+
+ void setCompareAndSwapEnabled(boolean enabled)
+ {
+ configManagerConfig.enableCompareAndSwap = enabled;
}
@Override
public ConfigManager.SetResult getAndUpdateCompactionConfig(
UnaryOperator operator,
+ String ifMatchEtag,
AuditInfo auditInfo
)
{
++numUpdateAttempts;
if (configUpdateResult == null) {
- return super.getAndUpdateCompactionConfig(operator, auditInfo);
+ return super.getAndUpdateCompactionConfig(operator, ifMatchEtag, auditInfo);
} else {
return configUpdateResult;
}
diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigsResourceTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigsResourceTest.java
index 87d99b0759bd..4e7c6a17e2eb 100644
--- a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigsResourceTest.java
+++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigsResourceTest.java
@@ -21,17 +21,27 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
+import org.apache.druid.audit.AuditInfo;
import org.apache.druid.audit.AuditManager;
+import org.apache.druid.common.config.ConfigEtag;
+import org.apache.druid.common.config.ConfigManager.SetResult;
+import org.apache.druid.error.ErrorResponse;
import org.apache.druid.server.coordinator.CloneStatusManager;
import org.apache.druid.server.coordinator.CoordinatorConfigManager;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.server.coordinator.ServerCloneStatus;
+import org.easymock.Capture;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
+import java.nio.charset.StandardCharsets;
import java.util.List;
+import java.util.function.UnaryOperator;
public class CoordinatorDynamicConfigsResourceTest
{
@@ -49,6 +59,34 @@ public void setUp() throws Exception
cloneStatusManager = EasyMock.createStrictMock(CloneStatusManager.class);
}
+ @Test
+ public void testGetDynamicConfigsDerivesBodyAndEtagFromSameBytes()
+ {
+ final byte[] currentBytes = "current-dynamic-config".getBytes(StandardCharsets.UTF_8);
+ final CoordinatorDynamicConfig expectedConfig = CoordinatorDynamicConfig.builder()
+ .withPauseCoordination(true)
+ .build();
+
+ EasyMock.expect(manager.getCurrentDynamicConfigBytes()).andReturn(currentBytes).once();
+ EasyMock.expect(manager.convertBytesToDynamicConfig(EasyMock.aryEq(currentBytes)))
+ .andReturn(expectedConfig)
+ .once();
+ EasyMock.replay(manager, auditManager, coordinatorDynamicConfigSyncer, cloneStatusManager);
+
+ final Response response = new CoordinatorDynamicConfigsResource(
+ manager,
+ auditManager,
+ coordinatorDynamicConfigSyncer,
+ cloneStatusManager
+ ).getDynamicConfigs();
+
+ Assert.assertEquals(200, response.getStatus());
+ Assert.assertEquals(expectedConfig, response.getEntity());
+ Assert.assertEquals(ConfigEtag.compute(currentBytes), response.getMetadata().getFirst(HttpHeaders.ETAG));
+
+ EasyMock.verify(manager, auditManager, coordinatorDynamicConfigSyncer, cloneStatusManager);
+ }
+
@Test
public void testGetBrokerStatus()
{
@@ -107,4 +145,67 @@ public void testGetCloneStatus()
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(ServerCloneStatus.unknown("hist4", "hist3"), response.getEntity());
}
+
+ @Test
+ public void testSetDynamicConfigsUsesTransformUpdate()
+ {
+ final String etag = "\"etag\"";
+ final HttpServletRequest request = EasyMock.createNiceMock(HttpServletRequest.class);
+ final CoordinatorDynamicConfig.Builder updateBuilder = CoordinatorDynamicConfig.builder()
+ .withPauseCoordination(true);
+ final CoordinatorDynamicConfig currentConfig = CoordinatorDynamicConfig.builder()
+ .withMaxSegmentsToMove(5)
+ .build();
+ final Capture> updateCapture = EasyMock.newCapture();
+
+ EasyMock.expect(request.getHeader(HttpHeaders.IF_MATCH)).andReturn(etag).once();
+ EasyMock.expect(
+ manager.updateDynamicConfig(
+ EasyMock.capture(updateCapture),
+ EasyMock.eq(etag),
+ EasyMock.anyObject(AuditInfo.class)
+ )
+ ).andReturn(SetResult.ok()).once();
+ coordinatorDynamicConfigSyncer.queueBroadcastConfigToBrokers();
+ EasyMock.expectLastCall().once();
+ EasyMock.replay(manager, auditManager, coordinatorDynamicConfigSyncer, cloneStatusManager, request);
+
+ final Response response = new CoordinatorDynamicConfigsResource(
+ manager,
+ auditManager,
+ coordinatorDynamicConfigSyncer,
+ cloneStatusManager
+ ).setDynamicConfigs(updateBuilder, request);
+
+ Assert.assertEquals(200, response.getStatus());
+ Assert.assertEquals(updateBuilder.build(currentConfig), updateCapture.getValue().apply(currentConfig));
+
+ EasyMock.verify(manager, auditManager, coordinatorDynamicConfigSyncer, cloneStatusManager, request);
+ }
+
+ @Test
+ public void testSetDynamicConfigsWithBlankIfMatchReturnsBadRequest()
+ {
+ final HttpServletRequest request = EasyMock.createStrictMock(HttpServletRequest.class);
+
+ EasyMock.expect(request.getHeader(HttpHeaders.IF_MATCH)).andReturn(" ").once();
+ EasyMock.replay(manager, auditManager, coordinatorDynamicConfigSyncer, cloneStatusManager, request);
+
+ final CoordinatorDynamicConfigsResource resource = new CoordinatorDynamicConfigsResource(
+ manager,
+ auditManager,
+ coordinatorDynamicConfigSyncer,
+ cloneStatusManager
+ );
+
+ final Response response = resource.setDynamicConfigs(CoordinatorDynamicConfig.builder(), request);
+ Assert.assertEquals(400, response.getStatus());
+ Assert.assertTrue(response.getEntity() instanceof ErrorResponse);
+ Assert.assertEquals(
+ "If-Match header must not be blank",
+ ((ErrorResponse) response.getEntity()).getUnderlyingException().getMessage()
+ );
+
+ EasyMock.verify(manager, auditManager, coordinatorDynamicConfigSyncer, cloneStatusManager, request);
+ }
}
diff --git a/website/.spelling b/website/.spelling
index c85463563e13..3e4fda27b7ec 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -105,6 +105,7 @@ ECS
EMR
EMRFS
ETL
+ETag
Elasticsearch
Enums
FIRST_VALUE