Skip to content

Commit

Permalink
query transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
longbowlu committed Jun 25, 2024
1 parent c556c30 commit 55877b8
Show file tree
Hide file tree
Showing 11 changed files with 366 additions and 15 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions crates/sui-bridge-indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ diesel = { version = "2.1.4", features = ["postgres", "r2d2", "serde_json"] }
ethers = "2.0"
tokio = { workspace = true, features = ["full"] }
anyhow.workspace = true
futures.workspace = true
async-trait.workspace = true
bcs.workspace = true
bin-version.workspace = true
Expand All @@ -20,6 +21,8 @@ mysten-metrics.workspace = true
prometheus.workspace = true
serde_yaml.workspace = true
sui-bridge.workspace = true
sui-sdk.workspace = true
sui-json-rpc-types.workspace = true
sui-data-ingestion-core.workspace = true
sui-types.workspace = true
telemetry-subscribers.workspace = true
Expand Down
1 change: 1 addition & 0 deletions crates/sui-bridge-indexer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub struct Config {
pub start_block: u64,
pub metric_url: String,
pub metric_port: u16,
pub sui_rpc_url: Option<String>,
}

/// Load the config to run.
Expand Down
6 changes: 6 additions & 0 deletions crates/sui-bridge-indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@ pub mod metrics;
pub mod models;
pub mod postgres_manager;
pub mod schema;
pub mod sui_transaction_handler;
pub mod sui_transaction_queries;
pub mod sui_worker;

#[derive(Clone)]
pub struct TokenTransfer {
chain_id: u8,
nonce: u64,
Expand All @@ -27,6 +30,7 @@ pub struct TokenTransfer {
data: Option<TokenTransferData>,
}

#[derive(Clone)]
pub struct TokenTransferData {
sender_address: Vec<u8>,
destination_chain: u8,
Expand Down Expand Up @@ -66,6 +70,7 @@ impl TokenTransfer {
}
}

#[derive(Clone)]
pub(crate) enum TokenTransferStatus {
DepositedUnfinalized,
Deposited,
Expand All @@ -85,6 +90,7 @@ impl Display for TokenTransferStatus {
}
}

#[derive(Clone)]
enum BridgeDataSource {
Sui,
Eth,
Expand Down
91 changes: 80 additions & 11 deletions crates/sui-bridge-indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,24 @@

use anyhow::Result;
use clap::*;
use mysten_metrics::spawn_logged_monitored_task;
use mysten_metrics::start_prometheus_server;
use prometheus::Registry;
use tokio::task::JoinHandle;
use std::collections::{HashMap, HashSet};
use std::env;
use std::path::PathBuf;
use std::sync::Arc;
use sui_bridge::eth_client::EthClient;
use sui_bridge::metrics::BridgeMetrics;
use sui_bridge_indexer::eth_worker::EthBridgeWorker;
use sui_bridge_indexer::postgres_manager::{get_connection_pool, PgProgressStore};
use sui_bridge_indexer::postgres_manager::{read_sui_progress_store, get_connection_pool, PgProgressStore};
use sui_bridge_indexer::sui_transaction_handler::handle_sui_transcations_loop;
use sui_bridge_indexer::sui_transaction_queries::start_sui_tx_polling_task;
use sui_bridge_indexer::sui_worker::SuiBridgeWorker;
use sui_bridge_indexer::{config::load_config, metrics::BridgeIndexerMetrics};
use sui_data_ingestion_core::{DataIngestionMetrics, IndexerExecutor, ReaderOptions, WorkerPool};
use sui_sdk::SuiClientBuilder;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;

use tokio::sync::oneshot;
Expand Down Expand Up @@ -86,18 +91,26 @@ async fn main() -> Result<()> {
.map_err(|e| anyhow::anyhow!(e.to_string()))?,
);

let unfinalized_handle = eth_worker.start_indexing_unfinalized_events(eth_client.clone());
let finalized_handle = eth_worker.start_indexing_finalized_events(eth_client.clone());

let unfinalized_handle = eth_worker.start_indexing_unfinalized_events(eth_client.clone()).await.unwrap();
let finalized_handle = eth_worker.start_indexing_finalized_events(eth_client.clone()).await.unwrap();
let handles = vec![unfinalized_handle, finalized_handle];
// TODO: add retry_with_max_elapsed_time
let progress = start_processing_sui_checkpoints(
&config_clone,
db_url,
indexer_meterics,
ingestion_metrics,
);
if let Some(sui_rpc_url) = config.sui_rpc_url.clone() {
start_processing_sui_checkpoints_by_querying_txes(
sui_rpc_url,
db_url.clone(),
indexer_meterics.clone(),
).await.unwrap();
} else {
let _ = start_processing_sui_checkpoints(
&config_clone,
db_url,
indexer_meterics,
ingestion_metrics,
).await;
}

let _ = tokio::try_join!(finalized_handle, unfinalized_handle, progress);
let _ = futures::future::join_all(handles).await;

Ok(())
}
Expand Down Expand Up @@ -137,3 +150,59 @@ async fn start_processing_sui_checkpoints(
)
.await
}

async fn start_processing_sui_checkpoints_by_querying_txes(
sui_rpc_url: String,
db_url: String,
indexer_meterics: BridgeIndexerMetrics,
) -> Result<Vec<JoinHandle<()>>> {
// metrics init

let pg_pool = get_connection_pool(db_url.clone());
let (tx, rx) = mysten_metrics::metered_channel::channel(
100,
&mysten_metrics::get_metrics()
.unwrap()
.channel_inflight
.with_label_values(&["sui_transaction_processing_queue"]),
);
let mut handles = vec![];
// FIXME cursor
let cursor = read_sui_progress_store(&pg_pool).unwrap();
let sui_client = SuiClientBuilder::default().build(sui_rpc_url).await?;
handles.push(spawn_logged_monitored_task!(
start_sui_tx_polling_task(sui_client, cursor, tx),
"start_sui_tx_polling_task"
));
handles.push(spawn_logged_monitored_task!(
handle_sui_transcations_loop(pg_pool.clone(), rx, indexer_meterics.clone(),),
"handle_sui_transcations_loop"
));
Ok(handles)
// let (_exit_sender, exit_receiver) = oneshot::channel();

// let progress_store = PgProgressStore::new(pg_pool, config.bridge_genesis_checkpoint);
// let mut executor = IndexerExecutor::new(
// progress_store,
// 1, /* workflow types */
// ingestion_metrics,
// );

// let indexer_metrics_cloned = indexer_meterics.clone();

// let worker_pool = WorkerPool::new(
// SuiBridgeWorker::new(vec![], db_url, indexer_metrics_cloned),
// "bridge worker".into(),
// config.concurrency as usize,
// );
// executor.register(worker_pool).await?;
// executor
// .run(
// config.checkpoints_path.clone().into(),
// Some(config.remote_store_url.clone()),
// vec![], // optional remote store access options
// ReaderOptions::default(),
// exit_receiver,
// )
// .await
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,9 @@ CREATE TABLE progress_store
task_name TEXT PRIMARY KEY,
checkpoint BIGINT NOT NULL
);

CREATE TABLE sui_progress_store
(
id INT PRIMARY KEY,
txn_digest bytea NOT NULL
);
9 changes: 8 additions & 1 deletion crates/sui-bridge-indexer/src/models.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::schema::{progress_store, token_transfer, token_transfer_data};
use crate::schema::{progress_store, token_transfer, token_transfer_data, sui_progress_store};
use diesel::{Identifiable, Insertable, Queryable, Selectable};

#[derive(Queryable, Selectable, Insertable, Identifiable, Debug)]
Expand All @@ -11,6 +11,13 @@ pub struct ProgressStore {
pub checkpoint: i64,
}

#[derive(Queryable, Selectable, Insertable, Identifiable, Debug)]
#[diesel(table_name = sui_progress_store, primary_key(txn_digest))]
pub struct SuiProgressStore {
pub id: i32, // Dummy value
pub txn_digest: Vec<u8>,
}

#[derive(Queryable, Selectable, Insertable, Identifiable, Debug)]
#[diesel(table_name = token_transfer, primary_key(chain_id, nonce))]
pub struct TokenTransfer {
Expand Down
32 changes: 32 additions & 0 deletions crates/sui-bridge-indexer/src/postgres_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
// SPDX-License-Identifier: Apache-2.0

use crate::models::ProgressStore as DBProgressStore;
use crate::models::SuiProgressStore;
use crate::models::TokenTransfer as DBTokenTransfer;
use crate::models::TokenTransferData as DBTokenTransferData;
use crate::schema::progress_store::checkpoint;
use crate::schema::progress_store::dsl::progress_store;
use crate::schema::sui_progress_store::txn_digest;
use crate::schema::token_transfer_data;
use crate::{schema, schema::token_transfer, TokenTransfer};
use async_trait::async_trait;
Expand All @@ -17,10 +19,13 @@ use diesel::{
Connection, ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl, SelectableHelper,
};
use sui_data_ingestion_core::ProgressStore;
use sui_types::digests::TransactionDigest;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;

pub(crate) type PgPool = Pool<ConnectionManager<PgConnection>>;

const SUI_PROGRESS_STORE_DUMMY_KEY: i32 = 1;

pub fn get_connection_pool(database_url: String) -> PgPool {
let manager = ConnectionManager::<PgConnection>::new(database_url);
Pool::builder()
Expand Down Expand Up @@ -52,6 +57,33 @@ pub fn write(pool: &PgPool, token_txns: Vec<TokenTransfer>) -> Result<(), anyhow
Ok(())
}

pub fn update_sui_progress_store(pool: &PgPool, tx_digest: TransactionDigest) -> Result<(), anyhow::Error> {
let mut conn = pool.get()?;
diesel::insert_into(schema::sui_progress_store::table)
.values(&SuiProgressStore {
id: SUI_PROGRESS_STORE_DUMMY_KEY,
txn_digest: tx_digest.inner().to_vec(),
})
.on_conflict(schema::sui_progress_store::dsl::id)
.do_update()
.set(txn_digest.eq(tx_digest.inner().to_vec()))
.execute(&mut conn)?;
Ok(())
}

pub fn read_sui_progress_store(pool: &PgPool) -> anyhow::Result<Option<TransactionDigest>> {
let mut conn = pool.get()?;
let val: Option<SuiProgressStore> = crate::schema::sui_progress_store::dsl::sui_progress_store
.select(SuiProgressStore::as_select())
.first(&mut conn)
.optional()?;
match val {
Some(val) => Ok(Some(TransactionDigest::try_from(val.txn_digest.as_slice())?)),
None => Ok(None),
}
}


pub fn get_latest_eth_token_transfer(
pool: &PgPool,
finalized: bool,
Expand Down
16 changes: 13 additions & 3 deletions crates/sui-bridge-indexer/src/schema.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
// @generated automatically by Diesel CLI.

diesel::table! {
Expand All @@ -9,6 +7,13 @@ diesel::table! {
}
}

diesel::table! {
sui_progress_store (id) {
id -> Int4,
txn_digest -> Bytea,
}
}

diesel::table! {
token_transfer (chain_id, nonce, status) {
chain_id -> Int4,
Expand Down Expand Up @@ -38,4 +43,9 @@ diesel::table! {
}
}

diesel::allow_tables_to_appear_in_same_query!(progress_store, token_transfer, token_transfer_data,);
diesel::allow_tables_to_appear_in_same_query!(
progress_store,
sui_progress_store,
token_transfer,
token_transfer_data,
);
Loading

0 comments on commit 55877b8

Please sign in to comment.