Skip to content

Commit

Permalink
Handle SIGTERM and signals from user space (#57)
Browse files Browse the repository at this point in the history
Co-authored-by: Jon Gjengset <[email protected]>
  • Loading branch information
rustworthy and jonhoo authored Jul 21, 2024
1 parent 5c0ff09 commit a63b658
Show file tree
Hide file tree
Showing 10 changed files with 986 additions and 649 deletions.
1,075 changes: 486 additions & 589 deletions Cargo.lock

Large diffs are not rendered by default.

12 changes: 7 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,12 @@ serde_json = "1.0"
sha2 = "0.10.0"
thiserror = "1.0.30"
tokio = { version = "1.35.1", features = [
"io-util",
"net",
"rt",
"rt-multi-thread",
"time",
"sync", # gives us an asynchronous `Mutex`
"io-util", # enables `AsyncWriteExt`, `AsyncReadExt`, and `AsyncBufReadExt` in `tokio::io` namespace
"macros", # brings in `tokio::select!` we are utilizing in `Worker::run`
"net", # enables `tokio::net` namespace with `TcpStream` we are heavily relying upon
"rt-multi-thread", # allows for `tokio::task::block_in_place()` in Client::drop
"time", # anables `tokio::time` namespace holding the `sleep` utility and `Duraction` struct
] }
tokio-native-tls = { version = "0.3.1", optional = true }
tokio-rustls = { version = "0.25.0", optional = true }
Expand All @@ -54,6 +55,7 @@ semver = { version = "1.0.23", features = ["serde"] }
rustls-pki-types = "1.0.1"
tokio = { version = "1.35.1", features = ["rt", "macros"] }
tokio-test = "0.4.3"
tokio-util = "0.7.11"
tracing-subscriber = "0.3.18"
x509-parser = "0.15.1"

Expand Down
4 changes: 3 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,12 @@ mod proto;
mod worker;

pub use crate::error::Error;

pub use crate::proto::{
Client, DataSnapshot, FaktoryState, Job, JobBuilder, JobId, Reconnect, ServerSnapshot, WorkerId,
};
pub use crate::worker::{JobRunner, Worker, WorkerBuilder};

pub use crate::worker::{JobRunner, StopDetails, StopReason, Worker, WorkerBuilder};

#[cfg(feature = "ent")]
#[cfg_attr(docsrs, doc(cfg(feature = "ent")))]
Expand Down
91 changes: 89 additions & 2 deletions src/worker/builder.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use super::{runner::Closure, CallbacksRegistry, Client, Worker};
use super::{runner::Closure, CallbacksRegistry, Client, ShutdownSignal, Worker};
use crate::{
proto::{utils, ClientOptions},
Error, Job, JobRunner, WorkerId,
};
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncWrite, BufStream};
use tokio::net::TcpStream as TokioStream;

Expand All @@ -15,6 +16,8 @@ pub struct WorkerBuilder<E> {
opts: ClientOptions,
workers_count: usize,
callbacks: CallbacksRegistry<E>,
shutdown_timeout: Option<Duration>,
shutdown_signal: Option<ShutdownSignal>,
}

impl<E> Default for WorkerBuilder<E> {
Expand All @@ -33,6 +36,8 @@ impl<E> Default for WorkerBuilder<E> {
opts: ClientOptions::default(),
workers_count: 1,
callbacks: CallbacksRegistry::default(),
shutdown_timeout: None,
shutdown_signal: None,
}
}
}
Expand Down Expand Up @@ -91,6 +96,81 @@ impl<E: 'static> WorkerBuilder<E> {
self
}

/// Set a graceful shutdown signal.
///
/// As soon as the provided future resolves, the graceful shutdown will step in
/// making the long-running operation (see [`Worker::run`]) return control to the calling code.
///
/// The graceful shutdown itself is a race between the clean up needed to be performed
/// (e.g. report on the currently processed to the Faktory server) and a shutdown deadline.
/// The latter can be customized via [`WorkerBuilder::shutdown_timeout`].
///
/// Note that once the `signal` resolves, the [`Worker`] will be marked as terminated and calling
/// [`Worker::run`] will cause a panic. You will need to build and run a new worker instead.
///
/// ```no_run
/// # tokio_test::block_on(async {
/// use faktory::{Client, Job, StopReason, Worker};
/// use std::time::Duration;
/// use tokio_util::sync::CancellationToken;
/// use tokio::time::sleep;
///
/// Client::connect(None)
/// .await
/// .unwrap()
/// .enqueue(Job::new("foobar", vec!["z"]))
/// .await
/// .unwrap();
///
/// // create a signalling future (we are using a utility from the `tokio_util` crate)
/// let token = CancellationToken::new();
/// let child_token = token.child_token();
/// let signal = async move { child_token.cancelled().await };
///
/// // get a connected worker
/// let mut w = Worker::builder()
/// .with_graceful_shutdown(signal)
/// .register_fn("job_type", move |_| async { Ok::<(), std::io::Error>(()) })
/// .connect(None)
/// .await
/// .unwrap();
///
/// // start consuming
/// let jh = tokio::spawn(async move { w.run(&["default"]).await });
///
/// // verify the consumer thread has not finished
/// sleep(Duration::from_secs(2)).await;
/// assert!(!jh.is_finished());
///
/// // send a signal to eventually return control (upon graceful shutdown)
/// token.cancel();
///
/// // learn the stop reason and the number of workers that were still running
/// let stop_details = jh.await.expect("joined ok").unwrap();
/// assert_eq!(stop_details.reason, StopReason::GracefulShutdown);
/// let _nrunning = stop_details.workers_still_running;
/// # });
/// ```
pub fn with_graceful_shutdown<F>(mut self, signal: F) -> Self
where
F: Future<Output = ()> + 'static + Send,
{
self.shutdown_signal = Some(Box::pin(signal));
self
}

/// Set a shutdown timeout.
///
/// This will be used once the worker is sent a termination signal whether it is at the application
/// (via a signalling future, see [`WorkerBuilder::with_graceful_shutdown`]) or OS level (via Ctrl-C signal,
/// see [`Worker::run_to_completion`]).
///
/// Defaults to `None`, i.e. no shoutdown abortion due to a timeout.
pub fn shutdown_timeout(mut self, dur: Duration) -> Self {
self.shutdown_timeout = Some(dur);
self
}

/// Register a handler function for the given job type (`kind`).
///
/// Whenever a job whose type matches `kind` is fetched from the Faktory, the given handler
Expand Down Expand Up @@ -161,7 +241,14 @@ impl<E: 'static> WorkerBuilder<E> {
self.opts.is_worker = true;
let buffered = BufStream::new(stream);
let client = Client::new(buffered, self.opts).await?;
Ok(Worker::new(client, self.workers_count, self.callbacks).await)
Ok(Worker::new(
client,
self.workers_count,
self.callbacks,
self.shutdown_timeout,
self.shutdown_signal,
)
.await)
}

/// Connect to a Faktory server.
Expand Down
6 changes: 5 additions & 1 deletion src/worker/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ where
/// but should _continue_ processing its current job (if any);
///
/// See more details [here](https://github.com/contribsys/faktory/blob/b4a93227a3323ab4b1365b0c37c2fac4f9588cc8/server/workers.go#L13-L49).
///
/// Note that this method is not cancellation safe. We are using an interval timer internally, that
/// would be reset should we call this method anew. Besides, the `Heartbeat` command is being issued
/// with the help of `AsyncWriteExt::write_all` which is not cancellation safe either.
pub(crate) async fn listen_for_heartbeats(
&mut self,
statuses: &[Arc<atomic::AtomicUsize>],
Expand All @@ -51,7 +55,7 @@ where
}

if last.elapsed() < HEARTBEAT_INTERVAL {
// don't sent a heartbeat yet
// don't send a heartbeat yet
continue;
}

Expand Down
Loading

0 comments on commit a63b658

Please sign in to comment.