From 3aaaf3f23254c2f04166fe50e0643e769b73af0a Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Sat, 28 Oct 2023 13:36:31 -0500 Subject: [PATCH] server: Support concurrent getdata requests. Currently, each peer only supports one getdata request at a time and the inbound handler is blocked until all of the requested data is served. This means messages that would otherwise be fast to process, such as pings, can potentially be delayed for long periods of time which is clearly not ideal. Moreover, the existing behavior means that it is theoretically possible for a pair of peers to experience a situation where neither one can make progress because they're both waiting on each other to serve each other data which then would ultimately result in hitting idle timeouts and disconnection. In practice, the aforementioned live lock situation basically never happens currently due to a variety of other factors such as duplicate request filtering and not notifying peers about inventory they are known to have. However, the combination of factors that prevent the situation will not necessarily apply to newer data types such as those for mesh-based mixing. The aforementioned blocking behavior is a holdover from very early network code that relied on it once upon a time, but is no longer relevant. Thus, to improve overall throughput and address the root of the aforementioned concerns, this changes the semantics to serve getdata requests asynchronously so other inbound messages can be processed concurrently. This means the remote peer is then free to send other messages while waiting for the requested data to be served. Next, limits to the amount of concurrent getdata requests and the total maximum number of pending individual data item requests are now imposed in order to prevent peers from be able to consume copious amounts of memory and other malicious behavior this change would otherwise allow. The limits are enforced such that peers may now mix and match simultaneous getdata requests for varying amounts of data items so long as they do not exceed the maximum number of allowed simultaneous pending getdata messages or the maximum number of total overall pending individual data item requests. This approach of applying the limits on both dimensions offers more flexibility and the potential for future efficiency gains while still keeping memory usage bounded to reasonable limits and protecting against other malicious behavior. --- server.go | 414 +++++++++++++++++++++++++++++++++--------------------- 1 file changed, 253 insertions(+), 161 deletions(-) diff --git a/server.go b/server.go index dfb346a597..8d5442056c 100644 --- a/server.go +++ b/server.go @@ -94,6 +94,29 @@ const ( // submissions cached. maxCachedNaSubmissions = 20 + // These constants control the maximum number of simultaneous pending + // getdata messages and the individual data item requests they make without + // being disconnected. + // + // Since each getdata message is comprised of several individual data item + // requests, the limiting is applied on both dimensions to offer more + // flexibility while still keeping memory usage bounded to reasonable + // limits. + // + // maxConcurrentGetDataReqs is the maximum number of simultaneous pending + // getdata message requests. + // + // maxPendingGetDataItemReqs is the maximum number of overall total + // simultaneous pending individual data item requests. + // + // In other words, when combined, a peer may mix and match simultaneous + // getdata requests for varying amounts of data items so long as it does not + // exceed the maximum specified number of simultaneous pending getdata + // messages or the maximum number of total overall pending individual data + // item requests. + maxConcurrentGetDataReqs = 1000 + maxPendingGetDataItemReqs = 2 * wire.MaxInvPerMsg + // maxReorgDepthNotify specifies the maximum reorganization depth for which // winning ticket notifications will be sent over RPC. The reorg depth is // the number of blocks that would be reorganized out of the current best @@ -527,7 +550,7 @@ type serverPeer struct { connReq *connmgr.ConnReq server *server persistent bool - continueHash *chainhash.Hash + continueHash atomic.Pointer[chainhash.Hash] relayMtx sync.Mutex disableRelayTx bool isWhitelisted bool @@ -558,6 +581,17 @@ type serverPeer struct { // announcedBlock tracks the most recent block announced to this peer and is // used to filter duplicates. announcedBlock *chainhash.Hash + + // The following fields are used to serve getdata requests asynchronously as + // opposed to directly in the peer input handler. + // + // getDataQueue is a buffered channel for queueing up concurrent getdata + // requests. + // + // numPendingGetDataItemReqs tracks the total number of pending individual + // data item requests that still need to be served. + getDataQueue chan []*wire.InvVect + numPendingGetDataItemReqs atomic.Uint32 } // newServerPeer returns a new serverPeer instance. The peer needs to be set by @@ -570,9 +604,188 @@ func newServerPeer(s *server, isPersistent bool) *serverPeer { quit: make(chan struct{}), txProcessed: make(chan struct{}, 1), blockProcessed: make(chan struct{}, 1), + getDataQueue: make(chan []*wire.InvVect, maxConcurrentGetDataReqs), } } +// handleServeGetData is the primary logic for servicing queued getdata +// requests. +// +// It makes use of the given send done channel and semaphore to provide +// a little pipelining of database loads while keeping the memory usage bounded +// to reasonable limits. +// +// It is invoked from the serveGetData goroutine. +func (sp *serverPeer) handleServeGetData(invVects []*wire.InvVect, + sendDoneChan chan struct{}, semaphore chan struct{}) { + + var notFoundMsg *wire.MsgNotFound + for _, iv := range invVects { + var sendInv bool + var dataMsg wire.Message + switch iv.Type { + case wire.InvTypeTx: + // Attempt to fetch the requested transaction from the pool. A call + // could be made to check for existence first, but simply trying to + // fetch a missing transaction results in the same behavior. Do not + // allow peers to request transactions already in a block but are + // unconfirmed, as they may be expensive. Restrict that to the + // authenticated RPC only. + txHash := &iv.Hash + tx, err := sp.server.txMemPool.FetchTransaction(txHash) + if err != nil { + peerLog.Tracef("Unable to fetch tx %v from transaction pool: %v", + txHash, err) + break + } + dataMsg = tx.MsgTx() + + case wire.InvTypeBlock: + blockHash := &iv.Hash + block, err := sp.server.chain.BlockByHash(blockHash) + if err != nil { + peerLog.Tracef("Unable to fetch requested block hash %v: %v", + blockHash, err) + break + } + dataMsg = block.MsgBlock() + + // When the peer requests the final block that was advertised in + // response to a getblocks message which requested more blocks than + // would fit into a single message, it requires a new inventory + // message to trigger it to issue another getblocks message for the + // next batch of inventory. + // + // However, that inventory message should not be sent until after + // the block itself is sent, so keep a flag for later use. + // + // Note that this is to support the legacy syncing model that is no + // longer used in dcrd which is now based on a much more robust + // headers-based syncing model. Nevertheless, this behavior is + // still a required part of the getblocks protocol semantics. It + // can be removed if a future protocol upgrade also removes the + // getblocks message. + continueHash := sp.continueHash.Load() + sendInv = continueHash != nil && *continueHash == *blockHash + + default: + peerLog.Warnf("Unknown type '%d' in inventory request from %s", + iv.Type, sp) + continue + } + if dataMsg == nil { + if notFoundMsg == nil { + notFoundMsg = wire.NewMsgNotFound() + } + notFoundMsg.AddInvVect(iv) + } + + // Limit the number of items that can be queued to prevent wasting a + // bunch of memory by queuing far more data than can be sent in a + // reasonable time. The waiting occurs after the database fetch for the + // next one to provide a little pipelining. + // + // This also monitors the channel that is notified when queued messages + // are sent in order to release the semaphore without needing a separate + // monitoring goroutine. + for semAcquired := false; !semAcquired; { + select { + case <-sp.quit: + return + + case semaphore <- struct{}{}: + semAcquired = true + + case <-sendDoneChan: + // Release semaphore. + <-semaphore + } + } + + // Decrement the pending data item requests accordingly and queue the + // data to be sent to the peer. + sp.numPendingGetDataItemReqs.Add(^uint32(0)) + sp.QueueMessage(dataMsg, sendDoneChan) + + // Send a new inventory message to trigger the peer to issue another + // getblocks message for the next batch of inventory if needed. + if sendInv { + best := sp.server.chain.BestSnapshot() + invMsg := wire.NewMsgInvSizeHint(1) + iv := wire.NewInvVect(wire.InvTypeBlock, &best.Hash) + invMsg.AddInvVect(iv) + sp.QueueMessage(invMsg, nil) + sp.continueHash.Store(nil) + } + } + if notFoundMsg != nil { + sp.QueueMessage(notFoundMsg, nil) + } +} + +// serveGetData provides an asynchronous queue that services all data requested +// via getdata requests such that the peer may mix and match simultaneous +// getdata requests for varying amounts of data items so long as it does not +// exceed the maximum number of simultaneous pending getdata messages or the +// maximum number of total overall pending data item requests. +// +// It must be run in a goroutine. +func (sp *serverPeer) serveGetData() { + // Allow a max number of items to be loaded from the database/mempool and + // queued for send. + const maxPendingSend = 3 + sendDoneChan := make(chan struct{}, maxPendingSend+1) + semaphore := make(chan struct{}, maxPendingSend) + + for { + select { + case <-sp.quit: + return + + case invVects := <-sp.getDataQueue: + sp.handleServeGetData(invVects, sendDoneChan, semaphore) + + // Release the semaphore as queued messages are sent. + case <-sendDoneChan: + <-semaphore + } + } +} + +// Run starts additional async processing for the peer and blocks until the peer +// disconnects at which point it notifies the server and net sync manager that +// the peer has disconnected and performs other associated cleanup such as +// evicting any remaining orphans sent by the peer and shutting down all +// goroutines. +func (sp *serverPeer) Run() { + var wg sync.WaitGroup + wg.Add(1) + go func() { + sp.serveGetData() + wg.Done() + }() + + // Wait for the peer to disconnect and notify the net sync manager and + // server accordingly. + sp.WaitForDisconnect() + srvr := sp.server + srvr.DonePeer(sp) + srvr.syncManager.PeerDisconnected(sp.syncMgrPeer) + + if sp.VersionKnown() { + // Evict any remaining orphans that were sent by the peer. + numEvicted := srvr.txMemPool.RemoveOrphansByTag(mempool.Tag(sp.ID())) + if numEvicted > 0 { + srvrLog.Debugf("Evicted %d %s from peer %v (id %d)", numEvicted, + pickNoun(numEvicted, "orphan", "orphans"), sp, sp.ID()) + } + } + + // Shutdown remaining peer goroutines. + close(sp.quit) + wg.Wait() +} + // newestBlock returns the current best block hash and height using the format // required by the configuration for the peer package. func (sp *serverPeer) newestBlock() (*chainhash.Hash, int64, error) { @@ -1132,8 +1345,8 @@ func (sp *serverPeer) OnHeaders(_ *peer.Peer, msg *wire.MsgHeaders) { sp.server.syncManager.QueueHeaders(msg, sp.syncMgrPeer) } -// handleGetData is invoked when a peer receives a getdata wire message and is -// used to deliver block and transaction information. +// OnGetData is invoked when a peer receives a getdata wire message and is used +// to deliver block and transaction information. func (sp *serverPeer) OnGetData(_ *peer.Peer, msg *wire.MsgGetData) { // Ban peers sending empty getdata requests. if len(msg.InvList) == 0 { @@ -1141,74 +1354,47 @@ func (sp *serverPeer) OnGetData(_ *peer.Peer, msg *wire.MsgGetData) { return } - numAdded := 0 - notFound := wire.NewMsgNotFound() - - length := len(msg.InvList) // A decaying ban score increase is applied to prevent exhausting resources // with unusually large inventory queries. + // // Requesting more than the maximum inventory vector length within a short - // period of time yields a score above the default ban threshold. Sustained + // period of time yields a score above the default ban threshold. Sustained // bursts of small requests are not penalized as that would potentially ban - // peers performing IBD. + // peers performing the inintial chain sync. + // // This incremental score decays each minute to half of its value. - if sp.addBanScore(0, uint32(length)*99/wire.MaxInvPerMsg, "getdata") { + numNewReqs := uint32(len(msg.InvList)) + if sp.addBanScore(0, numNewReqs*99/wire.MaxInvPerMsg, "getdata") { return } - // We wait on this wait channel periodically to prevent queuing - // far more data than we can send in a reasonable time, wasting memory. - // The waiting occurs after the database fetch for the next one to - // provide a little pipelining. - var waitChan chan struct{} - doneChan := make(chan struct{}, 1) - - for i, iv := range msg.InvList { - var c chan struct{} - // If this will be the last message we send. - if i == length-1 && len(notFound.InvList) == 0 { - c = doneChan - } else if (i+1)%3 == 0 { - // Buffered so as to not make the send goroutine block. - c = make(chan struct{}, 1) - } - var err error - switch iv.Type { - case wire.InvTypeTx: - err = sp.server.pushTxMsg(sp, &iv.Hash, c, waitChan) - case wire.InvTypeBlock: - err = sp.server.pushBlockMsg(sp, &iv.Hash, c, waitChan) - default: - peerLog.Warnf("Unknown type '%d' in inventory request from %s", - iv.Type, sp) - continue - } - if err != nil { - notFound.AddInvVect(iv) - - // When there is a failure fetching the final entry - // and the done channel was sent in due to there - // being no outstanding not found inventory, consume - // it here because there is now not found inventory - // that will use the channel momentarily. - if i == len(msg.InvList)-1 && c != nil { - <-c - } - } - numAdded++ - waitChan = c + // Prevent too many outstanding requests while still allowing the + // flexibility to send multiple simultaneous getdata requests that are + // served asynchronously. + numPendingGetDataReqs := len(sp.getDataQueue) + if numPendingGetDataReqs+1 > maxConcurrentGetDataReqs { + peerLog.Debugf("%s exceeded max allowed concurrent pending getdata "+ + "requests (max %d) -- disconnecting", sp, maxConcurrentGetDataReqs) + sp.Disconnect() + return } - if len(notFound.InvList) != 0 { - sp.QueueMessage(notFound, doneChan) + numPendingDataItemReqs := sp.numPendingGetDataItemReqs.Load() + if numPendingDataItemReqs+numNewReqs > maxPendingGetDataItemReqs { + peerLog.Debugf("%s exceeded max allowed pending data item requests "+ + "(new %d, pending %d, max %d) -- disconnecting", sp, numNewReqs, + numPendingDataItemReqs, maxPendingGetDataItemReqs) + sp.Disconnect() + return } - // Wait for messages to be sent. We can send quite a lot of data at this - // point and this will keep the peer busy for a decent amount of time. - // We don't process anything else by them in this time so that we - // have an idea of when we should hear back from them - else the idle - // timeout could fire when we were only half done sending the blocks. - if numAdded > 0 { - <-doneChan + // Queue the data requests to be served asynchronously. Note that this will + // not block due to the use of a buffered channel and the checks above that + // disconnect the peer when the new request would otherwise exceed the + // capacity. + sp.numPendingGetDataItemReqs.Add(numNewReqs) + select { + case <-sp.quit: + case sp.getDataQueue <- msg.InvList: } } @@ -1246,7 +1432,7 @@ func (sp *serverPeer) OnGetBlocks(_ *peer.Peer, msg *wire.MsgGetBlocks) { // would prevent the entire slice from being eligible // for GC as soon as it's sent. continueHash := invMsg.InvList[invListLen-1].Hash - sp.continueHash = &continueHash + sp.continueHash.Store(&continueHash) } sp.QueueMessage(invMsg, nil) } @@ -1597,81 +1783,6 @@ func (s *server) TransactionConfirmed(tx *dcrutil.Tx) { } } -// pushTxMsg sends a tx message for the provided transaction hash to the -// connected peer. An error is returned if the transaction hash is not known. -func (s *server) pushTxMsg(sp *serverPeer, hash *chainhash.Hash, doneChan chan<- struct{}, waitChan <-chan struct{}) error { - // Attempt to fetch the requested transaction from the pool. A - // call could be made to check for existence first, but simply trying - // to fetch a missing transaction results in the same behavior. - // Do not allow peers to request transactions already in a block - // but are unconfirmed, as they may be expensive. Restrict that - // to the authenticated RPC only. - tx, err := s.txMemPool.FetchTransaction(hash) - if err != nil { - peerLog.Tracef("Unable to fetch tx %v from transaction "+ - "pool: %v", hash, err) - - if doneChan != nil { - doneChan <- struct{}{} - } - return err - } - - // Once we have fetched data wait for any previous operation to finish. - if waitChan != nil { - <-waitChan - } - - sp.QueueMessage(tx.MsgTx(), doneChan) - - return nil -} - -// pushBlockMsg sends a block message for the provided block hash to the -// connected peer. An error is returned if the block hash is not known. -func (s *server) pushBlockMsg(sp *serverPeer, hash *chainhash.Hash, doneChan chan<- struct{}, waitChan <-chan struct{}) error { - block, err := sp.server.chain.BlockByHash(hash) - if err != nil { - peerLog.Tracef("Unable to fetch requested block hash %v: %v", - hash, err) - - if doneChan != nil { - doneChan <- struct{}{} - } - return err - } - - // Once we have fetched data wait for any previous operation to finish. - if waitChan != nil { - <-waitChan - } - - // We only send the channel for this message if we aren't sending - // an inv straight after. - var dc chan<- struct{} - continueHash := sp.continueHash - sendInv := continueHash != nil && continueHash.IsEqual(hash) - if !sendInv { - dc = doneChan - } - sp.QueueMessage(block.MsgBlock(), dc) - - // When the peer requests the final block that was advertised in - // response to a getblocks message which requested more blocks than - // would fit into a single message, send it a new inventory message - // to trigger it to issue another getblocks message for the next - // batch of inventory. - if sendInv { - best := sp.server.chain.BestSnapshot() - invMsg := wire.NewMsgInvSizeHint(1) - iv := wire.NewInvVect(wire.InvTypeBlock, &best.Hash) - invMsg.AddInvVect(iv) - sp.QueueMessage(invMsg, doneChan) - sp.continueHash = nil - } - return nil -} - // handleAddPeerMsg deals with adding new peers. It is invoked from the // peerHandler goroutine. func (s *server) handleAddPeerMsg(state *peerState, sp *serverPeer) bool { @@ -2225,21 +2336,22 @@ func newPeerConfig(sp *serverPeer) *peer.Config { // inboundPeerConnected is invoked by the connection manager when a new inbound // connection is established. It initializes a new inbound server peer -// instance, associates it with the connection, and starts a goroutine to wait -// for disconnection. +// instance, associates it with the connection, and starts all additional server +// peer processing goroutines. func (s *server) inboundPeerConnected(conn net.Conn) { sp := newServerPeer(s, false) sp.isWhitelisted = isWhitelisted(conn.RemoteAddr()) sp.Peer = peer.NewInboundPeer(newPeerConfig(sp)) sp.syncMgrPeer = netsync.NewPeer(sp.Peer) sp.AssociateConnection(conn) - go s.peerDoneHandler(sp) + go sp.Run() } // outboundPeerConnected is invoked by the connection manager when a new // outbound connection is established. It initializes a new outbound server // peer instance, associates it with the relevant state such as the connection -// request instance and the connection itself. +// request instance and the connection itself, and start all additional server +// peer processing goroutines. func (s *server) outboundPeerConnected(c *connmgr.ConnReq, conn net.Conn) { sp := newServerPeer(s, c.Permanent) p, err := peer.NewOutboundPeer(newPeerConfig(sp), c.Addr.String()) @@ -2253,27 +2365,7 @@ func (s *server) outboundPeerConnected(c *connmgr.ConnReq, conn net.Conn) { sp.connReq = c sp.isWhitelisted = isWhitelisted(conn.RemoteAddr()) sp.AssociateConnection(conn) - go s.peerDoneHandler(sp) -} - -// peerDoneHandler handles peer disconnects by notifying the server that it's -// done along with other performing other desirable cleanup. -func (s *server) peerDoneHandler(sp *serverPeer) { - sp.WaitForDisconnect() - s.DonePeer(sp) - - // Notify the net sync manager the peer is gone. - s.syncManager.PeerDisconnected(sp.syncMgrPeer) - - if sp.VersionKnown() { - // Evict any remaining orphans that were sent by the peer. - numEvicted := s.txMemPool.RemoveOrphansByTag(mempool.Tag(sp.ID())) - if numEvicted > 0 { - srvrLog.Debugf("Evicted %d %s from peer %v (id %d)", numEvicted, - pickNoun(numEvicted, "orphan", "orphans"), sp, sp.ID()) - } - } - close(sp.quit) + go sp.Run() } // peerHandler is used to handle peer operations such as adding and removing