Adding rosa classic to workers scale workload (#107)
* Code changes to scale up and scale down machines

Signed-off-by: Vishnu Challa <[email protected]>

* Code changes to calculate metrics

Signed-off-by: Vishnu Challa <[email protected]>

* Adding factory to switch between AWS with and without autoscaler

Signed-off-by: Vishnu Challa <[email protected]>

* Adding AWS self-managed autoscaler case

Signed-off-by: Vishnu Challa <[email protected]>

* Adding plugin functionality on top of already scaled nodes

Signed-off-by: Vishnu Challa <[email protected]>

* Adding plugin functionality on top of already scaled nodes

Signed-off-by: Vishnu Challa <[email protected]>

* Handling autoscaling through a batch job

Signed-off-by: Vishnu Challa <[email protected]>

---------

Signed-off-by: Vishnu Challa <[email protected]>
Co-authored-by: Vishnu Challa <[email protected]>
vishnuchalla and Vishnu Challa authored Sep 25, 2024
1 parent f4799cd commit 01536b2
Showing 9 changed files with 202 additions and 24 deletions.
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package workers_scale
package workerscale

import (
"context"
@@ -275,7 +275,7 @@ func waitForMachineSets(machineClient *machinev1beta1.MachineV1beta1Client, clie
wg.Add(1)
go func(ms string, r int) {
defer wg.Done()
err := waitForMachineSet(machineClient, ms, int32(r), maxWaitTimeout)
err := waitForMachineSet(machineClient, ms, int32(r))
if err != nil {
log.Errorf("Failed waiting for MachineSet %s: %v", ms, err)
}
@@ -284,7 +284,7 @@
})
wg.Wait()
log.Infof("All the machinesets have been scaled")
if err := waitForNodes(clientSet, maxWaitTimeout); err != nil {
log.Infof("Error waiting for nodes: %v", err)
if err := waitForNodes(clientSet); err != nil {
log.Fatalf("Error waiting for nodes: %v", err)
}
}
6 changes: 3 additions & 3 deletions pkg/workers_scale/base.go → pkg/workerscale/base.go
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package workers_scale
package workerscale

import (
"sort"
@@ -35,8 +35,8 @@ func (awsScenario *BaseScenario) OrchestrateWorkload(scaleConfig ScaleConfig) {
log.Info("Scale event epoch time specified. Hence calculating node latencies without any scaling")
setupMetrics(scaleConfig.UUID, scaleConfig.Metadata, kubeClientProvider)
measurements.Start()
if err := waitForNodes(clientSet, maxWaitTimeout); err != nil {
log.Infof("Error waiting for nodes: %v", err)
if err := waitForNodes(clientSet); err != nil {
log.Fatalf("Error waiting for nodes: %v", err)
}
if err = measurements.Stop(); err != nil {
log.Error(err.Error())
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package workers_scale
package workerscale

import "time"

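The hunk above shows only the package rename; the maxWaitTimeout that the wait helpers in this package now read (instead of receiving it as a parameter) presumably lives in this constants file. A minimal sketch with an illustrative value — the real constant and its value are defined in the repository and may differ:

package workerscale

import "time"

// maxWaitTimeout bounds every polling loop in this package (illustrative value).
const maxWaitTimeout = 4 * time.Hour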
34 changes: 28 additions & 6 deletions pkg/workers_scale/machines.go → pkg/workerscale/machines.go
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package workers_scale
package workerscale

import (
"context"
@@ -90,7 +90,7 @@ func editMachineSets(machineClient *machinev1beta1.MachineV1beta1Client, clientS
wg.Add(1)
go func(ms string, r int) {
defer wg.Done()
err := updateMachineSetReplicas(machineClient, ms, int32(r), maxWaitTimeout, machineSetsToEdit)
err := updateMachineSetReplicas(machineClient, ms, int32(r), machineSetsToEdit)
if err != nil {
log.Errorf("Failed to edit MachineSet %s: %v", ms, err)
}
@@ -99,13 +99,13 @@
})
wg.Wait()
log.Infof("All the machinesets have been editted")
if err := waitForNodes(clientSet, maxWaitTimeout); err != nil {
if err := waitForNodes(clientSet); err != nil {
log.Infof("Error waiting for nodes: %v", err)
}
}

// updateMachineSetReplicas updates machineset replicas
func updateMachineSetReplicas(machineClient *machinev1beta1.MachineV1beta1Client, name string, newReplicaCount int32, maxWaitTimeout time.Duration, machineSetsToEdit *sync.Map) error {
func updateMachineSetReplicas(machineClient *machinev1beta1.MachineV1beta1Client, name string, newReplicaCount int32, machineSetsToEdit *sync.Map) error {
machineSet, err := machineClient.MachineSets(machineNamespace).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("error getting machineset: %s", err)
@@ -122,7 +122,7 @@ func updateMachineSetReplicas(machineClient *machinev1beta1.MachineV1beta1Client
msInfo.lastUpdatedTime = updateTimestamp
machineSetsToEdit.Store(name, msInfo)

err = waitForMachineSet(machineClient, name, newReplicaCount, maxWaitTimeout)
err = waitForMachineSet(machineClient, name, newReplicaCount)
if err != nil {
return fmt.Errorf("timeout waiting for MachineSet %s to be ready: %v", name, err)
}
Expand Down Expand Up @@ -151,7 +151,7 @@ func getMachineSets(machineClient *machinev1beta1.MachineV1beta1Client) map[int]
}

// waitForMachineSet waits for machinesets to be ready with new replica count
func waitForMachineSet(machineClient *machinev1beta1.MachineV1beta1Client, name string, newReplicaCount int32, maxWaitTimeout time.Duration) error {
func waitForMachineSet(machineClient *machinev1beta1.MachineV1beta1Client, name string, newReplicaCount int32) error {
return wait.PollUntilContextTimeout(context.TODO(), time.Second, maxWaitTimeout, true, func(ctx context.Context) (done bool, err error) {
ms, err := machineClient.MachineSets(machineNamespace).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
@@ -164,3 +164,25 @@ func waitForMachineSet(machineClient *machinev1beta1.MachineV1beta1Client, name
return false, nil
})
}

// waitForWorkerMachineSets waits specifically for all the worker machinesets to be ready
func waitForWorkerMachineSets(machineClient *machinev1beta1.MachineV1beta1Client) error {
return wait.PollUntilContextTimeout(context.TODO(), time.Second, maxWaitTimeout, true, func(_ context.Context) (done bool, err error) {
// Get all MachineSets with the worker label
labelSelector := metav1.ListOptions{
LabelSelector: "hive.openshift.io/machine-pool=worker",
}
machineSets, err := machineClient.MachineSets(machineNamespace).List(context.TODO(), labelSelector)
if err != nil {
return false, err
}
for _, ms := range machineSets.Items {
if ms.Status.Replicas != ms.Status.ReadyReplicas {
log.Debugf("Waiting for MachineSet %s to reach %d replicas, currently %d ready", ms.Name, ms.Status.Replicas, ms.Status.ReadyReplicas)
return false, nil
}
}
log.Info("All worker MachineSets have reached desired replica count")
return true, nil
})
}
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package workers_scale
package workerscale

import (
"strings"
151 changes: 151 additions & 0 deletions pkg/workerscale/rosa.go
@@ -0,0 +1,151 @@
// Copyright 2024 The Kube-burner Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package workerscale

import (
"context"
"fmt"
"os/exec"
"sync"
"time"

"github.com/kube-burner/kube-burner/pkg/config"
"github.com/kube-burner/kube-burner/pkg/measurements"
log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
)

type RosaScenario struct{}

// OrchestrateWorkload orchestrates the ROSA scale workload
func (rosaScenario *RosaScenario) OrchestrateWorkload(scaleConfig ScaleConfig) {
var err error
var triggerJob string
kubeClientProvider := config.NewKubeClientProvider("", "")
clientSet, restConfig := kubeClientProvider.ClientSet(0, 0)
machineClient := getMachineClient(restConfig)
dynamicClient := dynamic.NewForConfigOrDie(restConfig)
clusterID := getClusterID(dynamicClient)
if scaleConfig.ScaleEventEpoch != 0 {
log.Info("Scale event epoch time specified. Hence calculating node latencies without any scaling")
setupMetrics(scaleConfig.UUID, scaleConfig.Metadata, kubeClientProvider)
measurements.Start()
if err := waitForNodes(clientSet); err != nil {
log.Fatalf("Error waiting for nodes: %v", err)
}
if err = measurements.Stop(); err != nil {
log.Error(err.Error())
}
scaledMachineDetails, amiID := getMachines(machineClient, scaleConfig.ScaleEventEpoch)
finalizeMetrics(&sync.Map{}, scaledMachineDetails, scaleConfig.Indexer, amiID, scaleConfig.ScaleEventEpoch)
} else {
prevMachineDetails, _ := getMachines(machineClient, 0)
setupMetrics(scaleConfig.UUID, scaleConfig.Metadata, kubeClientProvider)
measurements.Start()
log.Info("Updating machinepool to the desired worker count")
triggerTime := editMachinepool(clusterID, len(prevMachineDetails), len(prevMachineDetails)+scaleConfig.AdditionalWorkerNodes, scaleConfig.AutoScalerEnabled)
if scaleConfig.AutoScalerEnabled {
triggerJob, triggerTime = createBatchJob(clientSet)
// Delay for the clusterautoscaler resources to come up
time.Sleep(5 * time.Minute)
} else {
// Delay for the rosa to update the machinesets
time.Sleep(1 * time.Minute)
}
log.Info("Waiting for the machinesets to be ready")
err = waitForWorkerMachineSets(machineClient)
if err != nil {
log.Fatalf("Error waitingMachineSets to be ready: %v", err)
}
if err := waitForNodes(clientSet); err != nil {
log.Fatalf("Error waiting for nodes: %v", err)
}
if err = measurements.Stop(); err != nil {
log.Error(err.Error())
}
scaledMachineDetails, amiID := getMachines(machineClient, 0)
discardPreviousMachines(prevMachineDetails, scaledMachineDetails)
finalizeMetrics(&sync.Map{}, scaledMachineDetails, scaleConfig.Indexer, amiID, triggerTime.Unix())
if scaleConfig.GC {
if scaleConfig.AutoScalerEnabled {
deleteBatchJob(clientSet, triggerJob)
}
log.Info("Restoring machine sets to previous state")
editMachinepool(clusterID, len(prevMachineDetails), len(prevMachineDetails), false)
}
}
}

// editMachinepool edits machinepool to desired replica count
func editMachinepool(clusterID string, minReplicas int, maxReplicas int, autoScalerEnabled bool) time.Time {
verifyRosaInstall()
triggerTime := time.Now().UTC().Truncate(time.Second)
cmdArgs := []string{"edit", "machinepool", "-c", clusterID, "worker", fmt.Sprintf("--enable-autoscaling=%t", autoScalerEnabled)}
if autoScalerEnabled {
cmdArgs = append(cmdArgs, fmt.Sprintf("--min-replicas=%d", minReplicas))
cmdArgs = append(cmdArgs, fmt.Sprintf("--max-replicas=%d", maxReplicas))
} else {
cmdArgs = append(cmdArgs, fmt.Sprintf("--replicas=%d", minReplicas))
}
cmd := exec.Command("rosa", cmdArgs...)
editOutput, err := cmd.CombinedOutput()
if err != nil {
log.Fatalf("Failed to edit machinepool: %v. Output: %s", err, string(editOutput))
}
log.Infof("Machinepool edited successfully on cluster: %v", clusterID)
log.Debug(string(editOutput))
return triggerTime
}
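
For reference, the command assembled here takes one of two shapes depending on autoScalerEnabled (the cluster ID and replica counts below are purely illustrative):

rosa edit machinepool -c 1a2b3c4d worker --enable-autoscaling=false --replicas=6
rosa edit machinepool -c 1a2b3c4d worker --enable-autoscaling=true --min-replicas=3 --max-replicas=6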

// verifyRosaInstall verifies rosa installation and login
func verifyRosaInstall() {
if _, err := exec.LookPath("rosa"); err != nil {
log.Fatal("ROSA CLI is not installed. Please install it and retry.")
return
}
log.Info("ROSA CLI is installed.")

cmd := exec.Command("rosa", "whoami")
output, err := cmd.CombinedOutput()
if err != nil {
log.Fatal("You are not logged in. Please login using 'rosa login' and retry.")
}
log.Info("You are already logged in.")
log.Debug(string(output))
}

// getClusterID fetches the clusterID
func getClusterID(dynamicClient dynamic.Interface) string {
clusterVersionGVR := schema.GroupVersionResource{
Group: "config.openshift.io",
Version: "v1",
Resource: "clusterversions",
}

clusterVersion, err := dynamicClient.Resource(clusterVersionGVR).Get(context.TODO(), "version", metav1.GetOptions{})
if err != nil {
log.Fatalf("Error fetching cluster version: %v", err)
}

clusterID, found, err := unstructured.NestedString(clusterVersion.Object, "spec", "clusterID")
if err != nil || !found {
log.Fatalf("Error retrieving cluster ID: %v", err)
}

return clusterID
}
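
createBatchJob and deleteBatchJob are called in OrchestrateWorkload above but are defined in another file of this pull request that is not shown here. A minimal sketch of what such an autoscaler trigger Job could look like — the function name, namespace, image, replica counts, and resource requests are assumptions for illustration, not the actual implementation:

// Additional imports assumed: batchv1 "k8s.io/api/batch/v1", corev1 "k8s.io/api/core/v1",
// "k8s.io/apimachinery/pkg/api/resource", "k8s.io/client-go/kubernetes".
func createBatchJobSketch(clientSet kubernetes.Interface) (string, time.Time) {
	parallelism := int32(100)
	completions := int32(100)
	job := &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{Name: "autoscaling-trigger", Namespace: "default"},
		Spec: batchv1.JobSpec{
			Parallelism: &parallelism,
			Completions: &completions,
			Template: corev1.PodTemplateSpec{
				Spec: corev1.PodSpec{
					RestartPolicy: corev1.RestartPolicyNever,
					Containers: []corev1.Container{{
						Name:  "pause",
						Image: "registry.k8s.io/pause:3.9",
						// Request enough CPU that the pending pods exceed current capacity
						// and force the cluster autoscaler to provision new nodes.
						Resources: corev1.ResourceRequirements{
							Requests: corev1.ResourceList{
								corev1.ResourceCPU: resource.MustParse("500m"),
							},
						},
					}},
				},
			},
		},
	}
	triggerTime := time.Now().UTC().Truncate(time.Second)
	if _, err := clientSet.BatchV1().Jobs(job.Namespace).Create(context.TODO(), job, metav1.CreateOptions{}); err != nil {
		log.Fatalf("Error creating trigger job: %v", err)
	}
	return job.Name, triggerTime
}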
3 changes: 2 additions & 1 deletion pkg/workers_scale/types.go → pkg/workerscale/types.go
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package workers_scale
package workerscale

import (
"time"
@@ -34,6 +34,7 @@ type ScaleConfig struct {
Indexer indexers.Indexer
GC bool
ScaleEventEpoch int64
AutoScalerEnabled bool
}

// Struct to extract AMIID from aws provider spec
4 changes: 2 additions & 2 deletions pkg/workers_scale/utils.go → pkg/workerscale/utils.go
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package workers_scale
package workerscale

import (
"context"
@@ -62,7 +62,7 @@ func isNodeReady(node *v1.Node) bool {
}

// waitForNodes waits for all the nodes to be ready
func waitForNodes(clientset kubernetes.Interface, maxWaitTimeout time.Duration) error {
func waitForNodes(clientset kubernetes.Interface) error {
return wait.PollUntilContextTimeout(context.TODO(), time.Second, maxWaitTimeout, true, func(ctx context.Context) (done bool, err error) {
nodes, err := clientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
if err != nil {
16 changes: 10 additions & 6 deletions workers-scale.go
@@ -1,4 +1,4 @@
// Copyright 2022 The Kube-burner Authors.
// Copyright 2024 The Kube-burner Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -31,7 +31,7 @@ import (
"github.com/kube-burner/kube-burner/pkg/workloads"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
wscale "kube-burner.io/ocp/pkg/workers_scale"
wscale "kube-burner.io/ocp/pkg/workerscale"
)

// NewWorkersScale orchestrates scaling workers in ocp wrapper
@@ -126,14 +126,15 @@ func NewWorkersScale(metricsEndpoint *string, ocpMetaAgent *ocpmetadata.Metadata
indexerValue = value
break
}
scenario := fetchScenario(enableAutoscaler)
scenario := fetchScenario(enableAutoscaler, clusterMetadata)
scenario.OrchestrateWorkload(wscale.ScaleConfig{
UUID: uuid,
AdditionalWorkerNodes: additionalWorkerNodes,
Metadata: metricsScraper.Metadata,
Indexer: indexerValue,
GC: gc,
ScaleEventEpoch: scaleEventEpoch,
AutoScalerEnabled: enableAutoscaler,
})
end := time.Now().Unix()
for _, prometheusClient := range metricsScraper.PrometheusClients {
@@ -182,10 +183,13 @@ func NewWorkersScale(metricsEndpoint *string, ocpMetaAgent *ocpmetadata.Metadata
}

// fetchScenario returns the scenario implementation relevant to the cluster type
func fetchScenario(enableAutoscaler bool) wscale.Scenario {
if enableAutoscaler {
return &wscale.AutoScalerScenario{}
func fetchScenario(enableAutoscaler bool, clusterMetadata ocpmetadata.ClusterMetadata) wscale.Scenario {
if clusterMetadata.ClusterType == "rosa" {
return &wscale.RosaScenario{}
} else {
if enableAutoscaler {
return &wscale.AutoScalerScenario{}
}
return &wscale.BaseScenario{}
}
}
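
The wscale.Scenario interface itself is not part of the hunks shown in this diff; judging from the call sites above it presumably reduces to a single method (a sketch, not the verbatim definition):

type Scenario interface {
	OrchestrateWorkload(ScaleConfig)
}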
