Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement relay connectivity loop #642

Merged
merged 4 commits into from
Aug 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion waku/v2/node/wakunode2.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,9 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
if err != nil {
w.log.Error("creating localnode", zap.Error(err))
}

//Initialize peer manager.
w.peermanager = peermanager.NewPeerManager(uint(w.opts.maxPeerConnections), w.log)
w.peermanager = peermanager.NewPeerManager(w.opts.maxPeerConnections, w.log)
maxOutPeers := int(w.peermanager.OutRelayPeersTarget)

// Setup peer connection strategy
Expand All @@ -257,6 +258,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
if err != nil {
w.log.Error("creating peer connection strategy", zap.Error(err))
}
w.peermanager.SetPeerConnector(w.peerConnector)

if w.opts.enableDiscV5 {
err := w.mountDiscV5()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ type PeerData struct {
ENR *enode.Node
}

// PeerConnectionStrategy is a utility to connect to peers, but only if we have not recently tried connecting to them already
// PeerConnectionStrategy is a utility to connect to peers,
// but only if we have not recently tried connecting to them already
type PeerConnectionStrategy struct {
sync.RWMutex

Expand All @@ -54,12 +55,17 @@ type PeerConnectionStrategy struct {
logger *zap.Logger
}

// NewPeerConnectionStrategy creates a utility to connect to peers, but only if we have not recently tried connecting to them already.
// NewPeerConnectionStrategy creates a utility to connect to peers,
// but only if we have not recently tried connecting to them already.
//
// cacheSize is the size of a TwoQueueCache
// dialTimeout is how long we attempt to connect to a peer before giving up
// minPeers is the minimum number of peers that the node should have
// backoff describes the strategy used to decide how long to backoff after previously attempting to connect to a peer
func NewPeerConnectionStrategy(cacheSize int, maxOutPeers int, dialTimeout time.Duration, backoff backoff.BackoffFactory, logger *zap.Logger) (*PeerConnectionStrategy, error) {
func NewPeerConnectionStrategy(cacheSize int, maxOutPeers int,
dialTimeout time.Duration, backoff backoff.BackoffFactory,
logger *zap.Logger) (*PeerConnectionStrategy, error) {

cache, err := lru.New2Q(cacheSize)
if err != nil {
return nil, err
Expand Down Expand Up @@ -109,16 +115,18 @@ func (c *PeerConnectionStrategy) consumeSubscription(ctx context.Context, ch <-c

}

// Sets the host to be able to mount or consume a protocol
// SetHost sets the host to be able to mount or consume a protocol
func (c *PeerConnectionStrategy) SetHost(h host.Host) {
c.host = h
}

// SetPeerManager sets the peermanager in order to utilize add peer
func (c *PeerConnectionStrategy) SetPeerManager(pm *PeerManager) {
c.pm = pm
}

// Start attempts to connect to the peers passed in by peerCh. Will not connect to peers if they are within the backoff period.
// Start attempts to connect to the peers passed in by peerCh.
// Will not connect to peers if they are within the backoff period.
func (c *PeerConnectionStrategy) Start(ctx context.Context) error {
if c.cancel != nil {
return errors.New("already started")
Expand All @@ -139,6 +147,7 @@ func (c *PeerConnectionStrategy) Start(ctx context.Context) error {
return nil
}

// Stop terminates the peer-connector
func (c *PeerConnectionStrategy) Stop() {
if c.cancel == nil {
return
Expand Down Expand Up @@ -176,9 +185,9 @@ func (c *PeerConnectionStrategy) shouldDialPeers(ctx context.Context) {
return
case <-ticker.C:
isPaused := c.isPaused()
_, outRelayPeers, err := c.host.Peerstore().(wps.WakuPeerstore).GroupPeersByDirection()
_, outRelayPeers, err := c.pm.GroupPeersByDirection()
if err != nil {
c.logger.Info("Failed to get outRelayPeers from peerstore", zap.Error(err))
c.logger.Warn("failed to get outRelayPeers from peerstore", zap.Error(err))
continue
}
numPeers := outRelayPeers.Len()
Expand Down Expand Up @@ -245,6 +254,28 @@ func (c *PeerConnectionStrategy) workPublisher(ctx context.Context) {

const maxActiveDials = 5

func (c *PeerConnectionStrategy) canDialPeer(pi peer.AddrInfo) bool {
c.mux.Lock()
val, ok := c.cache.Get(pi.ID)
var cachedPeer *connCacheData
if ok {
tv := val.(*connCacheData)
now := time.Now()
if now.Before(tv.nextTry) {
c.mux.Unlock()
return false
}

tv.nextTry = now.Add(tv.strat.Delay())
} else {
cachedPeer = &connCacheData{strat: c.backoff()}
cachedPeer.nextTry = time.Now().Add(cachedPeer.strat.Delay())
c.cache.Add(pi.ID, cachedPeer)
}
c.mux.Unlock()
return true
}

func (c *PeerConnectionStrategy) dialPeers(ctx context.Context) {
defer c.wg.Done()

Expand All @@ -262,51 +293,34 @@ func (c *PeerConnectionStrategy) dialPeers(ctx context.Context) {
return
}

if pi.ID == c.host.ID() || pi.ID == "" {
continue
}

if c.host.Network().Connectedness(pi.ID) == network.Connected {
if pi.ID == c.host.ID() || pi.ID == "" ||
c.host.Network().Connectedness(pi.ID) == network.Connected {
continue
}

c.mux.Lock()
val, ok := c.cache.Get(pi.ID)
var cachedPeer *connCacheData
if ok {
tv := val.(*connCacheData)
now := time.Now()
if now.Before(tv.nextTry) {
c.mux.Unlock()
continue
}

tv.nextTry = now.Add(tv.strat.Delay())
if c.canDialPeer(pi) {
sem <- struct{}{}
c.wg.Add(1)
go c.dialPeer(pi, sem)
} else {
cachedPeer = &connCacheData{strat: c.backoff()}
cachedPeer.nextTry = time.Now().Add(cachedPeer.strat.Delay())
c.cache.Add(pi.ID, cachedPeer)
continue
}
c.mux.Unlock()

sem <- struct{}{}
c.wg.Add(1)
go func(pi peer.AddrInfo) {
defer c.wg.Done()
c.RLock()
ctx, cancel := context.WithTimeout(c.workerCtx, c.dialTimeout)
c.RUnlock()
defer cancel()
err := c.host.Connect(ctx, pi)
if err != nil && !errors.Is(err, context.Canceled) {
c.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(pi)
c.logger.Info("connecting to peer", logging.HostID("peerID", pi.ID), zap.Error(err))
}
<-sem
}(pi)

case <-ctx.Done():
return
}
}
}

func (c *PeerConnectionStrategy) dialPeer(pi peer.AddrInfo, sem chan struct{}) {
defer c.wg.Done()
c.RLock()
ctx, cancel := context.WithTimeout(c.workerCtx, c.dialTimeout)
c.RUnlock()
defer cancel()
err := c.host.Connect(ctx, pi)
if err != nil && !errors.Is(err, context.Canceled) {
c.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(pi)
c.logger.Warn("connecting to peer", logging.HostID("peerID", pi.ID), zap.Error(err))
}
<-sem
}
Loading