Skip to content

Commit

Permalink
network: fix server shutdown by waiting for goroutines to finish
Browse files Browse the repository at this point in the history
s.Shutdown() does not wait for all goroutines of the node server to
finish normally just because the server exits without dependent
goroutines awaiting. Which causes logs to attempt to write after the
test has ended. The consequence of this bug fix is that corresponding
tests are fixed.

Close #2973
Close #2974

Signed-off-by: Ekaterina Pavlova <[email protected]>
  • Loading branch information
AliceInHunterland committed Feb 20, 2024
1 parent 8e4b590 commit 63ddac4
Showing 1 changed file with 21 additions and 5 deletions.
26 changes: 21 additions & 5 deletions pkg/network/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,14 @@ type (
// lastRequestedHeader contains a height of the last requested header.
lastRequestedHeader atomic.Uint32

register chan Peer
unregister chan peerDrop
handshake chan Peer
quit chan struct{}
relayFin chan struct{}
register chan Peer
unregister chan peerDrop
handshake chan Peer
quit chan struct{}
relayFin chan struct{}
runFin chan struct{}
broadcastTxFin chan struct{}
runProtoFin chan struct{}

transactions chan *transaction.Transaction

Expand All @@ -143,6 +146,8 @@ type (
started atomic.Bool
// runLoopStarted indicates that the server's main run loop has started.
runLoopStarted atomic.Bool

txHandlerLoopWG sync.WaitGroup
}

peerDrop struct {
Expand Down Expand Up @@ -185,6 +190,9 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy
config: chain.GetConfig().ProtocolConfiguration,
quit: make(chan struct{}),
relayFin: make(chan struct{}),
runFin: make(chan struct{}),
broadcastTxFin: make(chan struct{}),
runProtoFin: make(chan struct{}),
register: make(chan Peer),
unregister: make(chan peerDrop),
handshake: make(chan Peer),
Expand Down Expand Up @@ -281,6 +289,7 @@ func (s *Server) Start() {
s.initStaleMemPools()

var txThreads = optimalNumOfThreads()
s.txHandlerLoopWG.Add(txThreads)
for i := 0; i < txThreads; i++ {
go s.txHandlerLoop()
}
Expand Down Expand Up @@ -321,7 +330,10 @@ func (s *Server) Shutdown() {
s.notaryRequestPool.StopSubscriptions()
}
close(s.quit)
<-s.broadcastTxFin
<-s.runProtoFin
<-s.relayFin
s.txHandlerLoopWG.Wait()

_ = s.log.Sync()
}
Expand Down Expand Up @@ -440,6 +452,7 @@ func (s *Server) run() {
s.runLoopStarted.Store(true)
defer func() {
s.runLoopStarted.Store(false)
close(s.runFin)
}()
defer addrTimer.Stop()
defer peerTimer.Stop()
Expand Down Expand Up @@ -539,6 +552,7 @@ func (s *Server) run() {

// runProto is a goroutine that manages server-wide protocol events.
func (s *Server) runProto() {
defer close(s.runProtoFin)
pingTimer := time.NewTimer(s.PingInterval)
for {
prevHeight := s.chain.BlockHeight()
Expand Down Expand Up @@ -1141,6 +1155,7 @@ func (s *Server) handleTxCmd(tx *transaction.Transaction) error {
}

func (s *Server) txHandlerLoop() {
defer s.txHandlerLoopWG.Done()
txloop:
for {
select {
Expand Down Expand Up @@ -1657,6 +1672,7 @@ func (s *Server) broadcastTxLoop() {
batchSize = 42
)

defer close(s.broadcastTxFin)
txs := make([]util.Uint256, 0, batchSize)
var timer *time.Timer

Expand Down

0 comments on commit 63ddac4

Please sign in to comment.