diff --git a/crates/base/src/deno_runtime.rs b/crates/base/src/deno_runtime.rs index 934e8aebe..8813d27d8 100644 --- a/crates/base/src/deno_runtime.rs +++ b/crates/base/src/deno_runtime.rs @@ -790,9 +790,8 @@ mod test { Some(WorkerRuntimeOpts::UserWorker(UserWorkerRuntimeOpts { memory_limit_mb: memory_limit, worker_timeout_ms, - cpu_burst_interval_ms: 100, - cpu_time_threshold_ms: 50, - max_cpu_bursts: 10, + cpu_time_soft_limit_ms: 100, + cpu_time_hard_limit_ms: 200, low_memory_multiplier: 5, force_create: true, net_access_disabled: false, diff --git a/crates/base/src/rt_worker/worker_ctx.rs b/crates/base/src/rt_worker/worker_ctx.rs index d060b7718..f7b0cd851 100644 --- a/crates/base/src/rt_worker/worker_ctx.rs +++ b/crates/base/src/rt_worker/worker_ctx.rs @@ -19,7 +19,7 @@ use sb_worker_context::essentials::{ }; use std::path::PathBuf; use std::thread; -use std::time::{Duration, Instant}; +use std::time::Duration; use tokio::net::UnixStream; use tokio::sync::mpsc::UnboundedSender; use tokio::sync::{mpsc, oneshot}; @@ -115,7 +115,11 @@ pub fn create_supervisor( // Note: CPU timer must be started in the same thread as the worker runtime let (cpu_alarms_tx, mut cpu_alarms_rx) = mpsc::unbounded_channel::<()>(); - let cputimer = CPUTimer::start(conf.cpu_time_threshold_ms, CPUAlarmVal { cpu_alarms_tx })?; + let cputimer = CPUTimer::start( + conf.cpu_time_soft_limit_ms, + conf.cpu_time_hard_limit_ms, + CPUAlarmVal { cpu_alarms_tx }, + )?; let thread_name = format!("sb-sup-{:?}", key); let _handle = thread::Builder::new() @@ -130,8 +134,8 @@ pub fn create_supervisor( let (isolate_memory_usage_tx, isolate_memory_usage_rx) = oneshot::channel::(); let future = async move { - let mut bursts = 0; - let mut last_burst = Instant::now(); + let mut cpu_time_soft_limit_reached = false; + let mut wall_clock_alerts = 0; // reduce 100ms from wall clock duration, so the interrupt can be handled before // isolate is dropped @@ -142,22 +146,26 @@ pub fn create_supervisor( let wall_clock_duration_alert = tokio::time::interval(wall_clock_duration.checked_div(2).unwrap_or(Duration::from_millis(0))); tokio::pin!(wall_clock_duration_alert); - let mut wall_clock_alerts = 0; - loop { tokio::select! { Some(_) = cpu_alarms_rx.recv() => { - if last_burst.elapsed().as_millis() > (conf.cpu_burst_interval_ms as u128) { - bursts += 1; - last_burst = Instant::now(); - } - if bursts > conf.max_cpu_bursts { + if !cpu_time_soft_limit_reached { + // retire worker + if let Some(tx) = pool_msg_tx.clone() { + if tx.send(UserWorkerMsgs::Retire(key)).is_err() { + error!("failed to send retire msg to pool: {:?}", key); + } + } + error!("CPU time soft limit reached. isolate: {:?}", key); + cpu_time_soft_limit_reached = true; + } else { + // shutdown worker let interrupt_data = IsolateInterruptData { should_terminate: true, isolate_memory_usage_tx }; thread_safe_handle.request_interrupt(handle_interrupt, Box::into_raw(Box::new(interrupt_data)) as *mut std::ffi::c_void); - error!("CPU time limit reached. isolate: {:?}", key); + error!("CPU time hard limit reached. isolate: {:?}", key); return ShutdownReason::CPUTime; } } diff --git a/crates/cpu_timer/src/lib.rs b/crates/cpu_timer/src/lib.rs index d28846cee..95a64de12 100644 --- a/crates/cpu_timer/src/lib.rs +++ b/crates/cpu_timer/src/lib.rs @@ -25,7 +25,11 @@ pub struct CPUTimer {} impl CPUTimer { #[cfg(target_os = "linux")] - pub fn start(interval: u64, cpu_alarm_val: CPUAlarmVal) -> Result { + pub fn start( + initial_expiry: u64, + interval: u64, + cpu_alarm_val: CPUAlarmVal, + ) -> Result { let mut timerid = TimerId(std::ptr::null_mut()); let val_ptr = Box::into_raw(Box::new(cpu_alarm_val)); let sival_ptr: *mut libc::c_void = val_ptr as *mut libc::c_void; @@ -47,11 +51,15 @@ impl CPUTimer { bail!(std::io::Error::last_os_error()) } + let initial_expiry_secs = initial_expiry / 1000; + let initial_expiry_msecs = initial_expiry % 1000; + let interval_secs = interval / 1000; + let interval_msecs = interval % 1000; let mut tmspec: libc::itimerspec = unsafe { std::mem::zeroed() }; - tmspec.it_interval.tv_sec = 0; - tmspec.it_interval.tv_nsec = (interval as i64) * 1_000_000; - tmspec.it_value.tv_sec = 0; - tmspec.it_value.tv_nsec = (interval as i64) * 1_000_000; + tmspec.it_value.tv_sec = initial_expiry_secs as i64; + tmspec.it_value.tv_nsec = (initial_expiry_msecs as i64) * 1_000_000; + tmspec.it_interval.tv_sec = interval_secs as i64; + tmspec.it_interval.tv_nsec = (interval_msecs as i64) * 1_000_000; if unsafe { // start the timer with an expiry @@ -68,7 +76,7 @@ impl CPUTimer { } #[cfg(not(target_os = "linux"))] - pub fn start(_: u64, _: CPUAlarmVal) -> Result { + pub fn start(_: u64, _: u64, _: CPUAlarmVal) -> Result { println!("CPU timer: not enabled (need Linux)"); Ok(Self {}) } @@ -120,5 +128,6 @@ pub fn get_thread_time() -> Result { return Err(std::io::Error::last_os_error().into()); } - Ok(time.tv_nsec) + // convert seconds to nanoseconds and add to nsec value + Ok(time.tv_sec * 1_000_000_000 + time.tv_nsec) } diff --git a/crates/sb_worker_context/essentials.rs b/crates/sb_worker_context/essentials.rs index 7eb2ef677..8a87d459e 100644 --- a/crates/sb_worker_context/essentials.rs +++ b/crates/sb_worker_context/essentials.rs @@ -23,9 +23,8 @@ pub struct UserWorkerRuntimeOpts { pub worker_timeout_ms: u64, // wall clock limit - pub cpu_time_threshold_ms: u64, - pub cpu_burst_interval_ms: u64, - pub max_cpu_bursts: u64, + pub cpu_time_soft_limit_ms: u64, + pub cpu_time_hard_limit_ms: u64, pub force_create: bool, pub net_access_disabled: bool, @@ -39,9 +38,8 @@ impl Default for UserWorkerRuntimeOpts { memory_limit_mb: 512, worker_timeout_ms: 5 * 60 * 1000, low_memory_multiplier: 5, - max_cpu_bursts: 10, - cpu_burst_interval_ms: 100, - cpu_time_threshold_ms: 50, + cpu_time_soft_limit_ms: 50, + cpu_time_hard_limit_ms: 100, force_create: false, key: None, diff --git a/crates/sb_workers/lib.rs b/crates/sb_workers/lib.rs index 99a337782..83cb78878 100644 --- a/crates/sb_workers/lib.rs +++ b/crates/sb_workers/lib.rs @@ -55,9 +55,8 @@ pub struct UserWorkerCreateOptions { memory_limit_mb: u64, low_memory_multiplier: u64, worker_timeout_ms: u64, - cpu_time_threshold_ms: u64, - max_cpu_bursts: u64, - cpu_burst_interval_ms: u64, + cpu_time_soft_limit_ms: u64, + cpu_time_hard_limit_ms: u64, } #[op2(async)] @@ -87,9 +86,8 @@ pub async fn op_user_worker_create( memory_limit_mb, low_memory_multiplier, worker_timeout_ms, - cpu_time_threshold_ms, - max_cpu_bursts, - cpu_burst_interval_ms, + cpu_time_soft_limit_ms, + cpu_time_hard_limit_ms, } = opts; let mut env_vars_map = HashMap::new(); @@ -110,9 +108,8 @@ pub async fn op_user_worker_create( memory_limit_mb, low_memory_multiplier, worker_timeout_ms, - cpu_time_threshold_ms, - max_cpu_bursts, - cpu_burst_interval_ms, + cpu_time_soft_limit_ms, + cpu_time_hard_limit_ms, force_create, net_access_disabled, allow_remote_modules, diff --git a/crates/sb_workers/user_workers.js b/crates/sb_workers/user_workers.js index 3f7756f8a..d69eecbdb 100644 --- a/crates/sb_workers/user_workers.js +++ b/crates/sb_workers/user_workers.js @@ -99,9 +99,8 @@ class UserWorker { memoryLimitMb: 512, lowMemoryMultiplier: 5, workerTimeoutMs: 5 * 60 * 1000, - cpuTimeThresholdMs: 50, - cpuBurstIntervalMs: 100, - maxCpuBursts: 10, + cpuTimeSoftLimitMs: 50, + cpuTimeHardLimitMs: 100, noModuleCache: false, importMapPath: null, envVars: [], diff --git a/examples/main/index.ts b/examples/main/index.ts index a8a32afaa..8adf3d7da 100644 --- a/examples/main/index.ts +++ b/examples/main/index.ts @@ -51,6 +51,9 @@ serve(async (req: Request) => { // const maybeEntrypoint = 'file:///src/index.ts'; // or load module source from an inline module // const maybeModuleCode = 'Deno.serve((req) => new Response("Hello from Module Code"));'; + // + const cpuTimeSoftLimitMs = 50; + const cpuTimeHardLimitMs = 100; return await EdgeRuntime.userWorkers.create({ servicePath, @@ -61,6 +64,8 @@ serve(async (req: Request) => { envVars, forceCreate, netAccessDisabled, + cpuTimeSoftLimitMs, + cpuTimeHardLimitMs, // maybeEszip, // maybeEntrypoint, // maybeModuleCode,