Skip to content

Commit

Permalink
Update docs
Browse files Browse the repository at this point in the history
  • Loading branch information
rustworthy committed May 21, 2024
1 parent 96e3f2c commit e59e2ec
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 59 deletions.
50 changes: 44 additions & 6 deletions src/worker/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,47 @@ impl<E: 'static> WorkerBuilder<E> {

/// Set a graceful shutdown signal.
///
/// As soon as the provided future resolves, the graceful shutdown will
/// step in making the [`Worker::run`] operation return control to the calling
/// code.
/// As soon as the provided future resolves, the graceful shutdown will step in
/// making the [`Worker::run`] operation return control to the calling code.
/// In case of the [`Worker::run_to_completion`] operation, the process will be exited
/// upon gracefull shutdown.
///
/// The graceful shutdown itself is a race between the clean up needed to be performed
/// (e.g. report on the currently processed to the Faktory server) and a shutdown deadline.
/// The latter can be customized via [`WorkerBuilder::graceful_shutdown_period`].
///
/// ```no_run
/// # tokio_test::block_on(async {
/// use faktory::{Client, Job, Worker};
/// use tokio_util::sync::CancellationToken;
///
/// Client::connect(None)
/// .await
/// .unwrap()
/// .enqueue(Job::new("foobar", vec!["z"]))
/// .await
/// .unwrap();
///
/// // create a signalling future (we are using a utility from the `tokio_util` crate)
/// let token = CancellationToken::new();
/// let child_token = token.child_token();
/// let signal = async move { child_token.cancelled().await };
///
/// // get a connected worker
/// let mut w = Worker::builder()
/// .with_graceful_shutdown(signal)
/// .register_fn("job_type", move |_| async { Ok::<(), std::io::Error>(()) })
/// .connect(None)
/// .await
/// .unwrap();
///
/// // start consuming
/// let jh = tokio::spawn(async move { w.run(&["default"]).await });
///
/// // send a signal to eventually return control (upon graceful shutdown)
/// token.cancel();
/// # });
/// ```
pub fn with_graceful_shutdown<F>(mut self, signal: F) -> Self
where
F: Future<Output = ()> + 'static + Send,
Expand All @@ -111,9 +149,9 @@ impl<E: 'static> WorkerBuilder<E> {

/// Set the graceful shutdown period in milliseconds. Defaults to 5000.
///
/// This will be used once the worker is sent a termination signal whether
/// it is at the application (see [`Worker::run`](Worker::run)) or OS level
/// (via Ctrl-C signal, see docs for [`Worker::run_to_completion`](Worker::run_to_completion)).
/// This will be used once the worker is sent a termination signal whether it is at the application
/// (via a signalling future, see [`WorkerBuilder::with_graceful_shutdown`]) or OS level (via Ctrl-C signal,
/// see [`Worker::run_to_completion`]).
pub fn graceful_shutdown_period(mut self, millis: u64) -> Self {
self.shutdown_timeout = millis;
self
Expand Down
56 changes: 5 additions & 51 deletions src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,64 +358,18 @@ impl<
}))
}

/// Run this worker on the given `queues` until an I/O error occurs (`Err` is returned), or
/// until the server tells the worker to disengage (`Ok` is returned), or a signal from the user-space
/// code has been received via a cancellation token (`Ok` is returned).
/// Run this worker on the given `queues`.
///
/// Will run the worker until an I/O error occurs (`Err` is returned), or until the server tells the worker
/// to disengage (`Ok` is returned), or a signal from the user-space code has been received via a future
/// supplied to [`WorkerBuilder::with_graceful_shutdown`](`Ok` is returned).
///
/// The value in an `Ok` indicates the number of workers that may still be processing jobs, but `0` can also
/// indicate the [graceful shutdown period](WorkerBuilder::graceful_shutdown_period) has been exceeded.
///
/// If an error occurred while reporting a job success or failure, the result will be re-reported to the server
/// without re-executing the job. If the worker was terminated (i.e., `run` returns with an `Ok` response),
/// the worker should **not** try to resume by calling `run` again. This will cause a panic.
///
/// ```no_run
/// # tokio_test::block_on(async {
/// use faktory::{Client, Job, Worker};
/// use tokio_util::sync::CancellationToken;
///
/// Client::connect(None)
/// .await
/// .unwrap()
/// .enqueue(Job::new("foobar", vec!["z"]))
/// .await
/// .unwrap();
///
/// let mut w = Worker::builder()
/// .graceful_shutdown_period(5_000)
/// .register_fn("foobar", |_j| async { Ok::<(), std::io::Error>(()) })
/// .connect(None).await.unwrap();
///
/// let token = CancellationToken::new();
/// let child_token = token.child_token();
///
/// let _handle = tokio::spawn(async move { w.run(&["qname"]).await });
///
/// token.cancel();
/// # });
/// ```
///
/// In case you have no intention to send termination signals, just pass `None` as the second
/// argument of [`Worker::run`]. The modified example from above will look like so:
/// ```no_run
/// # tokio_test::block_on(async {
/// use faktory::{Client, Job, Worker};
///
/// Client::connect(None)
/// .await
/// .unwrap()
/// .enqueue(Job::new("foobar", vec!["z"]))
/// .await
/// .unwrap();
///
/// let mut w = Worker::builder()
/// .graceful_shutdown_period(5_000)
/// .register_fn("foobar", |_j| async { Ok::<(), std::io::Error>(()) })
/// .connect(None).await.unwrap();
///
/// let _handle = tokio::spawn(async move { w.run(&["qname"]).await });
/// # });
/// ```
pub async fn run<Q>(&mut self, queues: &[Q]) -> Result<usize, Error>
where
Q: AsRef<str>,
Expand Down
7 changes: 5 additions & 2 deletions tests/real/community.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,13 +383,16 @@ async fn test_shutdown_signals_handling() {
let (tx, mut rx_for_test_purposes) = tokio::sync::mpsc::channel::<bool>(1);
let tx = sync::Arc::new(tx);

// create
// create a token
let token = CancellationToken::new();
let child_token = token.child_token();

// create a signalling future
let signal = async move { child_token.cancelled().await };

// get a connected worker
let mut w = WorkerBuilder::default()
.with_graceful_shutdown(async move { child_token.cancelled().await })
.with_graceful_shutdown(signal)
.graceful_shutdown_period(shutdown_timeout)
.register_fn(jkind, process_hard_task(tx))
.connect(None)
Expand Down

0 comments on commit e59e2ec

Please sign in to comment.