Skip to content

Commit

Permalink
Merge branch 'develop' into omerfirmak/reorging-worker
Browse files Browse the repository at this point in the history
  • Loading branch information
colinlyguo authored Aug 27, 2024
2 parents 23f5e49 + 77a5659 commit 003d711
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 1 deletion.
3 changes: 3 additions & 0 deletions eth/fetcher/tx_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,9 @@ func (f *TxFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{},
}
return true // continue in the for-each
})

log.Debug("Scheduling transaction retrieval", "peer", peer, "len(f.announces[peer])", len(f.announces[peer]), "len(hashes)", len(hashes))

// If any hashes were allocated, request them from the peer
if len(hashes) > 0 {
f.requests[peer] = &txRequest{hashes: hashes, time: f.clock.Now()}
Expand Down
2 changes: 2 additions & 0 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,11 +512,13 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) {
directPeers++
directCount += len(hashes)
peer.AsyncSendTransactions(hashes)
log.Debug("Transactions being broadcasted to", "peer", peer.String(), "len", len(hashes))
}
for peer, hashes := range annos {
annoPeers++
annoCount += len(hashes)
peer.AsyncSendPooledTransactionHashes(hashes)
log.Debug("Transactions being announced to", "peer", peer.String(), "len", len(hashes))
}
log.Debug("Transaction broadcast", "txs", len(txs),
"announce packs", annoPeers, "announced hashes", annoCount,
Expand Down
9 changes: 9 additions & 0 deletions eth/protocols/eth/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/core/types"
"github.com/scroll-tech/go-ethereum/log"
)

const (
Expand Down Expand Up @@ -92,10 +93,13 @@ func (p *Peer) broadcastTransactions() {
if len(txs) > 0 {
done = make(chan struct{})
go func() {
log.Debug("Sending transactions", "count", len(txs))
if err := p.SendTransactions(txs); err != nil {
log.Debug("Sending transactions", "count", len(txs), "err", err)
fail <- err
return
}
log.Debug("Sent transactions", "count", len(txs))
close(done)
p.Log().Trace("Sent transactions", "count", len(txs))
}()
Expand All @@ -110,6 +114,7 @@ func (p *Peer) broadcastTransactions() {
}
// New batch of transactions to be broadcast, queue them (with cap)
queue = append(queue, hashes...)
log.Debug("Queue size in broadcastTransactions", "len(hashes)", len(hashes), "len(queue)", len(queue), "maxQueuedTxs", maxQueuedTxs)
if len(queue) > maxQueuedTxs {
// Fancy copy and resize to ensure buffer doesn't grow indefinitely
queue = queue[:copy(queue, queue[len(queue)-maxQueuedTxs:])]
Expand Down Expand Up @@ -159,10 +164,13 @@ func (p *Peer) announceTransactions() {
if len(pending) > 0 {
done = make(chan struct{})
go func() {
log.Debug("Sending transaction announcements", "count", len(pending))
if err := p.sendPooledTransactionHashes(pending); err != nil {
log.Debug("Sending transaction announcements", "count", len(pending), "err", err)
fail <- err
return
}
log.Debug("Sent transaction announcements", "count", len(pending))
close(done)
p.Log().Trace("Sent transaction announcements", "count", len(pending))
}()
Expand All @@ -177,6 +185,7 @@ func (p *Peer) announceTransactions() {
}
// New batch of transactions to be broadcast, queue them (with cap)
queue = append(queue, hashes...)
log.Debug("Queue size in announceTransactions", "len(hashes)", len(hashes), "len(queue)", len(queue), "maxQueuedTxAnns", maxQueuedTxAnns)
if len(queue) > maxQueuedTxAnns {
// Fancy copy and resize to ensure buffer doesn't grow indefinitely
queue = queue[:copy(queue, queue[len(queue)-maxQueuedTxAnns:])]
Expand Down
9 changes: 9 additions & 0 deletions eth/protocols/eth/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,11 @@ func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer)
}
ann := new(NewPooledTransactionHashesPacket)
if err := msg.Decode(ann); err != nil {
log.Debug("Failed to decode `NewPooledTransactionHashesPacket`", "peer", peer.String(), "err", err)
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
// Schedule all the unknown hashes for retrieval
log.Debug("handleNewPooledTransactionHashes", "peer", peer.String(), "len(ann)", len(*ann))
for _, hash := range *ann {
peer.markTransaction(hash)
}
Expand All @@ -336,9 +338,11 @@ func handleGetPooledTransactions66(backend Backend, msg Decoder, peer *Peer) err
// Decode the pooled transactions retrieval message
var query GetPooledTransactionsPacket66
if err := msg.Decode(&query); err != nil {
log.Debug("Failed to decode `GetPooledTransactionsPacket66`", "peer", peer.String(), "err", err)
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
hashes, txs := answerGetPooledTransactions(backend, query.GetPooledTransactionsPacket, peer)
log.Debug("handleGetPooledTransactions", "peer", peer.String(), "RequestId", query.RequestId, "len(query)", len(query.GetPooledTransactionsPacket), "retrieved", len(hashes))
return peer.ReplyPooledTransactionsRLP(query.RequestId, hashes, txs)
}

Expand Down Expand Up @@ -378,11 +382,14 @@ func handleTransactions(backend Backend, msg Decoder, peer *Peer) error {
// Transactions can be processed, parse all of them and deliver to the pool
var txs TransactionsPacket
if err := msg.Decode(&txs); err != nil {
log.Debug("Failed to decode `TransactionsPacket`", "peer", peer.String(), "err", err)
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
log.Debug("handleTransactions", "peer", peer.String(), "len(txs)", len(txs))
for i, tx := range txs {
// Validate and mark the remote transaction
if tx == nil {
log.Debug("handleTransactions: transaction is nil", "peer", peer.String(), "i", i)
return fmt.Errorf("%w: transaction %d is nil", errDecode, i)
}
peer.markTransaction(tx.Hash())
Expand All @@ -398,11 +405,13 @@ func handlePooledTransactions66(backend Backend, msg Decoder, peer *Peer) error
// Transactions can be processed, parse all of them and deliver to the pool
var txs PooledTransactionsPacket66
if err := msg.Decode(&txs); err != nil {
log.Debug("Failed to decode `PooledTransactionsPacket66`", "peer", peer.String(), "err", err)
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
for i, tx := range txs.PooledTransactionsPacket {
// Validate and mark the remote transaction
if tx == nil {
log.Debug("handlePooledTransactions: transaction is nil", "peer", peer.String(), "i", i)
return fmt.Errorf("%w: transaction %d is nil", errDecode, i)
}
peer.markTransaction(tx.Hash())
Expand Down
3 changes: 3 additions & 0 deletions eth/protocols/eth/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/core/types"
"github.com/scroll-tech/go-ethereum/log"
"github.com/scroll-tech/go-ethereum/p2p"
"github.com/scroll-tech/go-ethereum/rlp"
)
Expand Down Expand Up @@ -419,6 +420,8 @@ func (p *Peer) RequestTxs(hashes []common.Hash) error {
p.Log().Debug("Fetching batch of transactions", "count", len(hashes))
id := rand.Uint64()

log.Debug("Requesting transactions", "RequestId", id, "Peer.id", p.id, "count", len(hashes))

requestTracker.Track(p.id, p.version, GetPooledTransactionsMsg, PooledTransactionsMsg, id)
return p2p.Send(p.rw, GetPooledTransactionsMsg, &GetPooledTransactionsPacket66{
RequestId: id,
Expand Down
2 changes: 1 addition & 1 deletion params/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
const (
VersionMajor = 5 // Major version component of the current release
VersionMinor = 7 // Minor version component of the current release
VersionPatch = 1 // Patch version component of the current release
VersionPatch = 2 // Patch version component of the current release
VersionMeta = "mainnet" // Version metadata to append to the version string
)

Expand Down

0 comments on commit 003d711

Please sign in to comment.