Skip to content

Commit

Permalink
Merge tag 'tokio-1.31.0' into solid-rs/1.x
Browse files Browse the repository at this point in the history
  • Loading branch information
kawadakk committed Aug 24, 2023
2 parents adb455c + 8b8005e commit 6d543ba
Show file tree
Hide file tree
Showing 18 changed files with 264 additions and 120 deletions.
2 changes: 0 additions & 2 deletions .github/workflows/loom.yml
Original file line number Diff line number Diff line change
Expand Up @@ -120,5 +120,3 @@ jobs:
working-directory: tokio
env:
SCOPE: ${{ matrix.scope }}
# TODO: remove this before stabilizing
LOOM_MAX_PREEMPTIONS: 1
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ Make sure you activated the full features of the tokio crate on Cargo.toml:

```toml
[dependencies]
tokio = { version = "1.30.0", features = ["full"] }
tokio = { version = "1.31.0", features = ["full"] }
```
Then, on your main.rs:

Expand Down
15 changes: 15 additions & 0 deletions tokio/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
# 1.31.0 (August 10, 2023)

### Fixed

* io: delegate `WriteHalf::poll_write_vectored` ([#5914])

### Unstable

* rt(unstable): fix memory leak in unstable next-gen scheduler prototype ([#5911])
* rt: expose mean task poll time metric ([#5927])

[#5914]: https://github.com/tokio-rs/tokio/pull/5914
[#5911]: https://github.com/tokio-rs/tokio/pull/5911
[#5927]: https://github.com/tokio-rs/tokio/pull/5927

# 1.30.0 (August 9, 2023)

This release bumps the MSRV of Tokio to 1.63. ([#5887])
Expand Down
2 changes: 1 addition & 1 deletion tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ name = "tokio"
# - README.md
# - Update CHANGELOG.md.
# - Create "v1.x.y" git tag.
version = "1.30.0"
version = "1.31.0"
edition = "2021"
rust-version = "1.63"
authors = ["Tokio Contributors <[email protected]>"]
Expand Down
2 changes: 1 addition & 1 deletion tokio/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ Make sure you activated the full features of the tokio crate on Cargo.toml:

```toml
[dependencies]
tokio = { version = "1.30.0", features = ["full"] }
tokio = { version = "1.31.0", features = ["full"] }
```
Then, on your main.rs:

Expand Down
17 changes: 17 additions & 0 deletions tokio/src/io/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,12 @@ cfg_io_util! {
where
T: AsyncRead + AsyncWrite,
{
let is_write_vectored = stream.is_write_vectored();

let inner = Arc::new(Inner {
locked: AtomicBool::new(false),
stream: UnsafeCell::new(stream),
is_write_vectored,
});

let rd = ReadHalf {
Expand All @@ -53,6 +56,7 @@ cfg_io_util! {
struct Inner<T> {
locked: AtomicBool,
stream: UnsafeCell<T>,
is_write_vectored: bool,
}

struct Guard<'a, T> {
Expand Down Expand Up @@ -131,6 +135,19 @@ impl<T: AsyncWrite> AsyncWrite for WriteHalf<T> {
let mut inner = ready!(self.inner.poll_lock(cx));
inner.stream_pin().poll_shutdown(cx)
}

fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[io::IoSlice<'_>],
) -> Poll<Result<usize, io::Error>> {
let mut inner = ready!(self.inner.poll_lock(cx));
inner.stream_pin().poll_write_vectored(cx, bufs)
}

fn is_write_vectored(&self) -> bool {
self.inner.is_write_vectored
}
}

impl<T> Inner<T> {
Expand Down
3 changes: 2 additions & 1 deletion tokio/src/runtime/metrics/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ impl MetricsBatch {
}
}

pub(crate) fn submit(&mut self, worker: &WorkerMetrics) {
pub(crate) fn submit(&mut self, worker: &WorkerMetrics, mean_poll_time: u64) {
worker.mean_poll_time.store(mean_poll_time, Relaxed);
worker.park_count.store(self.park_count, Relaxed);
worker.noop_count.store(self.noop_count, Relaxed);
worker.steal_count.store(self.steal_count, Relaxed);
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/metrics/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl MetricsBatch {
Self {}
}

pub(crate) fn submit(&mut self, _to: &WorkerMetrics) {}
pub(crate) fn submit(&mut self, _to: &WorkerMetrics, _mean_poll_time: u64) {}
pub(crate) fn about_to_park(&mut self) {}
pub(crate) fn inc_local_schedule_count(&mut self) {}
pub(crate) fn start_processing_scheduled_tasks(&mut self) {}
Expand Down
41 changes: 41 additions & 0 deletions tokio/src/runtime/metrics/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -769,6 +769,47 @@ impl RuntimeMetrics {
.unwrap_or_default()
}

/// Returns the mean duration of task polls, in nanoseconds.
///
/// This is an exponentially weighted moving average. Currently, this metric
/// is only provided by the multi-threaded runtime.
///
/// # Arguments
///
/// `worker` is the index of the worker being queried. The given value must
/// be between 0 and `num_workers()`. The index uniquely identifies a single
/// worker and will continue to identify the worker throughout the lifetime
/// of the runtime instance.
///
/// # Panics
///
/// The method panics when `worker` represents an invalid worker, i.e. is
/// greater than or equal to `num_workers()`.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.worker_mean_poll_time(0);
/// println!("worker 0 has a mean poll time of {:?}", n);
/// }
/// ```
#[track_caller]
pub fn worker_mean_poll_time(&self, worker: usize) -> Duration {
let nanos = self
.handle
.inner
.worker_metrics(worker)
.mean_poll_time
.load(Relaxed);
Duration::from_nanos(nanos)
}

/// Returns the number of tasks currently scheduled in the blocking
/// thread pool, spawned using `spawn_blocking`.
///
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/runtime/metrics/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ pub(crate) struct WorkerMetrics {
/// Number of tasks the worker polled.
pub(crate) poll_count: AtomicU64,

/// EWMA task poll time, in nanoseconds.
pub(crate) mean_poll_time: AtomicU64,

/// Amount of time the worker spent doing work vs. parking.
pub(crate) busy_duration_total: AtomicU64,

Expand Down Expand Up @@ -62,6 +65,7 @@ impl WorkerMetrics {
steal_count: AtomicU64::new(0),
steal_operations: AtomicU64::new(0),
poll_count: AtomicU64::new(0),
mean_poll_time: AtomicU64::new(0),
overflow_count: AtomicU64::new(0),
busy_duration_total: AtomicU64::new(0),
local_schedule_count: AtomicU64::new(0),
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/scheduler/current_thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ impl Core {
}

fn submit_metrics(&mut self, handle: &Handle) {
self.metrics.submit(&handle.shared.worker_metrics);
self.metrics.submit(&handle.shared.worker_metrics, 0);
}
}

Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/scheduler/multi_thread/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl Stats {
}

pub(crate) fn submit(&mut self, to: &WorkerMetrics) {
self.batch.submit(to);
self.batch.submit(to, self.task_poll_time_ewma as u64);
}

pub(crate) fn about_to_park(&mut self) {
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/scheduler/multi_thread_alt/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl Handle {
}

pub(crate) fn shutdown(&self) {
self.shared.close();
self.shared.close(self);
self.driver.unpark();
}

Expand Down
34 changes: 16 additions & 18 deletions tokio/src/runtime/scheduler/multi_thread_alt/idle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

use crate::loom::sync::atomic::{AtomicBool, AtomicUsize};
use crate::loom::sync::MutexGuard;
use crate::runtime::scheduler::multi_thread_alt::{worker, Core, Shared};
use crate::runtime::scheduler::multi_thread_alt::{worker, Core, Handle, Shared};

use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};

Expand Down Expand Up @@ -146,23 +146,13 @@ impl Idle {
// Find a sleeping worker
if let Some(worker) = synced.idle.sleepers.pop() {
// Find an available core
if let Some(mut core) = synced.idle.available_cores.pop() {
if let Some(mut core) = self.try_acquire_available_core(&mut synced.idle) {
debug_assert!(!core.is_searching);
core.is_searching = true;

self.idle_map.unset(core.index);
debug_assert!(self.idle_map.matches(&synced.idle.available_cores));

// Assign the core to the worker
synced.assigned_cores[worker] = Some(core);

let num_idle = synced.idle.available_cores.len();
#[cfg(not(loom))]
debug_assert_eq!(num_idle, self.num_idle.load(Acquire) - 1);

// Update the number of sleeping workers
self.num_idle.store(num_idle, Release);

// Drop the lock before notifying the condvar.
drop(synced);

Expand Down Expand Up @@ -198,6 +188,7 @@ impl Idle {

for _ in 0..num {
if let Some(worker) = synced.idle.sleepers.pop() {
// TODO: can this be switched to use next_available_core?
if let Some(core) = synced.idle.available_cores.pop() {
debug_assert!(!core.is_searching);

Expand Down Expand Up @@ -236,15 +227,10 @@ impl Idle {
// eventually find the cores and shut them down.
while !synced.idle.sleepers.is_empty() && !synced.idle.available_cores.is_empty() {
let worker = synced.idle.sleepers.pop().unwrap();
let core = synced.idle.available_cores.pop().unwrap();

self.idle_map.unset(core.index);
let core = self.try_acquire_available_core(&mut synced.idle).unwrap();

synced.assigned_cores[worker] = Some(core);
shared.condvars[worker].notify_one();

self.num_idle
.store(synced.idle.available_cores.len(), Release);
}

debug_assert!(self.idle_map.matches(&synced.idle.available_cores));
Expand All @@ -255,6 +241,18 @@ impl Idle {
}
}

pub(super) fn shutdown_unassigned_cores(&self, handle: &Handle, shared: &Shared) {
// If there are any remaining cores, shut them down here.
//
// This code is a bit convoluted to avoid lock-reentry.
while let Some(core) = {
let mut synced = shared.synced.lock();
self.try_acquire_available_core(&mut synced.idle)
} {
shared.shutdown_core(handle, core);
}
}

/// The worker releases the given core, making it available to other workers
/// that are waiting.
pub(super) fn release_core(&self, synced: &mut worker::Synced, core: Box<Core>) {
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/scheduler/multi_thread_alt/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl Stats {
}

pub(crate) fn submit(&mut self, to: &WorkerMetrics) {
self.batch.submit(to);
self.batch.submit(to, self.task_poll_time_ewma as u64);
}

pub(crate) fn about_to_park(&mut self) {
Expand Down
Loading

0 comments on commit 6d543ba

Please sign in to comment.