Skip to content

Commit

Permalink
Merge pull request #3307 from nspcc-dev/test-register-peers
Browse files Browse the repository at this point in the history
services: fix logging data race after shutdown
  • Loading branch information
roman-khimov authored Feb 27, 2024
2 parents 2248089 + 5cf0c75 commit cc38221
Show file tree
Hide file tree
Showing 12 changed files with 89 additions and 22 deletions.
2 changes: 1 addition & 1 deletion cli/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ func startServer(ctx *cli.Context) error {
rpcServer := rpcsrv.New(chain, cfg.ApplicationConfiguration.RPC, serv, oracleSrv, log, errChan)
serv.AddService(&rpcServer)

go serv.Start()
serv.Start()
if !cfg.ApplicationConfiguration.RPC.StartWhenSynchronized {
// Run RPC server in a separate routine. This is necessary to avoid a potential
// deadlock: Start() can write errors to errChan which is not yet read in the
Expand Down
2 changes: 1 addition & 1 deletion internal/testcli/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func NewTestChain(t *testing.T, f func(*config.Config), run bool) (*core.Blockch
})
require.NoError(t, err)
netSrv.AddConsensusService(cons, cons.OnPayload, cons.OnTransaction)
go netSrv.Start()
netSrv.Start()
errCh := make(chan error, 2)
rpcServer := rpcsrv.New(chain, cfg.ApplicationConfiguration.RPC, netSrv, nil, logger, errCh)
rpcServer.Start()
Expand Down
10 changes: 9 additions & 1 deletion pkg/consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,6 @@ func (s *service) Start() {
b, _ := s.Chain.GetBlock(s.Chain.CurrentBlockHash()) // Can't fail, we have some current block!
s.lastTimestamp = b.Timestamp
s.dbft.Start(s.lastTimestamp * nsInMs)
s.Chain.SubscribeForBlocks(s.blockEvents)
go s.eventLoop()
}
}
Expand All @@ -296,9 +295,18 @@ func (s *service) Shutdown() {
s.wallet.Close()
}
}
_ = s.log.Sync()
}

func (s *service) eventLoop() {
s.Chain.SubscribeForBlocks(s.blockEvents)

// Manually sync up with potentially missed fresh blocks that may be added by blockchain
// before the subscription.
b, _ := s.Chain.GetBlock(s.Chain.CurrentBlockHash()) // Can't fail, we have some current block!
if b.Timestamp >= s.lastTimestamp {
s.handleChainBlock(b)
}
events:
for {
select {
Expand Down
1 change: 1 addition & 0 deletions pkg/core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1447,6 +1447,7 @@ func (bc *Blockchain) Close() {
close(bc.stopCh)
<-bc.runToExitCh
bc.addLock.Unlock()
_ = bc.log.Sync()
}

// AddBlock accepts successive block for the Blockchain, verifies it and
Expand Down
48 changes: 38 additions & 10 deletions pkg/network/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,14 @@ type (
lastRequestedBlock atomic.Uint32
// lastRequestedHeader contains a height of the last requested header.
lastRequestedHeader atomic.Uint32

register chan Peer
unregister chan peerDrop
handshake chan Peer
quit chan struct{}
relayFin chan struct{}
register chan Peer
unregister chan peerDrop
handshake chan Peer
quit chan struct{}
relayFin chan struct{}
runFin chan struct{}
broadcastTxFin chan struct{}
runProtoFin chan struct{}

transactions chan *transaction.Transaction

Expand All @@ -138,6 +140,11 @@ type (
stateSync StateSync

log *zap.Logger

// started used to Start and Shutdown server only once.
started atomic.Bool

txHandlerLoopWG sync.WaitGroup
}

peerDrop struct {
Expand Down Expand Up @@ -180,6 +187,9 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy
config: chain.GetConfig().ProtocolConfiguration,
quit: make(chan struct{}),
relayFin: make(chan struct{}),
runFin: make(chan struct{}),
broadcastTxFin: make(chan struct{}),
runProtoFin: make(chan struct{}),
register: make(chan Peer),
unregister: make(chan peerDrop),
handshake: make(chan Peer),
Expand Down Expand Up @@ -262,8 +272,12 @@ func (s *Server) ID() uint32 {
}

// Start will start the server and its underlying transport. Calling it twice
// is an error.
// is a no-op. Caller should wait for Start to finish for normal server operation.
func (s *Server) Start() {
if !s.started.CompareAndSwap(false, true) {
s.log.Info("node server already started")
return
}
s.log.Info("node started",
zap.Uint32("blockHeight", s.chain.BlockHeight()),
zap.Uint32("headerHeight", s.chain.HeaderHeight()))
Expand All @@ -272,6 +286,7 @@ func (s *Server) Start() {
s.initStaleMemPools()

var txThreads = optimalNumOfThreads()
s.txHandlerLoopWG.Add(txThreads)
for i := 0; i < txThreads; i++ {
go s.txHandlerLoop()
}
Expand All @@ -285,12 +300,15 @@ func (s *Server) Start() {
setServerAndNodeVersions(s.UserAgent, strconv.FormatUint(uint64(s.id), 10))
setNeoGoVersion(config.Version)
setSeverID(strconv.FormatUint(uint64(s.id), 10))
s.run()
go s.run()
}

// Shutdown disconnects all peers and stops listening. Calling it twice is an error,
// once stopped the same intance of the Server can't be started again by calling Start.
// Shutdown disconnects all peers and stops listening. Calling it twice is a no-op,
// once stopped the same instance of the Server can't be started again by calling Start.
func (s *Server) Shutdown() {
if !s.started.CompareAndSwap(true, false) {
return
}
s.log.Info("shutting down server", zap.Int("peers", s.PeerCount()))
for _, tr := range s.transports {
tr.Close()
Expand All @@ -309,7 +327,13 @@ func (s *Server) Shutdown() {
s.notaryRequestPool.StopSubscriptions()
}
close(s.quit)
<-s.broadcastTxFin
<-s.runProtoFin
<-s.relayFin
<-s.runFin
s.txHandlerLoopWG.Wait()

_ = s.log.Sync()
}

// AddService allows to add a service to be started/stopped by Server.
Expand Down Expand Up @@ -423,6 +447,7 @@ func (s *Server) run() {
addrTimer = time.NewTimer(peerCheckTime)
peerTimer = time.NewTimer(s.ProtoTickInterval)
)
defer close(s.runFin)
defer addrTimer.Stop()
defer peerTimer.Stop()
go s.runProto()
Expand Down Expand Up @@ -521,6 +546,7 @@ func (s *Server) run() {

// runProto is a goroutine that manages server-wide protocol events.
func (s *Server) runProto() {
defer close(s.runProtoFin)
pingTimer := time.NewTimer(s.PingInterval)
for {
prevHeight := s.chain.BlockHeight()
Expand Down Expand Up @@ -1123,6 +1149,7 @@ func (s *Server) handleTxCmd(tx *transaction.Transaction) error {
}

func (s *Server) txHandlerLoop() {
defer s.txHandlerLoopWG.Done()
txloop:
for {
select {
Expand Down Expand Up @@ -1639,6 +1666,7 @@ func (s *Server) broadcastTxLoop() {
batchSize = 42
)

defer close(s.broadcastTxFin)
txs := make([]util.Uint256, 0, batchSize)
var timer *time.Timer

Expand Down
33 changes: 29 additions & 4 deletions pkg/network/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,18 @@ func TestServerStartAndShutdown(t *testing.T) {
t.Run("no consensus", func(t *testing.T) {
s := newTestServer(t, ServerConfig{})

go s.Start()
s.Start()
p := newLocalPeer(t, s)
s.register <- p
require.Eventually(t, func() bool { return 1 == s.PeerCount() }, time.Second, time.Millisecond*10)

assert.True(t, s.transports[0].(*fakeTransp).started.Load())
require.True(t, s.started.Load())
assert.Nil(t, s.txCallback)

s.Shutdown()

require.False(t, s.started.Load())
require.True(t, s.transports[0].(*fakeTransp).closed.Load())
err, ok := p.droppedWith.Load().(error)
require.True(t, ok)
Expand All @@ -110,16 +112,39 @@ func TestServerStartAndShutdown(t *testing.T) {
cons := new(fakeConsensus)
s.AddConsensusService(cons, cons.OnPayload, cons.OnTransaction)

go s.Start()
s.Start()
p := newLocalPeer(t, s)
s.register <- p

assert.True(t, s.services["fake"].(*fakeConsensus).started.Load())
require.True(t, s.started.Load())

s.Shutdown()

require.False(t, s.started.Load())
require.True(t, s.services["fake"].(*fakeConsensus).stopped.Load())
})
t.Run("double start", func(t *testing.T) {
s := newTestServer(t, ServerConfig{})
startWithCleanup(t, s)

// Attempt to start the server again.
s.Start()

require.True(t, s.started.Load(), "server should still be marked as started after second Start call")
})
t.Run("double shutdown", func(t *testing.T) {
s := newTestServer(t, ServerConfig{})
s.Start()
require.True(t, s.started.Load(), "server should still be marked as started after second Start call")
s.Shutdown()

require.False(t, s.started.Load(), "server should be marked as not started after second Shutdown call")
// Attempt to shutdown the server again.
s.Shutdown()
// Verify the server state remains unchanged and is still considered shutdown.
require.False(t, s.started.Load(), "server should remain shutdown after second call")
})
}

func TestServerRegisterPeer(t *testing.T) {
Expand Down Expand Up @@ -312,7 +337,7 @@ func TestServerNotSendsVerack(t *testing.T) {
s.id = 1
finished := make(chan struct{})
go func() {
s.run()
go s.run()
close(finished)
}()
t.Cleanup(func() {
Expand Down Expand Up @@ -389,7 +414,7 @@ func startTestServer(t *testing.T, protocolCfg ...func(*config.Blockchain)) *Ser
}

func startWithCleanup(t *testing.T, s *Server) {
go s.Start()
s.Start()
t.Cleanup(func() {
s.Shutdown()
})
Expand Down
1 change: 1 addition & 0 deletions pkg/services/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,5 @@ func (ms *Service) ShutDown() {
ms.log.Error("can't shut service down", zap.String("endpoint", srv.Addr), zap.Error(err))
}
}
_ = ms.log.Sync()
}
5 changes: 3 additions & 2 deletions pkg/services/notary/notary.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,13 +175,13 @@ func (n *Notary) Start() {
return
}
n.Config.Log.Info("starting notary service")
n.Config.Chain.SubscribeForBlocks(n.blocksCh)
n.mp.SubscribeForTransactions(n.reqCh)
go n.newTxCallbackLoop()
go n.mainLoop()
}

func (n *Notary) mainLoop() {
n.Config.Chain.SubscribeForBlocks(n.blocksCh)
n.mp.SubscribeForTransactions(n.reqCh)
mainloop:
for {
select {
Expand Down Expand Up @@ -228,6 +228,7 @@ func (n *Notary) Shutdown() {
close(n.stopCh)
<-n.done
n.wallet.Close()
_ = n.Config.Log.Sync()
}

// IsAuthorized returns whether Notary service currently is authorized to collect
Expand Down
1 change: 1 addition & 0 deletions pkg/services/oracle/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ func (o *Oracle) Shutdown() {
o.ResponseHandler.Shutdown()
<-o.done
o.wallet.Close()
_ = o.Log.Sync()
}

// Start runs the oracle service in a separate goroutine.
Expand Down
1 change: 1 addition & 0 deletions pkg/services/rpcsrv/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ func (s *Server) Shutdown() {

// Wait for handleSubEvents to finish.
<-s.subEventsToExitCh
_ = s.log.Sync()
}

// SetOracleHandler allows to update oracle handler used by the Server.
Expand Down
4 changes: 2 additions & 2 deletions pkg/services/rpcsrv/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func TestSubscriptions(t *testing.T) {
defer chain.Close()
defer rpcSrv.Shutdown()

go rpcSrv.coreServer.Start()
rpcSrv.coreServer.Start()
defer rpcSrv.coreServer.Shutdown()

for _, feed := range subFeeds {
Expand Down Expand Up @@ -395,7 +395,7 @@ func TestFilteredNotaryRequestSubscriptions(t *testing.T) {
}

chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
go rpcSrv.coreServer.Start()
rpcSrv.coreServer.Start()

defer chain.Close()
defer rpcSrv.Shutdown()
Expand Down
3 changes: 2 additions & 1 deletion pkg/services/stateroot/validators.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ func (s *service) Start() {
return
}
s.log.Info("starting state validation service")
s.chain.SubscribeForBlocks(s.blockCh)
go s.run()
}

func (s *service) run() {
s.chain.SubscribeForBlocks(s.blockCh)
runloop:
for {
select {
Expand Down Expand Up @@ -77,6 +77,7 @@ func (s *service) Shutdown() {
if s.wallet != nil {
s.wallet.Close()
}
_ = s.log.Sync()
}

func (s *service) signAndSend(r *state.MPTRoot) error {
Expand Down

0 comments on commit cc38221

Please sign in to comment.