Skip to content

Commit

Permalink
Added internal SSL listener test
Browse files Browse the repository at this point in the history
  • Loading branch information
Kuvesz committed Jul 19, 2023
1 parent 34071dc commit 50ecf33
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 27 deletions.
21 changes: 13 additions & 8 deletions tests/e2e/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,16 @@ const (
kubectlArgGoTemplateExternalListenersName = `-o=go-template='{{range $key,$value := .status.listenerStatuses.externalListeners}}{{$key}}{{"\n"}}{{end}}`
kubectlArgGoTemplateExternalListenerAddressesTemplate = `-o=go-template='{{range .status.listenerStatuses.externalListeners.%s}}{{.address}}{{"\n"}}{{end}}`


crdKind = "customresourcedefinitions.apiextensions.k8s.io"
kafkaKind = "kafkaclusters.kafka.banzaicloud.io"
kafkaTopicKind = "kafkatopics.kafka.banzaicloud.io"
kafkaClusterName = "kafka"
testExternalTopicName = "topic-test-external"
testInternalTopicName = "topic-test-internal"

crdKind = "customresourcedefinitions.apiextensions.k8s.io"
kafkaKind = "kafkaclusters.kafka.banzaicloud.io"
kafkaTopicKind = "kafkatopics.kafka.banzaicloud.io"
kafkaUserKind = "kafkausers.kafka.banzaicloud.io"
kafkaClusterName = "kafka"
kafkaUserName = "test-user"
testExternalTopicName = "topic-test-external"
testInternalTopicName = "topic-test-internal"

defaultTLSSecretName = "test-secret"
kcatPodName = "kcat"
zookeeperKind = "zookeeperclusters.zookeeper.pravega.io"
zookeeperClusterName = "zookeeper-server"
Expand All @@ -49,8 +51,10 @@ const (
defaultDeletionTimeout = 20 * time.Second
defaultPodReadinessWaitTime = 10 * time.Second
defaultTopicCreationWaitTime = 10 * time.Second
defaultUserCreationWaitTime = 10 * time.Second
kafkaClusterCreateTimeout = 500 * time.Second
kafkaClusterResourceCleanupTimeout = 120 * time.Second
kcatDeleetionTimeout = 40 * time.Second
zookeeperClusterCreateTimeout = 4 * time.Minute
zookeeperClusterResourceCleanupTimeout = 60 * time.Second
externalConsumerTimeout = 5 * time.Second
Expand All @@ -60,6 +64,7 @@ const (

kcatPodTemplate = "templates/kcat.yaml.tmpl"
kafkaTopicTemplate = "templates/topic.yaml.tmpl"
kafkaUserTemplate = "templates/user.yaml.tmpl"
zookeeperClusterTemplate = "templates/zookeeper_cluster.yaml.tmpl"

kubectlNotFoundErrorMsg = "NotFound"
Expand Down
23 changes: 23 additions & 0 deletions tests/e2e/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,26 @@ func requireDeployingKafkaTopic(kubectlOptions k8s.KubectlOptions, topicName str
})

}

// requireCreatingKafkaUser creates a KafkaUser resource from a template
func requireCreatingKafkaUser(kubectlOptions k8s.KubectlOptions, userName string, tlsSecretName string) {
It("Deploying KafkaUser CR", func() {
templateParameters := map[string]interface{}{
"Name": userName,
"Namespace": kubectlOptions.Namespace,
}
if tlsSecretName != "" {
templateParameters["TLSSecretName"] = tlsSecretName
}

err := applyK8sResourceFromTemplate(kubectlOptions,
kafkaUserTemplate,
templateParameters,
)
Expect(err).ShouldNot(HaveOccurred())

secretFound := isExistingK8SResource(kubectlOptions, "secret", tlsSecretName)
Expect(secretFound).To(BeTrue())
})

}
20 changes: 15 additions & 5 deletions tests/e2e/kcat.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,21 @@ import (

// consumingMessagesInternally consuming messages based on parameters from Kafka cluster.
// It returns messages in string slice.
func consumingMessagesInternally(kubectlOptions k8s.KubectlOptions, kcatPodName string, internalKafkaAddress string, topicName string) (string, error) {
func consumingMessagesInternally(kubectlOptions k8s.KubectlOptions, kcatPodName string, internalKafkaAddress string, topicName string, tlsMode bool) (string, error) {

By(fmt.Sprintf("Consuming messages from internalKafkaAddress: '%s' topicName: '%s'", internalKafkaAddress, topicName))

kcatTLSParameters := ""
if tlsMode {
kcatTLSParameters += "-X security.protocol=SSL -X ssl.key.location=/ssl/certs/tls.key -X ssl.certificate.location=/ssl/certs/tls.crt -X ssl.ca.location=/ssl/certs/ca.crt"
}

consumedMessages, err := k8s.RunKubectlAndGetOutputE(GinkgoT(),
k8s.NewKubectlOptions(kubectlOptions.ContextName, kubectlOptions.ConfigPath, ""),
"exec", kcatPodName,
"-n", kubectlOptions.Namespace,
"--",
"/bin/sh", "-c", fmt.Sprintf("kcat -L -b %s -t %s -e -C ", internalKafkaAddress, topicName),
"/bin/sh", "-c", fmt.Sprintf("kcat -L -b %s %s -t %s -e -C ", internalKafkaAddress, kcatTLSParameters, topicName),
)

if err != nil {
Expand All @@ -43,16 +48,21 @@ func consumingMessagesInternally(kubectlOptions k8s.KubectlOptions, kcatPodName
}

// producingMessagesInternally produces messages based on the parameters into kafka cluster.
func producingMessagesInternally(kubectlOptions k8s.KubectlOptions, kcatPodName string, internalKafkaAddress string, topicName string, message string) error {
func producingMessagesInternally(kubectlOptions k8s.KubectlOptions, kcatPodName string, internalKafkaAddress string, topicName string, message string, tlsMode bool) error {
By(fmt.Sprintf("Producing messages: '%s' to internalKafkaAddress: '%s' topicName: '%s'", message, internalKafkaAddress, topicName))

kcatTLSParameters := ""
if tlsMode {
kcatTLSParameters += "-X security.protocol=SSL -X ssl.key.location=/ssl/certs/tls.key -X ssl.certificate.location=/ssl/certs/tls.crt -X ssl.ca.location=/ssl/certs/ca.crt"
}

_, err := k8s.RunKubectlAndGetOutputE(GinkgoT(),
k8s.NewKubectlOptions(kubectlOptions.ContextName, kubectlOptions.ConfigPath, ""),
"exec", kcatPodName,
"-n", kubectlOptions.Namespace,
"--",
"/bin/sh", "-c", fmt.Sprintf("echo %s | kcat -L -b %s -t %s -P",
message, internalKafkaAddress, topicName),
"/bin/sh", "-c", fmt.Sprintf("echo %s | kcat -L -b %s %s -t %s -P",
message, internalKafkaAddress, kcatTLSParameters, topicName),
)

return err
Expand Down
6 changes: 4 additions & 2 deletions tests/e2e/koperator_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,14 @@ var _ = BeforeSuite(func() {
})

var _ = When("Testing e2e test altogether", Ordered, func() {
//testInstall()
testInstall()
testInstallZookeeperCluster()
testInstallKafkaCluster("../../config/samples/simplekafkacluster.yaml")
testProduceConsumeInternal()
testUninstallKafkaCluster()
testInstallKafkaCluster("../../config/samples/simplekafkacluster_ssl.yaml")
testProduceConsumeInternalSSL(defaultTLSSecretName)
testUninstallKafkaCluster()
testUninstallZookeeperCluster()
//testUninstall()
testUninstall()
})
31 changes: 21 additions & 10 deletions tests/e2e/produce_consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,19 @@ import (
)

// requireDeployingKcatPod deploys kcat pod form a template and checks the pod readiness
func requireDeployingKcatPod(kubectlOptions k8s.KubectlOptions, podName string) {
func requireDeployingKcatPod(kubectlOptions k8s.KubectlOptions, podName string, tlsSecretName string) {
It("Deploying Kcat Pod", func() {
templateParameters := map[string]interface{}{
"Name": kcatPodName,
"Namespace": kubectlOptions.Namespace,
}
if tlsSecretName != "" {
templateParameters["TLSSecretName"] = tlsSecretName
}

err := applyK8sResourceFromTemplate(kubectlOptions,
kcatPodTemplate,
map[string]interface{}{
"Name": kcatPodName,
"Namespace": kubectlOptions.Namespace,
},
templateParameters,
)
Expect(err).ShouldNot(HaveOccurred())

Expand All @@ -48,15 +53,16 @@ func requireDeployingKcatPod(kubectlOptions k8s.KubectlOptions, podName string)
// requireDeleteKcatPod deletes kcat pod.
func requireDeleteKcatPod(kubectlOptions k8s.KubectlOptions, podName string) {
It("Deleting Kcat pod", func() {
err := deleteK8sResource(kubectlOptions, defaultDeletionTimeout, "pods", "", podName)
err := deleteK8sResource(kubectlOptions, kcatDeleetionTimeout, "pods", "", podName)
Expect(err).NotTo(HaveOccurred())
})
}

// requireInternalProducingConsumingMessage produces and consumes messages internally through a kcat pod
// and makes comparisons between the produced and consumed messages.
// When internalAddress parameter is empty, it gets the internal address from the kafkaCluster CR status
func requireInternalProducingConsumingMessage(kubectlOptions k8s.KubectlOptions, internalAddress, kcatPodName, topicName string) {
// When internalAddress parameter is empty, it gets the internal address from the kafkaCluster CR status.
// When tlsSecretName is set
func requireInternalProducingConsumingMessage(kubectlOptions k8s.KubectlOptions, internalAddress, kcatPodName, topicName string, tlsSecretName string) {
It(fmt.Sprintf("Producing and consuming messages to/from topicName: '%s", topicName), func() {
if internalAddress == "" {
By("Getting Kafka cluster internal addresses")
Expand All @@ -83,11 +89,16 @@ func requireInternalProducingConsumingMessage(kubectlOptions k8s.KubectlOptions,
internalAddress = internalListenerAddresses[0]
}

tlsMode := false
if tlsSecretName != "" {
tlsMode = true
}

currentTime := time.Now()
err := producingMessagesInternally(kubectlOptions, kcatPodName, internalAddress, topicName, currentTime.String())
err := producingMessagesInternally(kubectlOptions, kcatPodName, internalAddress, topicName, currentTime.String(), tlsMode)
Expect(err).NotTo(HaveOccurred())

consumedMessages, err := consumingMessagesInternally(kubectlOptions, kcatPodName, internalAddress, topicName)
consumedMessages, err := consumingMessagesInternally(kubectlOptions, kcatPodName, internalAddress, topicName, tlsMode)

Expect(err).NotTo(HaveOccurred())
Expect(consumedMessages).Should(ContainSubstring(currentTime.String()))
Expand Down
9 changes: 9 additions & 0 deletions tests/e2e/templates/kcat.yaml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,12 @@ spec:
# Just spin & wait forever
command: [ "/bin/sh", "-c", "--" ]
args: [ "while true; do sleep 3000; done;" ]
{{ with .TLSSecretName }}
volumeMounts:
- name: sslcerts
mountPath: "/ssl/certs"
volumes:
- name: sslcerts
secret:
secretName: {{ . }}
{{ end }}
11 changes: 11 additions & 0 deletions tests/e2e/templates/user.yaml.tmpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
apiVersion: kafka.banzaicloud.io/v1alpha1
kind: KafkaUser
metadata:
name: {{or .Name "test-user"}}
namespace: {{or .Namespace "kafka" }}
spec:
clusterRef:
name: kafka
{{ with .TLSSecretName }}
secretName: {{ . }}
{{ end }}
25 changes: 23 additions & 2 deletions tests/e2e/test_produce_consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,30 @@ func testProduceConsumeInternal() bool {

kubectlOptions.Namespace = koperatorLocalHelmDescriptor.Namespace

requireDeployingKcatPod(kubectlOptions, kcatPodName)
requireDeployingKcatPod(kubectlOptions, kcatPodName, "")
requireDeployingKafkaTopic(kubectlOptions, testInternalTopicName)
requireInternalProducingConsumingMessage(kubectlOptions, "", kcatPodName, testInternalTopicName)
requireInternalProducingConsumingMessage(kubectlOptions, "", kcatPodName, testInternalTopicName, "")
requireDeleteKafkaTopic(kubectlOptions, testInternalTopicName)
requireDeleteKcatPod(kubectlOptions, kcatPodName)
})
}

func testProduceConsumeInternalSSL(tlsSecretName string) bool {
return When("Internally produce and consume message to/from Kafka cluster using SSL", func() {
var kubectlOptions k8s.KubectlOptions
var err error

It("Acquiring K8s config and context", func() {
kubectlOptions, err = kubectlOptionsForCurrentContext()
Expect(err).NotTo(HaveOccurred())
})

kubectlOptions.Namespace = koperatorLocalHelmDescriptor.Namespace

requireCreatingKafkaUser(kubectlOptions, kafkaUserName, tlsSecretName)
requireDeployingKcatPod(kubectlOptions, kcatPodName, tlsSecretName)
requireDeployingKafkaTopic(kubectlOptions, testInternalTopicName)
requireInternalProducingConsumingMessage(kubectlOptions, "", kcatPodName, testInternalTopicName, tlsSecretName)
requireDeleteKafkaTopic(kubectlOptions, testInternalTopicName)
requireDeleteKcatPod(kubectlOptions, kcatPodName)
})
Expand Down

0 comments on commit 50ecf33

Please sign in to comment.