Skip to content

Commit

Permalink
Merge pull request #23 from lijunwangs/write_version_in_transaction
Browse files Browse the repository at this point in the history
Write_version in transaction to tell the order of transaction within a block
  • Loading branch information
lijunwangs authored May 11, 2022
2 parents 5f4ef3a + 85e6c6c commit 736d3f9
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 7 deletions.
1 change: 1 addition & 0 deletions scripts/create_schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ CREATE TABLE transaction (
signatures BYTEA[],
message_hash BYTEA,
meta "TransactionStatusMeta",
write_version BIGINT,
updated_on TIMESTAMP NOT NULL,
CONSTRAINT transaction_pk PRIMARY KEY (slot, signature)
);
Expand Down
4 changes: 3 additions & 1 deletion src/postgres_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use {
std::{
collections::HashSet,
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
Arc, Mutex,
},
thread::{self, sleep, Builder, JoinHandle},
Expand Down Expand Up @@ -1032,6 +1032,7 @@ pub struct ParallelPostgresClient {
initialized_worker_count: Arc<AtomicUsize>,
sender: Sender<DbWorkItem>,
last_report: AtomicInterval,
transaction_write_version: AtomicU64,
}

impl ParallelPostgresClient {
Expand Down Expand Up @@ -1095,6 +1096,7 @@ impl ParallelPostgresClient {
startup_done_count,
initialized_worker_count,
sender,
transaction_write_version: AtomicU64::default(),
})
}

Expand Down
32 changes: 26 additions & 6 deletions src/postgres_client/postgres_client_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use {
solana_transaction_status::{
InnerInstructions, Reward, TransactionStatusMeta, TransactionTokenBalance,
},
std::sync::atomic::Ordering,
};

const MAX_TRANSACTION_STATUS_LEN: usize = 256;
Expand Down Expand Up @@ -144,6 +145,10 @@ pub struct DbTransaction {
pub message_hash: Vec<u8>,
pub meta: DbTransactionStatusMeta,
pub signatures: Vec<Vec<u8>>,
/// This can be used to tell the order of transaction within a block
/// Given a slot, the transaction with a smaller write_version appears
/// before transactions with higher write_versions in a shred.
pub write_version: i64,
}

pub struct LogTransactionRequest {
Expand Down Expand Up @@ -474,7 +479,11 @@ impl From<&TransactionStatusMeta> for DbTransactionStatusMeta {
}
}

fn build_db_transaction(slot: u64, transaction_info: &ReplicaTransactionInfo) -> DbTransaction {
fn build_db_transaction(
slot: u64,
transaction_info: &ReplicaTransactionInfo,
transaction_write_version: u64,
) -> DbTransaction {
DbTransaction {
signature: transaction_info.signature.as_ref().to_vec(),
is_vote: transaction_info.is_vote,
Expand Down Expand Up @@ -505,6 +514,7 @@ fn build_db_transaction(slot: u64, transaction_info: &ReplicaTransactionInfo) ->
.as_ref()
.to_vec(),
meta: DbTransactionStatusMeta::from(transaction_info.transaction_status_meta),
write_version: transaction_write_version as i64,
}
}

Expand All @@ -514,15 +524,16 @@ impl SimplePostgresClient {
config: &GeyserPluginPostgresConfig,
) -> Result<Statement, GeyserPluginError> {
let stmt = "INSERT INTO transaction AS txn (signature, is_vote, slot, message_type, legacy_message, \
v0_loaded_message, signatures, message_hash, meta, updated_on) \
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) \
v0_loaded_message, signatures, message_hash, meta, write_version, updated_on) \
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) \
ON CONFLICT (slot, signature) DO UPDATE SET is_vote=excluded.is_vote, \
message_type=excluded.message_type, \
legacy_message=excluded.legacy_message, \
v0_loaded_message=excluded.v0_loaded_message, \
signatures=excluded.signatures, \
message_hash=excluded.message_hash, \
meta=excluded.meta, \
write_version=excluded.write_version, \
updated_on=excluded.updated_on";

let stmt = client.prepare(stmt);
Expand Down Expand Up @@ -562,6 +573,7 @@ impl SimplePostgresClient {
&transaction_info.signatures,
&transaction_info.message_hash,
&transaction_info.meta,
&transaction_info.write_version,
&updated_on,
],
);
Expand All @@ -583,9 +595,14 @@ impl ParallelPostgresClient {
fn build_transaction_request(
slot: u64,
transaction_info: &ReplicaTransactionInfo,
transaction_write_version: u64,
) -> LogTransactionRequest {
LogTransactionRequest {
transaction_info: build_db_transaction(slot, transaction_info),
transaction_info: build_db_transaction(
slot,
transaction_info,
transaction_write_version,
),
}
}

Expand All @@ -594,9 +611,12 @@ impl ParallelPostgresClient {
transaction_info: &ReplicaTransactionInfo,
slot: u64,
) -> Result<(), GeyserPluginError> {
self.transaction_write_version
.fetch_add(1, Ordering::Relaxed);
let wrk_item = DbWorkItem::LogTransaction(Box::new(Self::build_transaction_request(
slot,
transaction_info,
self.transaction_write_version.load(Ordering::Relaxed),
)));

if let Err(err) = self.sender.send(wrk_item) {
Expand Down Expand Up @@ -1339,7 +1359,7 @@ pub(crate) mod tests {
};

let slot = 54;
let db_transaction = build_db_transaction(slot, &transaction_info);
let db_transaction = build_db_transaction(slot, &transaction_info, 1);
check_transaction(slot, &transaction_info, &db_transaction);
}

Expand Down Expand Up @@ -1383,7 +1403,7 @@ pub(crate) mod tests {
};

let slot = 54;
let db_transaction = build_db_transaction(slot, &transaction_info);
let db_transaction = build_db_transaction(slot, &transaction_info, 1);
check_transaction(slot, &transaction_info, &db_transaction);
}
}

0 comments on commit 736d3f9

Please sign in to comment.