Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow dashes when parsing broker rack #68

Merged
merged 1 commit into from
Jun 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion pkg/resources/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ const (

var (
// kafkaConfigBrokerRackRegex the regex to parse the "broker.rack" Kafka property used in read-only configs
kafkaConfigBrokerRackRegex = regexp.MustCompile(`broker\.rack\s*=\s*(\w+)`)
kafkaConfigBrokerRackRegex = regexp.MustCompile(`broker\.rack\s*=\s*([\w-]+)`)
)

// Reconciler implements the Component Reconciler
Expand Down Expand Up @@ -219,6 +219,8 @@ func (r *Reconciler) Reconcile(log logr.Logger) error {

log.V(1).Info("Reconciling")

log.Info("broker rack map", "kafkaBrokerAvailabilityZoneMap", r.kafkaBrokerAvailabilityZoneMap)

ctx := context.Background()
if err := k8sutil.UpdateBrokerConfigurationBackup(r.Client, r.KafkaCluster); err != nil {
log.Error(err, "failed to update broker configuration backup")
Expand Down Expand Up @@ -926,6 +928,9 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo
// Check if we support multiple broker restarts and restart only in same AZ, otherwise restart only 1 broker at once
concurrentBrokerRestartsAllowed := r.getConcurrentBrokerRestartsAllowed()
terminatingOrPendingPods := getPodsInTerminatingOrPendingState(podList.Items)
if len(terminatingOrPendingPods) > 0 {
log.Info("terminating or pending pods", "terminatingOrPendingPods", terminatingOrPendingPods)
}
if len(terminatingOrPendingPods) >= concurrentBrokerRestartsAllowed {
return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New(strconv.Itoa(concurrentBrokerRestartsAllowed)+" pod(s) is still terminating or creating"), "rolling upgrade in progress")
}
Expand Down
132 changes: 125 additions & 7 deletions pkg/resources/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1269,9 +1269,9 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) {
desiredPod: &corev1.Pod{},
currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201"}},
pods: []corev1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", DeletionTimestamp: &metav1.Time{Time: time.Now()}}},
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201"}},
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-301"}},
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}, DeletionTimestamp: &metav1.Time{Time: time.Now()}}},
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}},
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}},
},
errorExpected: true,
},
Expand All @@ -1293,9 +1293,9 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) {
desiredPod: &corev1.Pod{},
currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-301"}},
pods: []corev1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", DeletionTimestamp: &metav1.Time{Time: time.Now()}}},
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", DeletionTimestamp: &metav1.Time{Time: time.Now()}}},
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-301"}},
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}, DeletionTimestamp: &metav1.Time{Time: time.Now()}}},
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}, DeletionTimestamp: &metav1.Time{Time: time.Now()}}},
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}},
},
errorExpected: true,
},
Expand Down Expand Up @@ -1531,6 +1531,124 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) {
outOfSyncReplicas: []int32{101},
errorExpected: false,
},
{
testName: "Pod is not deleted if pod is restarting in another AZ, if brokers per AZ < tolerated failures",
kafkaCluster: v1beta1.KafkaCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "kafka",
Namespace: "kafka",
},
Spec: v1beta1.KafkaClusterSpec{
Brokers: []v1beta1.Broker{
{Id: 101, ReadOnlyConfig: "broker.rack=az1"},
{Id: 201, ReadOnlyConfig: "broker.rack=az2"},
{Id: 301, ReadOnlyConfig: "broker.rack=az3"},
},
RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{
FailureThreshold: 2,
ConcurrentBrokerRestartsAllowed: 2,
},
},
Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading},
},
desiredPod: &corev1.Pod{},
currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}},
pods: []corev1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}, DeletionTimestamp: &metav1.Time{Time: time.Now()}}},
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}},
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}},
},
errorExpected: true,
},
{
testName: "Pod is not deleted if there are out-of-sync replicas in another AZ, if brokers per AZ < tolerated failures",
kafkaCluster: v1beta1.KafkaCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "kafka",
Namespace: "kafka",
},
Spec: v1beta1.KafkaClusterSpec{
Brokers: []v1beta1.Broker{
{Id: 101, ReadOnlyConfig: "broker.rack=az1"},
{Id: 201, ReadOnlyConfig: "broker.rack=az2"},
{Id: 301, ReadOnlyConfig: "broker.rack=az3"},
},
RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{
FailureThreshold: 2,
ConcurrentBrokerRestartsAllowed: 2,
},
},
Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading},
},
desiredPod: &corev1.Pod{},
currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}},
pods: []corev1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}}},
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}},
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}},
},
outOfSyncReplicas: []int32{101},
errorExpected: true,
},
{
testName: "Pod is not deleted if there are offline replicas in another AZ, if brokers per AZ < tolerated failures",
kafkaCluster: v1beta1.KafkaCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "kafka",
Namespace: "kafka",
},
Spec: v1beta1.KafkaClusterSpec{
Brokers: []v1beta1.Broker{
{Id: 101, ReadOnlyConfig: "broker.rack=az1"},
{Id: 201, ReadOnlyConfig: "broker.rack=az2"},
{Id: 301, ReadOnlyConfig: "broker.rack=az3"},
},
RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{
FailureThreshold: 2,
ConcurrentBrokerRestartsAllowed: 2,
},
},
Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading},
},
desiredPod: &corev1.Pod{},
currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}},
pods: []corev1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}}},
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}},
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}},
},
allOfflineReplicas: []int32{101},
errorExpected: true,
},
{
testName: "Pod is not deleted if pod is restarting in another AZ, if broker rack value contains dashes",
kafkaCluster: v1beta1.KafkaCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "kafka",
Namespace: "kafka",
},
Spec: v1beta1.KafkaClusterSpec{
Brokers: []v1beta1.Broker{
{Id: 101, ReadOnlyConfig: "broker.rack=az-1"},
{Id: 201, ReadOnlyConfig: "broker.rack=az-2"},
{Id: 301, ReadOnlyConfig: "broker.rack=az-3"},
},
RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{
FailureThreshold: 2,
ConcurrentBrokerRestartsAllowed: 2,
},
},
Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading},
},
desiredPod: &corev1.Pod{},
currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}},
pods: []corev1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}, DeletionTimestamp: &metav1.Time{Time: time.Now()}}},
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}},
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}},
},
errorExpected: true,
},
}

mockCtrl := gomock.NewController(t)
Expand All @@ -1557,7 +1675,7 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) {

// Mock kafka client
mockedKafkaClient := new(mocks.KafkaClient)
mockedKafkaClient.On("AllOfflineReplicas").Return(test.outOfSyncReplicas, nil)
mockedKafkaClient.On("AllOfflineReplicas").Return(test.allOfflineReplicas, nil)
mockedKafkaClient.On("OutOfSyncReplicas").Return(test.outOfSyncReplicas, nil)
mockKafkaClientProvider.On("NewFromCluster", mockClient, &test.kafkaCluster).Return(mockedKafkaClient, func() {}, nil)

Expand Down