Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: don't allow to open more than 256 unacknowledged streams #153

Merged
merged 39 commits into from
Jul 9, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
570d9d3
Add test for ACK backlog
thomaseizinger Feb 21, 2023
639687c
Don't allow opening more than 256 without receiving an ACK
thomaseizinger Feb 21, 2023
4e477e4
Add test for acknowledging streams
thomaseizinger May 10, 2023
0bd7e0e
Correctly track acknowledged state
thomaseizinger May 10, 2023
7c8f2f8
Update yamux/src/lib.rs
thomaseizinger May 22, 2023
8f67482
Update yamux/src/connection.rs
thomaseizinger May 22, 2023
2d92a77
Don't duplicate data
thomaseizinger May 23, 2023
e675e79
Merge remote-tracking branch 'origin/feat/256-backlog' into feat/256-…
thomaseizinger May 23, 2023
471edc7
Reduce diff
thomaseizinger May 23, 2023
39db682
Merge branch 'master' into feat/256-backlog
thomaseizinger May 24, 2023
fbcbff0
Fix bug where we count inbound and outbound streams for ack backlog
thomaseizinger May 24, 2023
d715980
Window size for inbound streams always starts with DEFAULT_CREDIT
thomaseizinger May 24, 2023
5793d27
Credit for outbound stream always starts with DEFAULT_CREDIT
thomaseizinger May 24, 2023
e1cbf7a
Inline constructor
thomaseizinger May 24, 2023
bc615d4
Introduce `is_outbound` function
thomaseizinger May 24, 2023
c07a5ad
Compute `is_outbound` based on `Mode`
thomaseizinger May 24, 2023
f377492
Track acknowledged as part of state
thomaseizinger May 24, 2023
0e201a4
Merge branch 'master' into feat/256-backlog
thomaseizinger Jun 28, 2023
f2f5bc8
Unify harness code
thomaseizinger Jun 28, 2023
3b35567
Port `concurrent_streams` test to poll-api
thomaseizinger Jun 28, 2023
384ade8
Replace custom channel with `futures_ringbuf`
thomaseizinger Jun 28, 2023
9c090d3
Remove `prop_send_recv`
thomaseizinger Jun 28, 2023
660e6c9
Rewrite `prop_send_recv_half_closed` to poll-api and move
thomaseizinger Jun 28, 2023
7c87447
Rewrite `prop_config_send_recv_single` to poll-api
thomaseizinger Jun 28, 2023
05eee85
Move test to `poll_api`
thomaseizinger Jun 28, 2023
efa7310
Rewrite deadlock test to use poll-api
thomaseizinger Jun 28, 2023
0f10d51
Move final test to poll_api
thomaseizinger Jun 28, 2023
0337b85
Merge branch 'feat/remove-control' into feat/256-backlog
thomaseizinger Jun 30, 2023
cb56b70
Fix errors after merge
thomaseizinger Jun 30, 2023
749fd9a
Move `MessageSender` to test-harness
thomaseizinger Jun 30, 2023
7ff430f
Move benchmarks to test-harness
thomaseizinger Jun 30, 2023
d3667e3
Add message multiplier to `MessageSender`
thomaseizinger Jun 30, 2023
358a728
Migrate benchmark away from `Control`
thomaseizinger Jun 30, 2023
d5d5972
Remove `Control` and `ControlledConnection`
thomaseizinger Jun 30, 2023
40f01c6
Fix formatting
thomaseizinger Jun 30, 2023
ab16660
Merge branch 'feat/remove-control' into feat/256-backlog
thomaseizinger Jun 30, 2023
8e78dbb
Merge branch 'master' into feat/256-backlog
thomaseizinger Jul 3, 2023
d6a4261
Update docs
thomaseizinger Jul 3, 2023
7b4fdba
Add docs
thomaseizinger Jul 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 33 additions & 9 deletions yamux/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,19 +91,19 @@ mod cleanup;
mod closing;
mod stream;

use crate::Result;
use crate::{
error::ConnectionError,
frame::header::{self, Data, GoAway, Header, Ping, StreamId, Tag, WindowUpdate, CONNECTION_ID},
frame::{self, Frame},
Config, WindowUpdateMode, DEFAULT_CREDIT, MAX_COMMAND_BACKLOG,
};
use crate::{Result, MAX_ACK_BACKLOG};
use cleanup::Cleanup;
use closing::Closing;
use futures::{channel::mpsc, future::Either, prelude::*, sink::SinkExt, stream::Fuse};
use nohash_hasher::IntMap;
use std::collections::VecDeque;
use std::task::Context;
use std::collections::{HashSet, VecDeque};
use std::task::{Context, Waker};
use std::{fmt, sync::Arc, task::Poll};

pub use stream::{Packet, State, Stream};
Expand Down Expand Up @@ -160,12 +160,16 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {
pub fn poll_new_outbound(&mut self, cx: &mut Context<'_>) -> Poll<Result<Stream>> {
loop {
match std::mem::replace(&mut self.inner, ConnectionState::Poisoned) {
ConnectionState::Active(mut active) => match active.new_outbound() {
Ok(stream) => {
ConnectionState::Active(mut active) => match active.poll_new_outbound(cx) {
Poll::Ready(Ok(stream)) => {
self.inner = ConnectionState::Active(active);
return Poll::Ready(Ok(stream));
}
Err(e) => {
Poll::Pending => {
self.inner = ConnectionState::Active(active);
return Poll::Pending;
}
Poll::Ready(Err(e)) => {
self.inner = ConnectionState::Cleanup(active.cleanup(e));
continue;
}
Expand Down Expand Up @@ -352,6 +356,8 @@ struct Active<T> {
stream_receiver: mpsc::Receiver<StreamCommand>,
dropped_streams: Vec<StreamId>,
pending_frames: VecDeque<Frame<()>>,
pending_acks: HashSet<StreamId>,
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
new_outbound_stream_waker: Option<Waker>,
}

/// `Stream` to `Connection` commands.
Expand Down Expand Up @@ -424,6 +430,8 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
},
dropped_streams: Vec::new(),
pending_frames: VecDeque::default(),
pending_acks: HashSet::default(),
new_outbound_stream_waker: None,
}
}

Expand Down Expand Up @@ -490,10 +498,16 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
}
}

fn new_outbound(&mut self) -> Result<Stream> {
fn poll_new_outbound(&mut self, cx: &mut Context<'_>) -> Poll<Result<Stream>> {
if self.streams.len() >= self.config.max_num_streams {
log::error!("{}: maximum number of streams reached", self.id);
return Err(ConnectionError::TooManyStreams);
return Poll::Ready(Err(ConnectionError::TooManyStreams));
}
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved

if self.pending_acks.len() >= MAX_ACK_BACKLOG {
log::debug!("{MAX_ACK_BACKLOG} streams waiting for ACK, parking task until remote acknowledges at least one stream");
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
self.new_outbound_stream_waker = Some(cx.waker().clone());
return Poll::Pending;
}

log::trace!("{}: creating new outbound stream", self.id);
Expand Down Expand Up @@ -522,7 +536,9 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
log::debug!("{}: new outbound {} of {}", self.id, stream, self);
self.streams.insert(id, stream.clone());

Ok(stream)
self.pending_acks.insert(id);

Poll::Ready(Ok(stream))
}

fn on_send_frame(&mut self, frame: Frame<Either<Data, WindowUpdate>>) {
Expand All @@ -549,6 +565,14 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
/// if one was opened by the remote.
fn on_frame(&mut self, frame: Frame<()>) -> Result<Option<Stream>> {
log::trace!("{}: received: {}", self.id, frame.header());

if frame.header().flags().contains(header::ACK) {
self.pending_acks.remove(&frame.header().stream_id());
if let Some(waker) = self.new_outbound_stream_waker.take() {
waker.wake();
}
}

let action = match frame.header().tag() {
Tag::Data => self.on_data(frame.into_data()),
Tag::WindowUpdate => self.on_window_update(&frame.into_window_update()),
Expand Down
5 changes: 5 additions & 0 deletions yamux/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ pub type Result<T> = std::result::Result<T, ConnectionError>;
/// actual upper bound is this value + number of clones.
const MAX_COMMAND_BACKLOG: usize = 32;

/// The maximum number of streams we will open without an acknowledgement from the other peer.
///
/// This enables a very basic form of backpressure.
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
const MAX_ACK_BACKLOG: usize = 256;

/// Default maximum number of bytes a Yamux data frame might carry as its
/// payload when being send. Larger Payloads will be split.
///
Expand Down