diff --git a/Cargo.lock b/Cargo.lock
index 7958d1363..cb1ee7b67 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4149,6 +4149,7 @@ dependencies = [
  "serde",
  "sha2 0.10.6",
  "solana",
+ "solana-sdk",
  "sqlx",
  "task-manager",
  "thiserror",
diff --git a/mobile_packet_verifier/Cargo.toml b/mobile_packet_verifier/Cargo.toml
index 6e49b156d..c2ce10587 100644
--- a/mobile_packet_verifier/Cargo.toml
+++ b/mobile_packet_verifier/Cargo.toml
@@ -21,6 +21,7 @@ helium-crypto = {workspace = true, features = ["sqlx-postgres", "multisig", "solana"]}
 metrics = {workspace = true}
 poc-metrics = {path = "../metrics"}
 prost = {workspace = true}
+solana-sdk = {workspace = true}
 serde = {workspace = true}
 sqlx = {workspace = true}
 solana = {path = "../solana"}
diff --git a/mobile_packet_verifier/migrations/7_two_phase_commit.sql b/mobile_packet_verifier/migrations/7_two_phase_commit.sql
new file mode 100644
index 000000000..2ca555cdd
--- /dev/null
+++ b/mobile_packet_verifier/migrations/7_two_phase_commit.sql
@@ -0,0 +1,29 @@
+CREATE TABLE pending_txns (
+    signature TEXT PRIMARY KEY,
+    pub_key TEXT NOT NULL,
+    payer TEXT NOT NULL,
+    num_dcs BIGINT NOT NULL,
+    uploaded_bytes BIGINT NOT NULL,
+    downloaded_bytes BIGINT NOT NULL,
+    rewardable_bytes BIGINT NOT NULL,
+    first_timestamp TIMESTAMPTZ NOT NULL,
+    last_timestamp TIMESTAMPTZ NOT NULL
+);
+
+CREATE TABLE data_transfer_sessions_by_row (
+    pub_key TEXT NOT NULL,
+    payer TEXT NOT NULL,
+    uploaded_bytes BIGINT NOT NULL,
+    downloaded_bytes BIGINT NOT NULL,
+    rewardable_bytes BIGINT NOT NULL,
+    session_timestamp TIMESTAMPTZ NOT NULL,
+    PRIMARY KEY(pub_key, payer, session_timestamp)
+);
+
+INSERT INTO data_transfer_sessions_by_row
+    (pub_key, payer, uploaded_bytes, downloaded_bytes, rewardable_bytes, session_timestamp)
+SELECT pub_key, payer, uploaded_bytes, downloaded_bytes, rewardable_bytes, last_timestamp AS session_timestamp FROM data_transfer_sessions;
+
+ALTER TABLE data_transfer_sessions RENAME TO old_data_transfer_sessions;
+ALTER TABLE data_transfer_sessions_by_row RENAME TO data_transfer_sessions;
+DROP TABLE old_data_transfer_sessions;
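The table swap at the end of this migration is order-sensitive: the old table is renamed out of the way first, so the final `DROP` must target `old_data_transfer_sessions` rather than the table that has just taken its name. For extra confidence, a one-off check can run between the rename and the drop to confirm the per-row copy preserved the byte totals. A minimal sketch, assuming a sqlx `PgPool`; the helper name `verify_migration_totals` is hypothetical and not part of this change:

```rust
use sqlx::PgPool;

/// Hypothetical post-migration check: the new row-per-session table should
/// preserve the byte totals of the old upsert-style table.
async fn verify_migration_totals(pool: &PgPool) -> anyhow::Result<()> {
    // Totals from the renamed original table.
    let old: (i64, i64) = sqlx::query_as(
        "SELECT COALESCE(SUM(uploaded_bytes), 0)::BIGINT,
                COALESCE(SUM(downloaded_bytes), 0)::BIGINT
         FROM old_data_transfer_sessions",
    )
    .fetch_one(pool)
    .await?;
    // Totals from the new per-row table, which now owns the original name.
    let new: (i64, i64) = sqlx::query_as(
        "SELECT COALESCE(SUM(uploaded_bytes), 0)::BIGINT,
                COALESCE(SUM(downloaded_bytes), 0)::BIGINT
         FROM data_transfer_sessions",
    )
    .fetch_one(pool)
    .await?;
    anyhow::ensure!(old == new, "byte totals diverged during table swap");
    Ok(())
}
```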
diff --git a/mobile_packet_verifier/src/accumulate.rs b/mobile_packet_verifier/src/accumulate.rs
index b0da9b8e6..a125250f0 100644
--- a/mobile_packet_verifier/src/accumulate.rs
+++ b/mobile_packet_verifier/src/accumulate.rs
@@ -49,17 +49,11 @@ pub async fn accumulate_sessions(
         let event = report.report.data_transfer_usage;
         sqlx::query(
             r#"
-            INSERT INTO data_transfer_sessions (pub_key, payer, uploaded_bytes, downloaded_bytes, rewardable_bytes, first_timestamp, last_timestamp)
-            VALUES ($1, $2, $3, $4, $5, $6, $6)
-            ON CONFLICT (pub_key, payer) DO UPDATE SET
-            uploaded_bytes = data_transfer_sessions.uploaded_bytes + EXCLUDED.uploaded_bytes,
-            downloaded_bytes = data_transfer_sessions.downloaded_bytes + EXCLUDED.downloaded_bytes,
-            rewardable_bytes = data_transfer_sessions.rewardable_bytes + EXCLUDED.rewardable_bytes,
-            last_timestamp = GREATEST(data_transfer_sessions.last_timestamp, EXCLUDED.last_timestamp)
+            INSERT INTO data_transfer_sessions (pub_key, payer, uploaded_bytes, downloaded_bytes, rewardable_bytes, session_timestamp) VALUES ($1, $2, $3, $4, $5, $6);
             "#
         )
         .bind(event.pub_key)
-        .bind(event.payer)
+        .bind(&event.payer)
        .bind(event.upload_bytes as i64)
        .bind(event.download_bytes as i64)
        .bind(report.report.rewardable_bytes as i64)
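Dropping the upsert means each report now lands as its own row, keyed by `(pub_key, payer, session_timestamp)`. That is what lets the burner consume an exact timestamp range later while fresh reports keep arriving, since new rows simply fall outside the consumed range. A rough illustration of that property, using hypothetical fixture values and assuming the migrated table from above:

```rust
use chrono::{Duration, Utc};
use sqlx::PgPool;

/// Illustrative only: per-row sessions can be consumed by timestamp range
/// without racing reports that arrive mid-burn.
async fn range_consume_demo(pool: &PgPool) -> anyhow::Result<()> {
    let now = Utc::now();
    for (i, bytes) in [(0, 100i64), (1, 200), (2, 300)] {
        sqlx::query(
            "INSERT INTO data_transfer_sessions
                 (pub_key, payer, uploaded_bytes, downloaded_bytes, rewardable_bytes, session_timestamp)
             VALUES ($1, $2, $3, 0, $3, $4)",
        )
        .bind("hotspot-key") // hypothetical fixture values
        .bind("payer-key")
        .bind(bytes)
        .bind(now + Duration::seconds(i))
        .execute(pool)
        .await?;
    }
    // Consume only the first two rows; the third stays for the next burn.
    sqlx::query("DELETE FROM data_transfer_sessions WHERE session_timestamp <= $1")
        .bind(now + Duration::seconds(1))
        .execute(pool)
        .await?;
    Ok(())
}
```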
diff --git a/mobile_packet_verifier/src/burner.rs b/mobile_packet_verifier/src/burner.rs
index df206a029..ebf6aa352 100644
--- a/mobile_packet_verifier/src/burner.rs
+++ b/mobile_packet_verifier/src/burner.rs
@@ -2,9 +2,11 @@ use chrono::{DateTime, Utc};
 use file_store::{file_sink::FileSinkClient, traits::TimestampEncode};
 use helium_crypto::PublicKeyBinary;
 use helium_proto::services::packet_verifier::ValidDataTransferSession;
-use solana::SolanaNetwork;
-use sqlx::{FromRow, Pool, Postgres};
-use std::collections::HashMap;
+use solana::{GetSignature, SolanaNetwork};
+use solana_sdk::signature::{ParseSignatureError, Signature};
+use sqlx::{FromRow, PgPool};
+use task_manager::ManagedTask;
+use tokio::time::{sleep_until, Duration, Instant};
 
 #[derive(FromRow)]
 pub struct DataTransferSession {
@@ -17,137 +19,252 @@ pub struct DataTransferSession {
     last_timestamp: DateTime<Utc>,
 }
 
-#[derive(Default)]
-pub struct PayerTotals {
-    total_dcs: u64,
-    sessions: Vec<DataTransferSession>,
-}
-
-impl PayerTotals {
-    fn push_sess(&mut self, sess: DataTransferSession) {
-        self.total_dcs += bytes_to_dc(sess.rewardable_bytes as u64);
-        self.sessions.push(sess);
-    }
+#[derive(thiserror::Error, Debug)]
+pub enum BurnError<E> {
+    #[error("file store error: {0}")]
+    FileStoreError(#[from] file_store::Error),
+    #[error("sql error: {0}")]
+    SqlError(#[from] sqlx::Error),
+    #[error("chrono error: {0}")]
+    ChronoError(#[from] chrono::OutOfRangeError),
+    #[error("solana error: {0}")]
+    SolanaError(E),
+    #[error("parse signature error: {0}")]
+    ParseSignatureError(#[from] ParseSignatureError),
 }
 
-pub struct Burner<S> {
-    valid_sessions: FileSinkClient,
+pub struct BurnInitiator<S> {
     solana: S,
+    pool: PgPool,
+    burn_period: Duration,
 }
 
-impl<S> Burner<S> {
-    pub fn new(valid_sessions: FileSinkClient, solana: S) -> Self {
+impl<S> BurnInitiator<S> {
+    pub fn new(solana: S, pool: PgPool, burn_period: Duration) -> Self {
         Self {
-            valid_sessions,
             solana,
+            pool,
+            burn_period,
         }
     }
 }
 
-#[derive(thiserror::Error, Debug)]
-pub enum BurnError<E> {
-    #[error("file store error: {0}")]
-    FileStoreError(#[from] file_store::Error),
-    #[error("sql error: {0}")]
-    SqlError(#[from] sqlx::Error),
-    #[error("solana error: {0}")]
-    SolanaError(E),
+impl<S> ManagedTask for BurnInitiator<S>
+where
+    S: SolanaNetwork,
+{
+    fn start_task(
+        self: Box<Self>,
+        shutdown: triggered::Listener,
+    ) -> futures_util::future::LocalBoxFuture<'static, anyhow::Result<()>> {
+        Box::pin(self.run(shutdown))
+    }
 }
 
-impl<S> Burner<S>
+impl<S> BurnInitiator<S>
 where
     S: SolanaNetwork,
 {
-    pub async fn burn(&self, pool: &Pool<Postgres>) -> Result<(), BurnError<S::Error>> {
-        // Fetch all of the sessions
-        let sessions: Vec<DataTransferSession> =
-            sqlx::query_as("SELECT * FROM data_transfer_sessions")
-                .fetch_all(pool)
-                .await?;
+    pub async fn run(self, shutdown: triggered::Listener) -> anyhow::Result<()> {
+        // Initial burn period is one minute
+        let mut burn_time = Instant::now() + Duration::from_secs(60);
+        loop {
+            #[rustfmt::skip]
+            tokio::select! {
+                _ = sleep_until(burn_time) => {
+                    // Initiate new burns
+                    self.initiate_burns().await?;
+                    burn_time = Instant::now() + self.burn_period;
+                }
+                _ = shutdown.clone() => return Ok(()),
+            }
+        }
+    }
+
+    pub async fn initiate_burns(&self) -> anyhow::Result<()> {
+        let sessions: Vec<DataTransferSession> = sqlx::query_as(
+            r#"
+            SELECT pub_key, payer,
+                SUM(uploaded_bytes)::bigint AS uploaded_bytes,
+                SUM(downloaded_bytes)::bigint AS downloaded_bytes,
+                SUM(rewardable_bytes)::bigint AS rewardable_bytes,
+                MIN(session_timestamp) AS first_timestamp,
+                MAX(session_timestamp) AS last_timestamp
+            FROM data_transfer_sessions
+            WHERE session_timestamp < $1
+            GROUP BY pub_key, payer
+            "#
+        )
+        .bind(Utc::now())
+        .fetch_all(&self.pool)
+        .await?;
 
-        // Fetch all of the sessions and group by the payer
-        let mut payer_totals = HashMap::<PublicKeyBinary, PayerTotals>::new();
         for session in sessions.into_iter() {
-            payer_totals
-                .entry(session.payer.clone())
-                .or_default()
-                .push_sess(session);
+            let num_dcs = bytes_to_dc(session.rewardable_bytes as u64);
+            let txn = self
+                .solana
+                .make_burn_transaction(&session.payer, num_dcs)
+                .await?;
+            let mut transaction = self.pool.begin().await?;
+            sqlx::query(
+                r#"
+                INSERT INTO pending_txns (signature, pub_key, payer, num_dcs, uploaded_bytes, downloaded_bytes, rewardable_bytes, first_timestamp, last_timestamp)
+                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
+                "#
+            )
+            .bind(txn.get_signature().to_string())
+            .bind(&session.pub_key)
+            .bind(&session.payer)
+            .bind(num_dcs as i64)
+            .bind(session.uploaded_bytes)
+            .bind(session.downloaded_bytes)
+            .bind(session.rewardable_bytes)
+            .bind(session.first_timestamp)
+            .bind(session.last_timestamp)
+            .execute(&mut transaction)
+            .await?;
+            sqlx::query(
+                r#"
+                DELETE FROM data_transfer_sessions
+                WHERE pub_key = $1 AND payer = $2 AND session_timestamp >= $3 AND session_timestamp <= $4
+                "#
+            )
+            .bind(&session.pub_key)
+            .bind(&session.payer)
+            .bind(session.first_timestamp)
+            .bind(session.last_timestamp)
+            .execute(&mut transaction)
+            .await?;
+            transaction.commit().await?;
+            // We should make this a quick submission that doesn't check for confirmation
+            let _ = self.solana.submit_transaction(&txn).await;
         }
+        Ok(())
+    }
+}
 
-        for (
-            payer,
-            PayerTotals {
-                total_dcs,
-                sessions,
-            },
-        ) in payer_totals.into_iter()
-        {
-            let payer_balance = self
-                .solana
-                .payer_balance(&payer)
-                .await
-                .map_err(BurnError::SolanaError)?;
+#[derive(FromRow)]
+pub struct PendingTxn {
+    signature: String,
+    pub_key: PublicKeyBinary,
+    payer: PublicKeyBinary,
+    uploaded_bytes: i64,
+    downloaded_bytes: i64,
+    rewardable_bytes: i64,
+    first_timestamp: DateTime<Utc>,
+    last_timestamp: DateTime<Utc>,
+}
 
-            if payer_balance < total_dcs {
-                tracing::warn!(%payer, %payer_balance, %total_dcs, "Payer does not have enough balance to burn dcs");
-                continue;
-            }
+pub struct BurnConfirmer<S> {
+    valid_sessions: FileSinkClient,
+    solana: S,
+    pool: PgPool,
+    confirmation_period: Duration,
+}
 
-            tracing::info!(%total_dcs, %payer, "Burning DC");
-            if self.burn_data_credits(&payer, total_dcs).await.is_err() {
-                // We have failed to burn data credits:
-                metrics::counter!("burned", total_dcs, "payer" => payer.to_string(), "success" => "false");
-                continue;
-            }
+impl<S> BurnConfirmer<S> {
+    pub fn new(
+        valid_sessions: FileSinkClient,
+        solana: S,
+        pool: PgPool,
+        confirmation_period: Duration,
+    ) -> Self {
+        Self {
+            valid_sessions,
+            solana,
+            pool,
+            confirmation_period,
+        }
+    }
+}
 
-            // We succesfully managed to burn data credits:
+impl<S> ManagedTask for BurnConfirmer<S>
+where
+    S: SolanaNetwork,
+{
+    fn start_task(
+        self: Box<Self>,
+        shutdown: triggered::Listener,
+    ) -> futures_util::future::LocalBoxFuture<'static, anyhow::Result<()>> {
+        Box::pin(self.run(shutdown))
+    }
+}
 
-            metrics::counter!("burned", total_dcs, "payer" => payer.to_string(), "success" => "true");
+impl<S> BurnConfirmer<S>
+where
+    S: SolanaNetwork,
+{
+    pub async fn run(self, shutdown: triggered::Listener) -> anyhow::Result<()> {
+        // Initial confirmation period is two minutes
+        let mut confirm_time = Instant::now() + Duration::from_secs(120);
+        loop {
+            #[rustfmt::skip]
+            tokio::select! {
+                _ = sleep_until(confirm_time) => {
+                    // Confirm pending burns
+                    self.confirm_burns().await?;
+                    confirm_time = Instant::now() + self.confirmation_period;
+                }
+                _ = shutdown.clone() => return Ok(()),
+            }
+        }
+    }
 
-            // Delete from the data transfer session and write out to S3
+    pub async fn confirm_burns(&self) -> anyhow::Result<()> {
+        let pending_txns: Vec<PendingTxn> = sqlx::query_as(r#"SELECT * FROM pending_txns"#)
+            .fetch_all(&self.pool)
+            .await?;
 
-            sqlx::query("DELETE FROM data_transfer_sessions WHERE payer = $1")
-                .bind(&payer)
-                .execute(pool)
-                .await?;
-
-            for session in sessions {
-                let num_dcs = bytes_to_dc(session.rewardable_bytes as u64);
+        for pending_txn in pending_txns {
+            let txn: Signature = pending_txn.signature.parse()?;
+            let mut transaction = self.pool.begin().await?;
+            let num_dcs = bytes_to_dc(pending_txn.rewardable_bytes as u64);
+            sqlx::query(r#"DELETE FROM pending_txns WHERE signature = $1"#)
+                .bind(pending_txn.signature)
+                .execute(&mut transaction)
+                .await?;
+            if self.solana.confirm_transaction(&txn).await? {
                 self.valid_sessions
                     .write(
                         ValidDataTransferSession {
-                            pub_key: session.pub_key.into(),
-                            payer: session.payer.into(),
-                            upload_bytes: session.uploaded_bytes as u64,
-                            download_bytes: session.downloaded_bytes as u64,
-                            rewardable_bytes: session.rewardable_bytes as u64,
+                            pub_key: pending_txn.pub_key.into(),
+                            payer: pending_txn.payer.into(),
+                            upload_bytes: pending_txn.uploaded_bytes as u64,
+                            download_bytes: pending_txn.downloaded_bytes as u64,
+                            rewardable_bytes: pending_txn.rewardable_bytes as u64,
                             num_dcs,
-                            first_timestamp: session.first_timestamp.encode_timestamp_millis(),
-                            last_timestamp: session.last_timestamp.encode_timestamp_millis(),
+                            first_timestamp: pending_txn.first_timestamp.encode_timestamp_millis(),
+                            last_timestamp: pending_txn.last_timestamp.encode_timestamp_millis(),
                         },
                         &[],
                     )
                     .await?;
+                transaction.commit().await?;
+            } else {
+                // If we can't confirm the transaction, replace it with a freshly signed
+                // burn and check the new pending transaction next time
+                let new_txn = self
+                    .solana
+                    .make_burn_transaction(&pending_txn.payer, num_dcs)
+                    .await?;
+                sqlx::query(
+                    r#"
+                    INSERT INTO pending_txns (signature, pub_key, payer, num_dcs, uploaded_bytes, downloaded_bytes, rewardable_bytes, first_timestamp, last_timestamp)
+                    VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
+                    "#
+                )
+                .bind(new_txn.get_signature().to_string())
+                .bind(pending_txn.pub_key)
+                .bind(pending_txn.payer)
+                .bind(num_dcs as i64)
+                .bind(pending_txn.uploaded_bytes)
+                .bind(pending_txn.downloaded_bytes)
+                .bind(pending_txn.rewardable_bytes)
+                .bind(pending_txn.first_timestamp)
+                .bind(pending_txn.last_timestamp)
+                .execute(&mut transaction)
+                .await?;
+                transaction.commit().await?;
+                let _ = self.solana.submit_transaction(&new_txn).await;
             }
         }
 
         Ok(())
     }
-
-    async fn burn_data_credits(
-        &self,
-        payer: &PublicKeyBinary,
-        amount: u64,
-    ) -> Result<(), S::Error> {
-        let txn = self.solana.make_burn_transaction(payer, amount).await?;
-        self.solana.submit_transaction(&txn).await?;
-        Ok(())
-    }
 }
 
 const BYTES_PER_DC: u64 = 20_000;
 
-fn bytes_to_dc(bytes: u64) -> u64 {
+pub fn bytes_to_dc(bytes: u64) -> u64 {
     let bytes = bytes.max(BYTES_PER_DC);
     // Integer div/ceil from: https://stackoverflow.com/a/2745086
     (bytes + BYTES_PER_DC - 1) / BYTES_PER_DC
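`bytes_to_dc` is now `pub`, and the pending-txn flow round-trips Solana signatures through a TEXT column, so both behaviors are cheap to pin down with unit tests. A test sketch one might append to burner.rs; these tests are not part of this change:

```rust
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn bytes_round_up_to_at_least_one_dc() {
        // Anything up to one DC's worth of bytes costs exactly 1 DC...
        assert_eq!(bytes_to_dc(0), 1);
        assert_eq!(bytes_to_dc(20_000), 1);
        // ...and partial DCs always round up.
        assert_eq!(bytes_to_dc(20_001), 2);
        assert_eq!(bytes_to_dc(40_000), 2);
    }

    #[test]
    fn signature_survives_text_round_trip() {
        // pending_txns stores the signature as TEXT; parsing must invert Display.
        let sig = Signature::new_unique();
        let parsed: Signature = sig.to_string().parse().expect("valid signature");
        assert_eq!(sig, parsed);
    }
}
```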
diff --git a/mobile_packet_verifier/src/daemon.rs b/mobile_packet_verifier/src/daemon.rs
index cf09dfc0f..1dbc377fc 100644
--- a/mobile_packet_verifier/src/daemon.rs
+++ b/mobile_packet_verifier/src/daemon.rs
@@ -1,4 +1,8 @@
-use crate::{burner::Burner, event_ids::EventIdPurger, settings::Settings};
+use crate::{
+    burner::{BurnConfirmer, BurnInitiator},
+    event_ids::EventIdPurger,
+    settings::Settings,
+};
 use anyhow::{bail, Result};
 use chrono::{TimeZone, Utc};
 use file_store::{
@@ -13,39 +17,30 @@ use mobile_config::client::{
     authorization_client::AuthorizationVerifier, gateway_client::GatewayInfoResolver,
     AuthorizationClient, GatewayClient,
 };
-use solana::{SolanaNetwork, SolanaRpc};
+use solana::SolanaRpc;
 use sqlx::{Pool, Postgres};
 use task_manager::{ManagedTask, TaskManager};
-use tokio::{
-    sync::mpsc::Receiver,
-    time::{sleep_until, Duration, Instant},
-};
+use tokio::{sync::mpsc::Receiver, time::Duration};
 
-pub struct Daemon<S, GIR, AV> {
+pub struct Daemon<GIR, AV> {
     pool: Pool<Postgres>,
-    burner: Burner<S>,
     reports: Receiver<FileInfoStream<DataTransferSessionIngestReport>>,
-    burn_period: Duration,
     gateway_info_resolver: GIR,
     authorization_verifier: AV,
     invalid_data_session_report_sink: FileSinkClient,
 }
 
-impl<S, GIR, AV> Daemon<S, GIR, AV> {
+impl<GIR, AV> Daemon<GIR, AV> {
     pub fn new(
-        settings: &Settings,
         pool: Pool<Postgres>,
         reports: Receiver<FileInfoStream<DataTransferSessionIngestReport>>,
-        burner: Burner<S>,
         gateway_info_resolver: GIR,
         authorization_verifier: AV,
         invalid_data_session_report_sink: FileSinkClient,
     ) -> Self {
         Self {
             pool,
-            burner,
             reports,
-            burn_period: Duration::from_secs(60 * 60 * settings.burn_period as u64),
             gateway_info_resolver,
             authorization_verifier,
             invalid_data_session_report_sink,
@@ -53,9 +48,8 @@ impl<S, GIR, AV> Daemon<S, GIR, AV> {
     }
 }
 
-impl<S, GIR, AV> ManagedTask for Daemon<S, GIR, AV>
+impl<GIR, AV> ManagedTask for Daemon<GIR, AV>
 where
-    S: SolanaNetwork,
     GIR: GatewayInfoResolver,
     AV: AuthorizationVerifier + 'static,
 {
@@ -67,15 +61,12 @@ where
     }
 }
 
-impl<S, GIR, AV> Daemon<S, GIR, AV>
+impl<GIR, AV> Daemon<GIR, AV>
 where
-    S: SolanaNetwork,
     GIR: GatewayInfoResolver,
     AV: AuthorizationVerifier,
 {
     pub async fn run(mut self, shutdown: triggered::Listener) -> Result<()> {
-        // Set the initial burn period to one minute
-        let mut burn_time = Instant::now() + Duration::from_secs(60);
         loop {
             tokio::select! {
                 file = self.reports.recv() => {
@@ -90,11 +81,6 @@ where
                     transaction.commit().await?;
                     self.invalid_data_session_report_sink.commit().await?;
                 },
-                _ = sleep_until(burn_time) => {
-                    // It's time to burn
-                    self.burner.burn(&self.pool).await?;
-                    burn_time = Instant::now() + self.burn_period;
-                }
                 _ = shutdown.clone() => return Ok(()),
             }
         }
@@ -138,6 +124,19 @@ impl Cmd {
             .create()
             .await?;
 
+        let burn_initiator = BurnInitiator::new(
+            solana.clone(),
+            pool.clone(),
+            Duration::from_secs(60 * 60 * settings.burn_period),
+        );
+
+        let burn_confirmer = BurnConfirmer::new(
+            valid_sessions,
+            solana,
+            pool.clone(),
+            Duration::from_secs(60 * settings.confirmation_period),
+        );
+
         let (invalid_sessions, invalid_sessions_server) = FileSinkBuilder::new(
             FileType::InvalidDataTransferSessionIngestReport,
             store_base_path,
@@ -148,8 +147,6 @@ impl Cmd {
             .create()
             .await?;
 
-        let burner = Burner::new(valid_sessions, solana);
-
         let file_store = FileStore::from_settings(&settings.ingest).await?;
 
         let (reports, reports_server) =
@@ -167,10 +164,8 @@ impl Cmd {
         let auth_client = AuthorizationClient::from_settings(&settings.config_client)?;
 
         let daemon = Daemon::new(
-            settings,
             pool.clone(),
             reports,
-            burner,
             gateway_client,
             auth_client,
             invalid_sessions,
@@ -184,6 +179,8 @@ impl Cmd {
             .add_task(invalid_sessions_server)
             .add_task(reports_server)
             .add_task(event_id_purger)
+            .add_task(burn_initiator)
+            .add_task(burn_confirmer)
             .add_task(daemon)
             .start()
             .await
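A detail worth keeping straight in this wiring: `burn_period` is expressed in hours while `confirmation_period` is in minutes, so the two `Duration::from_secs` expressions use different factors. A tiny test capturing that, with hypothetical helpers that mirror the constructor arguments above:

```rust
use tokio::time::Duration;

// Hypothetical helpers mirroring the constructor arguments in daemon.rs.
fn burn_period(hours: u64) -> Duration {
    Duration::from_secs(60 * 60 * hours)
}

fn confirmation_period(minutes: u64) -> Duration {
    Duration::from_secs(60 * minutes)
}

#[test]
fn periods_use_their_documented_units() {
    // Defaults: burn hourly, confirm every ten minutes.
    assert_eq!(burn_period(1), Duration::from_secs(3_600));
    assert_eq!(confirmation_period(10), Duration::from_secs(600));
}
```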
diff --git a/mobile_packet_verifier/src/settings.rs b/mobile_packet_verifier/src/settings.rs
index 0ae5e2ee4..628946c98 100644
--- a/mobile_packet_verifier/src/settings.rs
+++ b/mobile_packet_verifier/src/settings.rs
@@ -13,7 +13,10 @@ pub struct Settings {
     pub cache: String,
     /// Burn period in hours. (Default is 1)
     #[serde(default = "default_burn_period")]
-    pub burn_period: i64,
+    pub burn_period: u64,
+    /// Confirmation period in minutes. (Default is 10)
+    #[serde(default = "default_confirmation_period")]
+    pub confirmation_period: u64,
     pub database: db_store::Settings,
     pub ingest: file_store::Settings,
     pub output: file_store::Settings,
@@ -50,10 +53,14 @@ pub fn default_log() -> String {
     "mobile_packet_verifier=debug,poc_store=info".to_string()
 }
 
-pub fn default_burn_period() -> i64 {
+pub fn default_burn_period() -> u64 {
     1
 }
 
+pub fn default_confirmation_period() -> u64 {
+    10
+}
+
 impl Settings {
     /// Load Settings from a given path. Settings are loaded from a given
     /// optional path and can be overridden with environment variables.
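Because both period fields carry serde defaults, existing deployments keep working without adding `confirmation_period` to their config. A reduced stand-in for the real `Settings` struct demonstrating just that behavior; it assumes the `toml` crate is available:

```rust
use serde::Deserialize;

// Reduced stand-in for Settings, showing only the two period fields.
#[derive(Deserialize)]
struct PeriodSettings {
    #[serde(default = "default_burn_period")]
    burn_period: u64,
    #[serde(default = "default_confirmation_period")]
    confirmation_period: u64,
}

fn default_burn_period() -> u64 {
    1
}

fn default_confirmation_period() -> u64 {
    10
}

#[test]
fn defaults_apply_when_fields_are_absent() {
    let s: PeriodSettings = toml::from_str("").unwrap();
    assert_eq!((s.burn_period, s.confirmation_period), (1, 10));

    let s: PeriodSettings = toml::from_str("confirmation_period = 5").unwrap();
    assert_eq!((s.burn_period, s.confirmation_period), (1, 5));
}
```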