Skip to content

Commit

Permalink
fix: add events when pre- or post-upgrade check fails
Browse files Browse the repository at this point in the history
Signed-off-by: James Munson <[email protected]>
  • Loading branch information
james-munson committed Oct 16, 2024
1 parent 8387bcf commit eaa24a8
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 41 deletions.
61 changes: 49 additions & 12 deletions app/post_upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,24 @@ import (
"github.com/sirupsen/logrus"
"github.com/urfave/cli"

"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/record"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"

"github.com/longhorn/longhorn-manager/constant"
"github.com/longhorn/longhorn-manager/types"

longhorn "github.com/longhorn/longhorn-manager/k8s/pkg/apis/longhorn/v1beta2"
)

const (
PostUpgradeEventer = "longhorn-post-upgrade"

RetryCounts = 360
RetryInterval = 5 * time.Second
)
Expand All @@ -33,8 +41,10 @@ func PostUpgradeCmd() cli.Command {
Usage: "Specify path to kube config (optional)",
},
cli.StringFlag{
Name: FlagNamespace,
EnvVar: types.EnvPodNamespace,
Name: FlagNamespace,
EnvVar: types.EnvPodNamespace,
Required: true,
Usage: "Specify Longhorn namespace",
},
},
Action: func(c *cli.Context) {
Expand All @@ -50,34 +60,61 @@ func PostUpgradeCmd() cli.Command {

func postUpgrade(c *cli.Context) error {
namespace := c.String(FlagNamespace)
if namespace == "" {
return errors.New("namespace is required")
}

config, err := clientcmd.BuildConfigFromFlags("", c.String(FlagKubeConfig))
if err != nil {
return errors.Wrap(err, "failed to get client config")
}

eventBroadcaster, err := createEventBroadcaster(config)
if err != nil {
return errors.Wrap(err, "failed to create event broadcaster")
}

scheme := runtime.NewScheme()
if err := longhorn.SchemeBuilder.AddToScheme(scheme); err != nil {
return errors.Wrap(err, "failed to create scheme")
}

eventRecorder := eventBroadcaster.NewRecorder(scheme, corev1.EventSource{Component: PostUpgradeEventer})

kubeClient, err := kubernetes.NewForConfig(config)
if err != nil {
return errors.Wrap(err, "failed to get k8s client")
}

return newPostUpgrader(namespace, kubeClient).Run()
err = newPostUpgrader(namespace, kubeClient, eventRecorder).Run()
if err != nil {
// Hang around so that event can happen before fatal exit.
time.Sleep(5 * time.Minute)
}

return err
}

type postUpgrader struct {
namespace string
kubeClient kubernetes.Interface
namespace string
kubeClient kubernetes.Interface
eventRecorder record.EventRecorder
}

func newPostUpgrader(namespace string, kubeClient kubernetes.Interface) *postUpgrader {
return &postUpgrader{namespace, kubeClient}
func newPostUpgrader(namespace string, kubeClient kubernetes.Interface, eventRecorder record.EventRecorder) *postUpgrader {
return &postUpgrader{namespace, kubeClient, eventRecorder}
}

func (u *postUpgrader) Run() error {
if err := u.waitManagerUpgradeComplete(); err != nil {
var err error
defer func() {
if err != nil {
u.eventRecorder.Event(&corev1.ObjectReference{Namespace: u.namespace, Name: PostUpgradeEventer},
corev1.EventTypeWarning, constant.EventReasonFailedUpgradePostCheck, err.Error())
} else {
u.eventRecorder.Event(&corev1.ObjectReference{Namespace: u.namespace, Name: PostUpgradeEventer},
corev1.EventTypeNormal, constant.EventReasonPassedUpgradeCheck, "post-upgrade check passed")
}
}()

if err = u.waitManagerUpgradeComplete(); err != nil {
return err
}

Expand Down
67 changes: 62 additions & 5 deletions app/pre_upgrade.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,30 @@
package app

import (
"time"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/record"

corev1 "k8s.io/api/core/v1"

"github.com/longhorn/longhorn-manager/constant"
"github.com/longhorn/longhorn-manager/types"

longhorn "github.com/longhorn/longhorn-manager/k8s/pkg/apis/longhorn/v1beta2"
lhclientset "github.com/longhorn/longhorn-manager/k8s/pkg/client/clientset/versioned"
upgradeutil "github.com/longhorn/longhorn-manager/upgrade/util"
)

const (
PreUpgradeEventer = "longhorn-pre-upgrade"
)

func PreUpgradeCmd() cli.Command {
return cli.Command{
Name: "pre-upgrade",
Expand All @@ -29,8 +41,8 @@ func PreUpgradeCmd() cli.Command {
},
},
Action: func(c *cli.Context) {
logrus.Infof("Running pre-upgrade...")
defer logrus.Infof("Completed pre-upgrade.")
logrus.Info("Running pre-upgrade...")
defer logrus.Info("Completed pre-upgrade.")

if err := preUpgrade(c); err != nil {
logrus.WithError(err).Fatalf("Failed to run pre-upgrade")
Expand All @@ -47,17 +59,62 @@ func preUpgrade(c *cli.Context) error {
return errors.Wrap(err, "failed to get client config")
}

eventBroadcaster, err := createEventBroadcaster(config)
if err != nil {
return errors.Wrap(err, "failed to create event broadcaster")
}

scheme := runtime.NewScheme()
if err := longhorn.SchemeBuilder.AddToScheme(scheme); err != nil {
return errors.Wrap(err, "failed to create scheme")
}

eventRecorder := eventBroadcaster.NewRecorder(scheme, corev1.EventSource{Component: PreUpgradeEventer})

lhClient, err := lhclientset.NewForConfig(config)
if err != nil {
return errors.Wrap(err, "failed to get clientset")
}

if err := upgradeutil.CheckUpgradePath(namespace, lhClient, nil, true); err != nil {
err = newPreUpgrader(namespace, lhClient, eventRecorder).Run()
if err != nil {
logrus.Warnf("Done with Run() ... err is %v", err)
// Hang around so that event can happen before fatal exit.
time.Sleep(5 * time.Minute)
}

return err
}

type preUpgrader struct {
namespace string
lhClient lhclientset.Interface
eventRecorder record.EventRecorder
}

func newPreUpgrader(namespace string, lhClient lhclientset.Interface, eventRecorder record.EventRecorder) *preUpgrader {
return &preUpgrader{namespace, lhClient, eventRecorder}
}

func (u *preUpgrader) Run() error {
var err error
defer func() {
if err != nil {
u.eventRecorder.Event(&corev1.ObjectReference{Namespace: u.namespace, Name: PreUpgradeEventer},
corev1.EventTypeWarning, constant.EventReasonFailedUpgradePreCheck, err.Error())
} else {
u.eventRecorder.Event(&corev1.ObjectReference{Namespace: u.namespace, Name: PreUpgradeEventer},
corev1.EventTypeNormal, constant.EventReasonPassedUpgradeCheck, "pre-upgrade check passed")
}
}()

if err = upgradeutil.CheckUpgradePath(u.namespace, u.lhClient, u.eventRecorder, true); err != nil {
return err
}

if err := environmentCheck(); err != nil {
return errors.Wrap(err, "failed to check environment, please make sure you have iscsiadm/open-iscsi installed on the host")
if err = environmentCheck(); err != nil {
err = errors.Wrap(err, "failed to check environment, please make sure you have iscsiadm/open-iscsi installed on the host")
return err
}

return nil
Expand Down
16 changes: 0 additions & 16 deletions app/recurring_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
typedv1core "k8s.io/client-go/kubernetes/typed/core/v1"

"github.com/longhorn/longhorn-manager/constant"
"github.com/longhorn/longhorn-manager/types"
Expand Down Expand Up @@ -292,20 +290,6 @@ func newJob(logger logrus.FieldLogger, managerURL, volumeName, snapshotName stri
}, nil
}

func createEventBroadcaster(config *rest.Config) (record.EventBroadcaster, error) {
kubeClient, err := clientset.NewForConfig(config)
if err != nil {
return nil, errors.Wrap(err, "failed to get k8s client")
}

eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(logrus.Infof)
// TODO: remove the wrapper when every clients have moved to use the clientset.
eventBroadcaster.StartRecordingToSink(&typedv1core.EventSinkImpl{Interface: typedv1core.New(kubeClient.CoreV1().RESTClient()).Events("")})

return eventBroadcaster, nil
}

func (job *Job) run() (err error) {
job.logger.Info("job starts running")

Expand Down
26 changes: 26 additions & 0 deletions app/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package app

import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"

"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"

clientset "k8s.io/client-go/kubernetes"
typedv1core "k8s.io/client-go/kubernetes/typed/core/v1"
)

func createEventBroadcaster(config *rest.Config) (record.EventBroadcaster, error) {
kubeClient, err := clientset.NewForConfig(config)
if err != nil {
return nil, errors.Wrap(err, "failed to get k8s client")
}

eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(logrus.Infof)
// TODO: remove the wrapper when every clients have moved to use the clientset.
eventBroadcaster.StartRecordingToSink(&typedv1core.EventSinkImpl{Interface: typedv1core.New(kubeClient.CoreV1().RESTClient()).Events("")})

return eventBroadcaster, nil
}
5 changes: 4 additions & 1 deletion constant/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ const (
EventReasonReady = "Ready"
EventReasonUploaded = "Uploaded"

EventReasonUpgrade = "Upgrade"
EventReasonUpgrade = "Upgrade"
EventReasonFailedUpgradePreCheck = "FailedUpgradePreCheck"
EventReasonFailedUpgradePostCheck = "FailedUpgradePostCheck"
EventReasonPassedUpgradeCheck = "PassedUpgradeCheck"

EventReasonRolloutSkippedFmt = "RolloutSkipped: %v %v"

Expand Down
14 changes: 7 additions & 7 deletions upgrade/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func doResourceUpgrade(namespace string, lhClient *lhclientset.Clientset, kubeCl
return nil
}

// When lhVersionBeforeUpgrade < v1.5.0, it is v1.4.x. The `CheckUpgradePathSupported` method would have failed us out earlier if it was not v1.4.x.
// When lhVersionBeforeUpgrade < v1.5.0, it is v1.4.x. The `CheckUpgradePath` method would have failed us out earlier if it was not v1.4.x.
resourceMaps := map[string]interface{}{}
if semver.Compare(lhVersionBeforeUpgrade, "v1.5.0") < 0 {
logrus.Info("Walking through the resource upgrade path v1.4.x to v1.5.0")
Expand All @@ -256,14 +256,14 @@ func doResourceUpgrade(namespace string, lhClient *lhclientset.Clientset, kubeCl
return err
}
}
// When lhVersionBeforeUpgrade < v1.6.0, it is v1.5.x. The `CheckUpgradePathSupported` method would have failed us out earlier if it was not v1.5.x.
// When lhVersionBeforeUpgrade < v1.6.0, it is v1.5.x. The `CheckUpgradePath` method would have failed us out earlier if it was not v1.5.x.
if semver.Compare(lhVersionBeforeUpgrade, "v1.6.0") < 0 {
logrus.Info("Walking through the resource upgrade path v1.5.x to v1.6.0")
if err := v15xto160.UpgradeResources(namespace, lhClient, kubeClient, resourceMaps); err != nil {
return err
}
}
// When lhVersionBeforeUpgrade < v1.7.0, it is v1.6.x. The `CheckUpgradePathSupported` method would have failed us out earlier if it was not v1.6.x.
// When lhVersionBeforeUpgrade < v1.7.0, it is v1.6.x. The `CheckUpgradePath` method would have failed us out earlier if it was not v1.6.x.
if semver.Compare(lhVersionBeforeUpgrade, "v1.7.0") < 0 {
logrus.Info("Walking through the resource upgrade path v1.6.x to v1.7.0")
if err := v16xto170.UpgradeResources(namespace, lhClient, kubeClient, resourceMaps); err != nil {
Expand All @@ -276,7 +276,7 @@ func doResourceUpgrade(namespace string, lhClient *lhclientset.Clientset, kubeCl
return err
}
}
// When lhVersionBeforeUpgrade < v1.8.0, it is v1.7.x. The `CheckUpgradePathSupported` method would have failed us out earlier if it was not v1.7.x.
// When lhVersionBeforeUpgrade < v1.8.0, it is v1.7.x. The `CheckUpgradePath` method would have failed us out earlier if it was not v1.7.x.
if semver.Compare(lhVersionBeforeUpgrade, "v1.8.0") < 0 {
logrus.Info("Walking through the resource upgrade path v1.7.x to v1.8.0")
if err := v17xto180.UpgradeResources(namespace, lhClient, kubeClient, resourceMaps); err != nil {
Expand All @@ -287,22 +287,22 @@ func doResourceUpgrade(namespace string, lhClient *lhclientset.Clientset, kubeCl
return err
}

// When lhVersionBeforeUpgrade < v1.5.0, it is v1.4.x. The `CheckUpgradePathSupported` method would have failed us out earlier if it was not v1.4.x.
// When lhVersionBeforeUpgrade < v1.5.0, it is v1.4.x. The `CheckUpgradePath` method would have failed us out earlier if it was not v1.4.x.
resourceMaps = map[string]interface{}{}
if semver.Compare(lhVersionBeforeUpgrade, "v1.5.0") < 0 {
logrus.Info("Walking through the resource status upgrade path v1.4.x to v1.5.0")
if err := v14xto150.UpgradeResourcesStatus(namespace, lhClient, kubeClient, resourceMaps); err != nil {
return err
}
}
// When lhVersionBeforeUpgrade < v1.6.0, it is v1.5.x. The `CheckUpgradePathSupported` method would have failed us out earlier if it was not v1.5.x.
// When lhVersionBeforeUpgrade < v1.6.0, it is v1.5.x. The `CheckUpgradePath` method would have failed us out earlier if it was not v1.5.x.
if semver.Compare(lhVersionBeforeUpgrade, "v1.6.0") < 0 {
logrus.Info("Walking through the resource status upgrade path v1.5.x to v1.6.0")
if err := v15xto160.UpgradeResourcesStatus(namespace, lhClient, kubeClient, resourceMaps); err != nil {
return err
}
}
// When lhVersionBeforeUpgrade < v1.7.0, it is v1.6.x. The `CheckUpgradePathSupported` method would have failed us out earlier if it was not v1.6.x.
// When lhVersionBeforeUpgrade < v1.7.0, it is v1.6.x. The `CheckUpgradePath` method would have failed us out earlier if it was not v1.6.x.
if semver.Compare(lhVersionBeforeUpgrade, "v1.7.0") < 0 {
logrus.Info("Walking through the resource status upgrade path v1.6.x to v1.7.0")
if err := v16xto170.UpgradeResourcesStatus(namespace, lhClient, kubeClient, resourceMaps); err != nil {
Expand Down

0 comments on commit eaa24a8

Please sign in to comment.