Skip to content

Commit

Permalink
clean up and metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
longbowlu committed Jun 25, 2024
1 parent 55877b8 commit 59d6a5e
Show file tree
Hide file tree
Showing 15 changed files with 190 additions and 83 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/sui-bridge-indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ edition = "2021"

[dependencies]
serde.workspace = true
tap.workspace = true
diesel = { version = "2.1.4", features = ["postgres", "r2d2", "serde_json"] }
ethers = "2.0"
tokio = { workspace = true, features = ["full"] }
Expand Down
18 changes: 15 additions & 3 deletions crates/sui-bridge-indexer/src/eth_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,18 @@ use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use sui_bridge::abi::{EthBridgeEvent, EthSuiBridgeEvents};
use sui_bridge::metrics::BridgeMetrics;
use sui_bridge::types::EthLog;
use sui_bridge::{eth_client::EthClient, eth_syncer::EthSyncer};
use tokio::task::JoinHandle;
use tracing::info;
use tracing::log::error;

#[derive(Clone, Debug)]
#[derive(Clone)]
pub struct EthBridgeWorker {
provider: Arc<Provider<Http>>,
pg_pool: PgPool,
bridge_metrics: Arc<BridgeMetrics>,
metrics: BridgeIndexerMetrics,
bridge_address: EthAddress,
config: Config,
Expand All @@ -34,6 +36,7 @@ pub struct EthBridgeWorker {
impl EthBridgeWorker {
pub fn new(
pg_pool: PgPool,
bridge_metrics: Arc<BridgeMetrics>,
metrics: BridgeIndexerMetrics,
config: Config,
) -> Result<Self, Box<dyn std::error::Error>> {
Expand All @@ -47,6 +50,7 @@ impl EthBridgeWorker {
Ok(Self {
provider,
pg_pool,
bridge_metrics,
metrics,
bridge_address,
config,
Expand All @@ -69,7 +73,7 @@ impl EthBridgeWorker {

let (_task_handles, eth_events_rx, _) =
EthSyncer::new(eth_client, finalized_contract_addresses)
.run()
.run(self.bridge_metrics.clone())
.await
.map_err(|e| anyhow::anyhow!(format!("{:?}", e)))?;

Expand Down Expand Up @@ -114,7 +118,7 @@ impl EthBridgeWorker {
self.provider.clone(),
unfinalized_contract_addresses.clone(),
)
.run()
.run(self.metrics.clone())
.await
.map_err(|e| anyhow::anyhow!(format!("{:?}", e)))?;

Expand Down Expand Up @@ -146,6 +150,11 @@ async fn process_eth_events(
mut eth_events_rx: mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
finalized: bool,
) {
let progress_gauge = if finalized {
metrics.last_committed_eth_block.clone()
} else {
metrics.last_committed_unfinalized_eth_block.clone()
};
while let Some((_, _, logs)) = eth_events_rx.recv().await {
for log in logs.iter() {
let eth_bridge_event = EthBridgeEvent::try_from_eth_log(log);
Expand Down Expand Up @@ -240,8 +249,11 @@ async fn process_eth_events(
}
};

// TODO: we either scream here or keep retrying this until we succeed
if let Err(e) = write(&pg_pool, vec![transfer]) {
error!("Error writing token transfer to database: {:?}", e);
} else {
progress_gauge.set(block_number as i64);
}
}
}
Expand Down
13 changes: 12 additions & 1 deletion crates/sui-bridge-indexer/src/latest_eth_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use tokio::task::JoinHandle;
use tokio::time::{self, Duration};
use tracing::error;

use crate::metrics::BridgeIndexerMetrics;

const ETH_LOG_QUERY_MAX_BLOCK_RANGE: u64 = 1000;
const ETH_EVENTS_CHANNEL_SIZE: usize = 1000;
const BLOCK_QUERY_INTERVAL: Duration = Duration::from_secs(2);
Expand Down Expand Up @@ -51,6 +53,7 @@ where

pub async fn run(
self,
metrics: BridgeIndexerMetrics,
) -> BridgeResult<(
Vec<JoinHandle<()>>,
mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<EthLog>)>,
Expand All @@ -66,16 +69,17 @@ where
let mut task_handles = vec![];
for (contract_address, start_block) in self.contract_addresses {
let eth_events_tx_clone = eth_evnets_tx.clone();
// let latest_block_rx_clone = latest_block_rx.clone();
let eth_client_clone = self.eth_client.clone();
let provider_clone = self.provider.clone();
let metrics_clone = metrics.clone();
task_handles.push(spawn_logged_monitored_task!(
Self::run_event_listening_task(
contract_address,
start_block,
provider_clone,
eth_events_tx_clone,
eth_client_clone,
metrics_clone,
)
));
}
Expand All @@ -88,6 +92,7 @@ where
provider: Arc<Provider<Http>>,
events_sender: mysten_metrics::metered_channel::Sender<(EthAddress, u64, Vec<EthLog>)>,
eth_client: Arc<EthClient<P>>,
metrics: BridgeIndexerMetrics,
) {
tracing::info!(contract_address=?contract_address, "Starting eth events listening task from block {start_block}");
loop {
Expand Down Expand Up @@ -124,6 +129,7 @@ where
continue;
};
let len = events.len();
let last_block = events.last().map(|e| e.block_number);

// Note 1: we always events to the channel even when it is empty. This is because of
// how `eth_getLogs` api is designed - we want cursor to move forward continuously.
Expand All @@ -143,6 +149,11 @@ where
"Observed {len} new Eth events",
);
}
if let Some(last_block) = last_block {
metrics
.last_synced_unfinalized_eth_block
.set(last_block as i64);
}
start_block = end_block + 1;
}
}
Expand Down
75 changes: 31 additions & 44 deletions crates/sui-bridge-indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,24 @@ use clap::*;
use mysten_metrics::spawn_logged_monitored_task;
use mysten_metrics::start_prometheus_server;
use prometheus::Registry;
use tokio::task::JoinHandle;
use std::collections::{HashMap, HashSet};
use std::env;
use std::path::PathBuf;
use std::sync::Arc;
use sui_bridge::eth_client::EthClient;
use sui_bridge::metrics::BridgeMetrics;
use sui_bridge_indexer::eth_worker::EthBridgeWorker;
use sui_bridge_indexer::postgres_manager::{read_sui_progress_store, get_connection_pool, PgProgressStore};
use sui_bridge_indexer::postgres_manager::{
get_connection_pool, read_sui_progress_store, PgProgressStore,
};
use sui_bridge_indexer::sui_transaction_handler::handle_sui_transcations_loop;
use sui_bridge_indexer::sui_transaction_queries::start_sui_tx_polling_task;
use sui_bridge_indexer::sui_worker::SuiBridgeWorker;
use sui_bridge_indexer::{config::load_config, metrics::BridgeIndexerMetrics};
use sui_data_ingestion_core::{DataIngestionMetrics, IndexerExecutor, ReaderOptions, WorkerPool};
use sui_sdk::SuiClientBuilder;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;
use tokio::task::JoinHandle;

use tokio::sync::oneshot;
use tracing::info;
Expand Down Expand Up @@ -76,6 +78,7 @@ async fn main() -> Result<()> {
// TODO: retry_with_max_elapsed_time
let eth_worker = EthBridgeWorker::new(
get_connection_pool(db_url.clone()),
bridge_metrics.clone(),
indexer_meterics.clone(),
config.clone(),
)
Expand All @@ -85,31 +88,42 @@ async fn main() -> Result<()> {
EthClient::<ethers::providers::Http>::new(
&config.eth_rpc_url,
HashSet::from_iter(vec![eth_worker.bridge_address()]),
bridge_metrics,
bridge_metrics.clone(),
)
.await
.map_err(|e| anyhow::anyhow!(e.to_string()))?,
);

let unfinalized_handle = eth_worker.start_indexing_unfinalized_events(eth_client.clone()).await.unwrap();
let finalized_handle = eth_worker.start_indexing_finalized_events(eth_client.clone()).await.unwrap();
let unfinalized_handle = eth_worker
.start_indexing_unfinalized_events(eth_client.clone())
.await
.unwrap();
let finalized_handle = eth_worker
.start_indexing_finalized_events(eth_client.clone())
.await
.unwrap();
let handles = vec![unfinalized_handle, finalized_handle];
// TODO: add retry_with_max_elapsed_time

if let Some(sui_rpc_url) = config.sui_rpc_url.clone() {
start_processing_sui_checkpoints_by_querying_txes(
sui_rpc_url,
db_url.clone(),
indexer_meterics.clone(),
).await.unwrap();
bridge_metrics,
)
.await
.unwrap();
} else {
let _ = start_processing_sui_checkpoints(
&config_clone,
db_url,
indexer_meterics,
ingestion_metrics,
).await;
)
.await;
}

// We are not waiting for the sui tasks to finish here, which is ok.
let _ = futures::future::join_all(handles).await;

Ok(())
Expand Down Expand Up @@ -154,55 +168,28 @@ async fn start_processing_sui_checkpoints(
async fn start_processing_sui_checkpoints_by_querying_txes(
sui_rpc_url: String,
db_url: String,
indexer_meterics: BridgeIndexerMetrics,
indexer_metrics: BridgeIndexerMetrics,
bridge_metrics: Arc<BridgeMetrics>,
) -> Result<Vec<JoinHandle<()>>> {
// metrics init

let pg_pool = get_connection_pool(db_url.clone());
let (tx, rx) = mysten_metrics::metered_channel::channel(
100,
&mysten_metrics::get_metrics()
.unwrap()
.channel_inflight
.with_label_values(&["sui_transaction_processing_queue"]),
.unwrap()
.channel_inflight
.with_label_values(&["sui_transaction_processing_queue"]),
);
let mut handles = vec![];
// FIXME cursor
let cursor = read_sui_progress_store(&pg_pool).unwrap();
let cursor =
read_sui_progress_store(&pg_pool).expect("Failed to read cursor from sui progress store");
let sui_client = SuiClientBuilder::default().build(sui_rpc_url).await?;
handles.push(spawn_logged_monitored_task!(
start_sui_tx_polling_task(sui_client, cursor, tx),
start_sui_tx_polling_task(sui_client, cursor, tx, bridge_metrics),
"start_sui_tx_polling_task"
));
handles.push(spawn_logged_monitored_task!(
handle_sui_transcations_loop(pg_pool.clone(), rx, indexer_meterics.clone(),),
handle_sui_transcations_loop(pg_pool.clone(), rx, indexer_metrics.clone()),
"handle_sui_transcations_loop"
));
Ok(handles)
// let (_exit_sender, exit_receiver) = oneshot::channel();

// let progress_store = PgProgressStore::new(pg_pool, config.bridge_genesis_checkpoint);
// let mut executor = IndexerExecutor::new(
// progress_store,
// 1, /* workflow types */
// ingestion_metrics,
// );

// let indexer_metrics_cloned = indexer_meterics.clone();

// let worker_pool = WorkerPool::new(
// SuiBridgeWorker::new(vec![], db_url, indexer_metrics_cloned),
// "bridge worker".into(),
// config.concurrency as usize,
// );
// executor.register(worker_pool).await?;
// executor
// .run(
// config.checkpoints_path.clone().into(),
// Some(config.remote_store_url.clone()),
// vec![], // optional remote store access options
// ReaderOptions::default(),
// exit_receiver,
// )
// .await
}
33 changes: 32 additions & 1 deletion crates/sui-bridge-indexer/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use prometheus::{register_int_counter_with_registry, IntCounter, Registry};
use prometheus::{
register_int_counter_with_registry, register_int_gauge_with_registry, IntCounter, IntGauge,
Registry,
};

#[derive(Clone, Debug)]
pub struct BridgeIndexerMetrics {
Expand All @@ -14,6 +17,10 @@ pub struct BridgeIndexerMetrics {
pub(crate) total_eth_token_deposited: IntCounter,
pub(crate) total_eth_token_transfer_claimed: IntCounter,
pub(crate) total_eth_bridge_txn_other: IntCounter,
pub(crate) last_committed_sui_checkpoint: IntGauge,
pub(crate) last_committed_eth_block: IntGauge,
pub(crate) last_synced_unfinalized_eth_block: IntGauge,
pub(crate) last_committed_unfinalized_eth_block: IntGauge,
}

impl BridgeIndexerMetrics {
Expand Down Expand Up @@ -73,6 +80,30 @@ impl BridgeIndexerMetrics {
registry,
)
.unwrap(),
last_committed_sui_checkpoint: register_int_gauge_with_registry!(
"last_committed_sui_checkpoint",
"The latest sui checkpoint that indexer committed to DB",
registry,
)
.unwrap(),
last_committed_eth_block: register_int_gauge_with_registry!(
"last_committed_eth_block",
"The latest eth block that indexer committed to DB",
registry,
)
.unwrap(),
last_synced_unfinalized_eth_block: register_int_gauge_with_registry!(
"last_synced_unfinalized_eth_block",
"The latest unfinalized block that indexer synced",
registry,
)
.unwrap(),
last_committed_unfinalized_eth_block: register_int_gauge_with_registry!(
"last_committed_unfinalized_eth_block",
"The latest unfinalized block that indexer comitted to DB",
registry,
)
.unwrap(),
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,6 @@ CREATE TABLE progress_store

CREATE TABLE sui_progress_store
(
id INT PRIMARY KEY,
id INT PRIMARY KEY, -- dummy value
txn_digest bytea NOT NULL
);
2 changes: 1 addition & 1 deletion crates/sui-bridge-indexer/src/models.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::schema::{progress_store, token_transfer, token_transfer_data, sui_progress_store};
use crate::schema::{progress_store, sui_progress_store, token_transfer, token_transfer_data};
use diesel::{Identifiable, Insertable, Queryable, Selectable};

#[derive(Queryable, Selectable, Insertable, Identifiable, Debug)]
Expand Down
Loading

0 comments on commit 59d6a5e

Please sign in to comment.