Skip to content

Commit

Permalink
Add koperator/api changes for KRaft support
Browse files Browse the repository at this point in the history
  • Loading branch information
panyuenlau committed Jul 25, 2023
1 parent 7fbb5e1 commit 06e556e
Show file tree
Hide file tree
Showing 9 changed files with 317 additions and 9 deletions.
4 changes: 4 additions & 0 deletions api/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/banzaicloud/istio-client-go v0.0.17
github.com/cert-manager/cert-manager v1.11.2
github.com/imdario/mergo v0.3.13
github.com/stretchr/testify v1.8.1
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1
gotest.tools v2.2.0+incompatible
k8s.io/api v0.26.4
Expand All @@ -15,6 +16,7 @@ require (
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/go-cmp v0.5.9 // indirect
Expand All @@ -23,12 +25,14 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/net v0.7.0 // indirect
golang.org/x/text v0.7.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/klog/v2 v2.80.1 // indirect
k8s.io/utils v0.0.0-20221128185143-99ec85e7a448 // indirect
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect
Expand Down
7 changes: 7 additions & 0 deletions api/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,13 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
Expand Down Expand Up @@ -100,8 +105,10 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
k8s.io/api v0.26.4 h1:qSG2PmtcD23BkYiWfoYAcak870eF/hE7NNYBYavTT94=
Expand Down
10 changes: 10 additions & 0 deletions api/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,13 @@ func MergeLabels(l ...map[string]string) map[string]string {
func LabelsForKafka(name string) map[string]string {
return map[string]string{"app": "kafka", "kafka_cr": name}
}

// StringSliceContains returns true if list contains s
func StringSliceContains(list []string, s string) bool {
for _, v := range list {
if v == s {
return true
}
}
return false
}
10 changes: 10 additions & 0 deletions api/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,13 @@ func TestMergeLabels(t *testing.T) {
t.Error("Expected:", expected, "Got:", merged)
}
}

func TestStringSliceContains(t *testing.T) {
slice := []string{"1", "2", "3"}
if !StringSliceContains(slice, "1") {
t.Error("Expected slice contains 1, got false")
}
if StringSliceContains(slice, "4") {
t.Error("Expected slice not contains 4, got true")
}
}
65 changes: 62 additions & 3 deletions api/v1beta1/kafkacluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"strings"

"emperror.dev/errors"

"github.com/imdario/mergo"

"github.com/banzaicloud/istio-client-go/pkg/networking/v1beta1"
Expand Down Expand Up @@ -62,19 +61,33 @@ const (

// DefaultKafkaImage is the default Kafka image used when users don't specify it in KafkaClusterSpec.ClusterImage
DefaultKafkaImage = "ghcr.io/banzaicloud/kafka:2.13-3.4.1"

// controllerNodeProcessRole represents the node is a controller node
controllerNodeProcessRole = "controller"
// brokerNodeProcessRole represents the node is a broker node
brokerNodeProcessRole = "broker"
)

// KafkaClusterSpec defines the desired state of KafkaCluster
type KafkaClusterSpec struct {
// kRaft is used to decide where the Kafka cluster is under KRaft mode or ZooKeeper mode.
// This is default to be true; if set to false, the Kafka cluster is in ZooKeeper mode.
// +kubebuilder:default=true
// +optional
KRaftMode bool `json:"kRaft"`
HeadlessServiceEnabled bool `json:"headlessServiceEnabled"`
ListenersConfig ListenersConfig `json:"listenersConfig"`
// Custom ports to expose in the container. Example use case: a custom kafka distribution, that includes an integrated metrics api endpoint
AdditionalPorts []corev1.ContainerPort `json:"additionalPorts,omitempty"`
// ZKAddresses specifies the ZooKeeper connection string
// in the form hostname:port where host and port are the host and port of a ZooKeeper server.
ZKAddresses []string `json:"zkAddresses"`
// This is not used under KRaft mode.
// +optional
ZKAddresses []string `json:"zkAddresses,omitempty"`
// ZKPath specifies the ZooKeeper chroot path as part
// of its ZooKeeper connection string which puts its data under some path in the global ZooKeeper namespace.
// This is not used under KRaft mode.
// +optional
ZKPath string `json:"zkPath,omitempty"`
RackAwareness *RackAwareness `json:"rackAwareness,omitempty"`
ClusterImage string `json:"clusterImage,omitempty"`
Expand Down Expand Up @@ -126,6 +139,8 @@ type KafkaClusterStatus struct {
RollingUpgrade RollingUpgradeStatus `json:"rollingUpgradeStatus,omitempty"`
AlertCount int `json:"alertCount"`
ListenerStatuses ListenerStatuses `json:"listenerStatuses,omitempty"`
// ClusterID is a based64-encoded random UUID generated to run the Kafka cluster in KRaft mode
ClusterID string `json:"clusterID,omitempty"`
}

// RollingUpgradeStatus defines status of rolling upgrade
Expand Down Expand Up @@ -161,18 +176,24 @@ type DisruptionBudgetWithStrategy struct {
DisruptionBudget `json:",inline"`
// The strategy to be used, either minAvailable or maxUnavailable
// +kubebuilder:validation:Enum=minAvailable;maxUnavailable
Stategy string `json:"strategy,omitempty"`
Strategy string `json:"strategy,omitempty"`
}

// Broker defines the broker basic configuration
type Broker struct {
// id maps to "node.id" configuration in KRaft mode, and it maps to "broker.id" configuration in ZooKeeper mode.
// +kubebuilder:validation:Minimum=0
// +kubebuilder:validation:Maximum=65535
// +kubebuilder:validation:ExclusiveMaximum=true
Id int32 `json:"id"`
BrokerConfigGroup string `json:"brokerConfigGroup,omitempty"`
ReadOnlyConfig string `json:"readOnlyConfig,omitempty"`
BrokerConfig *BrokerConfig `json:"brokerConfig,omitempty"`
// processRoles defines the role(s) for this particular broker node: broker, controller, or both.
// This must be set in KRaft mode.
// +kubebuilder:validation:MaxItems=2
// +optional
Roles []string `json:"processRoles,omitempty"`
}

// BrokerConfig defines the broker configuration
Expand Down Expand Up @@ -859,6 +880,19 @@ func (bConfig *BrokerConfig) GetTerminationGracePeriod() int64 {
return *bConfig.TerminationGracePeriod
}

// GetStorageMountPaths returns a string with comma-separated storage mount paths that the broker uses
func (bConfig *BrokerConfig) GetStorageMountPaths() string {
var mountPaths string
for i, sc := range bConfig.StorageConfigs {
if i != len(bConfig.StorageConfigs)-1 {
mountPaths += sc.MountPath + ","
} else {
mountPaths += sc.MountPath
}
}
return mountPaths
}

// GetNodeSelector returns the node selector for cruise control
func (cConfig *CruiseControlConfig) GetNodeSelector() map[string]string {
return cConfig.NodeSelector
Expand Down Expand Up @@ -1137,6 +1171,31 @@ func (b *Broker) GetBrokerConfig(kafkaClusterSpec KafkaClusterSpec) (*BrokerConf
return bConfig, nil
}

// IsBrokerNode returns true when the broker is a broker node
func (b *Broker) IsBrokerNode() bool {
return util.StringSliceContains(b.Roles, brokerNodeProcessRole)
}

// IsControllerNode returns true when the broker is a controller node
func (b *Broker) IsControllerNode() bool {
return util.StringSliceContains(b.Roles, controllerNodeProcessRole)
}

// IsBrokerOnlyNode returns true when the broker is a broker-only node
func (b *Broker) IsBrokerOnlyNode() bool {
return b.IsBrokerNode() && !b.IsControllerNode()
}

// IsControllerOnlyNode returns true when the broker is a controller-only node
func (b *Broker) IsControllerOnlyNode() bool {
return b.IsControllerNode() && !b.IsBrokerNode()
}

// IsCombinedNode returns true when the broker is a broker + controller node
func (b *Broker) IsCombinedNode() bool {
return b.IsBrokerNode() && b.IsControllerNode()
}

func mergeEnvs(kafkaClusterSpec KafkaClusterSpec, groupConfig, bConfig *BrokerConfig) []corev1.EnvVar {
var envs []corev1.EnvVar
envs = append(envs, kafkaClusterSpec.Envs...)
Expand Down
173 changes: 173 additions & 0 deletions api/v1beta1/kafkacluster_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"strconv"
"testing"

"github.com/stretchr/testify/require"
"gotest.tools/assert"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -458,3 +459,175 @@ func TestGetBrokerLabels(t *testing.T) {
t.Error("Expected:", expected, "Got:", result)
}
}

func TestGetStorageMountPaths(t *testing.T) {
testCases := []struct {
testName string
brokerConfig *BrokerConfig
expectedMountPaths string
}{
{
testName: "BrokerConfig has no StorageConfigs",
brokerConfig: &BrokerConfig{},
expectedMountPaths: "",
},
{
testName: "BrokerConfig has one storage configuration under StorageConfigs",
brokerConfig: &BrokerConfig{
StorageConfigs: []StorageConfig{
{
MountPath: "test-log-1",
},
},
},
expectedMountPaths: "test-log-1",
},
{
testName: "BrokerConfig has multiple storage configuration under StorageConfigs",
brokerConfig: &BrokerConfig{
StorageConfigs: []StorageConfig{
{
MountPath: "test-log-1",
},
{
MountPath: "test-log-2",
},
{
MountPath: "test-log-3",
},
{
MountPath: "test-log-4",
},
{
MountPath: "test-log-5",
},
},
},
expectedMountPaths: "test-log-1,test-log-2,test-log-3,test-log-4,test-log-5",
},
}

for _, test := range testCases {
t.Run(test.testName, func(t *testing.T) {
gotMountPaths := test.brokerConfig.GetStorageMountPaths()
require.Equal(t, gotMountPaths, test.expectedMountPaths)
})
}
}

func TestIsBrokerOnlyNode(t *testing.T) {
testCases := []struct {
testName string
broker Broker
isBrokerOnly bool
}{
{
testName: "the broker is a broker-only node",
broker: Broker{
Id: 0,
Roles: []string{"broker"},
},
isBrokerOnly: true,
},
{
testName: "the broker is a controller-only node",
broker: Broker{
Id: 0,
Roles: []string{"controller"},
},
isBrokerOnly: false,
},
{
testName: "the broker is a isCombined node",
broker: Broker{
Id: 0,
Roles: []string{"controller", "broker"},
},
isBrokerOnly: false,
},
}

for _, test := range testCases {
t.Run(test.testName, func(t *testing.T) {
require.Equal(t, test.broker.IsBrokerOnlyNode(), test.isBrokerOnly)
})
}
}

func TestIsControllerOnlyNode(t *testing.T) {
testCases := []struct {
testName string
broker Broker
isControllerOnly bool
}{
{
testName: "the broker is a controller-only node",
broker: Broker{
Id: 0,
Roles: []string{"controller"},
},
isControllerOnly: true,
},
{
testName: "the broker is a broker-only node",
broker: Broker{
Id: 0,
Roles: []string{"broker"},
},
isControllerOnly: false,
},
{
testName: "the broker is a isCombined node",
broker: Broker{
Id: 0,
Roles: []string{"broker", "controller"},
},
isControllerOnly: false,
},
}

for _, test := range testCases {
t.Run(test.testName, func(t *testing.T) {
require.Equal(t, test.broker.IsControllerOnlyNode(), test.isControllerOnly)
})
}
}

func TestIsCombinedNode(t *testing.T) {
testCases := []struct {
testName string
broker Broker
isCombined bool
}{
{
testName: "the broker is a broker-only node",
broker: Broker{
Id: 0,
Roles: []string{"broker"},
},
isCombined: false,
},
{
testName: "the broker is a controller-only node",
broker: Broker{
Id: 0,
Roles: []string{"controller"},
},
isCombined: false,
},
{
testName: "the broker is a isCombined node",
broker: Broker{
Id: 0,
Roles: []string{"broker", "controller"},
},
isCombined: true,
},
}

for _, test := range testCases {
t.Run(test.testName, func(t *testing.T) {
require.Equal(t, test.broker.IsCombinedNode(), test.isCombined)
})
}
}
Loading

0 comments on commit 06e556e

Please sign in to comment.