From 445ab74b9da88b8cf3904c50d74900f448ae02fe Mon Sep 17 00:00:00 2001 From: "Matthew M. Keeler" Date: Fri, 19 Jul 2024 09:47:05 -0400 Subject: [PATCH] feat: Add support for migrations (#90) --- .github/workflows/ci.yml | 6 +- contract-tests/Cargo.toml | 2 + contract-tests/src/client_entity.rs | 168 ++- contract-tests/src/command_params.rs | 44 +- contract-tests/src/main.rs | 34 +- launchdarkly-server-sdk/Cargo.toml | 3 + launchdarkly-server-sdk/src/client.rs | 764 +++++++++++- launchdarkly-server-sdk/src/config.rs | 6 +- .../src/events/dispatcher.rs | 144 ++- launchdarkly-server-sdk/src/events/event.rs | 194 ++- .../src/events/processor.rs | 7 +- launchdarkly-server-sdk/src/lib.rs | 6 + .../src/migrations/migrator.rs | 1036 +++++++++++++++++ launchdarkly-server-sdk/src/migrations/mod.rs | 111 ++ .../src/migrations/tracker.rs | 404 +++++++ launchdarkly-server-sdk/src/sampler.rs | 57 + launchdarkly-server-sdk/src/test_common.rs | 35 + 17 files changed, 2954 insertions(+), 67 deletions(-) create mode 100644 launchdarkly-server-sdk/src/migrations/migrator.rs create mode 100644 launchdarkly-server-sdk/src/migrations/mod.rs create mode 100644 launchdarkly-server-sdk/src/migrations/tracker.rs create mode 100644 launchdarkly-server-sdk/src/sampler.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e6b222e..57d7cca 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,11 +1,11 @@ name: Run CI on: push: - branches: [ main ] + branches: [ main, 'feat/**' ] paths-ignore: - '**.md' # Do not need to run CI for markdown changes. pull_request: - branches: [ main ] + branches: [ main, 'feat/**' ] paths-ignore: - '**.md' @@ -57,4 +57,4 @@ jobs: run: sudo apt-get update && sudo apt-get install -y musl-tools - name: Build - run: TARGET_CC=musl-gcc RUSTFLAGS="-C linker=musl-gcc" cargo build --release --target=x86_64-unknown-linux-musl + run: TARGET_CC=musl-gcc RUSTFLAGS="-C linker=musl-gcc" cargo build --release --target=x86_64-unknown-linux-musl -p launchdarkly-server-sdk diff --git a/contract-tests/Cargo.toml b/contract-tests/Cargo.toml index 43431d7..a27aca0 100644 --- a/contract-tests/Cargo.toml +++ b/contract-tests/Cargo.toml @@ -17,6 +17,8 @@ futures = "0.3.12" hyper = { version = "0.14.19", features = ["client"] } hyper-rustls = { version = "0.24.1" , optional = true, features = ["http2"]} hyper-tls = { version = "0.5.0", optional = true } +reqwest = { version = "0.12.4", features = ["default", "blocking", "json"] } +async-mutex = "1.4.0" [features] default = ["rustls"] diff --git a/contract-tests/src/client_entity.rs b/contract-tests/src/client_entity.rs index d0435a0..4b3b158 100644 --- a/contract-tests/src/client_entity.rs +++ b/contract-tests/src/client_entity.rs @@ -1,4 +1,8 @@ -use launchdarkly_server_sdk::{Context, ContextBuilder, MultiContextBuilder, Reference}; +use futures::future::FutureExt; +use launchdarkly_server_sdk::{ + Context, ContextBuilder, MigratorBuilder, MultiContextBuilder, Reference, +}; +use std::sync::Arc; use std::time::Duration; const DEFAULT_POLLING_BASE_URL: &str = "https://sdk.launchdarkly.com"; @@ -12,7 +16,8 @@ use launchdarkly_server_sdk::{ }; use crate::command_params::{ - ContextBuildParams, ContextConvertParams, ContextParam, ContextResponse, SecureModeHashResponse, + ContextBuildParams, ContextConvertParams, ContextParam, ContextResponse, + MigrationOperationResponse, MigrationVariationResponse, SecureModeHashResponse, }; use crate::HttpsConnector; use crate::{ @@ -24,7 +29,7 @@ use crate::{ }; pub struct ClientEntity { - client: Client, + client: Arc, } impl ClientEntity { @@ -131,10 +136,15 @@ impl ClientEntity { client.start_with_default_executor(); client.wait_for_initialization(Duration::from_secs(5)).await; - Ok(Self { client }) + Ok(Self { + client: Arc::new(client), + }) } - pub fn do_command(&self, command: CommandParams) -> Result, String> { + pub async fn do_command( + &self, + command: CommandParams, + ) -> Result, String> { match command.command.as_str() { "evaluate" => Ok(Some(CommandResponse::EvaluateFlag( self.evaluate(command.evaluate.ok_or("Evaluate params should be set")?), @@ -211,6 +221,132 @@ impl ClientEntity { }, ))) } + "migrationVariation" => { + let params = command + .migration_variation + .ok_or("migrationVariation params should be set")?; + + let (stage, _) = self.client.migration_variation( + ¶ms.context, + ¶ms.key, + params.default_stage, + ); + + Ok(Some(CommandResponse::MigrationVariation( + MigrationVariationResponse { result: stage }, + ))) + } + "migrationOperation" => { + let params = command + .migration_operation + .ok_or("migrationOperation params should be set")?; + + let mut builder = MigratorBuilder::new(self.client.clone()); + + builder = builder + .read_execution_order(params.read_execution_order) + .track_errors(params.track_errors) + .track_latency(params.track_latency) + .read( + |payload: &Option| { + let old_endpoint = params.old_endpoint.clone(); + async move { + let result = send_payload(&old_endpoint, payload.clone()).await; + match result { + Ok(r) => Ok(Some(r)), + Err(e) => Err(e), + } + } + .boxed() + }, + |payload| { + let new_endpoint = params.new_endpoint.clone(); + async move { + let result = send_payload(&new_endpoint, payload.clone()).await; + match result { + Ok(r) => Ok(Some(r)), + Err(e) => Err(e), + } + } + .boxed() + }, + if params.track_consistency { + Some(|lhs, rhs| lhs == rhs) + } else { + None + }, + ) + .write( + |payload| { + let old_endpoint = params.old_endpoint.clone(); + async move { + let result = send_payload(&old_endpoint, payload.clone()).await; + match result { + Ok(r) => Ok(Some(r)), + Err(e) => Err(e), + } + } + .boxed() + }, + |payload| { + let new_endpoint = params.new_endpoint.clone(); + async move { + let result = send_payload(&new_endpoint, payload.clone()).await; + match result { + Ok(r) => Ok(Some(r)), + Err(e) => Err(e), + } + } + .boxed() + }, + ); + + let mut migrator = builder.build().expect("builder failed"); + match params.operation { + launchdarkly_server_sdk::Operation::Read => { + let result = migrator + .read( + ¶ms.context, + params.key, + params.default_stage, + params.payload, + ) + .await; + + let payload = match result.result { + Ok(payload) => payload.unwrap_or_else(|| "success".into()), + Err(e) => e.to_string(), + }; + + Ok(Some(CommandResponse::MigrationOperation( + MigrationOperationResponse { result: payload }, + ))) + } + launchdarkly_server_sdk::Operation::Write => { + let result = migrator + .write( + ¶ms.context, + params.key, + params.default_stage, + params.payload, + ) + .await; + + let payload = match result.authoritative.result { + Ok(payload) => payload.unwrap_or_else(|| "success".into()), + Err(e) => e.to_string(), + }; + + Ok(Some(CommandResponse::MigrationOperation( + MigrationOperationResponse { result: payload }, + ))) + } + _ => Err(format!( + "Invalid operation requested: {:?}", + params.operation + )), + } + } command => Err(format!("Invalid command requested: {}", command)), } } @@ -430,3 +566,25 @@ impl Drop for ClientEntity { self.client.close(); } } + +async fn send_payload(endpoint: &str, payload: Option) -> Result +where +{ + let client = reqwest::Client::new(); + let response = client + .post(endpoint) + .body(payload.unwrap_or_default()) + .send() + .await + .expect("sending request to SDK test harness"); + + if response.status().is_success() { + let body = response.text().await.expect("read harness response body"); + Ok(body.to_string()) + } else { + Err(format!( + "requested failed with status code {}", + response.status() + )) + } +} diff --git a/contract-tests/src/command_params.rs b/contract-tests/src/command_params.rs index 06c5279..a02819d 100644 --- a/contract-tests/src/command_params.rs +++ b/contract-tests/src/command_params.rs @@ -1,4 +1,6 @@ -use launchdarkly_server_sdk::{AttributeValue, Context, FlagDetail, FlagValue, Reason}; +use launchdarkly_server_sdk::{ + AttributeValue, Context, ExecutionOrder, FlagDetail, FlagValue, Operation, Reason, Stage, +}; use serde::{self, Deserialize, Serialize}; use std::collections::HashMap; @@ -9,6 +11,8 @@ pub enum CommandResponse { EvaluateAll(EvaluateAllFlagsResponse), ContextBuildOrConvert(ContextResponse), SecureModeHash(SecureModeHashResponse), + MigrationVariation(MigrationVariationResponse), + MigrationOperation(MigrationOperationResponse), } #[derive(Deserialize, Debug)] @@ -22,6 +26,8 @@ pub struct CommandParams { pub context_build: Option, pub context_convert: Option, pub secure_mode_hash: Option, + pub migration_variation: Option, + pub migration_operation: Option, } #[derive(Deserialize, Debug)] @@ -130,3 +136,39 @@ pub struct SecureModeHashParams { pub struct SecureModeHashResponse { pub result: String, } + +#[derive(Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct MigrationVariationParams { + pub key: String, + pub context: Context, + pub default_stage: Stage, +} + +#[derive(Serialize, Debug, Clone)] +#[serde(rename_all = "camelCase")] +pub struct MigrationVariationResponse { + pub result: Stage, +} + +#[derive(Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct MigrationOperationParams { + pub key: String, + pub context: Context, + pub default_stage: Stage, + pub read_execution_order: ExecutionOrder, + pub operation: Operation, + pub old_endpoint: String, + pub new_endpoint: String, + pub payload: Option, + pub track_latency: bool, + pub track_errors: bool, + pub track_consistency: bool, +} + +#[derive(Serialize, Debug, Clone)] +#[serde(rename_all = "camelCase")] +pub struct MigrationOperationResponse { + pub result: String, +} diff --git a/contract-tests/src/main.rs b/contract-tests/src/main.rs index c72168b..510fec8 100644 --- a/contract-tests/src/main.rs +++ b/contract-tests/src/main.rs @@ -4,13 +4,14 @@ mod command_params; use crate::command_params::CommandParams; use actix_web::error::{ErrorBadRequest, ErrorInternalServerError}; use actix_web::{web, App, HttpRequest, HttpResponse, HttpServer, Responder, Result}; +use async_mutex::Mutex; use client_entity::ClientEntity; use futures::executor; use hyper::client::HttpConnector; use launchdarkly_server_sdk::Reference; use serde::{self, Deserialize, Serialize}; use std::collections::{HashMap, HashSet}; -use std::sync::{mpsc, Mutex}; +use std::sync::mpsc; use std::thread; #[derive(Serialize)] @@ -106,6 +107,8 @@ async fn status() -> impl Responder { "inline-context".to_string(), "anonymous-redaction".to_string(), "omit-anonymous-contexts".to_string(), + "migrations".to_string(), + "event-sampling".to_string(), ], }) } @@ -132,17 +135,8 @@ async fn create_client( Err(e) => return HttpResponse::InternalServerError().body(format!("{}", e)), }; - let mut counter = match app_state.counter.lock() { - Ok(c) => c, - Err(_) => return HttpResponse::InternalServerError().body("Unable to retrieve counter"), - }; - - let mut entities = match app_state.client_entities.lock() { - Ok(h) => h, - Err(_) => { - return HttpResponse::InternalServerError().body("Unable to lock client_entities") - } - }; + let mut counter = app_state.counter.lock().await; + let mut entities = app_state.client_entities.lock().await; *counter += 1; let client_resource = match req.url_for("client_path", [counter.to_string()]) { @@ -171,10 +165,7 @@ async fn do_command( let client_id = client_id.parse::().map_err(ErrorInternalServerError)?; - let entities = app_state - .client_entities - .lock() - .expect("Client entities cannot be locked"); + let entities = app_state.client_entities.lock().await; let entity = entities .get(&client_id) @@ -182,6 +173,7 @@ async fn do_command( let result = entity .do_command(command_params.into_inner()) + .await .map_err(ErrorBadRequest)?; match result { @@ -197,14 +189,8 @@ async fn stop_client(req: HttpRequest, app_state: web::Data) -> HttpRe Err(_) => return HttpResponse::BadRequest().body("Unable to parse client id"), }; - match app_state.client_entities.lock() { - Ok(mut entities) => { - entities.remove(&client_id); - } - Err(_) => { - return HttpResponse::InternalServerError().body("Unable to retrieve handles") - } - }; + let mut entities = app_state.client_entities.lock().await; + entities.remove(&client_id); HttpResponse::NoContent().finish() } else { diff --git a/launchdarkly-server-sdk/Cargo.toml b/launchdarkly-server-sdk/Cargo.toml index d8316bf..0ed459e 100644 --- a/launchdarkly-server-sdk/Cargo.toml +++ b/launchdarkly-server-sdk/Cargo.toml @@ -34,6 +34,7 @@ moka = { version = "0.12.1", features = ["sync"] } uuid = {version = "1.2.2", features = ["v4"] } hyper = { version = "0.14.19", features = ["client", "http1", "http2", "tcp"] } hyper-rustls = { version = "0.24.1" , optional = true} +rand = "0.8" [dev-dependencies] maplit = "1.0.1" @@ -43,6 +44,8 @@ tokio = { version = "1.17.0", features = ["macros", "time"] } test-case = "3.2.1" mockito = "1.2.0" assert-json-diff = "2.0.2" +async-std = "1.12.0" +reqwest = { version = "0.12.4", features = ["json"] } [features] default = ["rustls"] diff --git a/launchdarkly-server-sdk/src/client.rs b/launchdarkly-server-sdk/src/client.rs index 77b276b..43c36ca 100644 --- a/launchdarkly-server-sdk/src/client.rs +++ b/launchdarkly-server-sdk/src/client.rs @@ -2,7 +2,7 @@ use eval::Context; use parking_lot::RwLock; use std::io; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::Duration; use tokio::runtime::Runtime; @@ -22,6 +22,7 @@ use crate::events::event::EventFactory; use crate::events::event::InputEvent; use crate::events::processor::EventProcessor; use crate::events::processor_builders::BuildError as EventProcessorError; +use crate::{MigrationOpTracker, Stage}; struct EventsScope { disabled: bool, @@ -606,7 +607,9 @@ impl Client { flag_key: &str, default: T, ) -> Detail { - self.variation_internal(context, flag_key, default, &self.events_with_reasons) + let (detail, _) = + self.variation_internal(context, flag_key, default, &self.events_with_reasons); + detail } /// This is a generic function which returns the value of a feature flag for a given context. @@ -625,9 +628,37 @@ impl Client { flag_key: &str, default: T, ) -> FlagValue { - self.variation_internal(context, flag_key, default, &self.events_default) - .value - .unwrap() + let (detail, _) = self.variation_internal(context, flag_key, default, &self.events_default); + detail.value.unwrap() + } + + /// This method returns the migration stage of the migration feature flag for the given + /// evaluation context. + /// + /// This method returns the default stage if there is an error or the flag does not exist. + pub fn migration_variation( + &self, + context: &Context, + flag_key: &str, + default_stage: Stage, + ) -> (Stage, Arc>) { + let (detail, flag) = + self.variation_internal(context, flag_key, default_stage, &self.events_default); + + let migration_detail = + detail.try_map(|v| v.try_into().ok(), default_stage, eval::Error::WrongType); + let tracker = MigrationOpTracker::new( + flag_key.into(), + flag, + context.clone(), + migration_detail.clone(), + default_stage, + ); + + ( + migration_detail.value.unwrap_or(default_stage), + Arc::new(Mutex::new(tracker)), + ) } /// Reports that a context has performed an event. @@ -703,15 +734,51 @@ impl Client { Ok(()) } + /// Tracks the results of a migrations operation. This event includes measurements which can be + /// used to enhance the observability of a migration within the LaunchDarkly UI. + /// + /// This event should be generated through [crate::MigrationOpTracker]. If you are using the + /// [crate::Migrator] to handle migrations, this event will be created and emitted + /// automatically. + pub fn track_migration_op(&self, tracker: Arc>) { + if self.events_default.disabled { + return; + } + + match tracker.lock() { + Ok(tracker) => { + let event = tracker.build(); + match event { + Ok(event) => { + self.send_internal( + self.events_default.event_factory.new_migration_op(event), + ); + } + Err(e) => error!( + "Failed to build migration event, no event will be sent: {}", + e + ), + } + } + Err(e) => error!( + "Failed to lock migration tracker, no event will be sent: {}", + e + ), + } + } + fn variation_internal + Clone>( &self, context: &Context, flag_key: &str, default: T, events_scope: &EventsScope, - ) -> Detail { + ) -> (Detail, Option) { if self.offline { - return Detail::err_default(eval::Error::ClientNotReady, default.into()); + return ( + Detail::err_default(eval::Error::ClientNotReady, default.into()), + None, + ); } let (flag, result) = match self.initialized() { @@ -762,7 +829,7 @@ impl Client { self.send_internal(event); } - result + (result, flag) } fn send_internal(&self, event: InputEvent) { @@ -774,6 +841,7 @@ impl Client { mod tests { use crossbeam_channel::Receiver; use eval::{ContextBuilder, MultiContextBuilder}; + use futures::FutureExt; use hyper::client::HttpConnector; use launchdarkly_server_sdk_evaluation::Reason; use std::collections::HashMap; @@ -787,9 +855,10 @@ mod tests { use crate::events::processor_builders::EventProcessorBuilder; use crate::stores::store_types::{PatchTarget, StorageItem}; use crate::test_common::{ - self, basic_flag, basic_flag_with_prereq, basic_int_flag, basic_off_flag, + self, basic_flag, basic_flag_with_prereq, basic_int_flag, basic_migration_flag, + basic_off_flag, }; - use crate::ConfigBuilder; + use crate::{ConfigBuilder, MigratorBuilder, Operation, Origin}; use test_case::test_case; use super::*; @@ -1525,6 +1594,681 @@ mod tests { Ok(()) } + #[test] + fn migration_handles_flag_not_found() { + let (client, _event_rx) = make_mocked_client(); + client.start_with_default_executor(); + + let context = ContextBuilder::new("bob") + .build() + .expect("Failed to create context"); + + let (stage, _tracker) = + client.migration_variation(&context, "non-existent-flag-key", Stage::Off); + + assert_eq!(stage, Stage::Off); + } + + #[test] + fn migration_uses_non_migration_flag() { + let (client, _event_rx) = make_mocked_client(); + client.start_with_default_executor(); + client + .data_store + .write() + .upsert( + "boolean-flag", + PatchTarget::Flag(StorageItem::Item(basic_flag("boolean-flag"))), + ) + .expect("patch should apply"); + + let context = ContextBuilder::new("bob") + .build() + .expect("Failed to create context"); + + let (stage, _tracker) = client.migration_variation(&context, "boolean-flag", Stage::Off); + + assert_eq!(stage, Stage::Off); + } + + #[test_case(Stage::Off)] + #[test_case(Stage::DualWrite)] + #[test_case(Stage::Shadow)] + #[test_case(Stage::Live)] + #[test_case(Stage::Rampdown)] + #[test_case(Stage::Complete)] + fn migration_can_determine_correct_stage_from_flag(stage: Stage) { + let (client, _event_rx) = make_mocked_client(); + client.start_with_default_executor(); + client + .data_store + .write() + .upsert( + "stage-flag", + PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))), + ) + .expect("patch should apply"); + + let context = ContextBuilder::new("bob") + .build() + .expect("Failed to create context"); + + let (evaluated_stage, _tracker) = + client.migration_variation(&context, "stage-flag", Stage::Off); + + assert_eq!(evaluated_stage, stage); + } + + #[tokio::test] + async fn migration_tracks_invoked_correctly() { + migration_tracks_invoked_correctly_driver(Stage::Off, Operation::Read, vec![Origin::Old]) + .await; + migration_tracks_invoked_correctly_driver( + Stage::DualWrite, + Operation::Read, + vec![Origin::Old], + ) + .await; + migration_tracks_invoked_correctly_driver( + Stage::Shadow, + Operation::Read, + vec![Origin::Old, Origin::New], + ) + .await; + migration_tracks_invoked_correctly_driver( + Stage::Live, + Operation::Read, + vec![Origin::Old, Origin::New], + ) + .await; + migration_tracks_invoked_correctly_driver( + Stage::Rampdown, + Operation::Read, + vec![Origin::New], + ) + .await; + migration_tracks_invoked_correctly_driver( + Stage::Complete, + Operation::Read, + vec![Origin::New], + ) + .await; + migration_tracks_invoked_correctly_driver(Stage::Off, Operation::Write, vec![Origin::Old]) + .await; + migration_tracks_invoked_correctly_driver( + Stage::DualWrite, + Operation::Write, + vec![Origin::Old, Origin::New], + ) + .await; + migration_tracks_invoked_correctly_driver( + Stage::Shadow, + Operation::Write, + vec![Origin::Old, Origin::New], + ) + .await; + migration_tracks_invoked_correctly_driver( + Stage::Live, + Operation::Write, + vec![Origin::Old, Origin::New], + ) + .await; + migration_tracks_invoked_correctly_driver( + Stage::Rampdown, + Operation::Write, + vec![Origin::Old, Origin::New], + ) + .await; + migration_tracks_invoked_correctly_driver( + Stage::Complete, + Operation::Write, + vec![Origin::New], + ) + .await; + } + + async fn migration_tracks_invoked_correctly_driver( + stage: Stage, + operation: Operation, + origins: Vec, + ) { + let (client, event_rx) = make_mocked_client(); + let client = Arc::new(client); + client.start_with_default_executor(); + client + .data_store + .write() + .upsert( + "stage-flag", + PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))), + ) + .expect("patch should apply"); + + let mut migrator = MigratorBuilder::new(client.clone()) + .read( + |_| async move { Ok(serde_json::Value::Null) }.boxed(), + |_| async move { Ok(serde_json::Value::Null) }.boxed(), + Some(|_, _| true), + ) + .write( + |_| async move { Ok(serde_json::Value::Null) }.boxed(), + |_| async move { Ok(serde_json::Value::Null) }.boxed(), + ) + .build() + .expect("migrator should build"); + + let context = ContextBuilder::new("bob") + .build() + .expect("Failed to create context"); + + if let Operation::Read = operation { + migrator + .read( + &context, + "stage-flag".into(), + Stage::Off, + serde_json::Value::Null, + ) + .await; + } else { + migrator + .write( + &context, + "stage-flag".into(), + Stage::Off, + serde_json::Value::Null, + ) + .await; + } + + client.flush(); + client.close(); + + let events = event_rx.iter().collect::>(); + assert_eq!(events.len(), 3); + match &events[1] { + OutputEvent::MigrationOp(event) => { + assert!(event.invoked.len() == origins.len()); + assert!(event.invoked.iter().all(|i| origins.contains(i))); + } + _ => panic!("Expected migration event"), + } + } + + #[tokio::test] + async fn migration_tracks_latency() { + migration_tracks_latency_driver(Stage::Off, Operation::Read, vec![Origin::Old]).await; + migration_tracks_latency_driver(Stage::DualWrite, Operation::Read, vec![Origin::Old]).await; + migration_tracks_latency_driver( + Stage::Shadow, + Operation::Read, + vec![Origin::Old, Origin::New], + ) + .await; + migration_tracks_latency_driver( + Stage::Live, + Operation::Read, + vec![Origin::Old, Origin::New], + ) + .await; + migration_tracks_latency_driver(Stage::Rampdown, Operation::Read, vec![Origin::New]).await; + migration_tracks_latency_driver(Stage::Complete, Operation::Read, vec![Origin::New]).await; + migration_tracks_latency_driver(Stage::Off, Operation::Write, vec![Origin::Old]).await; + migration_tracks_latency_driver( + Stage::DualWrite, + Operation::Write, + vec![Origin::Old, Origin::New], + ) + .await; + migration_tracks_latency_driver( + Stage::Shadow, + Operation::Write, + vec![Origin::Old, Origin::New], + ) + .await; + migration_tracks_latency_driver( + Stage::Live, + Operation::Write, + vec![Origin::Old, Origin::New], + ) + .await; + migration_tracks_latency_driver( + Stage::Rampdown, + Operation::Write, + vec![Origin::Old, Origin::New], + ) + .await; + migration_tracks_latency_driver(Stage::Complete, Operation::Write, vec![Origin::New]).await; + } + + async fn migration_tracks_latency_driver( + stage: Stage, + operation: Operation, + origins: Vec, + ) { + let (client, event_rx) = make_mocked_client(); + let client = Arc::new(client); + client.start_with_default_executor(); + client + .data_store + .write() + .upsert( + "stage-flag", + PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))), + ) + .expect("patch should apply"); + + let mut migrator = MigratorBuilder::new(client.clone()) + .track_latency(true) + .read( + |_| { + async move { + async_std::task::sleep(Duration::from_millis(100)).await; + Ok(serde_json::Value::Null) + } + .boxed() + }, + |_| { + async move { + async_std::task::sleep(Duration::from_millis(100)).await; + Ok(serde_json::Value::Null) + } + .boxed() + }, + Some(|_, _| true), + ) + .write( + |_| { + async move { + async_std::task::sleep(Duration::from_millis(100)).await; + Ok(serde_json::Value::Null) + } + .boxed() + }, + |_| { + async move { + async_std::task::sleep(Duration::from_millis(100)).await; + Ok(serde_json::Value::Null) + } + .boxed() + }, + ) + .build() + .expect("migrator should build"); + + let context = ContextBuilder::new("bob") + .build() + .expect("Failed to create context"); + + if let Operation::Read = operation { + migrator + .read( + &context, + "stage-flag".into(), + Stage::Off, + serde_json::Value::Null, + ) + .await; + } else { + migrator + .write( + &context, + "stage-flag".into(), + Stage::Off, + serde_json::Value::Null, + ) + .await; + } + + client.flush(); + client.close(); + + let events = event_rx.iter().collect::>(); + assert_eq!(events.len(), 3); + match &events[1] { + OutputEvent::MigrationOp(event) => { + assert!(event.latency.len() == origins.len()); + assert!(event + .latency + .values() + .all(|l| l > &Duration::from_millis(100))); + } + _ => panic!("Expected migration event"), + } + } + + #[tokio::test] + async fn migration_tracks_read_errors() { + migration_tracks_read_errors_driver(Stage::Off, vec![Origin::Old]).await; + migration_tracks_read_errors_driver(Stage::DualWrite, vec![Origin::Old]).await; + migration_tracks_read_errors_driver(Stage::Shadow, vec![Origin::Old, Origin::New]).await; + migration_tracks_read_errors_driver(Stage::Live, vec![Origin::Old, Origin::New]).await; + migration_tracks_read_errors_driver(Stage::Rampdown, vec![Origin::New]).await; + migration_tracks_read_errors_driver(Stage::Complete, vec![Origin::New]).await; + } + + async fn migration_tracks_read_errors_driver(stage: Stage, origins: Vec) { + let (client, event_rx) = make_mocked_client(); + let client = Arc::new(client); + client.start_with_default_executor(); + client + .data_store + .write() + .upsert( + "stage-flag", + PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))), + ) + .expect("patch should apply"); + + let mut migrator = MigratorBuilder::new(client.clone()) + .track_latency(true) + .read( + |_| async move { Err("fail".into()) }.boxed(), + |_| async move { Err("fail".into()) }.boxed(), + Some(|_: &String, _: &String| true), + ) + .write( + |_| async move { Err("fail".into()) }.boxed(), + |_| async move { Err("fail".into()) }.boxed(), + ) + .build() + .expect("migrator should build"); + + let context = ContextBuilder::new("bob") + .build() + .expect("Failed to create context"); + + migrator + .read( + &context, + "stage-flag".into(), + Stage::Off, + serde_json::Value::Null, + ) + .await; + client.flush(); + client.close(); + + let events = event_rx.iter().collect::>(); + assert_eq!(events.len(), 3); + match &events[1] { + OutputEvent::MigrationOp(event) => { + assert!(event.errors.len() == origins.len()); + assert!(event.errors.iter().all(|i| origins.contains(i))); + } + _ => panic!("Expected migration event"), + } + } + + #[tokio::test] + async fn migration_tracks_authoritative_write_errors() { + migration_tracks_authoritative_write_errors_driver(Stage::Off, vec![Origin::Old]).await; + migration_tracks_authoritative_write_errors_driver(Stage::DualWrite, vec![Origin::Old]) + .await; + migration_tracks_authoritative_write_errors_driver(Stage::Shadow, vec![Origin::Old]).await; + migration_tracks_authoritative_write_errors_driver(Stage::Live, vec![Origin::New]).await; + migration_tracks_authoritative_write_errors_driver(Stage::Rampdown, vec![Origin::New]) + .await; + migration_tracks_authoritative_write_errors_driver(Stage::Complete, vec![Origin::New]) + .await; + } + + async fn migration_tracks_authoritative_write_errors_driver( + stage: Stage, + origins: Vec, + ) { + let (client, event_rx) = make_mocked_client(); + let client = Arc::new(client); + client.start_with_default_executor(); + client + .data_store + .write() + .upsert( + "stage-flag", + PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))), + ) + .expect("patch should apply"); + + let mut migrator = MigratorBuilder::new(client.clone()) + .track_latency(true) + .read( + |_| async move { Ok(serde_json::Value::Null) }.boxed(), + |_| async move { Ok(serde_json::Value::Null) }.boxed(), + None, + ) + .write( + |_| async move { Err("fail".into()) }.boxed(), + |_| async move { Err("fail".into()) }.boxed(), + ) + .build() + .expect("migrator should build"); + + let context = ContextBuilder::new("bob") + .build() + .expect("Failed to create context"); + + migrator + .write( + &context, + "stage-flag".into(), + Stage::Off, + serde_json::Value::Null, + ) + .await; + + client.flush(); + client.close(); + + let events = event_rx.iter().collect::>(); + assert_eq!(events.len(), 3); + match &events[1] { + OutputEvent::MigrationOp(event) => { + assert!(event.errors.len() == origins.len()); + assert!(event.errors.iter().all(|i| origins.contains(i))); + } + _ => panic!("Expected migration event"), + } + } + + #[tokio::test] + async fn migration_tracks_nonauthoritative_write_errors() { + migration_tracks_nonauthoritative_write_errors_driver( + Stage::DualWrite, + false, + true, + vec![Origin::New], + ) + .await; + migration_tracks_nonauthoritative_write_errors_driver( + Stage::Shadow, + false, + true, + vec![Origin::New], + ) + .await; + migration_tracks_nonauthoritative_write_errors_driver( + Stage::Live, + true, + false, + vec![Origin::Old], + ) + .await; + migration_tracks_nonauthoritative_write_errors_driver( + Stage::Rampdown, + true, + false, + vec![Origin::Old], + ) + .await; + } + + async fn migration_tracks_nonauthoritative_write_errors_driver( + stage: Stage, + fail_old: bool, + fail_new: bool, + origins: Vec, + ) { + let (client, event_rx) = make_mocked_client(); + let client = Arc::new(client); + client.start_with_default_executor(); + client + .data_store + .write() + .upsert( + "stage-flag", + PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))), + ) + .expect("patch should apply"); + + let mut migrator = MigratorBuilder::new(client.clone()) + .track_latency(true) + .read( + |_| async move { Ok(serde_json::Value::Null) }.boxed(), + |_| async move { Ok(serde_json::Value::Null) }.boxed(), + None, + ) + .write( + move |_| { + async move { + if fail_old { + Err("fail".into()) + } else { + Ok(serde_json::Value::Null) + } + } + .boxed() + }, + move |_| { + async move { + if fail_new { + Err("fail".into()) + } else { + Ok(serde_json::Value::Null) + } + } + .boxed() + }, + ) + .build() + .expect("migrator should build"); + + let context = ContextBuilder::new("bob") + .build() + .expect("Failed to create context"); + + migrator + .write( + &context, + "stage-flag".into(), + Stage::Off, + serde_json::Value::Null, + ) + .await; + + client.flush(); + client.close(); + + let events = event_rx.iter().collect::>(); + assert_eq!(events.len(), 3); + match &events[1] { + OutputEvent::MigrationOp(event) => { + assert!(event.errors.len() == origins.len()); + assert!(event.errors.iter().all(|i| origins.contains(i))); + } + _ => panic!("Expected migration event"), + } + } + + #[tokio::test] + async fn migration_tracks_consistency() { + migration_tracks_consistency_driver(Stage::Shadow, "same", "same", true).await; + migration_tracks_consistency_driver(Stage::Shadow, "same", "different", false).await; + migration_tracks_consistency_driver(Stage::Live, "same", "same", true).await; + migration_tracks_consistency_driver(Stage::Live, "same", "different", false).await; + } + + async fn migration_tracks_consistency_driver( + stage: Stage, + old_return: &'static str, + new_return: &'static str, + expected_consistency: bool, + ) { + let (client, event_rx) = make_mocked_client(); + let client = Arc::new(client); + client.start_with_default_executor(); + client + .data_store + .write() + .upsert( + "stage-flag", + PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))), + ) + .expect("patch should apply"); + + let mut migrator = MigratorBuilder::new(client.clone()) + .track_latency(true) + .read( + |_| { + async move { + async_std::task::sleep(Duration::from_millis(100)).await; + Ok(serde_json::Value::String(old_return.to_string())) + } + .boxed() + }, + |_| { + async move { + async_std::task::sleep(Duration::from_millis(100)).await; + Ok(serde_json::Value::String(new_return.to_string())) + } + .boxed() + }, + Some(|lhs, rhs| lhs == rhs), + ) + .write( + |_| { + async move { + async_std::task::sleep(Duration::from_millis(100)).await; + Ok(serde_json::Value::Null) + } + .boxed() + }, + |_| { + async move { + async_std::task::sleep(Duration::from_millis(100)).await; + Ok(serde_json::Value::Null) + } + .boxed() + }, + ) + .build() + .expect("migrator should build"); + + let context = ContextBuilder::new("bob") + .build() + .expect("Failed to create context"); + + migrator + .read( + &context, + "stage-flag".into(), + Stage::Off, + serde_json::Value::Null, + ) + .await; + + client.flush(); + client.close(); + + let events = event_rx.iter().collect::>(); + assert_eq!(events.len(), 3); + match &events[1] { + OutputEvent::MigrationOp(event) => { + assert!(event.consistency_check == Some(expected_consistency)) + } + _ => panic!("Expected migration event"), + } + } + fn make_mocked_client_with_delay(delay: u64, offline: bool) -> (Client, Receiver) { let updates = Arc::new(MockDataSource::new_with_init_delay(delay)); let (event_sender, event_rx) = create_event_sender(); diff --git a/launchdarkly-server-sdk/src/config.rs b/launchdarkly-server-sdk/src/config.rs index 7e53546..1fa86ad 100644 --- a/launchdarkly-server-sdk/src/config.rs +++ b/launchdarkly-server-sdk/src/config.rs @@ -37,9 +37,9 @@ impl Tag { } } -impl ToString for &Tag { - fn to_string(&self) -> String { - format!("{}/{}", self.key, self.value) +impl std::fmt::Display for Tag { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}/{}", self.key, self.value) } } diff --git a/launchdarkly-server-sdk/src/events/dispatcher.rs b/launchdarkly-server-sdk/src/events/dispatcher.rs index 4638331..9d3ab3d 100644 --- a/launchdarkly-server-sdk/src/events/dispatcher.rs +++ b/launchdarkly-server-sdk/src/events/dispatcher.rs @@ -1,10 +1,13 @@ use crossbeam_channel::{bounded, select, tick, Receiver, Sender}; +use rand::thread_rng; use std::time::SystemTime; use launchdarkly_server_sdk_evaluation::Context; use lru::LruCache; use super::event::{BaseEvent, FeatureRequestEvent, IndexEvent}; +use crate::sampler::{Sampler, ThreadRngSampler}; + use super::sender::EventSenderResult; use super::{ event::{EventSummary, InputEvent, OutputEvent}, @@ -74,6 +77,7 @@ pub(super) struct EventDispatcher { last_known_time: u128, disabled: bool, thread_count: usize, + sampler: Box, } impl EventDispatcher { @@ -85,6 +89,7 @@ impl EventDispatcher { last_known_time: 0, disabled: false, thread_count: 5, + sampler: Box::new(ThreadRngSampler::new(thread_rng())), } } @@ -182,8 +187,19 @@ impl EventDispatcher { fn process_event(&mut self, event: InputEvent) { match event { + InputEvent::MigrationOp(migration_op) => { + if self + .sampler + .sample(migration_op.sampling_ratio.unwrap_or(1)) + { + self.outbox + .add_event(OutputEvent::MigrationOp(migration_op)); + } + } InputEvent::FeatureRequest(fre) => { - self.outbox.add_to_summary(&fre); + if !fre.exclude_from_summaries { + self.outbox.add_to_summary(&fre); + } let inlined = fre.clone().into_inline_with_anonymous_redaction( self.events_configuration.all_attributes_private, @@ -206,7 +222,10 @@ impl EventDispatcher { if let Some(debug_events_until_date) = fre.debug_events_until_date { let time = u128::from(debug_events_until_date); - if time > now && time > self.last_known_time { + if time > now + && time > self.last_known_time + && self.sampler.sample(fre.sampling_ratio.unwrap_or(1)) + { self.outbox .add_event(OutputEvent::Debug(fre.clone().into_inline( self.events_configuration.all_attributes_private, @@ -215,7 +234,7 @@ impl EventDispatcher { } } - if fre.track_events { + if fre.track_events && self.sampler.sample(fre.sampling_ratio.unwrap_or(1)) { self.outbox.add_event(OutputEvent::FeatureRequest(inlined)); } } @@ -228,11 +247,13 @@ impl EventDispatcher { } self.notice_context(&identify.base.context); - self.outbox - .add_event(OutputEvent::Identify(identify.into_inline( - self.events_configuration.all_attributes_private, - self.events_configuration.private_attributes.clone(), - ))); + if self.sampler.sample(identify.sampling_ratio.unwrap_or(1)) { + self.outbox + .add_event(OutputEvent::Identify(identify.into_inline( + self.events_configuration.all_attributes_private, + self.events_configuration.private_attributes.clone(), + ))); + } } InputEvent::Custom(custom) => { if let Some(context) = self.get_indexable_context(&custom.base) { @@ -244,7 +265,9 @@ impl EventDispatcher { .add_event(OutputEvent::Index(IndexEvent::from(base))); } - self.outbox.add_event(OutputEvent::Custom(custom)); + if self.sampler.sample(custom.sampling_ratio.unwrap_or(1)) { + self.outbox.add_event(OutputEvent::Custom(custom)); + } } } } @@ -426,6 +449,88 @@ mod tests { assert_eq!(1, dispatcher.outbox.summary.features.len()); } + #[test] + fn dispatcher_ignores_feature_events_with_0_sampling_ratio() { + let (event_sender, _) = create_event_sender(); + let events_configuration = + create_events_configuration(event_sender, Duration::from_secs(100)); + let mut dispatcher = create_dispatcher(events_configuration); + + let context = ContextBuilder::new("context") + .build() + .expect("Failed to create context"); + let mut flag = basic_flag("flag"); + flag.sampling_ratio = Some(0); + flag.debug_events_until_date = Some(64_060_606_800_000); + flag.track_events = true; + + let detail = Detail { + value: Some(FlagValue::from(false)), + variation_index: Some(1), + reason: Reason::Fallthrough { + in_experiment: false, + }, + }; + + let event_factory = EventFactory::new(true); + let feature_request_event = event_factory.new_eval_event( + &flag.key, + context, + &flag, + detail, + FlagValue::from(false), + None, + ); + + dispatcher.process_event(feature_request_event); + assert_eq!(1, dispatcher.outbox.events.len()); + assert_eq!("index", dispatcher.outbox.events[0].kind()); + assert_eq!(1, dispatcher.context_keys.len()); + assert_eq!(1, dispatcher.outbox.summary.features.len()); + } + + #[test] + fn dispatcher_can_exclude_feature_event_from_summaries() { + let (event_sender, _) = create_event_sender(); + let events_configuration = + create_events_configuration(event_sender, Duration::from_secs(100)); + let mut dispatcher = create_dispatcher(events_configuration); + + let context = ContextBuilder::new("context") + .build() + .expect("Failed to create context"); + let mut flag = basic_flag("flag"); + flag.exclude_from_summaries = true; + flag.debug_events_until_date = Some(64_060_606_800_000); + flag.track_events = true; + + let detail = Detail { + value: Some(FlagValue::from(false)), + variation_index: Some(1), + reason: Reason::Fallthrough { + in_experiment: false, + }, + }; + + let event_factory = EventFactory::new(true); + let feature_request_event = event_factory.new_eval_event( + &flag.key, + context, + &flag, + detail, + FlagValue::from(false), + None, + ); + + dispatcher.process_event(feature_request_event); + assert_eq!(3, dispatcher.outbox.events.len()); + assert_eq!("index", dispatcher.outbox.events[0].kind()); + assert_eq!("debug", dispatcher.outbox.events[1].kind()); + assert_eq!("feature", dispatcher.outbox.events[2].kind()); + assert_eq!(1, dispatcher.context_keys.len()); + assert_eq!(0, dispatcher.outbox.summary.features.len()); + } + #[test_case(0, 64_060_606_800_000, vec!["debug", "index", "summary"])] #[test_case(64_060_606_800_000, 64_060_606_800_000, vec!["index", "summary"])] #[test_case(64_060_606_800_001, 64_060_606_800_000, vec!["index", "summary"])] @@ -462,11 +567,12 @@ mod tests { create_events_configuration(event_sender, Duration::from_secs(100)); let (inbox_tx, inbox_rx) = bounded(events_configuration.capacity); - let mut dispatcher = create_dispatcher(events_configuration); - dispatcher.last_known_time = last_known_time; - let dispatcher_handle = thread::Builder::new() - .spawn(move || dispatcher.start(inbox_rx)) + .spawn(move || { + let mut dispatcher = create_dispatcher(events_configuration); + dispatcher.last_known_time = last_known_time; + dispatcher.start(inbox_rx) + }) .unwrap(); inbox_tx @@ -653,9 +759,11 @@ mod tests { create_events_configuration(event_sender, Duration::from_secs(100)); let (inbox_tx, inbox_rx) = bounded(events_configuration.capacity); - let mut dispatcher = create_dispatcher(events_configuration); let dispatcher_handle = thread::Builder::new() - .spawn(move || dispatcher.start(inbox_rx)) + .spawn(move || { + let mut dispatcher = create_dispatcher(events_configuration); + dispatcher.start(inbox_rx) + }) .unwrap(); let context = ContextBuilder::new("context") @@ -689,9 +797,11 @@ mod tests { create_events_configuration(event_sender, Duration::from_millis(200)); let (inbox_tx, inbox_rx) = bounded(events_configuration.capacity); - let mut dispatcher = create_dispatcher(events_configuration); let _ = thread::Builder::new() - .spawn(move || dispatcher.start(inbox_rx)) + .spawn(move || { + let mut dispatcher = create_dispatcher(events_configuration); + dispatcher.start(inbox_rx) + }) .unwrap(); let context = ContextBuilder::new("context") diff --git a/launchdarkly-server-sdk/src/events/event.rs b/launchdarkly-server-sdk/src/events/event.rs index 19442de..4a19eb5 100644 --- a/launchdarkly-server-sdk/src/events/event.rs +++ b/launchdarkly-server-sdk/src/events/event.rs @@ -1,6 +1,7 @@ use std::cmp::{max, min}; use std::collections::{HashMap, HashSet}; use std::fmt::{self, Display, Formatter}; +use std::time::Duration; use launchdarkly_server_sdk_evaluation::{ Context, ContextAttributes, Detail, Flag, FlagValue, Kind, Reason, Reference, VariationIndex, @@ -8,6 +9,8 @@ use launchdarkly_server_sdk_evaluation::{ use serde::ser::SerializeStruct; use serde::{Serialize, Serializer}; +use crate::migrations::{Operation, Origin, Stage}; + #[derive(Clone, Debug, PartialEq)] pub struct BaseEvent { pub creation_date: u64, @@ -92,6 +95,156 @@ impl BaseEvent { } } +/// A MigrationOpEvent is generated through the migration op tracker provided through the SDK. +#[derive(Clone, Debug)] +pub struct MigrationOpEvent { + pub(crate) base: BaseEvent, + pub(crate) key: String, + pub(crate) version: Option, + pub(crate) operation: Operation, + pub(crate) default_stage: Stage, + pub(crate) evaluation: Detail, + pub(crate) sampling_ratio: Option, + pub(crate) invoked: HashSet, + pub(crate) consistency_check: Option, + pub(crate) consistency_check_ratio: Option, + pub(crate) errors: HashSet, + pub(crate) latency: HashMap, +} + +impl Serialize for MigrationOpEvent { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let mut state = serializer.serialize_struct("MigrationOpEvent", 10)?; + state.serialize_field("kind", "migration_op")?; + state.serialize_field("creationDate", &self.base.creation_date)?; + state.serialize_field("contextKeys", &self.base.context.context_keys())?; + state.serialize_field("operation", &self.operation)?; + + if !is_default_ratio(&self.sampling_ratio) { + state.serialize_field("samplingRatio", &self.sampling_ratio.unwrap_or(1))?; + } + + let evaluation = MigrationOpEvaluation { + key: self.key.clone(), + value: self.evaluation.value, + default: self.default_stage, + reason: self.evaluation.reason.clone(), + variation_index: self.evaluation.variation_index, + version: self.version, + }; + state.serialize_field("evaluation", &evaluation)?; + + let mut measurements = vec![]; + if !self.invoked.is_empty() { + measurements.push(MigrationOpMeasurement::Invoked(&self.invoked)); + } + + if let Some(consistency_check) = self.consistency_check { + measurements.push(MigrationOpMeasurement::ConsistencyCheck( + consistency_check, + self.consistency_check_ratio, + )); + } + + if !self.errors.is_empty() { + measurements.push(MigrationOpMeasurement::Errors(&self.errors)); + } + + if !self.latency.is_empty() { + measurements.push(MigrationOpMeasurement::Latency(&self.latency)); + } + + if !measurements.is_empty() { + state.serialize_field("measurements", &measurements)?; + } + + state.end() + } +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +struct MigrationOpEvaluation { + pub key: String, + + #[serde(skip_serializing_if = "Option::is_none")] + pub value: Option, + + pub(crate) default: Stage, + + pub reason: Reason, + + #[serde(rename = "variation", skip_serializing_if = "Option::is_none")] + pub variation_index: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + pub version: Option, +} + +enum MigrationOpMeasurement<'a> { + Invoked(&'a HashSet), + ConsistencyCheck(bool, Option), + Errors(&'a HashSet), + Latency(&'a HashMap), +} + +impl Serialize for MigrationOpMeasurement<'_> { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + match self { + MigrationOpMeasurement::Invoked(invoked) => { + let mut state = serializer.serialize_struct("invoked", 2)?; + state.serialize_field("key", "invoked")?; + + let invoked = invoked + .iter() + .map(|origin| (origin, true)) + .collect::>(); + state.serialize_field("values", &invoked)?; + state.end() + } + MigrationOpMeasurement::ConsistencyCheck(consistency_check, consistency_ratio) => { + let mut state = serializer.serialize_struct("consistency", 2)?; + state.serialize_field("key", "consistent")?; + state.serialize_field("value", &consistency_check)?; + + match consistency_ratio { + None | Some(1) => (), + Some(ratio) => state.serialize_field("samplingRatio", &ratio)?, + } + + state.end() + } + MigrationOpMeasurement::Errors(errors) => { + let mut state = serializer.serialize_struct("errors", 2)?; + state.serialize_field("key", "error")?; + + let errors = errors + .iter() + .map(|origin| (origin, true)) + .collect::>(); + state.serialize_field("values", &errors)?; + state.end() + } + MigrationOpMeasurement::Latency(latency) => { + let mut state = serializer.serialize_struct("latencies", 2)?; + state.serialize_field("key", "latency_ms")?; + let latencies = latency + .iter() + .map(|(origin, duration)| (origin, duration.as_millis() as u64)) + .collect::>(); + state.serialize_field("values", &latencies)?; + state.end() + } + } + } +} + #[derive(Clone, Debug, PartialEq, Serialize)] #[serde(rename_all = "camelCase")] pub struct FeatureRequestEvent { @@ -112,6 +265,12 @@ pub struct FeatureRequestEvent { #[serde(skip)] pub(crate) debug_events_until_date: Option, + + #[serde(skip_serializing_if = "is_default_ratio")] + pub(crate) sampling_ratio: Option, + + #[serde(skip_serializing_if = "std::ops::Not::not")] + pub(crate) exclude_from_summaries: bool, } impl FeatureRequestEvent { @@ -176,6 +335,8 @@ pub struct IdentifyEvent { #[serde(flatten)] pub(crate) base: BaseEvent, key: String, + #[serde(skip_serializing_if = "is_default_ratio")] + pub(crate) sampling_ratio: Option, } impl IdentifyEvent { @@ -203,6 +364,8 @@ pub struct CustomEvent { metric_value: Option, #[serde(skip_serializing_if = "serde_json::Value::is_null")] data: serde_json::Value, + #[serde(skip_serializing_if = "is_default_ratio")] + pub(crate) sampling_ratio: Option, } impl CustomEvent { @@ -239,6 +402,9 @@ pub enum OutputEvent { #[serde(rename = "summary")] Summary(EventSummary), + + #[serde(rename = "migration_op")] + MigrationOp(MigrationOpEvent), } impl OutputEvent { @@ -251,6 +417,7 @@ impl OutputEvent { OutputEvent::Identify { .. } => "identify", OutputEvent::Custom { .. } => "custom", OutputEvent::Summary { .. } => "summary", + OutputEvent::MigrationOp { .. } => "migration_op", } } } @@ -260,6 +427,7 @@ pub enum InputEvent { FeatureRequest(FeatureRequestEvent), Identify(IdentifyEvent), Custom(CustomEvent), + MigrationOp(MigrationOpEvent), } impl InputEvent { @@ -269,6 +437,7 @@ impl InputEvent { InputEvent::FeatureRequest(FeatureRequestEvent { base, .. }) => Some(base), InputEvent::Identify(IdentifyEvent { base, .. }) => Some(base), InputEvent::Custom(CustomEvent { base, .. }) => Some(base), + InputEvent::MigrationOp(MigrationOpEvent { base, .. }) => Some(base), } } } @@ -290,7 +459,7 @@ impl EventFactory { Self { send_reason } } - fn now() -> u64 { + pub(crate) fn now() -> u64 { std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() @@ -335,15 +504,21 @@ impl EventFactory { let flag_track_events; let require_experiment_data; let debug_events_until_date; + let sampling_ratio; + let exclude_from_summaries; if let Some(f) = flag { flag_track_events = f.track_events; require_experiment_data = f.is_experimentation_enabled(&detail.reason); debug_events_until_date = f.debug_events_until_date; + sampling_ratio = f.sampling_ratio; + exclude_from_summaries = f.exclude_from_summaries; } else { flag_track_events = false; require_experiment_data = false; - debug_events_until_date = None + debug_events_until_date = None; + sampling_ratio = None; + exclude_from_summaries = false; } let reason = if self.send_reason || require_experiment_data { @@ -363,6 +538,8 @@ impl EventFactory { prereq_of, track_events: flag_track_events || require_experiment_data, debug_events_until_date, + sampling_ratio, + exclude_from_summaries, }) } @@ -370,9 +547,14 @@ impl EventFactory { InputEvent::Identify(IdentifyEvent { key: context.key().to_owned(), base: BaseEvent::new(Self::now(), context), + sampling_ratio: None, }) } + pub(crate) fn new_migration_op(&self, event: MigrationOpEvent) -> InputEvent { + InputEvent::MigrationOp(event) + } + pub fn new_custom( &self, context: Context, @@ -387,6 +569,7 @@ impl EventFactory { key: key.into(), metric_value, data, + sampling_ratio: None, })) } } @@ -593,6 +776,11 @@ impl From<(VariationKey, VariationSummary)> for VariationCounterOutput { } } +// Used strictly for serialization to determine if a ratio should be included in the JSON. +fn is_default_ratio(sampling_ratio: &Option) -> bool { + sampling_ratio.unwrap_or(1) == 1 +} + #[cfg(test)] mod tests { use launchdarkly_server_sdk_evaluation::{ @@ -1196,6 +1384,8 @@ mod tests { prereq_of: None, track_events: false, debug_events_until_date: None, + sampling_ratio: flag.sampling_ratio, + exclude_from_summaries: flag.exclude_from_summaries, }; summary.add(&fallthrough_request); diff --git a/launchdarkly-server-sdk/src/events/processor.rs b/launchdarkly-server-sdk/src/events/processor.rs index 636908b..9296602 100644 --- a/launchdarkly-server-sdk/src/events/processor.rs +++ b/launchdarkly-server-sdk/src/events/processor.rs @@ -55,9 +55,12 @@ pub struct EventProcessorImpl { impl EventProcessorImpl { pub fn new(events_configuration: EventsConfiguration) -> Result { let (inbox_tx, inbox_rx) = bounded(events_configuration.capacity); - let mut dispatcher = EventDispatcher::new(events_configuration); + let dispatch_start = move || { + let mut dispatcher = EventDispatcher::new(events_configuration); + dispatcher.start(inbox_rx) + }; - match thread::Builder::new().spawn(move || dispatcher.start(inbox_rx)) { + match thread::Builder::new().spawn(dispatch_start) { Ok(_) => Ok(Self { inbox_tx, inbox_full_once: Once::new(), diff --git a/launchdarkly-server-sdk/src/lib.rs b/launchdarkly-server-sdk/src/lib.rs index af46c8a..06c791f 100644 --- a/launchdarkly-server-sdk/src/lib.rs +++ b/launchdarkly-server-sdk/src/lib.rs @@ -32,6 +32,7 @@ pub use data_source_builders::{ BuildError as DataSourceBuildError, PollingDataSourceBuilder, StreamingDataSourceBuilder, }; pub use evaluation::{FlagDetail, FlagDetailConfig}; +pub use events::event::MigrationOpEvent; pub use events::processor::EventProcessor; pub use events::processor_builders::{ BuildError as EventProcessorBuildError, EventProcessorBuilder, NullEventProcessorBuilder, @@ -40,6 +41,9 @@ pub use feature_requester_builders::{ BuildError as FeatureRequestBuilderError, FeatureRequesterFactory, }; pub use launchdarkly_server_sdk_evaluation::{Flag, Segment, Versioned}; +pub use migrations::{ + ExecutionOrder, MigrationOpTracker, Migrator, MigratorBuilder, Operation, Origin, Stage, +}; pub use service_endpoints::ServiceEndpointsBuilder; pub use stores::persistent_store::{PersistentDataStore, PersistentStoreError}; pub use stores::persistent_store_builders::{ @@ -56,7 +60,9 @@ mod evaluation; mod events; mod feature_requester; mod feature_requester_builders; +mod migrations; mod reqwest; +mod sampler; mod service_endpoints; mod stores; mod test_common; diff --git a/launchdarkly-server-sdk/src/migrations/migrator.rs b/launchdarkly-server-sdk/src/migrations/migrator.rs new file mode 100644 index 0000000..4e00268 --- /dev/null +++ b/launchdarkly-server-sdk/src/migrations/migrator.rs @@ -0,0 +1,1036 @@ +use std::sync::Arc; +use std::sync::Mutex; +use std::time::Instant; + +use futures::future::join_all; +use futures::future::BoxFuture; +use futures::future::FutureExt; +use launchdarkly_server_sdk_evaluation::Context; +use rand::thread_rng; +use serde::Serialize; + +use crate::sampler::Sampler; +use crate::sampler::ThreadRngSampler; +use crate::{Client, ExecutionOrder, MigrationOpTracker, Operation, Origin, Stage}; + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +/// An internally used struct to represent the result of a migration operation along with the +/// origin it was executed against. +pub struct MigrationOriginResult { + pub origin: Origin, + pub result: MigrationResult, +} + +/// MigrationResult represents the result of a migration operation. If the operation was +/// successful, the result will contain a pair of values representing the result of the operation +/// and the origin it was executed against. If the operation failed, the result will contain an +/// error. +type MigrationResult = Result; + +/// A write result contains the operation results against both the authoritative and +/// non-authoritative origins. +/// +/// Authoritative writes are always executed first. In the event of a failure, the +/// non-authoritative write will not be executed, resulting in a `None` value in the final +/// MigrationWriteResult. +pub struct MigrationWriteResult { + pub authoritative: MigrationOriginResult, + pub nonauthoritative: Option>, +} + +// MigrationComparisonFn is used to compare the results of two migration operations. If the +// provided results are equal, this method will return true and false otherwise. +type MigrationComparisonFn = fn(&T, &T) -> bool; + +struct MigrationConfig +where + P: Send + Sync, + T: Send + Sync, + FO: Fn(&P) -> BoxFuture> + Sync + Send, + FN: Fn(&P) -> BoxFuture> + Sync + Send, +{ + old: FO, + new: FN, + compare: Option>, + + _p: std::marker::PhantomData

, +} + +/// The migration builder is used to configure and construct an instance of a [Migrator]. This +/// migrator can be used to perform LaunchDarkly assisted technology migrations through the use of +/// migration-based feature flags. +pub struct MigratorBuilder +where + P: Send + Sync, + T: Send + Sync, + FRO: Fn(&P) -> BoxFuture> + Sync + Send, + FRN: Fn(&P) -> BoxFuture> + Sync + Send, + FWO: Fn(&P) -> BoxFuture> + Sync + Send, + FWN: Fn(&P) -> BoxFuture> + Sync + Send, +{ + client: Arc, + read_execution_order: ExecutionOrder, + measure_latency: bool, + measure_errors: bool, + + read_config: Option>, + write_config: Option>, +} + +impl MigratorBuilder +where + P: Send + Sync, + T: Send + Sync, + FRO: Fn(&P) -> BoxFuture> + Sync + Send, + FRN: Fn(&P) -> BoxFuture> + Sync + Send, + FWO: Fn(&P) -> BoxFuture> + Sync + Send, + FWN: Fn(&P) -> BoxFuture> + Sync + Send, +{ + /// Create a new migrator builder instance with the provided client. + pub fn new(client: Arc) -> Self { + MigratorBuilder { + client, + read_execution_order: ExecutionOrder::Concurrent, + measure_latency: true, + measure_errors: true, + read_config: None, + write_config: None, + } + } + + /// The read execution order influences the concurrency and execution order for read operations + /// involving multiple origins. + pub fn read_execution_order(mut self, order: ExecutionOrder) -> Self { + self.read_execution_order = order; + self + } + + /// Enable or disable latency tracking for migration operations. This latency information can + /// be sent upstream to LaunchDarkly to enhance migration visibility. + pub fn track_latency(mut self, measure: bool) -> Self { + self.measure_latency = measure; + self + } + + /// Enable or disable error tracking for migration operations. This error information can be + /// sent upstream to LaunchDarkly to enhance migration visibility. + pub fn track_errors(mut self, measure: bool) -> Self { + self.measure_errors = measure; + self + } + + /// Read can be used to configure the migration-read behavior of the resulting + /// [Migrator] instance. + /// + /// Users are required to provide two different read methods -- one to read from the old migration origin, and one + /// to read from the new origin. Additionally, users can opt-in to consistency tracking by providing a + /// comparison function. + /// + /// Depending on the migration stage, one or both of these read methods may be called. + pub fn read(mut self, old: FRO, new: FRN, compare: Option>) -> Self { + self.read_config = Some(MigrationConfig { + old, + new, + compare, + _p: std::marker::PhantomData, + }); + self + } + + /// Write can be used to configure the migration-write behavior of the resulting + /// [crate::Migrator] instance. + /// + /// Users are required to provide two different write methods -- one to write to the old + /// migration origin, and one to write to the new origin. Not every stage requires + /// + /// Depending on the migration stage, one or both of these write methods may be called. + pub fn write(mut self, old: FWO, new: FWN) -> Self { + self.write_config = Some(MigrationConfig { + old, + new, + compare: None, + _p: std::marker::PhantomData, + }); + self + } + + /// Build constructs a [crate::Migrator] instance to support migration-based reads and + /// writes. A string describing any failure conditions will be returned if the build fails. + pub fn build(self) -> Result, String> { + let read_config = self.read_config.ok_or("read configuration not provided")?; + let write_config = self + .write_config + .ok_or("write configuration not provided")?; + + Ok(Migrator::new( + self.client, + self.read_execution_order, + self.measure_latency, + self.measure_errors, + read_config, + write_config, + )) + } +} + +/// The migrator is the primary interface for executing migration operations. It is configured +/// through the [MigratorBuilder] and can be used to perform LaunchDarkly assisted technology +/// migrations through the use of migration-based feature flags. +pub struct Migrator +where + P: Send + Sync, + T: Send + Sync, + FRO: Fn(&P) -> BoxFuture> + Sync + Send, + FRN: Fn(&P) -> BoxFuture> + Sync + Send, + FWO: Fn(&P) -> BoxFuture> + Sync + Send, + FWN: Fn(&P) -> BoxFuture> + Sync + Send, +{ + client: Arc, + read_execution_order: ExecutionOrder, + measure_latency: bool, + measure_errors: bool, + read_config: MigrationConfig, + write_config: MigrationConfig, + sampler: Box, +} + +impl Migrator +where + P: Send + Sync, + T: Send + Sync, + FRO: Fn(&P) -> BoxFuture> + Sync + Send, + FRN: Fn(&P) -> BoxFuture> + Sync + Send, + FWO: Fn(&P) -> BoxFuture> + Sync + Send, + FWN: Fn(&P) -> BoxFuture> + Sync + Send, +{ + fn new( + client: Arc, + read_execution_order: ExecutionOrder, + measure_latency: bool, + measure_errors: bool, + read_config: MigrationConfig, + write_config: MigrationConfig, + ) -> Self { + Migrator { + client, + read_execution_order, + measure_latency, + measure_errors, + read_config, + write_config, + sampler: Box::new(ThreadRngSampler::new(thread_rng())), + } + } + + /// Uses the provided flag key and context to execute a migration-backed read operation. + pub async fn read( + &mut self, + context: &Context, + flag_key: String, + default_stage: Stage, + payload: P, + ) -> MigrationOriginResult { + let (stage, tracker) = self + .client + .migration_variation(context, &flag_key, default_stage); + + if let Ok(mut tracker) = tracker.lock() { + tracker.operation(Operation::Read); + } else { + error!("Failed to acquire tracker lock. Cannot track migration write."); + } + + let mut old = Executor { + origin: Origin::Old, + function: &self.read_config.old, + tracker: tracker.clone(), + measure_latency: self.measure_latency, + measure_errors: self.measure_errors, + payload: &payload, + }; + let mut new = Executor { + origin: Origin::New, + function: &self.read_config.new, + tracker: tracker.clone(), + measure_latency: self.measure_latency, + measure_errors: self.measure_errors, + payload: &payload, + }; + + let result = match stage { + Stage::Off => old.run().await, + Stage::DualWrite => old.run().await, + Stage::Shadow => { + read_both( + old, + new, + self.read_config.compare, + self.read_execution_order, + tracker.clone(), + self.sampler.as_mut(), + ) + .await + } + Stage::Live => { + read_both( + new, + old, + self.read_config.compare, + self.read_execution_order, + tracker.clone(), + self.sampler.as_mut(), + ) + .await + } + Stage::Rampdown => new.run().await, + Stage::Complete => new.run().await, + }; + + self.client.track_migration_op(tracker); + + result + } + + /// Uses the provided flag key and context to execute a migration-backed write operation. + pub async fn write( + &mut self, + context: &Context, + flag_key: String, + default_stage: Stage, + payload: P, + ) -> MigrationWriteResult { + let (stage, tracker) = self + .client + .migration_variation(context, &flag_key, default_stage); + + if let Ok(mut tracker) = tracker.lock() { + tracker.operation(Operation::Write); + } else { + error!("Failed to acquire tracker lock. Cannot track migration write."); + } + + let mut old = Executor { + origin: Origin::Old, + function: &self.write_config.old, + tracker: tracker.clone(), + measure_latency: self.measure_latency, + measure_errors: self.measure_errors, + payload: &payload, + }; + let mut new = Executor { + origin: Origin::New, + function: &self.write_config.new, + tracker: tracker.clone(), + measure_latency: self.measure_latency, + measure_errors: self.measure_errors, + payload: &payload, + }; + + let result = match stage { + Stage::Off => MigrationWriteResult { + authoritative: old.run().await, + nonauthoritative: None, + }, + Stage::DualWrite => write_both(old, new).await, + Stage::Shadow => write_both(old, new).await, + Stage::Live => write_both(new, old).await, + Stage::Rampdown => write_both(new, old).await, + Stage::Complete => MigrationWriteResult { + authoritative: new.run().await, + nonauthoritative: None, + }, + }; + + self.client.track_migration_op(tracker); + + result + } +} + +async fn read_both( + mut authoritative: Executor<'_, P, T, FA>, + mut nonauthoritative: Executor<'_, P, T, FB>, + compare: Option>, + execution_order: ExecutionOrder, + tracker: Arc>, + sampler: &mut dyn Sampler, +) -> MigrationOriginResult +where + P: Send + Sync, + T: Send + Sync, + FA: Fn(&P) -> BoxFuture> + Sync + Send, + FB: Fn(&P) -> BoxFuture> + Sync + Send, +{ + let authoritative_result: MigrationOriginResult; + let nonauthoritative_result: MigrationOriginResult; + + match execution_order { + ExecutionOrder::Concurrent => { + let auth_handle = authoritative.run().boxed(); + let nonauth_handle = nonauthoritative.run().boxed(); + let handles = vec![auth_handle, nonauth_handle]; + + let mut results = join_all(handles).await; + + // Note that we are doing this is the reverse order of the handles since we are + // popping the results off the end of the vector. + nonauthoritative_result = results.pop().unwrap_or_else(|| MigrationOriginResult { + origin: nonauthoritative.origin, + result: Err("Failed to execute non-authoritative read".into()), + }); + + authoritative_result = results.pop().unwrap_or_else(|| MigrationOriginResult { + origin: authoritative.origin, + result: Err("Failed to execute authoritative read".into()), + }); + } + ExecutionOrder::Random if sampler.sample(2) => { + nonauthoritative_result = nonauthoritative.run().await; + authoritative_result = authoritative.run().await; + } + _ => { + authoritative_result = authoritative.run().await; + nonauthoritative_result = nonauthoritative.run().await; + } + }; + + if let Some(compare) = compare { + if let (Ok(authoritative), Ok(nonauthoritative)) = ( + &authoritative_result.result, + &nonauthoritative_result.result, + ) { + if let Ok(mut tracker) = tracker.lock() { + tracker.consistent(|| compare(authoritative, nonauthoritative)); + } else { + error!("Failed to acquire tracker lock. Cannot track consistency."); + } + } + } + + authoritative_result +} + +async fn write_both( + mut authoritative: Executor<'_, P, T, FA>, + mut nonauthoritative: Executor<'_, P, T, FB>, +) -> MigrationWriteResult +where + P: Send + Sync, + T: Send + Sync, + FA: Fn(&P) -> BoxFuture> + Sync + Send, + FB: Fn(&P) -> BoxFuture> + Sync + Send, +{ + let authoritative_result = authoritative.run().await; + + if authoritative_result.result.is_err() { + return MigrationWriteResult { + authoritative: authoritative_result, + nonauthoritative: None, + }; + } + + let nonauthoritative_result = nonauthoritative.run().await; + + MigrationWriteResult { + authoritative: authoritative_result, + nonauthoritative: Some(nonauthoritative_result), + } +} + +struct Executor<'a, P, T, F> +where + P: Send + Sync, + T: Send + Sync, + F: Fn(&P) -> BoxFuture> + Sync + Send, +{ + origin: Origin, + function: &'a F, + tracker: Arc>, + measure_latency: bool, + measure_errors: bool, + payload: &'a P, +} + +impl<'a, P, T, F> Executor<'a, P, T, F> +where + P: Send + Sync, + T: Send + Sync, + F: Fn(&P) -> BoxFuture> + Sync + Send, +{ + async fn run(&mut self) -> MigrationOriginResult { + let start = Instant::now(); + let result = (self.function)(self.payload).await; + let elapsed = start.elapsed(); + + let result = match self.tracker.lock() { + Ok(mut tracker) => { + if self.measure_latency { + tracker.latency(self.origin, elapsed); + } + + if self.measure_errors && result.is_err() { + tracker.error(self.origin); + } + + tracker.invoked(self.origin); + + result + } + Err(_) => Err("Failed to acquire lock".into()), + }; + + MigrationOriginResult { + origin: self.origin, + result, + } + } +} + +#[cfg(test)] +mod tests { + use std::{ + sync::{mpsc, Arc}, + time::{Duration, Instant}, + }; + + use crate::{ + migrations::migrator::MigratorBuilder, Client, ConfigBuilder, ExecutionOrder, Stage, + }; + use futures::future::FutureExt; + use launchdarkly_server_sdk_evaluation::ContextBuilder; + use test_case::test_case; + + #[test] + fn can_build_successfully() { + let config = ConfigBuilder::new("sdk-key") + .offline(true) + .build() + .expect("config failed to build"); + + let client = Arc::new(Client::build(config).expect("client failed to build")); + let migrator = MigratorBuilder::new(client) + .track_latency(false) + .track_errors(false) + .read( + |_: &u32| async move { Ok(()) }.boxed(), + |_: &u32| async move { Ok(()) }.boxed(), + Some(|_, _| true), + ) + .write( + |_: &u32| async move { Ok(()) }.boxed(), + |_: &u32| async move { Ok(()) }.boxed(), + ) + .build(); + + assert!(migrator.is_ok()); + } + + #[tokio::test] + async fn read_passes_payload_through() { + let config = ConfigBuilder::new("sdk-key") + .offline(true) + .build() + .expect("config failed to build"); + + let client = Arc::new(Client::build(config).expect("client failed to build")); + client.start_with_default_executor(); + + let (sender, receiver) = mpsc::channel(); + let old_sender = sender.clone(); + let new_sender = sender.clone(); + let mut migrator = MigratorBuilder::new(client) + .track_latency(false) + .track_errors(false) + .write( + |_| async move { Ok(0) }.boxed(), + |_| async move { Ok(0) }.boxed(), + ) + .read_execution_order(ExecutionOrder::Serial) + .read( + move |&payload| { + let old_sender = old_sender.clone(); + async move { + old_sender.send(payload).unwrap(); + Ok(0) + } + .boxed() + }, + move |&payload| { + let new_sender = new_sender.clone(); + async move { + new_sender.send(payload).unwrap(); + Ok(0) + } + .boxed() + }, + None, + ) + .build() + .expect("migrator failed to build"); + + let _result = migrator + .read( + &ContextBuilder::new("user-key") + .build() + .expect("context failed to build"), + "migration-key".into(), + crate::Stage::Shadow, + 1, + ) + .await; + + let old_payload = receiver.recv().unwrap(); + let new_payload = receiver.recv().unwrap(); + + assert_eq!(old_payload, 1); + assert_eq!(new_payload, 1); + } + + #[tokio::test] + async fn write_passes_payload_through() { + let config = ConfigBuilder::new("sdk-key") + .offline(true) + .build() + .expect("config failed to build"); + + let client = Arc::new(Client::build(config).expect("client failed to build")); + client.start_with_default_executor(); + + let (sender, receiver) = mpsc::channel(); + let old_sender = sender.clone(); + let new_sender = sender.clone(); + let mut migrator = MigratorBuilder::new(client) + .track_latency(false) + .track_errors(false) + .read( + |_| async move { Ok(0) }.boxed(), + |_| async move { Ok(0) }.boxed(), + Some(|_, _| true), + ) + .write( + move |&payload| { + let old_sender = old_sender.clone(); + async move { + old_sender.send(payload).unwrap(); + Ok(0) + } + .boxed() + }, + move |&payload| { + let new_sender = new_sender.clone(); + async move { + new_sender.send(payload).unwrap(); + Ok(0) + } + .boxed() + }, + ) + .build() + .expect("migrator failed to build"); + + let _result = migrator + .write( + &ContextBuilder::new("user-key") + .build() + .expect("context failed to build"), + "migration-key".into(), + crate::Stage::Shadow, + 1, + ) + .await; + + let old_payload = receiver.recv().unwrap(); + let new_payload = receiver.recv().unwrap(); + + assert_eq!(old_payload, 1); + assert_eq!(new_payload, 1); + } + + #[tokio::test] + async fn read_handles_correct_origin() { + read_handles_correct_origin_driver(Stage::Off, true, false).await; + read_handles_correct_origin_driver(Stage::DualWrite, true, false).await; + read_handles_correct_origin_driver(Stage::Shadow, true, true).await; + read_handles_correct_origin_driver(Stage::Live, true, true).await; + read_handles_correct_origin_driver(Stage::Rampdown, false, true).await; + read_handles_correct_origin_driver(Stage::Complete, false, true).await; + } + + async fn read_handles_correct_origin_driver( + stage: Stage, + expected_old: bool, + expected_new: bool, + ) { + let config = ConfigBuilder::new("sdk-key") + .offline(true) + .build() + .expect("config failed to build"); + + let client = Arc::new(Client::build(config).expect("client failed to build")); + client.start_with_default_executor(); + + let (sender, receiver) = mpsc::channel(); + let old_sender = sender.clone(); + let new_sender = sender.clone(); + let mut migrator = MigratorBuilder::new(client) + .track_latency(false) + .track_errors(false) + .write( + |_| async move { Ok("write") }.boxed(), + |_| async move { Ok("write") }.boxed(), + ) + .read_execution_order(ExecutionOrder::Serial) + .read( + move |_| { + let old_sender = old_sender.clone(); + async move { + old_sender.send("old").unwrap(); + Ok("read") + } + .boxed() + }, + move |_| { + let new_sender = new_sender.clone(); + async move { + new_sender.send("new").unwrap(); + Ok("read") + } + .boxed() + }, + None, + ) + .build() + .expect("migrator failed to build"); + + let _result = migrator + .read( + &ContextBuilder::new("user-key") + .build() + .expect("context failed to build"), + "migration-key".into(), + stage, + "payload", + ) + .await; + + let payloads = receiver.try_iter().collect::>(); + + if expected_old { + assert!(payloads.contains(&"old")); + } else { + assert!(!payloads.contains(&"old")); + } + + if expected_new { + assert!(payloads.contains(&"new")); + } else { + assert!(!payloads.contains(&"new")); + } + } + + #[tokio::test] + async fn read_handles_concurrent_execution() { + let config = ConfigBuilder::new("sdk-key") + .offline(true) + .build() + .expect("config failed to build"); + + let client = Arc::new(Client::build(config).expect("client failed to build")); + client.start_with_default_executor(); + + let mut migrator = MigratorBuilder::new(client) + .track_latency(false) + .track_errors(false) + .write( + |_| async move { Ok(()) }.boxed(), + |_| async move { Ok(()) }.boxed(), + ) + .read_execution_order(ExecutionOrder::Concurrent) + .read( + |_| { + async move { + async_std::task::sleep(std::time::Duration::from_millis(250)).await; + Ok(()) + } + .boxed() + }, + |_| { + async move { + async_std::task::sleep(std::time::Duration::from_millis(250)).await; + Ok(()) + } + .boxed() + }, + None, + ) + .build() + .expect("migrator failed to build"); + + let start = Instant::now(); + let _result = migrator + .read( + &ContextBuilder::new("user-key") + .build() + .expect("context failed to build"), + "migration-key".into(), + crate::Stage::Shadow, + (), + ) + .await; + let elapsed = start.elapsed(); + assert!(elapsed < Duration::from_millis(500)); + } + + #[tokio::test] + async fn read_handles_nonconcurrent_execution() { + read_handles_nonconcurrent_execution_driver(ExecutionOrder::Serial).await; + read_handles_nonconcurrent_execution_driver(ExecutionOrder::Random).await; + } + + async fn read_handles_nonconcurrent_execution_driver(execution_order: ExecutionOrder) { + let config = ConfigBuilder::new("sdk-key") + .offline(true) + .build() + .expect("config failed to build"); + + let client = Arc::new(Client::build(config).expect("client failed to build")); + client.start_with_default_executor(); + + let mut migrator = MigratorBuilder::new(client) + .track_latency(false) + .track_errors(false) + .write( + |_| async move { Ok(()) }.boxed(), + |_| async move { Ok(()) }.boxed(), + ) + .read_execution_order(execution_order) + .read( + |_| { + async move { + std::thread::sleep(std::time::Duration::from_millis(250)); + Ok(()) + } + .boxed() + }, + |_| { + async move { + std::thread::sleep(std::time::Duration::from_millis(250)); + Ok(()) + } + .boxed() + }, + None, + ) + .build() + .expect("migrator failed to build"); + + let start = Instant::now(); + let _result = migrator + .read( + &ContextBuilder::new("user-key") + .build() + .expect("context failed to build"), + "migration-key".into(), + crate::Stage::Shadow, + (), + ) + .await; + let elapsed = start.elapsed(); + assert!(elapsed >= Duration::from_millis(500)); + } + + #[tokio::test] + async fn write_handles_correct_origin() { + write_handles_correct_origin_driver(Stage::Off, true, false).await; + write_handles_correct_origin_driver(Stage::DualWrite, true, true).await; + write_handles_correct_origin_driver(Stage::Shadow, true, true).await; + write_handles_correct_origin_driver(Stage::Live, true, true).await; + write_handles_correct_origin_driver(Stage::Rampdown, true, true).await; + write_handles_correct_origin_driver(Stage::Complete, false, true).await; + } + + async fn write_handles_correct_origin_driver( + stage: Stage, + expected_old: bool, + expected_new: bool, + ) { + let config = ConfigBuilder::new("sdk-key") + .offline(true) + .build() + .expect("config failed to build"); + + let client = Arc::new(Client::build(config).expect("client failed to build")); + client.start_with_default_executor(); + + let (sender, receiver) = mpsc::channel(); + let old_sender = sender.clone(); + let new_sender = sender.clone(); + let mut migrator = MigratorBuilder::new(client) + .track_latency(false) + .track_errors(false) + .read( + |_| async move { Ok(()) }.boxed(), + |_| async move { Ok(()) }.boxed(), + Some(|_, _| true), + ) + .write( + move |_| { + let old_sender = old_sender.clone(); + async move { + old_sender.send("old").unwrap(); + Ok(()) + } + .boxed() + }, + move |_| { + let new_sender = new_sender.clone(); + async move { + new_sender.send("new").unwrap(); + Ok(()) + } + .boxed() + }, + ) + .build() + .expect("migrator failed to build"); + + let _result = migrator + .write( + &ContextBuilder::new("user-key") + .build() + .expect("context failed to build"), + "migration-key".into(), + stage, + (), + ) + .await; + + let payloads = receiver.try_iter().collect::>(); + + if expected_old { + assert!(payloads.contains(&"old")); + } else { + assert!(!payloads.contains(&"old")); + } + + if expected_new { + assert!(payloads.contains(&"new")); + } else { + assert!(!payloads.contains(&"new")); + } + } + + #[tokio::test] + async fn write_stops_if_authoritative_fails() { + // doesn't write to new if old fails + // write_stops_if_authoritative_fails_driver(Stage::Off, true, false).await; + + write_stops_if_authoritative_fails_driver(Stage::DualWrite, true, false).await; + write_stops_if_authoritative_fails_driver(Stage::Shadow, true, false).await; + write_stops_if_authoritative_fails_driver(Stage::Live, false, true).await; + write_stops_if_authoritative_fails_driver(Stage::Rampdown, false, true).await; + + // doesn't write to old if new fails + // write_stops_if_authoritative_fails_driver(Stage::Complete, false, true).await; + } + + async fn write_stops_if_authoritative_fails_driver( + stage: Stage, + expected_old: bool, + expected_new: bool, + ) { + let config = ConfigBuilder::new("sdk-key") + .offline(true) + .build() + .expect("config failed to build"); + + let client = Arc::new(Client::build(config).expect("client failed to build")); + client.start_with_default_executor(); + + let (sender, receiver) = mpsc::channel(); + let old_sender = sender.clone(); + let new_sender = sender.clone(); + let mut migrator = MigratorBuilder::new(client) + .track_latency(false) + .track_errors(false) + .read( + |_| async move { Ok(()) }.boxed(), + |_| async move { Ok(()) }.boxed(), + Some(|_, _| true), + ) + .write( + move |_| { + let old_sender = old_sender.clone(); + async move { + old_sender.send("old").unwrap(); + Err("error".into()) + } + .boxed() + }, + move |_| { + let new_sender = new_sender.clone(); + async move { + new_sender.send("new").unwrap(); + Err("error".into()) + } + .boxed() + }, + ) + .build() + .expect("migrator failed to build"); + + let _result = migrator + .write( + &ContextBuilder::new("user-key") + .build() + .expect("context failed to build"), + "migration-key".into(), + stage, + (), + ) + .await; + + let payloads = receiver.try_iter().collect::>(); + + if expected_old { + assert!(payloads.contains(&"old")); + } else { + assert!(!payloads.contains(&"old")); + } + + if expected_new { + assert!(payloads.contains(&"new")); + } else { + assert!(!payloads.contains(&"new")); + } + } + + #[test_case(ExecutionOrder::Serial)] + #[test_case(ExecutionOrder::Random)] + #[test_case(ExecutionOrder::Concurrent)] + fn can_modify_execution_order(execution_order: ExecutionOrder) { + let config = ConfigBuilder::new("sdk-key") + .offline(true) + .build() + .expect("config failed to build"); + + let client = Arc::new(Client::build(config).expect("client failed to build")); + let migrator = MigratorBuilder::new(client) + .track_latency(false) + .track_errors(false) + .read( + |_: &u32| async move { Ok(()) }.boxed(), + |_: &u32| async move { Ok(()) }.boxed(), + Some(|_, _| true), + ) + .write( + |_: &u32| async move { Ok(()) }.boxed(), + |_: &u32| async move { Ok(()) }.boxed(), + ) + .read_execution_order(execution_order) + .build(); + + assert!(migrator.is_ok()); + } +} diff --git a/launchdarkly-server-sdk/src/migrations/mod.rs b/launchdarkly-server-sdk/src/migrations/mod.rs new file mode 100644 index 0000000..91f9541 --- /dev/null +++ b/launchdarkly-server-sdk/src/migrations/mod.rs @@ -0,0 +1,111 @@ +use core::fmt; +use std::fmt::{Display, Formatter}; + +use launchdarkly_server_sdk_evaluation::FlagValue; +use serde::{Deserialize, Serialize}; + +#[non_exhaustive] +#[derive(Debug, Copy, Clone, Serialize, Eq, Hash, PartialEq)] +#[serde(rename_all = "lowercase")] +/// Origin represents the source of origin for a migration-related operation. +pub enum Origin { + /// Old represents the technology source we are migrating away from. + Old, + /// New represents the technology source we are migrating towards. + New, +} + +#[non_exhaustive] +#[derive(Debug, Copy, Clone, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +/// Operation represents a type of migration operation; namely, read or write. +pub enum Operation { + /// Read denotes a read-related migration operation. + Read, + /// Write denotes a write-related migration operation. + Write, +} + +#[non_exhaustive] +#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +/// Stage denotes one of six possible stages a technology migration could be a +/// part of, progressing through the following order. +/// +/// Off -> DualWrite -> Shadow -> Live -> RampDown -> Complete +pub enum Stage { + /// Off - migration hasn't started, "old" is authoritative for reads and writes + Off, + /// DualWrite - write to both "old" and "new", "old" is authoritative for reads + DualWrite, + /// Shadow - both "new" and "old" versions run with a preference for "old" + Shadow, + /// Live - both "new" and "old" versions run with a preference for "new" + Live, + /// RampDown - only read from "new", write to "old" and "new" + Rampdown, + /// Complete - migration is done + Complete, +} + +impl Display for Stage { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + match self { + Stage::Off => write!(f, "off"), + Stage::DualWrite => write!(f, "dualwrite"), + Stage::Shadow => write!(f, "shadow"), + Stage::Live => write!(f, "live"), + Stage::Rampdown => write!(f, "rampdown"), + Stage::Complete => write!(f, "complete"), + } + } +} + +impl From for FlagValue { + fn from(stage: Stage) -> FlagValue { + FlagValue::Str(stage.to_string()) + } +} + +impl TryFrom for Stage { + type Error = String; + + fn try_from(value: FlagValue) -> Result { + if let FlagValue::Str(value) = value { + match value.as_str() { + "off" => Ok(Stage::Off), + "dualwrite" => Ok(Stage::DualWrite), + "shadow" => Ok(Stage::Shadow), + "live" => Ok(Stage::Live), + "rampdown" => Ok(Stage::Rampdown), + "complete" => Ok(Stage::Complete), + _ => Err(format!("Invalid stage: {}", value)), + } + } else { + Err("Cannot convert non-string value to Stage".to_string()) + } + } +} + +#[non_exhaustive] +#[derive(Debug, Copy, Clone, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +/// ExecutionOrder represents the various execution modes this SDK can operate under while +/// performing migration-assisted reads. +pub enum ExecutionOrder { + /// Serial execution ensures the authoritative read will always complete execution before + /// executing the non-authoritative read. + Serial, + /// Random execution randomly decides if the authoritative read should execute first or second. + Random, + /// Concurrent executes both concurrently, waiting until both calls have finished before + /// proceeding. + Concurrent, +} + +pub use migrator::Migrator; +pub use migrator::MigratorBuilder; +pub use tracker::MigrationOpTracker; + +mod migrator; +mod tracker; diff --git a/launchdarkly-server-sdk/src/migrations/tracker.rs b/launchdarkly-server-sdk/src/migrations/tracker.rs new file mode 100644 index 0000000..2ff747f --- /dev/null +++ b/launchdarkly-server-sdk/src/migrations/tracker.rs @@ -0,0 +1,404 @@ +use std::{ + collections::{HashMap, HashSet}, + time::Duration, +}; + +use launchdarkly_server_sdk_evaluation::{Context, Detail, Flag}; +use rand::thread_rng; + +use crate::{ + events::event::{BaseEvent, EventFactory, MigrationOpEvent}, + sampler::{Sampler, ThreadRngSampler}, +}; + +use super::{Operation, Origin, Stage}; + +/// A MigrationOpTracker is responsible for managing the collection of measurements that a user +/// might wish to record throughout a migration-assisted operation. +/// +/// Example measurements include latency, errors, and consistency. +pub struct MigrationOpTracker { + key: String, + flag: Option, + context: Context, + detail: Detail, + default_stage: Stage, + operation: Option, + invoked: HashSet, + consistent: Option, + consistent_ratio: Option, + errors: HashSet, + latencies: HashMap, +} + +impl MigrationOpTracker { + pub(crate) fn new( + key: String, + flag: Option, + context: Context, + detail: Detail, + default_stage: Stage, + ) -> Self { + let consistent_ratio = match &flag { + Some(f) => f + .migration_settings + .as_ref() + .map(|s| s.check_ratio.unwrap_or(1)), + None => None, + }; + + Self { + key, + flag, + context, + detail, + default_stage, + operation: None, + invoked: HashSet::new(), + consistent: None, + consistent_ratio, + errors: HashSet::new(), + latencies: HashMap::new(), + } + } + + /// Sets the migration related operation associated with these tracking measurements. + pub fn operation(&mut self, operation: Operation) { + self.operation = Some(operation); + } + + /// Allows recording which origins were called during a migration. + pub fn invoked(&mut self, origin: Origin) { + self.invoked.insert(origin); + } + + /// This method accepts a callable which should take no parameters and return a single boolean + /// to represent the consistency check results for a read operation. + /// + /// A callable is provided in case sampling rules do not require consistency checking to run. + /// In this case, we can avoid the overhead of a function by not using the callable. + pub fn consistent(&mut self, is_consistent: impl Fn() -> bool) { + if ThreadRngSampler::new(thread_rng()).sample(self.consistent_ratio.unwrap_or(1)) { + self.consistent = Some(is_consistent()); + } + } + + /// Allows recording which origins were called during a migration. + pub fn error(&mut self, origin: Origin) { + self.errors.insert(origin); + } + + /// Allows tracking the recorded latency for an individual operation. + pub fn latency(&mut self, origin: Origin, latency: Duration) { + if latency.is_zero() { + return; + } + + self.latencies.insert(origin, latency); + } + + /// Creates an instance of [crate::MigrationOpEvent]. This event data can be + /// provided to the [crate::Client::track_migration_op] method to rely this metric + /// information upstream to LaunchDarkly services. + pub fn build(&self) -> Result { + let operation = self + .operation + .ok_or_else(|| "operation not provided".to_string())?; + + self.check_invoked_consistency()?; + + if self.key.is_empty() { + return Err("operation cannot contain an empty key".to_string()); + } + + let invoked = self.invoked.clone(); + if invoked.is_empty() { + return Err("no origins were invoked".to_string()); + } + + Ok(MigrationOpEvent { + base: BaseEvent::new(EventFactory::now(), self.context.clone()), + key: self.key.clone(), + version: self.flag.as_ref().map(|f| f.version), + operation, + default_stage: self.default_stage, + evaluation: self.detail.clone(), + invoked, + consistency_check_ratio: self.consistent_ratio, + consistency_check: self.consistent, + errors: self.errors.clone(), + latency: self.latencies.clone(), + sampling_ratio: self.flag.as_ref().and_then(|f| f.sampling_ratio), + }) + } + + fn check_invoked_consistency(&self) -> Result<(), String> { + for origin in [Origin::Old, Origin::New].iter() { + if self.invoked.contains(origin) { + continue; + } + + if self.errors.contains(origin) { + return Err(format!( + "provided error for origin {:?} without recording invocation", + origin + )); + } + + if self.latencies.contains_key(origin) { + return Err(format!( + "provided latency for origin {:?} without recording invocation", + origin + )); + } + } + + if self.consistent.is_some() && self.invoked.len() != 2 { + return Err("provided consistency without recording both invocations".to_string()); + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + + use launchdarkly_server_sdk_evaluation::{ + ContextBuilder, Detail, Flag, MigrationFlagParameters, Reason, + }; + use test_case::test_case; + + use super::{MigrationOpTracker, Operation, Origin, Stage}; + use crate::test_common::basic_flag; + + fn minimal_tracker(flag: Flag) -> MigrationOpTracker { + let mut tracker = MigrationOpTracker::new( + flag.key.clone(), + Some(flag), + ContextBuilder::new("user") + .build() + .expect("failed to build context"), + Detail { + value: Some(Stage::Live), + variation_index: Some(1), + reason: Reason::Fallthrough { + in_experiment: false, + }, + }, + Stage::Live, + ); + tracker.operation(Operation::Read); + tracker.invoked(Origin::Old); + tracker.invoked(Origin::New); + + tracker + } + + #[test] + fn build_minimal_tracker() { + let tracker = minimal_tracker(basic_flag("flag-key")); + let result = tracker.build(); + + assert!(result.is_ok()); + } + + #[test] + fn build_without_flag() { + let mut tracker = minimal_tracker(basic_flag("flag-key")); + tracker.flag = None; + let result = tracker.build(); + + assert!(result.is_ok()); + } + + #[test_case(Origin::Old)] + #[test_case(Origin::New)] + fn track_invocations_individually(origin: Origin) { + let mut tracker = MigrationOpTracker::new( + "flag-key".into(), + Some(basic_flag("flag-key")), + ContextBuilder::new("user") + .build() + .expect("failed to build context"), + Detail { + value: Some(Stage::Live), + variation_index: Some(1), + reason: Reason::Fallthrough { + in_experiment: false, + }, + }, + Stage::Live, + ); + tracker.operation(Operation::Read); + tracker.invoked(origin); + + let event = tracker.build().expect("failed to build event"); + assert_eq!(event.invoked.len(), 1); + assert!(event.invoked.contains(&origin)); + } + + #[test] + fn tracks_both_invocations() { + let mut tracker = MigrationOpTracker::new( + "flag-key".into(), + Some(basic_flag("flag-key")), + ContextBuilder::new("user") + .build() + .expect("failed to build context"), + Detail { + value: Some(Stage::Live), + variation_index: Some(1), + reason: Reason::Fallthrough { + in_experiment: false, + }, + }, + Stage::Live, + ); + tracker.operation(Operation::Read); + tracker.invoked(Origin::Old); + tracker.invoked(Origin::New); + + let event = tracker.build().expect("failed to build event"); + assert_eq!(event.invoked.len(), 2); + assert!(event.invoked.contains(&Origin::Old)); + assert!(event.invoked.contains(&Origin::New)); + } + + #[test_case(false)] + #[test_case(true)] + fn tracks_consistency(expectation: bool) { + let mut tracker = minimal_tracker(basic_flag("flag-key")); + tracker.operation(Operation::Read); + tracker.consistent(|| expectation); + + let event = tracker.build().expect("failed to build event"); + assert_eq!(event.consistency_check, Some(expectation)); + assert_eq!(event.consistency_check_ratio, None); + } + + #[test_case(false)] + #[test_case(true)] + fn consistency_can_be_disabled_through_sampling_ratio(expectation: bool) { + let mut flag = basic_flag("flag-key"); + flag.migration_settings = Some(MigrationFlagParameters { + check_ratio: Some(0), + }); + + let mut tracker = minimal_tracker(flag); + tracker.operation(Operation::Read); + tracker.consistent(|| expectation); + + let event = tracker.build().expect("failed to build event"); + assert_eq!(event.consistency_check, None); + assert_eq!(event.consistency_check_ratio, Some(0)); + } + + #[test_case(Origin::Old)] + #[test_case(Origin::New)] + fn track_errors_individually(origin: Origin) { + let mut tracker = minimal_tracker(basic_flag("flag-key")); + tracker.error(origin); + + let event = tracker.build().expect("failed to build event"); + assert_eq!(event.errors.len(), 1); + assert!(event.errors.contains(&origin)); + } + + #[test] + fn tracks_both_errors() { + let mut tracker = minimal_tracker(basic_flag("flag-key")); + tracker.error(Origin::Old); + tracker.error(Origin::New); + + let event = tracker.build().expect("failed to build event"); + assert_eq!(event.errors.len(), 2); + assert!(event.errors.contains(&Origin::Old)); + assert!(event.errors.contains(&Origin::New)); + } + + #[test_case(Origin::Old)] + #[test_case(Origin::New)] + fn track_latencies_individually(origin: Origin) { + let mut tracker = minimal_tracker(basic_flag("flag-key")); + tracker.latency(origin, std::time::Duration::from_millis(100)); + + let event = tracker.build().expect("failed to build event"); + assert_eq!(event.latency.len(), 1); + assert_eq!( + event.latency.get(&origin), + Some(&std::time::Duration::from_millis(100)) + ); + } + + #[test] + fn track_both_latencies() { + let mut tracker = minimal_tracker(basic_flag("flag-key")); + tracker.latency(Origin::Old, std::time::Duration::from_millis(100)); + tracker.latency(Origin::New, std::time::Duration::from_millis(200)); + + let event = tracker.build().expect("failed to build event"); + assert_eq!(event.latency.len(), 2); + assert_eq!( + event.latency.get(&Origin::Old), + Some(&std::time::Duration::from_millis(100)) + ); + assert_eq!( + event.latency.get(&Origin::New), + Some(&std::time::Duration::from_millis(200)) + ); + } + + #[test] + fn fails_without_calling_invocations() { + let mut tracker = MigrationOpTracker::new( + "flag-key".into(), + Some(basic_flag("flag-key")), + ContextBuilder::new("user") + .build() + .expect("failed to build context"), + Detail { + value: Some(Stage::Live), + variation_index: Some(1), + reason: Reason::Fallthrough { + in_experiment: false, + }, + }, + Stage::Live, + ); + tracker.operation(Operation::Read); + + let failure = tracker + .build() + .expect_err("tracker should have failed to build event"); + + assert_eq!(failure, "no origins were invoked"); + } + + #[test] + fn fails_without_operation() { + let mut tracker = MigrationOpTracker::new( + "flag-key".into(), + Some(basic_flag("flag-key")), + ContextBuilder::new("user") + .build() + .expect("failed to build context"), + Detail { + value: Some(Stage::Live), + variation_index: Some(1), + reason: Reason::Fallthrough { + in_experiment: false, + }, + }, + Stage::Live, + ); + tracker.invoked(Origin::Old); + tracker.invoked(Origin::New); + + let failure = tracker + .build() + .expect_err("tracker should have failed to build event"); + + assert_eq!(failure, "operation not provided"); + } +} diff --git a/launchdarkly-server-sdk/src/sampler.rs b/launchdarkly-server-sdk/src/sampler.rs new file mode 100644 index 0000000..1fe96d5 --- /dev/null +++ b/launchdarkly-server-sdk/src/sampler.rs @@ -0,0 +1,57 @@ +use rand::Rng; + +pub trait Sampler { + fn sample(&mut self, ratio: u32) -> bool; +} + +pub struct ThreadRngSampler { + rng: R, +} + +impl ThreadRngSampler { + pub fn new(rng: R) -> Self { + ThreadRngSampler { rng } + } +} + +impl Sampler for ThreadRngSampler { + fn sample(&mut self, ratio: u32) -> bool { + if ratio == 0 { + return false; + } + + if ratio == 1 { + return true; + } + + self.rng.gen_ratio(1, ratio) + } +} + +#[cfg(test)] +mod tests { + use rand::{rngs::StdRng, SeedableRng}; + + use super::*; + + #[test] + fn test_zero_is_false() { + let mut sampler = ThreadRngSampler::new(rand::thread_rng()); + assert!(!sampler.sample(0)); + } + + #[test] + fn test_one_is_true() { + let mut sampler = ThreadRngSampler::new(rand::thread_rng()); + assert!(sampler.sample(1)); + } + + #[test] + fn test_can_affect_sampling_ratio() { + let rng = StdRng::seed_from_u64(0); + let mut sampler = ThreadRngSampler::new(rng); + let sampled_size = (0..1_000).filter(|_| sampler.sample(10)).count(); + + assert_eq!(sampled_size, 110); + } +} diff --git a/launchdarkly-server-sdk/src/test_common.rs b/launchdarkly-server-sdk/src/test_common.rs index 35900ff..475611b 100644 --- a/launchdarkly-server-sdk/src/test_common.rs +++ b/launchdarkly-server-sdk/src/test_common.rs @@ -2,6 +2,8 @@ use launchdarkly_server_sdk_evaluation::{Flag, Segment}; +use crate::Stage; + pub const FLOAT_TO_INT_MAX: i64 = 9007199254740991; pub fn basic_flag(key: &str) -> Flag { @@ -98,6 +100,39 @@ pub fn basic_int_flag(key: &str) -> Flag { .unwrap() } +pub fn basic_migration_flag(key: &str, stage: Stage) -> Flag { + let variation_index = match stage { + Stage::Off => 0, + Stage::DualWrite => 1, + Stage::Shadow => 2, + Stage::Live => 3, + Stage::Rampdown => 4, + Stage::Complete => 5, + }; + + serde_json::from_str(&format!( + r#"{{ + "key": {}, + "version": 42, + "on": true, + "targets": [], + "rules": [], + "prerequisites": [], + "fallthrough": {{"variation": {}}}, + "offVariation": 0, + "variations": ["off", "dualwrite", "shadow", "live", "rampdown", "complete"], + "clientSideAvailability": {{ + "usingMobileKey": false, + "usingEnvironmentId": false + }}, + "salt": "kosher" + }}"#, + serde_json::Value::String(key.to_string()), + variation_index + )) + .unwrap() +} + pub fn basic_segment(key: &str) -> Segment { serde_json::from_str(&format!( r#"{{