Skip to content

Commit

Permalink
feat(oppool): Return reorged ops to the mempool
Browse files Browse the repository at this point in the history
Previously, ops could be lost forever if they were mined into a block,
but then that block got reorged away. Now, we detect reorgs and return
any ops contained in them to our mempool.

This requires some involved changes:

* We replace the entire `events` mod and all its listeners. Instead, we
  introduce a new type, `Chain`, which represents our current knowledge
  of what blocks make up the blockchain.
* We watch for the block number to go up. When it does (possibly by more
  than one at once), we read backwards from the latest number until we
  connect back to our known blocks, which lets us see any blocks that
  were replaced as well.
* This process produces a `ChainUpdate` event, which is sent to the op
  pool (replacing `NewBlockEvent`). This may cause the op pool to not
  only remove mined ops but also restore unmined ops.
* In order for the op pool to do so, it remembers mined ops for a time
  rather than deleting them fully. We add new methods to the op pool for
  restoring unmined blocks to make this possible.
  • Loading branch information
dphilipson committed Jul 31, 2023
1 parent 716844a commit b37127d
Show file tree
Hide file tree
Showing 25 changed files with 801 additions and 1,282 deletions.
28 changes: 5 additions & 23 deletions src/builder/task.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
use std::{collections::HashMap, sync::Arc, time::Duration};

use anyhow::{bail, Context};
use ethers::{
providers::{Http, HttpRateLimitRetryPolicy, Provider, RetryClient, RetryClientBuilder},
types::{Address, H256},
};
use ethers::types::{Address, H256};
use ethers_signers::Signer;
use rusoto_core::Region;
use tokio::{select, sync::broadcast, time};
Expand All @@ -14,7 +11,6 @@ use tonic::{
transport::{Channel, Server},
};
use tracing::info;
use url::Url;

use crate::{
builder::{
Expand All @@ -29,6 +25,7 @@ use crate::{
common::{
contracts::i_entry_point::IEntryPoint,
emit::WithEntryPoint,
eth,
gas::PriorityFeeMode,
handle::{SpawnGuard, Task},
mempool::MempoolConfig,
Expand Down Expand Up @@ -81,7 +78,7 @@ impl Task for BuilderTask {
info!("Starting builder server on {}", addr);
tracing::info!("Mempool config: {:?}", self.args.mempool_configs);

let provider = new_provider(&self.args.rpc_url, self.args.eth_poll_interval)?;
let provider = eth::new_provider(&self.args.rpc_url, self.args.eth_poll_interval)?;
let signer = if let Some(pk) = &self.args.private_key {
info!("Using local signer");
BundlerSigner::Local(
Expand Down Expand Up @@ -139,7 +136,8 @@ impl Task for BuilderTask {
proposer_settings,
self.event_sender.clone(),
);
let submit_provider = new_provider(&self.args.submit_url, self.args.eth_poll_interval)?;
let submit_provider =
eth::new_provider(&self.args.submit_url, self.args.eth_poll_interval)?;
let transaction_sender = get_sender(
submit_provider,
signer,
Expand Down Expand Up @@ -240,19 +238,3 @@ impl BuilderTask {
}
}
}

fn new_provider(
url: &str,
poll_interval: Duration,
) -> anyhow::Result<Arc<Provider<RetryClient<Http>>>> {
let parsed_url = Url::parse(url).context("provider url should be a valid")?;
let http = Http::new(parsed_url);
let client = RetryClientBuilder::default()
// these retries are if the server returns a 429
.rate_limit_retries(10)
// these retries are if the connection is dubious
.timeout_retries(3)
.initial_backoff(Duration::from_millis(500))
.build(http, Box::<HttpRateLimitRetryPolicy>::default());
Ok(Arc::new(Provider::new(client).interval(poll_interval)))
}
5 changes: 1 addition & 4 deletions src/cli/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,7 @@ impl BuilderArgs {
common.priority_fee_mode_value,
)?;

let rpc_url = common
.node_http
.clone()
.context("should have a node HTTP URL")?;
let rpc_url = common.node_http.clone();
let submit_url = self.submit_url.clone().unwrap_or_else(|| rpc_url.clone());

let mempool_configs = match &common.mempool_config_path {
Expand Down
6 changes: 1 addition & 5 deletions src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,18 +100,14 @@ pub struct CommonArgs {
)]
chain_id: u64,

/// ETH Node websocket URL to connect to
#[arg(long = "node_ws", name = "node_ws", env = "NODE_WS", global = true)]
node_ws: Option<String>,

/// ETH Node HTTP URL to connect to
#[arg(
long = "node_http",
name = "node_http",
env = "NODE_HTTP",
global = true
)]
node_http: Option<String>,
node_http: String,

#[arg(
long = "max_verification_gas",
Expand Down
27 changes: 26 additions & 1 deletion src/cli/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::time::Duration;

use anyhow::Context;
use clap::Args;
use ethers::types::Chain;
use tokio::sync::broadcast;

use super::CommonArgs;
Expand Down Expand Up @@ -83,6 +84,13 @@ pub struct PoolArgs {
env = "POOL_ALLOWLIST_PATH"
)]
pub allowlist_path: Option<String>,

#[arg(
long = "pool.chain_history_size",
name = "pool.chain_history_size",
env = "POOL_CHAIN_HISTORY_SIZE"
)]
pub chain_history_size: Option<u64>,
}

impl PoolArgs {
Expand Down Expand Up @@ -122,14 +130,31 @@ impl PoolArgs {
port: self.port,
host: self.host.clone(),
chain_id: common.chain_id,
ws_url: common.node_ws.clone(),
chain_history_size: self
.chain_history_size
.unwrap_or_else(|| default_chain_history_size(common.chain_id)),
http_url: common.node_http.clone(),
http_poll_interval: Duration::from_millis(self.http_poll_interval_millis),
pool_configs,
})
}
}

const SMALL_HISTORY_SIZE: u64 = 16;
const LARGE_HISTORY_SIZE: u64 = 128;

// Mainnets that are known to not have large reorgs can use the small history
// size. Use the large history size for all testnets because I don't trust them.
const SMALL_HISTORY_CHAIN_IDS: &[u64] = &[Chain::Mainnet as u64, Chain::Arbitrum as u64];

fn default_chain_history_size(chain_id: u64) -> u64 {
if SMALL_HISTORY_CHAIN_IDS.contains(&chain_id) {
SMALL_HISTORY_SIZE
} else {
LARGE_HISTORY_SIZE
}
}

/// CLI options for the Pool server standalone
#[derive(Args, Debug)]
pub struct PoolCliArgs {
Expand Down
5 changes: 1 addition & 4 deletions src/cli/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,7 @@ impl RpcArgs {
.map(|ep| ep.parse())
.collect::<Result<Vec<_>, _>>()
.context("Invalid entry_points argument")?,
rpc_url: common
.node_http
.clone()
.context("rpc requires node_http arg")?,
rpc_url: common.node_http.clone(),
chain_id: common.chain_id,
api_namespaces: apis,
precheck_settings,
Expand Down
24 changes: 24 additions & 0 deletions src/common/block_watcher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
use std::time::Duration;

use tokio::time;

use crate::common::{retry, retry::UnlimitedRetryOpts, types::ProviderLike};

pub async fn wait_for_new_block_number(
provider: &impl ProviderLike,
last_block_number: u64,
poll_interval: Duration,
) -> u64 {
loop {
let block_number = retry::with_unlimited_retries(
"watch latest block number",
|| provider.get_block_number(),
UnlimitedRetryOpts::default(),
)
.await;
if last_block_number < block_number {
return block_number;
}
time::sleep(poll_interval).await;
}
}
24 changes: 22 additions & 2 deletions src/common/eth.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,37 @@
use std::{error, future::Future, ops::Deref, sync::Arc};
use std::{error, future::Future, ops::Deref, sync::Arc, time::Duration};

use anyhow::Context;
use ethers::{
abi::{AbiDecode, AbiEncode, RawLog},
contract::{builders::ContractCall, Contract, ContractDeployer, ContractError},
providers::{JsonRpcClient, Middleware, PendingTransaction, Provider, ProviderError},
providers::{
Http, HttpRateLimitRetryPolicy, JsonRpcClient, Middleware, PendingTransaction, Provider,
ProviderError, RetryClient, RetryClientBuilder,
},
types::{
Address, BlockId, Bytes, Eip1559TransactionRequest, Log, Selector, TransactionReceipt, H256,
},
};
use url::Url;

use crate::common::contracts::get_code_hashes::{CodeHashesResult, GETCODEHASHES_BYTECODE};

pub fn new_provider(
url: &str,
poll_interval: Duration,
) -> anyhow::Result<Arc<Provider<RetryClient<Http>>>> {
let parsed_url = Url::parse(url).context("provider url should be valid")?;
let http = Http::new(parsed_url);
let client = RetryClientBuilder::default()
// these retries are if the server returns a 429
.rate_limit_retries(10)
// these retries are if the connection is dubious
.timeout_retries(3)
.initial_backoff(Duration::from_millis(500))
.build(http, Box::<HttpRateLimitRetryPolicy>::default());
Ok(Arc::new(Provider::new(client).interval(poll_interval)))
}

/// Waits for a pending transaction to be mined, providing appropriate error
/// messages for each point of failure.
pub async fn await_mined_tx<'a, Fut, C, Err>(
Expand Down
11 changes: 8 additions & 3 deletions src/common/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,21 @@ use tracing::{error, info};
///
/// Flattens the two types of errors that can occur when awaiting a handle.
/// Useful when using tokio::try_join! to await multiple handles.
pub async fn flatten_handle<T>(
handle: JoinHandle<Result<T, anyhow::Error>>,
) -> Result<T, anyhow::Error> {
pub async fn flatten_handle<T>(handle: JoinHandle<anyhow::Result<T>>) -> anyhow::Result<T> {
match handle.await {
Ok(Ok(result)) => Ok(result),
Ok(Err(err)) => Err(err)?,
Err(err) => Err(err).context("handling failed")?,
}
}

/// Converts a JoinHandle result into an `anyhow::Result`. Like
/// `flatten_handle`, useful when using `tokio::try_join!` to await multiple
/// handles.
pub async fn as_anyhow_handle<T>(handle: JoinHandle<T>) -> anyhow::Result<T> {
handle.await.context("handling failed")
}

/// A guard that aborts a spawned task when dropped.
#[derive(Debug)]
pub struct SpawnGuard(AbortHandle);
Expand Down
2 changes: 2 additions & 0 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod block_watcher;
pub mod context;
pub mod contracts;
pub mod dev;
Expand All @@ -10,6 +11,7 @@ pub mod math;
pub mod mempool;
pub mod precheck;
pub mod protos;
pub mod retry;
pub mod server;
pub mod simulation;
pub mod strs;
Expand Down
89 changes: 89 additions & 0 deletions src/common/retry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
use std::{future::Future, time::Duration};

use rand::Rng;
use tokio::time;
use tracing::warn;

#[derive(Clone, Copy, Debug)]
pub struct RetryOpts {
pub max_attempts: u64,
/// The first retry is immediately after the first failure (plus jitter).
/// The next retry after that will wait this long.
pub min_nonzero_wait: Duration,
pub max_wait: Duration,
pub max_jitter: Duration,
}

impl Default for RetryOpts {
fn default() -> Self {
UnlimitedRetryOpts::default().to_retry_opts_with_max_attempts(10)
}
}

pub async fn with_retries<Func, Fut, Out, Err>(
description: &str,
func: Func,
opts: RetryOpts,
) -> Result<Out, Err>
where
Func: Fn() -> Fut,
Fut: Future<Output = Result<Out, Err>>,
{
let mut next_wait = Duration::ZERO;
let mut last_error: Option<Err> = None;
for attempt_number in 1..opts.max_attempts + 1 {
match func().await {
Ok(out) => return Ok(out),
Err(error) => {
last_error = Some(error);
warn!("Failed to {description} (attempt {attempt_number})");
}
}
// Grab a new rng each iteration because we can't hold it across awaits.
let jitter = rand::thread_rng().gen_range(Duration::ZERO..opts.max_jitter);
time::sleep(next_wait + jitter).await;
next_wait = (2 * next_wait).clamp(opts.min_nonzero_wait, opts.max_wait);
}
Err(last_error.unwrap())
}

#[derive(Clone, Copy, Debug)]
pub struct UnlimitedRetryOpts {
pub min_nonzero_wait: Duration,
pub max_wait: Duration,
pub max_jitter: Duration,
}

impl Default for UnlimitedRetryOpts {
fn default() -> Self {
Self {
min_nonzero_wait: Duration::from_secs(1),
max_wait: Duration::from_secs(10),
max_jitter: Duration::from_secs(1),
}
}
}

impl UnlimitedRetryOpts {
fn to_retry_opts_with_max_attempts(self, max_attempts: u64) -> RetryOpts {
RetryOpts {
max_attempts,
min_nonzero_wait: self.min_nonzero_wait,
max_wait: self.max_wait,
max_jitter: self.max_jitter,
}
}
}

pub async fn with_unlimited_retries<Func, Fut, Out, Err>(
description: &str,
func: Func,
opts: UnlimitedRetryOpts,
) -> Out
where
Func: Fn() -> Fut,
Fut: Future<Output = Result<Out, Err>>,
{
let opts = opts.to_retry_opts_with_max_attempts(u64::MAX);
with_retries(description, func, opts).await.ok().unwrap()
}
Loading

0 comments on commit b37127d

Please sign in to comment.