Skip to content

Commit

Permalink
more fixes and metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
longbowlu committed Jun 20, 2024
1 parent 4291419 commit 3a590aa
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 6 deletions.
48 changes: 42 additions & 6 deletions crates/sui-bridge/src/action_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//! BridgeActionExecutor receives BridgeActions (from BridgeOrchestrator),
//! collects bridge authority signatures and submit signatures on chain.

use crate::retry_with_max_elapsed_time;
use arc_swap::ArcSwap;
use mysten_metrics::spawn_logged_monitored_task;
use shared_crypto::intent::{Intent, IntentMessage};
Expand Down Expand Up @@ -37,6 +38,7 @@ use crate::{
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Semaphore;
use tokio::time::Duration;
use tracing::{error, info, instrument, warn, Instrument};

pub const CHANNEL_SIZE: usize = 1000;
Expand Down Expand Up @@ -215,6 +217,16 @@ where
}
}

/// Decides whether signing may go ahead, based on the bridge's paused flag.
///
/// Calls `sui_client.is_bridge_paused()` through `retry_with_max_elapsed_time!`,
/// passing a 600-second duration to the macro. Returns `true` only when a
/// status was obtained and it reports the bridge as not paused. If no status
/// could be obtained, an error is logged and `false` is returned.
async fn should_proceed_signing(sui_client: &Arc<SuiClient<C>>) -> bool {
    let status =
        retry_with_max_elapsed_time!(sui_client.is_bridge_paused(), Duration::from_secs(600));
    match status {
        // Status fetched: proceed only when the bridge is not paused.
        Ok(Ok(paused)) => !paused,
        // Any other outcome means the status is unknown; do not sign.
        _ => {
            error!("Failed to get bridge status after retry");
            false
        }
    }
}

#[instrument(level = "error", skip_all, fields(action_key=?action.0.key(), attempt_times=?action.1))]
async fn handle_signing_task(
semaphore: &Arc<Semaphore>,
Expand All @@ -234,6 +246,17 @@ where
let action_key = action.0.key();
info!("Received action for signing: {:?}", action.0);

// TODO: this is a temporary fix to avoid signing when the bridge is paused,
// but the way it is implemented is not ideal:
// 1. it should check the direction
// 2. it should use a better mechanism to check the bridge status instead of polling for each action
let should_proceed = Self::should_proceed_signing(sui_client).await;
if !should_proceed {
metrics.action_executor_signing_queue_skipped_actions.inc();
warn!("skipping signing task: {:?}", action_key);
return;
}

let auth_agg_clone = auth_agg.clone();
let signing_queue_sender_clone = signing_queue_sender.clone();
let execution_queue_sender_clone = execution_queue_sender.clone();
Expand Down Expand Up @@ -264,6 +287,7 @@ where
sui_client: &Arc<SuiClient<C>>,
action: &BridgeAction,
store: &Arc<BridgeOrchestratorTables>,
metrics: &Arc<BridgeMetrics>,
) -> bool {
let status = sui_client
.get_token_transfer_action_onchain_status_until_success(
Expand All @@ -277,6 +301,7 @@ where
"Action already approved or claimed, removing action from pending logs: {:?}",
action
);
metrics.action_executor_already_processed_actions.inc();
store
.remove_pending_actions(&[action.digest()])
.unwrap_or_else(|e| {
Expand All @@ -290,6 +315,8 @@ where
}
}

// TODO: introduce a way to properly stagger the handling
// for various validators.
async fn request_signatures(
semaphore: Arc<Semaphore>,
sui_client: Arc<SuiClient<C>>,
Expand All @@ -316,8 +343,13 @@ where
};

// If the action is already processed, skip it.
if Self::handle_already_processed_token_transfer_action_maybe(&sui_client, &action, &store)
.await
if Self::handle_already_processed_token_transfer_action_maybe(
&sui_client,
&action,
&store,
&metrics,
)
.await
{
return;
}
Expand Down Expand Up @@ -443,8 +475,10 @@ where
let ceriticate_clone = certificate.clone();

// Check once: if the action is already processed, skip it.
if Self::handle_already_processed_token_transfer_action_maybe(sui_client, action, store)
.await
if Self::handle_already_processed_token_transfer_action_maybe(
sui_client, action, store, metrics,
)
.await
{
info!("Action already processed, skipping");
return;
Expand Down Expand Up @@ -480,8 +514,10 @@ where
let tx_digest = *signed_tx.digest();

// Check twice: If the action is already processed, skip it.
if Self::handle_already_processed_token_transfer_action_maybe(sui_client, action, store)
.await
if Self::handle_already_processed_token_transfer_action_maybe(
sui_client, action, store, metrics,
)
.await
{
info!("Action already processed, skipping");
return;
Expand Down
1 change: 1 addition & 0 deletions crates/sui-bridge/src/eth_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ where
}

// TODO: define a type for block number for readability
// TODO: add a metric for the current start block
async fn run_event_listening_task(
contract_address: EthAddress,
mut start_block: u64,
Expand Down
14 changes: 14 additions & 0 deletions crates/sui-bridge/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ pub struct BridgeMetrics {
pub(crate) eth_watcher_received_events: IntCounter,
pub(crate) eth_watcher_received_actions: IntCounter,
pub(crate) eth_watcher_unrecognized_events: IntCounter,
pub(crate) action_executor_already_processed_actions: IntCounter,
pub(crate) action_executor_signing_queue_received_actions: IntCounter,
pub(crate) action_executor_signing_queue_skipped_actions: IntCounter,
pub(crate) action_executor_execution_queue_received_actions: IntCounter,

pub(crate) gas_coin_balance: IntGauge,
Expand Down Expand Up @@ -128,12 +130,24 @@ impl BridgeMetrics {
registry,
)
.unwrap(),
action_executor_already_processed_actions: register_int_counter_with_registry!(
"bridge_action_executor_already_processed_actions",
"Total number of already processed actions action executor",
registry,
)
.unwrap(),
action_executor_signing_queue_received_actions: register_int_counter_with_registry!(
"bridge_action_executor_signing_queue_received_actions",
"Total number of received actions in action executor signing queue",
registry,
)
.unwrap(),
action_executor_signing_queue_skipped_actions: register_int_counter_with_registry!(
"bridge_action_executor_signing_queue_skipped_actions",
"Total number of skipped actions in action executor signing queue",
registry,
)
.unwrap(),
action_executor_execution_queue_received_actions: register_int_counter_with_registry!(
"bridge_action_executor_execution_queue_received_actions",
"Total number of received actions in action executor execution queue",
Expand Down
6 changes: 6 additions & 0 deletions crates/sui-bridge/src/sui_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,12 @@ where
.map_err(|e| BridgeError::InternalError(format!("Can't get bridge committee: {e}")))
}

/// Reports whether the bridge is currently paused.
///
/// Fetches the bridge summary and returns its `is_frozen` field.
///
/// # Errors
///
/// Returns the error produced by `get_bridge_summary` if the summary cannot
/// be fetched.
pub async fn is_bridge_paused(&self) -> BridgeResult<bool> {
    let summary = self.get_bridge_summary().await?;
    Ok(summary.is_frozen)
}

/// Returns the `treasury` field of the current bridge summary.
///
/// # Errors
///
/// Returns the error produced by `get_bridge_summary` if the summary cannot
/// be fetched.
pub async fn get_treasury_summary(&self) -> BridgeResult<BridgeTreasurySummary> {
    self.get_bridge_summary()
        .await
        .map(|summary| summary.treasury)
}
Expand Down

0 comments on commit 3a590aa

Please sign in to comment.