feat(oppool): Return reorged ops to the mempool
Previously, ops could be lost forever if they were mined into a block,
but then that block got reorged away. Now, we detect reorgs and return
any ops contained in them to our mempool.

This requires some involved changes:

* We replace the entire `events` mod and all its listeners. Instead, we
  introduce a new type, `Chain`, which represents our current knowledge
  of what blocks make up the blockchain.
* We watch for the latest block hash to change. When it does, we read
  backwards from the latest block until we connect back to our known
  blocks, which lets us see any blocks that were replaced as well.
* This process produces a `ChainUpdate` event, which is sent to the op
  pool (replacing `NewBlockEvent`). This may cause the op pool to not
  only remove mined ops but also restore unmined ops.
* In order for the op pool to do so, it remembers mined ops for a time
  rather than deleting them outright. We add new methods to the op pool for
  restoring unmined ops to make this possible (a rough sketch of this
  bookkeeping follows this list).
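
As a rough illustration of that bookkeeping (a hedged sketch with made-up
names, not the code in this commit), the pool side of a chain update could
look like this:

use std::collections::HashMap;

// Hypothetical stand-ins for the real types; only the shape matters here.
type OpHash = [u8; 32];

#[derive(Clone)]
struct PoolOperation; // fields elided

/// Simplified stand-in for `ChainUpdate`: hashes of ops seen in newly
/// adopted blocks, and hashes of ops from blocks that were reorged away.
struct ChainUpdate {
    mined_op_hashes: Vec<OpHash>,
    unmined_op_hashes: Vec<OpHash>,
}

#[derive(Default)]
struct Pool {
    pending: HashMap<OpHash, PoolOperation>,
    // Mined ops are remembered for a while instead of being deleted, so a
    // reorg can move them back into `pending`.
    recently_mined: HashMap<OpHash, PoolOperation>,
}

impl Pool {
    fn on_chain_update(&mut self, update: &ChainUpdate) {
        // Ops that just landed in a block leave the pending set but stay
        // recoverable.
        for hash in &update.mined_op_hashes {
            if let Some(op) = self.pending.remove(hash) {
                self.recently_mined.insert(*hash, op);
            }
        }
        // Ops whose blocks were reorged away go back into the pending set.
        for hash in &update.unmined_op_hashes {
            if let Some(op) = self.recently_mined.remove(hash) {
                self.pending.insert(*hash, op);
            }
        }
    }
}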
dphilipson committed Aug 3, 2023
1 parent 716844a commit fef2b62
Showing 26 changed files with 1,273 additions and 1,366 deletions.
30 changes: 8 additions & 22 deletions src/builder/server.rs
@@ -8,7 +8,7 @@ use std::{

use anyhow::{bail, Context};
use ethers::{
providers::{Http, Middleware, Provider, RetryClient},
providers::{Http, Provider, RetryClient},
types::{transaction::eip2718::TypedTransaction, Address, H256, U256},
};
use tokio::{join, sync::broadcast, time};
@@ -22,6 +22,7 @@ use crate::{
transaction_tracker::{SendResult, TrackerUpdate, TransactionTracker},
},
common::{
block_watcher,
emit::WithEntryPoint,
gas::GasFees,
math,
@@ -136,7 +137,12 @@ where
time::sleep(self.eth_poll_interval).await;
continue;
}
last_block_number = self.wait_for_new_block_number(last_block_number).await;
last_block_number = block_watcher::wait_for_new_block_number(
&*self.provider,
last_block_number,
self.eth_poll_interval,
)
.await;
self.check_for_and_log_transaction_update().await;
let result = self.send_bundle_with_increasing_gas_fees().await;
match result {
@@ -337,26 +343,6 @@ where
Ok(SendBundleResult::StalledAtMaxFeeIncreases)
}

async fn wait_for_new_block_number(&self, prev_block_number: u64) -> u64 {
loop {
let block_number = self.provider.get_block_number().await;
match block_number {
Ok(n) => {
let n = n.as_u64();
if n > prev_block_number {
return n;
}
}
Err(error) => {
error!(
"Failed to load latest block number in builder. Will keep trying: {error}"
);
}
}
time::sleep(self.eth_poll_interval).await;
}
}

/// Builds a bundle and returns some metadata and the transaction to send
/// it, or `None` if there are no valid operations available.
async fn get_bundle_tx(
28 changes: 5 additions & 23 deletions src/builder/task.rs
@@ -1,10 +1,7 @@
use std::{collections::HashMap, sync::Arc, time::Duration};

use anyhow::{bail, Context};
use ethers::{
providers::{Http, HttpRateLimitRetryPolicy, Provider, RetryClient, RetryClientBuilder},
types::{Address, H256},
};
use ethers::types::{Address, H256};
use ethers_signers::Signer;
use rusoto_core::Region;
use tokio::{select, sync::broadcast, time};
@@ -14,7 +11,6 @@ use tonic::{
transport::{Channel, Server},
};
use tracing::info;
use url::Url;

use crate::{
builder::{
@@ -29,6 +25,7 @@ use crate::{
common::{
contracts::i_entry_point::IEntryPoint,
emit::WithEntryPoint,
eth,
gas::PriorityFeeMode,
handle::{SpawnGuard, Task},
mempool::MempoolConfig,
@@ -81,7 +78,7 @@ impl Task for BuilderTask {
info!("Starting builder server on {}", addr);
tracing::info!("Mempool config: {:?}", self.args.mempool_configs);

let provider = new_provider(&self.args.rpc_url, self.args.eth_poll_interval)?;
let provider = eth::new_provider(&self.args.rpc_url, self.args.eth_poll_interval)?;
let signer = if let Some(pk) = &self.args.private_key {
info!("Using local signer");
BundlerSigner::Local(
@@ -139,7 +136,8 @@ impl Task for BuilderTask {
proposer_settings,
self.event_sender.clone(),
);
let submit_provider = new_provider(&self.args.submit_url, self.args.eth_poll_interval)?;
let submit_provider =
eth::new_provider(&self.args.submit_url, self.args.eth_poll_interval)?;
let transaction_sender = get_sender(
submit_provider,
signer,
@@ -240,19 +238,3 @@ impl BuilderTask {
}
}
}

fn new_provider(
url: &str,
poll_interval: Duration,
) -> anyhow::Result<Arc<Provider<RetryClient<Http>>>> {
let parsed_url = Url::parse(url).context("provider url should be a valid")?;
let http = Http::new(parsed_url);
let client = RetryClientBuilder::default()
// these retries are if the server returns a 429
.rate_limit_retries(10)
// these retries are if the connection is dubious
.timeout_retries(3)
.initial_backoff(Duration::from_millis(500))
.build(http, Box::<HttpRateLimitRetryPolicy>::default());
Ok(Arc::new(Provider::new(client).interval(poll_interval)))
}
5 changes: 1 addition & 4 deletions src/cli/builder.rs
@@ -159,10 +159,7 @@ impl BuilderArgs {
common.priority_fee_mode_value,
)?;

let rpc_url = common
.node_http
.clone()
.context("should have a node HTTP URL")?;
let rpc_url = common.node_http.clone();
let submit_url = self.submit_url.clone().unwrap_or_else(|| rpc_url.clone());

let mempool_configs = match &common.mempool_config_path {
6 changes: 1 addition & 5 deletions src/cli/mod.rs
@@ -100,18 +100,14 @@ pub struct CommonArgs {
)]
chain_id: u64,

/// ETH Node websocket URL to connect to
#[arg(long = "node_ws", name = "node_ws", env = "NODE_WS", global = true)]
node_ws: Option<String>,

/// ETH Node HTTP URL to connect to
#[arg(
long = "node_http",
name = "node_http",
env = "NODE_HTTP",
global = true
)]
node_http: Option<String>,
node_http: String,

#[arg(
long = "max_verification_gas",
31 changes: 30 additions & 1 deletion src/cli/pool.rs
@@ -2,6 +2,7 @@ use std::time::Duration;

use anyhow::Context;
use clap::Args;
use ethers::types::Chain;
use tokio::sync::broadcast;

use super::CommonArgs;
@@ -83,6 +84,13 @@ pub struct PoolArgs {
env = "POOL_ALLOWLIST_PATH"
)]
pub allowlist_path: Option<String>,

#[arg(
long = "pool.chain_history_size",
name = "pool.chain_history_size",
env = "POOL_CHAIN_HISTORY_SIZE"
)]
pub chain_history_size: Option<u64>,
}

impl PoolArgs {
@@ -122,14 +130,35 @@ impl PoolArgs {
port: self.port,
host: self.host.clone(),
chain_id: common.chain_id,
ws_url: common.node_ws.clone(),
chain_history_size: self
.chain_history_size
.unwrap_or_else(|| default_chain_history_size(common.chain_id)),
http_url: common.node_http.clone(),
http_poll_interval: Duration::from_millis(self.http_poll_interval_millis),
pool_configs,
})
}
}

const SMALL_HISTORY_SIZE: u64 = 16;
const LARGE_HISTORY_SIZE: u64 = 128;

// Mainnets that are known to not have large reorgs can use the small history
// size. Use the large history size for all testnets because I don't trust them.
const SMALL_HISTORY_CHAIN_IDS: &[u64] = &[
Chain::Mainnet as u64,
Chain::Arbitrum as u64,
Chain::Optimism as u64,
];

fn default_chain_history_size(chain_id: u64) -> u64 {
if SMALL_HISTORY_CHAIN_IDS.contains(&chain_id) {
SMALL_HISTORY_SIZE
} else {
LARGE_HISTORY_SIZE
}
}

/// CLI options for the Pool server standalone
#[derive(Args, Debug)]
pub struct PoolCliArgs {
5 changes: 1 addition & 4 deletions src/cli/rpc.rs
@@ -98,10 +98,7 @@ impl RpcArgs {
.map(|ep| ep.parse())
.collect::<Result<Vec<_>, _>>()
.context("Invalid entry_points argument")?,
rpc_url: common
.node_http
.clone()
.context("rpc requires node_http arg")?,
rpc_url: common.node_http.clone(),
chain_id: common.chain_id,
api_namespaces: apis,
precheck_settings,
53 changes: 53 additions & 0 deletions src/common/block_watcher.rs
@@ -0,0 +1,53 @@
use std::time::Duration;

use ethers::types::{Block, BlockNumber, H256};
use tokio::time;
use tracing::error;

use crate::common::{retry, retry::UnlimitedRetryOpts, types::ProviderLike};

pub async fn wait_for_new_block(
provider: &impl ProviderLike,
last_block_hash: H256,
poll_interval: Duration,
) -> (H256, Block<H256>) {
loop {
let block = retry::with_unlimited_retries(
"watch latest block",
|| provider.get_block(BlockNumber::Latest),
UnlimitedRetryOpts::default(),
)
.await;
let Some(block) = block else {
error!("Latest block should be present when waiting for new block.");
continue;
};
let Some(hash) = block.hash else {
error!("Latest block should have hash.");
continue;
};
if last_block_hash != hash {
return (hash, block);
}
time::sleep(poll_interval).await;
}
}

pub async fn wait_for_new_block_number(
provider: &impl ProviderLike,
last_block_number: u64,
poll_interval: Duration,
) -> u64 {
loop {
let block_number = retry::with_unlimited_retries(
"watch latest block number",
|| provider.get_block_number(),
UnlimitedRetryOpts::default(),
)
.await;
if last_block_number < block_number {
return block_number;
}
time::sleep(poll_interval).await;
}
}
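
The helpers above only notice that the head block changed. The reorg
detection described in the commit message lives in the new `Chain` type,
whose file is not among the diffs shown here; what follows is a hedged,
self-contained sketch (all names are stand-ins, and the `lookup` map stands
in for reading blocks from the provider) of walking back from a new head
until it reconnects with known blocks:

use std::collections::{HashMap, VecDeque};

// Simplified block record; `u64` stands in for `H256`.
#[derive(Clone)]
struct BlockSummary {
    hash: u64,
    parent_hash: u64,
    op_hashes: Vec<u64>, // ops mined in this block
}

/// Recently seen canonical blocks, oldest first.
struct KnownChain {
    blocks: VecDeque<BlockSummary>,
}

/// Result of advancing to a new head: blocks that became canonical and
/// blocks that were reorged away (both oldest first).
struct ChainDelta {
    added: Vec<BlockSummary>,
    removed: Vec<BlockSummary>,
}

impl KnownChain {
    fn advance(
        &mut self,
        new_head: BlockSummary,
        lookup: &HashMap<u64, BlockSummary>, // stand-in for provider reads
    ) -> ChainDelta {
        // Walk parent hashes backwards until we hit a block we already know.
        let mut added = vec![new_head.clone()];
        let mut cursor = new_head.parent_hash;
        while !self.blocks.iter().any(|b| b.hash == cursor) {
            match lookup.get(&cursor) {
                Some(block) => {
                    cursor = block.parent_hash;
                    added.push(block.clone());
                }
                // Could not reconnect within our history: treat it as a deep
                // reorg and replace everything we knew.
                None => break,
            }
        }
        added.reverse();
        // Everything we knew past the reconnection point was reorged away.
        let mut removed = Vec::new();
        while self.blocks.back().map_or(false, |b| b.hash != cursor) {
            removed.push(self.blocks.pop_back().unwrap());
        }
        removed.reverse();
        self.blocks.extend(added.iter().cloned());
        ChainDelta { added, removed }
    }
}

The `removed` blocks are what lets the op pool restore unmined ops, as
sketched earlier under the commit message.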
24 changes: 22 additions & 2 deletions src/common/eth.rs
@@ -1,17 +1,37 @@
use std::{error, future::Future, ops::Deref, sync::Arc};
use std::{error, future::Future, ops::Deref, sync::Arc, time::Duration};

use anyhow::Context;
use ethers::{
abi::{AbiDecode, AbiEncode, RawLog},
contract::{builders::ContractCall, Contract, ContractDeployer, ContractError},
providers::{JsonRpcClient, Middleware, PendingTransaction, Provider, ProviderError},
providers::{
Http, HttpRateLimitRetryPolicy, JsonRpcClient, Middleware, PendingTransaction, Provider,
ProviderError, RetryClient, RetryClientBuilder,
},
types::{
Address, BlockId, Bytes, Eip1559TransactionRequest, Log, Selector, TransactionReceipt, H256,
},
};
use url::Url;

use crate::common::contracts::get_code_hashes::{CodeHashesResult, GETCODEHASHES_BYTECODE};

pub fn new_provider(
url: &str,
poll_interval: Duration,
) -> anyhow::Result<Arc<Provider<RetryClient<Http>>>> {
let parsed_url = Url::parse(url).context("provider url should be valid")?;
let http = Http::new(parsed_url);
let client = RetryClientBuilder::default()
// these retries are if the server returns a 429
.rate_limit_retries(10)
// these retries are if the connection is dubious
.timeout_retries(3)
.initial_backoff(Duration::from_millis(500))
.build(http, Box::<HttpRateLimitRetryPolicy>::default());
Ok(Arc::new(Provider::new(client).interval(poll_interval)))
}

/// Waits for a pending transaction to be mined, providing appropriate error
/// messages for each point of failure.
pub async fn await_mined_tx<'a, Fut, C, Err>(
11 changes: 8 additions & 3 deletions src/common/handle.rs
@@ -12,16 +12,21 @@ use tracing::{error, info};
///
/// Flattens the two types of errors that can occur when awaiting a handle.
/// Useful when using tokio::try_join! to await multiple handles.
pub async fn flatten_handle<T>(
handle: JoinHandle<Result<T, anyhow::Error>>,
) -> Result<T, anyhow::Error> {
pub async fn flatten_handle<T>(handle: JoinHandle<anyhow::Result<T>>) -> anyhow::Result<T> {
match handle.await {
Ok(Ok(result)) => Ok(result),
Ok(Err(err)) => Err(err)?,
Err(err) => Err(err).context("handling failed")?,
}
}

/// Converts a JoinHandle result into an `anyhow::Result`. Like
/// `flatten_handle`, useful when using `tokio::try_join!` to await multiple
/// handles.
pub async fn as_anyhow_handle<T>(handle: JoinHandle<T>) -> anyhow::Result<T> {
handle.await.context("handling failed")
}

/// A guard that aborts a spawned task when dropped.
#[derive(Debug)]
pub struct SpawnGuard(AbortHandle);
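
A hypothetical usage sketch (not part of this commit) of how `flatten_handle`
and the new `as_anyhow_handle` above are meant to combine under
`tokio::try_join!`, so that handles with and without `anyhow::Result` bodies
can be awaited together:

use crate::common::handle::{as_anyhow_handle, flatten_handle};

// Hypothetical tasks: one returns an anyhow::Result, the other returns ().
async fn run_server() -> anyhow::Result<()> {
    Ok(())
}

async fn run_watcher() {}

async fn run_both() -> anyhow::Result<()> {
    let server = tokio::spawn(run_server());
    let watcher = tokio::spawn(run_watcher());
    // Both helpers produce anyhow::Result, so the error types line up.
    tokio::try_join!(flatten_handle(server), as_anyhow_handle(watcher))?;
    Ok(())
}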
2 changes: 2 additions & 0 deletions src/common/mod.rs
@@ -1,3 +1,4 @@
pub mod block_watcher;
pub mod context;
pub mod contracts;
pub mod dev;
@@ -10,6 +11,7 @@ pub mod math;
pub mod mempool;
pub mod precheck;
pub mod protos;
pub mod retry;
pub mod server;
pub mod simulation;
pub mod strs;