Skip to content

Commit

Permalink
feat(metrics): calculate the real pending tx (#983)
Browse files Browse the repository at this point in the history
* calculate the real pending tx

* update

* move realPendingTx to miner

* update

* calculate the real pending tx by statsWithMinBaseFee

* update

* fix lint

* address comments

* add metrics to StatsWithMinBaseFee

* change read_lock to write_lock
  • Loading branch information
georgehao authored Aug 20, 2024
1 parent 4b85bbc commit 2ccacff
Showing 1 changed file with 65 additions and 36 deletions.
101 changes: 65 additions & 36 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,17 @@ var (
// that this number is pretty low, since txpool reorgs happen very frequently.
dropBetweenReorgHistogram = metrics.NewRegisteredHistogram("txpool/dropbetweenreorg", nil, metrics.NewExpDecaySample(1028, 0.015))

pendingGauge = metrics.NewRegisteredGauge("txpool/pending", nil)
queuedGauge = metrics.NewRegisteredGauge("txpool/queued", nil)
localGauge = metrics.NewRegisteredGauge("txpool/local", nil)
slotsGauge = metrics.NewRegisteredGauge("txpool/slots", nil)
pendingGauge = metrics.NewRegisteredGauge("txpool/pending", nil)
realPendingGauge = metrics.NewRegisteredGauge("txpool/real_pending", nil)
queuedGauge = metrics.NewRegisteredGauge("txpool/queued", nil)
realQueuedGauge = metrics.NewRegisteredGauge("txpool/real_queued", nil)
localGauge = metrics.NewRegisteredGauge("txpool/local", nil)
slotsGauge = metrics.NewRegisteredGauge("txpool/slots", nil)

reheapTimer = metrics.NewRegisteredTimer("txpool/reheap", nil)

txLifecycleTimer = metrics.NewRegisteredTimer("txpool/txfifecycle", nil)
txLifecycleTimer = metrics.NewRegisteredTimer("txpool/txfifecycle", nil)
statsWithMinBaseFeeTimer = metrics.NewRegisteredTimer("txpool/stats_min_base_fee", nil)
)

// TxStatus is the current status of a transaction as seen by the pool.
Expand Down Expand Up @@ -263,16 +266,17 @@ type TxPool struct {
all *txLookup // All transactions to allow lookups
priced *txPricedList // All transactions sorted by price

chainHeadCh chan ChainHeadEvent
chainHeadSub event.Subscription
reqResetCh chan *txpoolResetRequest
reqPromoteCh chan *accountSet
queueTxEventCh chan *types.Transaction
reorgDoneCh chan chan struct{}
reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop
reorgPauseCh chan bool // requests to pause scheduleReorgLoop
wg sync.WaitGroup // tracks loop, scheduleReorgLoop
initDoneCh chan struct{} // is closed once the pool is initialized (for tests)
chainHeadCh chan ChainHeadEvent
chainHeadSub event.Subscription
reqResetCh chan *txpoolResetRequest
reqPromoteCh chan *accountSet
queueTxEventCh chan *types.Transaction
reorgDoneCh chan chan struct{}
reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop
reorgPauseCh chan bool // requests to pause scheduleReorgLoop
realTxActivityShutdownCh chan struct{}
wg sync.WaitGroup // tracks loop, scheduleReorgLoop
initDoneCh chan struct{} // is closed once the pool is initialized (for tests)

changesSinceReorg int // A counter for how many drops we've performed in-between reorg.
}
Expand All @@ -289,23 +293,24 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block

// Create the transaction pool with its initial settings
pool := &TxPool{
config: config,
chainconfig: chainconfig,
chain: chain,
signer: types.LatestSigner(chainconfig),
pending: make(map[common.Address]*txList),
queue: make(map[common.Address]*txList),
beats: make(map[common.Address]time.Time),
all: newTxLookup(),
chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
reqResetCh: make(chan *txpoolResetRequest),
reqPromoteCh: make(chan *accountSet),
queueTxEventCh: make(chan *types.Transaction),
reorgDoneCh: make(chan chan struct{}),
reorgShutdownCh: make(chan struct{}),
reorgPauseCh: make(chan bool),
initDoneCh: make(chan struct{}),
gasPrice: new(big.Int).SetUint64(config.PriceLimit),
config: config,
chainconfig: chainconfig,
chain: chain,
signer: types.LatestSigner(chainconfig),
pending: make(map[common.Address]*txList),
queue: make(map[common.Address]*txList),
beats: make(map[common.Address]time.Time),
all: newTxLookup(),
chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
reqResetCh: make(chan *txpoolResetRequest),
reqPromoteCh: make(chan *accountSet),
queueTxEventCh: make(chan *types.Transaction),
reorgDoneCh: make(chan chan struct{}),
reorgShutdownCh: make(chan struct{}),
realTxActivityShutdownCh: make(chan struct{}),
reorgPauseCh: make(chan bool),
initDoneCh: make(chan struct{}),
gasPrice: new(big.Int).SetUint64(config.PriceLimit),
}
pool.locals = newAccountSet(pool.signer)
for _, addr := range config.Locals {
Expand Down Expand Up @@ -336,9 +341,27 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
pool.wg.Add(1)
go pool.loop()

pool.wg.Add(1)
go pool.periodicallyCalculateRealTxActivity()

return pool
}

func (pool *TxPool) periodicallyCalculateRealTxActivity() {
defer pool.wg.Done()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
pool.StatsWithMinBaseFee(pool.chain.CurrentBlock().BaseFee())
case <-pool.realTxActivityShutdownCh:
log.Info("Real tx activity calculation stopped")
return
}
}
}

// loop is the transaction pool's main event loop, waiting for and reacting to
// outside blockchain events as well as for various reporting and transaction
// eviction events.
Expand Down Expand Up @@ -372,6 +395,7 @@ func (pool *TxPool) loop() {
// System shutdown.
case <-pool.chainHeadSub.Err():
close(pool.reorgShutdownCh)
close(pool.realTxActivityShutdownCh)
return

// Handle stats reporting ticks
Expand Down Expand Up @@ -503,10 +527,12 @@ func (pool *TxPool) stats() (int, int) {
// StatsWithMinBaseFee retrieves the current pool stats, namely the number of pending and the
// number of queued (non-executable) transactions greater equal minBaseFee.
func (pool *TxPool) StatsWithMinBaseFee(minBaseFee *big.Int) (int, int) {
pool.mu.RLock()
defer pool.mu.RUnlock()

return pool.statsWithMinBaseFee(minBaseFee)
statsStart := time.Now()
pool.mu.Lock()
pendingTxs, queuedTxs := pool.statsWithMinBaseFee(minBaseFee)
pool.mu.Unlock()
statsWithMinBaseFeeTimer.UpdateSince(statsStart)
return pendingTxs, queuedTxs
}

// statsWithMinBaseFee retrieves the current pool stats, namely the number of pending and the
Expand All @@ -521,6 +547,7 @@ func (pool *TxPool) statsWithMinBaseFee(minBaseFee *big.Int) (int, int) {
pending++
}
}
realPendingGauge.Update(int64(pending))

queued := 0
for _, list := range pool.queue {
Expand All @@ -531,6 +558,8 @@ func (pool *TxPool) statsWithMinBaseFee(minBaseFee *big.Int) (int, int) {
queued++
}
}
realQueuedGauge.Update(int64(queued))

return pending, queued
}

Expand Down

0 comments on commit 2ccacff

Please sign in to comment.