Skip to content

Commit

Permalink
fix(builder): capture results of sender handles and log on failures
Browse files Browse the repository at this point in the history
  • Loading branch information
dancoombs committed Sep 21, 2023
1 parent 68e48da commit 39aa5f3
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 18 deletions.
10 changes: 5 additions & 5 deletions crates/builder/src/bundle_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::{

#[async_trait]
pub(crate) trait BundleSender: Send + Sync + 'static {
async fn send_bundles_in_loop(&mut self);
async fn send_bundles_in_loop(self) -> anyhow::Result<()>;
}

#[derive(Debug)]
Expand Down Expand Up @@ -99,10 +99,10 @@ where
/// Loops forever, attempting to form and send a bundle on each new block,
/// then waiting for one bundle to be mined or dropped before forming the
/// next one.
async fn send_bundles_in_loop(&mut self) {
async fn send_bundles_in_loop(mut self) -> anyhow::Result<()> {
let Ok(mut new_heads) = self.pool.subscribe_new_heads().await else {
error!("Failed to subscribe to new blocks");
return;
bail!("failed to subscribe to new blocks");
};

// The new_heads stream can buffer up multiple blocks, but we only want to consume the latest one.
Expand Down Expand Up @@ -147,7 +147,7 @@ where
Some(b) => b,
None => {
error!("Block stream closed");
return;
bail!("Block stream closed");
}
};
// Consume any other blocks that may have been buffered up
Expand All @@ -161,7 +161,7 @@ where
}
Err(mpsc::error::TryRecvError::Disconnected) => {
error!("Block stream closed");
return;
bail!("Block stream closed");
}
}
}
Expand Down
33 changes: 20 additions & 13 deletions crates/builder/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,19 @@ use ethers::{
types::{Address, H256},
};
use ethers_signers::Signer;
use futures::future;
use futures_util::TryFutureExt;
use rundler_pool::PoolServer;
use rundler_sim::{
MempoolConfig, PriorityFeeMode, SimulateValidationTracerImpl, SimulationSettings, SimulatorImpl,
};
use rundler_task::Task;
use rundler_types::contracts::i_entry_point::IEntryPoint;
use rundler_utils::{
emit::WithEntryPoint,
eth,
handle::{self, SpawnGuard},
};
use rundler_utils::{emit::WithEntryPoint, eth, handle};
use rusoto_core::Region;
use tokio::{
sync::{broadcast, mpsc},
task::JoinHandle,
time, try_join,
};
use tokio_util::sync::CancellationToken;
Expand Down Expand Up @@ -123,7 +122,7 @@ where
let provider = eth::new_provider(&self.args.rpc_url, self.args.eth_poll_interval)?;
let manual_bundling_mode = Arc::new(AtomicBool::new(false));

let mut spawn_guards = vec![];
let mut sender_handles = vec![];
let mut send_bundle_txs = vec![];
for i in 0..self.args.num_bundle_builders {
let (spawn_guard, send_bundle_tx) = self
Expand All @@ -133,9 +132,15 @@ where
Arc::clone(&provider),
)
.await?;
spawn_guards.push(spawn_guard);
sender_handles.push(spawn_guard);
send_bundle_txs.push(send_bundle_tx);
}
// flatten the senders handles to one handle, short-circuit on errors
let sender_handle = tokio::spawn(
future::try_join_all(sender_handles)
.map_ok(|_| ())
.map_err(|e| anyhow::anyhow!(e)),
);

let builder_handle = self.builder_builder.get_handle();
let builder_runnder_handle = self.builder_builder.run(
Expand All @@ -161,6 +166,7 @@ where
info!("Started bundle builder");

match try_join!(
handle::flatten_handle(sender_handle),
handle::flatten_handle(builder_runnder_handle),
handle::flatten_handle(remote_handle),
) {
Expand Down Expand Up @@ -205,7 +211,10 @@ where
index: u64,
manual_bundling_mode: Arc<AtomicBool>,
provider: Arc<Provider<C>>,
) -> anyhow::Result<(SpawnGuard, mpsc::Sender<SendBundleRequest>)> {
) -> anyhow::Result<(
JoinHandle<anyhow::Result<()>>,
mpsc::Sender<SendBundleRequest>,
)> {
let (send_bundle_tx, send_bundle_rx) = mpsc::channel(1);

let signer = if let Some(pk) = &self.args.private_key {
Expand Down Expand Up @@ -297,7 +306,7 @@ where
proposer_settings,
self.event_sender.clone(),
);
let mut builder = BundleSenderImpl::new(
let builder = BundleSenderImpl::new(
index,
manual_bundling_mode.clone(),
send_bundle_rx,
Expand All @@ -312,9 +321,7 @@ where
self.event_sender.clone(),
);

Ok((
SpawnGuard::spawn_with_guard(async move { builder.send_bundles_in_loop().await }),
send_bundle_tx,
))
// Spawn each sender as its own independent task
Ok((tokio::spawn(builder.send_bundles_in_loop()), send_bundle_tx))
}
}

0 comments on commit 39aa5f3

Please sign in to comment.