diff --git a/src/repo/fs/blocks.rs b/src/repo/fs/blocks.rs index eadc9c9f9..f0ab246d6 100644 --- a/src/repo/fs/blocks.rs +++ b/src/repo/fs/blocks.rs @@ -355,43 +355,42 @@ impl BlockStore for FsBlockStore { } async fn list(&self) -> Result, Error> { - use futures::future::{ready, Either}; - use futures::stream::{empty, TryStreamExt}; + use futures::future::Either; + use futures::stream::{empty, StreamExt}; use tokio_stream::wrappers::ReadDirStream; let span = tracing::trace_span!("listing blocks"); async move { - let stream = ReadDirStream::new(fs::read_dir(self.path.clone()).await?); + let mut stream = ReadDirStream::new(fs::read_dir(self.path.clone()).await?); // FIXME: written as a stream to make the Vec be BoxStream<'static, Cid> - let vec = stream - .and_then(|d| async move { - // map over the shard directories - Ok(if d.file_type().await?.is_dir() { - Either::Left(ReadDirStream::new(fs::read_dir(d.path()).await?)) - } else { - Either::Right(empty()) - }) - }) - // flatten each - .try_flatten() - // convert the paths ending in ".data" into cid - .try_filter_map(|d| { + let mut folders_to_list: Vec>>> = Vec::new(); + while let Some(d) = stream.next().await { + let d = d?; + let either = if d.file_type().await?.is_dir() { + Either::Left(ReadDirStream::new(fs::read_dir(d.path()).await?)) + } else { + Either::Right(empty()) + }; + folders_to_list.push(either); + } + let mut cids: Vec = Vec::new(); + for mut folder in folders_to_list { + while let Some(d) = folder.next().await { + let d = d?; let name = d.file_name(); let path: &std::path::Path = name.as_ref(); - ready(if path.extension() != Some("data".as_ref()) { - Ok(None) - } else { - let maybe_cid = filestem_to_block_cid(path.file_stem()); - Ok(maybe_cid) - }) - }) - .try_collect::>() - .await?; + if path.extension() == Some("data".as_ref()) { + if let Some(maybe_cid) = filestem_to_block_cid(path.file_stem()) { + cids.push(maybe_cid); + } + } + } + } - Ok(vec) + Ok(cids) } .instrument(span) .await