From 5e4ec57843fbb485b3788d7b888fd8fd36cf5f68 Mon Sep 17 00:00:00 2001 From: Justin Karneges Date: Mon, 6 May 2024 18:15:25 -0700 Subject: [PATCH] re-add old code, commented out --- src/connection.rs | 700 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 700 insertions(+) diff --git a/src/connection.rs b/src/connection.rs index 686f2ea5..f12fa2d2 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -637,6 +637,706 @@ async fn recv_nonzero(r: &mut R, buf: &mut VecRingBuffer) -> Resul Ok(()) } +/*struct LimitedRingBuffer<'a> { + inner: &'a mut VecRingBuffer, + limit: usize, +} + +impl AsRef<[u8]> for LimitedRingBuffer<'_> { + fn as_ref(&self) -> &[u8] { + let buf = Buffer::read_buf(self.inner); + let limit = cmp::min(buf.len(), self.limit); + + &buf[..limit] + } +} + +struct HttpRead<'a, R: AsyncRead> { + stream: ReadHalf<'a, R>, + buf1: &'a mut VecRingBuffer, + buf2: &'a mut VecRingBuffer, +} + +struct HttpWrite<'a, W: AsyncWrite> { + stream: WriteHalf<'a, W>, +} + +struct RequestHandler<'a, R: AsyncRead, W: AsyncWrite> { + r: HttpRead<'a, R>, + w: HttpWrite<'a, W>, +} + +impl<'a, R: AsyncRead, W: AsyncWrite> RequestHandler<'a, R, W> { + fn new( + stream: (ReadHalf<'a, R>, WriteHalf<'a, W>), + buf1: &'a mut VecRingBuffer, + buf2: &'a mut VecRingBuffer, + ) -> Self { + buf1.align(); + buf2.clear(); + + Self { + r: HttpRead { + stream: stream.0, + buf1, + buf2, + }, + w: HttpWrite { stream: stream.1 }, + } + } + + // read from stream into buf, and parse buf as a request header + async fn recv_request<'b: 'c, 'c, const N: usize>( + mut self, + mut scratch: &'b mut http1::ParseScratch, + req_mem: &'c mut Option>, + ) -> Result, Error> { + let mut protocol = http1::ServerProtocol::new(); + + assert_eq!(protocol.state(), http1::ServerState::ReceivingRequest); + + loop { + { + let hbuf = self.r.buf1.take_inner(); + + match protocol.recv_request_owned(hbuf, scratch) { + http1::ParseStatus::Complete(req) => { + assert!([ + http1::ServerState::ReceivingBody, + http1::ServerState::AwaitingResponse + ] + .contains(&protocol.state())); + + *req_mem = Some(req); + + break Ok(RequestHeader { + r: self.r, + w: self.w, + protocol, + req_mem, + }); + } + http1::ParseStatus::Incomplete((), hbuf, ret_scratch) => { + // NOTE: after polonius it may not be necessary for + // scratch to be returned + scratch = ret_scratch; + self.r.buf1.set_inner(hbuf); + } + http1::ParseStatus::Error(e, hbuf, _) => { + self.r.buf1.set_inner(hbuf); + + return Err(e.into()); + } + } + } + + if let Err(e) = recv_nonzero(&mut self.r.stream, self.r.buf1).await { + if e.kind() == io::ErrorKind::WriteZero { + return Err(Error::BufferExceeded); + } + + return Err(e.into()); + } + } + } +} + +struct RequestHeader<'a, 'b, 'c, R: AsyncRead, W: AsyncWrite, const N: usize> { + r: HttpRead<'a, R>, + w: HttpWrite<'a, W>, + protocol: http1::ServerProtocol, + req_mem: &'c mut Option>, +} + +impl<'a, 'b, 'c, R: AsyncRead, W: AsyncWrite, const N: usize> RequestHeader<'a, 'b, 'c, R, W, N> { + fn request(&self) -> http1::Request { + self.req_mem.as_ref().unwrap().get() + } + + async fn start_recv_body(mut self) -> Result, Error> { + self.handle_expect().await?; + + // restore the read ringbuffer + self.discard_request(); + + Ok(self.into_recv_body().0) + } + + async fn start_recv_body_and_keep_header( + mut self, + ) -> Result, Error> { + self.handle_expect().await?; + + // we're keeping the request, so put any remaining bytes into buf2 + // and swap the inner buffers. those bytes will then become readable + // from buf1. we'll plan to give the request's inner buffer to buf2 + // after the request is no longer needed + let req = self.req_mem.as_ref().unwrap(); + self.r.buf2.write_all(req.remaining_bytes())?; + self.r.buf1.swap_inner(self.r.buf2); + + let (recv_body, req_mem) = self.into_recv_body(); + + Ok(RequestRecvBodyKeepHeader { + inner: recv_body, + req_mem, + }) + } + + fn recv_done(mut self) -> Result, Error> { + // restore the read ringbuffer + self.discard_request(); + + Ok(RequestStartResponse::new(self.r, self.w, self.protocol)) + } + + // this method requires the request to exist + async fn handle_expect(&mut self) -> Result<(), Error> { + if !self.request().expect_100 { + return Ok(()); + } + + let mut cont = [0; 32]; + + let cont = { + let mut c = io::Cursor::new(&mut cont[..]); + + if let Err(e) = self.protocol.send_100_continue(&mut c) { + return Err(e.into()); + } + + let size = c.position() as usize; + + &cont[..size] + }; + + let mut left = cont.len(); + + while left > 0 { + let pos = cont.len() - left; + + let size = match self.w.stream.write(&cont[pos..]).await { + Ok(size) => size, + Err(e) => return Err(e.into()), + }; + + left -= size; + } + + Ok(()) + } + + // consumes request and gives the inner buffer back to buf1 + fn discard_request(&mut self) { + let req = self.req_mem.take().unwrap(); + + let remaining_len = req.remaining_bytes().len(); + let inner_buf = req.into_buf(); + let hsize = inner_buf.filled_len() - remaining_len; + + self.r.buf1.set_inner(inner_buf); + self.r.buf1.read_commit(hsize); + } + + fn into_recv_body( + self, + ) -> ( + RequestRecvBody<'a, R, W>, + &'c mut Option>, + ) { + ( + RequestRecvBody { + r: RefCell::new(RecvBodyRead { + stream: self.r.stream, + buf: self.r.buf1, + }), + wstream: self.w.stream, + buf2: self.r.buf2, + protocol: RefCell::new(self.protocol), + }, + self.req_mem, + ) + } +} + +struct RecvBodyRead<'a, R: AsyncRead> { + stream: ReadHalf<'a, R>, + buf: &'a mut VecRingBuffer, +} + +struct RequestRecvBody<'a, R: AsyncRead, W: AsyncWrite> { + r: RefCell>, + wstream: WriteHalf<'a, W>, + buf2: &'a mut VecRingBuffer, + protocol: RefCell, +} + +impl<'a, R: AsyncRead, W: AsyncWrite> RequestRecvBody<'a, R, W> { + fn more(&self) -> bool { + self.protocol.borrow().state() == http1::ServerState::ReceivingBody + } + + #[allow(clippy::await_holding_refcell_ref)] + async fn add_to_recv_buffer(&self) -> Result<(), Error> { + let r = &mut *self.r.borrow_mut(); + + if let Err(e) = recv_nonzero(&mut r.stream, r.buf).await { + if e.kind() == io::ErrorKind::WriteZero { + return Err(Error::BufferExceeded); + } + + return Err(e.into()); + } + + Ok(()) + } + + fn try_recv_body(&self, dest: &mut [u8]) -> Option> { + let r = &mut *self.r.borrow_mut(); + let protocol = &mut *self.protocol.borrow_mut(); + + if protocol.state() == http1::ServerState::ReceivingBody { + loop { + let (size, read_size) = { + let mut buf = io::Cursor::new(Buffer::read_buf(r.buf)); + + let mut headers = [httparse::EMPTY_HEADER; HEADERS_MAX]; + + let (size, _) = match protocol.recv_body(&mut buf, dest, &mut headers) { + Ok(ret) => ret, + Err(e) => return Some(Err(e.into())), + }; + + let read_size = buf.position() as usize; + + (size, read_size) + }; + + if protocol.state() == http1::ServerState::ReceivingBody && read_size == 0 { + if !r.buf.is_readable_contiguous() { + r.buf.align(); + continue; + } + + return None; + } + + r.buf.read_commit(read_size); + + return Some(Ok(size)); + } + } + + assert_eq!(protocol.state(), http1::ServerState::AwaitingResponse); + + Some(Ok(0)) + } + + async fn recv_body(&self, dest: &mut [u8]) -> Result { + loop { + if let Some(ret) = self.try_recv_body(dest) { + return ret; + } + + self.add_to_recv_buffer().await?; + } + } + + fn recv_done(self) -> RequestStartResponse<'a, R, W> { + let r = self.r.into_inner(); + + RequestStartResponse::new( + HttpRead { + stream: r.stream, + buf1: r.buf, + buf2: self.buf2, + }, + HttpWrite { + stream: self.wstream, + }, + self.protocol.into_inner(), + ) + } +} + +struct RequestRecvBodyKeepHeader<'a, 'b, 'c, R: AsyncRead, W: AsyncWrite, const N: usize> { + inner: RequestRecvBody<'a, R, W>, + req_mem: &'c mut Option>, +} + +impl<'a, 'b, 'c, R: AsyncRead, W: AsyncWrite, const N: usize> + RequestRecvBodyKeepHeader<'a, 'b, 'c, R, W, N> +{ + fn request(&self) -> http1::Request { + self.req_mem.as_ref().unwrap().get() + } + + async fn recv_body(&self, dest: &mut [u8]) -> Result { + self.inner.recv_body(dest).await + } + + fn recv_done(self) -> RequestStartResponse<'a, R, W> { + // the request is no longer needed, so give its inner buffer to buf2 + // and clear it + let buf = self.req_mem.take().unwrap().into_buf(); + self.inner.buf2.set_inner(buf); + self.inner.buf2.clear(); + + self.inner.recv_done() + } +} + +struct RequestStartResponse<'a, R: AsyncRead, W: AsyncWrite> { + r: HttpRead<'a, R>, + w: HttpWrite<'a, W>, + protocol: http1::ServerProtocol, +} + +impl<'a, R: AsyncRead, W: AsyncWrite> RequestStartResponse<'a, R, W> { + fn new(r: HttpRead<'a, R>, w: HttpWrite<'a, W>, protocol: http1::ServerProtocol) -> Self { + Self { r, w, protocol } + } + + async fn fill_recv_buffer(&mut self) -> Error { + loop { + if let Err(e) = recv_nonzero(&mut self.r.stream, self.r.buf1).await { + if e.kind() == io::ErrorKind::WriteZero { + // if there's no more space, suspend forever + std::future::pending::<()>().await; + } + + return e.into(); + } + } + } + + fn prepare_response( + mut self, + code: u16, + reason: &str, + headers: &[http1::Header<'_>], + body_size: http1::BodySize, + ) -> Result, Error> { + self.r.buf2.clear(); + + let mut hbuf = io::Cursor::new(self.r.buf2.write_buf()); + + if let Err(e) = self + .protocol + .send_response(&mut hbuf, code, reason, headers, body_size) + { + return Err(e.into()); + } + + let size = hbuf.position() as usize; + self.r.buf2.write_commit(size); + + let (stream, buf1, buf2) = ((self.r.stream, self.w.stream), self.r.buf1, self.r.buf2); + + Ok(RequestSendHeader::new( + stream, + buf1, + buf2, + self.protocol, + size, + )) + } +} + +struct SendHeaderRead<'a, R: AsyncRead> { + stream: ReadHalf<'a, R>, + buf: &'a mut VecRingBuffer, +} + +struct EarlyBody { + overflow: Option, + done: bool, +} + +struct RequestSendHeader<'a, R: AsyncRead, W: AsyncWrite> { + r: RefCell>, + wstream: RefCell>, + wbuf: RefCell>, + protocol: http1::ServerProtocol, + early_body: RefCell, +} + +impl<'a, R: AsyncRead, W: AsyncWrite> RequestSendHeader<'a, R, W> { + fn new( + stream: (ReadHalf<'a, R>, WriteHalf<'a, W>), + buf1: &'a mut VecRingBuffer, + buf2: &'a mut VecRingBuffer, + protocol: http1::ServerProtocol, + header_size: usize, + ) -> Self { + Self { + r: RefCell::new(SendHeaderRead { + stream: stream.0, + buf: buf1, + }), + wstream: RefCell::new(stream.1), + wbuf: RefCell::new(LimitedRingBuffer { + inner: buf2, + limit: header_size, + }), + protocol, + early_body: RefCell::new(EarlyBody { + overflow: None, + done: false, + }), + } + } + + #[allow(clippy::await_holding_refcell_ref)] + async fn send_header(&self) -> Result<(), Error> { + let mut stream = self.wstream.borrow_mut(); + + // limit = header bytes left + while self.wbuf.borrow().limit > 0 { + let size = stream.write_shared(&self.wbuf).await?; + + let mut wbuf = self.wbuf.borrow_mut(); + + wbuf.inner.read_commit(size); + wbuf.limit -= size; + } + + let mut wbuf = self.wbuf.borrow_mut(); + let mut early_body = self.early_body.borrow_mut(); + + if let Some(overflow) = &mut early_body.overflow { + wbuf.inner.write_all(Buffer::read_buf(overflow))?; + + early_body.overflow = None; + } + + Ok(()) + } + + fn append_body(&self, body: &[u8], more: bool, id: &str) -> Result<(), Error> { + let mut wbuf = self.wbuf.borrow_mut(); + let mut early_body = self.early_body.borrow_mut(); + + // limit = header bytes left + if wbuf.limit > 0 { + // if there are still header bytes in the buffer, then we may + // need to overflow into a separate buffer if there's not enough + // room + + // workaround for rust 1.77 + #[allow(clippy::unused_io_amount)] + let accepted = if early_body.overflow.is_none() { + wbuf.inner.write(body)? + } else { + 0 + }; + + if accepted < body.len() { + debug!( + "server-conn {}: overflowing {} bytes", + id, + body.len() - accepted + ); + + if early_body.overflow.is_none() { + // only allow overflowing as much as there are header + // bytes left + early_body.overflow = Some(ContiguousBuffer::new(wbuf.limit)); + } + + let overflow = early_body.overflow.as_mut().unwrap(); + + overflow.write_all(&body[accepted..])?; + } + } else { + // if the header has been fully cleared from the buffer, then + // always write directly to the buffer + wbuf.inner.write_all(body)?; + } + + early_body.done = !more; + + Ok(()) + } + + fn send_header_done(self) -> RequestSendBody<'a, R, W> { + let r = self.r.into_inner(); + let wstream = self.wstream.into_inner(); + let wbuf = self.wbuf.into_inner(); + let early_body = self.early_body.borrow(); + + assert_eq!(wbuf.limit, 0); + assert!(early_body.overflow.is_none()); + + let (stream, buf1, buf2) = { ((r.stream, wstream), r.buf, wbuf.inner) }; + + let block_size = buf2.capacity(); + + RequestSendBody { + r: RefCell::new(HttpSendBodyRead { + stream: stream.0, + buf: buf1, + }), + w: RefCell::new(HttpSendBodyWrite { + stream: stream.1, + buf: buf2, + body_done: early_body.done, + block_size, + }), + protocol: RefCell::new(self.protocol), + } + } +} + +struct HttpSendBodyRead<'a, R: AsyncRead> { + stream: ReadHalf<'a, R>, + buf: &'a mut VecRingBuffer, +} + +struct HttpSendBodyWrite<'a, W: AsyncWrite> { + stream: WriteHalf<'a, W>, + buf: &'a mut VecRingBuffer, + body_done: bool, + block_size: usize, +} + +struct SendBodyFuture<'a, 'b, W: AsyncWrite> { + w: &'a RefCell>, + protocol: &'a RefCell, +} + +impl<'a, 'b, W: AsyncWrite> Future for SendBodyFuture<'a, 'b, W> { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let f = &*self; + + let w = &mut *f.w.borrow_mut(); + + let stream = &mut w.stream; + + if !stream.is_writable() { + return Poll::Pending; + } + + let protocol = &mut *f.protocol.borrow_mut(); + + let mut buf_arr = [&b""[..]; VECTORED_MAX - 2]; + let bufs = w.buf.read_bufs(&mut buf_arr); + + match protocol.send_body( + &mut StdWriteWrapper::new(Pin::new(&mut w.stream), cx), + bufs, + w.body_done, + None, + ) { + Ok(size) => Poll::Ready(Ok(size)), + Err(http1::Error::Io(e)) if e.kind() == io::ErrorKind::WouldBlock => Poll::Pending, + Err(e) => Poll::Ready(Err(e.into())), + } + } +} + +impl Drop for SendBodyFuture<'_, '_, W> { + fn drop(&mut self) { + self.w.borrow_mut().stream.cancel(); + } +} + +struct RequestSendBody<'a, R: AsyncRead, W: AsyncWrite> { + r: RefCell>, + w: RefCell>, + protocol: RefCell, +} + +impl<'a, R: AsyncRead, W: AsyncWrite> RequestSendBody<'a, R, W> { + fn append_body(&self, body: &[u8], more: bool) -> Result<(), Error> { + let w = &mut *self.w.borrow_mut(); + + w.buf.write_all(body)?; + w.body_done = !more; + + Ok(()) + } + + fn expand_write_buffer(&self, blocks_max: usize, blocks_avail: &Counter) -> usize { + let w = &mut *self.w.borrow_mut(); + + resize_write_buffer_if_full(w.buf, w.block_size, blocks_max, blocks_avail) + } + + fn can_flush(&self) -> bool { + let w = &*self.w.borrow(); + + w.buf.len() > 0 || w.body_done + } + + async fn flush_body(&self) -> Result<(usize, bool), Error> { + { + let protocol = &*self.protocol.borrow(); + + assert_eq!(protocol.state(), http1::ServerState::SendingBody); + + let w = &*self.w.borrow(); + + if w.buf.len() == 0 && !w.body_done { + return Ok((0, false)); + } + } + + let size = SendBodyFuture { + w: &self.w, + protocol: &self.protocol, + } + .await?; + + let w = &mut *self.w.borrow_mut(); + let protocol = &*self.protocol.borrow(); + + w.buf.read_commit(size); + + if w.buf.len() > 0 || !w.body_done || protocol.state() == http1::ServerState::SendingBody { + return Ok((size, false)); + } + + assert_eq!(protocol.state(), http1::ServerState::Finished); + + Ok((size, true)) + } + + #[allow(clippy::await_holding_refcell_ref)] + async fn send_body(&self, body: &[u8], more: bool) -> Result { + let w = &mut *self.w.borrow_mut(); + let protocol = &mut *self.protocol.borrow_mut(); + + assert_eq!(protocol.state(), http1::ServerState::SendingBody); + + Ok(protocol + .send_body_async(&mut w.stream, &[body], !more, None) + .await?) + } + + #[allow(clippy::await_holding_refcell_ref)] + async fn fill_recv_buffer(&self) -> Error { + let r = &mut *self.r.borrow_mut(); + + loop { + if let Err(e) = recv_nonzero(&mut r.stream, r.buf).await { + if e.kind() == io::ErrorKind::WriteZero { + // if there's no more space, suspend forever + std::future::pending::<()>().await; + } + + return e.into(); + } + } + } + + fn finish(self) -> bool { + self.protocol.borrow().is_persistent() + } +}*/ + struct WebSocketRead<'a, R: AsyncRead> { stream: ReadHalf<'a, R>, buf: &'a mut VecRingBuffer,