From d98cc49de6a270f2efe668ac64d69c0f3717c007 Mon Sep 17 00:00:00 2001
From: Pavel Mikhalkevich
Date: Mon, 7 Oct 2024 21:38:14 +0400
Subject: [PATCH] Add loadtest

---
 Cargo.toml          | 10 ++++--
 Makefile            |  1 +
 src/bin/loadtest.rs | 79 ++++++++++++++++++++++++++++++++++++++++++---
 3 files changed, 83 insertions(+), 7 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 9bc6d9b..de064d9 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,7 +13,7 @@ exclude = [".github", "docker", ".gitignore", "Makefile"]
 
 [features]
 default = []
-binaries = ["dep:clap", "dep:tracing-subscriber"]
+binaries = ["dep:clap", "dep:tracing-subscriber", "dep:tokio", "dep:lazy_static"]
 
 [dependencies]
 chrono = { version = "0.4.38", features = ["serde"] }
@@ -27,10 +27,16 @@ sqlx = { version = "=0.8.2", features = [
     "uuid",
 ] }
 thiserror = "1.0.63"
+tracing = "0.1.40"
 uuid = { version = "1.10.0", features = ["v4", "serde"] }
+
 clap = { version = "4", features = ["derive"], optional = true }
 tracing-subscriber = { version = "0.3", optional = true }
-tracing = "0.1.40"
+tokio = { version = "1.39", features = [
+    "macros",
+    "rt-multi-thread",
+], optional = true }
+lazy_static = { version = "1.5.0", optional = true }
 
 [dev-dependencies]
 lazy_static = "1.5.0"
diff --git a/Makefile b/Makefile
index 8bafbd2..4de9565 100644
--- a/Makefile
+++ b/Makefile
@@ -61,4 +61,5 @@ test/cov:
 
 .PHONY: test/load
 test/load:
+	POSTGRES_URL=postgres://${POSTGRES_USER}:${POSTGRES_PASSWORD}@${POSTGRES_HOST}:${POSTGRES_PORT}/${POSTGRES_DATABASE} \
 	cargo run --release --features binaries --bin loadtest -- $(args)
diff --git a/src/bin/loadtest.rs b/src/bin/loadtest.rs
index 83b4b91..434f5a7 100644
--- a/src/bin/loadtest.rs
+++ b/src/bin/loadtest.rs
@@ -1,11 +1,17 @@
 use clap::Parser;
+use pgboss::{Client, Error};
+use serde_json::json;
+use std::sync::{atomic, Arc};
+
+lazy_static::lazy_static! {
+    static ref SCHEMA_NAME: String = format!("schema_{}", uuid::Uuid::new_v4().as_simple());
+}
+
+static QUEUES: &[&str] = &["qname"];
 
 #[derive(Parser)]
 #[command(version, about = "Loadtest for Rust implementation of PgBoss job queueing service.", long_about = None)]
 struct Cli {
-    #[arg(short, long, default_value_t = 1)]
-    schemas_count: usize,
-
     #[arg(short, long, default_value_t = 30_000)]
     jobs_count: usize,
 
@@ -13,11 +19,74 @@ struct Cli {
     threads_count: usize,
 }
 
-fn main() {
+#[tokio::main]
+async fn main() {
     tracing_subscriber::fmt::fmt()
         .with_max_level(tracing::Level::INFO)
         .init();
     let cli = Cli::parse();
 
-    log::info!("Running a loadtest with the following settings schemas_count={}, jobs_count={}, threads_count={}", cli.schemas_count, cli.jobs_count, cli.threads_count);
+    log::info!("Running a loadtest with the following settings: jobs_count={}, threads_count={}. Schema name will be {}", cli.jobs_count, cli.threads_count, SCHEMA_NAME.as_str());
+
+    let jobs_sent = Arc::new(atomic::AtomicUsize::new(0));
+    let jobs_fetched = Arc::new(atomic::AtomicUsize::new(0));
+
+    let c = Client::builder()
+        .schema(SCHEMA_NAME.as_str())
+        .connect()
+        .await
+        .expect("connected and installed app");
+    for &q in QUEUES {
+        c.create_standard_queue(q).await.unwrap();
+    }
+
+    let start = std::time::Instant::now();
+
+    let mut set = tokio::task::JoinSet::new();
+    let threads_count = cli.threads_count;
+    let _: Vec<_> = (0..threads_count)
+        .map(|_| {
+            let jobs_sent = jobs_sent.clone();
+            let jobs_fetched = jobs_fetched.clone();
+            set.spawn(async move {
+                let c = Client::builder()
+                    .schema(SCHEMA_NAME.as_str())
+                    .connect()
+                    .await?;
+                for idx in 0..cli.jobs_count {
+                    if idx % 2 == 0 {
+                        let _id = c.send_data(QUEUES[0], json!({"key": "value"})).await?;
+                        if jobs_sent.fetch_add(1, atomic::Ordering::SeqCst) >= cli.jobs_count {
+                            return Ok(idx);
+                        }
+                    } else {
+                        let _maybe_job = c.fetch_job(QUEUES[0]).await?;
+                        if jobs_fetched.fetch_add(1, atomic::Ordering::SeqCst) >= cli.jobs_count {
+                            return Ok(idx);
+                        }
+                    }
+                }
+                Ok::<usize, Error>(cli.jobs_count)
+            })
+        })
+        .collect();
+
+    let mut results = Vec::with_capacity(threads_count);
+    while let Some(res) = set.join_next().await {
+        results.push(res.unwrap());
+    }
+
+    let time_elapsed = start.elapsed();
+    let seconds_elapsed = (time_elapsed.as_secs() * 1_000_000_000
+        + time_elapsed.subsec_nanos() as u64) as f64
+        / 1_000_000_000.0;
+
+    log::info!(
+        "Sent {} jobs and consumed {} jobs in {:.2} seconds, rate: {} jobs per second. Results: {:?}",
+        jobs_sent.load(atomic::Ordering::SeqCst),
+        jobs_fetched.load(atomic::Ordering::SeqCst),
+        seconds_elapsed,
+        cli.jobs_count as f64 / seconds_elapsed,
+        results,
+    );
 }
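
Note (not part of the patch): the throughput math in src/bin/loadtest.rs converts the elapsed Duration into fractional seconds by hand (whole seconds times 1e9 plus the nanosecond remainder, divided by 1e9). This computes the same value, up to float rounding, as the standard library's Duration::as_secs_f64(), stable since Rust 1.38. A minimal standalone sketch of the equivalence; the sleep is only a stand-in for the timed work:

    use std::time::{Duration, Instant};

    fn main() {
        let start = Instant::now();
        // Stand-in for the work being timed.
        std::thread::sleep(Duration::from_millis(25));
        let elapsed: Duration = start.elapsed();

        // Manual conversion, as in the patch: whole seconds plus the nanosecond remainder.
        let manual = (elapsed.as_secs() * 1_000_000_000 + elapsed.subsec_nanos() as u64) as f64
            / 1_000_000_000.0;
        // Standard-library equivalent.
        let via_std = elapsed.as_secs_f64();

        println!("manual = {manual:.9} s, as_secs_f64 = {via_std:.9} s");
    }

Keeping the manual arithmetic is fine for loadtest-scale durations; switching to as_secs_f64() would only be a readability simplification.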