fix: add events when pre- or post-upgrade check fails #3211

Merged
61 changes: 49 additions & 12 deletions app/post_upgrade.go
@@ -10,16 +10,24 @@ import (
"github.com/sirupsen/logrus"
"github.com/urfave/cli"

"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/record"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"

"github.com/longhorn/longhorn-manager/constant"
"github.com/longhorn/longhorn-manager/types"

longhorn "github.com/longhorn/longhorn-manager/k8s/pkg/apis/longhorn/v1beta2"
)

const (
PostUpgradeEventer = "longhorn-post-upgrade"

RetryCounts = 360
RetryInterval = 5 * time.Second
)
@@ -33,8 +41,10 @@ func PostUpgradeCmd() cli.Command {
Usage: "Specify path to kube config (optional)",
},
cli.StringFlag{
Name: FlagNamespace,
EnvVar: types.EnvPodNamespace,
Name: FlagNamespace,
EnvVar: types.EnvPodNamespace,
Required: true,
Usage: "Specify Longhorn namespace",
},
},
Action: func(c *cli.Context) {
@@ -50,34 +60,61 @@ func PostUpgradeCmd() cli.Command {

func postUpgrade(c *cli.Context) error {
namespace := c.String(FlagNamespace)
if namespace == "" {
return errors.New("namespace is required")
}

config, err := clientcmd.BuildConfigFromFlags("", c.String(FlagKubeConfig))
if err != nil {
return errors.Wrap(err, "failed to get client config")
}

eventBroadcaster, err := createEventBroadcaster(config)
if err != nil {
return errors.Wrap(err, "failed to create event broadcaster")
}
defer eventBroadcaster.Shutdown()

scheme := runtime.NewScheme()
if err := longhorn.SchemeBuilder.AddToScheme(scheme); err != nil {
return errors.Wrap(err, "failed to create scheme")
}

eventRecorder := eventBroadcaster.NewRecorder(scheme, corev1.EventSource{Component: PostUpgradeEventer})

kubeClient, err := kubernetes.NewForConfig(config)
if err != nil {
return errors.Wrap(err, "failed to get k8s client")
}

return newPostUpgrader(namespace, kubeClient).Run()
err = newPostUpgrader(namespace, kubeClient, eventRecorder).Run()
if err != nil {
logrus.Warnf("Done with Run() ... err is %v", err)
}
Contributor Author:
Note that with the event captured, this is not as necessary, but it might be useful. I'm not sure what to use for the time to wait. The pre-upgrade job itself has spec.activeDeadlineSeconds: 900 so the pod will be killed after 15 minutes anyway, and perhaps that is a reasonable value to use.
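For context, a minimal sketch of the Job-level deadline the comment refers to, using the k8s.io/api/batch/v1 types (the object name, image, and command are placeholders, not the actual Longhorn chart values): spec.activeDeadlineSeconds caps how long the pod may run before it is terminated, regardless of any sleep inside the container.

```go
// Sketch only: field names come from k8s.io/api/batch/v1; the image, command,
// and object name below are placeholders, not the real chart manifest.
package main

import (
	"fmt"

	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func preUpgradeJobSketch() *batchv1.Job {
	deadline := int64(900) // 15 minutes; after this the pod is killed regardless of any sleep
	return &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{Name: "longhorn-pre-upgrade", Namespace: "longhorn-system"},
		Spec: batchv1.JobSpec{
			ActiveDeadlineSeconds: &deadline, // rendered as spec.activeDeadlineSeconds: 900
			Template: corev1.PodTemplateSpec{
				Spec: corev1.PodSpec{
					RestartPolicy: corev1.RestartPolicyOnFailure,
					Containers: []corev1.Container{{
						Name:    "longhorn-pre-upgrade",
						Image:   "longhornio/longhorn-manager:placeholder",
						Command: []string{"longhorn-manager", "pre-upgrade"},
					}},
				},
			},
		},
	}
}

func main() {
	fmt.Println(*preUpgradeJobSketch().Spec.ActiveDeadlineSeconds)
}
```
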

Member:
If the event is emitted, does it need to sleep for minutes?

Contributor Author:
Not really. The event will last for an hour, so if a support bundle is collected in that time, the event should be there.
It would be the way to accomplish the goal in longhorn/longhorn#9448.
I can certainly take it out if preferred.

Contributor Author:
I removed it.

Contributor Author:
I put it back. Without it, sometimes the panic from the "fatal" error means the event does not get created.

Member:
> I put it back. Without it, sometimes the panic from the "fatal" error means the event does not get created.

Can you elaborate more on the statement?

Contributor:
> I put it back. Without it, sometimes the panic from the "fatal" error means the event does not get created.

Looks like the AI suggestion can solve this one. WDYT @james-munson ?

https://github.com/longhorn/longhorn-manager/pull/3211/files#r1803865276

Contributor Author:
The events are queued by Event/Eventf, but not necessarily propagated to the sink before the Event() call returns. They can be lost when the next thing that happens is an os.Exit as part of the log.Fatal.
However, the AI suggestion is a good one. I tested it, and it looks like eventBroadcaster.Shutdown() forces a flush of the queued events, so they show up even without a sleep to delay the exit. I have pushed up the change.


return err
}
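To make the flush ordering described in the thread above concrete, here is a minimal, self-contained sketch (simplified from the PR; the component name, reason string, and trimmed sink setup are placeholders): the deferred eventBroadcaster.Shutdown() runs when the check function returns and drains the queued event, so the caller's logrus.Fatal, which calls os.Exit and skips deferred functions, can no longer lose it.

```go
// Sketch, not the PR code: illustrates only the defer/Fatal ordering.
package main

import (
	"errors"

	"github.com/sirupsen/logrus"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/tools/record"
)

func runCheck() error {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(logrus.Infof)
	// The real code also calls StartRecordingToSink(...) so events reach the API server.
	defer eventBroadcaster.Shutdown() // runs on return, flushing anything still queued

	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "upgrade-check"})

	err := errors.New("simulated check failure")
	// Event only queues the event; propagation to the sink is asynchronous.
	recorder.Event(&corev1.ObjectReference{Namespace: "longhorn-system", Name: "upgrade-check"},
		corev1.EventTypeWarning, "FailedUpgradePreCheck", err.Error())
	return err
}

func main() {
	if err := runCheck(); err != nil {
		// By this point runCheck's deferred Shutdown has already drained the queue,
		// so the os.Exit inside Fatal (which skips defers) cannot drop the event.
		logrus.WithError(err).Fatal("Failed to run pre-upgrade")
	}
}
```
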

type postUpgrader struct {
namespace string
kubeClient kubernetes.Interface
namespace string
kubeClient kubernetes.Interface
eventRecorder record.EventRecorder
}

func newPostUpgrader(namespace string, kubeClient kubernetes.Interface) *postUpgrader {
return &postUpgrader{namespace, kubeClient}
func newPostUpgrader(namespace string, kubeClient kubernetes.Interface, eventRecorder record.EventRecorder) *postUpgrader {
return &postUpgrader{namespace, kubeClient, eventRecorder}
}

func (u *postUpgrader) Run() error {
if err := u.waitManagerUpgradeComplete(); err != nil {
var err error
defer func() {
if err != nil {
u.eventRecorder.Event(&corev1.ObjectReference{Namespace: u.namespace, Name: PostUpgradeEventer},
corev1.EventTypeWarning, constant.EventReasonFailedUpgradePostCheck, err.Error())
} else {
u.eventRecorder.Event(&corev1.ObjectReference{Namespace: u.namespace, Name: PostUpgradeEventer},
corev1.EventTypeNormal, constant.EventReasonPassedUpgradeCheck, "post-upgrade check passed")
}
}()

if err = u.waitManagerUpgradeComplete(); err != nil {
return err
}

64 changes: 59 additions & 5 deletions app/pre_upgrade.go
@@ -5,14 +5,24 @@ import (
"github.com/sirupsen/logrus"
"github.com/urfave/cli"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/record"

corev1 "k8s.io/api/core/v1"

"github.com/longhorn/longhorn-manager/constant"
"github.com/longhorn/longhorn-manager/types"

longhorn "github.com/longhorn/longhorn-manager/k8s/pkg/apis/longhorn/v1beta2"
lhclientset "github.com/longhorn/longhorn-manager/k8s/pkg/client/clientset/versioned"
upgradeutil "github.com/longhorn/longhorn-manager/upgrade/util"
)

const (
PreUpgradeEventer = "longhorn-pre-upgrade"
)

func PreUpgradeCmd() cli.Command {
return cli.Command{
Name: "pre-upgrade",
@@ -29,8 +39,8 @@ func PreUpgradeCmd() cli.Command {
},
},
Action: func(c *cli.Context) {
logrus.Infof("Running pre-upgrade...")
defer logrus.Infof("Completed pre-upgrade.")
logrus.Info("Running pre-upgrade...")
defer logrus.Info("Completed pre-upgrade.")

if err := preUpgrade(c); err != nil {
logrus.WithError(err).Fatalf("Failed to run pre-upgrade")
@@ -47,17 +57,61 @@ func preUpgrade(c *cli.Context) error {
return errors.Wrap(err, "failed to get client config")
}

eventBroadcaster, err := createEventBroadcaster(config)
if err != nil {
return errors.Wrap(err, "failed to create event broadcaster")
}
defer eventBroadcaster.Shutdown()

scheme := runtime.NewScheme()
if err := longhorn.SchemeBuilder.AddToScheme(scheme); err != nil {
return errors.Wrap(err, "failed to create scheme")
}

eventRecorder := eventBroadcaster.NewRecorder(scheme, corev1.EventSource{Component: PreUpgradeEventer})

lhClient, err := lhclientset.NewForConfig(config)
if err != nil {
return errors.Wrap(err, "failed to get clientset")
}

if err := upgradeutil.CheckUpgradePath(namespace, lhClient, nil, true); err != nil {
err = newPreUpgrader(namespace, lhClient, eventRecorder).Run()
if err != nil {
logrus.Warnf("Done with Run() ... err is %v", err)
}

return err
}

type preUpgrader struct {
namespace string
lhClient lhclientset.Interface
eventRecorder record.EventRecorder
}

func newPreUpgrader(namespace string, lhClient lhclientset.Interface, eventRecorder record.EventRecorder) *preUpgrader {
return &preUpgrader{namespace, lhClient, eventRecorder}
}

func (u *preUpgrader) Run() error {
var err error
defer func() {
if err != nil {
u.eventRecorder.Event(&corev1.ObjectReference{Namespace: u.namespace, Name: PreUpgradeEventer},
corev1.EventTypeWarning, constant.EventReasonFailedUpgradePreCheck, err.Error())
} else {
u.eventRecorder.Event(&corev1.ObjectReference{Namespace: u.namespace, Name: PreUpgradeEventer},
corev1.EventTypeNormal, constant.EventReasonPassedUpgradeCheck, "pre-upgrade check passed")
}
}()

if err = upgradeutil.CheckUpgradePath(u.namespace, u.lhClient, u.eventRecorder, true); err != nil {
return err
}

if err := environmentCheck(); err != nil {
return errors.Wrap(err, "failed to check environment, please make sure you have iscsiadm/open-iscsi installed on the host")
if err = environmentCheck(); err != nil {
err = errors.Wrap(err, "failed to check environment, please make sure you have iscsiadm/open-iscsi installed on the host")
return err
}

return nil
16 changes: 0 additions & 16 deletions app/recurring_job.go
@@ -21,8 +21,6 @@ import (

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
typedv1core "k8s.io/client-go/kubernetes/typed/core/v1"

"github.com/longhorn/longhorn-manager/constant"
"github.com/longhorn/longhorn-manager/types"
@@ -292,20 +290,6 @@ func newJob(logger logrus.FieldLogger, managerURL, volumeName, snapshotName stri
}, nil
}

func createEventBroadcaster(config *rest.Config) (record.EventBroadcaster, error) {
kubeClient, err := clientset.NewForConfig(config)
if err != nil {
return nil, errors.Wrap(err, "failed to get k8s client")
}

eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(logrus.Infof)
// TODO: remove the wrapper when every clients have moved to use the clientset.
eventBroadcaster.StartRecordingToSink(&typedv1core.EventSinkImpl{Interface: typedv1core.New(kubeClient.CoreV1().RESTClient()).Events("")})

return eventBroadcaster, nil
}

func (job *Job) run() (err error) {
job.logger.Info("job starts running")

26 changes: 26 additions & 0 deletions app/util.go
@@ -0,0 +1,26 @@
package app

import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"

"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"

clientset "k8s.io/client-go/kubernetes"
typedv1core "k8s.io/client-go/kubernetes/typed/core/v1"
)

func createEventBroadcaster(config *rest.Config) (record.EventBroadcaster, error) {
kubeClient, err := clientset.NewForConfig(config)
if err != nil {
return nil, errors.Wrap(err, "failed to get k8s client")
}

eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(logrus.Infof)
// TODO: remove the wrapper when every clients have moved to use the clientset.
eventBroadcaster.StartRecordingToSink(&typedv1core.EventSinkImpl{Interface: typedv1core.New(kubeClient.CoreV1().RESTClient()).Events("")})

return eventBroadcaster, nil
}
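Not part of the diff, but for illustration: the warning and normal events these jobs now emit can be listed with plain client-go, filtered on the involved object name. This is a sketch; the namespace and the field-selector value are assumptions based on the PreUpgradeEventer constant above.

```go
// Illustration only (not part of this PR): list the events emitted against the
// "longhorn-pre-upgrade" object reference. Error handling is deliberately terse.
package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Empty paths fall back to in-cluster config; adjust for a local kubeconfig.
	config, err := clientcmd.BuildConfigFromFlags("", "")
	if err != nil {
		panic(err)
	}
	kubeClient, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	// "longhorn-pre-upgrade" matches the PreUpgradeEventer constant in app/pre_upgrade.go.
	events, err := kubeClient.CoreV1().Events("longhorn-system").List(context.TODO(), metav1.ListOptions{
		FieldSelector: "involvedObject.name=longhorn-pre-upgrade",
	})
	if err != nil {
		panic(err)
	}
	for _, e := range events.Items {
		fmt.Printf("%s %s %s\n", e.Type, e.Reason, e.Message)
	}
}
```

The same filter works from the command line with `kubectl get events -n longhorn-system --field-selector involvedObject.name=longhorn-pre-upgrade`.
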
5 changes: 4 additions & 1 deletion constant/events.go
@@ -62,7 +62,10 @@ const (
EventReasonReady = "Ready"
EventReasonUploaded = "Uploaded"

EventReasonUpgrade = "Upgrade"
EventReasonUpgrade = "Upgrade"
EventReasonFailedUpgradePreCheck = "FailedUpgradePreCheck"
EventReasonFailedUpgradePostCheck = "FailedUpgradePostCheck"
EventReasonPassedUpgradeCheck = "PassedUpgradeCheck"

EventReasonRolloutSkippedFmt = "RolloutSkipped: %v %v"

14 changes: 7 additions & 7 deletions upgrade/upgrade.go
@@ -242,7 +242,7 @@ func doResourceUpgrade(namespace string, lhClient *lhclientset.Clientset, kubeCl
return nil
}

// When lhVersionBeforeUpgrade < v1.5.0, it is v1.4.x. The `CheckUpgradePathSupported` method would have failed us out earlier if it was not v1.4.x.
// When lhVersionBeforeUpgrade < v1.5.0, it is v1.4.x. The `CheckUpgradePath` method would have failed us out earlier if it was not v1.4.x.
resourceMaps := map[string]interface{}{}
if semver.Compare(lhVersionBeforeUpgrade, "v1.5.0") < 0 {
logrus.Info("Walking through the resource upgrade path v1.4.x to v1.5.0")
@@ -256,14 +256,14 @@ func doResourceUpgrade(namespace string, lhClient *lhclientset.Clientset, kubeCl
return err
}
}
// When lhVersionBeforeUpgrade < v1.6.0, it is v1.5.x. The `CheckUpgradePathSupported` method would have failed us out earlier if it was not v1.5.x.
// When lhVersionBeforeUpgrade < v1.6.0, it is v1.5.x. The `CheckUpgradePath` method would have failed us out earlier if it was not v1.5.x.
if semver.Compare(lhVersionBeforeUpgrade, "v1.6.0") < 0 {
logrus.Info("Walking through the resource upgrade path v1.5.x to v1.6.0")
if err := v15xto160.UpgradeResources(namespace, lhClient, kubeClient, resourceMaps); err != nil {
return err
}
}
// When lhVersionBeforeUpgrade < v1.7.0, it is v1.6.x. The `CheckUpgradePathSupported` method would have failed us out earlier if it was not v1.6.x.
// When lhVersionBeforeUpgrade < v1.7.0, it is v1.6.x. The `CheckUpgradePath` method would have failed us out earlier if it was not v1.6.x.
if semver.Compare(lhVersionBeforeUpgrade, "v1.7.0") < 0 {
logrus.Info("Walking through the resource upgrade path v1.6.x to v1.7.0")
if err := v16xto170.UpgradeResources(namespace, lhClient, kubeClient, resourceMaps); err != nil {
@@ -276,7 +276,7 @@ func doResourceUpgrade(namespace string, lhClient *lhclientset.Clientset, kubeCl
return err
}
}
// When lhVersionBeforeUpgrade < v1.8.0, it is v1.7.x. The `CheckUpgradePathSupported` method would have failed us out earlier if it was not v1.7.x.
// When lhVersionBeforeUpgrade < v1.8.0, it is v1.7.x. The `CheckUpgradePath` method would have failed us out earlier if it was not v1.7.x.
if semver.Compare(lhVersionBeforeUpgrade, "v1.8.0") < 0 {
logrus.Info("Walking through the resource upgrade path v1.7.x to v1.8.0")
if err := v17xto180.UpgradeResources(namespace, lhClient, kubeClient, resourceMaps); err != nil {
@@ -287,22 +287,22 @@ func doResourceUpgrade(namespace string, lhClient *lhclientset.Clientset, kubeCl
return err
}

// When lhVersionBeforeUpgrade < v1.5.0, it is v1.4.x. The `CheckUpgradePathSupported` method would have failed us out earlier if it was not v1.4.x.
// When lhVersionBeforeUpgrade < v1.5.0, it is v1.4.x. The `CheckUpgradePath` method would have failed us out earlier if it was not v1.4.x.
resourceMaps = map[string]interface{}{}
if semver.Compare(lhVersionBeforeUpgrade, "v1.5.0") < 0 {
logrus.Info("Walking through the resource status upgrade path v1.4.x to v1.5.0")
if err := v14xto150.UpgradeResourcesStatus(namespace, lhClient, kubeClient, resourceMaps); err != nil {
return err
}
}
// When lhVersionBeforeUpgrade < v1.6.0, it is v1.5.x. The `CheckUpgradePathSupported` method would have failed us out earlier if it was not v1.5.x.
// When lhVersionBeforeUpgrade < v1.6.0, it is v1.5.x. The `CheckUpgradePath` method would have failed us out earlier if it was not v1.5.x.
if semver.Compare(lhVersionBeforeUpgrade, "v1.6.0") < 0 {
logrus.Info("Walking through the resource status upgrade path v1.5.x to v1.6.0")
if err := v15xto160.UpgradeResourcesStatus(namespace, lhClient, kubeClient, resourceMaps); err != nil {
return err
}
}
// When lhVersionBeforeUpgrade < v1.7.0, it is v1.6.x. The `CheckUpgradePathSupported` method would have failed us out earlier if it was not v1.6.x.
// When lhVersionBeforeUpgrade < v1.7.0, it is v1.6.x. The `CheckUpgradePath` method would have failed us out earlier if it was not v1.6.x.
if semver.Compare(lhVersionBeforeUpgrade, "v1.7.0") < 0 {
logrus.Info("Walking through the resource status upgrade path v1.6.x to v1.7.0")
if err := v16xto170.UpgradeResourcesStatus(namespace, lhClient, kubeClient, resourceMaps); err != nil {