Skip to content

Commit

Permalink
refactor: some type aliases for clippy
Browse files Browse the repository at this point in the history
also update comments
  • Loading branch information
rklaehn committed Jan 30, 2023
1 parent d44f5c7 commit c1f7c9c
Showing 1 changed file with 60 additions and 68 deletions.
128 changes: 60 additions & 68 deletions src/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,22 +486,19 @@ mod tokio_io {
loop {
match self.state.read_next() {
NextRead::Done => {
// This is EOF. We know the internal buffer is empty,
// because we checked it before this loop.
// This is EOF.
break Poll::Ready(Ok(()));
}
NextRead::Header => {
// ensure reading state, reading 8 bytes
// we might already be in the reading state,
// read 8 bytes
// header comes from outboard if we have one, otherwise from input
ready!(self.poll_fill_buffer_from_input_or_outboard(8, cx))?;
self.state.feed_header(self.buf[0..8].try_into().unwrap());
// we don't want to write the header, so we are done with the buffer contents
self.clear_buf();
}
NextRead::Parent => {
// ensure reading state, reading 64 bytes
// we might already be in the reading state,
// read 64 bytes
// parent comes from outboard if we have one, otherwise from input
ready!(self.poll_fill_buffer_from_input_or_outboard(64, cx))?;
self.state
Expand All @@ -515,11 +512,7 @@ mod tokio_io {
skip,
index,
} => {
// todo: add direct output optimization

// ensure reading state, reading size bytes
// we might already be in the reading state,
// so we must not set buf_start to 0
// read size bytes
// chunk never comes from outboard
ready!(self.poll_fill_buffer_from_input(size, cx))?;

Expand Down Expand Up @@ -557,8 +550,7 @@ mod tokio_io {
) -> Poll<io::Result<bool>> {
Poll::Ready(Ok(match next {
NextRead::Header => {
// ensure reading state, reading 8 bytes
// we might already be in the reading state,
// read 8 bytes
// header comes from outboard if we have one, otherwise from input
ready!(self.poll_fill_buffer_from_input_or_outboard(8, cx))?;
self.state.feed_header(self.buf[0..8].try_into().unwrap());
Expand All @@ -568,8 +560,7 @@ mod tokio_io {
false
}
NextRead::Parent => {
// ensure reading state, reading 64 bytes
// we might already be in the reading state,
// read 64 bytes
// parent comes from outboard if we have one, otherwise from input
ready!(self.poll_fill_buffer_from_input_or_outboard(64, cx))?;
self.state
Expand All @@ -585,8 +576,7 @@ mod tokio_io {
skip: _,
index,
} => {
// ensure reading state, reading size bytes
// we might already be in the reading state,
// read size bytes
// chunk never comes from outboard
ready!(self.poll_fill_buffer_from_input(size, cx))?;

Expand Down Expand Up @@ -649,40 +639,27 @@ mod tokio_io {
}
}

#[derive(Debug)]
/// type alias to make clippy happy
type BoxedDecoderShared<T, O> = Pin<Box<DecoderShared<T, O>>>;

/// state of the decoder
///
/// This is the decoder, but it is a separate type so it can be private.
/// The public AsyncDecoder just wraps this.
enum DecoderState<T: AsyncRead + Unpin, O: AsyncRead + Unpin> {
/// we are reading from the underlying reader
Reading(Box<DecoderShared<T, O>>),
Reading(BoxedDecoderShared<T, O>),
/// we are being polled for output
Output(Box<DecoderShared<T, O>>),
/// we are done
Done,
Output(BoxedDecoderShared<T, O>),
/// invalid state
Invalid,
}

impl<T: AsyncRead + Unpin, O: AsyncRead + Unpin> DecoderState<T, O> {
fn take(&mut self) -> Self {
std::mem::replace(self, DecoderState::Done)
std::mem::replace(self, DecoderState::Invalid)
}
}

#[derive(Debug)]
pub struct AsyncDecoder<T: AsyncRead + Unpin, O: AsyncRead + Unpin>(DecoderState<T, O>);

impl<T: AsyncRead + Unpin> AsyncDecoder<T, T> {
pub fn new(inner: T, hash: &Hash) -> Self {
let state = DecoderShared::new(inner, None, hash);
Self(DecoderState::Reading(Box::new(state)))
}
}

impl<T: AsyncRead + Unpin, O: AsyncRead + Unpin> AsyncDecoder<T, O> {
pub fn new_outboard(inner: T, outboard: O, hash: &Hash) -> Self {
let state = DecoderShared::new(inner, Some(outboard), hash);
Self(DecoderState::Reading(Box::new(state)))
}
}

impl<T: AsyncRead + Unpin, O: AsyncRead + Unpin> AsyncRead for AsyncDecoder<T, O> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand All @@ -694,51 +671,66 @@ mod tokio_io {
}

loop {
match self.0.take() {
DecoderState::Reading(mut shared) => {
match shared.poll_input(cx) {
Poll::Ready(Ok(())) => {
// we have read a chunk from the underlying reader
// go to output state
self.0 = DecoderState::Output(shared);
continue;
}
Poll::Ready(Err(e)) => {
// we got an error from the underlying io
// stay in reading state
self.0 = DecoderState::Reading(shared);
break Poll::Ready(Err(e));
}
Poll::Pending => {
// we don't have a complete chunk yet
// stay in reading state
self.0 = DecoderState::Output(shared);
break Poll::Pending;
}
match self.take() {
Self::Reading(mut shared) => {
let res = shared.poll_input(cx);
if let Poll::Ready(Ok(())) = res {
// we have read a chunk from the underlying reader
// go to output state
*self = Self::Output(shared);
continue;
}
*self = Self::Reading(shared);
break res;
}
DecoderState::Output(mut shared) => {
Self::Output(mut shared) => {
shared.write_output(buf);
if shared.buf_len() == 0 {
*self = if shared.buf_len() == 0 {
// the caller has consumed all the data in the buffer
// go to reading state
shared.clear_buf();
self.0 = DecoderState::Reading(shared);
Self::Reading(shared)
} else {
// we still have data in the buffer
// stay in output state
self.0 = DecoderState::Output(shared);
Self::Output(shared)
};
break Poll::Ready(Ok(()));
}
DecoderState::Done => {
DecoderState::Invalid => {
break Poll::Ready(Ok(()));
}
}
}
}
}

pub struct AsyncDecoder<T: AsyncRead + Unpin, O: AsyncRead + Unpin>(DecoderState<T, O>);

impl<T: AsyncRead + Unpin> AsyncDecoder<T, T> {
pub fn new(inner: T, hash: &Hash) -> Self {
let state = DecoderShared::new(inner, None, hash);
Self(DecoderState::Reading(Box::pin(state)))
}
}

impl<T: AsyncRead + Unpin, O: AsyncRead + Unpin> AsyncDecoder<T, O> {
pub fn new_outboard(inner: T, outboard: O, hash: &Hash) -> Self {
let state = DecoderShared::new(inner, Some(outboard), hash);
Self(DecoderState::Reading(Box::pin(state)))
}
}

impl<T: AsyncRead + Unpin, O: AsyncRead + Unpin> AsyncRead for AsyncDecoder<T, O> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
Pin::new(&mut self.0).poll_read(cx, buf)
}
}

pub struct SliceDecoderInner<T: AsyncRead + Unpin> {
shared: DecoderShared<T, T>,
slice_start: u64,
Expand Down

0 comments on commit c1f7c9c

Please sign in to comment.