Skip to content

Commit

Permalink
Add support for "Expect: 100-continue" header
Browse files Browse the repository at this point in the history
Change-Id: I480c73d330989fa23a6545fe0ecfa686ac1cb295
  • Loading branch information
johan-bjareholt committed Nov 8, 2023
1 parent 1456980 commit e2c7fee
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 65 deletions.
145 changes: 83 additions & 62 deletions src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,39 +548,34 @@ impl Response {
})
}

/// Create a response from a Read trait impl.
/// Create a response from a DeadlineStream, reading and parsing only the status line, headers
/// and its following CRLF.
///
/// This is hopefully useful for unit tests.
///
/// Example:
///
/// use std::io::Cursor;
///
/// let text = "HTTP/1.1 401 Authorization Required\r\n\r\nPlease log in\n";
/// let read = Cursor::new(text.to_string().into_bytes());
/// let resp = ureq::Response::do_from_read(read);
///
/// assert_eq!(resp.status(), 401);
pub(crate) fn do_from_stream(stream: Stream, unit: &Unit) -> Result<Response, Error> {
let remote_addr = stream.remote_addr;

let local_addr = match stream.socket() {
/// Since this function only reads the status line, header and the following CRLF, the returned
/// Response will have an empty reader and does not take ownership of DeadlineStream.
/// To read the following data, the DeadlineStream can be read again after the call to this
/// function.
/// If consume is set to true, the status line and headers will be consumed in the
/// DeadlineStream, so next call to read will start after the headers and their CRLF.
pub(crate) fn do_head_from_stream(stream: &mut DeadlineStream, unit: &Unit, consume: bool) -> Result<Response, Error> {
let mut tot_bytes_read = 0;
let remote_addr = stream.inner_ref().remote_addr;

let local_addr = match stream.inner_ref().socket() {
Some(sock) => sock.local_addr().map_err(Error::from)?,
None => std::net::SocketAddrV4::new(std::net::Ipv4Addr::new(127, 0, 0, 1), 0).into(),
};

//
// HTTP/1.1 200 OK\r\n
let mut stream = stream::DeadlineStream::new(stream, unit.deadline);

// The status line we can ignore non-utf8 chars and parse as_str_lossy().
let status_line = read_next_line(&mut stream, "the status line")?.into_string_lossy();
let (bytes_read, status_line) = read_next_line(stream, "the status line")?;
tot_bytes_read += bytes_read;
let status_line = status_line.into_string_lossy();
let (index, status) = parse_status_line(status_line.as_str())?;
let http_version = &status_line.as_str()[0..index.http_version];

let mut headers: Vec<Header> = Vec::new();
while headers.len() <= MAX_HEADER_COUNT {
let line = read_next_line(&mut stream, "a header")?;
let (bytes_read, line) = read_next_line(stream, "a header")?;
tot_bytes_read += bytes_read;
if line.is_empty() {
break;
}
Expand All @@ -595,24 +590,15 @@ impl Response {
));
}

let compression =
get_header(&headers, "content-encoding").and_then(Compression::from_header_value);

let connection_option =
Self::connection_option(http_version, get_header(&headers, "connection"));

let body_type = Self::body_type(&unit.method, status, http_version, &headers);

// remove Content-Encoding and length due to automatic decompression
if compression.is_some() {
headers.retain(|h| !h.is_name("content-encoding") && !h.is_name("content-length"));
if consume {
stream.fill_buf()?;
stream.consume(tot_bytes_read);
}

let reader =
Self::stream_to_reader(stream, unit, body_type, compression, connection_option);

let url = unit.url.clone();

let reader = Box::new(Cursor::new(vec![]));

let response = Response {
url,
status_line,
Expand All @@ -627,6 +613,41 @@ impl Response {
Ok(response)
}

/// Create a Response from a Read trait impl.
///
/// This is hopefully useful for unit tests.
///
/// Example:
///
/// use std::io::Cursor;
///
/// let text = "HTTP/1.1 401 Authorization Required\r\n\r\nPlease log in\n";
/// let read = Cursor::new(text.to_string().into_bytes());
/// let resp = ureq::Response::do_from_read(read);
///
/// assert_eq!(resp.status(), 401);
pub(crate) fn do_from_stream(mut stream: DeadlineStream, unit: &Unit) -> Result<Response, Error> {
let mut response = Self::do_head_from_stream(&mut stream, unit, false)?;

let compression =
get_header(&response.headers, "content-encoding").and_then(Compression::from_header_value);

let connection_option =
Self::connection_option(response.http_version(), get_header(&response.headers, "connection"));

let body_type = Self::body_type(&unit.method, response.status(), response.http_version(), &response.headers);

// remove Content-Encoding and length due to automatic decompression
if compression.is_some() {
response.headers.retain(|h| !h.is_name("content-encoding") && !h.is_name("content-length"));
}

response.reader =
Self::stream_to_reader(stream, unit, body_type, compression, connection_option);

Ok(response)
}

#[cfg(test)]
pub fn set_url(&mut self, url: Url) {
self.url = url;
Expand Down Expand Up @@ -766,16 +787,19 @@ impl FromStr for Response {
&request_reader,
None,
);
let stream = stream::DeadlineStream::new(stream, unit.deadline);
Self::do_from_stream(stream, &unit)
}
}

fn read_next_line(reader: &mut impl BufRead, context: &str) -> io::Result<HeaderLine> {
fn read_next_line(reader: &mut impl BufRead, context: &str) -> io::Result<(usize, HeaderLine)> {
let mut buf = Vec::new();
let result = reader
.take((MAX_HEADER_SIZE + 1) as u64)
.read_until(b'\n', &mut buf);

let bytes_read = buf.len();

match result {
Ok(0) => Err(io::Error::new(
io::ErrorKind::ConnectionAborted,
Expand Down Expand Up @@ -812,7 +836,7 @@ fn read_next_line(reader: &mut impl BufRead, context: &str) -> io::Result<Header
buf.pop();
}

Ok(buf.into())
Ok((bytes_read, buf.into()))
}

/// Limits a `Read` to a content size (as set by a "Content-Length" header).
Expand Down Expand Up @@ -1204,17 +1228,16 @@ mod tests {
"1.1.1.1:4343".parse().unwrap(),
PoolReturner::new(&agent, PoolKey::from_parts("https", "example.com", 443)),
);
Response::do_from_stream(
stream,
&Unit::new(
&agent,
"GET",
&"https://example.com/".parse().unwrap(),
vec![],
&Payload::Empty.into_read(),
None,
),
)
let unit = &Unit::new(
&agent,
"GET",
&"https://example.com/".parse().unwrap(),
vec![],
&Payload::Empty.into_read(),
None,
);
let stream = stream::DeadlineStream::new(stream, unit.deadline);
Response::do_from_stream(stream, unit)
.unwrap();
assert_eq!(agent2.state.pool.len(), 1);
}
Expand All @@ -1236,18 +1259,16 @@ mod tests {
"1.1.1.1:4343".parse().unwrap(),
PoolReturner::none(),
);
let resp = Response::do_from_stream(
stream,
&Unit::new(
&agent,
"GET",
&"https://example.com/".parse().unwrap(),
vec![],
&Payload::Empty.into_read(),
None,
),
)
.unwrap();
let unit = &Unit::new(
&agent,
"GET",
&"https://example.com/".parse().unwrap(),
vec![],
&Payload::Empty.into_read(),
None,
);
let stream = stream::DeadlineStream::new(stream, unit.deadline);
let resp = Response::do_from_stream(stream, unit).unwrap();
let body = resp.into_string().unwrap();
assert_eq!(body, "hi\n");
}
Expand Down
5 changes: 3 additions & 2 deletions src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,8 +445,9 @@ pub(crate) fn connect_host(
let s = stream.try_clone()?;
let pool_key = PoolKey::from_parts(unit.url.scheme(), hostname, port);
let pool_returner = PoolReturner::new(&unit.agent, pool_key);
let s = Stream::new(s, remote_addr, pool_returner);
let response = Response::do_from_stream(s, &unit)?;
let stream = Stream::new(s, remote_addr, pool_returner);
let stream = DeadlineStream::new(stream, unit.deadline);
let response = Response::do_from_stream(stream, &unit)?;
Proxy::verify_response(&response)?;
}
}
Expand Down
18 changes: 17 additions & 1 deletion src/unit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,13 @@ fn connect_inner(
debug!("sending request {} {}", method, url);
}

let mut expect_100_continue = false;
for header in &unit.headers {
if header.name() == "Expect" && header.value() == Some("100-continue") {
expect_100_continue = true;
}
}

let send_result = send_prelude(unit, &mut stream);

if let Err(err) = send_result {
Expand All @@ -277,8 +284,17 @@ fn connect_inner(
}
let retryable = unit.is_retryable(&body);

let mut stream = stream::DeadlineStream::new(stream, unit.deadline);

if expect_100_continue {
let response = Response::do_head_from_stream(&mut stream, &unit, true)?;
if response.status() != 100 {
return Err(Error::Status(response.status(), response));
}
}

// send the body (which can be empty now depending on redirects)
body::send_body(body, unit.is_chunked, &mut stream)?;
body::send_body(body, unit.is_chunked, stream.inner_mut())?;

// start reading the response to process cookies and redirects.
let result = Response::do_from_stream(stream, &unit);
Expand Down

0 comments on commit e2c7fee

Please sign in to comment.