diff --git a/docker/postgres.Dockerfile b/docker/postgres.Dockerfile index 080d547..2df289c 100644 --- a/docker/postgres.Dockerfile +++ b/docker/postgres.Dockerfile @@ -1,4 +1,4 @@ -FROM postgres:16.4 +FROM postgres:17 WORKDIR /var/lib/postgresql/ COPY certs ./certs diff --git a/src/client/public/job_ops.rs b/src/client/public/job_ops.rs index d7bcb13..da4373f 100644 --- a/src/client/public/job_ops.rs +++ b/src/client/public/job_ops.rs @@ -8,6 +8,14 @@ use uuid::Uuid; impl Client { /// Enqueue a job. + /// + /// Will return [`Error::Conflict`] in case a job with this ID already exist, + /// which happen if you are providing the ID yourself. + /// + /// If the queue does not exist, [`Error::DoesNotExist`] will be returned. + /// + /// In case the system throttles the job (see [`Job::singleton_key`]), [`Error::Throttled`] + /// will be returned. pub async fn send_job<'a, J>(&self, job: J) -> Result where J: Borrow>, @@ -23,13 +31,35 @@ impl Client { .map_err(|e| { if let Some(db_error) = e.as_database_error() { if let Some(constraint) = db_error.constraint() { - if constraint.starts_with('j') && constraint.ends_with("_pkey") { - return Error::Conflict { - msg: "job with this id already exists", - }; + if constraint.starts_with('j') { + if constraint.ends_with("_pkey") { + return Error::Conflict { + msg: "job with this id already exists", + }; + } + if constraint.ends_with("_i1") { + return Error::Throttled { + msg: "policy 'short' is applied to jobs with state 'created'", + }; + } + if constraint.ends_with("_i2") { + return Error::Throttled { + msg: "policy 'singleton' is applied to jobs with state 'active'", + }; + } + if constraint.ends_with("_i3") { + return Error::Throttled { + msg: "policy 'stately' is applied to jobs with state 'created', 'retry' or 'active'", + }; + } + if constraint.ends_with("_i4") { + return Error::Throttled { + msg: "singleton policy applied to jobs with 'singleton_on' property and state not 'cancelled'", + }; + } } if constraint == "dlq_fkey" { - return Error::Unprocessable { + return Error::DoesNotExist { msg: "dead letter queue does not exist", }; } @@ -37,7 +67,7 @@ impl Client { } Error::Sqlx(e) })?; - id.ok_or(Error::Unprocessable { + id.ok_or(Error::DoesNotExist { msg: "queue does not exist", }) } @@ -55,7 +85,7 @@ impl Client { .bind(Json(JobOptions::default())) .fetch_one(&self.pool) .await?; - id.ok_or(Error::Unprocessable { + id.ok_or(Error::DoesNotExist { msg: "queue does not exist", }) } diff --git a/src/error.rs b/src/error.rs index 2153a91..6161a3a 100644 --- a/src/error.rs +++ b/src/error.rs @@ -8,17 +8,27 @@ pub enum Error { #[error("db driver error")] Sqlx(#[from] sqlx::Error), - /// Application error. + /// Specified queue is not registered in the system. #[error("cannot process: {msg}")] - Unprocessable { - /// Details on what exactly went wrong. + DoesNotExist { + /// Further details. msg: &'static str, }, - /// Constraint violation in the system. + /// Conflict in the system. + /// + /// Most likely, an a job with this ID already exists + /// in the system and so operation fails. #[error("conflict in the system: {msg}")] Conflict { /// Details on what exactly went wrong. msg: &'static str, }, + + /// The job has been throttled by the system. + #[error("throttled: {msg}")] + Throttled { + /// Which throttling has been applied. + msg: &'static str, + }, } diff --git a/src/job.rs b/src/job.rs index ff03b42..60219fe 100644 --- a/src/job.rs +++ b/src/job.rs @@ -201,11 +201,14 @@ pub struct JobDetails { /// Will be `None` for a job that was not consumed just yet. pub started_at: Option>, - /// + /// Date used by the system internally for throttling. + /// + /// This is calculated by the system using [`Job::singleton_for`] period. + /// This is the system's implementation detail and should not be relied on. pub singleton_at: Option, /// Key to use for throttling. - /// + /// /// See [`Job::singleton_key`]. pub singleton_key: Option, } diff --git a/tests/e2e/job_send.rs b/tests/e2e/job_send.rs index 9b15abc..92bc064 100644 --- a/tests/e2e/job_send.rs +++ b/tests/e2e/job_send.rs @@ -4,6 +4,7 @@ use crate::utils; use chrono::Utc; use pgboss::{Client, Error, Job, QueuePolicy}; use serde_json::json; +use tokio::time; #[tokio::test] async fn send_job() { @@ -75,7 +76,7 @@ async fn send_job_with_dead_letter_does_not_exist() { .dead_letter("jobtype_dead_letter") .build(); let err = c.send_job(&job).await.unwrap_err(); - if let Error::Unprocessable { msg } = err { + if let Error::DoesNotExist { msg } = err { assert_eq!(msg, "dead letter queue does not exist"); } else { unreachable!() @@ -89,7 +90,7 @@ async fn send_job_queue_does_not_exist() { let c = Client::builder().schema(local).connect().await.unwrap(); let job = Job::builder().queue_name("jobtype").build(); - if let Error::Unprocessable { msg } = c.send_job(&job).await.unwrap_err() { + if let Error::DoesNotExist { msg } = c.send_job(&job).await.unwrap_err() { assert!(msg.contains("queue does not exist")) } else { unreachable!() @@ -114,7 +115,7 @@ async fn send_data_queue_does_not_exist() { let c = Client::builder().schema(local).connect().await.unwrap(); - if let Error::Unprocessable { msg } = c + if let Error::DoesNotExist { msg } = c .send_data("jobtype", serde_json::json!({"key": "value"})) .await .unwrap_err() @@ -189,3 +190,34 @@ async fn send_job_fully_customized() { .to_rfc3339_opts(chrono::SecondsFormat::Millis, true) ) } + +#[tokio::test] +async fn send_jobs_throttled() { + let local = "send_jobs_throttled"; + utils::drop_schema(&local).await.unwrap(); + + let c = Client::builder().schema(local).connect().await.unwrap(); + c.create_standard_queue("jobtype").await.unwrap(); + + let job1 = Job::builder() + .queue_name("jobtype") + .singleton_for(Duration::from_secs(1)) + .build(); + let job2 = Job::builder() + .queue_name("jobtype") + .singleton_for(Duration::from_secs(1)) + .build(); + + let id1 = c.send_job(&job1).await.expect("no error"); + let err = c.send_job(&job2).await.unwrap_err(); + if let Error::Throttled { msg } = err { + assert_eq!(msg, "singleton policy applied to jobs with 'singleton_on' property and state not 'cancelled'"); + } else { + unreachable!() + } + + time::sleep(Duration::from_secs(1)).await; + + let id2 = c.send_job(&job2).await.expect("queued this time and ID issued"); + assert_ne!(id1, id2); +}