Skip to content

Commit

Permalink
fix(release 0.3.0: rewrite FsBlockStore.list; crate compiles as depen…
Browse files Browse the repository at this point in the history
…dency again

See issue rs-ipfs#458
  • Loading branch information
Christian7573 committed Jul 16, 2021
1 parent 4bce467 commit 7a0c93b
Showing 1 changed file with 25 additions and 26 deletions.
51 changes: 25 additions & 26 deletions src/repo/fs/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,43 +355,42 @@ impl BlockStore for FsBlockStore {
}

async fn list(&self) -> Result<Vec<Cid>, Error> {
use futures::future::{ready, Either};
use futures::stream::{empty, TryStreamExt};
use futures::future::Either;
use futures::stream::{empty, StreamExt};
use tokio_stream::wrappers::ReadDirStream;

let span = tracing::trace_span!("listing blocks");

async move {
let stream = ReadDirStream::new(fs::read_dir(self.path.clone()).await?);
let mut stream = ReadDirStream::new(fs::read_dir(self.path.clone()).await?);

// FIXME: written as a stream to make the Vec be BoxStream<'static, Cid>
let vec = stream
.and_then(|d| async move {
// map over the shard directories
Ok(if d.file_type().await?.is_dir() {
Either::Left(ReadDirStream::new(fs::read_dir(d.path()).await?))
} else {
Either::Right(empty())
})
})
// flatten each
.try_flatten()
// convert the paths ending in ".data" into cid
.try_filter_map(|d| {
let mut folders_to_list: Vec<Either<ReadDirStream, futures::stream::Empty<std::io::Result<tokio::fs::DirEntry>>>> = Vec::new();
while let Some(d) = stream.next().await {
let d = d?;
let either = if d.file_type().await?.is_dir() {
Either::Left(ReadDirStream::new(fs::read_dir(d.path()).await?))
} else {
Either::Right(empty())
};
folders_to_list.push(either);
}
let mut cids: Vec<Cid> = Vec::new();
for mut folder in folders_to_list {
while let Some(d) = folder.next().await {
let d = d?;
let name = d.file_name();
let path: &std::path::Path = name.as_ref();

ready(if path.extension() != Some("data".as_ref()) {
Ok(None)
} else {
let maybe_cid = filestem_to_block_cid(path.file_stem());
Ok(maybe_cid)
})
})
.try_collect::<Vec<_>>()
.await?;
if path.extension() == Some("data".as_ref()) {
if let Some(maybe_cid) = filestem_to_block_cid(path.file_stem()) {
cids.push(maybe_cid);
}
}
}
}

Ok(vec)
Ok(cids)
}
.instrument(span)
.await
Expand Down

0 comments on commit 7a0c93b

Please sign in to comment.