diff --git a/api/go.mod b/api/go.mod index 55a13bb52f..b8c959c26e 100644 --- a/api/go.mod +++ b/api/go.mod @@ -7,6 +7,7 @@ require ( github.com/banzaicloud/istio-client-go v0.0.17 github.com/cert-manager/cert-manager v1.11.2 github.com/imdario/mergo v0.3.13 + github.com/stretchr/testify v1.8.1 golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 gotest.tools v2.2.0+incompatible k8s.io/api v0.26.4 @@ -15,6 +16,7 @@ require ( ) require ( + github.com/davecgh/go-spew v1.1.1 // indirect github.com/go-logr/logr v1.2.3 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/google/go-cmp v0.5.9 // indirect @@ -23,12 +25,14 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.6.0 // indirect golang.org/x/net v0.7.0 // indirect golang.org/x/text v0.7.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/klog/v2 v2.80.1 // indirect k8s.io/utils v0.0.0-20221128185143-99ec85e7a448 // indirect sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect diff --git a/api/go.sum b/api/go.sum index bc91b36aae..33435c8427 100644 --- a/api/go.sum +++ b/api/go.sum @@ -47,8 +47,13 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= @@ -100,8 +105,10 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo= gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= k8s.io/api v0.26.4 h1:qSG2PmtcD23BkYiWfoYAcak870eF/hE7NNYBYavTT94= diff --git a/api/util/util.go b/api/util/util.go index c2b0e3b6ce..42ed1693f9 100644 --- a/api/util/util.go +++ b/api/util/util.go @@ -39,3 +39,13 @@ func MergeLabels(l ...map[string]string) map[string]string { func LabelsForKafka(name string) map[string]string { return map[string]string{"app": "kafka", "kafka_cr": name} } + +// StringSliceContains returns true if list contains s +func StringSliceContains(list []string, s string) bool { + for _, v := range list { + if v == s { + return true + } + } + return false +} diff --git a/api/util/util_test.go b/api/util/util_test.go index a2fa1a8ab8..133bd3dd7a 100644 --- a/api/util/util_test.go +++ b/api/util/util_test.go @@ -35,3 +35,13 @@ func TestMergeLabels(t *testing.T) { t.Error("Expected:", expected, "Got:", merged) } } + +func TestStringSliceContains(t *testing.T) { + slice := []string{"1", "2", "3"} + if !StringSliceContains(slice, "1") { + t.Error("Expected slice contains 1, got false") + } + if StringSliceContains(slice, "4") { + t.Error("Expected slice not contains 4, got true") + } +} diff --git a/api/v1beta1/kafkacluster_types.go b/api/v1beta1/kafkacluster_types.go index a44d358bc6..d8951f035d 100644 --- a/api/v1beta1/kafkacluster_types.go +++ b/api/v1beta1/kafkacluster_types.go @@ -19,7 +19,6 @@ import ( "strings" "emperror.dev/errors" - "github.com/imdario/mergo" "github.com/banzaicloud/istio-client-go/pkg/networking/v1beta1" @@ -62,19 +61,33 @@ const ( // DefaultKafkaImage is the default Kafka image used when users don't specify it in KafkaClusterSpec.ClusterImage DefaultKafkaImage = "ghcr.io/banzaicloud/kafka:2.13-3.4.1" + + // controllerNodeProcessRole represents the node is a controller node + controllerNodeProcessRole = "controller" + // brokerNodeProcessRole represents the node is a broker node + brokerNodeProcessRole = "broker" ) // KafkaClusterSpec defines the desired state of KafkaCluster type KafkaClusterSpec struct { + // kRaft is used to decide where the Kafka cluster is under KRaft mode or ZooKeeper mode. + // This is default to be true; if set to false, the Kafka cluster is in ZooKeeper mode. + // +kubebuilder:default=true + // +optional + KRaftMode bool `json:"kRaft"` HeadlessServiceEnabled bool `json:"headlessServiceEnabled"` ListenersConfig ListenersConfig `json:"listenersConfig"` // Custom ports to expose in the container. Example use case: a custom kafka distribution, that includes an integrated metrics api endpoint AdditionalPorts []corev1.ContainerPort `json:"additionalPorts,omitempty"` // ZKAddresses specifies the ZooKeeper connection string // in the form hostname:port where host and port are the host and port of a ZooKeeper server. - ZKAddresses []string `json:"zkAddresses"` + // This is not used under KRaft mode. + // +optional + ZKAddresses []string `json:"zkAddresses,omitempty"` // ZKPath specifies the ZooKeeper chroot path as part // of its ZooKeeper connection string which puts its data under some path in the global ZooKeeper namespace. + // This is not used under KRaft mode. + // +optional ZKPath string `json:"zkPath,omitempty"` RackAwareness *RackAwareness `json:"rackAwareness,omitempty"` ClusterImage string `json:"clusterImage,omitempty"` @@ -126,6 +139,8 @@ type KafkaClusterStatus struct { RollingUpgrade RollingUpgradeStatus `json:"rollingUpgradeStatus,omitempty"` AlertCount int `json:"alertCount"` ListenerStatuses ListenerStatuses `json:"listenerStatuses,omitempty"` + // ClusterID is a based64-encoded random UUID generated to run the Kafka cluster in KRaft mode + ClusterID string `json:"clusterID,omitempty"` } // RollingUpgradeStatus defines status of rolling upgrade @@ -161,11 +176,12 @@ type DisruptionBudgetWithStrategy struct { DisruptionBudget `json:",inline"` // The strategy to be used, either minAvailable or maxUnavailable // +kubebuilder:validation:Enum=minAvailable;maxUnavailable - Stategy string `json:"strategy,omitempty"` + Strategy string `json:"strategy,omitempty"` } // Broker defines the broker basic configuration type Broker struct { + // id maps to "node.id" configuration in KRaft mode, and it maps to "broker.id" configuration in ZooKeeper mode. // +kubebuilder:validation:Minimum=0 // +kubebuilder:validation:Maximum=65535 // +kubebuilder:validation:ExclusiveMaximum=true @@ -173,6 +189,11 @@ type Broker struct { BrokerConfigGroup string `json:"brokerConfigGroup,omitempty"` ReadOnlyConfig string `json:"readOnlyConfig,omitempty"` BrokerConfig *BrokerConfig `json:"brokerConfig,omitempty"` + // processRoles defines the role(s) for this particular broker node: broker, controller, or both. + // This must be set in KRaft mode. + // +kubebuilder:validation:MaxItems=2 + // +optional + Roles []string `json:"processRoles,omitempty"` } // BrokerConfig defines the broker configuration @@ -859,6 +880,19 @@ func (bConfig *BrokerConfig) GetTerminationGracePeriod() int64 { return *bConfig.TerminationGracePeriod } +// GetStorageMountPaths returns a string with comma-separated storage mount paths that the broker uses +func (bConfig *BrokerConfig) GetStorageMountPaths() string { + var mountPaths string + for i, sc := range bConfig.StorageConfigs { + if i != len(bConfig.StorageConfigs)-1 { + mountPaths += sc.MountPath + "," + } else { + mountPaths += sc.MountPath + } + } + return mountPaths +} + // GetNodeSelector returns the node selector for cruise control func (cConfig *CruiseControlConfig) GetNodeSelector() map[string]string { return cConfig.NodeSelector @@ -1137,6 +1171,31 @@ func (b *Broker) GetBrokerConfig(kafkaClusterSpec KafkaClusterSpec) (*BrokerConf return bConfig, nil } +// IsBrokerNode returns true when the broker is a broker node +func (b *Broker) IsBrokerNode() bool { + return util.StringSliceContains(b.Roles, brokerNodeProcessRole) +} + +// IsControllerNode returns true when the broker is a controller node +func (b *Broker) IsControllerNode() bool { + return util.StringSliceContains(b.Roles, controllerNodeProcessRole) +} + +// IsBrokerOnlyNode returns true when the broker is a broker-only node +func (b *Broker) IsBrokerOnlyNode() bool { + return b.IsBrokerNode() && !b.IsControllerNode() +} + +// IsControllerOnlyNode returns true when the broker is a controller-only node +func (b *Broker) IsControllerOnlyNode() bool { + return b.IsControllerNode() && !b.IsBrokerNode() +} + +// IsCombinedNode returns true when the broker is a broker + controller node +func (b *Broker) IsCombinedNode() bool { + return b.IsBrokerNode() && b.IsControllerNode() +} + func mergeEnvs(kafkaClusterSpec KafkaClusterSpec, groupConfig, bConfig *BrokerConfig) []corev1.EnvVar { var envs []corev1.EnvVar envs = append(envs, kafkaClusterSpec.Envs...) diff --git a/api/v1beta1/kafkacluster_types_test.go b/api/v1beta1/kafkacluster_types_test.go index 72e8e7b171..8c8bb0d6f2 100644 --- a/api/v1beta1/kafkacluster_types_test.go +++ b/api/v1beta1/kafkacluster_types_test.go @@ -19,6 +19,7 @@ import ( "strconv" "testing" + "github.com/stretchr/testify/require" "gotest.tools/assert" corev1 "k8s.io/api/core/v1" @@ -458,3 +459,175 @@ func TestGetBrokerLabels(t *testing.T) { t.Error("Expected:", expected, "Got:", result) } } + +func TestGetStorageMountPaths(t *testing.T) { + testCases := []struct { + testName string + brokerConfig *BrokerConfig + expectedMountPaths string + }{ + { + testName: "BrokerConfig has no StorageConfigs", + brokerConfig: &BrokerConfig{}, + expectedMountPaths: "", + }, + { + testName: "BrokerConfig has one storage configuration under StorageConfigs", + brokerConfig: &BrokerConfig{ + StorageConfigs: []StorageConfig{ + { + MountPath: "test-log-1", + }, + }, + }, + expectedMountPaths: "test-log-1", + }, + { + testName: "BrokerConfig has multiple storage configuration under StorageConfigs", + brokerConfig: &BrokerConfig{ + StorageConfigs: []StorageConfig{ + { + MountPath: "test-log-1", + }, + { + MountPath: "test-log-2", + }, + { + MountPath: "test-log-3", + }, + { + MountPath: "test-log-4", + }, + { + MountPath: "test-log-5", + }, + }, + }, + expectedMountPaths: "test-log-1,test-log-2,test-log-3,test-log-4,test-log-5", + }, + } + + for _, test := range testCases { + t.Run(test.testName, func(t *testing.T) { + gotMountPaths := test.brokerConfig.GetStorageMountPaths() + require.Equal(t, gotMountPaths, test.expectedMountPaths) + }) + } +} + +func TestIsBrokerOnlyNode(t *testing.T) { + testCases := []struct { + testName string + broker Broker + isBrokerOnly bool + }{ + { + testName: "the broker is a broker-only node", + broker: Broker{ + Id: 0, + Roles: []string{"broker"}, + }, + isBrokerOnly: true, + }, + { + testName: "the broker is a controller-only node", + broker: Broker{ + Id: 0, + Roles: []string{"controller"}, + }, + isBrokerOnly: false, + }, + { + testName: "the broker is a isCombined node", + broker: Broker{ + Id: 0, + Roles: []string{"controller", "broker"}, + }, + isBrokerOnly: false, + }, + } + + for _, test := range testCases { + t.Run(test.testName, func(t *testing.T) { + require.Equal(t, test.broker.IsBrokerOnlyNode(), test.isBrokerOnly) + }) + } +} + +func TestIsControllerOnlyNode(t *testing.T) { + testCases := []struct { + testName string + broker Broker + isControllerOnly bool + }{ + { + testName: "the broker is a controller-only node", + broker: Broker{ + Id: 0, + Roles: []string{"controller"}, + }, + isControllerOnly: true, + }, + { + testName: "the broker is a broker-only node", + broker: Broker{ + Id: 0, + Roles: []string{"broker"}, + }, + isControllerOnly: false, + }, + { + testName: "the broker is a isCombined node", + broker: Broker{ + Id: 0, + Roles: []string{"broker", "controller"}, + }, + isControllerOnly: false, + }, + } + + for _, test := range testCases { + t.Run(test.testName, func(t *testing.T) { + require.Equal(t, test.broker.IsControllerOnlyNode(), test.isControllerOnly) + }) + } +} + +func TestIsCombinedNode(t *testing.T) { + testCases := []struct { + testName string + broker Broker + isCombined bool + }{ + { + testName: "the broker is a broker-only node", + broker: Broker{ + Id: 0, + Roles: []string{"broker"}, + }, + isCombined: false, + }, + { + testName: "the broker is a controller-only node", + broker: Broker{ + Id: 0, + Roles: []string{"controller"}, + }, + isCombined: false, + }, + { + testName: "the broker is a isCombined node", + broker: Broker{ + Id: 0, + Roles: []string{"broker", "controller"}, + }, + isCombined: true, + }, + } + + for _, test := range testCases { + t.Run(test.testName, func(t *testing.T) { + require.Equal(t, test.broker.IsCombinedNode(), test.isCombined) + }) + } +} diff --git a/api/v1beta1/zz_generated.deepcopy.go b/api/v1beta1/zz_generated.deepcopy.go index a3e79c9ca3..0463d6e2f8 100644 --- a/api/v1beta1/zz_generated.deepcopy.go +++ b/api/v1beta1/zz_generated.deepcopy.go @@ -51,6 +51,11 @@ func (in *Broker) DeepCopyInto(out *Broker) { *out = new(BrokerConfig) (*in).DeepCopyInto(*out) } + if in.Roles != nil { + in, out := &in.Roles, &out.Roles + *out = make([]string, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Broker. diff --git a/charts/kafka-operator/templates/crds.yaml b/charts/kafka-operator/templates/crds.yaml index 9368b81178..8d12b99507 100644 --- a/charts/kafka-operator/templates/crds.yaml +++ b/charts/kafka-operator/templates/crds.yaml @@ -12823,11 +12823,21 @@ spec: brokerConfigGroup: type: string id: + description: id maps to "node.id" configuration in KRaft mode, + and it maps to "broker.id" configuration in ZooKeeper mode. exclusiveMaximum: true format: int32 maximum: 65535 minimum: 0 type: integer + processRoles: + description: 'processRoles defines the role(s) for this particular + broker node: broker, controller, or both. This must be set + in KRaft mode.' + items: + type: string + maxItems: 2 + type: array readOnlyConfig: type: string required: @@ -19056,6 +19066,12 @@ spec: type: string type: object type: object + kRaft: + default: true + description: kRaft is used to decide where the Kafka cluster is under + KRaft mode or ZooKeeper mode. This is default to be true; if set + to false, the Kafka cluster is in ZooKeeper mode. + type: boolean kubernetesClusterDomain: type: string listenersConfig: @@ -21703,14 +21719,15 @@ spec: zkAddresses: description: ZKAddresses specifies the ZooKeeper connection string in the form hostname:port where host and port are the host and port - of a ZooKeeper server. + of a ZooKeeper server. This is not used under KRaft mode. items: type: string type: array zkPath: description: ZKPath specifies the ZooKeeper chroot path as part of its ZooKeeper connection string which puts its data under some path - in the global ZooKeeper namespace. + in the global ZooKeeper namespace. This is not used under KRaft + mode. type: string required: - brokers @@ -21719,7 +21736,6 @@ spec: - listenersConfig - oneBrokerPerNode - rollingUpgradeConfig - - zkAddresses type: object status: description: KafkaClusterStatus defines the observed state of KafkaCluster @@ -21813,6 +21829,10 @@ spec: - rackAwarenessState type: object type: object + clusterID: + description: ClusterID is a based64-encoded random UUID generated + to run the Kafka cluster in KRaft mode + type: string cruiseControlTopicStatus: description: CruiseControlTopicStatus holds info about the CC topic status diff --git a/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml b/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml index b09d32ff4c..330866a3eb 100644 --- a/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml +++ b/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml @@ -12660,11 +12660,21 @@ spec: brokerConfigGroup: type: string id: + description: id maps to "node.id" configuration in KRaft mode, + and it maps to "broker.id" configuration in ZooKeeper mode. exclusiveMaximum: true format: int32 maximum: 65535 minimum: 0 type: integer + processRoles: + description: 'processRoles defines the role(s) for this particular + broker node: broker, controller, or both. This must be set + in KRaft mode.' + items: + type: string + maxItems: 2 + type: array readOnlyConfig: type: string required: @@ -18893,6 +18903,12 @@ spec: type: string type: object type: object + kRaft: + default: true + description: kRaft is used to decide where the Kafka cluster is under + KRaft mode or ZooKeeper mode. This is default to be true; if set + to false, the Kafka cluster is in ZooKeeper mode. + type: boolean kubernetesClusterDomain: type: string listenersConfig: @@ -21540,14 +21556,15 @@ spec: zkAddresses: description: ZKAddresses specifies the ZooKeeper connection string in the form hostname:port where host and port are the host and port - of a ZooKeeper server. + of a ZooKeeper server. This is not used under KRaft mode. items: type: string type: array zkPath: description: ZKPath specifies the ZooKeeper chroot path as part of its ZooKeeper connection string which puts its data under some path - in the global ZooKeeper namespace. + in the global ZooKeeper namespace. This is not used under KRaft + mode. type: string required: - brokers @@ -21556,7 +21573,6 @@ spec: - listenersConfig - oneBrokerPerNode - rollingUpgradeConfig - - zkAddresses type: object status: description: KafkaClusterStatus defines the observed state of KafkaCluster @@ -21650,6 +21666,10 @@ spec: - rackAwarenessState type: object type: object + clusterID: + description: ClusterID is a based64-encoded random UUID generated + to run the Kafka cluster in KRaft mode + type: string cruiseControlTopicStatus: description: CruiseControlTopicStatus holds info about the CC topic status