Skip to content

Commit

Permalink
feat(pool): add da tracking to pool
Browse files Browse the repository at this point in the history
  • Loading branch information
dancoombs committed Oct 15, 2024
1 parent 707a916 commit bafb3c3
Show file tree
Hide file tree
Showing 28 changed files with 788 additions and 440 deletions.
3 changes: 3 additions & 0 deletions bin/rundler/src/cli/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,8 @@ impl BuilderArgs {

let sender_args = self.sender_args(&chain_spec, &rpc_url)?;

let da_tracking_enabled = super::lint_da_tracking(common.da_tracking_enabled, &chain_spec);

Ok(BuilderTaskArgs {
entry_points,
chain_spec,
Expand All @@ -353,6 +355,7 @@ impl BuilderArgs {
max_cancellation_fee_increases: self.max_cancellation_fee_increases,
max_replacement_underpriced_blocks: self.max_replacement_underpriced_blocks,
remote_address,
da_tracking_enabled,
})
}

Expand Down
28 changes: 27 additions & 1 deletion bin/rundler/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use rundler_sim::{
EstimationSettings, PrecheckSettings, PriorityFeeMode, SimulationSettings, MIN_CALL_GAS_LIMIT,
};
use rundler_types::{
chain::ChainSpec, v0_6::UserOperation as UserOperationV0_6,
chain::ChainSpec, da::DAGasOracleContractType, v0_6::UserOperation as UserOperationV0_6,
v0_7::UserOperation as UserOperationV0_7,
};

Expand Down Expand Up @@ -328,6 +328,14 @@ pub struct CommonArgs {
global = true
)]
pub num_builders_v0_7: u64,

#[arg(
long = "da_tracking_enabled",
name = "da_tracking_enabled",
env = "DA_TRACKING_ENABLED",
default_value = "false"
)]
pub da_tracking_enabled: bool,
}

const SIMULATION_GAS_OVERHEAD: u64 = 100_000;
Expand Down Expand Up @@ -582,3 +590,21 @@ pub fn construct_providers(
ep_v0_7,
})
}

fn lint_da_tracking(da_tracking_enabled: bool, chain_spec: &ChainSpec) -> bool {
if !da_tracking_enabled {
return false;
}

if !chain_spec.da_pre_verification_gas {
tracing::warn!("DA tracking is disabled because DA pre-verification gas is not enabled");
false
} else if !(chain_spec.da_gas_oracle_contract_type == DAGasOracleContractType::CachedNitro
|| chain_spec.da_gas_oracle_contract_type == DAGasOracleContractType::LocalBedrock)
{
tracing::warn!("DA tracking is disabled because DA gas oracle contract type {:?} does not support caching", chain_spec.da_gas_oracle_contract_type);
false
} else {
true
}
}
6 changes: 4 additions & 2 deletions bin/rundler/src/cli/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,15 +201,16 @@ impl PoolArgs {
};
tracing::info!("Mempool channel configs: {:?}", mempool_channel_configs);

let chain_id = chain_spec.id;
let da_tracking_enabled = super::lint_da_tracking(common.da_tracking_enabled, &chain_spec);

let pool_config_base = PoolConfig {
// update per entry point
entry_point: Address::ZERO,
entry_point_version: EntryPointVersion::Unspecified,
num_shards: 0,
mempool_channel_configs: HashMap::new(),
// Base config
chain_id,
chain_spec: chain_spec.clone(),
same_sender_mempool_count: self.same_sender_mempool_count,
min_replacement_fee_increase_percentage: self.min_replacement_fee_increase_percentage,
max_size_of_pool_bytes: self.max_size_in_bytes,
Expand All @@ -223,6 +224,7 @@ impl PoolArgs {
paymaster_cache_length: self.paymaster_cache_length,
reputation_tracking_enabled: self.reputation_tracking_enabled,
drop_min_num_blocks: self.drop_min_num_blocks,
da_tracking_enabled,
};

let mut pool_configs = vec![];
Expand Down
197 changes: 123 additions & 74 deletions crates/builder/src/bundle_proposer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,20 @@ use rundler_provider::{
BundleHandler, DAGasProvider, EntryPoint, EvmProvider, HandleOpsOut, SignatureAggregator,
};
use rundler_sim::{
gas, ExpectedStorage, FeeEstimator, PriorityFeeMode, SimulationError, SimulationResult,
Simulator, ViolationError,
ExpectedStorage, FeeEstimator, PriorityFeeMode, SimulationError, SimulationResult, Simulator,
ViolationError,
};
use rundler_types::{
chain::ChainSpec,
da::DAGasBlockData,
pool::{Pool, PoolOperation, SimulationViolation},
Entity, EntityInfo, EntityInfos, EntityType, EntityUpdate, EntityUpdateType, GasFees,
Timestamp, UserOperation, UserOperationVariant, UserOpsPerAggregator, BUNDLE_BYTE_OVERHEAD,
TIME_RANGE_BUFFER, USER_OP_OFFSET_WORD_SIZE,
};
use rundler_utils::{emit::WithEntryPoint, math};
use tokio::{sync::broadcast, try_join};
use tracing::{error, info, warn};
use tracing::{debug, error, info, warn};

use crate::emit::{BuilderEvent, ConditionNotMetReason, OpRejectionReason, SkipReason};

Expand Down Expand Up @@ -155,6 +156,7 @@ pub(crate) struct Settings {
pub(crate) beneficiary: Address,
pub(crate) bundle_priority_fee_overhead_percent: u32,
pub(crate) priority_fee_mode: PriorityFeeMode,
pub(crate) da_tracking_enabled: bool,
}

#[async_trait]
Expand Down Expand Up @@ -200,7 +202,7 @@ where
return Err(BundleProposerError::NoOperationsInitially);
}

tracing::debug!("Starting bundle proposal with {} ops", ops.len());
debug!("Starting bundle proposal with {} ops", ops.len());

// (0) Determine fees required for ops to be included in a bundle
// if replacing, just require bundle fees increase chances of unsticking
Expand All @@ -214,10 +216,30 @@ where
.filter_map(|op| op.uo.paymaster())
.collect::<Vec<Address>>();

let da_block_data = if self.settings.da_tracking_enabled {
match self.entry_point.block_data(block_hash.into()).await {
Ok(block_data) => Some(block_data),
Err(e) => {
error!("Failed to get block data for block hash {block_hash:?}, falling back to async da gas calculations: {e:?}");
None
}
}
} else {
None
};

// (1) Filter out ops that don't pay enough to be included
let fee_futs = ops
.into_iter()
.map(|op| self.check_fees(op, block_hash, base_fee, required_op_fees))
.map(|op| {
self.check_fees(
op,
block_hash,
da_block_data.as_ref(),
base_fee,
required_op_fees,
)
})
.collect::<Vec<_>>();
let ops = future::join_all(fee_futs)
.await
Expand All @@ -233,7 +255,7 @@ where
// (2) Limit the amount of operations for simulation
let (ops, gas_limit) = self.limit_user_operations_for_simulation(ops);

tracing::debug!(
debug!(
"Bundle proposal after gas limit had {} ops and {:?} gas limit",
ops.len(),
gas_limit
Expand Down Expand Up @@ -340,12 +362,14 @@ where
// Check fees for a single user op. Returns None if the op should be skipped.
//
// Filters on:
// - insufficient gas fees
// - insufficient pre-verification gas
// - Insufficient gas fees
// - Insufficient pre-verification gas. The initial PVG check is done when the block is updated in the mempool. However,
// that check uses the initial gas fee estimate, whereas this check uses the gas fees specifically from this bundle.
async fn check_fees(
&self,
op: PoolOperation,
block_hash: B256,
da_block_data: Option<&DAGasBlockData>,
base_fee: u128,
required_op_fees: GasFees,
) -> Option<PoolOperation> {
Expand All @@ -369,32 +393,52 @@ where
return None;
}

// Check if the pvg is enough
let required_pvg = match gas::calc_required_pre_verification_gas(
&self.settings.chain_spec,
&self.entry_point,
op.uo.as_ref(),
block_hash.into(),
base_fee,
)
.await
{
Ok(pvg) => pvg,
Err(e) => {
error!("Failed to calculate required pre-verification gas for op: {e:?}, skipping");
self.emit(BuilderEvent::skipped_op(
self.builder_index,
op_hash,
SkipReason::Other {
reason: Arc::new(format!(
"Failed to calculate required pre-verification gas for op: {e:?}, skipping"
)),
},
));
return None;
if !self.settings.chain_spec.da_pre_verification_gas {
// Skip PVG check if no da pre-verification gas as this is checked on entry to the mempool.
return Some(op);
}

let required_da_gas = if self.settings.da_tracking_enabled && da_block_data.is_some() {
let da_block_data = da_block_data.unwrap();
self.entry_point.calc_da_gas_sync(
&op.da_gas_data,
da_block_data,
required_op_fees.max_fee_per_gas,
)
} else {
match self
.entry_point
.calc_da_gas(
op.uo.clone().into(),
block_hash.into(),
required_op_fees.max_fee_per_gas,
)
.await
{
Ok((required_da_gas, _, _)) => required_da_gas,
Err(e) => {
error!(
"Failed to calculate required pre-verification gas for op: {e:?}, skipping"
);
self.emit(BuilderEvent::skipped_op(
self.builder_index,
op_hash,
SkipReason::Other {
reason: Arc::new(format!(
"Failed to calculate required pre-verification gas for op: {e:?}, skipping"
)),
},
));
return None;
}
}
};

// This assumes a bundle size of 1
let required_pvg =
op.uo
.required_pre_verification_gas(&self.settings.chain_spec, 1, required_da_gas);

if op.uo.pre_verification_gas() < required_pvg {
self.emit(BuilderEvent::skipped_op(
self.builder_index,
Expand Down Expand Up @@ -1645,49 +1689,52 @@ mod tests {
assert!(bundle.rejected_ops.is_empty());
}

#[tokio::test]
async fn test_drops_but_not_rejects_op_with_too_low_pvg() {
let base_fee = 1000;
let max_priority_fee_per_gas = 50;
let mut op1 = op_with_sender_and_fees(address(1), 1054, 55);
op1.pre_verification_gas = 0; // Should be dropped but not rejected
let op2 = op_with_sender_and_fees(address(2), 1055, 55);
let bundle = mock_make_bundle(
vec![
MockOp {
op: op1.clone(),
simulation_result: Box::new(|| Ok(SimulationResult::default())),
},
MockOp {
op: op2.clone(),
simulation_result: Box::new(|| Ok(SimulationResult::default())),
},
],
vec![],
vec![HandleOpsOut::Success],
vec![],
base_fee,
max_priority_fee_per_gas,
false,
ExpectedStorage::default(),
)
.await;
assert_eq!(
bundle.gas_fees,
GasFees {
max_fee_per_gas: 1050,
max_priority_fee_per_gas: 50,
}
);
assert_eq!(
bundle.ops_per_aggregator,
vec![UserOpsPerAggregator {
user_ops: vec![op2],
..Default::default()
}],
);
assert!(bundle.rejected_ops.is_empty());
}
// TODO(danc): This test is now longer valid as we only recheck PVG for
// chains with external DA. We should add tests for that, but will require adding
// mock calls to the DA provider.
// #[tokio::test]
// async fn test_drops_but_not_rejects_op_with_too_low_pvg() {
// let base_fee = 1000;
// let max_priority_fee_per_gas = 50;
// let mut op1 = op_with_sender_and_fees(address(1), 1054, 55);
// op1.pre_verification_gas = 0; // Should be dropped but not rejected
// let op2 = op_with_sender_and_fees(address(2), 1055, 55);
// let bundle = mock_make_bundle(
// vec![
// MockOp {
// op: op1.clone(),
// simulation_result: Box::new(|| Ok(SimulationResult::default())),
// },
// MockOp {
// op: op2.clone(),
// simulation_result: Box::new(|| Ok(SimulationResult::default())),
// },
// ],
// vec![],
// vec![HandleOpsOut::Success],
// vec![],
// base_fee,
// max_priority_fee_per_gas,
// false,
// ExpectedStorage::default(),
// )
// .await;
// assert_eq!(
// bundle.gas_fees,
// GasFees {
// max_fee_per_gas: 1050,
// max_priority_fee_per_gas: 50,
// }
// );
// assert_eq!(
// bundle.ops_per_aggregator,
// vec![UserOpsPerAggregator {
// user_ops: vec![op2],
// ..Default::default()
// }],
// );
// assert!(bundle.rejected_ops.is_empty());
// }

#[tokio::test]
async fn test_aggregators() {
Expand Down Expand Up @@ -2279,6 +2326,7 @@ mod tests {
valid_time_range: ValidTimeRange::default(),
entity_infos: EntityInfos::default(),
aggregator: None,
da_gas_data: Default::default(),
})
.collect();

Expand Down Expand Up @@ -2374,6 +2422,7 @@ mod tests {
beneficiary,
priority_fee_mode: PriorityFeeMode::PriorityFeeIncreasePercent(0),
bundle_priority_fee_overhead_percent: 0,
da_tracking_enabled: false,
},
event_sender,
);
Expand Down
2 changes: 1 addition & 1 deletion crates/builder/src/bundle_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1018,7 +1018,7 @@ impl Trigger for BundleSenderTrigger {
a = self.bundle_action_receiver.recv() => {
match a {
Some(BundleSenderAction::ChangeMode(mode)) => {
debug!("changing bundling mode to {mode:?}");
info!("changing bundling mode to {mode:?}");
self.bundling_mode = mode;
continue;
},
Expand Down
Loading

0 comments on commit bafb3c3

Please sign in to comment.