From 4cbf1effcac5e3ccc9de378dcffb4c07aface822 Mon Sep 17 00:00:00 2001 From: pwzgorilla Date: Mon, 11 Sep 2017 21:22:32 +0800 Subject: [PATCH 1/2] Election improvement: deleted ephemeral node immediately after connection closed --- manager/manager.go | 52 +++++++++++++++++++++++++++++++++------------- 1 file changed, 38 insertions(+), 14 deletions(-) diff --git a/manager/manager.go b/manager/manager.go index 70e443ea..ec1605cb 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -1,8 +1,11 @@ package manager import ( + "os" + "os/signal" "path/filepath" "strings" + "syscall" "github.com/Dataman-Cloud/swan/api" "github.com/Dataman-Cloud/swan/config" @@ -107,6 +110,11 @@ func (m *Manager) Start() error { } func (m *Manager) start() error { + defer func() { + log.Println("close connection with zookeeper") + m.ZKClient.Close() + }() + go func() { p, err := m.electLeader() if err != nil { @@ -143,24 +151,40 @@ func (m *Manager) start() error { } }() - for { - select { - case c := <-m.leadershipChangeCh: - switch c { - case LeadershipLeader: - if err := m.sched.Subscribe(); err != nil { - log.Errorf("subscribe to mesos leader error: %v", err) - m.errCh <- err + go func() { + for { + select { + case c := <-m.leadershipChangeCh: + switch c { + case LeadershipLeader: + if err := m.sched.Subscribe(); err != nil { + log.Errorf("subscribe to mesos leader error: %v", err) + m.errCh <- err + } + + m.apiserver.UpdateLeader(m.leader) + + case LeadershipFollower: + log.Warnln("became follower, closing all agents ...") + m.clusterMaster.CloseAllAgents() + m.apiserver.UpdateLeader(m.leader) } + } + } + }() - m.apiserver.UpdateLeader(m.leader) - - case LeadershipFollower: - log.Warnln("became follower, closing all agents ...") - m.clusterMaster.CloseAllAgents() - m.apiserver.UpdateLeader(m.leader) + // wait signal + sigChan := make(chan os.Signal) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGUSR1) + for { + select { + case sig := <-sigChan: + switch sig { + case syscall.SIGUSR1: + continue } + return nil case err := <-m.errCh: return err } From 06cdcf48e1421e41bfa9571d3f1e0f84892975d2 Mon Sep 17 00:00:00 2001 From: pwzgorilla Date: Tue, 12 Sep 2017 14:45:11 +0800 Subject: [PATCH 2/2] tiny improvement for manange relevant logic --- manager/manager.go | 101 +++++++++++++++++++++++++++++---------------- 1 file changed, 66 insertions(+), 35 deletions(-) diff --git a/manager/manager.go b/manager/manager.go index ec1605cb..307f0bea 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -109,12 +109,34 @@ func (m *Manager) Start() error { return m.start() } +func (m *Manager) stop() { + log.Println("close connection with zookeeper") + m.ZKClient.Close() + close(m.leadershipChangeCh) + close(m.errCh) +} + func (m *Manager) start() error { - defer func() { - log.Println("close connection with zookeeper") - m.ZKClient.Close() - }() + // start leader election + m.startElection() + + // start tcp mux + m.startTCPMux() + // start api server + m.startAPIServer() + + // start cluster master + m.startClusterMaster() + + // process leader change event + m.processLeaderEvent() + + // wait signal or error occured + return m.wait() +} + +func (m *Manager) startElection() { go func() { p, err := m.electLeader() if err != nil { @@ -129,65 +151,74 @@ func (m *Manager) start() error { return } }() +} +func (m *Manager) startTCPMux() { go func() { if err := m.tcpMux.ListenAndServe(); err != nil { log.Errorf("start tcpMux error: %v", err) m.errCh <- err } }() +} +func (m *Manager) startAPIServer() { go func() { if err := m.apiserver.Run(); err != nil { log.Errorf("start apiserver error: %v", err) m.errCh <- err } }() +} +func (m *Manager) startClusterMaster() { go func() { if err := m.clusterMaster.Serve(); err != nil { log.Errorf("start mole master error: %v", err) m.errCh <- err } }() +} - go func() { - for { - select { - case c := <-m.leadershipChangeCh: - switch c { - case LeadershipLeader: - if err := m.sched.Subscribe(); err != nil { - log.Errorf("subscribe to mesos leader error: %v", err) - m.errCh <- err - } - - m.apiserver.UpdateLeader(m.leader) - - case LeadershipFollower: - log.Warnln("became follower, closing all agents ...") - m.clusterMaster.CloseAllAgents() - m.apiserver.UpdateLeader(m.leader) - } - } - } - }() +// wait signal or error occured. +func (m *Manager) wait() error { + c := make(chan os.Signal) + defer close(c) + + signal.Notify(c, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGKILL) - // wait signal - sigChan := make(chan os.Signal) - signal.Notify(sigChan, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGUSR1) for { select { - case sig := <-sigChan: - switch sig { - case syscall.SIGUSR1: - continue - } - - return nil + case sig := <-c: + log.Printf("Got %s signal. aborting...", sig) + m.stop() case err := <-m.errCh: + log.Errorf("Error: %v", err) return err } } + return nil +} + +// process event for leader change +func (m *Manager) processLeaderEvent() { + go func() { + for c := range m.leadershipChangeCh { + switch c { + case LeadershipLeader: + if err := m.sched.Subscribe(); err != nil { + log.Errorf("subscribe to mesos leader error: %v", err) + m.errCh <- err + } + + m.apiserver.UpdateLeader(m.leader) + + case LeadershipFollower: + log.Warnln("became follower, closing all agents ...") + m.clusterMaster.CloseAllAgents() + m.apiserver.UpdateLeader(m.leader) + } + } + }() }