diff --git a/core/tx_pool.go b/core/tx_pool.go index ed8b604a034d..14124e7633b7 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -123,14 +123,17 @@ var ( // that this number is pretty low, since txpool reorgs happen very frequently. dropBetweenReorgHistogram = metrics.NewRegisteredHistogram("txpool/dropbetweenreorg", nil, metrics.NewExpDecaySample(1028, 0.015)) - pendingGauge = metrics.NewRegisteredGauge("txpool/pending", nil) - queuedGauge = metrics.NewRegisteredGauge("txpool/queued", nil) - localGauge = metrics.NewRegisteredGauge("txpool/local", nil) - slotsGauge = metrics.NewRegisteredGauge("txpool/slots", nil) + pendingGauge = metrics.NewRegisteredGauge("txpool/pending", nil) + realPendingGauge = metrics.NewRegisteredGauge("txpool/real_pending", nil) + queuedGauge = metrics.NewRegisteredGauge("txpool/queued", nil) + realQueuedGauge = metrics.NewRegisteredGauge("txpool/real_queued", nil) + localGauge = metrics.NewRegisteredGauge("txpool/local", nil) + slotsGauge = metrics.NewRegisteredGauge("txpool/slots", nil) reheapTimer = metrics.NewRegisteredTimer("txpool/reheap", nil) - txLifecycleTimer = metrics.NewRegisteredTimer("txpool/txfifecycle", nil) + txLifecycleTimer = metrics.NewRegisteredTimer("txpool/txfifecycle", nil) + statsWithMinBaseFeeTimer = metrics.NewRegisteredTimer("txpool/stats_min_base_fee", nil) ) // TxStatus is the current status of a transaction as seen by the pool. @@ -263,16 +266,17 @@ type TxPool struct { all *txLookup // All transactions to allow lookups priced *txPricedList // All transactions sorted by price - chainHeadCh chan ChainHeadEvent - chainHeadSub event.Subscription - reqResetCh chan *txpoolResetRequest - reqPromoteCh chan *accountSet - queueTxEventCh chan *types.Transaction - reorgDoneCh chan chan struct{} - reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop - reorgPauseCh chan bool // requests to pause scheduleReorgLoop - wg sync.WaitGroup // tracks loop, scheduleReorgLoop - initDoneCh chan struct{} // is closed once the pool is initialized (for tests) + chainHeadCh chan ChainHeadEvent + chainHeadSub event.Subscription + reqResetCh chan *txpoolResetRequest + reqPromoteCh chan *accountSet + queueTxEventCh chan *types.Transaction + reorgDoneCh chan chan struct{} + reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop + reorgPauseCh chan bool // requests to pause scheduleReorgLoop + realTxActivityShutdownCh chan struct{} + wg sync.WaitGroup // tracks loop, scheduleReorgLoop + initDoneCh chan struct{} // is closed once the pool is initialized (for tests) changesSinceReorg int // A counter for how many drops we've performed in-between reorg. } @@ -289,23 +293,24 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block // Create the transaction pool with its initial settings pool := &TxPool{ - config: config, - chainconfig: chainconfig, - chain: chain, - signer: types.LatestSigner(chainconfig), - pending: make(map[common.Address]*txList), - queue: make(map[common.Address]*txList), - beats: make(map[common.Address]time.Time), - all: newTxLookup(), - chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize), - reqResetCh: make(chan *txpoolResetRequest), - reqPromoteCh: make(chan *accountSet), - queueTxEventCh: make(chan *types.Transaction), - reorgDoneCh: make(chan chan struct{}), - reorgShutdownCh: make(chan struct{}), - reorgPauseCh: make(chan bool), - initDoneCh: make(chan struct{}), - gasPrice: new(big.Int).SetUint64(config.PriceLimit), + config: config, + chainconfig: chainconfig, + chain: chain, + signer: types.LatestSigner(chainconfig), + pending: make(map[common.Address]*txList), + queue: make(map[common.Address]*txList), + beats: make(map[common.Address]time.Time), + all: newTxLookup(), + chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize), + reqResetCh: make(chan *txpoolResetRequest), + reqPromoteCh: make(chan *accountSet), + queueTxEventCh: make(chan *types.Transaction), + reorgDoneCh: make(chan chan struct{}), + reorgShutdownCh: make(chan struct{}), + realTxActivityShutdownCh: make(chan struct{}), + reorgPauseCh: make(chan bool), + initDoneCh: make(chan struct{}), + gasPrice: new(big.Int).SetUint64(config.PriceLimit), } pool.locals = newAccountSet(pool.signer) for _, addr := range config.Locals { @@ -336,9 +341,27 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block pool.wg.Add(1) go pool.loop() + pool.wg.Add(1) + go pool.periodicallyCalculateRealTxActivity() + return pool } +func (pool *TxPool) periodicallyCalculateRealTxActivity() { + defer pool.wg.Done() + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + for { + select { + case <-ticker.C: + pool.StatsWithMinBaseFee(pool.chain.CurrentBlock().BaseFee()) + case <-pool.realTxActivityShutdownCh: + log.Info("Real tx activity calculation stopped") + return + } + } +} + // loop is the transaction pool's main event loop, waiting for and reacting to // outside blockchain events as well as for various reporting and transaction // eviction events. @@ -372,6 +395,7 @@ func (pool *TxPool) loop() { // System shutdown. case <-pool.chainHeadSub.Err(): close(pool.reorgShutdownCh) + close(pool.realTxActivityShutdownCh) return // Handle stats reporting ticks @@ -503,10 +527,12 @@ func (pool *TxPool) stats() (int, int) { // StatsWithMinBaseFee retrieves the current pool stats, namely the number of pending and the // number of queued (non-executable) transactions greater equal minBaseFee. func (pool *TxPool) StatsWithMinBaseFee(minBaseFee *big.Int) (int, int) { - pool.mu.RLock() - defer pool.mu.RUnlock() - - return pool.statsWithMinBaseFee(minBaseFee) + statsStart := time.Now() + pool.mu.Lock() + pendingTxs, queuedTxs := pool.statsWithMinBaseFee(minBaseFee) + pool.mu.Unlock() + statsWithMinBaseFeeTimer.UpdateSince(statsStart) + return pendingTxs, queuedTxs } // statsWithMinBaseFee retrieves the current pool stats, namely the number of pending and the @@ -521,6 +547,7 @@ func (pool *TxPool) statsWithMinBaseFee(minBaseFee *big.Int) (int, int) { pending++ } } + realPendingGauge.Update(int64(pending)) queued := 0 for _, list := range pool.queue { @@ -531,6 +558,8 @@ func (pool *TxPool) statsWithMinBaseFee(minBaseFee *big.Int) (int, int) { queued++ } } + realQueuedGauge.Update(int64(queued)) + return pending, queued }