Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat async install helm #97

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
189 changes: 20 additions & 169 deletions pkg/service/HelmAppService.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"sync"

"github.com/caarlos0/env"
"github.com/devtron-labs/common-lib/pubsub-lib"
k8sUtils "github.com/devtron-labs/common-lib/utils/k8s"
"github.com/devtron-labs/common-lib/utils/yaml"
"github.com/devtron-labs/kubelink/bean"
Expand Down Expand Up @@ -105,10 +104,9 @@ type HelmAppService interface {
}

type HelmReleaseConfig struct {
EnableHelmReleaseCache bool `env:"ENABLE_HELM_RELEASE_CACHE" envDefault:"true"`
MaxCountForHelmRelease int `env:"MAX_COUNT_FOR_HELM_RELEASE" envDefault:"20"`
ManifestFetchBatchSize int `env:"MANIFEST_FETCH_BATCH_SIZE" envDefault:"2"`
RunHelmInstallInAsyncMode bool `env:"RUN_HELM_INSTALL_IN_ASYNC_MODE" envDefault:"false"`
EnableHelmReleaseCache bool `env:"ENABLE_HELM_RELEASE_CACHE" envDefault:"true"`
MaxCountForHelmRelease int `env:"MAX_COUNT_FOR_HELM_RELEASE" envDefault:"20"`
ManifestFetchBatchSize int `env:"MANIFEST_FETCH_BATCH_SIZE" envDefault:"2"`
}

func GetHelmReleaseConfig() (*HelmReleaseConfig, error) {
Expand All @@ -124,7 +122,6 @@ type HelmAppServiceImpl struct {
K8sInformer k8sInformer.K8sInformer
helmReleaseConfig *HelmReleaseConfig
k8sUtil *k8sUtils.K8sUtil
pubsubClient *pubsub_lib.PubSubClientServiceImpl
clusterRepository repository.ClusterRepository
clusterCache cache.ClusterCache
}
Expand All @@ -134,18 +131,12 @@ func NewHelmAppServiceImpl(logger *zap.SugaredLogger, k8sService K8sService,
k8sUtil *k8sUtils.K8sUtil,
clusterRepository repository.ClusterRepository,
clusterCache cache.ClusterCache) *HelmAppServiceImpl {

var pubsubClient *pubsub_lib.PubSubClientServiceImpl
if helmReleaseConfig.RunHelmInstallInAsyncMode {
pubsubClient = pubsub_lib.NewPubSubClientServiceImpl(logger)
}
helmAppServiceImpl := &HelmAppServiceImpl{
logger: logger,
k8sService: k8sService,
randSource: rand.NewSource(time.Now().UnixNano()),
K8sInformer: k8sInformer,
helmReleaseConfig: helmReleaseConfig,
pubsubClient: pubsubClient,
k8sUtil: k8sUtil,
clusterRepository: clusterRepository,
clusterCache: clusterCache,
Expand Down Expand Up @@ -833,67 +824,12 @@ func (impl HelmAppServiceImpl) installRelease(ctx context.Context, request *clie
}

impl.logger.Debugw("Installing release", "name", releaseIdentifier.ReleaseName, "namespace", releaseIdentifier.ReleaseNamespace, "dry-run", dryRun)
switch impl.helmReleaseConfig.RunHelmInstallInAsyncMode {
case false:
impl.logger.Debugw("Installing release", "name", releaseIdentifier.ReleaseName, "namespace", releaseIdentifier.ReleaseNamespace, "dry-run", dryRun)
rel, err := helmClientObj.InstallChart(context.Background(), chartSpec)
if err != nil {
impl.logger.Errorw("Error in install release ", "err", err)
return nil, err
}
//helmInstallMessage := HelmReleaseStatusConfig{
// InstallAppVersionHistoryId: int(request.InstallAppVersionHistoryId),
//}
//helmInstallMessagedata, err := impl.GetNatsMessageForHelmInstallSuccess(helmInstallMessage)
//if err != nil {
// impl.logger.Errorw("Error in parsing nats message for helm install success ", "err", err)
//}
//_ = impl.pubsubClient.Publish(pubsub_lib.HELM_CHART_INSTALL_STATUS_TOPIC, helmInstallMessagedata)
// Install release ends
return rel, nil
case true:
go func() {
helmInstallMessage := HelmReleaseStatusConfig{
InstallAppVersionHistoryId: int(request.InstallAppVersionHistoryId),
}
// Checking release exist because there can be case when release already exist with same name
releaseExist := impl.K8sInformer.CheckReleaseExists(releaseIdentifier.ClusterConfig.ClusterId, releaseIdentifier.ReleaseName)
if releaseExist {
// release with name already exist, will not continue with release
helmInstallMessage.ErrorInInstallation = true
helmInstallMessage.IsReleaseInstalled = false
helmInstallMessage.Message = fmt.Sprintf("Release with name - %s already exist", releaseIdentifier.ReleaseName)
data, err := json.Marshal(helmInstallMessage)
if err != nil {
impl.logger.Errorw("error in marshalling nats message")
return
}
_ = impl.pubsubClient.Publish(pubsub_lib.HELM_CHART_INSTALL_STATUS_TOPIC, string(data))
}

_, err = helmClientObj.InstallChart(context.Background(), chartSpec)

if err != nil {
HelmInstallFailureNatsMessage, err := impl.GetNatsMessageForHelmInstallError(ctx, helmInstallMessage, releaseIdentifier, err)
if err != nil {
impl.logger.Errorw("Error in parsing nats message for helm install failure")
}
// in case of err we will communicate about the error to orchestrator
_ = impl.pubsubClient.Publish(pubsub_lib.HELM_CHART_INSTALL_STATUS_TOPIC, HelmInstallFailureNatsMessage)
return
}
helmInstallMessage.Message = RELEASE_INSTALLED
helmInstallMessage.IsReleaseInstalled = true
helmInstallMessage.ErrorInInstallation = false
data, err := json.Marshal(helmInstallMessage)
if err != nil {
impl.logger.Errorw("error in marshalling nats message")
}
_ = impl.pubsubClient.Publish(pubsub_lib.HELM_CHART_INSTALL_STATUS_TOPIC, string(data))
}()
rel, err := helmClientObj.InstallChart(ctx, chartSpec)
if err != nil {
impl.logger.Errorw("Error in install release ", "err", err)
return nil, err
}
// Install release ends
return nil, nil
return rel, nil
}

func (impl HelmAppServiceImpl) GetNotes(ctx context.Context, request *client.InstallReleaseRequest) (string, error) {
Expand Down Expand Up @@ -990,81 +926,28 @@ func (impl HelmAppServiceImpl) UpgradeReleaseWithChartInfo(ctx context.Context,
MaxHistory: int(request.HistoryMax),
RegistryClient: registryClient,
}

switch impl.helmReleaseConfig.RunHelmInstallInAsyncMode {
case false:
impl.logger.Debug("Upgrading release with chart info")
_, err = helmClientObj.UpgradeReleaseWithChartInfo(context.Background(), chartSpec)
if UpgradeErr, ok := err.(*driver.StorageDriverError); ok {
if UpgradeErr != nil {
if UpgradeErr.Err == driver.ErrReleaseNotFound {
_, err := helmClientObj.InstallChart(context.Background(), chartSpec)
if err != nil {
impl.logger.Errorw("Error in install release ", "err", err)
return nil, err
}

} else {
impl.logger.Errorw("Error in upgrade release with chart info", "err", err)
impl.logger.Debug("Upgrading release with chart info")
_, err = helmClientObj.UpgradeReleaseWithChartInfo(ctx, chartSpec)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as the async install/upgrade flow is flag driven, we should expose a flag in our gRPC payload (e.g: runInContext: bool) and use native context/ context.background() accordingly. The current flow will break for sync deployments as it will run with a default timeout now.

if UpgradeErr, ok := err.(*driver.StorageDriverError); ok {
if UpgradeErr != nil {
if UpgradeErr.Err == driver.ErrNoDeployedReleases {
_, err := helmClientObj.InstallChart(ctx, chartSpec)
if err != nil {
impl.logger.Errorw("Error in install release ", "err", err)
return nil, err

}
}
}
//helmInstallMessage := HelmReleaseStatusConfig{
// InstallAppVersionHistoryId: int(request.InstallAppVersionHistoryId),
//}
//helmInstallMessagedata, err := impl.GetNatsMessageForHelmInstallSuccess(helmInstallMessage)
//if err != nil {
// impl.logger.Errorw("Error in parsing nats message for helm install success ", "err", err)
//}
//_ = impl.pubsubClient.Publish(pubsub_lib.HELM_CHART_INSTALL_STATUS_TOPIC, helmInstallMessagedata)
case true:
go func() {
impl.logger.Debug("Upgrading release with chart info")
_, err = helmClientObj.UpgradeReleaseWithChartInfo(context.Background(), chartSpec)
helmInstallMessage := HelmReleaseStatusConfig{
InstallAppVersionHistoryId: int(request.InstallAppVersionHistoryId),
}
var HelmInstallFailureNatsMessage string

if UpgradeErr, ok := err.(*driver.StorageDriverError); ok {
if UpgradeErr != nil {
if UpgradeErr.Err == driver.ErrReleaseNotFound {
_, err := helmClientObj.InstallChart(context.Background(), chartSpec)
if err != nil {
HelmInstallFailureNatsMessage, _ = impl.GetNatsMessageForHelmInstallError(ctx, helmInstallMessage, releaseIdentifier, err)
}
} else {
HelmInstallFailureNatsMessage, _ = impl.GetNatsMessageForHelmInstallError(ctx, helmInstallMessage, releaseIdentifier, err)
impl.logger.Errorw("Error in upgrade release with chart info", "err", err)
} else {
impl.logger.Errorw("Error in upgrade release with chart info", "err", err)
return nil, err

}
_ = impl.pubsubClient.Publish(pubsub_lib.HELM_CHART_INSTALL_STATUS_TOPIC, HelmInstallFailureNatsMessage)
return
}
} else if err != nil {
HelmInstallFailureNatsMessage, _ = impl.GetNatsMessageForHelmInstallError(ctx, helmInstallMessage, releaseIdentifier, err)
Ashish-devtron marked this conversation as resolved.
Show resolved Hide resolved
_ = impl.pubsubClient.Publish(pubsub_lib.HELM_CHART_INSTALL_STATUS_TOPIC, HelmInstallFailureNatsMessage)
return
}
helmInstallMessage.Message = RELEASE_INSTALLED
helmInstallMessage.IsReleaseInstalled = true
data, err := json.Marshal(helmInstallMessage)
if err != nil {
impl.logger.Errorw("error in marshalling nats message")
}
_ = impl.pubsubClient.Publish(pubsub_lib.HELM_CHART_INSTALL_STATUS_TOPIC, string(data))
// Update release ends
}()
}
}

upgradeReleaseResponse := &client.UpgradeReleaseResponse{
Success: true,
}

return upgradeReleaseResponse, nil

}

func (impl HelmAppServiceImpl) IsReleaseInstalled(ctx context.Context, releaseIdentifier *client.ReleaseIdentifier) (bool, error) {
Expand Down Expand Up @@ -2104,38 +1987,6 @@ func (impl HelmAppServiceImpl) PushHelmChartToOCIRegistryRepo(ctx context.Contex
return registryPushResponse, err
}

func (impl HelmAppServiceImpl) GetNatsMessageForHelmInstallError(ctx context.Context, helmInstallMessage HelmReleaseStatusConfig, releaseIdentifier *client.ReleaseIdentifier, installationErr error) (string, error) {
helmInstallMessage.Message = installationErr.Error()
isReleaseInstalled, err := impl.IsReleaseInstalled(ctx, releaseIdentifier)
if err != nil {
impl.logger.Errorw("error in checking if release is installed or not")
return "", err
}
if isReleaseInstalled {
helmInstallMessage.IsReleaseInstalled = true
} else {
helmInstallMessage.IsReleaseInstalled = false
}
helmInstallMessage.ErrorInInstallation = true
data, err := json.Marshal(helmInstallMessage)
if err != nil {
impl.logger.Errorw("error in marshalling nats message")
return string(data), err
}
return string(data), nil
}

func (impl HelmAppServiceImpl) GetNatsMessageForHelmInstallSuccess(helmInstallMessage HelmReleaseStatusConfig) (string, error) {
helmInstallMessage.Message = RELEASE_INSTALLED
helmInstallMessage.IsReleaseInstalled = true
helmInstallMessage.ErrorInInstallation = false
data, err := json.Marshal(helmInstallMessage)
if err != nil {
impl.logger.Errorw("error in marshalling nats message")
return string(data), err
}
return string(data), nil
}
func GetClusterConfigFromClientBean(config *client.ClusterConfig) *k8sUtils.ClusterConfig {
clusterConfig := &k8sUtils.ClusterConfig{}
if config != nil {
Expand Down