From f8dc5ec44f5ed855ed78c82fe03479eb2da22c19 Mon Sep 17 00:00:00 2001
From: Ekaterina Pavlova
Date: Wed, 21 Feb 2024 18:07:28 +0300
Subject: [PATCH 1/6] network: change server Start() behavior

Previously the user had to start the server in a separate goroutine. Now the
separate goroutine is created inside Start(), and for normal server operation
the caller should wait for Start to finish. Also fixed the TestTryInitStateSync
test, which was exiting before its logs were written.

Close #3112

Signed-off-by: Ekaterina Pavlova
---
 cli/server/server.go                     | 2 +-
 internal/testcli/executor.go             | 2 +-
 pkg/network/server.go                    | 4 ++--
 pkg/network/server_test.go               | 8 ++++----
 pkg/services/rpcsrv/subscription_test.go | 4 ++--
 5 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/cli/server/server.go b/cli/server/server.go
index f2670d599d..d51b701598 100644
--- a/cli/server/server.go
+++ b/cli/server/server.go
@@ -498,7 +498,7 @@ func startServer(ctx *cli.Context) error {
 
 	rpcServer := rpcsrv.New(chain, cfg.ApplicationConfiguration.RPC, serv, oracleSrv, log, errChan)
 	serv.AddService(&rpcServer)
-	go serv.Start()
+	serv.Start()
 	if !cfg.ApplicationConfiguration.RPC.StartWhenSynchronized {
 		// Run RPC server in a separate routine. This is necessary to avoid a potential
 		// deadlock: Start() can write errors to errChan which is not yet read in the
diff --git a/internal/testcli/executor.go b/internal/testcli/executor.go
index 0dfa5be135..5fec5e06ae 100644
--- a/internal/testcli/executor.go
+++ b/internal/testcli/executor.go
@@ -164,7 +164,7 @@ func NewTestChain(t *testing.T, f func(*config.Config), run bool) (*core.Blockch
 	})
 	require.NoError(t, err)
 	netSrv.AddConsensusService(cons, cons.OnPayload, cons.OnTransaction)
-	go netSrv.Start()
+	netSrv.Start()
 	errCh := make(chan error, 2)
 	rpcServer := rpcsrv.New(chain, cfg.ApplicationConfiguration.RPC, netSrv, nil, logger, errCh)
 	rpcServer.Start()
diff --git a/pkg/network/server.go b/pkg/network/server.go
index e000c18c56..0e919cbdda 100644
--- a/pkg/network/server.go
+++ b/pkg/network/server.go
@@ -262,7 +262,7 @@ func (s *Server) ID() uint32 {
 }
 
 // Start will start the server and its underlying transport. Calling it twice
-// is an error.
+// is an error. Caller should wait for Start to finish for normal server operation.
 func (s *Server) Start() {
 	s.log.Info("node started",
 		zap.Uint32("blockHeight", s.chain.BlockHeight()),
@@ -285,7 +285,7 @@ func (s *Server) Start() {
 	setServerAndNodeVersions(s.UserAgent, strconv.FormatUint(uint64(s.id), 10))
 	setNeoGoVersion(config.Version)
 	setSeverID(strconv.FormatUint(uint64(s.id), 10))
-	s.run()
+	go s.run()
 }
 
 // Shutdown disconnects all peers and stops listening. Calling it twice is an error,
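For illustration only — the following sketch is not part of the patch and uses a simplified stand-in Server type rather than the real pkg/network.Server — this is the calling convention the change above establishes: Start() spawns its own run loop, so callers invoke it directly instead of wrapping it in `go`:

```go
package main

import "fmt"

// Server is a simplified stand-in for the real pkg/network.Server, used only
// to illustrate the new Start() contract.
type Server struct {
	quit chan struct{}
}

// Start logs startup synchronously and launches the long-running loop itself,
// so callers no longer need to write `go serv.Start()`.
func (s *Server) Start() {
	fmt.Println("node started")
	go s.run()
}

// run stands in for the real event loop; it exits once quit is closed.
func (s *Server) run() {
	<-s.quit
}

// Shutdown stops the run loop.
func (s *Server) Shutdown() {
	close(s.quit)
}

func main() {
	serv := &Server{quit: make(chan struct{})}
	serv.Start() // called directly, as in the updated cli/server code and tests
	serv.Shutdown()
}
```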
diff --git a/pkg/network/server_test.go b/pkg/network/server_test.go
index 2ca95885fa..7dbce881dc 100644
--- a/pkg/network/server_test.go
+++ b/pkg/network/server_test.go
@@ -90,7 +90,7 @@ func TestServerStartAndShutdown(t *testing.T) {
 	t.Run("no consensus", func(t *testing.T) {
 		s := newTestServer(t, ServerConfig{})
 
-		go s.Start()
+		s.Start()
 		p := newLocalPeer(t, s)
 		s.register <- p
 		require.Eventually(t, func() bool { return 1 == s.PeerCount() }, time.Second, time.Millisecond*10)
@@ -110,7 +110,7 @@ func TestServerStartAndShutdown(t *testing.T) {
 		cons := new(fakeConsensus)
 		s.AddConsensusService(cons, cons.OnPayload, cons.OnTransaction)
 
-		go s.Start()
+		s.Start()
 		p := newLocalPeer(t, s)
 		s.register <- p
 
@@ -312,7 +312,7 @@ func TestServerNotSendsVerack(t *testing.T) {
 	s.id = 1
 	finished := make(chan struct{})
 	go func() {
-		s.run()
+		go s.run()
 		close(finished)
 	}()
 	t.Cleanup(func() {
@@ -389,7 +389,7 @@ func startTestServer(t *testing.T, protocolCfg ...func(*config.Blockchain)) *Ser
 }
 
 func startWithCleanup(t *testing.T, s *Server) {
-	go s.Start()
+	s.Start()
 	t.Cleanup(func() {
 		s.Shutdown()
 	})
diff --git a/pkg/services/rpcsrv/subscription_test.go b/pkg/services/rpcsrv/subscription_test.go
index 857f3b2d1b..d5a2919032 100644
--- a/pkg/services/rpcsrv/subscription_test.go
+++ b/pkg/services/rpcsrv/subscription_test.go
@@ -99,7 +99,7 @@ func TestSubscriptions(t *testing.T) {
 	defer chain.Close()
 	defer rpcSrv.Shutdown()
 
-	go rpcSrv.coreServer.Start()
+	rpcSrv.coreServer.Start()
 	defer rpcSrv.coreServer.Shutdown()
 
 	for _, feed := range subFeeds {
@@ -395,7 +395,7 @@ func TestFilteredNotaryRequestSubscriptions(t *testing.T) {
 	}
 
 	chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
-	go rpcSrv.coreServer.Start()
+	rpcSrv.coreServer.Start()
 
 	defer chain.Close()
 	defer rpcSrv.Shutdown()

From 4715e523e09c4ca49c6c8c538165ad74914b7693 Mon Sep 17 00:00:00 2001
From: Ekaterina Pavlova
Date: Thu, 22 Feb 2024 16:50:58 +0300
Subject: [PATCH 2/6] services: move blockchain/mempool subscriptions to separate routine

The start of some services is bound to blockchain subscriptions and thus
can't happen before the blockchain notifications dispatcher is running.

Signed-off-by: Ekaterina Pavlova
---
 pkg/consensus/consensus.go           | 9 ++++++++-
 pkg/services/notary/notary.go        | 4 ++--
 pkg/services/stateroot/validators.go | 2 +-
 3 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go
index c40e72b16e..47aab9c5ad 100644
--- a/pkg/consensus/consensus.go
+++ b/pkg/consensus/consensus.go
@@ -281,7 +281,6 @@ func (s *service) Start() {
 		b, _ := s.Chain.GetBlock(s.Chain.CurrentBlockHash()) // Can't fail, we have some current block!
 		s.lastTimestamp = b.Timestamp
 		s.dbft.Start(s.lastTimestamp * nsInMs)
-		s.Chain.SubscribeForBlocks(s.blockEvents)
 		go s.eventLoop()
 	}
 }
@@ -299,6 +298,14 @@ func (s *service) Shutdown() {
 }
 
 func (s *service) eventLoop() {
+	s.Chain.SubscribeForBlocks(s.blockEvents)
+
+	// Manually sync up with potentially missed fresh blocks that may be added by blockchain
+	// before the subscription.
+	b, _ := s.Chain.GetBlock(s.Chain.CurrentBlockHash()) // Can't fail, we have some current block!
+	if b.Timestamp >= s.lastTimestamp {
+		s.handleChainBlock(b)
+	}
 events:
 	for {
 		select {
diff --git a/pkg/services/notary/notary.go b/pkg/services/notary/notary.go
index 7593c5a2a3..f824e2ff1f 100644
--- a/pkg/services/notary/notary.go
+++ b/pkg/services/notary/notary.go
@@ -175,13 +175,13 @@ func (n *Notary) Start() {
 		return
 	}
 	n.Config.Log.Info("starting notary service")
-	n.Config.Chain.SubscribeForBlocks(n.blocksCh)
-	n.mp.SubscribeForTransactions(n.reqCh)
 	go n.newTxCallbackLoop()
 	go n.mainLoop()
 }
 
 func (n *Notary) mainLoop() {
+	n.Config.Chain.SubscribeForBlocks(n.blocksCh)
+	n.mp.SubscribeForTransactions(n.reqCh)
 mainloop:
 	for {
 		select {
diff --git a/pkg/services/stateroot/validators.go b/pkg/services/stateroot/validators.go
index a62e758925..afaad86176 100644
--- a/pkg/services/stateroot/validators.go
+++ b/pkg/services/stateroot/validators.go
@@ -29,11 +29,11 @@ func (s *service) Start() {
 		return
 	}
 	s.log.Info("starting state validation service")
-	s.chain.SubscribeForBlocks(s.blockCh)
 	go s.run()
 }
 
 func (s *service) run() {
+	s.chain.SubscribeForBlocks(s.blockCh)
 runloop:
 	for {
 		select {

From 775c56e87e742a53f5d117601346c691ac2a4b08 Mon Sep 17 00:00:00 2001
From: Ekaterina Pavlova
Date: Sun, 18 Feb 2024 15:27:52 +0300
Subject: [PATCH 3/6] network: ensure server is started and shut down only once

Use the started atomic.Bool field to ensure that the node server start and
shutdown procedures are executed only once. Prevent the following panic caused
by a server double-shutdown in testing code:

```
--- FAIL: TestServerRegisterPeer (0.06s)
panic: closed twice

goroutine 60 [running]:
testing.tRunner.func1.2({0x104c40b20, 0x104d0ec90})
	/opt/homebrew/opt/go/libexec/src/testing/testing.go:1545 +0x1c8
testing.tRunner.func1()
	/opt/homebrew/opt/go/libexec/src/testing/testing.go:1548 +0x360
panic({0x104c40b20?, 0x104d0ec90?})
	/opt/homebrew/opt/go/libexec/src/runtime/panic.go:914 +0x218
github.com/nspcc-dev/neo-go/pkg/network.(*fakeTransp).Close(0x14000159e08?)
	/Users/ekaterinapavlova/Workplace/neo-go/pkg/network/discovery_test.go:83 +0x54
github.com/nspcc-dev/neo-go/pkg/network.(*Server).Shutdown(0x14000343400)
	/Users/ekaterinapavlova/Workplace/neo-go/pkg/network/server.go:299 +0x104
github.com/nspcc-dev/neo-go/pkg/network.startWithCleanup.func1()
	/Users/ekaterinapavlova/Workplace/neo-go/pkg/network/server_test.go:408 +0x20
testing.(*common).Cleanup.func1()
	/opt/homebrew/opt/go/libexec/src/testing/testing.go:1169 +0x110
testing.(*common).runCleanup(0x1400032c340, 0x14000159d80?)
	/opt/homebrew/opt/go/libexec/src/testing/testing.go:1347 +0xd8
testing.tRunner.func2()
	/opt/homebrew/opt/go/libexec/src/testing/testing.go:1589 +0x2c
testing.tRunner(0x1400032c340, 0x104d0c5d0)
	/opt/homebrew/opt/go/libexec/src/testing/testing.go:1601 +0x114
created by testing.(*T).Run in goroutine 1
	/opt/homebrew/opt/go/libexec/src/testing/testing.go:1648 +0x33c
```

Signed-off-by: Ekaterina Pavlova
---
 pkg/network/server.go      | 14 ++++++++++++--
 pkg/network/server_test.go | 25 +++++++++++++++++++++++++
 2 files changed, 37 insertions(+), 2 deletions(-)

diff --git a/pkg/network/server.go b/pkg/network/server.go
index 0e919cbdda..b97cf2dd16 100644
--- a/pkg/network/server.go
+++ b/pkg/network/server.go
@@ -138,6 +138,9 @@ type (
 		stateSync StateSync
 
 		log *zap.Logger
+
+		// started used to Start and Shutdown server only once.
+		started atomic.Bool
 	}
 
 	peerDrop struct {
@@ -262,8 +265,12 @@ func (s *Server) ID() uint32 {
 }
 
 // Start will start the server and its underlying transport. Calling it twice
-// is an error. Caller should wait for Start to finish for normal server operation.
+// is a no-op. Caller should wait for Start to finish for normal server operation.
 func (s *Server) Start() {
+	if !s.started.CompareAndSwap(false, true) {
+		s.log.Info("node server already started")
+		return
+	}
 	s.log.Info("node started",
 		zap.Uint32("blockHeight", s.chain.BlockHeight()),
 		zap.Uint32("headerHeight", s.chain.HeaderHeight()))
@@ -288,9 +295,12 @@ func (s *Server) Start() {
 	go s.run()
 }
 
-// Shutdown disconnects all peers and stops listening. Calling it twice is an error,
+// Shutdown disconnects all peers and stops listening. Calling it twice is a no-op,
 // once stopped the same intance of the Server can't be started again by calling Start.
 func (s *Server) Shutdown() {
+	if !s.started.CompareAndSwap(true, false) {
+		return
+	}
 	s.log.Info("shutting down server", zap.Int("peers", s.PeerCount()))
 	for _, tr := range s.transports {
 		tr.Close()
diff --git a/pkg/network/server_test.go b/pkg/network/server_test.go
index 7dbce881dc..c1b5d96dc4 100644
--- a/pkg/network/server_test.go
+++ b/pkg/network/server_test.go
@@ -96,10 +96,12 @@ func TestServerStartAndShutdown(t *testing.T) {
 		require.Eventually(t, func() bool { return 1 == s.PeerCount() }, time.Second, time.Millisecond*10)
 
 		assert.True(t, s.transports[0].(*fakeTransp).started.Load())
+		require.True(t, s.started.Load())
 		assert.Nil(t, s.txCallback)
 
 		s.Shutdown()
+		require.False(t, s.started.Load())
 		require.True(t, s.transports[0].(*fakeTransp).closed.Load())
 		err, ok := p.droppedWith.Load().(error)
 		require.True(t, ok)
@@ -115,11 +117,34 @@ func TestServerStartAndShutdown(t *testing.T) {
 		s.register <- p
 
 		assert.True(t, s.services["fake"].(*fakeConsensus).started.Load())
+		require.True(t, s.started.Load())
 
 		s.Shutdown()
+		require.False(t, s.started.Load())
 		require.True(t, s.services["fake"].(*fakeConsensus).stopped.Load())
 	})
+	t.Run("double start", func(t *testing.T) {
+		s := newTestServer(t, ServerConfig{})
+		startWithCleanup(t, s)
+
+		// Attempt to start the server again.
+		s.Start()
+
+		require.True(t, s.started.Load(), "server should still be marked as started after second Start call")
+	})
+	t.Run("double shutdown", func(t *testing.T) {
+		s := newTestServer(t, ServerConfig{})
+		s.Start()
+		require.True(t, s.started.Load(), "server should be marked as started after Start call")
+		s.Shutdown()
+
+		require.False(t, s.started.Load(), "server should be marked as not started after Shutdown call")
+		// Attempt to shutdown the server again.
+		s.Shutdown()
+		// Verify the server state remains unchanged and is still considered shutdown.
+		require.False(t, s.started.Load(), "server should remain shutdown after second call")
+	})
 }
 
 func TestServerRegisterPeer(t *testing.T) {

From 9b540770cd333b06b81e4a181b172983566db6ad Mon Sep 17 00:00:00 2001
From: Ekaterina Pavlova
Date: Wed, 21 Feb 2024 21:14:12 +0300
Subject: [PATCH 4/6] network: fix typo

Signed-off-by: Ekaterina Pavlova
---
 pkg/network/server.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/network/server.go b/pkg/network/server.go
index b97cf2dd16..814d9e6f5b 100644
--- a/pkg/network/server.go
+++ b/pkg/network/server.go
@@ -296,7 +296,7 @@ func (s *Server) Start() {
 }
 
 // Shutdown disconnects all peers and stops listening. Calling it twice is a no-op,
-// once stopped the same intance of the Server can't be started again by calling Start.
+// once stopped the same instance of the Server can't be started again by calling Start.
 func (s *Server) Shutdown() {
 	if !s.started.CompareAndSwap(true, false) {
 		return
 	}

From 5bb7c6b715daf6dc032e9900c87da6a213ccb456 Mon Sep 17 00:00:00 2001
From: Ekaterina Pavlova
Date: Sun, 18 Feb 2024 15:29:04 +0300
Subject: [PATCH 5/6] services: update logs flush after services shutdown

Added a separate log sync for every service to support a custom logger per
service. This commit makes the code follow the zap usage rules: `Sync calls
the underlying Core's Sync method, flushing any buffered log entries.
Applications should take care to call Sync before exiting.`

Signed-off-by: Ekaterina Pavlova
---
 pkg/consensus/consensus.go           | 1 +
 pkg/core/blockchain.go               | 1 +
 pkg/network/server.go                | 2 ++
 pkg/services/metrics/metrics.go      | 1 +
 pkg/services/notary/notary.go        | 1 +
 pkg/services/oracle/oracle.go        | 1 +
 pkg/services/rpcsrv/server.go        | 1 +
 pkg/services/stateroot/validators.go | 1 +
 8 files changed, 9 insertions(+)

diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go
index 47aab9c5ad..2eb9d6e156 100644
--- a/pkg/consensus/consensus.go
+++ b/pkg/consensus/consensus.go
@@ -295,6 +295,7 @@ func (s *service) Shutdown() {
 			s.wallet.Close()
 		}
 	}
+	_ = s.log.Sync()
 }
 
 func (s *service) eventLoop() {
diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go
index 73f95b95ca..aa1a0f3247 100644
--- a/pkg/core/blockchain.go
+++ b/pkg/core/blockchain.go
@@ -1447,6 +1447,7 @@ func (bc *Blockchain) Close() {
 	close(bc.stopCh)
 	<-bc.runToExitCh
 	bc.addLock.Unlock()
+	_ = bc.log.Sync()
 }
 
 // AddBlock accepts successive block for the Blockchain, verifies it and
diff --git a/pkg/network/server.go b/pkg/network/server.go
index 814d9e6f5b..6132e69ff9 100644
--- a/pkg/network/server.go
+++ b/pkg/network/server.go
@@ -320,6 +320,8 @@ func (s *Server) Shutdown() {
 	}
 	close(s.quit)
 	<-s.relayFin
+
+	_ = s.log.Sync()
 }
 
 // AddService allows to add a service to be started/stopped by Server.
diff --git a/pkg/services/metrics/metrics.go b/pkg/services/metrics/metrics.go
index e0f12a1ad8..039182bea5 100644
--- a/pkg/services/metrics/metrics.go
+++ b/pkg/services/metrics/metrics.go
@@ -75,4 +75,5 @@ func (ms *Service) ShutDown() {
 			ms.log.Error("can't shut service down", zap.String("endpoint", srv.Addr), zap.Error(err))
 		}
 	}
+	_ = ms.log.Sync()
 }
diff --git a/pkg/services/notary/notary.go b/pkg/services/notary/notary.go
index f824e2ff1f..205891c274 100644
--- a/pkg/services/notary/notary.go
+++ b/pkg/services/notary/notary.go
@@ -228,6 +228,7 @@ func (n *Notary) Shutdown() {
 	close(n.stopCh)
 	<-n.done
 	n.wallet.Close()
+	_ = n.Config.Log.Sync()
 }
 
 // IsAuthorized returns whether Notary service currently is authorized to collect
diff --git a/pkg/services/oracle/oracle.go b/pkg/services/oracle/oracle.go
index 8bce1bf952..b9612f4a02 100644
--- a/pkg/services/oracle/oracle.go
+++ b/pkg/services/oracle/oracle.go
@@ -192,6 +192,7 @@ func (o *Oracle) Shutdown() {
 	o.ResponseHandler.Shutdown()
 	<-o.done
 	o.wallet.Close()
+	_ = o.Log.Sync()
 }
 
 // Start runs the oracle service in a separate goroutine.
diff --git a/pkg/services/rpcsrv/server.go b/pkg/services/rpcsrv/server.go
index 5c4e2e9f63..87db5b4409 100644
--- a/pkg/services/rpcsrv/server.go
+++ b/pkg/services/rpcsrv/server.go
@@ -482,6 +482,7 @@ func (s *Server) Shutdown() {
 
 	// Wait for handleSubEvents to finish.
 	<-s.subEventsToExitCh
+	_ = s.log.Sync()
 }
 
 // SetOracleHandler allows to update oracle handler used by the Server.
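The same pattern is applied to every Shutdown/Close method in this patch; a minimal, self-contained sketch of it is shown below (the service type here is hypothetical, not neo-go code):

```go
package main

import "go.uber.org/zap"

// service is a hypothetical component that owns its own zap logger.
type service struct {
	log  *zap.Logger
	quit chan struct{}
}

// Shutdown stops the service and then flushes any buffered log entries,
// following the zap guidance to call Sync before exiting.
func (s *service) Shutdown() {
	s.log.Info("shutting down service")
	close(s.quit)
	_ = s.log.Sync() // the error is deliberately ignored, as in the patch
}

func main() {
	logger, _ := zap.NewDevelopment()
	s := &service{log: logger, quit: make(chan struct{})}
	s.Shutdown()
}
```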
diff --git a/pkg/services/stateroot/validators.go b/pkg/services/stateroot/validators.go
index afaad86176..857ecba9e0 100644
--- a/pkg/services/stateroot/validators.go
+++ b/pkg/services/stateroot/validators.go
@@ -77,6 +77,7 @@ func (s *service) Shutdown() {
 	if s.wallet != nil {
 		s.wallet.Close()
 	}
+	_ = s.log.Sync()
 }
 
 func (s *service) signAndSend(r *state.MPTRoot) error {

From 5cf0c757448b3e9556e41dad8f7f4082b9e3c5a3 Mon Sep 17 00:00:00 2001
From: Ekaterina Pavlova
Date: Wed, 21 Feb 2024 12:15:13 +0300
Subject: [PATCH 6/6] network: fix server shutdown by waiting for goroutines to finish

s.Shutdown() did not wait for all of the node server's goroutines to finish,
because the server exited without waiting for its dependent goroutines. This
caused logs to be written after a test had already ended. As a consequence of
this bug fix, the corresponding tests are fixed as well.

Close #2973
Close #2974

Signed-off-by: Ekaterina Pavlova
---
 pkg/network/server.go | 28 ++++++++++++++++++++++------
 1 file changed, 22 insertions(+), 6 deletions(-)

diff --git a/pkg/network/server.go b/pkg/network/server.go
index 6132e69ff9..5bc4b3c8f3 100644
--- a/pkg/network/server.go
+++ b/pkg/network/server.go
@@ -124,12 +124,14 @@ type (
 		lastRequestedBlock atomic.Uint32
 		// lastRequestedHeader contains a height of the last requested header.
 		lastRequestedHeader atomic.Uint32
-		register            chan Peer
-		unregister          chan peerDrop
-		handshake           chan Peer
-		quit                chan struct{}
-		relayFin            chan struct{}
+		register            chan Peer
+		unregister          chan peerDrop
+		handshake           chan Peer
+		quit                chan struct{}
+		relayFin            chan struct{}
+		runFin              chan struct{}
+		broadcastTxFin      chan struct{}
+		runProtoFin         chan struct{}
 
 		transactions chan *transaction.Transaction
 
@@ -141,6 +143,8 @@ type (
 		// started used to Start and Shutdown server only once.
 		started atomic.Bool
+
+		txHandlerLoopWG sync.WaitGroup
 	}
 
 	peerDrop struct {
@@ -183,6 +187,9 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy
 		config:         chain.GetConfig().ProtocolConfiguration,
 		quit:           make(chan struct{}),
 		relayFin:       make(chan struct{}),
+		runFin:         make(chan struct{}),
+		broadcastTxFin: make(chan struct{}),
+		runProtoFin:    make(chan struct{}),
 		register:       make(chan Peer),
 		unregister:     make(chan peerDrop),
 		handshake:      make(chan Peer),
@@ -279,6 +286,7 @@ func (s *Server) Start() {
 	s.initStaleMemPools()
 
 	var txThreads = optimalNumOfThreads()
+	s.txHandlerLoopWG.Add(txThreads)
 	for i := 0; i < txThreads; i++ {
 		go s.txHandlerLoop()
 	}
@@ -319,7 +327,11 @@ func (s *Server) Shutdown() {
 		s.notaryRequestPool.StopSubscriptions()
 	}
 	close(s.quit)
+	<-s.broadcastTxFin
+	<-s.runProtoFin
 	<-s.relayFin
+	<-s.runFin
+	s.txHandlerLoopWG.Wait()
 
 	_ = s.log.Sync()
 }
@@ -435,6 +447,7 @@ func (s *Server) run() {
 		addrTimer = time.NewTimer(peerCheckTime)
 		peerTimer = time.NewTimer(s.ProtoTickInterval)
 	)
+	defer close(s.runFin)
 	defer addrTimer.Stop()
 	defer peerTimer.Stop()
 	go s.runProto()
@@ -533,6 +546,7 @@ func (s *Server) run() {
 
 // runProto is a goroutine that manages server-wide protocol events.
 func (s *Server) runProto() {
+	defer close(s.runProtoFin)
 	pingTimer := time.NewTimer(s.PingInterval)
 	for {
 		prevHeight := s.chain.BlockHeight()
@@ -1135,6 +1149,7 @@ func (s *Server) handleTxCmd(tx *transaction.Transaction) error {
 }
 
 func (s *Server) txHandlerLoop() {
+	defer s.txHandlerLoopWG.Done()
 txloop:
 	for {
 		select {
@@ -1651,6 +1666,7 @@ func (s *Server) broadcastTxLoop() {
 		batchSize = 42
 	)
 
+	defer close(s.broadcastTxFin)
 	txs := make([]util.Uint256, 0, batchSize)
 	var timer *time.Timer
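As a rough illustration of the shutdown-waiting pattern this patch applies (the node type and constructor below are hypothetical stand-ins; only the runFin and txHandlerLoopWG names mirror the patch), a Shutdown that closes quit and then waits on per-goroutine fin channels and a WaitGroup guarantees that no goroutine outlives it:

```go
package main

import "sync"

// node is a minimal stand-in showing how Shutdown can wait for every
// goroutine it started: one "fin" channel per long-lived loop plus a
// WaitGroup for the pool of transaction handlers.
type node struct {
	quit            chan struct{}
	runFin          chan struct{}
	txHandlerLoopWG sync.WaitGroup
}

func newNode(txThreads int) *node {
	n := &node{
		quit:   make(chan struct{}),
		runFin: make(chan struct{}),
	}
	go n.run()
	n.txHandlerLoopWG.Add(txThreads)
	for i := 0; i < txThreads; i++ {
		go n.txHandlerLoop()
	}
	return n
}

// run is the main loop; closing runFin signals its completion to Shutdown.
func (n *node) run() {
	defer close(n.runFin)
	<-n.quit
}

// txHandlerLoop is one worker from the handler pool.
func (n *node) txHandlerLoop() {
	defer n.txHandlerLoopWG.Done()
	<-n.quit
}

// Shutdown asks all loops to stop and only returns once they have finished,
// so nothing can log (or touch shared state) after Shutdown returns.
func (n *node) Shutdown() {
	close(n.quit)
	<-n.runFin
	n.txHandlerLoopWG.Wait()
}

func main() {
	n := newNode(4)
	n.Shutdown()
}
```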