Handle SIGTERM and signals from user space #57

Merged
merged 182 commits into from
Jul 21, 2024
9fdcd85
Reconcile with latest main
rustworthy Feb 23, 2024
cbeb2ce
Use x509-parser for tls test. Add test verifier
rustworthy Feb 24, 2024
a3b99c4
Make minimal versions pass again
rustworthy Feb 24, 2024
7ab8ab3
Comment out Openssl installation on windows in CI
rustworthy Feb 24, 2024
e6b3a4c
Revert "Comment out Openssl installation on windows in CI"
rustworthy Feb 24, 2024
ab38bfc
Restore patch version. Comment out OpenSSL install on CI
rustworthy Feb 24, 2024
ce4c8f2
Update tls test to use job runner
rustworthy Feb 24, 2024
fb2ddf1
interim
rustworthy Feb 24, 2024
2d4ef10
Improme consumer builder api
rustworthy Feb 24, 2024
0582514
Run fmt
rustworthy Feb 24, 2024
7f44798
Split client mod
rustworthy Feb 25, 2024
84864da
Producer -> Client
rustworthy Feb 25, 2024
6d11ad9
Fix 'commit_batch' on Client
rustworthy Feb 25, 2024
a875a35
Consumer -> Worker
rustworthy Feb 25, 2024
a1c613a
Add JobId struct
rustworthy Feb 26, 2024
008126d
Use JobId in lib
rustworthy Feb 27, 2024
9a56dea
Use WorkerId new-type
rustworthy Feb 27, 2024
566d2a2
Split Batch mod into logic constructs
rustworthy Feb 27, 2024
b7560ba
Use BatchId for operaions with Batch
rustworthy Feb 27, 2024
2fb7c63
re-export async trait
rustworthy Feb 28, 2024
755d452
Make 'jid', 'kind' and 'args' public on Job struct
rustworthy Feb 28, 2024
65110cf
Clean up in worker crate
rustworthy Mar 1, 2024
dcd6d97
Re-export rustls for convenience
rustworthy Mar 1, 2024
3c9ade4
re-export tokio main, clean up
rustworthy Mar 1, 2024
55890c8
Group re-exports in lib.rs
rustworthy Mar 2, 2024
00a6d1d
Fix copypasta in cmd.rs
rustworthy Mar 2, 2024
5382d5e
Rm redundant method on Worker
rustworthy Mar 20, 2024
dde77ce
Respecrt clippy on beta chan
rustworthy Mar 20, 2024
faeb436
Use try_seconds instead of seconds on Duration struct
rustworthy Mar 20, 2024
9ba6e85
Use try_seconds instead of seconds on Duration struct in tests
rustworthy Mar 20, 2024
fe65c67
Pin chrono at 0.4.32 to make minimal-versions pass
rustworthy Mar 20, 2024
ab59c97
Pin chrono at 0.4.32 to make minimal-versions pass
rustworthy Mar 20, 2024
8b979a4
Fix opts in Worker::connect method
rustworthy Mar 20, 2024
5077f2a
Use 'dep:' for feature deps
rustworthy Apr 2, 2024
5b041c9
Use tokio-native-tls for tls feature
rustworthy Apr 2, 2024
4efd5b6
Rm re-exports. Use tokio/macros for binaries feat only
rustworthy Apr 2, 2024
d657f69
Restore pin of min version for openssl
rustworthy Apr 3, 2024
53969f7
Use JoinSet in loadtest binary
rustworthy Apr 3, 2024
ea5fa8b
Rm left-over from proto/mod
rustworthy Apr 3, 2024
d3389ca
Keep 'CommitBatch' public in crate
rustworthy Apr 3, 2024
b3a272d
Keep 'GetBatchStatus' public in crate
rustworthy Apr 3, 2024
c70ff2c
Keep 'OpenBatch' public in crate
rustworthy Apr 3, 2024
a561377
Clean up in Cleint::init
rustworthy Apr 4, 2024
e2ccfcf
Split private and public methods for Client into blocks
rustworthy Apr 4, 2024
a1a701e
Derive Debug for ClientOptions
rustworthy Apr 4, 2024
9e55c24
Update self_to_cmd to not use format
rustworthy Apr 4, 2024
3aba8da
Add 'new' method for Ack cmd
rustworthy Apr 4, 2024
8270b7e
Add docs to 'generic' and 'generic_with_backtrace'
rustworthy Apr 4, 2024
b97071e
Only make commands pub(crate)
rustworthy Apr 4, 2024
6a32b4e
Add disclaimer on batch and job progress: ent only
rustworthy Apr 5, 2024
84897e9
Make jid private again
rustworthy Apr 5, 2024
fddd71f
Keep `args` private on `Job`
rustworthy Apr 5, 2024
11c9a87
Rm rudimentary extern cate syntax from test binaries
rustworthy Apr 5, 2024
bfc76fc
Rm left-over from testing
rustworthy Apr 5, 2024
5ee03bd
Use mem::take in pop_bytes_written on MockStream
rustworthy Apr 5, 2024
8da1e6c
Rm left-over print stmts from testing from tests/mock
rustworthy Apr 5, 2024
44a3b31
Restore hello_c test as hello_worker
rustworthy Apr 5, 2024
8b6e784
Clean up 'roundtrip' test
rustworthy Apr 5, 2024
ef9aea0
Handle SIGTERM in 'run_to_completion'
rustworthy Mar 21, 2024
10ae119
Add 'forever' example
rustworthy Mar 21, 2024
a2bb363
Pass receiver to 'run' method on Worker
rustworthy Mar 21, 2024
bca69f1
Introduce 'channel' and 'Message'
rustworthy Mar 21, 2024
5d1a203
Process ExitNow and ReturnNow messages. Add test
rustworthy Mar 21, 2024
9f59757
Use public method names as example names
rustworthy Mar 21, 2024
a38cde9
Give Faktory server time to schedule the failed job
rustworthy Mar 21, 2024
ddb26c3
Remove assertions on job re-scheduling
rustworthy Mar 21, 2024
d73de7c
Mark worker as terminated when message from userland reveived
rustworthy Mar 22, 2024
0fc49ed
Update test to cover 'ReturnControlNow' case
rustworthy Mar 22, 2024
e5efcfb
Add 'connect_worker' with common logic
rustworthy Mar 22, 2024
c6aeced
Re-export 'tokio' lib
rustworthy Mar 22, 2024
90b8c11
Add 'run' to examples
rustworthy Mar 22, 2024
8ad3e60
Derive standards trait for Message. Mark as non-exhaustive
rustworthy Mar 23, 2024
fc54e9f
Clean up
rustworthy Apr 6, 2024
b633b19
Rm 'extern' declaration from tests/real
rustworthy Apr 6, 2024
dd3a989
Make min-versions pass again
rustworthy Apr 7, 2024
6bc3fd6
Make min-versions pass again
rustworthy Apr 7, 2024
ec72dea
Merge branch 'development' into feat/sigterm-handling
rustworthy Apr 9, 2024
9077b01
Ignore mutants output
rustworthy Apr 9, 2024
6f195fb
Use oneshot channel
rustworthy Apr 14, 2024
6b9f3c0
Add message on cancellation safety
rustworthy Apr 15, 2024
5a17498
Support both openssl and rustls
rustworthy Apr 21, 2024
13eaace
Support both openssl and rustls. Clean up
rustworthy Apr 21, 2024
8d54b0c
Support both openssl and rustls. Min versions fix
rustworthy Apr 21, 2024
f1e5966
Support both openssl and rustls. Add TlsStream error
rustworthy Apr 21, 2024
58cb247
Support both openssl and rustls. Rustls clean up
rustworthy Apr 22, 2024
6628efb
Rm `async` from loadtest binary name
rustworthy Apr 22, 2024
6af3dcc
Rm leading underscore in `ops_count` var in loadtest
rustworthy Apr 22, 2024
f8d3f32
Rm redundant 'asynchronously' from the docs in proto::client mod
rustworthy Apr 22, 2024
e9cd581
Update private docs for ClientOptions
rustworthy Apr 22, 2024
2df103d
Craete BatchId, WorkerId, and JobId with `::new`
rustworthy Apr 22, 2024
a491964
Add `AsRef<str>` impl for BatchId, WorkerId, and JobId
rustworthy Apr 22, 2024
bb60f6b
Run formatter. Fix doc code
rustworthy Apr 22, 2024
14bcf64
Take 'Fail' in WorkerStatesRegistry::register_failure
rustworthy Apr 23, 2024
609950f
Impl IntoIterator for WorkerStatesRegistry
rustworthy Apr 23, 2024
db9b636
Add docs for Closure newtype
rustworthy Apr 23, 2024
757f92e
Restore JobRunner impl for &' mut F
rustworthy Apr 23, 2024
6469573
Update worker in community::roundtrip test to chain `register`
rustworthy Apr 23, 2024
803400d
Do not requiere *Ext in src::worker::mod
rustworthy Apr 23, 2024
4b29f9c
Restore docs for WorkerBuilder::default
rustworthy Apr 23, 2024
d869858
Rm redunrant helper methods on Worker
rustworthy Apr 23, 2024
9329ea0
Restore docs on WorkerBuilder::connect
rustworthy Apr 23, 2024
3b05585
Place comments in worker::mod to original location
rustworthy Apr 23, 2024
14e06ad
Rm redundate .take on statuses iterator
rustworthy Apr 23, 2024
df8a4b1
Run cargo fmt
rustworthy Apr 23, 2024
2bd215d
Use JoinSet in Worker::run. Update consumer::terminate test.
rustworthy Apr 24, 2024
2d4da65
Update signature in batch ent methods to accept AsRef<BatchId>
rustworthy Apr 24, 2024
0703983
Add serde transparent for newtype ids
rustworthy Apr 24, 2024
55d9bea
Ask for String rather than &str in TlsStream::new
rustworthy Apr 24, 2024
8d91c26
Rm redundant TlsStrean::create_new
rustworthy Apr 24, 2024
a44c63a
Pin serde at 1.0.186 to make min versions pass
rustworthy Apr 24, 2024
a088548
Add WorkerBuilder::add_to_labels method. Demonstrate in test
rustworthy Apr 26, 2024
ce4a65f
Rm empty line in WorkerBuilder::register
rustworthy Apr 26, 2024
da28e8d
Rename WorkerBuilder::register to WorkerBuilder::register_fn
rustworthy Apr 27, 2024
050a28d
Add to WorkerBuilder::register and ::register_fn docs
rustworthy Apr 27, 2024
6644a2f
Re-use WorkerBuilder::connect_with in ::connect
rustworthy Apr 27, 2024
6e5121a
Do not use *Ext as bounds
rustworthy Apr 27, 2024
0490637
Clean up worker::health module
rustworthy Apr 27, 2024
dfda0db
Rm excessive bounds
rustworthy Apr 27, 2024
56ccfb6
Store STATUS_TERMINATING in Fakotry signalled so
rustworthy Apr 27, 2024
44b8cfb
Store STATUS_TERMINATING in Fakotry signalled so
rustworthy Apr 27, 2024
f07d932
Add PartialEq<str> to proto::single::id module newtypes
rustworthy Apr 28, 2024
cdb8caf
Do not require static string in tls::rustls
rustworthy Apr 28, 2024
fd231d1
Use permalink in Worker::listen_for_heartbeats docs
rustworthy Apr 28, 2024
15d1711
Make WorkerBuilder return Self
rustworthy Apr 28, 2024
7724d24
Run cargo fmt on sources
rustworthy Apr 28, 2024
46e5457
Make min version pass
rustworthy Apr 28, 2024
57c958c
Make min version pass. Re-gen lockfile
rustworthy Apr 28, 2024
70d40ad
Make min version pass. Use rustls_pki_types dev dep
rustworthy Apr 28, 2024
758b9a3
Merge branch 'development' into feat/sigterm-handling
rustworthy May 1, 2024
a679c21
Add latest from main branch
rustworthy May 14, 2024
2f61dca
Resotore Timedelta::seconds usage
rustworthy May 14, 2024
1f5d9b1
Use CancellationToken to signal to Worker::run command
rustworthy May 14, 2024
c7e8d7a
Only leave required tokio features
rustworthy May 14, 2024
299e386
Fix typo in comment in toml file
rustworthy May 14, 2024
4a6af8b
Enable feature 'macros' on tokio
rustworthy May 14, 2024
7896865
Add support for blocking handlers
rustworthy Apr 30, 2024
7865459
Revert "Add support for blocking handlers"
rustworthy May 20, 2024
7c523a6
Rm examples
rustworthy May 20, 2024
be48003
Update README.md
rustworthy May 20, 2024
4ce9d50
Rm tracing and examples that are now dedicated PR
rustworthy May 20, 2024
73b2580
Rm tracing and examples that are now dedicated PR
rustworthy May 20, 2024
96e3f2c
Add WorkerBuilder::with_graceful_shutdown
rustworthy May 21, 2024
e59e2ec
Update docs
rustworthy May 21, 2024
8340767
Update README.md
rustworthy May 21, 2024
6093201
Only hadnle cancellation future for non-forever runs
rustworthy May 24, 2024
6e9a150
Add note on cancel (un)safety to Worker::lesten_for_heartbeats
rustworthy May 26, 2024
a50300c
Update src/worker/builder.rs
rustworthy May 29, 2024
397b4c4
Update src/worker/mod.rs
rustworthy May 29, 2024
42e3bbd
Use Duration for graceful shutdown period
rustworthy May 29, 2024
8c2d054
Use no shutdown timeout by default
rustworthy May 29, 2024
2c44785
Rollback docs changes with Worker::builer
rustworthy May 30, 2024
b296315
Intorduce RunCeaseReason enum for Worker::run
rustworthy May 30, 2024
9a8c8e1
Enable gracefull shutdown for Worker::run_to_completion
rustworthy Jun 8, 2024
4b373e0
Add docs on Worker::listern_for_heartbeats usage in Worker::run
rustworthy Jun 8, 2024
55b7453
Merge branch 'main' into feat/sigterm-handling
rustworthy Jun 22, 2024
2586854
Merge branch 'main' into feat/sigterm-handling
rustworthy Jul 15, 2024
bdf7c0d
Dedup use statements in tests::real::community
rustworthy Jul 15, 2024
eee3f6d
Merge branch 'feat/sigterm-handling-latest' into feat/sigterm-handling
rustworthy Jul 15, 2024
f0f1dd7
Update src/worker/mod.rs
rustworthy Jul 15, 2024
b6ca794
Improve docs for WorkerBuilder::with_graceful_shutdown
rustworthy Jul 16, 2024
42304bf
Update src/worker/mod.rs
rustworthy Jul 16, 2024
6240de0
Update src/worker/mod.rs
rustworthy Jul 16, 2024
f41e063
Update src/worker/mod.rs
rustworthy Jul 16, 2024
68628ac
Update sources to use StopReason struct
rustworthy Jul 16, 2024
88f53ee
Introduce StopDetail holding StopReason and nrunning
rustworthy Jul 18, 2024
6edfc08
Add commnent on marking Worker as terminated
rustworthy Jul 18, 2024
b29f2ef
Do not take the signal in Worker::run
rustworthy Jul 18, 2024
d1ad275
Fix typo
rustworthy Jul 18, 2024
bf4d143
Add heart broken test
rustworthy Jul 18, 2024
85adafd
Rm noise in diff
rustworthy Jul 18, 2024
0db6f0c
Add test for re-used Worker
rustworthy Jul 18, 2024
e6f07ce
StopDetails::stop_reason -> StopDetails::reason
rustworthy Jul 19, 2024
e0cb357
StopDetails::stop_reason -> StopDetails::reason
rustworthy Jul 19, 2024
8944098
Update src/worker/stop.rs
rustworthy Jul 21, 2024
5839952
Update src/worker/stop.rs
rustworthy Jul 21, 2024
4c125b6
Update src/worker/stop.rs
rustworthy Jul 21, 2024
529812c
Adjust code to use StopDetails::workers_still_running
rustworthy Jul 21, 2024
27b27a0
Merge branch 'main' into feat/sigterm-handling
rustworthy Jul 21, 2024
4c4dd58
Mark StopDetails as non_exhaustive
rustworthy Jul 21, 2024
4415e60
Disallow Worker::run_one on a terminated worker
rustworthy Jul 21, 2024
8de6397
Avoid Arc<Mutex> for shutdown_signal
jonhoo Jul 21, 2024
6eb5b17
Also handle panics
jonhoo Jul 21, 2024
1,075 changes: 486 additions & 589 deletions Cargo.lock

Large diffs are not rendered by default.

12 changes: 7 additions & 5 deletions Cargo.toml
@@ -38,11 +38,12 @@ serde_json = "1.0"
sha2 = "0.10.0"
thiserror = "1.0.30"
tokio = { version = "1.35.1", features = [
"io-util",
"net",
"rt",
"rt-multi-thread",
"time",
"sync", # gives us an asynchronous `Mutex`
"io-util", # enables `AsyncWriteExt`, `AsyncReadExt`, and `AsyncBufReadExt` in `tokio::io` namespace
"macros", # brings in `tokio::select!` we are utilizing in `Worker::run`
"net", # enables `tokio::net` namespace with `TcpStream` we are heavily relying upon
"rt-multi-thread", # allows for `tokio::task::block_in_place()` in Client::drop
"time", # enables `tokio::time` namespace holding the `sleep` utility and `Duration` struct
] }
tokio-native-tls = { version = "0.3.1", optional = true }
tokio-rustls = { version = "0.25.0", optional = true }
@@ -54,6 +55,7 @@ semver = { version = "1.0.23", features = ["serde"] }
rustls-pki-types = "1.0.1"
tokio = { version = "1.35.1", features = ["rt", "macros"] }
tokio-test = "0.4.3"
tokio-util = "0.7.11"
tracing-subscriber = "0.3.18"
x509-parser = "0.15.1"

4 changes: 3 additions & 1 deletion src/lib.rs
@@ -75,10 +75,12 @@ mod proto;
mod worker;

pub use crate::error::Error;

pub use crate::proto::{
Client, DataSnapshot, FaktoryState, Job, JobBuilder, JobId, Reconnect, ServerSnapshot, WorkerId,
};
pub use crate::worker::{JobRunner, Worker, WorkerBuilder};

pub use crate::worker::{JobRunner, StopDetails, StopReason, Worker, WorkerBuilder};

#[cfg(feature = "ent")]
#[cfg_attr(docsrs, doc(cfg(feature = "ent")))]
91 changes: 89 additions & 2 deletions src/worker/builder.rs
@@ -1,10 +1,11 @@
use super::{runner::Closure, CallbacksRegistry, Client, Worker};
use super::{runner::Closure, CallbacksRegistry, Client, ShutdownSignal, Worker};
use crate::{
proto::{utils, ClientOptions},
Error, Job, JobRunner, WorkerId,
};
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncWrite, BufStream};
use tokio::net::TcpStream as TokioStream;

@@ -15,6 +16,8 @@
opts: ClientOptions,
workers_count: usize,
callbacks: CallbacksRegistry<E>,
shutdown_timeout: Option<Duration>,
shutdown_signal: Option<ShutdownSignal>,
}

impl<E> Default for WorkerBuilder<E> {
@@ -33,6 +36,8 @@
opts: ClientOptions::default(),
workers_count: 1,
callbacks: CallbacksRegistry::default(),
shutdown_timeout: None,
shutdown_signal: None,
}
}
}
@@ -91,6 +96,81 @@
self
}

/// Set a graceful shutdown signal.
///
/// As soon as the provided future resolves, the graceful shutdown will step in
/// making the long-running operation (see [`Worker::run`]) return control to the calling code.
///
/// The graceful shutdown itself is a race between the clean up needed to be performed
/// (e.g. report on the currently processed jobs to the Faktory server) and a shutdown deadline.
/// The latter can be customized via [`WorkerBuilder::shutdown_timeout`].
///
/// Note that once the `signal` resolves, the [`Worker`] will be marked as terminated and calling
/// [`Worker::run`] will cause a panic. You will need to build and run a new worker instead.
///
/// ```no_run
/// # tokio_test::block_on(async {
/// use faktory::{Client, Job, StopReason, Worker};
/// use std::time::Duration;
/// use tokio_util::sync::CancellationToken;
/// use tokio::time::sleep;
///
/// Client::connect(None)
/// .await
/// .unwrap()
/// .enqueue(Job::new("foobar", vec!["z"]))
/// .await
/// .unwrap();
///
/// // create a signalling future (we are using a utility from the `tokio_util` crate)
/// let token = CancellationToken::new();
/// let child_token = token.child_token();
/// let signal = async move { child_token.cancelled().await };
///
/// // get a connected worker
/// let mut w = Worker::builder()
/// .with_graceful_shutdown(signal)
/// .register_fn("job_type", move |_| async { Ok::<(), std::io::Error>(()) })
/// .connect(None)
/// .await
/// .unwrap();
///
/// // start consuming
/// let jh = tokio::spawn(async move { w.run(&["default"]).await });
///
/// // verify the consumer thread has not finished
/// sleep(Duration::from_secs(2)).await;
/// assert!(!jh.is_finished());
///
/// // send a signal to eventually return control (upon graceful shutdown)
/// token.cancel();
///
/// // learn the stop reason and the number of workers that were still running
/// let stop_details = jh.await.expect("joined ok").unwrap();
/// assert_eq!(stop_details.reason, StopReason::GracefulShutdown);
/// let _nrunning = stop_details.workers_still_running;
/// # });
/// ```
pub fn with_graceful_shutdown<F>(mut self, signal: F) -> Self
where
F: Future<Output = ()> + 'static + Send,
{
self.shutdown_signal = Some(Box::pin(signal));
self
}
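
The builder method above accepts any `Future<Output = ()> + Send + 'static` and stores it behind `Box::pin`. A minimal std-only sketch of that type erasure follows. `BoxedSignal`, `SketchBuilder`, `NoopWake` and `signal_resolved` are hypothetical names for illustration, and the alias is an assumed shape, since the definition of `ShutdownSignal` is not part of the hunks shown here.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Assumed shape of a type-erased signal (the real alias is not shown in this diff).
type BoxedSignal = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

/// Hypothetical stand-in for the builder; it only holds the optional signal.
#[derive(Default)]
struct SketchBuilder {
    shutdown_signal: Option<BoxedSignal>,
}

impl SketchBuilder {
    /// Accepts any suitable future and erases its concrete type.
    fn with_graceful_shutdown<F>(mut self, signal: F) -> Self
    where
        F: Future<Output = ()> + Send + 'static,
    {
        self.shutdown_signal = Some(Box::pin(signal));
        self
    }
}

/// A waker that does nothing; enough for polling a future once by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls the stored signal once and reports whether it resolved.
/// A builder without a signal never reports a shutdown.
/// Call this at most once per resolved signal: polling a completed
/// future again is not allowed in general.
fn signal_resolved(builder: &mut SketchBuilder) -> bool {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    match builder.shutdown_signal.as_mut() {
        Some(signal) => matches!(signal.as_mut().poll(&mut cx), Poll::Ready(())),
        None => false,
    }
}
```

Because the field is an `Option`, a worker built without a signal simply has nothing to poll, which matches the `shutdown_signal: None` default in the builder.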

/// Set a shutdown timeout.
///
/// This will be used once the worker is sent a termination signal whether it is at the application
/// (via a signalling future, see [`WorkerBuilder::with_graceful_shutdown`]) or OS level (via Ctrl-C signal,
/// see [`Worker::run_to_completion`]).
///
/// Defaults to `None`, i.e. the shutdown is never aborted due to a timeout.
pub fn shutdown_timeout(mut self, dur: Duration) -> Self {
self.shutdown_timeout = Some(dur);
self
}
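
The docs above describe graceful shutdown as a race between clean-up and an optional deadline. The sketch below shows that race with threads and a channel from the standard library only; it is an analogue under stated assumptions, not the crate's async implementation. `ShutdownOutcome` and `shutdown_with_deadline` are hypothetical names.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Result of racing the clean-up against the deadline.
#[derive(Debug, PartialEq, Eq)]
enum ShutdownOutcome {
    /// Clean-up finished first (or no deadline was set).
    CleanedUp,
    /// The deadline elapsed before clean-up finished.
    TimedOut,
    /// The clean-up thread ended without reporting completion (e.g. it panicked).
    CleanupDied,
}

/// Runs `cleanup` on a helper thread and waits for it for at most `timeout`.
/// `None` means "wait as long as it takes", like the `None` default above.
fn shutdown_with_deadline<F>(cleanup: F, timeout: Option<Duration>) -> ShutdownOutcome
where
    F: FnOnce() + Send + 'static,
{
    let (done_tx, done_rx) = mpsc::channel::<()>();
    thread::spawn(move || {
        cleanup();
        // The receiver may be gone already if the deadline won; ignore that.
        let _ = done_tx.send(());
    });

    match timeout {
        None => match done_rx.recv() {
            Ok(()) => ShutdownOutcome::CleanedUp,
            Err(_) => ShutdownOutcome::CleanupDied,
        },
        Some(deadline) => match done_rx.recv_timeout(deadline) {
            Ok(()) => ShutdownOutcome::CleanedUp,
            Err(RecvTimeoutError::Timeout) => ShutdownOutcome::TimedOut,
            Err(RecvTimeoutError::Disconnected) => ShutdownOutcome::CleanupDied,
        },
    }
}
```

For instance, `shutdown_with_deadline(|| {}, Some(Duration::from_secs(5)))` returns `ShutdownOutcome::CleanedUp`, while a clean-up that sleeps longer than its deadline yields `ShutdownOutcome::TimedOut`.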

/// Register a handler function for the given job type (`kind`).
///
/// Whenever a job whose type matches `kind` is fetched from the Faktory, the given handler
@@ -161,7 +241,14 @@
self.opts.is_worker = true;
let buffered = BufStream::new(stream);
let client = Client::new(buffered, self.opts).await?;
Ok(Worker::new(client, self.workers_count, self.callbacks).await)
Ok(Worker::new(
client,
self.workers_count,
self.callbacks,
self.shutdown_timeout,
self.shutdown_signal,
)
.await)

}

/// Connect to a Faktory server.
6 changes: 5 additions & 1 deletion src/worker/health.rs
@@ -25,6 +25,10 @@ where
/// but should _continue_ processing its current job (if any);
///
/// See more details [here](https://github.com/contribsys/faktory/blob/b4a93227a3323ab4b1365b0c37c2fac4f9588cc8/server/workers.go#L13-L49).
///
/// Note that this method is not cancellation safe. We are using an interval timer internally, that
/// would be reset should we call this method anew. Besides, the `Heartbeat` command is being issued
/// with the help of `AsyncWriteExt::write_all` which is not cancellation safe either.
pub(crate) async fn listen_for_heartbeats(
&mut self,
statuses: &[Arc<atomic::AtomicUsize>],
@@ -51,7 +55,7 @@ where
}

if last.elapsed() < HEARTBEAT_INTERVAL {
// don't sent a heartbeat yet
// don't send a heartbeat yet
continue;
}
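
The cancellation-safety note above says the internal timer is reset when the method is called anew. A small std-only sketch of why that matters follows. `HeartbeatClock` is a hypothetical name, the 5-second interval used in the usage note is illustrative only, and plain `Instant` arithmetic stands in for the async interval timer.

```rust
use std::time::{Duration, Instant};

/// Tracks when the last heartbeat went out and decides whether one is due.
struct HeartbeatClock {
    interval: Duration,
    last: Instant,
}

impl HeartbeatClock {
    /// Starts counting from `now`; restarting the clock discards prior progress.
    fn start(now: Instant, interval: Duration) -> Self {
        Self { interval, last: now }
    }

    /// Returns `true` (and records `now`) once a full interval has elapsed
    /// since the last heartbeat; otherwise returns `false`.
    fn due(&mut self, now: Instant) -> bool {
        if now.saturating_duration_since(self.last) < self.interval {
            // don't send a heartbeat yet
            return false;
        }
        self.last = now;
        true
    }
}
```

With a 5-second interval, a clock started at `t0` is due at `t0 + 6s`. A clock re-created at `t0 + 3s` is not due at that same instant, because the three seconds already waited are lost. This is the effect the note warns about when the future is dropped and the method is called again.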
