Skip to content

Commit

Permalink
feat: Add support for migrations (#90)
Browse files Browse the repository at this point in the history
  • Loading branch information
keelerm84 authored Jul 19, 2024
1 parent 9d1fbd2 commit 445ab74
Show file tree
Hide file tree
Showing 17 changed files with 2,954 additions and 67 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
name: Run CI
on:
push:
branches: [ main ]
branches: [ main, 'feat/**' ]
paths-ignore:
- '**.md' # Do not need to run CI for markdown changes.
pull_request:
branches: [ main ]
branches: [ main, 'feat/**' ]
paths-ignore:
- '**.md'

Expand Down Expand Up @@ -57,4 +57,4 @@ jobs:
run: sudo apt-get update && sudo apt-get install -y musl-tools

- name: Build
run: TARGET_CC=musl-gcc RUSTFLAGS="-C linker=musl-gcc" cargo build --release --target=x86_64-unknown-linux-musl
run: TARGET_CC=musl-gcc RUSTFLAGS="-C linker=musl-gcc" cargo build --release --target=x86_64-unknown-linux-musl -p launchdarkly-server-sdk
2 changes: 2 additions & 0 deletions contract-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ futures = "0.3.12"
hyper = { version = "0.14.19", features = ["client"] }
hyper-rustls = { version = "0.24.1" , optional = true, features = ["http2"]}
hyper-tls = { version = "0.5.0", optional = true }
reqwest = { version = "0.12.4", features = ["default", "blocking", "json"] }
async-mutex = "1.4.0"

[features]
default = ["rustls"]
Expand Down
168 changes: 163 additions & 5 deletions contract-tests/src/client_entity.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use launchdarkly_server_sdk::{Context, ContextBuilder, MultiContextBuilder, Reference};
use futures::future::FutureExt;
use launchdarkly_server_sdk::{
Context, ContextBuilder, MigratorBuilder, MultiContextBuilder, Reference,
};
use std::sync::Arc;
use std::time::Duration;

const DEFAULT_POLLING_BASE_URL: &str = "https://sdk.launchdarkly.com";
Expand All @@ -12,7 +16,8 @@ use launchdarkly_server_sdk::{
};

use crate::command_params::{
ContextBuildParams, ContextConvertParams, ContextParam, ContextResponse, SecureModeHashResponse,
ContextBuildParams, ContextConvertParams, ContextParam, ContextResponse,
MigrationOperationResponse, MigrationVariationResponse, SecureModeHashResponse,
};
use crate::HttpsConnector;
use crate::{
Expand All @@ -24,7 +29,7 @@ use crate::{
};

pub struct ClientEntity {
client: Client,
client: Arc<Client>,
}

impl ClientEntity {
Expand Down Expand Up @@ -131,10 +136,15 @@ impl ClientEntity {
client.start_with_default_executor();
client.wait_for_initialization(Duration::from_secs(5)).await;

Ok(Self { client })
Ok(Self {
client: Arc::new(client),
})
}

pub fn do_command(&self, command: CommandParams) -> Result<Option<CommandResponse>, String> {
pub async fn do_command(
&self,
command: CommandParams,
) -> Result<Option<CommandResponse>, String> {
match command.command.as_str() {
"evaluate" => Ok(Some(CommandResponse::EvaluateFlag(
self.evaluate(command.evaluate.ok_or("Evaluate params should be set")?),
Expand Down Expand Up @@ -211,6 +221,132 @@ impl ClientEntity {
},
)))
}
"migrationVariation" => {
let params = command
.migration_variation
.ok_or("migrationVariation params should be set")?;

let (stage, _) = self.client.migration_variation(
&params.context,
&params.key,
params.default_stage,
);

Ok(Some(CommandResponse::MigrationVariation(
MigrationVariationResponse { result: stage },
)))
}
"migrationOperation" => {
let params = command
.migration_operation
.ok_or("migrationOperation params should be set")?;

let mut builder = MigratorBuilder::new(self.client.clone());

builder = builder
.read_execution_order(params.read_execution_order)
.track_errors(params.track_errors)
.track_latency(params.track_latency)
.read(
|payload: &Option<String>| {
let old_endpoint = params.old_endpoint.clone();
async move {
let result = send_payload(&old_endpoint, payload.clone()).await;
match result {
Ok(r) => Ok(Some(r)),
Err(e) => Err(e),
}
}
.boxed()
},
|payload| {
let new_endpoint = params.new_endpoint.clone();
async move {
let result = send_payload(&new_endpoint, payload.clone()).await;
match result {
Ok(r) => Ok(Some(r)),
Err(e) => Err(e),
}
}
.boxed()
},
if params.track_consistency {
Some(|lhs, rhs| lhs == rhs)
} else {
None
},
)
.write(
|payload| {
let old_endpoint = params.old_endpoint.clone();
async move {
let result = send_payload(&old_endpoint, payload.clone()).await;
match result {
Ok(r) => Ok(Some(r)),
Err(e) => Err(e),
}
}
.boxed()
},
|payload| {
let new_endpoint = params.new_endpoint.clone();
async move {
let result = send_payload(&new_endpoint, payload.clone()).await;
match result {
Ok(r) => Ok(Some(r)),
Err(e) => Err(e),
}
}
.boxed()
},
);

let mut migrator = builder.build().expect("builder failed");
match params.operation {
launchdarkly_server_sdk::Operation::Read => {
let result = migrator
.read(
&params.context,
params.key,
params.default_stage,
params.payload,
)
.await;

let payload = match result.result {
Ok(payload) => payload.unwrap_or_else(|| "success".into()),
Err(e) => e.to_string(),
};

Ok(Some(CommandResponse::MigrationOperation(
MigrationOperationResponse { result: payload },
)))
}
launchdarkly_server_sdk::Operation::Write => {
let result = migrator
.write(
&params.context,
params.key,
params.default_stage,
params.payload,
)
.await;

let payload = match result.authoritative.result {
Ok(payload) => payload.unwrap_or_else(|| "success".into()),
Err(e) => e.to_string(),
};

Ok(Some(CommandResponse::MigrationOperation(
MigrationOperationResponse { result: payload },
)))
}
_ => Err(format!(
"Invalid operation requested: {:?}",
params.operation
)),
}
}
command => Err(format!("Invalid command requested: {}", command)),
}
}
Expand Down Expand Up @@ -430,3 +566,25 @@ impl Drop for ClientEntity {
self.client.close();
}
}

async fn send_payload(endpoint: &str, payload: Option<String>) -> Result<String, String>
where
{
let client = reqwest::Client::new();
let response = client
.post(endpoint)
.body(payload.unwrap_or_default())
.send()
.await
.expect("sending request to SDK test harness");

if response.status().is_success() {
let body = response.text().await.expect("read harness response body");
Ok(body.to_string())
} else {
Err(format!(
"requested failed with status code {}",
response.status()
))
}
}
44 changes: 43 additions & 1 deletion contract-tests/src/command_params.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use launchdarkly_server_sdk::{AttributeValue, Context, FlagDetail, FlagValue, Reason};
use launchdarkly_server_sdk::{
AttributeValue, Context, ExecutionOrder, FlagDetail, FlagValue, Operation, Reason, Stage,
};
use serde::{self, Deserialize, Serialize};
use std::collections::HashMap;

Expand All @@ -9,6 +11,8 @@ pub enum CommandResponse {
EvaluateAll(EvaluateAllFlagsResponse),
ContextBuildOrConvert(ContextResponse),
SecureModeHash(SecureModeHashResponse),
MigrationVariation(MigrationVariationResponse),
MigrationOperation(MigrationOperationResponse),
}

#[derive(Deserialize, Debug)]
Expand All @@ -22,6 +26,8 @@ pub struct CommandParams {
pub context_build: Option<ContextBuildParams>,
pub context_convert: Option<ContextConvertParams>,
pub secure_mode_hash: Option<SecureModeHashParams>,
pub migration_variation: Option<MigrationVariationParams>,
pub migration_operation: Option<MigrationOperationParams>,
}

#[derive(Deserialize, Debug)]
Expand Down Expand Up @@ -130,3 +136,39 @@ pub struct SecureModeHashParams {
pub struct SecureModeHashResponse {
pub result: String,
}

#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct MigrationVariationParams {
pub key: String,
pub context: Context,
pub default_stage: Stage,
}

#[derive(Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct MigrationVariationResponse {
pub result: Stage,
}

#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct MigrationOperationParams {
pub key: String,
pub context: Context,
pub default_stage: Stage,
pub read_execution_order: ExecutionOrder,
pub operation: Operation,
pub old_endpoint: String,
pub new_endpoint: String,
pub payload: Option<String>,
pub track_latency: bool,
pub track_errors: bool,
pub track_consistency: bool,
}

#[derive(Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct MigrationOperationResponse {
pub result: String,
}
34 changes: 10 additions & 24 deletions contract-tests/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ mod command_params;
use crate::command_params::CommandParams;
use actix_web::error::{ErrorBadRequest, ErrorInternalServerError};
use actix_web::{web, App, HttpRequest, HttpResponse, HttpServer, Responder, Result};
use async_mutex::Mutex;
use client_entity::ClientEntity;
use futures::executor;
use hyper::client::HttpConnector;
use launchdarkly_server_sdk::Reference;
use serde::{self, Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::sync::{mpsc, Mutex};
use std::sync::mpsc;
use std::thread;

#[derive(Serialize)]
Expand Down Expand Up @@ -106,6 +107,8 @@ async fn status() -> impl Responder {
"inline-context".to_string(),
"anonymous-redaction".to_string(),
"omit-anonymous-contexts".to_string(),
"migrations".to_string(),
"event-sampling".to_string(),
],
})
}
Expand All @@ -132,17 +135,8 @@ async fn create_client(
Err(e) => return HttpResponse::InternalServerError().body(format!("{}", e)),
};

let mut counter = match app_state.counter.lock() {
Ok(c) => c,
Err(_) => return HttpResponse::InternalServerError().body("Unable to retrieve counter"),
};

let mut entities = match app_state.client_entities.lock() {
Ok(h) => h,
Err(_) => {
return HttpResponse::InternalServerError().body("Unable to lock client_entities")
}
};
let mut counter = app_state.counter.lock().await;
let mut entities = app_state.client_entities.lock().await;

*counter += 1;
let client_resource = match req.url_for("client_path", [counter.to_string()]) {
Expand Down Expand Up @@ -171,17 +165,15 @@ async fn do_command(

let client_id = client_id.parse::<u32>().map_err(ErrorInternalServerError)?;

let entities = app_state
.client_entities
.lock()
.expect("Client entities cannot be locked");
let entities = app_state.client_entities.lock().await;

let entity = entities
.get(&client_id)
.ok_or_else(|| ErrorBadRequest("The specified client does not exist"))?;

let result = entity
.do_command(command_params.into_inner())
.await
.map_err(ErrorBadRequest)?;

match result {
Expand All @@ -197,14 +189,8 @@ async fn stop_client(req: HttpRequest, app_state: web::Data<AppState>) -> HttpRe
Err(_) => return HttpResponse::BadRequest().body("Unable to parse client id"),
};

match app_state.client_entities.lock() {
Ok(mut entities) => {
entities.remove(&client_id);
}
Err(_) => {
return HttpResponse::InternalServerError().body("Unable to retrieve handles")
}
};
let mut entities = app_state.client_entities.lock().await;
entities.remove(&client_id);

HttpResponse::NoContent().finish()
} else {
Expand Down
3 changes: 3 additions & 0 deletions launchdarkly-server-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ moka = { version = "0.12.1", features = ["sync"] }
uuid = {version = "1.2.2", features = ["v4"] }
hyper = { version = "0.14.19", features = ["client", "http1", "http2", "tcp"] }
hyper-rustls = { version = "0.24.1" , optional = true}
rand = "0.8"

[dev-dependencies]
maplit = "1.0.1"
Expand All @@ -43,6 +44,8 @@ tokio = { version = "1.17.0", features = ["macros", "time"] }
test-case = "3.2.1"
mockito = "1.2.0"
assert-json-diff = "2.0.2"
async-std = "1.12.0"
reqwest = { version = "0.12.4", features = ["json"] }

[features]
default = ["rustls"]
Expand Down
Loading

0 comments on commit 445ab74

Please sign in to comment.