Skip to content

Commit

Permalink
Merge tag 'tokio-1.32.0' into solid-rs/1.x
Browse files Browse the repository at this point in the history
  • Loading branch information
kawadakk committed Aug 24, 2023
2 parents 6d543ba + a7d52c2 commit 83b93c5
Show file tree
Hide file tree
Showing 17 changed files with 795 additions and 70 deletions.
9 changes: 7 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ Make sure you activated the full features of the tokio crate on Cargo.toml:

```toml
[dependencies]
tokio = { version = "1.31.0", features = ["full"] }
tokio = { version = "1.32.0", features = ["full"] }
```
Then, on your main.rs:

Expand Down Expand Up @@ -216,7 +216,6 @@ warrants a patch release with a fix for the bug, it will be backported and
released as a new patch release for each LTS minor version. Our current LTS
releases are:

* `1.18.x` - LTS release until June 2023. (MSRV 1.49)
* `1.20.x` - LTS release until September 2023. (MSRV 1.49)
* `1.25.x` - LTS release until March 2024. (MSRV 1.49)

Expand All @@ -231,6 +230,12 @@ can use the following dependency specification:
tokio = { version = "~1.18", features = [...] }
```

### Previous LTS releases

* `1.8.x` - LTS release until February 2022.
* `1.14.x` - LTS release until June 2022.
* `1.18.x` - LTS release until June 2023.

## License

This project is licensed under the [MIT license].
Expand Down
1 change: 1 addition & 0 deletions tokio-test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
//! Tokio and Futures based testing utilities

pub mod io;
pub mod stream_mock;

mod macros;
pub mod task;
Expand Down
168 changes: 168 additions & 0 deletions tokio-test/src/stream_mock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
#![cfg(not(loom))]

//! A mock stream implementing [`Stream`].
//!
//! # Overview
//! This crate provides a `StreamMock` that can be used to test code that interacts with streams.
//! It allows you to mock the behavior of a stream and control the items it yields and the waiting
//! intervals between items.
//!
//! # Usage
//! To use the `StreamMock`, you need to create a builder using[`StreamMockBuilder`]. The builder
//! allows you to enqueue actions such as returning items or waiting for a certain duration.
//!
//! # Example
//! ```rust
//!
//! use futures_util::StreamExt;
//! use std::time::Duration;
//! use tokio_test::stream_mock::StreamMockBuilder;
//!
//! async fn test_stream_mock_wait() {
//! let mut stream_mock = StreamMockBuilder::new()
//! .next(1)
//! .wait(Duration::from_millis(300))
//! .next(2)
//! .build();
//!
//! assert_eq!(stream_mock.next().await, Some(1));
//! let start = std::time::Instant::now();
//! assert_eq!(stream_mock.next().await, Some(2));
//! let elapsed = start.elapsed();
//! assert!(elapsed >= Duration::from_millis(300));
//! assert_eq!(stream_mock.next().await, None);
//! }
//! ```

use std::collections::VecDeque;
use std::pin::Pin;
use std::task::Poll;
use std::time::Duration;

use futures_core::{ready, Stream};
use std::future::Future;
use tokio::time::{sleep_until, Instant, Sleep};

#[derive(Debug, Clone)]
enum Action<T: Unpin> {
Next(T),
Wait(Duration),
}

/// A builder for [`StreamMock`]
#[derive(Debug, Clone)]
pub struct StreamMockBuilder<T: Unpin> {
actions: VecDeque<Action<T>>,
}

impl<T: Unpin> StreamMockBuilder<T> {
/// Create a new empty [`StreamMockBuilder`]
pub fn new() -> Self {
StreamMockBuilder::default()
}

/// Queue an item to be returned by the stream
pub fn next(mut self, value: T) -> Self {
self.actions.push_back(Action::Next(value));
self
}

// Queue an item to be consumed by the sink,
// commented out until Sink is implemented.
//
// pub fn consume(mut self, value: T) -> Self {
// self.actions.push_back(Action::Consume(value));
// self
// }

/// Queue the stream to wait for a duration
pub fn wait(mut self, duration: Duration) -> Self {
self.actions.push_back(Action::Wait(duration));
self
}

/// Build the [`StreamMock`]
pub fn build(self) -> StreamMock<T> {
StreamMock {
actions: self.actions,
sleep: None,
}
}
}

impl<T: Unpin> Default for StreamMockBuilder<T> {
fn default() -> Self {
StreamMockBuilder {
actions: VecDeque::new(),
}
}
}

/// A mock stream implementing [`Stream`]
///
/// See [`StreamMockBuilder`] for more information.
#[derive(Debug)]
pub struct StreamMock<T: Unpin> {
actions: VecDeque<Action<T>>,
sleep: Option<Pin<Box<Sleep>>>,
}

impl<T: Unpin> StreamMock<T> {
fn next_action(&mut self) -> Option<Action<T>> {
self.actions.pop_front()
}
}

impl<T: Unpin> Stream for StreamMock<T> {
type Item = T;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
// Try polling the sleep future first
if let Some(ref mut sleep) = self.sleep {
ready!(Pin::new(sleep).poll(cx));
// Since we're ready, discard the sleep future
self.sleep.take();
}

match self.next_action() {
Some(action) => match action {
Action::Next(item) => Poll::Ready(Some(item)),
Action::Wait(duration) => {
// Set up a sleep future and schedule this future to be polled again for it.
self.sleep = Some(Box::pin(sleep_until(Instant::now() + duration)));
cx.waker().wake_by_ref();

Poll::Pending
}
},
None => Poll::Ready(None),
}
}
}

impl<T: Unpin> Drop for StreamMock<T> {
fn drop(&mut self) {
// Avoid double panicking to make debugging easier.
if std::thread::panicking() {
return;
}

let undropped_count = self
.actions
.iter()
.filter(|action| match action {
Action::Next(_) => true,
Action::Wait(_) => false,
})
.count();

assert!(
undropped_count == 0,
"StreamMock was dropped before all actions were consumed, {} actions were not consumed",
undropped_count
);
}
}
50 changes: 50 additions & 0 deletions tokio-test/tests/stream_mock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use futures_util::StreamExt;
use std::time::Duration;
use tokio_test::stream_mock::StreamMockBuilder;

#[tokio::test]
async fn test_stream_mock_empty() {
let mut stream_mock = StreamMockBuilder::<u32>::new().build();

assert_eq!(stream_mock.next().await, None);
assert_eq!(stream_mock.next().await, None);
}

#[tokio::test]
async fn test_stream_mock_items() {
let mut stream_mock = StreamMockBuilder::new().next(1).next(2).build();

assert_eq!(stream_mock.next().await, Some(1));
assert_eq!(stream_mock.next().await, Some(2));
assert_eq!(stream_mock.next().await, None);
}

#[tokio::test]
async fn test_stream_mock_wait() {
let mut stream_mock = StreamMockBuilder::new()
.next(1)
.wait(Duration::from_millis(300))
.next(2)
.build();

assert_eq!(stream_mock.next().await, Some(1));
let start = std::time::Instant::now();
assert_eq!(stream_mock.next().await, Some(2));
let elapsed = start.elapsed();
assert!(elapsed >= Duration::from_millis(300));
assert_eq!(stream_mock.next().await, None);
}

#[tokio::test]
#[should_panic(expected = "StreamMock was dropped before all actions were consumed")]
async fn test_stream_mock_drop_without_consuming_all() {
let stream_mock = StreamMockBuilder::new().next(1).next(2).build();
drop(stream_mock);
}

#[tokio::test]
#[should_panic(expected = "test panic was not masked")]
async fn test_stream_mock_drop_during_panic_doesnt_mask_panic() {
let _stream_mock = StreamMockBuilder::new().next(1).next(2).build();
panic!("test panic was not masked");
}
22 changes: 21 additions & 1 deletion tokio/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,23 @@
# 1.32.0 (August 16, 2023)

### Fixed

- sync: fix potential quadradic behavior in `broadcast::Receiver` ([#5925])

### Added

- process: stabilize `Command::raw_arg` ([#5930])
- io: enable awaiting error readiness ([#5781])

### Unstable

- rt(alt): improve scalability of alt runtime as the number of cores grows ([#5935])

[#5925]: https://github.com/tokio-rs/tokio/pull/5925
[#5930]: https://github.com/tokio-rs/tokio/pull/5930
[#5781]: https://github.com/tokio-rs/tokio/pull/5781
[#5935]: https://github.com/tokio-rs/tokio/pull/5935

# 1.31.0 (August 10, 2023)

### Fixed
Expand All @@ -6,7 +26,7 @@

### Unstable

* rt(unstable): fix memory leak in unstable next-gen scheduler prototype ([#5911])
* rt(alt): fix memory leak in unstable next-gen scheduler prototype ([#5911])
* rt: expose mean task poll time metric ([#5927])

[#5914]: https://github.com/tokio-rs/tokio/pull/5914
Expand Down
2 changes: 1 addition & 1 deletion tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ name = "tokio"
# - README.md
# - Update CHANGELOG.md.
# - Create "v1.x.y" git tag.
version = "1.31.0"
version = "1.32.0"
edition = "2021"
rust-version = "1.63"
authors = ["Tokio Contributors <[email protected]>"]
Expand Down
9 changes: 7 additions & 2 deletions tokio/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ Make sure you activated the full features of the tokio crate on Cargo.toml:

```toml
[dependencies]
tokio = { version = "1.31.0", features = ["full"] }
tokio = { version = "1.32.0", features = ["full"] }
```
Then, on your main.rs:

Expand Down Expand Up @@ -216,7 +216,6 @@ warrants a patch release with a fix for the bug, it will be backported and
released as a new patch release for each LTS minor version. Our current LTS
releases are:

* `1.18.x` - LTS release until June 2023. (MSRV 1.49)
* `1.20.x` - LTS release until September 2023. (MSRV 1.49)
* `1.25.x` - LTS release until March 2024. (MSRV 1.49)

Expand All @@ -231,6 +230,12 @@ can use the following dependency specification:
tokio = { version = "~1.18", features = [...] }
```

### Previous LTS releases

* `1.8.x` - LTS release until February 2022.
* `1.14.x` - LTS release until June 2022.
* `1.18.x` - LTS release until June 2023.

## License

This project is licensed under the [MIT license].
Expand Down
Loading

0 comments on commit 83b93c5

Please sign in to comment.