Skip to content

Commit

Permalink
Address PR comments - part 6
Browse files Browse the repository at this point in the history
  • Loading branch information
ctrlaltluc committed Aug 3, 2023
1 parent 86d16b2 commit adac793
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 16 deletions.
14 changes: 7 additions & 7 deletions pkg/resources/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"strings"

"emperror.dev/errors"
types2 "github.com/banzaicloud/go-cruise-control/pkg/types"
ccTypes "github.com/banzaicloud/go-cruise-control/pkg/types"
"github.com/go-logr/logr"
"golang.org/x/exp/slices"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -912,7 +912,7 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo
return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New(strconv.Itoa(r.KafkaCluster.Spec.RollingUpgradeConfig.ConcurrentBrokerRestartCountPerRack)+" pod(s) is still terminating or creating"), "rolling upgrade in progress")
}
if r.KafkaCluster.Spec.RollingUpgradeConfig.ConcurrentBrokerRestartCountPerRack > 1 && len(terminatingOrPendingPods) > 0 {
err = r.isCruiseControlRackAwareDistributionGoalViolated()
err = r.checkCCRackAwareDistributionGoal()
if err != nil {
return err
}
Expand Down Expand Up @@ -983,21 +983,21 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo
return nil
}

func (r *Reconciler) isCruiseControlRackAwareDistributionGoalViolated() error {
func (r *Reconciler) checkCCRackAwareDistributionGoal() error {
cruiseControlURL := scale.CruiseControlURLFromKafkaCluster(r.KafkaCluster)
cc, err := r.CruiseControlScalerFactory(context.TODO(), r.KafkaCluster)
if err != nil {
return errorfactory.New(errorfactory.CruiseControlNotReady{}, err, "failed to initialize Cruise Control", "cruise control url", cruiseControlURL)
}
status, err := cc.Status(context.TODO())
status, err := cc.Status(context.Background())
if err != nil {
return errorfactory.New(errorfactory.CruiseControlNotReady{}, errors.New("failed to get status from Cruise Control"), "rolling upgrade in progress")
}
if !slices.Contains(status.Result.AnalyzerState.ReadyGoals, types2.RackAwareDistributionGoal) {
if !slices.Contains(status.State.AnalyzerState.ReadyGoals, ccTypes.RackAwareDistributionGoal) {
return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("RackAwareDistributionGoal is not ready"), "rolling upgrade in progress")
}
for _, anomaly := range status.Result.AnomalyDetectorState.RecentGoalViolations {
if slices.Contains(anomaly.FixableViolatedGoals, types2.RackAwareDistributionGoal) || slices.Contains(anomaly.UnfixableViolatedGoals, types2.RackAwareDistributionGoal) {
for _, anomaly := range status.State.AnomalyDetectorState.RecentGoalViolations {
if slices.Contains(anomaly.FixableViolatedGoals, ccTypes.RackAwareDistributionGoal) || slices.Contains(anomaly.UnfixableViolatedGoals, ccTypes.RackAwareDistributionGoal) {
return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("RackAwareDistributionGoal is violated"), "rolling upgrade in progress")
}
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/resources/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1061,7 +1061,7 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) {
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-302", Labels: map[string]string{"brokerId": "302"}}},
},
ccStatus: &scale.StatusTaskResult{
Result: &ccTypes.StateResult{
State: &ccTypes.StateResult{
AnalyzerState: ccTypes.AnalyzerState{ReadyGoals: []ccTypes.Goal{ccTypes.RackAwareDistributionGoal}},
AnomalyDetectorState: ccTypes.AnomalyDetectorState{
RecentGoalViolations: []ccTypes.AnomalyDetails{{UnfixableViolatedGoals: []ccTypes.Goal{}, FixableViolatedGoals: []ccTypes.Goal{}}},
Expand Down Expand Up @@ -1102,7 +1102,7 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) {
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-302", Labels: map[string]string{"brokerId": "302"}}},
},
ccStatus: &scale.StatusTaskResult{
Result: &ccTypes.StateResult{
State: &ccTypes.StateResult{
AnalyzerState: ccTypes.AnalyzerState{ReadyGoals: []ccTypes.Goal{ccTypes.RackAwareDistributionGoal}},
AnomalyDetectorState: ccTypes.AnomalyDetectorState{
RecentGoalViolations: []ccTypes.AnomalyDetails{{UnfixableViolatedGoals: []ccTypes.Goal{}, FixableViolatedGoals: []ccTypes.Goal{}}},
Expand Down Expand Up @@ -1213,7 +1213,7 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) {
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-302", Labels: map[string]string{"brokerId": "302"}}},
},
ccStatus: &scale.StatusTaskResult{
Result: &ccTypes.StateResult{
State: &ccTypes.StateResult{
AnalyzerState: ccTypes.AnalyzerState{ReadyGoals: []ccTypes.Goal{ccTypes.RackAwareDistributionGoal}},
AnomalyDetectorState: ccTypes.AnomalyDetectorState{
RecentGoalViolations: []ccTypes.AnomalyDetails{{UnfixableViolatedGoals: []ccTypes.Goal{}, FixableViolatedGoals: []ccTypes.Goal{}}},
Expand Down Expand Up @@ -1329,7 +1329,7 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) {
allOfflineReplicas: []int32{},
outOfSyncReplicas: []int32{101},
ccStatus: &scale.StatusTaskResult{
Result: &ccTypes.StateResult{
State: &ccTypes.StateResult{
AnalyzerState: ccTypes.AnalyzerState{ReadyGoals: []ccTypes.Goal{ccTypes.RackAwareDistributionGoal}},
AnomalyDetectorState: ccTypes.AnomalyDetectorState{
RecentGoalViolations: []ccTypes.AnomalyDetails{{UnfixableViolatedGoals: []ccTypes.Goal{}, FixableViolatedGoals: []ccTypes.Goal{}}},
Expand Down Expand Up @@ -1366,7 +1366,7 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) {
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}},
},
ccStatus: &scale.StatusTaskResult{
Result: &ccTypes.StateResult{
State: &ccTypes.StateResult{
AnalyzerState: ccTypes.AnalyzerState{ReadyGoals: []ccTypes.Goal{ccTypes.RackAwareDistributionGoal}},
AnomalyDetectorState: ccTypes.AnomalyDetectorState{
RecentGoalViolations: []ccTypes.AnomalyDetails{{UnfixableViolatedGoals: []ccTypes.Goal{}, FixableViolatedGoals: []ccTypes.Goal{}}},
Expand Down Expand Up @@ -1465,7 +1465,7 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) {
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}},
},
ccStatus: &scale.StatusTaskResult{
Result: &ccTypes.StateResult{
State: &ccTypes.StateResult{
AnalyzerState: ccTypes.AnalyzerState{ReadyGoals: []ccTypes.Goal{ccTypes.RackAwareDistributionGoal}},
AnomalyDetectorState: ccTypes.AnomalyDetectorState{
RecentGoalViolations: []ccTypes.AnomalyDetails{{UnfixableViolatedGoals: []ccTypes.Goal{}, FixableViolatedGoals: []ccTypes.Goal{}}},
Expand Down Expand Up @@ -1511,7 +1511,7 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) {
// Mock Cruise Control client
mockCruiseControl := controllerMocks.NewMockCruiseControlScaler(mockCtrl)
if test.ccStatus != nil {
mockCruiseControl.EXPECT().Status(context.TODO()).Return(*test.ccStatus, nil)
mockCruiseControl.EXPECT().Status(context.Background()).Return(*test.ccStatus, nil)
}
r.CruiseControlScalerFactory = controllerMocks.NewMockScaleFactory(mockCruiseControl)

Expand Down
2 changes: 1 addition & 1 deletion pkg/scale/scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (cc *cruiseControlScaler) Status(ctx context.Context) (StatusTaskResult, er
State: v1beta1.CruiseControlTaskCompleted,
},
Status: &status,
Result: resp.Result,
State: resp.Result,
}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/scale/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ const (
type StatusTaskResult struct {
TaskResult *Result
Status *CruiseControlStatus
Result *types.StateResult
State *types.StateResult
}

// CruiseControlStatus struct is used to describe internal state of Cruise Control.
Expand Down

0 comments on commit adac793

Please sign in to comment.