Skip to content

Commit

Permalink
fix: graceful shutdown should wait until the event worker has finishe…
Browse files Browse the repository at this point in the history
…d all its work (#426)

* fix: graceful shutdown should wait until the event worker has finished all its work

* chore: update event manager example
  • Loading branch information
nyannyacha authored Oct 13, 2024
1 parent 5f87d9a commit fbddb16
Show file tree
Hide file tree
Showing 14 changed files with 114 additions and 78 deletions.
28 changes: 14 additions & 14 deletions crates/base/src/deno_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use std::ffi::c_void;
use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
use std::mem::ManuallyDrop;
use std::str::FromStr;
use std::sync::{Arc, RwLock};
use std::task::Poll;
Expand Down Expand Up @@ -197,8 +198,8 @@ impl GetRuntimeContext for () {
}

pub struct DenoRuntime<RuntimeContext = ()> {
pub js_runtime: ManuallyDrop<JsRuntime>,
pub drop_token: CancellationToken,
pub js_runtime: JsRuntime,
pub env_vars: HashMap<String, String>, // TODO: does this need to be pub?
pub conf: WorkerRuntimeOpts,

Expand All @@ -218,14 +219,18 @@ pub struct DenoRuntime<RuntimeContext = ()> {

impl<RuntimeContext> Drop for DenoRuntime<RuntimeContext> {
fn drop(&mut self) {
self.drop_token.cancel();

if self.conf.is_user_worker() {
self.js_runtime.v8_isolate().remove_gc_prologue_callback(
mem_check_gc_prologue_callback_fn,
Arc::as_ptr(&self.mem_check) as *mut _,
);
}

unsafe {
ManuallyDrop::drop(&mut self.js_runtime);
}

self.drop_token.cancel();
}
}

Expand All @@ -250,12 +255,11 @@ where
maybe_inspector: Option<Inspector>,
) -> Result<Self, Error> {
let WorkerContextInitOpts {
mut conf,
service_path,
no_module_cache,
import_map_path,
env_vars,
events_rx,
conf,
maybe_eszip,
maybe_entrypoint,
maybe_decorator,
Expand Down Expand Up @@ -536,7 +540,7 @@ where
..Default::default()
};

let mut js_runtime = JsRuntime::new(runtime_options);
let mut js_runtime = ManuallyDrop::new(JsRuntime::new(runtime_options));
let version: Option<&str> = option_env!("GIT_V_TAG");

{
Expand Down Expand Up @@ -610,10 +614,10 @@ where

let mut env_vars = env_vars.clone();

if conf.is_events_worker() {
// if worker is an events worker, assert events_rx is to be available
op_state
.put::<mpsc::UnboundedReceiver<WorkerEventWithMetadata>>(events_rx.unwrap());
if let Some(opts) = conf.as_events_worker_mut() {
op_state.put::<mpsc::UnboundedReceiver<WorkerEventWithMetadata>>(
opts.events_msg_rx.take().unwrap(),
);
}

if conf.is_main_worker() || conf.is_user_worker() {
Expand Down Expand Up @@ -1215,7 +1219,6 @@ mod test {
static_patterns,
maybe_jsx_import_source_config: jsx_import_source_config,

events_rx: None,
timing: None,

import_map_path: None,
Expand Down Expand Up @@ -1290,7 +1293,6 @@ mod test {
no_module_cache: false,
import_map_path: None,
env_vars: Default::default(),
events_rx: None,
timing: None,
maybe_eszip: None,
maybe_entrypoint: None,
Expand Down Expand Up @@ -1337,7 +1339,6 @@ mod test {
no_module_cache: false,
import_map_path: None,
env_vars: Default::default(),
events_rx: None,
timing: None,
maybe_eszip: Some(EszipPayloadKind::VecKind(eszip_code)),
maybe_entrypoint: None,
Expand Down Expand Up @@ -1401,7 +1402,6 @@ mod test {
no_module_cache: false,
import_map_path: None,
env_vars: Default::default(),
events_rx: None,
timing: None,
maybe_eszip: Some(EszipPayloadKind::VecKind(eszip_code)),
maybe_entrypoint: None,
Expand Down
85 changes: 52 additions & 33 deletions crates/base/src/rt_worker/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use sb_workers::context::{UserWorkerMsgs, WorkerContextInitOpts, WorkerExit, Wor
use std::any::Any;
use std::future::{pending, Future};
use std::pin::Pin;
use std::time::Duration;
use tokio::io;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot::{self, Receiver, Sender};
Expand Down Expand Up @@ -111,7 +112,6 @@ impl Worker {
let method_cloner = self.clone();
let timing = opts.timing.take();
let worker_kind = opts.conf.to_worker_kind();
let maybe_main_worker_opts = opts.conf.as_main_worker().cloned();

let cancel = self.cancel.clone();
let rt = if worker_kind.is_user_worker() {
Expand All @@ -130,10 +130,8 @@ impl Worker {
let permit = DenoRuntime::acquire().await;
let result = match DenoRuntime::new(opts, inspector).await {
Ok(new_runtime) => {
let mut runtime = scopeguard::guard(new_runtime, |mut runtime| {
unsafe {
runtime.js_runtime.v8_isolate().enter();
}
let mut runtime = scopeguard::guard(new_runtime, |mut runtime| unsafe {
runtime.js_runtime.v8_isolate().enter();
});

unsafe {
Expand All @@ -143,12 +141,10 @@ impl Worker {
drop(permit);

let metric_src = {
let js_runtime = &mut runtime.js_runtime;
let metric_src = WorkerMetricSource::from_js_runtime(js_runtime);

if worker_kind.is_main_worker() {
let opts = maybe_main_worker_opts.unwrap();
let state = js_runtime.op_state();
let metric_src =
WorkerMetricSource::from_js_runtime(&mut runtime.js_runtime);
if let Some(opts) = runtime.conf.as_main_worker().cloned() {
let state = runtime.js_runtime.op_state();
let mut state_mut = state.borrow_mut();
let metric_src = RuntimeMetricSource::new(
metric_src.clone(),
Expand All @@ -173,7 +169,6 @@ impl Worker {
let _cpu_timer;
let mut supervise_cancel_token = None;

// TODO: Allow customization of supervisor
let termination_fut = if worker_kind.is_user_worker() {
// cputimer is returned from supervisor and assigned here to keep it in scope.
let Ok((maybe_timer, cancel_token)) = create_supervisor(
Expand Down Expand Up @@ -207,27 +202,50 @@ impl Worker {
)
};

let maybe_event_worker_ctx =
runtime.conf.as_events_worker().map(|it| {
Duration::from_secs(
it.event_worker_exit_deadline_sec.unwrap_or(10),
)
});

base_rt::SUPERVISOR_RT
.spawn(async move {
token.inbound.cancelled().await;
termination_request_token.cancel();

let data_ptr_mut =
Box::into_raw(Box::new(supervisor::IsolateInterruptData {
should_terminate: true,
isolate_memory_usage_tx: None,
}));

if !thread_safe_handle.request_interrupt(
supervisor::handle_interrupt,
data_ptr_mut as *mut std::ffi::c_void,
) {
drop(unsafe { Box::from_raw(data_ptr_mut) });

let mut already_terminated = false;
if let Some(dur) = maybe_event_worker_ctx {
already_terminated = tokio::time::timeout(dur, async {
while !is_terminated.is_raised() {
waker.wake();
tokio::task::yield_now().await;
}
})
.await
.is_ok();
}

while !is_terminated.is_raised() {
waker.wake();
tokio::task::yield_now().await;
if !already_terminated {
termination_request_token.cancel();

let data_ptr_mut = Box::into_raw(Box::new(
supervisor::IsolateInterruptData {
should_terminate: true,
isolate_memory_usage_tx: None,
},
));

if !thread_safe_handle.request_interrupt(
supervisor::handle_interrupt,
data_ptr_mut as *mut std::ffi::c_void,
) {
drop(unsafe { Box::from_raw(data_ptr_mut) });
}

while !is_terminated.is_raised() {
waker.wake();
tokio::task::yield_now().await;
}
}

let _ = termination_event_tx.send(WorkerEvents::Shutdown(
Expand All @@ -251,7 +269,9 @@ impl Worker {
let _guard = scopeguard::guard((), |_| {
worker_key.and_then(|worker_key_unwrapped| {
pool_msg_tx.map(|tx| {
if let Err(err) = tx.send(UserWorkerMsgs::Shutdown(worker_key_unwrapped)) {
if let Err(err) =
tx.send(UserWorkerMsgs::Shutdown(worker_key_unwrapped))
{
error!(
"failed to send the shutdown signal to user worker pool: {:?}",
err
Expand All @@ -269,7 +289,6 @@ impl Worker {
}
});


let result = method_cloner
.handle_creation(
&mut runtime,
Expand All @@ -284,10 +303,10 @@ impl Worker {
Ok(WorkerEvents::UncaughtException(ev)) => Some(ev.clone()),
Err(err) => Some(UncaughtExceptionEvent {
cpu_time_used: 0,
exception: err.to_string()
exception: err.to_string(),
}),

_ => None
_ => None,
};

if let Some(ev) = maybe_uncaught_exception_event {
Expand Down Expand Up @@ -342,7 +361,7 @@ impl Worker {
};

send_event_if_event_worker_available(
events_msg_tx.clone(),
events_msg_tx.as_ref(),
event,
event_metadata.clone(),
);
Expand Down
20 changes: 12 additions & 8 deletions crates/base/src/rt_worker/worker_ctx.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::deno_runtime::DenoRuntime;
use crate::inspector_server::Inspector;
use crate::server::ServerFlags;
use crate::timeout::{self, CancelOnWriteTimeout, ReadTimeoutStream};
use crate::utils::send_event_if_event_worker_available;

Expand Down Expand Up @@ -75,7 +76,7 @@ impl TerminationToken {
pub fn child_token(&self) -> Self {
Self {
inbound: self.inbound.child_token(),
outbound: self.outbound.clone(),
outbound: self.outbound.child_token(),
}
}

Expand Down Expand Up @@ -648,7 +649,7 @@ pub async fn create_worker<Opt: Into<CreateWorkerArgs>>(
.as_millis();

send_event_if_event_worker_available(
worker_struct_ref.events_msg_tx.clone(),
worker_struct_ref.events_msg_tx.as_ref(),
WorkerEvents::Boot(BootEvent {
boot_time: elapsed as usize,
}),
Expand Down Expand Up @@ -749,7 +750,6 @@ pub async fn create_main_worker(
service_path,
import_map_path,
no_module_cache,
events_rx: None,
timing: None,
maybe_eszip,
maybe_entrypoint,
Expand All @@ -772,9 +772,9 @@ pub async fn create_main_worker(
}

pub async fn create_events_worker(
flags: &ServerFlags,
events_worker_path: PathBuf,
import_map_path: Option<String>,
no_module_cache: bool,
maybe_entrypoint: Option<String>,
maybe_decorator: Option<DecoratorType>,
termination_token: Option<TerminationToken>,
Expand All @@ -796,16 +796,18 @@ pub async fn create_events_worker(
(
WorkerContextInitOpts {
service_path,
no_module_cache,
no_module_cache: flags.no_module_cache,
import_map_path,
env_vars: std::env::vars().collect(),
events_rx: Some(events_rx),
timing: None,
maybe_eszip,
maybe_entrypoint,
maybe_decorator,
maybe_module_code: None,
conf: WorkerRuntimeOpts::EventsWorker(EventWorkerRuntimeOpts {}),
conf: WorkerRuntimeOpts::EventsWorker(EventWorkerRuntimeOpts {
events_msg_rx: Some(events_rx),
event_worker_exit_deadline_sec: Some(flags.event_worker_exit_deadline_sec),
}),
static_patterns: vec![],
maybe_jsx_import_source_config: None,
},
Expand Down Expand Up @@ -885,7 +887,7 @@ pub async fn create_user_worker_pool(
}
},
..worker_options
}, tx, termination_token.as_ref().map(|it| it.child_token()));
}, tx, token.map(TerminationToken::child_token));
}

Some(UserWorkerMsgs::Created(key, profile)) => {
Expand Down Expand Up @@ -916,6 +918,8 @@ pub async fn create_user_worker_pool(
}
}

worker_pool.worker_event_sender.take();

Ok(())
}
});
Expand Down
1 change: 0 additions & 1 deletion crates/base/src/rt_worker/worker_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,6 @@ impl WorkerPool {
no_module_cache,
import_map_path,
env_vars,
events_rx: None,
timing: None,
conf,
maybe_eszip,
Expand Down
Loading

0 comments on commit fbddb16

Please sign in to comment.