Skip to content

Commit

Permalink
connection: send error responses
Browse files Browse the repository at this point in the history
  • Loading branch information
jkarneges committed May 15, 2024
1 parent 55ac666 commit f0e507d
Showing 1 changed file with 114 additions and 2 deletions.
116 changes: 114 additions & 2 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1302,6 +1302,108 @@ where
}
}

async fn send_error_response<'a, R: AsyncRead, W: AsyncWrite>(
mut resp: server::Response<'_, R, W>,
zreceiver: &TrackedAsyncLocalReceiver<'_, (arena::Rc<zhttppacket::OwnedResponse>, usize)>,
e: &Error,
) -> Result<(), Error> {
let headers = &[http1::Header {
name: "Content-Type",
value: b"text/plain",
}];

let mut body: ArrayVec<u8, 512> = ArrayVec::new();

let code = match e {
Error::CoreHttp(CoreHttpError::Http(e)) => {
writeln!(&mut body, "Failed to parse request: {}", e)?;

400
}
Error::CoreHttp(CoreHttpError::RequestTooLarge(limit)) => {
writeln!(
&mut body,
"Request header size exceeded limit of {} bytes.",
limit
)?;

400
}
Error::CoreHttp(CoreHttpError::ResponseTooLarge(limit)) => {
writeln!(
&mut body,
"Response header size exceeded limit of {} bytes.",
limit
)?;

500
}
Error::ReqModeWebSocket => {
writeln!(&mut body, "WebSockets not supported on req mode interface.")?;

400
}
Error::InvalidWebSocketRequest => {
writeln!(&mut body, "Request contained an Upgrade header with value \"websocket\" but the request was not a valid WebSocket request.")?;

400
}
Error::WebSocketRejectionTooLarge(limit) => {
writeln!(
&mut body,
"Non-101 response body size exceeded limit of {} bytes.",
limit
)?;

500
}
_ => {
writeln!(&mut body, "Failed to process request.")?;

500
}
};

let reason = match code {
400 => "Bad Request",
_ => "Internal Server Error",
};

let mut state = server::ResponseState::default();
let (header, prepare_body) = resp.prepare_header(
code,
reason,
headers,
http1::BodySize::Known(body.len()),
&mut state,
)?;

// ABR: discard_while
let header_sent = discard_while(zreceiver, pin!(header.send())).await?;

let resp_body = header_sent.start_body(prepare_body);
resp_body.prepare(&body, true)?;

loop {
// send the buffer
let send = pin!(async {
match resp_body.send().await {
SendStatus::Complete(finished) => Ok(Some(finished)),
SendStatus::EarlyResponse(_) => unreachable!(), // for requests only
SendStatus::Partial((), _) => Ok(None),
SendStatus::Error((), e) => Err(e),
}
});

// ABR: discard_while
if let Some(_finished) = discard_while(zreceiver, send).await? {
break;
}
}

Ok(())
}

// read request body and prepare outgoing zmq message
#[allow(clippy::too_many_arguments)]
async fn server_req_read_body<R: AsyncRead, W: AsyncWrite>(
Expand Down Expand Up @@ -1606,7 +1708,12 @@ async fn server_req_handler<S: AsyncRead + AsyncWrite>(
{
Ok(Some(ret)) => ret,
Ok(None) => return Ok(false), // no request
Err(e) => return Err(e),
Err(e) => {
// on error, resp is not consumed, so we can use it
send_error_response(resp.take().unwrap(), zreceiver, &e).await?;

return Err(e);
}
};

assert!(resp.is_none());
Expand Down Expand Up @@ -3699,7 +3806,12 @@ where
{
Ok(Some(ret)) => ret,
Ok(None) => return Ok(false), // no request
Err(e) => return Err(e),
Err(e) => {
// on error, resp is not consumed, so we can use it
send_error_response(resp.take().unwrap(), zreceiver, &e).await?;

return Err(e);
}
};

assert!(resp.is_none());
Expand Down

0 comments on commit f0e507d

Please sign in to comment.