diff --git a/src/builder/task.rs b/src/builder/task.rs
index cd7f7ffef..4d5e7e05a 100644
--- a/src/builder/task.rs
+++ b/src/builder/task.rs
@@ -1,10 +1,7 @@
 use std::{collections::HashMap, sync::Arc, time::Duration};
 
 use anyhow::{bail, Context};
-use ethers::{
-    providers::{Http, HttpRateLimitRetryPolicy, Provider, RetryClient, RetryClientBuilder},
-    types::{Address, H256},
-};
+use ethers::types::{Address, H256};
 use ethers_signers::Signer;
 use rusoto_core::Region;
 use tokio::{select, sync::broadcast, time};
@@ -14,7 +11,6 @@ use tonic::{
     transport::{Channel, Server},
 };
 use tracing::info;
-use url::Url;
 
 use crate::{
     builder::{
@@ -29,6 +25,7 @@ use crate::{
     common::{
         contracts::i_entry_point::IEntryPoint,
         emit::WithEntryPoint,
+        eth,
         gas::PriorityFeeMode,
         handle::{SpawnGuard, Task},
         mempool::MempoolConfig,
@@ -81,7 +78,7 @@ impl Task for BuilderTask {
         info!("Starting builder server on {}", addr);
         tracing::info!("Mempool config: {:?}", self.args.mempool_configs);
 
-        let provider = new_provider(&self.args.rpc_url, self.args.eth_poll_interval)?;
+        let provider = eth::new_provider(&self.args.rpc_url, self.args.eth_poll_interval)?;
         let signer = if let Some(pk) = &self.args.private_key {
             info!("Using local signer");
             BundlerSigner::Local(
@@ -139,7 +136,8 @@ impl Task for BuilderTask {
             proposer_settings,
             self.event_sender.clone(),
         );
-        let submit_provider = new_provider(&self.args.submit_url, self.args.eth_poll_interval)?;
+        let submit_provider =
+            eth::new_provider(&self.args.submit_url, self.args.eth_poll_interval)?;
         let transaction_sender = get_sender(
             submit_provider,
             signer,
@@ -240,19 +238,3 @@ impl BuilderTask {
         }
     }
 }
-
-fn new_provider(
-    url: &str,
-    poll_interval: Duration,
-) -> anyhow::Result<Arc<Provider<RetryClient<Http>>>> {
-    let parsed_url = Url::parse(url).context("provider url should be a valid")?;
-    let http = Http::new(parsed_url);
-    let client = RetryClientBuilder::default()
-        // these retries are if the server returns a 429
-        .rate_limit_retries(10)
-        // these retries are if the connection is dubious
-        .timeout_retries(3)
-        .initial_backoff(Duration::from_millis(500))
-        .build(http, Box::<HttpRateLimitRetryPolicy>::default());
-    Ok(Arc::new(Provider::new(client).interval(poll_interval)))
-}
diff --git a/src/cli/builder.rs b/src/cli/builder.rs
index 2463aa58e..7fef9d4df 100644
--- a/src/cli/builder.rs
+++ b/src/cli/builder.rs
@@ -159,10 +159,7 @@ impl BuilderArgs {
             common.priority_fee_mode_value,
         )?;
 
-        let rpc_url = common
-            .node_http
-            .clone()
-            .context("should have a node HTTP URL")?;
+        let rpc_url = common.node_http.clone();
         let submit_url = self.submit_url.clone().unwrap_or_else(|| rpc_url.clone());
 
         let mempool_configs = match &common.mempool_config_path {
diff --git a/src/cli/mod.rs b/src/cli/mod.rs
index 231dbe46b..7d91a8d20 100644
--- a/src/cli/mod.rs
+++ b/src/cli/mod.rs
@@ -100,10 +100,6 @@ pub struct CommonArgs {
     )]
     chain_id: u64,
 
-    /// ETH Node websocket URL to connect to
-    #[arg(long = "node_ws", name = "node_ws", env = "NODE_WS", global = true)]
-    node_ws: Option<String>,
-
     /// ETH Node HTTP URL to connect to
     #[arg(
         long = "node_http",
@@ -111,7 +107,7 @@ pub struct CommonArgs {
         env = "NODE_HTTP",
         global = true
     )]
-    node_http: Option<String>,
+    node_http: String,
 
     #[arg(
         long = "max_verification_gas",
diff --git a/src/cli/pool.rs b/src/cli/pool.rs
index 65a29c387..307ffb340 100644
--- a/src/cli/pool.rs
+++ b/src/cli/pool.rs
@@ -2,6 +2,7 @@ use std::time::Duration;
 
 use anyhow::Context;
 use clap::Args;
+use ethers::types::Chain;
 use tokio::sync::broadcast;
 
 use super::CommonArgs;
@@ -83,6 +84,13 @@ pub struct PoolArgs {
"POOL_ALLOWLIST_PATH" )] pub allowlist_path: Option, + + #[arg( + long = "pool.chain_history_size", + name = "pool.chain_history_size", + env = "POOL_CHAIN_HISTORY_SIZE" + )] + pub chain_history_size: Option, } impl PoolArgs { @@ -122,7 +130,9 @@ impl PoolArgs { port: self.port, host: self.host.clone(), chain_id: common.chain_id, - ws_url: common.node_ws.clone(), + chain_history_size: self + .chain_history_size + .unwrap_or_else(|| default_chain_history_size(common.chain_id)), http_url: common.node_http.clone(), http_poll_interval: Duration::from_millis(self.http_poll_interval_millis), pool_configs, @@ -130,6 +140,21 @@ impl PoolArgs { } } +const SMALL_HISTORY_SIZE: u64 = 16; +const LARGE_HISTORY_SIZE: u64 = 128; + +// Mainnets that are known to not have large reorgs can use the small history +// size. Use the large history size for all testnets because I don't trust them. +const SMALL_HISTORY_CHAIN_IDS: &[u64] = &[Chain::Mainnet as u64, Chain::Arbitrum as u64]; + +fn default_chain_history_size(chain_id: u64) -> u64 { + if SMALL_HISTORY_CHAIN_IDS.contains(&chain_id) { + SMALL_HISTORY_SIZE + } else { + LARGE_HISTORY_SIZE + } +} + /// CLI options for the Pool server standalone #[derive(Args, Debug)] pub struct PoolCliArgs { diff --git a/src/cli/rpc.rs b/src/cli/rpc.rs index 98af54455..741286864 100644 --- a/src/cli/rpc.rs +++ b/src/cli/rpc.rs @@ -98,10 +98,7 @@ impl RpcArgs { .map(|ep| ep.parse()) .collect::, _>>() .context("Invalid entry_points argument")?, - rpc_url: common - .node_http - .clone() - .context("rpc requires node_http arg")?, + rpc_url: common.node_http.clone(), chain_id: common.chain_id, api_namespaces: apis, precheck_settings, diff --git a/src/common/block_watcher.rs b/src/common/block_watcher.rs new file mode 100644 index 000000000..e0252ae25 --- /dev/null +++ b/src/common/block_watcher.rs @@ -0,0 +1,24 @@ +use std::time::Duration; + +use tokio::time; + +use crate::common::{retry, retry::UnlimitedRetryOpts, types::ProviderLike}; + +pub async fn wait_for_new_block_number( + provider: &impl ProviderLike, + last_block_number: u64, + poll_interval: Duration, +) -> u64 { + loop { + let block_number = retry::with_unlimited_retries( + "watch latest block number", + || provider.get_block_number(), + UnlimitedRetryOpts::default(), + ) + .await; + if last_block_number < block_number { + return block_number; + } + time::sleep(poll_interval).await; + } +} diff --git a/src/common/eth.rs b/src/common/eth.rs index 938434141..dedf0c0a6 100644 --- a/src/common/eth.rs +++ b/src/common/eth.rs @@ -1,17 +1,37 @@ -use std::{error, future::Future, ops::Deref, sync::Arc}; +use std::{error, future::Future, ops::Deref, sync::Arc, time::Duration}; use anyhow::Context; use ethers::{ abi::{AbiDecode, AbiEncode, RawLog}, contract::{builders::ContractCall, Contract, ContractDeployer, ContractError}, - providers::{JsonRpcClient, Middleware, PendingTransaction, Provider, ProviderError}, + providers::{ + Http, HttpRateLimitRetryPolicy, JsonRpcClient, Middleware, PendingTransaction, Provider, + ProviderError, RetryClient, RetryClientBuilder, + }, types::{ Address, BlockId, Bytes, Eip1559TransactionRequest, Log, Selector, TransactionReceipt, H256, }, }; +use url::Url; use crate::common::contracts::get_code_hashes::{CodeHashesResult, GETCODEHASHES_BYTECODE}; +pub fn new_provider( + url: &str, + poll_interval: Duration, +) -> anyhow::Result>>> { + let parsed_url = Url::parse(url).context("provider url should be valid")?; + let http = Http::new(parsed_url); + let client = RetryClientBuilder::default() + // 
diff --git a/src/common/handle.rs b/src/common/handle.rs
index f933ce854..f3f4b7654 100644
--- a/src/common/handle.rs
+++ b/src/common/handle.rs
@@ -12,9 +12,7 @@ use tracing::{error, info};
 ///
 /// Flattens the two types of errors that can occur when awaiting a handle.
 /// Useful when using tokio::try_join! to await multiple handles.
-pub async fn flatten_handle<T>(
-    handle: JoinHandle<Result<T, anyhow::Error>>,
-) -> Result<T, anyhow::Error> {
+pub async fn flatten_handle<T>(handle: JoinHandle<anyhow::Result<T>>) -> anyhow::Result<T> {
     match handle.await {
         Ok(Ok(result)) => Ok(result),
         Ok(Err(err)) => Err(err)?,
@@ -22,6 +20,13 @@ pub async fn flatten_handle<T>(
     }
 }
 
+/// Converts a JoinHandle result into an `anyhow::Result`. Like
+/// `flatten_handle`, useful when using `tokio::try_join!` to await multiple
+/// handles.
+pub async fn as_anyhow_handle<T>(handle: JoinHandle<T>) -> anyhow::Result<T> {
+    handle.await.context("handling failed")
+}
+
 /// A guard that aborts a spawned task when dropped.
 #[derive(Debug)]
 pub struct SpawnGuard(AbortHandle);
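
Note (editor's illustration, not part of the patch): the doc comments above motivate pairing these helpers in a single `tokio::try_join!`. A sketch, assuming both handles come from `tokio::spawn`:

    use tokio::task::JoinHandle;

    use crate::common::handle::{as_anyhow_handle, flatten_handle};

    async fn join_both(
        fallible: JoinHandle<anyhow::Result<()>>,
        infallible: JoinHandle<()>,
    ) -> anyhow::Result<()> {
        // Both arms now yield anyhow::Result<_>, so one try_join! awaits them together.
        tokio::try_join!(flatten_handle(fallible), as_anyhow_handle(infallible))?;
        Ok(())
    }
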
diff --git a/src/common/mod.rs b/src/common/mod.rs
index 30104d5f6..66e976e19 100644
--- a/src/common/mod.rs
+++ b/src/common/mod.rs
@@ -1,3 +1,4 @@
+pub mod block_watcher;
 pub mod context;
 pub mod contracts;
 pub mod dev;
@@ -10,6 +11,7 @@ pub mod math;
 pub mod mempool;
 pub mod precheck;
 pub mod protos;
+pub mod retry;
 pub mod server;
 pub mod simulation;
 pub mod strs;
diff --git a/src/common/retry.rs b/src/common/retry.rs
new file mode 100644
index 000000000..4f4923f6b
--- /dev/null
+++ b/src/common/retry.rs
@@ -0,0 +1,89 @@
+use std::{future::Future, time::Duration};
+
+use rand::Rng;
+use tokio::time;
+use tracing::warn;
+
+#[derive(Clone, Copy, Debug)]
+pub struct RetryOpts {
+    pub max_attempts: u64,
+    /// The first retry is immediately after the first failure (plus jitter).
+    /// The next retry after that will wait this long.
+    pub min_nonzero_wait: Duration,
+    pub max_wait: Duration,
+    pub max_jitter: Duration,
+}
+
+impl Default for RetryOpts {
+    fn default() -> Self {
+        UnlimitedRetryOpts::default().to_retry_opts_with_max_attempts(10)
+    }
+}
+
+pub async fn with_retries<Func, Fut, Out, Err>(
+    description: &str,
+    func: Func,
+    opts: RetryOpts,
+) -> Result<Out, Err>
+where
+    Func: Fn() -> Fut,
+    Fut: Future<Output = Result<Out, Err>>,
+{
+    let mut next_wait = Duration::ZERO;
+    let mut last_error: Option<Err> = None;
+    for attempt_number in 1..opts.max_attempts + 1 {
+        match func().await {
+            Ok(out) => return Ok(out),
+            Err(error) => {
+                last_error = Some(error);
+                warn!("Failed to {description} (attempt {attempt_number})");
+            }
+        }
+        // Grab a new rng each iteration because we can't hold it across awaits.
+        let jitter = rand::thread_rng().gen_range(Duration::ZERO..opts.max_jitter);
+        time::sleep(next_wait + jitter).await;
+        next_wait = (2 * next_wait).clamp(opts.min_nonzero_wait, opts.max_wait);
+    }
+    Err(last_error.unwrap())
+}
+
+#[derive(Clone, Copy, Debug)]
+pub struct UnlimitedRetryOpts {
+    pub min_nonzero_wait: Duration,
+    pub max_wait: Duration,
+    pub max_jitter: Duration,
+}
+
+impl Default for UnlimitedRetryOpts {
+    fn default() -> Self {
+        Self {
+            min_nonzero_wait: Duration::from_secs(1),
+            max_wait: Duration::from_secs(10),
+            max_jitter: Duration::from_secs(1),
+        }
+    }
+}
+
+impl UnlimitedRetryOpts {
+    fn to_retry_opts_with_max_attempts(self, max_attempts: u64) -> RetryOpts {
+        RetryOpts {
+            max_attempts,
+            min_nonzero_wait: self.min_nonzero_wait,
+            max_wait: self.max_wait,
+            max_jitter: self.max_jitter,
+        }
+    }
+}
+
+pub async fn with_unlimited_retries<Func, Fut, Out, Err>(
+    description: &str,
+    func: Func,
+    opts: UnlimitedRetryOpts,
+) -> Out
+where
+    Func: Fn() -> Fut,
+    Fut: Future<Output = Result<Out, Err>>,
+{
+    let opts = opts.to_retry_opts_with_max_attempts(u64::MAX);
+    with_retries(description, func, opts).await.ok().unwrap()
+}
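
Note (editor's illustration, not part of the patch): `block_watcher` above shows the unlimited form; a bounded call looks like this, with the description string and option values as placeholders. The first retry fires immediately (plus jitter); later waits double from `min_nonzero_wait` up to `max_wait`:

    use std::time::Duration;

    use crate::common::{retry, retry::RetryOpts, types::ProviderLike};

    async fn fetch_with_backoff(provider: &impl ProviderLike) -> anyhow::Result<u64> {
        retry::with_retries(
            "fetch latest block number",
            || provider.get_block_number(),
            RetryOpts {
                max_attempts: 5,
                min_nonzero_wait: Duration::from_secs(1),
                max_wait: Duration::from_secs(10),
                max_jitter: Duration::from_secs(1),
            },
        )
        .await
    }
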
.context("should get max priority fee from provider") + } + + async fn get_logs(&self, filter: &Filter) -> anyhow::Result> { + Middleware::get_logs(self, filter) .await - .context("should get max priority fee from provider")?) + .context("provider should get logs") } async fn aggregate_signatures( diff --git a/src/op_pool/chain.rs b/src/op_pool/chain.rs new file mode 100644 index 000000000..245c0a558 --- /dev/null +++ b/src/op_pool/chain.rs @@ -0,0 +1,400 @@ +use std::{collections::VecDeque, ops::RangeInclusive, sync::Arc, time::Duration}; + +use anyhow::{ensure, Context}; +use ethers::{ + contract, + prelude::EthEvent, + types::{Address, Block, Filter, H256, U256}, +}; +use futures::future; +use tokio::{ + select, + sync::{broadcast, Semaphore}, + task::JoinHandle, +}; +use tokio_util::sync::CancellationToken; +use tracing::{error, info, warn}; + +use crate::common::{ + block_watcher, contracts::i_entry_point::UserOperationEventFilter, types::ProviderLike, +}; + +const MAX_LOAD_OPS_CONCURRENCY: usize = 16; + +/// A data structure that holds the currently known recent state of the chain, +/// with logic for updating itself and returning what has changed. +/// +/// Will update itself when `.sync_to_block_number` is called, at which point it +/// will query a node to determine the new state of the chain. +#[derive(Debug)] +pub struct Chain { + provider: Arc

, + settings: Settings, + /// Blocks are stored from earliest to latest, so the oldest block is at the + /// front of this deque and the newest at the back. + blocks: VecDeque, + /// Semaphore to limit the number of concurrent `eth_getLogs` calls. + load_ops_semaphore: Semaphore, +} + +#[derive(Debug)] +pub struct ChainUpdate { + pub latest_block_number: u64, + pub latest_block_hash: H256, + /// Blocks before this number are no longer tracked in this `Chain`, so no + /// further updates related to them will be sent. + pub earliest_remembered_block_number: u64, + pub reorg_depth: u64, + pub mined_ops: Vec, + pub unmined_ops: Vec, +} + +#[derive(Clone, Copy, Debug)] +pub struct MinedOp { + pub hash: H256, + pub entry_point: Address, + pub sender: Address, + pub nonce: U256, +} + +#[derive(Debug)] +pub struct Settings { + pub history_size: u64, + pub max_backfill: u64, + pub poll_interval: Duration, + pub entry_point_addresses: Vec

, +} + +#[derive(Debug)] +struct BlockSummary { + pub number: u64, + pub hash: H256, + pub parent_hash: H256, + pub ops: Vec, +} + +impl Chain

{ + pub fn new(provider: Arc

, settings: Settings) -> Self { + let history_size = settings.history_size as usize; + assert!(history_size > 0, "history size should be positive"); + Self { + provider, + settings, + blocks: VecDeque::new(), + load_ops_semaphore: Semaphore::new(MAX_LOAD_OPS_CONCURRENCY), + } + } + + pub fn spawn_watcher( + mut self, + sender: broadcast::Sender>, + shutdown_token: CancellationToken, + ) -> JoinHandle<()> { + tokio::spawn(async move { + loop { + select! { + update = self.wait_for_update() => { + let _ = sender.send(Arc::new(update)); + } + _ = shutdown_token.cancelled() => { + info!("Shutting down chain watcher"); + break; + } + } + } + }) + } + + async fn wait_for_update(&mut self) -> ChainUpdate { + let mut block_number = self + .blocks + .back() + .map(|block| block.number) + .unwrap_or_default(); + loop { + block_number = block_watcher::wait_for_new_block_number( + &*self.provider, + block_number, + self.settings.poll_interval, + ) + .await; + let update = self.sync_to_block_number(block_number).await; + match update { + Ok(update) => return update, + Err(error) => { + error!("Failed to update chain at block {block_number}. Will try again at next block. {error:?}"); + } + } + } + } + + pub async fn sync_to_block_number( + &mut self, + new_block_number: u64, + ) -> anyhow::Result { + let Some(latest_block) = self.blocks.back() else { + return self.reset_and_initialize(new_block_number).await; + }; + let current_block_number = latest_block.number; + ensure!( + current_block_number < new_block_number, + "provided new block number should be ahead of latest number known by chain" + ); + if current_block_number + self.settings.max_backfill < new_block_number { + warn!( + "New block {new_block_number} number is {} blocks ahead of the previously known head. Chain history will skip ahead.", + new_block_number - current_block_number, + ); + return self.reset_and_initialize(new_block_number).await; + } + let added_blocks = self + .load_added_blocks_connecting_to_existing_chain(current_block_number, new_block_number) + .await?; + Ok(self.update_with_blocks(current_block_number, added_blocks)) + } + + async fn reset_and_initialize(&mut self, new_block_number: u64) -> anyhow::Result { + let min_block_number = new_block_number.saturating_sub(self.settings.history_size - 1); + let mut blocks = self + .load_blocks_in_range_no_ops(min_block_number..=new_block_number) + .await?; + self.load_ops_into_block_summaries(&mut blocks).await?; + self.blocks = blocks; + let mined_ops: Vec<_> = self + .blocks + .iter() + .flat_map(|block| &block.ops) + .copied() + .collect(); + Ok(self.new_update(0, mined_ops, vec![])) + } + + /// Given a collection of blocks to add to the chain, whose numbers may + /// overlap the current numbers in the case of reorgs, update the state of + /// this data structure and return an update struct. 
+ fn update_with_blocks( + &mut self, + current_block_number: u64, + added_blocks: VecDeque, + ) -> ChainUpdate { + let mined_ops: Vec<_> = added_blocks + .iter() + .flat_map(|block| &block.ops) + .copied() + .collect(); + let reorg_depth = current_block_number + 1 - added_blocks[0].number; + let unmined_ops: Vec<_> = self + .blocks + .iter() + .skip(self.blocks.len() - reorg_depth as usize) + .flat_map(|block| &block.ops) + .copied() + .collect(); + for _ in 0..reorg_depth { + self.blocks.pop_back(); + } + self.blocks.extend(added_blocks); + while self.blocks.len() > self.settings.history_size as usize { + self.blocks.pop_front(); + } + self.new_update(reorg_depth, mined_ops, unmined_ops) + } + + async fn load_added_blocks_connecting_to_existing_chain( + &self, + current_block_number: u64, + new_block_number: u64, + ) -> anyhow::Result> { + // Load blocks from last known number to current. + let mut added_blocks = self + .load_blocks_in_range_no_ops((current_block_number + 1)..=new_block_number) + .await + .context("chain should load blocks from last processed to latest block")?; + ensure!( + !added_blocks.is_empty(), + "added blocks should never be empty" + ); + // Continue to load blocks backwards until we connect with the known chain, if necessary. + loop { + let earliest_new_block = &added_blocks[0]; + if earliest_new_block.number == 0 { + break; + } + let Some(presumed_parent) = self.block_with_number(earliest_new_block.number - 1) + else { + warn!( + "Reorg is deeper than chain history size ({})", + self.blocks.len() + ); + break; + }; + if presumed_parent.hash == earliest_new_block.parent_hash { + break; + } + // The earliest newly loaded block's parent does not match the known + // chain, so continue to load blocks backwards, replacing the known + // chain, until it does. + let block = self + .provider + .get_block(earliest_new_block.parent_hash) + .await + .context("should load parent block when handling reorg")? + .context("block with parent hash of known block should exist")?; + let block = + BlockSummary::try_from_block_without_ops(block, earliest_new_block.number - 1)?; + added_blocks.push_front(block); + } + self.load_ops_into_block_summaries(&mut added_blocks) + .await?; + Ok(added_blocks) + } + + async fn load_blocks_in_range_no_ops( + &self, + range: RangeInclusive, + ) -> anyhow::Result> { + // Load blocks back from latest one at a time following parent hashes. + // While it's tempting to try to load in bulk, doing so makes it + // difficult to handle reorgs that occur in the middle of loading. + let num_blocks_to_load = range_len(&range); + if num_blocks_to_load == 0 { + return Ok(VecDeque::new()); + } + let mut blocks = VecDeque::with_capacity(num_blocks_to_load); + let block = self + .provider + .get_block(*range.end()) + .await? + .context("latest block in range should exist")?; + blocks.push_front(BlockSummary::try_from_block_without_ops( + block, + *range.end(), + )?); + while blocks.len() < num_blocks_to_load { + let parent_hash = blocks[0].parent_hash; + let parent = self + .provider + .get_block(parent_hash) + .await + .context("should load ")? + .context("block with parent hash of known block should exist")?; + blocks.push_front(BlockSummary::try_from_block_without_ops( + parent, + blocks[0].number - 1, + )?); + } + Ok(blocks) + } + + async fn load_ops_into_block_summaries( + &self, + blocks: &mut VecDeque, + ) -> anyhow::Result<()> { + // As when loading blocks, load op events block-by-block, specifying + // block hash. 
Don't load with a single call by block number range + // because if the network is in the middle of a reorg, then we can't + // tell which branch we read events from. + let future_opses = blocks + .iter() + .map(|block| self.load_ops_in_block_with_hash(block.hash)); + let opses = future::try_join_all(future_opses) + .await + .context("should load ops for new blocks")?; + for (i, ops) in opses.into_iter().enumerate() { + blocks[i].ops = ops; + } + Ok(()) + } + + async fn load_ops_in_block_with_hash(&self, block_hash: H256) -> anyhow::Result> { + let _permit = self + .load_ops_semaphore + .acquire() + .await + .expect("semaphore should not be closed"); + let filter = Filter::new() + .address(self.settings.entry_point_addresses.clone()) + .event(&UserOperationEventFilter::abi_signature()) + .at_block_hash(block_hash); + let logs = self + .provider + .get_logs(&filter) + .await + .context("chain state should load user operation events")?; + logs.into_iter() + .map(|log| { + let entry_point = log.address; + let event = contract::parse_log::(log)?; + Ok(MinedOp { + hash: event.user_op_hash.into(), + entry_point, + sender: event.sender, + nonce: event.nonce, + }) + }) + .collect() + } + + fn block_with_number(&self, number: u64) -> Option<&BlockSummary> { + let earliest_number = self.blocks.front()?.number; + if number < earliest_number { + return None; + } + self.blocks.get((number - earliest_number) as usize) + } + + fn new_update( + &self, + reorg_depth: u64, + mined_ops: Vec, + unmined_ops: Vec, + ) -> ChainUpdate { + let latest_block = self + .blocks + .back() + .expect("new_update should not be called when blocks is empty"); + ChainUpdate { + latest_block_number: latest_block.number, + latest_block_hash: latest_block.hash, + earliest_remembered_block_number: self.blocks[0].number, + reorg_depth, + mined_ops, + unmined_ops, + } + } +} + +impl BlockSummary { + /// Converts a block returned from a provider into a `BlockSummary` with no + /// ops. Takes an expected block number and returns an error if it doesn't + /// match the block. While a provider should never return a block number + /// that doesn't match what we expect, if the provider does return bad data + /// it's better to catch it now than run into panics from bad indexing math + /// later. + fn try_from_block_without_ops( + block: Block, + expected_block_number: u64, + ) -> anyhow::Result { + let number = block + .number + .context("block number should be present")? + .as_u64(); + ensure!( + number == expected_block_number, + "block number {number} should match expected {expected_block_number}" + ); + Ok(Self { + number: block + .number + .context("block number should be present")? 
+ .as_u64(), + hash: block.hash.context("block hash should exist")?, + parent_hash: block.parent_hash, + ops: Vec::new(), + }) + } +} + +fn range_len(range: &RangeInclusive) -> usize { + (*range.end() - *range.start() + 1) as usize +} diff --git a/src/op_pool/event/http.rs b/src/op_pool/event/http.rs deleted file mode 100644 index d042f1bf9..000000000 --- a/src/op_pool/event/http.rs +++ /dev/null @@ -1,189 +0,0 @@ -use std::time::Duration; - -use ethers::{ - providers::{ - Http, HttpRateLimitRetryPolicy, Middleware, Provider, RetryClientBuilder, StreamExt, - }, - types::{Block, Filter, Log, H256}, -}; -use tokio::{ - select, - sync::{mpsc, oneshot}, - try_join, -}; -use tokio_stream::wrappers::ReceiverStream; -use tonic::async_trait; -use tracing::error; -use url::Url; - -use super::{BlockProvider, BlockProviderError, BlockProviderFactory, BlockWithLogs}; - -/// A block provider factory -#[derive(Debug)] -pub struct HttpBlockProviderFactory { - http_url: String, - poll_interval: Duration, - num_retries: u64, -} - -impl HttpBlockProviderFactory { - pub fn new(http_url: String, poll_interval: Duration, num_retries: u64) -> Self { - Self { - http_url, - poll_interval, - num_retries, - } - } -} - -#[async_trait] -impl BlockProviderFactory for HttpBlockProviderFactory { - type Provider = HttpBlockProvider; - - fn new_provider(&self) -> Self::Provider { - HttpBlockProvider::new(self.http_url.clone(), self.poll_interval, self.num_retries) - } -} - -/// An http provider that uses ethers to stream blocks from a node's http endpoint -/// Polls the node for new blocks at a given interval -#[derive(Debug)] -pub struct HttpBlockProvider { - http_url: String, - poll_interval: Duration, - num_retries: u64, - shutdown_sender: Option>, -} - -impl HttpBlockProvider { - pub fn new(http_url: String, poll_interval: Duration, num_retries: u64) -> Self { - Self { - http_url, - poll_interval, - num_retries, - shutdown_sender: None, - } - } -} - -#[async_trait] -impl BlockProvider for HttpBlockProvider { - type BlockStream = ReceiverStream>; - - async fn subscribe(&mut self, filter: Filter) -> Result { - if self.shutdown_sender.is_some() { - return Err(BlockProviderError::ConnectionError( - "already subscribed".to_string(), - )); - } - - let parsed_url = Url::parse(&self.http_url) - .map_err(|e| BlockProviderError::ConnectionError(e.to_string()))?; - let num_retries = self.num_retries.try_into().map_err(|_| { - BlockProviderError::ConnectionError(format!( - "num_retries {} is too large", - self.num_retries - )) - })?; - let http = Http::new(parsed_url); - let client = RetryClientBuilder::default() - .rate_limit_retries(num_retries) - .timeout_retries(num_retries) - .initial_backoff(self.poll_interval) - .build(http, Box::::default()); - let mut provider = Provider::new(client); - provider.set_interval(self.poll_interval); - - // test the connection - let _ = provider - .get_block_number() - .await - .map_err(|err| BlockProviderError::ConnectionError(err.to_string()))?; - - let (tx, rx) = mpsc::channel(10_000); - let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); - self.shutdown_sender = Some(shutdown_tx); - - tokio::spawn(async move { - let mut block_stream = match provider.watch_blocks().await { - Ok(stream) => stream, - Err(err) => { - tracing::error!("Error subscribing to blocks: {:?}", err); - let _ = tx - .send(Err(BlockProviderError::ConnectionError(err.to_string()))) - .await; - return; - } - }; - - loop { - select! 
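
Note (editor's illustration, not part of the patch): how the pieces above are meant to fit together. The settings values are placeholders, `provider` is any `ProviderLike`, and since `chain` is a private module this would live inside `op_pool`:

    use std::{sync::Arc, time::Duration};

    use ethers::types::Address;
    use tokio::sync::broadcast;
    use tokio_util::sync::CancellationToken;

    use crate::{
        common::types::ProviderLike,
        op_pool::chain::{Chain, ChainUpdate, Settings},
    };

    fn watch_chain<P: ProviderLike>(provider: Arc<P>, entry_point: Address) {
        let chain = Chain::new(
            provider,
            Settings {
                history_size: 128,   // how many recent blocks to remember
                max_backfill: 1_000, // skip ahead rather than backfill past this
                poll_interval: Duration::from_millis(100),
                entry_point_addresses: vec![entry_point],
            },
        );
        let (tx, mut rx) = broadcast::channel::<Arc<ChainUpdate>>(1_000);
        let _handle = chain.spawn_watcher(tx, CancellationToken::new());
        tokio::spawn(async move {
            while let Ok(update) = rx.recv().await {
                // A nonzero reorg_depth means unmined_ops should be returned
                // to the pool.
                tracing::info!(
                    "block {}: {} mined, {} unmined, reorg depth {}",
                    update.latest_block_number,
                    update.mined_ops.len(),
                    update.unmined_ops.len(),
                    update.reorg_depth,
                );
            }
        });
    }
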
diff --git a/src/op_pool/event/http.rs b/src/op_pool/event/http.rs
deleted file mode 100644
index d042f1bf9..000000000
--- a/src/op_pool/event/http.rs
+++ /dev/null
@@ -1,189 +0,0 @@
-use std::time::Duration;
-
-use ethers::{
-    providers::{
-        Http, HttpRateLimitRetryPolicy, Middleware, Provider, RetryClientBuilder, StreamExt,
-    },
-    types::{Block, Filter, Log, H256},
-};
-use tokio::{
-    select,
-    sync::{mpsc, oneshot},
-    try_join,
-};
-use tokio_stream::wrappers::ReceiverStream;
-use tonic::async_trait;
-use tracing::error;
-use url::Url;
-
-use super::{BlockProvider, BlockProviderError, BlockProviderFactory, BlockWithLogs};
-
-/// A block provider factory
-#[derive(Debug)]
-pub struct HttpBlockProviderFactory {
-    http_url: String,
-    poll_interval: Duration,
-    num_retries: u64,
-}
-
-impl HttpBlockProviderFactory {
-    pub fn new(http_url: String, poll_interval: Duration, num_retries: u64) -> Self {
-        Self {
-            http_url,
-            poll_interval,
-            num_retries,
-        }
-    }
-}
-
-#[async_trait]
-impl BlockProviderFactory for HttpBlockProviderFactory {
-    type Provider = HttpBlockProvider;
-
-    fn new_provider(&self) -> Self::Provider {
-        HttpBlockProvider::new(self.http_url.clone(), self.poll_interval, self.num_retries)
-    }
-}
-
-/// An http provider that uses ethers to stream blocks from a node's http endpoint
-/// Polls the node for new blocks at a given interval
-#[derive(Debug)]
-pub struct HttpBlockProvider {
-    http_url: String,
-    poll_interval: Duration,
-    num_retries: u64,
-    shutdown_sender: Option<oneshot::Sender<()>>,
-}
-
-impl HttpBlockProvider {
-    pub fn new(http_url: String, poll_interval: Duration, num_retries: u64) -> Self {
-        Self {
-            http_url,
-            poll_interval,
-            num_retries,
-            shutdown_sender: None,
-        }
-    }
-}
-
-#[async_trait]
-impl BlockProvider for HttpBlockProvider {
-    type BlockStream = ReceiverStream<Result<BlockWithLogs, BlockProviderError>>;
-
-    async fn subscribe(&mut self, filter: Filter) -> Result<Self::BlockStream, BlockProviderError> {
-        if self.shutdown_sender.is_some() {
-            return Err(BlockProviderError::ConnectionError(
-                "already subscribed".to_string(),
-            ));
-        }
-
-        let parsed_url = Url::parse(&self.http_url)
-            .map_err(|e| BlockProviderError::ConnectionError(e.to_string()))?;
-        let num_retries = self.num_retries.try_into().map_err(|_| {
-            BlockProviderError::ConnectionError(format!(
-                "num_retries {} is too large",
-                self.num_retries
-            ))
-        })?;
-        let http = Http::new(parsed_url);
-        let client = RetryClientBuilder::default()
-            .rate_limit_retries(num_retries)
-            .timeout_retries(num_retries)
-            .initial_backoff(self.poll_interval)
-            .build(http, Box::<HttpRateLimitRetryPolicy>::default());
-        let mut provider = Provider::new(client);
-        provider.set_interval(self.poll_interval);
-
-        // test the connection
-        let _ = provider
-            .get_block_number()
-            .await
-            .map_err(|err| BlockProviderError::ConnectionError(err.to_string()))?;
-
-        let (tx, rx) = mpsc::channel(10_000);
-        let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
-        self.shutdown_sender = Some(shutdown_tx);
-
-        tokio::spawn(async move {
-            let mut block_stream = match provider.watch_blocks().await {
-                Ok(stream) => stream,
-                Err(err) => {
-                    tracing::error!("Error subscribing to blocks: {:?}", err);
-                    let _ = tx
-                        .send(Err(BlockProviderError::ConnectionError(err.to_string())))
-                        .await;
-                    return;
-                }
-            };
-
-            loop {
-                select! {
-                    block = block_stream.next() => {
-                        let msg = match block {
-                            Some(block_hash) => {
-                                let block_filter = filter.clone().at_block_hash(block_hash);
-                                match try_join!(Self::get_block(&provider, block_hash), Self::get_logs(&provider, &block_filter)) {
-                                    Ok((block, logs)) => {
-                                        Ok(BlockWithLogs { block, logs })
-                                    },
-                                    Err(err) => {
-                                        error!("Error getting block or logs: {:?}", err);
-                                        Err(err)
-                                    }
-                                }
-                            },
-                            None => {
-                                error!("Block stream ended");
-                                Err(BlockProviderError::ConnectionError("Block stream ended".to_string()))
-                            }
-                        };
-                        let was_err = msg.is_err();
-                        if tx.send(msg).await.is_err() {
-                            error!("Receiver dropped");
-                            return;
-                        }
-                        if was_err {
-                            error!("Provider error getting block or logs, ending subscription");
-                            return;
-                        }
-                    },
-                    _ = &mut shutdown_rx => {
-                        return;
-                    }
-                }
-            }
-        });
-
-        Ok(ReceiverStream::new(rx))
-    }
-
-    async fn unsubscribe(&mut self) {
-        let shutdown_sender = self.shutdown_sender.take();
-        if let Some(ss) = shutdown_sender {
-            let _ = ss.send(());
-        }
-    }
-}
-
-impl HttpBlockProvider {
-    async fn get_block<P: Middleware>(
-        provider: &P,
-        block_hash: H256,
-    ) -> Result<Block<H256>, BlockProviderError> {
-        provider
-            .get_block(block_hash)
-            .await
-            .map_err(|err| BlockProviderError::ConnectionError(err.to_string()))?
-            .ok_or_else(|| BlockProviderError::RpcError("Block not found".to_string()))
-    }
-
-    async fn get_logs<P: Middleware>(
-        provider: &P,
-        filter: &Filter,
-    ) -> Result<Vec<Log>, BlockProviderError> {
-        provider
-            .get_logs(filter)
-            .await
-            .map_err(|err| BlockProviderError::ConnectionError(err.to_string()))
-    }
-}
diff --git a/src/op_pool/event/listener.rs b/src/op_pool/event/listener.rs
deleted file mode 100644
index e7c15b03b..000000000
--- a/src/op_pool/event/listener.rs
+++ /dev/null
@@ -1,609 +0,0 @@
-use std::{
-    cmp,
-    collections::HashMap,
-    sync::Arc,
-    time::{Duration, Instant},
-};
-
-use anyhow::Context;
-use ethers::{
-    prelude::parse_log,
-    types::{Address, Filter},
-};
-use rand::Rng;
-use tokio::{
-    select,
-    sync::broadcast,
-    task::JoinHandle,
-    time::{sleep, timeout},
-};
-use tokio_stream::StreamExt;
-use tokio_util::sync::CancellationToken;
-use tracing::{error, info};
-
-use super::{
-    BlockProvider, BlockProviderFactory, BlockWithLogs, EntryPointEvent, EventProvider,
-    NewBlockEvent,
-};
-
-const MAX_BACKOFF_SECONDS: u64 = 60;
-
-/// An event listener that listens for new blocks and emits events
-/// for each provided entry point
-pub struct EventListener<F: BlockProviderFactory> {
-    provider_factory: F,
-    provider: Option<F::Provider>,
-    log_filter_base: Filter,
-    entrypoint_event_broadcasts: HashMap<Address, broadcast::Sender<Arc<NewBlockEvent>>>,
-    last_reconnect: Option<Instant>,
-    backoff_idx: u32,
-}
-
-impl<F: BlockProviderFactory> EventProvider for EventListener<F> {
-    fn subscribe_by_entrypoint(
-        &self,
-        entry_point: Address,
-    ) -> Option<broadcast::Receiver<Arc<NewBlockEvent>>> {
-        self.entrypoint_event_broadcasts
-            .get(&entry_point)
-            .map(|b| b.subscribe())
-    }
-
-    fn spawn(
-        self: Box<Self>,
-        shutdown_token: CancellationToken,
-    ) -> JoinHandle<anyhow::Result<()>> {
-        tokio::spawn(async move { self.listen_with_shutdown(shutdown_token).await })
-    }
-}
-
-impl<F: BlockProviderFactory> EventListener<F> {
-    /// Create a new event listener from a block provider factory and list entry points
-    /// Must call listen_with_shutdown to start listening
-    pub fn new<'a>(
-        provider_factory: F,
-        entry_points: impl IntoIterator<Item = &'a Address>,
-    ) -> Self {
-        let mut entry_point_addresses = vec![];
-        let mut entrypoint_event_broadcasts = HashMap::new();
-        for ep in entry_points {
-            entry_point_addresses.push(*ep);
-            entrypoint_event_broadcasts.insert(*ep, broadcast::channel(1000).0);
-        }
-
-        let log_filter_base = Filter::new().address(entry_point_addresses);
-
-        Self {
-            provider_factory,
-            provider: None,
-            log_filter_base,
-            entrypoint_event_broadcasts,
-            last_reconnect: None,
-            backoff_idx: 0,
-        }
-    }
-
-    /// Consumes the listener and starts listening for new blocks
-    /// until the shutdown signal is received
-    pub async fn listen_with_shutdown(
-        mut self,
-        shutdown_token: CancellationToken,
-    ) -> anyhow::Result<()> {
-        let mut block_stream = self.reconnect(shutdown_token.clone()).await?;
-        loop {
-            select! {
-                block_with_logs = block_stream.next() => {
-                    match block_with_logs {
-                        Some(Ok(block_with_logs)) => {
-                            if let Err(err) = self.handle_block_event(block_with_logs).await {
-                                error!("Error handling block event: {err:?}");
-                                EventListenerMetrics::increment_block_handler_errors();
-                            }
-                        }
-                        Some(Err(err)) => {
-                            error!("Error getting block: {:?}", err);
-                            EventListenerMetrics::increment_provider_errors();
-                            block_stream = self.reconnect(shutdown_token.clone()).await?;
-                        }
-                        None => {
-                            error!("Block stream ended unexpectedly");
-                            EventListenerMetrics::increment_provider_errors();
-                            block_stream = self.reconnect(shutdown_token.clone()).await?;
-                        }
-                    }
-                }
-                _ = shutdown_token.cancelled() => {
-                    info!("Shutting down event listener");
-                    break;
-                }
-            }
-        }
-
-        Ok(())
-    }
-
-    async fn reconnect(
-        &mut self,
-        shutdown_token: CancellationToken,
-    ) -> anyhow::Result<<F::Provider as BlockProvider>::BlockStream> {
-        if let Some(mut provider) = self.provider.take() {
-            provider.unsubscribe().await;
-        }
-
-        let since_last_recovery = self.last_reconnect.map(|t| Instant::now() - t);
-        if let Some(since_last_recovery) = since_last_recovery {
-            let expected_backoff = self.backoff_time();
-            if since_last_recovery < expected_backoff {
-                let sleep_duration = expected_backoff - since_last_recovery;
-                info!(
-                    "Reconnecting too quickly, sleeping {:?} before reconnecting",
-                    sleep_duration
-                );
-                if self
-                    .sleep_or_shutdown(shutdown_token.clone(), sleep_duration)
-                    .await
-                {
-                    return Err(anyhow::anyhow!("Shutdown signal received"));
-                }
-            } else {
-                info!("Reconnecting after {since_last_recovery:?}, expected backoff {expected_backoff:?}");
-                self.backoff_idx = 0;
-            }
-        }
-
-        loop {
-            self.provider = Some(self.provider_factory.new_provider());
-            match timeout(
-                Duration::from_secs(5),
-                self.provider
-                    .as_mut()
-                    .unwrap()
-                    .subscribe(self.log_filter_base.clone()),
-            )
-            .await
-            {
-                Ok(Ok(block_stream)) => {
-                    info!("Connected to event provider");
-                    self.last_reconnect = Some(Instant::now());
-                    return Ok(block_stream);
-                }
-                Ok(Err(err)) => {
-                    EventListenerMetrics::increment_provider_errors();
-                    let sleep_duration = self.backoff_time();
-                    error!(
-                        "Error connecting to event provider, sleeping {sleep_duration:?}: {err:?}"
-                    );
-                    if self
-                        .sleep_or_shutdown(shutdown_token.clone(), sleep_duration)
-                        .await
-                    {
-                        return Err(anyhow::anyhow!("Shutdown signal received"));
-                    }
-                }
-                Err(err) => {
-                    EventListenerMetrics::increment_provider_errors();
-                    let sleep_duration = self.backoff_time();
-                    error!(
-                        "Timeout connecting to event provider, sleeping {sleep_duration:?}: {err:?}"
-                    );
-                    if self
-                        .sleep_or_shutdown(shutdown_token.clone(), sleep_duration)
-                        .await
-                    {
-                        return Err(anyhow::anyhow!("Shutdown signal received"));
-                    }
-                }
-            }
-        }
-    }
-
-    async fn handle_block_event(&self, mut block_with_logs: BlockWithLogs) -> anyhow::Result<()> {
-        block_with_logs
-            .logs
-            .sort_by(|a, b| a.log_index.cmp(&b.log_index));
-        let block_hash = block_with_logs
-            .block
-            .hash
-            .context("block should have hash")?;
-        let block_number = block_with_logs
-            .block
-            .number
-            .context("block should have number")?;
-        let mut block_events = HashMap::new();
-
-        EventListenerMetrics::increment_blocks_seen();
-        EventListenerMetrics::set_block_height(block_number.as_u64());
-
-        for ep in self.entrypoint_event_broadcasts.keys() {
-            block_events.insert(
-                *ep,
-                NewBlockEvent {
-                    address: *ep,
-                    hash: block_hash,
-                    number: block_number,
-                    events: vec![],
-                },
-            );
-        }
-
-        for log in block_with_logs.logs {
-            let ep_address = log.address;
-            if !block_events.contains_key(&ep_address) {
-                error!("Received log for unknown entrypoint {ep_address:?}");
-                continue;
-            }
-
-            let txn_hash = log
-                .transaction_hash
-                .context("log should have transaction hash")?;
-            let txn_index = log
-                .transaction_index
-                .context("log should have transaction index")?;
-
-            let event = EntryPointEvent {
-                contract_event: parse_log(log)?,
-                txn_hash,
-                txn_index,
-            };
-
-            block_events.entry(ep_address).and_modify(|e| {
-                e.events.push(event);
-            });
-            EventListenerMetrics::increment_events_seen();
-        }
-
-        for (ep, block_event) in block_events {
-            match self.entrypoint_event_broadcasts.get(&ep) {
-                Some(broadcast) => {
-                    // ignore sender errors, which can only happen if there are no receivers
-                    let _ = broadcast.send(Arc::new(block_event));
-                }
-                None => {
-                    error!("No broadcast channel for entry point: {:?}", ep);
-                }
-            }
-        }
-
-        Ok(())
-    }
-
-    fn backoff_time(&self) -> Duration {
-        let mut rng = rand::thread_rng();
-        let jitter = rng.gen_range(0..1000);
-        let millis = cmp::min(MAX_BACKOFF_SECONDS, 2_u64.pow(self.backoff_idx)) * 1000 + jitter;
-        Duration::from_millis(millis)
-    }
-
-    async fn sleep_or_shutdown(
-        &mut self,
-        shutdown_token: CancellationToken,
-        duration: Duration,
-    ) -> bool {
-        select! {
-            _ = sleep(duration) => {
-                self.backoff_idx += 1;
-                false
-            },
-            _ = shutdown_token.cancelled() => {
-                info!("Shutting down event listener");
-                true
-            }
-        }
-    }
-}
-
-struct EventListenerMetrics {}
-
-impl EventListenerMetrics {
-    fn set_block_height(block_height: u64) {
-        metrics::gauge!("op_pool_event_listener_block_height", block_height as f64);
-    }
-
-    fn increment_blocks_seen() {
-        metrics::increment_counter!("op_pool_event_listener_blocks_seen");
-    }
-
-    fn increment_provider_errors() {
-        metrics::increment_counter!("op_pool_event_listener_provider_errors");
-    }
-
-    fn increment_block_handler_errors() {
-        metrics::increment_counter!("op_pool_event_listener_block_handler_errors");
-    }
-
-    fn increment_events_seen() {
-        metrics::increment_counter!("op_pool_event_listener_events_seen");
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use std::sync::atomic::{AtomicBool, Ordering};
-
-    use ethers::{
-        abi::{encode, Token},
-        contract::EthEvent,
-        types::{Block, Log, H256, U256, U64},
-    };
-    use tokio::{
-        join,
-        sync::{broadcast, mpsc},
-        task::JoinHandle,
-    };
-
-    use super::*;
-    use crate::{
-        common::contracts::i_entry_point::{IEntryPointEvents, UserOperationEventFilter},
-        op_pool::event::{mock::MockBlockProviderFactory, BlockProviderError},
-    };
-
-    #[tokio::test]
-    async fn start_stop() {
-        let state = setup().await;
-        teardown(state).await;
-    }
-
-    #[tokio::test]
-    async fn single_block_no_logs() {
-        let mut state = setup().await;
-
-        // send a block to the listener
-        state
-            .tx
-            .send(Ok(BlockWithLogs {
-                block: Block {
-                    hash: Some(H256::zero()),
-                    number: Some(U64::zero()),
-                    ..Default::default()
-                },
-                logs: vec![],
-            }))
-            .unwrap();
-
-        // receive the block from the listener
-        let block_event = state.events.recv().await.unwrap();
-        assert_eq!(block_event.number, U64::zero());
-
-        teardown(state).await;
-    }
-
-    #[tokio::test]
-    async fn single_block_with_log() {
-        let mut state = setup().await;
-        let (event, log) = make_random_log(state.ep);
-
-        // send a block to the listener
-        state
-            .tx
-            .send(Ok(BlockWithLogs {
-                block: Block {
-                    hash: Some(H256::zero()),
-                    number: Some(U64::zero()),
-                    ..Default::default()
-                },
-                logs: vec![log],
-            }))
-            .unwrap();
-
-        // receive the block from the listener
-        let block_event = state.events.recv().await.unwrap();
-        assert_eq!(block_event.number, U64::zero());
-        assert_eq!(block_event.events.len(), 1);
-
-        if let IEntryPointEvents::UserOperationEventFilter(block_event) =
-            &block_event.events[0].contract_event
-        {
-            assert_eq!(block_event, &event);
-        } else {
-            panic!("wrong event type");
-        }
-
-        teardown(state).await;
-    }
-
-    #[tokio::test]
-    async fn multile_blocks_with_log() {
-        let mut state = setup().await;
-
-        for n in 0..5 {
-            // send a block to the listener
-            let (event, log) = make_random_log(state.ep);
-            let block_hash = H256::random();
-            state
-                .tx
-                .send(Ok(BlockWithLogs {
-                    block: Block {
-                        hash: Some(block_hash),
-                        number: Some(U64::from(n)),
-                        ..Default::default()
-                    },
-                    logs: vec![log],
-                }))
-                .unwrap();
-
-            // receive the block from the listener
-            let block_event = state.events.recv().await.unwrap();
-            assert_eq!(block_event.number, U64::from(n));
-            assert_eq!(block_event.events.len(), 1);
-
-            if let IEntryPointEvents::UserOperationEventFilter(block_event) =
-                &block_event.events[0].contract_event
-            {
-                assert_eq!(block_event, &event);
-            } else {
-                panic!("wrong event type");
-            }
-        }
-
-        teardown(state).await;
-    }
-
-    #[tokio::test]
-    async fn reconnect_once() {
-        let mut state = setup().await;
-
-        // send an error to the listener
-        state
-            .tx
-            .send(Err(BlockProviderError::ConnectionError(
-                "error".to_string(),
-            )))
-            .unwrap();
-
-        // wait for the listener to reconnect
-        state.connection_event_rx.recv().await.unwrap();
-
-        // send a block to the listener
-        state
-            .tx
-            .send(Ok(BlockWithLogs {
-                block: Block {
-                    hash: Some(H256::zero()),
-                    number: Some(U64::zero()),
-                    ..Default::default()
-                },
-                logs: vec![],
-            }))
-            .unwrap();
-
-        // receive the block from the listener
-        let block_event = state.events.recv().await.unwrap();
-        assert_eq!(block_event.number, U64::zero());
-
-        teardown(state).await;
-    }
-
-    #[tokio::test]
-    async fn reconnect_multiple() {
-        let mut state = setup().await;
-
-        state.set_fail_connection();
-        // send an initial error to the listener, causing reconnect
-        state
-            .tx
-            .send(Err(BlockProviderError::ConnectionError(
-                "error".to_string(),
-            )))
-            .unwrap();
-
-        // wait for the listener to reconnect 2 times, then allow through
-        for _ in 0..2 {
-            state.connection_event_rx.recv().await.unwrap();
-        }
-        state.unset_fail_connection();
-
-        // wait for the listener to connect
-        state.connection_event_rx.recv().await.unwrap();
-
-        // send a block to the listener
-        state
-            .tx
-            .send(Ok(BlockWithLogs {
-                block: Block {
-                    hash: Some(H256::zero()),
-                    number: Some(U64::zero()),
-                    ..Default::default()
-                },
-                logs: vec![],
-            }))
-            .unwrap();
-
-        // receive the block from the listener
-        let block_event = state.events.recv().await.unwrap();
-        assert_eq!(block_event.number, U64::zero());
-
-        teardown(state).await;
-    }
-
-    struct TestState {
-        ep: Address,
-        tx: broadcast::Sender<Result<BlockWithLogs, BlockProviderError>>,
-        shutdown_token: CancellationToken,
-        handle: JoinHandle<anyhow::Result<()>>,
-        events: broadcast::Receiver<Arc<NewBlockEvent>>,
-        connection_event_rx: mpsc::Receiver<()>,
-        should_fail_connection: Arc<AtomicBool>,
-    }
-
-    impl TestState {
-        fn set_fail_connection(&self) {
-            self.should_fail_connection.store(true, Ordering::SeqCst);
-        }
-
-        fn unset_fail_connection(&self) {
-            self.should_fail_connection.store(false, Ordering::SeqCst);
-        }
-    }
-
-    async fn setup() -> TestState {
-        let (tx, rx) = broadcast::channel(100);
-        let shutdown_token = CancellationToken::new();
-        let (connection_event_tx, mut connection_event_rx) = mpsc::channel(100);
-        let should_fail_connection = Arc::new(AtomicBool::new(false));
-        let factory =
-            MockBlockProviderFactory::new(rx, connection_event_tx, should_fail_connection.clone());
-        let ep = Address::random();
-        let listener = EventListener::new(factory, vec![&ep]);
-        let events = listener.subscribe_by_entrypoint(ep).unwrap();
-        let listener_shutdown = shutdown_token.clone();
-        let handle =
-            tokio::spawn(async move { listener.listen_with_shutdown(listener_shutdown).await });
-
-        // wait for the listener to connect
-        connection_event_rx.recv().await.unwrap();
-
-        TestState {
-            ep,
-            tx,
-            shutdown_token,
-            handle,
-            events,
-            connection_event_rx,
-            should_fail_connection,
-        }
-    }
-
-    async fn teardown(state: TestState) {
-        // send a shutdown signal
-        state.shutdown_token.cancel();
-        // wait for the listener to shutdown
-        join!(state.handle).0.unwrap().unwrap();
-    }
-
-    fn make_random_log(ep: Address) -> (UserOperationEventFilter, Log) {
-        let hash = H256::random();
-        let sender = Address::random();
-        let paymaster = Address::random();
-
-        // UserOperationEvent([INDEXED]bytes32,address,address,[NON_INDEXED]uint256,bool,uint256,uint256)
-        let log = Log {
-            address: ep,
-            topics: vec![
-                UserOperationEventFilter::signature(),
-                hash,
-                address_to_topic(sender),
-                address_to_topic(paymaster),
-            ],
-            data: encode(&[
-                Token::Uint(U256::zero()),
-                Token::Bool(false),
-                Token::Uint(U256::zero()),
-                Token::Uint(U256::zero()),
-            ])
-            .into(),
-            block_hash: Some(H256::zero()),
-            block_number: Some(U64::zero()),
-            transaction_hash: Some(H256::zero()),
-            transaction_index: Some(U64::zero()),
-            ..Default::default()
-        };
-        let event: UserOperationEventFilter = parse_log(log.clone()).unwrap();
-        (event, log)
-    }
-
-    fn address_to_topic(src: Address) -> H256 {
-        let mut bytes = [0; 32];
-        bytes[12..32].copy_from_slice(src.as_bytes());
-        H256::from(bytes)
-    }
-}
diff --git a/src/op_pool/event/mock.rs b/src/op_pool/event/mock.rs
deleted file mode 100644
index c6ddd4ea2..000000000
--- a/src/op_pool/event/mock.rs
+++ /dev/null
@@ -1,81 +0,0 @@
-use std::sync::{
-    atomic::{AtomicBool, Ordering},
-    Arc,
-};
-
-use ethers::types::Filter;
-use tokio::sync::{broadcast, mpsc};
-use tokio_stream::{wrappers::BroadcastStream, Stream, StreamExt};
-use tonic::async_trait;
-
-use super::{BlockProvider, BlockProviderError, BlockProviderFactory, BlockWithLogs};
-
-pub struct MockBlockProvider {
-    rx: broadcast::Receiver<Result<BlockWithLogs, BlockProviderError>>,
-    fail_subscription: bool,
-    subscription_event: mpsc::Sender<()>,
-}
-
-#[async_trait]
-impl BlockProvider for MockBlockProvider {
-    type BlockStream =
-        Box<dyn Stream<Item = Result<BlockWithLogs, BlockProviderError>> + Send + Unpin>;
-
-    async fn subscribe(
-        &mut self,
-        _filter: Filter,
-    ) -> Result<Self::BlockStream, BlockProviderError> {
-        let ret = match self.fail_subscription {
-            true => Err(BlockProviderError::ConnectionError(
-                "forced error".to_string(),
-            )),
-            false => Ok(Box::new(
-                BroadcastStream::new(self.rx.resubscribe()).map(|res| match res {
-                    Ok(res) => res,
-                    Err(_) => Err(BlockProviderError::ConnectionError(
-                        "strem recv error".to_string(),
-                    )),
-                }),
-            ) as Self::BlockStream),
-        };
-        self.subscription_event
-            .send(())
-            .await
-            .expect("failed to send connection event");
-        ret
-    }
-
-    async fn unsubscribe(&mut self) {}
-}
-
-pub struct MockBlockProviderFactory {
-    rx: broadcast::Receiver<Result<BlockWithLogs, BlockProviderError>>,
-    connection_event_tx: mpsc::Sender<()>,
-    should_fail_connection: Arc<AtomicBool>,
-}
-
-impl BlockProviderFactory for MockBlockProviderFactory {
-    type Provider = MockBlockProvider;
-
-    fn new_provider(&self) -> Self::Provider {
-        MockBlockProvider {
-            fail_subscription: self.should_fail_connection.load(Ordering::SeqCst),
-            rx: self.rx.resubscribe(),
-            subscription_event: self.connection_event_tx.clone(),
-        }
-    }
-}
-
-impl MockBlockProviderFactory {
-    pub fn new(
-        rx: broadcast::Receiver<Result<BlockWithLogs, BlockProviderError>>,
-        connection_event_tx: mpsc::Sender<()>,
-        should_fail_connection: Arc<AtomicBool>,
-    ) -> Self {
-        Self {
-            rx,
-            connection_event_tx,
-            should_fail_connection,
-        }
-    }
-}
diff --git a/src/op_pool/event/mod.rs b/src/op_pool/event/mod.rs
deleted file mode 100644
index 974d107c2..000000000
--- a/src/op_pool/event/mod.rs
+++ /dev/null
@@ -1,99 +0,0 @@
-use std::sync::Arc;
-
-use ethers::types::{Address, Block, Filter, Log, H256, U64};
-use tokio::{sync::broadcast, task::JoinHandle};
-use tokio_util::sync::CancellationToken;
-use tonic::async_trait;
-
-use crate::common::contracts::i_entry_point::IEntryPointEvents;
-
-mod listener;
-pub use listener::EventListener;
-#[cfg(test)]
-mod mock;
-mod ws;
-pub use ws::WsBlockProviderFactory;
-mod http;
-pub use http::HttpBlockProviderFactory;
-
-/// Event when a new block is mined.
-/// Events correspond to a single entry point
-#[derive(Debug)]
-pub struct NewBlockEvent {
-    /// The entry point address
-    pub address: Address,
-    /// The block hash
-    pub hash: H256,
-    /// The block number
-    pub number: U64,
-    /// Ordered EntryPoint events
-    pub events: Vec<EntryPointEvent>,
-}
-
-/// An event emitted by an entry point with metadata
-#[derive(Debug)]
-pub struct EntryPointEvent {
-    /// The entry point contract event
-    pub contract_event: IEntryPointEvents,
-    /// The transaction hash that emitted the event
-    pub txn_hash: H256,
-    /// The transaction index that emitted the event
-    pub txn_index: U64,
-}
-
-/// A trait that provides a stream of new blocks with their events by entrypoint
-pub trait EventProvider: Send + Sync {
-    /// Subscribe to new blocks by entrypoint
-    fn subscribe_by_entrypoint(
-        &self,
-        entry_point: Address,
-    ) -> Option<broadcast::Receiver<Arc<NewBlockEvent>>>;
-
-    /// Spawn the event provider
-    fn spawn(
-        self: Box<Self>,
-        shutdown_token: CancellationToken,
-    ) -> JoinHandle<anyhow::Result<()>>;
-}
-
-/// A factory that creates a new event provider
-#[async_trait]
-pub trait BlockProviderFactory: Send + Sync + 'static {
-    type Provider: BlockProvider + Send + Sync + 'static;
-
-    /// Create a new block provider
-    fn new_provider(&self) -> Self::Provider;
-}
-
-/// Block provider errors
-#[derive(Debug, Clone, thiserror::Error)]
-pub enum BlockProviderError {
-    #[error("Connection error: {0}")]
-    ConnectionError(String),
-    #[error("Rpc error: {0}")]
-    RpcError(String),
-}
-
-/// A block with its logs correspoinding to a filter
-/// given to a block provider
-#[derive(Debug, Clone)]
-pub struct BlockWithLogs {
-    /// The block
-    block: Block<H256>,
-    /// The logs that correspond to the filter
-    logs: Vec<Log>,
-}
-
-/// A trait that provides a stream of blocks
-#[async_trait]
-pub trait BlockProvider {
-    type BlockStream: tokio_stream::Stream<Item = Result<BlockWithLogs, BlockProviderError>>
-        + Send
-        + Unpin;
-
-    /// Subscribe to a block stream
-    async fn subscribe(&mut self, filter: Filter) -> Result<Self::BlockStream, BlockProviderError>;
-
-    /// Unsubscribe from a block stream
-    async fn unsubscribe(&mut self);
-}
diff --git a/src/op_pool/event/ws.rs b/src/op_pool/event/ws.rs
deleted file mode 100644
index fb3914fda..000000000
--- a/src/op_pool/event/ws.rs
+++ /dev/null
@@ -1,144 +0,0 @@
-use ethers::{
-    providers::{Middleware, Provider, StreamExt, Ws},
-    types::{Block, Filter, H256},
-};
-use tokio::sync::{mpsc, oneshot};
-use tokio_stream::wrappers::ReceiverStream;
-use tonic::async_trait;
-
-use super::{BlockProvider, BlockProviderError, BlockProviderFactory, BlockWithLogs};
-
-/// A block provider factory that uses an ethers websocket provider
-/// to stream blocks from a node's websocket endpoint
-pub struct WsBlockProviderFactory {
-    ws_url: String,
-    num_retries: usize,
-}
-
-impl WsBlockProviderFactory {
-    pub fn new(ws_url: String, num_retries: usize) -> Self {
-        Self {
-            ws_url,
-            num_retries,
-        }
-    }
-}
-
-#[async_trait]
-impl BlockProviderFactory for WsBlockProviderFactory {
-    type Provider = WsBlockProvider;
-
-    fn new_provider(&self) -> Self::Provider {
-        WsBlockProvider::new(self.ws_url.clone(), self.num_retries)
-    }
-}
-
-/// A block provider that uses an ethers websocket provider
-#[derive(Debug)]
-pub struct WsBlockProvider {
-    ws_url: String,
-    num_retries: usize,
-    shutdown_sender: Option<oneshot::Sender<()>>,
-}
-
-impl WsBlockProvider {
-    pub fn new(ws_url: String, num_retries: usize) -> Self {
-        Self {
-            ws_url,
-            num_retries,
-            shutdown_sender: None,
-        }
-    }
-}
-
-#[async_trait]
-impl BlockProvider for WsBlockProvider {
-    type BlockStream = ReceiverStream<Result<BlockWithLogs, BlockProviderError>>;
-
-    async fn subscribe(&mut self, filter: Filter) -> Result<Self::BlockStream, BlockProviderError> {
-        if self.shutdown_sender.is_some() {
-            return Err(BlockProviderError::ConnectionError(
-                "already subscribed".to_string(),
-            ));
-        }
-
-        let provider =
-            Provider::<Ws>::connect_with_reconnects(self.ws_url.clone(), self.num_retries)
-                .await
-                .map_err(|err| BlockProviderError::ConnectionError(err.to_string()))?;
-
-        let (tx, rx) = mpsc::channel(10_000);
-        let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
-        self.shutdown_sender = Some(shutdown_tx);
-
-        tokio::spawn(async move {
-            let mut block_subscription = match provider.subscribe_blocks().await {
-                Ok(sub) => sub,
-                Err(err) => {
-                    tracing::error!("Error subscribing to blocks: {:?}", err);
-                    if tx
-                        .send(Err(BlockProviderError::ConnectionError(err.to_string())))
-                        .await
-                        .is_err()
-                    {
-                        tracing::error!("Receiver dropped");
-                    }
-                    return;
-                }
-            };
-
-            loop {
-                tokio::select! {
-                    res = block_subscription.next() => {
-                        let msg = match res {
-                            Some(block) => {
-                                Self::process_block(&provider, block, &filter).await
-                            }
-                            None => {
-                                tracing::error!("Block subscription ended");
-                                Err(BlockProviderError::ConnectionError("block subscription closed".to_string()))
-                            }
-                        };
-                        let was_err = msg.is_err();
-                        if tx.send(msg).await.is_err() {
-                            tracing::error!("Receiver dropped");
-                            return;
-                        }
-                        if was_err {
-                            return;
-                        }
-                    },
-                    _ = &mut shutdown_rx => {
-                        return;
-                    }
-                }
-            }
-        });
-
-        Ok(ReceiverStream::new(rx))
-    }
-
-    async fn unsubscribe(&mut self) {
-        let shutdown_sender = self.shutdown_sender.take();
-        if let Some(ss) = shutdown_sender {
-            let _ = ss.send(());
-        }
-    }
-}
-
-impl WsBlockProvider {
-    async fn process_block(
-        provider: &Provider<Ws>,
-        block: Block<H256>,
-        filter: &Filter,
-    ) -> Result<BlockWithLogs, BlockProviderError> {
-        let block_hash = block.hash.unwrap_or_default();
-        let filter = filter.clone().at_block_hash(block_hash);
-        let logs = provider
-            .get_logs(&filter)
-            .await
-            .map_err(|err| BlockProviderError::ConnectionError(err.to_string()))?;
-        let block_with_logs = BlockWithLogs { block, logs };
-        Ok(block_with_logs)
-    }
-}
diff --git a/src/op_pool/mempool/mod.rs b/src/op_pool/mempool/mod.rs
index e720cb68f..0e027d3ec 100644
--- a/src/op_pool/mempool/mod.rs
+++ b/src/op_pool/mempool/mod.rs
@@ -9,8 +9,10 @@ use ethers::types::{Address, H256};
 use strum::IntoEnumIterator;
 
 use self::error::MempoolResult;
-use super::{event::NewBlockEvent, reputation::Reputation};
-use crate::common::types::{Entity, EntityType, UserOperation, ValidTimeRange};
+use crate::{
+    common::types::{Entity, EntityType, UserOperation, ValidTimeRange},
+    op_pool::{chain::ChainUpdate, reputation::Reputation},
+};
 
 /// In-memory operation pool
 pub trait Mempool: Send + Sync {
@@ -19,8 +21,8 @@ pub trait Mempool: Send + Sync {
 
     /// Event listener for when a new block is mined.
     ///
-    /// Pool is updated according to the new blocks events.
-    fn on_new_block(&self, event: &NewBlockEvent);
+    /// Pool is updated according to the chain update event.
+    fn on_chain_update(&self, update: &ChainUpdate);
 
     /// Adds a validated user operation to the pool.
     ///
@@ -94,6 +96,9 @@ pub enum OperationOrigin {
     Local,
     /// The operation was discovered via the P2P gossip protocol.
     External,
+    /// The operation was returned to the pool when the block it was in was
+    /// reorged away.
+    ReturnedAfterReorg,
 }
 
 /// A user operation with additional metadata from validation.
diff --git a/src/op_pool/mempool/pool.rs b/src/op_pool/mempool/pool.rs
index fa20d11b2..07705d36b 100644
--- a/src/op_pool/mempool/pool.rs
+++ b/src/op_pool/mempool/pool.rs
@@ -9,6 +9,7 @@ use ethers::{
     abi::Address,
     types::{H256, U256},
 };
+use tracing::info;
 
 use super::{
     error::{MempoolError, MempoolResult},
@@ -23,19 +24,26 @@ use crate::common::{
 /// Pool of user operations
 #[derive(Debug)]
 pub struct PoolInner {
-    // Pool settings
+    /// Pool settings
     config: PoolConfig,
-    // Operations by hash
+    /// Operations by hash
     by_hash: HashMap<H256, OrderedPoolOperation>,
-    // Operations by operation ID
+    /// Operations by operation ID
     by_id: HashMap<UserOperationId, OrderedPoolOperation>,
-    // Best operations, sorted by gas price
+    /// Best operations, sorted by gas price
     best: BTreeSet<OrderedPoolOperation>,
-    // Count of operations by sender
+    /// Removed operations, temporarily kept around in case their blocks are
+    /// reorged away. Stored along with the block number at which it was
+    /// removed.
+    mined_at_block_number_by_hash: HashMap<H256, (OrderedPoolOperation, u64)>,
+    /// Removed operation hashes sorted by block number, so we can forget them
+    /// when enough new blocks have passed.
+ mined_hashes_with_block_numbers: BTreeSet<(u64, H256)>, + /// Count of operations by sender count_by_address: HashMap, - // Submission ID counter + /// Submission ID counter submission_id: u64, - // keeps track of the size of the pool in bytes + /// keeps track of the size of the pool in bytes size: SizeTracker, } @@ -46,6 +54,8 @@ impl PoolInner { by_hash: HashMap::new(), by_id: HashMap::new(), best: BTreeSet::new(), + mined_at_block_number_by_hash: HashMap::new(), + mined_hashes_with_block_numbers: BTreeSet::new(), count_by_address: HashMap::new(), submission_id: 0, size: SizeTracker::default(), @@ -53,6 +63,18 @@ impl PoolInner { } pub fn add_operation(&mut self, op: PoolOperation) -> MempoolResult { + self.add_operation_internal(Arc::new(op), None) + } + + fn put_back_unmined_operation(&mut self, op: OrderedPoolOperation) -> MempoolResult { + self.add_operation_internal(op.po, Some(op.submission_id)) + } + + fn add_operation_internal( + &mut self, + op: Arc, + submission_id: Option, + ) -> MempoolResult { // Check for replacement by ID if let Some(pool_op) = self.by_id.get(&op.uo.id()) { if op.uo.max_fee_per_gas > u128::MAX.into() @@ -99,8 +121,8 @@ impl PoolInner { } let pool_op = OrderedPoolOperation { - po: Arc::new(op), - submission_id: self.next_submission_id(), + po: op, + submission_id: submission_id.unwrap_or_else(|| self.next_submission_id()), }; // update counts @@ -149,21 +171,45 @@ impl PoolInner { } pub fn remove_operation_by_hash(&mut self, hash: H256) -> Option> { - if let Some(op) = self.by_hash.remove(&hash) { - self.by_id.remove(&op.uo().id()); - self.best.remove(&op); + self.remove_operation_internal(hash, None) + } - for e in op.po.entities() { - self.decrement_address_count(e.address); - } + pub fn mine_operation(&mut self, hash: H256, block_number: u64) -> Option> { + self.remove_operation_internal(hash, Some(block_number)) + } - self.size -= op.size(); - metrics::gauge!("op_pool_num_ops_in_pool", self.by_hash.len() as f64, "entrypoint_addr" => self.config.entry_point.to_string()); - metrics::gauge!("op_pool_size_bytes", self.size.0 as f64, "entrypoint_addr" => self.config.entry_point.to_string()); - return Some(op.po); + fn remove_operation_internal( + &mut self, + hash: H256, + block_number: Option, + ) -> Option> { + let op = self.by_hash.remove(&hash)?; + self.by_id.remove(&op.uo().id()); + self.best.remove(&op); + if let Some(block_number) = block_number { + self.mined_at_block_number_by_hash + .insert(hash, (op.clone(), block_number)); + self.mined_hashes_with_block_numbers + .insert((block_number, hash)); + } + for e in op.po.entities() { + self.decrement_address_count(e.address); } - None + self.size -= op.size(); + metrics::gauge!("op_pool_num_ops_in_pool", self.by_hash.len() as f64, "entrypoint_addr" => self.config.entry_point.to_string()); + metrics::gauge!("op_pool_size_bytes", self.size.0 as f64, "entrypoint_addr" => self.config.entry_point.to_string()); + Some(op.po) + } + + pub fn unmine_operation(&mut self, hash: H256) -> Option> { + let (op, block_number) = self.mined_at_block_number_by_hash.remove(&hash)?; + self.mined_hashes_with_block_numbers + .remove(&(block_number, hash)); + if let Err(error) = self.put_back_unmined_operation(op.clone()) { + info!("Could not put back unmined operation: {error}"); + }; + Some(op.po) } /// Removes all operations using the given entity, returning the hashes of @@ -181,10 +227,23 @@ impl PoolInner { to_remove } + pub fn forget_mined_operations_before_block(&mut self, block_number: u64) { + while let Some(&(bn, 
diff --git a/src/op_pool/mempool/uo_pool.rs b/src/op_pool/mempool/uo_pool.rs
index c26806a64..e6199d4c8 100644
--- a/src/op_pool/mempool/uo_pool.rs
+++ b/src/op_pool/mempool/uo_pool.rs
@@ -7,6 +7,7 @@ use ethers::types::{Address, H256};
 use parking_lot::RwLock;
 use tokio::sync::broadcast;
 use tokio_util::sync::CancellationToken;
+use tracing::info;
 
 use super::{
     error::{MempoolError, MempoolResult},
@@ -14,10 +15,10 @@ use super::{
     Mempool, OperationOrigin, PoolConfig, PoolOperation,
 };
 use crate::{
-    common::{contracts::i_entry_point::IEntryPointEvents, emit::WithEntryPoint, types::Entity},
+    common::{emit::WithEntryPoint, types::Entity},
     op_pool::{
+        chain::ChainUpdate,
         emit::{EntityReputation, EntityStatus, EntitySummary, OpPoolEvent, OpRemovalReason},
-        event::NewBlockEvent,
         reputation::{Reputation, ReputationManager, ReputationStatus},
     },
 };
@@ -68,7 +69,7 @@ where
 
     pub async fn run(
         self: Arc<Self>,
-        mut new_block_events: broadcast::Receiver<Arc<NewBlockEvent>>,
+        mut chain_events: broadcast::Receiver<Arc<ChainUpdate>>,
        shutdown_token: CancellationToken,
     ) {
         loop {
@@ -77,9 +78,9 @@ where
                     tracing::info!("Shutting down UoPool");
                     break;
                 }
-                new_block = new_block_events.recv() => {
-                    if let Ok(new_block) = new_block {
-                        self.on_new_block(&new_block);
+                update = chain_events.recv() => {
+                    if let Ok(update) = update {
+                        self.on_chain_update(&update);
                     }
                 }
             }
@@ -102,42 +103,68 @@ where
         self.entry_point
     }
 
-    fn on_new_block(&self, new_block: &NewBlockEvent) {
+    fn on_chain_update(&self, update: &ChainUpdate) {
         let mut state = self.state.write();
-        tracing::info!(
-            "New block: {:?} with {} entrypoint events",
-            new_block.number,
-            new_block.events.len()
-        );
-        for event in &new_block.events {
-            if let IEntryPointEvents::UserOperationEventFilter(uo_event) = &event.contract_event {
-                let op_hash = uo_event.user_op_hash.into();
-                if let Some(op) = state.pool.remove_operation_by_hash(op_hash) {
-                    for e in op.staked_entities() {
-                        self.reputation.add_included(e.address);
-                    }
+        let mined_ops: Vec<_> = update
+            .mined_ops
+            .iter()
+            .filter(|op| op.entry_point == self.entry_point)
+            .copied()
+            .collect();
+        let unmined_ops: Vec<_> = update
+            .unmined_ops
+            .iter()
+            .filter(|op| op.entry_point == self.entry_point)
+            .copied()
+            .collect();
+        if !mined_ops.is_empty() {
+            info!(
+                "{} op(s) mined on entry point {:?}.",
+                mined_ops.len(),
+                self.entry_point,
+            );
+        }
+        if !unmined_ops.is_empty() {
+            info!(
+                "{} op(s) unmined in reorg on entry point {:?}.",
+                unmined_ops.len(),
+                self.entry_point
+            );
+        }
+        for op in mined_ops {
+            // Remove throttled ops that were included in the block
+            state.throttled_ops.remove(&op.hash);
+            if let Some(op) = state
+                .pool
+                .mine_operation(op.hash, update.latest_block_number)
+            {
+                for entity in op.staked_entities() {
+                    self.reputation.add_included(entity.address);
                 }
-
-                // Remove throttled ops that were included in the block
-                state.throttled_ops.remove(&op_hash);
             }
         }
-
+        for op in unmined_ops {
+            if let Some(op) = state.pool.unmine_operation(op.hash) {
+                for entity in op.staked_entities() {
+                    self.reputation.remove_included(entity.address);
+                }
+            }
+        }
+        state
+            .pool
+            .forget_mined_operations_before_block(update.earliest_remembered_block_number);
         // Remove throttled ops that are too old
-        let new_block_number = new_block.number.as_u64();
         let mut to_remove = HashSet::new();
         for (hash, block) in state.throttled_ops.iter() {
-            if new_block_number - block > THROTTLED_OPS_BLOCK_LIMIT {
+            if update.latest_block_number.saturating_sub(*block) > THROTTLED_OPS_BLOCK_LIMIT {
                 to_remove.insert(*hash);
             }
         }
-
         for hash in to_remove {
             state.pool.remove_operation_by_hash(hash);
             state.throttled_ops.remove(&hash);
         }
-
-        state.block_number = new_block_number;
+        state.block_number = update.latest_block_number;
     }
 
     fn add_operation(&self, origin: OperationOrigin, op: PoolOperation) -> MempoolResult<H256> {
@@ -372,6 +399,8 @@ mod tests {
 
         fn add_included(&self, _address: Address) {}
 
+        fn remove_included(&self, _address: Address) {}
+
         fn dump_reputation(&self) -> Vec<Reputation> {
             vec![]
        }
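// ---------------------------------------------------------------------------
// Editor's example (illustrative sketch, not part of the patch): the shape of
// the UoPool's new event loop — one broadcast receiver of chain updates plus a
// cancellation token for shutdown. `ChainUpdate` here is a stand-in struct,
// not the crate's type. One detail worth noting: broadcast receivers can lag;
// a slow consumer gets RecvError::Lagged rather than the missed values, which
// is why a loop like the one above ignores recv errors instead of breaking.

use std::sync::Arc;

use tokio::{select, sync::broadcast};
use tokio_util::sync::CancellationToken;

#[derive(Debug)]
struct ChainUpdate {
    latest_block_number: u64,
}

async fn run(
    mut chain_events: broadcast::Receiver<Arc<ChainUpdate>>,
    shutdown: CancellationToken,
) {
    loop {
        select! {
            _ = shutdown.cancelled() => {
                println!("shutting down");
                break;
            }
            update = chain_events.recv() => match update {
                Ok(update) => println!("latest block: {}", update.latest_block_number),
                Err(broadcast::error::RecvError::Lagged(n)) => println!("missed {n} updates"),
                Err(broadcast::error::RecvError::Closed) => break,
            },
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = broadcast::channel(16);
    let token = CancellationToken::new();
    let task = tokio::spawn(run(rx, token.clone()));
    tx.send(Arc::new(ChainUpdate { latest_block_number: 1 })).unwrap();
    token.cancel();
    task.await.unwrap();
}
// ---------------------------------------------------------------------------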
diff --git a/src/op_pool/mod.rs b/src/op_pool/mod.rs
index 47862bbad..8f54276a9 100644
--- a/src/op_pool/mod.rs
+++ b/src/op_pool/mod.rs
@@ -1,5 +1,5 @@
+mod chain;
 pub mod emit;
-mod event;
 mod mempool;
 mod reputation;
 mod server;
diff --git a/src/op_pool/reputation.rs b/src/op_pool/reputation.rs
index b2de9ac59..aaf3370d8 100644
--- a/src/op_pool/reputation.rs
+++ b/src/op_pool/reputation.rs
@@ -30,12 +30,18 @@ pub trait ReputationManager: Send + Sync {
     /// Called by mempool before returning operations to bundler
     fn status(&self, address: Address) -> ReputationStatus;
 
-    /// Called by mempool when an operation that requires stake is added to the pool
+    /// Called by mempool when an operation that requires stake is added to the
+    /// pool
     fn add_seen(&self, address: Address);
 
-    /// Called by the mempool when an operation that requires stake is removed from the pool
+    /// Called by the mempool when an operation that requires stake is mined
+    /// and removed from the pool
     fn add_included(&self, address: Address);
 
+    /// Called by the mempool when a previously mined operation that requires
+    /// stake is returned to the pool.
+    fn remove_included(&self, address: Address);
+
     /// Called by debug API
     fn dump_reputation(&self) -> Vec<Reputation>;
@@ -78,14 +84,18 @@ impl ReputationManager for HourlyMovingAverageReputation {
         self.reputation.read().status(address)
     }
 
-    fn add_seen<'a>(&self, address: Address) {
+    fn add_seen(&self, address: Address) {
         self.reputation.write().add_seen(address);
     }
 
-    fn add_included<'a>(&self, address: Address) {
+    fn add_included(&self, address: Address) {
         self.reputation.write().add_included(address);
     }
 
+    fn remove_included(&self, address: Address) {
+        self.reputation.write().remove_included(address);
+    }
+
     fn dump_reputation(&self) -> Vec<Reputation> {
         let reputation = self.reputation.read();
         reputation
@@ -193,6 +203,11 @@ impl AddressReputation {
         count.ops_included += 1;
     }
 
+    pub fn remove_included(&mut self, address: Address) {
+        let count = self.counts.entry(address).or_default();
+        count.ops_included = count.ops_included.saturating_sub(1);
+    }
+
     pub fn set_reputation(&mut self, address: Address, ops_seen: u64, ops_included: u64) {
         let count = self.counts.entry(address).or_default();
        count.ops_seen = ops_seen;
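// ---------------------------------------------------------------------------
// Editor's example (illustrative sketch, not part of the patch): the counter
// adjustment remove_included performs, with &'static str keys standing in for
// Address so the snippet stays std-only. saturating_sub keeps the inclusion
// count pinned at zero even if unmine events outnumber the mines seen (e.g.
// after a restart that lost part of the history).

use std::collections::HashMap;

#[derive(Default, Debug)]
struct OpCount {
    ops_seen: u64,
    ops_included: u64,
}

#[derive(Default)]
struct Counters {
    counts: HashMap<&'static str, OpCount>,
}

impl Counters {
    fn add_included(&mut self, address: &'static str) {
        self.counts.entry(address).or_default().ops_included += 1;
    }

    // Reorg path: undo one inclusion without risking u64 underflow.
    fn remove_included(&mut self, address: &'static str) {
        let count = self.counts.entry(address).or_default();
        count.ops_included = count.ops_included.saturating_sub(1);
    }
}

fn main() {
    let mut counters = Counters::default();
    counters.add_included("paymaster");
    counters.remove_included("paymaster");
    counters.remove_included("paymaster"); // double unmine: stays at 0
    assert_eq!(counters.counts["paymaster"].ops_included, 0);
}
// ---------------------------------------------------------------------------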
diff --git a/src/op_pool/server.rs b/src/op_pool/server.rs
index a2e43c49f..09663ab81 100644
--- a/src/op_pool/server.rs
+++ b/src/op_pool/server.rs
@@ -277,7 +277,7 @@ mod tests {
     use crate::{
         common::types::Entity,
         op_pool::{
-            event::NewBlockEvent,
+            chain::ChainUpdate,
             mempool::{error::MempoolResult, PoolOperation},
             reputation::Reputation,
         },
@@ -359,7 +359,7 @@ mod tests {
             self.entry_point
         }
 
-        fn on_new_block(&self, _event: &NewBlockEvent) {}
+        fn on_chain_update(&self, _update: &ChainUpdate) {}
 
         fn add_operation(
            &self,
diff --git a/src/op_pool/task.rs b/src/op_pool/task.rs
index 88f749afa..7f86e7997 100644
--- a/src/op_pool/task.rs
+++ b/src/op_pool/task.rs
@@ -9,13 +9,15 @@ use tonic::{async_trait, transport::Server};
 use crate::{
     common::{
         emit::WithEntryPoint,
+        eth,
         grpc::metrics::GrpcMetricsLayer,
-        handle::{flatten_handle, Task},
+        handle,
+        handle::Task,
         protos::op_pool::{op_pool_server::OpPoolServer, OP_POOL_FILE_DESCRIPTOR_SET},
     },
     op_pool::{
+        chain::{self, Chain, ChainUpdate},
         emit::OpPoolEvent,
-        event::{EventListener, EventProvider, HttpBlockProviderFactory, WsBlockProviderFactory},
         mempool::{uo_pool::UoPool, Mempool, PoolConfig},
         reputation::{HourlyMovingAverageReputation, ReputationParams},
         server::OpPoolImpl,
@@ -26,10 +28,10 @@ use crate::{
 pub struct Args {
     pub port: u16,
     pub host: String,
-    pub ws_url: Option<String>,
-    pub http_url: Option<String>,
+    pub http_url: String,
     pub http_poll_interval: Duration,
     pub chain_id: u64,
+    pub chain_history_size: u64,
     pub pool_configs: Vec<PoolConfig>,
 }
@@ -44,26 +46,26 @@ impl Task for PoolTask {
     async fn run(&self, shutdown_token: CancellationToken) -> anyhow::Result<()> {
         let addr = format!("{}:{}", self.args.host, self.args.port).parse()?;
         let chain_id = self.args.chain_id;
-        let entry_points = self.args.pool_configs.iter().map(|pc| &pc.entry_point);
         tracing::info!("Starting server on {addr}");
         tracing::info!("Chain id: {chain_id}");
-        tracing::info!("Websocket url: {:?}", self.args.ws_url);
         tracing::info!("Http url: {:?}", self.args.http_url);
 
-        // Events listener
-        let event_provider: Box<dyn EventProvider> = if let Some(ws_url) = &self.args.ws_url {
-            let connection_factory = WsBlockProviderFactory::new(ws_url.to_owned(), 10);
-            Box::new(EventListener::new(connection_factory, entry_points))
-        } else if let Some(http_url) = &self.args.http_url {
-            let connection_factory = HttpBlockProviderFactory::new(
-                http_url.to_owned(),
-                self.args.http_poll_interval,
-                10,
-            );
-            Box::new(EventListener::new(connection_factory, entry_points))
-        } else {
-            bail!("Either ws_url or http_url must be provided");
+        // create chain
+        let chain_settings = chain::Settings {
+            history_size: self.args.chain_history_size,
+            max_backfill: self.args.chain_history_size,
+            poll_interval: self.args.http_poll_interval,
+            entry_point_addresses: self
+                .args
+                .pool_configs
+                .iter()
+                .map(|config| config.entry_point)
+                .collect(),
         };
+        let provider = eth::new_provider(&self.args.http_url, self.args.http_poll_interval)?;
+        let chain = Chain::new(provider, chain_settings);
+        let (update_sender, _) = broadcast::channel(1000);
+        let chain_handle = chain.spawn_watcher(update_sender.clone(), shutdown_token.clone());
 
         // create mempools
         let mut mempools = Vec::new();
@@ -71,7 +73,7 @@ impl Task for PoolTask {
         for pool_config in &self.args.pool_configs {
             let (pool, handle) = PoolTask::create_mempool(
                 pool_config,
-                event_provider.as_ref(),
+                update_sender.subscribe(),
                 self.event_sender.clone(),
                 shutdown_token.clone(),
             )
@@ -96,9 +98,6 @@ impl Task for PoolTask {
             .context("should have joined mempool handles")
         });
 
-        // Start events listener
-        let events_provider_handle = event_provider.spawn(shutdown_token.clone());
-
         // gRPC server
         let op_pool_server = OpPoolServer::new(OpPoolImpl::new(chain_id, mempool_map));
         let reflection_service = tonic_reflection::server::Builder::configure()
@@ -126,9 +125,9 @@ impl Task for PoolTask {
         tracing::info!("Started op_pool");
 
         match try_join!(
-            flatten_handle(mempool_handle),
-            flatten_handle(server_handle),
-            flatten_handle(events_provider_handle)
+            handle::flatten_handle(mempool_handle),
+            handle::flatten_handle(server_handle),
+            handle::as_anyhow_handle(chain_handle),
         ) {
             Ok(_) => {
                 tracing::info!("Pool server shutdown");
@@ -156,11 +155,10 @@ impl PoolTask {
 
     async fn create_mempool(
         pool_config: &PoolConfig,
-        event_provider: &dyn EventProvider,
+        update_rx: broadcast::Receiver<Arc<ChainUpdate>>,
         event_sender: broadcast::Sender<WithEntryPoint<OpPoolEvent>>,
         shutdown_token: CancellationToken,
     ) -> anyhow::Result<(Arc<UoPool<HourlyMovingAverageReputation>>, JoinHandle<()>)> {
-        let entry_point = pool_config.entry_point;
         // Reputation manager
         let reputation = Arc::new(HourlyMovingAverageReputation::new(
             ReputationParams::bundler_default(),
@@ -177,15 +175,9 @@ impl PoolTask {
             Arc::clone(&reputation),
             event_sender,
         ));
-        // Start mempool
-        let mempool_events = event_provider
-            .subscribe_by_entrypoint(entry_point)
-            .context("event listener should have entrypoint subscriber")?;
         let mp_runner = Arc::clone(&mp);
         let handle =
-            tokio::spawn(
-                async move { mp_runner.run(mempool_events, shutdown_token.clone()).await },
-            );
+            tokio::spawn(async move { mp_runner.run(update_rx, shutdown_token.clone()).await });
 
         Ok((mp, handle))
    }
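// ---------------------------------------------------------------------------
// Editor's example (illustrative sketch, not part of the patch): the fan-out
// wiring task.rs sets up — one broadcast channel created up front, the chain
// watcher holding the sender, and each mempool getting its own receiver via
// subscribe(). `ChainUpdate` is again a stand-in struct. Every subscriber sees
// every update; the capacity (1000 above) only bounds how far a slow
// subscriber may fall behind before it observes a Lagged error.

use std::sync::Arc;

use tokio::sync::broadcast;

#[derive(Debug)]
struct ChainUpdate {
    latest_block_number: u64,
}

#[tokio::main]
async fn main() {
    // Keep the sender; the initial receiver can be dropped, as in task.rs.
    let (update_sender, _) = broadcast::channel::<Arc<ChainUpdate>>(1000);

    // One receiver per mempool, each subscribed before the first send.
    let mut pool_a = update_sender.subscribe();
    let mut pool_b = update_sender.subscribe();

    update_sender
        .send(Arc::new(ChainUpdate { latest_block_number: 7 }))
        .expect("at least one subscriber is alive");

    // Both subscribers independently receive the same Arc'd update.
    assert_eq!(pool_a.recv().await.unwrap().latest_block_number, 7);
    assert_eq!(pool_b.recv().await.unwrap().latest_block_number, 7);
}
// ---------------------------------------------------------------------------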