From c860387747bad500086ac83d1c851696c9cb6f85 Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Tue, 10 Sep 2024 14:35:37 -0500 Subject: [PATCH] netsync: Track peer for requested blocks. This modifies the map that tracks requests for blocks to keep track of which peer the block was requested from. This is currently not used since blocks are only downloaded from a single sync peer, but it helps pave the way to handling downloading blocks from multiple peers in parallel. --- internal/netsync/manager.go | 40 ++++++++++++++++++++++++------------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/internal/netsync/manager.go b/internal/netsync/manager.go index 27188df5a..8eb331991 100644 --- a/internal/netsync/manager.go +++ b/internal/netsync/manager.go @@ -338,7 +338,7 @@ type SyncManager struct { rejectedTxns *apbf.Filter rejectedMixMsgs *apbf.Filter requestedTxns map[chainhash.Hash]struct{} - requestedBlocks map[chainhash.Hash]struct{} + requestedBlocks map[chainhash.Hash]*Peer requestedMixMsgs map[chainhash.Hash]struct{} progressLogger *progresslog.Logger syncPeer *Peer @@ -424,6 +424,16 @@ func (m *SyncManager) maybeUpdateNextNeededBlocks() { } } +// isRequestedBlock returns whether or not the given block hash has already been +// requested from any remote peer. +// +// This function is NOT safe for concurrent access. It must be called from the +// event handler goroutine. +func (m *SyncManager) isRequestedBlock(hash *chainhash.Hash) bool { + _, ok := m.requestedBlocks[*hash] + return ok +} + // fetchNextBlocks creates and sends a request to the provided peer for the next // blocks to be downloaded based on the current headers. func (m *SyncManager) fetchNextBlocks(peer *Peer) { @@ -458,12 +468,12 @@ func (m *SyncManager) fetchNextBlocks(peer *Peer) { // Skip blocks that have already been requested. The needed blocks // might have been updated above thereby potentially repopulating some // blocks that are still in flight. - if _, ok := m.requestedBlocks[*hash]; ok { + if m.isRequestedBlock(hash) { continue } iv := wire.NewInvVect(wire.InvTypeBlock, hash) - m.requestedBlocks[*hash] = struct{}{} + m.requestedBlocks[*hash] = peer peer.requestedBlocks[*hash] = struct{}{} gdmsg.AddInvVect(iv) } @@ -1454,14 +1464,19 @@ func (m *SyncManager) handleHeadersMsg(hmsg *headersMsg) { // Skip the block when it has already been requested or is otherwise // already known. hash := &headerHashes[i] - _, isRequestedBlock := m.requestedBlocks[*hash] - if isRequestedBlock || chain.HaveBlock(hash) { + if m.isRequestedBlock(hash) || chain.HaveBlock(hash) { continue } + // Stop requesting when the request would exceed the max size of the + // map used to track requests. + if len(m.requestedBlocks)+1 > maxRequestedBlocks { + break + } + + m.requestedBlocks[*hash] = peer + peer.requestedBlocks[*hash] = struct{}{} iv := wire.NewInvVect(wire.InvTypeBlock, hash) - limitAdd(m.requestedBlocks, *hash, maxRequestedBlocks) - limitAdd(peer.requestedBlocks, *hash, maxRequestedBlocks) gdmsg.AddInvVect(iv) } if len(gdmsg.InvList) > 0 { @@ -1920,12 +1935,9 @@ func (m *SyncManager) requestFromPeer(peer *Peer, blocks, voteHashes, // Add the blocks to the request. msgResp := wire.NewMsgGetData() for i := range blocks { - // If we've already requested this block, skip it. + // Skip the block when it has already been requested. bh := &blocks[i] - _, alreadyReqP := peer.requestedBlocks[*bh] - _, alreadyReqB := m.requestedBlocks[*bh] - - if alreadyReqP || alreadyReqB { + if m.isRequestedBlock(bh) { continue } @@ -1942,7 +1954,7 @@ func (m *SyncManager) requestFromPeer(peer *Peer, blocks, voteHashes, } peer.requestedBlocks[*bh] = struct{}{} - m.requestedBlocks[*bh] = struct{}{} + m.requestedBlocks[*bh] = peer } addTxsToRequest := func(txs []chainhash.Hash, txType stake.TxType) error { @@ -2169,7 +2181,7 @@ func New(config *Config) *SyncManager { rejectedTxns: apbf.NewFilter(maxRejectedTxns, rejectedTxnsFPRate), rejectedMixMsgs: apbf.NewFilter(maxRejectedMixMsgs, rejectedMixMsgsFPRate), requestedTxns: make(map[chainhash.Hash]struct{}), - requestedBlocks: make(map[chainhash.Hash]struct{}), + requestedBlocks: make(map[chainhash.Hash]*Peer), requestedMixMsgs: make(map[chainhash.Hash]struct{}), peers: make(map[*Peer]struct{}), minKnownWork: minKnownWork,