Skip to content

Commit

Permalink
Add test for throttling with 'Job::singleton_for'
Browse files Browse the repository at this point in the history
  • Loading branch information
rustworthy committed Oct 1, 2024
1 parent bc78577 commit fcb5994
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 17 deletions.
2 changes: 1 addition & 1 deletion docker/postgres.Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM postgres:16.4
FROM postgres:17

WORKDIR /var/lib/postgresql/
COPY certs ./certs
Expand Down
44 changes: 37 additions & 7 deletions src/client/public/job_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@ use uuid::Uuid;

impl Client {
/// Enqueue a job.
///
/// Will return [`Error::Conflict`] in case a job with this ID already exist,
/// which happen if you are providing the ID yourself.
///
/// If the queue does not exist, [`Error::DoesNotExist`] will be returned.
///
/// In case the system throttles the job (see [`Job::singleton_key`]), [`Error::Throttled`]
/// will be returned.
pub async fn send_job<'a, J>(&self, job: J) -> Result<Uuid, Error>
where
J: Borrow<Job<'a>>,
Expand All @@ -23,21 +31,43 @@ impl Client {
.map_err(|e| {
if let Some(db_error) = e.as_database_error() {
if let Some(constraint) = db_error.constraint() {
if constraint.starts_with('j') && constraint.ends_with("_pkey") {
return Error::Conflict {
msg: "job with this id already exists",
};
if constraint.starts_with('j') {
if constraint.ends_with("_pkey") {
return Error::Conflict {
msg: "job with this id already exists",
};
}
if constraint.ends_with("_i1") {
return Error::Throttled {
msg: "policy 'short' is applied to jobs with state 'created'",
};
}
if constraint.ends_with("_i2") {
return Error::Throttled {
msg: "policy 'singleton' is applied to jobs with state 'active'",
};
}
if constraint.ends_with("_i3") {
return Error::Throttled {
msg: "policy 'stately' is applied to jobs with state 'created', 'retry' or 'active'",
};
}
if constraint.ends_with("_i4") {
return Error::Throttled {
msg: "singleton policy applied to jobs with 'singleton_on' property and state not 'cancelled'",
};
}
}
if constraint == "dlq_fkey" {
return Error::Unprocessable {
return Error::DoesNotExist {
msg: "dead letter queue does not exist",
};
}
}
}
Error::Sqlx(e)
})?;
id.ok_or(Error::Unprocessable {
id.ok_or(Error::DoesNotExist {
msg: "queue does not exist",
})
}
Expand All @@ -55,7 +85,7 @@ impl Client {
.bind(Json(JobOptions::default()))
.fetch_one(&self.pool)
.await?;
id.ok_or(Error::Unprocessable {
id.ok_or(Error::DoesNotExist {
msg: "queue does not exist",
})
}
Expand Down
18 changes: 14 additions & 4 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,27 @@ pub enum Error {
#[error("db driver error")]
Sqlx(#[from] sqlx::Error),

/// Application error.
/// Specified queue is not registered in the system.
#[error("cannot process: {msg}")]
Unprocessable {
/// Details on what exactly went wrong.
DoesNotExist {
/// Further details.
msg: &'static str,
},

/// Constraint violation in the system.
/// Conflict in the system.
///
/// Most likely, an a job with this ID already exists
/// in the system and so operation fails.
#[error("conflict in the system: {msg}")]
Conflict {
/// Details on what exactly went wrong.
msg: &'static str,
},

/// The job has been throttled by the system.
#[error("throttled: {msg}")]
Throttled {
/// Which throttling has been applied.
msg: &'static str,
},
}
7 changes: 5 additions & 2 deletions src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,14 @@ pub struct JobDetails {
/// Will be `None` for a job that was not consumed just yet.
pub started_at: Option<DateTime<Utc>>,

///
/// Date used by the system internally for throttling.
///
/// This is calculated by the system using [`Job::singleton_for`] period.
/// This is the system's implementation detail and should not be relied on.
pub singleton_at: Option<NaiveDateTime>,

/// Key to use for throttling.
///
///
/// See [`Job::singleton_key`].
pub singleton_key: Option<String>,
}
Expand Down
38 changes: 35 additions & 3 deletions tests/e2e/job_send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::utils;
use chrono::Utc;
use pgboss::{Client, Error, Job, QueuePolicy};
use serde_json::json;
use tokio::time;

#[tokio::test]
async fn send_job() {
Expand Down Expand Up @@ -75,7 +76,7 @@ async fn send_job_with_dead_letter_does_not_exist() {
.dead_letter("jobtype_dead_letter")
.build();
let err = c.send_job(&job).await.unwrap_err();
if let Error::Unprocessable { msg } = err {
if let Error::DoesNotExist { msg } = err {
assert_eq!(msg, "dead letter queue does not exist");
} else {
unreachable!()
Expand All @@ -89,7 +90,7 @@ async fn send_job_queue_does_not_exist() {

let c = Client::builder().schema(local).connect().await.unwrap();
let job = Job::builder().queue_name("jobtype").build();
if let Error::Unprocessable { msg } = c.send_job(&job).await.unwrap_err() {
if let Error::DoesNotExist { msg } = c.send_job(&job).await.unwrap_err() {
assert!(msg.contains("queue does not exist"))
} else {
unreachable!()
Expand All @@ -114,7 +115,7 @@ async fn send_data_queue_does_not_exist() {

let c = Client::builder().schema(local).connect().await.unwrap();

if let Error::Unprocessable { msg } = c
if let Error::DoesNotExist { msg } = c
.send_data("jobtype", serde_json::json!({"key": "value"}))
.await
.unwrap_err()
Expand Down Expand Up @@ -189,3 +190,34 @@ async fn send_job_fully_customized() {
.to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
)
}

#[tokio::test]
async fn send_jobs_throttled() {
let local = "send_jobs_throttled";
utils::drop_schema(&local).await.unwrap();

let c = Client::builder().schema(local).connect().await.unwrap();
c.create_standard_queue("jobtype").await.unwrap();

let job1 = Job::builder()
.queue_name("jobtype")
.singleton_for(Duration::from_secs(1))
.build();
let job2 = Job::builder()
.queue_name("jobtype")
.singleton_for(Duration::from_secs(1))
.build();

let id1 = c.send_job(&job1).await.expect("no error");
let err = c.send_job(&job2).await.unwrap_err();
if let Error::Throttled { msg } = err {
assert_eq!(msg, "singleton policy applied to jobs with 'singleton_on' property and state not 'cancelled'");
} else {
unreachable!()
}

time::sleep(Duration::from_secs(1)).await;

let id2 = c.send_job(&job2).await.expect("queued this time and ID issued");
assert_ne!(id1, id2);
}

0 comments on commit fcb5994

Please sign in to comment.