diff --git a/Makefile b/Makefile index 142b5e3e6..0b12076a5 100644 --- a/Makefile +++ b/Makefile @@ -275,3 +275,8 @@ mock-generate: bin/mockgen -package mocks \ -destination pkg/resources/kafka/mocks/Client.go \ sigs.k8s.io/controller-runtime/pkg/client Client + $(BIN_DIR)/mockgen \ + -copyright_file $(BOILERPLATE_DIR)/header.generated.txt \ + -package mocks \ + -destination pkg/resources/kafka/mocks/KafkaClient.go \ + -source pkg/kafkaclient/client.go diff --git a/config/samples/banzaicloud_v1beta1_kafkacluster.yaml b/config/samples/banzaicloud_v1beta1_kafkacluster.yaml index 47aefbd0a..24ec8c0b7 100644 --- a/config/samples/banzaicloud_v1beta1_kafkacluster.yaml +++ b/config/samples/banzaicloud_v1beta1_kafkacluster.yaml @@ -55,6 +55,14 @@ spec: # alerts with 'rollingupgrade' # failureThreshold: 1 + # concurrentBrokerRestartsAllowed controls how many brokers can be restarted in parallel during a rolling upgrade. If + # it is set to a value greater than 1, the operator will restart up to that amount of brokers in parallel, if the + # brokers are within the same AZ (as specified by "broker.rack" in broker read-only configs). Since using Kafka broker + # racks spreads out the replicas, we know that restarting multiple brokers in the same rack will not cause more than + # 1/Nth of the replicas of a topic-partition to be unavailable at the same time, where N is the number of racks used. + # This is a safe way to speed up the rolling upgrade. + # concurrentBrokerRestartsAllowed: 1 + # brokerConfigGroups specifies multiple broker configs with unique name brokerConfigGroups: # Specify desired group name (eg., 'default_group') diff --git a/controllers/tests/common_test.go b/controllers/tests/common_test.go index c164a2498..15938c3e9 100644 --- a/controllers/tests/common_test.go +++ b/controllers/tests/common_test.go @@ -124,7 +124,7 @@ func createMinimalKafkaClusterCR(name, namespace string) *v1beta1.KafkaCluster { CCJMXExporterConfig: "custom_property: custom_value", }, ReadOnlyConfig: "cruise.control.metrics.topic.auto.create=true", - RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{FailureThreshold: 1}, + RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{FailureThreshold: 1, ConcurrentBrokerRestartCountPerRack: 1}, }, } } diff --git a/controllers/tests/cruisecontroloperation_controller_test.go b/controllers/tests/cruisecontroloperation_controller_test.go index 03f7e14af..11690d62b 100644 --- a/controllers/tests/cruisecontroloperation_controller_test.go +++ b/controllers/tests/cruisecontroloperation_controller_test.go @@ -77,7 +77,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() { }) When("there is an add_broker operation for execution", Serial, func() { JustBeforeEach(func(ctx SpecContext) { - cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock1()) + cruiseControlOperationReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMock1()) operation := generateCruiseControlOperation(opName1, namespace, kafkaCluster.GetName()) err := k8sClient.Create(ctx, &operation) Expect(err).NotTo(HaveOccurred()) @@ -105,7 +105,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() { }) When("add_broker operation is finished with completedWithError and 30s has not elapsed", Serial, func() { JustBeforeEach(func(ctx SpecContext) { - cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock2()) + cruiseControlOperationReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMock2()) operation := generateCruiseControlOperation(opName1, 
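[Editor's note, sketch only] The rolling-upgrade knob documented in the KafkaCluster sample above and set in createMinimalKafkaClusterCR maps to the ConcurrentBrokerRestartCountPerRack field of RollingUpgradeConfig. The following sketch shows how the spec fields fit together, assuming brokers expose their rack via a broker.rack read-only config; the package name, broker IDs and values are illustrative, only the field names come from this change.

package example

import "github.com/banzaicloud/koperator/api/v1beta1"

// specWithConcurrentRestarts is a sketch of a spec that opts into concurrent same-rack restarts.
func specWithConcurrentRestarts() v1beta1.KafkaClusterSpec {
	return v1beta1.KafkaClusterSpec{
		Brokers: []v1beta1.Broker{
			{Id: 101, ReadOnlyConfig: "broker.rack=az1"},
			{Id: 201, ReadOnlyConfig: "broker.rack=az2"},
			{Id: 301, ReadOnlyConfig: "broker.rack=az3"},
		},
		RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{
			FailureThreshold: 1,
			// With 3 racks and rack-aware replica placement, restarting 2 brokers of the
			// same rack still keeps at most 1/3 of a topic-partition's replicas down at once.
			ConcurrentBrokerRestartCountPerRack: 2,
		},
	}
}
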
namespace, kafkaCluster.GetName()) err := k8sClient.Create(ctx, &operation) Expect(err).NotTo(HaveOccurred()) @@ -132,7 +132,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() { }) When("add_broker operation is finished with completedWithError and 30s has elapsed", Serial, func() { JustBeforeEach(func(ctx SpecContext) { - cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock5()) + cruiseControlOperationReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMock5()) operation := generateCruiseControlOperation(opName1, namespace, kafkaCluster.GetName()) err := k8sClient.Create(ctx, &operation) Expect(err).NotTo(HaveOccurred()) @@ -161,7 +161,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() { }) When("there is an errored remove_broker and an add_broker operation", Serial, func() { JustBeforeEach(func(ctx SpecContext) { - cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock3()) + cruiseControlOperationReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMock3()) // First operation will get completedWithError operation := generateCruiseControlOperation(opName1, namespace, kafkaCluster.GetName()) err := k8sClient.Create(ctx, &operation) @@ -208,7 +208,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() { }) When("there is a new remove_broker and an errored remove_broker operation with pause annotation", Serial, func() { JustBeforeEach(func(ctx SpecContext) { - cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock4()) + cruiseControlOperationReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMock4()) operation := generateCruiseControlOperation(opName1, namespace, kafkaCluster.GetName()) operation.Labels["pause"] = "true" err := k8sClient.Create(ctx, &operation) @@ -257,7 +257,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() { }) When("there is a new remove_broker and an errored remove_broker operation with ignore ErrorPolicy", Serial, func() { JustBeforeEach(func(ctx SpecContext) { - cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock4()) + cruiseControlOperationReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMock4()) // Creating first operation operation := generateCruiseControlOperation(opName1, namespace, kafkaCluster.GetName()) operation.Spec.ErrorPolicy = v1alpha1.ErrorPolicyIgnore @@ -307,7 +307,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() { }) When("Cruise Control makes the Status operation async", Serial, func() { JustBeforeEach(func(ctx SpecContext) { - cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock7()) + cruiseControlOperationReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMock7()) operation := generateCruiseControlOperation("add-broker-operation", namespace, kafkaCluster.GetName()) err := k8sClient.Create(ctx, &operation) Expect(err).NotTo(HaveOccurred()) diff --git a/controllers/tests/cruisecontroltask_controller_test.go b/controllers/tests/cruisecontroltask_controller_test.go index 1b99978a2..8547de098 100644 --- a/controllers/tests/cruisecontroltask_controller_test.go +++ b/controllers/tests/cruisecontroltask_controller_test.go @@ -88,7 +88,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() { When("new storage is added", Serial, func() { JustBeforeEach(func(ctx SpecContext) { - kafkaClusterCCReconciler.ScaleFactory = NewMockScaleFactory(getScaleMockCCTask2([]string{mountPath})) + kafkaClusterCCReconciler.ScaleFactory = 
mocks.NewMockScaleFactory(getScaleMockCCTask2([]string{mountPath})) err := util.RetryOnConflict(util.DefaultBackOffForConflict, func() error { if err := k8sClient.Get(ctx, types.NamespacedName{ @@ -147,7 +147,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() { }) When("new storage is added but there is a not JBOD capacityConfig for that", Serial, func() { JustBeforeEach(func(ctx SpecContext) { - kafkaClusterCCReconciler.ScaleFactory = NewMockScaleFactory(getScaleMockCCTask2([]string{mountPath})) + kafkaClusterCCReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMockCCTask2([]string{mountPath})) err := k8sClient.Get(ctx, types.NamespacedName{ Name: kafkaCluster.Name, Namespace: kafkaCluster.Namespace, @@ -227,7 +227,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() { }) When("new storage is added and one broker is JBOD and another is not JBOD", Serial, func() { JustBeforeEach(func(ctx SpecContext) { - kafkaClusterCCReconciler.ScaleFactory = NewMockScaleFactory(getScaleMockCCTask2([]string{mountPath})) + kafkaClusterCCReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMockCCTask2([]string{mountPath})) err := k8sClient.Get(ctx, types.NamespacedName{ Name: kafkaCluster.Name, Namespace: kafkaCluster.Namespace, @@ -312,7 +312,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() { }) When("new broker is added", Serial, func() { JustBeforeEach(func(ctx SpecContext) { - kafkaClusterCCReconciler.ScaleFactory = NewMockScaleFactory(getScaleMockCCTask1()) + kafkaClusterCCReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMockCCTask1()) err := k8sClient.Get(ctx, types.NamespacedName{ Name: kafkaCluster.Name, Namespace: kafkaCluster.Namespace, @@ -400,7 +400,7 @@ var _ = Describe("CruiseControlTaskReconciler", func() { }) When("a broker is removed", Serial, func() { JustBeforeEach(func(ctx SpecContext) { - kafkaClusterCCReconciler.ScaleFactory = NewMockScaleFactory(getScaleMockCCTask1()) + kafkaClusterCCReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMockCCTask1()) err := util.RetryOnConflict(util.DefaultBackOffForConflict, func() error { if err := k8sClient.Get(ctx, types.NamespacedName{ Name: kafkaCluster.Name, @@ -447,12 +447,6 @@ var _ = Describe("CruiseControlTaskReconciler", func() { }) }) -func NewMockScaleFactory(mock scale.CruiseControlScaler) func(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster) (scale.CruiseControlScaler, error) { - return func(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster) (scale.CruiseControlScaler, error) { - return mock, nil - } -} - func getScaleMockCCTask1() *mocks.MockCruiseControlScaler { mockCtrl := gomock.NewController(GinkgoT()) scaleMock := mocks.NewMockCruiseControlScaler(mockCtrl) diff --git a/controllers/tests/mocks/scale_factory.go b/controllers/tests/mocks/scale_factory.go new file mode 100644 index 000000000..ca707b402 --- /dev/null +++ b/controllers/tests/mocks/scale_factory.go @@ -0,0 +1,28 @@ +// Copyright © 2023 Cisco Systems, Inc. and/or its affiliates +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package mocks + +import ( + "context" + + "github.com/banzaicloud/koperator/api/v1beta1" + "github.com/banzaicloud/koperator/pkg/scale" +) + +func NewMockScaleFactory(mock scale.CruiseControlScaler) func(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster) (scale.CruiseControlScaler, error) { + return func(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster) (scale.CruiseControlScaler, error) { + return mock, nil + } +} diff --git a/go.mod b/go.mod index 00cbd7668..2d7ef2c1f 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/banzaicloud/istio-client-go v0.0.17 github.com/banzaicloud/istio-operator/api/v2 v2.15.1 github.com/banzaicloud/k8s-objectmatcher v1.8.0 - github.com/banzaicloud/koperator/api v0.28.6 + github.com/banzaicloud/koperator/api v0.28.7 github.com/banzaicloud/koperator/properties v0.4.1 github.com/cert-manager/cert-manager v1.11.2 github.com/cisco-open/cluster-registry-controller/api v0.2.5 @@ -38,6 +38,7 @@ require ( require ( github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 // indirect + github.com/stretchr/objx v0.5.0 // indirect golang.org/x/tools v0.7.0 // indirect ) diff --git a/go.sum b/go.sum index ca2457a75..3b2fa7f1d 100644 --- a/go.sum +++ b/go.sum @@ -66,6 +66,8 @@ github.com/banzaicloud/k8s-objectmatcher v1.8.0 h1:Nugn25elKtPMTA2br+JgHNeSQ04sc github.com/banzaicloud/k8s-objectmatcher v1.8.0/go.mod h1:p2LSNAjlECf07fbhDyebTkPUIYnU05G+WfGgkTmgeMg= github.com/banzaicloud/koperator/api v0.28.6 h1:ZsOAXAsg34O78qVCEHx84cdp57HlCje6zjzXHhvtXf4= github.com/banzaicloud/koperator/api v0.28.6/go.mod h1:AGGQ+aTBklaaG8ErotNPlP/nS47MYLc/jFVW7AsDiEE= +github.com/banzaicloud/koperator/api v0.28.7 h1:G6ICLzuz6Tumcsl9ZaqZ46ccwdAc1rXjidP03v6Kqp4= +github.com/banzaicloud/koperator/api v0.28.7/go.mod h1:AGGQ+aTBklaaG8ErotNPlP/nS47MYLc/jFVW7AsDiEE= github.com/banzaicloud/koperator/properties v0.4.1 h1:SB2QgXlcK1Dc7Z1rg65PJifErDa8OQnoWCCJgmC7SGc= github.com/banzaicloud/koperator/properties v0.4.1/go.mod h1:TcL+llxuhW3UeQtVEDYEXGouFLF2P+LuZZVudSb6jyA= github.com/banzaicloud/operator-tools v0.28.0 h1:GSfc0qZr6zo7WrNxdgWZE1LcTChPU8QFYOTDirYVtIM= @@ -433,6 +435,7 @@ github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= diff --git a/pkg/kafkaclient/provider.go b/pkg/kafkaclient/provider.go index 4432ad34d..7a71372c8 100644 --- a/pkg/kafkaclient/provider.go +++ b/pkg/kafkaclient/provider.go @@ -15,6 +15,7 @@ package kafkaclient import ( + "github.com/stretchr/testify/mock" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/banzaicloud/koperator/api/v1beta1" @@ -45,3 +46,13 @@ func NewDefaultProvider() Provider { func (dp *defaultProvider) NewFromCluster(client client.Client, cluster *v1beta1.KafkaCluster) (KafkaClient, func(), error) { return 
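[Editor's note, sketch only] The per-file NewMockScaleFactory helpers removed from the controller test suites are consolidated into the new controllers/tests/mocks package above. A minimal sketch of how a test wires it, assuming the gomock-generated CruiseControlScaler mock lives in the same package (as the existing suites use it); the reconciler assignment in the trailing comment is illustrative.

package example

import (
	"testing"

	"github.com/golang/mock/gomock"

	mocks "github.com/banzaicloud/koperator/controllers/tests/mocks"
)

// sharedScaleFactoryForTests builds a ScaleFactory that always returns a canned Cruise Control scaler.
func sharedScaleFactoryForTests(t *testing.T) {
	mockCtrl := gomock.NewController(t)
	scaler := mocks.NewMockCruiseControlScaler(mockCtrl)
	// scaler.EXPECT()... add expectations as the individual test needs them.

	factory := mocks.NewMockScaleFactory(scaler)
	_ = factory // e.g. cruiseControlOperationReconciler.ScaleFactory = factory
}
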
NewFromCluster(client, cluster) } + +// MockedProvider is a Testify mock for providing Kafka clients that can be mocks too +type MockedProvider struct { + mock.Mock +} + +func (m *MockedProvider) NewFromCluster(client client.Client, cluster *v1beta1.KafkaCluster) (KafkaClient, func(), error) { + args := m.Called(client, cluster) + return args.Get(0).(KafkaClient), args.Get(1).(func()), args.Error(2) +} diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index 81d91a7ff..a76f196e3 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -23,7 +23,9 @@ import ( "strings" "emperror.dev/errors" + ccTypes "github.com/banzaicloud/go-cruise-control/pkg/types" "github.com/go-logr/logr" + "golang.org/x/exp/slices" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -31,12 +33,15 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" "sigs.k8s.io/controller-runtime/pkg/client" + properties "github.com/banzaicloud/koperator/properties/pkg" + apiutil "github.com/banzaicloud/koperator/api/util" "github.com/banzaicloud/k8s-objectmatcher/patch" "github.com/banzaicloud/koperator/api/v1alpha1" "github.com/banzaicloud/koperator/api/v1beta1" + banzaiv1beta1 "github.com/banzaicloud/koperator/api/v1beta1" "github.com/banzaicloud/koperator/pkg/errorfactory" "github.com/banzaicloud/koperator/pkg/jmxextractor" "github.com/banzaicloud/koperator/pkg/k8sutil" @@ -91,7 +96,8 @@ const ( // Reconciler implements the Component Reconciler type Reconciler struct { resources.Reconciler - kafkaClientProvider kafkaclient.Provider + kafkaClientProvider kafkaclient.Provider + CruiseControlScalerFactory func(ctx context.Context, kafkaCluster *banzaiv1beta1.KafkaCluster) (scale.CruiseControlScaler, error) } // New creates a new reconciler for Kafka @@ -102,9 +108,31 @@ func New(client client.Client, directClient client.Reader, cluster *v1beta1.Kafk DirectClient: directClient, KafkaCluster: cluster, }, - kafkaClientProvider: kafkaClientProvider, + kafkaClientProvider: kafkaClientProvider, + CruiseControlScalerFactory: scale.ScaleFactoryFn(), + } +} + +func getBrokerAzMap(cluster *v1beta1.KafkaCluster) map[int32]string { + brokerAzMap := make(map[int32]string) + for _, broker := range cluster.Spec.Brokers { + readOnlyConfigs, err := properties.NewFromString(broker.ReadOnlyConfig) + if err == nil { + brokerRack, brokerRackConfigFound := readOnlyConfigs.Get("broker.rack") + if brokerRackConfigFound { + brokerAzMap[broker.Id] = brokerRack.Value() + } + } + } + // if incomplete broker AZ information, consider all brokers as being in different AZs + if len(brokerAzMap) != len(cluster.Spec.Brokers) { + for _, broker := range cluster.Spec.Brokers { + brokerAzMap[broker.Id] = fmt.Sprintf("%d", broker.Id) + } } + return brokerAzMap } + func getCreatedPvcForBroker( ctx context.Context, c client.Reader, @@ -432,12 +460,11 @@ func (r *Reconciler) reconcileKafkaPodDelete(ctx context.Context, log logr.Logge if len(podsDeletedFromSpec) > 0 { if !arePodsAlreadyDeleted(podsDeletedFromSpec, log) { - cruiseControlURL := scale.CruiseControlURLFromKafkaCluster(r.KafkaCluster) // FIXME: we should reuse the context of the Kafka Controller - cc, err := scale.NewCruiseControlScaler(context.TODO(), scale.CruiseControlURLFromKafkaCluster(r.KafkaCluster)) + cc, err := r.CruiseControlScalerFactory(context.TODO(), r.KafkaCluster) if err != nil { return errorfactory.New(errorfactory.CruiseControlNotReady{}, err, - "failed to initialize Cruise Control 
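[Editor's note, sketch only] getBrokerAzMap above derives each broker's rack (AZ) from its broker.rack read-only config and deliberately falls back to treating every broker as its own AZ when that information is incomplete, which effectively disables multi-broker restarts instead of risking them across racks. A condensed restatement of the behaviour that TestGetBrokerAzMap asserts later in this change, written as if it lived in the same kafka package; values are illustrative.

package kafka

import "github.com/banzaicloud/koperator/api/v1beta1"

// exampleBrokerAzFallback shows both outcomes of getBrokerAzMap: full broker.rack
// coverage yields real rack keys, a single missing entry collapses to one-AZ-per-broker.
func exampleBrokerAzFallback() (complete, incomplete map[int32]string) {
	complete = getBrokerAzMap(&v1beta1.KafkaCluster{Spec: v1beta1.KafkaClusterSpec{Brokers: []v1beta1.Broker{
		{Id: 101, ReadOnlyConfig: "broker.rack=az1"},
		{Id: 201, ReadOnlyConfig: "broker.rack=az2"},
	}}}) // -> map[101:az1 201:az2]

	incomplete = getBrokerAzMap(&v1beta1.KafkaCluster{Spec: v1beta1.KafkaClusterSpec{Brokers: []v1beta1.Broker{
		{Id: 101, ReadOnlyConfig: "broker.rack=az1"},
		{Id: 201, ReadOnlyConfig: ""}, // no rack -> fall back for the whole cluster
	}}}) // -> map[101:101 201:201]
	return complete, incomplete
}
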
Scaler", "cruise control url", cruiseControlURL) + "failed to initialize Cruise Control Scaler", "cruise control url", scale.CruiseControlURLFromKafkaCluster(r.KafkaCluster)) } brokerStates := []scale.KafkaBrokerState{ @@ -874,18 +901,30 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo if err != nil { return errors.WrapIf(err, "failed to reconcile resource") } - for _, pod := range podList.Items { - pod := pod - if k8sutil.IsMarkedForDeletion(pod.ObjectMeta) { - return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("pod is still terminating"), "rolling upgrade in progress") - } - if k8sutil.IsPodContainsPendingContainer(&pod) { - return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("pod is still creating"), "rolling upgrade in progress") + // Check that all pods are present as in spec, before checking for terminating or pending pods, as we can have absent pods + if len(podList.Items) < len(r.KafkaCluster.Spec.Brokers) { + return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("pod count differs from brokers spec"), "rolling upgrade in progress") + } + + // Check if we support multiple broker restarts and restart only in same AZ, otherwise restart only 1 broker at once + terminatingOrPendingPods := getPodsInTerminatingOrPendingState(podList.Items) + if len(terminatingOrPendingPods) >= r.KafkaCluster.Spec.RollingUpgradeConfig.ConcurrentBrokerRestartCountPerRack { + return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New(strconv.Itoa(r.KafkaCluster.Spec.RollingUpgradeConfig.ConcurrentBrokerRestartCountPerRack)+" pod(s) is still terminating or creating"), "rolling upgrade in progress") + } + if r.KafkaCluster.Spec.RollingUpgradeConfig.ConcurrentBrokerRestartCountPerRack > 1 && len(terminatingOrPendingPods) > 0 { + err = r.checkCCRackAwareDistributionGoal() + if err != nil { + return err } } + kafkaBrokerAvailabilityZoneMap := getBrokerAzMap(r.KafkaCluster) + currentPodAz, _ := r.getBrokerAz(currentPod, kafkaBrokerAvailabilityZoneMap) + if r.KafkaCluster.Spec.RollingUpgradeConfig.ConcurrentBrokerRestartCountPerRack > 1 && r.existsTerminatingPodFromAnotherAz(currentPodAz, terminatingOrPendingPods, kafkaBrokerAvailabilityZoneMap) { + return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("pod is still terminating or creating from another AZ"), "rolling upgrade in progress") + } + // Check broker count with out-of-sync and offline replicas against the rolling upgrade failure threshold errorCount := r.KafkaCluster.Status.RollingUpgrade.ErrorCount - kClient, close, err := r.kafkaClientProvider.NewFromCluster(r.Client, r.KafkaCluster) if err != nil { return errorfactory.New(errorfactory.BrokersUnreachable{}, err, "could not connect to kafka brokers") @@ -905,7 +944,6 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo if len(outOfSyncReplicas) > 0 { log.Info("out-of-sync replicas", "IDs", outOfSyncReplicas) } - impactedReplicas := make(map[int32]struct{}) for _, brokerID := range allOfflineReplicas { impactedReplicas[brokerID] = struct{}{} @@ -913,12 +951,17 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo for _, brokerID := range outOfSyncReplicas { impactedReplicas[brokerID] = struct{}{} } - errorCount += len(impactedReplicas) - if errorCount >= r.KafkaCluster.Spec.RollingUpgradeConfig.FailureThreshold { return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("cluster is not 
healthy"), "rolling upgrade in progress") } + + // If multiple concurrent restarts and broker failures allowed, restart only brokers from the same AZ + if r.KafkaCluster.Spec.RollingUpgradeConfig.ConcurrentBrokerRestartCountPerRack > 1 && r.KafkaCluster.Spec.RollingUpgradeConfig.FailureThreshold > 1 { + if r.existsFailedBrokerFromAnotherRack(currentPodAz, impactedReplicas, kafkaBrokerAvailabilityZoneMap) { + return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("broker is not healthy from another AZ"), "rolling upgrade in progress") + } + } } } @@ -940,6 +983,53 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo return nil } +func (r *Reconciler) checkCCRackAwareDistributionGoal() error { + cruiseControlURL := scale.CruiseControlURLFromKafkaCluster(r.KafkaCluster) + cc, err := r.CruiseControlScalerFactory(context.TODO(), r.KafkaCluster) + if err != nil { + return errorfactory.New(errorfactory.CruiseControlNotReady{}, err, "failed to initialize Cruise Control", "cruise control url", cruiseControlURL) + } + status, err := cc.Status(context.Background()) + if err != nil { + return errorfactory.New(errorfactory.CruiseControlNotReady{}, errors.New("failed to get status from Cruise Control"), "rolling upgrade in progress") + } + if !slices.Contains(status.State.AnalyzerState.ReadyGoals, ccTypes.RackAwareDistributionGoal) { + return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("RackAwareDistributionGoal is not ready"), "rolling upgrade in progress") + } + for _, anomaly := range status.State.AnomalyDetectorState.RecentGoalViolations { + if slices.Contains(anomaly.FixableViolatedGoals, ccTypes.RackAwareDistributionGoal) || slices.Contains(anomaly.UnfixableViolatedGoals, ccTypes.RackAwareDistributionGoal) { + return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("RackAwareDistributionGoal is violated"), "rolling upgrade in progress") + } + } + return nil +} + +func (r *Reconciler) existsFailedBrokerFromAnotherRack(currentPodAz string, impactedReplicas map[int32]struct{}, kafkaBrokerAvailabilityZoneMap map[int32]string) bool { + if currentPodAz == "" && len(impactedReplicas) > 0 { + return true + } + for brokerWithFailure := range impactedReplicas { + if currentPodAz != kafkaBrokerAvailabilityZoneMap[brokerWithFailure] { + return true + } + } + return false +} + +func (r *Reconciler) existsTerminatingPodFromAnotherAz(currentPodAz string, terminatingOrPendingPods []corev1.Pod, kafkaBrokerAvailabilityZoneMap map[int32]string) bool { + if currentPodAz == "" && len(terminatingOrPendingPods) > 0 { + return true + } + for _, terminatingOrPendingPod := range terminatingOrPendingPods { + terminatingOrPendingPodAz, err := r.getBrokerAz(&terminatingOrPendingPod, kafkaBrokerAvailabilityZoneMap) + if err != nil || currentPodAz != terminatingOrPendingPodAz { + return true + } + } + return false +} + +//nolint:funlen func (r *Reconciler) reconcileKafkaPvc(ctx context.Context, log logr.Logger, brokersDesiredPvcs map[string][]*corev1.PersistentVolumeClaim) error { brokersVolumesState := make(map[string]map[string]v1beta1.VolumeState) var brokerIds []string @@ -1289,6 +1379,27 @@ func (r *Reconciler) determineControllerId() (int32, error) { return controllerID, nil } +func getPodsInTerminatingOrPendingState(items []corev1.Pod) []corev1.Pod { + var pods []corev1.Pod + for _, pod := range items { + if k8sutil.IsMarkedForDeletion(pod.ObjectMeta) { + pods = append(pods, pod) + } + if 
k8sutil.IsPodContainsPendingContainer(&pod) { + pods = append(pods, pod) + } + } + return pods +} + +func (r *Reconciler) getBrokerAz(pod *corev1.Pod, kafkaBrokerAvailabilityZoneMap map[int32]string) (string, error) { + brokerId, err := strconv.ParseInt(pod.Labels[v1beta1.BrokerIdLabelKey], 10, 32) + if err != nil { + return "", err + } + return kafkaBrokerAvailabilityZoneMap[int32(brokerId)], nil +} + func getServiceFromExternalListener(client client.Client, cluster *v1beta1.KafkaCluster, eListenerName string, ingressConfigName string) (*corev1.Service, error) { foundLBService := &corev1.Service{} diff --git a/pkg/resources/kafka/kafka_test.go b/pkg/resources/kafka/kafka_test.go index a1a5cda40..3b2de2259 100644 --- a/pkg/resources/kafka/kafka_test.go +++ b/pkg/resources/kafka/kafka_test.go @@ -18,13 +18,20 @@ import ( "context" "reflect" "testing" + "time" "emperror.dev/errors" + ccTypes "github.com/banzaicloud/go-cruise-control/pkg/types" "github.com/go-logr/logr" "github.com/golang/mock/gomock" "github.com/onsi/gomega" "github.com/stretchr/testify/assert" "sigs.k8s.io/controller-runtime/pkg/client" + logf "sigs.k8s.io/controller-runtime/pkg/log" + + "github.com/banzaicloud/koperator/pkg/scale" + + "github.com/banzaicloud/koperator/pkg/kafkaclient" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -32,6 +39,7 @@ import ( "github.com/banzaicloud/koperator/api/v1alpha1" "github.com/banzaicloud/koperator/api/v1beta1" + controllerMocks "github.com/banzaicloud/koperator/controllers/tests/mocks" "github.com/banzaicloud/koperator/pkg/resources" mocks "github.com/banzaicloud/koperator/pkg/resources/kafka/mocks" ) @@ -950,3 +958,654 @@ func TestGetServerPasswordKeysAndUsers(t *testing.T) { //nolint funlen }) } } + +// nolint funlen +func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { + t.Parallel() + testCases := []struct { + testName string + kafkaCluster v1beta1.KafkaCluster + desiredPod *corev1.Pod + currentPod *corev1.Pod + pods []corev1.Pod + allOfflineReplicas []int32 + outOfSyncReplicas []int32 + ccStatus *scale.StatusTaskResult + errorExpected bool + }{ + { + testName: "Pod is not deleted if pod list count different from spec", + kafkaCluster: v1beta1.KafkaCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka", + Namespace: "kafka", + }, + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{{Id: 101}, {Id: 201}, {Id: 301}}, + }, + Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading}, + }, + desiredPod: &corev1.Pod{}, + currentPod: &corev1.Pod{}, + pods: []corev1.Pod{}, + errorExpected: true, + }, + { + testName: "Pod is not deleted if allowed concurrent restarts not specified (default=1) and another pod is restarting", + kafkaCluster: v1beta1.KafkaCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka", + Namespace: "kafka", + }, + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{{Id: 101}, {Id: 201}, {Id: 301}}, + }, + Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading}, + }, + desiredPod: &corev1.Pod{}, + currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201"}}, + pods: []corev1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", DeletionTimestamp: &metav1.Time{Time: time.Now()}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301"}}, + }, + errorExpected: true, + }, + { + testName: "Pod is not deleted if allowed concurrent restarts equals pods restarting", + kafkaCluster: 
v1beta1.KafkaCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka", + Namespace: "kafka", + }, + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{{Id: 101}, {Id: 201}, {Id: 301}}, + RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{ + ConcurrentBrokerRestartCountPerRack: 2, + }, + }, + Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading}, + }, + desiredPod: &corev1.Pod{}, + currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-301"}}, + pods: []corev1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", DeletionTimestamp: &metav1.Time{Time: time.Now()}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", DeletionTimestamp: &metav1.Time{Time: time.Now()}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301"}}, + }, + errorExpected: true, + }, + { + testName: "Pod is not deleted if broker.rack is not set in all read-only configs, if another pod is restarting", + kafkaCluster: v1beta1.KafkaCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka", + Namespace: "kafka", + }, + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{{Id: 101}, {Id: 102}, {Id: 201}, {Id: 102}, {Id: 301}, {Id: 302}}, + RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{ + ConcurrentBrokerRestartCountPerRack: 2, + }, + }, + Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading}, + }, + desiredPod: &corev1.Pod{}, + currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-102", Labels: map[string]string{"brokerId": "102"}}}, + pods: []corev1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}, DeletionTimestamp: &metav1.Time{Time: time.Now()}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-102", Labels: map[string]string{"brokerId": "102"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-202", Labels: map[string]string{"brokerId": "202"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-302", Labels: map[string]string{"brokerId": "302"}}}, + }, + ccStatus: &scale.StatusTaskResult{ + State: &ccTypes.StateResult{ + AnalyzerState: ccTypes.AnalyzerState{ReadyGoals: []ccTypes.Goal{ccTypes.RackAwareDistributionGoal}}, + AnomalyDetectorState: ccTypes.AnomalyDetectorState{ + RecentGoalViolations: []ccTypes.AnomalyDetails{{UnfixableViolatedGoals: []ccTypes.Goal{}, FixableViolatedGoals: []ccTypes.Goal{}}}, + }, + }, + }, + errorExpected: true, + }, + { + testName: "Pod is not deleted if broker.rack is not set in some read-only configs, if another pod is restarting", + kafkaCluster: v1beta1.KafkaCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka", + Namespace: "kafka", + }, + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{ + {Id: 101, ReadOnlyConfig: "broker.rack=az1"}, + {Id: 102, ReadOnlyConfig: ""}, + {Id: 201, ReadOnlyConfig: "broker.rack=az2"}, + {Id: 202, ReadOnlyConfig: ""}, + {Id: 301, ReadOnlyConfig: "broker.rack=az3"}, + {Id: 302, ReadOnlyConfig: ""}}, + RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{ + ConcurrentBrokerRestartCountPerRack: 2, + }, + }, + Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading}, + }, + desiredPod: &corev1.Pod{}, + currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-102", Labels: map[string]string{"brokerId": "102"}}}, + pods: []corev1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: 
"kafka-101", Labels: map[string]string{"brokerId": "101"}, DeletionTimestamp: &metav1.Time{Time: time.Now()}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-102", Labels: map[string]string{"brokerId": "102"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-202", Labels: map[string]string{"brokerId": "202"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-302", Labels: map[string]string{"brokerId": "302"}}}, + }, + ccStatus: &scale.StatusTaskResult{ + State: &ccTypes.StateResult{ + AnalyzerState: ccTypes.AnalyzerState{ReadyGoals: []ccTypes.Goal{ccTypes.RackAwareDistributionGoal}}, + AnomalyDetectorState: ccTypes.AnomalyDetectorState{ + RecentGoalViolations: []ccTypes.AnomalyDetails{{UnfixableViolatedGoals: []ccTypes.Goal{}, FixableViolatedGoals: []ccTypes.Goal{}}}, + }, + }, + }, + errorExpected: true, + }, + { + testName: "Pod is not deleted if allowed concurrent restarts is not specified and failure threshold is reached", + kafkaCluster: v1beta1.KafkaCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka", + Namespace: "kafka", + }, + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{ + {Id: 101, ReadOnlyConfig: "broker.rack=az1"}, + {Id: 102, ReadOnlyConfig: "broker.rack=az1"}, + {Id: 201, ReadOnlyConfig: "broker.rack=az2"}, + {Id: 202, ReadOnlyConfig: "broker.rack=az2"}, + {Id: 301, ReadOnlyConfig: "broker.rack=az3"}, + {Id: 302, ReadOnlyConfig: "broker.rack=az3"}}, + RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{ + FailureThreshold: 1, + }, + }, + Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading}, + }, + desiredPod: &corev1.Pod{}, + currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-102", Labels: map[string]string{"brokerId": "102"}}}, + pods: []corev1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-102", Labels: map[string]string{"brokerId": "102"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-202", Labels: map[string]string{"brokerId": "202"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-302", Labels: map[string]string{"brokerId": "302"}}}, + }, + errorExpected: true, + }, + { + testName: "Pod is deleted if allowed concurrent restarts is default and failure threshold is not reached", + kafkaCluster: v1beta1.KafkaCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka", + Namespace: "kafka", + }, + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{ + {Id: 101, ReadOnlyConfig: "broker.rack=az1"}, + {Id: 102, ReadOnlyConfig: "broker.rack=az1"}, + {Id: 201, ReadOnlyConfig: "broker.rack=az2"}, + {Id: 202, ReadOnlyConfig: "broker.rack=az2"}, + {Id: 301, ReadOnlyConfig: "broker.rack=az3"}, + {Id: 302, ReadOnlyConfig: "broker.rack=az3"}}, + RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{ + FailureThreshold: 1, + ConcurrentBrokerRestartCountPerRack: 1, + }, + }, + Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading}, + }, + desiredPod: &corev1.Pod{}, + currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-102", Labels: map[string]string{"brokerId": "102"}}}, + pods: 
[]corev1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-102", Labels: map[string]string{"brokerId": "102"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-202", Labels: map[string]string{"brokerId": "202"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-302", Labels: map[string]string{"brokerId": "302"}}}, + }, + allOfflineReplicas: []int32{}, + outOfSyncReplicas: []int32{}, + errorExpected: false, + }, + { + testName: "Pod is not deleted if pod is restarting in another AZ, even if allowed concurrent restarts is not reached", + kafkaCluster: v1beta1.KafkaCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka", + Namespace: "kafka", + }, + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{ + {Id: 101, ReadOnlyConfig: "broker.rack=az1"}, + {Id: 102, ReadOnlyConfig: "broker.rack=az1"}, + {Id: 201, ReadOnlyConfig: "broker.rack=az2"}, + {Id: 202, ReadOnlyConfig: "broker.rack=az2"}, + {Id: 301, ReadOnlyConfig: "broker.rack=az3"}, + {Id: 302, ReadOnlyConfig: "broker.rack=az3"}}, + RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{ + FailureThreshold: 2, + ConcurrentBrokerRestartCountPerRack: 2, + }, + }, + Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading}, + }, + desiredPod: &corev1.Pod{}, + currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + pods: []corev1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}, DeletionTimestamp: &metav1.Time{Time: time.Now()}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-102", Labels: map[string]string{"brokerId": "102"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-202", Labels: map[string]string{"brokerId": "202"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-302", Labels: map[string]string{"brokerId": "302"}}}, + }, + ccStatus: &scale.StatusTaskResult{ + State: &ccTypes.StateResult{ + AnalyzerState: ccTypes.AnalyzerState{ReadyGoals: []ccTypes.Goal{ccTypes.RackAwareDistributionGoal}}, + AnomalyDetectorState: ccTypes.AnomalyDetectorState{ + RecentGoalViolations: []ccTypes.AnomalyDetails{{UnfixableViolatedGoals: []ccTypes.Goal{}, FixableViolatedGoals: []ccTypes.Goal{}}}, + }, + }, + }, + errorExpected: true, + }, + { + testName: "Pod is not deleted if failure is in another AZ, even if allowed concurrent restarts is not reached", + kafkaCluster: v1beta1.KafkaCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka", + Namespace: "kafka", + }, + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{ + {Id: 101, ReadOnlyConfig: "broker.rack=az1"}, + {Id: 102, ReadOnlyConfig: "broker.rack=az1"}, + {Id: 201, ReadOnlyConfig: "broker.rack=az2"}, + {Id: 202, ReadOnlyConfig: "broker.rack=az2"}, + {Id: 301, ReadOnlyConfig: "broker.rack=az3"}, + {Id: 302, ReadOnlyConfig: "broker.rack=az3"}}, + RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{ + FailureThreshold: 2, + ConcurrentBrokerRestartCountPerRack: 2, + }, + }, + Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading}, + 
}, + desiredPod: &corev1.Pod{}, + currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}}}, + pods: []corev1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-102", Labels: map[string]string{"brokerId": "102"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-202", Labels: map[string]string{"brokerId": "202"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-302", Labels: map[string]string{"brokerId": "302"}}}, + }, + allOfflineReplicas: []int32{}, + outOfSyncReplicas: []int32{201}, + errorExpected: true, + }, + { + testName: "Pod is deleted if all pods are running and CC RackAwareDistributionGoal is not ready and allowed concurrent restarts is not reached", + kafkaCluster: v1beta1.KafkaCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka", + Namespace: "kafka", + }, + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{ + {Id: 101, ReadOnlyConfig: "broker.rack=az1"}, + {Id: 102, ReadOnlyConfig: "broker.rack=az1"}, + {Id: 201, ReadOnlyConfig: "broker.rack=az2"}, + {Id: 202, ReadOnlyConfig: "broker.rack=az2"}, + {Id: 301, ReadOnlyConfig: "broker.rack=az3"}, + {Id: 302, ReadOnlyConfig: "broker.rack=az3"}}, + RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{ + FailureThreshold: 2, + ConcurrentBrokerRestartCountPerRack: 2, + }, + }, + Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading}, + }, + desiredPod: &corev1.Pod{}, + currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-102", Labels: map[string]string{"brokerId": "102"}}}, + pods: []corev1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-102", Labels: map[string]string{"brokerId": "102"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-202", Labels: map[string]string{"brokerId": "202"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-302", Labels: map[string]string{"brokerId": "302"}}}, + }, + allOfflineReplicas: []int32{}, + outOfSyncReplicas: []int32{101}, + errorExpected: false, + }, + { + testName: "Pod is deleted if failure is in same AZ and allowed concurrent restarts is not reached", + kafkaCluster: v1beta1.KafkaCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka", + Namespace: "kafka", + }, + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{ + {Id: 101, ReadOnlyConfig: "broker.rack=az1"}, + {Id: 102, ReadOnlyConfig: "broker.rack=az1"}, + {Id: 201, ReadOnlyConfig: "broker.rack=az2"}, + {Id: 202, ReadOnlyConfig: "broker.rack=az2"}, + {Id: 301, ReadOnlyConfig: "broker.rack=az3"}, + {Id: 302, ReadOnlyConfig: "broker.rack=az3"}}, + RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{ + FailureThreshold: 2, + ConcurrentBrokerRestartCountPerRack: 2, + }, + }, + Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading}, + }, + desiredPod: &corev1.Pod{}, + currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-102", Labels: map[string]string{"brokerId": "102"}}}, + pods: []corev1.Pod{ + 
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}, DeletionTimestamp: &metav1.Time{Time: time.Now()}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-102", Labels: map[string]string{"brokerId": "102"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-202", Labels: map[string]string{"brokerId": "202"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-302", Labels: map[string]string{"brokerId": "302"}}}, + }, + allOfflineReplicas: []int32{}, + outOfSyncReplicas: []int32{101}, + ccStatus: &scale.StatusTaskResult{ + State: &ccTypes.StateResult{ + AnalyzerState: ccTypes.AnalyzerState{ReadyGoals: []ccTypes.Goal{ccTypes.RackAwareDistributionGoal}}, + AnomalyDetectorState: ccTypes.AnomalyDetectorState{ + RecentGoalViolations: []ccTypes.AnomalyDetails{{UnfixableViolatedGoals: []ccTypes.Goal{}, FixableViolatedGoals: []ccTypes.Goal{}}}, + }, + }, + }, + errorExpected: false, + }, + { + testName: "Pod is not deleted if pod is restarting in another AZ, if brokers per AZ < tolerated failures", + kafkaCluster: v1beta1.KafkaCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka", + Namespace: "kafka", + }, + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{ + {Id: 101, ReadOnlyConfig: "broker.rack=az1"}, + {Id: 201, ReadOnlyConfig: "broker.rack=az2"}, + {Id: 301, ReadOnlyConfig: "broker.rack=az3"}, + }, + RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{ + FailureThreshold: 2, + ConcurrentBrokerRestartCountPerRack: 2, + }, + }, + Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading}, + }, + desiredPod: &corev1.Pod{}, + currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + pods: []corev1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}, DeletionTimestamp: &metav1.Time{Time: time.Now()}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, + }, + ccStatus: &scale.StatusTaskResult{ + State: &ccTypes.StateResult{ + AnalyzerState: ccTypes.AnalyzerState{ReadyGoals: []ccTypes.Goal{ccTypes.RackAwareDistributionGoal}}, + AnomalyDetectorState: ccTypes.AnomalyDetectorState{ + RecentGoalViolations: []ccTypes.AnomalyDetails{{UnfixableViolatedGoals: []ccTypes.Goal{}, FixableViolatedGoals: []ccTypes.Goal{}}}, + }, + }, + }, + errorExpected: true, + }, + { + testName: "Pod is not deleted if there are out-of-sync replicas in another AZ, if brokers per AZ < tolerated failures", + kafkaCluster: v1beta1.KafkaCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka", + Namespace: "kafka", + }, + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{ + {Id: 101, ReadOnlyConfig: "broker.rack=az1"}, + {Id: 201, ReadOnlyConfig: "broker.rack=az2"}, + {Id: 301, ReadOnlyConfig: "broker.rack=az3"}, + }, + RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{ + FailureThreshold: 2, + ConcurrentBrokerRestartCountPerRack: 2, + }, + }, + Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading}, + }, + desiredPod: &corev1.Pod{}, + currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + pods: 
[]corev1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, + }, + allOfflineReplicas: []int32{}, + outOfSyncReplicas: []int32{101}, + errorExpected: true, + }, + { + testName: "Pod is not deleted if there are offline replicas in another AZ, if brokers per AZ < tolerated failures", + kafkaCluster: v1beta1.KafkaCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka", + Namespace: "kafka", + }, + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{ + {Id: 101, ReadOnlyConfig: "broker.rack=az1"}, + {Id: 201, ReadOnlyConfig: "broker.rack=az2"}, + {Id: 301, ReadOnlyConfig: "broker.rack=az3"}, + }, + RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{ + FailureThreshold: 2, + ConcurrentBrokerRestartCountPerRack: 2, + }, + }, + Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading}, + }, + desiredPod: &corev1.Pod{}, + currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + pods: []corev1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, + }, + allOfflineReplicas: []int32{101}, + outOfSyncReplicas: []int32{}, + errorExpected: true, + }, + { + testName: "Pod is not deleted if pod is restarting in another AZ, if broker rack value contains dashes", + kafkaCluster: v1beta1.KafkaCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka", + Namespace: "kafka", + }, + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{ + {Id: 101, ReadOnlyConfig: "broker.rack=az-1"}, + {Id: 201, ReadOnlyConfig: "broker.rack=az-2"}, + {Id: 301, ReadOnlyConfig: "broker.rack=az-3"}, + }, + RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{ + FailureThreshold: 2, + ConcurrentBrokerRestartCountPerRack: 2, + }, + }, + Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading}, + }, + desiredPod: &corev1.Pod{}, + currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + pods: []corev1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}, DeletionTimestamp: &metav1.Time{Time: time.Now()}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, + }, + ccStatus: &scale.StatusTaskResult{ + State: &ccTypes.StateResult{ + AnalyzerState: ccTypes.AnalyzerState{ReadyGoals: []ccTypes.Goal{ccTypes.RackAwareDistributionGoal}}, + AnomalyDetectorState: ccTypes.AnomalyDetectorState{ + RecentGoalViolations: []ccTypes.AnomalyDetails{{UnfixableViolatedGoals: []ccTypes.Goal{}, FixableViolatedGoals: []ccTypes.Goal{}}}, + }, + }, + }, + errorExpected: true, + }, + } + + mockCtrl := gomock.NewController(t) + + for _, test := range testCases { + mockClient := mocks.NewMockClient(mockCtrl) + mockKafkaClientProvider := new(kafkaclient.MockedProvider) + + t.Run(test.testName, func(t *testing.T) { + r := New(mockClient, nil, &test.kafkaCluster, mockKafkaClientProvider) + 
+ // Mock client + mockClient.EXPECT().List( + context.TODO(), + gomock.AssignableToTypeOf(&corev1.PodList{}), + client.InNamespace("kafka"), + gomock.Any(), + ).Do(func(ctx context.Context, list *corev1.PodList, opts ...client.ListOption) { + list.Items = test.pods + }).Return(nil) + if !test.errorExpected { + mockClient.EXPECT().Delete(context.TODO(), test.currentPod).Return(nil) + } + + // Mock kafka client + mockedKafkaClient := mocks.NewMockKafkaClient(mockCtrl) + if test.allOfflineReplicas != nil { + mockedKafkaClient.EXPECT().AllOfflineReplicas().Return(test.allOfflineReplicas, nil) + } + if test.outOfSyncReplicas != nil { + mockedKafkaClient.EXPECT().OutOfSyncReplicas().Return(test.outOfSyncReplicas, nil) + } + mockKafkaClientProvider.On("NewFromCluster", mockClient, &test.kafkaCluster).Return(mockedKafkaClient, func() {}, nil) + + // Mock Cruise Control client + mockCruiseControl := controllerMocks.NewMockCruiseControlScaler(mockCtrl) + if test.ccStatus != nil { + mockCruiseControl.EXPECT().Status(context.Background()).Return(*test.ccStatus, nil) + } + r.CruiseControlScalerFactory = controllerMocks.NewMockScaleFactory(mockCruiseControl) + + // Call the handleRollingUpgrade function with the provided test.desiredPod and test.currentPod + err := r.handleRollingUpgrade(logf.Log, test.desiredPod, test.currentPod, reflect.TypeOf(test.desiredPod)) + + // Test that the expected error is returned + if test.errorExpected { + assert.NotNil(t, err, "Expected an error but got nil") + } else { + assert.Nil(t, err, "Expected no error but got one") + } + }) + } +} + +func TestGetBrokerAzMap(t *testing.T) { + t.Parallel() + testCases := []struct { + testName string + kafkaCluster v1beta1.KafkaCluster + expectedAzMap map[int32]string + }{ + { + testName: "Brokers have different AZs if no broker rack value is set", + kafkaCluster: v1beta1.KafkaCluster{ + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{ + {Id: 101, ReadOnlyConfig: ""}, + {Id: 201, ReadOnlyConfig: ""}, + {Id: 301, ReadOnlyConfig: ""}, + }, + }, + }, + expectedAzMap: map[int32]string{101: "101", 201: "201", 301: "301"}, + }, + { + testName: "Brokers have different AZs if one broker has no broker rack value set", + kafkaCluster: v1beta1.KafkaCluster{ + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{ + {Id: 101, ReadOnlyConfig: "broker.rack=az1"}, + {Id: 102, ReadOnlyConfig: ""}, + {Id: 201, ReadOnlyConfig: "broker.rack=az2"}, + {Id: 202, ReadOnlyConfig: "broker.rack=az2"}, + {Id: 301, ReadOnlyConfig: "broker.rack=az3"}, + {Id: 302, ReadOnlyConfig: "broker.rack=az3"}, + }, + }, + }, + expectedAzMap: map[int32]string{101: "101", 102: "102", 201: "201", 202: "202", 301: "301", 302: "302"}, + }, + { + testName: "Brokers have different AZs if read only configs is a corrupted string for one broker", + kafkaCluster: v1beta1.KafkaCluster{ + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{ + {Id: 101, ReadOnlyConfig: "broker.rack;az1"}, + {Id: 201, ReadOnlyConfig: "broker.rack=az2"}, + {Id: 301, ReadOnlyConfig: "broker.rack=az3"}, + }, + }, + }, + expectedAzMap: map[int32]string{101: "101", 201: "201", 301: "301"}, + }, + { + testName: "Brokers have correct AZs if read only configs is valid for all brokers", + kafkaCluster: v1beta1.KafkaCluster{ + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{ + {Id: 101, ReadOnlyConfig: "broker.rack=az-1"}, + {Id: 102, ReadOnlyConfig: "broker.rack=az-1"}, + {Id: 201, ReadOnlyConfig: "broker.rack=az-2"}, + {Id: 202, ReadOnlyConfig: "broker.rack=az-2"}, + {Id: 
301, ReadOnlyConfig: "broker.rack=az-3"}, + {Id: 302, ReadOnlyConfig: "broker.rack=az-3"}, + }, + }, + }, + expectedAzMap: map[int32]string{ + 101: "az-1", + 102: "az-1", + 201: "az-2", + 202: "az-2", + 301: "az-3", + 302: "az-3", + }, + }, + } + + for _, test := range testCases { + t.Run(test.testName, func(t *testing.T) { + azMap := getBrokerAzMap(&test.kafkaCluster) + assert.Equal(t, test.expectedAzMap, azMap) + }) + } +} diff --git a/pkg/resources/kafka/mocks/KafkaClient.go b/pkg/resources/kafka/mocks/KafkaClient.go new file mode 100644 index 000000000..e2b10acdd --- /dev/null +++ b/pkg/resources/kafka/mocks/KafkaClient.go @@ -0,0 +1,371 @@ +// Copyright 2023 Cisco Systems, Inc. and/or its affiliates +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// Code generated by MockGen. DO NOT EDIT. +// Source: pkg/kafkaclient/client.go + +// Package mocks is a generated GoMock package. +package mocks + +import ( + reflect "reflect" + + sarama "github.com/Shopify/sarama" + v1alpha1 "github.com/banzaicloud/koperator/api/v1alpha1" + kafkaclient "github.com/banzaicloud/koperator/pkg/kafkaclient" + gomock "github.com/golang/mock/gomock" +) + +// MockKafkaClient is a mock of KafkaClient interface. +type MockKafkaClient struct { + ctrl *gomock.Controller + recorder *MockKafkaClientMockRecorder +} + +// MockKafkaClientMockRecorder is the mock recorder for MockKafkaClient. +type MockKafkaClientMockRecorder struct { + mock *MockKafkaClient +} + +// NewMockKafkaClient creates a new mock instance. +func NewMockKafkaClient(ctrl *gomock.Controller) *MockKafkaClient { + mock := &MockKafkaClient{ctrl: ctrl} + mock.recorder = &MockKafkaClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockKafkaClient) EXPECT() *MockKafkaClientMockRecorder { + return m.recorder +} + +// AllOfflineReplicas mocks base method. +func (m *MockKafkaClient) AllOfflineReplicas() ([]int32, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AllOfflineReplicas") + ret0, _ := ret[0].([]int32) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// AllOfflineReplicas indicates an expected call of AllOfflineReplicas. +func (mr *MockKafkaClientMockRecorder) AllOfflineReplicas() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AllOfflineReplicas", reflect.TypeOf((*MockKafkaClient)(nil).AllOfflineReplicas)) +} + +// AlterClusterWideConfig mocks base method. +func (m *MockKafkaClient) AlterClusterWideConfig(arg0 map[string]*string, arg1 bool) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AlterClusterWideConfig", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// AlterClusterWideConfig indicates an expected call of AlterClusterWideConfig. 
+func (mr *MockKafkaClientMockRecorder) AlterClusterWideConfig(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AlterClusterWideConfig", reflect.TypeOf((*MockKafkaClient)(nil).AlterClusterWideConfig), arg0, arg1) +} + +// AlterPerBrokerConfig mocks base method. +func (m *MockKafkaClient) AlterPerBrokerConfig(arg0 int32, arg1 map[string]*string, arg2 bool) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AlterPerBrokerConfig", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// AlterPerBrokerConfig indicates an expected call of AlterPerBrokerConfig. +func (mr *MockKafkaClientMockRecorder) AlterPerBrokerConfig(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AlterPerBrokerConfig", reflect.TypeOf((*MockKafkaClient)(nil).AlterPerBrokerConfig), arg0, arg1, arg2) +} + +// Brokers mocks base method. +func (m *MockKafkaClient) Brokers() map[int32]string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Brokers") + ret0, _ := ret[0].(map[int32]string) + return ret0 +} + +// Brokers indicates an expected call of Brokers. +func (mr *MockKafkaClientMockRecorder) Brokers() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Brokers", reflect.TypeOf((*MockKafkaClient)(nil).Brokers)) +} + +// Close mocks base method. +func (m *MockKafkaClient) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockKafkaClientMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockKafkaClient)(nil).Close)) +} + +// CreateTopic mocks base method. +func (m *MockKafkaClient) CreateTopic(arg0 *kafkaclient.CreateTopicOptions) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateTopic", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// CreateTopic indicates an expected call of CreateTopic. +func (mr *MockKafkaClientMockRecorder) CreateTopic(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateTopic", reflect.TypeOf((*MockKafkaClient)(nil).CreateTopic), arg0) +} + +// CreateUserACLs mocks base method. +func (m *MockKafkaClient) CreateUserACLs(arg0 v1alpha1.KafkaAccessType, arg1 v1alpha1.KafkaPatternType, arg2, arg3 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateUserACLs", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(error) + return ret0 +} + +// CreateUserACLs indicates an expected call of CreateUserACLs. +func (mr *MockKafkaClientMockRecorder) CreateUserACLs(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateUserACLs", reflect.TypeOf((*MockKafkaClient)(nil).CreateUserACLs), arg0, arg1, arg2, arg3) +} + +// DeleteTopic mocks base method. +func (m *MockKafkaClient) DeleteTopic(arg0 string, arg1 bool) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteTopic", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteTopic indicates an expected call of DeleteTopic. 
+func (mr *MockKafkaClientMockRecorder) DeleteTopic(arg0, arg1 interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteTopic", reflect.TypeOf((*MockKafkaClient)(nil).DeleteTopic), arg0, arg1)
+}
+
+// DeleteUserACLs mocks base method.
+func (m *MockKafkaClient) DeleteUserACLs(arg0 string, arg1 v1alpha1.KafkaPatternType) error {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "DeleteUserACLs", arg0, arg1)
+	ret0, _ := ret[0].(error)
+	return ret0
+}
+
+// DeleteUserACLs indicates an expected call of DeleteUserACLs.
+func (mr *MockKafkaClientMockRecorder) DeleteUserACLs(arg0, arg1 interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteUserACLs", reflect.TypeOf((*MockKafkaClient)(nil).DeleteUserACLs), arg0, arg1)
+}
+
+// DescribeCluster mocks base method.
+func (m *MockKafkaClient) DescribeCluster() ([]*sarama.Broker, int32, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "DescribeCluster")
+	ret0, _ := ret[0].([]*sarama.Broker)
+	ret1, _ := ret[1].(int32)
+	ret2, _ := ret[2].(error)
+	return ret0, ret1, ret2
+}
+
+// DescribeCluster indicates an expected call of DescribeCluster.
+func (mr *MockKafkaClientMockRecorder) DescribeCluster() *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DescribeCluster", reflect.TypeOf((*MockKafkaClient)(nil).DescribeCluster))
+}
+
+// DescribeClusterWideConfig mocks base method.
+func (m *MockKafkaClient) DescribeClusterWideConfig() ([]sarama.ConfigEntry, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "DescribeClusterWideConfig")
+	ret0, _ := ret[0].([]sarama.ConfigEntry)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// DescribeClusterWideConfig indicates an expected call of DescribeClusterWideConfig.
+func (mr *MockKafkaClientMockRecorder) DescribeClusterWideConfig() *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DescribeClusterWideConfig", reflect.TypeOf((*MockKafkaClient)(nil).DescribeClusterWideConfig))
+}
+
+// DescribePerBrokerConfig mocks base method.
+func (m *MockKafkaClient) DescribePerBrokerConfig(arg0 int32, arg1 []string) ([]*sarama.ConfigEntry, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "DescribePerBrokerConfig", arg0, arg1)
+	ret0, _ := ret[0].([]*sarama.ConfigEntry)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// DescribePerBrokerConfig indicates an expected call of DescribePerBrokerConfig.
+func (mr *MockKafkaClientMockRecorder) DescribePerBrokerConfig(arg0, arg1 interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DescribePerBrokerConfig", reflect.TypeOf((*MockKafkaClient)(nil).DescribePerBrokerConfig), arg0, arg1)
+}
+
+// DescribeTopic mocks base method.
+func (m *MockKafkaClient) DescribeTopic(arg0 string) (*sarama.TopicMetadata, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "DescribeTopic", arg0)
+	ret0, _ := ret[0].(*sarama.TopicMetadata)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// DescribeTopic indicates an expected call of DescribeTopic.
+func (mr *MockKafkaClientMockRecorder) DescribeTopic(arg0 interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DescribeTopic", reflect.TypeOf((*MockKafkaClient)(nil).DescribeTopic), arg0)
+}
+
+// EnsurePartitionCount mocks base method.
+func (m *MockKafkaClient) EnsurePartitionCount(arg0 string, arg1 int32) (bool, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "EnsurePartitionCount", arg0, arg1)
+	ret0, _ := ret[0].(bool)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// EnsurePartitionCount indicates an expected call of EnsurePartitionCount.
+func (mr *MockKafkaClientMockRecorder) EnsurePartitionCount(arg0, arg1 interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EnsurePartitionCount", reflect.TypeOf((*MockKafkaClient)(nil).EnsurePartitionCount), arg0, arg1)
+}
+
+// EnsureTopicConfig mocks base method.
+func (m *MockKafkaClient) EnsureTopicConfig(arg0 string, arg1 map[string]*string) error {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "EnsureTopicConfig", arg0, arg1)
+	ret0, _ := ret[0].(error)
+	return ret0
+}
+
+// EnsureTopicConfig indicates an expected call of EnsureTopicConfig.
+func (mr *MockKafkaClientMockRecorder) EnsureTopicConfig(arg0, arg1 interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EnsureTopicConfig", reflect.TypeOf((*MockKafkaClient)(nil).EnsureTopicConfig), arg0, arg1)
+}
+
+// GetTopic mocks base method.
+func (m *MockKafkaClient) GetTopic(arg0 string) (*sarama.TopicDetail, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "GetTopic", arg0)
+	ret0, _ := ret[0].(*sarama.TopicDetail)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// GetTopic indicates an expected call of GetTopic.
+func (mr *MockKafkaClientMockRecorder) GetTopic(arg0 interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTopic", reflect.TypeOf((*MockKafkaClient)(nil).GetTopic), arg0)
+}
+
+// ListTopics mocks base method.
+func (m *MockKafkaClient) ListTopics() (map[string]sarama.TopicDetail, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "ListTopics")
+	ret0, _ := ret[0].(map[string]sarama.TopicDetail)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// ListTopics indicates an expected call of ListTopics.
+func (mr *MockKafkaClientMockRecorder) ListTopics() *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListTopics", reflect.TypeOf((*MockKafkaClient)(nil).ListTopics))
+}
+
+// ListUserACLs mocks base method.
+func (m *MockKafkaClient) ListUserACLs() ([]sarama.ResourceAcls, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "ListUserACLs")
+	ret0, _ := ret[0].([]sarama.ResourceAcls)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// ListUserACLs indicates an expected call of ListUserACLs.
+func (mr *MockKafkaClientMockRecorder) ListUserACLs() *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListUserACLs", reflect.TypeOf((*MockKafkaClient)(nil).ListUserACLs))
+}
+
+// NumBrokers mocks base method.
+func (m *MockKafkaClient) NumBrokers() int {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "NumBrokers")
+	ret0, _ := ret[0].(int)
+	return ret0
+}
+
+// NumBrokers indicates an expected call of NumBrokers.
+func (mr *MockKafkaClientMockRecorder) NumBrokers() *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NumBrokers", reflect.TypeOf((*MockKafkaClient)(nil).NumBrokers))
+}
+
+// Open mocks base method.
+func (m *MockKafkaClient) Open() error {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "Open")
+	ret0, _ := ret[0].(error)
+	return ret0
+}
+
+// Open indicates an expected call of Open.
+func (mr *MockKafkaClientMockRecorder) Open() *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Open", reflect.TypeOf((*MockKafkaClient)(nil).Open))
+}
+
+// OutOfSyncReplicas mocks base method.
+func (m *MockKafkaClient) OutOfSyncReplicas() ([]int32, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "OutOfSyncReplicas")
+	ret0, _ := ret[0].([]int32)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// OutOfSyncReplicas indicates an expected call of OutOfSyncReplicas.
+func (mr *MockKafkaClientMockRecorder) OutOfSyncReplicas() *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OutOfSyncReplicas", reflect.TypeOf((*MockKafkaClient)(nil).OutOfSyncReplicas))
+}
+
+// TopicMetaToStatus mocks base method.
+func (m *MockKafkaClient) TopicMetaToStatus(meta *sarama.TopicMetadata) *v1alpha1.KafkaTopicStatus {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "TopicMetaToStatus", meta)
+	ret0, _ := ret[0].(*v1alpha1.KafkaTopicStatus)
+	return ret0
+}
+
+// TopicMetaToStatus indicates an expected call of TopicMetaToStatus.
+func (mr *MockKafkaClientMockRecorder) TopicMetaToStatus(meta interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TopicMetaToStatus", reflect.TypeOf((*MockKafkaClient)(nil).TopicMetaToStatus), meta)
+}
diff --git a/pkg/scale/scale.go b/pkg/scale/scale.go
index 5fcac7ea3..90afd7ff5 100644
--- a/pkg/scale/scale.go
+++ b/pkg/scale/scale.go
@@ -137,6 +137,7 @@ func (cc *cruiseControlScaler) Status(ctx context.Context) (StatusTaskResult, er
 			State: v1beta1.CruiseControlTaskCompleted,
 		},
 		Status: &status,
+		State:  resp.Result,
 	}, nil
 }
 
diff --git a/pkg/scale/types.go b/pkg/scale/types.go
index c383e30d5..6a7c0bcbf 100644
--- a/pkg/scale/types.go
+++ b/pkg/scale/types.go
@@ -64,6 +64,7 @@ const (
 type StatusTaskResult struct {
 	TaskResult *Result
 	Status     *CruiseControlStatus
+	State      *types.StateResult
 }
 
 // CruiseControlStatus struct is used to describe internal state of Cruise Control.
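
Note (illustrative, not part of the patch): the TestGetBrokerAzMap table earlier in this diff exercises a broker-to-AZ mapping derived from the broker.rack entry in each broker's read-only config. The sketch below shows one way such a helper could look given those expectations. The signature matches the call in the test, but the body is an assumption, not the PR's actual implementation; in particular, the fallback used for brokers that have no broker.rack entry is invented here and is marked as such in the comments.

// package kafka -- illustrative sketch only, assumes v1beta1.Broker exposes Id and ReadOnlyConfig as used in the test table.
package kafka

import (
	"strconv"
	"strings"

	"github.com/banzaicloud/koperator/api/v1beta1"
)

// getBrokerAzMap (sketch) maps each broker ID to the availability zone named by
// its broker.rack read-only config entry, e.g. "broker.rack=az-1" -> "az-1".
func getBrokerAzMap(cluster *v1beta1.KafkaCluster) map[int32]string {
	azMap := make(map[int32]string, len(cluster.Spec.Brokers))
	for _, broker := range cluster.Spec.Brokers {
		rack := ""
		// ReadOnlyConfig is a newline-separated list of key=value pairs.
		for _, line := range strings.Split(broker.ReadOnlyConfig, "\n") {
			if key, value, found := strings.Cut(line, "="); found && strings.TrimSpace(key) == "broker.rack" {
				rack = strings.TrimSpace(value)
			}
		}
		if rack == "" {
			// Assumption for this sketch: without rack information, treat the broker
			// as its own AZ so concurrent restarts stay conservative.
			rack = strconv.Itoa(int(broker.Id))
		}
		azMap[broker.Id] = rack
	}
	return azMap
}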
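
Note (illustrative, not part of the patch): the generated MockKafkaClient is consumed through the usual gomock controller. A minimal sketch follows, assuming a hypothetical standalone test; only NewMockKafkaClient, EXPECT, Brokers, AllOfflineReplicas, and OutOfSyncReplicas come from the generated code above, while the test name and expected values are made up for the example.

package kafka_test

import (
	"testing"

	"github.com/golang/mock/gomock"
	"github.com/stretchr/testify/assert"

	"github.com/banzaicloud/koperator/pkg/resources/kafka/mocks"
)

func TestMockKafkaClientUsage(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	// Stub the broker view and replica health that a reconciler would normally
	// fetch from a live Kafka cluster.
	mockClient := mocks.NewMockKafkaClient(ctrl)
	mockClient.EXPECT().Brokers().Return(map[int32]string{
		101: "kafka-101:29092",
		201: "kafka-201:29092",
	}).AnyTimes()
	mockClient.EXPECT().AllOfflineReplicas().Return([]int32{}, nil).AnyTimes()
	mockClient.EXPECT().OutOfSyncReplicas().Return([]int32{}, nil).AnyTimes()

	// A healthy cluster reports no offline replicas.
	offline, err := mockClient.AllOfflineReplicas()
	assert.NoError(t, err)
	assert.Empty(t, offline)
}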