From 50ecf33d3840269eb5d32882f1745106b5e6651c Mon Sep 17 00:00:00 2001 From: Kuvesz Date: Wed, 19 Jul 2023 17:26:33 +0200 Subject: [PATCH 1/5] Added internal SSL listener test --- tests/e2e/const.go | 21 ++++++++++++-------- tests/e2e/kafka.go | 23 ++++++++++++++++++++++ tests/e2e/kcat.go | 20 ++++++++++++++----- tests/e2e/koperator_suite_test.go | 6 ++++-- tests/e2e/produce_consume.go | 31 ++++++++++++++++++++---------- tests/e2e/templates/kcat.yaml.tmpl | 9 +++++++++ tests/e2e/templates/user.yaml.tmpl | 11 +++++++++++ tests/e2e/test_produce_consume.go | 25 ++++++++++++++++++++++-- 8 files changed, 119 insertions(+), 27 deletions(-) create mode 100644 tests/e2e/templates/user.yaml.tmpl diff --git a/tests/e2e/const.go b/tests/e2e/const.go index 16e4a45b4..cbb97c399 100644 --- a/tests/e2e/const.go +++ b/tests/e2e/const.go @@ -31,14 +31,16 @@ const ( kubectlArgGoTemplateExternalListenersName = `-o=go-template='{{range $key,$value := .status.listenerStatuses.externalListeners}}{{$key}}{{"\n"}}{{end}}` kubectlArgGoTemplateExternalListenerAddressesTemplate = `-o=go-template='{{range .status.listenerStatuses.externalListeners.%s}}{{.address}}{{"\n"}}{{end}}` - - crdKind = "customresourcedefinitions.apiextensions.k8s.io" - kafkaKind = "kafkaclusters.kafka.banzaicloud.io" - kafkaTopicKind = "kafkatopics.kafka.banzaicloud.io" - kafkaClusterName = "kafka" - testExternalTopicName = "topic-test-external" - testInternalTopicName = "topic-test-internal" - + crdKind = "customresourcedefinitions.apiextensions.k8s.io" + kafkaKind = "kafkaclusters.kafka.banzaicloud.io" + kafkaTopicKind = "kafkatopics.kafka.banzaicloud.io" + kafkaUserKind = "kafkausers.kafka.banzaicloud.io" + kafkaClusterName = "kafka" + kafkaUserName = "test-user" + testExternalTopicName = "topic-test-external" + testInternalTopicName = "topic-test-internal" + + defaultTLSSecretName = "test-secret" kcatPodName = "kcat" zookeeperKind = "zookeeperclusters.zookeeper.pravega.io" zookeeperClusterName = "zookeeper-server" @@ -49,8 +51,10 @@ const ( defaultDeletionTimeout = 20 * time.Second defaultPodReadinessWaitTime = 10 * time.Second defaultTopicCreationWaitTime = 10 * time.Second + defaultUserCreationWaitTime = 10 * time.Second kafkaClusterCreateTimeout = 500 * time.Second kafkaClusterResourceCleanupTimeout = 120 * time.Second + kcatDeleetionTimeout = 40 * time.Second zookeeperClusterCreateTimeout = 4 * time.Minute zookeeperClusterResourceCleanupTimeout = 60 * time.Second externalConsumerTimeout = 5 * time.Second @@ -60,6 +64,7 @@ const ( kcatPodTemplate = "templates/kcat.yaml.tmpl" kafkaTopicTemplate = "templates/topic.yaml.tmpl" + kafkaUserTemplate = "templates/user.yaml.tmpl" zookeeperClusterTemplate = "templates/zookeeper_cluster.yaml.tmpl" kubectlNotFoundErrorMsg = "NotFound" diff --git a/tests/e2e/kafka.go b/tests/e2e/kafka.go index 7553913fe..fd4718df3 100644 --- a/tests/e2e/kafka.go +++ b/tests/e2e/kafka.go @@ -48,3 +48,26 @@ func requireDeployingKafkaTopic(kubectlOptions k8s.KubectlOptions, topicName str }) } + +// requireCreatingKafkaUser creates a KafkaUser resource from a template +func requireCreatingKafkaUser(kubectlOptions k8s.KubectlOptions, userName string, tlsSecretName string) { + It("Deploying KafkaUser CR", func() { + templateParameters := map[string]interface{}{ + "Name": userName, + "Namespace": kubectlOptions.Namespace, + } + if tlsSecretName != "" { + templateParameters["TLSSecretName"] = tlsSecretName + } + + err := applyK8sResourceFromTemplate(kubectlOptions, + kafkaUserTemplate, + templateParameters, + ) + Expect(err).ShouldNot(HaveOccurred()) + + secretFound := isExistingK8SResource(kubectlOptions, "secret", tlsSecretName) + Expect(secretFound).To(BeTrue()) + }) + +} diff --git a/tests/e2e/kcat.go b/tests/e2e/kcat.go index 784a723b2..8d39f9e5c 100644 --- a/tests/e2e/kcat.go +++ b/tests/e2e/kcat.go @@ -23,16 +23,21 @@ import ( // consumingMessagesInternally consuming messages based on parameters from Kafka cluster. // It returns messages in string slice. -func consumingMessagesInternally(kubectlOptions k8s.KubectlOptions, kcatPodName string, internalKafkaAddress string, topicName string) (string, error) { +func consumingMessagesInternally(kubectlOptions k8s.KubectlOptions, kcatPodName string, internalKafkaAddress string, topicName string, tlsMode bool) (string, error) { By(fmt.Sprintf("Consuming messages from internalKafkaAddress: '%s' topicName: '%s'", internalKafkaAddress, topicName)) + kcatTLSParameters := "" + if tlsMode { + kcatTLSParameters += "-X security.protocol=SSL -X ssl.key.location=/ssl/certs/tls.key -X ssl.certificate.location=/ssl/certs/tls.crt -X ssl.ca.location=/ssl/certs/ca.crt" + } + consumedMessages, err := k8s.RunKubectlAndGetOutputE(GinkgoT(), k8s.NewKubectlOptions(kubectlOptions.ContextName, kubectlOptions.ConfigPath, ""), "exec", kcatPodName, "-n", kubectlOptions.Namespace, "--", - "/bin/sh", "-c", fmt.Sprintf("kcat -L -b %s -t %s -e -C ", internalKafkaAddress, topicName), + "/bin/sh", "-c", fmt.Sprintf("kcat -L -b %s %s -t %s -e -C ", internalKafkaAddress, kcatTLSParameters, topicName), ) if err != nil { @@ -43,16 +48,21 @@ func consumingMessagesInternally(kubectlOptions k8s.KubectlOptions, kcatPodName } // producingMessagesInternally produces messages based on the parameters into kafka cluster. -func producingMessagesInternally(kubectlOptions k8s.KubectlOptions, kcatPodName string, internalKafkaAddress string, topicName string, message string) error { +func producingMessagesInternally(kubectlOptions k8s.KubectlOptions, kcatPodName string, internalKafkaAddress string, topicName string, message string, tlsMode bool) error { By(fmt.Sprintf("Producing messages: '%s' to internalKafkaAddress: '%s' topicName: '%s'", message, internalKafkaAddress, topicName)) + kcatTLSParameters := "" + if tlsMode { + kcatTLSParameters += "-X security.protocol=SSL -X ssl.key.location=/ssl/certs/tls.key -X ssl.certificate.location=/ssl/certs/tls.crt -X ssl.ca.location=/ssl/certs/ca.crt" + } + _, err := k8s.RunKubectlAndGetOutputE(GinkgoT(), k8s.NewKubectlOptions(kubectlOptions.ContextName, kubectlOptions.ConfigPath, ""), "exec", kcatPodName, "-n", kubectlOptions.Namespace, "--", - "/bin/sh", "-c", fmt.Sprintf("echo %s | kcat -L -b %s -t %s -P", - message, internalKafkaAddress, topicName), + "/bin/sh", "-c", fmt.Sprintf("echo %s | kcat -L -b %s %s -t %s -P", + message, internalKafkaAddress, kcatTLSParameters, topicName), ) return err diff --git a/tests/e2e/koperator_suite_test.go b/tests/e2e/koperator_suite_test.go index b17b31db1..c879269d9 100644 --- a/tests/e2e/koperator_suite_test.go +++ b/tests/e2e/koperator_suite_test.go @@ -54,12 +54,14 @@ var _ = BeforeSuite(func() { }) var _ = When("Testing e2e test altogether", Ordered, func() { - //testInstall() + testInstall() testInstallZookeeperCluster() testInstallKafkaCluster("../../config/samples/simplekafkacluster.yaml") + testProduceConsumeInternal() testUninstallKafkaCluster() testInstallKafkaCluster("../../config/samples/simplekafkacluster_ssl.yaml") + testProduceConsumeInternalSSL(defaultTLSSecretName) testUninstallKafkaCluster() testUninstallZookeeperCluster() - //testUninstall() + testUninstall() }) diff --git a/tests/e2e/produce_consume.go b/tests/e2e/produce_consume.go index d54dee9fb..a656070e2 100644 --- a/tests/e2e/produce_consume.go +++ b/tests/e2e/produce_consume.go @@ -26,14 +26,19 @@ import ( ) // requireDeployingKcatPod deploys kcat pod form a template and checks the pod readiness -func requireDeployingKcatPod(kubectlOptions k8s.KubectlOptions, podName string) { +func requireDeployingKcatPod(kubectlOptions k8s.KubectlOptions, podName string, tlsSecretName string) { It("Deploying Kcat Pod", func() { + templateParameters := map[string]interface{}{ + "Name": kcatPodName, + "Namespace": kubectlOptions.Namespace, + } + if tlsSecretName != "" { + templateParameters["TLSSecretName"] = tlsSecretName + } + err := applyK8sResourceFromTemplate(kubectlOptions, kcatPodTemplate, - map[string]interface{}{ - "Name": kcatPodName, - "Namespace": kubectlOptions.Namespace, - }, + templateParameters, ) Expect(err).ShouldNot(HaveOccurred()) @@ -48,15 +53,16 @@ func requireDeployingKcatPod(kubectlOptions k8s.KubectlOptions, podName string) // requireDeleteKcatPod deletes kcat pod. func requireDeleteKcatPod(kubectlOptions k8s.KubectlOptions, podName string) { It("Deleting Kcat pod", func() { - err := deleteK8sResource(kubectlOptions, defaultDeletionTimeout, "pods", "", podName) + err := deleteK8sResource(kubectlOptions, kcatDeleetionTimeout, "pods", "", podName) Expect(err).NotTo(HaveOccurred()) }) } // requireInternalProducingConsumingMessage produces and consumes messages internally through a kcat pod // and makes comparisons between the produced and consumed messages. -// When internalAddress parameter is empty, it gets the internal address from the kafkaCluster CR status -func requireInternalProducingConsumingMessage(kubectlOptions k8s.KubectlOptions, internalAddress, kcatPodName, topicName string) { +// When internalAddress parameter is empty, it gets the internal address from the kafkaCluster CR status. +// When tlsSecretName is set +func requireInternalProducingConsumingMessage(kubectlOptions k8s.KubectlOptions, internalAddress, kcatPodName, topicName string, tlsSecretName string) { It(fmt.Sprintf("Producing and consuming messages to/from topicName: '%s", topicName), func() { if internalAddress == "" { By("Getting Kafka cluster internal addresses") @@ -83,11 +89,16 @@ func requireInternalProducingConsumingMessage(kubectlOptions k8s.KubectlOptions, internalAddress = internalListenerAddresses[0] } + tlsMode := false + if tlsSecretName != "" { + tlsMode = true + } + currentTime := time.Now() - err := producingMessagesInternally(kubectlOptions, kcatPodName, internalAddress, topicName, currentTime.String()) + err := producingMessagesInternally(kubectlOptions, kcatPodName, internalAddress, topicName, currentTime.String(), tlsMode) Expect(err).NotTo(HaveOccurred()) - consumedMessages, err := consumingMessagesInternally(kubectlOptions, kcatPodName, internalAddress, topicName) + consumedMessages, err := consumingMessagesInternally(kubectlOptions, kcatPodName, internalAddress, topicName, tlsMode) Expect(err).NotTo(HaveOccurred()) Expect(consumedMessages).Should(ContainSubstring(currentTime.String())) diff --git a/tests/e2e/templates/kcat.yaml.tmpl b/tests/e2e/templates/kcat.yaml.tmpl index 47e783070..e04978992 100644 --- a/tests/e2e/templates/kcat.yaml.tmpl +++ b/tests/e2e/templates/kcat.yaml.tmpl @@ -11,3 +11,12 @@ spec: # Just spin & wait forever command: [ "/bin/sh", "-c", "--" ] args: [ "while true; do sleep 3000; done;" ] + {{ with .TLSSecretName }} + volumeMounts: + - name: sslcerts + mountPath: "/ssl/certs" + volumes: + - name: sslcerts + secret: + secretName: {{ . }} + {{ end }} diff --git a/tests/e2e/templates/user.yaml.tmpl b/tests/e2e/templates/user.yaml.tmpl new file mode 100644 index 000000000..124db3510 --- /dev/null +++ b/tests/e2e/templates/user.yaml.tmpl @@ -0,0 +1,11 @@ +apiVersion: kafka.banzaicloud.io/v1alpha1 +kind: KafkaUser +metadata: + name: {{or .Name "test-user"}} + namespace: {{or .Namespace "kafka" }} +spec: + clusterRef: + name: kafka + {{ with .TLSSecretName }} + secretName: {{ . }} + {{ end }} diff --git a/tests/e2e/test_produce_consume.go b/tests/e2e/test_produce_consume.go index a20b0f1be..bc59332a8 100644 --- a/tests/e2e/test_produce_consume.go +++ b/tests/e2e/test_produce_consume.go @@ -18,9 +18,30 @@ func testProduceConsumeInternal() bool { kubectlOptions.Namespace = koperatorLocalHelmDescriptor.Namespace - requireDeployingKcatPod(kubectlOptions, kcatPodName) + requireDeployingKcatPod(kubectlOptions, kcatPodName, "") requireDeployingKafkaTopic(kubectlOptions, testInternalTopicName) - requireInternalProducingConsumingMessage(kubectlOptions, "", kcatPodName, testInternalTopicName) + requireInternalProducingConsumingMessage(kubectlOptions, "", kcatPodName, testInternalTopicName, "") + requireDeleteKafkaTopic(kubectlOptions, testInternalTopicName) + requireDeleteKcatPod(kubectlOptions, kcatPodName) + }) +} + +func testProduceConsumeInternalSSL(tlsSecretName string) bool { + return When("Internally produce and consume message to/from Kafka cluster using SSL", func() { + var kubectlOptions k8s.KubectlOptions + var err error + + It("Acquiring K8s config and context", func() { + kubectlOptions, err = kubectlOptionsForCurrentContext() + Expect(err).NotTo(HaveOccurred()) + }) + + kubectlOptions.Namespace = koperatorLocalHelmDescriptor.Namespace + + requireCreatingKafkaUser(kubectlOptions, kafkaUserName, tlsSecretName) + requireDeployingKcatPod(kubectlOptions, kcatPodName, tlsSecretName) + requireDeployingKafkaTopic(kubectlOptions, testInternalTopicName) + requireInternalProducingConsumingMessage(kubectlOptions, "", kcatPodName, testInternalTopicName, tlsSecretName) requireDeleteKafkaTopic(kubectlOptions, testInternalTopicName) requireDeleteKcatPod(kubectlOptions, kcatPodName) }) From aaedad0e718b8dc982bdc3c8a3787384475893fb Mon Sep 17 00:00:00 2001 From: Kuvesz Date: Thu, 20 Jul 2023 14:47:39 +0200 Subject: [PATCH 2/5] Fix review suggestion --- tests/e2e/kafka.go | 5 +++-- tests/e2e/templates/user.yaml.tmpl | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/e2e/kafka.go b/tests/e2e/kafka.go index fd4718df3..b142c9e78 100644 --- a/tests/e2e/kafka.go +++ b/tests/e2e/kafka.go @@ -53,8 +53,9 @@ func requireDeployingKafkaTopic(kubectlOptions k8s.KubectlOptions, topicName str func requireCreatingKafkaUser(kubectlOptions k8s.KubectlOptions, userName string, tlsSecretName string) { It("Deploying KafkaUser CR", func() { templateParameters := map[string]interface{}{ - "Name": userName, - "Namespace": kubectlOptions.Namespace, + "Name": userName, + "Namespace": kubectlOptions.Namespace, + "ClusterName": kafkaClusterName, } if tlsSecretName != "" { templateParameters["TLSSecretName"] = tlsSecretName diff --git a/tests/e2e/templates/user.yaml.tmpl b/tests/e2e/templates/user.yaml.tmpl index 124db3510..be1ed92b6 100644 --- a/tests/e2e/templates/user.yaml.tmpl +++ b/tests/e2e/templates/user.yaml.tmpl @@ -5,7 +5,7 @@ metadata: namespace: {{or .Namespace "kafka" }} spec: clusterRef: - name: kafka + name: {{or .ClusterName "kafka" }} {{ with .TLSSecretName }} secretName: {{ . }} {{ end }} From 580b898a6265228084a7229675e6c444b1872a41 Mon Sep 17 00:00:00 2001 From: Kuvesz Date: Thu, 20 Jul 2023 15:16:42 +0200 Subject: [PATCH 3/5] Added deleteKafkaUser function --- tests/e2e/kafka.go | 12 ++++++++++-- tests/e2e/test_produce_consume.go | 3 ++- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/tests/e2e/kafka.go b/tests/e2e/kafka.go index b142c9e78..35394b6f2 100644 --- a/tests/e2e/kafka.go +++ b/tests/e2e/kafka.go @@ -49,8 +49,16 @@ func requireDeployingKafkaTopic(kubectlOptions k8s.KubectlOptions, topicName str } -// requireCreatingKafkaUser creates a KafkaUser resource from a template -func requireCreatingKafkaUser(kubectlOptions k8s.KubectlOptions, userName string, tlsSecretName string) { +// requireDeleteKafkaUser deletes a kafkaUser resource by name +func requireDeleteKafkaUser(kubectlOptions k8s.KubectlOptions, userName string) { + It("Deleting KafkaUser CR", func() { + err := deleteK8sResource(kubectlOptions, defaultDeletionTimeout, kafkaUserKind, "", userName) + Expect(err).NotTo(HaveOccurred()) + }) +} + +// requireDeployingKafkaUser creates a KafkaUser resource from a template +func requireDeployingKafkaUser(kubectlOptions k8s.KubectlOptions, userName string, tlsSecretName string) { It("Deploying KafkaUser CR", func() { templateParameters := map[string]interface{}{ "Name": userName, diff --git a/tests/e2e/test_produce_consume.go b/tests/e2e/test_produce_consume.go index bc59332a8..429b2b8be 100644 --- a/tests/e2e/test_produce_consume.go +++ b/tests/e2e/test_produce_consume.go @@ -38,12 +38,13 @@ func testProduceConsumeInternalSSL(tlsSecretName string) bool { kubectlOptions.Namespace = koperatorLocalHelmDescriptor.Namespace - requireCreatingKafkaUser(kubectlOptions, kafkaUserName, tlsSecretName) + requireDeployingKafkaUser(kubectlOptions, kafkaUserName, tlsSecretName) requireDeployingKcatPod(kubectlOptions, kcatPodName, tlsSecretName) requireDeployingKafkaTopic(kubectlOptions, testInternalTopicName) requireInternalProducingConsumingMessage(kubectlOptions, "", kcatPodName, testInternalTopicName, tlsSecretName) requireDeleteKafkaTopic(kubectlOptions, testInternalTopicName) requireDeleteKcatPod(kubectlOptions, kcatPodName) + requireDeleteKafkaUser(kubectlOptions, kafkaUserName) }) } From ba2179ca342e590c6431d52e8ba5ce4bb63195c9 Mon Sep 17 00:00:00 2001 From: Kuvesz Date: Mon, 24 Jul 2023 14:51:36 +0200 Subject: [PATCH 4/5] Corrected typo --- tests/e2e/kafka.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/e2e/kafka.go b/tests/e2e/kafka.go index 35394b6f2..1106d0c63 100644 --- a/tests/e2e/kafka.go +++ b/tests/e2e/kafka.go @@ -75,7 +75,7 @@ func requireDeployingKafkaUser(kubectlOptions k8s.KubectlOptions, userName strin ) Expect(err).ShouldNot(HaveOccurred()) - secretFound := isExistingK8SResource(kubectlOptions, "secret", tlsSecretName) + secretFound := isExistingK8SResource(kubectlOptions, "Secret", tlsSecretName) Expect(secretFound).To(BeTrue()) }) From a987b597c5932caf43afba48363e707fbeffef98 Mon Sep 17 00:00:00 2001 From: Kuvesz Date: Mon, 24 Jul 2023 19:17:18 +0200 Subject: [PATCH 5/5] Added eventually to secret check --- tests/e2e/kafka.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/e2e/kafka.go b/tests/e2e/kafka.go index 1106d0c63..835fc8cd3 100644 --- a/tests/e2e/kafka.go +++ b/tests/e2e/kafka.go @@ -15,6 +15,9 @@ package e2e import ( + "context" + "time" + "github.com/gruntwork-io/terratest/modules/k8s" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -75,8 +78,9 @@ func requireDeployingKafkaUser(kubectlOptions k8s.KubectlOptions, userName strin ) Expect(err).ShouldNot(HaveOccurred()) - secretFound := isExistingK8SResource(kubectlOptions, "Secret", tlsSecretName) - Expect(secretFound).To(BeTrue()) + Eventually(context.Background(), func() bool { + return isExistingK8SResource(kubectlOptions, "Secret", tlsSecretName) + }, defaultUserCreationWaitTime, 3*time.Second).Should(Equal(true)) }) }