diff --git a/Cargo.lock b/Cargo.lock index 3afb1ce88..3ecf6d76f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1392,7 +1392,6 @@ dependencies = [ "ethers-providers", "ethers-signers", "ethers-solc", - "futures-channel", "futures-locks", "futures-util", "hex", diff --git a/ethers-middleware/Cargo.toml b/ethers-middleware/Cargo.toml index 8f678f8af..b53788bf8 100644 --- a/ethers-middleware/Cargo.toml +++ b/ethers-middleware/Cargo.toml @@ -27,7 +27,6 @@ serde = { version = "1.0.124", default-features = false, features = ["derive"] } thiserror = { version = "1.0", default-features = false } futures-util = { version = "^0.3" } futures-locks = { version = "0.7", default-features = false } -futures-channel.workspace = "0.3.28" tracing = { version = "0.1.37", default-features = false } tracing-futures = { version = "0.2.5", default-features = false } diff --git a/ethers-middleware/src/gas_escalator/mod.rs b/ethers-middleware/src/gas_escalator/mod.rs index 8a7619c30..d765cd0c0 100644 --- a/ethers-middleware/src/gas_escalator/mod.rs +++ b/ethers-middleware/src/gas_escalator/mod.rs @@ -6,8 +6,7 @@ pub use linear::LinearGasPrice; use async_trait::async_trait; -use futures_channel::oneshot; -use futures_util::{lock::Mutex, select_biased}; +use futures_util::lock::Mutex; use instant::Instant; use std::{pin::Pin, sync::Arc}; use thiserror::Error; @@ -60,7 +59,6 @@ pub(crate) struct GasEscalatorMiddlewareInternal { /// The transactions which are currently being monitored for escalation #[allow(clippy::type_complexity)] pub txs: ToEscalate, - _background: oneshot::Sender<()>, } /// A Gas escalator allows bumping transactions' gas price to avoid getting them @@ -192,18 +190,14 @@ where E: GasEscalator + 'static, M: 'static, { - let (tx, rx) = oneshot::channel(); let inner = Arc::new(inner); let txs: ToEscalate = Default::default(); - let this = Arc::new(GasEscalatorMiddlewareInternal { - inner: inner.clone(), - txs: txs.clone(), - _background: tx, - }); + let this = + Arc::new(GasEscalatorMiddlewareInternal { inner: inner.clone(), txs: txs.clone() }); - let esc = EscalationTask { inner, escalator, frequency, txs, shutdown: rx }; + let esc = EscalationTask { inner, escalator, frequency, txs }; { spawn(esc.escalate().instrument(tracing::debug_span!("gas-escalation"))); @@ -220,19 +214,12 @@ pub struct EscalationTask { escalator: E, frequency: Frequency, txs: ToEscalate, - shutdown: oneshot::Receiver<()>, } #[cfg(not(target_arch = "wasm32"))] impl EscalationTask { - pub fn new( - inner: M, - escalator: E, - frequency: Frequency, - txs: ToEscalate, - shutdown: oneshot::Receiver<()>, - ) -> Self { - Self { inner, escalator, frequency, txs, shutdown } + pub fn new(inner: M, escalator: E, frequency: Frequency, txs: ToEscalate) -> Self { + Self { inner, escalator, frequency, txs } } async fn escalate(mut self) -> Result<(), GasEscalatorError> @@ -254,97 +241,91 @@ impl EscalationTask { let mut watcher = watcher.fuse(); - loop { - select_biased! { - _ = &mut self.shutdown => { - tracing::debug!("Shutting down escalation task, middleware has gone away"); - return Ok(()) - } - opt = watcher.next() => { - if opt.is_none() { - tracing::error!("timing future has gone away"); - return Ok(()); - } - let now = Instant::now(); - - // We take the contents of the mutex, and then add them back in - // later. - let mut txs: Vec<_> = { - let mut txs = self.txs.lock().await; - std::mem::take(&mut (*txs)) - // Lock scope ends - }; - - let len = txs.len(); - // Pop all transactions and re-insert those that have not been included yet - for _ in 0..len { - // this must never panic as we're explicitly within bounds - let (old_tx_hash, mut replacement_tx, time, priority) = - txs.pop().expect("should have element in vector"); - - let receipt = self - .inner - .get_transaction_receipt(old_tx_hash) - .await - .map_err(GasEscalatorError::MiddlewareError)?; - - tracing::trace!(tx_hash = ?old_tx_hash, "checking if exists"); - - if receipt.is_none() { - let old_gas_price = replacement_tx.gas_price.expect("gas price must be set"); - // Get the new gas price based on how much time passed since the - // tx was last broadcast - let new_gas_price = self - .escalator - .get_gas_price(old_gas_price, now.duration_since(time).as_secs()); - - let new_txhash = if new_gas_price == old_gas_price { - old_tx_hash - } else { - // bump the gas price - replacement_tx.gas_price = Some(new_gas_price); - - // the tx hash will be different so we need to update it - match self.inner.send_transaction(replacement_tx.clone(), priority).await { - Ok(new_tx_hash) => { - let new_tx_hash = *new_tx_hash; - println!("escalated gas price for tx hash: {:?}. New tx hash: {:?}", old_tx_hash, new_tx_hash); - tracing::debug!( - old_tx_hash = ?old_tx_hash, - new_tx_hash = ?new_tx_hash, - old_gas_price = ?old_gas_price, - new_gas_price = ?new_gas_price, - "escalated gas price" + while watcher.next().await.is_some() { + let now = Instant::now(); + + // We take the contents of the mutex, and then add them back in + // later. + let mut txs: Vec<_> = { + let mut txs = self.txs.lock().await; + std::mem::take(&mut (*txs)) + // Lock scope ends + }; + + let len = txs.len(); + // Pop all transactions and re-insert those that have not been included yet + for _ in 0..len { + // this must never panic as we're explicitly within bounds + let (old_tx_hash, mut replacement_tx, time, priority) = + txs.pop().expect("should have element in vector"); + + let receipt = self + .inner + .get_transaction_receipt(old_tx_hash) + .await + .map_err(GasEscalatorError::MiddlewareError)?; + + tracing::trace!(tx_hash = ?old_tx_hash, "checking if exists"); + + if receipt.is_none() { + let old_gas_price = replacement_tx.gas_price.expect("gas price must be set"); + // Get the new gas price based on how much time passed since the + // tx was last broadcast + let new_gas_price = self + .escalator + .get_gas_price(old_gas_price, now.duration_since(time).as_secs()); + + let new_txhash = if new_gas_price == old_gas_price { + old_tx_hash + } else { + // bump the gas price + replacement_tx.gas_price = Some(new_gas_price); + + // the tx hash will be different so we need to update it + match self.inner.send_transaction(replacement_tx.clone(), priority).await { + Ok(new_tx_hash) => { + let new_tx_hash = *new_tx_hash; + println!( + "escalated gas price for tx hash: {:?}. New tx hash: {:?}. Old gas price: {:?}. New gas price: {:?}", + old_tx_hash, new_tx_hash, old_gas_price, new_gas_price + ); + tracing::debug!( + old_tx_hash = ?old_tx_hash, + new_tx_hash = ?new_tx_hash, + old_gas_price = ?old_gas_price, + new_gas_price = ?new_gas_price, + "escalated gas price" + ); + new_tx_hash + } + Err(err) => { + if err.to_string().contains("nonce too low") { + // may happen if we try to broadcast a higher + // gas price tx when one of the previous ones + // was already mined (meaning we also do not + // push it back to the pending txs vector) + tracing::warn!(err = %err, ?old_tx_hash, ?replacement_tx, "Nonce error when escalating gas price. Tx may have already been mined."); + continue; + } else { + tracing::error!( + err = %err, + "Killing escalator backend" ); - new_tx_hash - } - Err(err) => { - if err.to_string().contains("nonce too low") { - // may happen if we try to broadcast a higher - // gas price tx when one of the previous ones - // was already mined (meaning we also do not - // push it back to the pending txs vector) - tracing::warn!(err = %err, ?old_tx_hash, ?replacement_tx, "Nonce error when escalating gas price. Tx may have already been mined."); - continue - } else { - tracing::error!( - err = %err, - "Killing escalator backend" - ); - return Err(GasEscalatorError::MiddlewareError(err)) - } + return Err(GasEscalatorError::MiddlewareError(err)); } } - }; - txs.push((new_txhash, replacement_tx, time, priority)); - } + } + }; + txs.push((new_txhash, replacement_tx, time, priority)); } - // after this big ugly loop, we dump everything back in - // we don't replace here, as the vec in the mutex may contain - // items! - self.txs.lock().await.extend(txs); - }} + } + // after this big ugly loop, we dump everything back in + // we don't replace here, as the vec in the mutex may contain + // items! + self.txs.lock().await.extend(txs); } + tracing::error!("timing future has gone away"); + Ok(()) } }