Skip to content

Commit

Permalink
[tmpnet] Refactor boostrap monitor kubernetes functions for reuse
Browse files Browse the repository at this point in the history
  • Loading branch information
marun committed Oct 7, 2024
1 parent 43d5b43 commit 4cf5d20
Show file tree
Hide file tree
Showing 4 changed files with 333 additions and 245 deletions.
51 changes: 2 additions & 49 deletions tests/fixture/bootstrapmonitor/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"

"github.com/ava-labs/avalanchego/tests/fixture/tmpnet"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/version"

Expand All @@ -39,54 +40,6 @@ type bootstrapTestDetails struct {
StartTime time.Time `json:"startTime"`
}

// WaitForPodCondition watches the specified pod until the status includes the specified condition.
func WaitForPodCondition(ctx context.Context, clientset *kubernetes.Clientset, namespace string, podName string, conditionType corev1.PodConditionType) error {
return waitForPodStatus(
ctx,
clientset,
namespace,
podName,
func(status *corev1.PodStatus) bool {
for _, condition := range status.Conditions {
if condition.Type == conditionType && condition.Status == corev1.ConditionTrue {
return true
}
}
return false
},
)
}

// waitForPodStatus watches the specified pod until the status is deemed acceptable by the provided test function.
func waitForPodStatus(
ctx context.Context,
clientset *kubernetes.Clientset,
namespace string,
name string,
acceptable func(*corev1.PodStatus) bool,
) error {
watch, err := clientset.CoreV1().Pods(namespace).Watch(ctx, metav1.SingleObject(metav1.ObjectMeta{Name: name}))
if err != nil {
return fmt.Errorf("failed to initiate watch of pod %s/%s: %w", namespace, name, err)
}

for {
select {
case event := <-watch.ResultChan():
pod, ok := event.Object.(*corev1.Pod)
if !ok {
continue
}

if acceptable(&pod.Status) {
return nil
}
case <-ctx.Done():
return fmt.Errorf("timeout waiting for pod readiness: %w", ctx.Err())
}
}
}

// setImageDetails updates the pod's owning statefulset with the image of the specified container and associated version details
func setImageDetails(ctx context.Context, log logging.Logger, clientset *kubernetes.Clientset, namespace string, podName string, imageDetails *ImageDetails) error {
// Determine the name of the statefulset to update
Expand Down Expand Up @@ -212,7 +165,7 @@ func getLatestImageDetails(
}
qualifiedPodName := fmt.Sprintf("%s.%s", namespace, createdPod.Name)

err = waitForPodStatus(ctx, clientset, namespace, createdPod.Name, func(status *corev1.PodStatus) bool {
err = tmpnet.WaitForPodStatus(ctx, clientset, namespace, createdPod.Name, func(status *corev1.PodStatus) bool {
return status.Phase == corev1.PodSucceeded || status.Phase == corev1.PodFailed
})
if err != nil {
Expand Down
220 changes: 25 additions & 195 deletions tests/fixture/bootstrapmonitor/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@ import (
"bufio"
"flag"
"fmt"
"io"
"net/http"
"net/url"
"os"
"os/exec"
"path/filepath"
Expand All @@ -18,23 +15,16 @@ import (

"github.com/onsi/ginkgo/v2"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/transport/spdy"
"k8s.io/utils/pointer"

"github.com/ava-labs/avalanchego/api/info"
"github.com/ava-labs/avalanchego/config"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/tests"
"github.com/ava-labs/avalanchego/tests/fixture/bootstrapmonitor"
"github.com/ava-labs/avalanchego/tests/fixture/e2e"
"github.com/ava-labs/avalanchego/tests/fixture/tmpnet"
"github.com/ava-labs/avalanchego/utils/constants"
"github.com/ava-labs/avalanchego/utils/logging"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -130,7 +120,7 @@ var _ = ginkgo.Describe("[Bootstrap Tester]", func() {
ginkgo.By(fmt.Sprintf("Created namespace %q", namespace))

ginkgo.By("Creating a node to bootstrap from")
nodeStatefulSet := newNodeStatefulSet("avalanchego-node", defaultNodeFlags())
nodeStatefulSet := newNodeStatefulSet("avalanchego-node", defaultPodFlags())
createdNodeStatefulSet, err := clientset.AppsV1().StatefulSets(namespace).Create(tc.DefaultContext(), nodeStatefulSet, metav1.CreateOptions{})
require.NoError(err)
nodePodName := createdNodeStatefulSet.Name + "-0"
Expand Down Expand Up @@ -254,205 +244,45 @@ func buildImage(tc tests.TestContext, imageName string, forceNewHash bool, scrip
require.NoError(err, "Image build failed: %s", output)
}

// newNodeStatefulSet returns a statefulset for an avalanchego node.
func newNodeStatefulSet(name string, flags map[string]string) *appsv1.StatefulSet {
return &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
GenerateName: name + "-",
},
Spec: appsv1.StatefulSetSpec{
Replicas: pointer.Int32(1),
ServiceName: name,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": name,
},
},
VolumeClaimTemplates: []corev1.PersistentVolumeClaim{
{
ObjectMeta: metav1.ObjectMeta{
Name: volumeName,
},
Spec: corev1.PersistentVolumeClaimSpec{
AccessModes: []corev1.PersistentVolumeAccessMode{
corev1.ReadWriteOnce,
},
Resources: corev1.VolumeResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceStorage: resource.MustParse(volumeSize),
},
},
},
},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": name,
},
Annotations: map[string]string{
// This needs to be present to ensure compatibility with json patch replace
bootstrapmonitor.VersionsAnnotationKey: "",
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: nodeContainerName,
Image: latestAvalanchegoImage,
Ports: []corev1.ContainerPort{
{
Name: "http",
ContainerPort: config.DefaultHTTPPort,
},
{
Name: "staker",
ContainerPort: config.DefaultStakingPort,
},
},
VolumeMounts: []corev1.VolumeMount{
{
Name: volumeName,
MountPath: nodeDataDir,
},
},
ReadinessProbe: &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/ext/health/liveness",
Port: intstr.FromInt(config.DefaultHTTPPort),
},
},
PeriodSeconds: 1,
SuccessThreshold: 1,
},
Env: stringMapToEnvVarSlice(flags),
},
},
},
},
},
}
}

// stringMapToEnvVarSlice converts a string map to a kube EnvVar slice.
func stringMapToEnvVarSlice(mapping map[string]string) []corev1.EnvVar {
envVars := make([]corev1.EnvVar, len(mapping))
var i int
for k, v := range mapping {
envVars[i] = corev1.EnvVar{
Name: config.EnvVarName(config.EnvPrefix, k),
Value: v,
}
i++
}
return envVars
return tmpnet.NewNodeStatefulSet(
name,
latestAvalanchegoImage,
nodeContainerName,
volumeName,
volumeSize,
nodeDataDir,
flags,
)
}

// defaultNodeFlags defines common flags for avalanchego nodes used by this test
func defaultNodeFlags() map[string]string {
return map[string]string{
config.DataDirKey: nodeDataDir,
config.NetworkNameKey: constants.LocalName,
config.SybilProtectionEnabledKey: "false",
config.HealthCheckFreqKey: "500ms", // Ensure rapid detection of a healthy state
config.LogDisplayLevelKey: logging.Debug.String(),
config.LogLevelKey: logging.Debug.String(),
config.HTTPHostKey: "0.0.0.0", // Need to bind to pod IP to ensure kubelet can access the http port for the readiness check
}
func defaultPodFlags() map[string]string {
return tmpnet.DefaultPodFlags(constants.LocalName, nodeDataDir)
}

// waitForPodCondition waits until the specified pod reports the specified condition
func waitForPodCondition(tc tests.TestContext, clientset *kubernetes.Clientset, namespace string, podName string, conditionType corev1.PodConditionType) {
require.NoError(tc, bootstrapmonitor.WaitForPodCondition(tc.DefaultContext(), clientset, namespace, podName, conditionType))
require.NoError(tc, tmpnet.WaitForPodCondition(tc.DefaultContext(), clientset, namespace, podName, conditionType))
}

// waitForNodeHealthy waits for the node running in the specified pod to report healthy.
func waitForNodeHealthy(tc tests.TestContext, kubeconfig *restclient.Config, namespace string, podName string) ids.NodeID {
require := require.New(tc)

// A forwarded connection enables connectivity without exposing the node external to the kube cluster
ginkgo.By(fmt.Sprintf("Enabling a local forward for pod %s.%s", namespace, podName))
localPort, localPortStopChan, err := enableLocalForwardForPod(kubeconfig, namespace, podName, config.DefaultHTTPPort, ginkgo.GinkgoWriter, ginkgo.GinkgoWriter)
require.NoError(err)
defer close(localPortStopChan)
localNodeURI := fmt.Sprintf("http://127.0.0.1:%d", localPort)

infoClient := info.NewClient(localNodeURI)
bootstrapNodeID, _, err := infoClient.GetNodeID(tc.DefaultContext())
require.NoError(err)

ginkgo.By(fmt.Sprintf("Waiting for pod %s.%s to report a healthy status at %s", namespace, podName, localNodeURI))
require.Eventually(func() bool {
healthReply, err := tmpnet.CheckNodeHealth(tc.DefaultContext(), localNodeURI)
if err != nil {
tc.Outf("Error checking node health: %v\n", err)
return false
}
return healthReply.Healthy
}, e2e.DefaultTimeout, e2e.DefaultPollingInterval)

return bootstrapNodeID
}

// enableLocalForwardForPod enables traffic forwarding from a local port to the specified pod with client-go. The returned
// stop channel should be closed to stop the port forwarding.
func enableLocalForwardForPod(kubeconfig *restclient.Config, namespace string, name string, port int, out, errOut io.Writer) (uint16, chan struct{}, error) {
transport, upgrader, err := spdy.RoundTripperFor(kubeconfig)
if err != nil {
return 0, nil, fmt.Errorf("failed to create round tripper: %w", err)
}

dialer := spdy.NewDialer(
upgrader,
&http.Client{
Transport: transport,
},
http.MethodPost,
&url.URL{
Scheme: "https",
Path: fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/portforward", namespace, name),
Host: strings.TrimPrefix(kubeconfig.Host, "https://"),
},
nodeID, err := tmpnet.WaitForNodeHealthy(
tc.DefaultContext(),
tc.Outf,
kubeconfig,
namespace,
podName,
e2e.DefaultPollingInterval,
ginkgo.GinkgoWriter,
ginkgo.GinkgoWriter,
)
ports := []string{fmt.Sprintf("0:%d", port)}

// Need to specify 127.0.0.1 to ensure that forwarding is only attempted for the ipv4
// address of the pod. By default, kind is deployed with only ipv4, and attempting to
// connect to a pod with ipv6 will fail.
addresses := []string{"127.0.0.1"}

stopChan, readyChan := make(chan struct{}, 1), make(chan struct{}, 1)
forwarder, err := portforward.NewOnAddresses(dialer, addresses, ports, stopChan, readyChan, out, errOut)
if err != nil {
return 0, nil, fmt.Errorf("failed to create forwarder: %w", err)
}

go func() {
if err := forwarder.ForwardPorts(); err != nil {
// TODO(marun) Need better error handling here? Or is ok for test-only usage?
panic(err)
}
}()

<-readyChan // Wait for port forwarding to be ready

// Retrieve the dynamically allocated local port
forwardedPorts, err := forwarder.GetPorts()
if err != nil {
close(stopChan)
return 0, nil, fmt.Errorf("failed to get forwarded ports: %w", err)
}
if len(forwardedPorts) == 0 {
close(stopChan)
return 0, nil, fmt.Errorf("failed to find at least one forwarded port: %w", err)
}
return forwardedPorts[0].Local, stopChan, nil
require.NoError(tc, err)
return nodeID
}

// createBootstrapTester creates a pod that can continuously bootstrap from the specified bootstrap IP+ID.
func createBootstrapTester(tc tests.TestContext, clientset *kubernetes.Clientset, namespace string, bootstrapIP string, bootstrapNodeID ids.NodeID) *appsv1.StatefulSet {
flags := defaultNodeFlags()
flags := defaultPodFlags()
flags[config.BootstrapIPsKey] = fmt.Sprintf("%s:%d", bootstrapIP, config.DefaultStakingPort)
flags[config.BootstrapIDsKey] = bootstrapNodeID.String()

Expand Down
2 changes: 1 addition & 1 deletion tests/fixture/bootstrapmonitor/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func WaitForCompletion(

// Avoid checking node health before it reports initial ready
log.Info("Waiting for pod readiness")
if err := WaitForPodCondition(ctx, clientset, namespace, podName, corev1.PodReady); err != nil {
if err := tmpnet.WaitForPodCondition(ctx, clientset, namespace, podName, corev1.PodReady); err != nil {
return fmt.Errorf("failed to wait for pod condition: %w", err)
}

Expand Down
Loading

0 comments on commit 4cf5d20

Please sign in to comment.