diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index b42d5e73..f3914872 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -42,6 +42,12 @@ jobs:
         run: cargo test --locked --all-features --all-targets
         env: # set this explicitly so integration tests will run
           FAKTORY_URL: tcp://127.0.0.1:7419
+      # commands executed during the following test affect all the queues on the Faktory server,
+      # so we perform this test in a dedicated - isolated - step, re-using the Faktory container
+      - name: cargo test --locked (queue control actions)
+        run: cargo test --locked --all-features --all-targets queue_control_actions_wildcard -- --include-ignored
+        env: # set this explicitly so integration tests will run
+          FAKTORY_URL: tcp://127.0.0.1:7419
       # https://github.com/rust-lang/cargo/issues/6669
       - name: cargo test --doc
         run: cargo test --locked --all-features --doc
diff --git a/Cargo.lock b/Cargo.lock
index 8505c38b..a4de9e5c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -493,6 +493,7 @@ dependencies = [
  "pin-project",
  "rand 0.8.0",
  "rustls-pki-types",
+ "semver",
  "serde",
  "serde_derive",
  "serde_json",
@@ -1099,20 +1100,29 @@ dependencies = [
  "libc",
 ]
 
+[[package]]
+name = "semver"
+version = "1.0.23"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b"
+dependencies = [
+ "serde",
+]
+
 [[package]]
 name = "serde"
-version = "1.0.186"
+version = "1.0.202"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9f5db24220c009de9bd45e69fb2938f4b6d2df856aa9304ce377b3180f83b7c1"
+checksum = "226b61a0d411b2ba5ff6d7f73a476ac4f8bb900373459cd00fab8512828ba395"
 dependencies = [
  "serde_derive",
 ]
 
 [[package]]
 name = "serde_derive"
-version = "1.0.186"
+version = "1.0.202"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5ad697f7e0b65af4983a4ce8f56ed5b357e8d3c36651bf6a7e13639c17b8e670"
+checksum = "6048858004bcff69094cd972ed40a32500f153bd3be9f716b2eed2e8217c4838"
 dependencies = [
  "proc-macro2",
  "quote",
diff --git a/Cargo.toml b/Cargo.toml
index 5e5a257e..ee94fa0d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -47,6 +47,7 @@ tokio = { version = "1.35.1", features = [
 tokio-native-tls = { version = "0.3.1", optional = true }
 tokio-rustls = { version = "0.25.0", optional = true }
 url = "2"
+semver = { version = "1.0.23", features = ["serde"] }
 
 [dev-dependencies]
 rustls-pki-types = "1.0.1"
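A quick aside on the new `semver` dependency: the `serde` feature is what lets the `version` field of the new `ServerSnapshot` type (introduced further down in `src/proto/single/resp.rs`) deserialize the server's `faktory_version` string straight into a `semver::Version`. A minimal, illustrative sketch (not part of this diff; `VersionProbe` is a made-up struct):

```rust
use serde::Deserialize;

// Hypothetical struct, for illustration only: mirrors how `ServerSnapshot::version`
// picks up the `faktory_version` field from the INFO payload.
#[derive(Deserialize)]
struct VersionProbe {
    faktory_version: semver::Version,
}

fn main() {
    let probe: VersionProbe =
        serde_json::from_str(r#"{"faktory_version":"1.9.0"}"#).expect("valid semver string");
    // With `semver`, version checks become ordinary comparisons,
    // e.g. "is the server at least 1.8.0?"
    assert!(probe.faktory_version >= semver::Version::new(1, 8, 0));
}
```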
diff --git a/Makefile b/Makefile
index 16a2ff95..4af3ed1d 100644
--- a/Makefile
+++ b/Makefile
@@ -4,6 +4,13 @@ FAKTORY_PORT=7419
 FAKTORY_PORT_SECURE=17419
 FAKTORY_PORT_UI=7420
 
+.PHONY: precommit
+precommit: fmt check test/doc test/e2e test/e2e/tls
+
+.PHONY: fmt
+fmt:
+	cargo fmt
+
 .PHONY: check
 check:
 	cargo fmt --check
@@ -34,7 +41,7 @@ faktory/tls:
 
 .PHONY: faktory/tls/kill
 faktory/tls/kill:
-	docker compose -f docker/compose.yml down
+	docker compose -f docker/compose.yml down -v
 
 .PHONY: test
 test:
diff --git a/docker/faktory.Dockerfile b/docker/faktory.Dockerfile
index ff106455..b41fa142 100644
--- a/docker/faktory.Dockerfile
+++ b/docker/faktory.Dockerfile
@@ -1 +1 @@
-FROM contribsys/faktory:1.8.0
+FROM contribsys/faktory:1.9.0
diff --git a/src/lib.rs b/src/lib.rs
index ef94a5d6..660eeafb 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -75,7 +75,10 @@ mod proto;
 mod worker;
 
 pub use crate::error::Error;
-pub use crate::proto::{Client, Connection, Job, JobBuilder, JobId, Reconnect, WorkerId};
+pub use crate::proto::{
+    Client, Connection, DataSnapshot, FaktoryState, Job, JobBuilder, JobId, Reconnect,
+    ServerSnapshot, WorkerId,
+};
 pub use crate::worker::{JobRunner, Worker, WorkerBuilder};
 
 #[cfg(feature = "ent")]
diff --git a/src/proto/client/mod.rs b/src/proto/client/mod.rs
index b5233021..694d1426 100644
--- a/src/proto/client/mod.rs
+++ b/src/proto/client/mod.rs
@@ -306,6 +306,20 @@ impl Client {
             },
         }
     }
+
+    pub(crate) async fn perform_queue_action<Q>(
+        &mut self,
+        queues: &[Q],
+        action: QueueAction,
+    ) -> Result<(), Error>
+    where
+        Q: AsRef<str> + Sync,
+    {
+        self.issue(&QueueControl::new(action, queues))
+            .await?
+            .read_ok()
+            .await
+    }
 }
 
 impl Client {
@@ -349,10 +363,10 @@
         Ok((jobs_count - errors.len(), Some(errors)))
     }
 
-    /// Retrieve information about the running server.
+    /// Retrieve [information](crate::ServerSnapshot) about the running server.
     ///
     /// The returned value is the result of running the `INFO` command on the server.
-    pub async fn info(&mut self) -> Result<serde_json::Value, Error> {
+    pub async fn current_info(&mut self) -> Result<FaktoryState, Error> {
         self.issue(&Info)
             .await?
             .read_json()
             .await
@@ -361,25 +375,54 @@
     }
 
     /// Pause the given queues.
+    ///
+    /// Passing a wildcard `&["*"]` as the value of the `queues` parameter
+    /// will pause all the queues. To be more explicit, you may want to call the
+    /// [`Client::queue_pause_all`] shortcut method to pause all the queues.
     pub async fn queue_pause<Q>(&mut self, queues: &[Q]) -> Result<(), Error>
     where
         Q: AsRef<str> + Sync,
     {
-        self.issue(&QueueControl::new(QueueAction::Pause, queues))
-            .await?
-            .read_ok()
-            .await
+        self.perform_queue_action(queues, QueueAction::Pause).await
+    }
+
+    /// Pause all queues.
+    pub async fn queue_pause_all(&mut self) -> Result<(), Error> {
+        self.perform_queue_action(&["*"], QueueAction::Pause).await
     }
 
     /// Resume the given queues.
+    ///
+    /// Passing a wildcard `&["*"]` as the value of the `queues` parameter
+    /// will resume all the queues. To be more explicit, you may want to call the
+    /// [`Client::queue_resume_all`] shortcut method to resume all the queues.
     pub async fn queue_resume<Q>(&mut self, queues: &[Q]) -> Result<(), Error>
     where
        Q: AsRef<str> + Sync,
    {
-        self.issue(&QueueControl::new(QueueAction::Resume, queues))
-            .await?
-            .read_ok()
-            .await
+        self.perform_queue_action(queues, QueueAction::Resume).await
+    }
+
+    /// Resume all queues.
+    pub async fn queue_resume_all(&mut self) -> Result<(), Error> {
+        self.perform_queue_action(&["*"], QueueAction::Resume).await
+    }
+
+    /// Remove the given queues.
+    ///
+    /// Beware, passing a wildcard `&["*"]` as the value of the `queues` parameter
+    /// will **remove** all the queues. To be more explicit, you may want to call the
+    /// [`Client::queue_remove_all`] shortcut method to remove all the queues.
+    pub async fn queue_remove<Q>(&mut self, queues: &[Q]) -> Result<(), Error>
+    where
+        Q: AsRef<str> + Sync,
+    {
+        self.perform_queue_action(queues, QueueAction::Remove).await
+    }
+
+    /// Remove all queues.
+    pub async fn queue_remove_all(&mut self) -> Result<(), Error> {
+        self.perform_queue_action(&["*"], QueueAction::Remove).await
     }
 }
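The wildcard semantics described in the doc comments above are easiest to see in a short usage sketch (illustrative only, not part of the diff; the queue names are made up and a reachable Faktory server is assumed):

```rust
use faktory::Client;

#[tokio::main]
async fn main() -> Result<(), faktory::Error> {
    // Connects using the FAKTORY_URL env var or the default local address.
    let mut client = Client::connect(None).await?;

    // Pause a couple of specific queues, then bring them back.
    client.queue_pause(&["mailers", "billing"]).await?;
    client.queue_resume(&["mailers", "billing"]).await?;

    // The `*_all` shortcuts are equivalent to passing the `&["*"]` wildcard.
    client.queue_pause_all().await?;
    client.queue_resume_all().await?;

    // `queue_remove` deletes the queues (and their jobs) entirely - use with care.
    client.queue_remove(&["billing"]).await?;
    Ok(())
}
```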
diff --git a/src/proto/mod.rs b/src/proto/mod.rs
index 64065d4b..2972b127 100644
--- a/src/proto/mod.rs
+++ b/src/proto/mod.rs
@@ -11,7 +11,7 @@ pub use client::{Client, Connection};
 
 mod single;
 
-pub use single::{Job, JobBuilder, JobId, WorkerId};
+pub use single::{DataSnapshot, FaktoryState, Job, JobBuilder, JobId, ServerSnapshot, WorkerId};
 
 pub(crate) use single::{Ack, Fail, Info, Push, PushBulk, QueueAction, QueueControl};
 
diff --git a/src/proto/single/cmd.rs b/src/proto/single/cmd.rs
index 54fb7115..13efb0ab 100644
--- a/src/proto/single/cmd.rs
+++ b/src/proto/single/cmd.rs
@@ -279,6 +279,7 @@ impl FaktoryCommand for PushBulk {
 pub(crate) enum QueueAction {
     Pause,
     Resume,
+    Remove,
 }
 
 pub(crate) struct QueueControl<'a, S>
@@ -298,6 +299,7 @@ where
         let command = match self.action {
             QueueAction::Pause => b"QUEUE PAUSE".as_ref(),
             QueueAction::Resume => b"QUEUE RESUME".as_ref(),
+            QueueAction::Remove => b"QUEUE REMOVE".as_ref(),
         };
         w.write_all(command).await?;
         write_queues(w, self.queues).await?;
diff --git a/src/proto/single/ent/progress.rs b/src/proto/single/ent/progress.rs
index f7ca87d4..20f36960 100644
--- a/src/proto/single/ent/progress.rs
+++ b/src/proto/single/ent/progress.rs
@@ -1,8 +1,8 @@
-use crate::proto::single::JobId;
-
 use super::utils;
+use crate::proto::single::JobId;
 use chrono::{DateTime, Utc};
 use derive_builder::Builder;
+
 /// Info on job execution progress (sent).
 ///
 /// In Enterprise Faktory, a client executing a job can report on the execution
diff --git a/src/proto/single/resp.rs b/src/proto/single/resp.rs
index 5d4ede59..05c470ea 100644
--- a/src/proto/single/resp.rs
+++ b/src/proto/single/resp.rs
@@ -1,9 +1,13 @@
+use super::utils;
+use crate::error::{self, Error};
+use chrono::{DateTime, Utc};
+use std::collections::BTreeMap;
+use std::time::Duration;
+use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncReadExt};
+
 #[cfg(feature = "ent")]
 use crate::ent::BatchId;
 
-use crate::error::{self, Error};
-use tokio::io::AsyncBufRead;
-
 pub fn bad(expected: &'static str, got: &RawResponse) -> error::Protocol {
     let stringy = match *got {
         RawResponse::String(ref s) => Some(&**s),
@@ -118,6 +122,107 @@ pub async fn read_ok<R: AsyncBufRead + Unpin>(r: R) -> Result<(), Error> {
     Err(bad("server ok", &rr).into())
 }
 
+// ----------------------------------------------
+
+/// Faktory service information.
+///
+/// This holds information on the registered [queues](DataSnapshot::queues) as well as
+/// some aggregated data, e.g. total number of jobs [processed](DataSnapshot::total_processed),
+/// total number of jobs [enqueued](DataSnapshot::total_enqueued), etc.
+#[derive(Serialize, Deserialize, Debug, Clone)]
+#[non_exhaustive]
+pub struct DataSnapshot {
+    /// Total number of job failures.
+    pub total_failures: u64,
+
+    /// Total number of processed jobs.
+    pub total_processed: u64,
+
+    /// Total number of enqueued jobs.
+    pub total_enqueued: u64,
+
+    /// Total number of queues.
+    pub total_queues: u64,
+
+    /// Queues stats.
+    ///
+    /// A mapping between a queue name and its size (number of jobs on the queue).
+    /// The keys of this map effectively make up a list of queues that are currently
+    /// registered in the Faktory service.
+    pub queues: BTreeMap<String, u64>,
+
+    /// ***Deprecated***. Faktory's task runner stats.
+    ///
+    /// Note that this is exposed as a "generic" `serde_json::Value` since this info
+    /// belongs to deep implementation details of the Faktory service.
+    #[deprecated(
+        note = "marked as deprecated in the Faktory source code and is likely to be completely removed in the future, so please do not rely on this data"
+    )]
+    pub tasks: serde_json::Value,
+}
+
+/// Faktory's server process information.
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct ServerSnapshot {
+    /// Description of the server process (e.g. "Faktory").
+    pub description: String,
+
+    /// Faktory's version as semver.
+    #[serde(rename = "faktory_version")]
+    pub version: semver::Version,
+
+    /// Faktory server process uptime in seconds.
+    #[serde(deserialize_with = "utils::deser_duration")]
+    #[serde(serialize_with = "utils::ser_duration")]
+    pub uptime: Duration,
+
+    /// Number of clients connected to the server.
+    pub connections: u64,
+
+    /// Number of executed commands.
+    pub command_count: u64,
+
+    /// Faktory server process memory usage.
+    pub used_memory_mb: u64,
+}
+
+/// Current server state.
+///
+/// Contains such details as how many queues there are on the server, statistics on the jobs,
+/// as well as some specific info on server process memory usage, uptime, etc.
+///
+/// Here is an example of the simplest way to fetch info on the server state.
+/// ```no_run
+/// # tokio_test::block_on(async {
+/// use faktory::Client;
+///
+/// let mut client = Client::connect(None).await.unwrap();
+/// let _server_state = client.current_info().await.unwrap();
+/// # });
+/// ```
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct FaktoryState {
+    /// Server time.
+    pub now: DateTime<Utc>,
+
+    /// Server time (naive representation).
+    ///
+    /// Faktory sends it as a string formatted as "%H:%M:%S UTC" (e.g. "19:47:39 UTC"),
+    /// and it is parsed as `NaiveTime`.
+    ///
+    /// Most of the time, though, you will want to use [`FaktoryState::now`] instead.
+    #[serde(deserialize_with = "utils::deser_server_time")]
+    #[serde(serialize_with = "utils::ser_server_time")]
+    pub server_utc_time: chrono::naive::NaiveTime,
+
+    /// Faktory service information.
+    #[serde(rename = "faktory")]
+    pub data: DataSnapshot,
+
+    /// Faktory's server process information.
+    pub server: ServerSnapshot,
+}
+
 // ----------------------------------------------
 //
 // below is the implementation of the Redis RESP protocol
@@ -132,7 +237,6 @@ pub enum RawResponse {
     Null,
 }
 
-use tokio::io::{AsyncBufReadExt, AsyncReadExt};
 async fn read<R>(mut r: R) -> Result<RawResponse, Error>
 where
     R: AsyncBufRead + Unpin,
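To make the nesting of the snapshot types introduced above concrete, here is a minimal consumer sketch (illustrative only, not part of the diff; the `default` queue name is just an example):

```rust
use faktory::Client;

#[tokio::main]
async fn main() -> Result<(), faktory::Error> {
    let mut client = Client::connect(None).await?;
    let state = client.current_info().await?;

    // `server` is the process-level snapshot ...
    println!(
        "Faktory {} has been up for {}s and is serving {} connection(s)",
        state.server.version,
        state.server.uptime.as_secs(),
        state.server.connections,
    );

    // ... while `data` aggregates job and queue statistics.
    let default_queue_size = state.data.queues.get("default").copied().unwrap_or(0);
    println!(
        "{} job(s) enqueued overall, {} waiting on the `default` queue",
        state.data.total_enqueued, default_queue_size,
    );
    Ok(())
}
```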
diff --git a/src/proto/single/utils.rs b/src/proto/single/utils.rs
index ed4f55f6..0138f974 100644
--- a/src/proto/single/utils.rs
+++ b/src/proto/single/utils.rs
@@ -1,4 +1,7 @@
+use chrono::naive::NaiveTime;
 use rand::{thread_rng, Rng};
+use serde::{de::Deserializer, Deserialize, Serializer};
+use std::time::Duration;
 
 const JOB_ID_LENGTH: usize = 16;
 const WORKER_ID_LENGTH: usize = 32;
@@ -19,6 +22,44 @@
 pub(crate) fn gen_random_wid() -> String {
     gen_random_id(WORKER_ID_LENGTH)
 }
 
+pub(crate) fn ser_duration<S>(value: &Duration, serializer: S) -> Result<S::Ok, S::Error>
+where
+    S: Serializer,
+{
+    let secs = value.as_secs();
+    serializer.serialize_u64(secs)
+}
+
+pub(crate) fn deser_duration<'de, D>(value: D) -> Result<Duration, D::Error>
+where
+    D: Deserializer<'de>,
+{
+    let secs = u64::deserialize(value)?;
+    Ok(Duration::from_secs(secs))
+}
+
+pub(crate) fn ser_server_time<S>(value: &NaiveTime, serializer: S) -> Result<S::Ok, S::Error>
+where
+    S: Serializer,
+{
+    serializer.serialize_str(&format!("{} UTC", value))
+}
+
+pub(crate) fn deser_server_time<'de, D>(value: D) -> Result<NaiveTime, D::Error>
+where
+    D: Deserializer<'de>,
+{
+    let naive_time_str = String::deserialize(value)?;
+    let naive_time_str = naive_time_str
+        .strip_suffix(" UTC")
+        .ok_or(serde::de::Error::custom(
+            "Expected a naive time string that ends with ' UTC'",
+        ))?;
+    let naive_time =
+        NaiveTime::parse_from_str(naive_time_str, "%H:%M:%S").map_err(serde::de::Error::custom)?;
+    Ok(naive_time)
+}
+
 #[cfg(test)]
 mod test {
     use super::*;
@@ -49,4 +90,43 @@
         }
         assert_eq!(ids.len(), 1_000_000);
     }
+
+    #[test]
+    fn test_ser_deser_duration() {
+        #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
+        struct FaktoryServer {
+            #[serde(deserialize_with = "deser_duration")]
+            #[serde(serialize_with = "ser_duration")]
+            uptime: Duration,
+        }
+
+        let server = FaktoryServer {
+            uptime: Duration::from_secs(2024),
+        };
+
+        let serialized = serde_json::to_string(&server).expect("serialized ok");
+        let deserialized = serde_json::from_str(&serialized).expect("deserialized ok");
+        assert_eq!(server, deserialized);
+    }
+
+    #[test]
+    fn test_ser_deser_server_time() {
+        #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
+        struct FaktoryServer {
+            /// Server time as a string formatted as "%H:%M:%S UTC" (e.g. "19:47:39 UTC").
+            #[serde(deserialize_with = "deser_server_time")]
+            #[serde(serialize_with = "ser_server_time")]
+            pub server_utc_time: NaiveTime,
+        }
+
+        let server = FaktoryServer {
+            server_utc_time: NaiveTime::from_hms_opt(19, 47, 39).expect("valid"),
+        };
+
+        let serialized = serde_json::to_string(&server).expect("serialized ok");
+        assert_eq!(serialized, "{\"server_utc_time\":\"19:47:39 UTC\"}");
+
+        let deserialized = serde_json::from_str(&serialized).expect("deserialized ok");
+        assert_eq!(server, deserialized);
+    }
 }
diff --git a/tests/real/community.rs b/tests/real/community.rs
index 6ed32d9f..66755421 100644
--- a/tests/real/community.rs
+++ b/tests/real/community.rs
@@ -1,7 +1,8 @@
-use crate::skip_check;
+use crate::{assert_gte, skip_check};
 use faktory::{Client, Job, JobBuilder, JobId, Worker, WorkerBuilder, WorkerId};
 use serde_json::Value;
-use std::{io, sync};
+use std::{io, sync, time::Duration};
+use tokio::time as tokio_time;
 
 #[tokio::test(flavor = "multi_thread")]
 async fn hello_client() {
@@ -70,6 +71,109 @@ async fn roundtrip() {
     assert!(drained);
 }
 
+#[tokio::test(flavor = "multi_thread")]
+async fn server_state() {
+    skip_check!();
+
+    let local = "server_state";
+
+    // prepare a worker
+    let mut w = WorkerBuilder::default()
+        .register_fn(local, move |_| async move { Ok::<(), io::Error>(()) })
+        .connect(None)
+        .await
+        .unwrap();
+
+    // prepare a producing client
+    let mut client = Client::connect(None).await.unwrap();
+
+    // examine server state before pushing anything
+    let server_state = client.current_info().await.unwrap();
+    // the Faktory release we are writing bindings for and testing
+    // against is at least "1.8.0"
+    assert_eq!(server_state.server.version.major, 1);
+    assert_gte!(server_state.server.version.minor, 8);
+    assert!(server_state.data.queues.get(local).is_none());
+    // the following two assertions are not super-helpful, but
+    // there is not much info we can make meaningful assertions on anyhow
+    // (like memusage, server description string, version, etc.)
+    assert_gte!(
+        server_state.server.connections,
+        2,
+        "{}",
+        server_state.server.connections
+    ); // at least two clients from the current test
+    assert_ne!(server_state.server.uptime.as_secs(), 0); // if IPC is happening, this should hold :)
+
+    let nenqueued = server_state.data.total_enqueued;
+    let nqueues = server_state.data.total_queues;
+
+    // push 1 job
+    client
+        .enqueue(
+            JobBuilder::new(local)
+                .args(vec!["abc"])
+                .queue(local)
+                .build(),
+        )
+        .await
+        .unwrap();
+
+    // let's give Faktory a second to get updated
+    tokio_time::sleep(Duration::from_secs(1)).await;
+
+    // we only pushed 1 job on this queue
+    let server_state = client.current_info().await.unwrap();
+    assert_eq!(*server_state.data.queues.get(local).unwrap(), 1);
+    // `total_enqueued` should be at least +1 job from last read
+    assert_gte!(
+        server_state.data.total_enqueued,
+        nenqueued + 1,
+        "`total_enqueued` equals {} which is not greater than or equal to {}",
+        server_state.data.total_enqueued,
+        nenqueued + 1
+    );
+    // `total_queues` should be at least +1 queue from last read
+    assert_gte!(
+        server_state.data.total_queues,
+        nqueues + 1,
+        "`total_queues` equals {} which is not greater than or equal to {}",
+        server_state.data.total_queues,
+        nqueues + 1
+    );
+
+    // let's now consume that job ...
+    assert!(w.run_one(0, &[local]).await.unwrap());
+
+    // ... and verify the queue has got 0 pending jobs
+    //
+    // NB! If this is not passing locally, make sure to launch a fresh Faktory container,
+    // because if you have not pruned its volume, the Faktory server will still keep the queue name
+    // as registered.
+    // But generally, we are performing a clean-up by consuming the jobs from the local queue
+    // and then deleting the queue programmatically, so there is normally no need to prune docker
+    // volumes to perform the next test run. Also note that on CI we are always starting afresh.
+    let server_state = client.current_info().await.unwrap();
+    assert_eq!(*server_state.data.queues.get(local).unwrap(), 0);
+    // `total_processed` should be at least +1 job from last read
+    assert_gte!(
+        server_state.data.total_processed,
+        1,
+        "{}",
+        server_state.data.total_processed
+    );
+
+    client.queue_remove(&[local]).await.unwrap();
+    assert!(client
+        .current_info()
+        .await
+        .unwrap()
+        .data
+        .queues
+        .get(local)
+        .is_none());
+}
+
 #[tokio::test(flavor = "multi_thread")]
 async fn multi() {
     skip_check!();
@@ -152,41 +256,191 @@ async fn fail() {
 }
 
 #[tokio::test(flavor = "multi_thread")]
-async fn queue() {
+async fn queue_control_actions() {
     skip_check!();
-    let local = "pause";
+
+    let local_1 = "queue_control_pause_and_resume_1";
+    let local_2 = "queue_control_pause_and_resume_2";
 
     let (tx, rx) = sync::mpsc::channel();
-    let tx = sync::Arc::new(sync::Mutex::new(tx));
+    let tx_1 = sync::Arc::new(sync::Mutex::new(tx));
+    let tx_2 = sync::Arc::clone(&tx_1);
 
-    let mut w = WorkerBuilder::default()
+    let mut worker = WorkerBuilder::default()
         .hostname("tester".to_string())
-        .wid(WorkerId::new(local))
-        .register_fn(local, move |_job| {
-            let tx = sync::Arc::clone(&tx);
+        .wid(WorkerId::new(local_1))
+        .register_fn(local_1, move |_job| {
+            let tx = sync::Arc::clone(&tx_1);
+            Box::pin(async move { tx.lock().unwrap().send(true) })
+        })
+        .register_fn(local_2, move |_job| {
+            let tx = sync::Arc::clone(&tx_2);
             Box::pin(async move { tx.lock().unwrap().send(true) })
         })
         .connect(None)
         .await
         .unwrap();
 
-    let mut p = Client::connect(None).await.unwrap();
-    p.enqueue(Job::new(local, vec![Value::from(1)]).on_queue(local))
+    let mut client = Client::connect(None).await.unwrap();
+
+    // enqueue three jobs
+    client
+        .enqueue_many([
+            Job::new(local_1, vec![Value::from(1)]).on_queue(local_1),
+            Job::new(local_1, vec![Value::from(1)]).on_queue(local_1),
+            Job::new(local_1, vec![Value::from(1)]).on_queue(local_1),
+        ])
         .await
         .unwrap();
 
-    p.queue_pause(&[local]).await.unwrap();
+    // pause the queue
+    client.queue_pause(&[local_1]).await.unwrap();
 
-    let had_job = w.run_one(0, &[local]).await.unwrap();
+    // try to consume from that queue
+    let had_job = worker.run_one(0, &[local_1]).await.unwrap();
     assert!(!had_job);
     let worker_executed = rx.try_recv().is_ok();
     assert!(!worker_executed);
 
-    p.queue_resume(&[local]).await.unwrap();
+    // resume that queue and ...
+    client.queue_resume(&[local_1]).await.unwrap();
 
-    let had_job = w.run_one(0, &[local]).await.unwrap();
+    // ... be able to consume from it
+    let had_job = worker.run_one(0, &[local_1]).await.unwrap();
     assert!(had_job);
     let worker_executed = rx.try_recv().is_ok();
     assert!(worker_executed);
+
+    // push two jobs on the other queue (reminder: we still have two jobs
+    // remaining on the first queue):
+    client
+        .enqueue_many([
+            Job::new(local_2, vec![Value::from(1)]).on_queue(local_2),
+            Job::new(local_2, vec![Value::from(1)]).on_queue(local_2),
+        ])
+        .await
+        .unwrap();
+
+    // pause both of the queues
+    client.queue_pause(&[local_1, local_2]).await.unwrap();
+
+    // try to consume from them
+    assert!(!worker.run_one(0, &[local_1]).await.unwrap());
+    assert!(!worker.run_one(0, &[local_2]).await.unwrap());
+    assert!(!rx.try_recv().is_ok());
+
+    // now, resume the queues and ...
+    client.queue_resume(&[local_1, local_2]).await.unwrap();
+
+    // ... be able to consume from both of them
+    assert!(worker.run_one(0, &[local_1]).await.unwrap());
+    assert!(rx.try_recv().is_ok());
+    assert!(worker.run_one(0, &[local_2]).await.unwrap());
+    assert!(rx.try_recv().is_ok());
+
+    // let's inspect the server state
+    let server_state = client.current_info().await.unwrap();
+    let queues = &server_state.data.queues;
+    assert_eq!(*queues.get(local_1).unwrap(), 1); // 1 job remaining
+    assert_eq!(*queues.get(local_2).unwrap(), 1); // also 1 job remaining
+
+    // let's now remove the queues
+    client.queue_remove(&[local_1, local_2]).await.unwrap();
+
+    // though there _was_ a job in each queue, consuming from
+    // the removed queues will not yield anything
+    assert!(!worker.run_one(0, &[local_1]).await.unwrap());
+    assert!(!worker.run_one(0, &[local_2]).await.unwrap());
+    assert!(!rx.try_recv().is_ok());
+
+    // let's inspect the server state again
+    let server_state = client.current_info().await.unwrap();
+    let queues = &server_state.data.queues;
+    // our queues are not even mentioned in the server report:
+    assert!(queues.get(local_1).is_none());
+    assert!(queues.get(local_2).is_none());
+}
+
+// Run the following test with:
+// FAKTORY_URL=tcp://127.0.0.1:7419 cargo test --locked --all-features --all-targets queue_control_actions_wildcard -- --include-ignored
+#[tokio::test(flavor = "multi_thread")]
+#[ignore = "this test requires a dedicated test run since the commands being tested will affect all queues on the Faktory server"]
+async fn queue_control_actions_wildcard() {
+    skip_check!();
+
+    let local_1 = "queue_control_wildcard_1";
+    let local_2 = "queue_control_wildcard_2";
+
+    let (tx, rx) = sync::mpsc::channel();
+    let tx_1 = sync::Arc::new(sync::Mutex::new(tx));
+    let tx_2 = sync::Arc::clone(&tx_1);
+
+    let mut worker = WorkerBuilder::default()
+        .hostname("tester".to_string())
+        .wid(WorkerId::new(local_1))
+        .register_fn(local_1, move |_job| {
+            let tx = sync::Arc::clone(&tx_1);
+            Box::pin(async move { tx.lock().unwrap().send(true) })
+        })
+        .register_fn(local_2, move |_job| {
+            let tx = sync::Arc::clone(&tx_2);
+            Box::pin(async move { tx.lock().unwrap().send(true) })
+        })
+        .connect(None)
+        .await
+        .unwrap();
+
+    let mut client = Client::connect(None).await.unwrap();
+
+    // enqueue two jobs on each queue
+    client
+        .enqueue_many([
+            Job::new(local_1, vec![Value::from(1)]).on_queue(local_1),
+            Job::new(local_1, vec![Value::from(1)]).on_queue(local_1),
+            Job::new(local_2, vec![Value::from(1)]).on_queue(local_2),
+            Job::new(local_2, vec![Value::from(1)]).on_queue(local_2),
+        ])
+        .await
+        .unwrap();
+
+    // pause all the queues
+    client.queue_pause_all().await.unwrap();
+
+    // try to consume from the queues
+    assert!(!worker.run_one(0, &[local_1]).await.unwrap());
+    assert!(!worker.run_one(0, &[local_2]).await.unwrap());
+    assert!(!rx.try_recv().is_ok());
+
+    // now, resume all the queues and ...
+    client.queue_resume_all().await.unwrap();
+
+    // ... be able to consume from both of them
+    assert!(worker.run_one(0, &[local_1]).await.unwrap());
+    assert!(rx.try_recv().is_ok());
+    assert!(worker.run_one(0, &[local_2]).await.unwrap());
+    assert!(rx.try_recv().is_ok());
+
+    // let's inspect the server state
+    let server_state = client.current_info().await.unwrap();
+    let queues = &server_state.data.queues;
+    assert_eq!(*queues.get(local_1).unwrap(), 1); // 1 job remaining
+    assert_eq!(*queues.get(local_2).unwrap(), 1); // also 1 job remaining
+
+    // let's now remove all the queues
+    client.queue_remove_all().await.unwrap();
+
+    // though there _was_ a job in each queue, consuming from
+    // the removed queues will not yield anything
+    assert!(!worker.run_one(0, &[local_1]).await.unwrap());
+    assert!(!worker.run_one(0, &[local_2]).await.unwrap());
+    assert!(!rx.try_recv().is_ok());
+
+    // let's inspect the server state again
+    let server_state = client.current_info().await.unwrap();
+    let queues = &server_state.data.queues;
+    // our queues are not even mentioned in the server report:
+    assert!(queues.get(local_1).is_none());
+    assert!(queues.get(local_2).is_none());
+}
+
 #[tokio::test(flavor = "multi_thread")]
diff --git a/tests/real/utils.rs b/tests/real/utils.rs
index 9fefb92f..1ae08534 100644
--- a/tests/real/utils.rs
+++ b/tests/real/utils.rs
@@ -16,6 +16,34 @@
     };
 }
 
+#[macro_export]
+macro_rules! assert_gt {
+    ($a:expr, $b:expr $(, $rest:expr) *) => {
+        assert!($a > $b $(, $rest) *)
+    };
+}
+
+#[macro_export]
+macro_rules! assert_gte {
+    ($a:expr, $b:expr $(, $rest:expr) *) => {
+        assert!($a >= $b $(, $rest) *)
+    };
+}
+
+#[macro_export]
+macro_rules! assert_lt {
+    ($a:expr, $b:expr $(, $rest:expr) *) => {
+        assert!($a < $b $(, $rest) *)
+    };
+}
+
+#[macro_export]
+macro_rules! assert_lte {
+    ($a:expr, $b:expr $(, $rest:expr) *) => {
+        assert!($a <= $b $(, $rest) *)
+    };
+}
+
 #[cfg(feature = "ent")]
 pub fn learn_faktory_url() -> String {
     let url = std::env::var_os("FAKTORY_URL").expect(