Skip to content
This repository has been archived by the owner on Feb 9, 2024. It is now read-only.

Commit

Permalink
[5.5] restore restart phase on workers that reset hung etcd watches (#…
Browse files Browse the repository at this point in the history
…2330)

* restore restart phase on workers that reset hung etcd watches

* adjust etcd upgrade dependencies to work while etcd is out of sync

* address review feedback
  • Loading branch information
Kevin Nisbet authored Nov 23, 2020
1 parent 27c5ff8 commit 1b66030
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 22 deletions.
26 changes: 21 additions & 5 deletions lib/update/cluster/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (s *PlanSuite) TestPlanWithRuntimeAppsUpdate(c *check.C) {
params.coreDNS("/bootstrap"),
params.masters(leadMaster, updates[0:1], gravityPackage, "id", "/coredns"),
params.nodes(updates[2:], leadMaster.Server, gravityPackage, "id", "/masters"),
params.etcd(leadMaster.Server, updates[0:1], *params.targetStep.etcd),
params.etcd(leadMaster.Server, updates[0:1], updates[2:], *params.targetStep.etcd),
params.config("/etcd"),
params.runtime(runtimeUpdates, "/config"),
params.migration("/runtime"),
Expand Down Expand Up @@ -393,6 +393,7 @@ func (s *PlanSuite) TestPlanWithIntermediateRuntimeUpdate(c *check.C) {
params.nodes(intermediateNodes, intermediateLeadMaster.Server, intermediateGravityPackage, "id2", "/masters"),
params.etcd(intermediateLeadMaster.Server,
intermediateOtherMasters,
intermediateNodes,
*params.steps[0].etcd),
params.config("/etcd"),
params.runtime(intermediateRuntimeUpdates, "/config"),
Expand All @@ -402,7 +403,7 @@ func (s *PlanSuite) TestPlanWithIntermediateRuntimeUpdate(c *check.C) {
params.coreDNS("/bootstrap"),
params.masters(leadMaster, otherMasters, gravityPackage, "id", "/coredns"),
params.nodes(nodes, leadMaster.Server, gravityPackage, "id", "/masters"),
params.etcd(leadMaster.Server, otherMasters, *params.targetStep.etcd),
params.etcd(leadMaster.Server, otherMasters, nodes, *params.targetStep.etcd),
params.config("/etcd"),
params.runtime(runtimeUpdates, "/config"),
),
Expand Down Expand Up @@ -1125,7 +1126,7 @@ func (r *params) bootstrapNodeVersioned(server storage.UpdateServer, version str
}
}

func (r params) etcd(leadMaster storage.Server, otherMasters []storage.UpdateServer, etcd etcdVersion) storage.OperationPhase {
func (r params) etcd(leadMaster storage.Server, otherMasters, nodes []storage.UpdateServer, etcd etcdVersion) storage.OperationPhase {
return storage.OperationPhase{
ID: "/etcd",
Description: fmt.Sprintf("Upgrade etcd %v to %v", etcd.installed, etcd.update),
Expand Down Expand Up @@ -1172,7 +1173,8 @@ func (r params) etcd(leadMaster storage.Server, otherMasters []storage.UpdateSer
Phases: []storage.OperationPhase{
r.etcdRestartLeaderNode(leadMaster),
// FIXME: assumes len(otherMasters) == 1
r.etcdRestartNode(otherMasters[0].Server),
r.etcdRestartController(otherMasters[0].Server),
r.etcdRestartWorker(nodes[0].Server),
r.etcdRestartGravity(leadMaster),
},
},
Expand Down Expand Up @@ -1255,7 +1257,7 @@ func (r params) etcdRestartLeaderNode(leadMaster storage.Server) storage.Operati
}
}

func (r params) etcdRestartNode(server storage.Server) storage.OperationPhase {
func (r params) etcdRestartController(server storage.Server) storage.OperationPhase {
t := func(format string) string {
return fmt.Sprintf(format, server.Hostname)
}
Expand All @@ -1270,6 +1272,20 @@ func (r params) etcdRestartNode(server storage.Server) storage.OperationPhase {
}
}

func (r params) etcdRestartWorker(server storage.Server) storage.OperationPhase {
t := func(format string) string {
return fmt.Sprintf(format, server.Hostname)
}
return storage.OperationPhase{
ID: t("/etcd/restart/%v"),
Description: t("Restart etcd on node %q"),
Executor: updateEtcdRestart,
Data: &storage.OperationPhaseData{
Server: &server,
},
}
}

func (r params) etcdRestartGravity(leadMaster storage.Server) storage.OperationPhase {
return storage.OperationPhase{
ID: fmt.Sprint("/etcd/restart/", constants.GravityServiceName),
Expand Down
37 changes: 21 additions & 16 deletions lib/update/cluster/phases/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/gravitational/gravity/lib/defaults"
"github.com/gravitational/gravity/lib/fsm"
"github.com/gravitational/gravity/lib/kubernetes"
"github.com/gravitational/gravity/lib/schema"
"github.com/gravitational/gravity/lib/state"
"github.com/gravitational/gravity/lib/storage"
"github.com/gravitational/gravity/lib/update"
Expand Down Expand Up @@ -79,7 +80,7 @@ func backupFile() (path string) {

func (p *PhaseUpgradeEtcdBackup) Execute(ctx context.Context) error {
p.Info("Backup etcd.")
out, err := utils.RunPlanetCommand(ctx, p.FieldLogger, "etcd", "backup", backupFile())
out, err := utils.RunPlanetCommand(ctx, p.FieldLogger, "--debug", "etcd", "backup", backupFile())
if err != nil {
return trace.Wrap(err, "failed to backup etcd").AddField("output", string(out))
}
Expand Down Expand Up @@ -121,7 +122,7 @@ func NewPhaseUpgradeEtcdShutdown(phase storage.OperationPhase, client *kubeapi.C

func (p *PhaseUpgradeEtcdShutdown) Execute(ctx context.Context) error {
p.Info("Shutdown etcd.")
out, err := utils.RunPlanetCommand(ctx, p.FieldLogger, "etcd", "disable", "--stop-api")
out, err := utils.RunPlanetCommand(ctx, p.FieldLogger, "--debug", "etcd", "disable", "--stop-api")
if err != nil {
return trace.Wrap(err).AddField("output", string(out))
}
Expand All @@ -131,7 +132,7 @@ func (p *PhaseUpgradeEtcdShutdown) Execute(ctx context.Context) error {

func (p *PhaseUpgradeEtcdShutdown) Rollback(ctx context.Context) error {
p.Info("Enable etcd.")
out, err := utils.RunPlanetCommand(ctx, p.FieldLogger, "etcd", "enable")
out, err := utils.RunPlanetCommand(ctx, p.FieldLogger, "--debug", "etcd", "enable")
if err != nil {
return trace.Wrap(err).AddField("output", string(out))
}
Expand Down Expand Up @@ -170,13 +171,13 @@ func NewPhaseUpgradeEtcd(phase storage.OperationPhase, logger log.FieldLogger) (
func (p *PhaseUpgradeEtcd) Execute(ctx context.Context) error {
p.Info("Upgrade etcd.")
// TODO(knisbet) only wipe the etcd database when required
out, err := utils.RunPlanetCommand(ctx, p.FieldLogger, "etcd", "upgrade")
out, err := utils.RunPlanetCommand(ctx, p.FieldLogger, "--debug", "etcd", "upgrade")
if err != nil {
return trace.Wrap(err).AddField("output", string(out))
}
p.Info("Command output: ", string(out))

out, err = utils.RunPlanetCommand(ctx, p.FieldLogger, "etcd", "enable", "--upgrade")
out, err = utils.RunPlanetCommand(ctx, p.FieldLogger, "--debug", "etcd", "enable", "--upgrade")
if err != nil {
return trace.Wrap(err).AddField("output", string(out))
}
Expand All @@ -187,13 +188,13 @@ func (p *PhaseUpgradeEtcd) Execute(ctx context.Context) error {

func (p *PhaseUpgradeEtcd) Rollback(ctx context.Context) error {
p.Info("Rollback upgrade of etcd.")
out, err := utils.RunPlanetCommand(ctx, p.FieldLogger, "etcd", "disable", "--upgrade")
out, err := utils.RunPlanetCommand(ctx, p.FieldLogger, "--debug", "etcd", "disable", "--upgrade")
if err != nil {
return trace.Wrap(err).AddField("output", string(out))
}
p.Info("Command output: ", string(out))

out, err = utils.RunPlanetCommand(ctx, p.FieldLogger, "etcd", "rollback")
out, err = utils.RunPlanetCommand(ctx, p.FieldLogger, "--debug", "etcd", "rollback")
if err != nil {
return trace.Wrap(err).AddField("output", string(out))
}
Expand Down Expand Up @@ -228,7 +229,7 @@ func NewPhaseUpgradeEtcdRestore(phase storage.OperationPhase, logger log.FieldLo
// 10. Restart etcd on the correct ports on first node // API outage ends
func (p *PhaseUpgradeEtcdRestore) Execute(ctx context.Context) error {
p.Info("Restore etcd data from backup.")
out, err := utils.RunPlanetCommand(ctx, p.FieldLogger, "etcd", "restore", backupFile())
out, err := utils.RunPlanetCommand(ctx, p.FieldLogger, "--debug", "etcd", "restore", backupFile())
if err != nil {
return trace.Wrap(err).AddField("output", string(out))
}
Expand Down Expand Up @@ -270,14 +271,18 @@ func NewPhaseUpgradeEtcdRestart(phase storage.OperationPhase, logger log.FieldLo
}

func (p *PhaseUpgradeEtcdRestart) Execute(ctx context.Context) error {
p.Info("Restart etcd after upgrade.")
out, err := utils.RunPlanetCommand(ctx, p.FieldLogger, "etcd", "disable", "--upgrade")
if err != nil {
return trace.Wrap(err).AddField("output", string(out))
// The etcd-upgrade service will not be started on the workers. The command is not safe to run on a worker, as
// the gateway service will be running, and gets detected by the service code as a running etcd instance
if p.Server.ClusterRole == string(schema.ServiceRoleMaster) {
p.Info("Restart etcd after upgrade.")
out, err := utils.RunPlanetCommand(ctx, p.FieldLogger, "--debug", "etcd", "disable", "--upgrade")
if err != nil {
return trace.Wrap(err).AddField("output", string(out))
}
p.Info("Command output: ", string(out))
}
p.Info("Command output: ", string(out))

out, err = utils.RunPlanetCommand(ctx, p.FieldLogger, "etcd", "enable")
out, err := utils.RunPlanetCommand(ctx, p.FieldLogger, "--debug", "etcd", "enable")
if err != nil {
return trace.Wrap(err).AddField("output", string(out))
}
Expand All @@ -287,13 +292,13 @@ func (p *PhaseUpgradeEtcdRestart) Execute(ctx context.Context) error {

func (p *PhaseUpgradeEtcdRestart) Rollback(ctx context.Context) error {
p.Info("Reenable etcd upgrade service.")
out, err := utils.RunPlanetCommand(ctx, p.FieldLogger, "etcd", "disable", "--stop-api")
out, err := utils.RunPlanetCommand(ctx, p.FieldLogger, "--debug", "etcd", "disable", "--stop-api")
if err != nil {
return trace.Wrap(err).AddField("output", string(out))
}
p.Info("Command output: ", string(out))

out, err = utils.RunPlanetCommand(ctx, p.FieldLogger, "etcd", "enable", "--upgrade")
out, err = utils.RunPlanetCommand(ctx, p.FieldLogger, "--debug", "etcd", "enable", "--upgrade")
if err != nil {
return trace.Wrap(err).AddField("output", string(out))
}
Expand Down
9 changes: 8 additions & 1 deletion lib/update/cluster/steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,7 @@ func (r updateStep) addTo(root *builder.Phase, masters, nodes []storage.UpdateSe
root.AddParallel(r.etcdPhase(
leadMaster.Server,
serversToStorage(masters[1:]...),
serversToStorage(nodes...),
))
}
// The "config" phase pulls new teleport master config packages used
Expand Down Expand Up @@ -583,7 +584,7 @@ func (r updateStep) configPhase(nodes []storage.Server) *builder.Phase {
return root
}

func (r updateStep) etcdPhase(leadMaster storage.Server, otherMasters []storage.Server) *builder.Phase {
func (r updateStep) etcdPhase(leadMaster storage.Server, otherMasters, workers []storage.Server) *builder.Phase {
description := fmt.Sprintf("Upgrade etcd %v to %v", r.etcd.installed, r.etcd.update)
if r.etcd.installed == "" {
description = fmt.Sprintf("Upgrade etcd to %v", r.etcd.update)
Expand Down Expand Up @@ -668,6 +669,12 @@ func (r updateStep) etcdPhase(leadMaster storage.Server, otherMasters []storage.
restartMasters.AddWithDependency(builder.DependencyForServer(upgradeServers, server), p)
}

// The etcd restart phase resets any etcd clients that may hang watches to the etcd cluster. So ensure the restart
// phase is called on each worker to ensure all watches are properly reset.
for _, server := range workers {
restartMasters.AddParallel(r.etcdRestartPhase(server))
}

// also restart gravity-site, so that elections get unbroken
restartMasters.AddParallelRaw(storage.OperationPhase{
ID: constants.GravityServiceName,
Expand Down

0 comments on commit 1b66030

Please sign in to comment.