From 5d29bdfb0cb9e2562c662c76a362bb823f88ffd5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Kub=C3=ADk?= Date: Thu, 10 Aug 2023 10:11:00 +0200 Subject: [PATCH 1/4] io: delegate `WriteHalf::poll_write_vectored` (#5914) --- tokio/src/io/split.rs | 17 +++++++++++++++++ tokio/tests/io_split.rs | 39 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 55 insertions(+), 1 deletion(-) diff --git a/tokio/src/io/split.rs b/tokio/src/io/split.rs index f067b65a826..63f0960e4f3 100644 --- a/tokio/src/io/split.rs +++ b/tokio/src/io/split.rs @@ -35,9 +35,12 @@ cfg_io_util! { where T: AsyncRead + AsyncWrite, { + let is_write_vectored = stream.is_write_vectored(); + let inner = Arc::new(Inner { locked: AtomicBool::new(false), stream: UnsafeCell::new(stream), + is_write_vectored, }); let rd = ReadHalf { @@ -53,6 +56,7 @@ cfg_io_util! { struct Inner { locked: AtomicBool, stream: UnsafeCell, + is_write_vectored: bool, } struct Guard<'a, T> { @@ -131,6 +135,19 @@ impl AsyncWrite for WriteHalf { let mut inner = ready!(self.inner.poll_lock(cx)); inner.stream_pin().poll_shutdown(cx) } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[io::IoSlice<'_>], + ) -> Poll> { + let mut inner = ready!(self.inner.poll_lock(cx)); + inner.stream_pin().poll_write_vectored(cx, bufs) + } + + fn is_write_vectored(&self) -> bool { + self.inner.is_write_vectored + } } impl Inner { diff --git a/tokio/tests/io_split.rs b/tokio/tests/io_split.rs index 77b77a3a04c..9f17c9eb14e 100644 --- a/tokio/tests/io_split.rs +++ b/tokio/tests/io_split.rs @@ -1,7 +1,9 @@ #![warn(rust_2018_idioms)] #![cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi does not support panic recovery -use tokio::io::{split, AsyncRead, AsyncWrite, ReadBuf, ReadHalf, WriteHalf}; +use tokio::io::{ + split, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf, ReadHalf, WriteHalf, +}; use std::io; use std::pin::Pin; @@ -36,6 +38,18 @@ impl AsyncWrite for RW { fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } + + fn poll_write_vectored( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + _bufs: &[io::IoSlice<'_>], + ) -> Poll> { + Poll::Ready(Ok(2)) + } + + fn is_write_vectored(&self) -> bool { + true + } } #[test] @@ -77,3 +91,26 @@ fn unsplit_err2() { let (r, _) = split(RW); r.unsplit(w); } + +#[test] +fn method_delegation() { + let (mut r, mut w) = split(RW); + let mut buf = [0; 1]; + + tokio_test::block_on(async move { + assert_eq!(1, r.read(&mut buf).await.unwrap()); + assert_eq!(b'z', buf[0]); + + assert_eq!(1, w.write(&[b'x']).await.unwrap()); + assert_eq!( + 2, + w.write_vectored(&[io::IoSlice::new(&[b'x'])]) + .await + .unwrap() + ); + assert!(w.is_write_vectored()); + + assert!(w.flush().await.is_ok()); + assert!(w.shutdown().await.is_ok()); + }); +} From dd23f08c3a35495b4b53defa81ec0ca2c75a5f7d Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Thu, 10 Aug 2023 09:18:10 -0700 Subject: [PATCH 2/4] rt(alt): fix memory leak and increase max preemption when running Loom CI tests (#5911) The memory leak was caused by a bug during shutdown where some state was leaked. 
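The fix below adds `Shared::shutdown_core` and `idle::shutdown_unassigned_cores` so that cores still sitting in the idle set are also shut down. `Shared::close` calls the new helper only after dropping the `synced` lock, and the helper itself scopes its guard inside the `while let` condition because `shutdown_core` re-acquires the same lock. A minimal standalone sketch of that guard-scoping pattern, using a plain `std::sync::Mutex` and made-up names rather than the runtime's types:

```rust
use std::sync::Mutex;

// Illustrative only: the guard taken in the block expression is dropped
// before the loop body runs, so the body can lock the same (non-reentrant)
// mutex again without deadlocking.
fn drain(queue: &Mutex<Vec<u32>>) {
    while let Some(item) = {
        let mut guard = queue.lock().unwrap();
        guard.pop()
    } {
        // The guard above is already gone; re-locking here is fine.
        let remaining = queue.lock().unwrap().len();
        println!("shut down {item}, {remaining} left");
    }
}

fn main() {
    drain(&Mutex::new(vec![3, 2, 1]));
}
```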
--- .github/workflows/loom.yml | 2 - .../scheduler/multi_thread_alt/handle.rs | 2 +- .../scheduler/multi_thread_alt/idle.rs | 34 ++- .../scheduler/multi_thread_alt/worker.rs | 194 ++++++++++-------- 4 files changed, 124 insertions(+), 108 deletions(-) diff --git a/.github/workflows/loom.yml b/.github/workflows/loom.yml index efd0eb32661..e7528090205 100644 --- a/.github/workflows/loom.yml +++ b/.github/workflows/loom.yml @@ -120,5 +120,3 @@ jobs: working-directory: tokio env: SCOPE: ${{ matrix.scope }} - # TODO: remove this before stabilizing - LOOM_MAX_PREEMPTIONS: 1 diff --git a/tokio/src/runtime/scheduler/multi_thread_alt/handle.rs b/tokio/src/runtime/scheduler/multi_thread_alt/handle.rs index e0353f8da6e..d746bca1a18 100644 --- a/tokio/src/runtime/scheduler/multi_thread_alt/handle.rs +++ b/tokio/src/runtime/scheduler/multi_thread_alt/handle.rs @@ -39,7 +39,7 @@ impl Handle { } pub(crate) fn shutdown(&self) { - self.shared.close(); + self.shared.close(self); self.driver.unpark(); } diff --git a/tokio/src/runtime/scheduler/multi_thread_alt/idle.rs b/tokio/src/runtime/scheduler/multi_thread_alt/idle.rs index 9f08a8cdfbc..ae2fb8b4dae 100644 --- a/tokio/src/runtime/scheduler/multi_thread_alt/idle.rs +++ b/tokio/src/runtime/scheduler/multi_thread_alt/idle.rs @@ -4,7 +4,7 @@ use crate::loom::sync::atomic::{AtomicBool, AtomicUsize}; use crate::loom::sync::MutexGuard; -use crate::runtime::scheduler::multi_thread_alt::{worker, Core, Shared}; +use crate::runtime::scheduler::multi_thread_alt::{worker, Core, Handle, Shared}; use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; @@ -146,23 +146,13 @@ impl Idle { // Find a sleeping worker if let Some(worker) = synced.idle.sleepers.pop() { // Find an available core - if let Some(mut core) = synced.idle.available_cores.pop() { + if let Some(mut core) = self.try_acquire_available_core(&mut synced.idle) { debug_assert!(!core.is_searching); core.is_searching = true; - self.idle_map.unset(core.index); - debug_assert!(self.idle_map.matches(&synced.idle.available_cores)); - // Assign the core to the worker synced.assigned_cores[worker] = Some(core); - let num_idle = synced.idle.available_cores.len(); - #[cfg(not(loom))] - debug_assert_eq!(num_idle, self.num_idle.load(Acquire) - 1); - - // Update the number of sleeping workers - self.num_idle.store(num_idle, Release); - // Drop the lock before notifying the condvar. drop(synced); @@ -198,6 +188,7 @@ impl Idle { for _ in 0..num { if let Some(worker) = synced.idle.sleepers.pop() { + // TODO: can this be switched to use next_available_core? if let Some(core) = synced.idle.available_cores.pop() { debug_assert!(!core.is_searching); @@ -236,15 +227,10 @@ impl Idle { // eventually find the cores and shut them down. while !synced.idle.sleepers.is_empty() && !synced.idle.available_cores.is_empty() { let worker = synced.idle.sleepers.pop().unwrap(); - let core = synced.idle.available_cores.pop().unwrap(); - - self.idle_map.unset(core.index); + let core = self.try_acquire_available_core(&mut synced.idle).unwrap(); synced.assigned_cores[worker] = Some(core); shared.condvars[worker].notify_one(); - - self.num_idle - .store(synced.idle.available_cores.len(), Release); } debug_assert!(self.idle_map.matches(&synced.idle.available_cores)); @@ -255,6 +241,18 @@ impl Idle { } } + pub(super) fn shutdown_unassigned_cores(&self, handle: &Handle, shared: &Shared) { + // If there are any remaining cores, shut them down here. + // + // This code is a bit convoluted to avoid lock-reentry. 
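+ // (The `while let` scrutinee is a block expression so that the `synced`
+ // guard taken inside it is dropped before the loop body runs;
+ // `shutdown_core` re-acquires `shared.synced` internally.)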
+ while let Some(core) = { + let mut synced = shared.synced.lock(); + self.try_acquire_available_core(&mut synced.idle) + } { + shared.shutdown_core(handle, core); + } + } + /// The worker releases the given core, making it available to other workers /// that are waiting. pub(super) fn release_core(&self, synced: &mut worker::Synced, core: Box) { diff --git a/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs b/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs index e9bb4fd9f33..1bc9a784023 100644 --- a/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs @@ -560,7 +560,7 @@ impl Worker { // First try to acquire an available core if let Some(core) = self.try_acquire_available_core(cx, &mut synced) { // Try to poll a task from the global queue - let maybe_task = self.next_remote_task_synced(cx, &mut synced); + let maybe_task = cx.shared().next_remote_task_synced(&mut synced); (maybe_task, core) } else { // block the thread to wait for a core to be assinged to us @@ -589,10 +589,7 @@ impl Worker { } } - self.pre_shutdown(cx, &mut core); - - // Signal shutdown - self.shutdown_core(cx, core); + cx.shared().shutdown_core(&cx.handle, core); // It is possible that tasks wake others during drop, so we need to // clear the defer list. @@ -746,7 +743,7 @@ impl Worker { } } - if let Some(task) = self.next_local_task(&mut core) { + if let Some(task) = core.next_local_task() { return Ok((Some(task), core)); } @@ -759,12 +756,7 @@ impl Worker { } let mut synced = cx.shared().synced.lock(); - self.next_remote_task_synced(cx, &mut synced) - } - - fn next_remote_task_synced(&self, cx: &Context, synced: &mut Synced) -> Option { - // safety: we only have access to a valid `Synced` in this file. - unsafe { cx.shared().inject.pop(&mut synced.inject) } + cx.shared().next_remote_task_synced(&mut synced) } fn next_remote_task_batch(&self, cx: &Context, mut core: Box) -> NextTaskResult { @@ -818,14 +810,6 @@ impl Worker { ret } - fn next_local_task(&self, core: &mut Core) -> Option { - self.next_lifo_task(core).or_else(|| core.run_queue.pop()) - } - - fn next_lifo_task(&self, core: &mut Core) -> Option { - core.lifo_slot.take() - } - /// Function responsible for stealing tasks from another worker /// /// Note: Only if less than half the workers are searching for tasks to steal @@ -948,7 +932,7 @@ impl Worker { }; // Check for a task in the LIFO slot - let task = match self.next_lifo_task(&mut core) { + let task = match core.next_lifo_task() { Some(task) => task, None => { self.reset_lifo_enabled(cx); @@ -1229,7 +1213,7 @@ impl Worker { if cx.shared().inject.is_closed(&mut synced.inject) { synced.shutdown_driver = Some(driver); self.shutdown_clear_defer(cx); - self.shutdown_finalize(cx, synced); + cx.shared().shutdown_finalize(&cx.handle, &mut synced); return Err(()); } @@ -1281,61 +1265,6 @@ impl Worker { core.lifo_slot.is_some() || !core.run_queue.is_empty() } - /// Signals all tasks to shut down, and waits for them to complete. Must run - /// before we enter the single-threaded phase of shutdown processing. - fn pre_shutdown(&self, cx: &Context, core: &mut Core) { - // Signal to all tasks to shut down. - cx.shared().owned.close_and_shutdown_all(); - - core.stats.submit(&cx.shared().worker_metrics[core.index]); - } - - /// Signals that a worker has observed the shutdown signal and has replaced - /// its core back into its handle. - /// - /// If all workers have reached this point, the final cleanup is performed. 
- fn shutdown_core(&self, cx: &Context, core: Box) { - let mut synced = cx.shared().synced.lock(); - synced.shutdown_cores.push(core); - - self.shutdown_finalize(cx, synced); - } - - fn shutdown_finalize(&self, cx: &Context, mut synced: MutexGuard<'_, Synced>) { - // Wait for all cores - if synced.shutdown_cores.len() != cx.shared().remotes.len() { - return; - } - - let driver = synced.shutdown_driver.take(); - - if cx.shared().driver_enabled() && driver.is_none() { - return; - } - - debug_assert!(cx.shared().owned.is_empty()); - - for mut core in synced.shutdown_cores.drain(..) { - // Drain tasks from the local queue - while self.next_local_task(&mut core).is_some() {} - } - - // Shutdown the driver - if let Some(mut driver) = driver { - driver.shutdown(&cx.handle.driver); - } - - // Drain the injection queue - // - // We already shut down every task, so we can simply drop the tasks. We - // cannot call `next_remote_task()` because we already hold the lock. - // - // safety: passing in correct `idle::Synced` - while let Some(task) = self.next_remote_task_synced(cx, &mut synced) { - drop(task); - } - } - fn reset_lifo_enabled(&self, cx: &Context) { cx.lifo_enabled .set(!cx.handle.shared.config.disable_lifo_slot); @@ -1379,7 +1308,22 @@ impl Context { } } +impl Core { + fn next_local_task(&mut self) -> Option { + self.next_lifo_task().or_else(|| self.run_queue.pop()) + } + + fn next_lifo_task(&mut self) -> Option { + self.lifo_slot.take() + } +} + impl Shared { + fn next_remote_task_synced(&self, synced: &mut Synced) -> Option { + // safety: we only have access to a valid `Synced` in this file. + unsafe { self.inject.pop(&mut synced.inject) } + } + pub(super) fn schedule_task(&self, task: Notified, is_yield: bool) { use std::ptr; @@ -1449,17 +1393,25 @@ impl Shared { self.idle.notify_remote(synced, self); } - pub(super) fn close(&self) { - let mut synced = self.synced.lock(); + pub(super) fn close(&self, handle: &Handle) { + { + let mut synced = self.synced.lock(); - if let Some(driver) = self.driver.take() { - synced.shutdown_driver = Some(driver); - } + if let Some(driver) = self.driver.take() { + synced.shutdown_driver = Some(driver); + } + + if !self.inject.close(&mut synced.inject) { + return; + } - if self.inject.close(&mut synced.inject) { // Set the shutdown flag on all available cores self.idle.shutdown(&mut synced, self); } + + // Any unassigned cores need to be shutdown, but we have to first drop + // the lock + self.idle.shutdown_unassigned_cores(handle, self); } fn push_remote_task(&self, synced: &mut Synced, task: Notified) { @@ -1498,6 +1450,52 @@ impl Shared { fn driver_enabled(&self) -> bool { self.condvars.len() > self.remotes.len() } + + pub(super) fn shutdown_core(&self, handle: &Handle, mut core: Box) { + self.owned.close_and_shutdown_all(); + + core.stats.submit(&self.worker_metrics[core.index]); + + let mut synced = self.synced.lock(); + synced.shutdown_cores.push(core); + + self.shutdown_finalize(handle, &mut synced); + } + + pub(super) fn shutdown_finalize(&self, handle: &Handle, synced: &mut Synced) { + // Wait for all cores + if synced.shutdown_cores.len() != self.remotes.len() { + return; + } + + let driver = synced.shutdown_driver.take(); + + if self.driver_enabled() && driver.is_none() { + return; + } + + debug_assert!(self.owned.is_empty()); + + for mut core in synced.shutdown_cores.drain(..) 
{ + // Drain tasks from the local queue + while core.next_local_task().is_some() {} + } + + // Shutdown the driver + if let Some(mut driver) = driver { + driver.shutdown(&handle.driver); + } + + // Drain the injection queue + // + // We already shut down every task, so we can simply drop the tasks. We + // cannot call `next_remote_task()` because we already hold the lock. + // + // safety: passing in correct `idle::Synced` + while let Some(task) = self.next_remote_task_synced(synced) { + drop(task); + } + } } impl Overflow> for Shared { @@ -1514,10 +1512,20 @@ impl Overflow> for Shared { } impl<'a> Lock for &'a Shared { - type Handle = InjectGuard<'a>; + type Handle = SyncedGuard<'a>; fn lock(self) -> Self::Handle { - InjectGuard { + SyncedGuard { + lock: self.synced.lock(), + } + } +} + +impl<'a> Lock for &'a Shared { + type Handle = SyncedGuard<'a>; + + fn lock(self) -> Self::Handle { + SyncedGuard { lock: self.synced.lock(), } } @@ -1537,16 +1545,28 @@ impl task::Schedule for Arc { } } -pub(crate) struct InjectGuard<'a> { +impl AsMut for Synced { + fn as_mut(&mut self) -> &mut Synced { + self + } +} + +pub(crate) struct SyncedGuard<'a> { lock: crate::loom::sync::MutexGuard<'a, Synced>, } -impl<'a> AsMut for InjectGuard<'a> { +impl<'a> AsMut for SyncedGuard<'a> { fn as_mut(&mut self) -> &mut inject::Synced { &mut self.lock.inject } } +impl<'a> AsMut for SyncedGuard<'a> { + fn as_mut(&mut self) -> &mut Synced { + &mut self.lock + } +} + #[track_caller] fn with_current(f: impl FnOnce(Option<&Context>) -> R) -> R { use scheduler::Context::MultiThreadAlt; From 6cb106c3538cf527495ef5491c088d1365b14c8e Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Thu, 10 Aug 2023 12:43:44 -0700 Subject: [PATCH 3/4] rt: unstable EWMA poll time metric (#5927) Because the runtime uses this value as a tuning heuristic, it can be useful to get its value. This patch exposes the value as an unstable metric. 
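The per-worker value is an exponentially weighted moving average of individual task poll durations: when a worker submits its metrics batch, the scheduler's `task_poll_time_ewma` is stored into `WorkerMetrics::mean_poll_time` and read back through the new `RuntimeMetrics::worker_mean_poll_time`. A rough sketch of how such an EWMA evolves — the smoothing factor and samples below are made up for illustration and are not the runtime's actual tuning constants:

```rust
// Illustrative EWMA update; ALPHA is a made-up smoothing factor, not the
// value the runtime actually uses.
fn ewma(prev_nanos: f64, sample_nanos: f64) -> f64 {
    const ALPHA: f64 = 0.1; // weight given to the newest poll-time sample
    ALPHA * sample_nanos + (1.0 - ALPHA) * prev_nanos
}

fn main() {
    let mut mean = 1_000.0; // start from a 1us guess
    for sample in [900.0, 1_100.0, 1_000.0, 50_000.0, 1_200.0] {
        mean = ewma(mean, sample);
        println!("mean poll time ~ {mean:.0} ns");
    }
}
```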
--- tokio/src/runtime/metrics/batch.rs | 3 +- tokio/src/runtime/metrics/mock.rs | 2 +- tokio/src/runtime/metrics/runtime.rs | 41 +++++++++++++++++++ tokio/src/runtime/metrics/worker.rs | 4 ++ .../runtime/scheduler/current_thread/mod.rs | 2 +- .../runtime/scheduler/multi_thread/stats.rs | 2 +- .../scheduler/multi_thread_alt/stats.rs | 2 +- tokio/tests/rt_metrics.rs | 19 +++++++-- 8 files changed, 67 insertions(+), 8 deletions(-) diff --git a/tokio/src/runtime/metrics/batch.rs b/tokio/src/runtime/metrics/batch.rs index 1bb4e261f73..1d0f3dea30a 100644 --- a/tokio/src/runtime/metrics/batch.rs +++ b/tokio/src/runtime/metrics/batch.rs @@ -73,7 +73,8 @@ impl MetricsBatch { } } - pub(crate) fn submit(&mut self, worker: &WorkerMetrics) { + pub(crate) fn submit(&mut self, worker: &WorkerMetrics, mean_poll_time: u64) { + worker.mean_poll_time.store(mean_poll_time, Relaxed); worker.park_count.store(self.park_count, Relaxed); worker.noop_count.store(self.noop_count, Relaxed); worker.steal_count.store(self.steal_count, Relaxed); diff --git a/tokio/src/runtime/metrics/mock.rs b/tokio/src/runtime/metrics/mock.rs index 8f8345c08b4..e4bb3a99d0c 100644 --- a/tokio/src/runtime/metrics/mock.rs +++ b/tokio/src/runtime/metrics/mock.rs @@ -37,7 +37,7 @@ impl MetricsBatch { Self {} } - pub(crate) fn submit(&mut self, _to: &WorkerMetrics) {} + pub(crate) fn submit(&mut self, _to: &WorkerMetrics, _mean_poll_time: u64) {} pub(crate) fn about_to_park(&mut self) {} pub(crate) fn inc_local_schedule_count(&mut self) {} pub(crate) fn start_processing_scheduled_tasks(&mut self) {} diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs index 1f990a1f852..95b517c94eb 100644 --- a/tokio/src/runtime/metrics/runtime.rs +++ b/tokio/src/runtime/metrics/runtime.rs @@ -769,6 +769,47 @@ impl RuntimeMetrics { .unwrap_or_default() } + /// Returns the mean duration of task polls, in nanoseconds. + /// + /// This is an exponentially weighted moving average. Currently, this metric + /// is only provided by the multi-threaded runtime. + /// + /// # Arguments + /// + /// `worker` is the index of the worker being queried. The given value must + /// be between 0 and `num_workers()`. The index uniquely identifies a single + /// worker and will continue to identify the worker throughout the lifetime + /// of the runtime instance. + /// + /// # Panics + /// + /// The method panics when `worker` represents an invalid worker, i.e. is + /// greater than or equal to `num_workers()`. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.worker_mean_poll_time(0); + /// println!("worker 0 has a mean poll time of {:?}", n); + /// } + /// ``` + #[track_caller] + pub fn worker_mean_poll_time(&self, worker: usize) -> Duration { + let nanos = self + .handle + .inner + .worker_metrics(worker) + .mean_poll_time + .load(Relaxed); + Duration::from_nanos(nanos) + } + /// Returns the number of tasks currently scheduled in the blocking /// thread pool, spawned using `spawn_blocking`. /// diff --git a/tokio/src/runtime/metrics/worker.rs b/tokio/src/runtime/metrics/worker.rs index e0f23e6a08e..b53a86bcc87 100644 --- a/tokio/src/runtime/metrics/worker.rs +++ b/tokio/src/runtime/metrics/worker.rs @@ -28,6 +28,9 @@ pub(crate) struct WorkerMetrics { /// Number of tasks the worker polled. pub(crate) poll_count: AtomicU64, + /// EWMA task poll time, in nanoseconds. 
+ pub(crate) mean_poll_time: AtomicU64, + /// Amount of time the worker spent doing work vs. parking. pub(crate) busy_duration_total: AtomicU64, @@ -62,6 +65,7 @@ impl WorkerMetrics { steal_count: AtomicU64::new(0), steal_operations: AtomicU64::new(0), poll_count: AtomicU64::new(0), + mean_poll_time: AtomicU64::new(0), overflow_count: AtomicU64::new(0), busy_duration_total: AtomicU64::new(0), local_schedule_count: AtomicU64::new(0), diff --git a/tokio/src/runtime/scheduler/current_thread/mod.rs b/tokio/src/runtime/scheduler/current_thread/mod.rs index 1100147d5cf..30b17c0e8ed 100644 --- a/tokio/src/runtime/scheduler/current_thread/mod.rs +++ b/tokio/src/runtime/scheduler/current_thread/mod.rs @@ -321,7 +321,7 @@ impl Core { } fn submit_metrics(&mut self, handle: &Handle) { - self.metrics.submit(&handle.shared.worker_metrics); + self.metrics.submit(&handle.shared.worker_metrics, 0); } } diff --git a/tokio/src/runtime/scheduler/multi_thread/stats.rs b/tokio/src/runtime/scheduler/multi_thread/stats.rs index f01daaa1bff..3b8c5020e49 100644 --- a/tokio/src/runtime/scheduler/multi_thread/stats.rs +++ b/tokio/src/runtime/scheduler/multi_thread/stats.rs @@ -74,7 +74,7 @@ impl Stats { } pub(crate) fn submit(&mut self, to: &WorkerMetrics) { - self.batch.submit(to); + self.batch.submit(to, self.task_poll_time_ewma as u64); } pub(crate) fn about_to_park(&mut self) { diff --git a/tokio/src/runtime/scheduler/multi_thread_alt/stats.rs b/tokio/src/runtime/scheduler/multi_thread_alt/stats.rs index 57657bb0391..228e797714b 100644 --- a/tokio/src/runtime/scheduler/multi_thread_alt/stats.rs +++ b/tokio/src/runtime/scheduler/multi_thread_alt/stats.rs @@ -93,7 +93,7 @@ impl Stats { } pub(crate) fn submit(&mut self, to: &WorkerMetrics) { - self.batch.submit(to); + self.batch.submit(to, self.task_poll_time_ewma as u64); } pub(crate) fn about_to_park(&mut self) { diff --git a/tokio/tests/rt_metrics.rs b/tokio/tests/rt_metrics.rs index 7b7a957c419..42faab37f9b 100644 --- a/tokio/tests/rt_metrics.rs +++ b/tokio/tests/rt_metrics.rs @@ -215,18 +215,25 @@ fn worker_steal_count() { } #[test] -fn worker_poll_count() { +fn worker_poll_count_and_time() { const N: u64 = 5; + async fn task() { + // Sync sleep + std::thread::sleep(std::time::Duration::from_micros(10)); + } + let rt = current_thread(); let metrics = rt.metrics(); rt.block_on(async { for _ in 0..N { - tokio::spawn(async {}).await.unwrap(); + tokio::spawn(task()).await.unwrap(); } }); drop(rt); assert_eq!(N, metrics.worker_poll_count(0)); + // Not currently supported for current-thread runtime + assert_eq!(Duration::default(), metrics.worker_mean_poll_time(0)); // Does not populate the histogram assert!(!metrics.poll_count_histogram_enabled()); @@ -238,7 +245,7 @@ fn worker_poll_count() { let metrics = rt.metrics(); rt.block_on(async { for _ in 0..N { - tokio::spawn(async {}).await.unwrap(); + tokio::spawn(task()).await.unwrap(); } }); drop(rt); @@ -249,6 +256,12 @@ fn worker_poll_count() { assert_eq!(N, n); + let n: Duration = (0..metrics.num_workers()) + .map(|i| metrics.worker_mean_poll_time(i)) + .sum(); + + assert!(n > Duration::default()); + // Does not populate the histogram assert!(!metrics.poll_count_histogram_enabled()); for n in 0..metrics.num_workers() { From 8b8005ebddbce0ec6d5e437905d00b91f3999b97 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Sat, 12 Aug 2023 09:37:49 -0700 Subject: [PATCH 4/4] chore: prepare Tokio v1.31.0 release (#5928) --- README.md | 2 +- tokio/CHANGELOG.md | 15 +++++++++++++++ tokio/Cargo.toml | 2 +- tokio/README.md | 2 +- 
4 files changed, 18 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index baee6a0ed18..20f8ca2fd68 100644 --- a/README.md +++ b/README.md @@ -56,7 +56,7 @@ Make sure you activated the full features of the tokio crate on Cargo.toml: ```toml [dependencies] -tokio = { version = "1.30.0", features = ["full"] } +tokio = { version = "1.31.0", features = ["full"] } ``` Then, on your main.rs: diff --git a/tokio/CHANGELOG.md b/tokio/CHANGELOG.md index b73a5e62d06..df4cfdf1fef 100644 --- a/tokio/CHANGELOG.md +++ b/tokio/CHANGELOG.md @@ -1,3 +1,18 @@ +# 1.31.0 (August 10, 2023) + +### Fixed + +* io: delegate `WriteHalf::poll_write_vectored` ([#5914]) + +### Unstable + +* rt(unstable): fix memory leak in unstable next-gen scheduler prototype ([#5911]) +* rt: expose mean task poll time metric ([#5927]) + +[#5914]: https://github.com/tokio-rs/tokio/pull/5914 +[#5911]: https://github.com/tokio-rs/tokio/pull/5911 +[#5927]: https://github.com/tokio-rs/tokio/pull/5927 + # 1.30.0 (August 9, 2023) This release bumps the MSRV of Tokio to 1.63. ([#5887]) diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 022c994311c..378a4957824 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -6,7 +6,7 @@ name = "tokio" # - README.md # - Update CHANGELOG.md. # - Create "v1.x.y" git tag. -version = "1.30.0" +version = "1.31.0" edition = "2021" rust-version = "1.63" authors = ["Tokio Contributors "] diff --git a/tokio/README.md b/tokio/README.md index baee6a0ed18..20f8ca2fd68 100644 --- a/tokio/README.md +++ b/tokio/README.md @@ -56,7 +56,7 @@ Make sure you activated the full features of the tokio crate on Cargo.toml: ```toml [dependencies] -tokio = { version = "1.30.0", features = ["full"] } +tokio = { version = "1.31.0", features = ["full"] } ``` Then, on your main.rs: