Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(metrics): improve pool/builder metrics/events #290

Merged
merged 1 commit into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 28 additions & 16 deletions src/builder/bundle_proposer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,30 +210,23 @@ where
&self,
op: OpFromPool,
block_hash: H256,
) -> anyhow::Result<(UserOperation, Option<SimulationSuccess>)> {
) -> anyhow::Result<(UserOperation, Result<SimulationSuccess, SimulationError>)> {
let result = self
.simulator
.simulate_validation(op.op.clone(), Some(block_hash), Some(op.expected_code_hash))
.await;
match result {
Ok(success) => Ok((
op.op,
Some(success).filter(|success| {
success
.valid_time_range
.contains(Timestamp::now(), TIME_RANGE_BUFFER)
}),
)),
Ok(success) => Ok((op.op, Ok(success))),
Err(error) => match error {
SimulationError::Violations(_) => Ok((op.op, None)),
SimulationError::Violations(_) => Ok((op.op, Err(error))),
SimulationError::Other(error) => Err(error),
},
}
}

async fn assemble_context(
&self,
ops_with_simulations: Vec<(UserOperation, Option<SimulationSuccess>)>,
ops_with_simulations: Vec<(UserOperation, Result<SimulationSuccess, SimulationError>)>,
mut balances_by_paymaster: HashMap<Address, U256>,
) -> ProposalContext {
let all_sender_addresses: HashSet<Address> = ops_with_simulations
Expand All @@ -244,14 +237,33 @@ where
let mut rejected_ops = Vec::<UserOperation>::new();
let mut paymasters_to_reject = Vec::<Address>::new();
for (op, simulation) in ops_with_simulations {
let Some(simulation) = simulation else {
self.emit(BuilderEvent::RejectedOp {
let simulation = match simulation {
Ok(simulation) => simulation,
Err(error) => {
self.emit(BuilderEvent::RejectedOp {
op_hash: self.op_hash(&op),
reason: OpRejectionReason::FailedRevalidation { error },
});
rejected_ops.push(op);
continue;
}
};

// filter time range
if !simulation
.valid_time_range
.contains(Timestamp::now(), TIME_RANGE_BUFFER)
{
self.emit(BuilderEvent::SkippedOp {
op_hash: self.op_hash(&op),
reason: OpRejectionReason::FailedRevalidation,
reason: SkipReason::InvalidTimeRange {
valid_range: simulation.valid_time_range,
},
});
rejected_ops.push(op);
continue;
};
}

if let Some(&other_sender) = simulation
.accessed_addresses
.iter()
Expand All @@ -273,7 +285,7 @@ where
};
let max_cost = op.max_gas_cost();
if *balance < max_cost {
info!("Rejected paymaster ${paymaster:?} becauase its balance was too low.");
info!("Rejected paymaster {paymaster:?} becauase its balance {balance:?} was too low.");
paymasters_to_reject.push(paymaster);
continue;
} else {
Expand Down
5 changes: 3 additions & 2 deletions src/builder/emit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{fmt::Display, sync::Arc};

use ethers::types::{transaction::eip2718::TypedTransaction, Address, H256};

use crate::common::{gas::GasFees, strs};
use crate::common::{gas::GasFees, simulation::SimulationError, strs, types::ValidTimeRange};

#[derive(Clone, Debug)]
pub enum BuilderEvent {
Expand Down Expand Up @@ -45,11 +45,12 @@ pub struct BundleTxDetails {
#[derive(Clone, Debug)]
pub enum SkipReason {
AccessedOtherSender { other_sender: Address },
InvalidTimeRange { valid_range: ValidTimeRange },
}

#[derive(Clone, Debug)]
pub enum OpRejectionReason {
FailedRevalidation,
FailedRevalidation { error: SimulationError },
FailedInBundle { message: Arc<String> },
}

Expand Down
16 changes: 16 additions & 0 deletions src/common/types/violations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,22 @@ pub enum ViolationError<T> {
Other(#[from] anyhow::Error),
}

impl<T> Clone for ViolationError<T>
where
T: Clone,
{
fn clone(&self) -> Self {
match self {
ViolationError::Violations(violations) => {
ViolationError::Violations(violations.clone())
}
ViolationError::Other(error) => {
ViolationError::Other(anyhow::anyhow!(error.to_string()))
}
}
}
}

impl<T> From<Vec<T>> for ViolationError<T> {
fn from(violations: Vec<T>) -> Self {
Self::Violations(violations)
Expand Down
23 changes: 23 additions & 0 deletions src/op_pool/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,13 @@ impl<P: ProviderLike> Chain<P> {
while self.blocks.len() > self.settings.history_size as usize {
self.blocks.pop_front();
}

ChainMetrics::set_block_height(current_block_number);
if reorg_depth > 0 {
ChainMetrics::increment_reorgs_detected();
ChainMetrics::increment_total_reorg_depth(reorg_depth);
}

self.new_update(reorg_depth, mined_ops, unmined_ops)
}

Expand Down Expand Up @@ -420,6 +427,22 @@ impl ChainUpdate {
}
}

struct ChainMetrics {}

impl ChainMetrics {
fn set_block_height(block_height: u64) {
metrics::gauge!("op_pool_chain_block_height", block_height as f64);
}

fn increment_reorgs_detected() {
metrics::increment_counter!("op_pool_chain_reorgs_detected");
}

fn increment_total_reorg_depth(depth: u64) {
metrics::counter!("op_pool_chain_total_reorg_depth", depth);
}
}

#[cfg(test)]
mod tests {
use std::ops::DerefMut;
Expand Down
2 changes: 1 addition & 1 deletion src/op_pool/emit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ pub enum EntityReputation {

#[derive(Clone, Debug)]
pub enum OpRemovalReason {
Requsted,
Requested,
Mined {
block_number: u64,
block_hash: H256,
Expand Down
7 changes: 1 addition & 6 deletions src/op_pool/mempool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,14 @@ use strum::IntoEnumIterator;
use self::error::MempoolResult;
use crate::{
common::types::{Entity, EntityType, UserOperation, ValidTimeRange},
op_pool::{chain::ChainUpdate, reputation::Reputation},
op_pool::reputation::Reputation,
};

/// In-memory operation pool
pub trait Mempool: Send + Sync {
/// Returns the entry point address this pool targets.
fn entry_point(&self) -> Address;

/// Event listener for when a new block is mined.
///
/// Pool is updated according to the chain update event.
fn on_chain_update(&self, update: &ChainUpdate);

/// Adds a validated user operation to the pool.
///
/// Adds a user operation to the pool that was submitted via a local
Expand Down
75 changes: 57 additions & 18 deletions src/op_pool/mempool/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ pub struct PoolInner {
/// Submission ID counter
submission_id: u64,
/// keeps track of the size of the pool in bytes
size: SizeTracker,
pool_size: SizeTracker,
/// keeps track of the size of the removed cache in bytes
cache_size: SizeTracker,
}

impl PoolInner {
Expand All @@ -58,12 +60,15 @@ impl PoolInner {
mined_hashes_with_block_numbers: BTreeSet::new(),
count_by_address: HashMap::new(),
submission_id: 0,
size: SizeTracker::default(),
pool_size: SizeTracker::default(),
cache_size: SizeTracker::default(),
}
}

pub fn add_operation(&mut self, op: PoolOperation) -> MempoolResult<H256> {
self.add_operation_internal(Arc::new(op), None)
let ret = self.add_operation_internal(Arc::new(op), None);
self.update_metrics();
ret
}

pub fn add_operations(
Expand All @@ -85,11 +90,15 @@ impl PoolInner {
}

pub fn remove_operation_by_hash(&mut self, hash: H256) -> Option<Arc<PoolOperation>> {
self.remove_operation_internal(hash, None)
let ret = self.remove_operation_internal(hash, None);
self.update_metrics();
ret
}

pub fn mine_operation(&mut self, hash: H256, block_number: u64) -> Option<Arc<PoolOperation>> {
self.remove_operation_internal(hash, Some(block_number))
let ret = self.remove_operation_internal(hash, Some(block_number));
self.update_metrics();
ret
}

pub fn unmine_operation(&mut self, hash: H256) -> Option<Arc<PoolOperation>> {
Expand All @@ -99,6 +108,7 @@ impl PoolInner {
if let Err(error) = self.put_back_unmined_operation(op.clone()) {
info!("Could not put back unmined operation: {error}");
};
self.update_metrics();
Some(op.po)
}

Expand All @@ -112,8 +122,9 @@ impl PoolInner {
.map(|(hash, _)| *hash)
.collect::<Vec<_>>();
for &hash in &to_remove {
self.remove_operation_by_hash(hash);
self.remove_operation_internal(hash, None);
}
self.update_metrics();
to_remove
}

Expand All @@ -123,9 +134,12 @@ impl PoolInner {
.first()
.filter(|(bn, _)| *bn < block_number)
{
self.mined_at_block_number_by_hash.remove(&hash);
if let Some((op, _)) = self.mined_at_block_number_by_hash.remove(&hash) {
self.cache_size -= op.size();
}
self.mined_hashes_with_block_numbers.remove(&(bn, hash));
}
self.update_metrics();
}

pub fn clear(&mut self) {
Expand All @@ -135,20 +149,22 @@ impl PoolInner {
self.mined_at_block_number_by_hash.clear();
self.mined_hashes_with_block_numbers.clear();
self.count_by_address.clear();
self.size = SizeTracker::default();
self.pool_size = SizeTracker::default();
self.cache_size = SizeTracker::default();
self.update_metrics();
}

fn enforce_size(&mut self) -> anyhow::Result<Vec<H256>> {
let mut removed = Vec::new();

while self.size > self.config.max_size_of_pool_bytes {
while self.pool_size > self.config.max_size_of_pool_bytes {
if let Some(worst) = self.best.pop_last() {
let hash = worst
.uo()
.op_hash(self.config.entry_point, self.config.chain_id);

let _ = self
.remove_operation_by_hash(hash)
.remove_operation_internal(hash, None)
.context("should have removed the worst operation")?;

removed.push(hash);
Expand Down Expand Up @@ -226,7 +242,7 @@ impl PoolInner {
let hash = pool_op
.uo()
.op_hash(self.config.entry_point, self.config.chain_id);
self.size += pool_op.size();
self.pool_size += pool_op.size();
self.by_hash.insert(hash, pool_op.clone());
self.by_id.insert(pool_op.uo().id(), pool_op.clone());
self.best.insert(pool_op);
Expand All @@ -239,8 +255,6 @@ impl PoolInner {
Err(MempoolError::DiscardedOnInsert)?;
}

metrics::gauge!("op_pool_num_ops_in_pool", self.by_hash.len() as f64, "entrypoint_addr" => self.config.entry_point.to_string());
metrics::gauge!("op_pool_size_bytes", self.size.0 as f64, "entrypoint_addr" => self.config.entry_point.to_string());
Ok(hash)
}

Expand All @@ -253,6 +267,7 @@ impl PoolInner {
self.by_id.remove(&op.uo().id());
self.best.remove(&op);
if let Some(block_number) = block_number {
self.cache_size += op.size();
self.mined_at_block_number_by_hash
.insert(hash, (op.clone(), block_number));
self.mined_hashes_with_block_numbers
Expand All @@ -262,9 +277,7 @@ impl PoolInner {
self.decrement_address_count(e.address);
}

self.size -= op.size();
metrics::gauge!("op_pool_num_ops_in_pool", self.by_hash.len() as f64, "entrypoint_addr" => self.config.entry_point.to_string());
metrics::gauge!("op_pool_size_bytes", self.size.0 as f64, "entrypoint_addr" => self.config.entry_point.to_string());
self.pool_size -= op.size();
Some(op.po)
}

Expand Down Expand Up @@ -294,6 +307,19 @@ impl PoolInner {
);
(replacement_priority_fee, replacement_fee)
}

fn update_metrics(&self) {
PoolMetrics::set_pool_metrics(
self.by_hash.len(),
self.pool_size.0,
self.config.entry_point,
);
PoolMetrics::set_cache_metrics(
self.mined_hashes_with_block_numbers.len(),
self.cache_size.0,
self.config.entry_point,
);
}
}

/// Wrapper around PoolOperation that adds a submission ID to implement
Expand Down Expand Up @@ -339,6 +365,19 @@ impl PartialEq for OrderedPoolOperation {
}
}

struct PoolMetrics {}

impl PoolMetrics {
fn set_pool_metrics(num_ops: usize, size_bytes: isize, entry_point: Address) {
metrics::gauge!("op_pool_num_ops_in_pool", num_ops as f64, "entrypoint_addr" => entry_point.to_string());
metrics::gauge!("op_pool_size_bytes", size_bytes as f64, "entrypoint_addr" => entry_point.to_string());
}
fn set_cache_metrics(num_ops: usize, size_bytes: isize, entry_point: Address) {
metrics::gauge!("op_pool_num_ops_in_cache", num_ops as f64, "entrypoint_addr" => entry_point.to_string());
metrics::gauge!("op_pool_cache_size_bytes", size_bytes as f64, "entrypoint_addr" => entry_point.to_string());
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -599,7 +638,7 @@ mod tests {
}

assert_eq!(pool.address_count(sender), 1);
assert_eq!(pool.size, po1.size());
assert_eq!(pool.pool_size, po1.size());
}

#[test]
Expand All @@ -622,7 +661,7 @@ mod tests {
assert_eq!(pool.address_count(sender), 1);
assert_eq!(pool.address_count(paymaster1), 0);
assert_eq!(pool.address_count(paymaster2), 1);
assert_eq!(pool.size, po2.size());
assert_eq!(pool.pool_size, po2.size());
}

fn conf() -> PoolConfig {
Expand Down
Loading
Loading