Skip to content

Commit

Permalink
Dan/dep fix (#15)
Browse files Browse the repository at this point in the history
`cargo-build-sbf` was broken by adding `futures-channel`
  • Loading branch information
daniel-savu authored Jun 27, 2024
1 parent 86d61d6 commit df3510c
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 107 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion ethers-middleware/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ serde = { version = "1.0.124", default-features = false, features = ["derive"] }
thiserror = { version = "1.0", default-features = false }
futures-util = { version = "^0.3" }
futures-locks = { version = "0.7", default-features = false }
futures-channel.workspace = "0.3.28"
tracing = { version = "0.1.37", default-features = false }
tracing-futures = { version = "0.2.5", default-features = false }

Expand Down
191 changes: 86 additions & 105 deletions ethers-middleware/src/gas_escalator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ pub use linear::LinearGasPrice;

use async_trait::async_trait;

use futures_channel::oneshot;
use futures_util::{lock::Mutex, select_biased};
use futures_util::lock::Mutex;
use instant::Instant;
use std::{pin::Pin, sync::Arc};
use thiserror::Error;
Expand Down Expand Up @@ -60,7 +59,6 @@ pub(crate) struct GasEscalatorMiddlewareInternal<M> {
/// The transactions which are currently being monitored for escalation
#[allow(clippy::type_complexity)]
pub txs: ToEscalate,
_background: oneshot::Sender<()>,
}

/// A Gas escalator allows bumping transactions' gas price to avoid getting them
Expand Down Expand Up @@ -192,18 +190,14 @@ where
E: GasEscalator + 'static,
M: 'static,
{
let (tx, rx) = oneshot::channel();
let inner = Arc::new(inner);

let txs: ToEscalate = Default::default();

let this = Arc::new(GasEscalatorMiddlewareInternal {
inner: inner.clone(),
txs: txs.clone(),
_background: tx,
});
let this =
Arc::new(GasEscalatorMiddlewareInternal { inner: inner.clone(), txs: txs.clone() });

let esc = EscalationTask { inner, escalator, frequency, txs, shutdown: rx };
let esc = EscalationTask { inner, escalator, frequency, txs };

{
spawn(esc.escalate().instrument(tracing::debug_span!("gas-escalation")));
Expand All @@ -220,19 +214,12 @@ pub struct EscalationTask<M, E> {
escalator: E,
frequency: Frequency,
txs: ToEscalate,
shutdown: oneshot::Receiver<()>,
}

#[cfg(not(target_arch = "wasm32"))]
impl<M, E> EscalationTask<M, E> {
pub fn new(
inner: M,
escalator: E,
frequency: Frequency,
txs: ToEscalate,
shutdown: oneshot::Receiver<()>,
) -> Self {
Self { inner, escalator, frequency, txs, shutdown }
pub fn new(inner: M, escalator: E, frequency: Frequency, txs: ToEscalate) -> Self {
Self { inner, escalator, frequency, txs }
}

async fn escalate(mut self) -> Result<(), GasEscalatorError<M>>
Expand All @@ -254,97 +241,91 @@ impl<M, E> EscalationTask<M, E> {

let mut watcher = watcher.fuse();

loop {
select_biased! {
_ = &mut self.shutdown => {
tracing::debug!("Shutting down escalation task, middleware has gone away");
return Ok(())
}
opt = watcher.next() => {
if opt.is_none() {
tracing::error!("timing future has gone away");
return Ok(());
}
let now = Instant::now();

// We take the contents of the mutex, and then add them back in
// later.
let mut txs: Vec<_> = {
let mut txs = self.txs.lock().await;
std::mem::take(&mut (*txs))
// Lock scope ends
};

let len = txs.len();
// Pop all transactions and re-insert those that have not been included yet
for _ in 0..len {
// this must never panic as we're explicitly within bounds
let (old_tx_hash, mut replacement_tx, time, priority) =
txs.pop().expect("should have element in vector");

let receipt = self
.inner
.get_transaction_receipt(old_tx_hash)
.await
.map_err(GasEscalatorError::MiddlewareError)?;

tracing::trace!(tx_hash = ?old_tx_hash, "checking if exists");

if receipt.is_none() {
let old_gas_price = replacement_tx.gas_price.expect("gas price must be set");
// Get the new gas price based on how much time passed since the
// tx was last broadcast
let new_gas_price = self
.escalator
.get_gas_price(old_gas_price, now.duration_since(time).as_secs());

let new_txhash = if new_gas_price == old_gas_price {
old_tx_hash
} else {
// bump the gas price
replacement_tx.gas_price = Some(new_gas_price);

// the tx hash will be different so we need to update it
match self.inner.send_transaction(replacement_tx.clone(), priority).await {
Ok(new_tx_hash) => {
let new_tx_hash = *new_tx_hash;
println!("escalated gas price for tx hash: {:?}. New tx hash: {:?}", old_tx_hash, new_tx_hash);
tracing::debug!(
old_tx_hash = ?old_tx_hash,
new_tx_hash = ?new_tx_hash,
old_gas_price = ?old_gas_price,
new_gas_price = ?new_gas_price,
"escalated gas price"
while watcher.next().await.is_some() {
let now = Instant::now();

// We take the contents of the mutex, and then add them back in
// later.
let mut txs: Vec<_> = {
let mut txs = self.txs.lock().await;
std::mem::take(&mut (*txs))
// Lock scope ends
};

let len = txs.len();
// Pop all transactions and re-insert those that have not been included yet
for _ in 0..len {
// this must never panic as we're explicitly within bounds
let (old_tx_hash, mut replacement_tx, time, priority) =
txs.pop().expect("should have element in vector");

let receipt = self
.inner
.get_transaction_receipt(old_tx_hash)
.await
.map_err(GasEscalatorError::MiddlewareError)?;

tracing::trace!(tx_hash = ?old_tx_hash, "checking if exists");

if receipt.is_none() {
let old_gas_price = replacement_tx.gas_price.expect("gas price must be set");
// Get the new gas price based on how much time passed since the
// tx was last broadcast
let new_gas_price = self
.escalator
.get_gas_price(old_gas_price, now.duration_since(time).as_secs());

let new_txhash = if new_gas_price == old_gas_price {
old_tx_hash
} else {
// bump the gas price
replacement_tx.gas_price = Some(new_gas_price);

// the tx hash will be different so we need to update it
match self.inner.send_transaction(replacement_tx.clone(), priority).await {
Ok(new_tx_hash) => {
let new_tx_hash = *new_tx_hash;
println!(
"escalated gas price for tx hash: {:?}. New tx hash: {:?}. Old gas price: {:?}. New gas price: {:?}",
old_tx_hash, new_tx_hash, old_gas_price, new_gas_price
);
tracing::debug!(
old_tx_hash = ?old_tx_hash,
new_tx_hash = ?new_tx_hash,
old_gas_price = ?old_gas_price,
new_gas_price = ?new_gas_price,
"escalated gas price"
);
new_tx_hash
}
Err(err) => {
if err.to_string().contains("nonce too low") {
// may happen if we try to broadcast a higher
// gas price tx when one of the previous ones
// was already mined (meaning we also do not
// push it back to the pending txs vector)
tracing::warn!(err = %err, ?old_tx_hash, ?replacement_tx, "Nonce error when escalating gas price. Tx may have already been mined.");
continue;
} else {
tracing::error!(
err = %err,
"Killing escalator backend"
);
new_tx_hash
}
Err(err) => {
if err.to_string().contains("nonce too low") {
// may happen if we try to broadcast a higher
// gas price tx when one of the previous ones
// was already mined (meaning we also do not
// push it back to the pending txs vector)
tracing::warn!(err = %err, ?old_tx_hash, ?replacement_tx, "Nonce error when escalating gas price. Tx may have already been mined.");
continue
} else {
tracing::error!(
err = %err,
"Killing escalator backend"
);
return Err(GasEscalatorError::MiddlewareError(err))
}
return Err(GasEscalatorError::MiddlewareError(err));
}
}
};
txs.push((new_txhash, replacement_tx, time, priority));
}
}
};
txs.push((new_txhash, replacement_tx, time, priority));
}
// after this big ugly loop, we dump everything back in
// we don't replace here, as the vec in the mutex may contain
// items!
self.txs.lock().await.extend(txs);
}}
}
// after this big ugly loop, we dump everything back in
// we don't replace here, as the vec in the mutex may contain
// items!
self.txs.lock().await.extend(txs);
}
tracing::error!("timing future has gone away");
Ok(())
}
}

Expand Down

0 comments on commit df3510c

Please sign in to comment.