From 051f9e41169f8982af78b3d4efa7b9c28c367405 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jesse=20Tu=C4=9Flu?= Date: Mon, 29 Jun 2026 16:32:27 -0700 Subject: [PATCH] feat: add k8s job task context resource context --- docs/development/extensions-core/k8s-jobs.md | 65 +++++++ .../overlord/common/DruidK8sConstants.java | 1 + .../overlord/taskadapter/K8sTaskAdapter.java | 12 +- .../taskadapter/K8sTaskResourceContext.java | 177 ++++++++++++++++++ .../MultiContainerTaskAdapter.java | 2 +- .../taskadapter/PodTemplateTaskAdapter.java | 17 +- .../SingleContainerTaskAdapter.java | 2 +- .../taskadapter/K8sTaskAdapterTest.java | 81 ++++++++ .../PodTemplateTaskAdapterTest.java | 50 +++++ 9 files changed, 397 insertions(+), 10 deletions(-) create mode 100644 extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskResourceContext.java diff --git a/docs/development/extensions-core/k8s-jobs.md b/docs/development/extensions-core/k8s-jobs.md index b65a7bb496bd..8caab8e4c564 100644 --- a/docs/development/extensions-core/k8s-jobs.md +++ b/docs/development/extensions-core/k8s-jobs.md @@ -775,6 +775,71 @@ Set the `podTemplateSelectionKey` key in a task's context to pick a configured p This is gated by the runtime property `druid.indexer.runner.allowTaskPodTemplateSelection`, which defaults to `false`. If the key doesn't match any configured template, the task fails to launch. +##### Override task pod resources via context + +Set the `k8sTaskResources` key in a task's context to override Kubernetes resource requests and limits for the task +container. Resource names are Kubernetes resource names, so this supports standard resources such as `cpu`, `memory`, +and `ephemeral-storage`, as well as cluster-specific extended resources. + +Resource entries at the top level are applied to both requests and limits. Use the `requests` and `limits` objects when +the request and limit should differ. + +```json +"context": { + "k8sTaskResources": { + "cpu": "2", + "memory": "8Gi", + "ephemeral-storage": "100Gi", + "requests": { + "example.com/custom-resource": "1" + }, + "limits": { + "example.com/custom-resource": "1" + } + } +} +``` + +Use `byTaskType` to override resources for specific task types. Druid applies the generic resource entries first, then +applies the section matching the task's concrete `type`. This is useful for `index_parallel`, whose supervisor task +creates subtasks with different task types. The subtask context inherits the supervisor context, so each subtask can +resolve its own resource override when the Kubernetes task runner launches it. + +```json +"context": { + "k8sTaskResources": { + "cpu": "1", + "memory": "4Gi", + "byTaskType": { + "partial_index_generate": { + "cpu": "2", + "memory": "8Gi", + "ephemeral-storage": "200Gi" + }, + "partial_index_generic_merge": { + "requests": { + "cpu": "500m", + "memory": "2Gi" + }, + "limits": { + "cpu": "1", + "memory": "4Gi" + } + } + } + } +} +``` + +Common parallel indexing task types include `index_parallel`, `single_phase_sub_task`, `partial_dimension_cardinality`, +`partial_dimension_distribution`, `partial_index_generate`, `partial_range_index_generate`, and +`partial_index_generic_merge`. + +For `overlordSingleContainer` and `overlordMultiContainer`, Druid still computes default `cpu` and `memory` resources +from `druid.indexer.runner.cpuCoreInMicro` and `druid.indexer.runner.javaOptsArray`; `k8sTaskResources` overrides only +the resources it names. For `customTemplateAdapter`, Druid applies the overrides to the first container in the selected +pod template. Existing resource entries that are not overridden are preserved. + #### Running Task Pods in Another Namespace It is possible to run task pods in a different namespace from the rest of your Druid cluster. diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java index f0eba8c63e67..da7cf7db6ed8 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java +++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java @@ -47,6 +47,7 @@ public class DruidK8sConstants public static final String DRUID_LABEL_PREFIX = "druid."; public static final String BASE_TEMPLATE_NAME = "base"; public static final String TASK_CONTEXT_POD_TEMPLATE_SELECTION_KEY = "podTemplateSelectionKey"; + public static final String TASK_CONTEXT_RESOURCES_KEY = "k8sTaskResources"; public static final long MAX_ENV_VARIABLE_KBS = 130048; // 127 KB public static final ImmutableList BLACKLISTED_PEON_POD_ERROR_MESSAGES = ImmutableList.of( diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java index cef4588e08ac..461c8e8fb915 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java +++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java @@ -343,6 +343,7 @@ void addEnvironmentVariables(Container mainContainer, PeonCommandContext context protected Container setupMainContainer( PodSpec podSpec, + Task task, PeonCommandContext context, long containerSize, String taskContents @@ -374,6 +375,7 @@ protected Container setupMainContainer( containerSize, context.getCpuMicroCore() ); + requirements = K8sTaskResourceContext.applyTaskResourceOverrides(requirements, task); mainContainer.setResources(requirements); return mainContainer; } @@ -517,26 +519,26 @@ private List generateCommand(Task task) @VisibleForTesting static ResourceRequirements getResourceRequirements(ResourceRequirements requirements, long containerSize, int cpuMicroCore) { - Map resourceMap = new HashMap<>(); + final Map resourceMap = new HashMap<>(); resourceMap.put( "cpu", new Quantity(String.valueOf(cpuMicroCore > 0 ? cpuMicroCore : DruidK8sConstants.DEFAULT_CPU_MILLICORES), "m") ); resourceMap.put("memory", new Quantity(String.valueOf(containerSize))); - ResourceRequirementsBuilder result = new ResourceRequirementsBuilder(); + final ResourceRequirementsBuilder result = new ResourceRequirementsBuilder(); if (requirements != null) { if (requirements.getRequests() == null || requirements.getRequests().isEmpty()) { - requirements.setRequests(resourceMap); + requirements.setRequests(new HashMap<>(resourceMap)); } else { requirements.getRequests().putAll(resourceMap); } if (requirements.getLimits() == null || requirements.getLimits().isEmpty()) { - requirements.setLimits(resourceMap); + requirements.setLimits(new HashMap<>(resourceMap)); } else { requirements.getLimits().putAll(resourceMap); } } else { - requirements = result.withRequests(resourceMap).withLimits(resourceMap).build(); + requirements = result.withRequests(new HashMap<>(resourceMap)).withLimits(new HashMap<>(resourceMap)).build(); } return requirements; } diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskResourceContext.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskResourceContext.java new file mode 100644 index 000000000000..c72e554c65c4 --- /dev/null +++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskResourceContext.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.overlord.taskadapter; + +import io.fabric8.kubernetes.api.model.Quantity; +import io.fabric8.kubernetes.api.model.ResourceRequirements; +import io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.k8s.overlord.common.DruidK8sConstants; + +import java.util.HashMap; +import java.util.Map; + +class K8sTaskResourceContext +{ + static final String REQUESTS_KEY = "requests"; + static final String LIMITS_KEY = "limits"; + static final String BY_TASK_TYPE_KEY = "byTaskType"; + + static ResourceRequirements applyTaskResourceOverrides(ResourceRequirements requirements, Task task) + { + final Object rawResourceContext = task.getContextValue(DruidK8sConstants.TASK_CONTEXT_RESOURCES_KEY); + if (rawResourceContext == null) { + return requirements; + } + + final Map resourceContext = asMap(rawResourceContext, DruidK8sConstants.TASK_CONTEXT_RESOURCES_KEY); + final ResourceRequirements result = applyResourceConfig( + requirements, + resourceContext, + DruidK8sConstants.TASK_CONTEXT_RESOURCES_KEY + ); + + final Object rawByTaskType = resourceContext.get(BY_TASK_TYPE_KEY); + if (rawByTaskType == null) { + return result; + } + + final String taskTypePath = DruidK8sConstants.TASK_CONTEXT_RESOURCES_KEY + "." + BY_TASK_TYPE_KEY; + final Map byTaskType = asMap(rawByTaskType, taskTypePath); + final Object rawTaskTypeResources = byTaskType.get(task.getType()); + if (rawTaskTypeResources == null) { + return result; + } + + return applyResourceConfig( + result, + asMap(rawTaskTypeResources, taskTypePath + "." + task.getType()), + taskTypePath + "." + task.getType() + ); + } + + private static ResourceRequirements applyResourceConfig( + ResourceRequirements requirements, + Map resourceConfig, + String contextPath + ) + { + final Map sharedResources = getSharedResources(resourceConfig, contextPath); + final Map requestResources = getResourceMap(resourceConfig, REQUESTS_KEY, contextPath); + final Map limitResources = getResourceMap(resourceConfig, LIMITS_KEY, contextPath); + + if (sharedResources.isEmpty() && requestResources.isEmpty() && limitResources.isEmpty()) { + return requirements; + } + + final ResourceRequirements result = requirements == null + ? new ResourceRequirementsBuilder().build() + : requirements; + + if (!sharedResources.isEmpty()) { + addRequests(result, sharedResources); + addLimits(result, sharedResources); + } + if (!requestResources.isEmpty()) { + addRequests(result, requestResources); + } + if (!limitResources.isEmpty()) { + addLimits(result, limitResources); + } + + return result; + } + + private static Map getSharedResources(Map resourceConfig, String contextPath) + { + final Map resources = new HashMap<>(); + for (Map.Entry entry : resourceConfig.entrySet()) { + final String key = asStringKey(entry.getKey(), contextPath); + if (REQUESTS_KEY.equals(key) || LIMITS_KEY.equals(key) || BY_TASK_TYPE_KEY.equals(key)) { + continue; + } + resources.put(key, asQuantity(entry.getValue(), contextPath + "." + key)); + } + return resources; + } + + private static Map getResourceMap(Map resourceConfig, String resourceType, String contextPath) + { + final Object rawResources = resourceConfig.get(resourceType); + if (rawResources == null) { + return Map.of(); + } + + final String resourcePath = contextPath + "." + resourceType; + final Map rawResourceMap = asMap(rawResources, resourcePath); + final Map resources = new HashMap<>(); + for (Map.Entry entry : rawResourceMap.entrySet()) { + final String key = asStringKey(entry.getKey(), resourcePath); + resources.put(key, asQuantity(entry.getValue(), resourcePath + "." + key)); + } + return resources; + } + + private static void addRequests(ResourceRequirements requirements, Map resources) + { + if (requirements.getRequests() == null) { + requirements.setRequests(new HashMap<>()); + } + requirements.getRequests().putAll(resources); + } + + private static void addLimits(ResourceRequirements requirements, Map resources) + { + if (requirements.getLimits() == null) { + requirements.setLimits(new HashMap<>()); + } + requirements.getLimits().putAll(resources); + } + + private static Map asMap(Object value, String contextPath) + { + if (value instanceof Map) { + return (Map) value; + } + + throw InvalidInput.exception("Task context value [%s] must be an object.", contextPath); + } + + private static String asStringKey(Object key, String contextPath) + { + if (key instanceof String) { + return (String) key; + } + + throw InvalidInput.exception("Task context value [%s] must contain only string keys.", contextPath); + } + + private static Quantity asQuantity(Object value, String contextPath) + { + if (value instanceof Quantity) { + return (Quantity) value; + } else if (value instanceof String || value instanceof Number) { + return new Quantity(String.valueOf(value)); + } + + throw InvalidInput.exception("Task context value [%s] must be a string or number.", contextPath); + } +} diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapter.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapter.java index e56b153dcc8e..79c28d3f06b5 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapter.java +++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapter.java @@ -109,7 +109,7 @@ Job createJobFromPodSpec(PodSpec podSpec, Task task, PeonCommandContext context) // compress the task.json to set as an env variables String taskContents = Base64Compression.compressBase64(mapper.writeValueAsString(task)); - setupMainContainer(podSpec, context, containerSize, taskContents); + setupMainContainer(podSpec, task, context, containerSize, taskContents); // add any optional annotations or labels. Map annotations = addJobSpecificAnnotations(context, k8sTaskId); diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java index 0ad3385ca212..7cdc58837e6b 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java +++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java @@ -24,6 +24,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import io.fabric8.kubernetes.api.model.Container; import io.fabric8.kubernetes.api.model.EnvVar; import io.fabric8.kubernetes.api.model.EnvVarBuilder; import io.fabric8.kubernetes.api.model.EnvVarSourceBuilder; @@ -119,7 +120,7 @@ public String getAdapterType() @Override public Job fromTask(Task task) throws IOException { - Optional selectedPodTemplate = podTemplateSelector.getPodTemplateForTask(task); + final Optional selectedPodTemplate = podTemplateSelector.getPodTemplateForTask(task); if (selectedPodTemplate == null || !selectedPodTemplate.isPresent()) { throw InternalServerError.exception( "Could not find pod template for task [%s]." @@ -127,9 +128,9 @@ public Job fromTask(Task task) throws IOException task.getId() ); } - PodTemplateWithName podTemplateWithName = selectedPodTemplate.get(); + final PodTemplateWithName podTemplateWithName = selectedPodTemplate.get(); - return new JobBuilder() + final Job job = new JobBuilder() .withNewMetadata() .withName(new K8sTaskId(taskRunnerConfig.getK8sTaskPodNamePrefix(), task).getK8sJobName()) .addToLabels(getJobLabels(taskRunnerConfig, task)) @@ -157,6 +158,16 @@ public Job fromTask(Task task) throws IOException .getStandardSeconds()) .endSpec() .build(); + applyTaskResourceOverrides(job, task); + return job; + } + + private void applyTaskResourceOverrides(Job job, Task task) + { + final Container mainContainer = job.getSpec().getTemplate().getSpec().getContainers().get(0); + mainContainer.setResources( + K8sTaskResourceContext.applyTaskResourceOverrides(mainContainer.getResources(), task) + ); } /** diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapter.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapter.java index 4f6dffa6c7b7..665ce14df2e3 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapter.java +++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapter.java @@ -97,7 +97,7 @@ Job createJobFromPodSpec(PodSpec podSpec, Task task, PeonCommandContext context) // compress the task.json to set as an env variables String taskContents = Base64Compression.compressBase64(mapper.writeValueAsString(task)); - Container mainContainer = setupMainContainer(podSpec, context, containerSize, taskContents); + Container mainContainer = setupMainContainer(podSpec, task, context, containerSize, taskContents); // add any optional annotations or labels. Map annotations = addJobSpecificAnnotations(context, k8sTaskId); diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java index 7275e1278cbf..4485429da89a 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java +++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java @@ -810,4 +810,85 @@ void testEphemeralStorage() ); Assertions.assertEquals(1, additionalProperties.getAdditionalProperties().size()); } + + @Test + void testTaskContextResourceOverrides() + { + final ResourceRequirements baseRequirements = K8sTaskAdapter.getResourceRequirements( + null, + 100, + 1000 + ); + final Task task = new NoopTask( + "id", + "id", + "datasource", + 0, + 0, + ImmutableMap.of( + DruidK8sConstants.TASK_CONTEXT_RESOURCES_KEY, + ImmutableMap.of( + "memory", "4Gi", + "ephemeral-storage", "20Gi", + "example.com/custom-resource", "1", + "requests", ImmutableMap.of("cpu", "1500m"), + "limits", ImmutableMap.of("cpu", "2"), + "byTaskType", ImmutableMap.of( + NoopTask.TYPE, + ImmutableMap.of( + "limits", + ImmutableMap.of( + "ephemeral-storage", "40Gi", + "example.com/custom-resource", "2" + ) + ) + ) + ) + ) + ); + + final ResourceRequirements result = K8sTaskResourceContext.applyTaskResourceOverrides(baseRequirements, task); + + Assertions.assertEquals("1500m", result.getRequests().get("cpu").toString()); + Assertions.assertEquals("4Gi", result.getRequests().get("memory").toString()); + Assertions.assertEquals("20Gi", result.getRequests().get("ephemeral-storage").toString()); + Assertions.assertEquals("1", result.getRequests().get("example.com/custom-resource").toString()); + + Assertions.assertEquals("2", result.getLimits().get("cpu").toString()); + Assertions.assertEquals("4Gi", result.getLimits().get("memory").toString()); + Assertions.assertEquals("40Gi", result.getLimits().get("ephemeral-storage").toString()); + Assertions.assertEquals("2", result.getLimits().get("example.com/custom-resource").toString()); + } + + @Test + void testTaskContextResourceOverridesIgnoreOtherTaskTypes() + { + final ResourceRequirements baseRequirements = K8sTaskAdapter.getResourceRequirements( + null, + 100, + 1000 + ); + final Task task = new NoopTask( + "id", + "id", + "datasource", + 0, + 0, + ImmutableMap.of( + DruidK8sConstants.TASK_CONTEXT_RESOURCES_KEY, + ImmutableMap.of( + "requests", ImmutableMap.of("memory", "2Gi"), + "byTaskType", ImmutableMap.of( + "partial_index_generate", + ImmutableMap.of("requests", ImmutableMap.of("memory", "8Gi")) + ) + ) + ) + ); + + final ResourceRequirements result = K8sTaskResourceContext.applyTaskResourceOverrides(baseRequirements, task); + + Assertions.assertEquals("2Gi", result.getRequests().get("memory").toString()); + Assertions.assertEquals("100", result.getLimits().get("memory").toString()); + } } diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java index a6ef7f547eeb..d44c283fc623 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java +++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java @@ -23,6 +23,7 @@ import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import io.fabric8.kubernetes.api.model.PodTemplate; +import io.fabric8.kubernetes.api.model.ResourceRequirements; import io.fabric8.kubernetes.api.model.batch.v1.Job; import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder; import org.apache.commons.lang3.RandomStringUtils; @@ -141,6 +142,53 @@ public void test_fromTask_withNoopPodTemplateInRuntimeProperites_dontSetTaskJSON Assertions.assertEquals(actual, expected); } + @Test + public void test_fromTask_withResourceOverridesInTaskContext() throws IOException + { + TestPodTemplateSelector podTemplateSelector = new TestPodTemplateSelector(podTemplateSpec); + + PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter( + taskRunnerConfig, + taskConfig, + node, + mapper, + taskLogs, + podTemplateSelector + ); + + Task task = new NoopTask( + "id", + "id", + "datasource", + 0, + 0, + ImmutableMap.of( + DruidK8sConstants.TASK_CONTEXT_RESOURCES_KEY, + ImmutableMap.of( + "memory", "4Gi", + "requests", ImmutableMap.of( + "cpu", "500m", + "ephemeral-storage", "10Gi" + ), + "limits", ImmutableMap.of( + "cpu", "2", + "example.com/custom-resource", "1" + ) + ) + ) + ); + + Job actual = adapter.fromTask(task); + ResourceRequirements resources = actual.getSpec().getTemplate().getSpec().getContainers().get(0).getResources(); + + Assertions.assertEquals("500m", resources.getRequests().get("cpu").toString()); + Assertions.assertEquals("4Gi", resources.getRequests().get("memory").toString()); + Assertions.assertEquals("10Gi", resources.getRequests().get("ephemeral-storage").toString()); + Assertions.assertEquals("2", resources.getLimits().get("cpu").toString()); + Assertions.assertEquals("4Gi", resources.getLimits().get("memory").toString()); + Assertions.assertEquals("1", resources.getLimits().get("example.com/custom-resource").toString()); + } + @Test public void test_fromTask_withoutAnnotations_throwsDruidException() { @@ -427,6 +475,7 @@ public void test_fromTask_taskSupportsQueries() throws IOException EasyMock.expect(task.getGroupId()).andReturn("groupid").anyTimes(); EasyMock.expect(task.getDataSource()).andReturn("datasource").anyTimes(); EasyMock.expect(task.getBroadcastDatasourceLoadingSpec()).andReturn(BroadcastDatasourceLoadingSpec.ALL).anyTimes(); + EasyMock.expect(task.getContextValue(DruidK8sConstants.TASK_CONTEXT_RESOURCES_KEY)).andReturn(null).anyTimes(); EasyMock.replay(task); Job actual = adapter.fromTask(task); @@ -460,6 +509,7 @@ public void test_fromTask_withBroadcastDatasourceLoadingModeAll() throws IOExcep EasyMock.expect(task.getGroupId()).andReturn("groupid").anyTimes(); EasyMock.expect(task.getDataSource()).andReturn("datasource").anyTimes(); EasyMock.expect(task.getBroadcastDatasourceLoadingSpec()).andReturn(BroadcastDatasourceLoadingSpec.ALL).anyTimes(); + EasyMock.expect(task.getContextValue(DruidK8sConstants.TASK_CONTEXT_RESOURCES_KEY)).andReturn(null).anyTimes(); EasyMock.replay(task); Job actual = adapter.fromTask(task);