diff --git a/scripts/create_schema.sql b/scripts/create_schema.sql index 9745276..3c1598e 100644 --- a/scripts/create_schema.sql +++ b/scripts/create_schema.sql @@ -162,6 +162,7 @@ CREATE TABLE transaction ( signatures BYTEA[], message_hash BYTEA, meta "TransactionStatusMeta", + write_version BIGINT, updated_on TIMESTAMP NOT NULL, CONSTRAINT transaction_pk PRIMARY KEY (slot, signature) ); diff --git a/src/postgres_client.rs b/src/postgres_client.rs index b9f22a4..9cd001d 100644 --- a/src/postgres_client.rs +++ b/src/postgres_client.rs @@ -27,7 +27,7 @@ use { std::{ collections::HashSet, sync::{ - atomic::{AtomicBool, AtomicUsize, Ordering}, + atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}, Arc, Mutex, }, thread::{self, sleep, Builder, JoinHandle}, @@ -1032,6 +1032,7 @@ pub struct ParallelPostgresClient { initialized_worker_count: Arc, sender: Sender, last_report: AtomicInterval, + transaction_write_version: AtomicU64, } impl ParallelPostgresClient { @@ -1095,6 +1096,7 @@ impl ParallelPostgresClient { startup_done_count, initialized_worker_count, sender, + transaction_write_version: AtomicU64::default(), }) } diff --git a/src/postgres_client/postgres_client_transaction.rs b/src/postgres_client/postgres_client_transaction.rs index 48eaa9d..a9c215c 100644 --- a/src/postgres_client/postgres_client_transaction.rs +++ b/src/postgres_client/postgres_client_transaction.rs @@ -24,6 +24,7 @@ use { solana_transaction_status::{ InnerInstructions, Reward, TransactionStatusMeta, TransactionTokenBalance, }, + std::sync::atomic::Ordering, }; const MAX_TRANSACTION_STATUS_LEN: usize = 256; @@ -144,6 +145,10 @@ pub struct DbTransaction { pub message_hash: Vec, pub meta: DbTransactionStatusMeta, pub signatures: Vec>, + /// This can be used to tell the order of transaction within a block + /// Given a slot, the transaction with a smaller write_version appears + /// before transactions with higher write_versions in a shred. + pub write_version: i64, } pub struct LogTransactionRequest { @@ -474,7 +479,11 @@ impl From<&TransactionStatusMeta> for DbTransactionStatusMeta { } } -fn build_db_transaction(slot: u64, transaction_info: &ReplicaTransactionInfo) -> DbTransaction { +fn build_db_transaction( + slot: u64, + transaction_info: &ReplicaTransactionInfo, + transaction_write_version: u64, +) -> DbTransaction { DbTransaction { signature: transaction_info.signature.as_ref().to_vec(), is_vote: transaction_info.is_vote, @@ -505,6 +514,7 @@ fn build_db_transaction(slot: u64, transaction_info: &ReplicaTransactionInfo) -> .as_ref() .to_vec(), meta: DbTransactionStatusMeta::from(transaction_info.transaction_status_meta), + write_version: transaction_write_version as i64, } } @@ -514,8 +524,8 @@ impl SimplePostgresClient { config: &GeyserPluginPostgresConfig, ) -> Result { let stmt = "INSERT INTO transaction AS txn (signature, is_vote, slot, message_type, legacy_message, \ - v0_loaded_message, signatures, message_hash, meta, updated_on) \ - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) \ + v0_loaded_message, signatures, message_hash, meta, write_version, updated_on) \ + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) \ ON CONFLICT (slot, signature) DO UPDATE SET is_vote=excluded.is_vote, \ message_type=excluded.message_type, \ legacy_message=excluded.legacy_message, \ @@ -523,6 +533,7 @@ impl SimplePostgresClient { signatures=excluded.signatures, \ message_hash=excluded.message_hash, \ meta=excluded.meta, \ + write_version=excluded.write_version, \ updated_on=excluded.updated_on"; let stmt = client.prepare(stmt); @@ -562,6 +573,7 @@ impl SimplePostgresClient { &transaction_info.signatures, &transaction_info.message_hash, &transaction_info.meta, + &transaction_info.write_version, &updated_on, ], ); @@ -583,9 +595,14 @@ impl ParallelPostgresClient { fn build_transaction_request( slot: u64, transaction_info: &ReplicaTransactionInfo, + transaction_write_version: u64, ) -> LogTransactionRequest { LogTransactionRequest { - transaction_info: build_db_transaction(slot, transaction_info), + transaction_info: build_db_transaction( + slot, + transaction_info, + transaction_write_version, + ), } } @@ -594,9 +611,12 @@ impl ParallelPostgresClient { transaction_info: &ReplicaTransactionInfo, slot: u64, ) -> Result<(), GeyserPluginError> { + self.transaction_write_version + .fetch_add(1, Ordering::Relaxed); let wrk_item = DbWorkItem::LogTransaction(Box::new(Self::build_transaction_request( slot, transaction_info, + self.transaction_write_version.load(Ordering::Relaxed), ))); if let Err(err) = self.sender.send(wrk_item) { @@ -1339,7 +1359,7 @@ pub(crate) mod tests { }; let slot = 54; - let db_transaction = build_db_transaction(slot, &transaction_info); + let db_transaction = build_db_transaction(slot, &transaction_info, 1); check_transaction(slot, &transaction_info, &db_transaction); } @@ -1383,7 +1403,7 @@ pub(crate) mod tests { }; let slot = 54; - let db_transaction = build_db_transaction(slot, &transaction_info); + let db_transaction = build_db_transaction(slot, &transaction_info, 1); check_transaction(slot, &transaction_info, &db_transaction); } }