Skip to content

Commit

Permalink
fix(pool): only ineligible if da tracking, always emit events
Browse files Browse the repository at this point in the history
  • Loading branch information
dancoombs committed Oct 16, 2024
1 parent e53c769 commit 2cf41c1
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 53 deletions.
127 changes: 84 additions & 43 deletions crates/pool/src/mempool/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ use rundler_types::{
pool::{MempoolError, PoolOperation},
Entity, EntityType, GasFees, Timestamp, UserOperation, UserOperationId, UserOperationVariant,
};
use rundler_utils::math;
use rundler_utils::{emit::WithEntryPoint, math};
use tokio::sync::broadcast;
use tracing::{info, warn};

use super::{entity_tracker::EntityCounter, size::SizeTracker, MempoolResult, PoolConfig};
Expand Down Expand Up @@ -97,13 +98,19 @@ pub(crate) struct PoolInner<D> {
prev_block_number: u64,
/// The metrics of pool.
metrics: PoolMetrics,
/// Event sender
event_sender: broadcast::Sender<WithEntryPoint<PoolEvent>>,
}

impl<D> PoolInner<D>
where
D: DAGasProvider,
{
pub(crate) fn new(config: PoolInnerConfig, da_gas_provider: D) -> Self {
pub(crate) fn new(
config: PoolInnerConfig,
da_gas_provider: D,
event_sender: broadcast::Sender<WithEntryPoint<PoolEvent>>,
) -> Self {
let entry_point = config.entry_point.to_string();
Self {
config,
Expand All @@ -121,9 +128,17 @@ where
prev_sys_block_time: Duration::default(),
prev_block_number: 0,
metrics: PoolMetrics::new_with_labels(&[("entry_point", entry_point)]),
event_sender,
}
}

fn emit(&self, event: PoolEvent) {
let _ = self.event_sender.send(WithEntryPoint {
entry_point: self.config.entry_point,
event,
});
}

/// Returns hash of operation to replace if operation is a replacement
pub(crate) fn check_replacement(
&self,
Expand Down Expand Up @@ -163,18 +178,37 @@ where
pub(crate) fn add_operation(
&mut self,
op: PoolOperation,
has_required_pvg: bool,
required_pvg: u128,
) -> MempoolResult<B256> {
// only eligibility criteria is required PVG which is enabled when da_gas_tracking is enabled
let is_eligible = if self.config.da_gas_tracking_enabled {
if op.uo.pre_verification_gas() >= required_pvg {
self.emit(PoolEvent::UpdatedDAData {
op_hash: op
.uo
.hash(self.config.entry_point, self.config.chain_spec.id),
eligible: false,
required_pvg,
actual_pvg: op.uo.pre_verification_gas(),
});
false
} else {
true
}
} else {
true
};

// only eligibility requirement is if the op has required pvg
let pool_op = Arc::new(OrderedPoolOperation::new(
Arc::new(op),
self.next_submission_id(),
has_required_pvg,
is_eligible,
));

let ret = self.add_operation_internal(pool_op);
let hash = self.add_operation_internal(pool_op)?;
self.update_metrics();
ret
Ok(hash)
}

pub(crate) fn best_operations(&self) -> impl Iterator<Item = Arc<PoolOperation>> + '_ {
Expand All @@ -201,7 +235,7 @@ where
block_da_data: Option<&DAGasBlockData>,
candidate_gas_fees: GasFees,
base_fee: u128,
) -> Vec<PoolEvent> {
) {
let sys_block_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("time should be after epoch");
Expand Down Expand Up @@ -281,12 +315,13 @@ where
for hash in expired {
self.remove_operation_by_hash(hash);
}
for event in events {
self.emit(event);
}

self.metrics.num_candidates.set(num_candidates as f64);
self.prev_block_number = block_number;
self.prev_sys_block_time = sys_block_time;

events
}

pub(crate) fn address_count(&self, address: &Address) -> usize {
Expand Down Expand Up @@ -736,7 +771,7 @@ mod tests {
fn add_single_op() {
let mut pool = pool();
let op = create_op(Address::random(), 0, 1);
let hash = pool.add_operation(op.clone(), true).unwrap();
let hash = pool.add_operation(op.clone(), 0).unwrap();

check_map_entry(pool.by_hash.get(&hash), Some(&op));
check_map_entry(pool.by_id.get(&op.uo.id()), Some(&op));
Expand All @@ -747,7 +782,7 @@ mod tests {
fn test_get_by_hash() {
let mut pool = pool();
let op = create_op(Address::random(), 0, 1);
let hash = pool.add_operation(op.clone(), true).unwrap();
let hash = pool.add_operation(op.clone(), 0).unwrap();

let get_op = pool.get_operation_by_hash(hash).unwrap();
assert_eq!(op, *get_op);
Expand All @@ -759,7 +794,7 @@ mod tests {
fn test_get_by_id() {
let mut pool = pool();
let op = create_op(Address::random(), 0, 1);
pool.add_operation(op.clone(), true).unwrap();
pool.add_operation(op.clone(), 0).unwrap();
let id = op.uo.id();

let get_op = pool.get_operation_by_id(&id).unwrap();
Expand All @@ -784,7 +819,7 @@ mod tests {

let mut hashes = vec![];
for op in ops.iter() {
hashes.push(pool.add_operation(op.clone(), true).unwrap());
hashes.push(pool.add_operation(op.clone(), 0).unwrap());
}

for (hash, op) in hashes.iter().zip(&ops) {
Expand All @@ -810,7 +845,7 @@ mod tests {

let mut hashes = vec![];
for op in ops.iter() {
hashes.push(pool.add_operation(op.clone(), true).unwrap());
hashes.push(pool.add_operation(op.clone(), 0).unwrap());
}

// best should be sorted by gas, then by submission id
Expand All @@ -831,7 +866,7 @@ mod tests {

let mut hashes = vec![];
for op in ops.iter() {
hashes.push(pool.add_operation(op.clone(), true).unwrap());
hashes.push(pool.add_operation(op.clone(), 0).unwrap());
}

assert!(pool.remove_operation_by_hash(hashes[0]).is_some());
Expand Down Expand Up @@ -862,7 +897,7 @@ mod tests {
];
for mut op in ops.into_iter() {
op.aggregator = Some(account);
pool.add_operation(op.clone(), true).unwrap();
pool.add_operation(op.clone(), 0).unwrap();
}
assert_eq!(pool.by_hash.len(), 3);

Expand All @@ -884,7 +919,7 @@ mod tests {
.uo
.hash(pool.config.entry_point, pool.config.chain_spec.id);

pool.add_operation(op, true).unwrap();
pool.add_operation(op, 0).unwrap();

let mined_op = MinedOp {
paymaster: None,
Expand Down Expand Up @@ -915,8 +950,8 @@ mod tests {
.uo
.hash(pool.config.entry_point, pool.config.chain_spec.id);

pool.add_operation(op, true).unwrap();
pool.add_operation(op_2, true).unwrap();
pool.add_operation(op, 0).unwrap();
pool.add_operation(op_2, 0).unwrap();

let mined_op = MinedOp {
paymaster: None,
Expand Down Expand Up @@ -949,7 +984,7 @@ mod tests {
entity: Entity::aggregator(agg),
is_staked: false,
});
pool.add_operation(op.clone(), true).unwrap();
pool.add_operation(op.clone(), 0).unwrap();
}
assert_eq!(pool.by_hash.len(), 3);

Expand All @@ -976,7 +1011,7 @@ mod tests {
entity: Entity::paymaster(paymaster),
is_staked: false,
});
pool.add_operation(op.clone(), true).unwrap();
pool.add_operation(op.clone(), 0).unwrap();
}
assert_eq!(pool.by_hash.len(), 3);

Expand Down Expand Up @@ -1018,7 +1053,7 @@ mod tests {
let mut op = op.clone();
let uo: &mut UserOperation = op.uo.as_mut();
uo.nonce = U256::from(i);
hashes.push(pool.add_operation(op, true).unwrap());
hashes.push(pool.add_operation(op, 0).unwrap());
}

assert_eq!(pool.address_count(&sender), 5);
Expand All @@ -1042,12 +1077,12 @@ mod tests {
let mut pool = pool();
for i in 0..20 {
let op = create_op(Address::random(), i, (i + 1) as u128);
pool.add_operation(op, true).unwrap();
pool.add_operation(op, 0).unwrap();
}

// on greater gas, new op should win
let op = create_op(Address::random(), args.max_size_of_pool_bytes, 2);
let result = pool.add_operation(op, true);
let result = pool.add_operation(op, 0);
assert!(result.is_ok(), "{:?}", result.err());
}

Expand All @@ -1056,15 +1091,15 @@ mod tests {
let mut pool = pool();
for i in 0..20 {
let op = create_op(Address::random(), i, (i + 1) as u128);
pool.add_operation(op, true).unwrap();
pool.add_operation(op, 0).unwrap();
}

let op = create_op(Address::random(), 4, 1);
assert!(pool.add_operation(op, true).is_err());
assert!(pool.add_operation(op, 0).is_err());

// on equal gas, worst should remain because it came first
let op = create_op(Address::random(), 4, 2);
let result = pool.add_operation(op, true);
let result = pool.add_operation(op, 0);
assert!(result.is_ok(), "{:?}", result.err());
}

Expand All @@ -1075,12 +1110,12 @@ mod tests {
let mut po1 = create_op(sender, 0, 100);
let uo1: &mut UserOperation = po1.uo.as_mut();
uo1.max_priority_fee_per_gas = 100;
let _ = pool.add_operation(po1.clone(), true).unwrap();
let _ = pool.add_operation(po1.clone(), 0).unwrap();

let mut po2 = create_op(sender, 0, 101);
let uo2: &mut UserOperation = po2.uo.as_mut();
uo2.max_priority_fee_per_gas = 101;
let res = pool.add_operation(po2, true);
let res = pool.add_operation(po2, 0);
assert!(res.is_err());
match res.err().unwrap() {
MempoolError::ReplacementUnderpriced(a, b) => {
Expand Down Expand Up @@ -1110,7 +1145,7 @@ mod tests {
entity: Entity::paymaster(paymaster1),
is_staked: false,
});
let _ = pool.add_operation(po1, true).unwrap();
let _ = pool.add_operation(po1, 0).unwrap();
assert_eq!(pool.address_count(&paymaster1), 1);

let paymaster2 = Address::random();
Expand All @@ -1122,7 +1157,7 @@ mod tests {
entity: Entity::paymaster(paymaster2),
is_staked: false,
});
let _ = pool.add_operation(po2.clone(), true).unwrap();
let _ = pool.add_operation(po2.clone(), 0).unwrap();

assert_eq!(pool.address_count(&sender), 1);
assert_eq!(pool.address_count(&paymaster1), 0);
Expand All @@ -1140,9 +1175,9 @@ mod tests {
let mut po1 = create_op(sender, 0, 10);
let uo1: &mut UserOperation = po1.uo.as_mut();
uo1.max_priority_fee_per_gas = 10;
let _ = pool.add_operation(po1.clone(), true).unwrap();
let _ = pool.add_operation(po1.clone(), 0).unwrap();

let res = pool.add_operation(po1, true);
let res = pool.add_operation(po1, 0);
assert!(res.is_err());
match res.err().unwrap() {
MempoolError::OperationAlreadyKnown => (),
Expand All @@ -1157,10 +1192,9 @@ mod tests {
let sender = Address::random();
let mut po1 = create_op(sender, 0, 10);
po1.valid_time_range.valid_until = Timestamp::from(1);
let hash = pool.add_operation(po1.clone(), true).unwrap();
let hash = pool.add_operation(po1.clone(), 0).unwrap();

let res = pool.do_maintenance(0, Timestamp::from(2), None, GasFees::default(), 0);
assert_eq!(res.len(), 1);
pool.do_maintenance(0, Timestamp::from(2), None, GasFees::default(), 0);
assert_eq!(None, pool.get_operation_by_hash(hash));
}

Expand All @@ -1171,18 +1205,17 @@ mod tests {

let mut po1 = create_op(Address::random(), 0, 10);
po1.valid_time_range.valid_until = 5.into();
let hash1 = pool.add_operation(po1.clone(), true).unwrap();
let hash1 = pool.add_operation(po1.clone(), 0).unwrap();

let mut po2 = create_op(Address::random(), 0, 10);
po2.valid_time_range.valid_until = 10.into();
let hash2 = pool.add_operation(po2.clone(), true).unwrap();
let hash2 = pool.add_operation(po2.clone(), 0).unwrap();
let mut po3 = create_op(Address::random(), 0, 10);
po3.valid_time_range.valid_until = 9.into();
let hash3 = pool.add_operation(po3.clone(), true).unwrap();
let hash3 = pool.add_operation(po3.clone(), 0).unwrap();

let res = pool.do_maintenance(0, Timestamp::from(10), None, GasFees::default(), 0);
pool.do_maintenance(0, Timestamp::from(10), None, GasFees::default(), 0);

assert_eq!(res.len(), 2);
assert_eq!(None, pool.get_operation_by_hash(hash1));
assert!(pool.get_operation_by_hash(hash2).is_some());
assert_eq!(None, pool.get_operation_by_hash(hash3));
Expand All @@ -1201,11 +1234,19 @@ mod tests {
}

fn pool() -> PoolInner<MockEntryPointV0_6> {
PoolInner::new(conf(), MockEntryPointV0_6::new())
PoolInner::new(
conf(),
MockEntryPointV0_6::new(),
broadcast::channel(100000).0,
)
}

fn pool_with_conf(conf: PoolInnerConfig) -> PoolInner<MockEntryPointV0_6> {
PoolInner::new(conf, MockEntryPointV0_6::new())
PoolInner::new(
conf,
MockEntryPointV0_6::new(),
broadcast::channel(100000).0,
)
}

fn mem_size_of_ordered_pool_op() -> usize {
Expand Down
Loading

0 comments on commit 2cf41c1

Please sign in to comment.