Commit
Merge branch 'main' into chore/tech-debt
rustworthy committed Jul 6, 2024
2 parents 2635c19 + a4832db commit fe28a98
Showing 14 changed files with 577 additions and 39 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/test.yml
@@ -42,6 +42,12 @@ jobs:
run: cargo test --locked --all-features --all-targets
env: # set this explicitly so integration tests will run
FAKTORY_URL: tcp://127.0.0.1:7419
# commands executed during the following test affect all the queues on the Faktory server,
# so we perform this test in a dedicated - isolated - step, re-using the Faktory container
- name: cargo test --locked (queue control actions)
run: cargo test --locked --all-features --all-targets queue_control_actions_wildcard -- --include-ignored
env: # set this explicitly so integration tests will run
FAKTORY_URL: tcp://127.0.0.1:7419
# https://github.com/rust-lang/cargo/issues/6669
- name: cargo test --doc
run: cargo test --locked --all-features --doc
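
For context, the `-- --include-ignored` flag in the new CI step above suggests that the wildcard queue-control test is marked `#[ignore]` in the suite, so the regular `cargo test` pass skips it and only the dedicated step runs it. A rough sketch of what such a test declaration might look like (only the test name is taken from the filter above; the attribute reason and body are illustrative):

```rust
#[tokio::test]
#[ignore = "affects all queues on the shared Faktory server"]
async fn queue_control_actions_wildcard() {
    // ... exercise QUEUE PAUSE / RESUME / REMOVE with the `*` wildcard
    // against the Faktory container pointed to by FAKTORY_URL ...
}
```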
18 changes: 14 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -47,6 +47,7 @@ tokio = { version = "1.35.1", features = [
tokio-native-tls = { version = "0.3.1", optional = true }
tokio-rustls = { version = "0.25.0", optional = true }
url = "2"
semver = { version = "1.0.23", features = ["serde"] }

[dev-dependencies]
rustls-pki-types = "1.0.1"
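
The new `semver` dependency above is pulled in with its `serde` feature so that the server's version string can be deserialized straight into `semver::Version` (see the `ServerSnapshot` struct further down in this commit). A minimal sketch of what that feature enables; the payload shape here is made up for illustration:

```rust
use semver::Version;
use serde::Deserialize;

// Hypothetical payload shape, for illustration only.
#[derive(Deserialize)]
struct Server {
    faktory_version: Version,
}

fn main() {
    let raw = r#"{ "faktory_version": "1.9.0" }"#;
    let server: Server = serde_json::from_str(raw).expect("valid JSON");
    assert!(server.faktory_version >= Version::new(1, 9, 0));
}
```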
9 changes: 8 additions & 1 deletion Makefile
@@ -4,6 +4,13 @@ FAKTORY_PORT=7419
FAKTORY_PORT_SECURE=17419
FAKTORY_PORT_UI=7420

.PHONY: precommit
precommit: fmt check test/doc test/e2e test/e2e/tls

.PHONY: fmt
fmt:
cargo fmt

.PHONY: check
check:
cargo fmt --check
@@ -34,7 +41,7 @@ faktory/tls:

.PHONY: faktory/tls/kill
faktory/tls/kill:
docker compose -f docker/compose.yml down
docker compose -f docker/compose.yml down -v

.PHONY: test
test:
2 changes: 1 addition & 1 deletion docker/faktory.Dockerfile
@@ -1 +1 @@
FROM contribsys/faktory:1.8.0
FROM contribsys/faktory:1.9.0
5 changes: 4 additions & 1 deletion src/lib.rs
@@ -75,7 +75,10 @@ mod proto;
mod worker;

pub use crate::error::Error;
pub use crate::proto::{Client, Connection, Job, JobBuilder, JobId, Reconnect, WorkerId};
pub use crate::proto::{
Client, Connection, DataSnapshot, FaktoryState, Job, JobBuilder, JobId, Reconnect,
ServerSnapshot, WorkerId,
};
pub use crate::worker::{JobRunner, Worker, WorkerBuilder};

#[cfg(feature = "ent")]
63 changes: 53 additions & 10 deletions src/proto/client/mod.rs
@@ -306,6 +306,20 @@ impl Client {
},
}
}

pub(crate) async fn perform_queue_action<Q>(
&mut self,
queues: &[Q],
action: QueueAction,
) -> Result<(), Error>
where
Q: AsRef<str> + Sync,
{
self.issue(&QueueControl::new(action, queues))
.await?
.read_ok()
.await
}
}

impl Client {
@@ -349,10 +363,10 @@ impl Client {
Ok((jobs_count - errors.len(), Some(errors)))
}

/// Retrieve information about the running server.
/// Retrieve [information](crate::ServerSnapshot) about the running server.
///
/// The returned value is the result of running the `INFO` command on the server.
pub async fn info(&mut self) -> Result<serde_json::Value, Error> {
pub async fn current_info(&mut self) -> Result<single::FaktoryState, Error> {
self.issue(&Info)
.await?
.read_json()
@@ -361,25 +375,54 @@
}

/// Pause the given queues.
///
/// Passing a wildcard `&["*"]` as the value of the `queues` parameter
/// will pause all the queues. To be more explicit, you may want to call the
/// [`Client::queue_pause_all`] shortcut method to pause all the queues.
pub async fn queue_pause<Q>(&mut self, queues: &[Q]) -> Result<(), Error>
where
Q: AsRef<str> + Sync,
{
self.issue(&QueueControl::new(QueueAction::Pause, queues))
.await?
.read_ok()
.await
self.perform_queue_action(queues, QueueAction::Pause).await
}

/// Pause all queues.
pub async fn queue_pause_all(&mut self) -> Result<(), Error> {
self.perform_queue_action(&["*"], QueueAction::Pause).await
}

/// Resume the given queues.
///
/// Passing a wildcard `&["*"]` as the value of the `queues` parameter
/// will resume all the queues. To be more explicit, you may want to call the
/// [`Client::queue_resume_all`] shortcut method to resume all the queues.
pub async fn queue_resume<Q>(&mut self, queues: &[Q]) -> Result<(), Error>
where
Q: AsRef<str> + Sync,
{
self.issue(&QueueControl::new(QueueAction::Resume, queues))
.await?
.read_ok()
.await
self.perform_queue_action(queues, QueueAction::Resume).await
}

/// Resume all queues.
pub async fn queue_resume_all(&mut self) -> Result<(), Error> {
self.perform_queue_action(&["*"], QueueAction::Resume).await
}

/// Remove the given queues.
///
/// Beware: passing a wildcard `&["*"]` as the value of the `queues` parameter
/// will **remove** all the queues. To be more explicit, you may want to call the
/// [`Client::queue_remove_all`] shortcut method to remove all the queues.
pub async fn queue_remove<Q>(&mut self, queues: &[Q]) -> Result<(), Error>
where
Q: AsRef<str> + Sync,
{
self.perform_queue_action(queues, QueueAction::Remove).await
}

/// Remove all queues.
pub async fn queue_remove_all(&mut self) -> Result<(), Error> {
self.perform_queue_action(&["*"], QueueAction::Remove).await
}
}
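
Taken together, the new methods above give the client a small queue-administration surface. A sketch of how they might be used from a consuming project (queue names are made up; `Client::connect(None)` mirrors the doc example elsewhere in this commit and presumably picks the Faktory URL up from the environment):

```rust
use faktory::Client;

#[tokio::main]
async fn main() -> Result<(), faktory::Error> {
    let mut client = Client::connect(None).await?;

    // Pause a couple of specific queues...
    client.queue_pause(&["critical", "default"]).await?;

    // ...and later resume everything via the wildcard shortcut.
    client.queue_resume_all().await?;

    // Removal is destructive, so prefer naming queues explicitly
    // over the `queue_remove_all` wildcard shortcut.
    client.queue_remove(&["abandoned"]).await?;

    Ok(())
}
```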

2 changes: 1 addition & 1 deletion src/proto/mod.rs
@@ -11,7 +11,7 @@ pub use client::{Client, Connection};

mod single;

pub use single::{Job, JobBuilder, JobId, WorkerId};
pub use single::{DataSnapshot, FaktoryState, Job, JobBuilder, JobId, ServerSnapshot, WorkerId};

pub(crate) use single::{Ack, Fail, Info, Push, PushBulk, QueueAction, QueueControl};

2 changes: 2 additions & 0 deletions src/proto/single/cmd.rs
@@ -279,6 +279,7 @@ impl FaktoryCommand for PushBulk {
pub(crate) enum QueueAction {
Pause,
Resume,
Remove,
}

pub(crate) struct QueueControl<'a, S>
@@ -298,6 +299,7 @@
let command = match self.action {
QueueAction::Pause => b"QUEUE PAUSE".as_ref(),
QueueAction::Resume => b"QUEUE RESUME".as_ref(),
QueueAction::Remove => b"QUEUE REMOVE".as_ref(),
};
w.write_all(command).await?;
write_queues(w, self.queues).await?;
4 changes: 2 additions & 2 deletions src/proto/single/ent/progress.rs
@@ -1,8 +1,8 @@
use crate::proto::single::JobId;

use super::utils;
use crate::proto::single::JobId;
use chrono::{DateTime, Utc};
use derive_builder::Builder;

/// Info on job execution progress (sent).
///
/// In Enterprise Faktory, a client executing a job can report on the execution
112 changes: 108 additions & 4 deletions src/proto/single/resp.rs
@@ -1,9 +1,13 @@
use super::utils;
use crate::error::{self, Error};
use chrono::{DateTime, Utc};
use std::collections::BTreeMap;
use std::time::Duration;
use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncReadExt};

#[cfg(feature = "ent")]
use crate::ent::BatchId;

use crate::error::{self, Error};
use tokio::io::AsyncBufRead;

pub fn bad(expected: &'static str, got: &RawResponse) -> error::Protocol {
let stringy = match *got {
RawResponse::String(ref s) => Some(&**s),
@@ -118,6 +122,107 @@ pub async fn read_ok<R: AsyncBufRead + Unpin>(r: R) -> Result<(), Error> {
Err(bad("server ok", &rr).into())
}

// ----------------------------------------------

/// Faktory service information.
///
/// This holds information on the registered [queues](DataSnapshot::queues) as well as
/// some aggregated data, e.g. total number of jobs [processed](DataSnapshot::total_processed),
/// total number of jobs [enqueued](DataSnapshot::total_enqueued), etc.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[non_exhaustive]
pub struct DataSnapshot {
/// Total number of job failures.
pub total_failures: u64,

/// Total number of processed jobs.
pub total_processed: u64,

/// Total number of enqueued jobs.
pub total_enqueued: u64,

/// Total number of queues.
pub total_queues: u64,

/// Queues stats.
///
/// A mapping between a queue name and its size (number of jobs on the queue).
/// The keys of this map effectively make up a list of queues that are currently
/// registered in the Faktory service.
pub queues: BTreeMap<String, u64>,

/// ***Deprecated***. Faktory's task runner stats.
///
/// Note that this is exposed as a "generic" `serde_json::Value` since this info
/// concerns deep implementation details of the Faktory service.
#[deprecated(
note = "marked as deprecated in the Faktory source code and is likely to be completely removed in the future, so please do not rely on this data"
)]
pub tasks: serde_json::Value,
}

/// Faktory's server process information.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ServerSnapshot {
/// Description of the server process (e.g. "Faktory").
pub description: String,

/// Faktory's version as semver.
#[serde(rename = "faktory_version")]
pub version: semver::Version,

/// Faktory server process uptime in seconds.
#[serde(deserialize_with = "utils::deser_duration")]
#[serde(serialize_with = "utils::ser_duration")]
pub uptime: Duration,

/// Number of clients connected to the server.
pub connections: u64,

/// Number of executed commands.
pub command_count: u64,

/// Faktory server process memory usage.
pub used_memory_mb: u64,
}

/// Current server state.
///
/// Contains such details as how many queues there are on the server, statistics on the jobs,
/// as well as some specific info on server process memory usage, uptime, etc.
///
/// Here is an example of the simplest way to fetch info on the server state.
/// ```no_run
/// # tokio_test::block_on(async {
/// use faktory::Client;
///
/// let mut client = Client::connect(None).await.unwrap();
/// let _server_state = client.current_info().await.unwrap();
/// # });
/// ```
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct FaktoryState {
/// Server time.
pub now: DateTime<Utc>,

/// Server time (naive representation).
///
/// Faktory sends it as a string formatted as "%H:%M:%S UTC" (e.g. "19:47:39 UTC")
/// and is parsed as `NaiveTime`.
///
/// Most of the time, though, you will want to use [`FaktoryState::now`] instead.
#[serde(deserialize_with = "utils::deser_server_time")]
#[serde(serialize_with = "utils::ser_server_time")]
pub server_utc_time: chrono::naive::NaiveTime,

/// Faktory service information.
#[serde(rename = "faktory")]
pub data: DataSnapshot,

/// Faktory's server process information.
pub server: ServerSnapshot,
}

// ----------------------------------------------
//
// below is the implementation of the Redis RESP protocol
@@ -132,7 +237,6 @@ pub enum RawResponse {
Null,
}

use tokio::io::{AsyncBufReadExt, AsyncReadExt};
async fn read<R>(mut r: R) -> Result<RawResponse, Error>
where
R: AsyncBufRead + Unpin,
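
Beyond the doc example on `FaktoryState`, here is a slightly fuller sketch of how the snapshot returned by `Client::current_info` might be inspected. The field accesses follow the struct definitions added above, while the connection setup mirrors the doc example:

```rust
use faktory::Client;

#[tokio::main]
async fn main() -> Result<(), faktory::Error> {
    let mut client = Client::connect(None).await?;
    let state = client.current_info().await?;

    println!(
        "{} v{} (up for {:?})",
        state.server.description, state.server.version, state.server.uptime
    );
    println!("jobs processed so far: {}", state.data.total_processed);

    // `queues` maps a queue name to its current size.
    for (queue, size) in &state.data.queues {
        println!("{queue}: {size} job(s) enqueued");
    }

    Ok(())
}
```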