Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for "Expect: 100-continue" header #679

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -501,3 +501,13 @@ mod tests {
assert!(size < 500); // 344 on Macbook M1
}
}

pub(crate) fn error_get_root_source<'a>(
err: &'a (dyn std::error::Error + 'static),
) -> &'a (dyn std::error::Error + 'static) {
if let Some(err) = err.source() {
error_get_root_source(err) as &(dyn std::error::Error + 'static)
} else {
err as &(dyn std::error::Error + 'static)
}
}
10 changes: 7 additions & 3 deletions src/http_crate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
};

use crate::{header::HeaderLine, response::ResponseStatusIndex, Request, Response};
use crate::{
header::HeaderLine, response::PendingReader, response::ResponseStatusIndex, Request, Response,
};

/// Converts an [`http::Response`] into a [`Response`].
///
Expand Down Expand Up @@ -44,7 +46,7 @@ impl<T: AsRef<[u8]> + Send + Sync + 'static> From<http::Response<T>> for Respons
HeaderLine::from(raw_header).into_header().unwrap()
})
.collect::<Vec<_>>(),
reader: Box::new(Cursor::new(value.into_body())),
reader: PendingReader::Reader(Box::new(Cursor::new(value.into_body()))),
remote_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 80),
local_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0),
history: vec![],
Expand Down Expand Up @@ -328,12 +330,14 @@ mod tests {

#[test]
fn convert_to_http_response_bytes() {
use crate::response::PendingReader;
use http::Response;
use std::io::Cursor;

let mut response = super::Response::new(200, "OK", "tbr").unwrap();
// b'\xFF' as invalid UTF-8 character
response.reader = Box::new(Cursor::new(vec![b'\xFF', 0xde, 0xad, 0xbe, 0xef]));
response.reader =
PendingReader::Reader(Box::new(Cursor::new(vec![b'\xFF', 0xde, 0xad, 0xbe, 0xef])));
let http_response: Response<Vec<u8>> = response.into();

assert_eq!(
Expand Down
11 changes: 8 additions & 3 deletions src/http_interop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
};

use crate::{header::HeaderLine, response::ResponseStatusIndex, Request, Response};
use crate::{
header::HeaderLine, response::PendingReader, response::ResponseStatusIndex, Request, Response,
};

/// Converts an [`http::Response`] into a [`Response`].
///
Expand Down Expand Up @@ -47,7 +49,7 @@ impl<T: AsRef<[u8]> + Send + Sync + 'static> From<http::Response<T>> for Respons
HeaderLine::from(raw_header).into_header().unwrap()
})
.collect::<Vec<_>>(),
reader: Box::new(Cursor::new(value.into_body())),
reader: PendingReader::Reader(Box::new(Cursor::new(value.into_body()))),
remote_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 80),
local_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0),
history: vec![],
Expand Down Expand Up @@ -280,6 +282,7 @@ impl From<Request> for http::request::Builder {
mod tests {
use crate::header::{add_header, get_header_raw, HeaderLine};
use http_02 as http;
use std::io::Read;

#[test]
fn convert_http_response() {
Expand Down Expand Up @@ -376,7 +379,9 @@ mod tests {

let mut response = super::Response::new(200, "OK", "tbr").unwrap();
// b'\xFF' as invalid UTF-8 character
response.reader = Box::new(Cursor::new(vec![b'\xFF', 0xde, 0xad, 0xbe, 0xef]));
response.reader = super::PendingReader::Reader(Box::new(Cursor::new(vec![
b'\xFF', 0xde, 0xad, 0xbe, 0xef,
])));
let http_response: Response<Vec<u8>> = response.into();

assert_eq!(
Expand Down
184 changes: 113 additions & 71 deletions src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ enum BodyType {
CloseDelimited,
}

pub(crate) enum PendingReader {
BeforeBodyStart,
Reader(Box<dyn Read + Send + Sync + 'static>),
}

/// Response instances are created as results of firing off requests.
///
/// The `Response` is used to read response headers and decide what to do with the body.
Expand Down Expand Up @@ -81,7 +86,7 @@ pub struct Response {
pub(crate) index: ResponseStatusIndex,
pub(crate) status: u16,
pub(crate) headers: Vec<Header>,
pub(crate) reader: Box<dyn Read + Send + Sync + 'static>,
pub(crate) reader: PendingReader,
/// The socket address of the server that sent the response.
pub(crate) remote_addr: SocketAddr,
/// The socket address of the client that sent the request.
Expand Down Expand Up @@ -282,7 +287,12 @@ impl Response {
/// # }
/// ```
pub fn into_reader(self) -> Box<dyn Read + Send + Sync + 'static> {
self.reader
match self.reader {
PendingReader::Reader(reader) => reader,
PendingReader::BeforeBodyStart => panic!(
"It is not valid to call into_reader before Request::stream_to_reader is called"
),
}
}

// Determine what to do with the connection after we've read the body.
Expand Down Expand Up @@ -548,39 +558,33 @@ impl Response {
})
}

/// Create a response from a Read trait impl.
///
/// This is hopefully useful for unit tests.
///
/// Example:
///
/// use std::io::Cursor;
/// Create a response from a DeadlineStream, reading and parsing only the status line, headers
/// and its following CRLF.
///
/// let text = "HTTP/1.1 401 Authorization Required\r\n\r\nPlease log in\n";
/// let read = Cursor::new(text.to_string().into_bytes());
/// let resp = ureq::Response::do_from_read(read);
///
/// assert_eq!(resp.status(), 401);
pub(crate) fn do_from_stream(stream: Stream, unit: Unit) -> Result<Response, Error> {
let remote_addr = stream.remote_addr;
/// Since this function only reads the status line, header and the following CRLF, the returned
/// Response will have an empty reader and does not take ownership of DeadlineStream.
/// To read the following data, the DeadlineStream can be read again after the call to this
/// function.
pub(crate) fn read_response_head(
stream: &mut DeadlineStream,
unit: &Unit,
) -> Result<Response, Error> {
let mut bytes_read = 0;
let remote_addr = stream.inner_ref().remote_addr;

let local_addr = match stream.socket() {
let local_addr = match stream.inner_ref().socket() {
Some(sock) => sock.local_addr().map_err(Error::from)?,
None => std::net::SocketAddrV4::new(std::net::Ipv4Addr::new(127, 0, 0, 1), 0).into(),
};

//
// HTTP/1.1 200 OK\r\n
let mut stream = stream::DeadlineStream::new(stream, unit.deadline);

// The status line we can ignore non-utf8 chars and parse as_str_lossy().
let status_line = read_next_line(&mut stream, "the status line")?.into_string_lossy();
let status_line =
read_next_line(stream, "the status line", &mut bytes_read)?.into_string_lossy();
let (index, status) = parse_status_line(status_line.as_str())?;
let http_version = &status_line.as_str()[0..index.http_version];

let mut headers: Vec<Header> = Vec::new();
while headers.len() <= MAX_HEADER_COUNT {
let line = read_next_line(&mut stream, "a header")?;
let line = read_next_line(stream, "a header", &mut bytes_read)?;
if line.is_empty() {
break;
}
Expand All @@ -595,23 +599,8 @@ impl Response {
));
}

let compression =
get_header(&headers, "content-encoding").and_then(Compression::from_header_value);

let connection_option =
Self::connection_option(http_version, get_header(&headers, "connection"));

let body_type = Self::body_type(&unit.method, status, http_version, &headers);

// remove Content-Encoding and length due to automatic decompression
if compression.is_some() {
headers.retain(|h| !h.is_name("content-encoding") && !h.is_name("content-length"));
}

let reader =
Self::stream_to_reader(stream, &unit, body_type, compression, connection_option);

let url = unit.url.clone();
let reader = PendingReader::BeforeBodyStart;

let response = Response {
url,
Expand All @@ -627,6 +616,54 @@ impl Response {
Ok(response)
}

/// Attach a stream to Response, for reading the body.
///
/// The response reader also uncompresses the body if it is compressed.
pub(crate) fn take_body(&mut self, stream: DeadlineStream, unit: &Unit) -> Result<(), Error> {
let compression =
get_header(&self.headers, "content-encoding").and_then(Compression::from_header_value);

let connection_option =
Self::connection_option(self.http_version(), get_header(&self.headers, "connection"));

let body_type = Self::body_type(
&unit.method,
self.status(),
self.http_version(),
&self.headers,
);

// remove Content-Encoding and length due to automatic decompression
if compression.is_some() {
self.headers
.retain(|h| !h.is_name("content-encoding") && !h.is_name("content-length"));
}

self.reader = PendingReader::Reader(Self::stream_to_reader(
stream,
unit,
body_type,
compression,
connection_option,
));

Ok(())
}

/// Create a Response from a DeadlineStream
///
/// Parses and comsumes the header from the stream and creates a Response with
/// the stream as the reader. The response reader also uncompresses the body
/// if it is compressed.
pub(crate) fn do_from_stream(
mut stream: DeadlineStream,
unit: &Unit,
) -> Result<Response, Error> {
let mut response = Self::read_response_head(&mut stream, unit)?;
response.take_body(stream, unit)?;
Ok(response)
}

#[cfg(test)]
pub fn set_url(&mut self, url: Url) {
self.url = url;
Expand Down Expand Up @@ -766,16 +803,23 @@ impl FromStr for Response {
&request_reader,
None,
);
Self::do_from_stream(stream, unit)
let stream = stream::DeadlineStream::new(stream, unit.deadline);
Self::do_from_stream(stream, &unit)
}
}

fn read_next_line(reader: &mut impl BufRead, context: &str) -> io::Result<HeaderLine> {
fn read_next_line(
reader: &mut impl BufRead,
context: &str,
running_total: &mut usize,
) -> io::Result<HeaderLine> {
let mut buf = Vec::new();
let result = reader
.take((MAX_HEADER_SIZE + 1) as u64)
.read_until(b'\n', &mut buf);

*running_total += buf.len();

match result {
Ok(0) => Err(io::Error::new(
io::ErrorKind::ConnectionAborted,
Expand Down Expand Up @@ -1078,7 +1122,8 @@ mod tests {
const LEN: usize = MAX_HEADER_SIZE + 1;
let s = format!("Long-Header: {}\r\n", "A".repeat(LEN),);
let mut cursor = Cursor::new(s);
let result = read_next_line(&mut cursor, "some context");
let mut bytes_read = 0;
let result = read_next_line(&mut cursor, "some context", &mut bytes_read);
let err = result.expect_err("did not error on too-large header");
assert_eq!(err.kind(), io::ErrorKind::Other);
assert_eq!(
Expand Down Expand Up @@ -1117,9 +1162,9 @@ mod tests {
encoding_rs::WINDOWS_1252.encode("HTTP/1.1 302 Déplacé Temporairement\r\n");
let bytes = cow.to_vec();
let mut reader = io::BufReader::new(io::Cursor::new(bytes));
let r = read_next_line(&mut reader, "test status line");
let h = r.unwrap();
assert_eq!(h.to_string(), "HTTP/1.1 302 D�plac� Temporairement");
let mut bytes_read = 0;
let header = read_next_line(&mut reader, "test status line", &mut bytes_read).unwrap();
assert_eq!(header.to_string(), "HTTP/1.1 302 D�plac� Temporairement");
}

#[test]
Expand Down Expand Up @@ -1150,7 +1195,8 @@ mod tests {
&request_reader,
None,
);
let resp = Response::do_from_stream(s.into(), unit).unwrap();
let s = stream::DeadlineStream::new(s, unit.deadline);
let resp = Response::do_from_stream(s.into(), &unit).unwrap();
assert_eq!(resp.status(), 200);
assert_eq!(resp.header("x-geo-header"), None);
}
Expand Down Expand Up @@ -1204,18 +1250,16 @@ mod tests {
"1.1.1.1:4343".parse().unwrap(),
PoolReturner::new(&agent, PoolKey::from_parts("https", "example.com", 443)),
);
Response::do_from_stream(
stream,
Unit::new(
&agent,
"GET",
&"https://example.com/".parse().unwrap(),
vec![],
&Payload::Empty.into_read(),
None,
),
)
.unwrap();
let unit = &Unit::new(
&agent,
"GET",
&"https://example.com/".parse().unwrap(),
vec![],
&Payload::Empty.into_read(),
None,
);
let stream = stream::DeadlineStream::new(stream, unit.deadline);
Response::do_from_stream(stream, unit).unwrap();
assert_eq!(agent2.state.pool.len(), 1);
}

Expand All @@ -1236,18 +1280,16 @@ mod tests {
"1.1.1.1:4343".parse().unwrap(),
PoolReturner::none(),
);
let resp = Response::do_from_stream(
stream,
Unit::new(
&agent,
"GET",
&"https://example.com/".parse().unwrap(),
vec![],
&Payload::Empty.into_read(),
None,
),
)
.unwrap();
let unit = &Unit::new(
&agent,
"GET",
&"https://example.com/".parse().unwrap(),
vec![],
&Payload::Empty.into_read(),
None,
);
let stream = stream::DeadlineStream::new(stream, unit.deadline);
let resp = Response::do_from_stream(stream, unit).unwrap();
let body = resp.into_string().unwrap();
assert_eq!(body, "hi\n");
}
Expand Down
9 changes: 7 additions & 2 deletions src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ impl DeadlineStream {
pub(crate) fn inner_mut(&mut self) -> &mut Stream {
&mut self.stream
}

pub(crate) fn into_inner(self) -> Stream {
self.stream
}
}

impl From<DeadlineStream> for Stream {
Expand Down Expand Up @@ -455,8 +459,9 @@ pub(crate) fn connect_host(
let s = stream.try_clone()?;
let pool_key = PoolKey::from_parts(unit.url.scheme(), hostname, port);
let pool_returner = PoolReturner::new(&unit.agent, pool_key);
let s = Stream::new(s, remote_addr, pool_returner);
let response = Response::do_from_stream(s, unit.clone())?;
let stream = Stream::new(s, remote_addr, pool_returner);
let stream = DeadlineStream::new(stream, unit.deadline);
let response = Response::do_from_stream(stream, unit)?;
Proxy::verify_response(&response)?;
}
}
Expand Down
Loading
Loading