Skip to content

Commit

Permalink
chore: fix codeclimate issues
Browse files Browse the repository at this point in the history
  • Loading branch information
chaitanyaprem committed Aug 14, 2023
1 parent b66b13f commit d17ce1f
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 43 deletions.
100 changes: 57 additions & 43 deletions waku/v2/peermanager/peer_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ type PeerData struct {
ENR *enode.Node
}

// PeerConnectionStrategy is a utility to connect to peers, but only if we have not recently tried connecting to them already
// PeerConnectionStrategy is a utility to connect to peers,
// but only if we have not recently tried connecting to them already
type PeerConnectionStrategy struct {
sync.RWMutex

Expand All @@ -54,12 +55,17 @@ type PeerConnectionStrategy struct {
logger *zap.Logger
}

// NewPeerConnectionStrategy creates a utility to connect to peers, but only if we have not recently tried connecting to them already.
// NewPeerConnectionStrategy creates a utility to connect to peers,
// but only if we have not recently tried connecting to them already.
//
// cacheSize is the size of a TwoQueueCache
// dialTimeout is how long we attempt to connect to a peer before giving up
// minPeers is the minimum number of peers that the node should have
// backoff describes the strategy used to decide how long to backoff after previously attempting to connect to a peer
func NewPeerConnectionStrategy(cacheSize int, maxOutPeers int, dialTimeout time.Duration, backoff backoff.BackoffFactory, logger *zap.Logger) (*PeerConnectionStrategy, error) {
func NewPeerConnectionStrategy(cacheSize int, maxOutPeers int,
dialTimeout time.Duration, backoff backoff.BackoffFactory,
logger *zap.Logger) (*PeerConnectionStrategy, error) {

cache, err := lru.New2Q(cacheSize)
if err != nil {
return nil, err
Expand Down Expand Up @@ -109,16 +115,18 @@ func (c *PeerConnectionStrategy) consumeSubscription(ctx context.Context, ch <-c

}

// Sets the host to be able to mount or consume a protocol
// SetHost sets the host to be able to mount or consume a protocol
func (c *PeerConnectionStrategy) SetHost(h host.Host) {
c.host = h
}

// SetPeerManager sets the peermanager in order to utilize add peer
func (c *PeerConnectionStrategy) SetPeerManager(pm *PeerManager) {
c.pm = pm
}

// Start attempts to connect to the peers passed in by peerCh. Will not connect to peers if they are within the backoff period.
// Start attempts to connect to the peers passed in by peerCh.
// Will not connect to peers if they are within the backoff period.
func (c *PeerConnectionStrategy) Start(ctx context.Context) error {
if c.cancel != nil {
return errors.New("already started")
Expand All @@ -139,6 +147,7 @@ func (c *PeerConnectionStrategy) Start(ctx context.Context) error {
return nil
}

// Stop terminates the peer-connector
func (c *PeerConnectionStrategy) Stop() {
if c.cancel == nil {
return
Expand Down Expand Up @@ -245,6 +254,28 @@ func (c *PeerConnectionStrategy) workPublisher(ctx context.Context) {

const maxActiveDials = 5

func (c *PeerConnectionStrategy) canDailPeer(pi peer.AddrInfo) bool {
c.mux.Lock()
val, ok := c.cache.Get(pi.ID)
var cachedPeer *connCacheData
if ok {
tv := val.(*connCacheData)
now := time.Now()
if now.Before(tv.nextTry) {
c.mux.Unlock()
return false
}

tv.nextTry = now.Add(tv.strat.Delay())
} else {
cachedPeer = &connCacheData{strat: c.backoff()}
cachedPeer.nextTry = time.Now().Add(cachedPeer.strat.Delay())
c.cache.Add(pi.ID, cachedPeer)
}
c.mux.Unlock()
return true
}

func (c *PeerConnectionStrategy) dialPeers(ctx context.Context) {
defer c.wg.Done()

Expand All @@ -262,51 +293,34 @@ func (c *PeerConnectionStrategy) dialPeers(ctx context.Context) {
return
}

if pi.ID == c.host.ID() || pi.ID == "" {
continue
}

if c.host.Network().Connectedness(pi.ID) == network.Connected {
if pi.ID == c.host.ID() || pi.ID == "" ||
c.host.Network().Connectedness(pi.ID) == network.Connected {
continue
}

c.mux.Lock()
val, ok := c.cache.Get(pi.ID)
var cachedPeer *connCacheData
if ok {
tv := val.(*connCacheData)
now := time.Now()
if now.Before(tv.nextTry) {
c.mux.Unlock()
continue
}

tv.nextTry = now.Add(tv.strat.Delay())
if c.canDailPeer(pi) {
sem <- struct{}{}
c.wg.Add(1)
go c.dialPeer(pi, sem)
} else {
cachedPeer = &connCacheData{strat: c.backoff()}
cachedPeer.nextTry = time.Now().Add(cachedPeer.strat.Delay())
c.cache.Add(pi.ID, cachedPeer)
continue
}
c.mux.Unlock()

sem <- struct{}{}
c.wg.Add(1)
go func(pi peer.AddrInfo) {
defer c.wg.Done()
c.RLock()
ctx, cancel := context.WithTimeout(c.workerCtx, c.dialTimeout)
c.RUnlock()
defer cancel()
err := c.host.Connect(ctx, pi)
if err != nil && !errors.Is(err, context.Canceled) {
c.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(pi)
c.logger.Info("connecting to peer", logging.HostID("peerID", pi.ID), zap.Error(err))
}
<-sem
}(pi)

case <-ctx.Done():
return
}
}
}

func (c *PeerConnectionStrategy) dialPeer(pi peer.AddrInfo, sem chan struct{}) {
defer c.wg.Done()
c.RLock()
ctx, cancel := context.WithTimeout(c.workerCtx, c.dialTimeout)
c.RUnlock()
defer cancel()
err := c.host.Connect(ctx, pi)
if err != nil && !errors.Is(err, context.Canceled) {
c.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(pi)
c.logger.Info("connecting to peer", logging.HostID("peerID", pi.ID), zap.Error(err))
}
<-sem
}
2 changes: 2 additions & 0 deletions waku/v2/peermanager/peer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,12 @@ func NewPeerManager(maxConnections int, logger *zap.Logger) *PeerManager {
return pm
}

// SetHost sets the host to be used in order to access the peerStore.
func (pm *PeerManager) SetHost(host host.Host) {
pm.host = host
}

// SetPeerConnector sets the peer connector to be used for establishing relay connections.
func (pm *PeerManager) SetPeerConnector(pc *PeerConnectionStrategy) {
pm.peerConnector = pc
}
Expand Down

0 comments on commit d17ce1f

Please sign in to comment.