Skip to content

Commit

Permalink
feat: WIP add pool server abstraction
Browse files Browse the repository at this point in the history
  • Loading branch information
dancoombs committed Jul 22, 2023
1 parent 1ce0f80 commit b957229
Show file tree
Hide file tree
Showing 34 changed files with 2,414 additions and 1,812 deletions.
1,234 changes: 613 additions & 621 deletions src/builder/bundle_proposer.rs

Large diffs are not rendered by default.

120 changes: 61 additions & 59 deletions src/builder/bundle_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use tokio::{
sync::{mpsc, oneshot},
time,
};
use tonic::transport::Channel;
use tonic::async_trait;
use tracing::{error, info, trace, warn};

use crate::{
Expand All @@ -27,38 +27,42 @@ use crate::{
common::{
gas::GasFees,
math,
protos::op_pool::{
self, op_pool_client::OpPoolClient, RemoveEntitiesRequest, RemoveOpsRequest,
},
types::{Entity, EntryPointLike, ExpectedStorage, UserOperation},
},
op_pool::PoolClient,
};

// Overhead on gas estimates to account for inaccuracies.
const GAS_ESTIMATE_OVERHEAD_PERCENT: u64 = 10;

#[async_trait]
pub trait BundleBuilder: Send + Sync + 'static {
async fn send_bundles_in_loop(&mut self);
}

#[derive(Debug)]
pub struct Settings {
pub replacement_fee_percent_increase: u64,
pub max_fee_increases: u64,
}

#[derive(Debug)]
pub struct BundleSender<P, E, T>
pub struct BundleSender<P, E, T, C>
where
P: BundleProposer,
E: EntryPointLike,
T: TransactionTracker,
C: PoolClient,
{
manual_bundling_mode: Arc<AtomicBool>,
send_bundle_receiver: mpsc::Receiver<SendBundleRequest>,
chain_id: u64,
beneficiary: Address,
eth_poll_interval: Duration,
op_pool: OpPoolClient<Channel>,
proposer: P,
entry_point: E,
transaction_tracker: T,
pool_client: C,
// TODO: Figure out what we really want to do for detecting new blocks.
provider: Arc<Provider<RetryClient<Http>>>,
settings: Settings,
Expand Down Expand Up @@ -91,45 +95,18 @@ pub enum SendBundleResult {
Error(anyhow::Error),
}

impl<P, E, T> BundleSender<P, E, T>
#[async_trait]
impl<P, E, T, C> BundleBuilder for BundleSender<P, E, T, C>
where
P: BundleProposer,
E: EntryPointLike,
T: TransactionTracker,
C: PoolClient,
{
#[allow(clippy::too_many_arguments)]
pub fn new(
manual_bundling_mode: Arc<AtomicBool>,
send_bundle_receiver: mpsc::Receiver<SendBundleRequest>,
chain_id: u64,
beneficiary: Address,
eth_poll_interval: Duration,
op_pool: OpPoolClient<Channel>,
proposer: P,
entry_point: E,
transaction_tracker: T,
provider: Arc<Provider<RetryClient<Http>>>,
settings: Settings,
) -> Self {
Self {
manual_bundling_mode,
send_bundle_receiver,
chain_id,
beneficiary,
eth_poll_interval,
op_pool,
proposer,
entry_point,
transaction_tracker,
provider,
settings,
}
}

/// Loops forever, attempting to form and send a bundle on each new block,
/// then waiting for one bundle to be mined or dropped before forming the
/// next one.
pub async fn send_bundles_in_loop(&mut self) -> ! {
async fn send_bundles_in_loop(&mut self) {
let mut last_block_number = 0;
loop {
let mut send_bundle_response: Option<oneshot::Sender<SendBundleResult>> = None;
Expand Down Expand Up @@ -178,6 +155,43 @@ where
}
}
}
}

impl<P, E, T, C> BundleSender<P, E, T, C>
where
P: BundleProposer,
E: EntryPointLike,
T: TransactionTracker,
C: PoolClient,
{
#[allow(clippy::too_many_arguments)]
pub fn new(
manual_bundling_mode: Arc<AtomicBool>,
send_bundle_receiver: mpsc::Receiver<SendBundleRequest>,
chain_id: u64,
beneficiary: Address,
eth_poll_interval: Duration,
proposer: P,
entry_point: E,
transaction_tracker: T,
pool_client: C,
provider: Arc<Provider<RetryClient<Http>>>,
settings: Settings,
) -> Self {
Self {
manual_bundling_mode,
send_bundle_receiver,
chain_id,
beneficiary,
eth_poll_interval,
proposer,
entry_point,
transaction_tracker,
pool_client,
provider,
settings,
}
}

async fn check_for_and_log_transaction_update(&self) {
let update = self.transaction_tracker.check_for_update_now().await;
Expand Down Expand Up @@ -392,34 +406,22 @@ where
}

async fn remove_ops_from_pool(&self, ops: &[UserOperation]) -> anyhow::Result<()> {
self.op_pool
.clone()
.remove_ops(RemoveOpsRequest {
entry_point: self.entry_point.address().as_bytes().to_vec(),
hashes: ops
.iter()
.map(|op| self.op_hash(op).as_bytes().to_vec())
self.pool_client
.remove_ops(
self.entry_point.address(),
ops.iter()
.map(|op| op.op_hash(self.entry_point.address(), self.chain_id))
.collect(),
})
)
.await
.context("builder should remove rejected ops from pool")?;
Ok(())
.context("builder should remove rejected ops from pool")
}

async fn remove_entities_from_pool(&self, entities: &[Entity]) -> anyhow::Result<()> {
self.op_pool
.clone()
.remove_entities(RemoveEntitiesRequest {
entry_point: self.entry_point.address().as_bytes().to_vec(),
entities: entities.iter().map(op_pool::Entity::from).collect(),
})
self.pool_client
.remove_entities(self.entry_point.address(), entities.to_vec())
.await
.context("builder should remove rejected entities from pool")?;
Ok(())
}

fn op_hash(&self, op: &UserOperation) -> H256 {
op.op_hash(self.entry_point.address(), self.chain_id)
.context("builder should remove rejected entities from pool")
}
}

Expand Down
117 changes: 61 additions & 56 deletions src/builder/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,23 @@ use std::{
time::Duration,
};

use anyhow::{bail, Context};
use anyhow::Context;
use ethers::{
providers::{Http, HttpRateLimitRetryPolicy, Provider, RetryClient, RetryClientBuilder},
types::{Address, H256},
};
use ethers_signers::Signer;
use rusoto_core::Region;
use tokio::{select, sync::mpsc, time};
use tokio::{sync::mpsc, time};
use tokio_util::sync::CancellationToken;
use tonic::{
async_trait,
transport::{Channel, Server},
};
use tonic::{async_trait, transport::Server};
use tracing::info;
use url::Url;

use crate::{
builder::{
bundle_proposer::{self, BundleProposerImpl},
bundle_sender::{self, BundleSender},
bundle_sender::{self, BundleBuilder, BundleSender},
sender::get_sender,
server::BuilderImpl,
signer::{BundlerSigner, KmsSigner, LocalSigner},
Expand All @@ -34,21 +31,18 @@ use crate::{
gas::PriorityFeeMode,
handle::{SpawnGuard, Task},
mempool::MempoolConfig,
protos::{
builder::{builder_server::BuilderServer, BUILDER_FILE_DESCRIPTOR_SET},
op_pool::op_pool_client::OpPoolClient,
},
server::{self, format_socket_addr},
protos::builder::{builder_server::BuilderServer, BUILDER_FILE_DESCRIPTOR_SET},
server::format_socket_addr,
simulation::{self, SimulatorImpl},
},
op_pool::{connect_remote_pool_client, LocalPoolClient, PoolClientMode},
};

#[derive(Debug)]
pub struct Args {
pub port: u16,
pub host: String,
pub rpc_url: String,
pub pool_url: String,
pub entry_point_address: Address,
pub private_key: Option<String>,
pub aws_kms_key_ids: Vec<String>,
Expand All @@ -68,6 +62,7 @@ pub struct Args {
pub max_blocks_to_wait_for_mine: u64,
pub replacement_fee_percent_increase: u64,
pub max_fee_increases: u64,
pub pool_client_mode: PoolClientMode,
}

#[derive(Debug)]
Expand All @@ -77,7 +72,7 @@ pub struct BuilderTask {

#[async_trait]
impl Task for BuilderTask {
async fn run(&self, shutdown_token: CancellationToken) -> anyhow::Result<()> {
async fn run(&mut self, shutdown_token: CancellationToken) -> anyhow::Result<()> {
let addr = format_socket_addr(&self.args.host, self.args.port).parse()?;
info!("Starting builder server on {}", addr);
tracing::info!("Mempool config: {:?}", self.args.mempool_configs);
Expand Down Expand Up @@ -121,23 +116,13 @@ impl Task for BuilderTask {
priority_fee_mode: self.args.priority_fee_mode,
bundle_priority_fee_overhead_percent: self.args.bundle_priority_fee_overhead_percent,
};
let op_pool =
Self::connect_client_with_shutdown(&self.args.pool_url, shutdown_token.clone()).await?;
let simulator = SimulatorImpl::new(
Arc::clone(&provider),
self.args.entry_point_address,
self.args.sim_settings,
self.args.mempool_configs.clone(),
);
let entry_point = IEntryPoint::new(self.args.entry_point_address, Arc::clone(&provider));
let proposer = BundleProposerImpl::new(
op_pool.clone(),
simulator,
entry_point.clone(),
Arc::clone(&provider),
self.args.chain_id,
proposer_settings,
);
let submit_provider = new_provider(&self.args.submit_url, self.args.eth_poll_interval)?;
let transaction_sender = get_sender(
submit_provider,
Expand All @@ -164,27 +149,62 @@ impl Task for BuilderTask {
let manual_bundling_mode = Arc::new(AtomicBool::new(false));
let (send_bundle_tx, send_bundle_rx) = mpsc::channel(1);

let mut bundle_sender = BundleSender::new(
manual_bundling_mode.clone(),
send_bundle_rx,
self.args.chain_id,
beneficiary,
self.args.eth_poll_interval,
op_pool,
proposer,
entry_point,
transaction_tracker,
provider,
builder_settings,
);

let _builder_loop_guard = {
SpawnGuard::spawn_with_guard(async move { bundle_sender.send_bundles_in_loop().await })
let mut builder: Box<dyn BundleBuilder> = match &self.args.pool_client_mode {
PoolClientMode::Local { sender } => {
let pool_client = LocalPoolClient::new(sender.clone());
let proposer = BundleProposerImpl::new(
pool_client.clone(),
simulator,
entry_point.clone(),
Arc::clone(&provider),
self.args.chain_id,
proposer_settings,
);
Box::new(BundleSender::new(
manual_bundling_mode.clone(),
send_bundle_rx,
self.args.chain_id,
beneficiary,
self.args.eth_poll_interval,
proposer,
entry_point,
transaction_tracker,
pool_client,
provider,
builder_settings,
))
}
PoolClientMode::Remote { url } => {
let pool_client = connect_remote_pool_client(url, shutdown_token.clone()).await?;
let proposer = BundleProposerImpl::new(
pool_client.clone(),
simulator,
entry_point.clone(),
Arc::clone(&provider),
self.args.chain_id,
proposer_settings,
);
Box::new(BundleSender::new(
manual_bundling_mode.clone(),
send_bundle_rx,
self.args.chain_id,
beneficiary,
self.args.eth_poll_interval,
proposer,
entry_point,
transaction_tracker,
pool_client,
provider,
builder_settings,
))
}
};

let builder_server = BuilderImpl::new(manual_bundling_mode, send_bundle_tx);
let _builder_loop_guard =
{ SpawnGuard::spawn_with_guard(async move { builder.send_bundles_in_loop().await }) };

// gRPC server
let builder_server = BuilderImpl::new(manual_bundling_mode, send_bundle_tx);
let builder_server = BuilderServer::new(builder_server);

let reflection_service = tonic_reflection::server::Builder::configure()
Expand Down Expand Up @@ -226,21 +246,6 @@ impl BuilderTask {
pub fn boxed(self) -> Box<dyn Task> {
Box::new(self)
}

async fn connect_client_with_shutdown(
op_pool_url: &str,
shutdown_token: CancellationToken,
) -> anyhow::Result<OpPoolClient<Channel>> {
select! {
_ = shutdown_token.cancelled() => {
tracing::error!("bailing from connecting client, server shutting down");
bail!("Server shutting down")
}
res = server::connect_with_retries("op pool from builder", op_pool_url, OpPoolClient::connect) => {
res
}
}
}
}

fn new_provider(
Expand Down
Loading

0 comments on commit b957229

Please sign in to comment.