diff --git a/manager/manager.go b/manager/manager.go index 70e443ea..307f0bea 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -1,8 +1,11 @@ package manager import ( + "os" + "os/signal" "path/filepath" "strings" + "syscall" "github.com/Dataman-Cloud/swan/api" "github.com/Dataman-Cloud/swan/config" @@ -106,7 +109,34 @@ func (m *Manager) Start() error { return m.start() } +func (m *Manager) stop() { + log.Println("close connection with zookeeper") + m.ZKClient.Close() + close(m.leadershipChangeCh) + close(m.errCh) +} + func (m *Manager) start() error { + // start leader election + m.startElection() + + // start tcp mux + m.startTCPMux() + + // start api server + m.startAPIServer() + + // start cluster master + m.startClusterMaster() + + // process leader change event + m.processLeaderEvent() + + // wait signal or error occured + return m.wait() +} + +func (m *Manager) startElection() { go func() { p, err := m.electLeader() if err != nil { @@ -121,31 +151,60 @@ func (m *Manager) start() error { return } }() +} +func (m *Manager) startTCPMux() { go func() { if err := m.tcpMux.ListenAndServe(); err != nil { log.Errorf("start tcpMux error: %v", err) m.errCh <- err } }() +} +func (m *Manager) startAPIServer() { go func() { if err := m.apiserver.Run(); err != nil { log.Errorf("start apiserver error: %v", err) m.errCh <- err } }() +} +func (m *Manager) startClusterMaster() { go func() { if err := m.clusterMaster.Serve(); err != nil { log.Errorf("start mole master error: %v", err) m.errCh <- err } }() +} + +// wait signal or error occured. +func (m *Manager) wait() error { + c := make(chan os.Signal) + defer close(c) + + signal.Notify(c, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGKILL) for { select { - case c := <-m.leadershipChangeCh: + case sig := <-c: + log.Printf("Got %s signal. aborting...", sig) + m.stop() + case err := <-m.errCh: + log.Errorf("Error: %v", err) + return err + } + } + + return nil +} + +// process event for leader change +func (m *Manager) processLeaderEvent() { + go func() { + for c := range m.leadershipChangeCh { switch c { case LeadershipLeader: if err := m.sched.Subscribe(); err != nil { @@ -160,10 +219,6 @@ func (m *Manager) start() error { m.clusterMaster.CloseAllAgents() m.apiserver.UpdateLeader(m.leader) } - - case err := <-m.errCh: - return err } - } - + }() }