Skip to content

Commit

Permalink
Add support for "Expect: 100-continue" header
Browse files Browse the repository at this point in the history
Change-Id: I7cc1035e9d36e4b2c94edc5ad571b74c11f74c94
  • Loading branch information
johan-bjareholt committed Mar 26, 2024
1 parent 0955dfe commit c8e731b
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 78 deletions.
10 changes: 7 additions & 3 deletions src/http_crate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
};

use crate::{header::HeaderLine, response::ResponseStatusIndex, Request, Response};
use crate::{
header::HeaderLine, response::PendingReader, response::ResponseStatusIndex, Request, Response,
};

/// Converts an [`http::Response`] into a [`Response`].
///
Expand Down Expand Up @@ -44,7 +46,7 @@ impl<T: AsRef<[u8]> + Send + Sync + 'static> From<http::Response<T>> for Respons
HeaderLine::from(raw_header).into_header().unwrap()
})
.collect::<Vec<_>>(),
reader: Box::new(Cursor::new(value.into_body())),
reader: PendingReader::Reader(Box::new(Cursor::new(value.into_body()))),
remote_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 80),
local_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0),
history: vec![],
Expand Down Expand Up @@ -328,12 +330,14 @@ mod tests {

#[test]
fn convert_to_http_response_bytes() {
use crate::response::PendingReader;
use http::Response;
use std::io::Cursor;

let mut response = super::Response::new(200, "OK", "tbr").unwrap();
// b'\xFF' as invalid UTF-8 character
response.reader = Box::new(Cursor::new(vec![b'\xFF', 0xde, 0xad, 0xbe, 0xef]));
response.reader =
PendingReader::Reader(Box::new(Cursor::new(vec![b'\xFF', 0xde, 0xad, 0xbe, 0xef])));
let http_response: Response<Vec<u8>> = response.into();

assert_eq!(
Expand Down
11 changes: 8 additions & 3 deletions src/http_interop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
};

use crate::{header::HeaderLine, response::ResponseStatusIndex, Request, Response};
use crate::{
header::HeaderLine, response::PendingReader, response::ResponseStatusIndex, Request, Response,
};

/// Converts an [`http::Response`] into a [`Response`].
///
Expand Down Expand Up @@ -47,7 +49,7 @@ impl<T: AsRef<[u8]> + Send + Sync + 'static> From<http::Response<T>> for Respons
HeaderLine::from(raw_header).into_header().unwrap()
})
.collect::<Vec<_>>(),
reader: Box::new(Cursor::new(value.into_body())),
reader: PendingReader::Reader(Box::new(Cursor::new(value.into_body()))),
remote_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 80),
local_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0),
history: vec![],
Expand Down Expand Up @@ -280,6 +282,7 @@ impl From<Request> for http::request::Builder {
mod tests {
use crate::header::{add_header, get_header_raw, HeaderLine};
use http_02 as http;
use std::io::Read;

#[test]
fn convert_http_response() {
Expand Down Expand Up @@ -376,7 +379,9 @@ mod tests {

let mut response = super::Response::new(200, "OK", "tbr").unwrap();
// b'\xFF' as invalid UTF-8 character
response.reader = Box::new(Cursor::new(vec![b'\xFF', 0xde, 0xad, 0xbe, 0xef]));
response.reader = super::PendingReader::Reader(Box::new(Cursor::new(vec![
b'\xFF', 0xde, 0xad, 0xbe, 0xef,
])));
let http_response: Response<Vec<u8>> = response.into();

assert_eq!(
Expand Down
176 changes: 107 additions & 69 deletions src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ enum BodyType {
CloseDelimited,
}

pub(crate) enum PendingReader {
BeforeBodyStart,
Reader(Box<dyn Read + Send + Sync + 'static>),
}

/// Response instances are created as results of firing off requests.
///
/// The `Response` is used to read response headers and decide what to do with the body.
Expand Down Expand Up @@ -81,7 +86,7 @@ pub struct Response {
pub(crate) index: ResponseStatusIndex,
pub(crate) status: u16,
pub(crate) headers: Vec<Header>,
pub(crate) reader: Box<dyn Read + Send + Sync + 'static>,
pub(crate) reader: PendingReader,
/// The socket address of the server that sent the response.
pub(crate) remote_addr: SocketAddr,
/// The socket address of the client that sent the request.
Expand Down Expand Up @@ -282,7 +287,12 @@ impl Response {
/// # }
/// ```
pub fn into_reader(self) -> Box<dyn Read + Send + Sync + 'static> {
self.reader
match self.reader {
PendingReader::Reader(reader) => reader,
PendingReader::BeforeBodyStart => panic!(
"It is not valid to call into_reader before Request::stream_to_reader is called"
),
}
}

// Determine what to do with the connection after we've read the body.
Expand Down Expand Up @@ -548,39 +558,33 @@ impl Response {
})
}

/// Create a response from a Read trait impl.
///
/// This is hopefully useful for unit tests.
///
/// Example:
/// Create a response from a DeadlineStream, reading and parsing only the status line, headers
/// and its following CRLF.
///
/// use std::io::Cursor;
///
/// let text = "HTTP/1.1 401 Authorization Required\r\n\r\nPlease log in\n";
/// let read = Cursor::new(text.to_string().into_bytes());
/// let resp = ureq::Response::do_from_read(read);
///
/// assert_eq!(resp.status(), 401);
pub(crate) fn do_from_stream(stream: Stream, unit: &Unit) -> Result<Response, Error> {
let remote_addr = stream.remote_addr;
/// Since this function only reads the status line, header and the following CRLF, the returned
/// Response will have an empty reader and does not take ownership of DeadlineStream.
/// To read the following data, the DeadlineStream can be read again after the call to this
/// function.
pub(crate) fn read_response_head(
stream: &mut DeadlineStream,
unit: &Unit,
) -> Result<Response, Error> {
let mut bytes_read = 0;
let remote_addr = stream.inner_ref().remote_addr;

let local_addr = match stream.socket() {
let local_addr = match stream.inner_ref().socket() {
Some(sock) => sock.local_addr().map_err(Error::from)?,
None => std::net::SocketAddrV4::new(std::net::Ipv4Addr::new(127, 0, 0, 1), 0).into(),
};

//
// HTTP/1.1 200 OK\r\n
let mut stream = stream::DeadlineStream::new(stream, unit.deadline);

// The status line we can ignore non-utf8 chars and parse as_str_lossy().
let status_line = read_next_line(&mut stream, "the status line")?.into_string_lossy();
let status_line =
read_next_line(stream, "the status line", &mut bytes_read)?.into_string_lossy();
let (index, status) = parse_status_line(status_line.as_str())?;
let http_version = &status_line.as_str()[0..index.http_version];

let mut headers: Vec<Header> = Vec::new();
while headers.len() <= MAX_HEADER_COUNT {
let line = read_next_line(&mut stream, "a header")?;
let line = read_next_line(stream, "a header", &mut bytes_read)?;
if line.is_empty() {
break;
}
Expand All @@ -595,23 +599,8 @@ impl Response {
));
}

let compression =
get_header(&headers, "content-encoding").and_then(Compression::from_header_value);

let connection_option =
Self::connection_option(http_version, get_header(&headers, "connection"));

let body_type = Self::body_type(&unit.method, status, http_version, &headers);

// remove Content-Encoding and length due to automatic decompression
if compression.is_some() {
headers.retain(|h| !h.is_name("content-encoding") && !h.is_name("content-length"));
}

let reader =
Self::stream_to_reader(stream, unit, body_type, compression, connection_option);

let url = unit.url.clone();
let reader = PendingReader::BeforeBodyStart;

let response = Response {
url,
Expand All @@ -627,6 +616,50 @@ impl Response {
Ok(response)
}

/// Create a Response from a DeadlineStream
///
/// Parses and comsumes the header from the stream and creates a Response with
/// the stream as the reader. The response reader also uncompresses the body
/// if it is compressed.
pub(crate) fn do_from_stream(
mut stream: DeadlineStream,
unit: &Unit,
) -> Result<Response, Error> {
let mut response = Self::read_response_head(&mut stream, unit)?;

let compression = get_header(&response.headers, "content-encoding")
.and_then(Compression::from_header_value);

let connection_option = Self::connection_option(
response.http_version(),
get_header(&response.headers, "connection"),
);

let body_type = Self::body_type(
&unit.method,
response.status(),
response.http_version(),
&response.headers,
);

// remove Content-Encoding and length due to automatic decompression
if compression.is_some() {
response
.headers
.retain(|h| !h.is_name("content-encoding") && !h.is_name("content-length"));
}

response.reader = PendingReader::Reader(Self::stream_to_reader(
stream,
unit,
body_type,
compression,
connection_option,
));

Ok(response)
}

#[cfg(test)]
pub fn set_url(&mut self, url: Url) {
self.url = url;
Expand Down Expand Up @@ -766,16 +799,23 @@ impl FromStr for Response {
&request_reader,
None,
);
let stream = stream::DeadlineStream::new(stream, unit.deadline);
Self::do_from_stream(stream, &unit)
}
}

fn read_next_line(reader: &mut impl BufRead, context: &str) -> io::Result<HeaderLine> {
fn read_next_line(
reader: &mut impl BufRead,
context: &str,
running_total: &mut usize,
) -> io::Result<HeaderLine> {
let mut buf = Vec::new();
let result = reader
.take((MAX_HEADER_SIZE + 1) as u64)
.read_until(b'\n', &mut buf);

*running_total += buf.len();

match result {
Ok(0) => Err(io::Error::new(
io::ErrorKind::ConnectionAborted,
Expand Down Expand Up @@ -1078,7 +1118,8 @@ mod tests {
const LEN: usize = MAX_HEADER_SIZE + 1;
let s = format!("Long-Header: {}\r\n", "A".repeat(LEN),);
let mut cursor = Cursor::new(s);
let result = read_next_line(&mut cursor, "some context");
let mut bytes_read = 0;
let result = read_next_line(&mut cursor, "some context", &mut bytes_read);
let err = result.expect_err("did not error on too-large header");
assert_eq!(err.kind(), io::ErrorKind::Other);
assert_eq!(
Expand Down Expand Up @@ -1117,9 +1158,9 @@ mod tests {
encoding_rs::WINDOWS_1252.encode("HTTP/1.1 302 Déplacé Temporairement\r\n");
let bytes = cow.to_vec();
let mut reader = io::BufReader::new(io::Cursor::new(bytes));
let r = read_next_line(&mut reader, "test status line");
let h = r.unwrap();
assert_eq!(h.to_string(), "HTTP/1.1 302 D�plac� Temporairement");
let mut bytes_read = 0;
let header = read_next_line(&mut reader, "test status line", &mut bytes_read).unwrap();
assert_eq!(header.to_string(), "HTTP/1.1 302 D�plac� Temporairement");
}

#[test]
Expand Down Expand Up @@ -1150,6 +1191,7 @@ mod tests {
&request_reader,
None,
);
let s = stream::DeadlineStream::new(s, unit.deadline);
let resp = Response::do_from_stream(s.into(), &unit).unwrap();
assert_eq!(resp.status(), 200);
assert_eq!(resp.header("x-geo-header"), None);
Expand Down Expand Up @@ -1204,18 +1246,16 @@ mod tests {
"1.1.1.1:4343".parse().unwrap(),
PoolReturner::new(&agent, PoolKey::from_parts("https", "example.com", 443)),
);
Response::do_from_stream(
stream,
&Unit::new(
&agent,
"GET",
&"https://example.com/".parse().unwrap(),
vec![],
&Payload::Empty.into_read(),
None,
),
)
.unwrap();
let unit = &Unit::new(
&agent,
"GET",
&"https://example.com/".parse().unwrap(),
vec![],
&Payload::Empty.into_read(),
None,
);
let stream = stream::DeadlineStream::new(stream, unit.deadline);
Response::do_from_stream(stream, unit).unwrap();
assert_eq!(agent2.state.pool.len(), 1);
}

Expand All @@ -1236,18 +1276,16 @@ mod tests {
"1.1.1.1:4343".parse().unwrap(),
PoolReturner::none(),
);
let resp = Response::do_from_stream(
stream,
&Unit::new(
&agent,
"GET",
&"https://example.com/".parse().unwrap(),
vec![],
&Payload::Empty.into_read(),
None,
),
)
.unwrap();
let unit = &Unit::new(
&agent,
"GET",
&"https://example.com/".parse().unwrap(),
vec![],
&Payload::Empty.into_read(),
None,
);
let stream = stream::DeadlineStream::new(stream, unit.deadline);
let resp = Response::do_from_stream(stream, unit).unwrap();
let body = resp.into_string().unwrap();
assert_eq!(body, "hi\n");
}
Expand Down
5 changes: 3 additions & 2 deletions src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,8 +455,9 @@ pub(crate) fn connect_host(
let s = stream.try_clone()?;
let pool_key = PoolKey::from_parts(unit.url.scheme(), hostname, port);
let pool_returner = PoolReturner::new(&unit.agent, pool_key);
let s = Stream::new(s, remote_addr, pool_returner);
let response = Response::do_from_stream(s, unit)?;
let stream = Stream::new(s, remote_addr, pool_returner);
let stream = DeadlineStream::new(stream, unit.deadline);
let response = Response::do_from_stream(stream, unit)?;
Proxy::verify_response(&response)?;
}
}
Expand Down
Loading

0 comments on commit c8e731b

Please sign in to comment.