From 3028183f75634173adc1f397772de5d7042b38f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jesse=20Tu=C4=9Flu?= Date: Wed, 27 May 2026 19:35:26 -0700 Subject: [PATCH 1/2] feat: add etag conditional write support to dynamic config --- .../api-reference/automatic-compaction-api.md | 64 +++++- .../dynamic-configuration-api.md | 65 +++++- ...KubernetesTaskExecutionConfigResource.java | 56 +++-- ...rnetesTaskExecutionConfigResourceTest.java | 198 +++++++++++------ .../http/OverlordCompactionResource.java | 12 +- .../overlord/http/OverlordResource.java | 36 ++- .../http/OverlordCompactionResourceTest.java | 36 ++- .../overlord/http/OverlordResourceTest.java | 46 ++++ .../druid/common/config/ConfigEtag.java | 88 ++++++++ .../druid/common/config/ConfigManager.java | 45 +++- .../common/config/JacksonConfigManager.java | 107 +++++++++ .../apache/druid/error/DruidException.java | 5 + .../druid/common/config/ConfigEtagTest.java | 104 +++++++++ .../config/JacksonConfigManagerTest.java | 209 ++++++++++++++++++ .../coordinator/CoordinatorConfigManager.java | 145 +++++++++++- .../CoordinatorBrokerConfigsResource.java | 37 ++-- .../CoordinatorCompactionConfigsResource.java | 21 +- .../CoordinatorDynamicConfigsResource.java | 20 +- .../server/http/DynamicConfigEtagHelper.java | 111 ++++++++++ .../CoordinatorBrokerConfigsResourceTest.java | 108 +++++---- ...rdinatorCompactionConfigsResourceTest.java | 79 ++++++- ...CoordinatorDynamicConfigsResourceTest.java | 101 +++++++++ website/.spelling | 1 + 23 files changed, 1483 insertions(+), 211 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/common/config/ConfigEtag.java create mode 100644 processing/src/test/java/org/apache/druid/common/config/ConfigEtagTest.java create mode 100644 server/src/main/java/org/apache/druid/server/http/DynamicConfigEtagHelper.java diff --git a/docs/api-reference/automatic-compaction-api.md b/docs/api-reference/automatic-compaction-api.md index 4f2ce3d62afd..21b6131bd4bf 100644 --- a/docs/api-reference/automatic-compaction-api.md +++ b/docs/api-reference/automatic-compaction-api.md @@ -37,6 +37,14 @@ Instead of the automatic compaction API, you can use the supervisor API to submi In this topic, `http://ROUTER_IP:ROUTER_PORT` is a placeholder for your Router service address and port. Replace it with the information for your deployment. For example, use `http://localhost:8888` for quickstart deployments. +## Concurrency control with ETag and If-Match + +The automatic compaction write endpoints support optimistic concurrency control using HTTP `ETag` and `If-Match`. `GET /druid/coordinator/v1/config/compaction` returns an `ETag` header whose value is a stable hash of the underlying compaction configuration document (the union of cluster-level config and all per-datasource configs). + +To guard a write against concurrent updates, pass the most recently observed ETag back in an `If-Match` header on `POST` or `DELETE`. The change only commits if the stored configuration still hashes to that ETag; otherwise the request fails with `412 Precondition Failed`. Re-`GET` to obtain the new value and ETag, re-apply your change, and retry. The `If-Match` header is optional; requests without it preserve the previous last-writer-wins behavior. `If-Match: *` matches any existing configuration. + +Endpoints with this behavior: `POST /druid/coordinator/v1/config/compaction`, `DELETE /druid/coordinator/v1/config/compaction/{dataSource}`, `POST /druid/coordinator/v1/config/compaction/taskslots`, and the unified `POST /druid/indexer/v1/compaction/config/cluster`. See the equivalent section in the [Dynamic configuration API](./dynamic-configuration-api.md#concurrency-control-with-etag-and-if-match) for the general protocol and a client-flow example. + ## Manage automatic compaction ### Create or update automatic compaction configuration @@ -51,6 +59,12 @@ Note that this endpoint returns an HTTP `200 OK` message code even if the dataso `POST` `/druid/coordinator/v1/config/compaction` +#### Header parameters + +* `If-Match` + * Type: String + * Optional. Quoted ETag previously returned by `GET /druid/coordinator/v1/config/compaction`. When supplied, the update only commits if the stored compaction configuration still matches this ETag. Pass `*` to require only that some value is already stored. See [Concurrency control with ETag and If-Match](#concurrency-control-with-etag-and-if-match). + #### Responses @@ -60,6 +74,12 @@ Note that this endpoint returns an HTTP `200 OK` message code even if the dataso *Successfully submitted auto compaction configuration* + + + + +*The `If-Match` header did not match the currently stored configuration, or another writer committed a change between this request's precondition check and write. Re-read the configuration and retry.* + @@ -140,6 +160,12 @@ Removes the automatic compaction configuration for a datasource. This updates th `DELETE` `/druid/coordinator/v1/config/compaction/{dataSource}` +#### Header parameters + +* `If-Match` + * Type: String + * Optional. Quoted ETag previously returned by `GET /druid/coordinator/v1/config/compaction`. When supplied, the delete only commits if the stored compaction configuration still matches this ETag. See [Concurrency control with ETag and If-Match](#concurrency-control-with-etag-and-if-match). + #### Responses @@ -155,6 +181,12 @@ Removes the automatic compaction configuration for a datasource. This updates th *Datasource does not have automatic compaction or invalid datasource name* + + + + +*The `If-Match` header did not match the currently stored configuration, or another writer committed a change between this request's precondition check and write. Re-read the configuration and retry.* + @@ -215,6 +247,12 @@ To limit the maximum number of compaction tasks, use the optional query paramete * Default: 2147483647 * Limits the maximum number of task slots for compaction tasks. +#### Header parameters + +* `If-Match` + * Type: String + * Optional. Quoted ETag previously returned by `GET /druid/coordinator/v1/config/compaction`. When supplied, the update only commits if the stored compaction configuration still matches this ETag. See [Concurrency control with ETag and If-Match](#concurrency-control-with-etag-and-if-match). + #### Responses @@ -230,6 +268,12 @@ To limit the maximum number of compaction tasks, use the optional query paramete *Invalid `max` value* + + + + +*The `If-Match` header did not match the currently stored configuration, or another writer committed a change between this request's precondition check and write. Re-read the configuration and retry.* + @@ -270,6 +314,8 @@ Retrieves all automatic compaction configurations. Returns a `compactionConfigs` You can use this endpoint to retrieve `compactionTaskSlotRatio` and `maxCompactionTaskSlots` values for managing resource allocation of compaction tasks. +The response includes an `ETag` header that you can pass back in `If-Match` on a subsequent write to detect concurrent updates; see [Concurrency control with ETag and If-Match](#concurrency-control-with-etag-and-if-match). + #### URL `GET` `/druid/coordinator/v1/config/compaction` @@ -281,7 +327,7 @@ You can use this endpoint to retrieve `compactionTaskSlotRatio` and `maxCompacti -*Successfully retrieved automatic compaction configurations* +*Successfully retrieved automatic compaction configurations. The `ETag` response header carries an opaque identifier for the returned configuration version.* @@ -926,6 +972,12 @@ This policy specifies the datasources and intervals eligible for compaction and `POST` `/druid/indexer/v1/compaction/config/cluster` +#### Header parameters + +* `If-Match` + * Type: String + * Optional. Quoted ETag previously returned by `GET /druid/indexer/v1/compaction/config/cluster` (or `GET /druid/coordinator/v1/config/compaction`, since both reflect the same underlying configuration). When supplied, the update only commits if the stored configuration still matches this ETag. See [Concurrency control with ETag and If-Match](#concurrency-control-with-etag-and-if-match). + #### Responses @@ -941,6 +993,12 @@ This policy specifies the datasources and intervals eligible for compaction and *Invalid `max` value* + + + + +*The `If-Match` header did not match the currently stored configuration, or another writer committed a change between this request's precondition check and write. Re-read the configuration and retry.* + @@ -1002,6 +1060,8 @@ A successful request returns an HTTP `200 OK` message code and an empty response Retrieves cluster-level configuration for compaction tasks which applies to all datasources, unless explicitly overridden in the datasource compaction config. This includes all the fields listed in [Update cluster-level compaction config](#update-cluster-level-compaction-config). +The response includes an `ETag` header that you can pass back in `If-Match` on a subsequent write; see [Concurrency control with ETag and If-Match](#concurrency-control-with-etag-and-if-match). + #### URL `GET` `/druid/indexer/v1/compaction/config/cluster` @@ -1012,7 +1072,7 @@ This includes all the fields listed in [Update cluster-level compaction config]( -*Successfully retrieved cluster compaction configuration* +*Successfully retrieved cluster compaction configuration. The `ETag` response header carries an opaque identifier for the returned configuration version.* diff --git a/docs/api-reference/dynamic-configuration-api.md b/docs/api-reference/dynamic-configuration-api.md index 7805d7287c4e..53999abda49e 100644 --- a/docs/api-reference/dynamic-configuration-api.md +++ b/docs/api-reference/dynamic-configuration-api.md @@ -33,6 +33,32 @@ In this topic, `http://ROUTER_IP:ROUTER_PORT` is a placeholder for your Router s Replace it with the information for your deployment. For example, use `http://localhost:8888` for quickstart deployments. +## Concurrency control with ETag and If-Match + +Without coordination, two concurrent updates to the same dynamic configuration silently overwrite each other—the last writer wins and the earlier writer has no signal that their change was lost. To prevent this, the dynamic configuration endpoints support an optimistic-concurrency-control protocol modeled on HTTP `ETag` and `If-Match`: + +* Every `GET` response for a dynamic configuration includes an `ETag` header. The value is a stable hash of the stored configuration bytes—identical bytes always produce the same ETag. +* On a `POST` (or `DELETE`), supply the ETag you last observed in an `If-Match` header. The server only commits the change if the currently stored configuration still hashes to that ETag. +* If another writer changed the configuration after your read, your request fails with `412 Precondition Failed`. Re-`GET` to obtain the new value and ETag, re-apply your change, and retry. +* The `If-Match` header is **optional**. Requests without it preserve the previous last-writer-wins behavior. +* `If-Match: *` is supported and matches any existing configuration (it only fails if no value has been stored yet). + +Typical client flow: + +```shell +# Read current config and capture its ETag +ETAG=$(curl -sD - "http://ROUTER_IP:ROUTER_PORT/druid/coordinator/v1/config" | awk '/^ETag:/ {print $2}' | tr -d '\r') + +# Submit an update guarded by the ETag +curl -X POST "http://ROUTER_IP:ROUTER_PORT/druid/coordinator/v1/config" \ + -H "Content-Type: application/json" \ + -H "If-Match: $ETAG" \ + -d @new-config.json +# 200 on success, 412 if someone else updated the config in between. +``` + +Endpoints that support this protocol are noted individually below. + ## Coordinator dynamic configuration The Coordinator has dynamic configurations to tune certain behavior on the fly, without requiring a service restart. @@ -40,7 +66,7 @@ For information on the supported properties, see [Coordinator dynamic configurat ### Get dynamic configuration -Retrieves the current Coordinator dynamic configuration. Returns a JSON object with the dynamic configuration properties. +Retrieves the current Coordinator dynamic configuration. Returns a JSON object with the dynamic configuration properties. The response includes an `ETag` header that can be used with `If-Match` on a subsequent update; see [Concurrency control with ETag and If-Match](#concurrency-control-with-etag-and-if-match). #### URL @@ -53,7 +79,7 @@ Retrieves the current Coordinator dynamic configuration. Returns a JSON object w -*Successfully retrieved dynamic configuration* +*Successfully retrieved dynamic configuration. The `ETag` response header carries an opaque identifier for the returned configuration version.* @@ -136,6 +162,9 @@ The endpoint supports a set of optional header parameters to populate the `autho * `X-Druid-Comment` * Type: String * Description for the update. +* `If-Match` + * Type: String + * Optional. Quoted ETag previously returned by `GET /druid/coordinator/v1/config`. When supplied, the update only commits if the currently stored configuration still matches this ETag. Pass `*` to require only that some value is already stored. See [Concurrency control with ETag and If-Match](#concurrency-control-with-etag-and-if-match). #### Responses @@ -146,6 +175,12 @@ The endpoint supports a set of optional header parameters to populate the `autho *Successfully updated dynamic configuration* + + + + +*The `If-Match` header did not match the currently stored configuration, or another writer committed a change between this request's precondition check and write. Re-read the configuration and retry.* + @@ -319,7 +354,7 @@ These settings control broker behavior such as query blocking rules and default ### Get broker dynamic configuration -Retrieves the current Broker dynamic configuration. Returns a JSON object with the dynamic configuration properties. +Retrieves the current Broker dynamic configuration. Returns a JSON object with the dynamic configuration properties. The response includes an `ETag` header that can be used with `If-Match` on a subsequent update; see [Concurrency control with ETag and If-Match](#concurrency-control-with-etag-and-if-match). #### URL @@ -332,7 +367,7 @@ Retrieves the current Broker dynamic configuration. Returns a JSON object with t -*Successfully retrieved broker dynamic configuration* +*Successfully retrieved broker dynamic configuration. The `ETag` response header carries an opaque identifier for the returned configuration version.* @@ -412,6 +447,10 @@ The endpoint supports a set of optional header parameters to populate the audit * Type: String * Comment describing the change. +* `If-Match` + * Type: String + * Optional. Quoted ETag previously returned by `GET /druid/coordinator/v1/broker/config`. When supplied, the update only commits if the currently stored configuration still matches this ETag. Pass `*` to require only that some value is already stored. See [Concurrency control with ETag and If-Match](#concurrency-control-with-etag-and-if-match). + #### Responses @@ -421,6 +460,12 @@ The endpoint supports a set of optional header parameters to populate the audit *Successfully updated configuration* + + + + +*The `If-Match` header did not match the currently stored configuration, or another writer committed a change between this request's precondition check and write. Re-read the configuration and retry.* + @@ -719,6 +764,7 @@ For information on the supported properties, see [Overlord dynamic configuration Retrieves the current Overlord dynamic configuration. Returns a JSON object with the dynamic configuration properties. Returns an empty response body if there is no current Overlord dynamic configuration. +When a configuration is present, the response includes an `ETag` header that can be used with `If-Match` on a subsequent update; see [Concurrency control with ETag and If-Match](#concurrency-control-with-etag-and-if-match). #### URL @@ -731,7 +777,7 @@ Returns an empty response body if there is no current Overlord dynamic configura -*Successfully retrieved dynamic configuration* +*Successfully retrieved dynamic configuration. When a configuration is stored, the `ETag` response header carries an opaque identifier for the returned configuration version.* @@ -799,6 +845,9 @@ The endpoint supports a set of optional header parameters to populate the `autho * `X-Druid-Comment` * Type: String * Description for the update. +* `If-Match` + * Type: String + * Optional. Quoted ETag previously returned by `GET /druid/indexer/v1/worker`. When supplied, the update only commits if the currently stored configuration still matches this ETag. Pass `*` to require only that some value is already stored. See [Concurrency control with ETag and If-Match](#concurrency-control-with-etag-and-if-match). #### Responses @@ -809,6 +858,12 @@ The endpoint supports a set of optional header parameters to populate the `autho *Successfully updated dynamic configuration* + + + + +*The `If-Match` header did not match the currently stored configuration, or another writer committed a change between this request's precondition check and write. Re-read the configuration and retry.* + diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskExecutionConfigResource.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskExecutionConfigResource.java index 432a41933ede..3ede2d82627e 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskExecutionConfigResource.java +++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskExecutionConfigResource.java @@ -25,8 +25,10 @@ import org.apache.druid.audit.AuditManager; import org.apache.druid.common.config.ConfigManager; import org.apache.druid.common.config.JacksonConfigManager; +import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.server.http.DynamicConfigEtagHelper; +import org.apache.druid.server.http.ServletResourceUtils; import org.apache.druid.server.http.security.ConfigResourceFilter; import org.apache.druid.server.security.AuthorizationUtils; import org.joda.time.Interval; @@ -43,7 +45,6 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import java.util.List; -import java.util.concurrent.atomic.AtomicReference; /** * Resource that manages Kubernetes-specific execution configurations for running tasks. @@ -54,10 +55,8 @@ @Path("/druid/indexer/v1/k8s/taskrunner/executionconfig") public class KubernetesTaskExecutionConfigResource { - private static final Logger log = new Logger(KubernetesTaskExecutionConfigResource.class); private final JacksonConfigManager configManager; private final AuditManager auditManager; - private AtomicReference dynamicConfigRef = null; @Inject public KubernetesTaskExecutionConfigResource( @@ -84,23 +83,23 @@ public Response setExecutionConfig( @Context final HttpServletRequest req ) { - KubernetesTaskRunnerDynamicConfig currentConfig = getDynamicConfig(); - KubernetesTaskRunnerDynamicConfig mergedConfig = dynamicConfig; - - if (currentConfig != null) { - mergedConfig = currentConfig.merge(dynamicConfig); + try { + final ConfigManager.SetResult setResult = configManager.setIfMatch( + KubernetesTaskRunnerDynamicConfig.CONFIG_KEY, + DynamicConfigEtagHelper.getIfMatch(req), + KubernetesTaskRunnerDynamicConfig.class, + null, + currentConfig -> currentConfig == null ? dynamicConfig : currentConfig.merge(dynamicConfig), + AuthorizationUtils.buildAuditInfo(req) + ); + if (setResult.isOk()) { + return Response.ok().build(); + } else { + return DynamicConfigEtagHelper.toErrorResponse(setResult); + } } - final ConfigManager.SetResult setResult = configManager.set( - KubernetesTaskRunnerDynamicConfig.CONFIG_KEY, - mergedConfig, - AuthorizationUtils.buildAuditInfo(req) - ); - if (setResult.isOk()) { - log.info("Updating K8s execution configs: %s", mergedConfig); - - return Response.ok().build(); - } else { - return Response.status(Response.Status.BAD_REQUEST).build(); + catch (DruidException e) { + return ServletResourceUtils.buildErrorResponseFrom(e); } } @@ -154,14 +153,13 @@ public Response getExecutionConfigHistory( @ResourceFilters(ConfigResourceFilter.class) public Response getExecutionConfig() { - return Response.ok(getDynamicConfig()).build(); - } - - private KubernetesTaskRunnerDynamicConfig getDynamicConfig() - { - if (dynamicConfigRef == null) { - dynamicConfigRef = configManager.watch(KubernetesTaskRunnerDynamicConfig.CONFIG_KEY, KubernetesTaskRunnerDynamicConfig.class); - } - return dynamicConfigRef.get(); + return DynamicConfigEtagHelper.buildReadResponseWithEtag( + () -> configManager.getCurrentBytes(KubernetesTaskRunnerDynamicConfig.CONFIG_KEY), + currentBytes -> configManager.convertByteToConfig( + currentBytes, + KubernetesTaskRunnerDynamicConfig.class, + null + ) + ); } } diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskExecutionConfigResourceTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskExecutionConfigResourceTest.java index 95371b7affda..42d00625a92f 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskExecutionConfigResourceTest.java +++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/execution/KubernetesTaskExecutionConfigResourceTest.java @@ -19,19 +19,24 @@ package org.apache.druid.k8s.overlord.execution; +import org.apache.druid.audit.AuditInfo; import org.apache.druid.audit.AuditManager; +import org.apache.druid.common.config.ConfigEtag; import org.apache.druid.common.config.ConfigManager; import org.apache.druid.common.config.JacksonConfigManager; +import org.apache.druid.error.ErrorResponse; import org.apache.druid.server.security.AuthConfig; -import org.apache.druid.server.security.AuthorizationUtils; +import org.easymock.Capture; import org.easymock.EasyMock; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.Response; -import java.util.concurrent.atomic.AtomicReference; +import java.nio.charset.StandardCharsets; +import java.util.function.UnaryOperator; public class KubernetesTaskExecutionConfigResourceTest { @@ -49,169 +54,232 @@ public void setUp() dynamicConfig = EasyMock.createMock(KubernetesTaskRunnerDynamicConfig.class); } + @Test + public void getExecutionConfigDerivesBodyAndEtagFromSameBytes() + { + final byte[] currentBytes = "current-k8s-execution-config".getBytes(StandardCharsets.UTF_8); + final KubernetesTaskRunnerDynamicConfig expectedConfig = + new DefaultKubernetesTaskRunnerDynamicConfig(new TaskTypePodTemplateSelectStrategy(), 10); + final KubernetesTaskExecutionConfigResource testedResource = new KubernetesTaskExecutionConfigResource( + configManager, + auditManager + ); + + EasyMock.expect(configManager.getCurrentBytes(KubernetesTaskRunnerDynamicConfig.CONFIG_KEY)) + .andReturn(currentBytes) + .once(); + EasyMock.expect( + configManager.convertByteToConfig( + EasyMock.aryEq(currentBytes), + EasyMock.eq(KubernetesTaskRunnerDynamicConfig.class), + (KubernetesTaskRunnerDynamicConfig) EasyMock.isNull() + ) + ).andReturn(expectedConfig).once(); + EasyMock.replay(configManager, auditManager); + + final Response result = testedResource.getExecutionConfig(); + + Assertions.assertEquals(Response.Status.OK.getStatusCode(), result.getStatus()); + Assertions.assertEquals(expectedConfig, result.getEntity()); + Assertions.assertEquals(ConfigEtag.compute(currentBytes), result.getMetadata().getFirst(HttpHeaders.ETAG)); + } + @Test public void setExecutionConfigSuccessfulUpdate() { - EasyMock.expect(configManager.watch( - KubernetesTaskRunnerDynamicConfig.CONFIG_KEY, - KubernetesTaskRunnerDynamicConfig.class - )).andReturn(new AtomicReference<>(null)); - KubernetesTaskExecutionConfigResource testedResource = new KubernetesTaskExecutionConfigResource( + final KubernetesTaskExecutionConfigResource testedResource = new KubernetesTaskExecutionConfigResource( configManager, auditManager ); EasyMock.expect(req.getHeader(AuditManager.X_DRUID_AUTHOR)).andReturn(null).anyTimes(); EasyMock.expect(req.getHeader(AuditManager.X_DRUID_COMMENT)).andReturn(null).anyTimes(); + EasyMock.expect(req.getHeader("If-Match")).andReturn(null).anyTimes(); EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(null).anyTimes(); EasyMock.expect(req.getRemoteAddr()).andReturn("127.0.0.1").anyTimes(); EasyMock.replay(req); - EasyMock.expect(configManager.set( - KubernetesTaskRunnerDynamicConfig.CONFIG_KEY, - dynamicConfig, - AuthorizationUtils.buildAuditInfo(req) + EasyMock.expect(configManager.setIfMatch( + EasyMock.eq(KubernetesTaskRunnerDynamicConfig.CONFIG_KEY), + (String) EasyMock.isNull(), + EasyMock.eq(KubernetesTaskRunnerDynamicConfig.class), + (KubernetesTaskRunnerDynamicConfig) EasyMock.isNull(), + EasyMock.anyObject(UnaryOperator.class), + EasyMock.anyObject(AuditInfo.class) )).andReturn(ConfigManager.SetResult.ok()); EasyMock.replay(configManager, auditManager, dynamicConfig); - Response result = testedResource.setExecutionConfig(dynamicConfig, req); + final Response result = testedResource.setExecutionConfig(dynamicConfig, req); Assertions.assertEquals(Response.Status.OK.getStatusCode(), result.getStatus()); } @Test public void setExecutionConfigFailedUpdate() { - EasyMock.expect(configManager.watch( - KubernetesTaskRunnerDynamicConfig.CONFIG_KEY, - KubernetesTaskRunnerDynamicConfig.class - )).andReturn(new AtomicReference<>(null)); - KubernetesTaskExecutionConfigResource testedResource = new KubernetesTaskExecutionConfigResource( + final KubernetesTaskExecutionConfigResource testedResource = new KubernetesTaskExecutionConfigResource( configManager, auditManager ); EasyMock.expect(req.getHeader(AuditManager.X_DRUID_AUTHOR)).andReturn(null).anyTimes(); EasyMock.expect(req.getHeader(AuditManager.X_DRUID_COMMENT)).andReturn(null).anyTimes(); + EasyMock.expect(req.getHeader("If-Match")).andReturn(null).anyTimes(); EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(null).anyTimes(); EasyMock.expect(req.getRemoteAddr()).andReturn("127.0.0.1").anyTimes(); EasyMock.replay(req); - EasyMock.expect(configManager.set( - KubernetesTaskRunnerDynamicConfig.CONFIG_KEY, - dynamicConfig, - AuthorizationUtils.buildAuditInfo(req) + EasyMock.expect(configManager.setIfMatch( + EasyMock.eq(KubernetesTaskRunnerDynamicConfig.CONFIG_KEY), + (String) EasyMock.isNull(), + EasyMock.eq(KubernetesTaskRunnerDynamicConfig.class), + (KubernetesTaskRunnerDynamicConfig) EasyMock.isNull(), + EasyMock.anyObject(UnaryOperator.class), + EasyMock.anyObject(AuditInfo.class) )).andReturn(ConfigManager.SetResult.failure(new RuntimeException())); EasyMock.replay(configManager, auditManager, dynamicConfig); - Response result = testedResource.setExecutionConfig(dynamicConfig, req); + final Response result = testedResource.setExecutionConfig(dynamicConfig, req); Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), result.getStatus()); } + @Test + public void setExecutionConfigWithBlankIfMatchReturnsBadRequest() + { + final KubernetesTaskExecutionConfigResource testedResource = new KubernetesTaskExecutionConfigResource( + configManager, + auditManager + ); + EasyMock.expect(req.getHeader(HttpHeaders.IF_MATCH)).andReturn(" ").once(); + EasyMock.replay(req, configManager, auditManager, dynamicConfig); + + final Response result = testedResource.setExecutionConfig(dynamicConfig, req); + + Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), result.getStatus()); + Assertions.assertTrue(result.getEntity() instanceof ErrorResponse); + Assertions.assertEquals( + "If-Match header must not be blank", + ((ErrorResponse) result.getEntity()).getUnderlyingException().getMessage() + ); + } + @Test public void setExecutionConfig_MergeUsesCurrentCapacityWhenRequestCapacityNull() { - KubernetesTaskExecutionConfigResource testedResource = new KubernetesTaskExecutionConfigResource( + final KubernetesTaskExecutionConfigResource testedResource = new KubernetesTaskExecutionConfigResource( configManager, auditManager ); - PodTemplateSelectStrategy currentStrategy = new TaskTypePodTemplateSelectStrategy(); - KubernetesTaskRunnerDynamicConfig currentConfig = new DefaultKubernetesTaskRunnerDynamicConfig(currentStrategy, 5); - EasyMock.expect(configManager.watch( - KubernetesTaskRunnerDynamicConfig.CONFIG_KEY, - KubernetesTaskRunnerDynamicConfig.class - )).andReturn(new AtomicReference<>(currentConfig)); + final PodTemplateSelectStrategy currentStrategy = new TaskTypePodTemplateSelectStrategy(); + final KubernetesTaskRunnerDynamicConfig currentConfig = + new DefaultKubernetesTaskRunnerDynamicConfig(currentStrategy, 5); - PodTemplateSelectStrategy requestStrategy = new TaskTypePodTemplateSelectStrategy(); - KubernetesTaskRunnerDynamicConfig requestConfig = new DefaultKubernetesTaskRunnerDynamicConfig(requestStrategy, null); + final PodTemplateSelectStrategy requestStrategy = new TaskTypePodTemplateSelectStrategy(); + final KubernetesTaskRunnerDynamicConfig requestConfig = + new DefaultKubernetesTaskRunnerDynamicConfig(requestStrategy, null); - KubernetesTaskRunnerDynamicConfig expectedMergedConfig = new DefaultKubernetesTaskRunnerDynamicConfig(requestStrategy, 5); + final KubernetesTaskRunnerDynamicConfig expectedMergedConfig = + new DefaultKubernetesTaskRunnerDynamicConfig(requestStrategy, 5); + final Capture> updateCapture = EasyMock.newCapture(); EasyMock.expect(req.getHeader(AuditManager.X_DRUID_AUTHOR)).andReturn(null).anyTimes(); EasyMock.expect(req.getHeader(AuditManager.X_DRUID_COMMENT)).andReturn(null).anyTimes(); + EasyMock.expect(req.getHeader("If-Match")).andReturn(null).anyTimes(); EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(null).anyTimes(); EasyMock.expect(req.getRemoteAddr()).andReturn("127.0.0.1").anyTimes(); EasyMock.replay(req); - EasyMock.expect(configManager.set( - KubernetesTaskRunnerDynamicConfig.CONFIG_KEY, - expectedMergedConfig, - AuthorizationUtils.buildAuditInfo(req) + EasyMock.expect(configManager.setIfMatch( + EasyMock.eq(KubernetesTaskRunnerDynamicConfig.CONFIG_KEY), + (String) EasyMock.isNull(), + EasyMock.eq(KubernetesTaskRunnerDynamicConfig.class), + (KubernetesTaskRunnerDynamicConfig) EasyMock.isNull(), + EasyMock.capture(updateCapture), + EasyMock.anyObject(AuditInfo.class) )).andReturn(ConfigManager.SetResult.ok()); EasyMock.replay(configManager, auditManager); - Response result = testedResource.setExecutionConfig(requestConfig, req); + final Response result = testedResource.setExecutionConfig(requestConfig, req); Assertions.assertEquals(Response.Status.OK.getStatusCode(), result.getStatus()); + Assertions.assertEquals(expectedMergedConfig, updateCapture.getValue().apply(currentConfig)); } @Test public void setExecutionConfig_MergeUsesCurrentStrategyWhenRequestStrategyNull() { - KubernetesTaskExecutionConfigResource testedResource = new KubernetesTaskExecutionConfigResource( + final KubernetesTaskExecutionConfigResource testedResource = new KubernetesTaskExecutionConfigResource( configManager, auditManager ); - PodTemplateSelectStrategy currentStrategy = new TaskTypePodTemplateSelectStrategy(); - KubernetesTaskRunnerDynamicConfig currentConfig = new DefaultKubernetesTaskRunnerDynamicConfig(currentStrategy, 2); - EasyMock.expect(configManager.watch( - KubernetesTaskRunnerDynamicConfig.CONFIG_KEY, - KubernetesTaskRunnerDynamicConfig.class - )).andReturn(new AtomicReference<>(currentConfig)); + final PodTemplateSelectStrategy currentStrategy = new TaskTypePodTemplateSelectStrategy(); + final KubernetesTaskRunnerDynamicConfig currentConfig = + new DefaultKubernetesTaskRunnerDynamicConfig(currentStrategy, 2); - KubernetesTaskRunnerDynamicConfig requestConfig = new DefaultKubernetesTaskRunnerDynamicConfig(null, 7); + final KubernetesTaskRunnerDynamicConfig requestConfig = new DefaultKubernetesTaskRunnerDynamicConfig(null, 7); - KubernetesTaskRunnerDynamicConfig expectedMergedConfig = new DefaultKubernetesTaskRunnerDynamicConfig(currentStrategy, 7); + final KubernetesTaskRunnerDynamicConfig expectedMergedConfig = + new DefaultKubernetesTaskRunnerDynamicConfig(currentStrategy, 7); + final Capture> updateCapture = EasyMock.newCapture(); EasyMock.expect(req.getHeader(AuditManager.X_DRUID_AUTHOR)).andReturn(null).anyTimes(); EasyMock.expect(req.getHeader(AuditManager.X_DRUID_COMMENT)).andReturn(null).anyTimes(); + EasyMock.expect(req.getHeader("If-Match")).andReturn(null).anyTimes(); EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(null).anyTimes(); EasyMock.expect(req.getRemoteAddr()).andReturn("127.0.0.1").anyTimes(); EasyMock.replay(req); - EasyMock.expect(configManager.set( - KubernetesTaskRunnerDynamicConfig.CONFIG_KEY, - expectedMergedConfig, - AuthorizationUtils.buildAuditInfo(req) + EasyMock.expect(configManager.setIfMatch( + EasyMock.eq(KubernetesTaskRunnerDynamicConfig.CONFIG_KEY), + (String) EasyMock.isNull(), + EasyMock.eq(KubernetesTaskRunnerDynamicConfig.class), + (KubernetesTaskRunnerDynamicConfig) EasyMock.isNull(), + EasyMock.capture(updateCapture), + EasyMock.anyObject(AuditInfo.class) )).andReturn(ConfigManager.SetResult.ok()); EasyMock.replay(configManager, auditManager); - Response result = testedResource.setExecutionConfig(requestConfig, req); + final Response result = testedResource.setExecutionConfig(requestConfig, req); Assertions.assertEquals(Response.Status.OK.getStatusCode(), result.getStatus()); + Assertions.assertEquals(expectedMergedConfig, updateCapture.getValue().apply(currentConfig)); } @Test public void setExecutionConfig_MergeUsesCurrentWhenBothRequestFieldsNull() { - KubernetesTaskExecutionConfigResource testedResource = new KubernetesTaskExecutionConfigResource( + final KubernetesTaskExecutionConfigResource testedResource = new KubernetesTaskExecutionConfigResource( configManager, auditManager ); - PodTemplateSelectStrategy currentStrategy = new TaskTypePodTemplateSelectStrategy(); - KubernetesTaskRunnerDynamicConfig currentConfig = new DefaultKubernetesTaskRunnerDynamicConfig(currentStrategy, 9); - EasyMock.expect(configManager.watch( - KubernetesTaskRunnerDynamicConfig.CONFIG_KEY, - KubernetesTaskRunnerDynamicConfig.class - )).andReturn(new AtomicReference<>(currentConfig)); + final PodTemplateSelectStrategy currentStrategy = new TaskTypePodTemplateSelectStrategy(); + final KubernetesTaskRunnerDynamicConfig currentConfig = + new DefaultKubernetesTaskRunnerDynamicConfig(currentStrategy, 9); - KubernetesTaskRunnerDynamicConfig requestConfig = new DefaultKubernetesTaskRunnerDynamicConfig(null, null); + final KubernetesTaskRunnerDynamicConfig requestConfig = new DefaultKubernetesTaskRunnerDynamicConfig(null, null); - KubernetesTaskRunnerDynamicConfig expectedMergedConfig = new DefaultKubernetesTaskRunnerDynamicConfig(currentStrategy, 9); + final KubernetesTaskRunnerDynamicConfig expectedMergedConfig = + new DefaultKubernetesTaskRunnerDynamicConfig(currentStrategy, 9); + final Capture> updateCapture = EasyMock.newCapture(); EasyMock.expect(req.getHeader(AuditManager.X_DRUID_AUTHOR)).andReturn(null).anyTimes(); EasyMock.expect(req.getHeader(AuditManager.X_DRUID_COMMENT)).andReturn(null).anyTimes(); + EasyMock.expect(req.getHeader("If-Match")).andReturn(null).anyTimes(); EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(null).anyTimes(); EasyMock.expect(req.getRemoteAddr()).andReturn("127.0.0.1").anyTimes(); EasyMock.replay(req); - EasyMock.expect(configManager.set( - KubernetesTaskRunnerDynamicConfig.CONFIG_KEY, - expectedMergedConfig, - AuthorizationUtils.buildAuditInfo(req) + EasyMock.expect(configManager.setIfMatch( + EasyMock.eq(KubernetesTaskRunnerDynamicConfig.CONFIG_KEY), + (String) EasyMock.isNull(), + EasyMock.eq(KubernetesTaskRunnerDynamicConfig.class), + (KubernetesTaskRunnerDynamicConfig) EasyMock.isNull(), + EasyMock.capture(updateCapture), + EasyMock.anyObject(AuditInfo.class) )).andReturn(ConfigManager.SetResult.ok()); EasyMock.replay(configManager, auditManager); - Response result = testedResource.setExecutionConfig(requestConfig, req); + final Response result = testedResource.setExecutionConfig(requestConfig, req); Assertions.assertEquals(Response.Status.OK.getStatusCode(), result.getStatus()); + Assertions.assertEquals(expectedMergedConfig, updateCapture.getValue().apply(currentConfig)); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResource.java index 7d90bf1047b6..218968136021 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResource.java @@ -44,6 +44,7 @@ import org.apache.druid.server.coordinator.CoordinatorConfigManager; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.server.coordinator.DataSourceCompactionConfigAuditEntry; +import org.apache.druid.server.http.DynamicConfigEtagHelper; import org.apache.druid.server.http.ServletResourceUtils; import org.apache.druid.server.http.security.ConfigResourceFilter; import org.apache.druid.server.http.security.DatasourceResourceFilter; @@ -123,7 +124,11 @@ public Response updateClusterCompactionConfig( { final AuditInfo auditInfo = AuthorizationUtils.buildAuditInfo(req); return ServletResourceUtils.buildUpdateResponse( - () -> configManager.updateClusterCompactionConfig(updatePayload, auditInfo) + () -> configManager.updateClusterCompactionConfig( + updatePayload, + DynamicConfigEtagHelper.getIfMatch(req), + auditInfo + ) ); } @@ -133,8 +138,9 @@ public Response updateClusterCompactionConfig( @ResourceFilters(ConfigResourceFilter.class) public Response getClusterCompactionConfig() { - return ServletResourceUtils.buildReadResponse( - configManager::getClusterCompactionConfig + return DynamicConfigEtagHelper.buildReadResponseWithEtag( + configManager::getCurrentCompactionConfigBytes, + currentBytes -> configManager.convertBytesToCompactionConfig(currentBytes).clusterConfig() ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java index a12a9ea22feb..f48b75ae79db 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java @@ -60,6 +60,7 @@ import org.apache.druid.java.util.common.UOE; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.metadata.LockFilterPolicy; +import org.apache.druid.server.http.DynamicConfigEtagHelper; import org.apache.druid.server.http.HttpMediaType; import org.apache.druid.server.http.ServletResourceUtils; import org.apache.druid.server.http.security.ConfigResourceFilter; @@ -426,7 +427,14 @@ public Response getMultipleTaskStatuses(Set taskIds) @ResourceFilters(ConfigResourceFilter.class) public Response getWorkerConfig() { - return Response.ok(taskQueryTool.getLatestWorkerConfig()).build(); + return DynamicConfigEtagHelper.buildReadResponseWithEtag( + () -> configManager.getCurrentBytes(WorkerBehaviorConfig.CONFIG_KEY), + currentBytes -> configManager.convertByteToConfig( + currentBytes, + WorkerBehaviorConfig.class, + null + ) + ); } /** @@ -455,17 +463,23 @@ public Response setWorkerConfig( @Context final HttpServletRequest req ) { - final SetResult setResult = configManager.set( - WorkerBehaviorConfig.CONFIG_KEY, - workerBehaviorConfig, - AuthorizationUtils.buildAuditInfo(req) - ); - if (setResult.isOk()) { - log.info("Updating Worker configs: %s", workerBehaviorConfig); + try { + final SetResult setResult = configManager.setIfMatch( + WorkerBehaviorConfig.CONFIG_KEY, + DynamicConfigEtagHelper.getIfMatch(req), + workerBehaviorConfig, + AuthorizationUtils.buildAuditInfo(req) + ); + if (setResult.isOk()) { + log.info("Updating Worker configs: %s", workerBehaviorConfig); - return Response.ok().build(); - } else { - return Response.status(Response.Status.BAD_REQUEST).build(); + return Response.ok().build(); + } else { + return DynamicConfigEtagHelper.toErrorResponse(setResult); + } + } + catch (DruidException e) { + return ServletResourceUtils.buildErrorResponseFrom(e); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResourceTest.java index 3518e1dea409..a74e811697c8 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResourceTest.java @@ -23,6 +23,7 @@ import com.google.common.util.concurrent.Futures; import org.apache.druid.audit.AuditManager; import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.common.config.ConfigEtag; import org.apache.druid.error.DruidExceptionMatcher; import org.apache.druid.error.ErrorResponse; import org.apache.druid.indexer.CompactionEngine; @@ -41,6 +42,7 @@ import org.apache.druid.server.coordinator.CoordinatorConfigManager; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.server.coordinator.DataSourceCompactionConfigAuditEntry; +import org.apache.druid.server.coordinator.DruidCompactionConfig; import org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfig; import org.apache.druid.server.security.AllowAllAuthorizer; import org.apache.druid.server.security.AuthConfig; @@ -55,7 +57,9 @@ import org.junit.Test; import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.Response; +import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; import java.util.Random; @@ -151,11 +155,16 @@ private void replayAll() @Test public void test_updateClusterConfig() { - EasyMock.expect(configManager.updateClusterCompactionConfig(EasyMock.anyObject(), EasyMock.anyObject())) + EasyMock.expect(configManager.updateClusterCompactionConfig( + EasyMock.anyObject(), + EasyMock.anyObject(), + EasyMock.anyObject() + )) .andReturn(true) .once(); setupMockRequestForAudit(); + EasyMock.expect(httpRequest.getHeader("If-Match")).andReturn(null).once(); replayAll(); Response response = compactionResource.updateClusterCompactionConfig( @@ -166,19 +175,40 @@ public void test_updateClusterConfig() Assert.assertEquals(Map.of("success", true), response.getEntity()); } + @Test + public void test_updateClusterConfigWithBlankIfMatchReturnsBadRequest() + { + setupMockRequestForAudit(); + EasyMock.expect(httpRequest.getHeader(HttpHeaders.IF_MATCH)).andReturn(" ").once(); + replayAll(); + + final Response response = compactionResource.updateClusterCompactionConfig( + new ClusterCompactionConfig(0.5, 10, null, true, CompactionEngine.MSQ, true), + httpRequest + ); + + verifyInvalidInputResponse(response, "If-Match header must not be blank"); + } + @Test public void test_getClusterConfig() { final ClusterCompactionConfig clusterConfig = new ClusterCompactionConfig(0.4, 100, null, true, CompactionEngine.MSQ, true); - EasyMock.expect(configManager.getClusterCompactionConfig()) - .andReturn(clusterConfig) + final DruidCompactionConfig compactionConfig = DruidCompactionConfig.empty().withClusterConfig(clusterConfig); + final byte[] currentBytes = "current-compaction-config".getBytes(StandardCharsets.UTF_8); + EasyMock.expect(configManager.getCurrentCompactionConfigBytes()) + .andReturn(currentBytes) + .once(); + EasyMock.expect(configManager.convertBytesToCompactionConfig(EasyMock.aryEq(currentBytes))) + .andReturn(compactionConfig) .once(); replayAll(); final Response response = compactionResource.getClusterCompactionConfig(); Assert.assertEquals(200, response.getStatus()); Assert.assertEquals(clusterConfig, response.getEntity()); + Assert.assertEquals(ConfigEtag.compute(currentBytes), response.getMetadata().getFirst(HttpHeaders.ETAG)); } @Test diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java index 288ae1bf387f..ffc903caa0b7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java @@ -27,8 +27,10 @@ import com.google.common.collect.ImmutableSet; import org.apache.druid.audit.AuditEntry; import org.apache.druid.audit.AuditManager; +import org.apache.druid.common.config.ConfigEtag; import org.apache.druid.common.config.JacksonConfigManager; import org.apache.druid.error.DruidException; +import org.apache.druid.error.ErrorResponse; import org.apache.druid.error.InvalidInput; import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.TaskInfo; @@ -90,8 +92,10 @@ import javax.servlet.http.HttpServletRequest; import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -1363,6 +1367,48 @@ public void testGetTotalWorkerCapacityWithMaximumCapacity() Assert.assertEquals(expectedWorkerCapacityWithAutoscale, ((TotalWorkerCapacityResponse) response.getEntity()).getMaximumCapacityWithAutoScale()); } + @Test + public void testGetWorkerConfigDerivesBodyAndEtagFromSameBytes() + { + final WorkerBehaviorConfig workerBehaviorConfig = EasyMock.createMock(WorkerBehaviorConfig.class); + final byte[] currentBytes = "current-worker-config".getBytes(StandardCharsets.UTF_8); + + EasyMock.expect(configManager.getCurrentBytes(WorkerBehaviorConfig.CONFIG_KEY)) + .andReturn(currentBytes) + .once(); + EasyMock.expect( + configManager.convertByteToConfig( + EasyMock.aryEq(currentBytes), + EasyMock.eq(WorkerBehaviorConfig.class), + (WorkerBehaviorConfig) EasyMock.isNull() + ) + ).andReturn(workerBehaviorConfig).once(); + replayAll(); + + final Response response = overlordResource.getWorkerConfig(); + + Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); + Assert.assertEquals(workerBehaviorConfig, response.getEntity()); + Assert.assertEquals(ConfigEtag.compute(currentBytes), response.getMetadata().getFirst(HttpHeaders.ETAG)); + } + + @Test + public void testSetWorkerConfigWithBlankIfMatchReturnsBadRequest() + { + final WorkerBehaviorConfig workerBehaviorConfig = EasyMock.createMock(WorkerBehaviorConfig.class); + EasyMock.expect(req.getHeader(HttpHeaders.IF_MATCH)).andReturn(" ").once(); + replayAll(); + + final Response response = overlordResource.setWorkerConfig(workerBehaviorConfig, req); + + Assert.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), response.getStatus()); + Assert.assertTrue(response.getEntity() instanceof ErrorResponse); + Assert.assertEquals( + "If-Match header must not be blank", + ((ErrorResponse) response.getEntity()).getUnderlyingException().getMessage() + ); + } + @Test public void testResourceActionsForTaskWithInputTypeAndInputSecurityEnabled() { diff --git a/processing/src/main/java/org/apache/druid/common/config/ConfigEtag.java b/processing/src/main/java/org/apache/druid/common/config/ConfigEtag.java new file mode 100644 index 000000000000..310c2ed65a70 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/common/config/ConfigEtag.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.common.config; + +import javax.annotation.Nullable; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Arrays; +import java.util.Base64; + +/** + * Computes and compares ETags for stored dynamic config payloads. The ETag is a + * pure function of the payload bytes — used for {@code If-Match} preconditions. + */ +public final class ConfigEtag +{ + private static final int ETAG_HASH_BYTES = 16; + + private ConfigEtag() + { + } + + /** + * Quoted ETag for the given payload bytes, or {@code null} if {@code bytes} + * is {@code null}. SHA-256 truncated to {@value #ETAG_HASH_BYTES} bytes, + * base64url-encoded (no padding), wrapped in double quotes per RFC 7232. + */ + @Nullable + public static String compute(@Nullable byte[] bytes) + { + if (bytes == null) { + return null; + } + try { + MessageDigest md = MessageDigest.getInstance("SHA-256"); + byte[] full = md.digest(bytes); + byte[] truncated = Arrays.copyOf(full, ETAG_HASH_BYTES); + return "\"" + Base64.getUrlEncoder().withoutPadding().encodeToString(truncated) + "\""; + } + catch (NoSuchAlgorithmException e) { + // SHA-256 is required by every JRE. + throw new IllegalStateException("SHA-256 not available", e); + } + } + + /** + * Whether {@code ifMatchHeader} matches the ETag of {@code currentBytes}. + * Wildcard {@code *} matches any existing value. A comma-separated list is + * satisfied if any element matches. + */ + public static boolean matches(String ifMatchHeader, @Nullable byte[] currentBytes) + { + if (ifMatchHeader == null) { + return true; + } + final String trimmed = ifMatchHeader.trim(); + if ("*".equals(trimmed)) { + return currentBytes != null; + } + final String currentEtag = compute(currentBytes); + if (currentEtag == null) { + return false; + } + for (String candidate : trimmed.split(",")) { + if (currentEtag.equals(candidate.trim())) { + return true; + } + } + return false; + } +} diff --git a/processing/src/main/java/org/apache/druid/common/config/ConfigManager.java b/processing/src/main/java/org/apache/druid/common/config/ConfigManager.java index 12265df31e27..756405717c64 100644 --- a/processing/src/main/java/org/apache/druid/common/config/ConfigManager.java +++ b/processing/src/main/java/org/apache/druid/common/config/ConfigManager.java @@ -181,6 +181,30 @@ public SetResult set(final String key, final ConfigSerde serde, final T o return set(key, serde, null, obj); } + /** + * Returns the raw payload bytes currently stored for {@code key}, or + * {@code null} if none. Prefers the in-memory watched copy; falls back to the + * metadata store. Returns a defensive copy — the cached array backs equality + * checks and CAS oldValue payloads. + */ + @Nullable + public byte[] getCurrentBytes(final String key) + { + final ConfigHolder holder = watchedConfigs.get(key); + if (holder != null) { + final byte[] cached = holder.rawBytes.get(); + if (cached != null) { + return cached.clone(); + } + } + return dbConnector.lookup(configTable, "name", "payload", key); + } + + public boolean isCompareAndSwapEnabled() + { + return config.get().isEnableCompareAndSwap(); + } + public SetResult set(final String key, final ConfigSerde serde, @Nullable final byte[] oldValue, final T newObject) { if (newObject == null || !started) { @@ -259,10 +283,11 @@ private MetadataCASUpdate createMetadataCASUpdate( public static class SetResult { - private static final SetResult SUCCESS = new SetResult(null, false); + private static final SetResult SUCCESS = new SetResult(null, false, false); private final Exception exception; private final boolean retryableException; + private final boolean preconditionFailed; public static SetResult ok() { @@ -271,18 +296,25 @@ public static SetResult ok() public static SetResult failure(Exception e) { - return new SetResult(e, false); + return new SetResult(e, false, false); } public static SetResult retryableFailure(Exception e) { - return new SetResult(e, true); + return new SetResult(e, true, false); + } + + /** Client-supplied precondition (e.g. {@code If-Match}) failed; maps to HTTP 412. */ + public static SetResult preconditionFailed(Exception e) + { + return new SetResult(e, false, true); } - private SetResult(@Nullable Exception exception, boolean retryableException) + private SetResult(@Nullable Exception exception, boolean retryableException, boolean preconditionFailed) { this.exception = exception; this.retryableException = retryableException; + this.preconditionFailed = preconditionFailed; } public boolean isOk() @@ -295,6 +327,11 @@ public boolean isRetryable() return retryableException; } + public boolean isPreconditionFailed() + { + return preconditionFailed; + } + public Exception getException() { return exception; diff --git a/processing/src/main/java/org/apache/druid/common/config/JacksonConfigManager.java b/processing/src/main/java/org/apache/druid/common/config/JacksonConfigManager.java index aa432058f0af..1811cb0232a2 100644 --- a/processing/src/main/java/org/apache/druid/common/config/JacksonConfigManager.java +++ b/processing/src/main/java/org/apache/druid/common/config/JacksonConfigManager.java @@ -34,6 +34,7 @@ import javax.annotation.Nullable; import java.io.IOException; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.UnaryOperator; /** */ @@ -127,6 +128,112 @@ public SetResult set( return configManager.set(key, configSerde, oldValue, newValue); } + @Nullable + public byte[] getCurrentBytes(String key) + { + return configManager.getCurrentBytes(key); + } + + public boolean isCompareAndSwapEnabled() + { + return configManager.isCompareAndSwapEnabled(); + } + + /** + * Set the config, optionally guarded by an {@code If-Match}-style + * precondition. When {@code ifMatchEtag} is {@code null}, behaves like + * {@link #set(String, Object, AuditInfo)}. Otherwise the write only commits + * if the currently stored bytes hash to {@code ifMatchEtag}; on mismatch the + * result reports {@link SetResult#isPreconditionFailed() preconditionFailed}. + * + *

The precondition is enforced via metadata-store CAS, so conditional + * writes require {@code druid.manager.config.enableCompareAndSwap} to be + * true (the default). With CAS disabled, {@code If-Match} writes fail as a + * precondition failure instead of silently degrading to last-writer-wins. + */ + public SetResult setIfMatch( + String key, + @Nullable String ifMatchEtag, + T newValue, + AuditInfo auditInfo + ) + { + if (newValue == null) { + return SetResult.failure(new IllegalArgumentException("input obj is null")); + } + if (ifMatchEtag == null) { + return set(key, newValue, auditInfo); + } + if (!configManager.isCompareAndSwapEnabled()) { + return SetResult.preconditionFailed( + new IllegalStateException( + "If-Match requires druid.manager.config.enableCompareAndSwap to be enabled for key[" + key + "]" + ) + ); + } + final byte[] currentBytes = configManager.getCurrentBytes(key); + if (!ConfigEtag.matches(ifMatchEtag, currentBytes)) { + return SetResult.preconditionFailed( + new IllegalStateException("If-Match precondition failed for key[" + key + "]") + ); + } + final SetResult result = set(key, currentBytes, newValue, auditInfo); + // Retryable CAS failure here = concurrent writer between our read and CAS; + // surface as precondition failed since the caller asked us to reject that. + if (!result.isOk() && result.isRetryable()) { + return SetResult.preconditionFailed( + new IllegalStateException("If-Match precondition failed (concurrent update) for key[" + key + "]") + ); + } + return result; + } + + /** + * Applies {@code updateOperator} to the config deserialized from the bytes + * used for {@code If-Match} validation and CAS. This is intended for partial + * updates whose output must be built from the same snapshot that the + * precondition protects. + */ + public SetResult setIfMatch( + String key, + @Nullable String ifMatchEtag, + Class clazz, + @Nullable T defaultVal, + UnaryOperator updateOperator, + AuditInfo auditInfo + ) + { + if (ifMatchEtag != null && !configManager.isCompareAndSwapEnabled()) { + return SetResult.preconditionFailed( + new IllegalStateException( + "If-Match requires druid.manager.config.enableCompareAndSwap to be enabled for key[" + key + "]" + ) + ); + } + final byte[] currentBytes = configManager.getCurrentBytes(key); + if (ifMatchEtag != null && !ConfigEtag.matches(ifMatchEtag, currentBytes)) { + return SetResult.preconditionFailed( + new IllegalStateException("If-Match precondition failed for key[" + key + "]") + ); + } + final ConfigSerde configSerde = create(clazz, defaultVal); + final T currentValue = configSerde.deserialize(currentBytes); + final T newValue = updateOperator.apply(currentValue); + if (newValue == null) { + return SetResult.failure(new IllegalArgumentException("input obj is null")); + } + if (ifMatchEtag == null) { + return set(key, newValue, auditInfo); + } + final SetResult result = set(key, currentBytes, newValue, auditInfo); + if (!result.isOk() && result.isRetryable()) { + return SetResult.preconditionFailed( + new IllegalStateException("If-Match precondition failed (concurrent update) for key[" + key + "]") + ); + } + return result; + } + @VisibleForTesting ConfigSerde create(final Class clazz, final T defaultVal) { diff --git a/processing/src/main/java/org/apache/druid/error/DruidException.java b/processing/src/main/java/org/apache/druid/error/DruidException.java index 54382f5274b6..e06702a4dba1 100644 --- a/processing/src/main/java/org/apache/druid/error/DruidException.java +++ b/processing/src/main/java/org/apache/druid/error/DruidException.java @@ -379,6 +379,11 @@ public enum Category * the current state of the target resource. */ CONFLICT(409), + /** + * Indicates that the request could not be completed due to a conflict with + * headers supplied in the request and the current state of the target resource. + */ + PRECONDITION_FAILED(412), /** * Means that some capacity limit was exceeded, this could be due to throttling or due to some system limit */ diff --git a/processing/src/test/java/org/apache/druid/common/config/ConfigEtagTest.java b/processing/src/test/java/org/apache/druid/common/config/ConfigEtagTest.java new file mode 100644 index 000000000000..ab61d538e8e5 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/common/config/ConfigEtagTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.common.config; + +import org.junit.Assert; +import org.junit.Test; + +import java.nio.charset.StandardCharsets; + +public class ConfigEtagTest +{ + @Test + public void testComputeIsDeterministic() + { + byte[] bytes = "{\"foo\":\"bar\"}".getBytes(StandardCharsets.UTF_8); + Assert.assertEquals(ConfigEtag.compute(bytes), ConfigEtag.compute(bytes)); + } + + @Test + public void testComputeDiffersForDifferentInput() + { + Assert.assertNotEquals( + ConfigEtag.compute("a".getBytes(StandardCharsets.UTF_8)), + ConfigEtag.compute("b".getBytes(StandardCharsets.UTF_8)) + ); + } + + @Test + public void testComputeIsQuoted() + { + String etag = ConfigEtag.compute(new byte[]{1, 2, 3}); + Assert.assertNotNull(etag); + Assert.assertTrue("ETag must be quoted: " + etag, etag.startsWith("\"") && etag.endsWith("\"")); + } + + @Test + public void testComputeNullInput() + { + Assert.assertNull(ConfigEtag.compute(null)); + } + + @Test + public void testMatchesNullHeaderAlwaysTrue() + { + Assert.assertTrue(ConfigEtag.matches(null, new byte[]{1, 2, 3})); + Assert.assertTrue(ConfigEtag.matches(null, null)); + } + + @Test + public void testMatchesWildcard() + { + Assert.assertTrue(ConfigEtag.matches("*", new byte[]{1, 2, 3})); + Assert.assertFalse("wildcard must not match absent value", ConfigEtag.matches("*", null)); + } + + @Test + public void testMatchesExact() + { + byte[] bytes = "payload".getBytes(StandardCharsets.UTF_8); + String etag = ConfigEtag.compute(bytes); + Assert.assertTrue(ConfigEtag.matches(etag, bytes)); + } + + @Test + public void testMatchesMismatch() + { + byte[] bytes = "payload".getBytes(StandardCharsets.UTF_8); + String wrongEtag = ConfigEtag.compute("other".getBytes(StandardCharsets.UTF_8)); + Assert.assertFalse(ConfigEtag.matches(wrongEtag, bytes)); + } + + @Test + public void testMatchesAgainstNullCurrent() + { + String etag = ConfigEtag.compute(new byte[]{1}); + Assert.assertFalse(ConfigEtag.matches(etag, null)); + } + + @Test + public void testMatchesCommaSeparatedList() + { + byte[] bytes = "payload".getBytes(StandardCharsets.UTF_8); + String correct = ConfigEtag.compute(bytes); + String header = "\"bogus\", " + correct + ", \"another\""; + Assert.assertTrue(ConfigEtag.matches(header, bytes)); + } +} diff --git a/processing/src/test/java/org/apache/druid/common/config/JacksonConfigManagerTest.java b/processing/src/test/java/org/apache/druid/common/config/JacksonConfigManagerTest.java index c4cf97a236f5..7da2f4aa0bca 100644 --- a/processing/src/test/java/org/apache/druid/common/config/JacksonConfigManagerTest.java +++ b/processing/src/test/java/org/apache/druid/common/config/JacksonConfigManagerTest.java @@ -36,6 +36,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; @ExtendWith(MockitoExtension.class) public class JacksonConfigManagerTest @@ -77,6 +78,214 @@ public void testSet() Assertions.assertNotNull(auditCapture.getValue()); } + @Test + public void testSetIfMatchNullEtagDelegatesToUnconditionalSet() + { + String key = "key"; + TestConfig val = new TestConfig("v", "s", 1); + AuditInfo auditInfo = new AuditInfo("a", "i", "c", "ip"); + Mockito.when(mockConfigManager.set( + Mockito.eq(key), + Mockito.any(ConfigSerde.class), + Mockito.isNull(), + Mockito.eq(val) + )).thenReturn(ConfigManager.SetResult.ok()); + + ConfigManager.SetResult result = jacksonConfigManager.setIfMatch(key, null, val, auditInfo); + + Assertions.assertTrue(result.isOk()); + Mockito.verify(mockConfigManager).set( + Mockito.eq(key), + Mockito.any(ConfigSerde.class), + Mockito.isNull(), + Mockito.eq(val) + ); + } + + @Test + public void testSetIfMatchPreconditionPassesForMatchingEtag() + { + String key = "key"; + TestConfig val = new TestConfig("v", "s", 1); + AuditInfo auditInfo = new AuditInfo("a", "i", "c", "ip"); + byte[] currentBytes = "current".getBytes(java.nio.charset.StandardCharsets.UTF_8); + Mockito.when(mockConfigManager.isCompareAndSwapEnabled()).thenReturn(true); + Mockito.when(mockConfigManager.getCurrentBytes(key)).thenReturn(currentBytes); + Mockito.when(mockConfigManager.set( + Mockito.eq(key), + Mockito.any(ConfigSerde.class), + Mockito.eq(currentBytes), + Mockito.eq(val) + )).thenReturn(ConfigManager.SetResult.ok()); + + String etag = ConfigEtag.compute(currentBytes); + ConfigManager.SetResult result = jacksonConfigManager.setIfMatch(key, etag, val, auditInfo); + + Assertions.assertTrue(result.isOk()); + Assertions.assertFalse(result.isPreconditionFailed()); + Mockito.verify(mockConfigManager).set( + Mockito.eq(key), + Mockito.any(ConfigSerde.class), + Mockito.eq(currentBytes), + Mockito.eq(val) + ); + } + + @Test + public void testSetIfMatchPreconditionFailsForMismatchedEtag() + { + String key = "key"; + TestConfig val = new TestConfig("v", "s", 1); + AuditInfo auditInfo = new AuditInfo("a", "i", "c", "ip"); + Mockito.when(mockConfigManager.isCompareAndSwapEnabled()).thenReturn(true); + Mockito.when(mockConfigManager.getCurrentBytes(key)).thenReturn("current".getBytes(java.nio.charset.StandardCharsets.UTF_8)); + + ConfigManager.SetResult result = jacksonConfigManager.setIfMatch( + key, + ConfigEtag.compute("stale".getBytes(java.nio.charset.StandardCharsets.UTF_8)), + val, + auditInfo + ); + + Assertions.assertFalse(result.isOk()); + Assertions.assertTrue(result.isPreconditionFailed()); + Mockito.verify(mockConfigManager, Mockito.never()).set( + Mockito.anyString(), + Mockito.any(ConfigSerde.class), + Mockito.any(byte[].class), + Mockito.any() + ); + } + + @Test + public void testSetIfMatchPreconditionFailsWhenCompareAndSwapDisabled() + { + final String key = "key"; + final TestConfig val = new TestConfig("v", "s", 1); + final AuditInfo auditInfo = new AuditInfo("a", "i", "c", "ip"); + Mockito.when(mockConfigManager.isCompareAndSwapEnabled()).thenReturn(false); + + final ConfigManager.SetResult result = jacksonConfigManager.setIfMatch(key, "\"etag\"", val, auditInfo); + + Assertions.assertFalse(result.isOk()); + Assertions.assertTrue(result.isPreconditionFailed()); + Mockito.verify(mockConfigManager, Mockito.never()).getCurrentBytes(Mockito.anyString()); + Mockito.verify(mockConfigManager, Mockito.never()).set( + Mockito.anyString(), + Mockito.any(ConfigSerde.class), + Mockito.any(byte[].class), + Mockito.any() + ); + Mockito.verify(mockAuditManager, Mockito.never()).doAudit(Mockito.any(AuditEntry.class)); + } + + @Test + public void testSetIfMatchTransformBuildsNewValueFromMatchedBytes() + { + final String key = "key"; + final AuditInfo auditInfo = new AuditInfo("a", "i", "c", "ip"); + final TestConfig current = new TestConfig("v1", "current", 1); + final TestConfig updated = new TestConfig("v1", "updated", 2); + final ConfigSerde serde = jacksonConfigManager.create(TestConfig.class, null); + final byte[] currentBytes = serde.serialize(current); + + Mockito.when(mockConfigManager.isCompareAndSwapEnabled()).thenReturn(true); + Mockito.when(mockConfigManager.getCurrentBytes(key)).thenReturn(currentBytes); + Mockito.when(mockConfigManager.set( + Mockito.eq(key), + Mockito.any(ConfigSerde.class), + Mockito.eq(currentBytes), + Mockito.eq(updated) + )).thenReturn(ConfigManager.SetResult.ok()); + + final ConfigManager.SetResult result = jacksonConfigManager.setIfMatch( + key, + ConfigEtag.compute(currentBytes), + TestConfig.class, + null, + currentValue -> new TestConfig(currentValue.getVersion(), "updated", currentValue.getSettingInt() + 1), + auditInfo + ); + + Assertions.assertTrue(result.isOk()); + Mockito.verify(mockConfigManager).set( + Mockito.eq(key), + Mockito.any(ConfigSerde.class), + Mockito.eq(currentBytes), + Mockito.eq(updated) + ); + } + + @Test + public void testSetIfMatchTransformDoesNotApplyUpdateForMismatchedEtag() + { + final String key = "key"; + final AuditInfo auditInfo = new AuditInfo("a", "i", "c", "ip"); + final TestConfig current = new TestConfig("v1", "current", 1); + final ConfigSerde serde = jacksonConfigManager.create(TestConfig.class, null); + final byte[] currentBytes = serde.serialize(current); + final AtomicBoolean updateCalled = new AtomicBoolean(false); + + Mockito.when(mockConfigManager.isCompareAndSwapEnabled()).thenReturn(true); + Mockito.when(mockConfigManager.getCurrentBytes(key)).thenReturn(currentBytes); + + final ConfigManager.SetResult result = jacksonConfigManager.setIfMatch( + key, + ConfigEtag.compute("stale".getBytes(java.nio.charset.StandardCharsets.UTF_8)), + TestConfig.class, + null, + currentValue -> { + updateCalled.set(true); + return currentValue; + }, + auditInfo + ); + + Assertions.assertFalse(result.isOk()); + Assertions.assertTrue(result.isPreconditionFailed()); + Assertions.assertFalse(updateCalled.get()); + Mockito.verify(mockConfigManager, Mockito.never()).set( + Mockito.anyString(), + Mockito.any(ConfigSerde.class), + Mockito.any(byte[].class), + Mockito.any() + ); + Mockito.verify(mockAuditManager, Mockito.never()).doAudit(Mockito.any(AuditEntry.class)); + } + + @Test + public void testSetIfMatchTransformPreconditionFailsWhenCompareAndSwapDisabled() + { + final String key = "key"; + final AuditInfo auditInfo = new AuditInfo("a", "i", "c", "ip"); + final AtomicBoolean updateCalled = new AtomicBoolean(false); + Mockito.when(mockConfigManager.isCompareAndSwapEnabled()).thenReturn(false); + + final ConfigManager.SetResult result = jacksonConfigManager.setIfMatch( + key, + "\"etag\"", + TestConfig.class, + null, + currentValue -> { + updateCalled.set(true); + return currentValue; + }, + auditInfo + ); + + Assertions.assertFalse(result.isOk()); + Assertions.assertTrue(result.isPreconditionFailed()); + Assertions.assertFalse(updateCalled.get()); + Mockito.verify(mockConfigManager, Mockito.never()).getCurrentBytes(Mockito.anyString()); + Mockito.verify(mockConfigManager, Mockito.never()).set( + Mockito.anyString(), + Mockito.any(ConfigSerde.class), + Mockito.any(byte[].class), + Mockito.any() + ); + Mockito.verify(mockAuditManager, Mockito.never()).doAudit(Mockito.any(AuditEntry.class)); + } + @Test public void testConvertByteToConfigWithNullConfigInByte() { diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorConfigManager.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorConfigManager.java index a047bf99070d..f76f421054d4 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorConfigManager.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorConfigManager.java @@ -26,6 +26,7 @@ import org.apache.druid.audit.AuditEntry; import org.apache.druid.audit.AuditInfo; import org.apache.druid.audit.AuditManager; +import org.apache.druid.common.config.ConfigEtag; import org.apache.druid.common.config.ConfigManager; import org.apache.druid.common.config.Configs; import org.apache.druid.common.config.JacksonConfigManager; @@ -87,15 +88,62 @@ public CoordinatorDynamicConfig getCurrentDynamicConfig() return Preconditions.checkNotNull(dynamicConfig, "Got null config from watcher?!"); } + public CoordinatorDynamicConfig convertBytesToDynamicConfig(@Nullable byte[] bytes) + { + return jacksonConfigManager.convertByteToConfig( + bytes, + CoordinatorDynamicConfig.class, + CoordinatorDynamicConfig.builder().build() + ); + } + public ConfigManager.SetResult setDynamicConfig(CoordinatorDynamicConfig config, AuditInfo auditInfo) { - return jacksonConfigManager.set( + return setDynamicConfig(config, null, auditInfo); + } + + public ConfigManager.SetResult setDynamicConfig( + CoordinatorDynamicConfig config, + @Nullable String ifMatchEtag, + AuditInfo auditInfo + ) + { + return jacksonConfigManager.setIfMatch( CoordinatorDynamicConfig.CONFIG_KEY, + ifMatchEtag, config, auditInfo ); } + public ConfigManager.SetResult updateDynamicConfig( + UnaryOperator operator, + @Nullable String ifMatchEtag, + AuditInfo auditInfo + ) + { + return jacksonConfigManager.setIfMatch( + CoordinatorDynamicConfig.CONFIG_KEY, + ifMatchEtag, + CoordinatorDynamicConfig.class, + CoordinatorDynamicConfig.builder().build(), + operator, + auditInfo + ); + } + + @Nullable + public byte[] getCurrentDynamicConfigBytes() + { + return jacksonConfigManager.getCurrentBytes(CoordinatorDynamicConfig.CONFIG_KEY); + } + + @Nullable + public byte[] getCurrentCompactionConfigBytes() + { + return jacksonConfigManager.getCurrentBytes(DruidCompactionConfig.CONFIG_KEY); + } + public DruidCompactionConfig getCurrentCompactionConfig() { DruidCompactionConfig config = jacksonConfigManager.watch( @@ -120,6 +168,24 @@ public ConfigManager.SetResult getAndUpdateCompactionConfig( AuditInfo auditInfo ) { + return getAndUpdateCompactionConfig(operator, null, auditInfo); + } + + public ConfigManager.SetResult getAndUpdateCompactionConfig( + UnaryOperator operator, + @Nullable String ifMatchEtag, + AuditInfo auditInfo + ) + { + if (ifMatchEtag != null && !jacksonConfigManager.isCompareAndSwapEnabled()) { + return ConfigManager.SetResult.preconditionFailed( + new IllegalStateException( + "If-Match requires druid.manager.config.enableCompareAndSwap to be enabled for key[" + + DruidCompactionConfig.CONFIG_KEY + + "]" + ) + ); + } // Fetch the bytes and use to build the current config and perform compare-and-swap. // This avoids failures in ConfigManager while updating configs previously // persisted by older versions of Druid which didn't have fields such as 'granularitySpec'. @@ -129,6 +195,11 @@ public ConfigManager.SetResult getAndUpdateCompactionConfig( MetadataStorageConnector.CONFIG_TABLE_VALUE_COLUMN, DruidCompactionConfig.CONFIG_KEY ); + if (ifMatchEtag != null && !ConfigEtag.matches(ifMatchEtag, currentBytes)) { + return ConfigManager.SetResult.preconditionFailed( + new IllegalStateException("If-Match precondition failed for compaction config") + ); + } DruidCompactionConfig current = convertBytesToCompactionConfig(currentBytes); DruidCompactionConfig updated = operator.apply(current); @@ -144,7 +215,7 @@ public ConfigManager.SetResult getAndUpdateCompactionConfig( } } - public DruidCompactionConfig convertBytesToCompactionConfig(byte[] bytes) + public DruidCompactionConfig convertBytesToCompactionConfig(@Nullable byte[] bytes) { return jacksonConfigManager.convertByteToConfig( bytes, @@ -158,6 +229,16 @@ public boolean updateCompactionTaskSlots( @Nullable Integer maxCompactionTaskSlots, AuditInfo auditInfo ) + { + return updateCompactionTaskSlots(compactionTaskSlotRatio, maxCompactionTaskSlots, null, auditInfo); + } + + public boolean updateCompactionTaskSlots( + @Nullable Double compactionTaskSlotRatio, + @Nullable Integer maxCompactionTaskSlots, + @Nullable String ifMatchEtag, + AuditInfo auditInfo + ) { UnaryOperator operator = current -> { final ClusterCompactionConfig currentClusterConfig = current.clusterConfig(); @@ -173,16 +254,25 @@ public boolean updateCompactionTaskSlots( return current.withClusterConfig(updatedClusterConfig); }; - return updateConfigHelper(operator, auditInfo); + return updateConfigHelper(operator, ifMatchEtag, auditInfo); + } + + public boolean updateClusterCompactionConfig( + ClusterCompactionConfig config, + AuditInfo auditInfo + ) + { + return updateClusterCompactionConfig(config, null, auditInfo); } public boolean updateClusterCompactionConfig( ClusterCompactionConfig config, + @Nullable String ifMatchEtag, AuditInfo auditInfo ) { UnaryOperator operator = current -> current.withClusterConfig(config); - return updateConfigHelper(operator, auditInfo); + return updateConfigHelper(operator, ifMatchEtag, auditInfo); } public ClusterCompactionConfig getClusterCompactionConfig() @@ -194,9 +284,18 @@ public boolean updateDatasourceCompactionConfig( DataSourceCompactionConfig config, AuditInfo auditInfo ) + { + return updateDatasourceCompactionConfig(config, null, auditInfo); + } + + public boolean updateDatasourceCompactionConfig( + DataSourceCompactionConfig config, + @Nullable String ifMatchEtag, + AuditInfo auditInfo + ) { UnaryOperator callable = current -> current.withDatasourceConfig(config); - return updateConfigHelper(callable, auditInfo); + return updateConfigHelper(callable, ifMatchEtag, auditInfo); } public DataSourceCompactionConfig getDatasourceCompactionConfig(String dataSource) @@ -214,6 +313,15 @@ public boolean deleteDatasourceCompactionConfig( String dataSource, AuditInfo auditInfo ) + { + return deleteDatasourceCompactionConfig(dataSource, null, auditInfo); + } + + public boolean deleteDatasourceCompactionConfig( + String dataSource, + @Nullable String ifMatchEtag, + AuditInfo auditInfo + ) { UnaryOperator callable = current -> { final Map configs = current.dataSourceToCompactionConfigMap(); @@ -224,7 +332,7 @@ public boolean deleteDatasourceCompactionConfig( return current.withDatasourceConfigs(List.copyOf(configs.values())); }; - return updateConfigHelper(callable, auditInfo); + return updateConfigHelper(callable, ifMatchEtag, auditInfo); } public List getCompactionConfigHistory( @@ -268,14 +376,17 @@ public List getCompactionConfigHistory( private boolean updateConfigHelper( UnaryOperator configUpdateOperator, + @Nullable String ifMatchEtag, AuditInfo auditInfo ) { int attemps = 0; ConfigManager.SetResult setResult = null; + // When the caller has supplied an If-Match precondition, do not retry on CAS failure. + final int maxAttempts = ifMatchEtag != null ? 1 : MAX_UPDATE_RETRIES; try { - while (attemps < MAX_UPDATE_RETRIES) { - setResult = getAndUpdateCompactionConfig(configUpdateOperator, auditInfo); + while (attemps < maxAttempts) { + setResult = getAndUpdateCompactionConfig(configUpdateOperator, ifMatchEtag, auditInfo); if (setResult.isOk() || !setResult.isRetryable()) { break; } @@ -296,7 +407,23 @@ private boolean updateConfigHelper( if (setResult.isOk()) { return true; - } else if (setResult.getException() instanceof NoSuchElementException) { + } + + // With If-Match, a retryable CAS failure means a concurrent writer beat us + // between the precondition check and the CAS — surface as 412 too. + final boolean preconditionFailed = setResult.isPreconditionFailed() + || (ifMatchEtag != null && setResult.isRetryable()); + if (preconditionFailed) { + log.info("If-Match precondition failed on compaction config update"); + final String message = setResult.isPreconditionFailed() + ? setResult.getException().getMessage() + : "If-Match precondition failed (concurrent update) on compaction config"; + throw DruidException.forPersona(DruidException.Persona.USER) + .ofCategory(DruidException.Category.PRECONDITION_FAILED) + .build(message); + } + + if (setResult.getException() instanceof NoSuchElementException) { log.warn(setResult.getException(), "Update compaction config failed"); throw NotFound.exception( Throwables.getRootCause(setResult.getException()), diff --git a/server/src/main/java/org/apache/druid/server/http/CoordinatorBrokerConfigsResource.java b/server/src/main/java/org/apache/druid/server/http/CoordinatorBrokerConfigsResource.java index f023b26e31c4..0b49cc3a4f88 100644 --- a/server/src/main/java/org/apache/druid/server/http/CoordinatorBrokerConfigsResource.java +++ b/server/src/main/java/org/apache/druid/server/http/CoordinatorBrokerConfigsResource.java @@ -24,8 +24,8 @@ import org.apache.druid.audit.AuditManager; import org.apache.druid.common.config.ConfigManager.SetResult; import org.apache.druid.common.config.JacksonConfigManager; +import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.server.broker.BrokerDynamicConfig; import org.apache.druid.server.http.security.ConfigResourceFilter; import org.apache.druid.server.security.AuthorizationUtils; @@ -41,17 +41,13 @@ import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; -import java.util.concurrent.atomic.AtomicReference; @Path("/druid/coordinator/v1/broker/config") @ResourceFilters(ConfigResourceFilter.class) public class CoordinatorBrokerConfigsResource { - private static final Logger log = new Logger(CoordinatorBrokerConfigsResource.class); - private final JacksonConfigManager configManager; private final AuditManager auditManager; - private final AtomicReference currentConfig; private final BrokerDynamicConfigSyncer brokerDynamicConfigSyncer; @Inject @@ -63,11 +59,6 @@ public CoordinatorBrokerConfigsResource( { this.configManager = configManager; this.auditManager = auditManager; - this.currentConfig = configManager.watch( - BrokerDynamicConfig.CONFIG_KEY, - BrokerDynamicConfig.class, - BrokerDynamicConfig.builder().build() - ); this.brokerDynamicConfigSyncer = brokerDynamicConfigSyncer; } @@ -75,7 +66,14 @@ public CoordinatorBrokerConfigsResource( @Produces(MediaType.APPLICATION_JSON) public Response getBrokerDynamicConfig() { - return Response.ok(currentConfig.get()).build(); + return DynamicConfigEtagHelper.buildReadResponseWithEtag( + () -> configManager.getCurrentBytes(BrokerDynamicConfig.CONFIG_KEY), + currentBytes -> configManager.convertByteToConfig( + currentBytes, + BrokerDynamicConfig.class, + BrokerDynamicConfig.builder().build() + ) + ); } @POST @@ -86,12 +84,12 @@ public Response setBrokerDynamicConfig( ) { try { - BrokerDynamicConfig current = currentConfig.get(); - BrokerDynamicConfig newConfig = configBuilder.build(current); - - final SetResult setResult = configManager.set( + final SetResult setResult = configManager.setIfMatch( BrokerDynamicConfig.CONFIG_KEY, - newConfig, + DynamicConfigEtagHelper.getIfMatch(req), + BrokerDynamicConfig.class, + BrokerDynamicConfig.builder().build(), + configBuilder::build, AuthorizationUtils.buildAuditInfo(req) ); @@ -99,11 +97,12 @@ public Response setBrokerDynamicConfig( brokerDynamicConfigSyncer.queueBroadcastConfigToBrokers(); return Response.ok().build(); } else { - return Response.status(Response.Status.BAD_REQUEST) - .entity(ServletResourceUtils.sanitizeException(setResult.getException())) - .build(); + return DynamicConfigEtagHelper.toErrorResponse(setResult); } } + catch (DruidException e) { + return ServletResourceUtils.buildErrorResponseFrom(e); + } catch (IllegalArgumentException e) { return Response.status(Response.Status.BAD_REQUEST) .entity(ServletResourceUtils.sanitizeException(e)) diff --git a/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java b/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java index 561670ae628b..3aa830db6ca9 100644 --- a/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java +++ b/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java @@ -60,8 +60,9 @@ public CoordinatorCompactionConfigsResource( @Produces(MediaType.APPLICATION_JSON) public Response getCompactionConfig() { - return ServletResourceUtils.buildReadResponse( - configManager::getCurrentCompactionConfig + return DynamicConfigEtagHelper.buildReadResponseWithEtag( + configManager::getCurrentCompactionConfigBytes, + configManager::convertBytesToCompactionConfig ); } @@ -85,7 +86,12 @@ public Response setCompactionTaskLimit( final AuditInfo auditInfo = AuthorizationUtils.buildAuditInfo(req); return ServletResourceUtils.buildUpdateResponse( - () -> configManager.updateCompactionTaskSlots(compactionTaskSlotRatio, maxCompactionTaskSlots, auditInfo) + () -> configManager.updateCompactionTaskSlots( + compactionTaskSlotRatio, + maxCompactionTaskSlots, + DynamicConfigEtagHelper.getIfMatch(req), + auditInfo + ) ); } @@ -99,12 +105,13 @@ public Response addOrUpdateDatasourceCompactionConfig( { final AuditInfo auditInfo = AuthorizationUtils.buildAuditInfo(req); return ServletResourceUtils.buildUpdateResponse(() -> { + final String ifMatch = DynamicConfigEtagHelper.getIfMatch(req); if (newConfig.getEngine() == CompactionEngine.MSQ) { throw InvalidInput.exception( "MSQ engine is supported only with supervisor-based compaction on the Overlord." ); } - return configManager.updateDatasourceCompactionConfig(newConfig, auditInfo); + return configManager.updateDatasourceCompactionConfig(newConfig, ifMatch, auditInfo); }); } @@ -142,7 +149,11 @@ public Response deleteCompactionConfig( { final AuditInfo auditInfo = AuthorizationUtils.buildAuditInfo(req); return ServletResourceUtils.buildUpdateResponse( - () -> configManager.deleteDatasourceCompactionConfig(dataSource, auditInfo) + () -> configManager.deleteDatasourceCompactionConfig( + dataSource, + DynamicConfigEtagHelper.getIfMatch(req), + auditInfo + ) ); } } diff --git a/server/src/main/java/org/apache/druid/server/http/CoordinatorDynamicConfigsResource.java b/server/src/main/java/org/apache/druid/server/http/CoordinatorDynamicConfigsResource.java index 93feb328a8c2..da6d8adeb7be 100644 --- a/server/src/main/java/org/apache/druid/server/http/CoordinatorDynamicConfigsResource.java +++ b/server/src/main/java/org/apache/druid/server/http/CoordinatorDynamicConfigsResource.java @@ -22,6 +22,7 @@ import com.sun.jersey.spi.container.ResourceFilters; import org.apache.druid.audit.AuditManager; import org.apache.druid.common.config.ConfigManager.SetResult; +import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.server.coordinator.CloneStatusManager; import org.apache.druid.server.coordinator.CoordinatorConfigManager; @@ -74,7 +75,10 @@ public CoordinatorDynamicConfigsResource( @Produces(MediaType.APPLICATION_JSON) public Response getDynamicConfigs() { - return Response.ok(manager.getCurrentDynamicConfig()).build(); + return DynamicConfigEtagHelper.buildReadResponseWithEtag( + manager::getCurrentDynamicConfigBytes, + manager::convertBytesToDynamicConfig + ); } // default value is used for backwards compatibility @@ -86,10 +90,9 @@ public Response setDynamicConfigs( ) { try { - CoordinatorDynamicConfig current = manager.getCurrentDynamicConfig(); - - final SetResult setResult = manager.setDynamicConfig( - dynamicConfigBuilder.build(current), + final SetResult setResult = manager.updateDynamicConfig( + dynamicConfigBuilder::build, + DynamicConfigEtagHelper.getIfMatch(req), AuthorizationUtils.buildAuditInfo(req) ); @@ -97,11 +100,12 @@ public Response setDynamicConfigs( coordinatorDynamicConfigSyncer.queueBroadcastConfigToBrokers(); return Response.ok().build(); } else { - return Response.status(Response.Status.BAD_REQUEST) - .entity(ServletResourceUtils.sanitizeException(setResult.getException())) - .build(); + return DynamicConfigEtagHelper.toErrorResponse(setResult); } } + catch (DruidException e) { + return ServletResourceUtils.buildErrorResponseFrom(e); + } catch (IllegalArgumentException e) { return Response.status(Response.Status.BAD_REQUEST) .entity(ServletResourceUtils.sanitizeException(e)) diff --git a/server/src/main/java/org/apache/druid/server/http/DynamicConfigEtagHelper.java b/server/src/main/java/org/apache/druid/server/http/DynamicConfigEtagHelper.java new file mode 100644 index 000000000000..a2298349a62d --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/http/DynamicConfigEtagHelper.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.http; + +import com.google.common.base.Throwables; +import org.apache.druid.common.config.ConfigEtag; +import org.apache.druid.common.config.ConfigManager.SetResult; +import org.apache.druid.error.DruidException; +import org.apache.druid.error.InternalServerError; +import org.apache.druid.error.InvalidInput; + +import javax.annotation.Nullable; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.Response; +import java.util.function.Function; +import java.util.function.Supplier; + +/** + * HTTP-layer helpers for {@code ETag} / {@code If-Match} on dynamic-config endpoints. + */ +public final class DynamicConfigEtagHelper +{ + private DynamicConfigEtagHelper() + { + } + + /** + * Read the {@code If-Match} header. Returns {@code null} when absent. Throws + * 400 if present but blank (RFC 7232 §3.1 requires a non-empty value). + */ + @Nullable + public static String getIfMatch(HttpServletRequest req) + { + if (req == null) { + return null; + } + final String header = req.getHeader(HttpHeaders.IF_MATCH); + if (header == null) { + return null; + } + if (header.trim().isEmpty()) { + throw InvalidInput.exception("If-Match header must not be blank"); + } + return header; + } + + /** Attach an {@code ETag} header derived from {@code bytes}; no-op if {@code bytes} is null. */ + public static Response.ResponseBuilder withEtag(Response.ResponseBuilder builder, @Nullable byte[] bytes) + { + final String etag = ConfigEtag.compute(bytes); + if (etag != null) { + builder.header(HttpHeaders.ETAG, etag); + } + return builder; + } + + /** + * Build a read response whose entity and ETag are both derived from one read + * of the stored config bytes. + */ + public static Response buildReadResponseWithEtag( + Supplier bytesSupplier, + Function entitySupplier + ) + { + try { + final byte[] currentBytes = bytesSupplier.get(); + return withEtag(Response.ok(entitySupplier.apply(currentBytes)), currentBytes).build(); + } + catch (DruidException e) { + return ServletResourceUtils.buildErrorResponseFrom(e); + } + catch (Exception e) { + return ServletResourceUtils.buildErrorResponseFrom( + InternalServerError.exception(Throwables.getRootCause(e), "Unknown error occurred") + ); + } + } + + /** + * Map a failed {@link SetResult} to 412 (precondition failed) or 400. + * Callers handle the success case themselves. + */ + public static Response toErrorResponse(SetResult result) + { + final Response.Status status = result.isPreconditionFailed() + ? Response.Status.PRECONDITION_FAILED + : Response.Status.BAD_REQUEST; + return Response.status(status) + .entity(ServletResourceUtils.sanitizeException(result.getException())) + .build(); + } +} diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorBrokerConfigsResourceTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorBrokerConfigsResourceTest.java index 543f2b7bb9bb..dd2c61c32f67 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorBrokerConfigsResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorBrokerConfigsResourceTest.java @@ -22,17 +22,24 @@ import com.google.common.collect.ImmutableList; import org.apache.druid.audit.AuditInfo; import org.apache.druid.audit.AuditManager; +import org.apache.druid.common.config.ConfigEtag; import org.apache.druid.common.config.ConfigManager.SetResult; import org.apache.druid.common.config.JacksonConfigManager; +import org.apache.druid.error.ErrorResponse; +import org.apache.druid.query.QueryContext; import org.apache.druid.server.broker.BrokerDynamicConfig; +import org.easymock.Capture; import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.Response; -import java.util.concurrent.atomic.AtomicReference; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.function.UnaryOperator; public class CoordinatorBrokerConfigsResourceTest { @@ -51,16 +58,21 @@ public void setUp() throws Exception @Test public void testGetBrokerDynamicConfig() { - BrokerDynamicConfig config = BrokerDynamicConfig.builder().build(); - AtomicReference currentConfig = new AtomicReference<>(config); - + final BrokerDynamicConfig config = BrokerDynamicConfig.builder() + .withQueryContext(QueryContext.of(Map.of("priority", 5))) + .build(); + final byte[] currentBytes = "current-broker-config".getBytes(StandardCharsets.UTF_8); + + EasyMock.expect(configManager.getCurrentBytes(BrokerDynamicConfig.CONFIG_KEY)) + .andReturn(currentBytes) + .once(); EasyMock.expect( - configManager.watch( - EasyMock.anyObject(String.class), - EasyMock.anyObject(Class.class), + configManager.convertByteToConfig( + EasyMock.aryEq(currentBytes), + EasyMock.eq(BrokerDynamicConfig.class), EasyMock.anyObject(BrokerDynamicConfig.class) ) - ).andReturn(currentConfig).once(); + ).andReturn(config).once(); EasyMock.replay(configManager, auditManager, brokerDynamicConfigSyncer); @@ -72,6 +84,7 @@ public void testGetBrokerDynamicConfig() Assert.assertEquals(200, response.getStatus()); Assert.assertEquals(config, response.getEntity()); + Assert.assertEquals(ConfigEtag.compute(currentBytes), response.getMetadata().getFirst(HttpHeaders.ETAG)); EasyMock.verify(configManager, auditManager, brokerDynamicConfigSyncer); } @@ -79,17 +92,6 @@ public void testGetBrokerDynamicConfig() @Test public void testGetBrokerDynamicConfigHistory() { - BrokerDynamicConfig config = BrokerDynamicConfig.builder().build(); - AtomicReference currentConfig = new AtomicReference<>(config); - - EasyMock.expect( - configManager.watch( - EasyMock.anyObject(String.class), - EasyMock.anyObject(Class.class), - EasyMock.anyObject(BrokerDynamicConfig.class) - ) - ).andReturn(currentConfig).once(); - EasyMock.expect( auditManager.fetchAuditHistory( EasyMock.anyObject(String.class), @@ -106,7 +108,7 @@ public void testGetBrokerDynamicConfigHistory() brokerDynamicConfigSyncer ); - Response response = resource.getBrokerDynamicConfigHistory(null, 10); + final Response response = resource.getBrokerDynamicConfigHistory(null, 10); Assert.assertEquals(200, response.getStatus()); EasyMock.verify(configManager, auditManager, brokerDynamicConfigSyncer); @@ -115,17 +117,6 @@ public void testGetBrokerDynamicConfigHistory() @Test public void testGetBrokerDynamicConfigHistoryWithNullIntervalAndCount() { - BrokerDynamicConfig config = BrokerDynamicConfig.builder().build(); - AtomicReference currentConfig = new AtomicReference<>(config); - - EasyMock.expect( - configManager.watch( - EasyMock.anyObject(String.class), - EasyMock.anyObject(Class.class), - EasyMock.anyObject(BrokerDynamicConfig.class) - ) - ).andReturn(currentConfig).once(); - EasyMock.expect( auditManager.fetchAuditHistory( EasyMock.anyObject(String.class), @@ -142,7 +133,7 @@ public void testGetBrokerDynamicConfigHistoryWithNullIntervalAndCount() brokerDynamicConfigSyncer ); - Response response = resource.getBrokerDynamicConfigHistory(null, null); + final Response response = resource.getBrokerDynamicConfigHistory(null, null); Assert.assertEquals(200, response.getStatus()); EasyMock.verify(configManager, auditManager, brokerDynamicConfigSyncer); @@ -151,22 +142,20 @@ public void testGetBrokerDynamicConfigHistoryWithNullIntervalAndCount() @Test public void testSetBrokerDynamicConfig() { - BrokerDynamicConfig config = BrokerDynamicConfig.builder().build(); - AtomicReference currentConfig = new AtomicReference<>(config); - HttpServletRequest request = EasyMock.createNiceMock(HttpServletRequest.class); + final BrokerDynamicConfig currentConfig = BrokerDynamicConfig.builder() + .withQueryContext(QueryContext.of(Map.of("priority", 5))) + .build(); + final BrokerDynamicConfig.Builder updateBuilder = BrokerDynamicConfig.builder(); + final Capture> updateCapture = EasyMock.newCapture(); + final HttpServletRequest request = EasyMock.createNiceMock(HttpServletRequest.class); EasyMock.expect( - configManager.watch( - EasyMock.anyObject(String.class), - EasyMock.anyObject(Class.class), - EasyMock.anyObject(BrokerDynamicConfig.class) - ) - ).andReturn(currentConfig).once(); - - EasyMock.expect( - configManager.set( - EasyMock.anyObject(String.class), + configManager.setIfMatch( + EasyMock.eq(BrokerDynamicConfig.CONFIG_KEY), + (String) EasyMock.isNull(), + EasyMock.eq(BrokerDynamicConfig.class), EasyMock.anyObject(BrokerDynamicConfig.class), + EasyMock.capture(updateCapture), EasyMock.anyObject(AuditInfo.class) ) ).andReturn(SetResult.ok()).once(); @@ -182,9 +171,36 @@ public void testSetBrokerDynamicConfig() brokerDynamicConfigSyncer ); - Response response = resource.setBrokerDynamicConfig(BrokerDynamicConfig.builder(), request); + final Response response = resource.setBrokerDynamicConfig(updateBuilder, request); Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals(updateBuilder.build(currentConfig), updateCapture.getValue().apply(currentConfig)); EasyMock.verify(configManager, auditManager, brokerDynamicConfigSyncer); } + + @Test + public void testSetBrokerDynamicConfigWithBlankIfMatchReturnsBadRequest() + { + final HttpServletRequest request = EasyMock.createStrictMock(HttpServletRequest.class); + + EasyMock.expect(request.getHeader(HttpHeaders.IF_MATCH)).andReturn(" ").once(); + + EasyMock.replay(configManager, auditManager, brokerDynamicConfigSyncer, request); + + final CoordinatorBrokerConfigsResource resource = new CoordinatorBrokerConfigsResource( + configManager, + auditManager, + brokerDynamicConfigSyncer + ); + + final Response response = resource.setBrokerDynamicConfig(BrokerDynamicConfig.builder(), request); + Assert.assertEquals(400, response.getStatus()); + Assert.assertTrue(response.getEntity() instanceof ErrorResponse); + Assert.assertEquals( + "If-Match header must not be blank", + ((ErrorResponse) response.getEntity()).getUnderlyingException().getMessage() + ); + + EasyMock.verify(configManager, auditManager, brokerDynamicConfigSyncer, request); + } } diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java index e96d800bec87..c6a0db816957 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java @@ -26,6 +26,7 @@ import org.apache.druid.audit.AuditInfo; import org.apache.druid.audit.AuditManager; import org.apache.druid.client.indexing.ClientMSQContext; +import org.apache.druid.common.config.ConfigEtag; import org.apache.druid.common.config.ConfigManager; import org.apache.druid.common.config.JacksonConfigManager; import org.apache.druid.common.config.TestConfigManagerConfig; @@ -58,7 +59,9 @@ import javax.annotation.Nullable; import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.Response; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -109,6 +112,26 @@ public void testGetDefaultClusterConfig() Assert.assertEquals(CompactionEngine.NATIVE, defaultConfig.getEngine()); } + @Test + public void testGetCompactionConfigDerivesBodyAndEtagFromSameBytes() + { + final CoordinatorConfigManager mockConfigManager = Mockito.mock(CoordinatorConfigManager.class); + final CoordinatorCompactionConfigsResource mockResource = + new CoordinatorCompactionConfigsResource(mockConfigManager); + final byte[] currentBytes = "current-compaction-config".getBytes(StandardCharsets.UTF_8); + final DruidCompactionConfig expectedConfig = DruidCompactionConfig.empty(); + + Mockito.when(mockConfigManager.getCurrentCompactionConfigBytes()).thenReturn(currentBytes); + Mockito.when(mockConfigManager.convertBytesToCompactionConfig(currentBytes)).thenReturn(expectedConfig); + + final Response response = mockResource.getCompactionConfig(); + + verifyStatus(Response.Status.OK, response); + Assert.assertEquals(expectedConfig, response.getEntity()); + Assert.assertEquals(ConfigEtag.compute(currentBytes), response.getMetadata().getFirst(HttpHeaders.ETAG)); + Mockito.verify(mockConfigManager, Mockito.never()).getCurrentCompactionConfig(); + } + @Test public void testSetCompactionTaskLimit() { @@ -180,6 +203,47 @@ public void testAddDatasourceConfigWithMSQEngineIsInvalid() ); } + @Test + public void testAddDatasourceConfigWithBlankIfMatchReturnsBadRequest() + { + Mockito.when(mockHttpServletRequest.getHeader(HttpHeaders.IF_MATCH)).thenReturn(" "); + + final DataSourceCompactionConfig newDatasourceConfig + = InlineSchemaDataSourceCompactionConfig.builder().forDataSource(TestDataSource.WIKI).build(); + final Response response = resource.addOrUpdateDatasourceCompactionConfig(newDatasourceConfig, mockHttpServletRequest); + + verifyStatus(Response.Status.BAD_REQUEST, response); + Assert.assertTrue(response.getEntity() instanceof ErrorResponse); + Assert.assertEquals( + "If-Match header must not be blank", + ((ErrorResponse) response.getEntity()).getUnderlyingException().getMessage() + ); + } + + @Test + public void testAddDatasourceConfigWithIfMatchReturnsPreconditionFailedWhenCompareAndSwapDisabled() + { + configManager.setCompareAndSwapEnabled(false); + Mockito.when(mockHttpServletRequest.getHeader(HttpHeaders.IF_MATCH)).thenReturn("\"etag\""); + + final DataSourceCompactionConfig newDatasourceConfig + = InlineSchemaDataSourceCompactionConfig.builder().forDataSource(TestDataSource.WIKI).build(); + final Response response = resource.addOrUpdateDatasourceCompactionConfig( + newDatasourceConfig, + mockHttpServletRequest + ); + + verifyStatus(Response.Status.PRECONDITION_FAILED, response); + Assert.assertTrue(response.getEntity() instanceof ErrorResponse); + Assert.assertEquals( + "If-Match requires druid.manager.config.enableCompareAndSwap to be enabled for key[" + + DruidCompactionConfig.CONFIG_KEY + + "]", + ((ErrorResponse) response.getEntity()).getUnderlyingException().getMessage() + ); + Assert.assertTrue(configManager.getCurrentCompactionConfig().getCompactionConfigs().isEmpty()); + } + @Test public void testUpdateDatasourceConfig() { @@ -417,6 +481,7 @@ public int removeAuditLogsOlderThan(long timestamp) private static class TestCoordinatorConfigManager extends CoordinatorConfigManager { private final ConfigManager delegate; + private final TestConfigManagerConfig configManagerConfig; private int numUpdateAttempts; private ConfigManager.SetResult configUpdateResult; @@ -432,15 +497,17 @@ public String getConfigTable() }; final TestDBConnector dbConnector = new TestDBConnector(); + final TestConfigManagerConfig configManagerConfig = new TestConfigManagerConfig(); final ConfigManager configManager = new ConfigManager( dbConnector, Suppliers.ofInstance(tablesConfig), - Suppliers.ofInstance(new TestConfigManagerConfig()) + Suppliers.ofInstance(configManagerConfig) ); return new TestCoordinatorConfigManager( new JacksonConfigManager(configManager, OBJECT_MAPPER, auditManager), configManager, + configManagerConfig, auditManager, dbConnector, tablesConfig @@ -450,6 +517,7 @@ public String getConfigTable() TestCoordinatorConfigManager( JacksonConfigManager jackson, ConfigManager configManager, + TestConfigManagerConfig configManagerConfig, AuditManager auditManager, TestDBConnector dbConnector, MetadataStorageTablesConfig tablesConfig @@ -457,17 +525,24 @@ public String getConfigTable() { super(jackson, dbConnector, tablesConfig, auditManager); this.delegate = configManager; + this.configManagerConfig = configManagerConfig; + } + + void setCompareAndSwapEnabled(boolean enabled) + { + configManagerConfig.enableCompareAndSwap = enabled; } @Override public ConfigManager.SetResult getAndUpdateCompactionConfig( UnaryOperator operator, + String ifMatchEtag, AuditInfo auditInfo ) { ++numUpdateAttempts; if (configUpdateResult == null) { - return super.getAndUpdateCompactionConfig(operator, auditInfo); + return super.getAndUpdateCompactionConfig(operator, ifMatchEtag, auditInfo); } else { return configUpdateResult; } diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigsResourceTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigsResourceTest.java index 87d99b0759bd..4e7c6a17e2eb 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigsResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigsResourceTest.java @@ -21,17 +21,27 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import org.apache.druid.audit.AuditInfo; import org.apache.druid.audit.AuditManager; +import org.apache.druid.common.config.ConfigEtag; +import org.apache.druid.common.config.ConfigManager.SetResult; +import org.apache.druid.error.ErrorResponse; import org.apache.druid.server.coordinator.CloneStatusManager; import org.apache.druid.server.coordinator.CoordinatorConfigManager; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; import org.apache.druid.server.coordinator.ServerCloneStatus; +import org.easymock.Capture; import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.Response; +import java.nio.charset.StandardCharsets; import java.util.List; +import java.util.function.UnaryOperator; public class CoordinatorDynamicConfigsResourceTest { @@ -49,6 +59,34 @@ public void setUp() throws Exception cloneStatusManager = EasyMock.createStrictMock(CloneStatusManager.class); } + @Test + public void testGetDynamicConfigsDerivesBodyAndEtagFromSameBytes() + { + final byte[] currentBytes = "current-dynamic-config".getBytes(StandardCharsets.UTF_8); + final CoordinatorDynamicConfig expectedConfig = CoordinatorDynamicConfig.builder() + .withPauseCoordination(true) + .build(); + + EasyMock.expect(manager.getCurrentDynamicConfigBytes()).andReturn(currentBytes).once(); + EasyMock.expect(manager.convertBytesToDynamicConfig(EasyMock.aryEq(currentBytes))) + .andReturn(expectedConfig) + .once(); + EasyMock.replay(manager, auditManager, coordinatorDynamicConfigSyncer, cloneStatusManager); + + final Response response = new CoordinatorDynamicConfigsResource( + manager, + auditManager, + coordinatorDynamicConfigSyncer, + cloneStatusManager + ).getDynamicConfigs(); + + Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals(expectedConfig, response.getEntity()); + Assert.assertEquals(ConfigEtag.compute(currentBytes), response.getMetadata().getFirst(HttpHeaders.ETAG)); + + EasyMock.verify(manager, auditManager, coordinatorDynamicConfigSyncer, cloneStatusManager); + } + @Test public void testGetBrokerStatus() { @@ -107,4 +145,67 @@ public void testGetCloneStatus() Assert.assertEquals(200, response.getStatus()); Assert.assertEquals(ServerCloneStatus.unknown("hist4", "hist3"), response.getEntity()); } + + @Test + public void testSetDynamicConfigsUsesTransformUpdate() + { + final String etag = "\"etag\""; + final HttpServletRequest request = EasyMock.createNiceMock(HttpServletRequest.class); + final CoordinatorDynamicConfig.Builder updateBuilder = CoordinatorDynamicConfig.builder() + .withPauseCoordination(true); + final CoordinatorDynamicConfig currentConfig = CoordinatorDynamicConfig.builder() + .withMaxSegmentsToMove(5) + .build(); + final Capture> updateCapture = EasyMock.newCapture(); + + EasyMock.expect(request.getHeader(HttpHeaders.IF_MATCH)).andReturn(etag).once(); + EasyMock.expect( + manager.updateDynamicConfig( + EasyMock.capture(updateCapture), + EasyMock.eq(etag), + EasyMock.anyObject(AuditInfo.class) + ) + ).andReturn(SetResult.ok()).once(); + coordinatorDynamicConfigSyncer.queueBroadcastConfigToBrokers(); + EasyMock.expectLastCall().once(); + EasyMock.replay(manager, auditManager, coordinatorDynamicConfigSyncer, cloneStatusManager, request); + + final Response response = new CoordinatorDynamicConfigsResource( + manager, + auditManager, + coordinatorDynamicConfigSyncer, + cloneStatusManager + ).setDynamicConfigs(updateBuilder, request); + + Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals(updateBuilder.build(currentConfig), updateCapture.getValue().apply(currentConfig)); + + EasyMock.verify(manager, auditManager, coordinatorDynamicConfigSyncer, cloneStatusManager, request); + } + + @Test + public void testSetDynamicConfigsWithBlankIfMatchReturnsBadRequest() + { + final HttpServletRequest request = EasyMock.createStrictMock(HttpServletRequest.class); + + EasyMock.expect(request.getHeader(HttpHeaders.IF_MATCH)).andReturn(" ").once(); + EasyMock.replay(manager, auditManager, coordinatorDynamicConfigSyncer, cloneStatusManager, request); + + final CoordinatorDynamicConfigsResource resource = new CoordinatorDynamicConfigsResource( + manager, + auditManager, + coordinatorDynamicConfigSyncer, + cloneStatusManager + ); + + final Response response = resource.setDynamicConfigs(CoordinatorDynamicConfig.builder(), request); + Assert.assertEquals(400, response.getStatus()); + Assert.assertTrue(response.getEntity() instanceof ErrorResponse); + Assert.assertEquals( + "If-Match header must not be blank", + ((ErrorResponse) response.getEntity()).getUnderlyingException().getMessage() + ); + + EasyMock.verify(manager, auditManager, coordinatorDynamicConfigSyncer, cloneStatusManager, request); + } } diff --git a/website/.spelling b/website/.spelling index c85463563e13..3e4fda27b7ec 100644 --- a/website/.spelling +++ b/website/.spelling @@ -105,6 +105,7 @@ ECS EMR EMRFS ETL +ETag Elasticsearch Enums FIRST_VALUE From 133da6d55c38e4088e2d5e2461ae60f06c37336d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jesse=20Tu=C4=9Flu?= Date: Tue, 2 Jun 2026 19:52:19 -0700 Subject: [PATCH 2/2] more stuff --- .../http/OverlordCompactionResource.java | 21 ++-- .../http/OverlordCompactionResourceTest.java | 3 +- .../druid/common/config/ConfigEtag.java | 25 ++-- .../druid/common/config/ConfigManager.java | 42 ++++--- .../common/config/JacksonConfigManager.java | 24 ++-- .../config/JacksonConfigManagerTest.java | 29 +++++ .../coordinator/CoordinatorConfigManager.java | 107 ++++++++++-------- .../CoordinatorCompactionConfigsResource.java | 63 +++++++---- .../server/http/DynamicConfigEtagHelper.java | 14 +++ ...rdinatorCompactionConfigsResourceTest.java | 5 +- 10 files changed, 215 insertions(+), 118 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResource.java index 218968136021..e7c948c259bb 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResource.java @@ -122,14 +122,19 @@ public Response updateClusterCompactionConfig( @Context HttpServletRequest req ) { - final AuditInfo auditInfo = AuthorizationUtils.buildAuditInfo(req); - return ServletResourceUtils.buildUpdateResponse( - () -> configManager.updateClusterCompactionConfig( - updatePayload, - DynamicConfigEtagHelper.getIfMatch(req), - auditInfo - ) - ); + try { + final AuditInfo auditInfo = AuthorizationUtils.buildAuditInfo(req); + return DynamicConfigEtagHelper.buildSetResultUpdateResponse( + configManager.updateClusterCompactionConfig( + updatePayload, + DynamicConfigEtagHelper.getIfMatch(req), + auditInfo + ) + ); + } + catch (DruidException e) { + return ServletResourceUtils.buildErrorResponseFrom(e); + } } @GET diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResourceTest.java index a74e811697c8..216e084019f9 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResourceTest.java @@ -24,6 +24,7 @@ import org.apache.druid.audit.AuditManager; import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.common.config.ConfigEtag; +import org.apache.druid.common.config.ConfigManager; import org.apache.druid.error.DruidExceptionMatcher; import org.apache.druid.error.ErrorResponse; import org.apache.druid.indexer.CompactionEngine; @@ -160,7 +161,7 @@ public void test_updateClusterConfig() EasyMock.anyObject(), EasyMock.anyObject() )) - .andReturn(true) + .andReturn(ConfigManager.SetResult.ok()) .once(); setupMockRequestForAudit(); diff --git a/processing/src/main/java/org/apache/druid/common/config/ConfigEtag.java b/processing/src/main/java/org/apache/druid/common/config/ConfigEtag.java index 310c2ed65a70..fc6fd15205ea 100644 --- a/processing/src/main/java/org/apache/druid/common/config/ConfigEtag.java +++ b/processing/src/main/java/org/apache/druid/common/config/ConfigEtag.java @@ -32,6 +32,14 @@ public final class ConfigEtag { private static final int ETAG_HASH_BYTES = 16; + private static final ThreadLocal SHA_256 = ThreadLocal.withInitial(() -> { + try { + return MessageDigest.getInstance("SHA-256"); + } + catch (NoSuchAlgorithmException e) { + throw new IllegalStateException("SHA-256 not available", e); + } + }); private ConfigEtag() { @@ -48,16 +56,11 @@ public static String compute(@Nullable byte[] bytes) if (bytes == null) { return null; } - try { - MessageDigest md = MessageDigest.getInstance("SHA-256"); - byte[] full = md.digest(bytes); - byte[] truncated = Arrays.copyOf(full, ETAG_HASH_BYTES); - return "\"" + Base64.getUrlEncoder().withoutPadding().encodeToString(truncated) + "\""; - } - catch (NoSuchAlgorithmException e) { - // SHA-256 is required by every JRE. - throw new IllegalStateException("SHA-256 not available", e); - } + final MessageDigest md = SHA_256.get(); + md.reset(); + final byte[] full = md.digest(bytes); + final byte[] truncated = Arrays.copyOf(full, ETAG_HASH_BYTES); + return "\"" + Base64.getUrlEncoder().withoutPadding().encodeToString(truncated) + "\""; } /** @@ -65,7 +68,7 @@ public static String compute(@Nullable byte[] bytes) * Wildcard {@code *} matches any existing value. A comma-separated list is * satisfied if any element matches. */ - public static boolean matches(String ifMatchHeader, @Nullable byte[] currentBytes) + public static boolean matches(@Nullable String ifMatchHeader, @Nullable byte[] currentBytes) { if (ifMatchHeader == null) { return true; diff --git a/processing/src/main/java/org/apache/druid/common/config/ConfigManager.java b/processing/src/main/java/org/apache/druid/common/config/ConfigManager.java index 756405717c64..5e36947730cd 100644 --- a/processing/src/main/java/org/apache/druid/common/config/ConfigManager.java +++ b/processing/src/main/java/org/apache/druid/common/config/ConfigManager.java @@ -283,11 +283,27 @@ private MetadataCASUpdate createMetadataCASUpdate( public static class SetResult { - private static final SetResult SUCCESS = new SetResult(null, false, false); - private final Exception exception; + /** + * Outcome of a {@link #set} attempt. Mutually exclusive, so callers never + * have to reconcile overlapping flags. + */ + private enum Status + { + /** The write committed. */ + OK, + /** A concurrent writer won the CAS; the caller may re-read and retry. */ + RETRYABLE, + /** A client-supplied precondition (e.g. {@code If-Match}) failed; maps to HTTP 412. */ + PRECONDITION_FAILED, + /** A non-retryable failure (bad input, manager not started, etc.). */ + FAILURE + } - private final boolean retryableException; - private final boolean preconditionFailed; + private static final SetResult SUCCESS = new SetResult(Status.OK, null); + + private final Status status; + @Nullable + private final Exception exception; public static SetResult ok() { @@ -296,42 +312,42 @@ public static SetResult ok() public static SetResult failure(Exception e) { - return new SetResult(e, false, false); + return new SetResult(Status.FAILURE, e); } public static SetResult retryableFailure(Exception e) { - return new SetResult(e, true, false); + return new SetResult(Status.RETRYABLE, e); } /** Client-supplied precondition (e.g. {@code If-Match}) failed; maps to HTTP 412. */ public static SetResult preconditionFailed(Exception e) { - return new SetResult(e, false, true); + return new SetResult(Status.PRECONDITION_FAILED, e); } - private SetResult(@Nullable Exception exception, boolean retryableException, boolean preconditionFailed) + private SetResult(Status status, @Nullable Exception exception) { + this.status = status; this.exception = exception; - this.retryableException = retryableException; - this.preconditionFailed = preconditionFailed; } public boolean isOk() { - return exception == null; + return status == Status.OK; } public boolean isRetryable() { - return retryableException; + return status == Status.RETRYABLE; } public boolean isPreconditionFailed() { - return preconditionFailed; + return status == Status.PRECONDITION_FAILED; } + @Nullable public Exception getException() { return exception; diff --git a/processing/src/main/java/org/apache/druid/common/config/JacksonConfigManager.java b/processing/src/main/java/org/apache/druid/common/config/JacksonConfigManager.java index 1811cb0232a2..f9560475487a 100644 --- a/processing/src/main/java/org/apache/druid/common/config/JacksonConfigManager.java +++ b/processing/src/main/java/org/apache/druid/common/config/JacksonConfigManager.java @@ -177,15 +177,7 @@ public SetResult setIfMatch( new IllegalStateException("If-Match precondition failed for key[" + key + "]") ); } - final SetResult result = set(key, currentBytes, newValue, auditInfo); - // Retryable CAS failure here = concurrent writer between our read and CAS; - // surface as precondition failed since the caller asked us to reject that. - if (!result.isOk() && result.isRetryable()) { - return SetResult.preconditionFailed( - new IllegalStateException("If-Match precondition failed (concurrent update) for key[" + key + "]") - ); - } - return result; + return casConflictAsPreconditionFailed(set(key, currentBytes, newValue, auditInfo), key); } /** @@ -225,8 +217,18 @@ public SetResult setIfMatch( if (ifMatchEtag == null) { return set(key, newValue, auditInfo); } - final SetResult result = set(key, currentBytes, newValue, auditInfo); - if (!result.isOk() && result.isRetryable()) { + return casConflictAsPreconditionFailed(set(key, currentBytes, newValue, auditInfo), key); + } + + /** + * Maps a CAS-conflict ({@link SetResult#isRetryable() retryable}) outcome to a + * precondition failure. A conditional write that loses the CAS means another + * writer committed between our read and write, so the supplied {@code If-Match} + * no longer describes the stored value — the caller must re-read and retry. + */ + private static SetResult casConflictAsPreconditionFailed(SetResult result, String key) + { + if (result.isRetryable()) { return SetResult.preconditionFailed( new IllegalStateException("If-Match precondition failed (concurrent update) for key[" + key + "]") ); diff --git a/processing/src/test/java/org/apache/druid/common/config/JacksonConfigManagerTest.java b/processing/src/test/java/org/apache/druid/common/config/JacksonConfigManagerTest.java index 7da2f4aa0bca..0941822664fd 100644 --- a/processing/src/test/java/org/apache/druid/common/config/JacksonConfigManagerTest.java +++ b/processing/src/test/java/org/apache/druid/common/config/JacksonConfigManagerTest.java @@ -286,6 +286,35 @@ public void testSetIfMatchTransformPreconditionFailsWhenCompareAndSwapDisabled() Mockito.verify(mockAuditManager, Mockito.never()).doAudit(Mockito.any(AuditEntry.class)); } + @Test + public void testSetIfMatchCasConflictReturnsPreconditionFailed() + { + final String key = "key"; + final TestConfig val = new TestConfig("v", "s", 1); + final AuditInfo auditInfo = new AuditInfo("a", "i", "c", "ip"); + final byte[] currentBytes = "current".getBytes(java.nio.charset.StandardCharsets.UTF_8); + + Mockito.when(mockConfigManager.isCompareAndSwapEnabled()).thenReturn(true); + Mockito.when(mockConfigManager.getCurrentBytes(key)).thenReturn(currentBytes); + Mockito.when(mockConfigManager.set( + Mockito.eq(key), + Mockito.any(ConfigSerde.class), + Mockito.eq(currentBytes), + Mockito.eq(val) + )).thenReturn(ConfigManager.SetResult.retryableFailure(new IllegalStateException("Config value has changed"))); + + final ConfigManager.SetResult result = jacksonConfigManager.setIfMatch( + key, + ConfigEtag.compute(currentBytes), + val, + auditInfo + ); + + Assertions.assertFalse(result.isOk()); + Assertions.assertTrue(result.isPreconditionFailed()); + Assertions.assertFalse(result.isRetryable()); + } + @Test public void testConvertByteToConfigWithNullConfigInByte() { diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorConfigManager.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorConfigManager.java index f76f421054d4..e906c846c1eb 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorConfigManager.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorConfigManager.java @@ -97,25 +97,6 @@ public CoordinatorDynamicConfig convertBytesToDynamicConfig(@Nullable byte[] byt ); } - public ConfigManager.SetResult setDynamicConfig(CoordinatorDynamicConfig config, AuditInfo auditInfo) - { - return setDynamicConfig(config, null, auditInfo); - } - - public ConfigManager.SetResult setDynamicConfig( - CoordinatorDynamicConfig config, - @Nullable String ifMatchEtag, - AuditInfo auditInfo - ) - { - return jacksonConfigManager.setIfMatch( - CoordinatorDynamicConfig.CONFIG_KEY, - ifMatchEtag, - config, - auditInfo - ); - } - public ConfigManager.SetResult updateDynamicConfig( UnaryOperator operator, @Nullable String ifMatchEtag, @@ -205,14 +186,22 @@ public ConfigManager.SetResult getAndUpdateCompactionConfig( if (current.equals(updated)) { return ConfigManager.SetResult.ok(); - } else { - return jacksonConfigManager.set( - DruidCompactionConfig.CONFIG_KEY, - currentBytes, - updated, - auditInfo + } + final ConfigManager.SetResult result = jacksonConfigManager.set( + DruidCompactionConfig.CONFIG_KEY, + currentBytes, + updated, + auditInfo + ); + // Under If-Match, a lost CAS means a concurrent writer committed between our + // read and write: the precondition no longer holds, so report it as such + // rather than as a retryable failure (conditional writes must not auto-retry). + if (ifMatchEtag != null && result.isRetryable()) { + return ConfigManager.SetResult.preconditionFailed( + new IllegalStateException("If-Match precondition failed (concurrent update) for compaction config") ); } + return result; } public DruidCompactionConfig convertBytesToCompactionConfig(@Nullable byte[] bytes) @@ -230,10 +219,12 @@ public boolean updateCompactionTaskSlots( AuditInfo auditInfo ) { - return updateCompactionTaskSlots(compactionTaskSlotRatio, maxCompactionTaskSlots, null, auditInfo); + return verifyCompactionUpdateSucceeded( + updateCompactionTaskSlots(compactionTaskSlotRatio, maxCompactionTaskSlots, null, auditInfo) + ); } - public boolean updateCompactionTaskSlots( + public ConfigManager.SetResult updateCompactionTaskSlots( @Nullable Double compactionTaskSlotRatio, @Nullable Integer maxCompactionTaskSlots, @Nullable String ifMatchEtag, @@ -262,10 +253,10 @@ public boolean updateClusterCompactionConfig( AuditInfo auditInfo ) { - return updateClusterCompactionConfig(config, null, auditInfo); + return verifyCompactionUpdateSucceeded(updateClusterCompactionConfig(config, null, auditInfo)); } - public boolean updateClusterCompactionConfig( + public ConfigManager.SetResult updateClusterCompactionConfig( ClusterCompactionConfig config, @Nullable String ifMatchEtag, AuditInfo auditInfo @@ -285,10 +276,10 @@ public boolean updateDatasourceCompactionConfig( AuditInfo auditInfo ) { - return updateDatasourceCompactionConfig(config, null, auditInfo); + return verifyCompactionUpdateSucceeded(updateDatasourceCompactionConfig(config, null, auditInfo)); } - public boolean updateDatasourceCompactionConfig( + public ConfigManager.SetResult updateDatasourceCompactionConfig( DataSourceCompactionConfig config, @Nullable String ifMatchEtag, AuditInfo auditInfo @@ -314,10 +305,10 @@ public boolean deleteDatasourceCompactionConfig( AuditInfo auditInfo ) { - return deleteDatasourceCompactionConfig(dataSource, null, auditInfo); + return verifyCompactionUpdateSucceeded(deleteDatasourceCompactionConfig(dataSource, null, auditInfo)); } - public boolean deleteDatasourceCompactionConfig( + public ConfigManager.SetResult deleteDatasourceCompactionConfig( String dataSource, @Nullable String ifMatchEtag, AuditInfo auditInfo @@ -374,23 +365,24 @@ public List getCompactionConfigHistory( } } - private boolean updateConfigHelper( + private ConfigManager.SetResult updateConfigHelper( UnaryOperator configUpdateOperator, @Nullable String ifMatchEtag, AuditInfo auditInfo ) { - int attemps = 0; + int attempts = 0; ConfigManager.SetResult setResult = null; - // When the caller has supplied an If-Match precondition, do not retry on CAS failure. - final int maxAttempts = ifMatchEtag != null ? 1 : MAX_UPDATE_RETRIES; + // Only an unconditional write can be retried: getAndUpdateCompactionConfig converts a lost + // CAS under If-Match into a (non-retryable) precondition failure, so conditional writes exit + // this loop on the first attempt rather than silently retrying past the caller's precondition. try { - while (attemps < maxAttempts) { + while (attempts < MAX_UPDATE_RETRIES) { setResult = getAndUpdateCompactionConfig(configUpdateOperator, ifMatchEtag, auditInfo); if (setResult.isOk() || !setResult.isRetryable()) { break; } - attemps++; + attempts++; updateRetryDelay(); } } @@ -405,24 +397,39 @@ private boolean updateConfigHelper( ); } + if (setResult.isOk() || setResult.isPreconditionFailed()) { + return setResult; + } + + // getAndUpdateCompactionConfig already converts a lost CAS under If-Match into + // a precondition failure, so a retryable result here is only a genuine + // (non-conditional) CAS conflict that exhausted its retries. + if (setResult.getException() instanceof NoSuchElementException) { + log.warn(setResult.getException(), "Update compaction config failed"); + throw NotFound.exception( + Throwables.getRootCause(setResult.getException()), + "Compaction config does not exist" + ); + } else { + log.warn(setResult.getException(), "Update compaction config failed"); + throw InternalServerError.exception( + Throwables.getRootCause(setResult.getException()), + "Failed to perform operation on compaction config" + ); + } + } + + private boolean verifyCompactionUpdateSucceeded(ConfigManager.SetResult setResult) + { if (setResult.isOk()) { return true; } - - // With If-Match, a retryable CAS failure means a concurrent writer beat us - // between the precondition check and the CAS — surface as 412 too. - final boolean preconditionFailed = setResult.isPreconditionFailed() - || (ifMatchEtag != null && setResult.isRetryable()); - if (preconditionFailed) { + if (setResult.isPreconditionFailed()) { log.info("If-Match precondition failed on compaction config update"); - final String message = setResult.isPreconditionFailed() - ? setResult.getException().getMessage() - : "If-Match precondition failed (concurrent update) on compaction config"; throw DruidException.forPersona(DruidException.Persona.USER) .ofCategory(DruidException.Category.PRECONDITION_FAILED) - .build(message); + .build(setResult.getException().getMessage()); } - if (setResult.getException() instanceof NoSuchElementException) { log.warn(setResult.getException(), "Update compaction config failed"); throw NotFound.exception( diff --git a/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java b/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java index 3aa830db6ca9..d9bf398abeaf 100644 --- a/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java +++ b/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java @@ -22,6 +22,7 @@ import com.google.inject.Inject; import com.sun.jersey.spi.container.ResourceFilters; import org.apache.druid.audit.AuditInfo; +import org.apache.druid.error.DruidException; import org.apache.druid.error.InvalidInput; import org.apache.druid.indexer.CompactionEngine; import org.apache.druid.server.coordinator.CoordinatorConfigManager; @@ -84,15 +85,20 @@ public Response setCompactionTaskLimit( return ServletResourceUtils.buildUpdateResponse(() -> true); } - final AuditInfo auditInfo = AuthorizationUtils.buildAuditInfo(req); - return ServletResourceUtils.buildUpdateResponse( - () -> configManager.updateCompactionTaskSlots( - compactionTaskSlotRatio, - maxCompactionTaskSlots, - DynamicConfigEtagHelper.getIfMatch(req), - auditInfo - ) - ); + try { + final AuditInfo auditInfo = AuthorizationUtils.buildAuditInfo(req); + return DynamicConfigEtagHelper.buildSetResultUpdateResponse( + configManager.updateCompactionTaskSlots( + compactionTaskSlotRatio, + maxCompactionTaskSlots, + DynamicConfigEtagHelper.getIfMatch(req), + auditInfo + ) + ); + } + catch (DruidException e) { + return ServletResourceUtils.buildErrorResponseFrom(e); + } } @POST @@ -103,16 +109,24 @@ public Response addOrUpdateDatasourceCompactionConfig( @Context HttpServletRequest req ) { - final AuditInfo auditInfo = AuthorizationUtils.buildAuditInfo(req); - return ServletResourceUtils.buildUpdateResponse(() -> { - final String ifMatch = DynamicConfigEtagHelper.getIfMatch(req); + try { + final AuditInfo auditInfo = AuthorizationUtils.buildAuditInfo(req); if (newConfig.getEngine() == CompactionEngine.MSQ) { throw InvalidInput.exception( "MSQ engine is supported only with supervisor-based compaction on the Overlord." ); } - return configManager.updateDatasourceCompactionConfig(newConfig, ifMatch, auditInfo); - }); + return DynamicConfigEtagHelper.buildSetResultUpdateResponse( + configManager.updateDatasourceCompactionConfig( + newConfig, + DynamicConfigEtagHelper.getIfMatch(req), + auditInfo + ) + ); + } + catch (DruidException e) { + return ServletResourceUtils.buildErrorResponseFrom(e); + } } @GET @@ -147,13 +161,18 @@ public Response deleteCompactionConfig( @Context HttpServletRequest req ) { - final AuditInfo auditInfo = AuthorizationUtils.buildAuditInfo(req); - return ServletResourceUtils.buildUpdateResponse( - () -> configManager.deleteDatasourceCompactionConfig( - dataSource, - DynamicConfigEtagHelper.getIfMatch(req), - auditInfo - ) - ); + try { + final AuditInfo auditInfo = AuthorizationUtils.buildAuditInfo(req); + return DynamicConfigEtagHelper.buildSetResultUpdateResponse( + configManager.deleteDatasourceCompactionConfig( + dataSource, + DynamicConfigEtagHelper.getIfMatch(req), + auditInfo + ) + ); + } + catch (DruidException e) { + return ServletResourceUtils.buildErrorResponseFrom(e); + } } } diff --git a/server/src/main/java/org/apache/druid/server/http/DynamicConfigEtagHelper.java b/server/src/main/java/org/apache/druid/server/http/DynamicConfigEtagHelper.java index a2298349a62d..60cbed026c9a 100644 --- a/server/src/main/java/org/apache/druid/server/http/DynamicConfigEtagHelper.java +++ b/server/src/main/java/org/apache/druid/server/http/DynamicConfigEtagHelper.java @@ -22,6 +22,8 @@ import com.google.common.base.Throwables; import org.apache.druid.common.config.ConfigEtag; import org.apache.druid.common.config.ConfigManager.SetResult; + +import java.util.Map; import org.apache.druid.error.DruidException; import org.apache.druid.error.InternalServerError; import org.apache.druid.error.InvalidInput; @@ -108,4 +110,16 @@ public static Response toErrorResponse(SetResult result) .entity(ServletResourceUtils.sanitizeException(result.getException())) .build(); } + + /** + * Build a write response from a {@link SetResult}, using the same success and + * error shapes as other dynamic-config endpoints. + */ + public static Response buildSetResultUpdateResponse(SetResult result) + { + if (result.isOk()) { + return Response.ok(Map.of("success", true)).build(); + } + return toErrorResponse(result); + } } diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java index c6a0db816957..c3538b977c19 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java @@ -234,12 +234,13 @@ public void testAddDatasourceConfigWithIfMatchReturnsPreconditionFailedWhenCompa ); verifyStatus(Response.Status.PRECONDITION_FAILED, response); - Assert.assertTrue(response.getEntity() instanceof ErrorResponse); + @SuppressWarnings("unchecked") + final Map entity = (Map) response.getEntity(); Assert.assertEquals( "If-Match requires druid.manager.config.enableCompareAndSwap to be enabled for key[" + DruidCompactionConfig.CONFIG_KEY + "]", - ((ErrorResponse) response.getEntity()).getUnderlyingException().getMessage() + entity.get("error") ); Assert.assertTrue(configManager.getCurrentCompactionConfig().getCompactionConfigs().isEmpty()); }