Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dan/dep fix #15

Merged
merged 2 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion ethers-middleware/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ serde = { version = "1.0.124", default-features = false, features = ["derive"] }
thiserror = { version = "1.0", default-features = false }
futures-util = { version = "^0.3" }
futures-locks = { version = "0.7", default-features = false }
futures-channel.workspace = "0.3.28"
tracing = { version = "0.1.37", default-features = false }
tracing-futures = { version = "0.2.5", default-features = false }

Expand Down
191 changes: 86 additions & 105 deletions ethers-middleware/src/gas_escalator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,17 @@

use async_trait::async_trait;

use futures_channel::oneshot;
use futures_util::{lock::Mutex, select_biased};
use futures_util::lock::Mutex;
use instant::Instant;
use std::{pin::Pin, sync::Arc};
use thiserror::Error;
use tracing;

Check warning on line 13 in ethers-middleware/src/gas_escalator/mod.rs

View workflow job for this annotation

GitHub Actions / WASM

unused import: `tracing`
use tracing_futures::Instrument;

use ethers_core::types::{
transaction::eip2718::TypedTransaction, BlockId, TransactionRequest, TxHash, U256,
};
use ethers_providers::{interval, FromErr, Middleware, PendingTransaction, StreamExt};

Check warning on line 19 in ethers-middleware/src/gas_escalator/mod.rs

View workflow job for this annotation

GitHub Actions / WASM

unused import: `interval`

#[cfg(not(target_arch = "wasm32"))]
use tokio::spawn;
Expand Down Expand Up @@ -60,7 +59,6 @@
/// The transactions which are currently being monitored for escalation
#[allow(clippy::type_complexity)]
pub txs: ToEscalate,
_background: oneshot::Sender<()>,
}

/// A Gas escalator allows bumping transactions' gas price to avoid getting them
Expand Down Expand Up @@ -192,18 +190,14 @@
E: GasEscalator + 'static,
M: 'static,
{
let (tx, rx) = oneshot::channel();
let inner = Arc::new(inner);

let txs: ToEscalate = Default::default();

let this = Arc::new(GasEscalatorMiddlewareInternal {
inner: inner.clone(),
txs: txs.clone(),
_background: tx,
});
let this =
Arc::new(GasEscalatorMiddlewareInternal { inner: inner.clone(), txs: txs.clone() });

let esc = EscalationTask { inner, escalator, frequency, txs, shutdown: rx };
let esc = EscalationTask { inner, escalator, frequency, txs };

{
spawn(esc.escalate().instrument(tracing::debug_span!("gas-escalation")));
Expand All @@ -220,19 +214,12 @@
escalator: E,
frequency: Frequency,
txs: ToEscalate,
shutdown: oneshot::Receiver<()>,
}

#[cfg(not(target_arch = "wasm32"))]
impl<M, E> EscalationTask<M, E> {
pub fn new(
inner: M,
escalator: E,
frequency: Frequency,
txs: ToEscalate,
shutdown: oneshot::Receiver<()>,
) -> Self {
Self { inner, escalator, frequency, txs, shutdown }
pub fn new(inner: M, escalator: E, frequency: Frequency, txs: ToEscalate) -> Self {
Self { inner, escalator, frequency, txs }
}

async fn escalate(mut self) -> Result<(), GasEscalatorError<M>>
Expand All @@ -254,97 +241,91 @@

let mut watcher = watcher.fuse();

loop {
select_biased! {
_ = &mut self.shutdown => {
tracing::debug!("Shutting down escalation task, middleware has gone away");
return Ok(())
}
opt = watcher.next() => {
if opt.is_none() {
tracing::error!("timing future has gone away");
return Ok(());
}
let now = Instant::now();

// We take the contents of the mutex, and then add them back in
// later.
let mut txs: Vec<_> = {
let mut txs = self.txs.lock().await;
std::mem::take(&mut (*txs))
// Lock scope ends
};

let len = txs.len();
// Pop all transactions and re-insert those that have not been included yet
for _ in 0..len {
// this must never panic as we're explicitly within bounds
let (old_tx_hash, mut replacement_tx, time, priority) =
txs.pop().expect("should have element in vector");

let receipt = self
.inner
.get_transaction_receipt(old_tx_hash)
.await
.map_err(GasEscalatorError::MiddlewareError)?;

tracing::trace!(tx_hash = ?old_tx_hash, "checking if exists");

if receipt.is_none() {
let old_gas_price = replacement_tx.gas_price.expect("gas price must be set");
// Get the new gas price based on how much time passed since the
// tx was last broadcast
let new_gas_price = self
.escalator
.get_gas_price(old_gas_price, now.duration_since(time).as_secs());

let new_txhash = if new_gas_price == old_gas_price {
old_tx_hash
} else {
// bump the gas price
replacement_tx.gas_price = Some(new_gas_price);

// the tx hash will be different so we need to update it
match self.inner.send_transaction(replacement_tx.clone(), priority).await {
Ok(new_tx_hash) => {
let new_tx_hash = *new_tx_hash;
println!("escalated gas price for tx hash: {:?}. New tx hash: {:?}", old_tx_hash, new_tx_hash);
tracing::debug!(
old_tx_hash = ?old_tx_hash,
new_tx_hash = ?new_tx_hash,
old_gas_price = ?old_gas_price,
new_gas_price = ?new_gas_price,
"escalated gas price"
while watcher.next().await.is_some() {
let now = Instant::now();

// We take the contents of the mutex, and then add them back in
// later.
let mut txs: Vec<_> = {
let mut txs = self.txs.lock().await;
std::mem::take(&mut (*txs))
// Lock scope ends
};

let len = txs.len();
// Pop all transactions and re-insert those that have not been included yet
for _ in 0..len {
// this must never panic as we're explicitly within bounds
let (old_tx_hash, mut replacement_tx, time, priority) =
txs.pop().expect("should have element in vector");

let receipt = self
.inner
.get_transaction_receipt(old_tx_hash)
.await
.map_err(GasEscalatorError::MiddlewareError)?;

tracing::trace!(tx_hash = ?old_tx_hash, "checking if exists");

if receipt.is_none() {
let old_gas_price = replacement_tx.gas_price.expect("gas price must be set");
// Get the new gas price based on how much time passed since the
// tx was last broadcast
let new_gas_price = self
.escalator
.get_gas_price(old_gas_price, now.duration_since(time).as_secs());

let new_txhash = if new_gas_price == old_gas_price {
old_tx_hash
} else {
// bump the gas price
replacement_tx.gas_price = Some(new_gas_price);

// the tx hash will be different so we need to update it
match self.inner.send_transaction(replacement_tx.clone(), priority).await {
Ok(new_tx_hash) => {
let new_tx_hash = *new_tx_hash;
println!(
"escalated gas price for tx hash: {:?}. New tx hash: {:?}. Old gas price: {:?}. New gas price: {:?}",
old_tx_hash, new_tx_hash, old_gas_price, new_gas_price
);
tracing::debug!(
old_tx_hash = ?old_tx_hash,
new_tx_hash = ?new_tx_hash,
old_gas_price = ?old_gas_price,
new_gas_price = ?new_gas_price,
"escalated gas price"
);
new_tx_hash
}
Err(err) => {
if err.to_string().contains("nonce too low") {
// may happen if we try to broadcast a higher
// gas price tx when one of the previous ones
// was already mined (meaning we also do not
// push it back to the pending txs vector)
tracing::warn!(err = %err, ?old_tx_hash, ?replacement_tx, "Nonce error when escalating gas price. Tx may have already been mined.");
continue;
} else {
tracing::error!(
err = %err,
"Killing escalator backend"
);
new_tx_hash
}
Err(err) => {
if err.to_string().contains("nonce too low") {
// may happen if we try to broadcast a higher
// gas price tx when one of the previous ones
// was already mined (meaning we also do not
// push it back to the pending txs vector)
tracing::warn!(err = %err, ?old_tx_hash, ?replacement_tx, "Nonce error when escalating gas price. Tx may have already been mined.");
continue
} else {
tracing::error!(
err = %err,
"Killing escalator backend"
);
return Err(GasEscalatorError::MiddlewareError(err))
}
return Err(GasEscalatorError::MiddlewareError(err));
}
}
};
txs.push((new_txhash, replacement_tx, time, priority));
}
}
};
txs.push((new_txhash, replacement_tx, time, priority));
}
// after this big ugly loop, we dump everything back in
// we don't replace here, as the vec in the mutex may contain
// items!
self.txs.lock().await.extend(txs);
}}
}
// after this big ugly loop, we dump everything back in
// we don't replace here, as the vec in the mutex may contain
// items!
self.txs.lock().await.extend(txs);
}
tracing::error!("timing future has gone away");
Ok(())
}
}

Expand Down
Loading