Skip to content
This repository has been archived by the owner on Oct 23, 2022. It is now read-only.

Commit

Permalink
Merge #453
Browse files Browse the repository at this point in the history
453: Fix multinode test spans r=koivunej a=koivunej

This PR changes how the spans are set up from IpfsOptions and in UninitializedIpfs. While debugging the now ignored test (see #450) I found that the spans were configured wrong and thus none of the multiple nodes created by `spawn_nodes` could be differentiated.

This also renames the spans to more logical from the "$root" given at `IpfsOptions::span`:

`Span::current` => `$root:init`
`Span::current` => `$root:init:swarm`
`$root:swarm` => `$root:exec` (literally the executor libp2p spawns futures through)
`$root` => `$root:bg_task` => `$root:swarm` (background task)
`$root` => `$root:facade` (futures created through Ipfs::* methods)

Still very far from perfect, but perhaps a step into better direction.

Co-authored-by: Joonas Koivunen <[email protected]>
  • Loading branch information
bors[bot] and koivunej authored Jan 27, 2021
2 parents a89b0c9 + 329587d commit cf37032
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 19 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* feat(http): create `Profile` abstraction [#421]
* feat: `sled` pinstore [#439], [#442], [#444]
* chore: update a lot of dependencies including libp2p, tokio, warp [#446]
* fix: rename spans (part of [#453])

[#429]: https://github.com/rs-ipfs/rust-ipfs/pull/429
[#428]: https://github.com/rs-ipfs/rust-ipfs/pull/428
Expand All @@ -15,6 +16,7 @@
[#442]: https://github.com/rs-ipfs/rust-ipfs/pull/442
[#444]: https://github.com/rs-ipfs/rust-ipfs/pull/444
[#446]: https://github.com/rs-ipfs/rust-ipfs/pull/446
[#453]: https://github.com/rs-ipfs/rust-ipfs/pull/453

# 0.2.1

Expand Down
50 changes: 35 additions & 15 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,10 @@ impl<Types: IpfsTypes> UninitializedIpfs<Types> {

/// Initialize the ipfs node. The returned `Ipfs` value is cloneable, send and sync, and the
/// future should be spawned on a executor as soon as possible.
///
/// The future returned from this method should not need
/// (instrumenting)[`tracing_futures::Instrument::instrument`] as the [`IpfsOptions::span`]
/// will be used as parent span for all of the awaited and created futures.
pub async fn start(self) -> Result<(Ipfs<Types>, impl Future<Output = ()>), Error> {
use futures::stream::StreamExt;

Expand All @@ -350,16 +354,28 @@ impl<Types: IpfsTypes> UninitializedIpfs<Types> {
mut options,
} = self;

repo.init().await?;

let (to_task, receiver) = channel::<IpfsEvent>(1);

let facade_span = options
let root_span = options
.span
.take()
.unwrap_or_else(|| tracing::trace_span!("ipfs"));
// not sure what would be the best practice with tracing and spans
.unwrap_or_else(|| tracing::trace_span!(parent: &Span::current(), "ipfs"));

// the "current" span which is not entered but the awaited futures are instrumented with it
let init_span = tracing::trace_span!(parent: &root_span, "init");

// stored in the Ipfs, instrumenting every method call
let facade_span = tracing::trace_span!("facade");

let swarm_span = tracing::trace_span!(parent: facade_span.clone(), "swarm");
// stored in the executor given to libp2p, used to spawn at least the connections,
// instrumenting each of those.
let exec_span = tracing::trace_span!(parent: &root_span, "exec");

// instruments the IpfsFuture, the background task.
let swarm_span = tracing::trace_span!(parent: &root_span, "swarm");

repo.init().instrument(init_span.clone()).await?;

let (to_task, receiver) = channel::<IpfsEvent>(1);

let ipfs = Ipfs {
span: facade_span,
Expand All @@ -368,8 +384,12 @@ impl<Types: IpfsTypes> UninitializedIpfs<Types> {
to_task,
};

// FIXME: mutating options above is an unfortunate side-effect of this call, which could be
// reordered for less error prone code.
let swarm_options = SwarmOptions::from(&options);
let swarm = create_swarm(swarm_options, swarm_span, repo).await?;
let swarm = create_swarm(swarm_options, exec_span, repo)
.instrument(tracing::trace_span!(parent: &init_span, "swarm"))
.await?;

let IpfsOptions {
listening_addrs, ..
Expand All @@ -386,7 +406,7 @@ impl<Types: IpfsTypes> UninitializedIpfs<Types> {
fut.start_add_listener_address(addr, None);
}

Ok((ipfs, fut))
Ok((ipfs, fut.instrument(swarm_span)))
}
}

Expand Down Expand Up @@ -1663,12 +1683,12 @@ mod node {
pub async fn with_options(opts: IpfsOptions) -> Self {
let id = opts.keypair.public().into_peer_id();

let (ipfs, fut): (Ipfs<TestTypes>, _) = UninitializedIpfs::new(opts)
.start()
.in_current_span()
.await
.unwrap();
let bg_task = tokio::task::spawn(fut.in_current_span());
// for future: assume UninitializedIpfs handles instrumenting any futures with the
// given span

let (ipfs, fut): (Ipfs<TestTypes>, _) =
UninitializedIpfs::new(opts).start().await.unwrap();
let bg_task = tokio::task::spawn(fut);
let addrs = ipfs.identity().await.unwrap().1;

Node {
Expand Down
4 changes: 2 additions & 2 deletions src/p2p/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl From<&IpfsOptions> for SwarmOptions {
/// Creates a new IPFS swarm.
pub async fn create_swarm<TIpfsTypes: IpfsTypes>(
options: SwarmOptions,
swarm_span: Span,
span: Span,
repo: Arc<Repo<TIpfsTypes>>,
) -> io::Result<TSwarm<TIpfsTypes>> {
let peer_id = options.peer_id;
Expand All @@ -68,7 +68,7 @@ pub async fn create_swarm<TIpfsTypes: IpfsTypes>(

// Create a Swarm
let swarm = libp2p::swarm::SwarmBuilder::new(transport, behaviour, peer_id)
.executor(Box::new(SpannedExecutor(swarm_span)))
.executor(Box::new(SpannedExecutor(span)))
.build();

Ok(swarm)
Expand Down
3 changes: 1 addition & 2 deletions src/repo/fs/pinstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use tokio::fs;
use tokio::sync::Semaphore;
use tokio_stream::{empty, wrappers::ReadDirStream, StreamExt};
use tokio_util::either::Either;
use tracing_futures::Instrument;

// PinStore is a trait from ipfs::repo implemented on FsDataStore defined at ipfs::repo::fs or
// parent module.
Expand Down Expand Up @@ -318,7 +317,7 @@ impl PinStore for FsDataStore {
}
};

Box::pin(st.in_current_span())
Box::pin(st)
}

async fn query(
Expand Down

0 comments on commit cf37032

Please sign in to comment.