Skip to content

Commit

Permalink
feat(s2n-quic-dc): update MTU on dc path when MTU is updated (#2327)
Browse files Browse the repository at this point in the history
* feat(s2n-quic-dc): update MTU on dc path when MTU is updated

* remove std

* add tests and update map

* typo

* add test

* add clone test

* use fetch_min

* add entry to the handshakingpath
  • Loading branch information
WesleyRosenblum authored Oct 10, 2024
1 parent 0fbb99d commit 7752afb
Show file tree
Hide file tree
Showing 19 changed files with 421 additions and 65 deletions.
28 changes: 20 additions & 8 deletions dc/s2n-quic-dc/src/path/secret/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ impl Map {
) -> Option<(seal::Once, Credentials, ApplicationParams)> {
let state = self.state.peers.get_by_key(&peer)?;
let (sealer, credentials) = state.uni_sealer();
Some((sealer, credentials, state.parameters))
Some((sealer, credentials, state.parameters.clone()))
}

pub fn open_once(
Expand All @@ -319,7 +319,7 @@ impl Map {
let state = self.state.peers.get_by_key(&peer)?;
let keys = state.bidi_local(features);

Some((keys, state.parameters))
Some((keys, state.parameters.clone()))
}

pub fn pair_for_credentials(
Expand All @@ -330,7 +330,7 @@ impl Map {
) -> Option<(Bidirectional, ApplicationParams)> {
let state = self.pre_authentication(credentials, control_out)?;

let params = state.parameters;
let params = state.parameters.clone();
let keys = state.bidi_remote(self.clone(), credentials, features);

Some((keys, params))
Expand Down Expand Up @@ -684,13 +684,13 @@ impl Entry {
secret: schedule::Secret,
sender: sender::State,
receiver: receiver::State,
mut parameters: ApplicationParams,
parameters: ApplicationParams,
rehandshake_time: Duration,
) -> Self {
// clamp max datagram size to a well-known value
parameters.max_datagram_size = parameters
parameters
.max_datagram_size
.min(crate::stream::MAX_DATAGRAM_SIZE as _);
.fetch_min(crate::stream::MAX_DATAGRAM_SIZE as _, Ordering::Relaxed);

assert!(rehandshake_time.as_secs() <= u32::MAX as u64);
Self {
Expand Down Expand Up @@ -911,6 +911,7 @@ pub struct HandshakingPath {
parameters: ApplicationParams,
endpoint_type: s2n_quic_core::endpoint::Type,
secret: Option<schedule::Secret>,
entry: Option<Arc<Entry>>,
map: Map,
}

Expand All @@ -924,9 +925,10 @@ impl HandshakingPath {
Self {
peer: connection_info.remote_address.clone().into(),
dc_version: connection_info.dc_version,
parameters: connection_info.application_params,
parameters: connection_info.application_params.clone(),
endpoint_type,
secret: None,
entry: None,
map,
}
}
Expand Down Expand Up @@ -1018,12 +1020,22 @@ impl dc::Path for HandshakingPath {
.expect("peer tokens are only received after secrets are ready"),
sender,
receiver,
self.parameters,
self.parameters.clone(),
self.map.state.rehandshake_period,
);
let entry = Arc::new(entry);
self.entry = Some(entry.clone());
self.map.insert(entry);
}

fn on_mtu_updated(&mut self, mtu: u16) {
if let Some(entry) = self.entry.as_ref() {
entry
.parameters
.max_datagram_size
.store(mtu, Ordering::Relaxed);
}
}
}

#[cfg(test)]
Expand Down
7 changes: 5 additions & 2 deletions dc/s2n-quic-dc/src/stream/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ use s2n_quic_core::{
inet::{ExplicitCongestionNotification, SocketAddress},
varint::VarInt,
};
use std::{io, sync::Arc};
use std::{
io,
sync::{atomic::Ordering, Arc},
};
use tracing::{debug_span, Instrument as _};

type Result<T = (), E = io::Error> = core::result::Result<T, E>;
Expand Down Expand Up @@ -193,7 +196,7 @@ where
let flow = flow::non_blocking::State::new(flow_offset);

let path = send::path::Info {
max_datagram_size: parameters.max_datagram_size,
max_datagram_size: parameters.max_datagram_size.load(Ordering::Relaxed),
send_quantum,
ecn: ExplicitCongestionNotification::Ect0,
next_expected_control_packet: VarInt::ZERO,
Expand Down
7 changes: 5 additions & 2 deletions dc/s2n-quic-dc/src/stream/send/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ use s2n_quic_core::{
varint::VarInt,
};
use slotmap::SlotMap;
use std::collections::{BinaryHeap, VecDeque};
use std::{
collections::{BinaryHeap, VecDeque},
sync::atomic::Ordering,
};
use tracing::{debug, trace};

pub mod probe;
Expand Down Expand Up @@ -118,7 +121,7 @@ pub struct PeerActivity {
impl State {
#[inline]
pub fn new(stream_id: stream::Id, params: &ApplicationParams) -> Self {
let max_datagram_size = params.max_datagram_size;
let max_datagram_size = params.max_datagram_size.load(Ordering::Relaxed);
let initial_max_data = params.remote_max_data;
let local_max_data = params.local_send_max_data;

Expand Down
73 changes: 69 additions & 4 deletions quic/s2n-quic-core/src/dc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@ use crate::{
transport::parameters::{DcSupportedVersions, InitialFlowControlLimits},
varint::VarInt,
};
use core::{num::NonZeroU32, time::Duration};
use core::{
num::NonZeroU32,
sync::atomic::{AtomicU16, Ordering},
time::Duration,
};

mod disabled;
mod traits;
Expand Down Expand Up @@ -91,25 +95,37 @@ impl<'a> DatagramInfo<'a> {
}

/// Various settings relevant to the dc path
#[derive(Clone, Copy, Debug)]
#[derive(Debug)]
#[non_exhaustive]
pub struct ApplicationParams {
pub max_datagram_size: u16,
pub max_datagram_size: AtomicU16,
pub remote_max_data: VarInt,
pub local_send_max_data: VarInt,
pub local_recv_max_data: VarInt,
// Actually a Duration, stored as milliseconds to shrink this struct
pub max_idle_timeout: Option<NonZeroU32>,
}

impl Clone for ApplicationParams {
fn clone(&self) -> Self {
Self {
max_datagram_size: AtomicU16::new(self.max_datagram_size.load(Ordering::Relaxed)),
remote_max_data: self.remote_max_data,
local_send_max_data: self.local_send_max_data,
local_recv_max_data: self.local_recv_max_data,
max_idle_timeout: self.max_idle_timeout,
}
}
}

impl ApplicationParams {
pub fn new(
max_datagram_size: u16,
peer_flow_control_limits: &InitialFlowControlLimits,
limits: &Limits,
) -> Self {
Self {
max_datagram_size,
max_datagram_size: AtomicU16::new(max_datagram_size),
remote_max_data: peer_flow_control_limits.max_data,
local_send_max_data: limits.initial_stream_limits().max_data_bidi_local,
local_recv_max_data: limits.initial_stream_limits().max_data_bidi_remote,
Expand All @@ -125,3 +141,52 @@ impl ApplicationParams {
Some(Duration::from_millis(self.max_idle_timeout?.get() as u64))
}
}

#[cfg(test)]
mod tests {
use crate::{
connection::Limits, dc::ApplicationParams, transport::parameters::InitialFlowControlLimits,
varint::VarInt,
};
use std::{sync::atomic::Ordering, time::Duration};

#[test]
fn clone() {
let initial_flow_control_limits = InitialFlowControlLimits {
max_data: VarInt::from_u32(2222),
..Default::default()
};

let limits = Limits {
bidirectional_local_data_window: 1234.try_into().unwrap(),
bidirectional_remote_data_window: 6789.try_into().unwrap(),
max_idle_timeout: Duration::from_millis(999).try_into().unwrap(),
..Default::default()
};

let params = ApplicationParams::new(9000, &initial_flow_control_limits, &limits);

assert_eq!(9000, params.max_datagram_size.load(Ordering::Relaxed));
assert_eq!(limits.max_idle_timeout(), params.max_idle_timeout());
assert_eq!(1234, params.local_send_max_data.as_u64());
assert_eq!(6789, params.local_recv_max_data.as_u64());
assert_eq!(2222, params.remote_max_data.as_u64());

let cloned_params = params.clone();

assert_eq!(
params.max_datagram_size.load(Ordering::Relaxed),
cloned_params.max_datagram_size.load(Ordering::Relaxed)
);
assert_eq!(params.max_idle_timeout, cloned_params.max_idle_timeout);
assert_eq!(
params.local_send_max_data,
cloned_params.local_send_max_data
);
assert_eq!(
params.local_recv_max_data,
cloned_params.local_recv_max_data
);
assert_eq!(params.remote_max_data, cloned_params.remote_max_data);
}
}
4 changes: 4 additions & 0 deletions quic/s2n-quic-core/src/dc/disabled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,8 @@ impl Path for () {
) {
unimplemented!()
}

fn on_mtu_updated(&mut self, _mtu: u16) {
unimplemented!()
}
}
16 changes: 13 additions & 3 deletions quic/s2n-quic-core/src/dc/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
};
use core::{num::NonZeroU32, time::Duration};
use std::sync::{
atomic::{AtomicU8, Ordering},
atomic::{AtomicU16, AtomicU8, Ordering},
Arc,
};

Expand All @@ -36,14 +36,19 @@ pub struct MockDcPath {
pub on_peer_stateless_reset_tokens_count: u8,
pub stateless_reset_tokens: Vec<stateless_reset::Token>,
pub peer_stateless_reset_tokens: Vec<stateless_reset::Token>,
pub mtu: u16,
}

impl dc::Endpoint for MockDcEndpoint {
type Path = MockDcPath;

fn new_path(&mut self, _connection_info: &ConnectionInfo) -> Option<Self::Path> {
fn new_path(&mut self, connection_info: &ConnectionInfo) -> Option<Self::Path> {
Some(MockDcPath {
stateless_reset_tokens: self.stateless_reset_tokens.clone(),
mtu: connection_info
.application_params
.max_datagram_size
.load(Ordering::Relaxed),
..Default::default()
})
}
Expand Down Expand Up @@ -76,10 +81,15 @@ impl dc::Path for MockDcPath {
self.peer_stateless_reset_tokens
.extend(stateless_reset_tokens);
}

fn on_mtu_updated(&mut self, mtu: u16) {
self.mtu = mtu
}
}

#[allow(clippy::declare_interior_mutable_const)]
pub const TEST_APPLICATION_PARAMS: ApplicationParams = ApplicationParams {
max_datagram_size: 1472,
max_datagram_size: AtomicU16::new(1472),
remote_max_data: VarInt::from_u32(1u32 << 25),
local_send_max_data: VarInt::from_u32(1u32 << 25),
local_recv_max_data: VarInt::from_u32(1u32 << 25),
Expand Down
10 changes: 10 additions & 0 deletions quic/s2n-quic-core/src/dc/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ pub trait Path: 'static + Send {
&mut self,
stateless_reset_tokens: impl Iterator<Item = &'a stateless_reset::Token>,
);

/// Called when the MTU has been updated for the path
fn on_mtu_updated(&mut self, mtu: u16);
}

impl<P: Path> Path for Option<P> {
Expand All @@ -69,4 +72,11 @@ impl<P: Path> Path for Option<P> {
path.on_peer_stateless_reset_tokens(stateless_reset_tokens)
}
}

#[inline]
fn on_mtu_updated(&mut self, max_datagram_size: u16) {
if let Some(path) = self {
path.on_mtu_updated(max_datagram_size)
}
}
}
Loading

0 comments on commit 7752afb

Please sign in to comment.