Skip to content

Commit

Permalink
Test Client::failed_job and Client::failed_jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
rustworthy committed Sep 7, 2024
1 parent 19f7060 commit 07ffa40
Show file tree
Hide file tree
Showing 7 changed files with 201 additions and 5 deletions.
2 changes: 2 additions & 0 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub use builder::ClientBuilder;
struct Statements {
fetch_jobs: String,
delete_jobs: String,
fail_jobs: String,
create_job: String,
create_queue: String,
get_queue: String,
Expand All @@ -24,6 +25,7 @@ impl Statements {
fetch_jobs: sql::dml::fetch_jobs(name),
delete_jobs: sql::dml::delete_jobs(name),
create_job: sql::proc::create_job(name),
fail_jobs: sql::dml::fail_jobs(name),
create_queue: sql::proc::create_queue(name),
get_queue: sql::dml::get_queue(name),
get_queues: sql::dml::get_queues(name),
Expand Down
43 changes: 42 additions & 1 deletion src/client/public/job_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ impl Client {
/// Delete numerous jobs from a queue.
///
/// In a happy path, returns the number of deleted records, where `0` means
/// the specified queue if the jobs with these ids do not exist.
/// either the specified queue does not exist, or there are no jobs with
/// these ids in the queue.
pub async fn delete_jobs<Q, J>(&self, queue_name: Q, job_ids: J) -> Result<usize, Error>
where
Q: AsRef<str>,
Expand All @@ -120,4 +121,44 @@ impl Client {
.await?;
Ok(deleted_count.0 as usize)
}

/// Mark a job as failed
pub async fn fail_job<Q, O>(
&self,
queue_name: Q,
job_id: Uuid,
details: O,
) -> Result<bool, Error>
where
Q: AsRef<str>,
O: Into<serde_json::Value>,
{
let deleted_count = self.fail_jobs(queue_name, [job_id], details).await?;
Ok(deleted_count == 1)
}

/// Mark numerous jobs as failed.
///
/// In a happy path, returns the number of jobs marked as `failed`,
/// where `0` means either the specified queue does not exist,
/// or there are no jobs with these ids in the queue.
pub async fn fail_jobs<Q, I, O>(
&self,
queue_name: Q,
job_ids: I,
details: O,
) -> Result<usize, Error>
where
Q: AsRef<str>,
I: IntoIterator<Item = Uuid>,
O: Into<serde_json::Value>,
{
let deleted_count: (i64,) = sqlx::query_as(&self.stmt.fail_jobs)
.bind(queue_name.as_ref())
.bind(job_ids.into_iter().collect::<Vec<Uuid>>())
.bind(details.into())
.fetch_one(&self.pool)
.await?;
Ok(deleted_count.0 as usize)
}
}
7 changes: 7 additions & 0 deletions src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ use sqlx::{postgres::PgRow, prelude::FromRow, Row};
use std::time::Duration;
use uuid::Uuid;

#[cfg(doc)]
use crate::QueueOptions;

#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum JobState {
Created,
Expand Down Expand Up @@ -43,6 +46,9 @@ pub struct JobOptions {
pub dead_letter: Option<String>,

/// Number of retry attempts.
///
/// If omitted, a value will be taken from queue via [`QueueOptions::retry_limit`]
/// and - if not set there either - will default to `2` retry attempts.
#[serde(skip_serializing_if = "Option::is_none")]
pub retry_limit: Option<usize>,

Expand Down Expand Up @@ -207,6 +213,7 @@ impl JobBuilder {
}

/// Number of retry attempts.

pub fn retry_limit(mut self, value: usize) -> Self {
self.opts.retry_limit = Some(value);
self
Expand Down
40 changes: 37 additions & 3 deletions src/sql/dml.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::job::JobState;

pub(crate) fn check_if_app_installed(schema: &str) -> String {
format!(
"
Expand Down Expand Up @@ -61,7 +63,7 @@ pub(crate) fn get_queues(schema: &str) -> String {
pub(crate) fn fetch_jobs(schema: &str) -> String {
format!(
r#"
WITH next as (
WITH next AS (
SELECT id FROM {schema}.job
WHERE name = $1 AND state < 'active' AND start_after < now()
ORDER BY priority DESC, created_on, id
Expand All @@ -72,7 +74,7 @@ pub(crate) fn fetch_jobs(schema: &str) -> String {
UPDATE {schema}.job j SET
state = 'active',
started_on = now(),
retry_count = CASE WHEN started_on IS NULL THEN 0 ELSE retry_count + 1 END
retry_count = CASE WHEN started_on IS NULL THEN retry_count ELSE retry_count + 1 END
FROM next
WHERE name = $1 AND j.id = next.id
RETURNING j.id, name, data, EXTRACT(epoch FROM expire_in)::float8 as expire_in;
Expand All @@ -83,7 +85,7 @@ pub(crate) fn fetch_jobs(schema: &str) -> String {
pub(crate) fn delete_jobs(schema: &str) -> String {
format!(
r#"
WITH results as (
WITH results AS (
DELETE FROM {schema}.job
WHERE name = $1 AND id IN (SELECT UNNEST($2::uuid[]))
RETURNING 1
Expand All @@ -92,3 +94,35 @@ pub(crate) fn delete_jobs(schema: &str) -> String {
"#
)
}

pub(crate) fn fail_jobs(schema: &str) -> String {
format!(
r#"
WITH results AS (
UPDATE {schema}.job SET
state = CASE WHEN retry_count < retry_limit THEN '{0}'::{schema}.job_state ELSE '{1}'::{schema}.job_state END,
completed_on = CASE WHEN retry_count < retry_limit THEN NULL ELSE now() END,
start_after = CASE
WHEN retry_count = retry_limit THEN start_after
WHEN NOT retry_backoff THEN now() + retry_delay * interval '1'
ELSE now() + (
retry_delay * 2 ^ LEAST(16, retry_count + 1) / 2 +
retry_delay * 2 ^ LEAST(16, retry_count + 1) / 2 * random()
) * interval '1'
END,
output = $3
WHERE name = $1 AND id IN (SELECT UNNEST($2::uuid[])) AND state < '{2}'::{schema}.job_state
RETURNING *
), dlq_jobs AS (
INSERT INTO {schema}.job (name, data, output, retry_limit, keep_until)
SELECT dead_letter, data, output, retry_limit, keep_until + (keep_until - start_after)
FROM results WHERE state = '{3}'::{schema}.job_state AND dead_letter IS NOT NULL AND NOT name = dead_letter
)
SELECT COUNT(*) FROM results
"#,
JobState::Retry, // 0
JobState::Failed, // 1
JobState::Completed, // 2
JobState::Failed, // 3
)
}
2 changes: 1 addition & 1 deletion src/sql/proc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ pub(super) fn create_delete_queue_function(schema: &str) -> String {
DECLARE
table_name varchar;
BEGIN
WITH deleted as (
WITH deleted AS (
DELETE FROM {schema}.queue WHERE name = queue_name RETURNING partition_name
)
SELECT partition_name FROM deleted INTO table_name;
Expand Down
111 changes: 111 additions & 0 deletions tests/e2e/job_fail.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
use crate::utils;
use pgboss::{Client, Job};
use serde_json::json;
use uuid::Uuid;

#[tokio::test]
async fn fail_job_without_retry() {
let local = "fail_job_without_fetching";
utils::drop_schema(&local).await.unwrap();

let c = Client::builder().schema(local).connect().await.unwrap();
c.create_standard_queue("jobtype").await.unwrap();

let job_id = Uuid::new_v4();
let queue_name = "jobtype";
let job = Job::builder()
.id(job_id)
.queue_name(queue_name)
.retry_limit(0)
.build();
c.send_job(job).await.unwrap();

// fetch a job, making it transition
// from `created` to `active`
let job = c
.fetch_job(queue_name)
.await
.expect("no error")
.expect("one job");
assert_eq!(job.id, job_id);

let marked_as_failed = c
.fail_job(queue_name, job_id, json!({"reason": "for demo purposes"}))
.await
.unwrap();
assert!(marked_as_failed);

// job transitioned from `created` directly to `failed`
assert!(c.fetch_job(queue_name).await.unwrap().is_none());
}

#[tokio::test]
async fn fail_job_with_retry() {
let local = "fail_job_with_retry";
utils::drop_schema(&local).await.unwrap();

let c = Client::builder().schema(local).connect().await.unwrap();
c.create_standard_queue("jobtype").await.unwrap();

let job1_id = Uuid::new_v4();
let queue_name = "jobtype";
let job1 = Job::builder()
.id(job1_id)
.queue_name(queue_name)
.retry_limit(1) // NB
.build();
c.send_job(job1).await.unwrap();

let job2_id = Uuid::new_v4();
let job2 = Job::builder()
.id(job2_id)
.queue_name(queue_name)
.retry_limit(0) // NB
.build();
c.send_job(job2).await.unwrap();

// fetch a job
let job = c
.fetch_job(queue_name)
.await
.expect("no error")
.expect("one job");
assert_eq!(job.id, job1_id);

let marked_as_failed = c
.fail_job(queue_name, job1_id, json!({"reason": "for demo purposes"}))
.await
.unwrap();
assert!(marked_as_failed);

// fetch a job again
let job = c
.fetch_job(queue_name)
.await
.expect("no error")
.expect("one job");
assert_eq!(job.id, job1_id); // retry!

let marked_as_failed = c
.fail_job(queue_name, job1_id, json!({"reason": "for demo purposes"}))
.await
.unwrap();
assert!(marked_as_failed);

// fetch a job
let job = c
.fetch_job(queue_name)
.await
.expect("no error")
.expect("one job");
assert_eq!(job.id, job2_id); // now job2's turn

let marked_as_failed = c
.fail_job(queue_name, job2_id, json!({"reason": "for demo purposes"}))
.await
.unwrap();
assert!(marked_as_failed);

// no retry for job2
assert!(c.fetch_job(queue_name).await.unwrap().is_none());
}
1 change: 1 addition & 0 deletions tests/e2e/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod job_delete;
mod job_fail;
mod job_fetch;
mod job_send;
mod queue;
Expand Down

0 comments on commit 07ffa40

Please sign in to comment.