From 01dfc8502f9bae9b659ff94f64bd9974dc05e7e2 Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Tue, 10 Sep 2024 11:21:02 -0500 Subject: [PATCH 1/2] netsync: Track best known blocks per peer. This reworks the net sync manager block announcement handling to keep track of the best known block announced by each peer as determined by having the most cumulative proof of work. The primary motivation is to provide a relatively efficient mechanism to discover which blocks are available to download from each peer to eventually support downloading multiple blocks in parallel. It also doubles to increase robustness of best height reporting of each peer once the initial headers sync process is complete since the values are only updated when the header has more cumulative proof of work versus simply having a larger height since, although exceedingly rare in practice, it is possible for a chain with fewer blocks to have more cumulative work. --- internal/netsync/manager.go | 215 ++++++++++++++++++++++++++---------- 1 file changed, 158 insertions(+), 57 deletions(-) diff --git a/internal/netsync/manager.go b/internal/netsync/manager.go index d13a1267f..3f0911668 100644 --- a/internal/netsync/manager.go +++ b/internal/netsync/manager.go @@ -223,7 +223,26 @@ type Peer struct { // longer useful or are otherwise being malicious. numConsecutiveOrphanHeaders int32 - lastAnnouncedBlock *chainhash.Hash + // These fields are used to track the best known block announced by the peer + // which in turn provides a means to discover which blocks are available to + // download from the peer. + // + // announcedOrphanBlock is the hash of the most recently announced block + // that did not connect to any headers known to the local chain at the time + // of the announcement. It is tracked because such announcements are + // typically for newly found blocks whose parent headers will eventually + // become known and therefore have a fairly good chance of becoming the + // block with the most cumulative proof of work that the peer has announced. + // + // bestAnnouncedBlock is the hash of the block with the most cumulative + // proof of work that the peer has announced that is also known to the local + // chain. + // + // bestAnnouncedWork is the cumulative proof of work for the associated best + // announced block hash. + announcedOrphanBlock *chainhash.Hash + bestAnnouncedBlock *chainhash.Hash + bestAnnouncedWork *uint256.Uint256 } // NewPeer returns a new instance of a peer that wraps the provided underlying @@ -649,6 +668,21 @@ func (m *SyncManager) handlePeerConnectedMsg(ctx context.Context, peer *Peer) { m.peers[peer] = struct{}{} + // Request headers starting from the parent of the best known header for the + // local chain immediately when the initial headers sync process is complete + // and the peer is a sync candidate. + // + // This primarily serves two purposes: + // + // 1) It immediately discovers any blocks that are not already known + // 2) It provides accurate discovery of the best known block of the peer + // + // Note that the parent is used because the request would otherwise result + // in an empty response when both the local and remote tips are the same. + if peer.syncCandidate && m.hdrSyncState.headersSynced { + m.fetchNextHeaders(peer) + } + // Start syncing by choosing the best candidate if needed. if peer.syncCandidate && m.syncPeer == nil { m.startSync() @@ -891,6 +925,55 @@ func (m *SyncManager) maybeUpdateIsCurrent() { } } +// maybeUpdateBestAnnouncedBlock potentially updates the block with the most +// cumulative proof of work that the given peer has announced which includes its +// associated hash, cumulative work sum, and height. +// +// This function is NOT safe for concurrent access. It must be called from the +// event handler goroutine. +func (m *SyncManager) maybeUpdateBestAnnouncedBlock(p *Peer, hash *chainhash.Hash, header *wire.BlockHeader) { + chain := m.cfg.Chain + workSum, err := chain.ChainWork(hash) + if err != nil { + return + } + + // Update the best block and associated values when the cumulative work for + // given block exceeds that of the current best known block for the peer. + if p.bestAnnouncedWork == nil || workSum.Gt(p.bestAnnouncedWork) { + p.bestAnnouncedBlock = hash + p.bestAnnouncedWork = &workSum + p.UpdateLastBlockHeight(int64(header.Height)) + } +} + +// maybeResolveOrphanBlock potentially resolves the most recently announced +// block by the peer that did not connect to any headers known to the local +// chain at the time of the announcement by checking if it is now known and, +// when it is, potentially making it the block with the most cumulative proof of +// work announced by the peer if needed. +// +// This function is NOT safe for concurrent access. It must be called from the +// event handler goroutine. +func (m *SyncManager) maybeResolveOrphanBlock(p *Peer) { + // Nothing to do if there isn't a pending orphan block announcement that has + // not yet been resolved or the block still isn't known. + chain := m.cfg.Chain + blockHash := p.announcedOrphanBlock + if blockHash == nil || !chain.HaveHeader(blockHash) { + return + } + + // The block has now been resolved, so potentially make it the block with + // the most cumulative proof of work announced by the peer. + header, err := chain.HeaderByHash(blockHash) + if err != nil { + log.Warnf("Unable to retrieve known good header %s: %v", blockHash, err) + return + } + m.maybeUpdateBestAnnouncedBlock(p, blockHash, &header) +} + // processBlock processes the provided block using the internal chain instance. // // When no errors occurred during processing, the first return value indicates @@ -1057,30 +1140,6 @@ func (m *SyncManager) handleBlockMsg(bmsg *blockMsg) { m.cfg.MixPool.ExpireMessagesInBackground(header.Height) } - // Update the latest block height for the peer to avoid stale heights when - // looking for future potential sync node candidacy. - // - // Also, when the chain is considered current and the block was accepted to - // the main chain, update the heights of other peers whose invs may have - // been ignored when actively syncing while the chain was not yet current or - // lost the lock announcement race. - blockHeight := int64(header.Height) - peer.UpdateLastBlockHeight(blockHeight) - if onMainChain && m.IsCurrent() { - for p := range m.peers { - // The height for the sending peer is already updated. - if p == peer { - continue - } - - lastAnnBlock := p.lastAnnouncedBlock - if lastAnnBlock != nil && *lastAnnBlock == *blockHash { - p.UpdateLastBlockHeight(blockHeight) - p.lastAnnouncedBlock = nil - } - } - } - // Request more blocks using the headers when the request queue is getting // short. if peer == m.syncPeer && len(peer.requestedBlocks) < minInFlightBlocks { @@ -1167,21 +1226,15 @@ func (m *SyncManager) handleHeadersMsg(hmsg *headersMsg) { firstHeaderConnects := chain.HaveHeader(&firstHeader.PrevBlock) headersSynced := m.hdrSyncState.headersSynced if !firstHeaderConnects { - // Ignore headers that do not connect to any known headers when the - // initial headers sync is taking place. It is expected that headers - // will be announced that are not yet known. - if !headersSynced { - return - } - // Attempt to detect block announcements which do not connect to any // known headers and request any headers starting from the best header // the local chain knows in order to (hopefully) discover the missing - // headers. + // headers unless the initial headers sync process is still in progress. // // Meanwhile, also keep track of how many times the peer has - // consecutively sent a headers message that does not connect and - // disconnect it once the max allowed threshold has been reached. + // consecutively sent a headers message that looks like an announcement + // that does not connect and disconnect it once the max allowed + // threshold has been reached. if numHeaders < maxExpectedHeaderAnnouncementsPerMsg { peer.numConsecutiveOrphanHeaders++ if peer.numConsecutiveOrphanHeaders >= maxConsecutiveOrphanHeaders { @@ -1189,23 +1242,50 @@ func (m *SyncManager) handleHeadersMsg(hmsg *headersMsg) { "not connect from peer %s -- disconnecting", peer.numConsecutiveOrphanHeaders, peer) peer.Disconnect() + return } - log.Debugf("Requesting missing parents for header %s (height %d) "+ - "received from peer %s", firstHeaderHash, firstHeader.Height, - peer) - bestHeaderHash, _ := chain.BestHeader() - blkLocator := chain.BlockLocatorFromHash(&bestHeaderHash) - locator := chainBlockLocatorToHashes(blkLocator) - peer.PushGetHeadersMsg(locator, &zeroHash) + if headersSynced { + log.Debugf("Requesting missing parents for header %s (height "+ + "%d) received from peer %s", firstHeaderHash, + firstHeader.Height, peer) + bestHeaderHash, _ := chain.BestHeader() + blkLocator := chain.BlockLocatorFromHash(&bestHeaderHash) + locator := chainBlockLocatorToHashes(blkLocator) + peer.PushGetHeadersMsg(locator, &zeroHash) + } + // Track the final announced header as the most recently announced + // block by the peer that does not connect to any headers known to + // the local chain since there is a good chance it will eventually + // become known either from this peer or others. + m.maybeResolveOrphanBlock(peer) + finalHeader := headers[len(headers)-1] + finalHeaderHash := finalHeader.BlockHash() + peer.announcedOrphanBlock = &finalHeaderHash + + // Update the latest block height for the peer to avoid stale + // heights when looking for future potential header sync node + // candidacy when the initial headers sync process is still in + // progess. + if !headersSynced { + peer.UpdateLastBlockHeight(int64(finalHeader.Height)) + } return } - // The initial headers sync process is done and this does not appear to - // be a block announcement, so disconnect the peer. - log.Debugf("Received orphan header from peer %s -- disconnecting", peer) - peer.Disconnect() + // Disconnect the peer when the initial headers sync process is done and + // this does not appear to be a block announcement. + if headersSynced { + log.Debugf("Received orphan header from peer %s -- disconnecting", + peer) + peer.Disconnect() + return + } + + // Ignore headers that do not connect to any known headers when the + // initial headers sync is taking place. It is expected that headers + // will be announced that are not yet known. return } @@ -1273,12 +1353,13 @@ func (m *SyncManager) handleHeadersMsg(hmsg *headersMsg) { // of the provided headers are successfully processed above. peer.numConsecutiveOrphanHeaders = 0 - // Update the last announced block to the final one in the announced headers - // above and update the height for the peer too. + // Potentially resolve a previously unknown announced block and then update + // the block with the most cumulative proof of work the peer has announced + // to the final announced header if needed. finalHeader := headers[len(headers)-1] finalReceivedHash := &headerHashes[len(headerHashes)-1] - peer.lastAnnouncedBlock = finalReceivedHash - peer.UpdateLastBlockHeight(int64(finalHeader.Height)) + m.maybeResolveOrphanBlock(peer) + m.maybeUpdateBestAnnouncedBlock(peer, finalReceivedHash, finalHeader) // Update the sync height if the new best known header height exceeds it. syncHeight := m.SyncHeight() @@ -1335,6 +1416,18 @@ func (m *SyncManager) handleHeadersMsg(hmsg *headersMsg) { log.Info("Syncing chain") m.progressLogger.SetLastLogTime(time.Now()) + // Request headers starting from the parent of the best known header + // for the local chain from any sync candidates that have not yet + // had their best known block discovered now that the initial + // headers sync process is complete. + for peer := range m.peers { + m.maybeResolveOrphanBlock(peer) + if !peer.syncCandidate || peer.bestAnnouncedBlock != nil { + continue + } + m.fetchNextHeaders(peer) + } + // Potentially update whether the chain believes it is current now // that the headers are synced. chain.MaybeUpdateIsCurrent() @@ -1534,15 +1627,23 @@ func (m *SyncManager) handleInvMsg(imsg *invMsg) { } if lastBlock != nil { - // Update the last announced block to the final one in the announced - // inventory above (if any). In the case the header for that block is - // already known, use that information to update the height for the peer - // too. - peer.lastAnnouncedBlock = &lastBlock.Hash - if isCurrent { + // Determine if the final announced block is already known to the local + // chain and then either track it as the most recently announced + // block by the peer that does not connect to any headers known to the + // local chain or potentially make it the block with the most cumulative + // proof of work announced by the peer when it is already known. + if !m.cfg.Chain.HaveHeader(&lastBlock.Hash) { + // Notice a copy of the hash is made here to avoid keeping a + // reference into the inventory vector which would prevent it from + // being GCd. + lastBlockHash := lastBlock.Hash + m.maybeResolveOrphanBlock(peer) + peer.announcedOrphanBlock = &lastBlockHash + } else { header, err := m.cfg.Chain.HeaderByHash(&lastBlock.Hash) if err == nil { - peer.UpdateLastBlockHeight(int64(header.Height)) + m.maybeResolveOrphanBlock(peer) + m.maybeUpdateBestAnnouncedBlock(peer, &lastBlock.Hash, &header) } } } From d2e603be0a8a08c4ab90af32f0512e2c501c541d Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Tue, 10 Sep 2024 14:35:37 -0500 Subject: [PATCH 2/2] netsync: Track peer for requested blocks. This modifies the map that tracks requests for blocks to keep track of which peer the block was requested from. This is currently not used since blocks are only downloaded from a single sync peer, but it helps pave the way to handling downloading blocks from multiple peers in parallel. --- internal/netsync/manager.go | 40 ++++++++++++++++++++++++------------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/internal/netsync/manager.go b/internal/netsync/manager.go index 3f0911668..2b3504101 100644 --- a/internal/netsync/manager.go +++ b/internal/netsync/manager.go @@ -338,7 +338,7 @@ type SyncManager struct { rejectedTxns *apbf.Filter rejectedMixMsgs *apbf.Filter requestedTxns map[chainhash.Hash]struct{} - requestedBlocks map[chainhash.Hash]struct{} + requestedBlocks map[chainhash.Hash]*Peer requestedMixMsgs map[chainhash.Hash]struct{} progressLogger *progresslog.Logger syncPeer *Peer @@ -424,6 +424,16 @@ func (m *SyncManager) maybeUpdateNextNeededBlocks() { } } +// isRequestedBlock returns whether or not the given block hash has already been +// requested from any remote peer. +// +// This function is NOT safe for concurrent access. It must be called from the +// event handler goroutine. +func (m *SyncManager) isRequestedBlock(hash *chainhash.Hash) bool { + _, ok := m.requestedBlocks[*hash] + return ok +} + // fetchNextBlocks creates and sends a request to the provided peer for the next // blocks to be downloaded based on the current headers. func (m *SyncManager) fetchNextBlocks(peer *Peer) { @@ -458,12 +468,12 @@ func (m *SyncManager) fetchNextBlocks(peer *Peer) { // Skip blocks that have already been requested. The needed blocks // might have been updated above thereby potentially repopulating some // blocks that are still in flight. - if _, ok := m.requestedBlocks[*hash]; ok { + if m.isRequestedBlock(hash) { continue } iv := wire.NewInvVect(wire.InvTypeBlock, hash) - m.requestedBlocks[*hash] = struct{}{} + m.requestedBlocks[*hash] = peer peer.requestedBlocks[*hash] = struct{}{} gdmsg.AddInvVect(iv) } @@ -1454,14 +1464,19 @@ func (m *SyncManager) handleHeadersMsg(hmsg *headersMsg) { // Skip the block when it has already been requested or is otherwise // already known. hash := &headerHashes[i] - _, isRequestedBlock := m.requestedBlocks[*hash] - if isRequestedBlock || chain.HaveBlock(hash) { + if m.isRequestedBlock(hash) || chain.HaveBlock(hash) { continue } + // Stop requesting when the request would exceed the max size of the + // map used to track requests. + if len(m.requestedBlocks)+1 > maxRequestedBlocks { + break + } + + m.requestedBlocks[*hash] = peer + peer.requestedBlocks[*hash] = struct{}{} iv := wire.NewInvVect(wire.InvTypeBlock, hash) - limitAdd(m.requestedBlocks, *hash, maxRequestedBlocks) - limitAdd(peer.requestedBlocks, *hash, maxRequestedBlocks) gdmsg.AddInvVect(iv) } if len(gdmsg.InvList) > 0 { @@ -1920,12 +1935,9 @@ func (m *SyncManager) requestFromPeer(peer *Peer, blocks, voteHashes, // Add the blocks to the request. msgResp := wire.NewMsgGetData() for i := range blocks { - // If we've already requested this block, skip it. + // Skip the block when it has already been requested. bh := &blocks[i] - _, alreadyReqP := peer.requestedBlocks[*bh] - _, alreadyReqB := m.requestedBlocks[*bh] - - if alreadyReqP || alreadyReqB { + if m.isRequestedBlock(bh) { continue } @@ -1942,7 +1954,7 @@ func (m *SyncManager) requestFromPeer(peer *Peer, blocks, voteHashes, } peer.requestedBlocks[*bh] = struct{}{} - m.requestedBlocks[*bh] = struct{}{} + m.requestedBlocks[*bh] = peer } addTxsToRequest := func(txs []chainhash.Hash, txType stake.TxType) error { @@ -2169,7 +2181,7 @@ func New(config *Config) *SyncManager { rejectedTxns: apbf.NewFilter(maxRejectedTxns, rejectedTxnsFPRate), rejectedMixMsgs: apbf.NewFilter(maxRejectedMixMsgs, rejectedMixMsgsFPRate), requestedTxns: make(map[chainhash.Hash]struct{}), - requestedBlocks: make(map[chainhash.Hash]struct{}), + requestedBlocks: make(map[chainhash.Hash]*Peer), requestedMixMsgs: make(map[chainhash.Hash]struct{}), peers: make(map[*Peer]struct{}), minKnownWork: minKnownWork,