Skip to content

Commit

Permalink
NRG: Wait for goroutines on shutdown
Browse files Browse the repository at this point in the history
Otherwise a call to `Stop()` or `Delete()` can return before the goroutines
have finished, which may create a race if the node then gets restarted
instantly.

Also don't delete on subsequent calls to `shutdown()` if already closed,
as this can potentially cause us to delete the state on disk from underneath
a new running instance of the Raft group.

Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
neilalexander committed Aug 26, 2024
1 parent 52edb12 commit 9613916
Showing 1 changed file with 20 additions and 15 deletions.
35 changes: 20 additions & 15 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,12 @@ func (state RaftState) String() string {
type raft struct {
sync.RWMutex

created time.Time // Time that the group was created
accName string // Account name of the asset this raft group is for
group string // Raft group
sd string // Store directory
id string // Node ID
created time.Time // Time that the group was created
accName string // Account name of the asset this raft group is for
group string // Raft group
sd string // Store directory
id string // Node ID
wg sync.WaitGroup // Wait for running goroutines to exit on shutdown

wal WAL // WAL store (filestore or memstore)
wtype StorageType // WAL type, e.g. FileStorage or MemoryStorage
Expand Down Expand Up @@ -524,6 +525,7 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
s.registerRaftNode(n.group, n)

// Start the run goroutine for the Raft state machine.
n.wg.Add(1)
s.startGoRoutine(n.run, labels)

return n, nil
Expand Down Expand Up @@ -1624,6 +1626,7 @@ func (n *raft) Delete() {

func (n *raft) shutdown(shouldDelete bool) {
n.Lock()
defer n.Unlock()

// Returned swap value is the previous state. It looks counter-intuitive
// to do this atomic operation with the lock held, but we have to do so in
Expand All @@ -1632,14 +1635,6 @@ func (n *raft) shutdown(shouldDelete bool) {
// allowing shutdown() to be called again. If that happens then the below
// close(n.quit) will panic from trying to close an already-closed channel.
if n.state.Swap(int32(Closed)) == int32(Closed) {
// If we are called again with shouldDelete set (e.g. after an earlier Stop()), still perform the cleanup.
if shouldDelete {
if wal := n.wal; wal != nil {
wal.Delete()
}
os.RemoveAll(n.sd)
}
n.Unlock()
return
}

Expand All @@ -1658,6 +1653,12 @@ func (n *raft) shutdown(shouldDelete bool) {
n.c = nil
}

// Need to unlock here, otherwise the pending goroutines might get
// stuck waiting for the lock and then the shutdown will never finish.
n.Unlock()
n.wg.Wait()
n.Lock()

s, g, wal := n.s, n.group, n.wal

// Unregistering ipQueues does not prevent them from push/pop
Expand All @@ -1671,7 +1672,6 @@ func (n *raft) shutdown(shouldDelete bool) {
q.unregister()
}
sd := n.sd
n.Unlock()

s.unregisterRaftNode(g)

Expand Down Expand Up @@ -1823,6 +1823,7 @@ func (n *raft) resetElectWithLock(et time.Duration) {
// the entire life of the Raft node once started.
func (n *raft) run() {
s := n.s
defer n.wg.Done()
defer s.grWG.Done()

// We want to wait for some routing to be enabled, so we will wait for
Expand Down Expand Up @@ -2700,7 +2701,11 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) {
n.progress[ar.peer] = indexUpdates
n.Unlock()

n.s.startGoRoutine(func() { n.runCatchup(ar, indexUpdates) })
n.wg.Add(1)
n.s.startGoRoutine(func() {
defer n.wg.Done()
n.runCatchup(ar, indexUpdates)
})
}

func (n *raft) loadEntry(index uint64) (*appendEntry, error) {
Expand Down

0 comments on commit 9613916

Please sign in to comment.