Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added internal SSL listener test #1017

Merged
merged 9 commits into from
Jul 26, 2023
Merged
6 changes: 6 additions & 0 deletions tests/e2e/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,13 @@ const (
crdKind = "customresourcedefinitions.apiextensions.k8s.io"
kafkaKind = "kafkaclusters.kafka.banzaicloud.io"
kafkaTopicKind = "kafkatopics.kafka.banzaicloud.io"
kafkaUserKind = "kafkausers.kafka.banzaicloud.io"
kafkaClusterName = "kafka"
kafkaUserName = "test-user"
testExternalTopicName = "topic-test-external"
testInternalTopicName = "topic-test-internal"

defaultTLSSecretName = "test-secret"
kcatPodName = "kcat"
zookeeperKind = "zookeeperclusters.zookeeper.pravega.io"
zookeeperClusterName = "zookeeper-server"
Expand All @@ -48,8 +51,10 @@ const (
defaultDeletionTimeout = 20 * time.Second
defaultPodReadinessWaitTime = 10 * time.Second
defaultTopicCreationWaitTime = 10 * time.Second
defaultUserCreationWaitTime = 10 * time.Second
kafkaClusterCreateTimeout = 500 * time.Second
kafkaClusterResourceCleanupTimeout = 120 * time.Second
kcatDeleetionTimeout = 40 * time.Second
zookeeperClusterCreateTimeout = 4 * time.Minute
zookeeperClusterResourceCleanupTimeout = 60 * time.Second
externalConsumerTimeout = 5 * time.Second
Expand All @@ -59,6 +64,7 @@ const (

kcatPodTemplate = "templates/kcat.yaml.tmpl"
kafkaTopicTemplate = "templates/topic.yaml.tmpl"
kafkaUserTemplate = "templates/user.yaml.tmpl"
zookeeperClusterTemplate = "templates/zookeeper_cluster.yaml.tmpl"

kubectlNotFoundErrorMsg = "NotFound"
Expand Down
36 changes: 36 additions & 0 deletions tests/e2e/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
package e2e

import (
"context"
"time"

"github.com/gruntwork-io/terratest/modules/k8s"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
Expand Down Expand Up @@ -48,3 +51,36 @@ func requireDeployingKafkaTopic(kubectlOptions k8s.KubectlOptions, topicName str
})

}

// requireDeleteKafkaUser deletes a kafkaUser resource by name
func requireDeleteKafkaUser(kubectlOptions k8s.KubectlOptions, userName string) {
It("Deleting KafkaUser CR", func() {
err := deleteK8sResource(kubectlOptions, defaultDeletionTimeout, kafkaUserKind, "", userName)
Expect(err).NotTo(HaveOccurred())
})
}

// requireDeployingKafkaUser creates a KafkaUser resource from a template
func requireDeployingKafkaUser(kubectlOptions k8s.KubectlOptions, userName string, tlsSecretName string) {
It("Deploying KafkaUser CR", func() {
templateParameters := map[string]interface{}{
"Name": userName,
"Namespace": kubectlOptions.Namespace,
"ClusterName": kafkaClusterName,
}
if tlsSecretName != "" {
templateParameters["TLSSecretName"] = tlsSecretName
}

err := applyK8sResourceFromTemplate(kubectlOptions,
kafkaUserTemplate,
templateParameters,
)
Expect(err).ShouldNot(HaveOccurred())

Eventually(context.Background(), func() bool {
return isExistingK8SResource(kubectlOptions, "Secret", tlsSecretName)
}, defaultUserCreationWaitTime, 3*time.Second).Should(Equal(true))
})

}
20 changes: 15 additions & 5 deletions tests/e2e/kcat.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,21 @@ import (

// consumingMessagesInternally consuming messages based on parameters from Kafka cluster.
// It returns messages in string slice.
func consumingMessagesInternally(kubectlOptions k8s.KubectlOptions, kcatPodName string, internalKafkaAddress string, topicName string) (string, error) {
func consumingMessagesInternally(kubectlOptions k8s.KubectlOptions, kcatPodName string, internalKafkaAddress string, topicName string, tlsMode bool) (string, error) {

By(fmt.Sprintf("Consuming messages from internalKafkaAddress: '%s' topicName: '%s'", internalKafkaAddress, topicName))

kcatTLSParameters := ""
if tlsMode {
kcatTLSParameters += "-X security.protocol=SSL -X ssl.key.location=/ssl/certs/tls.key -X ssl.certificate.location=/ssl/certs/tls.crt -X ssl.ca.location=/ssl/certs/ca.crt"
}

consumedMessages, err := k8s.RunKubectlAndGetOutputE(GinkgoT(),
k8s.NewKubectlOptions(kubectlOptions.ContextName, kubectlOptions.ConfigPath, ""),
"exec", kcatPodName,
"-n", kubectlOptions.Namespace,
"--",
"/bin/sh", "-c", fmt.Sprintf("kcat -L -b %s -t %s -e -C ", internalKafkaAddress, topicName),
"/bin/sh", "-c", fmt.Sprintf("kcat -L -b %s %s -t %s -e -C ", internalKafkaAddress, kcatTLSParameters, topicName),
)

if err != nil {
Expand All @@ -43,16 +48,21 @@ func consumingMessagesInternally(kubectlOptions k8s.KubectlOptions, kcatPodName
}

// producingMessagesInternally produces messages based on the parameters into kafka cluster.
func producingMessagesInternally(kubectlOptions k8s.KubectlOptions, kcatPodName string, internalKafkaAddress string, topicName string, message string) error {
func producingMessagesInternally(kubectlOptions k8s.KubectlOptions, kcatPodName string, internalKafkaAddress string, topicName string, message string, tlsMode bool) error {
By(fmt.Sprintf("Producing messages: '%s' to internalKafkaAddress: '%s' topicName: '%s'", message, internalKafkaAddress, topicName))

kcatTLSParameters := ""
if tlsMode {
kcatTLSParameters += "-X security.protocol=SSL -X ssl.key.location=/ssl/certs/tls.key -X ssl.certificate.location=/ssl/certs/tls.crt -X ssl.ca.location=/ssl/certs/ca.crt"
}

_, err := k8s.RunKubectlAndGetOutputE(GinkgoT(),
k8s.NewKubectlOptions(kubectlOptions.ContextName, kubectlOptions.ConfigPath, ""),
"exec", kcatPodName,
"-n", kubectlOptions.Namespace,
"--",
"/bin/sh", "-c", fmt.Sprintf("echo %s | kcat -L -b %s -t %s -P",
message, internalKafkaAddress, topicName),
"/bin/sh", "-c", fmt.Sprintf("echo %s | kcat -L -b %s %s -t %s -P",
message, internalKafkaAddress, kcatTLSParameters, topicName),
)

return err
Expand Down
2 changes: 2 additions & 0 deletions tests/e2e/koperator_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,10 @@ var _ = When("Testing e2e test altogether", Ordered, func() {
testInstall()
testInstallZookeeperCluster()
testInstallKafkaCluster("../../config/samples/simplekafkacluster.yaml")
testProduceConsumeInternal()
testUninstallKafkaCluster()
testInstallKafkaCluster("../../config/samples/simplekafkacluster_ssl.yaml")
testProduceConsumeInternalSSL(defaultTLSSecretName)
testUninstallKafkaCluster()
testUninstallZookeeperCluster()
testUninstall()
Expand Down
31 changes: 21 additions & 10 deletions tests/e2e/produce_consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,19 @@ import (
)

// requireDeployingKcatPod deploys kcat pod form a template and checks the pod readiness
func requireDeployingKcatPod(kubectlOptions k8s.KubectlOptions, podName string) {
func requireDeployingKcatPod(kubectlOptions k8s.KubectlOptions, podName string, tlsSecretName string) {
It("Deploying Kcat Pod", func() {
templateParameters := map[string]interface{}{
"Name": kcatPodName,
"Namespace": kubectlOptions.Namespace,
}
if tlsSecretName != "" {
templateParameters["TLSSecretName"] = tlsSecretName
}

err := applyK8sResourceFromTemplate(kubectlOptions,
kcatPodTemplate,
map[string]interface{}{
"Name": kcatPodName,
"Namespace": kubectlOptions.Namespace,
},
templateParameters,
)
Expect(err).ShouldNot(HaveOccurred())

Expand All @@ -48,15 +53,16 @@ func requireDeployingKcatPod(kubectlOptions k8s.KubectlOptions, podName string)
// requireDeleteKcatPod deletes kcat pod.
func requireDeleteKcatPod(kubectlOptions k8s.KubectlOptions, podName string) {
It("Deleting Kcat pod", func() {
err := deleteK8sResource(kubectlOptions, defaultDeletionTimeout, "pods", "", podName)
err := deleteK8sResource(kubectlOptions, kcatDeleetionTimeout, "pods", "", podName)
Expect(err).NotTo(HaveOccurred())
})
}

// requireInternalProducingConsumingMessage produces and consumes messages internally through a kcat pod
// and makes comparisons between the produced and consumed messages.
// When internalAddress parameter is empty, it gets the internal address from the kafkaCluster CR status
func requireInternalProducingConsumingMessage(kubectlOptions k8s.KubectlOptions, internalAddress, kcatPodName, topicName string) {
// When internalAddress parameter is empty, it gets the internal address from the kafkaCluster CR status.
// When tlsSecretName is set
func requireInternalProducingConsumingMessage(kubectlOptions k8s.KubectlOptions, internalAddress, kcatPodName, topicName string, tlsSecretName string) {
It(fmt.Sprintf("Producing and consuming messages to/from topicName: '%s", topicName), func() {
if internalAddress == "" {
By("Getting Kafka cluster internal addresses")
Expand All @@ -83,11 +89,16 @@ func requireInternalProducingConsumingMessage(kubectlOptions k8s.KubectlOptions,
internalAddress = internalListenerAddresses[0]
}

tlsMode := false
if tlsSecretName != "" {
tlsMode = true
}

currentTime := time.Now()
err := producingMessagesInternally(kubectlOptions, kcatPodName, internalAddress, topicName, currentTime.String())
err := producingMessagesInternally(kubectlOptions, kcatPodName, internalAddress, topicName, currentTime.String(), tlsMode)
Expect(err).NotTo(HaveOccurred())

consumedMessages, err := consumingMessagesInternally(kubectlOptions, kcatPodName, internalAddress, topicName)
consumedMessages, err := consumingMessagesInternally(kubectlOptions, kcatPodName, internalAddress, topicName, tlsMode)

Expect(err).NotTo(HaveOccurred())
Expect(consumedMessages).Should(ContainSubstring(currentTime.String()))
Expand Down
9 changes: 9 additions & 0 deletions tests/e2e/templates/kcat.yaml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,12 @@ spec:
# Just spin & wait forever
command: [ "/bin/sh", "-c", "--" ]
args: [ "while true; do sleep 3000; done;" ]
{{ with .TLSSecretName }}
volumeMounts:
- name: sslcerts
mountPath: "/ssl/certs"
volumes:
- name: sslcerts
secret:
secretName: {{ . }}
{{ end }}
11 changes: 11 additions & 0 deletions tests/e2e/templates/user.yaml.tmpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
apiVersion: kafka.banzaicloud.io/v1alpha1
kind: KafkaUser
metadata:
name: {{or .Name "test-user"}}
namespace: {{or .Namespace "kafka" }}
spec:
clusterRef:
name: {{or .ClusterName "kafka" }}
{{ with .TLSSecretName }}
secretName: {{ . }}
{{ end }}
26 changes: 24 additions & 2 deletions tests/e2e/test_produce_consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,36 @@ func testProduceConsumeInternal() bool {

kubectlOptions.Namespace = koperatorLocalHelmDescriptor.Namespace

requireDeployingKcatPod(kubectlOptions, kcatPodName)
requireDeployingKcatPod(kubectlOptions, kcatPodName, "")
requireDeployingKafkaTopic(kubectlOptions, testInternalTopicName)
requireInternalProducingConsumingMessage(kubectlOptions, "", kcatPodName, testInternalTopicName)
requireInternalProducingConsumingMessage(kubectlOptions, "", kcatPodName, testInternalTopicName, "")
requireDeleteKafkaTopic(kubectlOptions, testInternalTopicName)
requireDeleteKcatPod(kubectlOptions, kcatPodName)
})
}

func testProduceConsumeInternalSSL(tlsSecretName string) bool {
return When("Internally produce and consume message to/from Kafka cluster using SSL", func() {
var kubectlOptions k8s.KubectlOptions
var err error

It("Acquiring K8s config and context", func() {
kubectlOptions, err = kubectlOptionsForCurrentContext()
Expect(err).NotTo(HaveOccurred())
})

kubectlOptions.Namespace = koperatorLocalHelmDescriptor.Namespace

requireDeployingKafkaUser(kubectlOptions, kafkaUserName, tlsSecretName)
requireDeployingKcatPod(kubectlOptions, kcatPodName, tlsSecretName)
requireDeployingKafkaTopic(kubectlOptions, testInternalTopicName)
requireInternalProducingConsumingMessage(kubectlOptions, "", kcatPodName, testInternalTopicName, tlsSecretName)
requireDeleteKafkaTopic(kubectlOptions, testInternalTopicName)
requireDeleteKcatPod(kubectlOptions, kcatPodName)
requireDeleteKafkaUser(kubectlOptions, kafkaUserName)
})
}

func testProduceConsumeExternal(tlsSecretName string) bool {
return When("Externally produce and consume message to/from Kafka cluster", func() {
var kubectlOptions k8s.KubectlOptions
Expand Down
Loading