Skip to content

Commit

Permalink
feat: support graceful shutdown by unix signals (#298)
Browse files Browse the repository at this point in the history
* stamp: conn drain

* stamp: make `graceful-exit-timeout` flag as default

* stamp: consider graceful shutdown for http connection

* stamp: clippy and adjust warning message

* stamp: update integration test macro

* stamp: expose some debug metrics

* stamp: polishing integration tests and add a test for graceful shutdown

* stamp: realign expected test output

* stamp: polishing

* stamp: final adjustment

* stamp: add `test_indefinitely.sh` into `scripts`

* stamp: introduce `experimental-graceful-exit-keepalive-deadline-ratio` flag

* stamp: add a comment

* stamp: update `Cargo.lock`

* stamp: adjust unix signal handling routine
  • Loading branch information
nyannyacha authored Apr 22, 2024
1 parent 4cb0bb7 commit 28faf77
Show file tree
Hide file tree
Showing 8 changed files with 482 additions and 98 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

128 changes: 98 additions & 30 deletions crates/base/src/macros/test_macros.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
#[macro_export]
macro_rules! integration_test {
($main_file:expr, $port:expr, $url:expr, $policy:expr, $import_map:expr, $req_builder:expr, $tls:expr, ($($function:tt)+) $(, $termination_token:expr)?) => {
($main_file:expr, $port:expr, $url:expr, $policy:expr, $import_map:expr, $req_builder:expr, $tls:expr, ($($function:tt)+) $(, $($token:tt)+)?) => {
use futures_util::FutureExt;
use $crate::macros::test_macros::__private;

let (tx, mut rx) = tokio::sync::mpsc::channel::<base::server::ServerHealth>(1);

let req_builder: Option<reqwest::RequestBuilder> = $req_builder;
let tls: Option<base::server::Tls> = $tls;
let schema = if tls.is_some() { "https" } else { "http" };
let signal = tokio::spawn(async move {
while let Some(base::server::ServerHealth::Listening(event_rx)) = rx.recv().await {
integration_test!(@req event_rx, $port, $url, req_builder, ($($function)+));
while let Some(base::server::ServerHealth::Listening(event_rx, metric_src)) = rx.recv().await {
integration_test!(@req event_rx, metric_src, schema, $port, $url, req_builder, ($($function)+));
}
None
});

let termination_token = integration_test!(@term $(, $termination_token)?);
let token = integration_test!(@term $(, $($token)+)?);
let mut listen_fut = base::commands::start_server(
"0.0.0.0",
$port,
Expand All @@ -30,7 +32,7 @@ macro_rules! integration_test {
main: None,
events: None,
},
termination_token.clone(),
token.clone(),
vec![],
None
)
Expand All @@ -55,55 +57,121 @@ macro_rules! integration_test {

// if we have a termination token, we should advence a listen future to
// the end for a graceful exit. (3)
if let Some(token) = termination_token {
if let Some(token) = token {
let join_fut = tokio::spawn(async move {
let _ = listen_fut.await;
});

let wait_fut = async move {
token.cancel_and_wait().await;
join_fut.await.unwrap();
};

if tokio::time::timeout(
// XXX(Nyannyacha): Should we apply variable timeout?
core::time::Duration::from_secs(10),
wait_fut
)
.await
.is_err()
{
panic!("failed to terminate server within 10 seconds");
}
integration_test!(@term_cleanup $($($token)+)?, token, join_fut);
}
};

(@term , $termination_token:expr) => {
Some($termination_token)
(@term , #[manual] $token:expr) => {
Some($token)
};

(@term , $token:expr) => {
Some($token)
};


(@term) => {
None
};

(@req $event_rx:ident, $port:expr, $url:expr, $req_builder:expr, ($req:expr, $_:expr)) => {
return $req($port, $url, $req_builder, $event_rx).await;
(@term_cleanup $(#[manual] $_:expr)?, $__:ident, $___:ident) => {};
(@term_cleanup $_:expr, $token:ident, $join_fut:ident) => {
let wait_fut = async move {
$token.cancel_and_wait().await;
$join_fut.await.unwrap();
};

if tokio::time::timeout(
// XXX(Nyannyacha): Should we apply variable timeout?
core::time::Duration::from_secs(10),
wait_fut
)
.await
.is_err()
{
panic!("failed to terminate server within 10 seconds");
}
};

(@req $event_rx:ident, $metric_src:ident, $schema:expr, $port:expr, $url:expr, $req_builder:expr, ($req:expr, $_:expr)) => {
if let Some(resp) = __private::infer_req_closure_signature(
$req,
(
$port,
$url,
$req_builder,
$event_rx,
$metric_src,
)
)
.await
{
return Some(resp);
} else {
let resp = reqwest::get(format!("{}://localhost:{}/{}", $schema, $port, $url)).await;
return Some(resp);
}
};

(@req $_:ident, $port:expr, $url:expr, $req_builder:expr, $__:expr) => {
(@req $_:ident, $__:ident, $schema:expr, $port:expr, $url:expr, $req_builder:expr, $___:expr) => {
if let Some(req_factory) = $req_builder {
return Some(req_factory.send().await);
} else {
let req = reqwest::get(format!("http://localhost:{}/{}", $port, $url)).await;
return Some(req);
let resp = reqwest::get(format!("{}://localhost:{}/{}", $schema, $port, $url)).await;
return Some(resp);
}
};

(@resp $var:ident, ($_:expr, $resp:expr)) => {
$resp($var)
__private::infer_resp_closure_signature($resp, $var)
};

(@resp $var:ident, $resp:expr) => {
$resp($var)
__private::infer_resp_closure_signature($resp, $var)
};
}

#[doc(hidden)]
pub mod __private {
use std::future::Future;

use reqwest::{Error, RequestBuilder, Response};
use sb_core::SharedMetricSource;
use tokio::sync::mpsc;

use crate::server::ServerEvent;

/// NOTE(Nyannyacha): This was defined to enable pattern matching in closure
/// argument positions.
type ReqTuple = (
u16,
&'static str,
Option<RequestBuilder>,
mpsc::UnboundedReceiver<ServerEvent>,
SharedMetricSource,
);

pub async fn infer_req_closure_signature<F, R>(
closure: F,
args: ReqTuple,
) -> Option<Result<Response, Error>>
where
F: FnOnce(ReqTuple) -> R,
R: Future<Output = Option<Result<Response, Error>>>,
{
closure(args).await
}

pub async fn infer_resp_closure_signature<F, R>(closure: F, arg0: Result<Response, Error>)
where
F: FnOnce(Result<Response, Error>) -> R,
R: Future<Output = ()>,
{
closure(arg0).await;
}
}
Loading

0 comments on commit 28faf77

Please sign in to comment.