From 3a590aa40362df2bc7e7eea99b3abfedb61da483 Mon Sep 17 00:00:00 2001 From: longbowlu Date: Thu, 20 Jun 2024 10:58:02 -0700 Subject: [PATCH] more fixes and metrics --- crates/sui-bridge/src/action_executor.rs | 48 +++++++++++++++++++++--- crates/sui-bridge/src/eth_syncer.rs | 1 + crates/sui-bridge/src/metrics.rs | 14 +++++++ crates/sui-bridge/src/sui_client.rs | 6 +++ 4 files changed, 63 insertions(+), 6 deletions(-) diff --git a/crates/sui-bridge/src/action_executor.rs b/crates/sui-bridge/src/action_executor.rs index e7538d864928b9..e1835feb277091 100644 --- a/crates/sui-bridge/src/action_executor.rs +++ b/crates/sui-bridge/src/action_executor.rs @@ -4,6 +4,7 @@ //! BridgeActionExecutor receives BridgeActions (from BridgeOrchestrator), //! collects bridge authority signatures and submit signatures on chain. +use crate::retry_with_max_elapsed_time; use arc_swap::ArcSwap; use mysten_metrics::spawn_logged_monitored_task; use shared_crypto::intent::{Intent, IntentMessage}; @@ -37,6 +38,7 @@ use crate::{ use std::collections::HashMap; use std::sync::Arc; use tokio::sync::Semaphore; +use tokio::time::Duration; use tracing::{error, info, instrument, warn, Instrument}; pub const CHANNEL_SIZE: usize = 1000; @@ -215,6 +217,16 @@ where } } + async fn should_proceed_signing(sui_client: &Arc>) -> bool { + let Ok(Ok(is_paused)) = + retry_with_max_elapsed_time!(sui_client.is_bridge_paused(), Duration::from_secs(600)) + else { + error!("Failed to get bridge status after retry"); + return false; + }; + !is_paused + } + #[instrument(level = "error", skip_all, fields(action_key=?action.0.key(), attempt_times=?action.1))] async fn handle_signing_task( semaphore: &Arc, @@ -234,6 +246,17 @@ where let action_key = action.0.key(); info!("Received action for signing: {:?}", action.0); + // TODO: this is a temporary fix to avoid signing when the bridge is paused. + // but the way is implemented is not ideal: + // 1. it should check the direction + // 2. should use a better mechanism to check the bridge status instead of polling for each action + let should_proceed = Self::should_proceed_signing(sui_client).await; + if !should_proceed { + metrics.action_executor_signing_queue_skipped_actions.inc(); + warn!("skipping signing task: {:?}", action_key); + return; + } + let auth_agg_clone = auth_agg.clone(); let signing_queue_sender_clone = signing_queue_sender.clone(); let execution_queue_sender_clone = execution_queue_sender.clone(); @@ -264,6 +287,7 @@ where sui_client: &Arc>, action: &BridgeAction, store: &Arc, + metrics: &Arc, ) -> bool { let status = sui_client .get_token_transfer_action_onchain_status_until_success( @@ -277,6 +301,7 @@ where "Action already approved or claimed, removing action from pending logs: {:?}", action ); + metrics.action_executor_already_processed_actions.inc(); store .remove_pending_actions(&[action.digest()]) .unwrap_or_else(|e| { @@ -290,6 +315,8 @@ where } } + // TODO: introduce a way to properly stagger the handling + // for various validators. async fn request_signatures( semaphore: Arc, sui_client: Arc>, @@ -316,8 +343,13 @@ where }; // If the action is already processed, skip it. - if Self::handle_already_processed_token_transfer_action_maybe(&sui_client, &action, &store) - .await + if Self::handle_already_processed_token_transfer_action_maybe( + &sui_client, + &action, + &store, + &metrics, + ) + .await { return; } @@ -443,8 +475,10 @@ where let ceriticate_clone = certificate.clone(); // Check once: if the action is already processed, skip it. - if Self::handle_already_processed_token_transfer_action_maybe(sui_client, action, store) - .await + if Self::handle_already_processed_token_transfer_action_maybe( + sui_client, action, store, metrics, + ) + .await { info!("Action already processed, skipping"); return; @@ -480,8 +514,10 @@ where let tx_digest = *signed_tx.digest(); // Check twice: If the action is already processed, skip it. - if Self::handle_already_processed_token_transfer_action_maybe(sui_client, action, store) - .await + if Self::handle_already_processed_token_transfer_action_maybe( + sui_client, action, store, metrics, + ) + .await { info!("Action already processed, skipping"); return; diff --git a/crates/sui-bridge/src/eth_syncer.rs b/crates/sui-bridge/src/eth_syncer.rs index 80da73d6196980..207631049640fd 100644 --- a/crates/sui-bridge/src/eth_syncer.rs +++ b/crates/sui-bridge/src/eth_syncer.rs @@ -114,6 +114,7 @@ where } // TODO: define a type for block number for readability + // TODO: add a metrics for current start block async fn run_event_listening_task( contract_address: EthAddress, mut start_block: u64, diff --git a/crates/sui-bridge/src/metrics.rs b/crates/sui-bridge/src/metrics.rs index 007a4a7860ff2a..6e142a0fbc0903 100644 --- a/crates/sui-bridge/src/metrics.rs +++ b/crates/sui-bridge/src/metrics.rs @@ -25,7 +25,9 @@ pub struct BridgeMetrics { pub(crate) eth_watcher_received_events: IntCounter, pub(crate) eth_watcher_received_actions: IntCounter, pub(crate) eth_watcher_unrecognized_events: IntCounter, + pub(crate) action_executor_already_processed_actions: IntCounter, pub(crate) action_executor_signing_queue_received_actions: IntCounter, + pub(crate) action_executor_signing_queue_skipped_actions: IntCounter, pub(crate) action_executor_execution_queue_received_actions: IntCounter, pub(crate) gas_coin_balance: IntGauge, @@ -128,12 +130,24 @@ impl BridgeMetrics { registry, ) .unwrap(), + action_executor_already_processed_actions: register_int_counter_with_registry!( + "bridge_action_executor_already_processed_actions", + "Total number of already processed actions action executor", + registry, + ) + .unwrap(), action_executor_signing_queue_received_actions: register_int_counter_with_registry!( "bridge_action_executor_signing_queue_received_actions", "Total number of received actions in action executor signing queue", registry, ) .unwrap(), + action_executor_signing_queue_skipped_actions: register_int_counter_with_registry!( + "bridge_action_executor_signing_queue_skipped_actions", + "Total number of skipped actions in action executor signing queue", + registry, + ) + .unwrap(), action_executor_execution_queue_received_actions: register_int_counter_with_registry!( "bridge_action_executor_execution_queue_received_actions", "Total number of received actions in action executor execution queue", diff --git a/crates/sui-bridge/src/sui_client.rs b/crates/sui-bridge/src/sui_client.rs index 6cec2440a50fb9..71012a2de4f385 100644 --- a/crates/sui-bridge/src/sui_client.rs +++ b/crates/sui-bridge/src/sui_client.rs @@ -167,6 +167,12 @@ where .map_err(|e| BridgeError::InternalError(format!("Can't get bridge committee: {e}"))) } + pub async fn is_bridge_paused(&self) -> BridgeResult { + self.get_bridge_summary() + .await + .map(|summary| summary.is_frozen) + } + pub async fn get_treasury_summary(&self) -> BridgeResult { Ok(self.get_bridge_summary().await?.treasury) }