Skip to content

Commit

Permalink
Add loadtest
Browse files Browse the repository at this point in the history
  • Loading branch information
rustworthy committed Oct 7, 2024
1 parent 5afbf8f commit d98cc49
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 7 deletions.
10 changes: 8 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ exclude = [".github", "docker", ".gitignore", "Makefile"]

[features]
default = []
binaries = ["dep:clap", "dep:tracing-subscriber"]
binaries = ["dep:clap", "dep:tracing-subscriber", "dep:tokio", "dep:lazy_static"]

[dependencies]
chrono = { version = "0.4.38", features = ["serde"] }
Expand All @@ -27,10 +27,16 @@ sqlx = { version = "=0.8.2", features = [
"uuid",
] }
thiserror = "1.0.63"
tracing = "0.1.40"
uuid = { version = "1.10.0", features = ["v4", "serde"] }

clap = { version = "4", features = ["derive"], optional = true }
tracing-subscriber = { version = "0.3", optional = true }
tracing = "0.1.40"
tokio = { version = "1.39", features = [
"macros",
"rt-multi-thread",
], optional = true }
lazy_static = { version = "1.5.0", optional = true }

[dev-dependencies]
lazy_static = "1.5.0"
Expand Down
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,5 @@ test/cov:

.PHONY: test/load
test/load:
POSTGRES_URL=postgres://${POSTGRES_USER}:${POSTGRES_PASSWORD}@${POSTGRES_HOST}:${POSTGRES_PORT}/${POSTGRES_DATABASE} \
cargo run --release --features binaries --bin loadtest -- $(args)
79 changes: 74 additions & 5 deletions src/bin/loadtest.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,92 @@
use clap::Parser;
use pgboss::{Client, Error};
use serde_json::json;
use std::sync::{atomic, Arc};

lazy_static::lazy_static! {
static ref SCHEMA_NAME: String = format!("schema_{}", uuid::Uuid::new_v4().as_simple());
}

static QUEUES: &[&str] = &["qname"];

#[derive(Parser)]
#[command(version, about = "Loadtest for Rust implementation of PgBoss job queueing service.", long_about = None)]
struct Cli {
#[arg(short, long, default_value_t = 1)]
schemas_count: usize,

#[arg(short, long, default_value_t = 30_000)]
jobs_count: usize,

#[arg(short, long, default_value_t = 10)]
threads_count: usize,
}

fn main() {
#[tokio::main]
async fn main() {
tracing_subscriber::fmt::fmt()
.with_max_level(tracing::Level::INFO)
.init();

let cli = Cli::parse();
log::info!("Running a loadtest with the following settings schemas_count={}, jobs_count={}, threads_count={}", cli.schemas_count, cli.jobs_count, cli.threads_count);
log::info!("Running a loadtest with the following settings: jobs_count={}, threads_count={}. Schema name will be {}", cli.jobs_count, cli.threads_count, SCHEMA_NAME.as_str());

let jobs_sent = Arc::new(atomic::AtomicUsize::new(0));
let jobs_fetched = Arc::new(atomic::AtomicUsize::new(0));

let c = Client::builder()
.schema(SCHEMA_NAME.as_str())
.connect()
.await
.expect("connected and installed app");
for &q in QUEUES {
c.create_standard_queue(q).await.unwrap();
}

let start = std::time::Instant::now();

let mut set = tokio::task::JoinSet::new();
let threads_count = cli.threads_count;
let _: Vec<_> = (0..threads_count)
.map(|_| {
let jobs_sent = jobs_sent.clone();
let jobs_fetched = jobs_fetched.clone();
set.spawn(async move {
let c = Client::builder()
.schema(SCHEMA_NAME.as_str())
.connect()
.await?;
for idx in 0..cli.jobs_count {
if idx % 2 == 0 {
let _id = c.send_data(QUEUES[0], json!({"key": "value"})).await?;
if jobs_sent.fetch_add(1, atomic::Ordering::SeqCst) >= cli.jobs_count {
return Ok(idx);
}
} else {
let _maybe_job = c.fetch_job(QUEUES[0]).await?;
if jobs_fetched.fetch_add(1, atomic::Ordering::SeqCst) >= cli.jobs_count {
return Ok(idx);
}
}
}
Ok::<usize, Error>(cli.jobs_count)
})
})
.collect();

let mut results = Vec::with_capacity(threads_count);
while let Some(res) = set.join_next().await {
results.push(res.unwrap());
}

let time_elapsed = start.elapsed();
let seconds_elapsed = (time_elapsed.as_secs() * 1_000_000_000
+ time_elapsed.subsec_nanos() as u64) as f64
/ 1_000_000_000.0;

log::info!(
"Sent {} jobs and consumed {} jobs in {:.2} seconds, rate: {} jobs per second. Results: {:?}",
jobs_sent.load(atomic::Ordering::SeqCst),
jobs_fetched.load(atomic::Ordering::SeqCst),
seconds_elapsed,
cli.jobs_count as f64 / seconds_elapsed,
results,
);
}

0 comments on commit d98cc49

Please sign in to comment.