Skip to content

Commit

Permalink
Add Job::singleton_key
Browse files Browse the repository at this point in the history
  • Loading branch information
rustworthy committed Sep 30, 2024
1 parent 6d43b99 commit bc78577
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 6 deletions.
28 changes: 27 additions & 1 deletion src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ pub(crate) struct JobOptions<'a> {
skip_serializing_if = "Option::is_none"
)]
singleton_for: Option<Duration>,

#[serde(skip_serializing_if = "Option::is_none")]
singleton_key: Option<&'a str>,
}

/// A job to be sent to the server.
Expand Down Expand Up @@ -140,6 +143,11 @@ pub struct Job<'a> {
/// If you set this to, say, 60s and then submit 2 jobs within the same minute,
/// only the first job will be registered.
pub singleton_for: Option<Duration>,

/// Key to use for throttling.
///
/// Will extend throttling to allow one job per key within the time slot.
pub singleton_key: Option<&'a str>,
}

/// A job fetched from the server.
Expand Down Expand Up @@ -193,8 +201,13 @@ pub struct JobDetails {
/// Will be `None` for a job that was not consumed just yet.
pub started_at: Option<DateTime<Utc>>,

/// ...
///
pub singleton_at: Option<NaiveDateTime>,

/// Key to use for throttling.
///
/// See [`Job::singleton_key`].
pub singleton_key: Option<String>,
}

impl FromRow<'_, PgRow> for JobDetails {
Expand Down Expand Up @@ -248,6 +261,7 @@ impl FromRow<'_, PgRow> for JobDetails {
let started_at: Option<DateTime<Utc>> = row.try_get("started_at")?;
let start_after: DateTime<Utc> = row.try_get("start_after")?;
let singleton_at: Option<NaiveDateTime> = row.try_get("singleton_at")?;
let singleton_key: Option<String> = row.try_get("singleton_key")?;

Ok(JobDetails {
id,
Expand All @@ -264,6 +278,7 @@ impl FromRow<'_, PgRow> for JobDetails {
start_after,
started_at,
singleton_at,
singleton_key,
})
}
}
Expand All @@ -285,6 +300,7 @@ impl<'a> Job<'a> {
retain_for: self.retain_for,
start_after: self.start_after,
singleton_for: self.singleton_for,
singleton_key: self.singleton_key,
}
}
}
Expand All @@ -305,6 +321,7 @@ pub struct JobBuilder<'a> {
pub(crate) retain_for: Option<Duration>,
pub(crate) start_after: Option<DateTime<Utc>>,
pub(crate) singleton_for: Option<Duration>,
pub(crate) singleton_key: Option<&'a str>,
}

impl<'a> JobBuilder<'a> {
Expand Down Expand Up @@ -396,6 +413,14 @@ impl<'a> JobBuilder<'a> {
self
}

/// Key to use for throttling.
///
/// Will extend throttling to allow one job per key within the time slot.
pub fn singleton_key(mut self, value: &'a str) -> Self {
self.singleton_key = Some(value);
self
}

/// Creates a job.
pub fn build(self) -> Job<'a> {
Job {
Expand All @@ -411,6 +436,7 @@ impl<'a> JobBuilder<'a> {
retain_for: self.retain_for,
start_after: self.start_after,
singleton_for: self.singleton_for,
singleton_key: self.singleton_key,
}
}
}
12 changes: 7 additions & 5 deletions src/sql/dml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ pub(crate) fn fetch_jobs(schema: &str) -> String {
start_after,
created_on as created_at,
started_on as started_at,
singleton_on as singleton_at;
singleton_on as singleton_at,
singleton_key;
"#
)
}
Expand Down Expand Up @@ -259,7 +260,7 @@ pub(crate) fn complete_jobs(schema: &str) -> String {
JobState::Completed, // 1
)
}
// + + + + + + + + + + + + + +
// + + + + + + + + + + + + + + +
// id | name | priority | data | state | retry_limit | retry_count | retry_delay | retry_backoff | start_after | started_on | singleton_key | singleton_on | expire_in | created_on | completed_on | keep_until | output | dead_letter | policy
// --------------------------------------+---------+----------+------+-----------+-------------+-------------+-------------+---------------+-------------------------------+-------------------------------+---------------+--------------+-----------+-------------------------------+-------------------------------+-------------------------------+------------------------+-------------+----------
// 71c7e215-0528-417c-951b-fc01b3fac4b3 | jobtype | 0 | null | completed | 0 | 0 | 0 | f | 2024-09-29 09:23:09.502695+00 | 2024-09-29 09:23:09.514796+00 | | | 00:15:00 | 2024-09-29 09:23:09.502695+00 | 2024-09-29 09:23:09.526609+00 | 2024-10-13 09:23:09.502695+00 | {"result": "success!"} | | standard
Expand All @@ -278,9 +279,10 @@ pub(crate) fn get_job_info(schema: &str) -> String {
retry_count,
retry_backoff,
start_after,
created_on as "created_at",
started_on as "started_at",
singleton_on as "singleton_at"
created_on as created_at,
started_on as started_at,
singleton_on as singleton_at,
singleton_key
FROM {schema}.job
WHERE name = $1 and id = $2;
"#,
Expand Down
2 changes: 2 additions & 0 deletions tests/e2e/job_send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ async fn send_job_fully_customized() {
.retain_for(Duration::from_secs(60 * 60 * 2))
.delay_for(Duration::from_secs(5))
.singleton_for(Duration::from_secs(7))
.singleton_key("buzz")
.build();

let inserted_id = c.send_job(&job).await.expect("no error");
Expand All @@ -176,6 +177,7 @@ async fn send_job_fully_customized() {
assert_eq!(job_info.retry_limit, job.retry_limit.unwrap());
assert_eq!(job_info.retry_backoff, job.retry_backoff.unwrap());
assert!(job_info.singleton_at.is_some());
assert_eq!(job_info.singleton_key.unwrap(), "buzz");

// PostgreSQL will cut nanoseconds
assert_eq!(
Expand Down

0 comments on commit bc78577

Please sign in to comment.