diff --git a/Cargo.lock b/Cargo.lock index 2fa28164d..983472175 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1649,6 +1649,21 @@ dependencies = [ "serde", ] +[[package]] +name = "bit-set" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0700ddab506f33b20a03b13996eccd309a48e5ff77d0d95926aa0210fb4e95f1" +dependencies = [ + "bit-vec", +] + +[[package]] +name = "bit-vec" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "349f9b6a179ed607305526ca489b34ad0a41aed5f7980fa90eb03160b69598fb" + [[package]] name = "bit_field" version = "0.10.1" @@ -3242,8 +3257,8 @@ dependencies = [ "serde_json", "sha2 0.10.8", "sqlx", - "strum 0.24.1", - "strum_macros 0.24.3", + "strum", + "strum_macros", "task-manager", "tempfile", "thiserror", @@ -3874,8 +3889,8 @@ dependencies = [ "prost-build", "serde", "serde_json", - "strum 0.26.3", - "strum_macros 0.26.4", + "strum", + "strum_macros", "tonic", "tonic-build", ] @@ -5256,6 +5271,7 @@ dependencies = [ "once_cell", "poc-metrics", "price", + "proptest", "prost", "rand 0.8.5", "regex", @@ -6123,7 +6139,7 @@ dependencies = [ ] [[package]] -name = "promotion_fund" +name = "promotion-fund" version = "0.1.0" dependencies = [ "anyhow", @@ -6148,6 +6164,26 @@ dependencies = [ "triggered", ] +[[package]] +name = "proptest" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4c2511913b88df1637da85cc8d96ec8e43a3f8bb8ccb71ee1ac240d6f3df58d" +dependencies = [ + "bit-set", + "bit-vec", + "bitflags 2.5.0", + "lazy_static", + "num-traits", + "rand 0.8.5", + "rand_chacha 0.3.0", + "rand_xorshift", + "regex-syntax 0.8.3", + "rusty-fork", + "tempfile", + "unarray", +] + [[package]] name = "prost" version = "0.12.4" @@ -6290,6 +6326,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "quick-error" +version = "1.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" + [[package]] name = "quinn" version = "0.10.2" @@ -6424,6 +6466,15 @@ dependencies = [ "rand_core 0.5.1", ] +[[package]] +name = "rand_xorshift" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d25bf25ec5ae4a3f1b92f929810509a2f53d7dca2f50b794ff57e3face536c8f" +dependencies = [ + "rand_core 0.6.4", +] + [[package]] name = "rand_xoshiro" version = "0.6.0" @@ -7007,6 +7058,18 @@ version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ffc183a10b4478d04cbbbfc96d0873219d962dd5accaff2ffbd4ceb7df837f4" +[[package]] +name = "rusty-fork" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb3dcc6e454c328bb824492db107ab7c0ae8fcffe4ad210136ef014458c1bc4f" +dependencies = [ + "fnv", + "quick-error", + "tempfile", + "wait-timeout", +] + [[package]] name = "ryu" version = "1.0.11" @@ -8754,35 +8817,13 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" -[[package]] -name = "strum" -version = "0.24.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "063e6045c0e62079840579a7e47a355ae92f60eb74daaf156fb1e84ba164e63f" -dependencies = [ - "strum_macros 0.24.3", -] - [[package]] name = "strum" version = "0.26.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06" dependencies = [ - "strum_macros 0.26.4", -] - -[[package]] -name = "strum_macros" -version = "0.24.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e385be0d24f186b4ce2f9982191e7101bb737312ad61c1f2f984f34bcf85d59" -dependencies = [ - "heck 0.4.0", - "proc-macro2", - "quote", - "rustversion", - "syn 1.0.109", + "strum_macros", ] [[package]] @@ -9400,6 +9441,12 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c52b4cb7830f995903b2fcff3f523d21efc1c11f6c1596dd544b7925a64ff56" +[[package]] +name = "unarray" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eaea85b334db583fe3274d12b4cd1880032beab409c0d774be044d4480ab9a94" + [[package]] name = "unicode-bidi" version = "0.3.15" @@ -9601,6 +9648,15 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c3082ca00d5a5ef149bb8b555a72ae84c9c59f7250f013ac822ac2e49b19c64" +[[package]] +name = "wait-timeout" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f200f5b12eb75f8c1ed65abd4b2db8a6e1b138a20de009dacee265a2498f3f6" +dependencies = [ + "libc", +] + [[package]] name = "waker-fn" version = "1.1.0" diff --git a/file_store/src/file_source.rs b/file_store/src/file_source.rs index a028d2d8b..511a46ea1 100644 --- a/file_store/src/file_source.rs +++ b/file_store/src/file_source.rs @@ -1,5 +1,7 @@ use crate::{ - file_info_poller::{FileInfoPollerConfigBuilder, MsgDecodeFileInfoPollerParser}, + file_info_poller::{ + FileInfoPollerConfigBuilder, MsgDecodeFileInfoPollerParser, ProstFileInfoPollerParser, + }, file_sink, BytesMutStream, Error, FileStore, }; use async_compression::tokio::bufread::GzipDecoder; @@ -7,17 +9,43 @@ use futures::{ stream::{self}, StreamExt, TryFutureExt, TryStreamExt, }; -use std::path::{Path, PathBuf}; +use std::{ + marker::PhantomData, + path::{Path, PathBuf}, +}; use tokio::{fs::File, io::BufReader}; use tokio_util::codec::{length_delimited::LengthDelimitedCodec, FramedRead}; +pub struct Continuous(PhantomData<(Store, Parser)>); + +impl Continuous { + pub fn msg_source( + ) -> FileInfoPollerConfigBuilder + where + Msg: Clone, + { + FileInfoPollerConfigBuilder::::default() + .parser(MsgDecodeFileInfoPollerParser) + } +} + +impl Continuous { + pub fn prost_source( + ) -> FileInfoPollerConfigBuilder + where + Msg: Clone, + { + FileInfoPollerConfigBuilder::::default() + .parser(ProstFileInfoPollerParser) + } +} + pub fn continuous_source( ) -> FileInfoPollerConfigBuilder where T: Clone, { - FileInfoPollerConfigBuilder::::default() - .parser(MsgDecodeFileInfoPollerParser) + Continuous::msg_source::() } pub fn source(paths: I) -> BytesMutStream diff --git a/mobile_verifier/Cargo.toml b/mobile_verifier/Cargo.toml index 29d0989ca..84e6c61b3 100644 --- a/mobile_verifier/Cargo.toml +++ b/mobile_verifier/Cargo.toml @@ -63,3 +63,4 @@ coverage-map = { path = "../coverage_map" } [dev-dependencies] backon = "0" +proptest = "1.5.0" diff --git a/mobile_verifier/migrations/37_sp_promotions.sql b/mobile_verifier/migrations/37_sp_promotions.sql new file mode 100644 index 000000000..1b55e8cc4 --- /dev/null +++ b/mobile_verifier/migrations/37_sp_promotions.sql @@ -0,0 +1,21 @@ +CREATE TABLE IF NOT EXISTS subscriber_promotion_rewards ( + time_of_reward TIMESTAMPTZ NOT NULL, + subscriber_id BYTEA NOT NULL, + carrier_key TEXT NOT NULL, + shares BIGINT NOT NULL, + PRIMARY KEY (time_of_reward, subscriber_id, carrier_key) +); + +CREATE TABLE IF NOT EXISTS gateway_promotion_rewards ( + time_of_reward TIMESTAMPTZ NOT NULL, + gateway_key TEXT NOT NULL, + carrier_key TEXT NOT NULL, + shares BIGINT NOT NULL, + PRIMARY KEY (time_of_reward, gateway_key, carrier_key) +); + +CREATE TABLE IF NOT EXISTS service_provider_promotion_funds ( + service_provider BIGINT NOT NULL PRIMARY KEY, + basis_points BIGINT NOT NULL, + inserted_at TIMESTAMPTZ NOT NULL +); diff --git a/mobile_verifier/pkg/settings-template.toml b/mobile_verifier/pkg/settings-template.toml index 57b462eca..98bd7caeb 100644 --- a/mobile_verifier/pkg/settings-template.toml +++ b/mobile_verifier/pkg/settings-template.toml @@ -47,6 +47,14 @@ block = 0 # bucket = "mainnet-mobile-ingest" +[promotion_ingest] + +# Input bucket details for Service Provider Promotion Funds + +# Name of bucket to access ingest data. Required +# +bucket = "price" + # Region for bucket. Defaults to below # # region = "us-west-2" diff --git a/mobile_verifier/src/cli/mod.rs b/mobile_verifier/src/cli/mod.rs index 611db001b..36cc1a1df 100644 --- a/mobile_verifier/src/cli/mod.rs +++ b/mobile_verifier/src/cli/mod.rs @@ -1,3 +1,4 @@ +pub mod promotion_funds; pub mod reward_from_db; pub mod server; pub mod verify_disktree; diff --git a/mobile_verifier/src/cli/promotion_funds.rs b/mobile_verifier/src/cli/promotion_funds.rs new file mode 100644 index 000000000..0801a9074 --- /dev/null +++ b/mobile_verifier/src/cli/promotion_funds.rs @@ -0,0 +1,59 @@ +use crate::{ + service_provider::promotions::funds::{ + delete_promotion_fund, fetch_promotion_funds, save_promotion_fund, + }, + Settings, +}; + +#[derive(Debug, clap::Args)] +pub struct Cmd { + #[clap(subcommand)] + sub_command: SubCommand, +} + +#[derive(Debug, clap::Subcommand)] +enum SubCommand { + /// Print Service Provider promotions in mobile-verifier db + List, + /// Set Service Provider promotion in mobile-verifier db + Set { + service_provider_id: i32, + basis_points: u16, + }, + /// Remove Service Provider promotion allocation from mobile-verifier db + Unset { service_provider_id: i32 }, +} + +impl Cmd { + pub async fn run(&self, settings: &Settings) -> anyhow::Result<()> { + let pool = settings.database.connect(env!("CARGO_PKG_NAME")).await?; + + match self.sub_command { + SubCommand::List => { + let funds = fetch_promotion_funds(&pool).await?; + println!("{funds:?}"); + } + SubCommand::Set { + service_provider_id, + basis_points, + } => { + let mut txn = pool.begin().await?; + save_promotion_fund(&mut txn, service_provider_id, basis_points).await?; + txn.commit().await?; + + let funds = fetch_promotion_funds(&pool).await?; + println!("{funds:?}"); + } + SubCommand::Unset { + service_provider_id, + } => { + delete_promotion_fund(&pool, service_provider_id).await?; + + let funds = fetch_promotion_funds(&pool).await?; + println!("{funds:?}"); + } + } + + Ok(()) + } +} diff --git a/mobile_verifier/src/cli/server.rs b/mobile_verifier/src/cli/server.rs index 193455c5c..50d4fb61e 100644 --- a/mobile_verifier/src/cli/server.rs +++ b/mobile_verifier/src/cli/server.rs @@ -8,6 +8,7 @@ use crate::{ heartbeats::{cbrs::CbrsHeartbeatDaemon, wifi::WifiHeartbeatDaemon}, radio_threshold::RadioThresholdIngestor, rewarder::Rewarder, + service_provider, sp_boosted_rewards_bans::ServiceProviderBoostedRewardsBanIngestor, speedtests::SpeedtestDaemon, subscriber_location::SubscriberLocationIngestor, @@ -45,6 +46,7 @@ impl Cmd { let store_base_path = std::path::Path::new(&settings.cache); let report_ingest = FileStore::from_settings(&settings.ingest).await?; + let promotion_fund_ingest = FileStore::from_settings(&settings.promotion_ingest).await?; // mobile config clients let gateway_client = GatewayClient::from_settings(&settings.config_client)?; @@ -137,7 +139,7 @@ impl Cmd { file_upload.clone(), report_ingest.clone(), speedtests_avg.clone(), - gateway_client, + gateway_client.clone(), ) .await?, ) @@ -193,6 +195,19 @@ impl Cmd { ) .await?, ) + .add_task( + service_provider::PromotionDaemon::create_managed_task( + pool.clone(), + settings, + file_upload.clone(), + report_ingest.clone(), + promotion_fund_ingest, + gateway_client.clone(), + auth_client.clone(), + entity_client.clone(), + ) + .await?, + ) .add_task(DataSessionIngestor::create_managed_task(pool.clone(), settings).await?) .add_task( ServiceProviderBoostedRewardsBanIngestor::create_managed_task( diff --git a/mobile_verifier/src/data_session.rs b/mobile_verifier/src/data_session.rs index b519fed64..7f1940777 100644 --- a/mobile_verifier/src/data_session.rs +++ b/mobile_verifier/src/data_session.rs @@ -10,8 +10,6 @@ use futures::{ TryFutureExt, }; use helium_crypto::PublicKeyBinary; -use helium_proto::ServiceProvider; -use rust_decimal::Decimal; use sqlx::{PgPool, Pool, Postgres, Row, Transaction}; use std::{collections::HashMap, ops::Range, time::Instant}; use task_manager::{ManagedTask, TaskManager}; @@ -30,12 +28,6 @@ pub struct HotspotReward { pub rewardable_dc: u64, } -#[derive(Clone, Debug)] -pub struct ServiceProviderDataSession { - pub service_provider: ServiceProvider, - pub total_dcs: Decimal, -} - pub type HotspotMap = HashMap; impl DataSessionIngestor { diff --git a/mobile_verifier/src/lib.rs b/mobile_verifier/src/lib.rs index 61cb44a9f..9fc3757c0 100644 --- a/mobile_verifier/src/lib.rs +++ b/mobile_verifier/src/lib.rs @@ -9,6 +9,7 @@ pub mod radio_threshold; pub mod reward_shares; pub mod rewarder; pub mod seniority; +pub mod service_provider; mod settings; pub mod sp_boosted_rewards_bans; pub mod speedtests; @@ -29,6 +30,12 @@ pub enum GatewayResolution { DataOnly, } +impl GatewayResolution { + pub fn is_not_found(&self) -> bool { + matches!(self, GatewayResolution::GatewayNotFound) + } +} + #[async_trait::async_trait] pub trait GatewayResolver: Clone + Send + Sync + 'static { type Error: Error + Send + Sync + 'static; diff --git a/mobile_verifier/src/main.rs b/mobile_verifier/src/main.rs index 3b3c991b7..43d6a3ba2 100644 --- a/mobile_verifier/src/main.rs +++ b/mobile_verifier/src/main.rs @@ -1,7 +1,7 @@ use anyhow::Result; use clap::Parser; use mobile_verifier::{ - cli::{reward_from_db, server, verify_disktree}, + cli::{promotion_funds, reward_from_db, server, verify_disktree}, Settings, }; use std::path; @@ -37,6 +37,7 @@ pub enum Cmd { /// Go through every cell and ensure it's value can be turned into an Assignment. /// NOTE: This can take a very long time. Run with a --release binary. VerifyDisktree(verify_disktree::Cmd), + PromotionFunds(promotion_funds::Cmd), } impl Cmd { @@ -45,6 +46,7 @@ impl Cmd { Self::Server(cmd) => cmd.run(&settings).await, Self::RewardFromDb(cmd) => cmd.run(&settings).await, Self::VerifyDisktree(cmd) => cmd.run(&settings).await, + Self::PromotionFunds(cmd) => cmd.run(&settings).await, } } } diff --git a/mobile_verifier/src/reward_shares.rs b/mobile_verifier/src/reward_shares.rs index e5f2d2d00..713438634 100644 --- a/mobile_verifier/src/reward_shares.rs +++ b/mobile_verifier/src/reward_shares.rs @@ -1,11 +1,7 @@ use crate::{ - coverage::CoveredHexStream, - data_session::{HotspotMap, ServiceProviderDataSession}, - heartbeats::HeartbeatReward, - rewarder::boosted_hex_eligibility::BoostedHexEligibility, - seniority::Seniority, - sp_boosted_rewards_bans::BannedRadios, - speedtests_average::SpeedtestAverages, + coverage::CoveredHexStream, data_session::HotspotMap, heartbeats::HeartbeatReward, + rewarder::boosted_hex_eligibility::BoostedHexEligibility, seniority::Seniority, + sp_boosted_rewards_bans::BannedRadios, speedtests_average::SpeedtestAverages, subscriber_location::SubscriberValidatedLocations, subscriber_verified_mapping_event::VerifiedSubscriberVerifiedMappingEventShares, }; @@ -14,19 +10,10 @@ use coverage_point_calculator::{OracleBoostingStatus, SPBoostedRewardEligibility use file_store::traits::TimestampEncode; use futures::{Stream, StreamExt}; use helium_crypto::PublicKeyBinary; -use helium_proto::{ - services::{ - poc_mobile as proto, - poc_mobile::{ - mobile_reward_share::Reward as ProtoReward, UnallocatedReward, UnallocatedRewardType, - }, - }, - ServiceProvider, -}; -use mobile_config::{ - boosted_hex_info::BoostedHexes, - client::{carrier_service_client::CarrierServiceVerifier, ClientError}, +use helium_proto::services::{ + poc_mobile as proto, poc_mobile::mobile_reward_share::Reward as ProtoReward, }; +use mobile_config::boosted_hex_info::BoostedHexes; use radio_reward_v2::{RadioRewardV2Ext, ToProtoDecimal}; use rust_decimal::prelude::*; use rust_decimal_macros::dec; @@ -52,7 +39,7 @@ const BOOSTED_POC_REWARDS_PERCENT: Decimal = dec!(0.1); const DC_USD_PRICE: Decimal = dec!(0.00001); /// Default precision used for rounding -const DEFAULT_PREC: u32 = 15; +pub const DEFAULT_PREC: u32 = 15; /// Percent of total emissions allocated for mapper rewards const MAPPERS_REWARDS_PERCENT: Decimal = dec!(0.2); @@ -299,127 +286,6 @@ impl MapperShares { } } -#[derive(Default)] -pub struct ServiceProviderShares { - pub shares: Vec, -} - -impl ServiceProviderShares { - pub fn new(shares: Vec) -> Self { - Self { shares } - } - - pub async fn from_payers_dc( - payer_shares: HashMap, - client: &impl CarrierServiceVerifier, - ) -> anyhow::Result { - let mut sp_shares = ServiceProviderShares::default(); - for (payer, total_dcs) in payer_shares { - let service_provider = Self::payer_key_to_service_provider(&payer, client).await?; - sp_shares.shares.push(ServiceProviderDataSession { - service_provider, - total_dcs: Decimal::from(total_dcs), - }) - } - Ok(sp_shares) - } - - fn total_dc(&self) -> Decimal { - self.shares.iter().map(|v| v.total_dcs).sum() - } - - pub fn rewards_per_share( - &self, - total_sp_rewards: Decimal, - mobile_bone_price: Decimal, - ) -> anyhow::Result { - // the total amount of DC spent across all service providers - let total_sp_dc = self.total_dc(); - // the total amount of service provider rewards in bones based on the spent DC - let total_sp_rewards_used = dc_to_mobile_bones(total_sp_dc, mobile_bone_price); - // cap the service provider rewards if used > pool total - let capped_sp_rewards_used = - Self::maybe_cap_service_provider_rewards(total_sp_rewards_used, total_sp_rewards); - Ok(Self::calc_rewards_per_share( - capped_sp_rewards_used, - total_sp_dc, - )) - } - - pub fn into_service_provider_rewards( - self, - reward_period: &'_ Range>, - reward_per_share: Decimal, - ) -> impl Iterator + '_ { - self.shares - .into_iter() - .map(move |share| proto::ServiceProviderReward { - service_provider_id: share.service_provider as i32, - amount: (share.total_dcs * reward_per_share) - .round_dp_with_strategy(0, RoundingStrategy::ToZero) - .to_u64() - .unwrap_or(0), - }) - .filter(|service_provider_reward| service_provider_reward.amount > 0) - .map(|service_provider_reward| { - ( - service_provider_reward.amount, - proto::MobileRewardShare { - start_period: reward_period.start.encode_timestamp(), - end_period: reward_period.end.encode_timestamp(), - reward: Some(ProtoReward::ServiceProviderReward(service_provider_reward)), - }, - ) - }) - } - - pub fn into_unallocated_reward( - unallocated_amount: Decimal, - reward_period: &'_ Range>, - ) -> anyhow::Result { - let reward = UnallocatedReward { - reward_type: UnallocatedRewardType::ServiceProvider as i32, - amount: unallocated_amount - .round_dp_with_strategy(0, RoundingStrategy::ToZero) - .to_u64() - .unwrap_or(0), - }; - Ok(proto::MobileRewardShare { - start_period: reward_period.start.encode_timestamp(), - end_period: reward_period.end.encode_timestamp(), - reward: Some(ProtoReward::UnallocatedReward(reward)), - }) - } - - fn maybe_cap_service_provider_rewards( - total_sp_rewards_used: Decimal, - total_sp_rewards: Decimal, - ) -> Decimal { - match total_sp_rewards_used <= total_sp_rewards { - true => total_sp_rewards_used, - false => total_sp_rewards, - } - } - - fn calc_rewards_per_share(total_rewards: Decimal, total_shares: Decimal) -> Decimal { - if total_shares > Decimal::ZERO { - (total_rewards / total_shares) - .round_dp_with_strategy(DEFAULT_PREC, RoundingStrategy::MidpointNearestEven) - } else { - Decimal::ZERO - } - } - - async fn payer_key_to_service_provider( - payer: &str, - client: &impl CarrierServiceVerifier, - ) -> anyhow::Result { - tracing::info!(payer, "getting service provider for payer"); - let sp = client.payer_key_to_service_provider(payer).await?; - Ok(sp) - } -} - /// Returns the equivalent amount of Mobile bones for a specified amount of Data Credits pub fn dc_to_mobile_bones(dc_amount: Decimal, mobile_bone_price: Decimal) -> Decimal { let dc_in_usd = dc_amount * DC_USD_PRICE; @@ -905,6 +771,12 @@ mod test { data_session::{self, HotspotDataSession, HotspotReward}, heartbeats::{HeartbeatReward, KeyType, OwnedKeyType}, reward_shares, + service_provider::{ + self, + dc_sessions::ServiceProviderDCSessions, + promotions::{funds::ServiceProviderFunds, rewards::ServiceProviderPromotions}, + ServiceProviderRewardInfos, + }, speedtests::Speedtest, speedtests_average::SpeedtestAverage, subscriber_location::SubscriberValidatedLocations, @@ -2348,8 +2220,8 @@ mod test { .is_none()); } - #[tokio::test] - async fn service_provider_reward_amounts() { + #[test] + fn service_provider_reward_amounts() { let mobile_bone_price = dec!(0.00001); let sp1 = ServiceProvider::HeliumMobile; @@ -2357,22 +2229,21 @@ mod test { let now = Utc::now(); let epoch = (now - Duration::hours(1))..now; - let service_provider_sessions = vec![ServiceProviderDataSession { - service_provider: sp1, - total_dcs: dec!(1000), - }]; - let sp_shares = ServiceProviderShares::new(service_provider_sessions); - let total_sp_rewards = get_scheduled_tokens_for_service_providers(epoch.end - epoch.start); - let rewards_per_share = sp_shares - .rewards_per_share(total_sp_rewards, mobile_bone_price) - .unwrap(); + let total_sp_rewards = service_provider::get_scheduled_tokens(&epoch); + let sp_reward_infos = ServiceProviderRewardInfos::new( + ServiceProviderDCSessions::from([(sp1, dec!(1000))]), + ServiceProviderFunds::default(), + ServiceProviderPromotions::default(), + total_sp_rewards, + mobile_bone_price, + epoch.clone(), + ); let mut sp_rewards = HashMap::::new(); let mut allocated_sp_rewards = 0_u64; - for (reward_amount, sp_reward) in - sp_shares.into_service_provider_rewards(&epoch, rewards_per_share) - { - if let Some(MobileReward::ServiceProviderReward(r)) = sp_reward.reward { + + for (reward_amount, reward) in sp_reward_infos.iter_rewards() { + if let Some(MobileReward::ServiceProviderReward(r)) = reward.reward { sp_rewards.insert(r.service_provider_id, r.amount); assert_eq!(reward_amount, r.amount); allocated_sp_rewards += reward_amount; @@ -2392,8 +2263,8 @@ mod test { assert_eq!(unallocated_sp_reward_amount, 342_465_752_424); } - #[tokio::test] - async fn service_provider_reward_amounts_capped() { + #[test] + fn service_provider_reward_amounts_capped() { let mobile_bone_price = dec!(1.0); let sp1 = ServiceProvider::HeliumMobile; @@ -2404,28 +2275,27 @@ mod test { let total_rewards_value_in_dc = mobile_bones_to_dc(total_sp_rewards_in_bones, mobile_bone_price); - let service_provider_sessions = vec![ServiceProviderDataSession { - service_provider: ServiceProvider::HeliumMobile, + let sp_reward_infos = ServiceProviderRewardInfos::new( // force the service provider to have spend more DC than total rewardable - total_dcs: total_rewards_value_in_dc * dec!(2.0), - }]; - - let sp_shares = ServiceProviderShares::new(service_provider_sessions); - let rewards_per_share = sp_shares - .rewards_per_share(total_sp_rewards_in_bones, mobile_bone_price) - .unwrap(); + ServiceProviderDCSessions::from([(sp1, total_rewards_value_in_dc * dec!(2.0))]), + ServiceProviderFunds::default(), + ServiceProviderPromotions::default(), + total_rewards_value_in_dc, + mobile_bone_price, + epoch.clone(), + ); let mut sp_rewards = HashMap::new(); let mut allocated_sp_rewards = 0_u64; - for (reward_amount, sp_reward) in - sp_shares.into_service_provider_rewards(&epoch, rewards_per_share) - { - if let Some(MobileReward::ServiceProviderReward(r)) = sp_reward.reward { + + for (reward_amount, reward) in sp_reward_infos.iter_rewards() { + if let Some(MobileReward::ServiceProviderReward(r)) = reward.reward { sp_rewards.insert(r.service_provider_id, r.amount); assert_eq!(reward_amount, r.amount); allocated_sp_rewards += reward_amount; } } + let sp1_reward_amount = *sp_rewards .get(&(sp1 as i32)) .expect("Could not fetch sp1 shares"); @@ -2442,8 +2312,8 @@ mod test { assert_eq!(unallocated_sp_reward_amount, 0); } - #[tokio::test] - async fn service_provider_reward_hip87_ex1() { + #[test] + fn service_provider_reward_hip87_ex1() { // mobile price from hip example and converted to bones let mobile_bone_price = dec!(0.0001) / dec!(1_000_000); let sp1 = ServiceProvider::HeliumMobile; @@ -2452,22 +2322,20 @@ mod test { let epoch = (now - Duration::hours(1))..now; let total_sp_rewards_in_bones = dec!(500_000_000) * dec!(1_000_000); - let service_provider_sessions = vec![ServiceProviderDataSession { - service_provider: sp1, - total_dcs: dec!(100_000_000), - }]; - - let sp_shares = ServiceProviderShares::new(service_provider_sessions); - let rewards_per_share = sp_shares - .rewards_per_share(total_sp_rewards_in_bones, mobile_bone_price) - .unwrap(); + let sp_reward_infos = ServiceProviderRewardInfos::new( + ServiceProviderDCSessions::from([(sp1, dec!(100_000_000))]), + ServiceProviderFunds::default(), + ServiceProviderPromotions::default(), + total_sp_rewards_in_bones, + mobile_bone_price, + epoch.clone(), + ); let mut sp_rewards = HashMap::new(); let mut allocated_sp_rewards = 0_u64; - for (reward_amount, sp_reward) in - sp_shares.into_service_provider_rewards(&epoch, rewards_per_share) - { - if let Some(MobileReward::ServiceProviderReward(r)) = sp_reward.reward { + + for (reward_amount, reward) in sp_reward_infos.iter_rewards() { + if let Some(MobileReward::ServiceProviderReward(r)) = reward.reward { sp_rewards.insert(r.service_provider_id, r.amount); assert_eq!(reward_amount, r.amount); allocated_sp_rewards += reward_amount; @@ -2490,8 +2358,8 @@ mod test { assert_eq!(unallocated_sp_reward_amount, 490_000_000_000_000); } - #[tokio::test] - async fn service_provider_reward_hip87_ex2() { + #[test] + fn service_provider_reward_hip87_ex2() { // mobile price from hip example and converted to bones let mobile_bone_price = dec!(0.0001) / dec!(1_000_000); let sp1 = ServiceProvider::HeliumMobile; @@ -2500,22 +2368,20 @@ mod test { let epoch = (now - Duration::hours(24))..now; let total_sp_rewards_in_bones = dec!(500_000_000) * dec!(1_000_000); - let service_provider_sessions = vec![ServiceProviderDataSession { - service_provider: sp1, - total_dcs: dec!(100_000_000_000), - }]; - - let sp_shares = ServiceProviderShares::new(service_provider_sessions); - let rewards_per_share = sp_shares - .rewards_per_share(total_sp_rewards_in_bones, mobile_bone_price) - .unwrap(); + let sp_reward_infos = ServiceProviderRewardInfos::new( + ServiceProviderDCSessions::from([(sp1, dec!(100_000_000_000))]), + ServiceProviderFunds::default(), + ServiceProviderPromotions::default(), + total_sp_rewards_in_bones, + mobile_bone_price, + epoch.clone(), + ); let mut sp_rewards = HashMap::new(); let mut allocated_sp_rewards = 0_u64; - for (reward_amount, sp_reward) in - sp_shares.into_service_provider_rewards(&epoch, rewards_per_share) - { - if let Some(MobileReward::ServiceProviderReward(r)) = sp_reward.reward { + + for (reward_amount, reward) in sp_reward_infos.iter_rewards() { + if let Some(MobileReward::ServiceProviderReward(r)) = reward.reward { sp_rewards.insert(r.service_provider_id, r.amount); assert_eq!(reward_amount, r.amount); allocated_sp_rewards += reward_amount; diff --git a/mobile_verifier/src/rewarder.rs b/mobile_verifier/src/rewarder.rs index 055b4a399..d20f864b4 100644 --- a/mobile_verifier/src/rewarder.rs +++ b/mobile_verifier/src/rewarder.rs @@ -5,9 +5,9 @@ use crate::{ radio_threshold, reward_shares::{ self, CalculatedPocRewardShares, CoverageShares, DataTransferAndPocAllocatedRewardBuckets, - MapperShares, ServiceProviderShares, TransferRewards, + MapperShares, TransferRewards, }, - sp_boosted_rewards_bans, speedtests, + service_provider, sp_boosted_rewards_bans, speedtests, speedtests_average::SpeedtestAverages, subscriber_location, subscriber_verified_mapping_event, telemetry, Settings, }; @@ -296,6 +296,8 @@ where coverage::clear_coverage_objects(&mut transaction, &reward_period.start).await?; sp_boosted_rewards_bans::clear_bans(&mut transaction, reward_period.start).await?; subscriber_verified_mapping_event::clear(&mut transaction, &reward_period.start).await?; + service_provider::db::clear_promotion_rewards(&mut transaction, &reward_period.start) + .await?; // subscriber_location::clear_location_shares(&mut transaction, &reward_period.end).await?; let next_reward_period = scheduler.next_reward_period(); @@ -311,8 +313,7 @@ where boosted_poc_bones_per_reward_share: Some(helium_proto::Decimal { value: poc_dc_shares.boost.to_string(), }), - // TODO: Filled in with the next PR - sp_allocations: vec![], + sp_allocations: service_provider::reward_data_sp_allocations(&self.pool).await?, }; self.reward_manifests .write( @@ -596,33 +597,37 @@ pub async fn reward_service_providers( reward_period: &Range>, mobile_bone_price: Decimal, ) -> anyhow::Result<()> { - let payer_dc_sessions = - data_session::sum_data_sessions_to_dc_by_payer(pool, reward_period).await?; - let sp_shares = - ServiceProviderShares::from_payers_dc(payer_dc_sessions, carrier_client).await?; - let total_sp_rewards = reward_shares::get_scheduled_tokens_for_service_providers( - reward_period.end - reward_period.start, + use service_provider::{db, ServiceProviderRewardInfos}; + let dc_sessions = db::fetch_dc_sessions(pool, carrier_client, reward_period).await?; + let promo_funds = db::fetch_promotion_funds(pool).await?; + let promo_rewards = db::fetch_promotion_rewards(pool, carrier_client, reward_period).await?; + + let total_sp_rewards = service_provider::get_scheduled_tokens(reward_period); + + let sps = ServiceProviderRewardInfos::new( + dc_sessions, + promo_funds, + promo_rewards, + total_sp_rewards, + mobile_bone_price, + reward_period.clone(), ); - let rewards_per_share = sp_shares.rewards_per_share(total_sp_rewards, mobile_bone_price)?; - // translate service provider shares into service provider rewards - // track the amount of allocated reward value as we go - let mut allocated_sp_rewards = 0_u64; - for (amount, sp_share) in - sp_shares.into_service_provider_rewards(reward_period, rewards_per_share) - { - allocated_sp_rewards += amount; - mobile_rewards.write(sp_share.clone(), []).await?.await??; - } - // write out any unallocated service provider reward - let unallocated_sp_reward_amount = total_sp_rewards + + let mut unallocated_sp_rewards = total_sp_rewards .round_dp_with_strategy(0, RoundingStrategy::ToZero) .to_u64() - .unwrap_or(0) - - allocated_sp_rewards; + .unwrap_or(0); + + for (amount, reward) in sps.iter_rewards() { + unallocated_sp_rewards -= amount; + mobile_rewards.write(reward, []).await?.await??; + } + + // write out any unallocated service provider reward write_unallocated_reward( mobile_rewards, UnallocatedRewardType::ServiceProvider, - unallocated_sp_reward_amount, + unallocated_sp_rewards, reward_period, ) .await?; diff --git a/mobile_verifier/src/service_provider/dc_sessions.rs b/mobile_verifier/src/service_provider/dc_sessions.rs new file mode 100644 index 000000000..1888c3bfa --- /dev/null +++ b/mobile_verifier/src/service_provider/dc_sessions.rs @@ -0,0 +1,170 @@ +use std::{collections::HashMap, ops::Range}; + +use chrono::{DateTime, Utc}; +use mobile_config::client::{carrier_service_client::CarrierServiceVerifier, ClientError}; +use rust_decimal::{Decimal, RoundingStrategy}; +use sqlx::PgPool; + +use crate::{ + data_session, + reward_shares::{dc_to_mobile_bones, DEFAULT_PREC}, +}; + +use super::ServiceProviderId; + +pub async fn fetch_dc_sessions( + pool: &PgPool, + carrier_client: &impl CarrierServiceVerifier, + reward_period: &Range>, +) -> anyhow::Result { + let payer_dc_sessions = + data_session::sum_data_sessions_to_dc_by_payer(pool, reward_period).await?; + + let mut dc_sessions = ServiceProviderDCSessions::default(); + for (payer_key, dc_amount) in payer_dc_sessions { + let service_provider = carrier_client + .payer_key_to_service_provider(&payer_key) + .await?; + dc_sessions.insert( + service_provider as ServiceProviderId, + Decimal::from(dc_amount), + ); + } + + Ok(dc_sessions) +} + +#[derive(Debug, Default)] +pub struct ServiceProviderDCSessions(pub(crate) HashMap); + +impl ServiceProviderDCSessions { + pub fn insert(&mut self, service_provider: ServiceProviderId, dc: Decimal) { + *self.0.entry(service_provider).or_insert(Decimal::ZERO) += dc; + } + + pub fn all_transfer(&self) -> Decimal { + self.0.values().sum() + } + + pub fn iter(&self) -> impl Iterator + '_ { + self.0.iter().map(|(k, v)| (*k, *v)) + } + + pub fn rewards_per_share( + &self, + total_sp_rewards: Decimal, + mobile_bone_price: Decimal, + ) -> anyhow::Result { + // the total amount of DC spent across all service providers + let total_sp_dc = self.all_transfer(); + // the total amount of service provider rewards in bones based on the spent DC + let total_sp_rewards_used = dc_to_mobile_bones(total_sp_dc, mobile_bone_price); + // cap the service provider rewards if used > pool total + let capped_sp_rewards_used = + Self::maybe_cap_service_provider_rewards(total_sp_rewards_used, total_sp_rewards); + Ok(Self::calc_rewards_per_share( + capped_sp_rewards_used, + total_sp_dc, + )) + } + + fn maybe_cap_service_provider_rewards( + total_sp_rewards_used: Decimal, + total_sp_rewards: Decimal, + ) -> Decimal { + match total_sp_rewards_used <= total_sp_rewards { + true => total_sp_rewards_used, + false => total_sp_rewards, + } + } + + fn calc_rewards_per_share(total_rewards: Decimal, total_shares: Decimal) -> Decimal { + if total_shares > Decimal::ZERO { + (total_rewards / total_shares) + .round_dp_with_strategy(DEFAULT_PREC, RoundingStrategy::MidpointNearestEven) + } else { + Decimal::ZERO + } + } +} + +impl From for ServiceProviderDCSessions +where + F: IntoIterator, + I: Into, +{ + fn from(iter: F) -> Self { + let mut me = Self::default(); + for (k, v) in iter { + me.insert(k.into(), v); + } + me + } +} + +#[cfg(test)] +pub mod tests { + + use chrono::Duration; + use helium_proto::ServiceProvider; + + use crate::data_session::HotspotDataSession; + + use super::*; + + impl ServiceProviderDCSessions { + fn len(&self) -> usize { + self.0.len() + } + } + + #[sqlx::test] + fn multiple_payer_keys_accumulate_to_service_provider(pool: PgPool) -> anyhow::Result<()> { + // Client always resolves to single service provider no matter payer key + struct MockClient; + + #[async_trait::async_trait] + impl CarrierServiceVerifier for MockClient { + type Error = ClientError; + + async fn payer_key_to_service_provider<'a>( + &self, + _pubkey: &str, + ) -> Result { + Ok(ServiceProvider::HeliumMobile) + } + } + + // Save multiple data sessions with different payers + let one = HotspotDataSession { + pub_key: vec![0].into(), + payer: vec![0].into(), + upload_bytes: 1_000, + download_bytes: 1_000, + num_dcs: 2_000, + received_timestamp: Utc::now(), + }; + let two = HotspotDataSession { + pub_key: vec![1].into(), + payer: vec![1].into(), + upload_bytes: 1_000, + download_bytes: 1_000, + num_dcs: 2_000, + received_timestamp: Utc::now(), + }; + let mut txn = pool.begin().await?; + one.save(&mut txn).await?; + two.save(&mut txn).await?; + txn.commit().await?; + + let now = Utc::now(); + let epoch = now - Duration::hours(24)..now; + + // dc sessions should represent single payer, and all dc is combined + let map = fetch_dc_sessions(&pool, &MockClient, &epoch).await?; + assert_eq!(map.len(), 1); + assert_eq!(map.all_transfer(), Decimal::from(4_000)); + + Ok(()) + } +} diff --git a/mobile_verifier/src/service_provider/mod.rs b/mobile_verifier/src/service_provider/mod.rs new file mode 100644 index 000000000..9a6f90885 --- /dev/null +++ b/mobile_verifier/src/service_provider/mod.rs @@ -0,0 +1,43 @@ +use std::ops::Range; + +use chrono::{DateTime, Utc}; +use helium_proto::ServiceProviderAllocation; +pub use promotions::daemon::PromotionDaemon; +pub use reward::ServiceProviderRewardInfos; +use sqlx::PgPool; + +pub mod dc_sessions; +pub mod promotions; +pub mod reward; + +pub mod db { + pub use super::dc_sessions::fetch_dc_sessions; + pub use super::promotions::funds::fetch_promotion_funds; + pub use super::promotions::rewards::{clear_promotion_rewards, fetch_promotion_rewards}; +} + +// This type is used in lieu of the helium_proto::ServiceProvider enum so we can +// handle more than a single value without adding a hard deploy dependency to +// mobile-verifier when a new carrier is added.. +pub type ServiceProviderId = i32; + +pub fn get_scheduled_tokens(reward_period: &Range>) -> rust_decimal::Decimal { + let duration = reward_period.end - reward_period.start; + crate::reward_shares::get_scheduled_tokens_for_service_providers(duration) +} + +pub async fn reward_data_sp_allocations( + pool: &PgPool, +) -> anyhow::Result> { + let funds = db::fetch_promotion_funds(pool).await?; + let mut sp_allocations = vec![]; + + for (sp_id, bps) in funds.0.into_iter() { + sp_allocations.push(ServiceProviderAllocation { + service_provider: sp_id, + incentive_escrow_fund_bps: bps as u32, + }); + } + + Ok(sp_allocations) +} diff --git a/mobile_verifier/src/service_provider/promotions/daemon.rs b/mobile_verifier/src/service_provider/promotions/daemon.rs new file mode 100644 index 000000000..57bf11c4f --- /dev/null +++ b/mobile_verifier/src/service_provider/promotions/daemon.rs @@ -0,0 +1,262 @@ +use std::time::{Duration, Instant}; + +use chrono::Utc; +use file_store::{ + file_info_poller::{FileInfoStream, LookbackBehavior}, + file_sink::FileSinkClient, + file_source, + file_upload::FileUpload, + promotion_reward::{Entity, PromotionReward}, + traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt, TimestampEncode}, + FileType, +}; +use futures::{StreamExt, TryFutureExt}; +use helium_proto::{ + services::{ + mobile_config::NetworkKeyRole, + poc_mobile::{ + PromotionRewardIngestReportV1, PromotionRewardStatus, VerifiedPromotionRewardV1, + }, + }, + ServiceProviderPromotionFundV1, +}; +use mobile_config::{ + client::{ + authorization_client::AuthorizationVerifier, entity_client::EntityVerifier, + AuthorizationClient, EntityClient, + }, + GatewayClient, +}; +use sqlx::PgPool; +use task_manager::{ManagedTask, TaskManager}; +use tokio::sync::mpsc::Receiver; + +use crate::{ + service_provider::promotions::{funds, rewards}, + GatewayResolver, Settings, +}; + +pub struct PromotionDaemon { + pool: PgPool, + gateway_info_resolver: GatewayClient, + authorization_verifier: AuthorizationClient, + entity_verifier: EntityClient, + promotion_funds: Receiver>, + promotion_rewards: Receiver>, + promotion_rewards_sink: FileSinkClient, +} + +impl ManagedTask for PromotionDaemon { + fn start_task( + self: Box, + shutdown: triggered::Listener, + ) -> futures::prelude::future::LocalBoxFuture<'static, anyhow::Result<()>> { + let handle = tokio::spawn(self.run(shutdown)); + Box::pin( + handle + .map_err(anyhow::Error::from) + .and_then(|result| async move { result.map_err(anyhow::Error::from) }), + ) + } +} + +impl PromotionDaemon { + #[allow(clippy::too_many_arguments)] + pub async fn create_managed_task( + pool: PgPool, + settings: &Settings, + file_upload: FileUpload, + report_file_store: file_store::FileStore, + promotion_file_store: file_store::FileStore, + gateway_info_resolver: GatewayClient, + authorization_verifier: AuthorizationClient, + entity_verifier: EntityClient, + ) -> anyhow::Result { + let (promotion_rewards_sink, valid_promotion_rewards_server) = + VerifiedPromotionRewardV1::file_sink( + settings.store_base_path(), + file_upload.clone(), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(Duration::from_secs(15 * 60)), + env!("CARGO_PKG_NAME"), + ) + .await?; + + let (promotion_rewards, promotion_rewards_server) = + file_source::Continuous::msg_source::() + .state(pool.clone()) + .store(report_file_store.clone()) + .lookback(LookbackBehavior::StartAfter(settings.start_after)) + .prefix(FileType::PromotionRewardIngestReport.to_string()) + .create() + .await?; + + let (promotion_funds, promotion_funds_server) = + file_source::Continuous::prost_source::() + .state(pool.clone()) + .store(promotion_file_store) + .lookback(LookbackBehavior::StartAfter(settings.start_after)) + .prefix(FileType::ServiceProviderPromotionFund.to_string()) + .create() + .await?; + + let promotion_reward_daemon = Self { + pool, + gateway_info_resolver, + authorization_verifier, + entity_verifier, + promotion_funds, + promotion_rewards, + promotion_rewards_sink, + }; + + Ok(TaskManager::builder() + .add_task(valid_promotion_rewards_server) + .add_task(promotion_funds_server) + .add_task(promotion_rewards_server) + .add_task(promotion_reward_daemon) + .build()) + } + + async fn run(mut self, shutdown: triggered::Listener) -> anyhow::Result<()> { + loop { + tokio::select! { + _ = shutdown.clone() => { + tracing::info!("PromotionRewardDaemon shutting down"); + break; + } + Some(file) = self.promotion_rewards.recv() => { + let start = Instant::now(); + self.process_rewards_file(file).await?; + metrics::histogram!("promotion_reward_processing_time").record(start.elapsed()); + } + Some(file) = self.promotion_funds.recv() => { + let start = Instant::now(); + self.process_funds_file(file).await?; + metrics::histogram!("promotion_funds_processing_time").record(start.elapsed()); + } + } + } + + Ok(()) + } + + async fn process_rewards_file( + &self, + file: FileInfoStream, + ) -> anyhow::Result<()> { + tracing::info!(key = file.file_info.key, "Processing promotion reward file"); + + let mut transaction = self.pool.begin().await?; + let mut promotion_rewards = file.into_stream(&mut transaction).await?; + + while let Some(promotion_reward) = promotion_rewards.next().await { + let promotion_reward_status = validate_promotion_reward( + &promotion_reward, + &self.authorization_verifier, + &self.gateway_info_resolver, + &self.entity_verifier, + ) + .await?; + + if promotion_reward_status == PromotionRewardStatus::Valid { + rewards::save_promotion_reward(&mut transaction, &promotion_reward).await?; + } + + write_promotion_reward( + &self.promotion_rewards_sink, + &promotion_reward, + promotion_reward_status, + ) + .await?; + } + + self.promotion_rewards_sink.commit().await?; + transaction.commit().await?; + + Ok(()) + } + + async fn process_funds_file( + &self, + file: FileInfoStream, + ) -> anyhow::Result<()> { + tracing::info!(key = file.file_info.key, "Processing promotion funds file"); + + let mut txn = self.pool.begin().await?; + + let mut promotion_funds = file.into_stream(&mut txn).await?; + while let Some(promotion_fund) = promotion_funds.next().await { + funds::save_promotion_fund( + &mut txn, + promotion_fund.service_provider, + promotion_fund.bps as u16, + ) + .await?; + } + + txn.commit().await?; + + Ok(()) + } +} + +async fn validate_promotion_reward( + promotion_reward: &PromotionReward, + authorization_verifier: &impl AuthorizationVerifier, + gateway_info_resolver: &impl GatewayResolver, + entity_verifier: &impl EntityVerifier, +) -> anyhow::Result { + if authorization_verifier + .verify_authorized_key( + &promotion_reward.carrier_pub_key, + NetworkKeyRole::MobileCarrier, + ) + .await + .is_err() + { + return Ok(PromotionRewardStatus::InvalidCarrierKey); + } + match &promotion_reward.entity { + Entity::SubscriberId(ref subscriber_id) + if entity_verifier + .verify_rewardable_entity(subscriber_id) + .await + .is_err() => + { + Ok(PromotionRewardStatus::InvalidSubscriberId) + } + Entity::GatewayKey(ref gateway_key) + if gateway_info_resolver + .resolve_gateway(gateway_key) + .await? + .is_not_found() => + { + Ok(PromotionRewardStatus::InvalidGatewayKey) + } + _ => Ok(PromotionRewardStatus::Valid), + } +} + +async fn write_promotion_reward( + file_sink: &FileSinkClient, + promotion_reward: &PromotionReward, + status: PromotionRewardStatus, +) -> anyhow::Result<()> { + file_sink + .write( + VerifiedPromotionRewardV1 { + report: Some(PromotionRewardIngestReportV1 { + received_timestamp: promotion_reward + .received_timestamp + .encode_timestamp_millis(), + report: Some(promotion_reward.clone().into()), + }), + status: status as i32, + timestamp: Utc::now().encode_timestamp_millis(), + }, + &[("validity", status.as_str_name())], + ) + .await?; + Ok(()) +} diff --git a/mobile_verifier/src/service_provider/promotions/funds.rs b/mobile_verifier/src/service_provider/promotions/funds.rs new file mode 100644 index 000000000..70a0aa28e --- /dev/null +++ b/mobile_verifier/src/service_provider/promotions/funds.rs @@ -0,0 +1,99 @@ +use std::collections::HashMap; + +use chrono::Utc; +use rust_decimal::Decimal; +use rust_decimal_macros::dec; +use sqlx::{PgPool, Postgres, Transaction}; + +use crate::service_provider::ServiceProviderId; + +#[derive(Debug, Default)] +pub struct ServiceProviderFunds(pub(crate) HashMap); + +impl ServiceProviderFunds { + pub fn get_fund_percent(&self, service_provider_id: ServiceProviderId) -> Decimal { + let bps = self + .0 + .get(&service_provider_id) + .cloned() + .unwrap_or_default(); + Decimal::from(bps) / dec!(10_000) + } +} + +impl From for ServiceProviderFunds +where + F: IntoIterator, + I: Into, +{ + fn from(funds: F) -> Self { + Self(funds.into_iter().map(|(k, v)| (k.into(), v)).collect()) + } +} + +pub async fn fetch_promotion_funds(pool: &PgPool) -> anyhow::Result { + #[derive(Debug, sqlx::FromRow)] + struct PromotionFund { + #[sqlx(try_from = "i64")] + pub service_provider: ServiceProviderId, + #[sqlx(try_from = "i64")] + pub basis_points: u16, + } + + let funds = sqlx::query_as::<_, PromotionFund>( + r#" + SELECT + service_provider, basis_points + FROM + service_provider_promotion_funds + "#, + ) + .fetch_all(pool) + .await?; + + let funds = funds + .into_iter() + .map(|fund| (fund.service_provider, fund.basis_points)) + .collect(); + + Ok(ServiceProviderFunds(funds)) +} + +pub async fn save_promotion_fund( + transaction: &mut Transaction<'_, Postgres>, + service_provider_id: ServiceProviderId, + basis_points: u16, +) -> anyhow::Result<()> { + sqlx::query( + r#" + INSERT INTO service_provider_promotion_funds + (service_provider, basis_points, inserted_at) + VALUES + ($1, $2, $3) + "#, + ) + .bind(service_provider_id) + .bind(basis_points as i64) + .bind(Utc::now()) + .execute(transaction) + .await?; + + Ok(()) +} + +pub async fn delete_promotion_fund( + pool: &PgPool, + service_provider_id: ServiceProviderId, +) -> anyhow::Result<()> { + sqlx::query( + r#" + DELETE FROM service_provider_promotion_funds + WHERE service_provider = $1 + "#, + ) + .bind(service_provider_id) + .execute(pool) + .await?; + + Ok(()) +} diff --git a/mobile_verifier/src/service_provider/promotions/mod.rs b/mobile_verifier/src/service_provider/promotions/mod.rs new file mode 100644 index 000000000..766b23940 --- /dev/null +++ b/mobile_verifier/src/service_provider/promotions/mod.rs @@ -0,0 +1,3 @@ +pub mod daemon; +pub mod funds; +pub mod rewards; diff --git a/mobile_verifier/src/service_provider/promotions/rewards.rs b/mobile_verifier/src/service_provider/promotions/rewards.rs new file mode 100644 index 000000000..70c91ad2a --- /dev/null +++ b/mobile_verifier/src/service_provider/promotions/rewards.rs @@ -0,0 +1,179 @@ +use std::ops::Range; + +use chrono::{DateTime, Utc}; +use file_store::promotion_reward::{Entity, PromotionReward}; +use futures::TryStreamExt; +use helium_crypto::PublicKeyBinary; +use mobile_config::client::{carrier_service_client::CarrierServiceVerifier, ClientError}; +use rust_decimal::Decimal; +use sqlx::{postgres::PgRow, PgPool, Postgres, Row, Transaction}; + +use crate::service_provider::ServiceProviderId; + +#[derive(Debug, Default, Clone, PartialEq)] +pub struct ServiceProviderPromotions(Vec); + +impl ServiceProviderPromotions { + pub fn for_service_provider( + &self, + service_provider_id: ServiceProviderId, + ) -> ServiceProviderPromotions { + let promotions = self + .0 + .iter() + .filter(|x| x.service_provider_id == service_provider_id) + .cloned() + .collect(); + Self(promotions) + } + + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } + + pub fn total_shares(&self) -> Decimal { + self.0.iter().map(|x| Decimal::from(x.shares)).sum() + } + + pub fn iter(&self) -> impl Iterator { + self.0.iter() + } +} + +impl From for ServiceProviderPromotions +where + F: IntoIterator, +{ + fn from(promotions: F) -> Self { + Self(promotions.into_iter().collect()) + } +} + +#[derive(Debug, Clone, PartialEq)] +pub struct PromotionRewardShare { + pub service_provider_id: ServiceProviderId, + pub rewardable_entity: Entity, + pub shares: u64, +} + +pub async fn save_promotion_reward( + transaction: &mut Transaction<'_, Postgres>, + promotion_reward: &PromotionReward, +) -> anyhow::Result<()> { + match &promotion_reward.entity { + Entity::SubscriberId(subscriber_id) => { + sqlx::query( + r#" + INSERT INTO subscriber_promotion_rewards (time_of_reward, subscriber_id, carrier_key, shares) + VALUES ($1, $2, $3, $4) + ON CONFLICT DO NOTHING + "# + ) + .bind(promotion_reward.timestamp) + .bind(subscriber_id) + .bind(&promotion_reward.carrier_pub_key) + .bind(promotion_reward.shares as i64) + .execute(&mut *transaction) + .await?; + } + Entity::GatewayKey(gateway_key) => { + sqlx::query( + r#" + INSERT INTO gateway_promotion_rewards (time_of_reward, gateway_key, carrier_key, shares) + VALUES ($1, $2, $3, $4) + ON CONFLICT DO NOTHING + "# + ) + .bind(promotion_reward.timestamp) + .bind(gateway_key) + .bind(&promotion_reward.carrier_pub_key) + .bind(promotion_reward.shares as i64) + .execute(&mut *transaction) + .await?; + } + } + Ok(()) +} + +pub async fn fetch_promotion_rewards( + pool: &PgPool, + carrier: &impl CarrierServiceVerifier, + epoch: &Range>, +) -> anyhow::Result { + let rewards = sqlx::query_as( + r#" + SELECT + subscriber_id, NULL as gateway_key, SUM(shares)::bigint as shares, carrier_key + FROM + subscriber_promotion_rewards + WHERE + time_of_reward >= $1 AND time_of_reward < $2 + GROUP BY + subscriber_id, carrier_key + UNION + SELECT + NULL as subscriber_id, gateway_key, SUM(shares)::bigint as shares, carrier_key + FROM + gateway_promotion_rewards + WHERE + time_of_reward >= $1 AND time_of_reward < $2 + GROUP + BY gateway_key, carrier_key + "#, + ) + .bind(epoch.start) + .bind(epoch.end) + .fetch(pool) + .map_err(anyhow::Error::from) + .and_then(|x: DbPromotionRewardShares| async move { + let service_provider_id = carrier + .payer_key_to_service_provider(&x.carrier_key.to_string()) + .await?; + Ok(PromotionRewardShare { + service_provider_id: service_provider_id as ServiceProviderId, + rewardable_entity: x.rewardable_entity, + shares: x.shares, + }) + }) + .try_collect() + .await?; + + Ok(ServiceProviderPromotions(rewards)) +} + +pub async fn clear_promotion_rewards( + tx: &mut Transaction<'_, Postgres>, + timestamp: &DateTime, +) -> Result<(), sqlx::Error> { + sqlx::query("DELETE FROM subscriber_promotion_rewards WHERE time_of_reward < $1") + .bind(timestamp) + .execute(&mut *tx) + .await?; + sqlx::query("DELETE FROM gateway_promotion_rewards WHERE time_of_reward < $1") + .bind(timestamp) + .execute(&mut *tx) + .await?; + Ok(()) +} + +struct DbPromotionRewardShares { + pub carrier_key: PublicKeyBinary, + pub rewardable_entity: Entity, + pub shares: u64, +} + +impl sqlx::FromRow<'_, PgRow> for DbPromotionRewardShares { + fn from_row(row: &PgRow) -> sqlx::Result { + let subscriber_id: Option> = row.try_get("subscriber_id")?; + let shares: i64 = row.try_get("shares")?; + Ok(Self { + rewardable_entity: if let Some(subscriber_id) = subscriber_id { + Entity::SubscriberId(subscriber_id) + } else { + Entity::GatewayKey(row.try_get("gateway_key")?) + }, + shares: shares as u64, + carrier_key: row.try_get("carrier_key")?, + }) + } +} diff --git a/mobile_verifier/src/service_provider/reward.rs b/mobile_verifier/src/service_provider/reward.rs new file mode 100644 index 000000000..da460d39b --- /dev/null +++ b/mobile_verifier/src/service_provider/reward.rs @@ -0,0 +1,723 @@ +use std::ops::Range; + +use chrono::{DateTime, Utc}; + +use file_store::traits::TimestampEncode; +use rust_decimal::{Decimal, RoundingStrategy}; +use rust_decimal_macros::dec; + +use crate::reward_shares::{dc_to_mobile_bones, DEFAULT_PREC}; + +use super::{ + dc_sessions::ServiceProviderDCSessions, + promotions::{funds::ServiceProviderFunds, rewards::ServiceProviderPromotions}, +}; + +mod proto { + pub use helium_proto::services::poc_mobile::{ + mobile_reward_share::Reward, MobileRewardShare, PromotionReward, ServiceProviderReward, + }; +} + +pub fn rewards_per_share( + total_sp_dc: Decimal, + total_sp_rewards: Decimal, + mobile_bone_price: Decimal, +) -> Decimal { + let total_sp_rewards_used = dc_to_mobile_bones(total_sp_dc, mobile_bone_price); + let capped_sp_rewards_used = total_sp_rewards_used.min(total_sp_rewards); + + if capped_sp_rewards_used > Decimal::ZERO { + (capped_sp_rewards_used / total_sp_dc) + .round_dp_with_strategy(DEFAULT_PREC, RoundingStrategy::MidpointNearestEven) + } else { + Decimal::ZERO + } +} + +/// Container for all Service Provider rewarding +#[derive(Debug)] +pub struct ServiceProviderRewardInfos { + coll: Vec, + total_sp_allocation: Decimal, + all_transfer: Decimal, + mobile_bone_price: Decimal, + reward_epoch: Range>, +} + +// Represents a single Service Providers information for rewarding, +// only used internally. +#[derive(Debug, Clone, PartialEq)] +struct RewardInfo { + // proto::ServiceProvider enum repr + sp_id: i32, + + // Total DC transferred for reward epoch + dc: Decimal, + // % of total allocated rewards for dc transfer + dc_perc: Decimal, + // % allocated from DC to promo rewards (found in db from file from on chain) + allocated_promo_perc: Decimal, + + // % of total allocated rewards going towards promotions + realized_promo_perc: Decimal, + // % of total allocated rewards awarded for dc transfer + realized_dc_perc: Decimal, + // % matched promotions from unallocated, can never exceed realized_promo_perc + matched_promo_perc: Decimal, + + // Rewards for the epoch + promotion_rewards: ServiceProviderPromotions, +} + +impl ServiceProviderRewardInfos { + pub fn new( + dc_sessions: ServiceProviderDCSessions, + promo_funds: ServiceProviderFunds, + rewards: ServiceProviderPromotions, + total_sp_allocation: Decimal, + mobile_bone_price: Decimal, + reward_epoch: Range>, + ) -> Self { + let all_transfer = dc_sessions.all_transfer(); + + let mut me = Self { + coll: vec![], + all_transfer, + total_sp_allocation, + mobile_bone_price, + reward_epoch, + }; + + let used_allocation = total_sp_allocation.max(all_transfer); + for (dc_session, dc_transfer) in dc_sessions.iter() { + let promo_fund_perc = promo_funds.get_fund_percent(dc_session); + me.coll.push(RewardInfo::new( + dc_session, + dc_transfer, + promo_fund_perc, + used_allocation, + rewards.for_service_provider(dc_session), + )); + } + + distribute_unallocated(&mut me.coll); + + me + } + + pub fn iter_rewards(&self) -> Vec<(u64, proto::MobileRewardShare)> { + let rewards_per_share = rewards_per_share( + self.all_transfer, + self.total_sp_allocation, + self.mobile_bone_price, + ); + let sp_rewards = self.total_sp_allocation * rewards_per_share; + // NOTE(mj): `rewards_per_share * self.dc` vs `sp_rewards * self.dc_perc` + // They're veeeeery close. But the % multiplication produces a floating point number + // that will typically be rounded down. + + self.coll + .iter() + .flat_map(|sp| { + let mut rewards = sp.promo_rewards(sp_rewards, &self.reward_epoch); + rewards.push(sp.carrier_reward(sp_rewards, &self.reward_epoch)); + rewards + }) + .filter(|(amount, _r)| *amount > 0) + .collect() + } +} + +impl RewardInfo { + pub fn new( + sp_id: i32, + dc_transfer: Decimal, + promo_fund_perc: Decimal, + total_sp_allocation: Decimal, + rewards: ServiceProviderPromotions, + ) -> Self { + let dc_perc = dc_transfer / total_sp_allocation; + let realized_promo_perc = if rewards.is_empty() { + dec!(0) + } else { + dc_perc * promo_fund_perc + }; + let realized_dc_perc = dc_perc - realized_promo_perc; + + Self { + sp_id, + dc: dc_transfer, + allocated_promo_perc: promo_fund_perc, + + dc_perc, + realized_promo_perc, + realized_dc_perc, + matched_promo_perc: dec!(0), + + promotion_rewards: rewards, + } + } + + pub fn carrier_reward( + &self, + total_allocation: Decimal, + reward_period: &Range>, + ) -> (u64, proto::MobileRewardShare) { + let amount = (total_allocation * self.realized_dc_perc).to_u64_rounded(); + + ( + amount, + proto::MobileRewardShare { + start_period: reward_period.start.encode_timestamp(), + end_period: reward_period.end.encode_timestamp(), + reward: Some(proto::Reward::ServiceProviderReward( + proto::ServiceProviderReward { + service_provider_id: self.sp_id, + amount, + }, + )), + }, + ) + } + + pub fn promo_rewards( + &self, + total_allocation: Decimal, + reward_period: &Range>, + ) -> Vec<(u64, proto::MobileRewardShare)> { + if self.promotion_rewards.is_empty() { + return vec![]; + } + + let mut rewards = vec![]; + + let sp_amount = total_allocation * self.realized_promo_perc; + let matched_amount = total_allocation * self.matched_promo_perc; + + let total_shares = self.promotion_rewards.total_shares(); + let sp_amount_per_share = sp_amount / total_shares; + let matched_amount_per_share = matched_amount / total_shares; + + for r in self.promotion_rewards.iter() { + let shares = Decimal::from(r.shares); + + let service_provider_amount = (sp_amount_per_share * shares).to_u64_rounded(); + let matched_amount = (matched_amount_per_share * shares).to_u64_rounded(); + + let total_amount = service_provider_amount + matched_amount; + + rewards.push(( + total_amount, + proto::MobileRewardShare { + start_period: reward_period.start.encode_timestamp(), + end_period: reward_period.end.encode_timestamp(), + reward: Some(proto::Reward::PromotionReward(proto::PromotionReward { + service_provider_amount, + matched_amount, + entity: Some(r.rewardable_entity.clone().into()), + })), + }, + )) + } + + rewards + } +} + +fn distribute_unallocated(coll: &mut [RewardInfo]) { + let allocated_perc = coll.iter().map(|x| x.dc_perc).sum::(); + let unallocated_perc = dec!(1) - allocated_perc; + + let maybe_matching_perc = coll + .iter() + .filter(|x| !x.promotion_rewards.is_empty()) + .map(|x| x.realized_promo_perc) + .sum::(); + + if maybe_matching_perc > unallocated_perc { + distribute_unalloc_over_limit(coll, unallocated_perc); + } else { + distribute_unalloc_under_limit(coll); + } +} + +fn distribute_unalloc_over_limit(coll: &mut [RewardInfo], unallocated_perc: Decimal) { + // NOTE: This can also allocate based off the dc_perc of each carrier. + let total = coll.iter().map(|x| x.realized_promo_perc).sum::() * dec!(100); + + for sp in coll.iter_mut() { + if sp.promotion_rewards.is_empty() { + continue; + } + let shares = sp.realized_promo_perc * dec!(100); + sp.matched_promo_perc = (shares / total) * unallocated_perc; + } +} + +fn distribute_unalloc_under_limit(coll: &mut [RewardInfo]) { + for sp in coll.iter_mut() { + if sp.promotion_rewards.is_empty() { + continue; + } + sp.matched_promo_perc = sp.realized_promo_perc + } +} + +trait DecimalRoundingExt { + fn to_u64_rounded(&self) -> u64; +} + +impl DecimalRoundingExt for Decimal { + fn to_u64_rounded(&self) -> u64 { + use rust_decimal::{prelude::ToPrimitive, RoundingStrategy}; + + self.round_dp_with_strategy(0, RoundingStrategy::ToZero) + .to_u64() + .unwrap_or(0) + } +} + +#[cfg(test)] +mod tests { + use chrono::Duration; + use file_store::promotion_reward::Entity; + use helium_proto::services::poc_mobile::{MobileRewardShare, PromotionReward}; + + use crate::service_provider::{self, promotions::rewards::PromotionRewardShare}; + + use super::*; + + use super::ServiceProviderRewardInfos; + + impl ServiceProviderRewardInfos { + fn iter_sp_rewards(&self, sp_id: i32) -> Vec { + let rewards_per_share = rewards_per_share( + self.all_transfer, + self.total_sp_allocation, + self.mobile_bone_price, + ); + let sp_rewards = self.total_sp_allocation * rewards_per_share; + + for info in self.coll.iter() { + if info.sp_id == sp_id { + let mut result = info.promo_rewards(sp_rewards, &self.reward_epoch); + result.push(info.carrier_reward(sp_rewards, &self.reward_epoch)); + return result.into_iter().map(|(_, x)| x).collect(); + } + } + vec![] + } + fn single_sp_rewards( + &self, + sp_id: i32, + ) -> (proto::PromotionReward, proto::ServiceProviderReward) { + let binding = self.iter_sp_rewards(sp_id); + let mut rewards = binding.iter(); + + let promo = rewards.next().cloned().unwrap().promotion_reward(); + let sp = rewards.next().cloned().unwrap().sp_reward(); + + (promo, sp) + } + } + + trait RewardExt { + fn promotion_reward(self) -> proto::PromotionReward; + fn sp_reward(self) -> proto::ServiceProviderReward; + } + + impl RewardExt for proto::MobileRewardShare { + fn promotion_reward(self) -> proto::PromotionReward { + match self.reward { + Some(proto::Reward::PromotionReward(promo)) => promo.clone(), + other => panic!("expected promotion reward, got {other:?}"), + } + } + + fn sp_reward(self) -> proto::ServiceProviderReward { + match self.reward { + Some(proto::Reward::ServiceProviderReward(promo)) => promo.clone(), + other => panic!("expected sp reward, got {other:?}"), + } + } + } + + #[test] + fn unallocated_reward_scaling_1() { + let sp_infos = ServiceProviderRewardInfos::new( + ServiceProviderDCSessions::from([(0, dec!(12)), (1, dec!(6))]), + ServiceProviderFunds::from([(0, 5000), (1, 5000)]), + ServiceProviderPromotions::from([ + PromotionRewardShare { + service_provider_id: 0, + rewardable_entity: Entity::SubscriberId(vec![0]), + shares: 1, + }, + PromotionRewardShare { + service_provider_id: 1, + rewardable_entity: Entity::SubscriberId(vec![1]), + shares: 1, + }, + ]), + dec!(100), + dec!(0.00001), + epoch(), + ); + + let (promo_1, sp_1) = sp_infos.single_sp_rewards(0); + assert_eq!(promo_1.service_provider_amount, 6); + assert_eq!(promo_1.matched_amount, 6); + assert_eq!(sp_1.amount, 6); + + let (promo_2, sp_2) = sp_infos.single_sp_rewards(1); + assert_eq!(promo_2.service_provider_amount, 3); + assert_eq!(promo_2.matched_amount, 3); + assert_eq!(sp_2.amount, 3); + } + + #[test] + fn unallocated_reward_scaling_2() { + let sp_infos = ServiceProviderRewardInfos::new( + ServiceProviderDCSessions::from([(0, dec!(12)), (1, dec!(6))]), + ServiceProviderFunds::from([(0, 5000), (1, 10000)]), + ServiceProviderPromotions::from([ + PromotionRewardShare { + service_provider_id: 0, + rewardable_entity: Entity::SubscriberId(vec![0]), + shares: 1, + }, + PromotionRewardShare { + service_provider_id: 1, + rewardable_entity: Entity::SubscriberId(vec![1]), + shares: 1, + }, + ]), + dec!(100), + dec!(0.00001), + epoch(), + ); + + let (promo_1, sp_1) = sp_infos.single_sp_rewards(0); + assert_eq!(promo_1.service_provider_amount, 6); + assert_eq!(promo_1.matched_amount, 6); + assert_eq!(sp_1.amount, 6); + + let (promo_2, sp_2) = sp_infos.single_sp_rewards(1); + assert_eq!(promo_2.service_provider_amount, 6); + assert_eq!(promo_2.matched_amount, 6); + assert_eq!(sp_2.amount, 0); + } + + #[test] + fn unallocated_reward_scaling_3() { + let sp_infos = ServiceProviderRewardInfos::new( + ServiceProviderDCSessions::from([(0, dec!(10)), (1, dec!(1000))]), + ServiceProviderFunds::from([(0, 10000), (1, 200)]), + ServiceProviderPromotions::from([ + PromotionRewardShare { + service_provider_id: 0, + rewardable_entity: Entity::SubscriberId(vec![0]), + shares: 1, + }, + PromotionRewardShare { + service_provider_id: 1, + rewardable_entity: Entity::SubscriberId(vec![1]), + shares: 1, + }, + ]), + dec!(2000), + dec!(0.00001), + epoch(), + ); + + let (promo_1, sp_1) = sp_infos.single_sp_rewards(0); + assert_eq!(promo_1.service_provider_amount, 10); + assert_eq!(promo_1.matched_amount, 10); + assert_eq!(sp_1.amount, 0); + + let (promo_2, sp_2) = sp_infos.single_sp_rewards(1); + assert_eq!(promo_2.service_provider_amount, 20); + assert_eq!(promo_2.matched_amount, 20); + assert_eq!(sp_2.amount, 980); + } + + #[test] + fn no_rewards_if_none_allocated() { + let sp_infos = ServiceProviderRewardInfos::new( + ServiceProviderDCSessions::from([(0, dec!(100))]), + ServiceProviderFunds::from([(0, 5000)]), + ServiceProviderPromotions::from([PromotionRewardShare { + service_provider_id: 0, + rewardable_entity: Entity::SubscriberId(vec![0]), + shares: 1, + }]), + dec!(0), + dec!(0.0001), + epoch(), + ); + + assert!(sp_infos.iter_rewards().is_empty()); + } + + #[test] + fn no_matched_rewards_if_no_unallocated() { + let total_rewards = dec!(1000); + + let sp_infos = ServiceProviderRewardInfos::new( + ServiceProviderDCSessions::from([(0, total_rewards)]), + ServiceProviderFunds::from([(0, 5000)]), + ServiceProviderPromotions::from([PromotionRewardShare { + service_provider_id: 0, + rewardable_entity: Entity::SubscriberId(vec![0]), + shares: 1, + }]), + total_rewards, + dec!(0.001), + epoch(), + ); + + let promo_rewards = sp_infos.iter_rewards().only_promotion_rewards(); + + assert!(!promo_rewards.is_empty()); + for reward in promo_rewards { + assert_eq!(reward.matched_amount, 0); + } + } + + #[test] + fn single_sp_unallocated_less_than_matched_distributed_by_shares() { + // 100 unallocated + let total_rewards = dec!(1100); + let sp_session = dec!(1000); + + let sp_infos = ServiceProviderRewardInfos::new( + ServiceProviderDCSessions::from([(0, sp_session)]), + ServiceProviderFunds::from([(0, 10000)]), // All rewards allocated to promotions + ServiceProviderPromotions::from([ + PromotionRewardShare { + service_provider_id: 0, + rewardable_entity: Entity::SubscriberId(vec![0]), + shares: 1, + }, + PromotionRewardShare { + service_provider_id: 0, + rewardable_entity: Entity::SubscriberId(vec![1]), + shares: 2, + }, + ]), + total_rewards, + dec!(0.00001), + epoch(), + ); + + let promo_rewards = sp_infos.iter_rewards().only_promotion_rewards(); + assert_eq!(2, promo_rewards.len()); + + assert_eq!(promo_rewards[0].service_provider_amount, 333); + assert_eq!(promo_rewards[0].matched_amount, 33); + // + assert_eq!(promo_rewards[1].service_provider_amount, 666); + assert_eq!(promo_rewards[1].matched_amount, 66); + } + + #[test] + fn single_sp_unallocated_more_than_matched_promotion() { + // 1,000 unallocated + let total_rewards = dec!(11_000); + let sp_session = dec!(1000); + + let sp_infos = ServiceProviderRewardInfos::new( + ServiceProviderDCSessions::from([(0, sp_session)]), + ServiceProviderFunds::from([(0, 10000)]), // All rewards allocated to promotions + ServiceProviderPromotions::from([ + PromotionRewardShare { + service_provider_id: 0, + rewardable_entity: Entity::SubscriberId(vec![0]), + shares: 1, + }, + PromotionRewardShare { + service_provider_id: 0, + rewardable_entity: Entity::SubscriberId(vec![1]), + shares: 2, + }, + ]), + total_rewards, + dec!(0.00001), + epoch(), + ); + + let promo_rewards = sp_infos.iter_rewards().only_promotion_rewards(); + assert_eq!(2, promo_rewards.len()); + + assert_eq!(promo_rewards[0].service_provider_amount, 333); + assert_eq!(promo_rewards[0].matched_amount, 333); + // + assert_eq!(promo_rewards[1].service_provider_amount, 666); + assert_eq!(promo_rewards[1].matched_amount, 666); + } + + #[test] + fn unallocated_matching_does_not_exceed_promotion() { + // 100 unallocated + let total_rewards = dec!(1100); + let sp_session = dec!(1000); + + let sp_infos = ServiceProviderRewardInfos::new( + ServiceProviderDCSessions::from([(0, sp_session)]), + ServiceProviderFunds::from([(0, 100)]), // Severely limit promotion rewards + ServiceProviderPromotions::from([ + PromotionRewardShare { + service_provider_id: 0, + rewardable_entity: Entity::SubscriberId(vec![0]), + shares: 1, + }, + PromotionRewardShare { + service_provider_id: 0, + rewardable_entity: Entity::SubscriberId(vec![1]), + shares: 2, + }, + ]), + total_rewards, + dec!(0.00001), + epoch(), + ); + + let promo_rewards = sp_infos.iter_rewards().only_promotion_rewards(); + assert_eq!(2, promo_rewards.len()); + + assert_eq!(promo_rewards[0].service_provider_amount, 3); + assert_eq!(promo_rewards[0].matched_amount, 3); + // + assert_eq!(promo_rewards[1].service_provider_amount, 6); + assert_eq!(promo_rewards[1].matched_amount, 6); + } + + fn epoch() -> Range> { + let now = Utc::now(); + now - Duration::hours(24)..now + } + + trait PromoRewardFiltersExt { + fn only_promotion_rewards(&self) -> Vec; + } + + impl PromoRewardFiltersExt for Vec<(u64, MobileRewardShare)> { + fn only_promotion_rewards(&self) -> Vec { + self.clone() + .into_iter() + .filter_map(|(_, r)| { + if let Some(proto::Reward::PromotionReward(reward)) = r.reward { + Some(reward) + } else { + None + } + }) + .collect() + } + } + + use proptest::prelude::*; + + prop_compose! { + fn arb_share()(sp_id in 0..10_i32, ent_id in 0..200u8, shares in 1..=100u64) -> PromotionRewardShare { + PromotionRewardShare { + service_provider_id: sp_id, + rewardable_entity: Entity::SubscriberId(vec![ent_id]), + shares + } + } + } + + prop_compose! { + fn arb_dc_session()( + sp_id in 0..10_i32, + // below 1 trillion + dc_session in (0..=1_000_000_000_000_u64).prop_map(Decimal::from) + ) -> (i32, Decimal) { + (sp_id, dc_session) + } + } + + prop_compose! { + fn arb_fund()(sp_id in 0..10_i32, bps in arb_bps()) -> (i32, u16) { + (sp_id, bps) + } + } + + prop_compose! { + fn arb_bps()(bps in 0..=10_000u16) -> u16 { bps } + } + + proptest! { + // #![proptest_config(ProptestConfig::with_cases(100_000))] + + #[test] + fn single_provider_does_not_overallocate( + dc_session in any::().prop_map(Decimal::from), + fund_bps in arb_bps(), + shares in prop::collection::vec(arb_share(), 0..10), + total_allocation in any::().prop_map(Decimal::from) + ) { + + let sp_infos = ServiceProviderRewardInfos::new( + ServiceProviderDCSessions::from([(0, dc_session)]), + ServiceProviderFunds::from([(0, fund_bps)]), + ServiceProviderPromotions::from(shares), + total_allocation, + dec!(0.00001), + epoch() + ); + + let total_perc= sp_infos.total_percent(); + assert!(total_perc <= dec!(1)); + + let mut allocated = dec!(0); + for (amount, _) in sp_infos.iter_rewards() { + allocated += Decimal::from(amount); + } + assert!(allocated <= total_allocation); + } + + #[test] + fn multiple_provider_does_not_overallocate( + dc_sessions in prop::collection::vec(arb_dc_session(), 0..10), + funds in prop::collection::vec(arb_fund(), 0..10), + promotions in prop::collection::vec(arb_share(), 0..100), + ) { + let epoch = epoch(); + let total_allocation = service_provider::get_scheduled_tokens(&epoch); + + let sp_infos = ServiceProviderRewardInfos::new( + ServiceProviderDCSessions::from(dc_sessions), + ServiceProviderFunds::from(funds), + ServiceProviderPromotions::from(promotions), + total_allocation, + dec!(0.00001), + epoch + ); + + let total_perc= sp_infos.total_percent(); + prop_assert!(total_perc <= dec!(1)); + + let mut allocated = dec!(0); + for (amount, _) in sp_infos.iter_rewards() { + allocated += Decimal::from(amount); + } + prop_assert!(allocated <= total_allocation); + } + + } + + impl RewardInfo { + fn total_percent(&self) -> Decimal { + self.realized_dc_perc + self.realized_promo_perc + self.matched_promo_perc + } + } + + impl ServiceProviderRewardInfos { + fn total_percent(&self) -> Decimal { + self.coll.iter().map(|x| x.total_percent()).sum() + } + } +} diff --git a/mobile_verifier/src/settings.rs b/mobile_verifier/src/settings.rs index 54a31d9dd..0ad90d158 100644 --- a/mobile_verifier/src/settings.rs +++ b/mobile_verifier/src/settings.rs @@ -25,6 +25,8 @@ pub struct Settings { pub database: db_store::Settings, pub ingest: file_store::Settings, pub data_transfer_ingest: file_store::Settings, + /// S3 ingest reading Service Provider Promotion Funds + pub promotion_ingest: file_store::Settings, pub output: file_store::Settings, /// S3 bucket from which new data sets are downloaded for oracle boosting /// assignments diff --git a/mobile_verifier/tests/integrations/common/mod.rs b/mobile_verifier/tests/integrations/common/mod.rs index 47fe1cc30..d1e73dd47 100644 --- a/mobile_verifier/tests/integrations/common/mod.rs +++ b/mobile_verifier/tests/integrations/common/mod.rs @@ -9,8 +9,9 @@ use helium_proto::services::{ mobile_config::NetworkKeyRole, poc_mobile::{ mobile_reward_share::Reward as MobileReward, radio_reward_v2, GatewayReward, - MobileRewardShare, OracleBoostingHexAssignment, OracleBoostingReportV1, RadioReward, - RadioRewardV2, ServiceProviderReward, SpeedtestAvg, SubscriberReward, UnallocatedReward, + MobileRewardShare, OracleBoostingHexAssignment, OracleBoostingReportV1, PromotionReward, + RadioReward, RadioRewardV2, ServiceProviderReward, SpeedtestAvg, SubscriberReward, + UnallocatedReward, }, }; use hex_assignments::{Assignment, HexAssignment, HexBoostData}; @@ -134,52 +135,50 @@ impl MockFileSinkReceiver { pub async fn receive_gateway_reward(&mut self) -> GatewayReward { match self.receive("receive_gateway_reward").await { - Some(mobile_reward) => { - println!("mobile_reward: {:?}", mobile_reward); - match mobile_reward.reward { - Some(MobileReward::GatewayReward(r)) => r, - _ => panic!("failed to get gateway reward"), - } - } + Some(mobile_reward) => match mobile_reward.reward { + Some(MobileReward::GatewayReward(r)) => r, + _ => panic!("failed to get gateway reward"), + }, None => panic!("failed to receive gateway reward"), } } pub async fn receive_service_provider_reward(&mut self) -> ServiceProviderReward { match self.receive("receive_service_provider_reward").await { - Some(mobile_reward) => { - println!("mobile_reward: {:?}", mobile_reward); - match mobile_reward.reward { - Some(MobileReward::ServiceProviderReward(r)) => r, - _ => panic!("failed to get service provider reward"), - } - } + Some(mobile_reward) => match mobile_reward.reward { + Some(MobileReward::ServiceProviderReward(r)) => r, + _ => panic!("failed to get service provider reward"), + }, None => panic!("failed to receive service provider reward"), } } pub async fn receive_subscriber_reward(&mut self) -> SubscriberReward { match self.receive("receive_subscriber_reward").await { - Some(mobile_reward) => { - println!("mobile_reward: {:?}", mobile_reward); - match mobile_reward.reward { - Some(MobileReward::SubscriberReward(r)) => r, - _ => panic!("failed to get subscriber reward"), - } - } + Some(mobile_reward) => match mobile_reward.reward { + Some(MobileReward::SubscriberReward(r)) => r, + _ => panic!("failed to get subscriber reward"), + }, None => panic!("failed to receive subscriber reward"), } } + pub async fn receive_promotion_reward(&mut self) -> PromotionReward { + match self.receive("receive_promotion_reward").await { + Some(mobile_reward) => match mobile_reward.reward { + Some(MobileReward::PromotionReward(r)) => r, + _ => panic!("failed to get promotion reward"), + }, + None => panic!("failed to receive promotion reward"), + } + } + pub async fn receive_unallocated_reward(&mut self) -> UnallocatedReward { match self.receive("receive_unallocated_reward").await { - Some(mobile_reward) => { - println!("mobile_reward: {:?}", mobile_reward); - match mobile_reward.reward { - Some(MobileReward::UnallocatedReward(r)) => r, - _ => panic!("failed to get unallocated reward"), - } - } + Some(mobile_reward) => match mobile_reward.reward { + Some(MobileReward::UnallocatedReward(r)) => r, + _ => panic!("failed to get unallocated reward"), + }, None => panic!("failed to receive unallocated reward"), } } diff --git a/mobile_verifier/tests/integrations/rewarder_sp_rewards.rs b/mobile_verifier/tests/integrations/rewarder_sp_rewards.rs index 9aa63f3dc..336bd57d3 100644 --- a/mobile_verifier/tests/integrations/rewarder_sp_rewards.rs +++ b/mobile_verifier/tests/integrations/rewarder_sp_rewards.rs @@ -3,6 +3,8 @@ use std::string::ToString; use async_trait::async_trait; use chrono::{DateTime, Duration as ChronoDuration, Utc}; +use file_store::promotion_reward::{self, PromotionReward}; +use helium_crypto::PublicKeyBinary; use helium_proto::{ services::poc_mobile::{ MobileRewardShare, ServiceProviderReward, UnallocatedReward, UnallocatedRewardType, @@ -12,10 +14,18 @@ use helium_proto::{ use rust_decimal::prelude::*; use rust_decimal_macros::dec; use sqlx::{PgPool, Postgres, Transaction}; +use uuid::Uuid; use crate::common::{self, MockFileSinkReceiver}; use mobile_config::client::{carrier_service_client::CarrierServiceVerifier, ClientError}; -use mobile_verifier::{data_session, reward_shares, rewarder}; +use mobile_verifier::{ + data_session, reward_shares, rewarder, + service_provider::{ + self, + promotions::{funds::save_promotion_fund, rewards::save_promotion_reward}, + ServiceProviderId, + }, +}; const HOTSPOT_1: &str = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6"; const HOTSPOT_2: &str = "11eX55faMbqZB7jzN4p67m6w7ScPMH6ubnvCjCPLh72J49PaJEL"; @@ -84,13 +94,14 @@ async fn test_service_provider_rewards(pool: PgPool) -> anyhow::Result<()> { .unwrap() .to_string() ); - assert_eq!(6000, sp_reward.amount); + assert_eq!(5_999, sp_reward.amount); assert_eq!( UnallocatedRewardType::ServiceProvider as i32, unallocated_reward.reward_type ); - assert_eq!(8_219_178_076_191, unallocated_reward.amount); + assert_eq!(8_219_178_076_192, unallocated_reward.amount); + // confirm the total rewards allocated matches expectations let expected_sum = reward_shares::get_scheduled_tokens_for_service_providers(epoch.end - epoch.start) @@ -144,6 +155,110 @@ async fn test_service_provider_rewards_invalid_sp(pool: PgPool) -> anyhow::Resul Ok(()) } +#[sqlx::test] +async fn test_service_provider_promotion_rewards(pool: PgPool) -> anyhow::Result<()> { + // Single SP has allocated shares for a few of their subscribers. + // Rewards are matched by the unallocated SP rewards for the subscribers + + let valid_sps = HashMap::from_iter([(PAYER_1.to_string(), SP_1.to_string())]); + let carrier_client = MockCarrierServiceClient::new(valid_sps); + + let now = Utc::now(); + let epoch = (now - ChronoDuration::hours(24))..now; + let (mobile_rewards_client, mut mobile_rewards) = common::create_file_sink(); + + let mut txn = pool.begin().await?; + seed_hotspot_data(epoch.end, &mut txn).await?; // DC transferred == 6,000 reward amount + seed_sp_promotion_rewards_with_random_subscribers( + PAYER_1.to_string().parse().unwrap(), + &[1, 2, 3], + &mut txn, + ) + .await?; + // promotions allocated 10.00% + seed_sp_promotion_rewards_funds(&[(0, 1500)], &mut txn).await?; + txn.commit().await?; + + let (_, rewards) = tokio::join!( + rewarder::reward_service_providers( + &pool, + &carrier_client, + &mobile_rewards_client, + &epoch, + dec!(0.00001) + ), + async move { + let mut promos = vec![ + mobile_rewards.receive_promotion_reward().await, + mobile_rewards.receive_promotion_reward().await, + mobile_rewards.receive_promotion_reward().await, + ]; + // sort by awarded amount least -> most + promos.sort_by_key(|a| a.service_provider_amount); + + let sp_reward = mobile_rewards.receive_service_provider_reward().await; + let unallocated = mobile_rewards.receive_unallocated_reward().await; + + mobile_rewards.assert_no_messages(); + + (promos, sp_reward, unallocated) + } + ); + + let (promos, sp_reward, unallocated) = rewards; + let promo_reward_1 = promos[0].clone(); + let promo_reward_2 = promos[1].clone(); + let promo_reward_3 = promos[2].clone(); + + // 1 share + assert_eq!(promo_reward_1.service_provider_amount, 1_500); + assert_eq!(promo_reward_1.matched_amount, 1_500); + + // 2 shares + assert_eq!(promo_reward_2.service_provider_amount, 3_000); + assert_eq!(promo_reward_2.matched_amount, 3_000); + + // 3 shares + assert_eq!(promo_reward_3.service_provider_amount, 4_500); + assert_eq!(promo_reward_3.matched_amount, 4_500); + + // dc_percentage * total_sp_allocation rounded down + assert_eq!(sp_reward.amount, 50_999); + + let unallocated_sp_rewards = get_unallocated_sp_rewards(&epoch); + let expected_unallocated = unallocated_sp_rewards + - 50_999 // 85% service provider rewards rounded down + - 9_000 // 15% service provider promotions + - 9_000; // matched promotion + + assert_eq!(unallocated.amount, expected_unallocated); + + // Ensure the cleanup job can run + let mut txn = pool.begin().await?; + + service_provider::db::clear_promotion_rewards(&mut txn, &Utc::now()).await?; + txn.commit().await?; + + let promos = service_provider::db::fetch_promotion_rewards( + &pool, + &carrier_client, + &(epoch.start..Utc::now()), + ) + .await?; + assert!(promos.is_empty()); + + let sp_allocations = service_provider::reward_data_sp_allocations(&pool).await?; + assert_eq!( + vec![helium_proto::ServiceProviderAllocation { + service_provider: 0, + incentive_escrow_fund_bps: 1500 + }], + sp_allocations + ); + + Ok(()) +} + async fn receive_expected_rewards( mobile_rewards: &mut MockFileSinkReceiver, ) -> anyhow::Result<(ServiceProviderReward, UnallocatedReward)> { @@ -171,7 +286,7 @@ async fn seed_hotspot_data( payer: PAYER_1.parse().unwrap(), upload_bytes: 1024 * 1000, download_bytes: 1024 * 10000, - num_dcs: 10000, + num_dcs: 10_000, received_timestamp: ts - ChronoDuration::hours(1), }; @@ -180,7 +295,7 @@ async fn seed_hotspot_data( payer: PAYER_1.parse().unwrap(), upload_bytes: 1024 * 1000, download_bytes: 1024 * 50000, - num_dcs: 50000, + num_dcs: 50_000, received_timestamp: ts - ChronoDuration::hours(2), }; @@ -198,7 +313,7 @@ async fn seed_hotspot_data_invalid_sp( payer: PAYER_1.parse().unwrap(), upload_bytes: 1024 * 1000, download_bytes: 1024 * 10000, - num_dcs: 10000, + num_dcs: 10_000, received_timestamp: ts - ChronoDuration::hours(2), }; @@ -207,7 +322,7 @@ async fn seed_hotspot_data_invalid_sp( payer: PAYER_2.parse().unwrap(), upload_bytes: 1024 * 1000, download_bytes: 1024 * 50000, - num_dcs: 50000, + num_dcs: 50_000, received_timestamp: ts - ChronoDuration::hours(2), }; @@ -215,3 +330,48 @@ async fn seed_hotspot_data_invalid_sp( data_session_2.save(txn).await?; Ok(()) } + +// Service Provider promotion rewards are verified during ingest. When you write +// directly to the database, the assumption is the entity and the payer are +// valid. +async fn seed_sp_promotion_rewards_with_random_subscribers( + payer: PublicKeyBinary, + sub_shares: &[u64], + txn: &mut Transaction<'_, Postgres>, +) -> anyhow::Result<()> { + for &shares in sub_shares { + save_promotion_reward( + txn, + &PromotionReward { + entity: promotion_reward::Entity::SubscriberId(Uuid::new_v4().into()), + shares, + timestamp: Utc::now() - chrono::Duration::hours(2), + received_timestamp: Utc::now(), + carrier_pub_key: payer.clone(), + signature: vec![], + }, + ) + .await?; + } + + Ok(()) +} + +async fn seed_sp_promotion_rewards_funds( + sp_fund_allocations: &[(ServiceProviderId, u16)], + txn: &mut Transaction<'_, Postgres>, +) -> anyhow::Result<()> { + for (sp_id, basis_points) in sp_fund_allocations { + save_promotion_fund(txn, *sp_id, *basis_points).await?; + } + + Ok(()) +} + +// Helper for turning Decimal -> u64 to compare against output rewards +fn get_unallocated_sp_rewards(epoch: &std::ops::Range>) -> u64 { + reward_shares::get_scheduled_tokens_for_service_providers(epoch.end - epoch.start) + .round_dp_with_strategy(0, RoundingStrategy::ToZero) + .to_u64() + .unwrap_or(0) +} diff --git a/promotion_fund/Cargo.toml b/promotion_fund/Cargo.toml index eedd3135f..d6d3418dd 100644 --- a/promotion_fund/Cargo.toml +++ b/promotion_fund/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "promotion_fund" +name = "promotion-fund" version = "0.1.0" description = "Service Provider promotion fund tracking for the Helium Network" authors.workspace = true