diff --git a/api/v1beta1/kafkacluster_types.go b/api/v1beta1/kafkacluster_types.go index 762f3f21d..ccff79a63 100644 --- a/api/v1beta1/kafkacluster_types.go +++ b/api/v1beta1/kafkacluster_types.go @@ -148,6 +148,12 @@ type KafkaClusterSpec struct { // +optional KRaftMode bool `json:"kRaft"` HeadlessServiceEnabled bool `json:"headlessServiceEnabled"` + // Allows ScaleOps or other Admission Hooks to manage Memory and CPU Resource Requests for + // Kafka Broker Pods. This Disables CPU and Memory request reconciliation from the desired + // state defined in the KafkaCluster to the current state in the Kubernetes Cluster + // +kubebuilder:default=false + // +optional + AdmissionWebhooksEnabled bool `json:"admissionWebhooksEnabled,omitempty"` // localDebugEnabled is used to decide whether to create a separate loadbalancer services for the // Kafka and Cruise Control Pods. These services will expose the internal listener ports of the Kafka // cluster with LoadBalancer type, which can be used for running Koperator on a local machine against diff --git a/charts/kafka-operator/crds/kafkaclusters.yaml b/charts/kafka-operator/crds/kafkaclusters.yaml index a7266064a..26bf40b8f 100644 --- a/charts/kafka-operator/crds/kafkaclusters.yaml +++ b/charts/kafka-operator/crds/kafkaclusters.yaml @@ -96,6 +96,13 @@ spec: - containerPort type: object type: array + admissionWebhooksEnabled: + default: false + description: |- + Allows ScaleOps or other Admission Hooks to manage Memory and CPU Resource Requests for + Kafka Broker Pods. This Disables CPU and Memory request reconciliation from the desired + state defined in the KafkaCluster to the current state in the Kubernetes Cluster + type: boolean alertManagerConfig: description: AlertManagerConfig defines configuration for alert manager properties: diff --git a/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml b/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml index a7266064a..26bf40b8f 100644 --- a/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml +++ b/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml @@ -96,6 +96,13 @@ spec: - containerPort type: object type: array + admissionWebhooksEnabled: + default: false + description: |- + Allows ScaleOps or other Admission Hooks to manage Memory and CPU Resource Requests for + Kafka Broker Pods. This Disables CPU and Memory request reconciliation from the desired + state defined in the KafkaCluster to the current state in the Kubernetes Cluster + type: boolean alertManagerConfig: description: AlertManagerConfig defines configuration for alert manager properties: diff --git a/config/samples/simplekafkacluster.yaml b/config/samples/simplekafkacluster.yaml index 307e37999..75b9a4b82 100644 --- a/config/samples/simplekafkacluster.yaml +++ b/config/samples/simplekafkacluster.yaml @@ -6,6 +6,7 @@ metadata: name: kafka spec: localDebugEnabled: true + admissionWebhooksEnabled: true kRaft: false monitoringConfig: jmxImage: "ghcr.io/adobe/koperator/jmx-javaagent:1.4.0" diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index d737754f8..dfc706e9d 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -956,6 +956,16 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo } desiredPod.Spec.Tolerations = uniqueTolerations } + + if r.KafkaCluster.Spec.AdmissionWebhooksEnabled { + // if resources requets are updated by scale ops, we need to sync them to desiredPod, + // otherwise they will be removed and cause pod restart + syncResourceRequests(desiredPod, currentPod) + // If current pod had affinities created by ScaleOps, we need to sync them to desiredPod, + // otherwise they will be removed and cause pod restart + syncAffinities(desiredPod, currentPod) + } + // Check if the resource actually updated or if labels match TaintedBrokersSelector patchResult, err := patch.DefaultPatchMaker.Calculate(currentPod, desiredPod) switch { diff --git a/pkg/resources/kafka/util.go b/pkg/resources/kafka/util.go index cfafbae14..68a5006a4 100644 --- a/pkg/resources/kafka/util.go +++ b/pkg/resources/kafka/util.go @@ -18,9 +18,11 @@ package kafka import ( "encoding/base64" "fmt" + "reflect" "sort" "github.com/google/uuid" + corev1 "k8s.io/api/core/v1" "github.com/banzaicloud/koperator/api/v1beta1" ) @@ -73,3 +75,133 @@ func generateRandomClusterID() string { randomUUID := uuid.New() return base64.URLEncoding.EncodeToString(randomUUID[:]) } + +// syncResourceRequests overwrites CPU and memory requests in desiredPod's containers +// with the values from currentPod so that request-only changes do not trigger a pod restart. +func syncResourceRequests(desiredPod, currentPod *corev1.Pod) { + syncContainerResourceRequests(desiredPod.Spec.Containers, currentPod.Spec.Containers) + syncContainerResourceRequests(desiredPod.Spec.InitContainers, currentPod.Spec.InitContainers) +} + +func syncContainerResourceRequests(desired, current []corev1.Container) { + index := make(map[string]corev1.ResourceList, len(current)) + for _, c := range current { + index[c.Name] = c.Resources.Requests + } + for i := range desired { + c := &desired[i] + reqs, ok := index[c.Name] + if !ok { + continue + } + if c.Resources.Requests == nil { + c.Resources.Requests = make(corev1.ResourceList) + } + for _, res := range []corev1.ResourceName{corev1.ResourceCPU, corev1.ResourceMemory} { + if val, exists := reqs[res]; exists { + c.Resources.Requests[res] = val + } else { + delete(c.Resources.Requests, res) + } + } + } +} + +// syncScaleOpsAffinities syncs all scale ops related affinities from the current pod to the desired pod. +// This includes pod affinities with scaleOpsManagedUnevictableLabel label selector +// and node affinities with "scaleops.sh/node-packing=true" selector. +func syncAffinities(desiredPod, currentPod *corev1.Pod) { + syncPodAffinities(desiredPod, currentPod) + syncNodeAffinities(desiredPod, currentPod) +} + +// syncScaleOpsPodAffinities syncs preferred pod affinities with scaleOpsManagedUnevictableLabel +// label selector from current pod to desired pod. +func syncPodAffinities(desiredPod, currentPod *corev1.Pod) { + if currentPod.Spec.Affinity == nil || currentPod.Spec.Affinity.PodAffinity == nil { + return + } + + currentPodAffinity := currentPod.Spec.Affinity.PodAffinity + + // Filter preferred pod affinities with scaleOpsManagedUnevictableLabel label selector + var admissionWebhookPpreferredAffinities []corev1.WeightedPodAffinityTerm + if currentPodAffinity.PreferredDuringSchedulingIgnoredDuringExecution != nil { + for _, term := range currentPodAffinity.PreferredDuringSchedulingIgnoredDuringExecution { + if term.PodAffinityTerm.LabelSelector != nil { + admissionWebhookPpreferredAffinities = append(admissionWebhookPpreferredAffinities, term) + } + } + } + + // If we found any scale ops preferred affinities, add them to the desired pod + if len(admissionWebhookPpreferredAffinities) > 0 { + if desiredPod.Spec.Affinity == nil { + desiredPod.Spec.Affinity = &corev1.Affinity{} + } + if desiredPod.Spec.Affinity.PodAffinity == nil { + desiredPod.Spec.Affinity.PodAffinity = &corev1.PodAffinity{} + } + + // Merge scale ops preferred affinities, avoiding duplicates + existingTerms := desiredPod.Spec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution + for _, newTerm := range admissionWebhookPpreferredAffinities { + // Check if this term already exists + found := false + for _, existing := range existingTerms { + if reflect.DeepEqual(existing.PodAffinityTerm, newTerm.PodAffinityTerm) && existing.Weight == newTerm.Weight { + found = true + break + } + } + if !found { + existingTerms = append(existingTerms, newTerm) + } + } + desiredPod.Spec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution = existingTerms + } +} + +// syncScaleOpsNodeAffinities syncs preferred node affinities with "scaleops.sh/node-packing=true" +// selector from current pod to desired pod. +func syncNodeAffinities(desiredPod, currentPod *corev1.Pod) { + if currentPod.Spec.Affinity == nil || currentPod.Spec.Affinity.NodeAffinity == nil { + return + } + + currentNodeAffinity := currentPod.Spec.Affinity.NodeAffinity + + // Filter preferred node affinities with "scaleops.sh/node-packing=true" selector + var admissionWebhookPreferredTerms []corev1.PreferredSchedulingTerm + if currentNodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution != nil { + admissionWebhookPreferredTerms = append(admissionWebhookPreferredTerms, + currentNodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution...) + } + + // If we found any scale ops node affinities, add them to the desired pod + if len(admissionWebhookPreferredTerms) > 0 { + if desiredPod.Spec.Affinity == nil { + desiredPod.Spec.Affinity = &corev1.Affinity{} + } + if desiredPod.Spec.Affinity.NodeAffinity == nil { + desiredPod.Spec.Affinity.NodeAffinity = &corev1.NodeAffinity{} + } + + // Merge scale ops node affinities, avoiding duplicates + existingTerms := desiredPod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution + for _, newTerm := range admissionWebhookPreferredTerms { + // Check if this term already exists + found := false + for _, existing := range existingTerms { + if reflect.DeepEqual(existing.Preference, newTerm.Preference) && existing.Weight == newTerm.Weight { + found = true + break + } + } + if !found { + existingTerms = append(existingTerms, newTerm) + } + } + desiredPod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution = existingTerms + } +} diff --git a/pkg/resources/kafka/util_test.go b/pkg/resources/kafka/util_test.go index d4c04045e..4a1831ba7 100644 --- a/pkg/resources/kafka/util_test.go +++ b/pkg/resources/kafka/util_test.go @@ -20,6 +20,10 @@ import ( "reflect" "testing" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/banzaicloud/koperator/api/v1beta1" ) @@ -402,3 +406,762 @@ func TestGenerateQuorumVoters(t *testing.T) { }) } } + +func TestSyncPodAffinities(t *testing.T) { + tests := []struct { + name string + currentPod *corev1.Pod + desiredPod *corev1.Pod + expectedPodAffinity bool + expectedTermCount int + }{ + { + name: "no affinity in current pod", + currentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{}, + }, + desiredPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{}, + }, + expectedPodAffinity: false, + expectedTermCount: 0, + }, + { + name: "no pod affinity in current pod", + currentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{}, + }, + }, + desiredPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{}, + }, + expectedPodAffinity: false, + expectedTermCount: 0, + }, + { + name: "pod affinity is Current Pod, should be included in desired pod", + currentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + PodAffinity: &corev1.PodAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []corev1.WeightedPodAffinityTerm{ + { + Weight: 100, + PodAffinityTerm: corev1.PodAffinityTerm{ + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "myapp", + }, + }, + TopologyKey: "kubernetes.io/hostname", + }, + }, + }, + }, + }, + }, + }, + desiredPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{}, + }, + expectedPodAffinity: true, + expectedTermCount: 1, + }, + { + name: "desired pod already has pod affinity, scaleops affinity should be merged", + currentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + PodAffinity: &corev1.PodAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []corev1.WeightedPodAffinityTerm{ + { + Weight: 100, + PodAffinityTerm: corev1.PodAffinityTerm{ + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "myapp-new", + }, + }, + TopologyKey: "kubernetes.io/hostname", + }, + }, + }, + }, + }, + }, + }, + desiredPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + PodAffinity: &corev1.PodAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []corev1.WeightedPodAffinityTerm{ + { + Weight: 80, + PodAffinityTerm: corev1.PodAffinityTerm{ + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "myapp", + }, + }, + TopologyKey: "kubernetes.io/hostname", + }, + }, + }, + }, + }, + }, + }, + expectedPodAffinity: true, + expectedTermCount: 2, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + syncPodAffinities(tt.desiredPod, tt.currentPod) + + if !tt.expectedPodAffinity { + if tt.desiredPod.Spec.Affinity != nil && tt.desiredPod.Spec.Affinity.PodAffinity != nil { + t.Errorf("expected no pod affinity, but got one") + } + return + } + + if tt.desiredPod.Spec.Affinity == nil || tt.desiredPod.Spec.Affinity.PodAffinity == nil { + t.Errorf("expected pod affinity to be set") + return + } + + gotTermCount := len(tt.desiredPod.Spec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution) + if gotTermCount != tt.expectedTermCount { + t.Errorf("expected %d pod affinity terms, got %d", tt.expectedTermCount, gotTermCount) + } + }) + } +} + +func TestSyncNodeAffinities(t *testing.T) { + tests := []struct { + name string + currentPod *corev1.Pod + desiredPod *corev1.Pod + expectedNodeAffinity bool + expectedTermCount int + }{ + { + name: "no affinity in current pod", + currentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{}, + }, + desiredPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{}, + }, + expectedNodeAffinity: false, + expectedTermCount: 0, + }, + { + name: "no node affinity in current pod", + currentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{}, + }, + }, + desiredPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{}, + }, + expectedNodeAffinity: false, + expectedTermCount: 0, + }, + { + name: "node affinity with admissionWebhoooklabel in MatchExpressions", + currentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []corev1.PreferredSchedulingTerm{ + { + Weight: 100, + Preference: corev1.NodeSelectorTerm{ + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: "admissionWebhookLabel", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"true"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + desiredPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{}, + }, + expectedNodeAffinity: true, + expectedTermCount: 1, + }, + { + name: "node affinity with admissionWebhook label in MatchFields", + currentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []corev1.PreferredSchedulingTerm{ + { + Weight: 50, + Preference: corev1.NodeSelectorTerm{ + MatchFields: []corev1.NodeSelectorRequirement{ + { + Key: "admissionWebhookLabel", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"true"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + desiredPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{}, + }, + expectedNodeAffinity: true, + expectedTermCount: 1, + }, + { + name: "node affinity with multiple Preferred Affinities", + currentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []corev1.PreferredSchedulingTerm{ + { + Weight: 100, + Preference: corev1.NodeSelectorTerm{ + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: "disktype", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"ssd"}, + }, + }, + }, + }, + { + Weight: 50, + Preference: corev1.NodeSelectorTerm{ + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: "scaleOpsNodePackingLabel", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"true"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + desiredPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{}, + }, + expectedNodeAffinity: true, + expectedTermCount: 2, + }, + { + name: "desired pod already has node affinity, AdmissionWebhook affinity should be merged", + currentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []corev1.PreferredSchedulingTerm{ + { + Weight: 100, + Preference: corev1.NodeSelectorTerm{ + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: "scaleOpsNodePackingLabel", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"true"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + desiredPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []corev1.PreferredSchedulingTerm{ + { + Weight: 80, + Preference: corev1.NodeSelectorTerm{ + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: "disktype", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"ssd"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + expectedNodeAffinity: true, + expectedTermCount: 2, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + syncNodeAffinities(tt.desiredPod, tt.currentPod) + + if !tt.expectedNodeAffinity { + if tt.desiredPod.Spec.Affinity != nil && tt.desiredPod.Spec.Affinity.NodeAffinity != nil { + t.Errorf("expected no node affinity, but got one") + } + return + } + + if tt.desiredPod.Spec.Affinity == nil || tt.desiredPod.Spec.Affinity.NodeAffinity == nil { + t.Errorf("expected node affinity to be set") + return + } + + gotTermCount := len(tt.desiredPod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution) + if gotTermCount != tt.expectedTermCount { + t.Errorf("expected %d node affinity terms, got %d", tt.expectedTermCount, gotTermCount) + } + }) + } +} + +func TestSyncResourceRequests(t *testing.T) { + cpu100m := resource.MustParse("100m") + cpu200m := resource.MustParse("200m") + mem128Mi := resource.MustParse("128Mi") + mem256Mi := resource.MustParse("256Mi") + storage1Gi := resource.MustParse("1Gi") + + tests := []struct { + name string + currentPod *corev1.Pod + desiredPod *corev1.Pod + // verify is called after syncResourceRequests to assert the desired pod state + verify func(t *testing.T, desiredPod *corev1.Pod) + }{ + { + name: "no containers in either pod", + currentPod: &corev1.Pod{ + Spec: corev1.PodSpec{}, + }, + desiredPod: &corev1.Pod{ + Spec: corev1.PodSpec{}, + }, + verify: func(t *testing.T, desiredPod *corev1.Pod) { + if len(desiredPod.Spec.Containers) != 0 { + t.Errorf("expected no containers") + } + }, + }, + { + name: "current cpu and memory are applied to desired container", + currentPod: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "kafka", + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: cpu200m, + corev1.ResourceMemory: mem256Mi, + }, + }, + }, + }, + }, + }, + desiredPod: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "kafka", + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: cpu100m, + corev1.ResourceMemory: mem128Mi, + }, + }, + }, + }, + }, + }, + verify: func(t *testing.T, desiredPod *corev1.Pod) { + reqs := desiredPod.Spec.Containers[0].Resources.Requests + gotCPU := reqs[corev1.ResourceCPU] + if !gotCPU.Equal(cpu200m) { + t.Errorf("expected CPU 200m, got %s", gotCPU.String()) + } + gotMem := reqs[corev1.ResourceMemory] + if !gotMem.Equal(mem256Mi) { + t.Errorf("expected memory 256Mi, got %s", gotMem.String()) + } + }, + }, + { + name: "desired container not in current is left unchanged", + currentPod: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + {Name: "other"}, + }, + }, + }, + desiredPod: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "kafka", + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: cpu100m, + corev1.ResourceMemory: mem128Mi, + }, + }, + }, + }, + }, + }, + verify: func(t *testing.T, desiredPod *corev1.Pod) { + reqs := desiredPod.Spec.Containers[0].Resources.Requests + gotCPU := reqs[corev1.ResourceCPU] + if !gotCPU.Equal(cpu100m) { + t.Errorf("expected CPU unchanged at 100m, got %s", gotCPU.String()) + } + gotMem := reqs[corev1.ResourceMemory] + if !gotMem.Equal(mem128Mi) { + t.Errorf("expected memory unchanged at 128Mi, got %s", gotMem.String()) + } + }, + }, + { + name: "current container missing cpu and memory deletes those keys from desired", + currentPod: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "kafka", + Resources: corev1.ResourceRequirements{}, + }, + }, + }, + }, + desiredPod: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "kafka", + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: cpu100m, + corev1.ResourceMemory: mem128Mi, + }, + }, + }, + }, + }, + }, + verify: func(t *testing.T, desiredPod *corev1.Pod) { + reqs := desiredPod.Spec.Containers[0].Resources.Requests + if _, ok := reqs[corev1.ResourceCPU]; ok { + t.Errorf("expected CPU to be deleted from desired, but it was present") + } + if _, ok := reqs[corev1.ResourceMemory]; ok { + t.Errorf("expected memory to be deleted from desired, but it was present") + } + }, + }, + { + name: "non cpu/memory resources in current are not copied to desired", + currentPod: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "kafka", + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: cpu200m, + corev1.ResourceEphemeralStorage: storage1Gi, + }, + }, + }, + }, + }, + }, + desiredPod: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "kafka", + Resources: corev1.ResourceRequirements{}, + }, + }, + }, + }, + verify: func(t *testing.T, desiredPod *corev1.Pod) { + reqs := desiredPod.Spec.Containers[0].Resources.Requests + gotCPU := reqs[corev1.ResourceCPU] + if !gotCPU.Equal(cpu200m) { + t.Errorf("expected CPU 200m, got %s", gotCPU.String()) + } + if _, ok := reqs[corev1.ResourceEphemeralStorage]; ok { + t.Errorf("expected ephemeral-storage not to be copied, but it was present") + } + }, + }, + { + name: "init containers are synced independently from regular containers", + currentPod: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "kafka", + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: cpu200m, + }, + }, + }, + }, + InitContainers: []corev1.Container{ + { + Name: "init-certs", + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceMemory: mem256Mi, + }, + }, + }, + }, + }, + }, + desiredPod: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "kafka", + Resources: corev1.ResourceRequirements{}, + }, + }, + InitContainers: []corev1.Container{ + { + Name: "init-certs", + Resources: corev1.ResourceRequirements{}, + }, + }, + }, + }, + verify: func(t *testing.T, desiredPod *corev1.Pod) { + containerReqs := desiredPod.Spec.Containers[0].Resources.Requests + gotCPU := containerReqs[corev1.ResourceCPU] + if !gotCPU.Equal(cpu200m) { + t.Errorf("expected container CPU 200m, got %s", gotCPU.String()) + } + initReqs := desiredPod.Spec.InitContainers[0].Resources.Requests + gotMem := initReqs[corev1.ResourceMemory] + if !gotMem.Equal(mem256Mi) { + t.Errorf("expected init container memory 256Mi, got %s", gotMem.String()) + } + }, + }, + { + name: "multiple containers: each is matched by name independently", + currentPod: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "kafka", + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: cpu200m, + corev1.ResourceMemory: mem256Mi, + }, + }, + }, + { + Name: "cruise-control", + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: cpu100m, + corev1.ResourceMemory: mem128Mi, + }, + }, + }, + }, + }, + }, + desiredPod: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + {Name: "kafka", Resources: corev1.ResourceRequirements{}}, + {Name: "cruise-control", Resources: corev1.ResourceRequirements{}}, + }, + }, + }, + verify: func(t *testing.T, desiredPod *corev1.Pod) { + kafkaReqs := desiredPod.Spec.Containers[0].Resources.Requests + gotKafkaCPU := kafkaReqs[corev1.ResourceCPU] + if !gotKafkaCPU.Equal(cpu200m) { + t.Errorf("kafka: expected CPU 200m, got %s", gotKafkaCPU.String()) + } + ccReqs := desiredPod.Spec.Containers[1].Resources.Requests + gotCCCPU := ccReqs[corev1.ResourceCPU] + if !gotCCCPU.Equal(cpu100m) { + t.Errorf("cruise-control: expected CPU 100m, got %s", gotCCCPU.String()) + } + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + syncResourceRequests(tt.desiredPod, tt.currentPod) + tt.verify(t, tt.desiredPod) + }) + } +} + +func TestSyncAffinities(t *testing.T) { + tests := []struct { + name string + currentPod *corev1.Pod + desiredPod *corev1.Pod + expectPodAffinity bool + expectNodeAffinity bool + }{ + { + name: "no affinities in current pod", + currentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{}, + }, + desiredPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{}, + }, + expectPodAffinity: false, + expectNodeAffinity: false, + }, + { + name: "both pod and node affinities in current pod", + currentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + PodAffinity: &corev1.PodAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []corev1.WeightedPodAffinityTerm{ + { + Weight: 100, + PodAffinityTerm: corev1.PodAffinityTerm{ + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "scaleOpsManagedUnevictableLabel": "true", + }, + }, + TopologyKey: "kubernetes.io/hostname", + }, + }, + }, + }, + NodeAffinity: &corev1.NodeAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []corev1.PreferredSchedulingTerm{ + { + Weight: 50, + Preference: corev1.NodeSelectorTerm{ + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: "scaleOpsNodePackingLabel", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"true"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + desiredPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{}, + }, + expectPodAffinity: true, + expectNodeAffinity: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + syncAffinities(tt.desiredPod, tt.currentPod) + + if tt.expectPodAffinity { + if tt.desiredPod.Spec.Affinity == nil || tt.desiredPod.Spec.Affinity.PodAffinity == nil { + t.Errorf("expected pod affinity to be set") + } + } else { + if tt.desiredPod.Spec.Affinity != nil && tt.desiredPod.Spec.Affinity.PodAffinity != nil { + if len(tt.desiredPod.Spec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution) > 0 { + t.Errorf("expected no pod affinity") + } + } + } + + if tt.expectNodeAffinity { + if tt.desiredPod.Spec.Affinity == nil || tt.desiredPod.Spec.Affinity.NodeAffinity == nil { + t.Errorf("expected node affinity to be set") + } + } else { + if tt.desiredPod.Spec.Affinity != nil && tt.desiredPod.Spec.Affinity.NodeAffinity != nil { + if len(tt.desiredPod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution) > 0 { + t.Errorf("expected no node affinity") + } + } + } + }) + } +}