Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve peer connection handling #1

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 86 additions & 9 deletions bitswap/network/ipfs_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -87,6 +88,8 @@ type impl struct {

// inbound messages from the network are forwarded to the receiver
receivers []Receiver

cancel context.CancelFunc
}

type streamMessageSender struct {
Expand Down Expand Up @@ -354,15 +357,96 @@ func (bsnet *impl) Start(r ...Receiver) {
bsnet.connectEvtMgr = newConnectEventManager(connectionListeners...)
}
for _, proto := range bsnet.supportedProtocols {
log.Debugf("setting up handler for protocol: %s", proto)
bsnet.host.SetStreamHandler(proto, bsnet.handleNewStream)
}

// try to subscribe to libp2p events that indicate a change in connection state
// if this fails, continue as normal
err := bsnet.trySubscribePeerUpdates()
if err != nil {
log.Errorf("failed to subscribe to libp2p events: %s", err)
}

// listen for disconnects and start processing the events
bsnet.host.Network().Notify((*netNotifiee)(bsnet))
bsnet.connectEvtMgr.Start()
}

func (bsnet *impl) Stop() {
bsnet.connectEvtMgr.Stop()
bsnet.host.Network().StopNotify((*netNotifiee)(bsnet))
bsnet.cancel()
}

func (bsnet *impl) trySubscribePeerUpdates() error {
// first, subscribe to libp2p events that indicate a change in connection state
sub, err := bsnet.host.EventBus().Subscribe([]interface{}{
&event.EvtPeerProtocolsUpdated{},
&event.EvtPeerIdentificationCompleted{},
})
if err != nil {
return err
}

ctx, cancel := context.WithCancel(context.Background())
bsnet.cancel = cancel

go bsnet.peerUpdatedSubscription(ctx, sub)

// next, add any peers with existing connections that support bitswap protocols
for _, conn := range bsnet.host.Network().Conns() {
peerID := conn.RemotePeer()
if bsnet.peerSupportsBitswap(peerID) {
log.Debugf("connecting to existing peer: %s", peerID)
bsnet.connectEvtMgr.Connected(peerID)
}
}

return nil
}

func (bsnet *impl) peerUpdatedSubscription(ctx context.Context, sub event.Subscription) {
for {
select {
case <-ctx.Done():
return
case evt := <-sub.Out():
switch e := evt.(type) {
case event.EvtPeerProtocolsUpdated:
if bsnet.hasBitswapProtocol(e.Added) {
log.Debugf("connecting to peer with updated protocol list: %s", e.Peer)
bsnet.connectEvtMgr.Connected(e.Peer)
continue
}

if bsnet.hasBitswapProtocol(e.Removed) && !bsnet.peerSupportsBitswap(e.Peer) {
log.Debugf("disconnecting from peer with updated protocol list: %s", e.Peer)
bsnet.connectEvtMgr.Disconnected(e.Peer)
}
case event.EvtPeerIdentificationCompleted:
if bsnet.peerSupportsBitswap(e.Peer) {
log.Debugf("connecting to peer with new identification: %s", e.Peer)
bsnet.connectEvtMgr.Connected(e.Peer)
}
}
}
}
}

func (bsnet *impl) peerSupportsBitswap(peerID peer.ID) bool {
protocols, err := bsnet.host.Peerstore().SupportsProtocols(peerID, bsnet.supportedProtocols...)
return err == nil && len(protocols) > 0
}

func (bsnet *impl) hasBitswapProtocol(protos []protocol.ID) bool {
for _, p := range protos {
switch p {
case bsnet.protocolBitswap, bsnet.protocolBitswapOneOne, bsnet.protocolBitswapOneZero, bsnet.protocolBitswapNoVers:
return true
}
}
return false
}

func (bsnet *impl) ConnectTo(ctx context.Context, p peer.ID) error {
Expand Down Expand Up @@ -450,23 +534,16 @@ func (nn *netNotifiee) impl() *impl {
return (*impl)(nn)
}

func (nn *netNotifiee) Connected(n network.Network, v network.Conn) {
// ignore transient connections
if v.Stat().Transient {
return
}

nn.impl().connectEvtMgr.Connected(v.RemotePeer())
}

func (nn *netNotifiee) Disconnected(n network.Network, v network.Conn) {
// Only record a "disconnect" when we actually disconnect.
if n.Connectedness(v.RemotePeer()) == network.Connected {
return
}

log.Debugf("peer disconnected: %s", v.RemotePeer())
nn.impl().connectEvtMgr.Disconnected(v.RemotePeer())
}
func (nn *netNotifiee) Connected(n network.Network, v network.Conn) {}
func (nn *netNotifiee) OpenedStream(n network.Network, s network.Stream) {}
func (nn *netNotifiee) ClosedStream(n network.Network, v network.Stream) {}
func (nn *netNotifiee) Listen(n network.Network, a ma.Multiaddr) {}
Expand Down
63 changes: 63 additions & 0 deletions bitswap/network/ipfs_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,3 +669,66 @@ func TestNetworkCounters(t *testing.T) {
testNetworkCounters(t, 10-n, n)
}
}

func TestPeerDiscovery(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

mn := mocknet.New()
defer mn.Close()

mr := mockrouting.NewServer()
streamNet, err := tn.StreamNet(ctx, mn, mr)
if err != nil {
t.Fatal("Unable to setup network")
}

// start 2 disconnected nodes
p1 := tnet.RandIdentityOrFatal(t)
p2 := tnet.RandIdentityOrFatal(t)

bsnet1 := streamNet.Adapter(p1)
bsnet2 := streamNet.Adapter(p2)
r1 := newReceiver()
r2 := newReceiver()
bsnet1.Start(r1)
t.Cleanup(bsnet1.Stop)
bsnet2.Start(r2)
t.Cleanup(bsnet2.Stop)

err = mn.LinkAll()
if err != nil {
t.Fatal(err)
}

// send request from node 1 to node 2
blockGenerator := blocksutil.NewBlockGenerator()
block := blockGenerator.Next()
sent := bsmsg.New(false)
sent.AddBlock(block)

err = bsnet1.SendMessage(ctx, p2.ID(), sent)
if err != nil {
t.Fatal(err)
}

// node 2 should connect to node 1
select {
case <-ctx.Done():
t.Fatal("did not connect peer")
case <-r2.connectionEvent:
}

// verify the message is received
select {
case <-ctx.Done():
t.Fatal("did not receive message sent")
case <-r2.messageReceived:
}

sender := r2.lastSender
if sender != p1.ID() {
t.Fatal("received message from wrong node")
}
}