From 1552f75e908ace8ef99f3786238a7cf97ad68237 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Sun, 3 Dec 2023 20:42:43 +0500 Subject: [PATCH 01/33] Use 'derive_builder' macro for 'Job' --- Cargo.lock | 73 +++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 1 + src/proto/single/mod.rs | 6 ++-- 3 files changed, 78 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4e9d6667..6663ff8e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -145,6 +145,72 @@ dependencies = [ "typenum", ] +[[package]] +name = "darling" +version = "0.14.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b750cb3417fd1b327431a470f388520309479ab0bf5e323505daf0290cd3850" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.14.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "109c1ca6e6b7f82cc233a97004ea8ed7ca123a9af07a8230878fcfda9b158bf0" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn", +] + +[[package]] +name = "darling_macro" +version = "0.14.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4aab4dbc9f7611d8b55048a3a16d2d010c2c8334e46304b40ac1cc14bf3b48e" +dependencies = [ + "darling_core", + "quote", + "syn", +] + +[[package]] +name = "derive_builder" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d67778784b508018359cbc8696edb3db78160bab2c2a28ba7f56ef6932997f8" +dependencies = [ + "derive_builder_macro", +] + +[[package]] +name = "derive_builder_core" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c11bdc11a0c47bc7d37d582b5285da6849c96681023680b906673c5707af7b0f" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "derive_builder_macro" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebcda35c7a396850a55ffeac740804b40ffec779b98fffbb1738f4033f0ee79e" +dependencies = [ + "derive_builder_core", + "syn", +] + [[package]] name = "digest" version = "0.10.5" @@ -163,6 +229,7 @@ dependencies = [ "bufstream", "chrono", "clap", + "derive_builder", "fnv", "hostname", "libc", @@ -277,6 +344,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "idna" version = "0.3.0" diff --git a/Cargo.toml b/Cargo.toml index 1372241c..d2f84dae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ fnv = "1.0.5" native-tls = { version = "0.2", optional = true } clap = { version = "3.1.0", optional = true } thiserror = "1.0.30" +derive_builder = "0.12.0" [dev-dependencies] mockstream = "0.0.3" diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index 9eca8d8e..555b62b8 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -1,4 +1,5 @@ use chrono::{DateTime, Utc}; +use derive_builder::Builder; use std::collections::HashMap; use std::io::prelude::*; @@ -13,7 +14,8 @@ pub use self::resp::*; /// A Faktory job. /// /// See also the [Faktory wiki](https://github.com/contribsys/faktory/wiki/The-Job-Payload). -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Builder)] +#[builder(setter(into))] pub struct Job { /// The job's unique identifier. pub(crate) jid: String, @@ -86,7 +88,7 @@ pub struct Job { pub custom: HashMap, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct Failure { retry_count: usize, failed_at: String, From 69e6836ac980893396d3b2531cd1bc7cfa7490dc Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Sun, 3 Dec 2023 20:45:19 +0500 Subject: [PATCH 02/33] Add initial test for 'JobBuilder' --- src/proto/single/mod.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index 555b62b8..aed5d248 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -177,3 +177,13 @@ pub fn write_command_and_await_ok( write_command(x, command)?; read_ok(x) } + +#[cfg(test)] +mod test { + use super::JobBuilder; + #[test] + fn test_job_can_be_created_with_builder() { + let job = JobBuilder::default().build(); + assert!(job.is_err()); + } +} From d389b19e87e375ccc0af80628bf0accc094c5d44 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Sun, 3 Dec 2023 20:45:59 +0500 Subject: [PATCH 03/33] Add client-side error group, add 'MalformedJob' --- src/error.rs | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/src/error.rs b/src/error.rs index 6c3d4dd9..9d762c83 100644 --- a/src/error.rs +++ b/src/error.rs @@ -12,6 +12,9 @@ //! [`Protocol`] describes lower-level errors relating to communication //! with the faktory server. Typically, [`Protocol`] errors are the result //! of the server sending a response this client did not expect. +//! +//! [`Client`] describes errors that occur even before communication with the server, e.g. +//! errors when building a 'Job'. use thiserror::Error; @@ -22,7 +25,11 @@ pub enum Error { /// The connection to the server, or one of its prerequisites, failed. #[error("connection")] Connect(#[from] Connect), - + /// Client-side errors. + /// + /// These are errors arising even before communicating to server, e.g. malformed job. + #[error("client")] + Client(#[from] Client), /// Underlying I/O layer errors. /// /// These are overwhelmingly network communication errors on the socket connection to the server. @@ -82,6 +89,18 @@ pub enum Connect { ParseUrl(#[source] url::ParseError), } +/// Errors happening client side +#[derive(Debug, Error)] +#[non_exhaustive] +pub enum Client { + /// The 'Job' is malformed. + #[error("job is malformed: {desc}")] + MalformedJob { + /// Details on what is missing or incorrect about the 'Job' + desc: String, + }, +} + /// The set of observable application-level errors when interacting with a Faktory server. #[derive(Debug, Error)] #[non_exhaustive] From 3aa1599892210421f80ddb348d061b3a7fc9b4f4 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Sun, 3 Dec 2023 22:04:01 +0500 Subject: [PATCH 04/33] Add custom build remapping to 'MalformedJob' err --- src/proto/single/mod.rs | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index aed5d248..9e49aac5 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -6,7 +6,7 @@ use std::io::prelude::*; mod cmd; mod resp; -use crate::error::Error; +use crate::error::{self, Error}; pub use self::cmd::*; pub use self::resp::*; @@ -15,7 +15,7 @@ pub use self::resp::*; /// /// See also the [Faktory wiki](https://github.com/contribsys/faktory/wiki/The-Job-Payload). #[derive(Serialize, Deserialize, Debug, Builder)] -#[builder(setter(into))] +#[builder(setter(into), build_fn(name = "try_build"))] pub struct Job { /// The job's unique identifier. pub(crate) jid: String, @@ -88,6 +88,18 @@ pub struct Job { pub custom: HashMap, } +impl JobBuilder { + #[allow(dead_code)] + fn build(&self) -> Result { + let job = self + .try_build() + .map_err(|err| error::Client::MalformedJob { + desc: err.to_string(), + })?; + Ok(job) + } +} + #[derive(Serialize, Deserialize, Debug, Clone)] pub struct Failure { retry_count: usize, @@ -180,10 +192,15 @@ pub fn write_command_and_await_ok( #[cfg(test)] mod test { - use super::JobBuilder; + use super::*; + #[test] fn test_job_can_be_created_with_builder() { let job = JobBuilder::default().build(); - assert!(job.is_err()); + let err = job.unwrap_err(); + assert_eq!( + err.to_string(), + "job is malformed: `jid` must be initialized" + ) } } From e888e398d46bde3a20699f21c4cc1e4d1ef5480c Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Mon, 4 Dec 2023 11:11:59 +0500 Subject: [PATCH 05/33] Add 'single/utils' mod with 'gen_random_jid' utility --- src/proto/single/mod.rs | 8 ++----- src/proto/single/utils.rs | 45 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 6 deletions(-) create mode 100644 src/proto/single/utils.rs diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index 9e49aac5..e2ae9b68 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -5,6 +5,7 @@ use std::io::prelude::*; mod cmd; mod resp; +mod utils; use crate::error::{self, Error}; @@ -122,12 +123,7 @@ impl Job { S: Into, A: Into, { - use rand::{thread_rng, Rng}; - let random_jid = thread_rng() - .sample_iter(&rand::distributions::Alphanumeric) - .map(char::from) - .take(16) - .collect(); + let random_jid = utils::gen_random_jid(); use chrono::prelude::*; Job { jid: random_jid, diff --git a/src/proto/single/utils.rs b/src/proto/single/utils.rs new file mode 100644 index 00000000..b6f068dc --- /dev/null +++ b/src/proto/single/utils.rs @@ -0,0 +1,45 @@ +use rand::{thread_rng, Rng}; + +const JOB_ID_LENGTH: usize = 16; + +pub fn gen_random_jid() -> String { + thread_rng() + .sample_iter(&rand::distributions::Alphanumeric) + .map(char::from) + .take(JOB_ID_LENGTH) + .collect() +} + +#[cfg(test)] +mod test { + use std::collections::HashSet; + + use super::*; + + #[test] + fn test_jid_of_known_size_generated() { + let jid1 = gen_random_jid(); + let jid2 = gen_random_jid(); + assert_ne!(jid1, jid2); + println!("{}", jid1); + assert_eq!(jid1.len(), JOB_ID_LENGTH); + assert_eq!(jid2.len(), JOB_ID_LENGTH); + } + + #[test] + fn test_jids_are_unique() { + let mut ids = HashSet::new(); + + ids.insert("IYKOxEfLcwcgKaRa".to_string()); + ids.insert("IYKOxEfLcwcgKaRa".to_string()); + assert_ne!(ids.len(), 2); + + ids.clear(); + + for _ in 0..1_000_000 { + let jid = gen_random_jid(); + ids.insert(jid); + } + assert_eq!(ids.len(), 1_000_000); + } +} From 21e6dbbf3607278a4ba218e2cb7dac380cfb5a00 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Mon, 4 Dec 2023 11:19:36 +0500 Subject: [PATCH 06/33] Rm duplicated import in Job::new --- src/proto/single/mod.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index e2ae9b68..fe6091e5 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -123,10 +123,8 @@ impl Job { S: Into, A: Into, { - let random_jid = utils::gen_random_jid(); - use chrono::prelude::*; Job { - jid: random_jid, + jid: utils::gen_random_jid(), queue: "default".into(), kind: kind.into(), args: args.into_iter().map(|s| s.into()).collect(), From 88a3d77a5ea4695e339b84c6bb0fd59bf8f66908 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Mon, 4 Dec 2023 11:22:00 +0500 Subject: [PATCH 07/33] Use 'gen_random_jid' as 'jid' default on 'Job' struct --- src/proto/single/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index fe6091e5..a3e03ac3 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -19,6 +19,7 @@ pub use self::resp::*; #[builder(setter(into), build_fn(name = "try_build"))] pub struct Job { /// The job's unique identifier. + #[builder(default = "utils::gen_random_jid()")] pub(crate) jid: String, /// The queue this job belongs to. Usually `default`. @@ -194,7 +195,7 @@ mod test { let err = job.unwrap_err(); assert_eq!( err.to_string(), - "job is malformed: `jid` must be initialized" + "job is malformed: `queue` must be initialized" ) } } From ede140b699c9d561e93a55445bd7b7caf4e27589 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Mon, 4 Dec 2023 11:31:59 +0500 Subject: [PATCH 08/33] Add builder default for 'gueue' field on 'Job' struct --- src/proto/single/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index a3e03ac3..4fbde467 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -23,6 +23,7 @@ pub struct Job { pub(crate) jid: String, /// The queue this job belongs to. Usually `default`. + #[builder(default = r#"String::from("default")"#)] pub queue: String, /// The job's type. Called `kind` because `type` is reserved. @@ -195,7 +196,7 @@ mod test { let err = job.unwrap_err(); assert_eq!( err.to_string(), - "job is malformed: `queue` must be initialized" + "job is malformed: `kind` must be initialized" ) } } From f1f3b34980c2008850f01a8bbc7ed6f65f575e32 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Mon, 4 Dec 2023 12:12:17 +0500 Subject: [PATCH 09/33] Add test for required fields in 'Job' --- src/proto/single/mod.rs | 30 ++++++++++++++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index 4fbde467..f317123e 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -191,12 +191,38 @@ mod test { use super::*; #[test] - fn test_job_can_be_created_with_builder() { - let job = JobBuilder::default().build(); + fn test_job_build_fails_if_kind_missing() { + let job = JobBuilder::default() + .args(vec![serde_json::Value::from("ISBN-13:9781718501850")]) + .build(); let err = job.unwrap_err(); assert_eq!( err.to_string(), "job is malformed: `kind` must be initialized" ) } + + #[test] + fn test_job_build_fails_if_args_missing() { + let job = JobBuilder::default().kind("order").build(); + let err = job.unwrap_err(); + assert_eq!( + err.to_string(), + "job is malformed: `args` must be initialized" + ); + } + + #[test] + fn test_job_can_be_created_with_builder() { + let job = JobBuilder::default() + .kind("order") + .args(vec![serde_json::Value::from("ISBN-13:9781718501850")]) + .build(); + + let err = job.unwrap_err(); + assert_eq!( + err.to_string(), + "job is malformed: `created_at` must be initialized" + ) + } } From 6885a3be8d1c572f3b6f5a9de0c5806d44131aa1 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Tue, 5 Dec 2023 00:13:43 +0500 Subject: [PATCH 10/33] Add defaults for 'created_at', 'enqueued_at' and 'at' fields --- src/proto/single/mod.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index f317123e..a4b31785 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -37,16 +37,19 @@ pub struct Job { // note that serializing works correctly here since the default chrono serialization // is RFC3339, which is also what Faktory expects. #[serde(skip_serializing_if = "Option::is_none")] + #[builder(default = "Some(Utc::now())")] pub created_at: Option>, /// When this job was supplied to the Faktory server. #[serde(skip_serializing_if = "Option::is_none")] + #[builder(default = "None")] pub enqueued_at: Option>, /// When this job is scheduled for. /// /// Defaults to immediately. #[serde(skip_serializing_if = "Option::is_none")] + #[builder(default = "None")] pub at: Option>, /// How long to allow this job to run for. @@ -222,7 +225,7 @@ mod test { let err = job.unwrap_err(); assert_eq!( err.to_string(), - "job is malformed: `created_at` must be initialized" + "job is malformed: `reserve_for` must be initialized" ) } } From 21179ace9ed2edc6b387138174ae0dcccc035265 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Tue, 5 Dec 2023 09:47:11 +0500 Subject: [PATCH 11/33] Add defaults for 'reserved_for', 'retry' and 'priority' fields --- src/proto/single/mod.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index a4b31785..a81923ad 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -12,6 +12,10 @@ use crate::error::{self, Error}; pub use self::cmd::*; pub use self::resp::*; +const JOB_DEFAULT_RESERVED_FOR_SECS: usize = 600; +const JOB_DEFAULT_RETRIES_COUNT: usize = 25; +const JOB_DEFAULT_PRIORITY: u8 = 5; + /// A Faktory job. /// /// See also the [Faktory wiki](https://github.com/contribsys/faktory/wiki/The-Job-Payload). @@ -56,18 +60,21 @@ pub struct Job { /// /// Defaults to 600 seconds. #[serde(skip_serializing_if = "Option::is_none")] + #[builder(default = "Some(JOB_DEFAULT_RESERVED_FOR_SECS)")] pub reserve_for: Option, /// Number of times to retry this job. /// /// Defaults to 25. #[serde(skip_serializing_if = "Option::is_none")] - pub retry: Option, + #[builder(default = "Some(JOB_DEFAULT_RETRIES_COUNT)")] + pub retry: Option, /// The priority of this job from 1-9 (9 is highest). /// /// Pushing a job with priority 9 will effectively put it at the front of the queue. /// Defaults to 5. + #[builder(default = "Some(JOB_DEFAULT_PRIORITY)")] pub priority: Option, /// Number of lines of backtrace to keep if this job fails. @@ -225,7 +232,7 @@ mod test { let err = job.unwrap_err(); assert_eq!( err.to_string(), - "job is malformed: `reserve_for` must be initialized" + "job is malformed: `backtrace` must be initialized" ) } } From c41e4a0a95c52819677b5e5b5b14236875d1ed3b Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Tue, 5 Dec 2023 10:02:22 +0500 Subject: [PATCH 12/33] Add custom validator, check priority is valid --- src/proto/single/mod.rs | 29 ++++++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index a81923ad..c25349ee 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -14,13 +14,17 @@ pub use self::resp::*; const JOB_DEFAULT_RESERVED_FOR_SECS: usize = 600; const JOB_DEFAULT_RETRIES_COUNT: usize = 25; +const JOB_PRIORITY_MAX: u8 = 9; const JOB_DEFAULT_PRIORITY: u8 = 5; /// A Faktory job. /// /// See also the [Faktory wiki](https://github.com/contribsys/faktory/wiki/The-Job-Payload). #[derive(Serialize, Deserialize, Debug, Builder)] -#[builder(setter(into), build_fn(name = "try_build"))] +#[builder( + setter(into), + build_fn(name = "try_build", validate = "Self::validate") +)] pub struct Job { /// The job's unique identifier. #[builder(default = "utils::gen_random_jid()")] @@ -102,6 +106,15 @@ pub struct Job { } impl JobBuilder { + fn validate(&self) -> Result<(), String> { + if let Some(ref priority) = self.priority { + if *priority > Some(JOB_PRIORITY_MAX) { + return Err("`priority` must be in the range from 0 to 9 inclusive".to_string()); + } + } + Ok(()) + } + #[allow(dead_code)] fn build(&self) -> Result { let job = self @@ -222,6 +235,20 @@ mod test { ); } + #[test] + fn test_job_build_fails_if_priority_invalid() { + let job = JobBuilder::default() + .kind("order") + .args(vec![]) + .priority(JOB_PRIORITY_MAX + 1) + .build(); + let err = job.unwrap_err(); + assert_eq!( + err.to_string(), + "job is malformed: `priority` must be in the range from 0 to 9 inclusive" + ); + } + #[test] fn test_job_can_be_created_with_builder() { let job = JobBuilder::default() From ca2bbdc0af50e41c57edd454686f718faf3a40d9 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Tue, 5 Dec 2023 10:22:32 +0500 Subject: [PATCH 13/33] Add test for job created with 'JobBuilder' --- src/proto/single/mod.rs | 40 ++++++++++++++++++++++++++++------------ 1 file changed, 28 insertions(+), 12 deletions(-) diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index c25349ee..ece60042 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -12,10 +12,12 @@ use crate::error::{self, Error}; pub use self::cmd::*; pub use self::resp::*; +const JOB_DEFAULT_QUEUE: &str = "default"; const JOB_DEFAULT_RESERVED_FOR_SECS: usize = 600; -const JOB_DEFAULT_RETRIES_COUNT: usize = 25; +const JOB_DEFAULT_RETRY_COUNT: usize = 25; const JOB_PRIORITY_MAX: u8 = 9; const JOB_DEFAULT_PRIORITY: u8 = 5; +const JOB_DEFAULT_BACKTRACE: usize = 0; /// A Faktory job. /// @@ -31,7 +33,7 @@ pub struct Job { pub(crate) jid: String, /// The queue this job belongs to. Usually `default`. - #[builder(default = r#"String::from("default")"#)] + #[builder(default = "JOB_DEFAULT_QUEUE.into()")] pub queue: String, /// The job's type. Called `kind` because `type` is reserved. @@ -71,7 +73,7 @@ pub struct Job { /// /// Defaults to 25. #[serde(skip_serializing_if = "Option::is_none")] - #[builder(default = "Some(JOB_DEFAULT_RETRIES_COUNT)")] + #[builder(default = "Some(JOB_DEFAULT_RETRY_COUNT)")] pub retry: Option, /// The priority of this job from 1-9 (9 is highest). @@ -85,12 +87,14 @@ pub struct Job { /// /// Defaults to 0. #[serde(skip_serializing_if = "Option::is_none")] + #[builder(default = "Some(JOB_DEFAULT_BACKTRACE)")] pub backtrace: Option, /// Data about this job's most recent failure. /// /// This field is read-only. #[serde(skip_serializing)] + #[builder(default = "None")] failure: Option, /// Extra context to include with the job. @@ -102,6 +106,7 @@ pub struct Job { /// across a complex distributed system, etc. #[serde(skip_serializing_if = "HashMap::is_empty")] #[serde(default = "HashMap::default")] + #[builder(default = "HashMap::default()")] pub custom: HashMap, } @@ -251,15 +256,26 @@ mod test { #[test] fn test_job_can_be_created_with_builder() { + let job_kind = "order"; + let job_args = vec![serde_json::Value::from("ISBN-13:9781718501850")]; let job = JobBuilder::default() - .kind("order") - .args(vec![serde_json::Value::from("ISBN-13:9781718501850")]) - .build(); - - let err = job.unwrap_err(); - assert_eq!( - err.to_string(), - "job is malformed: `backtrace` must be initialized" - ) + .kind(job_kind) + .args(job_args.clone()) + .build() + .unwrap(); + + assert!(job.jid != "".to_owned()); + assert!(job.queue == JOB_DEFAULT_QUEUE.to_string()); + assert_eq!(job.kind, job_kind); + assert_eq!(job.args, job_args); + assert!(job.created_at < Some(Utc::now())); + assert!(job.enqueued_at.is_none()); + assert!(job.at.is_none()); + assert_eq!(job.reserve_for, Some(JOB_DEFAULT_RESERVED_FOR_SECS)); + assert_eq!(job.retry, Some(JOB_DEFAULT_RETRY_COUNT)); + assert_eq!(job.priority, Some(JOB_DEFAULT_PRIORITY)); + assert_eq!(job.backtrace, Some(JOB_DEFAULT_BACKTRACE)); + assert!(job.failure.is_none()); + assert_eq!(job.custom, HashMap::default()); } } From a824b5c15fa43d81e223b94cc1f8a7f0a46ffb58 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Tue, 5 Dec 2023 10:24:07 +0500 Subject: [PATCH 14/33] Use constants in Job::new --- src/proto/single/mod.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index ece60042..658a18f8 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -155,17 +155,17 @@ impl Job { { Job { jid: utils::gen_random_jid(), - queue: "default".into(), + queue: JOB_DEFAULT_QUEUE.into(), kind: kind.into(), args: args.into_iter().map(|s| s.into()).collect(), created_at: Some(Utc::now()), enqueued_at: None, at: None, - reserve_for: Some(600), - retry: Some(25), - priority: Some(5), - backtrace: Some(0), + reserve_for: Some(JOB_DEFAULT_RESERVED_FOR_SECS), + retry: Some(JOB_DEFAULT_RETRY_COUNT), + priority: Some(JOB_DEFAULT_PRIORITY), + backtrace: Some(JOB_DEFAULT_BACKTRACE), failure: None, custom: Default::default(), } From 414e73e7dcc4709efd6b00f4631120f550bf1c42 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Tue, 5 Dec 2023 11:49:25 +0500 Subject: [PATCH 15/33] Align API of Job::new and JobBuilder --- src/proto/single/mod.rs | 73 ++++++++++++++++++++++++++++++++++++++--- 1 file changed, 69 insertions(+), 4 deletions(-) diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index 658a18f8..8f9391b8 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -21,6 +21,22 @@ const JOB_DEFAULT_BACKTRACE: usize = 0; /// A Faktory job. /// +/// To create a job, use 'Job::new' specifying 'kind' and 'args': +/// ``` +/// use faktory::Job; +/// let _job = Job::new("order", vec!["ISBN-13:9781718501850"]); +/// ``` +/// +/// Alternatively, 'JobBuilder' a builder to construct a job: +/// ```ignore +/// use faktory::JobBuilder; +/// let result = JobBuilder::default().kind("order").args(vec!["ISBN-13:9781718501850"]).build(); +/// if result.is_err() { +/// todo!("Handle me gracefully in userland") +/// }; +/// let _job = result.unwrap(); +/// ``` +/// /// See also the [Faktory wiki](https://github.com/contribsys/faktory/wiki/The-Job-Payload). #[derive(Serialize, Deserialize, Debug, Builder)] #[builder( @@ -41,6 +57,7 @@ pub struct Job { pub(crate) kind: String, /// The arguments provided for this job. + #[builder(setter(custom))] pub(crate) args: Vec, /// When this job was created. @@ -111,6 +128,15 @@ pub struct Job { } impl JobBuilder { + #[allow(dead_code)] + fn args(&mut self, args: Vec) -> &mut Self + where + A: Into, + { + self.args = Some(args.into_iter().map(|s| s.into()).collect()); + self + } + fn validate(&self) -> Result<(), String> { if let Some(ref priority) = self.priority { if *priority > Some(JOB_PRIORITY_MAX) { @@ -131,7 +157,7 @@ impl JobBuilder { } } -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] pub struct Failure { retry_count: usize, failed_at: String, @@ -221,7 +247,7 @@ mod test { #[test] fn test_job_build_fails_if_kind_missing() { let job = JobBuilder::default() - .args(vec![serde_json::Value::from("ISBN-13:9781718501850")]) + .args(vec!["ISBN-13:9781718501850"]) .build(); let err = job.unwrap_err(); assert_eq!( @@ -244,7 +270,7 @@ mod test { fn test_job_build_fails_if_priority_invalid() { let job = JobBuilder::default() .kind("order") - .args(vec![]) + .args(vec!["ISBN-13:9781718501850"]) .priority(JOB_PRIORITY_MAX + 1) .build(); let err = job.unwrap_err(); @@ -257,7 +283,7 @@ mod test { #[test] fn test_job_can_be_created_with_builder() { let job_kind = "order"; - let job_args = vec![serde_json::Value::from("ISBN-13:9781718501850")]; + let job_args = vec!["ISBN-13:9781718501850"]; let job = JobBuilder::default() .kind(job_kind) .args(job_args.clone()) @@ -278,4 +304,43 @@ mod test { assert!(job.failure.is_none()); assert_eq!(job.custom, HashMap::default()); } + + #[test] + fn test_method_mew_and_builder_align() { + let job1 = Job::new("order", vec!["ISBN-13:9781718501850"]); + let job2 = JobBuilder::default() + .kind("order") + .args(vec!["ISBN-13:9781718501850"]) + .build() + .unwrap(); + + assert_eq!(job1.kind, job2.kind); + assert_eq!(job1.args, job2.args); + assert_eq!(job1.queue, job2.queue); + assert_eq!(job1.enqueued_at, job2.enqueued_at); + assert_eq!(job1.at, job2.at); + assert_eq!(job1.reserve_for, job2.reserve_for); + assert_eq!(job1.retry, job2.retry); + assert_eq!(job1.priority, job2.priority); + assert_eq!(job1.backtrace, job2.backtrace); + assert_eq!(job1.failure, job2.failure); + assert_eq!(job1.custom, job2.custom); + + assert_ne!(job1.jid, job2.jid); + assert_ne!(job1.created_at, job2.created_at); + } + + #[test] + fn test_ignored_snippet_in_docs() { + // This is a snippet which is marked `ignore` in the docs and + // which will fail since 'JobBuilder' is undeclared at doc test run + let result = JobBuilder::default() + .kind("order") + .args(vec!["ISBN-13:9781718501850"]) + .build(); + if result.is_err() { + todo!("Handle me gracefully in userland") + }; + let _job = result.unwrap(); + } } From 7081990d622725319831e0e2354a8f6551050cb2 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Tue, 5 Dec 2023 12:04:01 +0500 Subject: [PATCH 16/33] Make JobBuilder publicly available --- src/lib.rs | 2 +- src/proto/mod.rs | 4 +++- src/proto/single/mod.rs | 24 +++++------------------- 3 files changed, 9 insertions(+), 21 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 922725a4..5a83286e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -73,5 +73,5 @@ pub use tls::TlsStream; pub use crate::consumer::{Consumer, ConsumerBuilder}; pub use crate::error::Error; pub use crate::producer::Producer; -pub use crate::proto::Job; pub use crate::proto::Reconnect; +pub use crate::proto::{Job, JobBuilder}; diff --git a/src/proto/mod.rs b/src/proto/mod.rs index ea7a4f58..d8af3c0c 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -11,7 +11,9 @@ pub(crate) const EXPECTED_PROTOCOL_VERSION: usize = 2; mod single; // commands that users can issue -pub use self::single::{Ack, Fail, Heartbeat, Info, Job, Push, QueueAction, QueueControl}; +pub use self::single::{ + Ack, Fail, Heartbeat, Info, Job, JobBuilder, Push, QueueAction, QueueControl, +}; // responses that users can see pub use self::single::Hi; diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index 8f9391b8..e04db043 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -28,7 +28,7 @@ const JOB_DEFAULT_BACKTRACE: usize = 0; /// ``` /// /// Alternatively, 'JobBuilder' a builder to construct a job: -/// ```ignore +/// ``` /// use faktory::JobBuilder; /// let result = JobBuilder::default().kind("order").args(vec!["ISBN-13:9781718501850"]).build(); /// if result.is_err() { @@ -128,8 +128,8 @@ pub struct Job { } impl JobBuilder { - #[allow(dead_code)] - fn args(&mut self, args: Vec) -> &mut Self + /// Setter for the arguments provided for this job. + pub fn args(&mut self, args: Vec) -> &mut Self where A: Into, { @@ -146,8 +146,8 @@ impl JobBuilder { Ok(()) } - #[allow(dead_code)] - fn build(&self) -> Result { + /// Builds a new job (a.k.a. bulder's finalizer) + pub fn build(&self) -> Result { let job = self .try_build() .map_err(|err| error::Client::MalformedJob { @@ -329,18 +329,4 @@ mod test { assert_ne!(job1.jid, job2.jid); assert_ne!(job1.created_at, job2.created_at); } - - #[test] - fn test_ignored_snippet_in_docs() { - // This is a snippet which is marked `ignore` in the docs and - // which will fail since 'JobBuilder' is undeclared at doc test run - let result = JobBuilder::default() - .kind("order") - .args(vec!["ISBN-13:9781718501850"]) - .build(); - if result.is_err() { - todo!("Handle me gracefully in userland") - }; - let _job = result.unwrap(); - } } From ac361ae84c6aad05cdbb2a81bff254f4669ad743 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Tue, 5 Dec 2023 12:17:03 +0500 Subject: [PATCH 17/33] Add demo usage of 'JobBuilder' to loadtest --- src/bin/loadtest.rs | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/src/bin/loadtest.rs b/src/bin/loadtest.rs index 22ec82e8..c847671b 100644 --- a/src/bin/loadtest.rs +++ b/src/bin/loadtest.rs @@ -70,12 +70,26 @@ fn main() { for idx in 0..jobs { if idx % 2 == 0 { // push - let mut job = Job::new( - "SomeJob", - vec![serde_json::Value::from(1), "string".into(), 3.into()], - ); - job.priority = Some(rng.gen_range(1..10)); - job.queue = QUEUES.choose(&mut rng).unwrap().to_string(); + let job = match idx % 4 { + // Solely to demo how 'JobBuilder' works in userland + 0 => JobBuilder::default() + .kind("SomeJob") + .args(vec![serde_json::Value::from(1), "string".into(), 3.into()]) + .priority(Some(rng.gen_range(1..10))) + .queue(QUEUES.choose(&mut rng).unwrap().to_string()) + .build() + .unwrap(), + _ => { + let mut job = Job::new( + "SomeJob", + vec![serde_json::Value::from(1), "string".into(), 3.into()], + ); + job.priority = Some(rng.gen_range(1..10)); + job.queue = QUEUES.choose(&mut rng).unwrap().to_string(); + job + } + }; + p.enqueue(job)?; if pushed.fetch_add(1, atomic::Ordering::SeqCst) >= jobs { return Ok(idx); From 6700c114e3c913af0cb6a7a92dd52e5fc66d0a37 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Tue, 5 Dec 2023 12:46:16 +0500 Subject: [PATCH 18/33] Do clean up before PR submitting --- src/error.rs | 6 +++--- src/proto/single/mod.rs | 13 +++++++++---- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/src/error.rs b/src/error.rs index 9d762c83..fcc59451 100644 --- a/src/error.rs +++ b/src/error.rs @@ -13,8 +13,8 @@ //! with the faktory server. Typically, [`Protocol`] errors are the result //! of the server sending a response this client did not expect. //! -//! [`Client`] describes errors that occur even before communication with the server, e.g. -//! errors when building a 'Job'. +//! [`Client`] describes errors that occur even before submitting a job to the server, e.g. +//! errors when building a 'Job' (missing required fields, invalid values). use thiserror::Error; @@ -27,7 +27,7 @@ pub enum Error { Connect(#[from] Connect), /// Client-side errors. /// - /// These are errors arising even before communicating to server, e.g. malformed job. + /// These are errors arising even before submitting a job to the server, e.g. malformed job. #[error("client")] Client(#[from] Client), /// Underlying I/O layer errors. diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index e04db043..a41b2ac8 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -24,15 +24,20 @@ const JOB_DEFAULT_BACKTRACE: usize = 0; /// To create a job, use 'Job::new' specifying 'kind' and 'args': /// ``` /// use faktory::Job; +/// /// let _job = Job::new("order", vec!["ISBN-13:9781718501850"]); /// ``` /// -/// Alternatively, 'JobBuilder' a builder to construct a job: +/// Alternatively, use 'JobBuilder' to construct a job: /// ``` /// use faktory::JobBuilder; -/// let result = JobBuilder::default().kind("order").args(vec!["ISBN-13:9781718501850"]).build(); +/// +/// let result = JobBuilder::default() +/// .kind("order") +/// .args(vec!["ISBN-13:9781718501850"]) +/// .build(); /// if result.is_err() { -/// todo!("Handle me gracefully in userland") +/// todo!("Handle me gracefully, please.") /// }; /// let _job = result.unwrap(); /// ``` @@ -146,7 +151,7 @@ impl JobBuilder { Ok(()) } - /// Builds a new job (a.k.a. bulder's finalizer) + /// Builds a new job pub fn build(&self) -> Result { let job = self .try_build() From ecab0ee7435b92e72dd61ec50b29bf6d872b77b4 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Thu, 7 Dec 2023 22:34:02 +0500 Subject: [PATCH 19/33] Make 'try_build' private --- src/proto/single/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index a41b2ac8..27d07412 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -46,7 +46,7 @@ const JOB_DEFAULT_BACKTRACE: usize = 0; #[derive(Serialize, Deserialize, Debug, Builder)] #[builder( setter(into), - build_fn(name = "try_build", validate = "Self::validate") + build_fn(name = "try_build", private, validate = "Self::validate") )] pub struct Job { /// The job's unique identifier. From b2938c3b3c980a57a1378a9ae07d1df908b2a312 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Fri, 8 Dec 2023 09:26:27 +0500 Subject: [PATCH 20/33] Opt out of setter for 'failure' on JobBuilder --- src/proto/single/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index 27d07412..af1612b8 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -116,7 +116,7 @@ pub struct Job { /// /// This field is read-only. #[serde(skip_serializing)] - #[builder(default = "None")] + #[builder(setter(skip))] failure: Option, /// Extra context to include with the job. From 903ab252af228c29e528e36aea5796535d1aae03 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Tue, 12 Dec 2023 23:14:13 +0500 Subject: [PATCH 21/33] Skip 'enqueued_at' setter on 'JobBuilder' --- src/proto/single/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index af1612b8..7b6a6ffc 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -74,7 +74,7 @@ pub struct Job { /// When this job was supplied to the Faktory server. #[serde(skip_serializing_if = "Option::is_none")] - #[builder(default = "None")] + #[builder(setter(skip))] pub enqueued_at: Option>, /// When this job is scheduled for. From 7b48efe37e9d47f462487b735ddc4090c5555d0b Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Wed, 13 Dec 2023 10:43:09 +0500 Subject: [PATCH 22/33] Update 'build' signature to return 'Error' --- src/proto/single/mod.rs | 41 +++++++++++++++++++++++++---------------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index 7b6a6ffc..7aa26aea 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -152,7 +152,7 @@ impl JobBuilder { } /// Builds a new job - pub fn build(&self) -> Result { + pub fn build(&self) -> Result { let job = self .try_build() .map_err(|err| error::Client::MalformedJob { @@ -254,21 +254,27 @@ mod test { let job = JobBuilder::default() .args(vec!["ISBN-13:9781718501850"]) .build(); - let err = job.unwrap_err(); - assert_eq!( - err.to_string(), - "job is malformed: `kind` must be initialized" - ) + if let Error::Client(e) = job.unwrap_err() { + assert_eq!( + e.to_string(), + "job is malformed: `kind` must be initialized" + ) + } else { + unreachable!() + } } #[test] fn test_job_build_fails_if_args_missing() { let job = JobBuilder::default().kind("order").build(); - let err = job.unwrap_err(); - assert_eq!( - err.to_string(), - "job is malformed: `args` must be initialized" - ); + if let Error::Client(e) = job.unwrap_err() { + assert_eq!( + e.to_string(), + "job is malformed: `args` must be initialized" + ); + } else { + unreachable!(); + } } #[test] @@ -278,11 +284,14 @@ mod test { .args(vec!["ISBN-13:9781718501850"]) .priority(JOB_PRIORITY_MAX + 1) .build(); - let err = job.unwrap_err(); - assert_eq!( - err.to_string(), - "job is malformed: `priority` must be in the range from 0 to 9 inclusive" - ); + if let Error::Client(e) = job.unwrap_err() { + assert_eq!( + e.to_string(), + "job is malformed: `priority` must be in the range from 0 to 9 inclusive" + ); + } else { + unreachable!() + } } #[test] From c7e329b76350b028d98ea839fa25debda4059384 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Mon, 18 Dec 2023 10:04:14 +0500 Subject: [PATCH 23/33] Checkout loadtest bin to main --- src/bin/loadtest.rs | 26 ++++++-------------------- 1 file changed, 6 insertions(+), 20 deletions(-) diff --git a/src/bin/loadtest.rs b/src/bin/loadtest.rs index c847671b..22ec82e8 100644 --- a/src/bin/loadtest.rs +++ b/src/bin/loadtest.rs @@ -70,26 +70,12 @@ fn main() { for idx in 0..jobs { if idx % 2 == 0 { // push - let job = match idx % 4 { - // Solely to demo how 'JobBuilder' works in userland - 0 => JobBuilder::default() - .kind("SomeJob") - .args(vec![serde_json::Value::from(1), "string".into(), 3.into()]) - .priority(Some(rng.gen_range(1..10))) - .queue(QUEUES.choose(&mut rng).unwrap().to_string()) - .build() - .unwrap(), - _ => { - let mut job = Job::new( - "SomeJob", - vec![serde_json::Value::from(1), "string".into(), 3.into()], - ); - job.priority = Some(rng.gen_range(1..10)); - job.queue = QUEUES.choose(&mut rng).unwrap().to_string(); - job - } - }; - + let mut job = Job::new( + "SomeJob", + vec![serde_json::Value::from(1), "string".into(), 3.into()], + ); + job.priority = Some(rng.gen_range(1..10)); + job.queue = QUEUES.choose(&mut rng).unwrap().to_string(); p.enqueue(job)?; if pushed.fetch_add(1, atomic::Ordering::SeqCst) >= jobs { return Ok(idx); From c92856cd1a5fe9734fb3daba780511e298f11452 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Mon, 18 Dec 2023 10:06:03 +0500 Subject: [PATCH 24/33] Fmt enum variants in 'error' mod --- src/error.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/error.rs b/src/error.rs index fcc59451..069bb433 100644 --- a/src/error.rs +++ b/src/error.rs @@ -25,11 +25,13 @@ pub enum Error { /// The connection to the server, or one of its prerequisites, failed. #[error("connection")] Connect(#[from] Connect), + /// Client-side errors. /// /// These are errors arising even before submitting a job to the server, e.g. malformed job. #[error("client")] Client(#[from] Client), + /// Underlying I/O layer errors. /// /// These are overwhelmingly network communication errors on the socket connection to the server. From 5cfa0fbac04259e777265c2d0cb2ab048f3d59a2 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Mon, 18 Dec 2023 10:08:33 +0500 Subject: [PATCH 25/33] Remove PartialEq derive from 'Failure' --- src/proto/single/mod.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index 7aa26aea..11f74527 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -162,7 +162,7 @@ impl JobBuilder { } } -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct Failure { retry_count: usize, failed_at: String, @@ -337,7 +337,6 @@ mod test { assert_eq!(job1.retry, job2.retry); assert_eq!(job1.priority, job2.priority); assert_eq!(job1.backtrace, job2.backtrace); - assert_eq!(job1.failure, job2.failure); assert_eq!(job1.custom, job2.custom); assert_ne!(job1.jid, job2.jid); From ba49903070c2ddaf39de9fe248f11bc48ec3845a Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Mon, 18 Dec 2023 10:12:47 +0500 Subject: [PATCH 26/33] Do not validate priority --- src/proto/single/mod.rs | 32 +------------------------------- 1 file changed, 1 insertion(+), 31 deletions(-) diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index 11f74527..8c9abac1 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -15,7 +15,6 @@ pub use self::resp::*; const JOB_DEFAULT_QUEUE: &str = "default"; const JOB_DEFAULT_RESERVED_FOR_SECS: usize = 600; const JOB_DEFAULT_RETRY_COUNT: usize = 25; -const JOB_PRIORITY_MAX: u8 = 9; const JOB_DEFAULT_PRIORITY: u8 = 5; const JOB_DEFAULT_BACKTRACE: usize = 0; @@ -44,10 +43,7 @@ const JOB_DEFAULT_BACKTRACE: usize = 0; /// /// See also the [Faktory wiki](https://github.com/contribsys/faktory/wiki/The-Job-Payload). #[derive(Serialize, Deserialize, Debug, Builder)] -#[builder( - setter(into), - build_fn(name = "try_build", private, validate = "Self::validate") -)] +#[builder(setter(into), build_fn(name = "try_build", private))] pub struct Job { /// The job's unique identifier. #[builder(default = "utils::gen_random_jid()")] @@ -142,15 +138,6 @@ impl JobBuilder { self } - fn validate(&self) -> Result<(), String> { - if let Some(ref priority) = self.priority { - if *priority > Some(JOB_PRIORITY_MAX) { - return Err("`priority` must be in the range from 0 to 9 inclusive".to_string()); - } - } - Ok(()) - } - /// Builds a new job pub fn build(&self) -> Result { let job = self @@ -277,23 +264,6 @@ mod test { } } - #[test] - fn test_job_build_fails_if_priority_invalid() { - let job = JobBuilder::default() - .kind("order") - .args(vec!["ISBN-13:9781718501850"]) - .priority(JOB_PRIORITY_MAX + 1) - .build(); - if let Error::Client(e) = job.unwrap_err() { - assert_eq!( - e.to_string(), - "job is malformed: `priority` must be in the range from 0 to 9 inclusive" - ); - } else { - unreachable!() - } - } - #[test] fn test_job_can_be_created_with_builder() { let job_kind = "order"; From f58e346807db224aa7ddf127cd947cf4c17d2128 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Mon, 18 Dec 2023 10:41:33 +0500 Subject: [PATCH 27/33] Add custom constructor instead of 'default' impl --- src/proto/single/mod.rs | 49 ++++++++++++++++++----------------------- 1 file changed, 21 insertions(+), 28 deletions(-) diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index 8c9abac1..909c60cf 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -31,19 +31,20 @@ const JOB_DEFAULT_BACKTRACE: usize = 0; /// ``` /// use faktory::JobBuilder; /// -/// let result = JobBuilder::default() -/// .kind("order") +/// let _job = JobBuilder::new("order") /// .args(vec!["ISBN-13:9781718501850"]) -/// .build(); -/// if result.is_err() { -/// todo!("Handle me gracefully, please.") -/// }; -/// let _job = result.unwrap(); +/// .build()?; +/// +/// # Ok(()) /// ``` /// /// See also the [Faktory wiki](https://github.com/contribsys/faktory/wiki/The-Job-Payload). #[derive(Serialize, Deserialize, Debug, Builder)] -#[builder(setter(into), build_fn(name = "try_build", private))] +#[builder( + custom_constructor, + setter(into), + build_fn(name = "try_build", private) +)] pub struct Job { /// The job's unique identifier. #[builder(default = "utils::gen_random_jid()")] @@ -55,6 +56,7 @@ pub struct Job { /// The job's type. Called `kind` because `type` is reserved. #[serde(rename = "jobtype")] + #[builder(setter(custom))] pub(crate) kind: String, /// The arguments provided for this job. @@ -129,6 +131,14 @@ pub struct Job { } impl JobBuilder { + /// docs + pub fn new(kind: impl Into) -> JobBuilder { + JobBuilder { + kind: Some(kind.into()), + ..JobBuilder::create_empty() + } + } + /// Setter for the arguments provided for this job. pub fn args(&mut self, args: Vec) -> &mut Self where @@ -236,24 +246,9 @@ pub fn write_command_and_await_ok( mod test { use super::*; - #[test] - fn test_job_build_fails_if_kind_missing() { - let job = JobBuilder::default() - .args(vec!["ISBN-13:9781718501850"]) - .build(); - if let Error::Client(e) = job.unwrap_err() { - assert_eq!( - e.to_string(), - "job is malformed: `kind` must be initialized" - ) - } else { - unreachable!() - } - } - #[test] fn test_job_build_fails_if_args_missing() { - let job = JobBuilder::default().kind("order").build(); + let job = JobBuilder::new("order").build(); if let Error::Client(e) = job.unwrap_err() { assert_eq!( e.to_string(), @@ -268,8 +263,7 @@ mod test { fn test_job_can_be_created_with_builder() { let job_kind = "order"; let job_args = vec!["ISBN-13:9781718501850"]; - let job = JobBuilder::default() - .kind(job_kind) + let job = JobBuilder::new(job_kind) .args(job_args.clone()) .build() .unwrap(); @@ -292,8 +286,7 @@ mod test { #[test] fn test_method_mew_and_builder_align() { let job1 = Job::new("order", vec!["ISBN-13:9781718501850"]); - let job2 = JobBuilder::default() - .kind("order") + let job2 = JobBuilder::new("order") .args(vec!["ISBN-13:9781718501850"]) .build() .unwrap(); From 3325dcbf4c8f67d3f3bb33f0222a7ce720be9a3f Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Mon, 18 Dec 2023 10:51:21 +0500 Subject: [PATCH 28/33] Make 'JobBuilder' infallible, rm 'Client' from Error enum --- src/error.rs | 18 ------------------ src/proto/single/mod.rs | 41 ++++++++++------------------------------- 2 files changed, 10 insertions(+), 49 deletions(-) diff --git a/src/error.rs b/src/error.rs index 069bb433..f6a19574 100644 --- a/src/error.rs +++ b/src/error.rs @@ -26,12 +26,6 @@ pub enum Error { #[error("connection")] Connect(#[from] Connect), - /// Client-side errors. - /// - /// These are errors arising even before submitting a job to the server, e.g. malformed job. - #[error("client")] - Client(#[from] Client), - /// Underlying I/O layer errors. /// /// These are overwhelmingly network communication errors on the socket connection to the server. @@ -91,18 +85,6 @@ pub enum Connect { ParseUrl(#[source] url::ParseError), } -/// Errors happening client side -#[derive(Debug, Error)] -#[non_exhaustive] -pub enum Client { - /// The 'Job' is malformed. - #[error("job is malformed: {desc}")] - MalformedJob { - /// Details on what is missing or incorrect about the 'Job' - desc: String, - }, -} - /// The set of observable application-level errors when interacting with a Faktory server. #[derive(Debug, Error)] #[non_exhaustive] diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index 909c60cf..eea52122 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -7,7 +7,7 @@ mod cmd; mod resp; mod utils; -use crate::error::{self, Error}; +use crate::error::Error; pub use self::cmd::*; pub use self::resp::*; @@ -33,9 +33,7 @@ const JOB_DEFAULT_BACKTRACE: usize = 0; /// /// let _job = JobBuilder::new("order") /// .args(vec!["ISBN-13:9781718501850"]) -/// .build()?; -/// -/// # Ok(()) +/// .build(); /// ``` /// /// See also the [Faktory wiki](https://github.com/contribsys/faktory/wiki/The-Job-Payload). @@ -60,7 +58,7 @@ pub struct Job { pub(crate) kind: String, /// The arguments provided for this job. - #[builder(setter(custom))] + #[builder(setter(custom), default = "Vec::new()")] pub(crate) args: Vec, /// When this job was created. @@ -149,13 +147,8 @@ impl JobBuilder { } /// Builds a new job - pub fn build(&self) -> Result { - let job = self - .try_build() - .map_err(|err| error::Client::MalformedJob { - desc: err.to_string(), - })?; - Ok(job) + pub fn build(&self) -> Job { + self.try_build().expect("All required fields have been set") } } @@ -246,33 +239,20 @@ pub fn write_command_and_await_ok( mod test { use super::*; - #[test] - fn test_job_build_fails_if_args_missing() { - let job = JobBuilder::new("order").build(); - if let Error::Client(e) = job.unwrap_err() { - assert_eq!( - e.to_string(), - "job is malformed: `args` must be initialized" - ); - } else { - unreachable!(); - } - } - #[test] fn test_job_can_be_created_with_builder() { let job_kind = "order"; let job_args = vec!["ISBN-13:9781718501850"]; - let job = JobBuilder::new(job_kind) - .args(job_args.clone()) - .build() - .unwrap(); + let job = JobBuilder::new(job_kind).args(job_args.clone()).build(); assert!(job.jid != "".to_owned()); assert!(job.queue == JOB_DEFAULT_QUEUE.to_string()); assert_eq!(job.kind, job_kind); assert_eq!(job.args, job_args); + + assert!(job.created_at.is_some()); assert!(job.created_at < Some(Utc::now())); + assert!(job.enqueued_at.is_none()); assert!(job.at.is_none()); assert_eq!(job.reserve_for, Some(JOB_DEFAULT_RESERVED_FOR_SECS)); @@ -288,8 +268,7 @@ mod test { let job1 = Job::new("order", vec!["ISBN-13:9781718501850"]); let job2 = JobBuilder::new("order") .args(vec!["ISBN-13:9781718501850"]) - .build() - .unwrap(); + .build(); assert_eq!(job1.kind, job2.kind); assert_eq!(job1.args, job2.args); From d4070e57fe4a7e7866bffa392d0263fbe566e90e Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Mon, 18 Dec 2023 12:02:03 +0500 Subject: [PATCH 29/33] Add example usage of JobBuidler::new and Job::builder --- src/proto/single/mod.rs | 28 +++++++++++++++++++++++++--- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index eea52122..acbb5ab3 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -36,6 +36,22 @@ const JOB_DEFAULT_BACKTRACE: usize = 0; /// .build(); /// ``` /// +/// Equivalently: +/// ``` +/// use faktory::Job; +/// +/// let _job = Job::builder("order") +/// .args(vec!["ISBN-13:9781718501850"]) +/// .build(); +/// ``` +/// +/// In case no arguments are expected 'on the other side', you can simply go with: +/// ``` +/// use faktory::Job; +/// +/// let _job = Job::builder("rebuild_index").build(); +/// ``` +/// /// See also the [Faktory wiki](https://github.com/contribsys/faktory/wiki/The-Job-Payload). #[derive(Serialize, Deserialize, Debug, Builder)] #[builder( @@ -129,7 +145,7 @@ pub struct Job { } impl JobBuilder { - /// docs + /// Create a new instance of 'JobBuilder' pub fn new(kind: impl Into) -> JobBuilder { JobBuilder { kind: Some(kind.into()), @@ -146,9 +162,10 @@ impl JobBuilder { self } - /// Builds a new job + /// Builds a new 'Job' pub fn build(&self) -> Job { - self.try_build().expect("All required fields have been set") + self.try_build() + .expect("All required fields have been set.") } } @@ -192,6 +209,11 @@ impl Job { } } + /// Craete an instance of JobBuilder with 'kind' already set. + pub fn builder(kind: impl Into) -> JobBuilder { + JobBuilder::new(kind) + } + /// Place this job on the given `queue`. /// /// If this method is not called (or `self.queue` set otherwise), the queue will be set to From 0482aad5c959c37c9bc498a5d76a8df0483c1a52 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Mon, 18 Dec 2023 12:24:12 +0500 Subject: [PATCH 30/33] Use JobBuilder::new in Job::new --- src/proto/single/mod.rs | 40 +++++++++++++++++++--------------------- 1 file changed, 19 insertions(+), 21 deletions(-) diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index acbb5ab3..58bfe35b 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -48,7 +48,7 @@ const JOB_DEFAULT_BACKTRACE: usize = 0; /// In case no arguments are expected 'on the other side', you can simply go with: /// ``` /// use faktory::Job; -/// +/// /// let _job = Job::builder("rebuild_index").build(); /// ``` /// @@ -191,26 +191,13 @@ impl Job { S: Into, A: Into, { - Job { - jid: utils::gen_random_jid(), - queue: JOB_DEFAULT_QUEUE.into(), - kind: kind.into(), - args: args.into_iter().map(|s| s.into()).collect(), - - created_at: Some(Utc::now()), - enqueued_at: None, - at: None, - reserve_for: Some(JOB_DEFAULT_RESERVED_FOR_SECS), - retry: Some(JOB_DEFAULT_RETRY_COUNT), - priority: Some(JOB_DEFAULT_PRIORITY), - backtrace: Some(JOB_DEFAULT_BACKTRACE), - failure: None, - custom: Default::default(), - } + JobBuilder::new(kind).args(args).build() } - /// Craete an instance of JobBuilder with 'kind' already set. - pub fn builder(kind: impl Into) -> JobBuilder { + /// Create an instance of JobBuilder with 'kind' already set. + /// + /// Equivalent to 'JobBuilder::new' + pub fn builder>(kind: S) -> JobBuilder { JobBuilder::new(kind) } @@ -283,15 +270,17 @@ mod test { assert_eq!(job.backtrace, Some(JOB_DEFAULT_BACKTRACE)); assert!(job.failure.is_none()); assert_eq!(job.custom, HashMap::default()); + + let job = JobBuilder::new(job_kind).build(); + assert!(job.args.is_empty()); } #[test] - fn test_method_mew_and_builder_align() { + fn test_all_job_creation_variants_align() { let job1 = Job::new("order", vec!["ISBN-13:9781718501850"]); let job2 = JobBuilder::new("order") .args(vec!["ISBN-13:9781718501850"]) .build(); - assert_eq!(job1.kind, job2.kind); assert_eq!(job1.args, job2.args); assert_eq!(job1.queue, job2.queue); @@ -305,5 +294,14 @@ mod test { assert_ne!(job1.jid, job2.jid); assert_ne!(job1.created_at, job2.created_at); + + let job3 = Job::builder("order") + .args(vec!["ISBN-13:9781718501850"]) + .build(); + assert_eq!(job2.kind, job3.kind); + assert_eq!(job1.args, job2.args); + + assert_ne!(job2.jid, job3.jid); + assert_ne!(job2.created_at, job3.created_at); } } From 053dad459581da50b487cd22a310cae9900cafeb Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Mon, 18 Dec 2023 12:25:15 +0500 Subject: [PATCH 31/33] Checkout src/error to main --- src/error.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/error.rs b/src/error.rs index f6a19574..6c3d4dd9 100644 --- a/src/error.rs +++ b/src/error.rs @@ -12,9 +12,6 @@ //! [`Protocol`] describes lower-level errors relating to communication //! with the faktory server. Typically, [`Protocol`] errors are the result //! of the server sending a response this client did not expect. -//! -//! [`Client`] describes errors that occur even before submitting a job to the server, e.g. -//! errors when building a 'Job' (missing required fields, invalid values). use thiserror::Error; From 26ced57fa8ee3536e983b6db3649681ae1f28876 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Mon, 18 Dec 2023 13:00:01 +0500 Subject: [PATCH 32/33] Add test to tests/real using all job creation variants --- tests/real.rs | 53 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/tests/real.rs b/tests/real.rs index d00f6d0e..fd9642f8 100644 --- a/tests/real.rs +++ b/tests/real.rs @@ -163,3 +163,56 @@ fn queue() { let worker_executed = rx.try_recv().is_ok(); assert!(worker_executed); } + +#[test] +fn test_jobs_created_with_builder() { + skip_check!(); + + // prepare a producer ("client" in Faktory terms) and consumer ("worker"): + let mut producer = Producer::connect(None).unwrap(); + let mut consumer = ConsumerBuilder::default(); + consumer.register("rebuild_index", move |job| -> io::Result<_> { + assert!(job.args().is_empty()); + Ok(eprintln!("{:?}", job)) + }); + consumer.register("register_order", move |job| -> io::Result<_> { + assert!(job.args().len() != 0); + Ok(eprintln!("{:?}", job)) + }); + + let mut consumer = consumer.connect(None).unwrap(); + + // prepare some jobs with JobBuilder: + let job1 = JobBuilder::new("rebuild_index") + .queue("test_jobs_created_with_builder_0") + .build(); + + let job2 = Job::builder("register_order") + .args(vec!["ISBN-13:9781718501850"]) + .queue("test_jobs_created_with_builder_1") + .build(); + + let mut job3 = Job::new("register_order", vec!["ISBN-13:9781718501850"]); + job3.queue = "test_jobs_created_with_builder_1".to_string(); + + // enqueue ... + producer.enqueue(job1).unwrap(); + producer.enqueue(job2).unwrap(); + producer.enqueue(job3).unwrap(); + + // ... and execute: + let had_job = consumer + .run_one(0, &["test_jobs_created_with_builder_0"]) + .unwrap(); + assert!(had_job); + + let had_job = consumer + .run_one(0, &["test_jobs_created_with_builder_1"]) + .unwrap(); + assert!(had_job); + + let had_job = consumer + .run_one(0, &["test_jobs_created_with_builder_1"]) + .unwrap(); + assert!(had_job); +} From 9d5dc1697ec47534376ce076ddf7581085f62e89 Mon Sep 17 00:00:00 2001 From: Jon Gjengset Date: Wed, 3 Jan 2024 13:01:49 +0100 Subject: [PATCH 33/33] Apply suggestions from code review --- src/proto/single/mod.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index 58bfe35b..349acf67 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -27,7 +27,7 @@ const JOB_DEFAULT_BACKTRACE: usize = 0; /// let _job = Job::new("order", vec!["ISBN-13:9781718501850"]); /// ``` /// -/// Alternatively, use 'JobBuilder' to construct a job: +/// Alternatively, use [`JobBuilder`] to configure more aspects of a job: /// ``` /// use faktory::JobBuilder; /// @@ -145,7 +145,7 @@ pub struct Job { } impl JobBuilder { - /// Create a new instance of 'JobBuilder' + /// Create a new builder for a [`Job`] pub fn new(kind: impl Into) -> JobBuilder { JobBuilder { kind: Some(kind.into()), @@ -162,7 +162,7 @@ impl JobBuilder { self } - /// Builds a new 'Job' + /// Builds a new [`Job`] from the parameters of this builder. pub fn build(&self) -> Job { self.try_build() .expect("All required fields have been set.") @@ -194,9 +194,9 @@ impl Job { JobBuilder::new(kind).args(args).build() } - /// Create an instance of JobBuilder with 'kind' already set. + /// Creates an ergonomic constructor for a new [`Job`]. /// - /// Equivalent to 'JobBuilder::new' + /// Also equivalent to [`JobBuilder::new`]. pub fn builder>(kind: S) -> JobBuilder { JobBuilder::new(kind) }