Skip to content
This repository has been archived by the owner on Jun 27, 2022. It is now read-only.

Commit

Permalink
add read timeout
Browse files Browse the repository at this point in the history
This commit adds a `set_read_timeout` function similar to Rust's
`set_read_timeout`[1]. The differences compared to upstream are:

- Operation always succeed.
- It takes an i64 instead Duration.

[1] https://doc.rust-lang.org/nightly/std/net/struct.UdpSocket.html#method.set_read_timeout
  • Loading branch information
vinipsmaker committed Oct 14, 2015
1 parent 4828dc0 commit ef04260
Showing 1 changed file with 62 additions and 17 deletions.
79 changes: 62 additions & 17 deletions src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub enum SocketError {
ConnectionClosed,
ConnectionReset,
ConnectionTimedOut,
UserTimedOut,
InvalidAddress,
InvalidPacket,
InvalidReply,
Expand All @@ -46,7 +47,7 @@ impl From<SocketError> for Error {
let (kind, message) = match error {
ConnectionClosed => (ErrorKind::NotConnected, "The socket is closed"),
ConnectionReset => (ErrorKind::ConnectionReset, "Connection reset by remote peer"),
ConnectionTimedOut => (ErrorKind::TimedOut, "Connection timed out"),
ConnectionTimedOut | UserTimedOut => (ErrorKind::TimedOut, "Connection timed out"),
InvalidAddress => (ErrorKind::InvalidInput, "Invalid address"),
InvalidPacket => (ErrorKind::Other, "Error parsing packet"),
InvalidReply => (ErrorKind::ConnectionRefused, "The remote peer sent an invalid reply"),
Expand Down Expand Up @@ -187,6 +188,9 @@ pub struct UtpSocket {

/// Maximum retransmission retries
pub max_retransmission_retries: u32,

/// Used by `set_read_timeout`.
user_read_timeout: i64,
}

impl UtpSocket {
Expand Down Expand Up @@ -233,6 +237,7 @@ impl UtpSocket {
congestion_timeout: INITIAL_CONGESTION_TIMEOUT,
cwnd: INIT_CWND * MSS,
max_retransmission_retries: MAX_RETRANSMISSION_RETRIES,
user_read_timeout: 0,
}
}

Expand Down Expand Up @@ -346,7 +351,7 @@ impl UtpSocket {
// Receive JAKE
let mut buf = [0; BUF_SIZE];
while self.state != SocketState::Closed {
try!(self.recv(&mut buf));
try!(self.recv(&mut buf, false));
}

Ok(())
Expand Down Expand Up @@ -375,7 +380,8 @@ impl UtpSocket {
return Ok((0, self.connected_to));
}

match self.recv(buf) {
let user_read_timeout = self.user_read_timeout;
match self.recv(buf, user_read_timeout != 0) {
Ok((0, _src)) => continue,
Ok(x) => return Ok(x),
Err(e) => return Err(e)
Expand All @@ -384,11 +390,32 @@ impl UtpSocket {
}
}

fn recv(&mut self, buf: &mut[u8]) -> Result<(usize, SocketAddr)> {
/// Changes read operations to block for at most the specified number of
/// milliseconds.
pub fn set_read_timeout(&mut self, user_timeout: Option<i64>) {
self.user_read_timeout = match user_timeout {
Some(t) => {
if t > 0 {
t
} else {
0
}
},
None => 0
}
}

fn recv(&mut self, buf: &mut[u8], use_user_timeout: bool)
-> Result<(usize, SocketAddr)> {
let mut b = [0; BUF_SIZE + HEADER_SIZE];
let now = now_microseconds();
let (read, src);
let mut retries = 0;
let user_timeout = if use_user_timeout {
self.user_read_timeout
} else {
0
};

// Try to receive a packet and handle timeouts
loop {
Expand All @@ -398,17 +425,32 @@ impl UtpSocket {
return Err(Error::from(SocketError::ConnectionTimedOut));
}

let timeout = if self.state != SocketState::New {
let congestion_timeout = if self.state != SocketState::New {
debug!("setting read timeout of {} ms", self.congestion_timeout);
self.congestion_timeout as i64
} else { 0 };
let timeout = if user_timeout != 0 {
if congestion_timeout != 0 {
use std::cmp::min;
min(congestion_timeout, user_timeout)
} else {
user_timeout
}
} else {
congestion_timeout
};

if user_timeout != 0
&& ((now_microseconds() - now) / 1000) as i64 >= user_timeout {
return Err(Error::from(SocketError::UserTimedOut));
}

match self.socket.recv_timeout(&mut b, timeout) {
Ok((r, s)) => { read = r; src = s; break },
Err(ref e) if (e.kind() == ErrorKind::WouldBlock ||
e.kind() == ErrorKind::TimedOut) => {
debug!("recv_from timed out");
try!(self.handle_receive_timeout());
try!(self.handle_receive_timeout(user_timeout != 0));
},
Err(e) => return Err(e),
};
Expand Down Expand Up @@ -449,8 +491,11 @@ impl UtpSocket {
Ok((read, src))
}

fn handle_receive_timeout(&mut self) -> Result<()> {
self.congestion_timeout = self.congestion_timeout * 2;
fn handle_receive_timeout(&mut self, keep_current_timeout: bool)
-> Result<()> {
if !keep_current_timeout {
self.congestion_timeout *= 2
}
self.cwnd = MSS;

// There are three possible cases here:
Expand Down Expand Up @@ -616,7 +661,7 @@ impl UtpSocket {
let mut buf = [0u8; BUF_SIZE];
while !self.send_window.is_empty() {
debug!("packets in send window: {}", self.send_window.len());
try!(self.recv(&mut buf));
try!(self.recv(&mut buf, false));
}

Ok(())
Expand Down Expand Up @@ -648,7 +693,7 @@ impl UtpSocket {
debug!("self.duplicate_ack_count: {}", self.duplicate_ack_count);
debug!("now_microseconds() - now = {}", now_microseconds() - now);
let mut buf = [0; BUF_SIZE];
try!(self.recv(&mut buf));
try!(self.recv(&mut buf, false));
}
debug!("out: now_microseconds() - now = {}", now_microseconds() - now);

Expand Down Expand Up @@ -1366,7 +1411,7 @@ mod test {
thread::spawn(move || {
// Make the server listen for incoming connections
let mut buf = [0u8; BUF_SIZE];
let _resp = server.recv(&mut buf);
let _resp = server.recv(&mut buf, false);
tx.send(server.seq_nr).unwrap();

// Close the connection
Expand Down Expand Up @@ -1730,7 +1775,7 @@ mod test {

let mut buf = [0; BUF_SIZE];
// Expect SYN
iotry!(server.recv(&mut buf));
iotry!(server.recv(&mut buf, false));

// Receive data
let data_packet = match server.socket.recv_from(&mut buf) {
Expand Down Expand Up @@ -1803,7 +1848,7 @@ mod test {
});

let mut buf = [0u8; BUF_SIZE];
server.recv(&mut buf).unwrap();
server.recv(&mut buf, false).unwrap();
// After establishing a new connection, the server's ids are a mirror of the client's.
assert_eq!(server.receiver_connection_id, server.sender_connection_id + 1);

Expand Down Expand Up @@ -1910,7 +1955,7 @@ mod test {
});

let mut buf = [0u8; BUF_SIZE];
iotry!(server.recv(&mut buf));
iotry!(server.recv(&mut buf, false));
// After establishing a new connection, the server's ids are a mirror of the client's.
assert_eq!(server.receiver_connection_id, server.sender_connection_id + 1);

Expand Down Expand Up @@ -2244,7 +2289,7 @@ mod test {
let mut buf = [0; BUF_SIZE];

// Accept connection
iotry!(server.recv(&mut buf));
iotry!(server.recv(&mut buf, false));

// Send FIN without acknowledging packets received
let mut packet = Packet::new();
Expand Down Expand Up @@ -2328,7 +2373,7 @@ mod test {

// Wait for a connection to be established
let mut buf = [0; 1024];
iotry!(server.recv(&mut buf));
iotry!(server.recv(&mut buf, false));

// `peer_addr` should succeed and be equal to the client's address
assert!(server.peer_addr().is_ok());
Expand Down Expand Up @@ -2395,7 +2440,7 @@ mod test {

// Try to receive ACKs, time out too many times on flush, and fail with `TimedOut`
let mut buf = [0; BUF_SIZE];
match server.recv(&mut buf) {
match server.recv(&mut buf, false) {
Err(ref e) if e.kind() == ErrorKind::TimedOut => (),
x => panic!("Expected Err(TimedOut), got {:?}", x),
}
Expand Down

0 comments on commit ef04260

Please sign in to comment.