Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve peer connection handling #526

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions bitswap/network/ipfs_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -85,6 +86,8 @@ type impl struct {

// inbound messages from the network are forwarded to the receiver
receivers []Receiver

cancel context.CancelFunc
}

type streamMessageSender struct {
Expand Down Expand Up @@ -349,15 +352,96 @@ func (bsnet *impl) Start(r ...Receiver) {
bsnet.connectEvtMgr = newConnectEventManager(connectionListeners...)
}
for _, proto := range bsnet.supportedProtocols {
log.Debugf("setting up handler for protocol: %s", proto)
bsnet.host.SetStreamHandler(proto, bsnet.handleNewStream)
}

// try to subscribe to libp2p events that indicate a change in connection state
// if this fails, continue as normal
err := bsnet.trySubscribePeerUpdates()
if err != nil {
log.Errorf("failed to subscribe to libp2p events: %s", err)
}

// listen for disconnects and start processing the events
bsnet.host.Network().Notify((*netNotifiee)(bsnet))
bsnet.connectEvtMgr.Start()
}

func (bsnet *impl) Stop() {
bsnet.connectEvtMgr.Stop()
bsnet.host.Network().StopNotify((*netNotifiee)(bsnet))
bsnet.cancel()
}

func (bsnet *impl) trySubscribePeerUpdates() error {
// first, subscribe to libp2p events that indicate a change in connection state
sub, err := bsnet.host.EventBus().Subscribe([]interface{}{
&event.EvtPeerProtocolsUpdated{},
&event.EvtPeerIdentificationCompleted{},
})
if err != nil {
return err
}

ctx, cancel := context.WithCancel(context.Background())
bsnet.cancel = cancel

go bsnet.peerUpdatedSubscription(ctx, sub)

// next, add any peers with existing connections that support bitswap protocols
for _, conn := range bsnet.host.Network().Conns() {
peerID := conn.RemotePeer()
if bsnet.peerSupportsBitswap(peerID) {
log.Debugf("connecting to existing peer: %s", peerID)
bsnet.connectEvtMgr.Connected(peerID)
}
}

return nil
}

func (bsnet *impl) peerUpdatedSubscription(ctx context.Context, sub event.Subscription) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will race with the "old" version of notifications. Instead, we need to:

  1. Drop the old version (the net notifiee).
  2. Subscribe to EvtPeerConnectednessChanged to get connectivity changes.
  3. We care about "disconnected" events, but ignore "connected" events. Instead, rely on "protocol changed" and "identify" notifications to tell you that the peer supports bitswap (but check this with @Jorropo).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, but once you do this, you also have to move the existing connection walk to the top of this function. That is, you need to:

  1. Subscribe to connect/disconnect events.
  2. Process all existing connections.
  3. Start processing the events.

Failure modes:

  • If you do 2 before 1, you could miss new connections.
  • If you do 3 concurrent with 2, you could process a connect/disconnect event in step-3, then re-add the peer in the step-2 loop.

If you do them in order (1,2,3), you may end up processing a "connect" event for a disconnected peer in step 3, but then you'll process the disconnect event.

You could additionally check the peer's "connectedness" before updating a peer's status, but I don't think that is necessary.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm. Although, I think we need to ask libp2p for all connections to the peer to make sure that we have a non-transient one. I wonder if that'll cause any issues? I assume we'll re-identify when we create a new connection... That'll require some testing.

Ideally libp2p would let us filter protocols by connection type, but that's a larger change.

Thoughts @Jorropo? @aschmahmann?

for {
select {
case <-ctx.Done():
return
case evt := <-sub.Out():
switch e := evt.(type) {
case event.EvtPeerProtocolsUpdated:
if bsnet.hasBitswapProtocol(e.Added) {
log.Debugf("connecting to peer with updated protocol list: %s", e.Peer)
bsnet.connectEvtMgr.Connected(e.Peer)
continue
}

if bsnet.hasBitswapProtocol(e.Removed) && !bsnet.peerSupportsBitswap(e.Peer) {
log.Debugf("disconnecting from peer with updated protocol list: %s", e.Peer)
bsnet.connectEvtMgr.Disconnected(e.Peer)
}
case event.EvtPeerIdentificationCompleted:
if bsnet.peerSupportsBitswap(e.Peer) {
log.Debugf("connecting to peer with new identification: %s", e.Peer)
bsnet.connectEvtMgr.Connected(e.Peer)
}
}
}
}
}

func (bsnet *impl) peerSupportsBitswap(peerID peer.ID) bool {
protocols, err := bsnet.host.Peerstore().SupportsProtocols(peerID, bsnet.supportedProtocols...)
return err == nil && len(protocols) > 0
}

func (bsnet *impl) hasBitswapProtocol(protos []protocol.ID) bool {
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
for _, p := range protos {
switch p {
case bsnet.protocolBitswap, bsnet.protocolBitswapOneOne, bsnet.protocolBitswapOneZero, bsnet.protocolBitswapNoVers:
return true
}
}
return false
}

func (bsnet *impl) ConnectTo(ctx context.Context, p peer.ID) error {
Expand Down Expand Up @@ -460,6 +544,7 @@ func (nn *netNotifiee) Disconnected(n network.Network, v network.Conn) {
return
}

log.Debugf("peer disconnected: %s", v.RemotePeer())
nn.impl().connectEvtMgr.Disconnected(v.RemotePeer())
}
func (nn *netNotifiee) OpenedStream(n network.Network, s network.Stream) {}
Expand Down
63 changes: 63 additions & 0 deletions bitswap/network/ipfs_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,3 +669,66 @@
testNetworkCounters(t, 10-n, n)
}
}

func TestPeerDiscovery(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

mn := mocknet.New()
defer mn.Close()

mr := mockrouting.NewServer()
streamNet, err := tn.StreamNet(ctx, mn, mr)
if err != nil {
t.Fatal("Unable to setup network")
}

// start 2 disconnected nodes
p1 := tnet.RandIdentityOrFatal(t)
p2 := tnet.RandIdentityOrFatal(t)

bsnet1 := streamNet.Adapter(p1)
bsnet2 := streamNet.Adapter(p2)
r1 := newReceiver()
r2 := newReceiver()
bsnet1.Start(r1)
t.Cleanup(bsnet1.Stop)
bsnet2.Start(r2)
t.Cleanup(bsnet2.Stop)

err = mn.LinkAll()
if err != nil {
t.Fatal(err)
}

// send request from node 1 to node 2
blockGenerator := blocksutil.NewBlockGenerator()

Check failure on line 706 in bitswap/network/ipfs_impl_test.go

View workflow job for this annotation

GitHub Actions / go-check / All

undefined: blocksutil (compile)

Check failure on line 706 in bitswap/network/ipfs_impl_test.go

View workflow job for this annotation

GitHub Actions / go-test / ubuntu (go this)

undefined: blocksutil

Check failure on line 706 in bitswap/network/ipfs_impl_test.go

View workflow job for this annotation

GitHub Actions / go-test / ubuntu (go next)

undefined: blocksutil

Check failure on line 706 in bitswap/network/ipfs_impl_test.go

View workflow job for this annotation

GitHub Actions / go-test / windows (go this)

undefined: blocksutil

Check failure on line 706 in bitswap/network/ipfs_impl_test.go

View workflow job for this annotation

GitHub Actions / go-test / windows (go next)

undefined: blocksutil

Check failure on line 706 in bitswap/network/ipfs_impl_test.go

View workflow job for this annotation

GitHub Actions / go-test / macos (go this)

undefined: blocksutil

Check failure on line 706 in bitswap/network/ipfs_impl_test.go

View workflow job for this annotation

GitHub Actions / go-test / macos (go next)

undefined: blocksutil
block := blockGenerator.Next()
sent := bsmsg.New(false)
sent.AddBlock(block)

err = bsnet1.SendMessage(ctx, p2.ID(), sent)
if err != nil {
t.Fatal(err)
}

// node 2 should connect to node 1
select {
case <-ctx.Done():
t.Fatal("did not connect peer")
case <-r2.connectionEvent:
}

// verify the message is received
select {
case <-ctx.Done():
t.Fatal("did not receive message sent")
case <-r2.messageReceived:
}

sender := r2.lastSender
if sender != p1.ID() {
t.Fatal("received message from wrong node")
}
}
Loading