Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services: fix logging data race after shutdown #3307

Merged
merged 6 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cli/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ func startServer(ctx *cli.Context) error {
rpcServer := rpcsrv.New(chain, cfg.ApplicationConfiguration.RPC, serv, oracleSrv, log, errChan)
serv.AddService(&rpcServer)

go serv.Start()
serv.Start()
if !cfg.ApplicationConfiguration.RPC.StartWhenSynchronized {
// Run RPC server in a separate routine. This is necessary to avoid a potential
// deadlock: Start() can write errors to errChan which is not yet read in the
Expand Down
2 changes: 1 addition & 1 deletion internal/testcli/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func NewTestChain(t *testing.T, f func(*config.Config), run bool) (*core.Blockch
})
require.NoError(t, err)
netSrv.AddConsensusService(cons, cons.OnPayload, cons.OnTransaction)
go netSrv.Start()
netSrv.Start()
errCh := make(chan error, 2)
rpcServer := rpcsrv.New(chain, cfg.ApplicationConfiguration.RPC, netSrv, nil, logger, errCh)
rpcServer.Start()
Expand Down
10 changes: 9 additions & 1 deletion pkg/consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,6 @@ func (s *service) Start() {
b, _ := s.Chain.GetBlock(s.Chain.CurrentBlockHash()) // Can't fail, we have some current block!
s.lastTimestamp = b.Timestamp
s.dbft.Start(s.lastTimestamp * nsInMs)
s.Chain.SubscribeForBlocks(s.blockEvents)
go s.eventLoop()
AnnaShaleva marked this conversation as resolved.
Show resolved Hide resolved
}
}
Expand All @@ -296,9 +295,18 @@ func (s *service) Shutdown() {
s.wallet.Close()
}
}
_ = s.log.Sync()
AnnaShaleva marked this conversation as resolved.
Show resolved Hide resolved
}

func (s *service) eventLoop() {
s.Chain.SubscribeForBlocks(s.blockEvents)
AnnaShaleva marked this conversation as resolved.
Show resolved Hide resolved

// Manually sync up with potentially missed fresh blocks that may be added by blockchain
// before the subscription.
b, _ := s.Chain.GetBlock(s.Chain.CurrentBlockHash()) // Can't fail, we have some current block!
if b.Timestamp >= s.lastTimestamp {
s.handleChainBlock(b)
}
events:
for {
select {
Expand Down
1 change: 1 addition & 0 deletions pkg/core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1447,6 +1447,7 @@ func (bc *Blockchain) Close() {
close(bc.stopCh)
<-bc.runToExitCh
bc.addLock.Unlock()
_ = bc.log.Sync()
}

// AddBlock accepts successive block for the Blockchain, verifies it and
Expand Down
48 changes: 38 additions & 10 deletions pkg/network/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,14 @@ type (
lastRequestedBlock atomic.Uint32
// lastRequestedHeader contains a height of the last requested header.
lastRequestedHeader atomic.Uint32

register chan Peer
unregister chan peerDrop
handshake chan Peer
quit chan struct{}
relayFin chan struct{}
register chan Peer
unregister chan peerDrop
handshake chan Peer
quit chan struct{}
relayFin chan struct{}
runFin chan struct{}
broadcastTxFin chan struct{}
runProtoFin chan struct{}

transactions chan *transaction.Transaction

Expand All @@ -138,6 +140,11 @@ type (
stateSync StateSync

log *zap.Logger

// started used to Start and Shutdown server only once.
started atomic.Bool
AnnaShaleva marked this conversation as resolved.
Show resolved Hide resolved

txHandlerLoopWG sync.WaitGroup
}

peerDrop struct {
Expand Down Expand Up @@ -180,6 +187,9 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy
config: chain.GetConfig().ProtocolConfiguration,
quit: make(chan struct{}),
relayFin: make(chan struct{}),
runFin: make(chan struct{}),
broadcastTxFin: make(chan struct{}),
runProtoFin: make(chan struct{}),
register: make(chan Peer),
unregister: make(chan peerDrop),
handshake: make(chan Peer),
Expand Down Expand Up @@ -262,8 +272,12 @@ func (s *Server) ID() uint32 {
}

// Start will start the server and its underlying transport. Calling it twice
// is an error.
// is a no-op. Caller should wait for Start to finish for normal server operation.
func (s *Server) Start() {
if !s.started.CompareAndSwap(false, true) {
AnnaShaleva marked this conversation as resolved.
Show resolved Hide resolved
s.log.Info("node server already started")
return
}
s.log.Info("node started",
zap.Uint32("blockHeight", s.chain.BlockHeight()),
zap.Uint32("headerHeight", s.chain.HeaderHeight()))
Expand All @@ -272,6 +286,7 @@ func (s *Server) Start() {
s.initStaleMemPools()

var txThreads = optimalNumOfThreads()
s.txHandlerLoopWG.Add(txThreads)
for i := 0; i < txThreads; i++ {
go s.txHandlerLoop()
}
Expand All @@ -285,12 +300,15 @@ func (s *Server) Start() {
setServerAndNodeVersions(s.UserAgent, strconv.FormatUint(uint64(s.id), 10))
setNeoGoVersion(config.Version)
setSeverID(strconv.FormatUint(uint64(s.id), 10))
s.run()
go s.run()
AnnaShaleva marked this conversation as resolved.
Show resolved Hide resolved
AnnaShaleva marked this conversation as resolved.
Show resolved Hide resolved
}

// Shutdown disconnects all peers and stops listening. Calling it twice is an error,
// once stopped the same intance of the Server can't be started again by calling Start.
// Shutdown disconnects all peers and stops listening. Calling it twice is a no-op,
// once stopped the same instance of the Server can't be started again by calling Start.
func (s *Server) Shutdown() {
if !s.started.CompareAndSwap(true, false) {
return
}
s.log.Info("shutting down server", zap.Int("peers", s.PeerCount()))
for _, tr := range s.transports {
tr.Close()
Expand All @@ -309,7 +327,13 @@ func (s *Server) Shutdown() {
s.notaryRequestPool.StopSubscriptions()
}
close(s.quit)
<-s.broadcastTxFin
<-s.runProtoFin
AnnaShaleva marked this conversation as resolved.
Show resolved Hide resolved
<-s.relayFin
<-s.runFin
AnnaShaleva marked this conversation as resolved.
Show resolved Hide resolved
s.txHandlerLoopWG.Wait()

_ = s.log.Sync()
}

// AddService allows to add a service to be started/stopped by Server.
Expand Down Expand Up @@ -423,6 +447,7 @@ func (s *Server) run() {
addrTimer = time.NewTimer(peerCheckTime)
peerTimer = time.NewTimer(s.ProtoTickInterval)
)
defer close(s.runFin)
defer addrTimer.Stop()
defer peerTimer.Stop()
go s.runProto()
Expand Down Expand Up @@ -521,6 +546,7 @@ func (s *Server) run() {

// runProto is a goroutine that manages server-wide protocol events.
func (s *Server) runProto() {
defer close(s.runProtoFin)
pingTimer := time.NewTimer(s.PingInterval)
for {
prevHeight := s.chain.BlockHeight()
Expand Down Expand Up @@ -1123,6 +1149,7 @@ func (s *Server) handleTxCmd(tx *transaction.Transaction) error {
}

func (s *Server) txHandlerLoop() {
defer s.txHandlerLoopWG.Done()
txloop:
for {
select {
Expand Down Expand Up @@ -1639,6 +1666,7 @@ func (s *Server) broadcastTxLoop() {
batchSize = 42
)

defer close(s.broadcastTxFin)
txs := make([]util.Uint256, 0, batchSize)
var timer *time.Timer

Expand Down
33 changes: 29 additions & 4 deletions pkg/network/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,18 @@ func TestServerStartAndShutdown(t *testing.T) {
t.Run("no consensus", func(t *testing.T) {
s := newTestServer(t, ServerConfig{})

go s.Start()
s.Start()
AnnaShaleva marked this conversation as resolved.
Show resolved Hide resolved
p := newLocalPeer(t, s)
s.register <- p
require.Eventually(t, func() bool { return 1 == s.PeerCount() }, time.Second, time.Millisecond*10)

assert.True(t, s.transports[0].(*fakeTransp).started.Load())
require.True(t, s.started.Load())
AnnaShaleva marked this conversation as resolved.
Show resolved Hide resolved
assert.Nil(t, s.txCallback)

s.Shutdown()

require.False(t, s.started.Load())
require.True(t, s.transports[0].(*fakeTransp).closed.Load())
err, ok := p.droppedWith.Load().(error)
require.True(t, ok)
Expand All @@ -110,16 +112,39 @@ func TestServerStartAndShutdown(t *testing.T) {
cons := new(fakeConsensus)
s.AddConsensusService(cons, cons.OnPayload, cons.OnTransaction)

go s.Start()
s.Start()
p := newLocalPeer(t, s)
s.register <- p

assert.True(t, s.services["fake"].(*fakeConsensus).started.Load())
require.True(t, s.started.Load())

s.Shutdown()

require.False(t, s.started.Load())
require.True(t, s.services["fake"].(*fakeConsensus).stopped.Load())
})
t.Run("double start", func(t *testing.T) {
s := newTestServer(t, ServerConfig{})
startWithCleanup(t, s)

// Attempt to start the server again.
s.Start()

require.True(t, s.started.Load(), "server should still be marked as started after second Start call")
})
t.Run("double shutdown", func(t *testing.T) {
s := newTestServer(t, ServerConfig{})
s.Start()
require.True(t, s.started.Load(), "server should still be marked as started after second Start call")
s.Shutdown()

require.False(t, s.started.Load(), "server should be marked as not started after second Shutdown call")
// Attempt to shutdown the server again.
s.Shutdown()
// Verify the server state remains unchanged and is still considered shutdown.
require.False(t, s.started.Load(), "server should remain shutdown after second call")
})
}

func TestServerRegisterPeer(t *testing.T) {
Expand Down Expand Up @@ -312,7 +337,7 @@ func TestServerNotSendsVerack(t *testing.T) {
s.id = 1
finished := make(chan struct{})
go func() {
s.run()
go s.run()
close(finished)
}()
t.Cleanup(func() {
Expand Down Expand Up @@ -389,7 +414,7 @@ func startTestServer(t *testing.T, protocolCfg ...func(*config.Blockchain)) *Ser
}

func startWithCleanup(t *testing.T, s *Server) {
go s.Start()
s.Start()
AnnaShaleva marked this conversation as resolved.
Show resolved Hide resolved
AnnaShaleva marked this conversation as resolved.
Show resolved Hide resolved
t.Cleanup(func() {
s.Shutdown()
})
Expand Down
1 change: 1 addition & 0 deletions pkg/services/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,5 @@ func (ms *Service) ShutDown() {
ms.log.Error("can't shut service down", zap.String("endpoint", srv.Addr), zap.Error(err))
}
}
_ = ms.log.Sync()
}
5 changes: 3 additions & 2 deletions pkg/services/notary/notary.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,13 +175,13 @@ func (n *Notary) Start() {
return
}
n.Config.Log.Info("starting notary service")
n.Config.Chain.SubscribeForBlocks(n.blocksCh)
n.mp.SubscribeForTransactions(n.reqCh)
go n.newTxCallbackLoop()
go n.mainLoop()
}

func (n *Notary) mainLoop() {
n.Config.Chain.SubscribeForBlocks(n.blocksCh)
n.mp.SubscribeForTransactions(n.reqCh)
mainloop:
for {
select {
Expand Down Expand Up @@ -228,6 +228,7 @@ func (n *Notary) Shutdown() {
close(n.stopCh)
<-n.done
n.wallet.Close()
_ = n.Config.Log.Sync()
}

// IsAuthorized returns whether Notary service currently is authorized to collect
Expand Down
1 change: 1 addition & 0 deletions pkg/services/oracle/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ func (o *Oracle) Shutdown() {
o.ResponseHandler.Shutdown()
<-o.done
o.wallet.Close()
_ = o.Log.Sync()
}

// Start runs the oracle service in a separate goroutine.
Expand Down
1 change: 1 addition & 0 deletions pkg/services/rpcsrv/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ func (s *Server) Shutdown() {

// Wait for handleSubEvents to finish.
<-s.subEventsToExitCh
_ = s.log.Sync()
}

// SetOracleHandler allows to update oracle handler used by the Server.
Expand Down
4 changes: 2 additions & 2 deletions pkg/services/rpcsrv/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func TestSubscriptions(t *testing.T) {
defer chain.Close()
defer rpcSrv.Shutdown()

go rpcSrv.coreServer.Start()
rpcSrv.coreServer.Start()
defer rpcSrv.coreServer.Shutdown()

for _, feed := range subFeeds {
Expand Down Expand Up @@ -395,7 +395,7 @@ func TestFilteredNotaryRequestSubscriptions(t *testing.T) {
}

chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
go rpcSrv.coreServer.Start()
rpcSrv.coreServer.Start()

defer chain.Close()
defer rpcSrv.Shutdown()
Expand Down
3 changes: 2 additions & 1 deletion pkg/services/stateroot/validators.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ func (s *service) Start() {
return
}
s.log.Info("starting state validation service")
s.chain.SubscribeForBlocks(s.blockCh)
go s.run()
}

func (s *service) run() {
s.chain.SubscribeForBlocks(s.blockCh)
runloop:
for {
select {
Expand Down Expand Up @@ -77,6 +77,7 @@ func (s *service) Shutdown() {
if s.wallet != nil {
s.wallet.Close()
}
_ = s.log.Sync()
}

func (s *service) signAndSend(r *state.MPTRoot) error {
Expand Down
Loading