From adac7932a9524bfd57b4949693c7c0e558c1a247 Mon Sep 17 00:00:00 2001 From: Lucian Ilie Date: Thu, 3 Aug 2023 15:01:14 +0300 Subject: [PATCH] Address PR comments - part 6 --- pkg/resources/kafka/kafka.go | 14 +++++++------- pkg/resources/kafka/kafka_test.go | 14 +++++++------- pkg/scale/scale.go | 2 +- pkg/scale/types.go | 2 +- 4 files changed, 16 insertions(+), 16 deletions(-) diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index 698b1e75d..a76f196e3 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -23,7 +23,7 @@ import ( "strings" "emperror.dev/errors" - types2 "github.com/banzaicloud/go-cruise-control/pkg/types" + ccTypes "github.com/banzaicloud/go-cruise-control/pkg/types" "github.com/go-logr/logr" "golang.org/x/exp/slices" corev1 "k8s.io/api/core/v1" @@ -912,7 +912,7 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New(strconv.Itoa(r.KafkaCluster.Spec.RollingUpgradeConfig.ConcurrentBrokerRestartCountPerRack)+" pod(s) is still terminating or creating"), "rolling upgrade in progress") } if r.KafkaCluster.Spec.RollingUpgradeConfig.ConcurrentBrokerRestartCountPerRack > 1 && len(terminatingOrPendingPods) > 0 { - err = r.isCruiseControlRackAwareDistributionGoalViolated() + err = r.checkCCRackAwareDistributionGoal() if err != nil { return err } @@ -983,21 +983,21 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo return nil } -func (r *Reconciler) isCruiseControlRackAwareDistributionGoalViolated() error { +func (r *Reconciler) checkCCRackAwareDistributionGoal() error { cruiseControlURL := scale.CruiseControlURLFromKafkaCluster(r.KafkaCluster) cc, err := r.CruiseControlScalerFactory(context.TODO(), r.KafkaCluster) if err != nil { return errorfactory.New(errorfactory.CruiseControlNotReady{}, err, "failed to initialize Cruise Control", "cruise control url", cruiseControlURL) } - status, err := cc.Status(context.TODO()) + status, err := cc.Status(context.Background()) if err != nil { return errorfactory.New(errorfactory.CruiseControlNotReady{}, errors.New("failed to get status from Cruise Control"), "rolling upgrade in progress") } - if !slices.Contains(status.Result.AnalyzerState.ReadyGoals, types2.RackAwareDistributionGoal) { + if !slices.Contains(status.State.AnalyzerState.ReadyGoals, ccTypes.RackAwareDistributionGoal) { return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("RackAwareDistributionGoal is not ready"), "rolling upgrade in progress") } - for _, anomaly := range status.Result.AnomalyDetectorState.RecentGoalViolations { - if slices.Contains(anomaly.FixableViolatedGoals, types2.RackAwareDistributionGoal) || slices.Contains(anomaly.UnfixableViolatedGoals, types2.RackAwareDistributionGoal) { + for _, anomaly := range status.State.AnomalyDetectorState.RecentGoalViolations { + if slices.Contains(anomaly.FixableViolatedGoals, ccTypes.RackAwareDistributionGoal) || slices.Contains(anomaly.UnfixableViolatedGoals, ccTypes.RackAwareDistributionGoal) { return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("RackAwareDistributionGoal is violated"), "rolling upgrade in progress") } } diff --git a/pkg/resources/kafka/kafka_test.go b/pkg/resources/kafka/kafka_test.go index 92290400c..3b2de2259 100644 --- a/pkg/resources/kafka/kafka_test.go +++ b/pkg/resources/kafka/kafka_test.go @@ -1061,7 +1061,7 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { {ObjectMeta: metav1.ObjectMeta{Name: "kafka-302", Labels: map[string]string{"brokerId": "302"}}}, }, ccStatus: &scale.StatusTaskResult{ - Result: &ccTypes.StateResult{ + State: &ccTypes.StateResult{ AnalyzerState: ccTypes.AnalyzerState{ReadyGoals: []ccTypes.Goal{ccTypes.RackAwareDistributionGoal}}, AnomalyDetectorState: ccTypes.AnomalyDetectorState{ RecentGoalViolations: []ccTypes.AnomalyDetails{{UnfixableViolatedGoals: []ccTypes.Goal{}, FixableViolatedGoals: []ccTypes.Goal{}}}, @@ -1102,7 +1102,7 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { {ObjectMeta: metav1.ObjectMeta{Name: "kafka-302", Labels: map[string]string{"brokerId": "302"}}}, }, ccStatus: &scale.StatusTaskResult{ - Result: &ccTypes.StateResult{ + State: &ccTypes.StateResult{ AnalyzerState: ccTypes.AnalyzerState{ReadyGoals: []ccTypes.Goal{ccTypes.RackAwareDistributionGoal}}, AnomalyDetectorState: ccTypes.AnomalyDetectorState{ RecentGoalViolations: []ccTypes.AnomalyDetails{{UnfixableViolatedGoals: []ccTypes.Goal{}, FixableViolatedGoals: []ccTypes.Goal{}}}, @@ -1213,7 +1213,7 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { {ObjectMeta: metav1.ObjectMeta{Name: "kafka-302", Labels: map[string]string{"brokerId": "302"}}}, }, ccStatus: &scale.StatusTaskResult{ - Result: &ccTypes.StateResult{ + State: &ccTypes.StateResult{ AnalyzerState: ccTypes.AnalyzerState{ReadyGoals: []ccTypes.Goal{ccTypes.RackAwareDistributionGoal}}, AnomalyDetectorState: ccTypes.AnomalyDetectorState{ RecentGoalViolations: []ccTypes.AnomalyDetails{{UnfixableViolatedGoals: []ccTypes.Goal{}, FixableViolatedGoals: []ccTypes.Goal{}}}, @@ -1329,7 +1329,7 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { allOfflineReplicas: []int32{}, outOfSyncReplicas: []int32{101}, ccStatus: &scale.StatusTaskResult{ - Result: &ccTypes.StateResult{ + State: &ccTypes.StateResult{ AnalyzerState: ccTypes.AnalyzerState{ReadyGoals: []ccTypes.Goal{ccTypes.RackAwareDistributionGoal}}, AnomalyDetectorState: ccTypes.AnomalyDetectorState{ RecentGoalViolations: []ccTypes.AnomalyDetails{{UnfixableViolatedGoals: []ccTypes.Goal{}, FixableViolatedGoals: []ccTypes.Goal{}}}, @@ -1366,7 +1366,7 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, }, ccStatus: &scale.StatusTaskResult{ - Result: &ccTypes.StateResult{ + State: &ccTypes.StateResult{ AnalyzerState: ccTypes.AnalyzerState{ReadyGoals: []ccTypes.Goal{ccTypes.RackAwareDistributionGoal}}, AnomalyDetectorState: ccTypes.AnomalyDetectorState{ RecentGoalViolations: []ccTypes.AnomalyDetails{{UnfixableViolatedGoals: []ccTypes.Goal{}, FixableViolatedGoals: []ccTypes.Goal{}}}, @@ -1465,7 +1465,7 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, }, ccStatus: &scale.StatusTaskResult{ - Result: &ccTypes.StateResult{ + State: &ccTypes.StateResult{ AnalyzerState: ccTypes.AnalyzerState{ReadyGoals: []ccTypes.Goal{ccTypes.RackAwareDistributionGoal}}, AnomalyDetectorState: ccTypes.AnomalyDetectorState{ RecentGoalViolations: []ccTypes.AnomalyDetails{{UnfixableViolatedGoals: []ccTypes.Goal{}, FixableViolatedGoals: []ccTypes.Goal{}}}, @@ -1511,7 +1511,7 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { // Mock Cruise Control client mockCruiseControl := controllerMocks.NewMockCruiseControlScaler(mockCtrl) if test.ccStatus != nil { - mockCruiseControl.EXPECT().Status(context.TODO()).Return(*test.ccStatus, nil) + mockCruiseControl.EXPECT().Status(context.Background()).Return(*test.ccStatus, nil) } r.CruiseControlScalerFactory = controllerMocks.NewMockScaleFactory(mockCruiseControl) diff --git a/pkg/scale/scale.go b/pkg/scale/scale.go index 9dd4d9743..90afd7ff5 100644 --- a/pkg/scale/scale.go +++ b/pkg/scale/scale.go @@ -137,7 +137,7 @@ func (cc *cruiseControlScaler) Status(ctx context.Context) (StatusTaskResult, er State: v1beta1.CruiseControlTaskCompleted, }, Status: &status, - Result: resp.Result, + State: resp.Result, }, nil } diff --git a/pkg/scale/types.go b/pkg/scale/types.go index 4ea945a02..6a7c0bcbf 100644 --- a/pkg/scale/types.go +++ b/pkg/scale/types.go @@ -64,7 +64,7 @@ const ( type StatusTaskResult struct { TaskResult *Result Status *CruiseControlStatus - Result *types.StateResult + State *types.StateResult } // CruiseControlStatus struct is used to describe internal state of Cruise Control.