diff --git a/README.md b/README.md index 20f8ca2fd68..4a6671ad814 100644 --- a/README.md +++ b/README.md @@ -56,7 +56,7 @@ Make sure you activated the full features of the tokio crate on Cargo.toml: ```toml [dependencies] -tokio = { version = "1.31.0", features = ["full"] } +tokio = { version = "1.32.0", features = ["full"] } ``` Then, on your main.rs: @@ -216,7 +216,6 @@ warrants a patch release with a fix for the bug, it will be backported and released as a new patch release for each LTS minor version. Our current LTS releases are: - * `1.18.x` - LTS release until June 2023. (MSRV 1.49) * `1.20.x` - LTS release until September 2023. (MSRV 1.49) * `1.25.x` - LTS release until March 2024. (MSRV 1.49) @@ -231,6 +230,12 @@ can use the following dependency specification: tokio = { version = "~1.18", features = [...] } ``` +### Previous LTS releases + + * `1.8.x` - LTS release until February 2022. + * `1.14.x` - LTS release until June 2022. + * `1.18.x` - LTS release until June 2023. + ## License This project is licensed under the [MIT license]. diff --git a/tokio-test/src/lib.rs b/tokio-test/src/lib.rs index de3f0864a94..87e63861210 100644 --- a/tokio-test/src/lib.rs +++ b/tokio-test/src/lib.rs @@ -12,6 +12,7 @@ //! Tokio and Futures based testing utilities pub mod io; +pub mod stream_mock; mod macros; pub mod task; diff --git a/tokio-test/src/stream_mock.rs b/tokio-test/src/stream_mock.rs new file mode 100644 index 00000000000..0426470af27 --- /dev/null +++ b/tokio-test/src/stream_mock.rs @@ -0,0 +1,168 @@ +#![cfg(not(loom))] + +//! A mock stream implementing [`Stream`]. +//! +//! # Overview +//! This crate provides a `StreamMock` that can be used to test code that interacts with streams. +//! It allows you to mock the behavior of a stream and control the items it yields and the waiting +//! intervals between items. +//! +//! # Usage +//! To use the `StreamMock`, you need to create a builder using[`StreamMockBuilder`]. The builder +//! allows you to enqueue actions such as returning items or waiting for a certain duration. +//! +//! # Example +//! ```rust +//! +//! use futures_util::StreamExt; +//! use std::time::Duration; +//! use tokio_test::stream_mock::StreamMockBuilder; +//! +//! async fn test_stream_mock_wait() { +//! let mut stream_mock = StreamMockBuilder::new() +//! .next(1) +//! .wait(Duration::from_millis(300)) +//! .next(2) +//! .build(); +//! +//! assert_eq!(stream_mock.next().await, Some(1)); +//! let start = std::time::Instant::now(); +//! assert_eq!(stream_mock.next().await, Some(2)); +//! let elapsed = start.elapsed(); +//! assert!(elapsed >= Duration::from_millis(300)); +//! assert_eq!(stream_mock.next().await, None); +//! } +//! ``` + +use std::collections::VecDeque; +use std::pin::Pin; +use std::task::Poll; +use std::time::Duration; + +use futures_core::{ready, Stream}; +use std::future::Future; +use tokio::time::{sleep_until, Instant, Sleep}; + +#[derive(Debug, Clone)] +enum Action { + Next(T), + Wait(Duration), +} + +/// A builder for [`StreamMock`] +#[derive(Debug, Clone)] +pub struct StreamMockBuilder { + actions: VecDeque>, +} + +impl StreamMockBuilder { + /// Create a new empty [`StreamMockBuilder`] + pub fn new() -> Self { + StreamMockBuilder::default() + } + + /// Queue an item to be returned by the stream + pub fn next(mut self, value: T) -> Self { + self.actions.push_back(Action::Next(value)); + self + } + + // Queue an item to be consumed by the sink, + // commented out until Sink is implemented. + // + // pub fn consume(mut self, value: T) -> Self { + // self.actions.push_back(Action::Consume(value)); + // self + // } + + /// Queue the stream to wait for a duration + pub fn wait(mut self, duration: Duration) -> Self { + self.actions.push_back(Action::Wait(duration)); + self + } + + /// Build the [`StreamMock`] + pub fn build(self) -> StreamMock { + StreamMock { + actions: self.actions, + sleep: None, + } + } +} + +impl Default for StreamMockBuilder { + fn default() -> Self { + StreamMockBuilder { + actions: VecDeque::new(), + } + } +} + +/// A mock stream implementing [`Stream`] +/// +/// See [`StreamMockBuilder`] for more information. +#[derive(Debug)] +pub struct StreamMock { + actions: VecDeque>, + sleep: Option>>, +} + +impl StreamMock { + fn next_action(&mut self) -> Option> { + self.actions.pop_front() + } +} + +impl Stream for StreamMock { + type Item = T; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + // Try polling the sleep future first + if let Some(ref mut sleep) = self.sleep { + ready!(Pin::new(sleep).poll(cx)); + // Since we're ready, discard the sleep future + self.sleep.take(); + } + + match self.next_action() { + Some(action) => match action { + Action::Next(item) => Poll::Ready(Some(item)), + Action::Wait(duration) => { + // Set up a sleep future and schedule this future to be polled again for it. + self.sleep = Some(Box::pin(sleep_until(Instant::now() + duration))); + cx.waker().wake_by_ref(); + + Poll::Pending + } + }, + None => Poll::Ready(None), + } + } +} + +impl Drop for StreamMock { + fn drop(&mut self) { + // Avoid double panicking to make debugging easier. + if std::thread::panicking() { + return; + } + + let undropped_count = self + .actions + .iter() + .filter(|action| match action { + Action::Next(_) => true, + Action::Wait(_) => false, + }) + .count(); + + assert!( + undropped_count == 0, + "StreamMock was dropped before all actions were consumed, {} actions were not consumed", + undropped_count + ); + } +} diff --git a/tokio-test/tests/stream_mock.rs b/tokio-test/tests/stream_mock.rs new file mode 100644 index 00000000000..a54ea838a5b --- /dev/null +++ b/tokio-test/tests/stream_mock.rs @@ -0,0 +1,50 @@ +use futures_util::StreamExt; +use std::time::Duration; +use tokio_test::stream_mock::StreamMockBuilder; + +#[tokio::test] +async fn test_stream_mock_empty() { + let mut stream_mock = StreamMockBuilder::::new().build(); + + assert_eq!(stream_mock.next().await, None); + assert_eq!(stream_mock.next().await, None); +} + +#[tokio::test] +async fn test_stream_mock_items() { + let mut stream_mock = StreamMockBuilder::new().next(1).next(2).build(); + + assert_eq!(stream_mock.next().await, Some(1)); + assert_eq!(stream_mock.next().await, Some(2)); + assert_eq!(stream_mock.next().await, None); +} + +#[tokio::test] +async fn test_stream_mock_wait() { + let mut stream_mock = StreamMockBuilder::new() + .next(1) + .wait(Duration::from_millis(300)) + .next(2) + .build(); + + assert_eq!(stream_mock.next().await, Some(1)); + let start = std::time::Instant::now(); + assert_eq!(stream_mock.next().await, Some(2)); + let elapsed = start.elapsed(); + assert!(elapsed >= Duration::from_millis(300)); + assert_eq!(stream_mock.next().await, None); +} + +#[tokio::test] +#[should_panic(expected = "StreamMock was dropped before all actions were consumed")] +async fn test_stream_mock_drop_without_consuming_all() { + let stream_mock = StreamMockBuilder::new().next(1).next(2).build(); + drop(stream_mock); +} + +#[tokio::test] +#[should_panic(expected = "test panic was not masked")] +async fn test_stream_mock_drop_during_panic_doesnt_mask_panic() { + let _stream_mock = StreamMockBuilder::new().next(1).next(2).build(); + panic!("test panic was not masked"); +} diff --git a/tokio/CHANGELOG.md b/tokio/CHANGELOG.md index df4cfdf1fef..eb40bc10100 100644 --- a/tokio/CHANGELOG.md +++ b/tokio/CHANGELOG.md @@ -1,3 +1,23 @@ +# 1.32.0 (August 16, 2023) + +### Fixed + +- sync: fix potential quadradic behavior in `broadcast::Receiver` ([#5925]) + +### Added + +- process: stabilize `Command::raw_arg` ([#5930]) +- io: enable awaiting error readiness ([#5781]) + +### Unstable + +- rt(alt): improve scalability of alt runtime as the number of cores grows ([#5935]) + +[#5925]: https://github.com/tokio-rs/tokio/pull/5925 +[#5930]: https://github.com/tokio-rs/tokio/pull/5930 +[#5781]: https://github.com/tokio-rs/tokio/pull/5781 +[#5935]: https://github.com/tokio-rs/tokio/pull/5935 + # 1.31.0 (August 10, 2023) ### Fixed @@ -6,7 +26,7 @@ ### Unstable -* rt(unstable): fix memory leak in unstable next-gen scheduler prototype ([#5911]) +* rt(alt): fix memory leak in unstable next-gen scheduler prototype ([#5911]) * rt: expose mean task poll time metric ([#5927]) [#5914]: https://github.com/tokio-rs/tokio/pull/5914 diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 378a4957824..19384b75d54 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -6,7 +6,7 @@ name = "tokio" # - README.md # - Update CHANGELOG.md. # - Create "v1.x.y" git tag. -version = "1.31.0" +version = "1.32.0" edition = "2021" rust-version = "1.63" authors = ["Tokio Contributors "] diff --git a/tokio/README.md b/tokio/README.md index 20f8ca2fd68..4a6671ad814 100644 --- a/tokio/README.md +++ b/tokio/README.md @@ -56,7 +56,7 @@ Make sure you activated the full features of the tokio crate on Cargo.toml: ```toml [dependencies] -tokio = { version = "1.31.0", features = ["full"] } +tokio = { version = "1.32.0", features = ["full"] } ``` Then, on your main.rs: @@ -216,7 +216,6 @@ warrants a patch release with a fix for the bug, it will be backported and released as a new patch release for each LTS minor version. Our current LTS releases are: - * `1.18.x` - LTS release until June 2023. (MSRV 1.49) * `1.20.x` - LTS release until September 2023. (MSRV 1.49) * `1.25.x` - LTS release until March 2024. (MSRV 1.49) @@ -231,6 +230,12 @@ can use the following dependency specification: tokio = { version = "~1.18", features = [...] } ``` +### Previous LTS releases + + * `1.8.x` - LTS release until February 2022. + * `1.14.x` - LTS release until June 2022. + * `1.18.x` - LTS release until June 2023. + ## License This project is licensed under the [MIT license]. diff --git a/tokio/src/io/async_fd.rs b/tokio/src/io/async_fd.rs index db56359c5ad..ae8f5c6417c 100644 --- a/tokio/src/io/async_fd.rs +++ b/tokio/src/io/async_fd.rs @@ -176,6 +176,8 @@ use std::{task::Context, task::Poll}; /// [`AsyncWrite`]: trait@crate::io::AsyncWrite pub struct AsyncFd { registration: Registration, + // The inner value is always present. the Option is required for `drop` and `into_inner`. + // In all other methods `unwrap` is valid, and will never panic. inner: Option, } @@ -271,13 +273,12 @@ impl AsyncFd { } fn take_inner(&mut self) -> Option { - let fd = self.inner.as_ref().map(AsRawFd::as_raw_fd); + let inner = self.inner.take()?; + let fd = inner.as_raw_fd(); - if let Some(fd) = fd { - let _ = self.registration.deregister(&mut SourceFd(&fd)); - } + let _ = self.registration.deregister(&mut SourceFd(&fd)); - self.inner.take() + Some(inner) } /// Deregisters this file descriptor and returns ownership of the backing @@ -319,11 +320,10 @@ impl AsyncFd { ) -> Poll>> { let event = ready!(self.registration.poll_read_ready(cx))?; - Ok(AsyncFdReadyGuard { + Poll::Ready(Ok(AsyncFdReadyGuard { async_fd: self, event: Some(event), - }) - .into() + })) } /// Polls for read readiness. @@ -357,11 +357,10 @@ impl AsyncFd { ) -> Poll>> { let event = ready!(self.registration.poll_read_ready(cx))?; - Ok(AsyncFdReadyMutGuard { + Poll::Ready(Ok(AsyncFdReadyMutGuard { async_fd: self, event: Some(event), - }) - .into() + })) } /// Polls for write readiness. @@ -397,11 +396,10 @@ impl AsyncFd { ) -> Poll>> { let event = ready!(self.registration.poll_write_ready(cx))?; - Ok(AsyncFdReadyGuard { + Poll::Ready(Ok(AsyncFdReadyGuard { async_fd: self, event: Some(event), - }) - .into() + })) } /// Polls for write readiness. @@ -435,11 +433,10 @@ impl AsyncFd { ) -> Poll>> { let event = ready!(self.registration.poll_write_ready(cx))?; - Ok(AsyncFdReadyMutGuard { + Poll::Ready(Ok(AsyncFdReadyMutGuard { async_fd: self, event: Some(event), - }) - .into() + })) } /// Waits for any of the requested ready states, returning a @@ -1013,14 +1010,11 @@ impl<'a, Inner: AsRawFd> AsyncFdReadyGuard<'a, Inner> { ) -> Result, TryIoError> { let result = f(self.async_fd); - if let Err(e) = result.as_ref() { - if e.kind() == io::ErrorKind::WouldBlock { + match result { + Err(err) if err.kind() == io::ErrorKind::WouldBlock => { self.clear_ready(); + Err(TryIoError(())) } - } - - match result { - Err(err) if err.kind() == io::ErrorKind::WouldBlock => Err(TryIoError(())), result => Ok(result), } } @@ -1193,14 +1187,11 @@ impl<'a, Inner: AsRawFd> AsyncFdReadyMutGuard<'a, Inner> { ) -> Result, TryIoError> { let result = f(self.async_fd); - if let Err(e) = result.as_ref() { - if e.kind() == io::ErrorKind::WouldBlock { + match result { + Err(err) if err.kind() == io::ErrorKind::WouldBlock => { self.clear_ready(); + Err(TryIoError(())) } - } - - match result { - Err(err) if err.kind() == io::ErrorKind::WouldBlock => Err(TryIoError(())), result => Ok(result), } } diff --git a/tokio/src/io/interest.rs b/tokio/src/io/interest.rs index 3a39cf761b7..9256bd238da 100644 --- a/tokio/src/io/interest.rs +++ b/tokio/src/io/interest.rs @@ -5,13 +5,28 @@ use crate::io::ready::Ready; use std::fmt; use std::ops; +// These must be unique. +// same as mio +const READABLE: usize = 0b0001; +const WRITABLE: usize = 0b0010; +// The following are not available on all platforms. +#[cfg(target_os = "freebsd")] +const AIO: usize = 0b0100; +#[cfg(target_os = "freebsd")] +const LIO: usize = 0b1000; +#[cfg(any(target_os = "linux", target_os = "android"))] +const PRIORITY: usize = 0b0001_0000; +// error is available on all platforms, but behavior is platform-specific +// mio does not have this interest +const ERROR: usize = 0b0010_0000; + /// Readiness event interest. /// /// Specifies the readiness events the caller is interested in when awaiting on /// I/O resource readiness states. #[cfg_attr(docsrs, doc(cfg(feature = "net")))] #[derive(Clone, Copy, Eq, PartialEq)] -pub struct Interest(mio::Interest); +pub struct Interest(usize); impl Interest { // The non-FreeBSD definitions in this block are active only when @@ -19,35 +34,41 @@ impl Interest { cfg_aio! { /// Interest for POSIX AIO. #[cfg(target_os = "freebsd")] - pub const AIO: Interest = Interest(mio::Interest::AIO); + pub const AIO: Interest = Interest(AIO); /// Interest for POSIX AIO. #[cfg(not(target_os = "freebsd"))] - pub const AIO: Interest = Interest(mio::Interest::READABLE); + pub const AIO: Interest = Interest(READABLE); /// Interest for POSIX AIO lio_listio events. #[cfg(target_os = "freebsd")] - pub const LIO: Interest = Interest(mio::Interest::LIO); + pub const LIO: Interest = Interest(LIO); /// Interest for POSIX AIO lio_listio events. #[cfg(not(target_os = "freebsd"))] - pub const LIO: Interest = Interest(mio::Interest::READABLE); + pub const LIO: Interest = Interest(READABLE); } /// Interest in all readable events. /// /// Readable interest includes read-closed events. - pub const READABLE: Interest = Interest(mio::Interest::READABLE); + pub const READABLE: Interest = Interest(READABLE); /// Interest in all writable events. /// /// Writable interest includes write-closed events. - pub const WRITABLE: Interest = Interest(mio::Interest::WRITABLE); + pub const WRITABLE: Interest = Interest(WRITABLE); + + /// Interest in error events. + /// + /// Passes error interest to the underlying OS selector. + /// Behavior is platform-specific, read your platform's documentation. + pub const ERROR: Interest = Interest(ERROR); /// Returns a `Interest` set representing priority completion interests. #[cfg(any(target_os = "linux", target_os = "android"))] #[cfg_attr(docsrs, doc(cfg(any(target_os = "linux", target_os = "android"))))] - pub const PRIORITY: Interest = Interest(mio::Interest::PRIORITY); + pub const PRIORITY: Interest = Interest(PRIORITY); /// Returns true if the value includes readable interest. /// @@ -63,7 +84,7 @@ impl Interest { /// assert!(both.is_readable()); /// ``` pub const fn is_readable(self) -> bool { - self.0.is_readable() + self.0 & READABLE != 0 } /// Returns true if the value includes writable interest. @@ -80,7 +101,34 @@ impl Interest { /// assert!(both.is_writable()); /// ``` pub const fn is_writable(self) -> bool { - self.0.is_writable() + self.0 & WRITABLE != 0 + } + + /// Returns true if the value includes error interest. + /// + /// # Examples + /// + /// ``` + /// use tokio::io::Interest; + /// + /// assert!(Interest::ERROR.is_error()); + /// assert!(!Interest::WRITABLE.is_error()); + /// + /// let combined = Interest::READABLE | Interest::ERROR; + /// assert!(combined.is_error()); + /// ``` + pub const fn is_error(self) -> bool { + self.0 & ERROR != 0 + } + + #[cfg(target_os = "freebsd")] + const fn is_aio(self) -> bool { + self.0 & AIO != 0 + } + + #[cfg(target_os = "freebsd")] + const fn is_lio(self) -> bool { + self.0 & LIO != 0 } /// Returns true if the value includes priority interest. @@ -99,7 +147,7 @@ impl Interest { #[cfg(any(target_os = "linux", target_os = "android"))] #[cfg_attr(docsrs, doc(cfg(any(target_os = "linux", target_os = "android"))))] pub const fn is_priority(self) -> bool { - self.0.is_priority() + self.0 & PRIORITY != 0 } /// Add together two `Interest` values. @@ -116,12 +164,60 @@ impl Interest { /// assert!(BOTH.is_readable()); /// assert!(BOTH.is_writable()); pub const fn add(self, other: Interest) -> Interest { - Interest(self.0.add(other.0)) + Self(self.0 | other.0) } // This function must be crate-private to avoid exposing a `mio` dependency. - pub(crate) const fn to_mio(self) -> mio::Interest { - self.0 + pub(crate) fn to_mio(self) -> mio::Interest { + fn mio_add(wrapped: &mut Option, add: mio::Interest) { + match wrapped { + Some(inner) => *inner |= add, + None => *wrapped = Some(add), + } + } + + // mio does not allow and empty interest, so use None for empty + let mut mio = None; + + if self.is_readable() { + mio_add(&mut mio, mio::Interest::READABLE); + } + + if self.is_writable() { + mio_add(&mut mio, mio::Interest::WRITABLE); + } + + #[cfg(any(target_os = "linux", target_os = "android"))] + if self.is_priority() { + mio_add(&mut mio, mio::Interest::PRIORITY); + } + + #[cfg(target_os = "freebsd")] + if self.is_aio() { + mio_add(&mut mio, mio::Interest::AIO); + } + + #[cfg(target_os = "freebsd")] + if self.is_lio() { + mio_add(&mut mio, mio::Interest::LIO); + } + + if self.is_error() { + // There is no error interest in mio, because error events are always reported. + // But mio interests cannot be empty and an interest is needed just for the registeration. + // + // read readiness is filtered out in `Interest::mask` or `Ready::from_interest` if + // the read interest was not specified by the user. + mio_add(&mut mio, mio::Interest::READABLE); + } + + // the default `mio::Interest::READABLE` should never be used in practice. Either + // + // - at least one tokio interest with a mio counterpart was used + // - only the error tokio interest was specified + // + // in both cases, `mio` is Some already + mio.unwrap_or(mio::Interest::READABLE) } pub(crate) fn mask(self) -> Ready { @@ -130,6 +226,7 @@ impl Interest { Interest::WRITABLE => Ready::WRITABLE | Ready::WRITE_CLOSED, #[cfg(any(target_os = "linux", target_os = "android"))] Interest::PRIORITY => Ready::PRIORITY | Ready::READ_CLOSED, + Interest::ERROR => Ready::ERROR, _ => Ready::EMPTY, } } @@ -147,12 +244,67 @@ impl ops::BitOr for Interest { impl ops::BitOrAssign for Interest { #[inline] fn bitor_assign(&mut self, other: Self) { - self.0 = (*self | other).0; + *self = *self | other } } impl fmt::Debug for Interest { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - self.0.fmt(fmt) + let mut separator = false; + + if self.is_readable() { + if separator { + write!(fmt, " | ")?; + } + write!(fmt, "READABLE")?; + separator = true; + } + + if self.is_writable() { + if separator { + write!(fmt, " | ")?; + } + write!(fmt, "WRITABLE")?; + separator = true; + } + + #[cfg(any(target_os = "linux", target_os = "android"))] + if self.is_priority() { + if separator { + write!(fmt, " | ")?; + } + write!(fmt, "PRIORITY")?; + separator = true; + } + + #[cfg(target_os = "freebsd")] + if self.is_aio() { + if separator { + write!(fmt, " | ")?; + } + write!(fmt, "AIO")?; + separator = true; + } + + #[cfg(target_os = "freebsd")] + if self.is_lio() { + if separator { + write!(fmt, " | ")?; + } + write!(fmt, "LIO")?; + separator = true; + } + + if self.is_error() { + if separator { + write!(fmt, " | ")?; + } + write!(fmt, "ERROR")?; + separator = true; + } + + let _ = separator; + + Ok(()) } } diff --git a/tokio/src/io/ready.rs b/tokio/src/io/ready.rs index c2a80264b2c..919f1992c6b 100644 --- a/tokio/src/io/ready.rs +++ b/tokio/src/io/ready.rs @@ -11,6 +11,7 @@ const READ_CLOSED: usize = 0b0_0100; const WRITE_CLOSED: usize = 0b0_1000; #[cfg(any(target_os = "linux", target_os = "android"))] const PRIORITY: usize = 0b1_0000; +const ERROR: usize = 0b10_0000; /// Describes the readiness state of an I/O resources. /// @@ -40,13 +41,17 @@ impl Ready { #[cfg_attr(docsrs, doc(cfg(any(target_os = "linux", target_os = "android"))))] pub const PRIORITY: Ready = Ready(PRIORITY); + /// Returns a `Ready` representing error readiness. + pub const ERROR: Ready = Ready(ERROR); + /// Returns a `Ready` representing readiness for all operations. #[cfg(any(target_os = "linux", target_os = "android"))] - pub const ALL: Ready = Ready(READABLE | WRITABLE | READ_CLOSED | WRITE_CLOSED | PRIORITY); + pub const ALL: Ready = + Ready(READABLE | WRITABLE | READ_CLOSED | WRITE_CLOSED | ERROR | PRIORITY); /// Returns a `Ready` representing readiness for all operations. #[cfg(not(any(target_os = "linux", target_os = "android")))] - pub const ALL: Ready = Ready(READABLE | WRITABLE | READ_CLOSED | WRITE_CLOSED); + pub const ALL: Ready = Ready(READABLE | WRITABLE | READ_CLOSED | WRITE_CLOSED | ERROR); // Must remain crate-private to avoid adding a public dependency on Mio. pub(crate) fn from_mio(event: &mio::event::Event) -> Ready { @@ -79,6 +84,10 @@ impl Ready { ready |= Ready::WRITE_CLOSED; } + if event.is_error() { + ready |= Ready::ERROR; + } + #[cfg(any(target_os = "linux", target_os = "android"))] { if event.is_priority() { @@ -182,6 +191,21 @@ impl Ready { self.contains(Ready::PRIORITY) } + /// Returns `true` if the value includes error `readiness`. + /// + /// # Examples + /// + /// ``` + /// use tokio::io::Ready; + /// + /// assert!(!Ready::EMPTY.is_error()); + /// assert!(!Ready::WRITABLE.is_error()); + /// assert!(Ready::ERROR.is_error()); + /// ``` + pub fn is_error(self) -> bool { + self.contains(Ready::ERROR) + } + /// Returns true if `self` is a superset of `other`. /// /// `other` may represent more than one readiness operations, in which case @@ -230,6 +254,10 @@ impl Ready { ready |= Ready::READ_CLOSED; } + if interest.is_error() { + ready |= Ready::ERROR; + } + ready } @@ -283,7 +311,8 @@ impl fmt::Debug for Ready { fmt.field("is_readable", &self.is_readable()) .field("is_writable", &self.is_writable()) .field("is_read_closed", &self.is_read_closed()) - .field("is_write_closed", &self.is_write_closed()); + .field("is_write_closed", &self.is_write_closed()) + .field("is_error", &self.is_error()); #[cfg(any(target_os = "linux", target_os = "android"))] fmt.field("is_priority", &self.is_priority()); diff --git a/tokio/src/io/util/async_read_ext.rs b/tokio/src/io/util/async_read_ext.rs index 179e4834d69..7588f822fb6 100644 --- a/tokio/src/io/util/async_read_ext.rs +++ b/tokio/src/io/util/async_read_ext.rs @@ -230,10 +230,13 @@ cfg_io_util! { /// let mut buffer = BytesMut::with_capacity(10); /// /// assert!(buffer.is_empty()); + /// assert!(buffer.capacity() >= 10); /// - /// // read up to 10 bytes, note that the return value is not needed - /// // to access the data that was read as `buffer`'s internal - /// // cursor is updated. + /// // note that the return value is not needed to access the data + /// // that was read as `buffer`'s internal cursor is updated. + /// // + /// // this might read more than 10 bytes if the capacity of `buffer` + /// // is larger than 10. /// f.read_buf(&mut buffer).await?; /// /// println!("The bytes: {:?}", &buffer[..]); diff --git a/tokio/src/process/mod.rs b/tokio/src/process/mod.rs index d6c05442c58..78ac3555d1c 100644 --- a/tokio/src/process/mod.rs +++ b/tokio/src/process/mod.rs @@ -402,17 +402,11 @@ impl Command { self } - cfg_unstable_windows! { + cfg_windows! { /// Append literal text to the command line without any quoting or escaping. /// /// This is useful for passing arguments to `cmd.exe /c`, which doesn't follow /// `CommandLineToArgvW` escaping rules. - /// - /// **Note**: This is an [unstable API][unstable] but will be stabilised once - /// tokio's MSRV is sufficiently new. See [the documentation on - /// unstable features][unstable] for details about using unstable features. - /// - /// [unstable]: crate#unstable-features pub fn raw_arg>(&mut self, text_to_append_as_is: S) -> &mut Command { self.std.raw_arg(text_to_append_as_is); self diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 9b76867d279..03f1678dcb1 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -404,9 +404,17 @@ impl Builder { /// Specifies the limit for additional threads spawned by the Runtime. /// /// These threads are used for blocking operations like tasks spawned - /// through [`spawn_blocking`]. Unlike the [`worker_threads`], they are not - /// always active and will exit if left idle for too long. You can change - /// this timeout duration with [`thread_keep_alive`]. + /// through [`spawn_blocking`], this includes but is not limited to: + /// - [`fs`] operations + /// - dns resolution through [`ToSocketAddrs`] + /// - writing to [`Stdout`] or [`Stderr`] + /// - reading from [`Stdin`] + /// + /// Unlike the [`worker_threads`], they are not always active and will exit + /// if left idle for too long. You can change this timeout duration with [`thread_keep_alive`]. + /// + /// It's recommended to not set this limit too low in order to avoid hanging on operations + /// requiring [`spawn_blocking`]. /// /// The default value is 512. /// @@ -420,6 +428,11 @@ impl Builder { /// current `max_blocking_threads` does not include async worker threads in the count. /// /// [`spawn_blocking`]: fn@crate::task::spawn_blocking + /// [`fs`]: mod@crate::fs + /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs + /// [`Stdout`]: struct@crate::io::Stdout + /// [`Stdin`]: struct@crate::io::Stdin + /// [`Stderr`]: struct@crate::io::Stderr /// [`worker_threads`]: Self::worker_threads /// [`thread_keep_alive`]: Self::thread_keep_alive #[track_caller] diff --git a/tokio/src/runtime/io/scheduled_io.rs b/tokio/src/runtime/io/scheduled_io.rs index 85f40080f4a..ddce4b3ae4b 100644 --- a/tokio/src/runtime/io/scheduled_io.rs +++ b/tokio/src/runtime/io/scheduled_io.rs @@ -17,6 +17,93 @@ use std::task::{Context, Poll, Waker}; /// Stored in the I/O driver resource slab. #[derive(Debug)] +// # This struct should be cache padded to avoid false sharing. The cache padding rules are copied +// from crossbeam-utils/src/cache_padded.rs +// +// Starting from Intel's Sandy Bridge, spatial prefetcher is now pulling pairs of 64-byte cache +// lines at a time, so we have to align to 128 bytes rather than 64. +// +// Sources: +// - https://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf +// - https://github.com/facebook/folly/blob/1b5288e6eea6df074758f877c849b6e73bbb9fbb/folly/lang/Align.h#L107 +// +// ARM's big.LITTLE architecture has asymmetric cores and "big" cores have 128-byte cache line size. +// +// Sources: +// - https://www.mono-project.com/news/2016/09/12/arm64-icache/ +// +// powerpc64 has 128-byte cache line size. +// +// Sources: +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_ppc64x.go#L9 +#[cfg_attr( + any( + target_arch = "x86_64", + target_arch = "aarch64", + target_arch = "powerpc64", + ), + repr(align(128)) +)] +// arm, mips, mips64, riscv64, sparc, and hexagon have 32-byte cache line size. +// +// Sources: +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_arm.go#L7 +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips.go#L7 +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mipsle.go#L7 +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips64x.go#L9 +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_riscv64.go#L7 +// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L17 +// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/hexagon/include/asm/cache.h#L12 +// +// riscv32 is assumed not to exceed the cache line size of riscv64. +#[cfg_attr( + any( + target_arch = "arm", + target_arch = "mips", + target_arch = "mips64", + target_arch = "riscv32", + target_arch = "riscv64", + target_arch = "sparc", + target_arch = "hexagon", + ), + repr(align(32)) +)] +// m68k has 16-byte cache line size. +// +// Sources: +// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/m68k/include/asm/cache.h#L9 +#[cfg_attr(target_arch = "m68k", repr(align(16)))] +// s390x has 256-byte cache line size. +// +// Sources: +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_s390x.go#L7 +// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/s390/include/asm/cache.h#L13 +#[cfg_attr(target_arch = "s390x", repr(align(256)))] +// x86, wasm, and sparc64 have 64-byte cache line size. +// +// Sources: +// - https://github.com/golang/go/blob/dda2991c2ea0c5914714469c4defc2562a907230/src/internal/cpu/cpu_x86.go#L9 +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_wasm.go#L7 +// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L19 +// +// All others are assumed to have 64-byte cache line size. +#[cfg_attr( + not(any( + target_arch = "x86_64", + target_arch = "aarch64", + target_arch = "powerpc64", + target_arch = "arm", + target_arch = "mips", + target_arch = "mips64", + target_arch = "riscv32", + target_arch = "riscv64", + target_arch = "sparc", + target_arch = "hexagon", + target_arch = "m68k", + target_arch = "s390x", + )), + repr(align(64)) +)] pub(crate) struct ScheduledIo { pub(super) linked_list_pointers: UnsafeCell>, diff --git a/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs b/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs index 1bc9a784023..c9a05557a40 100644 --- a/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs @@ -817,7 +817,7 @@ impl Worker { /// workers will be trying to steal at the same time. fn search_for_work(&mut self, cx: &Context, mut core: Box) -> NextTaskResult { #[cfg(not(loom))] - const ROUNDS: usize = 1; + const ROUNDS: usize = 4; #[cfg(loom)] const ROUNDS: usize = 1; @@ -1000,6 +1000,14 @@ impl Worker { // Number of tasks we want to try to spread across idle workers let num_fanout = cmp::min(defer.len(), cx.shared().idle.num_idle(&synced.idle)); + // Cap the number of threads woken up at one time. This is to limit + // the number of no-op wakes and reduce mutext contention. + // + // This number was picked after some basic benchmarks, but it can + // probably be tuned using the mean poll time value (slower task + // polls can leverage more woken workers). + let num_fanout = cmp::min(2, num_fanout); + if num_fanout > 0 { cx.shared() .push_remote_task_batch_synced(&mut synced, defer.drain(..num_fanout)); diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 246ec212f53..42cde81dc9b 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -119,7 +119,7 @@ use crate::loom::cell::UnsafeCell; use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard}; -use crate::util::linked_list::{self, LinkedList}; +use crate::util::linked_list::{self, GuardedLinkedList, LinkedList}; use crate::util::WakeList; use std::fmt; @@ -366,6 +366,17 @@ struct Waiter { _p: PhantomPinned, } +impl Waiter { + fn new() -> Self { + Self { + queued: false, + waker: None, + pointers: linked_list::Pointers::new(), + _p: PhantomPinned, + } + } +} + generate_addr_of_methods! { impl<> Waiter { unsafe fn addr_of_pointers(self: NonNull) -> NonNull> { @@ -817,12 +828,75 @@ fn new_receiver(shared: Arc>) -> Receiver { Receiver { shared, next } } +/// List used in `Shared::notify_rx`. It wraps a guarded linked list +/// and gates the access to it on the `Shared.tail` mutex. It also empties +/// the list on drop. +struct WaitersList<'a, T> { + list: GuardedLinkedList::Target>, + is_empty: bool, + shared: &'a Shared, +} + +impl<'a, T> Drop for WaitersList<'a, T> { + fn drop(&mut self) { + // If the list is not empty, we unlink all waiters from it. + // We do not wake the waiters to avoid double panics. + if !self.is_empty { + let _lock_guard = self.shared.tail.lock(); + while self.list.pop_back().is_some() {} + } + } +} + +impl<'a, T> WaitersList<'a, T> { + fn new( + unguarded_list: LinkedList::Target>, + guard: Pin<&'a Waiter>, + shared: &'a Shared, + ) -> Self { + let guard_ptr = NonNull::from(guard.get_ref()); + let list = unguarded_list.into_guarded(guard_ptr); + WaitersList { + list, + is_empty: false, + shared, + } + } + + /// Removes the last element from the guarded list. Modifying this list + /// requires an exclusive access to the main list in `Notify`. + fn pop_back_locked(&mut self, _tail: &mut Tail) -> Option> { + let result = self.list.pop_back(); + if result.is_none() { + // Save information about emptiness to avoid waiting for lock + // in the destructor. + self.is_empty = true; + } + result + } +} + impl Shared { fn notify_rx<'a, 'b: 'a>(&'b self, mut tail: MutexGuard<'a, Tail>) { + // It is critical for `GuardedLinkedList` safety that the guard node is + // pinned in memory and is not dropped until the guarded list is dropped. + let guard = Waiter::new(); + pin!(guard); + + // We move all waiters to a secondary list. It uses a `GuardedLinkedList` + // underneath to allow every waiter to safely remove itself from it. + // + // * This list will be still guarded by the `waiters` lock. + // `NotifyWaitersList` wrapper makes sure we hold the lock to modify it. + // * This wrapper will empty the list on drop. It is critical for safety + // that we will not leave any list entry with a pointer to the local + // guard node after this function returns / panics. + let mut list = WaitersList::new(std::mem::take(&mut tail.waiters), guard.as_ref(), self); + let mut wakers = WakeList::new(); 'outer: loop { while wakers.can_push() { - match tail.waiters.pop_back() { + match list.pop_back_locked(&mut tail) { Some(mut waiter) => { // Safety: `tail` lock is still held. let waiter = unsafe { waiter.as_mut() }; diff --git a/tokio/tests/io_async_fd.rs b/tokio/tests/io_async_fd.rs index cdbfbacd0db..7abd592d9df 100644 --- a/tokio/tests/io_async_fd.rs +++ b/tokio/tests/io_async_fd.rs @@ -685,3 +685,128 @@ async fn clear_ready_matching_clears_ready_mut() { guard.clear_ready_matching(Ready::WRITABLE); assert_eq!(guard.ready(), Ready::EMPTY); } + +#[tokio::test] +#[cfg(target_os = "linux")] +async fn await_error_readiness_timestamping() { + use std::net::{Ipv4Addr, SocketAddr}; + + use tokio::io::{Interest, Ready}; + + let address_a = SocketAddr::from((Ipv4Addr::LOCALHOST, 0)); + let address_b = SocketAddr::from((Ipv4Addr::LOCALHOST, 0)); + + let socket = std::net::UdpSocket::bind(address_a).unwrap(); + + socket.set_nonblocking(true).unwrap(); + + // configure send timestamps + configure_timestamping_socket(&socket).unwrap(); + + socket.connect(address_b).unwrap(); + + let fd = AsyncFd::new(socket).unwrap(); + + tokio::select! { + _ = fd.ready(Interest::ERROR) => panic!(), + _ = tokio::time::sleep(Duration::from_millis(10)) => {} + } + + let buf = b"hello there"; + fd.get_ref().send(buf).unwrap(); + + // the send timestamp should now be in the error queue + let guard = fd.ready(Interest::ERROR).await.unwrap(); + assert_eq!(guard.ready(), Ready::ERROR); +} + +#[cfg(target_os = "linux")] +fn configure_timestamping_socket(udp_socket: &std::net::UdpSocket) -> std::io::Result { + // enable software timestamping, and specifically software send timestamping + let options = libc::SOF_TIMESTAMPING_SOFTWARE | libc::SOF_TIMESTAMPING_TX_SOFTWARE; + + let res = unsafe { + libc::setsockopt( + udp_socket.as_raw_fd(), + libc::SOL_SOCKET, + libc::SO_TIMESTAMP, + &options as *const _ as *const libc::c_void, + std::mem::size_of_val(&options) as libc::socklen_t, + ) + }; + + if res == -1 { + Err(std::io::Error::last_os_error()) + } else { + Ok(res) + } +} + +#[tokio::test] +#[cfg(target_os = "linux")] +async fn await_error_readiness_invalid_address() { + use std::net::{Ipv4Addr, SocketAddr}; + use tokio::io::{Interest, Ready}; + + let socket_addr = SocketAddr::from((Ipv4Addr::LOCALHOST, 0)); + let socket = std::net::UdpSocket::bind(socket_addr).unwrap(); + let socket_fd = socket.as_raw_fd(); + + // Enable IP_RECVERR option to receive error messages + // https://man7.org/linux/man-pages/man7/ip.7.html has some extra information + let recv_err: libc::c_int = 1; + unsafe { + let res = libc::setsockopt( + socket.as_raw_fd(), + libc::SOL_IP, + libc::IP_RECVERR, + &recv_err as *const _ as *const libc::c_void, + std::mem::size_of_val(&recv_err) as libc::socklen_t, + ); + if res == -1 { + panic!("{:?}", std::io::Error::last_os_error()); + } + } + + // Spawn a separate thread for sending messages + tokio::spawn(async move { + // Set the destination address. This address is invalid in this context. the OS will notice + // that nobody is listening on port this port. Normally this is ignored (UDP is "fire and forget"), + // but because IP_RECVERR is enabled, the error will actually be reported to the sending socket + let mut dest_addr = + unsafe { std::mem::MaybeUninit::::zeroed().assume_init() }; + dest_addr.sin_family = libc::AF_INET as _; + // based on https://en.wikipedia.org/wiki/Ephemeral_port, we should pick a port number + // below 1024 to guarantee that other tests don't select this port by accident when they + // use port 0 to select an ephemeral port. + dest_addr.sin_port = 512u16.to_be(); // Destination port + + // Prepare the message data + let message = "Hello, Socket!"; + + // Prepare the message structure for sendmsg + let mut iov = libc::iovec { + iov_base: message.as_ptr() as *mut libc::c_void, + iov_len: message.len(), + }; + + // Prepare the destination address for the sendmsg call + let dest_sockaddr: *const libc::sockaddr = &dest_addr as *const _ as *const libc::sockaddr; + let dest_addrlen: libc::socklen_t = std::mem::size_of_val(&dest_addr) as libc::socklen_t; + + let mut msg: libc::msghdr = unsafe { std::mem::MaybeUninit::zeroed().assume_init() }; + msg.msg_name = dest_sockaddr as *mut libc::c_void; + msg.msg_namelen = dest_addrlen; + msg.msg_iov = &mut iov; + msg.msg_iovlen = 1; + + if unsafe { libc::sendmsg(socket_fd, &msg, 0) } == -1 { + Err(std::io::Error::last_os_error()).unwrap() + } + }); + + let fd = AsyncFd::new(socket).unwrap(); + + let guard = fd.ready(Interest::ERROR).await.unwrap(); + assert_eq!(guard.ready(), Ready::ERROR); +}