diff --git a/Cargo.lock b/Cargo.lock index 4e9d6667..6663ff8e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -145,6 +145,72 @@ dependencies = [ "typenum", ] +[[package]] +name = "darling" +version = "0.14.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b750cb3417fd1b327431a470f388520309479ab0bf5e323505daf0290cd3850" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.14.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "109c1ca6e6b7f82cc233a97004ea8ed7ca123a9af07a8230878fcfda9b158bf0" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn", +] + +[[package]] +name = "darling_macro" +version = "0.14.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4aab4dbc9f7611d8b55048a3a16d2d010c2c8334e46304b40ac1cc14bf3b48e" +dependencies = [ + "darling_core", + "quote", + "syn", +] + +[[package]] +name = "derive_builder" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d67778784b508018359cbc8696edb3db78160bab2c2a28ba7f56ef6932997f8" +dependencies = [ + "derive_builder_macro", +] + +[[package]] +name = "derive_builder_core" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c11bdc11a0c47bc7d37d582b5285da6849c96681023680b906673c5707af7b0f" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "derive_builder_macro" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebcda35c7a396850a55ffeac740804b40ffec779b98fffbb1738f4033f0ee79e" +dependencies = [ + "derive_builder_core", + "syn", +] + [[package]] name = "digest" version = "0.10.5" @@ -163,6 +229,7 @@ dependencies = [ "bufstream", "chrono", "clap", + "derive_builder", "fnv", "hostname", "libc", @@ -277,6 +344,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "idna" version = "0.3.0" diff --git a/Cargo.toml b/Cargo.toml index 1372241c..d2f84dae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ fnv = "1.0.5" native-tls = { version = "0.2", optional = true } clap = { version = "3.1.0", optional = true } thiserror = "1.0.30" +derive_builder = "0.12.0" [dev-dependencies] mockstream = "0.0.3" diff --git a/src/lib.rs b/src/lib.rs index 922725a4..5a83286e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -73,5 +73,5 @@ pub use tls::TlsStream; pub use crate::consumer::{Consumer, ConsumerBuilder}; pub use crate::error::Error; pub use crate::producer::Producer; -pub use crate::proto::Job; pub use crate::proto::Reconnect; +pub use crate::proto::{Job, JobBuilder}; diff --git a/src/proto/mod.rs b/src/proto/mod.rs index ea7a4f58..d8af3c0c 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -11,7 +11,9 @@ pub(crate) const EXPECTED_PROTOCOL_VERSION: usize = 2; mod single; // commands that users can issue -pub use self::single::{Ack, Fail, Heartbeat, Info, Job, Push, QueueAction, QueueControl}; +pub use self::single::{ + Ack, Fail, Heartbeat, Info, Job, JobBuilder, Push, QueueAction, QueueControl, +}; // responses that users can see pub use self::single::Hi; diff --git a/src/proto/single/mod.rs b/src/proto/single/mod.rs index 9eca8d8e..349acf67 100644 --- a/src/proto/single/mod.rs +++ b/src/proto/single/mod.rs @@ -1,77 +1,134 @@ use chrono::{DateTime, Utc}; +use derive_builder::Builder; use std::collections::HashMap; use std::io::prelude::*; mod cmd; mod resp; +mod utils; use crate::error::Error; pub use self::cmd::*; pub use self::resp::*; +const JOB_DEFAULT_QUEUE: &str = "default"; +const JOB_DEFAULT_RESERVED_FOR_SECS: usize = 600; +const JOB_DEFAULT_RETRY_COUNT: usize = 25; +const JOB_DEFAULT_PRIORITY: u8 = 5; +const JOB_DEFAULT_BACKTRACE: usize = 0; + /// A Faktory job. /// +/// To create a job, use 'Job::new' specifying 'kind' and 'args': +/// ``` +/// use faktory::Job; +/// +/// let _job = Job::new("order", vec!["ISBN-13:9781718501850"]); +/// ``` +/// +/// Alternatively, use [`JobBuilder`] to configure more aspects of a job: +/// ``` +/// use faktory::JobBuilder; +/// +/// let _job = JobBuilder::new("order") +/// .args(vec!["ISBN-13:9781718501850"]) +/// .build(); +/// ``` +/// +/// Equivalently: +/// ``` +/// use faktory::Job; +/// +/// let _job = Job::builder("order") +/// .args(vec!["ISBN-13:9781718501850"]) +/// .build(); +/// ``` +/// +/// In case no arguments are expected 'on the other side', you can simply go with: +/// ``` +/// use faktory::Job; +/// +/// let _job = Job::builder("rebuild_index").build(); +/// ``` +/// /// See also the [Faktory wiki](https://github.com/contribsys/faktory/wiki/The-Job-Payload). -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Builder)] +#[builder( + custom_constructor, + setter(into), + build_fn(name = "try_build", private) +)] pub struct Job { /// The job's unique identifier. + #[builder(default = "utils::gen_random_jid()")] pub(crate) jid: String, /// The queue this job belongs to. Usually `default`. + #[builder(default = "JOB_DEFAULT_QUEUE.into()")] pub queue: String, /// The job's type. Called `kind` because `type` is reserved. #[serde(rename = "jobtype")] + #[builder(setter(custom))] pub(crate) kind: String, /// The arguments provided for this job. + #[builder(setter(custom), default = "Vec::new()")] pub(crate) args: Vec, /// When this job was created. // note that serializing works correctly here since the default chrono serialization // is RFC3339, which is also what Faktory expects. #[serde(skip_serializing_if = "Option::is_none")] + #[builder(default = "Some(Utc::now())")] pub created_at: Option>, /// When this job was supplied to the Faktory server. #[serde(skip_serializing_if = "Option::is_none")] + #[builder(setter(skip))] pub enqueued_at: Option>, /// When this job is scheduled for. /// /// Defaults to immediately. #[serde(skip_serializing_if = "Option::is_none")] + #[builder(default = "None")] pub at: Option>, /// How long to allow this job to run for. /// /// Defaults to 600 seconds. #[serde(skip_serializing_if = "Option::is_none")] + #[builder(default = "Some(JOB_DEFAULT_RESERVED_FOR_SECS)")] pub reserve_for: Option, /// Number of times to retry this job. /// /// Defaults to 25. #[serde(skip_serializing_if = "Option::is_none")] - pub retry: Option, + #[builder(default = "Some(JOB_DEFAULT_RETRY_COUNT)")] + pub retry: Option, /// The priority of this job from 1-9 (9 is highest). /// /// Pushing a job with priority 9 will effectively put it at the front of the queue. /// Defaults to 5. + #[builder(default = "Some(JOB_DEFAULT_PRIORITY)")] pub priority: Option, /// Number of lines of backtrace to keep if this job fails. /// /// Defaults to 0. #[serde(skip_serializing_if = "Option::is_none")] + #[builder(default = "Some(JOB_DEFAULT_BACKTRACE)")] pub backtrace: Option, /// Data about this job's most recent failure. /// /// This field is read-only. #[serde(skip_serializing)] + #[builder(setter(skip))] failure: Option, /// Extra context to include with the job. @@ -83,10 +140,36 @@ pub struct Job { /// across a complex distributed system, etc. #[serde(skip_serializing_if = "HashMap::is_empty")] #[serde(default = "HashMap::default")] + #[builder(default = "HashMap::default()")] pub custom: HashMap, } -#[derive(Serialize, Deserialize, Debug)] +impl JobBuilder { + /// Create a new builder for a [`Job`] + pub fn new(kind: impl Into) -> JobBuilder { + JobBuilder { + kind: Some(kind.into()), + ..JobBuilder::create_empty() + } + } + + /// Setter for the arguments provided for this job. + pub fn args(&mut self, args: Vec) -> &mut Self + where + A: Into, + { + self.args = Some(args.into_iter().map(|s| s.into()).collect()); + self + } + + /// Builds a new [`Job`] from the parameters of this builder. + pub fn build(&self) -> Job { + self.try_build() + .expect("All required fields have been set.") + } +} + +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct Failure { retry_count: usize, failed_at: String, @@ -108,29 +191,14 @@ impl Job { S: Into, A: Into, { - use rand::{thread_rng, Rng}; - let random_jid = thread_rng() - .sample_iter(&rand::distributions::Alphanumeric) - .map(char::from) - .take(16) - .collect(); - use chrono::prelude::*; - Job { - jid: random_jid, - queue: "default".into(), - kind: kind.into(), - args: args.into_iter().map(|s| s.into()).collect(), - - created_at: Some(Utc::now()), - enqueued_at: None, - at: None, - reserve_for: Some(600), - retry: Some(25), - priority: Some(5), - backtrace: Some(0), - failure: None, - custom: Default::default(), - } + JobBuilder::new(kind).args(args).build() + } + + /// Creates an ergonomic constructor for a new [`Job`]. + /// + /// Also equivalent to [`JobBuilder::new`]. + pub fn builder>(kind: S) -> JobBuilder { + JobBuilder::new(kind) } /// Place this job on the given `queue`. @@ -175,3 +243,65 @@ pub fn write_command_and_await_ok( write_command(x, command)?; read_ok(x) } + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_job_can_be_created_with_builder() { + let job_kind = "order"; + let job_args = vec!["ISBN-13:9781718501850"]; + let job = JobBuilder::new(job_kind).args(job_args.clone()).build(); + + assert!(job.jid != "".to_owned()); + assert!(job.queue == JOB_DEFAULT_QUEUE.to_string()); + assert_eq!(job.kind, job_kind); + assert_eq!(job.args, job_args); + + assert!(job.created_at.is_some()); + assert!(job.created_at < Some(Utc::now())); + + assert!(job.enqueued_at.is_none()); + assert!(job.at.is_none()); + assert_eq!(job.reserve_for, Some(JOB_DEFAULT_RESERVED_FOR_SECS)); + assert_eq!(job.retry, Some(JOB_DEFAULT_RETRY_COUNT)); + assert_eq!(job.priority, Some(JOB_DEFAULT_PRIORITY)); + assert_eq!(job.backtrace, Some(JOB_DEFAULT_BACKTRACE)); + assert!(job.failure.is_none()); + assert_eq!(job.custom, HashMap::default()); + + let job = JobBuilder::new(job_kind).build(); + assert!(job.args.is_empty()); + } + + #[test] + fn test_all_job_creation_variants_align() { + let job1 = Job::new("order", vec!["ISBN-13:9781718501850"]); + let job2 = JobBuilder::new("order") + .args(vec!["ISBN-13:9781718501850"]) + .build(); + assert_eq!(job1.kind, job2.kind); + assert_eq!(job1.args, job2.args); + assert_eq!(job1.queue, job2.queue); + assert_eq!(job1.enqueued_at, job2.enqueued_at); + assert_eq!(job1.at, job2.at); + assert_eq!(job1.reserve_for, job2.reserve_for); + assert_eq!(job1.retry, job2.retry); + assert_eq!(job1.priority, job2.priority); + assert_eq!(job1.backtrace, job2.backtrace); + assert_eq!(job1.custom, job2.custom); + + assert_ne!(job1.jid, job2.jid); + assert_ne!(job1.created_at, job2.created_at); + + let job3 = Job::builder("order") + .args(vec!["ISBN-13:9781718501850"]) + .build(); + assert_eq!(job2.kind, job3.kind); + assert_eq!(job1.args, job2.args); + + assert_ne!(job2.jid, job3.jid); + assert_ne!(job2.created_at, job3.created_at); + } +} diff --git a/src/proto/single/utils.rs b/src/proto/single/utils.rs new file mode 100644 index 00000000..b6f068dc --- /dev/null +++ b/src/proto/single/utils.rs @@ -0,0 +1,45 @@ +use rand::{thread_rng, Rng}; + +const JOB_ID_LENGTH: usize = 16; + +pub fn gen_random_jid() -> String { + thread_rng() + .sample_iter(&rand::distributions::Alphanumeric) + .map(char::from) + .take(JOB_ID_LENGTH) + .collect() +} + +#[cfg(test)] +mod test { + use std::collections::HashSet; + + use super::*; + + #[test] + fn test_jid_of_known_size_generated() { + let jid1 = gen_random_jid(); + let jid2 = gen_random_jid(); + assert_ne!(jid1, jid2); + println!("{}", jid1); + assert_eq!(jid1.len(), JOB_ID_LENGTH); + assert_eq!(jid2.len(), JOB_ID_LENGTH); + } + + #[test] + fn test_jids_are_unique() { + let mut ids = HashSet::new(); + + ids.insert("IYKOxEfLcwcgKaRa".to_string()); + ids.insert("IYKOxEfLcwcgKaRa".to_string()); + assert_ne!(ids.len(), 2); + + ids.clear(); + + for _ in 0..1_000_000 { + let jid = gen_random_jid(); + ids.insert(jid); + } + assert_eq!(ids.len(), 1_000_000); + } +} diff --git a/tests/real.rs b/tests/real.rs index d00f6d0e..fd9642f8 100644 --- a/tests/real.rs +++ b/tests/real.rs @@ -163,3 +163,56 @@ fn queue() { let worker_executed = rx.try_recv().is_ok(); assert!(worker_executed); } + +#[test] +fn test_jobs_created_with_builder() { + skip_check!(); + + // prepare a producer ("client" in Faktory terms) and consumer ("worker"): + let mut producer = Producer::connect(None).unwrap(); + let mut consumer = ConsumerBuilder::default(); + consumer.register("rebuild_index", move |job| -> io::Result<_> { + assert!(job.args().is_empty()); + Ok(eprintln!("{:?}", job)) + }); + consumer.register("register_order", move |job| -> io::Result<_> { + assert!(job.args().len() != 0); + Ok(eprintln!("{:?}", job)) + }); + + let mut consumer = consumer.connect(None).unwrap(); + + // prepare some jobs with JobBuilder: + let job1 = JobBuilder::new("rebuild_index") + .queue("test_jobs_created_with_builder_0") + .build(); + + let job2 = Job::builder("register_order") + .args(vec!["ISBN-13:9781718501850"]) + .queue("test_jobs_created_with_builder_1") + .build(); + + let mut job3 = Job::new("register_order", vec!["ISBN-13:9781718501850"]); + job3.queue = "test_jobs_created_with_builder_1".to_string(); + + // enqueue ... + producer.enqueue(job1).unwrap(); + producer.enqueue(job2).unwrap(); + producer.enqueue(job3).unwrap(); + + // ... and execute: + let had_job = consumer + .run_one(0, &["test_jobs_created_with_builder_0"]) + .unwrap(); + assert!(had_job); + + let had_job = consumer + .run_one(0, &["test_jobs_created_with_builder_1"]) + .unwrap(); + assert!(had_job); + + let had_job = consumer + .run_one(0, &["test_jobs_created_with_builder_1"]) + .unwrap(); + assert!(had_job); +}