Skip to content

Commit

Permalink
Allow property security-inter-broker-protocol (#85)
Browse files Browse the repository at this point in the history
* adding the ability to use security-inter-broker-protocol in koperator

* updating util.go to remove _ for generated names

* adding replace all for external listener port name

* fixing other places where externallistener name is used to not have _

* adding an alternative way to identify which port to use for kafka administration and cc connection

* taking out comments for pr push

* fixing kafka crd

* setting omitempty so it will not be required

* adding generated crds

* adding comments with context for new flag UsedForKafkaAdminCommunication

* Use getBrokerReadOnlyConfig function to get properties and update unit test - security_inter_broker_protocol_Set

* Update crds to match generated manifest

---------

Co-authored-by: Cameron Wright <[email protected]>
Co-authored-by: Ha Van <[email protected]>
  • Loading branch information
3 people authored Aug 14, 2024
1 parent b1e9544 commit 1f0d47f
Show file tree
Hide file tree
Showing 11 changed files with 172 additions and 14 deletions.
3 changes: 3 additions & 0 deletions api/v1beta1/kafkacluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,6 +748,9 @@ type CommonListenerSpec struct {
// At least one of the listeners should have this flag enabled
// +optional
UsedForInnerBrokerCommunication bool `json:"usedForInnerBrokerCommunication"`
// UsedForKafkaAdminCommunication allows for a different port to be returned when the koperator is checking for the port to use to check if kafka is operating.
// +optional
UsedForKafkaAdminCommunication bool `json:"usedForKafkaAdminCommunication,omitempty"`
}

func (c *CommonListenerSpec) GetServerSSLCertSecretName() string {
Expand Down
10 changes: 10 additions & 0 deletions charts/kafka-operator/crds/kafkaclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21677,6 +21677,11 @@ spec:
description: At least one of the listeners should have this
flag enabled
type: boolean
usedForKafkaAdminCommunication:
description: UsedForKafkaAdminCommunication allows for a
different port to be returned when the koperator is checking
for the port to use to check if kafka is operating.
type: boolean
required:
- containerPort
- externalStartingPort
Expand Down Expand Up @@ -21753,6 +21758,11 @@ spec:
description: At least one of the listeners should have this
flag enabled
type: boolean
usedForKafkaAdminCommunication:
description: UsedForKafkaAdminCommunication allows for a
different port to be returned when the koperator is checking
for the port to use to check if kafka is operating.
type: boolean
required:
- containerPort
- name
Expand Down
10 changes: 10 additions & 0 deletions config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21677,6 +21677,11 @@ spec:
description: At least one of the listeners should have this
flag enabled
type: boolean
usedForKafkaAdminCommunication:
description: UsedForKafkaAdminCommunication allows for a
different port to be returned when the koperator is checking
for the port to use to check if kafka is operating.
type: boolean
required:
- containerPort
- externalStartingPort
Expand Down Expand Up @@ -21753,6 +21758,11 @@ spec:
description: At least one of the listeners should have this
flag enabled
type: boolean
usedForKafkaAdminCommunication:
description: UsedForKafkaAdminCommunication allows for a
different port to be returned when the koperator is checking
for the port to use to check if kafka is operating.
type: boolean
required:
- containerPort
- name
Expand Down
23 changes: 16 additions & 7 deletions pkg/resources/kafka/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, id int32
config := properties.NewProperties()

// Add listener configuration
listenerConf := generateListenerSpecificConfig(&r.KafkaCluster.Spec.ListenersConfig, serverPasses, log)
listenerConf := generateListenerSpecificConfig(&r.KafkaCluster.Spec, serverPasses, log)
config.Merge(listenerConf)

// Add listener configuration
Expand Down Expand Up @@ -87,9 +87,13 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, id int32
}
}

// Add Cruise Control Metrics Reporter configuration
if err := config.Set(kafkautils.CruiseControlConfigMetricsReporters, "com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter"); err != nil {
log.Error(err, fmt.Sprintf("setting '%s' in broker configuration resulted an error", kafkautils.CruiseControlConfigMetricsReporters))
// Add Cruise Control Metrics Reporter configuration.
// When "security.inter.broker.protocol" (e.g. inter broker communication is secure) is configured, the operator disables the reporter.
_, isSecurityInterBrokerProtocolConfigured := getBrokerReadOnlyConfig(id, r.KafkaCluster, log).Get(kafkautils.KafkaConfigSecurityInterBrokerProtocol)
if !isSecurityInterBrokerProtocolConfigured {
if err := config.Set(kafkautils.CruiseControlConfigMetricsReporters, "com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter"); err != nil {
log.Error(err, fmt.Sprintf("setting '%s' in broker configuration resulted an error", kafkautils.CruiseControlConfigMetricsReporters))
}
}
bootstrapServers, err := kafkautils.GetBootstrapServersService(r.KafkaCluster)
if err != nil {
Expand Down Expand Up @@ -244,12 +248,14 @@ func generateControlPlaneListener(iListeners []v1beta1.InternalListenerConfig) s
return controlPlaneListener
}

func generateListenerSpecificConfig(l *v1beta1.ListenersConfig, serverPasses map[string]string, log logr.Logger) *properties.Properties {
func generateListenerSpecificConfig(kcs *v1beta1.KafkaClusterSpec, serverPasses map[string]string, log logr.Logger) *properties.Properties {
var (
interBrokerListenerName string
securityProtocolMapConfig []string
listenerConfig []string
)
l := kcs.ListenersConfig
r := kcs.ReadOnlyConfig

config := properties.NewProperties()

Expand Down Expand Up @@ -292,9 +298,12 @@ func generateListenerSpecificConfig(l *v1beta1.ListenersConfig, serverPasses map
if err := config.Set(kafkautils.KafkaConfigListenerSecurityProtocolMap, securityProtocolMapConfig); err != nil {
log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted an error", kafkautils.KafkaConfigListenerSecurityProtocolMap))
}
if err := config.Set(kafkautils.KafkaConfigInterBrokerListenerName, interBrokerListenerName); err != nil {
log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted an error", kafkautils.KafkaConfigInterBrokerListenerName))
if !strings.Contains(r, kafkautils.KafkaConfigSecurityInterBrokerProtocol+"=") {
if err := config.Set(kafkautils.KafkaConfigInterBrokerListenerName, interBrokerListenerName); err != nil {
log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted an error", kafkautils.KafkaConfigInterBrokerListenerName))
}
}

if err := config.Set(kafkautils.KafkaConfigListeners, listenerConfig); err != nil {
log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted an error", kafkautils.KafkaConfigListeners))
}
Expand Down
20 changes: 20 additions & 0 deletions pkg/resources/kafka/configmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,26 @@ metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlM
super.users=User:CN=kafka-headless.kafka.svc.cluster.local
zookeeper.connect=example.zk:2181/`,
},
{
testName: "security_inter_broker_protocol_Set",
readOnlyConfig: `security.inter.broker.protocol=SASL_SSL`,
zkAddresses: []string{"example.zk:2181"},
zkPath: ``,
kubernetesClusterDomain: ``,
clusterWideConfig: ``,
perBrokerConfig: ``,
perBrokerReadOnlyConfig: ``,
advertisedListenerAddress: `kafka-0.kafka.svc.cluster.local:9092`,
listenerType: "plaintext",
expectedConfig: `advertised.listeners=INTERNAL://kafka-0.kafka.svc.cluster.local:9092
broker.id=0
cruise.control.metrics.reporter.bootstrap.servers=kafka-all-broker.kafka.svc.cluster.local:9092
cruise.control.metrics.reporter.kubernetes.mode=true
listener.security.protocol.map=INTERNAL:PLAINTEXT
listeners=INTERNAL://:9092
zookeeper.connect=example.zk:2181/
security.inter.broker.protocol=SASL_SSL`,
},
}

t.Parallel()
Expand Down
10 changes: 8 additions & 2 deletions pkg/resources/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -1513,20 +1513,26 @@ func getServiceFromExternalListener(client client.Client, cluster *v1beta1.Kafka
case istioingressutils.IngressControllerName:
if ingressConfigName == util.IngressConfigGlobalName {
iControllerServiceName = fmt.Sprintf(istioingressutils.MeshGatewayNameTemplate, eListenerName, cluster.GetName())
iControllerServiceName = strings.ReplaceAll(iControllerServiceName, "_", "-")
} else {
iControllerServiceName = fmt.Sprintf(istioingressutils.MeshGatewayNameTemplateWithScope, eListenerName, ingressConfigName, cluster.GetName())
iControllerServiceName = strings.ReplaceAll(iControllerServiceName, "_", "-")
}
case envoyutils.IngressControllerName:
if ingressConfigName == util.IngressConfigGlobalName {
iControllerServiceName = fmt.Sprintf(envoyutils.EnvoyServiceName, eListenerName, cluster.GetName())
iControllerServiceName = strings.ReplaceAll(iControllerServiceName, "_", "-")
} else {
iControllerServiceName = fmt.Sprintf(envoyutils.EnvoyServiceNameWithScope, eListenerName, ingressConfigName, cluster.GetName())
iControllerServiceName = strings.ReplaceAll(iControllerServiceName, "_", "-")
}
case contourutils.IngressControllerName:
if ingressConfigName == util.IngressConfigGlobalName {
iControllerServiceName = fmt.Sprintf(contourutils.ContourServiceName, eListenerName, cluster.GetName())
iControllerServiceName = strings.ReplaceAll(iControllerServiceName, "_", "-")
} else {
iControllerServiceName = fmt.Sprintf(contourutils.ContourServiceNameWithScope, eListenerName, ingressConfigName, cluster.GetName())
iControllerServiceName = strings.ReplaceAll(iControllerServiceName, "_", "-")
}
}

Expand Down Expand Up @@ -1602,7 +1608,7 @@ func generateServicePortForIListeners(listeners []v1beta1.InternalListenerConfig
var usedPorts []corev1.ServicePort
for _, iListener := range listeners {
usedPorts = append(usedPorts, corev1.ServicePort{
Name: strings.ReplaceAll(iListener.GetListenerServiceName(), "_", ""),
Name: strings.ReplaceAll(iListener.GetListenerServiceName(), "_", "-"),
Port: iListener.ContainerPort,
TargetPort: intstr.FromInt(int(iListener.ContainerPort)),
Protocol: corev1.ProtocolTCP,
Expand All @@ -1615,7 +1621,7 @@ func generateServicePortForEListeners(listeners []v1beta1.ExternalListenerConfig
var usedPorts []corev1.ServicePort
for _, eListener := range listeners {
usedPorts = append(usedPorts, corev1.ServicePort{
Name: eListener.GetListenerServiceName(),
Name: strings.ReplaceAll(eListener.GetListenerServiceName(), "_", "-"),
Protocol: corev1.ProtocolTCP,
Port: eListener.ContainerPort,
TargetPort: intstr.FromInt(int(eListener.ContainerPort)),
Expand Down
7 changes: 7 additions & 0 deletions pkg/util/client/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,18 @@ func UseSSL(cluster *v1beta1.KafkaCluster) bool {

func getContainerPortForInnerCom(internalListeners []v1beta1.InternalListenerConfig, extListeners []v1beta1.ExternalListenerConfig) int32 {
for _, val := range internalListeners {
if val.UsedForKafkaAdminCommunication { // Optional override to return a port from a different listener. Needed if b2b communication is on an external listener and and you want the koperator to interact with kafka over a different port.
return val.ContainerPort
}
if val.UsedForInnerBrokerCommunication {
return val.ContainerPort
}
}

for _, val := range extListeners {
if val.UsedForKafkaAdminCommunication {
return val.ContainerPort
}
if val.UsedForInnerBrokerCommunication {
return val.ContainerPort
}
Expand Down
9 changes: 8 additions & 1 deletion pkg/util/kafka/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,15 +192,22 @@ func GetBootstrapServersService(cluster *v1beta1.KafkaCluster) (string, error) {
// GetBrokerContainerPort return broker container port
func GetBrokerContainerPort(cluster *v1beta1.KafkaCluster) (int32, error) {
containerPort := int32(0)

for _, lc := range cluster.Spec.ListenersConfig.InternalListeners {
if lc.UsedForKafkaAdminCommunication { // Optional override to return a port from a different listener. Needed if b2b communication is on an external listener and and you want the koperator to interact with kafka over a different port.
containerPort = lc.ContainerPort
break
}
if lc.UsedForInnerBrokerCommunication && !lc.UsedForControllerCommunication {
containerPort = lc.ContainerPort
break
}
}

for _, lc := range cluster.Spec.ListenersConfig.ExternalListeners {
if lc.UsedForKafkaAdminCommunication {
containerPort = lc.ContainerPort
break
}
if lc.UsedForInnerBrokerCommunication {
containerPort = lc.ContainerPort
break
Expand Down
1 change: 1 addition & 0 deletions pkg/util/kafka/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const (
KafkaConfigListenerName = "listener.name"
KafkaConfigListenerSecurityProtocolMap = "listener.security.protocol.map"
KafkaConfigInterBrokerListenerName = "inter.broker.listener.name"
KafkaConfigSecurityInterBrokerProtocol = "security.inter.broker.protocol"
KafkaConfigAdvertisedListeners = "advertised.listeners"
KafkaConfigControlPlaneListener = "control.plane.listener.name"

Expand Down
10 changes: 6 additions & 4 deletions pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,11 +236,12 @@ func IsIngressConfigInUse(iConfigName, defaultConfigName string, cluster *v1beta

// ConstructEListenerLabelName construct an eListener label name based on ingress config name and listener name
func ConstructEListenerLabelName(ingressConfigName, eListenerName string) string {
externalListenerName := strings.ReplaceAll(eListenerName, "_", "-")
if ingressConfigName == IngressConfigGlobalName {
return eListenerName
return externalListenerName
}

return fmt.Sprintf(ExternalListenerLabelNameTemplate, eListenerName, ingressConfigName)
return fmt.Sprintf(ExternalListenerLabelNameTemplate, externalListenerName, ingressConfigName)
}

// ShouldIncludeBroker returns true if the broker should be included as a resource on external listener resources
Expand Down Expand Up @@ -437,10 +438,11 @@ func ConvertConfigEntryListToProperties(config []sarama.ConfigEntry) (*propertie
func GenerateEnvoyResourceName(resourceNameFormat string, resourceNameWithScopeFormat string, extListener v1beta1.ExternalListenerConfig, ingressConfig v1beta1.IngressConfig,
ingressConfigName, clusterName string) string {
var resourceName string
externalListenerName := strings.ReplaceAll(extListener.Name, "_", "-")
if ingressConfigName == IngressConfigGlobalName {
resourceName = fmt.Sprintf(resourceNameFormat, extListener.Name, clusterName)
resourceName = fmt.Sprintf(resourceNameFormat, externalListenerName, clusterName)
} else {
resourceName = fmt.Sprintf(resourceNameWithScopeFormat, extListener.Name, ingressConfigName, clusterName)
resourceName = fmt.Sprintf(resourceNameWithScopeFormat, externalListenerName, ingressConfigName, clusterName)
}

return resourceName
Expand Down
83 changes: 83 additions & 0 deletions pkg/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,89 @@ cruise.control.metrics.reporter.kubernetes.mode=true`,
}
}

func TestConstructEListenerLabelName(t *testing.T) {
tests := []struct {
ingressConfigName string
eListenerName string
expected string
}{
{"globalConfig", "example_listener_name", "example-listener-name"},
{"globalConfig", "no_underscores", "no-underscores"},
{"globalConfig", "multiple___underscores", "multiple---underscores"},
{"globalConfig", "noUnderscoresHere", "noUnderscoresHere"},
{"nonGlobalConfig", "example_listener_name", "example-listener-name-nonGlobalConfig"},
}

for _, test := range tests {
result := ConstructEListenerLabelName(test.ingressConfigName, test.eListenerName)
if result != test.expected {
t.Errorf("ConstructEListenerLabelName(%q, %q) = %q; want %q", test.ingressConfigName, test.eListenerName, result, test.expected)
}
}
}

func TestGenerateEnvoyResourceName(t *testing.T) {
testCases := []struct {
resourceNameFormat string
resourceNameWithScopeFormat string
extListener v1beta1.ExternalListenerConfig
ingressConfig v1beta1.IngressConfig
ingressConfigName, clusterName, expected string
}{
{
resourceNameFormat: "%s-%s",
resourceNameWithScopeFormat: "%s-%s-%s",
extListener: v1beta1.ExternalListenerConfig{
CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "noUnderscores"},
},
ingressConfig: v1beta1.IngressConfig{},
ingressConfigName: "globalConfig",
clusterName: "clusterName",
expected: "noUnderscores-clusterName",
},
{
resourceNameFormat: "%s-%s",
resourceNameWithScopeFormat: "%s-%s-%s",
extListener: v1beta1.ExternalListenerConfig{
CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "under_scores"},
},
ingressConfig: v1beta1.IngressConfig{},
ingressConfigName: "globalConfig",
clusterName: "clusterName",
expected: "under-scores-clusterName",
},
{
resourceNameFormat: "%s-%s",
resourceNameWithScopeFormat: "%s-%s-%s",
extListener: v1beta1.ExternalListenerConfig{
CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "noUnderscores"},
},
ingressConfig: v1beta1.IngressConfig{},
ingressConfigName: "nonGlobalConfig",
clusterName: "clusterName",
expected: "noUnderscores-nonGlobalConfig-clusterName",
},
{
resourceNameFormat: "%s-%s",
resourceNameWithScopeFormat: "%s-%s-%s",
extListener: v1beta1.ExternalListenerConfig{
CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "under_scores"},
},
ingressConfig: v1beta1.IngressConfig{},
ingressConfigName: "nonGlobalConfig",
clusterName: "clusterName",
expected: "under-scores-nonGlobalConfig-clusterName",
},
}
for _, test := range testCases {
hash := GenerateEnvoyResourceName(test.resourceNameFormat, test.resourceNameWithScopeFormat, test.extListener, test.ingressConfig,
test.ingressConfigName, test.clusterName)
if hash != test.expected {
t.Errorf("Expected: %s Got: %s", test.expected, hash)
}
}
}

func TestGetMD5Hash(t *testing.T) {
testCases := []struct {
testName string
Expand Down

0 comments on commit 1f0d47f

Please sign in to comment.