Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add support for migrations #90

Merged
merged 9 commits into from
Jul 19, 2024
6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
name: Run CI
on:
push:
branches: [ main ]
branches: [ main, 'feat/**' ]
paths-ignore:
- '**.md' # Do not need to run CI for markdown changes.
pull_request:
branches: [ main ]
branches: [ main, 'feat/**' ]
paths-ignore:
- '**.md'

Expand Down Expand Up @@ -57,4 +57,4 @@ jobs:
run: sudo apt-get update && sudo apt-get install -y musl-tools

- name: Build
run: TARGET_CC=musl-gcc RUSTFLAGS="-C linker=musl-gcc" cargo build --release --target=x86_64-unknown-linux-musl
run: TARGET_CC=musl-gcc RUSTFLAGS="-C linker=musl-gcc" cargo build --release --target=x86_64-unknown-linux-musl -p launchdarkly-server-sdk
2 changes: 2 additions & 0 deletions contract-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ futures = "0.3.12"
hyper = { version = "0.14.19", features = ["client"] }
hyper-rustls = { version = "0.24.1" , optional = true, features = ["http2"]}
hyper-tls = { version = "0.5.0", optional = true }
reqwest = { version = "0.12.4", features = ["default", "blocking", "json"] }
async-mutex = "1.4.0"

[features]
default = ["rustls"]
Expand Down
168 changes: 163 additions & 5 deletions contract-tests/src/client_entity.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use launchdarkly_server_sdk::{Context, ContextBuilder, MultiContextBuilder, Reference};
use futures::future::FutureExt;
use launchdarkly_server_sdk::{
Context, ContextBuilder, MigratorBuilder, MultiContextBuilder, Reference,
};
use std::sync::Arc;
use std::time::Duration;

const DEFAULT_POLLING_BASE_URL: &str = "https://sdk.launchdarkly.com";
Expand All @@ -12,7 +16,8 @@ use launchdarkly_server_sdk::{
};

use crate::command_params::{
ContextBuildParams, ContextConvertParams, ContextParam, ContextResponse, SecureModeHashResponse,
ContextBuildParams, ContextConvertParams, ContextParam, ContextResponse,
MigrationOperationResponse, MigrationVariationResponse, SecureModeHashResponse,
};
use crate::HttpsConnector;
use crate::{
Expand All @@ -24,7 +29,7 @@ use crate::{
};

pub struct ClientEntity {
client: Client,
client: Arc<Client>,
}

impl ClientEntity {
Expand Down Expand Up @@ -131,10 +136,15 @@ impl ClientEntity {
client.start_with_default_executor();
client.wait_for_initialization(Duration::from_secs(5)).await;

Ok(Self { client })
Ok(Self {
client: Arc::new(client),
})
}

pub fn do_command(&self, command: CommandParams) -> Result<Option<CommandResponse>, String> {
pub async fn do_command(
&self,
command: CommandParams,
) -> Result<Option<CommandResponse>, String> {
match command.command.as_str() {
"evaluate" => Ok(Some(CommandResponse::EvaluateFlag(
self.evaluate(command.evaluate.ok_or("Evaluate params should be set")?),
Expand Down Expand Up @@ -211,6 +221,132 @@ impl ClientEntity {
},
)))
}
"migrationVariation" => {
let params = command
.migration_variation
.ok_or("migrationVariation params should be set")?;

let (stage, _) = self.client.migration_variation(
&params.context,
&params.key,
params.default_stage,
);

Ok(Some(CommandResponse::MigrationVariation(
MigrationVariationResponse { result: stage },
)))
}
"migrationOperation" => {
let params = command
.migration_operation
.ok_or("migrationOperation params should be set")?;

let mut builder = MigratorBuilder::new(self.client.clone());

builder = builder
.read_execution_order(params.read_execution_order)
.track_errors(params.track_errors)
.track_latency(params.track_latency)
.read(
|payload: &Option<String>| {
let old_endpoint = params.old_endpoint.clone();
async move {
let result = send_payload(&old_endpoint, payload.clone()).await;
match result {
Ok(r) => Ok(Some(r)),
Err(e) => Err(e),
}
}
.boxed()
},
|payload| {
let new_endpoint = params.new_endpoint.clone();
async move {
let result = send_payload(&new_endpoint, payload.clone()).await;
match result {
Ok(r) => Ok(Some(r)),
Err(e) => Err(e),
}
}
.boxed()
},
if params.track_consistency {
Some(|lhs, rhs| lhs == rhs)
} else {
None
},
)
.write(
|payload| {
let old_endpoint = params.old_endpoint.clone();
async move {
let result = send_payload(&old_endpoint, payload.clone()).await;
match result {
Ok(r) => Ok(Some(r)),
Err(e) => Err(e),
}
}
.boxed()
},
|payload| {
let new_endpoint = params.new_endpoint.clone();
async move {
let result = send_payload(&new_endpoint, payload.clone()).await;
match result {
Ok(r) => Ok(Some(r)),
Err(e) => Err(e),
}
}
.boxed()
},
);

let mut migrator = builder.build().expect("builder failed");
match params.operation {
launchdarkly_server_sdk::Operation::Read => {
let result = migrator
.read(
&params.context,
params.key,
params.default_stage,
params.payload,
)
.await;

let payload = match result.result {
Ok(payload) => payload.unwrap_or_else(|| "success".into()),
Err(e) => e.to_string(),
};

Ok(Some(CommandResponse::MigrationOperation(
MigrationOperationResponse { result: payload },
)))
}
launchdarkly_server_sdk::Operation::Write => {
let result = migrator
.write(
&params.context,
params.key,
params.default_stage,
params.payload,
)
.await;

let payload = match result.authoritative.result {
Ok(payload) => payload.unwrap_or_else(|| "success".into()),
Err(e) => e.to_string(),
};

Ok(Some(CommandResponse::MigrationOperation(
MigrationOperationResponse { result: payload },
)))
}
_ => Err(format!(
"Invalid operation requested: {:?}",
params.operation
)),
}
}
command => Err(format!("Invalid command requested: {}", command)),
}
}
Expand Down Expand Up @@ -430,3 +566,25 @@ impl Drop for ClientEntity {
self.client.close();
}
}

async fn send_payload(endpoint: &str, payload: Option<String>) -> Result<String, String>
where
{
let client = reqwest::Client::new();
let response = client
.post(endpoint)
.body(payload.unwrap_or_default())
.send()
.await
.expect("sending request to SDK test harness");

if response.status().is_success() {
let body = response.text().await.expect("read harness response body");
Ok(body.to_string())
} else {
Err(format!(
"requested failed with status code {}",
response.status()
))
}
}
44 changes: 43 additions & 1 deletion contract-tests/src/command_params.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use launchdarkly_server_sdk::{AttributeValue, Context, FlagDetail, FlagValue, Reason};
use launchdarkly_server_sdk::{
AttributeValue, Context, ExecutionOrder, FlagDetail, FlagValue, Operation, Reason, Stage,
};
use serde::{self, Deserialize, Serialize};
use std::collections::HashMap;

Expand All @@ -9,6 +11,8 @@ pub enum CommandResponse {
EvaluateAll(EvaluateAllFlagsResponse),
ContextBuildOrConvert(ContextResponse),
SecureModeHash(SecureModeHashResponse),
MigrationVariation(MigrationVariationResponse),
MigrationOperation(MigrationOperationResponse),
}

#[derive(Deserialize, Debug)]
Expand All @@ -22,6 +26,8 @@ pub struct CommandParams {
pub context_build: Option<ContextBuildParams>,
pub context_convert: Option<ContextConvertParams>,
pub secure_mode_hash: Option<SecureModeHashParams>,
pub migration_variation: Option<MigrationVariationParams>,
pub migration_operation: Option<MigrationOperationParams>,
}

#[derive(Deserialize, Debug)]
Expand Down Expand Up @@ -130,3 +136,39 @@ pub struct SecureModeHashParams {
pub struct SecureModeHashResponse {
pub result: String,
}

#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct MigrationVariationParams {
pub key: String,
pub context: Context,
pub default_stage: Stage,
}

#[derive(Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct MigrationVariationResponse {
pub result: Stage,
}

#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct MigrationOperationParams {
pub key: String,
pub context: Context,
pub default_stage: Stage,
pub read_execution_order: ExecutionOrder,
pub operation: Operation,
pub old_endpoint: String,
pub new_endpoint: String,
pub payload: Option<String>,
pub track_latency: bool,
pub track_errors: bool,
pub track_consistency: bool,
}

#[derive(Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct MigrationOperationResponse {
pub result: String,
}
34 changes: 10 additions & 24 deletions contract-tests/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ mod command_params;
use crate::command_params::CommandParams;
use actix_web::error::{ErrorBadRequest, ErrorInternalServerError};
use actix_web::{web, App, HttpRequest, HttpResponse, HttpServer, Responder, Result};
use async_mutex::Mutex;
use client_entity::ClientEntity;
use futures::executor;
use hyper::client::HttpConnector;
use launchdarkly_server_sdk::Reference;
use serde::{self, Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::sync::{mpsc, Mutex};
use std::sync::mpsc;
use std::thread;

#[derive(Serialize)]
Expand Down Expand Up @@ -106,6 +107,8 @@ async fn status() -> impl Responder {
"inline-context".to_string(),
"anonymous-redaction".to_string(),
"omit-anonymous-contexts".to_string(),
"migrations".to_string(),
"event-sampling".to_string(),
],
})
}
Expand All @@ -132,17 +135,8 @@ async fn create_client(
Err(e) => return HttpResponse::InternalServerError().body(format!("{}", e)),
};

let mut counter = match app_state.counter.lock() {
Ok(c) => c,
Err(_) => return HttpResponse::InternalServerError().body("Unable to retrieve counter"),
};

let mut entities = match app_state.client_entities.lock() {
Ok(h) => h,
Err(_) => {
return HttpResponse::InternalServerError().body("Unable to lock client_entities")
}
};
let mut counter = app_state.counter.lock().await;
let mut entities = app_state.client_entities.lock().await;

*counter += 1;
let client_resource = match req.url_for("client_path", [counter.to_string()]) {
Expand Down Expand Up @@ -171,17 +165,15 @@ async fn do_command(

let client_id = client_id.parse::<u32>().map_err(ErrorInternalServerError)?;

let entities = app_state
.client_entities
.lock()
.expect("Client entities cannot be locked");
let entities = app_state.client_entities.lock().await;

let entity = entities
.get(&client_id)
.ok_or_else(|| ErrorBadRequest("The specified client does not exist"))?;

let result = entity
.do_command(command_params.into_inner())
.await
.map_err(ErrorBadRequest)?;

match result {
Expand All @@ -197,14 +189,8 @@ async fn stop_client(req: HttpRequest, app_state: web::Data<AppState>) -> HttpRe
Err(_) => return HttpResponse::BadRequest().body("Unable to parse client id"),
};

match app_state.client_entities.lock() {
Ok(mut entities) => {
entities.remove(&client_id);
}
Err(_) => {
return HttpResponse::InternalServerError().body("Unable to retrieve handles")
}
};
let mut entities = app_state.client_entities.lock().await;
entities.remove(&client_id);

HttpResponse::NoContent().finish()
} else {
Expand Down
3 changes: 3 additions & 0 deletions launchdarkly-server-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ triomphe = { version = "<=0.1.10" }
uuid = {version = "1.2.2", features = ["v4"] }
hyper = { version = "0.14.19", features = ["client", "http1", "http2", "tcp"] }
hyper-rustls = { version = "0.24.1" , optional = true}
rand = "0.8"

[dev-dependencies]
maplit = "1.0.1"
Expand All @@ -49,6 +50,8 @@ tokio = { version = "1.17.0", features = ["macros", "time"] }
test-case = "3.2.1"
mockito = "1.2.0"
assert-json-diff = "2.0.2"
async-std = "1.12.0"
reqwest = { version = "0.12.4", features = ["json"] }

[features]
default = ["rustls"]
Expand Down
Loading