From df341dbac8f725f8432c6f4ddf6ea5dc33c80c6f Mon Sep 17 00:00:00 2001 From: Matthew Plant Date: Thu, 4 Jan 2024 16:34:23 -0500 Subject: [PATCH 1/5] Two phase commit for mobile --- Cargo.lock | 1 + mobile_packet_verifier/Cargo.toml | 1 + .../migrations/7_two_phase_commit.sql | 11 ++ mobile_packet_verifier/src/accumulate.rs | 14 +- mobile_packet_verifier/src/burner.rs | 171 +++++++++++------- mobile_packet_verifier/src/lib.rs | 8 + 6 files changed, 135 insertions(+), 71 deletions(-) create mode 100644 mobile_packet_verifier/migrations/7_two_phase_commit.sql diff --git a/Cargo.lock b/Cargo.lock index f90c2bc30..877cce7a5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4149,6 +4149,7 @@ dependencies = [ "serde", "sha2 0.10.6", "solana", + "solana-sdk", "sqlx", "task-manager", "thiserror", diff --git a/mobile_packet_verifier/Cargo.toml b/mobile_packet_verifier/Cargo.toml index 6e49b156d..c2ce10587 100644 --- a/mobile_packet_verifier/Cargo.toml +++ b/mobile_packet_verifier/Cargo.toml @@ -21,6 +21,7 @@ helium-crypto = {workspace = true, features = ["sqlx-postgres", "multisig", "sol metrics = {workspace = true} poc-metrics = {path = "../metrics"} prost = {workspace = true} +solana-sdk = {workspace = true} serde = {workspace = true} sqlx = {workspace = true} solana = {path = "../solana"} diff --git a/mobile_packet_verifier/migrations/7_two_phase_commit.sql b/mobile_packet_verifier/migrations/7_two_phase_commit.sql new file mode 100644 index 000000000..06df4b6bd --- /dev/null +++ b/mobile_packet_verifier/migrations/7_two_phase_commit.sql @@ -0,0 +1,11 @@ +CREATE TYPE solana_transaction AS ( + signature TEXT NOT NULL, + time_of_submission TIMESTAMPTZ NOT NULL +); + +CREATE TABLE payer_totals ( + payer TEXT PRIMARY KEY, + total_dcs BIGINT NOT NULL, + txn solana_transaction +); + diff --git a/mobile_packet_verifier/src/accumulate.rs b/mobile_packet_verifier/src/accumulate.rs index b0da9b8e6..63d4521e3 100644 --- a/mobile_packet_verifier/src/accumulate.rs +++ b/mobile_packet_verifier/src/accumulate.rs @@ -59,13 +59,25 @@ pub async fn accumulate_sessions( "# ) .bind(event.pub_key) - .bind(event.payer) + .bind(&event.payer) .bind(event.upload_bytes as i64) .bind(event.download_bytes as i64) .bind(report.report.rewardable_bytes as i64) .bind(curr_file_ts) .execute(&mut *conn) .await?; + sqlx::query( + r#" + INSERT INTO payer_totals (payer, total_dcs) + VALUES ($1, $2) + ON CONFLICT (payer) DO UPDATE SET + total_dcs = total_dcs + EXCLUDED.total_dcs + "#, + ) + .bind(event.payer) + .bind(crate::bytes_to_dc(event.upload_bytes + event.download_bytes) as i64) + .execute(&mut *conn) + .await?; } Ok(()) diff --git a/mobile_packet_verifier/src/burner.rs b/mobile_packet_verifier/src/burner.rs index df206a029..7d3e3b465 100644 --- a/mobile_packet_verifier/src/burner.rs +++ b/mobile_packet_verifier/src/burner.rs @@ -1,10 +1,10 @@ -use chrono::{DateTime, Utc}; +use chrono::{DateTime, Duration, Utc}; use file_store::{file_sink::FileSinkClient, traits::TimestampEncode}; use helium_crypto::PublicKeyBinary; use helium_proto::services::packet_verifier::ValidDataTransferSession; -use solana::SolanaNetwork; +use solana::{GetSignature, SolanaNetwork}; +use solana_sdk::signature::Signature; use sqlx::{FromRow, Pool, Postgres}; -use std::collections::HashMap; #[derive(FromRow)] pub struct DataTransferSession { @@ -17,17 +17,19 @@ pub struct DataTransferSession { last_timestamp: DateTime, } -#[derive(Default)] +#[derive(FromRow)] pub struct PayerTotals { - total_dcs: u64, - sessions: Vec, + payer: PublicKeyBinary, + total_dcs: i64, + txn: Option, } -impl PayerTotals { - fn push_sess(&mut self, sess: DataTransferSession) { - self.total_dcs += bytes_to_dc(sess.rewardable_bytes as u64); - self.sessions.push(sess); - } +#[derive(sqlx::Type)] +#[sqlx(type_name = "solana_transaction")] +pub struct SolanaTransaction { + signature: String, + amount: i64, + time_of_submission: DateTime, } pub struct Burner { @@ -50,6 +52,8 @@ pub enum BurnError { FileStoreError(#[from] file_store::Error), #[error("sql error: {0}")] SqlError(#[from] sqlx::Error), + #[error("Chrono error: {0}")] + ChronoError(#[from] chrono::OutOfRangeError), #[error("solana error: {0}")] SolanaError(E), } @@ -59,60 +63,105 @@ where S: SolanaNetwork, { pub async fn burn(&self, pool: &Pool) -> Result<(), BurnError> { - // Fetch all of the sessions - let sessions: Vec = - sqlx::query_as("SELECT * FROM data_transfer_sessions") - .fetch_all(pool) - .await?; - - // Fetch all of the sessions and group by the payer - let mut payer_totals = HashMap::::new(); - for session in sessions.into_iter() { - payer_totals - .entry(session.payer.clone()) - .or_default() - .push_sess(session); - } + // Fetch all of the payer totals: + let totals: Vec = sqlx::query_as("SELECT * FROM payer_totals") + .fetch_all(pool) + .await?; - for ( + for PayerTotals { payer, - PayerTotals { - total_dcs, - sessions, - }, - ) in payer_totals.into_iter() + total_dcs, + txn, + } in totals { - let payer_balance = self - .solana - .payer_balance(&payer) - .await - .map_err(BurnError::SolanaError)?; - - if payer_balance < total_dcs { - tracing::warn!(%payer, %payer_balance, %total_dcs, "Payer does not have enough balance to burn dcs"); - continue; - } - - tracing::info!(%total_dcs, %payer, "Burning DC"); - if self.burn_data_credits(&payer, total_dcs).await.is_err() { - // We have failed to burn data credits: - metrics::counter!("burned", total_dcs, "payer" => payer.to_string(), "success" => "false"); - continue; + let mut total_dcs = total_dcs as u64; + + // Check if there is a pending transaction + if let Some(SolanaTransaction { + signature, + amount, + time_of_submission, + }) = txn + { + // Sleep for at least a minute since the time of submission to + // give the transaction plenty of time to be confirmed: + let time_since_submission = Utc::now() - time_of_submission; + if Duration::minutes(1) > time_since_submission { + tokio::time::sleep((Duration::minutes(1) - time_since_submission).to_std()?) + .await; + } + + let signature: Signature = signature.parse().unwrap(); + if self + .solana + .confirm_transaction(&signature) + .await + .map_err(BurnError::SolanaError)? + { + // This transaction has been confirmed. Subtract the amount confirmed from + // the total amount burned and remove the transaction. + total_dcs -= amount as u64; + sqlx::query( + "UPDATE payer_totals SET txn = NULL, total_dcs = $2 WHERE payer = $1", + ) + .bind(&payer) + .bind(total_dcs as i64) + .execute(pool) + .await?; + } else { + // Transaction is no longer valid. Remove it from the payer totals. + sqlx::query("UPDATE payer_totals SET txn = NULL WHERE payer = $1") + .bind(&payer) + .execute(pool) + .await?; + } } - // We succesfully managed to burn data credits: - - metrics::counter!("burned", total_dcs, "payer" => payer.to_string(), "success" => "true"); + // Get the current sessions we need to write, before creating any new transactions + let sessions: Vec = + sqlx::query_as("SELECT * FROM data_transfer_session WHERE payer = $1") + .bind(&payer) + .fetch_all(pool) + .await?; - // Delete from the data transfer session and write out to S3 + // Create a new transaction for the given amount, if there is any left. + // If total_dcs is zero, that means we need to clear out the current sessions as they are paid for. + if total_dcs != 0 { + let txn = self + .solana + .make_burn_transaction(&payer, total_dcs) + .await + .map_err(BurnError::SolanaError)?; + sqlx::query("UPDATE payer_totals SET txn = $2 WHERE payer = $1") + .bind(&payer) + .bind(SolanaTransaction { + signature: txn.get_signature().to_string(), + amount: total_dcs as i64, + time_of_submission: Utc::now(), + }) + .execute(pool) + .await?; + // Attempt to execute the transaction + if self.solana.submit_transaction(&txn).await.is_err() { + // We have failed to burn data credits: + metrics::counter!("burned", total_dcs, "payer" => payer.to_string(), "success" => "false"); + continue; + } + } + // Submit the sessions sqlx::query("DELETE FROM data_transfer_sessions WHERE payer = $1") .bind(&payer) .execute(pool) .await?; + sqlx::query("DELETE FROM payer_totals WHERE payer = $1") + .bind(&payer) + .execute(pool) + .await?; + for session in sessions { - let num_dcs = bytes_to_dc(session.rewardable_bytes as u64); + let num_dcs = crate::bytes_to_dc(session.rewardable_bytes as u64); self.valid_sessions .write( ValidDataTransferSession { @@ -133,22 +182,4 @@ where Ok(()) } - - async fn burn_data_credits( - &self, - payer: &PublicKeyBinary, - amount: u64, - ) -> Result<(), S::Error> { - let txn = self.solana.make_burn_transaction(payer, amount).await?; - self.solana.submit_transaction(&txn).await?; - Ok(()) - } -} - -const BYTES_PER_DC: u64 = 20_000; - -fn bytes_to_dc(bytes: u64) -> u64 { - let bytes = bytes.max(BYTES_PER_DC); - // Integer div/ceil from: https://stackoverflow.com/a/2745086 - (bytes + BYTES_PER_DC - 1) / BYTES_PER_DC } diff --git a/mobile_packet_verifier/src/lib.rs b/mobile_packet_verifier/src/lib.rs index 4d6c71332..3a0a93520 100644 --- a/mobile_packet_verifier/src/lib.rs +++ b/mobile_packet_verifier/src/lib.rs @@ -3,3 +3,11 @@ pub mod burner; pub mod daemon; pub mod event_ids; pub mod settings; + +const BYTES_PER_DC: u64 = 20_000; + +pub fn bytes_to_dc(bytes: u64) -> u64 { + let bytes = bytes.max(BYTES_PER_DC); + // Integer div/ceil from: https://stackoverflow.com/a/2745086 + (bytes + BYTES_PER_DC - 1) / BYTES_PER_DC +} From 3d1833f0c020f3068e4e4ee0491e62e78ebbb8d7 Mon Sep 17 00:00:00 2001 From: Matthew Plant Date: Thu, 4 Jan 2024 16:37:42 -0500 Subject: [PATCH 2/5] Simplify code a bit --- mobile_packet_verifier/src/burner.rs | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/mobile_packet_verifier/src/burner.rs b/mobile_packet_verifier/src/burner.rs index 7d3e3b465..e09e50d68 100644 --- a/mobile_packet_verifier/src/burner.rs +++ b/mobile_packet_verifier/src/burner.rs @@ -101,20 +101,14 @@ where // This transaction has been confirmed. Subtract the amount confirmed from // the total amount burned and remove the transaction. total_dcs -= amount as u64; - sqlx::query( - "UPDATE payer_totals SET txn = NULL, total_dcs = $2 WHERE payer = $1", - ) + } + // If the transaction has not been confirmed, we still want to remove the transaction. + // The total_dcs column remains the same. + sqlx::query("UPDATE payer_totals SET txn = NULL, total_dcs = $2 WHERE payer = $1") .bind(&payer) .bind(total_dcs as i64) .execute(pool) .await?; - } else { - // Transaction is no longer valid. Remove it from the payer totals. - sqlx::query("UPDATE payer_totals SET txn = NULL WHERE payer = $1") - .bind(&payer) - .execute(pool) - .await?; - } } // Get the current sessions we need to write, before creating any new transactions From 1dad0da8c037463cbba1c3eb4d17235e16fb1fe8 Mon Sep 17 00:00:00 2001 From: Matthew Plant Date: Thu, 18 Jan 2024 14:53:44 -0500 Subject: [PATCH 3/5] Add amount to solana transaction struct --- mobile_packet_verifier/migrations/7_two_phase_commit.sql | 1 + 1 file changed, 1 insertion(+) diff --git a/mobile_packet_verifier/migrations/7_two_phase_commit.sql b/mobile_packet_verifier/migrations/7_two_phase_commit.sql index 06df4b6bd..08d370417 100644 --- a/mobile_packet_verifier/migrations/7_two_phase_commit.sql +++ b/mobile_packet_verifier/migrations/7_two_phase_commit.sql @@ -1,5 +1,6 @@ CREATE TYPE solana_transaction AS ( signature TEXT NOT NULL, + amount BIGINT NOT NULL, time_of_submission TIMESTAMPTZ NOT NULL ); From 7082836518d1a649d4f9fc66f396a650c679cf27 Mon Sep 17 00:00:00 2001 From: Matthew Plant Date: Thu, 1 Feb 2024 13:55:07 -0500 Subject: [PATCH 4/5] Make solana_transaction type pending_txns table --- .../migrations/7_two_phase_commit.sql | 7 ++- mobile_packet_verifier/src/burner.rs | 62 ++++++++++--------- 2 files changed, 38 insertions(+), 31 deletions(-) diff --git a/mobile_packet_verifier/migrations/7_two_phase_commit.sql b/mobile_packet_verifier/migrations/7_two_phase_commit.sql index 08d370417..9e39f42bf 100644 --- a/mobile_packet_verifier/migrations/7_two_phase_commit.sql +++ b/mobile_packet_verifier/migrations/7_two_phase_commit.sql @@ -1,7 +1,8 @@ -CREATE TYPE solana_transaction AS ( - signature TEXT NOT NULL, +CREATE TABLE pending_txns AS ( + signature TEXT PRIMARY KEY, + payer TEXT NOT NULL, amount BIGINT NOT NULL, - time_of_submission TIMESTAMPTZ NOT NULL + time_of_submission TIMESTAMPTZ NOT NULL DEFAULT NOW() ); CREATE TABLE payer_totals ( diff --git a/mobile_packet_verifier/src/burner.rs b/mobile_packet_verifier/src/burner.rs index e09e50d68..037e8836f 100644 --- a/mobile_packet_verifier/src/burner.rs +++ b/mobile_packet_verifier/src/burner.rs @@ -21,12 +21,10 @@ pub struct DataTransferSession { pub struct PayerTotals { payer: PublicKeyBinary, total_dcs: i64, - txn: Option, } -#[derive(sqlx::Type)] -#[sqlx(type_name = "solana_transaction")] -pub struct SolanaTransaction { +#[derive(FromRow)] +pub struct PendingTxn { signature: String, amount: i64, time_of_submission: DateTime, @@ -68,20 +66,22 @@ where .fetch_all(pool) .await?; - for PayerTotals { - payer, - total_dcs, - txn, - } in totals - { + for PayerTotals { payer, total_dcs } in totals { let mut total_dcs = total_dcs as u64; // Check if there is a pending transaction - if let Some(SolanaTransaction { + if let Some(PendingTxn { signature, amount, time_of_submission, - }) = txn + }) = sqlx::query_as( + r#" + SELECT signature, amount, time_of_submission FROM pending_txns WHERE payer = $1 + "#, + ) + .bind(&payer) + .fetch_optional(pool) + .await? { // Sleep for at least a minute since the time of submission to // give the transaction plenty of time to be confirmed: @@ -91,10 +91,10 @@ where .await; } - let signature: Signature = signature.parse().unwrap(); + let sig: Signature = signature.parse().unwrap(); if self .solana - .confirm_transaction(&signature) + .confirm_transaction(&sig) .await .map_err(BurnError::SolanaError)? { @@ -104,11 +104,17 @@ where } // If the transaction has not been confirmed, we still want to remove the transaction. // The total_dcs column remains the same. - sqlx::query("UPDATE payer_totals SET txn = NULL, total_dcs = $2 WHERE payer = $1") + let mut transaction = pool.begin().await?; + sqlx::query("UPDATE payer_totals SET total_dcs = $2 WHERE payer = $1") .bind(&payer) .bind(total_dcs as i64) - .execute(pool) + .execute(&mut transaction) + .await?; + sqlx::query("DELETE FROM pending_txns WHERE signature = $1") + .bind(&signature) + .execute(&mut transaction) .await?; + transaction.commit().await?; } // Get the current sessions we need to write, before creating any new transactions @@ -126,15 +132,14 @@ where .make_burn_transaction(&payer, total_dcs) .await .map_err(BurnError::SolanaError)?; - sqlx::query("UPDATE payer_totals SET txn = $2 WHERE payer = $1") - .bind(&payer) - .bind(SolanaTransaction { - signature: txn.get_signature().to_string(), - amount: total_dcs as i64, - time_of_submission: Utc::now(), - }) - .execute(pool) - .await?; + sqlx::query( + "INSERT INTO pending_txns (signature, payer, amount) VALUES ($1, $2, $3)", + ) + .bind(txn.get_signature().to_string()) + .bind(&payer) + .bind(total_dcs as i64) + .execute(pool) + .await?; // Attempt to execute the transaction if self.solana.submit_transaction(&txn).await.is_err() { // We have failed to burn data credits: @@ -144,15 +149,16 @@ where } // Submit the sessions + let mut transaction = pool.begin().await?; sqlx::query("DELETE FROM data_transfer_sessions WHERE payer = $1") .bind(&payer) - .execute(pool) + .execute(&mut transaction) .await?; - sqlx::query("DELETE FROM payer_totals WHERE payer = $1") .bind(&payer) - .execute(pool) + .execute(&mut transaction) .await?; + transaction.commit().await?; for session in sessions { let num_dcs = crate::bytes_to_dc(session.rewardable_bytes as u64); From 8f31682844cfa69e1419290a66830a3243a866b3 Mon Sep 17 00:00:00 2001 From: Matthew Plant Date: Mon, 5 Feb 2024 17:27:10 -0500 Subject: [PATCH 5/5] Use separated out transaction initiation and confirmation --- .../migrations/7_two_phase_commit.sql | 28 +- mobile_packet_verifier/src/accumulate.rs | 20 +- mobile_packet_verifier/src/burner.rs | 340 +++++++++++------- mobile_packet_verifier/src/daemon.rs | 55 ++- mobile_packet_verifier/src/lib.rs | 8 - mobile_packet_verifier/src/settings.rs | 11 +- 6 files changed, 271 insertions(+), 191 deletions(-) diff --git a/mobile_packet_verifier/migrations/7_two_phase_commit.sql b/mobile_packet_verifier/migrations/7_two_phase_commit.sql index 9e39f42bf..2ca555cdd 100644 --- a/mobile_packet_verifier/migrations/7_two_phase_commit.sql +++ b/mobile_packet_verifier/migrations/7_two_phase_commit.sql @@ -1,13 +1,29 @@ CREATE TABLE pending_txns AS ( signature TEXT PRIMARY KEY, + pub_key TEXT NOT NULL, payer TEXT NOT NULL, - amount BIGINT NOT NULL, - time_of_submission TIMESTAMPTZ NOT NULL DEFAULT NOW() + num_dcs BIGINT NOT NULL, + uploaded_bytes BIGINT NOT NULL, + downloaded_bytes BIGINT NOT NULL, + rewardable_bytes BIGINT NOT NULL, + first_timestamp TIMESTAMPTZ NOT NULL, + last_timestmap TIMESTAMPTZ NOT NULL ); -CREATE TABLE payer_totals ( - payer TEXT PRIMARY KEY, - total_dcs BIGINT NOT NULL, - txn solana_transaction +CREATE TABLE data_transfer_sessions_by_row ( + pub_key TEXT NOT NULL, + payer TEXT NOT NULL, + uploaded_bytes BIGINT NOT NULL, + downloaded_bytes BIGINT NOT NULL, + rewardable_bytes BIGINT NOT NULL, + session_timestamp TIMESTAMPTZ NOT NULL, + PRIMARY KEY(pub_key, payer, session_timestamp) ); +INSERT INTO data_transfer_sessions_by_row + (pub_key, payer, uploaded_bytes, downloaded_bytes, rewardable_bytes, session_timestamp) +SELECT pub_key, payer, uploaded_bytes, downloaded_bytes, rewardable_bytes, last_timestamp as session_timestamp FROM data_transfer_sessions; + +ALTER TABLE data_transfer_sessions RENAME TO old_data_transfer_sessions; +ALTER TABLE data_transfer_sessions_by_row to data_transfer_sessions; +DROP TABLE data_transfer_sessions; diff --git a/mobile_packet_verifier/src/accumulate.rs b/mobile_packet_verifier/src/accumulate.rs index 63d4521e3..a125250f0 100644 --- a/mobile_packet_verifier/src/accumulate.rs +++ b/mobile_packet_verifier/src/accumulate.rs @@ -49,13 +49,7 @@ pub async fn accumulate_sessions( let event = report.report.data_transfer_usage; sqlx::query( r#" - INSERT INTO data_transfer_sessions (pub_key, payer, uploaded_bytes, downloaded_bytes, rewardable_bytes, first_timestamp, last_timestamp) - VALUES ($1, $2, $3, $4, $5, $6, $6) - ON CONFLICT (pub_key, payer) DO UPDATE SET - uploaded_bytes = data_transfer_sessions.uploaded_bytes + EXCLUDED.uploaded_bytes, - downloaded_bytes = data_transfer_sessions.downloaded_bytes + EXCLUDED.downloaded_bytes, - rewardable_bytes = data_transfer_sessions.rewardable_bytes + EXCLUDED.rewardable_bytes, - last_timestamp = GREATEST(data_transfer_sessions.last_timestamp, EXCLUDED.last_timestamp) + INSERT INTO data_transfer_sessions (pub_key, payer, uploaded_bytes, downloaded_bytes, rewardable_bytes, session_timestamp) VALUES ($1, $2, $3, $4, $5, $6); "# ) .bind(event.pub_key) @@ -66,18 +60,6 @@ pub async fn accumulate_sessions( .bind(curr_file_ts) .execute(&mut *conn) .await?; - sqlx::query( - r#" - INSERT INTO payer_totals (payer, total_dcs) - VALUES ($1, $2) - ON CONFLICT (payer) DO UPDATE SET - total_dcs = total_dcs + EXCLUDED.total_dcs - "#, - ) - .bind(event.payer) - .bind(crate::bytes_to_dc(event.upload_bytes + event.download_bytes) as i64) - .execute(&mut *conn) - .await?; } Ok(()) diff --git a/mobile_packet_verifier/src/burner.rs b/mobile_packet_verifier/src/burner.rs index 037e8836f..ebf6aa352 100644 --- a/mobile_packet_verifier/src/burner.rs +++ b/mobile_packet_verifier/src/burner.rs @@ -1,10 +1,12 @@ -use chrono::{DateTime, Duration, Utc}; +use chrono::{DateTime, Utc}; use file_store::{file_sink::FileSinkClient, traits::TimestampEncode}; use helium_crypto::PublicKeyBinary; use helium_proto::services::packet_verifier::ValidDataTransferSession; use solana::{GetSignature, SolanaNetwork}; -use solana_sdk::signature::Signature; -use sqlx::{FromRow, Pool, Postgres}; +use solana_sdk::signature::{ParseSignatureError, Signature}; +use sqlx::{FromRow, PgPool}; +use task_manager::ManagedTask; +use tokio::time::{sleep_until, Duration, Instant}; #[derive(FromRow)] pub struct DataTransferSession { @@ -17,169 +19,253 @@ pub struct DataTransferSession { last_timestamp: DateTime, } -#[derive(FromRow)] -pub struct PayerTotals { - payer: PublicKeyBinary, - total_dcs: i64, +#[derive(thiserror::Error, Debug)] +pub enum BurnError { + #[error("file store error: {0}")] + FileStoreError(#[from] file_store::Error), + #[error("sql error: {0}")] + SqlError(#[from] sqlx::Error), + #[error("Chrono error: {0}")] + ChronoError(#[from] chrono::OutOfRangeError), + #[error("solana error: {0}")] + SolanaError(E), + #[error("parse signature error: {0}")] + ParseSignatureError(#[from] ParseSignatureError), +} + +pub struct BurnInitiator { + solana: S, + pool: PgPool, + burn_period: Duration, +} + +impl BurnInitiator { + pub fn new(solana: S, pool: PgPool, burn_period: Duration) -> Self { + Self { + solana, + pool, + burn_period, + } + } +} + +impl ManagedTask for BurnInitiator +where + S: SolanaNetwork, +{ + fn start_task( + self: Box, + shutdown: triggered::Listener, + ) -> futures_util::future::LocalBoxFuture<'static, anyhow::Result<()>> { + Box::pin(self.run(shutdown)) + } +} + +impl BurnInitiator +where + S: SolanaNetwork, +{ + pub async fn run(self, shutdown: triggered::Listener) -> anyhow::Result<()> { + // Initial burn period is one minute + let mut burn_time = Instant::now() + Duration::from_secs(60); + loop { + #[rustfmt::skip] + tokio::select! { + _ = sleep_until(burn_time) => { + // Initiate new burns + self.initiate_burns().await?; + burn_time = Instant::now() + self.burn_period; + } + _ = shutdown.clone() => return Ok(()), + } + } + } + + pub async fn initiate_burns(&self) -> anyhow::Result<()> { + let sessions: Vec = sqlx::query_as( + r#" + SELECT pub_key, payer, SUM(uploaded_bytes) as uploaded_bytes, SUM(downloaded_bytes) as downloaded_bytes, + SUM(rewardable_bytes) as rewardable_bytes, MIN(session_timestamp) as first_timestamp, MAX(session_timestamp) as last_timestamp + FROM data_transfer_sessions GROUP BY pub_key, payer + WHERE session_timestamp < $1 + "# + ).bind(Utc::now()) + .fetch_all(&self.pool) + .await?; + + for session in sessions.into_iter() { + let num_dcs = bytes_to_dc(session.rewardable_bytes as u64); + let txn = self + .solana + .make_burn_transaction(&session.payer, num_dcs) + .await?; + let mut transaction = self.pool.begin().await?; + sqlx::query( + r#" + INSERT INTO pending_txns (signature, pub_key, payer, num_dcs, uploaded_bytes, downloaded_bytes, rewardable_bytes, first_timestamp, last_timesetamp) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + "# + ) + .bind(txn.get_signature().to_string()) + .bind(&session.pub_key) + .bind(&session.payer) + .bind(session.uploaded_bytes) + .bind(session.downloaded_bytes) + .bind(session.rewardable_bytes) + .bind(session.first_timestamp) + .bind(session.last_timestamp) + .execute(&mut transaction) + .await?; + sqlx::query( + r#" + DELETE FROM pending_txns WHERE pub_key = $1 AND payer = $2 AND session_timestamp >= $3 and session_timestamp <= $4 + "# + ) + .bind(&session.pub_key) + .bind(&session.payer) + .bind(session.first_timestamp) + .bind(session.last_timestamp) + .execute(&mut transaction) + .await?; + transaction.commit().await?; + // We should make this a quick submission that doesn't check for confirmation + let _ = self.solana.submit_transaction(&txn).await; + } + Ok(()) + } } #[derive(FromRow)] pub struct PendingTxn { signature: String, - amount: i64, - time_of_submission: DateTime, + pub_key: PublicKeyBinary, + payer: PublicKeyBinary, + uploaded_bytes: i64, + downloaded_bytes: i64, + rewardable_bytes: i64, + first_timestamp: DateTime, + last_timestamp: DateTime, } -pub struct Burner { +pub struct BurnConfirmer { valid_sessions: FileSinkClient, solana: S, + pool: PgPool, + confirmation_period: Duration, } -impl Burner { - pub fn new(valid_sessions: FileSinkClient, solana: S) -> Self { +impl BurnConfirmer { + pub fn new( + valid_sessions: FileSinkClient, + solana: S, + pool: PgPool, + confirmation_period: Duration, + ) -> Self { Self { valid_sessions, solana, + pool, + confirmation_period, } } } -#[derive(thiserror::Error, Debug)] -pub enum BurnError { - #[error("file store error: {0}")] - FileStoreError(#[from] file_store::Error), - #[error("sql error: {0}")] - SqlError(#[from] sqlx::Error), - #[error("Chrono error: {0}")] - ChronoError(#[from] chrono::OutOfRangeError), - #[error("solana error: {0}")] - SolanaError(E), +impl ManagedTask for BurnConfirmer +where + S: SolanaNetwork, +{ + fn start_task( + self: Box, + shutdown: triggered::Listener, + ) -> futures_util::future::LocalBoxFuture<'static, anyhow::Result<()>> { + Box::pin(self.run(shutdown)) + } } -impl Burner +impl BurnConfirmer where S: SolanaNetwork, { - pub async fn burn(&self, pool: &Pool) -> Result<(), BurnError> { - // Fetch all of the payer totals: - let totals: Vec = sqlx::query_as("SELECT * FROM payer_totals") - .fetch_all(pool) - .await?; - - for PayerTotals { payer, total_dcs } in totals { - let mut total_dcs = total_dcs as u64; - - // Check if there is a pending transaction - if let Some(PendingTxn { - signature, - amount, - time_of_submission, - }) = sqlx::query_as( - r#" - SELECT signature, amount, time_of_submission FROM pending_txns WHERE payer = $1 - "#, - ) - .bind(&payer) - .fetch_optional(pool) - .await? - { - // Sleep for at least a minute since the time of submission to - // give the transaction plenty of time to be confirmed: - let time_since_submission = Utc::now() - time_of_submission; - if Duration::minutes(1) > time_since_submission { - tokio::time::sleep((Duration::minutes(1) - time_since_submission).to_std()?) - .await; - } - - let sig: Signature = signature.parse().unwrap(); - if self - .solana - .confirm_transaction(&sig) - .await - .map_err(BurnError::SolanaError)? - { - // This transaction has been confirmed. Subtract the amount confirmed from - // the total amount burned and remove the transaction. - total_dcs -= amount as u64; - } - // If the transaction has not been confirmed, we still want to remove the transaction. - // The total_dcs column remains the same. - let mut transaction = pool.begin().await?; - sqlx::query("UPDATE payer_totals SET total_dcs = $2 WHERE payer = $1") - .bind(&payer) - .bind(total_dcs as i64) - .execute(&mut transaction) - .await?; - sqlx::query("DELETE FROM pending_txns WHERE signature = $1") - .bind(&signature) - .execute(&mut transaction) - .await?; - transaction.commit().await?; + pub async fn run(self, shutdown: triggered::Listener) -> anyhow::Result<()> { + // Initial confirmation period is two minutes + let mut confirm_time = Instant::now() + Duration::from_secs(120); + loop { + #[rustfmt::skip] + tokio::select! { + _ = sleep_until(confirm_time) => { + // Initiate new burns + self.confirm_burns().await?; + confirm_time = Instant::now() + self.confirmation_period; + } + _ = shutdown.clone() => return Ok(()), } + } + } - // Get the current sessions we need to write, before creating any new transactions - let sessions: Vec = - sqlx::query_as("SELECT * FROM data_transfer_session WHERE payer = $1") - .bind(&payer) - .fetch_all(pool) - .await?; - - // Create a new transaction for the given amount, if there is any left. - // If total_dcs is zero, that means we need to clear out the current sessions as they are paid for. - if total_dcs != 0 { - let txn = self - .solana - .make_burn_transaction(&payer, total_dcs) - .await - .map_err(BurnError::SolanaError)?; - sqlx::query( - "INSERT INTO pending_txns (signature, payer, amount) VALUES ($1, $2, $3)", - ) - .bind(txn.get_signature().to_string()) - .bind(&payer) - .bind(total_dcs as i64) - .execute(pool) - .await?; - // Attempt to execute the transaction - if self.solana.submit_transaction(&txn).await.is_err() { - // We have failed to burn data credits: - metrics::counter!("burned", total_dcs, "payer" => payer.to_string(), "success" => "false"); - continue; - } - } + pub async fn confirm_burns(&self) -> anyhow::Result<()> { + let pending_txns: Vec = sqlx::query_as(r#"SELECT * FROM pending_txns"#) + .fetch_all(&self.pool) + .await?; - // Submit the sessions - let mut transaction = pool.begin().await?; - sqlx::query("DELETE FROM data_transfer_sessions WHERE payer = $1") - .bind(&payer) + for pending_txn in pending_txns { + let txn: Signature = pending_txn.signature.parse()?; + let mut transaction = self.pool.begin().await?; + let num_dcs = bytes_to_dc(pending_txn.rewardable_bytes as u64); + sqlx::query(r#"DELETE FROM pending_txns WHERE signature = $1"#) + .bind(pending_txn.signature) .execute(&mut transaction) .await?; - sqlx::query("DELETE FROM payer_totals WHERE payer = $1") - .bind(&payer) - .execute(&mut transaction) - .await?; - transaction.commit().await?; - - for session in sessions { - let num_dcs = crate::bytes_to_dc(session.rewardable_bytes as u64); + if self.solana.confirm_transaction(&txn).await? { self.valid_sessions .write( ValidDataTransferSession { - pub_key: session.pub_key.into(), - payer: session.payer.into(), - upload_bytes: session.uploaded_bytes as u64, - download_bytes: session.downloaded_bytes as u64, - rewardable_bytes: session.rewardable_bytes as u64, + pub_key: pending_txn.pub_key.into(), + payer: pending_txn.payer.into(), + upload_bytes: pending_txn.uploaded_bytes as u64, + download_bytes: pending_txn.downloaded_bytes as u64, + rewardable_bytes: pending_txn.rewardable_bytes as u64, num_dcs, - first_timestamp: session.first_timestamp.encode_timestamp_millis(), - last_timestamp: session.last_timestamp.encode_timestamp_millis(), + first_timestamp: pending_txn.first_timestamp.encode_timestamp_millis(), + last_timestamp: pending_txn.last_timestamp.encode_timestamp_millis(), }, &[], ) .await?; + } else { + // If we can't confirm the transaction, we can just submit it and check the pending + // transaction next time + let new_txn = self + .solana + .make_burn_transaction(&pending_txn.payer, num_dcs) + .await?; + sqlx::query( + r#" + INSERT INTO pending_txns (signature, pub_key, payer, num_dcs, uploaded_bytes, downloaded_bytes, rewardable_bytes, first_timestamp, last_timestamp) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + "# + ) + .bind(new_txn.get_signature().to_string()) + .bind(pending_txn.pub_key) + .bind(pending_txn.payer) + .bind(num_dcs as i64) + .bind(pending_txn.uploaded_bytes) + .bind(pending_txn.downloaded_bytes) + .bind(pending_txn.rewardable_bytes) + .bind(pending_txn.first_timestamp) + .bind(pending_txn.last_timestamp) + .execute(&mut transaction) + .await?; + let _ = self.solana.submit_transaction(&new_txn).await; } } Ok(()) } } + +const BYTES_PER_DC: u64 = 20_000; + +pub fn bytes_to_dc(bytes: u64) -> u64 { + let bytes = bytes.max(BYTES_PER_DC); + // Integer div/ceil from: https://stackoverflow.com/a/2745086 + (bytes + BYTES_PER_DC - 1) / BYTES_PER_DC +} diff --git a/mobile_packet_verifier/src/daemon.rs b/mobile_packet_verifier/src/daemon.rs index cf09dfc0f..1dbc377fc 100644 --- a/mobile_packet_verifier/src/daemon.rs +++ b/mobile_packet_verifier/src/daemon.rs @@ -1,4 +1,8 @@ -use crate::{burner::Burner, event_ids::EventIdPurger, settings::Settings}; +use crate::{ + burner::{BurnConfirmer, BurnInitiator}, + event_ids::EventIdPurger, + settings::Settings, +}; use anyhow::{bail, Result}; use chrono::{TimeZone, Utc}; use file_store::{ @@ -13,39 +17,30 @@ use mobile_config::client::{ authorization_client::AuthorizationVerifier, gateway_client::GatewayInfoResolver, AuthorizationClient, GatewayClient, }; -use solana::{SolanaNetwork, SolanaRpc}; +use solana::SolanaRpc; use sqlx::{Pool, Postgres}; use task_manager::{ManagedTask, TaskManager}; -use tokio::{ - sync::mpsc::Receiver, - time::{sleep_until, Duration, Instant}, -}; +use tokio::{sync::mpsc::Receiver, time::Duration}; -pub struct Daemon { +pub struct Daemon { pool: Pool, - burner: Burner, reports: Receiver>, - burn_period: Duration, gateway_info_resolver: GIR, authorization_verifier: AV, invalid_data_session_report_sink: FileSinkClient, } -impl Daemon { +impl Daemon { pub fn new( - settings: &Settings, pool: Pool, reports: Receiver>, - burner: Burner, gateway_info_resolver: GIR, authorization_verifier: AV, invalid_data_session_report_sink: FileSinkClient, ) -> Self { Self { pool, - burner, reports, - burn_period: Duration::from_secs(60 * 60 * settings.burn_period as u64), gateway_info_resolver, authorization_verifier, invalid_data_session_report_sink, @@ -53,9 +48,8 @@ impl Daemon { } } -impl ManagedTask for Daemon +impl ManagedTask for Daemon where - S: SolanaNetwork, GIR: GatewayInfoResolver, AV: AuthorizationVerifier + 'static, { @@ -67,15 +61,12 @@ where } } -impl Daemon +impl Daemon where - S: SolanaNetwork, GIR: GatewayInfoResolver, AV: AuthorizationVerifier, { pub async fn run(mut self, shutdown: triggered::Listener) -> Result<()> { - // Set the initial burn period to one minute - let mut burn_time = Instant::now() + Duration::from_secs(60); loop { tokio::select! { file = self.reports.recv() => { @@ -90,11 +81,6 @@ where transaction.commit().await?; self.invalid_data_session_report_sink.commit().await?; }, - _ = sleep_until(burn_time) => { - // It's time to burn - self.burner.burn(&self.pool).await?; - burn_time = Instant::now() + self.burn_period; - } _ = shutdown.clone() => return Ok(()), } } @@ -138,6 +124,19 @@ impl Cmd { .create() .await?; + let burn_initiator = BurnInitiator::new( + solana.clone(), + pool.clone(), + Duration::from_secs(60 * 60 * settings.burn_period), + ); + + let burn_confirmer = BurnConfirmer::new( + valid_sessions, + solana, + pool.clone(), + Duration::from_secs(60 * settings.confirmation_period), + ); + let (invalid_sessions, invalid_sessions_server) = FileSinkBuilder::new( FileType::InvalidDataTransferSessionIngestReport, store_base_path, @@ -148,8 +147,6 @@ impl Cmd { .create() .await?; - let burner = Burner::new(valid_sessions, solana); - let file_store = FileStore::from_settings(&settings.ingest).await?; let (reports, reports_server) = @@ -167,10 +164,8 @@ impl Cmd { let auth_client = AuthorizationClient::from_settings(&settings.config_client)?; let daemon = Daemon::new( - settings, pool.clone(), reports, - burner, gateway_client, auth_client, invalid_sessions, @@ -184,6 +179,8 @@ impl Cmd { .add_task(invalid_sessions_server) .add_task(reports_server) .add_task(event_id_purger) + .add_task(burn_initiator) + .add_task(burn_confirmer) .add_task(daemon) .start() .await diff --git a/mobile_packet_verifier/src/lib.rs b/mobile_packet_verifier/src/lib.rs index 3a0a93520..4d6c71332 100644 --- a/mobile_packet_verifier/src/lib.rs +++ b/mobile_packet_verifier/src/lib.rs @@ -3,11 +3,3 @@ pub mod burner; pub mod daemon; pub mod event_ids; pub mod settings; - -const BYTES_PER_DC: u64 = 20_000; - -pub fn bytes_to_dc(bytes: u64) -> u64 { - let bytes = bytes.max(BYTES_PER_DC); - // Integer div/ceil from: https://stackoverflow.com/a/2745086 - (bytes + BYTES_PER_DC - 1) / BYTES_PER_DC -} diff --git a/mobile_packet_verifier/src/settings.rs b/mobile_packet_verifier/src/settings.rs index 0ae5e2ee4..628946c98 100644 --- a/mobile_packet_verifier/src/settings.rs +++ b/mobile_packet_verifier/src/settings.rs @@ -13,7 +13,10 @@ pub struct Settings { pub cache: String, /// Burn period in hours. (Default is 1) #[serde(default = "default_burn_period")] - pub burn_period: i64, + pub burn_period: u64, + /// Confirmation period in minutes. (Default is 10) + #[serde(default = "default_confirmation_period")] + pub confirmation_period: u64, pub database: db_store::Settings, pub ingest: file_store::Settings, pub output: file_store::Settings, @@ -50,10 +53,14 @@ pub fn default_log() -> String { "mobile_packet_verifier=debug,poc_store=info".to_string() } -pub fn default_burn_period() -> i64 { +pub fn default_burn_period() -> u64 { 1 } +pub fn default_confirmation_period() -> u64 { + 10 +} + impl Settings { /// Load Settings from a given path. Settings are loaded from a given /// optional path and can be overriden with environment variables.